Reorganizing kvstore sources and switching build system to CMake.
authorMichael Vrable <mvrable@cs.ucsd.edu>
Mon, 15 Feb 2010 23:50:09 +0000 (15:50 -0800)
committerMichael Vrable <mvrable@cs.ucsd.edu>
Mon, 15 Feb 2010 23:50:09 +0000 (15:50 -0800)
27 files changed:
kvstore/CMakeLists.txt [new file with mode: 0644]
kvstore/backend.cc
kvstore/debian/README.Debian [deleted file]
kvstore/debian/README.source [deleted file]
kvstore/debian/changelog [deleted file]
kvstore/debian/compat [deleted file]
kvstore/debian/control [deleted file]
kvstore/debian/copyright [deleted file]
kvstore/debian/docs [deleted file]
kvstore/debian/install [deleted file]
kvstore/debian/rules [deleted file]
kvstore/protobufrpc.cc [new file with mode: 0644]
kvstore/protobufrpc.h [new file with mode: 0644]
kvstore/protobufrpc/SConscript [deleted file]
kvstore/protobufrpc/SConstruct [deleted file]
kvstore/protobufrpc/protobufrpc.cc [deleted file]
kvstore/protobufrpc/protobufrpc.h [deleted file]
kvstore/protobufrpc/socket_pool.cc [deleted file]
kvstore/protobufrpc/socket_pool.h [deleted file]
kvstore/protobufrpc/util.h [deleted file]
kvstore/protobufrpc/workqueue.cc [deleted file]
kvstore/protobufrpc/workqueue.h [deleted file]
kvstore/socket_pool.cc [new file with mode: 0644]
kvstore/socket_pool.h [new file with mode: 0644]
kvstore/util.h [new file with mode: 0644]
kvstore/workqueue.cc [new file with mode: 0644]
kvstore/workqueue.h [new file with mode: 0644]

diff --git a/kvstore/CMakeLists.txt b/kvstore/CMakeLists.txt
new file mode 100644 (file)
index 0000000..4d7edad
--- /dev/null
@@ -0,0 +1,15 @@
+cmake_minimum_required(VERSION 2.6)
+
+add_custom_command(OUTPUT kvstore.pb.cc kvstore.pb.h
+                   COMMAND protoc --cpp_out=. kvstore.proto
+                   DEPENDS kvstore.proto)
+
+add_library(protobufrpc protobufrpc.cc socket_pool.cc workqueue.cc)
+add_library(kvservice kvservice.cc kvstore.pb.cc backend.cc)
+add_library(kvclient kvclient.cc)
+add_executable(kvstore kvstore.cc)
+
+target_link_libraries(kvstore
+                      kvservice
+                      boost_thread-mt boost_regex-mt boost_system-mt
+                      boost_program_options-mt db protobuf protobufrpc pthread)
index d97c492..04a6764 100644 (file)
@@ -91,22 +91,22 @@ public:
 
         if (log_in_memory)
         {
-            res = _dbenv->set_flags(_dbenv, DB_LOG_INMEMORY, 1);
+            res = _dbenv->set_flags(_dbenv, DB_LOG_IN_MEMORY, 1);
 
             if (res != 0)
             {
                 cerr << db_strerror(res) << endl;
-                throw std::runtime_error("BDB ENV DB_LOG_INMEMORY");
+                throw std::runtime_error("BDB ENV DB_LOG_IN_MEMORY");
             }
 
         }
 
-        res = _dbenv->set_flags(_dbenv, DB_LOG_AUTOREMOVE, 1);
+        res = _dbenv->set_flags(_dbenv, DB_LOG_AUTO_REMOVE, 1);
 
         if (res != 0)
         {
             cerr << db_strerror(res) << endl;
-            throw std::runtime_error("BDB ENV DB_LOG_AUTOREMOVE");
+            throw std::runtime_error("BDB ENV DB_LOG_AUTO_REMOVE");
         }
 
         res = _dbenv->open(_dbenv,
diff --git a/kvstore/debian/README.Debian b/kvstore/debian/README.Debian
deleted file mode 100644 (file)
index 0cc3a58..0000000
+++ /dev/null
@@ -1,6 +0,0 @@
-bicker-fcgi for Debian
-----------------------
-
-<possible notes regarding this package - if none, delete this file>
-
- -- John McCullough <jcm@salud.ucsd.edu>  Tue, 29 Sep 2009 13:29:18 -0700
diff --git a/kvstore/debian/README.source b/kvstore/debian/README.source
deleted file mode 100644 (file)
index a7e4671..0000000
+++ /dev/null
@@ -1,9 +0,0 @@
-bicker-fcgi for Debian
-----------------------
-
-<this file describes information about the source package, see Debian policy
-manual section 4.14. You WILL either need to modify or delete this file>
-
-
-
-
diff --git a/kvstore/debian/changelog b/kvstore/debian/changelog
deleted file mode 100644 (file)
index be2e2de..0000000
+++ /dev/null
@@ -1,37 +0,0 @@
-kvstore (0.1-7) unstable; urgency=low
-
-  * Reduced memory copying in protobuf, support for multiple data
-    targets
-
- -- John McCullough <jcm@salud.ucsd.edu>  Thu, 21 Jan 2010 11:10:45 -0800
-
-kvstore (0.1-6) unstable; urgency=low
-
-  * Improved RPC Layer
-
- -- John McCullough <jcm@salud.ucsd.edu>  Thu, 24 Dec 2009 14:25:41 -0800
-
-kvstore (0.1-5) unstable; urgency=low
-
-  * KeyValue -> KVStore rename.
-  * Fixed Dependencies
-
- -- John McCullough <jcm@salud.ucsd.edu>  Sun, 29 Nov 2009 16:17:23 -0800
-
-kvstore (0.1-3) unstable; urgency=low
-
-  * More Packaging Fun
-
- -- John McCullough <jcm@salud.ucsd.edu>  Sun, 29 Nov 2009 15:34:41 -0800
-
-kvstore (0.1-2) unstable; urgency=low
-
-  * Incremented boost version to .0
-
- -- John McCullough <jcm@salud.ucsd.edu>  Sun, 29 Nov 2009 15:32:01 -0800
-
-kvstore (0.1-1) unstable; urgency=low
-
-  * Initial release (Closes: #nnnn)  <nnnn is the bug number of your ITP>
-
- -- John McCullough <jcm@salud.ucsd.edu>  Tue, 29 Sep 2009 13:29:18 -0700
diff --git a/kvstore/debian/compat b/kvstore/debian/compat
deleted file mode 100644 (file)
index 7f8f011..0000000
+++ /dev/null
@@ -1 +0,0 @@
-7
diff --git a/kvstore/debian/control b/kvstore/debian/control
deleted file mode 100644 (file)
index 748f726..0000000
+++ /dev/null
@@ -1,13 +0,0 @@
-Source: kvstore
-Section: unknown
-Priority: extra
-Maintainer: John McCullough <jcm@salud.ucsd.edu>
-Build-Depends: debhelper (>= 7), libboost-thread1.35-dev, libboost-system1.35-dev, libboost1.35-dev, libasio-dev, protobuf-compiler, libprotobuf-dev, libdb4.6-dev, libgtest-dev
-Standards-Version: 3.8.3,
-Homepage: <insert the upstream URL, if relevant>
-
-Package: kvstore
-Architecture: any
-Depends: ${misc:Depends}, libboost-thread1.35.0, libboost-regex1.35.0, libboost-program-options1.35.0, libboost-system1.35.0, libprotobuf4, libdb4.6, libgtest0
-Description: Homebrew key value store
- <insert long description, indented with spaces>
diff --git a/kvstore/debian/copyright b/kvstore/debian/copyright
deleted file mode 100644 (file)
index c356a24..0000000
+++ /dev/null
@@ -1,35 +0,0 @@
-This work was packaged for Debian by:
-
-    John McCullough <jcm@salud.ucsd.edu> on Tue, 29 Sep 2009 13:29:18 -0700
-
-It was downloaded from <url://example.com>
-
-Upstream Author(s):
-
-    <put author's name and email here>
-    <likewise for another author>
-
-Copyright:
-
-    <Copyright (C) YYYY Name OfAuthor>
-    <likewise for another author>
-
-License:
-
-    <Put the license of the package here indented by 4 spaces>
-
-The Debian packaging is:
-
-    Copyright (C) 2009 John McCullough <jcm@salud.ucsd.edu>
-
-# Please chose a license for your packaging work. If the program you package
-# uses a mainstream license, using the same license is the safest choice.
-# Please avoid to pick license terms that are more restrictive than the
-# packaged work, as it may make Debian's contributions unacceptable upstream.
-# If you just want it to be GPL version 3, leave the following lines in.
-
-and is licensed under the GPL version 3, 
-see `/usr/share/common-licenses/GPL-3'.
-
-# Please also look if there are files or directories which have a
-# different copyright/license attached and list them here.
diff --git a/kvstore/debian/docs b/kvstore/debian/docs
deleted file mode 100644 (file)
index e845566..0000000
+++ /dev/null
@@ -1 +0,0 @@
-README
diff --git a/kvstore/debian/install b/kvstore/debian/install
deleted file mode 100644 (file)
index c110135..0000000
+++ /dev/null
@@ -1 +0,0 @@
-usr/bin/kvstore
diff --git a/kvstore/debian/rules b/kvstore/debian/rules
deleted file mode 100755 (executable)
index a47c8e4..0000000
+++ /dev/null
@@ -1,43 +0,0 @@
-#!/usr/bin/make -f
-# -*- makefile -*-
-# Sample debian/rules that uses debhelper.
-# This file was originally written by Joey Hess and Craig Small.
-# As a special exception, when this file is copied by dh-make into a
-# dh-make output file, you may use that output file without restriction.
-# This special exception was added by Craig Small in version 0.37 of dh-make.
-
-# Uncomment this to turn on verbose mode.
-#export DH_VERBOSE=1
-build: 
-       dh_testdir
-       scons
-
-install: build
-       dh_testdir
-       dh_testroot
-       dh_prep
-       dh_installdirs
-       scons --prefix=$(CURDIR)/debian/tmp install
-
-clean:
-       scons -c
-       dh_clean
-
-binary-arch: install
-       dh_testdir
-       dh_testroot
-       dh_installchangelogs
-       dh_install
-       dh_link
-       #dh_strip
-       dh_compress
-       dh_fixperms
-       dh_installdeb
-       dh_gencontrol
-       dh_md5sums
-       dh_builddeb
-
-binary: binary-arch
-
-%:
-       dh  $@
diff --git a/kvstore/protobufrpc.cc b/kvstore/protobufrpc.cc
new file mode 100644 (file)
index 0000000..06226f7
--- /dev/null
@@ -0,0 +1,628 @@
+#include "protobufrpc.h"
+
+#include <google/protobuf/descriptor.h>
+#include <google/protobuf/io/zero_copy_stream_impl.h>
+#include <google/protobuf/io/coded_stream.h>
+#include <boost/shared_ptr.hpp>
+#include <boost/bind.hpp>
+
+#include <boost/functional/hash.hpp>
+
+using namespace std;
+
+using namespace boost;
+using asio::buffer;
+
+namespace bicker
+{
+
+template<typename T>
+static void* void_write(void* data, T val)
+{
+    *((T*)data) = val;
+    return (char*)data + sizeof(T);
+}
+
+class ProtoBufRpcServiceRequest
+{
+public:
+    ProtoBufRpcServiceRequest(
+                            RpcController *ctrl,
+                            const MethodDescriptor* method,
+                            Message *request,
+                            Message *response,
+                            shared_ptr<ProtoBufRpcConnection> conn
+                           )
+        :_ctrl(ctrl),
+         _method(method),
+         _request(request),
+         _response(response),
+         _conn(conn)
+    {
+    }
+
+    ~ProtoBufRpcServiceRequest()
+    {
+
+    }
+
+    static void run(ProtoBufRpcServiceRequest *req)
+    {
+
+        req->_conn->writeResponse(req->_response.get());
+
+        delete req;
+    }
+
+    shared_ptr<RpcController> _ctrl;
+    const MethodDescriptor *_method;
+    shared_ptr<Message> _request;
+    shared_ptr<Message> _response;
+    shared_ptr<ProtoBufRpcConnection> _conn;
+};
+
+ProtoBufRpcConnection::ProtoBufRpcConnection(asio::io_service& io_service,
+                             Service *service)
+:_socket(io_service),
+ _strand(io_service),
+ _service(service),
+ _state(STATE_NONE)
+{
+}
+
+tcp::socket& ProtoBufRpcConnection::socket()
+{
+    return _socket;
+}
+
+void ProtoBufRpcConnection::start()
+{
+    _socket.async_read_some(_buffer.prepare(4096),
+            _strand.wrap(
+                boost::bind(&ProtoBufRpcConnection::handle_read, shared_from_this(),
+                            asio::placeholders::error,
+                            asio::placeholders::bytes_transferred)));
+}
+
+void ProtoBufRpcConnection::writeResponse(Message *msg)
+{
+    int rlen = msg->ByteSize();
+    int len = htonl(rlen);
+    int mlen = sizeof(len) + rlen;
+
+    void * data = asio::buffer_cast<void*>(_buffer.prepare(mlen));
+
+    data = void_write(data, len);
+
+    using google::protobuf::io::ArrayOutputStream;
+
+    ArrayOutputStream as(data, rlen);
+
+    msg->SerializeToZeroCopyStream(&as);
+
+    _buffer.commit(mlen);
+
+    asio::async_write(_socket,
+            _buffer.data(),
+            _strand.wrap(
+                boost::bind(&ProtoBufRpcConnection::handle_write, 
+                            shared_from_this(),
+                asio::placeholders::error,
+                asio::placeholders::bytes_transferred)));
+}
+
+
+void ProtoBufRpcConnection::handle_read(const error_code& e, 
+                 std::size_t bytes_transferred)
+{
+    if (!e)
+    {
+        _buffer.commit(bytes_transferred);
+
+        if (_state == STATE_NONE)
+        {
+            if (_buffer.size() >= sizeof(_id) + sizeof(_len))
+            {
+                string b(
+                     buffers_begin(_buffer.data()),
+                     buffers_begin(_buffer.data())
+                                                + sizeof(_id) + sizeof(_len)
+                        );
+
+                _buffer.consume(sizeof(_id) + sizeof(_len));
+
+                _id = *((int*)b.c_str());
+                _id = ntohl(_id);
+
+                _len = *((unsigned int*)(b.c_str() + sizeof(_id)));
+                _len = ntohl(_len);
+
+                _state = STATE_HAVE_ID_AND_LEN;
+            }
+            else
+            {
+                start();
+            }
+        }
+
+        if (_state == STATE_HAVE_ID_AND_LEN || _state == STATE_WAITING_FOR_DATA)
+        {
+            if (_buffer.size() >= _len)
+            {
+                const MethodDescriptor* method =
+                    _service->GetDescriptor()->method(_id);
+
+                Message *req = _service->GetRequestPrototype(method).New();
+                Message *resp = _service->GetResponsePrototype(method).New();
+
+                using google::protobuf::io::ArrayInputStream;
+                using google::protobuf::io::CodedInputStream;
+
+                const void* data = asio::buffer_cast<const void*>(
+                                                        _buffer.data()
+                                                                 );
+                ArrayInputStream as(data, _len);
+                CodedInputStream is(&as);
+                is.SetTotalBytesLimit(512 * 1024 * 1024, -1);
+
+                if (!req->ParseFromCodedStream(&is))
+                {
+                    throw std::runtime_error("ParseFromCodedStream");
+                }
+
+                _buffer.consume(_len);
+
+                ProtoBufRpcController *ctrl = new ProtoBufRpcController();
+                _service->CallMethod(method, 
+                                     ctrl,
+                                     req, 
+                                     resp, 
+                                     NewCallback(
+                                             &ProtoBufRpcServiceRequest::run,
+                                             new ProtoBufRpcServiceRequest(
+                                                           ctrl,
+                                                           method,
+                                                           req,
+                                                           resp,
+                                                           shared_from_this())
+                                                )
+                                     );
+                _state = STATE_NONE;
+            }
+            else
+            {
+                _state = STATE_WAITING_FOR_DATA;
+                start();
+            }
+        }
+
+    }
+    else
+    {
+        error_code ignored_ec;
+        _socket.shutdown(tcp::socket::shutdown_both, ignored_ec);
+    }
+}
+
+void ProtoBufRpcConnection::handle_write(const error_code& e,
+                                         std::size_t bytes_transferred)
+{
+    if (e)
+    {
+        error_code ignored_ec;
+        _socket.shutdown(tcp::socket::shutdown_both, ignored_ec);
+    }
+    else
+    {
+        _buffer.consume(bytes_transferred);
+
+        if (_buffer.size())
+        {
+            asio::async_write(_socket,
+                    _buffer.data(),
+                    _strand.wrap(
+                        boost::bind(&ProtoBufRpcConnection::handle_write, 
+                                    shared_from_this(),
+                        asio::placeholders::error,
+                        asio::placeholders::bytes_transferred)));
+            return;
+        }
+
+        _state = STATE_NONE;
+        start();
+    }
+}
+
+ProtoBufRpcServer::ProtoBufRpcServer()
+    :_io_service(new asio::io_service())
+{
+}
+
+bool ProtoBufRpcServer::registerService(uint16_t port,
+                                shared_ptr<Service> service)
+{
+    // This is not thread safe
+
+    // The RegisteredService Constructor fires up the appropriate
+    // async accepts for the service
+    _services.push_back(shared_ptr<RegisteredService>(
+                                        new RegisteredService(
+                                                    _io_service,
+                                                    port,
+                                                    service)));
+
+    return true;
+}
+
+void run_wrapper(asio::io_service *io_service)
+{
+    struct itimerval itimer; 
+    setitimer(ITIMER_PROF, &itimer, NULL);
+
+    io_service->run();
+}
+
+void ProtoBufRpcServer::run()
+{
+    try
+    {
+        if (_services.size() == 0)
+        {
+            throw std::runtime_error("No services registered for ProtoBufRpcServer");
+        }
+
+        size_t nprocs = sysconf(_SC_NPROCESSORS_ONLN);
+
+        vector<shared_ptr<thread> > threads;
+        for (size_t i = 0; i < nprocs; ++i)
+        {
+            shared_ptr<thread> t(new thread(
+                                    boost::bind(
+                                        //&run_wrapper,
+                                        &asio::io_service::run, 
+                                        _io_service.get())));
+            threads.push_back(t);
+        }
+
+        for (size_t i = 0; i < threads.size(); ++i)
+        {
+            threads[i]->join();
+        }
+    }
+    catch (std::exception &e)
+    {
+        std::cerr << "ProtoBufRpcService" << e.what() << std::endl;
+    }
+}
+
+void ProtoBufRpcServer::shutdown()
+{
+    _io_service->stop();
+}
+
+ProtoBufRpcServer::RegisteredService::RegisteredService(
+                  shared_ptr<asio::io_service> io_service,
+                  uint16_t port,
+                  shared_ptr<Service> service
+                 )
+:_io_service(io_service),
+ _port(port),
+ _service(service),
+ _endpoint(tcp::v4(), _port),
+ _acceptor(*_io_service),
+ _new_connection(new ProtoBufRpcConnection(*_io_service, _service.get()))
+{
+    _acceptor.open(_endpoint.protocol());
+    _acceptor.set_option(tcp::acceptor::reuse_address(true));
+    _acceptor.bind(_endpoint);
+    _acceptor.listen();
+    _acceptor.async_accept(_new_connection->socket(),
+                   boost::bind(&ProtoBufRpcServer::RegisteredService::handle_accept, 
+                               this, 
+                               asio::placeholders::error));
+}
+
+void ProtoBufRpcServer::RegisteredService::handle_accept(const error_code& e)
+{
+      if (!e)
+      {
+          _new_connection->start();
+          _new_connection.reset(new ProtoBufRpcConnection(*_io_service, _service.get()));
+          _acceptor.async_accept(_new_connection->socket(),
+                 boost::bind(&ProtoBufRpcServer::RegisteredService::handle_accept,
+                             this,
+                             asio::placeholders::error));
+      }
+
+}
+
+ProtoBufRpcController::ProtoBufRpcController()
+{
+}
+
+ProtoBufRpcController::~ProtoBufRpcController()
+{
+}
+
+void ProtoBufRpcController::Reset()
+{
+}
+
+bool ProtoBufRpcController::Failed() const
+{
+    return false;
+}
+
+string ProtoBufRpcController::ErrorText() const
+{
+    return "No Error";
+}
+
+void ProtoBufRpcController::StartCancel()
+{
+}
+
+void ProtoBufRpcController::SetFailed(const string &/*reason*/)
+{
+}
+
+bool ProtoBufRpcController::IsCanceled() const
+{
+    return false;
+}
+
+void ProtoBufRpcController::NotifyOnCancel(Closure * /*callback*/)
+{
+}
+
+class ProtoBufRpcChannel::MethodHandler 
+    : public enable_shared_from_this<MethodHandler>,
+      private boost::noncopyable
+{
+public:
+    MethodHandler(auto_ptr<SocketCheckout> socket,
+                           const MethodDescriptor * method,
+                           RpcController * controller,
+                           const Message * request,
+                           Message * response,
+                           Closure * done
+                          )
+        :_socket(socket),
+        _method(method),
+        _controller(controller),
+        _request(request),
+        _response(response),
+        _done(done)
+    {
+    }
+
+    ~MethodHandler()
+    {
+        _socket.reset();
+        _done->Run();
+    }
+
+    static void execute(shared_ptr<MethodHandler> this_ptr)
+    {
+        int index = htonl(this_ptr->_method->index());
+        int rlen = this_ptr->_request->ByteSize();
+        int len = htonl(rlen);
+
+        int mlen = sizeof(index) + sizeof(len) + rlen;
+
+        void * data = asio::buffer_cast<void*>(this_ptr->_buffer.prepare(mlen));
+
+        data = void_write(data, index);
+        data = void_write(data, len);
+
+        using google::protobuf::io::ArrayOutputStream;
+
+        ArrayOutputStream as(data, rlen);
+
+        this_ptr->_request->SerializeToZeroCopyStream(&as);
+        this_ptr->_buffer.commit(mlen);
+
+        (*(this_ptr->_socket))->async_send(this_ptr->_buffer.data(),
+                               boost::bind(&ProtoBufRpcChannel::MethodHandler::handle_write,
+                                           this_ptr,
+                                           asio::placeholders::error,
+                                           asio::placeholders::bytes_transferred));
+    }
+
+    static void handle_write(shared_ptr<MethodHandler> this_ptr,
+                      const error_code& e, 
+                      std::size_t bytes_transferred)
+    {
+        if (!e)
+        {
+            this_ptr->_buffer.consume(bytes_transferred);
+
+            if (this_ptr->_buffer.size())
+            {
+                (*(this_ptr->_socket))->async_send(this_ptr->_buffer.data(),
+                                       boost::bind(&ProtoBufRpcChannel::MethodHandler::handle_write,
+                                                   this_ptr,
+                                                   asio::placeholders::error,
+                                                   asio::placeholders::bytes_transferred));
+                return;
+            }
+
+            (*(this_ptr->_socket))->async_receive(
+                                      buffer(&this_ptr->_len, sizeof(this_ptr->_len)),
+                                      boost::bind(
+                                                  &ProtoBufRpcChannel::MethodHandler::handle_read_len,
+                                                  this_ptr,
+                                                  asio::placeholders::error,
+                                                  asio::placeholders::bytes_transferred)
+                                     );
+        }
+        else
+        {
+            this_ptr->_controller->SetFailed(e.message());
+            (*(this_ptr->_socket))->close();
+        }
+    }
+
+    static void handle_read_len(shared_ptr<MethodHandler> this_ptr,
+                                const error_code& e,
+                                std::size_t bytes_transferred)
+    {
+        if (!e && bytes_transferred == sizeof(this_ptr->_len))
+        {
+            this_ptr->_len = ntohl(this_ptr->_len);
+            (*(this_ptr->_socket))->async_receive(
+                                      this_ptr->_buffer.prepare(this_ptr->_len),
+                                      boost::bind(
+                                                  &ProtoBufRpcChannel::MethodHandler::handle_read_response,
+                                                  this_ptr,
+                                                  asio::placeholders::error,
+                                                  asio::placeholders::bytes_transferred
+                                                 )
+                                     );
+        }
+        else
+        {
+            this_ptr->_controller->SetFailed(e.message());
+            (*(this_ptr->_socket))->close();
+        }
+    }
+
+    static void handle_read_response(shared_ptr<MethodHandler> this_ptr,
+                              const error_code& e, 
+                              std::size_t bytes_transferred)
+    {
+        if (!e)
+        {
+            this_ptr->_buffer.commit(bytes_transferred);
+            if (this_ptr->_buffer.size() >= this_ptr->_len)
+            {
+                using google::protobuf::io::ArrayInputStream;
+                using google::protobuf::io::CodedInputStream;
+
+                const void* data = asio::buffer_cast<const void*>(
+                                                        this_ptr->_buffer.data()
+                                                                 );
+                ArrayInputStream as(data, this_ptr->_len);
+                CodedInputStream is(&as);
+                is.SetTotalBytesLimit(512 * 1024 * 1024, -1);
+
+                if (!this_ptr->_response->ParseFromCodedStream(&is))
+                {
+                    throw std::runtime_error("ParseFromCodedStream");
+                }
+
+                this_ptr->_buffer.consume(this_ptr->_len);
+            }
+            else
+            {
+                (*(this_ptr->_socket))->async_receive(
+                                          this_ptr->_buffer.prepare(this_ptr->_len - this_ptr->_buffer.size()),
+                                          boost::bind(
+                                                      &ProtoBufRpcChannel::MethodHandler::handle_read_response,
+                                                      this_ptr,
+                                                      asio::placeholders::error,
+                                                      asio::placeholders::bytes_transferred
+                                                     )
+                                         );
+                return;
+            }
+        }
+        else
+        {
+            this_ptr->_controller->SetFailed(e.message());
+            (*(this_ptr->_socket))->close();
+        }
+    }
+
+private:
+    auto_ptr<SocketCheckout> _socket;
+    const MethodDescriptor * _method;
+    RpcController * _controller;
+    const Message * _request;
+    Message * _response;
+    Closure * _done;
+    asio::streambuf _buffer;
+    unsigned int _len;
+    bool _status;
+    unsigned int _sent;
+};
+
+
+ProtoBufRpcChannel::ProtoBufRpcChannel(const string &remotehost, 
+                                  const string &port)
+    :_remote_host(remotehost), _port(port),
+     _resolver(_io_service),
+     _acceptor(_io_service),
+     _pool(2000, _io_service),
+     _lame_socket(_io_service),
+     _thread()
+//                                &asio::io_service::run, 
+//                                &_io_service)))
+{
+
+    tcp::resolver::query query(_remote_host, _port);
+    tcp::resolver::iterator endpoint_iterator = _resolver.resolve(query);
+    tcp::resolver::iterator end;
+
+    error_code error = asio::error::host_not_found;
+
+    if (endpoint_iterator == end) throw syserr::system_error(error);
+
+    _pool.setEndpoint(*endpoint_iterator);
+
+    tcp::endpoint e(tcp::v4(), 0);
+    _acceptor.open(e.protocol());
+    _acceptor.set_option(tcp::acceptor::reuse_address(true));
+    _acceptor.bind(e);
+    _acceptor.listen();
+    _acceptor.async_accept(_lame_socket,
+                       boost::bind(&ProtoBufRpcChannel::lame_handle_accept, this, 
+                                   asio::placeholders::error));
+
+    _thread = shared_ptr<thread>(new thread(
+                                     boost::bind(
+                                             &asio::io_service::run, 
+                                             &_io_service)));
+}
+
+void ProtoBufRpcChannel::lame_handle_accept(const error_code &err)
+{
+    if (!err)
+    {
+        _acceptor.async_accept(_lame_socket,
+                           boost::bind(&ProtoBufRpcChannel::lame_handle_accept,
+                                       this,
+                                       asio::placeholders::error));
+    }
+}
+
+ProtoBufRpcChannel::~ProtoBufRpcChannel()
+{
+    _pool.cancelAndClear();
+
+    _io_service.stop();
+
+    _thread->join();
+}
+
+void ProtoBufRpcChannel::CallMethod(
+        const MethodDescriptor * method,
+        RpcController * controller,
+        const Message * request,
+        Message * response,
+        Closure * done)
+{
+    shared_ptr<MethodHandler> h(
+                            new MethodHandler(
+                          auto_ptr<SocketCheckout>(new SocketCheckout(&_pool)),
+                                              method,
+                                              controller,
+                                              request,
+                                              response,
+                                              done
+                                             ));
+
+    MethodHandler::execute(h);
+}
+
+} // namespace bicker
diff --git a/kvstore/protobufrpc.h b/kvstore/protobufrpc.h
new file mode 100644 (file)
index 0000000..221903d
--- /dev/null
@@ -0,0 +1,173 @@
+#ifndef __PROTOBUFRPC_H__
+#define __PROTOBUFRPC_H__
+
+#include <stdint.h>
+#include <boost/shared_ptr.hpp>
+#include <boost/enable_shared_from_this.hpp>
+#include <boost/thread.hpp>
+#include <boost/function.hpp>
+#include <queue>
+#include <set>
+#include "socket_pool.h"
+#include "util.h"
+
+#include <google/protobuf/stubs/common.h>
+#include <google/protobuf/generated_message_reflection.h>
+#include <google/protobuf/repeated_field.h>
+#include <google/protobuf/extension_set.h>
+#include <google/protobuf/service.h>
+
+using namespace std;
+using namespace google::protobuf;
+using namespace boost;
+
+namespace bicker
+{
+
+class ProtoBufRpcService;
+
+class ProtoBufRpcConnection 
+    : public enable_shared_from_this<ProtoBufRpcConnection>,
+      private boost::noncopyable
+{
+public:
+    explicit ProtoBufRpcConnection(asio::io_service& io_service,
+                                 Service *_service);
+
+    asio::ip::tcp::socket& socket();
+
+    void start();
+
+    void writeResponse(Message *msg);
+
+
+private:
+    void handle_read(const error_code& e, 
+                     std::size_t bytes_transferred);
+
+    void handle_write(const error_code& e,
+                     std::size_t bytes_transferred);
+
+    tcp::socket _socket;
+
+
+    asio::io_service::strand _strand;
+
+    Service *_service;
+
+    asio::streambuf _buffer;
+
+    int _id;
+    unsigned int _len;
+
+    enum {
+        STATE_NONE,
+        STATE_HAVE_ID_AND_LEN,
+        STATE_WAITING_FOR_DATA,
+        STATE_FAIL,
+    } _state;
+};
+
+
+class ProtoBufRpcServer
+{
+public:
+    ProtoBufRpcServer();
+
+    bool registerService(uint16_t port,
+                         shared_ptr< Service> service);
+
+    // So we can call this as a thread.
+    void run();
+
+    // So we can stop..
+    void shutdown();
+
+protected:
+
+    class RegisteredService
+    {
+    public:
+        RegisteredService(
+                          shared_ptr<asio::io_service> io_service,
+                          uint16_t port,
+                          shared_ptr<Service> service
+                         );
+
+        void handle_accept(const error_code& e);
+
+    protected:
+        // Ref to parent's
+        shared_ptr<asio::io_service> _io_service;
+        uint16_t _port;
+        shared_ptr<Service> _service;
+        tcp::endpoint _endpoint;
+        tcp::acceptor _acceptor;
+        shared_ptr<ProtoBufRpcConnection> _new_connection;
+    };
+
+    list<shared_ptr<RegisteredService> > _services;
+    shared_ptr<asio::io_service> _io_service;
+};
+
+class ProtoBufRpcController : public RpcController
+{
+public:
+    ProtoBufRpcController();
+    virtual ~ProtoBufRpcController();
+
+    virtual void Reset();
+    virtual bool Failed() const;
+    virtual string ErrorText() const;
+    virtual void StartCancel();
+
+    virtual void SetFailed(const string &reason);
+    virtual bool IsCanceled() const;
+    virtual void NotifyOnCancel(Closure *callback);
+};
+
+class ProtoBufRpcChannel
+    : public RpcChannel,
+      public enable_shared_from_this<ProtoBufRpcChannel>,
+      private boost::noncopyable
+{
+public:
+    ProtoBufRpcChannel(const string &remotehost, const string &port);
+
+    virtual ~ProtoBufRpcChannel();
+
+    virtual void CallMethod(
+        const MethodDescriptor * method,
+        RpcController * controller,
+        const Message * request,
+        Message * response,
+        Closure * done);
+
+protected:
+    shared_ptr<tcp::socket> getSocket();
+    void putSocket(shared_ptr<tcp::socket>);
+
+private:
+    class MethodHandler;
+
+    string _remote_host;
+    string _port;
+
+
+    asio::io_service _io_service;
+    tcp::resolver _resolver;
+    // This exists to keep the io service running
+    tcp::acceptor _acceptor;
+
+    SocketPool _pool;
+
+    asio::ip::tcp::socket _lame_socket;
+    void lame_handle_accept(const error_code &err);
+
+    shared_ptr<boost::thread> _thread;
+
+};
+
+} // namespace bicker
+
+#endif
diff --git a/kvstore/protobufrpc/SConscript b/kvstore/protobufrpc/SConscript
deleted file mode 100644 (file)
index b972f84..0000000
+++ /dev/null
@@ -1,7 +0,0 @@
-Import('env')
-
-base_files = ['protobufrpc.cc', 'socket_pool.cc', 'workqueue.cc']
-
-protobufrpc = env.StaticLibrary('protobufrpc', base_files)
-
-Return('protobufrpc')
diff --git a/kvstore/protobufrpc/SConstruct b/kvstore/protobufrpc/SConstruct
deleted file mode 100644 (file)
index 79fbb68..0000000
+++ /dev/null
@@ -1,29 +0,0 @@
-import os
-
-AddOption('--prefix',
-          dest='prefix',
-          type='string',
-          nargs=1,
-          action='store',
-          metavar='DIR',
-          help='installation prefix')
-
-env = Environment(CXXFLAGS='-g -fPIC', PREFIX=GetOption('prefix'))
-
-for envvar in ('HOME', 'DISTCC_DIR', 'DISTCC_HOSTS', 'CCACHE_DIR',
-               'INTERCEPTOR_SOCKET', 'ENFORGE_DIGEST_CACHE',
-               'ENFORGE_CACHE_HOST', 'ENFORGE_CACHE_PORT'):
-  if envvar in os.environ:
-    env['ENV'][envvar] = os.environ[envvar]
-
-if env['PREFIX'] is not None:
-    bin_dest = env['PREFIX'] + '/usr/bin'
-else:
-    bin_dest = '/usr/bin'
-
-base_files = ['protobufrpc.cc', 'socket_pool.cc']
-
-if not env.has_key('LIBS'):
-    env['LIBS'] = []
-
-SConscript(['SConscript'])
diff --git a/kvstore/protobufrpc/protobufrpc.cc b/kvstore/protobufrpc/protobufrpc.cc
deleted file mode 100644 (file)
index 06226f7..0000000
+++ /dev/null
@@ -1,628 +0,0 @@
-#include "protobufrpc.h"
-
-#include <google/protobuf/descriptor.h>
-#include <google/protobuf/io/zero_copy_stream_impl.h>
-#include <google/protobuf/io/coded_stream.h>
-#include <boost/shared_ptr.hpp>
-#include <boost/bind.hpp>
-
-#include <boost/functional/hash.hpp>
-
-using namespace std;
-
-using namespace boost;
-using asio::buffer;
-
-namespace bicker
-{
-
-template<typename T>
-static void* void_write(void* data, T val)
-{
-    *((T*)data) = val;
-    return (char*)data + sizeof(T);
-}
-
-class ProtoBufRpcServiceRequest
-{
-public:
-    ProtoBufRpcServiceRequest(
-                            RpcController *ctrl,
-                            const MethodDescriptor* method,
-                            Message *request,
-                            Message *response,
-                            shared_ptr<ProtoBufRpcConnection> conn
-                           )
-        :_ctrl(ctrl),
-         _method(method),
-         _request(request),
-         _response(response),
-         _conn(conn)
-    {
-    }
-
-    ~ProtoBufRpcServiceRequest()
-    {
-
-    }
-
-    static void run(ProtoBufRpcServiceRequest *req)
-    {
-
-        req->_conn->writeResponse(req->_response.get());
-
-        delete req;
-    }
-
-    shared_ptr<RpcController> _ctrl;
-    const MethodDescriptor *_method;
-    shared_ptr<Message> _request;
-    shared_ptr<Message> _response;
-    shared_ptr<ProtoBufRpcConnection> _conn;
-};
-
-ProtoBufRpcConnection::ProtoBufRpcConnection(asio::io_service& io_service,
-                             Service *service)
-:_socket(io_service),
- _strand(io_service),
- _service(service),
- _state(STATE_NONE)
-{
-}
-
-tcp::socket& ProtoBufRpcConnection::socket()
-{
-    return _socket;
-}
-
-void ProtoBufRpcConnection::start()
-{
-    _socket.async_read_some(_buffer.prepare(4096),
-            _strand.wrap(
-                boost::bind(&ProtoBufRpcConnection::handle_read, shared_from_this(),
-                            asio::placeholders::error,
-                            asio::placeholders::bytes_transferred)));
-}
-
-void ProtoBufRpcConnection::writeResponse(Message *msg)
-{
-    int rlen = msg->ByteSize();
-    int len = htonl(rlen);
-    int mlen = sizeof(len) + rlen;
-
-    void * data = asio::buffer_cast<void*>(_buffer.prepare(mlen));
-
-    data = void_write(data, len);
-
-    using google::protobuf::io::ArrayOutputStream;
-
-    ArrayOutputStream as(data, rlen);
-
-    msg->SerializeToZeroCopyStream(&as);
-
-    _buffer.commit(mlen);
-
-    asio::async_write(_socket,
-            _buffer.data(),
-            _strand.wrap(
-                boost::bind(&ProtoBufRpcConnection::handle_write, 
-                            shared_from_this(),
-                asio::placeholders::error,
-                asio::placeholders::bytes_transferred)));
-}
-
-
-void ProtoBufRpcConnection::handle_read(const error_code& e, 
-                 std::size_t bytes_transferred)
-{
-    if (!e)
-    {
-        _buffer.commit(bytes_transferred);
-
-        if (_state == STATE_NONE)
-        {
-            if (_buffer.size() >= sizeof(_id) + sizeof(_len))
-            {
-                string b(
-                     buffers_begin(_buffer.data()),
-                     buffers_begin(_buffer.data())
-                                                + sizeof(_id) + sizeof(_len)
-                        );
-
-                _buffer.consume(sizeof(_id) + sizeof(_len));
-
-                _id = *((int*)b.c_str());
-                _id = ntohl(_id);
-
-                _len = *((unsigned int*)(b.c_str() + sizeof(_id)));
-                _len = ntohl(_len);
-
-                _state = STATE_HAVE_ID_AND_LEN;
-            }
-            else
-            {
-                start();
-            }
-        }
-
-        if (_state == STATE_HAVE_ID_AND_LEN || _state == STATE_WAITING_FOR_DATA)
-        {
-            if (_buffer.size() >= _len)
-            {
-                const MethodDescriptor* method =
-                    _service->GetDescriptor()->method(_id);
-
-                Message *req = _service->GetRequestPrototype(method).New();
-                Message *resp = _service->GetResponsePrototype(method).New();
-
-                using google::protobuf::io::ArrayInputStream;
-                using google::protobuf::io::CodedInputStream;
-
-                const void* data = asio::buffer_cast<const void*>(
-                                                        _buffer.data()
-                                                                 );
-                ArrayInputStream as(data, _len);
-                CodedInputStream is(&as);
-                is.SetTotalBytesLimit(512 * 1024 * 1024, -1);
-
-                if (!req->ParseFromCodedStream(&is))
-                {
-                    throw std::runtime_error("ParseFromCodedStream");
-                }
-
-                _buffer.consume(_len);
-
-                ProtoBufRpcController *ctrl = new ProtoBufRpcController();
-                _service->CallMethod(method, 
-                                     ctrl,
-                                     req, 
-                                     resp, 
-                                     NewCallback(
-                                             &ProtoBufRpcServiceRequest::run,
-                                             new ProtoBufRpcServiceRequest(
-                                                           ctrl,
-                                                           method,
-                                                           req,
-                                                           resp,
-                                                           shared_from_this())
-                                                )
-                                     );
-                _state = STATE_NONE;
-            }
-            else
-            {
-                _state = STATE_WAITING_FOR_DATA;
-                start();
-            }
-        }
-
-    }
-    else
-    {
-        error_code ignored_ec;
-        _socket.shutdown(tcp::socket::shutdown_both, ignored_ec);
-    }
-}
-
-void ProtoBufRpcConnection::handle_write(const error_code& e,
-                                         std::size_t bytes_transferred)
-{
-    if (e)
-    {
-        error_code ignored_ec;
-        _socket.shutdown(tcp::socket::shutdown_both, ignored_ec);
-    }
-    else
-    {
-        _buffer.consume(bytes_transferred);
-
-        if (_buffer.size())
-        {
-            asio::async_write(_socket,
-                    _buffer.data(),
-                    _strand.wrap(
-                        boost::bind(&ProtoBufRpcConnection::handle_write, 
-                                    shared_from_this(),
-                        asio::placeholders::error,
-                        asio::placeholders::bytes_transferred)));
-            return;
-        }
-
-        _state = STATE_NONE;
-        start();
-    }
-}
-
-ProtoBufRpcServer::ProtoBufRpcServer()
-    :_io_service(new asio::io_service())
-{
-}
-
-bool ProtoBufRpcServer::registerService(uint16_t port,
-                                shared_ptr<Service> service)
-{
-    // This is not thread safe
-
-    // The RegisteredService Constructor fires up the appropriate
-    // async accepts for the service
-    _services.push_back(shared_ptr<RegisteredService>(
-                                        new RegisteredService(
-                                                    _io_service,
-                                                    port,
-                                                    service)));
-
-    return true;
-}
-
-void run_wrapper(asio::io_service *io_service)
-{
-    struct itimerval itimer; 
-    setitimer(ITIMER_PROF, &itimer, NULL);
-
-    io_service->run();
-}
-
-void ProtoBufRpcServer::run()
-{
-    try
-    {
-        if (_services.size() == 0)
-        {
-            throw std::runtime_error("No services registered for ProtoBufRpcServer");
-        }
-
-        size_t nprocs = sysconf(_SC_NPROCESSORS_ONLN);
-
-        vector<shared_ptr<thread> > threads;
-        for (size_t i = 0; i < nprocs; ++i)
-        {
-            shared_ptr<thread> t(new thread(
-                                    boost::bind(
-                                        //&run_wrapper,
-                                        &asio::io_service::run, 
-                                        _io_service.get())));
-            threads.push_back(t);
-        }
-
-        for (size_t i = 0; i < threads.size(); ++i)
-        {
-            threads[i]->join();
-        }
-    }
-    catch (std::exception &e)
-    {
-        std::cerr << "ProtoBufRpcService" << e.what() << std::endl;
-    }
-}
-
-void ProtoBufRpcServer::shutdown()
-{
-    _io_service->stop();
-}
-
-ProtoBufRpcServer::RegisteredService::RegisteredService(
-                  shared_ptr<asio::io_service> io_service,
-                  uint16_t port,
-                  shared_ptr<Service> service
-                 )
-:_io_service(io_service),
- _port(port),
- _service(service),
- _endpoint(tcp::v4(), _port),
- _acceptor(*_io_service),
- _new_connection(new ProtoBufRpcConnection(*_io_service, _service.get()))
-{
-    _acceptor.open(_endpoint.protocol());
-    _acceptor.set_option(tcp::acceptor::reuse_address(true));
-    _acceptor.bind(_endpoint);
-    _acceptor.listen();
-    _acceptor.async_accept(_new_connection->socket(),
-                   boost::bind(&ProtoBufRpcServer::RegisteredService::handle_accept, 
-                               this, 
-                               asio::placeholders::error));
-}
-
-void ProtoBufRpcServer::RegisteredService::handle_accept(const error_code& e)
-{
-      if (!e)
-      {
-          _new_connection->start();
-          _new_connection.reset(new ProtoBufRpcConnection(*_io_service, _service.get()));
-          _acceptor.async_accept(_new_connection->socket(),
-                 boost::bind(&ProtoBufRpcServer::RegisteredService::handle_accept,
-                             this,
-                             asio::placeholders::error));
-      }
-
-}
-
-ProtoBufRpcController::ProtoBufRpcController()
-{
-}
-
-ProtoBufRpcController::~ProtoBufRpcController()
-{
-}
-
-void ProtoBufRpcController::Reset()
-{
-}
-
-bool ProtoBufRpcController::Failed() const
-{
-    return false;
-}
-
-string ProtoBufRpcController::ErrorText() const
-{
-    return "No Error";
-}
-
-void ProtoBufRpcController::StartCancel()
-{
-}
-
-void ProtoBufRpcController::SetFailed(const string &/*reason*/)
-{
-}
-
-bool ProtoBufRpcController::IsCanceled() const
-{
-    return false;
-}
-
-void ProtoBufRpcController::NotifyOnCancel(Closure * /*callback*/)
-{
-}
-
-class ProtoBufRpcChannel::MethodHandler 
-    : public enable_shared_from_this<MethodHandler>,
-      private boost::noncopyable
-{
-public:
-    MethodHandler(auto_ptr<SocketCheckout> socket,
-                           const MethodDescriptor * method,
-                           RpcController * controller,
-                           const Message * request,
-                           Message * response,
-                           Closure * done
-                          )
-        :_socket(socket),
-        _method(method),
-        _controller(controller),
-        _request(request),
-        _response(response),
-        _done(done)
-    {
-    }
-
-    ~MethodHandler()
-    {
-        _socket.reset();
-        _done->Run();
-    }
-
-    static void execute(shared_ptr<MethodHandler> this_ptr)
-    {
-        int index = htonl(this_ptr->_method->index());
-        int rlen = this_ptr->_request->ByteSize();
-        int len = htonl(rlen);
-
-        int mlen = sizeof(index) + sizeof(len) + rlen;
-
-        void * data = asio::buffer_cast<void*>(this_ptr->_buffer.prepare(mlen));
-
-        data = void_write(data, index);
-        data = void_write(data, len);
-
-        using google::protobuf::io::ArrayOutputStream;
-
-        ArrayOutputStream as(data, rlen);
-
-        this_ptr->_request->SerializeToZeroCopyStream(&as);
-        this_ptr->_buffer.commit(mlen);
-
-        (*(this_ptr->_socket))->async_send(this_ptr->_buffer.data(),
-                               boost::bind(&ProtoBufRpcChannel::MethodHandler::handle_write,
-                                           this_ptr,
-                                           asio::placeholders::error,
-                                           asio::placeholders::bytes_transferred));
-    }
-
-    static void handle_write(shared_ptr<MethodHandler> this_ptr,
-                      const error_code& e, 
-                      std::size_t bytes_transferred)
-    {
-        if (!e)
-        {
-            this_ptr->_buffer.consume(bytes_transferred);
-
-            if (this_ptr->_buffer.size())
-            {
-                (*(this_ptr->_socket))->async_send(this_ptr->_buffer.data(),
-                                       boost::bind(&ProtoBufRpcChannel::MethodHandler::handle_write,
-                                                   this_ptr,
-                                                   asio::placeholders::error,
-                                                   asio::placeholders::bytes_transferred));
-                return;
-            }
-
-            (*(this_ptr->_socket))->async_receive(
-                                      buffer(&this_ptr->_len, sizeof(this_ptr->_len)),
-                                      boost::bind(
-                                                  &ProtoBufRpcChannel::MethodHandler::handle_read_len,
-                                                  this_ptr,
-                                                  asio::placeholders::error,
-                                                  asio::placeholders::bytes_transferred)
-                                     );
-        }
-        else
-        {
-            this_ptr->_controller->SetFailed(e.message());
-            (*(this_ptr->_socket))->close();
-        }
-    }
-
-    static void handle_read_len(shared_ptr<MethodHandler> this_ptr,
-                                const error_code& e,
-                                std::size_t bytes_transferred)
-    {
-        if (!e && bytes_transferred == sizeof(this_ptr->_len))
-        {
-            this_ptr->_len = ntohl(this_ptr->_len);
-            (*(this_ptr->_socket))->async_receive(
-                                      this_ptr->_buffer.prepare(this_ptr->_len),
-                                      boost::bind(
-                                                  &ProtoBufRpcChannel::MethodHandler::handle_read_response,
-                                                  this_ptr,
-                                                  asio::placeholders::error,
-                                                  asio::placeholders::bytes_transferred
-                                                 )
-                                     );
-        }
-        else
-        {
-            this_ptr->_controller->SetFailed(e.message());
-            (*(this_ptr->_socket))->close();
-        }
-    }
-
-    static void handle_read_response(shared_ptr<MethodHandler> this_ptr,
-                              const error_code& e, 
-                              std::size_t bytes_transferred)
-    {
-        if (!e)
-        {
-            this_ptr->_buffer.commit(bytes_transferred);
-            if (this_ptr->_buffer.size() >= this_ptr->_len)
-            {
-                using google::protobuf::io::ArrayInputStream;
-                using google::protobuf::io::CodedInputStream;
-
-                const void* data = asio::buffer_cast<const void*>(
-                                                        this_ptr->_buffer.data()
-                                                                 );
-                ArrayInputStream as(data, this_ptr->_len);
-                CodedInputStream is(&as);
-                is.SetTotalBytesLimit(512 * 1024 * 1024, -1);
-
-                if (!this_ptr->_response->ParseFromCodedStream(&is))
-                {
-                    throw std::runtime_error("ParseFromCodedStream");
-                }
-
-                this_ptr->_buffer.consume(this_ptr->_len);
-            }
-            else
-            {
-                (*(this_ptr->_socket))->async_receive(
-                                          this_ptr->_buffer.prepare(this_ptr->_len - this_ptr->_buffer.size()),
-                                          boost::bind(
-                                                      &ProtoBufRpcChannel::MethodHandler::handle_read_response,
-                                                      this_ptr,
-                                                      asio::placeholders::error,
-                                                      asio::placeholders::bytes_transferred
-                                                     )
-                                         );
-                return;
-            }
-        }
-        else
-        {
-            this_ptr->_controller->SetFailed(e.message());
-            (*(this_ptr->_socket))->close();
-        }
-    }
-
-private:
-    auto_ptr<SocketCheckout> _socket;
-    const MethodDescriptor * _method;
-    RpcController * _controller;
-    const Message * _request;
-    Message * _response;
-    Closure * _done;
-    asio::streambuf _buffer;
-    unsigned int _len;
-    bool _status;
-    unsigned int _sent;
-};
-
-
-ProtoBufRpcChannel::ProtoBufRpcChannel(const string &remotehost, 
-                                  const string &port)
-    :_remote_host(remotehost), _port(port),
-     _resolver(_io_service),
-     _acceptor(_io_service),
-     _pool(2000, _io_service),
-     _lame_socket(_io_service),
-     _thread()
-//                                &asio::io_service::run, 
-//                                &_io_service)))
-{
-
-    tcp::resolver::query query(_remote_host, _port);
-    tcp::resolver::iterator endpoint_iterator = _resolver.resolve(query);
-    tcp::resolver::iterator end;
-
-    error_code error = asio::error::host_not_found;
-
-    if (endpoint_iterator == end) throw syserr::system_error(error);
-
-    _pool.setEndpoint(*endpoint_iterator);
-
-    tcp::endpoint e(tcp::v4(), 0);
-    _acceptor.open(e.protocol());
-    _acceptor.set_option(tcp::acceptor::reuse_address(true));
-    _acceptor.bind(e);
-    _acceptor.listen();
-    _acceptor.async_accept(_lame_socket,
-                       boost::bind(&ProtoBufRpcChannel::lame_handle_accept, this, 
-                                   asio::placeholders::error));
-
-    _thread = shared_ptr<thread>(new thread(
-                                     boost::bind(
-                                             &asio::io_service::run, 
-                                             &_io_service)));
-}
-
-void ProtoBufRpcChannel::lame_handle_accept(const error_code &err)
-{
-    if (!err)
-    {
-        _acceptor.async_accept(_lame_socket,
-                           boost::bind(&ProtoBufRpcChannel::lame_handle_accept,
-                                       this,
-                                       asio::placeholders::error));
-    }
-}
-
-ProtoBufRpcChannel::~ProtoBufRpcChannel()
-{
-    _pool.cancelAndClear();
-
-    _io_service.stop();
-
-    _thread->join();
-}
-
-void ProtoBufRpcChannel::CallMethod(
-        const MethodDescriptor * method,
-        RpcController * controller,
-        const Message * request,
-        Message * response,
-        Closure * done)
-{
-    shared_ptr<MethodHandler> h(
-                            new MethodHandler(
-                          auto_ptr<SocketCheckout>(new SocketCheckout(&_pool)),
-                                              method,
-                                              controller,
-                                              request,
-                                              response,
-                                              done
-                                             ));
-
-    MethodHandler::execute(h);
-}
-
-} // namespace bicker
diff --git a/kvstore/protobufrpc/protobufrpc.h b/kvstore/protobufrpc/protobufrpc.h
deleted file mode 100644 (file)
index 221903d..0000000
+++ /dev/null
@@ -1,173 +0,0 @@
-#ifndef __PROTOBUFRPC_H__
-#define __PROTOBUFRPC_H__
-
-#include <stdint.h>
-#include <boost/shared_ptr.hpp>
-#include <boost/enable_shared_from_this.hpp>
-#include <boost/thread.hpp>
-#include <boost/function.hpp>
-#include <queue>
-#include <set>
-#include "socket_pool.h"
-#include "util.h"
-
-#include <google/protobuf/stubs/common.h>
-#include <google/protobuf/generated_message_reflection.h>
-#include <google/protobuf/repeated_field.h>
-#include <google/protobuf/extension_set.h>
-#include <google/protobuf/service.h>
-
-using namespace std;
-using namespace google::protobuf;
-using namespace boost;
-
-namespace bicker
-{
-
-class ProtoBufRpcService;
-
-class ProtoBufRpcConnection 
-    : public enable_shared_from_this<ProtoBufRpcConnection>,
-      private boost::noncopyable
-{
-public:
-    explicit ProtoBufRpcConnection(asio::io_service& io_service,
-                                 Service *_service);
-
-    asio::ip::tcp::socket& socket();
-
-    void start();
-
-    void writeResponse(Message *msg);
-
-
-private:
-    void handle_read(const error_code& e, 
-                     std::size_t bytes_transferred);
-
-    void handle_write(const error_code& e,
-                     std::size_t bytes_transferred);
-
-    tcp::socket _socket;
-
-
-    asio::io_service::strand _strand;
-
-    Service *_service;
-
-    asio::streambuf _buffer;
-
-    int _id;
-    unsigned int _len;
-
-    enum {
-        STATE_NONE,
-        STATE_HAVE_ID_AND_LEN,
-        STATE_WAITING_FOR_DATA,
-        STATE_FAIL,
-    } _state;
-};
-
-
-class ProtoBufRpcServer
-{
-public:
-    ProtoBufRpcServer();
-
-    bool registerService(uint16_t port,
-                         shared_ptr< Service> service);
-
-    // So we can call this as a thread.
-    void run();
-
-    // So we can stop..
-    void shutdown();
-
-protected:
-
-    class RegisteredService
-    {
-    public:
-        RegisteredService(
-                          shared_ptr<asio::io_service> io_service,
-                          uint16_t port,
-                          shared_ptr<Service> service
-                         );
-
-        void handle_accept(const error_code& e);
-
-    protected:
-        // Ref to parent's
-        shared_ptr<asio::io_service> _io_service;
-        uint16_t _port;
-        shared_ptr<Service> _service;
-        tcp::endpoint _endpoint;
-        tcp::acceptor _acceptor;
-        shared_ptr<ProtoBufRpcConnection> _new_connection;
-    };
-
-    list<shared_ptr<RegisteredService> > _services;
-    shared_ptr<asio::io_service> _io_service;
-};
-
-class ProtoBufRpcController : public RpcController
-{
-public:
-    ProtoBufRpcController();
-    virtual ~ProtoBufRpcController();
-
-    virtual void Reset();
-    virtual bool Failed() const;
-    virtual string ErrorText() const;
-    virtual void StartCancel();
-
-    virtual void SetFailed(const string &reason);
-    virtual bool IsCanceled() const;
-    virtual void NotifyOnCancel(Closure *callback);
-};
-
-class ProtoBufRpcChannel
-    : public RpcChannel,
-      public enable_shared_from_this<ProtoBufRpcChannel>,
-      private boost::noncopyable
-{
-public:
-    ProtoBufRpcChannel(const string &remotehost, const string &port);
-
-    virtual ~ProtoBufRpcChannel();
-
-    virtual void CallMethod(
-        const MethodDescriptor * method,
-        RpcController * controller,
-        const Message * request,
-        Message * response,
-        Closure * done);
-
-protected:
-    shared_ptr<tcp::socket> getSocket();
-    void putSocket(shared_ptr<tcp::socket>);
-
-private:
-    class MethodHandler;
-
-    string _remote_host;
-    string _port;
-
-
-    asio::io_service _io_service;
-    tcp::resolver _resolver;
-    // This exists to keep the io service running
-    tcp::acceptor _acceptor;
-
-    SocketPool _pool;
-
-    asio::ip::tcp::socket _lame_socket;
-    void lame_handle_accept(const error_code &err);
-
-    shared_ptr<boost::thread> _thread;
-
-};
-
-} // namespace bicker
-
-#endif
diff --git a/kvstore/protobufrpc/socket_pool.cc b/kvstore/protobufrpc/socket_pool.cc
deleted file mode 100644 (file)
index 454da55..0000000
+++ /dev/null
@@ -1,101 +0,0 @@
-#include "socket_pool.h"
-
-SocketPool::SocketPool(int max_sockets,
-               asio::io_service &io_svc
-              )
-:_issued(0),
- _max_sockets(max_sockets),
- _io_service(io_svc)
-{
-}
-
-void SocketPool::setEndpoint(const tcp::endpoint &endpoint)
-{
-    _endpoint = endpoint;
-}
-
-void SocketPool::cancelAndClear()
-{
-    for (set<shared_ptr<tcp::socket> >::iterator i = _set.begin();
-         i != _set.end();
-         ++i)
-    {
-        (*i)->cancel();
-    }
-
-    while (!_queue.empty()) _queue.pop();
-    _set.clear();
-}
-
-shared_ptr<tcp::socket> SocketPool::getSocket()
-{
-    mutex::scoped_lock lock(_sockets_lock);
-
-    while (_queue.size() == 0 && _issued >= _max_sockets)
-        _sockets_non_empty.wait(lock);
-
-    if (_queue.size())
-    {
-            shared_ptr<tcp::socket> socket = _queue.front();
-            _queue.pop();
-
-            return socket;
-    }
-    else
-    {
-        ++_issued;
-        error_code error = asio::error::host_not_found;
-
-        shared_ptr<tcp::socket> socket(new tcp::socket(_io_service));
-        socket->connect(_endpoint, error);
-
-        if (error) throw syserr::system_error(error);
-
-        _set.insert(socket);
-
-        return socket;
-    }
-}
-
-void SocketPool::putSocket(shared_ptr<tcp::socket> socket)
-{
-    mutex::scoped_lock lock(_sockets_lock);
-
-    if (!socket->is_open())
-    {
-        cerr << "socket closed\n";
-        --_issued;
-        _set.erase(socket);
-    }
-    else 
-    {
-        _queue.push(socket);
-    }
-
-    _sockets_non_empty.notify_one();
-}
-
-SocketCheckout::SocketCheckout(SocketPool *pool)
-    :_socket(pool->getSocket()),
-     _pool(pool)
-{
-}
-
-SocketCheckout::~SocketCheckout()
-{
-    _pool->putSocket(_socket);
-}
-tcp::socket& SocketCheckout::operator*()
-{
-    return *_socket;
-}
-
-tcp::socket* SocketCheckout::operator->()
-{
-    return _socket.get();
-}
-
-shared_ptr<tcp::socket>& SocketCheckout::socket()
-{
-    return _socket;
-}
diff --git a/kvstore/protobufrpc/socket_pool.h b/kvstore/protobufrpc/socket_pool.h
deleted file mode 100644 (file)
index 57faf42..0000000
+++ /dev/null
@@ -1,52 +0,0 @@
-#ifndef _SOCKET_POOL_H_
-#define _SOCKET_POOL_H_ 1
-
-#include <iostream>
-#include <set>
-#include <queue>
-#include <boost/shared_ptr.hpp>
-#include <boost/thread.hpp>
-
-#include "util.h"
-
-using namespace std;
-using namespace boost;
-
-
-class SocketPool
-{
-public:
-    SocketPool(int max_streams,
-               asio::io_service &io_svc);
-    void setEndpoint(const tcp::endpoint &endpoint);
-    void cancelAndClear();
-    shared_ptr<tcp::socket> getSocket();
-    void putSocket(shared_ptr<tcp::socket> socket);
-private:
-    int _issued;
-    int _max_sockets;
-    mutex _sockets_lock;
-    condition_variable _sockets_non_empty;
-    asio::io_service &_io_service;
-    tcp::endpoint _endpoint;
-    queue<shared_ptr<tcp::socket> > _queue;
-    set<shared_ptr<tcp::socket> > _set;
-};
-
-class SocketCheckout
-{
-public:
-    SocketCheckout(SocketPool *pool);
-    ~SocketCheckout();
-
-    tcp::socket& operator*();
-    tcp::socket* operator->();
-
-    shared_ptr<tcp::socket>& socket();
-
-private:
-    shared_ptr<tcp::socket> _socket;
-    SocketPool *_pool;
-};
-
-#endif
diff --git a/kvstore/protobufrpc/util.h b/kvstore/protobufrpc/util.h
deleted file mode 100644 (file)
index 966921f..0000000
+++ /dev/null
@@ -1,37 +0,0 @@
-#ifndef _UTIL_H_ 
-#define _UTIL_H_ 1
-
-#include <boost/version.hpp>
-
-#if BOOST_VERSION <= 103500
-#include <boost/thread.hpp>
-#include <boost/thread/mutex.hpp>
-#include <boost/bind.hpp>
-#include <asio.hpp>
-#include <asio/buffer.hpp>
-//typedef boost::condition condition_variable ;
-//typedef boost::detail::thread::scoped_lock<boost::mutex> scoped_lock;
-using asio::ip::tcp;
-using asio::error_code;
-using asio::buffers_begin;
-namespace syserr=asio;
-#else
-#if BOOST_VERSION < 104000
-#include <boost/asio.hpp>
-#include <boost/thread/mutex.hpp>
-#include <boost/bind.hpp>
-using boost::asio::ip::tcp;
-using boost::system::error_code;
-using boost::system::system_error;
-namespace syserr=boost::system;
-#else
-#include <boost/asio.hpp>
-#include <boost/thread/mutex.hpp>
-using boost::asio::ip::tcp;
-using boost::system::error_code;
-using boost::system::system_error;
-namespace syserr=boost::system;
-#endif
-#endif
-
-#endif
diff --git a/kvstore/protobufrpc/workqueue.cc b/kvstore/protobufrpc/workqueue.cc
deleted file mode 100644 (file)
index 27861aa..0000000
+++ /dev/null
@@ -1,125 +0,0 @@
-#include "workqueue.h"
-#include <boost/thread.hpp>
-
-namespace bicker
-{
-
-WorkQueue::WorkQueue()
-    :_thread_count(0), _min_threads(1), _max_threads(50), _running(true)
-{
-    for (int i = 0; i < _min_threads; ++i)
-    {
-        spawnThread();
-    }
-}
-
-WorkQueue::WorkQueue(int thread_count)
-    :_thread_count(0), 
-     _min_threads(1), _max_threads(thread_count), _running(true)
-{
-    for (int i = 0; i < _min_threads; ++i)
-    {
-        spawnThread();
-    }
-}
-
-WorkQueue::~WorkQueue()
-{
-    _running = false;
-
-    {
-        mutex::scoped_lock lock(_queue_lock);
-        _queue_non_empty.notify_all();
-    }
-
-    _threads.join_all();
-}
-
-void WorkQueue::spawnThread()
-{
-    ++_thread_count;
-    _threads.create_thread(Worker(this));
-}
-
-shared_ptr<WorkUnit> WorkQueue::get()
-{
-    mutex::scoped_lock lock(_queue_lock);
-
-    while (_queue.size() == 0)
-    {
-        _queue_non_empty.wait(lock);
-        if (!_running) throw interrupted_error();
-    }
-
-    shared_ptr<WorkUnit> back = _queue.front();
-    _queue.pop();
-
-    if (_queue.size() > 0 && _thread_count < _max_threads) spawnThread();
-
-    return back;
-}
-
-void WorkQueue::put(shared_ptr<WorkUnit> work_unit)
-{
-    mutex::scoped_lock lock(_queue_lock);
-
-    _queue.push(work_unit);
-
-    _queue_non_empty.notify_one();
-}
-
-WorkQueue::Worker::Worker(WorkQueue* queue)
-    :_queue(queue)
-{
-}
-
-void WorkQueue::Worker::operator()()
-{
-    while (true)
-    {
-        try
-        {
-            shared_ptr<WorkUnit> unit = _queue->get();
-
-            unit->run();
-        }
-        catch (interrupted_error)
-        {
-            return;
-        }
-    }
-}
-
-TaskNotification::TaskNotification()
-:_expected(0), _count(0), _fail_count(0)
-{
-}
-
-void TaskNotification::registerTask()
-{
-    mutex::scoped_lock lock(_lock);
-    ++_expected;
-}
-
-void TaskNotification::completeTask(bool success)
-{
-    mutex::scoped_lock lock(_lock);
-    if (!success) ++_fail_count;
-    if (++_count == _expected) _cond.notify_all();
-}
-
-void TaskNotification::waitForComplete()
-{
-    mutex::scoped_lock lock(_lock);
-    while (_count < _expected)
-    {
-        _cond.wait(lock);
-    }
-}
-
-bool TaskNotification::failCount()
-{
-    return _fail_count;
-}
-
-} // namespace bicker
diff --git a/kvstore/protobufrpc/workqueue.h b/kvstore/protobufrpc/workqueue.h
deleted file mode 100644 (file)
index cfb2e02..0000000
+++ /dev/null
@@ -1,78 +0,0 @@
-#ifndef __WORKQUEUE_H__
-#define __WORKQUEUE_H__ 1
-
-#include <boost/thread.hpp>
-#include <boost/shared_ptr.hpp>
-#include <queue>
-#include "util.h"
-
-using namespace boost;
-using namespace std;
-
-namespace bicker
-{
-    struct interrupted_error : public virtual std::exception { };
-
-    class WorkUnit
-    {
-    public:
-        virtual ~WorkUnit() {};
-        virtual void run() = 0;
-    };
-
-    class WorkQueue
-    {
-    public:
-        WorkQueue();
-        WorkQueue(int thread_count);
-
-        ~WorkQueue();
-
-        shared_ptr<WorkUnit> get();
-        void put(shared_ptr<WorkUnit> work_unit);
-
-    protected:
-        void spawnThread();
-
-    private:
-        class Worker
-        {
-        public:
-            Worker(WorkQueue* queue);
-            void operator()();
-        private:
-            WorkQueue *_queue;
-        };
-
-        int _thread_count;
-        int _min_threads;
-        int _max_threads;
-        mutex _queue_lock;
-        condition_variable _queue_non_empty;
-        queue<shared_ptr<WorkUnit> > _queue;
-        thread_group _threads;
-        volatile bool _running;
-    };
-
-    class TaskNotification
-    {
-    public:
-        TaskNotification();
-
-        void registerTask();
-        void completeTask(bool success = true);
-
-        void waitForComplete();
-
-        bool failCount();
-    private:
-        int                _expected;
-        int                _count;
-        int                _fail_count;
-        mutex              _lock;
-        condition_variable _cond;
-    };
-
-} // namespace bicker
-
-#endif
diff --git a/kvstore/socket_pool.cc b/kvstore/socket_pool.cc
new file mode 100644 (file)
index 0000000..454da55
--- /dev/null
@@ -0,0 +1,101 @@
+#include "socket_pool.h"
+
+SocketPool::SocketPool(int max_sockets,
+               asio::io_service &io_svc
+              )
+:_issued(0),
+ _max_sockets(max_sockets),
+ _io_service(io_svc)
+{
+}
+
+void SocketPool::setEndpoint(const tcp::endpoint &endpoint)
+{
+    _endpoint = endpoint;
+}
+
+void SocketPool::cancelAndClear()
+{
+    for (set<shared_ptr<tcp::socket> >::iterator i = _set.begin();
+         i != _set.end();
+         ++i)
+    {
+        (*i)->cancel();
+    }
+
+    while (!_queue.empty()) _queue.pop();
+    _set.clear();
+}
+
+shared_ptr<tcp::socket> SocketPool::getSocket()
+{
+    mutex::scoped_lock lock(_sockets_lock);
+
+    while (_queue.size() == 0 && _issued >= _max_sockets)
+        _sockets_non_empty.wait(lock);
+
+    if (_queue.size())
+    {
+            shared_ptr<tcp::socket> socket = _queue.front();
+            _queue.pop();
+
+            return socket;
+    }
+    else
+    {
+        ++_issued;
+        error_code error = asio::error::host_not_found;
+
+        shared_ptr<tcp::socket> socket(new tcp::socket(_io_service));
+        socket->connect(_endpoint, error);
+
+        if (error) throw syserr::system_error(error);
+
+        _set.insert(socket);
+
+        return socket;
+    }
+}
+
+void SocketPool::putSocket(shared_ptr<tcp::socket> socket)
+{
+    mutex::scoped_lock lock(_sockets_lock);
+
+    if (!socket->is_open())
+    {
+        cerr << "socket closed\n";
+        --_issued;
+        _set.erase(socket);
+    }
+    else 
+    {
+        _queue.push(socket);
+    }
+
+    _sockets_non_empty.notify_one();
+}
+
+SocketCheckout::SocketCheckout(SocketPool *pool)
+    :_socket(pool->getSocket()),
+     _pool(pool)
+{
+}
+
+SocketCheckout::~SocketCheckout()
+{
+    _pool->putSocket(_socket);
+}
+tcp::socket& SocketCheckout::operator*()
+{
+    return *_socket;
+}
+
+tcp::socket* SocketCheckout::operator->()
+{
+    return _socket.get();
+}
+
+shared_ptr<tcp::socket>& SocketCheckout::socket()
+{
+    return _socket;
+}
diff --git a/kvstore/socket_pool.h b/kvstore/socket_pool.h
new file mode 100644 (file)
index 0000000..57faf42
--- /dev/null
@@ -0,0 +1,52 @@
+#ifndef _SOCKET_POOL_H_
+#define _SOCKET_POOL_H_ 1
+
+#include <iostream>
+#include <set>
+#include <queue>
+#include <boost/shared_ptr.hpp>
+#include <boost/thread.hpp>
+
+#include "util.h"
+
+using namespace std;
+using namespace boost;
+
+
+class SocketPool
+{
+public:
+    SocketPool(int max_streams,
+               asio::io_service &io_svc);
+    void setEndpoint(const tcp::endpoint &endpoint);
+    void cancelAndClear();
+    shared_ptr<tcp::socket> getSocket();
+    void putSocket(shared_ptr<tcp::socket> socket);
+private:
+    int _issued;
+    int _max_sockets;
+    mutex _sockets_lock;
+    condition_variable _sockets_non_empty;
+    asio::io_service &_io_service;
+    tcp::endpoint _endpoint;
+    queue<shared_ptr<tcp::socket> > _queue;
+    set<shared_ptr<tcp::socket> > _set;
+};
+
+class SocketCheckout
+{
+public:
+    SocketCheckout(SocketPool *pool);
+    ~SocketCheckout();
+
+    tcp::socket& operator*();
+    tcp::socket* operator->();
+
+    shared_ptr<tcp::socket>& socket();
+
+private:
+    shared_ptr<tcp::socket> _socket;
+    SocketPool *_pool;
+};
+
+#endif
diff --git a/kvstore/util.h b/kvstore/util.h
new file mode 100644 (file)
index 0000000..966921f
--- /dev/null
@@ -0,0 +1,37 @@
+#ifndef _UTIL_H_ 
+#define _UTIL_H_ 1
+
+#include <boost/version.hpp>
+
+#if BOOST_VERSION <= 103500
+#include <boost/thread.hpp>
+#include <boost/thread/mutex.hpp>
+#include <boost/bind.hpp>
+#include <asio.hpp>
+#include <asio/buffer.hpp>
+//typedef boost::condition condition_variable ;
+//typedef boost::detail::thread::scoped_lock<boost::mutex> scoped_lock;
+using asio::ip::tcp;
+using asio::error_code;
+using asio::buffers_begin;
+namespace syserr=asio;
+#else
+#if BOOST_VERSION < 104000
+#include <boost/asio.hpp>
+#include <boost/thread/mutex.hpp>
+#include <boost/bind.hpp>
+using boost::asio::ip::tcp;
+using boost::system::error_code;
+using boost::system::system_error;
+namespace syserr=boost::system;
+#else
+#include <boost/asio.hpp>
+#include <boost/thread/mutex.hpp>
+using boost::asio::ip::tcp;
+using boost::system::error_code;
+using boost::system::system_error;
+namespace syserr=boost::system;
+#endif
+#endif
+
+#endif
diff --git a/kvstore/workqueue.cc b/kvstore/workqueue.cc
new file mode 100644 (file)
index 0000000..27861aa
--- /dev/null
@@ -0,0 +1,125 @@
+#include "workqueue.h"
+#include <boost/thread.hpp>
+
+namespace bicker
+{
+
+WorkQueue::WorkQueue()
+    :_thread_count(0), _min_threads(1), _max_threads(50), _running(true)
+{
+    for (int i = 0; i < _min_threads; ++i)
+    {
+        spawnThread();
+    }
+}
+
+WorkQueue::WorkQueue(int thread_count)
+    :_thread_count(0), 
+     _min_threads(1), _max_threads(thread_count), _running(true)
+{
+    for (int i = 0; i < _min_threads; ++i)
+    {
+        spawnThread();
+    }
+}
+
+WorkQueue::~WorkQueue()
+{
+    _running = false;
+
+    {
+        mutex::scoped_lock lock(_queue_lock);
+        _queue_non_empty.notify_all();
+    }
+
+    _threads.join_all();
+}
+
+void WorkQueue::spawnThread()
+{
+    ++_thread_count;
+    _threads.create_thread(Worker(this));
+}
+
+shared_ptr<WorkUnit> WorkQueue::get()
+{
+    mutex::scoped_lock lock(_queue_lock);
+
+    while (_queue.size() == 0)
+    {
+        _queue_non_empty.wait(lock);
+        if (!_running) throw interrupted_error();
+    }
+
+    shared_ptr<WorkUnit> back = _queue.front();
+    _queue.pop();
+
+    if (_queue.size() > 0 && _thread_count < _max_threads) spawnThread();
+
+    return back;
+}
+
+void WorkQueue::put(shared_ptr<WorkUnit> work_unit)
+{
+    mutex::scoped_lock lock(_queue_lock);
+
+    _queue.push(work_unit);
+
+    _queue_non_empty.notify_one();
+}
+
+WorkQueue::Worker::Worker(WorkQueue* queue)
+    :_queue(queue)
+{
+}
+
+void WorkQueue::Worker::operator()()
+{
+    while (true)
+    {
+        try
+        {
+            shared_ptr<WorkUnit> unit = _queue->get();
+
+            unit->run();
+        }
+        catch (interrupted_error)
+        {
+            return;
+        }
+    }
+}
+
+TaskNotification::TaskNotification()
+:_expected(0), _count(0), _fail_count(0)
+{
+}
+
+void TaskNotification::registerTask()
+{
+    mutex::scoped_lock lock(_lock);
+    ++_expected;
+}
+
+void TaskNotification::completeTask(bool success)
+{
+    mutex::scoped_lock lock(_lock);
+    if (!success) ++_fail_count;
+    if (++_count == _expected) _cond.notify_all();
+}
+
+void TaskNotification::waitForComplete()
+{
+    mutex::scoped_lock lock(_lock);
+    while (_count < _expected)
+    {
+        _cond.wait(lock);
+    }
+}
+
+bool TaskNotification::failCount()
+{
+    return _fail_count;
+}
+
+} // namespace bicker
diff --git a/kvstore/workqueue.h b/kvstore/workqueue.h
new file mode 100644 (file)
index 0000000..cfb2e02
--- /dev/null
@@ -0,0 +1,78 @@
+#ifndef __WORKQUEUE_H__
+#define __WORKQUEUE_H__ 1
+
+#include <boost/thread.hpp>
+#include <boost/shared_ptr.hpp>
+#include <queue>
+#include "util.h"
+
+using namespace boost;
+using namespace std;
+
+namespace bicker
+{
+    struct interrupted_error : public virtual std::exception { };
+
+    class WorkUnit
+    {
+    public:
+        virtual ~WorkUnit() {};
+        virtual void run() = 0;
+    };
+
+    class WorkQueue
+    {
+    public:
+        WorkQueue();
+        WorkQueue(int thread_count);
+
+        ~WorkQueue();
+
+        shared_ptr<WorkUnit> get();
+        void put(shared_ptr<WorkUnit> work_unit);
+
+    protected:
+        void spawnThread();
+
+    private:
+        class Worker
+        {
+        public:
+            Worker(WorkQueue* queue);
+            void operator()();
+        private:
+            WorkQueue *_queue;
+        };
+
+        int _thread_count;
+        int _min_threads;
+        int _max_threads;
+        mutex _queue_lock;
+        condition_variable _queue_non_empty;
+        queue<shared_ptr<WorkUnit> > _queue;
+        thread_group _threads;
+        volatile bool _running;
+    };
+
+    class TaskNotification
+    {
+    public:
+        TaskNotification();
+
+        void registerTask();
+        void completeTask(bool success = true);
+
+        void waitForComplete();
+
+        bool failCount();
+    private:
+        int                _expected;
+        int                _count;
+        int                _fail_count;
+        mutex              _lock;
+        condition_variable _cond;
+    };
+
+} // namespace bicker
+
+#endif