GSList *writeback_list; // Items which are being serialized right now
GList *pending_segments; // Segments which are being uploaded now
+ int uploads_pending; // Count of uploads in progress, not completed
+ GMutex *uploads_pending_lock;
+ GCond *uploads_pending_cond;
+
/* What is the most recent sequence number written by the cleaner which we
* have processed and incorporated into our own log? This gets
* incorporated into the version vector written out with our checkpoint
// no absolute guarantees on the size of a log segment.
#define CLOUDLOG_SEGMENT_SIZE (4 << 20)
+// Maximum number of segments to attempt to upload concurrently
+int cloudlog_concurrent_uploads = 32;
+
BlueSkyCloudID bluesky_cloudlog_new_id()
{
BlueSkyCloudID id;
g_slist_free(record->items);
record->items = NULL;
record->complete = TRUE;
+
+ BlueSkyCloudLogState *state = record->fs->log_state;
+ g_mutex_lock(state->uploads_pending_lock);
+ state->uploads_pending--;
+ g_cond_broadcast(state->uploads_pending_cond);
+ g_mutex_unlock(state->uploads_pending_lock);
+
g_cond_broadcast(record->cond);
} else {
g_print("Write should be resubmitted...\n");
if (state->data == NULL || state->data->len == 0)
return;
+ g_mutex_lock(state->uploads_pending_lock);
+ while (state->uploads_pending > cloudlog_concurrent_uploads)
+ g_cond_wait(state->uploads_pending_cond, state->uploads_pending_lock);
+ state->uploads_pending++;
+ g_mutex_unlock(state->uploads_pending_lock);
+
/* TODO: Append some type of commit record to the log segment? */
g_print("Serializing %zd bytes of data to cloud\n", state->data->len);
state->location.directory,
state->location.sequence);
+ state->pending_segments = g_list_prepend(state->pending_segments, record);
+
/* Encryption of data and upload happen in the background, for additional
* parallelism when uploading large amounts of data. */
g_thread_create((GThreadFunc)cloud_flush_background, record, FALSE, NULL);
- state->pending_segments = g_list_prepend(state->pending_segments, record);
-
state->location.sequence++;
state->location.offset = 0;
state->data = g_string_new("");
fs->log_state = g_new0(BlueSkyCloudLogState, 1);
fs->log_state->data = g_string_new("");
fs->log_state->latest_cleaner_seq_seen = -1;
+ fs->log_state->uploads_pending_lock = g_mutex_new();
+ fs->log_state->uploads_pending_cond = g_cond_new();
bluesky_cloudlog_threads_init(fs);
fs->inode_fetch_thread_pool = g_thread_pool_new(inode_fetch_task, NULL,