- reading and maintaining the local object database
"""
-from __future__ import division
+from __future__ import division, print_function, unicode_literals
+
+import codecs
import hashlib
import itertools
import os
+import posixpath
import re
+import six
import sqlite3
+import subprocess
+import sys
import tarfile
import tempfile
-import thread
+try:
+ import _thread
+except ImportError:
+ import thread as _thread
import cumulus.store
import cumulus.store.file
+import cumulus.util
# The largest supported snapshot format that can be understood.
FORMAT_VERSION = (0, 11) # Cumulus Snapshot v0.11
("", None),
]
-def uri_decode(s):
- """Decode a URI-encoded (%xx escapes) string."""
- def hex_decode(m): return chr(int(m.group(1), 16))
- return re.sub(r"%([0-9a-f]{2})", hex_decode, s)
-def uri_encode(s):
- """Encode a string to URI-encoded (%xx escapes) form."""
- def hex_encode(c):
- if c > '+' and c < '\x7f' and c != '@':
- return c
- else:
- return "%%%02x" % (ord(c),)
- return ''.join(hex_encode(c) for c in s)
+def to_lines(data):
+ """Decode binary data from a file into a list of text lines.
+
+ The data is first split with data.splitlines(True), so every piece
+ keeps its newline marker, and the pieces are then decoded as UTF-8
+ through codecs.iterdecode. The decoded pieces are returned as a list.
+
+ NOTE(review): splitting happens before decoding, so line boundaries
+ are determined on the undecoded input; presumably this is deliberate
+ (data is expected to be bytes) -- confirm against callers."""
+ return list(codecs.iterdecode(data.splitlines(True), "utf-8"))
class Struct:
"""A class which merely acts as a data container.
and context is any additional data associated with this search entry
(if any).
"""
- return (os.path.join(self._directory_prefix, basename + self._suffix),
+ return (posixpath.join(self._directory_prefix, basename + self._suffix),
self._context)
class SearchPath(object):
for f in backend.list(d):
success = True
m = self.match(f)
- if m: yield (os.path.join(d, f), m)
+ if m: yield (posixpath.join(d, f), m)
except cumulus.store.NotFoundError:
pass
if not success:
store may either be a Store object or URL.
"""
- if type(backend) in (str, unicode):
- if backend.find(":") >= 0:
- self._backend = cumulus.store.open(backend)
- else:
- self._backend = cumulus.store.file.FileStore(backend)
+ if isinstance(backend, six.string_types):
+ self._backend = cumulus.store.open(backend)
else:
self._backend = backend
for typeinfo in SEARCH_PATHS.values():
directories.update(typeinfo.directories())
for d in directories:
- print "Prefetch", d
+ print("Prefetch", d)
self._backend.scan(d)
class CumulusStore:
if m:
return ("zero", None, None, (0, int(m.group(1)), False))
- m = re.match(r"^([-0-9a-f]+)\/([0-9a-f]+)(\(\S+\))?(\[(((\d+)\+)?(\d+)|=(\d+))\])?$", refstr)
+ m = re.match(r"^([-0-9a-f]+)\/([0-9a-f]+)(\(\S+\))?(\[(=?(\d+)|(\d+)\+(\d+))\])?$", refstr)
if not m: return
segment = m.group(1)
checksum = checksum.lstrip("(").rstrip(")")
if slice is not None:
- if m.group(9) is not None:
+ if m.group(6) is not None:
# Size-assertion slice
- slice = (0, int(m.group(9)), True)
- elif m.group(6) is None:
- # Abbreviated slice
- slice = (0, int(m.group(8)), False)
+ slice = (0, int(m.group(6)), True)
else:
slice = (int(m.group(7)), int(m.group(8)), False)
def load_snapshot(self, snapshot):
+ """Return the contents of the given snapshot as a list of lines.
+
+ The snapshot is opened with self.backend.open_snapshot(); the first
+ element of its result is read in full and converted with to_lines()."""
snapshot_file = self.backend.open_snapshot(snapshot)[0]
- return snapshot_file.read().splitlines(True)
+ return to_lines(snapshot_file.read())
@staticmethod
def filter_data(filehandle, filter_cmd):
if filter_cmd is None:
return filehandle
- (input, output) = os.popen2(filter_cmd)
+ p = subprocess.Popen(filter_cmd, shell=True, stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE, close_fds=True)
+ input, output = p.stdin, p.stdout
def copy_thread(src, dst):
BLOCK_SIZE = 4096
while True:
dst.write(block)
src.close()
dst.close()
- thread.start_new_thread(copy_thread, (filehandle, input))
+ p.wait()
+ _thread.start_new_thread(copy_thread, (filehandle, input))
return output
def get_segment(self, segment):
if slice is not None:
(start, length, exact) = slice
+ # Note: The following assertion check may need to be commented out
+ # to restore from pre-v0.8 snapshots, as the syntax for
+ # size-assertion slices has changed.
if exact and len(data) != length: raise ValueError
data = data[start:start+length]
if len(data) != length: raise IndexError
stop reading input lines.
"""
- dict = {}
+ result = {}
last_key = None
+ def make_result(result):
+ return dict((k, "".join(v)) for (k, v) in result.items())
+
for l in lines:
# Strip off a trailing newline, if present
if len(l) > 0 and l[-1] == "\n":
l = l[:-1]
if terminate is not None and terminate(l):
- if len(dict) > 0: yield dict
- dict = {}
+ if len(result) > 0: yield make_result(result)
+ result = {}
last_key = None
continue
m = re.match(r"^([-\w]+):\s*(.*)$", l)
if m:
- dict[m.group(1)] = m.group(2)
+ result[m.group(1)] = [m.group(2)]
last_key = m.group(1)
elif len(l) > 0 and l[0].isspace() and last_key is not None:
- dict[last_key] += l
+ result[last_key].append(l)
else:
last_key = None
- if len(dict) > 0: yield dict
+ if len(result) > 0: yield make_result(result)
def parse_full(lines):
+ """Return the first item produced by parse(lines).
+
+ If parse() produces nothing (StopIteration), an empty dictionary is
+ returned instead."""
try:
- return parse(lines).next()
+ return next(parse(lines))
except StopIteration:
return {}
def follow_ref(refstr):
if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
- lines = object_store.get(refstr).splitlines(True)
+ lines = to_lines(object_store.get(refstr))
lines.reverse()
stack.append(lines)
@staticmethod
def decode_str(s):
"""Decode a URI-encoded (%xx escapes) string."""
+ # NOTE(review): decoding is delegated to cumulus.util.uri_decode_pathname,
+ # which is defined outside this file; its exact return type and error
+ # handling are not visible here -- confirm before relying on them.
- return uri_decode(s)
+ return cumulus.util.uri_decode_pathname(s)
@staticmethod
def raw_str(s):
can_delete = True
if can_delete and not first:
- print "Delete snapshot %d (%s)" % (id, name)
+ print("Delete snapshot %d (%s)" % (id, name))
cur.execute("delete from snapshots where snapshotid = ?",
(id,))
first = False
target_size = max(2 * segment_size_estimate,
total_bytes / target_buckets)
- print "segment_size:", segment_size_estimate
- print "distribution:", distribution
- print "total_bytes:", total_bytes
- print "target_buckets:", target_buckets
- print "min, target size:", min_size, target_size
+ print("segment_size:", segment_size_estimate)
+ print("distribution:", distribution)
+ print("total_bytes:", total_bytes)
+ print("target_buckets:", target_buckets)
+ print("min, target size:", min_size, target_size)
# Chosen cutoffs. Each bucket consists of objects with age greater
# than one cutoff value, but not greater than the next largest cutoff.
cutoffs.append(-1)
cutoffs.append(-1)
- print "cutoffs:", cutoffs
+ print("cutoffs:", cutoffs)
# Update the database to assign each object to the appropriate bucket.
cutoffs.reverse()