Add proper per-file copyright notices/licenses and top-level license.
[bluesky.git] / bluesky / store-bdb.c
1 /* Blue Sky: File Systems in the Cloud
2  *
3  * Copyright (C) 2009  The Regents of the University of California
4  * Written by Michael Vrable <mvrable@cs.ucsd.edu>
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions
8  * are met:
9  * 1. Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  * 2. Redistributions in binary form must reproduce the above copyright
12  *    notice, this list of conditions and the following disclaimer in the
13  *    documentation and/or other materials provided with the distribution.
14  * 3. Neither the name of the University nor the names of its contributors
15  *    may be used to endorse or promote products derived from this software
16  *    without specific prior written permission.
17  *
18  * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
19  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21  * ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
22  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
24  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
25  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
27  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
28  * SUCH DAMAGE.
29  */
30
31 #include <stdint.h>
32 #include <stdlib.h>
33 #include <glib.h>
34 #include <string.h>
35 #include <db.h>
36 #include <errno.h>
37
38 #include "bluesky-private.h"
39 #include "libs3.h"
40
41 /* A storage layer that writes to Berkeley DB locally. */
42
43 typedef struct {
44     DB_ENV *env;
45     DB *db;
46     GAsyncQueue *operations;
47 } BDBStore;
48
49 static gpointer bdbstore_thread(gpointer data)
50 {
51     BDBStore *store = (BDBStore *)data;
52     DB_TXN *txn = NULL;
53
54     // Number of operations in the current transaction
55     int transaction_size = 0;
56
57     while (TRUE) {
58         int res;
59         BlueSkyStoreAsync *async;
60
61         if (txn == NULL) {
62             res = store->env->txn_begin(store->env, NULL, &txn, 0);
63             if (res != 0) {
64                 fprintf(stderr, "Unable to begin transaction!\n");
65                 return NULL;
66             }
67         }
68
69         async = (BlueSkyStoreAsync *)g_async_queue_pop(store->operations);
70         async->status = ASYNC_RUNNING;
71         async->exec_time = bluesky_now_hires();
72
73         DBT key;
74         memset(&key, 0, sizeof(key));
75
76         key.data = async->key;
77         key.size = strlen(async->key);
78
79         DBT value;
80         memset(&value, 0, sizeof(value));
81
82         if (async->op == STORE_OP_GET) {
83             value.flags = DB_DBT_MALLOC;
84
85             res = store->db->get(store->db, txn, &key, &value, 0);
86
87             async->result = res;
88             async->data = NULL;
89
90             if (res != 0) {
91                 fprintf(stderr, "BDB read failure: %s\n", db_strerror(res));
92             } else {
93                 async->data = bluesky_string_new(value.data, value.size);
94                 async->result = 0;
95             }
96
97         } else if (async->op == STORE_OP_PUT) {
98             value.data = async->data->data;
99             value.size = async->data->len;
100
101             res = store->db->put(store->db, txn, &key, &value, 0);
102
103             if (res != 0) {
104                 fprintf(stderr, "BDB write failure: %s\n", db_strerror(res));
105             }
106
107             async->result = 0;
108         }
109
110         bluesky_store_async_mark_complete(async);
111         bluesky_store_async_unref(async);
112         transaction_size++;
113
114         if (transaction_size >= 64) {
115             txn->commit(txn, 0);
116             txn = NULL;
117             transaction_size = 0;
118         }
119     }
120
121     return NULL;
122 }
123
124 static gpointer bdbstore_new(const gchar *path)
125 {
126     int res;
127     BDBStore *store = g_new0(BDBStore, 1);
128
129     res = db_env_create(&store->env, 0);
130
131     if (res != 0) {
132         fprintf(stderr, "db_env_create failure: %s\n", db_strerror(res));
133         return NULL;
134     }
135
136     res = store->env->open(store->env, path,
137                            DB_CREATE | DB_RECOVER | DB_INIT_LOCK | DB_INIT_LOG
138                             | DB_INIT_MPOOL | DB_INIT_TXN | DB_THREAD,
139                            0644);
140
141     if (res != 0) {
142         fprintf(stderr, "BDB open failure: %s\n",
143                 db_strerror(res));
144         return NULL;
145     }
146
147     res = db_create(&store->db, store->env, 0);
148
149     if (res != 0) {
150         fprintf(stderr, "DB create failed: %s\n", db_strerror(res));
151         return NULL;
152     }
153
154     uint32_t flags = DB_CREATE | DB_THREAD | DB_AUTO_COMMIT;
155
156     res = store->db->open(store->db,
157                           NULL, /* TXN */
158                           "store.db",
159                           "store",
160                           DB_BTREE,
161                           flags,
162                           0644);
163
164     if (res != 0) {
165         fprintf(stderr, "DB open failed: %s\n",
166                 db_strerror(res));
167     }
168
169     store->operations = g_async_queue_new();
170     if (g_thread_create(bdbstore_thread, store, FALSE, NULL) == NULL) {
171         fprintf(stderr, "Creating BDB thread failed!\n");
172         return NULL;
173     }
174
175     return store;
176 }
177
178 static void bdbstore_destroy(gpointer s)
179 {
180     BDBStore *store = (BDBStore *)store;
181
182     if (store->db) {
183         store->db->close(store->db, 0);
184     }
185
186     if (store->env) {
187         store->env->close(store->env, 0);
188     }
189
190     g_free(store);
191 }
192
193 static void bdbstore_submit(gpointer s, BlueSkyStoreAsync *async)
194 {
195     BDBStore *store = (BDBStore *)s;
196     g_return_if_fail(async->status == ASYNC_NEW);
197     g_return_if_fail(async->op != STORE_OP_NONE);
198
199     switch (async->op) {
200     case STORE_OP_GET:
201     case STORE_OP_PUT:
202         async->status = ASYNC_PENDING;
203         bluesky_store_async_ref(async);
204         g_async_queue_push(store->operations, async);
205         break;
206
207     default:
208         g_warning("Uknown operation type for BDBStore: %d\n", async->op);
209         bluesky_store_async_mark_complete(async);
210         break;
211     }
212 }
213
214 static void bdbstore_cleanup(gpointer store, BlueSkyStoreAsync *async)
215 {
216 }
217
218 static BlueSkyStoreImplementation store_impl = {
219     .create = bdbstore_new,
220     .destroy = bdbstore_destroy,
221     .submit = bdbstore_submit,
222     .cleanup = bdbstore_cleanup,
223 };
224
225 void bluesky_store_init_bdb(void)
226 {
227     bluesky_store_register(&store_impl, "bdb");
228 }
229