From f48d9bc505b99bf3e83ae15f5c46c5a7b2d188b7 Mon Sep 17 00:00:00 2001
From: Michael Vrable
Date: Tue, 11 Sep 2007 10:38:13 -0700
Subject: [PATCH] Expanded Python module for accessing LBS snapshots, and
 lbs-util.py.

Expand the lbs Python module with code for reading from LBS archives.
This includes decoding segments and parsing snapshot descriptors and
metadata logs.

Expand the lbs-util.py tool so that it has most of the functionality of
lbs-util (the Perl implementation).  The Perl code should now be
considered deprecated, and will be removed at some point in the future.

The Python code still needs some cleaning up.
---
 NEWS        |   9 ++
 lbs-util.py |  94 +++++++++++++++-
 lbs.py      | 317 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 419 insertions(+), 1 deletion(-)

diff --git a/NEWS b/NEWS
index 1291270..134bd9e 100644
--- a/NEWS
+++ b/NEWS
@@ -1,3 +1,12 @@
+0.5?
+- Much improved Python interface for accessing and manipulating LBS
+  archives and local database information.  The interface should not
+  yet be considered stable.
+- Python implementation of lbs-util now includes most of the features
+  of the Perl implementation, plus some other new features.  The Perl
+  library and utility are deprecated, and will be removed in the
+  future.
+
 0.4 [2007-08-24]
 - Documentation improvements: a getting started README, and a
   description of some of the implementation details.
diff --git a/lbs-util.py b/lbs-util.py
index d30fbec..33c9c75 100755
--- a/lbs-util.py
+++ b/lbs-util.py
@@ -2,17 +2,26 @@
 #
 # Utility for managing LBS archives.
 
-import sys
+import getpass, os, sys
 from optparse import OptionParser
 
 import lbs
 
 parser = OptionParser(usage="%prog [option]... command [arg]...")
 parser.add_option("-v", action="store_true", dest="verbose", default=False,
                   help="increase verbosity")
+parser.add_option("--store", dest="store",
+                  help="specify path to backup data store")
 parser.add_option("--localdb", dest="localdb",
                   help="specify path to local database")
 (options, args) = parser.parse_args(sys.argv[1:])
 
+# Read a passphrase from the user and store it in the LBS_GPG_PASSPHRASE
+# environment variable.
+def get_passphrase():
+    ENV_KEY = 'LBS_GPG_PASSPHRASE'
+    if ENV_KEY not in os.environ:
+        os.environ[ENV_KEY] = getpass.getpass()
+
 # Run the segment cleaner.
 # Syntax: $0 --localdb=LOCALDB clean
 def cmd_clean(clean_threshold=7.0):
@@ -32,6 +41,79 @@ def cmd_clean(clean_threshold=7.0):
     db.balance_expired_objects()
     db.commit()
 
+# List snapshots stored.
+# Syntax: $0 --store=DATADIR list-snapshots
+def cmd_list_snapshots():
+    store = lbs.LowlevelDataStore(options.store)
+    for s in sorted(store.list_snapshots()):
+        print s
+
+# List size of data needed for each snapshot.
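+# For each snapshot, this reports the total size of the segments that the
+# snapshot references, along with the data added and removed relative to
+# the previous snapshot (all sizes in MiB).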
+# Syntax: $0 --store=DATADIR list-snapshot-sizes
+def cmd_list_snapshot_sizes():
+    lowlevel = lbs.LowlevelDataStore(options.store)
+    store = lbs.ObjectStore(lowlevel)
+    previous = set()
+    for s in sorted(lowlevel.list_snapshots()):
+        d = lbs.parse_full(store.load_snapshot(s))
+        segments = d['Segments'].split()
+        (size, added, removed) = (0, 0, 0)
+        for seg in segments:
+            segsize = lowlevel.lowlevel_stat(seg + ".tar.gpg")['size']
+            size += segsize
+            if seg not in previous: added += segsize
+        for seg in previous:
+            if seg not in segments:
+                removed += lowlevel.lowlevel_stat(seg + ".tar.gpg")['size']
+        previous = set(segments)
+        print "%s: %.3f +%.3f -%.3f" % (s, size / 1024.0**2,
+                                        added / 1024.0**2,
+                                        removed / 1024.0**2)
+
+# Build checksum list for objects in the given segments
+def cmd_object_checksums(segments):
+    get_passphrase()
+    lowlevel = lbs.LowlevelDataStore(options.store)
+    store = lbs.ObjectStore(lowlevel)
+    for s in segments:
+        for (o, data) in store.load_segment(s):
+            csum = lbs.ChecksumCreator().update(data).compute()
+            print "%s/%s:%d:%s" % (s, o, len(data), csum)
+    store.cleanup()
+
+# Read a snapshot file
+def cmd_read_snapshots(snapshots):
+    get_passphrase()
+    lowlevel = lbs.LowlevelDataStore(options.store)
+    store = lbs.ObjectStore(lowlevel)
+    for s in snapshots:
+        d = lbs.parse_full(store.load_snapshot(s))
+        print d
+        print d['Segments'].split()
+    store.cleanup()
+
+# Verify snapshot integrity
+def cmd_verify_snapshots(snapshots):
+    get_passphrase()
+    lowlevel = lbs.LowlevelDataStore(options.store)
+    store = lbs.ObjectStore(lowlevel)
+    for s in snapshots:
+        print "#### Snapshot", s
+        d = lbs.parse_full(store.load_snapshot(s))
+        print "## Root:", d['Root']
+        metadata = lbs.iterate_metadata(store, d['Root'])
+        for m in metadata:
+            if m.fields['type'] != '-': continue
+            print "%s [%d bytes]" % (m.fields['name'], int(m.fields['size']))
+            verifier = lbs.ChecksumVerifier(m.fields['checksum'])
+            size = 0
+            for block in m.data():
+                data = store.get(block)
+                verifier.update(data)
+                size += len(data)
+            if int(m.fields['size']) != size:
+                raise ValueError("File size does not match!")
+            if not verifier.valid():
+                raise ValueError("Bad checksum found")
+    store.cleanup()
+
 if len(args) == 0:
     parser.print_usage()
     sys.exit(1)
@@ -39,6 +121,16 @@
 cmd = args[0]
 args = args[1:]
 if cmd == 'clean':
     cmd_clean()
+elif cmd == 'list-snapshots':
+    cmd_list_snapshots()
+elif cmd == 'list-snapshot-sizes':
+    cmd_list_snapshot_sizes()
+elif cmd == 'object-sums':
+    cmd_object_checksums(args)
+elif cmd == 'read-snapshots':
+    cmd_read_snapshots(args)
+elif cmd == 'verify-snapshots':
+    cmd_verify_snapshots(args)
 else:
     print "Unknown command:", cmd
     parser.print_usage()
diff --git a/lbs.py b/lbs.py
index ab2c261..e3ca9ab 100644
--- a/lbs.py
+++ b/lbs.py
@@ -9,8 +9,12 @@ various parts of an LBS archive:
 """
 
 from __future__ import division
+import os, re, sha, shutil, tarfile, tempfile, thread
 from pysqlite2 import dbapi2 as sqlite3
 
+# Maximum number of nested indirect references allowed in a snapshot.
+MAX_RECURSION_DEPTH = 3
+
 class Struct:
     """A class which merely acts as a data container.
@@ -21,6 +25,319 @@ class Struct:
     def __repr__(self):
         return "<%s %s>" % (self.__class__, self.__dict__)
 
+CHECKSUM_ALGORITHMS = {
+    'sha1': sha.new
+}
+
+class ChecksumCreator:
+    """Compute an LBS checksum for provided data.
+
+    The algorithm used is selectable, but currently defaults to sha1.
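+
+    Typical usage (the input string is purely illustrative):
+        csum = ChecksumCreator().update("some data").compute()
+        # csum now holds a string of the form "sha1=<hexdigest>"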
+    """
+
+    def __init__(self, algorithm='sha1'):
+        self.algorithm = algorithm
+        self.hash = CHECKSUM_ALGORITHMS[algorithm]()
+
+    def update(self, data):
+        self.hash.update(data)
+        return self
+
+    def compute(self):
+        return "%s=%s" % (self.algorithm, self.hash.hexdigest())
+
+class ChecksumVerifier:
+    """Verify whether a checksum from a snapshot matches the supplied data."""
+
+    def __init__(self, checksumstr):
+        """Create an object to check the supplied checksum."""
+
+        (algo, checksum) = checksumstr.split("=", 1)
+        self.checksum = checksum
+        self.hash = CHECKSUM_ALGORITHMS[algo]()
+
+    def update(self, data):
+        self.hash.update(data)
+
+    def valid(self):
+        """Return a boolean indicating whether the checksum matches."""
+
+        result = self.hash.hexdigest()
+        return result == self.checksum
+
+class LowlevelDataStore:
+    """Access to the backup store containing segments and snapshot descriptors.
+
+    Instances of this class are used to get direct filesystem-level access
+    to the backup data.  To read a backup, a caller will ordinarily not care
+    about direct access to backup segments, but will instead merely need to
+    access objects from those segments.  The ObjectStore class provides a
+    suitable wrapper around a LowlevelDataStore to give this higher-level
+    access.
+    """
+
+    def __init__(self, path):
+        self.path = path
+
+    # Low-level filesystem access.  These methods could be overridden to
+    # provide access to remote data stores.
+    def lowlevel_list(self):
+        """Get a listing of files stored."""
+
+        return os.listdir(self.path)
+
+    def lowlevel_open(self, filename):
+        """Return a file-like object for reading data from the given file."""
+
+        return open(os.path.join(self.path, filename), 'rb')
+
+    def lowlevel_stat(self, filename):
+        """Return a dictionary of information about the given file.
+
+        Currently, the only defined field is 'size', giving the size of the
+        file in bytes.
+        """
+
+        stat = os.stat(os.path.join(self.path, filename))
+        return {'size': stat.st_size}
+
+    # Slightly higher-level list methods.
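+    # These decode the naming conventions used within the store: a snapshot
+    # descriptor is stored as "snapshot-NAME.lbs", and a segment is a file
+    # named by UUID, possibly with a filter suffix such as ".tar.gpg".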
+    def list_snapshots(self):
+        for f in self.lowlevel_list():
+            m = re.match(r"^snapshot-(.*)\.lbs$", f)
+            if m:
+                yield m.group(1)
+
+    def list_segments(self):
+        for f in self.lowlevel_list():
+            m = re.match(r"^([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})(\.\S+)?$", f)
+            if m:
+                yield m.group(1)
+
+class ObjectStore:
+    """Read objects from a backup store, caching extracted segments."""
+
+    def __init__(self, data_store):
+        self.store = data_store
+        self.cachedir = None
+        self.CACHE_SIZE = 16    # Maximum number of segments to cache
+        self.lru_list = []
+
+    def get_cachedir(self):
+        if self.cachedir is None:
+            self.cachedir = tempfile.mkdtemp(".lbs")
+        return self.cachedir
+
+    def cleanup(self):
+        if self.cachedir is not None:
+            shutil.rmtree(self.cachedir)
+        self.cachedir = None
+
+    @staticmethod
+    def parse_ref(refstr):
+        m = re.match(r"^([-0-9a-f]+)\/([0-9a-f]+)(\(\S+\))?(\[(\d+)\+(\d+)\])?$", refstr)
+        if not m:
+            raise ValueError("Bad object reference: " + refstr)
+
+        segment = m.group(1)
+        object = m.group(2)
+        checksum = m.group(3)
+        slice = m.group(4)
+
+        if checksum is not None:
+            checksum = checksum.lstrip("(").rstrip(")")
+
+        if slice is not None:
+            slice = (int(m.group(5)), int(m.group(6)))
+
+        return (segment, object, checksum, slice)
+
+    def get_segment(self, segment):
+        raw = self.store.lowlevel_open(segment + ".tar.gpg")
+
+        (input, output) = os.popen2("lbs-filter-gpg --decrypt")
+        def copy_thread(src, dst):
+            BLOCK_SIZE = 4096
+            while True:
+                block = src.read(BLOCK_SIZE)
+                if len(block) == 0: break
+                dst.write(block)
+            dst.close()
+
+        thread.start_new_thread(copy_thread, (raw, input))
+        return output
+
+    def load_segment(self, segment):
+        seg = tarfile.open(segment, 'r|', self.get_segment(segment))
+        for item in seg:
+            data_obj = seg.extractfile(item)
+            path = item.name.split('/')
+            if len(path) == 2 and path[0] == segment:
+                yield (path[1], data_obj.read())
+
+    def load_snapshot(self, snapshot):
+        file = self.store.lowlevel_open("snapshot-" + snapshot + ".lbs")
+        return file.read().splitlines(True)
+
+    def extract_segment(self, segment):
+        segdir = os.path.join(self.get_cachedir(), segment)
+        os.mkdir(segdir)
+        for (object, data) in self.load_segment(segment):
+            f = open(os.path.join(segdir, object), 'wb')
+            f.write(data)
+            f.close()
+
+    def load_object(self, segment, object):
+        path = os.path.join(self.get_cachedir(), segment, object)
+        if not os.access(path, os.R_OK):
+            print "Extracting", segment
+            self.extract_segment(segment)
+        if segment in self.lru_list: self.lru_list.remove(segment)
+        self.lru_list.append(segment)
+        while len(self.lru_list) > self.CACHE_SIZE:
+            shutil.rmtree(os.path.join(self.cachedir, self.lru_list[0]))
+            self.lru_list = self.lru_list[1:]
+        return open(path, 'rb').read()
+
+    def get(self, refstr):
+        """Fetch the given object and return it.
+
+        The input should be an object reference, in string form.
+        """
+
+        (segment, object, checksum, slice) = self.parse_ref(refstr)
+
+        data = self.load_object(segment, object)
+
+        if checksum is not None:
+            verifier = ChecksumVerifier(checksum)
+            verifier.update(data)
+            if not verifier.valid():
+                raise ValueError("Bad checksum for object " + refstr)
+
+        if slice is not None:
+            (start, length) = slice
+            data = data[start:start+length]
+            if len(data) != length:
+                raise IndexError("Incomplete slice for object " + refstr)
+
+        return data
+
+def parse(lines, terminate=None):
+    """Generic parser for RFC822-style "Key: Value" data streams.
+
+    This parser can be used to read metadata logs and snapshot root
+    descriptor files.  Each record is returned as a dictionary of keys
+    and values.
+
+    lines must be an iterable object which yields a sequence of lines of
+    input.
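+
+    For instance, a snapshot descriptor yields a single record with keys
+    such as Root and Segments (the values shown are placeholders):
+        Root: <segment>/<object>
+        Segments: <segment> <segment> ...
+    A value may be continued across multiple lines by starting each
+    continuation line with whitespace.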
+
+    If terminate is specified, it is used as a predicate on input lines;
+    any line for which it returns true ends the current record, causing
+    the dictionary built so far to be emitted before parsing continues.
+    """
+
+    d = {}
+    last_key = None
+
+    for l in lines:
+        # Strip off a trailing newline, if present
+        if len(l) > 0 and l[-1] == "\n":
+            l = l[:-1]
+
+        if terminate is not None and terminate(l):
+            if len(d) > 0: yield d
+            d = {}
+            last_key = None
+            continue
+
+        m = re.match(r"^(\w+):\s*(.*)$", l)
+        if m:
+            d[m.group(1)] = m.group(2)
+            last_key = m.group(1)
+        elif len(l) > 0 and l[0].isspace() and last_key is not None:
+            d[last_key] += l
+        else:
+            last_key = None
+
+    if len(d) > 0: yield d
+
+def parse_full(lines):
+    try:
+        return parse(lines).next()
+    except StopIteration:
+        return {}
+
+def read_metadata(object_store, root):
+    """Iterate through all lines in the metadata log, following references."""
+
+    # Stack for keeping track of recursion when following references to
+    # portions of the log.  The last entry in the stack corresponds to the
+    # object currently being parsed.  Each entry is a list of lines which
+    # have been reversed, so that popping successive lines from the end of
+    # each list will return lines of the metadata log in order.
+    stack = []
+
+    def follow_ref(refstr):
+        if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
+        lines = object_store.get(refstr).splitlines(True)
+        lines.reverse()
+        stack.append(lines)
+
+    follow_ref(root)
+
+    while len(stack) > 0:
+        top = stack[-1]
+        if len(top) == 0:
+            stack.pop()
+            continue
+        line = top.pop()
+
+        # An indirect reference which we must follow?
+        if len(line) > 0 and line[0] == '@':
+            ref = line[1:].strip()
+            follow_ref(ref)
+        else:
+            yield line
+
+class MetadataItem:
+    """Metadata for a single file (or directory or...) from a snapshot."""
+
+    def __init__(self, fields, object_store):
+        """Initialize from a dictionary of key/value pairs from metadata log."""
+
+        self.fields = fields
+        self.object_store = object_store
+
+    def data(self):
+        """Return an iterator for the data blocks that make up a file."""
+
+        # This traverses the list of blocks that make up a file, following
+        # indirect references.  It is implemented in much the same way as
+        # read_metadata, so see that function for details of the technique.
+
+        objects = self.fields['data'].split()
+        objects.reverse()
+        stack = [objects]
+
+        def follow_ref(refstr):
+            if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
+            objects = self.object_store.get(refstr).split()
+            objects.reverse()
+            stack.append(objects)
+
+        while len(stack) > 0:
+            top = stack[-1]
+            if len(top) == 0:
+                stack.pop()
+                continue
+            ref = top.pop()
+
+            # An indirect reference which we must follow?
+            if len(ref) > 0 and ref[0] == '@':
+                follow_ref(ref[1:])
+            else:
+                yield ref
+
+def iterate_metadata(object_store, root):
+    for d in parse(read_metadata(object_store, root), lambda l: len(l) == 0):
+        yield MetadataItem(d, object_store)
+
 class LocalDatabase:
     """Access to the local database of snapshot contents and object checksums.
-- 
2.20.1
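A sketch of how these pieces fit together when reading a backup (the store
path and snapshot name below are hypothetical; the flow mirrors
cmd_verify_snapshots above):

    import lbs

    lowlevel = lbs.LowlevelDataStore("/backups/lbs")
    store = lbs.ObjectStore(lowlevel)
    d = lbs.parse_full(store.load_snapshot("20070911T103813"))
    for m in lbs.iterate_metadata(store, d['Root']):
        if m.fields['type'] != '-': continue    # regular files only
        size = 0
        for ref in m.data():                    # follows indirect references
            size += len(store.get(ref))
        print m.fields['name'], size
    store.cleanup()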