Work to unify the cloud segment writing with other cache management.
diff --git a/bluesky/cloudlog.c b/bluesky/cloudlog.c
index 30c2db0..134f3d7 100644
--- a/bluesky/cloudlog.c
+++ b/bluesky/cloudlog.c
 
 #include "bluesky-private.h"
 
-/* The locations hash table in the file system is used to map objects to their locations.  Objects are named using 128- */
-
-typedef struct {
-    BlueSkyCloudID id;
-
-    BlueSkyCloudPointer *cloud_loc;
-} BlueSkyLocationEntry;
+// Rough size limit for a log segment (4 MiB).  This is a soft limit; a
+// segment is flushed once it grows past this size, so it may run a bit over.
+#define CLOUDLOG_SEGMENT_SIZE (4 << 20)
 
 BlueSkyCloudID bluesky_cloudlog_new_id()
 {
@@ -178,21 +174,36 @@ struct log_footer {
     struct logref refs[0];
 };
 
+/* Ensure that a cloud log item's data is loaded in memory, reading it in
+ * if needed.  TODO: Make this asynchronous, and make it fetch from the
+ * cloud as well; right now we only read the journal.  Item must be locked. */
+void bluesky_cloudlog_fetch(BlueSkyCloudLog *log)
+{
+    if (log->data != NULL)
+        return;
+
+    g_assert(log->location_flags & CLOUDLOG_JOURNAL);
+
+    log->data = bluesky_log_map_object(log->fs->log, log->log_seq,
+                                       log->log_offset, log->log_size);
+
+    g_cond_broadcast(log->cond);
+}
+
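
For reference, a minimal caller sketch (not part of this patch; `item` is a
hypothetical BlueSkyCloudLog pointer) showing the locking the comment above
requires:

    /* Sketch only: `item` is hypothetical; lock, data and cond are fields
     * used elsewhere in this file. */
    g_mutex_lock(item->lock);
    bluesky_cloudlog_fetch(item);   /* maps the journal record into memory */
    /* item->data now points at the object contents */
    g_mutex_unlock(item->lock);
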
 BlueSkyCloudPointer bluesky_cloudlog_serialize(BlueSkyCloudLog *log,
-                                               BlueSkyCloudLogState *state)
+                                               BlueSkyFS *fs)
 {
+    BlueSkyCloudLogState *state = fs->log_state;
+
     if (log->location_flags & CLOUDLOG_CLOUD) {
         return log->location;
     }
 
-    g_print("Flushing object %s to cloud...\n",
-            bluesky_cloudlog_id_to_string(log->id));
-
     for (int i = 0; i < log->links->len; i++) {
         BlueSkyCloudLog *ref = g_array_index(log->links,
                                              BlueSkyCloudLog *, i);
         if (ref != NULL)
-            bluesky_cloudlog_serialize(ref, state);
+            bluesky_cloudlog_serialize(ref, fs);
     }
 
     g_mutex_lock(log->lock);
@@ -217,6 +228,9 @@ BlueSkyCloudPointer bluesky_cloudlog_serialize(BlueSkyCloudLog *log,
     log->location_flags |= CLOUDLOG_CLOUD;
     g_mutex_unlock(log->lock);
 
+    if (state->data->len > CLOUDLOG_SEGMENT_SIZE)
+        bluesky_cloudlog_flush(fs);
+
     return log->location;
 }
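
Usage-wise, a caller only needs to hand in the top-level object; a rough
sketch (the `inode_item` variable is hypothetical, the signature is the one
above):

    /* Linked objects are serialized first, so the returned pointer refers to
     * a segment that already contains all of this object's dependencies. */
    BlueSkyCloudPointer where = bluesky_cloudlog_serialize(inode_item, fs);
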
 
@@ -232,6 +246,32 @@ static void find_inodes(gpointer key, gpointer value, gpointer user_data)
     state->inode_list = g_list_prepend(state->inode_list, item);
 }
 
+/* Finish up a partially-written cloud log segment and flush it to storage. */
+void bluesky_cloudlog_flush(BlueSkyFS *fs)
+{
+    BlueSkyCloudLogState *state = fs->log_state;
+    if (state->data == NULL || state->data->len == 0)
+        return;
+
+    /* TODO: Append some type of commit record to the log segment? */
+
+    g_print("Serializing %zd bytes of data to cloud\n", state->data->len);
+
+    BlueSkyStoreAsync *async = bluesky_store_async_new(fs->store);
+    async->op = STORE_OP_PUT;
+    async->key = g_strdup_printf("log-%08d-%08d",
+                                 state->location.directory,
+                                 state->location.sequence);
+    async->data = bluesky_string_new_from_gstring(state->data);
+    bluesky_store_async_submit(async);
+    bluesky_store_async_wait(async);
+    bluesky_store_async_unref(async);
+
+    state->location.sequence++;
+    state->location.offset = 0;
+    state->data = g_string_new("");
+}
+
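
The segment naming scheme follows directly from the format string above; an
illustrative example (values made up):

    /* With directory 0 and sequence 3 the segment is stored under the key
     * "log-00000000-00000003"; each flush bumps state->location.sequence and
     * resets the offset for the next segment. */
    char *key = g_strdup_printf("log-%08d-%08d", 0, 3);
    /* key == "log-00000000-00000003" */
    g_free(key);
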
 void bluesky_cloudlog_write_log(BlueSkyFS *fs)
 {
     BlueSkyCloudLogState *state = fs->log_state;
@@ -244,47 +284,14 @@ void bluesky_cloudlog_write_log(BlueSkyFS *fs)
 
     while (state->inode_list != NULL) {
         BlueSkyCloudLog *log = (BlueSkyCloudLog *)state->inode_list->data;
-        bluesky_cloudlog_serialize(log, state);
+        bluesky_cloudlog_serialize(log, fs);
         bluesky_cloudlog_unref(log);
         state->inode_list = g_list_delete_link(state->inode_list,
                                                state->inode_list);
-    }
 
-    if (state->data->len > 0) {
-        g_print("Serialized %zd bytes of data to cloud\n", state->data->len);
-
-        BlueSkyStoreAsync *async = bluesky_store_async_new(fs->store);
-        async->op = STORE_OP_PUT;
-        async->key = g_strdup_printf("log-%08d-%08d",
-                                     state->location.directory,
-                                     state->location.sequence);
-        async->data = bluesky_string_new_from_gstring(state->data);
-        bluesky_store_async_submit(async);
-        bluesky_store_async_wait(async);
-        bluesky_store_async_unref(async);
-
-        state->location.sequence++;
-        state->location.offset = 0;
+        if (state->data->len > CLOUDLOG_SEGMENT_SIZE)
+            bluesky_cloudlog_flush(fs);
     }
 
-    state->data = NULL;
-}
-
-/* Ensure that a cloud log item is loaded in memory, and if not read it in.
- * TODO: Make asynchronous, and make this also fetch from the cloud.  Right now
- * we only read from the log.  Log item must be locked. */
-void bluesky_cloudlog_fetch(BlueSkyCloudLog *log)
-{
-    if (log->data != NULL)
-        return;
-
-    g_print("Re-mapping log entry %d/%d/%d...\n",
-            log->log_seq, log->log_offset, log->log_size);
-
-    g_assert(log->location_flags & CLOUDLOG_JOURNAL);
-
-    log->data = bluesky_log_map_object(log->fs->log, log->log_seq,
-                                       log->log_offset, log->log_size);
-
-    g_cond_broadcast(log->cond);
+    bluesky_cloudlog_flush(fs);
 }
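
Putting the pieces together, the write path is now: find_inodes() collects
dirty inodes, each is serialized (recursively through its links) into the
pending segment buffer, a full segment is uploaded whenever it passes
CLOUDLOG_SEGMENT_SIZE, and a final bluesky_cloudlog_flush() pushes out the
partial tail.  A minimal driver sketch, assuming some periodic writeback hook
(the wrapper name is invented):

    /* Hypothetical writeback hook; only bluesky_cloudlog_write_log() is from
     * this file. */
    static void writeback_tick(BlueSkyFS *fs)
    {
        /* Serializes all dirty inodes into roughly 4 MiB cloud log segments,
         * uploading each full segment and then the final partial one. */
        bluesky_cloudlog_write_log(fs);
    }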