From: Michael Vrable Date: Wed, 8 Sep 2010 20:24:42 +0000 (-0700) Subject: Begin work on a segment cleaner prototype. X-Git-Url: https://git.vrable.net/?a=commitdiff_plain;h=6955b27db8185d222adb07e57d207f7f421037e6;p=bluesky.git Begin work on a segment cleaner prototype. Right now this can rebuild an inode map and compute segment utilization, though it isn't very efficient. --- diff --git a/cleaner/cleaner b/cleaner/cleaner new file mode 100755 index 0000000..7ebcc2b --- /dev/null +++ b/cleaner/cleaner @@ -0,0 +1,180 @@ +#!/usr/bin/env python +# +# A simple cleaner for the BlueSky cloud file system. At the moment this is an +# offline cleaner--the file system must not be in use at the time that the +# cleaning is performed. Later, it will be extended to be an online/concurrent +# cleaner, where cleaning is performed even while the file system is mounted. +# +# Copyright (C) 2010 The Regents of the University of California +# Written by Michael Vrable + +import base64, os, struct, sys +import boto + +# The BlueSky 'struct cloudlog_header' data type. +HEADER_FORMAT = '<4sb16sQIII' +HEADER_MAGIC = 'AgI-' +HEADER_SIZE = struct.calcsize(HEADER_FORMAT) + +class ITEM_TYPE: + DATA = '1' + INODE = '2' + INODE_MAP = '3' + CHECKPOINT = '4' + +class FileBackend: + """An interface to BlueSky where the log segments are on local disk. + + This is mainly intended for testing purposes, as the real cleaner would + operate where data is being stored in S3.""" + + def __init__(self, path): + self.path = path + + def list(self): + """Return a listing of all log segments and their sizes.""" + + files = [f for f in os.listdir(self.path) if f.startswith('log-')] + files.sort() + + return [(f, os.stat(os.path.join(self.path, f)).st_size) + for f in files] + + def read(self, filename): + fp = open(os.path.join(self.path, filename), 'rb') + return fp.read() + +class LogItem: + """In-memory representation of a single item stored in a log file.""" + + def __str__(self): + return "" % (self.type, self.location, self.size, base64.b16encode(self.id).lower()[0:8]) + +class UtilizationTracker: + """A simple object that tracks what fraction of each segment is used. + + This data can be used to guide segment cleaning decisions.""" + + def __init__(self, backend): + self.segments = {} + for (segment, size) in backend.list(): + self.segments[segment] = [size, 0] + + def add_item(self, item): + if isinstance(item, LogItem): + item = item.location + if item is None: return + (dir, seq, offset, size) = item + filename = "log-%08d-%08d" % (dir, seq) + self.segments[filename][1] += size + +def parse_item(data): + if len(data) < HEADER_SIZE: return + header = struct.unpack_from(HEADER_FORMAT, data, 0) + size = HEADER_SIZE + sum(header[4:7]) + + if header[0] != HEADER_MAGIC: + print "Bad header magic!" + return + + if len(data) != size: + print "Item size does not match!" + return + + item = LogItem() + item.id = header[2] + item.location = None + item.type = chr(header[1]) + item.size = size + item.data = data[HEADER_SIZE : HEADER_SIZE + header[4]] + links = [] + link_ids = data[HEADER_SIZE + header[4] + : HEADER_SIZE + header[4] + header[5]] + link_locs = data[HEADER_SIZE + header[4] + header[5] + : HEADER_SIZE + sum(header[4:7])] + for i in range(len(link_ids) // 16): + id = link_ids[16*i : 16*i + 16] + if id == '\0' * 16: + loc = None + else: + loc = struct.unpack('= HEADER_SIZE: + header = struct.unpack_from(HEADER_FORMAT, data, offset) + size = HEADER_SIZE + sum(header[4:7]) + if header[0] != HEADER_MAGIC: + print "Bad header magic!" + break + if size + offset > len(data): + print "Short data record at end of log: %s < %s" % (len(data) - offset, size) + break + item = parse_item(data[offset : offset + size]) + if location is not None: + item.location = location + (offset, size) + if item is not None: yield item + offset += size + +def load_checkpoint_record(backend): + for (log, size) in reversed(backend.list()): + for item in reversed(list(parse_log(backend.read(log)))): + if item.type == ITEM_TYPE.CHECKPOINT: + return item + +def build_inode_map(backend, checkpoint_record): + """Reconstruct the inode map, starting from the checkpoint record given. + + This will also build up information about segment utilization.""" + + util = UtilizationTracker(backend) + util.add_item(checkpoint_record) + + print "Inode map:" + for i in range(len(checkpoint_record.data) // 16): + (start, end) = struct.unpack_from("