X-Git-Url: http://git.vrable.net/?a=blobdiff_plain;f=bluesky%2Fs3store.c;h=336afc26e3fd00a82760426f85363e12d5523a5d;hb=a4e456f91da9819e5a1517d4e505816bb4aa1007;hp=4602136d4520b7ad48439746fa945144ac023752;hpb=a5a9eca66728d271a442125ac52098378c70cf42;p=bluesky.git diff --git a/bluesky/s3store.c b/bluesky/s3store.c index 4602136..336afc2 100644 --- a/bluesky/s3store.c +++ b/bluesky/s3store.c @@ -11,38 +11,30 @@ #include #include -#include "bluesky.h" +#include "bluesky-private.h" #include "libs3.h" /* Interface to Amazon S3 storage. */ -/* Simple in-memory data store for test purposes. */ typedef struct { + GThreadPool *thread_pool; S3BucketContext bucket; + uint8_t encryption_key[CRYPTO_KEY_SIZE]; } S3Store; -static gpointer s3store_new() -{ - S3Store *store = g_new(S3Store, 1); - store->bucket.bucketName = "mvrable-bluesky"; - store->bucket.protocol = S3ProtocolHTTP; - store->bucket.uriStyle = S3UriStylePath; - store->bucket.accessKeyId = getenv("AWS_ACCESS_KEY_ID"); - store->bucket.secretAccessKey = getenv("AWS_SECRET_ACCESS_KEY"); - - g_print("Initializing S3 with bucket %s, access key %s\n", - store->bucket.bucketName, store->bucket.accessKeyId); - - return store; -} - -static void s3store_destroy(gpointer store) -{ - g_free(store); -} +typedef struct { + enum { S3_GET, S3_PUT } op; + gchar *key; + BlueSkyRCStr *data; +} S3Op; struct get_info { - gchar *buf; + int success; + GString *buf; +}; + +struct put_info { + BlueSkyRCStr *val; gint offset; }; @@ -50,17 +42,10 @@ static S3Status s3store_get_handler(int bufferSize, const char *buffer, void *callbackData) { struct get_info *info = (struct get_info *)callbackData; - gint bytes = MIN(bufferSize, (int)(BLUESKY_BLOCK_SIZE - info->offset)); - memcpy(info->buf + info->offset, buffer, bytes); - info->offset += bytes; + g_string_append_len(info->buf, buffer, bufferSize); return S3StatusOK; } -struct put_info { - BlueSkyRCStr *val; - gint offset; -}; - static int s3store_put_handler(int bufferSize, char *buffer, void *callbackData) { @@ -74,7 +59,6 @@ static int s3store_put_handler(int bufferSize, char *buffer, static S3Status s3store_properties_callback(const S3ResponseProperties *properties, void *callbackData) { - g_print("(Properties callback)\n"); return S3StatusOK; } @@ -82,56 +66,143 @@ static void s3store_response_callback(S3Status status, const S3ErrorDetails *errorDetails, void *callbackData) { - g_print("S3 operation complete, status=%s\n", - S3_get_status_name(status)); + struct get_info *info = (struct get_info *)callbackData; + + g_print("S3 operation complete, status=%s, now=%ld\n", + S3_get_status_name(status), bluesky_now_hires()); + + if (status == 0) { + info->success = 1; + } + if (errorDetails != NULL) { g_print(" Error message: %s\n", errorDetails->message); } } -static BlueSkyRCStr *s3store_get(gpointer s, const gchar *key) +static void s3store_task(gpointer a, gpointer s) { + BlueSkyStoreAsync *async = (BlueSkyStoreAsync *)a; S3Store *store = (S3Store *)s; - struct get_info info; - info.buf = (char *)g_malloc0(BLUESKY_BLOCK_SIZE); - info.offset = 0; + async->status = ASYNC_RUNNING; + + if (async->op == STORE_OP_GET) { + struct get_info info; + info.buf = g_string_new(""); + info.success = 0; - struct S3GetObjectHandler handler; - handler.responseHandler.propertiesCallback = s3store_properties_callback; - handler.responseHandler.completeCallback = s3store_response_callback; - handler.getObjectDataCallback = s3store_get_handler; + struct S3GetObjectHandler handler; + handler.responseHandler.propertiesCallback = s3store_properties_callback; + handler.responseHandler.completeCallback = s3store_response_callback; + handler.getObjectDataCallback = s3store_get_handler; - g_print("Starting fetch of %s from S3...\n", key); - S3_get_object(&store->bucket, key, NULL, 0, 0, NULL, - &handler, &info); + S3_get_object(&store->bucket, async->key, NULL, 0, 0, NULL, + &handler, &info); - return bluesky_string_new(info.buf, BLUESKY_BLOCK_SIZE); + if (info.success) { + BlueSkyRCStr *raw, *decrypted; + raw = bluesky_string_new_from_gstring(info.buf); + decrypted = bluesky_crypt_decrypt(raw, store->encryption_key); + bluesky_string_unref(raw); + async->data = decrypted; + async->result = 0; + } else { + g_string_free(info.buf, TRUE); + } + + } else if (async->op == STORE_OP_PUT) { + BlueSkyRCStr *encrypted = bluesky_crypt_encrypt(async->data, + store->encryption_key); + + struct put_info info; + info.val = encrypted; + info.offset = 0; + + struct S3PutObjectHandler handler; + handler.responseHandler.propertiesCallback + = s3store_properties_callback; + handler.responseHandler.completeCallback = s3store_response_callback; + handler.putObjectDataCallback = s3store_put_handler; + + S3_put_object(&store->bucket, async->key, encrypted->len, NULL, NULL, + &handler, &info); + + bluesky_string_unref(encrypted); + + async->result = 0; + } + + bluesky_store_async_mark_complete(async); + bluesky_store_async_unref(async); } -static void s3store_put(gpointer s, const gchar *key, BlueSkyRCStr *val) +static gpointer s3store_new() { - S3Store *store = (S3Store *)s; + S3Store *store = g_new(S3Store, 1); + store->thread_pool = g_thread_pool_new(s3store_task, store, -1, FALSE, + NULL); + store->bucket.bucketName = "mvrable-bluesky"; + store->bucket.protocol = S3ProtocolHTTP; + store->bucket.uriStyle = S3UriStylePath; + store->bucket.accessKeyId = getenv("AWS_ACCESS_KEY_ID"); + store->bucket.secretAccessKey = getenv("AWS_SECRET_ACCESS_KEY"); + + const char *key = getenv("BLUESKY_KEY"); + if (key == NULL) { + g_error("Encryption key not defined; please set BLUESKY_KEY environment variable"); + exit(1); + } + + bluesky_crypt_hash_key(key, store->encryption_key); + + g_print("Initializing S3 with bucket %s, access key %s, encryption key %s\n", + store->bucket.bucketName, store->bucket.accessKeyId, key); - struct put_info info; - info.val = val; - info.offset = 0; + return store; +} - struct S3PutObjectHandler handler; - handler.responseHandler.propertiesCallback = s3store_properties_callback; - handler.responseHandler.completeCallback = s3store_response_callback; - handler.putObjectDataCallback = s3store_put_handler; +static void s3store_destroy(gpointer store) +{ + g_free(store); +} - g_print("Starting store of %s to S3...\n", key); - S3_put_object(&store->bucket, key, val->len, NULL, NULL, - &handler, &info); +static void s3store_submit(gpointer s, BlueSkyStoreAsync *async) +{ + S3Store *store = (S3Store *)s; + g_return_if_fail(async->status == ASYNC_NEW); + g_return_if_fail(async->op != STORE_OP_NONE); + + switch (async->op) { + case STORE_OP_GET: + case STORE_OP_PUT: + async->status = ASYNC_PENDING; + bluesky_store_async_ref(async); + g_thread_pool_push(store->thread_pool, async, NULL); + break; + + default: + g_warning("Uknown operation type for S3Store: %d\n", async->op); + bluesky_store_async_mark_complete(async); + break; + } +} + +static void s3store_cleanup(gpointer store, BlueSkyStoreAsync *async) +{ + GString *buf = (GString *)async->store_private; + + if (buf != NULL) { + g_string_free(buf, TRUE); + async->store_private = NULL; + } } static BlueSkyStoreImplementation store_impl = { .create = s3store_new, .destroy = s3store_destroy, - .get = s3store_get, - .put = s3store_put, + .submit = s3store_submit, + .cleanup = s3store_cleanup, }; void bluesky_store_init_s3(void)