Try out a fix for timeouts with limited bandwidth to cloud.
[bluesky.git] / cleaner / cleaner
index c7b3d84..f2dcfcb 100755 (executable)
@@ -21,13 +21,35 @@ HEADER_SIZE = struct.calcsize(HEADER_FORMAT)
 
 CHECKPOINT_MAGIC = struct.pack('<Q', 0x7ad7dafb42a498b4)
 
+# Log file to write benchmark data to
+benchlog = None
+def benchlog_write(msg, *args):
+    m = msg % args
+    print "LOG:", m
+    if benchlog is not None:
+        benchlog.write(m)
+        benchlog.write("\n")
+
 class ITEM_TYPE:
     DATA = '1'
     INODE = '2'
     INODE_MAP = '3'
     CHECKPOINT = '4'
 
-class FileBackend:
+class Backend:
+    """Base class for BlueSky storage backends."""
+
+    def loc_to_name(self, location):
+        return "log-%08d-%08d" % (location)
+
+    def name_to_loc(self, name):
+        m = re.match(r"^log-(\d+)-(\d+)$", name)
+        if m: return (int(m.group(1)), int(m.group(2)))
+
+    def dump_stats(self):
+        pass
+
+class FileBackend(Backend):
     """An interface to BlueSky where the log segments are on local disk.
 
     This is mainly intended for testing purposes, as the real cleaner would
     """An interface to BlueSky where the log segments are on local disk.
 
     This is mainly intended for testing purposes, as the real cleaner would
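The new Backend base class hoists the segment-naming helpers that FileBackend (above) and S3Backend (below) previously duplicated. loc_to_name and name_to_loc are inverses; a quick sketch of the round trip in an interactive session:

    >>> b = Backend()
    >>> b.loc_to_name((0, 42))                    # (directory, sequence) pair
    'log-00000000-00000042'
    >>> b.name_to_loc('log-00000000-00000042')
    (0, 42)
    >>> b.name_to_loc('checkpoint')               # non-matching names fall through to None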
@@ -63,13 +85,6 @@ class FileBackend:
     def delete(self, filename):
         os.unlink(os.path.join(self.path, filename))
 
-    def loc_to_name(self, location):
-        return "log-%08d-%08d" % (location)
-
-    def name_to_loc(self, name):
-        m = re.match(r"^log-(\d+)-(\d+)$", name)
-        if m: return (int(m.group(1)), int(m.group(2)))
-
 def retry_wrap(method):
     def wrapped(self, *args, **kwargs):
         for retries in range(3):
@@ -77,12 +92,13 @@ def retry_wrap(method):
                 return method(self, *args, **kwargs)
             except:
                 print >>sys.stderr, "S3 operation failed, retrying..."
+                print >>sys.stderr, "  %s %s %s" % (method, args, kwargs)
                 self.connect()
                 time.sleep(1.0)
         return method(self, *args, **kwargs)
     return wrapped
 
-class S3Backend:
+class S3Backend(Backend):
     """An interface to BlueSky where the log segments are on in Amazon S3."""
 
     def __init__(self, bucket, path='', cachedir="."):
     """An interface to BlueSky where the log segments are on in Amazon S3."""
 
     def __init__(self, bucket, path='', cachedir="."):
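retry_wrap, shown in context above, is an ordinary decorator: each attempt that raises is retried up to three times, now logging the failing method and its arguments before reconnecting and sleeping; a final failure propagates to the caller. A minimal sketch of how a backend method would be wrapped (the fetch method here is hypothetical; the actually-wrapped methods fall outside this diff's context):

    class S3Backend(Backend):
        @retry_wrap
        def fetch(self, filename):
            # any boto call that raises in here triggers a reconnect,
            # a one-second pause, and a retry
            ...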
@@ -90,7 +106,12 @@ class S3Backend:
         self.path = path
         self.cachedir = cachedir
         self.cache = {}
+        for f in os.listdir(cachedir):
+            self.cache[f] = True
+        #print "Initial cache contents:", list(self.cache.keys())
         self.connect()
+        self.stats_get = [0, 0]
+        self.stats_put = [0, 0]
 
     def connect(self):
         self.conn = boto.connect_s3(is_secure=False)
@@ -121,6 +142,8 @@ class S3Backend:
             fp.write(data)
             fp.close()
             self.cache[filename] = True
+            self.stats_get[0] += 1
+            self.stats_get[1] += len(data)
             if offset > 0:
                 data = data[offset:]
             if length is not None:
@@ -132,6 +155,8 @@ class S3Backend:
         k = Key(self.bucket)
         k.key = self.path + filename
         k.set_contents_from_string(data)
+        self.stats_put[0] += 1
+        self.stats_put[1] += len(data)
         if filename in self.cache:
             del self.cache[filename]
 
@@ -143,12 +168,70 @@ class S3Backend:
         if filename in self.cache:
             del self.cache[filename]
 
-    def loc_to_name(self, location):
-        return "log-%08d-%08d" % (location)
+    def dump_stats(self):
+        print "S3 statistics:"
+        print "GET: %d ops / %d bytes" % tuple(self.stats_get)
+        print "PUT: %d ops / %d bytes" % tuple(self.stats_put)
+        benchlog_write("s3_get: %d", self.stats_get[1])
+        benchlog_write("s3_put: %d", self.stats_put[1])
 
-    def name_to_loc(self, name):
-        m = re.match(r"^log-(\d+)-(\d+)$", name)
-        if m: return (int(m.group(1)), int(m.group(2)))
+class SimpleBackend(Backend):
+    """An interface to the simple BlueSky test network server."""
+
+    def __init__(self, server=('localhost', 12345), cachedir="."):
+        self.server_address = server
+        self.cachedir = cachedir
+        self.cache = {}
+
+    def _get_socket(self):
+        return socket.create_connection(self.server_address).makefile()
+
+    def list(self, directory=0):
+        # The simple test server only speaks GET and PUT, so there is no
+        # way to enumerate segments; fail loudly rather than crashing on
+        # the nonexistent self.bucket attribute this method referenced.
+        raise NotImplementedError("SimpleBackend cannot list log segments")
+
+    def read(self, filename, offset=0, length=None):
+        if filename in self.cache:
+            fp = open(os.path.join(self.cachedir, filename), 'rb')
+            if offset > 0:
+                fp.seek(offset)
+            if length is None:
+                return fp.read()
+            else:
+                return fp.read(length)
+        else:
+            f = self._get_socket()
+            f.write("GET %s %d %d\n" % (filename, 0, 0))
+            f.flush()
+            datalen = int(f.readline())
+            if datalen < 0:
+                raise RuntimeError("server reported error reading %s" % filename)
+            data = f.read(datalen)
+            fp = open(os.path.join(self.cachedir, filename), 'wb')
+            fp.write(data)
+            fp.close()
+            self.cache[filename] = True
+            if offset > 0:
+                data = data[offset:]
+            if length is not None:
+                data = data[0:length]
+            return data
+
+    def write(self, filename, data):
+        f = self._get_socket()
+        f.write("PUT %s %d %d\n" % (filename, len(data)))
+        f.write(data)
+        f.flush()
+        result = int(f.readline())  # server's integer status; currently unchecked
+        if filename in self.cache:
+            del self.cache[filename]
+
+    def delete(self, filename):
+        # The test server protocol has no DELETE operation, so stale
+        # segments are simply left in place.
+        pass
 
 class LogItem:
     """In-memory representation of a single item stored in a log file."""
 
@@ -213,7 +296,7 @@ class LogDirectory:
         self.dir_num = dir
         self.seq_num = 0
         for logname in backend.list(dir):
-            print "Old log file:", logname
+            #print "Old log file:", logname
             loc = backend.name_to_loc(logname[0])
             if loc is not None and loc[0] == dir:
                 self.seq_num = max(self.seq_num, loc[1] + 1)
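The scan above sets seq_num to one past the highest sequence number already present in the cleaner's log directory, so newly written segments never collide with existing ones: if directory 1 already contains log-00000001-00000003 and log-00000001-00000004, seq_num starts at 5 and the next segment written is log-00000001-00000005.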
@@ -372,12 +455,12 @@ class InodeMap:
 
         data = data[self.vvsize:]
 
-        print "Inode map:"
+        #print "Inode map:"
         for i in range(len(data) // 16):
             (start, end) = struct.unpack_from("<QQ", data, 16*i)
             imap = load_item(backend, checkpoint_record.links[i][1])
             util.add_item(imap)
-            print "[%d, %d]: %s" % (start, end, imap)
+            #print "[%d, %d]: %s" % (start, end, imap)
             for j in range(len(imap.data) // 8):
                 (inum,) = struct.unpack_from("<Q", imap.data, 8*j)
                 inode = load_item(backend, imap.links[j][1])
@@ -386,22 +469,40 @@ class InodeMap:
                 util.add_item(inode)
                 for i in inode.links:
                     util.add_item(i[1])
-                    data_segments.add(i[1][0:2])
-                print "  %d: %s (%d data segments)" % (inum, inode, len(data_segments))
+                    if i[1] is not None:
+                        data_segments.add(i[1][0:2])
+                #print "  %d: %s (%d data segments)" % (inum, inode, len(data_segments))
 
-        print
+        #print
         print "Segment utilizations:"
         print "Segment utilizations:"
+        total_data = [0, 0]
+        deletions = [0, 0]
         for (s, u) in sorted(util.segments.items()):
+            for i in range(2): total_data[i] += u[i]
             print "%s: %s %s" % (s, u, float(u[1]) / u[0])
             if u[1] == 0:
             print "%s: %s %s" % (s, u, float(u[1]) / u[0])
             if u[1] == 0:
-                # print "Deleting..."
-                # backend.delete(s)
-                pass
+                print "Would delete..."
+                try:
+                    (d, n) = backend.name_to_loc(s)
+                    if n < self.version_vector[d]:
+                        backend.delete(s)
+                        deletions[0] += 1
+                        deletions[1] += u[0]
+                    else:
+                        print "Not deleting log file newer than checkpoint!"
+                except:
+                    print "Error determining age of log segment, keeping"
 
         self.inodes = inodes
         self.util = util
         self.updated_inodes = set()
 
+        print "%d bytes total / %d bytes used" % tuple(total_data)
+        print "would delete %d segments (%d bytes)" % tuple(deletions)
+        benchlog_write("bytes_used: %d", total_data[1])
+        benchlog_write("bytes_wasted: %d", total_data[0] - total_data[1])
+        benchlog_write("bytes_freed: %d", deletions[1])
+
     def mark_updated(self, inum):
         self.updated_inodes.add(inum)
 
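The deletion policy above is deliberately conservative: a fully-empty segment in directory d with sequence number n is removed only when n is strictly older than the proxy's checkpointed version vector entry for d, so a segment the proxy may still reference is never dropped. A worked example with hypothetical values:

    # version_vector = {0: 7, 1: 3}
    # "log-00000000-00000005": d=0, n=5; 5 < 7, so the segment is deleted
    # "log-00000001-00000003": d=1, n=3; 3 < 3 is false, so it is kept
    #     ("Not deleting log file newer than checkpoint!")
    # "log-00000002-00000001": d=2 has no vector entry; the KeyError lands
    #     in the except branch and the segment is likewise kept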
@@ -460,12 +561,15 @@ class InodeMap:
 def rewrite_inode(backend, inode_map, inum, log, copy_data=True):
     inode = inode_map.inodes[inum]
     if copy_data:
-        blocks = []
+        newlinks = []
         for l in inode.links:
-            data = load_item(backend, l[1])
-            blocks.append(data)
-            log.write(data, 0)
-        inode.links = [(b.id, b.location) for b in blocks]
+            if l[1] is not None:
+                data = load_item(backend, l[1])
+                log.write(data, 0)
+                newlinks.append((data.id, data.location))
+            else:
+                newlinks.append(l)
+        inode.links = newlinks
     log.write(inode, 1)
     inode_map.mark_updated(inum)
 
@@ -491,7 +595,7 @@ def run_cleaner(backend, inode_map, log, repack_inodes=False):
     dirty_inode_data = set()
     for s in inode_map.obsolete_segments:
         filename = backend.loc_to_name(s)
-        print "Scanning", filename, "for live data"
+        #print "Scanning", filename, "for live data"
         for item in parse_log(backend.read(filename), filename):
             if item.type in (ITEM_TYPE.DATA, ITEM_TYPE.INODE):
                 if item.inum != 0:
@@ -500,26 +604,40 @@ def run_cleaner(backend, inode_map, log, repack_inodes=False):
                         dirty_inodes.add(item.inum)
                     if item.inum not in dirty_inode_data:
                         for b in inode.links:
-                            if s == b[1][0:2]:
+                            if b[1] is not None and s == b[1][0:2]:
                                 dirty_inode_data.add(item.inum)
                                 break
 
-    print "Inodes to rewrite:", dirty_inodes
-    print "Inodes with data to rewrite:", dirty_inode_data
+    #print "Inodes to rewrite:", dirty_inodes
+    #print "Inodes with data to rewrite:", dirty_inode_data
     for i in sorted(dirty_inodes.union(dirty_inode_data)):
         rewrite_inode(backend, inode_map, i, log, i in dirty_inode_data)
 
 if __name__ == '__main__':
-    backend = S3Backend("mvrable-bluesky", cachedir=".")
+    benchlog = open('cleaner.log', 'a')
+    benchlog_write("*** START CLEANER RUN ***")
+    start_time = time.time()
+    backend = S3Backend("mvrable-bluesky-west", cachedir="/tmp/bluesky-cache")
     #backend = FileBackend(".")
     chkpt = load_checkpoint_record(backend)
-    print backend.list()
+    #print backend.list()
+    log_dir = LogDirectory(backend, 1)
     imap = InodeMap()
     imap.build(backend, chkpt)
     print chkpt
 
-    log_dir = LogDirectory(backend, 1)
+    print "Version vector:", imap.version_vector
+    print "Last cleaner log file:", log_dir.seq_num - 1
+    if imap.version_vector.get(1, -1) != log_dir.seq_num - 1:
+        print "Proxy hasn't updated to latest cleaner segment yet!"
+        benchlog_write("waiting for proxy...")
+        sys.exit(0)
+
     run_cleaner(backend, imap, log_dir)
     print "Version vector:", imap.version_vector
     imap.write(backend, log_dir)
     log_dir.close_all()
+    end_time = time.time()
+    backend.dump_stats()
+    benchlog_write("running_time: %s", end_time - start_time)
+    benchlog_write("")