Update backend code and cumulus-sync for the new backup layout.
authorMichael Vrable <vrable@cs.hmc.edu>
Wed, 23 Oct 2013 20:30:12 +0000 (13:30 -0700)
committerMichael Vrable <vrable@cs.hmc.edu>
Tue, 28 Jan 2014 16:33:51 +0000 (08:33 -0800)
README
cumulus-sync
python/cumulus/__init__.py
python/cumulus/cmd_util.py
python/cumulus/store/__init__.py
python/cumulus/store/file.py
python/cumulus/store/s3.py

diff --git a/README b/README
index cb0a0a7..1eba0ce 100644 (file)
--- a/README
+++ b/README
@@ -6,7 +6,7 @@ How to Build
 Dependencies:
   - libuuid (sometimes part of e2fsprogs)
   - sqlite3
-  - Python (2.5 or later)
+  - Python (2.6 or later)
   - boto, the python interface to Amazon's Web Services (for S3 storage)
     http://code.google.com/p/boto
   - paramiko, SSH2 protocol for python (for sftp storage)
index 8a474a5..19d4aea 100755 (executable)
@@ -43,14 +43,14 @@ for s in snapshots:
     items_required.add(s)
     d = cumulus.parse_full(source.load_snapshot(s))
     items_required.update(d['Segments'].split())
-print "Required:", items_required
+print "Required:", len(items_required)
 
 files_present = set()
 for filetype in cumulus.SEARCH_PATHS:
     for (name, path) in store2.list_generic(filetype):
         items_required.discard(name)
         files_present.add(path)
-print "Files already present:", sorted(files_present)
+print "Files already present:", len(sorted(files_present))
 
 files_required = []
 items_found = set()
@@ -61,9 +61,6 @@ for filetype in cumulus.SEARCH_PATHS:
             items_found.add(name)
 files_required.sort()
 
-print "Missing:", items_required.difference(items_found)
-print "Required files:", files_required
-
-for f in files_required:
-    print f
+for i, f in enumerate(files_required):
+    print "[%d/%d] %s" % (i + 1, len(files_required), f)
     store2.raw_backend.put(f, store1.raw_backend.get(f))
index ef35325..02f978e 100644 (file)
@@ -219,7 +219,7 @@ class SearchPath(object):
             except cumulus.store.NotFoundError:
                 pass
         if not success:
-            raise cumulus.store.NotFoundError(basename)
+            raise cumulus.store.NotFoundError(backend)
 
 def _build_segments_searchpath(prefix):
     for (extension, filter) in SEGMENT_FILTERS:
@@ -231,6 +231,9 @@ SEARCH_PATHS = {
         [SearchPathEntry("meta", ".sha1sums"),
          SearchPathEntry("checksums", ".sha1sums"),
          SearchPathEntry("", ".sha1sums")]),
+    "meta": SearchPath(
+        r"^snapshot-(.*)\.meta(\.\S+)?$",
+        _build_segments_searchpath("meta")),
     "segments": SearchPath(
         (r"^([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})"
          r"\.tar(\.\S+)?$"),
@@ -289,6 +292,15 @@ class BackendWrapper(object):
         return ((x[1].group(1), x[0])
                 for x in SEARCH_PATHS[filetype].list(self._backend))
 
+    def prefetch_generic(self):
+        """Calls scan on directories to prefetch file metadata."""
+        directories = set()
+        for typeinfo in SEARCH_PATHS.values():
+            directories.update(typeinfo.directories())
+        for d in directories:
+            print "Prefetch", d
+            self._backend.scan(d)
+
 class CumulusStore:
     def __init__(self, backend):
         if isinstance(backend, BackendWrapper):
@@ -427,6 +439,9 @@ class CumulusStore:
 
         return data
 
+    def prefetch(self):
+        self.backend.prefetch_generic()
+
 def parse(lines, terminate=None):
     """Generic parser for RFC822-style "Key: Value" data streams.
 
index 9d97190..2e163cf 100644 (file)
@@ -85,6 +85,7 @@ def cmd_list_snapshot_sizes(args):
     """
     store = cumulus.CumulusStore(options.store)
     backend = store.backend
+    backend.prefetch_generic()
     previous = set()
     size = 0
     def get_size(segment):
index 0899d70..7488b2f 100644 (file)
@@ -70,7 +70,7 @@ class Store (object):
     def stat(self, path):
         raise NotImplementedError
 
-    def scan(self):
+    def scan(self, path):
         """Cache file information stored in this backend.
 
         This might make subsequent list or stat calls more efficient, but this
index 8304401..e2da34a 100644 (file)
@@ -29,9 +29,6 @@ class FileStore(cumulus.store.Store):
             self.path = url
         self.prefix = self.path.rstrip("/")
 
-    def _get_path(self, type, name):
-        return os.path.join(self.prefix, type, name)
-
     def list(self, subdir):
         try:
             return os.listdir(os.path.join(self.prefix, subdir))
index 4ad403c..7d8aaaf 100644 (file)
 
 import os, sys, tempfile
 import boto
+from boto.exception import S3ResponseError
 from boto.s3.bucket import Bucket
 from boto.s3.key import Key
 
 import cumulus.store
 
+def throw_notfound(method):
+    """Decorator to convert a 404 error into a cumulus.store.NotFoundError."""
+    def f(*args, **kwargs):
+        try:
+            return method(*args, **kwargs)
+        except S3ResponseError as e:
+            if e.status == 404:
+                print "Got a 404:", e
+                raise cumulus.store.NotFoundError(e)
+            else:
+                raise
+    return f
+
 class S3Store(cumulus.store.Store):
     def __init__(self, url, **kw):
         # Old versions of the Python urlparse library will take a URL like
@@ -44,39 +58,44 @@ class S3Store(cumulus.store.Store):
         self.prefix = prefix.strip("/")
         self.scan_cache = {}
 
-    def _get_key(self, type, name):
+    def _get_key(self, path):
         k = Key(self.bucket)
-        k.key = "%s/%s/%s" % (self.prefix, type, name)
+        k.key = "%s/%s" % (self.prefix, path)
         return k
 
-    def scan(self):
-        prefix = "%s/" % (self.prefix,)
+    @throw_notfound
+    def scan(self, path):
+        prefix = "%s/%s/" % (self.prefix, path)
         for i in self.bucket.list(prefix):
             assert i.key.startswith(prefix)
             self.scan_cache[i.key] = i
 
-    def list(self, type):
-        prefix = "%s/%s/" % (self.prefix, type)
+    @throw_notfound
+    def list(self, path):
+        prefix = "%s/%s/" % (self.prefix, path)
         for i in self.bucket.list(prefix):
             assert i.key.startswith(prefix)
             yield i.key[len(prefix):]
 
-    def get(self, type, name):
+    @throw_notfound
+    def get(self, path):
         fp = tempfile.TemporaryFile()
-        k = self._get_key(type, name)
+        k = self._get_key(path)
         k.get_file(fp)
         fp.seek(0)
         return fp
 
-    def put(self, type, name, fp):
-        k = self._get_key(type, name)
+    @throw_notfound
+    def put(self, path, fp):
+        k = self._get_key(path)
         k.set_contents_from_file(fp)
 
-    def delete(self, type, name):
-        self.bucket.delete_key("%s/%s/%s" % (self.prefix, type, name))
+    @throw_notfound
+    def delete(self, path):
+        self.bucket.delete_key("%s/%s" % (self.prefix, path))
 
-    def stat(self, type, name):
-        path = "%s/%s/%s" % (self.prefix, type, name)
+    def stat(self, path):
+        path = "%s/%s" % (self.prefix, path)
         if path in self.scan_cache:
             k = self.scan_cache[path]
         else: