-"""High-level interface for working with LBS archives.
+# Cumulus: Efficient Filesystem Backup to the Cloud
+# Copyright (C) 2008-2009, 2012 The Cumulus Developers
+# See the AUTHORS file for a list of contributors.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+"""High-level interface for working with Cumulus archives.
This module provides an easy interface for reading from and manipulating
-various parts of an LBS archive:
+various parts of a Cumulus archive:
- listing the snapshots and segments present
- reading segment contents
- parsing snapshot descriptors and snapshot metadata logs
- reading and maintaining the local object database
"""
-from __future__ import division
-import os, re, sha, tarfile, tempfile, thread
-from pysqlite2 import dbapi2 as sqlite3
-
-import cumulus.store, cumulus.store.file
+from __future__ import division, print_function, unicode_literals
+
+import codecs
+import hashlib
+import itertools
+import os
+import re
+import shutil
+import sqlite3
+import subprocess
+import sys
+import tarfile
+import tempfile
+try:
+ import _thread
+except ImportError:
+ import thread as _thread
+
+import cumulus.store
+import cumulus.store.file
+
+if sys.version_info < (3,):
+ StringTypes = (str, unicode)
+else:
+ StringTypes = (str,)
# The largest snapshot format version that can be understood.
-FORMAT_VERSION = (0, 8) # LBS Snapshot v0.8
+FORMAT_VERSION = (0, 11) # Cumulus Snapshot v0.11
# Maximum number of nested indirect references allowed in a snapshot.
MAX_RECURSION_DEPTH = 3
# All segments which have been accessed this session.
accessed_segments = set()
+# Table of methods used to filter segments before storage, and corresponding
+# filename extensions. These are listed in priority order (methods earlier in
+# the list are tried first).
+SEGMENT_FILTERS = [
+ (".gpg", "cumulus-filter-gpg --decrypt"),
+ (".gz", "gzip -dc"),
+ (".bz2", "bzip2 -dc"),
+ ("", None),
+]
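+# For example, a segment stored as "<uuid>.tar.gpg" is piped through
+# "cumulus-filter-gpg --decrypt" when read, one stored as "<uuid>.tar.gz"
+# through "gzip -dc", and a plain "<uuid>.tar" needs no filtering.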
+
+def to_lines(data):
+ """Decode binary data from a file into a sequence of lines.
+
+ Newline markers are retained."""
+ return list(codecs.iterdecode(data.splitlines(True), "utf-8"))
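+
+# For example, to_lines(b"first\nsecond\n") returns ["first\n", "second\n"]:
+# newline markers are kept and each line is decoded from UTF-8.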
+
def uri_decode(s):
"""Decode a URI-encoded (%xx escapes) string."""
def hex_decode(m): return chr(int(m.group(1), 16))
return "<%s %s>" % (self.__class__, self.__dict__)
CHECKSUM_ALGORITHMS = {
- 'sha1': sha.new
+ 'sha1': hashlib.sha1,
+ 'sha224': hashlib.sha224,
+ 'sha256': hashlib.sha256,
}
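+# Checksum values are written as "<algorithm>=<hexdigest>"; object
+# references embed them in parentheses, e.g. "(sha1=...)".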
class ChecksumCreator:
- """Compute an LBS checksum for provided data.
+ """Compute a Cumulus checksum for provided data.
The algorithm used is selectable, but currently defaults to sha1.
"""
result = self.hash.hexdigest()
return result == self.checksum
-class LowlevelDataStore:
- """Access to the backup store containing segments and snapshot descriptors.
+class SearchPathEntry(object):
+ """Item representing a possible search location for Cumulus files.
- Instances of this class are used to get direct filesystem-level access to
- the backup data. To read a backup, a caller will ordinarily not care about
- direct access to backup segments, but will instead merely need to access
- objects from those segments. The ObjectStore class provides a suitable
- wrapper around a DataStore to give this high-level access.
+    Some Cumulus files might be stored in any of several locations: due to
+    format (different compression mechanisms, with different filename
+    extensions), locality (different segments might be placed in different
+    directories to control archiving policies), or backwards compatibility
+    (the default location has changed over time).  A SearchPathEntry
+    describes one possible location for a file.
"""
+ def __init__(self, directory_prefix, suffix, context=None):
+ self._directory_prefix = directory_prefix
+ self._suffix = suffix
+ self._context = context
- def __init__(self, path):
- if path.find(":") >= 0:
- self.store = cumulus.store.open(path)
- else:
- self.store = cumulus.store.file.FileStore(path)
+ def __repr__(self):
+ return "%s(%r, %r, %r)" % (self.__class__.__name__,
+ self._directory_prefix, self._suffix,
+ self._context)
- def _classify(self, filename):
- for (t, r) in cumulus.store.type_patterns.items():
- if r.match(filename):
- return (t, filename)
- return (None, filename)
+ def build_path(self, basename):
+ """Construct the search path to use for a file with name basename.
- def lowlevel_open(self, filename):
- """Return a file-like object for reading data from the given file."""
+ Returns a tuple (pathname, context), where pathname is the path to try
+ and context is any additional data associated with this search entry
+ (if any).
+ """
+ return (os.path.join(self._directory_prefix, basename + self._suffix),
+ self._context)
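+
+    # For example, given SearchPathEntry("segments0", ".gz", "gzip -dc"),
+    # build_path("X.tar") returns ("segments0/X.tar.gz", "gzip -dc").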
- (type, filename) = self._classify(filename)
- return self.store.get(type, filename)
+class SearchPath(object):
+ """A collection of locations to search for files and lookup utilities.
- def lowlevel_stat(self, filename):
- """Return a dictionary of information about the given file.
+    When looking for a file in a Cumulus storage backend, a SearchPath object
+    supplies the list of possible locations to try.  A SearchPath can also
+    perform the search itself; when a file is found, the search path ordering
+    is updated (the successful SearchPathEntry is moved to the front of the
+    list to speed future searches).
+ """
+ def __init__(self, name_regex, searchpath):
+ self._regex = re.compile(name_regex)
+ self._path = list(searchpath)
+
+ def add_search_entry(self, entry):
+ self._path.append(entry)
+
+ def directories(self):
+ """Return the set of directories to search for a file type."""
+ return set(entry._directory_prefix for entry in self._path)
+
+ def get(self, backend, basename):
+ for (i, entry) in enumerate(self._path):
+ try:
+ (pathname, context) = entry.build_path(basename)
+ fp = backend.get(pathname)
+ # On success, move this entry to the front of the search path
+ # to speed future searches.
+ if i > 0:
+ self._path.pop(i)
+ self._path.insert(0, entry)
+ return (fp, pathname, context)
+ except cumulus.store.NotFoundError:
+ continue
+ raise cumulus.store.NotFoundError(basename)
+
+ def stat(self, backend, basename):
+ for (i, entry) in enumerate(self._path):
+ try:
+ (pathname, context) = entry.build_path(basename)
+ stat_data = backend.stat(pathname)
+ # On success, move this entry to the front of the search path
+ # to speed future searches.
+ if i > 0:
+ self._path.pop(i)
+ self._path.insert(0, entry)
+ result = {"path": pathname}
+ result.update(stat_data)
+ return result
+ except cumulus.store.NotFoundError:
+ continue
+ raise cumulus.store.NotFoundError(basename)
+
+ def match(self, filename):
+ return self._regex.match(filename)
+
+ def list(self, backend):
+ success = False
+ for d in self.directories():
+ try:
+ for f in backend.list(d):
+ success = True
+ m = self.match(f)
+ if m: yield (os.path.join(d, f), m)
+ except cumulus.store.NotFoundError:
+ pass
+ if not success:
+ raise cumulus.store.NotFoundError(backend)
+
+def _build_segments_searchpath(prefix):
+    for (extension, filter_cmd) in SEGMENT_FILTERS:
+        yield SearchPathEntry(prefix, extension, filter_cmd)
+
+SEARCH_PATHS = {
+ "checksums": SearchPath(
+ r"^snapshot-(.*)\.(\w+)sums$",
+ [SearchPathEntry("meta", ".sha1sums"),
+ SearchPathEntry("checksums", ".sha1sums"),
+ SearchPathEntry("", ".sha1sums")]),
+ "meta": SearchPath(
+ r"^snapshot-(.*)\.meta(\.\S+)?$",
+ _build_segments_searchpath("meta")),
+ "segments": SearchPath(
+ (r"^([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})"
+ r"\.tar(\.\S+)?$"),
+ itertools.chain(
+ _build_segments_searchpath("segments0"),
+ _build_segments_searchpath("segments1"),
+ _build_segments_searchpath(""),
+ _build_segments_searchpath("segments"))),
+ "snapshots": SearchPath(
+ r"^snapshot-(.*)\.(cumulus|lbs)$",
+ [SearchPathEntry("snapshots", ".cumulus"),
+ SearchPathEntry("snapshots", ".lbs"),
+ SearchPathEntry("", ".cumulus"),
+ SearchPathEntry("", ".lbs")]),
+}
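+
+# For example, opening the snapshot descriptor "foo-20120101" tries
+# "snapshots/snapshot-foo-20120101.cumulus", then the ".lbs" variant, then
+# the same two names in the top-level directory, and returns the first match.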
+
+class BackendWrapper(object):
+ """Wrapper around a Cumulus storage backend that understands file types.
- Currently, the only defined field is 'size', giving the size of the
- file in bytes.
+ The BackendWrapper class understands different Cumulus file types, such as
+ snapshots and segments, and implements higher-level operations such as
+ "retrieve a snapshot with a specific name" (hiding operations such as
+ searching for the correct file name).
+ """
+
+ def __init__(self, backend):
+ """Initializes a wrapper around the specified storage backend.
+
+        backend may be either a Store object or a URL.
"""
+        if isinstance(backend, StringTypes):
+ self._backend = cumulus.store.open(backend)
+ else:
+ self._backend = backend
- (type, filename) = self._classify(filename)
- return self.store.stat(type, filename)
+ @property
+ def raw_backend(self):
+ return self._backend
- # Slightly higher-level list methods.
- def list_snapshots(self):
- for f in self.store.list('snapshots'):
- m = cumulus.store.type_patterns['snapshots'].match(f)
- if m: yield m.group(1)
+ def stat_generic(self, basename, filetype):
+ return SEARCH_PATHS[filetype].stat(self._backend, basename)
- def list_segments(self):
- for f in self.store.list('segments'):
- m = cumulus.store.type_patterns['segments'].match(f)
- if m: yield m.group(1)
+ def open_generic(self, basename, filetype):
+ return SEARCH_PATHS[filetype].get(self._backend, basename)
+
+ def open_snapshot(self, name):
+ return self.open_generic("snapshot-" + name, "snapshots")
-class ObjectStore:
- def __init__(self, data_store):
- self.store = data_store
+ def open_segment(self, name):
+ return self.open_generic(name + ".tar", "segments")
+
+ def list_generic(self, filetype):
+ return ((x[1].group(1), x[0])
+ for x in SEARCH_PATHS[filetype].list(self._backend))
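+    # For example, list_generic("snapshots") yields (name, path) pairs such
+    # as ("foo-20120101", "snapshots/snapshot-foo-20120101.cumulus").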
+
+ def prefetch_generic(self):
+ """Calls scan on directories to prefetch file metadata."""
+ directories = set()
+ for typeinfo in SEARCH_PATHS.values():
+ directories.update(typeinfo.directories())
+ for d in directories:
+ print("Prefetch", d)
+ self._backend.scan(d)
+
+class CumulusStore:
+ def __init__(self, backend):
+ if isinstance(backend, BackendWrapper):
+ self.backend = backend
+ else:
+ self.backend = BackendWrapper(backend)
self.cachedir = None
self.CACHE_SIZE = 16
- self.lru_list = []
+ self._lru_list = []
def get_cachedir(self):
if self.cachedir is None:
- self.cachedir = tempfile.mkdtemp(".lbs")
+ self.cachedir = tempfile.mkdtemp("-cumulus")
return self.cachedir
def cleanup(self):
def parse_ref(refstr):
m = re.match(r"^zero\[(\d+)\]$", refstr)
if m:
- return ("zero", None, None, (0, int(m.group(1))))
+ return ("zero", None, None, (0, int(m.group(1)), False))
m = re.match(r"^([-0-9a-f]+)\/([0-9a-f]+)(\(\S+\))?(\[(((\d+)\+)?(\d+)|=(\d+))\])?$", refstr)
if not m: return
return (segment, object, checksum, slice)
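+    # Examples of reference strings accepted by parse_ref:
+    #   "zero[1024]"                  -- 1024 bytes of zeroes
+    #   "<segment>/<object>"          -- an entire object
+    #   "<segment>/<object>[512+256]" -- bytes 512 through 767 of the object
+    #   "<segment>/<object>[=4096]"   -- the whole object, annotated with its
+    #                                    exact size (4096 bytes)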
- def get_segment(self, segment):
- accessed_segments.add(segment)
- raw = self.store.lowlevel_open(segment + ".tar.gpg")
+ def list_snapshots(self):
+ return set(x[0] for x in self.backend.list_generic("snapshots"))
+
+ def list_segments(self):
+ return set(x[0] for x in self.backend.list_generic("segments"))
- (input, output) = os.popen2("lbs-filter-gpg --decrypt")
+ def load_snapshot(self, snapshot):
+ snapshot_file = self.backend.open_snapshot(snapshot)[0]
+ return to_lines(snapshot_file.read())
+
+ @staticmethod
+ def filter_data(filehandle, filter_cmd):
+ if filter_cmd is None:
+ return filehandle
+ p = subprocess.Popen(filter_cmd, shell=True, stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE, close_fds=True)
+ input, output = p.stdin, p.stdout
def copy_thread(src, dst):
BLOCK_SIZE = 4096
while True:
block = src.read(BLOCK_SIZE)
if len(block) == 0: break
dst.write(block)
+ src.close()
dst.close()
-
- thread.start_new_thread(copy_thread, (raw, input))
+ p.wait()
+ _thread.start_new_thread(copy_thread, (filehandle, input))
return output
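+
+    # For example, filter_data(fp, "gzip -dc") starts gzip as a subprocess,
+    # feeds it the contents of fp from a helper thread, and returns the pipe
+    # from which the decompressed stream can be read incrementally.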
+ def get_segment(self, segment):
+ accessed_segments.add(segment)
+
+ (segment_fp, path, filter_cmd) = self.backend.open_segment(segment)
+ return self.filter_data(segment_fp, filter_cmd)
+
def load_segment(self, segment):
seg = tarfile.open(segment, 'r|', self.get_segment(segment))
for item in seg:
if len(path) == 2 and path[0] == segment:
yield (path[1], data_obj.read())
- def load_snapshot(self, snapshot):
- file = self.store.lowlevel_open("snapshot-" + snapshot + ".lbs")
- return file.read().splitlines(True)
-
def extract_segment(self, segment):
segdir = os.path.join(self.get_cachedir(), segment)
os.mkdir(segdir)
path = os.path.join(self.get_cachedir(), segment, object)
if not os.access(path, os.R_OK):
self.extract_segment(segment)
- if segment in self.lru_list: self.lru_list.remove(segment)
- self.lru_list.append(segment)
- while len(self.lru_list) > self.CACHE_SIZE:
- os.system("rm -rf " + os.path.join(self.cachedir, self.lru_list[0]))
- self.lru_list = self.lru_list[1:]
+ if segment in self._lru_list: self._lru_list.remove(segment)
+ self._lru_list.append(segment)
+ while len(self._lru_list) > self.CACHE_SIZE:
+            # Remove the oldest cached segment.  Using shutil.rmtree rather
+            # than a shell "rm -rf" keeps cache paths containing spaces or
+            # shell metacharacters safe.
+            shutil.rmtree(os.path.join(self.cachedir, self._lru_list[0]),
+                          ignore_errors=True)
+            self._lru_list = self._lru_list[1:]
return open(path, 'rb').read()
def get(self, refstr):
return data
+ def prefetch(self):
+ self.backend.prefetch_generic()
+
def parse(lines, terminate=None):
"""Generic parser for RFC822-style "Key: Value" data streams.
def parse_full(lines):
try:
- return parse(lines).next()
+ return next(parse(lines))
except StopIteration:
return {}
def parse_metadata_version(s):
"""Convert a string with the snapshot version format to a tuple."""
- m = re.match(r"^LBS Snapshot v(\d+(\.\d+)*)$", s)
+ m = re.match(r"^(?:Cumulus|LBS) Snapshot v(\d+(\.\d+)*)$", s)
if m is None:
return ()
else:
def follow_ref(refstr):
if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
- lines = object_store.get(refstr).splitlines(True)
+ lines = to_lines(object_store.get(refstr))
lines.reverse()
stack.append(lines)
schemes.sort()
return schemes
- def garbage_collect(self, scheme, intent=1.0):
+ def list_snapshots(self, scheme):
+ """Return a list of snapshots for the given scheme."""
+ cur = self.cursor()
+ cur.execute("select name from snapshots")
+ snapshots = [row[0] for row in cur.fetchall()]
+ snapshots.sort()
+ return snapshots
+
+ def delete_snapshot(self, scheme, name):
+ """Remove the specified snapshot from the database.
+
+ Warning: This does not garbage collect all dependent data in the
+ database, so it must be followed by a call to garbage_collect() to make
+ the database consistent.
+ """
+ cur = self.cursor()
+ cur.execute("delete from snapshots where scheme = ? and name = ?",
+ (scheme, name))
+
+ def prune_old_snapshots(self, scheme, intent=1.0):
"""Delete entries from old snapshots from the database.
Only snapshots with the specified scheme name will be deleted. If
can_delete = True
if can_delete and not first:
- print "Delete snapshot %d (%s)" % (id, name)
+ print("Delete snapshot %d (%s)" % (id, name))
cur.execute("delete from snapshots where snapshotid = ?",
(id,))
first = False
max_intent = max(max_intent, snap_intent)
- # Delete entries in the segments_used table which are for non-existent
- # snapshots.
- cur.execute("""delete from segments_used
+ self.garbage_collect()
+
+ def garbage_collect(self):
+ """Garbage-collect unreachable segment and object data.
+
+        Remove all segment and checksum data that is no longer reachable
+        from the current set of snapshots stored in the local database.
+ """
+ cur = self.cursor()
+
+ # Delete entries in the segment_utilization table which are for
+ # non-existent snapshots.
+ cur.execute("""delete from segment_utilization
where snapshotid not in
(select snapshotid from snapshots)""")
- # Find segments which contain no objects used by any current snapshots,
- # and delete them from the segment table.
+ # Delete segments not referenced by any current snapshots.
cur.execute("""delete from segments where segmentid not in
- (select segmentid from segments_used)""")
+ (select segmentid from segment_utilization)""")
- # Delete unused objects in the block_index table. By "unused", we mean
- # any object which was stored in a segment which has been deleted, and
- # any object in a segment which was marked for cleaning and has had
- # cleaning performed already (the expired time is less than the current
- # largest snapshot id).
+ # Delete dangling objects in the block_index table.
cur.execute("""delete from block_index
- where segmentid not in (select segmentid from segments)
- or segmentid in (select segmentid from segments
- where expire_time < ?)""",
- (last_snapshotid,))
+ where segmentid not in
+ (select segmentid from segments)""")
# Remove sub-block signatures for deleted objects.
cur.execute("""delete from subblock_signatures
"""
# The expired column of the block_index table is used when generating a
- # new LBS snapshot. A null value indicates that an object may be
+ # new Cumulus snapshot. A null value indicates that an object may be
# re-used. Otherwise, an object must be written into a new segment if
# needed. Objects with distinct expired values will be written into
# distinct segments, to allow for some grouping by age. The value 0 is
target_size = max(2 * segment_size_estimate,
total_bytes / target_buckets)
- print "segment_size:", segment_size_estimate
- print "distribution:", distribution
- print "total_bytes:", total_bytes
- print "target_buckets:", target_buckets
- print "min, target size:", min_size, target_size
+ print("segment_size:", segment_size_estimate)
+ print("distribution:", distribution)
+ print("total_bytes:", total_bytes)
+ print("target_buckets:", target_buckets)
+ print("min, target size:", min_size, target_size)
# Chosen cutoffs. Each bucket consists of objects with age greater
# than one cutoff value, but not greater than the next largest cutoff.
cutoffs.append(-1)
cutoffs.append(-1)
- print "cutoffs:", cutoffs
+ print("cutoffs:", cutoffs)
# Update the database to assign each object to the appropriate bucket.
cutoffs.reverse()