1 /* Blue Sky: File Systems in the Cloud
3 * Copyright (C) 2009 The Regents of the University of California
4 * Written by Michael Vrable <mvrable@cs.ucsd.edu>
16 #include "bluesky-private.h"
19 /* A storage layer that writes to Berkeley DB locally. */
/* Queue of BlueSkyStoreAsync requests: bdbstore_submit() pushes onto it and
 * bdbstore_thread() pops from it. */
24 GAsyncQueue *operations;
/* Worker thread for a BDBStore.  Takes BlueSkyStoreAsync requests off
 * store->operations and runs each GET or PUT against store->db inside a
 * Berkeley DB transaction, then marks the request complete.
 *
 * data: the BDBStore to serve, passed as an opaque gpointer.
 */
27 static gpointer bdbstore_thread(gpointer data)
29 BDBStore *store = (BDBStore *)data;
32 // Number of operations in the current transaction
33 int transaction_size = 0;
37 BlueSkyStoreAsync *async;
/* Start a transaction in the store's environment; txn receives the handle. */
40 res = store->env->txn_begin(store->env, NULL, &txn, 0);
42 fprintf(stderr, "Unable to begin transaction!\n");
/* g_async_queue_pop() blocks until a request has been queued. */
47 async = (BlueSkyStoreAsync *)g_async_queue_pop(store->operations);
/* Record that the request is now being serviced and when execution began. */
48 async->status = ASYNC_RUNNING;
49 async->exec_time = bluesky_now_hires();
/* The key DBT refers directly to the request's key string.  Its size comes
 * from strlen(), so the terminating NUL is not part of the stored key. */
52 memset(&key, 0, sizeof(key));
54 key.data = async->key;
55 key.size = strlen(async->key);
58 memset(&value, 0, sizeof(value));
60 if (async->op == STORE_OP_GET) {
/* DB_DBT_MALLOC asks Berkeley DB to allocate the buffer that receives the
 * fetched value. */
61 value.flags = DB_DBT_MALLOC;
63 res = store->db->get(store->db, txn, &key, &value, 0);
69 fprintf(stderr, "BDB read failure: %s\n", db_strerror(res));
/* Hand the fetched bytes back to the requester as a string object.
 * NOTE(review): presumably bluesky_string_new() takes ownership of
 * value.data -- confirm, otherwise the DB-allocated buffer leaks. */
71 async->data = bluesky_string_new(value.data, value.size);
75 } else if (async->op == STORE_OP_PUT) {
/* The value DBT refers to the request's payload; nothing is copied here. */
76 value.data = async->data->data;
77 value.size = async->data->len;
79 res = store->db->put(store->db, txn, &key, &value, 0);
82 fprintf(stderr, "BDB write failure: %s\n", db_strerror(res));
/* Signal completion, then drop the reference that bdbstore_submit() took
 * when it queued the request. */
88 bluesky_store_async_mark_complete(async);
89 bluesky_store_async_unref(async);
/* NOTE(review): 64 looks like the cap on operations batched into a single
 * transaction; presumably the transaction is committed and a new one begun
 * once it is reached -- confirm. */
92 if (transaction_size >= 64) {
/* Create a BDBStore: set up a Berkeley DB environment at the given path, open
 * a database in it, create the request queue and start the worker thread.
 *
 * path: passed to DB_ENV->open() to locate the environment.
 *
 * Each failing step reports the error on stderr.
 * NOTE(review): what is returned on success and on failure is not shown by
 * the statements documented here -- confirm.
 */
102 static gpointer bdbstore_new(const gchar *path)
/* g_new0() zero-fills the structure, so env, db and operations start NULL. */
105 BDBStore *store = g_new0(BDBStore, 1);
107 res = db_env_create(&store->env, 0);
110 fprintf(stderr, "db_env_create failure: %s\n", db_strerror(res));
/* Open (creating if needed) an environment with the locking, logging, memory
 * pool and transaction subsystems enabled, running recovery on open, and
 * with handles usable from multiple threads (DB_THREAD). */
114 res = store->env->open(store->env, path,
115 DB_CREATE | DB_RECOVER | DB_INIT_LOCK | DB_INIT_LOG
116 | DB_INIT_MPOOL | DB_INIT_TXN | DB_THREAD,
120 fprintf(stderr, "BDB open failure: %s\n",
125 res = db_create(&store->db, store->env, 0);
128 fprintf(stderr, "DB create failed: %s\n", db_strerror(res));
/* DB_AUTO_COMMIT makes the open itself transactional; DB_THREAD is needed
 * because the handle is used from the worker thread. */
132 uint32_t flags = DB_CREATE | DB_THREAD | DB_AUTO_COMMIT;
134 res = store->db->open(store->db,
143 fprintf(stderr, "DB open failed: %s\n",
/* The queue must exist before the worker starts, since bdbstore_thread()
 * pops from it. */
147 store->operations = g_async_queue_new();
/* FALSE requests a non-joinable thread.
 * NOTE(review): g_thread_create() is deprecated since GLib 2.32 in favour of
 * g_thread_new()/g_thread_try_new() -- consider migrating if the project's
 * minimum GLib version allows. */
148 if (g_thread_create(bdbstore_thread, store, FALSE, NULL) == NULL) {
149 fprintf(stderr, "Creating BDB thread failed!\n");
/* Tear down a BDBStore: close the database handle, then the environment it
 * was opened in.
 *
 * s: the BDBStore to destroy, passed as an opaque gpointer.
 */
156 static void bdbstore_destroy(gpointer s)
/* The store pointer must come from the parameter s.  Initializing the local
 * from itself would leave it indeterminate, and the close calls below would
 * then dereference a garbage pointer. */
158 BDBStore *store = (BDBStore *)s;
161 store->db->close(store->db, 0);
165 store->env->close(store->env, 0);
/* Submit an asynchronous request to a BDBStore.
 *
 * s:     the BDBStore, passed as an opaque gpointer.
 * async: the request.  It must still be in the ASYNC_NEW state and carry an
 *        operation other than STORE_OP_NONE; otherwise the function returns
 *        early through g_return_if_fail().
 *
 * Supported requests are handed to the worker thread through
 * store->operations.  A request with an unrecognized operation is logged and
 * marked complete immediately.
 */
171 static void bdbstore_submit(gpointer s, BlueSkyStoreAsync *async)
173 BDBStore *store = (BDBStore *)s;
174 g_return_if_fail(async->status == ASYNC_NEW);
175 g_return_if_fail(async->op != STORE_OP_NONE);
/* Mark the request pending and take a reference on behalf of the queue;
 * bdbstore_thread() releases it after completing the request. */
180 async->status = ASYNC_PENDING;
181 bluesky_store_async_ref(async);
182 g_async_queue_push(store->operations, async);
/* Unsupported operation: warn and complete the request without queueing it.
 * NOTE(review): the message misspells "Unknown", and g_warning() already
 * appends a newline, so the trailing "\n" is redundant -- fix if log output
 * is not parsed elsewhere. */
186 g_warning("Uknown operation type for BDBStore: %d\n", async->op);
187 bluesky_store_async_mark_complete(async);
/* Per-request cleanup callback for the BDB backend; installed as
 * store_impl.cleanup. */
192 static void bdbstore_cleanup(gpointer store, BlueSkyStoreAsync *async)
/* Operations table for the Berkeley DB backend.  bluesky_store_init_bdb()
 * registers it under the name "bdb". */
196 static BlueSkyStoreImplementation store_impl = {
197 .create = bdbstore_new,
198 .destroy = bdbstore_destroy,
199 .submit = bdbstore_submit,
200 .cleanup = bdbstore_cleanup,
/* Register the Berkeley DB storage backend (store_impl) under the name
 * "bdb". */
203 void bluesky_store_init_bdb(void)
205 bluesky_store_register(&store_impl, "bdb");