Make kvstore backend in BlueSky asynchronous.
[bluesky.git] / bluesky / store-kv.cc
index 7a6ac71..c1b0524 100644 (file)
@@ -21,24 +21,14 @@ using namespace boost;
 using namespace kvstore;
 using namespace std;
 
-static gpointer kvstore_new()
-{
-    KeyValueClient *client = new KeyValueClient("127.0.0.1", "9090");
-    return client;
-}
-
-static void kvstore_destroy(gpointer store)
-{
-    KeyValueClient *client = (KeyValueClient *)store;
-    delete client;
-}
+static GThreadPool *thread_pool = NULL;
 
-static void kvstore_submit(gpointer store, BlueSkyStoreAsync *async)
+static void kvstore_task(gpointer a, gpointer b)
 {
-    KeyValueClient *client = (KeyValueClient *)store;
+    BlueSkyStoreAsync *async = (BlueSkyStoreAsync *)a;
+    KeyValueClient *client = (KeyValueClient *)bluesky_store_async_get_handle(async);
 
-    g_return_if_fail(async->status == ASYNC_NEW);
-    g_return_if_fail(async->op != STORE_OP_NONE);
+    async->status = ASYNC_RUNNING;
 
     switch (async->op) {
     case STORE_OP_GET:
@@ -61,11 +51,51 @@ static void kvstore_submit(gpointer store, BlueSkyStoreAsync *async)
     }
 
     default:
-        g_warning("Uknown operation type for MemStore: %d\n", async->op);
-        return;
+        break;
     }
 
     bluesky_store_async_mark_complete(async);
+    bluesky_store_async_unref(async);
+}
+
+static gpointer kvstore_new()
+{
+    static volatile gsize once = 0;
+    if (g_once_init_enter(&once)) {
+        thread_pool = g_thread_pool_new(kvstore_task, NULL, -1, FALSE, NULL);
+        g_once_init_leave(&once, 1);
+    }
+
+    KeyValueClient *client = new KeyValueClient("127.0.0.1", "9090");
+    return client;
+}
+
+static void kvstore_destroy(gpointer store)
+{
+    KeyValueClient *client = (KeyValueClient *)store;
+    delete client;
+}
+
+static void kvstore_submit(gpointer store, BlueSkyStoreAsync *async)
+{
+    KeyValueClient *client = (KeyValueClient *)store;
+
+    g_return_if_fail(async->status == ASYNC_NEW);
+    g_return_if_fail(async->op != STORE_OP_NONE);
+
+    switch (async->op) {
+    case STORE_OP_GET:
+    case STORE_OP_PUT:
+        async->status = ASYNC_PENDING;
+        bluesky_store_async_ref(async);
+        g_thread_pool_push(thread_pool, async, NULL);
+        break;
+
+    default:
+        g_warning("Uknown operation type for kvstore: %d\n", async->op);
+        bluesky_store_async_mark_complete(async);
+        break;
+    }
 }
 
 static void kvstore_cleanup(gpointer store, BlueSkyStoreAsync *async)