Byte/string handling fixes for Python 3.
[cumulus.git] / python / cumulus / __init__.py
index ff811b0..6e03bd2 100644 (file)
@@ -1,3 +1,21 @@
+# Cumulus: Efficient Filesystem Backup to the Cloud
+# Copyright (C) 2008-2009, 2012 The Cumulus Developers
+# See the AUTHORS file for a list of contributors.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
 """High-level interface for working with Cumulus archives.
 
 This module provides an easy interface for reading from and manipulating
@@ -8,11 +26,29 @@ various parts of a Cumulus archive:
   - reading and maintaining the local object database
 """
 
-from __future__ import division
-import os, re, sha, tarfile, tempfile, thread
-from pysqlite2 import dbapi2 as sqlite3
-
-import cumulus.store, cumulus.store.file
+from __future__ import division, print_function, unicode_literals
+
+import codecs
+import hashlib
+import itertools
+import os
+import re
+import sqlite3
+import sys
+import tarfile
+import tempfile
+try:
+    import _thread
+except ImportError:
+    import thread as _thread
+
+import cumulus.store
+import cumulus.store.file
+
+if sys.version < "3":
+    StringTypes = (str, unicode)
+else:
+    StringTypes = (str,)
 
 # The largest supported snapshot format that can be understood.
 FORMAT_VERSION = (0, 11)        # Cumulus Snapshot v0.11
@@ -30,8 +66,15 @@ SEGMENT_FILTERS = [
     (".gpg", "cumulus-filter-gpg --decrypt"),
     (".gz", "gzip -dc"),
     (".bz2", "bzip2 -dc"),
+    ("", None),
 ]
 
+def to_lines(data):
+    """Decode binary data from a file into a sequence of lines.
+
+    Newline markers are retained."""
+    return list(codecs.iterdecode(data.splitlines(True), "utf-8"))
+
 def uri_decode(s):
     """Decode a URI-encoded (%xx escapes) string."""
     def hex_decode(m): return chr(int(m.group(1), 16))
@@ -56,7 +99,9 @@ class Struct:
         return "<%s %s>" % (self.__class__, self.__dict__)
 
 CHECKSUM_ALGORITHMS = {
-    'sha1': sha.new
+    'sha1': hashlib.sha1,
+    'sha224': hashlib.sha224,
+    'sha256': hashlib.sha256,
 }
 
 class ChecksumCreator:
@@ -95,70 +140,194 @@ class ChecksumVerifier:
         result = self.hash.hexdigest()
         return result == self.checksum
 
-class LowlevelDataStore:
-    """Access to the backup store containing segments and snapshot descriptors.
+class SearchPathEntry(object):
+    """Item representing a possible search location for Cumulus files.
 
-    Instances of this class are used to get direct filesystem-level access to
-    the backup data.  To read a backup, a caller will ordinarily not care about
-    direct access to backup segments, but will instead merely need to access
-    objects from those segments.  The ObjectStore class provides a suitable
-    wrapper around a DataStore to give this high-level access.
+    Some Cumulus files might be stored in multiple possible file locations: due
+    to format (different compression mechanisms with different extensions),
+    locality (different segments might be placed in different directories to
+    control archiving policies), for backwards compatibility (default location
+    changed over time).  A SearchPathEntry describes a possible location for a
+    file.
     """
+    def __init__(self, directory_prefix, suffix, context=None):
+        self._directory_prefix = directory_prefix
+        self._suffix = suffix
+        self._context = context
 
-    def __init__(self, path):
-        if isinstance(path, cumulus.store.Store):
-            self.store = path
-        elif path.find(":") >= 0:
-            self.store = cumulus.store.open(path)
-        else:
-            self.store = cumulus.store.file.FileStore(path)
+    def __repr__(self):
+        return "%s(%r, %r, %r)" % (self.__class__.__name__,
+                                   self._directory_prefix, self._suffix,
+                                   self._context)
 
-    def _classify(self, filename):
-        for (t, r) in cumulus.store.type_patterns.items():
-            if r.match(filename):
-                return (t, filename)
-        return (None, filename)
+    def build_path(self, basename):
+        """Construct the search path to use for a file with name basename.
 
-    def scan(self):
-        self.store.scan()
+        Returns a tuple (pathname, context), where pathname is the path to try
+        and context is any additional data associated with this search entry
+        (if any).
+        """
+        return (os.path.join(self._directory_prefix, basename + self._suffix),
+                self._context)
 
-    def lowlevel_open(self, filename):
-        """Return a file-like object for reading data from the given file."""
+class SearchPath(object):
+    """A collection of locations to search for files and lookup utilities.
 
-        (type, filename) = self._classify(filename)
-        return self.store.get(type, filename)
+    For looking for a file in a Cumulus storage backend, a SearchPath object
+    contains a list of possible locations to try.  A SearchPath can be used to
+    perform the search as well; when a file is found the search path ordering
+    is updated (moving the successful SearchPathEntry to the front of the list
+    for future searches).
+    """
+    def __init__(self, name_regex, searchpath):
+        self._regex = re.compile(name_regex)
+        self._path = list(searchpath)
 
-    def lowlevel_stat(self, filename):
-        """Return a dictionary of information about the given file.
+    def add_search_entry(self, entry):
+        self._path.append(entry)
+
+    def directories(self):
+        """Return the set of directories to search for a file type."""
+        return set(entry._directory_prefix for entry in self._path)
+
+    def get(self, backend, basename):
+        for (i, entry) in enumerate(self._path):
+            try:
+                (pathname, context) = entry.build_path(basename)
+                fp = backend.get(pathname)
+                # On success, move this entry to the front of the search path
+                # to speed future searches.
+                if i > 0:
+                    self._path.pop(i)
+                    self._path.insert(0, entry)
+                return (fp, pathname, context)
+            except cumulus.store.NotFoundError:
+                continue
+        raise cumulus.store.NotFoundError(basename)
+
+    def stat(self, backend, basename):
+        for (i, entry) in enumerate(self._path):
+            try:
+                (pathname, context) = entry.build_path(basename)
+                stat_data = backend.stat(pathname)
+                # On success, move this entry to the front of the search path
+                # to speed future searches.
+                if i > 0:
+                    self._path.pop(i)
+                    self._path.insert(0, entry)
+                result = {"path": pathname}
+                result.update(stat_data)
+                return result
+            except cumulus.store.NotFoundError:
+                continue
+        raise cumulus.store.NotFoundError(basename)
 
-        Currently, the only defined field is 'size', giving the size of the
-        file in bytes.
+    def match(self, filename):
+        return self._regex.match(filename)
+
+    def list(self, backend):
+        success = False
+        for d in self.directories():
+            try:
+                for f in backend.list(d):
+                    success = True
+                    m = self.match(f)
+                    if m: yield (os.path.join(d, f), m)
+            except cumulus.store.NotFoundError:
+                pass
+        if not success:
+            raise cumulus.store.NotFoundError(backend)
+
+def _build_segments_searchpath(prefix):
+    for (extension, filter) in SEGMENT_FILTERS:
+        yield SearchPathEntry(prefix, extension, filter)
+
+SEARCH_PATHS = {
+    "checksums": SearchPath(
+        r"^snapshot-(.*)\.(\w+)sums$",
+        [SearchPathEntry("meta", ".sha1sums"),
+         SearchPathEntry("checksums", ".sha1sums"),
+         SearchPathEntry("", ".sha1sums")]),
+    "meta": SearchPath(
+        r"^snapshot-(.*)\.meta(\.\S+)?$",
+        _build_segments_searchpath("meta")),
+    "segments": SearchPath(
+        (r"^([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})"
+         r"\.tar(\.\S+)?$"),
+        itertools.chain(
+            _build_segments_searchpath("segments0"),
+            _build_segments_searchpath("segments1"),
+            _build_segments_searchpath(""),
+            _build_segments_searchpath("segments"))),
+    "snapshots": SearchPath(
+        r"^snapshot-(.*)\.(cumulus|lbs)$",
+        [SearchPathEntry("snapshots", ".cumulus"),
+         SearchPathEntry("snapshots", ".lbs"),
+         SearchPathEntry("", ".cumulus"),
+         SearchPathEntry("", ".lbs")]),
+}
+
+class BackendWrapper(object):
+    """Wrapper around a Cumulus storage backend that understands file types.
+
+    The BackendWrapper class understands different Cumulus file types, such as
+    snapshots and segments, and implements higher-level operations such as
+    "retrieve a snapshot with a specific name" (hiding operations such as
+    searching for the correct file name).
+    """
+
+    def __init__(self, backend):
+        """Initializes a wrapper around the specified storage backend.
+
+        store may either be a Store object or URL.
         """
+        if type(backend) in StringTypes:
+            self._backend = cumulus.store.open(backend)
+        else:
+            self._backend = backend
 
-        (type, filename) = self._classify(filename)
-        return self.store.stat(type, filename)
+    @property
+    def raw_backend(self):
+        return self._backend
 
-    # Slightly higher-level list methods.
-    def list_snapshots(self):
-        for f in self.store.list('snapshots'):
-            m = cumulus.store.type_patterns['snapshots'].match(f)
-            if m: yield m.group(1)
+    def stat_generic(self, basename, filetype):
+        return SEARCH_PATHS[filetype].stat(self._backend, basename)
 
-    def list_segments(self):
-        for f in self.store.list('segments'):
-            m = cumulus.store.type_patterns['segments'].match(f)
-            if m: yield m.group(1)
+    def open_generic(self, basename, filetype):
+        return SEARCH_PATHS[filetype].get(self._backend, basename)
+
+    def open_snapshot(self, name):
+        return self.open_generic("snapshot-" + name, "snapshots")
+
+    def open_segment(self, name):
+        return self.open_generic(name + ".tar", "segments")
 
-class ObjectStore:
-    def __init__(self, data_store):
-        self.store = data_store
+    def list_generic(self, filetype):
+        return ((x[1].group(1), x[0])
+                for x in SEARCH_PATHS[filetype].list(self._backend))
+
+    def prefetch_generic(self):
+        """Calls scan on directories to prefetch file metadata."""
+        directories = set()
+        for typeinfo in SEARCH_PATHS.values():
+            directories.update(typeinfo.directories())
+        for d in directories:
+            print("Prefetch", d)
+            self._backend.scan(d)
+
+class CumulusStore:
+    def __init__(self, backend):
+        if isinstance(backend, BackendWrapper):
+            self.backend = backend
+        else:
+            self.backend = BackendWrapper(backend)
         self.cachedir = None
         self.CACHE_SIZE = 16
-        self.lru_list = []
+        self._lru_list = []
 
     def get_cachedir(self):
         if self.cachedir is None:
-            self.cachedir = tempfile.mkdtemp(".lbs")
+            self.cachedir = tempfile.mkdtemp("-cumulus")
         return self.cachedir
 
     def cleanup(self):
@@ -171,7 +340,7 @@ class ObjectStore:
     def parse_ref(refstr):
         m = re.match(r"^zero\[(\d+)\]$", refstr)
         if m:
-            return ("zero", None, None, (0, int(m.group(1))))
+            return ("zero", None, None, (0, int(m.group(1)), False))
 
         m = re.match(r"^([-0-9a-f]+)\/([0-9a-f]+)(\(\S+\))?(\[(((\d+)\+)?(\d+)|=(\d+))\])?$", refstr)
         if not m: return
@@ -196,28 +365,37 @@ class ObjectStore:
 
         return (segment, object, checksum, slice)
 
+    def list_snapshots(self):
+        return set(x[0] for x in self.backend.list_generic("snapshots"))
+
+    def list_segments(self):
+        return set(x[0] for x in self.backend.list_generic("segments"))
+
+    def load_snapshot(self, snapshot):
+        snapshot_file = self.backend.open_snapshot(snapshot)[0]
+        return to_lines(snapshot_file.read())
+
+    @staticmethod
+    def filter_data(filehandle, filter_cmd):
+        if filter_cmd is None:
+            return filehandle
+        (input, output) = os.popen2(filter_cmd)
+        def copy_thread(src, dst):
+            BLOCK_SIZE = 4096
+            while True:
+                block = src.read(BLOCK_SIZE)
+                if len(block) == 0: break
+                dst.write(block)
+            src.close()
+            dst.close()
+        _thread.start_new_thread(copy_thread, (filehandle, input))
+        return output
+
     def get_segment(self, segment):
         accessed_segments.add(segment)
 
-        for (extension, filter) in SEGMENT_FILTERS:
-            try:
-                raw = self.store.lowlevel_open(segment + ".tar" + extension)
-
-                (input, output) = os.popen2(filter)
-                def copy_thread(src, dst):
-                    BLOCK_SIZE = 4096
-                    while True:
-                        block = src.read(BLOCK_SIZE)
-                        if len(block) == 0: break
-                        dst.write(block)
-                    dst.close()
-
-                thread.start_new_thread(copy_thread, (raw, input))
-                return output
-            except:
-                pass
-
-        raise cumulus.store.NotFoundError
+        (segment_fp, path, filter_cmd) = self.backend.open_segment(segment)
+        return self.filter_data(segment_fp, filter_cmd)
 
     def load_segment(self, segment):
         seg = tarfile.open(segment, 'r|', self.get_segment(segment))
@@ -227,10 +405,6 @@ class ObjectStore:
             if len(path) == 2 and path[0] == segment:
                 yield (path[1], data_obj.read())
 
-    def load_snapshot(self, snapshot):
-        file = self.store.lowlevel_open("snapshot-" + snapshot + ".lbs")
-        return file.read().splitlines(True)
-
     def extract_segment(self, segment):
         segdir = os.path.join(self.get_cachedir(), segment)
         os.mkdir(segdir)
@@ -244,11 +418,12 @@ class ObjectStore:
         path = os.path.join(self.get_cachedir(), segment, object)
         if not os.access(path, os.R_OK):
             self.extract_segment(segment)
-        if segment in self.lru_list: self.lru_list.remove(segment)
-        self.lru_list.append(segment)
-        while len(self.lru_list) > self.CACHE_SIZE:
-            os.system("rm -rf " + os.path.join(self.cachedir, self.lru_list[0]))
-            self.lru_list = self.lru_list[1:]
+        if segment in self._lru_list: self._lru_list.remove(segment)
+        self._lru_list.append(segment)
+        while len(self._lru_list) > self.CACHE_SIZE:
+            os.system("rm -rf " + os.path.join(self.cachedir,
+                                               self._lru_list[0]))
+            self._lru_list = self._lru_list[1:]
         return open(path, 'rb').read()
 
     def get(self, refstr):
@@ -278,6 +453,9 @@ class ObjectStore:
 
         return data
 
+    def prefetch(self):
+        self.backend.prefetch_generic()
+
 def parse(lines, terminate=None):
     """Generic parser for RFC822-style "Key: Value" data streams.
 
@@ -317,7 +495,7 @@ def parse(lines, terminate=None):
 
 def parse_full(lines):
     try:
-        return parse(lines).next()
+        return next(parse(lines))
     except StopIteration:
         return {}
 
@@ -342,7 +520,7 @@ def read_metadata(object_store, root):
 
     def follow_ref(refstr):
         if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
-        lines = object_store.get(refstr).splitlines(True)
+        lines = to_lines(object_store.get(refstr))
         lines.reverse()
         stack.append(lines)
 
@@ -512,7 +690,26 @@ class LocalDatabase:
         schemes.sort()
         return schemes
 
-    def garbage_collect(self, scheme, intent=1.0):
+    def list_snapshots(self, scheme):
+        """Return a list of snapshots for the given scheme."""
+        cur = self.cursor()
+        cur.execute("select name from snapshots")
+        snapshots = [row[0] for row in cur.fetchall()]
+        snapshots.sort()
+        return snapshots
+
+    def delete_snapshot(self, scheme, name):
+        """Remove the specified snapshot from the database.
+
+        Warning: This does not garbage collect all dependent data in the
+        database, so it must be followed by a call to garbage_collect() to make
+        the database consistent.
+        """
+        cur = self.cursor()
+        cur.execute("delete from snapshots where scheme = ? and name = ?",
+                    (scheme, name))
+
+    def prune_old_snapshots(self, scheme, intent=1.0):
         """Delete entries from old snapshots from the database.
 
         Only snapshots with the specified scheme name will be deleted.  If
@@ -553,33 +750,36 @@ class LocalDatabase:
                 can_delete = True
 
             if can_delete and not first:
-                print "Delete snapshot %d (%s)" % (id, name)
+                print("Delete snapshot %d (%s)" % (id, name))
                 cur.execute("delete from snapshots where snapshotid = ?",
                             (id,))
             first = False
             max_intent = max(max_intent, snap_intent)
 
-        # Delete entries in the segments_used table which are for non-existent
-        # snapshots.
-        cur.execute("""delete from segments_used
+        self.garbage_collect()
+
+    def garbage_collect(self):
+        """Garbage-collect unreachable segment and object data.
+
+        Remove all segments and checksums which is not reachable from the
+        current set of snapshots stored in the local database.
+        """
+        cur = self.cursor()
+
+        # Delete entries in the segment_utilization table which are for
+        # non-existent snapshots.
+        cur.execute("""delete from segment_utilization
                        where snapshotid not in
                            (select snapshotid from snapshots)""")
 
-        # Find segments which contain no objects used by any current snapshots,
-        # and delete them from the segment table.
+        # Delete segments not referenced by any current snapshots.
         cur.execute("""delete from segments where segmentid not in
-                           (select segmentid from segments_used)""")
+                           (select segmentid from segment_utilization)""")
 
-        # Delete unused objects in the block_index table.  By "unused", we mean
-        # any object which was stored in a segment which has been deleted, and
-        # any object in a segment which was marked for cleaning and has had
-        # cleaning performed already (the expired time is less than the current
-        # largest snapshot id).
+        # Delete dangling objects in the block_index table.
         cur.execute("""delete from block_index
-                       where segmentid not in (select segmentid from segments)
-                          or segmentid in (select segmentid from segments
-                                           where expire_time < ?)""",
-                    (last_snapshotid,))
+                       where segmentid not in
+                           (select segmentid from segments)""")
 
         # Remove sub-block signatures for deleted objects.
         cur.execute("""delete from subblock_signatures
@@ -756,11 +956,11 @@ class LocalDatabase:
         target_size = max(2 * segment_size_estimate,
                           total_bytes / target_buckets)
 
-        print "segment_size:", segment_size_estimate
-        print "distribution:", distribution
-        print "total_bytes:", total_bytes
-        print "target_buckets:", target_buckets
-        print "min, target size:", min_size, target_size
+        print("segment_size:", segment_size_estimate)
+        print("distribution:", distribution)
+        print("total_bytes:", total_bytes)
+        print("target_buckets:", target_buckets)
+        print("min, target size:", min_size, target_size)
 
         # Chosen cutoffs.  Each bucket consists of objects with age greater
         # than one cutoff value, but not greater than the next largest cutoff.
@@ -790,7 +990,7 @@ class LocalDatabase:
             cutoffs.append(-1)
         cutoffs.append(-1)
 
-        print "cutoffs:", cutoffs
+        print("cutoffs:", cutoffs)
 
         # Update the database to assign each object to the appropriate bucket.
         cutoffs.reverse()