From: Michael Vrable Date: Tue, 24 Nov 2009 21:18:50 +0000 (-0800) Subject: More work on synchronous/asynchronous operations. X-Git-Url: https://git.vrable.net/?a=commitdiff_plain;h=ccd8dcd110e04b43ae1d04a3b1ab058cc1761d8b;p=bluesky.git More work on synchronous/asynchronous operations. Allow operations to be made synchronous for benchmarking, and add in a storage-wide sync() call. --- diff --git a/bluesky/bluesky-private.h b/bluesky/bluesky-private.h index 0dd6446..9ca0658 100644 --- a/bluesky/bluesky-private.h +++ b/bluesky/bluesky-private.h @@ -89,6 +89,7 @@ void bluesky_store_async_unref(BlueSkyStoreAsync *async); void bluesky_store_async_wait(BlueSkyStoreAsync *async); void bluesky_store_async_mark_complete(BlueSkyStoreAsync *async); void bluesky_store_async_submit(BlueSkyStoreAsync *async); +void bluesky_store_sync(BlueSkyStore *store); #ifdef __cplusplus } diff --git a/bluesky/bluesky.h b/bluesky/bluesky.h index f9e97f5..3bdfd59 100644 --- a/bluesky/bluesky.h +++ b/bluesky/bluesky.h @@ -17,6 +17,14 @@ extern "C" { #endif +/* Various options to tweak for performance benchmarking purposes. */ +typedef struct { + /* Perform all get/put operations synchronously. */ + int synchronous_stores; +} BlueSkyOptions; + +extern BlueSkyOptions bluesky_options; + /* BlueSky status and error codes. Various frontends should translate these to * the appropriate error code for whatever protocol they implement. */ typedef enum { diff --git a/bluesky/init.c b/bluesky/init.c index 2693c90..4b7ec93 100644 --- a/bluesky/init.c +++ b/bluesky/init.c @@ -13,6 +13,8 @@ #include "bluesky-private.h" +BlueSkyOptions bluesky_options; + /* BlueSky library initialization. */ void bluesky_store_init_s3(void); @@ -23,6 +25,8 @@ void bluesky_init(void) g_thread_init(NULL); bluesky_crypt_init(); + //bluesky_options.synchronous_stores = 1; + bluesky_store_init(); bluesky_store_init_s3(); } diff --git a/bluesky/inode.c b/bluesky/inode.c index 44ef63c..e0d237b 100644 --- a/bluesky/inode.c +++ b/bluesky/inode.c @@ -207,7 +207,12 @@ void bluesky_inode_flush(BlueSkyFS *fs, BlueSkyInode *inode) char key[64]; sprintf(key, "inode-%016"PRIx64, inode->inum); - bluesky_store_put(fs->store, key, data); + BlueSkyStoreAsync *async = bluesky_store_async_new(fs->store); + async->op = STORE_OP_PUT; + async->key = g_strdup(key); + async->data = data; + bluesky_store_async_submit(async); + bluesky_store_async_unref(async); } /* Fetch an inode from stable storage. */ @@ -234,5 +239,12 @@ void bluesky_superblock_flush(BlueSkyFS *fs) g_print("Syncing superblock...\n"); - bluesky_store_put(fs->store, "superblock", data); + BlueSkyStoreAsync *async = bluesky_store_async_new(fs->store); + async->op = STORE_OP_PUT; + async->key = g_strdup("superblock"); + async->data = data; + bluesky_store_async_submit(async); + bluesky_store_async_unref(async); + + bluesky_store_sync(fs->store); } diff --git a/bluesky/store.c b/bluesky/store.c index b60bd05..7c8123c 100644 --- a/bluesky/store.c +++ b/bluesky/store.c @@ -20,6 +20,10 @@ struct _BlueSkyStore { const BlueSkyStoreImplementation *impl; gpointer handle; + + GMutex *lock; + GCond *cond_idle; + int pending; /* Count of operations not yet complete. */ }; GHashTable *store_implementations; @@ -45,6 +49,9 @@ BlueSkyStore *bluesky_store_new(const gchar *type) BlueSkyStore *store = g_new(BlueSkyStore, 1); store->impl = impl; store->handle = handle; + store->lock = g_mutex_new(); + store->cond_idle = g_cond_new(); + store->pending = 0; return store; } @@ -121,6 +128,14 @@ void bluesky_store_async_wait(BlueSkyStoreAsync *async) * function. */ void bluesky_store_async_mark_complete(BlueSkyStoreAsync *async) { + if (async->status != ASYNC_COMPLETE) { + g_mutex_lock(async->store->lock); + async->store->pending--; + if (async->store->pending == 0) + g_cond_broadcast(async->store->cond_idle); + g_mutex_unlock(async->store->lock); + } + async->status = ASYNC_COMPLETE; g_cond_broadcast(async->completion_cond); } @@ -129,7 +144,25 @@ void bluesky_store_async_submit(BlueSkyStoreAsync *async) { BlueSkyStore *store = async->store; + g_mutex_lock(async->store->lock); + async->store->pending++; + g_mutex_unlock(async->store->lock); + store->impl->submit(store->handle, async); + + if (bluesky_options.synchronous_stores) + bluesky_store_async_wait(async); +} + +void bluesky_store_sync(BlueSkyStore *store) +{ + g_mutex_lock(store->lock); + g_print("Waiting for pending store operations to complete...\n"); + while (store->pending > 0) { + g_cond_wait(store->cond_idle, store->lock); + } + g_mutex_unlock(store->lock); + g_print("Operations are complete.\n"); } /* Convenience wrappers that perform a single operation synchronously. */