"""
from __future__ import division
-import hashlib, os, re, tarfile, tempfile, thread
+import hashlib
+import itertools
+import os
+import re
+import shutil
+import tarfile
+import tempfile
+import thread
from pysqlite2 import dbapi2 as sqlite3
-import cumulus.store, cumulus.store.file
+import cumulus.store
+import cumulus.store.file
# The largest supported snapshot format that can be understood.
FORMAT_VERSION = (0, 11) # Cumulus Snapshot v0.11
result = self.hash.hexdigest()
return result == self.checksum
-class LowlevelDataStore:
- """Access to the backup store containing segments and snapshot descriptors.
+class SearchPathEntry(object):
+ """Item representing a possible search location for Cumulus files.
- Instances of this class are used to get direct filesystem-level access to
- the backup data. To read a backup, a caller will ordinarily not care about
- direct access to backup segments, but will instead merely need to access
- objects from those segments. The ObjectStore class provides a suitable
- wrapper around a DataStore to give this high-level access.
+
+    Some Cumulus files might be stored in multiple possible locations: due to
+    format (different compression mechanisms with different extensions),
+    locality (different segments might be placed in different directories to
+    control archiving policies), or backwards compatibility (the default
+    location has changed over time). A SearchPathEntry describes one possible
+    location for a file.
"""
+ def __init__(self, directory_prefix, suffix, context=None):
+ self._directory_prefix = directory_prefix
+ self._suffix = suffix
+ self._context = context
- def __init__(self, path):
- if isinstance(path, cumulus.store.Store):
- self.store = path
- elif path.find(":") >= 0:
- self.store = cumulus.store.open(path)
- else:
- self.store = cumulus.store.file.FileStore(path)
+ def __repr__(self):
+ return "%s(%r, %r, %r)" % (self.__class__.__name__,
+ self._directory_prefix, self._suffix,
+ self._context)
+
+ def build_path(self, basename):
+ """Construct the search path to use for a file with name basename.
+
+ Returns a tuple (pathname, context), where pathname is the path to try
+ and context is any additional data associated with this search entry
+ (if any).
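+
+        Example (illustrative values only; result shown for a POSIX path):
+            SearchPathEntry("segments0", ".tar.gpg").build_path("abcd")
+            => ("segments0/abcd.tar.gpg", None)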
+ """
+ return (os.path.join(self._directory_prefix, basename + self._suffix),
+ self._context)
- def _classify(self, filename):
- for (t, r) in cumulus.store.type_patterns.items():
- if r.match(filename):
- return (t, filename)
- return (None, filename)
+class SearchPath(object):
+ """A collection of locations to search for files and lookup utilities.
- def scan(self):
- self.store.scan()
+
+    To look for a file in a Cumulus storage backend, a SearchPath object keeps
+    a list of possible locations to try. A SearchPath can also perform the
+    search itself; when a file is found, the search path ordering is updated
+    (the successful SearchPathEntry is moved to the front of the list to
+    speed future searches).
+ """
+ def __init__(self, name_regex, searchpath):
+ self._regex = re.compile(name_regex)
+ self._path = list(searchpath)
+
+ def add_search_entry(self, entry):
+ self._path.append(entry)
+
+ def directories(self):
+ """Return the set of directories to search for a file type."""
+ return set(entry._directory_prefix for entry in self._path)
- def lowlevel_open(self, filename):
- """Return a file-like object for reading data from the given file."""
+ def get(self, backend, basename):
+ for (i, entry) in enumerate(self._path):
+ try:
+ (pathname, context) = entry.build_path(basename)
+ fp = backend.get(pathname)
+ # On success, move this entry to the front of the search path
+ # to speed future searches.
+ if i > 0:
+ self._path.pop(i)
+ self._path.insert(0, entry)
+ return (fp, pathname, context)
+ except cumulus.store.NotFoundError:
+ continue
+ raise cumulus.store.NotFoundError(basename)
- (type, filename) = self._classify(filename)
- return self.store.get(type + "/" + filename)
+ def stat(self, backend, basename):
+ for (i, entry) in enumerate(self._path):
+ try:
+ (pathname, context) = entry.build_path(basename)
+ stat_data = backend.stat(pathname)
+ # On success, move this entry to the front of the search path
+ # to speed future searches.
+ if i > 0:
+ self._path.pop(i)
+ self._path.insert(0, entry)
+ result = {"path": pathname}
+ result.update(stat_data)
+ return result
+ except cumulus.store.NotFoundError:
+ continue
+ raise cumulus.store.NotFoundError(basename)
+
+ def list(self, backend):
+ success = False
+ for d in self.directories():
+ try:
+ for f in backend.list(d):
+ success = True
+ m = self._regex.match(f)
+ if m: yield (os.path.join(d, f), m)
+ except cumulus.store.NotFoundError:
+ pass
+        if not success:
+            # No search directory could be listed at all; report the name
+            # pattern that was being sought.
+            raise cumulus.store.NotFoundError(self._regex.pattern)
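+
+# Usage sketch (names are placeholders): a lookup through a SearchPath tries
+# each registered location in turn and returns the first hit:
+#     (fp, pathname, context) = SEARCH_PATHS["snapshots"].get(backend,
+#                                                             "snapshot-NAME")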
+
+def _build_segments_searchpath(prefix):
+    for (extension, filter_cmd) in SEGMENT_FILTERS:
+        yield SearchPathEntry(prefix, extension, filter_cmd)
+
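+# SEARCH_PATHS maps a logical file type ("checksums", "segments", "snapshots")
+# to the SearchPath used to locate files of that type; the segment entries
+# are generated from SEGMENT_FILTERS so that each supported
+# compression/encryption extension is tried in every candidate directory.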
+SEARCH_PATHS = {
+ "checksums": SearchPath(
+ r"^snapshot-(.*)\.(\w+)sums$",
+ [SearchPathEntry("meta", ".sha1sums"),
+ SearchPathEntry("checksums", ".sha1sums"),
+ SearchPathEntry("", ".sha1sums")]),
+ "segments": SearchPath(
+ (r"^([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})"
+ r"(\.\S+)?$"),
+ itertools.chain(
+ _build_segments_searchpath("segments0"),
+ _build_segments_searchpath("segments1"),
+ _build_segments_searchpath(""),
+ _build_segments_searchpath("segments"))),
+ "snapshots": SearchPath(
+ r"^snapshot-(.*)\.(cumulus|lbs)$",
+ [SearchPathEntry("snapshots", ".cumulus"),
+ SearchPathEntry("snapshots", ".lbs"),
+ SearchPathEntry("", ".cumulus"),
+ SearchPathEntry("", ".lbs")]),
+}
+
+class BackendWrapper(object):
+ """Wrapper around a Cumulus storage backend that understands file types.
+
+ The BackendWrapper class understands different Cumulus file types, such as
+ snapshots and segments, and implements higher-level operations such as
+ "retrieve a snapshot with a specific name" (hiding operations such as
+ searching for the correct file name).
+ """
- def lowlevel_stat(self, filename):
- """Return a dictionary of information about the given file.
+ def __init__(self, backend):
+ """Initializes a wrapper around the specified storage backend.
- Currently, the only defined field is 'size', giving the size of the
- file in bytes.
+
+        backend may be either a Store object or a URL string.
"""
+        if isinstance(backend, basestring):
+ if backend.find(":") >= 0:
+ self._backend = cumulus.store.open(backend)
+ else:
+ self._backend = cumulus.store.file.FileStore(backend)
+ else:
+ self._backend = backend
- (type, filename) = self._classify(filename)
- return self.store.stat(type + "/" + filename)
+ @property
+ def raw_backend(self):
+ return self._backend
- # Slightly higher-level list methods.
- def list_snapshots(self):
- for f in self.store.list('snapshots'):
- m = cumulus.store.type_patterns['snapshots'].match(f)
- if m: yield m.group(1)
+ def stat_generic(self, basename, filetype):
+ return SEARCH_PATHS[filetype].stat(self._backend, basename)
- def list_segments(self):
- for f in self.store.list('segments'):
- m = cumulus.store.type_patterns['segments'].match(f)
- if m: yield m.group(1)
+ def open_generic(self, basename, filetype):
+ return SEARCH_PATHS[filetype].get(self._backend, basename)
+
+ def open_snapshot(self, name):
+ return self.open_generic("snapshot-" + name, "snapshots")
-class ObjectStore:
- def __init__(self, data_store):
- self.store = data_store
+ def open_segment(self, name):
+ return self.open_generic(name + ".tar", "segments")
+
+ def list_generic(self, filetype):
+ return ((x[1].group(1), x[0])
+ for x in SEARCH_PATHS[filetype].list(self._backend))
+
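+# Usage sketch (the path below is hypothetical): wrapping a local file store
+# and enumerating the snapshots it contains:
+#     wrapper = BackendWrapper("/backups/cumulus")
+#     for (name, path) in wrapper.list_generic("snapshots"):
+#         print name, path
+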
+class CumulusStore:
+ def __init__(self, backend):
+ if isinstance(backend, BackendWrapper):
+ self.backend = backend
+ else:
+ self.backend = BackendWrapper(backend)
self.cachedir = None
self.CACHE_SIZE = 16
- self.lru_list = []
+ self._lru_list = []
def get_cachedir(self):
if self.cachedir is None:
- self.cachedir = tempfile.mkdtemp(".lbs")
+ self.cachedir = tempfile.mkdtemp("-cumulus")
return self.cachedir
def cleanup(self):
return (segment, object, checksum, slice)
+ def list_snapshots(self):
+ return set(x[0] for x in self.backend.list_generic("snapshots"))
+
+ def list_segments(self):
+ return set(x[0] for x in self.backend.list_generic("segments"))
+
+ def load_snapshot(self, snapshot):
+ snapshot_file = self.backend.open_snapshot(snapshot)[0]
+ return snapshot_file.read().splitlines(True)
+
def get_segment(self, segment):
accessed_segments.add(segment)
- for (extension, filter) in SEGMENT_FILTERS:
- try:
- raw = self.store.lowlevel_open(segment + ".tar" + extension)
-
- (input, output) = os.popen2(filter)
- def copy_thread(src, dst):
- BLOCK_SIZE = 4096
- while True:
- block = src.read(BLOCK_SIZE)
- if len(block) == 0: break
- dst.write(block)
- dst.close()
-
- thread.start_new_thread(copy_thread, (raw, input))
- return output
- except:
- pass
-
- raise cumulus.store.NotFoundError
+ (segment_fp, path, filter_cmd) = self.backend.open_segment(segment)
+ if filter_cmd is None:
+ return segment_fp
+ (input, output) = os.popen2(filter_cmd)
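+        # The filter command (for example, a decompression or decryption
+        # pipeline) reads the raw segment on stdin and writes the decoded
+        # stream on stdout. Feed it from a background thread so the caller
+        # can read the filtered output without deadlocking on pipe buffers.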
+ def copy_thread(src, dst):
+ BLOCK_SIZE = 4096
+ while True:
+ block = src.read(BLOCK_SIZE)
+ if len(block) == 0: break
+ dst.write(block)
+ src.close()
+ dst.close()
+ thread.start_new_thread(copy_thread, (segment_fp, input))
+ return output
def load_segment(self, segment):
seg = tarfile.open(segment, 'r|', self.get_segment(segment))
if len(path) == 2 and path[0] == segment:
yield (path[1], data_obj.read())
- def load_snapshot(self, snapshot):
- file = self.store.lowlevel_open("snapshot-" + snapshot + ".cumulus")
- return file.read().splitlines(True)
-
def extract_segment(self, segment):
segdir = os.path.join(self.get_cachedir(), segment)
os.mkdir(segdir)
path = os.path.join(self.get_cachedir(), segment, object)
if not os.access(path, os.R_OK):
self.extract_segment(segment)
- if segment in self.lru_list: self.lru_list.remove(segment)
- self.lru_list.append(segment)
- while len(self.lru_list) > self.CACHE_SIZE:
- os.system("rm -rf " + os.path.join(self.cachedir, self.lru_list[0]))
- self.lru_list = self.lru_list[1:]
+ if segment in self._lru_list: self._lru_list.remove(segment)
+ self._lru_list.append(segment)
+ while len(self._lru_list) > self.CACHE_SIZE:
+ os.system("rm -rf " + os.path.join(self.cachedir,
+ self._lru_list[0]))
+ self._lru_list = self._lru_list[1:]
return open(path, 'rb').read()
def get(self, refstr):
""" List snapshots stored.
Syntax: $0 --data=DATADIR list-snapshots
"""
- store = cumulus.LowlevelDataStore(options.store)
- for s in sorted(store.list_snapshots()):
- print s
+ store = cumulus.CumulusStore(options.store)
+    for s in sorted(store.list_snapshots()):
+        print s
def cmd_list_snapshot_sizes(args):
""" List size of data needed for each snapshot.
Syntax: $0 --data=DATADIR list-snapshot-sizes
"""
- lowlevel = cumulus.LowlevelDataStore(options.store)
- lowlevel.scan()
- store = cumulus.ObjectStore(lowlevel)
+ store = cumulus.CumulusStore(options.store)
+ backend = store.backend
previous = set()
- exts = {}
- for seg in lowlevel.store.list('segments'):
- exts.update ([seg.split ('.', 1)])
- for s in sorted(lowlevel.list_snapshots()):
+ size = 0
+ def get_size(segment):
+ return backend.stat_generic(segment + ".tar", "segments")["size"]
+ for s in sorted(store.list_snapshots()):
d = cumulus.parse_full(store.load_snapshot(s))
check_version(d['Format'])
- try:
- intent = float(d['Backup-Intent'])
- except:
- intent = 1.0
-
- segments = d['Segments'].split()
- (size, added, removed, addcount, remcount) = (0, 0, 0, 0, 0)
- lo_stat = lowlevel.lowlevel_stat
- for seg in segments:
- segsize = lo_stat('.'.join ((seg, exts[seg])))['size']
- size += segsize
- if seg not in previous:
- added += segsize
- addcount += 1
- for seg in previous:
- if seg not in segments:
- removed += lo_stat('.'.join((seg, exts[seg])))['size']
- remcount += 1
- previous = set(segments)
- print "%s [%s]: %.3f +%.3f -%.3f (+%d/-%d segments)" % (s, intent, size / 1024.0**2, added / 1024.0**2, removed / 1024.0**2, addcount, remcount)
+ segments = set(d['Segments'].split())
+ (added, removed, addcount, remcount) = (0, 0, 0, 0)
+ for seg in segments.difference(previous):
+ added += get_size(seg)
+ addcount += 1
+ for seg in previous.difference(segments):
+ removed += get_size(seg)
+ remcount += 1
+ size += added - removed
+ previous = segments
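+        # Report sizes in MiB: the running total, then bytes added/removed.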
+ print "%s: %.3f +%.3f -%.3f (+%d/-%d segments)" % (s, size / 1024.0**2, added / 1024.0**2, removed / 1024.0**2, addcount, remcount)
def cmd_garbage_collect(args):
""" Search for any files which are not needed by any current
snapshots and offer to delete them.
Syntax: $0 --store=DATADIR gc
"""
- lowlevel = cumulus.LowlevelDataStore(options.store)
- lowlevel.scan()
- store = cumulus.ObjectStore(lowlevel)
- snapshots = set(lowlevel.list_snapshots())
- segments = set()
- for s in snapshots:
+ store = cumulus.CumulusStore(options.store)
+ backend = store.backend
+ referenced = set()
+ for s in store.list_snapshots():
d = cumulus.parse_full(store.load_snapshot(s))
check_version(d['Format'])
- segments.update(d['Segments'].split())
-
- referenced = snapshots.union(segments)
- reclaimed = 0
- for (t, r) in cumulus.store.type_patterns.items():
- for f in lowlevel.store.list(t):
- m = r.match(f)
- if m is None or m.group(1) not in referenced:
- print "Garbage:", (t, f)
- reclaimed += lowlevel.store.stat(t, f)['size']
- if not options.dry_run:
- lowlevel.store.delete(t, f)
- print "Reclaimed space:", reclaimed
+ referenced.add(s)
+ referenced.update(d['Segments'].split())
-cmd_gc = cmd_garbage_collect
+    # referenced now holds every snapshot name plus every segment that some
+    # current snapshot still needs.
-def cmd_object_checksums(segments):
- """ Build checksum list for objects in the given segments, or all
- segments if none are specified.
- """
- get_passphrase()
- lowlevel = cumulus.LowlevelDataStore(options.store)
- store = cumulus.ObjectStore(lowlevel)
- if len(segments) == 0:
- segments = sorted(lowlevel.list_segments())
- for s in segments:
- for (o, data) in store.load_segment(s):
- csum = cumulus.ChecksumCreator().update(data).compute()
- print "%s/%s:%d:%s" % (s, o, len(data), csum)
- store.cleanup()
-object_sums = cmd_object_checksums
+ to_delete = []
+ to_preserve = []
+ for filetype in cumulus.SEARCH_PATHS:
+ for (name, path) in store.backend.list_generic(filetype):
+ if name in referenced:
+ to_preserve.append(path)
+ else:
+ to_delete.append(path)
+
+ raw_backend = backend.raw_backend
+ for f in to_delete:
+ print "Delete:", f
+ if not options.dry_run:
+ raw_backend.delete(f)
+cmd_gc = cmd_garbage_collect
def cmd_read_snapshots(snapshots):
""" Read a snapshot file
"""
get_passphrase()
- lowlevel = cumulus.LowlevelDataStore(options.store)
- store = cumulus.ObjectStore(lowlevel)
+ store = cumulus.CumulusStore(options.store)
for s in snapshots:
d = cumulus.parse_full(store.load_snapshot(s))
check_version(d['Format'])
"""
snapshot = args [0]
get_passphrase()
- lowlevel = cumulus.LowlevelDataStore(options.store)
- store = cumulus.ObjectStore(lowlevel)
+ store = cumulus.CumulusStore(options.store)
d = cumulus.parse_full(store.load_snapshot(snapshot))
check_version(d['Format'])
metadata = cumulus.read_metadata(store, d['Root'])
""" Verify snapshot integrity
"""
get_passphrase()
- lowlevel = cumulus.LowlevelDataStore(options.store)
- store = cumulus.ObjectStore(lowlevel)
+ store = cumulus.CumulusStore(options.store)
for s in snapshots:
cumulus.accessed_segments.clear()
print "#### Snapshot", s
""" Restore a snapshot, or some subset of files from it
"""
get_passphrase()
- lowlevel = cumulus.LowlevelDataStore(options.store)
- store = cumulus.ObjectStore(lowlevel)
+ store = cumulus.CumulusStore(options.store)
snapshot = cumulus.parse_full(store.load_snapshot(args[0]))
check_version(snapshot['Format'])
destdir = args[1]
metadata_paths[pathname] = m
for block in m.data():
(segment, object, checksum, slice) \
- = cumulus.ObjectStore.parse_ref(block)
+ = cumulus.CumulusStore.parse_ref(block)
if segment not in metadata_segments:
metadata_segments[segment] = set()
metadata_segments[segment].add(pathname)