Try to improve support for no-aggregation mode
[bluesky.git] / kvstore / protobufrpc.h
1 #ifndef __PROTOBUFRPC_H__
2 #define __PROTOBUFRPC_H__
3
4 #include <stdint.h>
5 #include <boost/shared_ptr.hpp>
6 #include <boost/enable_shared_from_this.hpp>
7 #include <boost/thread.hpp>
8 #include <boost/function.hpp>
9 #include <queue>
10 #include <set>
11 #include "socket_pool.h"
12 #include "util.h"
13
14 #include <google/protobuf/stubs/common.h>
15 #include <google/protobuf/generated_message_reflection.h>
16 #include <google/protobuf/repeated_field.h>
17 #include <google/protobuf/extension_set.h>
18 #include <google/protobuf/service.h>
19
20 using namespace std;
21 using namespace google::protobuf;
22 using namespace boost;
23
24 namespace bicker
25 {
26
27 class ProtoBufRpcService;
28
29 class ProtoBufRpcConnection 
30     : public enable_shared_from_this<ProtoBufRpcConnection>,
31       private boost::noncopyable
32 {
33 public:
34     explicit ProtoBufRpcConnection(asio::io_service& io_service,
35                                  Service *_service);
36
37     asio::ip::tcp::socket& socket();
38
39     void start();
40
41     void writeResponse(Message *msg);
42
43
44 private:
45     void handle_read(const error_code& e, 
46                      std::size_t bytes_transferred);
47
48     void handle_write(const error_code& e,
49                      std::size_t bytes_transferred);
50
51     tcp::socket _socket;
52
53
54     asio::io_service::strand _strand;
55
56     Service *_service;
57
58     asio::streambuf _buffer;
59
60     int _id;
61     unsigned int _len;
62
63     enum {
64         STATE_NONE,
65         STATE_HAVE_ID_AND_LEN,
66         STATE_WAITING_FOR_DATA,
67         STATE_FAIL,
68     } _state;
69 };
70
71
72 class ProtoBufRpcServer
73 {
74 public:
75     ProtoBufRpcServer();
76
77     bool registerService(uint16_t port,
78                          shared_ptr< Service> service);
79
80     // So we can call this as a thread.
81     void run();
82
83     // So we can stop..
84     void shutdown();
85
86 protected:
87
88     class RegisteredService
89     {
90     public:
91         RegisteredService(
92                           shared_ptr<asio::io_service> io_service,
93                           uint16_t port,
94                           shared_ptr<Service> service
95                          );
96
97         void handle_accept(const error_code& e);
98
99     protected:
100         // Ref to parent's
101         shared_ptr<asio::io_service> _io_service;
102         uint16_t _port;
103         shared_ptr<Service> _service;
104         tcp::endpoint _endpoint;
105         tcp::acceptor _acceptor;
106         shared_ptr<ProtoBufRpcConnection> _new_connection;
107     };
108
109     list<shared_ptr<RegisteredService> > _services;
110     shared_ptr<asio::io_service> _io_service;
111 };
112
113 class ProtoBufRpcController : public RpcController
114 {
115 public:
116     ProtoBufRpcController();
117     virtual ~ProtoBufRpcController();
118
119     virtual void Reset();
120     virtual bool Failed() const;
121     virtual string ErrorText() const;
122     virtual void StartCancel();
123
124     virtual void SetFailed(const string &reason);
125     virtual bool IsCanceled() const;
126     virtual void NotifyOnCancel(Closure *callback);
127 };
128
129 class ProtoBufRpcChannel
130     : public RpcChannel,
131       public enable_shared_from_this<ProtoBufRpcChannel>,
132       private boost::noncopyable
133 {
134 public:
135     ProtoBufRpcChannel(const string &remotehost, const string &port);
136
137     virtual ~ProtoBufRpcChannel();
138
139     virtual void CallMethod(
140         const MethodDescriptor * method,
141         RpcController * controller,
142         const Message * request,
143         Message * response,
144         Closure * done);
145
146 protected:
147     shared_ptr<tcp::socket> getSocket();
148     void putSocket(shared_ptr<tcp::socket>);
149
150 private:
151     class MethodHandler;
152
153     string _remote_host;
154     string _port;
155
156
157     asio::io_service _io_service;
158     tcp::resolver _resolver;
159     // This exists to keep the io service running
160     tcp::acceptor _acceptor;
161
162     SocketPool _pool;
163
164     asio::ip::tcp::socket _lame_socket;
165     void lame_handle_accept(const error_code &err);
166
167     shared_ptr<boost::thread> _thread;
168
169 };
170
171 } // namespace bicker
172
173 #endif