Dump some statistics when the cleaner runs
diff --git a/cleaner/cleaner b/cleaner/cleaner
index 7ebcc2b..93feef4 100755
--- a/cleaner/cleaner
+++ b/cleaner/cleaner
@@ -8,21 +8,39 @@
 # Copyright (C) 2010  The Regents of the University of California
 # Written by Michael Vrable <mvrable@cs.ucsd.edu>
 
-import base64, os, struct, sys
+import base64, os, re, socket, struct, sys, time
 import boto
+from boto.s3.key import Key
 
 # The BlueSky 'struct cloudlog_header' data type.
-HEADER_FORMAT = '<4sb16sQIII'
-HEADER_MAGIC = 'AgI-'
+HEADER_FORMAT = '<4s48sb16sQIII'
+HEADER_CRYPTBYTES = 48
+HEADER_MAGIC1 = 'AgI-'          # Unencrypted data
+HEADER_MAGIC2 = 'AgI='          # Encrypted data
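+# Header fields, in struct order: 4-byte magic, 48 bytes of encryption key
+# material, item type, 16-byte item ID, inode number, and the byte counts
+# of the data, link-ID, and link-location regions.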
 HEADER_SIZE = struct.calcsize(HEADER_FORMAT)
 
+CHECKPOINT_MAGIC = struct.pack('<Q', 0x7ad7dafb42a498b4)
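+# Every checkpoint record's data payload begins with this magic value.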
+
 class ITEM_TYPE:
     DATA = '1'
     INODE = '2'
     INODE_MAP = '3'
     CHECKPOINT = '4'
 
-class FileBackend:
+class Backend:
+    """Base class for BlueSky storage backends."""
+
+    def loc_to_name(self, location):
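+        # A location is a (directory, sequence) pair; for example,
+        # (0, 3) names the segment "log-00000000-00000003".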
+        return "log-%08d-%08d" % (location)
+
+    def name_to_loc(self, name):
+        m = re.match(r"^log-(\d+)-(\d+)$", name)
+        if m: return (int(m.group(1)), int(m.group(2)))
+
+    def dump_stats(self):
+        pass
+
+class FileBackend(Backend):
     """An interface to BlueSky where the log segments are on local disk.
 
     This is mainly intended for testing purposes, as the real cleaner would
     """An interface to BlueSky where the log segments are on local disk.
 
     This is mainly intended for testing purposes, as the real cleaner would
@@ -31,24 +49,263 @@ class FileBackend:
     def __init__(self, path):
         self.path = path
 
-    def list(self):
+    def list(self, directory=0):
         """Return a listing of all log segments and their sizes."""
 
         """Return a listing of all log segments and their sizes."""
 
-        files = [f for f in os.listdir(self.path) if f.startswith('log-')]
+        prefix = "log-%08d-" % (directory,)
+        files = [f for f in os.listdir(self.path) if f.startswith(prefix)]
         files.sort()
 
         return [(f, os.stat(os.path.join(self.path, f)).st_size)
                 for f in files]
 
-    def read(self, filename):
+    def read(self, filename, offset=0, length=None):
         fp = open(os.path.join(self.path, filename), 'rb')
-        return fp.read()
+        if offset > 0:
+            fp.seek(offset)
+        if length is None:
+            return fp.read()
+        else:
+            return fp.read(length)
+
+    def write(self, filename, data):
+        fp = open(os.path.join(self.path, filename), 'wb')
+        fp.write(data)
+        fp.close()
+
+    def delete(self, filename):
+        os.unlink(os.path.join(self.path, filename))
+
+def retry_wrap(method):
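+    # Decorator for S3 operations: retry a few times on failure,
+    # reconnecting and pausing briefly between attempts, then make one
+    # final attempt and let any exception propagate.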
+    def wrapped(self, *args, **kwargs):
+        for retries in range(3):
+            try:
+                return method(self, *args, **kwargs)
+            except:
+                print >>sys.stderr, "S3 operation failed, retrying..."
+                self.connect()
+                time.sleep(1.0)
+        return method(self, *args, **kwargs)
+    return wrapped
+
+class S3Backend(Backend):
+    """An interface to BlueSky where the log segments are on in Amazon S3."""
+
+    def __init__(self, bucket, path='', cachedir="."):
+        self.bucket_name = bucket
+        self.path = path
+        self.cachedir = cachedir
+        self.cache = {}
+        self.connect()
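+        # Statistics counters: [operation count, total bytes transferred].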
+        self.stats_get = [0, 0]
+        self.stats_put = [0, 0]
+
+    def connect(self):
+        self.conn = boto.connect_s3(is_secure=False)
+        self.bucket = self.conn.get_bucket(self.bucket_name)
+
+    def list(self, directory=0):
+        files = []
+        prefix = "log-%08d-" % (directory,)
+        for k in self.bucket.list(self.path + prefix):
+            files.append((k.key, k.size))
+        return files
+
+    @retry_wrap
+    def read(self, filename, offset=0, length=None):
+        if filename in self.cache:
+            fp = open(os.path.join(self.cachedir, filename), 'rb')
+            if offset > 0:
+                fp.seek(offset)
+            if length is None:
+                return fp.read()
+            else:
+                return fp.read(length)
+        else:
+            k = Key(self.bucket)
+            k.key = self.path + filename
+            data = k.get_contents_as_string()
+            fp = open(os.path.join(self.cachedir, filename), 'wb')
+            fp.write(data)
+            fp.close()
+            self.cache[filename] = True
+            self.stats_get[0] += 1
+            self.stats_get[1] += len(data)
+            if offset > 0:
+                data = data[offset:]
+            if length is not None:
+                data = data[0:length]
+            return data
+
+    @retry_wrap
+    def write(self, filename, data):
+        k = Key(self.bucket)
+        k.key = self.path + filename
+        k.set_contents_from_string(data)
+        self.stats_put[0] += 1
+        self.stats_put[1] += len(data)
+        if filename in self.cache:
+            del self.cache[filename]
+
+    @retry_wrap
+    def delete(self, filename):
+        k = Key(self.bucket)
+        k.key = self.path + filename
+        k.delete()
+        if filename in self.cache:
+            del self.cache[filename]
+
+    def dump_stats(self):
+        print "S3 statistics:"
+        print "GET: %d ops / %d bytes" % tuple(self.stats_get)
+        print "PUT: %d ops / %d bytes" % tuple(self.stats_put)
+
+class SimpleBackend(Backend):
+    """An interface to the simple BlueSky test network server."""
+
+    def __init__(self, server=('localhost', 12345), cachedir="."):
+        self.server_address = server
+        self.cachedir = cachedir
+        self.cache = {}
+
+    def _get_socket(self):
+        return socket.create_connection(self.server_address).makefile()
+
+    def list(self, directory=0):
+        # Listing is not supported by this backend; it can only read and
+        # write segments by name.
+        raise NotImplementedError
+
+    def read(self, filename, offset=0, length=None):
+        if filename in self.cache:
+            fp = open(os.path.join(self.cachedir, filename), 'rb')
+            if offset > 0:
+                fp.seek(offset)
+            if length is None:
+                return fp.read()
+            else:
+                return fp.read(length)
+        else:
+            f = self._get_socket()
+            f.write("GET %s %d %d\n" % (filename, 0, 0))
+            f.flush()
+            datalen = int(f.readline())
+            if datalen < 0:
+                raise RuntimeError
+            data = f.read(datalen)
+            fp = open(os.path.join(self.cachedir, filename), 'wb')
+            fp.write(data)
+            fp.close()
+            self.cache[filename] = True
+            if offset > 0:
+                data = data[offset:]
+            if length is not None:
+                data = data[0:length]
+            return data
+
+    def write(self, filename, data):
+        f = self._get_socket()
+        f.write("PUT %s %d %d\n" % (filename, len(data)))
+        f.write(data)
+        f.flush()
+        result = int(f.readline())
+        if filename in self.cache:
+            del self.cache[filename]
+
+    def delete(self, filename):
+        pass
 
 class LogItem:
     """In-memory representation of a single item stored in a log file."""
 
+    def __init__(self):
+        self.cryptkeys = '\0' * HEADER_CRYPTBYTES
+        self.encrypted = False
+
     def __str__(self):
-        return "<Item ty=%s location=%s size=%d id=%s...>" % (self.type, self.location, self.size, base64.b16encode(self.id).lower()[0:8])
+        return "<Item%s ty=%s location=%s size=%d id=%s...>" % (self.encrypted and '$' or '', self.type, self.location, self.size, base64.b16encode(self.id).lower()[0:8])
+
+    @staticmethod
+    def random_id():
+        return open('/dev/urandom').read(16)
+
+    def serialize(self):
+        link_ids = []
+        link_locs = []
+        for (i, l) in self.links:
+            link_ids.append(i)
+            if i != '\0' * 16:
+                link_locs.append(struct.pack('<IIII', *l))
+        link_ids = ''.join(link_ids)
+        link_locs = ''.join(link_locs)
+
+        if self.encrypted:
+            magic = HEADER_MAGIC2
+        else:
+            magic = HEADER_MAGIC1
+        header = struct.pack(HEADER_FORMAT,
+                             magic, self.cryptkeys,
+                             ord(self.type), self.id, self.inum,
+                             len(self.data), len(link_ids), len(link_locs))
+        return header + self.data + link_ids + link_locs
+
+class LogSegment:
+    def __init__(self, backend, location):
+        self.backend = backend
+        self.location = location
+        self.data = []
+
+    def __len__(self):
+        return sum(len(s) for s in self.data)
+
+    def write(self, item):
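+        # Buffer the serialized item and record its full location as a
+        # (directory, sequence, offset, size) tuple.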
+        data = item.serialize()
+        offset = len(self)
+        self.data.append(data)
+        item.location = self.location + (offset, len(data))
+
+    def close(self):
+        data = ''.join(self.data)
+        filename = self.backend.loc_to_name(self.location)
+        print "Would write %d bytes of data to %s" % (len(data), filename)
+        self.backend.write(filename, data)
+
+class LogDirectory:
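+    # Close out a segment once it grows to roughly this size (4 MiB).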
+    TARGET_SIZE = 4 << 20
+
+    def __init__(self, backend, dir):
+        self.backend = backend
+        self.dir_num = dir
+        self.seq_num = 0
+        for logname in backend.list(dir):
+            print "Old log file:", logname
+            loc = backend.name_to_loc(logname[0])
+            if loc is not None and loc[0] == dir:
+                self.seq_num = max(self.seq_num, loc[1] + 1)
+        self.groups = {}
+        print "Starting sequence number is", self.seq_num
+
+    def open_segment(self):
+        seg = LogSegment(self.backend, (self.dir_num, self.seq_num))
+        self.seq_num += 1
+        return seg
+
+    def write(self, item, segment_group=0):
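+        # Callers use separate segment groups (0: data blocks, 1: inodes,
+        # 2: inode maps and checkpoints) so that each type of item
+        # accumulates in its own segment.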
+        if segment_group not in self.groups:
+            self.groups[segment_group] = self.open_segment()
+        seg = self.groups[segment_group]
+        seg.write(item)
+        if len(seg) >= LogDirectory.TARGET_SIZE:
+            seg.close()
+            del self.groups[segment_group]
+
+    def close_all(self):
+        for k in list(self.groups.keys()):
+            self.groups[k].close()
+            del self.groups[k]
 
 class UtilizationTracker:
     """A simple object that tracks what fraction of each segment is used.
 
@@ -57,7 +314,7 @@ class UtilizationTracker:
 
     def __init__(self, backend):
         self.segments = {}
-        for (segment, size) in backend.list():
+        for (segment, size) in backend.list(0) + backend.list(1):
             self.segments[segment] = [size, 0]
 
     def add_item(self, item):
@@ -71,27 +328,30 @@ class UtilizationTracker:
 def parse_item(data):
     if len(data) < HEADER_SIZE: return
     header = struct.unpack_from(HEADER_FORMAT, data, 0)
-    size = HEADER_SIZE + sum(header[4:7])
+    size = HEADER_SIZE + sum(header[5:8])
 
-    if header[0] != HEADER_MAGIC:
+    if header[0] not in (HEADER_MAGIC1, HEADER_MAGIC2):
         print "Bad header magic!"
         return
 
     if len(data) != size:
         print "Bad header magic!"
         return
 
     if len(data) != size:
-        print "Item size does not match!"
+        print "Item size does not match: %d != %d" % (size, len(data))
         return
 
     item = LogItem()
-    item.id = header[2]
+    if header[0] == HEADER_MAGIC2: item.encrypted = True
+    item.cryptkeys = header[1]
+    item.id = header[3]
+    item.inum = header[4]
     item.location = None
-    item.type = chr(header[1])
+    item.type = chr(header[2])
     item.size = size
-    item.data = data[HEADER_SIZE : HEADER_SIZE + header[4]]
+    item.data = data[HEADER_SIZE : HEADER_SIZE + header[5]]
     links = []
-    link_ids = data[HEADER_SIZE + header[4]
-                    : HEADER_SIZE + header[4] + header[5]]
-    link_locs = data[HEADER_SIZE + header[4] + header[5]
-                     : HEADER_SIZE + sum(header[4:7])]
+    link_ids = data[HEADER_SIZE + header[5]
+                    : HEADER_SIZE + header[5] + header[6]]
+    link_locs = data[HEADER_SIZE + header[5] + header[6]
+                     : HEADER_SIZE + sum(header[5:8])]
     for i in range(len(link_ids) // 16):
         id = link_ids[16*i : 16*i + 16]
         if id == '\0' * 16:
@@ -108,26 +368,27 @@ def load_item(backend, location):
 
     The elements of the tuple are (directory, sequence, offset, size)."""
 
-    filename = "log-%08d-%08d" % (location[0], location[1])
-    data = backend.read(filename)[location[2] : location[2] + location[3]]
+    filename = backend.loc_to_name((location[0], location[1]))
+    data = backend.read(filename, location[2], location[3])
     item = parse_item(data)
     item.location = location
     return item
 
-def parse_log(data, logname=None):
+def parse_log(data, location=None):
     """Parse contents of a log file, yielding a sequence of log items."""
 
     """Parse contents of a log file, yielding a sequence of log items."""
 
-    location = None
-    if logname is not None:
-        m = re.match(r"^log-(\d+)-(\d+)$", logname)
+    if isinstance(location, str):
+        m = re.match(r"^log-(\d+)-(\d+)$", location)
         if m:
             location = (int(m.group(1)), int(m.group(2)))
+        else:
+            location = None
 
     offset = 0
     while len(data) - offset >= HEADER_SIZE:
         header = struct.unpack_from(HEADER_FORMAT, data, offset)
-        size = HEADER_SIZE + sum(header[4:7])
-        if header[0] != HEADER_MAGIC:
+        size = HEADER_SIZE + sum(header[5:8])
+        if header[0] not in (HEADER_MAGIC1, HEADER_MAGIC2):
             print "Bad header magic!"
             break
         if size + offset > len(data):
             print "Bad header magic!"
             break
         if size + offset > len(data):
@@ -139,42 +400,205 @@ def parse_log(data, logname=None):
         if item is not None: yield item
         offset += size
 
-def load_checkpoint_record(backend):
-    for (log, size) in reversed(backend.list()):
-        for item in reversed(list(parse_log(backend.read(log)))):
+def load_checkpoint_record(backend, directory=0):
+    for (log, size) in reversed(backend.list(directory)):
+        for item in reversed(list(parse_log(backend.read(log), log))):
+            print item
             if item.type == ITEM_TYPE.CHECKPOINT:
                 return item
 
-def build_inode_map(backend, checkpoint_record):
-    """Reconstruct the inode map, starting from the checkpoint record given.
-
-    This will also build up information about segment utilization."""
-
-    util = UtilizationTracker(backend)
-    util.add_item(checkpoint_record)
-
-    print "Inode map:"
-    for i in range(len(checkpoint_record.data) // 16):
-        (start, end) = struct.unpack_from("<QQ", checkpoint_record.data, 16*i)
-        imap = load_item(backend, checkpoint_record.links[i][1])
-        util.add_item(imap)
-        print "[%d, %d]: %s" % (start, end, imap)
-        for j in range(len(imap.data) // 8):
-            (inum,) = struct.unpack_from("<Q", imap.data, 8*j)
-            inode = load_item(backend, imap.links[j][1])
-            data_segments = set()
-            util.add_item(inode)
-            for i in inode.links:
-                util.add_item(i[1])
-                data_segments.add(i[1][0:2])
-            print "  %d: %s (%d data segments)" % (inum, inode, len(data_segments))
-
-    print
-    print "Segment utilizations:"
-    for (s, u) in sorted(util.segments.items()):
-        print "%s: %s" % (s, u)
+class InodeMap:
+    def __init__(self):
+        pass
+
+    def build(self, backend, checkpoint_record):
+        """Reconstruct the inode map from the checkpoint record given.
+
+        This will also build up information about segment utilization."""
+
+        self.version_vector = {}
+        self.checkpoint_record = checkpoint_record
+
+        util = UtilizationTracker(backend)
+        util.add_item(checkpoint_record)
+        inodes = {}
+        self.obsolete_segments = set()
+
+        data = checkpoint_record.data
+        if not data.startswith(CHECKPOINT_MAGIC):
+            raise ValueError, "Invalid checkpoint record!"
+        data = data[len(CHECKPOINT_MAGIC):]
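+        # The checkpoint data begins with a version vector: an entry
+        # count followed by (directory, sequence) pairs recording the
+        # latest sequence number seen in each log directory.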
+        (vvlen,) = struct.unpack_from("<I", data, 0)
+        self.vvsize = 4 + 8*vvlen
+        for i in range(vvlen):
+            (v1, v2) = struct.unpack_from("<II", data, 4 + 8*i)
+            self.version_vector[v1] = v2
+        print self.version_vector
+        self.version_vector[checkpoint_record.location[0]] \
+            = checkpoint_record.location[1]
+        print self.version_vector
+
+        data = data[self.vvsize:]
+
+        print "Inode map:"
+        for i in range(len(data) // 16):
+            (start, end) = struct.unpack_from("<QQ", data, 16*i)
+            imap = load_item(backend, checkpoint_record.links[i][1])
+            util.add_item(imap)
+            print "[%d, %d]: %s" % (start, end, imap)
+            for j in range(len(imap.data) // 8):
+                (inum,) = struct.unpack_from("<Q", imap.data, 8*j)
+                inode = load_item(backend, imap.links[j][1])
+                inodes[inum] = inode
+                data_segments = set()
+                util.add_item(inode)
+                for i in inode.links:
+                    util.add_item(i[1])
+                    data_segments.add(i[1][0:2])
+                print "  %d: %s (%d data segments)" % (inum, inode, len(data_segments))
+
+        print
+        print "Segment utilizations:"
+        total_data = [0, 0]
+        deletions = [0, 0]
+        for (s, u) in sorted(util.segments.items()):
+            for i in range(2): total_data[i] += u[i]
+            print "%s: %s %s" % (s, u, float(u[1]) / u[0])
+            if u[1] == 0:
+                print "Would delete..."
+                #backend.delete(s)
+                deletions[0] += 1
+                deletions[1] += u[0]
+
+        self.inodes = inodes
+        self.util = util
+        self.updated_inodes = set()
+
+        print "%d bytes total / %d bytes used" % tuple(total_data)
+        print "would delete %d segments (%d bytes)" % tuple(deletions)
+
+    def mark_updated(self, inum):
+        self.updated_inodes.add(inum)
+
+    def write(self, backend, log):
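+        # Emit a new checkpoint record: the version vector, then one
+        # entry per inode-map range, reusing the old inode-map block
+        # where possible (case 1) and rewriting it otherwise (case 2).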
+        updated_inodes = sorted(self.updated_inodes, reverse=True)
+
+        new_checkpoint = LogItem()
+        new_checkpoint.id = LogItem.random_id()
+        new_checkpoint.inum = 0
+        new_checkpoint.type = ITEM_TYPE.CHECKPOINT
+        new_checkpoint.data = CHECKPOINT_MAGIC
+        new_checkpoint.links = []
+
+        new_checkpoint.data += struct.pack('<I', len(self.version_vector))
+        for d in sorted(self.version_vector):
+            new_checkpoint.data += struct.pack('<II', d, self.version_vector[d])
+
+        data = self.checkpoint_record.data[self.vvsize + len(CHECKPOINT_MAGIC):]
+        for i in range(len(data) // 16):
+            (start, end) = struct.unpack_from("<QQ", data, 16*i)
+
+            new_checkpoint.data += data[16*i : 16*i + 16]
+
+            # Case 1: No inodes in this range of the old inode map have
+            # changed.  Simply emit a new pointer to the same inode map block.
+            if len(updated_inodes) == 0 or updated_inodes[-1] > end:
+                old_location = self.checkpoint_record.links[i][1][0:2]
+                if old_location not in self.obsolete_segments:
+                    new_checkpoint.links.append(self.checkpoint_record.links[i])
+                    continue
+
+            # Case 2: Some inodes have been updated.  Create a new inode map
+            # block, write it out, and point the new checkpoint at it.
+            inodes = [k for k in self.inodes if k >= start and k <= end]
+            inodes.sort()
+
+            block = LogItem()
+            block.id = LogItem.random_id()
+            block.inum = 0
+            block.type = ITEM_TYPE.INODE_MAP
+            block.links = []
+            block.data = ""
+            for j in inodes:
+                block.data += struct.pack("<Q", j)
+                block.links.append((self.inodes[j].id, self.inodes[j].location))
+            log.write(block, 2)
+
+            new_checkpoint.links.append((block.id, block.location))
+
+            while len(updated_inodes) > 0 and updated_inodes[-1] <= end:
+                updated_inodes.pop()
+
+        log.write(new_checkpoint, 2)
+        self.checkpoint_record = new_checkpoint
+
+def rewrite_inode(backend, inode_map, inum, log, copy_data=True):
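+    # Copy an inode record (and, if copy_data is set, all of its data
+    # blocks) into the new log, then mark the inode updated so the next
+    # checkpoint picks up the new locations.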
+    inode = inode_map.inodes[inum]
+    if copy_data:
+        blocks = []
+        for l in inode.links:
+            data = load_item(backend, l[1])
+            blocks.append(data)
+            log.write(data, 0)
+        inode.links = [(b.id, b.location) for b in blocks]
+    log.write(inode, 1)
+    inode_map.mark_updated(inum)
+
+def run_cleaner(backend, inode_map, log, repack_inodes=False):
+    # Determine which segments are poorly utilized and should be cleaned.  We
+    # need better heuristics here.
+    for (s, u) in sorted(inode_map.util.segments.items()):
+        if (float(u[1]) / u[0] < 0.6) and u[1] > 0:
+            print "Should clean segment", s
+            loc = backend.name_to_loc(s)
+            if loc is not None: inode_map.obsolete_segments.add(loc)
+
+    # TODO: We probably also want heuristics that will find inodes with
+    # badly-fragmented data and rewrite that to achieve better locality.
+
+    # Given that list of segments to clean, scan through those segments to find
+    # data which is still live and mark relevant inodes as needing to be
+    # rewritten.
+    if repack_inodes:
+        dirty_inodes = set(inode_map.inodes)
+    else:
+        dirty_inodes = set()
+    dirty_inode_data = set()
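+    # dirty_inodes: inodes whose inode record lives in a segment being
+    # cleaned.  dirty_inode_data: inodes with file data in such a segment.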
+    for s in inode_map.obsolete_segments:
+        filename = backend.loc_to_name(s)
+        print "Scanning", filename, "for live data"
+        for item in parse_log(backend.read(filename), filename):
+            if item.type in (ITEM_TYPE.DATA, ITEM_TYPE.INODE):
+                if item.inum != 0:
+                    inode = inode_map.inodes[item.inum]
+                    if s == inode.location[0:2]:
+                        dirty_inodes.add(item.inum)
+                    if item.inum not in dirty_inode_data:
+                        for b in inode.links:
+                            if s == b[1][0:2]:
+                                dirty_inode_data.add(item.inum)
+                                break
+
+    print "Inodes to rewrite:", dirty_inodes
+    print "Inodes with data to rewrite:", dirty_inode_data
+    for i in sorted(dirty_inodes.union(dirty_inode_data)):
+        rewrite_inode(backend, inode_map, i, log, i in dirty_inode_data)
 
 if __name__ == '__main__':
-    backend = FileBackend(".")
+    start_time = time.time()
+    backend = S3Backend("mvrable-bluesky-west", cachedir="/export/cache")
+    #backend = FileBackend(".")
     chkpt = load_checkpoint_record(backend)
-    build_inode_map(backend, chkpt)
+    print backend.list()
+    imap = InodeMap()
+    imap.build(backend, chkpt)
+    print chkpt
+
+    log_dir = LogDirectory(backend, 1)
+    run_cleaner(backend, imap, log_dir)
+    print "Version vector:", imap.version_vector
+    imap.write(backend, log_dir)
+    log_dir.close_all()
+    end_time = time.time()
+    print "Cleaner running time:", end_time - start_time
+    backend.dump_stats()