Implement metadata caching for S3 backend.
diff --git a/python/cumulus/__init__.py b/python/cumulus/__init__.py
index 602e70e..5418d3c 100644
--- a/python/cumulus/__init__.py
+++ b/python/cumulus/__init__.py
@@ -23,6 +23,29 @@ MAX_RECURSION_DEPTH = 3
 # All segments which have been accessed this session.
 accessed_segments = set()
 
+# Table of methods used to filter segments before storage, mapped to the
+# corresponding filename extensions; each command listed here undoes that
+# filter when a segment is read back.  Earlier entries are tried first.
+SEGMENT_FILTERS = [
+    (".gpg", "cumulus-filter-gpg --decrypt"),
+    (".gz", "gzip -dc"),
+    (".bz2", "bzip2 -dc"),
+]
+
+def uri_decode(s):
+    """Decode a URI-encoded (%xx escapes) string."""
+    def hex_decode(m): return chr(int(m.group(1), 16))
+    return re.sub(r"%([0-9a-f]{2})", hex_decode, s)
+
+def uri_encode(s):
+    """Encode a string to URI-encoded (%xx escapes) form."""
+    def hex_encode(c):
+        if c > '+' and c < '\x7f' and c != '@':
+            return c
+        else:
+            return "%%%02x" % (ord(c),)
+    return ''.join(hex_encode(c) for c in s)
+
 class Struct:
     """A class which merely acts as a data container.
 
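The uri_encode/uri_decode pair added above round-trips metadata strings
through %xx escapes: hex_encode passes through printable ASCII strictly
between '+' and DEL (except '@') and escapes everything else as lowercase
hex, which is exactly what uri_decode's lowercase-only pattern recognizes.
A minimal sketch of the expected behavior, assuming the package is
importable as cumulus:

    from cumulus import uri_encode, uri_decode

    s = "backup @ set-1"
    e = uri_encode(s)           # "backup%20%40%20set-1": space (0x20) and
                                # '@' fall outside the pass-through range
    assert uri_decode(e) == s   # lowercase escapes round-trip cleanly
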
@@ -84,7 +106,12 @@ class LowlevelDataStore:
     """
 
     def __init__(self, path):
-        self.store = cumulus.store.file.FileStore(path)
+        if isinstance(path, cumulus.store.Store):
+            self.store = path
+        elif path.find(":") >= 0:
+            self.store = cumulus.store.open(path)
+        else:
+            self.store = cumulus.store.file.FileStore(path)
 
     def _classify(self, filename):
         for (t, r) in cumulus.store.type_patterns.items():
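With this change LowlevelDataStore.__init__ dispatches on its argument: an
existing cumulus.store.Store instance is used directly, a string containing
':' is treated as a URL and handed to cumulus.store.open, and anything else
is taken as a plain filesystem path. A sketch of the three forms (the
s3:// scheme string is an assumption based on the commit subject, not
something this hunk shows):

    import cumulus, cumulus.store.file

    ds1 = cumulus.LowlevelDataStore("/srv/backups")        # bare path -> FileStore
    ds2 = cumulus.LowlevelDataStore("s3://bucket/prefix")  # contains ':' -> cumulus.store.open
    ds3 = cumulus.LowlevelDataStore(                       # pre-built Store, used as-is
        cumulus.store.file.FileStore("/srv/backups"))
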
@@ -92,6 +119,9 @@ class LowlevelDataStore:
                 return (t, filename)
         return (None, filename)
 
+    def scan(self):
+        self.store.scan()
+
     def lowlevel_open(self, filename):
         """Return a file-like object for reading data from the given file."""
 
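The new scan() method simply delegates to the underlying store; for the S3
backend named in the commit subject this is presumably the hook that
enumerates the bucket once and populates the metadata cache, so later
per-object lookups avoid a network round trip. A hypothetical warm-up
sequence (URL form assumed, as above):

    ds = cumulus.LowlevelDataStore("s3://bucket/prefix")
    ds.scan()   # one bulk enumeration; subsequent metadata queries
                # can then be answered from the cache
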
@@ -168,19 +198,26 @@ class ObjectStore:
 
     def get_segment(self, segment):
         accessed_segments.add(segment)
-        raw = self.store.lowlevel_open(segment + ".tar.gpg")
 
-        (input, output) = os.popen2("lbs-filter-gpg --decrypt")
-        def copy_thread(src, dst):
-            BLOCK_SIZE = 4096
-            while True:
-                block = src.read(BLOCK_SIZE)
-                if len(block) == 0: break
-                dst.write(block)
-            dst.close()
+        for (extension, filter) in SEGMENT_FILTERS:
+            try:
+                raw = self.store.lowlevel_open(segment + ".tar" + extension)
+
+                (input, output) = os.popen2(filter)
+                def copy_thread(src, dst):
+                    BLOCK_SIZE = 4096
+                    while True:
+                        block = src.read(BLOCK_SIZE)
+                        if len(block) == 0: break
+                        dst.write(block)
+                    dst.close()
+
+                thread.start_new_thread(copy_thread, (raw, input))
+                return output
+            except (cumulus.store.NotFoundError, EnvironmentError):
+                pass
 
-        thread.start_new_thread(copy_thread, (raw, input))
-        return output
+        raise cumulus.store.NotFoundError(segment)
 
     def load_segment(self, segment):
         seg = tarfile.open(segment, 'r|', self.get_segment(segment))
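get_segment now walks SEGMENT_FILTERS in priority order, opening
segment + ".tar" + extension and pumping the raw bytes through the matching
decode command on a background thread, so the caller reads decrypted and
decompressed data from the returned pipe. os.popen2 and thread are
Python 2 APIs; a rough modern equivalent of the same pipeline, offered only
as a sketch (open_filtered is a hypothetical name, and the exception
handling assumes the store signals misses with NotFoundError):

    import subprocess, threading

    import cumulus.store
    from cumulus import SEGMENT_FILTERS

    def open_filtered(store, segment):
        # Try each (extension, command) pair in priority order, exactly as
        # SEGMENT_FILTERS is ordered above: .gpg, then .gz, then .bz2.
        for extension, command in SEGMENT_FILTERS:
            try:
                raw = store.lowlevel_open(segment + ".tar" + extension)
            except cumulus.store.NotFoundError:
                continue
            proc = subprocess.Popen(command, shell=True,
                                    stdin=subprocess.PIPE,
                                    stdout=subprocess.PIPE)
            def feed(src, dst):
                # Stream the stored bytes into the filter, then close its
                # stdin so the filter sees EOF and flushes its output.
                while True:
                    block = src.read(4096)
                    if not block:
                        break
                    dst.write(block)
                dst.close()
            threading.Thread(target=feed, args=(raw, proc.stdin)).start()
            return proc.stdout
        raise cumulus.store.NotFoundError(segment)
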
@@ -344,8 +381,7 @@ class MetadataItem:
     @staticmethod
     def decode_str(s):
         """Decode a URI-encoded (%xx escapes) string."""
-        def hex_decode(m): return chr(int(m.group(1), 16))
-        return re.sub(r"%([0-9a-f]{2})", hex_decode, s)
+        return uri_decode(s)
 
     @staticmethod
     def raw_str(s):