Rework the checkpoint record format to include a version vector.
authorMichael Vrable <mvrable@cs.ucsd.edu>
Mon, 6 Dec 2010 04:46:49 +0000 (20:46 -0800)
committerMichael Vrable <mvrable@cs.ucsd.edu>
Mon, 6 Dec 2010 04:46:49 +0000 (20:46 -0800)
This will be used for keeping track of whether we have incorporated changes
made by the cleaner or not, and is a first step (along with having the
clenaer write to the a separate set of logs) to making a functional online
cleaner.

bluesky/bluesky-private.h
bluesky/imap.c
bluesky/inode.c
cleaner/cleaner

index f945a93..20079da 100644 (file)
@@ -207,6 +207,12 @@ void bluesky_file_drop_cached(BlueSkyInode *inode);
  * various pieces of data (both where in the cloud and where cached locally).
  * */
 
+/* Eventually we'll want to support multiple writers.  But for now, hard-code
+ * separate namespaces in the cloud for the proxy and the cleaner to write to.
+ * */
+#define BLUESKY_CLOUD_DIR_PRIMARY 0
+#define BLUESKY_CLOUD_DIR_CLEANER 1
+
 typedef struct {
     char bytes[16];
 } BlueSkyCloudID;
@@ -319,6 +325,12 @@ struct BlueSkyCloudLogState {
     GList *inode_list;
     GSList *writeback_list;     // Items which are being serialized right now
     GList *pending_segments;    // Segments which are being uploaded now
+
+    /* What is the most recent sequence number written by the cleaner which we
+     * have processed and incorporated into our own log?  This gets
+     * incorporated into the version vector written out with our checkpoint
+     * records. */
+    int latest_cleaner_seq_seen;
 };
 
 gboolean bluesky_cloudlog_equal(gconstpointer a, gconstpointer b);
index 7181a1e..ef48e77 100644 (file)
 
 #include "bluesky-private.h"
 
+/* Magic number at the start of the checkpoint record, to check for version
+ * mismatches. */
+#define CHECKPOINT_MAGIC 0x7ad7dafb42a498b4ULL
+
 /* Inode maps.  There is both an in-memory representation as well as the
  * serialized form in the cloud.
  *
@@ -165,6 +169,21 @@ BlueSkyCloudLog *bluesky_inode_map_serialize(BlueSkyFS *fs)
     log->type = LOGTYPE_CHECKPOINT;
     log->inum = 0;
 
+    /* The checkpoint record starts with a magic number, followed by the
+     * version vector which lists the latest sequence number of all other logs
+     * (currently, only the cleaner) which have been seen. */
+    uint64_t magic = GUINT64_TO_LE(CHECKPOINT_MAGIC);
+    g_string_append_len(buf, (const char *)&magic, sizeof(magic));
+    uint32_t versions;
+    versions = GUINT32_TO_LE(fs->log_state->latest_cleaner_seq_seen >= 0);
+    g_string_append_len(buf, (const char *)&versions, sizeof(versions));
+    if (fs->log_state->latest_cleaner_seq_seen >= 0) {
+        versions = GUINT32_TO_LE(BLUESKY_CLOUD_DIR_CLEANER);
+        g_string_append_len(buf, (const char *)&versions, sizeof(versions));
+        versions = GUINT32_TO_LE(fs->log_state->latest_cleaner_seq_seen);
+        g_string_append_len(buf, (const char *)&versions, sizeof(versions));
+    }
+
     GSequenceIter *i = g_sequence_get_begin_iter(fs->inode_map);
     while (!g_sequence_iter_is_end(i)) {
         InodeMapRange *range = (InodeMapRange *)g_sequence_get(i);
@@ -234,7 +253,30 @@ static void bluesky_inode_map_deserialize(BlueSkyFS *fs, BlueSkyCloudLog *imap)
     g_mutex_lock(imap->lock);
     bluesky_cloudlog_fetch(imap);
     g_assert(imap->data != NULL);
-    g_assert(imap->data->len == 16 * imap->links->len);
+    g_assert(imap->data->len >= 12);
+    uint64_t magic;
+    uint32_t vector_data;
+    memcpy((char *)&magic, imap->data->data, sizeof(magic));
+    g_assert(GUINT64_FROM_LE(magic) == CHECKPOINT_MAGIC);
+    memcpy((char *)&vector_data, imap->data->data + 8, sizeof(vector_data));
+    g_assert(GUINT32_FROM_LE(vector_data) <= 2);
+
+    int vector_size = GUINT32_FROM_LE(vector_data);
+    g_assert(imap->data->len == 16 * imap->links->len + 12 + 8 * vector_size);
+
+    for (int i = 0; i < vector_size; i++) {
+        memcpy((char *)&vector_data, imap->data->data + 12 + 8*i,
+               sizeof(vector_data));
+        if (GUINT32_FROM_LE(vector_data) == 1) {
+            memcpy((char *)&vector_data, imap->data->data + 16 + 8*i,
+                   sizeof(vector_data));
+            fs->log_state->latest_cleaner_seq_seen
+                = GUINT32_FROM_LE(vector_data);
+            g_print("Deserializing checkpoint: last cleaner sequence is %d\n",
+                    GUINT32_FROM_LE(vector_data));
+        }
+    }
+
     //uint64_t *inum_range = (uint64_t *)imap->data->data;
     for (int i = 0; i < imap->links->len; i++) {
         //int64_t start = GUINT64_FROM_LE(*inum_range++);
@@ -267,7 +309,12 @@ static void bluesky_inode_map_deserialize(BlueSkyFS *fs, BlueSkyCloudLog *imap)
  * whether a checkpoint was found and loaded or not. */
 gboolean bluesky_checkpoint_load(BlueSkyFS *fs)
 {
-    char *last_segment = bluesky_store_lookup_last(fs->store, "log-");
+    g_print("Claiming cloud log directory: %d\n",
+            fs->log_state->location.directory);
+    char *prefix = g_strdup_printf("log-%08d",
+                                   fs->log_state->location.directory);
+    char *last_segment = bluesky_store_lookup_last(fs->store, prefix);
+    g_free(prefix);
     if (last_segment == NULL)
         return FALSE;
 
index 8cd05a0..ae41d69 100644 (file)
@@ -95,6 +95,7 @@ BlueSkyFS *bluesky_new_fs(gchar *name)
 
     fs->log_state = g_new0(BlueSkyCloudLogState, 1);
     fs->log_state->data = g_string_new("");
+    fs->log_state->latest_cleaner_seq_seen = -1;
 
     bluesky_cloudlog_threads_init(fs);
     fs->inode_fetch_thread_pool = g_thread_pool_new(inode_fetch_task, NULL,
index c4b1222..b148b95 100755 (executable)
@@ -19,6 +19,8 @@ HEADER_MAGIC1 = 'AgI-'          # Unencrypted data
 HEADER_MAGIC2 = 'AgI='          # Encrypted data
 HEADER_SIZE = struct.calcsize(HEADER_FORMAT)
 
+CHECKPOINT_MAGIC = struct.pack('<Q', 0x7ad7dafb42a498b4)
+
 class ITEM_TYPE:
     DATA = '1'
     INODE = '2'
@@ -34,10 +36,11 @@ class FileBackend:
     def __init__(self, path):
         self.path = path
 
-    def list(self):
+    def list(self, directory=0):
         """Return a listing of all log segments and their sizes."""
 
-        files = [f for f in os.listdir(self.path) if f.startswith('log-')]
+        prefix = "log-%08d-" % (directory,)
+        files = [f for f in os.listdir(self.path) if f.startswith(prefix)]
         files.sort()
 
         return [(f, os.stat(os.path.join(self.path, f)).st_size)
@@ -47,7 +50,7 @@ class FileBackend:
         fp = open(os.path.join(self.path, filename), 'rb')
         if offset > 0:
             fp.seek(offset)
-        if legnth is None:
+        if length is None:
             return fp.read()
         else:
             return fp.read(length)
@@ -93,9 +96,10 @@ class S3Backend:
         self.conn = boto.connect_s3(is_secure=False)
         self.bucket = self.conn.get_bucket(self.bucket_name)
 
-    def list(self):
+    def list(self, directory=0):
         files = []
-        for k in self.bucket.list(self.path + 'log-'):
+        prefix = "log-%08d-" % (directory,)
+        for k in self.bucket.list(self.path + prefix):
             files.append((k.key, k.size))
         return files
 
@@ -208,7 +212,8 @@ class LogDirectory:
         self.backend = backend
         self.dir_num = dir
         self.seq_num = 0
-        for logname in backend.list():
+        for logname in backend.list(dir):
+            print "Old log file:", logname
             loc = backend.name_to_loc(logname[0])
             if loc is not None and loc[0] == dir:
                 self.seq_num = max(self.seq_num, loc[1] + 1)
@@ -241,7 +246,7 @@ class UtilizationTracker:
 
     def __init__(self, backend):
         self.segments = {}
-        for (segment, size) in backend.list():
+        for (segment, size) in backend.list(0) + backend.list(1):
             self.segments[segment] = [size, 0]
 
     def add_item(self, item):
@@ -327,8 +332,8 @@ def parse_log(data, location=None):
         if item is not None: yield item
         offset += size
 
-def load_checkpoint_record(backend):
-    for (log, size) in reversed(backend.list()):
+def load_checkpoint_record(backend, directory=0):
+    for (log, size) in reversed(backend.list(directory)):
         for item in reversed(list(parse_log(backend.read(log), log))):
             print item
             if item.type == ITEM_TYPE.CHECKPOINT:
@@ -343,6 +348,7 @@ class InodeMap:
 
         This will also build up information about segment utilization."""
 
+        self.version_vector = {}
         self.checkpoint_record = checkpoint_record
 
         util = UtilizationTracker(backend)
@@ -350,9 +356,25 @@ class InodeMap:
         inodes = {}
         self.obsolete_segments = set()
 
+        data = checkpoint_record.data
+        if not data.startswith(CHECKPOINT_MAGIC):
+            raise ValueError, "Invalid checkpoint record!"
+        data = data[len(CHECKPOINT_MAGIC):]
+        (vvlen,) = struct.unpack_from("<I", data, 0)
+        self.vvsize = 4 + 8*vvlen
+        for i in range(vvlen):
+            (v1, v2) = struct.unpack_from("<II", data, 4 + 8*i)
+            self.version_vector[v1] = v2
+        print self.version_vector
+        self.version_vector[checkpoint_record.location[0]] \
+            = checkpoint_record.location[1]
+        print self.version_vector
+
+        data = data[self.vvsize:]
+
         print "Inode map:"
-        for i in range(len(checkpoint_record.data) // 16):
-            (start, end) = struct.unpack_from("<QQ", checkpoint_record.data, 16*i)
+        for i in range(len(data) // 16):
+            (start, end) = struct.unpack_from("<QQ", data, 16*i)
             imap = load_item(backend, checkpoint_record.links[i][1])
             util.add_item(imap)
             print "[%d, %d]: %s" % (start, end, imap)
@@ -372,8 +394,9 @@ class InodeMap:
         for (s, u) in sorted(util.segments.items()):
             print "%s: %s %s" % (s, u, float(u[1]) / u[0])
             if u[1] == 0:
-                print "Deleting..."
-                backend.delete(s)
+                # print "Deleting..."
+                # backend.delete(s)
+                pass
 
         self.inodes = inodes
         self.util = util
@@ -389,13 +412,18 @@ class InodeMap:
         new_checkpoint.id = LogItem.random_id()
         new_checkpoint.inum = 0
         new_checkpoint.type = ITEM_TYPE.CHECKPOINT
-        new_checkpoint.data = ""
+        new_checkpoint.data = CHECKPOINT_MAGIC
         new_checkpoint.links = []
 
-        for i in range(len(self.checkpoint_record.data) // 16):
-            (start, end) = struct.unpack_from("<QQ", self.checkpoint_record.data, 16*i)
+        new_checkpoint.data += struct.pack('<I', len(self.version_vector))
+        for d in sorted(self.version_vector):
+            new_checkpoint.data += struct.pack('<II', d, self.version_vector[d])
+
+        data = self.checkpoint_record.data[self.vvsize:]
+        for i in range(len(data) // 16):
+            (start, end) = struct.unpack_from("<QQ", data, 16*i)
 
-            new_checkpoint.data += self.checkpoint_record.data[16*i : 16*i + 16]
+            new_checkpoint.data += data[16*i : 16*i + 16]
 
             # Case 1: No inodes in this range of the old inode map have
             # changed.  Simply emit a new pointer to the same inode map block.
@@ -482,14 +510,16 @@ def run_cleaner(backend, inode_map, log, repack_inodes=False):
         rewrite_inode(backend, inode_map, i, log, i in dirty_inode_data)
 
 if __name__ == '__main__':
-    backend = S3Backend("mvrable-bluesky", cachedir=".")
+    #backend = S3Backend("mvrable-bluesky", cachedir=".")
+    backend = FileBackend(".")
     chkpt = load_checkpoint_record(backend)
     print backend.list()
     imap = InodeMap()
     imap.build(backend, chkpt)
     print chkpt
 
-    log_dir = LogDirectory(backend, 0)
+    log_dir = LogDirectory(backend, 1)
     run_cleaner(backend, imap, log_dir)
+    print "Version vector:", imap.version_vector
     imap.write(backend, log_dir)
     log_dir.close_all()