/* A storage layer that writes to Berkeley DB locally. */
typedef struct {
- GThreadPool *thread_pool;
DB_ENV *env;
DB *db;
+ GAsyncQueue *operations;
} BDBStore;
-static void bdbstore_task(gpointer a, gpointer s)
+static gpointer bdbstore_thread(gpointer data)
{
- int res;
- BlueSkyStoreAsync *async = (BlueSkyStoreAsync *)a;
- BDBStore *store = (BDBStore *)s;
+ BDBStore *store = (BDBStore *)data;
- async->status = ASYNC_RUNNING;
- async->exec_time = bluesky_now_hires();
+ while (TRUE) {
+ int res;
+ BlueSkyStoreAsync *async;
- DBT key;
- memset(&key, 0, sizeof(key));
+ async = (BlueSkyStoreAsync *)g_async_queue_pop(store->operations);
+ async->status = ASYNC_RUNNING;
+ async->exec_time = bluesky_now_hires();
- key.data = async->key;
- key.size = strlen(async->key);
+ DBT key;
+ memset(&key, 0, sizeof(key));
- DBT value;
- memset(&value, 0, sizeof(value));
+ key.data = async->key;
+ key.size = strlen(async->key);
- if (async->op == STORE_OP_GET) {
- value.flags = DB_DBT_MALLOC;
+ DBT value;
+ memset(&value, 0, sizeof(value));
- res = store->db->get(store->db, NULL, &key, &value, 0);
+ if (async->op == STORE_OP_GET) {
+ value.flags = DB_DBT_MALLOC;
- async->result = res;
- async->data = NULL;
+ res = store->db->get(store->db, NULL, &key, &value, 0);
- if (res != 0) {
- fprintf(stderr, "BDB read failure: %s\n", db_strerror(res));
- } else {
- async->data = bluesky_string_new(value.data, value.size);
- async->result = 0;
- }
+ async->result = res;
+ async->data = NULL;
+
+ if (res != 0) {
+ fprintf(stderr, "BDB read failure: %s\n", db_strerror(res));
+ } else {
+ async->data = bluesky_string_new(value.data, value.size);
+ async->result = 0;
+ }
+
+ } else if (async->op == STORE_OP_PUT) {
+ value.data = async->data->data;
+ value.size = async->data->len;
- } else if (async->op == STORE_OP_PUT) {
- value.data = async->data->data;
- value.size = async->data->len;
+ res = store->db->put(store->db, NULL, &key, &value, 0);
- res = store->db->put(store->db, NULL, &key, &value, 0);
+ if (res != 0) {
+ fprintf(stderr, "BDB write failure: %s\n", db_strerror(res));
+ }
- if (res != 0) {
- fprintf(stderr, "BDB write failure: %s\n", db_strerror(res));
+ async->result = 0;
}
- async->result = 0;
+ bluesky_store_async_mark_complete(async);
+ bluesky_store_async_unref(async);
}
- bluesky_store_async_mark_complete(async);
- bluesky_store_async_unref(async);
+ return NULL;
}
static gpointer bdbstore_new(const gchar *path)
{
int res;
BDBStore *store = g_new0(BDBStore, 1);
- store->thread_pool = g_thread_pool_new(bdbstore_task, store, 16, FALSE,
- NULL);
res = db_env_create(&store->env, 0);
db_strerror(res));
}
+ store->operations = g_async_queue_new();
+ if (g_thread_create(bdbstore_thread, store, FALSE, NULL) == NULL) {
+ fprintf(stderr, "Creating BDB thread failed!\n");
+ return NULL;
+ }
+
return store;
}
case STORE_OP_PUT:
async->status = ASYNC_PENDING;
bluesky_store_async_ref(async);
- g_thread_pool_push(store->thread_pool, async, NULL);
+ g_async_queue_push(store->operations, async);
break;
default: