Dependencies:
- libuuid (sometimes part of e2fsprogs)
- sqlite3
- - Python (2.5 or later)
+ - Python (2.6 or later)
- boto, the Python interface to Amazon's Web Services (for S3 storage)
http://code.google.com/p/boto
- paramiko, SSH2 protocol for Python (for sftp storage)
items_required.add(s)
d = cumulus.parse_full(source.load_snapshot(s))
items_required.update(d['Segments'].split())
-print "Required:", items_required
+print "Required:", len(items_required)
files_present = set()
for filetype in cumulus.SEARCH_PATHS:
for (name, path) in store2.list_generic(filetype):
items_required.discard(name)
files_present.add(path)
-print "Files already present:", sorted(files_present)
+print "Files already present:", len(sorted(files_present))
files_required = []
items_found = set()
items_found.add(name)
files_required.sort()
-print "Missing:", items_required.difference(items_found)
-print "Required files:", files_required
-
-for f in files_required:
- print f
+for i, f in enumerate(files_required):
+ print "[%d/%d] %s" % (i + 1, len(files_required), f)
store2.raw_backend.put(f, store1.raw_backend.get(f))
except cumulus.store.NotFoundError:
pass
if not success:
- raise cumulus.store.NotFoundError(basename)
+ raise cumulus.store.NotFoundError(backend)
def _build_segments_searchpath(prefix):
for (extension, filter) in SEGMENT_FILTERS:
[SearchPathEntry("meta", ".sha1sums"),
SearchPathEntry("checksums", ".sha1sums"),
SearchPathEntry("", ".sha1sums")]),
+ "meta": SearchPath(
+ r"^snapshot-(.*)\.meta(\.\S+)?$",
+ _build_segments_searchpath("meta")),
"segments": SearchPath(
(r"^([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})"
r"\.tar(\.\S+)?$"),
return ((x[1].group(1), x[0])
for x in SEARCH_PATHS[filetype].list(self._backend))
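Tying these hunks together: list_generic applies the SEARCH_PATHS pattern for the given filetype and yields (name, pathname) pairs, where name is the pattern's first capture group. A quick sanity check of the segments pattern above (the UUID is made up):

    import re

    SEGMENT_RE = (r"^([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})"
                  r"\.tar(\.\S+)?$")
    m = re.match(SEGMENT_RE, "0ab1c23d-4e5f-6789-0abc-def012345678.tar.bz2")
    print m.group(1)    # -> 0ab1c23d-4e5f-6789-0abc-def012345678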
+ def prefetch_generic(self):
+ """Calls scan on directories to prefetch file metadata."""
+ directories = set()
+ for typeinfo in SEARCH_PATHS.values():
+ directories.update(typeinfo.directories())
+ for d in directories:
+ print "Prefetch", d
+ self._backend.scan(d)
+
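As a rough sketch of how the prefetch pass is meant to be used (the file:// URL is made up, and BackendWrapper accepting a URL string is an assumption):

    import cumulus

    # Hypothetical usage: warm the per-directory cache once, then list.
    wrapper = cumulus.BackendWrapper("file:///var/backups/cumulus")
    wrapper.prefetch_generic()            # one scan() per search-path directory
    for (name, path) in wrapper.list_generic("segments"):
        print name, path                  # later stat() calls can hit the cache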
class CumulusStore:
def __init__(self, backend):
if isinstance(backend, BackendWrapper):
return data
+ def prefetch(self):
+ self.backend.prefetch_generic()
+
def parse(lines, terminate=None):
"""Generic parser for RFC822-style "Key: Value" data streams.
"""
store = cumulus.CumulusStore(options.store)
backend = store.backend
+ backend.prefetch_generic()
previous = set()
size = 0
def get_size(segment):
def stat(self, path):
raise NotImplementedError
- def scan(self):
+ def scan(self, path):
"""Cache file information stored in this backend.
This might make subsequent list or stat calls more efficient, but this
self.path = url
self.prefix = self.path.rstrip("/")
- def _get_path(self, type, name):
- return os.path.join(self.prefix, type, name)
-
def list(self, subdir):
try:
return os.listdir(os.path.join(self.prefix, subdir))
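The removal of _get_path reflects the API change running through this patch: backends now take one relative path ("segments/abc.tar") rather than a (type, name) pair. A minimal sketch of what FileStore resolves, with a made-up store root:

    import os

    prefix = "/var/backups/cumulus"       # store root (made up)
    path = "segments/0123.tar"            # single relative path, new-style API
    print os.path.join(prefix, path)      # -> /var/backups/cumulus/segments/0123.tar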
import os, sys, tempfile
import boto
+from boto.exception import S3ResponseError
from boto.s3.bucket import Bucket
from boto.s3.key import Key
import cumulus.store
+def throw_notfound(method):
+ """Decorator to convert a 404 error into a cumulus.store.NoutFoundError."""
+ def f(*args, **kwargs):
+ try:
+ return method(*args, **kwargs)
+ except S3ResponseError as e:
+ if e.status == 404:
+ print "Got a 404:", e
+ raise cumulus.store.NotFoundError(e)
+ else:
+ raise
+ return f
+
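To make the decorator's effect concrete, a hedged sketch of the error path (the bucket and key are made up; the s3://bucket/prefix URL form is assumed from the urlparse comment in the constructor below):

    import cumulus.store
    from cumulus.store.s3 import S3Store

    store = S3Store("s3://example-bucket/backups")
    try:
        fp = store.get("segments/no-such-segment.tar")
    except cumulus.store.NotFoundError:
        pass    # a 404 from S3 surfaces as NotFoundError, as callers expect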
class S3Store(cumulus.store.Store):
def __init__(self, url, **kw):
# Old versions of the Python urlparse library will take a URL like
self.prefix = prefix.strip("/")
self.scan_cache = {}
- def _get_key(self, type, name):
+ def _get_key(self, path):
k = Key(self.bucket)
- k.key = "%s/%s/%s" % (self.prefix, type, name)
+ k.key = "%s/%s" % (self.prefix, path)
return k
- def scan(self):
- prefix = "%s/" % (self.prefix,)
+ @throw_notfound
+ def scan(self, path):
+ prefix = "%s/%s/" % (self.prefix, path)
for i in self.bucket.list(prefix):
assert i.key.startswith(prefix)
self.scan_cache[i.key] = i
- def list(self, type):
- prefix = "%s/%s/" % (self.prefix, type)
+ @throw_notfound
+ def list(self, path):
+ prefix = "%s/%s/" % (self.prefix, path)
for i in self.bucket.list(prefix):
assert i.key.startswith(prefix)
yield i.key[len(prefix):]
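Concretely: with prefix "backups" and path "segments", an object stored under key "backups/segments/0123.tar" is listed as just "0123.tar". The slicing checks out:

    prefix = "backups/segments/"          # "%s/%s/" % (self.prefix, path)
    key = "backups/segments/0123.tar"
    assert key.startswith(prefix)
    print key[len(prefix):]               # -> 0123.tar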
- def get(self, type, name):
+ @throw_notfound
+ def get(self, path):
fp = tempfile.TemporaryFile()
- k = self._get_key(type, name)
+ k = self._get_key(path)
k.get_file(fp)
fp.seek(0)
return fp
- def put(self, type, name, fp):
- k = self._get_key(type, name)
+ @throw_notfound
+ def put(self, path, fp):
+ k = self._get_key(path)
k.set_contents_from_file(fp)
- def delete(self, type, name):
- self.bucket.delete_key("%s/%s/%s" % (self.prefix, type, name))
+ @throw_notfound
+ def delete(self, path):
+ self.bucket.delete_key("%s/%s" % (self.prefix, path))
- def stat(self, type, name):
- path = "%s/%s/%s" % (self.prefix, type, name)
+ def stat(self, path):
+ path = "%s/%s" % (self.prefix, path)
if path in self.scan_cache:
k = self.scan_cache[path]
else: