#!/usr/bin/env python
#
# A simple cleaner for the BlueSky cloud file system.  At the moment this is
# an offline cleaner--the file system must not be in use at the time that the
# cleaning is performed.  Later, it will be extended to be an online/concurrent
# cleaner, where cleaning is performed even while the file system is mounted.
#
# Copyright (C) 2010  The Regents of the University of California
# Written by Michael Vrable

import base64, os, re, struct, sys
import boto

# The BlueSky 'struct cloudlog_header' data type.
HEADER_FORMAT = '<4sb16sQIII'
HEADER_MAGIC = 'AgI-'
HEADER_SIZE = struct.calcsize(HEADER_FORMAT)

class ITEM_TYPE:
    DATA = '1'
    INODE = '2'
    INODE_MAP = '3'
    CHECKPOINT = '4'

class FileBackend:
    """An interface to BlueSky where the log segments are on local disk.

    This is mainly intended for testing purposes, as the real cleaner would
    operate where data is being stored in S3."""

    def __init__(self, path):
        self.path = path

    def list(self):
        """Return a listing of all log segments and their sizes."""

        files = [f for f in os.listdir(self.path) if f.startswith('log-')]
        files.sort()
        return [(f, os.stat(os.path.join(self.path, f)).st_size)
                for f in files]

    def read(self, filename):
        fp = open(os.path.join(self.path, filename), 'rb')
        return fp.read()

    def write(self, filename, data):
        fp = open(os.path.join(self.path, filename), 'wb')
        fp.write(data)
        fp.close()
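# The backend below is a sketch, not part of the original cleaner: it shows
# how the same list()/read()/write() interface could be served out of S3
# using the boto library imported above, which is what the docstring for
# FileBackend anticipates.  The class name and the bucket_name parameter are
# hypothetical; the boto calls themselves (connect_s3, get_bucket,
# bucket.list, get_key, new_key) are the classic boto API.
class S3Backend:
    """A sketch of a backend that keeps log segments in an S3 bucket."""

    def __init__(self, bucket_name):
        self.conn = boto.connect_s3()
        self.bucket = self.conn.get_bucket(bucket_name)

    def list(self):
        # Keys returned by a bucket listing carry their sizes, which maps
        # directly onto the (filename, size) tuples of FileBackend.list().
        return [(k.name, k.size) for k in self.bucket.list(prefix='log-')]

    def read(self, filename):
        return self.bucket.get_key(filename).get_contents_as_string()

    def write(self, filename, data):
        self.bucket.new_key(filename).set_contents_from_string(data)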
class LogItem:
    """In-memory representation of a single item stored in a log file."""

    def __str__(self):
        return "<Item ty=%s location=%s size=%d id=%s...>" \
            % (self.type, self.location, self.size,
               base64.b16encode(self.id).lower()[0:8])

    @staticmethod
    def random_id():
        return open('/dev/urandom').read(16)

    def serialize(self):
        link_ids = []
        link_locs = []
        for (i, l) in self.links:
            link_ids.append(i)
            if i != '\0' * 16:
                link_locs.append(struct.pack('<IIII', *l))
        link_ids = ''.join(link_ids)
        link_locs = ''.join(link_locs)

        header = struct.pack(HEADER_FORMAT,
                             HEADER_MAGIC, ord(self.type), self.id, self.inum,
                             len(self.data), len(link_ids), len(link_locs))
        return header + self.data + link_ids + link_locs

class LogSegment:
    """A single log segment being built up, buffered in memory."""

    def __init__(self, backend, location):
        self.backend = backend
        self.location = location
        self.data = []

    def __len__(self):
        return sum(len(s) for s in self.data)

    def write(self, item):
        data = item.serialize()
        offset = len(self)
        self.data.append(data)
        item.location = self.location + (offset, len(data))

    def close(self):
        data = ''.join(self.data)
        filename = "log-%08d-%08d" % self.location
        print "Writing %d bytes of data to %s" % (len(data), filename)
        self.backend.write(filename, data)

class LogDirectory:
    """Tracks the set of open log segments in one log directory."""

    TARGET_SIZE = 4 << 20

    def __init__(self, backend, dir, seq):
        self.backend = backend
        self.dir_num = dir
        self.seq_num = seq
        self.groups = {}

    def open_segment(self):
        seg = LogSegment(self.backend, (self.dir_num, self.seq_num))
        self.seq_num += 1
        return seg

    def write(self, item, segment_group=0):
        if segment_group not in self.groups:
            self.groups[segment_group] = self.open_segment()
        seg = self.groups[segment_group]
        seg.write(item)
        if len(seg) >= LogDirectory.TARGET_SIZE:
            seg.close()
            del self.groups[segment_group]

    def close_all(self):
        for k in list(self.groups.keys()):
            self.groups[k].close()
            del self.groups[k]

class UtilizationTracker:
    """A simple object that tracks what fraction of each segment is used.

    This data can be used to guide segment cleaning decisions."""

    def __init__(self, backend):
        self.segments = {}
        for (segment, size) in backend.list():
            self.segments[segment] = [size, 0]

    def add_item(self, item):
        if isinstance(item, LogItem):
            item = item.location
        if item is None:
            return
        (dir, seq, offset, size) = item
        filename = "log-%08d-%08d" % (dir, seq)
        self.segments[filename][1] += size

def parse_item(data):
    if len(data) < HEADER_SIZE:
        return
    header = struct.unpack_from(HEADER_FORMAT, data, 0)
    size = HEADER_SIZE + sum(header[4:7])

    if header[0] != HEADER_MAGIC:
        print "Bad header magic!"
        return

    if len(data) != size:
        print "Item size does not match!"
        return

    item = LogItem()
    item.id = header[2]
    item.inum = header[3]
    item.location = None
    item.type = chr(header[1])
    item.size = size
    item.data = data[HEADER_SIZE : HEADER_SIZE + header[4]]
    links = []
    link_ids = data[HEADER_SIZE + header[4]
                    : HEADER_SIZE + header[4] + header[5]]
    link_locs = data[HEADER_SIZE + header[4] + header[5]
                     : HEADER_SIZE + sum(header[4:7])]
    for i in range(len(link_ids) // 16):
        id = link_ids[16*i : 16*i + 16]
        if id == '\0' * 16:
            loc = None
        else:
            loc = struct.unpack('<IIII', link_locs[0:16])
            link_locs = link_locs[16:]
        links.append((id, loc))
    item.links = links
    return item

def load_item(backend, location):
    """Load the log item at the given location.

    The location is a tuple of the form (directory, sequence, offset, size).
    """

    filename = "log-%08d-%08d" % (location[0], location[1])
    data = backend.read(filename)[location[2] : location[2] + location[3]]
    item = parse_item(data)
    item.location = location
    return item

def parse_log(data, location=None):
    """Parse contents of a log file, yielding a sequence of log items.

    If location is the name of a log file, the (directory, sequence)
    location is parsed out of the name."""

    if isinstance(location, str):
        m = re.match(r"^log-(\d+)-(\d+)$", location)
        if m:
            location = (int(m.group(1)), int(m.group(2)))
        else:
            location = None

    offset = 0
    while len(data) - offset >= HEADER_SIZE:
        header = struct.unpack_from(HEADER_FORMAT, data, offset)
        size = HEADER_SIZE + sum(header[4:7])
        if header[0] != HEADER_MAGIC:
            print "Bad header magic!"
            break
        if size + offset > len(data):
            print "Short data record at end of log: %s < %s" \
                % (len(data) - offset, size)
            break
        item = parse_item(data[offset : offset + size])
        if item is not None:
            if location is not None:
                item.location = location + (offset, size)
            yield item
        offset += size

def load_checkpoint_record(backend):
    """Return the most recent checkpoint record found in the logs."""

    for (log, size) in reversed(backend.list()):
        for item in reversed(list(parse_log(backend.read(log), log))):
            if item.type == ITEM_TYPE.CHECKPOINT:
                return item

class InodeMap:
    def __init__(self):
        pass

    def build(self, backend, checkpoint_record):
        """Reconstruct the inode map from the checkpoint record given.

        This will also build up information about segment utilization."""

        self.checkpoint_record = checkpoint_record

        util = UtilizationTracker(backend)
        util.add_item(checkpoint_record)

        inodes = {}

        print "Inode map:"
        for i in range(len(checkpoint_record.data) // 16):
            (start, end) = struct.unpack_from("<QQ",
                                              checkpoint_record.data, 16*i)
            imap = load_item(backend, checkpoint_record.links[i][1])
            util.add_item(imap)
            print "  %d-%d: %s" % (start, end, imap)
            for j in range(len(imap.data) // 8):
                (inum,) = struct.unpack_from("<Q", imap.data, 8*j)
                inode = load_item(backend, imap.links[j][1])
                inodes[inum] = inode
                util.add_item(inode)
                for l in inode.links:
                    util.add_item(l[1])
                print "    %d: %s" % (inum, inode)

        print
        print "Segment utilizations:"
        for (s, u) in sorted(util.segments.items()):
            if u[1] > 0:
                print "%s: %s %s" % (s, u, float(u[1]) / u[0])

        self.inodes = inodes
        self.util = util
        self.updated_inodes = set()

    def mark_updated(self, inum):
        self.updated_inodes.add(inum)

    def write(self, backend, log):
        # Sorted in reverse so that updated_inodes[-1] is always the
        # smallest inode number not yet handled.
        updated_inodes = sorted(self.updated_inodes, reverse=True)

        new_checkpoint = LogItem()
        new_checkpoint.id = LogItem.random_id()
        new_checkpoint.inum = 0
        new_checkpoint.type = ITEM_TYPE.CHECKPOINT
        new_checkpoint.data = ""
        new_checkpoint.links = []

        for i in range(len(self.checkpoint_record.data) // 16):
            (start, end) = struct.unpack_from("<QQ",
                                              self.checkpoint_record.data,
                                              16*i)
            new_checkpoint.data += self.checkpoint_record.data[16*i : 16*i+16]

            # Case 1: No inodes in this range of the old inode map have
            # changed.  Simply emit a new pointer to the same inode map block.
            if len(updated_inodes) == 0 or updated_inodes[-1] > end:
                new_checkpoint.links.append(self.checkpoint_record.links[i])
                continue

            # Case 2: Some inodes have been updated.  Create a new inode map
            # block, write it out, and point the new checkpoint at it.
            inodes = [k for k in self.inodes if k >= start and k <= end]
            inodes.sort()

            block = LogItem()
            block.id = LogItem.random_id()
            block.inum = 0
            block.type = ITEM_TYPE.INODE_MAP
            block.links = []
            block.data = ""
            for j in inodes:
                block.data += struct.pack("<Q", j)
                block.links.append((self.inodes[j].id,
                                    self.inodes[j].location))
            log.write(block, 2)
            new_checkpoint.links.append((block.id, block.location))

            while len(updated_inodes) > 0 and updated_inodes[-1] <= end:
                updated_inodes.pop()

        log.write(new_checkpoint, 2)
        self.checkpoint_record = new_checkpoint

def rewrite_inode(backend, inode_map, inum, log):
    """Copy an inode and all of its data blocks to the output log."""

    inode = inode_map.inodes[inum]
    blocks = []
    for l in inode.links:
        data = load_item(backend, l[1])
        blocks.append(data)
        log.write(data, 0)
    inode.links = [(b.id, b.location) for b in blocks]
    log.write(inode, 1)
    inode_map.mark_updated(inum)

if __name__ == '__main__':
    backend = FileBackend(".")
    chkpt = load_checkpoint_record(backend)
    imap = InodeMap()
    imap.build(backend, chkpt)
    print chkpt
    print repr(chkpt.serialize())

    log_dir = LogDirectory(backend, 1, 0)
    rewrite_inode(backend, imap, 147, log_dir)
    imap.write(backend, log_dir)
    log_dir.close_all()