X-Git-Url: http://git.vrable.net/?a=blobdiff_plain;f=python%2Fcumulus%2F__init__.py;h=323a7c75b4679ca6293ff9a6ee67d7aa6806e217;hb=da1d95d3242ee9d596e60b8d5bfcf9e5bedcd80f;hp=10f0544189407e7fe31c1b4abf7b6c7fc1534adb;hpb=c343597dac79f9edc63d95d881625a317fadb1d9;p=cumulus.git diff --git a/python/cumulus/__init__.py b/python/cumulus/__init__.py index 10f0544..323a7c7 100644 --- a/python/cumulus/__init__.py +++ b/python/cumulus/__init__.py @@ -26,19 +26,31 @@ various parts of a Cumulus archive: - reading and maintaining the local object database """ -from __future__ import division +from __future__ import division, print_function, unicode_literals + +import codecs import hashlib import itertools import os import re +import sqlite3 +import subprocess +import sys import tarfile import tempfile -import thread -from pysqlite2 import dbapi2 as sqlite3 +try: + import _thread +except ImportError: + import thread as _thread import cumulus.store import cumulus.store.file +if sys.version < "3": + StringTypes = (str, unicode) +else: + StringTypes = (str,) + # The largest supported snapshot format that can be understood. FORMAT_VERSION = (0, 11) # Cumulus Snapshot v0.11 @@ -55,8 +67,15 @@ SEGMENT_FILTERS = [ (".gpg", "cumulus-filter-gpg --decrypt"), (".gz", "gzip -dc"), (".bz2", "bzip2 -dc"), + ("", None), ] +def to_lines(data): + """Decode binary data from a file into a sequence of lines. + + Newline markers are retained.""" + return list(codecs.iterdecode(data.splitlines(True), "utf-8")) + def uri_decode(s): """Decode a URI-encoded (%xx escapes) string.""" def hex_decode(m): return chr(int(m.group(1), 16)) @@ -204,18 +223,21 @@ class SearchPath(object): continue raise cumulus.store.NotFoundError(basename) + def match(self, filename): + return self._regex.match(filename) + def list(self, backend): success = False for d in self.directories(): try: for f in backend.list(d): success = True - m = self._regex.match(f) + m = self.match(f) if m: yield (os.path.join(d, f), m) except cumulus.store.NotFoundError: pass if not success: - raise cumulus.store.NotFoundError(basename) + raise cumulus.store.NotFoundError(backend) def _build_segments_searchpath(prefix): for (extension, filter) in SEGMENT_FILTERS: @@ -227,9 +249,12 @@ SEARCH_PATHS = { [SearchPathEntry("meta", ".sha1sums"), SearchPathEntry("checksums", ".sha1sums"), SearchPathEntry("", ".sha1sums")]), + "meta": SearchPath( + r"^snapshot-(.*)\.meta(\.\S+)?$", + _build_segments_searchpath("meta")), "segments": SearchPath( (r"^([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})" - r"(\.\S+)?$"), + r"\.tar(\.\S+)?$"), itertools.chain( _build_segments_searchpath("segments0"), _build_segments_searchpath("segments1"), @@ -257,11 +282,8 @@ class BackendWrapper(object): store may either be a Store object or URL. """ - if type(backend) in (str, unicode): - if backend.find(":") >= 0: - self._backend = cumulus.store.open(backend) - else: - self._backend = cumulus.store.file.FileStore(backend) + if type(backend) in StringTypes: + self._backend = cumulus.store.open(backend) else: self._backend = backend @@ -285,6 +307,15 @@ class BackendWrapper(object): return ((x[1].group(1), x[0]) for x in SEARCH_PATHS[filetype].list(self._backend)) + def prefetch_generic(self): + """Calls scan on directories to prefetch file metadata.""" + directories = set() + for typeinfo in SEARCH_PATHS.values(): + directories.update(typeinfo.directories()) + for d in directories: + print("Prefetch", d) + self._backend.scan(d) + class CumulusStore: def __init__(self, backend): if isinstance(backend, BackendWrapper): @@ -312,7 +343,7 @@ class CumulusStore: if m: return ("zero", None, None, (0, int(m.group(1)), False)) - m = re.match(r"^([-0-9a-f]+)\/([0-9a-f]+)(\(\S+\))?(\[(((\d+)\+)?(\d+)|=(\d+))\])?$", refstr) + m = re.match(r"^([-0-9a-f]+)\/([0-9a-f]+)(\(\S+\))?(\[(=?(\d+)|(\d+)\+(\d+))\])?$", refstr) if not m: return segment = m.group(1) @@ -324,12 +355,9 @@ class CumulusStore: checksum = checksum.lstrip("(").rstrip(")") if slice is not None: - if m.group(9) is not None: + if m.group(6) is not None: # Size-assertion slice - slice = (0, int(m.group(9)), True) - elif m.group(6) is None: - # Abbreviated slice - slice = (0, int(m.group(8)), False) + slice = (0, int(m.group(6)), True) else: slice = (int(m.group(7)), int(m.group(8)), False) @@ -343,15 +371,15 @@ class CumulusStore: def load_snapshot(self, snapshot): snapshot_file = self.backend.open_snapshot(snapshot)[0] - return snapshot_file.read().splitlines(True) - - def get_segment(self, segment): - accessed_segments.add(segment) + return to_lines(snapshot_file.read()) - (segment_fp, path, filter_cmd) = self.backend.open_segment(segment) + @staticmethod + def filter_data(filehandle, filter_cmd): if filter_cmd is None: - return segment_fp - (input, output) = os.popen2(filter_cmd) + return filehandle + p = subprocess.Popen(filter_cmd, shell=True, stdin=subprocess.PIPE, + stdout=subprocess.PIPE, close_fds=True) + input, output = p.stdin, p.stdout def copy_thread(src, dst): BLOCK_SIZE = 4096 while True: @@ -360,9 +388,16 @@ class CumulusStore: dst.write(block) src.close() dst.close() - thread.start_new_thread(copy_thread, (segment_fp, input)) + p.wait() + _thread.start_new_thread(copy_thread, (filehandle, input)) return output + def get_segment(self, segment): + accessed_segments.add(segment) + + (segment_fp, path, filter_cmd) = self.backend.open_segment(segment) + return self.filter_data(segment_fp, filter_cmd) + def load_segment(self, segment): seg = tarfile.open(segment, 'r|', self.get_segment(segment)) for item in seg: @@ -413,12 +448,18 @@ class CumulusStore: if slice is not None: (start, length, exact) = slice + # Note: The following assertion check may need to be commented out + # to restore from pre-v0.8 snapshots, as the syntax for + # size-assertion slices has changed. if exact and len(data) != length: raise ValueError data = data[start:start+length] if len(data) != length: raise IndexError return data + def prefetch(self): + self.backend.prefetch_generic() + def parse(lines, terminate=None): """Generic parser for RFC822-style "Key: Value" data streams. @@ -458,7 +499,7 @@ def parse(lines, terminate=None): def parse_full(lines): try: - return parse(lines).next() + return next(parse(lines)) except StopIteration: return {} @@ -483,7 +524,7 @@ def read_metadata(object_store, root): def follow_ref(refstr): if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError - lines = object_store.get(refstr).splitlines(True) + lines = to_lines(object_store.get(refstr)) lines.reverse() stack.append(lines) @@ -713,7 +754,7 @@ class LocalDatabase: can_delete = True if can_delete and not first: - print "Delete snapshot %d (%s)" % (id, name) + print("Delete snapshot %d (%s)" % (id, name)) cur.execute("delete from snapshots where snapshotid = ?", (id,)) first = False @@ -729,16 +770,15 @@ class LocalDatabase: """ cur = self.cursor() - # Delete entries in the segments_used table which are for non-existent - # snapshots. - cur.execute("""delete from segments_used + # Delete entries in the segment_utilization table which are for + # non-existent snapshots. + cur.execute("""delete from segment_utilization where snapshotid not in (select snapshotid from snapshots)""") - # Find segments which contain no objects used by any current snapshots, - # and delete them from the segment table. + # Delete segments not referenced by any current snapshots. cur.execute("""delete from segments where segmentid not in - (select segmentid from segments_used)""") + (select segmentid from segment_utilization)""") # Delete dangling objects in the block_index table. cur.execute("""delete from block_index @@ -920,11 +960,11 @@ class LocalDatabase: target_size = max(2 * segment_size_estimate, total_bytes / target_buckets) - print "segment_size:", segment_size_estimate - print "distribution:", distribution - print "total_bytes:", total_bytes - print "target_buckets:", target_buckets - print "min, target size:", min_size, target_size + print("segment_size:", segment_size_estimate) + print("distribution:", distribution) + print("total_bytes:", total_bytes) + print("target_buckets:", target_buckets) + print("min, target size:", min_size, target_size) # Chosen cutoffs. Each bucket consists of objects with age greater # than one cutoff value, but not greater than the next largest cutoff. @@ -954,7 +994,7 @@ class LocalDatabase: cutoffs.append(-1) cutoffs.append(-1) - print "cutoffs:", cutoffs + print("cutoffs:", cutoffs) # Update the database to assign each object to the appropriate bucket. cutoffs.reverse()