2 #include <boost/tuple/tuple.hpp>
3 #include <boost/functional/hash.hpp>
// KeyValueClientRouter keeps one ProtoBufRpcChannel and one
// kvrpc::KeyValueService_Stub per added host, and selects a stub for a key
// by hashing the key.
11 class KeyValueClientRouter
14 KeyValueClientRouter()
// AddHost: creates a channel for host:port and registers a stub bound to it.
18 void AddHost(const string &host, const string &port)
// Both vectors below are modified while _lock is held; the scoped_lock
// releases it when it goes out of scope.
20 mutex::scoped_lock lock(_lock);
// The channel object is owned by the shared_ptr stored in _channels; the stub
// created on the following line is handed only the raw pointer obtained from
// _channels.back().
22 _channels.push_back(shared_ptr<ProtoBufRpcChannel>(new ProtoBufRpcChannel(host, port)));
23 _clients.push_back(shared_ptr<kvrpc::KeyValueService_Stub>(new KeyValueService_Stub(static_cast<RpcChannel*>(_channels.back().get()))));
// Route: returns the stub at index hash(key) % _clients.size(). The returned
// raw pointer is taken from a shared_ptr that remains stored in _clients.
26 kvrpc::KeyValueService_Stub* Route(const string &key)
// A single hasher instance is shared by all calls to Route.
28 static hash<string> hasher;
// The hash value is cast down to 32 bits before the modulo is taken.
30 uint32_t hash = (uint32_t)hasher(key);
// NOTE(review): if no host has been added, _clients.size() is 0 and this
// modulo is undefined behavior; no guard is visible in this excerpt.
// NOTE(review): no lock on _lock is visible in Route, while AddHost modifies
// _clients under that lock -- confirm whether Route may run concurrently with
// AddHost.
31 int id = hash % _clients.size();
33 return _clients[id].get();
// AddHost appends to both vectors together, so _channels[i] is the channel
// that _clients[i] was constructed with.
38 vector< shared_ptr<ProtoBufRpcChannel> > _channels;
39 vector< shared_ptr<kvrpc::KeyValueService_Stub> > _clients;
// Constructs a client whose newly created router is given a single host.
// (The line declaring the `port` parameter is not visible in this excerpt.)
42 KeyValueClient::KeyValueClient(const string& host,
44 :_router(new KeyValueClientRouter())
// Registers the one host/port pair with the router.
46 _router->AddHost(host, port);
// Constructs a client from a list of "host:port" strings, adding each entry
// to a newly created router.
// Throws runtime_error("couldn't parse host") for an entry without a ':'.
// NOTE(review): with an empty list nothing visible here adds a host, so the
// router would be left without any hosts -- confirm callers never pass an
// empty list.
49 KeyValueClient::KeyValueClient(const list<string> &hosts)
50 :_router(new KeyValueClientRouter())
52 for (list<string>::const_iterator i = hosts.begin();
// Split at the first ':' in the entry.
56 size_t pos = i->find(':');
57 if (pos == string::npos)
58 throw runtime_error("couldn't parse host");
// host is the text before the first ':'; port is everything after it.
60 string host = i->substr(0, pos);
61 string port = i->substr(pos+1);
63 _router->AddHost(host, port);
// State bundle handed to cleanupPut: element 0 is the RPC controller and
// element 2 is the PutReply. The middle element's line is not visible in
// this excerpt; the asynchronous Put constructs the tuple as (ctrl, put, reply).
67 typedef tuple<shared_ptr<RpcController>,
69 shared_ptr<PutReply> > rpc_put_state_tuple_t;
// Callback for a Put RPC. The tuple is taken by value, so this function
// holds its own copies of the shared_ptrs for the duration of the call.
//   t  - Put call state; element 2 is the PutReply.
//   tn - notification object that receives the outcome.
71 void cleanupPut(rpc_put_state_tuple_t t, TaskNotification *tn)
// Reports true to tn when the reply's result equals kvrpc::SUCCESS, false otherwise.
73 tn->completeTask(t.get<2>()->result() == kvrpc::SUCCESS);
// Put overload that returns a success flag: forwards to the overload taking a
// TaskNotification and returns true when that notification reports zero
// failures.
// NOTE(review): the declaration of `tn` and any statement between the call
// and the return are not visible in this excerpt.
77 KeyValueClient::Put(const string& key,
82 this->Put(key, value, tn);
86 return tn.failCount() == 0;
// Put overload that issues the PutValue RPC on the stub the router selects
// for `key`, with cleanupPut as the completion callback.
// (Remaining parameter lines and the tail of the call are not visible in
// this excerpt.)
90 KeyValueClient::Put(const string& key,
// Controller, request and reply are heap-allocated and held by shared_ptr.
96 shared_ptr<RpcController> ctrl(new ProtoBufRpcController());
97 shared_ptr< ::kvrpc::Put> put(new ::kvrpc::Put());
100 put->set_value(value);
102 shared_ptr<PutReply> reply(new PutReply());
// The RPC is given raw pointers; the tuple bound into the callback carries
// copies of the three shared_ptrs.
105 _router->Route(key)->PutValue(ctrl.get(), put.get(), reply.get(),
106 NewCallback(&cleanupPut,
107 rpc_put_state_tuple_t(ctrl,put,reply),
// State bundle handed to cleanupGet: element 0 is the RPC controller,
// element 2 is the GetReply and element 3 is the string that receives the
// fetched value. The second element's line is not visible in this excerpt.
113 typedef tuple<shared_ptr<RpcController>,
115 shared_ptr<GetReply>,
116 string*> rpc_get_state_tuple_t;
// Callback for a Get RPC.
//   t  - Get call state; element 2 is the GetReply, element 3 the output string.
//   tn - notification object that receives the outcome.
118 void cleanupGet(rpc_get_state_tuple_t t, TaskNotification *tn)
// True when the reply's result equals kvrpc::SUCCESS.
120 bool result = t.get<2>()->result() == kvrpc::SUCCESS;
// Copies the reply's value into the string pointed to by tuple element 3.
// NOTE(review): whether this copy is guarded by `result` or by a null check
// on result_ptr is not visible in this excerpt.
124 string* result_ptr = t.get<3>();
125 *result_ptr = t.get<2>()->value();
// Reports the outcome to the notification object.
128 tn->completeTask(result);
// Blocking Get: starts the Get through the overload taking a TaskNotification,
// waits on that notification, and returns true when it reports zero failures.
//   key   - key to look up.
//   value - forwarded to the other overload as the output string pointer.
// (The declaration of `tn` is not visible in this excerpt.)
132 KeyValueClient::Get(const string& key, string* value)
136 this->Get(key, value, tn);
// Blocks until the notification reports completion.
138 tn.waitForComplete();
140 return tn.failCount() == 0;
144 KeyValueClient::Get(const string& key, string* value, TaskNotification &tn)
148 shared_ptr<RpcController> ctrl(new ProtoBufRpcController());
149 shared_ptr< ::kvrpc::Get> get(new ::kvrpc::Get());
153 shared_ptr<GetReply> reply(new GetReply());
155 _router->Route(key)->GetValue(ctrl.get(), get.get(), reply.get(),
156 NewCallback(&cleanupGet,
157 rpc_get_state_tuple_t(ctrl,