Add proper per-file copyright notices/licenses and top-level license.
[bluesky.git] / cleaner / cleaner
index 445bdeb..7b50c73 100755 (executable)
@@ -7,22 +7,73 @@
 #
 # Copyright (C) 2010  The Regents of the University of California
 # Written by Michael Vrable <mvrable@cs.ucsd.edu>
 #
 # Copyright (C) 2010  The Regents of the University of California
 # Written by Michael Vrable <mvrable@cs.ucsd.edu>
-
-import base64, os, re, struct, sys
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+# 1. Redistributions of source code must retain the above copyright
+#    notice, this list of conditions and the following disclaimer.
+# 2. Redistributions in binary form must reproduce the above copyright
+#    notice, this list of conditions and the following disclaimer in the
+#    documentation and/or other materials provided with the distribution.
+# 3. Neither the name of the University nor the names of its contributors
+#    may be used to endorse or promote products derived from this software
+#    without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
+# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
+# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+# SUCH DAMAGE.
+
+import base64, os, re, struct, sys, time
 import boto
 import boto
+from boto.s3.key import Key
 
 # The BlueSky 'struct cloudlog_header' data type.
 
 # The BlueSky 'struct cloudlog_header' data type.
-HEADER_FORMAT = '<4sb16sQIII'
-HEADER_MAGIC = 'AgI-'
+HEADER_FORMAT = '<4s48sb16sQIII'
+HEADER_CRYPTBYTES = 48
+HEADER_MAGIC1 = 'AgI-'          # Unencrypted data
+HEADER_MAGIC2 = 'AgI='          # Encrypted data
 HEADER_SIZE = struct.calcsize(HEADER_FORMAT)
 
 HEADER_SIZE = struct.calcsize(HEADER_FORMAT)
 
+CHECKPOINT_MAGIC = struct.pack('<Q', 0x7ad7dafb42a498b4)
+
+# Log file to write benchmark data to
+benchlog = None
+def benchlog_write(msg, *args):
+    m = msg % args
+    print "LOG:", m
+    if benchlog is not None:
+        benchlog.write(msg % args)
+        benchlog.write("\n")
+
 class ITEM_TYPE:
     DATA = '1'
     INODE = '2'
     INODE_MAP = '3'
     CHECKPOINT = '4'
 
 class ITEM_TYPE:
     DATA = '1'
     INODE = '2'
     INODE_MAP = '3'
     CHECKPOINT = '4'
 
-class FileBackend:
+class Backend:
+    """Base class for BlueSky storage backends."""
+
+    def loc_to_name(self, location):
+        return "log-%08d-%08d" % (location)
+
+    def name_to_loc(self, name):
+        m = re.match(r"^log-(\d+)-(\d+)$", name)
+        if m: return (int(m.group(1)), int(m.group(2)))
+
+    def dump_stats(self):
+        pass
+
+class FileBackend(Backend):
     """An interface to BlueSky where the log segments are on local disk.
 
     This is mainly intended for testing purposes, as the real cleaner would
     """An interface to BlueSky where the log segments are on local disk.
 
     This is mainly intended for testing purposes, as the real cleaner would
@@ -31,36 +82,190 @@ class FileBackend:
     def __init__(self, path):
         self.path = path
 
     def __init__(self, path):
         self.path = path
 
-    def list(self):
+    def list(self, directory=0):
         """Return a listing of all log segments and their sizes."""
 
         """Return a listing of all log segments and their sizes."""
 
-        files = [f for f in os.listdir(self.path) if f.startswith('log-')]
+        prefix = "log-%08d-" % (directory,)
+        files = [f for f in os.listdir(self.path) if f.startswith(prefix)]
         files.sort()
 
         return [(f, os.stat(os.path.join(self.path, f)).st_size)
                 for f in files]
 
         files.sort()
 
         return [(f, os.stat(os.path.join(self.path, f)).st_size)
                 for f in files]
 
-    def read(self, filename):
+    def read(self, filename, offset=0, length=None):
         fp = open(os.path.join(self.path, filename), 'rb')
         fp = open(os.path.join(self.path, filename), 'rb')
-        return fp.read()
+        if offset > 0:
+            fp.seek(offset)
+        if length is None:
+            return fp.read()
+        else:
+            return fp.read(length)
 
     def write(self, filename, data):
         fp = open(os.path.join(self.path, filename), 'wb')
         fp.write(data)
         fp.close()
 
 
     def write(self, filename, data):
         fp = open(os.path.join(self.path, filename), 'wb')
         fp.write(data)
         fp.close()
 
-    def loc_to_name(self, location):
-        return "log-%08d-%08d" % (location)
+    def delete(self, filename):
+        os.unlink(os.path.join(self.path, filename))
+
+def retry_wrap(method):
+    def wrapped(self, *args, **kwargs):
+        for retries in range(3):
+            try:
+                return method(self, *args, **kwargs)
+            except:
+                print >>sys.stderr, "S3 operation failed, retrying..."
+                print >>sys.stderr, "  %s %s %s" % (method, args, kwargs)
+                self.connect()
+                time.sleep(1.0)
+        return method(self, *args, **kwargs)
+    return wrapped
+
+class S3Backend(Backend):
+    """An interface to BlueSky where the log segments are on in Amazon S3."""
+
+    def __init__(self, bucket, path='', cachedir="."):
+        self.bucket_name = bucket
+        self.path = path
+        self.cachedir = cachedir
+        self.cache = {}
+        for f in os.listdir(cachedir):
+            self.cache[f] = True
+        #print "Initial cache contents:", list(self.cache.keys())
+        self.connect()
+        self.stats_get = [0, 0]
+        self.stats_put = [0, 0]
+
+    def connect(self):
+        self.conn = boto.connect_s3(is_secure=False)
+        self.bucket = self.conn.get_bucket(self.bucket_name)
+
+    def list(self, directory=0):
+        files = []
+        prefix = "log-%08d-" % (directory,)
+        for k in self.bucket.list(self.path + prefix):
+            files.append((k.key, k.size))
+        return files
+
+    @retry_wrap
+    def read(self, filename, offset=0, length=None):
+        if filename in self.cache:
+            fp = open(os.path.join(self.cachedir, filename), 'rb')
+            if offset > 0:
+                fp.seek(offset)
+            if length is None:
+                return fp.read()
+            else:
+                return fp.read(length)
+        else:
+            k = Key(self.bucket)
+            k.key = self.path + filename
+            data = k.get_contents_as_string()
+            fp = open(os.path.join(self.cachedir, filename), 'wb')
+            fp.write(data)
+            fp.close()
+            self.cache[filename] = True
+            self.stats_get[0] += 1
+            self.stats_get[1] += len(data)
+            if offset > 0:
+                data = data[offset:]
+            if length is not None:
+                data = data[0:length]
+            return data
+
+    @retry_wrap
+    def write(self, filename, data):
+        k = Key(self.bucket)
+        k.key = self.path + filename
+        k.set_contents_from_string(data)
+        self.stats_put[0] += 1
+        self.stats_put[1] += len(data)
+        if filename in self.cache:
+            del self.cache[filename]
+
+    @retry_wrap
+    def delete(self, filename):
+        k = Key(self.bucket)
+        k.key = self.path + filename
+        k.delete()
+        if filename in self.cache:
+            del self.cache[filename]
+
+    def dump_stats(self):
+        print "S3 statistics:"
+        print "GET: %d ops / %d bytes" % tuple(self.stats_get)
+        print "PUT: %d ops / %d bytes" % tuple(self.stats_put)
+        benchlog_write("s3_get: %d", self.stats_get[1])
+        benchlog_write("s3_put: %d", self.stats_put[1])
+
+class SimpleBackend(Backend):
+    """An interface to the simple BlueSky test network server."""
+
+    def __init__(self, server=('localhost', 12345), cachedir="."):
+        self.bucket_name = bucket
+        self.server_address = server
+        self.cachedir = cachedir
+        self.cache = {}
+
+    def _get_socket(self):
+        return socket.create_connection(self.server_address).makefile()
+
+    def list(self, directory=0):
+        files = []
+        prefix = "log-%08d-" % (directory,)
+        for k in self.bucket.list(self.path + prefix):
+            files.append((k.key, k.size))
+        return files
+
+    def read(self, filename, offset=0, length=None):
+        if filename in self.cache:
+            fp = open(os.path.join(self.cachedir, filename), 'rb')
+            if offset > 0:
+                fp.seek(offset)
+            if length is None:
+                return fp.read()
+            else:
+                return fp.read(length)
+        else:
+            f = self._get_socket()
+            f.write("GET %s %d %d\n" % (filename, 0, 0))
+            f.flush()
+            datalen = int(f.readline())
+            if datalen < 0:
+                raise RuntimeError
+            data = f.read(datalen)
+            fp = open(os.path.join(self.cachedir, filename), 'wb')
+            fp.write(data)
+            fp.close()
+            self.cache[filename] = True
+            if offset > 0:
+                data = data[offset:]
+            if length is not None:
+                data = data[0:length]
+            return data
 
 
-    def name_to_loc(self, name):
-        m = re.match(r"^log-(\d+)-(\d+)$", name)
-        if m: return (int(m.group(1)), int(m.group(2)))
+    def write(self, filename, data):
+        f = self._get_socket()
+        f.write("PUT %s %d %d\n" % (filename, len(data)))
+        f.write(data)
+        f.flush()
+        result = int(f.readline())
+        if filename in self.cache:
+            del self.cache[filename]
+
+    def delete(self, filename):
+        pass
 
 class LogItem:
     """In-memory representation of a single item stored in a log file."""
 
 
 class LogItem:
     """In-memory representation of a single item stored in a log file."""
 
+    def __init__(self):
+        self.cryptkeys = '\0' * HEADER_CRYPTBYTES
+        self.encrypted = False
+
     def __str__(self):
     def __str__(self):
-        return "<Item ty=%s location=%s size=%d id=%s...>" % (self.type, self.location, self.size, base64.b16encode(self.id).lower()[0:8])
+        return "<Item%s ty=%s location=%s size=%d id=%s...>" % (self.encrypted and '$' or '', self.type, self.location, self.size, base64.b16encode(self.id).lower()[0:8])
 
     @staticmethod
     def random_id():
 
     @staticmethod
     def random_id():
@@ -76,8 +281,13 @@ class LogItem:
         link_ids = ''.join(link_ids)
         link_locs = ''.join(link_locs)
 
         link_ids = ''.join(link_ids)
         link_locs = ''.join(link_locs)
 
+        if self.encrypted:
+            magic = HEADER_MAGIC2
+        else:
+            magic = HEADER_MAGIC1
         header = struct.pack(HEADER_FORMAT,
         header = struct.pack(HEADER_FORMAT,
-                             HEADER_MAGIC, ord(self.type), self.id, self.inum,
+                             magic, self.cryptkeys,
+                             ord(self.type), self.id, self.inum,
                              len(self.data), len(link_ids), len(link_locs))
         return header + self.data + link_ids + link_locs
 
                              len(self.data), len(link_ids), len(link_locs))
         return header + self.data + link_ids + link_locs
 
@@ -109,7 +319,8 @@ class LogDirectory:
         self.backend = backend
         self.dir_num = dir
         self.seq_num = 0
         self.backend = backend
         self.dir_num = dir
         self.seq_num = 0
-        for logname in backend.list():
+        for logname in backend.list(dir):
+            #print "Old log file:", logname
             loc = backend.name_to_loc(logname[0])
             if loc is not None and loc[0] == dir:
                 self.seq_num = max(self.seq_num, loc[1] + 1)
             loc = backend.name_to_loc(logname[0])
             if loc is not None and loc[0] == dir:
                 self.seq_num = max(self.seq_num, loc[1] + 1)
@@ -142,7 +353,7 @@ class UtilizationTracker:
 
     def __init__(self, backend):
         self.segments = {}
 
     def __init__(self, backend):
         self.segments = {}
-        for (segment, size) in backend.list():
+        for (segment, size) in backend.list(0) + backend.list(1):
             self.segments[segment] = [size, 0]
 
     def add_item(self, item):
             self.segments[segment] = [size, 0]
 
     def add_item(self, item):
@@ -156,28 +367,30 @@ class UtilizationTracker:
 def parse_item(data):
     if len(data) < HEADER_SIZE: return
     header = struct.unpack_from(HEADER_FORMAT, data, 0)
 def parse_item(data):
     if len(data) < HEADER_SIZE: return
     header = struct.unpack_from(HEADER_FORMAT, data, 0)
-    size = HEADER_SIZE + sum(header[4:7])
+    size = HEADER_SIZE + sum(header[5:8])
 
 
-    if header[0] != HEADER_MAGIC:
+    if header[0] not in (HEADER_MAGIC1, HEADER_MAGIC2):
         print "Bad header magic!"
         return
 
     if len(data) != size:
         print "Bad header magic!"
         return
 
     if len(data) != size:
-        print "Item size does not match!"
+        print "Item size does not match: %d != %d" % (size, len(data))
         return
 
     item = LogItem()
         return
 
     item = LogItem()
-    item.id = header[2]
-    item.inum = header[3]
+    if header[0] == HEADER_MAGIC2: item.encrypted = True
+    item.cryptkeys = header[1]
+    item.id = header[3]
+    item.inum = header[4]
     item.location = None
     item.location = None
-    item.type = chr(header[1])
+    item.type = chr(header[2])
     item.size = size
     item.size = size
-    item.data = data[HEADER_SIZE : HEADER_SIZE + header[4]]
+    item.data = data[HEADER_SIZE : HEADER_SIZE + header[5]]
     links = []
     links = []
-    link_ids = data[HEADER_SIZE + header[4]
-                    : HEADER_SIZE + header[4] + header[5]]
-    link_locs = data[HEADER_SIZE + header[4] + header[5]
-                     : HEADER_SIZE + sum(header[4:7])]
+    link_ids = data[HEADER_SIZE + header[5]
+                    : HEADER_SIZE + header[5] + header[6]]
+    link_locs = data[HEADER_SIZE + header[5] + header[6]
+                     : HEADER_SIZE + sum(header[5:8])]
     for i in range(len(link_ids) // 16):
         id = link_ids[16*i : 16*i + 16]
         if id == '\0' * 16:
     for i in range(len(link_ids) // 16):
         id = link_ids[16*i : 16*i + 16]
         if id == '\0' * 16:
@@ -195,7 +408,7 @@ def load_item(backend, location):
     The elements of the tuple are (directory, sequence, offset, size)."""
 
     filename = backend.loc_to_name((location[0], location[1]))
     The elements of the tuple are (directory, sequence, offset, size)."""
 
     filename = backend.loc_to_name((location[0], location[1]))
-    data = backend.read(filename)[location[2] : location[2] + location[3]]
+    data = backend.read(filename, location[2], location[3])
     item = parse_item(data)
     item.location = location
     return item
     item = parse_item(data)
     item.location = location
     return item
@@ -213,8 +426,8 @@ def parse_log(data, location=None):
     offset = 0
     while len(data) - offset >= HEADER_SIZE:
         header = struct.unpack_from(HEADER_FORMAT, data, offset)
     offset = 0
     while len(data) - offset >= HEADER_SIZE:
         header = struct.unpack_from(HEADER_FORMAT, data, offset)
-        size = HEADER_SIZE + sum(header[4:7])
-        if header[0] != HEADER_MAGIC:
+        size = HEADER_SIZE + sum(header[5:8])
+        if header[0] not in (HEADER_MAGIC1, HEADER_MAGIC2):
             print "Bad header magic!"
             break
         if size + offset > len(data):
             print "Bad header magic!"
             break
         if size + offset > len(data):
@@ -226,9 +439,10 @@ def parse_log(data, location=None):
         if item is not None: yield item
         offset += size
 
         if item is not None: yield item
         offset += size
 
-def load_checkpoint_record(backend):
-    for (log, size) in reversed(backend.list()):
+def load_checkpoint_record(backend, directory=0):
+    for (log, size) in reversed(backend.list(directory)):
         for item in reversed(list(parse_log(backend.read(log), log))):
         for item in reversed(list(parse_log(backend.read(log), log))):
+            print item
             if item.type == ITEM_TYPE.CHECKPOINT:
                 return item
 
             if item.type == ITEM_TYPE.CHECKPOINT:
                 return item
 
@@ -241,6 +455,7 @@ class InodeMap:
 
         This will also build up information about segment utilization."""
 
 
         This will also build up information about segment utilization."""
 
+        self.version_vector = {}
         self.checkpoint_record = checkpoint_record
 
         util = UtilizationTracker(backend)
         self.checkpoint_record = checkpoint_record
 
         util = UtilizationTracker(backend)
@@ -248,12 +463,28 @@ class InodeMap:
         inodes = {}
         self.obsolete_segments = set()
 
         inodes = {}
         self.obsolete_segments = set()
 
-        print "Inode map:"
-        for i in range(len(checkpoint_record.data) // 16):
-            (start, end) = struct.unpack_from("<QQ", checkpoint_record.data, 16*i)
+        data = checkpoint_record.data
+        if not data.startswith(CHECKPOINT_MAGIC):
+            raise ValueError, "Invalid checkpoint record!"
+        data = data[len(CHECKPOINT_MAGIC):]
+        (vvlen,) = struct.unpack_from("<I", data, 0)
+        self.vvsize = 4 + 8*vvlen
+        for i in range(vvlen):
+            (v1, v2) = struct.unpack_from("<II", data, 4 + 8*i)
+            self.version_vector[v1] = v2
+        print self.version_vector
+        self.version_vector[checkpoint_record.location[0]] \
+            = checkpoint_record.location[1]
+        print self.version_vector
+
+        data = data[self.vvsize:]
+
+        #print "Inode map:"
+        for i in range(len(data) // 16):
+            (start, end) = struct.unpack_from("<QQ", data, 16*i)
             imap = load_item(backend, checkpoint_record.links[i][1])
             util.add_item(imap)
             imap = load_item(backend, checkpoint_record.links[i][1])
             util.add_item(imap)
-            print "[%d, %d]: %s" % (start, end, imap)
+            #print "[%d, %d]: %s" % (start, end, imap)
             for j in range(len(imap.data) // 8):
                 (inum,) = struct.unpack_from("<Q", imap.data, 8*j)
                 inode = load_item(backend, imap.links[j][1])
             for j in range(len(imap.data) // 8):
                 (inum,) = struct.unpack_from("<Q", imap.data, 8*j)
                 inode = load_item(backend, imap.links[j][1])
@@ -262,19 +493,40 @@ class InodeMap:
                 util.add_item(inode)
                 for i in inode.links:
                     util.add_item(i[1])
                 util.add_item(inode)
                 for i in inode.links:
                     util.add_item(i[1])
-                    data_segments.add(i[1][0:2])
-                print "  %d: %s (%d data segments)" % (inum, inode, len(data_segments))
+                    if i[1] is not None:
+                        data_segments.add(i[1][0:2])
+                #print "  %d: %s (%d data segments)" % (inum, inode, len(data_segments))
 
 
-        print
+        #print
         print "Segment utilizations:"
         print "Segment utilizations:"
+        total_data = [0, 0]
+        deletions = [0, 0]
         for (s, u) in sorted(util.segments.items()):
         for (s, u) in sorted(util.segments.items()):
-            #if u[1] > 0:
+            for i in range(2): total_data[i] += u[i]
             print "%s: %s %s" % (s, u, float(u[1]) / u[0])
             print "%s: %s %s" % (s, u, float(u[1]) / u[0])
+            if u[1] == 0:
+                print "Would delete..."
+                (d, n) = backend.name_to_loc(s)
+                try:
+                    if n < self.version_vector[d]:
+                        backend.delete(s)
+                        deletions[0] += 1
+                        deletions[1] += u[0]
+                    else:
+                        print "Not deleting log file newer than checkpoint!"
+                except:
+                    print "Error determining age of log segment, keeping"
 
         self.inodes = inodes
         self.util = util
         self.updated_inodes = set()
 
 
         self.inodes = inodes
         self.util = util
         self.updated_inodes = set()
 
+        print "%d bytes total / %d bytes used" % tuple(total_data)
+        print "would delete %d segments (%d bytes)" % tuple(deletions)
+        benchlog_write("bytes_used: %d", total_data[1])
+        benchlog_write("bytes_wasted: %d", total_data[0] - total_data[1])
+        benchlog_write("bytes_freed: %d", deletions[1])
+
     def mark_updated(self, inum):
         self.updated_inodes.add(inum)
 
     def mark_updated(self, inum):
         self.updated_inodes.add(inum)
 
@@ -285,13 +537,18 @@ class InodeMap:
         new_checkpoint.id = LogItem.random_id()
         new_checkpoint.inum = 0
         new_checkpoint.type = ITEM_TYPE.CHECKPOINT
         new_checkpoint.id = LogItem.random_id()
         new_checkpoint.inum = 0
         new_checkpoint.type = ITEM_TYPE.CHECKPOINT
-        new_checkpoint.data = ""
+        new_checkpoint.data = CHECKPOINT_MAGIC
         new_checkpoint.links = []
 
         new_checkpoint.links = []
 
-        for i in range(len(self.checkpoint_record.data) // 16):
-            (start, end) = struct.unpack_from("<QQ", self.checkpoint_record.data, 16*i)
+        new_checkpoint.data += struct.pack('<I', len(self.version_vector))
+        for d in sorted(self.version_vector):
+            new_checkpoint.data += struct.pack('<II', d, self.version_vector[d])
 
 
-            new_checkpoint.data += self.checkpoint_record.data[16*i : 16*i + 16]
+        data = self.checkpoint_record.data[self.vvsize + len(CHECKPOINT_MAGIC):]
+        for i in range(len(data) // 16):
+            (start, end) = struct.unpack_from("<QQ", data, 16*i)
+
+            new_checkpoint.data += data[16*i : 16*i + 16]
 
             # Case 1: No inodes in this range of the old inode map have
             # changed.  Simply emit a new pointer to the same inode map block.
 
             # Case 1: No inodes in this range of the old inode map have
             # changed.  Simply emit a new pointer to the same inode map block.
@@ -328,20 +585,23 @@ class InodeMap:
 def rewrite_inode(backend, inode_map, inum, log, copy_data=True):
     inode = inode_map.inodes[inum]
     if copy_data:
 def rewrite_inode(backend, inode_map, inum, log, copy_data=True):
     inode = inode_map.inodes[inum]
     if copy_data:
-        blocks = []
+        newlinks = []
         for l in inode.links:
         for l in inode.links:
-            data = load_item(backend, l[1])
-            blocks.append(data)
-            log.write(data, 0)
-        inode.links = [(b.id, b.location) for b in blocks]
+            if l[1] is not None:
+                data = load_item(backend, l[1])
+                log.write(data, 0)
+                newlinks.append((data.id, data.location))
+            else:
+                newlinks.append(l)
+        inode.links = newlinks
     log.write(inode, 1)
     inode_map.mark_updated(inum)
 
     log.write(inode, 1)
     inode_map.mark_updated(inum)
 
-def run_cleaner(backend, inode_map, log):
+def run_cleaner(backend, inode_map, log, repack_inodes=False):
     # Determine which segments are poorly utilized and should be cleaned.  We
     # need better heuristics here.
     for (s, u) in sorted(inode_map.util.segments.items()):
     # Determine which segments are poorly utilized and should be cleaned.  We
     # need better heuristics here.
     for (s, u) in sorted(inode_map.util.segments.items()):
-        if float(u[1]) / u[0] < 0.95 and u[1] > 0:
+        if (float(u[1]) / u[0] < 0.6) and u[1] > 0:
             print "Should clean segment", s
             loc = backend.name_to_loc(s)
             if s: inode_map.obsolete_segments.add(loc)
             print "Should clean segment", s
             loc = backend.name_to_loc(s)
             if s: inode_map.obsolete_segments.add(loc)
@@ -352,11 +612,14 @@ def run_cleaner(backend, inode_map, log):
     # Given that list of segments to clean, scan through those segments to find
     # data which is still live and mark relevant inodes as needing to be
     # rewritten.
     # Given that list of segments to clean, scan through those segments to find
     # data which is still live and mark relevant inodes as needing to be
     # rewritten.
-    dirty_inodes = set()
+    if repack_inodes:
+        dirty_inodes = set(inode_map.inodes)
+    else:
+        dirty_inodes = set()
     dirty_inode_data = set()
     for s in inode_map.obsolete_segments:
         filename = backend.loc_to_name(s)
     dirty_inode_data = set()
     for s in inode_map.obsolete_segments:
         filename = backend.loc_to_name(s)
-        print "Scanning", filename, "for live data"
+        #print "Scanning", filename, "for live data"
         for item in parse_log(backend.read(filename), filename):
             if item.type in (ITEM_TYPE.DATA, ITEM_TYPE.INODE):
                 if item.inum != 0:
         for item in parse_log(backend.read(filename), filename):
             if item.type in (ITEM_TYPE.DATA, ITEM_TYPE.INODE):
                 if item.inum != 0:
@@ -365,23 +628,40 @@ def run_cleaner(backend, inode_map, log):
                         dirty_inodes.add(item.inum)
                     if item.inum not in dirty_inode_data:
                         for b in inode.links:
                         dirty_inodes.add(item.inum)
                     if item.inum not in dirty_inode_data:
                         for b in inode.links:
-                            if s == b[1][0:2]:
+                            if b[1] is not None and s == b[1][0:2]:
                                 dirty_inode_data.add(item.inum)
                                 break
 
                                 dirty_inode_data.add(item.inum)
                                 break
 
-    print "Inodes to rewrite:", dirty_inodes
-    print "Inodes with data to rewrite:", dirty_inode_data
+    #print "Inodes to rewrite:", dirty_inodes
+    #print "Inodes with data to rewrite:", dirty_inode_data
     for i in sorted(dirty_inodes.union(dirty_inode_data)):
         rewrite_inode(backend, inode_map, i, log, i in dirty_inode_data)
 
 if __name__ == '__main__':
     for i in sorted(dirty_inodes.union(dirty_inode_data)):
         rewrite_inode(backend, inode_map, i, log, i in dirty_inode_data)
 
 if __name__ == '__main__':
-    backend = FileBackend(".")
+    benchlog = open('cleaner.log', 'a')
+    benchlog_write("*** START CLEANER RUN ***")
+    start_time = time.time()
+    backend = S3Backend("mvrable-bluesky-west", cachedir="/tmp/bluesky-cache")
+    #backend = FileBackend(".")
     chkpt = load_checkpoint_record(backend)
     chkpt = load_checkpoint_record(backend)
+    #print backend.list()
+    log_dir = LogDirectory(backend, 1)
     imap = InodeMap()
     imap.build(backend, chkpt)
     print chkpt
 
     imap = InodeMap()
     imap.build(backend, chkpt)
     print chkpt
 
-    log_dir = LogDirectory(backend, 1)
+    print "Version vector:", imap.version_vector
+    print "Last cleaner log file:", log_dir.seq_num - 1
+    if imap.version_vector.get(1, -1) != log_dir.seq_num - 1:
+        print "Proxy hasn't updated to latest cleaner segment yet!"
+        benchlog_write("waiting for proxy...")
+        sys.exit(0)
+
     run_cleaner(backend, imap, log_dir)
     run_cleaner(backend, imap, log_dir)
+    print "Version vector:", imap.version_vector
     imap.write(backend, log_dir)
     log_dir.close_all()
     imap.write(backend, log_dir)
     log_dir.close_all()
+    end_time = time.time()
+    backend.dump_stats()
+    benchlog_write("running_time: %s", end_time - start_time)
+    benchlog_write("")