From c6d5eb1086d5da2a2e7548c5ce3b5efa9ba76f0c Mon Sep 17 00:00:00 2001 From: Michael Vrable Date: Sun, 22 Nov 2009 13:09:54 -0800 Subject: [PATCH] Add reference counting to asynchronous storage operations. --- bluesky/bluesky-private.h | 5 +++++ bluesky/file.c | 11 ++++++++++- bluesky/s3store.c | 4 ++-- bluesky/store.c | 36 ++++++++++++++++++++++++++++++++++++ 4 files changed, 53 insertions(+), 3 deletions(-) diff --git a/bluesky/bluesky-private.h b/bluesky/bluesky-private.h index 7f5e646..0dd6446 100644 --- a/bluesky/bluesky-private.h +++ b/bluesky/bluesky-private.h @@ -51,6 +51,8 @@ typedef struct { GMutex *lock; GCond *completion_cond; /* Used to wait for operation to complete. */ + gint refcount; /* Reference count for destruction. */ + BlueSkyAsyncStatus status; BlueSkyStoreOp op; @@ -82,8 +84,11 @@ void bluesky_store_register(const BlueSkyStoreImplementation *impl, const gchar *name); BlueSkyStoreAsync *bluesky_store_async_new(BlueSkyStore *store); +void bluesky_store_async_ref(BlueSkyStoreAsync *async); +void bluesky_store_async_unref(BlueSkyStoreAsync *async); void bluesky_store_async_wait(BlueSkyStoreAsync *async); void bluesky_store_async_mark_complete(BlueSkyStoreAsync *async); +void bluesky_store_async_submit(BlueSkyStoreAsync *async); #ifdef __cplusplus } diff --git a/bluesky/file.c b/bluesky/file.c index ccb45e4..f9f6444 100644 --- a/bluesky/file.c +++ b/bluesky/file.c @@ -194,7 +194,16 @@ void bluesky_block_flush(BlueSkyFS *fs, BlueSkyBlock *block) g_checksum_update(csum, data->data, data->len); gchar *name = g_strdup(g_checksum_get_string(csum)); - bluesky_store_put(fs->store, name, data); + /* Store the file data asynchronously, and don't bother waiting for a + * response. */ + BlueSkyStoreAsync *async = bluesky_store_async_new(fs->store); + async->op = STORE_OP_PUT; + async->key = g_strdup(name); + bluesky_string_ref(data); + async->data = data; + bluesky_store_async_submit(async); + bluesky_store_async_unref(async); + g_free(block->ref); block->ref = name; diff --git a/bluesky/s3store.c b/bluesky/s3store.c index 5ab42e1..bf7c7cd 100644 --- a/bluesky/s3store.c +++ b/bluesky/s3store.c @@ -138,10 +138,9 @@ static void s3store_task(gpointer a, gpointer s) async->result = 0; } - // TODO: Deallocate resources - g_print("Finish task...\n"); bluesky_store_async_mark_complete(async); + bluesky_store_async_unref(async); } static gpointer s3store_new() @@ -184,6 +183,7 @@ static void s3store_submit(gpointer s, BlueSkyStoreAsync *async) case STORE_OP_GET: case STORE_OP_PUT: async->status = ASYNC_PENDING; + bluesky_store_async_ref(async); g_thread_pool_push(store->thread_pool, async, NULL); break; diff --git a/bluesky/store.c b/bluesky/store.c index 13a080f..ec72ec2 100644 --- a/bluesky/store.c +++ b/bluesky/store.c @@ -62,6 +62,7 @@ BlueSkyStoreAsync *bluesky_store_async_new(BlueSkyStore *store) async->store = store; async->lock = g_mutex_new(); async->completion_cond = g_cond_new(); + async->refcount = 1; async->status = ASYNC_NEW; async->op = STORE_OP_NONE; async->key = NULL; @@ -72,6 +73,30 @@ BlueSkyStoreAsync *bluesky_store_async_new(BlueSkyStore *store) return async; } +void bluesky_store_async_ref(BlueSkyStoreAsync *async) +{ + if (async == NULL) + return; + + g_atomic_int_inc(&async->refcount); +} + +void bluesky_store_async_unref(BlueSkyStoreAsync *async) +{ + if (async == NULL) + return; + + if (g_atomic_int_dec_and_test(&async->refcount)) { + async->store->impl->cleanup(async->store->handle, async); + g_mutex_free(async->lock); + g_cond_free(async->completion_cond); + g_free(async->key); + bluesky_string_unref(async->data); + g_free(async); + g_print("Freed async\n"); + } +} + /* Block until the given operation has completed. */ void bluesky_store_async_wait(BlueSkyStoreAsync *async) { @@ -87,6 +112,8 @@ void bluesky_store_async_wait(BlueSkyStoreAsync *async) while (async->status != ASYNC_COMPLETE) { g_cond_wait(async->completion_cond, async->lock); } + + g_mutex_unlock(async->lock); } /* Mark an asynchronous operation as complete. This should only be called by @@ -98,6 +125,13 @@ void bluesky_store_async_mark_complete(BlueSkyStoreAsync *async) g_cond_broadcast(async->completion_cond); } +void bluesky_store_async_submit(BlueSkyStoreAsync *async) +{ + BlueSkyStore *store = async->store; + + store->impl->submit(store->handle, async); +} + /* Convenience wrappers that perform a single operation synchronously. */ BlueSkyRCStr *bluesky_store_get(BlueSkyStore *store, const gchar *key) { @@ -110,6 +144,7 @@ BlueSkyRCStr *bluesky_store_get(BlueSkyStore *store, const gchar *key) BlueSkyRCStr *data = async->data; bluesky_string_ref(data); + bluesky_store_async_unref(async); return data; } @@ -124,6 +159,7 @@ void bluesky_store_put(BlueSkyStore *store, store->impl->submit(store->handle, async); bluesky_store_async_wait(async); + bluesky_store_async_unref(async); } #if 0 -- 2.20.1