X-Git-Url: http://git.vrable.net/?a=blobdiff_plain;f=kvstore%2Fprotobufrpc%2Fprotobufrpc.h;fp=kvstore%2Fprotobufrpc%2Fprotobufrpc.h;h=221903de9003875fed21b890e4ff3edc03d89080;hb=3c2cbef21a11c4d86952922f4da7b830a91423f9;hp=0000000000000000000000000000000000000000;hpb=db0d4c10ea7abfa2546f73e96784ebf554342977;p=bluesky.git diff --git a/kvstore/protobufrpc/protobufrpc.h b/kvstore/protobufrpc/protobufrpc.h new file mode 100644 index 0000000..221903d --- /dev/null +++ b/kvstore/protobufrpc/protobufrpc.h @@ -0,0 +1,173 @@ +#ifndef __PROTOBUFRPC_H__ +#define __PROTOBUFRPC_H__ + +#include +#include +#include +#include +#include +#include +#include +#include "socket_pool.h" +#include "util.h" + +#include +#include +#include +#include +#include + +using namespace std; +using namespace google::protobuf; +using namespace boost; + +namespace bicker +{ + +class ProtoBufRpcService; + +class ProtoBufRpcConnection + : public enable_shared_from_this, + private boost::noncopyable +{ +public: + explicit ProtoBufRpcConnection(asio::io_service& io_service, + Service *_service); + + asio::ip::tcp::socket& socket(); + + void start(); + + void writeResponse(Message *msg); + + +private: + void handle_read(const error_code& e, + std::size_t bytes_transferred); + + void handle_write(const error_code& e, + std::size_t bytes_transferred); + + tcp::socket _socket; + + + asio::io_service::strand _strand; + + Service *_service; + + asio::streambuf _buffer; + + int _id; + unsigned int _len; + + enum { + STATE_NONE, + STATE_HAVE_ID_AND_LEN, + STATE_WAITING_FOR_DATA, + STATE_FAIL, + } _state; +}; + + +class ProtoBufRpcServer +{ +public: + ProtoBufRpcServer(); + + bool registerService(uint16_t port, + shared_ptr< Service> service); + + // So we can call this as a thread. + void run(); + + // So we can stop.. + void shutdown(); + +protected: + + class RegisteredService + { + public: + RegisteredService( + shared_ptr io_service, + uint16_t port, + shared_ptr service + ); + + void handle_accept(const error_code& e); + + protected: + // Ref to parent's + shared_ptr _io_service; + uint16_t _port; + shared_ptr _service; + tcp::endpoint _endpoint; + tcp::acceptor _acceptor; + shared_ptr _new_connection; + }; + + list > _services; + shared_ptr _io_service; +}; + +class ProtoBufRpcController : public RpcController +{ +public: + ProtoBufRpcController(); + virtual ~ProtoBufRpcController(); + + virtual void Reset(); + virtual bool Failed() const; + virtual string ErrorText() const; + virtual void StartCancel(); + + virtual void SetFailed(const string &reason); + virtual bool IsCanceled() const; + virtual void NotifyOnCancel(Closure *callback); +}; + +class ProtoBufRpcChannel + : public RpcChannel, + public enable_shared_from_this, + private boost::noncopyable +{ +public: + ProtoBufRpcChannel(const string &remotehost, const string &port); + + virtual ~ProtoBufRpcChannel(); + + virtual void CallMethod( + const MethodDescriptor * method, + RpcController * controller, + const Message * request, + Message * response, + Closure * done); + +protected: + shared_ptr getSocket(); + void putSocket(shared_ptr); + +private: + class MethodHandler; + + string _remote_host; + string _port; + + + asio::io_service _io_service; + tcp::resolver _resolver; + // This exists to keep the io service running + tcp::acceptor _acceptor; + + SocketPool _pool; + + asio::ip::tcp::socket _lame_socket; + void lame_handle_accept(const error_code &err); + + shared_ptr _thread; + +}; + +} // namespace bicker + +#endif