Dump some statistics when the cleaner runs
diff --git a/cleaner/cleaner b/cleaner/cleaner
index b148b95..93feef4 100755
--- a/cleaner/cleaner
+++ b/cleaner/cleaner
@@ -27,7 +27,20 @@ class ITEM_TYPE:
     INODE_MAP = '3'
     CHECKPOINT = '4'
 
-class FileBackend:
+class Backend:
+    """Base class for BlueSky storage backends."""
+
+    def loc_to_name(self, location):
+        return "log-%08d-%08d" % (location)
+
+    def name_to_loc(self, name):
+        m = re.match(r"^log-(\d+)-(\d+)$", name)
+        if m: return (int(m.group(1)), int(m.group(2)))
+
+    def dump_stats(self):
+        pass
+
+class FileBackend(Backend):
     """An interface to BlueSky where the log segments are on local disk.
 
     This is mainly intended for testing purposes, as the real cleaner would
     """An interface to BlueSky where the log segments are on local disk.
 
     This is mainly intended for testing purposes, as the real cleaner would
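
A note on the refactoring above: both backends now inherit the
segment-naming helpers, which encode a (directory, sequence) location
pair as a zero-padded "log-XXXXXXXX-XXXXXXXX" object name. A minimal
standalone sketch of the round trip:

    import re

    def loc_to_name(location):
        # (directory, sequence) pair -> zero-padded segment name
        return "log-%08d-%08d" % location

    def name_to_loc(name):
        # Inverse mapping; returns None for non-segment names.
        m = re.match(r"^log-(\d+)-(\d+)$", name)
        if m: return (int(m.group(1)), int(m.group(2)))

    assert loc_to_name((0, 42)) == "log-00000000-00000042"
    assert name_to_loc("log-00000000-00000042") == (0, 42)
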
@@ -63,13 +76,6 @@ class FileBackend:
     def delete(self, filename):
         os.unlink(os.path.join(self.path, filename))
 
-    def loc_to_name(self, location):
-        return "log-%08d-%08d" % (location)
-
-    def name_to_loc(self, name):
-        m = re.match(r"^log-(\d+)-(\d+)$", name)
-        if m: return (int(m.group(1)), int(m.group(2)))
-
 def retry_wrap(method):
     def wrapped(self, *args, **kwargs):
         for retries in range(3):
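
For context, retry_wrap (only its first and last lines are visible
around this hunk) wraps the S3 methods so transient failures are
retried. A sketch of the full pattern; the body of the except branch is
elided in the diff, so the reconnect-and-back-off recovery here is an
assumption:

    import sys, time

    def retry_wrap(method):
        def wrapped(self, *args, **kwargs):
            for retries in range(3):
                try:
                    return method(self, *args, **kwargs)
                except:
                    # Assumed recovery: report, reconnect, retry.
                    print >>sys.stderr, "Backend operation failed; retrying..."
                    self.connect()
                    time.sleep(1.0)
            # Final attempt; let any exception propagate.
            return method(self, *args, **kwargs)
        return wrapped
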
@@ -82,7 +88,7 @@ def retry_wrap(method):
         return method(self, *args, **kwargs)
     return wrapped
 
-class S3Backend:
+class S3Backend(Backend):
     """An interface to BlueSky where the log segments are on in Amazon S3."""
 
     def __init__(self, bucket, path='', cachedir="."):
     """An interface to BlueSky where the log segments are on in Amazon S3."""
 
     def __init__(self, bucket, path='', cachedir="."):
@@ -91,6 +97,8 @@ class S3Backend:
         self.cachedir = cachedir
         self.cache = {}
         self.connect()
+        self.stats_get = [0, 0]
+        self.stats_put = [0, 0]
 
     def connect(self):
         self.conn = boto.connect_s3(is_secure=False)
@@ -121,6 +129,8 @@ class S3Backend:
             fp.write(data)
             fp.close()
             self.cache[filename] = True
+            self.stats_get[0] += 1
+            self.stats_get[1] += len(data)
             if offset > 0:
                 data = data[offset:]
             if length is not None:
@@ -132,6 +142,8 @@ class S3Backend:
         k = Key(self.bucket)
         k.key = self.path + filename
         k.set_contents_from_string(data)
+        self.stats_put[0] += 1
+        self.stats_put[1] += len(data)
         if filename in self.cache:
             del self.cache[filename]
 
@@ -143,12 +155,68 @@ class S3Backend:
         if filename in self.cache:
             del self.cache[filename]
 
-    def loc_to_name(self, location):
-        return "log-%08d-%08d" % (location)
+    def dump_stats(self):
+        print "S3 statistics:"
+        print "GET: %d ops / %d bytes" % tuple(self.stats_get)
+        print "PUT: %d ops / %d bytes" % tuple(self.stats_put)
 
-    def name_to_loc(self, name):
-        m = re.match(r"^log-(\d+)-(\d+)$", name)
-        if m: return (int(m.group(1)), int(m.group(2)))
+class SimpleBackend(Backend):
+    """An interface to the simple BlueSky test network server."""
+
+    def __init__(self, server=('localhost', 12345), cachedir="."):
+        self.server_address = server
+        self.cachedir = cachedir
+        self.cache = {}
+
+    def _get_socket(self):
+        return socket.create_connection(self.server_address).makefile()
+
+    def list(self, directory=0):
+        # The S3-style listing carried over here does not apply to this
+        # backend (there is no self.bucket or self.path); listing is
+        # left explicitly unimplemented rather than failing obscurely.
+        raise NotImplementedError
+
+    def read(self, filename, offset=0, length=None):
+        if filename in self.cache:
+            fp = open(os.path.join(self.cachedir, filename), 'rb')
+            if offset > 0:
+                fp.seek(offset)
+            if length is None:
+                return fp.read()
+            else:
+                return fp.read(length)
+        else:
+            f = self._get_socket()
+            f.write("GET %s %d %d\n" % (filename, 0, 0))
+            f.flush()
+            datalen = int(f.readline())
+            if datalen < 0:
+                raise RuntimeError
+            data = f.read(datalen)
+            fp = open(os.path.join(self.cachedir, filename), 'wb')
+            fp.write(data)
+            fp.close()
+            self.cache[filename] = True
+            if offset > 0:
+                data = data[offset:]
+            if length is not None:
+                data = data[0:length]
+            return data
+
+    def write(self, filename, data):
+        f = self._get_socket()
+        f.write("PUT %s %d %d\n" % (filename, len(data)))
+        f.write(data)
+        f.flush()
+        result = int(f.readline())
+        if filename in self.cache:
+            del self.cache[filename]
+
+    def delete(self, filename):
+        # Deletion is a no-op for this backend.
+        pass
 
 class LogItem:
     """In-memory representation of a single item stored in a log file."""
@@ -391,17 +459,24 @@ class InodeMap:
 
         print
         print "Segment utilizations:"
+        total_data = [0, 0]
+        deletions = [0, 0]
         for (s, u) in sorted(util.segments.items()):
+            for i in range(2): total_data[i] += u[i]
             print "%s: %s %s" % (s, u, float(u[1]) / u[0])
             if u[1] == 0:
             print "%s: %s %s" % (s, u, float(u[1]) / u[0])
             if u[1] == 0:
-                # print "Deleting..."
-                # backend.delete(s)
-                pass
+                print "Would delete..."
+                #backend.delete(s)
+                deletions[0] += 1
+                deletions[1] += u[0]
 
         self.inodes = inodes
         self.util = util
         self.updated_inodes = set()
 
+        print "%d bytes total / %d bytes used" % tuple(total_data)
+        print "would delete %d segments (%d bytes)" % tuple(deletions)
+
     def mark_updated(self, inum):
         self.updated_inodes.add(inum)
 
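
The accounting added above can be read in isolation: util.segments maps
a segment name to a [total_bytes, live_bytes] pair; the new code sums
both columns and tallies the segments that hold no live data and could
be deleted outright. A sketch over invented numbers:

    segments = {"log-00000000-00000001": [65536, 32768],
                "log-00000000-00000002": [16384, 0]}
    total_data = [0, 0]
    deletions = [0, 0]
    for (s, u) in sorted(segments.items()):
        for i in range(2): total_data[i] += u[i]
        if u[1] == 0:           # no live data: could be deleted
            deletions[0] += 1
            deletions[1] += u[0]
    print "%d bytes total / %d bytes used" % tuple(total_data)
    print "would delete %d segments (%d bytes)" % tuple(deletions)
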
@@ -419,7 +494,7 @@ class InodeMap:
         for d in sorted(self.version_vector):
             new_checkpoint.data += struct.pack('<II', d, self.version_vector[d])
 
-        data = self.checkpoint_record.data[self.vvsize:]
+        data = self.checkpoint_record.data[self.vvsize + len(CHECKPOINT_MAGIC):]
         for i in range(len(data) // 16):
             (start, end) = struct.unpack_from("<QQ", data, 16*i)
 
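
The one-line change above fixes an offset bug: the checkpoint record is
laid out as CHECKPOINT_MAGIC, then the serialized version vector
(self.vvsize bytes), then packed <QQ (start, end) location pairs, so
skipping only vvsize bytes left the magic bytes in the data. A sketch
of the corrected parse, with the layout inferred from the surrounding
code:

    import struct

    def parse_locations(record_data, vvsize, magic):
        # Skip the magic string and the version vector, then read
        # 16-byte (start, end) pairs until the record is exhausted.
        data = record_data[vvsize + len(magic):]
        return [struct.unpack_from("<QQ", data, 16 * i)
                for i in range(len(data) // 16)]
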
@@ -473,7 +548,7 @@ def run_cleaner(backend, inode_map, log, repack_inodes=False):
     # Determine which segments are poorly utilized and should be cleaned.  We
     # need better heuristics here.
     for (s, u) in sorted(inode_map.util.segments.items()):
-        if (float(u[1]) / u[0] < 0.6 or u[1] < 32768) and u[1] > 0:
+        if (float(u[1]) / u[0] < 0.6) and u[1] > 0:
             print "Should clean segment", s
             loc = backend.name_to_loc(s)
             if s: inode_map.obsolete_segments.add(loc)
             print "Should clean segment", s
             loc = backend.name_to_loc(s)
             if s: inode_map.obsolete_segments.add(loc)
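
The heuristic change above drops the size-based trigger: previously a
segment was also cleaned whenever it held fewer than 32768 live bytes,
even at 100% utilization; now only segments below 60% utilization (and
holding some live data) are selected. As a predicate:

    def should_clean(u):
        # u = [total_bytes, live_bytes]
        return u[1] > 0 and float(u[1]) / u[0] < 0.6

    assert should_clean([65536, 16384])      # 25% utilized: clean
    assert not should_clean([16384, 16384])  # small but fully live: keep
    assert not should_clean([65536, 0])      # empty: delete, don't clean
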
@@ -510,8 +585,9 @@ def run_cleaner(backend, inode_map, log, repack_inodes=False):
         rewrite_inode(backend, inode_map, i, log, i in dirty_inode_data)
 
 if __name__ == '__main__':
-    #backend = S3Backend("mvrable-bluesky", cachedir=".")
-    backend = FileBackend(".")
+    start_time = time.time()
+    backend = S3Backend("mvrable-bluesky-west", cachedir="/export/cache")
+    #backend = FileBackend(".")
     chkpt = load_checkpoint_record(backend)
     print backend.list()
     imap = InodeMap()
@@ -523,3 +599,6 @@ if __name__ == '__main__':
     print "Version vector:", imap.version_vector
     imap.write(backend, log_dir)
     log_dir.close_all()
     print "Version vector:", imap.version_vector
     imap.write(backend, log_dir)
     log_dir.close_all()
+    end_time = time.time()
+    print "Cleaner running time:", end_time - start_time
+    backend.dump_stats()
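
With these changes, a run now ends by reporting wall-clock time and
backend traffic (this assumes import time is already present in the
file's import block, which the diff does not show). Hypothetical tail
of the output, numbers invented:

    Cleaner running time: 42.7193410397
    S3 statistics:
    GET: 128 ops / 93640704 bytes
    PUT: 11 ops / 6815744 bytes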