Move encryption of cloud log segments into background threads.
authorMichael Vrable <mvrable@cs.ucsd.edu>
Wed, 2 Mar 2011 01:09:29 +0000 (17:09 -0800)
committerMichael Vrable <mvrable@cs.ucsd.edu>
Wed, 2 Mar 2011 01:09:29 +0000 (17:09 -0800)
Crypto operations looked to be a CPU bottleneck, but using separate threads
will at least allow multiple log segments to be encrypted in parallel on
separate CPU cores.

bluesky/bluesky-private.h
bluesky/cloudlog.c

index be7a2d3..5e7ed2f 100644 (file)
@@ -433,7 +433,10 @@ void bluesky_replay(BlueSkyFS *fs);
 
 /* Used to track log segments that are being written to the cloud. */
 typedef struct {
 
 /* Used to track log segments that are being written to the cloud. */
 typedef struct {
-    BlueSkyRCStr *data;
+    BlueSkyFS *fs;
+    char *key;                  /* File name for log segment in backend */
+    GString *raw_data;          /* Data before encryption */
+    BlueSkyRCStr *data;         /* Data after encryption */
     GSList *items;
     GMutex *lock;
     GCond *cond;
     GSList *items;
     GMutex *lock;
     GCond *cond;
index 44799e3..f0fa485 100644 (file)
@@ -498,6 +498,24 @@ static void cloudlog_flush_complete(BlueSkyStoreAsync *async,
 }
 
 /* Finish up a partially-written cloud log segment and flush it to storage. */
 }
 
 /* Finish up a partially-written cloud log segment and flush it to storage. */
+static void cloud_flush_background(SerializedRecord *record)
+{
+    bluesky_cloudlog_encrypt(record->raw_data, record->fs->keys);
+    record->data = bluesky_string_new_from_gstring(record->raw_data);
+    record->raw_data = NULL;
+
+    BlueSkyStoreAsync *async = bluesky_store_async_new(record->fs->store);
+    async->op = STORE_OP_PUT;
+    async->key = record->key;
+    async->data = record->data;
+    bluesky_string_ref(record->data);
+    bluesky_store_async_submit(async);
+    bluesky_store_async_add_notifier(async,
+                                     (GFunc)cloudlog_flush_complete,
+                                     record);
+    bluesky_store_async_unref(async);
+}
+
 void bluesky_cloudlog_flush(BlueSkyFS *fs)
 {
     BlueSkyCloudLogState *state = fs->log_state;
 void bluesky_cloudlog_flush(BlueSkyFS *fs)
 {
     BlueSkyCloudLogState *state = fs->log_state;
@@ -508,25 +526,21 @@ void bluesky_cloudlog_flush(BlueSkyFS *fs)
 
     g_print("Serializing %zd bytes of data to cloud\n", state->data->len);
     SerializedRecord *record = g_new0(SerializedRecord, 1);
 
     g_print("Serializing %zd bytes of data to cloud\n", state->data->len);
     SerializedRecord *record = g_new0(SerializedRecord, 1);
-    bluesky_cloudlog_encrypt(state->data, fs->keys);
-    record->data = bluesky_string_new_from_gstring(state->data);
+    record->fs = fs;
+    record->raw_data = state->data;
+    record->data = NULL;
     record->items = state->writeback_list;
     record->lock = g_mutex_new();
     record->cond = g_cond_new();
     state->writeback_list = NULL;
 
     record->items = state->writeback_list;
     record->lock = g_mutex_new();
     record->cond = g_cond_new();
     state->writeback_list = NULL;
 
-    BlueSkyStoreAsync *async = bluesky_store_async_new(fs->store);
-    async->op = STORE_OP_PUT;
-    async->key = g_strdup_printf("log-%08d-%08d",
-                                 state->location.directory,
-                                 state->location.sequence);
-    async->data = record->data;
-    bluesky_string_ref(record->data);
-    bluesky_store_async_submit(async);
-    bluesky_store_async_add_notifier(async,
-                                     (GFunc)cloudlog_flush_complete,
-                                     record);
-    bluesky_store_async_unref(async);
+    record->key = g_strdup_printf("log-%08d-%08d",
+                                  state->location.directory,
+                                  state->location.sequence);
+
+    /* Encryption of data and upload happen in the background, for additional
+     * parallelism when uploading large amounts of data. */
+    g_thread_create((GThreadFunc)cloud_flush_background, record, FALSE, NULL);
 
     state->pending_segments = g_list_prepend(state->pending_segments, record);
 
 
     state->pending_segments = g_list_prepend(state->pending_segments, record);