1 """High-level interface for working with Cumulus archives.
3 This module provides an easy interface for reading from and manipulating
4 various parts of a Cumulus archive:
5 - listing the snapshots and segments present
6 - reading segment contents
7 - parsing snapshot descriptors and snapshot metadata logs
8 - reading and maintaining the local object database
11 from __future__ import division
12 import hashlib, os, re, shutil, tarfile, tempfile, thread
13 from pysqlite2 import dbapi2 as sqlite3
15 import cumulus.store, cumulus.store.file
17 # The largest supported snapshot format that can be understood.
18 FORMAT_VERSION = (0, 11) # Cumulus Snapshot v0.11
20 # Maximum number of nested indirect references allowed in a snapshot.
21 MAX_RECURSION_DEPTH = 3
23 # All segments which have been accessed this session.
24 accessed_segments = set()
26 # Table of methods used to filter segments before storage, and corresponding
27 # filename extensions. These are listed in priority order (methods earlier in
28 # the list are tried first).
30 (".gpg", "cumulus-filter-gpg --decrypt"),
32 (".bz2", "bzip2 -dc"),
36 """Decode a URI-encoded (%xx escapes) string."""
37 def hex_decode(m): return chr(int(m.group(1), 16))
38 return re.sub(r"%([0-9a-f]{2})", hex_decode, s)
40 """Encode a string to URI-encoded (%xx escapes) form."""
42 if c > '+' and c < '\x7f' and c != '@':
45 return "%%%02x" % (ord(c),)
46 return ''.join(hex_encode(c) for c in s)
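# Illustrative sketch (not part of the original source): with the elided
# branches of hex_encode filled in as expected, these helpers round-trip
# URI-style escapes, e.g.:
#     uri_encode("file name")  == "file%20name"
#     uri_decode("file%20name") == "file name"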
49 """A class which merely acts as a data container.
51 Instances of this class (or its subclasses) are used only to store data
52 in various attributes. No methods are provided.
56 return "<%s %s>" % (self.__class__, self.__dict__)
58 CHECKSUM_ALGORITHMS = {
60 'sha256': hashlib.sha256,
63 class ChecksumCreator:
64 """Compute a Cumulus checksum for provided data.
66 The algorithm used is selectable, but currently defaults to sha1.
69 def __init__(self, algorithm='sha1'):
70 self.algorithm = algorithm
71 self.hash = CHECKSUM_ALGORITHMS[algorithm]()
73 def update(self, data):
74 self.hash.update(data)
78 return "%s=%s" % (self.algorithm, self.hash.hexdigest())
80 class ChecksumVerifier:
81 """Verify whether a checksum from a snapshot matches the supplied data."""
83 def __init__(self, checksumstr):
84 """Create an object to check the supplied checksum."""
86 (algo, checksum) = checksumstr.split("=", 1)
87 self.checksum = checksum
88 self.hash = CHECKSUM_ALGORITHMS[algo]()
90 def update(self, data):
91 self.hash.update(data)
94 """Return a boolean indicating whether the checksum matches."""
96 result = self.hash.hexdigest()
97 return result == self.checksum
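# Hedged usage sketch (the ChecksumCreator method that formats the
# "algorithm=hexdigest" string is elided above; it is assumed here to be
# named compute()):
#     csum = ChecksumCreator("sha1")
#     csum.update("hello world")
#     s = csum.compute()            # e.g. "sha1=<40-character hexdigest>"
#     v = ChecksumVerifier(s)
#     v.update("hello world")
#     assert v.valid()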
99 class LowlevelDataStore:
100 """Access to the backup store containing segments and snapshot descriptors.
102 Instances of this class are used to get direct filesystem-level access to
103 the backup data. To read a backup, a caller will ordinarily not care about
104 direct access to backup segments, but will instead merely need to access
105 objects from those segments. The ObjectStore class provides a suitable
106 wrapper around a DataStore to give this high-level access.
109 def __init__(self, path):
110 if isinstance(path, cumulus.store.Store):
112 elif path.find(":") >= 0:
113 self.store = cumulus.store.open(path)
115 self.store = cumulus.store.file.FileStore(path)
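# Informal examples (the URL scheme below is hypothetical): any path
# containing a ":" is handed to cumulus.store.open, while a plain
# filesystem path uses FileStore directly:
#     LowlevelDataStore("/srv/backups")          # local filesystem store
#     LowlevelDataStore("scheme://host/path")    # dispatched via cumulus.store.open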
117 def _classify(self, filename):
118 for (t, r) in cumulus.store.type_patterns.items():
119 if r.match(filename):
121 return (None, filename)
126 def lowlevel_open(self, filename):
127 """Return a file-like object for reading data from the given file."""
129 (type, filename) = self._classify(filename)
130 return self.store.get(type, filename)
132 def lowlevel_stat(self, filename):
133 """Return a dictionary of information about the given file.
135 Currently, the only defined field is 'size', giving the size of the file, in bytes.
139 (type, filename) = self._classify(filename)
140 return self.store.stat(type, filename)
142 # Slightly higher-level list methods.
143 def list_snapshots(self):
144 for f in self.store.list('snapshots'):
145 m = cumulus.store.type_patterns['snapshots'].match(f)
146 if m: yield m.group(1)
148 def list_segments(self):
149 for f in self.store.list('segments'):
150 m = cumulus.store.type_patterns['segments'].match(f)
151 if m: yield m.group(1)
154 def __init__(self, data_store):
155 self.store = data_store
160 def get_cachedir(self):
161 if self.cachedir is None:
162 self.cachedir = tempfile.mkdtemp(".lbs")
166 if self.cachedir is not None:
167 # Clean up the temporary cache directory without shelling out to "rm -rf".
168 shutil.rmtree(self.cachedir, ignore_errors=True)
172 def parse_ref(refstr):
173 m = re.match(r"^zero\[(\d+)\]$", refstr)
175 return ("zero", None, None, (0, int(m.group(1))))
177 m = re.match(r"^([-0-9a-f]+)\/([0-9a-f]+)(\(\S+\))?(\[(((\d+)\+)?(\d+)|=(\d+))\])?$", refstr)
182 checksum = m.group(3)
185 if checksum is not None:
186 checksum = checksum.lstrip("(").rstrip(")")
188 if slice is not None:
189 if m.group(9) is not None:
190 # Size-assertion slice
191 slice = (0, int(m.group(9)), True)
192 elif m.group(6) is None:
194 slice = (0, int(m.group(8)), False)
196 slice = (int(m.group(7)), int(m.group(8)), False)
198 return (segment, object, checksum, slice)
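# Illustrative examples derived from the regexes above (the segment and
# object names are made up):
#     parse_ref("zero[4096]")
#         => ("zero", None, None, (0, 4096))
#     parse_ref("a1b2c3d4/000000f0")
#         => ("a1b2c3d4", "000000f0", None, None)
#     parse_ref("a1b2c3d4/000000f0(sha1=abcd)[100+200]")
#         => ("a1b2c3d4", "000000f0", "sha1=abcd", (100, 200, False))
#     parse_ref("a1b2c3d4/000000f0[=512]")
#         => ("a1b2c3d4", "000000f0", None, (0, 512, True))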
200 def get_segment(self, segment):
201 accessed_segments.add(segment)
203 for (extension, filter) in SEGMENT_FILTERS:
205 raw = self.store.lowlevel_open(segment + ".tar" + extension)
207 (input, output) = os.popen2(filter)
208 def copy_thread(src, dst):
211 block = src.read(BLOCK_SIZE)
212 if len(block) == 0: break
216 thread.start_new_thread(copy_thread, (raw, input))
221 raise cumulus.store.NotFoundError
223 def load_segment(self, segment):
224 seg = tarfile.open(segment, 'r|', self.get_segment(segment))
226 data_obj = seg.extractfile(item)
227 path = item.name.split('/')
228 if len(path) == 2 and path[0] == segment:
229 yield (path[1], data_obj.read())
231 def load_snapshot(self, snapshot):
232 file = self.store.lowlevel_open("snapshot-" + snapshot + ".lbs")
233 return file.read().splitlines(True)
235 def extract_segment(self, segment):
236 segdir = os.path.join(self.get_cachedir(), segment)
238 for (object, data) in self.load_segment(segment):
239 f = open(os.path.join(segdir, object), 'wb')
243 def load_object(self, segment, object):
244 accessed_segments.add(segment)
245 path = os.path.join(self.get_cachedir(), segment, object)
246 if not os.access(path, os.R_OK):
247 self.extract_segment(segment)
248 if segment in self.lru_list: self.lru_list.remove(segment)
249 self.lru_list.append(segment)
250 while len(self.lru_list) > self.CACHE_SIZE:
251 shutil.rmtree(os.path.join(self.cachedir, self.lru_list[0]), ignore_errors=True)
252 self.lru_list = self.lru_list[1:]
253 return open(path, 'rb').read()
255 def get(self, refstr):
256 """Fetch the given object and return it.
258 The input should be an object reference, in string form.
261 (segment, object, checksum, slice) = self.parse_ref(refstr)
263 if segment == "zero":
264 return "\0" * slice[1]
266 data = self.load_object(segment, object)
268 if checksum is not None:
269 verifier = ChecksumVerifier(checksum)
270 verifier.update(data)
271 if not verifier.valid():
274 if slice is not None:
275 (start, length, exact) = slice
276 if exact and len(data) != length: raise ValueError
277 data = data[start:start+length]
278 if len(data) != length: raise IndexError
282 def parse(lines, terminate=None):
283 """Generic parser for RFC822-style "Key: Value" data streams.
285 This parser can be used to read metadata logs and snapshot root descriptor files.
288 lines must be an iterable object which yields a sequence of lines of input.
290 If terminate is specified, it is used as a predicate to determine when to
291 stop reading input lines.
298 # Strip off a trailing newline, if present
299 if len(l) > 0 and l[-1] == "\n":
302 if terminate is not None and terminate(l):
303 if len(dict) > 0: yield dict
308 m = re.match(r"^([-\w]+):\s*(.*)$", l)
310 dict[m.group(1)] = m.group(2)
311 last_key = m.group(1)
312 elif len(l) > 0 and l[0].isspace() and last_key is not None:
317 if len(dict) > 0: yield dict
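# Rough illustration (not from the original source; it assumes the elided
# record-reset logic that runs after a terminator line):
#     lines = ["name: etc/passwd\n", "type: -\n", "\n", "name: etc/group\n"]
#     list(parse(lines, terminate=lambda l: len(l) == 0))
#         => [{"name": "etc/passwd", "type": "-"}, {"name": "etc/group"}]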
319 def parse_full(lines):
321 return parse(lines).next()
322 except StopIteration:
325 def parse_metadata_version(s):
326 """Convert a string with the snapshot version format to a tuple."""
328 m = re.match(r"^(?:Cumulus|LBS) Snapshot v(\d+(\.\d+)*)$", s)
332 return tuple([int(d) for d in m.group(1).split(".")])
334 def read_metadata(object_store, root):
335 """Iterate through all lines in the metadata log, following references."""
337 # Stack for keeping track of recursion when following references to
338 # portions of the log. The last entry in the stack corresponds to the
339 # object currently being parsed. Each entry is a list of lines which have
340 # been reversed, so that popping successive lines from the end of each list
341 # will return lines of the metadata log in order.
344 def follow_ref(refstr):
345 if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
346 lines = object_store.get(refstr).splitlines(True)
352 while len(stack) > 0:
359 # An indirect reference which we must follow?
360 if len(line) > 0 and line[0] == '@':
368 """Metadata for a single file (or directory or...) from a snapshot."""
370 # Functions for parsing various datatypes that can appear in a metadata log
374 """Decode an integer, expressed in decimal, octal, or hexadecimal."""
375 if s.startswith("0x"):
377 elif s.startswith("0"):
384 """Decode a URI-encoded (%xx escapes) string."""
389 """An unecoded string."""
394 """Decode a user/group to a tuple of uid/gid followed by name."""
396 uid = MetadataItem.decode_int(items[0])
399 if items[1].startswith("(") and items[1].endswith(")"):
400 name = MetadataItem.decode_str(items[1][1:-1])
404 def decode_device(s):
405 """Decode a device major/minor number."""
406 (major, minor) = map(MetadataItem.decode_int, s.split("/"))
407 return (major, minor)
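# Informal examples of the decoders above (the octal branch of decode_int
# and the return of decode_user are elided here; decode_int is assumed to
# parse base 8 and decode_user to return a (uid, name) pair):
#     MetadataItem.decode_int("0x1f")           => 31
#     MetadataItem.decode_int("0644")           => 420
#     MetadataItem.decode_device("8/1")         => (8, 1)
#     MetadataItem.decode_user("1000 (alice)")  => (1000, "alice")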
411 def __init__(self, fields, object_store):
412 """Initialize from a dictionary of key/value pairs from metadata log."""
415 self.object_store = object_store
417 self.items = self.Items()
418 for (k, v) in fields.items():
419 if k in self.field_types:
420 decoder = self.field_types[k]
421 setattr(self.items, k, decoder(v))
425 """Return an iterator for the data blocks that make up a file."""
427 # This traverses the list of blocks that make up a file, following
428 # indirect references. It is implemented in much the same way as
429 # read_metadata, so see that function for details of the technique.
431 objects = self.fields['data'].split()
435 def follow_ref(refstr):
436 if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
437 objects = self.object_store.get(refstr).split()
439 stack.append(objects)
441 while len(stack) > 0:
448 # An indirect reference which we must follow?
449 if len(ref) > 0 and ref[0] == '@':
454 # Description of fields that might appear, and how they should be parsed.
455 MetadataItem.field_types = {
456 'name': MetadataItem.decode_str,
457 'type': MetadataItem.raw_str,
458 'mode': MetadataItem.decode_int,
459 'device': MetadataItem.decode_device,
460 'user': MetadataItem.decode_user,
461 'group': MetadataItem.decode_user,
462 'ctime': MetadataItem.decode_int,
463 'mtime': MetadataItem.decode_int,
464 'links': MetadataItem.decode_int,
465 'inode': MetadataItem.raw_str,
466 'checksum': MetadataItem.decode_str,
467 'size': MetadataItem.decode_int,
468 'contents': MetadataItem.decode_str,
469 'target': MetadataItem.decode_str,
472 def iterate_metadata(object_store, root):
473 for d in parse(read_metadata(object_store, root), lambda l: len(l) == 0):
474 yield MetadataItem(d, object_store)
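# Hedged usage sketch: ObjectStore is the wrapper class referenced in the
# LowlevelDataStore docstring (its class statement is elided above); the
# snapshot name and the "Root" descriptor key are illustrative assumptions:
#     store = ObjectStore(LowlevelDataStore("/srv/backups"))
#     descriptor = parse_full(store.load_snapshot("scheme-20110101T120000"))
#     for entry in iterate_metadata(store, descriptor["Root"]):
#         print entry.items.name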
477 """Access to the local database of snapshot contents and object checksums.
479 The local database is consulted when creating a snapshot to determine what
480 data can be re-used from old snapshots. Segment cleaning is performed by
481 manipulating the data in the local database; the local database also
482 includes enough data to guide the segment cleaning process.
485 def __init__(self, path, dbname="localdb.sqlite"):
486 self.db_connection = sqlite3.connect(path + "/" + dbname)
488 # Low-level database access. Use these methods when there isn't a
489 # higher-level interface available. Exception: do, however, remember to
490 # use the commit() method after making changes to make sure they are
491 # actually saved, even when going through higher-level interfaces.
493 "Commit any pending changes to the local database."
494 self.db_connection.commit()
497 "Roll back any pending changes to the local database."
498 self.db_connection.rollback()
501 "Return a DB-API cursor for directly accessing the local database."
502 return self.db_connection.cursor()
504 def list_schemes(self):
505 """Return the list of snapshots found in the local database.
507 The returned value is a list of tuples (id, scheme, name, time, intent).
511 cur.execute("select distinct scheme from snapshots")
512 schemes = [row[0] for row in cur.fetchall()]
516 def garbage_collect(self, scheme, intent=1.0):
517 """Delete entries from old snapshots from the database.
519 Only snapshots with the specified scheme name will be deleted. If
520 intent is given, it gives the intended next snapshot type, to determine
521 how aggressively to clean (for example, intent=7 could be used if the
522 next snapshot will be a weekly snapshot).
527 # Find the id of the last snapshot to be created. This is used as a
528 # rough measure of time: we record this value in each segment we
529 # expire on this run, and on a future run we can tell whether any
530 # intervening backups have been made.
531 cur.execute("select max(snapshotid) from snapshots")
532 last_snapshotid = cur.fetchone()[0]
534 # Get the list of old snapshots for this scheme. Delete all the old
535 # ones. Rules for what to keep:
536 # - Always keep the most recent snapshot.
537 # - If snapshot X is younger than Y, and X has higher intent, then Y
538 #   can be deleted.
539 cur.execute("""select snapshotid, name, intent,
540 julianday('now') - timestamp as age
541 from snapshots where scheme = ?
542 order by age""", (scheme,))
546 for (id, name, snap_intent, snap_age) in cur.fetchall():
548 if snap_intent < max_intent:
549 # Delete small-intent snapshots if there is a more recent
550 # large-intent snapshot.
552 elif snap_intent == intent:
553 # Delete previous snapshots with the specified intent level.
556 if can_delete and not first:
557 print "Delete snapshot %d (%s)" % (id, name)
558 cur.execute("delete from snapshots where snapshotid = ?",
561 max_intent = max(max_intent, snap_intent)
563 # Delete entries in the segments_used table which are for non-existent
565 cur.execute("""delete from segments_used
566 where snapshotid not in
567 (select snapshotid from snapshots)""")
569 # Find segments which contain no objects used by any current snapshots,
570 # and delete them from the segment table.
571 cur.execute("""delete from segments where segmentid not in
572 (select segmentid from segments_used)""")
574 # Delete unused objects in the block_index table. By "unused", we mean
575 # any object which was stored in a segment which has been deleted, and
576 # any object in a segment which was marked for cleaning and has had
577 # cleaning performed already (the expired time is less than the current
578 # largest snapshot id).
579 cur.execute("""delete from block_index
580 where segmentid not in (select segmentid from segments)
581 or segmentid in (select segmentid from segments
582 where expire_time < ?)""",
585 # Remove sub-block signatures for deleted objects.
586 cur.execute("""delete from subblock_signatures
588 (select blockid from block_index)""")
591 class SegmentInfo(Struct): pass
593 def get_segment_cleaning_list(self, age_boost=0.0):
594 """Return a list of all current segments with information for cleaning.
596 Return all segments which are currently known in the local database
597 (there might be other, older segments in the archive itself), and
598 return usage statistics for each to help decide which segments to
601 The returned list will be sorted by estimated cleaning benefit, with
602 segments that are best to clean at the start of the list.
604 If specified, the age_boost parameter (measured in days) will be added to
605 the age of each segment, as a way of adjusting the benefit computation
606 before a long-lived snapshot is taken (for example, age_boost might be
607 set to 7 when cleaning prior to taking a weekly snapshot).
612 cur.execute("""select segmentid, used, size, mtime,
613 julianday('now') - mtime as age from segment_info
614 where expire_time is null""")
616 info = self.SegmentInfo()
618 info.used_bytes = row[1]
619 info.size_bytes = row[2]
621 info.age_days = row[4]
623 # If data is not available for whatever reason, treat it as 0.0.
624 if info.age_days is None:
626 if info.used_bytes is None:
627 info.used_bytes = 0.0
629 # Benefit calculation: u is the estimated fraction of each segment
630 # which is utilized (bytes belonging to objects still in use
631 # divided by total size; this doesn't take compression or storage
632 # overhead into account, but should give a reasonable estimate).
634 # The total benefit is a heuristic that combines several factors:
635 # the amount of space that can be reclaimed (1 - u), an ageing
636 # factor (info.age_days) that favors cleaning old segments to young
637 # ones and also is more likely to clean segments that will be
638 # rewritten for long-lived snapshots (age_boost), and finally a
639 # penalty factor for the cost of re-uploading data (u + 0.1).
640 u = info.used_bytes / info.size_bytes
641 info.cleaning_benefit \
642 = (1 - u) * (info.age_days + age_boost) / (u + 0.1)
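# Worked example (illustrative): a segment that is 25% utilized and 10
# days old has benefit (1 - 0.25) * 10 / (0.25 + 0.1) ~= 21.4, while a
# 90%-utilized segment of the same age scores only (1 - 0.9) * 10 /
# (0.9 + 0.1) = 1.0, so the emptier segment is cleaned first.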
644 segments.append(info)
646 segments.sort(key=lambda s: s.cleaning_benefit, reverse=True)
649 def mark_segment_expired(self, segment):
650 """Mark a segment for cleaning in the local database.
652 The segment parameter should be either a SegmentInfo object or an
653 integer segment id. Objects in the given segment will be marked as
654 expired, which means that any future snapshots that would re-use those
655 objects will instead write out a new copy of the object, and thus no
656 future snapshots will depend upon the given segment.
659 if isinstance(segment, int):
661 elif isinstance(segment, self.SegmentInfo):
664 raise TypeError("Invalid segment: %s, must be of type int or SegmentInfo, not %s" % (segment, type(segment)))
667 cur.execute("select max(snapshotid) from snapshots")
668 last_snapshotid = cur.fetchone()[0]
669 cur.execute("update segments set expire_time = ? where segmentid = ?",
670 (last_snapshotid, id))
671 cur.execute("update block_index set expired = 0 where segmentid = ?",
674 def balance_expired_objects(self):
675 """Analyze expired objects in segments to be cleaned and group by age.
677 Update the block_index table of the local database to group expired
678 objects by age. The exact number of buckets and the cutoffs for each
679 are dynamically determined. Calling this function after marking
680 segments expired will help in the segment cleaning process, by ensuring
681 that when active objects from clean segments are rewritten, they will
682 be placed into new segments roughly grouped by age.
685 # The expired column of the block_index table is used when generating a
686 # new Cumulus snapshot. A null value indicates that an object may be
687 # re-used. Otherwise, an object must be written into a new segment if
688 # needed. Objects with distinct expired values will be written into
689 # distinct segments, to allow for some grouping by age. The value 0 is
690 # somewhat special in that it indicates any rewritten objects can be
691 # placed in the same segment as completely new objects; this can be
692 # used for very young objects which have been expired, or objects not
693 # expected to be encountered.
695 # In the balancing process, all objects which are not used in any
696 # current snapshots will have expired set to 0. Objects which have
697 # been seen will be sorted by age and will have expired values set to
698 # 0, 1, 2, and so on based on age (with younger objects being assigned
699 # lower values). The number of buckets and the age cutoffs are
700 # determined by looking at the distribution of block ages.
704 # Mark all expired objects with expired = 0; these objects will later
705 # have values set to indicate groupings of objects when repacking.
706 cur.execute("""update block_index set expired = 0
707 where expired is not null""")
709 # We will want to aim for at least one full segment for each bucket
710 # that we eventually create, but don't know how many bytes that should
711 # be due to compression. So compute the average number of bytes in
712 # each expired segment as a rough estimate for the minimum size of each
713 # bucket. (This estimate could be thrown off by many not-fully-packed
714 # segments, but for now don't worry too much about that.) If we can't
715 # compute an average, it's probably because there are no expired
716 # segments, so we have no more work to do.
717 cur.execute("""select avg(size) from segments
719 (select distinct segmentid from block_index
720 where expired is not null)""")
721 segment_size_estimate = cur.fetchone()[0]
722 if not segment_size_estimate:
725 # Next, extract distribution of expired objects (number and size) by
726 # age. Save the timestamp for "now" so that the classification of
727 # blocks into age buckets will not change later in the function, after
728 # time has passed. Set any timestamps in the future to now, so we are
729 # guaranteed that for the rest of this function, age is always non-negative.
731 cur.execute("select julianday('now')")
732 now = cur.fetchone()[0]
734 cur.execute("""update block_index set timestamp = ?
735 where timestamp > ? and expired is not null""",
738 cur.execute("""select round(? - timestamp) as age, count(*), sum(size)
739 from block_index where expired = 0
740 group by age order by age""", (now,))
741 distribution = cur.fetchall()
743 # Start to determine the buckets for expired objects. Heuristics used:
744 # - An upper bound on the number of buckets is given by the number of
745 # segments we estimate it will take to store all data. In fact,
746 # aim for a couple of segments per bucket.
747 # - Place very young objects in bucket 0 (place with new objects)
748 # unless there are enough of them to warrant a separate bucket.
749 # - Try not to create unnecessarily many buckets, since fewer buckets
750 # will allow repacked data to be grouped based on spatial locality
751 # (while more buckets will group by temporal locality). We want a balance.
754 total_bytes = sum([i[2] for i in distribution])
755 target_buckets = 2 * (total_bytes / segment_size_estimate) ** 0.4
756 min_size = 1.5 * segment_size_estimate
757 target_size = max(2 * segment_size_estimate,
758 total_bytes / target_buckets)
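# Worked example (illustrative numbers): with a segment_size_estimate of
# 4 MiB and 400 MiB of expired data, target_buckets = 2 * 100 ** 0.4
# ~= 12.6, min_size = 6 MiB, and target_size = max(8 MiB, 400 MiB / 12.6)
# ~= 32 MiB per bucket.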
760 print "segment_size:", segment_size_estimate
761 print "distribution:", distribution
762 print "total_bytes:", total_bytes
763 print "target_buckets:", target_buckets
764 print "min, target size:", min_size, target_size
766 # Chosen cutoffs. Each bucket consists of objects with age greater
767 # than one cutoff value, but not greater than the next largest cutoff.
770 # Starting with the oldest objects, begin grouping together into
771 # buckets of size at least target_size bytes.
772 distribution.reverse()
774 min_age_bucket = False
775 for (age, items, size) in distribution:
776 if bucket_size >= target_size \
777 or (age < MIN_AGE and not min_age_bucket):
778 if bucket_size < target_size and len(cutoffs) > 0:
785 min_age_bucket = True
787 # The last (youngest) bucket will be group 0, unless it has enough data
788 # to be of size min_size by itself, or there happen to be no objects
789 # less than MIN_AGE at all.
790 if bucket_size >= min_size or not min_age_bucket:
794 print "cutoffs:", cutoffs
796 # Update the database to assign each object to the appropriate bucket.
798 for i in range(len(cutoffs)):
799 cur.execute("""update block_index set expired = ?
800 where round(? - timestamp) > ?
801 and expired is not null""",
802 (i, now, cutoffs[i]))