Update Python code to work with the new backup format.
author: Michael Vrable <vrable@cs.hmc.edu>
Sat, 2 Feb 2013 16:40:36 +0000 (08:40 -0800)
committer: Michael Vrable <vrable@cs.hmc.edu>
Wed, 22 Jan 2014 05:11:09 +0000 (21:11 -0800)
cumulus-sync
python/cumulus/__init__.py
python/cumulus/cmd_util.py
python/cumulus/store/__init__.py
python/cumulus/store/file.py

index b76ebb5..8a474a5 100755 (executable)
@@ -30,27 +30,40 @@ sys.path.append(os.path.join(script_directory, 'python'))
 import cumulus
 import cumulus.store
 
-store1 = cumulus.store.open(sys.argv[1])
-store2 = cumulus.store.open(sys.argv[2])
+store1 = cumulus.BackendWrapper(sys.argv[1])
+store2 = cumulus.BackendWrapper(sys.argv[2])
 
-source = cumulus.ObjectStore(cumulus.LowlevelDataStore(store1))
+source = cumulus.CumulusStore(store1)
 
-filter = set()
-for s in sys.argv[3:]:
-    filter.add(s)
+items_required = set()
+snapshots = sys.argv[3:]
+if not snapshots:
+    snapshots = list(source.list_snapshots())
+for s in snapshots:
+    items_required.add(s)
     d = cumulus.parse_full(source.load_snapshot(s))
-    filter.update(d['Segments'].split())
-
-for ty in ('segments', 'checksums', 'snapshots'):
-    for f in sorted(store1.list(ty)):
-        m = cumulus.store.type_patterns[ty].match(f)
-        if not m: continue
-        if filter and m.group(1) not in filter:
-            continue
-
-        print ty, f
-        try:
-            store2.stat(ty, f)
-        except cumulus.store.NotFoundError:
-            store2.put(ty, f, store1.get(ty, f))
-            print "    [sent]"
+    items_required.update(d['Segments'].split())
+print "Required:", items_required
+
+files_present = set()
+for filetype in cumulus.SEARCH_PATHS:
+    for (name, path) in store2.list_generic(filetype):
+        items_required.discard(name)
+        files_present.add(path)
+print "Files already present:", sorted(files_present)
+
+files_required = []
+items_found = set()
+for filetype in cumulus.SEARCH_PATHS:
+    for (name, path) in store1.list_generic(filetype):
+        if name in items_required:
+            files_required.append(path)
+            items_found.add(name)
+files_required.sort()
+
+print "Missing:", items_required.difference(items_found)
+print "Required files:", files_required
+
+for f in files_required:
+    print f
+    store2.raw_backend.put(f, store1.raw_backend.get(f))
index d8b6814..10f0544 100644 (file)
@@ -27,10 +27,17 @@ various parts of a Cumulus archive:
 """
 
 from __future__ import division
-import hashlib, os, re, tarfile, tempfile, thread
+import hashlib
+import itertools
+import os
+import re
+import tarfile
+import tempfile
+import thread
 from pysqlite2 import dbapi2 as sqlite3
 
-import cumulus.store, cumulus.store.file
+import cumulus.store
+import cumulus.store.file
 
 # The largest supported snapshot format that can be understood.
 FORMAT_VERSION = (0, 11)        # Cumulus Snapshot v0.11
@@ -115,70 +122,182 @@ class ChecksumVerifier:
         result = self.hash.hexdigest()
         return result == self.checksum
 
-class LowlevelDataStore:
-    """Access to the backup store containing segments and snapshot descriptors.
+class SearchPathEntry(object):
+    """Item representing a possible search location for Cumulus files.
 
-    Instances of this class are used to get direct filesystem-level access to
-    the backup data.  To read a backup, a caller will ordinarily not care about
-    direct access to backup segments, but will instead merely need to access
-    objects from those segments.  The ObjectStore class provides a suitable
-    wrapper around a DataStore to give this high-level access.
+    Some Cumulus files might be stored in multiple possible file locations: due
+    to format (different compression mechanisms with different extensions),
+    locality (different segments might be placed in different directories to
+    control archiving policies), for backwards compatibility (default location
+    changed over time).  A SearchPathEntry describes a possible location for a
+    file.
     """
+    def __init__(self, directory_prefix, suffix, context=None):
+        self._directory_prefix = directory_prefix
+        self._suffix = suffix
+        self._context = context
 
-    def __init__(self, path):
-        if isinstance(path, cumulus.store.Store):
-            self.store = path
-        elif path.find(":") >= 0:
-            self.store = cumulus.store.open(path)
-        else:
-            self.store = cumulus.store.file.FileStore(path)
+    def __repr__(self):
+        return "%s(%r, %r, %r)" % (self.__class__.__name__,
+                                   self._directory_prefix, self._suffix,
+                                   self._context)
+
+    def build_path(self, basename):
+        """Construct the search path to use for a file with name basename.
+
+        Returns a tuple (pathname, context), where pathname is the path to try
+        and context is any additional data associated with this search entry
+        (if any).
+        """
+        return (os.path.join(self._directory_prefix, basename + self._suffix),
+                self._context)
 
-    def _classify(self, filename):
-        for (t, r) in cumulus.store.type_patterns.items():
-            if r.match(filename):
-                return (t, filename)
-        return (None, filename)
+class SearchPath(object):
+    """A collection of locations to search for files and lookup utilities.
 
-    def scan(self):
-        self.store.scan()
+    For looking for a file in a Cumulus storage backend, a SearchPath object
+    contains a list of possible locations to try.  A SearchPath can be used to
+    perform the search as well; when a file is found the search path ordering
+    is updated (moving the successful SearchPathEntry to the front of the list
+    for future searches).
+    """
+    def __init__(self, name_regex, searchpath):
+        self._regex = re.compile(name_regex)
+        self._path = list(searchpath)
+
+    def add_search_entry(self, entry):
+        self._path.append(entry)
+
+    def directories(self):
+        """Return the set of directories to search for a file type."""
+        return set(entry._directory_prefix for entry in self._path)
 
-    def lowlevel_open(self, filename):
-        """Return a file-like object for reading data from the given file."""
+    def get(self, backend, basename):
+        for (i, entry) in enumerate(self._path):
+            try:
+                (pathname, context) = entry.build_path(basename)
+                fp = backend.get(pathname)
+                # On success, move this entry to the front of the search path
+                # to speed future searches.
+                if i > 0:
+                    self._path.pop(i)
+                    self._path.insert(0, entry)
+                return (fp, pathname, context)
+            except cumulus.store.NotFoundError:
+                continue
+        raise cumulus.store.NotFoundError(basename)
 
-        (type, filename) = self._classify(filename)
-        return self.store.get(type + "/" + filename)
+    def stat(self, backend, basename):
+        for (i, entry) in enumerate(self._path):
+            try:
+                (pathname, context) = entry.build_path(basename)
+                stat_data = backend.stat(pathname)
+                # On success, move this entry to the front of the search path
+                # to speed future searches.
+                if i > 0:
+                    self._path.pop(i)
+                    self._path.insert(0, entry)
+                result = {"path": pathname}
+                result.update(stat_data)
+                return result
+            except cumulus.store.NotFoundError:
+                continue
+        raise cumulus.store.NotFoundError(basename)
+
+    def list(self, backend):
+        success = False
+        for d in self.directories():
+            try:
+                for f in backend.list(d):
+                    success = True
+                    m = self._regex.match(f)
+                    if m: yield (os.path.join(d, f), m)
+            except cumulus.store.NotFoundError:
+                pass
+        if not success:
+            raise cumulus.store.NotFoundError(basename)
+
+def _build_segments_searchpath(prefix):
+    for (extension, filter) in SEGMENT_FILTERS:
+        yield SearchPathEntry(prefix, extension, filter)
+
+SEARCH_PATHS = {
+    "checksums": SearchPath(
+        r"^snapshot-(.*)\.(\w+)sums$",
+        [SearchPathEntry("meta", ".sha1sums"),
+         SearchPathEntry("checksums", ".sha1sums"),
+         SearchPathEntry("", ".sha1sums")]),
+    "segments": SearchPath(
+        (r"^([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})"
+         r"(\.\S+)?$"),
+        itertools.chain(
+            _build_segments_searchpath("segments0"),
+            _build_segments_searchpath("segments1"),
+            _build_segments_searchpath(""),
+            _build_segments_searchpath("segments"))),
+    "snapshots": SearchPath(
+        r"^snapshot-(.*)\.(cumulus|lbs)$",
+        [SearchPathEntry("snapshots", ".cumulus"),
+         SearchPathEntry("snapshots", ".lbs"),
+         SearchPathEntry("", ".cumulus"),
+         SearchPathEntry("", ".lbs")]),
+}
+
+class BackendWrapper(object):
+    """Wrapper around a Cumulus storage backend that understands file types.
+
+    The BackendWrapper class understands different Cumulus file types, such as
+    snapshots and segments, and implements higher-level operations such as
+    "retrieve a snapshot with a specific name" (hiding operations such as
+    searching for the correct file name).
+    """
 
-    def lowlevel_stat(self, filename):
-        """Return a dictionary of information about the given file.
+    def __init__(self, backend):
+        """Initializes a wrapper around the specified storage backend.
 
-        Currently, the only defined field is 'size', giving the size of the
-        file in bytes.
+        store may either be a Store object or URL.
         """
+        if type(backend) in (str, unicode):
+            if backend.find(":") >= 0:
+                self._backend = cumulus.store.open(backend)
+            else:
+                self._backend = cumulus.store.file.FileStore(backend)
+        else:
+            self._backend = backend
 
-        (type, filename) = self._classify(filename)
-        return self.store.stat(type + "/" + filename)
+    @property
+    def raw_backend(self):
+        return self._backend
 
-    # Slightly higher-level list methods.
-    def list_snapshots(self):
-        for f in self.store.list('snapshots'):
-            m = cumulus.store.type_patterns['snapshots'].match(f)
-            if m: yield m.group(1)
+    def stat_generic(self, basename, filetype):
+        return SEARCH_PATHS[filetype].stat(self._backend, basename)
 
-    def list_segments(self):
-        for f in self.store.list('segments'):
-            m = cumulus.store.type_patterns['segments'].match(f)
-            if m: yield m.group(1)
+    def open_generic(self, basename, filetype):
+        return SEARCH_PATHS[filetype].get(self._backend, basename)
+
+    def open_snapshot(self, name):
+        return self.open_generic("snapshot-" + name, "snapshots")
 
-class ObjectStore:
-    def __init__(self, data_store):
-        self.store = data_store
+    def open_segment(self, name):
+        return self.open_generic(name + ".tar", "segments")
+
+    def list_generic(self, filetype):
+        return ((x[1].group(1), x[0])
+                for x in SEARCH_PATHS[filetype].list(self._backend))
+
+class CumulusStore:
+    def __init__(self, backend):
+        if isinstance(backend, BackendWrapper):
+            self.backend = backend
+        else:
+            self.backend = BackendWrapper(backend)
         self.cachedir = None
         self.CACHE_SIZE = 16
-        self.lru_list = []
+        self._lru_list = []
 
     def get_cachedir(self):
         if self.cachedir is None:
-            self.cachedir = tempfile.mkdtemp(".lbs")
+            self.cachedir = tempfile.mkdtemp("-cumulus")
         return self.cachedir
 
     def cleanup(self):
@@ -216,28 +335,33 @@ class ObjectStore:
 
         return (segment, object, checksum, slice)
 
+    def list_snapshots(self):
+        return set(x[0] for x in self.backend.list_generic("snapshots"))
+
+    def list_segments(self):
+        return set(x[0] for x in self.backend.list_generic("segments"))
+
+    def load_snapshot(self, snapshot):
+        snapshot_file = self.backend.open_snapshot(snapshot)[0]
+        return snapshot_file.read().splitlines(True)
+
     def get_segment(self, segment):
         accessed_segments.add(segment)
 
-        for (extension, filter) in SEGMENT_FILTERS:
-            try:
-                raw = self.store.lowlevel_open(segment + ".tar" + extension)
-
-                (input, output) = os.popen2(filter)
-                def copy_thread(src, dst):
-                    BLOCK_SIZE = 4096
-                    while True:
-                        block = src.read(BLOCK_SIZE)
-                        if len(block) == 0: break
-                        dst.write(block)
-                    dst.close()
-
-                thread.start_new_thread(copy_thread, (raw, input))
-                return output
-            except:
-                pass
-
-        raise cumulus.store.NotFoundError
+        (segment_fp, path, filter_cmd) = self.backend.open_segment(segment)
+        if filter_cmd is None:
+            return segment_fp
+        (input, output) = os.popen2(filter_cmd)
+        def copy_thread(src, dst):
+            BLOCK_SIZE = 4096
+            while True:
+                block = src.read(BLOCK_SIZE)
+                if len(block) == 0: break
+                dst.write(block)
+            src.close()
+            dst.close()
+        thread.start_new_thread(copy_thread, (segment_fp, input))
+        return output
 
     def load_segment(self, segment):
         seg = tarfile.open(segment, 'r|', self.get_segment(segment))
@@ -247,10 +371,6 @@ class ObjectStore:
             if len(path) == 2 and path[0] == segment:
                 yield (path[1], data_obj.read())
 
-    def load_snapshot(self, snapshot):
-        file = self.store.lowlevel_open("snapshot-" + snapshot + ".cumulus")
-        return file.read().splitlines(True)
-
     def extract_segment(self, segment):
         segdir = os.path.join(self.get_cachedir(), segment)
         os.mkdir(segdir)
@@ -264,11 +384,12 @@ class ObjectStore:
         path = os.path.join(self.get_cachedir(), segment, object)
         if not os.access(path, os.R_OK):
             self.extract_segment(segment)
-        if segment in self.lru_list: self.lru_list.remove(segment)
-        self.lru_list.append(segment)
-        while len(self.lru_list) > self.CACHE_SIZE:
-            os.system("rm -rf " + os.path.join(self.cachedir, self.lru_list[0]))
-            self.lru_list = self.lru_list[1:]
+        if segment in self._lru_list: self._lru_list.remove(segment)
+        self._lru_list.append(segment)
+        while len(self._lru_list) > self.CACHE_SIZE:
+            os.system("rm -rf " + os.path.join(self.cachedir,
+                                               self._lru_list[0]))
+            self._lru_list = self._lru_list[1:]
         return open(path, 'rb').read()
 
     def get(self, refstr):
index e0b094d..9d97190 100644 (file)
@@ -76,97 +76,75 @@ def cmd_list_snapshots(args):
     """ List snapshots stored.
         Syntax: $0 --data=DATADIR list-snapshots
     """
-    store = cumulus.LowlevelDataStore(options.store)
-    for s in sorted(store.list_snapshots()):
-        print s
+    store = cumulus.CumulusStore(options.store)
+    for s in sorted(store.list_snapshots()): print s
 
 def cmd_list_snapshot_sizes(args):
     """ List size of data needed for each snapshot.
         Syntax: $0 --data=DATADIR list-snapshot-sizes
     """
-    lowlevel = cumulus.LowlevelDataStore(options.store)
-    lowlevel.scan()
-    store = cumulus.ObjectStore(lowlevel)
+    store = cumulus.CumulusStore(options.store)
+    backend = store.backend
     previous = set()
-    exts = {}
-    for seg in lowlevel.store.list('segments'):
-        exts.update ([seg.split ('.', 1)])
-    for s in sorted(lowlevel.list_snapshots()):
+    size = 0
+    def get_size(segment):
+        return backend.stat_generic(segment + ".tar", "segments")["size"]
+    for s in sorted(store.list_snapshots()):
         d = cumulus.parse_full(store.load_snapshot(s))
         check_version(d['Format'])
 
-        try:
-            intent = float(d['Backup-Intent'])
-        except:
-            intent = 1.0
-
-        segments = d['Segments'].split()
-        (size, added, removed, addcount, remcount) = (0, 0, 0, 0, 0)
-        lo_stat = lowlevel.lowlevel_stat
-        for seg in segments:
-            segsize = lo_stat('.'.join ((seg, exts[seg])))['size']
-            size += segsize
-            if seg not in previous:
-                added += segsize
-                addcount += 1
-        for seg in previous:
-            if seg not in segments:
-                removed += lo_stat('.'.join((seg, exts[seg])))['size']
-                remcount += 1
-        previous = set(segments)
-        print "%s [%s]: %.3f +%.3f -%.3f (+%d/-%d segments)" % (s, intent, size / 1024.0**2, added / 1024.0**2, removed / 1024.0**2, addcount, remcount)
+        segments = set(d['Segments'].split())
+        (added, removed, addcount, remcount) = (0, 0, 0, 0)
+        for seg in segments.difference(previous):
+            added += get_size(seg)
+            addcount += 1
+        for seg in previous.difference(segments):
+            removed += get_size(seg)
+            remcount += 1
+        size += added - removed
+        previous = segments
+        print "%s: %.3f +%.3f -%.3f (+%d/-%d segments)" % (s, size / 1024.0**2, added / 1024.0**2, removed / 1024.0**2, addcount, remcount)
 
 def cmd_garbage_collect(args):
     """ Search for any files which are not needed by any current
         snapshots and offer to delete them.
         Syntax: $0 --store=DATADIR gc
     """
-    lowlevel = cumulus.LowlevelDataStore(options.store)
-    lowlevel.scan()
-    store = cumulus.ObjectStore(lowlevel)
-    snapshots = set(lowlevel.list_snapshots())
-    segments = set()
-    for s in snapshots:
+    store = cumulus.CumulusStore(options.store)
+    backend = store.backend
+    referenced = set()
+    for s in store.list_snapshots():
         d = cumulus.parse_full(store.load_snapshot(s))
         check_version(d['Format'])
-        segments.update(d['Segments'].split())
-
-    referenced = snapshots.union(segments)
-    reclaimed = 0
-    for (t, r) in cumulus.store.type_patterns.items():
-        for f in lowlevel.store.list(t):
-            m = r.match(f)
-            if m is None or m.group(1) not in referenced:
-                print "Garbage:", (t, f)
-                reclaimed += lowlevel.store.stat(t, f)['size']
-                if not options.dry_run:
-                    lowlevel.store.delete(t, f)
-    print "Reclaimed space:", reclaimed
+        referenced.add(s)
+        referenced.update(d['Segments'].split())
 
-cmd_gc = cmd_garbage_collect
+    print referenced
 
-def cmd_object_checksums(segments):
-    """ Build checksum list for objects in the given segments, or all
-        segments if none are specified.
-    """
-    get_passphrase()
-    lowlevel = cumulus.LowlevelDataStore(options.store)
-    store = cumulus.ObjectStore(lowlevel)
-    if len(segments) == 0:
-        segments = sorted(lowlevel.list_segments())
-    for s in segments:
-        for (o, data) in store.load_segment(s):
-            csum = cumulus.ChecksumCreator().update(data).compute()
-            print "%s/%s:%d:%s" % (s, o, len(data), csum)
-    store.cleanup()
-object_sums = cmd_object_checksums
+    to_delete = []
+    to_preserve = []
+    for filetype in cumulus.SEARCH_PATHS:
+        for (name, path) in store.backend.list_generic(filetype):
+            if name in referenced:
+                to_preserve.append(path)
+            else:
+                to_delete.append(path)
+
+    print to_preserve
+    print to_delete
+
+    raw_backend = backend.raw_backend
+    for f in to_delete:
+        print "Delete:", f
+        if not options.dry_run:
+            raw_backend.delete(f)
+cmd_gc = cmd_garbage_collect
 
 def cmd_read_snapshots(snapshots):
     """ Read a snapshot file
     """
     get_passphrase()
-    lowlevel = cumulus.LowlevelDataStore(options.store)
-    store = cumulus.ObjectStore(lowlevel)
+    store = cumulus.CumulusStore(options.store)
     for s in snapshots:
         d = cumulus.parse_full(store.load_snapshot(s))
         check_version(d['Format'])
@@ -179,8 +157,7 @@ def cmd_read_metadata(args):
     """
     snapshot = args [0]
     get_passphrase()
-    lowlevel = cumulus.LowlevelDataStore(options.store)
-    store = cumulus.ObjectStore(lowlevel)
+    store = cumulus.CumulusStore(options.store)
     d = cumulus.parse_full(store.load_snapshot(snapshot))
     check_version(d['Format'])
     metadata = cumulus.read_metadata(store, d['Root'])
@@ -198,8 +175,7 @@ def cmd_verify_snapshots(snapshots):
     """ Verify snapshot integrity
     """
     get_passphrase()
-    lowlevel = cumulus.LowlevelDataStore(options.store)
-    store = cumulus.ObjectStore(lowlevel)
+    store = cumulus.CumulusStore(options.store)
     for s in snapshots:
         cumulus.accessed_segments.clear()
         print "#### Snapshot", s
@@ -237,8 +213,7 @@ def cmd_restore_snapshot(args):
     """ Restore a snapshot, or some subset of files from it
     """
     get_passphrase()
-    lowlevel = cumulus.LowlevelDataStore(options.store)
-    store = cumulus.ObjectStore(lowlevel)
+    store = cumulus.CumulusStore(options.store)
     snapshot = cumulus.parse_full(store.load_snapshot(args[0]))
     check_version(snapshot['Format'])
     destdir = args[1]
@@ -279,7 +254,7 @@ def cmd_restore_snapshot(args):
             metadata_paths[pathname] = m
             for block in m.data():
                 (segment, object, checksum, slice) \
-                    = cumulus.ObjectStore.parse_ref(block)
+                    = cumulus.CumulusStore.parse_ref(block)
                 if segment not in metadata_segments:
                     metadata_segments[segment] = set()
                 metadata_segments[segment].add(pathname)
index 3b54cbb..0899d70 100644 (file)
@@ -55,19 +55,19 @@ class Store (object):
         except ImportError:
             raise NotImplementedError, "Scheme %s not implemented" % scheme
 
-    def list(self, type):
+    def list(self, path):
         raise NotImplementedError
 
-    def get(self, type, name):
+    def get(self, path):
         raise NotImplementedError
 
-    def put(self, type, name, fp):
+    def put(self, path, fp):
         raise NotImplementedError
 
-    def delete(self, type, name):
+    def delete(self, path):
         raise NotImplementedError
 
-    def stat(self, type, name):
+    def stat(self, path):
         raise NotImplementedError
 
     def scan(self):
index 3d536bf..8304401 100644 (file)
@@ -33,16 +33,19 @@ class FileStore(cumulus.store.Store):
         return os.path.join(self.prefix, type, name)
 
     def list(self, subdir):
-        return os.listdir(os.path.join(self.prefix, subdir))
+        try:
+            return os.listdir(os.path.join(self.prefix, subdir))
+        except OSError:
+            raise cumulus.store.NotFoundError(subdir)
 
     def get(self, path):
-        return open(os.path.join(self.prefix, path), 'rb')
+        try:
+            return open(os.path.join(self.prefix, path), 'rb')
+        except IOError:
+            raise cumulus.store.NotFoundError(path)
 
     def put(self, path, fp):
-        # TODO: Implement
-        raise NotImplementedError
-        k = self._get_path(type, name)
-        out = open(k, 'wb')
+        out = open(os.path.join(self.prefix, path), 'wb')
         buf = fp.read(4096)
         while len(buf) > 0:
             out.write(buf)