Limit the number of concurrent log uploads to the cloud
[bluesky.git] / bluesky / cloudlog.c
index f0fa485..4b49397 100644 (file)
@@ -17,6 +17,9 @@
 // no absolute guarantees on the size of a log segment.
 #define CLOUDLOG_SEGMENT_SIZE (4 << 20)
 
+// Maximum number of segments to attempt to upload concurrently
+int cloudlog_concurrent_uploads = 32;
+
 BlueSkyCloudID bluesky_cloudlog_new_id()
 {
     BlueSkyCloudID id;
@@ -478,6 +481,13 @@ static void cloudlog_flush_complete(BlueSkyStoreAsync *async,
         g_slist_free(record->items);
         record->items = NULL;
         record->complete = TRUE;
+
+        BlueSkyCloudLogState *state = record->fs->log_state;
+        g_mutex_lock(state->uploads_pending_lock);
+        state->uploads_pending--;
+        g_cond_broadcast(state->uploads_pending_cond);
+        g_mutex_unlock(state->uploads_pending_lock);
+
         g_cond_broadcast(record->cond);
     } else {
         g_print("Write should be resubmitted...\n");
@@ -522,6 +532,12 @@ void bluesky_cloudlog_flush(BlueSkyFS *fs)
     if (state->data == NULL || state->data->len == 0)
         return;
 
+    g_mutex_lock(state->uploads_pending_lock);
+    while (state->uploads_pending > cloudlog_concurrent_uploads)
+        g_cond_wait(state->uploads_pending_cond, state->uploads_pending_lock);
+    state->uploads_pending++;
+    g_mutex_unlock(state->uploads_pending_lock);
+
     /* TODO: Append some type of commit record to the log segment? */
 
     g_print("Serializing %zd bytes of data to cloud\n", state->data->len);
@@ -538,12 +554,12 @@ void bluesky_cloudlog_flush(BlueSkyFS *fs)
                                   state->location.directory,
                                   state->location.sequence);
 
+    state->pending_segments = g_list_prepend(state->pending_segments, record);
+
     /* Encryption of data and upload happen in the background, for additional
      * parallelism when uploading large amounts of data. */
     g_thread_create((GThreadFunc)cloud_flush_background, record, FALSE, NULL);
 
-    state->pending_segments = g_list_prepend(state->pending_segments, record);
-
     state->location.sequence++;
     state->location.offset = 0;
     state->data = g_string_new("");