Plot updates
[bluesky.git] / cleaner / cleaner
index b148b95..4cda7c7 100755 (executable)
@@ -27,7 +27,18 @@ class ITEM_TYPE:
     INODE_MAP = '3'
     CHECKPOINT = '4'
 
-class FileBackend:
+class Backend:
+    """Base class for BlueSky storage backends."""
+
+    def loc_to_name(self, location):
+        return "log-%08d-%08d" % (location)
+
+    def name_to_loc(self, name):
+        m = re.match(r"^log-(\d+)-(\d+)$", name)
+        if m: return (int(m.group(1)), int(m.group(2)))
+
+
+class FileBackend(Backend):
     """An interface to BlueSky where the log segments are on local disk.
 
     This is mainly intended for testing purposes, as the real cleaner would
@@ -63,13 +74,6 @@ class FileBackend:
     def delete(self, filename):
         os.unlink(os.path.join(self.path, filename))
 
-    def loc_to_name(self, location):
-        return "log-%08d-%08d" % (location)
-
-    def name_to_loc(self, name):
-        m = re.match(r"^log-(\d+)-(\d+)$", name)
-        if m: return (int(m.group(1)), int(m.group(2)))
-
 def retry_wrap(method):
     def wrapped(self, *args, **kwargs):
         for retries in range(3):
@@ -82,7 +86,7 @@ def retry_wrap(method):
         return method(self, *args, **kwargs)
     return wrapped
 
-class S3Backend:
+class S3Backend(Backend):
     """An interface to BlueSky where the log segments are on in Amazon S3."""
 
     def __init__(self, bucket, path='', cachedir="."):
@@ -143,12 +147,63 @@ class S3Backend:
         if filename in self.cache:
             del self.cache[filename]
 
-    def loc_to_name(self, location):
-        return "log-%08d-%08d" % (location)
+class SimpleBackend(Backend):
+    """An interface to the simple BlueSky test network server."""
 
-    def name_to_loc(self, name):
-        m = re.match(r"^log-(\d+)-(\d+)$", name)
-        if m: return (int(m.group(1)), int(m.group(2)))
+    def __init__(self, server=('localhost', 12345), cachedir="."):
+        self.bucket_name = bucket
+        self.server_address = server
+        self.cachedir = cachedir
+        self.cache = {}
+
+    def _get_socket(self):
+        return socket.create_connection(self.server_address).makefile()
+
+    def list(self, directory=0):
+        files = []
+        prefix = "log-%08d-" % (directory,)
+        for k in self.bucket.list(self.path + prefix):
+            files.append((k.key, k.size))
+        return files
+
+    def read(self, filename, offset=0, length=None):
+        if filename in self.cache:
+            fp = open(os.path.join(self.cachedir, filename), 'rb')
+            if offset > 0:
+                fp.seek(offset)
+            if length is None:
+                return fp.read()
+            else:
+                return fp.read(length)
+        else:
+            f = self._get_socket()
+            f.write("GET %s %d %d\n" % (filename, 0, 0))
+            f.flush()
+            datalen = int(f.readline())
+            if datalen < 0:
+                raise RuntimeError
+            data = f.read(datalen)
+            fp = open(os.path.join(self.cachedir, filename), 'wb')
+            fp.write(data)
+            fp.close()
+            self.cache[filename] = True
+            if offset > 0:
+                data = data[offset:]
+            if length is not None:
+                data = data[0:length]
+            return data
+
+    def write(self, filename, data):
+        f = self._get_socket()
+        f.write("PUT %s %d %d\n" % (filename, len(data)))
+        f.write(data)
+        f.flush()
+        result = int(f.readline())
+        if filename in self.cache:
+            del self.cache[filename]
+
+    def delete(self, filename):
+        pass
 
 class LogItem:
     """In-memory representation of a single item stored in a log file."""
@@ -419,7 +474,7 @@ class InodeMap:
         for d in sorted(self.version_vector):
             new_checkpoint.data += struct.pack('<II', d, self.version_vector[d])
 
-        data = self.checkpoint_record.data[self.vvsize:]
+        data = self.checkpoint_record.data[self.vvsize + len(CHECKPOINT_MAGIC):]
         for i in range(len(data) // 16):
             (start, end) = struct.unpack_from("<QQ", data, 16*i)
 
@@ -473,7 +528,7 @@ def run_cleaner(backend, inode_map, log, repack_inodes=False):
     # Determine which segments are poorly utilized and should be cleaned.  We
     # need better heuristics here.
     for (s, u) in sorted(inode_map.util.segments.items()):
-        if (float(u[1]) / u[0] < 0.6 or u[1] < 32768) and u[1] > 0:
+        if (float(u[1]) / u[0] < 0.6) and u[1] > 0:
             print "Should clean segment", s
             loc = backend.name_to_loc(s)
             if s: inode_map.obsolete_segments.add(loc)
@@ -510,8 +565,8 @@ def run_cleaner(backend, inode_map, log, repack_inodes=False):
         rewrite_inode(backend, inode_map, i, log, i in dirty_inode_data)
 
 if __name__ == '__main__':
-    #backend = S3Backend("mvrable-bluesky", cachedir=".")
-    backend = FileBackend(".")
+    backend = S3Backend("mvrable-bluesky", cachedir=".")
+    #backend = FileBackend(".")
     chkpt = load_checkpoint_record(backend)
     print backend.list()
     imap = InodeMap()