X-Git-Url: http://git.vrable.net/?a=blobdiff_plain;f=bluesky%2Fstore.c;h=888bd1b07b490b6ae305377c2a8c8c1809c8ac9e;hb=8e686a50b321c7d2d154bd6121dd4a1e982f595d;hp=022deeb0ca2d4f8775b4a58a227128a086b3e5f4;hpb=0ef7fc934daee6ded318e3d52023521b758b295e;p=bluesky.git diff --git a/bluesky/store.c b/bluesky/store.c index 022deeb..888bd1b 100644 --- a/bluesky/store.c +++ b/bluesky/store.c @@ -45,13 +45,25 @@ BlueSkyStore *bluesky_store_new(const gchar *type) { const BlueSkyStoreImplementation *impl; - impl = g_hash_table_lookup(store_implementations, type); - if (impl == NULL) + gchar *scheme, *path; + scheme = g_strdup(type); + path = strchr(scheme, ':'); + if (path != NULL) { + *path = '\0'; + path++; + } + + impl = g_hash_table_lookup(store_implementations, scheme); + if (impl == NULL) { + g_free(scheme); return NULL; + } - gpointer handle = impl->create(); - if (handle == NULL) + gpointer handle = impl->create(path); + if (handle == NULL) { + g_free(scheme); return NULL; + } BlueSkyStore *store = g_new(BlueSkyStore, 1); store->impl = impl; @@ -59,6 +71,7 @@ BlueSkyStore *bluesky_store_new(const gchar *type) store->lock = g_mutex_new(); store->cond_idle = g_cond_new(); store->pending = 0; + g_free(scheme); return store; } @@ -83,11 +96,18 @@ BlueSkyStoreAsync *bluesky_store_async_new(BlueSkyStore *store) async->data = NULL; async->result = -1; async->notifiers = NULL; + async->notifier_count = 0; + async->barrier = NULL; async->store_private = NULL; return async; } +gpointer bluesky_store_async_get_handle(BlueSkyStoreAsync *async) +{ + return async->store->handle; +} + void bluesky_store_async_ref(BlueSkyStoreAsync *async) { if (async == NULL) @@ -110,8 +130,6 @@ void bluesky_store_async_unref(BlueSkyStoreAsync *async) g_free(async->key); bluesky_string_unref(async->data); g_free(async); - g_log("bluesky/store", G_LOG_LEVEL_DEBUG, - "freeing async"); } } @@ -127,7 +145,8 @@ void bluesky_store_async_wait(BlueSkyStoreAsync *async) return; } - while (async->status != ASYNC_COMPLETE) { + while (async->status != ASYNC_COMPLETE + || g_atomic_int_get(&async->notifier_count) > 0) { g_cond_wait(async->completion_cond, async->lock); } @@ -139,15 +158,34 @@ void bluesky_store_async_add_notifier(BlueSkyStoreAsync *async, GFunc func, gpointer user_data) { struct BlueSkyNotifierList *nl = g_new(struct BlueSkyNotifierList, 1); + g_mutex_lock(async->lock); nl->next = async->notifiers; nl->func = func; nl->async = async; bluesky_store_async_ref(async); nl->user_data = user_data; + g_atomic_int_inc(&async->notifier_count); if (async->status == ASYNC_COMPLETE) { g_thread_pool_push(notifier_thread_pool, nl, NULL); } else { async->notifiers = nl; } + g_mutex_unlock(async->lock); +} + +static void op_complete(gpointer a, gpointer b) +{ + BlueSkyStoreAsync *barrier = (BlueSkyStoreAsync *)b; + + bluesky_store_async_ref(barrier); + g_mutex_lock(barrier->lock); + barrier->store_private + = GINT_TO_POINTER(GPOINTER_TO_INT(barrier->store_private) - 1); + if (GPOINTER_TO_INT(barrier->store_private) == 0 + && barrier->status != ASYNC_NEW) { + bluesky_store_async_mark_complete(barrier); + } + g_mutex_unlock(barrier->lock); + bluesky_store_async_unref(barrier); } /* Mark an asynchronous operation as complete. This should only be called by @@ -158,6 +196,8 @@ void bluesky_store_async_mark_complete(BlueSkyStoreAsync *async) { g_return_if_fail(async->status != ASYNC_COMPLETE); + bluesky_time_hires elapsed = bluesky_now_hires() - async->start_time; + g_mutex_lock(async->store->lock); async->store->pending--; if (async->store->pending == 0) @@ -167,17 +207,34 @@ void bluesky_store_async_mark_complete(BlueSkyStoreAsync *async) async->status = ASYNC_COMPLETE; g_cond_broadcast(async->completion_cond); + if (async->barrier != NULL && async->notifiers == NULL) + op_complete(async, async->barrier); + while (async->notifiers != NULL) { struct BlueSkyNotifierList *nl = async->notifiers; async->notifiers = nl->next; g_thread_pool_push(notifier_thread_pool, nl, NULL); } + + g_log("bluesky/store", G_LOG_LEVEL_DEBUG, + "[%p] complete: elapsed = %"PRIi64" ns", + async, elapsed); } void bluesky_store_async_submit(BlueSkyStoreAsync *async) { BlueSkyStore *store = async->store; + async->start_time = bluesky_now_hires(); + + g_log("bluesky/store", G_LOG_LEVEL_DEBUG, "[%p] submit: %s %s", + async, + async->op == STORE_OP_GET ? "GET" + : async->op == STORE_OP_PUT ? "PUT" + : async->op == STORE_OP_DELETE ? "DELETE" + : async->op == STORE_OP_BARRIER ? "BARRIER" : "???", + async->key); + /* Barriers are handled specially, and not handed down the storage * implementation layer. */ if (async->op == STORE_OP_BARRIER) { @@ -187,12 +244,6 @@ void bluesky_store_async_submit(BlueSkyStoreAsync *async) return; } - g_log("bluesky/store", G_LOG_LEVEL_DEBUG, "submit: %s %s", - async->op == STORE_OP_GET ? "GET" - : async->op == STORE_OP_PUT ? "PUT" - : async->op == STORE_OP_DELETE ? "DELETE" : "???", - async->key); - g_mutex_lock(async->store->lock); async->store->pending++; g_mutex_unlock(async->store->lock); @@ -203,29 +254,26 @@ void bluesky_store_async_submit(BlueSkyStoreAsync *async) bluesky_store_async_wait(async); } -static void op_complete(gpointer a, gpointer b) -{ - BlueSkyStoreAsync *barrier = (BlueSkyStoreAsync *)b; - - g_mutex_lock(barrier->lock); - barrier->store_private - = GINT_TO_POINTER(GPOINTER_TO_INT(barrier->store_private) - 1); - if (GPOINTER_TO_INT(barrier->store_private) == 0 - && barrier->status != ASYNC_NEW) { - bluesky_store_async_mark_complete(barrier); - } - g_mutex_unlock(barrier->lock); -} - /* Add the given operation to the barrier. The barrier will not complete until * all operations added to it have completed. */ void bluesky_store_add_barrier(BlueSkyStoreAsync *barrier, BlueSkyStoreAsync *async) { g_return_if_fail(barrier->op == STORE_OP_BARRIER); + + g_mutex_lock(barrier->lock); barrier->store_private = GINT_TO_POINTER(GPOINTER_TO_INT(barrier->store_private) + 1); - bluesky_store_async_add_notifier(async, op_complete, barrier); + g_mutex_unlock(barrier->lock); + + g_mutex_lock(async->lock); + if (async->barrier == NULL) { + async->barrier = barrier; + } else { + g_warning("Adding async to more than one barrier!\n"); + bluesky_store_async_add_notifier(async, op_complete, barrier); + } + g_mutex_unlock(async->lock); } static void notifier_task(gpointer n, gpointer s) @@ -233,6 +281,13 @@ static void notifier_task(gpointer n, gpointer s) struct BlueSkyNotifierList *notifier = (struct BlueSkyNotifierList *)n; notifier->func(notifier->async, notifier->user_data); + if (g_atomic_int_dec_and_test(¬ifier->async->notifier_count)) { + g_mutex_lock(notifier->async->lock); + if (notifier->async->barrier != NULL) + op_complete(notifier->async, notifier->async->barrier); + g_cond_broadcast(notifier->async->completion_cond); + g_mutex_unlock(notifier->async->lock); + } bluesky_store_async_unref(notifier->async); g_free(notifier); } @@ -278,7 +333,6 @@ void bluesky_store_put(BlueSkyStore *store, bluesky_store_async_unref(async); } -#if 0 /* Simple in-memory data store for test purposes. */ typedef struct { GMutex *lock; @@ -287,7 +341,7 @@ typedef struct { GHashTable *store; } MemStore; -static gpointer memstore_create() +static gpointer memstore_create(const gchar *path) { MemStore *store = g_new(MemStore, 1); store->lock = g_mutex_new(); @@ -319,15 +373,41 @@ static void memstore_put(gpointer s, const gchar *key, BlueSkyRCStr *val) g_hash_table_insert(store->store, g_strdup(key), val); } +static void memstore_submit(gpointer s, BlueSkyStoreAsync *async) +{ + g_return_if_fail(async->status == ASYNC_NEW); + g_return_if_fail(async->op != STORE_OP_NONE); + + switch (async->op) { + case STORE_OP_GET: + async->data = memstore_get(s, async->key); + break; + + case STORE_OP_PUT: + memstore_put(s, async->key, async->data); + break; + + default: + g_warning("Uknown operation type for MemStore: %d\n", async->op); + return; + } + + bluesky_store_async_mark_complete(async); +} + +static void memstore_cleanup(gpointer store, BlueSkyStoreAsync *async) +{ +} + static BlueSkyStoreImplementation memstore_impl = { .create = memstore_create, .destroy = memstore_destroy, - .get = memstore_get, - .put = memstore_put, + .submit = memstore_submit, + .cleanup = memstore_cleanup, }; /* Store implementation which writes data as files to disk. */ -static gpointer filestore_create() +static gpointer filestore_create(const gchar *path) { return GINT_TO_POINTER(1); } @@ -336,7 +416,7 @@ static void filestore_destroy() { } -static BlueSkyRCStr *filestore_get(gpointer s, const gchar *key) +static BlueSkyRCStr *filestore_get(const gchar *key) { gchar *contents = NULL; gsize length; @@ -349,24 +429,51 @@ static BlueSkyRCStr *filestore_get(gpointer s, const gchar *key) return bluesky_string_new(contents, length); } -static void filestore_put(gpointer s, const gchar *key, BlueSkyRCStr *val) +static void filestore_put(const gchar *key, BlueSkyRCStr *val) { g_file_set_contents(key, val->data, val->len, NULL); } +static void filestore_submit(gpointer s, BlueSkyStoreAsync *async) +{ + g_return_if_fail(async->status == ASYNC_NEW); + g_return_if_fail(async->op != STORE_OP_NONE); + + switch (async->op) { + case STORE_OP_GET: + async->data = filestore_get(async->key); + async->result = 0; + break; + + case STORE_OP_PUT: + filestore_put(async->key, async->data); + async->result = 0; + break; + + default: + g_warning("Uknown operation type for FileStore: %d\n", async->op); + return; + } + + bluesky_store_async_mark_complete(async); +} + +static void filestore_cleanup(gpointer store, BlueSkyStoreAsync *async) +{ +} + static BlueSkyStoreImplementation filestore_impl = { .create = filestore_create, .destroy = filestore_destroy, - .get = filestore_get, - .put = filestore_put, + .submit = filestore_submit, + .cleanup = filestore_cleanup, }; -#endif void bluesky_store_init() { store_implementations = g_hash_table_new(g_str_hash, g_str_equal); notifier_thread_pool = g_thread_pool_new(notifier_task, NULL, -1, FALSE, NULL); - //bluesky_store_register(&memstore_impl, "mem"); - //bluesky_store_register(&filestore_impl, "file"); + bluesky_store_register(&memstore_impl, "mem"); + bluesky_store_register(&filestore_impl, "file"); }