--- /dev/null
+apt-get install mercurial build-essential libdb4.6-dev libasio-dev libboost1.35-dev libprotobuf-dev libgtest-dev protobuf-compiler scons
+
+rm -f /tmp/__db* /tmp/test.db;./kvstore -d /tmp & KVPID=$! ;sleep 1 ;time
+./kvbench --high 20 --servers 127.0.0.1:9090 ;kill $KVPID
+
+(0x42, 0x5A, 0x68)
+
+ grep '1f 8b 08 00'
+
+x=/boot/vmlinuz-2.6.26-2-amd64 p=`echo -e '\x8b\x08\x00'` o=`grep -a -b
+-o -m 1 -e $p $x | cut -d: -f 1` dd if=$x skip=$o bs=1 | zcat > /tmp/vmlinux
+
+x=/boot/vmlinuz-2.6.26-2-amd64 dd if=$x skip=`grep -a -b -o -m 1 -e $'\xf1\x8b\x08\x00 $x | cut -d: -f 1` bs=1 | zcat > /tmp/vmlinux
--- /dev/null
+Import('env')
+
+protobuf = Builder(
+ action = (
+ 'protoc --proto_path=${SOURCE.dir} --cpp_out=${TARGET.dir} $SOURCE'
+ ),
+ single_source = True,
+ )
+
+env["BUILDERS"]["Protobuf"] = protobuf
+
+env.ParseConfig("echo -I./protobufrpc")
+env.ParseConfig("echo -I/opt/local/include -L/opt/local/lib")
+
+if not env.has_key('LIBS'):
+ env['LIBS'] = []
+
+env['LIBS'] += ['protobuf',
+ 'pthread',
+ 'boost_thread-mt',
+ 'boost_regex-mt',
+ 'boost_system-mt',
+ 'boost_program_options-mt',
+ 'db']
+
+protobufrpc = SConscript(dirs=['./protobufrpc/'], exports='env')
+
+kvstore_proto_files = env.Protobuf(
+ target = [ 'kvstore.pb.cc', 'kvstore.pb.h' ],
+ source = 'kvstore.proto'
+ )
+
+kvservice_files = ['kvservice.cc', 'backend.cc']
+kvclient_files = ['kvclient.cc']
+
+kvservice = env.StaticLibrary('kvservice', kvservice_files)
+kvclient = env.StaticLibrary('kvclient', kvclient_files)
+kvstore_proto = env.StaticLibrary('kvstore', kvstore_proto_files)
+
+env.Program('kvstore', ['kvstore.cc']
+ + kvservice
+ + kvstore_proto
+ + protobufrpc)
+
+env.Program('kvtest',
+ ['kvtest.cc']
+ + kvservice
+ + kvclient
+ + kvstore_proto
+ + protobufrpc,
+ LIBS=env['LIBS']+['gtest'])
+
+env.Program('kvbench',
+ ['kvbench.cc']
+ + kvclient
+ + kvstore_proto
+ + protobufrpc)
+
+products = {
+ 'kvservice': kvservice,
+ 'kvclient': kvclient,
+ 'kvstore_proto': kvstore_proto,
+ }
+
+Return('products')
--- /dev/null
+import os
+
+AddOption('--prefix',
+ dest='prefix',
+ type='string',
+ nargs=1,
+ action='store',
+ metavar='DIR',
+ help='installation prefix')
+
+env = Environment(CXXFLAGS='-O3 -fPIC -W -Wall -g',
+ PREFIX=GetOption('prefix'),
+ LINKFLAGS='',#-pg',
+ )
+
+
+for envvar in ('HOME', 'DISTCC_DIR', 'DISTCC_HOSTS', 'CCACHE_DIR',
+ 'INTERCEPTOR_SOCKET', 'ENFORGE_DIGEST_CACHE',
+ 'ENFORGE_CACHE_HOST', 'ENFORGE_CACHE_PORT',
+ 'PATH'):
+ if envvar in os.environ:
+ env['ENV'][envvar] = os.environ[envvar]
+
+#env['CXX'] = 'distcc g++'
+#env['CXX'] = 'icc'
+
+if env['PREFIX'] is not None:
+ bin_dest = env['PREFIX'] + '/usr/bin'
+else:
+ bin_dest = '/usr/bin'
+
+#env.ParseConfig("echo -lprofiler -ltcmalloc")
+
+SConscript(dirs=['./'], exports='env')
+
+if 'install' in COMMAND_LINE_TARGETS:
+ env.Install(bin_dest, 'kvstore')
+
+env.Alias('install', bin_dest)
--- /dev/null
+#include "backend.h"
+#include "util.h"
+
+#include <iostream>
+#include <boost/functional/hash.hpp>
+
+/* For bdb */
+extern "C" {
+#include <sys/types.h>
+#include <fcntl.h>
+#include <limits.h>
+}
+
+
+namespace kvstore
+{
+
+MemoryBackend::~MemoryBackend()
+{
+}
+
+bool MemoryBackend::Put(const string &key,
+ const string &value)
+{
+ _map[key] = value;
+ return true;
+}
+
+bool MemoryBackend::Get(const string &key,
+ string *value)
+{
+ map_t::iterator it = _map.find(key);
+
+ if (it == _map.end())
+ {
+ return false;
+ }
+ else
+ {
+ *value = it->second;
+ return true;
+ }
+}
+
+void set_DBT(DBT *thing, const string &str)
+{
+ bzero(thing, sizeof(DBT));
+ thing->data = (void*)str.data();
+ thing->size = str.size();
+ thing->ulen = str.size();
+}
+
+class BDBBackend::BDBDatabase
+{
+public:
+ BDBDatabase(const string &path,
+ bool flush,
+ bool log_in_memory,
+ size_t num_dbs)
+ :_path(path)
+ {
+ int res = db_env_create(&_dbenv, 0);
+
+ if (res != 0)
+ {
+ cerr << db_strerror(res) << endl;
+ throw std::runtime_error("db_env_create fail");
+ }
+
+ /* Set Cache Size To Total Memory */
+ if (true)
+ {
+ double use_fraction = 0.1;
+ uint64_t pages = sysconf(_SC_PHYS_PAGES);
+ uint64_t page_size = sysconf(_SC_PAGE_SIZE);
+
+ uint64_t bytes = pages * page_size * use_fraction / num_dbs;
+
+ uint32_t gbytes = bytes / (1024uLL * 1024uLL * 1024uLL);
+ uint32_t nbytes = bytes % (1024uLL * 1024uLL * 1024uLL);
+ uint32_t ncache = bytes / (1024uLL * 1024uLL * 1024uLL * 4uLL) + 1;
+
+ res = _dbenv->set_cachesize(_dbenv, gbytes, nbytes, ncache);
+
+ if (res != 0)
+ {
+ cerr << db_strerror(res) << endl;
+ throw std::runtime_error("set_cachesize");
+ }
+ }
+
+ if (log_in_memory)
+ {
+ res = _dbenv->set_flags(_dbenv, DB_LOG_INMEMORY, 1);
+
+ if (res != 0)
+ {
+ cerr << db_strerror(res) << endl;
+ throw std::runtime_error("BDB ENV DB_LOG_INMEMORY");
+ }
+
+ }
+
+ res = _dbenv->set_flags(_dbenv, DB_LOG_AUTOREMOVE, 1);
+
+ if (res != 0)
+ {
+ cerr << db_strerror(res) << endl;
+ throw std::runtime_error("BDB ENV DB_LOG_AUTOREMOVE");
+ }
+
+ res = _dbenv->open(_dbenv,
+ _path.c_str(),
+ DB_INIT_CDB
+ | DB_INIT_MPOOL
+ | DB_CREATE
+ | DB_THREAD,
+ 0644);
+
+
+ if (res != 0)
+ {
+ cerr << db_strerror(res) << endl;
+ throw std::runtime_error("BDB ENV Open Fail");
+ }
+
+ string dbfilename = _path + "/test.db";
+
+ /* Flush */
+ if (flush)
+ {
+ res = _dbenv->dbremove(_dbenv, NULL, dbfilename.c_str(), "test", 0);
+
+ if (res != 0 && res != ENOENT)
+ {
+ cerr << db_strerror(res) << endl;
+ throw std::runtime_error("db remove failed");
+ }
+ }
+
+ res = db_create(&_db, _dbenv, 0);
+
+ if (res != 0)
+ {
+ cerr << db_strerror(res) << endl;
+ throw std::runtime_error("db_create fail");
+ }
+
+ uint32_t flags = DB_CREATE | DB_THREAD;
+
+ res = _db->open(_db,
+ NULL, /* TXN */
+ dbfilename.c_str(),
+ "test",
+ DB_BTREE,
+ flags,
+ 0644);
+
+ if (res != 0)
+ {
+ cerr << db_strerror(res) << endl;
+ throw std::runtime_error("BDB Open Fail");
+ }
+
+
+ }
+
+ ~BDBDatabase()
+ {
+ int res;
+
+ if (_db)
+ {
+ if ((res = _db->close(_db, 0)) < 0)
+ {
+ cerr << db_strerror(res) << endl;
+ }
+ }
+
+ if (_dbenv)
+ {
+ if ((res = _dbenv->close(_dbenv, 0)) < 0)
+ {
+ cerr << db_strerror(res) << endl;
+ }
+ }
+ }
+
+ bool Put(const string &key,
+ const string &value)
+ {
+ DBT bdb_key, bdb_value;
+
+ set_DBT(&bdb_key, key);
+ set_DBT(&bdb_value, value);
+
+ if (_db->put(_db, NULL, &bdb_key, &bdb_value, 0) != 0)
+ {
+ perror("bdb put");
+ return false;
+ }
+ else
+ {
+ return true;
+ }
+ }
+
+ bool Get(const string &key,
+ string *value)
+ {
+ DBT bdb_key, bdb_value;
+
+ set_DBT(&bdb_key, key);
+
+ bzero(&bdb_value, sizeof(DBT));
+ bdb_value.flags = DB_DBT_MALLOC;
+
+ int res = _db->get(_db, NULL, &bdb_key, &bdb_value, 0);
+
+ if (res == 0)
+ {
+ *value = string((char*)bdb_value.data, bdb_value.size);
+ free(bdb_value.data);
+ bdb_value.data = NULL;
+ return true;
+ }
+ else if (res == DB_NOTFOUND || res == DB_KEYEMPTY)
+ {
+ return false;
+ }
+ else
+ {
+ /* ERR */
+ cerr << db_strerror(res) << endl;
+ return false;
+ }
+ }
+
+private:
+ DB *_db;
+ DB_ENV *_dbenv;
+ const string _path;
+};
+
+BDBBackend::BDBBackend(const vector<string> &paths,
+ bool flush,
+ bool log_in_memory)
+{
+ if (paths.size() < 1)
+ {
+ cerr << "Insufficient BDB Paths supplied (need at least 1)";
+ throw std::runtime_error("not enough paths");
+ }
+
+
+ for (size_t i = 0; i < paths.size(); ++i)
+ {
+ cerr << "db for " << paths[i] << endl;
+ _dbs.push_back(shared_ptr<BDBDatabase>(new BDBDatabase(paths[i],
+ flush,
+ log_in_memory,
+ paths.size())));
+ }
+}
+
+BDBBackend::~BDBBackend()
+{
+}
+
+
+static boost::hash<string> hasher;
+
+inline size_t BDBBackend::LookupKeyDB(const string &key)
+{
+ uint32_t hash = (uint32_t)hasher(key);
+ return hash % _dbs.size();
+}
+
+bool BDBBackend::Put(const string &key,
+ const string &value)
+{
+ size_t i = LookupKeyDB(key);
+
+ return _dbs[i]->Put(key, value);
+}
+
+bool BDBBackend::Get(const string &key,
+ string *value)
+{
+ size_t i = LookupKeyDB(key);
+ return _dbs[i]->Get(key, value);
+}
+
+
+} // namespace kvstore
--- /dev/null
+#ifndef _BACKEND_H_
+#define _BACKEND_H_ 1
+
+extern "C"
+{
+#include <db.h>
+}
+
+#include <string>
+#include <map>
+
+#include <boost/shared_ptr.hpp>
+
+#include "util.h"
+
+using namespace std;
+using boost::shared_ptr;
+
+namespace kvstore
+{
+
+class Backend
+{
+public:
+ virtual ~Backend() {};
+
+ virtual bool Put(const string &key,
+ const string &value) = 0;
+
+ virtual bool Get(const string &key,
+ string *value) = 0;
+
+};
+
+class MemoryBackend : public Backend
+{
+public:
+ virtual ~MemoryBackend();
+
+ virtual bool Put(const string &key,
+ const string &value);
+
+ virtual bool Get(const string &key,
+ string *value);
+
+private:
+ boost::mutex _lock;
+ typedef map<string, string> map_t;
+ map_t _map;
+};
+
+class BDBBackend : public Backend
+{
+public:
+ BDBBackend(const vector<string> &paths,
+ bool flush=true,
+ bool log_in_memory = false);
+
+ virtual ~BDBBackend();
+ virtual bool Put(const string &key,
+ const string &value);
+
+ virtual bool Get(const string &key,
+ string *value);
+
+private:
+ class BDBDatabase;
+
+ inline size_t LookupKeyDB(const string &key);
+
+ vector< shared_ptr<BDBDatabase> > _dbs;
+};
+
+} // namespace kvstore
+
+#endif
--- /dev/null
+bicker-fcgi for Debian
+----------------------
+
+<possible notes regarding this package - if none, delete this file>
+
+ -- John McCullough <jcm@salud.ucsd.edu> Tue, 29 Sep 2009 13:29:18 -0700
--- /dev/null
+bicker-fcgi for Debian
+----------------------
+
+<this file describes information about the source package, see Debian policy
+manual section 4.14. You WILL either need to modify or delete this file>
+
+
+
+
--- /dev/null
+kvstore (0.1-7) unstable; urgency=low
+
+ * Reduced memory copying in protobuf, support for multiple data
+ targets
+
+ -- John McCullough <jcm@salud.ucsd.edu> Thu, 21 Jan 2010 11:10:45 -0800
+
+kvstore (0.1-6) unstable; urgency=low
+
+ * Improved RPC Layer
+
+ -- John McCullough <jcm@salud.ucsd.edu> Thu, 24 Dec 2009 14:25:41 -0800
+
+kvstore (0.1-5) unstable; urgency=low
+
+ * KeyValue -> KVStore rename.
+ * Fixed Dependencies
+
+ -- John McCullough <jcm@salud.ucsd.edu> Sun, 29 Nov 2009 16:17:23 -0800
+
+kvstore (0.1-3) unstable; urgency=low
+
+ * More Packaging Fun
+
+ -- John McCullough <jcm@salud.ucsd.edu> Sun, 29 Nov 2009 15:34:41 -0800
+
+kvstore (0.1-2) unstable; urgency=low
+
+ * Incremented boost version to .0
+
+ -- John McCullough <jcm@salud.ucsd.edu> Sun, 29 Nov 2009 15:32:01 -0800
+
+kvstore (0.1-1) unstable; urgency=low
+
+ * Initial release (Closes: #nnnn) <nnnn is the bug number of your ITP>
+
+ -- John McCullough <jcm@salud.ucsd.edu> Tue, 29 Sep 2009 13:29:18 -0700
--- /dev/null
+Source: kvstore
+Section: unknown
+Priority: extra
+Maintainer: John McCullough <jcm@salud.ucsd.edu>
+Build-Depends: debhelper (>= 7), libboost-thread1.35-dev, libboost-system1.35-dev, libboost1.35-dev, libasio-dev, protobuf-compiler, libprotobuf-dev, libdb4.6-dev, libgtest-dev
+Standards-Version: 3.8.3,
+Homepage: <insert the upstream URL, if relevant>
+
+Package: kvstore
+Architecture: any
+Depends: ${misc:Depends}, libboost-thread1.35.0, libboost-regex1.35.0, libboost-program-options1.35.0, libboost-system1.35.0, libprotobuf4, libdb4.6, libgtest0
+Description: Homebrew key value store
+ <insert long description, indented with spaces>
--- /dev/null
+This work was packaged for Debian by:
+
+ John McCullough <jcm@salud.ucsd.edu> on Tue, 29 Sep 2009 13:29:18 -0700
+
+It was downloaded from <url://example.com>
+
+Upstream Author(s):
+
+ <put author's name and email here>
+ <likewise for another author>
+
+Copyright:
+
+ <Copyright (C) YYYY Name OfAuthor>
+ <likewise for another author>
+
+License:
+
+ <Put the license of the package here indented by 4 spaces>
+
+The Debian packaging is:
+
+ Copyright (C) 2009 John McCullough <jcm@salud.ucsd.edu>
+
+# Please chose a license for your packaging work. If the program you package
+# uses a mainstream license, using the same license is the safest choice.
+# Please avoid to pick license terms that are more restrictive than the
+# packaged work, as it may make Debian's contributions unacceptable upstream.
+# If you just want it to be GPL version 3, leave the following lines in.
+
+and is licensed under the GPL version 3,
+see `/usr/share/common-licenses/GPL-3'.
+
+# Please also look if there are files or directories which have a
+# different copyright/license attached and list them here.
--- /dev/null
+usr/bin/kvstore
--- /dev/null
+#!/usr/bin/make -f
+# -*- makefile -*-
+# Sample debian/rules that uses debhelper.
+# This file was originally written by Joey Hess and Craig Small.
+# As a special exception, when this file is copied by dh-make into a
+# dh-make output file, you may use that output file without restriction.
+# This special exception was added by Craig Small in version 0.37 of dh-make.
+
+# Uncomment this to turn on verbose mode.
+#export DH_VERBOSE=1
+build:
+ dh_testdir
+ scons
+
+install: build
+ dh_testdir
+ dh_testroot
+ dh_prep
+ dh_installdirs
+ scons --prefix=$(CURDIR)/debian/tmp install
+
+clean:
+ scons -c
+ dh_clean
+
+binary-arch: install
+ dh_testdir
+ dh_testroot
+ dh_installchangelogs
+ dh_install
+ dh_link
+ #dh_strip
+ dh_compress
+ dh_fixperms
+ dh_installdeb
+ dh_gencontrol
+ dh_md5sums
+ dh_builddeb
+
+binary: binary-arch
+
+%:
+ dh $@
--- /dev/null
+#include <boost/program_options.hpp>
+#include <boost/shared_ptr.hpp>
+#include <iostream>
+#include <string>
+#include "kvservice.h"
+#include "kvclient.h"
+#include "protobufrpc.h"
+
+using namespace bicker;
+using namespace boost;
+using namespace kvstore;
+using namespace std;
+
+namespace po = boost::program_options;
+
+class KVBench
+{
+public:
+ KVBench(const vector<string> &hosts)
+ :_kv_client(list<string>(hosts.begin(), hosts.end()))
+ {
+ }
+
+ virtual ~KVBench()
+ {
+ }
+
+
+ void Bench(const size_t size,
+ const size_t count)
+ {
+ string data(size, 'A');
+
+ for (size_t i = 0; i < count; ++i)
+ {
+ ostringstream key;
+ key << "key_" << size << "_" << i << endl;
+
+ _kv_client.Put(key.str(), data);
+ }
+
+ for (size_t i = 0; i < count; ++i)
+ {
+ string value;
+ ostringstream key;
+ key << "key_" << size << "_" << i << endl;
+
+ _kv_client.Get(key.str(), &value);
+ }
+ }
+
+protected:
+ KeyValueClient _kv_client;
+};
+
+
+int
+main(
+ int argc,
+ char **argv
+ )
+{
+ size_t opt_low;
+ size_t opt_high;
+ size_t opt_count;
+
+ po::options_description options("Options");
+
+ options.add_options()
+ ("help,h", "display help message")
+ ("servers",
+ po::value< vector<string> >()->multitoken(),
+ "server:port ... server:port")
+ ("low,l",
+ po::value<size_t>(&opt_low)->default_value(1),
+ "low 2^i")
+ ("high,H",
+ po::value<size_t>(&opt_high)->default_value(16),
+ "high 2^i")
+ ("count,c",
+ po::value<size_t>(&opt_count)->default_value(100),
+ "count of each size")
+ ;
+
+ po::variables_map vm;
+ po::store(po::parse_command_line(argc, argv, options), vm);
+ po::notify(vm);
+
+ if (vm.count("help"))
+ {
+ cerr << options << endl;
+ return 1;
+ }
+
+ if (!vm.count("servers"))
+ {
+ cerr << "No Servers Specified" << endl;
+ return 1;
+ }
+
+
+ KVBench bench(vm["servers"].as< vector<string> >());
+
+ for (size_t i = opt_low; i <= opt_high; ++i)
+ {
+ cout << i << ": " << (1<<i) << endl;
+ bench.Bench(1<<i, opt_count);
+ }
+
+ return 0;
+}
--- /dev/null
+#include "kvclient.h"
+#include <boost/tuple/tuple.hpp>
+#include <boost/functional/hash.hpp>
+
+using namespace boost;
+using namespace kvrpc;
+
+namespace kvstore
+{
+
+class KeyValueClientRouter
+{
+public:
+ KeyValueClientRouter()
+ {
+ }
+
+ void AddHost(const string &host, const string &port)
+ {
+ mutex::scoped_lock lock(_lock);
+
+ _channels.push_back(shared_ptr<ProtoBufRpcChannel>(new ProtoBufRpcChannel(host, port)));
+ _clients.push_back(shared_ptr<kvrpc::KeyValueService_Stub>(new KeyValueService_Stub(static_cast<RpcChannel*>(_channels.back().get()))));
+ }
+
+ kvrpc::KeyValueService_Stub* Route(const string &key)
+ {
+ static hash<string> hasher;
+
+ uint32_t hash = (uint32_t)hasher(key);
+ int id = hash % _clients.size();
+
+ return _clients[id].get();
+ }
+
+private:
+ mutex _lock;
+ vector< shared_ptr<ProtoBufRpcChannel> > _channels;
+ vector< shared_ptr<kvrpc::KeyValueService_Stub> > _clients;
+};
+
+KeyValueClient::KeyValueClient(const string& host,
+ const string& port)
+:_router(new KeyValueClientRouter())
+{
+ _router->AddHost(host, port);
+}
+
+KeyValueClient::KeyValueClient(const list<string> &hosts)
+:_router(new KeyValueClientRouter())
+{
+ for (list<string>::const_iterator i = hosts.begin();
+ i != hosts.end();
+ ++i)
+ {
+ size_t pos = i->find(':');
+ if (pos == string::npos)
+ throw runtime_error("couldn't parse host");
+
+ string host = i->substr(0, pos);
+ string port = i->substr(pos+1);
+
+ _router->AddHost(host, port);
+ }
+}
+
+typedef tuple<shared_ptr<RpcController>,
+ shared_ptr<Put>,
+ shared_ptr<PutReply> > rpc_put_state_tuple_t;
+
+void cleanupPut(rpc_put_state_tuple_t t, TaskNotification *tn)
+{
+ tn->completeTask(t.get<2>()->result() == kvrpc::SUCCESS);
+}
+
+bool
+KeyValueClient::Put(const string& key,
+ const string& value)
+{
+ TaskNotification tn;
+
+ this->Put(key, value, tn);
+
+ tn.waitForComplete();
+
+ return tn.failCount() == 0;
+}
+
+bool
+KeyValueClient::Put(const string& key,
+ const string& value,
+ TaskNotification &tn)
+{
+ tn.registerTask();
+
+ shared_ptr<RpcController> ctrl(new ProtoBufRpcController());
+ shared_ptr< ::kvrpc::Put> put(new ::kvrpc::Put());
+
+ put->set_key(key);
+ put->set_value(value);
+
+ shared_ptr<PutReply> reply(new PutReply());
+
+
+ _router->Route(key)->PutValue(ctrl.get(), put.get(), reply.get(),
+ NewCallback(&cleanupPut,
+ rpc_put_state_tuple_t(ctrl,put,reply),
+ &tn));
+
+ return true;
+}
+
+typedef tuple<shared_ptr<RpcController>,
+ shared_ptr<Get>,
+ shared_ptr<GetReply>,
+ string*> rpc_get_state_tuple_t;
+
+void cleanupGet(rpc_get_state_tuple_t t, TaskNotification *tn)
+{
+ bool result = t.get<2>()->result() == kvrpc::SUCCESS;
+
+ if (result)
+ {
+ string* result_ptr = t.get<3>();
+ *result_ptr = t.get<2>()->value();
+ }
+
+ tn->completeTask(result);
+}
+
+bool
+KeyValueClient::Get(const string& key, string* value)
+{
+ TaskNotification tn;
+
+ this->Get(key, value, tn);
+
+ tn.waitForComplete();
+
+ return tn.failCount() == 0;
+}
+
+bool
+KeyValueClient::Get(const string& key, string* value, TaskNotification &tn)
+{
+ tn.registerTask();
+
+ shared_ptr<RpcController> ctrl(new ProtoBufRpcController());
+ shared_ptr< ::kvrpc::Get> get(new ::kvrpc::Get());
+
+ get->set_key(key);
+
+ shared_ptr<GetReply> reply(new GetReply());
+
+ _router->Route(key)->GetValue(ctrl.get(), get.get(), reply.get(),
+ NewCallback(&cleanupGet,
+ rpc_get_state_tuple_t(ctrl,
+ get,
+ reply,
+ value),
+ &tn));
+
+ return true;
+}
+
+}
--- /dev/null
+#ifndef _KVCLIENT_H_
+#define _KVCLIENT_H_ 1
+
+#include "kvstore.pb.h"
+#include "workqueue.h"
+#include "protobufrpc.h"
+
+using namespace bicker;
+
+namespace kvstore
+{
+ class KeyValueClientRouter;
+
+ class KeyValueClient
+ {
+ public:
+ KeyValueClient(const string& host,
+ const string& port);
+
+ KeyValueClient(const list<string> &hosts);
+
+ bool Put(const string& key,
+ const string& value);
+
+ bool Put(const string& key,
+ const string& value,
+ TaskNotification &tn);
+
+ bool Get(const string& key, string* value);
+
+ bool Get(const string& key,
+ string* value,
+ TaskNotification &tn);
+
+ private:
+ shared_ptr<KeyValueClientRouter> _router;
+ };
+} // namespace kvstore
+
+#endif
--- /dev/null
+#include "kvservice.h"
+#include <iostream>
+
+using namespace std;
+
+namespace kvstore
+{
+
+KeyValueRpcService::KeyValueRpcService(Backend *backend)
+ :_backend(backend)
+{
+}
+
+KeyValueRpcService::~KeyValueRpcService()
+{
+}
+
+void KeyValueRpcService::PutValue(
+ ::google::protobuf::RpcController* /*controller*/,
+ const ::kvrpc::Put* request,
+ ::kvrpc::PutReply* response,
+ ::google::protobuf::Closure* done)
+{
+ if (_backend->Put(request->key(), request->value()))
+ {
+ response->set_result(kvrpc::SUCCESS);
+ }
+ else
+ {
+ response->set_result(kvrpc::FAILURE);
+ }
+
+ done->Run();
+}
+
+void KeyValueRpcService::GetValue(
+ ::google::protobuf::RpcController* /*controller*/,
+ const ::kvrpc::Get* request,
+ ::kvrpc::GetReply* response,
+ ::google::protobuf::Closure* done)
+{
+ string value;
+ if (_backend->Get(request->key(), &value))
+ {
+ response->set_result(kvrpc::SUCCESS);
+ response->set_value(value);
+ }
+ else
+ {
+ response->set_result(kvrpc::FAILURE);
+ }
+ done->Run();
+}
+
+}; // namespace kvstore
--- /dev/null
+#ifndef _KVSERVICE_H_
+#define _KVSERVICE_H_ 1
+
+#include "kvstore.pb.h"
+#include "backend.h"
+
+#include <memory>
+
+using std::auto_ptr;
+
+namespace kvstore
+{
+ class KeyValueRpcService : public ::kvrpc::KeyValueService
+ {
+ public:
+ KeyValueRpcService(Backend *backend);
+
+ virtual ~KeyValueRpcService();
+
+ virtual void PutValue(::google::protobuf::RpcController* controller,
+ const ::kvrpc::Put* request,
+ ::kvrpc::PutReply* response,
+ ::google::protobuf::Closure* done);
+ virtual void GetValue(::google::protobuf::RpcController* controller,
+ const ::kvrpc::Get* request,
+ ::kvrpc::GetReply* response,
+ ::google::protobuf::Closure* done);
+ private:
+ auto_ptr<Backend> _backend;
+
+ };
+} // namespace kvstore
+
+#endif
--- /dev/null
+#include <boost/program_options.hpp>
+#include <boost/shared_ptr.hpp>
+#include <iostream>
+#include <string>
+#include <ctime>
+#include <csignal>
+#include "kvservice.h"
+#include "protobufrpc.h"
+
+using namespace bicker;
+using namespace boost;
+using namespace kvstore;
+using namespace std;
+
+namespace po = boost::program_options;
+
+int interrupted_count = 0;
+ProtoBufRpcServer rpc_server;
+
+void
+interrupted(int /*signal*/)
+{
+ if (interrupted_count < 1)
+ {
+ rpc_server.shutdown();
+ }
+ else
+ {
+ exit(1);
+ }
+ ++interrupted_count;
+}
+
+int
+main(
+ int argc,
+ char **argv
+ )
+{
+ shared_ptr<thread> server_thread;
+
+ /* Initialize Exception Handling */
+ struct sigaction action;
+
+ action.sa_handler = &interrupted;
+ action.sa_flags = SA_SIGINFO;
+ sigemptyset(&action.sa_mask);
+
+ if (sigaction(SIGINT, &action, NULL) == -1)
+ {
+ perror("sigaction");
+ exit(1);
+ }
+
+ /* Parse Options */
+ string opt_bdb_home;
+ uint16_t opt_port;
+ uint16_t opt_sleep;
+
+ po::options_description options("Options");
+
+ options.add_options()
+ ("help,h", "display help message")
+ ("bdb-home,d",
+ po::value< vector<string> >(),
+ "bdb home directories")
+ ("port",
+ po::value<uint16_t>(&opt_port)->default_value(9090),
+ "listen port")
+ ("foreground,f", "don't fork to the background")
+ ("sleep,s", po::value<uint16_t>(&opt_sleep)->default_value(0), "sleep then exit")
+ ;
+
+ po::variables_map vm;
+ po::store(po::parse_command_line(argc, argv, options), vm);
+ po::notify(vm);
+
+ if (vm.count("help"))
+ {
+ cerr << options << endl;
+ return 1;
+ }
+
+
+ vector<string> paths;
+
+ if (vm.count("bdb-home") > 0)
+ {
+ paths = vm["bdb-home"].as< vector<string> >();
+ }
+ else
+ {
+ paths.push_back("./");
+ }
+
+
+ shared_ptr<Service> service(new KeyValueRpcService(new BDBBackend(paths)));
+
+ rpc_server.registerService(opt_port, service);
+
+ server_thread.reset(
+ new thread(
+ boost::bind(
+ &ProtoBufRpcServer::run,
+ &rpc_server)));
+
+ if (!vm.count("foreground"))
+ {
+ // Daemonize
+ if (daemon(0,0) != 0)
+ {
+ perror("daemonizing");
+ exit(1);
+ }
+ }
+
+ if (opt_sleep == 0)
+ {
+ server_thread->join();
+ }
+ else
+ {
+ time_t t0 = time(NULL);
+ time_t t1;
+
+ do
+ {
+ t1 = time(NULL);
+ sleep(opt_sleep - (t1 - t0));
+ } while (t1 - t0 < opt_sleep);
+ }
+ rpc_server.shutdown();
+
+ return 0;
+}
+
+
--- /dev/null
+package kvrpc;
+
+enum Result
+{
+ SUCCESS = 0;
+ FAILURE = 1;
+}
+
+message Put
+{
+ required string key = 1;
+ required string value = 2;
+}
+
+message PutReply
+{
+ required Result result = 1;
+}
+
+message Get
+{
+ required string key = 1;
+}
+
+message GetReply
+{
+ required Result result = 1;
+ optional string value = 2;
+}
+
+service KeyValueService
+{
+ rpc PutValue(Put) returns(PutReply);
+ rpc GetValue(Get) returns(GetReply);
+}
--- /dev/null
+Name: kvstore
+Version: 0.1
+Release: 1
+Summary: A Simple kv store application
+License: BSD
+Group:System Environment/Base
+Source0: http://salud.ucsd.edu/~jcm/scaling/kvstore-0.1.tar.gz
+BuildRoot: %{_tmppath}/%{name}%{version}%{release}root%(%{__id_u} jcm)
+BuildArch: x86_64
+Requires: boost protobuf db4
+BuildRequires: boost-devel gtest-devel protobuf-devel db4-devel
+
+%description
+
+Key Value Store with simple backend and google protocol buffer rpc.
+
+%prep
+%setup -q
+
+%build
+scons %{?_smp_mflags}
+
+%install
+rm -rf %{buildroot}
+mkdir -p %{buildroot}%{_bindir}
+scons --prefix=%{buildroot} install
+
+%clean
+scons -c
+
+%files
+%defattr(-,root,root,-)
+%{_bindir}/kvstore
--- /dev/null
+#include <boost/shared_ptr.hpp>
+#include <gtest/gtest.h>
+#include "workqueue.h"
+#include "kvservice.h"
+#include "kvclient.h"
+#include "protobufrpc.h"
+
+using namespace boost;
+using namespace kvstore;
+using namespace bicker;
+using namespace kvrpc;
+
+/* http://code.google.com/p/googletest/wiki/GoogleTestPrimer */
+
+class KVTest : public ::testing::Test
+{
+public:
+ KVTest()
+ :_kv_client("127.0.0.1", "9090")
+ {
+ vector<string> data_dirs;
+ data_dirs.push_back("/tmp");
+
+ shared_ptr<Service> service(new KeyValueRpcService(new BDBBackend(data_dirs)));
+ _rpc_server.registerService(9090, service);
+
+ _server_thread.reset(
+ new thread(
+ boost::bind(
+ &ProtoBufRpcServer::run,
+ &_rpc_server)));
+
+ }
+
+ virtual ~KVTest()
+ {
+ _rpc_server.shutdown();
+ _server_thread->join();
+ }
+protected:
+
+ ProtoBufRpcServer _rpc_server;
+ KeyValueClient _kv_client;
+ shared_ptr<thread> _server_thread;
+};
+
+
+TEST_F(KVTest, PutTest)
+{
+ ASSERT_TRUE(_kv_client.Put("test", "value"));
+ string value;
+ ASSERT_TRUE(_kv_client.Get("test", &value));
+ ASSERT_EQ(value, "value");
+}
+
+TEST_F(KVTest, MedPutTest)
+{
+ string test_value(1024*10, '6');
+ ASSERT_TRUE(_kv_client.Put("test", test_value));
+ string value;
+ ASSERT_TRUE(_kv_client.Get("test", &value));
+ ASSERT_TRUE(value == test_value);
+}
+
+TEST_F(KVTest, BigPutTest)
+{
+ string test_value(1024*1024*10, '6');
+ ASSERT_TRUE(_kv_client.Put("test", test_value));
+ string value;
+ ASSERT_TRUE(_kv_client.Get("test", &value));
+ ASSERT_EQ(value.size(), test_value.size());
+ ASSERT_TRUE(value == test_value);
+}
+
+TEST_F(KVTest, LoopPutTest)
+{
+ string test_value(1024*1024, '6');
+
+ for (int i = 0; i < 10; ++i)
+ {
+ ostringstream key;
+ key << "test" << i;
+ ASSERT_TRUE(_kv_client.Put(key.str(), test_value));
+ string value;
+ ASSERT_TRUE(_kv_client.Get(key.str(), &value));
+ ASSERT_TRUE(value == test_value);
+ }
+}
+
+TEST_F(KVTest, EmptyTest)
+{
+ string value;
+ ASSERT_FALSE(_kv_client.Get("test", &value));
+}
+
+int
+main(
+ int argc,
+ char **argv
+ )
+{
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}
--- /dev/null
+Import('env')
+
+base_files = ['protobufrpc.cc', 'socket_pool.cc', 'workqueue.cc']
+
+protobufrpc = env.StaticLibrary('protobufrpc', base_files)
+
+Return('protobufrpc')
--- /dev/null
+import os
+
+AddOption('--prefix',
+ dest='prefix',
+ type='string',
+ nargs=1,
+ action='store',
+ metavar='DIR',
+ help='installation prefix')
+
+env = Environment(CXXFLAGS='-g -fPIC', PREFIX=GetOption('prefix'))
+
+for envvar in ('HOME', 'DISTCC_DIR', 'DISTCC_HOSTS', 'CCACHE_DIR',
+ 'INTERCEPTOR_SOCKET', 'ENFORGE_DIGEST_CACHE',
+ 'ENFORGE_CACHE_HOST', 'ENFORGE_CACHE_PORT'):
+ if envvar in os.environ:
+ env['ENV'][envvar] = os.environ[envvar]
+
+if env['PREFIX'] is not None:
+ bin_dest = env['PREFIX'] + '/usr/bin'
+else:
+ bin_dest = '/usr/bin'
+
+base_files = ['protobufrpc.cc', 'socket_pool.cc']
+
+if not env.has_key('LIBS'):
+ env['LIBS'] = []
+
+SConscript(['SConscript'])
--- /dev/null
+#include "protobufrpc.h"
+
+#include <google/protobuf/descriptor.h>
+#include <google/protobuf/io/zero_copy_stream_impl.h>
+#include <google/protobuf/io/coded_stream.h>
+#include <boost/shared_ptr.hpp>
+#include <boost/bind.hpp>
+
+#include <boost/functional/hash.hpp>
+
+using namespace std;
+
+using namespace boost;
+using asio::buffer;
+
+namespace bicker
+{
+
+template<typename T>
+static void* void_write(void* data, T val)
+{
+ *((T*)data) = val;
+ return (char*)data + sizeof(T);
+}
+
+class ProtoBufRpcServiceRequest
+{
+public:
+ ProtoBufRpcServiceRequest(
+ RpcController *ctrl,
+ const MethodDescriptor* method,
+ Message *request,
+ Message *response,
+ shared_ptr<ProtoBufRpcConnection> conn
+ )
+ :_ctrl(ctrl),
+ _method(method),
+ _request(request),
+ _response(response),
+ _conn(conn)
+ {
+ }
+
+ ~ProtoBufRpcServiceRequest()
+ {
+
+ }
+
+ static void run(ProtoBufRpcServiceRequest *req)
+ {
+
+ req->_conn->writeResponse(req->_response.get());
+
+ delete req;
+ }
+
+ shared_ptr<RpcController> _ctrl;
+ const MethodDescriptor *_method;
+ shared_ptr<Message> _request;
+ shared_ptr<Message> _response;
+ shared_ptr<ProtoBufRpcConnection> _conn;
+};
+
+ProtoBufRpcConnection::ProtoBufRpcConnection(asio::io_service& io_service,
+ Service *service)
+:_socket(io_service),
+ _strand(io_service),
+ _service(service),
+ _state(STATE_NONE)
+{
+}
+
+tcp::socket& ProtoBufRpcConnection::socket()
+{
+ return _socket;
+}
+
+void ProtoBufRpcConnection::start()
+{
+ _socket.async_read_some(_buffer.prepare(4096),
+ _strand.wrap(
+ boost::bind(&ProtoBufRpcConnection::handle_read, shared_from_this(),
+ asio::placeholders::error,
+ asio::placeholders::bytes_transferred)));
+}
+
+void ProtoBufRpcConnection::writeResponse(Message *msg)
+{
+ int rlen = msg->ByteSize();
+ int len = htonl(rlen);
+ int mlen = sizeof(len) + rlen;
+
+ void * data = asio::buffer_cast<void*>(_buffer.prepare(mlen));
+
+ data = void_write(data, len);
+
+ using google::protobuf::io::ArrayOutputStream;
+
+ ArrayOutputStream as(data, rlen);
+
+ msg->SerializeToZeroCopyStream(&as);
+
+ _buffer.commit(mlen);
+
+ asio::async_write(_socket,
+ _buffer.data(),
+ _strand.wrap(
+ boost::bind(&ProtoBufRpcConnection::handle_write,
+ shared_from_this(),
+ asio::placeholders::error,
+ asio::placeholders::bytes_transferred)));
+}
+
+
+void ProtoBufRpcConnection::handle_read(const error_code& e,
+ std::size_t bytes_transferred)
+{
+ if (!e)
+ {
+ _buffer.commit(bytes_transferred);
+
+ if (_state == STATE_NONE)
+ {
+ if (_buffer.size() >= sizeof(_id) + sizeof(_len))
+ {
+ string b(
+ buffers_begin(_buffer.data()),
+ buffers_begin(_buffer.data())
+ + sizeof(_id) + sizeof(_len)
+ );
+
+ _buffer.consume(sizeof(_id) + sizeof(_len));
+
+ _id = *((int*)b.c_str());
+ _id = ntohl(_id);
+
+ _len = *((unsigned int*)(b.c_str() + sizeof(_id)));
+ _len = ntohl(_len);
+
+ _state = STATE_HAVE_ID_AND_LEN;
+ }
+ else
+ {
+ start();
+ }
+ }
+
+ if (_state == STATE_HAVE_ID_AND_LEN || _state == STATE_WAITING_FOR_DATA)
+ {
+ if (_buffer.size() >= _len)
+ {
+ const MethodDescriptor* method =
+ _service->GetDescriptor()->method(_id);
+
+ Message *req = _service->GetRequestPrototype(method).New();
+ Message *resp = _service->GetResponsePrototype(method).New();
+
+ using google::protobuf::io::ArrayInputStream;
+ using google::protobuf::io::CodedInputStream;
+
+ const void* data = asio::buffer_cast<const void*>(
+ _buffer.data()
+ );
+ ArrayInputStream as(data, _len);
+ CodedInputStream is(&as);
+ is.SetTotalBytesLimit(512 * 1024 * 1024, -1);
+
+ if (!req->ParseFromCodedStream(&is))
+ {
+ throw std::runtime_error("ParseFromCodedStream");
+ }
+
+ _buffer.consume(_len);
+
+ ProtoBufRpcController *ctrl = new ProtoBufRpcController();
+ _service->CallMethod(method,
+ ctrl,
+ req,
+ resp,
+ NewCallback(
+ &ProtoBufRpcServiceRequest::run,
+ new ProtoBufRpcServiceRequest(
+ ctrl,
+ method,
+ req,
+ resp,
+ shared_from_this())
+ )
+ );
+ _state = STATE_NONE;
+ }
+ else
+ {
+ _state = STATE_WAITING_FOR_DATA;
+ start();
+ }
+ }
+
+ }
+ else
+ {
+ error_code ignored_ec;
+ _socket.shutdown(tcp::socket::shutdown_both, ignored_ec);
+ }
+}
+
+void ProtoBufRpcConnection::handle_write(const error_code& e,
+ std::size_t bytes_transferred)
+{
+ if (e)
+ {
+ error_code ignored_ec;
+ _socket.shutdown(tcp::socket::shutdown_both, ignored_ec);
+ }
+ else
+ {
+ _buffer.consume(bytes_transferred);
+
+ if (_buffer.size())
+ {
+ asio::async_write(_socket,
+ _buffer.data(),
+ _strand.wrap(
+ boost::bind(&ProtoBufRpcConnection::handle_write,
+ shared_from_this(),
+ asio::placeholders::error,
+ asio::placeholders::bytes_transferred)));
+ return;
+ }
+
+ _state = STATE_NONE;
+ start();
+ }
+}
+
+ProtoBufRpcServer::ProtoBufRpcServer()
+ :_io_service(new asio::io_service())
+{
+}
+
+bool ProtoBufRpcServer::registerService(uint16_t port,
+ shared_ptr<Service> service)
+{
+ // This is not thread safe
+
+ // The RegisteredService Constructor fires up the appropriate
+ // async accepts for the service
+ _services.push_back(shared_ptr<RegisteredService>(
+ new RegisteredService(
+ _io_service,
+ port,
+ service)));
+
+ return true;
+}
+
+void run_wrapper(asio::io_service *io_service)
+{
+ struct itimerval itimer;
+ setitimer(ITIMER_PROF, &itimer, NULL);
+
+ io_service->run();
+}
+
+void ProtoBufRpcServer::run()
+{
+ try
+ {
+ if (_services.size() == 0)
+ {
+ throw std::runtime_error("No services registered for ProtoBufRpcServer");
+ }
+
+ size_t nprocs = sysconf(_SC_NPROCESSORS_ONLN);
+
+ vector<shared_ptr<thread> > threads;
+ for (size_t i = 0; i < nprocs; ++i)
+ {
+ shared_ptr<thread> t(new thread(
+ boost::bind(
+ //&run_wrapper,
+ &asio::io_service::run,
+ _io_service.get())));
+ threads.push_back(t);
+ }
+
+ for (size_t i = 0; i < threads.size(); ++i)
+ {
+ threads[i]->join();
+ }
+ }
+ catch (std::exception &e)
+ {
+ std::cerr << "ProtoBufRpcService" << e.what() << std::endl;
+ }
+}
+
+void ProtoBufRpcServer::shutdown()
+{
+ _io_service->stop();
+}
+
+ProtoBufRpcServer::RegisteredService::RegisteredService(
+ shared_ptr<asio::io_service> io_service,
+ uint16_t port,
+ shared_ptr<Service> service
+ )
+:_io_service(io_service),
+ _port(port),
+ _service(service),
+ _endpoint(tcp::v4(), _port),
+ _acceptor(*_io_service),
+ _new_connection(new ProtoBufRpcConnection(*_io_service, _service.get()))
+{
+ _acceptor.open(_endpoint.protocol());
+ _acceptor.set_option(tcp::acceptor::reuse_address(true));
+ _acceptor.bind(_endpoint);
+ _acceptor.listen();
+ _acceptor.async_accept(_new_connection->socket(),
+ boost::bind(&ProtoBufRpcServer::RegisteredService::handle_accept,
+ this,
+ asio::placeholders::error));
+}
+
+void ProtoBufRpcServer::RegisteredService::handle_accept(const error_code& e)
+{
+ if (!e)
+ {
+ _new_connection->start();
+ _new_connection.reset(new ProtoBufRpcConnection(*_io_service, _service.get()));
+ _acceptor.async_accept(_new_connection->socket(),
+ boost::bind(&ProtoBufRpcServer::RegisteredService::handle_accept,
+ this,
+ asio::placeholders::error));
+ }
+
+}
+
+ProtoBufRpcController::ProtoBufRpcController()
+{
+}
+
+ProtoBufRpcController::~ProtoBufRpcController()
+{
+}
+
+void ProtoBufRpcController::Reset()
+{
+}
+
+bool ProtoBufRpcController::Failed() const
+{
+ return false;
+}
+
+string ProtoBufRpcController::ErrorText() const
+{
+ return "No Error";
+}
+
+void ProtoBufRpcController::StartCancel()
+{
+}
+
+void ProtoBufRpcController::SetFailed(const string &/*reason*/)
+{
+}
+
+bool ProtoBufRpcController::IsCanceled() const
+{
+ return false;
+}
+
+void ProtoBufRpcController::NotifyOnCancel(Closure * /*callback*/)
+{
+}
+
+class ProtoBufRpcChannel::MethodHandler
+ : public enable_shared_from_this<MethodHandler>,
+ private boost::noncopyable
+{
+public:
+ MethodHandler(auto_ptr<SocketCheckout> socket,
+ const MethodDescriptor * method,
+ RpcController * controller,
+ const Message * request,
+ Message * response,
+ Closure * done
+ )
+ :_socket(socket),
+ _method(method),
+ _controller(controller),
+ _request(request),
+ _response(response),
+ _done(done)
+ {
+ }
+
+ ~MethodHandler()
+ {
+ _socket.reset();
+ _done->Run();
+ }
+
+ static void execute(shared_ptr<MethodHandler> this_ptr)
+ {
+ int index = htonl(this_ptr->_method->index());
+ int rlen = this_ptr->_request->ByteSize();
+ int len = htonl(rlen);
+
+ int mlen = sizeof(index) + sizeof(len) + rlen;
+
+ void * data = asio::buffer_cast<void*>(this_ptr->_buffer.prepare(mlen));
+
+ data = void_write(data, index);
+ data = void_write(data, len);
+
+ using google::protobuf::io::ArrayOutputStream;
+
+ ArrayOutputStream as(data, rlen);
+
+ this_ptr->_request->SerializeToZeroCopyStream(&as);
+ this_ptr->_buffer.commit(mlen);
+
+ (*(this_ptr->_socket))->async_send(this_ptr->_buffer.data(),
+ boost::bind(&ProtoBufRpcChannel::MethodHandler::handle_write,
+ this_ptr,
+ asio::placeholders::error,
+ asio::placeholders::bytes_transferred));
+ }
+
+ static void handle_write(shared_ptr<MethodHandler> this_ptr,
+ const error_code& e,
+ std::size_t bytes_transferred)
+ {
+ if (!e)
+ {
+ this_ptr->_buffer.consume(bytes_transferred);
+
+ if (this_ptr->_buffer.size())
+ {
+ (*(this_ptr->_socket))->async_send(this_ptr->_buffer.data(),
+ boost::bind(&ProtoBufRpcChannel::MethodHandler::handle_write,
+ this_ptr,
+ asio::placeholders::error,
+ asio::placeholders::bytes_transferred));
+ return;
+ }
+
+ (*(this_ptr->_socket))->async_receive(
+ buffer(&this_ptr->_len, sizeof(this_ptr->_len)),
+ boost::bind(
+ &ProtoBufRpcChannel::MethodHandler::handle_read_len,
+ this_ptr,
+ asio::placeholders::error,
+ asio::placeholders::bytes_transferred)
+ );
+ }
+ else
+ {
+ this_ptr->_controller->SetFailed(e.message());
+ (*(this_ptr->_socket))->close();
+ }
+ }
+
+ static void handle_read_len(shared_ptr<MethodHandler> this_ptr,
+ const error_code& e,
+ std::size_t bytes_transferred)
+ {
+ if (!e && bytes_transferred == sizeof(this_ptr->_len))
+ {
+ this_ptr->_len = ntohl(this_ptr->_len);
+ (*(this_ptr->_socket))->async_receive(
+ this_ptr->_buffer.prepare(this_ptr->_len),
+ boost::bind(
+ &ProtoBufRpcChannel::MethodHandler::handle_read_response,
+ this_ptr,
+ asio::placeholders::error,
+ asio::placeholders::bytes_transferred
+ )
+ );
+ }
+ else
+ {
+ this_ptr->_controller->SetFailed(e.message());
+ (*(this_ptr->_socket))->close();
+ }
+ }
+
+ static void handle_read_response(shared_ptr<MethodHandler> this_ptr,
+ const error_code& e,
+ std::size_t bytes_transferred)
+ {
+ if (!e)
+ {
+ this_ptr->_buffer.commit(bytes_transferred);
+ if (this_ptr->_buffer.size() >= this_ptr->_len)
+ {
+ using google::protobuf::io::ArrayInputStream;
+ using google::protobuf::io::CodedInputStream;
+
+ const void* data = asio::buffer_cast<const void*>(
+ this_ptr->_buffer.data()
+ );
+ ArrayInputStream as(data, this_ptr->_len);
+ CodedInputStream is(&as);
+ is.SetTotalBytesLimit(512 * 1024 * 1024, -1);
+
+ if (!this_ptr->_response->ParseFromCodedStream(&is))
+ {
+ throw std::runtime_error("ParseFromCodedStream");
+ }
+
+ this_ptr->_buffer.consume(this_ptr->_len);
+ }
+ else
+ {
+ (*(this_ptr->_socket))->async_receive(
+ this_ptr->_buffer.prepare(this_ptr->_len - this_ptr->_buffer.size()),
+ boost::bind(
+ &ProtoBufRpcChannel::MethodHandler::handle_read_response,
+ this_ptr,
+ asio::placeholders::error,
+ asio::placeholders::bytes_transferred
+ )
+ );
+ return;
+ }
+ }
+ else
+ {
+ this_ptr->_controller->SetFailed(e.message());
+ (*(this_ptr->_socket))->close();
+ }
+ }
+
+private:
+ auto_ptr<SocketCheckout> _socket;
+ const MethodDescriptor * _method;
+ RpcController * _controller;
+ const Message * _request;
+ Message * _response;
+ Closure * _done;
+ asio::streambuf _buffer;
+ unsigned int _len;
+ bool _status;
+ unsigned int _sent;
+};
+
+
+ProtoBufRpcChannel::ProtoBufRpcChannel(const string &remotehost,
+ const string &port)
+ :_remote_host(remotehost), _port(port),
+ _resolver(_io_service),
+ _acceptor(_io_service),
+ _pool(2000, _io_service),
+ _lame_socket(_io_service),
+ _thread()
+// &asio::io_service::run,
+// &_io_service)))
+{
+
+ tcp::resolver::query query(_remote_host, _port);
+ tcp::resolver::iterator endpoint_iterator = _resolver.resolve(query);
+ tcp::resolver::iterator end;
+
+ error_code error = asio::error::host_not_found;
+
+ if (endpoint_iterator == end) throw syserr::system_error(error);
+
+ _pool.setEndpoint(*endpoint_iterator);
+
+ tcp::endpoint e(tcp::v4(), 0);
+ _acceptor.open(e.protocol());
+ _acceptor.set_option(tcp::acceptor::reuse_address(true));
+ _acceptor.bind(e);
+ _acceptor.listen();
+ _acceptor.async_accept(_lame_socket,
+ boost::bind(&ProtoBufRpcChannel::lame_handle_accept, this,
+ asio::placeholders::error));
+
+ _thread = shared_ptr<thread>(new thread(
+ boost::bind(
+ &asio::io_service::run,
+ &_io_service)));
+}
+
+void ProtoBufRpcChannel::lame_handle_accept(const error_code &err)
+{
+ if (!err)
+ {
+ _acceptor.async_accept(_lame_socket,
+ boost::bind(&ProtoBufRpcChannel::lame_handle_accept,
+ this,
+ asio::placeholders::error));
+ }
+}
+
+ProtoBufRpcChannel::~ProtoBufRpcChannel()
+{
+ _pool.cancelAndClear();
+
+ _io_service.stop();
+
+ _thread->join();
+}
+
+void ProtoBufRpcChannel::CallMethod(
+ const MethodDescriptor * method,
+ RpcController * controller,
+ const Message * request,
+ Message * response,
+ Closure * done)
+{
+ shared_ptr<MethodHandler> h(
+ new MethodHandler(
+ auto_ptr<SocketCheckout>(new SocketCheckout(&_pool)),
+ method,
+ controller,
+ request,
+ response,
+ done
+ ));
+
+ MethodHandler::execute(h);
+}
+
+} // namespace bicker
--- /dev/null
+#ifndef __PROTOBUFRPC_H__
+#define __PROTOBUFRPC_H__
+
+#include <stdint.h>
+#include <boost/shared_ptr.hpp>
+#include <boost/enable_shared_from_this.hpp>
+#include <boost/thread.hpp>
+#include <boost/function.hpp>
+#include <queue>
+#include <set>
+#include "socket_pool.h"
+#include "util.h"
+
+#include <google/protobuf/stubs/common.h>
+#include <google/protobuf/generated_message_reflection.h>
+#include <google/protobuf/repeated_field.h>
+#include <google/protobuf/extension_set.h>
+#include <google/protobuf/service.h>
+
+using namespace std;
+using namespace google::protobuf;
+using namespace boost;
+
+namespace bicker
+{
+
+class ProtoBufRpcService;
+
+class ProtoBufRpcConnection
+ : public enable_shared_from_this<ProtoBufRpcConnection>,
+ private boost::noncopyable
+{
+public:
+ explicit ProtoBufRpcConnection(asio::io_service& io_service,
+ Service *_service);
+
+ asio::ip::tcp::socket& socket();
+
+ void start();
+
+ void writeResponse(Message *msg);
+
+
+private:
+ void handle_read(const error_code& e,
+ std::size_t bytes_transferred);
+
+ void handle_write(const error_code& e,
+ std::size_t bytes_transferred);
+
+ tcp::socket _socket;
+
+
+ asio::io_service::strand _strand;
+
+ Service *_service;
+
+ asio::streambuf _buffer;
+
+ int _id;
+ unsigned int _len;
+
+ enum {
+ STATE_NONE,
+ STATE_HAVE_ID_AND_LEN,
+ STATE_WAITING_FOR_DATA,
+ STATE_FAIL,
+ } _state;
+};
+
+
+class ProtoBufRpcServer
+{
+public:
+ ProtoBufRpcServer();
+
+ bool registerService(uint16_t port,
+ shared_ptr< Service> service);
+
+ // So we can call this as a thread.
+ void run();
+
+ // So we can stop..
+ void shutdown();
+
+protected:
+
+ class RegisteredService
+ {
+ public:
+ RegisteredService(
+ shared_ptr<asio::io_service> io_service,
+ uint16_t port,
+ shared_ptr<Service> service
+ );
+
+ void handle_accept(const error_code& e);
+
+ protected:
+ // Ref to parent's
+ shared_ptr<asio::io_service> _io_service;
+ uint16_t _port;
+ shared_ptr<Service> _service;
+ tcp::endpoint _endpoint;
+ tcp::acceptor _acceptor;
+ shared_ptr<ProtoBufRpcConnection> _new_connection;
+ };
+
+ list<shared_ptr<RegisteredService> > _services;
+ shared_ptr<asio::io_service> _io_service;
+};
+
+class ProtoBufRpcController : public RpcController
+{
+public:
+ ProtoBufRpcController();
+ virtual ~ProtoBufRpcController();
+
+ virtual void Reset();
+ virtual bool Failed() const;
+ virtual string ErrorText() const;
+ virtual void StartCancel();
+
+ virtual void SetFailed(const string &reason);
+ virtual bool IsCanceled() const;
+ virtual void NotifyOnCancel(Closure *callback);
+};
+
+class ProtoBufRpcChannel
+ : public RpcChannel,
+ public enable_shared_from_this<ProtoBufRpcChannel>,
+ private boost::noncopyable
+{
+public:
+ ProtoBufRpcChannel(const string &remotehost, const string &port);
+
+ virtual ~ProtoBufRpcChannel();
+
+ virtual void CallMethod(
+ const MethodDescriptor * method,
+ RpcController * controller,
+ const Message * request,
+ Message * response,
+ Closure * done);
+
+protected:
+ shared_ptr<tcp::socket> getSocket();
+ void putSocket(shared_ptr<tcp::socket>);
+
+private:
+ class MethodHandler;
+
+ string _remote_host;
+ string _port;
+
+
+ asio::io_service _io_service;
+ tcp::resolver _resolver;
+ // This exists to keep the io service running
+ tcp::acceptor _acceptor;
+
+ SocketPool _pool;
+
+ asio::ip::tcp::socket _lame_socket;
+ void lame_handle_accept(const error_code &err);
+
+ shared_ptr<boost::thread> _thread;
+
+};
+
+} // namespace bicker
+
+#endif
--- /dev/null
+#include "socket_pool.h"
+
+SocketPool::SocketPool(int max_sockets,
+ asio::io_service &io_svc
+ )
+:_issued(0),
+ _max_sockets(max_sockets),
+ _io_service(io_svc)
+{
+}
+
+void SocketPool::setEndpoint(const tcp::endpoint &endpoint)
+{
+ _endpoint = endpoint;
+}
+
+void SocketPool::cancelAndClear()
+{
+ for (set<shared_ptr<tcp::socket> >::iterator i = _set.begin();
+ i != _set.end();
+ ++i)
+ {
+ (*i)->cancel();
+ }
+
+ while (!_queue.empty()) _queue.pop();
+ _set.clear();
+}
+
+shared_ptr<tcp::socket> SocketPool::getSocket()
+{
+ mutex::scoped_lock lock(_sockets_lock);
+
+ while (_queue.size() == 0 && _issued >= _max_sockets)
+ _sockets_non_empty.wait(lock);
+
+ if (_queue.size())
+ {
+ shared_ptr<tcp::socket> socket = _queue.front();
+ _queue.pop();
+
+ return socket;
+ }
+ else
+ {
+ ++_issued;
+ error_code error = asio::error::host_not_found;
+
+ shared_ptr<tcp::socket> socket(new tcp::socket(_io_service));
+ socket->connect(_endpoint, error);
+
+ if (error) throw syserr::system_error(error);
+
+ _set.insert(socket);
+
+ return socket;
+ }
+}
+
+void SocketPool::putSocket(shared_ptr<tcp::socket> socket)
+{
+ mutex::scoped_lock lock(_sockets_lock);
+
+ if (!socket->is_open())
+ {
+ cerr << "socket closed\n";
+ --_issued;
+ _set.erase(socket);
+ }
+ else
+ {
+ _queue.push(socket);
+ }
+
+ _sockets_non_empty.notify_one();
+}
+
+SocketCheckout::SocketCheckout(SocketPool *pool)
+ :_socket(pool->getSocket()),
+ _pool(pool)
+{
+}
+
+SocketCheckout::~SocketCheckout()
+{
+ _pool->putSocket(_socket);
+}
+tcp::socket& SocketCheckout::operator*()
+{
+ return *_socket;
+}
+
+tcp::socket* SocketCheckout::operator->()
+{
+ return _socket.get();
+}
+
+shared_ptr<tcp::socket>& SocketCheckout::socket()
+{
+ return _socket;
+}
--- /dev/null
+#ifndef _SOCKET_POOL_H_
+#define _SOCKET_POOL_H_ 1
+
+#include <iostream>
+#include <set>
+#include <queue>
+#include <boost/shared_ptr.hpp>
+#include <boost/thread.hpp>
+
+#include "util.h"
+
+using namespace std;
+using namespace boost;
+
+
+class SocketPool
+{
+public:
+ SocketPool(int max_streams,
+ asio::io_service &io_svc);
+ void setEndpoint(const tcp::endpoint &endpoint);
+ void cancelAndClear();
+ shared_ptr<tcp::socket> getSocket();
+ void putSocket(shared_ptr<tcp::socket> socket);
+private:
+ int _issued;
+ int _max_sockets;
+ mutex _sockets_lock;
+ condition_variable _sockets_non_empty;
+ asio::io_service &_io_service;
+ tcp::endpoint _endpoint;
+ queue<shared_ptr<tcp::socket> > _queue;
+ set<shared_ptr<tcp::socket> > _set;
+};
+
+class SocketCheckout
+{
+public:
+ SocketCheckout(SocketPool *pool);
+ ~SocketCheckout();
+
+ tcp::socket& operator*();
+ tcp::socket* operator->();
+
+ shared_ptr<tcp::socket>& socket();
+
+private:
+ shared_ptr<tcp::socket> _socket;
+ SocketPool *_pool;
+};
+
+#endif
--- /dev/null
+#ifndef _UTIL_H_
+#define _UTIL_H_ 1
+
+#include <boost/version.hpp>
+
+#if BOOST_VERSION <= 103500
+#include <boost/thread.hpp>
+#include <boost/thread/mutex.hpp>
+#include <boost/bind.hpp>
+#include <asio.hpp>
+#include <asio/buffer.hpp>
+//typedef boost::condition condition_variable ;
+//typedef boost::detail::thread::scoped_lock<boost::mutex> scoped_lock;
+using asio::ip::tcp;
+using asio::error_code;
+using asio::buffers_begin;
+namespace syserr=asio;
+#else
+#if BOOST_VERSION < 104000
+#include <boost/asio.hpp>
+#include <boost/thread/mutex.hpp>
+#include <boost/bind.hpp>
+using boost::asio::ip::tcp;
+using boost::system::error_code;
+using boost::system::system_error;
+namespace syserr=boost::system;
+#else
+#include <boost/asio.hpp>
+#include <boost/thread/mutex.hpp>
+using boost::asio::ip::tcp;
+using boost::system::error_code;
+using boost::system::system_error;
+namespace syserr=boost::system;
+#endif
+#endif
+
+#endif
--- /dev/null
+#include "workqueue.h"
+#include <boost/thread.hpp>
+
+namespace bicker
+{
+
+WorkQueue::WorkQueue()
+ :_thread_count(0), _min_threads(1), _max_threads(50), _running(true)
+{
+ for (int i = 0; i < _min_threads; ++i)
+ {
+ spawnThread();
+ }
+}
+
+WorkQueue::WorkQueue(int thread_count)
+ :_thread_count(0),
+ _min_threads(1), _max_threads(thread_count), _running(true)
+{
+ for (int i = 0; i < _min_threads; ++i)
+ {
+ spawnThread();
+ }
+}
+
+WorkQueue::~WorkQueue()
+{
+ _running = false;
+
+ {
+ mutex::scoped_lock lock(_queue_lock);
+ _queue_non_empty.notify_all();
+ }
+
+ _threads.join_all();
+}
+
+void WorkQueue::spawnThread()
+{
+ ++_thread_count;
+ _threads.create_thread(Worker(this));
+}
+
+shared_ptr<WorkUnit> WorkQueue::get()
+{
+ mutex::scoped_lock lock(_queue_lock);
+
+ while (_queue.size() == 0)
+ {
+ _queue_non_empty.wait(lock);
+ if (!_running) throw interrupted_error();
+ }
+
+ shared_ptr<WorkUnit> back = _queue.front();
+ _queue.pop();
+
+ if (_queue.size() > 0 && _thread_count < _max_threads) spawnThread();
+
+ return back;
+}
+
+void WorkQueue::put(shared_ptr<WorkUnit> work_unit)
+{
+ mutex::scoped_lock lock(_queue_lock);
+
+ _queue.push(work_unit);
+
+ _queue_non_empty.notify_one();
+}
+
+WorkQueue::Worker::Worker(WorkQueue* queue)
+ :_queue(queue)
+{
+}
+
+void WorkQueue::Worker::operator()()
+{
+ while (true)
+ {
+ try
+ {
+ shared_ptr<WorkUnit> unit = _queue->get();
+
+ unit->run();
+ }
+ catch (interrupted_error)
+ {
+ return;
+ }
+ }
+}
+
+TaskNotification::TaskNotification()
+:_expected(0), _count(0), _fail_count(0)
+{
+}
+
+void TaskNotification::registerTask()
+{
+ mutex::scoped_lock lock(_lock);
+ ++_expected;
+}
+
+void TaskNotification::completeTask(bool success)
+{
+ mutex::scoped_lock lock(_lock);
+ if (!success) ++_fail_count;
+ if (++_count == _expected) _cond.notify_all();
+}
+
+void TaskNotification::waitForComplete()
+{
+ mutex::scoped_lock lock(_lock);
+ while (_count < _expected)
+ {
+ _cond.wait(lock);
+ }
+}
+
+bool TaskNotification::failCount()
+{
+ return _fail_count;
+}
+
+} // namespace bicker
--- /dev/null
+#ifndef __WORKQUEUE_H__
+#define __WORKQUEUE_H__ 1
+
+#include <boost/thread.hpp>
+#include <boost/shared_ptr.hpp>
+#include <queue>
+#include "util.h"
+
+using namespace boost;
+using namespace std;
+
+namespace bicker
+{
+ struct interrupted_error : public virtual std::exception { };
+
+ class WorkUnit
+ {
+ public:
+ virtual ~WorkUnit() {};
+ virtual void run() = 0;
+ };
+
+ class WorkQueue
+ {
+ public:
+ WorkQueue();
+ WorkQueue(int thread_count);
+
+ ~WorkQueue();
+
+ shared_ptr<WorkUnit> get();
+ void put(shared_ptr<WorkUnit> work_unit);
+
+ protected:
+ void spawnThread();
+
+ private:
+ class Worker
+ {
+ public:
+ Worker(WorkQueue* queue);
+ void operator()();
+ private:
+ WorkQueue *_queue;
+ };
+
+ int _thread_count;
+ int _min_threads;
+ int _max_threads;
+ mutex _queue_lock;
+ condition_variable _queue_non_empty;
+ queue<shared_ptr<WorkUnit> > _queue;
+ thread_group _threads;
+ volatile bool _running;
+ };
+
+ class TaskNotification
+ {
+ public:
+ TaskNotification();
+
+ void registerTask();
+ void completeTask(bool success = true);
+
+ void waitForComplete();
+
+ bool failCount();
+ private:
+ int _expected;
+ int _count;
+ int _fail_count;
+ mutex _lock;
+ condition_variable _cond;
+ };
+
+} // namespace bicker
+
+#endif