Have cleaner wait on proxy when needed
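
The cleaner now exits early, to be re-run later, when the proxy has not
yet merged the cleaner's most recent log segment: the version vector
stored in the checkpoint record is compared against the sequence number
of the last segment in the cleaner's log directory (directory 1).  Along
the way this adds retries for failed S3 operations, per-run GET/PUT
statistics, and a benchmark log written to cleaner.log.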
diff --git a/cleaner/cleaner b/cleaner/cleaner
index 009ebe8..f2dcfcb 100755
@@ -8,23 +8,48 @@
 # Copyright (C) 2010  The Regents of the University of California
 # Written by Michael Vrable <mvrable@cs.ucsd.edu>
 
-import base64, os, re, struct, sys
+import base64, os, re, socket, struct, sys, time
 import boto
 from boto.s3.key import Key
 
 # The BlueSky 'struct cloudlog_header' data type.
 HEADER_FORMAT = '<4s48sb16sQIII'
 HEADER_CRYPTBYTES = 48
-HEADER_MAGIC = 'AgI-'
+HEADER_MAGIC1 = 'AgI-'          # Unencrypted data
+HEADER_MAGIC2 = 'AgI='          # Encrypted data
 HEADER_SIZE = struct.calcsize(HEADER_FORMAT)
 
+CHECKPOINT_MAGIC = struct.pack('<Q', 0x7ad7dafb42a498b4)
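+# A checkpoint record starts with this magic value, followed by the
+# version vector and then the inode map ranges (see InodeMap.build).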
+
+# Log file to write benchmark data to
+benchlog = None
+def benchlog_write(msg, *args):
+    m = msg % args
+    print "LOG:", m
+    if benchlog is not None:
+        benchlog.write(m)
+        benchlog.write("\n")
+
 class ITEM_TYPE:
     DATA = '1'
     INODE = '2'
     INODE_MAP = '3'
     CHECKPOINT = '4'
 
-class FileBackend:
+class Backend:
+    """Base class for BlueSky storage backends."""
+
+    def loc_to_name(self, location):
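+        # 'location' is a (directory_number, sequence_number) tuple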
+        return "log-%08d-%08d" % (location)
+
+    def name_to_loc(self, name):
+        m = re.match(r"^log-(\d+)-(\d+)$", name)
+        if m: return (int(m.group(1)), int(m.group(2)))
+
+    def dump_stats(self):
+        pass
+
+class FileBackend(Backend):
     """An interface to BlueSky where the log segments are on local disk.
 
     This is mainly intended for testing purposes, as the real cleaner would
@@ -33,10 +58,11 @@ class FileBackend:
     def __init__(self, path):
         self.path = path
 
-    def list(self):
+    def list(self, directory=0):
         """Return a listing of all log segments and their sizes."""
 
-        files = [f for f in os.listdir(self.path) if f.startswith('log-')]
+        prefix = "log-%08d-" % (directory,)
+        files = [f for f in os.listdir(self.path) if f.startswith(prefix)]
         files.sort()
 
         return [(f, os.stat(os.path.join(self.path, f)).st_size)
@@ -46,7 +72,7 @@ class FileBackend:
         fp = open(os.path.join(self.path, filename), 'rb')
         if offset > 0:
             fp.seek(offset)
-        if legnth is None:
+        if length is None:
             return fp.read()
         else:
             return fp.read(length)
@@ -59,29 +85,46 @@ class FileBackend:
     def delete(self, filename):
         os.unlink(os.path.join(self.path, filename))
 
-    def loc_to_name(self, location):
-        return "log-%08d-%08d" % (location)
-
-    def name_to_loc(self, name):
-        m = re.match(r"^log-(\d+)-(\d+)$", name)
-        if m: return (int(m.group(1)), int(m.group(2)))
-
-class S3Backend:
+def retry_wrap(method):
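+    """Decorator: retry a failed S3 operation, reconnecting between tries.
+
+    Three guarded attempts are made; a final, unguarded attempt lets the
+    exception propagate to the caller if the operation still fails."""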
+    def wrapped(self, *args, **kwargs):
+        for retries in range(3):
+            try:
+                return method(self, *args, **kwargs)
+            except Exception:
+                print >>sys.stderr, "S3 operation failed, retrying..."
+                print >>sys.stderr, "  %s %s %s" % (method, args, kwargs)
+                self.connect()
+                time.sleep(1.0)
+        return method(self, *args, **kwargs)
+    return wrapped
+
+class S3Backend(Backend):
     """An interface to BlueSky where the log segments are on in Amazon S3."""
 
     def __init__(self, bucket, path='', cachedir="."):
-        self.conn = boto.connect_s3(is_secure=False)
-        self.bucket = self.conn.get_bucket(bucket)
+        self.bucket_name = bucket
         self.path = path
         self.cachedir = cachedir
         self.cache = {}
+        for f in os.listdir(cachedir):
+            self.cache[f] = True
+        #print "Initial cache contents:", list(self.cache.keys())
+        self.connect()
+        self.stats_get = [0, 0]
+        self.stats_put = [0, 0]
+
+    def connect(self):
+        self.conn = boto.connect_s3(is_secure=False)
+        self.bucket = self.conn.get_bucket(self.bucket_name)
 
-    def list(self):
+    def list(self, directory=0):
         files = []
-        for k in self.bucket.list(self.path + 'log-'):
+        prefix = "log-%08d-" % (directory,)
+        for k in self.bucket.list(self.path + prefix):
             files.append((k.key, k.size))
         return files
 
+    @retry_wrap
     def read(self, filename, offset=0, length=None):
         if filename in self.cache:
             fp = open(os.path.join(self.cachedir, filename), 'rb')
@@ -99,19 +142,25 @@ class S3Backend:
             fp.write(data)
             fp.close()
             self.cache[filename] = True
+            self.stats_get[0] += 1
+            self.stats_get[1] += len(data)
             if offset > 0:
                 data = data[offset:]
             if length is not None:
                 data = data[0:length]
             return data
 
+    @retry_wrap
     def write(self, filename, data):
         k = Key(self.bucket)
         k.key = self.path + filename
         k.set_contents_from_string(data)
+        self.stats_put[0] += 1
+        self.stats_put[1] += len(data)
         if filename in self.cache:
             del self.cache[filename]
 
+    @retry_wrap
     def delete(self, filename):
         k = Key(self.bucket)
         k.key = self.path + filename
@@ -119,21 +168,80 @@ class S3Backend:
         if filename in self.cache:
             del self.cache[filename]
 
-    def loc_to_name(self, location):
-        return "log-%08d-%08d" % (location)
+    def dump_stats(self):
+        print "S3 statistics:"
+        print "GET: %d ops / %d bytes" % tuple(self.stats_get)
+        print "PUT: %d ops / %d bytes" % tuple(self.stats_put)
+        benchlog_write("s3_get: %d", self.stats_get[1])
+        benchlog_write("s3_put: %d", self.stats_put[1])
 
-    def name_to_loc(self, name):
-        m = re.match(r"^log-(\d+)-(\d+)$", name)
-        if m: return (int(m.group(1)), int(m.group(2)))
+class SimpleBackend(Backend):
+    """An interface to the simple BlueSky test network server."""
+
+    def __init__(self, server=('localhost', 12345), cachedir="."):
+        self.server_address = server
+        self.cachedir = cachedir
+        self.cache = {}
+
+    def _get_socket(self):
+        return socket.create_connection(self.server_address).makefile()
+
+    def list(self, directory=0):
+        # Only the GET and PUT requests used by read() and write() below are
+        # implemented against the test server, so segment listing is not
+        # available from this backend.
+        raise NotImplementedError("SimpleBackend does not implement list()")
+
+    def read(self, filename, offset=0, length=None):
+        if filename in self.cache:
+            fp = open(os.path.join(self.cachedir, filename), 'rb')
+            if offset > 0:
+                fp.seek(offset)
+            if length is None:
+                return fp.read()
+            else:
+                return fp.read(length)
+        else:
+            f = self._get_socket()
+            f.write("GET %s %d %d\n" % (filename, 0, 0))
+            f.flush()
+            datalen = int(f.readline())
+            if datalen < 0:
+                raise RuntimeError
+            data = f.read(datalen)
+            fp = open(os.path.join(self.cachedir, filename), 'wb')
+            fp.write(data)
+            fp.close()
+            self.cache[filename] = True
+            if offset > 0:
+                data = data[offset:]
+            if length is not None:
+                data = data[0:length]
+            return data
+
+    def write(self, filename, data):
+        f = self._get_socket()
+        f.write("PUT %s %d %d\n" % (filename, len(data)))
+        f.write(data)
+        f.flush()
+        result = int(f.readline())
+        if result < 0:
+            raise RuntimeError
+        if filename in self.cache:
+            del self.cache[filename]
+
+    def delete(self, filename):
+        pass
 
 class LogItem:
     """In-memory representation of a single item stored in a log file."""
 
     def __init__(self):
         self.cryptkeys = '\0' * HEADER_CRYPTBYTES
+        self.encrypted = False
 
     def __str__(self):
-        return "<Item ty=%s location=%s size=%d id=%s...>" % (self.type, self.location, self.size, base64.b16encode(self.id).lower()[0:8])
+        return "<Item%s ty=%s location=%s size=%d id=%s...>" % (self.encrypted and '$' or '', self.type, self.location, self.size, base64.b16encode(self.id).lower()[0:8])
 
     @staticmethod
     def random_id():
@@ -149,8 +257,12 @@ class LogItem:
         link_ids = ''.join(link_ids)
         link_locs = ''.join(link_locs)
 
+        if self.encrypted:
+            magic = HEADER_MAGIC2
+        else:
+            magic = HEADER_MAGIC1
         header = struct.pack(HEADER_FORMAT,
-                             HEADER_MAGIC, self.cryptkeys,
+                             magic, self.cryptkeys,
                              ord(self.type), self.id, self.inum,
                              len(self.data), len(link_ids), len(link_locs))
         return header + self.data + link_ids + link_locs
@@ -183,7 +295,8 @@ class LogDirectory:
         self.backend = backend
         self.dir_num = dir
         self.seq_num = 0
-        for logname in backend.list():
+        for logname in backend.list(dir):
+            #print "Old log file:", logname
             loc = backend.name_to_loc(logname[0])
             if loc is not None and loc[0] == dir:
                 self.seq_num = max(self.seq_num, loc[1] + 1)
@@ -216,7 +329,7 @@ class UtilizationTracker:
 
     def __init__(self, backend):
         self.segments = {}
-        for (segment, size) in backend.list():
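+        # Segments now live in two log directories: 0 (written by the proxy)
+        # and 1 (written by the cleaner); track utilization across both.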
+        for (segment, size) in backend.list(0) + backend.list(1):
             self.segments[segment] = [size, 0]
 
     def add_item(self, item):
@@ -232,7 +345,7 @@ def parse_item(data):
     header = struct.unpack_from(HEADER_FORMAT, data, 0)
     size = HEADER_SIZE + sum(header[5:8])
 
-    if header[0] != HEADER_MAGIC:
+    if header[0] not in (HEADER_MAGIC1, HEADER_MAGIC2):
         print "Bad header magic!"
         return
 
@@ -241,6 +354,7 @@ def parse_item(data):
         return
 
     item = LogItem()
+    if header[0] == HEADER_MAGIC2: item.encrypted = True
     item.cryptkeys = header[1]
     item.id = header[3]
     item.inum = header[4]
@@ -289,7 +403,7 @@ def parse_log(data, location=None):
     while len(data) - offset >= HEADER_SIZE:
         header = struct.unpack_from(HEADER_FORMAT, data, offset)
         size = HEADER_SIZE + sum(header[5:8])
-        if header[0] != HEADER_MAGIC:
+        if header[0] not in (HEADER_MAGIC1, HEADER_MAGIC2):
             print "Bad header magic!"
             break
         if size + offset > len(data):
@@ -301,8 +415,8 @@ def parse_log(data, location=None):
         if item is not None: yield item
         offset += size
 
-def load_checkpoint_record(backend):
-    for (log, size) in reversed(backend.list()):
+def load_checkpoint_record(backend, directory=0):
+    for (log, size) in reversed(backend.list(directory)):
         for item in reversed(list(parse_log(backend.read(log), log))):
             print item
             if item.type == ITEM_TYPE.CHECKPOINT:
@@ -317,6 +431,7 @@ class InodeMap:
 
         This will also build up information about segment utilization."""
 
+        self.version_vector = {}
         self.checkpoint_record = checkpoint_record
 
         util = UtilizationTracker(backend)
@@ -324,12 +439,28 @@ class InodeMap:
         inodes = {}
         self.obsolete_segments = set()
 
-        print "Inode map:"
-        for i in range(len(checkpoint_record.data) // 16):
-            (start, end) = struct.unpack_from("<QQ", checkpoint_record.data, 16*i)
+        data = checkpoint_record.data
+        if not data.startswith(CHECKPOINT_MAGIC):
+            raise ValueError("Invalid checkpoint record!")
+        data = data[len(CHECKPOINT_MAGIC):]
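+        # Version vector layout: a 32-bit entry count, then one
+        # (directory, sequence_number) pair of 32-bit integers per entry.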
+        (vvlen,) = struct.unpack_from("<I", data, 0)
+        self.vvsize = 4 + 8*vvlen
+        for i in range(vvlen):
+            (v1, v2) = struct.unpack_from("<II", data, 4 + 8*i)
+            self.version_vector[v1] = v2
+        print "Version vector (from checkpoint):", self.version_vector
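+        # Fold in the location of the checkpoint record itself so the entry
+        # for its own directory reflects the newest segment seen.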
+        self.version_vector[checkpoint_record.location[0]] \
+            = checkpoint_record.location[1]
+        print "Version vector (with checkpoint location):", self.version_vector
+
+        data = data[self.vvsize:]
+
+        #print "Inode map:"
+        for i in range(len(data) // 16):
+            (start, end) = struct.unpack_from("<QQ", data, 16*i)
             imap = load_item(backend, checkpoint_record.links[i][1])
             util.add_item(imap)
-            print "[%d, %d]: %s" % (start, end, imap)
+            #print "[%d, %d]: %s" % (start, end, imap)
             for j in range(len(imap.data) // 8):
                 (inum,) = struct.unpack_from("<Q", imap.data, 8*j)
                 inode = load_item(backend, imap.links[j][1])
@@ -338,21 +469,40 @@ class InodeMap:
                 util.add_item(inode)
                 for i in inode.links:
                     util.add_item(i[1])
-                    data_segments.add(i[1][0:2])
-                print "  %d: %s (%d data segments)" % (inum, inode, len(data_segments))
+                    if i[1] is not None:
+                        data_segments.add(i[1][0:2])
+                #print "  %d: %s (%d data segments)" % (inum, inode, len(data_segments))
 
-        print
+        #print
         print "Segment utilizations:"
+        total_data = [0, 0]
+        deletions = [0, 0]
         for (s, u) in sorted(util.segments.items()):
+            for i in range(2): total_data[i] += u[i]
             print "%s: %s %s" % (s, u, float(u[1]) / u[0])
             if u[1] == 0:
-                print "Deleting..."
-                backend.delete(s)
+                print "Would delete..."
+                try:
+                    (d, n) = backend.name_to_loc(s)
+                    if n < self.version_vector[d]:
+                        backend.delete(s)
+                        deletions[0] += 1
+                        deletions[1] += u[0]
+                    else:
+                        print "Not deleting log file newer than checkpoint!"
+                except (KeyError, TypeError):
+                    print "Error determining age of log segment, keeping"
 
         self.inodes = inodes
         self.util = util
         self.updated_inodes = set()
 
+        print "%d bytes total / %d bytes used" % tuple(total_data)
+        print "would delete %d segments (%d bytes)" % tuple(deletions)
+        benchlog_write("bytes_used: %d", total_data[1])
+        benchlog_write("bytes_wasted: %d", total_data[0] - total_data[1])
+        benchlog_write("bytes_freed: %d", deletions[1])
+
     def mark_updated(self, inum):
         self.updated_inodes.add(inum)
 
@@ -363,13 +513,18 @@ class InodeMap:
         new_checkpoint.id = LogItem.random_id()
         new_checkpoint.inum = 0
         new_checkpoint.type = ITEM_TYPE.CHECKPOINT
-        new_checkpoint.data = ""
+        new_checkpoint.data = CHECKPOINT_MAGIC
         new_checkpoint.links = []
 
-        for i in range(len(self.checkpoint_record.data) // 16):
-            (start, end) = struct.unpack_from("<QQ", self.checkpoint_record.data, 16*i)
+        new_checkpoint.data += struct.pack('<I', len(self.version_vector))
+        for d in sorted(self.version_vector):
+            new_checkpoint.data += struct.pack('<II', d, self.version_vector[d])
+
+        data = self.checkpoint_record.data[self.vvsize + len(CHECKPOINT_MAGIC):]
+        for i in range(len(data) // 16):
+            (start, end) = struct.unpack_from("<QQ", data, 16*i)
 
-            new_checkpoint.data += self.checkpoint_record.data[16*i : 16*i + 16]
+            new_checkpoint.data += data[16*i : 16*i + 16]
 
             # Case 1: No inodes in this range of the old inode map have
             # changed.  Simply emit a new pointer to the same inode map block.
@@ -406,12 +561,15 @@ class InodeMap:
 def rewrite_inode(backend, inode_map, inum, log, copy_data=True):
     inode = inode_map.inodes[inum]
     if copy_data:
-        blocks = []
+        newlinks = []
         for l in inode.links:
-            data = load_item(backend, l[1])
-            blocks.append(data)
-            log.write(data, 0)
-        inode.links = [(b.id, b.location) for b in blocks]
+            if l[1] is not None:
+                data = load_item(backend, l[1])
+                log.write(data, 0)
+                newlinks.append((data.id, data.location))
+            else:
+                newlinks.append(l)
+        inode.links = newlinks
     log.write(inode, 1)
     inode_map.mark_updated(inum)
 
@@ -419,7 +577,7 @@ def run_cleaner(backend, inode_map, log, repack_inodes=False):
     # Determine which segments are poorly utilized and should be cleaned.  We
     # need better heuristics here.
     for (s, u) in sorted(inode_map.util.segments.items()):
-        if (float(u[1]) / u[0] < 0.6 or u[1] < 32768) and u[1] > 0:
+        if (float(u[1]) / u[0] < 0.6) and u[1] > 0:
             print "Should clean segment", s
             loc = backend.name_to_loc(s)
             if loc is not None: inode_map.obsolete_segments.add(loc)
@@ -437,7 +595,7 @@ def run_cleaner(backend, inode_map, log, repack_inodes=False):
     dirty_inode_data = set()
     for s in inode_map.obsolete_segments:
         filename = backend.loc_to_name(s)
-        print "Scanning", filename, "for live data"
+        #print "Scanning", filename, "for live data"
         for item in parse_log(backend.read(filename), filename):
             if item.type in (ITEM_TYPE.DATA, ITEM_TYPE.INODE):
                 if item.inum != 0:
@@ -446,24 +604,40 @@ def run_cleaner(backend, inode_map, log, repack_inodes=False):
                         dirty_inodes.add(item.inum)
                     if item.inum not in dirty_inode_data:
                         for b in inode.links:
-                            if s == b[1][0:2]:
+                            if b[1] is not None and s == b[1][0:2]:
                                 dirty_inode_data.add(item.inum)
                                 break
 
-    print "Inodes to rewrite:", dirty_inodes
-    print "Inodes with data to rewrite:", dirty_inode_data
+    #print "Inodes to rewrite:", dirty_inodes
+    #print "Inodes with data to rewrite:", dirty_inode_data
     for i in sorted(dirty_inodes.union(dirty_inode_data)):
         rewrite_inode(backend, inode_map, i, log, i in dirty_inode_data)
 
 if __name__ == '__main__':
-    backend = S3Backend("mvrable-bluesky", cachedir=".")
+    benchlog = open('cleaner.log', 'a')
+    benchlog_write("*** START CLEANER RUN ***")
+    start_time = time.time()
+    backend = S3Backend("mvrable-bluesky-west", cachedir="/tmp/bluesky-cache")
+    #backend = FileBackend(".")
     chkpt = load_checkpoint_record(backend)
-    print backend.list()
+    #print backend.list()
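+    # The cleaner writes its output to log directory 1, keeping its segments
+    # separate from the proxy's (directory 0).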
+    log_dir = LogDirectory(backend, 1)
     imap = InodeMap()
     imap.build(backend, chkpt)
     print chkpt
 
-    log_dir = LogDirectory(backend, 0)
+    print "Version vector:", imap.version_vector
+    print "Last cleaner log file:", log_dir.seq_num - 1
+    if imap.version_vector.get(1, -1) != log_dir.seq_num - 1:
+        print "Proxy hasn't updated to latest cleaner segment yet!"
+        benchlog_write("waiting for proxy...")
+        sys.exit(0)
+
     run_cleaner(backend, imap, log_dir)
+    print "Version vector:", imap.version_vector
     imap.write(backend, log_dir)
     log_dir.close_all()
+    end_time = time.time()
+    backend.dump_stats()
+    benchlog_write("running_time: %s", end_time - start_time)
+    benchlog_write("")