From ee0530e785f7035890b3b54cb3e0cf6800e6a4dd Mon Sep 17 00:00:00 2001 From: Michael Vrable Date: Mon, 12 Jul 2010 12:29:15 -0700 Subject: [PATCH] Switch to an explicit BDB operations queue instead of thread pool. Create a dedicated thread for handling BDB operations, and pass gets/puts to it via a queue. This is in preparation for batching operations together in transactions to see if we can improve performance that way. --- bluesky/store-bdb.c | 82 +++++++++++++++++++++++++-------------------- 1 file changed, 46 insertions(+), 36 deletions(-) diff --git a/bluesky/store-bdb.c b/bluesky/store-bdb.c index 8565314..57422b9 100644 --- a/bluesky/store-bdb.c +++ b/bluesky/store-bdb.c @@ -19,67 +19,71 @@ /* A storage layer that writes to Berkeley DB locally. */ typedef struct { - GThreadPool *thread_pool; DB_ENV *env; DB *db; + GAsyncQueue *operations; } BDBStore; -static void bdbstore_task(gpointer a, gpointer s) +static gpointer bdbstore_thread(gpointer data) { - int res; - BlueSkyStoreAsync *async = (BlueSkyStoreAsync *)a; - BDBStore *store = (BDBStore *)s; + BDBStore *store = (BDBStore *)data; - async->status = ASYNC_RUNNING; - async->exec_time = bluesky_now_hires(); + while (TRUE) { + int res; + BlueSkyStoreAsync *async; - DBT key; - memset(&key, 0, sizeof(key)); + async = (BlueSkyStoreAsync *)g_async_queue_pop(store->operations); + async->status = ASYNC_RUNNING; + async->exec_time = bluesky_now_hires(); - key.data = async->key; - key.size = strlen(async->key); + DBT key; + memset(&key, 0, sizeof(key)); - DBT value; - memset(&value, 0, sizeof(value)); + key.data = async->key; + key.size = strlen(async->key); - if (async->op == STORE_OP_GET) { - value.flags = DB_DBT_MALLOC; + DBT value; + memset(&value, 0, sizeof(value)); - res = store->db->get(store->db, NULL, &key, &value, 0); + if (async->op == STORE_OP_GET) { + value.flags = DB_DBT_MALLOC; - async->result = res; - async->data = NULL; + res = store->db->get(store->db, NULL, &key, &value, 0); - if (res != 0) { - fprintf(stderr, "BDB read failure: %s\n", db_strerror(res)); - } else { - async->data = bluesky_string_new(value.data, value.size); - async->result = 0; - } + async->result = res; + async->data = NULL; + + if (res != 0) { + fprintf(stderr, "BDB read failure: %s\n", db_strerror(res)); + } else { + async->data = bluesky_string_new(value.data, value.size); + async->result = 0; + } + + } else if (async->op == STORE_OP_PUT) { + value.data = async->data->data; + value.size = async->data->len; - } else if (async->op == STORE_OP_PUT) { - value.data = async->data->data; - value.size = async->data->len; + res = store->db->put(store->db, NULL, &key, &value, 0); - res = store->db->put(store->db, NULL, &key, &value, 0); + if (res != 0) { + fprintf(stderr, "BDB write failure: %s\n", db_strerror(res)); + } - if (res != 0) { - fprintf(stderr, "BDB write failure: %s\n", db_strerror(res)); + async->result = 0; } - async->result = 0; + bluesky_store_async_mark_complete(async); + bluesky_store_async_unref(async); } - bluesky_store_async_mark_complete(async); - bluesky_store_async_unref(async); + return NULL; } static gpointer bdbstore_new(const gchar *path) { int res; BDBStore *store = g_new0(BDBStore, 1); - store->thread_pool = g_thread_pool_new(bdbstore_task, store, 16, FALSE, - NULL); res = db_env_create(&store->env, 0); @@ -121,6 +125,12 @@ static gpointer bdbstore_new(const gchar *path) db_strerror(res)); } + store->operations = g_async_queue_new(); + if (g_thread_create(bdbstore_thread, store, FALSE, NULL) == NULL) { + fprintf(stderr, "Creating BDB thread failed!\n"); + return NULL; + } + return store; } @@ -150,7 +160,7 @@ static void bdbstore_submit(gpointer s, BlueSkyStoreAsync *async) case STORE_OP_PUT: async->status = ASYNC_PENDING; bluesky_store_async_ref(async); - g_thread_pool_push(store->thread_pool, async, NULL); + g_async_queue_push(store->operations, async); break; default: -- 2.20.1