Gradually converting code to use cloud logs for storing data.
authorMichael Vrable <mvrable@cs.ucsd.edu>
Fri, 30 Jul 2010 23:17:30 +0000 (16:17 -0700)
committerMichael Vrable <mvrable@cs.ucsd.edu>
Fri, 30 Jul 2010 23:17:30 +0000 (16:17 -0700)
bluesky/bluesky-private.h
bluesky/bluesky.h
bluesky/cloudlog.c
bluesky/debug.c
bluesky/dir.c
bluesky/file.c
bluesky/inode.c
bluesky/serialize.c

index d6fcc1a..fbc765c 100644 (file)
@@ -142,9 +142,9 @@ void bluesky_store_add_barrier(BlueSkyStoreAsync *barrier,
 void bluesky_inode_start_sync(BlueSkyInode *inode);
 
 void bluesky_block_touch(BlueSkyInode *inode, uint64_t i);
-void bluesky_block_fetch(BlueSkyFS *fs, BlueSkyBlock *block,
+void bluesky_block_fetch(BlueSkyInode *inode, BlueSkyBlock *block,
                          BlueSkyStoreAsync *barrier);
-void bluesky_block_flush(BlueSkyFS *fs, BlueSkyBlock *block,
+void bluesky_block_flush(BlueSkyInode *inode, BlueSkyBlock *block,
                          GList **log_items);
 void bluesky_file_flush(BlueSkyInode *inode, GList **log_items);
 void bluesky_file_drop_cached(BlueSkyInode *inode);
@@ -178,7 +178,7 @@ typedef enum {
 #define CLOUDLOG_JOURNAL    0x01
 #define CLOUDLOG_CLOUD      0x02
 #define CLOUDLOG_CACHE      0x04
-typedef struct {
+struct _BlueSkyCloudLog {
     gint refcount;
     GMutex *lock;
     GCond *cond;
@@ -211,7 +211,7 @@ typedef struct {
 
     // Serialized data, if available in memory (otherwise NULL).
     BlueSkyRCStr *data;
-} BlueSkyCloudLog;
+};
 
 /* Serialize objects into a log segment to be written to the cloud. */
 struct _BlueSkyCloudLogState {
index f72c6d0..4b17bda 100644 (file)
@@ -99,6 +99,9 @@ typedef struct _BlueSkyLog BlueSkyLog;
 struct _BlueSkyCloudLogState;
 typedef struct _BlueSkyCloudLogState BlueSkyCloudLogState;
 
+struct _BlueSkyCloudLog;
+typedef struct _BlueSkyCloudLog BlueSkyCloudLog;
+
 void bluesky_store_init();
 BlueSkyStore *bluesky_store_new(const gchar *type);
 void bluesky_store_free(BlueSkyStore *store);
@@ -220,6 +223,9 @@ typedef struct {
     /* Additional state for tracking cache writeback status. */
     uint64_t change_pending;    /* change_count version currently being committed to storage */
 
+    /* Version of the object last serialized and committed to storage. */
+    BlueSkyCloudLog *committed_item;
+
     /* Pointers to the linked-list elements for this inode in the accessed and
      * dirty linked lists.  We re-use the GList structure, using ->next to
      * point to the head of the list and ->prev to point to the tail.  The data
@@ -272,8 +278,8 @@ typedef enum {
 
 typedef struct {
     BlueSkyBlockType type;
-    gchar *ref;                 /* Name of data block in the backing store */
     BlueSkyRCStr *data;         /* Pointer to data in memory if cached */
+    BlueSkyCloudLog *cloudref;  /* Reference to cloud log entry with data */
 } BlueSkyBlock;
 
 BlueSkyFS *bluesky_init_fs(gchar *name, BlueSkyStore *store);
@@ -306,7 +312,6 @@ void bluesky_file_write(BlueSkyInode *inode, uint64_t offset,
 void bluesky_file_read(BlueSkyInode *inode, uint64_t offset,
                        char *buf, gint len);
 
-void bluesky_inode_flush(BlueSkyFS *fs, BlueSkyInode *inode);
 void bluesky_inode_fetch(BlueSkyFS *fs, uint64_t inum);
 
 gint bluesky_dirent_compare(gconstpointer a, gconstpointer b,
index 82f0f65..b98c233 100644 (file)
@@ -99,11 +99,17 @@ BlueSkyCloudLog *bluesky_cloudlog_new(BlueSkyFS *fs)
 
 void bluesky_cloudlog_ref(BlueSkyCloudLog *log)
 {
+    if (log == NULL)
+        return;
+
     g_atomic_int_inc(&log->refcount);
 }
 
 void bluesky_cloudlog_unref(BlueSkyCloudLog *log)
 {
+    if (log == NULL)
+        return;
+
     if (g_atomic_int_dec_and_test(&log->refcount)) {
         g_print("Cloud log refcount dropped to zero.\n");
     }
@@ -203,7 +209,8 @@ void bluesky_cloudlog_write_log(BlueSkyFS *fs)
     g_print("Starting cloudlog write...\n");
 
     BlueSkyCloudLogState *state = fs->log_state;
-    state->data = g_string_new("");
+    if (state->data == NULL)
+        state->data = g_string_new("");
 
     g_mutex_lock(fs->lock);
     g_hash_table_foreach(fs->locations, find_inodes, state);
@@ -216,19 +223,22 @@ void bluesky_cloudlog_write_log(BlueSkyFS *fs)
                                                state->inode_list);
     }
 
-    g_print("Serialized %zd bytes of data\n", state->data->len);
-
-    BlueSkyStoreAsync *async = bluesky_store_async_new(fs->store);
-    async->op = STORE_OP_PUT;
-    async->key = g_strdup_printf("log-%08d-%08d",
-                                 state->location.directory,
-                                 state->location.sequence);
-    async->data = bluesky_string_new_from_gstring(state->data);
-    bluesky_store_async_submit(async);
-    bluesky_store_async_wait(async);
-    bluesky_store_async_unref(async);
+    if (state->data->len > 0) {
+        g_print("Serialized %zd bytes of data\n", state->data->len);
+
+        BlueSkyStoreAsync *async = bluesky_store_async_new(fs->store);
+        async->op = STORE_OP_PUT;
+        async->key = g_strdup_printf("log-%08d-%08d",
+                                     state->location.directory,
+                                     state->location.sequence);
+        async->data = bluesky_string_new_from_gstring(state->data);
+        bluesky_store_async_submit(async);
+        bluesky_store_async_wait(async);
+        bluesky_store_async_unref(async);
+
+        state->location.sequence++;
+        state->location.offset = 0;
+    }
 
     state->data = NULL;
-    state->location.sequence++;
-    state->location.offset = 0;
 }
index 19cc5ef..2b32c9c 100644 (file)
@@ -42,7 +42,8 @@ static void cloudlog_dump(gpointer key, gpointer value, gpointer user_data)
     for (int i = 0; i < sizeof(BlueSkyCloudID); i++) {
         g_print("%02x", (uint8_t)(log->id.bytes[i]));
     }
-    g_print(": ty=%d inode=%"PRIu64" locs=%x log@(%d,%d) cloud@(%d,%d,%d)\n",
+    g_print(": refs=%d ty=%d inode=%"PRIu64" locs=%x log@(%d,%d) cloud@(%d,%d,%d)\n",
+            log->refcount,
             log->type, log->inum, log->location_flags,
             log->log_seq, log->log_offset, log->location.directory,
             log->location.sequence, log->location.offset);
index cd4ee1d..8e22d2d 100644 (file)
@@ -132,7 +132,7 @@ gboolean bluesky_directory_insert(BlueSkyInode *dir,
     g_hash_table_insert(dir->dirhash_folded, d->name_folded, d);
 
     bluesky_inode_update_ctime(dir, 1);
-    bluesky_inode_flush(dir->fs, dir);
+    //bluesky_inode_do_sync(dir);     // TODO: Needed?
 
     return TRUE;
 }
@@ -161,7 +161,6 @@ gboolean bluesky_directory_remove(BlueSkyInode *dir, gchar *name)
     g_sequence_remove(i);
 
     bluesky_inode_update_ctime(dir, 1);
-    bluesky_inode_flush(dir->fs, dir);
 
     return TRUE;
 }
index 2e4313e..4824dd8 100644 (file)
@@ -34,7 +34,7 @@ void bluesky_block_touch(BlueSkyInode *inode, uint64_t i)
         block->data = bluesky_string_new(g_malloc0(block_len), block_len);
         break;
     case BLUESKY_BLOCK_REF:
-        bluesky_block_fetch(inode->fs, block, NULL);
+        bluesky_block_fetch(inode, block, NULL);
         g_assert(block->type == BLUESKY_BLOCK_CACHED);
         /* Fall through */
     case BLUESKY_BLOCK_CACHED:
@@ -50,6 +50,9 @@ void bluesky_block_touch(BlueSkyInode *inode, uint64_t i)
         g_atomic_int_add(&inode->fs->cache_dirty, 1);
 
     block->type = BLUESKY_BLOCK_DIRTY;
+    if (block->cloudref != NULL)
+        bluesky_cloudlog_unref(block->cloudref);
+    block->cloudref = NULL;
 }
 
 /* Set the size of a file.  This will truncate or extend the file as needed.
@@ -94,13 +97,13 @@ void bluesky_file_truncate(BlueSkyInode *inode, uint64_t size)
         /* Delete blocks from a file.  Must reclaim memory. */
         for (guint i = inode->blocks->len; i < blocks; i++) {
             BlueSkyBlock *b = &g_array_index(inode->blocks, BlueSkyBlock, i);
-            g_free(b->ref);
             if (b->type == BLUESKY_BLOCK_CACHED
                     || b->type == BLUESKY_BLOCK_DIRTY)
                 g_atomic_int_add(&inode->fs->cache_total, -1);
             if (b->type == BLUESKY_BLOCK_DIRTY)
                 g_atomic_int_add(&inode->fs->cache_dirty, -1);
             bluesky_string_unref(b->data);
+            bluesky_cloudlog_unref(b->cloudref);
         }
         g_array_set_size(inode->blocks, blocks);
     }
@@ -181,7 +184,7 @@ void bluesky_file_read(BlueSkyInode *inode, uint64_t offset,
         BlueSkyBlock *b = &g_array_index(inode->blocks, BlueSkyBlock,
                                          i);
         if (b->type == BLUESKY_BLOCK_REF)
-            bluesky_block_fetch(inode->fs, b, barrier);
+            bluesky_block_fetch(inode, b, barrier);
     }
     bluesky_store_async_submit(barrier);
     bluesky_store_async_wait(barrier);
@@ -202,7 +205,7 @@ void bluesky_file_read(BlueSkyInode *inode, uint64_t offset,
             memset(buf, 0, bytes);
             break;
         case BLUESKY_BLOCK_REF:
-            bluesky_block_fetch(inode->fs, b, NULL);
+            bluesky_block_fetch(inode, b, NULL);
             /* Fall through */
         case BLUESKY_BLOCK_CACHED:
         case BLUESKY_BLOCK_DIRTY:
@@ -216,6 +219,7 @@ void bluesky_file_read(BlueSkyInode *inode, uint64_t offset,
     }
 }
 
+#if 0
 /* Read the given block from cloud-backed storage if the data is not already
  * cached. */
 static void block_fetch_completion(BlueSkyStoreAsync *async, gpointer data)
@@ -234,10 +238,16 @@ static void block_fetch_completion(BlueSkyStoreAsync *async, gpointer data)
 
     block->type = BLUESKY_BLOCK_CACHED;
 }
+#endif
 
-void bluesky_block_fetch(BlueSkyFS *fs, BlueSkyBlock *block,
+void bluesky_block_fetch(BlueSkyInode *inode, BlueSkyBlock *block,
                          BlueSkyStoreAsync *barrier)
 {
+    // TODO: Implement this
+
+#if 0
+    BlueSkyFS *fs = inode->fs;
+
     if (block->type != BLUESKY_BLOCK_REF)
         return;
 
@@ -254,29 +264,34 @@ void bluesky_block_fetch(BlueSkyFS *fs, BlueSkyBlock *block,
 
     bluesky_store_async_unref(async);
     g_atomic_int_add(&fs->cache_total, 1);
+#endif
 }
 
 /* Write the given block to cloud-backed storage and mark it clean. */
-void bluesky_block_flush(BlueSkyFS *fs, BlueSkyBlock *block,
+void bluesky_block_flush(BlueSkyInode *inode, BlueSkyBlock *block,
                          GList **log_items)
 {
+    BlueSkyFS *fs = inode->fs;
+
     if (block->type != BLUESKY_BLOCK_DIRTY)
         return;
 
+    if (block->cloudref != NULL)
+        bluesky_cloudlog_unref(block->cloudref);
+
     BlueSkyRCStr *data = block->data;
 
     BlueSkyCloudLog *cloudlog = bluesky_cloudlog_new(fs);
-    gchar *name = bluesky_cloudlog_id_to_string(cloudlog->id);
     cloudlog->type = LOGTYPE_DATA;
-    cloudlog->inum = 0; //FIXME
+    cloudlog->inum = inode->inum;
     cloudlog->data = data;
     bluesky_string_ref(data);
     bluesky_cloudlog_sync(cloudlog);
     *log_items = g_list_prepend(*log_items, cloudlog);
     bluesky_cloudlog_insert(cloudlog);
 
-    g_free(block->ref);
-    block->ref = name;
+    block->cloudref = cloudlog;
+    bluesky_cloudlog_ref(cloudlog);
 
     block->type = BLUESKY_BLOCK_CACHED;
     g_atomic_int_add(&fs->cache_dirty, -1);
@@ -289,13 +304,14 @@ void bluesky_file_flush(BlueSkyInode *inode, GList **log_items)
 
     for (int i = 0; i < inode->blocks->len; i++) {
         BlueSkyBlock *b = &g_array_index(inode->blocks, BlueSkyBlock, i);
-        bluesky_block_flush(inode->fs, b, log_items);
+        bluesky_block_flush(inode, b, log_items);
     }
 }
 
 /* Drop clean data blocks for a file from cache. */
 void bluesky_file_drop_cached(BlueSkyInode *inode)
 {
+#if 0
     g_return_if_fail(inode->type == BLUESKY_REGULAR);
 
     for (int i = 0; i < inode->blocks->len; i++) {
@@ -315,4 +331,5 @@ void bluesky_file_drop_cached(BlueSkyInode *inode)
             g_atomic_int_add(&inode->fs->cache_total, -1);
         }
     }
+#endif
 }
index 0844f79..9d72f4d 100644 (file)
@@ -125,7 +125,7 @@ BlueSkyFS *bluesky_init_fs(gchar *name, BlueSkyStore *store)
     bluesky_insert_inode(fs, root);
     bluesky_inode_update_ctime(root, TRUE);
 
-    bluesky_inode_flush(fs, root);
+    bluesky_inode_do_sync(root);
     bluesky_superblock_flush(fs);
 
     return fs;
@@ -173,7 +173,7 @@ void bluesky_inode_unref(BlueSkyInode *inode)
                 if (b->type == BLUESKY_BLOCK_DIRTY) {
                     g_error("Deleting an inode with dirty file data!");
                 }
-                g_free(b->ref);
+                bluesky_cloudlog_unref(b->cloudref);
                 bluesky_string_unref(b->data);
             }
             g_array_unref(inode->blocks);
@@ -288,24 +288,6 @@ void bluesky_insert_inode(BlueSkyFS *fs, BlueSkyInode *inode)
     g_hash_table_insert(fs->inodes, &inode->inum, inode);
 }
 
-/* Deprecated: Synchronize an inode to stable storage. */
-void bluesky_inode_flush(BlueSkyFS *fs, BlueSkyInode *inode)
-{
-    GString *buf = g_string_new("");
-    bluesky_serialize_inode(buf, inode);
-    BlueSkyRCStr *data = bluesky_string_new_from_gstring(buf);
-
-    char key[64];
-    sprintf(key, "inode-%016"PRIx64, inode->inum);
-
-    BlueSkyStoreAsync *async = bluesky_store_async_new(fs->store);
-    async->op = STORE_OP_PUT;
-    async->key = g_strdup(key);
-    async->data = data;
-    bluesky_store_async_submit(async);
-    bluesky_store_async_unref(async);
-}
-
 /* Start writeback of an inode and all associated data. */
 void bluesky_inode_start_sync(BlueSkyInode *inode)
 {
@@ -334,12 +316,16 @@ void bluesky_inode_start_sync(BlueSkyInode *inode)
             if (b->type == BLUESKY_BLOCK_CACHED
                 || b->type == BLUESKY_BLOCK_REF)
             {
-                BlueSkyCloudID id = bluesky_cloudlog_id_from_string(b->ref);
+                BlueSkyCloudID id = b->cloudref->id;
                 g_array_append_val(cloudlog->pointers, id);
             }
         }
     }
 
+    if (inode->committed_item != NULL)
+        bluesky_cloudlog_unref(inode->committed_item);
+    inode->committed_item = cloudlog;
+
     bluesky_cloudlog_sync(cloudlog);
     log_items = g_list_prepend(log_items, cloudlog);
     bluesky_cloudlog_insert(cloudlog);
@@ -428,6 +414,7 @@ void bluesky_inode_fetch(BlueSkyFS *fs, uint64_t inum)
 /* Synchronize filesystem superblock to stable storage. */
 void bluesky_superblock_flush(BlueSkyFS *fs)
 {
+#if 0
     GString *buf = g_string_new("");
     bluesky_serialize_superblock(buf, fs);
     BlueSkyRCStr *data = bluesky_string_new_from_gstring(buf);
@@ -440,4 +427,5 @@ void bluesky_superblock_flush(BlueSkyFS *fs)
     bluesky_store_async_unref(async);
 
     //bluesky_store_sync(fs->store);
+#endif
 }
index e453f49..44233a1 100644 (file)
@@ -93,9 +93,11 @@ void bluesky_serialize_inode(GString *out, BlueSkyInode *inode)
         g_string_append_len(out, (gchar *)&size, sizeof(uint64_t));
         for (int i = 0; i < inode->blocks->len; i++) {
             BlueSkyBlock *b = &g_array_index(inode->blocks, BlueSkyBlock, i);
-            if (b->ref != NULL)
-                g_string_append(out, b->ref);
-            g_string_append_c(out, '\0');
+            BlueSkyCloudID id;
+            memset(&id, 0, sizeof(id));
+            if (b->cloudref != NULL)
+                id = b->cloudref->id;
+            g_string_append_len(out, (const char *)&id, sizeof(id));
         }
         break;
     }
@@ -178,12 +180,15 @@ gboolean bluesky_deserialize_inode(BlueSkyInode *inode, const gchar *buf)
         g_array_set_size(inode->blocks,
                          (inode->size + BLUESKY_BLOCK_SIZE - 1)
                           / BLUESKY_BLOCK_SIZE);
+        // TODO
+#if 0
         for (int i = 0; i < inode->blocks->len; i++) {
             BlueSkyBlock *b = &g_array_index(inode->blocks, BlueSkyBlock, i);
             b->type = BLUESKY_BLOCK_REF;
             b->ref = g_strdup(buf);
             buf += strlen(b->ref) + 1;
         }
+#endif
         break;
 
     case BLUESKY_DIRECTORY: