From 8637c84e5657ec8aa677091a1cde4c2fe666cbfb Mon Sep 17 00:00:00 2001 From: Michael Vrable Date: Tue, 28 Jan 2014 21:34:22 -0800 Subject: [PATCH] Start a cleanup on the storage backends. The basic interface and file and s3 backends are updated. Remaining backends still need some work. --- python/cumulus/__init__.py | 5 +-- python/cumulus/store/__init__.py | 62 +++++++++++++++++++------------- python/cumulus/store/file.py | 23 +++++------- python/cumulus/store/s3.py | 44 +++++++++++------------ 4 files changed, 67 insertions(+), 67 deletions(-) diff --git a/python/cumulus/__init__.py b/python/cumulus/__init__.py index 0e39d37..4b7bd6c 100644 --- a/python/cumulus/__init__.py +++ b/python/cumulus/__init__.py @@ -275,10 +275,7 @@ class BackendWrapper(object): store may either be a Store object or URL. """ if type(backend) in StringTypes: - if backend.find(":") >= 0: - self._backend = cumulus.store.open(backend) - else: - self._backend = cumulus.store.file.FileStore(backend) + self._backend = cumulus.store.open(backend) else: self._backend = backend diff --git a/python/cumulus/store/__init__.py b/python/cumulus/store/__init__.py index e638bc5..a59c217 100644 --- a/python/cumulus/store/__init__.py +++ b/python/cumulus/store/__init__.py @@ -18,11 +18,16 @@ from __future__ import division, print_function, unicode_literals +import importlib import re try: - from urllib.parse import urlparse + # Python 3 + from urllib import parse as urlparse + from urllib.parse import quote, unquote except ImportError: - from urlparse import urlparse + # Python 2 + from urllib import quote, unquote + import urlparse type_patterns = { 'checksums': re.compile(r"^snapshot-(.*)\.(\w+)sums$"), @@ -35,31 +40,19 @@ class NotFoundError(KeyError): pass -class Store (object): +class Store(object): """Base class for all cumulus storage backends.""" - def __new__ (cls, url, **kw): - """ Return the correct sub-class depending on url, - pass parsed url parameters to object + def __init__(self, url): + """Initializes a new storage backend. + + Params: + url: The parsed (by urlsplit) URL that specifies the storage + location. """ - if cls != Store: - return super(Store, cls).__new__(cls, url, **kw) - (scheme, netloc, path, params, query, fragment) \ - = urlparse(url) - - try: - cumulus = __import__('cumulus.store.%s' % scheme, globals()) - subcls = getattr (cumulus.store, scheme).Store - obj = super(Store, cls).__new__(subcls, url, **kw) - obj.scheme = scheme - obj.netloc = netloc - obj.path = path - obj.params = params - obj.query = query - obj.fragment = fragment - return obj - except ImportError: - raise NotImplementedError("Scheme %s not implemented" % scheme) + pass + + # TODO: Implement context manager. def list(self, path): raise NotImplementedError @@ -95,4 +88,23 @@ class Store (object): self.close() def open(url): - return Store(url) + """Parse a storage url, then locate and initialize a backend for it.""" + parsed_url = urlparse.urlsplit(url) + + # If there is no scheme, fall back to treating the string as local path and + # construct a file:/// URL. + if not parsed_url.scheme: + parsed_url = urlparse.SplitResult("file", "", quote(url), "", "") + + try: + # TODO: Support a registry for schemes that don't map to a module. + if re.match(r"^\w+$", parsed_url.scheme): + handler = importlib.import_module("cumulus.store.%s" % + parsed_url.scheme) + obj = handler.Store(parsed_url) + return obj + except ImportError: + # Fall through to error below + pass + + raise NotImplementedError("Scheme %s not implemented" % scheme) diff --git a/python/cumulus/store/file.py b/python/cumulus/store/file.py index 6aea58b..fd6805f 100644 --- a/python/cumulus/store/file.py +++ b/python/cumulus/store/file.py @@ -22,14 +22,11 @@ import os, sys, tempfile import cumulus.store -type_patterns = cumulus.store.type_patterns - -class FileStore(cumulus.store.Store): - def __init__(self, url, **kw): - # if constructor isn't called via factory interpret url as filename - if not hasattr (self, 'path'): - self.path = url - self.prefix = self.path.rstrip("/") +class Store(cumulus.store.Store): + """Storage backend that accesses the local file system.""" + def __init__(self, url): + super(Store, self).__init__(url) + self.prefix = cumulus.store.unquote(url.path) def list(self, subdir): try: @@ -44,11 +41,11 @@ class FileStore(cumulus.store.Store): raise cumulus.store.NotFoundError(path) def put(self, path, fp): - out = open(os.path.join(self.prefix, path), 'wb') - buf = fp.read(4096) - while len(buf) > 0: - out.write(buf) + with open(os.path.join(self.prefix, path), "wb") as out: buf = fp.read(4096) + while len(buf) > 0: + out.write(buf) + buf = fp.read(4096) def delete(self, path): os.unlink(os.path.join(self.prefix, path)) @@ -59,5 +56,3 @@ class FileStore(cumulus.store.Store): return {'size': stat.st_size} except OSError: raise cumulus.store.NotFoundError(path) - -Store = FileStore diff --git a/python/cumulus/store/s3.py b/python/cumulus/store/s3.py index ef40113..e9e89e4 100644 --- a/python/cumulus/store/s3.py +++ b/python/cumulus/store/s3.py @@ -35,46 +35,44 @@ def throw_notfound(method): return method(*args, **kwargs) except S3ResponseError as e: if e.status == 404: - print("Got a 404:", e) raise cumulus.store.NotFoundError(e) else: raise return f -class S3Store(cumulus.store.Store): - def __init__(self, url, **kw): - # Old versions of the Python urlparse library will take a URL like - # s3://bucket/path/ and include the bucket with the path, while new - # versions (2.6 and later) treat it as the netloc (which seems more - # correct). - # - # But, so that we can work with either behavior, for now just combine - # the netloc and path together before we do any further processing - # (which will then split the combined path apart into a bucket and path - # again). If we didn't want to support Python 2.5, this would be - # easier as we could just use the netloc as the bucket directly. - path = self.netloc + '/' + self.path - (bucket, prefix) = path.lstrip("/").split("/", 1) +class Store(cumulus.store.Store): + def __init__(self, url): + super(Store, self).__init__(url) self.conn = boto.connect_s3(is_secure=False) - self.bucket = self.conn.create_bucket(bucket) - self.prefix = prefix.strip("/") + self.bucket = self.conn.create_bucket(url.hostname) + self.prefix = url.path + if not self.prefix.endswith("/"): + self.prefix += "/" + self.prefix = self.prefix.lstrip("/") self.scan_cache = {} + def _fullpath(self, path, is_directory=False): + fullpath = self.prefix + path + if is_directory and not fullpath.endswith("/"): + fullpath += "/" + return fullpath + def _get_key(self, path): k = Key(self.bucket) - k.key = "%s/%s" % (self.prefix, path) + k.key = self._fullpath(path) return k @throw_notfound def scan(self, path): - prefix = "%s/%s/" % (self.prefix, path) + prefix = self._fullpath(path, is_directory=True) for i in self.bucket.list(prefix): assert i.key.startswith(prefix) self.scan_cache[i.key] = i @throw_notfound def list(self, path): - prefix = "%s/%s/" % (self.prefix, path) + prefix = self._fullpath(path, is_directory=True) + # TODO: Should use a delimiter for i in self.bucket.list(prefix): assert i.key.startswith(prefix) yield i.key[len(prefix):] @@ -94,10 +92,10 @@ class S3Store(cumulus.store.Store): @throw_notfound def delete(self, path): - self.bucket.delete_key("%s/%s" % (self.prefix, path)) + self.bucket.delete_key(self._fullpath(path)) def stat(self, path): - path = "%s/%s" % (self.prefix, path) + path = self._fullpath(path) if path in self.scan_cache: k = self.scan_cache[path] else: @@ -106,5 +104,3 @@ class S3Store(cumulus.store.Store): raise cumulus.store.NotFoundError return {'size': int(k.size)} - -Store = S3Store -- 2.20.1