From e00a825326686b864f5201ed27d6b2373682cf88 Mon Sep 17 00:00:00 2001 From: Michael Vrable Date: Tue, 17 Nov 2009 16:28:20 -0800 Subject: [PATCH] Allow storse to S3 to execute in the background, in parallel. --- bluesky/s3store.c | 64 +++++++++++++++++++++++++++++++++++++---------- 1 file changed, 51 insertions(+), 13 deletions(-) diff --git a/bluesky/s3store.c b/bluesky/s3store.c index 3f4fb16..2ac2c3a 100644 --- a/bluesky/s3store.c +++ b/bluesky/s3store.c @@ -16,15 +16,25 @@ /* Interface to Amazon S3 storage. */ -/* Simple in-memory data store for test purposes. */ typedef struct { + GThreadPool *thread_pool; S3BucketContext bucket; uint8_t encryption_key[CRYPTO_KEY_SIZE]; } S3Store; +typedef struct { + enum { S3_GET, S3_PUT } op; + gchar *key; + BlueSkyRCStr *data; +} S3Op; + +static void s3store_task(gpointer s, gpointer o); + static gpointer s3store_new() { S3Store *store = g_new(S3Store, 1); + store->thread_pool = g_thread_pool_new(s3store_task, store, -1, FALSE, + NULL); store->bucket.bucketName = "mvrable-bluesky"; store->bucket.protocol = S3ProtocolHTTP; store->bucket.uriStyle = S3UriStylePath; @@ -125,22 +135,50 @@ static BlueSkyRCStr *s3store_get(gpointer s, const gchar *key) static void s3store_put(gpointer s, const gchar *key, BlueSkyRCStr *val) { S3Store *store = (S3Store *)s; - BlueSkyRCStr *encrypted = bluesky_crypt_encrypt(val, store->encryption_key); - struct put_info info; - info.val = encrypted; - info.offset = 0; + S3Op *op = g_new(S3Op, 1); + op->op = S3_PUT; + op->key = g_strdup(key); + bluesky_string_ref(val); + op->data = val; - struct S3PutObjectHandler handler; - handler.responseHandler.propertiesCallback = s3store_properties_callback; - handler.responseHandler.completeCallback = s3store_response_callback; - handler.putObjectDataCallback = s3store_put_handler; + g_thread_pool_push(store->thread_pool, op, NULL); +} - g_print("Starting store of %s to S3 at %ld...\n", key, bluesky_now_hires()); - S3_put_object(&store->bucket, key, encrypted->len, NULL, NULL, - &handler, &info); +static void s3store_task(gpointer o, gpointer s) +{ + S3Store *store = (S3Store *)s; + S3Op *op = (S3Op *)o; + + g_print("Start task [key=%s]...\n", op->key); + + if (op->op == S3_PUT) { + BlueSkyRCStr *encrypted = bluesky_crypt_encrypt(op->data, + store->encryption_key); + + struct put_info info; + info.val = encrypted; + info.offset = 0; + + struct S3PutObjectHandler handler; + handler.responseHandler.propertiesCallback + = s3store_properties_callback; + handler.responseHandler.completeCallback = s3store_response_callback; + handler.putObjectDataCallback = s3store_put_handler; + + g_print("Starting store of %s to S3 at %ld...\n", + op->key, bluesky_now_hires()); + S3_put_object(&store->bucket, op->key, encrypted->len, NULL, NULL, + &handler, &info); + + bluesky_string_unref(encrypted); + } + + bluesky_string_unref(op->data); + g_free(op->key); + g_free(op); - /* TODO: unref encrypted */ + g_print("Finish task...\n"); } static BlueSkyStoreImplementation store_impl = { -- 2.20.1