X-Git-Url: http://git.vrable.net/?a=blobdiff_plain;f=bluesky%2Fs3store.c;h=1a9e6b980de6a625e064e223a0c1f989878074af;hb=f6cf25a6403fae7297c8d8913815dbd7a87f4f0b;hp=f85a2529898e511952ee550a8efca191a1fa8a03;hpb=afdaf6a249027cccc296b8923dd95fd38736b70d;p=bluesky.git diff --git a/bluesky/s3store.c b/bluesky/s3store.c index f85a252..1a9e6b9 100644 --- a/bluesky/s3store.c +++ b/bluesky/s3store.c @@ -16,22 +16,41 @@ /* Interface to Amazon S3 storage. */ -/* Simple in-memory data store for test purposes. */ typedef struct { + GThreadPool *thread_pool; S3BucketContext bucket; + uint8_t encryption_key[CRYPTO_KEY_SIZE]; } S3Store; +typedef struct { + enum { S3_GET, S3_PUT } op; + gchar *key; + BlueSkyRCStr *data; +} S3Op; + +static void s3store_task(gpointer s, gpointer o); + static gpointer s3store_new() { S3Store *store = g_new(S3Store, 1); + store->thread_pool = g_thread_pool_new(s3store_task, store, -1, FALSE, + NULL); store->bucket.bucketName = "mvrable-bluesky"; store->bucket.protocol = S3ProtocolHTTP; store->bucket.uriStyle = S3UriStylePath; store->bucket.accessKeyId = getenv("AWS_ACCESS_KEY_ID"); store->bucket.secretAccessKey = getenv("AWS_SECRET_ACCESS_KEY"); - g_print("Initializing S3 with bucket %s, access key %s\n", - store->bucket.bucketName, store->bucket.accessKeyId); + const char *key = getenv("BLUESKY_KEY"); + if (key == NULL) { + g_error("Encryption key not defined; please set BLUESKY_KEY environment variable"); + exit(1); + } + + bluesky_crypt_hash_key(key, store->encryption_key); + + g_print("Initializing S3 with bucket %s, access key %s, encryption key %s\n", + store->bucket.bucketName, store->bucket.accessKeyId, key); return store; } @@ -42,17 +61,14 @@ static void s3store_destroy(gpointer store) } struct get_info { - gchar *buf; - gint offset; + GString *buf; }; static S3Status s3store_get_handler(int bufferSize, const char *buffer, void *callbackData) { struct get_info *info = (struct get_info *)callbackData; - gint bytes = MIN(bufferSize, (int)(BLUESKY_BLOCK_SIZE - info->offset)); - memcpy(info->buf + info->offset, buffer, bytes); - info->offset += bytes; + g_string_append_len(info->buf, buffer, bufferSize); return S3StatusOK; } @@ -82,8 +98,8 @@ static void s3store_response_callback(S3Status status, const S3ErrorDetails *errorDetails, void *callbackData) { - g_print("S3 operation complete, status=%s\n", - S3_get_status_name(status)); + g_print("S3 operation complete, status=%s, now=%ld\n", + S3_get_status_name(status), bluesky_now_hires()); if (errorDetails != NULL) { g_print(" Error message: %s\n", errorDetails->message); } @@ -94,8 +110,7 @@ static BlueSkyRCStr *s3store_get(gpointer s, const gchar *key) S3Store *store = (S3Store *)s; struct get_info info; - info.buf = (char *)g_malloc0(BLUESKY_BLOCK_SIZE); - info.offset = 0; + info.buf = g_string_new(""); struct S3GetObjectHandler handler; handler.responseHandler.propertiesCallback = s3store_properties_callback; @@ -106,25 +121,60 @@ static BlueSkyRCStr *s3store_get(gpointer s, const gchar *key) S3_get_object(&store->bucket, key, NULL, 0, 0, NULL, &handler, &info); - return bluesky_string_new(info.buf, BLUESKY_BLOCK_SIZE); + BlueSkyRCStr *raw, *decrypted; + raw = bluesky_string_new_from_gstring(info.buf); + decrypted = bluesky_crypt_decrypt(raw, store->encryption_key); + bluesky_string_unref(raw); + return decrypted; } static void s3store_put(gpointer s, const gchar *key, BlueSkyRCStr *val) { S3Store *store = (S3Store *)s; - struct put_info info; - info.val = val; - info.offset = 0; + S3Op *op = g_new(S3Op, 1); + op->op = S3_PUT; + op->key = g_strdup(key); + bluesky_string_ref(val); + op->data = val; - struct S3PutObjectHandler handler; - handler.responseHandler.propertiesCallback = s3store_properties_callback; - handler.responseHandler.completeCallback = s3store_response_callback; - handler.putObjectDataCallback = s3store_put_handler; + g_thread_pool_push(store->thread_pool, op, NULL); +} - g_print("Starting store of %s to S3...\n", key); - S3_put_object(&store->bucket, key, val->len, NULL, NULL, - &handler, &info); +static void s3store_task(gpointer o, gpointer s) +{ + S3Store *store = (S3Store *)s; + S3Op *op = (S3Op *)o; + + g_print("Start task [key=%s]...\n", op->key); + + if (op->op == S3_PUT) { + BlueSkyRCStr *encrypted = bluesky_crypt_encrypt(op->data, + store->encryption_key); + + struct put_info info; + info.val = encrypted; + info.offset = 0; + + struct S3PutObjectHandler handler; + handler.responseHandler.propertiesCallback + = s3store_properties_callback; + handler.responseHandler.completeCallback = s3store_response_callback; + handler.putObjectDataCallback = s3store_put_handler; + + g_print("Starting store of %s to S3 at %ld...\n", + op->key, bluesky_now_hires()); + S3_put_object(&store->bucket, op->key, encrypted->len, NULL, NULL, + &handler, &info); + + bluesky_string_unref(encrypted); + } + + bluesky_string_unref(op->data); + g_free(op->key); + g_free(op); + + g_print("Finish task...\n"); } static BlueSkyStoreImplementation store_impl = {