Limit the number of concurrent log uploads to the cloud
author Michael Vrable <mvrable@cs.ucsd.edu>
Wed, 2 Mar 2011 22:20:46 +0000 (14:20 -0800)
committer Michael Vrable <mvrable@cs.ucsd.edu>
Wed, 2 Mar 2011 22:20:46 +0000 (14:20 -0800)
This is mostly aimed at limiting memory usage: each cloud log segment is
stored entirely in memory before it is uploaded, so a very large number of
parallel uploads could require allocating a large amount of memory.

bluesky/bluesky-private.h
bluesky/cloudlog.c
bluesky/inode.c

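diff --git a/bluesky/bluesky-private.h b/bluesky/bluesky-private.h
index 5e7ed2f..d4ec4cd 100644
--- a/bluesky/bluesky-private.h
+++ b/bluesky/bluesky-private.h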
@@ -329,6 +329,10 @@ struct BlueSkyCloudLogState {
     GSList *writeback_list;     // Items which are being serialized right now
     GList *pending_segments;    // Segments which are being uploaded now
 
+    int uploads_pending;        // Count of uploads in progress, not completed
+    GMutex *uploads_pending_lock;
+    GCond *uploads_pending_cond;
+
     /* What is the most recent sequence number written by the cleaner which we
      * have processed and incorporated into our own log?  This gets
      * incorporated into the version vector written out with our checkpoint
diff --git a/bluesky/cloudlog.c b/bluesky/cloudlog.c
index f0fa485..4b49397 100644
--- a/bluesky/cloudlog.c
+++ b/bluesky/cloudlog.c
@@ -17,6 +17,9 @@
 // no absolute guarantees on the size of a log segment.
 #define CLOUDLOG_SEGMENT_SIZE (4 << 20)
 
+// Upload throttle: flush waits while uploads_pending > this, so limit+1 may run
+int cloudlog_concurrent_uploads = 32;
+
 BlueSkyCloudID bluesky_cloudlog_new_id()
 {
     BlueSkyCloudID id;
@@ -478,6 +481,13 @@ static void cloudlog_flush_complete(BlueSkyStoreAsync *async,
         g_slist_free(record->items);
         record->items = NULL;
         record->complete = TRUE;
+
+        BlueSkyCloudLogState *state = record->fs->log_state;
+        g_mutex_lock(state->uploads_pending_lock);
+        state->uploads_pending--;
+        g_cond_broadcast(state->uploads_pending_cond);
+        g_mutex_unlock(state->uploads_pending_lock);
+
         g_cond_broadcast(record->cond);
     } else {
         g_print("Write should be resubmitted...\n");
@@ -522,6 +532,12 @@ void bluesky_cloudlog_flush(BlueSkyFS *fs)
     if (state->data == NULL || state->data->len == 0)
         return;
 
+    g_mutex_lock(state->uploads_pending_lock);
+    while (state->uploads_pending > cloudlog_concurrent_uploads)
+        g_cond_wait(state->uploads_pending_cond, state->uploads_pending_lock);
+    state->uploads_pending++;
+    g_mutex_unlock(state->uploads_pending_lock);
+
     /* TODO: Append some type of commit record to the log segment? */
 
     g_print("Serializing %zd bytes of data to cloud\n", state->data->len);
@@ -538,12 +554,12 @@ void bluesky_cloudlog_flush(BlueSkyFS *fs)
                                   state->location.directory,
                                   state->location.sequence);
 
+    state->pending_segments = g_list_prepend(state->pending_segments, record);
+
     /* Encryption of data and upload happen in the background, for additional
      * parallelism when uploading large amounts of data. */
     g_thread_create((GThreadFunc)cloud_flush_background, record, FALSE, NULL);
 
-    state->pending_segments = g_list_prepend(state->pending_segments, record);
-
     state->location.sequence++;
     state->location.offset = 0;
     state->data = g_string_new("");
diff --git a/bluesky/inode.c b/bluesky/inode.c
index cabaa34..fe68328 100644
--- a/bluesky/inode.c
+++ b/bluesky/inode.c
@@ -96,6 +96,8 @@ BlueSkyFS *bluesky_new_fs(gchar *name)
     fs->log_state = g_new0(BlueSkyCloudLogState, 1);
     fs->log_state->data = g_string_new("");
     fs->log_state->latest_cleaner_seq_seen = -1;
+    fs->log_state->uploads_pending_lock = g_mutex_new();
+    fs->log_state->uploads_pending_cond = g_cond_new();
 
     bluesky_cloudlog_threads_init(fs);
     fs->inode_fetch_thread_pool = g_thread_pool_new(inode_fetch_task, NULL,