From: Michael Vrable Date: Tue, 1 Dec 2009 21:39:42 +0000 (-0800) Subject: Work on a storage barrier operation. X-Git-Url: https://git.vrable.net/?a=commitdiff_plain;h=625d4ed1c01e01a5f2a508264f8605fb79a620fb;p=bluesky.git Work on a storage barrier operation. The barrier completes when all of a set of operations has completed. --- diff --git a/bluesky/bluesky-private.h b/bluesky/bluesky-private.h index 9ca0658..d00f7c3 100644 --- a/bluesky/bluesky-private.h +++ b/bluesky/bluesky-private.h @@ -36,6 +36,7 @@ typedef enum { STORE_OP_GET, STORE_OP_PUT, STORE_OP_DELETE, + STORE_OP_BARRIER, // Waits for other selected operations to complete } BlueSkyStoreOp; typedef enum { @@ -45,6 +46,7 @@ typedef enum { ASYNC_COMPLETE, // Operation finished, results available } BlueSkyAsyncStatus; +struct BlueSkyNotifierList; typedef struct { BlueSkyStore *store; @@ -60,10 +62,22 @@ typedef struct { BlueSkyRCStr *data; /* Data read/to write */ int result; /* Result code; 0 for success. */ + struct BlueSkyNotifierList *notifiers; gpointer store_private; /* For use by the storage implementation */ } BlueSkyStoreAsync; +/* Support for notification lists. These are lists of one-shot functions which + * can be called when certain events--primarily, competed storage + * events--occur. Multiple notifiers can be added, but no particular order is + * guaranteed for the notification functions to be called. */ +struct BlueSkyNotifierList { + struct BlueSkyNotifierList *next; + GFunc func; + BlueSkyStoreAsync *async; + gpointer user_data; // Passed to the function when called +}; + /* The abstraction layer for storage, allowing multiple implementations. */ typedef struct { /* Create a new store instance and return a handle to it. */ @@ -87,10 +101,15 @@ BlueSkyStoreAsync *bluesky_store_async_new(BlueSkyStore *store); void bluesky_store_async_ref(BlueSkyStoreAsync *async); void bluesky_store_async_unref(BlueSkyStoreAsync *async); void bluesky_store_async_wait(BlueSkyStoreAsync *async); +void bluesky_store_async_add_notifier(BlueSkyStoreAsync *async, + GFunc func, gpointer user_data); void bluesky_store_async_mark_complete(BlueSkyStoreAsync *async); void bluesky_store_async_submit(BlueSkyStoreAsync *async); void bluesky_store_sync(BlueSkyStore *store); +void bluesky_store_add_barrier(BlueSkyStoreAsync *barrier, + BlueSkyStoreAsync *async); + #ifdef __cplusplus } #endif diff --git a/bluesky/file.c b/bluesky/file.c index 9f4d980..f7e8fca 100644 --- a/bluesky/file.c +++ b/bluesky/file.c @@ -182,6 +182,11 @@ void bluesky_block_fetch(BlueSkyFS *fs, BlueSkyBlock *block) block->type = BLUESKY_BLOCK_CACHED; } +static void finished(gpointer a, gpointer b) +{ + g_print("Barrier completed!\n"); +} + /* Write the given block to cloud-backed storage and mark it clean. */ void bluesky_block_flush(BlueSkyFS *fs, BlueSkyBlock *block) { @@ -207,6 +212,13 @@ void bluesky_block_flush(BlueSkyFS *fs, BlueSkyBlock *block) g_free(block->ref); block->ref = name; + BlueSkyStoreAsync *barrier = bluesky_store_async_new(fs->store); + barrier->op = STORE_OP_BARRIER; + bluesky_store_add_barrier(barrier, async); + bluesky_store_async_add_notifier(barrier, finished, NULL); + bluesky_store_async_submit(barrier); + bluesky_store_async_unref(barrier); + /* block->type = BLUESKY_BLOCK_CACHED; */ bluesky_string_unref(block->data); block->data = NULL; diff --git a/bluesky/store.c b/bluesky/store.c index 7c8123c..cc8a5e4 100644 --- a/bluesky/store.c +++ b/bluesky/store.c @@ -28,6 +28,13 @@ struct _BlueSkyStore { GHashTable *store_implementations; +/* Thread pool for calling notifier functions when an operation completes. + * These are called in a separate thread for locking reasons: we want to call + * the notifiers without the lock on the async object held, but completion + * occurs when the lock is held--so we need some way to defer the call. This + * isn't really optimal from a cache-locality standpoint. */ +static GThreadPool *notifier_thread_pool; + void bluesky_store_register(const BlueSkyStoreImplementation *impl, const gchar *name) { @@ -75,6 +82,7 @@ BlueSkyStoreAsync *bluesky_store_async_new(BlueSkyStore *store) async->key = NULL; async->data = NULL; async->result = -1; + async->notifiers = NULL; async->store_private = NULL; return async; @@ -85,6 +93,8 @@ void bluesky_store_async_ref(BlueSkyStoreAsync *async) if (async == NULL) return; + g_return_if_fail(g_atomic_int_get(&async->refcount) > 0); + g_atomic_int_inc(&async->refcount); } @@ -123,37 +133,110 @@ void bluesky_store_async_wait(BlueSkyStoreAsync *async) g_mutex_unlock(async->lock); } +/* Add a notifier function to be called when the operation completes. */ +void bluesky_store_async_add_notifier(BlueSkyStoreAsync *async, + GFunc func, gpointer user_data) +{ + struct BlueSkyNotifierList *nl = g_new(struct BlueSkyNotifierList, 1); + nl->next = async->notifiers; + nl->func = func; + nl->async = async; bluesky_store_async_ref(async); + nl->user_data = user_data; + if (async->status == ASYNC_COMPLETE) { + g_thread_pool_push(notifier_thread_pool, nl, NULL); + } else { + async->notifiers = nl; + } +} + /* Mark an asynchronous operation as complete. This should only be called by - * the store implementations. The lock must be held when calling this - * function. */ + * the store implementations. The lock should be held when calling this + * function. Any notifier functions will be called, but in a separate thread + * and without the lock held. */ void bluesky_store_async_mark_complete(BlueSkyStoreAsync *async) { - if (async->status != ASYNC_COMPLETE) { - g_mutex_lock(async->store->lock); - async->store->pending--; - if (async->store->pending == 0) - g_cond_broadcast(async->store->cond_idle); - g_mutex_unlock(async->store->lock); - } + g_return_if_fail(async->status != ASYNC_COMPLETE); + + g_mutex_lock(async->store->lock); + async->store->pending--; + if (async->store->pending == 0) + g_cond_broadcast(async->store->cond_idle); + g_mutex_unlock(async->store->lock); async->status = ASYNC_COMPLETE; g_cond_broadcast(async->completion_cond); + + while (async->notifiers != NULL) { + struct BlueSkyNotifierList *nl = async->notifiers; + async->notifiers = nl->next; + g_thread_pool_push(notifier_thread_pool, nl, NULL); + } +} + +static void test_notifier(gpointer a, gpointer u) +{ + g_print("Notifier called!\n"); } void bluesky_store_async_submit(BlueSkyStoreAsync *async) { BlueSkyStore *store = async->store; + /* Barriers are handled specially, and not handed down the storage + * implementation layer. */ + if (async->op = STORE_OP_BARRIER) { + async->status = ASYNC_RUNNING; + if (GPOINTER_TO_INT(async->store_private) == 0) + bluesky_store_async_mark_complete(async); + return; + } + g_mutex_lock(async->store->lock); async->store->pending++; g_mutex_unlock(async->store->lock); + bluesky_store_async_add_notifier(async, test_notifier, NULL); store->impl->submit(store->handle, async); if (bluesky_options.synchronous_stores) bluesky_store_async_wait(async); } +static void op_complete(gpointer a, gpointer b) +{ + BlueSkyStoreAsync *async = (BlueSkyStoreAsync *)a; + BlueSkyStoreAsync *barrier = (BlueSkyStoreAsync *)b; + + g_mutex_lock(barrier->lock); + barrier->store_private + = GINT_TO_POINTER(GPOINTER_TO_INT(barrier->store_private) - 1); + if (GPOINTER_TO_INT(barrier->store_private) == 0 + && barrier->status != ASYNC_NEW) { + bluesky_store_async_mark_complete(barrier); + } + g_mutex_unlock(barrier->lock); +} + +/* Add the given operation to the barrier. The barrier will not complete until + * all operations added to it have completed. */ +void bluesky_store_add_barrier(BlueSkyStoreAsync *barrier, + BlueSkyStoreAsync *async) +{ + g_return_if_fail(barrier->op == STORE_OP_BARRIER); + barrier->store_private + = GINT_TO_POINTER(GPOINTER_TO_INT(barrier->store_private) + 1); + bluesky_store_async_add_notifier(async, op_complete, NULL); +} + +static void notifier_task(gpointer n, gpointer s) +{ + struct BlueSkyNotifierList *notifier = (struct BlueSkyNotifierList *)n; + + notifier->func(notifier->async, notifier->user_data); + bluesky_store_async_unref(notifier->async); + g_free(notifier); +} + void bluesky_store_sync(BlueSkyStore *store) { g_mutex_lock(store->lock); @@ -282,6 +365,8 @@ static BlueSkyStoreImplementation filestore_impl = { void bluesky_store_init() { store_implementations = g_hash_table_new(g_str_hash, g_str_equal); + notifier_thread_pool = g_thread_pool_new(notifier_task, NULL, -1, FALSE, + NULL); //bluesky_store_register(&memstore_impl, "mem"); //bluesky_store_register(&filestore_impl, "file"); }