Byte/string handling fixes for Python 3.
[cumulus.git] / python / cumulus / __init__.py
index ff811b0..6e03bd2 100644 (file)
@@ -1,3 +1,21 @@
+# Cumulus: Efficient Filesystem Backup to the Cloud
+# Copyright (C) 2008-2009, 2012 The Cumulus Developers
+# See the AUTHORS file for a list of contributors.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
 """High-level interface for working with Cumulus archives.
 
 This module provides an easy interface for reading from and manipulating
 """High-level interface for working with Cumulus archives.
 
 This module provides an easy interface for reading from and manipulating
@@ -8,11 +26,29 @@ various parts of a Cumulus archive:
   - reading and maintaining the local object database
 """
 
   - reading and maintaining the local object database
 """
 
-from __future__ import division
-import os, re, sha, tarfile, tempfile, thread
-from pysqlite2 import dbapi2 as sqlite3
-
-import cumulus.store, cumulus.store.file
+from __future__ import division, print_function, unicode_literals
+
+import codecs
+import hashlib
+import itertools
+import os
+import re
+import sqlite3
+import sys
+import tarfile
+import tempfile
+try:
+    import _thread
+except ImportError:
+    import thread as _thread
+
+import cumulus.store
+import cumulus.store.file
+
+if sys.version < "3":
+    StringTypes = (str, unicode)
+else:
+    StringTypes = (str,)
 
 # The largest supported snapshot format that can be understood.
 FORMAT_VERSION = (0, 11)        # Cumulus Snapshot v0.11
 
 # The largest supported snapshot format that can be understood.
 FORMAT_VERSION = (0, 11)        # Cumulus Snapshot v0.11
@@ -30,8 +66,15 @@ SEGMENT_FILTERS = [
     (".gpg", "cumulus-filter-gpg --decrypt"),
     (".gz", "gzip -dc"),
     (".bz2", "bzip2 -dc"),
     (".gpg", "cumulus-filter-gpg --decrypt"),
     (".gz", "gzip -dc"),
     (".bz2", "bzip2 -dc"),
+    ("", None),
 ]
 
 ]
 
+def to_lines(data):
+    """Decode binary data from a file into a sequence of lines.
+
+    Newline markers are retained."""
+    return list(codecs.iterdecode(data.splitlines(True), "utf-8"))
+
 def uri_decode(s):
     """Decode a URI-encoded (%xx escapes) string."""
     def hex_decode(m): return chr(int(m.group(1), 16))
 def uri_decode(s):
     """Decode a URI-encoded (%xx escapes) string."""
     def hex_decode(m): return chr(int(m.group(1), 16))
@@ -56,7 +99,9 @@ class Struct:
         return "<%s %s>" % (self.__class__, self.__dict__)
 
 CHECKSUM_ALGORITHMS = {
         return "<%s %s>" % (self.__class__, self.__dict__)
 
 CHECKSUM_ALGORITHMS = {
-    'sha1': sha.new
+    'sha1': hashlib.sha1,
+    'sha224': hashlib.sha224,
+    'sha256': hashlib.sha256,
 }
 
 class ChecksumCreator:
 }
 
 class ChecksumCreator:
@@ -95,70 +140,194 @@ class ChecksumVerifier:
         result = self.hash.hexdigest()
         return result == self.checksum
 
         result = self.hash.hexdigest()
         return result == self.checksum
 
-class LowlevelDataStore:
-    """Access to the backup store containing segments and snapshot descriptors.
+class SearchPathEntry(object):
+    """Item representing a possible search location for Cumulus files.
 
 
-    Instances of this class are used to get direct filesystem-level access to
-    the backup data.  To read a backup, a caller will ordinarily not care about
-    direct access to backup segments, but will instead merely need to access
-    objects from those segments.  The ObjectStore class provides a suitable
-    wrapper around a DataStore to give this high-level access.
+    Some Cumulus files might be stored in multiple possible file locations: due
+    to format (different compression mechanisms with different extensions),
+    locality (different segments might be placed in different directories to
+    control archiving policies), for backwards compatibility (default location
+    changed over time).  A SearchPathEntry describes a possible location for a
+    file.
     """
     """
+    def __init__(self, directory_prefix, suffix, context=None):
+        self._directory_prefix = directory_prefix
+        self._suffix = suffix
+        self._context = context
 
 
-    def __init__(self, path):
-        if isinstance(path, cumulus.store.Store):
-            self.store = path
-        elif path.find(":") >= 0:
-            self.store = cumulus.store.open(path)
-        else:
-            self.store = cumulus.store.file.FileStore(path)
+    def __repr__(self):
+        return "%s(%r, %r, %r)" % (self.__class__.__name__,
+                                   self._directory_prefix, self._suffix,
+                                   self._context)
 
 
-    def _classify(self, filename):
-        for (t, r) in cumulus.store.type_patterns.items():
-            if r.match(filename):
-                return (t, filename)
-        return (None, filename)
+    def build_path(self, basename):
+        """Construct the search path to use for a file with name basename.
 
 
-    def scan(self):
-        self.store.scan()
+        Returns a tuple (pathname, context), where pathname is the path to try
+        and context is any additional data associated with this search entry
+        (if any).
+        """
+        return (os.path.join(self._directory_prefix, basename + self._suffix),
+                self._context)
 
 
-    def lowlevel_open(self, filename):
-        """Return a file-like object for reading data from the given file."""
+class SearchPath(object):
+    """A collection of locations to search for files and lookup utilities.
 
 
-        (type, filename) = self._classify(filename)
-        return self.store.get(type, filename)
+    For looking for a file in a Cumulus storage backend, a SearchPath object
+    contains a list of possible locations to try.  A SearchPath can be used to
+    perform the search as well; when a file is found the search path ordering
+    is updated (moving the successful SearchPathEntry to the front of the list
+    for future searches).
+    """
+    def __init__(self, name_regex, searchpath):
+        self._regex = re.compile(name_regex)
+        self._path = list(searchpath)
 
 
-    def lowlevel_stat(self, filename):
-        """Return a dictionary of information about the given file.
+    def add_search_entry(self, entry):
+        self._path.append(entry)
+
+    def directories(self):
+        """Return the set of directories to search for a file type."""
+        return set(entry._directory_prefix for entry in self._path)
+
+    def get(self, backend, basename):
+        for (i, entry) in enumerate(self._path):
+            try:
+                (pathname, context) = entry.build_path(basename)
+                fp = backend.get(pathname)
+                # On success, move this entry to the front of the search path
+                # to speed future searches.
+                if i > 0:
+                    self._path.pop(i)
+                    self._path.insert(0, entry)
+                return (fp, pathname, context)
+            except cumulus.store.NotFoundError:
+                continue
+        raise cumulus.store.NotFoundError(basename)
+
+    def stat(self, backend, basename):
+        for (i, entry) in enumerate(self._path):
+            try:
+                (pathname, context) = entry.build_path(basename)
+                stat_data = backend.stat(pathname)
+                # On success, move this entry to the front of the search path
+                # to speed future searches.
+                if i > 0:
+                    self._path.pop(i)
+                    self._path.insert(0, entry)
+                result = {"path": pathname}
+                result.update(stat_data)
+                return result
+            except cumulus.store.NotFoundError:
+                continue
+        raise cumulus.store.NotFoundError(basename)
 
 
-        Currently, the only defined field is 'size', giving the size of the
-        file in bytes.
+    def match(self, filename):
+        return self._regex.match(filename)
+
+    def list(self, backend):
+        success = False
+        for d in self.directories():
+            try:
+                for f in backend.list(d):
+                    success = True
+                    m = self.match(f)
+                    if m: yield (os.path.join(d, f), m)
+            except cumulus.store.NotFoundError:
+                pass
+        if not success:
+            raise cumulus.store.NotFoundError(backend)
+
+def _build_segments_searchpath(prefix):
+    for (extension, filter) in SEGMENT_FILTERS:
+        yield SearchPathEntry(prefix, extension, filter)
+
+SEARCH_PATHS = {
+    "checksums": SearchPath(
+        r"^snapshot-(.*)\.(\w+)sums$",
+        [SearchPathEntry("meta", ".sha1sums"),
+         SearchPathEntry("checksums", ".sha1sums"),
+         SearchPathEntry("", ".sha1sums")]),
+    "meta": SearchPath(
+        r"^snapshot-(.*)\.meta(\.\S+)?$",
+        _build_segments_searchpath("meta")),
+    "segments": SearchPath(
+        (r"^([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})"
+         r"\.tar(\.\S+)?$"),
+        itertools.chain(
+            _build_segments_searchpath("segments0"),
+            _build_segments_searchpath("segments1"),
+            _build_segments_searchpath(""),
+            _build_segments_searchpath("segments"))),
+    "snapshots": SearchPath(
+        r"^snapshot-(.*)\.(cumulus|lbs)$",
+        [SearchPathEntry("snapshots", ".cumulus"),
+         SearchPathEntry("snapshots", ".lbs"),
+         SearchPathEntry("", ".cumulus"),
+         SearchPathEntry("", ".lbs")]),
+}
+
+class BackendWrapper(object):
+    """Wrapper around a Cumulus storage backend that understands file types.
+
+    The BackendWrapper class understands different Cumulus file types, such as
+    snapshots and segments, and implements higher-level operations such as
+    "retrieve a snapshot with a specific name" (hiding operations such as
+    searching for the correct file name).
+    """
+
+    def __init__(self, backend):
+        """Initializes a wrapper around the specified storage backend.
+
+        store may either be a Store object or URL.
         """
         """
+        if type(backend) in StringTypes:
+            self._backend = cumulus.store.open(backend)
+        else:
+            self._backend = backend
 
 
-        (type, filename) = self._classify(filename)
-        return self.store.stat(type, filename)
+    @property
+    def raw_backend(self):
+        return self._backend
 
 
-    # Slightly higher-level list methods.
-    def list_snapshots(self):
-        for f in self.store.list('snapshots'):
-            m = cumulus.store.type_patterns['snapshots'].match(f)
-            if m: yield m.group(1)
+    def stat_generic(self, basename, filetype):
+        return SEARCH_PATHS[filetype].stat(self._backend, basename)
 
 
-    def list_segments(self):
-        for f in self.store.list('segments'):
-            m = cumulus.store.type_patterns['segments'].match(f)
-            if m: yield m.group(1)
+    def open_generic(self, basename, filetype):
+        return SEARCH_PATHS[filetype].get(self._backend, basename)
+
+    def open_snapshot(self, name):
+        return self.open_generic("snapshot-" + name, "snapshots")
+
+    def open_segment(self, name):
+        return self.open_generic(name + ".tar", "segments")
 
 
-class ObjectStore:
-    def __init__(self, data_store):
-        self.store = data_store
+    def list_generic(self, filetype):
+        return ((x[1].group(1), x[0])
+                for x in SEARCH_PATHS[filetype].list(self._backend))
+
+    def prefetch_generic(self):
+        """Calls scan on directories to prefetch file metadata."""
+        directories = set()
+        for typeinfo in SEARCH_PATHS.values():
+            directories.update(typeinfo.directories())
+        for d in directories:
+            print("Prefetch", d)
+            self._backend.scan(d)
+
+class CumulusStore:
+    def __init__(self, backend):
+        if isinstance(backend, BackendWrapper):
+            self.backend = backend
+        else:
+            self.backend = BackendWrapper(backend)
         self.cachedir = None
         self.CACHE_SIZE = 16
         self.cachedir = None
         self.CACHE_SIZE = 16
-        self.lru_list = []
+        self._lru_list = []
 
     def get_cachedir(self):
         if self.cachedir is None:
 
     def get_cachedir(self):
         if self.cachedir is None:
-            self.cachedir = tempfile.mkdtemp(".lbs")
+            self.cachedir = tempfile.mkdtemp("-cumulus")
         return self.cachedir
 
     def cleanup(self):
         return self.cachedir
 
     def cleanup(self):
@@ -171,7 +340,7 @@ class ObjectStore:
     def parse_ref(refstr):
         m = re.match(r"^zero\[(\d+)\]$", refstr)
         if m:
     def parse_ref(refstr):
         m = re.match(r"^zero\[(\d+)\]$", refstr)
         if m:
-            return ("zero", None, None, (0, int(m.group(1))))
+            return ("zero", None, None, (0, int(m.group(1)), False))
 
         m = re.match(r"^([-0-9a-f]+)\/([0-9a-f]+)(\(\S+\))?(\[(((\d+)\+)?(\d+)|=(\d+))\])?$", refstr)
         if not m: return
 
         m = re.match(r"^([-0-9a-f]+)\/([0-9a-f]+)(\(\S+\))?(\[(((\d+)\+)?(\d+)|=(\d+))\])?$", refstr)
         if not m: return
@@ -196,28 +365,37 @@ class ObjectStore:
 
         return (segment, object, checksum, slice)
 
 
         return (segment, object, checksum, slice)
 
+    def list_snapshots(self):
+        return set(x[0] for x in self.backend.list_generic("snapshots"))
+
+    def list_segments(self):
+        return set(x[0] for x in self.backend.list_generic("segments"))
+
+    def load_snapshot(self, snapshot):
+        snapshot_file = self.backend.open_snapshot(snapshot)[0]
+        return to_lines(snapshot_file.read())
+
+    @staticmethod
+    def filter_data(filehandle, filter_cmd):
+        if filter_cmd is None:
+            return filehandle
+        (input, output) = os.popen2(filter_cmd)
+        def copy_thread(src, dst):
+            BLOCK_SIZE = 4096
+            while True:
+                block = src.read(BLOCK_SIZE)
+                if len(block) == 0: break
+                dst.write(block)
+            src.close()
+            dst.close()
+        _thread.start_new_thread(copy_thread, (filehandle, input))
+        return output
+
     def get_segment(self, segment):
         accessed_segments.add(segment)
 
     def get_segment(self, segment):
         accessed_segments.add(segment)
 
-        for (extension, filter) in SEGMENT_FILTERS:
-            try:
-                raw = self.store.lowlevel_open(segment + ".tar" + extension)
-
-                (input, output) = os.popen2(filter)
-                def copy_thread(src, dst):
-                    BLOCK_SIZE = 4096
-                    while True:
-                        block = src.read(BLOCK_SIZE)
-                        if len(block) == 0: break
-                        dst.write(block)
-                    dst.close()
-
-                thread.start_new_thread(copy_thread, (raw, input))
-                return output
-            except:
-                pass
-
-        raise cumulus.store.NotFoundError
+        (segment_fp, path, filter_cmd) = self.backend.open_segment(segment)
+        return self.filter_data(segment_fp, filter_cmd)
 
     def load_segment(self, segment):
         seg = tarfile.open(segment, 'r|', self.get_segment(segment))
 
     def load_segment(self, segment):
         seg = tarfile.open(segment, 'r|', self.get_segment(segment))
@@ -227,10 +405,6 @@ class ObjectStore:
             if len(path) == 2 and path[0] == segment:
                 yield (path[1], data_obj.read())
 
             if len(path) == 2 and path[0] == segment:
                 yield (path[1], data_obj.read())
 
-    def load_snapshot(self, snapshot):
-        file = self.store.lowlevel_open("snapshot-" + snapshot + ".lbs")
-        return file.read().splitlines(True)
-
     def extract_segment(self, segment):
         segdir = os.path.join(self.get_cachedir(), segment)
         os.mkdir(segdir)
     def extract_segment(self, segment):
         segdir = os.path.join(self.get_cachedir(), segment)
         os.mkdir(segdir)
@@ -244,11 +418,12 @@ class ObjectStore:
         path = os.path.join(self.get_cachedir(), segment, object)
         if not os.access(path, os.R_OK):
             self.extract_segment(segment)
         path = os.path.join(self.get_cachedir(), segment, object)
         if not os.access(path, os.R_OK):
             self.extract_segment(segment)
-        if segment in self.lru_list: self.lru_list.remove(segment)
-        self.lru_list.append(segment)
-        while len(self.lru_list) > self.CACHE_SIZE:
-            os.system("rm -rf " + os.path.join(self.cachedir, self.lru_list[0]))
-            self.lru_list = self.lru_list[1:]
+        if segment in self._lru_list: self._lru_list.remove(segment)
+        self._lru_list.append(segment)
+        while len(self._lru_list) > self.CACHE_SIZE:
+            os.system("rm -rf " + os.path.join(self.cachedir,
+                                               self._lru_list[0]))
+            self._lru_list = self._lru_list[1:]
         return open(path, 'rb').read()
 
     def get(self, refstr):
         return open(path, 'rb').read()
 
     def get(self, refstr):
@@ -278,6 +453,9 @@ class ObjectStore:
 
         return data
 
 
         return data
 
+    def prefetch(self):
+        self.backend.prefetch_generic()
+
 def parse(lines, terminate=None):
     """Generic parser for RFC822-style "Key: Value" data streams.
 
 def parse(lines, terminate=None):
     """Generic parser for RFC822-style "Key: Value" data streams.
 
@@ -317,7 +495,7 @@ def parse(lines, terminate=None):
 
 def parse_full(lines):
     try:
 
 def parse_full(lines):
     try:
-        return parse(lines).next()
+        return next(parse(lines))
     except StopIteration:
         return {}
 
     except StopIteration:
         return {}
 
@@ -342,7 +520,7 @@ def read_metadata(object_store, root):
 
     def follow_ref(refstr):
         if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
 
     def follow_ref(refstr):
         if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
-        lines = object_store.get(refstr).splitlines(True)
+        lines = to_lines(object_store.get(refstr))
         lines.reverse()
         stack.append(lines)
 
         lines.reverse()
         stack.append(lines)
 
@@ -512,7 +690,26 @@ class LocalDatabase:
         schemes.sort()
         return schemes
 
         schemes.sort()
         return schemes
 
-    def garbage_collect(self, scheme, intent=1.0):
+    def list_snapshots(self, scheme):
+        """Return a list of snapshots for the given scheme."""
+        cur = self.cursor()
+        cur.execute("select name from snapshots")
+        snapshots = [row[0] for row in cur.fetchall()]
+        snapshots.sort()
+        return snapshots
+
+    def delete_snapshot(self, scheme, name):
+        """Remove the specified snapshot from the database.
+
+        Warning: This does not garbage collect all dependent data in the
+        database, so it must be followed by a call to garbage_collect() to make
+        the database consistent.
+        """
+        cur = self.cursor()
+        cur.execute("delete from snapshots where scheme = ? and name = ?",
+                    (scheme, name))
+
+    def prune_old_snapshots(self, scheme, intent=1.0):
         """Delete entries from old snapshots from the database.
 
         Only snapshots with the specified scheme name will be deleted.  If
         """Delete entries from old snapshots from the database.
 
         Only snapshots with the specified scheme name will be deleted.  If
@@ -553,33 +750,36 @@ class LocalDatabase:
                 can_delete = True
 
             if can_delete and not first:
                 can_delete = True
 
             if can_delete and not first:
-                print "Delete snapshot %d (%s)" % (id, name)
+                print("Delete snapshot %d (%s)" % (id, name))
                 cur.execute("delete from snapshots where snapshotid = ?",
                             (id,))
             first = False
             max_intent = max(max_intent, snap_intent)
 
                 cur.execute("delete from snapshots where snapshotid = ?",
                             (id,))
             first = False
             max_intent = max(max_intent, snap_intent)
 
-        # Delete entries in the segments_used table which are for non-existent
-        # snapshots.
-        cur.execute("""delete from segments_used
+        self.garbage_collect()
+
+    def garbage_collect(self):
+        """Garbage-collect unreachable segment and object data.
+
+        Remove all segments and checksums which is not reachable from the
+        current set of snapshots stored in the local database.
+        """
+        cur = self.cursor()
+
+        # Delete entries in the segment_utilization table which are for
+        # non-existent snapshots.
+        cur.execute("""delete from segment_utilization
                        where snapshotid not in
                            (select snapshotid from snapshots)""")
 
                        where snapshotid not in
                            (select snapshotid from snapshots)""")
 
-        # Find segments which contain no objects used by any current snapshots,
-        # and delete them from the segment table.
+        # Delete segments not referenced by any current snapshots.
         cur.execute("""delete from segments where segmentid not in
         cur.execute("""delete from segments where segmentid not in
-                           (select segmentid from segments_used)""")
+                           (select segmentid from segment_utilization)""")
 
 
-        # Delete unused objects in the block_index table.  By "unused", we mean
-        # any object which was stored in a segment which has been deleted, and
-        # any object in a segment which was marked for cleaning and has had
-        # cleaning performed already (the expired time is less than the current
-        # largest snapshot id).
+        # Delete dangling objects in the block_index table.
         cur.execute("""delete from block_index
         cur.execute("""delete from block_index
-                       where segmentid not in (select segmentid from segments)
-                          or segmentid in (select segmentid from segments
-                                           where expire_time < ?)""",
-                    (last_snapshotid,))
+                       where segmentid not in
+                           (select segmentid from segments)""")
 
         # Remove sub-block signatures for deleted objects.
         cur.execute("""delete from subblock_signatures
 
         # Remove sub-block signatures for deleted objects.
         cur.execute("""delete from subblock_signatures
@@ -756,11 +956,11 @@ class LocalDatabase:
         target_size = max(2 * segment_size_estimate,
                           total_bytes / target_buckets)
 
         target_size = max(2 * segment_size_estimate,
                           total_bytes / target_buckets)
 
-        print "segment_size:", segment_size_estimate
-        print "distribution:", distribution
-        print "total_bytes:", total_bytes
-        print "target_buckets:", target_buckets
-        print "min, target size:", min_size, target_size
+        print("segment_size:", segment_size_estimate)
+        print("distribution:", distribution)
+        print("total_bytes:", total_bytes)
+        print("target_buckets:", target_buckets)
+        print("min, target size:", min_size, target_size)
 
         # Chosen cutoffs.  Each bucket consists of objects with age greater
         # than one cutoff value, but not greater than the next largest cutoff.
 
         # Chosen cutoffs.  Each bucket consists of objects with age greater
         # than one cutoff value, but not greater than the next largest cutoff.
@@ -790,7 +990,7 @@ class LocalDatabase:
             cutoffs.append(-1)
         cutoffs.append(-1)
 
             cutoffs.append(-1)
         cutoffs.append(-1)
 
-        print "cutoffs:", cutoffs
+        print("cutoffs:", cutoffs)
 
         # Update the database to assign each object to the appropriate bucket.
         cutoffs.reverse()
 
         # Update the database to assign each object to the appropriate bucket.
         cutoffs.reverse()