From: Michael Vrable Date: Wed, 6 Aug 2008 19:11:22 +0000 (-0700) Subject: Begin new storage-abstraction layer. X-Git-Url: http://git.vrable.net/?p=cumulus.git;a=commitdiff_plain;h=8bff41ddef78fa851b09d141c93bdf387abc1dee Begin new storage-abstraction layer. Begin work on new Python code for providing uniform access to both local filesystem and remote S3 storage. Convert the existing Python module to use the new interface. --- diff --git a/cumulus-util b/cumulus-util index 59c5fc7..f5d5fa3 100755 --- a/cumulus-util +++ b/cumulus-util @@ -4,7 +4,10 @@ import getpass, os, stat, sys, time from optparse import OptionParser -import lbs +import cumulus + +# Compatibility +lbs = cumulus # We support up to "LBS Snapshot v0.8" formats, but are also limited by the lbs # module. diff --git a/lbs.py b/lbs.py deleted file mode 100644 index b8de982..0000000 --- a/lbs.py +++ /dev/null @@ -1,765 +0,0 @@ -"""High-level interface for working with LBS archives. - -This module provides an easy interface for reading from and manipulating -various parts of an LBS archive: - - listing the snapshots and segments present - - reading segment contents - - parsing snapshot descriptors and snapshot metadata logs - - reading and maintaining the local object database -""" - -from __future__ import division -import os, re, sha, tarfile, tempfile, thread -from pysqlite2 import dbapi2 as sqlite3 - -# The largest supported snapshot format that can be understood. -FORMAT_VERSION = (0, 8) # LBS Snapshot v0.8 - -# Maximum number of nested indirect references allowed in a snapshot. -MAX_RECURSION_DEPTH = 3 - -# All segments which have been accessed this session. -accessed_segments = set() - -class Struct: - """A class which merely acts as a data container. - - Instances of this class (or its subclasses) are merely used to store data - in various attributes. No methods are provided. - """ - - def __repr__(self): - return "<%s %s>" % (self.__class__, self.__dict__) - -CHECKSUM_ALGORITHMS = { - 'sha1': sha.new -} - -class ChecksumCreator: - """Compute an LBS checksum for provided data. - - The algorithm used is selectable, but currently defaults to sha1. - """ - - def __init__(self, algorithm='sha1'): - self.algorithm = algorithm - self.hash = CHECKSUM_ALGORITHMS[algorithm]() - - def update(self, data): - self.hash.update(data) - return self - - def compute(self): - return "%s=%s" % (self.algorithm, self.hash.hexdigest()) - -class ChecksumVerifier: - """Verify whether a checksum from a snapshot matches the supplied data.""" - - def __init__(self, checksumstr): - """Create an object to check the supplied checksum.""" - - (algo, checksum) = checksumstr.split("=", 1) - self.checksum = checksum - self.hash = CHECKSUM_ALGORITHMS[algo]() - - def update(self, data): - self.hash.update(data) - - def valid(self): - """Return a boolean indicating whether the checksum matches.""" - - result = self.hash.hexdigest() - return result == self.checksum - -class LowlevelDataStore: - """Access to the backup store containing segments and snapshot descriptors. - - Instances of this class are used to get direct filesystem-level access to - the backup data. To read a backup, a caller will ordinarily not care about - direct access to backup segments, but will instead merely need to access - objects from those segments. The ObjectStore class provides a suitable - wrapper around a DataStore to give this high-level access. - """ - - def __init__(self, path): - self.path = path - - # Low-level filesystem access. 
These methods could be overwritten to - # provide access to remote data stores. - def lowlevel_list(self): - """Get a listing of files stored.""" - - return os.listdir(self.path) - - def lowlevel_open(self, filename): - """Return a file-like object for reading data from the given file.""" - - return open(os.path.join(self.path, filename), 'rb') - - def lowlevel_stat(self, filename): - """Return a dictionary of information about the given file. - - Currently, the only defined field is 'size', giving the size of the - file in bytes. - """ - - stat = os.stat(os.path.join(self.path, filename)) - return {'size': stat.st_size} - - # Slightly higher-level list methods. - def list_snapshots(self): - for f in self.lowlevel_list(): - m = re.match(r"^snapshot-(.*)\.lbs$", f) - if m: - yield m.group(1) - - def list_segments(self): - for f in self.lowlevel_list(): - m = re.match(r"^([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})(\.\S+)?$", f) - if m: - yield m.group(1) - -class ObjectStore: - def __init__(self, data_store): - self.store = data_store - self.cachedir = None - self.CACHE_SIZE = 16 - self.lru_list = [] - - def get_cachedir(self): - if self.cachedir is None: - self.cachedir = tempfile.mkdtemp(".lbs") - return self.cachedir - - def cleanup(self): - if self.cachedir is not None: - # TODO: Avoid use of system, make this safer - os.system("rm -rf " + self.cachedir) - self.cachedir = None - - @staticmethod - def parse_ref(refstr): - m = re.match(r"^zero\[(\d+)\]$", refstr) - if m: - return ("zero", None, None, (0, int(m.group(1)))) - - m = re.match(r"^([-0-9a-f]+)\/([0-9a-f]+)(\(\S+\))?(\[(((\d+)\+)?(\d+)|=(\d+))\])?$", refstr) - if not m: return - - segment = m.group(1) - object = m.group(2) - checksum = m.group(3) - slice = m.group(4) - - if checksum is not None: - checksum = checksum.lstrip("(").rstrip(")") - - if slice is not None: - if m.group(9) is not None: - # Size-assertion slice - slice = (0, int(m.group(9)), True) - elif m.group(6) is None: - # Abbreviated slice - slice = (0, int(m.group(8)), False) - else: - slice = (int(m.group(7)), int(m.group(8)), False) - - return (segment, object, checksum, slice) - - def get_segment(self, segment): - accessed_segments.add(segment) - raw = self.store.lowlevel_open(segment + ".tar.gpg") - - (input, output) = os.popen2("lbs-filter-gpg --decrypt") - def copy_thread(src, dst): - BLOCK_SIZE = 4096 - while True: - block = src.read(BLOCK_SIZE) - if len(block) == 0: break - dst.write(block) - dst.close() - - thread.start_new_thread(copy_thread, (raw, input)) - return output - - def load_segment(self, segment): - seg = tarfile.open(segment, 'r|', self.get_segment(segment)) - for item in seg: - data_obj = seg.extractfile(item) - path = item.name.split('/') - if len(path) == 2 and path[0] == segment: - yield (path[1], data_obj.read()) - - def load_snapshot(self, snapshot): - file = self.store.lowlevel_open("snapshot-" + snapshot + ".lbs") - return file.read().splitlines(True) - - def extract_segment(self, segment): - segdir = os.path.join(self.get_cachedir(), segment) - os.mkdir(segdir) - for (object, data) in self.load_segment(segment): - f = open(os.path.join(segdir, object), 'wb') - f.write(data) - f.close() - - def load_object(self, segment, object): - accessed_segments.add(segment) - path = os.path.join(self.get_cachedir(), segment, object) - if not os.access(path, os.R_OK): - self.extract_segment(segment) - if segment in self.lru_list: self.lru_list.remove(segment) - self.lru_list.append(segment) - while len(self.lru_list) > 
self.CACHE_SIZE: - os.system("rm -rf " + os.path.join(self.cachedir, self.lru_list[0])) - self.lru_list = self.lru_list[1:] - return open(path, 'rb').read() - - def get(self, refstr): - """Fetch the given object and return it. - - The input should be an object reference, in string form. - """ - - (segment, object, checksum, slice) = self.parse_ref(refstr) - - if segment == "zero": - return "\0" * slice[1] - - data = self.load_object(segment, object) - - if checksum is not None: - verifier = ChecksumVerifier(checksum) - verifier.update(data) - if not verifier.valid(): - raise ValueError - - if slice is not None: - (start, length, exact) = slice - if exact and len(data) != length: raise ValueError - data = data[start:start+length] - if len(data) != length: raise IndexError - - return data - -def parse(lines, terminate=None): - """Generic parser for RFC822-style "Key: Value" data streams. - - This parser can be used to read metadata logs and snapshot root descriptor - files. - - lines must be an iterable object which yields a sequence of lines of input. - - If terminate is specified, it is used as a predicate to determine when to - stop reading input lines. - """ - - dict = {} - last_key = None - - for l in lines: - # Strip off a trailing newline, if present - if len(l) > 0 and l[-1] == "\n": - l = l[:-1] - - if terminate is not None and terminate(l): - if len(dict) > 0: yield dict - dict = {} - last_key = None - continue - - m = re.match(r"^([-\w]+):\s*(.*)$", l) - if m: - dict[m.group(1)] = m.group(2) - last_key = m.group(1) - elif len(l) > 0 and l[0].isspace() and last_key is not None: - dict[last_key] += l - else: - last_key = None - - if len(dict) > 0: yield dict - -def parse_full(lines): - try: - return parse(lines).next() - except StopIteration: - return {} - -def parse_metadata_version(s): - """Convert a string with the snapshot version format to a tuple.""" - - m = re.match(r"^LBS Snapshot v(\d+(\.\d+)*)$", s) - if m is None: - return () - else: - return tuple([int(d) for d in m.group(1).split(".")]) - -def read_metadata(object_store, root): - """Iterate through all lines in the metadata log, following references.""" - - # Stack for keeping track of recursion when following references to - # portions of the log. The last entry in the stack corresponds to the - # object currently being parsed. Each entry is a list of lines which have - # been reversed, so that popping successive lines from the end of each list - # will return lines of the metadata log in order. - stack = [] - - def follow_ref(refstr): - if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError - lines = object_store.get(refstr).splitlines(True) - lines.reverse() - stack.append(lines) - - follow_ref(root) - - while len(stack) > 0: - top = stack[-1] - if len(top) == 0: - stack.pop() - continue - line = top.pop() - - # An indirect reference which we must follow? - if len(line) > 0 and line[0] == '@': - ref = line[1:] - ref.strip() - follow_ref(ref) - else: - yield line - -class MetadataItem: - """Metadata for a single file (or directory or...) from a snapshot.""" - - # Functions for parsing various datatypes that can appear in a metadata log - # item. 
- @staticmethod - def decode_int(s): - """Decode an integer, expressed in decimal, octal, or hexadecimal.""" - if s.startswith("0x"): - return int(s, 16) - elif s.startswith("0"): - return int(s, 8) - else: - return int(s, 10) - - @staticmethod - def decode_str(s): - """Decode a URI-encoded (%xx escapes) string.""" - def hex_decode(m): return chr(int(m.group(1), 16)) - return re.sub(r"%([0-9a-f]{2})", hex_decode, s) - - @staticmethod - def raw_str(s): - """An unecoded string.""" - return s - - @staticmethod - def decode_user(s): - """Decode a user/group to a tuple of uid/gid followed by name.""" - items = s.split() - uid = MetadataItem.decode_int(items[0]) - name = None - if len(items) > 1: - if items[1].startswith("(") and items[1].endswith(")"): - name = MetadataItem.decode_str(items[1][1:-1]) - return (uid, name) - - @staticmethod - def decode_device(s): - """Decode a device major/minor number.""" - (major, minor) = map(MetadataItem.decode_int, s.split("/")) - return (major, minor) - - class Items: pass - - def __init__(self, fields, object_store): - """Initialize from a dictionary of key/value pairs from metadata log.""" - - self.fields = fields - self.object_store = object_store - self.keys = [] - self.items = self.Items() - for (k, v) in fields.items(): - if k in self.field_types: - decoder = self.field_types[k] - setattr(self.items, k, decoder(v)) - self.keys.append(k) - - def data(self): - """Return an iterator for the data blocks that make up a file.""" - - # This traverses the list of blocks that make up a file, following - # indirect references. It is implemented in much the same way as - # read_metadata, so see that function for details of the technique. - - objects = self.fields['data'].split() - objects.reverse() - stack = [objects] - - def follow_ref(refstr): - if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError - objects = self.object_store.get(refstr).split() - objects.reverse() - stack.append(objects) - - while len(stack) > 0: - top = stack[-1] - if len(top) == 0: - stack.pop() - continue - ref = top.pop() - - # An indirect reference which we must follow? - if len(ref) > 0 and ref[0] == '@': - follow_ref(ref[1:]) - else: - yield ref - -# Description of fields that might appear, and how they should be parsed. -MetadataItem.field_types = { - 'name': MetadataItem.decode_str, - 'type': MetadataItem.raw_str, - 'mode': MetadataItem.decode_int, - 'device': MetadataItem.decode_device, - 'user': MetadataItem.decode_user, - 'group': MetadataItem.decode_user, - 'ctime': MetadataItem.decode_int, - 'mtime': MetadataItem.decode_int, - 'links': MetadataItem.decode_int, - 'inode': MetadataItem.raw_str, - 'checksum': MetadataItem.decode_str, - 'size': MetadataItem.decode_int, - 'contents': MetadataItem.decode_str, - 'target': MetadataItem.decode_str, -} - -def iterate_metadata(object_store, root): - for d in parse(read_metadata(object_store, root), lambda l: len(l) == 0): - yield MetadataItem(d, object_store) - -class LocalDatabase: - """Access to the local database of snapshot contents and object checksums. - - The local database is consulted when creating a snapshot to determine what - data can be re-used from old snapshots. Segment cleaning is performed by - manipulating the data in the local database; the local database also - includes enough data to guide the segment cleaning process. - """ - - def __init__(self, path, dbname="localdb.sqlite"): - self.db_connection = sqlite3.connect(path + "/" + dbname) - - # Low-level database access. 
Use these methods when there isn't a - # higher-level interface available. Exception: do, however, remember to - # use the commit() method after making changes to make sure they are - # actually saved, even when going through higher-level interfaces. - def commit(self): - "Commit any pending changes to the local database." - self.db_connection.commit() - - def rollback(self): - "Roll back any pending changes to the local database." - self.db_connection.rollback() - - def cursor(self): - "Return a DB-API cursor for directly accessing the local database." - return self.db_connection.cursor() - - def list_schemes(self): - """Return the list of snapshots found in the local database. - - The returned value is a list of tuples (id, scheme, name, time, intent). - """ - - cur = self.cursor() - cur.execute("select distinct scheme from snapshots") - schemes = [row[0] for row in cur.fetchall()] - schemes.sort() - return schemes - - def garbage_collect(self, scheme, intent=1.0): - """Delete entries from old snapshots from the database. - - Only snapshots with the specified scheme name will be deleted. If - intent is given, it gives the intended next snapshot type, to determine - how aggressively to clean (for example, intent=7 could be used if the - next snapshot will be a weekly snapshot). - """ - - cur = self.cursor() - - # Find the id of the last snapshot to be created. This is used for - # measuring time in a way: we record this value in each segment we - # expire on this run, and then on a future run can tell if there have - # been intervening backups made. - cur.execute("select max(snapshotid) from snapshots") - last_snapshotid = cur.fetchone()[0] - - # Get the list of old snapshots for this scheme. Delete all the old - # ones. Rules for what to keep: - # - Always keep the most recent snapshot. - # - If snapshot X is younger than Y, and X has higher intent, then Y - # can be deleted. - cur.execute("""select snapshotid, name, intent, - julianday('now') - timestamp as age - from snapshots where scheme = ? - order by age""", (scheme,)) - - first = True - max_intent = intent - for (id, name, snap_intent, snap_age) in cur.fetchall(): - can_delete = False - if snap_intent < max_intent: - # Delete small-intent snapshots if there is a more recent - # large-intent snapshot. - can_delete = True - elif snap_intent == intent: - # Delete previous snapshots with the specified intent level. - can_delete = True - - if can_delete and not first: - print "Delete snapshot %d (%s)" % (id, name) - cur.execute("delete from snapshots where snapshotid = ?", - (id,)) - first = False - max_intent = max(max_intent, snap_intent) - - # Delete entries in the segments_used table which are for non-existent - # snapshots. - cur.execute("""delete from segments_used - where snapshotid not in - (select snapshotid from snapshots)""") - - # Find segments which contain no objects used by any current snapshots, - # and delete them from the segment table. - cur.execute("""delete from segments where segmentid not in - (select segmentid from segments_used)""") - - # Delete unused objects in the block_index table. By "unused", we mean - # any object which was stored in a segment which has been deleted, and - # any object in a segment which was marked for cleaning and has had - # cleaning performed already (the expired time is less than the current - # largest snapshot id). 
- cur.execute("""delete from block_index - where segmentid not in (select segmentid from segments) - or segmentid in (select segmentid from segments - where expire_time < ?)""", - (last_snapshotid,)) - - # Remove sub-block signatures for deleted objects. - cur.execute("""delete from subblock_signatures - where blockid not in - (select blockid from block_index)""") - - # Segment cleaning. - class SegmentInfo(Struct): pass - - def get_segment_cleaning_list(self, age_boost=0.0): - """Return a list of all current segments with information for cleaning. - - Return all segments which are currently known in the local database - (there might be other, older segments in the archive itself), and - return usage statistics for each to help decide which segments to - clean. - - The returned list will be sorted by estimated cleaning benefit, with - segments that are best to clean at the start of the list. - - If specified, the age_boost parameter (measured in days) will added to - the age of each segment, as a way of adjusting the benefit computation - before a long-lived snapshot is taken (for example, age_boost might be - set to 7 when cleaning prior to taking a weekly snapshot). - """ - - cur = self.cursor() - segments = [] - cur.execute("""select segmentid, used, size, mtime, - julianday('now') - mtime as age from segment_info - where expire_time is null""") - for row in cur: - info = self.SegmentInfo() - info.id = row[0] - info.used_bytes = row[1] - info.size_bytes = row[2] - info.mtime = row[3] - info.age_days = row[4] - - # If data is not available for whatever reason, treat it as 0.0. - if info.age_days is None: - info.age_days = 0.0 - if info.used_bytes is None: - info.used_bytes = 0.0 - - # Benefit calculation: u is the estimated fraction of each segment - # which is utilized (bytes belonging to objects still in use - # divided by total size; this doesn't take compression or storage - # overhead into account, but should give a reasonable estimate). - # - # The total benefit is a heuristic that combines several factors: - # the amount of space that can be reclaimed (1 - u), an ageing - # factor (info.age_days) that favors cleaning old segments to young - # ones and also is more likely to clean segments that will be - # rewritten for long-lived snapshots (age_boost), and finally a - # penalty factor for the cost of re-uploading data (u + 0.1). - u = info.used_bytes / info.size_bytes - info.cleaning_benefit \ - = (1 - u) * (info.age_days + age_boost) / (u + 0.1) - - segments.append(info) - - segments.sort(cmp, key=lambda s: s.cleaning_benefit, reverse=True) - return segments - - def mark_segment_expired(self, segment): - """Mark a segment for cleaning in the local database. - - The segment parameter should be either a SegmentInfo object or an - integer segment id. Objects in the given segment will be marked as - expired, which means that any future snapshots that would re-use those - objects will instead write out a new copy of the object, and thus no - future snapshots will depend upon the given segment. - """ - - if isinstance(segment, int): - id = segment - elif isinstance(segment, self.SegmentInfo): - id = segment.id - else: - raise TypeError("Invalid segment: %s, must be of type int or SegmentInfo, not %s" % (segment, type(segment))) - - cur = self.cursor() - cur.execute("select max(snapshotid) from snapshots") - last_snapshotid = cur.fetchone()[0] - cur.execute("update segments set expire_time = ? 
where segmentid = ?", - (last_snapshotid, id)) - cur.execute("update block_index set expired = 0 where segmentid = ?", - (id,)) - - def balance_expired_objects(self): - """Analyze expired objects in segments to be cleaned and group by age. - - Update the block_index table of the local database to group expired - objects by age. The exact number of buckets and the cutoffs for each - are dynamically determined. Calling this function after marking - segments expired will help in the segment cleaning process, by ensuring - that when active objects from clean segments are rewritten, they will - be placed into new segments roughly grouped by age. - """ - - # The expired column of the block_index table is used when generating a - # new LBS snapshot. A null value indicates that an object may be - # re-used. Otherwise, an object must be written into a new segment if - # needed. Objects with distinct expired values will be written into - # distinct segments, to allow for some grouping by age. The value 0 is - # somewhat special in that it indicates any rewritten objects can be - # placed in the same segment as completely new objects; this can be - # used for very young objects which have been expired, or objects not - # expected to be encountered. - # - # In the balancing process, all objects which are not used in any - # current snapshots will have expired set to 0. Objects which have - # been seen will be sorted by age and will have expired values set to - # 0, 1, 2, and so on based on age (with younger objects being assigned - # lower values). The number of buckets and the age cutoffs is - # determined by looking at the distribution of block ages. - - cur = self.cursor() - - # Mark all expired objects with expired = 0; these objects will later - # have values set to indicate groupings of objects when repacking. - cur.execute("""update block_index set expired = 0 - where expired is not null""") - - # We will want to aim for at least one full segment for each bucket - # that we eventually create, but don't know how many bytes that should - # be due to compression. So compute the average number of bytes in - # each expired segment as a rough estimate for the minimum size of each - # bucket. (This estimate could be thrown off by many not-fully-packed - # segments, but for now don't worry too much about that.) If we can't - # compute an average, it's probably because there are no expired - # segments, so we have no more work to do. - cur.execute("""select avg(size) from segments - where segmentid in - (select distinct segmentid from block_index - where expired is not null)""") - segment_size_estimate = cur.fetchone()[0] - if not segment_size_estimate: - return - - # Next, extract distribution of expired objects (number and size) by - # age. Save the timestamp for "now" so that the classification of - # blocks into age buckets will not change later in the function, after - # time has passed. Set any timestamps in the future to now, so we are - # guaranteed that for the rest of this function, age is always - # non-negative. - cur.execute("select julianday('now')") - now = cur.fetchone()[0] - - cur.execute("""update block_index set timestamp = ? - where timestamp > ? and expired is not null""", - (now, now)) - - cur.execute("""select round(? - timestamp) as age, count(*), sum(size) - from block_index where expired = 0 - group by age order by age""", (now,)) - distribution = cur.fetchall() - - # Start to determine the buckets for expired objects. 
Heuristics used: - # - An upper bound on the number of buckets is given by the number of - # segments we estimate it will take to store all data. In fact, - # aim for a couple of segments per bucket. - # - Place very young objects in bucket 0 (place with new objects) - # unless there are enough of them to warrant a separate bucket. - # - Try not to create unnecessarily many buckets, since fewer buckets - # will allow repacked data to be grouped based on spatial locality - # (while more buckets will group by temporal locality). We want a - # balance. - MIN_AGE = 4 - total_bytes = sum([i[2] for i in distribution]) - target_buckets = 2 * (total_bytes / segment_size_estimate) ** 0.4 - min_size = 1.5 * segment_size_estimate - target_size = max(2 * segment_size_estimate, - total_bytes / target_buckets) - - print "segment_size:", segment_size_estimate - print "distribution:", distribution - print "total_bytes:", total_bytes - print "target_buckets:", target_buckets - print "min, target size:", min_size, target_size - - # Chosen cutoffs. Each bucket consists of objects with age greater - # than one cutoff value, but not greater than the next largest cutoff. - cutoffs = [] - - # Starting with the oldest objects, begin grouping together into - # buckets of size at least target_size bytes. - distribution.reverse() - bucket_size = 0 - min_age_bucket = False - for (age, items, size) in distribution: - if bucket_size >= target_size \ - or (age < MIN_AGE and not min_age_bucket): - if bucket_size < target_size and len(cutoffs) > 0: - cutoffs.pop() - cutoffs.append(age) - bucket_size = 0 - - bucket_size += size - if age < MIN_AGE: - min_age_bucket = True - - # The last (youngest) bucket will be group 0, unless it has enough data - # to be of size min_size by itself, or there happen to be no objects - # less than MIN_AGE at all. - if bucket_size >= min_size or not min_age_bucket: - cutoffs.append(-1) - cutoffs.append(-1) - - print "cutoffs:", cutoffs - - # Update the database to assign each object to the appropriate bucket. - cutoffs.reverse() - for i in range(len(cutoffs)): - cur.execute("""update block_index set expired = ? - where round(? - timestamp) > ? - and expired is not null""", - (i, now, cutoffs[i])) diff --git a/python/cumulus/__init__.py b/python/cumulus/__init__.py new file mode 100644 index 0000000..602e70e --- /dev/null +++ b/python/cumulus/__init__.py @@ -0,0 +1,765 @@ +"""High-level interface for working with LBS archives. + +This module provides an easy interface for reading from and manipulating +various parts of an LBS archive: + - listing the snapshots and segments present + - reading segment contents + - parsing snapshot descriptors and snapshot metadata logs + - reading and maintaining the local object database +""" + +from __future__ import division +import os, re, sha, tarfile, tempfile, thread +from pysqlite2 import dbapi2 as sqlite3 + +import cumulus.store, cumulus.store.file + +# The largest supported snapshot format that can be understood. +FORMAT_VERSION = (0, 8) # LBS Snapshot v0.8 + +# Maximum number of nested indirect references allowed in a snapshot. +MAX_RECURSION_DEPTH = 3 + +# All segments which have been accessed this session. +accessed_segments = set() + +class Struct: + """A class which merely acts as a data container. + + Instances of this class (or its subclasses) are merely used to store data + in various attributes. No methods are provided. 
+ """ + + def __repr__(self): + return "<%s %s>" % (self.__class__, self.__dict__) + +CHECKSUM_ALGORITHMS = { + 'sha1': sha.new +} + +class ChecksumCreator: + """Compute an LBS checksum for provided data. + + The algorithm used is selectable, but currently defaults to sha1. + """ + + def __init__(self, algorithm='sha1'): + self.algorithm = algorithm + self.hash = CHECKSUM_ALGORITHMS[algorithm]() + + def update(self, data): + self.hash.update(data) + return self + + def compute(self): + return "%s=%s" % (self.algorithm, self.hash.hexdigest()) + +class ChecksumVerifier: + """Verify whether a checksum from a snapshot matches the supplied data.""" + + def __init__(self, checksumstr): + """Create an object to check the supplied checksum.""" + + (algo, checksum) = checksumstr.split("=", 1) + self.checksum = checksum + self.hash = CHECKSUM_ALGORITHMS[algo]() + + def update(self, data): + self.hash.update(data) + + def valid(self): + """Return a boolean indicating whether the checksum matches.""" + + result = self.hash.hexdigest() + return result == self.checksum + +class LowlevelDataStore: + """Access to the backup store containing segments and snapshot descriptors. + + Instances of this class are used to get direct filesystem-level access to + the backup data. To read a backup, a caller will ordinarily not care about + direct access to backup segments, but will instead merely need to access + objects from those segments. The ObjectStore class provides a suitable + wrapper around a DataStore to give this high-level access. + """ + + def __init__(self, path): + self.store = cumulus.store.file.FileStore(path) + + def _classify(self, filename): + for (t, r) in cumulus.store.type_patterns.items(): + if r.match(filename): + return (t, filename) + return (None, filename) + + def lowlevel_open(self, filename): + """Return a file-like object for reading data from the given file.""" + + (type, filename) = self._classify(filename) + return self.store.get(type, filename) + + def lowlevel_stat(self, filename): + """Return a dictionary of information about the given file. + + Currently, the only defined field is 'size', giving the size of the + file in bytes. + """ + + (type, filename) = self._classify(filename) + return self.store.stat(type, filename) + + # Slightly higher-level list methods. 
+ def list_snapshots(self): + for f in self.store.list('snapshots'): + m = cumulus.store.type_patterns['snapshots'].match(f) + if m: yield m.group(1) + + def list_segments(self): + for f in self.store.list('segments'): + m = cumulus.store.type_patterns['segments'].match(f) + if m: yield m.group(1) + +class ObjectStore: + def __init__(self, data_store): + self.store = data_store + self.cachedir = None + self.CACHE_SIZE = 16 + self.lru_list = [] + + def get_cachedir(self): + if self.cachedir is None: + self.cachedir = tempfile.mkdtemp(".lbs") + return self.cachedir + + def cleanup(self): + if self.cachedir is not None: + # TODO: Avoid use of system, make this safer + os.system("rm -rf " + self.cachedir) + self.cachedir = None + + @staticmethod + def parse_ref(refstr): + m = re.match(r"^zero\[(\d+)\]$", refstr) + if m: + return ("zero", None, None, (0, int(m.group(1)))) + + m = re.match(r"^([-0-9a-f]+)\/([0-9a-f]+)(\(\S+\))?(\[(((\d+)\+)?(\d+)|=(\d+))\])?$", refstr) + if not m: return + + segment = m.group(1) + object = m.group(2) + checksum = m.group(3) + slice = m.group(4) + + if checksum is not None: + checksum = checksum.lstrip("(").rstrip(")") + + if slice is not None: + if m.group(9) is not None: + # Size-assertion slice + slice = (0, int(m.group(9)), True) + elif m.group(6) is None: + # Abbreviated slice + slice = (0, int(m.group(8)), False) + else: + slice = (int(m.group(7)), int(m.group(8)), False) + + return (segment, object, checksum, slice) + + def get_segment(self, segment): + accessed_segments.add(segment) + raw = self.store.lowlevel_open(segment + ".tar.gpg") + + (input, output) = os.popen2("lbs-filter-gpg --decrypt") + def copy_thread(src, dst): + BLOCK_SIZE = 4096 + while True: + block = src.read(BLOCK_SIZE) + if len(block) == 0: break + dst.write(block) + dst.close() + + thread.start_new_thread(copy_thread, (raw, input)) + return output + + def load_segment(self, segment): + seg = tarfile.open(segment, 'r|', self.get_segment(segment)) + for item in seg: + data_obj = seg.extractfile(item) + path = item.name.split('/') + if len(path) == 2 and path[0] == segment: + yield (path[1], data_obj.read()) + + def load_snapshot(self, snapshot): + file = self.store.lowlevel_open("snapshot-" + snapshot + ".lbs") + return file.read().splitlines(True) + + def extract_segment(self, segment): + segdir = os.path.join(self.get_cachedir(), segment) + os.mkdir(segdir) + for (object, data) in self.load_segment(segment): + f = open(os.path.join(segdir, object), 'wb') + f.write(data) + f.close() + + def load_object(self, segment, object): + accessed_segments.add(segment) + path = os.path.join(self.get_cachedir(), segment, object) + if not os.access(path, os.R_OK): + self.extract_segment(segment) + if segment in self.lru_list: self.lru_list.remove(segment) + self.lru_list.append(segment) + while len(self.lru_list) > self.CACHE_SIZE: + os.system("rm -rf " + os.path.join(self.cachedir, self.lru_list[0])) + self.lru_list = self.lru_list[1:] + return open(path, 'rb').read() + + def get(self, refstr): + """Fetch the given object and return it. + + The input should be an object reference, in string form. 
+ """ + + (segment, object, checksum, slice) = self.parse_ref(refstr) + + if segment == "zero": + return "\0" * slice[1] + + data = self.load_object(segment, object) + + if checksum is not None: + verifier = ChecksumVerifier(checksum) + verifier.update(data) + if not verifier.valid(): + raise ValueError + + if slice is not None: + (start, length, exact) = slice + if exact and len(data) != length: raise ValueError + data = data[start:start+length] + if len(data) != length: raise IndexError + + return data + +def parse(lines, terminate=None): + """Generic parser for RFC822-style "Key: Value" data streams. + + This parser can be used to read metadata logs and snapshot root descriptor + files. + + lines must be an iterable object which yields a sequence of lines of input. + + If terminate is specified, it is used as a predicate to determine when to + stop reading input lines. + """ + + dict = {} + last_key = None + + for l in lines: + # Strip off a trailing newline, if present + if len(l) > 0 and l[-1] == "\n": + l = l[:-1] + + if terminate is not None and terminate(l): + if len(dict) > 0: yield dict + dict = {} + last_key = None + continue + + m = re.match(r"^([-\w]+):\s*(.*)$", l) + if m: + dict[m.group(1)] = m.group(2) + last_key = m.group(1) + elif len(l) > 0 and l[0].isspace() and last_key is not None: + dict[last_key] += l + else: + last_key = None + + if len(dict) > 0: yield dict + +def parse_full(lines): + try: + return parse(lines).next() + except StopIteration: + return {} + +def parse_metadata_version(s): + """Convert a string with the snapshot version format to a tuple.""" + + m = re.match(r"^LBS Snapshot v(\d+(\.\d+)*)$", s) + if m is None: + return () + else: + return tuple([int(d) for d in m.group(1).split(".")]) + +def read_metadata(object_store, root): + """Iterate through all lines in the metadata log, following references.""" + + # Stack for keeping track of recursion when following references to + # portions of the log. The last entry in the stack corresponds to the + # object currently being parsed. Each entry is a list of lines which have + # been reversed, so that popping successive lines from the end of each list + # will return lines of the metadata log in order. + stack = [] + + def follow_ref(refstr): + if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError + lines = object_store.get(refstr).splitlines(True) + lines.reverse() + stack.append(lines) + + follow_ref(root) + + while len(stack) > 0: + top = stack[-1] + if len(top) == 0: + stack.pop() + continue + line = top.pop() + + # An indirect reference which we must follow? + if len(line) > 0 and line[0] == '@': + ref = line[1:] + ref.strip() + follow_ref(ref) + else: + yield line + +class MetadataItem: + """Metadata for a single file (or directory or...) from a snapshot.""" + + # Functions for parsing various datatypes that can appear in a metadata log + # item. 
+ @staticmethod + def decode_int(s): + """Decode an integer, expressed in decimal, octal, or hexadecimal.""" + if s.startswith("0x"): + return int(s, 16) + elif s.startswith("0"): + return int(s, 8) + else: + return int(s, 10) + + @staticmethod + def decode_str(s): + """Decode a URI-encoded (%xx escapes) string.""" + def hex_decode(m): return chr(int(m.group(1), 16)) + return re.sub(r"%([0-9a-f]{2})", hex_decode, s) + + @staticmethod + def raw_str(s): + """An unecoded string.""" + return s + + @staticmethod + def decode_user(s): + """Decode a user/group to a tuple of uid/gid followed by name.""" + items = s.split() + uid = MetadataItem.decode_int(items[0]) + name = None + if len(items) > 1: + if items[1].startswith("(") and items[1].endswith(")"): + name = MetadataItem.decode_str(items[1][1:-1]) + return (uid, name) + + @staticmethod + def decode_device(s): + """Decode a device major/minor number.""" + (major, minor) = map(MetadataItem.decode_int, s.split("/")) + return (major, minor) + + class Items: pass + + def __init__(self, fields, object_store): + """Initialize from a dictionary of key/value pairs from metadata log.""" + + self.fields = fields + self.object_store = object_store + self.keys = [] + self.items = self.Items() + for (k, v) in fields.items(): + if k in self.field_types: + decoder = self.field_types[k] + setattr(self.items, k, decoder(v)) + self.keys.append(k) + + def data(self): + """Return an iterator for the data blocks that make up a file.""" + + # This traverses the list of blocks that make up a file, following + # indirect references. It is implemented in much the same way as + # read_metadata, so see that function for details of the technique. + + objects = self.fields['data'].split() + objects.reverse() + stack = [objects] + + def follow_ref(refstr): + if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError + objects = self.object_store.get(refstr).split() + objects.reverse() + stack.append(objects) + + while len(stack) > 0: + top = stack[-1] + if len(top) == 0: + stack.pop() + continue + ref = top.pop() + + # An indirect reference which we must follow? + if len(ref) > 0 and ref[0] == '@': + follow_ref(ref[1:]) + else: + yield ref + +# Description of fields that might appear, and how they should be parsed. +MetadataItem.field_types = { + 'name': MetadataItem.decode_str, + 'type': MetadataItem.raw_str, + 'mode': MetadataItem.decode_int, + 'device': MetadataItem.decode_device, + 'user': MetadataItem.decode_user, + 'group': MetadataItem.decode_user, + 'ctime': MetadataItem.decode_int, + 'mtime': MetadataItem.decode_int, + 'links': MetadataItem.decode_int, + 'inode': MetadataItem.raw_str, + 'checksum': MetadataItem.decode_str, + 'size': MetadataItem.decode_int, + 'contents': MetadataItem.decode_str, + 'target': MetadataItem.decode_str, +} + +def iterate_metadata(object_store, root): + for d in parse(read_metadata(object_store, root), lambda l: len(l) == 0): + yield MetadataItem(d, object_store) + +class LocalDatabase: + """Access to the local database of snapshot contents and object checksums. + + The local database is consulted when creating a snapshot to determine what + data can be re-used from old snapshots. Segment cleaning is performed by + manipulating the data in the local database; the local database also + includes enough data to guide the segment cleaning process. + """ + + def __init__(self, path, dbname="localdb.sqlite"): + self.db_connection = sqlite3.connect(path + "/" + dbname) + + # Low-level database access. 
Use these methods when there isn't a + # higher-level interface available. Exception: do, however, remember to + # use the commit() method after making changes to make sure they are + # actually saved, even when going through higher-level interfaces. + def commit(self): + "Commit any pending changes to the local database." + self.db_connection.commit() + + def rollback(self): + "Roll back any pending changes to the local database." + self.db_connection.rollback() + + def cursor(self): + "Return a DB-API cursor for directly accessing the local database." + return self.db_connection.cursor() + + def list_schemes(self): + """Return the list of snapshots found in the local database. + + The returned value is a list of tuples (id, scheme, name, time, intent). + """ + + cur = self.cursor() + cur.execute("select distinct scheme from snapshots") + schemes = [row[0] for row in cur.fetchall()] + schemes.sort() + return schemes + + def garbage_collect(self, scheme, intent=1.0): + """Delete entries from old snapshots from the database. + + Only snapshots with the specified scheme name will be deleted. If + intent is given, it gives the intended next snapshot type, to determine + how aggressively to clean (for example, intent=7 could be used if the + next snapshot will be a weekly snapshot). + """ + + cur = self.cursor() + + # Find the id of the last snapshot to be created. This is used for + # measuring time in a way: we record this value in each segment we + # expire on this run, and then on a future run can tell if there have + # been intervening backups made. + cur.execute("select max(snapshotid) from snapshots") + last_snapshotid = cur.fetchone()[0] + + # Get the list of old snapshots for this scheme. Delete all the old + # ones. Rules for what to keep: + # - Always keep the most recent snapshot. + # - If snapshot X is younger than Y, and X has higher intent, then Y + # can be deleted. + cur.execute("""select snapshotid, name, intent, + julianday('now') - timestamp as age + from snapshots where scheme = ? + order by age""", (scheme,)) + + first = True + max_intent = intent + for (id, name, snap_intent, snap_age) in cur.fetchall(): + can_delete = False + if snap_intent < max_intent: + # Delete small-intent snapshots if there is a more recent + # large-intent snapshot. + can_delete = True + elif snap_intent == intent: + # Delete previous snapshots with the specified intent level. + can_delete = True + + if can_delete and not first: + print "Delete snapshot %d (%s)" % (id, name) + cur.execute("delete from snapshots where snapshotid = ?", + (id,)) + first = False + max_intent = max(max_intent, snap_intent) + + # Delete entries in the segments_used table which are for non-existent + # snapshots. + cur.execute("""delete from segments_used + where snapshotid not in + (select snapshotid from snapshots)""") + + # Find segments which contain no objects used by any current snapshots, + # and delete them from the segment table. + cur.execute("""delete from segments where segmentid not in + (select segmentid from segments_used)""") + + # Delete unused objects in the block_index table. By "unused", we mean + # any object which was stored in a segment which has been deleted, and + # any object in a segment which was marked for cleaning and has had + # cleaning performed already (the expired time is less than the current + # largest snapshot id). 
+ cur.execute("""delete from block_index + where segmentid not in (select segmentid from segments) + or segmentid in (select segmentid from segments + where expire_time < ?)""", + (last_snapshotid,)) + + # Remove sub-block signatures for deleted objects. + cur.execute("""delete from subblock_signatures + where blockid not in + (select blockid from block_index)""") + + # Segment cleaning. + class SegmentInfo(Struct): pass + + def get_segment_cleaning_list(self, age_boost=0.0): + """Return a list of all current segments with information for cleaning. + + Return all segments which are currently known in the local database + (there might be other, older segments in the archive itself), and + return usage statistics for each to help decide which segments to + clean. + + The returned list will be sorted by estimated cleaning benefit, with + segments that are best to clean at the start of the list. + + If specified, the age_boost parameter (measured in days) will added to + the age of each segment, as a way of adjusting the benefit computation + before a long-lived snapshot is taken (for example, age_boost might be + set to 7 when cleaning prior to taking a weekly snapshot). + """ + + cur = self.cursor() + segments = [] + cur.execute("""select segmentid, used, size, mtime, + julianday('now') - mtime as age from segment_info + where expire_time is null""") + for row in cur: + info = self.SegmentInfo() + info.id = row[0] + info.used_bytes = row[1] + info.size_bytes = row[2] + info.mtime = row[3] + info.age_days = row[4] + + # If data is not available for whatever reason, treat it as 0.0. + if info.age_days is None: + info.age_days = 0.0 + if info.used_bytes is None: + info.used_bytes = 0.0 + + # Benefit calculation: u is the estimated fraction of each segment + # which is utilized (bytes belonging to objects still in use + # divided by total size; this doesn't take compression or storage + # overhead into account, but should give a reasonable estimate). + # + # The total benefit is a heuristic that combines several factors: + # the amount of space that can be reclaimed (1 - u), an ageing + # factor (info.age_days) that favors cleaning old segments to young + # ones and also is more likely to clean segments that will be + # rewritten for long-lived snapshots (age_boost), and finally a + # penalty factor for the cost of re-uploading data (u + 0.1). + u = info.used_bytes / info.size_bytes + info.cleaning_benefit \ + = (1 - u) * (info.age_days + age_boost) / (u + 0.1) + + segments.append(info) + + segments.sort(cmp, key=lambda s: s.cleaning_benefit, reverse=True) + return segments + + def mark_segment_expired(self, segment): + """Mark a segment for cleaning in the local database. + + The segment parameter should be either a SegmentInfo object or an + integer segment id. Objects in the given segment will be marked as + expired, which means that any future snapshots that would re-use those + objects will instead write out a new copy of the object, and thus no + future snapshots will depend upon the given segment. + """ + + if isinstance(segment, int): + id = segment + elif isinstance(segment, self.SegmentInfo): + id = segment.id + else: + raise TypeError("Invalid segment: %s, must be of type int or SegmentInfo, not %s" % (segment, type(segment))) + + cur = self.cursor() + cur.execute("select max(snapshotid) from snapshots") + last_snapshotid = cur.fetchone()[0] + cur.execute("update segments set expire_time = ? 
where segmentid = ?", + (last_snapshotid, id)) + cur.execute("update block_index set expired = 0 where segmentid = ?", + (id,)) + + def balance_expired_objects(self): + """Analyze expired objects in segments to be cleaned and group by age. + + Update the block_index table of the local database to group expired + objects by age. The exact number of buckets and the cutoffs for each + are dynamically determined. Calling this function after marking + segments expired will help in the segment cleaning process, by ensuring + that when active objects from clean segments are rewritten, they will + be placed into new segments roughly grouped by age. + """ + + # The expired column of the block_index table is used when generating a + # new LBS snapshot. A null value indicates that an object may be + # re-used. Otherwise, an object must be written into a new segment if + # needed. Objects with distinct expired values will be written into + # distinct segments, to allow for some grouping by age. The value 0 is + # somewhat special in that it indicates any rewritten objects can be + # placed in the same segment as completely new objects; this can be + # used for very young objects which have been expired, or objects not + # expected to be encountered. + # + # In the balancing process, all objects which are not used in any + # current snapshots will have expired set to 0. Objects which have + # been seen will be sorted by age and will have expired values set to + # 0, 1, 2, and so on based on age (with younger objects being assigned + # lower values). The number of buckets and the age cutoffs is + # determined by looking at the distribution of block ages. + + cur = self.cursor() + + # Mark all expired objects with expired = 0; these objects will later + # have values set to indicate groupings of objects when repacking. + cur.execute("""update block_index set expired = 0 + where expired is not null""") + + # We will want to aim for at least one full segment for each bucket + # that we eventually create, but don't know how many bytes that should + # be due to compression. So compute the average number of bytes in + # each expired segment as a rough estimate for the minimum size of each + # bucket. (This estimate could be thrown off by many not-fully-packed + # segments, but for now don't worry too much about that.) If we can't + # compute an average, it's probably because there are no expired + # segments, so we have no more work to do. + cur.execute("""select avg(size) from segments + where segmentid in + (select distinct segmentid from block_index + where expired is not null)""") + segment_size_estimate = cur.fetchone()[0] + if not segment_size_estimate: + return + + # Next, extract distribution of expired objects (number and size) by + # age. Save the timestamp for "now" so that the classification of + # blocks into age buckets will not change later in the function, after + # time has passed. Set any timestamps in the future to now, so we are + # guaranteed that for the rest of this function, age is always + # non-negative. + cur.execute("select julianday('now')") + now = cur.fetchone()[0] + + cur.execute("""update block_index set timestamp = ? + where timestamp > ? and expired is not null""", + (now, now)) + + cur.execute("""select round(? - timestamp) as age, count(*), sum(size) + from block_index where expired = 0 + group by age order by age""", (now,)) + distribution = cur.fetchall() + + # Start to determine the buckets for expired objects. 
Heuristics used: + # - An upper bound on the number of buckets is given by the number of + # segments we estimate it will take to store all data. In fact, + # aim for a couple of segments per bucket. + # - Place very young objects in bucket 0 (place with new objects) + # unless there are enough of them to warrant a separate bucket. + # - Try not to create unnecessarily many buckets, since fewer buckets + # will allow repacked data to be grouped based on spatial locality + # (while more buckets will group by temporal locality). We want a + # balance. + MIN_AGE = 4 + total_bytes = sum([i[2] for i in distribution]) + target_buckets = 2 * (total_bytes / segment_size_estimate) ** 0.4 + min_size = 1.5 * segment_size_estimate + target_size = max(2 * segment_size_estimate, + total_bytes / target_buckets) + + print "segment_size:", segment_size_estimate + print "distribution:", distribution + print "total_bytes:", total_bytes + print "target_buckets:", target_buckets + print "min, target size:", min_size, target_size + + # Chosen cutoffs. Each bucket consists of objects with age greater + # than one cutoff value, but not greater than the next largest cutoff. + cutoffs = [] + + # Starting with the oldest objects, begin grouping together into + # buckets of size at least target_size bytes. + distribution.reverse() + bucket_size = 0 + min_age_bucket = False + for (age, items, size) in distribution: + if bucket_size >= target_size \ + or (age < MIN_AGE and not min_age_bucket): + if bucket_size < target_size and len(cutoffs) > 0: + cutoffs.pop() + cutoffs.append(age) + bucket_size = 0 + + bucket_size += size + if age < MIN_AGE: + min_age_bucket = True + + # The last (youngest) bucket will be group 0, unless it has enough data + # to be of size min_size by itself, or there happen to be no objects + # less than MIN_AGE at all. + if bucket_size >= min_size or not min_age_bucket: + cutoffs.append(-1) + cutoffs.append(-1) + + print "cutoffs:", cutoffs + + # Update the database to assign each object to the appropriate bucket. + cutoffs.reverse() + for i in range(len(cutoffs)): + cur.execute("""update block_index set expired = ? + where round(? - timestamp) > ? 
+                          and expired is not null""",
+                        (i, now, cutoffs[i]))
diff --git a/python/cumulus/store/__init__.py b/python/cumulus/store/__init__.py
new file mode 100644
index 0000000..5ebf0d7
--- /dev/null
+++ b/python/cumulus/store/__init__.py
@@ -0,0 +1,22 @@
+import re
+
+type_patterns = {
+    'checksums': re.compile(r"^snapshot-(.*)\.(\w+)sums$"),
+    'segments': re.compile(r"^([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})(\.\S+)?$"),
+    'snapshots': re.compile(r"^snapshot-(.*)\.lbs$")
+}
+
+class Store:
+    """Base class for all cumulus storage backends."""
+
+    def list(self, type):
+        raise NotImplementedException
+
+    def get(self, type, name):
+        raise NotImplementedException
+
+    def put(self, type, name, fp):
+        raise NotImplementedException
+
+    def delete(self, type, name):
+        raise NotImplementedException
diff --git a/python/cumulus/store/s3.py b/python/cumulus/store/s3.py
new file mode 100644
index 0000000..0453c21
--- /dev/null
+++ b/python/cumulus/store/s3.py
@@ -0,0 +1,38 @@
+import os, sys, tempfile
+import boto
+from boto.s3.bucket import Bucket
+from boto.s3.key import Key
+
+import cumulus.store
+
+class S3Store(cumulus.store.Store):
+    def __init__(self, bucket, prefix):
+        self.conn = boto.connect_s3(is_secure=False)
+        self.bucket = self.conn.create_bucket(bucket)
+        while prefix.endswith("/"): prefix = prefix[:-1]
+        self.prefix = prefix
+
+    def _get_key(self, type, name):
+        k = Key(self.bucket)
+        k.key = "%s/%s/%s" % (self.prefix, type, name)
+        return k
+
+    def list(self, type):
+        prefix = "%s/%s/" % (self.prefix, type)
+        for i in self.bucket.list(prefix):
+            assert i.key.startswith(prefix)
+            yield i.key[len(prefix):]
+
+    def get(self, type, name):
+        fp = tempfile.TemporaryFile()
+        k = self._get_key(type, name)
+        k.get_file(fp)
+        fp.seek(0)
+        return fp
+
+    def put(self, type, name, fp):
+        k = self._get_key(type, name)
+        k.send_file(fp)
+
+    def delete(self, type, name):
+        self.bucket.delete_key("%s/%s/%s" % (self.prefix, type, name))
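
A note on the FileStore backend: python/cumulus/store/file.py is referenced by the new cumulus/__init__.py (cumulus.store.file.FileStore(path)) but is not part of this patch. The code below is only an illustrative sketch of what such a local-filesystem backend might look like, assuming the list/get/put/delete interface from cumulus/store/__init__.py plus a stat() method (which LowlevelDataStore.lowlevel_stat already calls), and the flat directory layout the old lbs.py used; the real file.py may differ.

# Illustrative sketch only -- not the cumulus.store.file module from the
# repository.  Assumes the Store interface and type_patterns defined in
# python/cumulus/store/__init__.py, and a single flat directory of files as
# used by the old lbs.py LowlevelDataStore.
import os, shutil

import cumulus.store

class FileStore(cumulus.store.Store):
    def __init__(self, path):
        self.path = path

    def _get_path(self, type, name):
        # All object types share one flat directory in this sketch.
        return os.path.join(self.path, name)

    def list(self, type):
        # Yield filenames matching the naming pattern for this type;
        # LowlevelDataStore re-matches them to extract snapshot/segment names.
        pattern = cumulus.store.type_patterns[type]
        for f in os.listdir(self.path):
            if pattern.match(f):
                yield f

    def get(self, type, name):
        return open(self._get_path(type, name), 'rb')

    def put(self, type, name, fp):
        out = open(self._get_path(type, name), 'wb')
        shutil.copyfileobj(fp, out)
        out.close()

    def delete(self, type, name):
        os.unlink(self._get_path(type, name))

    def stat(self, type, name):
        # LowlevelDataStore.lowlevel_stat() expects a dictionary with 'size'.
        return {'size': os.stat(self._get_path(type, name)).st_size}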
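
To illustrate the uniform-access goal from the commit message, here is a short hypothetical usage sketch of the S3 backend. The bucket name and prefix are placeholders, and AWS credentials are assumed to come from boto's usual configuration (environment variables or config file). Note that LowlevelDataStore still hard-codes FileStore in this patch, so S3 access is only available by instantiating S3Store directly.

# Hypothetical usage -- "my-backup-bucket" and "host1" are placeholders.
import cumulus.store.s3

store = cumulus.store.s3.S3Store("my-backup-bucket", "host1")

# Everything is addressed by (type, name), so listing snapshots and segments
# looks the same for S3 as it would for a local FileStore.
snapshots = list(store.list('snapshots'))
print "snapshot descriptors:", snapshots

# get() returns a seekable file-like object (a temporary file for S3).
if snapshots:
    fp = store.get('snapshots', snapshots[0])
    print fp.read()

Because S3Store.get() spools the object into a TemporaryFile and rewinds it before returning, callers such as ObjectStore.get_segment() can treat the result just like the file object a local open() would return.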