Updates to the Python cleaner prototype.
author	Michael Vrable <mvrable@cs.ucsd.edu>
	Thu, 9 Sep 2010 04:03:31 +0000 (21:03 -0700)
committer	Michael Vrable <mvrable@cs.ucsd.edu>
	Thu, 9 Sep 2010 04:03:31 +0000 (21:03 -0700)
The cleaner can now read in the old inode map, rewrite inode data, and
write out an updated inode map and checkpoint record.

cleaner/cleaner

index 7ebcc2b..e249ed2 100755
@@ -8,7 +8,7 @@
 # Copyright (C) 2010  The Regents of the University of California
 # Written by Michael Vrable <mvrable@cs.ucsd.edu>
 
-import base64, os, struct, sys
+import base64, os, re, struct, sys
 import boto
 
 # The BlueSky 'struct cloudlog_header' data type.
@@ -44,12 +44,85 @@ class FileBackend:
         fp = open(os.path.join(self.path, filename), 'rb')
         return fp.read()
 
+    def write(self, filename, data):
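+        """Write the given data to a log file in the backend directory."""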
+        fp = open(os.path.join(self.path, filename), 'wb')
+        fp.write(data)
+        fp.close()
+
 class LogItem:
     """In-memory representation of a single item stored in a log file."""
 
     def __str__(self):
         return "<Item ty=%s location=%s size=%d id=%s...>" % (self.type, self.location, self.size, base64.b16encode(self.id).lower()[0:8])
 
+    @staticmethod
+    def random_id():
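+        # Generate a random 16-byte identifier for a new log item.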
+        return open('/dev/urandom', 'rb').read(16)
+
+    def serialize(self):
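+        """Pack this item into its on-disk form: header, data, link ids, link locations."""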
+        link_ids = []
+        link_locs = []
+        for (i, l) in self.links:
+            link_ids.append(i)
+            if i != '\0' * 16:
+                link_locs.append(struct.pack('<IIII', *l))
+        link_ids = ''.join(link_ids)
+        link_locs = ''.join(link_locs)
+
+        header = struct.pack(HEADER_FORMAT,
+                             HEADER_MAGIC, ord(self.type), self.id, self.inum,
+                             len(self.data), len(link_ids), len(link_locs))
+        return header + self.data + link_ids + link_locs
+
+class LogSegment:
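+    """A single log file accumulated in memory until it is written out."""
+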
+    def __init__(self, backend, location):
+        self.backend = backend
+        self.location = location
+        self.data = []
+
+    def __len__(self):
+        return sum(len(s) for s in self.data)
+
+    def write(self, item):
+        data = item.serialize()
+        offset = len(self)
+        self.data.append(data)
+        item.location = self.location + (offset, len(data))
+
+    def close(self):
+        data = ''.join(self.data)
+        filename = "log-%08d-%08d" % (self.location)
+        print "Would write %d bytes of data to %s" % (len(data), filename)
+        self.backend.write(filename, data)
+
+class LogDirectory:
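+    """The set of open log segments, one per segment group."""
+
+    # Close and write out a segment once it grows to roughly this size (4 MB).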
+    TARGET_SIZE = 4 << 20
+
+    def __init__(self, backend, dir, seq):
+        self.backend = backend
+        self.dir_num = dir
+        self.seq_num = seq
+        self.groups = {}
+
+    def open_segment(self):
+        seg = LogSegment(self.backend, (self.dir_num, self.seq_num))
+        self.seq_num += 1
+        return seg
+
+    def write(self, item, segment_group=0):
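+        # Items are partitioned into segment groups so that, for example,
+        # data blocks, inodes, and inode map blocks go to separate segments.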
+        if segment_group not in self.groups:
+            self.groups[segment_group] = self.open_segment()
+        seg = self.groups[segment_group]
+        seg.write(item)
+        if len(seg) >= LogDirectory.TARGET_SIZE:
+            seg.close()
+            del self.groups[segment_group]
+
+    def close_all(self):
+        for k in list(self.groups.keys()):
+            self.groups[k].close()
+            del self.groups[k]
+
 class UtilizationTracker:
     """A simple object that tracks what fraction of each segment is used.
 
@@ -83,6 +156,7 @@ def parse_item(data):
 
     item = LogItem()
     item.id = header[2]
+    item.inum = header[3]
     item.location = None
     item.type = chr(header[1])
     item.size = size
@@ -114,14 +188,15 @@ def load_item(backend, location):
     item.location = location
     return item
 
-def parse_log(data, logname=None):
+def parse_log(data, location=None):
     """Parse contents of a log file, yielding a sequence of log items."""
 
-    location = None
-    if logname is not None:
-        m = re.match(r"^log-(\d+)-(\d+)$", logname)
+    if isinstance(location, str):
+        m = re.match(r"^log-(\d+)-(\d+)$", location)
         if m:
             location = (int(m.group(1)), int(m.group(2)))
+        else:
+            location = None
 
     offset = 0
     while len(data) - offset >= HEADER_SIZE:
@@ -141,40 +216,123 @@ def parse_log(data, logname=None):
 
 def load_checkpoint_record(backend):
     for (log, size) in reversed(backend.list()):
-        for item in reversed(list(parse_log(backend.read(log)))):
+        for item in reversed(list(parse_log(backend.read(log), log))):
             if item.type == ITEM_TYPE.CHECKPOINT:
                 return item
 
-def build_inode_map(backend, checkpoint_record):
-    """Reconstruct the inode map, starting from the checkpoint record given.
-
-    This will also build up information about segment utilization."""
-
-    util = UtilizationTracker(backend)
-    util.add_item(checkpoint_record)
-
-    print "Inode map:"
-    for i in range(len(checkpoint_record.data) // 16):
-        (start, end) = struct.unpack_from("<QQ", checkpoint_record.data, 16*i)
-        imap = load_item(backend, checkpoint_record.links[i][1])
-        util.add_item(imap)
-        print "[%d, %d]: %s" % (start, end, imap)
-        for j in range(len(imap.data) // 8):
-            (inum,) = struct.unpack_from("<Q", imap.data, 8*j)
-            inode = load_item(backend, imap.links[j][1])
-            data_segments = set()
-            util.add_item(inode)
-            for i in inode.links:
-                util.add_item(i[1])
-                data_segments.add(i[1][0:2])
-            print "  %d: %s (%d data segments)" % (inum, inode, len(data_segments))
-
-    print
-    print "Segment utilizations:"
-    for (s, u) in sorted(util.segments.items()):
-        print "%s: %s" % (s, u)
+class InodeMap:
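+    """Tracks the mapping from inode numbers to log locations, plus segment usage."""
+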
+    def __init__(self):
+        pass
+
+    def build(self, backend, checkpoint_record):
+        """Reconstruct the inode map from the checkpoint record given.
+
+        This will also build up information about segment utilization."""
+
+        self.checkpoint_record = checkpoint_record
+
+        util = UtilizationTracker(backend)
+        util.add_item(checkpoint_record)
+        inodes = {}
+
+        print "Inode map:"
+        for i in range(len(checkpoint_record.data) // 16):
+            (start, end) = struct.unpack_from("<QQ", checkpoint_record.data, 16*i)
+            imap = load_item(backend, checkpoint_record.links[i][1])
+            util.add_item(imap)
+            print "[%d, %d]: %s" % (start, end, imap)
+            for j in range(len(imap.data) // 8):
+                (inum,) = struct.unpack_from("<Q", imap.data, 8*j)
+                inode = load_item(backend, imap.links[j][1])
+                inodes[inum] = inode
+                data_segments = set()
+                util.add_item(inode)
+                for l in inode.links:
+                    util.add_item(l[1])
+                    data_segments.add(l[1][0:2])
+                print "  %d: %s (%d data segments)" % (inum, inode, len(data_segments))
+
+        print
+        print "Segment utilizations:"
+        for (s, u) in sorted(util.segments.items()):
+            print "%s: %s %s" % (s, u, float(u[1]) / u[0])
+
+        self.inodes = inodes
+        self.util = util
+        self.updated_inodes = set()
+
+    def mark_updated(self, inum):
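+        """Record that an inode has moved and its map entry must be rewritten."""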
+        self.updated_inodes.add(inum)
+
+    def write(self, backend, log):
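+        """Write out new copies of any changed inode map blocks, then a new checkpoint."""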
+        updated_inodes = sorted(self.updated_inodes, reverse=True)
+
+        new_checkpoint = LogItem()
+        new_checkpoint.id = LogItem.random_id()
+        new_checkpoint.inum = 0
+        new_checkpoint.type = ITEM_TYPE.CHECKPOINT
+        new_checkpoint.data = ""
+        new_checkpoint.links = []
+
+        for i in range(len(self.checkpoint_record.data) // 16):
+            (start, end) = struct.unpack_from("<QQ", self.checkpoint_record.data, 16*i)
+
+            new_checkpoint.data += self.checkpoint_record.data[16*i : 16*i + 16]
+
+            # Case 1: No inodes in this range of the old inode map have
+            # changed.  Simply emit a new pointer to the same inode map block.
+            # TODO: Add the ability to rewrite the inode map block if we choose
+            # to do so for cleaning, even if no inodes have changed.
+            if len(updated_inodes) == 0 or updated_inodes[-1] > end:
+                new_checkpoint.links.append(self.checkpoint_record.links[i])
+                continue
+
+            # Case 2: Some inodes have been updated.  Create a new inode map
+            # block, write it out, and point the new checkpoint at it.
+            inodes = [k for k in self.inodes if k >= start and k <= end]
+            inodes.sort()
+
+            block = LogItem()
+            block.id = LogItem.random_id()
+            block.inum = 0
+            block.type = ITEM_TYPE.INODE_MAP
+            block.links = []
+            block.data = ""
+            for j in inodes:
+                block.data += struct.pack("<Q", j)
+                block.links.append((self.inodes[j].id, self.inodes[j].location))
+            log.write(block, 2)
+
+            new_checkpoint.links.append((block.id, block.location))
+
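+            # Drop the updated inodes that are covered by this inode map block.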
+            while len(updated_inodes) > 0 and updated_inodes[-1] <= end:
+                updated_inodes.pop()
+
+        log.write(new_checkpoint, 2)
+        self.checkpoint_record = new_checkpoint
+
+def rewrite_inode(backend, inode_map, inum, log):
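+    """Copy an inode and all of its data blocks to the current log segments."""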
+    inode = inode_map.inodes[inum]
+    blocks = []
+    for l in inode.links:
+        data = load_item(backend, l[1])
+        blocks.append(data)
+        log.write(data, 0)
+    inode.links = [(b.id, b.location) for b in blocks]
+    log.write(inode, 1)
+    inode_map.mark_updated(inum)
 
 if __name__ == '__main__':
     backend = FileBackend(".")
     chkpt = load_checkpoint_record(backend)
-    build_inode_map(backend, chkpt)
+    imap = InodeMap()
+    imap.build(backend, chkpt)
+    print chkpt
+
+    print repr(chkpt.serialize())
+
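+    # As a test, write cleaned items to a separate log directory (number 1):
+    # rewrite a single inode (number 147, chosen arbitrarily) and emit an
+    # updated inode map and checkpoint.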
+    log_dir = LogDirectory(backend, 1, 0)
+    rewrite_inode(backend, imap, 147, log_dir)
+    imap.write(backend, log_dir)
+    log_dir.close_all()