X-Git-Url: http://git.vrable.net/?a=blobdiff_plain;f=cleaner%2Fcleaner;h=c4b1222cbea619460f733d2dc78642345bb63237;hb=bf1b396c85d03f9c16eee6cd84c71c82503c3ec4;hp=1e564e83c45e5c79c6f009716901589af475bfb2;hpb=524d3c03d2f3076a349768c177a0e523a7fb333e;p=bluesky.git

diff --git a/cleaner/cleaner b/cleaner/cleaner
index 1e564e8..c4b1222 100755
--- a/cleaner/cleaner
+++ b/cleaner/cleaner
@@ -8,14 +8,15 @@
 # Copyright (C) 2010 The Regents of the University of California
 # Written by Michael Vrable
 
-import base64, os, re, struct, sys
+import base64, os, re, struct, sys, time
 import boto
 from boto.s3.key import Key
 
 # The BlueSky 'struct cloudlog_header' data type.
 HEADER_FORMAT = '<4s48sb16sQIII'
 HEADER_CRYPTBYTES = 48
-HEADER_MAGIC = 'AgI-'
+HEADER_MAGIC1 = 'AgI-'          # Unencrypted data
+HEADER_MAGIC2 = 'AgI='          # Encrypted data
 HEADER_SIZE = struct.calcsize(HEADER_FORMAT)
 
 class ITEM_TYPE:
@@ -42,9 +43,14 @@ class FileBackend:
         return [(f, os.stat(os.path.join(self.path, f)).st_size)
                 for f in files]
 
-    def read(self, filename):
+    def read(self, filename, offset=0, length=None):
         fp = open(os.path.join(self.path, filename), 'rb')
-        return fp.read()
+        if offset > 0:
+            fp.seek(offset)
+        if length is None:
+            return fp.read()
+        else:
+            return fp.read(length)
 
     def write(self, filename, data):
         fp = open(os.path.join(self.path, filename), 'wb')
@@ -61,15 +67,31 @@ class FileBackend:
         m = re.match(r"^log-(\d+)-(\d+)$", name)
         if m: return (int(m.group(1)), int(m.group(2)))
 
+def retry_wrap(method):
+    def wrapped(self, *args, **kwargs):
+        for retries in range(3):
+            try:
+                return method(self, *args, **kwargs)
+            except:
+                print >>sys.stderr, "S3 operation failed, retrying..."
+                self.connect()
+                time.sleep(1.0)
+        return method(self, *args, **kwargs)
+    return wrapped
+
 class S3Backend:
     """An interface to BlueSky where the log segments are stored in Amazon S3."""
 
     def __init__(self, bucket, path='', cachedir="."):
-        self.conn = boto.connect_s3(is_secure=False)
-        self.bucket = self.conn.get_bucket(bucket)
+        self.bucket_name = bucket
         self.path = path
         self.cachedir = cachedir
         self.cache = {}
+        self.connect()
+
+    def connect(self):
+        self.conn = boto.connect_s3(is_secure=False)
+        self.bucket = self.conn.get_bucket(self.bucket_name)
 
     def list(self):
         files = []
@@ -77,10 +99,16 @@ class S3Backend:
             files.append((k.key, k.size))
         return files
 
-    def read(self, filename):
+    @retry_wrap
+    def read(self, filename, offset=0, length=None):
         if filename in self.cache:
             fp = open(os.path.join(self.cachedir, filename), 'rb')
-            return fp.read()
+            if offset > 0:
+                fp.seek(offset)
+            if length is None:
+                return fp.read()
+            else:
+                return fp.read(length)
         else:
             k = Key(self.bucket)
             k.key = self.path + filename
@@ -89,8 +117,13 @@ class S3Backend:
             fp.write(data)
             fp.close()
             self.cache[filename] = True
+            if offset > 0:
+                data = data[offset:]
+            if length is not None:
+                data = data[0:length]
             return data
 
+    @retry_wrap
     def write(self, filename, data):
         k = Key(self.bucket)
         k.key = self.path + filename
@@ -98,6 +131,7 @@ class S3Backend:
         if filename in self.cache:
             del self.cache[filename]
 
+    @retry_wrap
     def delete(self, filename):
         k = Key(self.bucket)
         k.key = self.path + filename
@@ -117,9 +151,10 @@ class LogItem:
 
     def __init__(self):
         self.cryptkeys = '\0' * HEADER_CRYPTBYTES
+        self.encrypted = False
 
     def __str__(self):
-        return "<Item %s: %s size=%d id=%s>" % (self.type, self.location, self.size, base64.b16encode(self.id).lower()[0:8])
+        return "<Item%s %s: %s size=%d id=%s>" % (self.encrypted and '$' or '', self.type, self.location, self.size, base64.b16encode(self.id).lower()[0:8])
 
     @staticmethod
     def random_id():
@@ -135,8 +170,12 @@ class LogItem:
         link_ids = ''.join(link_ids)
         link_locs = ''.join(link_locs)
 
+        if self.encrypted:
+            magic = HEADER_MAGIC2
+        else:
+            magic = HEADER_MAGIC1
         header = struct.pack(HEADER_FORMAT,
-                             HEADER_MAGIC, self.cryptkeys,
+                             magic, self.cryptkeys,
                              ord(self.type), self.id, self.inum,
                              len(self.data), len(link_ids), len(link_locs))
         return header + self.data + link_ids + link_locs
@@ -218,7 +257,7 @@ def parse_item(data):
     header = struct.unpack_from(HEADER_FORMAT, data, 0)
     size = HEADER_SIZE + sum(header[5:8])
 
-    if header[0] != HEADER_MAGIC:
+    if header[0] not in (HEADER_MAGIC1, HEADER_MAGIC2):
         print "Bad header magic!"
         return
 
@@ -227,6 +266,7 @@ def parse_item(data):
         return
 
     item = LogItem()
+    if header[0] == HEADER_MAGIC2: item.encrypted = True
     item.cryptkeys = header[1]
     item.id = header[3]
     item.inum = header[4]
@@ -256,7 +296,7 @@ def load_item(backend, location):
 
     The elements of the tuple are (directory, sequence, offset, size)."""
     filename = backend.loc_to_name((location[0], location[1]))
-    data = backend.read(filename)[location[2] : location[2] + location[3]]
+    data = backend.read(filename, location[2], location[3])
     item = parse_item(data)
     item.location = location
     return item
@@ -275,7 +315,7 @@ def parse_log(data, location=None):
     while len(data) - offset >= HEADER_SIZE:
         header = struct.unpack_from(HEADER_FORMAT, data, offset)
         size = HEADER_SIZE + sum(header[5:8])
-        if header[0] != HEADER_MAGIC:
+        if header[0] not in (HEADER_MAGIC1, HEADER_MAGIC2):
             print "Bad header magic!"
             break
         if size + offset > len(data):
@@ -401,7 +441,7 @@ def rewrite_inode(backend, inode_map, inum, log, copy_data=True):
         log.write(inode, 1)
     inode_map.mark_updated(inum)
 
-def run_cleaner(backend, inode_map, log):
+def run_cleaner(backend, inode_map, log, repack_inodes=False):
     # Determine which segments are poorly utilized and should be cleaned.  We
     # need better heuristics here.
     for (s, u) in sorted(inode_map.util.segments.items()):
@@ -416,7 +456,10 @@ def run_cleaner(backend, inode_map, log):
     # Given that list of segments to clean, scan through those segments to find
     # data which is still live and mark relevant inodes as needing to be
     # rewritten.
-    dirty_inodes = set()
+    if repack_inodes:
+        dirty_inodes = set(inode_map.inodes)
+    else:
+        dirty_inodes = set()
     dirty_inode_data = set()
     for s in inode_map.obsolete_segments:
         filename = backend.loc_to_name(s)
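
A note on the retry logic added above: retry_wrap makes up to three attempts, reconnecting to S3 and sleeping between failures, then issues one final attempt whose exception propagates to the caller. A minimal standalone sketch of how the decorator behaves (not part of the patch; FlakyBackend and its failure counter are hypothetical stand-ins for S3Backend):

    import sys, time

    def retry_wrap(method):          # same pattern as in the patch above
        def wrapped(self, *args, **kwargs):
            for retries in range(3):
                try:
                    return method(self, *args, **kwargs)
                except:
                    print >>sys.stderr, "S3 operation failed, retrying..."
                    self.connect()
                    time.sleep(1.0)
            return method(self, *args, **kwargs)
        return wrapped

    class FlakyBackend:
        """Hypothetical backend: read() fails twice, then succeeds."""
        def __init__(self):
            self.failures = 2
        def connect(self):
            pass    # a real backend would re-open its S3 connection here
        @retry_wrap
        def read(self, filename):
            if self.failures > 0:
                self.failures -= 1
                raise IOError("transient S3 error")
            return "data for " + filename

    print FlakyBackend().read("log-000-000")    # -> data for log-000-000

One consequence of this design is that only whole methods are retried: a read that failed partway through is simply re-run from the top on the next attempt, which is safe here because the cache flag is only set after a complete download.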
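The split of HEADER_MAGIC into HEADER_MAGIC1/HEADER_MAGIC2 lets the cleaner distinguish encrypted from unencrypted log items by the first four bytes alone. A small round-trip sketch (not from the patch; the type byte 'd' is an arbitrary placeholder, not necessarily a real ITEM_TYPE value):

    import struct

    HEADER_FORMAT = '<4s48sb16sQIII'
    HEADER_CRYPTBYTES = 48
    HEADER_MAGIC1 = 'AgI-'   # unencrypted
    HEADER_MAGIC2 = 'AgI='   # encrypted

    data = 'x' * 16          # fake item payload, with no links
    header = struct.pack(HEADER_FORMAT, HEADER_MAGIC2,
                         '\0' * HEADER_CRYPTBYTES, ord('d'),
                         '\0' * 16, 1, len(data), 0, 0)

    magic = struct.unpack_from(HEADER_FORMAT, header + data, 0)[0]
    assert magic in (HEADER_MAGIC1, HEADER_MAGIC2)
    print "encrypted:", magic == HEADER_MAGIC2   # -> encrypted: True

This mirrors the checks in parse_item and parse_log: an unrecognized magic is rejected outright, while HEADER_MAGIC2 sets item.encrypted so that serialization can write the matching magic back out.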