X-Git-Url: http://git.vrable.net/?a=blobdiff_plain;f=kvstore%2Fsocket_pool.cc;fp=kvstore%2Fsocket_pool.cc;h=454da55479f7c328e215188027a5cc238d06576a;hb=d9c7ab29f5651da244cac393f446a6c5e823fcd1;hp=0000000000000000000000000000000000000000;hpb=3c2cbef21a11c4d86952922f4da7b830a91423f9;p=bluesky.git diff --git a/kvstore/socket_pool.cc b/kvstore/socket_pool.cc new file mode 100644 index 0000000..454da55 --- /dev/null +++ b/kvstore/socket_pool.cc @@ -0,0 +1,101 @@ +#include "socket_pool.h" + +SocketPool::SocketPool(int max_sockets, + asio::io_service &io_svc + ) +:_issued(0), + _max_sockets(max_sockets), + _io_service(io_svc) +{ +} + +void SocketPool::setEndpoint(const tcp::endpoint &endpoint) +{ + _endpoint = endpoint; +} + +void SocketPool::cancelAndClear() +{ + for (set >::iterator i = _set.begin(); + i != _set.end(); + ++i) + { + (*i)->cancel(); + } + + while (!_queue.empty()) _queue.pop(); + _set.clear(); +} + +shared_ptr SocketPool::getSocket() +{ + mutex::scoped_lock lock(_sockets_lock); + + while (_queue.size() == 0 && _issued >= _max_sockets) + _sockets_non_empty.wait(lock); + + if (_queue.size()) + { + shared_ptr socket = _queue.front(); + _queue.pop(); + + return socket; + } + else + { + ++_issued; + error_code error = asio::error::host_not_found; + + shared_ptr socket(new tcp::socket(_io_service)); + socket->connect(_endpoint, error); + + if (error) throw syserr::system_error(error); + + _set.insert(socket); + + return socket; + } +} + +void SocketPool::putSocket(shared_ptr socket) +{ + mutex::scoped_lock lock(_sockets_lock); + + if (!socket->is_open()) + { + cerr << "socket closed\n"; + --_issued; + _set.erase(socket); + } + else + { + _queue.push(socket); + } + + _sockets_non_empty.notify_one(); +} + +SocketCheckout::SocketCheckout(SocketPool *pool) + :_socket(pool->getSocket()), + _pool(pool) +{ +} + +SocketCheckout::~SocketCheckout() +{ + _pool->putSocket(_socket); +} +tcp::socket& SocketCheckout::operator*() +{ + return *_socket; +} + +tcp::socket* SocketCheckout::operator->() +{ + return _socket.get(); +} + +shared_ptr& SocketCheckout::socket() +{ + return _socket; +}