1 /* Blue Sky: File Systems in the Cloud
3 * Copyright (C) 2009 The Regents of the University of California
4 * Written by Michael Vrable <mvrable@cs.ucsd.edu>
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions
9 * 1. Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * 2. Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * 3. Neither the name of the University nor the names of its contributors
15 * may be used to endorse or promote products derived from this software
16 * without specific prior written permission.
18 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
19 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21 * ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
22 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
24 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
25 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
27 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
38 #include "bluesky-private.h"
41 /* A storage layer that writes to Berkeley DB locally. */
46 GAsyncQueue *operations;
49 static gpointer bdbstore_thread(gpointer data)
51 BDBStore *store = (BDBStore *)data;
54 // Number of operations in the current transaction
55 int transaction_size = 0;
59 BlueSkyStoreAsync *async;
62 res = store->env->txn_begin(store->env, NULL, &txn, 0);
64 fprintf(stderr, "Unable to begin transaction!\n");
69 async = (BlueSkyStoreAsync *)g_async_queue_pop(store->operations);
70 async->status = ASYNC_RUNNING;
71 async->exec_time = bluesky_now_hires();
74 memset(&key, 0, sizeof(key));
76 key.data = async->key;
77 key.size = strlen(async->key);
80 memset(&value, 0, sizeof(value));
82 if (async->op == STORE_OP_GET) {
83 value.flags = DB_DBT_MALLOC;
85 res = store->db->get(store->db, txn, &key, &value, 0);
91 fprintf(stderr, "BDB read failure: %s\n", db_strerror(res));
93 async->data = bluesky_string_new(value.data, value.size);
97 } else if (async->op == STORE_OP_PUT) {
98 value.data = async->data->data;
99 value.size = async->data->len;
101 res = store->db->put(store->db, txn, &key, &value, 0);
104 fprintf(stderr, "BDB write failure: %s\n", db_strerror(res));
110 bluesky_store_async_mark_complete(async);
111 bluesky_store_async_unref(async);
114 if (transaction_size >= 64) {
117 transaction_size = 0;
124 static gpointer bdbstore_new(const gchar *path)
127 BDBStore *store = g_new0(BDBStore, 1);
129 res = db_env_create(&store->env, 0);
132 fprintf(stderr, "db_env_create failure: %s\n", db_strerror(res));
136 res = store->env->open(store->env, path,
137 DB_CREATE | DB_RECOVER | DB_INIT_LOCK | DB_INIT_LOG
138 | DB_INIT_MPOOL | DB_INIT_TXN | DB_THREAD,
142 fprintf(stderr, "BDB open failure: %s\n",
147 res = db_create(&store->db, store->env, 0);
150 fprintf(stderr, "DB create failed: %s\n", db_strerror(res));
154 uint32_t flags = DB_CREATE | DB_THREAD | DB_AUTO_COMMIT;
156 res = store->db->open(store->db,
165 fprintf(stderr, "DB open failed: %s\n",
169 store->operations = g_async_queue_new();
170 if (g_thread_create(bdbstore_thread, store, FALSE, NULL) == NULL) {
171 fprintf(stderr, "Creating BDB thread failed!\n");
178 static void bdbstore_destroy(gpointer s)
180 BDBStore *store = (BDBStore *)store;
183 store->db->close(store->db, 0);
187 store->env->close(store->env, 0);
193 static void bdbstore_submit(gpointer s, BlueSkyStoreAsync *async)
195 BDBStore *store = (BDBStore *)s;
196 g_return_if_fail(async->status == ASYNC_NEW);
197 g_return_if_fail(async->op != STORE_OP_NONE);
202 async->status = ASYNC_PENDING;
203 bluesky_store_async_ref(async);
204 g_async_queue_push(store->operations, async);
208 g_warning("Uknown operation type for BDBStore: %d\n", async->op);
209 bluesky_store_async_mark_complete(async);
214 static void bdbstore_cleanup(gpointer store, BlueSkyStoreAsync *async)
218 static BlueSkyStoreImplementation store_impl = {
219 .create = bdbstore_new,
220 .destroy = bdbstore_destroy,
221 .submit = bdbstore_submit,
222 .cleanup = bdbstore_cleanup,
225 void bluesky_store_init_bdb(void)
227 bluesky_store_register(&store_impl, "bdb");