Begin work on a segment cleaner prototype.
authorMichael Vrable <mvrable@cs.ucsd.edu>
Wed, 8 Sep 2010 20:24:42 +0000 (13:24 -0700)
committerMichael Vrable <mvrable@cs.ucsd.edu>
Wed, 8 Sep 2010 20:24:42 +0000 (13:24 -0700)
Right now this can rebuild an inode map and compute segment utilization,
though it isn't very efficient.

cleaner/cleaner [new file with mode: 0755]

diff --git a/cleaner/cleaner b/cleaner/cleaner
new file mode 100755 (executable)
index 0000000..7ebcc2b
--- /dev/null
@@ -0,0 +1,180 @@
+#!/usr/bin/env python
+#
+# A simple cleaner for the BlueSky cloud file system.  At the moment this is an
+# offline cleaner--the file system must not be in use at the time that the
+# cleaning is performed.  Later, it will be extended to be an online/concurrent
+# cleaner, where cleaning is performed even while the file system is mounted.
+#
+# Copyright (C) 2010  The Regents of the University of California
+# Written by Michael Vrable <mvrable@cs.ucsd.edu>
+
+import base64, os, struct, sys
+import boto
+
+# The BlueSky 'struct cloudlog_header' data type.
+HEADER_FORMAT = '<4sb16sQIII'
+HEADER_MAGIC = 'AgI-'
+HEADER_SIZE = struct.calcsize(HEADER_FORMAT)
+
+class ITEM_TYPE:
+    DATA = '1'
+    INODE = '2'
+    INODE_MAP = '3'
+    CHECKPOINT = '4'
+
+class FileBackend:
+    """An interface to BlueSky where the log segments are on local disk.
+
+    This is mainly intended for testing purposes, as the real cleaner would
+    operate where data is being stored in S3."""
+
+    def __init__(self, path):
+        self.path = path
+
+    def list(self):
+        """Return a listing of all log segments and their sizes."""
+
+        files = [f for f in os.listdir(self.path) if f.startswith('log-')]
+        files.sort()
+
+        return [(f, os.stat(os.path.join(self.path, f)).st_size)
+                for f in files]
+
+    def read(self, filename):
+        fp = open(os.path.join(self.path, filename), 'rb')
+        return fp.read()
+
+class LogItem:
+    """In-memory representation of a single item stored in a log file."""
+
+    def __str__(self):
+        return "<Item ty=%s location=%s size=%d id=%s...>" % (self.type, self.location, self.size, base64.b16encode(self.id).lower()[0:8])
+
+class UtilizationTracker:
+    """A simple object that tracks what fraction of each segment is used.
+
+    This data can be used to guide segment cleaning decisions."""
+
+    def __init__(self, backend):
+        self.segments = {}
+        for (segment, size) in backend.list():
+            self.segments[segment] = [size, 0]
+
+    def add_item(self, item):
+        if isinstance(item, LogItem):
+            item = item.location
+        if item is None: return
+        (dir, seq, offset, size) = item
+        filename = "log-%08d-%08d" % (dir, seq)
+        self.segments[filename][1] += size
+
+def parse_item(data):
+    if len(data) < HEADER_SIZE: return
+    header = struct.unpack_from(HEADER_FORMAT, data, 0)
+    size = HEADER_SIZE + sum(header[4:7])
+
+    if header[0] != HEADER_MAGIC:
+        print "Bad header magic!"
+        return
+
+    if len(data) != size:
+        print "Item size does not match!"
+        return
+
+    item = LogItem()
+    item.id = header[2]
+    item.location = None
+    item.type = chr(header[1])
+    item.size = size
+    item.data = data[HEADER_SIZE : HEADER_SIZE + header[4]]
+    links = []
+    link_ids = data[HEADER_SIZE + header[4]
+                    : HEADER_SIZE + header[4] + header[5]]
+    link_locs = data[HEADER_SIZE + header[4] + header[5]
+                     : HEADER_SIZE + sum(header[4:7])]
+    for i in range(len(link_ids) // 16):
+        id = link_ids[16*i : 16*i + 16]
+        if id == '\0' * 16:
+            loc = None
+        else:
+            loc = struct.unpack('<IIII', link_locs[0:16])
+            link_locs = link_locs[16:]
+        links.append((id, loc))
+    item.links = links
+    return item
+
+def load_item(backend, location):
+    """Load the cloud item pointed at by the 4-tuple 'location'.
+
+    The elements of the tuple are (directory, sequence, offset, size)."""
+
+    filename = "log-%08d-%08d" % (location[0], location[1])
+    data = backend.read(filename)[location[2] : location[2] + location[3]]
+    item = parse_item(data)
+    item.location = location
+    return item
+
+def parse_log(data, logname=None):
+    """Parse contents of a log file, yielding a sequence of log items."""
+
+    location = None
+    if logname is not None:
+        m = re.match(r"^log-(\d+)-(\d+)$", logname)
+        if m:
+            location = (int(m.group(1)), int(m.group(2)))
+
+    offset = 0
+    while len(data) - offset >= HEADER_SIZE:
+        header = struct.unpack_from(HEADER_FORMAT, data, offset)
+        size = HEADER_SIZE + sum(header[4:7])
+        if header[0] != HEADER_MAGIC:
+            print "Bad header magic!"
+            break
+        if size + offset > len(data):
+            print "Short data record at end of log: %s < %s" % (len(data) - offset, size)
+            break
+        item = parse_item(data[offset : offset + size])
+        if location is not None:
+            item.location = location + (offset, size)
+        if item is not None: yield item
+        offset += size
+
+def load_checkpoint_record(backend):
+    for (log, size) in reversed(backend.list()):
+        for item in reversed(list(parse_log(backend.read(log)))):
+            if item.type == ITEM_TYPE.CHECKPOINT:
+                return item
+
+def build_inode_map(backend, checkpoint_record):
+    """Reconstruct the inode map, starting from the checkpoint record given.
+
+    This will also build up information about segment utilization."""
+
+    util = UtilizationTracker(backend)
+    util.add_item(checkpoint_record)
+
+    print "Inode map:"
+    for i in range(len(checkpoint_record.data) // 16):
+        (start, end) = struct.unpack_from("<QQ", checkpoint_record.data, 16*i)
+        imap = load_item(backend, checkpoint_record.links[i][1])
+        util.add_item(imap)
+        print "[%d, %d]: %s" % (start, end, imap)
+        for j in range(len(imap.data) // 8):
+            (inum,) = struct.unpack_from("<Q", imap.data, 8*j)
+            inode = load_item(backend, imap.links[j][1])
+            data_segments = set()
+            util.add_item(inode)
+            for i in inode.links:
+                util.add_item(i[1])
+                data_segments.add(i[1][0:2])
+            print "  %d: %s (%d data segments)" % (inum, inode, len(data_segments))
+
+    print
+    print "Segment utilizations:"
+    for (s, u) in sorted(util.segments.items()):
+        print "%s: %s" % (s, u)
+
+if __name__ == '__main__':
+    backend = FileBackend(".")
+    chkpt = load_checkpoint_record(backend)
+    build_inode_map(backend, chkpt)