Add support for byterange requests in the storage layer.
[bluesky.git] / bluesky / store.c
1 /* Blue Sky: File Systems in the Cloud
2  *
3  * Copyright (C) 2009  The Regents of the University of California
4  * Written by Michael Vrable <mvrable@cs.ucsd.edu>
5  *
6  * TODO: Licensing
7  */
8
9 #include <stdint.h>
10 #include <glib.h>
11 #include <string.h>
12
13 #include "bluesky-private.h"
14
15 /* Interaction with cloud storage.  We expose very simple GET/PUT style
16  * interface, which different backends can implement.  Available backends
17  * (will) include Amazon S3 and a simple local store for testing purposes.
18  * Operations may be performed asynchronously. */
19
20 struct BlueSkyStore {
21     const BlueSkyStoreImplementation *impl;
22     gpointer handle;
23
24     GMutex *lock;
25     GCond *cond_idle;
26     int pending;                /* Count of operations not yet complete. */
27
28     struct bluesky_stats *stats_get, *stats_put;
29 };
30
31 GHashTable *store_implementations;
32
33 /* Thread pool for calling notifier functions when an operation completes.
34  * These are called in a separate thread for locking reasons: we want to call
35  * the notifiers without the lock on the async object held, but completion
36  * occurs when the lock is held--so we need some way to defer the call.  This
37  * isn't really optimal from a cache-locality standpoint. */
38 static GThreadPool *notifier_thread_pool;
39
40 void bluesky_store_register(const BlueSkyStoreImplementation *impl,
41                             const gchar *name)
42 {
43     g_hash_table_insert(store_implementations, g_strdup(name), (gpointer)impl);
44 }
45
46 BlueSkyStore *bluesky_store_new(const gchar *type)
47 {
48     const BlueSkyStoreImplementation *impl;
49
50     gchar *scheme, *path;
51     scheme = g_strdup(type);
52     path = strchr(scheme, ':');
53     if (path != NULL) {
54         *path = '\0';
55         path++;
56     }
57
58     impl = g_hash_table_lookup(store_implementations, scheme);
59     if (impl == NULL) {
60         g_free(scheme);
61         return NULL;
62     }
63
64     gpointer handle = impl->create(path);
65     if (handle == NULL) {
66         g_free(scheme);
67         return NULL;
68     }
69
70     BlueSkyStore *store = g_new(BlueSkyStore, 1);
71     store->impl = impl;
72     store->handle = handle;
73     store->lock = g_mutex_new();
74     store->cond_idle = g_cond_new();
75     store->pending = 0;
76     store->stats_get = bluesky_stats_new(g_strdup_printf("Store[%s]: GETS",
77                                                          type));
78     store->stats_put = bluesky_stats_new(g_strdup_printf("Store[%s]: PUTS",
79                                                          type));
80     g_free(scheme);
81     return store;
82 }
83
84 void bluesky_store_free(BlueSkyStore *store)
85 {
86     store->impl->destroy(store->handle);
87     g_free(store);
88 }
89
90 char *bluesky_store_lookup_last(BlueSkyStore *store, const char *prefix)
91 {
92     return store->impl->lookup_last(store->handle, prefix);
93 }
94
95 BlueSkyStoreAsync *bluesky_store_async_new(BlueSkyStore *store)
96 {
97     BlueSkyStoreAsync *async;
98
99     async = g_new(BlueSkyStoreAsync, 1);
100     async->store = store;
101     async->lock = g_mutex_new();
102     async->completion_cond = g_cond_new();
103     async->refcount = 1;
104     async->status = ASYNC_NEW;
105     async->op = STORE_OP_NONE;
106     async->key = NULL;
107     async->data = NULL;
108     async->start = async->len = 0;
109     async->range_done = FALSE;
110     async->result = -1;
111     async->notifiers = NULL;
112     async->notifier_count = 0;
113     async->barrier = NULL;
114     async->store_private = NULL;
115
116     return async;
117 }
118
119 gpointer bluesky_store_async_get_handle(BlueSkyStoreAsync *async)
120 {
121     return async->store->handle;
122 }
123
124 void bluesky_store_async_ref(BlueSkyStoreAsync *async)
125 {
126     if (async == NULL)
127         return;
128
129     g_return_if_fail(g_atomic_int_get(&async->refcount) > 0);
130
131     g_atomic_int_inc(&async->refcount);
132 }
133
134 void bluesky_store_async_unref(BlueSkyStoreAsync *async)
135 {
136     if (async == NULL)
137         return;
138
139     if (g_atomic_int_dec_and_test(&async->refcount)) {
140         async->store->impl->cleanup(async->store->handle, async);
141         g_mutex_free(async->lock);
142         g_cond_free(async->completion_cond);
143         g_free(async->key);
144         bluesky_string_unref(async->data);
145         g_free(async);
146     }
147 }
148
149 /* Block until the given operation has completed. */
150 void bluesky_store_async_wait(BlueSkyStoreAsync *async)
151 {
152     g_return_if_fail(async != NULL);
153     g_mutex_lock(async->lock);
154
155     if (async->status == ASYNC_NEW) {
156         g_error("bluesky_store_async_wait on a new async object!\n");
157         g_mutex_unlock(async->lock);
158         return;
159     }
160
161     while (async->status != ASYNC_COMPLETE
162            || g_atomic_int_get(&async->notifier_count) > 0) {
163         g_cond_wait(async->completion_cond, async->lock);
164     }
165
166     g_mutex_unlock(async->lock);
167 }
168
169 /* Add a notifier function to be called when the operation completes. */
170 void bluesky_store_async_add_notifier(BlueSkyStoreAsync *async,
171                                       GFunc func, gpointer user_data)
172 {
173     struct BlueSkyNotifierList *nl = g_new(struct BlueSkyNotifierList, 1);
174     g_mutex_lock(async->lock);
175     nl->next = async->notifiers;
176     nl->func = func;
177     nl->async = async; bluesky_store_async_ref(async);
178     nl->user_data = user_data;
179     g_atomic_int_inc(&async->notifier_count);
180     if (async->status == ASYNC_COMPLETE) {
181         g_thread_pool_push(notifier_thread_pool, nl, NULL);
182     } else {
183         async->notifiers = nl;
184     }
185     g_mutex_unlock(async->lock);
186 }
187
188 static void op_complete(gpointer a, gpointer b)
189 {
190     BlueSkyStoreAsync *barrier = (BlueSkyStoreAsync *)b;
191
192     bluesky_store_async_ref(barrier);
193     g_mutex_lock(barrier->lock);
194     barrier->store_private
195         = GINT_TO_POINTER(GPOINTER_TO_INT(barrier->store_private) - 1);
196     if (GPOINTER_TO_INT(barrier->store_private) == 0
197             && barrier->status != ASYNC_NEW) {
198         bluesky_store_async_mark_complete(barrier);
199     }
200     g_mutex_unlock(barrier->lock);
201     bluesky_store_async_unref(barrier);
202 }
203
204 /* Mark an asynchronous operation as complete.  This should only be called by
205  * the store implementations.  The lock should be held when calling this
206  * function.  Any notifier functions will be called, but in a separate thread
207  * and without the lock held. */
208 void bluesky_store_async_mark_complete(BlueSkyStoreAsync *async)
209 {
210     g_return_if_fail(async->status != ASYNC_COMPLETE);
211
212     bluesky_time_hires elapsed = bluesky_now_hires() - async->start_time;
213     bluesky_time_hires latency = bluesky_now_hires() - async->exec_time;
214
215     if (async->op != STORE_OP_BARRIER) {
216         g_mutex_lock(async->store->lock);
217         async->store->pending--;
218         if (async->store->pending == 0)
219             g_cond_broadcast(async->store->cond_idle);
220         g_mutex_unlock(async->store->lock);
221     }
222
223     /* If the request was a range request but the backend read the entire
224      * object, select out the appropriate bytes. */
225     if (async->op == STORE_OP_GET
226             && !async->range_done
227             && async->result == 0
228             && async->data != NULL) {
229         if (async->start != 0 || async->len != 0) {
230             /* If the caller requesteda read outside the object, return an
231              * error. */
232             if (async->start + async->len > async->data->len) {
233                 g_warning("Range request outside object boundaries!\n");
234                 async->result = -1;
235             } else {
236                 if (async->len == 0)
237                     async->len = async->data->len - async->start;
238                 BlueSkyRCStr *newstr = bluesky_string_new(g_memdup(&async->data->data[async->start], async->len), async->len);
239                 bluesky_string_unref(async->data);
240                 async->data = newstr;
241                 async->range_done = TRUE;
242             }
243         }
244     }
245
246     async->status = ASYNC_COMPLETE;
247     g_cond_broadcast(async->completion_cond);
248
249     if (async->barrier != NULL && async->notifiers == NULL)
250         op_complete(async, async->barrier);
251
252     while (async->notifiers != NULL) {
253         struct BlueSkyNotifierList *nl = async->notifiers;
254         async->notifiers = nl->next;
255         g_thread_pool_push(notifier_thread_pool, nl, NULL);
256     }
257
258     if (bluesky_verbose) {
259         g_log("bluesky/store", G_LOG_LEVEL_DEBUG,
260               "[%p] complete: elapsed = %"PRIi64" ns, latency = %"PRIi64" ns",
261               async, elapsed, latency);
262     }
263
264     if (async->data) {
265         if (async->op == STORE_OP_GET) {
266             bluesky_stats_add(async->store->stats_get, async->data->len);
267         } else if (async->op == STORE_OP_PUT) {
268             bluesky_stats_add(async->store->stats_put, async->data->len);
269         }
270     }
271 }
272
273 void bluesky_store_async_submit(BlueSkyStoreAsync *async)
274 {
275     BlueSkyStore *store = async->store;
276
277     async->start_time = bluesky_now_hires();
278
279     // Backends should fill this in with a better estimate of the actual time
280     // processing was started, if there could be a delay from submission time.
281     async->exec_time = bluesky_now_hires();
282
283     if (bluesky_verbose) {
284         g_log("bluesky/store", G_LOG_LEVEL_DEBUG, "[%p] submit: %s %s",
285               async,
286               async->op == STORE_OP_GET ? "GET"
287                 : async->op == STORE_OP_PUT ? "PUT"
288                 : async->op == STORE_OP_DELETE ? "DELETE"
289                 : async->op == STORE_OP_BARRIER ? "BARRIER" : "???",
290               async->key);
291     }
292
293     /* Barriers are handled specially, and not handed down the storage
294      * implementation layer. */
295     if (async->op == STORE_OP_BARRIER) {
296         async->status = ASYNC_RUNNING;
297         if (GPOINTER_TO_INT(async->store_private) == 0)
298             bluesky_store_async_mark_complete(async);
299         return;
300     }
301
302     g_mutex_lock(async->store->lock);
303     async->store->pending++;
304     g_mutex_unlock(async->store->lock);
305
306     store->impl->submit(store->handle, async);
307
308     if (bluesky_options.synchronous_stores)
309         bluesky_store_async_wait(async);
310 }
311
312 /* Add the given operation to the barrier.  The barrier will not complete until
313  * all operations added to it have completed. */
314 void bluesky_store_add_barrier(BlueSkyStoreAsync *barrier,
315                                BlueSkyStoreAsync *async)
316 {
317     g_return_if_fail(barrier->op == STORE_OP_BARRIER);
318
319     g_mutex_lock(barrier->lock);
320     barrier->store_private
321         = GINT_TO_POINTER(GPOINTER_TO_INT(barrier->store_private) + 1);
322     g_mutex_unlock(barrier->lock);
323
324     g_mutex_lock(async->lock);
325     if (async->barrier == NULL && async->status != ASYNC_COMPLETE) {
326         async->barrier = barrier;
327         g_mutex_unlock(async->lock);
328     } else {
329         if (async->barrier != NULL)
330             g_warning("Adding async to more than one barrier!\n");
331         g_mutex_unlock(async->lock);
332         bluesky_store_async_add_notifier(async, op_complete, barrier);
333     }
334 }
335
336 static void notifier_task(gpointer n, gpointer s)
337 {
338     struct BlueSkyNotifierList *notifier = (struct BlueSkyNotifierList *)n;
339
340     notifier->func(notifier->async, notifier->user_data);
341     if (g_atomic_int_dec_and_test(&notifier->async->notifier_count)) {
342         g_mutex_lock(notifier->async->lock);
343         if (notifier->async->barrier != NULL)
344             op_complete(notifier->async, notifier->async->barrier);
345         g_cond_broadcast(notifier->async->completion_cond);
346         g_mutex_unlock(notifier->async->lock);
347     }
348     bluesky_store_async_unref(notifier->async);
349     g_free(notifier);
350 }
351
352 void bluesky_store_sync(BlueSkyStore *store)
353 {
354     g_mutex_lock(store->lock);
355     if (bluesky_verbose) {
356         g_log("bluesky/store", G_LOG_LEVEL_DEBUG,
357               "Waiting for pending store operations to complete...");
358     }
359     while (store->pending > 0) {
360         g_cond_wait(store->cond_idle, store->lock);
361     }
362     g_mutex_unlock(store->lock);
363     if (bluesky_verbose) {
364         g_log("bluesky/store", G_LOG_LEVEL_DEBUG, "Operations are complete.");
365     }
366 }
367
368 /* Convenience wrappers that perform a single operation synchronously. */
369 BlueSkyRCStr *bluesky_store_get(BlueSkyStore *store, const gchar *key)
370 {
371     BlueSkyStoreAsync *async = bluesky_store_async_new(store);
372     async->op = STORE_OP_GET;
373     async->key = g_strdup(key);
374     bluesky_store_async_submit(async);
375
376     bluesky_store_async_wait(async);
377
378     BlueSkyRCStr *data = async->data;
379     bluesky_string_ref(data);
380     bluesky_store_async_unref(async);
381     return data;
382 }
383
384 void bluesky_store_put(BlueSkyStore *store,
385                        const gchar *key, BlueSkyRCStr *val)
386 {
387     BlueSkyStoreAsync *async = bluesky_store_async_new(store);
388     async->op = STORE_OP_PUT;
389     async->key = g_strdup(key);
390     bluesky_string_ref(val);
391     async->data = val;
392     bluesky_store_async_submit(async);
393
394     bluesky_store_async_wait(async);
395     bluesky_store_async_unref(async);
396 }
397
398 /* Simple in-memory data store for test purposes. */
399 typedef struct {
400     GMutex *lock;
401
402     /* TODO: A hashtable isn't optimal for list queries... */
403     GHashTable *store;
404 } MemStore;
405
406 static gpointer memstore_create(const gchar *path)
407 {
408     MemStore *store = g_new(MemStore, 1);
409     store->lock = g_mutex_new();
410     store->store = g_hash_table_new_full(g_str_hash, g_str_equal,
411                                          g_free,
412                                          (GDestroyNotify)bluesky_string_unref);
413
414     return (gpointer)store;
415 }
416
417 static void memstore_destroy(gpointer store)
418 {
419     /* TODO */
420 }
421
422 static BlueSkyRCStr *memstore_get(gpointer st, const gchar *key)
423 {
424     MemStore *store = (MemStore *)st;
425     BlueSkyRCStr *s = g_hash_table_lookup(store->store, key);
426     if (s != NULL)
427         bluesky_string_ref(s);
428     return s;
429 }
430
431 static void memstore_put(gpointer s, const gchar *key, BlueSkyRCStr *val)
432 {
433     MemStore *store = (MemStore *)s;
434     bluesky_string_ref(val);
435     g_hash_table_insert(store->store, g_strdup(key), val);
436 }
437
438 static void memstore_submit(gpointer s, BlueSkyStoreAsync *async)
439 {
440     g_return_if_fail(async->status == ASYNC_NEW);
441     g_return_if_fail(async->op != STORE_OP_NONE);
442
443     switch (async->op) {
444     case STORE_OP_GET:
445         async->data = memstore_get(s, async->key);
446         break;
447
448     case STORE_OP_PUT:
449         memstore_put(s, async->key, async->data);
450         break;
451
452     default:
453         g_warning("Uknown operation type for MemStore: %d\n", async->op);
454         return;
455     }
456
457     bluesky_store_async_mark_complete(async);
458 }
459
460 static void memstore_cleanup(gpointer store, BlueSkyStoreAsync *async)
461 {
462 }
463
464 static BlueSkyStoreImplementation memstore_impl = {
465     .create = memstore_create,
466     .destroy = memstore_destroy,
467     .submit = memstore_submit,
468     .cleanup = memstore_cleanup,
469 };
470
471 /* Store implementation which writes data as files to disk. */
472 static gpointer filestore_create(const gchar *path)
473 {
474     return GINT_TO_POINTER(1);
475 }
476
477 static void filestore_destroy()
478 {
479 }
480
481 static BlueSkyRCStr *filestore_get(const gchar *key)
482 {
483     gchar *contents = NULL;
484     gsize length;
485     GError *error = NULL;
486
487     g_file_get_contents(key, &contents, &length, &error);
488     if (contents == NULL)
489         return NULL;
490
491     return bluesky_string_new(contents, length);
492 }
493
494 static void filestore_put(const gchar *key, BlueSkyRCStr *val)
495 {
496     g_file_set_contents(key, val->data, val->len, NULL);
497 }
498
499 static void filestore_submit(gpointer s, BlueSkyStoreAsync *async)
500 {
501     g_return_if_fail(async->status == ASYNC_NEW);
502     g_return_if_fail(async->op != STORE_OP_NONE);
503
504     switch (async->op) {
505     case STORE_OP_GET:
506         async->data = filestore_get(async->key);
507         async->result = 0;
508         break;
509
510     case STORE_OP_PUT:
511         filestore_put(async->key, async->data);
512         async->result = 0;
513         break;
514
515     default:
516         g_warning("Uknown operation type for FileStore: %d\n", async->op);
517         return;
518     }
519
520     bluesky_store_async_mark_complete(async);
521 }
522
523 static void filestore_cleanup(gpointer store, BlueSkyStoreAsync *async)
524 {
525 }
526
527 static char *filestore_lookup_last(gpointer store, const char *prefix)
528 {
529     char *last = NULL;
530     GDir *dir = g_dir_open(".", 0, NULL);
531     if (dir == NULL) {
532         g_warning("Unable to open directory for listing");
533         return NULL;
534     }
535
536     const gchar *file;
537     while ((file = g_dir_read_name(dir)) != NULL) {
538         if (strncmp(file, prefix, strlen(prefix)) == 0) {
539             if (last == NULL || strcmp(file, last) > 0) {
540                 g_free(last);
541                 last = g_strdup(file);
542             }
543         }
544     }
545     g_dir_close(dir);
546
547     return last;
548 }
549
550 static BlueSkyStoreImplementation filestore_impl = {
551     .create = filestore_create,
552     .destroy = filestore_destroy,
553     .submit = filestore_submit,
554     .cleanup = filestore_cleanup,
555     .lookup_last = filestore_lookup_last,
556 };
557
558 /* A store implementation which simply discards all data, for testing. */
559 static gpointer nullstore_create(const gchar *path)
560 {
561     return (gpointer)nullstore_create;
562 }
563
564 static void nullstore_destroy(gpointer store)
565 {
566 }
567
568 static void nullstore_submit(gpointer s, BlueSkyStoreAsync *async)
569 {
570     bluesky_store_async_mark_complete(async);
571 }
572
573 static void nullstore_cleanup(gpointer store, BlueSkyStoreAsync *async)
574 {
575 }
576
577 static BlueSkyStoreImplementation nullstore_impl = {
578     .create = nullstore_create,
579     .destroy = nullstore_destroy,
580     .submit = nullstore_submit,
581     .cleanup = nullstore_cleanup,
582 };
583
584 void bluesky_store_init()
585 {
586     store_implementations = g_hash_table_new(g_str_hash, g_str_equal);
587     notifier_thread_pool = g_thread_pool_new(notifier_task, NULL,
588                                              bluesky_max_threads, FALSE, NULL);
589     bluesky_store_register(&memstore_impl, "mem");
590     bluesky_store_register(&filestore_impl, "file");
591     bluesky_store_register(&nullstore_impl, "null");
592 }