store may either be a Store object or URL.
"""
if type(backend) in StringTypes:
- if backend.find(":") >= 0:
- self._backend = cumulus.store.open(backend)
- else:
- self._backend = cumulus.store.file.FileStore(backend)
+ self._backend = cumulus.store.open(backend)
else:
self._backend = backend
from __future__ import division, print_function, unicode_literals
+import importlib
import re
try:
- from urllib.parse import urlparse
+ # Python 3
+ from urllib import parse as urlparse
+ from urllib.parse import quote, unquote
except ImportError:
- from urlparse import urlparse
+ # Python 2
+ from urllib import quote, unquote
+ import urlparse
type_patterns = {
'checksums': re.compile(r"^snapshot-(.*)\.(\w+)sums$"),
pass
-class Store (object):
+class Store(object):
"""Base class for all cumulus storage backends."""
- def __new__ (cls, url, **kw):
- """ Return the correct sub-class depending on url,
- pass parsed url parameters to object
+ def __init__(self, url):
+ """Initializes a new storage backend.
+
+ Params:
+ url: The parsed (by urlsplit) URL that specifies the storage
+ location.
"""
- if cls != Store:
- return super(Store, cls).__new__(cls, url, **kw)
- (scheme, netloc, path, params, query, fragment) \
- = urlparse(url)
-
- try:
- cumulus = __import__('cumulus.store.%s' % scheme, globals())
- subcls = getattr (cumulus.store, scheme).Store
- obj = super(Store, cls).__new__(subcls, url, **kw)
- obj.scheme = scheme
- obj.netloc = netloc
- obj.path = path
- obj.params = params
- obj.query = query
- obj.fragment = fragment
- return obj
- except ImportError:
- raise NotImplementedError("Scheme %s not implemented" % scheme)
+ pass
+
+ # TODO: Implement context manager.
def list(self, path):
raise NotImplementedError
self.close()
def open(url):
- return Store(url)
+ """Parse a storage url, then locate and initialize a backend for it."""
+ parsed_url = urlparse.urlsplit(url)
+
+ # If there is no scheme, fall back to treating the string as local path and
+ # construct a file:/// URL.
+ if not parsed_url.scheme:
+ parsed_url = urlparse.SplitResult("file", "", quote(url), "", "")
+
+ try:
+ # TODO: Support a registry for schemes that don't map to a module.
+ if re.match(r"^\w+$", parsed_url.scheme):
+ handler = importlib.import_module("cumulus.store.%s" %
+ parsed_url.scheme)
+ obj = handler.Store(parsed_url)
+ return obj
+ except ImportError:
+ # Fall through to error below
+ pass
+
+ raise NotImplementedError("Scheme %s not implemented" % scheme)
import cumulus.store
-type_patterns = cumulus.store.type_patterns
-
-class FileStore(cumulus.store.Store):
- def __init__(self, url, **kw):
- # if constructor isn't called via factory interpret url as filename
- if not hasattr (self, 'path'):
- self.path = url
- self.prefix = self.path.rstrip("/")
+class Store(cumulus.store.Store):
+ """Storage backend that accesses the local file system."""
+ def __init__(self, url):
+ super(Store, self).__init__(url)
+ self.prefix = cumulus.store.unquote(url.path)
def list(self, subdir):
try:
raise cumulus.store.NotFoundError(path)
def put(self, path, fp):
- out = open(os.path.join(self.prefix, path), 'wb')
- buf = fp.read(4096)
- while len(buf) > 0:
- out.write(buf)
+ with open(os.path.join(self.prefix, path), "wb") as out:
buf = fp.read(4096)
+ while len(buf) > 0:
+ out.write(buf)
+ buf = fp.read(4096)
def delete(self, path):
os.unlink(os.path.join(self.prefix, path))
return {'size': stat.st_size}
except OSError:
raise cumulus.store.NotFoundError(path)
-
-Store = FileStore
return method(*args, **kwargs)
except S3ResponseError as e:
if e.status == 404:
- print("Got a 404:", e)
raise cumulus.store.NotFoundError(e)
else:
raise
return f
-class S3Store(cumulus.store.Store):
- def __init__(self, url, **kw):
- # Old versions of the Python urlparse library will take a URL like
- # s3://bucket/path/ and include the bucket with the path, while new
- # versions (2.6 and later) treat it as the netloc (which seems more
- # correct).
- #
- # But, so that we can work with either behavior, for now just combine
- # the netloc and path together before we do any further processing
- # (which will then split the combined path apart into a bucket and path
- # again). If we didn't want to support Python 2.5, this would be
- # easier as we could just use the netloc as the bucket directly.
- path = self.netloc + '/' + self.path
- (bucket, prefix) = path.lstrip("/").split("/", 1)
+class Store(cumulus.store.Store):
+ def __init__(self, url):
+ super(Store, self).__init__(url)
self.conn = boto.connect_s3(is_secure=False)
- self.bucket = self.conn.create_bucket(bucket)
- self.prefix = prefix.strip("/")
+ self.bucket = self.conn.create_bucket(url.hostname)
+ self.prefix = url.path
+ if not self.prefix.endswith("/"):
+ self.prefix += "/"
+ self.prefix = self.prefix.lstrip("/")
self.scan_cache = {}
+ def _fullpath(self, path, is_directory=False):
+ fullpath = self.prefix + path
+ if is_directory and not fullpath.endswith("/"):
+ fullpath += "/"
+ return fullpath
+
def _get_key(self, path):
k = Key(self.bucket)
- k.key = "%s/%s" % (self.prefix, path)
+ k.key = self._fullpath(path)
return k
@throw_notfound
def scan(self, path):
- prefix = "%s/%s/" % (self.prefix, path)
+ prefix = self._fullpath(path, is_directory=True)
for i in self.bucket.list(prefix):
assert i.key.startswith(prefix)
self.scan_cache[i.key] = i
@throw_notfound
def list(self, path):
- prefix = "%s/%s/" % (self.prefix, path)
+ prefix = self._fullpath(path, is_directory=True)
+ # TODO: Should use a delimiter
for i in self.bucket.list(prefix):
assert i.key.startswith(prefix)
yield i.key[len(prefix):]
@throw_notfound
def delete(self, path):
- self.bucket.delete_key("%s/%s" % (self.prefix, path))
+ self.bucket.delete_key(self._fullpath(path))
def stat(self, path):
- path = "%s/%s" % (self.prefix, path)
+ path = self._fullpath(path)
if path in self.scan_cache:
k = self.scan_cache[path]
else:
raise cumulus.store.NotFoundError
return {'size': int(k.size)}
-
-Store = S3Store