Add code to rebuild_database.py to recompute segment metadata.
authorMichael Vrable <vrable@cs.hmc.edu>
Thu, 25 Apr 2013 03:52:15 +0000 (20:52 -0700)
committerMichael Vrable <vrable@cs.hmc.edu>
Sun, 26 Jan 2014 20:25:36 +0000 (12:25 -0800)
This requires that the segments be available.

python/cumulus/__init__.py
python/cumulus/rebuild_database.py

index 8d57c21..ef35325 100644 (file)
@@ -55,6 +55,7 @@ SEGMENT_FILTERS = [
     (".gpg", "cumulus-filter-gpg --decrypt"),
     (".gz", "gzip -dc"),
     (".bz2", "bzip2 -dc"),
+    ("", None),
 ]
 
 def uri_decode(s):
@@ -204,13 +205,16 @@ class SearchPath(object):
                 continue
         raise cumulus.store.NotFoundError(basename)
 
+    def match(self, filename):
+        return self._regex.match(filename)
+
     def list(self, backend):
         success = False
         for d in self.directories():
             try:
                 for f in backend.list(d):
                     success = True
-                    m = self._regex.match(f)
+                    m = self.match(f)
                     if m: yield (os.path.join(d, f), m)
             except cumulus.store.NotFoundError:
                 pass
@@ -229,7 +233,7 @@ SEARCH_PATHS = {
          SearchPathEntry("", ".sha1sums")]),
     "segments": SearchPath(
         (r"^([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})"
-         r"(\.\S+)?$"),
+         r"\.tar(\.\S+)?$"),
         itertools.chain(
             _build_segments_searchpath("segments0"),
             _build_segments_searchpath("segments1"),
@@ -345,12 +349,10 @@ class CumulusStore:
         snapshot_file = self.backend.open_snapshot(snapshot)[0]
         return snapshot_file.read().splitlines(True)
 
-    def get_segment(self, segment):
-        accessed_segments.add(segment)
-
-        (segment_fp, path, filter_cmd) = self.backend.open_segment(segment)
+    @staticmethod
+    def filter_data(filehandle, filter_cmd):
         if filter_cmd is None:
-            return segment_fp
+            return filehandle
         (input, output) = os.popen2(filter_cmd)
         def copy_thread(src, dst):
             BLOCK_SIZE = 4096
@@ -360,9 +362,15 @@ class CumulusStore:
                 dst.write(block)
             src.close()
             dst.close()
-        thread.start_new_thread(copy_thread, (segment_fp, input))
+        thread.start_new_thread(copy_thread, (filehandle, input))
         return output
 
+    def get_segment(self, segment):
+        accessed_segments.add(segment)
+
+        (segment_fp, path, filter_cmd) = self.backend.open_segment(segment)
+        return self.filter_data(segment_fp, filter_cmd)
+
     def load_segment(self, segment):
         seg = tarfile.open(segment, 'r|', self.get_segment(segment))
         for item in seg:
index 53481d5..a02bdd8 100755 (executable)
@@ -30,9 +30,11 @@ import base64
 import hashlib
 import itertools
 import os
+import re
 import struct
 import subprocess
 import sys
+import tarfile
 
 import cumulus
 
@@ -318,7 +320,76 @@ class DatabaseRebuilder(object):
                     (blockid, self.chunker.ALGORITHM_NAME, buffer(sigs)))
 
 
+class SegmentStateRebuilder(object):
+    """Reconstructs segment metadata files from raw segment data."""
+
+    def __init__(self):
+        self.filters = dict(cumulus.SEGMENT_FILTERS)
+        self.segment_pattern = cumulus.SEARCH_PATHS["segments"]
+
+    def compute_metadata(self, path, relative_path):
+        """Recompute metadata of a single segment.
+
+        Args:
+            path: Path to the segment file in the file system.
+            relative_path: Path relative to the root for the storage backend.
+        """
+        # Does the filename match that of a segment?  If so, extract the
+        # extension to determine the filter to apply to decompress.
+        filename = os.path.basename(relative_path)
+        m = self.segment_pattern.match(filename)
+        if not m: return
+        segment_name = m.group(1)
+        extension = m.group(2)
+        if extension not in self.filters: return
+        filter_cmd = self.filters[extension]
+
+        # Compute attributes of the compressed segment data.
+        BLOCK_SIZE = 4096
+        with open(path) as segment:
+            disk_size = 0
+            checksummer = cumulus.ChecksumCreator(CHECKSUM_ALGORITHM)
+            while True:
+                buf = segment.read(BLOCK_SIZE)
+                if len(buf) == 0: break
+                disk_size += len(buf)
+                checksummer.update(buf)
+        checksum = checksummer.compute()
+
+        # Compute attributes of the objects within the segment.
+        data_size = 0
+        object_count = 0
+        with open(path) as segment:
+            decompressed = cumulus.CumulusStore.filter_data(segment, filter_cmd)
+            objects = tarfile.open(mode='r|', fileobj=decompressed)
+            for tarinfo in objects:
+                data_size += tarinfo.size
+                object_count += 1
+
+        return {"segment": segment_name,
+                "path": relative_path,
+                "checksum": checksum,
+                "data_size": data_size,
+                "disk_size": disk_size}
+
 if __name__ == "__main__":
+    segment_rebuilder = SegmentStateRebuilder()
+    topdir = sys.argv[1]
+    files = []
+    for dirpath, dirnames, filenames in os.walk(topdir):
+        for f in filenames:
+            files.append(os.path.join(dirpath, f))
+    files.sort()
+    for f in files:
+        metadata = segment_rebuilder.compute_metadata(
+            f,
+            os.path.relpath(f, topdir))
+        if metadata:
+            for (k, v) in sorted(metadata.items()):
+                print "%s: %s" % (k, cumulus.uri_encode(str(v)))
+            print
+    sys.exit(0)
+
     # Read metadata from stdin; filter out lines starting with "@@" so the
     # statcache file can be parsed as well.
     metadata = (x for x in sys.stdin if not x.startswith("@@"))