Switch to an explicit BDB operations queue instead of thread pool.
authorMichael Vrable <mvrable@cs.ucsd.edu>
Mon, 12 Jul 2010 19:29:15 +0000 (12:29 -0700)
committerMichael Vrable <mvrable@cs.ucsd.edu>
Mon, 12 Jul 2010 19:29:15 +0000 (12:29 -0700)
Create a dedicated thread for handling BDB operations, and pass gets/puts
to it via a queue.  This is in preparation for batching operations
together in transactions to see if we can improve performance that way.

bluesky/store-bdb.c

index 8565314..57422b9 100644 (file)
 /* A storage layer that writes to Berkeley DB locally. */
 
 typedef struct {
-    GThreadPool *thread_pool;
     DB_ENV *env;
     DB *db;
+    GAsyncQueue *operations;
 } BDBStore;
 
-static void bdbstore_task(gpointer a, gpointer s)
+static gpointer bdbstore_thread(gpointer data)
 {
-    int res;
-    BlueSkyStoreAsync *async = (BlueSkyStoreAsync *)a;
-    BDBStore *store = (BDBStore *)s;
+    BDBStore *store = (BDBStore *)data;
 
-    async->status = ASYNC_RUNNING;
-    async->exec_time = bluesky_now_hires();
+    while (TRUE) {
+        int res;
+        BlueSkyStoreAsync *async;
 
-    DBT key;
-    memset(&key, 0, sizeof(key));
+        async = (BlueSkyStoreAsync *)g_async_queue_pop(store->operations);
+        async->status = ASYNC_RUNNING;
+        async->exec_time = bluesky_now_hires();
 
-    key.data = async->key;
-    key.size = strlen(async->key);
+        DBT key;
+        memset(&key, 0, sizeof(key));
 
-    DBT value;
-    memset(&value, 0, sizeof(value));
+        key.data = async->key;
+        key.size = strlen(async->key);
 
-    if (async->op == STORE_OP_GET) {
-        value.flags = DB_DBT_MALLOC;
+        DBT value;
+        memset(&value, 0, sizeof(value));
 
-        res = store->db->get(store->db, NULL, &key, &value, 0);
+        if (async->op == STORE_OP_GET) {
+            value.flags = DB_DBT_MALLOC;
 
-        async->result = res;
-        async->data = NULL;
+            res = store->db->get(store->db, NULL, &key, &value, 0);
 
-        if (res != 0) {
-            fprintf(stderr, "BDB read failure: %s\n", db_strerror(res));
-        } else {
-            async->data = bluesky_string_new(value.data, value.size);
-            async->result = 0;
-        }
+            async->result = res;
+            async->data = NULL;
+
+            if (res != 0) {
+                fprintf(stderr, "BDB read failure: %s\n", db_strerror(res));
+            } else {
+                async->data = bluesky_string_new(value.data, value.size);
+                async->result = 0;
+            }
+
+        } else if (async->op == STORE_OP_PUT) {
+            value.data = async->data->data;
+            value.size = async->data->len;
 
-    } else if (async->op == STORE_OP_PUT) {
-        value.data = async->data->data;
-        value.size = async->data->len;
+            res = store->db->put(store->db, NULL, &key, &value, 0);
 
-        res = store->db->put(store->db, NULL, &key, &value, 0);
+            if (res != 0) {
+                fprintf(stderr, "BDB write failure: %s\n", db_strerror(res));
+            }
 
-        if (res != 0) {
-            fprintf(stderr, "BDB write failure: %s\n", db_strerror(res));
+            async->result = 0;
         }
 
-        async->result = 0;
+        bluesky_store_async_mark_complete(async);
+        bluesky_store_async_unref(async);
     }
 
-    bluesky_store_async_mark_complete(async);
-    bluesky_store_async_unref(async);
+    return NULL;
 }
 
 static gpointer bdbstore_new(const gchar *path)
 {
     int res;
     BDBStore *store = g_new0(BDBStore, 1);
-    store->thread_pool = g_thread_pool_new(bdbstore_task, store, 16, FALSE,
-                                           NULL);
 
     res = db_env_create(&store->env, 0);
 
@@ -121,6 +125,12 @@ static gpointer bdbstore_new(const gchar *path)
                 db_strerror(res));
     }
 
+    store->operations = g_async_queue_new();
+    if (g_thread_create(bdbstore_thread, store, FALSE, NULL) == NULL) {
+        fprintf(stderr, "Creating BDB thread failed!\n");
+        return NULL;
+    }
+
     return store;
 }
 
@@ -150,7 +160,7 @@ static void bdbstore_submit(gpointer s, BlueSkyStoreAsync *async)
     case STORE_OP_PUT:
         async->status = ASYNC_PENDING;
         bluesky_store_async_ref(async);
-        g_thread_pool_push(store->thread_pool, async, NULL);
+        g_async_queue_push(store->operations, async);
         break;
 
     default: