Update backend code and cumulus-sync for the new backup layout.
author Michael Vrable <vrable@cs.hmc.edu>
Wed, 23 Oct 2013 20:30:12 +0000 (13:30 -0700)
committer Michael Vrable <vrable@cs.hmc.edu>
Tue, 28 Jan 2014 16:33:51 +0000 (08:33 -0800)
README
cumulus-sync
python/cumulus/__init__.py
python/cumulus/cmd_util.py
python/cumulus/store/__init__.py
python/cumulus/store/file.py
python/cumulus/store/s3.py
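
The diffs below replace the Store backend interface's separate (type, name)
arguments with single path strings, route lookups through per-type search
paths, and add a metadata-prefetch pass. A minimal sketch of the new calling
convention, assuming boto credentials are configured; the store URL and object
name are made-up placeholders, not values from this commit:

    import cumulus.store.s3

    store = cumulus.store.s3.S3Store("s3://bucket/backups")  # placeholder URL
    # Old interface: store.get("segments", "example.tar")
    # New interface: a single path, joined under the store prefix by _get_key()
    fp = store.get("segments/example.tar")  # hypothetical object name
    store.put("segments/example.tar", fp)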

diff --git a/README b/README
index cb0a0a7..1eba0ce 100644 (file)
--- a/README
+++ b/README
@@ -6,7 +6,7 @@ How to Build
 Dependencies:
   - libuuid (sometimes part of e2fsprogs)
   - sqlite3
-  - Python (2.5 or later)
+  - Python (2.6 or later)
   - boto, the python interface to Amazon's Web Services (for S3 storage)
     http://code.google.com/p/boto
   - paramiko, SSH2 protocol for python (for sftp storage)
diff --git a/cumulus-sync b/cumulus-sync
index 8a474a5..19d4aea 100755 (executable)
--- a/cumulus-sync
+++ b/cumulus-sync
@@ -43,14 +43,14 @@ for s in snapshots:
     items_required.add(s)
     d = cumulus.parse_full(source.load_snapshot(s))
     items_required.update(d['Segments'].split())
-print "Required:", items_required
+print "Required:", len(items_required)
 
 files_present = set()
 for filetype in cumulus.SEARCH_PATHS:
     for (name, path) in store2.list_generic(filetype):
         items_required.discard(name)
         files_present.add(path)
-print "Files already present:", sorted(files_present)
+print "Files already present:", len(sorted(files_present))
 
 files_required = []
 items_found = set()
@@ -61,9 +61,6 @@ for filetype in cumulus.SEARCH_PATHS:
             items_found.add(name)
 files_required.sort()
 
-print "Missing:", items_required.difference(items_found)
-print "Required files:", files_required
-
-for f in files_required:
-    print f
+for i, f in enumerate(files_required):
+    print "[%d/%d] %s" % (i + 1, len(files_required), f)
     store2.raw_backend.put(f, store1.raw_backend.get(f))
diff --git a/python/cumulus/__init__.py b/python/cumulus/__init__.py
index ef35325..02f978e 100644 (file)
--- a/python/cumulus/__init__.py
+++ b/python/cumulus/__init__.py
@@ -219,7 +219,7 @@ class SearchPath(object):
             except cumulus.store.NotFoundError:
                 pass
         if not success:
-            raise cumulus.store.NotFoundError(basename)
+            raise cumulus.store.NotFoundError(backend)
 
 def _build_segments_searchpath(prefix):
     for (extension, filter) in SEGMENT_FILTERS:
@@ -231,6 +231,9 @@ SEARCH_PATHS = {
         [SearchPathEntry("meta", ".sha1sums"),
          SearchPathEntry("checksums", ".sha1sums"),
          SearchPathEntry("", ".sha1sums")]),
+    "meta": SearchPath(
+        r"^snapshot-(.*)\.meta(\.\S+)?$",
+        _build_segments_searchpath("meta")),
     "segments": SearchPath(
         (r"^([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})"
          r"\.tar(\.\S+)?$"),
@@ -289,6 +292,15 @@ class BackendWrapper(object):
         return ((x[1].group(1), x[0])
                 for x in SEARCH_PATHS[filetype].list(self._backend))
 
+    def prefetch_generic(self):
+        """Calls scan on directories to prefetch file metadata."""
+        directories = set()
+        for typeinfo in SEARCH_PATHS.values():
+            directories.update(typeinfo.directories())
+        for d in directories:
+            print "Prefetch", d
+            self._backend.scan(d)
+
 class CumulusStore:
     def __init__(self, backend):
         if isinstance(backend, BackendWrapper):
@@ -427,6 +439,9 @@ class CumulusStore:
 
         return data
 
+    def prefetch(self):
+        self.backend.prefetch_generic()
+
 def parse(lines, terminate=None):
     """Generic parser for RFC822-style "Key: Value" data streams.
 
diff --git a/python/cumulus/cmd_util.py b/python/cumulus/cmd_util.py
index 9d97190..2e163cf 100644 (file)
--- a/python/cumulus/cmd_util.py
+++ b/python/cumulus/cmd_util.py
@@ -85,6 +85,7 @@ def cmd_list_snapshot_sizes(args):
     """
     store = cumulus.CumulusStore(options.store)
     backend = store.backend
+    backend.prefetch_generic()
     previous = set()
     size = 0
     def get_size(segment):
diff --git a/python/cumulus/store/__init__.py b/python/cumulus/store/__init__.py
index 0899d70..7488b2f 100644 (file)
--- a/python/cumulus/store/__init__.py
+++ b/python/cumulus/store/__init__.py
@@ -70,7 +70,7 @@ class Store (object):
     def stat(self, path):
         raise NotImplementedError
 
-    def scan(self):
+    def scan(self, path):
         """Cache file information stored in this backend.
 
         This might make subsequent list or stat calls more efficient, but this
diff --git a/python/cumulus/store/file.py b/python/cumulus/store/file.py
index 8304401..e2da34a 100644 (file)
--- a/python/cumulus/store/file.py
+++ b/python/cumulus/store/file.py
@@ -29,9 +29,6 @@ class FileStore(cumulus.store.Store):
             self.path = url
         self.prefix = self.path.rstrip("/")
 
-    def _get_path(self, type, name):
-        return os.path.join(self.prefix, type, name)
-
     def list(self, subdir):
         try:
             return os.listdir(os.path.join(self.prefix, subdir))
diff --git a/python/cumulus/store/s3.py b/python/cumulus/store/s3.py
index 4ad403c..7d8aaaf 100644 (file)
--- a/python/cumulus/store/s3.py
+++ b/python/cumulus/store/s3.py
 
 import os, sys, tempfile
 import boto
+from boto.exception import S3ResponseError
 from boto.s3.bucket import Bucket
 from boto.s3.key import Key
 
 import cumulus.store
 
+def throw_notfound(method):
+    """Decorator to convert a 404 error into a cumulus.store.NotFoundError."""
+    def f(*args, **kwargs):
+        try:
+            return method(*args, **kwargs)
+        except S3ResponseError as e:
+            if e.status == 404:
+                print "Got a 404:", e
+                raise cumulus.store.NotFoundError(e)
+            else:
+                raise
+    return f
+
 class S3Store(cumulus.store.Store):
     def __init__(self, url, **kw):
         # Old versions of the Python urlparse library will take a URL like
@@ -44,39 +58,44 @@ class S3Store(cumulus.store.Store):
         self.prefix = prefix.strip("/")
         self.scan_cache = {}
 
-    def _get_key(self, type, name):
+    def _get_key(self, path):
         k = Key(self.bucket)
-        k.key = "%s/%s/%s" % (self.prefix, type, name)
+        k.key = "%s/%s" % (self.prefix, path)
         return k
 
-    def scan(self):
-        prefix = "%s/" % (self.prefix,)
+    @throw_notfound
+    def scan(self, path):
+        prefix = "%s/%s/" % (self.prefix, path)
         for i in self.bucket.list(prefix):
             assert i.key.startswith(prefix)
             self.scan_cache[i.key] = i
 
-    def list(self, type):
-        prefix = "%s/%s/" % (self.prefix, type)
+    @throw_notfound
+    def list(self, path):
+        prefix = "%s/%s/" % (self.prefix, path)
         for i in self.bucket.list(prefix):
             assert i.key.startswith(prefix)
             yield i.key[len(prefix):]
 
-    def get(self, type, name):
+    @throw_notfound
+    def get(self, path):
         fp = tempfile.TemporaryFile()
-        k = self._get_key(type, name)
+        k = self._get_key(path)
         k.get_file(fp)
         fp.seek(0)
         return fp
 
-    def put(self, type, name, fp):
-        k = self._get_key(type, name)
+    @throw_notfound
+    def put(self, path, fp):
+        k = self._get_key(path)
         k.set_contents_from_file(fp)
 
-    def delete(self, type, name):
-        self.bucket.delete_key("%s/%s/%s" % (self.prefix, type, name))
+    @throw_notfound
+    def delete(self, path):
+        self.bucket.delete_key("%s/%s" % (self.prefix, path))
 
-    def stat(self, type, name):
-        path = "%s/%s/%s" % (self.prefix, type, name)
+    def stat(self, path):
+        path = "%s/%s" % (self.prefix, path)
         if path in self.scan_cache:
             k = self.scan_cache[path]
         else: