X-Git-Url: http://git.vrable.net/?a=blobdiff_plain;f=kvstore%2Fkvclient.cc;fp=kvstore%2Fkvclient.cc;h=954577032172f956c9ef74165da5cf63f12b6bce;hb=3c2cbef21a11c4d86952922f4da7b830a91423f9;hp=0000000000000000000000000000000000000000;hpb=db0d4c10ea7abfa2546f73e96784ebf554342977;p=bluesky.git diff --git a/kvstore/kvclient.cc b/kvstore/kvclient.cc new file mode 100644 index 0000000..9545770 --- /dev/null +++ b/kvstore/kvclient.cc @@ -0,0 +1,166 @@ +#include "kvclient.h" +#include +#include + +using namespace boost; +using namespace kvrpc; + +namespace kvstore +{ + +class KeyValueClientRouter +{ +public: + KeyValueClientRouter() + { + } + + void AddHost(const string &host, const string &port) + { + mutex::scoped_lock lock(_lock); + + _channels.push_back(shared_ptr(new ProtoBufRpcChannel(host, port))); + _clients.push_back(shared_ptr(new KeyValueService_Stub(static_cast(_channels.back().get())))); + } + + kvrpc::KeyValueService_Stub* Route(const string &key) + { + static hash hasher; + + uint32_t hash = (uint32_t)hasher(key); + int id = hash % _clients.size(); + + return _clients[id].get(); + } + +private: + mutex _lock; + vector< shared_ptr > _channels; + vector< shared_ptr > _clients; +}; + +KeyValueClient::KeyValueClient(const string& host, + const string& port) +:_router(new KeyValueClientRouter()) +{ + _router->AddHost(host, port); +} + +KeyValueClient::KeyValueClient(const list &hosts) +:_router(new KeyValueClientRouter()) +{ + for (list::const_iterator i = hosts.begin(); + i != hosts.end(); + ++i) + { + size_t pos = i->find(':'); + if (pos == string::npos) + throw runtime_error("couldn't parse host"); + + string host = i->substr(0, pos); + string port = i->substr(pos+1); + + _router->AddHost(host, port); + } +} + +typedef tuple, + shared_ptr, + shared_ptr > rpc_put_state_tuple_t; + +void cleanupPut(rpc_put_state_tuple_t t, TaskNotification *tn) +{ + tn->completeTask(t.get<2>()->result() == kvrpc::SUCCESS); +} + +bool +KeyValueClient::Put(const string& key, + const string& value) +{ + TaskNotification tn; + + this->Put(key, value, tn); + + tn.waitForComplete(); + + return tn.failCount() == 0; +} + +bool +KeyValueClient::Put(const string& key, + const string& value, + TaskNotification &tn) +{ + tn.registerTask(); + + shared_ptr ctrl(new ProtoBufRpcController()); + shared_ptr< ::kvrpc::Put> put(new ::kvrpc::Put()); + + put->set_key(key); + put->set_value(value); + + shared_ptr reply(new PutReply()); + + + _router->Route(key)->PutValue(ctrl.get(), put.get(), reply.get(), + NewCallback(&cleanupPut, + rpc_put_state_tuple_t(ctrl,put,reply), + &tn)); + + return true; +} + +typedef tuple, + shared_ptr, + shared_ptr, + string*> rpc_get_state_tuple_t; + +void cleanupGet(rpc_get_state_tuple_t t, TaskNotification *tn) +{ + bool result = t.get<2>()->result() == kvrpc::SUCCESS; + + if (result) + { + string* result_ptr = t.get<3>(); + *result_ptr = t.get<2>()->value(); + } + + tn->completeTask(result); +} + +bool +KeyValueClient::Get(const string& key, string* value) +{ + TaskNotification tn; + + this->Get(key, value, tn); + + tn.waitForComplete(); + + return tn.failCount() == 0; +} + +bool +KeyValueClient::Get(const string& key, string* value, TaskNotification &tn) +{ + tn.registerTask(); + + shared_ptr ctrl(new ProtoBufRpcController()); + shared_ptr< ::kvrpc::Get> get(new ::kvrpc::Get()); + + get->set_key(key); + + shared_ptr reply(new GetReply()); + + _router->Route(key)->GetValue(ctrl.get(), get.get(), reply.get(), + NewCallback(&cleanupGet, + rpc_get_state_tuple_t(ctrl, + get, + reply, + value), + &tn)); + + return true; +} + +}