Add proper per-file copyright notices/licenses and top-level license.
[bluesky.git] / kvstore / kvclient.cc
1 #include "kvclient.h"
2 #include <boost/tuple/tuple.hpp>
3 #include <boost/functional/hash.hpp>
4
5 using namespace boost;
6 using namespace kvrpc;
7
8 namespace kvstore
9 {
10
11 class KeyValueClientRouter
12 {
13 public:
14     KeyValueClientRouter()
15     {
16     }
17
18     void AddHost(const string &host, const string &port)
19     {
20         mutex::scoped_lock lock(_lock);
21
22         _channels.push_back(shared_ptr<ProtoBufRpcChannel>(new ProtoBufRpcChannel(host, port)));
23         _clients.push_back(shared_ptr<kvrpc::KeyValueService_Stub>(new KeyValueService_Stub(static_cast<RpcChannel*>(_channels.back().get()))));
24     }
25
26     kvrpc::KeyValueService_Stub* Route(const string &key)
27     {
28         static hash<string> hasher;
29
30         uint32_t hash = (uint32_t)hasher(key);
31         int id = hash % _clients.size();
32
33         return _clients[id].get();
34     }
35
36 private:
37     mutex _lock;
38     vector< shared_ptr<ProtoBufRpcChannel> > _channels;
39     vector< shared_ptr<kvrpc::KeyValueService_Stub> > _clients;
40 };
41
42 KeyValueClient::KeyValueClient(const string& host,
43                                const string& port)
44 :_router(new KeyValueClientRouter())
45 {
46     _router->AddHost(host, port);
47 }
48
49 KeyValueClient::KeyValueClient(const list<string> &hosts)
50 :_router(new KeyValueClientRouter())
51 {
52     for (list<string>::const_iterator i = hosts.begin();
53          i != hosts.end();
54          ++i)
55     {
56         size_t pos = i->find(':');
57         if (pos == string::npos)
58             throw runtime_error("couldn't parse host");
59
60         string host = i->substr(0, pos);
61         string port = i->substr(pos+1);
62
63         _router->AddHost(host, port);
64     }
65 }
66
67 typedef tuple<shared_ptr<RpcController>,
68               shared_ptr<Put>,
69               shared_ptr<PutReply> > rpc_put_state_tuple_t;
70
71 void cleanupPut(rpc_put_state_tuple_t t, TaskNotification *tn)
72 {
73     tn->completeTask(t.get<2>()->result() == kvrpc::SUCCESS);
74 }
75
76 bool
77 KeyValueClient::Put(const string& key,
78          const string& value)
79 {
80     TaskNotification tn;
81
82     this->Put(key, value, tn);
83
84     tn.waitForComplete();
85
86     return tn.failCount() == 0;
87 }
88
89 bool
90 KeyValueClient::Put(const string& key,
91                     const string& value,
92                     TaskNotification &tn)
93 {
94     tn.registerTask();
95
96     shared_ptr<RpcController> ctrl(new ProtoBufRpcController());
97     shared_ptr< ::kvrpc::Put> put(new ::kvrpc::Put());
98
99     put->set_key(key);
100     put->set_value(value);
101
102     shared_ptr<PutReply> reply(new PutReply());
103
104
105     _router->Route(key)->PutValue(ctrl.get(), put.get(), reply.get(),
106                              NewCallback(&cleanupPut,
107                                          rpc_put_state_tuple_t(ctrl,put,reply),
108                                          &tn));
109
110     return true;
111 }
112
113 typedef tuple<shared_ptr<RpcController>,
114               shared_ptr<Get>,
115               shared_ptr<GetReply>,
116               string*> rpc_get_state_tuple_t;
117
118 void cleanupGet(rpc_get_state_tuple_t t, TaskNotification *tn)
119 {
120     bool result = t.get<2>()->result() == kvrpc::SUCCESS;
121
122     if (result)
123     {
124         string* result_ptr = t.get<3>();
125         *result_ptr = t.get<2>()->value();
126     }
127
128     tn->completeTask(result);
129 }
130
131 bool
132 KeyValueClient::Get(const string& key, string* value)
133 {
134     TaskNotification tn;
135
136     this->Get(key, value, tn);
137
138     tn.waitForComplete();
139
140     return tn.failCount() == 0;
141 }
142
143 bool
144 KeyValueClient::Get(const string& key, string* value, TaskNotification &tn)
145 {
146     tn.registerTask();
147
148     shared_ptr<RpcController> ctrl(new ProtoBufRpcController());
149     shared_ptr< ::kvrpc::Get> get(new ::kvrpc::Get());
150
151     get->set_key(key);
152
153     shared_ptr<GetReply> reply(new GetReply());
154
155     _router->Route(key)->GetValue(ctrl.get(), get.get(), reply.get(),
156                              NewCallback(&cleanupGet,
157                                          rpc_get_state_tuple_t(ctrl,
158                                                                get,
159                                                                reply,
160                                                                value),
161                                          &tn));
162
163     return true;
164 }
165
166 }