X-Git-Url: http://git.vrable.net/?p=cumulus.git;a=blobdiff_plain;f=python%2Fcumulus%2F__init__.py;h=6e03bd2f7275f23897c20d3c52d1c7e8e6a1a5ad;hp=51d3ee82f49ec0e072714d343f1e9b38cd552ac3;hb=ee98274cfd9e9383214a9792c01fdfe4f22ef677;hpb=2ee97034047db53780a52d803b1c577b4c23c303 diff --git a/python/cumulus/__init__.py b/python/cumulus/__init__.py index 51d3ee8..6e03bd2 100644 --- a/python/cumulus/__init__.py +++ b/python/cumulus/__init__.py @@ -1,21 +1,57 @@ -"""High-level interface for working with LBS archives. +# Cumulus: Efficient Filesystem Backup to the Cloud +# Copyright (C) 2008-2009, 2012 The Cumulus Developers +# See the AUTHORS file for a list of contributors. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +"""High-level interface for working with Cumulus archives. This module provides an easy interface for reading from and manipulating -various parts of an LBS archive: +various parts of a Cumulus archive: - listing the snapshots and segments present - reading segment contents - parsing snapshot descriptors and snapshot metadata logs - reading and maintaining the local object database """ -from __future__ import division -import os, re, sha, tarfile, tempfile, thread -from pysqlite2 import dbapi2 as sqlite3 - -import cumulus.store, cumulus.store.file +from __future__ import division, print_function, unicode_literals + +import codecs +import hashlib +import itertools +import os +import re +import sqlite3 +import sys +import tarfile +import tempfile +try: + import _thread +except ImportError: + import thread as _thread + +import cumulus.store +import cumulus.store.file + +if sys.version < "3": + StringTypes = (str, unicode) +else: + StringTypes = (str,) # The largest supported snapshot format that can be understood. -FORMAT_VERSION = (0, 8) # LBS Snapshot v0.8 +FORMAT_VERSION = (0, 11) # Cumulus Snapshot v0.11 # Maximum number of nested indirect references allowed in a snapshot. MAX_RECURSION_DEPTH = 3 @@ -23,6 +59,35 @@ MAX_RECURSION_DEPTH = 3 # All segments which have been accessed this session. accessed_segments = set() +# Table of methods used to filter segments before storage, and corresponding +# filename extensions. These are listed in priority order (methods earlier in +# the list are tried first). +SEGMENT_FILTERS = [ + (".gpg", "cumulus-filter-gpg --decrypt"), + (".gz", "gzip -dc"), + (".bz2", "bzip2 -dc"), + ("", None), +] + +def to_lines(data): + """Decode binary data from a file into a sequence of lines. 
+ + Newline markers are retained.""" + return list(codecs.iterdecode(data.splitlines(True), "utf-8")) + +def uri_decode(s): + """Decode a URI-encoded (%xx escapes) string.""" + def hex_decode(m): return chr(int(m.group(1), 16)) + return re.sub(r"%([0-9a-f]{2})", hex_decode, s) +def uri_encode(s): + """Encode a string to URI-encoded (%xx escapes) form.""" + def hex_encode(c): + if c > '+' and c < '\x7f' and c != '@': + return c + else: + return "%%%02x" % (ord(c),) + return ''.join(hex_encode(c) for c in s) + class Struct: """A class which merely acts as a data container. @@ -34,11 +99,13 @@ class Struct: return "<%s %s>" % (self.__class__, self.__dict__) CHECKSUM_ALGORITHMS = { - 'sha1': sha.new + 'sha1': hashlib.sha1, + 'sha224': hashlib.sha224, + 'sha256': hashlib.sha256, } class ChecksumCreator: - """Compute an LBS checksum for provided data. + """Compute a Cumulus checksum for provided data. The algorithm used is selectable, but currently defaults to sha1. """ @@ -73,65 +140,194 @@ class ChecksumVerifier: result = self.hash.hexdigest() return result == self.checksum -class LowlevelDataStore: - """Access to the backup store containing segments and snapshot descriptors. +class SearchPathEntry(object): + """Item representing a possible search location for Cumulus files. - Instances of this class are used to get direct filesystem-level access to - the backup data. To read a backup, a caller will ordinarily not care about - direct access to backup segments, but will instead merely need to access - objects from those segments. The ObjectStore class provides a suitable - wrapper around a DataStore to give this high-level access. + Some Cumulus files might be stored in multiple possible file locations: due + to format (different compression mechanisms with different extensions), + locality (different segments might be placed in different directories to + control archiving policies), for backwards compatibility (default location + changed over time). A SearchPathEntry describes a possible location for a + file. """ + def __init__(self, directory_prefix, suffix, context=None): + self._directory_prefix = directory_prefix + self._suffix = suffix + self._context = context - def __init__(self, path): - if path.find(":") >= 0: - self.store = cumulus.store.open(path) - else: - self.store = cumulus.store.file.FileStore(path) + def __repr__(self): + return "%s(%r, %r, %r)" % (self.__class__.__name__, + self._directory_prefix, self._suffix, + self._context) - def _classify(self, filename): - for (t, r) in cumulus.store.type_patterns.items(): - if r.match(filename): - return (t, filename) - return (None, filename) + def build_path(self, basename): + """Construct the search path to use for a file with name basename. - def lowlevel_open(self, filename): - """Return a file-like object for reading data from the given file.""" + Returns a tuple (pathname, context), where pathname is the path to try + and context is any additional data associated with this search entry + (if any). + """ + return (os.path.join(self._directory_prefix, basename + self._suffix), + self._context) - (type, filename) = self._classify(filename) - return self.store.get(type, filename) +class SearchPath(object): + """A collection of locations to search for files and lookup utilities. - def lowlevel_stat(self, filename): - """Return a dictionary of information about the given file. + For looking for a file in a Cumulus storage backend, a SearchPath object + contains a list of possible locations to try. 
A SearchPath can be used to + perform the search as well; when a file is found the search path ordering + is updated (moving the successful SearchPathEntry to the front of the list + for future searches). + """ + def __init__(self, name_regex, searchpath): + self._regex = re.compile(name_regex) + self._path = list(searchpath) + + def add_search_entry(self, entry): + self._path.append(entry) + + def directories(self): + """Return the set of directories to search for a file type.""" + return set(entry._directory_prefix for entry in self._path) + + def get(self, backend, basename): + for (i, entry) in enumerate(self._path): + try: + (pathname, context) = entry.build_path(basename) + fp = backend.get(pathname) + # On success, move this entry to the front of the search path + # to speed future searches. + if i > 0: + self._path.pop(i) + self._path.insert(0, entry) + return (fp, pathname, context) + except cumulus.store.NotFoundError: + continue + raise cumulus.store.NotFoundError(basename) + + def stat(self, backend, basename): + for (i, entry) in enumerate(self._path): + try: + (pathname, context) = entry.build_path(basename) + stat_data = backend.stat(pathname) + # On success, move this entry to the front of the search path + # to speed future searches. + if i > 0: + self._path.pop(i) + self._path.insert(0, entry) + result = {"path": pathname} + result.update(stat_data) + return result + except cumulus.store.NotFoundError: + continue + raise cumulus.store.NotFoundError(basename) + + def match(self, filename): + return self._regex.match(filename) + + def list(self, backend): + success = False + for d in self.directories(): + try: + for f in backend.list(d): + success = True + m = self.match(f) + if m: yield (os.path.join(d, f), m) + except cumulus.store.NotFoundError: + pass + if not success: + raise cumulus.store.NotFoundError(backend) + +def _build_segments_searchpath(prefix): + for (extension, filter) in SEGMENT_FILTERS: + yield SearchPathEntry(prefix, extension, filter) + +SEARCH_PATHS = { + "checksums": SearchPath( + r"^snapshot-(.*)\.(\w+)sums$", + [SearchPathEntry("meta", ".sha1sums"), + SearchPathEntry("checksums", ".sha1sums"), + SearchPathEntry("", ".sha1sums")]), + "meta": SearchPath( + r"^snapshot-(.*)\.meta(\.\S+)?$", + _build_segments_searchpath("meta")), + "segments": SearchPath( + (r"^([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})" + r"\.tar(\.\S+)?$"), + itertools.chain( + _build_segments_searchpath("segments0"), + _build_segments_searchpath("segments1"), + _build_segments_searchpath(""), + _build_segments_searchpath("segments"))), + "snapshots": SearchPath( + r"^snapshot-(.*)\.(cumulus|lbs)$", + [SearchPathEntry("snapshots", ".cumulus"), + SearchPathEntry("snapshots", ".lbs"), + SearchPathEntry("", ".cumulus"), + SearchPathEntry("", ".lbs")]), +} + +class BackendWrapper(object): + """Wrapper around a Cumulus storage backend that understands file types. + + The BackendWrapper class understands different Cumulus file types, such as + snapshots and segments, and implements higher-level operations such as + "retrieve a snapshot with a specific name" (hiding operations such as + searching for the correct file name). + """ - Currently, the only defined field is 'size', giving the size of the - file in bytes. + def __init__(self, backend): + """Initializes a wrapper around the specified storage backend. + + store may either be a Store object or URL. 
""" + if type(backend) in StringTypes: + self._backend = cumulus.store.open(backend) + else: + self._backend = backend - (type, filename) = self._classify(filename) - return self.store.stat(type, filename) + @property + def raw_backend(self): + return self._backend - # Slightly higher-level list methods. - def list_snapshots(self): - for f in self.store.list('snapshots'): - m = cumulus.store.type_patterns['snapshots'].match(f) - if m: yield m.group(1) + def stat_generic(self, basename, filetype): + return SEARCH_PATHS[filetype].stat(self._backend, basename) - def list_segments(self): - for f in self.store.list('segments'): - m = cumulus.store.type_patterns['segments'].match(f) - if m: yield m.group(1) + def open_generic(self, basename, filetype): + return SEARCH_PATHS[filetype].get(self._backend, basename) + + def open_snapshot(self, name): + return self.open_generic("snapshot-" + name, "snapshots") + + def open_segment(self, name): + return self.open_generic(name + ".tar", "segments") -class ObjectStore: - def __init__(self, data_store): - self.store = data_store + def list_generic(self, filetype): + return ((x[1].group(1), x[0]) + for x in SEARCH_PATHS[filetype].list(self._backend)) + + def prefetch_generic(self): + """Calls scan on directories to prefetch file metadata.""" + directories = set() + for typeinfo in SEARCH_PATHS.values(): + directories.update(typeinfo.directories()) + for d in directories: + print("Prefetch", d) + self._backend.scan(d) + +class CumulusStore: + def __init__(self, backend): + if isinstance(backend, BackendWrapper): + self.backend = backend + else: + self.backend = BackendWrapper(backend) self.cachedir = None self.CACHE_SIZE = 16 - self.lru_list = [] + self._lru_list = [] def get_cachedir(self): if self.cachedir is None: - self.cachedir = tempfile.mkdtemp(".lbs") + self.cachedir = tempfile.mkdtemp("-cumulus") return self.cachedir def cleanup(self): @@ -144,7 +340,7 @@ class ObjectStore: def parse_ref(refstr): m = re.match(r"^zero\[(\d+)\]$", refstr) if m: - return ("zero", None, None, (0, int(m.group(1)))) + return ("zero", None, None, (0, int(m.group(1)), False)) m = re.match(r"^([-0-9a-f]+)\/([0-9a-f]+)(\(\S+\))?(\[(((\d+)\+)?(\d+)|=(\d+))\])?$", refstr) if not m: return @@ -169,22 +365,38 @@ class ObjectStore: return (segment, object, checksum, slice) - def get_segment(self, segment): - accessed_segments.add(segment) - raw = self.store.lowlevel_open(segment + ".tar.gpg") + def list_snapshots(self): + return set(x[0] for x in self.backend.list_generic("snapshots")) - (input, output) = os.popen2("lbs-filter-gpg --decrypt") + def list_segments(self): + return set(x[0] for x in self.backend.list_generic("segments")) + + def load_snapshot(self, snapshot): + snapshot_file = self.backend.open_snapshot(snapshot)[0] + return to_lines(snapshot_file.read()) + + @staticmethod + def filter_data(filehandle, filter_cmd): + if filter_cmd is None: + return filehandle + (input, output) = os.popen2(filter_cmd) def copy_thread(src, dst): BLOCK_SIZE = 4096 while True: block = src.read(BLOCK_SIZE) if len(block) == 0: break dst.write(block) + src.close() dst.close() - - thread.start_new_thread(copy_thread, (raw, input)) + _thread.start_new_thread(copy_thread, (filehandle, input)) return output + def get_segment(self, segment): + accessed_segments.add(segment) + + (segment_fp, path, filter_cmd) = self.backend.open_segment(segment) + return self.filter_data(segment_fp, filter_cmd) + def load_segment(self, segment): seg = tarfile.open(segment, 'r|', self.get_segment(segment)) for 
item in seg: @@ -193,10 +405,6 @@ class ObjectStore: if len(path) == 2 and path[0] == segment: yield (path[1], data_obj.read()) - def load_snapshot(self, snapshot): - file = self.store.lowlevel_open("snapshot-" + snapshot + ".lbs") - return file.read().splitlines(True) - def extract_segment(self, segment): segdir = os.path.join(self.get_cachedir(), segment) os.mkdir(segdir) @@ -210,11 +418,12 @@ class ObjectStore: path = os.path.join(self.get_cachedir(), segment, object) if not os.access(path, os.R_OK): self.extract_segment(segment) - if segment in self.lru_list: self.lru_list.remove(segment) - self.lru_list.append(segment) - while len(self.lru_list) > self.CACHE_SIZE: - os.system("rm -rf " + os.path.join(self.cachedir, self.lru_list[0])) - self.lru_list = self.lru_list[1:] + if segment in self._lru_list: self._lru_list.remove(segment) + self._lru_list.append(segment) + while len(self._lru_list) > self.CACHE_SIZE: + os.system("rm -rf " + os.path.join(self.cachedir, + self._lru_list[0])) + self._lru_list = self._lru_list[1:] return open(path, 'rb').read() def get(self, refstr): @@ -244,6 +453,9 @@ class ObjectStore: return data + def prefetch(self): + self.backend.prefetch_generic() + def parse(lines, terminate=None): """Generic parser for RFC822-style "Key: Value" data streams. @@ -283,14 +495,14 @@ def parse(lines, terminate=None): def parse_full(lines): try: - return parse(lines).next() + return next(parse(lines)) except StopIteration: return {} def parse_metadata_version(s): """Convert a string with the snapshot version format to a tuple.""" - m = re.match(r"^LBS Snapshot v(\d+(\.\d+)*)$", s) + m = re.match(r"^(?:Cumulus|LBS) Snapshot v(\d+(\.\d+)*)$", s) if m is None: return () else: @@ -308,7 +520,7 @@ def read_metadata(object_store, root): def follow_ref(refstr): if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError - lines = object_store.get(refstr).splitlines(True) + lines = to_lines(object_store.get(refstr)) lines.reverse() stack.append(lines) @@ -347,8 +559,7 @@ class MetadataItem: @staticmethod def decode_str(s): """Decode a URI-encoded (%xx escapes) string.""" - def hex_decode(m): return chr(int(m.group(1), 16)) - return re.sub(r"%([0-9a-f]{2})", hex_decode, s) + return uri_decode(s) @staticmethod def raw_str(s): @@ -479,7 +690,26 @@ class LocalDatabase: schemes.sort() return schemes - def garbage_collect(self, scheme, intent=1.0): + def list_snapshots(self, scheme): + """Return a list of snapshots for the given scheme.""" + cur = self.cursor() + cur.execute("select name from snapshots") + snapshots = [row[0] for row in cur.fetchall()] + snapshots.sort() + return snapshots + + def delete_snapshot(self, scheme, name): + """Remove the specified snapshot from the database. + + Warning: This does not garbage collect all dependent data in the + database, so it must be followed by a call to garbage_collect() to make + the database consistent. + """ + cur = self.cursor() + cur.execute("delete from snapshots where scheme = ? and name = ?", + (scheme, name)) + + def prune_old_snapshots(self, scheme, intent=1.0): """Delete entries from old snapshots from the database. Only snapshots with the specified scheme name will be deleted. 
If @@ -520,33 +750,36 @@ class LocalDatabase: can_delete = True if can_delete and not first: - print "Delete snapshot %d (%s)" % (id, name) + print("Delete snapshot %d (%s)" % (id, name)) cur.execute("delete from snapshots where snapshotid = ?", (id,)) first = False max_intent = max(max_intent, snap_intent) - # Delete entries in the segments_used table which are for non-existent - # snapshots. - cur.execute("""delete from segments_used + self.garbage_collect() + + def garbage_collect(self): + """Garbage-collect unreachable segment and object data. + + Remove all segments and checksums which is not reachable from the + current set of snapshots stored in the local database. + """ + cur = self.cursor() + + # Delete entries in the segment_utilization table which are for + # non-existent snapshots. + cur.execute("""delete from segment_utilization where snapshotid not in (select snapshotid from snapshots)""") - # Find segments which contain no objects used by any current snapshots, - # and delete them from the segment table. + # Delete segments not referenced by any current snapshots. cur.execute("""delete from segments where segmentid not in - (select segmentid from segments_used)""") + (select segmentid from segment_utilization)""") - # Delete unused objects in the block_index table. By "unused", we mean - # any object which was stored in a segment which has been deleted, and - # any object in a segment which was marked for cleaning and has had - # cleaning performed already (the expired time is less than the current - # largest snapshot id). + # Delete dangling objects in the block_index table. cur.execute("""delete from block_index - where segmentid not in (select segmentid from segments) - or segmentid in (select segmentid from segments - where expire_time < ?)""", - (last_snapshotid,)) + where segmentid not in + (select segmentid from segments)""") # Remove sub-block signatures for deleted objects. cur.execute("""delete from subblock_signatures @@ -649,7 +882,7 @@ class LocalDatabase: """ # The expired column of the block_index table is used when generating a - # new LBS snapshot. A null value indicates that an object may be + # new Cumulus snapshot. A null value indicates that an object may be # re-used. Otherwise, an object must be written into a new segment if # needed. Objects with distinct expired values will be written into # distinct segments, to allow for some grouping by age. The value 0 is @@ -723,11 +956,11 @@ class LocalDatabase: target_size = max(2 * segment_size_estimate, total_bytes / target_buckets) - print "segment_size:", segment_size_estimate - print "distribution:", distribution - print "total_bytes:", total_bytes - print "target_buckets:", target_buckets - print "min, target size:", min_size, target_size + print("segment_size:", segment_size_estimate) + print("distribution:", distribution) + print("total_bytes:", total_bytes) + print("target_buckets:", target_buckets) + print("min, target size:", min_size, target_size) # Chosen cutoffs. Each bucket consists of objects with age greater # than one cutoff value, but not greater than the next largest cutoff. @@ -757,7 +990,7 @@ class LocalDatabase: cutoffs.append(-1) cutoffs.append(-1) - print "cutoffs:", cutoffs + print("cutoffs:", cutoffs) # Update the database to assign each object to the appropriate bucket. cutoffs.reverse()
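
Editor's note, not part of the patch: the hunks above introduce a higher-level read API (BackendWrapper, CumulusStore, and the parse helpers). A minimal usage sketch follows, assuming the cumulus package from this change is importable and that cumulus.store.open() accepts a file:// URL for a local store; the store path below is hypothetical.

    # Sketch only: list snapshots and print a few descriptor headers.
    from __future__ import print_function
    import cumulus

    # CumulusStore wraps the URL in a BackendWrapper internally (see the
    # patch above); the path here is a placeholder for a real backup store.
    store = cumulus.CumulusStore("file:///srv/backups/cumulus")
    for name in sorted(store.list_snapshots()):
        # load_snapshot() returns the descriptor as a list of text lines;
        # parse_full() turns its "Key: Value" headers into a dictionary.
        descriptor = cumulus.parse_full(store.load_snapshot(name))
        print(name, descriptor.get("Date"), descriptor.get("Format"))
    store.cleanup()

CumulusStore extracts segments into a temporary cache directory (a 16-segment LRU, per CACHE_SIZE above), so cleanup() should be called when the store is no longer needed.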