Add a debug option to dump statistics counters every ten seconds
[bluesky.git] / kvstore / socket_pool.cc
1 #include "socket_pool.h"
2
3 SocketPool::SocketPool(int max_sockets,
4                asio::io_service &io_svc
5               )
6 :_issued(0),
7  _max_sockets(max_sockets),
8  _io_service(io_svc)
9 {
10 }
11
12 void SocketPool::setEndpoint(const tcp::endpoint &endpoint)
13 {
14     _endpoint = endpoint;
15 }
16
17 void SocketPool::cancelAndClear()
18 {
19     for (set<shared_ptr<tcp::socket> >::iterator i = _set.begin();
20          i != _set.end();
21          ++i)
22     {
23         (*i)->cancel();
24     }
25
26     while (!_queue.empty()) _queue.pop();
27     _set.clear();
28 }
29
30 shared_ptr<tcp::socket> SocketPool::getSocket()
31 {
32     mutex::scoped_lock lock(_sockets_lock);
33
34     while (_queue.size() == 0 && _issued >= _max_sockets)
35         _sockets_non_empty.wait(lock);
36
37     if (_queue.size())
38     {
39             shared_ptr<tcp::socket> socket = _queue.front();
40             _queue.pop();
41
42             return socket;
43     }
44     else
45     {
46         ++_issued;
47         error_code error = asio::error::host_not_found;
48
49         shared_ptr<tcp::socket> socket(new tcp::socket(_io_service));
50         socket->connect(_endpoint, error);
51
52         if (error) throw syserr::system_error(error);
53
54         _set.insert(socket);
55
56         return socket;
57     }
58 }
59
60 void SocketPool::putSocket(shared_ptr<tcp::socket> socket)
61 {
62     mutex::scoped_lock lock(_sockets_lock);
63
64     if (!socket->is_open())
65     {
66         cerr << "socket closed\n";
67         --_issued;
68         _set.erase(socket);
69     }
70     else 
71     {
72         _queue.push(socket);
73     }
74
75     _sockets_non_empty.notify_one();
76 }
77
78 SocketCheckout::SocketCheckout(SocketPool *pool)
79     :_socket(pool->getSocket()),
80      _pool(pool)
81 {
82 }
83
84 SocketCheckout::~SocketCheckout()
85 {
86     _pool->putSocket(_socket);
87 }
88 tcp::socket& SocketCheckout::operator*()
89 {
90     return *_socket;
91 }
92
93 tcp::socket* SocketCheckout::operator->()
94 {
95     return _socket.get();
96 }
97
98 shared_ptr<tcp::socket>& SocketCheckout::socket()
99 {
100     return _socket;
101 }