Drop another debugging message in non-verbose mode
[bluesky.git] / bluesky / store.c
1 /* Blue Sky: File Systems in the Cloud
2  *
3  * Copyright (C) 2009  The Regents of the University of California
4  * Written by Michael Vrable <mvrable@cs.ucsd.edu>
5  *
6  * TODO: Licensing
7  */
8
9 #include <stdint.h>
10 #include <glib.h>
11 #include <string.h>
12
13 #include "bluesky-private.h"
14
15 /* Interaction with cloud storage.  We expose very simple GET/PUT style
16  * interface, which different backends can implement.  Available backends
17  * (will) include Amazon S3 and a simple local store for testing purposes.
18  * Operations may be performed asynchronously. */
19
20 struct BlueSkyStore {
21     const BlueSkyStoreImplementation *impl;
22     gpointer handle;
23
24     GMutex *lock;
25     GCond *cond_idle;
26     int pending;                /* Count of operations not yet complete. */
27
28     struct bluesky_stats *stats_get, *stats_put;
29 };
30
31 GHashTable *store_implementations;
32
33 /* Thread pool for calling notifier functions when an operation completes.
34  * These are called in a separate thread for locking reasons: we want to call
35  * the notifiers without the lock on the async object held, but completion
36  * occurs when the lock is held--so we need some way to defer the call.  This
37  * isn't really optimal from a cache-locality standpoint. */
38 static GThreadPool *notifier_thread_pool;
39
40 void bluesky_store_register(const BlueSkyStoreImplementation *impl,
41                             const gchar *name)
42 {
43     g_hash_table_insert(store_implementations, g_strdup(name), (gpointer)impl);
44 }
45
46 BlueSkyStore *bluesky_store_new(const gchar *type)
47 {
48     const BlueSkyStoreImplementation *impl;
49
50     gchar *scheme, *path;
51     scheme = g_strdup(type);
52     path = strchr(scheme, ':');
53     if (path != NULL) {
54         *path = '\0';
55         path++;
56     }
57
58     impl = g_hash_table_lookup(store_implementations, scheme);
59     if (impl == NULL) {
60         g_free(scheme);
61         return NULL;
62     }
63
64     gpointer handle = impl->create(path);
65     if (handle == NULL) {
66         g_free(scheme);
67         return NULL;
68     }
69
70     BlueSkyStore *store = g_new(BlueSkyStore, 1);
71     store->impl = impl;
72     store->handle = handle;
73     store->lock = g_mutex_new();
74     store->cond_idle = g_cond_new();
75     store->pending = 0;
76     store->stats_get = bluesky_stats_new(g_strdup_printf("Store[%s]: GETS",
77                                                          type));
78     store->stats_put = bluesky_stats_new(g_strdup_printf("Store[%s]: PUTS",
79                                                          type));
80     g_free(scheme);
81     return store;
82 }
83
84 void bluesky_store_free(BlueSkyStore *store)
85 {
86     store->impl->destroy(store->handle);
87     g_free(store);
88 }
89
90 char *bluesky_store_lookup_last(BlueSkyStore *store, const char *prefix)
91 {
92     return store->impl->lookup_last(store->handle, prefix);
93 }
94
95 BlueSkyStoreAsync *bluesky_store_async_new(BlueSkyStore *store)
96 {
97     BlueSkyStoreAsync *async;
98
99     async = g_new(BlueSkyStoreAsync, 1);
100     async->store = store;
101     async->lock = g_mutex_new();
102     async->completion_cond = g_cond_new();
103     async->refcount = 1;
104     async->status = ASYNC_NEW;
105     async->op = STORE_OP_NONE;
106     async->key = NULL;
107     async->data = NULL;
108     async->start = async->len = 0;
109     async->range_done = FALSE;
110     async->result = -1;
111     async->notifiers = NULL;
112     async->notifier_count = 0;
113     async->barrier = NULL;
114     async->store_private = NULL;
115     async->profile = NULL;
116
117     return async;
118 }
119
120 gpointer bluesky_store_async_get_handle(BlueSkyStoreAsync *async)
121 {
122     return async->store->handle;
123 }
124
125 void bluesky_store_async_ref(BlueSkyStoreAsync *async)
126 {
127     if (async == NULL)
128         return;
129
130     g_return_if_fail(g_atomic_int_get(&async->refcount) > 0);
131
132     g_atomic_int_inc(&async->refcount);
133 }
134
135 void bluesky_store_async_unref(BlueSkyStoreAsync *async)
136 {
137     if (async == NULL)
138         return;
139
140     if (g_atomic_int_dec_and_test(&async->refcount)) {
141         async->store->impl->cleanup(async->store->handle, async);
142         g_mutex_free(async->lock);
143         g_cond_free(async->completion_cond);
144         g_free(async->key);
145         bluesky_string_unref(async->data);
146         g_free(async);
147     }
148 }
149
150 /* Block until the given operation has completed. */
151 void bluesky_store_async_wait(BlueSkyStoreAsync *async)
152 {
153     g_return_if_fail(async != NULL);
154     g_mutex_lock(async->lock);
155
156     if (async->status == ASYNC_NEW) {
157         g_error("bluesky_store_async_wait on a new async object!\n");
158         g_mutex_unlock(async->lock);
159         return;
160     }
161
162     while (async->status != ASYNC_COMPLETE
163            || g_atomic_int_get(&async->notifier_count) > 0) {
164         g_cond_wait(async->completion_cond, async->lock);
165     }
166
167     g_mutex_unlock(async->lock);
168 }
169
170 /* Add a notifier function to be called when the operation completes. */
171 void bluesky_store_async_add_notifier(BlueSkyStoreAsync *async,
172                                       GFunc func, gpointer user_data)
173 {
174     struct BlueSkyNotifierList *nl = g_new(struct BlueSkyNotifierList, 1);
175     g_mutex_lock(async->lock);
176     nl->next = async->notifiers;
177     nl->func = func;
178     nl->async = async; bluesky_store_async_ref(async);
179     nl->user_data = user_data;
180     g_atomic_int_inc(&async->notifier_count);
181     if (async->status == ASYNC_COMPLETE) {
182         g_thread_pool_push(notifier_thread_pool, nl, NULL);
183     } else {
184         async->notifiers = nl;
185     }
186     g_mutex_unlock(async->lock);
187 }
188
189 static void op_complete(gpointer a, gpointer b)
190 {
191     BlueSkyStoreAsync *barrier = (BlueSkyStoreAsync *)b;
192
193     bluesky_store_async_ref(barrier);
194     g_mutex_lock(barrier->lock);
195     barrier->store_private
196         = GINT_TO_POINTER(GPOINTER_TO_INT(barrier->store_private) - 1);
197     if (GPOINTER_TO_INT(barrier->store_private) == 0
198             && barrier->status != ASYNC_NEW) {
199         bluesky_store_async_mark_complete(barrier);
200     }
201     g_mutex_unlock(barrier->lock);
202     bluesky_store_async_unref(barrier);
203 }
204
205 /* Mark an asynchronous operation as complete.  This should only be called by
206  * the store implementations.  The lock should be held when calling this
207  * function.  Any notifier functions will be called, but in a separate thread
208  * and without the lock held. */
209 void bluesky_store_async_mark_complete(BlueSkyStoreAsync *async)
210 {
211     g_return_if_fail(async->status != ASYNC_COMPLETE);
212
213     bluesky_time_hires elapsed = bluesky_now_hires() - async->start_time;
214     bluesky_time_hires latency = bluesky_now_hires() - async->exec_time;
215
216     if (async->op != STORE_OP_BARRIER) {
217         g_mutex_lock(async->store->lock);
218         async->store->pending--;
219         if (async->store->pending == 0)
220             g_cond_broadcast(async->store->cond_idle);
221         g_mutex_unlock(async->store->lock);
222     }
223
224     /* If the request was a range request but the backend read the entire
225      * object, select out the appropriate bytes. */
226     if (async->op == STORE_OP_GET
227             && !async->range_done
228             && async->result == 0
229             && async->data != NULL) {
230         if (async->start != 0 || async->len != 0) {
231             /* If the caller requesteda read outside the object, return an
232              * error. */
233             if (async->start + async->len > async->data->len) {
234                 g_warning("Range request outside object boundaries!\n");
235                 async->result = -1;
236             } else {
237                 if (async->len == 0)
238                     async->len = async->data->len - async->start;
239                 BlueSkyRCStr *newstr = bluesky_string_new(g_memdup(&async->data->data[async->start], async->len), async->len);
240                 bluesky_string_unref(async->data);
241                 async->data = newstr;
242                 async->range_done = TRUE;
243             }
244         }
245     }
246
247     async->status = ASYNC_COMPLETE;
248     g_cond_broadcast(async->completion_cond);
249
250     if (async->barrier != NULL && async->notifiers == NULL)
251         op_complete(async, async->barrier);
252
253     while (async->notifiers != NULL) {
254         struct BlueSkyNotifierList *nl = async->notifiers;
255         async->notifiers = nl->next;
256         g_thread_pool_push(notifier_thread_pool, nl, NULL);
257     }
258
259     if (async->profile) {
260         bluesky_profile_add_event(
261             async->profile,
262             g_strdup_printf("%s for %s complete",
263                             async->op == STORE_OP_GET ? "GET"
264                             : async->op == STORE_OP_PUT ? "PUT"
265                             : async->op == STORE_OP_DELETE ? "DELETE"
266                             : async->op == STORE_OP_BARRIER ? "BARRIER" : "???",
267                             async->key)
268         );
269     }
270
271     if (bluesky_verbose) {
272         g_log("bluesky/store", G_LOG_LEVEL_DEBUG,
273               "[%p] complete: elapsed = %"PRIi64" ns, latency = %"PRIi64" ns",
274               async, elapsed, latency);
275     }
276
277     if (async->data) {
278         if (async->op == STORE_OP_GET) {
279             bluesky_stats_add(async->store->stats_get, async->data->len);
280         } else if (async->op == STORE_OP_PUT) {
281             bluesky_stats_add(async->store->stats_put, async->data->len);
282         }
283     }
284 }
285
286 void bluesky_store_async_submit(BlueSkyStoreAsync *async)
287 {
288     BlueSkyStore *store = async->store;
289
290     async->start_time = bluesky_now_hires();
291
292     // Backends should fill this in with a better estimate of the actual time
293     // processing was started, if there could be a delay from submission time.
294     async->exec_time = bluesky_now_hires();
295
296     if (async->profile) {
297         bluesky_profile_add_event(
298             async->profile,
299             g_strdup_printf("Start %s for %s",
300                             async->op == STORE_OP_GET ? "GET"
301                             : async->op == STORE_OP_PUT ? "PUT"
302                             : async->op == STORE_OP_DELETE ? "DELETE"
303                             : async->op == STORE_OP_BARRIER ? "BARRIER" : "???",
304                             async->key)
305         );
306     }
307
308     if (bluesky_verbose) {
309         g_log("bluesky/store", G_LOG_LEVEL_DEBUG, "[%p] submit: %s %s",
310               async,
311               async->op == STORE_OP_GET ? "GET"
312                 : async->op == STORE_OP_PUT ? "PUT"
313                 : async->op == STORE_OP_DELETE ? "DELETE"
314                 : async->op == STORE_OP_BARRIER ? "BARRIER" : "???",
315               async->key);
316     }
317
318     /* Barriers are handled specially, and not handed down the storage
319      * implementation layer. */
320     if (async->op == STORE_OP_BARRIER) {
321         async->status = ASYNC_RUNNING;
322         if (GPOINTER_TO_INT(async->store_private) == 0)
323             bluesky_store_async_mark_complete(async);
324         return;
325     }
326
327     g_mutex_lock(async->store->lock);
328     async->store->pending++;
329     g_mutex_unlock(async->store->lock);
330
331     store->impl->submit(store->handle, async);
332
333     if (bluesky_options.synchronous_stores)
334         bluesky_store_async_wait(async);
335 }
336
337 /* Add the given operation to the barrier.  The barrier will not complete until
338  * all operations added to it have completed. */
339 void bluesky_store_add_barrier(BlueSkyStoreAsync *barrier,
340                                BlueSkyStoreAsync *async)
341 {
342     g_return_if_fail(barrier->op == STORE_OP_BARRIER);
343
344     g_mutex_lock(barrier->lock);
345     barrier->store_private
346         = GINT_TO_POINTER(GPOINTER_TO_INT(barrier->store_private) + 1);
347     g_mutex_unlock(barrier->lock);
348
349     g_mutex_lock(async->lock);
350     if (async->barrier == NULL && async->status != ASYNC_COMPLETE) {
351         async->barrier = barrier;
352         g_mutex_unlock(async->lock);
353     } else {
354         if (async->barrier != NULL)
355             g_warning("Adding async to more than one barrier!\n");
356         g_mutex_unlock(async->lock);
357         bluesky_store_async_add_notifier(async, op_complete, barrier);
358     }
359 }
360
361 static void notifier_task(gpointer n, gpointer s)
362 {
363     struct BlueSkyNotifierList *notifier = (struct BlueSkyNotifierList *)n;
364
365     notifier->func(notifier->async, notifier->user_data);
366     if (g_atomic_int_dec_and_test(&notifier->async->notifier_count)) {
367         g_mutex_lock(notifier->async->lock);
368         if (notifier->async->barrier != NULL)
369             op_complete(notifier->async, notifier->async->barrier);
370         g_cond_broadcast(notifier->async->completion_cond);
371         g_mutex_unlock(notifier->async->lock);
372     }
373     bluesky_store_async_unref(notifier->async);
374     g_free(notifier);
375 }
376
377 void bluesky_store_sync(BlueSkyStore *store)
378 {
379     g_mutex_lock(store->lock);
380     if (bluesky_verbose) {
381         g_log("bluesky/store", G_LOG_LEVEL_DEBUG,
382               "Waiting for pending store operations to complete...");
383     }
384     while (store->pending > 0) {
385         g_cond_wait(store->cond_idle, store->lock);
386     }
387     g_mutex_unlock(store->lock);
388     if (bluesky_verbose) {
389         g_log("bluesky/store", G_LOG_LEVEL_DEBUG, "Operations are complete.");
390     }
391 }
392
393 /* Convenience wrappers that perform a single operation synchronously. */
394 BlueSkyRCStr *bluesky_store_get(BlueSkyStore *store, const gchar *key)
395 {
396     BlueSkyStoreAsync *async = bluesky_store_async_new(store);
397     async->op = STORE_OP_GET;
398     async->key = g_strdup(key);
399     bluesky_store_async_submit(async);
400
401     bluesky_store_async_wait(async);
402
403     BlueSkyRCStr *data = async->data;
404     bluesky_string_ref(data);
405     bluesky_store_async_unref(async);
406     return data;
407 }
408
409 void bluesky_store_put(BlueSkyStore *store,
410                        const gchar *key, BlueSkyRCStr *val)
411 {
412     BlueSkyStoreAsync *async = bluesky_store_async_new(store);
413     async->op = STORE_OP_PUT;
414     async->key = g_strdup(key);
415     bluesky_string_ref(val);
416     async->data = val;
417     bluesky_store_async_submit(async);
418
419     bluesky_store_async_wait(async);
420     bluesky_store_async_unref(async);
421 }
422
423 /* Simple in-memory data store for test purposes. */
424 typedef struct {
425     GMutex *lock;
426
427     /* TODO: A hashtable isn't optimal for list queries... */
428     GHashTable *store;
429 } MemStore;
430
431 static gpointer memstore_create(const gchar *path)
432 {
433     MemStore *store = g_new(MemStore, 1);
434     store->lock = g_mutex_new();
435     store->store = g_hash_table_new_full(g_str_hash, g_str_equal,
436                                          g_free,
437                                          (GDestroyNotify)bluesky_string_unref);
438
439     return (gpointer)store;
440 }
441
442 static void memstore_destroy(gpointer store)
443 {
444     /* TODO */
445 }
446
447 static BlueSkyRCStr *memstore_get(gpointer st, const gchar *key)
448 {
449     MemStore *store = (MemStore *)st;
450     BlueSkyRCStr *s = g_hash_table_lookup(store->store, key);
451     if (s != NULL)
452         bluesky_string_ref(s);
453     return s;
454 }
455
456 static void memstore_put(gpointer s, const gchar *key, BlueSkyRCStr *val)
457 {
458     MemStore *store = (MemStore *)s;
459     bluesky_string_ref(val);
460     g_hash_table_insert(store->store, g_strdup(key), val);
461 }
462
463 static void memstore_submit(gpointer s, BlueSkyStoreAsync *async)
464 {
465     g_return_if_fail(async->status == ASYNC_NEW);
466     g_return_if_fail(async->op != STORE_OP_NONE);
467
468     switch (async->op) {
469     case STORE_OP_GET:
470         async->data = memstore_get(s, async->key);
471         break;
472
473     case STORE_OP_PUT:
474         memstore_put(s, async->key, async->data);
475         break;
476
477     default:
478         g_warning("Uknown operation type for MemStore: %d\n", async->op);
479         return;
480     }
481
482     bluesky_store_async_mark_complete(async);
483 }
484
485 static void memstore_cleanup(gpointer store, BlueSkyStoreAsync *async)
486 {
487 }
488
489 static BlueSkyStoreImplementation memstore_impl = {
490     .create = memstore_create,
491     .destroy = memstore_destroy,
492     .submit = memstore_submit,
493     .cleanup = memstore_cleanup,
494 };
495
496 /* Store implementation which writes data as files to disk. */
497 static gpointer filestore_create(const gchar *path)
498 {
499     return GINT_TO_POINTER(1);
500 }
501
502 static void filestore_destroy()
503 {
504 }
505
506 static BlueSkyRCStr *filestore_get(const gchar *key)
507 {
508     gchar *contents = NULL;
509     gsize length;
510     GError *error = NULL;
511
512     g_file_get_contents(key, &contents, &length, &error);
513     if (contents == NULL)
514         return NULL;
515
516     return bluesky_string_new(contents, length);
517 }
518
519 static void filestore_put(const gchar *key, BlueSkyRCStr *val)
520 {
521     g_file_set_contents(key, val->data, val->len, NULL);
522 }
523
524 static void filestore_submit(gpointer s, BlueSkyStoreAsync *async)
525 {
526     g_return_if_fail(async->status == ASYNC_NEW);
527     g_return_if_fail(async->op != STORE_OP_NONE);
528
529     switch (async->op) {
530     case STORE_OP_GET:
531         async->data = filestore_get(async->key);
532         async->result = 0;
533         break;
534
535     case STORE_OP_PUT:
536         filestore_put(async->key, async->data);
537         async->result = 0;
538         break;
539
540     default:
541         g_warning("Uknown operation type for FileStore: %d\n", async->op);
542         return;
543     }
544
545     bluesky_store_async_mark_complete(async);
546 }
547
548 static void filestore_cleanup(gpointer store, BlueSkyStoreAsync *async)
549 {
550 }
551
552 static char *filestore_lookup_last(gpointer store, const char *prefix)
553 {
554     char *last = NULL;
555     GDir *dir = g_dir_open(".", 0, NULL);
556     if (dir == NULL) {
557         g_warning("Unable to open directory for listing");
558         return NULL;
559     }
560
561     const gchar *file;
562     while ((file = g_dir_read_name(dir)) != NULL) {
563         if (strncmp(file, prefix, strlen(prefix)) == 0) {
564             if (last == NULL || strcmp(file, last) > 0) {
565                 g_free(last);
566                 last = g_strdup(file);
567             }
568         }
569     }
570     g_dir_close(dir);
571
572     return last;
573 }
574
575 static BlueSkyStoreImplementation filestore_impl = {
576     .create = filestore_create,
577     .destroy = filestore_destroy,
578     .submit = filestore_submit,
579     .cleanup = filestore_cleanup,
580     .lookup_last = filestore_lookup_last,
581 };
582
583 /* A store implementation which simply discards all data, for testing. */
584 static gpointer nullstore_create(const gchar *path)
585 {
586     return (gpointer)nullstore_create;
587 }
588
589 static void nullstore_destroy(gpointer store)
590 {
591 }
592
593 static void nullstore_submit(gpointer s, BlueSkyStoreAsync *async)
594 {
595     bluesky_store_async_mark_complete(async);
596 }
597
598 static void nullstore_cleanup(gpointer store, BlueSkyStoreAsync *async)
599 {
600 }
601
602 static BlueSkyStoreImplementation nullstore_impl = {
603     .create = nullstore_create,
604     .destroy = nullstore_destroy,
605     .submit = nullstore_submit,
606     .cleanup = nullstore_cleanup,
607 };
608
609 void bluesky_store_init()
610 {
611     store_implementations = g_hash_table_new(g_str_hash, g_str_equal);
612     notifier_thread_pool = g_thread_pool_new(notifier_task, NULL,
613                                              bluesky_max_threads, FALSE, NULL);
614     bluesky_store_register(&memstore_impl, "mem");
615     bluesky_store_register(&filestore_impl, "file");
616     bluesky_store_register(&nullstore_impl, "null");
617 }