1 """High-level interface for working with LBS archives.
3 This module provides an easy interface for reading from and manipulating
4 various parts of an LBS archive:
5 - listing the snapshots and segments present
6 - reading segment contents
7 - parsing snapshot descriptors and snapshot metadata logs
8 - reading and maintaining the local object database
11 from __future__ import division
12 import os, re, sha, tarfile, tempfile, thread
13 from pysqlite2 import dbapi2 as sqlite3
15 import cumulus.store, cumulus.store.file
17 # The largest supported snapshot format that can be understood.
18 FORMAT_VERSION = (0, 8) # LBS Snapshot v0.8
20 # Maximum number of nested indirect references allowed in a snapshot.
21 MAX_RECURSION_DEPTH = 3
23 # All segments which have been accessed this session.
24 accessed_segments = set()
26 # Table of methods used to filter segments before storage, and corresponding
27 # filename extensions. These are listed in priority order (methods earlier in
28 # the list are tried first).
30 (".gpg", "cumulus-filter-gpg --decrypt"),
32 (".bz2", "bzip2 -dc"),
36 """Decode a URI-encoded (%xx escapes) string."""
37 def hex_decode(m): return chr(int(m.group(1), 16))
38 return re.sub(r"%([0-9a-f]{2})", hex_decode, s)
40 """Encode a string to URI-encoded (%xx escapes) form."""
42 if c > '+' and c < '\x7f' and c != '@':
45 return "%%%02x" % (ord(c),)
46 return ''.join(hex_encode(c) for c in s)
49 """A class which merely acts as a data container.
51 Instances of this class (or its subclasses) are merely used to store data
52 in various attributes. No methods are provided.
56 return "<%s %s>" % (self.__class__, self.__dict__)
58 CHECKSUM_ALGORITHMS = {
62 class ChecksumCreator:
63 """Compute an LBS checksum for provided data.
65 The algorithm used is selectable, but currently defaults to sha1.
68 def __init__(self, algorithm='sha1'):
69 self.algorithm = algorithm
70 self.hash = CHECKSUM_ALGORITHMS[algorithm]()
72 def update(self, data):
73 self.hash.update(data)
77 return "%s=%s" % (self.algorithm, self.hash.hexdigest())
79 class ChecksumVerifier:
80 """Verify whether a checksum from a snapshot matches the supplied data."""
82 def __init__(self, checksumstr):
83 """Create an object to check the supplied checksum."""
85 (algo, checksum) = checksumstr.split("=", 1)
86 self.checksum = checksum
87 self.hash = CHECKSUM_ALGORITHMS[algo]()
89 def update(self, data):
90 self.hash.update(data)
93 """Return a boolean indicating whether the checksum matches."""
95 result = self.hash.hexdigest()
96 return result == self.checksum
98 class LowlevelDataStore:
99 """Access to the backup store containing segments and snapshot descriptors.
101 Instances of this class are used to get direct filesystem-level access to
102 the backup data. To read a backup, a caller will ordinarily not care about
103 direct access to backup segments, but will instead merely need to access
104 objects from those segments. The ObjectStore class provides a suitable
105 wrapper around a DataStore to give this high-level access.
108 def __init__(self, path):
109 if isinstance(path, cumulus.store.Store):
111 elif path.find(":") >= 0:
112 self.store = cumulus.store.open(path)
114 self.store = cumulus.store.file.FileStore(path)
116 def _classify(self, filename):
117 for (t, r) in cumulus.store.type_patterns.items():
118 if r.match(filename):
120 return (None, filename)
122 def lowlevel_open(self, filename):
123 """Return a file-like object for reading data from the given file."""
125 (type, filename) = self._classify(filename)
126 return self.store.get(type, filename)
128 def lowlevel_stat(self, filename):
129 """Return a dictionary of information about the given file.
131 Currently, the only defined field is 'size', giving the size of the
135 (type, filename) = self._classify(filename)
136 return self.store.stat(type, filename)
138 # Slightly higher-level list methods.
139 def list_snapshots(self):
140 for f in self.store.list('snapshots'):
141 m = cumulus.store.type_patterns['snapshots'].match(f)
142 if m: yield m.group(1)
144 def list_segments(self):
145 for f in self.store.list('segments'):
146 m = cumulus.store.type_patterns['segments'].match(f)
147 if m: yield m.group(1)
150 def __init__(self, data_store):
151 self.store = data_store
156 def get_cachedir(self):
157 if self.cachedir is None:
158 self.cachedir = tempfile.mkdtemp(".lbs")
162 if self.cachedir is not None:
163 # TODO: Avoid use of system, make this safer
164 os.system("rm -rf " + self.cachedir)
168 def parse_ref(refstr):
169 m = re.match(r"^zero\[(\d+)\]$", refstr)
171 return ("zero", None, None, (0, int(m.group(1))))
173 m = re.match(r"^([-0-9a-f]+)\/([0-9a-f]+)(\(\S+\))?(\[(((\d+)\+)?(\d+)|=(\d+))\])?$", refstr)
178 checksum = m.group(3)
181 if checksum is not None:
182 checksum = checksum.lstrip("(").rstrip(")")
184 if slice is not None:
185 if m.group(9) is not None:
186 # Size-assertion slice
187 slice = (0, int(m.group(9)), True)
188 elif m.group(6) is None:
190 slice = (0, int(m.group(8)), False)
192 slice = (int(m.group(7)), int(m.group(8)), False)
194 return (segment, object, checksum, slice)
196 def get_segment(self, segment):
197 accessed_segments.add(segment)
199 for (extension, filter) in SEGMENT_FILTERS:
201 raw = self.store.lowlevel_open(segment + ".tar" + extension)
203 (input, output) = os.popen2(filter)
204 def copy_thread(src, dst):
207 block = src.read(BLOCK_SIZE)
208 if len(block) == 0: break
212 thread.start_new_thread(copy_thread, (raw, input))
217 raise cumulus.store.NotFoundError
219 def load_segment(self, segment):
220 seg = tarfile.open(segment, 'r|', self.get_segment(segment))
222 data_obj = seg.extractfile(item)
223 path = item.name.split('/')
224 if len(path) == 2 and path[0] == segment:
225 yield (path[1], data_obj.read())
227 def load_snapshot(self, snapshot):
228 file = self.store.lowlevel_open("snapshot-" + snapshot + ".lbs")
229 return file.read().splitlines(True)
231 def extract_segment(self, segment):
232 segdir = os.path.join(self.get_cachedir(), segment)
234 for (object, data) in self.load_segment(segment):
235 f = open(os.path.join(segdir, object), 'wb')
239 def load_object(self, segment, object):
240 accessed_segments.add(segment)
241 path = os.path.join(self.get_cachedir(), segment, object)
242 if not os.access(path, os.R_OK):
243 self.extract_segment(segment)
244 if segment in self.lru_list: self.lru_list.remove(segment)
245 self.lru_list.append(segment)
246 while len(self.lru_list) > self.CACHE_SIZE:
247 os.system("rm -rf " + os.path.join(self.cachedir, self.lru_list[0]))
248 self.lru_list = self.lru_list[1:]
249 return open(path, 'rb').read()
251 def get(self, refstr):
252 """Fetch the given object and return it.
254 The input should be an object reference, in string form.
257 (segment, object, checksum, slice) = self.parse_ref(refstr)
259 if segment == "zero":
260 return "\0" * slice[1]
262 data = self.load_object(segment, object)
264 if checksum is not None:
265 verifier = ChecksumVerifier(checksum)
266 verifier.update(data)
267 if not verifier.valid():
270 if slice is not None:
271 (start, length, exact) = slice
272 if exact and len(data) != length: raise ValueError
273 data = data[start:start+length]
274 if len(data) != length: raise IndexError
278 def parse(lines, terminate=None):
279 """Generic parser for RFC822-style "Key: Value" data streams.
281 This parser can be used to read metadata logs and snapshot root descriptor
284 lines must be an iterable object which yields a sequence of lines of input.
286 If terminate is specified, it is used as a predicate to determine when to
287 stop reading input lines.
294 # Strip off a trailing newline, if present
295 if len(l) > 0 and l[-1] == "\n":
298 if terminate is not None and terminate(l):
299 if len(dict) > 0: yield dict
304 m = re.match(r"^([-\w]+):\s*(.*)$", l)
306 dict[m.group(1)] = m.group(2)
307 last_key = m.group(1)
308 elif len(l) > 0 and l[0].isspace() and last_key is not None:
313 if len(dict) > 0: yield dict
315 def parse_full(lines):
317 return parse(lines).next()
318 except StopIteration:
321 def parse_metadata_version(s):
322 """Convert a string with the snapshot version format to a tuple."""
324 m = re.match(r"^LBS Snapshot v(\d+(\.\d+)*)$", s)
328 return tuple([int(d) for d in m.group(1).split(".")])
330 def read_metadata(object_store, root):
331 """Iterate through all lines in the metadata log, following references."""
333 # Stack for keeping track of recursion when following references to
334 # portions of the log. The last entry in the stack corresponds to the
335 # object currently being parsed. Each entry is a list of lines which have
336 # been reversed, so that popping successive lines from the end of each list
337 # will return lines of the metadata log in order.
340 def follow_ref(refstr):
341 if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
342 lines = object_store.get(refstr).splitlines(True)
348 while len(stack) > 0:
355 # An indirect reference which we must follow?
356 if len(line) > 0 and line[0] == '@':
364 """Metadata for a single file (or directory or...) from a snapshot."""
366 # Functions for parsing various datatypes that can appear in a metadata log
370 """Decode an integer, expressed in decimal, octal, or hexadecimal."""
371 if s.startswith("0x"):
373 elif s.startswith("0"):
380 """Decode a URI-encoded (%xx escapes) string."""
385 """An unecoded string."""
390 """Decode a user/group to a tuple of uid/gid followed by name."""
392 uid = MetadataItem.decode_int(items[0])
395 if items[1].startswith("(") and items[1].endswith(")"):
396 name = MetadataItem.decode_str(items[1][1:-1])
400 def decode_device(s):
401 """Decode a device major/minor number."""
402 (major, minor) = map(MetadataItem.decode_int, s.split("/"))
403 return (major, minor)
407 def __init__(self, fields, object_store):
408 """Initialize from a dictionary of key/value pairs from metadata log."""
411 self.object_store = object_store
413 self.items = self.Items()
414 for (k, v) in fields.items():
415 if k in self.field_types:
416 decoder = self.field_types[k]
417 setattr(self.items, k, decoder(v))
421 """Return an iterator for the data blocks that make up a file."""
423 # This traverses the list of blocks that make up a file, following
424 # indirect references. It is implemented in much the same way as
425 # read_metadata, so see that function for details of the technique.
427 objects = self.fields['data'].split()
431 def follow_ref(refstr):
432 if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
433 objects = self.object_store.get(refstr).split()
435 stack.append(objects)
437 while len(stack) > 0:
444 # An indirect reference which we must follow?
445 if len(ref) > 0 and ref[0] == '@':
450 # Description of fields that might appear, and how they should be parsed.
451 MetadataItem.field_types = {
452 'name': MetadataItem.decode_str,
453 'type': MetadataItem.raw_str,
454 'mode': MetadataItem.decode_int,
455 'device': MetadataItem.decode_device,
456 'user': MetadataItem.decode_user,
457 'group': MetadataItem.decode_user,
458 'ctime': MetadataItem.decode_int,
459 'mtime': MetadataItem.decode_int,
460 'links': MetadataItem.decode_int,
461 'inode': MetadataItem.raw_str,
462 'checksum': MetadataItem.decode_str,
463 'size': MetadataItem.decode_int,
464 'contents': MetadataItem.decode_str,
465 'target': MetadataItem.decode_str,
468 def iterate_metadata(object_store, root):
469 for d in parse(read_metadata(object_store, root), lambda l: len(l) == 0):
470 yield MetadataItem(d, object_store)
473 """Access to the local database of snapshot contents and object checksums.
475 The local database is consulted when creating a snapshot to determine what
476 data can be re-used from old snapshots. Segment cleaning is performed by
477 manipulating the data in the local database; the local database also
478 includes enough data to guide the segment cleaning process.
481 def __init__(self, path, dbname="localdb.sqlite"):
482 self.db_connection = sqlite3.connect(path + "/" + dbname)
484 # Low-level database access. Use these methods when there isn't a
485 # higher-level interface available. Exception: do, however, remember to
486 # use the commit() method after making changes to make sure they are
487 # actually saved, even when going through higher-level interfaces.
489 "Commit any pending changes to the local database."
490 self.db_connection.commit()
493 "Roll back any pending changes to the local database."
494 self.db_connection.rollback()
497 "Return a DB-API cursor for directly accessing the local database."
498 return self.db_connection.cursor()
500 def list_schemes(self):
501 """Return the list of snapshots found in the local database.
503 The returned value is a list of tuples (id, scheme, name, time, intent).
507 cur.execute("select distinct scheme from snapshots")
508 schemes = [row[0] for row in cur.fetchall()]
512 def garbage_collect(self, scheme, intent=1.0):
513 """Delete entries from old snapshots from the database.
515 Only snapshots with the specified scheme name will be deleted. If
516 intent is given, it gives the intended next snapshot type, to determine
517 how aggressively to clean (for example, intent=7 could be used if the
518 next snapshot will be a weekly snapshot).
523 # Find the id of the last snapshot to be created. This is used for
524 # measuring time in a way: we record this value in each segment we
525 # expire on this run, and then on a future run can tell if there have
526 # been intervening backups made.
527 cur.execute("select max(snapshotid) from snapshots")
528 last_snapshotid = cur.fetchone()[0]
530 # Get the list of old snapshots for this scheme. Delete all the old
531 # ones. Rules for what to keep:
532 # - Always keep the most recent snapshot.
533 # - If snapshot X is younger than Y, and X has higher intent, then Y
535 cur.execute("""select snapshotid, name, intent,
536 julianday('now') - timestamp as age
537 from snapshots where scheme = ?
538 order by age""", (scheme,))
542 for (id, name, snap_intent, snap_age) in cur.fetchall():
544 if snap_intent < max_intent:
545 # Delete small-intent snapshots if there is a more recent
546 # large-intent snapshot.
548 elif snap_intent == intent:
549 # Delete previous snapshots with the specified intent level.
552 if can_delete and not first:
553 print "Delete snapshot %d (%s)" % (id, name)
554 cur.execute("delete from snapshots where snapshotid = ?",
557 max_intent = max(max_intent, snap_intent)
559 # Delete entries in the segments_used table which are for non-existent
561 cur.execute("""delete from segments_used
562 where snapshotid not in
563 (select snapshotid from snapshots)""")
565 # Find segments which contain no objects used by any current snapshots,
566 # and delete them from the segment table.
567 cur.execute("""delete from segments where segmentid not in
568 (select segmentid from segments_used)""")
570 # Delete unused objects in the block_index table. By "unused", we mean
571 # any object which was stored in a segment which has been deleted, and
572 # any object in a segment which was marked for cleaning and has had
573 # cleaning performed already (the expired time is less than the current
574 # largest snapshot id).
575 cur.execute("""delete from block_index
576 where segmentid not in (select segmentid from segments)
577 or segmentid in (select segmentid from segments
578 where expire_time < ?)""",
581 # Remove sub-block signatures for deleted objects.
582 cur.execute("""delete from subblock_signatures
584 (select blockid from block_index)""")
587 class SegmentInfo(Struct): pass
589 def get_segment_cleaning_list(self, age_boost=0.0):
590 """Return a list of all current segments with information for cleaning.
592 Return all segments which are currently known in the local database
593 (there might be other, older segments in the archive itself), and
594 return usage statistics for each to help decide which segments to
597 The returned list will be sorted by estimated cleaning benefit, with
598 segments that are best to clean at the start of the list.
600 If specified, the age_boost parameter (measured in days) will added to
601 the age of each segment, as a way of adjusting the benefit computation
602 before a long-lived snapshot is taken (for example, age_boost might be
603 set to 7 when cleaning prior to taking a weekly snapshot).
608 cur.execute("""select segmentid, used, size, mtime,
609 julianday('now') - mtime as age from segment_info
610 where expire_time is null""")
612 info = self.SegmentInfo()
614 info.used_bytes = row[1]
615 info.size_bytes = row[2]
617 info.age_days = row[4]
619 # If data is not available for whatever reason, treat it as 0.0.
620 if info.age_days is None:
622 if info.used_bytes is None:
623 info.used_bytes = 0.0
625 # Benefit calculation: u is the estimated fraction of each segment
626 # which is utilized (bytes belonging to objects still in use
627 # divided by total size; this doesn't take compression or storage
628 # overhead into account, but should give a reasonable estimate).
630 # The total benefit is a heuristic that combines several factors:
631 # the amount of space that can be reclaimed (1 - u), an ageing
632 # factor (info.age_days) that favors cleaning old segments to young
633 # ones and also is more likely to clean segments that will be
634 # rewritten for long-lived snapshots (age_boost), and finally a
635 # penalty factor for the cost of re-uploading data (u + 0.1).
636 u = info.used_bytes / info.size_bytes
637 info.cleaning_benefit \
638 = (1 - u) * (info.age_days + age_boost) / (u + 0.1)
640 segments.append(info)
642 segments.sort(cmp, key=lambda s: s.cleaning_benefit, reverse=True)
645 def mark_segment_expired(self, segment):
646 """Mark a segment for cleaning in the local database.
648 The segment parameter should be either a SegmentInfo object or an
649 integer segment id. Objects in the given segment will be marked as
650 expired, which means that any future snapshots that would re-use those
651 objects will instead write out a new copy of the object, and thus no
652 future snapshots will depend upon the given segment.
655 if isinstance(segment, int):
657 elif isinstance(segment, self.SegmentInfo):
660 raise TypeError("Invalid segment: %s, must be of type int or SegmentInfo, not %s" % (segment, type(segment)))
663 cur.execute("select max(snapshotid) from snapshots")
664 last_snapshotid = cur.fetchone()[0]
665 cur.execute("update segments set expire_time = ? where segmentid = ?",
666 (last_snapshotid, id))
667 cur.execute("update block_index set expired = 0 where segmentid = ?",
670 def balance_expired_objects(self):
671 """Analyze expired objects in segments to be cleaned and group by age.
673 Update the block_index table of the local database to group expired
674 objects by age. The exact number of buckets and the cutoffs for each
675 are dynamically determined. Calling this function after marking
676 segments expired will help in the segment cleaning process, by ensuring
677 that when active objects from clean segments are rewritten, they will
678 be placed into new segments roughly grouped by age.
681 # The expired column of the block_index table is used when generating a
682 # new LBS snapshot. A null value indicates that an object may be
683 # re-used. Otherwise, an object must be written into a new segment if
684 # needed. Objects with distinct expired values will be written into
685 # distinct segments, to allow for some grouping by age. The value 0 is
686 # somewhat special in that it indicates any rewritten objects can be
687 # placed in the same segment as completely new objects; this can be
688 # used for very young objects which have been expired, or objects not
689 # expected to be encountered.
691 # In the balancing process, all objects which are not used in any
692 # current snapshots will have expired set to 0. Objects which have
693 # been seen will be sorted by age and will have expired values set to
694 # 0, 1, 2, and so on based on age (with younger objects being assigned
695 # lower values). The number of buckets and the age cutoffs is
696 # determined by looking at the distribution of block ages.
700 # Mark all expired objects with expired = 0; these objects will later
701 # have values set to indicate groupings of objects when repacking.
702 cur.execute("""update block_index set expired = 0
703 where expired is not null""")
705 # We will want to aim for at least one full segment for each bucket
706 # that we eventually create, but don't know how many bytes that should
707 # be due to compression. So compute the average number of bytes in
708 # each expired segment as a rough estimate for the minimum size of each
709 # bucket. (This estimate could be thrown off by many not-fully-packed
710 # segments, but for now don't worry too much about that.) If we can't
711 # compute an average, it's probably because there are no expired
712 # segments, so we have no more work to do.
713 cur.execute("""select avg(size) from segments
715 (select distinct segmentid from block_index
716 where expired is not null)""")
717 segment_size_estimate = cur.fetchone()[0]
718 if not segment_size_estimate:
721 # Next, extract distribution of expired objects (number and size) by
722 # age. Save the timestamp for "now" so that the classification of
723 # blocks into age buckets will not change later in the function, after
724 # time has passed. Set any timestamps in the future to now, so we are
725 # guaranteed that for the rest of this function, age is always
727 cur.execute("select julianday('now')")
728 now = cur.fetchone()[0]
730 cur.execute("""update block_index set timestamp = ?
731 where timestamp > ? and expired is not null""",
734 cur.execute("""select round(? - timestamp) as age, count(*), sum(size)
735 from block_index where expired = 0
736 group by age order by age""", (now,))
737 distribution = cur.fetchall()
739 # Start to determine the buckets for expired objects. Heuristics used:
740 # - An upper bound on the number of buckets is given by the number of
741 # segments we estimate it will take to store all data. In fact,
742 # aim for a couple of segments per bucket.
743 # - Place very young objects in bucket 0 (place with new objects)
744 # unless there are enough of them to warrant a separate bucket.
745 # - Try not to create unnecessarily many buckets, since fewer buckets
746 # will allow repacked data to be grouped based on spatial locality
747 # (while more buckets will group by temporal locality). We want a
750 total_bytes = sum([i[2] for i in distribution])
751 target_buckets = 2 * (total_bytes / segment_size_estimate) ** 0.4
752 min_size = 1.5 * segment_size_estimate
753 target_size = max(2 * segment_size_estimate,
754 total_bytes / target_buckets)
756 print "segment_size:", segment_size_estimate
757 print "distribution:", distribution
758 print "total_bytes:", total_bytes
759 print "target_buckets:", target_buckets
760 print "min, target size:", min_size, target_size
762 # Chosen cutoffs. Each bucket consists of objects with age greater
763 # than one cutoff value, but not greater than the next largest cutoff.
766 # Starting with the oldest objects, begin grouping together into
767 # buckets of size at least target_size bytes.
768 distribution.reverse()
770 min_age_bucket = False
771 for (age, items, size) in distribution:
772 if bucket_size >= target_size \
773 or (age < MIN_AGE and not min_age_bucket):
774 if bucket_size < target_size and len(cutoffs) > 0:
781 min_age_bucket = True
783 # The last (youngest) bucket will be group 0, unless it has enough data
784 # to be of size min_size by itself, or there happen to be no objects
785 # less than MIN_AGE at all.
786 if bucket_size >= min_size or not min_age_bucket:
790 print "cutoffs:", cutoffs
792 # Update the database to assign each object to the appropriate bucket.
794 for i in range(len(cutoffs)):
795 cur.execute("""update block_index set expired = ?
796 where round(? - timestamp) > ?
797 and expired is not null""",
798 (i, now, cutoffs[i]))