Add code to rebuild_database.py to recompute segment metadata.
[cumulus.git] / python / cumulus / __init__.py
index 8d57c21..ef35325 100644 (file)
@@ -55,6 +55,7 @@ SEGMENT_FILTERS = [
     (".gpg", "cumulus-filter-gpg --decrypt"),
     (".gz", "gzip -dc"),
     (".bz2", "bzip2 -dc"),
+    ("", None),
 ]
 
 def uri_decode(s):
@@ -204,13 +205,16 @@ class SearchPath(object):
                 continue
         raise cumulus.store.NotFoundError(basename)
 
+    def match(self, filename):
+        return self._regex.match(filename)
+
     def list(self, backend):
         success = False
         for d in self.directories():
             try:
                 for f in backend.list(d):
                     success = True
-                    m = self._regex.match(f)
+                    m = self.match(f)
                     if m: yield (os.path.join(d, f), m)
             except cumulus.store.NotFoundError:
                 pass
@@ -229,7 +233,7 @@ SEARCH_PATHS = {
          SearchPathEntry("", ".sha1sums")]),
     "segments": SearchPath(
         (r"^([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})"
-         r"(\.\S+)?$"),
+         r"\.tar(\.\S+)?$"),
         itertools.chain(
             _build_segments_searchpath("segments0"),
             _build_segments_searchpath("segments1"),
@@ -345,12 +349,10 @@ class CumulusStore:
         snapshot_file = self.backend.open_snapshot(snapshot)[0]
         return snapshot_file.read().splitlines(True)
 
-    def get_segment(self, segment):
-        accessed_segments.add(segment)
-
-        (segment_fp, path, filter_cmd) = self.backend.open_segment(segment)
+    @staticmethod
+    def filter_data(filehandle, filter_cmd):
         if filter_cmd is None:
-            return segment_fp
+            return filehandle
         (input, output) = os.popen2(filter_cmd)
         def copy_thread(src, dst):
             BLOCK_SIZE = 4096
@@ -360,9 +362,15 @@ class CumulusStore:
                 dst.write(block)
             src.close()
             dst.close()
-        thread.start_new_thread(copy_thread, (segment_fp, input))
+        thread.start_new_thread(copy_thread, (filehandle, input))
         return output
 
+    def get_segment(self, segment):
+        accessed_segments.add(segment)
+
+        (segment_fp, path, filter_cmd) = self.backend.open_segment(segment)
+        return self.filter_data(segment_fp, filter_cmd)
+
     def load_segment(self, segment):
         seg = tarfile.open(segment, 'r|', self.get_segment(segment))
         for item in seg: