1 """High-level interface for working with Cumulus archives.
3 This module provides an easy interface for reading from and manipulating
4 various parts of a Cumulus archive:
5 - listing the snapshots and segments present
6 - reading segment contents
7 - parsing snapshot descriptors and snapshot metadata logs
8 - reading and maintaining the local object database
11 from __future__ import division
12 import hashlib, os, re, shutil, tarfile, tempfile, thread
13 from pysqlite2 import dbapi2 as sqlite3
15 import cumulus.store, cumulus.store.file
17 # The newest snapshot format version that this module can understand.
18 FORMAT_VERSION = (0, 11) # Cumulus Snapshot v0.11
20 # Maximum number of nested indirect references allowed in a snapshot.
21 MAX_RECURSION_DEPTH = 3
23 # All segments which have been accessed this session.
24 accessed_segments = set()
26 # Table of methods used to filter segments before storage, and corresponding
27 # filename extensions. These are listed in priority order (methods earlier in
28 # the list are tried first).
30 (".gpg", "cumulus-filter-gpg --decrypt"),
32 (".bz2", "bzip2 -dc"),
36 """Decode a URI-encoded (%xx escapes) string."""
37 def hex_decode(m): return chr(int(m.group(1), 16))
38 return re.sub(r"%([0-9a-f]{2})", hex_decode, s)
40 """Encode a string to URI-encoded (%xx escapes) form."""
42 if c > '+' and c < '\x7f' and c != '@':
45 return "%%%02x" % (ord(c),)
46 return ''.join(hex_encode(c) for c in s)
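# A quick sketch of the escaping behaviour (assuming the elided branch of
# hex_encode passes characters in the safe range through unchanged):
#     uri_encode("file name@1")      -> "file%20name%401"
#     uri_decode("file%20name%401")  -> "file name@1"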
49 """A class which merely acts as a data container.
51 Instances of this class (or its subclasses) are used only to store data
52 in various attributes. No methods are provided.
56 return "<%s %s>" % (self.__class__, self.__dict__)
58 CHECKSUM_ALGORITHMS = {
60 'sha224': hashlib.sha224,
61 'sha256': hashlib.sha256,
64 class ChecksumCreator:
65 """Compute a Cumulus checksum for provided data.
67 The algorithm used is selectable, but currently defaults to sha1.
70 def __init__(self, algorithm='sha1'):
71 self.algorithm = algorithm
72 self.hash = CHECKSUM_ALGORITHMS[algorithm]()
74 def update(self, data):
75 self.hash.update(data)
79 return "%s=%s" % (self.algorithm, self.hash.hexdigest())
81 class ChecksumVerifier:
82 """Verify whether a checksum from a snapshot matches the supplied data."""
84 def __init__(self, checksumstr):
85 """Create an object to check the supplied checksum."""
87 (algo, checksum) = checksumstr.split("=", 1)
88 self.checksum = checksum
89 self.hash = CHECKSUM_ALGORITHMS[algo]()
91 def update(self, data):
92 self.hash.update(data)
95 """Return a boolean indicating whether the checksum matches."""
97 result = self.hash.hexdigest()
98 return result == self.checksum
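# How the two checksum classes fit together: ChecksumCreator produces a string
# of the form "algorithm=hexdigest" (see the "%s=%s" format above), and
# ChecksumVerifier parses that same form to re-check data, e.g.
#     v = ChecksumVerifier("sha256=<64 hex digits>")   # placeholder digest
#     v.update(data)
#     v.valid()    # True only if the digests match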
100 class LowlevelDataStore:
101 """Access to the backup store containing segments and snapshot descriptors.
103 Instances of this class are used to get direct filesystem-level access to
104 the backup data. To read a backup, a caller will ordinarily not care about
105 direct access to backup segments, but will instead merely need to access
106 objects from those segments. The ObjectStore class provides a suitable
107 wrapper around a LowlevelDataStore to give this high-level access.
110 def __init__(self, path):
111 if isinstance(path, cumulus.store.Store):
113 elif path.find(":") >= 0:
114 self.store = cumulus.store.open(path)
116 self.store = cumulus.store.file.FileStore(path)
118 def _classify(self, filename):
119 for (t, r) in cumulus.store.type_patterns.items():
120 if r.match(filename):
122 return (None, filename)
127 def lowlevel_open(self, filename):
128 """Return a file-like object for reading data from the given file."""
130 (type, filename) = self._classify(filename)
131 return self.store.get(type, filename)
133 def lowlevel_stat(self, filename):
134 """Return a dictionary of information about the given file.
136 Currently, the only defined field is 'size', giving the size of the
140 (type, filename) = self._classify(filename)
141 return self.store.stat(type, filename)
143 # Slightly higher-level list methods.
144 def list_snapshots(self):
145 for f in self.store.list('snapshots'):
146 m = cumulus.store.type_patterns['snapshots'].match(f)
147 if m: yield m.group(1)
149 def list_segments(self):
150 for f in self.store.list('segments'):
151 m = cumulus.store.type_patterns['segments'].match(f)
152 if m: yield m.group(1)
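# Minimal usage sketch (the "/backup" path is only an example of a local file
# store; a "scheme:..." URL would go through cumulus.store.open instead):
#     store = LowlevelDataStore("/backup")
#     for s in store.list_snapshots(): print s
#     for s in store.list_segments(): print s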
155 def __init__(self, data_store):
156 self.store = data_store
161 def get_cachedir(self):
162 if self.cachedir is None:
163 self.cachedir = tempfile.mkdtemp(".lbs")
167 if self.cachedir is not None:
168 # Remove the cache directory tree; shutil.rmtree avoids shelling out.
169 shutil.rmtree(self.cachedir)
173 def parse_ref(refstr):
174 m = re.match(r"^zero\[(\d+)\]$", refstr)
176 return ("zero", None, None, (0, int(m.group(1))))
178 m = re.match(r"^([-0-9a-f]+)\/([0-9a-f]+)(\(\S+\))?(\[(((\d+)\+)?(\d+)|=(\d+))\])?$", refstr)
183 checksum = m.group(3)
186 if checksum is not None:
187 checksum = checksum.lstrip("(").rstrip(")")
189 if slice is not None:
190 if m.group(9) is not None:
191 # Size-assertion slice
192 slice = (0, int(m.group(9)), True)
193 elif m.group(6) is None:
195 slice = (0, int(m.group(8)), False)
197 slice = (int(m.group(7)), int(m.group(8)), False)
199 return (segment, object, checksum, slice)
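# A few illustrative reference strings and the values they parse to (the
# segment and object names here are made up for the example):
#     "zero[1024]"                          -> ("zero", None, None, (0, 1024))
#     "00000123/0000ab"                     -> ("00000123", "0000ab", None, None)
#     "00000123/0000ab(sha1=<digest>)[=64]" -> checksum "sha1=<digest>", slice (0, 64, True)
#     "00000123/0000ab[16+32]"              -> slice (16, 32, False)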
201 def get_segment(self, segment):
202 accessed_segments.add(segment)
204 for (extension, filter) in SEGMENT_FILTERS:
206 raw = self.store.lowlevel_open(segment + ".tar" + extension)
208 (input, output) = os.popen2(filter)
209 def copy_thread(src, dst):
212 block = src.read(BLOCK_SIZE)
213 if len(block) == 0: break
217 thread.start_new_thread(copy_thread, (raw, input))
222 raise cumulus.store.NotFoundError
224 def load_segment(self, segment):
225 seg = tarfile.open(segment, 'r|', self.get_segment(segment))
227 data_obj = seg.extractfile(item)
228 path = item.name.split('/')
229 if len(path) == 2 and path[0] == segment:
230 yield (path[1], data_obj.read())
232 def load_snapshot(self, snapshot):
233 file = self.store.lowlevel_open("snapshot-" + snapshot + ".lbs")
234 return file.read().splitlines(True)
236 def extract_segment(self, segment):
237 segdir = os.path.join(self.get_cachedir(), segment)
239 for (object, data) in self.load_segment(segment):
240 f = open(os.path.join(segdir, object), 'wb')
244 def load_object(self, segment, object):
245 accessed_segments.add(segment)
246 path = os.path.join(self.get_cachedir(), segment, object)
247 if not os.access(path, os.R_OK):
248 self.extract_segment(segment)
249 if segment in self.lru_list: self.lru_list.remove(segment)
250 self.lru_list.append(segment)
251 while len(self.lru_list) > self.CACHE_SIZE:
252 shutil.rmtree(os.path.join(self.cachedir, self.lru_list[0]))
253 self.lru_list = self.lru_list[1:]
254 return open(path, 'rb').read()
256 def get(self, refstr):
257 """Fetch the given object and return it.
259 The input should be an object reference, in string form.
262 (segment, object, checksum, slice) = self.parse_ref(refstr)
264 if segment == "zero":
265 return "\0" * slice[1]
267 data = self.load_object(segment, object)
269 if checksum is not None:
270 verifier = ChecksumVerifier(checksum)
271 verifier.update(data)
272 if not verifier.valid():
275 if slice is not None:
276 (start, length, exact) = slice
277 if exact and len(data) != length: raise ValueError
278 data = data[start:start+length]
279 if len(data) != length: raise IndexError
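# Sketch of get() semantics, assuming this class is the ObjectStore referred to
# in the LowlevelDataStore docstring above:
#     store.get("zero[16]")   # 16 "\0" bytes, no segment access needed
#     A reference of the form "segment/object[8+4]" is checksum-verified (if a
#     checksum is embedded in it) and then sliced to bytes 8..11 of the object.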
283 def parse(lines, terminate=None):
284 """Generic parser for RFC822-style "Key: Value" data streams.
286 This parser can be used to read metadata logs and snapshot root descriptor
289 lines must be an iterable that yields successive lines of input.
291 If terminate is specified, it is used as a predicate to determine when to
292 stop reading input lines.
299 # Strip off a trailing newline, if present
300 if len(l) > 0 and l[-1] == "\n":
303 if terminate is not None and terminate(l):
304 if len(dict) > 0: yield dict
309 m = re.match(r"^([-\w]+):\s*(.*)$", l)
311 dict[m.group(1)] = m.group(2)
312 last_key = m.group(1)
313 elif len(l) > 0 and l[0].isspace() and last_key is not None:
318 if len(dict) > 0: yield dict
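# Example of the stanza format handled by parse(): input such as
#     name: /etc/hosts
#     type: -
#     (a blank line separates stanzas)
#     name: /etc/passwd
# yields one dictionary per stanza, e.g. {"name": "/etc/hosts", "type": "-"},
# when terminate is a blank-line predicate such as (lambda l: len(l) == 0).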
320 def parse_full(lines):
322 return parse(lines).next()
323 except StopIteration:
326 def parse_metadata_version(s):
327 """Convert a string with the snapshot version format to a tuple."""
329 m = re.match(r"^(?:Cumulus|LBS) Snapshot v(\d+(\.\d+)*)$", s)
333 return tuple([int(d) for d in m.group(1).split(".")])
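# Concrete values accepted by the pattern above:
#     parse_metadata_version("Cumulus Snapshot v0.11")  -> (0, 11)
#     parse_metadata_version("LBS Snapshot v0.8")       -> (0, 8)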
335 def read_metadata(object_store, root):
336 """Iterate through all lines in the metadata log, following references."""
338 # Stack for keeping track of recursion when following references to
339 # portions of the log. The last entry in the stack corresponds to the
340 # object currently being parsed. Each entry is a list of lines which have
341 # been reversed, so that popping successive lines from the end of each list
342 # will return lines of the metadata log in order.
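# For example, a metadata log line consisting of "@" followed by an object
# reference (the reference itself is illustrative here) marks an indirect
# reference: the named object is fetched and its lines are spliced into the
# stream at that point, up to MAX_RECURSION_DEPTH levels deep.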
345 def follow_ref(refstr):
346 if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
347 lines = object_store.get(refstr).splitlines(True)
353 while len(stack) > 0:
360 # An indirect reference which we must follow?
361 if len(line) > 0 and line[0] == '@':
369 """Metadata for a single file (or directory or...) from a snapshot."""
371 # Functions for parsing various datatypes that can appear in a metadata log
375 """Decode an integer, expressed in decimal, octal, or hexadecimal."""
376 if s.startswith("0x"):
378 elif s.startswith("0"):
385 """Decode a URI-encoded (%xx escapes) string."""
390 """An unecoded string."""
395 """Decode a user/group to a tuple of uid/gid followed by name."""
397 uid = MetadataItem.decode_int(items[0])
400 if items[1].startswith("(") and items[1].endswith(")"):
401 name = MetadataItem.decode_str(items[1][1:-1])
405 def decode_device(s):
406 """Decode a device major/minor number."""
407 (major, minor) = map(MetadataItem.decode_int, s.split("/"))
408 return (major, minor)
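# A few concrete decodings (the leading-zero branch of decode_int is assumed to
# parse octal, as its docstring indicates):
#     decode_int("0x1f")   -> 31        decode_int("0755") -> 493
#     decode_device("8/1") -> (8, 1)
#     decode_user("1000 (alice)") -> (1000, "alice"), per its docstring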
412 def __init__(self, fields, object_store):
413 """Initialize from a dictionary of key/value pairs from metadata log."""
416 self.object_store = object_store
418 self.items = self.Items()
419 for (k, v) in fields.items():
420 if k in self.field_types:
421 decoder = self.field_types[k]
422 setattr(self.items, k, decoder(v))
426 """Return an iterator for the data blocks that make up a file."""
428 # This traverses the list of blocks that make up a file, following
429 # indirect references. It is implemented in much the same way as
430 # read_metadata, so see that function for details of the technique.
432 objects = self.fields['data'].split()
436 def follow_ref(refstr):
437 if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
438 objects = self.object_store.get(refstr).split()
440 stack.append(objects)
442 while len(stack) > 0:
449 # An indirect reference which we must follow?
450 if len(ref) > 0 and ref[0] == '@':
455 # Description of fields that might appear, and how they should be parsed.
456 MetadataItem.field_types = {
457 'name': MetadataItem.decode_str,
458 'type': MetadataItem.raw_str,
459 'mode': MetadataItem.decode_int,
460 'device': MetadataItem.decode_device,
461 'user': MetadataItem.decode_user,
462 'group': MetadataItem.decode_user,
463 'ctime': MetadataItem.decode_int,
464 'mtime': MetadataItem.decode_int,
465 'links': MetadataItem.decode_int,
466 'inode': MetadataItem.raw_str,
467 'checksum': MetadataItem.decode_str,
468 'size': MetadataItem.decode_int,
469 'contents': MetadataItem.decode_str,
470 'target': MetadataItem.decode_str,
473 def iterate_metadata(object_store, root):
474 for d in parse(read_metadata(object_store, root), lambda l: len(l) == 0):
475 yield MetadataItem(d, object_store)
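# Putting the pieces together, a caller can walk every file recorded in a
# snapshot roughly like this (a sketch; root is the metadata root reference
# taken from the snapshot descriptor, and the attribute names follow the
# field_types table above):
#     for item in iterate_metadata(object_store, root):
#         print item.items.name, item.items.type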
478 """Access to the local database of snapshot contents and object checksums.
480 The local database is consulted when creating a snapshot to determine what
481 data can be re-used from old snapshots. Segment cleaning is performed by
482 manipulating the data in the local database; the local database also
483 includes enough data to guide the segment cleaning process.
486 def __init__(self, path, dbname="localdb.sqlite"):
487 self.db_connection = sqlite3.connect(path + "/" + dbname)
489 # Low-level database access. Use these methods when there isn't a
490 # higher-level interface available. Exception: do, however, remember to
491 # use the commit() method after making changes to make sure they are
492 # actually saved, even when going through higher-level interfaces.
494 "Commit any pending changes to the local database."
495 self.db_connection.commit()
498 "Roll back any pending changes to the local database."
499 self.db_connection.rollback()
502 "Return a DB-API cursor for directly accessing the local database."
503 return self.db_connection.cursor()
505 def list_schemes(self):
506 """Return the list of snapshots found in the local database.
508 The returned value is a list of tuples (id, scheme, name, time, intent).
512 cur.execute("select distinct scheme from snapshots")
513 schemes = [row[0] for row in cur.fetchall()]
517 def garbage_collect(self, scheme, intent=1.0):
518 """Delete entries from old snapshots from the database.
520 Only snapshots with the specified scheme name will be deleted. If
521 intent is given, it specifies the intended type of the next snapshot, used to determine
522 how aggressively to clean (for example, intent=7 could be used if the
523 next snapshot will be a weekly snapshot).
528 # Find the id of the last snapshot to be created. This is used for
529 # measuring time coarsely: we record this value in each segment we
530 # expire on this run, so that a future run can tell whether any
531 # intervening backups have been made.
532 cur.execute("select max(snapshotid) from snapshots")
533 last_snapshotid = cur.fetchone()[0]
535 # Get the list of old snapshots for this scheme. Delete all the old
536 # ones. Rules for what to keep:
537 # - Always keep the most recent snapshot.
538 # - If snapshot X is younger than Y, and X has higher intent, then Y
540 cur.execute("""select snapshotid, name, intent,
541 julianday('now') - timestamp as age
542 from snapshots where scheme = ?
543 order by age""", (scheme,))
547 for (id, name, snap_intent, snap_age) in cur.fetchall():
549 if snap_intent < max_intent:
550 # Delete small-intent snapshots if there is a more recent
551 # large-intent snapshot.
553 elif snap_intent == intent:
554 # Delete previous snapshots with the specified intent level.
557 if can_delete and not first:
558 print "Delete snapshot %d (%s)" % (id, name)
559 cur.execute("delete from snapshots where snapshotid = ?",
562 max_intent = max(max_intent, snap_intent)
564 # Delete entries in the segments_used table which are for non-existent
566 cur.execute("""delete from segments_used
567 where snapshotid not in
568 (select snapshotid from snapshots)""")
570 # Find segments which contain no objects used by any current snapshots,
571 # and delete them from the segment table.
572 cur.execute("""delete from segments where segmentid not in
573 (select segmentid from segments_used)""")
575 # Delete unused objects in the block_index table. By "unused", we mean
576 # any object which was stored in a segment which has been deleted, and
577 # any object in a segment which was marked for cleaning and has had
578 # cleaning performed already (the expired time is less than the current
579 # largest snapshot id).
580 cur.execute("""delete from block_index
581 where segmentid not in (select segmentid from segments)
582 or segmentid in (select segmentid from segments
583 where expire_time < ?)""",
586 # Remove sub-block signatures for deleted objects.
587 cur.execute("""delete from subblock_signatures
589 (select blockid from block_index)""")
592 class SegmentInfo(Struct): pass
594 def get_segment_cleaning_list(self, age_boost=0.0):
595 """Return a list of all current segments with information for cleaning.
597 Return all segments which are currently known in the local database
598 (there might be other, older segments in the archive itself), and
599 return usage statistics for each to help decide which segments to
602 The returned list will be sorted by estimated cleaning benefit, with
603 segments that are best to clean at the start of the list.
605 If specified, the age_boost parameter (measured in days) will be added to
606 the age of each segment, as a way of adjusting the benefit computation
607 before a long-lived snapshot is taken (for example, age_boost might be
608 set to 7 when cleaning prior to taking a weekly snapshot).
613 cur.execute("""select segmentid, used, size, mtime,
614 julianday('now') - mtime as age from segment_info
615 where expire_time is null""")
617 info = self.SegmentInfo()
619 info.used_bytes = row[1]
620 info.size_bytes = row[2]
622 info.age_days = row[4]
624 # If data is not available for whatever reason, treat it as 0.0.
625 if info.age_days is None:
627 if info.used_bytes is None:
628 info.used_bytes = 0.0
630 # Benefit calculation: u is the estimated fraction of each segment
631 # which is utilized (bytes belonging to objects still in use
632 # divided by total size; this doesn't take compression or storage
633 # overhead into account, but should give a reasonable estimate).
635 # The total benefit is a heuristic that combines several factors:
636 # the amount of space that can be reclaimed (1 - u), an ageing
637 # factor (info.age_days) that favors cleaning old segments to young
638 # ones and also is more likely to clean segments that will be
639 # rewritten for long-lived snapshots (age_boost), and finally a
640 # penalty factor for the cost of re-uploading data (u + 0.1).
641 u = info.used_bytes / info.size_bytes
642 info.cleaning_benefit \
643 = (1 - u) * (info.age_days + age_boost) / (u + 0.1)
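# Worked example with illustrative numbers: a segment that is 25% used
# (u = 0.25) and 10 days old scores (1 - 0.25) * 10 / 0.35, roughly 21.4,
# while a 90%-used, 2-day-old segment scores 0.1 * 2 / 1.0 = 0.2, so the
# former sorts ahead of the latter for cleaning.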
645 segments.append(info)
647 segments.sort(key=lambda s: s.cleaning_benefit, reverse=True)
650 def mark_segment_expired(self, segment):
651 """Mark a segment for cleaning in the local database.
653 The segment parameter should be either a SegmentInfo object or an
654 integer segment id. Objects in the given segment will be marked as
655 expired, which means that any future snapshots that would re-use those
656 objects will instead write out a new copy of the object, and thus no
657 future snapshots will depend upon the given segment.
660 if isinstance(segment, int):
662 elif isinstance(segment, self.SegmentInfo):
665 raise TypeError("Invalid segment: %s, must be of type int or SegmentInfo, not %s" % (segment, type(segment)))
668 cur.execute("select max(snapshotid) from snapshots")
669 last_snapshotid = cur.fetchone()[0]
670 cur.execute("update segments set expire_time = ? where segmentid = ?",
671 (last_snapshotid, id))
672 cur.execute("update block_index set expired = 0 where segmentid = ?",
675 def balance_expired_objects(self):
676 """Analyze expired objects in segments to be cleaned and group by age.
678 Update the block_index table of the local database to group expired
679 objects by age. The exact number of buckets and the cutoffs for each
680 are dynamically determined. Calling this function after marking
681 segments expired will help in the segment cleaning process, by ensuring
682 that when active objects from clean segments are rewritten, they will
683 be placed into new segments roughly grouped by age.
686 # The expired column of the block_index table is used when generating a
687 # new Cumulus snapshot. A null value indicates that an object may be
688 # re-used. Otherwise, an object must be written into a new segment if
689 # needed. Objects with distinct expired values will be written into
690 # distinct segments, to allow for some grouping by age. The value 0 is
691 # somewhat special in that it indicates any rewritten objects can be
692 # placed in the same segment as completely new objects; this can be
693 # used for very young objects which have been expired, or objects not
694 # expected to be encountered.
696 # In the balancing process, all objects which are not used in any
697 # current snapshots will have expired set to 0. Objects which have
698 # been seen will be sorted by age and will have expired values set to
699 # 0, 1, 2, and so on based on age (with younger objects being assigned
700 # lower values). The number of buckets and the age cutoffs is
701 # determined by looking at the distribution of block ages.
705 # Mark all expired objects with expired = 0; these objects will later
706 # have values set to indicate groupings of objects when repacking.
707 cur.execute("""update block_index set expired = 0
708 where expired is not null""")
710 # We will want to aim for at least one full segment for each bucket
711 # that we eventually create, but don't know how many bytes that should
712 # be due to compression. So compute the average number of bytes in
713 # each expired segment as a rough estimate for the minimum size of each
714 # bucket. (This estimate could be thrown off by many not-fully-packed
715 # segments, but for now don't worry too much about that.) If we can't
716 # compute an average, it's probably because there are no expired
717 # segments, so we have no more work to do.
718 cur.execute("""select avg(size) from segments
720 (select distinct segmentid from block_index
721 where expired is not null)""")
722 segment_size_estimate = cur.fetchone()[0]
723 if not segment_size_estimate:
726 # Next, extract distribution of expired objects (number and size) by
727 # age. Save the timestamp for "now" so that the classification of
728 # blocks into age buckets will not change later in the function, after
729 # time has passed. Set any timestamps in the future to now, so we are
730 # guaranteed that for the rest of this function, age is always
732 cur.execute("select julianday('now')")
733 now = cur.fetchone()[0]
735 cur.execute("""update block_index set timestamp = ?
736 where timestamp > ? and expired is not null""",
739 cur.execute("""select round(? - timestamp) as age, count(*), sum(size)
740 from block_index where expired = 0
741 group by age order by age""", (now,))
742 distribution = cur.fetchall()
744 # Start to determine the buckets for expired objects. Heuristics used:
745 # - An upper bound on the number of buckets is given by the number of
746 # segments we estimate it will take to store all data. In fact,
747 # aim for a couple of segments per bucket.
748 # - Place very young objects in bucket 0 (place with new objects)
749 # unless there are enough of them to warrant a separate bucket.
750 # - Try not to create unnecessarily many buckets, since fewer buckets
751 # will allow repacked data to be grouped based on spatial locality
752 # (while more buckets will group by temporal locality). We want a
755 total_bytes = sum([i[2] for i in distribution])
756 target_buckets = 2 * (total_bytes / segment_size_estimate) ** 0.4
757 min_size = 1.5 * segment_size_estimate
758 target_size = max(2 * segment_size_estimate,
759 total_bytes / target_buckets)
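# Worked example with illustrative numbers: if expired data amounts to about
# 100 average-sized segments, then target_buckets = 2 * 100 ** 0.4 (about
# 12.6), target_size works out to roughly 7.9 segments' worth of data per
# bucket, and min_size is 1.5 segments' worth.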
761 print "segment_size:", segment_size_estimate
762 print "distribution:", distribution
763 print "total_bytes:", total_bytes
764 print "target_buckets:", target_buckets
765 print "min, target size:", min_size, target_size
767 # Chosen cutoffs. Each bucket consists of objects with age greater
768 # than one cutoff value, but not greater than the next largest cutoff.
771 # Starting with the oldest objects, begin grouping together into
772 # buckets of size at least target_size bytes.
773 distribution.reverse()
775 min_age_bucket = False
776 for (age, items, size) in distribution:
777 if bucket_size >= target_size \
778 or (age < MIN_AGE and not min_age_bucket):
779 if bucket_size < target_size and len(cutoffs) > 0:
786 min_age_bucket = True
788 # The last (youngest) bucket will be group 0, unless it has enough data
789 # to be of size min_size by itself, or there happen to be no objects
790 # less than MIN_AGE at all.
791 if bucket_size >= min_size or not min_age_bucket:
795 print "cutoffs:", cutoffs
797 # Update the database to assign each object to the appropriate bucket.
799 for i in range(len(cutoffs)):
800 cur.execute("""update block_index set expired = ?
801 where round(? - timestamp) > ?
802 and expired is not null""",
803 (i, now, cutoffs[i]))