More work on synchronous/asynchronous operations.
[bluesky.git] / bluesky / store.c
1 /* Blue Sky: File Systems in the Cloud
2  *
3  * Copyright (C) 2009  The Regents of the University of California
4  * Written by Michael Vrable <mvrable@cs.ucsd.edu>
5  *
6  * TODO: Licensing
7  */
8
9 #include <stdint.h>
10 #include <glib.h>
11 #include <string.h>
12
13 #include "bluesky-private.h"
14
15 /* Interaction with cloud storage.  We expose very simple GET/PUT style
16  * interface, which different backends can implement.  Available backends
17  * (will) include Amazon S3 and a simple local store for testing purposes.
18  * Operations may be performed asynchronously. */
19
20 struct _BlueSkyStore {
21     const BlueSkyStoreImplementation *impl;
22     gpointer handle;
23
24     GMutex *lock;
25     GCond *cond_idle;
26     int pending;                /* Count of operations not yet complete. */
27 };
28
29 GHashTable *store_implementations;
30
31 void bluesky_store_register(const BlueSkyStoreImplementation *impl,
32                             const gchar *name)
33 {
34     g_hash_table_insert(store_implementations, g_strdup(name), (gpointer)impl);
35 }
36
37 BlueSkyStore *bluesky_store_new(const gchar *type)
38 {
39     const BlueSkyStoreImplementation *impl;
40
41     impl = g_hash_table_lookup(store_implementations, type);
42     if (impl == NULL)
43         return NULL;
44
45     gpointer handle = impl->create();
46     if (handle == NULL)
47         return NULL;
48
49     BlueSkyStore *store = g_new(BlueSkyStore, 1);
50     store->impl = impl;
51     store->handle = handle;
52     store->lock = g_mutex_new();
53     store->cond_idle = g_cond_new();
54     store->pending = 0;
55     return store;
56 }
57
58 void bluesky_store_free(BlueSkyStore *store)
59 {
60     store->impl->destroy(store->handle);
61     g_free(store);
62 }
63
64 BlueSkyStoreAsync *bluesky_store_async_new(BlueSkyStore *store)
65 {
66     BlueSkyStoreAsync *async;
67
68     async = g_new(BlueSkyStoreAsync, 1);
69     async->store = store;
70     async->lock = g_mutex_new();
71     async->completion_cond = g_cond_new();
72     async->refcount = 1;
73     async->status = ASYNC_NEW;
74     async->op = STORE_OP_NONE;
75     async->key = NULL;
76     async->data = NULL;
77     async->result = -1;
78     async->store_private = NULL;
79
80     return async;
81 }
82
83 void bluesky_store_async_ref(BlueSkyStoreAsync *async)
84 {
85     if (async == NULL)
86         return;
87
88     g_atomic_int_inc(&async->refcount);
89 }
90
91 void bluesky_store_async_unref(BlueSkyStoreAsync *async)
92 {
93     if (async == NULL)
94         return;
95
96     if (g_atomic_int_dec_and_test(&async->refcount)) {
97         async->store->impl->cleanup(async->store->handle, async);
98         g_mutex_free(async->lock);
99         g_cond_free(async->completion_cond);
100         g_free(async->key);
101         bluesky_string_unref(async->data);
102         g_free(async);
103         g_print("Freed async\n");
104     }
105 }
106
107 /* Block until the given operation has completed. */
108 void bluesky_store_async_wait(BlueSkyStoreAsync *async)
109 {
110     g_return_if_fail(async != NULL);
111     g_mutex_lock(async->lock);
112
113     if (async->status == ASYNC_NEW) {
114         g_error("bluesky_store_async_wait on a new async object!\n");
115         g_mutex_unlock(async->lock);
116         return;
117     }
118
119     while (async->status != ASYNC_COMPLETE) {
120         g_cond_wait(async->completion_cond, async->lock);
121     }
122
123     g_mutex_unlock(async->lock);
124 }
125
126 /* Mark an asynchronous operation as complete.  This should only be called by
127  * the store implementations.  The lock must be held when calling this
128  * function. */
129 void bluesky_store_async_mark_complete(BlueSkyStoreAsync *async)
130 {
131     if (async->status != ASYNC_COMPLETE) {
132         g_mutex_lock(async->store->lock);
133         async->store->pending--;
134         if (async->store->pending == 0)
135             g_cond_broadcast(async->store->cond_idle);
136         g_mutex_unlock(async->store->lock);
137     }
138
139     async->status = ASYNC_COMPLETE;
140     g_cond_broadcast(async->completion_cond);
141 }
142
143 void bluesky_store_async_submit(BlueSkyStoreAsync *async)
144 {
145     BlueSkyStore *store = async->store;
146
147     g_mutex_lock(async->store->lock);
148     async->store->pending++;
149     g_mutex_unlock(async->store->lock);
150
151     store->impl->submit(store->handle, async);
152
153     if (bluesky_options.synchronous_stores)
154         bluesky_store_async_wait(async);
155 }
156
157 void bluesky_store_sync(BlueSkyStore *store)
158 {
159     g_mutex_lock(store->lock);
160     g_print("Waiting for pending store operations to complete...\n");
161     while (store->pending > 0) {
162         g_cond_wait(store->cond_idle, store->lock);
163     }
164     g_mutex_unlock(store->lock);
165     g_print("Operations are complete.\n");
166 }
167
168 /* Convenience wrappers that perform a single operation synchronously. */
169 BlueSkyRCStr *bluesky_store_get(BlueSkyStore *store, const gchar *key)
170 {
171     BlueSkyStoreAsync *async = bluesky_store_async_new(store);
172     async->op = STORE_OP_GET;
173     async->key = g_strdup(key);
174     bluesky_store_async_submit(async);
175
176     bluesky_store_async_wait(async);
177
178     BlueSkyRCStr *data = async->data;
179     bluesky_string_ref(data);
180     bluesky_store_async_unref(async);
181     return data;
182 }
183
184 void bluesky_store_put(BlueSkyStore *store,
185                        const gchar *key, BlueSkyRCStr *val)
186 {
187     BlueSkyStoreAsync *async = bluesky_store_async_new(store);
188     async->op = STORE_OP_PUT;
189     async->key = g_strdup(key);
190     bluesky_string_ref(val);
191     async->data = val;
192     bluesky_store_async_submit(async);
193
194     bluesky_store_async_wait(async);
195     bluesky_store_async_unref(async);
196 }
197
198 #if 0
199 /* Simple in-memory data store for test purposes. */
200 typedef struct {
201     GMutex *lock;
202
203     /* TODO: A hashtable isn't optimal for list queries... */
204     GHashTable *store;
205 } MemStore;
206
207 static gpointer memstore_create()
208 {
209     MemStore *store = g_new(MemStore, 1);
210     store->lock = g_mutex_new();
211     store->store = g_hash_table_new_full(g_str_hash, g_str_equal,
212                                          g_free,
213                                          (GDestroyNotify)bluesky_string_unref);
214
215     return (gpointer)store;
216 }
217
218 static void memstore_destroy(gpointer store)
219 {
220     /* TODO */
221 }
222
223 static BlueSkyRCStr *memstore_get(gpointer st, const gchar *key)
224 {
225     MemStore *store = (MemStore *)st;
226     BlueSkyRCStr *s = g_hash_table_lookup(store->store, key);
227     if (s != NULL)
228         bluesky_string_ref(s);
229     return s;
230 }
231
232 static void memstore_put(gpointer s, const gchar *key, BlueSkyRCStr *val)
233 {
234     MemStore *store = (MemStore *)s;
235     bluesky_string_ref(val);
236     g_hash_table_insert(store->store, g_strdup(key), val);
237 }
238
239 static BlueSkyStoreImplementation memstore_impl = {
240     .create = memstore_create,
241     .destroy = memstore_destroy,
242     .get = memstore_get,
243     .put = memstore_put,
244 };
245
246 /* Store implementation which writes data as files to disk. */
247 static gpointer filestore_create()
248 {
249     return GINT_TO_POINTER(1);
250 }
251
252 static void filestore_destroy()
253 {
254 }
255
256 static BlueSkyRCStr *filestore_get(gpointer s, const gchar *key)
257 {
258     gchar *contents = NULL;
259     gsize length;
260     GError *error = NULL;
261
262     g_file_get_contents(key, &contents, &length, &error);
263     if (contents == NULL)
264         return NULL;
265
266     return bluesky_string_new(contents, length);
267 }
268
269 static void filestore_put(gpointer s, const gchar *key, BlueSkyRCStr *val)
270 {
271     g_file_set_contents(key, val->data, val->len, NULL);
272 }
273
274 static BlueSkyStoreImplementation filestore_impl = {
275     .create = filestore_create,
276     .destroy = filestore_destroy,
277     .get = filestore_get,
278     .put = filestore_put,
279 };
280 #endif
281
282 void bluesky_store_init()
283 {
284     store_implementations = g_hash_table_new(g_str_hash, g_str_equal);
285     //bluesky_store_register(&memstore_impl, "mem");
286     //bluesky_store_register(&filestore_impl, "file");
287 }