X-Git-Url: http://git.vrable.net/?a=blobdiff_plain;f=bluesky%2Fs3store.c;h=5ab42e130552017e7a1433297256971d6b7974df;hb=c513d64c6a1f7c2ff2bad97db69e2f40ef642167;hp=d742e3b19ec2803c0d740269868be955681faf91;hpb=b7e08dcf6552eb8977ccef56f00e775da8133cf8;p=bluesky.git diff --git a/bluesky/s3store.c b/bluesky/s3store.c index d742e3b..5ab42e1 100644 --- a/bluesky/s3store.c +++ b/bluesky/s3store.c @@ -81,57 +81,41 @@ static void s3store_response_callback(S3Status status, } } -static BlueSkyRCStr *s3store_get(gpointer s, const gchar *key) +static void s3store_task(gpointer a, gpointer s) { + BlueSkyStoreAsync *async = (BlueSkyStoreAsync *)a; S3Store *store = (S3Store *)s; - struct get_info info; - info.buf = g_string_new(""); - info.success = 0; + g_print("Start task [key=%s]...\n", async->key); + async->status = ASYNC_RUNNING; - struct S3GetObjectHandler handler; - handler.responseHandler.propertiesCallback = s3store_properties_callback; - handler.responseHandler.completeCallback = s3store_response_callback; - handler.getObjectDataCallback = s3store_get_handler; + if (async->op == STORE_OP_GET) { + struct get_info info; + info.buf = g_string_new(""); + info.success = 0; - g_print("Starting fetch of %s from S3...\n", key); - S3_get_object(&store->bucket, key, NULL, 0, 0, NULL, - &handler, &info); - - if (!info.success) { - g_string_free(info.buf, TRUE); - return NULL; - } - - BlueSkyRCStr *raw, *decrypted; - raw = bluesky_string_new_from_gstring(info.buf); - decrypted = bluesky_crypt_decrypt(raw, store->encryption_key); - bluesky_string_unref(raw); - return decrypted; -} - -static void s3store_put(gpointer s, const gchar *key, BlueSkyRCStr *val) -{ - S3Store *store = (S3Store *)s; - - S3Op *op = g_new(S3Op, 1); - op->op = S3_PUT; - op->key = g_strdup(key); - bluesky_string_ref(val); - op->data = val; - - g_thread_pool_push(store->thread_pool, op, NULL); -} - -static void s3store_task(gpointer o, gpointer s) -{ - S3Store *store = (S3Store *)s; - S3Op *op = (S3Op *)o; + struct S3GetObjectHandler handler; + handler.responseHandler.propertiesCallback = s3store_properties_callback; + handler.responseHandler.completeCallback = s3store_response_callback; + handler.getObjectDataCallback = s3store_get_handler; - g_print("Start task [key=%s]...\n", op->key); + g_print("Starting fetch of %s from S3...\n", async->key); + S3_get_object(&store->bucket, async->key, NULL, 0, 0, NULL, + &handler, &info); - if (op->op == S3_PUT) { - BlueSkyRCStr *encrypted = bluesky_crypt_encrypt(op->data, + if (info.success) { + BlueSkyRCStr *raw, *decrypted; + raw = bluesky_string_new_from_gstring(info.buf); + decrypted = bluesky_crypt_decrypt(raw, store->encryption_key); + bluesky_string_unref(raw); + async->data = decrypted; + async->result = 0; + } else { + g_string_free(info.buf, TRUE); + } + + } else if (async->op == STORE_OP_PUT) { + BlueSkyRCStr *encrypted = bluesky_crypt_encrypt(async->data, store->encryption_key); struct put_info info; @@ -145,18 +129,19 @@ static void s3store_task(gpointer o, gpointer s) handler.putObjectDataCallback = s3store_put_handler; g_print("Starting store of %s to S3 at %ld...\n", - op->key, bluesky_now_hires()); - S3_put_object(&store->bucket, op->key, encrypted->len, NULL, NULL, + async->key, bluesky_now_hires()); + S3_put_object(&store->bucket, async->key, encrypted->len, NULL, NULL, &handler, &info); bluesky_string_unref(encrypted); + + async->result = 0; } - bluesky_string_unref(op->data); - g_free(op->key); - g_free(op); + // TODO: Deallocate resources g_print("Finish task...\n"); + bluesky_store_async_mark_complete(async); } static gpointer s3store_new() @@ -189,11 +174,41 @@ static void s3store_destroy(gpointer store) g_free(store); } +static void s3store_submit(gpointer s, BlueSkyStoreAsync *async) +{ + S3Store *store = (S3Store *)s; + g_return_if_fail(async->status == ASYNC_NEW); + g_return_if_fail(async->op != STORE_OP_NONE); + + switch (async->op) { + case STORE_OP_GET: + case STORE_OP_PUT: + async->status = ASYNC_PENDING; + g_thread_pool_push(store->thread_pool, async, NULL); + break; + + default: + g_warning("Uknown operation type for S3Store: %d\n", async->op); + bluesky_store_async_mark_complete(async); + break; + } +} + +static void s3store_cleanup(gpointer store, BlueSkyStoreAsync *async) +{ + GString *buf = (GString *)async->store_private; + + if (buf != NULL) { + g_string_free(buf, TRUE); + async->store_private = NULL; + } +} + static BlueSkyStoreImplementation store_impl = { .create = s3store_new, .destroy = s3store_destroy, - .get = s3store_get, - .put = s3store_put, + .submit = s3store_submit, + .cleanup = s3store_cleanup, }; void bluesky_store_init_s3(void)