- reading and maintaining the local object database
"""
-from __future__ import division
+from __future__ import division, print_function, unicode_literals
+
+import codecs
import hashlib
import itertools
import os
import re
+import sqlite3
+import subprocess
+import sys
import tarfile
import tempfile
-import thread
-from pysqlite2 import dbapi2 as sqlite3
+try:
+    import _thread
+except ImportError:
+    import thread as _thread
import cumulus.store
import cumulus.store.file
+if sys.version_info < (3,):
+    StringTypes = (str, unicode)
+else:
+    StringTypes = (str,)
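+
+# Illustrative note (hypothetical example, not part of this change):
+# StringTypes lets a single type check cover both major versions, e.g.
+#     isinstance("snapshot-20110101T000000", StringTypes)   # True on 2 and 3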
+
# The largest supported snapshot format that can be understood.
FORMAT_VERSION = (0, 11) # Cumulus Snapshot v0.11
(".gpg", "cumulus-filter-gpg --decrypt"),
(".gz", "gzip -dc"),
(".bz2", "bzip2 -dc"),
+ ("", None),
]
+def to_lines(data):
+    """Decode binary data from a file into a sequence of lines.
+
+    Newline markers are retained."""
+    return list(codecs.iterdecode(data.splitlines(True), "utf-8"))
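+
+# Illustrative sketch (hypothetical input, not part of this change): bytes
+# read from a backend decode to unicode lines with newlines preserved, e.g.
+#     to_lines(b"Format: Cumulus Snapshot v0.11\ndata\n")
+#     ==> ["Format: Cumulus Snapshot v0.11\n", "data\n"]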
+
def uri_decode(s):
"""Decode a URI-encoded (%xx escapes) string."""
def hex_decode(m): return chr(int(m.group(1), 16))
continue
raise cumulus.store.NotFoundError(basename)
+    def match(self, filename):
+        return self._regex.match(filename)
+
def list(self, backend):
success = False
for d in self.directories():
try:
for f in backend.list(d):
success = True
-                    m = self._regex.match(f)
+                    m = self.match(f)
if m: yield (os.path.join(d, f), m)
except cumulus.store.NotFoundError:
pass
if not success:
-            raise cumulus.store.NotFoundError(basename)
+            raise cumulus.store.NotFoundError(backend)
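+
+    # Illustrative usage sketch (hypothetical, not part of this change):
+    # list() walks every candidate directory and yields (path, match)
+    # pairs for files the regex accepts, e.g.
+    #     for path, m in SEARCH_PATHS["segments"].list(backend):
+    #         print(path, m.group(1))   # m.group(1) is the segment UUID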
def _build_segments_searchpath(prefix):
for (extension, filter) in SEGMENT_FILTERS:
[SearchPathEntry("meta", ".sha1sums"),
SearchPathEntry("checksums", ".sha1sums"),
SearchPathEntry("", ".sha1sums")]),
+ "meta": SearchPath(
+ r"^snapshot-(.*)\.meta(\.\S+)?$",
+ _build_segments_searchpath("meta")),
"segments": SearchPath(
(r"^([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})"
- r"(\.\S+)?$"),
+ r"\.tar(\.\S+)?$"),
itertools.chain(
_build_segments_searchpath("segments0"),
_build_segments_searchpath("segments1"),
store may either be a Store object or URL.
"""
-        if type(backend) in (str, unicode):
-            if backend.find(":") >= 0:
-                self._backend = cumulus.store.open(backend)
-            else:
-                self._backend = cumulus.store.file.FileStore(backend)
+        if isinstance(backend, StringTypes):
+            self._backend = cumulus.store.open(backend)
else:
self._backend = backend
return ((x[1].group(1), x[0])
for x in SEARCH_PATHS[filetype].list(self._backend))
+    def prefetch_generic(self):
+        """Calls scan on directories to prefetch file metadata."""
+        directories = set()
+        for typeinfo in SEARCH_PATHS.values():
+            directories.update(typeinfo.directories())
+        for d in directories:
+            print("Prefetch", d)
+            self._backend.scan(d)
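+
+    # Illustrative usage sketch (hypothetical URL, not part of this change):
+    #     wrapper = BackendWrapper("file:///backups/cumulus")
+    #     wrapper.prefetch_generic()   # one scan() per known directory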
+
class CumulusStore:
def __init__(self, backend):
if isinstance(backend, BackendWrapper):
def load_snapshot(self, snapshot):
snapshot_file = self.backend.open_snapshot(snapshot)[0]
-        return snapshot_file.read().splitlines(True)
-
-    def get_segment(self, segment):
-        accessed_segments.add(segment)
+        return to_lines(snapshot_file.read())
-        (segment_fp, path, filter_cmd) = self.backend.open_segment(segment)
+    @staticmethod
+    def filter_data(filehandle, filter_cmd):
if filter_cmd is None:
-            return segment_fp
-        (input, output) = os.popen2(filter_cmd)
+            return filehandle
+        p = subprocess.Popen(filter_cmd, shell=True, stdin=subprocess.PIPE,
+                             stdout=subprocess.PIPE, close_fds=True)
+        input, output = p.stdin, p.stdout
def copy_thread(src, dst):
BLOCK_SIZE = 4096
while True:
dst.write(block)
src.close()
dst.close()
-        thread.start_new_thread(copy_thread, (segment_fp, input))
+            p.wait()
+        _thread.start_new_thread(copy_thread, (filehandle, input))
return output
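+
+    # Illustrative usage sketch (hypothetical filename, not part of this
+    # change): any file-like object can be streamed through a shell filter;
+    # the helper thread feeds the child's stdin while the caller reads its
+    # stdout from the returned handle:
+    #     with open("segment.tar.gz", "rb") as fp:
+    #         data = CumulusStore.filter_data(fp, "gzip -dc").read()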
+    def get_segment(self, segment):
+        accessed_segments.add(segment)
+
+        (segment_fp, path, filter_cmd) = self.backend.open_segment(segment)
+        return self.filter_data(segment_fp, filter_cmd)
+
def load_segment(self, segment):
seg = tarfile.open(segment, 'r|', self.get_segment(segment))
for item in seg:
return data
+    def prefetch(self):
+        self.backend.prefetch_generic()
+
def parse(lines, terminate=None):
"""Generic parser for RFC822-style "Key: Value" data streams.
def parse_full(lines):
try:
-        return parse(lines).next()
+        return next(parse(lines))
except StopIteration:
return {}
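+
+# Note (illustrative): Python 3 drops the .next() method on generators; the
+# next() builtin, available since Python 2.6, works on both versions:
+#     parse(lines).next()   # Python 2 only
+#     next(parse(lines))    # Python 2.6+ and Python 3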
def follow_ref(refstr):
if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
-        lines = object_store.get(refstr).splitlines(True)
+        lines = to_lines(object_store.get(refstr))
lines.reverse()
stack.append(lines)
can_delete = True
if can_delete and not first:
- print "Delete snapshot %d (%s)" % (id, name)
+ print("Delete snapshot %d (%s)" % (id, name))
cur.execute("delete from snapshots where snapshotid = ?",
(id,))
first = False
target_size = max(2 * segment_size_estimate,
total_bytes / target_buckets)
- print "segment_size:", segment_size_estimate
- print "distribution:", distribution
- print "total_bytes:", total_bytes
- print "target_buckets:", target_buckets
- print "min, target size:", min_size, target_size
+ print("segment_size:", segment_size_estimate)
+ print("distribution:", distribution)
+ print("total_bytes:", total_bytes)
+ print("target_buckets:", target_buckets)
+ print("min, target size:", min_size, target_size)
# Chosen cutoffs. Each bucket consists of objects with age greater
# than one cutoff value, but not greater than the next largest cutoff.
cutoffs.append(-1)
cutoffs.append(-1)
- print "cutoffs:", cutoffs
+ print("cutoffs:", cutoffs)
# Update the database to assign each object to the appropriate bucket.
cutoffs.reverse()