From c343597dac79f9edc63d95d881625a317fadb1d9 Mon Sep 17 00:00:00 2001 From: Michael Vrable Date: Sat, 2 Feb 2013 08:40:36 -0800 Subject: [PATCH] Update Python code to work with the new backup format. --- cumulus-sync | 55 ++++--- python/cumulus/__init__.py | 271 ++++++++++++++++++++++--------- python/cumulus/cmd_util.py | 123 ++++++-------- python/cumulus/store/__init__.py | 10 +- python/cumulus/store/file.py | 15 +- 5 files changed, 293 insertions(+), 181 deletions(-) diff --git a/cumulus-sync b/cumulus-sync index b76ebb5..8a474a5 100755 --- a/cumulus-sync +++ b/cumulus-sync @@ -30,27 +30,40 @@ sys.path.append(os.path.join(script_directory, 'python')) import cumulus import cumulus.store -store1 = cumulus.store.open(sys.argv[1]) -store2 = cumulus.store.open(sys.argv[2]) +store1 = cumulus.BackendWrapper(sys.argv[1]) +store2 = cumulus.BackendWrapper(sys.argv[2]) -source = cumulus.ObjectStore(cumulus.LowlevelDataStore(store1)) +source = cumulus.CumulusStore(store1) -filter = set() -for s in sys.argv[3:]: - filter.add(s) +items_required = set() +snapshots = sys.argv[3:] +if not snapshots: + snapshots = list(source.list_snapshots()) +for s in snapshots: + items_required.add(s) d = cumulus.parse_full(source.load_snapshot(s)) - filter.update(d['Segments'].split()) - -for ty in ('segments', 'checksums', 'snapshots'): - for f in sorted(store1.list(ty)): - m = cumulus.store.type_patterns[ty].match(f) - if not m: continue - if filter and m.group(1) not in filter: - continue - - print ty, f - try: - store2.stat(ty, f) - except cumulus.store.NotFoundError: - store2.put(ty, f, store1.get(ty, f)) - print " [sent]" + items_required.update(d['Segments'].split()) +print "Required:", items_required + +files_present = set() +for filetype in cumulus.SEARCH_PATHS: + for (name, path) in store2.list_generic(filetype): + items_required.discard(name) + files_present.add(path) +print "Files already present:", sorted(files_present) + +files_required = [] +items_found = set() +for filetype in cumulus.SEARCH_PATHS: + for (name, path) in store1.list_generic(filetype): + if name in items_required: + files_required.append(path) + items_found.add(name) +files_required.sort() + +print "Missing:", items_required.difference(items_found) +print "Required files:", files_required + +for f in files_required: + print f + store2.raw_backend.put(f, store1.raw_backend.get(f)) diff --git a/python/cumulus/__init__.py b/python/cumulus/__init__.py index d8b6814..10f0544 100644 --- a/python/cumulus/__init__.py +++ b/python/cumulus/__init__.py @@ -27,10 +27,17 @@ various parts of a Cumulus archive: """ from __future__ import division -import hashlib, os, re, tarfile, tempfile, thread +import hashlib +import itertools +import os +import re +import tarfile +import tempfile +import thread from pysqlite2 import dbapi2 as sqlite3 -import cumulus.store, cumulus.store.file +import cumulus.store +import cumulus.store.file # The largest supported snapshot format that can be understood. FORMAT_VERSION = (0, 11) # Cumulus Snapshot v0.11 @@ -115,70 +122,182 @@ class ChecksumVerifier: result = self.hash.hexdigest() return result == self.checksum -class LowlevelDataStore: - """Access to the backup store containing segments and snapshot descriptors. +class SearchPathEntry(object): + """Item representing a possible search location for Cumulus files. - Instances of this class are used to get direct filesystem-level access to - the backup data. To read a backup, a caller will ordinarily not care about - direct access to backup segments, but will instead merely need to access - objects from those segments. The ObjectStore class provides a suitable - wrapper around a DataStore to give this high-level access. + Some Cumulus files might be stored in multiple possible file locations: due + to format (different compression mechanisms with different extensions), + locality (different segments might be placed in different directories to + control archiving policies), for backwards compatibility (default location + changed over time). A SearchPathEntry describes a possible location for a + file. """ + def __init__(self, directory_prefix, suffix, context=None): + self._directory_prefix = directory_prefix + self._suffix = suffix + self._context = context - def __init__(self, path): - if isinstance(path, cumulus.store.Store): - self.store = path - elif path.find(":") >= 0: - self.store = cumulus.store.open(path) - else: - self.store = cumulus.store.file.FileStore(path) + def __repr__(self): + return "%s(%r, %r, %r)" % (self.__class__.__name__, + self._directory_prefix, self._suffix, + self._context) + + def build_path(self, basename): + """Construct the search path to use for a file with name basename. + + Returns a tuple (pathname, context), where pathname is the path to try + and context is any additional data associated with this search entry + (if any). + """ + return (os.path.join(self._directory_prefix, basename + self._suffix), + self._context) - def _classify(self, filename): - for (t, r) in cumulus.store.type_patterns.items(): - if r.match(filename): - return (t, filename) - return (None, filename) +class SearchPath(object): + """A collection of locations to search for files and lookup utilities. - def scan(self): - self.store.scan() + For looking for a file in a Cumulus storage backend, a SearchPath object + contains a list of possible locations to try. A SearchPath can be used to + perform the search as well; when a file is found the search path ordering + is updated (moving the successful SearchPathEntry to the front of the list + for future searches). + """ + def __init__(self, name_regex, searchpath): + self._regex = re.compile(name_regex) + self._path = list(searchpath) + + def add_search_entry(self, entry): + self._path.append(entry) + + def directories(self): + """Return the set of directories to search for a file type.""" + return set(entry._directory_prefix for entry in self._path) - def lowlevel_open(self, filename): - """Return a file-like object for reading data from the given file.""" + def get(self, backend, basename): + for (i, entry) in enumerate(self._path): + try: + (pathname, context) = entry.build_path(basename) + fp = backend.get(pathname) + # On success, move this entry to the front of the search path + # to speed future searches. + if i > 0: + self._path.pop(i) + self._path.insert(0, entry) + return (fp, pathname, context) + except cumulus.store.NotFoundError: + continue + raise cumulus.store.NotFoundError(basename) - (type, filename) = self._classify(filename) - return self.store.get(type + "/" + filename) + def stat(self, backend, basename): + for (i, entry) in enumerate(self._path): + try: + (pathname, context) = entry.build_path(basename) + stat_data = backend.stat(pathname) + # On success, move this entry to the front of the search path + # to speed future searches. + if i > 0: + self._path.pop(i) + self._path.insert(0, entry) + result = {"path": pathname} + result.update(stat_data) + return result + except cumulus.store.NotFoundError: + continue + raise cumulus.store.NotFoundError(basename) + + def list(self, backend): + success = False + for d in self.directories(): + try: + for f in backend.list(d): + success = True + m = self._regex.match(f) + if m: yield (os.path.join(d, f), m) + except cumulus.store.NotFoundError: + pass + if not success: + raise cumulus.store.NotFoundError(basename) + +def _build_segments_searchpath(prefix): + for (extension, filter) in SEGMENT_FILTERS: + yield SearchPathEntry(prefix, extension, filter) + +SEARCH_PATHS = { + "checksums": SearchPath( + r"^snapshot-(.*)\.(\w+)sums$", + [SearchPathEntry("meta", ".sha1sums"), + SearchPathEntry("checksums", ".sha1sums"), + SearchPathEntry("", ".sha1sums")]), + "segments": SearchPath( + (r"^([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})" + r"(\.\S+)?$"), + itertools.chain( + _build_segments_searchpath("segments0"), + _build_segments_searchpath("segments1"), + _build_segments_searchpath(""), + _build_segments_searchpath("segments"))), + "snapshots": SearchPath( + r"^snapshot-(.*)\.(cumulus|lbs)$", + [SearchPathEntry("snapshots", ".cumulus"), + SearchPathEntry("snapshots", ".lbs"), + SearchPathEntry("", ".cumulus"), + SearchPathEntry("", ".lbs")]), +} + +class BackendWrapper(object): + """Wrapper around a Cumulus storage backend that understands file types. + + The BackendWrapper class understands different Cumulus file types, such as + snapshots and segments, and implements higher-level operations such as + "retrieve a snapshot with a specific name" (hiding operations such as + searching for the correct file name). + """ - def lowlevel_stat(self, filename): - """Return a dictionary of information about the given file. + def __init__(self, backend): + """Initializes a wrapper around the specified storage backend. - Currently, the only defined field is 'size', giving the size of the - file in bytes. + store may either be a Store object or URL. """ + if type(backend) in (str, unicode): + if backend.find(":") >= 0: + self._backend = cumulus.store.open(backend) + else: + self._backend = cumulus.store.file.FileStore(backend) + else: + self._backend = backend - (type, filename) = self._classify(filename) - return self.store.stat(type + "/" + filename) + @property + def raw_backend(self): + return self._backend - # Slightly higher-level list methods. - def list_snapshots(self): - for f in self.store.list('snapshots'): - m = cumulus.store.type_patterns['snapshots'].match(f) - if m: yield m.group(1) + def stat_generic(self, basename, filetype): + return SEARCH_PATHS[filetype].stat(self._backend, basename) - def list_segments(self): - for f in self.store.list('segments'): - m = cumulus.store.type_patterns['segments'].match(f) - if m: yield m.group(1) + def open_generic(self, basename, filetype): + return SEARCH_PATHS[filetype].get(self._backend, basename) + + def open_snapshot(self, name): + return self.open_generic("snapshot-" + name, "snapshots") -class ObjectStore: - def __init__(self, data_store): - self.store = data_store + def open_segment(self, name): + return self.open_generic(name + ".tar", "segments") + + def list_generic(self, filetype): + return ((x[1].group(1), x[0]) + for x in SEARCH_PATHS[filetype].list(self._backend)) + +class CumulusStore: + def __init__(self, backend): + if isinstance(backend, BackendWrapper): + self.backend = backend + else: + self.backend = BackendWrapper(backend) self.cachedir = None self.CACHE_SIZE = 16 - self.lru_list = [] + self._lru_list = [] def get_cachedir(self): if self.cachedir is None: - self.cachedir = tempfile.mkdtemp(".lbs") + self.cachedir = tempfile.mkdtemp("-cumulus") return self.cachedir def cleanup(self): @@ -216,28 +335,33 @@ class ObjectStore: return (segment, object, checksum, slice) + def list_snapshots(self): + return set(x[0] for x in self.backend.list_generic("snapshots")) + + def list_segments(self): + return set(x[0] for x in self.backend.list_generic("segments")) + + def load_snapshot(self, snapshot): + snapshot_file = self.backend.open_snapshot(snapshot)[0] + return snapshot_file.read().splitlines(True) + def get_segment(self, segment): accessed_segments.add(segment) - for (extension, filter) in SEGMENT_FILTERS: - try: - raw = self.store.lowlevel_open(segment + ".tar" + extension) - - (input, output) = os.popen2(filter) - def copy_thread(src, dst): - BLOCK_SIZE = 4096 - while True: - block = src.read(BLOCK_SIZE) - if len(block) == 0: break - dst.write(block) - dst.close() - - thread.start_new_thread(copy_thread, (raw, input)) - return output - except: - pass - - raise cumulus.store.NotFoundError + (segment_fp, path, filter_cmd) = self.backend.open_segment(segment) + if filter_cmd is None: + return segment_fp + (input, output) = os.popen2(filter_cmd) + def copy_thread(src, dst): + BLOCK_SIZE = 4096 + while True: + block = src.read(BLOCK_SIZE) + if len(block) == 0: break + dst.write(block) + src.close() + dst.close() + thread.start_new_thread(copy_thread, (segment_fp, input)) + return output def load_segment(self, segment): seg = tarfile.open(segment, 'r|', self.get_segment(segment)) @@ -247,10 +371,6 @@ class ObjectStore: if len(path) == 2 and path[0] == segment: yield (path[1], data_obj.read()) - def load_snapshot(self, snapshot): - file = self.store.lowlevel_open("snapshot-" + snapshot + ".cumulus") - return file.read().splitlines(True) - def extract_segment(self, segment): segdir = os.path.join(self.get_cachedir(), segment) os.mkdir(segdir) @@ -264,11 +384,12 @@ class ObjectStore: path = os.path.join(self.get_cachedir(), segment, object) if not os.access(path, os.R_OK): self.extract_segment(segment) - if segment in self.lru_list: self.lru_list.remove(segment) - self.lru_list.append(segment) - while len(self.lru_list) > self.CACHE_SIZE: - os.system("rm -rf " + os.path.join(self.cachedir, self.lru_list[0])) - self.lru_list = self.lru_list[1:] + if segment in self._lru_list: self._lru_list.remove(segment) + self._lru_list.append(segment) + while len(self._lru_list) > self.CACHE_SIZE: + os.system("rm -rf " + os.path.join(self.cachedir, + self._lru_list[0])) + self._lru_list = self._lru_list[1:] return open(path, 'rb').read() def get(self, refstr): diff --git a/python/cumulus/cmd_util.py b/python/cumulus/cmd_util.py index e0b094d..9d97190 100644 --- a/python/cumulus/cmd_util.py +++ b/python/cumulus/cmd_util.py @@ -76,97 +76,75 @@ def cmd_list_snapshots(args): """ List snapshots stored. Syntax: $0 --data=DATADIR list-snapshots """ - store = cumulus.LowlevelDataStore(options.store) - for s in sorted(store.list_snapshots()): - print s + store = cumulus.CumulusStore(options.store) + for s in sorted(store.list_snapshots()): print s def cmd_list_snapshot_sizes(args): """ List size of data needed for each snapshot. Syntax: $0 --data=DATADIR list-snapshot-sizes """ - lowlevel = cumulus.LowlevelDataStore(options.store) - lowlevel.scan() - store = cumulus.ObjectStore(lowlevel) + store = cumulus.CumulusStore(options.store) + backend = store.backend previous = set() - exts = {} - for seg in lowlevel.store.list('segments'): - exts.update ([seg.split ('.', 1)]) - for s in sorted(lowlevel.list_snapshots()): + size = 0 + def get_size(segment): + return backend.stat_generic(segment + ".tar", "segments")["size"] + for s in sorted(store.list_snapshots()): d = cumulus.parse_full(store.load_snapshot(s)) check_version(d['Format']) - try: - intent = float(d['Backup-Intent']) - except: - intent = 1.0 - - segments = d['Segments'].split() - (size, added, removed, addcount, remcount) = (0, 0, 0, 0, 0) - lo_stat = lowlevel.lowlevel_stat - for seg in segments: - segsize = lo_stat('.'.join ((seg, exts[seg])))['size'] - size += segsize - if seg not in previous: - added += segsize - addcount += 1 - for seg in previous: - if seg not in segments: - removed += lo_stat('.'.join((seg, exts[seg])))['size'] - remcount += 1 - previous = set(segments) - print "%s [%s]: %.3f +%.3f -%.3f (+%d/-%d segments)" % (s, intent, size / 1024.0**2, added / 1024.0**2, removed / 1024.0**2, addcount, remcount) + segments = set(d['Segments'].split()) + (added, removed, addcount, remcount) = (0, 0, 0, 0) + for seg in segments.difference(previous): + added += get_size(seg) + addcount += 1 + for seg in previous.difference(segments): + removed += get_size(seg) + remcount += 1 + size += added - removed + previous = segments + print "%s: %.3f +%.3f -%.3f (+%d/-%d segments)" % (s, size / 1024.0**2, added / 1024.0**2, removed / 1024.0**2, addcount, remcount) def cmd_garbage_collect(args): """ Search for any files which are not needed by any current snapshots and offer to delete them. Syntax: $0 --store=DATADIR gc """ - lowlevel = cumulus.LowlevelDataStore(options.store) - lowlevel.scan() - store = cumulus.ObjectStore(lowlevel) - snapshots = set(lowlevel.list_snapshots()) - segments = set() - for s in snapshots: + store = cumulus.CumulusStore(options.store) + backend = store.backend + referenced = set() + for s in store.list_snapshots(): d = cumulus.parse_full(store.load_snapshot(s)) check_version(d['Format']) - segments.update(d['Segments'].split()) - - referenced = snapshots.union(segments) - reclaimed = 0 - for (t, r) in cumulus.store.type_patterns.items(): - for f in lowlevel.store.list(t): - m = r.match(f) - if m is None or m.group(1) not in referenced: - print "Garbage:", (t, f) - reclaimed += lowlevel.store.stat(t, f)['size'] - if not options.dry_run: - lowlevel.store.delete(t, f) - print "Reclaimed space:", reclaimed + referenced.add(s) + referenced.update(d['Segments'].split()) -cmd_gc = cmd_garbage_collect + print referenced -def cmd_object_checksums(segments): - """ Build checksum list for objects in the given segments, or all - segments if none are specified. - """ - get_passphrase() - lowlevel = cumulus.LowlevelDataStore(options.store) - store = cumulus.ObjectStore(lowlevel) - if len(segments) == 0: - segments = sorted(lowlevel.list_segments()) - for s in segments: - for (o, data) in store.load_segment(s): - csum = cumulus.ChecksumCreator().update(data).compute() - print "%s/%s:%d:%s" % (s, o, len(data), csum) - store.cleanup() -object_sums = cmd_object_checksums + to_delete = [] + to_preserve = [] + for filetype in cumulus.SEARCH_PATHS: + for (name, path) in store.backend.list_generic(filetype): + if name in referenced: + to_preserve.append(path) + else: + to_delete.append(path) + + print to_preserve + print to_delete + + raw_backend = backend.raw_backend + for f in to_delete: + print "Delete:", f + if not options.dry_run: + raw_backend.delete(f) +cmd_gc = cmd_garbage_collect def cmd_read_snapshots(snapshots): """ Read a snapshot file """ get_passphrase() - lowlevel = cumulus.LowlevelDataStore(options.store) - store = cumulus.ObjectStore(lowlevel) + store = cumulus.CumulusStore(options.store) for s in snapshots: d = cumulus.parse_full(store.load_snapshot(s)) check_version(d['Format']) @@ -179,8 +157,7 @@ def cmd_read_metadata(args): """ snapshot = args [0] get_passphrase() - lowlevel = cumulus.LowlevelDataStore(options.store) - store = cumulus.ObjectStore(lowlevel) + store = cumulus.CumulusStore(options.store) d = cumulus.parse_full(store.load_snapshot(snapshot)) check_version(d['Format']) metadata = cumulus.read_metadata(store, d['Root']) @@ -198,8 +175,7 @@ def cmd_verify_snapshots(snapshots): """ Verify snapshot integrity """ get_passphrase() - lowlevel = cumulus.LowlevelDataStore(options.store) - store = cumulus.ObjectStore(lowlevel) + store = cumulus.CumulusStore(options.store) for s in snapshots: cumulus.accessed_segments.clear() print "#### Snapshot", s @@ -237,8 +213,7 @@ def cmd_restore_snapshot(args): """ Restore a snapshot, or some subset of files from it """ get_passphrase() - lowlevel = cumulus.LowlevelDataStore(options.store) - store = cumulus.ObjectStore(lowlevel) + store = cumulus.CumulusStore(options.store) snapshot = cumulus.parse_full(store.load_snapshot(args[0])) check_version(snapshot['Format']) destdir = args[1] @@ -279,7 +254,7 @@ def cmd_restore_snapshot(args): metadata_paths[pathname] = m for block in m.data(): (segment, object, checksum, slice) \ - = cumulus.ObjectStore.parse_ref(block) + = cumulus.CumulusStore.parse_ref(block) if segment not in metadata_segments: metadata_segments[segment] = set() metadata_segments[segment].add(pathname) diff --git a/python/cumulus/store/__init__.py b/python/cumulus/store/__init__.py index 3b54cbb..0899d70 100644 --- a/python/cumulus/store/__init__.py +++ b/python/cumulus/store/__init__.py @@ -55,19 +55,19 @@ class Store (object): except ImportError: raise NotImplementedError, "Scheme %s not implemented" % scheme - def list(self, type): + def list(self, path): raise NotImplementedError - def get(self, type, name): + def get(self, path): raise NotImplementedError - def put(self, type, name, fp): + def put(self, path, fp): raise NotImplementedError - def delete(self, type, name): + def delete(self, path): raise NotImplementedError - def stat(self, type, name): + def stat(self, path): raise NotImplementedError def scan(self): diff --git a/python/cumulus/store/file.py b/python/cumulus/store/file.py index 3d536bf..8304401 100644 --- a/python/cumulus/store/file.py +++ b/python/cumulus/store/file.py @@ -33,16 +33,19 @@ class FileStore(cumulus.store.Store): return os.path.join(self.prefix, type, name) def list(self, subdir): - return os.listdir(os.path.join(self.prefix, subdir)) + try: + return os.listdir(os.path.join(self.prefix, subdir)) + except OSError: + raise cumulus.store.NotFoundError(subdir) def get(self, path): - return open(os.path.join(self.prefix, path), 'rb') + try: + return open(os.path.join(self.prefix, path), 'rb') + except IOError: + raise cumulus.store.NotFoundError(path) def put(self, path, fp): - # TODO: Implement - raise NotImplementedError - k = self._get_path(type, name) - out = open(k, 'wb') + out = open(os.path.join(self.prefix, path), 'wb') buf = fp.read(4096) while len(buf) > 0: out.write(buf) -- 2.20.1