--- /dev/null
+#!/usr/bin/env python
+#
+# A simple cleaner for the BlueSky cloud file system. At the moment this is an
+# offline cleaner--the file system must not be in use at the time that the
+# cleaning is performed. Later, it will be extended to be an online/concurrent
+# cleaner, where cleaning is performed even while the file system is mounted.
+#
+# Copyright (C) 2010 The Regents of the University of California
+# Written by Michael Vrable <mvrable@cs.ucsd.edu>
+
+import base64, os, re, struct, sys
+import boto
+
+# The BlueSky 'struct cloudlog_header' data type.
+HEADER_FORMAT = '<4sb16sQIII'
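+# Fields: magic (4s), type code (b), 16-byte item id (16s), a 64-bit field
+# (Q, unused by this cleaner), and the byte lengths of the item's data,
+# link-id, and link-location sections (III).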
+HEADER_MAGIC = 'AgI-'
+HEADER_SIZE = struct.calcsize(HEADER_FORMAT)
+
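+# Type codes for log items; parse_item() stores the header's type byte as one
+# of these ASCII characters.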
+class ITEM_TYPE:
+    DATA = '1'
+    INODE = '2'
+    INODE_MAP = '3'
+    CHECKPOINT = '4'
+
+class FileBackend:
+ """An interface to BlueSky where the log segments are on local disk.
+
+ This is mainly intended for testing purposes, as the real cleaner would
+ operate where data is being stored in S3."""
+
+ def __init__(self, path):
+ self.path = path
+
+ def list(self):
+ """Return a listing of all log segments and their sizes."""
+
+ files = [f for f in os.listdir(self.path) if f.startswith('log-')]
+ files.sort()
+
+ return [(f, os.stat(os.path.join(self.path, f)).st_size)
+ for f in files]
+
+ def read(self, filename):
+ fp = open(os.path.join(self.path, filename), 'rb')
+ return fp.read()
+
+class LogItem:
+ """In-memory representation of a single item stored in a log file."""
+
+ def __str__(self):
+ return "<Item ty=%s location=%s size=%d id=%s...>" % (self.type, self.location, self.size, base64.b16encode(self.id).lower()[0:8])
+
+class UtilizationTracker:
+ """A simple object that tracks what fraction of each segment is used.
+
+ This data can be used to guide segment cleaning decisions."""
+
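+    # Maps segment filename -> [total segment size, bytes found in use].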
+    def __init__(self, backend):
+        self.segments = {}
+        for (segment, size) in backend.list():
+            self.segments[segment] = [size, 0]
+
+    def add_item(self, item):
+        if isinstance(item, LogItem):
+            item = item.location
+        if item is None: return
+        (directory, seq, offset, size) = item
+        filename = "log-%08d-%08d" % (directory, seq)
+        self.segments[filename][1] += size
+
+def parse_item(data):
+    if len(data) < HEADER_SIZE: return
+    header = struct.unpack_from(HEADER_FORMAT, data, 0)
+    size = HEADER_SIZE + sum(header[4:7])
+
+    if header[0] != HEADER_MAGIC:
+        print "Bad header magic!"
+        return
+
+    if len(data) != size:
+        print "Item size does not match!"
+        return
+
+    item = LogItem()
+    item.id = header[2]
+    item.location = None
+    item.type = chr(header[1])
+    item.size = size
+    item.data = data[HEADER_SIZE : HEADER_SIZE + header[4]]
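+    # After the data section come the link ids (16 bytes each), then a
+    # 16-byte (directory, sequence, offset, size) location for each
+    # non-null id.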
+    links = []
+    link_ids = data[HEADER_SIZE + header[4]
+                    : HEADER_SIZE + header[4] + header[5]]
+    link_locs = data[HEADER_SIZE + header[4] + header[5]
+                     : HEADER_SIZE + sum(header[4:7])]
+    for i in range(len(link_ids) // 16):
+        id = link_ids[16*i : 16*i + 16]
+        if id == '\0' * 16:
+            loc = None
+        else:
+            loc = struct.unpack('<IIII', link_locs[0:16])
+            link_locs = link_locs[16:]
+        links.append((id, loc))
+    item.links = links
+    return item
+
+def load_item(backend, location):
+ """Load the cloud item pointed at by the 4-tuple 'location'.
+
+ The elements of the tuple are (directory, sequence, offset, size)."""
+
+ filename = "log-%08d-%08d" % (location[0], location[1])
+ data = backend.read(filename)[location[2] : location[2] + location[3]]
+ item = parse_item(data)
+ item.location = location
+ return item
+
+def parse_log(data, logname=None):
+ """Parse contents of a log file, yielding a sequence of log items."""
+
+ location = None
+ if logname is not None:
+ m = re.match(r"^log-(\d+)-(\d+)$", logname)
+ if m:
+ location = (int(m.group(1)), int(m.group(2)))
+
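+    # Walk the log sequentially, validating each header and carving out the
+    # full item (header plus data and link sections).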
+    offset = 0
+    while len(data) - offset >= HEADER_SIZE:
+        header = struct.unpack_from(HEADER_FORMAT, data, offset)
+        size = HEADER_SIZE + sum(header[4:7])
+        if header[0] != HEADER_MAGIC:
+            print "Bad header magic!"
+            break
+        if size + offset > len(data):
+            print "Short data record at end of log: %s < %s" \
+                % (len(data) - offset, size)
+            break
+        item = parse_item(data[offset : offset + size])
+        if item is not None:
+            if location is not None:
+                item.location = location + (offset, size)
+            yield item
+        offset += size
+
+def load_checkpoint_record(backend):
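+    # Scan the log segments newest-first; the most recently written
+    # checkpoint record is the current one. The log name is passed along so
+    # the returned item carries its location.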
+    for (log, size) in reversed(backend.list()):
+        for item in reversed(list(parse_log(backend.read(log), log))):
+            if item.type == ITEM_TYPE.CHECKPOINT:
+                return item
+
+def build_inode_map(backend, checkpoint_record):
+ """Reconstruct the inode map, starting from the checkpoint record given.
+
+ This will also build up information about segment utilization."""
+
+ util = UtilizationTracker(backend)
+ util.add_item(checkpoint_record)
+
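+    # Each 16-byte entry in the checkpoint data is a (start, end) inode
+    # number range; the link at the same index points to the inode map
+    # section covering that range.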
+ print "Inode map:"
+ for i in range(len(checkpoint_record.data) // 16):
+ (start, end) = struct.unpack_from("<QQ", checkpoint_record.data, 16*i)
+ imap = load_item(backend, checkpoint_record.links[i][1])
+ util.add_item(imap)
+ print "[%d, %d]: %s" % (start, end, imap)
+        for j in range(len(imap.data) // 8):
+            (inum,) = struct.unpack_from("<Q", imap.data, 8*j)
+            inode = load_item(backend, imap.links[j][1])
+            data_segments = set()
+            util.add_item(inode)
+            for link in inode.links:
+                util.add_item(link[1])
+                data_segments.add(link[1][0:2])
+            print "    %d: %s (%d data segments)" % (inum, inode, len(data_segments))
+
+    print
+    print "Segment utilizations:"
+    for (s, u) in sorted(util.segments.items()):
+        print "%s: %s" % (s, u)
+
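+# The cleaner currently operates on the log-* segment files in the current
+# directory; if no checkpoint record is found, there is nothing to do.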
+if __name__ == '__main__':
+    backend = FileBackend(".")
+    chkpt = load_checkpoint_record(backend)
+    if chkpt is not None:
+        build_inode_map(backend, chkpt)