Add John MucCullough's simple key/value storage server.
authorMichael Vrable <mvrable@cs.ucsd.edu>
Sun, 14 Feb 2010 22:56:22 +0000 (14:56 -0800)
committerMichael Vrable <mvrable@cs.ucsd.edu>
Sun, 14 Feb 2010 22:56:22 +0000 (14:56 -0800)
32 files changed:
kvstore/NOTES [new file with mode: 0644]
kvstore/SConscript [new file with mode: 0644]
kvstore/SConstruct [new file with mode: 0644]
kvstore/backend.cc [new file with mode: 0644]
kvstore/backend.h [new file with mode: 0644]
kvstore/debian/README.Debian [new file with mode: 0644]
kvstore/debian/README.source [new file with mode: 0644]
kvstore/debian/changelog [new file with mode: 0644]
kvstore/debian/compat [new file with mode: 0644]
kvstore/debian/control [new file with mode: 0644]
kvstore/debian/copyright [new file with mode: 0644]
kvstore/debian/docs [new file with mode: 0644]
kvstore/debian/install [new file with mode: 0644]
kvstore/debian/rules [new file with mode: 0755]
kvstore/kvbench.cc [new file with mode: 0644]
kvstore/kvclient.cc [new file with mode: 0644]
kvstore/kvclient.h [new file with mode: 0644]
kvstore/kvservice.cc [new file with mode: 0644]
kvstore/kvservice.h [new file with mode: 0644]
kvstore/kvstore.cc [new file with mode: 0644]
kvstore/kvstore.proto [new file with mode: 0644]
kvstore/kvstore.spec [new file with mode: 0644]
kvstore/kvtest.cc [new file with mode: 0644]
kvstore/protobufrpc/SConscript [new file with mode: 0644]
kvstore/protobufrpc/SConstruct [new file with mode: 0644]
kvstore/protobufrpc/protobufrpc.cc [new file with mode: 0644]
kvstore/protobufrpc/protobufrpc.h [new file with mode: 0644]
kvstore/protobufrpc/socket_pool.cc [new file with mode: 0644]
kvstore/protobufrpc/socket_pool.h [new file with mode: 0644]
kvstore/protobufrpc/util.h [new file with mode: 0644]
kvstore/protobufrpc/workqueue.cc [new file with mode: 0644]
kvstore/protobufrpc/workqueue.h [new file with mode: 0644]

diff --git a/kvstore/NOTES b/kvstore/NOTES
new file mode 100644 (file)
index 0000000..16cc05a
--- /dev/null
@@ -0,0 +1,13 @@
+apt-get install mercurial build-essential libdb4.6-dev libasio-dev libboost1.35-dev libprotobuf-dev libgtest-dev protobuf-compiler scons
+
+rm -f /tmp/__db* /tmp/test.db;./kvstore -d /tmp & KVPID=$! ;sleep 1 ;time
+./kvbench --high 20 --servers 127.0.0.1:9090 ;kill $KVPID
+
+(0x42, 0x5A, 0x68) 
+
+ grep '1f 8b 08 00'
+
+x=/boot/vmlinuz-2.6.26-2-amd64 p=`echo -e '\x8b\x08\x00'` o=`grep -a -b
+-o -m 1 -e $p $x | cut -d: -f 1` dd if=$x skip=$o bs=1 | zcat > /tmp/vmlinux
+
+x=/boot/vmlinuz-2.6.26-2-amd64 dd if=$x skip=`grep -a -b -o -m 1 -e $'\xf1\x8b\x08\x00 $x | cut -d: -f 1` bs=1 | zcat > /tmp/vmlinux
diff --git a/kvstore/SConscript b/kvstore/SConscript
new file mode 100644 (file)
index 0000000..6414023
--- /dev/null
@@ -0,0 +1,65 @@
+Import('env')
+
+protobuf = Builder(
+    action = (
+        'protoc --proto_path=${SOURCE.dir} --cpp_out=${TARGET.dir} $SOURCE'
+             ),
+    single_source = True,
+    )
+
+env["BUILDERS"]["Protobuf"] = protobuf
+
+env.ParseConfig("echo -I./protobufrpc")
+env.ParseConfig("echo -I/opt/local/include -L/opt/local/lib")
+
+if not env.has_key('LIBS'):
+    env['LIBS'] = []
+
+env['LIBS'] += ['protobuf',
+                'pthread',
+                'boost_thread-mt',
+                'boost_regex-mt',
+                'boost_system-mt',
+                'boost_program_options-mt',
+                'db']
+
+protobufrpc = SConscript(dirs=['./protobufrpc/'], exports='env')
+
+kvstore_proto_files = env.Protobuf(
+        target = [ 'kvstore.pb.cc', 'kvstore.pb.h' ],
+        source = 'kvstore.proto'
+    )
+
+kvservice_files = ['kvservice.cc', 'backend.cc']
+kvclient_files = ['kvclient.cc']
+
+kvservice = env.StaticLibrary('kvservice', kvservice_files)
+kvclient = env.StaticLibrary('kvclient', kvclient_files)
+kvstore_proto = env.StaticLibrary('kvstore', kvstore_proto_files)
+
+env.Program('kvstore', ['kvstore.cc']
+                       + kvservice
+                       + kvstore_proto
+                       + protobufrpc)
+
+env.Program('kvtest',
+            ['kvtest.cc']
+                    + kvservice
+                    + kvclient
+                    + kvstore_proto
+                    + protobufrpc,
+            LIBS=env['LIBS']+['gtest'])
+
+env.Program('kvbench',
+            ['kvbench.cc']
+                    + kvclient
+                    + kvstore_proto
+                    + protobufrpc)
+
+products = {
+            'kvservice': kvservice,
+            'kvclient': kvclient,
+            'kvstore_proto': kvstore_proto,
+           }
+
+Return('products')
diff --git a/kvstore/SConstruct b/kvstore/SConstruct
new file mode 100644 (file)
index 0000000..a848c09
--- /dev/null
@@ -0,0 +1,39 @@
+import os 
+
+AddOption('--prefix',
+          dest='prefix',
+          type='string',
+          nargs=1,
+          action='store',
+          metavar='DIR',
+          help='installation prefix')
+
+env = Environment(CXXFLAGS='-O3 -fPIC -W -Wall -g',
+                  PREFIX=GetOption('prefix'),
+                  LINKFLAGS='',#-pg',
+                 )
+
+
+for envvar in ('HOME', 'DISTCC_DIR', 'DISTCC_HOSTS', 'CCACHE_DIR',
+               'INTERCEPTOR_SOCKET', 'ENFORGE_DIGEST_CACHE',
+               'ENFORGE_CACHE_HOST', 'ENFORGE_CACHE_PORT',
+               'PATH'):
+  if envvar in os.environ:
+    env['ENV'][envvar] = os.environ[envvar]
+
+#env['CXX'] = 'distcc g++'
+#env['CXX'] = 'icc'
+
+if env['PREFIX'] is not None:
+    bin_dest = env['PREFIX'] + '/usr/bin'
+else:
+    bin_dest = '/usr/bin'
+
+#env.ParseConfig("echo -lprofiler -ltcmalloc")
+
+SConscript(dirs=['./'], exports='env')
+
+if 'install' in COMMAND_LINE_TARGETS:
+    env.Install(bin_dest, 'kvstore')
+
+env.Alias('install', bin_dest)
diff --git a/kvstore/backend.cc b/kvstore/backend.cc
new file mode 100644 (file)
index 0000000..d97c492
--- /dev/null
@@ -0,0 +1,295 @@
+#include "backend.h"
+#include "util.h"
+
+#include <iostream>
+#include <boost/functional/hash.hpp>
+
+/* For bdb */
+extern "C" {
+#include <sys/types.h>
+#include <fcntl.h>
+#include <limits.h>
+}
+
+
+namespace kvstore
+{
+
+MemoryBackend::~MemoryBackend() 
+{
+}
+
+bool MemoryBackend::Put(const string &key,
+                        const string &value)
+{
+    _map[key] = value;
+    return true;
+}
+
+bool MemoryBackend::Get(const string &key,
+                        string *value)
+{
+    map_t::iterator it = _map.find(key);
+
+    if (it == _map.end())
+    {
+        return false;
+    }
+    else
+    {
+        *value = it->second;
+        return true;
+    }
+}
+
+void set_DBT(DBT *thing, const string &str)
+{
+    bzero(thing, sizeof(DBT));
+    thing->data = (void*)str.data();
+    thing->size = str.size();
+    thing->ulen = str.size();
+}
+
+class BDBBackend::BDBDatabase
+{
+public:
+    BDBDatabase(const string &path,
+               bool flush,
+               bool log_in_memory,
+               size_t num_dbs)
+        :_path(path)
+    {
+        int res = db_env_create(&_dbenv, 0);
+
+        if (res != 0)
+        {
+            cerr << db_strerror(res) << endl;
+            throw std::runtime_error("db_env_create fail");
+        }
+
+        /* Set Cache Size To Total Memory */
+        if (true)
+        {
+            double use_fraction = 0.1;
+            uint64_t pages = sysconf(_SC_PHYS_PAGES);
+            uint64_t page_size = sysconf(_SC_PAGE_SIZE);
+
+            uint64_t bytes = pages * page_size * use_fraction / num_dbs;
+
+            uint32_t gbytes = bytes / (1024uLL * 1024uLL * 1024uLL);
+            uint32_t nbytes = bytes % (1024uLL * 1024uLL * 1024uLL);
+            uint32_t ncache = bytes / (1024uLL * 1024uLL * 1024uLL * 4uLL) + 1;
+
+            res = _dbenv->set_cachesize(_dbenv, gbytes, nbytes, ncache);
+
+            if (res != 0)
+            {
+                cerr << db_strerror(res) << endl;
+                throw std::runtime_error("set_cachesize");
+            }
+        }
+
+        if (log_in_memory)
+        {
+            res = _dbenv->set_flags(_dbenv, DB_LOG_INMEMORY, 1);
+
+            if (res != 0)
+            {
+                cerr << db_strerror(res) << endl;
+                throw std::runtime_error("BDB ENV DB_LOG_INMEMORY");
+            }
+
+        }
+
+        res = _dbenv->set_flags(_dbenv, DB_LOG_AUTOREMOVE, 1);
+
+        if (res != 0)
+        {
+            cerr << db_strerror(res) << endl;
+            throw std::runtime_error("BDB ENV DB_LOG_AUTOREMOVE");
+        }
+
+        res = _dbenv->open(_dbenv,
+                           _path.c_str(),
+                           DB_INIT_CDB
+                           | DB_INIT_MPOOL
+                           | DB_CREATE
+                           | DB_THREAD,
+                           0644);
+
+
+        if (res != 0) 
+        {
+            cerr << db_strerror(res) << endl;
+            throw std::runtime_error("BDB ENV Open Fail");
+        }
+
+        string dbfilename = _path + "/test.db";
+
+        /* Flush */
+        if (flush)
+        {
+            res = _dbenv->dbremove(_dbenv, NULL, dbfilename.c_str(), "test", 0);
+
+            if (res != 0 && res != ENOENT)
+            {
+                cerr << db_strerror(res) << endl;
+                throw std::runtime_error("db remove failed");
+            }
+        }
+
+        res = db_create(&_db, _dbenv, 0);
+
+        if (res != 0)
+        {
+            cerr << db_strerror(res) << endl;
+            throw std::runtime_error("db_create fail");
+        }
+
+        uint32_t flags = DB_CREATE | DB_THREAD;
+
+        res = _db->open(_db,
+                       NULL, /* TXN */
+                       dbfilename.c_str(),
+                       "test",
+                       DB_BTREE,
+                       flags,
+                       0644);
+
+        if (res != 0)
+        {
+            cerr << db_strerror(res) << endl;
+            throw std::runtime_error("BDB Open Fail");
+        }
+
+
+    }
+
+    ~BDBDatabase()
+    {
+        int res;
+
+        if (_db)
+        {
+            if ((res = _db->close(_db, 0)) < 0)
+            {
+                cerr << db_strerror(res) << endl;
+            }
+        }
+
+        if (_dbenv)
+        {
+            if ((res = _dbenv->close(_dbenv, 0)) < 0)
+            {
+                cerr << db_strerror(res) << endl;
+            }
+        }
+    }
+
+    bool Put(const string &key,
+                     const string &value)
+    {
+        DBT bdb_key, bdb_value;
+
+        set_DBT(&bdb_key, key);
+        set_DBT(&bdb_value, value);
+
+        if (_db->put(_db, NULL, &bdb_key, &bdb_value, 0) != 0)
+        {
+            perror("bdb put");
+            return false;
+        }
+        else
+        {
+            return true;
+        }
+    }
+
+    bool Get(const string &key,
+             string *value)
+    {
+        DBT bdb_key, bdb_value;
+
+        set_DBT(&bdb_key, key);
+
+        bzero(&bdb_value, sizeof(DBT));
+        bdb_value.flags = DB_DBT_MALLOC;
+
+        int res = _db->get(_db, NULL, &bdb_key, &bdb_value, 0);
+
+        if (res == 0)
+        {
+            *value = string((char*)bdb_value.data, bdb_value.size);
+            free(bdb_value.data);
+            bdb_value.data = NULL;
+            return true;
+        }
+        else if (res == DB_NOTFOUND || res == DB_KEYEMPTY)
+        {
+            return false;
+        }
+        else
+        {
+            /* ERR */
+            cerr << db_strerror(res) << endl;
+            return false;
+        }
+    }
+
+private:
+    DB *_db;
+    DB_ENV *_dbenv;
+    const string _path;
+};
+
+BDBBackend::BDBBackend(const vector<string> &paths,
+                       bool flush,
+                       bool log_in_memory)
+{
+    if (paths.size() < 1)
+    {
+        cerr << "Insufficient BDB Paths supplied (need at least 1)";
+        throw std::runtime_error("not enough paths");
+    }
+
+
+    for (size_t i = 0; i < paths.size(); ++i)
+    {
+        cerr << "db for " << paths[i] << endl;
+        _dbs.push_back(shared_ptr<BDBDatabase>(new BDBDatabase(paths[i],
+                                                               flush,
+                                                               log_in_memory,
+                                                               paths.size())));
+    }
+}
+
+BDBBackend::~BDBBackend()
+{
+}
+
+
+static boost::hash<string> hasher;
+
+inline size_t BDBBackend::LookupKeyDB(const string &key)
+{
+    uint32_t hash = (uint32_t)hasher(key);
+    return hash % _dbs.size();
+}
+
+bool BDBBackend::Put(const string &key,
+                 const string &value)
+{
+    size_t i = LookupKeyDB(key);
+
+    return _dbs[i]->Put(key, value);
+}
+
+bool BDBBackend::Get(const string &key,
+                 string *value)
+{
+    size_t i = LookupKeyDB(key);
+    return _dbs[i]->Get(key, value);
+}
+
+
+} // namespace kvstore
diff --git a/kvstore/backend.h b/kvstore/backend.h
new file mode 100644 (file)
index 0000000..1026b0c
--- /dev/null
@@ -0,0 +1,76 @@
+#ifndef _BACKEND_H_
+#define _BACKEND_H_ 1
+
+extern "C"
+{
+#include <db.h>
+}
+
+#include <string>
+#include <map>
+
+#include <boost/shared_ptr.hpp>
+
+#include "util.h"
+
+using namespace std;
+using boost::shared_ptr;
+
+namespace kvstore
+{
+
+class Backend
+{
+public:
+    virtual ~Backend() {};
+
+    virtual bool Put(const string &key,
+                     const string &value) = 0;
+
+    virtual bool Get(const string &key,
+                     string *value) = 0;
+
+};
+
+class MemoryBackend : public Backend
+{
+public:
+    virtual ~MemoryBackend();
+
+    virtual bool Put(const string &key,
+                     const string &value);
+
+    virtual bool Get(const string &key,
+                     string *value);
+
+private:
+    boost::mutex _lock;
+    typedef map<string, string> map_t;
+    map_t _map;
+};
+
+class BDBBackend : public Backend
+{
+public:
+    BDBBackend(const vector<string> &paths,
+               bool flush=true,
+               bool log_in_memory = false);
+
+    virtual ~BDBBackend();
+    virtual bool Put(const string &key,
+                     const string &value);
+
+    virtual bool Get(const string &key,
+                     string *value);
+
+private:
+    class BDBDatabase;
+
+    inline size_t LookupKeyDB(const string &key);
+
+    vector< shared_ptr<BDBDatabase> > _dbs;
+};
+
+} // namespace kvstore
+
+#endif
diff --git a/kvstore/debian/README.Debian b/kvstore/debian/README.Debian
new file mode 100644 (file)
index 0000000..0cc3a58
--- /dev/null
@@ -0,0 +1,6 @@
+bicker-fcgi for Debian
+----------------------
+
+<possible notes regarding this package - if none, delete this file>
+
+ -- John McCullough <jcm@salud.ucsd.edu>  Tue, 29 Sep 2009 13:29:18 -0700
diff --git a/kvstore/debian/README.source b/kvstore/debian/README.source
new file mode 100644 (file)
index 0000000..a7e4671
--- /dev/null
@@ -0,0 +1,9 @@
+bicker-fcgi for Debian
+----------------------
+
+<this file describes information about the source package, see Debian policy
+manual section 4.14. You WILL either need to modify or delete this file>
+
+
+
+
diff --git a/kvstore/debian/changelog b/kvstore/debian/changelog
new file mode 100644 (file)
index 0000000..be2e2de
--- /dev/null
@@ -0,0 +1,37 @@
+kvstore (0.1-7) unstable; urgency=low
+
+  * Reduced memory copying in protobuf, support for multiple data
+    targets
+
+ -- John McCullough <jcm@salud.ucsd.edu>  Thu, 21 Jan 2010 11:10:45 -0800
+
+kvstore (0.1-6) unstable; urgency=low
+
+  * Improved RPC Layer
+
+ -- John McCullough <jcm@salud.ucsd.edu>  Thu, 24 Dec 2009 14:25:41 -0800
+
+kvstore (0.1-5) unstable; urgency=low
+
+  * KeyValue -> KVStore rename.
+  * Fixed Dependencies
+
+ -- John McCullough <jcm@salud.ucsd.edu>  Sun, 29 Nov 2009 16:17:23 -0800
+
+kvstore (0.1-3) unstable; urgency=low
+
+  * More Packaging Fun
+
+ -- John McCullough <jcm@salud.ucsd.edu>  Sun, 29 Nov 2009 15:34:41 -0800
+
+kvstore (0.1-2) unstable; urgency=low
+
+  * Incremented boost version to .0
+
+ -- John McCullough <jcm@salud.ucsd.edu>  Sun, 29 Nov 2009 15:32:01 -0800
+
+kvstore (0.1-1) unstable; urgency=low
+
+  * Initial release (Closes: #nnnn)  <nnnn is the bug number of your ITP>
+
+ -- John McCullough <jcm@salud.ucsd.edu>  Tue, 29 Sep 2009 13:29:18 -0700
diff --git a/kvstore/debian/compat b/kvstore/debian/compat
new file mode 100644 (file)
index 0000000..7f8f011
--- /dev/null
@@ -0,0 +1 @@
+7
diff --git a/kvstore/debian/control b/kvstore/debian/control
new file mode 100644 (file)
index 0000000..748f726
--- /dev/null
@@ -0,0 +1,13 @@
+Source: kvstore
+Section: unknown
+Priority: extra
+Maintainer: John McCullough <jcm@salud.ucsd.edu>
+Build-Depends: debhelper (>= 7), libboost-thread1.35-dev, libboost-system1.35-dev, libboost1.35-dev, libasio-dev, protobuf-compiler, libprotobuf-dev, libdb4.6-dev, libgtest-dev
+Standards-Version: 3.8.3,
+Homepage: <insert the upstream URL, if relevant>
+
+Package: kvstore
+Architecture: any
+Depends: ${misc:Depends}, libboost-thread1.35.0, libboost-regex1.35.0, libboost-program-options1.35.0, libboost-system1.35.0, libprotobuf4, libdb4.6, libgtest0
+Description: Homebrew key value store
+ <insert long description, indented with spaces>
diff --git a/kvstore/debian/copyright b/kvstore/debian/copyright
new file mode 100644 (file)
index 0000000..c356a24
--- /dev/null
@@ -0,0 +1,35 @@
+This work was packaged for Debian by:
+
+    John McCullough <jcm@salud.ucsd.edu> on Tue, 29 Sep 2009 13:29:18 -0700
+
+It was downloaded from <url://example.com>
+
+Upstream Author(s):
+
+    <put author's name and email here>
+    <likewise for another author>
+
+Copyright:
+
+    <Copyright (C) YYYY Name OfAuthor>
+    <likewise for another author>
+
+License:
+
+    <Put the license of the package here indented by 4 spaces>
+
+The Debian packaging is:
+
+    Copyright (C) 2009 John McCullough <jcm@salud.ucsd.edu>
+
+# Please chose a license for your packaging work. If the program you package
+# uses a mainstream license, using the same license is the safest choice.
+# Please avoid to pick license terms that are more restrictive than the
+# packaged work, as it may make Debian's contributions unacceptable upstream.
+# If you just want it to be GPL version 3, leave the following lines in.
+
+and is licensed under the GPL version 3, 
+see `/usr/share/common-licenses/GPL-3'.
+
+# Please also look if there are files or directories which have a
+# different copyright/license attached and list them here.
diff --git a/kvstore/debian/docs b/kvstore/debian/docs
new file mode 100644 (file)
index 0000000..e845566
--- /dev/null
@@ -0,0 +1 @@
+README
diff --git a/kvstore/debian/install b/kvstore/debian/install
new file mode 100644 (file)
index 0000000..c110135
--- /dev/null
@@ -0,0 +1 @@
+usr/bin/kvstore
diff --git a/kvstore/debian/rules b/kvstore/debian/rules
new file mode 100755 (executable)
index 0000000..a47c8e4
--- /dev/null
@@ -0,0 +1,43 @@
+#!/usr/bin/make -f
+# -*- makefile -*-
+# Sample debian/rules that uses debhelper.
+# This file was originally written by Joey Hess and Craig Small.
+# As a special exception, when this file is copied by dh-make into a
+# dh-make output file, you may use that output file without restriction.
+# This special exception was added by Craig Small in version 0.37 of dh-make.
+
+# Uncomment this to turn on verbose mode.
+#export DH_VERBOSE=1
+build: 
+       dh_testdir
+       scons
+
+install: build
+       dh_testdir
+       dh_testroot
+       dh_prep
+       dh_installdirs
+       scons --prefix=$(CURDIR)/debian/tmp install
+
+clean:
+       scons -c
+       dh_clean
+
+binary-arch: install
+       dh_testdir
+       dh_testroot
+       dh_installchangelogs
+       dh_install
+       dh_link
+       #dh_strip
+       dh_compress
+       dh_fixperms
+       dh_installdeb
+       dh_gencontrol
+       dh_md5sums
+       dh_builddeb
+
+binary: binary-arch
+
+%:
+       dh  $@
diff --git a/kvstore/kvbench.cc b/kvstore/kvbench.cc
new file mode 100644 (file)
index 0000000..da25085
--- /dev/null
@@ -0,0 +1,111 @@
+#include <boost/program_options.hpp>
+#include <boost/shared_ptr.hpp>
+#include <iostream>
+#include <string>
+#include "kvservice.h"
+#include "kvclient.h"
+#include "protobufrpc.h"
+
+using namespace bicker;
+using namespace boost;
+using namespace kvstore;
+using namespace std;
+
+namespace po = boost::program_options;
+
+class KVBench
+{
+public:
+    KVBench(const vector<string> &hosts)
+        :_kv_client(list<string>(hosts.begin(), hosts.end()))
+    {
+    }
+
+    virtual ~KVBench()
+    {
+    }
+
+
+    void Bench(const size_t size,
+               const size_t count)
+    {
+        string data(size, 'A');
+
+        for (size_t i = 0; i < count; ++i)
+        {
+            ostringstream key;
+            key << "key_" << size << "_" << i << endl;
+
+            _kv_client.Put(key.str(), data);
+        }
+
+        for (size_t i = 0; i < count; ++i)
+        {
+            string value;
+            ostringstream key;
+            key << "key_" << size << "_" << i << endl;
+
+            _kv_client.Get(key.str(), &value);
+        }
+    }
+
+protected:
+    KeyValueClient _kv_client;
+};
+
+
+int
+main(
+     int argc,
+     char **argv
+    )
+{
+    size_t opt_low;
+    size_t opt_high;
+    size_t opt_count;
+
+    po::options_description options("Options");
+
+    options.add_options()
+        ("help,h", "display help message")
+        ("servers",
+         po::value< vector<string> >()->multitoken(),
+         "server:port ... server:port")
+        ("low,l",
+         po::value<size_t>(&opt_low)->default_value(1),
+         "low 2^i")
+        ("high,H",
+         po::value<size_t>(&opt_high)->default_value(16),
+         "high 2^i")
+        ("count,c",
+         po::value<size_t>(&opt_count)->default_value(100),
+         "count of each size")
+        ;
+
+    po::variables_map vm;
+    po::store(po::parse_command_line(argc, argv, options), vm);
+    po::notify(vm);
+
+    if (vm.count("help"))
+    {
+        cerr << options << endl;
+        return 1;
+    }
+
+    if (!vm.count("servers"))
+    {
+        cerr << "No Servers Specified" << endl;
+        return 1;
+    }
+
+
+    KVBench bench(vm["servers"].as< vector<string> >());
+
+    for (size_t i = opt_low; i <= opt_high; ++i)
+    {
+        cout << i << ": " << (1<<i) << endl;
+        bench.Bench(1<<i, opt_count);
+    }
+
+    return 0;
+}
diff --git a/kvstore/kvclient.cc b/kvstore/kvclient.cc
new file mode 100644 (file)
index 0000000..9545770
--- /dev/null
@@ -0,0 +1,166 @@
+#include "kvclient.h"
+#include <boost/tuple/tuple.hpp>
+#include <boost/functional/hash.hpp>
+
+using namespace boost;
+using namespace kvrpc;
+
+namespace kvstore
+{
+
+class KeyValueClientRouter
+{
+public:
+    KeyValueClientRouter()
+    {
+    }
+
+    void AddHost(const string &host, const string &port)
+    {
+        mutex::scoped_lock lock(_lock);
+
+        _channels.push_back(shared_ptr<ProtoBufRpcChannel>(new ProtoBufRpcChannel(host, port)));
+        _clients.push_back(shared_ptr<kvrpc::KeyValueService_Stub>(new KeyValueService_Stub(static_cast<RpcChannel*>(_channels.back().get()))));
+    }
+
+    kvrpc::KeyValueService_Stub* Route(const string &key)
+    {
+        static hash<string> hasher;
+
+        uint32_t hash = (uint32_t)hasher(key);
+        int id = hash % _clients.size();
+
+        return _clients[id].get();
+    }
+
+private:
+    mutex _lock;
+    vector< shared_ptr<ProtoBufRpcChannel> > _channels;
+    vector< shared_ptr<kvrpc::KeyValueService_Stub> > _clients;
+};
+
+KeyValueClient::KeyValueClient(const string& host,
+                               const string& port)
+:_router(new KeyValueClientRouter())
+{
+    _router->AddHost(host, port);
+}
+
+KeyValueClient::KeyValueClient(const list<string> &hosts)
+:_router(new KeyValueClientRouter())
+{
+    for (list<string>::const_iterator i = hosts.begin();
+         i != hosts.end();
+         ++i)
+    {
+        size_t pos = i->find(':');
+        if (pos == string::npos)
+            throw runtime_error("couldn't parse host");
+
+        string host = i->substr(0, pos);
+        string port = i->substr(pos+1);
+
+        _router->AddHost(host, port);
+    }
+}
+
+typedef tuple<shared_ptr<RpcController>,
+              shared_ptr<Put>,
+              shared_ptr<PutReply> > rpc_put_state_tuple_t;
+
+void cleanupPut(rpc_put_state_tuple_t t, TaskNotification *tn)
+{
+    tn->completeTask(t.get<2>()->result() == kvrpc::SUCCESS);
+}
+
+bool
+KeyValueClient::Put(const string& key,
+         const string& value)
+{
+    TaskNotification tn;
+
+    this->Put(key, value, tn);
+
+    tn.waitForComplete();
+
+    return tn.failCount() == 0;
+}
+
+bool
+KeyValueClient::Put(const string& key,
+                    const string& value,
+                    TaskNotification &tn)
+{
+    tn.registerTask();
+
+    shared_ptr<RpcController> ctrl(new ProtoBufRpcController());
+    shared_ptr< ::kvrpc::Put> put(new ::kvrpc::Put());
+
+    put->set_key(key);
+    put->set_value(value);
+
+    shared_ptr<PutReply> reply(new PutReply());
+
+
+    _router->Route(key)->PutValue(ctrl.get(), put.get(), reply.get(),
+                             NewCallback(&cleanupPut,
+                                         rpc_put_state_tuple_t(ctrl,put,reply),
+                                         &tn));
+
+    return true;
+}
+
+typedef tuple<shared_ptr<RpcController>,
+              shared_ptr<Get>,
+              shared_ptr<GetReply>,
+              string*> rpc_get_state_tuple_t;
+
+void cleanupGet(rpc_get_state_tuple_t t, TaskNotification *tn)
+{
+    bool result = t.get<2>()->result() == kvrpc::SUCCESS;
+
+    if (result)
+    {
+        string* result_ptr = t.get<3>();
+        *result_ptr = t.get<2>()->value();
+    }
+
+    tn->completeTask(result);
+}
+
+bool
+KeyValueClient::Get(const string& key, string* value)
+{
+    TaskNotification tn;
+
+    this->Get(key, value, tn);
+
+    tn.waitForComplete();
+
+    return tn.failCount() == 0;
+}
+
+bool
+KeyValueClient::Get(const string& key, string* value, TaskNotification &tn)
+{
+    tn.registerTask();
+
+    shared_ptr<RpcController> ctrl(new ProtoBufRpcController());
+    shared_ptr< ::kvrpc::Get> get(new ::kvrpc::Get());
+
+    get->set_key(key);
+
+    shared_ptr<GetReply> reply(new GetReply());
+
+    _router->Route(key)->GetValue(ctrl.get(), get.get(), reply.get(),
+                             NewCallback(&cleanupGet,
+                                         rpc_get_state_tuple_t(ctrl,
+                                                               get,
+                                                               reply,
+                                                               value),
+                                         &tn));
+
+    return true;
+}
+
+}
diff --git a/kvstore/kvclient.h b/kvstore/kvclient.h
new file mode 100644 (file)
index 0000000..dfe86d2
--- /dev/null
@@ -0,0 +1,40 @@
+#ifndef _KVCLIENT_H_
+#define _KVCLIENT_H_ 1
+
+#include "kvstore.pb.h"
+#include "workqueue.h"
+#include "protobufrpc.h"
+
+using namespace bicker;
+
+namespace kvstore
+{
+    class KeyValueClientRouter;
+
+    class KeyValueClient
+    {
+    public:
+        KeyValueClient(const string& host,
+                       const string& port);
+
+        KeyValueClient(const list<string> &hosts);
+
+        bool Put(const string& key,
+                 const string& value);
+
+        bool Put(const string& key,
+                 const string& value,
+                 TaskNotification &tn);
+
+        bool Get(const string& key, string* value);
+
+        bool Get(const string& key,
+                 string* value,
+                 TaskNotification &tn);
+
+    private:
+        shared_ptr<KeyValueClientRouter> _router;
+    };
+} // namespace kvstore
+
+#endif
diff --git a/kvstore/kvservice.cc b/kvstore/kvservice.cc
new file mode 100644 (file)
index 0000000..e241bc9
--- /dev/null
@@ -0,0 +1,55 @@
+#include "kvservice.h"
+#include <iostream>
+
+using namespace std;
+
+namespace kvstore
+{
+
+KeyValueRpcService::KeyValueRpcService(Backend *backend)
+    :_backend(backend)
+{
+}
+
+KeyValueRpcService::~KeyValueRpcService()
+{
+}
+
+void KeyValueRpcService::PutValue(
+                      ::google::protobuf::RpcController* /*controller*/,
+                      const ::kvrpc::Put* request,
+                      ::kvrpc::PutReply* response,
+                      ::google::protobuf::Closure* done)
+{
+    if (_backend->Put(request->key(), request->value()))
+    {
+        response->set_result(kvrpc::SUCCESS);
+    }
+    else
+    {
+        response->set_result(kvrpc::FAILURE);
+    }
+
+    done->Run();
+}
+
+void KeyValueRpcService::GetValue(
+                      ::google::protobuf::RpcController* /*controller*/,
+                      const ::kvrpc::Get* request,
+                      ::kvrpc::GetReply* response,
+                      ::google::protobuf::Closure* done)
+{
+    string value;
+    if (_backend->Get(request->key(), &value))
+    {
+        response->set_result(kvrpc::SUCCESS);
+        response->set_value(value);
+    }
+    else
+    {
+        response->set_result(kvrpc::FAILURE);
+    }
+    done->Run();
+}
+
+}; // namespace kvstore
diff --git a/kvstore/kvservice.h b/kvstore/kvservice.h
new file mode 100644 (file)
index 0000000..68008ed
--- /dev/null
@@ -0,0 +1,34 @@
+#ifndef _KVSERVICE_H_
+#define _KVSERVICE_H_ 1
+
+#include "kvstore.pb.h"
+#include "backend.h"
+
+#include <memory>
+
+using std::auto_ptr;
+
+namespace kvstore
+{
+    class KeyValueRpcService : public ::kvrpc::KeyValueService
+    {
+    public:
+        KeyValueRpcService(Backend *backend);
+
+        virtual ~KeyValueRpcService();
+
+        virtual void PutValue(::google::protobuf::RpcController* controller,
+                              const ::kvrpc::Put* request,
+                              ::kvrpc::PutReply* response,
+                              ::google::protobuf::Closure* done);
+        virtual void GetValue(::google::protobuf::RpcController* controller,
+                              const ::kvrpc::Get* request,
+                              ::kvrpc::GetReply* response,
+                              ::google::protobuf::Closure* done);
+    private:
+        auto_ptr<Backend> _backend;
+
+    };
+} // namespace kvstore
+
+#endif
diff --git a/kvstore/kvstore.cc b/kvstore/kvstore.cc
new file mode 100644 (file)
index 0000000..8d8fd12
--- /dev/null
@@ -0,0 +1,137 @@
+#include <boost/program_options.hpp>
+#include <boost/shared_ptr.hpp>
+#include <iostream>
+#include <string>
+#include <ctime>
+#include <csignal>
+#include "kvservice.h"
+#include "protobufrpc.h"
+
+using namespace bicker;
+using namespace boost;
+using namespace kvstore;
+using namespace std;
+
+namespace po = boost::program_options;
+
+int interrupted_count = 0;
+ProtoBufRpcServer rpc_server;
+
+void
+interrupted(int /*signal*/)
+{
+    if (interrupted_count < 1)
+    {
+        rpc_server.shutdown();
+    }
+    else
+    {
+        exit(1);
+    }
+    ++interrupted_count;
+}
+
+int
+main(
+     int argc,
+     char **argv
+    )
+{
+    shared_ptr<thread> server_thread;
+
+    /* Initialize Exception Handling */
+    struct sigaction action;
+
+    action.sa_handler = &interrupted;
+    action.sa_flags = SA_SIGINFO;
+    sigemptyset(&action.sa_mask);
+
+    if (sigaction(SIGINT, &action, NULL) == -1)
+    {
+        perror("sigaction");
+        exit(1);
+    }
+
+    /* Parse Options */
+    string opt_bdb_home;
+    uint16_t opt_port;
+    uint16_t opt_sleep;
+
+    po::options_description options("Options");
+
+    options.add_options()
+        ("help,h", "display help message")
+        ("bdb-home,d",
+         po::value< vector<string> >(),
+         "bdb home directories")
+        ("port",
+         po::value<uint16_t>(&opt_port)->default_value(9090),
+         "listen port")
+        ("foreground,f", "don't fork to the background")
+        ("sleep,s", po::value<uint16_t>(&opt_sleep)->default_value(0), "sleep then exit")
+        ;
+
+    po::variables_map vm;
+    po::store(po::parse_command_line(argc, argv, options), vm);
+    po::notify(vm);
+
+    if (vm.count("help"))
+    {
+        cerr << options << endl;
+        return 1;
+    }
+
+
+    vector<string> paths;
+
+    if (vm.count("bdb-home") > 0)
+    {
+        paths = vm["bdb-home"].as< vector<string> >();
+    }
+    else
+    {
+        paths.push_back("./");
+    }
+
+
+    shared_ptr<Service> service(new KeyValueRpcService(new BDBBackend(paths)));
+
+    rpc_server.registerService(opt_port, service);
+
+    server_thread.reset(
+                         new thread(
+                                    boost::bind(
+                                                &ProtoBufRpcServer::run, 
+                                                &rpc_server)));
+
+    if (!vm.count("foreground"))
+    {
+        // Daemonize
+        if (daemon(0,0) != 0)
+        {
+            perror("daemonizing");
+            exit(1);
+        }
+    }
+
+    if (opt_sleep == 0)
+    {
+        server_thread->join();
+    }
+    else
+    {
+        time_t t0 = time(NULL);
+        time_t t1;
+
+        do
+        {
+            t1 = time(NULL);
+            sleep(opt_sleep - (t1 - t0));
+        } while (t1 - t0 < opt_sleep);
+    }
+    rpc_server.shutdown();
+
+    return 0;
+}
+
+
diff --git a/kvstore/kvstore.proto b/kvstore/kvstore.proto
new file mode 100644 (file)
index 0000000..1479cfd
--- /dev/null
@@ -0,0 +1,35 @@
+package kvrpc;
+
+enum Result
+{
+    SUCCESS = 0;
+    FAILURE = 1;
+}
+
+message Put
+{
+    required string key = 1;
+    required string value = 2;
+}
+
+message PutReply
+{
+    required Result result = 1;
+}
+
+message Get
+{
+    required string key = 1;
+}
+
+message GetReply
+{
+    required Result result = 1;
+    optional string value = 2;
+}
+
+service KeyValueService
+{
+    rpc PutValue(Put) returns(PutReply);
+    rpc GetValue(Get) returns(GetReply);
+}
diff --git a/kvstore/kvstore.spec b/kvstore/kvstore.spec
new file mode 100644 (file)
index 0000000..18bae94
--- /dev/null
@@ -0,0 +1,33 @@
+Name: kvstore
+Version: 0.1
+Release: 1
+Summary: A Simple kv store application
+License: BSD
+Group:System Environment/Base
+Source0: http://salud.ucsd.edu/~jcm/scaling/kvstore-0.1.tar.gz
+BuildRoot: %{_tmppath}/%{name}%{version}%{release}root%(%{__id_u} jcm)
+BuildArch: x86_64
+Requires: boost protobuf db4
+BuildRequires: boost-devel gtest-devel protobuf-devel db4-devel
+
+%description
+
+Key Value Store with simple backend and google protocol buffer rpc.
+
+%prep
+%setup -q
+
+%build
+scons %{?_smp_mflags}
+
+%install
+rm -rf %{buildroot}
+mkdir -p %{buildroot}%{_bindir}
+scons --prefix=%{buildroot} install
+
+%clean
+scons -c
+
+%files
+%defattr(-,root,root,-)
+%{_bindir}/kvstore
diff --git a/kvstore/kvtest.cc b/kvstore/kvtest.cc
new file mode 100644 (file)
index 0000000..5ac60ea
--- /dev/null
@@ -0,0 +1,104 @@
+#include <boost/shared_ptr.hpp>
+#include <gtest/gtest.h>
+#include "workqueue.h"
+#include "kvservice.h"
+#include "kvclient.h"
+#include "protobufrpc.h"
+
+using namespace boost;
+using namespace kvstore;
+using namespace bicker;
+using namespace kvrpc;
+
+/* http://code.google.com/p/googletest/wiki/GoogleTestPrimer */
+
+class KVTest : public ::testing::Test
+{
+public:
+    KVTest()
+        :_kv_client("127.0.0.1", "9090")
+    {
+        vector<string> data_dirs;
+        data_dirs.push_back("/tmp");
+
+        shared_ptr<Service> service(new KeyValueRpcService(new BDBBackend(data_dirs)));
+        _rpc_server.registerService(9090, service);
+
+        _server_thread.reset(
+                        new thread(
+                                boost::bind(
+                                        &ProtoBufRpcServer::run, 
+                                        &_rpc_server)));
+
+    }
+
+    virtual ~KVTest()
+    {
+        _rpc_server.shutdown();
+        _server_thread->join();
+    }
+protected:
+
+    ProtoBufRpcServer _rpc_server;
+    KeyValueClient _kv_client;
+    shared_ptr<thread> _server_thread;
+};
+
+
+TEST_F(KVTest, PutTest)
+{
+    ASSERT_TRUE(_kv_client.Put("test", "value"));
+    string value;
+    ASSERT_TRUE(_kv_client.Get("test", &value));
+    ASSERT_EQ(value, "value");
+}
+
+TEST_F(KVTest, MedPutTest)
+{
+    string test_value(1024*10, '6');
+    ASSERT_TRUE(_kv_client.Put("test", test_value));
+    string value;
+    ASSERT_TRUE(_kv_client.Get("test", &value));
+    ASSERT_TRUE(value == test_value);
+}
+
+TEST_F(KVTest, BigPutTest)
+{
+    string test_value(1024*1024*10, '6');
+    ASSERT_TRUE(_kv_client.Put("test", test_value));
+    string value;
+    ASSERT_TRUE(_kv_client.Get("test", &value));
+    ASSERT_EQ(value.size(), test_value.size());
+    ASSERT_TRUE(value == test_value);
+}
+
+TEST_F(KVTest, LoopPutTest)
+{
+    string test_value(1024*1024, '6');
+
+    for (int i = 0; i < 10; ++i)
+    {
+        ostringstream key;
+        key << "test" << i;
+        ASSERT_TRUE(_kv_client.Put(key.str(), test_value));
+        string value;
+        ASSERT_TRUE(_kv_client.Get(key.str(), &value));
+        ASSERT_TRUE(value == test_value);
+    }
+}
+
+TEST_F(KVTest, EmptyTest)
+{
+    string value;
+    ASSERT_FALSE(_kv_client.Get("test", &value));
+}
+
+int
+main(
+     int argc,
+     char **argv
+    )
+{
+    ::testing::InitGoogleTest(&argc, argv);
+    return RUN_ALL_TESTS();
+}
diff --git a/kvstore/protobufrpc/SConscript b/kvstore/protobufrpc/SConscript
new file mode 100644 (file)
index 0000000..b972f84
--- /dev/null
@@ -0,0 +1,7 @@
+Import('env')
+
+base_files = ['protobufrpc.cc', 'socket_pool.cc', 'workqueue.cc']
+
+protobufrpc = env.StaticLibrary('protobufrpc', base_files)
+
+Return('protobufrpc')
diff --git a/kvstore/protobufrpc/SConstruct b/kvstore/protobufrpc/SConstruct
new file mode 100644 (file)
index 0000000..79fbb68
--- /dev/null
@@ -0,0 +1,29 @@
+import os
+
+AddOption('--prefix',
+          dest='prefix',
+          type='string',
+          nargs=1,
+          action='store',
+          metavar='DIR',
+          help='installation prefix')
+
+env = Environment(CXXFLAGS='-g -fPIC', PREFIX=GetOption('prefix'))
+
+for envvar in ('HOME', 'DISTCC_DIR', 'DISTCC_HOSTS', 'CCACHE_DIR',
+               'INTERCEPTOR_SOCKET', 'ENFORGE_DIGEST_CACHE',
+               'ENFORGE_CACHE_HOST', 'ENFORGE_CACHE_PORT'):
+  if envvar in os.environ:
+    env['ENV'][envvar] = os.environ[envvar]
+
+if env['PREFIX'] is not None:
+    bin_dest = env['PREFIX'] + '/usr/bin'
+else:
+    bin_dest = '/usr/bin'
+
+base_files = ['protobufrpc.cc', 'socket_pool.cc']
+
+if not env.has_key('LIBS'):
+    env['LIBS'] = []
+
+SConscript(['SConscript'])
diff --git a/kvstore/protobufrpc/protobufrpc.cc b/kvstore/protobufrpc/protobufrpc.cc
new file mode 100644 (file)
index 0000000..06226f7
--- /dev/null
@@ -0,0 +1,628 @@
+#include "protobufrpc.h"
+
+#include <google/protobuf/descriptor.h>
+#include <google/protobuf/io/zero_copy_stream_impl.h>
+#include <google/protobuf/io/coded_stream.h>
+#include <boost/shared_ptr.hpp>
+#include <boost/bind.hpp>
+
+#include <boost/functional/hash.hpp>
+
+using namespace std;
+
+using namespace boost;
+using asio::buffer;
+
+namespace bicker
+{
+
+template<typename T>
+static void* void_write(void* data, T val)
+{
+    *((T*)data) = val;
+    return (char*)data + sizeof(T);
+}
+
+class ProtoBufRpcServiceRequest
+{
+public:
+    ProtoBufRpcServiceRequest(
+                            RpcController *ctrl,
+                            const MethodDescriptor* method,
+                            Message *request,
+                            Message *response,
+                            shared_ptr<ProtoBufRpcConnection> conn
+                           )
+        :_ctrl(ctrl),
+         _method(method),
+         _request(request),
+         _response(response),
+         _conn(conn)
+    {
+    }
+
+    ~ProtoBufRpcServiceRequest()
+    {
+
+    }
+
+    static void run(ProtoBufRpcServiceRequest *req)
+    {
+
+        req->_conn->writeResponse(req->_response.get());
+
+        delete req;
+    }
+
+    shared_ptr<RpcController> _ctrl;
+    const MethodDescriptor *_method;
+    shared_ptr<Message> _request;
+    shared_ptr<Message> _response;
+    shared_ptr<ProtoBufRpcConnection> _conn;
+};
+
+ProtoBufRpcConnection::ProtoBufRpcConnection(asio::io_service& io_service,
+                             Service *service)
+:_socket(io_service),
+ _strand(io_service),
+ _service(service),
+ _state(STATE_NONE)
+{
+}
+
+tcp::socket& ProtoBufRpcConnection::socket()
+{
+    return _socket;
+}
+
+void ProtoBufRpcConnection::start()
+{
+    _socket.async_read_some(_buffer.prepare(4096),
+            _strand.wrap(
+                boost::bind(&ProtoBufRpcConnection::handle_read, shared_from_this(),
+                            asio::placeholders::error,
+                            asio::placeholders::bytes_transferred)));
+}
+
+void ProtoBufRpcConnection::writeResponse(Message *msg)
+{
+    int rlen = msg->ByteSize();
+    int len = htonl(rlen);
+    int mlen = sizeof(len) + rlen;
+
+    void * data = asio::buffer_cast<void*>(_buffer.prepare(mlen));
+
+    data = void_write(data, len);
+
+    using google::protobuf::io::ArrayOutputStream;
+
+    ArrayOutputStream as(data, rlen);
+
+    msg->SerializeToZeroCopyStream(&as);
+
+    _buffer.commit(mlen);
+
+    asio::async_write(_socket,
+            _buffer.data(),
+            _strand.wrap(
+                boost::bind(&ProtoBufRpcConnection::handle_write, 
+                            shared_from_this(),
+                asio::placeholders::error,
+                asio::placeholders::bytes_transferred)));
+}
+
+
+void ProtoBufRpcConnection::handle_read(const error_code& e, 
+                 std::size_t bytes_transferred)
+{
+    if (!e)
+    {
+        _buffer.commit(bytes_transferred);
+
+        if (_state == STATE_NONE)
+        {
+            if (_buffer.size() >= sizeof(_id) + sizeof(_len))
+            {
+                string b(
+                     buffers_begin(_buffer.data()),
+                     buffers_begin(_buffer.data())
+                                                + sizeof(_id) + sizeof(_len)
+                        );
+
+                _buffer.consume(sizeof(_id) + sizeof(_len));
+
+                _id = *((int*)b.c_str());
+                _id = ntohl(_id);
+
+                _len = *((unsigned int*)(b.c_str() + sizeof(_id)));
+                _len = ntohl(_len);
+
+                _state = STATE_HAVE_ID_AND_LEN;
+            }
+            else
+            {
+                start();
+            }
+        }
+
+        if (_state == STATE_HAVE_ID_AND_LEN || _state == STATE_WAITING_FOR_DATA)
+        {
+            if (_buffer.size() >= _len)
+            {
+                const MethodDescriptor* method =
+                    _service->GetDescriptor()->method(_id);
+
+                Message *req = _service->GetRequestPrototype(method).New();
+                Message *resp = _service->GetResponsePrototype(method).New();
+
+                using google::protobuf::io::ArrayInputStream;
+                using google::protobuf::io::CodedInputStream;
+
+                const void* data = asio::buffer_cast<const void*>(
+                                                        _buffer.data()
+                                                                 );
+                ArrayInputStream as(data, _len);
+                CodedInputStream is(&as);
+                is.SetTotalBytesLimit(512 * 1024 * 1024, -1);
+
+                if (!req->ParseFromCodedStream(&is))
+                {
+                    throw std::runtime_error("ParseFromCodedStream");
+                }
+
+                _buffer.consume(_len);
+
+                ProtoBufRpcController *ctrl = new ProtoBufRpcController();
+                _service->CallMethod(method, 
+                                     ctrl,
+                                     req, 
+                                     resp, 
+                                     NewCallback(
+                                             &ProtoBufRpcServiceRequest::run,
+                                             new ProtoBufRpcServiceRequest(
+                                                           ctrl,
+                                                           method,
+                                                           req,
+                                                           resp,
+                                                           shared_from_this())
+                                                )
+                                     );
+                _state = STATE_NONE;
+            }
+            else
+            {
+                _state = STATE_WAITING_FOR_DATA;
+                start();
+            }
+        }
+
+    }
+    else
+    {
+        error_code ignored_ec;
+        _socket.shutdown(tcp::socket::shutdown_both, ignored_ec);
+    }
+}
+
+void ProtoBufRpcConnection::handle_write(const error_code& e,
+                                         std::size_t bytes_transferred)
+{
+    if (e)
+    {
+        error_code ignored_ec;
+        _socket.shutdown(tcp::socket::shutdown_both, ignored_ec);
+    }
+    else
+    {
+        _buffer.consume(bytes_transferred);
+
+        if (_buffer.size())
+        {
+            asio::async_write(_socket,
+                    _buffer.data(),
+                    _strand.wrap(
+                        boost::bind(&ProtoBufRpcConnection::handle_write, 
+                                    shared_from_this(),
+                        asio::placeholders::error,
+                        asio::placeholders::bytes_transferred)));
+            return;
+        }
+
+        _state = STATE_NONE;
+        start();
+    }
+}
+
+ProtoBufRpcServer::ProtoBufRpcServer()
+    :_io_service(new asio::io_service())
+{
+}
+
+bool ProtoBufRpcServer::registerService(uint16_t port,
+                                shared_ptr<Service> service)
+{
+    // This is not thread safe
+
+    // The RegisteredService Constructor fires up the appropriate
+    // async accepts for the service
+    _services.push_back(shared_ptr<RegisteredService>(
+                                        new RegisteredService(
+                                                    _io_service,
+                                                    port,
+                                                    service)));
+
+    return true;
+}
+
+void run_wrapper(asio::io_service *io_service)
+{
+    struct itimerval itimer; 
+    setitimer(ITIMER_PROF, &itimer, NULL);
+
+    io_service->run();
+}
+
+void ProtoBufRpcServer::run()
+{
+    try
+    {
+        if (_services.size() == 0)
+        {
+            throw std::runtime_error("No services registered for ProtoBufRpcServer");
+        }
+
+        size_t nprocs = sysconf(_SC_NPROCESSORS_ONLN);
+
+        vector<shared_ptr<thread> > threads;
+        for (size_t i = 0; i < nprocs; ++i)
+        {
+            shared_ptr<thread> t(new thread(
+                                    boost::bind(
+                                        //&run_wrapper,
+                                        &asio::io_service::run, 
+                                        _io_service.get())));
+            threads.push_back(t);
+        }
+
+        for (size_t i = 0; i < threads.size(); ++i)
+        {
+            threads[i]->join();
+        }
+    }
+    catch (std::exception &e)
+    {
+        std::cerr << "ProtoBufRpcService" << e.what() << std::endl;
+    }
+}
+
+void ProtoBufRpcServer::shutdown()
+{
+    _io_service->stop();
+}
+
+ProtoBufRpcServer::RegisteredService::RegisteredService(
+                  shared_ptr<asio::io_service> io_service,
+                  uint16_t port,
+                  shared_ptr<Service> service
+                 )
+:_io_service(io_service),
+ _port(port),
+ _service(service),
+ _endpoint(tcp::v4(), _port),
+ _acceptor(*_io_service),
+ _new_connection(new ProtoBufRpcConnection(*_io_service, _service.get()))
+{
+    _acceptor.open(_endpoint.protocol());
+    _acceptor.set_option(tcp::acceptor::reuse_address(true));
+    _acceptor.bind(_endpoint);
+    _acceptor.listen();
+    _acceptor.async_accept(_new_connection->socket(),
+                   boost::bind(&ProtoBufRpcServer::RegisteredService::handle_accept, 
+                               this, 
+                               asio::placeholders::error));
+}
+
+void ProtoBufRpcServer::RegisteredService::handle_accept(const error_code& e)
+{
+      if (!e)
+      {
+          _new_connection->start();
+          _new_connection.reset(new ProtoBufRpcConnection(*_io_service, _service.get()));
+          _acceptor.async_accept(_new_connection->socket(),
+                 boost::bind(&ProtoBufRpcServer::RegisteredService::handle_accept,
+                             this,
+                             asio::placeholders::error));
+      }
+
+}
+
+ProtoBufRpcController::ProtoBufRpcController()
+{
+}
+
+ProtoBufRpcController::~ProtoBufRpcController()
+{
+}
+
+void ProtoBufRpcController::Reset()
+{
+}
+
+bool ProtoBufRpcController::Failed() const
+{
+    return false;
+}
+
+string ProtoBufRpcController::ErrorText() const
+{
+    return "No Error";
+}
+
+void ProtoBufRpcController::StartCancel()
+{
+}
+
+void ProtoBufRpcController::SetFailed(const string &/*reason*/)
+{
+}
+
+bool ProtoBufRpcController::IsCanceled() const
+{
+    return false;
+}
+
+void ProtoBufRpcController::NotifyOnCancel(Closure * /*callback*/)
+{
+}
+
+class ProtoBufRpcChannel::MethodHandler 
+    : public enable_shared_from_this<MethodHandler>,
+      private boost::noncopyable
+{
+public:
+    MethodHandler(auto_ptr<SocketCheckout> socket,
+                           const MethodDescriptor * method,
+                           RpcController * controller,
+                           const Message * request,
+                           Message * response,
+                           Closure * done
+                          )
+        :_socket(socket),
+        _method(method),
+        _controller(controller),
+        _request(request),
+        _response(response),
+        _done(done)
+    {
+    }
+
+    ~MethodHandler()
+    {
+        _socket.reset();
+        _done->Run();
+    }
+
+    static void execute(shared_ptr<MethodHandler> this_ptr)
+    {
+        int index = htonl(this_ptr->_method->index());
+        int rlen = this_ptr->_request->ByteSize();
+        int len = htonl(rlen);
+
+        int mlen = sizeof(index) + sizeof(len) + rlen;
+
+        void * data = asio::buffer_cast<void*>(this_ptr->_buffer.prepare(mlen));
+
+        data = void_write(data, index);
+        data = void_write(data, len);
+
+        using google::protobuf::io::ArrayOutputStream;
+
+        ArrayOutputStream as(data, rlen);
+
+        this_ptr->_request->SerializeToZeroCopyStream(&as);
+        this_ptr->_buffer.commit(mlen);
+
+        (*(this_ptr->_socket))->async_send(this_ptr->_buffer.data(),
+                               boost::bind(&ProtoBufRpcChannel::MethodHandler::handle_write,
+                                           this_ptr,
+                                           asio::placeholders::error,
+                                           asio::placeholders::bytes_transferred));
+    }
+
+    static void handle_write(shared_ptr<MethodHandler> this_ptr,
+                      const error_code& e, 
+                      std::size_t bytes_transferred)
+    {
+        if (!e)
+        {
+            this_ptr->_buffer.consume(bytes_transferred);
+
+            if (this_ptr->_buffer.size())
+            {
+                (*(this_ptr->_socket))->async_send(this_ptr->_buffer.data(),
+                                       boost::bind(&ProtoBufRpcChannel::MethodHandler::handle_write,
+                                                   this_ptr,
+                                                   asio::placeholders::error,
+                                                   asio::placeholders::bytes_transferred));
+                return;
+            }
+
+            (*(this_ptr->_socket))->async_receive(
+                                      buffer(&this_ptr->_len, sizeof(this_ptr->_len)),
+                                      boost::bind(
+                                                  &ProtoBufRpcChannel::MethodHandler::handle_read_len,
+                                                  this_ptr,
+                                                  asio::placeholders::error,
+                                                  asio::placeholders::bytes_transferred)
+                                     );
+        }
+        else
+        {
+            this_ptr->_controller->SetFailed(e.message());
+            (*(this_ptr->_socket))->close();
+        }
+    }
+
+    static void handle_read_len(shared_ptr<MethodHandler> this_ptr,
+                                const error_code& e,
+                                std::size_t bytes_transferred)
+    {
+        if (!e && bytes_transferred == sizeof(this_ptr->_len))
+        {
+            this_ptr->_len = ntohl(this_ptr->_len);
+            (*(this_ptr->_socket))->async_receive(
+                                      this_ptr->_buffer.prepare(this_ptr->_len),
+                                      boost::bind(
+                                                  &ProtoBufRpcChannel::MethodHandler::handle_read_response,
+                                                  this_ptr,
+                                                  asio::placeholders::error,
+                                                  asio::placeholders::bytes_transferred
+                                                 )
+                                     );
+        }
+        else
+        {
+            this_ptr->_controller->SetFailed(e.message());
+            (*(this_ptr->_socket))->close();
+        }
+    }
+
+    static void handle_read_response(shared_ptr<MethodHandler> this_ptr,
+                              const error_code& e, 
+                              std::size_t bytes_transferred)
+    {
+        if (!e)
+        {
+            this_ptr->_buffer.commit(bytes_transferred);
+            if (this_ptr->_buffer.size() >= this_ptr->_len)
+            {
+                using google::protobuf::io::ArrayInputStream;
+                using google::protobuf::io::CodedInputStream;
+
+                const void* data = asio::buffer_cast<const void*>(
+                                                        this_ptr->_buffer.data()
+                                                                 );
+                ArrayInputStream as(data, this_ptr->_len);
+                CodedInputStream is(&as);
+                is.SetTotalBytesLimit(512 * 1024 * 1024, -1);
+
+                if (!this_ptr->_response->ParseFromCodedStream(&is))
+                {
+                    throw std::runtime_error("ParseFromCodedStream");
+                }
+
+                this_ptr->_buffer.consume(this_ptr->_len);
+            }
+            else
+            {
+                (*(this_ptr->_socket))->async_receive(
+                                          this_ptr->_buffer.prepare(this_ptr->_len - this_ptr->_buffer.size()),
+                                          boost::bind(
+                                                      &ProtoBufRpcChannel::MethodHandler::handle_read_response,
+                                                      this_ptr,
+                                                      asio::placeholders::error,
+                                                      asio::placeholders::bytes_transferred
+                                                     )
+                                         );
+                return;
+            }
+        }
+        else
+        {
+            this_ptr->_controller->SetFailed(e.message());
+            (*(this_ptr->_socket))->close();
+        }
+    }
+
+private:
+    auto_ptr<SocketCheckout> _socket;
+    const MethodDescriptor * _method;
+    RpcController * _controller;
+    const Message * _request;
+    Message * _response;
+    Closure * _done;
+    asio::streambuf _buffer;
+    unsigned int _len;
+    bool _status;
+    unsigned int _sent;
+};
+
+
+ProtoBufRpcChannel::ProtoBufRpcChannel(const string &remotehost, 
+                                  const string &port)
+    :_remote_host(remotehost), _port(port),
+     _resolver(_io_service),
+     _acceptor(_io_service),
+     _pool(2000, _io_service),
+     _lame_socket(_io_service),
+     _thread()
+//                                &asio::io_service::run, 
+//                                &_io_service)))
+{
+
+    tcp::resolver::query query(_remote_host, _port);
+    tcp::resolver::iterator endpoint_iterator = _resolver.resolve(query);
+    tcp::resolver::iterator end;
+
+    error_code error = asio::error::host_not_found;
+
+    if (endpoint_iterator == end) throw syserr::system_error(error);
+
+    _pool.setEndpoint(*endpoint_iterator);
+
+    tcp::endpoint e(tcp::v4(), 0);
+    _acceptor.open(e.protocol());
+    _acceptor.set_option(tcp::acceptor::reuse_address(true));
+    _acceptor.bind(e);
+    _acceptor.listen();
+    _acceptor.async_accept(_lame_socket,
+                       boost::bind(&ProtoBufRpcChannel::lame_handle_accept, this, 
+                                   asio::placeholders::error));
+
+    _thread = shared_ptr<thread>(new thread(
+                                     boost::bind(
+                                             &asio::io_service::run, 
+                                             &_io_service)));
+}
+
+void ProtoBufRpcChannel::lame_handle_accept(const error_code &err)
+{
+    if (!err)
+    {
+        _acceptor.async_accept(_lame_socket,
+                           boost::bind(&ProtoBufRpcChannel::lame_handle_accept,
+                                       this,
+                                       asio::placeholders::error));
+    }
+}
+
+ProtoBufRpcChannel::~ProtoBufRpcChannel()
+{
+    _pool.cancelAndClear();
+
+    _io_service.stop();
+
+    _thread->join();
+}
+
+void ProtoBufRpcChannel::CallMethod(
+        const MethodDescriptor * method,
+        RpcController * controller,
+        const Message * request,
+        Message * response,
+        Closure * done)
+{
+    shared_ptr<MethodHandler> h(
+                            new MethodHandler(
+                          auto_ptr<SocketCheckout>(new SocketCheckout(&_pool)),
+                                              method,
+                                              controller,
+                                              request,
+                                              response,
+                                              done
+                                             ));
+
+    MethodHandler::execute(h);
+}
+
+} // namespace bicker
diff --git a/kvstore/protobufrpc/protobufrpc.h b/kvstore/protobufrpc/protobufrpc.h
new file mode 100644 (file)
index 0000000..221903d
--- /dev/null
@@ -0,0 +1,173 @@
+#ifndef __PROTOBUFRPC_H__
+#define __PROTOBUFRPC_H__
+
+#include <stdint.h>
+#include <boost/shared_ptr.hpp>
+#include <boost/enable_shared_from_this.hpp>
+#include <boost/thread.hpp>
+#include <boost/function.hpp>
+#include <queue>
+#include <set>
+#include "socket_pool.h"
+#include "util.h"
+
+#include <google/protobuf/stubs/common.h>
+#include <google/protobuf/generated_message_reflection.h>
+#include <google/protobuf/repeated_field.h>
+#include <google/protobuf/extension_set.h>
+#include <google/protobuf/service.h>
+
+using namespace std;
+using namespace google::protobuf;
+using namespace boost;
+
+namespace bicker
+{
+
+class ProtoBufRpcService;
+
+class ProtoBufRpcConnection 
+    : public enable_shared_from_this<ProtoBufRpcConnection>,
+      private boost::noncopyable
+{
+public:
+    explicit ProtoBufRpcConnection(asio::io_service& io_service,
+                                 Service *_service);
+
+    asio::ip::tcp::socket& socket();
+
+    void start();
+
+    void writeResponse(Message *msg);
+
+
+private:
+    void handle_read(const error_code& e, 
+                     std::size_t bytes_transferred);
+
+    void handle_write(const error_code& e,
+                     std::size_t bytes_transferred);
+
+    tcp::socket _socket;
+
+
+    asio::io_service::strand _strand;
+
+    Service *_service;
+
+    asio::streambuf _buffer;
+
+    int _id;
+    unsigned int _len;
+
+    enum {
+        STATE_NONE,
+        STATE_HAVE_ID_AND_LEN,
+        STATE_WAITING_FOR_DATA,
+        STATE_FAIL,
+    } _state;
+};
+
+
+class ProtoBufRpcServer
+{
+public:
+    ProtoBufRpcServer();
+
+    bool registerService(uint16_t port,
+                         shared_ptr< Service> service);
+
+    // So we can call this as a thread.
+    void run();
+
+    // So we can stop..
+    void shutdown();
+
+protected:
+
+    class RegisteredService
+    {
+    public:
+        RegisteredService(
+                          shared_ptr<asio::io_service> io_service,
+                          uint16_t port,
+                          shared_ptr<Service> service
+                         );
+
+        void handle_accept(const error_code& e);
+
+    protected:
+        // Ref to parent's
+        shared_ptr<asio::io_service> _io_service;
+        uint16_t _port;
+        shared_ptr<Service> _service;
+        tcp::endpoint _endpoint;
+        tcp::acceptor _acceptor;
+        shared_ptr<ProtoBufRpcConnection> _new_connection;
+    };
+
+    list<shared_ptr<RegisteredService> > _services;
+    shared_ptr<asio::io_service> _io_service;
+};
+
+class ProtoBufRpcController : public RpcController
+{
+public:
+    ProtoBufRpcController();
+    virtual ~ProtoBufRpcController();
+
+    virtual void Reset();
+    virtual bool Failed() const;
+    virtual string ErrorText() const;
+    virtual void StartCancel();
+
+    virtual void SetFailed(const string &reason);
+    virtual bool IsCanceled() const;
+    virtual void NotifyOnCancel(Closure *callback);
+};
+
+class ProtoBufRpcChannel
+    : public RpcChannel,
+      public enable_shared_from_this<ProtoBufRpcChannel>,
+      private boost::noncopyable
+{
+public:
+    ProtoBufRpcChannel(const string &remotehost, const string &port);
+
+    virtual ~ProtoBufRpcChannel();
+
+    virtual void CallMethod(
+        const MethodDescriptor * method,
+        RpcController * controller,
+        const Message * request,
+        Message * response,
+        Closure * done);
+
+protected:
+    shared_ptr<tcp::socket> getSocket();
+    void putSocket(shared_ptr<tcp::socket>);
+
+private:
+    class MethodHandler;
+
+    string _remote_host;
+    string _port;
+
+
+    asio::io_service _io_service;
+    tcp::resolver _resolver;
+    // This exists to keep the io service running
+    tcp::acceptor _acceptor;
+
+    SocketPool _pool;
+
+    asio::ip::tcp::socket _lame_socket;
+    void lame_handle_accept(const error_code &err);
+
+    shared_ptr<boost::thread> _thread;
+
+};
+
+} // namespace bicker
+
+#endif
diff --git a/kvstore/protobufrpc/socket_pool.cc b/kvstore/protobufrpc/socket_pool.cc
new file mode 100644 (file)
index 0000000..454da55
--- /dev/null
@@ -0,0 +1,101 @@
+#include "socket_pool.h"
+
+SocketPool::SocketPool(int max_sockets,
+               asio::io_service &io_svc
+              )
+:_issued(0),
+ _max_sockets(max_sockets),
+ _io_service(io_svc)
+{
+}
+
+void SocketPool::setEndpoint(const tcp::endpoint &endpoint)
+{
+    _endpoint = endpoint;
+}
+
+void SocketPool::cancelAndClear()
+{
+    for (set<shared_ptr<tcp::socket> >::iterator i = _set.begin();
+         i != _set.end();
+         ++i)
+    {
+        (*i)->cancel();
+    }
+
+    while (!_queue.empty()) _queue.pop();
+    _set.clear();
+}
+
+shared_ptr<tcp::socket> SocketPool::getSocket()
+{
+    mutex::scoped_lock lock(_sockets_lock);
+
+    while (_queue.size() == 0 && _issued >= _max_sockets)
+        _sockets_non_empty.wait(lock);
+
+    if (_queue.size())
+    {
+            shared_ptr<tcp::socket> socket = _queue.front();
+            _queue.pop();
+
+            return socket;
+    }
+    else
+    {
+        ++_issued;
+        error_code error = asio::error::host_not_found;
+
+        shared_ptr<tcp::socket> socket(new tcp::socket(_io_service));
+        socket->connect(_endpoint, error);
+
+        if (error) throw syserr::system_error(error);
+
+        _set.insert(socket);
+
+        return socket;
+    }
+}
+
+void SocketPool::putSocket(shared_ptr<tcp::socket> socket)
+{
+    mutex::scoped_lock lock(_sockets_lock);
+
+    if (!socket->is_open())
+    {
+        cerr << "socket closed\n";
+        --_issued;
+        _set.erase(socket);
+    }
+    else 
+    {
+        _queue.push(socket);
+    }
+
+    _sockets_non_empty.notify_one();
+}
+
+SocketCheckout::SocketCheckout(SocketPool *pool)
+    :_socket(pool->getSocket()),
+     _pool(pool)
+{
+}
+
+SocketCheckout::~SocketCheckout()
+{
+    _pool->putSocket(_socket);
+}
+tcp::socket& SocketCheckout::operator*()
+{
+    return *_socket;
+}
+
+tcp::socket* SocketCheckout::operator->()
+{
+    return _socket.get();
+}
+
+shared_ptr<tcp::socket>& SocketCheckout::socket()
+{
+    return _socket;
+}
diff --git a/kvstore/protobufrpc/socket_pool.h b/kvstore/protobufrpc/socket_pool.h
new file mode 100644 (file)
index 0000000..57faf42
--- /dev/null
@@ -0,0 +1,52 @@
+#ifndef _SOCKET_POOL_H_
+#define _SOCKET_POOL_H_ 1
+
+#include <iostream>
+#include <set>
+#include <queue>
+#include <boost/shared_ptr.hpp>
+#include <boost/thread.hpp>
+
+#include "util.h"
+
+using namespace std;
+using namespace boost;
+
+
+class SocketPool
+{
+public:
+    SocketPool(int max_streams,
+               asio::io_service &io_svc);
+    void setEndpoint(const tcp::endpoint &endpoint);
+    void cancelAndClear();
+    shared_ptr<tcp::socket> getSocket();
+    void putSocket(shared_ptr<tcp::socket> socket);
+private:
+    int _issued;
+    int _max_sockets;
+    mutex _sockets_lock;
+    condition_variable _sockets_non_empty;
+    asio::io_service &_io_service;
+    tcp::endpoint _endpoint;
+    queue<shared_ptr<tcp::socket> > _queue;
+    set<shared_ptr<tcp::socket> > _set;
+};
+
+class SocketCheckout
+{
+public:
+    SocketCheckout(SocketPool *pool);
+    ~SocketCheckout();
+
+    tcp::socket& operator*();
+    tcp::socket* operator->();
+
+    shared_ptr<tcp::socket>& socket();
+
+private:
+    shared_ptr<tcp::socket> _socket;
+    SocketPool *_pool;
+};
+
+#endif
diff --git a/kvstore/protobufrpc/util.h b/kvstore/protobufrpc/util.h
new file mode 100644 (file)
index 0000000..966921f
--- /dev/null
@@ -0,0 +1,37 @@
+#ifndef _UTIL_H_ 
+#define _UTIL_H_ 1
+
+#include <boost/version.hpp>
+
+#if BOOST_VERSION <= 103500
+#include <boost/thread.hpp>
+#include <boost/thread/mutex.hpp>
+#include <boost/bind.hpp>
+#include <asio.hpp>
+#include <asio/buffer.hpp>
+//typedef boost::condition condition_variable ;
+//typedef boost::detail::thread::scoped_lock<boost::mutex> scoped_lock;
+using asio::ip::tcp;
+using asio::error_code;
+using asio::buffers_begin;
+namespace syserr=asio;
+#else
+#if BOOST_VERSION < 104000
+#include <boost/asio.hpp>
+#include <boost/thread/mutex.hpp>
+#include <boost/bind.hpp>
+using boost::asio::ip::tcp;
+using boost::system::error_code;
+using boost::system::system_error;
+namespace syserr=boost::system;
+#else
+#include <boost/asio.hpp>
+#include <boost/thread/mutex.hpp>
+using boost::asio::ip::tcp;
+using boost::system::error_code;
+using boost::system::system_error;
+namespace syserr=boost::system;
+#endif
+#endif
+
+#endif
diff --git a/kvstore/protobufrpc/workqueue.cc b/kvstore/protobufrpc/workqueue.cc
new file mode 100644 (file)
index 0000000..27861aa
--- /dev/null
@@ -0,0 +1,125 @@
+#include "workqueue.h"
+#include <boost/thread.hpp>
+
+namespace bicker
+{
+
+WorkQueue::WorkQueue()
+    :_thread_count(0), _min_threads(1), _max_threads(50), _running(true)
+{
+    for (int i = 0; i < _min_threads; ++i)
+    {
+        spawnThread();
+    }
+}
+
+WorkQueue::WorkQueue(int thread_count)
+    :_thread_count(0), 
+     _min_threads(1), _max_threads(thread_count), _running(true)
+{
+    for (int i = 0; i < _min_threads; ++i)
+    {
+        spawnThread();
+    }
+}
+
+WorkQueue::~WorkQueue()
+{
+    _running = false;
+
+    {
+        mutex::scoped_lock lock(_queue_lock);
+        _queue_non_empty.notify_all();
+    }
+
+    _threads.join_all();
+}
+
+void WorkQueue::spawnThread()
+{
+    ++_thread_count;
+    _threads.create_thread(Worker(this));
+}
+
+shared_ptr<WorkUnit> WorkQueue::get()
+{
+    mutex::scoped_lock lock(_queue_lock);
+
+    while (_queue.size() == 0)
+    {
+        _queue_non_empty.wait(lock);
+        if (!_running) throw interrupted_error();
+    }
+
+    shared_ptr<WorkUnit> back = _queue.front();
+    _queue.pop();
+
+    if (_queue.size() > 0 && _thread_count < _max_threads) spawnThread();
+
+    return back;
+}
+
+void WorkQueue::put(shared_ptr<WorkUnit> work_unit)
+{
+    mutex::scoped_lock lock(_queue_lock);
+
+    _queue.push(work_unit);
+
+    _queue_non_empty.notify_one();
+}
+
+WorkQueue::Worker::Worker(WorkQueue* queue)
+    :_queue(queue)
+{
+}
+
+void WorkQueue::Worker::operator()()
+{
+    while (true)
+    {
+        try
+        {
+            shared_ptr<WorkUnit> unit = _queue->get();
+
+            unit->run();
+        }
+        catch (interrupted_error)
+        {
+            return;
+        }
+    }
+}
+
+TaskNotification::TaskNotification()
+:_expected(0), _count(0), _fail_count(0)
+{
+}
+
+void TaskNotification::registerTask()
+{
+    mutex::scoped_lock lock(_lock);
+    ++_expected;
+}
+
+void TaskNotification::completeTask(bool success)
+{
+    mutex::scoped_lock lock(_lock);
+    if (!success) ++_fail_count;
+    if (++_count == _expected) _cond.notify_all();
+}
+
+void TaskNotification::waitForComplete()
+{
+    mutex::scoped_lock lock(_lock);
+    while (_count < _expected)
+    {
+        _cond.wait(lock);
+    }
+}
+
+bool TaskNotification::failCount()
+{
+    return _fail_count;
+}
+
+} // namespace bicker
diff --git a/kvstore/protobufrpc/workqueue.h b/kvstore/protobufrpc/workqueue.h
new file mode 100644 (file)
index 0000000..cfb2e02
--- /dev/null
@@ -0,0 +1,78 @@
+#ifndef __WORKQUEUE_H__
+#define __WORKQUEUE_H__ 1
+
+#include <boost/thread.hpp>
+#include <boost/shared_ptr.hpp>
+#include <queue>
+#include "util.h"
+
+using namespace boost;
+using namespace std;
+
+namespace bicker
+{
+    struct interrupted_error : public virtual std::exception { };
+
+    class WorkUnit
+    {
+    public:
+        virtual ~WorkUnit() {};
+        virtual void run() = 0;
+    };
+
+    class WorkQueue
+    {
+    public:
+        WorkQueue();
+        WorkQueue(int thread_count);
+
+        ~WorkQueue();
+
+        shared_ptr<WorkUnit> get();
+        void put(shared_ptr<WorkUnit> work_unit);
+
+    protected:
+        void spawnThread();
+
+    private:
+        class Worker
+        {
+        public:
+            Worker(WorkQueue* queue);
+            void operator()();
+        private:
+            WorkQueue *_queue;
+        };
+
+        int _thread_count;
+        int _min_threads;
+        int _max_threads;
+        mutex _queue_lock;
+        condition_variable _queue_non_empty;
+        queue<shared_ptr<WorkUnit> > _queue;
+        thread_group _threads;
+        volatile bool _running;
+    };
+
+    class TaskNotification
+    {
+    public:
+        TaskNotification();
+
+        void registerTask();
+        void completeTask(bool success = true);
+
+        void waitForComplete();
+
+        bool failCount();
+    private:
+        int                _expected;
+        int                _count;
+        int                _fail_count;
+        mutex              _lock;
+        condition_variable _cond;
+    };
+
+} // namespace bicker
+
+#endif