Expanded Python module for accessing LBS snapshots, and lbs-util.py.
authorMichael Vrable <mvrable@cs.ucsd.edu>
Tue, 11 Sep 2007 17:38:13 +0000 (10:38 -0700)
committerMichael Vrable <mvrable@turin.ucsd.edu>
Tue, 11 Sep 2007 17:38:13 +0000 (10:38 -0700)
Expand the lbs Python module with code for reading from LBS archives.  This
includes decoding segments and parsing snapshot descriptors and metadata
logs.

Expand the lbs-util.py tool so that it has most of the functionality of
lbs-util (Perl implementation).  The Perl code should now be considered
deprecated, and will be removed at some point in the future.

The Python code still needs some cleaning up.

NEWS
lbs-util.py
lbs.py

diff --git a/NEWS b/NEWS
index 1291270..134bd9e 100644 (file)
--- a/NEWS
+++ b/NEWS
@@ -1,3 +1,12 @@
+0.5?
+    - Much improved Python interface for accessing and manipulating LBS
+      archives and local database information.  The interface should not
+      yet be considered stable.
+    - Python implementation of lbs-util now includes most of the
+      features of the Perl implementation, plus some other new features.
+      The Perl library and utility are deprecated, and will be removed
+      in the future.
+
 0.4 [2007-08-24]
     - Documentation improvements: a getting started README, and a
       description of some of the implementation details.
index d30fbec..33c9c75 100755 (executable)
@@ -2,17 +2,26 @@
 #
 # Utility for managing LBS archives.
 
-import sys
+import getpass, os, sys
 from optparse import OptionParser
 import lbs
 
 parser = OptionParser(usage="%prog [option]... command [arg]...")
 parser.add_option("-v", action="store_true", dest="verbose", default=False,
                   help="increase verbosity")
+parser.add_option("--store", dest="store",
+                  help="specify path to backup data store")
 parser.add_option("--localdb", dest="localdb",
                   help="specify path to local database")
 (options, args) = parser.parse_args(sys.argv[1:])
 
+# Read a passphrase from the user and store it in the LBS_GPG_PASSPHRASE
+# environment variable.
+def get_passphrase():
+    ENV_KEY = 'LBS_GPG_PASSPHRASE'
+    if not os.environ.has_key(ENV_KEY):
+        os.environ[ENV_KEY] = getpass.getpass()
+
 # Run the segment cleaner.
 # Syntax: $0 --localdb=LOCALDB clean
 def cmd_clean(clean_threshold=7.0):
@@ -32,6 +41,79 @@ def cmd_clean(clean_threshold=7.0):
     db.balance_expired_objects()
     db.commit()
 
+# List snapshots stored.
+# Syntax: $0 --data=DATADIR list-snapshots
+def cmd_list_snapshots():
+    store = lbs.LowlevelDataStore(options.store)
+    for s in sorted(store.list_snapshots()):
+        print s
+
+# List size of data needed for each snapshot.
+# Syntax: $0 --data=DATADIR list-snapshot-sizes
+def cmd_list_snapshot_sizes():
+    lowlevel = lbs.LowlevelDataStore(options.store)
+    store = lbs.ObjectStore(lowlevel)
+    previous = set()
+    for s in sorted(lowlevel.list_snapshots()):
+        d = lbs.parse_full(store.load_snapshot(s))
+        segments = d['Segments'].split()
+        (size, added, removed) = (0, 0, 0)
+        for seg in segments:
+            segsize = lowlevel.lowlevel_stat(seg + ".tar.gpg")['size']
+            size += segsize
+            if seg not in previous: added += segsize
+        for seg in previous:
+            if seg not in segments:
+                removed += lowlevel.lowlevel_stat(seg + ".tar.gpg")['size']
+        previous = set(segments)
+        print "%s: %.3f +%.3f -%.3f" % (s, size / 1024.0**2, added / 1024.0**2, removed / 1024.0**2)
+
+# Build checksum list for objects in the given segments
+def cmd_object_checksums(segments):
+    lowlevel = lbs.LowlevelDataStore(options.store)
+    store = lbs.ObjectStore(lowlevel)
+    for s in segments:
+        for (o, data) in store.load_segment(s):
+            csum = lbs.ChecksumCreator().update(data).compute()
+            print "%s/%s:%d:%s" % (s, o, len(data), csum)
+    store.cleanup()
+
+# Read a snapshot file
+def cmd_read_snapshots(snapshots):
+    get_passphrase()
+    lowlevel = lbs.LowlevelDataStore(options.store)
+    store = lbs.ObjectStore(lowlevel)
+    for s in snapshots:
+        d = lbs.parse_full(store.load_snapshot(s))
+        print d
+        print d['Segments'].split()
+    store.cleanup()
+
+# Verify snapshot integrity
+def cmd_verify_snapshots(snapshots):
+    get_passphrase()
+    lowlevel = lbs.LowlevelDataStore(options.store)
+    store = lbs.ObjectStore(lowlevel)
+    for s in snapshots:
+        print "#### Snapshot", s
+        d = lbs.parse_full(store.load_snapshot(s))
+        print "## Root:", d['Root']
+        metadata = lbs.iterate_metadata(store, d['Root'])
+        for m in metadata:
+            if m.fields['type'] != '-': continue
+            print "%s [%d bytes]" % (m.fields['name'], int(m.fields['size']))
+            verifier = lbs.ChecksumVerifier(m.fields['checksum'])
+            size = 0
+            for block in m.data():
+                data = store.get(block)
+                verifier.update(data)
+                size += len(data)
+            if int(m.fields['size']) != size:
+                raise ValueError("File size does not match!")
+            if not verifier.valid():
+                raise ValueError("Bad checksum found")
+    store.cleanup()
+
 if len(args) == 0:
     parser.print_usage()
     sys.exit(1)
@@ -39,6 +121,16 @@ cmd = args[0]
 args = args[1:]
 if cmd == 'clean':
     cmd_clean()
+elif cmd == 'list-snapshots':
+    cmd_list_snapshots()
+elif cmd == 'object-sums':
+    cmd_object_checksums(args)
+elif cmd == 'read-snapshots':
+    cmd_read_snapshots(args)
+elif cmd == 'list-snapshot-sizes':
+    cmd_list_snapshot_sizes()
+elif cmd == 'verify-snapshots':
+    cmd_verify_snapshots(args)
 else:
     print "Unknown command:", cmd
     parser.print_usage()
diff --git a/lbs.py b/lbs.py
index ab2c261..e3ca9ab 100644 (file)
--- a/lbs.py
+++ b/lbs.py
@@ -9,8 +9,12 @@ various parts of an LBS archive:
 """
 
 from __future__ import division
+import os, re, sha, tarfile, tempfile, thread
 from pysqlite2 import dbapi2 as sqlite3
 
+# Maximum number of nested indirect references allowed in a snapshot.
+MAX_RECURSION_DEPTH = 3
+
 class Struct:
     """A class which merely acts as a data container.
 
@@ -21,6 +25,319 @@ class Struct:
     def __repr__(self):
         return "<%s %s>" % (self.__class__, self.__dict__)
 
+CHECKSUM_ALGORITHMS = {
+    'sha1': sha.new
+}
+
+class ChecksumCreator:
+    """Compute an LBS checksum for provided data.
+
+    The algorithm used is selectable, but currently defaults to sha1.
+    """
+
+    def __init__(self, algorithm='sha1'):
+        self.algorithm = algorithm
+        self.hash = CHECKSUM_ALGORITHMS[algorithm]()
+
+    def update(self, data):
+        self.hash.update(data)
+        return self
+
+    def compute(self):
+        return "%s=%s" % (self.algorithm, self.hash.hexdigest())
+
+class ChecksumVerifier:
+    """Verify whether a checksum from a snapshot matches the supplied data."""
+
+    def __init__(self, checksumstr):
+        """Create an object to check the supplied checksum."""
+
+        (algo, checksum) = checksumstr.split("=", 1)
+        self.checksum = checksum
+        self.hash = CHECKSUM_ALGORITHMS[algo]()
+
+    def update(self, data):
+        self.hash.update(data)
+
+    def valid(self):
+        """Return a boolean indicating whether the checksum matches."""
+
+        result = self.hash.hexdigest()
+        return result == self.checksum
+
+class LowlevelDataStore:
+    """Access to the backup store containing segments and snapshot descriptors.
+
+    Instances of this class are used to get direct filesystem-level access to
+    the backup data.  To read a backup, a caller will ordinarily not care about
+    direct access to backup segments, but will instead merely need to access
+    objects from those segments.  The ObjectStore class provides a suitable
+    wrapper around a DataStore to give this high-level access.
+    """
+
+    def __init__(self, path):
+        self.path = path
+
+    # Low-level filesystem access.  These methods could be overwritten to
+    # provide access to remote data stores.
+    def lowlevel_list(self):
+        """Get a listing of files stored."""
+
+        return os.listdir(self.path)
+
+    def lowlevel_open(self, filename):
+        """Return a file-like object for reading data from the given file."""
+
+        return open(os.path.join(self.path, filename), 'rb')
+
+    def lowlevel_stat(self, filename):
+        """Return a dictionary of information about the given file.
+
+        Currently, the only defined field is 'size', giving the size of the
+        file in bytes.
+        """
+
+        stat = os.stat(os.path.join(self.path, filename))
+        return {'size': stat.st_size}
+
+    # Slightly higher-level list methods.
+    def list_snapshots(self):
+        for f in self.lowlevel_list():
+            m = re.match(r"^snapshot-(.*)\.lbs$", f)
+            if m:
+                yield m.group(1)
+
+    def list_segments(self):
+        for f in self.lowlevel_list():
+            m = re.match(r"^([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})(\.\S+)?$", f)
+            if m:
+                yield m.group(1)
+
+class ObjectStore:
+    def __init__(self, data_store):
+        self.store = data_store
+        self.cachedir = None
+        self.CACHE_SIZE = 16
+        self.lru_list = []
+
+    def get_cachedir(self):
+        if self.cachedir is None:
+            self.cachedir = tempfile.mkdtemp(".lbs")
+        return self.cachedir
+
+    def cleanup(self):
+        if self.cachedir is not None:
+            # TODO: Avoid use of system, make this safer
+            os.system("rm -rv " + self.cachedir)
+        self.cachedir = None
+
+    @staticmethod
+    def parse_ref(refstr):
+        m = re.match(r"^([-0-9a-f]+)\/([0-9a-f]+)(\(\S+\))?(\[(\d+)\+(\d+)\])?$", refstr)
+        if not m: return
+
+        segment = m.group(1)
+        object = m.group(2)
+        checksum = m.group(3)
+        slice = m.group(4)
+
+        if checksum is not None:
+            checksum = checksum.lstrip("(").rstrip(")")
+
+        if slice is not None:
+            slice = (int(m.group(5)), int(m.group(6)))
+
+        return (segment, object, checksum, slice)
+
+    def get_segment(self, segment):
+        raw = self.store.lowlevel_open(segment + ".tar.gpg")
+
+        (input, output) = os.popen2("lbs-filter-gpg --decrypt")
+        def copy_thread(src, dst):
+            BLOCK_SIZE = 4096
+            while True:
+                block = src.read(BLOCK_SIZE)
+                if len(block) == 0: break
+                dst.write(block)
+            dst.close()
+
+        thread.start_new_thread(copy_thread, (raw, input))
+        return output
+
+    def load_segment(self, segment):
+        seg = tarfile.open(segment, 'r|', self.get_segment(segment))
+        for item in seg:
+            data_obj = seg.extractfile(item)
+            path = item.name.split('/')
+            if len(path) == 2 and path[0] == segment:
+                yield (path[1], data_obj.read())
+
+    def load_snapshot(self, snapshot):
+        file = self.store.lowlevel_open("snapshot-" + snapshot + ".lbs")
+        return file.read().splitlines(True)
+
+    def extract_segment(self, segment):
+        segdir = os.path.join(self.get_cachedir(), segment)
+        os.mkdir(segdir)
+        for (object, data) in self.load_segment(segment):
+            f = open(os.path.join(segdir, object), 'wb')
+            f.write(data)
+            f.close()
+
+    def load_object(self, segment, object):
+        path = os.path.join(self.get_cachedir(), segment, object)
+        if not os.access(path, os.R_OK):
+            print "Extracting", segment
+            self.extract_segment(segment)
+        if segment in self.lru_list: self.lru_list.remove(segment)
+        self.lru_list.append(segment)
+        while len(self.lru_list) > self.CACHE_SIZE:
+            os.system("rm -rv " + os.path.join(self.cachedir, self.lru_list[0]))
+            self.lru_list = self.lru_list[1:]
+        return open(path, 'rb').read()
+
+    def get(self, refstr):
+        """Fetch the given object and return it.
+
+        The input should be an object reference, in string form.
+        """
+
+        (segment, object, checksum, slice) = self.parse_ref(refstr)
+
+        data = self.load_object(segment, object)
+
+        if checksum is not None:
+            verifier = ChecksumVerifier(checksum)
+            verifier.update(data)
+            if not verifier.valid():
+                raise ValueError
+
+        if slice is not None:
+            (start, length) = slice
+            data = data[start:start+length]
+            if len(data) != length: raise IndexError
+
+        return data
+
+def parse(lines, terminate=None):
+    """Generic parser for RFC822-style "Key: Value" data streams.
+
+    This parser can be used to read metadata logs and snapshot root descriptor
+    files.
+
+    lines must be an iterable object which yields a sequence of lines of input.
+
+    If terminate is specified, it is used as a predicate to determine when to
+    stop reading input lines.
+    """
+
+    dict = {}
+    last_key = None
+
+    for l in lines:
+        # Strip off a trailing newline, if present
+        if len(l) > 0 and l[-1] == "\n":
+            l = l[:-1]
+
+        if terminate is not None and terminate(l):
+            if len(dict) > 0: yield dict
+            dict = {}
+            last_key = None
+            continue
+
+        m = re.match(r"^(\w+):\s*(.*)$", l)
+        if m:
+            dict[m.group(1)] = m.group(2)
+            last_key = m.group(1)
+        elif len(l) > 0 and l[0].isspace() and last_key is not None:
+            dict[last_key] += l
+        else:
+            last_key = None
+
+    if len(dict) > 0: yield dict
+
+def parse_full(lines):
+    try:
+        return parse(lines).next()
+    except StopIteration:
+        return {}
+
+def read_metadata(object_store, root):
+    """Iterate through all lines in the metadata log, following references."""
+
+    # Stack for keeping track of recursion when following references to
+    # portions of the log.  The last entry in the stack corresponds to the
+    # object currently being parsed.  Each entry is a list of lines which have
+    # been reversed, so that popping successive lines from the end of each list
+    # will return lines of the metadata log in order.
+    stack = []
+
+    def follow_ref(refstr):
+        if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
+        lines = object_store.get(refstr).splitlines(True)
+        lines.reverse()
+        stack.append(lines)
+
+    follow_ref(root)
+
+    while len(stack) > 0:
+        top = stack[-1]
+        if len(top) == 0:
+            stack.pop()
+            continue
+        line = top.pop()
+
+        # An indirect reference which we must follow?
+        if len(line) > 0 and line[0] == '@':
+            ref = line[1:]
+            ref.strip()
+            follow_ref(ref)
+        else:
+            yield line
+
+class MetadataItem:
+    """Metadata for a single file (or directory or...) from a snapshot."""
+
+    def __init__(self, fields, object_store):
+        """Initialize from a dictionary of key/value pairs from metadata log."""
+
+        self.fields = fields
+        self.object_store = object_store
+
+    def data(self):
+        """Return an iterator for the data blocks that make up a file."""
+
+        # This traverses the list of blocks that make up a file, following
+        # indirect references.  It is implemented in much the same way as
+        # read_metadata, so see that function for details of the technique.
+
+        objects = self.fields['data'].split()
+        objects.reverse()
+        stack = [objects]
+
+        def follow_ref(refstr):
+            if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
+            objects = self.object_store.get(refstr).split()
+            objects.reverse()
+            stack.append(objects)
+
+        while len(stack) > 0:
+            top = stack[-1]
+            if len(top) == 0:
+                stack.pop()
+                continue
+            ref = top.pop()
+
+            # An indirect reference which we must follow?
+            if len(ref) > 0 and ref[0] == '@':
+                follow_ref(ref[1:])
+            else:
+                yield ref
+
+def iterate_metadata(object_store, root):
+    for d in parse(read_metadata(object_store, root), lambda l: len(l) == 0):
+        yield MetadataItem(d, object_store)
+
 class LocalDatabase:
     """Access to the local database of snapshot contents and object checksums.