Have cleaner wait on proxy when needed
[bluesky.git] / cleaner / cleaner
index 60c91f5..f2dcfcb 100755 (executable)
@@ -21,6 +21,15 @@ HEADER_SIZE = struct.calcsize(HEADER_FORMAT)
 
 CHECKPOINT_MAGIC = struct.pack('<Q', 0x7ad7dafb42a498b4)
 
+# Log file to write benchmark data to
+benchlog = None
+def benchlog_write(msg, *args):
+    m = msg % args
+    print "LOG:", m
+    if benchlog is not None:
+        benchlog.write(m)
+        benchlog.write("\n")
+
 class ITEM_TYPE:
     DATA = '1'
     INODE = '2'
@@ -83,6 +92,7 @@ def retry_wrap(method):
                 return method(self, *args, **kwargs)
             except:
                 print >>sys.stderr, "S3 operation failed, retrying..."
+                print >>sys.stderr, "  %s %s %s" % (method, args, kwargs)
                 self.connect()
                 time.sleep(1.0)
         return method(self, *args, **kwargs)
@@ -98,7 +108,7 @@ class S3Backend(Backend):
         self.cache = {}
         for f in os.listdir(cachedir):
             self.cache[f] = True
-        print "Initial cache contents:", list(self.cache.keys())
+        #print "Initial cache contents:", list(self.cache.keys())
         self.connect()
         self.stats_get = [0, 0]
         self.stats_put = [0, 0]
@@ -162,6 +172,8 @@ class S3Backend(Backend):
         print "S3 statistics:"
         print "GET: %d ops / %d bytes" % tuple(self.stats_get)
         print "PUT: %d ops / %d bytes" % tuple(self.stats_put)
+        benchlog_write("s3_get: %d", self.stats_get[1])
+        benchlog_write("s3_put: %d", self.stats_put[1])
 
 class SimpleBackend(Backend):
     """An interface to the simple BlueSky test network server."""
@@ -284,7 +296,7 @@ class LogDirectory:
         self.dir_num = dir
         self.seq_num = 0
         for logname in backend.list(dir):
-            print "Old log file:", logname
+            #print "Old log file:", logname
             loc = backend.name_to_loc(logname[0])
             if loc is not None and loc[0] == dir:
                 self.seq_num = max(self.seq_num, loc[1] + 1)
@@ -443,12 +455,12 @@ class InodeMap:
 
         data = data[self.vvsize:]
 
-        print "Inode map:"
+        #print "Inode map:"
         for i in range(len(data) // 16):
             (start, end) = struct.unpack_from("<QQ", data, 16*i)
             imap = load_item(backend, checkpoint_record.links[i][1])
             util.add_item(imap)
-            print "[%d, %d]: %s" % (start, end, imap)
+            #print "[%d, %d]: %s" % (start, end, imap)
             for j in range(len(imap.data) // 8):
                 (inum,) = struct.unpack_from("<Q", imap.data, 8*j)
                 inode = load_item(backend, imap.links[j][1])
@@ -457,10 +469,11 @@ class InodeMap:
                 util.add_item(inode)
                 for i in inode.links:
                     util.add_item(i[1])
-                    data_segments.add(i[1][0:2])
-                print "  %d: %s (%d data segments)" % (inum, inode, len(data_segments))
+                    if i[1] is not None:
+                        data_segments.add(i[1][0:2])
+                #print "  %d: %s (%d data segments)" % (inum, inode, len(data_segments))
 
-        print
+        #print
         print "Segment utilizations:"
         total_data = [0, 0]
         deletions = [0, 0]
@@ -469,9 +482,16 @@ class InodeMap:
             print "%s: %s %s" % (s, u, float(u[1]) / u[0])
             if u[1] == 0:
                 print "Would delete..."
-                backend.delete(s)
-                deletions[0] += 1
-                deletions[1] += u[0]
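+                # Deletion guard: segment n in log directory d is safe to
+                # remove only if the proxy's checkpoint already covers it
+                # (n < version_vector[d]); anything newer may not have
+                # been merged by the proxy yet.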
+                (d, n) = backend.name_to_loc(s)
+                try:
+                    if n < self.version_vector[d]:
+                        backend.delete(s)
+                        deletions[0] += 1
+                        deletions[1] += u[0]
+                    else:
+                        print "Not deleting log file newer than checkpoint!"
+                except Exception:
+                    print "Error determining age of log segment, keeping"
 
         self.inodes = inodes
         self.util = util
@@ -479,6 +499,9 @@ class InodeMap:
 
         print "%d bytes total / %d bytes used" % tuple(total_data)
         print "would delete %d segments (%d bytes)" % tuple(deletions)
+        benchlog_write("bytes_used: %d", total_data[1])
+        benchlog_write("bytes_wasted: %d", total_data[0] - total_data[1])
+        benchlog_write("bytes_freed: %d", deletions[1])
 
     def mark_updated(self, inum):
         self.updated_inodes.add(inum)
@@ -538,12 +561,15 @@ class InodeMap:
 def rewrite_inode(backend, inode_map, inum, log, copy_data=True):
     inode = inode_map.inodes[inum]
     if copy_data:
-        blocks = []
+        newlinks = []
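+        # copy each linked data block into the cleaner's log; links with
+        # no stored location (None) are carried over unchanged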
         for l in inode.links:
-            data = load_item(backend, l[1])
-            blocks.append(data)
-            log.write(data, 0)
-        inode.links = [(b.id, b.location) for b in blocks]
+            if l[1] is not None:
+                data = load_item(backend, l[1])
+                log.write(data, 0)
+                newlinks.append((data.id, data.location))
+            else:
+                newlinks.append(l)
+        inode.links = newlinks
     log.write(inode, 1)
     inode_map.mark_updated(inum)
 
@@ -569,7 +595,7 @@ def run_cleaner(backend, inode_map, log, repack_inodes=False):
     dirty_inode_data = set()
     for s in inode_map.obsolete_segments:
         filename = backend.loc_to_name(s)
-        print "Scanning", filename, "for live data"
+        #print "Scanning", filename, "for live data"
         for item in parse_log(backend.read(filename), filename):
             if item.type in (ITEM_TYPE.DATA, ITEM_TYPE.INODE):
                 if item.inum != 0:
@@ -578,30 +604,40 @@ def run_cleaner(backend, inode_map, log, repack_inodes=False):
                         dirty_inodes.add(item.inum)
                     if item.inum not in dirty_inode_data:
                         for b in inode.links:
-                            if s == b[1][0:2]:
+                            if b[1] is not None and s == b[1][0:2]:
                                 dirty_inode_data.add(item.inum)
                                 break
 
-    print "Inodes to rewrite:", dirty_inodes
-    print "Inodes with data to rewrite:", dirty_inode_data
+    #print "Inodes to rewrite:", dirty_inodes
+    #print "Inodes with data to rewrite:", dirty_inode_data
     for i in sorted(dirty_inodes.union(dirty_inode_data)):
         rewrite_inode(backend, inode_map, i, log, i in dirty_inode_data)
 
 if __name__ == '__main__':
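+    # open in append mode so successive cleaner runs accumulate in the
+    # same benchmark log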
+    benchlog = open('cleaner.log', 'a')
+    benchlog_write("*** START CLEANER RUN ***")
     start_time = time.time()
     backend = S3Backend("mvrable-bluesky-west", cachedir="/tmp/bluesky-cache")
     #backend = FileBackend(".")
     chkpt = load_checkpoint_record(backend)
-    print backend.list()
+    #print backend.list()
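+    # the cleaner writes to log directory 1; its seq_num is compared
+    # against the checkpoint's version vector below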
+    log_dir = LogDirectory(backend, 1)
     imap = InodeMap()
     imap.build(backend, chkpt)
     print chkpt
 
-    log_dir = LogDirectory(backend, 1)
+    print "Version vector:", imap.version_vector
+    print "Last cleaner log file:", log_dir.seq_num - 1
+    if imap.version_vector.get(1, -1) != log_dir.seq_num - 1:
+        print "Proxy hasn't updated to latest cleaner segment yet!"
+        benchlog_write("waiting for proxy...")
+        sys.exit(0)
+
     run_cleaner(backend, imap, log_dir)
     print "Version vector:", imap.version_vector
     imap.write(backend, log_dir)
     log_dir.close_all()
     end_time = time.time()
-    print "Cleaner running time:", end_time - start_time
     backend.dump_stats()
+    benchlog_write("running_time: %s", end_time - start_time)
+    benchlog_write("")