Switch to an explicit BDB operations queue instead of thread pool.
[bluesky.git] / bluesky / store-bdb.c
1 /* Blue Sky: File Systems in the Cloud
2  *
3  * Copyright (C) 2009  The Regents of the University of California
4  * Written by Michael Vrable <mvrable@cs.ucsd.edu>
5  *
6  * TODO: Licensing
7  */
8
9 #include <stdint.h>
10 #include <stdlib.h>
11 #include <glib.h>
12 #include <string.h>
13 #include <db.h>
14 #include <errno.h>
15
16 #include "bluesky-private.h"
17 #include "libs3.h"
18
19 /* A storage layer that writes to Berkeley DB locally. */
20
21 typedef struct {
22     DB_ENV *env;
23     DB *db;
24     GAsyncQueue *operations;
25 } BDBStore;
26
27 static gpointer bdbstore_thread(gpointer data)
28 {
29     BDBStore *store = (BDBStore *)data;
30
31     while (TRUE) {
32         int res;
33         BlueSkyStoreAsync *async;
34
35         async = (BlueSkyStoreAsync *)g_async_queue_pop(store->operations);
36         async->status = ASYNC_RUNNING;
37         async->exec_time = bluesky_now_hires();
38
39         DBT key;
40         memset(&key, 0, sizeof(key));
41
42         key.data = async->key;
43         key.size = strlen(async->key);
44
45         DBT value;
46         memset(&value, 0, sizeof(value));
47
48         if (async->op == STORE_OP_GET) {
49             value.flags = DB_DBT_MALLOC;
50
51             res = store->db->get(store->db, NULL, &key, &value, 0);
52
53             async->result = res;
54             async->data = NULL;
55
56             if (res != 0) {
57                 fprintf(stderr, "BDB read failure: %s\n", db_strerror(res));
58             } else {
59                 async->data = bluesky_string_new(value.data, value.size);
60                 async->result = 0;
61             }
62
63         } else if (async->op == STORE_OP_PUT) {
64             value.data = async->data->data;
65             value.size = async->data->len;
66
67             res = store->db->put(store->db, NULL, &key, &value, 0);
68
69             if (res != 0) {
70                 fprintf(stderr, "BDB write failure: %s\n", db_strerror(res));
71             }
72
73             async->result = 0;
74         }
75
76         bluesky_store_async_mark_complete(async);
77         bluesky_store_async_unref(async);
78     }
79
80     return NULL;
81 }
82
83 static gpointer bdbstore_new(const gchar *path)
84 {
85     int res;
86     BDBStore *store = g_new0(BDBStore, 1);
87
88     res = db_env_create(&store->env, 0);
89
90     if (res != 0) {
91         fprintf(stderr, "db_env_create failure: %s\n", db_strerror(res));
92         return NULL;
93     }
94
95     res = store->env->open(store->env, path,
96                            DB_CREATE | DB_RECOVER | DB_INIT_LOCK | DB_INIT_LOG
97                             | DB_INIT_MPOOL | DB_INIT_TXN | DB_THREAD,
98                            0644);
99
100     if (res != 0) {
101         fprintf(stderr, "BDB open failure: %s\n",
102                 db_strerror(res));
103         return NULL;
104     }
105
106     res = db_create(&store->db, store->env, 0);
107
108     if (res != 0) {
109         fprintf(stderr, "DB create failed: %s\n", db_strerror(res));
110         return NULL;
111     }
112
113     uint32_t flags = DB_CREATE | DB_THREAD | DB_AUTO_COMMIT;
114
115     res = store->db->open(store->db,
116                           NULL, /* TXN */
117                           "store.db",
118                           "store",
119                           DB_BTREE,
120                           flags,
121                           0644);
122
123     if (res != 0) {
124         fprintf(stderr, "DB open failed: %s\n",
125                 db_strerror(res));
126     }
127
128     store->operations = g_async_queue_new();
129     if (g_thread_create(bdbstore_thread, store, FALSE, NULL) == NULL) {
130         fprintf(stderr, "Creating BDB thread failed!\n");
131         return NULL;
132     }
133
134     return store;
135 }
136
137 static void bdbstore_destroy(gpointer s)
138 {
139     BDBStore *store = (BDBStore *)store;
140
141     if (store->db) {
142         store->db->close(store->db, 0);
143     }
144
145     if (store->env) {
146         store->env->close(store->env, 0);
147     }
148
149     g_free(store);
150 }
151
152 static void bdbstore_submit(gpointer s, BlueSkyStoreAsync *async)
153 {
154     BDBStore *store = (BDBStore *)s;
155     g_return_if_fail(async->status == ASYNC_NEW);
156     g_return_if_fail(async->op != STORE_OP_NONE);
157
158     switch (async->op) {
159     case STORE_OP_GET:
160     case STORE_OP_PUT:
161         async->status = ASYNC_PENDING;
162         bluesky_store_async_ref(async);
163         g_async_queue_push(store->operations, async);
164         break;
165
166     default:
167         g_warning("Uknown operation type for BDBStore: %d\n", async->op);
168         bluesky_store_async_mark_complete(async);
169         break;
170     }
171 }
172
173 static void bdbstore_cleanup(gpointer store, BlueSkyStoreAsync *async)
174 {
175 }
176
177 static BlueSkyStoreImplementation store_impl = {
178     .create = bdbstore_new,
179     .destroy = bdbstore_destroy,
180     .submit = bdbstore_submit,
181     .cleanup = bdbstore_cleanup,
182 };
183
184 void bluesky_store_init_bdb(void)
185 {
186     bluesky_store_register(&store_impl, "bdb");
187 }
188