1 /* Blue Sky: File Systems in the Cloud
3 * Copyright (C) 2009 The Regents of the University of California
4 * Written by Michael Vrable <mvrable@cs.ucsd.edu>
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions
9 * 1. Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * 2. Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * 3. Neither the name of the University nor the names of its contributors
15 * may be used to endorse or promote products derived from this software
16 * without specific prior written permission.
18 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
19 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21 * ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
22 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
24 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
25 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
27 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
35 #include "bluesky-private.h"
37 /* Interaction with cloud storage. We expose a very simple GET/PUT-style
38 * interface, which different backends can implement. Available backends
39 * (will) include Amazon S3 and a simple local store for testing purposes.
40 * Operations may be performed asynchronously. */
/* Backend operation table (create/destroy/submit/cleanup/lookup_last hooks)
 * that this store dispatches to. */
43 const BlueSkyStoreImplementation *impl;
48 int pending; /* Count of submitted non-barrier operations not yet complete; guarded by the store's lock. */
/* GET/PUT statistics, updated in bluesky_store_async_mark_complete(). */
50 struct bluesky_stats *stats_get, *stats_put;
/* Registry of storage backends keyed by scheme name ("mem", "file", "null",
 * ...): filled in by bluesky_store_register() and consulted by
 * bluesky_store_new(). */
53 GHashTable *store_implementations;
55 /* Thread pool for calling notifier functions when an operation completes.
56 * These are called in a separate thread for locking reasons: we want to call
57 * the notifiers without the lock on the async object held, but completion
58 * occurs when the lock is held--so we need some way to defer the call. This
59 * isn't really optimal from a cache-locality standpoint. */
60 static GThreadPool *notifier_thread_pool;
/* Register a storage backend under a scheme name so bluesky_store_new() can
 * find it. The name is copied with g_strdup(). The implementation pointer is
 * stored as-is, so it must outlive the registry (the built-in backends are
 * static tables). */
62 void bluesky_store_register(const BlueSkyStoreImplementation *impl,
65 g_hash_table_insert(store_implementations, g_strdup(name), (gpointer)impl);
/* Create a store from a type string. The string is split at its first ':'
 * (strchr). The leading part is the scheme used to look up a backend
 * registered with bluesky_store_register(). The remainder is presumably
 * what is passed to the backend's create() hook as its path.
 * NOTE(review): confirm the result when the scheme is unknown or create()
 * fails (presumably NULL). The new store gets its own mutex, idle condition
 * variable and GET/PUT statistics. */
68 BlueSkyStore *bluesky_store_new(const gchar *type)
70 const BlueSkyStoreImplementation *impl;
73 scheme = g_strdup(type);
74 path = strchr(scheme, ':');
80 impl = g_hash_table_lookup(store_implementations, scheme);
86 gpointer handle = impl->create(path);
92 BlueSkyStore *store = g_new(BlueSkyStore, 1);
94 store->handle = handle;
95 store->lock = g_mutex_new();
96 store->cond_idle = g_cond_new();
/* NOTE(review): if bluesky_stats_new() copies its name argument, the
 * g_strdup_printf() strings below are leaked -- confirm ownership. */
98 store->stats_get = bluesky_stats_new(g_strdup_printf("Store[%s]: GETS",
100 store->stats_put = bluesky_stats_new(g_strdup_printf("Store[%s]: PUTS",
/* Destroy a store by invoking the backend's destroy() hook on its handle.
 * NOTE(review): confirm that the BlueSkyStore itself, its mutex, condition
 * variable and statistics are released as well. */
106 void bluesky_store_free(BlueSkyStore *store)
108 store->impl->destroy(store->handle);
112 char *bluesky_store_lookup_last(BlueSkyStore *store, const char *prefix)
114 return store->impl->lookup_last(store->handle, prefix);
/* Allocate a fresh asynchronous operation for store. It starts with:
 *   - state ASYNC_NEW and no operation type (STORE_OP_NONE);
 *   - an empty byte range (start = len = 0), which
 *     bluesky_store_async_mark_complete() treats as "whole object";
 *   - no notifiers, no barrier, no backend-private data and no profiling
 *     context.
 * Callers fill in op/key (and data for PUTs) and then call
 * bluesky_store_async_submit(). They presumably own one reference, released
 * with bluesky_store_async_unref(). */
117 BlueSkyStoreAsync *bluesky_store_async_new(BlueSkyStore *store)
119 BlueSkyStoreAsync *async;
121 async = g_new(BlueSkyStoreAsync, 1);
122 async->store = store;
123 async->lock = g_mutex_new();
124 async->completion_cond = g_cond_new();
126 async->status = ASYNC_NEW;
127 async->op = STORE_OP_NONE;
130 async->start = async->len = 0;
131 async->range_done = FALSE;
133 async->notifiers = NULL;
134 async->notifier_count = 0;
135 async->barrier = NULL;
/* For barrier operations, store_private doubles as the count of outstanding
 * operations (see bluesky_store_add_barrier()). */
136 async->store_private = NULL;
137 async->profile = NULL;
142 gpointer bluesky_store_async_get_handle(BlueSkyStoreAsync *async)
144 return async->store->handle;
/* Take an extra reference on async. g_return_if_fail() rejects an object
 * whose reference count has already dropped to zero. */
147 void bluesky_store_async_ref(BlueSkyStoreAsync *async)
152 g_return_if_fail(g_atomic_int_get(&async->refcount) > 0);
154 g_atomic_int_inc(&async->refcount);
/* Drop a reference on async. When the last reference is released, the
 * backend's cleanup() hook runs first, and then the mutex, condition
 * variable and data string are freed. */
157 void bluesky_store_async_unref(BlueSkyStoreAsync *async)
162 if (g_atomic_int_dec_and_test(&async->refcount)) {
163 async->store->impl->cleanup(async->store->handle, async);
164 g_mutex_free(async->lock);
165 g_cond_free(async->completion_cond);
167 bluesky_string_unref(async->data);
172 /* Block until the given operation has completed. */
/* "Completed" means status == ASYNC_COMPLETE *and* every notifier registered
 * on the operation has finished running (notifier_count == 0).
 * notifier_task() broadcasts completion_cond when the last one finishes. */
173 void bluesky_store_async_wait(BlueSkyStoreAsync *async)
175 g_return_if_fail(async != NULL);
176 g_mutex_lock(async->lock);
178 if (async->status == ASYNC_NEW) {
/* NOTE(review): g_error() is always fatal in GLib (it aborts the process),
 * so the unlock below is never reached. Use g_warning()/g_critical() if
 * waiting on an unsubmitted operation is meant to be recoverable. */
179 g_error("bluesky_store_async_wait on a new async object!\n");
180 g_mutex_unlock(async->lock);
184 while (async->status != ASYNC_COMPLETE
185 || g_atomic_int_get(&async->notifier_count) > 0) {
186 g_cond_wait(async->completion_cond, async->lock);
189 g_mutex_unlock(async->lock);
192 /* Add a notifier function to be called when the operation completes. */
/* How notifiers are run:
 *   - They execute on notifier_thread_pool via notifier_task(), which calls
 *     func(async, user_data) without the async lock held.
 *   - Each notifier holds a reference on async and is counted in
 *     notifier_count, which bluesky_store_async_wait() waits on.
 *   - If the operation has already completed, the notifier is queued
 *     immediately. Otherwise it is pushed onto async->notifiers for
 *     bluesky_store_async_mark_complete() to queue. */
193 void bluesky_store_async_add_notifier(BlueSkyStoreAsync *async,
194 GFunc func, gpointer user_data)
196 struct BlueSkyNotifierList *nl = g_new(struct BlueSkyNotifierList, 1);
197 g_mutex_lock(async->lock);
198 nl->next = async->notifiers;
200 nl->async = async; bluesky_store_async_ref(async);
201 nl->user_data = user_data;
202 g_atomic_int_inc(&async->notifier_count);
203 if (async->status == ASYNC_COMPLETE) {
204 g_thread_pool_push(notifier_thread_pool, nl, NULL);
206 async->notifiers = nl;
208 g_mutex_unlock(async->lock);
211 static void op_complete(gpointer a, gpointer b)
213 BlueSkyStoreAsync *barrier = (BlueSkyStoreAsync *)b;
215 bluesky_store_async_ref(barrier);
216 g_mutex_lock(barrier->lock);
217 barrier->store_private
218 = GINT_TO_POINTER(GPOINTER_TO_INT(barrier->store_private) - 1);
219 if (GPOINTER_TO_INT(barrier->store_private) == 0
220 && barrier->status != ASYNC_NEW) {
221 bluesky_store_async_mark_complete(barrier);
223 g_mutex_unlock(barrier->lock);
224 bluesky_store_async_unref(barrier);
227 /* Mark an asynchronous operation as complete. This should only be called by
228 * the store implementations. The lock should be held when calling this
229 * function. Any notifier functions will be called, but in a separate thread
230 * and without the lock held. */
/* Besides setting status to ASYNC_COMPLETE and waking waiters, this:
 *   - decrements the store's pending count for non-barrier operations,
 *     waking bluesky_store_sync() when it reaches zero;
 *   - cuts a GET result down to the requested byte range if the backend
 *     returned the whole object;
 *   - signals the operation's barrier directly when there are no notifiers
 *     (otherwise notifier_task() does it after the last notifier);
 *   - queues the notifiers on the notifier thread pool;
 *   - records profiling, debug-log and GET/PUT statistics. */
231 void bluesky_store_async_mark_complete(BlueSkyStoreAsync *async)
233 g_return_if_fail(async->status != ASYNC_COMPLETE);
/* elapsed: time since submission (start_time). latency: time since the
 * backend started executing (exec_time). Both are set in
 * bluesky_store_async_submit(); backends may refine exec_time. */
235 bluesky_time_hires elapsed = bluesky_now_hires() - async->start_time;
236 bluesky_time_hires latency = bluesky_now_hires() - async->exec_time;
238 if (async->op != STORE_OP_BARRIER) {
239 g_mutex_lock(async->store->lock);
240 async->store->pending--;
241 if (async->store->pending == 0)
242 g_cond_broadcast(async->store->cond_idle);
243 g_mutex_unlock(async->store->lock);
246 /* If the request was a range request but the backend read the entire
247 * object, select out the appropriate bytes. */
248 if (async->op == STORE_OP_GET
249 && !async->range_done
250 && async->result == 0
251 && async->data != NULL) {
252 if (async->start != 0 || async->len != 0) {
253 /* If the caller requested a read outside the object, return an
255 if (async->start + async->len > async->data->len) {
256 g_warning("Range request outside object boundaries!\n");
260 async->len = async->data->len - async->start;
261 BlueSkyRCStr *newstr = bluesky_string_new(g_memdup(&async->data->data[async->start], async->len), async->len);
262 bluesky_string_unref(async->data);
263 async->data = newstr;
264 async->range_done = TRUE;
269 async->status = ASYNC_COMPLETE;
270 g_cond_broadcast(async->completion_cond);
// If notifiers are pending, notifier_task() signals the barrier instead,
// once the last of them has run.
272 if (async->barrier != NULL && async->notifiers == NULL)
273 op_complete(async, async->barrier);
275 while (async->notifiers != NULL) {
276 struct BlueSkyNotifierList *nl = async->notifiers;
277 async->notifiers = nl->next;
278 g_thread_pool_push(notifier_thread_pool, nl, NULL);
281 if (async->profile) {
282 bluesky_profile_add_event(
284 g_strdup_printf("%s for %s complete",
285 async->op == STORE_OP_GET ? "GET"
286 : async->op == STORE_OP_PUT ? "PUT"
287 : async->op == STORE_OP_DELETE ? "DELETE"
288 : async->op == STORE_OP_BARRIER ? "BARRIER" : "???",
293 if (bluesky_verbose) {
294 g_log("bluesky/store", G_LOG_LEVEL_DEBUG,
295 "[%p] complete: elapsed = %"PRIi64" ns, latency = %"PRIi64" ns",
296 async, elapsed, latency);
// NOTE(review): async->data->len is dereferenced unconditionally below. A
// GET that produced no data (async->data == NULL, presumably possible on a
// backend miss or with the null backend) would crash here -- confirm.
300 if (async->op == STORE_OP_GET) {
301 bluesky_stats_add(async->store->stats_get, async->data->len);
302 } else if (async->op == STORE_OP_PUT) {
303 bluesky_stats_add(async->store->stats_put, async->data->len);
/* Submit an asynchronous operation. First it records the submission time
 * (plus an initial exec_time that backends may refine) and emits
 * profiling/debug events. Then:
 *   - A barrier is handled here. It is marked running, and completed at once
 *     if no operations are outstanding on it; otherwise op_complete()
 *     finishes it later.
 *   - Any other operation is counted in the store's pending total and passed
 *     to the backend's submit() hook. If bluesky_options.synchronous_stores
 *     is set, this then blocks in bluesky_store_async_wait() until the
 *     operation completes. */
308 void bluesky_store_async_submit(BlueSkyStoreAsync *async)
310 BlueSkyStore *store = async->store;
312 async->start_time = bluesky_now_hires();
314 // Backends should fill this in with a better estimate of the actual time
315 // processing was started, if there could be a delay from submission time.
316 async->exec_time = bluesky_now_hires();
318 if (async->profile) {
319 bluesky_profile_add_event(
321 g_strdup_printf("Start %s for %s",
322 async->op == STORE_OP_GET ? "GET"
323 : async->op == STORE_OP_PUT ? "PUT"
324 : async->op == STORE_OP_DELETE ? "DELETE"
325 : async->op == STORE_OP_BARRIER ? "BARRIER" : "???",
330 if (bluesky_verbose) {
331 g_log("bluesky/store", G_LOG_LEVEL_DEBUG, "[%p] submit: %s %s",
333 async->op == STORE_OP_GET ? "GET"
334 : async->op == STORE_OP_PUT ? "PUT"
335 : async->op == STORE_OP_DELETE ? "DELETE"
336 : async->op == STORE_OP_BARRIER ? "BARRIER" : "???",
340 /* Barriers are handled specially, and not handed down the storage
341 * implementation layer. */
/* NOTE(review): status and store_private are read/written here without
 * async->lock, unlike in op_complete(). A concurrent op_complete() could
 * also complete the barrier between these two statements, so
 * bluesky_store_async_mark_complete() may be reached twice. Confirm whether
 * the lock should be held. */
342 if (async->op == STORE_OP_BARRIER) {
343 async->status = ASYNC_RUNNING;
344 if (GPOINTER_TO_INT(async->store_private) == 0)
345 bluesky_store_async_mark_complete(async);
349 g_mutex_lock(async->store->lock);
350 async->store->pending++;
351 g_mutex_unlock(async->store->lock);
353 store->impl->submit(store->handle, async);
355 if (bluesky_options.synchronous_stores)
356 bluesky_store_async_wait(async);
359 /* Add the given operation to the barrier. The barrier will not complete until
360 * all operations added to it have completed. */
/* The barrier's outstanding count (kept in barrier->store_private) is
 * incremented first. Then one of two things happens:
 *   - If async is still in flight and not yet attached to a barrier, it
 *     just records the barrier. On completion, op_complete() decrements the
 *     count, either directly from bluesky_store_async_mark_complete() or
 *     from notifier_task() after its notifiers have run.
 *   - Otherwise (async already complete, or already in another barrier),
 *     op_complete is registered as an ordinary notifier. That notifier is
 *     queued right away if async has already completed. */
361 void bluesky_store_add_barrier(BlueSkyStoreAsync *barrier,
362 BlueSkyStoreAsync *async)
364 g_return_if_fail(barrier->op == STORE_OP_BARRIER);
366 g_mutex_lock(barrier->lock);
367 barrier->store_private
368 = GINT_TO_POINTER(GPOINTER_TO_INT(barrier->store_private) + 1);
369 g_mutex_unlock(barrier->lock);
371 g_mutex_lock(async->lock);
372 if (async->barrier == NULL && async->status != ASYNC_COMPLETE) {
373 async->barrier = barrier;
374 g_mutex_unlock(async->lock);
376 if (async->barrier != NULL)
377 g_warning("Adding async to more than one barrier!\n");
378 g_mutex_unlock(async->lock);
379 bluesky_store_async_add_notifier(async, op_complete, barrier);
383 static void notifier_task(gpointer n, gpointer s)
385 struct BlueSkyNotifierList *notifier = (struct BlueSkyNotifierList *)n;
387 notifier->func(notifier->async, notifier->user_data);
388 if (g_atomic_int_dec_and_test(¬ifier->async->notifier_count)) {
389 g_mutex_lock(notifier->async->lock);
390 if (notifier->async->barrier != NULL)
391 op_complete(notifier->async, notifier->async->barrier);
392 g_cond_broadcast(notifier->async->completion_cond);
393 g_mutex_unlock(notifier->async->lock);
395 bluesky_store_async_unref(notifier->async);
399 void bluesky_store_sync(BlueSkyStore *store)
401 g_mutex_lock(store->lock);
402 if (bluesky_verbose) {
403 g_log("bluesky/store", G_LOG_LEVEL_DEBUG,
404 "Waiting for pending store operations to complete...");
406 while (store->pending > 0) {
407 g_cond_wait(store->cond_idle, store->lock);
409 g_mutex_unlock(store->lock);
410 if (bluesky_verbose) {
411 g_log("bluesky/store", G_LOG_LEVEL_DEBUG, "Operations are complete.");
415 /* Convenience wrappers that perform a single operation synchronously. */
/* Fetch the value stored under key, blocking until the GET completes. A
 * reference is taken on the result before the async object (which holds its
 * own reference to the data) is released. The caller presumably drops that
 * reference with bluesky_string_unref().
 * NOTE(review): async->result is not checked, so on failure the caller gets
 * whatever the backend left in async->data (possibly NULL). */
416 BlueSkyRCStr *bluesky_store_get(BlueSkyStore *store, const gchar *key)
418 BlueSkyStoreAsync *async = bluesky_store_async_new(store);
419 async->op = STORE_OP_GET;
420 async->key = g_strdup(key);
421 bluesky_store_async_submit(async);
423 bluesky_store_async_wait(async);
425 BlueSkyRCStr *data = async->data;
426 bluesky_string_ref(data);
427 bluesky_store_async_unref(async);
/* Store val under key, blocking until the PUT completes. An extra reference
 * is taken on val for the operation, presumably attached as async->data,
 * which bluesky_store_async_unref() releases. The caller therefore keeps its
 * own reference. The PUT's outcome is not reported back to the caller. */
431 void bluesky_store_put(BlueSkyStore *store,
432 const gchar *key, BlueSkyRCStr *val)
434 BlueSkyStoreAsync *async = bluesky_store_async_new(store);
435 async->op = STORE_OP_PUT;
436 async->key = g_strdup(key);
437 bluesky_string_ref(val);
439 bluesky_store_async_submit(async);
441 bluesky_store_async_wait(async);
442 bluesky_store_async_unref(async);
445 /* Simple in-memory data store for test purposes. */
449 /* TODO: A hashtable isn't optimal for list queries... */
/* create() hook for the in-memory backend. Allocates a MemStore holding a
 * mutex and a string-keyed hash table (g_str_hash/g_str_equal) whose values
 * are released with bluesky_string_unref(). path is not used. */
453 static gpointer memstore_create(const gchar *path)
455 MemStore *store = g_new(MemStore, 1);
456 store->lock = g_mutex_new();
457 store->store = g_hash_table_new_full(g_str_hash, g_str_equal,
459 (GDestroyNotify)bluesky_string_unref);
461 return (gpointer)store;
/* destroy() hook for the in-memory backend (registered in memstore_impl).
 * NOTE(review): confirm that the hash table, the mutex and the MemStore
 * allocated by memstore_create() are released here. */
464 static void memstore_destroy(gpointer store)
/* Return the string stored under key, with an extra reference for the
 * caller. g_hash_table_lookup() yields NULL for a missing key.
 * NOTE(review): store->lock is not taken around the lookup. Confirm that
 * callers serialize access to the table. */
469 static BlueSkyRCStr *memstore_get(gpointer st, const gchar *key)
471 MemStore *store = (MemStore *)st;
472 BlueSkyRCStr *s = g_hash_table_lookup(store->store, key);
474 bluesky_string_ref(s);
478 static void memstore_put(gpointer s, const gchar *key, BlueSkyRCStr *val)
480 MemStore *store = (MemStore *)s;
481 bluesky_string_ref(val);
482 g_hash_table_insert(store->store, g_strdup(key), val);
/* submit() hook for the in-memory backend. The operation runs synchronously:
 *   - a GET stores the looked-up value in async->data;
 *   - a PUT stores async->data under async->key;
 *   - unrecognized operation types trigger a warning.
 * The operation is then marked complete before returning.
 * NOTE(review): bluesky_store_async_mark_complete() asks for the async lock
 * to be held, but it is not taken here. */
485 static void memstore_submit(gpointer s, BlueSkyStoreAsync *async)
487 g_return_if_fail(async->status == ASYNC_NEW);
488 g_return_if_fail(async->op != STORE_OP_NONE);
492 async->data = memstore_get(s, async->key);
496 memstore_put(s, async->key, async->data);
500 g_warning("Uknown operation type for MemStore: %d\n", async->op);
504 bluesky_store_async_mark_complete(async);
507 static void memstore_cleanup(gpointer store, BlueSkyStoreAsync *async)
511 static BlueSkyStoreImplementation memstore_impl = {
512 .create = memstore_create,
513 .destroy = memstore_destroy,
514 .submit = memstore_submit,
515 .cleanup = memstore_cleanup,
518 /* Store implementation which writes data as files to disk. */
519 static gpointer filestore_create(const gchar *path)
521 return GINT_TO_POINTER(1);
524 static void filestore_destroy()
528 static BlueSkyRCStr *filestore_get(const gchar *key)
530 gchar *contents = NULL;
532 GError *error = NULL;
534 g_file_get_contents(key, &contents, &length, &error);
535 if (contents == NULL)
538 return bluesky_string_new(contents, length);
541 static void filestore_put(const gchar *key, BlueSkyRCStr *val)
543 g_file_set_contents(key, val->data, val->len, NULL);
/* submit() hook for the file backend. The key is used directly as the file
 * path, and the operation runs synchronously:
 *   - a GET reads the whole file into async->data;
 *   - a PUT writes async->data to it;
 *   - unrecognized operation types trigger a warning.
 * The operation is then marked complete before returning.
 * NOTE(review): bluesky_store_async_mark_complete() asks for the async lock
 * to be held, but it is not taken here. */
546 static void filestore_submit(gpointer s, BlueSkyStoreAsync *async)
548 g_return_if_fail(async->status == ASYNC_NEW);
549 g_return_if_fail(async->op != STORE_OP_NONE);
553 async->data = filestore_get(async->key);
558 filestore_put(async->key, async->data);
563 g_warning("Uknown operation type for FileStore: %d\n", async->op);
567 bluesky_store_async_mark_complete(async);
570 static void filestore_cleanup(gpointer store, BlueSkyStoreAsync *async)
/* lookup_last() hook for the file backend. Scans the current directory and
 * returns a g_strdup'd copy of the greatest (by strcmp) entry name that
 * starts with prefix. Presumably it returns NULL if nothing matches or the
 * directory cannot be opened, and the caller frees the result with g_free().
 * NOTE(review): strlen(prefix) is recomputed for every entry; it could be
 * hoisted out of the loop. */
574 static char *filestore_lookup_last(gpointer store, const char *prefix)
577 GDir *dir = g_dir_open(".", 0, NULL);
579 g_warning("Unable to open directory for listing");
584 while ((file = g_dir_read_name(dir)) != NULL) {
585 if (strncmp(file, prefix, strlen(prefix)) == 0) {
586 if (last == NULL || strcmp(file, last) > 0) {
588 last = g_strdup(file);
597 static BlueSkyStoreImplementation filestore_impl = {
598 .create = filestore_create,
599 .destroy = filestore_destroy,
600 .submit = filestore_submit,
601 .cleanup = filestore_cleanup,
602 .lookup_last = filestore_lookup_last,
605 /* A store implementation which simply discards all data, for testing. */
606 static gpointer nullstore_create(const gchar *path)
608 return (gpointer)nullstore_create;
611 static void nullstore_destroy(gpointer store)
615 static void nullstore_submit(gpointer s, BlueSkyStoreAsync *async)
617 bluesky_store_async_mark_complete(async);
620 static void nullstore_cleanup(gpointer store, BlueSkyStoreAsync *async)
624 static BlueSkyStoreImplementation nullstore_impl = {
625 .create = nullstore_create,
626 .destroy = nullstore_destroy,
627 .submit = nullstore_submit,
628 .cleanup = nullstore_cleanup,
631 void bluesky_store_init()
633 store_implementations = g_hash_table_new(g_str_hash, g_str_equal);
634 notifier_thread_pool = g_thread_pool_new(notifier_task, NULL,
635 bluesky_max_threads, FALSE, NULL);
636 bluesky_store_register(&memstore_impl, "mem");
637 bluesky_store_register(&filestore_impl, "file");
638 bluesky_store_register(&nullstore_impl, "null");