#!/usr/bin/env python
#
# A simple cleaner for the BlueSky cloud file system.  At the moment this is an
# offline cleaner--the file system must not be in use at the time that the
# cleaning is performed.  Later, it will be extended to be an online/concurrent
# cleaner, where cleaning is performed even while the file system is mounted.
#
# Copyright (C) 2010 The Regents of the University of California
# Written by Michael Vrable

import base64
import os
import struct
import sys

try:
    import boto
except ImportError:
    # boto is only required once cleaning against S3 is implemented; the
    # local-disk FileBackend works without it.
    boto = None

# The BlueSky 'struct cloudlog_header' data type:
#   4s magic, b item type, 16s item id, Q inode number,
#   I data length, I link-id bytes, I link-location bytes.
HEADER_FORMAT = '<4sb16sQIII'
HEADER_MAGIC = b'AgI-'
HEADER_SIZE = struct.calcsize(HEADER_FORMAT)

class ITEM_TYPE:
    """Single-character type codes stored in a cloudlog_header."""
    DATA = '1'
    INODE = '2'
    INODE_MAP = '3'
    CHECKPOINT = '4'

class FileBackend:
    """An interface to BlueSky where the log segments are on local disk.

    This is mainly intended for testing purposes, as the real cleaner would
    operate where data is being stored in S3."""

    def __init__(self, path):
        self.path = path

    def list(self):
        """Return a listing of all log segments and their sizes.

        The result is a list of (filename, size-in-bytes) tuples, sorted by
        filename so that segments come out in (directory, sequence) order."""
        files = [f for f in os.listdir(self.path) if f.startswith('log-')]
        files.sort()
        return [(f, os.stat(os.path.join(self.path, f)).st_size)
                for f in files]

    def read(self, filename):
        """Return the full contents of the named log segment as bytes."""
        # Use a context manager so the file handle is closed deterministically
        # (the original relied on garbage collection to close it).
        with open(os.path.join(self.path, filename), 'rb') as fp:
            return fp.read()

class LogItem:
    """In-memory representation of a single item stored in a log file."""

    def __str__(self):
        # Show only the first 8 hex digits of the 16-byte id for brevity.
        return "<Item ty=%s location=%s size=%d id=%s>" \
            % (self.type, self.location, self.size,
               base64.b16encode(self.id).lower()[0:8].decode('ascii'))

class UtilizationTracker:
    """A simple object that tracks what fraction of each segment is used.

    This data can be used to guide segment cleaning decisions."""

    def __init__(self, backend):
        # Maps segment filename -> [total size, bytes known to be live].
        self.segments = {}
        for (segment, size) in backend.list():
            self.segments[segment] = [size, 0]

    def add_item(self, item):
        """Credit the item's size to the segment it lives in.

        Accepts either a LogItem (whose .location is used) or a raw
        (directory, sequence, offset, size) tuple; items with no known
        location are ignored."""
        if isinstance(item, LogItem):
            item = item.location
        if item is None:
            return
        (dirnum, seq, offset, size) = item
        filename = "log-%08d-%08d" % (dirnum, seq)
        self.segments[filename][1] += size

def parse_item(data):
    """Parse one serialized log item from a bytestring.

    Returns a LogItem with id/type/size/data/links filled in, or None if the
    data is malformed (too short, bad magic, or a size mismatch).  The item's
    location is left as None; parse_log fills it in when known."""
    if len(data) < HEADER_SIZE:
        return None
    header = struct.unpack_from(HEADER_FORMAT, data, 0)
    # Total item size: header plus data, link ids, and link locations.
    size = HEADER_SIZE + sum(header[4:7])

    if header[0] != HEADER_MAGIC:
        print("Bad header magic!")
        return None

    if len(data) != size:
        print("Item size does not match!")
        return None

    item = LogItem()
    item.id = header[2]
    item.location = None
    item.type = chr(header[1])
    item.size = size
    item.data = data[HEADER_SIZE : HEADER_SIZE + header[4]]
    links = []
    link_ids = data[HEADER_SIZE + header[4]
                    : HEADER_SIZE + header[4] + header[5]]
    link_locs = data[HEADER_SIZE + header[4] + header[5]
                     : HEADER_SIZE + sum(header[4:7])]
    for i in range(len(link_ids) // 16):
        link_id = link_ids[16*i : 16*i + 16]
        if link_id == b'\0' * 16:
            # An all-zero identifier is a null link: it carries no location
            # and consumes nothing from link_locs.
            loc = None
        else:
            loc = struct.unpack('<IIII', link_locs[0:16])
            link_locs = link_locs[16:]
        links.append((link_id, loc))
    item.links = links
    return item

def parse_log(data, location=None):
    """Parse contents of a log file, yielding a sequence of log items.

    If location is given it should be a (directory, sequence) tuple; each
    yielded item then has .location set to (directory, sequence, offset,
    size).  Parsing stops at the first bad header or short trailing record."""
    offset = 0
    while len(data) - offset >= HEADER_SIZE:
        header = struct.unpack_from(HEADER_FORMAT, data, offset)
        size = HEADER_SIZE + sum(header[4:7])
        if header[0] != HEADER_MAGIC:
            print("Bad header magic!")
            break
        if size + offset > len(data):
            print("Short data record at end of log: %s < %s"
                  % (len(data) - offset, size))
            break
        item = parse_item(data[offset : offset + size])
        # Check for None before touching the item: the original assigned
        # item.location first, which would raise AttributeError on a
        # malformed record.
        if item is not None:
            if location is not None:
                item.location = location + (offset, size)
            yield item
        offset += size

def load_checkpoint_record(backend):
    """Return the most recent checkpoint item, or None if there is none.

    Scans segments newest-first, and within each segment takes the last
    checkpoint written."""
    for (log, size) in reversed(backend.list()):
        for item in reversed(list(parse_log(backend.read(log)))):
            if item.type == ITEM_TYPE.CHECKPOINT:
                return item
    return None

def build_inode_map(backend, checkpoint_record):
    """Reconstruct the inode map, starting from the checkpoint record given."""
    # NOTE(review): the remainder of this function lies beyond this chunk of
    # the source (the chunk is truncated mid-statement) and is left as-is.
This will also build up information about segment utilization.""" util = UtilizationTracker(backend) util.add_item(checkpoint_record) print "Inode map:" for i in range(len(checkpoint_record.data) // 16): (start, end) = struct.unpack_from("