X-Git-Url: http://git.vrable.net/?a=blobdiff_plain;f=bluesky%2Fstore.c;h=f8b3a6dae0e5e4e64ae5d9453341d0c74e1ea99f;hb=8a2ef6b448a40c790e7df154c12b54932f3e62e0;hp=ec72ec2d0510fd84dd4230277febd7b90f10bfa8;hpb=c6d5eb1086d5da2a2e7548c5ce3b5efa9ba76f0c;p=bluesky.git diff --git a/bluesky/store.c b/bluesky/store.c index ec72ec2..f8b3a6d 100644 --- a/bluesky/store.c +++ b/bluesky/store.c @@ -20,10 +20,21 @@ struct _BlueSkyStore { const BlueSkyStoreImplementation *impl; gpointer handle; + + GMutex *lock; + GCond *cond_idle; + int pending; /* Count of operations not yet complete. */ }; GHashTable *store_implementations; +/* Thread pool for calling notifier functions when an operation completes. + * These are called in a separate thread for locking reasons: we want to call + * the notifiers without the lock on the async object held, but completion + * occurs when the lock is held--so we need some way to defer the call. This + * isn't really optimal from a cache-locality standpoint. */ +static GThreadPool *notifier_thread_pool; + void bluesky_store_register(const BlueSkyStoreImplementation *impl, const gchar *name) { @@ -45,6 +56,9 @@ BlueSkyStore *bluesky_store_new(const gchar *type) BlueSkyStore *store = g_new(BlueSkyStore, 1); store->impl = impl; store->handle = handle; + store->lock = g_mutex_new(); + store->cond_idle = g_cond_new(); + store->pending = 0; return store; } @@ -68,6 +82,7 @@ BlueSkyStoreAsync *bluesky_store_async_new(BlueSkyStore *store) async->key = NULL; async->data = NULL; async->result = -1; + async->notifiers = NULL; async->store_private = NULL; return async; @@ -78,6 +93,8 @@ void bluesky_store_async_ref(BlueSkyStoreAsync *async) if (async == NULL) return; + g_return_if_fail(g_atomic_int_get(&async->refcount) > 0); + g_atomic_int_inc(&async->refcount); } @@ -93,7 +110,6 @@ void bluesky_store_async_unref(BlueSkyStoreAsync *async) g_free(async->key); bluesky_string_unref(async->data); g_free(async); - g_print("Freed async\n"); } } @@ -116,20 +132,118 @@ void bluesky_store_async_wait(BlueSkyStoreAsync *async) g_mutex_unlock(async->lock); } +/* Add a notifier function to be called when the operation completes. */ +void bluesky_store_async_add_notifier(BlueSkyStoreAsync *async, + GFunc func, gpointer user_data) +{ + struct BlueSkyNotifierList *nl = g_new(struct BlueSkyNotifierList, 1); + nl->next = async->notifiers; + nl->func = func; + nl->async = async; bluesky_store_async_ref(async); + nl->user_data = user_data; + if (async->status == ASYNC_COMPLETE) { + g_thread_pool_push(notifier_thread_pool, nl, NULL); + } else { + async->notifiers = nl; + } +} + /* Mark an asynchronous operation as complete. This should only be called by - * the store implementations. The lock must be held when calling this - * function. */ + * the store implementations. The lock should be held when calling this + * function. Any notifier functions will be called, but in a separate thread + * and without the lock held. */ void bluesky_store_async_mark_complete(BlueSkyStoreAsync *async) { + g_return_if_fail(async->status != ASYNC_COMPLETE); + + g_mutex_lock(async->store->lock); + async->store->pending--; + if (async->store->pending == 0) + g_cond_broadcast(async->store->cond_idle); + g_mutex_unlock(async->store->lock); + async->status = ASYNC_COMPLETE; g_cond_broadcast(async->completion_cond); + + while (async->notifiers != NULL) { + struct BlueSkyNotifierList *nl = async->notifiers; + async->notifiers = nl->next; + g_thread_pool_push(notifier_thread_pool, nl, NULL); + } } void bluesky_store_async_submit(BlueSkyStoreAsync *async) { BlueSkyStore *store = async->store; + /* Barriers are handled specially, and not handed down the storage + * implementation layer. */ + if (async->op == STORE_OP_BARRIER) { + async->status = ASYNC_RUNNING; + if (GPOINTER_TO_INT(async->store_private) == 0) + bluesky_store_async_mark_complete(async); + return; + } + + g_log("bluesky/store", G_LOG_LEVEL_DEBUG, "submit: %s %s", + async->op == STORE_OP_GET ? "GET" + : async->op == STORE_OP_PUT ? "PUT" + : async->op == STORE_OP_DELETE ? "DELETE" : "???", + async->key); + + g_mutex_lock(async->store->lock); + async->store->pending++; + g_mutex_unlock(async->store->lock); + store->impl->submit(store->handle, async); + + if (bluesky_options.synchronous_stores) + bluesky_store_async_wait(async); +} + +static void op_complete(gpointer a, gpointer b) +{ + BlueSkyStoreAsync *barrier = (BlueSkyStoreAsync *)b; + + g_mutex_lock(barrier->lock); + barrier->store_private + = GINT_TO_POINTER(GPOINTER_TO_INT(barrier->store_private) - 1); + if (GPOINTER_TO_INT(barrier->store_private) == 0 + && barrier->status != ASYNC_NEW) { + bluesky_store_async_mark_complete(barrier); + } + g_mutex_unlock(barrier->lock); +} + +/* Add the given operation to the barrier. The barrier will not complete until + * all operations added to it have completed. */ +void bluesky_store_add_barrier(BlueSkyStoreAsync *barrier, + BlueSkyStoreAsync *async) +{ + g_return_if_fail(barrier->op == STORE_OP_BARRIER); + barrier->store_private + = GINT_TO_POINTER(GPOINTER_TO_INT(barrier->store_private) + 1); + bluesky_store_async_add_notifier(async, op_complete, barrier); +} + +static void notifier_task(gpointer n, gpointer s) +{ + struct BlueSkyNotifierList *notifier = (struct BlueSkyNotifierList *)n; + + notifier->func(notifier->async, notifier->user_data); + bluesky_store_async_unref(notifier->async); + g_free(notifier); +} + +void bluesky_store_sync(BlueSkyStore *store) +{ + g_mutex_lock(store->lock); + g_print("Waiting for pending store operations to complete...\n"); + while (store->pending > 0) { + g_cond_wait(store->cond_idle, store->lock); + } + g_mutex_unlock(store->lock); + g_print("Operations are complete.\n"); } /* Convenience wrappers that perform a single operation synchronously. */ @@ -138,7 +252,7 @@ BlueSkyRCStr *bluesky_store_get(BlueSkyStore *store, const gchar *key) BlueSkyStoreAsync *async = bluesky_store_async_new(store); async->op = STORE_OP_GET; async->key = g_strdup(key); - store->impl->submit(store->handle, async); + bluesky_store_async_submit(async); bluesky_store_async_wait(async); @@ -156,13 +270,12 @@ void bluesky_store_put(BlueSkyStore *store, async->key = g_strdup(key); bluesky_string_ref(val); async->data = val; - store->impl->submit(store->handle, async); + bluesky_store_async_submit(async); bluesky_store_async_wait(async); bluesky_store_async_unref(async); } -#if 0 /* Simple in-memory data store for test purposes. */ typedef struct { GMutex *lock; @@ -203,11 +316,37 @@ static void memstore_put(gpointer s, const gchar *key, BlueSkyRCStr *val) g_hash_table_insert(store->store, g_strdup(key), val); } +static void memstore_submit(gpointer s, BlueSkyStoreAsync *async) +{ + g_return_if_fail(async->status == ASYNC_NEW); + g_return_if_fail(async->op != STORE_OP_NONE); + + switch (async->op) { + case STORE_OP_GET: + async->data = memstore_get(s, async->key); + break; + + case STORE_OP_PUT: + memstore_put(s, async->key, async->data); + break; + + default: + g_warning("Uknown operation type for MemStore: %d\n", async->op); + return; + } + + bluesky_store_async_mark_complete(async); +} + +static void memstore_cleanup(gpointer store, BlueSkyStoreAsync *async) +{ +} + static BlueSkyStoreImplementation memstore_impl = { .create = memstore_create, .destroy = memstore_destroy, - .get = memstore_get, - .put = memstore_put, + .submit = memstore_submit, + .cleanup = memstore_cleanup, }; /* Store implementation which writes data as files to disk. */ @@ -220,7 +359,7 @@ static void filestore_destroy() { } -static BlueSkyRCStr *filestore_get(gpointer s, const gchar *key) +static BlueSkyRCStr *filestore_get(const gchar *key) { gchar *contents = NULL; gsize length; @@ -233,22 +372,49 @@ static BlueSkyRCStr *filestore_get(gpointer s, const gchar *key) return bluesky_string_new(contents, length); } -static void filestore_put(gpointer s, const gchar *key, BlueSkyRCStr *val) +static void filestore_put(const gchar *key, BlueSkyRCStr *val) { g_file_set_contents(key, val->data, val->len, NULL); } +static void filestore_submit(gpointer s, BlueSkyStoreAsync *async) +{ + g_return_if_fail(async->status == ASYNC_NEW); + g_return_if_fail(async->op != STORE_OP_NONE); + + switch (async->op) { + case STORE_OP_GET: + async->data = filestore_get(async->key); + break; + + case STORE_OP_PUT: + filestore_put(async->key, async->data); + break; + + default: + g_warning("Uknown operation type for FileStore: %d\n", async->op); + return; + } + + bluesky_store_async_mark_complete(async); +} + +static void filestore_cleanup(gpointer store, BlueSkyStoreAsync *async) +{ +} + static BlueSkyStoreImplementation filestore_impl = { .create = filestore_create, .destroy = filestore_destroy, - .get = filestore_get, - .put = filestore_put, + .submit = filestore_submit, + .cleanup = filestore_cleanup, }; -#endif void bluesky_store_init() { store_implementations = g_hash_table_new(g_str_hash, g_str_equal); - //bluesky_store_register(&memstore_impl, "mem"); - //bluesky_store_register(&filestore_impl, "file"); + notifier_thread_pool = g_thread_pool_new(notifier_task, NULL, -1, FALSE, + NULL); + bluesky_store_register(&memstore_impl, "mem"); + bluesky_store_register(&filestore_impl, "file"); }