Update Python code to work with the new backup format.
[cumulus.git] / python / cumulus / __init__.py
index d8b6814..10f0544 100644 (file)
@@ -27,10 +27,17 @@ various parts of a Cumulus archive:
 """
 
 from __future__ import division
-import hashlib, os, re, tarfile, tempfile, thread
+import hashlib
+import itertools
+import os
+import re
+import tarfile
+import tempfile
+import thread
 from pysqlite2 import dbapi2 as sqlite3
 
-import cumulus.store, cumulus.store.file
+import cumulus.store
+import cumulus.store.file
 
 # The largest supported snapshot format that can be understood.
 FORMAT_VERSION = (0, 11)        # Cumulus Snapshot v0.11
@@ -115,70 +122,182 @@ class ChecksumVerifier:
         result = self.hash.hexdigest()
         return result == self.checksum
 
-class LowlevelDataStore:
-    """Access to the backup store containing segments and snapshot descriptors.
+class SearchPathEntry(object):
+    """Item representing a possible search location for Cumulus files.
 
-    Instances of this class are used to get direct filesystem-level access to
-    the backup data.  To read a backup, a caller will ordinarily not care about
-    direct access to backup segments, but will instead merely need to access
-    objects from those segments.  The ObjectStore class provides a suitable
-    wrapper around a DataStore to give this high-level access.
+    Some Cumulus files might be stored in multiple possible file locations: due
+    to format (different compression mechanisms with different extensions),
+    locality (different segments might be placed in different directories to
+    control archiving policies), or backwards compatibility (default location
+    changed over time).  A SearchPathEntry describes a possible location for a
+    file.
     """
+    def __init__(self, directory_prefix, suffix, context=None):
+        self._directory_prefix = directory_prefix
+        self._suffix = suffix
+        self._context = context
 
-    def __init__(self, path):
-        if isinstance(path, cumulus.store.Store):
-            self.store = path
-        elif path.find(":") >= 0:
-            self.store = cumulus.store.open(path)
-        else:
-            self.store = cumulus.store.file.FileStore(path)
+    def __repr__(self):
+        return "%s(%r, %r, %r)" % (self.__class__.__name__,
+                                   self._directory_prefix, self._suffix,
+                                   self._context)
+
+    def build_path(self, basename):
+        """Construct the search path to use for a file with name basename.
+
+        Returns a tuple (pathname, context), where pathname is the path to try
+        and context is any additional data associated with this search entry
+        (if any).
+        """
+        return (os.path.join(self._directory_prefix, basename + self._suffix),
+                self._context)
 
-    def _classify(self, filename):
-        for (t, r) in cumulus.store.type_patterns.items():
-            if r.match(filename):
-                return (t, filename)
-        return (None, filename)
+class SearchPath(object):
+    """A collection of locations to search for files and lookup utilities.
 
-    def scan(self):
-        self.store.scan()
+    When looking for a file in a Cumulus storage backend, a SearchPath object
+    contains a list of possible locations to try.  A SearchPath can be used to
+    perform the search as well; when a file is found the search path ordering
+    is updated (moving the successful SearchPathEntry to the front of the list
+    for future searches).
+    """
+    def __init__(self, name_regex, searchpath):
+        self._regex = re.compile(name_regex)
+        self._path = list(searchpath)
+
+    def add_search_entry(self, entry):
+        self._path.append(entry)
+
+    def directories(self):
+        """Return the set of directories to search for a file type."""
+        return set(entry._directory_prefix for entry in self._path)
 
-    def lowlevel_open(self, filename):
-        """Return a file-like object for reading data from the given file."""
+    def get(self, backend, basename):
+        for (i, entry) in enumerate(self._path):
+            try:
+                (pathname, context) = entry.build_path(basename)
+                fp = backend.get(pathname)
+                # On success, move this entry to the front of the search path
+                # to speed future searches.
+                if i > 0:
+                    self._path.pop(i)
+                    self._path.insert(0, entry)
+                return (fp, pathname, context)
+            except cumulus.store.NotFoundError:
+                continue
+        raise cumulus.store.NotFoundError(basename)
 
-        (type, filename) = self._classify(filename)
-        return self.store.get(type + "/" + filename)
+    def stat(self, backend, basename):
+        for (i, entry) in enumerate(self._path):
+            try:
+                (pathname, context) = entry.build_path(basename)
+                stat_data = backend.stat(pathname)
+                # On success, move this entry to the front of the search path
+                # to speed future searches.
+                if i > 0:
+                    self._path.pop(i)
+                    self._path.insert(0, entry)
+                result = {"path": pathname}
+                result.update(stat_data)
+                return result
+            except cumulus.store.NotFoundError:
+                continue
+        raise cumulus.store.NotFoundError(basename)
+
+    def list(self, backend):
+        success = False  # becomes True once any location yields an entry
+        for d in self.directories():
+            try:
+                for f in backend.list(d):
+                    success = True
+                    m = self._regex.match(f)
+                    if m: yield (os.path.join(d, f), m)
+            except cumulus.store.NotFoundError:
+                pass  # nothing found at this location; keep trying the others
+        if not success:
+            raise cumulus.store.NotFoundError(backend)
+
+def _build_segments_searchpath(prefix):
+    for (extension, filter) in SEGMENT_FILTERS:
+        yield SearchPathEntry(prefix, extension, filter)
+
+SEARCH_PATHS = {
+    "checksums": SearchPath(
+        r"^snapshot-(.*)\.(\w+)sums$",
+        [SearchPathEntry("meta", ".sha1sums"),
+         SearchPathEntry("checksums", ".sha1sums"),
+         SearchPathEntry("", ".sha1sums")]),
+    "segments": SearchPath(
+        (r"^([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})"
+         r"(\.\S+)?$"),
+        itertools.chain(
+            _build_segments_searchpath("segments0"),
+            _build_segments_searchpath("segments1"),
+            _build_segments_searchpath(""),
+            _build_segments_searchpath("segments"))),
+    "snapshots": SearchPath(
+        r"^snapshot-(.*)\.(cumulus|lbs)$",
+        [SearchPathEntry("snapshots", ".cumulus"),
+         SearchPathEntry("snapshots", ".lbs"),
+         SearchPathEntry("", ".cumulus"),
+         SearchPathEntry("", ".lbs")]),
+}
+
+class BackendWrapper(object):
+    """Wrapper around a Cumulus storage backend that understands file types.
+
+    The BackendWrapper class understands different Cumulus file types, such as
+    snapshots and segments, and implements higher-level operations such as
+    "retrieve a snapshot with a specific name" (hiding operations such as
+    searching for the correct file name).
+    """
 
-    def lowlevel_stat(self, filename):
-        """Return a dictionary of information about the given file.
+    def __init__(self, backend):
+        """Initializes a wrapper around the specified storage backend.
 
-        Currently, the only defined field is 'size', giving the size of the
-        file in bytes.
+        backend may be a Store object, a URL, or a plain file path.
         """
+        if type(backend) in (str, unicode):
+            if backend.find(":") >= 0:
+                self._backend = cumulus.store.open(backend)
+            else:
+                self._backend = cumulus.store.file.FileStore(backend)
+        else:
+            self._backend = backend
 
-        (type, filename) = self._classify(filename)
-        return self.store.stat(type + "/" + filename)
+    @property
+    def raw_backend(self):
+        return self._backend
 
-    # Slightly higher-level list methods.
-    def list_snapshots(self):
-        for f in self.store.list('snapshots'):
-            m = cumulus.store.type_patterns['snapshots'].match(f)
-            if m: yield m.group(1)
+    def stat_generic(self, basename, filetype):
+        return SEARCH_PATHS[filetype].stat(self._backend, basename)
 
-    def list_segments(self):
-        for f in self.store.list('segments'):
-            m = cumulus.store.type_patterns['segments'].match(f)
-            if m: yield m.group(1)
+    def open_generic(self, basename, filetype):
+        return SEARCH_PATHS[filetype].get(self._backend, basename)
+
+    def open_snapshot(self, name):
+        return self.open_generic("snapshot-" + name, "snapshots")
 
-class ObjectStore:
-    def __init__(self, data_store):
-        self.store = data_store
+    def open_segment(self, name):
+        return self.open_generic(name + ".tar", "segments")
+
+    def list_generic(self, filetype):
+        return ((x[1].group(1), x[0])
+                for x in SEARCH_PATHS[filetype].list(self._backend))
+
+class CumulusStore:
+    def __init__(self, backend):
+        if isinstance(backend, BackendWrapper):
+            self.backend = backend
+        else:
+            self.backend = BackendWrapper(backend)
         self.cachedir = None
         self.CACHE_SIZE = 16
-        self.lru_list = []
+        self._lru_list = []
 
     def get_cachedir(self):
         if self.cachedir is None:
-            self.cachedir = tempfile.mkdtemp(".lbs")
+            self.cachedir = tempfile.mkdtemp("-cumulus")
         return self.cachedir
 
     def cleanup(self):
@@ -216,28 +335,33 @@ class ObjectStore:
 
         return (segment, object, checksum, slice)
 
+    def list_snapshots(self):
+        return set(x[0] for x in self.backend.list_generic("snapshots"))
+
+    def list_segments(self):
+        return set(x[0] for x in self.backend.list_generic("segments"))
+
+    def load_snapshot(self, snapshot):
+        snapshot_file = self.backend.open_snapshot(snapshot)[0]
+        return snapshot_file.read().splitlines(True)
+
     def get_segment(self, segment):
         accessed_segments.add(segment)
 
-        for (extension, filter) in SEGMENT_FILTERS:
-            try:
-                raw = self.store.lowlevel_open(segment + ".tar" + extension)
-
-                (input, output) = os.popen2(filter)
-                def copy_thread(src, dst):
-                    BLOCK_SIZE = 4096
-                    while True:
-                        block = src.read(BLOCK_SIZE)
-                        if len(block) == 0: break
-                        dst.write(block)
-                    dst.close()
-
-                thread.start_new_thread(copy_thread, (raw, input))
-                return output
-            except:
-                pass
-
-        raise cumulus.store.NotFoundError
+        (segment_fp, path, filter_cmd) = self.backend.open_segment(segment)
+        if filter_cmd is None:
+            return segment_fp
+        (input, output) = os.popen2(filter_cmd)
+        def copy_thread(src, dst):
+            BLOCK_SIZE = 4096
+            while True:
+                block = src.read(BLOCK_SIZE)
+                if len(block) == 0: break
+                dst.write(block)
+            src.close()
+            dst.close()
+        thread.start_new_thread(copy_thread, (segment_fp, input))
+        return output
 
     def load_segment(self, segment):
         seg = tarfile.open(segment, 'r|', self.get_segment(segment))
@@ -247,10 +371,6 @@ class ObjectStore:
             if len(path) == 2 and path[0] == segment:
                 yield (path[1], data_obj.read())
 
-    def load_snapshot(self, snapshot):
-        file = self.store.lowlevel_open("snapshot-" + snapshot + ".cumulus")
-        return file.read().splitlines(True)
-
     def extract_segment(self, segment):
         segdir = os.path.join(self.get_cachedir(), segment)
         os.mkdir(segdir)
@@ -264,11 +384,12 @@ class ObjectStore:
         path = os.path.join(self.get_cachedir(), segment, object)
         if not os.access(path, os.R_OK):
             self.extract_segment(segment)
-        if segment in self.lru_list: self.lru_list.remove(segment)
-        self.lru_list.append(segment)
-        while len(self.lru_list) > self.CACHE_SIZE:
-            os.system("rm -rf " + os.path.join(self.cachedir, self.lru_list[0]))
-            self.lru_list = self.lru_list[1:]
+        if segment in self._lru_list: self._lru_list.remove(segment)
+        self._lru_list.append(segment)
+        while len(self._lru_list) > self.CACHE_SIZE:
+            os.system("rm -rf " + os.path.join(self.cachedir,
+                                               self._lru_list[0]))
+            self._lru_list = self._lru_list[1:]
         return open(path, 'rb').read()
 
     def get(self, refstr):