From: Michael Vrable Date: Mon, 15 Feb 2010 23:50:09 +0000 (-0800) Subject: Reorganizing kvstore sources and switching build system to CMake. X-Git-Url: http://git.vrable.net/?a=commitdiff_plain;h=d9c7ab29f5651da244cac393f446a6c5e823fcd1;p=bluesky.git Reorganizing kvstore sources and switching build system to CMake. --- diff --git a/kvstore/CMakeLists.txt b/kvstore/CMakeLists.txt new file mode 100644 index 0000000..4d7edad --- /dev/null +++ b/kvstore/CMakeLists.txt @@ -0,0 +1,15 @@ +cmake_minimum_required(VERSION 2.6) + +add_custom_command(OUTPUT kvstore.pb.cc kvstore.pb.h + COMMAND protoc --cpp_out=. kvstore.proto + DEPENDS kvstore.proto) + +add_library(protobufrpc protobufrpc.cc socket_pool.cc workqueue.cc) +add_library(kvservice kvservice.cc kvstore.pb.cc backend.cc) +add_library(kvclient kvclient.cc) +add_executable(kvstore kvstore.cc) + +target_link_libraries(kvstore + kvservice + boost_thread-mt boost_regex-mt boost_system-mt + boost_program_options-mt db protobuf protobufrpc pthread) diff --git a/kvstore/backend.cc b/kvstore/backend.cc index d97c492..04a6764 100644 --- a/kvstore/backend.cc +++ b/kvstore/backend.cc @@ -91,22 +91,22 @@ public: if (log_in_memory) { - res = _dbenv->set_flags(_dbenv, DB_LOG_INMEMORY, 1); + res = _dbenv->set_flags(_dbenv, DB_LOG_IN_MEMORY, 1); if (res != 0) { cerr << db_strerror(res) << endl; - throw std::runtime_error("BDB ENV DB_LOG_INMEMORY"); + throw std::runtime_error("BDB ENV DB_LOG_IN_MEMORY"); } } - res = _dbenv->set_flags(_dbenv, DB_LOG_AUTOREMOVE, 1); + res = _dbenv->set_flags(_dbenv, DB_LOG_AUTO_REMOVE, 1); if (res != 0) { cerr << db_strerror(res) << endl; - throw std::runtime_error("BDB ENV DB_LOG_AUTOREMOVE"); + throw std::runtime_error("BDB ENV DB_LOG_AUTO_REMOVE"); } res = _dbenv->open(_dbenv, diff --git a/kvstore/debian/README.Debian b/kvstore/debian/README.Debian deleted file mode 100644 index 0cc3a58..0000000 --- a/kvstore/debian/README.Debian +++ /dev/null @@ -1,6 +0,0 @@ -bicker-fcgi for Debian ----------------------- - - - - -- John McCullough Tue, 29 Sep 2009 13:29:18 -0700 diff --git a/kvstore/debian/README.source b/kvstore/debian/README.source deleted file mode 100644 index a7e4671..0000000 --- a/kvstore/debian/README.source +++ /dev/null @@ -1,9 +0,0 @@ -bicker-fcgi for Debian ----------------------- - - - - - - diff --git a/kvstore/debian/changelog b/kvstore/debian/changelog deleted file mode 100644 index be2e2de..0000000 --- a/kvstore/debian/changelog +++ /dev/null @@ -1,37 +0,0 @@ -kvstore (0.1-7) unstable; urgency=low - - * Reduced memory copying in protobuf, support for multiple data - targets - - -- John McCullough Thu, 21 Jan 2010 11:10:45 -0800 - -kvstore (0.1-6) unstable; urgency=low - - * Improved RPC Layer - - -- John McCullough Thu, 24 Dec 2009 14:25:41 -0800 - -kvstore (0.1-5) unstable; urgency=low - - * KeyValue -> KVStore rename. - * Fixed Dependencies - - -- John McCullough Sun, 29 Nov 2009 16:17:23 -0800 - -kvstore (0.1-3) unstable; urgency=low - - * More Packaging Fun - - -- John McCullough Sun, 29 Nov 2009 15:34:41 -0800 - -kvstore (0.1-2) unstable; urgency=low - - * Incremented boost version to .0 - - -- John McCullough Sun, 29 Nov 2009 15:32:01 -0800 - -kvstore (0.1-1) unstable; urgency=low - - * Initial release (Closes: #nnnn) - - -- John McCullough Tue, 29 Sep 2009 13:29:18 -0700 diff --git a/kvstore/debian/compat b/kvstore/debian/compat deleted file mode 100644 index 7f8f011..0000000 --- a/kvstore/debian/compat +++ /dev/null @@ -1 +0,0 @@ -7 diff --git a/kvstore/debian/control b/kvstore/debian/control deleted file mode 100644 index 748f726..0000000 --- a/kvstore/debian/control +++ /dev/null @@ -1,13 +0,0 @@ -Source: kvstore -Section: unknown -Priority: extra -Maintainer: John McCullough -Build-Depends: debhelper (>= 7), libboost-thread1.35-dev, libboost-system1.35-dev, libboost1.35-dev, libasio-dev, protobuf-compiler, libprotobuf-dev, libdb4.6-dev, libgtest-dev -Standards-Version: 3.8.3, -Homepage: - -Package: kvstore -Architecture: any -Depends: ${misc:Depends}, libboost-thread1.35.0, libboost-regex1.35.0, libboost-program-options1.35.0, libboost-system1.35.0, libprotobuf4, libdb4.6, libgtest0 -Description: Homebrew key value store - diff --git a/kvstore/debian/copyright b/kvstore/debian/copyright deleted file mode 100644 index c356a24..0000000 --- a/kvstore/debian/copyright +++ /dev/null @@ -1,35 +0,0 @@ -This work was packaged for Debian by: - - John McCullough on Tue, 29 Sep 2009 13:29:18 -0700 - -It was downloaded from - -Upstream Author(s): - - - - -Copyright: - - - - -License: - - - -The Debian packaging is: - - Copyright (C) 2009 John McCullough - -# Please chose a license for your packaging work. If the program you package -# uses a mainstream license, using the same license is the safest choice. -# Please avoid to pick license terms that are more restrictive than the -# packaged work, as it may make Debian's contributions unacceptable upstream. -# If you just want it to be GPL version 3, leave the following lines in. - -and is licensed under the GPL version 3, -see `/usr/share/common-licenses/GPL-3'. - -# Please also look if there are files or directories which have a -# different copyright/license attached and list them here. diff --git a/kvstore/debian/docs b/kvstore/debian/docs deleted file mode 100644 index e845566..0000000 --- a/kvstore/debian/docs +++ /dev/null @@ -1 +0,0 @@ -README diff --git a/kvstore/debian/install b/kvstore/debian/install deleted file mode 100644 index c110135..0000000 --- a/kvstore/debian/install +++ /dev/null @@ -1 +0,0 @@ -usr/bin/kvstore diff --git a/kvstore/debian/rules b/kvstore/debian/rules deleted file mode 100755 index a47c8e4..0000000 --- a/kvstore/debian/rules +++ /dev/null @@ -1,43 +0,0 @@ -#!/usr/bin/make -f -# -*- makefile -*- -# Sample debian/rules that uses debhelper. -# This file was originally written by Joey Hess and Craig Small. -# As a special exception, when this file is copied by dh-make into a -# dh-make output file, you may use that output file without restriction. -# This special exception was added by Craig Small in version 0.37 of dh-make. - -# Uncomment this to turn on verbose mode. -#export DH_VERBOSE=1 -build: - dh_testdir - scons - -install: build - dh_testdir - dh_testroot - dh_prep - dh_installdirs - scons --prefix=$(CURDIR)/debian/tmp install - -clean: - scons -c - dh_clean - -binary-arch: install - dh_testdir - dh_testroot - dh_installchangelogs - dh_install - dh_link - #dh_strip - dh_compress - dh_fixperms - dh_installdeb - dh_gencontrol - dh_md5sums - dh_builddeb - -binary: binary-arch - -%: - dh $@ diff --git a/kvstore/protobufrpc.cc b/kvstore/protobufrpc.cc new file mode 100644 index 0000000..06226f7 --- /dev/null +++ b/kvstore/protobufrpc.cc @@ -0,0 +1,628 @@ +#include "protobufrpc.h" + +#include +#include +#include +#include +#include + +#include + +using namespace std; + +using namespace boost; +using asio::buffer; + +namespace bicker +{ + +template +static void* void_write(void* data, T val) +{ + *((T*)data) = val; + return (char*)data + sizeof(T); +} + +class ProtoBufRpcServiceRequest +{ +public: + ProtoBufRpcServiceRequest( + RpcController *ctrl, + const MethodDescriptor* method, + Message *request, + Message *response, + shared_ptr conn + ) + :_ctrl(ctrl), + _method(method), + _request(request), + _response(response), + _conn(conn) + { + } + + ~ProtoBufRpcServiceRequest() + { + + } + + static void run(ProtoBufRpcServiceRequest *req) + { + + req->_conn->writeResponse(req->_response.get()); + + delete req; + } + + shared_ptr _ctrl; + const MethodDescriptor *_method; + shared_ptr _request; + shared_ptr _response; + shared_ptr _conn; +}; + +ProtoBufRpcConnection::ProtoBufRpcConnection(asio::io_service& io_service, + Service *service) +:_socket(io_service), + _strand(io_service), + _service(service), + _state(STATE_NONE) +{ +} + +tcp::socket& ProtoBufRpcConnection::socket() +{ + return _socket; +} + +void ProtoBufRpcConnection::start() +{ + _socket.async_read_some(_buffer.prepare(4096), + _strand.wrap( + boost::bind(&ProtoBufRpcConnection::handle_read, shared_from_this(), + asio::placeholders::error, + asio::placeholders::bytes_transferred))); +} + +void ProtoBufRpcConnection::writeResponse(Message *msg) +{ + int rlen = msg->ByteSize(); + int len = htonl(rlen); + int mlen = sizeof(len) + rlen; + + void * data = asio::buffer_cast(_buffer.prepare(mlen)); + + data = void_write(data, len); + + using google::protobuf::io::ArrayOutputStream; + + ArrayOutputStream as(data, rlen); + + msg->SerializeToZeroCopyStream(&as); + + _buffer.commit(mlen); + + asio::async_write(_socket, + _buffer.data(), + _strand.wrap( + boost::bind(&ProtoBufRpcConnection::handle_write, + shared_from_this(), + asio::placeholders::error, + asio::placeholders::bytes_transferred))); +} + + +void ProtoBufRpcConnection::handle_read(const error_code& e, + std::size_t bytes_transferred) +{ + if (!e) + { + _buffer.commit(bytes_transferred); + + if (_state == STATE_NONE) + { + if (_buffer.size() >= sizeof(_id) + sizeof(_len)) + { + string b( + buffers_begin(_buffer.data()), + buffers_begin(_buffer.data()) + + sizeof(_id) + sizeof(_len) + ); + + _buffer.consume(sizeof(_id) + sizeof(_len)); + + _id = *((int*)b.c_str()); + _id = ntohl(_id); + + _len = *((unsigned int*)(b.c_str() + sizeof(_id))); + _len = ntohl(_len); + + _state = STATE_HAVE_ID_AND_LEN; + } + else + { + start(); + } + } + + if (_state == STATE_HAVE_ID_AND_LEN || _state == STATE_WAITING_FOR_DATA) + { + if (_buffer.size() >= _len) + { + const MethodDescriptor* method = + _service->GetDescriptor()->method(_id); + + Message *req = _service->GetRequestPrototype(method).New(); + Message *resp = _service->GetResponsePrototype(method).New(); + + using google::protobuf::io::ArrayInputStream; + using google::protobuf::io::CodedInputStream; + + const void* data = asio::buffer_cast( + _buffer.data() + ); + ArrayInputStream as(data, _len); + CodedInputStream is(&as); + is.SetTotalBytesLimit(512 * 1024 * 1024, -1); + + if (!req->ParseFromCodedStream(&is)) + { + throw std::runtime_error("ParseFromCodedStream"); + } + + _buffer.consume(_len); + + ProtoBufRpcController *ctrl = new ProtoBufRpcController(); + _service->CallMethod(method, + ctrl, + req, + resp, + NewCallback( + &ProtoBufRpcServiceRequest::run, + new ProtoBufRpcServiceRequest( + ctrl, + method, + req, + resp, + shared_from_this()) + ) + ); + _state = STATE_NONE; + } + else + { + _state = STATE_WAITING_FOR_DATA; + start(); + } + } + + } + else + { + error_code ignored_ec; + _socket.shutdown(tcp::socket::shutdown_both, ignored_ec); + } +} + +void ProtoBufRpcConnection::handle_write(const error_code& e, + std::size_t bytes_transferred) +{ + if (e) + { + error_code ignored_ec; + _socket.shutdown(tcp::socket::shutdown_both, ignored_ec); + } + else + { + _buffer.consume(bytes_transferred); + + if (_buffer.size()) + { + asio::async_write(_socket, + _buffer.data(), + _strand.wrap( + boost::bind(&ProtoBufRpcConnection::handle_write, + shared_from_this(), + asio::placeholders::error, + asio::placeholders::bytes_transferred))); + return; + } + + _state = STATE_NONE; + start(); + } +} + +ProtoBufRpcServer::ProtoBufRpcServer() + :_io_service(new asio::io_service()) +{ +} + +bool ProtoBufRpcServer::registerService(uint16_t port, + shared_ptr service) +{ + // This is not thread safe + + // The RegisteredService Constructor fires up the appropriate + // async accepts for the service + _services.push_back(shared_ptr( + new RegisteredService( + _io_service, + port, + service))); + + return true; +} + +void run_wrapper(asio::io_service *io_service) +{ + struct itimerval itimer; + setitimer(ITIMER_PROF, &itimer, NULL); + + io_service->run(); +} + +void ProtoBufRpcServer::run() +{ + try + { + if (_services.size() == 0) + { + throw std::runtime_error("No services registered for ProtoBufRpcServer"); + } + + size_t nprocs = sysconf(_SC_NPROCESSORS_ONLN); + + vector > threads; + for (size_t i = 0; i < nprocs; ++i) + { + shared_ptr t(new thread( + boost::bind( + //&run_wrapper, + &asio::io_service::run, + _io_service.get()))); + threads.push_back(t); + } + + for (size_t i = 0; i < threads.size(); ++i) + { + threads[i]->join(); + } + } + catch (std::exception &e) + { + std::cerr << "ProtoBufRpcService" << e.what() << std::endl; + } +} + +void ProtoBufRpcServer::shutdown() +{ + _io_service->stop(); +} + +ProtoBufRpcServer::RegisteredService::RegisteredService( + shared_ptr io_service, + uint16_t port, + shared_ptr service + ) +:_io_service(io_service), + _port(port), + _service(service), + _endpoint(tcp::v4(), _port), + _acceptor(*_io_service), + _new_connection(new ProtoBufRpcConnection(*_io_service, _service.get())) +{ + _acceptor.open(_endpoint.protocol()); + _acceptor.set_option(tcp::acceptor::reuse_address(true)); + _acceptor.bind(_endpoint); + _acceptor.listen(); + _acceptor.async_accept(_new_connection->socket(), + boost::bind(&ProtoBufRpcServer::RegisteredService::handle_accept, + this, + asio::placeholders::error)); +} + +void ProtoBufRpcServer::RegisteredService::handle_accept(const error_code& e) +{ + if (!e) + { + _new_connection->start(); + _new_connection.reset(new ProtoBufRpcConnection(*_io_service, _service.get())); + _acceptor.async_accept(_new_connection->socket(), + boost::bind(&ProtoBufRpcServer::RegisteredService::handle_accept, + this, + asio::placeholders::error)); + } + +} + +ProtoBufRpcController::ProtoBufRpcController() +{ +} + +ProtoBufRpcController::~ProtoBufRpcController() +{ +} + +void ProtoBufRpcController::Reset() +{ +} + +bool ProtoBufRpcController::Failed() const +{ + return false; +} + +string ProtoBufRpcController::ErrorText() const +{ + return "No Error"; +} + +void ProtoBufRpcController::StartCancel() +{ +} + +void ProtoBufRpcController::SetFailed(const string &/*reason*/) +{ +} + +bool ProtoBufRpcController::IsCanceled() const +{ + return false; +} + +void ProtoBufRpcController::NotifyOnCancel(Closure * /*callback*/) +{ +} + +class ProtoBufRpcChannel::MethodHandler + : public enable_shared_from_this, + private boost::noncopyable +{ +public: + MethodHandler(auto_ptr socket, + const MethodDescriptor * method, + RpcController * controller, + const Message * request, + Message * response, + Closure * done + ) + :_socket(socket), + _method(method), + _controller(controller), + _request(request), + _response(response), + _done(done) + { + } + + ~MethodHandler() + { + _socket.reset(); + _done->Run(); + } + + static void execute(shared_ptr this_ptr) + { + int index = htonl(this_ptr->_method->index()); + int rlen = this_ptr->_request->ByteSize(); + int len = htonl(rlen); + + int mlen = sizeof(index) + sizeof(len) + rlen; + + void * data = asio::buffer_cast(this_ptr->_buffer.prepare(mlen)); + + data = void_write(data, index); + data = void_write(data, len); + + using google::protobuf::io::ArrayOutputStream; + + ArrayOutputStream as(data, rlen); + + this_ptr->_request->SerializeToZeroCopyStream(&as); + this_ptr->_buffer.commit(mlen); + + (*(this_ptr->_socket))->async_send(this_ptr->_buffer.data(), + boost::bind(&ProtoBufRpcChannel::MethodHandler::handle_write, + this_ptr, + asio::placeholders::error, + asio::placeholders::bytes_transferred)); + } + + static void handle_write(shared_ptr this_ptr, + const error_code& e, + std::size_t bytes_transferred) + { + if (!e) + { + this_ptr->_buffer.consume(bytes_transferred); + + if (this_ptr->_buffer.size()) + { + (*(this_ptr->_socket))->async_send(this_ptr->_buffer.data(), + boost::bind(&ProtoBufRpcChannel::MethodHandler::handle_write, + this_ptr, + asio::placeholders::error, + asio::placeholders::bytes_transferred)); + return; + } + + (*(this_ptr->_socket))->async_receive( + buffer(&this_ptr->_len, sizeof(this_ptr->_len)), + boost::bind( + &ProtoBufRpcChannel::MethodHandler::handle_read_len, + this_ptr, + asio::placeholders::error, + asio::placeholders::bytes_transferred) + ); + } + else + { + this_ptr->_controller->SetFailed(e.message()); + (*(this_ptr->_socket))->close(); + } + } + + static void handle_read_len(shared_ptr this_ptr, + const error_code& e, + std::size_t bytes_transferred) + { + if (!e && bytes_transferred == sizeof(this_ptr->_len)) + { + this_ptr->_len = ntohl(this_ptr->_len); + (*(this_ptr->_socket))->async_receive( + this_ptr->_buffer.prepare(this_ptr->_len), + boost::bind( + &ProtoBufRpcChannel::MethodHandler::handle_read_response, + this_ptr, + asio::placeholders::error, + asio::placeholders::bytes_transferred + ) + ); + } + else + { + this_ptr->_controller->SetFailed(e.message()); + (*(this_ptr->_socket))->close(); + } + } + + static void handle_read_response(shared_ptr this_ptr, + const error_code& e, + std::size_t bytes_transferred) + { + if (!e) + { + this_ptr->_buffer.commit(bytes_transferred); + if (this_ptr->_buffer.size() >= this_ptr->_len) + { + using google::protobuf::io::ArrayInputStream; + using google::protobuf::io::CodedInputStream; + + const void* data = asio::buffer_cast( + this_ptr->_buffer.data() + ); + ArrayInputStream as(data, this_ptr->_len); + CodedInputStream is(&as); + is.SetTotalBytesLimit(512 * 1024 * 1024, -1); + + if (!this_ptr->_response->ParseFromCodedStream(&is)) + { + throw std::runtime_error("ParseFromCodedStream"); + } + + this_ptr->_buffer.consume(this_ptr->_len); + } + else + { + (*(this_ptr->_socket))->async_receive( + this_ptr->_buffer.prepare(this_ptr->_len - this_ptr->_buffer.size()), + boost::bind( + &ProtoBufRpcChannel::MethodHandler::handle_read_response, + this_ptr, + asio::placeholders::error, + asio::placeholders::bytes_transferred + ) + ); + return; + } + } + else + { + this_ptr->_controller->SetFailed(e.message()); + (*(this_ptr->_socket))->close(); + } + } + +private: + auto_ptr _socket; + const MethodDescriptor * _method; + RpcController * _controller; + const Message * _request; + Message * _response; + Closure * _done; + asio::streambuf _buffer; + unsigned int _len; + bool _status; + unsigned int _sent; +}; + + +ProtoBufRpcChannel::ProtoBufRpcChannel(const string &remotehost, + const string &port) + :_remote_host(remotehost), _port(port), + _resolver(_io_service), + _acceptor(_io_service), + _pool(2000, _io_service), + _lame_socket(_io_service), + _thread() +// &asio::io_service::run, +// &_io_service))) +{ + + tcp::resolver::query query(_remote_host, _port); + tcp::resolver::iterator endpoint_iterator = _resolver.resolve(query); + tcp::resolver::iterator end; + + error_code error = asio::error::host_not_found; + + if (endpoint_iterator == end) throw syserr::system_error(error); + + _pool.setEndpoint(*endpoint_iterator); + + tcp::endpoint e(tcp::v4(), 0); + _acceptor.open(e.protocol()); + _acceptor.set_option(tcp::acceptor::reuse_address(true)); + _acceptor.bind(e); + _acceptor.listen(); + _acceptor.async_accept(_lame_socket, + boost::bind(&ProtoBufRpcChannel::lame_handle_accept, this, + asio::placeholders::error)); + + _thread = shared_ptr(new thread( + boost::bind( + &asio::io_service::run, + &_io_service))); +} + +void ProtoBufRpcChannel::lame_handle_accept(const error_code &err) +{ + if (!err) + { + _acceptor.async_accept(_lame_socket, + boost::bind(&ProtoBufRpcChannel::lame_handle_accept, + this, + asio::placeholders::error)); + } +} + +ProtoBufRpcChannel::~ProtoBufRpcChannel() +{ + _pool.cancelAndClear(); + + _io_service.stop(); + + _thread->join(); +} + +void ProtoBufRpcChannel::CallMethod( + const MethodDescriptor * method, + RpcController * controller, + const Message * request, + Message * response, + Closure * done) +{ + shared_ptr h( + new MethodHandler( + auto_ptr(new SocketCheckout(&_pool)), + method, + controller, + request, + response, + done + )); + + MethodHandler::execute(h); +} + +} // namespace bicker diff --git a/kvstore/protobufrpc.h b/kvstore/protobufrpc.h new file mode 100644 index 0000000..221903d --- /dev/null +++ b/kvstore/protobufrpc.h @@ -0,0 +1,173 @@ +#ifndef __PROTOBUFRPC_H__ +#define __PROTOBUFRPC_H__ + +#include +#include +#include +#include +#include +#include +#include +#include "socket_pool.h" +#include "util.h" + +#include +#include +#include +#include +#include + +using namespace std; +using namespace google::protobuf; +using namespace boost; + +namespace bicker +{ + +class ProtoBufRpcService; + +class ProtoBufRpcConnection + : public enable_shared_from_this, + private boost::noncopyable +{ +public: + explicit ProtoBufRpcConnection(asio::io_service& io_service, + Service *_service); + + asio::ip::tcp::socket& socket(); + + void start(); + + void writeResponse(Message *msg); + + +private: + void handle_read(const error_code& e, + std::size_t bytes_transferred); + + void handle_write(const error_code& e, + std::size_t bytes_transferred); + + tcp::socket _socket; + + + asio::io_service::strand _strand; + + Service *_service; + + asio::streambuf _buffer; + + int _id; + unsigned int _len; + + enum { + STATE_NONE, + STATE_HAVE_ID_AND_LEN, + STATE_WAITING_FOR_DATA, + STATE_FAIL, + } _state; +}; + + +class ProtoBufRpcServer +{ +public: + ProtoBufRpcServer(); + + bool registerService(uint16_t port, + shared_ptr< Service> service); + + // So we can call this as a thread. + void run(); + + // So we can stop.. + void shutdown(); + +protected: + + class RegisteredService + { + public: + RegisteredService( + shared_ptr io_service, + uint16_t port, + shared_ptr service + ); + + void handle_accept(const error_code& e); + + protected: + // Ref to parent's + shared_ptr _io_service; + uint16_t _port; + shared_ptr _service; + tcp::endpoint _endpoint; + tcp::acceptor _acceptor; + shared_ptr _new_connection; + }; + + list > _services; + shared_ptr _io_service; +}; + +class ProtoBufRpcController : public RpcController +{ +public: + ProtoBufRpcController(); + virtual ~ProtoBufRpcController(); + + virtual void Reset(); + virtual bool Failed() const; + virtual string ErrorText() const; + virtual void StartCancel(); + + virtual void SetFailed(const string &reason); + virtual bool IsCanceled() const; + virtual void NotifyOnCancel(Closure *callback); +}; + +class ProtoBufRpcChannel + : public RpcChannel, + public enable_shared_from_this, + private boost::noncopyable +{ +public: + ProtoBufRpcChannel(const string &remotehost, const string &port); + + virtual ~ProtoBufRpcChannel(); + + virtual void CallMethod( + const MethodDescriptor * method, + RpcController * controller, + const Message * request, + Message * response, + Closure * done); + +protected: + shared_ptr getSocket(); + void putSocket(shared_ptr); + +private: + class MethodHandler; + + string _remote_host; + string _port; + + + asio::io_service _io_service; + tcp::resolver _resolver; + // This exists to keep the io service running + tcp::acceptor _acceptor; + + SocketPool _pool; + + asio::ip::tcp::socket _lame_socket; + void lame_handle_accept(const error_code &err); + + shared_ptr _thread; + +}; + +} // namespace bicker + +#endif diff --git a/kvstore/protobufrpc/SConscript b/kvstore/protobufrpc/SConscript deleted file mode 100644 index b972f84..0000000 --- a/kvstore/protobufrpc/SConscript +++ /dev/null @@ -1,7 +0,0 @@ -Import('env') - -base_files = ['protobufrpc.cc', 'socket_pool.cc', 'workqueue.cc'] - -protobufrpc = env.StaticLibrary('protobufrpc', base_files) - -Return('protobufrpc') diff --git a/kvstore/protobufrpc/SConstruct b/kvstore/protobufrpc/SConstruct deleted file mode 100644 index 79fbb68..0000000 --- a/kvstore/protobufrpc/SConstruct +++ /dev/null @@ -1,29 +0,0 @@ -import os - -AddOption('--prefix', - dest='prefix', - type='string', - nargs=1, - action='store', - metavar='DIR', - help='installation prefix') - -env = Environment(CXXFLAGS='-g -fPIC', PREFIX=GetOption('prefix')) - -for envvar in ('HOME', 'DISTCC_DIR', 'DISTCC_HOSTS', 'CCACHE_DIR', - 'INTERCEPTOR_SOCKET', 'ENFORGE_DIGEST_CACHE', - 'ENFORGE_CACHE_HOST', 'ENFORGE_CACHE_PORT'): - if envvar in os.environ: - env['ENV'][envvar] = os.environ[envvar] - -if env['PREFIX'] is not None: - bin_dest = env['PREFIX'] + '/usr/bin' -else: - bin_dest = '/usr/bin' - -base_files = ['protobufrpc.cc', 'socket_pool.cc'] - -if not env.has_key('LIBS'): - env['LIBS'] = [] - -SConscript(['SConscript']) diff --git a/kvstore/protobufrpc/protobufrpc.cc b/kvstore/protobufrpc/protobufrpc.cc deleted file mode 100644 index 06226f7..0000000 --- a/kvstore/protobufrpc/protobufrpc.cc +++ /dev/null @@ -1,628 +0,0 @@ -#include "protobufrpc.h" - -#include -#include -#include -#include -#include - -#include - -using namespace std; - -using namespace boost; -using asio::buffer; - -namespace bicker -{ - -template -static void* void_write(void* data, T val) -{ - *((T*)data) = val; - return (char*)data + sizeof(T); -} - -class ProtoBufRpcServiceRequest -{ -public: - ProtoBufRpcServiceRequest( - RpcController *ctrl, - const MethodDescriptor* method, - Message *request, - Message *response, - shared_ptr conn - ) - :_ctrl(ctrl), - _method(method), - _request(request), - _response(response), - _conn(conn) - { - } - - ~ProtoBufRpcServiceRequest() - { - - } - - static void run(ProtoBufRpcServiceRequest *req) - { - - req->_conn->writeResponse(req->_response.get()); - - delete req; - } - - shared_ptr _ctrl; - const MethodDescriptor *_method; - shared_ptr _request; - shared_ptr _response; - shared_ptr _conn; -}; - -ProtoBufRpcConnection::ProtoBufRpcConnection(asio::io_service& io_service, - Service *service) -:_socket(io_service), - _strand(io_service), - _service(service), - _state(STATE_NONE) -{ -} - -tcp::socket& ProtoBufRpcConnection::socket() -{ - return _socket; -} - -void ProtoBufRpcConnection::start() -{ - _socket.async_read_some(_buffer.prepare(4096), - _strand.wrap( - boost::bind(&ProtoBufRpcConnection::handle_read, shared_from_this(), - asio::placeholders::error, - asio::placeholders::bytes_transferred))); -} - -void ProtoBufRpcConnection::writeResponse(Message *msg) -{ - int rlen = msg->ByteSize(); - int len = htonl(rlen); - int mlen = sizeof(len) + rlen; - - void * data = asio::buffer_cast(_buffer.prepare(mlen)); - - data = void_write(data, len); - - using google::protobuf::io::ArrayOutputStream; - - ArrayOutputStream as(data, rlen); - - msg->SerializeToZeroCopyStream(&as); - - _buffer.commit(mlen); - - asio::async_write(_socket, - _buffer.data(), - _strand.wrap( - boost::bind(&ProtoBufRpcConnection::handle_write, - shared_from_this(), - asio::placeholders::error, - asio::placeholders::bytes_transferred))); -} - - -void ProtoBufRpcConnection::handle_read(const error_code& e, - std::size_t bytes_transferred) -{ - if (!e) - { - _buffer.commit(bytes_transferred); - - if (_state == STATE_NONE) - { - if (_buffer.size() >= sizeof(_id) + sizeof(_len)) - { - string b( - buffers_begin(_buffer.data()), - buffers_begin(_buffer.data()) - + sizeof(_id) + sizeof(_len) - ); - - _buffer.consume(sizeof(_id) + sizeof(_len)); - - _id = *((int*)b.c_str()); - _id = ntohl(_id); - - _len = *((unsigned int*)(b.c_str() + sizeof(_id))); - _len = ntohl(_len); - - _state = STATE_HAVE_ID_AND_LEN; - } - else - { - start(); - } - } - - if (_state == STATE_HAVE_ID_AND_LEN || _state == STATE_WAITING_FOR_DATA) - { - if (_buffer.size() >= _len) - { - const MethodDescriptor* method = - _service->GetDescriptor()->method(_id); - - Message *req = _service->GetRequestPrototype(method).New(); - Message *resp = _service->GetResponsePrototype(method).New(); - - using google::protobuf::io::ArrayInputStream; - using google::protobuf::io::CodedInputStream; - - const void* data = asio::buffer_cast( - _buffer.data() - ); - ArrayInputStream as(data, _len); - CodedInputStream is(&as); - is.SetTotalBytesLimit(512 * 1024 * 1024, -1); - - if (!req->ParseFromCodedStream(&is)) - { - throw std::runtime_error("ParseFromCodedStream"); - } - - _buffer.consume(_len); - - ProtoBufRpcController *ctrl = new ProtoBufRpcController(); - _service->CallMethod(method, - ctrl, - req, - resp, - NewCallback( - &ProtoBufRpcServiceRequest::run, - new ProtoBufRpcServiceRequest( - ctrl, - method, - req, - resp, - shared_from_this()) - ) - ); - _state = STATE_NONE; - } - else - { - _state = STATE_WAITING_FOR_DATA; - start(); - } - } - - } - else - { - error_code ignored_ec; - _socket.shutdown(tcp::socket::shutdown_both, ignored_ec); - } -} - -void ProtoBufRpcConnection::handle_write(const error_code& e, - std::size_t bytes_transferred) -{ - if (e) - { - error_code ignored_ec; - _socket.shutdown(tcp::socket::shutdown_both, ignored_ec); - } - else - { - _buffer.consume(bytes_transferred); - - if (_buffer.size()) - { - asio::async_write(_socket, - _buffer.data(), - _strand.wrap( - boost::bind(&ProtoBufRpcConnection::handle_write, - shared_from_this(), - asio::placeholders::error, - asio::placeholders::bytes_transferred))); - return; - } - - _state = STATE_NONE; - start(); - } -} - -ProtoBufRpcServer::ProtoBufRpcServer() - :_io_service(new asio::io_service()) -{ -} - -bool ProtoBufRpcServer::registerService(uint16_t port, - shared_ptr service) -{ - // This is not thread safe - - // The RegisteredService Constructor fires up the appropriate - // async accepts for the service - _services.push_back(shared_ptr( - new RegisteredService( - _io_service, - port, - service))); - - return true; -} - -void run_wrapper(asio::io_service *io_service) -{ - struct itimerval itimer; - setitimer(ITIMER_PROF, &itimer, NULL); - - io_service->run(); -} - -void ProtoBufRpcServer::run() -{ - try - { - if (_services.size() == 0) - { - throw std::runtime_error("No services registered for ProtoBufRpcServer"); - } - - size_t nprocs = sysconf(_SC_NPROCESSORS_ONLN); - - vector > threads; - for (size_t i = 0; i < nprocs; ++i) - { - shared_ptr t(new thread( - boost::bind( - //&run_wrapper, - &asio::io_service::run, - _io_service.get()))); - threads.push_back(t); - } - - for (size_t i = 0; i < threads.size(); ++i) - { - threads[i]->join(); - } - } - catch (std::exception &e) - { - std::cerr << "ProtoBufRpcService" << e.what() << std::endl; - } -} - -void ProtoBufRpcServer::shutdown() -{ - _io_service->stop(); -} - -ProtoBufRpcServer::RegisteredService::RegisteredService( - shared_ptr io_service, - uint16_t port, - shared_ptr service - ) -:_io_service(io_service), - _port(port), - _service(service), - _endpoint(tcp::v4(), _port), - _acceptor(*_io_service), - _new_connection(new ProtoBufRpcConnection(*_io_service, _service.get())) -{ - _acceptor.open(_endpoint.protocol()); - _acceptor.set_option(tcp::acceptor::reuse_address(true)); - _acceptor.bind(_endpoint); - _acceptor.listen(); - _acceptor.async_accept(_new_connection->socket(), - boost::bind(&ProtoBufRpcServer::RegisteredService::handle_accept, - this, - asio::placeholders::error)); -} - -void ProtoBufRpcServer::RegisteredService::handle_accept(const error_code& e) -{ - if (!e) - { - _new_connection->start(); - _new_connection.reset(new ProtoBufRpcConnection(*_io_service, _service.get())); - _acceptor.async_accept(_new_connection->socket(), - boost::bind(&ProtoBufRpcServer::RegisteredService::handle_accept, - this, - asio::placeholders::error)); - } - -} - -ProtoBufRpcController::ProtoBufRpcController() -{ -} - -ProtoBufRpcController::~ProtoBufRpcController() -{ -} - -void ProtoBufRpcController::Reset() -{ -} - -bool ProtoBufRpcController::Failed() const -{ - return false; -} - -string ProtoBufRpcController::ErrorText() const -{ - return "No Error"; -} - -void ProtoBufRpcController::StartCancel() -{ -} - -void ProtoBufRpcController::SetFailed(const string &/*reason*/) -{ -} - -bool ProtoBufRpcController::IsCanceled() const -{ - return false; -} - -void ProtoBufRpcController::NotifyOnCancel(Closure * /*callback*/) -{ -} - -class ProtoBufRpcChannel::MethodHandler - : public enable_shared_from_this, - private boost::noncopyable -{ -public: - MethodHandler(auto_ptr socket, - const MethodDescriptor * method, - RpcController * controller, - const Message * request, - Message * response, - Closure * done - ) - :_socket(socket), - _method(method), - _controller(controller), - _request(request), - _response(response), - _done(done) - { - } - - ~MethodHandler() - { - _socket.reset(); - _done->Run(); - } - - static void execute(shared_ptr this_ptr) - { - int index = htonl(this_ptr->_method->index()); - int rlen = this_ptr->_request->ByteSize(); - int len = htonl(rlen); - - int mlen = sizeof(index) + sizeof(len) + rlen; - - void * data = asio::buffer_cast(this_ptr->_buffer.prepare(mlen)); - - data = void_write(data, index); - data = void_write(data, len); - - using google::protobuf::io::ArrayOutputStream; - - ArrayOutputStream as(data, rlen); - - this_ptr->_request->SerializeToZeroCopyStream(&as); - this_ptr->_buffer.commit(mlen); - - (*(this_ptr->_socket))->async_send(this_ptr->_buffer.data(), - boost::bind(&ProtoBufRpcChannel::MethodHandler::handle_write, - this_ptr, - asio::placeholders::error, - asio::placeholders::bytes_transferred)); - } - - static void handle_write(shared_ptr this_ptr, - const error_code& e, - std::size_t bytes_transferred) - { - if (!e) - { - this_ptr->_buffer.consume(bytes_transferred); - - if (this_ptr->_buffer.size()) - { - (*(this_ptr->_socket))->async_send(this_ptr->_buffer.data(), - boost::bind(&ProtoBufRpcChannel::MethodHandler::handle_write, - this_ptr, - asio::placeholders::error, - asio::placeholders::bytes_transferred)); - return; - } - - (*(this_ptr->_socket))->async_receive( - buffer(&this_ptr->_len, sizeof(this_ptr->_len)), - boost::bind( - &ProtoBufRpcChannel::MethodHandler::handle_read_len, - this_ptr, - asio::placeholders::error, - asio::placeholders::bytes_transferred) - ); - } - else - { - this_ptr->_controller->SetFailed(e.message()); - (*(this_ptr->_socket))->close(); - } - } - - static void handle_read_len(shared_ptr this_ptr, - const error_code& e, - std::size_t bytes_transferred) - { - if (!e && bytes_transferred == sizeof(this_ptr->_len)) - { - this_ptr->_len = ntohl(this_ptr->_len); - (*(this_ptr->_socket))->async_receive( - this_ptr->_buffer.prepare(this_ptr->_len), - boost::bind( - &ProtoBufRpcChannel::MethodHandler::handle_read_response, - this_ptr, - asio::placeholders::error, - asio::placeholders::bytes_transferred - ) - ); - } - else - { - this_ptr->_controller->SetFailed(e.message()); - (*(this_ptr->_socket))->close(); - } - } - - static void handle_read_response(shared_ptr this_ptr, - const error_code& e, - std::size_t bytes_transferred) - { - if (!e) - { - this_ptr->_buffer.commit(bytes_transferred); - if (this_ptr->_buffer.size() >= this_ptr->_len) - { - using google::protobuf::io::ArrayInputStream; - using google::protobuf::io::CodedInputStream; - - const void* data = asio::buffer_cast( - this_ptr->_buffer.data() - ); - ArrayInputStream as(data, this_ptr->_len); - CodedInputStream is(&as); - is.SetTotalBytesLimit(512 * 1024 * 1024, -1); - - if (!this_ptr->_response->ParseFromCodedStream(&is)) - { - throw std::runtime_error("ParseFromCodedStream"); - } - - this_ptr->_buffer.consume(this_ptr->_len); - } - else - { - (*(this_ptr->_socket))->async_receive( - this_ptr->_buffer.prepare(this_ptr->_len - this_ptr->_buffer.size()), - boost::bind( - &ProtoBufRpcChannel::MethodHandler::handle_read_response, - this_ptr, - asio::placeholders::error, - asio::placeholders::bytes_transferred - ) - ); - return; - } - } - else - { - this_ptr->_controller->SetFailed(e.message()); - (*(this_ptr->_socket))->close(); - } - } - -private: - auto_ptr _socket; - const MethodDescriptor * _method; - RpcController * _controller; - const Message * _request; - Message * _response; - Closure * _done; - asio::streambuf _buffer; - unsigned int _len; - bool _status; - unsigned int _sent; -}; - - -ProtoBufRpcChannel::ProtoBufRpcChannel(const string &remotehost, - const string &port) - :_remote_host(remotehost), _port(port), - _resolver(_io_service), - _acceptor(_io_service), - _pool(2000, _io_service), - _lame_socket(_io_service), - _thread() -// &asio::io_service::run, -// &_io_service))) -{ - - tcp::resolver::query query(_remote_host, _port); - tcp::resolver::iterator endpoint_iterator = _resolver.resolve(query); - tcp::resolver::iterator end; - - error_code error = asio::error::host_not_found; - - if (endpoint_iterator == end) throw syserr::system_error(error); - - _pool.setEndpoint(*endpoint_iterator); - - tcp::endpoint e(tcp::v4(), 0); - _acceptor.open(e.protocol()); - _acceptor.set_option(tcp::acceptor::reuse_address(true)); - _acceptor.bind(e); - _acceptor.listen(); - _acceptor.async_accept(_lame_socket, - boost::bind(&ProtoBufRpcChannel::lame_handle_accept, this, - asio::placeholders::error)); - - _thread = shared_ptr(new thread( - boost::bind( - &asio::io_service::run, - &_io_service))); -} - -void ProtoBufRpcChannel::lame_handle_accept(const error_code &err) -{ - if (!err) - { - _acceptor.async_accept(_lame_socket, - boost::bind(&ProtoBufRpcChannel::lame_handle_accept, - this, - asio::placeholders::error)); - } -} - -ProtoBufRpcChannel::~ProtoBufRpcChannel() -{ - _pool.cancelAndClear(); - - _io_service.stop(); - - _thread->join(); -} - -void ProtoBufRpcChannel::CallMethod( - const MethodDescriptor * method, - RpcController * controller, - const Message * request, - Message * response, - Closure * done) -{ - shared_ptr h( - new MethodHandler( - auto_ptr(new SocketCheckout(&_pool)), - method, - controller, - request, - response, - done - )); - - MethodHandler::execute(h); -} - -} // namespace bicker diff --git a/kvstore/protobufrpc/protobufrpc.h b/kvstore/protobufrpc/protobufrpc.h deleted file mode 100644 index 221903d..0000000 --- a/kvstore/protobufrpc/protobufrpc.h +++ /dev/null @@ -1,173 +0,0 @@ -#ifndef __PROTOBUFRPC_H__ -#define __PROTOBUFRPC_H__ - -#include -#include -#include -#include -#include -#include -#include -#include "socket_pool.h" -#include "util.h" - -#include -#include -#include -#include -#include - -using namespace std; -using namespace google::protobuf; -using namespace boost; - -namespace bicker -{ - -class ProtoBufRpcService; - -class ProtoBufRpcConnection - : public enable_shared_from_this, - private boost::noncopyable -{ -public: - explicit ProtoBufRpcConnection(asio::io_service& io_service, - Service *_service); - - asio::ip::tcp::socket& socket(); - - void start(); - - void writeResponse(Message *msg); - - -private: - void handle_read(const error_code& e, - std::size_t bytes_transferred); - - void handle_write(const error_code& e, - std::size_t bytes_transferred); - - tcp::socket _socket; - - - asio::io_service::strand _strand; - - Service *_service; - - asio::streambuf _buffer; - - int _id; - unsigned int _len; - - enum { - STATE_NONE, - STATE_HAVE_ID_AND_LEN, - STATE_WAITING_FOR_DATA, - STATE_FAIL, - } _state; -}; - - -class ProtoBufRpcServer -{ -public: - ProtoBufRpcServer(); - - bool registerService(uint16_t port, - shared_ptr< Service> service); - - // So we can call this as a thread. - void run(); - - // So we can stop.. - void shutdown(); - -protected: - - class RegisteredService - { - public: - RegisteredService( - shared_ptr io_service, - uint16_t port, - shared_ptr service - ); - - void handle_accept(const error_code& e); - - protected: - // Ref to parent's - shared_ptr _io_service; - uint16_t _port; - shared_ptr _service; - tcp::endpoint _endpoint; - tcp::acceptor _acceptor; - shared_ptr _new_connection; - }; - - list > _services; - shared_ptr _io_service; -}; - -class ProtoBufRpcController : public RpcController -{ -public: - ProtoBufRpcController(); - virtual ~ProtoBufRpcController(); - - virtual void Reset(); - virtual bool Failed() const; - virtual string ErrorText() const; - virtual void StartCancel(); - - virtual void SetFailed(const string &reason); - virtual bool IsCanceled() const; - virtual void NotifyOnCancel(Closure *callback); -}; - -class ProtoBufRpcChannel - : public RpcChannel, - public enable_shared_from_this, - private boost::noncopyable -{ -public: - ProtoBufRpcChannel(const string &remotehost, const string &port); - - virtual ~ProtoBufRpcChannel(); - - virtual void CallMethod( - const MethodDescriptor * method, - RpcController * controller, - const Message * request, - Message * response, - Closure * done); - -protected: - shared_ptr getSocket(); - void putSocket(shared_ptr); - -private: - class MethodHandler; - - string _remote_host; - string _port; - - - asio::io_service _io_service; - tcp::resolver _resolver; - // This exists to keep the io service running - tcp::acceptor _acceptor; - - SocketPool _pool; - - asio::ip::tcp::socket _lame_socket; - void lame_handle_accept(const error_code &err); - - shared_ptr _thread; - -}; - -} // namespace bicker - -#endif diff --git a/kvstore/protobufrpc/socket_pool.cc b/kvstore/protobufrpc/socket_pool.cc deleted file mode 100644 index 454da55..0000000 --- a/kvstore/protobufrpc/socket_pool.cc +++ /dev/null @@ -1,101 +0,0 @@ -#include "socket_pool.h" - -SocketPool::SocketPool(int max_sockets, - asio::io_service &io_svc - ) -:_issued(0), - _max_sockets(max_sockets), - _io_service(io_svc) -{ -} - -void SocketPool::setEndpoint(const tcp::endpoint &endpoint) -{ - _endpoint = endpoint; -} - -void SocketPool::cancelAndClear() -{ - for (set >::iterator i = _set.begin(); - i != _set.end(); - ++i) - { - (*i)->cancel(); - } - - while (!_queue.empty()) _queue.pop(); - _set.clear(); -} - -shared_ptr SocketPool::getSocket() -{ - mutex::scoped_lock lock(_sockets_lock); - - while (_queue.size() == 0 && _issued >= _max_sockets) - _sockets_non_empty.wait(lock); - - if (_queue.size()) - { - shared_ptr socket = _queue.front(); - _queue.pop(); - - return socket; - } - else - { - ++_issued; - error_code error = asio::error::host_not_found; - - shared_ptr socket(new tcp::socket(_io_service)); - socket->connect(_endpoint, error); - - if (error) throw syserr::system_error(error); - - _set.insert(socket); - - return socket; - } -} - -void SocketPool::putSocket(shared_ptr socket) -{ - mutex::scoped_lock lock(_sockets_lock); - - if (!socket->is_open()) - { - cerr << "socket closed\n"; - --_issued; - _set.erase(socket); - } - else - { - _queue.push(socket); - } - - _sockets_non_empty.notify_one(); -} - -SocketCheckout::SocketCheckout(SocketPool *pool) - :_socket(pool->getSocket()), - _pool(pool) -{ -} - -SocketCheckout::~SocketCheckout() -{ - _pool->putSocket(_socket); -} -tcp::socket& SocketCheckout::operator*() -{ - return *_socket; -} - -tcp::socket* SocketCheckout::operator->() -{ - return _socket.get(); -} - -shared_ptr& SocketCheckout::socket() -{ - return _socket; -} diff --git a/kvstore/protobufrpc/socket_pool.h b/kvstore/protobufrpc/socket_pool.h deleted file mode 100644 index 57faf42..0000000 --- a/kvstore/protobufrpc/socket_pool.h +++ /dev/null @@ -1,52 +0,0 @@ -#ifndef _SOCKET_POOL_H_ -#define _SOCKET_POOL_H_ 1 - -#include -#include -#include -#include -#include - -#include "util.h" - -using namespace std; -using namespace boost; - - -class SocketPool -{ -public: - SocketPool(int max_streams, - asio::io_service &io_svc); - void setEndpoint(const tcp::endpoint &endpoint); - void cancelAndClear(); - shared_ptr getSocket(); - void putSocket(shared_ptr socket); -private: - int _issued; - int _max_sockets; - mutex _sockets_lock; - condition_variable _sockets_non_empty; - asio::io_service &_io_service; - tcp::endpoint _endpoint; - queue > _queue; - set > _set; -}; - -class SocketCheckout -{ -public: - SocketCheckout(SocketPool *pool); - ~SocketCheckout(); - - tcp::socket& operator*(); - tcp::socket* operator->(); - - shared_ptr& socket(); - -private: - shared_ptr _socket; - SocketPool *_pool; -}; - -#endif diff --git a/kvstore/protobufrpc/util.h b/kvstore/protobufrpc/util.h deleted file mode 100644 index 966921f..0000000 --- a/kvstore/protobufrpc/util.h +++ /dev/null @@ -1,37 +0,0 @@ -#ifndef _UTIL_H_ -#define _UTIL_H_ 1 - -#include - -#if BOOST_VERSION <= 103500 -#include -#include -#include -#include -#include -//typedef boost::condition condition_variable ; -//typedef boost::detail::thread::scoped_lock scoped_lock; -using asio::ip::tcp; -using asio::error_code; -using asio::buffers_begin; -namespace syserr=asio; -#else -#if BOOST_VERSION < 104000 -#include -#include -#include -using boost::asio::ip::tcp; -using boost::system::error_code; -using boost::system::system_error; -namespace syserr=boost::system; -#else -#include -#include -using boost::asio::ip::tcp; -using boost::system::error_code; -using boost::system::system_error; -namespace syserr=boost::system; -#endif -#endif - -#endif diff --git a/kvstore/protobufrpc/workqueue.cc b/kvstore/protobufrpc/workqueue.cc deleted file mode 100644 index 27861aa..0000000 --- a/kvstore/protobufrpc/workqueue.cc +++ /dev/null @@ -1,125 +0,0 @@ -#include "workqueue.h" -#include - -namespace bicker -{ - -WorkQueue::WorkQueue() - :_thread_count(0), _min_threads(1), _max_threads(50), _running(true) -{ - for (int i = 0; i < _min_threads; ++i) - { - spawnThread(); - } -} - -WorkQueue::WorkQueue(int thread_count) - :_thread_count(0), - _min_threads(1), _max_threads(thread_count), _running(true) -{ - for (int i = 0; i < _min_threads; ++i) - { - spawnThread(); - } -} - -WorkQueue::~WorkQueue() -{ - _running = false; - - { - mutex::scoped_lock lock(_queue_lock); - _queue_non_empty.notify_all(); - } - - _threads.join_all(); -} - -void WorkQueue::spawnThread() -{ - ++_thread_count; - _threads.create_thread(Worker(this)); -} - -shared_ptr WorkQueue::get() -{ - mutex::scoped_lock lock(_queue_lock); - - while (_queue.size() == 0) - { - _queue_non_empty.wait(lock); - if (!_running) throw interrupted_error(); - } - - shared_ptr back = _queue.front(); - _queue.pop(); - - if (_queue.size() > 0 && _thread_count < _max_threads) spawnThread(); - - return back; -} - -void WorkQueue::put(shared_ptr work_unit) -{ - mutex::scoped_lock lock(_queue_lock); - - _queue.push(work_unit); - - _queue_non_empty.notify_one(); -} - -WorkQueue::Worker::Worker(WorkQueue* queue) - :_queue(queue) -{ -} - -void WorkQueue::Worker::operator()() -{ - while (true) - { - try - { - shared_ptr unit = _queue->get(); - - unit->run(); - } - catch (interrupted_error) - { - return; - } - } -} - -TaskNotification::TaskNotification() -:_expected(0), _count(0), _fail_count(0) -{ -} - -void TaskNotification::registerTask() -{ - mutex::scoped_lock lock(_lock); - ++_expected; -} - -void TaskNotification::completeTask(bool success) -{ - mutex::scoped_lock lock(_lock); - if (!success) ++_fail_count; - if (++_count == _expected) _cond.notify_all(); -} - -void TaskNotification::waitForComplete() -{ - mutex::scoped_lock lock(_lock); - while (_count < _expected) - { - _cond.wait(lock); - } -} - -bool TaskNotification::failCount() -{ - return _fail_count; -} - -} // namespace bicker diff --git a/kvstore/protobufrpc/workqueue.h b/kvstore/protobufrpc/workqueue.h deleted file mode 100644 index cfb2e02..0000000 --- a/kvstore/protobufrpc/workqueue.h +++ /dev/null @@ -1,78 +0,0 @@ -#ifndef __WORKQUEUE_H__ -#define __WORKQUEUE_H__ 1 - -#include -#include -#include -#include "util.h" - -using namespace boost; -using namespace std; - -namespace bicker -{ - struct interrupted_error : public virtual std::exception { }; - - class WorkUnit - { - public: - virtual ~WorkUnit() {}; - virtual void run() = 0; - }; - - class WorkQueue - { - public: - WorkQueue(); - WorkQueue(int thread_count); - - ~WorkQueue(); - - shared_ptr get(); - void put(shared_ptr work_unit); - - protected: - void spawnThread(); - - private: - class Worker - { - public: - Worker(WorkQueue* queue); - void operator()(); - private: - WorkQueue *_queue; - }; - - int _thread_count; - int _min_threads; - int _max_threads; - mutex _queue_lock; - condition_variable _queue_non_empty; - queue > _queue; - thread_group _threads; - volatile bool _running; - }; - - class TaskNotification - { - public: - TaskNotification(); - - void registerTask(); - void completeTask(bool success = true); - - void waitForComplete(); - - bool failCount(); - private: - int _expected; - int _count; - int _fail_count; - mutex _lock; - condition_variable _cond; - }; - -} // namespace bicker - -#endif diff --git a/kvstore/socket_pool.cc b/kvstore/socket_pool.cc new file mode 100644 index 0000000..454da55 --- /dev/null +++ b/kvstore/socket_pool.cc @@ -0,0 +1,101 @@ +#include "socket_pool.h" + +SocketPool::SocketPool(int max_sockets, + asio::io_service &io_svc + ) +:_issued(0), + _max_sockets(max_sockets), + _io_service(io_svc) +{ +} + +void SocketPool::setEndpoint(const tcp::endpoint &endpoint) +{ + _endpoint = endpoint; +} + +void SocketPool::cancelAndClear() +{ + for (set >::iterator i = _set.begin(); + i != _set.end(); + ++i) + { + (*i)->cancel(); + } + + while (!_queue.empty()) _queue.pop(); + _set.clear(); +} + +shared_ptr SocketPool::getSocket() +{ + mutex::scoped_lock lock(_sockets_lock); + + while (_queue.size() == 0 && _issued >= _max_sockets) + _sockets_non_empty.wait(lock); + + if (_queue.size()) + { + shared_ptr socket = _queue.front(); + _queue.pop(); + + return socket; + } + else + { + ++_issued; + error_code error = asio::error::host_not_found; + + shared_ptr socket(new tcp::socket(_io_service)); + socket->connect(_endpoint, error); + + if (error) throw syserr::system_error(error); + + _set.insert(socket); + + return socket; + } +} + +void SocketPool::putSocket(shared_ptr socket) +{ + mutex::scoped_lock lock(_sockets_lock); + + if (!socket->is_open()) + { + cerr << "socket closed\n"; + --_issued; + _set.erase(socket); + } + else + { + _queue.push(socket); + } + + _sockets_non_empty.notify_one(); +} + +SocketCheckout::SocketCheckout(SocketPool *pool) + :_socket(pool->getSocket()), + _pool(pool) +{ +} + +SocketCheckout::~SocketCheckout() +{ + _pool->putSocket(_socket); +} +tcp::socket& SocketCheckout::operator*() +{ + return *_socket; +} + +tcp::socket* SocketCheckout::operator->() +{ + return _socket.get(); +} + +shared_ptr& SocketCheckout::socket() +{ + return _socket; +} diff --git a/kvstore/socket_pool.h b/kvstore/socket_pool.h new file mode 100644 index 0000000..57faf42 --- /dev/null +++ b/kvstore/socket_pool.h @@ -0,0 +1,52 @@ +#ifndef _SOCKET_POOL_H_ +#define _SOCKET_POOL_H_ 1 + +#include +#include +#include +#include +#include + +#include "util.h" + +using namespace std; +using namespace boost; + + +class SocketPool +{ +public: + SocketPool(int max_streams, + asio::io_service &io_svc); + void setEndpoint(const tcp::endpoint &endpoint); + void cancelAndClear(); + shared_ptr getSocket(); + void putSocket(shared_ptr socket); +private: + int _issued; + int _max_sockets; + mutex _sockets_lock; + condition_variable _sockets_non_empty; + asio::io_service &_io_service; + tcp::endpoint _endpoint; + queue > _queue; + set > _set; +}; + +class SocketCheckout +{ +public: + SocketCheckout(SocketPool *pool); + ~SocketCheckout(); + + tcp::socket& operator*(); + tcp::socket* operator->(); + + shared_ptr& socket(); + +private: + shared_ptr _socket; + SocketPool *_pool; +}; + +#endif diff --git a/kvstore/util.h b/kvstore/util.h new file mode 100644 index 0000000..966921f --- /dev/null +++ b/kvstore/util.h @@ -0,0 +1,37 @@ +#ifndef _UTIL_H_ +#define _UTIL_H_ 1 + +#include + +#if BOOST_VERSION <= 103500 +#include +#include +#include +#include +#include +//typedef boost::condition condition_variable ; +//typedef boost::detail::thread::scoped_lock scoped_lock; +using asio::ip::tcp; +using asio::error_code; +using asio::buffers_begin; +namespace syserr=asio; +#else +#if BOOST_VERSION < 104000 +#include +#include +#include +using boost::asio::ip::tcp; +using boost::system::error_code; +using boost::system::system_error; +namespace syserr=boost::system; +#else +#include +#include +using boost::asio::ip::tcp; +using boost::system::error_code; +using boost::system::system_error; +namespace syserr=boost::system; +#endif +#endif + +#endif diff --git a/kvstore/workqueue.cc b/kvstore/workqueue.cc new file mode 100644 index 0000000..27861aa --- /dev/null +++ b/kvstore/workqueue.cc @@ -0,0 +1,125 @@ +#include "workqueue.h" +#include + +namespace bicker +{ + +WorkQueue::WorkQueue() + :_thread_count(0), _min_threads(1), _max_threads(50), _running(true) +{ + for (int i = 0; i < _min_threads; ++i) + { + spawnThread(); + } +} + +WorkQueue::WorkQueue(int thread_count) + :_thread_count(0), + _min_threads(1), _max_threads(thread_count), _running(true) +{ + for (int i = 0; i < _min_threads; ++i) + { + spawnThread(); + } +} + +WorkQueue::~WorkQueue() +{ + _running = false; + + { + mutex::scoped_lock lock(_queue_lock); + _queue_non_empty.notify_all(); + } + + _threads.join_all(); +} + +void WorkQueue::spawnThread() +{ + ++_thread_count; + _threads.create_thread(Worker(this)); +} + +shared_ptr WorkQueue::get() +{ + mutex::scoped_lock lock(_queue_lock); + + while (_queue.size() == 0) + { + _queue_non_empty.wait(lock); + if (!_running) throw interrupted_error(); + } + + shared_ptr back = _queue.front(); + _queue.pop(); + + if (_queue.size() > 0 && _thread_count < _max_threads) spawnThread(); + + return back; +} + +void WorkQueue::put(shared_ptr work_unit) +{ + mutex::scoped_lock lock(_queue_lock); + + _queue.push(work_unit); + + _queue_non_empty.notify_one(); +} + +WorkQueue::Worker::Worker(WorkQueue* queue) + :_queue(queue) +{ +} + +void WorkQueue::Worker::operator()() +{ + while (true) + { + try + { + shared_ptr unit = _queue->get(); + + unit->run(); + } + catch (interrupted_error) + { + return; + } + } +} + +TaskNotification::TaskNotification() +:_expected(0), _count(0), _fail_count(0) +{ +} + +void TaskNotification::registerTask() +{ + mutex::scoped_lock lock(_lock); + ++_expected; +} + +void TaskNotification::completeTask(bool success) +{ + mutex::scoped_lock lock(_lock); + if (!success) ++_fail_count; + if (++_count == _expected) _cond.notify_all(); +} + +void TaskNotification::waitForComplete() +{ + mutex::scoped_lock lock(_lock); + while (_count < _expected) + { + _cond.wait(lock); + } +} + +bool TaskNotification::failCount() +{ + return _fail_count; +} + +} // namespace bicker diff --git a/kvstore/workqueue.h b/kvstore/workqueue.h new file mode 100644 index 0000000..cfb2e02 --- /dev/null +++ b/kvstore/workqueue.h @@ -0,0 +1,78 @@ +#ifndef __WORKQUEUE_H__ +#define __WORKQUEUE_H__ 1 + +#include +#include +#include +#include "util.h" + +using namespace boost; +using namespace std; + +namespace bicker +{ + struct interrupted_error : public virtual std::exception { }; + + class WorkUnit + { + public: + virtual ~WorkUnit() {}; + virtual void run() = 0; + }; + + class WorkQueue + { + public: + WorkQueue(); + WorkQueue(int thread_count); + + ~WorkQueue(); + + shared_ptr get(); + void put(shared_ptr work_unit); + + protected: + void spawnThread(); + + private: + class Worker + { + public: + Worker(WorkQueue* queue); + void operator()(); + private: + WorkQueue *_queue; + }; + + int _thread_count; + int _min_threads; + int _max_threads; + mutex _queue_lock; + condition_variable _queue_non_empty; + queue > _queue; + thread_group _threads; + volatile bool _running; + }; + + class TaskNotification + { + public: + TaskNotification(); + + void registerTask(); + void completeTask(bool success = true); + + void waitForComplete(); + + bool failCount(); + private: + int _expected; + int _count; + int _fail_count; + mutex _lock; + condition_variable _cond; + }; + +} // namespace bicker + +#endif