#include "bluesky-private.h"
+#define MAX_IDLE_CONNECTIONS 8
+
typedef struct {
GThreadPool *thread_pool;
struct sockaddr_in server_addr;
+
+ /* A pool of open file connections to the server which are not currently in
+ * use. */
+ GQueue *fd_pool;
+ GMutex *fd_pool_lock;
} SimpleStore;
+static int get_connection(SimpleStore *store)
+{
+ int fd = -1;
+
+ g_mutex_lock(store->fd_pool_lock);
+ if (!g_queue_is_empty(store->fd_pool)) {
+ fd = GPOINTER_TO_INT(g_queue_pop_head(store->fd_pool));
+ }
+ g_mutex_unlock(store->fd_pool_lock);
+ if (fd != -1)
+ return fd;
+
+ fd = socket(PF_INET, SOCK_STREAM, 0);
+ if (fd < 0) {
+ g_warning("Error creating simplestore socket: %m");
+ return -1;
+ }
+
+ if (connect(fd, (struct sockaddr *)&store->server_addr,
+ sizeof(store->server_addr)) < 0) {
+ g_warning("Error connecting to simplestore server: %m");
+ return -1;
+ }
+
+ return fd;
+}
+
+static void put_connection(SimpleStore *store, int fd)
+{
+ g_mutex_lock(store->fd_pool_lock);
+ g_queue_push_head(store->fd_pool, GINT_TO_POINTER(fd));
+ while (g_queue_get_length(store->fd_pool) > MAX_IDLE_CONNECTIONS) {
+ fd = GPOINTER_TO_INT(g_queue_pop_tail(store->fd_pool));
+ close(fd);
+ }
+ g_mutex_unlock(store->fd_pool_lock);
+}
+
static gboolean write_data(int fd, const char *buf, size_t len)
{
while (len > 0) {
async->status = ASYNC_RUNNING;
- int fd = socket(PF_INET, SOCK_STREAM, 0);
+ int fd = get_connection(server);
if (fd < 0) {
- g_warning("Error creating simplestore socket: %m");
- bluesky_store_async_mark_complete(async);
- bluesky_store_async_unref(async);
- return;
- }
-
- if (connect(fd, (struct sockaddr *)&server->server_addr,
- sizeof(server->server_addr)) < 0) {
- g_warning("Error connecting to simplestore server: %m");
bluesky_store_async_mark_complete(async);
bluesky_store_async_unref(async);
return;
break;
}
+ int success = (async->result == 0);
bluesky_store_async_mark_complete(async);
bluesky_store_async_unref(async);
- close(fd);
+ if (success) {
+ put_connection(server, fd);
+ } else {
+ close(fd);
+ }
}
static char *simplestore_lookup_last(gpointer s, const char *prefix)
}
}
+ store->fd_pool = g_queue_new();
+ store->fd_pool_lock = g_mutex_new();
+
store->thread_pool = g_thread_pool_new(simplestore_task, NULL,
bluesky_max_threads, FALSE, NULL);