X-Git-Url: http://git.vrable.net/?a=blobdiff_plain;f=bluesky%2Fstore-bdb.c;h=cd33face07268fa85e89cd3ecd2a5d6faf650403;hb=388030970805a70cb4fad34ade5e3de7a3607a57;hp=8565314376886ea947d0dae21f56fcf5e9743ba2;hpb=d645629831d01da077d418dad3a58953ee4b91e1;p=bluesky.git diff --git a/bluesky/store-bdb.c b/bluesky/store-bdb.c index 8565314..cd33fac 100644 --- a/bluesky/store-bdb.c +++ b/bluesky/store-bdb.c @@ -19,67 +19,90 @@ /* A storage layer that writes to Berkeley DB locally. */ typedef struct { - GThreadPool *thread_pool; DB_ENV *env; DB *db; + GAsyncQueue *operations; } BDBStore; -static void bdbstore_task(gpointer a, gpointer s) +static gpointer bdbstore_thread(gpointer data) { - int res; - BlueSkyStoreAsync *async = (BlueSkyStoreAsync *)a; - BDBStore *store = (BDBStore *)s; + BDBStore *store = (BDBStore *)data; + DB_TXN *txn = NULL; + + // Number of operations in the current transaction + int transaction_size = 0; + + while (TRUE) { + int res; + BlueSkyStoreAsync *async; + + if (txn == NULL) { + res = store->env->txn_begin(store->env, NULL, &txn, 0); + if (res != 0) { + fprintf(stderr, "Unable to begin transaction!\n"); + return NULL; + } + } - async->status = ASYNC_RUNNING; - async->exec_time = bluesky_now_hires(); + async = (BlueSkyStoreAsync *)g_async_queue_pop(store->operations); + async->status = ASYNC_RUNNING; + async->exec_time = bluesky_now_hires(); - DBT key; - memset(&key, 0, sizeof(key)); + DBT key; + memset(&key, 0, sizeof(key)); - key.data = async->key; - key.size = strlen(async->key); + key.data = async->key; + key.size = strlen(async->key); - DBT value; - memset(&value, 0, sizeof(value)); + DBT value; + memset(&value, 0, sizeof(value)); - if (async->op == STORE_OP_GET) { - value.flags = DB_DBT_MALLOC; + if (async->op == STORE_OP_GET) { + value.flags = DB_DBT_MALLOC; - res = store->db->get(store->db, NULL, &key, &value, 0); + res = store->db->get(store->db, txn, &key, &value, 0); - async->result = res; - async->data = NULL; + async->result = res; + async->data = NULL; - if (res != 0) { - fprintf(stderr, "BDB read failure: %s\n", db_strerror(res)); - } else { - async->data = bluesky_string_new(value.data, value.size); - async->result = 0; - } + if (res != 0) { + fprintf(stderr, "BDB read failure: %s\n", db_strerror(res)); + } else { + async->data = bluesky_string_new(value.data, value.size); + async->result = 0; + } + + } else if (async->op == STORE_OP_PUT) { + value.data = async->data->data; + value.size = async->data->len; - } else if (async->op == STORE_OP_PUT) { - value.data = async->data->data; - value.size = async->data->len; + res = store->db->put(store->db, txn, &key, &value, 0); - res = store->db->put(store->db, NULL, &key, &value, 0); + if (res != 0) { + fprintf(stderr, "BDB write failure: %s\n", db_strerror(res)); + } - if (res != 0) { - fprintf(stderr, "BDB write failure: %s\n", db_strerror(res)); + async->result = 0; } - async->result = 0; + bluesky_store_async_mark_complete(async); + bluesky_store_async_unref(async); + transaction_size++; + + if (transaction_size >= 64) { + txn->commit(txn, 0); + txn = NULL; + transaction_size = 0; + } } - bluesky_store_async_mark_complete(async); - bluesky_store_async_unref(async); + return NULL; } static gpointer bdbstore_new(const gchar *path) { int res; BDBStore *store = g_new0(BDBStore, 1); - store->thread_pool = g_thread_pool_new(bdbstore_task, store, 16, FALSE, - NULL); res = db_env_create(&store->env, 0); @@ -121,6 +144,12 @@ static gpointer bdbstore_new(const gchar *path) db_strerror(res)); } + store->operations = g_async_queue_new(); + if (g_thread_create(bdbstore_thread, store, FALSE, NULL) == NULL) { + fprintf(stderr, "Creating BDB thread failed!\n"); + return NULL; + } + return store; } @@ -150,7 +179,7 @@ static void bdbstore_submit(gpointer s, BlueSkyStoreAsync *async) case STORE_OP_PUT: async->status = ASYNC_PENDING; bluesky_store_async_ref(async); - g_thread_pool_push(store->thread_pool, async, NULL); + g_async_queue_push(store->operations, async); break; default: