X-Git-Url: http://git.vrable.net/?a=blobdiff_plain;f=bluesky%2Fs3store.c;h=336afc26e3fd00a82760426f85363e12d5523a5d;hb=a4e456f91da9819e5a1517d4e505816bb4aa1007;hp=083023ee79e1fae99b3b509fd300184ce02c1d5c;hpb=e53d372f2e2d81a4a0958425bd7cb41e3e6f4a57;p=bluesky.git diff --git a/bluesky/s3store.c b/bluesky/s3store.c index 083023e..336afc2 100644 --- a/bluesky/s3store.c +++ b/bluesky/s3store.c @@ -28,42 +28,16 @@ typedef struct { BlueSkyRCStr *data; } S3Op; -static void s3store_task(gpointer s, gpointer o); - -static gpointer s3store_new() -{ - S3Store *store = g_new(S3Store, 1); - store->thread_pool = g_thread_pool_new(s3store_task, store, -1, FALSE, - NULL); - store->bucket.bucketName = "mvrable-bluesky"; - store->bucket.protocol = S3ProtocolHTTP; - store->bucket.uriStyle = S3UriStylePath; - store->bucket.accessKeyId = getenv("AWS_ACCESS_KEY_ID"); - store->bucket.secretAccessKey = getenv("AWS_SECRET_ACCESS_KEY"); - - const char *key = getenv("BLUESKY_KEY"); - if (key == NULL) { - g_error("Encryption key not defined; please set BLUESKY_KEY environment variable"); - exit(1); - } - - bluesky_crypt_hash_key(key, store->encryption_key); - - g_print("Initializing S3 with bucket %s, access key %s, encryption key %s\n", - store->bucket.bucketName, store->bucket.accessKeyId, key); - - return store; -} - -static void s3store_destroy(gpointer store) -{ - g_free(store); -} - struct get_info { + int success; GString *buf; }; +struct put_info { + BlueSkyRCStr *val; + gint offset; +}; + static S3Status s3store_get_handler(int bufferSize, const char *buffer, void *callbackData) { @@ -72,11 +46,6 @@ static S3Status s3store_get_handler(int bufferSize, const char *buffer, return S3StatusOK; } -struct put_info { - BlueSkyRCStr *val; - gint offset; -}; - static int s3store_put_handler(int bufferSize, char *buffer, void *callbackData) { @@ -90,7 +59,6 @@ static int s3store_put_handler(int bufferSize, char *buffer, static S3Status s3store_properties_callback(const S3ResponseProperties *properties, void *callbackData) { - g_print("(Properties callback)\n"); return S3StatusOK; } @@ -98,58 +66,53 @@ static void s3store_response_callback(S3Status status, const S3ErrorDetails *errorDetails, void *callbackData) { + struct get_info *info = (struct get_info *)callbackData; + g_print("S3 operation complete, status=%s, now=%ld\n", S3_get_status_name(status), bluesky_now_hires()); + + if (status == 0) { + info->success = 1; + } + if (errorDetails != NULL) { g_print(" Error message: %s\n", errorDetails->message); } } -static BlueSkyRCStr *s3store_get(gpointer s, const gchar *key) +static void s3store_task(gpointer a, gpointer s) { + BlueSkyStoreAsync *async = (BlueSkyStoreAsync *)a; S3Store *store = (S3Store *)s; - struct get_info info; - info.buf = g_string_new(""); - - struct S3GetObjectHandler handler; - handler.responseHandler.propertiesCallback = s3store_properties_callback; - handler.responseHandler.completeCallback = s3store_response_callback; - handler.getObjectDataCallback = s3store_get_handler; + async->status = ASYNC_RUNNING; - g_print("Starting fetch of %s from S3...\n", key); - S3_get_object(&store->bucket, key, NULL, 0, 0, NULL, - &handler, &info); + if (async->op == STORE_OP_GET) { + struct get_info info; + info.buf = g_string_new(""); + info.success = 0; - BlueSkyRCStr *raw, *decrypted; - raw = bluesky_string_new_from_string(info.buf); - decrypted = bluesky_crypt_decrypt(raw, store->encryption_key); - bluesky_string_unref(raw); - return decrypted; -} - -static void s3store_put(gpointer s, const gchar *key, BlueSkyRCStr *val) -{ - S3Store *store = (S3Store *)s; - - S3Op *op = g_new(S3Op, 1); - op->op = S3_PUT; - op->key = g_strdup(key); - bluesky_string_ref(val); - op->data = val; - - g_thread_pool_push(store->thread_pool, op, NULL); -} - -static void s3store_task(gpointer o, gpointer s) -{ - S3Store *store = (S3Store *)s; - S3Op *op = (S3Op *)o; + struct S3GetObjectHandler handler; + handler.responseHandler.propertiesCallback = s3store_properties_callback; + handler.responseHandler.completeCallback = s3store_response_callback; + handler.getObjectDataCallback = s3store_get_handler; - g_print("Start task [key=%s]...\n", op->key); + S3_get_object(&store->bucket, async->key, NULL, 0, 0, NULL, + &handler, &info); - if (op->op == S3_PUT) { - BlueSkyRCStr *encrypted = bluesky_crypt_encrypt(op->data, + if (info.success) { + BlueSkyRCStr *raw, *decrypted; + raw = bluesky_string_new_from_gstring(info.buf); + decrypted = bluesky_crypt_decrypt(raw, store->encryption_key); + bluesky_string_unref(raw); + async->data = decrypted; + async->result = 0; + } else { + g_string_free(info.buf, TRUE); + } + + } else if (async->op == STORE_OP_PUT) { + BlueSkyRCStr *encrypted = bluesky_crypt_encrypt(async->data, store->encryption_key); struct put_info info; @@ -162,26 +125,84 @@ static void s3store_task(gpointer o, gpointer s) handler.responseHandler.completeCallback = s3store_response_callback; handler.putObjectDataCallback = s3store_put_handler; - g_print("Starting store of %s to S3 at %ld...\n", - op->key, bluesky_now_hires()); - S3_put_object(&store->bucket, op->key, encrypted->len, NULL, NULL, + S3_put_object(&store->bucket, async->key, encrypted->len, NULL, NULL, &handler, &info); bluesky_string_unref(encrypted); + + async->result = 0; } - bluesky_string_unref(op->data); - g_free(op->key); - g_free(op); + bluesky_store_async_mark_complete(async); + bluesky_store_async_unref(async); +} - g_print("Finish task...\n"); +static gpointer s3store_new() +{ + S3Store *store = g_new(S3Store, 1); + store->thread_pool = g_thread_pool_new(s3store_task, store, -1, FALSE, + NULL); + store->bucket.bucketName = "mvrable-bluesky"; + store->bucket.protocol = S3ProtocolHTTP; + store->bucket.uriStyle = S3UriStylePath; + store->bucket.accessKeyId = getenv("AWS_ACCESS_KEY_ID"); + store->bucket.secretAccessKey = getenv("AWS_SECRET_ACCESS_KEY"); + + const char *key = getenv("BLUESKY_KEY"); + if (key == NULL) { + g_error("Encryption key not defined; please set BLUESKY_KEY environment variable"); + exit(1); + } + + bluesky_crypt_hash_key(key, store->encryption_key); + + g_print("Initializing S3 with bucket %s, access key %s, encryption key %s\n", + store->bucket.bucketName, store->bucket.accessKeyId, key); + + return store; +} + +static void s3store_destroy(gpointer store) +{ + g_free(store); +} + +static void s3store_submit(gpointer s, BlueSkyStoreAsync *async) +{ + S3Store *store = (S3Store *)s; + g_return_if_fail(async->status == ASYNC_NEW); + g_return_if_fail(async->op != STORE_OP_NONE); + + switch (async->op) { + case STORE_OP_GET: + case STORE_OP_PUT: + async->status = ASYNC_PENDING; + bluesky_store_async_ref(async); + g_thread_pool_push(store->thread_pool, async, NULL); + break; + + default: + g_warning("Uknown operation type for S3Store: %d\n", async->op); + bluesky_store_async_mark_complete(async); + break; + } +} + +static void s3store_cleanup(gpointer store, BlueSkyStoreAsync *async) +{ + GString *buf = (GString *)async->store_private; + + if (buf != NULL) { + g_string_free(buf, TRUE); + async->store_private = NULL; + } } static BlueSkyStoreImplementation store_impl = { .create = s3store_new, .destroy = s3store_destroy, - .get = s3store_get, - .put = s3store_put, + .submit = s3store_submit, + .cleanup = s3store_cleanup, }; void bluesky_store_init_s3(void)