From: Michael Vrable Date: Fri, 20 Nov 2009 04:33:55 +0000 (-0800) Subject: Rework the storage interface so that operations are asynchronous. X-Git-Url: https://git.vrable.net/?a=commitdiff_plain;h=c513d64c6a1f7c2ff2bad97db69e2f40ef642167;p=bluesky.git Rework the storage interface so that operations are asynchronous. --- diff --git a/bluesky/bluesky-private.h b/bluesky/bluesky-private.h index a450085..7f5e646 100644 --- a/bluesky/bluesky-private.h +++ b/bluesky/bluesky-private.h @@ -29,6 +29,62 @@ BlueSkyFS *bluesky_deserialize_superblock(const gchar *buf); void bluesky_serialize_inode(GString *out, BlueSkyInode *inode); BlueSkyInode *bluesky_deserialize_inode(BlueSkyFS *fs, const gchar *buf); +/* Storage layer. Requests can be performed asynchronously, so these objects + * help keep track of operations in progress. */ +typedef enum { + STORE_OP_NONE, + STORE_OP_GET, + STORE_OP_PUT, + STORE_OP_DELETE, +} BlueSkyStoreOp; + +typedef enum { + ASYNC_NEW, // Operation not yet submitted to storage layer + ASYNC_PENDING, // Submitted to storage layer + ASYNC_RUNNING, // Operation is in progress + ASYNC_COMPLETE, // Operation finished, results available +} BlueSkyAsyncStatus; + +typedef struct { + BlueSkyStore *store; + + GMutex *lock; + GCond *completion_cond; /* Used to wait for operation to complete. */ + + BlueSkyAsyncStatus status; + + BlueSkyStoreOp op; + gchar *key; /* Key to read/write */ + BlueSkyRCStr *data; /* Data read/to write */ + + int result; /* Result code; 0 for success. */ + + gpointer store_private; /* For use by the storage implementation */ +} BlueSkyStoreAsync; + +/* The abstraction layer for storage, allowing multiple implementations. */ +typedef struct { + /* Create a new store instance and return a handle to it. */ + gpointer (*create)(); + + /* Clean up any resources used by this store. */ + void (*destroy)(gpointer store); + + /* Submit an operation (get/put/delete) to the storage layer to be + * performed asynchronously. */ + void (*submit)(gpointer store, BlueSkyStoreAsync *async); + + /* Clean up any implementation-private data in a BlueSkyStoreAsync. */ + void (*cleanup)(gpointer store, BlueSkyStoreAsync *async); +} BlueSkyStoreImplementation; + +void bluesky_store_register(const BlueSkyStoreImplementation *impl, + const gchar *name); + +BlueSkyStoreAsync *bluesky_store_async_new(BlueSkyStore *store); +void bluesky_store_async_wait(BlueSkyStoreAsync *async); +void bluesky_store_async_mark_complete(BlueSkyStoreAsync *async); + #ifdef __cplusplus } #endif diff --git a/bluesky/bluesky.h b/bluesky/bluesky.h index 21fd855..f9e97f5 100644 --- a/bluesky/bluesky.h +++ b/bluesky/bluesky.h @@ -56,23 +56,6 @@ BlueSkyRCStr *bluesky_crypt_decrypt(BlueSkyRCStr *in, const uint8_t *key); /* Storage interface. This presents a key-value store abstraction, and can * have multiple implementations: in-memory, on-disk, in-cloud. */ -typedef struct { - /* Create a new store instance and return a handle to it. */ - gpointer (*create)(); - - /* Clean up any resources used by this store. */ - void (*destroy)(gpointer store); - - /* Fetch an item with the given name, or return NULL if not found. */ - BlueSkyRCStr * (*get)(gpointer store, const gchar *key); - - /* Store an item to the given key name. */ - void (*put)(gpointer store, const gchar *key, BlueSkyRCStr *val); -} BlueSkyStoreImplementation; - -void bluesky_store_register(const BlueSkyStoreImplementation *impl, - const gchar *name); - struct _BlueSkyStore; typedef struct _BlueSkyStore BlueSkyStore; diff --git a/bluesky/s3store.c b/bluesky/s3store.c index d742e3b..5ab42e1 100644 --- a/bluesky/s3store.c +++ b/bluesky/s3store.c @@ -81,57 +81,41 @@ static void s3store_response_callback(S3Status status, } } -static BlueSkyRCStr *s3store_get(gpointer s, const gchar *key) +static void s3store_task(gpointer a, gpointer s) { + BlueSkyStoreAsync *async = (BlueSkyStoreAsync *)a; S3Store *store = (S3Store *)s; - struct get_info info; - info.buf = g_string_new(""); - info.success = 0; + g_print("Start task [key=%s]...\n", async->key); + async->status = ASYNC_RUNNING; - struct S3GetObjectHandler handler; - handler.responseHandler.propertiesCallback = s3store_properties_callback; - handler.responseHandler.completeCallback = s3store_response_callback; - handler.getObjectDataCallback = s3store_get_handler; + if (async->op == STORE_OP_GET) { + struct get_info info; + info.buf = g_string_new(""); + info.success = 0; - g_print("Starting fetch of %s from S3...\n", key); - S3_get_object(&store->bucket, key, NULL, 0, 0, NULL, - &handler, &info); - - if (!info.success) { - g_string_free(info.buf, TRUE); - return NULL; - } - - BlueSkyRCStr *raw, *decrypted; - raw = bluesky_string_new_from_gstring(info.buf); - decrypted = bluesky_crypt_decrypt(raw, store->encryption_key); - bluesky_string_unref(raw); - return decrypted; -} - -static void s3store_put(gpointer s, const gchar *key, BlueSkyRCStr *val) -{ - S3Store *store = (S3Store *)s; - - S3Op *op = g_new(S3Op, 1); - op->op = S3_PUT; - op->key = g_strdup(key); - bluesky_string_ref(val); - op->data = val; - - g_thread_pool_push(store->thread_pool, op, NULL); -} - -static void s3store_task(gpointer o, gpointer s) -{ - S3Store *store = (S3Store *)s; - S3Op *op = (S3Op *)o; + struct S3GetObjectHandler handler; + handler.responseHandler.propertiesCallback = s3store_properties_callback; + handler.responseHandler.completeCallback = s3store_response_callback; + handler.getObjectDataCallback = s3store_get_handler; - g_print("Start task [key=%s]...\n", op->key); + g_print("Starting fetch of %s from S3...\n", async->key); + S3_get_object(&store->bucket, async->key, NULL, 0, 0, NULL, + &handler, &info); - if (op->op == S3_PUT) { - BlueSkyRCStr *encrypted = bluesky_crypt_encrypt(op->data, + if (info.success) { + BlueSkyRCStr *raw, *decrypted; + raw = bluesky_string_new_from_gstring(info.buf); + decrypted = bluesky_crypt_decrypt(raw, store->encryption_key); + bluesky_string_unref(raw); + async->data = decrypted; + async->result = 0; + } else { + g_string_free(info.buf, TRUE); + } + + } else if (async->op == STORE_OP_PUT) { + BlueSkyRCStr *encrypted = bluesky_crypt_encrypt(async->data, store->encryption_key); struct put_info info; @@ -145,18 +129,19 @@ static void s3store_task(gpointer o, gpointer s) handler.putObjectDataCallback = s3store_put_handler; g_print("Starting store of %s to S3 at %ld...\n", - op->key, bluesky_now_hires()); - S3_put_object(&store->bucket, op->key, encrypted->len, NULL, NULL, + async->key, bluesky_now_hires()); + S3_put_object(&store->bucket, async->key, encrypted->len, NULL, NULL, &handler, &info); bluesky_string_unref(encrypted); + + async->result = 0; } - bluesky_string_unref(op->data); - g_free(op->key); - g_free(op); + // TODO: Deallocate resources g_print("Finish task...\n"); + bluesky_store_async_mark_complete(async); } static gpointer s3store_new() @@ -189,11 +174,41 @@ static void s3store_destroy(gpointer store) g_free(store); } +static void s3store_submit(gpointer s, BlueSkyStoreAsync *async) +{ + S3Store *store = (S3Store *)s; + g_return_if_fail(async->status == ASYNC_NEW); + g_return_if_fail(async->op != STORE_OP_NONE); + + switch (async->op) { + case STORE_OP_GET: + case STORE_OP_PUT: + async->status = ASYNC_PENDING; + g_thread_pool_push(store->thread_pool, async, NULL); + break; + + default: + g_warning("Uknown operation type for S3Store: %d\n", async->op); + bluesky_store_async_mark_complete(async); + break; + } +} + +static void s3store_cleanup(gpointer store, BlueSkyStoreAsync *async) +{ + GString *buf = (GString *)async->store_private; + + if (buf != NULL) { + g_string_free(buf, TRUE); + async->store_private = NULL; + } +} + static BlueSkyStoreImplementation store_impl = { .create = s3store_new, .destroy = s3store_destroy, - .get = s3store_get, - .put = s3store_put, + .submit = s3store_submit, + .cleanup = s3store_cleanup, }; void bluesky_store_init_s3(void) diff --git a/bluesky/store.c b/bluesky/store.c index 52cc6b3..13a080f 100644 --- a/bluesky/store.c +++ b/bluesky/store.c @@ -14,7 +14,8 @@ /* Interaction with cloud storage. We expose very simple GET/PUT style * interface, which different backends can implement. Available backends - * (will) include Amazon S3 and a simple local store for testing purposes. */ + * (will) include Amazon S3 and a simple local store for testing purposes. + * Operations may be performed asynchronously. */ struct _BlueSkyStore { const BlueSkyStoreImplementation *impl; @@ -53,17 +54,79 @@ void bluesky_store_free(BlueSkyStore *store) g_free(store); } +BlueSkyStoreAsync *bluesky_store_async_new(BlueSkyStore *store) +{ + BlueSkyStoreAsync *async; + + async = g_new(BlueSkyStoreAsync, 1); + async->store = store; + async->lock = g_mutex_new(); + async->completion_cond = g_cond_new(); + async->status = ASYNC_NEW; + async->op = STORE_OP_NONE; + async->key = NULL; + async->data = NULL; + async->result = -1; + async->store_private = NULL; + + return async; +} + +/* Block until the given operation has completed. */ +void bluesky_store_async_wait(BlueSkyStoreAsync *async) +{ + g_return_if_fail(async != NULL); + g_mutex_lock(async->lock); + + if (async->status == ASYNC_NEW) { + g_error("bluesky_store_async_wait on a new async object!\n"); + g_mutex_unlock(async->lock); + return; + } + + while (async->status != ASYNC_COMPLETE) { + g_cond_wait(async->completion_cond, async->lock); + } +} + +/* Mark an asynchronous operation as complete. This should only be called by + * the store implementations. The lock must be held when calling this + * function. */ +void bluesky_store_async_mark_complete(BlueSkyStoreAsync *async) +{ + async->status = ASYNC_COMPLETE; + g_cond_broadcast(async->completion_cond); +} + +/* Convenience wrappers that perform a single operation synchronously. */ BlueSkyRCStr *bluesky_store_get(BlueSkyStore *store, const gchar *key) { - return store->impl->get(store->handle, key); + BlueSkyStoreAsync *async = bluesky_store_async_new(store); + async->op = STORE_OP_GET; + async->key = g_strdup(key); + store->impl->submit(store->handle, async); + + bluesky_store_async_wait(async); + + BlueSkyRCStr *data = async->data; + bluesky_string_ref(data); + return data; } void bluesky_store_put(BlueSkyStore *store, const gchar *key, BlueSkyRCStr *val) { - store->impl->put(store->handle, key, val); + BlueSkyStoreAsync *async = bluesky_store_async_new(store); + async->op = STORE_OP_PUT; + async->key = g_strdup(key); + bluesky_string_ref(val); + async->data = val; + store->impl->submit(store->handle, async); + + bluesky_store_async_wait(async); } +#if 0 /* Simple in-memory data store for test purposes. */ typedef struct { GMutex *lock; @@ -145,10 +208,11 @@ static BlueSkyStoreImplementation filestore_impl = { .get = filestore_get, .put = filestore_put, }; +#endif void bluesky_store_init() { store_implementations = g_hash_table_new(g_str_hash, g_str_equal); - bluesky_store_register(&memstore_impl, "mem"); - bluesky_store_register(&filestore_impl, "file"); + //bluesky_store_register(&memstore_impl, "mem"); + //bluesky_store_register(&filestore_impl, "file"); }