Allow storse to S3 to execute in the background, in parallel.
authorMichael Vrable <mvrable@cs.ucsd.edu>
Wed, 18 Nov 2009 00:28:20 +0000 (16:28 -0800)
committerMichael Vrable <mvrable@cs.ucsd.edu>
Wed, 18 Nov 2009 00:28:20 +0000 (16:28 -0800)
bluesky/s3store.c

index 3f4fb16..2ac2c3a 100644 (file)
 
 /* Interface to Amazon S3 storage. */
 
-/* Simple in-memory data store for test purposes. */
 typedef struct {
+    GThreadPool *thread_pool;
     S3BucketContext bucket;
     uint8_t encryption_key[CRYPTO_KEY_SIZE];
 } S3Store;
 
+typedef struct {
+    enum { S3_GET, S3_PUT } op;
+    gchar *key;
+    BlueSkyRCStr *data;
+} S3Op;
+
+static void s3store_task(gpointer s, gpointer o);
+
 static gpointer s3store_new()
 {
     S3Store *store = g_new(S3Store, 1);
+    store->thread_pool = g_thread_pool_new(s3store_task, store, -1, FALSE,
+                                           NULL);
     store->bucket.bucketName = "mvrable-bluesky";
     store->bucket.protocol = S3ProtocolHTTP;
     store->bucket.uriStyle = S3UriStylePath;
@@ -125,22 +135,50 @@ static BlueSkyRCStr *s3store_get(gpointer s, const gchar *key)
 static void s3store_put(gpointer s, const gchar *key, BlueSkyRCStr *val)
 {
     S3Store *store = (S3Store *)s;
-    BlueSkyRCStr *encrypted = bluesky_crypt_encrypt(val, store->encryption_key);
 
-    struct put_info info;
-    info.val = encrypted;
-    info.offset = 0;
+    S3Op *op = g_new(S3Op, 1);
+    op->op = S3_PUT;
+    op->key = g_strdup(key);
+    bluesky_string_ref(val);
+    op->data = val;
 
-    struct S3PutObjectHandler handler;
-    handler.responseHandler.propertiesCallback = s3store_properties_callback;
-    handler.responseHandler.completeCallback = s3store_response_callback;
-    handler.putObjectDataCallback = s3store_put_handler;
+    g_thread_pool_push(store->thread_pool, op, NULL);
+}
 
-    g_print("Starting store of %s to S3 at %ld...\n", key, bluesky_now_hires());
-    S3_put_object(&store->bucket, key, encrypted->len, NULL, NULL,
-                  &handler, &info);
+static void s3store_task(gpointer o, gpointer s)
+{
+    S3Store *store = (S3Store *)s;
+    S3Op *op = (S3Op *)o;
+
+    g_print("Start task [key=%s]...\n", op->key);
+
+    if (op->op == S3_PUT) {
+        BlueSkyRCStr *encrypted = bluesky_crypt_encrypt(op->data,
+                                                        store->encryption_key);
+
+        struct put_info info;
+        info.val = encrypted;
+        info.offset = 0;
+
+        struct S3PutObjectHandler handler;
+        handler.responseHandler.propertiesCallback
+            = s3store_properties_callback;
+        handler.responseHandler.completeCallback = s3store_response_callback;
+        handler.putObjectDataCallback = s3store_put_handler;
+
+        g_print("Starting store of %s to S3 at %ld...\n",
+                op->key, bluesky_now_hires());
+        S3_put_object(&store->bucket, op->key, encrypted->len, NULL, NULL,
+                      &handler, &info);
+
+        bluesky_string_unref(encrypted);
+    }
+
+    bluesky_string_unref(op->data);
+    g_free(op->key);
+    g_free(op);
 
-    /* TODO: unref encrypted */
+    g_print("Finish task...\n");
 }
 
 static BlueSkyStoreImplementation store_impl = {