Implement new scheme for retaining needed journal segments.
[bluesky.git] / bluesky / cloudlog.c
index e842fbf..a47f5b0 100644 (file)
@@ -279,17 +279,13 @@ BlueSkyCloudPointer bluesky_cloudlog_serialize(BlueSkyCloudLog *log,
     return log->location;
 }
 
-typedef struct {
-    BlueSkyRCStr *data;
-    GSList *items;
-} SerializedRecord;
-
 static void cloudlog_flush_complete(BlueSkyStoreAsync *async,
                                     SerializedRecord *record)
 {
     g_print("Write of %s to cloud complete, status = %d\n",
             async->key, async->result);
 
+    g_mutex_lock(record->lock);
     if (async->result >= 0) {
         while (record->items != NULL) {
             BlueSkyCloudLog *item = (BlueSkyCloudLog *)record->items->data;
@@ -305,8 +301,11 @@ static void cloudlog_flush_complete(BlueSkyStoreAsync *async,
         }
 
         bluesky_string_unref(record->data);
+        record->data = NULL;
         g_slist_free(record->items);
-        g_free(record);
+        record->items = NULL;
+        record->complete = TRUE;
+        g_cond_broadcast(record->cond);
     } else {
         g_print("Write should be resubmitted...\n");
 
@@ -321,6 +320,7 @@ static void cloudlog_flush_complete(BlueSkyStoreAsync *async,
                                          record);
         bluesky_store_async_unref(async2);
     }
+    g_mutex_unlock(record->lock);
 }
 
 /* Finish up a partially-written cloud log segment and flush it to storage. */
@@ -336,6 +336,8 @@ void bluesky_cloudlog_flush(BlueSkyFS *fs)
     SerializedRecord *record = g_new0(SerializedRecord, 1);
     record->data = bluesky_string_new_from_gstring(state->data);
     record->items = state->writeback_list;
+    record->lock = g_mutex_new();
+    record->cond = g_cond_new();
     state->writeback_list = NULL;
 
     BlueSkyStoreAsync *async = bluesky_store_async_new(fs->store);
@@ -351,6 +353,8 @@ void bluesky_cloudlog_flush(BlueSkyFS *fs)
                                      record);
     bluesky_store_async_unref(async);
 
+    state->pending_segments = g_list_prepend(state->pending_segments, record);
+
     state->location.sequence++;
     state->location.offset = 0;
     state->data = g_string_new("");