if ver > FORMAT_VERSION:
raise RuntimeError("Unsupported LBS format: " + format)
-parser = OptionParser(usage="%prog [option]... command [arg]...")
-parser.add_option("-v", action="store_true", dest="verbose", default=False,
- help="increase verbosity")
-parser.add_option("-n", action="store_true", dest="dry_run", default=False,
- help="dry run")
-parser.add_option("--store", dest="store",
- help="specify path to backup data store")
-parser.add_option("--localdb", dest="localdb",
- help="specify path to local database")
-parser.add_option("--intent", dest="intent", default=1.0,
- help="give expected next snapshot type when cleaning")
-(options, args) = parser.parse_args(sys.argv[1:])
-
# Read a passphrase from the user and store it in the LBS_GPG_PASSPHRASE
# environment variable.
def get_passphrase():
    if ENV_KEY not in os.environ:
os.environ[ENV_KEY] = getpass.getpass()
-# Delete old snapshots from the local database, though do not actually schedule
-# any segment cleaning.
-# Syntax: $0 --localdb=LOCALDB prune-db
-def cmd_prune_db():
+def cmd_prune_db(args):
+ """ Delete old snapshots from the local database, though do not
+ actually schedule any segment cleaning.
+ Syntax: $0 --localdb=LOCALDB prune-db
+ """
db = lbs.LocalDatabase(options.localdb)
# Delete old snapshots from the local database.
#db.garbage_collect()
#db.commit()
-# Run the segment cleaner.
-# Syntax: $0 --localdb=LOCALDB clean
-def cmd_clean(clean_threshold=7.0):
+def cmd_clean(args, clean_threshold=7.0):
+ """ Run the segment cleaner.
+ Syntax: $0 --localdb=LOCALDB clean
+ """
db = lbs.LocalDatabase(options.localdb)
# Delete old snapshots from the local database.
db.balance_expired_objects()
db.commit()
-# List snapshots stored.
-# Syntax: $0 --data=DATADIR list-snapshots
-def cmd_list_snapshots():
+def cmd_list_snapshots(args):
+ """ List snapshots stored.
+ Syntax: $0 --data=DATADIR list-snapshots
+ """
store = lbs.LowlevelDataStore(options.store)
for s in sorted(store.list_snapshots()):
print s
-# List size of data needed for each snapshot.
-# Syntax: $0 --data=DATADIR list-snapshot-sizes
-def cmd_list_snapshot_sizes():
+def cmd_list_snapshot_sizes(args):
+ """ List size of data needed for each snapshot.
+ Syntax: $0 --data=DATADIR list-snapshot-sizes
+ """
lowlevel = lbs.LowlevelDataStore(options.store)
lowlevel.scan()
store = lbs.ObjectStore(lowlevel)
previous = set()
+    # map each segment's base name to the extension actually present in
+    # the store, instead of hard-coding ".tar.gpg" below
+    exts = {}
+    for seg in lowlevel.store.list('segments'):
+        exts.update ([seg.split ('.', 1)])
for s in sorted(lowlevel.list_snapshots()):
d = lbs.parse_full(store.load_snapshot(s))
check_version(d['Format'])
segments = d['Segments'].split()
(size, added, removed, addcount, remcount) = (0, 0, 0, 0, 0)
+        # local alias for the stat method used in both loops below
+        lo_stat = lowlevel.lowlevel_stat
for seg in segments:
- segsize = lowlevel.lowlevel_stat(seg + ".tar.gpg")['size']
+ segsize = lo_stat('.'.join ((seg, exts[seg])))['size']
size += segsize
if seg not in previous:
added += segsize
addcount += 1
for seg in previous:
if seg not in segments:
- removed += lowlevel.lowlevel_stat(seg + ".tar.gpg")['size']
+ removed += lo_stat('.'.join((seg, exts[seg])))['size']
remcount += 1
previous = set(segments)
print "%s [%s]: %.3f +%.3f -%.3f (+%d/-%d segments)" % (s, intent, size / 1024.0**2, added / 1024.0**2, removed / 1024.0**2, addcount, remcount)
-# Search for any files which are not needed by any current snapshots and offer
-# to delete them.
-# Syntax: $0 --store=DATADIR gc
-def cmd_garbage_collect():
+def cmd_garbage_collect(args):
+ """ Search for any files which are not needed by any current
+ snapshots and offer to delete them.
+ Syntax: $0 --store=DATADIR gc
+ """
lowlevel = lbs.LowlevelDataStore(options.store)
lowlevel.scan()
store = lbs.ObjectStore(lowlevel)
lowlevel.store.delete(t, f)
print "Reclaimed space:", reclaimed
-# Build checksum list for objects in the given segments, or all segments if
-# none are specified.
+cmd_gc = cmd_garbage_collect        # alias: dispatched as the "gc" command
+
def cmd_object_checksums(segments):
+ """ Build checksum list for objects in the given segments, or all
+ segments if none are specified.
+ """
get_passphrase()
lowlevel = lbs.LowlevelDataStore(options.store)
store = lbs.ObjectStore(lowlevel)
csum = lbs.ChecksumCreator().update(data).compute()
print "%s/%s:%d:%s" % (s, o, len(data), csum)
store.cleanup()
+cmd_object_sums = cmd_object_checksums  # alias: dispatched as "object-sums"
-# Read a snapshot file
def cmd_read_snapshots(snapshots):
+ """ Read a snapshot file
+ """
get_passphrase()
lowlevel = lbs.LowlevelDataStore(options.store)
store = lbs.ObjectStore(lowlevel)
print d['Segments'].split()
store.cleanup()
-# Produce a flattened metadata dump from a snapshot
-def cmd_read_metadata(snapshot):
+def cmd_read_metadata(args):
+ """ Produce a flattened metadata dump from a snapshot
+ """
+ snapshot = args [0]
get_passphrase()
lowlevel = lbs.LowlevelDataStore(options.store)
store = lbs.ObjectStore(lowlevel)
sys.stdout.write(l)
store.cleanup()
-# Verify snapshot integrity
def cmd_verify_snapshots(snapshots):
+ """ Verify snapshot integrity
+ """
get_passphrase()
lowlevel = lbs.LowlevelDataStore(options.store)
store = lbs.ObjectStore(lowlevel)
print sorted(list(listed_segments - lbs.accessed_segments))
store.cleanup()
-# Restore a snapshot, or some subset of files from it
def cmd_restore_snapshot(args):
+ """ Restore a snapshot, or some subset of files from it
+ """
get_passphrase()
lowlevel = lbs.LowlevelDataStore(options.store)
store = lbs.ObjectStore(lowlevel)
store.cleanup()
+usage = ["%prog [option]... command [arg]...", "", "Commands:"]
+# Pre-bind the loop variables so that iterating over locals() below does not
+# add new keys (and thus change the dictionary's size) mid-iteration.
+cmd = method = None
+for cmd, method in locals().iteritems():
+ if cmd.startswith ('cmd_'):
+ usage.append(cmd[4:].replace('_', '-') + ':' + method.__doc__)
+parser = OptionParser(usage="\n".join(usage))
+parser.add_option("-v", action="store_true", dest="verbose", default=False,
+ help="increase verbosity")
+parser.add_option("-n", action="store_true", dest="dry_run", default=False,
+ help="dry run")
+parser.add_option("--store", dest="store",
+ help="specify path to backup data store")
+parser.add_option("--localdb", dest="localdb",
+ help="specify path to local database")
+parser.add_option("--intent", dest="intent", default=1.0,
+ help="give expected next snapshot type when cleaning")
+(options, args) = parser.parse_args(sys.argv[1:])
+
if len(args) == 0:
parser.print_usage()
sys.exit(1)
cmd = args[0]
args = args[1:]
-if cmd == 'clean':
- cmd_clean()
-elif cmd == 'prune-db':
- cmd_prune_db()
-elif cmd == 'list-snapshots':
- cmd_list_snapshots()
-elif cmd == 'object-sums':
- cmd_object_checksums(args)
-elif cmd == 'read-snapshots':
- cmd_read_snapshots(args)
-elif cmd == 'read-metadata':
- cmd_read_metadata(args[0])
-elif cmd == 'list-snapshot-sizes':
- cmd_list_snapshot_sizes()
-elif cmd == 'gc':
- cmd_garbage_collect()
-elif cmd == 'verify-snapshots':
- cmd_verify_snapshots(args)
-elif cmd == 'restore-snapshot':
- cmd_restore_snapshot(args)
+method = locals().get('cmd_' + cmd.replace('-', '_'))
+if method:
+ method (args)
else:
print "Unknown command:", cmd
parser.print_usage()
    sys.exit(1)
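
The replaced if/elif chain now lives in a naming convention: every cmd_*
function is both the implementation and its dispatch-table entry, and aliases
such as cmd_gc are plain assignments. A self-contained sketch of the same
pattern (the function and command names here are invented):

    def cmd_hello(args):
        """ Print a greeting. """
        print "hello", args

    cmd_hi = cmd_hello                     # alias, found like any other cmd_*

    def dispatch(name, args):
        method = globals().get('cmd_' + name.replace('-', '_'))
        if method is None:
            raise SystemExit('Unknown command: %s' % name)
        method(args)

    dispatch('hello', ['world'])           # calls cmd_hello
    dispatch('hi', ['again'])              # calls it via the alias
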
-class Store:
+class Store (object):
"""Base class for all cumulus storage backends."""
+ def __new__ (cls, url, **kw):
+ """ Return the correct sub-class depending on url,
+ pass parsed url parameters to object
+ """
+ if cls != Store:
+ return super(Store, cls).__new__(cls, url, **kw)
+ (scheme, netloc, path, params, query, fragment) \
+ = urlparse.urlparse(url)
+
+ try:
+ cumulus = __import__('cumulus.store.%s' % scheme, globals())
+ subcls = getattr (cumulus.store, scheme).Store
+ obj = super(Store, cls).__new__(subcls, url, **kw)
+ obj.scheme = scheme
+ obj.netloc = netloc
+ obj.path = path
+ obj.params = params
+ obj.query = query
+ obj.fragment = fragment
+ return obj
+ except ImportError:
+        raise NotImplementedError("Scheme %s not implemented" % scheme)
+
def list(self, type):
- raise NotImplementedException
+ raise NotImplementedError
def get(self, type, name):
- raise NotImplementedException
+ raise NotImplementedError
def put(self, type, name, fp):
- raise NotImplementedException
+ raise NotImplementedError
def delete(self, type, name):
- raise NotImplementedException
+ raise NotImplementedError
def stat(self, type, name):
- raise NotImplementedException
+ raise NotImplementedError
def scan(self):
"""Cache file information stored in this backend.
pass
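
Store.__new__ reduces backend selection to an import: cumulus.store.<scheme>
merely has to expose a module-level Store attribute, exactly what ftp.py does
with its closing "Store = FtpStore" line. A hypothetical in-memory backend
shows the contract; "mem", MemStore and the file layout are invented for
illustration only:

    # hypothetical cumulus/store/mem.py, selected by mem: URLs
    from StringIO import StringIO

    from cumulus.store import Store, NotFoundError

    class MemStore (Store):
        def __init__ (self, url, **kw):
            self.blobs = {}                 # (type, name) -> file contents

        def list (self, type):
            return [n for (t, n) in self.blobs if t == type]

        def get (self, type, name):
            try:
                return StringIO (self.blobs[(type, name)])
            except KeyError:
                raise NotFoundError(type, name)

        def put (self, type, name, fp):
            self.blobs[(type, name)] = fp.read ()

        def delete (self, type, name):
            del self.blobs[(type, name)]

        def stat (self, type, name):
            if (type, name) not in self.blobs:
                raise NotFoundError(type, name)
            return {'size': len (self.blobs[(type, name)])}

    Store = MemStore    # the attribute Store.__new__ looks up after importing
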
def open(url):
- (scheme, netloc, path, params, query, fragment) \
- = urlparse.urlparse(url)
-
- if scheme == "file":
- import cumulus.store.file
- return cumulus.store.file.FileStore(path)
- elif scheme == "s3":
- import cumulus.store.s3
- while path.startswith("/"): path = path[1:]
- (bucket, path) = path.split("/", 1)
- return cumulus.store.s3.S3Store(bucket, path)
- else:
- raise NotImplementedException
+ return Store(url)
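
With the dispatch in Store.__new__, open() becomes a one-liner and callers
never name a backend class. A usage sketch; the URL is invented, and it
assumes cumulus.store.file also ends with a "Store = FileStore" alias, as
ftp.py below does:

    import cumulus.store

    store = cumulus.store.open('file:///var/backups/cumulus')
    # Store.__new__ chose the file backend and pre-parsed the URL:
    print store.scheme                     # 'file'
    print store.path                       # '/var/backups/cumulus'
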
--- /dev/null
+
+from ftplib import FTP, all_errors
+from netrc import netrc, NetrcParseError
+from cumulus.store import Store, type_patterns, NotFoundError
+
+class FtpStore (Store):
+ def __init__ (self, url, **kw):
+ self.synced = True
+ try:
+ upw, hp = self.netloc.split ('@')
+ except ValueError:
+ hp = self.netloc
+ upw = 'anonymous'
+ try:
+ host, port = hp.split (':')
+ port = int (port, 10)
+ except ValueError:
+ host = hp
+ port = 21
+ try:
+ user, passwd = upw.split (':')
+ except ValueError:
+ user = upw
+ passwd = None
+ try:
+ n = netrc ()
+ try:
+                # authenticators() returns None for an unknown host, and
+                # unpacking None raises TypeError rather than ValueError
+                user, acct, passwd = n.authenticators (host)
+            except (ValueError, TypeError):
+                pass
+ except (IOError, NetrcParseError):
+ pass
+ self.ftp = FTP ()
+ self.ftp.connect (host, port)
+ self.ftp.login (user, passwd)
+ self.prefix = self.path [1:] # skip *only* first '/'
+ self.ftp.cwd (self.prefix)
+
+ def _get_path (self, type, name):
+        # __init__ has already cwd()'d into the store directory, so the
+        # bare name is the full path
+ return name
+
+ def list (self, type):
+ self.sync ()
+ files = self.ftp.nlst ()
+ return (f for f in files if type_patterns[type].match (f))
+
+ def get (self, type, name):
+ self.sync ()
+ sock = self.ftp.transfercmd ('RETR %s' % self._get_path (type, name))
+ self.synced = False
+ return sock.makefile ()
+
+ def put (self, type, name, fp):
+ self.sync ()
+ self.ftp.storbinary ("STOR %s" % self._get_path (type, name), fp)
+
+ def delete (self, type, name):
+ self.sync ()
+ self.ftp.delete (self._get_path (type, name))
+
+ def stat (self, type, name):
+ """ Note that the size-command is non-standard but supported by
+ most ftp servers today. If size returns an error condition we
+ try nlst to detect if the file exists and return an bogus length
+ """
+ self.sync ()
+ fn = self._get_path (type, name)
+ size = None
+ try:
+            # some servers refuse SIZE in ASCII mode, so switch to
+            # binary for the query and back again afterwards
+ self.ftp.sendcmd ('TYPE I')
+ size = self.ftp.size (fn)
+ self.ftp.sendcmd ('TYPE A')
+        except all_errors:
+            # SIZE failed or is unsupported; fall through to the NLST
+            # existence check below
+            pass
+ if size is not None:
+ return {'size': size}
+ print "nlst: %s" % fn, size
+ l = self.ftp.nlst (fn)
+ if l:
+ return {'size': 42}
+        raise NotFoundError(type, name)
+
+ def sync (self):
+ """ After a get command at end of transfer a 2XX reply is still
+ in the input-queue, we have to get rid of that
+ """
+ if not self.synced:
+ self.ftp.voidresp()
+ self.synced = True
+
+Store = FtpStore    # resolved by cumulus.store.Store.__new__ as <module>.Store
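
A usage sketch (host, credentials and segment name invented). The
constructor's netloc parsing falls back at each step when an unpacking
split() raises ValueError, so both URLs below work; and after a get() the
control connection still owes a completion reply, which the next call's
sync() drains:

    import cumulus.store

    # full credentials and an explicit port
    store = cumulus.store.open('ftp://alice:secret@ftp.example.net:2121/backups')
    # anonymous login on port 21: each split() simply falls through
    anon = cumulus.store.open('ftp://ftp.example.net/pub/backups')

    fp = store.get('segments', 'seg-0001.tar.gpg')   # 2xx reply now pending
    data = fp.read()
    fp.close()
    store.stat('segments', 'seg-0001.tar.gpg')       # sync() drains it first
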