X-Git-Url: http://git.vrable.net/?a=blobdiff_plain;f=bluesky%2Fstore.c;h=7c8123c0aef117faec650ab8dc7dbf53c2721a8c;hb=ccd8dcd110e04b43ae1d04a3b1ab058cc1761d8b;hp=13a080ffbec151c5956ed4310af059c644e2d1f4;hpb=c513d64c6a1f7c2ff2bad97db69e2f40ef642167;p=bluesky.git diff --git a/bluesky/store.c b/bluesky/store.c index 13a080f..7c8123c 100644 --- a/bluesky/store.c +++ b/bluesky/store.c @@ -20,6 +20,10 @@ struct _BlueSkyStore { const BlueSkyStoreImplementation *impl; gpointer handle; + + GMutex *lock; + GCond *cond_idle; + int pending; /* Count of operations not yet complete. */ }; GHashTable *store_implementations; @@ -45,6 +49,9 @@ BlueSkyStore *bluesky_store_new(const gchar *type) BlueSkyStore *store = g_new(BlueSkyStore, 1); store->impl = impl; store->handle = handle; + store->lock = g_mutex_new(); + store->cond_idle = g_cond_new(); + store->pending = 0; return store; } @@ -62,6 +69,7 @@ BlueSkyStoreAsync *bluesky_store_async_new(BlueSkyStore *store) async->store = store; async->lock = g_mutex_new(); async->completion_cond = g_cond_new(); + async->refcount = 1; async->status = ASYNC_NEW; async->op = STORE_OP_NONE; async->key = NULL; @@ -72,6 +80,30 @@ BlueSkyStoreAsync *bluesky_store_async_new(BlueSkyStore *store) return async; } +void bluesky_store_async_ref(BlueSkyStoreAsync *async) +{ + if (async == NULL) + return; + + g_atomic_int_inc(&async->refcount); +} + +void bluesky_store_async_unref(BlueSkyStoreAsync *async) +{ + if (async == NULL) + return; + + if (g_atomic_int_dec_and_test(&async->refcount)) { + async->store->impl->cleanup(async->store->handle, async); + g_mutex_free(async->lock); + g_cond_free(async->completion_cond); + g_free(async->key); + bluesky_string_unref(async->data); + g_free(async); + g_print("Freed async\n"); + } +} + /* Block until the given operation has completed. */ void bluesky_store_async_wait(BlueSkyStoreAsync *async) { @@ -87,6 +119,8 @@ void bluesky_store_async_wait(BlueSkyStoreAsync *async) while (async->status != ASYNC_COMPLETE) { g_cond_wait(async->completion_cond, async->lock); } + + g_mutex_unlock(async->lock); } /* Mark an asynchronous operation as complete. This should only be called by @@ -94,22 +128,56 @@ void bluesky_store_async_wait(BlueSkyStoreAsync *async) * function. */ void bluesky_store_async_mark_complete(BlueSkyStoreAsync *async) { + if (async->status != ASYNC_COMPLETE) { + g_mutex_lock(async->store->lock); + async->store->pending--; + if (async->store->pending == 0) + g_cond_broadcast(async->store->cond_idle); + g_mutex_unlock(async->store->lock); + } + async->status = ASYNC_COMPLETE; g_cond_broadcast(async->completion_cond); } +void bluesky_store_async_submit(BlueSkyStoreAsync *async) +{ + BlueSkyStore *store = async->store; + + g_mutex_lock(async->store->lock); + async->store->pending++; + g_mutex_unlock(async->store->lock); + + store->impl->submit(store->handle, async); + + if (bluesky_options.synchronous_stores) + bluesky_store_async_wait(async); +} + +void bluesky_store_sync(BlueSkyStore *store) +{ + g_mutex_lock(store->lock); + g_print("Waiting for pending store operations to complete...\n"); + while (store->pending > 0) { + g_cond_wait(store->cond_idle, store->lock); + } + g_mutex_unlock(store->lock); + g_print("Operations are complete.\n"); +} + /* Convenience wrappers that perform a single operation synchronously. */ BlueSkyRCStr *bluesky_store_get(BlueSkyStore *store, const gchar *key) { BlueSkyStoreAsync *async = bluesky_store_async_new(store); async->op = STORE_OP_GET; async->key = g_strdup(key); - store->impl->submit(store->handle, async); + bluesky_store_async_submit(async); bluesky_store_async_wait(async); BlueSkyRCStr *data = async->data; bluesky_string_ref(data); + bluesky_store_async_unref(async); return data; } @@ -121,9 +189,10 @@ void bluesky_store_put(BlueSkyStore *store, async->key = g_strdup(key); bluesky_string_ref(val); async->data = val; - store->impl->submit(store->handle, async); + bluesky_store_async_submit(async); bluesky_store_async_wait(async); + bluesky_store_async_unref(async); } #if 0