Implement FTP backend and other code cleanups.
author     Ralf Schlatterbeck <rsc@runtux.com>
           Thu, 10 Sep 2009 21:42:20 +0000 (14:42 -0700)
committer  Michael Vrable <mvrable@turin.ucsd.edu>
           Thu, 10 Sep 2009 21:42:20 +0000 (14:42 -0700)
Details:

- Fix a race condition in the setup of RemoteStore: the backup_script was
  set *after* a thread had already been started. Sometimes this led to a
  segfault, because the thread tested the backup script twice: during the
  first test the variable was not yet defined, while in the second it
  was -- leaving an output file uninitialized and crashing the program.
  I've moved the backup_script to the constructor as an optional
  parameter.
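  The C++ fix itself is in remote.cc/remote.h (diff below); the following
  is a minimal Python sketch of the same pattern, with invented names,
  showing why initializing the field in the constructor closes the race:

      import threading

      class RemoteStore(object):
          def __init__(self, stagedir, script=""):
              # backup_script is fully initialized *before* the worker
              # starts, so the thread can never observe a half-constructed
              # object.
              self.staging_dir = stagedir
              self.backup_script = script
              self.worker = threading.Thread(target=self._transfer_loop)
              self.worker.start()

          def _transfer_loop(self):
              # The old code injected the script via set_script() *after*
              # start(); this test could then see the attribute unset on
              # one pass and set on the next.
              if self.backup_script:
                  pass  # launch the upload script here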
cumulus-util:
- put all command documentation into docstrings of the respective functions
- add the docstrings to the usage text, so that it lists which commands
  exist
- fix the hard-coded extension '.tar.gpg' in cmd_list_snapshot_sizes
- framework for automatically computing the right method to call for a
  given command (see the sketch below)
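  A condensed, standalone sketch of that dispatch framework (the two
  dummy commands are placeholders; the real script walks its module
  namespace the same way):

      def cmd_clean(args):
          """ Run the segment cleaner. """

      def cmd_list_snapshots(args):
          """ List snapshots stored. """

      # Build the usage text from the cmd_* docstrings ...
      usage = ["%prog [option]... command [arg]...", "", "Commands:"]
      for name, func in sorted(globals().items()):
          if name.startswith('cmd_'):
              usage.append(name[4:].replace('_', '-') + ':' + func.__doc__)

      # ... and resolve a command to its handler without an if/elif chain.
      def dispatch(cmd, args):
          func = globals().get('cmd_' + cmd.replace('-', '_'))
          if func is None:
              raise SystemExit("Unknown command: %s" % cmd)
          func(args)

      dispatch('list-snapshots', [])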
python/cumulus/store/file.py:
- fix the constructor so that it can also be called directly, not only
  via the factory with a URL as parameter (see the example below)
- for NotFoundError, give type and filename
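  For example, both entry points should then work (a sketch; the path is
  invented):

      import cumulus.store
      import cumulus.store.file

      s1 = cumulus.store.open('file:///var/backups/cumulus')      # factory
      s2 = cumulus.store.file.FileStore('/var/backups/cumulus')   # direct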
python/cumulus/store/ftp.py:
- new FTP backend (usage sketch below)
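  A usage sketch (host, credentials and path are invented): the URL
  scheme selects the backend through the reworked Store factory, and
  credentials missing from the URL are looked up in ~/.netrc:

      import cumulus.store

      store = cumulus.store.open('ftp://user:secret@ftp.example.com/backups')
      for name in store.list('segments'):
          print name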

README
cumulus-util
python/cumulus/store/__init__.py
python/cumulus/store/file.py
python/cumulus/store/ftp.py [new file with mode: 0644]
python/cumulus/store/s3.py
remote.cc
remote.h
scandir.cc

diff --git a/README b/README
index 9871798..82fed7d 100644 (file)
--- a/README
+++ b/README
@@ -6,6 +6,8 @@ How to Build
 Dependencies:
   - libuuid (sometimes part of e2fsprogs)
   - sqlite3
+  - boto, the python interface to Amazon's Web Services (for S3 storage)
+    http://code.google.com/p/boto
 
 Building should be a simple matter of running "make".  This will produce
 an executable called "cumulus".
diff --git a/cumulus-util b/cumulus-util
index 1bac12b..c905308 100755 (executable)
--- a/cumulus-util
+++ b/cumulus-util
@@ -24,19 +24,6 @@ def check_version(format):
     if ver > FORMAT_VERSION:
         raise RuntimeError("Unsupported LBS format: " + format)
 
-parser = OptionParser(usage="%prog [option]... command [arg]...")
-parser.add_option("-v", action="store_true", dest="verbose", default=False,
-                  help="increase verbosity")
-parser.add_option("-n", action="store_true", dest="dry_run", default=False,
-                  help="dry run")
-parser.add_option("--store", dest="store",
-                  help="specify path to backup data store")
-parser.add_option("--localdb", dest="localdb",
-                  help="specify path to local database")
-parser.add_option("--intent", dest="intent", default=1.0,
-                  help="give expected next snapshot type when cleaning")
-(options, args) = parser.parse_args(sys.argv[1:])
-
 # Read a passphrase from the user and store it in the LBS_GPG_PASSPHRASE
 # environment variable.
 def get_passphrase():
@@ -44,19 +31,21 @@ def get_passphrase():
     if not os.environ.has_key(ENV_KEY):
         os.environ[ENV_KEY] = getpass.getpass()
 
-# Delete old snapshots from the local database, though do not actually schedule
-# any segment cleaning.
-# Syntax: $0 --localdb=LOCALDB prune-db
-def cmd_prune_db():
+def cmd_prune_db(args):
+    """ Delete old snapshots from the local database, though do not
+        actually schedule any segment cleaning.
+        Syntax: $0 --localdb=LOCALDB prune-db
+    """
     db = lbs.LocalDatabase(options.localdb)
 
     # Delete old snapshots from the local database.
     #db.garbage_collect()
     #db.commit()
 
-# Run the segment cleaner.
-# Syntax: $0 --localdb=LOCALDB clean
-def cmd_clean(clean_threshold=7.0):
+def cmd_clean(args, clean_threshold=7.0):
+    """ Run the segment cleaner.
+        Syntax: $0 --localdb=LOCALDB clean
+    """
     db = lbs.LocalDatabase(options.localdb)
 
     # Delete old snapshots from the local database.
@@ -75,20 +64,25 @@ def cmd_clean(clean_threshold=7.0):
     db.balance_expired_objects()
     db.commit()
 
-# List snapshots stored.
-# Syntax: $0 --data=DATADIR list-snapshots
-def cmd_list_snapshots():
+def cmd_list_snapshots(args):
+    """ List snapshots stored.
+        Syntax: $0 --data=DATADIR list-snapshots
+    """
     store = lbs.LowlevelDataStore(options.store)
     for s in sorted(store.list_snapshots()):
         print s
 
-# List size of data needed for each snapshot.
-# Syntax: $0 --data=DATADIR list-snapshot-sizes
-def cmd_list_snapshot_sizes():
+def cmd_list_snapshot_sizes(args):
+    """ List size of data needed for each snapshot.
+        Syntax: $0 --data=DATADIR list-snapshot-sizes
+    """
     lowlevel = lbs.LowlevelDataStore(options.store)
     lowlevel.scan()
     store = lbs.ObjectStore(lowlevel)
     previous = set()
+    exts = {}
+    for seg in lowlevel.store.list('segments'):
+        exts.update ([seg.split ('.', 1)])
     for s in sorted(lowlevel.list_snapshots()):
         d = lbs.parse_full(store.load_snapshot(s))
         check_version(d['Format'])
@@ -100,23 +94,25 @@ def cmd_list_snapshot_sizes():
 
         segments = d['Segments'].split()
         (size, added, removed, addcount, remcount) = (0, 0, 0, 0, 0)
+        lo_stat = lowlevel.lowlevel_stat
         for seg in segments:
-            segsize = lowlevel.lowlevel_stat(seg + ".tar.gpg")['size']
+            segsize = lo_stat('.'.join ((seg, exts[seg])))['size']
             size += segsize
             if seg not in previous:
                 added += segsize
                 addcount += 1
         for seg in previous:
             if seg not in segments:
-                removed += lowlevel.lowlevel_stat(seg + ".tar.gpg")['size']
+                removed += lo_stat('.'.join((seg, exts[seg])))['size']
                 remcount += 1
         previous = set(segments)
         print "%s [%s]: %.3f +%.3f -%.3f (+%d/-%d segments)" % (s, intent, size / 1024.0**2, added / 1024.0**2, removed / 1024.0**2, addcount, remcount)
 
-# Search for any files which are not needed by any current snapshots and offer
-# to delete them.
-# Syntax: $0 --store=DATADIR gc
-def cmd_garbage_collect():
+def cmd_garbage_collect(args):
+    """ Search for any files which are not needed by any current
+        snapshots and offer to delete them.
+        Syntax: $0 --store=DATADIR gc
+    """
     lowlevel = lbs.LowlevelDataStore(options.store)
     lowlevel.scan()
     store = lbs.ObjectStore(lowlevel)
@@ -139,9 +135,12 @@ def cmd_garbage_collect():
                     lowlevel.store.delete(t, f)
     print "Reclaimed space:", reclaimed
 
-# Build checksum list for objects in the given segments, or all segments if
-# none are specified.
+cmd_gc = cmd_garbage_collect
+
 def cmd_object_checksums(segments):
+    """ Build checksum list for objects in the given segments, or all
+        segments if none are specified.
+    """
     get_passphrase()
     lowlevel = lbs.LowlevelDataStore(options.store)
     store = lbs.ObjectStore(lowlevel)
@@ -152,9 +151,11 @@ def cmd_object_checksums(segments):
             csum = lbs.ChecksumCreator().update(data).compute()
             print "%s/%s:%d:%s" % (s, o, len(data), csum)
     store.cleanup()
+object_sums = cmd_object_checksums
 
-# Read a snapshot file
 def cmd_read_snapshots(snapshots):
+    """ Read a snapshot file
+    """
     get_passphrase()
     lowlevel = lbs.LowlevelDataStore(options.store)
     store = lbs.ObjectStore(lowlevel)
@@ -165,8 +166,10 @@ def cmd_read_snapshots(snapshots):
         print d['Segments'].split()
     store.cleanup()
 
-# Produce a flattened metadata dump from a snapshot
-def cmd_read_metadata(snapshot):
+def cmd_read_metadata(args):
+    """ Produce a flattened metadata dump from a snapshot
+    """
+    snapshot = args [0]
     get_passphrase()
     lowlevel = lbs.LowlevelDataStore(options.store)
     store = lbs.ObjectStore(lowlevel)
@@ -183,8 +186,9 @@ def cmd_read_metadata(snapshot):
         sys.stdout.write(l)
     store.cleanup()
 
-# Verify snapshot integrity
 def cmd_verify_snapshots(snapshots):
+    """ Verify snapshot integrity
+    """
     get_passphrase()
     lowlevel = lbs.LowlevelDataStore(options.store)
     store = lbs.ObjectStore(lowlevel)
@@ -221,8 +225,9 @@ def cmd_verify_snapshots(snapshots):
             print sorted(list(listed_segments - lbs.accessed_segments))
     store.cleanup()
 
-# Restore a snapshot, or some subset of files from it
 def cmd_restore_snapshot(args):
+    """ Restore a snapshot, or some subset of files from it
+    """
     get_passphrase()
     lowlevel = lbs.LowlevelDataStore(options.store)
     store = lbs.ObjectStore(lowlevel)
@@ -373,31 +378,32 @@ def cmd_restore_snapshot(args):
 
     store.cleanup()
 
+usage = ["%prog [option]... command [arg]...", "", "Commands:"]
+cmd = method = None
+for cmd, method in locals().iteritems():
+    if cmd.startswith ('cmd_'):
+        usage.append(cmd[4:].replace('_', '-') + ':' + method.__doc__)
+parser = OptionParser(usage="\n".join(usage))
+parser.add_option("-v", action="store_true", dest="verbose", default=False,
+                  help="increase verbosity")
+parser.add_option("-n", action="store_true", dest="dry_run", default=False,
+                  help="dry run")
+parser.add_option("--store", dest="store",
+                  help="specify path to backup data store")
+parser.add_option("--localdb", dest="localdb",
+                  help="specify path to local database")
+parser.add_option("--intent", dest="intent", default=1.0,
+                  help="give expected next snapshot type when cleaning")
+(options, args) = parser.parse_args(sys.argv[1:])
+
 if len(args) == 0:
     parser.print_usage()
     sys.exit(1)
 cmd = args[0]
 args = args[1:]
-if cmd == 'clean':
-    cmd_clean()
-elif cmd == 'prune-db':
-    cmd_prune_db()
-elif cmd == 'list-snapshots':
-    cmd_list_snapshots()
-elif cmd == 'object-sums':
-    cmd_object_checksums(args)
-elif cmd == 'read-snapshots':
-    cmd_read_snapshots(args)
-elif cmd == 'read-metadata':
-    cmd_read_metadata(args[0])
-elif cmd == 'list-snapshot-sizes':
-    cmd_list_snapshot_sizes()
-elif cmd == 'gc':
-    cmd_garbage_collect()
-elif cmd == 'verify-snapshots':
-    cmd_verify_snapshots(args)
-elif cmd == 'restore-snapshot':
-    cmd_restore_snapshot(args)
+method = locals().get('cmd_' + cmd.replace('-', '_'))
+if method:
+    method (args)
 else:
     print "Unknown command:", cmd
     parser.print_usage()
diff --git a/python/cumulus/store/__init__.py b/python/cumulus/store/__init__.py
index 35de0ee..f32ff9f 100644 (file)
--- a/python/cumulus/store/__init__.py
+++ b/python/cumulus/store/__init__.py
@@ -11,23 +11,46 @@ class NotFoundError(exceptions.KeyError):
 
     pass
 
-class Store:
+class Store (object):
     """Base class for all cumulus storage backends."""
 
+    def __new__ (cls, url, **kw):
+        """ Return the correct sub-class depending on the url scheme
+        and attach the parsed url components to the object
+        """
+        if cls != Store:
+            return super(Store, cls).__new__(cls, url, **kw)
+        (scheme, netloc, path, params, query, fragment) \
+            = urlparse.urlparse(url)
+
+        try:
+            cumulus = __import__('cumulus.store.%s' % scheme, globals())
+            subcls = getattr (cumulus.store, scheme).Store
+            obj = super(Store, cls).__new__(subcls, url, **kw)
+            obj.scheme = scheme
+            obj.netloc = netloc
+            obj.path = path
+            obj.params = params
+            obj.query = query
+            obj.fragment = fragment
+            return obj
+        except ImportError:
+            raise NotImplementedError, "Scheme %s not implemented" % scheme
+
     def list(self, type):
-        raise NotImplementedException
+        raise NotImplementedError
 
     def get(self, type, name):
-        raise NotImplementedException
+        raise NotImplementedError
 
     def put(self, type, name, fp):
-        raise NotImplementedException
+        raise NotImplementedError
 
     def delete(self, type, name):
-        raise NotImplementedException
+        raise NotImplementedError
 
     def stat(self, type, name):
-        raise NotImplementedException
+        raise NotImplementedError
 
     def scan(self):
         """Cache file information stored in this backend.
@@ -38,16 +61,4 @@ class Store:
         pass
 
 def open(url):
-    (scheme, netloc, path, params, query, fragment) \
-        = urlparse.urlparse(url)
-
-    if scheme == "file":
-        import cumulus.store.file
-        return cumulus.store.file.FileStore(path)
-    elif scheme == "s3":
-        import cumulus.store.s3
-        while path.startswith("/"): path = path[1:]
-        (bucket, path) = path.split("/", 1)
-        return cumulus.store.s3.S3Store(bucket, path)
-    else:
-        raise NotImplementedException
+    return Store(url)
diff --git a/python/cumulus/store/file.py b/python/cumulus/store/file.py
index 6b16fb6..f343500 100644 (file)
--- a/python/cumulus/store/file.py
+++ b/python/cumulus/store/file.py
@@ -5,9 +5,11 @@ import cumulus.store
 type_patterns = cumulus.store.type_patterns
 
 class FileStore(cumulus.store.Store):
-    def __init__(self, prefix):
-        while prefix.endswith("/") and prefix != "/": prefix = prefix[:-1]
-        self.prefix = prefix
+    def __init__(self, url, **kw):
+        # if the constructor isn't called via the factory, treat url as a filename
+        if not hasattr (self, 'path'):
+            self.path = url
+        self.prefix = self.path.rstrip("/")
 
     def _get_path(self, type, name):
         return "%s/%s" % (self.prefix, name)
@@ -37,4 +39,6 @@ class FileStore(cumulus.store.Store):
             stat = os.stat(self._get_path(type, name))
             return {'size': stat.st_size}
         except OSError:
-            raise cumulus.store.NotFoundError
+            raise cumulus.store.NotFoundError, (type, name)
+
+Store = FileStore
diff --git a/python/cumulus/store/ftp.py b/python/cumulus/store/ftp.py
new file mode 100644 (file)
index 0000000..52a7aad
--- /dev/null
+++ b/python/cumulus/store/ftp.py
@@ -0,0 +1,94 @@
+
+from ftplib        import FTP, all_errors
+from netrc         import netrc, NetrcParseError
+from cumulus.store import Store, type_patterns, NotFoundError
+
+class FtpStore (Store):
+    def __init__ (self, url, **kw):
+        self.synced = True
+        try:
+            upw, hp = self.netloc.split ('@')
+        except ValueError:
+            hp = self.netloc
+            upw = 'anonymous'
+        try:
+            host, port = hp.split (':')
+            port = int (port, 10)
+        except ValueError:
+            host = hp
+            port = 21
+        try:
+            user, passwd = upw.split (':')
+        except ValueError:
+            user = upw
+            passwd = None
+            try:
+                n = netrc ()
+                try:
+                    user, acct, passwd = n.authenticators (host)
+                except ValueError:
+                    pass
+            except (IOError, NetrcParseError):
+                pass
+        self.ftp = FTP ()
+        self.ftp.connect (host, port)
+        self.ftp.login (user, passwd)
+        self.prefix = self.path [1:] # skip *only* first '/'
+        self.ftp.cwd (self.prefix)
+
+    def _get_path (self, type, name):
+        # we are already in the right directory
+        return name
+
+    def list (self, type):
+        self.sync ()
+        files = self.ftp.nlst ()
+        return (f for f in files if type_patterns[type].match (f))
+
+    def get (self, type, name):
+        self.sync ()
+        sock = self.ftp.transfercmd ('RETR %s' % self._get_path (type, name))
+        self.synced = False
+        return sock.makefile ()
+
+    def put (self, type, name, fp):
+        self.sync ()
+        self.ftp.storbinary ("STOR %s" % self._get_path (type, name), fp)
+
+    def delete (self, type, name):
+        self.sync ()
+        self.ftp.delete (self._get_path (type, name))
+
+    def stat (self, type, name):
+        """ Note that the SIZE command is non-standard but supported by
+        most FTP servers today. If SIZE returns an error condition we
+        try NLST to detect whether the file exists and return a bogus length.
+        """
+        self.sync ()
+        fn = self._get_path (type, name)
+        size = None
+        try:
+            # my client doesn't accept size in ascii-mode
+            self.ftp.sendcmd ('TYPE I')
+            size = self.ftp.size (fn)
+            self.ftp.sendcmd ('TYPE A')
+        except all_errors, err:
+            print err
+            pass
+        if size is not None:
+            return {'size': size}
+        print "nlst: %s" % fn, size
+        l = self.ftp.nlst (fn)
+        if l:
+            return {'size': 42}
+        raise NotFoundError, (type, name)
+
+    def sync (self):
+        """ After a get command, a 2XX reply is still in the input
+        queue at the end of the transfer; we have to get rid of it
+        """
+        if not self.synced:
+            self.ftp.voidresp()
+        self.synced = True
+
+Store = FtpStore
diff --git a/python/cumulus/store/s3.py b/python/cumulus/store/s3.py
index 63efa17..65884ea 100644 (file)
--- a/python/cumulus/store/s3.py
+++ b/python/cumulus/store/s3.py
@@ -6,11 +6,11 @@ from boto.s3.key import Key
 import cumulus.store
 
 class S3Store(cumulus.store.Store):
-    def __init__(self, bucket, prefix):
+    def __init__(self, url, **kw):
+        (bucket, prefix) = self.path.lstrip("/").split("/", 1)
         self.conn = boto.connect_s3(is_secure=False)
         self.bucket = self.conn.create_bucket(bucket)
-        while prefix.endswith("/"): prefix = prefix[:-1]
-        self.prefix = prefix
+        self.prefix = prefix.rstrip ("/")
         self.scan_cache = {}
 
     def _get_key(self, type, name):
@@ -54,3 +54,5 @@ class S3Store(cumulus.store.Store):
             raise cumulus.store.NotFoundError
 
         return {'size': int(k.size)}
+
+Store = S3Store
diff --git a/remote.cc b/remote.cc
index 6dd4900..5a20c23 100644 (file)
--- a/remote.cc
+++ b/remote.cc
 
 using std::string;
 
-RemoteStore::RemoteStore(const string &stagedir)
+RemoteStore::RemoteStore(const string &stagedir, const string &script)
 {
     staging_dir = stagedir;
+    backup_script = script;
 
     /* A background thread is created for each RemoteStore to manage the actual
      * transfers to a remote server.  The main program thread can enqueue
diff --git a/remote.h b/remote.h
index a2801da..2011654 100644 (file)
--- a/remote.h
+++ b/remote.h
@@ -39,10 +39,8 @@ class RemoteStore {
 public:
     static const size_t MAX_QUEUE_SIZE = 4;
 
-    RemoteStore(const std::string &stagedir);
+    RemoteStore(const std::string &stagedir, const std::string &script = "");
     ~RemoteStore();
-    void set_script(const std::string &script)
-        { backup_script = script; }
     RemoteFile *alloc_file(const std::string &name, const std::string &type);
     void enqueue(RemoteFile *file);
     void sync();
diff --git a/scandir.cc b/scandir.cc
index f902fa0..8af3175 100644 (file)
--- a/scandir.cc
+++ b/scandir.cc
@@ -804,8 +804,7 @@ int main(int argc, char *argv[])
                     tmp_dir.c_str());
             return 1;
         }
-        remote = new RemoteStore(tmp_dir);
-        remote->set_script(backup_script);
+        remote = new RemoteStore(tmp_dir, backup_script=backup_script);
     } else {
         remote = new RemoteStore(backup_dest);
     }