Implement handling of unstable data in WRITE/COMMIT nfs procedures.
[bluesky.git] / cleaner / cleaner
index 1e564e8..009ebe8 100755 (executable)
@@ -42,9 +42,14 @@ class FileBackend:
         return [(f, os.stat(os.path.join(self.path, f)).st_size)
                 for f in files]
 
-    def read(self, filename):
+    def read(self, filename, offset=0, length=None):
         fp = open(os.path.join(self.path, filename), 'rb')
-        return fp.read()
+        if offset > 0:
+            fp.seek(offset)
+        if length is None:
+            return fp.read()
+        else:
+            return fp.read(length)
 
     def write(self, filename, data):
         fp = open(os.path.join(self.path, filename), 'wb')
@@ -77,10 +82,15 @@ class S3Backend:
             files.append((k.key, k.size))
         return files
 
-    def read(self, filename):
+    def read(self, filename, offset=0, length=None):
         if filename in self.cache:
             fp = open(os.path.join(self.cachedir, filename), 'rb')
-            return fp.read()
+            if offset > 0:
+                fp.seek(offset)
+            if length is None:
+                return fp.read()
+            else:
+                return fp.read(length)
         else:
             k = Key(self.bucket)
             k.key = self.path + filename
@@ -89,6 +99,10 @@ class S3Backend:
             fp.write(data)
             fp.close()
             self.cache[filename] = True
+            if offset > 0:
+                data = data[offset:]
+            if length is not None:
+                data = data[0:length]
             return data
 
     def write(self, filename, data):
@@ -256,7 +270,7 @@ def load_item(backend, location):
     The elements of the tuple are (directory, sequence, offset, size)."""
 
     filename = backend.loc_to_name((location[0], location[1]))
-    data = backend.read(filename)[location[2] : location[2] + location[3]]
+    data = backend.read(filename, location[2], location[3])
     item = parse_item(data)
     item.location = location
     return item
@@ -401,7 +415,7 @@ def rewrite_inode(backend, inode_map, inum, log, copy_data=True):
     log.write(inode, 1)
     inode_map.mark_updated(inum)
 
-def run_cleaner(backend, inode_map, log):
+def run_cleaner(backend, inode_map, log, repack_inodes=False):
     # Determine which segments are poorly utilized and should be cleaned.  We
     # need better heuristics here.
     for (s, u) in sorted(inode_map.util.segments.items()):
@@ -416,7 +430,10 @@ def run_cleaner(backend, inode_map, log):
     # Given that list of segments to clean, scan through those segments to find
     # data which is still live and mark relevant inodes as needing to be
     # rewritten.
-    dirty_inodes = set()
+    if repack_inodes:
+        dirty_inodes = set(inode_map.inodes)
+    else:
+        dirty_inodes = set()
     dirty_inode_data = set()
     for s in inode_map.obsolete_segments:
         filename = backend.loc_to_name(s)