#include "bluesky-private.h"
+/* Magic number at the start of the checkpoint record, to check for version
+ * mismatches. */
+#define CHECKPOINT_MAGIC 0x7ad7dafb42a498b4ULL
+
/* Inode maps. There is both an in-memory representation as well as the
* serialized form in the cloud.
*
log->type = LOGTYPE_CHECKPOINT;
log->inum = 0;
+ /* The checkpoint record starts with a magic number, followed by the
+ * version vector which lists the latest sequence number of all other logs
+ * (currently, only the cleaner) which have been seen. */
+ uint64_t magic = GUINT64_TO_LE(CHECKPOINT_MAGIC);
+ g_string_append_len(buf, (const char *)&magic, sizeof(magic));
+ uint32_t versions;
+ versions = GUINT32_TO_LE(fs->log_state->latest_cleaner_seq_seen >= 0);
+ g_string_append_len(buf, (const char *)&versions, sizeof(versions));
+ if (fs->log_state->latest_cleaner_seq_seen >= 0) {
+ versions = GUINT32_TO_LE(BLUESKY_CLOUD_DIR_CLEANER);
+ g_string_append_len(buf, (const char *)&versions, sizeof(versions));
+ versions = GUINT32_TO_LE(fs->log_state->latest_cleaner_seq_seen);
+ g_string_append_len(buf, (const char *)&versions, sizeof(versions));
+ }
+
GSequenceIter *i = g_sequence_get_begin_iter(fs->inode_map);
while (!g_sequence_iter_is_end(i)) {
InodeMapRange *range = (InodeMapRange *)g_sequence_get(i);
g_mutex_lock(imap->lock);
bluesky_cloudlog_fetch(imap);
g_assert(imap->data != NULL);
- g_assert(imap->data->len == 16 * imap->links->len);
+ g_assert(imap->data->len >= 12);
+ uint64_t magic;
+ uint32_t vector_data;
+ memcpy((char *)&magic, imap->data->data, sizeof(magic));
+ g_assert(GUINT64_FROM_LE(magic) == CHECKPOINT_MAGIC);
+ memcpy((char *)&vector_data, imap->data->data + 8, sizeof(vector_data));
+ g_assert(GUINT32_FROM_LE(vector_data) <= 2);
+
+ int vector_size = GUINT32_FROM_LE(vector_data);
+ g_assert(imap->data->len == 16 * imap->links->len + 12 + 8 * vector_size);
+
+ /* Walk the version vector: vector_size little-endian (directory, sequence)
+ * pairs of 32-bit values, stored immediately after the 8-byte magic number
+ * and the 4-byte entry count. */
+ for (int i = 0; i < vector_size; i++) {
+ memcpy((char *)&vector_data, imap->data->data + 12 + 8*i,
+ sizeof(vector_data));
+ /* Match on the same directory constant the serializer writes, rather
+ * than a bare literal, so reader and writer cannot drift apart. */
+ if (GUINT32_FROM_LE(vector_data) == BLUESKY_CLOUD_DIR_CLEANER) {
+ memcpy((char *)&vector_data, imap->data->data + 16 + 8*i,
+ sizeof(vector_data));
+ fs->log_state->latest_cleaner_seq_seen
+ = GUINT32_FROM_LE(vector_data);
+ g_print("Deserializing checkpoint: last cleaner sequence is %d\n",
+ GUINT32_FROM_LE(vector_data));
+ }
+ }
+
//uint64_t *inum_range = (uint64_t *)imap->data->data;
for (int i = 0; i < imap->links->len; i++) {
//int64_t start = GUINT64_FROM_LE(*inum_range++);
* whether a checkpoint was found and loaded or not. */
gboolean bluesky_checkpoint_load(BlueSkyFS *fs)
{
- char *last_segment = bluesky_store_lookup_last(fs->store, "log-");
+ g_print("Claiming cloud log directory: %d\n",
+ fs->log_state->location.directory);
+ char *prefix = g_strdup_printf("log-%08d",
+ fs->log_state->location.directory);
+ char *last_segment = bluesky_store_lookup_last(fs->store, prefix);
+ g_free(prefix);
if (last_segment == NULL)
return FALSE;
HEADER_MAGIC2 = 'AgI=' # Encrypted data
HEADER_SIZE = struct.calcsize(HEADER_FORMAT)
+CHECKPOINT_MAGIC = struct.pack('<Q', 0x7ad7dafb42a498b4)
+
class ITEM_TYPE:
DATA = '1'
INODE = '2'
def __init__(self, path):
self.path = path
- def list(self):
+ def list(self, directory=0):
"""Return a listing of all log segments and their sizes."""
- files = [f for f in os.listdir(self.path) if f.startswith('log-')]
+ prefix = "log-%08d-" % (directory,)
+ files = [f for f in os.listdir(self.path) if f.startswith(prefix)]
files.sort()
return [(f, os.stat(os.path.join(self.path, f)).st_size)
fp = open(os.path.join(self.path, filename), 'rb')
if offset > 0:
fp.seek(offset)
- if legnth is None:
+ if length is None:
return fp.read()
else:
return fp.read(length)
self.conn = boto.connect_s3(is_secure=False)
self.bucket = self.conn.get_bucket(self.bucket_name)
- def list(self):
+ def list(self, directory=0):
files = []
- for k in self.bucket.list(self.path + 'log-'):
+ prefix = "log-%08d-" % (directory,)
+ for k in self.bucket.list(self.path + prefix):
files.append((k.key, k.size))
return files
self.backend = backend
self.dir_num = dir
self.seq_num = 0
- for logname in backend.list():
+ for logname in backend.list(dir):
+ print "Old log file:", logname
loc = backend.name_to_loc(logname[0])
if loc is not None and loc[0] == dir:
self.seq_num = max(self.seq_num, loc[1] + 1)
def __init__(self, backend):
self.segments = {}
- for (segment, size) in backend.list():
+ for (segment, size) in backend.list(0) + backend.list(1):
self.segments[segment] = [size, 0]
def add_item(self, item):
if item is not None: yield item
offset += size
-def load_checkpoint_record(backend):
- for (log, size) in reversed(backend.list()):
+def load_checkpoint_record(backend, directory=0):
+ for (log, size) in reversed(backend.list(directory)):
for item in reversed(list(parse_log(backend.read(log), log))):
print item
if item.type == ITEM_TYPE.CHECKPOINT:
This will also build up information about segment utilization."""
+ self.version_vector = {}
self.checkpoint_record = checkpoint_record
util = UtilizationTracker(backend)
inodes = {}
self.obsolete_segments = set()
+ data = checkpoint_record.data
+ if not data.startswith(CHECKPOINT_MAGIC):
+ raise ValueError, "Invalid checkpoint record!"
+ data = data[len(CHECKPOINT_MAGIC):]
+ (vvlen,) = struct.unpack_from("<I", data, 0)
+ self.vvsize = 4 + 8*vvlen
+ for i in range(vvlen):
+ (v1, v2) = struct.unpack_from("<II", data, 4 + 8*i)
+ self.version_vector[v1] = v2
+ print self.version_vector
+ self.version_vector[checkpoint_record.location[0]] \
+ = checkpoint_record.location[1]
+ print self.version_vector
+
+ data = data[self.vvsize:]
+
print "Inode map:"
- for i in range(len(checkpoint_record.data) // 16):
- (start, end) = struct.unpack_from("<QQ", checkpoint_record.data, 16*i)
+ for i in range(len(data) // 16):
+ (start, end) = struct.unpack_from("<QQ", data, 16*i)
imap = load_item(backend, checkpoint_record.links[i][1])
util.add_item(imap)
print "[%d, %d]: %s" % (start, end, imap)
for (s, u) in sorted(util.segments.items()):
print "%s: %s %s" % (s, u, float(u[1]) / u[0])
if u[1] == 0:
- print "Deleting..."
- backend.delete(s)
+ # print "Deleting..."
+ # backend.delete(s)
+ pass
self.inodes = inodes
self.util = util
new_checkpoint.id = LogItem.random_id()
new_checkpoint.inum = 0
new_checkpoint.type = ITEM_TYPE.CHECKPOINT
- new_checkpoint.data = ""
+ new_checkpoint.data = CHECKPOINT_MAGIC
new_checkpoint.links = []
- for i in range(len(self.checkpoint_record.data) // 16):
- (start, end) = struct.unpack_from("<QQ", self.checkpoint_record.data, 16*i)
+ new_checkpoint.data += struct.pack('<I', len(self.version_vector))
+ for d in sorted(self.version_vector):
+ new_checkpoint.data += struct.pack('<II', d, self.version_vector[d])
+
+ # self.vvsize counts only the version vector (4-byte count + 8 bytes per
+ # entry) and was measured after CHECKPOINT_MAGIC had been stripped, so skip
+ # the magic as well to land on the first 16-byte inode range.
+ data = self.checkpoint_record.data[len(CHECKPOINT_MAGIC) + self.vvsize:]
+ for i in range(len(data) // 16):
+ (start, end) = struct.unpack_from("<QQ", data, 16*i)
- new_checkpoint.data += self.checkpoint_record.data[16*i : 16*i + 16]
+ new_checkpoint.data += data[16*i : 16*i + 16]
# Case 1: No inodes in this range of the old inode map have
# changed. Simply emit a new pointer to the same inode map block.
rewrite_inode(backend, inode_map, i, log, i in dirty_inode_data)
if __name__ == '__main__':
- backend = S3Backend("mvrable-bluesky", cachedir=".")
+ #backend = S3Backend("mvrable-bluesky", cachedir=".")
+ backend = FileBackend(".")
chkpt = load_checkpoint_record(backend)
print backend.list()
imap = InodeMap()
imap.build(backend, chkpt)
print chkpt
- log_dir = LogDirectory(backend, 0)
+ log_dir = LogDirectory(backend, 1)
run_cleaner(backend, imap, log_dir)
+ print "Version vector:", imap.version_vector
imap.write(backend, log_dir)
log_dir.close_all()