X-Git-Url: http://git.vrable.net/?a=blobdiff_plain;ds=sidebyside;f=bluesky%2Fs3store.c;h=336afc26e3fd00a82760426f85363e12d5523a5d;hb=a4e456f91da9819e5a1517d4e505816bb4aa1007;hp=d742e3b19ec2803c0d740269868be955681faf91;hpb=b7e08dcf6552eb8977ccef56f00e775da8133cf8;p=bluesky.git diff --git a/bluesky/s3store.c b/bluesky/s3store.c index d742e3b..336afc2 100644 --- a/bluesky/s3store.c +++ b/bluesky/s3store.c @@ -59,7 +59,6 @@ static int s3store_put_handler(int bufferSize, char *buffer, static S3Status s3store_properties_callback(const S3ResponseProperties *properties, void *callbackData) { - g_print("(Properties callback)\n"); return S3StatusOK; } @@ -81,57 +80,39 @@ static void s3store_response_callback(S3Status status, } } -static BlueSkyRCStr *s3store_get(gpointer s, const gchar *key) +static void s3store_task(gpointer a, gpointer s) { + BlueSkyStoreAsync *async = (BlueSkyStoreAsync *)a; S3Store *store = (S3Store *)s; - struct get_info info; - info.buf = g_string_new(""); - info.success = 0; + async->status = ASYNC_RUNNING; - struct S3GetObjectHandler handler; - handler.responseHandler.propertiesCallback = s3store_properties_callback; - handler.responseHandler.completeCallback = s3store_response_callback; - handler.getObjectDataCallback = s3store_get_handler; + if (async->op == STORE_OP_GET) { + struct get_info info; + info.buf = g_string_new(""); + info.success = 0; - g_print("Starting fetch of %s from S3...\n", key); - S3_get_object(&store->bucket, key, NULL, 0, 0, NULL, - &handler, &info); - - if (!info.success) { - g_string_free(info.buf, TRUE); - return NULL; - } - - BlueSkyRCStr *raw, *decrypted; - raw = bluesky_string_new_from_gstring(info.buf); - decrypted = bluesky_crypt_decrypt(raw, store->encryption_key); - bluesky_string_unref(raw); - return decrypted; -} - -static void s3store_put(gpointer s, const gchar *key, BlueSkyRCStr *val) -{ - S3Store *store = (S3Store *)s; - - S3Op *op = g_new(S3Op, 1); - op->op = S3_PUT; - op->key = g_strdup(key); - bluesky_string_ref(val); - op->data = val; - - g_thread_pool_push(store->thread_pool, op, NULL); -} - -static void s3store_task(gpointer o, gpointer s) -{ - S3Store *store = (S3Store *)s; - S3Op *op = (S3Op *)o; + struct S3GetObjectHandler handler; + handler.responseHandler.propertiesCallback = s3store_properties_callback; + handler.responseHandler.completeCallback = s3store_response_callback; + handler.getObjectDataCallback = s3store_get_handler; - g_print("Start task [key=%s]...\n", op->key); + S3_get_object(&store->bucket, async->key, NULL, 0, 0, NULL, + &handler, &info); - if (op->op == S3_PUT) { - BlueSkyRCStr *encrypted = bluesky_crypt_encrypt(op->data, + if (info.success) { + BlueSkyRCStr *raw, *decrypted; + raw = bluesky_string_new_from_gstring(info.buf); + decrypted = bluesky_crypt_decrypt(raw, store->encryption_key); + bluesky_string_unref(raw); + async->data = decrypted; + async->result = 0; + } else { + g_string_free(info.buf, TRUE); + } + + } else if (async->op == STORE_OP_PUT) { + BlueSkyRCStr *encrypted = bluesky_crypt_encrypt(async->data, store->encryption_key); struct put_info info; @@ -144,19 +125,16 @@ static void s3store_task(gpointer o, gpointer s) handler.responseHandler.completeCallback = s3store_response_callback; handler.putObjectDataCallback = s3store_put_handler; - g_print("Starting store of %s to S3 at %ld...\n", - op->key, bluesky_now_hires()); - S3_put_object(&store->bucket, op->key, encrypted->len, NULL, NULL, + S3_put_object(&store->bucket, async->key, encrypted->len, NULL, NULL, &handler, &info); bluesky_string_unref(encrypted); - } - bluesky_string_unref(op->data); - g_free(op->key); - g_free(op); + async->result = 0; + } - g_print("Finish task...\n"); + bluesky_store_async_mark_complete(async); + bluesky_store_async_unref(async); } static gpointer s3store_new() @@ -189,11 +167,42 @@ static void s3store_destroy(gpointer store) g_free(store); } +static void s3store_submit(gpointer s, BlueSkyStoreAsync *async) +{ + S3Store *store = (S3Store *)s; + g_return_if_fail(async->status == ASYNC_NEW); + g_return_if_fail(async->op != STORE_OP_NONE); + + switch (async->op) { + case STORE_OP_GET: + case STORE_OP_PUT: + async->status = ASYNC_PENDING; + bluesky_store_async_ref(async); + g_thread_pool_push(store->thread_pool, async, NULL); + break; + + default: + g_warning("Uknown operation type for S3Store: %d\n", async->op); + bluesky_store_async_mark_complete(async); + break; + } +} + +static void s3store_cleanup(gpointer store, BlueSkyStoreAsync *async) +{ + GString *buf = (GString *)async->store_private; + + if (buf != NULL) { + g_string_free(buf, TRUE); + async->store_private = NULL; + } +} + static BlueSkyStoreImplementation store_impl = { .create = s3store_new, .destroy = s3store_destroy, - .get = s3store_get, - .put = s3store_put, + .submit = s3store_submit, + .cleanup = s3store_cleanup, }; void bluesky_store_init_s3(void)