Start a cleanup on the storage backends.
authorMichael Vrable <vrable@cs.hmc.edu>
Wed, 29 Jan 2014 05:34:22 +0000 (21:34 -0800)
committerMichael Vrable <vrable@cs.hmc.edu>
Sat, 1 Feb 2014 17:18:56 +0000 (09:18 -0800)
The basic interface and file and s3 backends are updated.  Remaining
backends still need some work.

python/cumulus/__init__.py
python/cumulus/store/__init__.py
python/cumulus/store/file.py
python/cumulus/store/s3.py

index 0e39d37..4b7bd6c 100644 (file)
@@ -275,10 +275,7 @@ class BackendWrapper(object):
         store may either be a Store object or URL.
         """
         if type(backend) in StringTypes:
-            if backend.find(":") >= 0:
-                self._backend = cumulus.store.open(backend)
-            else:
-                self._backend = cumulus.store.file.FileStore(backend)
+            self._backend = cumulus.store.open(backend)
         else:
             self._backend = backend
 
index e638bc5..a59c217 100644 (file)
 
 from __future__ import division, print_function, unicode_literals
 
+import importlib
 import re
 try:
-    from urllib.parse import urlparse
+    # Python 3
+    from urllib import parse as urlparse
+    from urllib.parse import quote, unquote
 except ImportError:
-    from urlparse import urlparse
+    # Python 2
+    from urllib import quote, unquote
+    import urlparse
 
 type_patterns = {
     'checksums': re.compile(r"^snapshot-(.*)\.(\w+)sums$"),
@@ -35,31 +40,19 @@ class NotFoundError(KeyError):
 
     pass
 
-class Store (object):
+class Store(object):
     """Base class for all cumulus storage backends."""
 
-    def __new__ (cls, url, **kw):
-        """ Return the correct sub-class depending on url,
-        pass parsed url parameters to object
+    def __init__(self, url):
+        """Initializes a new storage backend.
+
+        Params:
+          url: The parsed (by urlsplit) URL that specifies the storage
+              location.
         """
-        if cls != Store:
-            return super(Store, cls).__new__(cls, url, **kw)
-        (scheme, netloc, path, params, query, fragment) \
-            = urlparse(url)
-
-        try:
-            cumulus = __import__('cumulus.store.%s' % scheme, globals())
-            subcls = getattr (cumulus.store, scheme).Store
-            obj = super(Store, cls).__new__(subcls, url, **kw)
-            obj.scheme = scheme
-            obj.netloc = netloc
-            obj.path = path
-            obj.params = params
-            obj.query = query
-            obj.fragment = fragment
-            return obj
-        except ImportError:
-            raise NotImplementedError("Scheme %s not implemented" % scheme)
+        pass
+
+    # TODO: Implement context manager.
 
     def list(self, path):
         raise NotImplementedError
@@ -95,4 +88,23 @@ class Store (object):
         self.close()
 
 def open(url):
-    return Store(url)
+    """Parse a storage url, then locate and initialize a backend for it."""
+    parsed_url = urlparse.urlsplit(url)
+
+    # If there is no scheme, fall back to treating the string as local path and
+    # construct a file:/// URL.
+    if not parsed_url.scheme:
+        parsed_url = urlparse.SplitResult("file", "", quote(url), "", "")
+
+    try:
+        # TODO: Support a registry for schemes that don't map to a module.
+        if re.match(r"^\w+$", parsed_url.scheme):
+            handler = importlib.import_module("cumulus.store.%s" %
+                                              parsed_url.scheme)
+            obj = handler.Store(parsed_url)
+            return obj
+    except ImportError:
+        # Fall through to error below
+        pass
+
+    raise NotImplementedError("Scheme %s not implemented" % scheme)
index 6aea58b..fd6805f 100644 (file)
@@ -22,14 +22,11 @@ import os, sys, tempfile
 
 import cumulus.store
 
-type_patterns = cumulus.store.type_patterns
-
-class FileStore(cumulus.store.Store):
-    def __init__(self, url, **kw):
-        # if constructor isn't called via factory interpret url as filename
-        if not hasattr (self, 'path'):
-            self.path = url
-        self.prefix = self.path.rstrip("/")
+class Store(cumulus.store.Store):
+    """Storage backend that accesses the local file system."""
+    def __init__(self, url):
+        super(Store, self).__init__(url)
+        self.prefix = cumulus.store.unquote(url.path)
 
     def list(self, subdir):
         try:
@@ -44,11 +41,11 @@ class FileStore(cumulus.store.Store):
             raise cumulus.store.NotFoundError(path)
 
     def put(self, path, fp):
-        out = open(os.path.join(self.prefix, path), 'wb')
-        buf = fp.read(4096)
-        while len(buf) > 0:
-            out.write(buf)
+        with open(os.path.join(self.prefix, path), "wb") as out:
             buf = fp.read(4096)
+            while len(buf) > 0:
+                out.write(buf)
+                buf = fp.read(4096)
 
     def delete(self, path):
         os.unlink(os.path.join(self.prefix, path))
@@ -59,5 +56,3 @@ class FileStore(cumulus.store.Store):
             return {'size': stat.st_size}
         except OSError:
             raise cumulus.store.NotFoundError(path)
-
-Store = FileStore
index ef40113..e9e89e4 100644 (file)
@@ -35,46 +35,44 @@ def throw_notfound(method):
             return method(*args, **kwargs)
         except S3ResponseError as e:
             if e.status == 404:
-                print("Got a 404:", e)
                 raise cumulus.store.NotFoundError(e)
             else:
                 raise
     return f
 
-class S3Store(cumulus.store.Store):
-    def __init__(self, url, **kw):
-        # Old versions of the Python urlparse library will take a URL like
-        # s3://bucket/path/ and include the bucket with the path, while new
-        # versions (2.6 and later) treat it as the netloc (which seems more
-        # correct).
-        #
-        # But, so that we can work with either behavior, for now just combine
-        # the netloc and path together before we do any further processing
-        # (which will then split the combined path apart into a bucket and path
-        # again).  If we didn't want to support Python 2.5, this would be
-        # easier as we could just use the netloc as the bucket directly.
-        path = self.netloc + '/' + self.path
-        (bucket, prefix) = path.lstrip("/").split("/", 1)
+class Store(cumulus.store.Store):
+    def __init__(self, url):
+        super(Store, self).__init__(url)
         self.conn = boto.connect_s3(is_secure=False)
-        self.bucket = self.conn.create_bucket(bucket)
-        self.prefix = prefix.strip("/")
+        self.bucket = self.conn.create_bucket(url.hostname)
+        self.prefix = url.path
+        if not self.prefix.endswith("/"):
+            self.prefix += "/"
+        self.prefix = self.prefix.lstrip("/")
         self.scan_cache = {}
 
+    def _fullpath(self, path, is_directory=False):
+        fullpath = self.prefix + path
+        if is_directory and not fullpath.endswith("/"):
+            fullpath += "/"
+        return fullpath
+
     def _get_key(self, path):
         k = Key(self.bucket)
-        k.key = "%s/%s" % (self.prefix, path)
+        k.key = self._fullpath(path)
         return k
 
     @throw_notfound
     def scan(self, path):
-        prefix = "%s/%s/" % (self.prefix, path)
+        prefix = self._fullpath(path, is_directory=True)
         for i in self.bucket.list(prefix):
             assert i.key.startswith(prefix)
             self.scan_cache[i.key] = i
 
     @throw_notfound
     def list(self, path):
-        prefix = "%s/%s/" % (self.prefix, path)
+        prefix = self._fullpath(path, is_directory=True)
+        # TODO: Should use a delimiter
         for i in self.bucket.list(prefix):
             assert i.key.startswith(prefix)
             yield i.key[len(prefix):]
@@ -94,10 +92,10 @@ class S3Store(cumulus.store.Store):
 
     @throw_notfound
     def delete(self, path):
-        self.bucket.delete_key("%s/%s" % (self.prefix, path))
+        self.bucket.delete_key(self._fullpath(path))
 
     def stat(self, path):
-        path = "%s/%s" % (self.prefix, path)
+        path = self._fullpath(path)
         if path in self.scan_cache:
             k = self.scan_cache[path]
         else:
@@ -106,5 +104,3 @@ class S3Store(cumulus.store.Store):
             raise cumulus.store.NotFoundError
 
         return {'size': int(k.size)}
-
-Store = S3Store