--- /dev/null
+cmake_minimum_required(VERSION 2.6)
+add_custom_command(OUTPUT kvstore.pb.cc kvstore.pb.h
+ COMMAND protoc --cpp_out=. kvstore.proto
+ DEPENDS kvstore.proto)
+add_library(protobufrpc protobufrpc.cc socket_pool.cc workqueue.cc)
+add_library(kvservice kvservice.cc kvstore.pb.cc backend.cc)
+add_library(kvclient kvclient.cc)
+add_executable(kvstore kvstore.cc)
+ kvservice
+ boost_thread-mt boost_regex-mt boost_system-mt
+ boost_program_options-mt db protobuf protobufrpc pthread)
if (log_in_memory)
- res = _dbenv->set_flags(_dbenv, DB_LOG_INMEMORY, 1);
+ res = _dbenv->set_flags(_dbenv, DB_LOG_IN_MEMORY, 1);
if (res != 0)
cerr << db_strerror(res) << endl;
- throw std::runtime_error("BDB ENV DB_LOG_INMEMORY");
+ throw std::runtime_error("BDB ENV DB_LOG_IN_MEMORY");
- res = _dbenv->set_flags(_dbenv, DB_LOG_AUTOREMOVE, 1);
+ res = _dbenv->set_flags(_dbenv, DB_LOG_AUTO_REMOVE, 1);
if (res != 0)
cerr << db_strerror(res) << endl;
- throw std::runtime_error("BDB ENV DB_LOG_AUTOREMOVE");
+ throw std::runtime_error("BDB ENV DB_LOG_AUTO_REMOVE");
res = _dbenv->open(_dbenv,
+++ /dev/null
+++ /dev/null
+++ /dev/null
+++ /dev/null
+++ /dev/null
-#!/usr/bin/make -f
-# -*- makefile -*-
-# Sample debian/rules that uses debhelper.
-# This file was originally written by Joey Hess and Craig Small.
-# As a special exception, when this file is copied by dh-make into a
-# dh-make output file, you may use that output file without restriction.
-# This special exception was added by Craig Small in version 0.37 of dh-make.
-# Uncomment this to turn on verbose mode.
-#export DH_VERBOSE=1
- dh_testdir
- scons
-install: build
- dh_testdir
- dh_testroot
- dh_prep
- dh_installdirs
- scons --prefix=$(CURDIR)/debian/tmp install
- scons -c
- dh_clean
-binary-arch: install
- dh_testdir
- dh_testroot
- dh_installchangelogs
- dh_install
- dh_link
- #dh_strip
- dh_compress
- dh_fixperms
- dh_installdeb
- dh_gencontrol
- dh_md5sums
- dh_builddeb
-binary: binary-arch
- dh $@
--- /dev/null
+#include "protobufrpc.h"
+#include <google/protobuf/descriptor.h>
+#include <google/protobuf/io/zero_copy_stream_impl.h>
+#include <google/protobuf/io/coded_stream.h>
+#include <boost/shared_ptr.hpp>
+#include <boost/bind.hpp>
+#include <boost/functional/hash.hpp>
+using namespace std;
+using namespace boost;
+using asio::buffer;
+namespace bicker
+template<typename T>
+static void* void_write(void* data, T val)
+ *((T*)data) = val;
+ return (char*)data + sizeof(T);
+class ProtoBufRpcServiceRequest
+ ProtoBufRpcServiceRequest(
+ RpcController *ctrl,
+ const MethodDescriptor* method,
+ Message *request,
+ Message *response,
+ shared_ptr<ProtoBufRpcConnection> conn
+ )
+ :_ctrl(ctrl),
+ _method(method),
+ _request(request),
+ _response(response),
+ _conn(conn)
+ {
+ }
+ ~ProtoBufRpcServiceRequest()
+ {
+ }
+ static void run(ProtoBufRpcServiceRequest *req)
+ {
+ req->_conn->writeResponse(req->_response.get());
+ delete req;
+ }
+ shared_ptr<RpcController> _ctrl;
+ const MethodDescriptor *_method;
+ shared_ptr<Message> _request;
+ shared_ptr<Message> _response;
+ shared_ptr<ProtoBufRpcConnection> _conn;
+ProtoBufRpcConnection::ProtoBufRpcConnection(asio::io_service& io_service,
+ Service *service)
+ _strand(io_service),
+ _service(service),
+ _state(STATE_NONE)
+tcp::socket& ProtoBufRpcConnection::socket()
+ return _socket;
+void ProtoBufRpcConnection::start()
+ _socket.async_read_some(_buffer.prepare(4096),
+ _strand.wrap(
+ boost::bind(&ProtoBufRpcConnection::handle_read, shared_from_this(),
+ asio::placeholders::error,
+ asio::placeholders::bytes_transferred)));
+void ProtoBufRpcConnection::writeResponse(Message *msg)
+ int rlen = msg->ByteSize();
+ int len = htonl(rlen);
+ int mlen = sizeof(len) + rlen;
+ void * data = asio::buffer_cast<void*>(_buffer.prepare(mlen));
+ data = void_write(data, len);
+ using google::protobuf::io::ArrayOutputStream;
+ ArrayOutputStream as(data, rlen);
+ msg->SerializeToZeroCopyStream(&as);
+ _buffer.commit(mlen);
+ asio::async_write(_socket,
+ _buffer.data(),
+ _strand.wrap(
+ boost::bind(&ProtoBufRpcConnection::handle_write,
+ shared_from_this(),
+ asio::placeholders::error,
+ asio::placeholders::bytes_transferred)));
+void ProtoBufRpcConnection::handle_read(const error_code& e,
+ std::size_t bytes_transferred)
+ if (!e)
+ {
+ _buffer.commit(bytes_transferred);
+ if (_state == STATE_NONE)
+ {
+ if (_buffer.size() >= sizeof(_id) + sizeof(_len))
+ {
+ string b(
+ buffers_begin(_buffer.data()),
+ buffers_begin(_buffer.data())
+ + sizeof(_id) + sizeof(_len)
+ );
+ _buffer.consume(sizeof(_id) + sizeof(_len));
+ _id = *((int*)b.c_str());
+ _id = ntohl(_id);
+ _len = *((unsigned int*)(b.c_str() + sizeof(_id)));
+ _len = ntohl(_len);
+ }
+ else
+ {
+ start();
+ }
+ }
+ if (_state == STATE_HAVE_ID_AND_LEN || _state == STATE_WAITING_FOR_DATA)
+ {
+ if (_buffer.size() >= _len)
+ {
+ const MethodDescriptor* method =
+ _service->GetDescriptor()->method(_id);
+ Message *req = _service->GetRequestPrototype(method).New();
+ Message *resp = _service->GetResponsePrototype(method).New();
+ using google::protobuf::io::ArrayInputStream;
+ using google::protobuf::io::CodedInputStream;
+ const void* data = asio::buffer_cast<const void*>(
+ _buffer.data()
+ );
+ ArrayInputStream as(data, _len);
+ CodedInputStream is(&as);
+ is.SetTotalBytesLimit(512 * 1024 * 1024, -1);
+ if (!req->ParseFromCodedStream(&is))
+ {
+ throw std::runtime_error("ParseFromCodedStream");
+ }
+ _buffer.consume(_len);
+ ProtoBufRpcController *ctrl = new ProtoBufRpcController();
+ _service->CallMethod(method,
+ ctrl,
+ req,
+ resp,
+ NewCallback(
+ &ProtoBufRpcServiceRequest::run,
+ new ProtoBufRpcServiceRequest(
+ ctrl,
+ method,
+ req,
+ resp,
+ shared_from_this())
+ )
+ );
+ _state = STATE_NONE;
+ }
+ else
+ {
+ start();
+ }
+ }
+ }
+ else
+ {
+ error_code ignored_ec;
+ _socket.shutdown(tcp::socket::shutdown_both, ignored_ec);
+ }
+void ProtoBufRpcConnection::handle_write(const error_code& e,
+ std::size_t bytes_transferred)
+ if (e)
+ {
+ error_code ignored_ec;
+ _socket.shutdown(tcp::socket::shutdown_both, ignored_ec);
+ }
+ else
+ {
+ _buffer.consume(bytes_transferred);
+ if (_buffer.size())
+ {
+ asio::async_write(_socket,
+ _buffer.data(),
+ _strand.wrap(
+ boost::bind(&ProtoBufRpcConnection::handle_write,
+ shared_from_this(),
+ asio::placeholders::error,
+ asio::placeholders::bytes_transferred)));
+ return;
+ }
+ _state = STATE_NONE;
+ start();
+ }
+ :_io_service(new asio::io_service())
+bool ProtoBufRpcServer::registerService(uint16_t port,
+ shared_ptr<Service> service)
+ // This is not thread safe
+ // The RegisteredService Constructor fires up the appropriate
+ // async accepts for the service
+ _services.push_back(shared_ptr<RegisteredService>(
+ new RegisteredService(
+ _io_service,
+ port,
+ service)));
+ return true;
+void run_wrapper(asio::io_service *io_service)
+ struct itimerval itimer;
+ setitimer(ITIMER_PROF, &itimer, NULL);
+ io_service->run();
+void ProtoBufRpcServer::run()
+ try
+ {
+ if (_services.size() == 0)
+ {
+ throw std::runtime_error("No services registered for ProtoBufRpcServer");
+ }
+ size_t nprocs = sysconf(_SC_NPROCESSORS_ONLN);
+ vector<shared_ptr<thread> > threads;
+ for (size_t i = 0; i < nprocs; ++i)
+ {
+ shared_ptr<thread> t(new thread(
+ boost::bind(
+ //&run_wrapper,
+ &asio::io_service::run,
+ _io_service.get())));
+ threads.push_back(t);
+ }
+ for (size_t i = 0; i < threads.size(); ++i)
+ {
+ threads[i]->join();
+ }
+ }
+ catch (std::exception &e)
+ {
+ std::cerr << "ProtoBufRpcService" << e.what() << std::endl;
+ }
+void ProtoBufRpcServer::shutdown()
+ _io_service->stop();
+ shared_ptr<asio::io_service> io_service,
+ uint16_t port,
+ shared_ptr<Service> service
+ )
+ _port(port),
+ _service(service),
+ _endpoint(tcp::v4(), _port),
+ _acceptor(*_io_service),
+ _new_connection(new ProtoBufRpcConnection(*_io_service, _service.get()))
+ _acceptor.open(_endpoint.protocol());
+ _acceptor.set_option(tcp::acceptor::reuse_address(true));
+ _acceptor.bind(_endpoint);
+ _acceptor.listen();
+ _acceptor.async_accept(_new_connection->socket(),
+ boost::bind(&ProtoBufRpcServer::RegisteredService::handle_accept,
+ this,
+ asio::placeholders::error));
+void ProtoBufRpcServer::RegisteredService::handle_accept(const error_code& e)
+ if (!e)
+ {
+ _new_connection->start();
+ _new_connection.reset(new ProtoBufRpcConnection(*_io_service, _service.get()));
+ _acceptor.async_accept(_new_connection->socket(),
+ boost::bind(&ProtoBufRpcServer::RegisteredService::handle_accept,
+ this,
+ asio::placeholders::error));
+ }
+void ProtoBufRpcController::Reset()
+bool ProtoBufRpcController::Failed() const
+ return false;
+string ProtoBufRpcController::ErrorText() const
+ return "No Error";
+void ProtoBufRpcController::StartCancel()
+void ProtoBufRpcController::SetFailed(const string &/*reason*/)
+bool ProtoBufRpcController::IsCanceled() const
+ return false;
+void ProtoBufRpcController::NotifyOnCancel(Closure * /*callback*/)
+class ProtoBufRpcChannel::MethodHandler
+ : public enable_shared_from_this<MethodHandler>,
+ private boost::noncopyable
+ MethodHandler(auto_ptr<SocketCheckout> socket,
+ const MethodDescriptor * method,
+ RpcController * controller,
+ const Message * request,
+ Message * response,
+ Closure * done
+ )
+ :_socket(socket),
+ _method(method),
+ _controller(controller),
+ _request(request),
+ _response(response),
+ _done(done)
+ {
+ }
+ ~MethodHandler()
+ {
+ _socket.reset();
+ _done->Run();
+ }
+ static void execute(shared_ptr<MethodHandler> this_ptr)
+ {
+ int index = htonl(this_ptr->_method->index());
+ int rlen = this_ptr->_request->ByteSize();
+ int len = htonl(rlen);
+ int mlen = sizeof(index) + sizeof(len) + rlen;
+ void * data = asio::buffer_cast<void*>(this_ptr->_buffer.prepare(mlen));
+ data = void_write(data, index);
+ data = void_write(data, len);
+ using google::protobuf::io::ArrayOutputStream;
+ ArrayOutputStream as(data, rlen);
+ this_ptr->_request->SerializeToZeroCopyStream(&as);
+ this_ptr->_buffer.commit(mlen);
+ (*(this_ptr->_socket))->async_send(this_ptr->_buffer.data(),
+ boost::bind(&ProtoBufRpcChannel::MethodHandler::handle_write,
+ this_ptr,
+ asio::placeholders::error,
+ asio::placeholders::bytes_transferred));
+ }
+ static void handle_write(shared_ptr<MethodHandler> this_ptr,
+ const error_code& e,
+ std::size_t bytes_transferred)
+ {
+ if (!e)
+ {
+ this_ptr->_buffer.consume(bytes_transferred);
+ if (this_ptr->_buffer.size())
+ {
+ (*(this_ptr->_socket))->async_send(this_ptr->_buffer.data(),
+ boost::bind(&ProtoBufRpcChannel::MethodHandler::handle_write,
+ this_ptr,
+ asio::placeholders::error,
+ asio::placeholders::bytes_transferred));
+ return;
+ }
+ (*(this_ptr->_socket))->async_receive(
+ buffer(&this_ptr->_len, sizeof(this_ptr->_len)),
+ boost::bind(
+ &ProtoBufRpcChannel::MethodHandler::handle_read_len,
+ this_ptr,
+ asio::placeholders::error,
+ asio::placeholders::bytes_transferred)
+ );
+ }
+ else
+ {
+ this_ptr->_controller->SetFailed(e.message());
+ (*(this_ptr->_socket))->close();
+ }
+ }
+ static void handle_read_len(shared_ptr<MethodHandler> this_ptr,
+ const error_code& e,
+ std::size_t bytes_transferred)
+ {
+ if (!e && bytes_transferred == sizeof(this_ptr->_len))
+ {
+ this_ptr->_len = ntohl(this_ptr->_len);
+ (*(this_ptr->_socket))->async_receive(
+ this_ptr->_buffer.prepare(this_ptr->_len),
+ boost::bind(
+ &ProtoBufRpcChannel::MethodHandler::handle_read_response,
+ this_ptr,
+ asio::placeholders::error,
+ asio::placeholders::bytes_transferred
+ )
+ );
+ }
+ else
+ {
+ this_ptr->_controller->SetFailed(e.message());
+ (*(this_ptr->_socket))->close();
+ }
+ }
+ static void handle_read_response(shared_ptr<MethodHandler> this_ptr,
+ const error_code& e,
+ std::size_t bytes_transferred)
+ {
+ if (!e)
+ {
+ this_ptr->_buffer.commit(bytes_transferred);
+ if (this_ptr->_buffer.size() >= this_ptr->_len)
+ {
+ using google::protobuf::io::ArrayInputStream;
+ using google::protobuf::io::CodedInputStream;
+ const void* data = asio::buffer_cast<const void*>(
+ this_ptr->_buffer.data()
+ );
+ ArrayInputStream as(data, this_ptr->_len);
+ CodedInputStream is(&as);
+ is.SetTotalBytesLimit(512 * 1024 * 1024, -1);
+ if (!this_ptr->_response->ParseFromCodedStream(&is))
+ {
+ throw std::runtime_error("ParseFromCodedStream");
+ }
+ this_ptr->_buffer.consume(this_ptr->_len);
+ }
+ else
+ {
+ (*(this_ptr->_socket))->async_receive(
+ this_ptr->_buffer.prepare(this_ptr->_len - this_ptr->_buffer.size()),
+ boost::bind(
+ &ProtoBufRpcChannel::MethodHandler::handle_read_response,
+ this_ptr,
+ asio::placeholders::error,
+ asio::placeholders::bytes_transferred
+ )
+ );
+ return;
+ }
+ }
+ else
+ {
+ this_ptr->_controller->SetFailed(e.message());
+ (*(this_ptr->_socket))->close();
+ }
+ }
+ auto_ptr<SocketCheckout> _socket;
+ const MethodDescriptor * _method;
+ RpcController * _controller;
+ const Message * _request;
+ Message * _response;
+ Closure * _done;
+ asio::streambuf _buffer;
+ unsigned int _len;
+ bool _status;
+ unsigned int _sent;
+ProtoBufRpcChannel::ProtoBufRpcChannel(const string &remotehost,
+ const string &port)
+ :_remote_host(remotehost), _port(port),
+ _resolver(_io_service),
+ _acceptor(_io_service),
+ _pool(2000, _io_service),
+ _lame_socket(_io_service),
+ _thread()
+// &asio::io_service::run,
+// &_io_service)))
+ tcp::resolver::query query(_remote_host, _port);
+ tcp::resolver::iterator endpoint_iterator = _resolver.resolve(query);
+ tcp::resolver::iterator end;
+ error_code error = asio::error::host_not_found;
+ if (endpoint_iterator == end) throw syserr::system_error(error);
+ _pool.setEndpoint(*endpoint_iterator);
+ tcp::endpoint e(tcp::v4(), 0);
+ _acceptor.open(e.protocol());
+ _acceptor.set_option(tcp::acceptor::reuse_address(true));
+ _acceptor.bind(e);
+ _acceptor.listen();
+ _acceptor.async_accept(_lame_socket,
+ boost::bind(&ProtoBufRpcChannel::lame_handle_accept, this,
+ asio::placeholders::error));
+ _thread = shared_ptr<thread>(new thread(
+ boost::bind(
+ &asio::io_service::run,
+ &_io_service)));
+void ProtoBufRpcChannel::lame_handle_accept(const error_code &err)
+ if (!err)
+ {
+ _acceptor.async_accept(_lame_socket,
+ boost::bind(&ProtoBufRpcChannel::lame_handle_accept,
+ this,
+ asio::placeholders::error));
+ }
+ _pool.cancelAndClear();
+ _io_service.stop();
+ _thread->join();
+void ProtoBufRpcChannel::CallMethod(
+ const MethodDescriptor * method,
+ RpcController * controller,
+ const Message * request,
+ Message * response,
+ Closure * done)
+ shared_ptr<MethodHandler> h(
+ new MethodHandler(
+ auto_ptr<SocketCheckout>(new SocketCheckout(&_pool)),
+ method,
+ controller,
+ request,
+ response,
+ done
+ ));
+ MethodHandler::execute(h);
+} // namespace bicker
--- /dev/null
+#ifndef __PROTOBUFRPC_H__
+#define __PROTOBUFRPC_H__
+#include <stdint.h>
+#include <boost/shared_ptr.hpp>
+#include <boost/enable_shared_from_this.hpp>
+#include <boost/thread.hpp>
+#include <boost/function.hpp>
+#include <queue>
+#include <set>
+#include "socket_pool.h"
+#include "util.h"
+#include <google/protobuf/stubs/common.h>
+#include <google/protobuf/generated_message_reflection.h>
+#include <google/protobuf/repeated_field.h>
+#include <google/protobuf/extension_set.h>
+#include <google/protobuf/service.h>
+using namespace std;
+using namespace google::protobuf;
+using namespace boost;
+namespace bicker
+class ProtoBufRpcService;
+class ProtoBufRpcConnection
+ : public enable_shared_from_this<ProtoBufRpcConnection>,
+ private boost::noncopyable
+ explicit ProtoBufRpcConnection(asio::io_service& io_service,
+ Service *_service);
+ asio::ip::tcp::socket& socket();
+ void start();
+ void writeResponse(Message *msg);
+ void handle_read(const error_code& e,
+ std::size_t bytes_transferred);
+ void handle_write(const error_code& e,
+ std::size_t bytes_transferred);
+ tcp::socket _socket;
+ asio::io_service::strand _strand;
+ Service *_service;
+ asio::streambuf _buffer;
+ int _id;
+ unsigned int _len;
+ enum {
+ } _state;
+class ProtoBufRpcServer
+ ProtoBufRpcServer();
+ bool registerService(uint16_t port,
+ shared_ptr< Service> service);
+ // So we can call this as a thread.
+ void run();
+ // So we can stop..
+ void shutdown();
+ class RegisteredService
+ {
+ public:
+ RegisteredService(
+ shared_ptr<asio::io_service> io_service,
+ uint16_t port,
+ shared_ptr<Service> service
+ );
+ void handle_accept(const error_code& e);
+ protected:
+ // Ref to parent's
+ shared_ptr<asio::io_service> _io_service;
+ uint16_t _port;
+ shared_ptr<Service> _service;
+ tcp::endpoint _endpoint;
+ tcp::acceptor _acceptor;
+ shared_ptr<ProtoBufRpcConnection> _new_connection;
+ };
+ list<shared_ptr<RegisteredService> > _services;
+ shared_ptr<asio::io_service> _io_service;
+class ProtoBufRpcController : public RpcController
+ ProtoBufRpcController();
+ virtual ~ProtoBufRpcController();
+ virtual void Reset();
+ virtual bool Failed() const;
+ virtual string ErrorText() const;
+ virtual void StartCancel();
+ virtual void SetFailed(const string &reason);
+ virtual bool IsCanceled() const;
+ virtual void NotifyOnCancel(Closure *callback);
+class ProtoBufRpcChannel
+ : public RpcChannel,
+ public enable_shared_from_this<ProtoBufRpcChannel>,
+ private boost::noncopyable
+ ProtoBufRpcChannel(const string &remotehost, const string &port);
+ virtual ~ProtoBufRpcChannel();
+ virtual void CallMethod(
+ const MethodDescriptor * method,
+ RpcController * controller,
+ const Message * request,
+ Message * response,
+ Closure * done);
+ shared_ptr<tcp::socket> getSocket();
+ void putSocket(shared_ptr<tcp::socket>);
+ class MethodHandler;
+ string _remote_host;
+ string _port;
+ asio::io_service _io_service;
+ tcp::resolver _resolver;
+ // This exists to keep the io service running
+ tcp::acceptor _acceptor;
+ SocketPool _pool;
+ asio::ip::tcp::socket _lame_socket;
+ void lame_handle_accept(const error_code &err);
+ shared_ptr<boost::thread> _thread;
+} // namespace bicker
+++ /dev/null
+++ /dev/null
+++ /dev/null
+++ /dev/null
+++ /dev/null
+++ /dev/null
+++ /dev/null
+++ /dev/null
--- /dev/null
+#include "socket_pool.h"
+SocketPool::SocketPool(int max_sockets,
+ asio::io_service &io_svc
+ )
+ _max_sockets(max_sockets),
+ _io_service(io_svc)
+void SocketPool::setEndpoint(const tcp::endpoint &endpoint)
+ _endpoint = endpoint;
+void SocketPool::cancelAndClear()
+ for (set<shared_ptr<tcp::socket> >::iterator i = _set.begin();
+ i != _set.end();
+ ++i)
+ {
+ (*i)->cancel();
+ }
+ while (!_queue.empty()) _queue.pop();
+ _set.clear();
+shared_ptr<tcp::socket> SocketPool::getSocket()
+ mutex::scoped_lock lock(_sockets_lock);
+ while (_queue.size() == 0 && _issued >= _max_sockets)
+ _sockets_non_empty.wait(lock);
+ if (_queue.size())
+ {
+ shared_ptr<tcp::socket> socket = _queue.front();
+ _queue.pop();
+ return socket;
+ }
+ else
+ {
+ ++_issued;
+ error_code error = asio::error::host_not_found;
+ shared_ptr<tcp::socket> socket(new tcp::socket(_io_service));
+ socket->connect(_endpoint, error);
+ if (error) throw syserr::system_error(error);
+ _set.insert(socket);
+ return socket;
+ }
+void SocketPool::putSocket(shared_ptr<tcp::socket> socket)
+ mutex::scoped_lock lock(_sockets_lock);
+ if (!socket->is_open())
+ {
+ cerr << "socket closed\n";
+ --_issued;
+ _set.erase(socket);
+ }
+ else
+ {
+ _queue.push(socket);
+ }
+ _sockets_non_empty.notify_one();
+SocketCheckout::SocketCheckout(SocketPool *pool)
+ :_socket(pool->getSocket()),
+ _pool(pool)
+ _pool->putSocket(_socket);
+tcp::socket& SocketCheckout::operator*()
+ return *_socket;
+tcp::socket* SocketCheckout::operator->()
+ return _socket.get();
+shared_ptr<tcp::socket>& SocketCheckout::socket()
+ return _socket;
--- /dev/null
+#ifndef _SOCKET_POOL_H_
+#define _SOCKET_POOL_H_ 1
+#include <iostream>
+#include <set>
+#include <queue>
+#include <boost/shared_ptr.hpp>
+#include <boost/thread.hpp>
+#include "util.h"
+using namespace std;
+using namespace boost;
+class SocketPool
+ SocketPool(int max_streams,
+ asio::io_service &io_svc);
+ void setEndpoint(const tcp::endpoint &endpoint);
+ void cancelAndClear();
+ shared_ptr<tcp::socket> getSocket();
+ void putSocket(shared_ptr<tcp::socket> socket);
+ int _issued;
+ int _max_sockets;
+ mutex _sockets_lock;
+ condition_variable _sockets_non_empty;
+ asio::io_service &_io_service;
+ tcp::endpoint _endpoint;
+ queue<shared_ptr<tcp::socket> > _queue;
+ set<shared_ptr<tcp::socket> > _set;
+class SocketCheckout
+ SocketCheckout(SocketPool *pool);
+ ~SocketCheckout();
+ tcp::socket& operator*();
+ tcp::socket* operator->();
+ shared_ptr<tcp::socket>& socket();
+ shared_ptr<tcp::socket> _socket;
+ SocketPool *_pool;
--- /dev/null
+#ifndef _UTIL_H_
+#define _UTIL_H_ 1
+#include <boost/version.hpp>
+#if BOOST_VERSION <= 103500
+#include <boost/thread.hpp>
+#include <boost/thread/mutex.hpp>
+#include <boost/bind.hpp>
+#include <asio.hpp>
+#include <asio/buffer.hpp>
+//typedef boost::condition condition_variable ;
+//typedef boost::detail::thread::scoped_lock<boost::mutex> scoped_lock;
+using asio::ip::tcp;
+using asio::error_code;
+using asio::buffers_begin;
+namespace syserr=asio;
+#if BOOST_VERSION < 104000
+#include <boost/asio.hpp>
+#include <boost/thread/mutex.hpp>
+#include <boost/bind.hpp>
+using boost::asio::ip::tcp;
+using boost::system::error_code;
+using boost::system::system_error;
+namespace syserr=boost::system;
+#include <boost/asio.hpp>
+#include <boost/thread/mutex.hpp>
+using boost::asio::ip::tcp;
+using boost::system::error_code;
+using boost::system::system_error;
+namespace syserr=boost::system;
--- /dev/null
+#include "workqueue.h"
+#include <boost/thread.hpp>
+namespace bicker
+ :_thread_count(0), _min_threads(1), _max_threads(50), _running(true)
+ for (int i = 0; i < _min_threads; ++i)
+ {
+ spawnThread();
+ }
+WorkQueue::WorkQueue(int thread_count)
+ :_thread_count(0),
+ _min_threads(1), _max_threads(thread_count), _running(true)
+ for (int i = 0; i < _min_threads; ++i)
+ {
+ spawnThread();
+ }
+ _running = false;
+ {
+ mutex::scoped_lock lock(_queue_lock);
+ _queue_non_empty.notify_all();
+ }
+ _threads.join_all();
+void WorkQueue::spawnThread()
+ ++_thread_count;
+ _threads.create_thread(Worker(this));
+shared_ptr<WorkUnit> WorkQueue::get()
+ mutex::scoped_lock lock(_queue_lock);
+ while (_queue.size() == 0)
+ {
+ _queue_non_empty.wait(lock);
+ if (!_running) throw interrupted_error();
+ }
+ shared_ptr<WorkUnit> back = _queue.front();
+ _queue.pop();
+ if (_queue.size() > 0 && _thread_count < _max_threads) spawnThread();
+ return back;
+void WorkQueue::put(shared_ptr<WorkUnit> work_unit)
+ mutex::scoped_lock lock(_queue_lock);
+ _queue.push(work_unit);
+ _queue_non_empty.notify_one();
+WorkQueue::Worker::Worker(WorkQueue* queue)
+ :_queue(queue)
+void WorkQueue::Worker::operator()()
+ while (true)
+ {
+ try
+ {
+ shared_ptr<WorkUnit> unit = _queue->get();
+ unit->run();
+ }
+ catch (interrupted_error)
+ {
+ return;
+ }
+ }
+:_expected(0), _count(0), _fail_count(0)
+void TaskNotification::registerTask()
+ mutex::scoped_lock lock(_lock);
+ ++_expected;
+void TaskNotification::completeTask(bool success)
+ mutex::scoped_lock lock(_lock);
+ if (!success) ++_fail_count;
+ if (++_count == _expected) _cond.notify_all();
+void TaskNotification::waitForComplete()
+ mutex::scoped_lock lock(_lock);
+ while (_count < _expected)
+ {
+ _cond.wait(lock);
+ }
+bool TaskNotification::failCount()
+ return _fail_count;
+} // namespace bicker
--- /dev/null
+#ifndef __WORKQUEUE_H__
+#define __WORKQUEUE_H__ 1
+#include <boost/thread.hpp>
+#include <boost/shared_ptr.hpp>
+#include <queue>
+#include "util.h"
+using namespace boost;
+using namespace std;
+namespace bicker
+ struct interrupted_error : public virtual std::exception { };
+ class WorkUnit
+ {
+ public:
+ virtual ~WorkUnit() {};
+ virtual void run() = 0;
+ };
+ class WorkQueue
+ {
+ public:
+ WorkQueue();
+ WorkQueue(int thread_count);
+ ~WorkQueue();
+ shared_ptr<WorkUnit> get();
+ void put(shared_ptr<WorkUnit> work_unit);
+ protected:
+ void spawnThread();
+ private:
+ class Worker
+ {
+ public:
+ Worker(WorkQueue* queue);
+ void operator()();
+ private:
+ WorkQueue *_queue;
+ };
+ int _thread_count;
+ int _min_threads;
+ int _max_threads;
+ mutex _queue_lock;
+ condition_variable _queue_non_empty;
+ queue<shared_ptr<WorkUnit> > _queue;
+ thread_group _threads;
+ volatile bool _running;
+ };
+ class TaskNotification
+ {
+ public:
+ TaskNotification();
+ void registerTask();
+ void completeTask(bool success = true);
+ void waitForComplete();
+ bool failCount();
+ private:
+ int _expected;
+ int _count;
+ int _fail_count;
+ mutex _lock;
+ condition_variable _cond;
+ };
+} // namespace bicker