Byte/string handling fixes for Python 3.
diff --git a/python/cumulus/__init__.py b/python/cumulus/__init__.py
index e8fc538..6e03bd2 100644
--- a/python/cumulus/__init__.py
+++ b/python/cumulus/__init__.py
@@ -26,19 +26,30 @@ various parts of a Cumulus archive:
   - reading and maintaining the local object database
 """
 
-from __future__ import division
+from __future__ import division, print_function, unicode_literals
+
+import codecs
 import hashlib
 import itertools
 import os
 import re
+import sqlite3
+import sys
 import tarfile
 import tempfile
-import thread
-from pysqlite2 import dbapi2 as sqlite3
+try:
+    import _thread
+except ImportError:
+    import thread as _thread
 
 import cumulus.store
 import cumulus.store.file
 
+if sys.version_info < (3,):
+    StringTypes = (str, unicode)
+else:
+    StringTypes = (str,)
+
 # The largest supported snapshot format that can be understood.
 FORMAT_VERSION = (0, 11)        # Cumulus Snapshot v0.11
 
@@ -55,8 +66,15 @@ SEGMENT_FILTERS = [
     (".gpg", "cumulus-filter-gpg --decrypt"),
     (".gz", "gzip -dc"),
     (".bz2", "bzip2 -dc"),
+    ("", None),
 ]
 
+def to_lines(data):
+    """Decode binary data from a file into a sequence of lines.
+
+    Newline markers are retained."""
+    return list(codecs.iterdecode(data.splitlines(True), "utf-8"))
+
 def uri_decode(s):
     """Decode a URI-encoded (%xx escapes) string."""
     def hex_decode(m): return chr(int(m.group(1), 16))
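
Note: to_lines() is the byte/string workhorse of this change. Snapshot and metadata files are now read as bytes, so calling .splitlines(True) on the result no longer yields text. Splitting the raw bytes first and running codecs.iterdecode() over the pieces decodes each line to str while keeping the newline markers. A small sketch of the behavior, with made-up input:

    import codecs

    def to_lines(data):
        return list(codecs.iterdecode(data.splitlines(True), "utf-8"))

    raw = b"Format: Cumulus Snapshot v0.11\nName: caf\xc3\xa9\n"
    lines = to_lines(raw)
    assert lines[0] == "Format: Cumulus Snapshot v0.11\n"  # newline retained
    assert lines[1] == "Name: café\n"                      # bytes decoded to text
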
@@ -204,18 +222,21 @@ class SearchPath(object):
                 continue
         raise cumulus.store.NotFoundError(basename)
 
+    def match(self, filename):
+        return self._regex.match(filename)
+
     def list(self, backend):
         success = False
         for d in self.directories():
             try:
                 for f in backend.list(d):
                     success = True
-                    m = self._regex.match(f)
+                    m = self.match(f)
                     if m: yield (os.path.join(d, f), m)
             except cumulus.store.NotFoundError:
                 pass
         if not success:
-            raise cumulus.store.NotFoundError(basename)
+            raise cumulus.store.NotFoundError(backend)
 
 def _build_segments_searchpath(prefix):
     for (extension, filter) in SEGMENT_FILTERS:
@@ -227,9 +248,12 @@ SEARCH_PATHS = {
         [SearchPathEntry("meta", ".sha1sums"),
          SearchPathEntry("checksums", ".sha1sums"),
          SearchPathEntry("", ".sha1sums")]),
+    "meta": SearchPath(
+        r"^snapshot-(.*)\.meta(\.\S+)?$",
+        _build_segments_searchpath("meta")),
     "segments": SearchPath(
         (r"^([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})"
-         r"(\.\S+)?$"),
+         r"\.tar(\.\S+)?$"),
         itertools.chain(
             _build_segments_searchpath("segments0"),
             _build_segments_searchpath("segments1"),
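
Note: two search-path changes here: a new entry for the snapshot-*.meta files, and a segment pattern that now insists on a .tar component before the optional filter extension, so a bare UUID or an unrelated suffix no longer counts as a segment. A quick check of both regexes (the file names are made up):

    import re

    SEGMENT_RE = re.compile(
        r"^([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})"
        r"\.tar(\.\S+)?$")
    META_RE = re.compile(r"^snapshot-(.*)\.meta(\.\S+)?$")

    uuid = "01234567-89ab-cdef-0123-456789abcdef"
    assert SEGMENT_RE.match(uuid + ".tar.gpg")      # filtered segment
    assert SEGMENT_RE.match(uuid + ".tar")          # plain segment
    assert not SEGMENT_RE.match(uuid)               # bare UUID: rejected now
    assert META_RE.match("snapshot-home.meta.gz").group(1) == "home"
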
@@ -257,11 +281,8 @@ class BackendWrapper(object):
 
          backend may be either a Store object or a URL string.
         """
-        if type(backend) in (str, unicode):
-            if backend.find(":") >= 0:
-                self._backend = cumulus.store.open(backend)
-            else:
-                self._backend = cumulus.store.file.FileStore(backend)
+        if isinstance(backend, StringTypes):
+            self._backend = cumulus.store.open(backend)
         else:
             self._backend = backend
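
Note: the explicit FileStore branch is gone; any string, URL or plain path, is handed to cumulus.store.open(), which is assumed to handle the dispatch itself. Usage stays the same from the caller's side; a sketch (paths are illustrative):

    import cumulus

    w1 = cumulus.BackendWrapper("file:///var/backups/cumulus")  # URL form
    w2 = cumulus.BackendWrapper("/var/backups/cumulus")  # plain path; now also
                                                         # routed via store.open()
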
 
@@ -285,6 +306,15 @@ class BackendWrapper(object):
         return ((x[1].group(1), x[0])
                 for x in SEARCH_PATHS[filetype].list(self._backend))
 
+    def prefetch_generic(self):
+        """Calls scan on directories to prefetch file metadata."""
+        directories = set()
+        for typeinfo in SEARCH_PATHS.values():
+            directories.update(typeinfo.directories())
+        for d in directories:
+            print("Prefetch", d)
+            self._backend.scan(d)
+
 class CumulusStore:
     def __init__(self, backend):
         if isinstance(backend, BackendWrapper):
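
Note: prefetch_generic() relies on the backend exposing a scan() method (visible in the call above); one scan per distinct search-path directory warms whatever per-directory metadata cache the store keeps. It is surfaced to callers as CumulusStore.prefetch() further down. A usage sketch (the URL is illustrative):

    import cumulus

    store = cumulus.CumulusStore("file:///var/backups/cumulus")
    store.prefetch()   # one backend.scan() per search-path directory
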
@@ -343,14 +373,12 @@ class CumulusStore:
 
     def load_snapshot(self, snapshot):
         snapshot_file = self.backend.open_snapshot(snapshot)[0]
-        return snapshot_file.read().splitlines(True)
-
-    def get_segment(self, segment):
-        accessed_segments.add(segment)
+        return to_lines(snapshot_file.read())
 
-        (segment_fp, path, filter_cmd) = self.backend.open_segment(segment)
+    @staticmethod
+    def filter_data(filehandle, filter_cmd):
         if filter_cmd is None:
-            return segment_fp
+            return filehandle
         (input, output) = os.popen2(filter_cmd)
         def copy_thread(src, dst):
             BLOCK_SIZE = 4096
@@ -360,9 +388,15 @@ class CumulusStore:
                 dst.write(block)
             src.close()
             dst.close()
-        thread.start_new_thread(copy_thread, (segment_fp, input))
+        _thread.start_new_thread(copy_thread, (filehandle, input))
         return output
 
+    def get_segment(self, segment):
+        accessed_segments.add(segment)
+
+        (segment_fp, path, filter_cmd) = self.backend.open_segment(segment)
+        return self.filter_data(segment_fp, filter_cmd)
+
     def load_segment(self, segment):
         seg = tarfile.open(segment, 'r|', self.get_segment(segment))
         for item in seg:
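
Note: factoring filter_data() out of get_segment() lets the same decompress/decrypt pipeline serve other file types (the new metadata files use the same SEGMENT_FILTERS). The body still calls os.popen2(), however, which was removed in Python 3, so this path remains Python-2-only until it is ported. A sketch of a subprocess-based equivalent with the same interface (illustrative, not the committed code):

    import subprocess
    import _thread

    def filter_data(filehandle, filter_cmd):
        if filter_cmd is None:
            return filehandle
        # Launch the filter as a shell pipeline stage with binary pipes.
        proc = subprocess.Popen(filter_cmd, shell=True,
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE)
        def copy_thread(src, dst):
            BLOCK_SIZE = 4096
            while True:
                block = src.read(BLOCK_SIZE)
                if len(block) == 0: break
                dst.write(block)
            src.close()
            dst.close()   # closing stdin signals EOF to the filter
        _thread.start_new_thread(copy_thread, (filehandle, proc.stdin))
        return proc.stdout
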
@@ -419,6 +453,9 @@ class CumulusStore:
 
         return data
 
+    def prefetch(self):
+        self.backend.prefetch_generic()
+
 def parse(lines, terminate=None):
     """Generic parser for RFC822-style "Key: Value" data streams.
 
@@ -458,7 +495,7 @@ def parse(lines, terminate=None):
 
 def parse_full(lines):
     try:
-        return parse(lines).next()
+        return next(parse(lines))
     except StopIteration:
         return {}
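
Note: Python 3 dropped the .next() method on generators in favor of the next() builtin (which also works on Python 2.6+), hence the one-line change. For reference, a sketch of how parse_full() consumes the decoded lines, assuming parse() yields one dict per "Key: Value" stanza:

    lines = to_lines(b"Format: Cumulus Snapshot v0.11\nChecksums: sha1\n")
    header = parse_full(lines)
    # header is now a dict along the lines of
    # {"Format": "Cumulus Snapshot v0.11", "Checksums": "sha1"}
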
 
@@ -483,7 +520,7 @@ def read_metadata(object_store, root):
 
     def follow_ref(refstr):
         if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
-        lines = object_store.get(refstr).splitlines(True)
+        lines = to_lines(object_store.get(refstr))
         lines.reverse()
         stack.append(lines)
 
@@ -713,7 +750,7 @@ class LocalDatabase:
                 can_delete = True
 
             if can_delete and not first:
-                print "Delete snapshot %d (%s)" % (id, name)
+                print("Delete snapshot %d (%s)" % (id, name))
                 cur.execute("delete from snapshots where snapshotid = ?",
                             (id,))
             first = False
@@ -919,11 +956,11 @@ class LocalDatabase:
         target_size = max(2 * segment_size_estimate,
                           total_bytes / target_buckets)
 
-        print "segment_size:", segment_size_estimate
-        print "distribution:", distribution
-        print "total_bytes:", total_bytes
-        print "target_buckets:", target_buckets
-        print "min, target size:", min_size, target_size
+        print("segment_size:", segment_size_estimate)
+        print("distribution:", distribution)
+        print("total_bytes:", total_bytes)
+        print("target_buckets:", target_buckets)
+        print("min, target size:", min_size, target_size)
 
         # Chosen cutoffs.  Each bucket consists of objects with age greater
         # than one cutoff value, but not greater than the next largest cutoff.
@@ -953,7 +990,7 @@ class LocalDatabase:
             cutoffs.append(-1)
         cutoffs.append(-1)
 
-        print "cutoffs:", cutoffs
+        print("cutoffs:", cutoffs)
 
         # Update the database to assign each object to the appropriate bucket.
         cutoffs.reverse()