From 65edb87a0b8a578b4a7221478a851d71df21fab0 Mon Sep 17 00:00:00 2001 From: Ralf Schlatterbeck Date: Thu, 10 Sep 2009 14:42:20 -0700 Subject: [PATCH] Implement FTP backend and other code cleanups. Details: - Fix a race condition in setup of RemoteStore: The backup_script was set *after* a thread was already started. Sometimes this lead to a segfault because the thread tested the backup script twice. During the first test the variable wasn't yet defined while in the second test it was -- this lead to a segfault due to an uninitialized output file. I've move the backup_script to the constructor as an optional parameter. cumulus-util: - put all command-documentation into docstrings of respective function - add the docstrings to the usage text, so that we know which commands exist - Fix hard-coded extension '.tar.gpg' for cmd_list_snapshot_sizes - framework for automatically computing the right method to call for a given command python/cumulus/store/file.py: - fix constructor so that we can directly call it when it's not called via the factory with an URL as parameter - for NotFoundError give type and filename python/cumulus/store/ftp.py: - new FTP backend --- README | 2 + cumulus-util | 124 ++++++++++++++++--------------- python/cumulus/store/__init__.py | 49 +++++++----- python/cumulus/store/file.py | 12 ++- python/cumulus/store/ftp.py | 94 +++++++++++++++++++++++ python/cumulus/store/s3.py | 8 +- remote.cc | 3 +- remote.h | 4 +- scandir.cc | 3 +- 9 files changed, 208 insertions(+), 91 deletions(-) create mode 100644 python/cumulus/store/ftp.py diff --git a/README b/README index 9871798..82fed7d 100644 --- a/README +++ b/README @@ -6,6 +6,8 @@ How to Build Dependencies: - libuuid (sometimes part of e2fsprogs) - sqlite3 + - boto, the python interface to Amazon's Web Services (for S3 storage) + http://code.google.com/p/boto Building should be a simple matter of running "make". This will produce an executable called "cumulus". diff --git a/cumulus-util b/cumulus-util index 1bac12b..c905308 100755 --- a/cumulus-util +++ b/cumulus-util @@ -24,19 +24,6 @@ def check_version(format): if ver > FORMAT_VERSION: raise RuntimeError("Unsupported LBS format: " + format) -parser = OptionParser(usage="%prog [option]... command [arg]...") -parser.add_option("-v", action="store_true", dest="verbose", default=False, - help="increase verbosity") -parser.add_option("-n", action="store_true", dest="dry_run", default=False, - help="dry run") -parser.add_option("--store", dest="store", - help="specify path to backup data store") -parser.add_option("--localdb", dest="localdb", - help="specify path to local database") -parser.add_option("--intent", dest="intent", default=1.0, - help="give expected next snapshot type when cleaning") -(options, args) = parser.parse_args(sys.argv[1:]) - # Read a passphrase from the user and store it in the LBS_GPG_PASSPHRASE # environment variable. def get_passphrase(): @@ -44,19 +31,21 @@ def get_passphrase(): if not os.environ.has_key(ENV_KEY): os.environ[ENV_KEY] = getpass.getpass() -# Delete old snapshots from the local database, though do not actually schedule -# any segment cleaning. -# Syntax: $0 --localdb=LOCALDB prune-db -def cmd_prune_db(): +def cmd_prune_db(args): + """ Delete old snapshots from the local database, though do not + actually schedule any segment cleaning. + Syntax: $0 --localdb=LOCALDB prune-db + """ db = lbs.LocalDatabase(options.localdb) # Delete old snapshots from the local database. #db.garbage_collect() #db.commit() -# Run the segment cleaner. -# Syntax: $0 --localdb=LOCALDB clean -def cmd_clean(clean_threshold=7.0): +def cmd_clean(args, clean_threshold=7.0): + """ Run the segment cleaner. + Syntax: $0 --localdb=LOCALDB clean + """ db = lbs.LocalDatabase(options.localdb) # Delete old snapshots from the local database. @@ -75,20 +64,25 @@ def cmd_clean(clean_threshold=7.0): db.balance_expired_objects() db.commit() -# List snapshots stored. -# Syntax: $0 --data=DATADIR list-snapshots -def cmd_list_snapshots(): +def cmd_list_snapshots(args): + """ List snapshots stored. + Syntax: $0 --data=DATADIR list-snapshots + """ store = lbs.LowlevelDataStore(options.store) for s in sorted(store.list_snapshots()): print s -# List size of data needed for each snapshot. -# Syntax: $0 --data=DATADIR list-snapshot-sizes -def cmd_list_snapshot_sizes(): +def cmd_list_snapshot_sizes(args): + """ List size of data needed for each snapshot. + Syntax: $0 --data=DATADIR list-snapshot-sizes + """ lowlevel = lbs.LowlevelDataStore(options.store) lowlevel.scan() store = lbs.ObjectStore(lowlevel) previous = set() + exts = {} + for seg in lowlevel.store.list('segments'): + exts.update ([seg.split ('.', 1)]) for s in sorted(lowlevel.list_snapshots()): d = lbs.parse_full(store.load_snapshot(s)) check_version(d['Format']) @@ -100,23 +94,25 @@ def cmd_list_snapshot_sizes(): segments = d['Segments'].split() (size, added, removed, addcount, remcount) = (0, 0, 0, 0, 0) + lo_stat = lowlevel.lowlevel_stat for seg in segments: - segsize = lowlevel.lowlevel_stat(seg + ".tar.gpg")['size'] + segsize = lo_stat('.'.join ((seg, exts[seg])))['size'] size += segsize if seg not in previous: added += segsize addcount += 1 for seg in previous: if seg not in segments: - removed += lowlevel.lowlevel_stat(seg + ".tar.gpg")['size'] + removed += lo_stat('.'.join((seg, exts[seg])))['size'] remcount += 1 previous = set(segments) print "%s [%s]: %.3f +%.3f -%.3f (+%d/-%d segments)" % (s, intent, size / 1024.0**2, added / 1024.0**2, removed / 1024.0**2, addcount, remcount) -# Search for any files which are not needed by any current snapshots and offer -# to delete them. -# Syntax: $0 --store=DATADIR gc -def cmd_garbage_collect(): +def cmd_garbage_collect(args): + """ Search for any files which are not needed by any current + snapshots and offer to delete them. + Syntax: $0 --store=DATADIR gc + """ lowlevel = lbs.LowlevelDataStore(options.store) lowlevel.scan() store = lbs.ObjectStore(lowlevel) @@ -139,9 +135,12 @@ def cmd_garbage_collect(): lowlevel.store.delete(t, f) print "Reclaimed space:", reclaimed -# Build checksum list for objects in the given segments, or all segments if -# none are specified. +cmd_gc = cmd_garbage_collect + def cmd_object_checksums(segments): + """ Build checksum list for objects in the given segments, or all + segments if none are specified. + """ get_passphrase() lowlevel = lbs.LowlevelDataStore(options.store) store = lbs.ObjectStore(lowlevel) @@ -152,9 +151,11 @@ def cmd_object_checksums(segments): csum = lbs.ChecksumCreator().update(data).compute() print "%s/%s:%d:%s" % (s, o, len(data), csum) store.cleanup() +object_sums = cmd_object_checksums -# Read a snapshot file def cmd_read_snapshots(snapshots): + """ Read a snapshot file + """ get_passphrase() lowlevel = lbs.LowlevelDataStore(options.store) store = lbs.ObjectStore(lowlevel) @@ -165,8 +166,10 @@ def cmd_read_snapshots(snapshots): print d['Segments'].split() store.cleanup() -# Produce a flattened metadata dump from a snapshot -def cmd_read_metadata(snapshot): +def cmd_read_metadata(args): + """ Produce a flattened metadata dump from a snapshot + """ + snapshot = args [0] get_passphrase() lowlevel = lbs.LowlevelDataStore(options.store) store = lbs.ObjectStore(lowlevel) @@ -183,8 +186,9 @@ def cmd_read_metadata(snapshot): sys.stdout.write(l) store.cleanup() -# Verify snapshot integrity def cmd_verify_snapshots(snapshots): + """ Verify snapshot integrity + """ get_passphrase() lowlevel = lbs.LowlevelDataStore(options.store) store = lbs.ObjectStore(lowlevel) @@ -221,8 +225,9 @@ def cmd_verify_snapshots(snapshots): print sorted(list(listed_segments - lbs.accessed_segments)) store.cleanup() -# Restore a snapshot, or some subset of files from it def cmd_restore_snapshot(args): + """ Restore a snapshot, or some subset of files from it + """ get_passphrase() lowlevel = lbs.LowlevelDataStore(options.store) store = lbs.ObjectStore(lowlevel) @@ -373,31 +378,32 @@ def cmd_restore_snapshot(args): store.cleanup() +usage = ["%prog [option]... command [arg]...", "", "Commands:"] +cmd = method = None +for cmd, method in locals().iteritems(): + if cmd.startswith ('cmd_'): + usage.append(cmd[4:].replace('_', '-') + ':' + method.__doc__) +parser = OptionParser(usage="\n".join(usage)) +parser.add_option("-v", action="store_true", dest="verbose", default=False, + help="increase verbosity") +parser.add_option("-n", action="store_true", dest="dry_run", default=False, + help="dry run") +parser.add_option("--store", dest="store", + help="specify path to backup data store") +parser.add_option("--localdb", dest="localdb", + help="specify path to local database") +parser.add_option("--intent", dest="intent", default=1.0, + help="give expected next snapshot type when cleaning") +(options, args) = parser.parse_args(sys.argv[1:]) + if len(args) == 0: parser.print_usage() sys.exit(1) cmd = args[0] args = args[1:] -if cmd == 'clean': - cmd_clean() -elif cmd == 'prune-db': - cmd_prune_db() -elif cmd == 'list-snapshots': - cmd_list_snapshots() -elif cmd == 'object-sums': - cmd_object_checksums(args) -elif cmd == 'read-snapshots': - cmd_read_snapshots(args) -elif cmd == 'read-metadata': - cmd_read_metadata(args[0]) -elif cmd == 'list-snapshot-sizes': - cmd_list_snapshot_sizes() -elif cmd == 'gc': - cmd_garbage_collect() -elif cmd == 'verify-snapshots': - cmd_verify_snapshots(args) -elif cmd == 'restore-snapshot': - cmd_restore_snapshot(args) +method = locals().get('cmd_' + cmd.replace('-', '_')) +if method: + method (args) else: print "Unknown command:", cmd parser.print_usage() diff --git a/python/cumulus/store/__init__.py b/python/cumulus/store/__init__.py index 35de0ee..f32ff9f 100644 --- a/python/cumulus/store/__init__.py +++ b/python/cumulus/store/__init__.py @@ -11,23 +11,46 @@ class NotFoundError(exceptions.KeyError): pass -class Store: +class Store (object): """Base class for all cumulus storage backends.""" + def __new__ (cls, url, **kw): + """ Return the correct sub-class depending on url, + pass parsed url parameters to object + """ + if cls != Store: + return super(Store, cls).__new__(cls, url, **kw) + (scheme, netloc, path, params, query, fragment) \ + = urlparse.urlparse(url) + + try: + cumulus = __import__('cumulus.store.%s' % scheme, globals()) + subcls = getattr (cumulus.store, scheme).Store + obj = super(Store, cls).__new__(subcls, url, **kw) + obj.scheme = scheme + obj.netloc = netloc + obj.path = path + obj.params = params + obj.query = query + obj.fragment = fragment + return obj + except ImportError: + raise NotImplementedError, "Scheme %s not implemented" % scheme + def list(self, type): - raise NotImplementedException + raise NotImplementedError def get(self, type, name): - raise NotImplementedException + raise NotImplementedError def put(self, type, name, fp): - raise NotImplementedException + raise NotImplementedError def delete(self, type, name): - raise NotImplementedException + raise NotImplementedError def stat(self, type, name): - raise NotImplementedException + raise NotImplementedError def scan(self): """Cache file information stored in this backend. @@ -38,16 +61,4 @@ class Store: pass def open(url): - (scheme, netloc, path, params, query, fragment) \ - = urlparse.urlparse(url) - - if scheme == "file": - import cumulus.store.file - return cumulus.store.file.FileStore(path) - elif scheme == "s3": - import cumulus.store.s3 - while path.startswith("/"): path = path[1:] - (bucket, path) = path.split("/", 1) - return cumulus.store.s3.S3Store(bucket, path) - else: - raise NotImplementedException + return Store(url) diff --git a/python/cumulus/store/file.py b/python/cumulus/store/file.py index 6b16fb6..f343500 100644 --- a/python/cumulus/store/file.py +++ b/python/cumulus/store/file.py @@ -5,9 +5,11 @@ import cumulus.store type_patterns = cumulus.store.type_patterns class FileStore(cumulus.store.Store): - def __init__(self, prefix): - while prefix.endswith("/") and prefix != "/": prefix = prefix[:-1] - self.prefix = prefix + def __init__(self, url, **kw): + # if constructor isn't called via factory interpret url as filename + if not hasattr (self, 'path'): + self.path = url + self.prefix = self.path.rstrip("/") def _get_path(self, type, name): return "%s/%s" % (self.prefix, name) @@ -37,4 +39,6 @@ class FileStore(cumulus.store.Store): stat = os.stat(self._get_path(type, name)) return {'size': stat.st_size} except OSError: - raise cumulus.store.NotFoundError + raise cumulus.store.NotFoundError, (type, name) + +Store = FileStore diff --git a/python/cumulus/store/ftp.py b/python/cumulus/store/ftp.py new file mode 100644 index 0000000..52a7aad --- /dev/null +++ b/python/cumulus/store/ftp.py @@ -0,0 +1,94 @@ + +from ftplib import FTP, all_errors +from netrc import netrc, NetrcParseError +from cumulus.store import Store, type_patterns, NotFoundError + +class FtpStore (Store): + def __init__ (self, url, **kw): + self.synced = True + try: + upw, hp = self.netloc.split ('@') + except ValueError: + hp = self.netloc + upw = 'anonymous' + try: + host, port = hp.split (':') + port = int (port, 10) + except ValueError: + host = hp + port = 21 + try: + user, passwd = upw.split (':') + except ValueError: + user = upw + passwd = None + try: + n = netrc () + try: + user, acct, passwd = n.authenticators (host) + except ValueError: + pass + except (IOError, NetrcParseError): + pass + self.ftp = FTP () + self.ftp.connect (host, port) + self.ftp.login (user, passwd) + self.prefix = self.path [1:] # skip *only* first '/' + self.ftp.cwd (self.prefix) + + def _get_path (self, type, name): + # we are in right directory + return name + + def list (self, type): + self.sync () + files = self.ftp.nlst () + return (f for f in files if type_patterns[type].match (f)) + + def get (self, type, name): + self.sync () + sock = self.ftp.transfercmd ('RETR %s' % self._get_path (type, name)) + self.synced = False + return sock.makefile () + + def put (self, type, name, fp): + self.sync () + self.ftp.storbinary ("STOR %s" % self._get_path (type, name), fp) + + def delete (self, type, name): + self.sync () + self.ftp.delete (self._get_path (type, name)) + + def stat (self, type, name): + """ Note that the size-command is non-standard but supported by + most ftp servers today. If size returns an error condition we + try nlst to detect if the file exists and return an bogus length + """ + self.sync () + fn = self._get_path (type, name) + size = None + try: + # my client doesn't accept size in ascii-mode + self.ftp.sendcmd ('TYPE I') + size = self.ftp.size (fn) + self.ftp.sendcmd ('TYPE A') + except all_errors, err: + print err + pass + if size is not None: + return {'size': size} + print "nlst: %s" % fn, size + l = self.ftp.nlst (fn) + if l: + return {'size': 42} + raise NotFoundError, (type, name) + + def sync (self): + """ After a get command at end of transfer a 2XX reply is still + in the input-queue, we have to get rid of that + """ + if not self.synced: + self.ftp.voidresp() + self.synced = True + +Store = FtpStore diff --git a/python/cumulus/store/s3.py b/python/cumulus/store/s3.py index 63efa17..65884ea 100644 --- a/python/cumulus/store/s3.py +++ b/python/cumulus/store/s3.py @@ -6,11 +6,11 @@ from boto.s3.key import Key import cumulus.store class S3Store(cumulus.store.Store): - def __init__(self, bucket, prefix): + def __init__(self, url, **kw): + (bucket, prefix) = self.path.lstrip("/").split("/", 1) self.conn = boto.connect_s3(is_secure=False) self.bucket = self.conn.create_bucket(bucket) - while prefix.endswith("/"): prefix = prefix[:-1] - self.prefix = prefix + self.prefix = prefix.rstrip ("/") self.scan_cache = {} def _get_key(self, type, name): @@ -54,3 +54,5 @@ class S3Store(cumulus.store.Store): raise cumulus.store.NotFoundError return {'size': int(k.size)} + +Store = S3Store diff --git a/remote.cc b/remote.cc index 6dd4900..5a20c23 100644 --- a/remote.cc +++ b/remote.cc @@ -45,9 +45,10 @@ using std::string; -RemoteStore::RemoteStore(const string &stagedir) +RemoteStore::RemoteStore(const string &stagedir, const string &script) { staging_dir = stagedir; + backup_script = script; /* A background thread is created for each RemoteStore to manage the actual * transfers to a remote server. The main program thread can enqueue diff --git a/remote.h b/remote.h index a2801da..2011654 100644 --- a/remote.h +++ b/remote.h @@ -39,10 +39,8 @@ class RemoteStore { public: static const size_t MAX_QUEUE_SIZE = 4; - RemoteStore(const std::string &stagedir); + RemoteStore(const std::string &stagedir, const std::string &script = ""); ~RemoteStore(); - void set_script(const std::string &script) - { backup_script = script; } RemoteFile *alloc_file(const std::string &name, const std::string &type); void enqueue(RemoteFile *file); void sync(); diff --git a/scandir.cc b/scandir.cc index f902fa0..8af3175 100644 --- a/scandir.cc +++ b/scandir.cc @@ -804,8 +804,7 @@ int main(int argc, char *argv[]) tmp_dir.c_str()); return 1; } - remote = new RemoteStore(tmp_dir); - remote->set_script(backup_script); + remote = new RemoteStore(tmp_dir, backup_script=backup_script); } else { remote = new RemoteStore(backup_dest); } -- 2.20.1