X-Git-Url: http://git.vrable.net/?a=blobdiff_plain;f=bluesky%2Fstore-bdb.c;h=3fabb2de3685c3079c0060a207cf2ec283b87481;hb=8ff0fd08d6e1cc97cdb7e94b7cd97dc28c29e674;hp=8565314376886ea947d0dae21f56fcf5e9743ba2;hpb=d645629831d01da077d418dad3a58953ee4b91e1;p=bluesky.git diff --git a/bluesky/store-bdb.c b/bluesky/store-bdb.c index 8565314..3fabb2d 100644 --- a/bluesky/store-bdb.c +++ b/bluesky/store-bdb.c @@ -3,7 +3,29 @@ * Copyright (C) 2009 The Regents of the University of California * Written by Michael Vrable * - * TODO: Licensing + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. Neither the name of the University nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS + * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY + * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. */ #include @@ -19,67 +41,90 @@ /* A storage layer that writes to Berkeley DB locally. */ typedef struct { - GThreadPool *thread_pool; DB_ENV *env; DB *db; + GAsyncQueue *operations; } BDBStore; -static void bdbstore_task(gpointer a, gpointer s) +static gpointer bdbstore_thread(gpointer data) { - int res; - BlueSkyStoreAsync *async = (BlueSkyStoreAsync *)a; - BDBStore *store = (BDBStore *)s; + BDBStore *store = (BDBStore *)data; + DB_TXN *txn = NULL; + + // Number of operations in the current transaction + int transaction_size = 0; + + while (TRUE) { + int res; + BlueSkyStoreAsync *async; + + if (txn == NULL) { + res = store->env->txn_begin(store->env, NULL, &txn, 0); + if (res != 0) { + fprintf(stderr, "Unable to begin transaction!\n"); + return NULL; + } + } - async->status = ASYNC_RUNNING; - async->exec_time = bluesky_now_hires(); + async = (BlueSkyStoreAsync *)g_async_queue_pop(store->operations); + async->status = ASYNC_RUNNING; + async->exec_time = bluesky_now_hires(); - DBT key; - memset(&key, 0, sizeof(key)); + DBT key; + memset(&key, 0, sizeof(key)); - key.data = async->key; - key.size = strlen(async->key); + key.data = async->key; + key.size = strlen(async->key); - DBT value; - memset(&value, 0, sizeof(value)); + DBT value; + memset(&value, 0, sizeof(value)); - if (async->op == STORE_OP_GET) { - value.flags = DB_DBT_MALLOC; + if (async->op == STORE_OP_GET) { + value.flags = DB_DBT_MALLOC; - res = store->db->get(store->db, NULL, &key, &value, 0); + res = store->db->get(store->db, txn, &key, &value, 0); - async->result = res; - async->data = NULL; + async->result = res; + async->data = NULL; - if (res != 0) { - fprintf(stderr, "BDB read failure: %s\n", db_strerror(res)); - } else { - async->data = bluesky_string_new(value.data, value.size); - async->result = 0; - } + if (res != 0) { + fprintf(stderr, "BDB read failure: %s\n", db_strerror(res)); + } else { + async->data = bluesky_string_new(value.data, value.size); + async->result = 0; + } + + } else if (async->op == STORE_OP_PUT) { + value.data = async->data->data; + value.size = async->data->len; - } else if (async->op == STORE_OP_PUT) { - value.data = async->data->data; - value.size = async->data->len; + res = store->db->put(store->db, txn, &key, &value, 0); - res = store->db->put(store->db, NULL, &key, &value, 0); + if (res != 0) { + fprintf(stderr, "BDB write failure: %s\n", db_strerror(res)); + } - if (res != 0) { - fprintf(stderr, "BDB write failure: %s\n", db_strerror(res)); + async->result = 0; } - async->result = 0; + bluesky_store_async_mark_complete(async); + bluesky_store_async_unref(async); + transaction_size++; + + if (transaction_size >= 64) { + txn->commit(txn, 0); + txn = NULL; + transaction_size = 0; + } } - bluesky_store_async_mark_complete(async); - bluesky_store_async_unref(async); + return NULL; } static gpointer bdbstore_new(const gchar *path) { int res; BDBStore *store = g_new0(BDBStore, 1); - store->thread_pool = g_thread_pool_new(bdbstore_task, store, 16, FALSE, - NULL); res = db_env_create(&store->env, 0); @@ -121,6 +166,12 @@ static gpointer bdbstore_new(const gchar *path) db_strerror(res)); } + store->operations = g_async_queue_new(); + if (g_thread_create(bdbstore_thread, store, FALSE, NULL) == NULL) { + fprintf(stderr, "Creating BDB thread failed!\n"); + return NULL; + } + return store; } @@ -150,7 +201,7 @@ static void bdbstore_submit(gpointer s, BlueSkyStoreAsync *async) case STORE_OP_PUT: async->status = ASYNC_PENDING; bluesky_store_async_ref(async); - g_thread_pool_push(store->thread_pool, async, NULL); + g_async_queue_push(store->operations, async); break; default: