From 3c2cbef21a11c4d86952922f4da7b830a91423f9 Mon Sep 17 00:00:00 2001 From: Michael Vrable Date: Sun, 14 Feb 2010 14:56:22 -0800 Subject: [PATCH] Add John MucCullough's simple key/value storage server. --- kvstore/NOTES | 13 + kvstore/SConscript | 65 +++ kvstore/SConstruct | 39 ++ kvstore/backend.cc | 295 ++++++++++++++ kvstore/backend.h | 76 ++++ kvstore/debian/README.Debian | 6 + kvstore/debian/README.source | 9 + kvstore/debian/changelog | 37 ++ kvstore/debian/compat | 1 + kvstore/debian/control | 13 + kvstore/debian/copyright | 35 ++ kvstore/debian/docs | 1 + kvstore/debian/install | 1 + kvstore/debian/rules | 43 ++ kvstore/kvbench.cc | 111 +++++ kvstore/kvclient.cc | 166 ++++++++ kvstore/kvclient.h | 40 ++ kvstore/kvservice.cc | 55 +++ kvstore/kvservice.h | 34 ++ kvstore/kvstore.cc | 137 +++++++ kvstore/kvstore.proto | 35 ++ kvstore/kvstore.spec | 33 ++ kvstore/kvtest.cc | 104 +++++ kvstore/protobufrpc/SConscript | 7 + kvstore/protobufrpc/SConstruct | 29 ++ kvstore/protobufrpc/protobufrpc.cc | 628 +++++++++++++++++++++++++++++ kvstore/protobufrpc/protobufrpc.h | 173 ++++++++ kvstore/protobufrpc/socket_pool.cc | 101 +++++ kvstore/protobufrpc/socket_pool.h | 52 +++ kvstore/protobufrpc/util.h | 37 ++ kvstore/protobufrpc/workqueue.cc | 125 ++++++ kvstore/protobufrpc/workqueue.h | 78 ++++ 32 files changed, 2579 insertions(+) create mode 100644 kvstore/NOTES create mode 100644 kvstore/SConscript create mode 100644 kvstore/SConstruct create mode 100644 kvstore/backend.cc create mode 100644 kvstore/backend.h create mode 100644 kvstore/debian/README.Debian create mode 100644 kvstore/debian/README.source create mode 100644 kvstore/debian/changelog create mode 100644 kvstore/debian/compat create mode 100644 kvstore/debian/control create mode 100644 kvstore/debian/copyright create mode 100644 kvstore/debian/docs create mode 100644 kvstore/debian/install create mode 100755 kvstore/debian/rules create mode 100644 kvstore/kvbench.cc create mode 100644 kvstore/kvclient.cc create mode 100644 kvstore/kvclient.h create mode 100644 kvstore/kvservice.cc create mode 100644 kvstore/kvservice.h create mode 100644 kvstore/kvstore.cc create mode 100644 kvstore/kvstore.proto create mode 100644 kvstore/kvstore.spec create mode 100644 kvstore/kvtest.cc create mode 100644 kvstore/protobufrpc/SConscript create mode 100644 kvstore/protobufrpc/SConstruct create mode 100644 kvstore/protobufrpc/protobufrpc.cc create mode 100644 kvstore/protobufrpc/protobufrpc.h create mode 100644 kvstore/protobufrpc/socket_pool.cc create mode 100644 kvstore/protobufrpc/socket_pool.h create mode 100644 kvstore/protobufrpc/util.h create mode 100644 kvstore/protobufrpc/workqueue.cc create mode 100644 kvstore/protobufrpc/workqueue.h diff --git a/kvstore/NOTES b/kvstore/NOTES new file mode 100644 index 0000000..16cc05a --- /dev/null +++ b/kvstore/NOTES @@ -0,0 +1,13 @@ +apt-get install mercurial build-essential libdb4.6-dev libasio-dev libboost1.35-dev libprotobuf-dev libgtest-dev protobuf-compiler scons + +rm -f /tmp/__db* /tmp/test.db;./kvstore -d /tmp & KVPID=$! ;sleep 1 ;time +./kvbench --high 20 --servers 127.0.0.1:9090 ;kill $KVPID + +(0x42, 0x5A, 0x68) + + grep '1f 8b 08 00' + +x=/boot/vmlinuz-2.6.26-2-amd64 p=`echo -e '\x8b\x08\x00'` o=`grep -a -b +-o -m 1 -e $p $x | cut -d: -f 1` dd if=$x skip=$o bs=1 | zcat > /tmp/vmlinux + +x=/boot/vmlinuz-2.6.26-2-amd64 dd if=$x skip=`grep -a -b -o -m 1 -e $'\xf1\x8b\x08\x00 $x | cut -d: -f 1` bs=1 | zcat > /tmp/vmlinux diff --git a/kvstore/SConscript b/kvstore/SConscript new file mode 100644 index 0000000..6414023 --- /dev/null +++ b/kvstore/SConscript @@ -0,0 +1,65 @@ +Import('env') + +protobuf = Builder( + action = ( + 'protoc --proto_path=${SOURCE.dir} --cpp_out=${TARGET.dir} $SOURCE' + ), + single_source = True, + ) + +env["BUILDERS"]["Protobuf"] = protobuf + +env.ParseConfig("echo -I./protobufrpc") +env.ParseConfig("echo -I/opt/local/include -L/opt/local/lib") + +if not env.has_key('LIBS'): + env['LIBS'] = [] + +env['LIBS'] += ['protobuf', + 'pthread', + 'boost_thread-mt', + 'boost_regex-mt', + 'boost_system-mt', + 'boost_program_options-mt', + 'db'] + +protobufrpc = SConscript(dirs=['./protobufrpc/'], exports='env') + +kvstore_proto_files = env.Protobuf( + target = [ 'kvstore.pb.cc', 'kvstore.pb.h' ], + source = 'kvstore.proto' + ) + +kvservice_files = ['kvservice.cc', 'backend.cc'] +kvclient_files = ['kvclient.cc'] + +kvservice = env.StaticLibrary('kvservice', kvservice_files) +kvclient = env.StaticLibrary('kvclient', kvclient_files) +kvstore_proto = env.StaticLibrary('kvstore', kvstore_proto_files) + +env.Program('kvstore', ['kvstore.cc'] + + kvservice + + kvstore_proto + + protobufrpc) + +env.Program('kvtest', + ['kvtest.cc'] + + kvservice + + kvclient + + kvstore_proto + + protobufrpc, + LIBS=env['LIBS']+['gtest']) + +env.Program('kvbench', + ['kvbench.cc'] + + kvclient + + kvstore_proto + + protobufrpc) + +products = { + 'kvservice': kvservice, + 'kvclient': kvclient, + 'kvstore_proto': kvstore_proto, + } + +Return('products') diff --git a/kvstore/SConstruct b/kvstore/SConstruct new file mode 100644 index 0000000..a848c09 --- /dev/null +++ b/kvstore/SConstruct @@ -0,0 +1,39 @@ +import os + +AddOption('--prefix', + dest='prefix', + type='string', + nargs=1, + action='store', + metavar='DIR', + help='installation prefix') + +env = Environment(CXXFLAGS='-O3 -fPIC -W -Wall -g', + PREFIX=GetOption('prefix'), + LINKFLAGS='',#-pg', + ) + + +for envvar in ('HOME', 'DISTCC_DIR', 'DISTCC_HOSTS', 'CCACHE_DIR', + 'INTERCEPTOR_SOCKET', 'ENFORGE_DIGEST_CACHE', + 'ENFORGE_CACHE_HOST', 'ENFORGE_CACHE_PORT', + 'PATH'): + if envvar in os.environ: + env['ENV'][envvar] = os.environ[envvar] + +#env['CXX'] = 'distcc g++' +#env['CXX'] = 'icc' + +if env['PREFIX'] is not None: + bin_dest = env['PREFIX'] + '/usr/bin' +else: + bin_dest = '/usr/bin' + +#env.ParseConfig("echo -lprofiler -ltcmalloc") + +SConscript(dirs=['./'], exports='env') + +if 'install' in COMMAND_LINE_TARGETS: + env.Install(bin_dest, 'kvstore') + +env.Alias('install', bin_dest) diff --git a/kvstore/backend.cc b/kvstore/backend.cc new file mode 100644 index 0000000..d97c492 --- /dev/null +++ b/kvstore/backend.cc @@ -0,0 +1,295 @@ +#include "backend.h" +#include "util.h" + +#include +#include + +/* For bdb */ +extern "C" { +#include +#include +#include +} + + +namespace kvstore +{ + +MemoryBackend::~MemoryBackend() +{ +} + +bool MemoryBackend::Put(const string &key, + const string &value) +{ + _map[key] = value; + return true; +} + +bool MemoryBackend::Get(const string &key, + string *value) +{ + map_t::iterator it = _map.find(key); + + if (it == _map.end()) + { + return false; + } + else + { + *value = it->second; + return true; + } +} + +void set_DBT(DBT *thing, const string &str) +{ + bzero(thing, sizeof(DBT)); + thing->data = (void*)str.data(); + thing->size = str.size(); + thing->ulen = str.size(); +} + +class BDBBackend::BDBDatabase +{ +public: + BDBDatabase(const string &path, + bool flush, + bool log_in_memory, + size_t num_dbs) + :_path(path) + { + int res = db_env_create(&_dbenv, 0); + + if (res != 0) + { + cerr << db_strerror(res) << endl; + throw std::runtime_error("db_env_create fail"); + } + + /* Set Cache Size To Total Memory */ + if (true) + { + double use_fraction = 0.1; + uint64_t pages = sysconf(_SC_PHYS_PAGES); + uint64_t page_size = sysconf(_SC_PAGE_SIZE); + + uint64_t bytes = pages * page_size * use_fraction / num_dbs; + + uint32_t gbytes = bytes / (1024uLL * 1024uLL * 1024uLL); + uint32_t nbytes = bytes % (1024uLL * 1024uLL * 1024uLL); + uint32_t ncache = bytes / (1024uLL * 1024uLL * 1024uLL * 4uLL) + 1; + + res = _dbenv->set_cachesize(_dbenv, gbytes, nbytes, ncache); + + if (res != 0) + { + cerr << db_strerror(res) << endl; + throw std::runtime_error("set_cachesize"); + } + } + + if (log_in_memory) + { + res = _dbenv->set_flags(_dbenv, DB_LOG_INMEMORY, 1); + + if (res != 0) + { + cerr << db_strerror(res) << endl; + throw std::runtime_error("BDB ENV DB_LOG_INMEMORY"); + } + + } + + res = _dbenv->set_flags(_dbenv, DB_LOG_AUTOREMOVE, 1); + + if (res != 0) + { + cerr << db_strerror(res) << endl; + throw std::runtime_error("BDB ENV DB_LOG_AUTOREMOVE"); + } + + res = _dbenv->open(_dbenv, + _path.c_str(), + DB_INIT_CDB + | DB_INIT_MPOOL + | DB_CREATE + | DB_THREAD, + 0644); + + + if (res != 0) + { + cerr << db_strerror(res) << endl; + throw std::runtime_error("BDB ENV Open Fail"); + } + + string dbfilename = _path + "/test.db"; + + /* Flush */ + if (flush) + { + res = _dbenv->dbremove(_dbenv, NULL, dbfilename.c_str(), "test", 0); + + if (res != 0 && res != ENOENT) + { + cerr << db_strerror(res) << endl; + throw std::runtime_error("db remove failed"); + } + } + + res = db_create(&_db, _dbenv, 0); + + if (res != 0) + { + cerr << db_strerror(res) << endl; + throw std::runtime_error("db_create fail"); + } + + uint32_t flags = DB_CREATE | DB_THREAD; + + res = _db->open(_db, + NULL, /* TXN */ + dbfilename.c_str(), + "test", + DB_BTREE, + flags, + 0644); + + if (res != 0) + { + cerr << db_strerror(res) << endl; + throw std::runtime_error("BDB Open Fail"); + } + + + } + + ~BDBDatabase() + { + int res; + + if (_db) + { + if ((res = _db->close(_db, 0)) < 0) + { + cerr << db_strerror(res) << endl; + } + } + + if (_dbenv) + { + if ((res = _dbenv->close(_dbenv, 0)) < 0) + { + cerr << db_strerror(res) << endl; + } + } + } + + bool Put(const string &key, + const string &value) + { + DBT bdb_key, bdb_value; + + set_DBT(&bdb_key, key); + set_DBT(&bdb_value, value); + + if (_db->put(_db, NULL, &bdb_key, &bdb_value, 0) != 0) + { + perror("bdb put"); + return false; + } + else + { + return true; + } + } + + bool Get(const string &key, + string *value) + { + DBT bdb_key, bdb_value; + + set_DBT(&bdb_key, key); + + bzero(&bdb_value, sizeof(DBT)); + bdb_value.flags = DB_DBT_MALLOC; + + int res = _db->get(_db, NULL, &bdb_key, &bdb_value, 0); + + if (res == 0) + { + *value = string((char*)bdb_value.data, bdb_value.size); + free(bdb_value.data); + bdb_value.data = NULL; + return true; + } + else if (res == DB_NOTFOUND || res == DB_KEYEMPTY) + { + return false; + } + else + { + /* ERR */ + cerr << db_strerror(res) << endl; + return false; + } + } + +private: + DB *_db; + DB_ENV *_dbenv; + const string _path; +}; + +BDBBackend::BDBBackend(const vector &paths, + bool flush, + bool log_in_memory) +{ + if (paths.size() < 1) + { + cerr << "Insufficient BDB Paths supplied (need at least 1)"; + throw std::runtime_error("not enough paths"); + } + + + for (size_t i = 0; i < paths.size(); ++i) + { + cerr << "db for " << paths[i] << endl; + _dbs.push_back(shared_ptr(new BDBDatabase(paths[i], + flush, + log_in_memory, + paths.size()))); + } +} + +BDBBackend::~BDBBackend() +{ +} + + +static boost::hash hasher; + +inline size_t BDBBackend::LookupKeyDB(const string &key) +{ + uint32_t hash = (uint32_t)hasher(key); + return hash % _dbs.size(); +} + +bool BDBBackend::Put(const string &key, + const string &value) +{ + size_t i = LookupKeyDB(key); + + return _dbs[i]->Put(key, value); +} + +bool BDBBackend::Get(const string &key, + string *value) +{ + size_t i = LookupKeyDB(key); + return _dbs[i]->Get(key, value); +} + + +} // namespace kvstore diff --git a/kvstore/backend.h b/kvstore/backend.h new file mode 100644 index 0000000..1026b0c --- /dev/null +++ b/kvstore/backend.h @@ -0,0 +1,76 @@ +#ifndef _BACKEND_H_ +#define _BACKEND_H_ 1 + +extern "C" +{ +#include +} + +#include +#include + +#include + +#include "util.h" + +using namespace std; +using boost::shared_ptr; + +namespace kvstore +{ + +class Backend +{ +public: + virtual ~Backend() {}; + + virtual bool Put(const string &key, + const string &value) = 0; + + virtual bool Get(const string &key, + string *value) = 0; + +}; + +class MemoryBackend : public Backend +{ +public: + virtual ~MemoryBackend(); + + virtual bool Put(const string &key, + const string &value); + + virtual bool Get(const string &key, + string *value); + +private: + boost::mutex _lock; + typedef map map_t; + map_t _map; +}; + +class BDBBackend : public Backend +{ +public: + BDBBackend(const vector &paths, + bool flush=true, + bool log_in_memory = false); + + virtual ~BDBBackend(); + virtual bool Put(const string &key, + const string &value); + + virtual bool Get(const string &key, + string *value); + +private: + class BDBDatabase; + + inline size_t LookupKeyDB(const string &key); + + vector< shared_ptr > _dbs; +}; + +} // namespace kvstore + +#endif diff --git a/kvstore/debian/README.Debian b/kvstore/debian/README.Debian new file mode 100644 index 0000000..0cc3a58 --- /dev/null +++ b/kvstore/debian/README.Debian @@ -0,0 +1,6 @@ +bicker-fcgi for Debian +---------------------- + + + + -- John McCullough Tue, 29 Sep 2009 13:29:18 -0700 diff --git a/kvstore/debian/README.source b/kvstore/debian/README.source new file mode 100644 index 0000000..a7e4671 --- /dev/null +++ b/kvstore/debian/README.source @@ -0,0 +1,9 @@ +bicker-fcgi for Debian +---------------------- + + + + + + diff --git a/kvstore/debian/changelog b/kvstore/debian/changelog new file mode 100644 index 0000000..be2e2de --- /dev/null +++ b/kvstore/debian/changelog @@ -0,0 +1,37 @@ +kvstore (0.1-7) unstable; urgency=low + + * Reduced memory copying in protobuf, support for multiple data + targets + + -- John McCullough Thu, 21 Jan 2010 11:10:45 -0800 + +kvstore (0.1-6) unstable; urgency=low + + * Improved RPC Layer + + -- John McCullough Thu, 24 Dec 2009 14:25:41 -0800 + +kvstore (0.1-5) unstable; urgency=low + + * KeyValue -> KVStore rename. + * Fixed Dependencies + + -- John McCullough Sun, 29 Nov 2009 16:17:23 -0800 + +kvstore (0.1-3) unstable; urgency=low + + * More Packaging Fun + + -- John McCullough Sun, 29 Nov 2009 15:34:41 -0800 + +kvstore (0.1-2) unstable; urgency=low + + * Incremented boost version to .0 + + -- John McCullough Sun, 29 Nov 2009 15:32:01 -0800 + +kvstore (0.1-1) unstable; urgency=low + + * Initial release (Closes: #nnnn) + + -- John McCullough Tue, 29 Sep 2009 13:29:18 -0700 diff --git a/kvstore/debian/compat b/kvstore/debian/compat new file mode 100644 index 0000000..7f8f011 --- /dev/null +++ b/kvstore/debian/compat @@ -0,0 +1 @@ +7 diff --git a/kvstore/debian/control b/kvstore/debian/control new file mode 100644 index 0000000..748f726 --- /dev/null +++ b/kvstore/debian/control @@ -0,0 +1,13 @@ +Source: kvstore +Section: unknown +Priority: extra +Maintainer: John McCullough +Build-Depends: debhelper (>= 7), libboost-thread1.35-dev, libboost-system1.35-dev, libboost1.35-dev, libasio-dev, protobuf-compiler, libprotobuf-dev, libdb4.6-dev, libgtest-dev +Standards-Version: 3.8.3, +Homepage: + +Package: kvstore +Architecture: any +Depends: ${misc:Depends}, libboost-thread1.35.0, libboost-regex1.35.0, libboost-program-options1.35.0, libboost-system1.35.0, libprotobuf4, libdb4.6, libgtest0 +Description: Homebrew key value store + diff --git a/kvstore/debian/copyright b/kvstore/debian/copyright new file mode 100644 index 0000000..c356a24 --- /dev/null +++ b/kvstore/debian/copyright @@ -0,0 +1,35 @@ +This work was packaged for Debian by: + + John McCullough on Tue, 29 Sep 2009 13:29:18 -0700 + +It was downloaded from + +Upstream Author(s): + + + + +Copyright: + + + + +License: + + + +The Debian packaging is: + + Copyright (C) 2009 John McCullough + +# Please chose a license for your packaging work. If the program you package +# uses a mainstream license, using the same license is the safest choice. +# Please avoid to pick license terms that are more restrictive than the +# packaged work, as it may make Debian's contributions unacceptable upstream. +# If you just want it to be GPL version 3, leave the following lines in. + +and is licensed under the GPL version 3, +see `/usr/share/common-licenses/GPL-3'. + +# Please also look if there are files or directories which have a +# different copyright/license attached and list them here. diff --git a/kvstore/debian/docs b/kvstore/debian/docs new file mode 100644 index 0000000..e845566 --- /dev/null +++ b/kvstore/debian/docs @@ -0,0 +1 @@ +README diff --git a/kvstore/debian/install b/kvstore/debian/install new file mode 100644 index 0000000..c110135 --- /dev/null +++ b/kvstore/debian/install @@ -0,0 +1 @@ +usr/bin/kvstore diff --git a/kvstore/debian/rules b/kvstore/debian/rules new file mode 100755 index 0000000..a47c8e4 --- /dev/null +++ b/kvstore/debian/rules @@ -0,0 +1,43 @@ +#!/usr/bin/make -f +# -*- makefile -*- +# Sample debian/rules that uses debhelper. +# This file was originally written by Joey Hess and Craig Small. +# As a special exception, when this file is copied by dh-make into a +# dh-make output file, you may use that output file without restriction. +# This special exception was added by Craig Small in version 0.37 of dh-make. + +# Uncomment this to turn on verbose mode. +#export DH_VERBOSE=1 +build: + dh_testdir + scons + +install: build + dh_testdir + dh_testroot + dh_prep + dh_installdirs + scons --prefix=$(CURDIR)/debian/tmp install + +clean: + scons -c + dh_clean + +binary-arch: install + dh_testdir + dh_testroot + dh_installchangelogs + dh_install + dh_link + #dh_strip + dh_compress + dh_fixperms + dh_installdeb + dh_gencontrol + dh_md5sums + dh_builddeb + +binary: binary-arch + +%: + dh $@ diff --git a/kvstore/kvbench.cc b/kvstore/kvbench.cc new file mode 100644 index 0000000..da25085 --- /dev/null +++ b/kvstore/kvbench.cc @@ -0,0 +1,111 @@ +#include +#include +#include +#include +#include "kvservice.h" +#include "kvclient.h" +#include "protobufrpc.h" + +using namespace bicker; +using namespace boost; +using namespace kvstore; +using namespace std; + +namespace po = boost::program_options; + +class KVBench +{ +public: + KVBench(const vector &hosts) + :_kv_client(list(hosts.begin(), hosts.end())) + { + } + + virtual ~KVBench() + { + } + + + void Bench(const size_t size, + const size_t count) + { + string data(size, 'A'); + + for (size_t i = 0; i < count; ++i) + { + ostringstream key; + key << "key_" << size << "_" << i << endl; + + _kv_client.Put(key.str(), data); + } + + for (size_t i = 0; i < count; ++i) + { + string value; + ostringstream key; + key << "key_" << size << "_" << i << endl; + + _kv_client.Get(key.str(), &value); + } + } + +protected: + KeyValueClient _kv_client; +}; + + +int +main( + int argc, + char **argv + ) +{ + size_t opt_low; + size_t opt_high; + size_t opt_count; + + po::options_description options("Options"); + + options.add_options() + ("help,h", "display help message") + ("servers", + po::value< vector >()->multitoken(), + "server:port ... server:port") + ("low,l", + po::value(&opt_low)->default_value(1), + "low 2^i") + ("high,H", + po::value(&opt_high)->default_value(16), + "high 2^i") + ("count,c", + po::value(&opt_count)->default_value(100), + "count of each size") + ; + + po::variables_map vm; + po::store(po::parse_command_line(argc, argv, options), vm); + po::notify(vm); + + if (vm.count("help")) + { + cerr << options << endl; + return 1; + } + + if (!vm.count("servers")) + { + cerr << "No Servers Specified" << endl; + return 1; + } + + + KVBench bench(vm["servers"].as< vector >()); + + for (size_t i = opt_low; i <= opt_high; ++i) + { + cout << i << ": " << (1< +#include + +using namespace boost; +using namespace kvrpc; + +namespace kvstore +{ + +class KeyValueClientRouter +{ +public: + KeyValueClientRouter() + { + } + + void AddHost(const string &host, const string &port) + { + mutex::scoped_lock lock(_lock); + + _channels.push_back(shared_ptr(new ProtoBufRpcChannel(host, port))); + _clients.push_back(shared_ptr(new KeyValueService_Stub(static_cast(_channels.back().get())))); + } + + kvrpc::KeyValueService_Stub* Route(const string &key) + { + static hash hasher; + + uint32_t hash = (uint32_t)hasher(key); + int id = hash % _clients.size(); + + return _clients[id].get(); + } + +private: + mutex _lock; + vector< shared_ptr > _channels; + vector< shared_ptr > _clients; +}; + +KeyValueClient::KeyValueClient(const string& host, + const string& port) +:_router(new KeyValueClientRouter()) +{ + _router->AddHost(host, port); +} + +KeyValueClient::KeyValueClient(const list &hosts) +:_router(new KeyValueClientRouter()) +{ + for (list::const_iterator i = hosts.begin(); + i != hosts.end(); + ++i) + { + size_t pos = i->find(':'); + if (pos == string::npos) + throw runtime_error("couldn't parse host"); + + string host = i->substr(0, pos); + string port = i->substr(pos+1); + + _router->AddHost(host, port); + } +} + +typedef tuple, + shared_ptr, + shared_ptr > rpc_put_state_tuple_t; + +void cleanupPut(rpc_put_state_tuple_t t, TaskNotification *tn) +{ + tn->completeTask(t.get<2>()->result() == kvrpc::SUCCESS); +} + +bool +KeyValueClient::Put(const string& key, + const string& value) +{ + TaskNotification tn; + + this->Put(key, value, tn); + + tn.waitForComplete(); + + return tn.failCount() == 0; +} + +bool +KeyValueClient::Put(const string& key, + const string& value, + TaskNotification &tn) +{ + tn.registerTask(); + + shared_ptr ctrl(new ProtoBufRpcController()); + shared_ptr< ::kvrpc::Put> put(new ::kvrpc::Put()); + + put->set_key(key); + put->set_value(value); + + shared_ptr reply(new PutReply()); + + + _router->Route(key)->PutValue(ctrl.get(), put.get(), reply.get(), + NewCallback(&cleanupPut, + rpc_put_state_tuple_t(ctrl,put,reply), + &tn)); + + return true; +} + +typedef tuple, + shared_ptr, + shared_ptr, + string*> rpc_get_state_tuple_t; + +void cleanupGet(rpc_get_state_tuple_t t, TaskNotification *tn) +{ + bool result = t.get<2>()->result() == kvrpc::SUCCESS; + + if (result) + { + string* result_ptr = t.get<3>(); + *result_ptr = t.get<2>()->value(); + } + + tn->completeTask(result); +} + +bool +KeyValueClient::Get(const string& key, string* value) +{ + TaskNotification tn; + + this->Get(key, value, tn); + + tn.waitForComplete(); + + return tn.failCount() == 0; +} + +bool +KeyValueClient::Get(const string& key, string* value, TaskNotification &tn) +{ + tn.registerTask(); + + shared_ptr ctrl(new ProtoBufRpcController()); + shared_ptr< ::kvrpc::Get> get(new ::kvrpc::Get()); + + get->set_key(key); + + shared_ptr reply(new GetReply()); + + _router->Route(key)->GetValue(ctrl.get(), get.get(), reply.get(), + NewCallback(&cleanupGet, + rpc_get_state_tuple_t(ctrl, + get, + reply, + value), + &tn)); + + return true; +} + +} diff --git a/kvstore/kvclient.h b/kvstore/kvclient.h new file mode 100644 index 0000000..dfe86d2 --- /dev/null +++ b/kvstore/kvclient.h @@ -0,0 +1,40 @@ +#ifndef _KVCLIENT_H_ +#define _KVCLIENT_H_ 1 + +#include "kvstore.pb.h" +#include "workqueue.h" +#include "protobufrpc.h" + +using namespace bicker; + +namespace kvstore +{ + class KeyValueClientRouter; + + class KeyValueClient + { + public: + KeyValueClient(const string& host, + const string& port); + + KeyValueClient(const list &hosts); + + bool Put(const string& key, + const string& value); + + bool Put(const string& key, + const string& value, + TaskNotification &tn); + + bool Get(const string& key, string* value); + + bool Get(const string& key, + string* value, + TaskNotification &tn); + + private: + shared_ptr _router; + }; +} // namespace kvstore + +#endif diff --git a/kvstore/kvservice.cc b/kvstore/kvservice.cc new file mode 100644 index 0000000..e241bc9 --- /dev/null +++ b/kvstore/kvservice.cc @@ -0,0 +1,55 @@ +#include "kvservice.h" +#include + +using namespace std; + +namespace kvstore +{ + +KeyValueRpcService::KeyValueRpcService(Backend *backend) + :_backend(backend) +{ +} + +KeyValueRpcService::~KeyValueRpcService() +{ +} + +void KeyValueRpcService::PutValue( + ::google::protobuf::RpcController* /*controller*/, + const ::kvrpc::Put* request, + ::kvrpc::PutReply* response, + ::google::protobuf::Closure* done) +{ + if (_backend->Put(request->key(), request->value())) + { + response->set_result(kvrpc::SUCCESS); + } + else + { + response->set_result(kvrpc::FAILURE); + } + + done->Run(); +} + +void KeyValueRpcService::GetValue( + ::google::protobuf::RpcController* /*controller*/, + const ::kvrpc::Get* request, + ::kvrpc::GetReply* response, + ::google::protobuf::Closure* done) +{ + string value; + if (_backend->Get(request->key(), &value)) + { + response->set_result(kvrpc::SUCCESS); + response->set_value(value); + } + else + { + response->set_result(kvrpc::FAILURE); + } + done->Run(); +} + +}; // namespace kvstore diff --git a/kvstore/kvservice.h b/kvstore/kvservice.h new file mode 100644 index 0000000..68008ed --- /dev/null +++ b/kvstore/kvservice.h @@ -0,0 +1,34 @@ +#ifndef _KVSERVICE_H_ +#define _KVSERVICE_H_ 1 + +#include "kvstore.pb.h" +#include "backend.h" + +#include + +using std::auto_ptr; + +namespace kvstore +{ + class KeyValueRpcService : public ::kvrpc::KeyValueService + { + public: + KeyValueRpcService(Backend *backend); + + virtual ~KeyValueRpcService(); + + virtual void PutValue(::google::protobuf::RpcController* controller, + const ::kvrpc::Put* request, + ::kvrpc::PutReply* response, + ::google::protobuf::Closure* done); + virtual void GetValue(::google::protobuf::RpcController* controller, + const ::kvrpc::Get* request, + ::kvrpc::GetReply* response, + ::google::protobuf::Closure* done); + private: + auto_ptr _backend; + + }; +} // namespace kvstore + +#endif diff --git a/kvstore/kvstore.cc b/kvstore/kvstore.cc new file mode 100644 index 0000000..8d8fd12 --- /dev/null +++ b/kvstore/kvstore.cc @@ -0,0 +1,137 @@ +#include +#include +#include +#include +#include +#include +#include "kvservice.h" +#include "protobufrpc.h" + +using namespace bicker; +using namespace boost; +using namespace kvstore; +using namespace std; + +namespace po = boost::program_options; + +int interrupted_count = 0; +ProtoBufRpcServer rpc_server; + +void +interrupted(int /*signal*/) +{ + if (interrupted_count < 1) + { + rpc_server.shutdown(); + } + else + { + exit(1); + } + ++interrupted_count; +} + +int +main( + int argc, + char **argv + ) +{ + shared_ptr server_thread; + + /* Initialize Exception Handling */ + struct sigaction action; + + action.sa_handler = &interrupted; + action.sa_flags = SA_SIGINFO; + sigemptyset(&action.sa_mask); + + if (sigaction(SIGINT, &action, NULL) == -1) + { + perror("sigaction"); + exit(1); + } + + /* Parse Options */ + string opt_bdb_home; + uint16_t opt_port; + uint16_t opt_sleep; + + po::options_description options("Options"); + + options.add_options() + ("help,h", "display help message") + ("bdb-home,d", + po::value< vector >(), + "bdb home directories") + ("port", + po::value(&opt_port)->default_value(9090), + "listen port") + ("foreground,f", "don't fork to the background") + ("sleep,s", po::value(&opt_sleep)->default_value(0), "sleep then exit") + ; + + po::variables_map vm; + po::store(po::parse_command_line(argc, argv, options), vm); + po::notify(vm); + + if (vm.count("help")) + { + cerr << options << endl; + return 1; + } + + + vector paths; + + if (vm.count("bdb-home") > 0) + { + paths = vm["bdb-home"].as< vector >(); + } + else + { + paths.push_back("./"); + } + + + shared_ptr service(new KeyValueRpcService(new BDBBackend(paths))); + + rpc_server.registerService(opt_port, service); + + server_thread.reset( + new thread( + boost::bind( + &ProtoBufRpcServer::run, + &rpc_server))); + + if (!vm.count("foreground")) + { + // Daemonize + if (daemon(0,0) != 0) + { + perror("daemonizing"); + exit(1); + } + } + + if (opt_sleep == 0) + { + server_thread->join(); + } + else + { + time_t t0 = time(NULL); + time_t t1; + + do + { + t1 = time(NULL); + sleep(opt_sleep - (t1 - t0)); + } while (t1 - t0 < opt_sleep); + } + rpc_server.shutdown(); + + return 0; +} + + diff --git a/kvstore/kvstore.proto b/kvstore/kvstore.proto new file mode 100644 index 0000000..1479cfd --- /dev/null +++ b/kvstore/kvstore.proto @@ -0,0 +1,35 @@ +package kvrpc; + +enum Result +{ + SUCCESS = 0; + FAILURE = 1; +} + +message Put +{ + required string key = 1; + required string value = 2; +} + +message PutReply +{ + required Result result = 1; +} + +message Get +{ + required string key = 1; +} + +message GetReply +{ + required Result result = 1; + optional string value = 2; +} + +service KeyValueService +{ + rpc PutValue(Put) returns(PutReply); + rpc GetValue(Get) returns(GetReply); +} diff --git a/kvstore/kvstore.spec b/kvstore/kvstore.spec new file mode 100644 index 0000000..18bae94 --- /dev/null +++ b/kvstore/kvstore.spec @@ -0,0 +1,33 @@ +Name: kvstore +Version: 0.1 +Release: 1 +Summary: A Simple kv store application +License: BSD +Group:System Environment/Base +Source0: http://salud.ucsd.edu/~jcm/scaling/kvstore-0.1.tar.gz +BuildRoot: %{_tmppath}/%{name}%{version}%{release}root%(%{__id_u} jcm) +BuildArch: x86_64 +Requires: boost protobuf db4 +BuildRequires: boost-devel gtest-devel protobuf-devel db4-devel + +%description + +Key Value Store with simple backend and google protocol buffer rpc. + +%prep +%setup -q + +%build +scons %{?_smp_mflags} + +%install +rm -rf %{buildroot} +mkdir -p %{buildroot}%{_bindir} +scons --prefix=%{buildroot} install + +%clean +scons -c + +%files +%defattr(-,root,root,-) +%{_bindir}/kvstore diff --git a/kvstore/kvtest.cc b/kvstore/kvtest.cc new file mode 100644 index 0000000..5ac60ea --- /dev/null +++ b/kvstore/kvtest.cc @@ -0,0 +1,104 @@ +#include +#include +#include "workqueue.h" +#include "kvservice.h" +#include "kvclient.h" +#include "protobufrpc.h" + +using namespace boost; +using namespace kvstore; +using namespace bicker; +using namespace kvrpc; + +/* http://code.google.com/p/googletest/wiki/GoogleTestPrimer */ + +class KVTest : public ::testing::Test +{ +public: + KVTest() + :_kv_client("127.0.0.1", "9090") + { + vector data_dirs; + data_dirs.push_back("/tmp"); + + shared_ptr service(new KeyValueRpcService(new BDBBackend(data_dirs))); + _rpc_server.registerService(9090, service); + + _server_thread.reset( + new thread( + boost::bind( + &ProtoBufRpcServer::run, + &_rpc_server))); + + } + + virtual ~KVTest() + { + _rpc_server.shutdown(); + _server_thread->join(); + } +protected: + + ProtoBufRpcServer _rpc_server; + KeyValueClient _kv_client; + shared_ptr _server_thread; +}; + + +TEST_F(KVTest, PutTest) +{ + ASSERT_TRUE(_kv_client.Put("test", "value")); + string value; + ASSERT_TRUE(_kv_client.Get("test", &value)); + ASSERT_EQ(value, "value"); +} + +TEST_F(KVTest, MedPutTest) +{ + string test_value(1024*10, '6'); + ASSERT_TRUE(_kv_client.Put("test", test_value)); + string value; + ASSERT_TRUE(_kv_client.Get("test", &value)); + ASSERT_TRUE(value == test_value); +} + +TEST_F(KVTest, BigPutTest) +{ + string test_value(1024*1024*10, '6'); + ASSERT_TRUE(_kv_client.Put("test", test_value)); + string value; + ASSERT_TRUE(_kv_client.Get("test", &value)); + ASSERT_EQ(value.size(), test_value.size()); + ASSERT_TRUE(value == test_value); +} + +TEST_F(KVTest, LoopPutTest) +{ + string test_value(1024*1024, '6'); + + for (int i = 0; i < 10; ++i) + { + ostringstream key; + key << "test" << i; + ASSERT_TRUE(_kv_client.Put(key.str(), test_value)); + string value; + ASSERT_TRUE(_kv_client.Get(key.str(), &value)); + ASSERT_TRUE(value == test_value); + } +} + +TEST_F(KVTest, EmptyTest) +{ + string value; + ASSERT_FALSE(_kv_client.Get("test", &value)); +} + +int +main( + int argc, + char **argv + ) +{ + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/kvstore/protobufrpc/SConscript b/kvstore/protobufrpc/SConscript new file mode 100644 index 0000000..b972f84 --- /dev/null +++ b/kvstore/protobufrpc/SConscript @@ -0,0 +1,7 @@ +Import('env') + +base_files = ['protobufrpc.cc', 'socket_pool.cc', 'workqueue.cc'] + +protobufrpc = env.StaticLibrary('protobufrpc', base_files) + +Return('protobufrpc') diff --git a/kvstore/protobufrpc/SConstruct b/kvstore/protobufrpc/SConstruct new file mode 100644 index 0000000..79fbb68 --- /dev/null +++ b/kvstore/protobufrpc/SConstruct @@ -0,0 +1,29 @@ +import os + +AddOption('--prefix', + dest='prefix', + type='string', + nargs=1, + action='store', + metavar='DIR', + help='installation prefix') + +env = Environment(CXXFLAGS='-g -fPIC', PREFIX=GetOption('prefix')) + +for envvar in ('HOME', 'DISTCC_DIR', 'DISTCC_HOSTS', 'CCACHE_DIR', + 'INTERCEPTOR_SOCKET', 'ENFORGE_DIGEST_CACHE', + 'ENFORGE_CACHE_HOST', 'ENFORGE_CACHE_PORT'): + if envvar in os.environ: + env['ENV'][envvar] = os.environ[envvar] + +if env['PREFIX'] is not None: + bin_dest = env['PREFIX'] + '/usr/bin' +else: + bin_dest = '/usr/bin' + +base_files = ['protobufrpc.cc', 'socket_pool.cc'] + +if not env.has_key('LIBS'): + env['LIBS'] = [] + +SConscript(['SConscript']) diff --git a/kvstore/protobufrpc/protobufrpc.cc b/kvstore/protobufrpc/protobufrpc.cc new file mode 100644 index 0000000..06226f7 --- /dev/null +++ b/kvstore/protobufrpc/protobufrpc.cc @@ -0,0 +1,628 @@ +#include "protobufrpc.h" + +#include +#include +#include +#include +#include + +#include + +using namespace std; + +using namespace boost; +using asio::buffer; + +namespace bicker +{ + +template +static void* void_write(void* data, T val) +{ + *((T*)data) = val; + return (char*)data + sizeof(T); +} + +class ProtoBufRpcServiceRequest +{ +public: + ProtoBufRpcServiceRequest( + RpcController *ctrl, + const MethodDescriptor* method, + Message *request, + Message *response, + shared_ptr conn + ) + :_ctrl(ctrl), + _method(method), + _request(request), + _response(response), + _conn(conn) + { + } + + ~ProtoBufRpcServiceRequest() + { + + } + + static void run(ProtoBufRpcServiceRequest *req) + { + + req->_conn->writeResponse(req->_response.get()); + + delete req; + } + + shared_ptr _ctrl; + const MethodDescriptor *_method; + shared_ptr _request; + shared_ptr _response; + shared_ptr _conn; +}; + +ProtoBufRpcConnection::ProtoBufRpcConnection(asio::io_service& io_service, + Service *service) +:_socket(io_service), + _strand(io_service), + _service(service), + _state(STATE_NONE) +{ +} + +tcp::socket& ProtoBufRpcConnection::socket() +{ + return _socket; +} + +void ProtoBufRpcConnection::start() +{ + _socket.async_read_some(_buffer.prepare(4096), + _strand.wrap( + boost::bind(&ProtoBufRpcConnection::handle_read, shared_from_this(), + asio::placeholders::error, + asio::placeholders::bytes_transferred))); +} + +void ProtoBufRpcConnection::writeResponse(Message *msg) +{ + int rlen = msg->ByteSize(); + int len = htonl(rlen); + int mlen = sizeof(len) + rlen; + + void * data = asio::buffer_cast(_buffer.prepare(mlen)); + + data = void_write(data, len); + + using google::protobuf::io::ArrayOutputStream; + + ArrayOutputStream as(data, rlen); + + msg->SerializeToZeroCopyStream(&as); + + _buffer.commit(mlen); + + asio::async_write(_socket, + _buffer.data(), + _strand.wrap( + boost::bind(&ProtoBufRpcConnection::handle_write, + shared_from_this(), + asio::placeholders::error, + asio::placeholders::bytes_transferred))); +} + + +void ProtoBufRpcConnection::handle_read(const error_code& e, + std::size_t bytes_transferred) +{ + if (!e) + { + _buffer.commit(bytes_transferred); + + if (_state == STATE_NONE) + { + if (_buffer.size() >= sizeof(_id) + sizeof(_len)) + { + string b( + buffers_begin(_buffer.data()), + buffers_begin(_buffer.data()) + + sizeof(_id) + sizeof(_len) + ); + + _buffer.consume(sizeof(_id) + sizeof(_len)); + + _id = *((int*)b.c_str()); + _id = ntohl(_id); + + _len = *((unsigned int*)(b.c_str() + sizeof(_id))); + _len = ntohl(_len); + + _state = STATE_HAVE_ID_AND_LEN; + } + else + { + start(); + } + } + + if (_state == STATE_HAVE_ID_AND_LEN || _state == STATE_WAITING_FOR_DATA) + { + if (_buffer.size() >= _len) + { + const MethodDescriptor* method = + _service->GetDescriptor()->method(_id); + + Message *req = _service->GetRequestPrototype(method).New(); + Message *resp = _service->GetResponsePrototype(method).New(); + + using google::protobuf::io::ArrayInputStream; + using google::protobuf::io::CodedInputStream; + + const void* data = asio::buffer_cast( + _buffer.data() + ); + ArrayInputStream as(data, _len); + CodedInputStream is(&as); + is.SetTotalBytesLimit(512 * 1024 * 1024, -1); + + if (!req->ParseFromCodedStream(&is)) + { + throw std::runtime_error("ParseFromCodedStream"); + } + + _buffer.consume(_len); + + ProtoBufRpcController *ctrl = new ProtoBufRpcController(); + _service->CallMethod(method, + ctrl, + req, + resp, + NewCallback( + &ProtoBufRpcServiceRequest::run, + new ProtoBufRpcServiceRequest( + ctrl, + method, + req, + resp, + shared_from_this()) + ) + ); + _state = STATE_NONE; + } + else + { + _state = STATE_WAITING_FOR_DATA; + start(); + } + } + + } + else + { + error_code ignored_ec; + _socket.shutdown(tcp::socket::shutdown_both, ignored_ec); + } +} + +void ProtoBufRpcConnection::handle_write(const error_code& e, + std::size_t bytes_transferred) +{ + if (e) + { + error_code ignored_ec; + _socket.shutdown(tcp::socket::shutdown_both, ignored_ec); + } + else + { + _buffer.consume(bytes_transferred); + + if (_buffer.size()) + { + asio::async_write(_socket, + _buffer.data(), + _strand.wrap( + boost::bind(&ProtoBufRpcConnection::handle_write, + shared_from_this(), + asio::placeholders::error, + asio::placeholders::bytes_transferred))); + return; + } + + _state = STATE_NONE; + start(); + } +} + +ProtoBufRpcServer::ProtoBufRpcServer() + :_io_service(new asio::io_service()) +{ +} + +bool ProtoBufRpcServer::registerService(uint16_t port, + shared_ptr service) +{ + // This is not thread safe + + // The RegisteredService Constructor fires up the appropriate + // async accepts for the service + _services.push_back(shared_ptr( + new RegisteredService( + _io_service, + port, + service))); + + return true; +} + +void run_wrapper(asio::io_service *io_service) +{ + struct itimerval itimer; + setitimer(ITIMER_PROF, &itimer, NULL); + + io_service->run(); +} + +void ProtoBufRpcServer::run() +{ + try + { + if (_services.size() == 0) + { + throw std::runtime_error("No services registered for ProtoBufRpcServer"); + } + + size_t nprocs = sysconf(_SC_NPROCESSORS_ONLN); + + vector > threads; + for (size_t i = 0; i < nprocs; ++i) + { + shared_ptr t(new thread( + boost::bind( + //&run_wrapper, + &asio::io_service::run, + _io_service.get()))); + threads.push_back(t); + } + + for (size_t i = 0; i < threads.size(); ++i) + { + threads[i]->join(); + } + } + catch (std::exception &e) + { + std::cerr << "ProtoBufRpcService" << e.what() << std::endl; + } +} + +void ProtoBufRpcServer::shutdown() +{ + _io_service->stop(); +} + +ProtoBufRpcServer::RegisteredService::RegisteredService( + shared_ptr io_service, + uint16_t port, + shared_ptr service + ) +:_io_service(io_service), + _port(port), + _service(service), + _endpoint(tcp::v4(), _port), + _acceptor(*_io_service), + _new_connection(new ProtoBufRpcConnection(*_io_service, _service.get())) +{ + _acceptor.open(_endpoint.protocol()); + _acceptor.set_option(tcp::acceptor::reuse_address(true)); + _acceptor.bind(_endpoint); + _acceptor.listen(); + _acceptor.async_accept(_new_connection->socket(), + boost::bind(&ProtoBufRpcServer::RegisteredService::handle_accept, + this, + asio::placeholders::error)); +} + +void ProtoBufRpcServer::RegisteredService::handle_accept(const error_code& e) +{ + if (!e) + { + _new_connection->start(); + _new_connection.reset(new ProtoBufRpcConnection(*_io_service, _service.get())); + _acceptor.async_accept(_new_connection->socket(), + boost::bind(&ProtoBufRpcServer::RegisteredService::handle_accept, + this, + asio::placeholders::error)); + } + +} + +ProtoBufRpcController::ProtoBufRpcController() +{ +} + +ProtoBufRpcController::~ProtoBufRpcController() +{ +} + +void ProtoBufRpcController::Reset() +{ +} + +bool ProtoBufRpcController::Failed() const +{ + return false; +} + +string ProtoBufRpcController::ErrorText() const +{ + return "No Error"; +} + +void ProtoBufRpcController::StartCancel() +{ +} + +void ProtoBufRpcController::SetFailed(const string &/*reason*/) +{ +} + +bool ProtoBufRpcController::IsCanceled() const +{ + return false; +} + +void ProtoBufRpcController::NotifyOnCancel(Closure * /*callback*/) +{ +} + +class ProtoBufRpcChannel::MethodHandler + : public enable_shared_from_this, + private boost::noncopyable +{ +public: + MethodHandler(auto_ptr socket, + const MethodDescriptor * method, + RpcController * controller, + const Message * request, + Message * response, + Closure * done + ) + :_socket(socket), + _method(method), + _controller(controller), + _request(request), + _response(response), + _done(done) + { + } + + ~MethodHandler() + { + _socket.reset(); + _done->Run(); + } + + static void execute(shared_ptr this_ptr) + { + int index = htonl(this_ptr->_method->index()); + int rlen = this_ptr->_request->ByteSize(); + int len = htonl(rlen); + + int mlen = sizeof(index) + sizeof(len) + rlen; + + void * data = asio::buffer_cast(this_ptr->_buffer.prepare(mlen)); + + data = void_write(data, index); + data = void_write(data, len); + + using google::protobuf::io::ArrayOutputStream; + + ArrayOutputStream as(data, rlen); + + this_ptr->_request->SerializeToZeroCopyStream(&as); + this_ptr->_buffer.commit(mlen); + + (*(this_ptr->_socket))->async_send(this_ptr->_buffer.data(), + boost::bind(&ProtoBufRpcChannel::MethodHandler::handle_write, + this_ptr, + asio::placeholders::error, + asio::placeholders::bytes_transferred)); + } + + static void handle_write(shared_ptr this_ptr, + const error_code& e, + std::size_t bytes_transferred) + { + if (!e) + { + this_ptr->_buffer.consume(bytes_transferred); + + if (this_ptr->_buffer.size()) + { + (*(this_ptr->_socket))->async_send(this_ptr->_buffer.data(), + boost::bind(&ProtoBufRpcChannel::MethodHandler::handle_write, + this_ptr, + asio::placeholders::error, + asio::placeholders::bytes_transferred)); + return; + } + + (*(this_ptr->_socket))->async_receive( + buffer(&this_ptr->_len, sizeof(this_ptr->_len)), + boost::bind( + &ProtoBufRpcChannel::MethodHandler::handle_read_len, + this_ptr, + asio::placeholders::error, + asio::placeholders::bytes_transferred) + ); + } + else + { + this_ptr->_controller->SetFailed(e.message()); + (*(this_ptr->_socket))->close(); + } + } + + static void handle_read_len(shared_ptr this_ptr, + const error_code& e, + std::size_t bytes_transferred) + { + if (!e && bytes_transferred == sizeof(this_ptr->_len)) + { + this_ptr->_len = ntohl(this_ptr->_len); + (*(this_ptr->_socket))->async_receive( + this_ptr->_buffer.prepare(this_ptr->_len), + boost::bind( + &ProtoBufRpcChannel::MethodHandler::handle_read_response, + this_ptr, + asio::placeholders::error, + asio::placeholders::bytes_transferred + ) + ); + } + else + { + this_ptr->_controller->SetFailed(e.message()); + (*(this_ptr->_socket))->close(); + } + } + + static void handle_read_response(shared_ptr this_ptr, + const error_code& e, + std::size_t bytes_transferred) + { + if (!e) + { + this_ptr->_buffer.commit(bytes_transferred); + if (this_ptr->_buffer.size() >= this_ptr->_len) + { + using google::protobuf::io::ArrayInputStream; + using google::protobuf::io::CodedInputStream; + + const void* data = asio::buffer_cast( + this_ptr->_buffer.data() + ); + ArrayInputStream as(data, this_ptr->_len); + CodedInputStream is(&as); + is.SetTotalBytesLimit(512 * 1024 * 1024, -1); + + if (!this_ptr->_response->ParseFromCodedStream(&is)) + { + throw std::runtime_error("ParseFromCodedStream"); + } + + this_ptr->_buffer.consume(this_ptr->_len); + } + else + { + (*(this_ptr->_socket))->async_receive( + this_ptr->_buffer.prepare(this_ptr->_len - this_ptr->_buffer.size()), + boost::bind( + &ProtoBufRpcChannel::MethodHandler::handle_read_response, + this_ptr, + asio::placeholders::error, + asio::placeholders::bytes_transferred + ) + ); + return; + } + } + else + { + this_ptr->_controller->SetFailed(e.message()); + (*(this_ptr->_socket))->close(); + } + } + +private: + auto_ptr _socket; + const MethodDescriptor * _method; + RpcController * _controller; + const Message * _request; + Message * _response; + Closure * _done; + asio::streambuf _buffer; + unsigned int _len; + bool _status; + unsigned int _sent; +}; + + +ProtoBufRpcChannel::ProtoBufRpcChannel(const string &remotehost, + const string &port) + :_remote_host(remotehost), _port(port), + _resolver(_io_service), + _acceptor(_io_service), + _pool(2000, _io_service), + _lame_socket(_io_service), + _thread() +// &asio::io_service::run, +// &_io_service))) +{ + + tcp::resolver::query query(_remote_host, _port); + tcp::resolver::iterator endpoint_iterator = _resolver.resolve(query); + tcp::resolver::iterator end; + + error_code error = asio::error::host_not_found; + + if (endpoint_iterator == end) throw syserr::system_error(error); + + _pool.setEndpoint(*endpoint_iterator); + + tcp::endpoint e(tcp::v4(), 0); + _acceptor.open(e.protocol()); + _acceptor.set_option(tcp::acceptor::reuse_address(true)); + _acceptor.bind(e); + _acceptor.listen(); + _acceptor.async_accept(_lame_socket, + boost::bind(&ProtoBufRpcChannel::lame_handle_accept, this, + asio::placeholders::error)); + + _thread = shared_ptr(new thread( + boost::bind( + &asio::io_service::run, + &_io_service))); +} + +void ProtoBufRpcChannel::lame_handle_accept(const error_code &err) +{ + if (!err) + { + _acceptor.async_accept(_lame_socket, + boost::bind(&ProtoBufRpcChannel::lame_handle_accept, + this, + asio::placeholders::error)); + } +} + +ProtoBufRpcChannel::~ProtoBufRpcChannel() +{ + _pool.cancelAndClear(); + + _io_service.stop(); + + _thread->join(); +} + +void ProtoBufRpcChannel::CallMethod( + const MethodDescriptor * method, + RpcController * controller, + const Message * request, + Message * response, + Closure * done) +{ + shared_ptr h( + new MethodHandler( + auto_ptr(new SocketCheckout(&_pool)), + method, + controller, + request, + response, + done + )); + + MethodHandler::execute(h); +} + +} // namespace bicker diff --git a/kvstore/protobufrpc/protobufrpc.h b/kvstore/protobufrpc/protobufrpc.h new file mode 100644 index 0000000..221903d --- /dev/null +++ b/kvstore/protobufrpc/protobufrpc.h @@ -0,0 +1,173 @@ +#ifndef __PROTOBUFRPC_H__ +#define __PROTOBUFRPC_H__ + +#include +#include +#include +#include +#include +#include +#include +#include "socket_pool.h" +#include "util.h" + +#include +#include +#include +#include +#include + +using namespace std; +using namespace google::protobuf; +using namespace boost; + +namespace bicker +{ + +class ProtoBufRpcService; + +class ProtoBufRpcConnection + : public enable_shared_from_this, + private boost::noncopyable +{ +public: + explicit ProtoBufRpcConnection(asio::io_service& io_service, + Service *_service); + + asio::ip::tcp::socket& socket(); + + void start(); + + void writeResponse(Message *msg); + + +private: + void handle_read(const error_code& e, + std::size_t bytes_transferred); + + void handle_write(const error_code& e, + std::size_t bytes_transferred); + + tcp::socket _socket; + + + asio::io_service::strand _strand; + + Service *_service; + + asio::streambuf _buffer; + + int _id; + unsigned int _len; + + enum { + STATE_NONE, + STATE_HAVE_ID_AND_LEN, + STATE_WAITING_FOR_DATA, + STATE_FAIL, + } _state; +}; + + +class ProtoBufRpcServer +{ +public: + ProtoBufRpcServer(); + + bool registerService(uint16_t port, + shared_ptr< Service> service); + + // So we can call this as a thread. + void run(); + + // So we can stop.. + void shutdown(); + +protected: + + class RegisteredService + { + public: + RegisteredService( + shared_ptr io_service, + uint16_t port, + shared_ptr service + ); + + void handle_accept(const error_code& e); + + protected: + // Ref to parent's + shared_ptr _io_service; + uint16_t _port; + shared_ptr _service; + tcp::endpoint _endpoint; + tcp::acceptor _acceptor; + shared_ptr _new_connection; + }; + + list > _services; + shared_ptr _io_service; +}; + +class ProtoBufRpcController : public RpcController +{ +public: + ProtoBufRpcController(); + virtual ~ProtoBufRpcController(); + + virtual void Reset(); + virtual bool Failed() const; + virtual string ErrorText() const; + virtual void StartCancel(); + + virtual void SetFailed(const string &reason); + virtual bool IsCanceled() const; + virtual void NotifyOnCancel(Closure *callback); +}; + +class ProtoBufRpcChannel + : public RpcChannel, + public enable_shared_from_this, + private boost::noncopyable +{ +public: + ProtoBufRpcChannel(const string &remotehost, const string &port); + + virtual ~ProtoBufRpcChannel(); + + virtual void CallMethod( + const MethodDescriptor * method, + RpcController * controller, + const Message * request, + Message * response, + Closure * done); + +protected: + shared_ptr getSocket(); + void putSocket(shared_ptr); + +private: + class MethodHandler; + + string _remote_host; + string _port; + + + asio::io_service _io_service; + tcp::resolver _resolver; + // This exists to keep the io service running + tcp::acceptor _acceptor; + + SocketPool _pool; + + asio::ip::tcp::socket _lame_socket; + void lame_handle_accept(const error_code &err); + + shared_ptr _thread; + +}; + +} // namespace bicker + +#endif diff --git a/kvstore/protobufrpc/socket_pool.cc b/kvstore/protobufrpc/socket_pool.cc new file mode 100644 index 0000000..454da55 --- /dev/null +++ b/kvstore/protobufrpc/socket_pool.cc @@ -0,0 +1,101 @@ +#include "socket_pool.h" + +SocketPool::SocketPool(int max_sockets, + asio::io_service &io_svc + ) +:_issued(0), + _max_sockets(max_sockets), + _io_service(io_svc) +{ +} + +void SocketPool::setEndpoint(const tcp::endpoint &endpoint) +{ + _endpoint = endpoint; +} + +void SocketPool::cancelAndClear() +{ + for (set >::iterator i = _set.begin(); + i != _set.end(); + ++i) + { + (*i)->cancel(); + } + + while (!_queue.empty()) _queue.pop(); + _set.clear(); +} + +shared_ptr SocketPool::getSocket() +{ + mutex::scoped_lock lock(_sockets_lock); + + while (_queue.size() == 0 && _issued >= _max_sockets) + _sockets_non_empty.wait(lock); + + if (_queue.size()) + { + shared_ptr socket = _queue.front(); + _queue.pop(); + + return socket; + } + else + { + ++_issued; + error_code error = asio::error::host_not_found; + + shared_ptr socket(new tcp::socket(_io_service)); + socket->connect(_endpoint, error); + + if (error) throw syserr::system_error(error); + + _set.insert(socket); + + return socket; + } +} + +void SocketPool::putSocket(shared_ptr socket) +{ + mutex::scoped_lock lock(_sockets_lock); + + if (!socket->is_open()) + { + cerr << "socket closed\n"; + --_issued; + _set.erase(socket); + } + else + { + _queue.push(socket); + } + + _sockets_non_empty.notify_one(); +} + +SocketCheckout::SocketCheckout(SocketPool *pool) + :_socket(pool->getSocket()), + _pool(pool) +{ +} + +SocketCheckout::~SocketCheckout() +{ + _pool->putSocket(_socket); +} +tcp::socket& SocketCheckout::operator*() +{ + return *_socket; +} + +tcp::socket* SocketCheckout::operator->() +{ + return _socket.get(); +} + +shared_ptr& SocketCheckout::socket() +{ + return _socket; +} diff --git a/kvstore/protobufrpc/socket_pool.h b/kvstore/protobufrpc/socket_pool.h new file mode 100644 index 0000000..57faf42 --- /dev/null +++ b/kvstore/protobufrpc/socket_pool.h @@ -0,0 +1,52 @@ +#ifndef _SOCKET_POOL_H_ +#define _SOCKET_POOL_H_ 1 + +#include +#include +#include +#include +#include + +#include "util.h" + +using namespace std; +using namespace boost; + + +class SocketPool +{ +public: + SocketPool(int max_streams, + asio::io_service &io_svc); + void setEndpoint(const tcp::endpoint &endpoint); + void cancelAndClear(); + shared_ptr getSocket(); + void putSocket(shared_ptr socket); +private: + int _issued; + int _max_sockets; + mutex _sockets_lock; + condition_variable _sockets_non_empty; + asio::io_service &_io_service; + tcp::endpoint _endpoint; + queue > _queue; + set > _set; +}; + +class SocketCheckout +{ +public: + SocketCheckout(SocketPool *pool); + ~SocketCheckout(); + + tcp::socket& operator*(); + tcp::socket* operator->(); + + shared_ptr& socket(); + +private: + shared_ptr _socket; + SocketPool *_pool; +}; + +#endif diff --git a/kvstore/protobufrpc/util.h b/kvstore/protobufrpc/util.h new file mode 100644 index 0000000..966921f --- /dev/null +++ b/kvstore/protobufrpc/util.h @@ -0,0 +1,37 @@ +#ifndef _UTIL_H_ +#define _UTIL_H_ 1 + +#include + +#if BOOST_VERSION <= 103500 +#include +#include +#include +#include +#include +//typedef boost::condition condition_variable ; +//typedef boost::detail::thread::scoped_lock scoped_lock; +using asio::ip::tcp; +using asio::error_code; +using asio::buffers_begin; +namespace syserr=asio; +#else +#if BOOST_VERSION < 104000 +#include +#include +#include +using boost::asio::ip::tcp; +using boost::system::error_code; +using boost::system::system_error; +namespace syserr=boost::system; +#else +#include +#include +using boost::asio::ip::tcp; +using boost::system::error_code; +using boost::system::system_error; +namespace syserr=boost::system; +#endif +#endif + +#endif diff --git a/kvstore/protobufrpc/workqueue.cc b/kvstore/protobufrpc/workqueue.cc new file mode 100644 index 0000000..27861aa --- /dev/null +++ b/kvstore/protobufrpc/workqueue.cc @@ -0,0 +1,125 @@ +#include "workqueue.h" +#include + +namespace bicker +{ + +WorkQueue::WorkQueue() + :_thread_count(0), _min_threads(1), _max_threads(50), _running(true) +{ + for (int i = 0; i < _min_threads; ++i) + { + spawnThread(); + } +} + +WorkQueue::WorkQueue(int thread_count) + :_thread_count(0), + _min_threads(1), _max_threads(thread_count), _running(true) +{ + for (int i = 0; i < _min_threads; ++i) + { + spawnThread(); + } +} + +WorkQueue::~WorkQueue() +{ + _running = false; + + { + mutex::scoped_lock lock(_queue_lock); + _queue_non_empty.notify_all(); + } + + _threads.join_all(); +} + +void WorkQueue::spawnThread() +{ + ++_thread_count; + _threads.create_thread(Worker(this)); +} + +shared_ptr WorkQueue::get() +{ + mutex::scoped_lock lock(_queue_lock); + + while (_queue.size() == 0) + { + _queue_non_empty.wait(lock); + if (!_running) throw interrupted_error(); + } + + shared_ptr back = _queue.front(); + _queue.pop(); + + if (_queue.size() > 0 && _thread_count < _max_threads) spawnThread(); + + return back; +} + +void WorkQueue::put(shared_ptr work_unit) +{ + mutex::scoped_lock lock(_queue_lock); + + _queue.push(work_unit); + + _queue_non_empty.notify_one(); +} + +WorkQueue::Worker::Worker(WorkQueue* queue) + :_queue(queue) +{ +} + +void WorkQueue::Worker::operator()() +{ + while (true) + { + try + { + shared_ptr unit = _queue->get(); + + unit->run(); + } + catch (interrupted_error) + { + return; + } + } +} + +TaskNotification::TaskNotification() +:_expected(0), _count(0), _fail_count(0) +{ +} + +void TaskNotification::registerTask() +{ + mutex::scoped_lock lock(_lock); + ++_expected; +} + +void TaskNotification::completeTask(bool success) +{ + mutex::scoped_lock lock(_lock); + if (!success) ++_fail_count; + if (++_count == _expected) _cond.notify_all(); +} + +void TaskNotification::waitForComplete() +{ + mutex::scoped_lock lock(_lock); + while (_count < _expected) + { + _cond.wait(lock); + } +} + +bool TaskNotification::failCount() +{ + return _fail_count; +} + +} // namespace bicker diff --git a/kvstore/protobufrpc/workqueue.h b/kvstore/protobufrpc/workqueue.h new file mode 100644 index 0000000..cfb2e02 --- /dev/null +++ b/kvstore/protobufrpc/workqueue.h @@ -0,0 +1,78 @@ +#ifndef __WORKQUEUE_H__ +#define __WORKQUEUE_H__ 1 + +#include +#include +#include +#include "util.h" + +using namespace boost; +using namespace std; + +namespace bicker +{ + struct interrupted_error : public virtual std::exception { }; + + class WorkUnit + { + public: + virtual ~WorkUnit() {}; + virtual void run() = 0; + }; + + class WorkQueue + { + public: + WorkQueue(); + WorkQueue(int thread_count); + + ~WorkQueue(); + + shared_ptr get(); + void put(shared_ptr work_unit); + + protected: + void spawnThread(); + + private: + class Worker + { + public: + Worker(WorkQueue* queue); + void operator()(); + private: + WorkQueue *_queue; + }; + + int _thread_count; + int _min_threads; + int _max_threads; + mutex _queue_lock; + condition_variable _queue_non_empty; + queue > _queue; + thread_group _threads; + volatile bool _running; + }; + + class TaskNotification + { + public: + TaskNotification(); + + void registerTask(); + void completeTask(bool success = true); + + void waitForComplete(); + + bool failCount(); + private: + int _expected; + int _count; + int _fail_count; + mutex _lock; + condition_variable _cond; + }; + +} // namespace bicker + +#endif -- 2.20.1