From afc95f0ab2900a311802028f1b811867fa3c0c31 Mon Sep 17 00:00:00 2001 From: Michael Vrable Date: Sun, 5 Dec 2010 20:46:49 -0800 Subject: [PATCH 1/1] Rework the checkpoint record format to include a version vector. This will be used for keeping track of whether we have incorporated changes made by the cleaner or not, and is a first step (along with having the cleaner write to a separate set of logs) to making a functional online cleaner. --- bluesky/bluesky-private.h | 12 +++++++ bluesky/imap.c | 51 +++++++++++++++++++++++++++-- bluesky/inode.c | 1 + cleaner/cleaner | 68 ++++++++++++++++++++++++++++----------- 4 files changed, 111 insertions(+), 21 deletions(-) diff --git a/bluesky/bluesky-private.h b/bluesky/bluesky-private.h index f945a93..20079da 100644 --- a/bluesky/bluesky-private.h +++ b/bluesky/bluesky-private.h @@ -207,6 +207,12 @@ void bluesky_file_drop_cached(BlueSkyInode *inode); * various pieces of data (both where in the cloud and where cached locally). * */ +/* Eventually we'll want to support multiple writers. But for now, hard-code + * separate namespaces in the cloud for the proxy and the cleaner to write to. + * */ +#define BLUESKY_CLOUD_DIR_PRIMARY 0 +#define BLUESKY_CLOUD_DIR_CLEANER 1 + typedef struct { char bytes[16]; } BlueSkyCloudID; @@ -319,6 +325,12 @@ struct BlueSkyCloudLogState { GList *inode_list; GSList *writeback_list; // Items which are being serialized right now GList *pending_segments; // Segments which are being uploaded now + + /* What is the most recent sequence number written by the cleaner which we + * have processed and incorporated into our own log? This gets + * incorporated into the version vector written out with our checkpoint + * records. 
*/ + int latest_cleaner_seq_seen; }; gboolean bluesky_cloudlog_equal(gconstpointer a, gconstpointer b); diff --git a/bluesky/imap.c b/bluesky/imap.c index 7181a1e..ef48e77 100644 --- a/bluesky/imap.c +++ b/bluesky/imap.c @@ -15,6 +15,10 @@ #include "bluesky-private.h" +/* Magic number at the start of the checkpoint record, to check for version + * mismatches. */ +#define CHECKPOINT_MAGIC 0x7ad7dafb42a498b4ULL + /* Inode maps. There is both an in-memory representation as well as the * serialized form in the cloud. * @@ -165,6 +169,21 @@ BlueSkyCloudLog *bluesky_inode_map_serialize(BlueSkyFS *fs) log->type = LOGTYPE_CHECKPOINT; log->inum = 0; + /* The checkpoint record starts with a magic number, followed by the + * version vector which lists the latest sequence number of all other logs + * (currently, only the cleaner) which have been seen. */ + uint64_t magic = GUINT64_TO_LE(CHECKPOINT_MAGIC); + g_string_append_len(buf, (const char *)&magic, sizeof(magic)); + uint32_t versions; + versions = GUINT32_TO_LE(fs->log_state->latest_cleaner_seq_seen >= 0); + g_string_append_len(buf, (const char *)&versions, sizeof(versions)); + if (fs->log_state->latest_cleaner_seq_seen >= 0) { + versions = GUINT32_TO_LE(BLUESKY_CLOUD_DIR_CLEANER); + g_string_append_len(buf, (const char *)&versions, sizeof(versions)); + versions = GUINT32_TO_LE(fs->log_state->latest_cleaner_seq_seen); + g_string_append_len(buf, (const char *)&versions, sizeof(versions)); + } + GSequenceIter *i = g_sequence_get_begin_iter(fs->inode_map); while (!g_sequence_iter_is_end(i)) { InodeMapRange *range = (InodeMapRange *)g_sequence_get(i); @@ -234,7 +253,30 @@ static void bluesky_inode_map_deserialize(BlueSkyFS *fs, BlueSkyCloudLog *imap) g_mutex_lock(imap->lock); bluesky_cloudlog_fetch(imap); g_assert(imap->data != NULL); - g_assert(imap->data->len == 16 * imap->links->len); + g_assert(imap->data->len >= 12); + uint64_t magic; + uint32_t vector_data; + memcpy((char *)&magic, imap->data->data, sizeof(magic)); + 
g_assert(GUINT64_FROM_LE(magic) == CHECKPOINT_MAGIC); + memcpy((char *)&vector_data, imap->data->data + 8, sizeof(vector_data)); + g_assert(GUINT32_FROM_LE(vector_data) <= 2); + + int vector_size = GUINT32_FROM_LE(vector_data); + g_assert(imap->data->len == 16 * imap->links->len + 12 + 8 * vector_size); + + for (int i = 0; i < vector_size; i++) { + memcpy((char *)&vector_data, imap->data->data + 12 + 8*i, + sizeof(vector_data)); + if (GUINT32_FROM_LE(vector_data) == 1) { + memcpy((char *)&vector_data, imap->data->data + 16 + 8*i, + sizeof(vector_data)); + fs->log_state->latest_cleaner_seq_seen + = GUINT32_FROM_LE(vector_data); + g_print("Deserializing checkpoint: last cleaner sequence is %d\n", + GUINT32_FROM_LE(vector_data)); + } + } + //uint64_t *inum_range = (uint64_t *)imap->data->data; for (int i = 0; i < imap->links->len; i++) { //int64_t start = GUINT64_FROM_LE(*inum_range++); @@ -267,7 +309,12 @@ static void bluesky_inode_map_deserialize(BlueSkyFS *fs, BlueSkyCloudLog *imap) * whether a checkpoint was found and loaded or not. 
*/ gboolean bluesky_checkpoint_load(BlueSkyFS *fs) { - char *last_segment = bluesky_store_lookup_last(fs->store, "log-"); + g_print("Claiming cloud log directory: %d\n", + fs->log_state->location.directory); + char *prefix = g_strdup_printf("log-%08d", + fs->log_state->location.directory); + char *last_segment = bluesky_store_lookup_last(fs->store, prefix); + g_free(prefix); if (last_segment == NULL) return FALSE; diff --git a/bluesky/inode.c b/bluesky/inode.c index 8cd05a0..ae41d69 100644 --- a/bluesky/inode.c +++ b/bluesky/inode.c @@ -95,6 +95,7 @@ BlueSkyFS *bluesky_new_fs(gchar *name) fs->log_state = g_new0(BlueSkyCloudLogState, 1); fs->log_state->data = g_string_new(""); + fs->log_state->latest_cleaner_seq_seen = -1; bluesky_cloudlog_threads_init(fs); fs->inode_fetch_thread_pool = g_thread_pool_new(inode_fetch_task, NULL, diff --git a/cleaner/cleaner b/cleaner/cleaner index c4b1222..b148b95 100755 --- a/cleaner/cleaner +++ b/cleaner/cleaner @@ -19,6 +19,8 @@ HEADER_MAGIC1 = 'AgI-' # Unencrypted data HEADER_MAGIC2 = 'AgI=' # Encrypted data HEADER_SIZE = struct.calcsize(HEADER_FORMAT) +CHECKPOINT_MAGIC = struct.pack(' 0: fp.seek(offset) - if legnth is None: + if length is None: return fp.read() else: return fp.read(length) @@ -93,9 +96,10 @@ class S3Backend: self.conn = boto.connect_s3(is_secure=False) self.bucket = self.conn.get_bucket(self.bucket_name) - def list(self): + def list(self, directory=0): files = [] - for k in self.bucket.list(self.path + 'log-'): + prefix = "log-%08d-" % (directory,) + for k in self.bucket.list(self.path + prefix): files.append((k.key, k.size)) return files @@ -208,7 +212,8 @@ class LogDirectory: self.backend = backend self.dir_num = dir self.seq_num = 0 - for logname in backend.list(): + for logname in backend.list(dir): + print "Old log file:", logname loc = backend.name_to_loc(logname[0]) if loc is not None and loc[0] == dir: self.seq_num = max(self.seq_num, loc[1] + 1) @@ -241,7 +246,7 @@ class UtilizationTracker: def 
__init__(self, backend): self.segments = {} - for (segment, size) in backend.list(): + for (segment, size) in backend.list(0) + backend.list(1): self.segments[segment] = [size, 0] def add_item(self, item): @@ -327,8 +332,8 @@ def parse_log(data, location=None): if item is not None: yield item offset += size -def load_checkpoint_record(backend): - for (log, size) in reversed(backend.list()): +def load_checkpoint_record(backend, directory=0): + for (log, size) in reversed(backend.list(directory)): for item in reversed(list(parse_log(backend.read(log), log))): print item if item.type == ITEM_TYPE.CHECKPOINT: @@ -343,6 +348,7 @@ class InodeMap: This will also build up information about segment utilization.""" + self.version_vector = {} self.checkpoint_record = checkpoint_record util = UtilizationTracker(backend) @@ -350,9 +356,25 @@ class InodeMap: inodes = {} self.obsolete_segments = set() + data = checkpoint_record.data + if not data.startswith(CHECKPOINT_MAGIC): + raise ValueError, "Invalid checkpoint record!" + data = data[len(CHECKPOINT_MAGIC):] + (vvlen,) = struct.unpack_from("