Add proper per-file copyright notices/licenses and top-level license.
[bluesky.git] / bluesky / store-bdb.c
index 8565314..3fabb2d 100644 (file)
@@ -3,7 +3,29 @@
  * Copyright (C) 2009  The Regents of the University of California
  * Written by Michael Vrable <mvrable@cs.ucsd.edu>
  *
- * TODO: Licensing
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
+ * 3. Neither the name of the University nor the names of its contributors
+ *    may be used to endorse or promote products derived from this software
+ *    without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
+ * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+ * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+ * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+ * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
  */
 
 #include <stdint.h>
 /* A storage layer that writes to Berkeley DB locally. */
 
 typedef struct {
-    GThreadPool *thread_pool;
     DB_ENV *env;
     DB *db;
+    GAsyncQueue *operations;
 } BDBStore;
 
-static void bdbstore_task(gpointer a, gpointer s)
+static gpointer bdbstore_thread(gpointer data)
 {
-    int res;
-    BlueSkyStoreAsync *async = (BlueSkyStoreAsync *)a;
-    BDBStore *store = (BDBStore *)s;
+    BDBStore *store = (BDBStore *)data;
+    DB_TXN *txn = NULL;
+
+    // Number of operations in the current transaction
+    int transaction_size = 0;
+
+    while (TRUE) {
+        int res;
+        BlueSkyStoreAsync *async;
+
+        if (txn == NULL) {
+            res = store->env->txn_begin(store->env, NULL, &txn, 0);
+            if (res != 0) {
+                fprintf(stderr, "Unable to begin transaction!\n");
+                return NULL;
+            }
+        }
 
-    async->status = ASYNC_RUNNING;
-    async->exec_time = bluesky_now_hires();
+        async = (BlueSkyStoreAsync *)g_async_queue_pop(store->operations);
+        async->status = ASYNC_RUNNING;
+        async->exec_time = bluesky_now_hires();
 
-    DBT key;
-    memset(&key, 0, sizeof(key));
+        DBT key;
+        memset(&key, 0, sizeof(key));
 
-    key.data = async->key;
-    key.size = strlen(async->key);
+        key.data = async->key;
+        key.size = strlen(async->key);
 
-    DBT value;
-    memset(&value, 0, sizeof(value));
+        DBT value;
+        memset(&value, 0, sizeof(value));
 
-    if (async->op == STORE_OP_GET) {
-        value.flags = DB_DBT_MALLOC;
+        if (async->op == STORE_OP_GET) {
+            value.flags = DB_DBT_MALLOC;
 
-        res = store->db->get(store->db, NULL, &key, &value, 0);
+            res = store->db->get(store->db, txn, &key, &value, 0);
 
-        async->result = res;
-        async->data = NULL;
+            async->result = res;
+            async->data = NULL;
 
-        if (res != 0) {
-            fprintf(stderr, "BDB read failure: %s\n", db_strerror(res));
-        } else {
-            async->data = bluesky_string_new(value.data, value.size);
-            async->result = 0;
-        }
+            if (res != 0) {
+                fprintf(stderr, "BDB read failure: %s\n", db_strerror(res));
+            } else {
+                async->data = bluesky_string_new(value.data, value.size);
+                async->result = 0;
+            }
+
+        } else if (async->op == STORE_OP_PUT) {
+            value.data = async->data->data;
+            value.size = async->data->len;
 
-    } else if (async->op == STORE_OP_PUT) {
-        value.data = async->data->data;
-        value.size = async->data->len;
+            res = store->db->put(store->db, txn, &key, &value, 0);
 
-        res = store->db->put(store->db, NULL, &key, &value, 0);
+            if (res != 0) {
+                fprintf(stderr, "BDB write failure: %s\n", db_strerror(res));
+            }
 
-        if (res != 0) {
-            fprintf(stderr, "BDB write failure: %s\n", db_strerror(res));
+            async->result = 0;
         }
 
-        async->result = 0;
+        bluesky_store_async_mark_complete(async);
+        bluesky_store_async_unref(async);
+        transaction_size++;
+
+        if (transaction_size >= 64) {
+            txn->commit(txn, 0);
+            txn = NULL;
+            transaction_size = 0;
+        }
     }
 
-    bluesky_store_async_mark_complete(async);
-    bluesky_store_async_unref(async);
+    return NULL;
 }
 
 static gpointer bdbstore_new(const gchar *path)
 {
     int res;
     BDBStore *store = g_new0(BDBStore, 1);
-    store->thread_pool = g_thread_pool_new(bdbstore_task, store, 16, FALSE,
-                                           NULL);
 
     res = db_env_create(&store->env, 0);
 
@@ -121,6 +166,12 @@ static gpointer bdbstore_new(const gchar *path)
                 db_strerror(res));
     }
 
+    store->operations = g_async_queue_new();
+    if (g_thread_create(bdbstore_thread, store, FALSE, NULL) == NULL) {
+        fprintf(stderr, "Creating BDB thread failed!\n");
+        return NULL;
+    }
+
     return store;
 }
 
@@ -150,7 +201,7 @@ static void bdbstore_submit(gpointer s, BlueSkyStoreAsync *async)
     case STORE_OP_PUT:
         async->status = ASYNC_PENDING;
         bluesky_store_async_ref(async);
-        g_thread_pool_push(store->thread_pool, async, NULL);
+        g_async_queue_push(store->operations, async);
         break;
 
     default: