From: Michael Vrable Date: Wed, 2 Mar 2011 22:20:46 +0000 (-0800) Subject: Limit the number of concurrent log uploads to the cloud X-Git-Url: http://git.vrable.net/?p=bluesky.git;a=commitdiff_plain;h=fe314d13c4973852723429e709137bf7f5a8b607 Limit the number of concurrent log uploads to the cloud This is mostly aimed at limiting memory usage, since each cloud log segment is stored entirely in memory before it is uploaded, so a very large number of parallel uploads could require allocating a large amount of memory. --- diff --git a/bluesky/bluesky-private.h b/bluesky/bluesky-private.h index 5e7ed2f..d4ec4cd 100644 --- a/bluesky/bluesky-private.h +++ b/bluesky/bluesky-private.h @@ -329,6 +329,10 @@ struct BlueSkyCloudLogState { GSList *writeback_list; // Items which are being serialized right now GList *pending_segments; // Segments which are being uploaded now + int uploads_pending; // Count of uploads in progress, not completed + GMutex *uploads_pending_lock; + GCond *uploads_pending_cond; + /* What is the most recent sequence number written by the cleaner which we * have processed and incorporated into our own log? This gets * incorporated into the version vector written out with our checkpoint diff --git a/bluesky/cloudlog.c b/bluesky/cloudlog.c index f0fa485..4b49397 100644 --- a/bluesky/cloudlog.c +++ b/bluesky/cloudlog.c @@ -17,6 +17,9 @@ // no absolute guarantees on the size of a log segment. #define CLOUDLOG_SEGMENT_SIZE (4 << 20) +// Maximum number of segments to attempt to upload concurrently +int cloudlog_concurrent_uploads = 32; + BlueSkyCloudID bluesky_cloudlog_new_id() { BlueSkyCloudID id; @@ -478,6 +481,13 @@ static void cloudlog_flush_complete(BlueSkyStoreAsync *async, g_slist_free(record->items); record->items = NULL; record->complete = TRUE; + + BlueSkyCloudLogState *state = record->fs->log_state; + g_mutex_lock(state->uploads_pending_lock); + state->uploads_pending--; + g_cond_broadcast(state->uploads_pending_cond); + g_mutex_unlock(state->uploads_pending_lock); + g_cond_broadcast(record->cond); } else { g_print("Write should be resubmitted...\n"); @@ -522,6 +532,12 @@ void bluesky_cloudlog_flush(BlueSkyFS *fs) if (state->data == NULL || state->data->len == 0) return; + g_mutex_lock(state->uploads_pending_lock); + while (state->uploads_pending > cloudlog_concurrent_uploads) + g_cond_wait(state->uploads_pending_cond, state->uploads_pending_lock); + state->uploads_pending++; + g_mutex_unlock(state->uploads_pending_lock); + /* TODO: Append some type of commit record to the log segment? */ g_print("Serializing %zd bytes of data to cloud\n", state->data->len); @@ -538,12 +554,12 @@ void bluesky_cloudlog_flush(BlueSkyFS *fs) state->location.directory, state->location.sequence); + state->pending_segments = g_list_prepend(state->pending_segments, record); + /* Encryption of data and upload happen in the background, for additional * parallelism when uploading large amounts of data. */ g_thread_create((GThreadFunc)cloud_flush_background, record, FALSE, NULL); - state->pending_segments = g_list_prepend(state->pending_segments, record); - state->location.sequence++; state->location.offset = 0; state->data = g_string_new(""); diff --git a/bluesky/inode.c b/bluesky/inode.c index cabaa34..fe68328 100644 --- a/bluesky/inode.c +++ b/bluesky/inode.c @@ -96,6 +96,8 @@ BlueSkyFS *bluesky_new_fs(gchar *name) fs->log_state = g_new0(BlueSkyCloudLogState, 1); fs->log_state->data = g_string_new(""); fs->log_state->latest_cleaner_seq_seen = -1; + fs->log_state->uploads_pending_lock = g_mutex_new(); + fs->log_state->uploads_pending_cond = g_cond_new(); bluesky_cloudlog_threads_init(fs); fs->inode_fetch_thread_pool = g_thread_pool_new(inode_fetch_task, NULL,