Expanded Python module for accessing LBS snapshots, and lbs-util.py.
[cumulus.git] / lbs.py
diff --git a/lbs.py b/lbs.py
index ab2c261..e3ca9ab 100644 (file)
--- a/lbs.py
+++ b/lbs.py
@@ -9,8 +9,12 @@ various parts of an LBS archive:
 """
 
 from __future__ import division
+import os, re, sha, tarfile, tempfile, thread
 from pysqlite2 import dbapi2 as sqlite3
 
+# Maximum number of nested indirect references allowed in a snapshot.
+MAX_RECURSION_DEPTH = 3
+
 class Struct:
     """A class which merely acts as a data container.
 
@@ -21,6 +25,319 @@ class Struct:
     def __repr__(self):
         return "<%s %s>" % (self.__class__, self.__dict__)
 
+CHECKSUM_ALGORITHMS = {
+    'sha1': sha.new
+}
+
+class ChecksumCreator:
+    """Compute an LBS checksum for provided data.
+
+    The algorithm used is selectable, but currently defaults to sha1.
+    """
+
+    def __init__(self, algorithm='sha1'):
+        self.algorithm = algorithm
+        self.hash = CHECKSUM_ALGORITHMS[algorithm]()
+
+    def update(self, data):
+        self.hash.update(data)
+        return self
+
+    def compute(self):
+        return "%s=%s" % (self.algorithm, self.hash.hexdigest())
+
+class ChecksumVerifier:
+    """Verify whether a checksum from a snapshot matches the supplied data."""
+
+    def __init__(self, checksumstr):
+        """Create an object to check the supplied checksum."""
+
+        (algo, checksum) = checksumstr.split("=", 1)
+        self.checksum = checksum
+        self.hash = CHECKSUM_ALGORITHMS[algo]()
+
+    def update(self, data):
+        self.hash.update(data)
+
+    def valid(self):
+        """Return a boolean indicating whether the checksum matches."""
+
+        result = self.hash.hexdigest()
+        return result == self.checksum
+
+class LowlevelDataStore:
+    """Access to the backup store containing segments and snapshot descriptors.
+
+    Instances of this class are used to get direct filesystem-level access to
+    the backup data.  To read a backup, a caller will ordinarily not care about
+    direct access to backup segments, but will instead merely need to access
+    objects from those segments.  The ObjectStore class provides a suitable
+    wrapper around a DataStore to give this high-level access.
+    """
+
+    def __init__(self, path):
+        self.path = path
+
+    # Low-level filesystem access.  These methods could be overwritten to
+    # provide access to remote data stores.
+    def lowlevel_list(self):
+        """Get a listing of files stored."""
+
+        return os.listdir(self.path)
+
+    def lowlevel_open(self, filename):
+        """Return a file-like object for reading data from the given file."""
+
+        return open(os.path.join(self.path, filename), 'rb')
+
+    def lowlevel_stat(self, filename):
+        """Return a dictionary of information about the given file.
+
+        Currently, the only defined field is 'size', giving the size of the
+        file in bytes.
+        """
+
+        stat = os.stat(os.path.join(self.path, filename))
+        return {'size': stat.st_size}
+
+    # Slightly higher-level list methods.
+    def list_snapshots(self):
+        for f in self.lowlevel_list():
+            m = re.match(r"^snapshot-(.*)\.lbs$", f)
+            if m:
+                yield m.group(1)
+
+    def list_segments(self):
+        for f in self.lowlevel_list():
+            m = re.match(r"^([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})(\.\S+)?$", f)
+            if m:
+                yield m.group(1)
+
+class ObjectStore:
+    def __init__(self, data_store):
+        self.store = data_store
+        self.cachedir = None
+        self.CACHE_SIZE = 16
+        self.lru_list = []
+
+    def get_cachedir(self):
+        if self.cachedir is None:
+            self.cachedir = tempfile.mkdtemp(".lbs")
+        return self.cachedir
+
+    def cleanup(self):
+        if self.cachedir is not None:
+            # TODO: Avoid use of system, make this safer
+            os.system("rm -rv " + self.cachedir)
+        self.cachedir = None
+
+    @staticmethod
+    def parse_ref(refstr):
+        m = re.match(r"^([-0-9a-f]+)\/([0-9a-f]+)(\(\S+\))?(\[(\d+)\+(\d+)\])?$", refstr)
+        if not m: return
+
+        segment = m.group(1)
+        object = m.group(2)
+        checksum = m.group(3)
+        slice = m.group(4)
+
+        if checksum is not None:
+            checksum = checksum.lstrip("(").rstrip(")")
+
+        if slice is not None:
+            slice = (int(m.group(5)), int(m.group(6)))
+
+        return (segment, object, checksum, slice)
+
+    def get_segment(self, segment):
+        raw = self.store.lowlevel_open(segment + ".tar.gpg")
+
+        (input, output) = os.popen2("lbs-filter-gpg --decrypt")
+        def copy_thread(src, dst):
+            BLOCK_SIZE = 4096
+            while True:
+                block = src.read(BLOCK_SIZE)
+                if len(block) == 0: break
+                dst.write(block)
+            dst.close()
+
+        thread.start_new_thread(copy_thread, (raw, input))
+        return output
+
+    def load_segment(self, segment):
+        seg = tarfile.open(segment, 'r|', self.get_segment(segment))
+        for item in seg:
+            data_obj = seg.extractfile(item)
+            path = item.name.split('/')
+            if len(path) == 2 and path[0] == segment:
+                yield (path[1], data_obj.read())
+
+    def load_snapshot(self, snapshot):
+        file = self.store.lowlevel_open("snapshot-" + snapshot + ".lbs")
+        return file.read().splitlines(True)
+
+    def extract_segment(self, segment):
+        segdir = os.path.join(self.get_cachedir(), segment)
+        os.mkdir(segdir)
+        for (object, data) in self.load_segment(segment):
+            f = open(os.path.join(segdir, object), 'wb')
+            f.write(data)
+            f.close()
+
+    def load_object(self, segment, object):
+        path = os.path.join(self.get_cachedir(), segment, object)
+        if not os.access(path, os.R_OK):
+            print "Extracting", segment
+            self.extract_segment(segment)
+        if segment in self.lru_list: self.lru_list.remove(segment)
+        self.lru_list.append(segment)
+        while len(self.lru_list) > self.CACHE_SIZE:
+            os.system("rm -rv " + os.path.join(self.cachedir, self.lru_list[0]))
+            self.lru_list = self.lru_list[1:]
+        return open(path, 'rb').read()
+
+    def get(self, refstr):
+        """Fetch the given object and return it.
+
+        The input should be an object reference, in string form.
+        """
+
+        (segment, object, checksum, slice) = self.parse_ref(refstr)
+
+        data = self.load_object(segment, object)
+
+        if checksum is not None:
+            verifier = ChecksumVerifier(checksum)
+            verifier.update(data)
+            if not verifier.valid():
+                raise ValueError
+
+        if slice is not None:
+            (start, length) = slice
+            data = data[start:start+length]
+            if len(data) != length: raise IndexError
+
+        return data
+
+def parse(lines, terminate=None):
+    """Generic parser for RFC822-style "Key: Value" data streams.
+
+    This parser can be used to read metadata logs and snapshot root descriptor
+    files.
+
+    lines must be an iterable object which yields a sequence of lines of input.
+
+    If terminate is specified, it is used as a predicate to determine when to
+    stop reading input lines.
+    """
+
+    dict = {}
+    last_key = None
+
+    for l in lines:
+        # Strip off a trailing newline, if present
+        if len(l) > 0 and l[-1] == "\n":
+            l = l[:-1]
+
+        if terminate is not None and terminate(l):
+            if len(dict) > 0: yield dict
+            dict = {}
+            last_key = None
+            continue
+
+        m = re.match(r"^(\w+):\s*(.*)$", l)
+        if m:
+            dict[m.group(1)] = m.group(2)
+            last_key = m.group(1)
+        elif len(l) > 0 and l[0].isspace() and last_key is not None:
+            dict[last_key] += l
+        else:
+            last_key = None
+
+    if len(dict) > 0: yield dict
+
+def parse_full(lines):
+    try:
+        return parse(lines).next()
+    except StopIteration:
+        return {}
+
+def read_metadata(object_store, root):
+    """Iterate through all lines in the metadata log, following references."""
+
+    # Stack for keeping track of recursion when following references to
+    # portions of the log.  The last entry in the stack corresponds to the
+    # object currently being parsed.  Each entry is a list of lines which have
+    # been reversed, so that popping successive lines from the end of each list
+    # will return lines of the metadata log in order.
+    stack = []
+
+    def follow_ref(refstr):
+        if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
+        lines = object_store.get(refstr).splitlines(True)
+        lines.reverse()
+        stack.append(lines)
+
+    follow_ref(root)
+
+    while len(stack) > 0:
+        top = stack[-1]
+        if len(top) == 0:
+            stack.pop()
+            continue
+        line = top.pop()
+
+        # An indirect reference which we must follow?
+        if len(line) > 0 and line[0] == '@':
+            ref = line[1:]
+            ref.strip()
+            follow_ref(ref)
+        else:
+            yield line
+
+class MetadataItem:
+    """Metadata for a single file (or directory or...) from a snapshot."""
+
+    def __init__(self, fields, object_store):
+        """Initialize from a dictionary of key/value pairs from metadata log."""
+
+        self.fields = fields
+        self.object_store = object_store
+
+    def data(self):
+        """Return an iterator for the data blocks that make up a file."""
+
+        # This traverses the list of blocks that make up a file, following
+        # indirect references.  It is implemented in much the same way as
+        # read_metadata, so see that function for details of the technique.
+
+        objects = self.fields['data'].split()
+        objects.reverse()
+        stack = [objects]
+
+        def follow_ref(refstr):
+            if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
+            objects = self.object_store.get(refstr).split()
+            objects.reverse()
+            stack.append(objects)
+
+        while len(stack) > 0:
+            top = stack[-1]
+            if len(top) == 0:
+                stack.pop()
+                continue
+            ref = top.pop()
+
+            # An indirect reference which we must follow?
+            if len(ref) > 0 and ref[0] == '@':
+                follow_ref(ref[1:])
+            else:
+                yield ref
+
+def iterate_metadata(object_store, root):
+    for d in parse(read_metadata(object_store, root), lambda l: len(l) == 0):
+        yield MetadataItem(d, object_store)
+
 class LocalDatabase:
     """Access to the local database of snapshot contents and object checksums.