cd33face07268fa85e89cd3ecd2a5d6faf650403
[bluesky.git] / bluesky / store-bdb.c
1 /* Blue Sky: File Systems in the Cloud
2  *
3  * Copyright (C) 2009  The Regents of the University of California
4  * Written by Michael Vrable <mvrable@cs.ucsd.edu>
5  *
6  * TODO: Licensing
7  */
8
9 #include <stdint.h>
10 #include <stdlib.h>
11 #include <glib.h>
12 #include <string.h>
13 #include <db.h>
14 #include <errno.h>
15
16 #include "bluesky-private.h"
17 #include "libs3.h"
18
19 /* A storage layer that writes to Berkeley DB locally. */
20
21 typedef struct {
22     DB_ENV *env;
23     DB *db;
24     GAsyncQueue *operations;
25 } BDBStore;
26
27 static gpointer bdbstore_thread(gpointer data)
28 {
29     BDBStore *store = (BDBStore *)data;
30     DB_TXN *txn = NULL;
31
32     // Number of operations in the current transaction
33     int transaction_size = 0;
34
35     while (TRUE) {
36         int res;
37         BlueSkyStoreAsync *async;
38
39         if (txn == NULL) {
40             res = store->env->txn_begin(store->env, NULL, &txn, 0);
41             if (res != 0) {
42                 fprintf(stderr, "Unable to begin transaction!\n");
43                 return NULL;
44             }
45         }
46
47         async = (BlueSkyStoreAsync *)g_async_queue_pop(store->operations);
48         async->status = ASYNC_RUNNING;
49         async->exec_time = bluesky_now_hires();
50
51         DBT key;
52         memset(&key, 0, sizeof(key));
53
54         key.data = async->key;
55         key.size = strlen(async->key);
56
57         DBT value;
58         memset(&value, 0, sizeof(value));
59
60         if (async->op == STORE_OP_GET) {
61             value.flags = DB_DBT_MALLOC;
62
63             res = store->db->get(store->db, txn, &key, &value, 0);
64
65             async->result = res;
66             async->data = NULL;
67
68             if (res != 0) {
69                 fprintf(stderr, "BDB read failure: %s\n", db_strerror(res));
70             } else {
71                 async->data = bluesky_string_new(value.data, value.size);
72                 async->result = 0;
73             }
74
75         } else if (async->op == STORE_OP_PUT) {
76             value.data = async->data->data;
77             value.size = async->data->len;
78
79             res = store->db->put(store->db, txn, &key, &value, 0);
80
81             if (res != 0) {
82                 fprintf(stderr, "BDB write failure: %s\n", db_strerror(res));
83             }
84
85             async->result = 0;
86         }
87
88         bluesky_store_async_mark_complete(async);
89         bluesky_store_async_unref(async);
90         transaction_size++;
91
92         if (transaction_size >= 64) {
93             txn->commit(txn, 0);
94             txn = NULL;
95             transaction_size = 0;
96         }
97     }
98
99     return NULL;
100 }
101
102 static gpointer bdbstore_new(const gchar *path)
103 {
104     int res;
105     BDBStore *store = g_new0(BDBStore, 1);
106
107     res = db_env_create(&store->env, 0);
108
109     if (res != 0) {
110         fprintf(stderr, "db_env_create failure: %s\n", db_strerror(res));
111         return NULL;
112     }
113
114     res = store->env->open(store->env, path,
115                            DB_CREATE | DB_RECOVER | DB_INIT_LOCK | DB_INIT_LOG
116                             | DB_INIT_MPOOL | DB_INIT_TXN | DB_THREAD,
117                            0644);
118
119     if (res != 0) {
120         fprintf(stderr, "BDB open failure: %s\n",
121                 db_strerror(res));
122         return NULL;
123     }
124
125     res = db_create(&store->db, store->env, 0);
126
127     if (res != 0) {
128         fprintf(stderr, "DB create failed: %s\n", db_strerror(res));
129         return NULL;
130     }
131
132     uint32_t flags = DB_CREATE | DB_THREAD | DB_AUTO_COMMIT;
133
134     res = store->db->open(store->db,
135                           NULL, /* TXN */
136                           "store.db",
137                           "store",
138                           DB_BTREE,
139                           flags,
140                           0644);
141
142     if (res != 0) {
143         fprintf(stderr, "DB open failed: %s\n",
144                 db_strerror(res));
145     }
146
147     store->operations = g_async_queue_new();
148     if (g_thread_create(bdbstore_thread, store, FALSE, NULL) == NULL) {
149         fprintf(stderr, "Creating BDB thread failed!\n");
150         return NULL;
151     }
152
153     return store;
154 }
155
156 static void bdbstore_destroy(gpointer s)
157 {
158     BDBStore *store = (BDBStore *)store;
159
160     if (store->db) {
161         store->db->close(store->db, 0);
162     }
163
164     if (store->env) {
165         store->env->close(store->env, 0);
166     }
167
168     g_free(store);
169 }
170
171 static void bdbstore_submit(gpointer s, BlueSkyStoreAsync *async)
172 {
173     BDBStore *store = (BDBStore *)s;
174     g_return_if_fail(async->status == ASYNC_NEW);
175     g_return_if_fail(async->op != STORE_OP_NONE);
176
177     switch (async->op) {
178     case STORE_OP_GET:
179     case STORE_OP_PUT:
180         async->status = ASYNC_PENDING;
181         bluesky_store_async_ref(async);
182         g_async_queue_push(store->operations, async);
183         break;
184
185     default:
186         g_warning("Uknown operation type for BDBStore: %d\n", async->op);
187         bluesky_store_async_mark_complete(async);
188         break;
189     }
190 }
191
192 static void bdbstore_cleanup(gpointer store, BlueSkyStoreAsync *async)
193 {
194 }
195
196 static BlueSkyStoreImplementation store_impl = {
197     .create = bdbstore_new,
198     .destroy = bdbstore_destroy,
199     .submit = bdbstore_submit,
200     .cleanup = bdbstore_cleanup,
201 };
202
203 void bluesky_store_init_bdb(void)
204 {
205     bluesky_store_register(&store_impl, "bdb");
206 }
207