Dump some statistics when the cleaner runs
[bluesky.git] / cleaner / cleaner
index c4b1222..93feef4 100755 (executable)
@@ -19,13 +19,28 @@ HEADER_MAGIC1 = 'AgI-'          # Unencrypted data
 HEADER_MAGIC2 = 'AgI='          # Encrypted data
 HEADER_SIZE = struct.calcsize(HEADER_FORMAT)
 
 HEADER_MAGIC2 = 'AgI='          # Encrypted data
 HEADER_SIZE = struct.calcsize(HEADER_FORMAT)
 
+CHECKPOINT_MAGIC = struct.pack('<Q', 0x7ad7dafb42a498b4)
+
 class ITEM_TYPE:
     DATA = '1'
     INODE = '2'
     INODE_MAP = '3'
     CHECKPOINT = '4'
 
 class ITEM_TYPE:
     DATA = '1'
     INODE = '2'
     INODE_MAP = '3'
     CHECKPOINT = '4'
 
-class FileBackend:
+class Backend:
+    """Base class for BlueSky storage backends."""
+
+    def loc_to_name(self, location):
+        return "log-%08d-%08d" % (location)
+
+    def name_to_loc(self, name):
+        m = re.match(r"^log-(\d+)-(\d+)$", name)
+        if m: return (int(m.group(1)), int(m.group(2)))
+
+    def dump_stats(self):
+        pass
+
+class FileBackend(Backend):
     """An interface to BlueSky where the log segments are on local disk.
 
     This is mainly intended for testing purposes, as the real cleaner would
     """An interface to BlueSky where the log segments are on local disk.
 
     This is mainly intended for testing purposes, as the real cleaner would
@@ -34,10 +49,11 @@ class FileBackend:
     def __init__(self, path):
         self.path = path
 
     def __init__(self, path):
         self.path = path
 
-    def list(self):
+    def list(self, directory=0):
         """Return a listing of all log segments and their sizes."""
 
         """Return a listing of all log segments and their sizes."""
 
-        files = [f for f in os.listdir(self.path) if f.startswith('log-')]
+        prefix = "log-%08d-" % (directory,)
+        files = [f for f in os.listdir(self.path) if f.startswith(prefix)]
         files.sort()
 
         return [(f, os.stat(os.path.join(self.path, f)).st_size)
         files.sort()
 
         return [(f, os.stat(os.path.join(self.path, f)).st_size)
@@ -47,7 +63,7 @@ class FileBackend:
         fp = open(os.path.join(self.path, filename), 'rb')
         if offset > 0:
             fp.seek(offset)
         fp = open(os.path.join(self.path, filename), 'rb')
         if offset > 0:
             fp.seek(offset)
-        if legnth is None:
+        if length is None:
             return fp.read()
         else:
             return fp.read(length)
             return fp.read()
         else:
             return fp.read(length)
@@ -60,13 +76,6 @@ class FileBackend:
     def delete(self, filename):
         os.unlink(os.path.join(self.path, filename))
 
     def delete(self, filename):
         os.unlink(os.path.join(self.path, filename))
 
-    def loc_to_name(self, location):
-        return "log-%08d-%08d" % (location)
-
-    def name_to_loc(self, name):
-        m = re.match(r"^log-(\d+)-(\d+)$", name)
-        if m: return (int(m.group(1)), int(m.group(2)))
-
 def retry_wrap(method):
     def wrapped(self, *args, **kwargs):
         for retries in range(3):
 def retry_wrap(method):
     def wrapped(self, *args, **kwargs):
         for retries in range(3):
@@ -79,7 +88,7 @@ def retry_wrap(method):
         return method(self, *args, **kwargs)
     return wrapped
 
         return method(self, *args, **kwargs)
     return wrapped
 
-class S3Backend:
+class S3Backend(Backend):
     """An interface to BlueSky where the log segments are on in Amazon S3."""
 
     def __init__(self, bucket, path='', cachedir="."):
     """An interface to BlueSky where the log segments are on in Amazon S3."""
 
     def __init__(self, bucket, path='', cachedir="."):
@@ -88,14 +97,17 @@ class S3Backend:
         self.cachedir = cachedir
         self.cache = {}
         self.connect()
         self.cachedir = cachedir
         self.cache = {}
         self.connect()
+        self.stats_get = [0, 0]
+        self.stats_put = [0, 0]
 
     def connect(self):
         self.conn = boto.connect_s3(is_secure=False)
         self.bucket = self.conn.get_bucket(self.bucket_name)
 
 
     def connect(self):
         self.conn = boto.connect_s3(is_secure=False)
         self.bucket = self.conn.get_bucket(self.bucket_name)
 
-    def list(self):
+    def list(self, directory=0):
         files = []
         files = []
-        for k in self.bucket.list(self.path + 'log-'):
+        prefix = "log-%08d-" % (directory,)
+        for k in self.bucket.list(self.path + prefix):
             files.append((k.key, k.size))
         return files
 
             files.append((k.key, k.size))
         return files
 
@@ -117,6 +129,8 @@ class S3Backend:
             fp.write(data)
             fp.close()
             self.cache[filename] = True
             fp.write(data)
             fp.close()
             self.cache[filename] = True
+            self.stats_get[0] += 1
+            self.stats_get[1] += len(data)
             if offset > 0:
                 data = data[offset:]
             if length is not None:
             if offset > 0:
                 data = data[offset:]
             if length is not None:
@@ -128,6 +142,8 @@ class S3Backend:
         k = Key(self.bucket)
         k.key = self.path + filename
         k.set_contents_from_string(data)
         k = Key(self.bucket)
         k.key = self.path + filename
         k.set_contents_from_string(data)
+        self.stats_put[0] += 1
+        self.stats_put[1] += len(data)
         if filename in self.cache:
             del self.cache[filename]
 
         if filename in self.cache:
             del self.cache[filename]
 
@@ -139,12 +155,68 @@ class S3Backend:
         if filename in self.cache:
             del self.cache[filename]
 
         if filename in self.cache:
             del self.cache[filename]
 
-    def loc_to_name(self, location):
-        return "log-%08d-%08d" % (location)
+    def dump_stats(self):
+        print "S3 statistics:"
+        print "GET: %d ops / %d bytes" % tuple(self.stats_get)
+        print "PUT: %d ops / %d bytes" % tuple(self.stats_put)
 
 
-    def name_to_loc(self, name):
-        m = re.match(r"^log-(\d+)-(\d+)$", name)
-        if m: return (int(m.group(1)), int(m.group(2)))
+class SimpleBackend(Backend):
+    """An interface to the simple BlueSky test network server."""
+
+    def __init__(self, server=('localhost', 12345), cachedir="."):
+        self.bucket_name = bucket
+        self.server_address = server
+        self.cachedir = cachedir
+        self.cache = {}
+
+    def _get_socket(self):
+        return socket.create_connection(self.server_address).makefile()
+
+    def list(self, directory=0):
+        files = []
+        prefix = "log-%08d-" % (directory,)
+        for k in self.bucket.list(self.path + prefix):
+            files.append((k.key, k.size))
+        return files
+
+    def read(self, filename, offset=0, length=None):
+        if filename in self.cache:
+            fp = open(os.path.join(self.cachedir, filename), 'rb')
+            if offset > 0:
+                fp.seek(offset)
+            if length is None:
+                return fp.read()
+            else:
+                return fp.read(length)
+        else:
+            f = self._get_socket()
+            f.write("GET %s %d %d\n" % (filename, 0, 0))
+            f.flush()
+            datalen = int(f.readline())
+            if datalen < 0:
+                raise RuntimeError
+            data = f.read(datalen)
+            fp = open(os.path.join(self.cachedir, filename), 'wb')
+            fp.write(data)
+            fp.close()
+            self.cache[filename] = True
+            if offset > 0:
+                data = data[offset:]
+            if length is not None:
+                data = data[0:length]
+            return data
+
+    def write(self, filename, data):
+        f = self._get_socket()
+        f.write("PUT %s %d %d\n" % (filename, len(data)))
+        f.write(data)
+        f.flush()
+        result = int(f.readline())
+        if filename in self.cache:
+            del self.cache[filename]
+
+    def delete(self, filename):
+        pass
 
 class LogItem:
     """In-memory representation of a single item stored in a log file."""
 
 class LogItem:
     """In-memory representation of a single item stored in a log file."""
@@ -208,7 +280,8 @@ class LogDirectory:
         self.backend = backend
         self.dir_num = dir
         self.seq_num = 0
         self.backend = backend
         self.dir_num = dir
         self.seq_num = 0
-        for logname in backend.list():
+        for logname in backend.list(dir):
+            print "Old log file:", logname
             loc = backend.name_to_loc(logname[0])
             if loc is not None and loc[0] == dir:
                 self.seq_num = max(self.seq_num, loc[1] + 1)
             loc = backend.name_to_loc(logname[0])
             if loc is not None and loc[0] == dir:
                 self.seq_num = max(self.seq_num, loc[1] + 1)
@@ -241,7 +314,7 @@ class UtilizationTracker:
 
     def __init__(self, backend):
         self.segments = {}
 
     def __init__(self, backend):
         self.segments = {}
-        for (segment, size) in backend.list():
+        for (segment, size) in backend.list(0) + backend.list(1):
             self.segments[segment] = [size, 0]
 
     def add_item(self, item):
             self.segments[segment] = [size, 0]
 
     def add_item(self, item):
@@ -327,8 +400,8 @@ def parse_log(data, location=None):
         if item is not None: yield item
         offset += size
 
         if item is not None: yield item
         offset += size
 
-def load_checkpoint_record(backend):
-    for (log, size) in reversed(backend.list()):
+def load_checkpoint_record(backend, directory=0):
+    for (log, size) in reversed(backend.list(directory)):
         for item in reversed(list(parse_log(backend.read(log), log))):
             print item
             if item.type == ITEM_TYPE.CHECKPOINT:
         for item in reversed(list(parse_log(backend.read(log), log))):
             print item
             if item.type == ITEM_TYPE.CHECKPOINT:
@@ -343,6 +416,7 @@ class InodeMap:
 
         This will also build up information about segment utilization."""
 
 
         This will also build up information about segment utilization."""
 
+        self.version_vector = {}
         self.checkpoint_record = checkpoint_record
 
         util = UtilizationTracker(backend)
         self.checkpoint_record = checkpoint_record
 
         util = UtilizationTracker(backend)
@@ -350,9 +424,25 @@ class InodeMap:
         inodes = {}
         self.obsolete_segments = set()
 
         inodes = {}
         self.obsolete_segments = set()
 
+        data = checkpoint_record.data
+        if not data.startswith(CHECKPOINT_MAGIC):
+            raise ValueError, "Invalid checkpoint record!"
+        data = data[len(CHECKPOINT_MAGIC):]
+        (vvlen,) = struct.unpack_from("<I", data, 0)
+        self.vvsize = 4 + 8*vvlen
+        for i in range(vvlen):
+            (v1, v2) = struct.unpack_from("<II", data, 4 + 8*i)
+            self.version_vector[v1] = v2
+        print self.version_vector
+        self.version_vector[checkpoint_record.location[0]] \
+            = checkpoint_record.location[1]
+        print self.version_vector
+
+        data = data[self.vvsize:]
+
         print "Inode map:"
         print "Inode map:"
-        for i in range(len(checkpoint_record.data) // 16):
-            (start, end) = struct.unpack_from("<QQ", checkpoint_record.data, 16*i)
+        for i in range(len(data) // 16):
+            (start, end) = struct.unpack_from("<QQ", data, 16*i)
             imap = load_item(backend, checkpoint_record.links[i][1])
             util.add_item(imap)
             print "[%d, %d]: %s" % (start, end, imap)
             imap = load_item(backend, checkpoint_record.links[i][1])
             util.add_item(imap)
             print "[%d, %d]: %s" % (start, end, imap)
@@ -369,16 +459,24 @@ class InodeMap:
 
         print
         print "Segment utilizations:"
 
         print
         print "Segment utilizations:"
+        total_data = [0, 0]
+        deletions = [0, 0]
         for (s, u) in sorted(util.segments.items()):
         for (s, u) in sorted(util.segments.items()):
+            for i in range(2): total_data[i] += u[i]
             print "%s: %s %s" % (s, u, float(u[1]) / u[0])
             if u[1] == 0:
             print "%s: %s %s" % (s, u, float(u[1]) / u[0])
             if u[1] == 0:
-                print "Deleting..."
-                backend.delete(s)
+                print "Would delete..."
+                #backend.delete(s)
+                deletions[0] += 1
+                deletions[1] += u[0]
 
         self.inodes = inodes
         self.util = util
         self.updated_inodes = set()
 
 
         self.inodes = inodes
         self.util = util
         self.updated_inodes = set()
 
+        print "%d bytes total / %d bytes used" % tuple(total_data)
+        print "would delete %d segments (%d bytes)" % tuple(deletions)
+
     def mark_updated(self, inum):
         self.updated_inodes.add(inum)
 
     def mark_updated(self, inum):
         self.updated_inodes.add(inum)
 
@@ -389,13 +487,18 @@ class InodeMap:
         new_checkpoint.id = LogItem.random_id()
         new_checkpoint.inum = 0
         new_checkpoint.type = ITEM_TYPE.CHECKPOINT
         new_checkpoint.id = LogItem.random_id()
         new_checkpoint.inum = 0
         new_checkpoint.type = ITEM_TYPE.CHECKPOINT
-        new_checkpoint.data = ""
+        new_checkpoint.data = CHECKPOINT_MAGIC
         new_checkpoint.links = []
 
         new_checkpoint.links = []
 
-        for i in range(len(self.checkpoint_record.data) // 16):
-            (start, end) = struct.unpack_from("<QQ", self.checkpoint_record.data, 16*i)
+        new_checkpoint.data += struct.pack('<I', len(self.version_vector))
+        for d in sorted(self.version_vector):
+            new_checkpoint.data += struct.pack('<II', d, self.version_vector[d])
+
+        data = self.checkpoint_record.data[self.vvsize + len(CHECKPOINT_MAGIC):]
+        for i in range(len(data) // 16):
+            (start, end) = struct.unpack_from("<QQ", data, 16*i)
 
 
-            new_checkpoint.data += self.checkpoint_record.data[16*i : 16*i + 16]
+            new_checkpoint.data += data[16*i : 16*i + 16]
 
             # Case 1: No inodes in this range of the old inode map have
             # changed.  Simply emit a new pointer to the same inode map block.
 
             # Case 1: No inodes in this range of the old inode map have
             # changed.  Simply emit a new pointer to the same inode map block.
@@ -445,7 +548,7 @@ def run_cleaner(backend, inode_map, log, repack_inodes=False):
     # Determine which segments are poorly utilized and should be cleaned.  We
     # need better heuristics here.
     for (s, u) in sorted(inode_map.util.segments.items()):
     # Determine which segments are poorly utilized and should be cleaned.  We
     # need better heuristics here.
     for (s, u) in sorted(inode_map.util.segments.items()):
-        if (float(u[1]) / u[0] < 0.6 or u[1] < 32768) and u[1] > 0:
+        if (float(u[1]) / u[0] < 0.6) and u[1] > 0:
             print "Should clean segment", s
             loc = backend.name_to_loc(s)
             if s: inode_map.obsolete_segments.add(loc)
             print "Should clean segment", s
             loc = backend.name_to_loc(s)
             if s: inode_map.obsolete_segments.add(loc)
@@ -482,14 +585,20 @@ def run_cleaner(backend, inode_map, log, repack_inodes=False):
         rewrite_inode(backend, inode_map, i, log, i in dirty_inode_data)
 
 if __name__ == '__main__':
         rewrite_inode(backend, inode_map, i, log, i in dirty_inode_data)
 
 if __name__ == '__main__':
-    backend = S3Backend("mvrable-bluesky", cachedir=".")
+    start_time = time.time()
+    backend = S3Backend("mvrable-bluesky-west", cachedir="/export/cache")
+    #backend = FileBackend(".")
     chkpt = load_checkpoint_record(backend)
     print backend.list()
     imap = InodeMap()
     imap.build(backend, chkpt)
     print chkpt
 
     chkpt = load_checkpoint_record(backend)
     print backend.list()
     imap = InodeMap()
     imap.build(backend, chkpt)
     print chkpt
 
-    log_dir = LogDirectory(backend, 0)
+    log_dir = LogDirectory(backend, 1)
     run_cleaner(backend, imap, log_dir)
     run_cleaner(backend, imap, log_dir)
+    print "Version vector:", imap.version_vector
     imap.write(backend, log_dir)
     log_dir.close_all()
     imap.write(backend, log_dir)
     log_dir.close_all()
+    end_time = time.time()
+    print "Cleaner running time:", end_time - start_time
+    backend.dump_stats()