Values in the kvstore are raw byte arrays, not strings.
[bluesky.git] / bluesky / s3store.c
index 2ac2c3a..ccf0878 100644 (file)
@@ -28,40 +28,13 @@ typedef struct {
     BlueSkyRCStr *data;
 } S3Op;
 
-static void s3store_task(gpointer s, gpointer o);
-
-static gpointer s3store_new()
-{
-    S3Store *store = g_new(S3Store, 1);
-    store->thread_pool = g_thread_pool_new(s3store_task, store, -1, FALSE,
-                                           NULL);
-    store->bucket.bucketName = "mvrable-bluesky";
-    store->bucket.protocol = S3ProtocolHTTP;
-    store->bucket.uriStyle = S3UriStylePath;
-    store->bucket.accessKeyId = getenv("AWS_ACCESS_KEY_ID");
-    store->bucket.secretAccessKey = getenv("AWS_SECRET_ACCESS_KEY");
-
-    const char *key = getenv("BLUESKY_KEY");
-    if (key == NULL) {
-        g_error("Encryption key not defined; please set BLUESKY_KEY environment variable");
-        exit(1);
-    }
-
-    bluesky_crypt_hash_key(key, store->encryption_key);
-
-    g_print("Initializing S3 with bucket %s, access key %s, encryption key %s\n",
-            store->bucket.bucketName, store->bucket.accessKeyId, key);
-
-    return store;
-}
-
-static void s3store_destroy(gpointer store)
-{
-    g_free(store);
-}
-
 struct get_info {
-    gchar *buf;
+    int success;
+    GString *buf;
+};
+
+struct put_info {
+    BlueSkyRCStr *val;
     gint offset;
 };
 
@@ -69,17 +42,10 @@ static S3Status s3store_get_handler(int bufferSize, const char *buffer,
                                     void *callbackData)
 {
     struct get_info *info = (struct get_info *)callbackData;
-    gint bytes = MIN(bufferSize, (int)(BLUESKY_BLOCK_SIZE - info->offset));
-    memcpy(info->buf + info->offset, buffer, bytes);
-    info->offset += bytes;
+    g_string_append_len(info->buf, buffer, bufferSize);
     return S3StatusOK;
 }
 
-struct put_info {
-    BlueSkyRCStr *val;
-    gint offset;
-};
-
 static int s3store_put_handler(int bufferSize, char *buffer,
                                void *callbackData)
 {
@@ -93,7 +59,6 @@ static int s3store_put_handler(int bufferSize, char *buffer,
 static S3Status s3store_properties_callback(const S3ResponseProperties *properties,
                                      void *callbackData)
 {
-    g_print("(Properties callback)\n");
     return S3StatusOK;
 }
 
@@ -101,59 +66,50 @@ static void s3store_response_callback(S3Status status,
                                const S3ErrorDetails *errorDetails,
                                void *callbackData)
 {
-    g_print("S3 operation complete, status=%s, now=%ld\n",
-            S3_get_status_name(status), bluesky_now_hires());
-    if (errorDetails != NULL) {
-        g_print("  Error message: %s\n", errorDetails->message);
-    }
-}
-
-static BlueSkyRCStr *s3store_get(gpointer s, const gchar *key)
-{
-    S3Store *store = (S3Store *)s;
-
-    struct get_info info;
-    info.buf = (char *)g_malloc0(BLUESKY_BLOCK_SIZE);
-    info.offset = 0;
-
-    struct S3GetObjectHandler handler;
-    handler.responseHandler.propertiesCallback = s3store_properties_callback;
-    handler.responseHandler.completeCallback = s3store_response_callback;
-    handler.getObjectDataCallback = s3store_get_handler;
+    struct get_info *info = (struct get_info *)callbackData;
 
-    g_print("Starting fetch of %s from S3...\n", key);
-    S3_get_object(&store->bucket, key, NULL, 0, 0, NULL,
-                  &handler, &info);
+    if (status == 0) {
+        info->success = 1;
+    }
 
-    BlueSkyRCStr *raw, *decrypted;
-    raw = bluesky_string_new(info.buf, BLUESKY_BLOCK_SIZE);
-    decrypted = bluesky_crypt_decrypt(raw, store->encryption_key);
-    bluesky_string_unref(raw);
-    return decrypted;
+    if (errorDetails != NULL && errorDetails->message != NULL) {
+        g_print("  Error message: %s\n", errorDetails->message);
+    }
 }
 
-static void s3store_put(gpointer s, const gchar *key, BlueSkyRCStr *val)
+static void s3store_task(gpointer a, gpointer s)
 {
+    BlueSkyStoreAsync *async = (BlueSkyStoreAsync *)a;
     S3Store *store = (S3Store *)s;
 
-    S3Op *op = g_new(S3Op, 1);
-    op->op = S3_PUT;
-    op->key = g_strdup(key);
-    bluesky_string_ref(val);
-    op->data = val;
+    async->status = ASYNC_RUNNING;
 
-    g_thread_pool_push(store->thread_pool, op, NULL);
-}
+    if (async->op == STORE_OP_GET) {
+        struct get_info info;
+        info.buf = g_string_new("");
+        info.success = 0;
 
-static void s3store_task(gpointer o, gpointer s)
-{
-    S3Store *store = (S3Store *)s;
-    S3Op *op = (S3Op *)o;
+        struct S3GetObjectHandler handler;
+        handler.responseHandler.propertiesCallback = s3store_properties_callback;
+        handler.responseHandler.completeCallback = s3store_response_callback;
+        handler.getObjectDataCallback = s3store_get_handler;
 
-    g_print("Start task [key=%s]...\n", op->key);
+        S3_get_object(&store->bucket, async->key, NULL, 0, 0, NULL,
+                      &handler, &info);
 
-    if (op->op == S3_PUT) {
-        BlueSkyRCStr *encrypted = bluesky_crypt_encrypt(op->data,
+        if (info.success) {
+            BlueSkyRCStr *raw, *decrypted;
+            raw = bluesky_string_new_from_gstring(info.buf);
+            decrypted = bluesky_crypt_decrypt(raw, store->encryption_key);
+            bluesky_string_unref(raw);
+            async->data = decrypted;
+            async->result = 0;
+        } else {
+            g_string_free(info.buf, TRUE);
+        }
+
+    } else if (async->op == STORE_OP_PUT) {
+        BlueSkyRCStr *encrypted = bluesky_crypt_encrypt(async->data,
                                                         store->encryption_key);
 
         struct put_info info;
@@ -166,26 +122,84 @@ static void s3store_task(gpointer o, gpointer s)
         handler.responseHandler.completeCallback = s3store_response_callback;
         handler.putObjectDataCallback = s3store_put_handler;
 
-        g_print("Starting store of %s to S3 at %ld...\n",
-                op->key, bluesky_now_hires());
-        S3_put_object(&store->bucket, op->key, encrypted->len, NULL, NULL,
+        S3_put_object(&store->bucket, async->key, encrypted->len, NULL, NULL,
                       &handler, &info);
 
         bluesky_string_unref(encrypted);
+
+        async->result = 0;
     }
 
-    bluesky_string_unref(op->data);
-    g_free(op->key);
-    g_free(op);
+    bluesky_store_async_mark_complete(async);
+    bluesky_store_async_unref(async);
+}
 
-    g_print("Finish task...\n");
+static gpointer s3store_new()
+{
+    S3Store *store = g_new(S3Store, 1);
+    store->thread_pool = g_thread_pool_new(s3store_task, store, -1, FALSE,
+                                           NULL);
+    store->bucket.bucketName = "mvrable-bluesky";
+    store->bucket.protocol = S3ProtocolHTTP;
+    store->bucket.uriStyle = S3UriStylePath;
+    store->bucket.accessKeyId = getenv("AWS_ACCESS_KEY_ID");
+    store->bucket.secretAccessKey = getenv("AWS_SECRET_ACCESS_KEY");
+
+    const char *key = getenv("BLUESKY_KEY");
+    if (key == NULL) {
+        g_error("Encryption key not defined; please set BLUESKY_KEY environment variable");
+        exit(1);
+    }
+
+    bluesky_crypt_hash_key(key, store->encryption_key);
+
+    g_print("Initializing S3 with bucket %s, access key %s, encryption key %s\n",
+            store->bucket.bucketName, store->bucket.accessKeyId, key);
+
+    return store;
+}
+
+static void s3store_destroy(gpointer store)
+{
+    g_free(store);
+}
+
+static void s3store_submit(gpointer s, BlueSkyStoreAsync *async)
+{
+    S3Store *store = (S3Store *)s;
+    g_return_if_fail(async->status == ASYNC_NEW);
+    g_return_if_fail(async->op != STORE_OP_NONE);
+
+    switch (async->op) {
+    case STORE_OP_GET:
+    case STORE_OP_PUT:
+        async->status = ASYNC_PENDING;
+        bluesky_store_async_ref(async);
+        g_thread_pool_push(store->thread_pool, async, NULL);
+        break;
+
+    default:
+        g_warning("Uknown operation type for S3Store: %d\n", async->op);
+        bluesky_store_async_mark_complete(async);
+        break;
+    }
+}
+
+static void s3store_cleanup(gpointer store, BlueSkyStoreAsync *async)
+{
+    GString *buf = (GString *)async->store_private;
+
+    if (buf != NULL) {
+        g_string_free(buf, TRUE);
+        async->store_private = NULL;
+    }
 }
 
 static BlueSkyStoreImplementation store_impl = {
     .create = s3store_new,
     .destroy = s3store_destroy,
-    .get = s3store_get,
-    .put = s3store_put,
+    .submit = s3store_submit,
+    .cleanup = s3store_cleanup,
 };
 
 void bluesky_store_init_s3(void)