--- /dev/null
+cmake_minimum_required(VERSION 2.6)
+
+add_custom_command(OUTPUT kvstore.pb.cc kvstore.pb.h
+ COMMAND protoc --cpp_out=. kvstore.proto
+ DEPENDS kvstore.proto)
+
+add_library(protobufrpc protobufrpc.cc socket_pool.cc workqueue.cc)
+add_library(kvservice kvservice.cc kvstore.pb.cc backend.cc)
+add_library(kvclient kvclient.cc)
+add_executable(kvstore kvstore.cc)
+
+target_link_libraries(kvstore
+ kvservice
+ boost_thread-mt boost_regex-mt boost_system-mt
+ boost_program_options-mt db protobuf protobufrpc pthread)
if (log_in_memory)
{
- res = _dbenv->set_flags(_dbenv, DB_LOG_INMEMORY, 1);
+ res = _dbenv->set_flags(_dbenv, DB_LOG_IN_MEMORY, 1);
if (res != 0)
{
cerr << db_strerror(res) << endl;
- throw std::runtime_error("BDB ENV DB_LOG_INMEMORY");
+ throw std::runtime_error("BDB ENV DB_LOG_IN_MEMORY");
}
}
- res = _dbenv->set_flags(_dbenv, DB_LOG_AUTOREMOVE, 1);
+ res = _dbenv->set_flags(_dbenv, DB_LOG_AUTO_REMOVE, 1);
if (res != 0)
{
cerr << db_strerror(res) << endl;
- throw std::runtime_error("BDB ENV DB_LOG_AUTOREMOVE");
+ throw std::runtime_error("BDB ENV DB_LOG_AUTO_REMOVE");
}
res = _dbenv->open(_dbenv,
+++ /dev/null
-bicker-fcgi for Debian
-----------------------
-
-<possible notes regarding this package - if none, delete this file>
-
- -- John McCullough <jcm@salud.ucsd.edu> Tue, 29 Sep 2009 13:29:18 -0700
+++ /dev/null
-bicker-fcgi for Debian
-----------------------
-
-<this file describes information about the source package, see Debian policy
-manual section 4.14. You WILL either need to modify or delete this file>
-
-
-
-
+++ /dev/null
-kvstore (0.1-7) unstable; urgency=low
-
- * Reduced memory copying in protobuf, support for multiple data
- targets
-
- -- John McCullough <jcm@salud.ucsd.edu> Thu, 21 Jan 2010 11:10:45 -0800
-
-kvstore (0.1-6) unstable; urgency=low
-
- * Improved RPC Layer
-
- -- John McCullough <jcm@salud.ucsd.edu> Thu, 24 Dec 2009 14:25:41 -0800
-
-kvstore (0.1-5) unstable; urgency=low
-
- * KeyValue -> KVStore rename.
- * Fixed Dependencies
-
- -- John McCullough <jcm@salud.ucsd.edu> Sun, 29 Nov 2009 16:17:23 -0800
-
-kvstore (0.1-3) unstable; urgency=low
-
- * More Packaging Fun
-
- -- John McCullough <jcm@salud.ucsd.edu> Sun, 29 Nov 2009 15:34:41 -0800
-
-kvstore (0.1-2) unstable; urgency=low
-
- * Incremented boost version to .0
-
- -- John McCullough <jcm@salud.ucsd.edu> Sun, 29 Nov 2009 15:32:01 -0800
-
-kvstore (0.1-1) unstable; urgency=low
-
- * Initial release (Closes: #nnnn) <nnnn is the bug number of your ITP>
-
- -- John McCullough <jcm@salud.ucsd.edu> Tue, 29 Sep 2009 13:29:18 -0700
+++ /dev/null
-Source: kvstore
-Section: unknown
-Priority: extra
-Maintainer: John McCullough <jcm@salud.ucsd.edu>
-Build-Depends: debhelper (>= 7), libboost-thread1.35-dev, libboost-system1.35-dev, libboost1.35-dev, libasio-dev, protobuf-compiler, libprotobuf-dev, libdb4.6-dev, libgtest-dev
-Standards-Version: 3.8.3,
-Homepage: <insert the upstream URL, if relevant>
-
-Package: kvstore
-Architecture: any
-Depends: ${misc:Depends}, libboost-thread1.35.0, libboost-regex1.35.0, libboost-program-options1.35.0, libboost-system1.35.0, libprotobuf4, libdb4.6, libgtest0
-Description: Homebrew key value store
- <insert long description, indented with spaces>
+++ /dev/null
-This work was packaged for Debian by:
-
- John McCullough <jcm@salud.ucsd.edu> on Tue, 29 Sep 2009 13:29:18 -0700
-
-It was downloaded from <url://example.com>
-
-Upstream Author(s):
-
- <put author's name and email here>
- <likewise for another author>
-
-Copyright:
-
- <Copyright (C) YYYY Name OfAuthor>
- <likewise for another author>
-
-License:
-
- <Put the license of the package here indented by 4 spaces>
-
-The Debian packaging is:
-
- Copyright (C) 2009 John McCullough <jcm@salud.ucsd.edu>
-
-# Please chose a license for your packaging work. If the program you package
-# uses a mainstream license, using the same license is the safest choice.
-# Please avoid to pick license terms that are more restrictive than the
-# packaged work, as it may make Debian's contributions unacceptable upstream.
-# If you just want it to be GPL version 3, leave the following lines in.
-
-and is licensed under the GPL version 3,
-see `/usr/share/common-licenses/GPL-3'.
-
-# Please also look if there are files or directories which have a
-# different copyright/license attached and list them here.
+++ /dev/null
-usr/bin/kvstore
+++ /dev/null
-#!/usr/bin/make -f
-# -*- makefile -*-
-# Sample debian/rules that uses debhelper.
-# This file was originally written by Joey Hess and Craig Small.
-# As a special exception, when this file is copied by dh-make into a
-# dh-make output file, you may use that output file without restriction.
-# This special exception was added by Craig Small in version 0.37 of dh-make.
-
-# Uncomment this to turn on verbose mode.
-#export DH_VERBOSE=1
-build:
- dh_testdir
- scons
-
-install: build
- dh_testdir
- dh_testroot
- dh_prep
- dh_installdirs
- scons --prefix=$(CURDIR)/debian/tmp install
-
-clean:
- scons -c
- dh_clean
-
-binary-arch: install
- dh_testdir
- dh_testroot
- dh_installchangelogs
- dh_install
- dh_link
- #dh_strip
- dh_compress
- dh_fixperms
- dh_installdeb
- dh_gencontrol
- dh_md5sums
- dh_builddeb
-
-binary: binary-arch
-
-%:
- dh $@
--- /dev/null
+#include "protobufrpc.h"
+
+#include <google/protobuf/descriptor.h>
+#include <google/protobuf/io/zero_copy_stream_impl.h>
+#include <google/protobuf/io/coded_stream.h>
+#include <boost/shared_ptr.hpp>
+#include <boost/bind.hpp>
+
+#include <boost/functional/hash.hpp>
+
+using namespace std;
+
+using namespace boost;
+using asio::buffer;
+
+namespace bicker
+{
+
+template<typename T>
+static void* void_write(void* data, T val)
+{
+ *((T*)data) = val;
+ return (char*)data + sizeof(T);
+}
+
+class ProtoBufRpcServiceRequest
+{
+public:
+ ProtoBufRpcServiceRequest(
+ RpcController *ctrl,
+ const MethodDescriptor* method,
+ Message *request,
+ Message *response,
+ shared_ptr<ProtoBufRpcConnection> conn
+ )
+ :_ctrl(ctrl),
+ _method(method),
+ _request(request),
+ _response(response),
+ _conn(conn)
+ {
+ }
+
+ ~ProtoBufRpcServiceRequest()
+ {
+
+ }
+
+ static void run(ProtoBufRpcServiceRequest *req)
+ {
+
+ req->_conn->writeResponse(req->_response.get());
+
+ delete req;
+ }
+
+ shared_ptr<RpcController> _ctrl;
+ const MethodDescriptor *_method;
+ shared_ptr<Message> _request;
+ shared_ptr<Message> _response;
+ shared_ptr<ProtoBufRpcConnection> _conn;
+};
+
+ProtoBufRpcConnection::ProtoBufRpcConnection(asio::io_service& io_service,
+ Service *service)
+:_socket(io_service),
+ _strand(io_service),
+ _service(service),
+ _state(STATE_NONE)
+{
+}
+
+tcp::socket& ProtoBufRpcConnection::socket()
+{
+ return _socket;
+}
+
+void ProtoBufRpcConnection::start()
+{
+ _socket.async_read_some(_buffer.prepare(4096),
+ _strand.wrap(
+ boost::bind(&ProtoBufRpcConnection::handle_read, shared_from_this(),
+ asio::placeholders::error,
+ asio::placeholders::bytes_transferred)));
+}
+
+void ProtoBufRpcConnection::writeResponse(Message *msg)
+{
+ int rlen = msg->ByteSize();
+ int len = htonl(rlen);
+ int mlen = sizeof(len) + rlen;
+
+ void * data = asio::buffer_cast<void*>(_buffer.prepare(mlen));
+
+ data = void_write(data, len);
+
+ using google::protobuf::io::ArrayOutputStream;
+
+ ArrayOutputStream as(data, rlen);
+
+ msg->SerializeToZeroCopyStream(&as);
+
+ _buffer.commit(mlen);
+
+ asio::async_write(_socket,
+ _buffer.data(),
+ _strand.wrap(
+ boost::bind(&ProtoBufRpcConnection::handle_write,
+ shared_from_this(),
+ asio::placeholders::error,
+ asio::placeholders::bytes_transferred)));
+}
+
+
+void ProtoBufRpcConnection::handle_read(const error_code& e,
+ std::size_t bytes_transferred)
+{
+ if (!e)
+ {
+ _buffer.commit(bytes_transferred);
+
+ if (_state == STATE_NONE)
+ {
+ if (_buffer.size() >= sizeof(_id) + sizeof(_len))
+ {
+ string b(
+ buffers_begin(_buffer.data()),
+ buffers_begin(_buffer.data())
+ + sizeof(_id) + sizeof(_len)
+ );
+
+ _buffer.consume(sizeof(_id) + sizeof(_len));
+
+ _id = *((int*)b.c_str());
+ _id = ntohl(_id);
+
+ _len = *((unsigned int*)(b.c_str() + sizeof(_id)));
+ _len = ntohl(_len);
+
+ _state = STATE_HAVE_ID_AND_LEN;
+ }
+ else
+ {
+ start();
+ }
+ }
+
+ if (_state == STATE_HAVE_ID_AND_LEN || _state == STATE_WAITING_FOR_DATA)
+ {
+ if (_buffer.size() >= _len)
+ {
+ const MethodDescriptor* method =
+ _service->GetDescriptor()->method(_id);
+
+ Message *req = _service->GetRequestPrototype(method).New();
+ Message *resp = _service->GetResponsePrototype(method).New();
+
+ using google::protobuf::io::ArrayInputStream;
+ using google::protobuf::io::CodedInputStream;
+
+ const void* data = asio::buffer_cast<const void*>(
+ _buffer.data()
+ );
+ ArrayInputStream as(data, _len);
+ CodedInputStream is(&as);
+ is.SetTotalBytesLimit(512 * 1024 * 1024, -1);
+
+ if (!req->ParseFromCodedStream(&is))
+ {
+ throw std::runtime_error("ParseFromCodedStream");
+ }
+
+ _buffer.consume(_len);
+
+ ProtoBufRpcController *ctrl = new ProtoBufRpcController();
+ _service->CallMethod(method,
+ ctrl,
+ req,
+ resp,
+ NewCallback(
+ &ProtoBufRpcServiceRequest::run,
+ new ProtoBufRpcServiceRequest(
+ ctrl,
+ method,
+ req,
+ resp,
+ shared_from_this())
+ )
+ );
+ _state = STATE_NONE;
+ }
+ else
+ {
+ _state = STATE_WAITING_FOR_DATA;
+ start();
+ }
+ }
+
+ }
+ else
+ {
+ error_code ignored_ec;
+ _socket.shutdown(tcp::socket::shutdown_both, ignored_ec);
+ }
+}
+
+void ProtoBufRpcConnection::handle_write(const error_code& e,
+ std::size_t bytes_transferred)
+{
+ if (e)
+ {
+ error_code ignored_ec;
+ _socket.shutdown(tcp::socket::shutdown_both, ignored_ec);
+ }
+ else
+ {
+ _buffer.consume(bytes_transferred);
+
+ if (_buffer.size())
+ {
+ asio::async_write(_socket,
+ _buffer.data(),
+ _strand.wrap(
+ boost::bind(&ProtoBufRpcConnection::handle_write,
+ shared_from_this(),
+ asio::placeholders::error,
+ asio::placeholders::bytes_transferred)));
+ return;
+ }
+
+ _state = STATE_NONE;
+ start();
+ }
+}
+
+ProtoBufRpcServer::ProtoBufRpcServer()
+ :_io_service(new asio::io_service())
+{
+}
+
+bool ProtoBufRpcServer::registerService(uint16_t port,
+ shared_ptr<Service> service)
+{
+ // This is not thread safe
+
+ // The RegisteredService Constructor fires up the appropriate
+ // async accepts for the service
+ _services.push_back(shared_ptr<RegisteredService>(
+ new RegisteredService(
+ _io_service,
+ port,
+ service)));
+
+ return true;
+}
+
+void run_wrapper(asio::io_service *io_service)
+{
+ struct itimerval itimer;
+ setitimer(ITIMER_PROF, &itimer, NULL);
+
+ io_service->run();
+}
+
+void ProtoBufRpcServer::run()
+{
+ try
+ {
+ if (_services.size() == 0)
+ {
+ throw std::runtime_error("No services registered for ProtoBufRpcServer");
+ }
+
+ size_t nprocs = sysconf(_SC_NPROCESSORS_ONLN);
+
+ vector<shared_ptr<thread> > threads;
+ for (size_t i = 0; i < nprocs; ++i)
+ {
+ shared_ptr<thread> t(new thread(
+ boost::bind(
+ //&run_wrapper,
+ &asio::io_service::run,
+ _io_service.get())));
+ threads.push_back(t);
+ }
+
+ for (size_t i = 0; i < threads.size(); ++i)
+ {
+ threads[i]->join();
+ }
+ }
+ catch (std::exception &e)
+ {
+ std::cerr << "ProtoBufRpcService" << e.what() << std::endl;
+ }
+}
+
+void ProtoBufRpcServer::shutdown()
+{
+ _io_service->stop();
+}
+
+ProtoBufRpcServer::RegisteredService::RegisteredService(
+ shared_ptr<asio::io_service> io_service,
+ uint16_t port,
+ shared_ptr<Service> service
+ )
+:_io_service(io_service),
+ _port(port),
+ _service(service),
+ _endpoint(tcp::v4(), _port),
+ _acceptor(*_io_service),
+ _new_connection(new ProtoBufRpcConnection(*_io_service, _service.get()))
+{
+ _acceptor.open(_endpoint.protocol());
+ _acceptor.set_option(tcp::acceptor::reuse_address(true));
+ _acceptor.bind(_endpoint);
+ _acceptor.listen();
+ _acceptor.async_accept(_new_connection->socket(),
+ boost::bind(&ProtoBufRpcServer::RegisteredService::handle_accept,
+ this,
+ asio::placeholders::error));
+}
+
+void ProtoBufRpcServer::RegisteredService::handle_accept(const error_code& e)
+{
+ if (!e)
+ {
+ _new_connection->start();
+ _new_connection.reset(new ProtoBufRpcConnection(*_io_service, _service.get()));
+ _acceptor.async_accept(_new_connection->socket(),
+ boost::bind(&ProtoBufRpcServer::RegisteredService::handle_accept,
+ this,
+ asio::placeholders::error));
+ }
+
+}
+
+ProtoBufRpcController::ProtoBufRpcController()
+{
+}
+
+ProtoBufRpcController::~ProtoBufRpcController()
+{
+}
+
+void ProtoBufRpcController::Reset()
+{
+}
+
+bool ProtoBufRpcController::Failed() const
+{
+ return false;
+}
+
+string ProtoBufRpcController::ErrorText() const
+{
+ return "No Error";
+}
+
+void ProtoBufRpcController::StartCancel()
+{
+}
+
+void ProtoBufRpcController::SetFailed(const string &/*reason*/)
+{
+}
+
+bool ProtoBufRpcController::IsCanceled() const
+{
+ return false;
+}
+
+void ProtoBufRpcController::NotifyOnCancel(Closure * /*callback*/)
+{
+}
+
+class ProtoBufRpcChannel::MethodHandler
+ : public enable_shared_from_this<MethodHandler>,
+ private boost::noncopyable
+{
+public:
+ MethodHandler(auto_ptr<SocketCheckout> socket,
+ const MethodDescriptor * method,
+ RpcController * controller,
+ const Message * request,
+ Message * response,
+ Closure * done
+ )
+ :_socket(socket),
+ _method(method),
+ _controller(controller),
+ _request(request),
+ _response(response),
+ _done(done)
+ {
+ }
+
+ ~MethodHandler()
+ {
+ _socket.reset();
+ _done->Run();
+ }
+
+ static void execute(shared_ptr<MethodHandler> this_ptr)
+ {
+ int index = htonl(this_ptr->_method->index());
+ int rlen = this_ptr->_request->ByteSize();
+ int len = htonl(rlen);
+
+ int mlen = sizeof(index) + sizeof(len) + rlen;
+
+ void * data = asio::buffer_cast<void*>(this_ptr->_buffer.prepare(mlen));
+
+ data = void_write(data, index);
+ data = void_write(data, len);
+
+ using google::protobuf::io::ArrayOutputStream;
+
+ ArrayOutputStream as(data, rlen);
+
+ this_ptr->_request->SerializeToZeroCopyStream(&as);
+ this_ptr->_buffer.commit(mlen);
+
+ (*(this_ptr->_socket))->async_send(this_ptr->_buffer.data(),
+ boost::bind(&ProtoBufRpcChannel::MethodHandler::handle_write,
+ this_ptr,
+ asio::placeholders::error,
+ asio::placeholders::bytes_transferred));
+ }
+
+ static void handle_write(shared_ptr<MethodHandler> this_ptr,
+ const error_code& e,
+ std::size_t bytes_transferred)
+ {
+ if (!e)
+ {
+ this_ptr->_buffer.consume(bytes_transferred);
+
+ if (this_ptr->_buffer.size())
+ {
+ (*(this_ptr->_socket))->async_send(this_ptr->_buffer.data(),
+ boost::bind(&ProtoBufRpcChannel::MethodHandler::handle_write,
+ this_ptr,
+ asio::placeholders::error,
+ asio::placeholders::bytes_transferred));
+ return;
+ }
+
+ (*(this_ptr->_socket))->async_receive(
+ buffer(&this_ptr->_len, sizeof(this_ptr->_len)),
+ boost::bind(
+ &ProtoBufRpcChannel::MethodHandler::handle_read_len,
+ this_ptr,
+ asio::placeholders::error,
+ asio::placeholders::bytes_transferred)
+ );
+ }
+ else
+ {
+ this_ptr->_controller->SetFailed(e.message());
+ (*(this_ptr->_socket))->close();
+ }
+ }
+
+ static void handle_read_len(shared_ptr<MethodHandler> this_ptr,
+ const error_code& e,
+ std::size_t bytes_transferred)
+ {
+ if (!e && bytes_transferred == sizeof(this_ptr->_len))
+ {
+ this_ptr->_len = ntohl(this_ptr->_len);
+ (*(this_ptr->_socket))->async_receive(
+ this_ptr->_buffer.prepare(this_ptr->_len),
+ boost::bind(
+ &ProtoBufRpcChannel::MethodHandler::handle_read_response,
+ this_ptr,
+ asio::placeholders::error,
+ asio::placeholders::bytes_transferred
+ )
+ );
+ }
+ else
+ {
+ this_ptr->_controller->SetFailed(e.message());
+ (*(this_ptr->_socket))->close();
+ }
+ }
+
+ static void handle_read_response(shared_ptr<MethodHandler> this_ptr,
+ const error_code& e,
+ std::size_t bytes_transferred)
+ {
+ if (!e)
+ {
+ this_ptr->_buffer.commit(bytes_transferred);
+ if (this_ptr->_buffer.size() >= this_ptr->_len)
+ {
+ using google::protobuf::io::ArrayInputStream;
+ using google::protobuf::io::CodedInputStream;
+
+ const void* data = asio::buffer_cast<const void*>(
+ this_ptr->_buffer.data()
+ );
+ ArrayInputStream as(data, this_ptr->_len);
+ CodedInputStream is(&as);
+ is.SetTotalBytesLimit(512 * 1024 * 1024, -1);
+
+ if (!this_ptr->_response->ParseFromCodedStream(&is))
+ {
+ throw std::runtime_error("ParseFromCodedStream");
+ }
+
+ this_ptr->_buffer.consume(this_ptr->_len);
+ }
+ else
+ {
+ (*(this_ptr->_socket))->async_receive(
+ this_ptr->_buffer.prepare(this_ptr->_len - this_ptr->_buffer.size()),
+ boost::bind(
+ &ProtoBufRpcChannel::MethodHandler::handle_read_response,
+ this_ptr,
+ asio::placeholders::error,
+ asio::placeholders::bytes_transferred
+ )
+ );
+ return;
+ }
+ }
+ else
+ {
+ this_ptr->_controller->SetFailed(e.message());
+ (*(this_ptr->_socket))->close();
+ }
+ }
+
+private:
+ auto_ptr<SocketCheckout> _socket;
+ const MethodDescriptor * _method;
+ RpcController * _controller;
+ const Message * _request;
+ Message * _response;
+ Closure * _done;
+ asio::streambuf _buffer;
+ unsigned int _len;
+ bool _status;
+ unsigned int _sent;
+};
+
+
+ProtoBufRpcChannel::ProtoBufRpcChannel(const string &remotehost,
+ const string &port)
+ :_remote_host(remotehost), _port(port),
+ _resolver(_io_service),
+ _acceptor(_io_service),
+ _pool(2000, _io_service),
+ _lame_socket(_io_service),
+ _thread()
+// &asio::io_service::run,
+// &_io_service)))
+{
+
+ tcp::resolver::query query(_remote_host, _port);
+ tcp::resolver::iterator endpoint_iterator = _resolver.resolve(query);
+ tcp::resolver::iterator end;
+
+ error_code error = asio::error::host_not_found;
+
+ if (endpoint_iterator == end) throw syserr::system_error(error);
+
+ _pool.setEndpoint(*endpoint_iterator);
+
+ tcp::endpoint e(tcp::v4(), 0);
+ _acceptor.open(e.protocol());
+ _acceptor.set_option(tcp::acceptor::reuse_address(true));
+ _acceptor.bind(e);
+ _acceptor.listen();
+ _acceptor.async_accept(_lame_socket,
+ boost::bind(&ProtoBufRpcChannel::lame_handle_accept, this,
+ asio::placeholders::error));
+
+ _thread = shared_ptr<thread>(new thread(
+ boost::bind(
+ &asio::io_service::run,
+ &_io_service)));
+}
+
+void ProtoBufRpcChannel::lame_handle_accept(const error_code &err)
+{
+ if (!err)
+ {
+ _acceptor.async_accept(_lame_socket,
+ boost::bind(&ProtoBufRpcChannel::lame_handle_accept,
+ this,
+ asio::placeholders::error));
+ }
+}
+
+ProtoBufRpcChannel::~ProtoBufRpcChannel()
+{
+ _pool.cancelAndClear();
+
+ _io_service.stop();
+
+ _thread->join();
+}
+
+void ProtoBufRpcChannel::CallMethod(
+ const MethodDescriptor * method,
+ RpcController * controller,
+ const Message * request,
+ Message * response,
+ Closure * done)
+{
+ shared_ptr<MethodHandler> h(
+ new MethodHandler(
+ auto_ptr<SocketCheckout>(new SocketCheckout(&_pool)),
+ method,
+ controller,
+ request,
+ response,
+ done
+ ));
+
+ MethodHandler::execute(h);
+}
+
+} // namespace bicker
--- /dev/null
+#ifndef __PROTOBUFRPC_H__
+#define __PROTOBUFRPC_H__
+
+#include <stdint.h>
+#include <boost/shared_ptr.hpp>
+#include <boost/enable_shared_from_this.hpp>
+#include <boost/thread.hpp>
+#include <boost/function.hpp>
+#include <queue>
+#include <set>
+#include "socket_pool.h"
+#include "util.h"
+
+#include <google/protobuf/stubs/common.h>
+#include <google/protobuf/generated_message_reflection.h>
+#include <google/protobuf/repeated_field.h>
+#include <google/protobuf/extension_set.h>
+#include <google/protobuf/service.h>
+
+using namespace std;
+using namespace google::protobuf;
+using namespace boost;
+
+namespace bicker
+{
+
+class ProtoBufRpcService;
+
+class ProtoBufRpcConnection
+ : public enable_shared_from_this<ProtoBufRpcConnection>,
+ private boost::noncopyable
+{
+public:
+ explicit ProtoBufRpcConnection(asio::io_service& io_service,
+ Service *_service);
+
+ asio::ip::tcp::socket& socket();
+
+ void start();
+
+ void writeResponse(Message *msg);
+
+
+private:
+ void handle_read(const error_code& e,
+ std::size_t bytes_transferred);
+
+ void handle_write(const error_code& e,
+ std::size_t bytes_transferred);
+
+ tcp::socket _socket;
+
+
+ asio::io_service::strand _strand;
+
+ Service *_service;
+
+ asio::streambuf _buffer;
+
+ int _id;
+ unsigned int _len;
+
+ enum {
+ STATE_NONE,
+ STATE_HAVE_ID_AND_LEN,
+ STATE_WAITING_FOR_DATA,
+ STATE_FAIL,
+ } _state;
+};
+
+
+class ProtoBufRpcServer
+{
+public:
+ ProtoBufRpcServer();
+
+ bool registerService(uint16_t port,
+ shared_ptr< Service> service);
+
+ // So we can call this as a thread.
+ void run();
+
+ // So we can stop..
+ void shutdown();
+
+protected:
+
+ class RegisteredService
+ {
+ public:
+ RegisteredService(
+ shared_ptr<asio::io_service> io_service,
+ uint16_t port,
+ shared_ptr<Service> service
+ );
+
+ void handle_accept(const error_code& e);
+
+ protected:
+ // Ref to parent's
+ shared_ptr<asio::io_service> _io_service;
+ uint16_t _port;
+ shared_ptr<Service> _service;
+ tcp::endpoint _endpoint;
+ tcp::acceptor _acceptor;
+ shared_ptr<ProtoBufRpcConnection> _new_connection;
+ };
+
+ list<shared_ptr<RegisteredService> > _services;
+ shared_ptr<asio::io_service> _io_service;
+};
+
+class ProtoBufRpcController : public RpcController
+{
+public:
+ ProtoBufRpcController();
+ virtual ~ProtoBufRpcController();
+
+ virtual void Reset();
+ virtual bool Failed() const;
+ virtual string ErrorText() const;
+ virtual void StartCancel();
+
+ virtual void SetFailed(const string &reason);
+ virtual bool IsCanceled() const;
+ virtual void NotifyOnCancel(Closure *callback);
+};
+
+class ProtoBufRpcChannel
+ : public RpcChannel,
+ public enable_shared_from_this<ProtoBufRpcChannel>,
+ private boost::noncopyable
+{
+public:
+ ProtoBufRpcChannel(const string &remotehost, const string &port);
+
+ virtual ~ProtoBufRpcChannel();
+
+ virtual void CallMethod(
+ const MethodDescriptor * method,
+ RpcController * controller,
+ const Message * request,
+ Message * response,
+ Closure * done);
+
+protected:
+ shared_ptr<tcp::socket> getSocket();
+ void putSocket(shared_ptr<tcp::socket>);
+
+private:
+ class MethodHandler;
+
+ string _remote_host;
+ string _port;
+
+
+ asio::io_service _io_service;
+ tcp::resolver _resolver;
+ // This exists to keep the io service running
+ tcp::acceptor _acceptor;
+
+ SocketPool _pool;
+
+ asio::ip::tcp::socket _lame_socket;
+ void lame_handle_accept(const error_code &err);
+
+ shared_ptr<boost::thread> _thread;
+
+};
+
+} // namespace bicker
+
+#endif
+++ /dev/null
-Import('env')
-
-base_files = ['protobufrpc.cc', 'socket_pool.cc', 'workqueue.cc']
-
-protobufrpc = env.StaticLibrary('protobufrpc', base_files)
-
-Return('protobufrpc')
+++ /dev/null
-import os
-
-AddOption('--prefix',
- dest='prefix',
- type='string',
- nargs=1,
- action='store',
- metavar='DIR',
- help='installation prefix')
-
-env = Environment(CXXFLAGS='-g -fPIC', PREFIX=GetOption('prefix'))
-
-for envvar in ('HOME', 'DISTCC_DIR', 'DISTCC_HOSTS', 'CCACHE_DIR',
- 'INTERCEPTOR_SOCKET', 'ENFORGE_DIGEST_CACHE',
- 'ENFORGE_CACHE_HOST', 'ENFORGE_CACHE_PORT'):
- if envvar in os.environ:
- env['ENV'][envvar] = os.environ[envvar]
-
-if env['PREFIX'] is not None:
- bin_dest = env['PREFIX'] + '/usr/bin'
-else:
- bin_dest = '/usr/bin'
-
-base_files = ['protobufrpc.cc', 'socket_pool.cc']
-
-if not env.has_key('LIBS'):
- env['LIBS'] = []
-
-SConscript(['SConscript'])
+++ /dev/null
-#include "protobufrpc.h"
-
-#include <google/protobuf/descriptor.h>
-#include <google/protobuf/io/zero_copy_stream_impl.h>
-#include <google/protobuf/io/coded_stream.h>
-#include <boost/shared_ptr.hpp>
-#include <boost/bind.hpp>
-
-#include <boost/functional/hash.hpp>
-
-using namespace std;
-
-using namespace boost;
-using asio::buffer;
-
-namespace bicker
-{
-
-template<typename T>
-static void* void_write(void* data, T val)
-{
- *((T*)data) = val;
- return (char*)data + sizeof(T);
-}
-
-class ProtoBufRpcServiceRequest
-{
-public:
- ProtoBufRpcServiceRequest(
- RpcController *ctrl,
- const MethodDescriptor* method,
- Message *request,
- Message *response,
- shared_ptr<ProtoBufRpcConnection> conn
- )
- :_ctrl(ctrl),
- _method(method),
- _request(request),
- _response(response),
- _conn(conn)
- {
- }
-
- ~ProtoBufRpcServiceRequest()
- {
-
- }
-
- static void run(ProtoBufRpcServiceRequest *req)
- {
-
- req->_conn->writeResponse(req->_response.get());
-
- delete req;
- }
-
- shared_ptr<RpcController> _ctrl;
- const MethodDescriptor *_method;
- shared_ptr<Message> _request;
- shared_ptr<Message> _response;
- shared_ptr<ProtoBufRpcConnection> _conn;
-};
-
-ProtoBufRpcConnection::ProtoBufRpcConnection(asio::io_service& io_service,
- Service *service)
-:_socket(io_service),
- _strand(io_service),
- _service(service),
- _state(STATE_NONE)
-{
-}
-
-tcp::socket& ProtoBufRpcConnection::socket()
-{
- return _socket;
-}
-
-void ProtoBufRpcConnection::start()
-{
- _socket.async_read_some(_buffer.prepare(4096),
- _strand.wrap(
- boost::bind(&ProtoBufRpcConnection::handle_read, shared_from_this(),
- asio::placeholders::error,
- asio::placeholders::bytes_transferred)));
-}
-
-void ProtoBufRpcConnection::writeResponse(Message *msg)
-{
- int rlen = msg->ByteSize();
- int len = htonl(rlen);
- int mlen = sizeof(len) + rlen;
-
- void * data = asio::buffer_cast<void*>(_buffer.prepare(mlen));
-
- data = void_write(data, len);
-
- using google::protobuf::io::ArrayOutputStream;
-
- ArrayOutputStream as(data, rlen);
-
- msg->SerializeToZeroCopyStream(&as);
-
- _buffer.commit(mlen);
-
- asio::async_write(_socket,
- _buffer.data(),
- _strand.wrap(
- boost::bind(&ProtoBufRpcConnection::handle_write,
- shared_from_this(),
- asio::placeholders::error,
- asio::placeholders::bytes_transferred)));
-}
-
-
-void ProtoBufRpcConnection::handle_read(const error_code& e,
- std::size_t bytes_transferred)
-{
- if (!e)
- {
- _buffer.commit(bytes_transferred);
-
- if (_state == STATE_NONE)
- {
- if (_buffer.size() >= sizeof(_id) + sizeof(_len))
- {
- string b(
- buffers_begin(_buffer.data()),
- buffers_begin(_buffer.data())
- + sizeof(_id) + sizeof(_len)
- );
-
- _buffer.consume(sizeof(_id) + sizeof(_len));
-
- _id = *((int*)b.c_str());
- _id = ntohl(_id);
-
- _len = *((unsigned int*)(b.c_str() + sizeof(_id)));
- _len = ntohl(_len);
-
- _state = STATE_HAVE_ID_AND_LEN;
- }
- else
- {
- start();
- }
- }
-
- if (_state == STATE_HAVE_ID_AND_LEN || _state == STATE_WAITING_FOR_DATA)
- {
- if (_buffer.size() >= _len)
- {
- const MethodDescriptor* method =
- _service->GetDescriptor()->method(_id);
-
- Message *req = _service->GetRequestPrototype(method).New();
- Message *resp = _service->GetResponsePrototype(method).New();
-
- using google::protobuf::io::ArrayInputStream;
- using google::protobuf::io::CodedInputStream;
-
- const void* data = asio::buffer_cast<const void*>(
- _buffer.data()
- );
- ArrayInputStream as(data, _len);
- CodedInputStream is(&as);
- is.SetTotalBytesLimit(512 * 1024 * 1024, -1);
-
- if (!req->ParseFromCodedStream(&is))
- {
- throw std::runtime_error("ParseFromCodedStream");
- }
-
- _buffer.consume(_len);
-
- ProtoBufRpcController *ctrl = new ProtoBufRpcController();
- _service->CallMethod(method,
- ctrl,
- req,
- resp,
- NewCallback(
- &ProtoBufRpcServiceRequest::run,
- new ProtoBufRpcServiceRequest(
- ctrl,
- method,
- req,
- resp,
- shared_from_this())
- )
- );
- _state = STATE_NONE;
- }
- else
- {
- _state = STATE_WAITING_FOR_DATA;
- start();
- }
- }
-
- }
- else
- {
- error_code ignored_ec;
- _socket.shutdown(tcp::socket::shutdown_both, ignored_ec);
- }
-}
-
-void ProtoBufRpcConnection::handle_write(const error_code& e,
- std::size_t bytes_transferred)
-{
- if (e)
- {
- error_code ignored_ec;
- _socket.shutdown(tcp::socket::shutdown_both, ignored_ec);
- }
- else
- {
- _buffer.consume(bytes_transferred);
-
- if (_buffer.size())
- {
- asio::async_write(_socket,
- _buffer.data(),
- _strand.wrap(
- boost::bind(&ProtoBufRpcConnection::handle_write,
- shared_from_this(),
- asio::placeholders::error,
- asio::placeholders::bytes_transferred)));
- return;
- }
-
- _state = STATE_NONE;
- start();
- }
-}
-
-ProtoBufRpcServer::ProtoBufRpcServer()
- :_io_service(new asio::io_service())
-{
-}
-
-bool ProtoBufRpcServer::registerService(uint16_t port,
- shared_ptr<Service> service)
-{
- // This is not thread safe
-
- // The RegisteredService Constructor fires up the appropriate
- // async accepts for the service
- _services.push_back(shared_ptr<RegisteredService>(
- new RegisteredService(
- _io_service,
- port,
- service)));
-
- return true;
-}
-
-void run_wrapper(asio::io_service *io_service)
-{
- struct itimerval itimer;
- setitimer(ITIMER_PROF, &itimer, NULL);
-
- io_service->run();
-}
-
-void ProtoBufRpcServer::run()
-{
- try
- {
- if (_services.size() == 0)
- {
- throw std::runtime_error("No services registered for ProtoBufRpcServer");
- }
-
- size_t nprocs = sysconf(_SC_NPROCESSORS_ONLN);
-
- vector<shared_ptr<thread> > threads;
- for (size_t i = 0; i < nprocs; ++i)
- {
- shared_ptr<thread> t(new thread(
- boost::bind(
- //&run_wrapper,
- &asio::io_service::run,
- _io_service.get())));
- threads.push_back(t);
- }
-
- for (size_t i = 0; i < threads.size(); ++i)
- {
- threads[i]->join();
- }
- }
- catch (std::exception &e)
- {
- std::cerr << "ProtoBufRpcService" << e.what() << std::endl;
- }
-}
-
-void ProtoBufRpcServer::shutdown()
-{
- _io_service->stop();
-}
-
-ProtoBufRpcServer::RegisteredService::RegisteredService(
- shared_ptr<asio::io_service> io_service,
- uint16_t port,
- shared_ptr<Service> service
- )
-:_io_service(io_service),
- _port(port),
- _service(service),
- _endpoint(tcp::v4(), _port),
- _acceptor(*_io_service),
- _new_connection(new ProtoBufRpcConnection(*_io_service, _service.get()))
-{
- _acceptor.open(_endpoint.protocol());
- _acceptor.set_option(tcp::acceptor::reuse_address(true));
- _acceptor.bind(_endpoint);
- _acceptor.listen();
- _acceptor.async_accept(_new_connection->socket(),
- boost::bind(&ProtoBufRpcServer::RegisteredService::handle_accept,
- this,
- asio::placeholders::error));
-}
-
-void ProtoBufRpcServer::RegisteredService::handle_accept(const error_code& e)
-{
- if (!e)
- {
- _new_connection->start();
- _new_connection.reset(new ProtoBufRpcConnection(*_io_service, _service.get()));
- _acceptor.async_accept(_new_connection->socket(),
- boost::bind(&ProtoBufRpcServer::RegisteredService::handle_accept,
- this,
- asio::placeholders::error));
- }
-
-}
-
-ProtoBufRpcController::ProtoBufRpcController()
-{
-}
-
-ProtoBufRpcController::~ProtoBufRpcController()
-{
-}
-
-void ProtoBufRpcController::Reset()
-{
-}
-
-bool ProtoBufRpcController::Failed() const
-{
- return false;
-}
-
-string ProtoBufRpcController::ErrorText() const
-{
- return "No Error";
-}
-
-void ProtoBufRpcController::StartCancel()
-{
-}
-
-void ProtoBufRpcController::SetFailed(const string &/*reason*/)
-{
-}
-
-bool ProtoBufRpcController::IsCanceled() const
-{
- return false;
-}
-
-void ProtoBufRpcController::NotifyOnCancel(Closure * /*callback*/)
-{
-}
-
-class ProtoBufRpcChannel::MethodHandler
- : public enable_shared_from_this<MethodHandler>,
- private boost::noncopyable
-{
-public:
- MethodHandler(auto_ptr<SocketCheckout> socket,
- const MethodDescriptor * method,
- RpcController * controller,
- const Message * request,
- Message * response,
- Closure * done
- )
- :_socket(socket),
- _method(method),
- _controller(controller),
- _request(request),
- _response(response),
- _done(done)
- {
- }
-
- ~MethodHandler()
- {
- _socket.reset();
- _done->Run();
- }
-
- static void execute(shared_ptr<MethodHandler> this_ptr)
- {
- int index = htonl(this_ptr->_method->index());
- int rlen = this_ptr->_request->ByteSize();
- int len = htonl(rlen);
-
- int mlen = sizeof(index) + sizeof(len) + rlen;
-
- void * data = asio::buffer_cast<void*>(this_ptr->_buffer.prepare(mlen));
-
- data = void_write(data, index);
- data = void_write(data, len);
-
- using google::protobuf::io::ArrayOutputStream;
-
- ArrayOutputStream as(data, rlen);
-
- this_ptr->_request->SerializeToZeroCopyStream(&as);
- this_ptr->_buffer.commit(mlen);
-
- (*(this_ptr->_socket))->async_send(this_ptr->_buffer.data(),
- boost::bind(&ProtoBufRpcChannel::MethodHandler::handle_write,
- this_ptr,
- asio::placeholders::error,
- asio::placeholders::bytes_transferred));
- }
-
- static void handle_write(shared_ptr<MethodHandler> this_ptr,
- const error_code& e,
- std::size_t bytes_transferred)
- {
- if (!e)
- {
- this_ptr->_buffer.consume(bytes_transferred);
-
- if (this_ptr->_buffer.size())
- {
- (*(this_ptr->_socket))->async_send(this_ptr->_buffer.data(),
- boost::bind(&ProtoBufRpcChannel::MethodHandler::handle_write,
- this_ptr,
- asio::placeholders::error,
- asio::placeholders::bytes_transferred));
- return;
- }
-
- (*(this_ptr->_socket))->async_receive(
- buffer(&this_ptr->_len, sizeof(this_ptr->_len)),
- boost::bind(
- &ProtoBufRpcChannel::MethodHandler::handle_read_len,
- this_ptr,
- asio::placeholders::error,
- asio::placeholders::bytes_transferred)
- );
- }
- else
- {
- this_ptr->_controller->SetFailed(e.message());
- (*(this_ptr->_socket))->close();
- }
- }
-
- static void handle_read_len(shared_ptr<MethodHandler> this_ptr,
- const error_code& e,
- std::size_t bytes_transferred)
- {
- if (!e && bytes_transferred == sizeof(this_ptr->_len))
- {
- this_ptr->_len = ntohl(this_ptr->_len);
- (*(this_ptr->_socket))->async_receive(
- this_ptr->_buffer.prepare(this_ptr->_len),
- boost::bind(
- &ProtoBufRpcChannel::MethodHandler::handle_read_response,
- this_ptr,
- asio::placeholders::error,
- asio::placeholders::bytes_transferred
- )
- );
- }
- else
- {
- this_ptr->_controller->SetFailed(e.message());
- (*(this_ptr->_socket))->close();
- }
- }
-
- static void handle_read_response(shared_ptr<MethodHandler> this_ptr,
- const error_code& e,
- std::size_t bytes_transferred)
- {
- if (!e)
- {
- this_ptr->_buffer.commit(bytes_transferred);
- if (this_ptr->_buffer.size() >= this_ptr->_len)
- {
- using google::protobuf::io::ArrayInputStream;
- using google::protobuf::io::CodedInputStream;
-
- const void* data = asio::buffer_cast<const void*>(
- this_ptr->_buffer.data()
- );
- ArrayInputStream as(data, this_ptr->_len);
- CodedInputStream is(&as);
- is.SetTotalBytesLimit(512 * 1024 * 1024, -1);
-
- if (!this_ptr->_response->ParseFromCodedStream(&is))
- {
- throw std::runtime_error("ParseFromCodedStream");
- }
-
- this_ptr->_buffer.consume(this_ptr->_len);
- }
- else
- {
- (*(this_ptr->_socket))->async_receive(
- this_ptr->_buffer.prepare(this_ptr->_len - this_ptr->_buffer.size()),
- boost::bind(
- &ProtoBufRpcChannel::MethodHandler::handle_read_response,
- this_ptr,
- asio::placeholders::error,
- asio::placeholders::bytes_transferred
- )
- );
- return;
- }
- }
- else
- {
- this_ptr->_controller->SetFailed(e.message());
- (*(this_ptr->_socket))->close();
- }
- }
-
-private:
- auto_ptr<SocketCheckout> _socket;
- const MethodDescriptor * _method;
- RpcController * _controller;
- const Message * _request;
- Message * _response;
- Closure * _done;
- asio::streambuf _buffer;
- unsigned int _len;
- bool _status;
- unsigned int _sent;
-};
-
-
-ProtoBufRpcChannel::ProtoBufRpcChannel(const string &remotehost,
- const string &port)
- :_remote_host(remotehost), _port(port),
- _resolver(_io_service),
- _acceptor(_io_service),
- _pool(2000, _io_service),
- _lame_socket(_io_service),
- _thread()
-// &asio::io_service::run,
-// &_io_service)))
-{
-
- tcp::resolver::query query(_remote_host, _port);
- tcp::resolver::iterator endpoint_iterator = _resolver.resolve(query);
- tcp::resolver::iterator end;
-
- error_code error = asio::error::host_not_found;
-
- if (endpoint_iterator == end) throw syserr::system_error(error);
-
- _pool.setEndpoint(*endpoint_iterator);
-
- tcp::endpoint e(tcp::v4(), 0);
- _acceptor.open(e.protocol());
- _acceptor.set_option(tcp::acceptor::reuse_address(true));
- _acceptor.bind(e);
- _acceptor.listen();
- _acceptor.async_accept(_lame_socket,
- boost::bind(&ProtoBufRpcChannel::lame_handle_accept, this,
- asio::placeholders::error));
-
- _thread = shared_ptr<thread>(new thread(
- boost::bind(
- &asio::io_service::run,
- &_io_service)));
-}
-
-void ProtoBufRpcChannel::lame_handle_accept(const error_code &err)
-{
- if (!err)
- {
- _acceptor.async_accept(_lame_socket,
- boost::bind(&ProtoBufRpcChannel::lame_handle_accept,
- this,
- asio::placeholders::error));
- }
-}
-
-ProtoBufRpcChannel::~ProtoBufRpcChannel()
-{
- _pool.cancelAndClear();
-
- _io_service.stop();
-
- _thread->join();
-}
-
-void ProtoBufRpcChannel::CallMethod(
- const MethodDescriptor * method,
- RpcController * controller,
- const Message * request,
- Message * response,
- Closure * done)
-{
- shared_ptr<MethodHandler> h(
- new MethodHandler(
- auto_ptr<SocketCheckout>(new SocketCheckout(&_pool)),
- method,
- controller,
- request,
- response,
- done
- ));
-
- MethodHandler::execute(h);
-}
-
-} // namespace bicker
+++ /dev/null
-#ifndef __PROTOBUFRPC_H__
-#define __PROTOBUFRPC_H__
-
-#include <stdint.h>
-#include <boost/shared_ptr.hpp>
-#include <boost/enable_shared_from_this.hpp>
-#include <boost/thread.hpp>
-#include <boost/function.hpp>
-#include <queue>
-#include <set>
-#include "socket_pool.h"
-#include "util.h"
-
-#include <google/protobuf/stubs/common.h>
-#include <google/protobuf/generated_message_reflection.h>
-#include <google/protobuf/repeated_field.h>
-#include <google/protobuf/extension_set.h>
-#include <google/protobuf/service.h>
-
-using namespace std;
-using namespace google::protobuf;
-using namespace boost;
-
-namespace bicker
-{
-
-class ProtoBufRpcService;
-
-class ProtoBufRpcConnection
- : public enable_shared_from_this<ProtoBufRpcConnection>,
- private boost::noncopyable
-{
-public:
- explicit ProtoBufRpcConnection(asio::io_service& io_service,
- Service *_service);
-
- asio::ip::tcp::socket& socket();
-
- void start();
-
- void writeResponse(Message *msg);
-
-
-private:
- void handle_read(const error_code& e,
- std::size_t bytes_transferred);
-
- void handle_write(const error_code& e,
- std::size_t bytes_transferred);
-
- tcp::socket _socket;
-
-
- asio::io_service::strand _strand;
-
- Service *_service;
-
- asio::streambuf _buffer;
-
- int _id;
- unsigned int _len;
-
- enum {
- STATE_NONE,
- STATE_HAVE_ID_AND_LEN,
- STATE_WAITING_FOR_DATA,
- STATE_FAIL,
- } _state;
-};
-
-
-class ProtoBufRpcServer
-{
-public:
- ProtoBufRpcServer();
-
- bool registerService(uint16_t port,
- shared_ptr< Service> service);
-
- // So we can call this as a thread.
- void run();
-
- // So we can stop..
- void shutdown();
-
-protected:
-
- class RegisteredService
- {
- public:
- RegisteredService(
- shared_ptr<asio::io_service> io_service,
- uint16_t port,
- shared_ptr<Service> service
- );
-
- void handle_accept(const error_code& e);
-
- protected:
- // Ref to parent's
- shared_ptr<asio::io_service> _io_service;
- uint16_t _port;
- shared_ptr<Service> _service;
- tcp::endpoint _endpoint;
- tcp::acceptor _acceptor;
- shared_ptr<ProtoBufRpcConnection> _new_connection;
- };
-
- list<shared_ptr<RegisteredService> > _services;
- shared_ptr<asio::io_service> _io_service;
-};
-
-class ProtoBufRpcController : public RpcController
-{
-public:
- ProtoBufRpcController();
- virtual ~ProtoBufRpcController();
-
- virtual void Reset();
- virtual bool Failed() const;
- virtual string ErrorText() const;
- virtual void StartCancel();
-
- virtual void SetFailed(const string &reason);
- virtual bool IsCanceled() const;
- virtual void NotifyOnCancel(Closure *callback);
-};
-
-class ProtoBufRpcChannel
- : public RpcChannel,
- public enable_shared_from_this<ProtoBufRpcChannel>,
- private boost::noncopyable
-{
-public:
- ProtoBufRpcChannel(const string &remotehost, const string &port);
-
- virtual ~ProtoBufRpcChannel();
-
- virtual void CallMethod(
- const MethodDescriptor * method,
- RpcController * controller,
- const Message * request,
- Message * response,
- Closure * done);
-
-protected:
- shared_ptr<tcp::socket> getSocket();
- void putSocket(shared_ptr<tcp::socket>);
-
-private:
- class MethodHandler;
-
- string _remote_host;
- string _port;
-
-
- asio::io_service _io_service;
- tcp::resolver _resolver;
- // This exists to keep the io service running
- tcp::acceptor _acceptor;
-
- SocketPool _pool;
-
- asio::ip::tcp::socket _lame_socket;
- void lame_handle_accept(const error_code &err);
-
- shared_ptr<boost::thread> _thread;
-
-};
-
-} // namespace bicker
-
-#endif
+++ /dev/null
-#include "socket_pool.h"
-
-SocketPool::SocketPool(int max_sockets,
- asio::io_service &io_svc
- )
-:_issued(0),
- _max_sockets(max_sockets),
- _io_service(io_svc)
-{
-}
-
-void SocketPool::setEndpoint(const tcp::endpoint &endpoint)
-{
- _endpoint = endpoint;
-}
-
-void SocketPool::cancelAndClear()
-{
- for (set<shared_ptr<tcp::socket> >::iterator i = _set.begin();
- i != _set.end();
- ++i)
- {
- (*i)->cancel();
- }
-
- while (!_queue.empty()) _queue.pop();
- _set.clear();
-}
-
-shared_ptr<tcp::socket> SocketPool::getSocket()
-{
- mutex::scoped_lock lock(_sockets_lock);
-
- while (_queue.size() == 0 && _issued >= _max_sockets)
- _sockets_non_empty.wait(lock);
-
- if (_queue.size())
- {
- shared_ptr<tcp::socket> socket = _queue.front();
- _queue.pop();
-
- return socket;
- }
- else
- {
- ++_issued;
- error_code error = asio::error::host_not_found;
-
- shared_ptr<tcp::socket> socket(new tcp::socket(_io_service));
- socket->connect(_endpoint, error);
-
- if (error) throw syserr::system_error(error);
-
- _set.insert(socket);
-
- return socket;
- }
-}
-
-void SocketPool::putSocket(shared_ptr<tcp::socket> socket)
-{
- mutex::scoped_lock lock(_sockets_lock);
-
- if (!socket->is_open())
- {
- cerr << "socket closed\n";
- --_issued;
- _set.erase(socket);
- }
- else
- {
- _queue.push(socket);
- }
-
- _sockets_non_empty.notify_one();
-}
-
-SocketCheckout::SocketCheckout(SocketPool *pool)
- :_socket(pool->getSocket()),
- _pool(pool)
-{
-}
-
-SocketCheckout::~SocketCheckout()
-{
- _pool->putSocket(_socket);
-}
-tcp::socket& SocketCheckout::operator*()
-{
- return *_socket;
-}
-
-tcp::socket* SocketCheckout::operator->()
-{
- return _socket.get();
-}
-
-shared_ptr<tcp::socket>& SocketCheckout::socket()
-{
- return _socket;
-}
+++ /dev/null
-#ifndef _SOCKET_POOL_H_
-#define _SOCKET_POOL_H_ 1
-
-#include <iostream>
-#include <set>
-#include <queue>
-#include <boost/shared_ptr.hpp>
-#include <boost/thread.hpp>
-
-#include "util.h"
-
-using namespace std;
-using namespace boost;
-
-
-class SocketPool
-{
-public:
- SocketPool(int max_streams,
- asio::io_service &io_svc);
- void setEndpoint(const tcp::endpoint &endpoint);
- void cancelAndClear();
- shared_ptr<tcp::socket> getSocket();
- void putSocket(shared_ptr<tcp::socket> socket);
-private:
- int _issued;
- int _max_sockets;
- mutex _sockets_lock;
- condition_variable _sockets_non_empty;
- asio::io_service &_io_service;
- tcp::endpoint _endpoint;
- queue<shared_ptr<tcp::socket> > _queue;
- set<shared_ptr<tcp::socket> > _set;
-};
-
-class SocketCheckout
-{
-public:
- SocketCheckout(SocketPool *pool);
- ~SocketCheckout();
-
- tcp::socket& operator*();
- tcp::socket* operator->();
-
- shared_ptr<tcp::socket>& socket();
-
-private:
- shared_ptr<tcp::socket> _socket;
- SocketPool *_pool;
-};
-
-#endif
+++ /dev/null
-#ifndef _UTIL_H_
-#define _UTIL_H_ 1
-
-#include <boost/version.hpp>
-
-#if BOOST_VERSION <= 103500
-#include <boost/thread.hpp>
-#include <boost/thread/mutex.hpp>
-#include <boost/bind.hpp>
-#include <asio.hpp>
-#include <asio/buffer.hpp>
-//typedef boost::condition condition_variable ;
-//typedef boost::detail::thread::scoped_lock<boost::mutex> scoped_lock;
-using asio::ip::tcp;
-using asio::error_code;
-using asio::buffers_begin;
-namespace syserr=asio;
-#else
-#if BOOST_VERSION < 104000
-#include <boost/asio.hpp>
-#include <boost/thread/mutex.hpp>
-#include <boost/bind.hpp>
-using boost::asio::ip::tcp;
-using boost::system::error_code;
-using boost::system::system_error;
-namespace syserr=boost::system;
-#else
-#include <boost/asio.hpp>
-#include <boost/thread/mutex.hpp>
-using boost::asio::ip::tcp;
-using boost::system::error_code;
-using boost::system::system_error;
-namespace syserr=boost::system;
-#endif
-#endif
-
-#endif
+++ /dev/null
-#include "workqueue.h"
-#include <boost/thread.hpp>
-
-namespace bicker
-{
-
-WorkQueue::WorkQueue()
- :_thread_count(0), _min_threads(1), _max_threads(50), _running(true)
-{
- for (int i = 0; i < _min_threads; ++i)
- {
- spawnThread();
- }
-}
-
-WorkQueue::WorkQueue(int thread_count)
- :_thread_count(0),
- _min_threads(1), _max_threads(thread_count), _running(true)
-{
- for (int i = 0; i < _min_threads; ++i)
- {
- spawnThread();
- }
-}
-
-WorkQueue::~WorkQueue()
-{
- _running = false;
-
- {
- mutex::scoped_lock lock(_queue_lock);
- _queue_non_empty.notify_all();
- }
-
- _threads.join_all();
-}
-
-void WorkQueue::spawnThread()
-{
- ++_thread_count;
- _threads.create_thread(Worker(this));
-}
-
-shared_ptr<WorkUnit> WorkQueue::get()
-{
- mutex::scoped_lock lock(_queue_lock);
-
- while (_queue.size() == 0)
- {
- _queue_non_empty.wait(lock);
- if (!_running) throw interrupted_error();
- }
-
- shared_ptr<WorkUnit> back = _queue.front();
- _queue.pop();
-
- if (_queue.size() > 0 && _thread_count < _max_threads) spawnThread();
-
- return back;
-}
-
-void WorkQueue::put(shared_ptr<WorkUnit> work_unit)
-{
- mutex::scoped_lock lock(_queue_lock);
-
- _queue.push(work_unit);
-
- _queue_non_empty.notify_one();
-}
-
-WorkQueue::Worker::Worker(WorkQueue* queue)
- :_queue(queue)
-{
-}
-
-void WorkQueue::Worker::operator()()
-{
- while (true)
- {
- try
- {
- shared_ptr<WorkUnit> unit = _queue->get();
-
- unit->run();
- }
- catch (interrupted_error)
- {
- return;
- }
- }
-}
-
-TaskNotification::TaskNotification()
-:_expected(0), _count(0), _fail_count(0)
-{
-}
-
-void TaskNotification::registerTask()
-{
- mutex::scoped_lock lock(_lock);
- ++_expected;
-}
-
-void TaskNotification::completeTask(bool success)
-{
- mutex::scoped_lock lock(_lock);
- if (!success) ++_fail_count;
- if (++_count == _expected) _cond.notify_all();
-}
-
-void TaskNotification::waitForComplete()
-{
- mutex::scoped_lock lock(_lock);
- while (_count < _expected)
- {
- _cond.wait(lock);
- }
-}
-
-bool TaskNotification::failCount()
-{
- return _fail_count;
-}
-
-} // namespace bicker
+++ /dev/null
-#ifndef __WORKQUEUE_H__
-#define __WORKQUEUE_H__ 1
-
-#include <boost/thread.hpp>
-#include <boost/shared_ptr.hpp>
-#include <queue>
-#include "util.h"
-
-using namespace boost;
-using namespace std;
-
-namespace bicker
-{
- struct interrupted_error : public virtual std::exception { };
-
- class WorkUnit
- {
- public:
- virtual ~WorkUnit() {};
- virtual void run() = 0;
- };
-
- class WorkQueue
- {
- public:
- WorkQueue();
- WorkQueue(int thread_count);
-
- ~WorkQueue();
-
- shared_ptr<WorkUnit> get();
- void put(shared_ptr<WorkUnit> work_unit);
-
- protected:
- void spawnThread();
-
- private:
- class Worker
- {
- public:
- Worker(WorkQueue* queue);
- void operator()();
- private:
- WorkQueue *_queue;
- };
-
- int _thread_count;
- int _min_threads;
- int _max_threads;
- mutex _queue_lock;
- condition_variable _queue_non_empty;
- queue<shared_ptr<WorkUnit> > _queue;
- thread_group _threads;
- volatile bool _running;
- };
-
- class TaskNotification
- {
- public:
- TaskNotification();
-
- void registerTask();
- void completeTask(bool success = true);
-
- void waitForComplete();
-
- bool failCount();
- private:
- int _expected;
- int _count;
- int _fail_count;
- mutex _lock;
- condition_variable _cond;
- };
-
-} // namespace bicker
-
-#endif
--- /dev/null
+#include "socket_pool.h"
+
+SocketPool::SocketPool(int max_sockets,
+ asio::io_service &io_svc
+ )
+:_issued(0),
+ _max_sockets(max_sockets),
+ _io_service(io_svc)
+{
+}
+
+void SocketPool::setEndpoint(const tcp::endpoint &endpoint)
+{
+ _endpoint = endpoint;
+}
+
+void SocketPool::cancelAndClear()
+{
+ for (set<shared_ptr<tcp::socket> >::iterator i = _set.begin();
+ i != _set.end();
+ ++i)
+ {
+ (*i)->cancel();
+ }
+
+ while (!_queue.empty()) _queue.pop();
+ _set.clear();
+}
+
+shared_ptr<tcp::socket> SocketPool::getSocket()
+{
+ mutex::scoped_lock lock(_sockets_lock);
+
+ while (_queue.size() == 0 && _issued >= _max_sockets)
+ _sockets_non_empty.wait(lock);
+
+ if (_queue.size())
+ {
+ shared_ptr<tcp::socket> socket = _queue.front();
+ _queue.pop();
+
+ return socket;
+ }
+ else
+ {
+ ++_issued;
+ error_code error = asio::error::host_not_found;
+
+ shared_ptr<tcp::socket> socket(new tcp::socket(_io_service));
+ socket->connect(_endpoint, error);
+
+ if (error) throw syserr::system_error(error);
+
+ _set.insert(socket);
+
+ return socket;
+ }
+}
+
+void SocketPool::putSocket(shared_ptr<tcp::socket> socket)
+{
+ mutex::scoped_lock lock(_sockets_lock);
+
+ if (!socket->is_open())
+ {
+ cerr << "socket closed\n";
+ --_issued;
+ _set.erase(socket);
+ }
+ else
+ {
+ _queue.push(socket);
+ }
+
+ _sockets_non_empty.notify_one();
+}
+
+SocketCheckout::SocketCheckout(SocketPool *pool)
+ :_socket(pool->getSocket()),
+ _pool(pool)
+{
+}
+
+SocketCheckout::~SocketCheckout()
+{
+ _pool->putSocket(_socket);
+}
+tcp::socket& SocketCheckout::operator*()
+{
+ return *_socket;
+}
+
+tcp::socket* SocketCheckout::operator->()
+{
+ return _socket.get();
+}
+
+shared_ptr<tcp::socket>& SocketCheckout::socket()
+{
+ return _socket;
+}
--- /dev/null
+#ifndef _SOCKET_POOL_H_
+#define _SOCKET_POOL_H_ 1
+
+#include <iostream>
+#include <set>
+#include <queue>
+#include <boost/shared_ptr.hpp>
+#include <boost/thread.hpp>
+
+#include "util.h"
+
+using namespace std;
+using namespace boost;
+
+
+class SocketPool
+{
+public:
+ SocketPool(int max_streams,
+ asio::io_service &io_svc);
+ void setEndpoint(const tcp::endpoint &endpoint);
+ void cancelAndClear();
+ shared_ptr<tcp::socket> getSocket();
+ void putSocket(shared_ptr<tcp::socket> socket);
+private:
+ int _issued;
+ int _max_sockets;
+ mutex _sockets_lock;
+ condition_variable _sockets_non_empty;
+ asio::io_service &_io_service;
+ tcp::endpoint _endpoint;
+ queue<shared_ptr<tcp::socket> > _queue;
+ set<shared_ptr<tcp::socket> > _set;
+};
+
+class SocketCheckout
+{
+public:
+ SocketCheckout(SocketPool *pool);
+ ~SocketCheckout();
+
+ tcp::socket& operator*();
+ tcp::socket* operator->();
+
+ shared_ptr<tcp::socket>& socket();
+
+private:
+ shared_ptr<tcp::socket> _socket;
+ SocketPool *_pool;
+};
+
+#endif
--- /dev/null
+#ifndef _UTIL_H_
+#define _UTIL_H_ 1
+
+#include <boost/version.hpp>
+
+#if BOOST_VERSION <= 103500
+#include <boost/thread.hpp>
+#include <boost/thread/mutex.hpp>
+#include <boost/bind.hpp>
+#include <asio.hpp>
+#include <asio/buffer.hpp>
+//typedef boost::condition condition_variable ;
+//typedef boost::detail::thread::scoped_lock<boost::mutex> scoped_lock;
+using asio::ip::tcp;
+using asio::error_code;
+using asio::buffers_begin;
+namespace syserr=asio;
+#else
+#if BOOST_VERSION < 104000
+#include <boost/asio.hpp>
+#include <boost/thread/mutex.hpp>
+#include <boost/bind.hpp>
+using boost::asio::ip::tcp;
+using boost::system::error_code;
+using boost::system::system_error;
+namespace syserr=boost::system;
+#else
+#include <boost/asio.hpp>
+#include <boost/thread/mutex.hpp>
+using boost::asio::ip::tcp;
+using boost::system::error_code;
+using boost::system::system_error;
+namespace syserr=boost::system;
+#endif
+#endif
+
+#endif
--- /dev/null
+#include "workqueue.h"
+#include <boost/thread.hpp>
+
+namespace bicker
+{
+
+WorkQueue::WorkQueue()
+ :_thread_count(0), _min_threads(1), _max_threads(50), _running(true)
+{
+ for (int i = 0; i < _min_threads; ++i)
+ {
+ spawnThread();
+ }
+}
+
+WorkQueue::WorkQueue(int thread_count)
+ :_thread_count(0),
+ _min_threads(1), _max_threads(thread_count), _running(true)
+{
+ for (int i = 0; i < _min_threads; ++i)
+ {
+ spawnThread();
+ }
+}
+
+WorkQueue::~WorkQueue()
+{
+ _running = false;
+
+ {
+ mutex::scoped_lock lock(_queue_lock);
+ _queue_non_empty.notify_all();
+ }
+
+ _threads.join_all();
+}
+
+void WorkQueue::spawnThread()
+{
+ ++_thread_count;
+ _threads.create_thread(Worker(this));
+}
+
+shared_ptr<WorkUnit> WorkQueue::get()
+{
+ mutex::scoped_lock lock(_queue_lock);
+
+ while (_queue.size() == 0)
+ {
+ _queue_non_empty.wait(lock);
+ if (!_running) throw interrupted_error();
+ }
+
+ shared_ptr<WorkUnit> back = _queue.front();
+ _queue.pop();
+
+ if (_queue.size() > 0 && _thread_count < _max_threads) spawnThread();
+
+ return back;
+}
+
+void WorkQueue::put(shared_ptr<WorkUnit> work_unit)
+{
+ mutex::scoped_lock lock(_queue_lock);
+
+ _queue.push(work_unit);
+
+ _queue_non_empty.notify_one();
+}
+
+WorkQueue::Worker::Worker(WorkQueue* queue)
+ :_queue(queue)
+{
+}
+
+void WorkQueue::Worker::operator()()
+{
+ while (true)
+ {
+ try
+ {
+ shared_ptr<WorkUnit> unit = _queue->get();
+
+ unit->run();
+ }
+ catch (interrupted_error)
+ {
+ return;
+ }
+ }
+}
+
+TaskNotification::TaskNotification()
+:_expected(0), _count(0), _fail_count(0)
+{
+}
+
+void TaskNotification::registerTask()
+{
+ mutex::scoped_lock lock(_lock);
+ ++_expected;
+}
+
+void TaskNotification::completeTask(bool success)
+{
+ mutex::scoped_lock lock(_lock);
+ if (!success) ++_fail_count;
+ if (++_count == _expected) _cond.notify_all();
+}
+
+void TaskNotification::waitForComplete()
+{
+ mutex::scoped_lock lock(_lock);
+ while (_count < _expected)
+ {
+ _cond.wait(lock);
+ }
+}
+
+bool TaskNotification::failCount()
+{
+ return _fail_count;
+}
+
+} // namespace bicker
--- /dev/null
+#ifndef __WORKQUEUE_H__
+#define __WORKQUEUE_H__ 1
+
+#include <boost/thread.hpp>
+#include <boost/shared_ptr.hpp>
+#include <queue>
+#include "util.h"
+
+using namespace boost;
+using namespace std;
+
+namespace bicker
+{
+ struct interrupted_error : public virtual std::exception { };
+
+ class WorkUnit
+ {
+ public:
+ virtual ~WorkUnit() {};
+ virtual void run() = 0;
+ };
+
+ class WorkQueue
+ {
+ public:
+ WorkQueue();
+ WorkQueue(int thread_count);
+
+ ~WorkQueue();
+
+ shared_ptr<WorkUnit> get();
+ void put(shared_ptr<WorkUnit> work_unit);
+
+ protected:
+ void spawnThread();
+
+ private:
+ class Worker
+ {
+ public:
+ Worker(WorkQueue* queue);
+ void operator()();
+ private:
+ WorkQueue *_queue;
+ };
+
+ int _thread_count;
+ int _min_threads;
+ int _max_threads;
+ mutex _queue_lock;
+ condition_variable _queue_non_empty;
+ queue<shared_ptr<WorkUnit> > _queue;
+ thread_group _threads;
+ volatile bool _running;
+ };
+
+ class TaskNotification
+ {
+ public:
+ TaskNotification();
+
+ void registerTask();
+ void completeTask(bool success = true);
+
+ void waitForComplete();
+
+ bool failCount();
+ private:
+ int _expected;
+ int _count;
+ int _fail_count;
+ mutex _lock;
+ condition_variable _cond;
+ };
+
+} // namespace bicker
+
+#endif