1 """High-level interface for working with LBS archives.
3 This module provides an easy interface for reading from and manipulating
4 various parts of an LBS archive:
5 - listing the snapshots and segments present
6 - reading segment contents
7 - parsing snapshot descriptors and snapshot metadata logs
8 - reading and maintaining the local object database
from __future__ import division
import os, re, sha, tarfile, tempfile, thread
from pysqlite2 import dbapi2 as sqlite3

import cumulus.store, cumulus.store.file
# The largest supported snapshot format that can be understood.
FORMAT_VERSION = (0, 8)         # LBS Snapshot v0.8

# Maximum number of nested indirect references allowed in a snapshot.
MAX_RECURSION_DEPTH = 3

# All segments which have been accessed this session.
accessed_segments = set()
27 """Decode a URI-encoded (%xx escapes) string."""
28 def hex_decode(m): return chr(int(m.group(1), 16))
29 return re.sub(r"%([0-9a-f]{2})", hex_decode, s)
31 """Encode a string to URI-encoded (%xx escapes) form."""
33 if c > '+' and c < '\x7f' and c != '@':
36 return "%%%02x" % (ord(c),)
37 return ''.join(hex_encode(c) for c in s)
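# Round-trip illustration (outputs worked out by hand from the rules above):
#     >>> uri_encode("file name@host")
#     'file%20name%40host'
#     >>> uri_decode("file%20name%40host")
#     'file name@host'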
40 """A class which merely acts as a data container.
42 Instances of this class (or its subclasses) are merely used to store data
43 in various attributes. No methods are provided.
47 return "<%s %s>" % (self.__class__, self.__dict__)
CHECKSUM_ALGORITHMS = {
    'sha1': sha.new
}
class ChecksumCreator:
    """Compute an LBS checksum for provided data.

    The algorithm used is selectable, but currently defaults to sha1.
    """

    def __init__(self, algorithm='sha1'):
        self.algorithm = algorithm
        self.hash = CHECKSUM_ALGORITHMS[algorithm]()

    def update(self, data):
        self.hash.update(data)
        return self

    def compute(self):
        return "%s=%s" % (self.algorithm, self.hash.hexdigest())
class ChecksumVerifier:
    """Verify whether a checksum from a snapshot matches the supplied data."""

    def __init__(self, checksumstr):
        """Create an object to check the supplied checksum."""

        (algo, checksum) = checksumstr.split("=", 1)
        self.checksum = checksum
        self.hash = CHECKSUM_ALGORITHMS[algo]()

    def update(self, data):
        self.hash.update(data)

    def valid(self):
        """Return a boolean indicating whether the checksum matches."""

        result = self.hash.hexdigest()
        return result == self.checksum
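# Typical usage (illustrative; "data" stands for an arbitrary string):
# compute a checksum when storing data, then verify it on the read path.
#     creator = ChecksumCreator()
#     creator.update(data)
#     checksum = creator.compute()     # a string like "sha1=<40 hex digits>"
#
#     verifier = ChecksumVerifier(checksum)
#     verifier.update(data)
#     assert verifier.valid()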
class LowlevelDataStore:
    """Access to the backup store containing segments and snapshot descriptors.

    Instances of this class are used to get direct filesystem-level access to
    the backup data. To read a backup, a caller will ordinarily not care about
    direct access to backup segments, but will instead merely need to access
    objects from those segments. The ObjectStore class provides a suitable
    wrapper around a DataStore to give this high-level access.
    """

    def __init__(self, path):
        if path.find(":") >= 0:
            self.store = cumulus.store.open(path)
        else:
            self.store = cumulus.store.file.FileStore(path)
    def _classify(self, filename):
        for (t, r) in cumulus.store.type_patterns.items():
            if r.match(filename):
                return (t, filename)
        return (None, filename)
    def lowlevel_open(self, filename):
        """Return a file-like object for reading data from the given file."""

        (type, filename) = self._classify(filename)
        return self.store.get(type, filename)

    def lowlevel_stat(self, filename):
        """Return a dictionary of information about the given file.

        Currently, the only defined field is 'size', giving the size of the
        file in bytes.
        """

        (type, filename) = self._classify(filename)
        return self.store.stat(type, filename)
    # Slightly higher-level list methods.
    def list_snapshots(self):
        for f in self.store.list('snapshots'):
            m = cumulus.store.type_patterns['snapshots'].match(f)
            if m: yield m.group(1)

    def list_segments(self):
        for f in self.store.list('segments'):
            m = cumulus.store.type_patterns['segments'].match(f)
            if m: yield m.group(1)
class ObjectStore:
    def __init__(self, data_store):
        self.store = data_store
        self.cachedir = None
        self.CACHE_SIZE = 16    # Maximum number of cached segments (a tunable default)
        self.lru_list = []

    def get_cachedir(self):
        if self.cachedir is None:
            self.cachedir = tempfile.mkdtemp(".lbs")
        return self.cachedir

    def cleanup(self):
        if self.cachedir is not None:
            # TODO: Avoid use of system, make this safer
            os.system("rm -rf " + self.cachedir)
        self.cachedir = None
    @staticmethod
    def parse_ref(refstr):
        m = re.match(r"^zero\[(\d+)\]$", refstr)
        if m:
            return ("zero", None, None, (0, int(m.group(1))))

        m = re.match(r"^([-0-9a-f]+)\/([0-9a-f]+)(\(\S+\))?(\[(((\d+)\+)?(\d+)|=(\d+))\])?$", refstr)
        if not m: return

        segment = m.group(1)
        object = m.group(2)
        checksum = m.group(3)
        slice = m.group(4)

        if checksum is not None:
            checksum = checksum.lstrip("(").rstrip(")")

        if slice is not None:
            if m.group(9) is not None:
                # Size-assertion slice
                slice = (0, int(m.group(9)), True)
            elif m.group(6) is None:
                # Abbreviated slice (offset defaults to zero)
                slice = (0, int(m.group(8)), False)
            else:
                slice = (int(m.group(7)), int(m.group(8)), False)

        return (segment, object, checksum, slice)
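    # A few example references and the tuples they parse to (worked out from
    # the regular expression above; the segment/object names are invented):
    #     parse_ref("zero[100]")       -> ("zero", None, None, (0, 100))
    #     parse_ref("cafe01/0000002a") -> ("cafe01", "0000002a", None, None)
    #     parse_ref("cafe01/0000002a[16+32]")
    #                  -> ("cafe01", "0000002a", None, (16, 32, False))
    #     parse_ref("cafe01/0000002a[=48]")
    #                  -> ("cafe01", "0000002a", None, (0, 48, True))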
    def get_segment(self, segment):
        accessed_segments.add(segment)
        raw = self.store.lowlevel_open(segment + ".tar.gpg")

        (input, output) = os.popen2("lbs-filter-gpg --decrypt")
        def copy_thread(src, dst):
            BLOCK_SIZE = 4096   # Copy in 4 kB chunks (an arbitrary block size)
            while True:
                block = src.read(BLOCK_SIZE)
                if len(block) == 0: break
                dst.write(block)
            dst.close()

        thread.start_new_thread(copy_thread, (raw, input))
        return output
    def load_segment(self, segment):
        seg = tarfile.open(segment, 'r|', self.get_segment(segment))
        for item in seg:
            data_obj = seg.extractfile(item)
            path = item.name.split('/')
            if len(path) == 2 and path[0] == segment:
                yield (path[1], data_obj.read())

    def load_snapshot(self, snapshot):
        file = self.store.lowlevel_open("snapshot-" + snapshot + ".lbs")
        return file.read().splitlines(True)
    def extract_segment(self, segment):
        segdir = os.path.join(self.get_cachedir(), segment)
        os.mkdir(segdir)
        for (object, data) in self.load_segment(segment):
            f = open(os.path.join(segdir, object), 'wb')
            f.write(data)
            f.close()

    def load_object(self, segment, object):
        accessed_segments.add(segment)
        path = os.path.join(self.get_cachedir(), segment, object)
        if not os.access(path, os.R_OK):
            self.extract_segment(segment)
        if segment in self.lru_list: self.lru_list.remove(segment)
        self.lru_list.append(segment)
        while len(self.lru_list) > self.CACHE_SIZE:
            os.system("rm -rf " + os.path.join(self.cachedir, self.lru_list[0]))
            self.lru_list = self.lru_list[1:]
        return open(path, 'rb').read()
    def get(self, refstr):
        """Fetch the given object and return it.

        The input should be an object reference, in string form.
        """

        (segment, object, checksum, slice) = self.parse_ref(refstr)

        if segment == "zero":
            return "\0" * slice[1]

        data = self.load_object(segment, object)

        if checksum is not None:
            verifier = ChecksumVerifier(checksum)
            verifier.update(data)
            if not verifier.valid():
                raise ValueError

        if slice is not None:
            (start, length, exact) = slice
            if exact and len(data) != length: raise ValueError
            data = data[start:start+length]
            if len(data) != length: raise IndexError

        return data
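# Example of the read path (a sketch; the store path and reference string are
# invented):
#     store = ObjectStore(LowlevelDataStore("/backup"))
#     data = store.get("cafe01/0000002a[0+1024]")
#     store.cleanup()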
def parse(lines, terminate=None):
    """Generic parser for RFC822-style "Key: Value" data streams.

    This parser can be used to read metadata logs and snapshot root descriptor
    files.

    lines must be an iterable object which yields a sequence of lines of input.

    If terminate is specified, it is used as a predicate to determine when to
    stop reading input lines.
    """

    dict = {}
    last_key = None

    for l in lines:
        # Strip off a trailing newline, if present
        if len(l) > 0 and l[-1] == "\n":
            l = l[:-1]

        if terminate is not None and terminate(l):
            if len(dict) > 0: yield dict
            dict = {}
            last_key = None
            continue

        m = re.match(r"^([-\w]+):\s*(.*)$", l)
        if m:
            dict[m.group(1)] = m.group(2)
            last_key = m.group(1)
        elif len(l) > 0 and l[0].isspace() and last_key is not None:
            dict[last_key] += l

    if len(dict) > 0: yield dict
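# Example (illustrative input; a blank line acts as the record separator):
#     >>> for d in parse(["Format: LBS Snapshot v0.8\n", "Name: test\n",
#     ...                 "\n", "Size: 42\n"], lambda l: len(l) == 0):
#     ...     print sorted(d.items())
#     [('Format', 'LBS Snapshot v0.8'), ('Name', 'test')]
#     [('Size', '42')]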
def parse_full(lines):
    try:
        return parse(lines).next()
    except StopIteration:
        return {}
def parse_metadata_version(s):
    """Convert a string with the snapshot version format to a tuple."""

    m = re.match(r"^LBS Snapshot v(\d+(\.\d+)*)$", s)
    if m is None:
        return ()
    else:
        return tuple([int(d) for d in m.group(1).split(".")])
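# For instance (values follow from the regular expression above):
#     >>> parse_metadata_version("LBS Snapshot v0.8")
#     (0, 8)
#     >>> parse_metadata_version("not a version string")
#     ()
#     >>> parse_metadata_version("LBS Snapshot v0.8") <= FORMAT_VERSION
#     True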
def read_metadata(object_store, root):
    """Iterate through all lines in the metadata log, following references."""

    # Stack for keeping track of recursion when following references to
    # portions of the log. The last entry in the stack corresponds to the
    # object currently being parsed. Each entry is a list of lines which have
    # been reversed, so that popping successive lines from the end of each list
    # will return lines of the metadata log in order.
    stack = []

    def follow_ref(refstr):
        if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
        lines = object_store.get(refstr).splitlines(True)
        lines.reverse()
        stack.append(lines)

    follow_ref(root)

    while len(stack) > 0:
        top = stack[-1]
        if len(top) == 0:
            stack.pop()
            continue
        line = top.pop()

        # An indirect reference which we must follow?
        if len(line) > 0 and line[0] == '@':
            ref = line[1:]
            ref = ref.strip()
            follow_ref(ref)
        else:
            yield line
346 """Metadata for a single file (or directory or...) from a snapshot."""
348 # Functions for parsing various datatypes that can appear in a metadata log
352 """Decode an integer, expressed in decimal, octal, or hexadecimal."""
353 if s.startswith("0x"):
355 elif s.startswith("0"):
362 """Decode a URI-encoded (%xx escapes) string."""
367 """An unecoded string."""
372 """Decode a user/group to a tuple of uid/gid followed by name."""
374 uid = MetadataItem.decode_int(items[0])
377 if items[1].startswith("(") and items[1].endswith(")"):
378 name = MetadataItem.decode_str(items[1][1:-1])
382 def decode_device(s):
383 """Decode a device major/minor number."""
384 (major, minor) = map(MetadataItem.decode_int, s.split("/"))
385 return (major, minor)
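    # Sample decodings (doctest-style; worked out from the definitions above):
    #     >>> MetadataItem.decode_int("0755")
    #     493
    #     >>> MetadataItem.decode_int("0x1e")
    #     30
    #     >>> MetadataItem.decode_user("1000 (alice)")
    #     (1000, 'alice')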
    class Items: pass

    def __init__(self, fields, object_store):
        """Initialize from a dictionary of key/value pairs from metadata log."""

        self.fields = fields
        self.object_store = object_store
        self.keys = []
        self.items = self.Items()
        for (k, v) in fields.items():
            if k in self.field_types:
                decoder = self.field_types[k]
                setattr(self.items, k, decoder(v))
                self.keys.append(k)
403 """Return an iterator for the data blocks that make up a file."""
405 # This traverses the list of blocks that make up a file, following
406 # indirect references. It is implemented in much the same way as
407 # read_metadata, so see that function for details of the technique.
409 objects = self.fields['data'].split()
413 def follow_ref(refstr):
414 if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
415 objects = self.object_store.get(refstr).split()
417 stack.append(objects)
419 while len(stack) > 0:
426 # An indirect reference which we must follow?
427 if len(ref) > 0 and ref[0] == '@':
# Description of fields that might appear, and how they should be parsed.
MetadataItem.field_types = {
    'name': MetadataItem.decode_str,
    'type': MetadataItem.raw_str,
    'mode': MetadataItem.decode_int,
    'device': MetadataItem.decode_device,
    'user': MetadataItem.decode_user,
    'group': MetadataItem.decode_user,
    'ctime': MetadataItem.decode_int,
    'mtime': MetadataItem.decode_int,
    'links': MetadataItem.decode_int,
    'inode': MetadataItem.raw_str,
    'checksum': MetadataItem.decode_str,
    'size': MetadataItem.decode_int,
    'contents': MetadataItem.decode_str,
    'target': MetadataItem.decode_str,
}
def iterate_metadata(object_store, root):
    for d in parse(read_metadata(object_store, root), lambda l: len(l) == 0):
        yield MetadataItem(d, object_store)
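# Putting the pieces together (a sketch; the snapshot name is invented, and
# the snapshot descriptor is assumed to carry the usual "Root" key):
#     lowlevel = LowlevelDataStore("/backup")
#     store = ObjectStore(lowlevel)
#     descriptor = parse_full(store.load_snapshot("main-20080101T000000"))
#     for item in iterate_metadata(store, descriptor["Root"]):
#         print item.items.name
#     store.cleanup()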
455 """Access to the local database of snapshot contents and object checksums.
457 The local database is consulted when creating a snapshot to determine what
458 data can be re-used from old snapshots. Segment cleaning is performed by
459 manipulating the data in the local database; the local database also
460 includes enough data to guide the segment cleaning process.
463 def __init__(self, path, dbname="localdb.sqlite"):
464 self.db_connection = sqlite3.connect(path + "/" + dbname)
    # Low-level database access. Use these methods when there isn't a
    # higher-level interface available. Exception: do, however, remember to
    # use the commit() method after making changes to make sure they are
    # actually saved, even when going through higher-level interfaces.
    def commit(self):
        "Commit any pending changes to the local database."
        self.db_connection.commit()

    def rollback(self):
        "Roll back any pending changes to the local database."
        self.db_connection.rollback()

    def cursor(self):
        "Return a DB-API cursor for directly accessing the local database."
        return self.db_connection.cursor()
    def list_schemes(self):
        """Return the list of snapshot schemes found in the local database.

        The returned value is a sorted list of scheme names (strings).
        """

        cur = self.cursor()
        cur.execute("select distinct scheme from snapshots")
        schemes = [row[0] for row in cur.fetchall()]
        schemes.sort()
        return schemes
    def garbage_collect(self, scheme, intent=1.0):
        """Delete entries from old snapshots from the database.

        Only snapshots with the specified scheme name will be deleted. If
        intent is given, it gives the intended next snapshot type, to determine
        how aggressively to clean (for example, intent=7 could be used if the
        next snapshot will be a weekly snapshot).
        """

        cur = self.cursor()

        # Find the id of the last snapshot to be created. This is used for
        # measuring time in a way: we record this value in each segment we
        # expire on this run, and then on a future run can tell if there have
        # been intervening backups made.
        cur.execute("select max(snapshotid) from snapshots")
        last_snapshotid = cur.fetchone()[0]
        # Get the list of old snapshots for this scheme. Delete all the old
        # ones. Rules for what to keep:
        # - Always keep the most recent snapshot.
        # - If snapshot X is younger than Y, and X has higher intent, then Y
        #   can be deleted.
        cur.execute("""select snapshotid, name, intent,
                              julianday('now') - timestamp as age
                       from snapshots where scheme = ?
                       order by age""", (scheme,))

        first = True
        max_intent = None
        for (id, name, snap_intent, snap_age) in cur.fetchall():
            can_delete = False
            if snap_intent < max_intent:
                # Delete small-intent snapshots if there is a more recent
                # large-intent snapshot.
                can_delete = True
            elif snap_intent == intent:
                # Delete previous snapshots with the specified intent level.
                can_delete = True

            if can_delete and not first:
                print "Delete snapshot %d (%s)" % (id, name)
                cur.execute("delete from snapshots where snapshotid = ?",
                            (id,))
            first = False
            max_intent = max(max_intent, snap_intent)
        # Delete entries in the segments_used table which are for non-existent
        # snapshots.
        cur.execute("""delete from segments_used
                       where snapshotid not in
                           (select snapshotid from snapshots)""")

        # Find segments which contain no objects used by any current snapshots,
        # and delete them from the segment table.
        cur.execute("""delete from segments where segmentid not in
                           (select segmentid from segments_used)""")

        # Delete unused objects in the block_index table. By "unused", we mean
        # any object which was stored in a segment which has been deleted, and
        # any object in a segment which was marked for cleaning and has had
        # cleaning performed already (the expired time is less than the current
        # largest snapshot id).
        cur.execute("""delete from block_index
                       where segmentid not in (select segmentid from segments)
                          or segmentid in (select segmentid from segments
                                           where expire_time < ?)""",
                    (last_snapshotid,))

        # Remove sub-block signatures for deleted objects.
        cur.execute("""delete from subblock_signatures
                       where blockid not in
                           (select blockid from block_index)""")
    # Data container for per-segment statistics used in cleaning decisions.
    class SegmentInfo(Struct): pass
    def get_segment_cleaning_list(self, age_boost=0.0):
        """Return a list of all current segments with information for cleaning.

        Return all segments which are currently known in the local database
        (there might be other, older segments in the archive itself), and
        return usage statistics for each to help decide which segments to
        clean.

        The returned list will be sorted by estimated cleaning benefit, with
        segments that are best to clean at the start of the list.

        If specified, the age_boost parameter (measured in days) will be added
        to the age of each segment, as a way of adjusting the benefit
        computation before a long-lived snapshot is taken (for example,
        age_boost might be set to 7 when cleaning prior to taking a weekly
        snapshot).
        """

        cur = self.cursor()
        segments = []
        cur.execute("""select segmentid, used, size, mtime,
                              julianday('now') - mtime as age from segment_info
                       where expire_time is null""")
        for row in cur.fetchall():
            info = self.SegmentInfo()
            info.id = row[0]
            info.used_bytes = row[1]
            info.size_bytes = row[2]
            info.mtime = row[3]
            info.age_days = row[4]
            # If data is not available for whatever reason, treat it as 0.0.
            if info.age_days is None:
                info.age_days = 0.0
            if info.used_bytes is None:
                info.used_bytes = 0.0
            # Benefit calculation: u is the estimated fraction of each segment
            # which is utilized (bytes belonging to objects still in use
            # divided by total size; this doesn't take compression or storage
            # overhead into account, but should give a reasonable estimate).
            #
            # The total benefit is a heuristic that combines several factors:
            # the amount of space that can be reclaimed (1 - u), an ageing
            # factor (info.age_days) that favors cleaning old segments to young
            # ones and also is more likely to clean segments that will be
            # rewritten for long-lived snapshots (age_boost), and finally a
            # penalty factor for the cost of re-uploading data (u + 0.1).
            u = info.used_bytes / info.size_bytes
            info.cleaning_benefit \
                = (1 - u) * (info.age_days + age_boost) / (u + 0.1)

            segments.append(info)

        segments.sort(key=lambda s: s.cleaning_benefit, reverse=True)
        return segments
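    # Worked example of the benefit formula (numbers invented): a segment that
    # is 25% utilized (u = 0.25) and 10 days old, with age_boost = 0, scores
    # (1 - 0.25) * 10 / (0.25 + 0.1), or about 21.4. A fully-utilized segment
    # of the same age scores 0, so it is never selected for cleaning.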
    def mark_segment_expired(self, segment):
        """Mark a segment for cleaning in the local database.

        The segment parameter should be either a SegmentInfo object or an
        integer segment id. Objects in the given segment will be marked as
        expired, which means that any future snapshots that would re-use those
        objects will instead write out a new copy of the object, and thus no
        future snapshots will depend upon the given segment.
        """

        if isinstance(segment, int):
            id = segment
        elif isinstance(segment, self.SegmentInfo):
            id = segment.id
        else:
            raise TypeError("Invalid segment: %s, must be of type int or "
                            "SegmentInfo, not %s" % (segment, type(segment)))

        cur = self.cursor()
        cur.execute("select max(snapshotid) from snapshots")
        last_snapshotid = cur.fetchone()[0]
        cur.execute("update segments set expire_time = ? where segmentid = ?",
                    (last_snapshotid, id))
        cur.execute("update block_index set expired = 0 where segmentid = ?",
                    (id,))
    def balance_expired_objects(self):
        """Analyze expired objects in segments to be cleaned and group by age.

        Update the block_index table of the local database to group expired
        objects by age. The exact number of buckets and the cutoffs for each
        are dynamically determined. Calling this function after marking
        segments expired will help in the segment cleaning process, by ensuring
        that when active objects from cleaned segments are rewritten, they will
        be placed into new segments roughly grouped by age.
        """
        # The expired column of the block_index table is used when generating a
        # new LBS snapshot. A null value indicates that an object may be
        # re-used. Otherwise, an object must be written into a new segment if
        # needed. Objects with distinct expired values will be written into
        # distinct segments, to allow for some grouping by age. The value 0 is
        # somewhat special in that it indicates any rewritten objects can be
        # placed in the same segment as completely new objects; this can be
        # used for very young objects which have been expired, or objects not
        # expected to be encountered.
        #
        # In the balancing process, all objects which are not used in any
        # current snapshots will have expired set to 0. Objects which have
        # been seen will be sorted by age and will have expired values set to
        # 0, 1, 2, and so on based on age (with younger objects being assigned
        # lower values). The number of buckets and the age cutoffs are
        # determined by looking at the distribution of block ages.

        cur = self.cursor()
        # Mark all expired objects with expired = 0; these objects will later
        # have values set to indicate groupings of objects when repacking.
        cur.execute("""update block_index set expired = 0
                       where expired is not null""")
        # We will want to aim for at least one full segment for each bucket
        # that we eventually create, but don't know how many bytes that should
        # be due to compression. So compute the average number of bytes in
        # each expired segment as a rough estimate for the minimum size of each
        # bucket. (This estimate could be thrown off by many not-fully-packed
        # segments, but for now don't worry too much about that.) If we can't
        # compute an average, it's probably because there are no expired
        # segments, so we have no more work to do.
        cur.execute("""select avg(size) from segments
                       where segmentid in
                           (select distinct segmentid from block_index
                            where expired is not null)""")
        segment_size_estimate = cur.fetchone()[0]
        if not segment_size_estimate:
            return
        # Next, extract distribution of expired objects (number and size) by
        # age. Save the timestamp for "now" so that the classification of
        # blocks into age buckets will not change later in the function, after
        # time has passed. Set any timestamps in the future to now, so we are
        # guaranteed that for the rest of this function, age is always
        # non-negative.
        cur.execute("select julianday('now')")
        now = cur.fetchone()[0]

        cur.execute("""update block_index set timestamp = ?
                       where timestamp > ? and expired is not null""",
                    (now, now))

        cur.execute("""select round(? - timestamp) as age, count(*), sum(size)
                       from block_index where expired = 0
                       group by age order by age""", (now,))
        distribution = cur.fetchall()
        # Start to determine the buckets for expired objects. Heuristics used:
        # - An upper bound on the number of buckets is given by the number of
        #   segments we estimate it will take to store all data. In fact,
        #   aim for a couple of segments per bucket.
        # - Place very young objects in bucket 0 (place with new objects)
        #   unless there are enough of them to warrant a separate bucket.
        # - Try not to create unnecessarily many buckets, since fewer buckets
        #   will allow repacked data to be grouped based on spatial locality
        #   (while more buckets will group by temporal locality). We want a
        #   happy medium.
        # MIN_AGE is the age (in days) below which objects count as "very
        # young"; the exact threshold is a tunable heuristic (4 days is
        # assumed here).
        MIN_AGE = 4
        total_bytes = sum([i[2] for i in distribution])
        target_buckets = 2 * (total_bytes / segment_size_estimate) ** 0.4
        min_size = 1.5 * segment_size_estimate
        target_size = max(2 * segment_size_estimate,
                          total_bytes / target_buckets)

        print "segment_size:", segment_size_estimate
        print "distribution:", distribution
        print "total_bytes:", total_bytes
        print "target_buckets:", target_buckets
        print "min, target size:", min_size, target_size
        # Chosen cutoffs. Each bucket consists of objects with age greater
        # than one cutoff value, but not greater than the next largest cutoff.
        cutoffs = []

        # Starting with the oldest objects, begin grouping together into
        # buckets of size at least target_size bytes.
        distribution.reverse()
        bucket_size = 0
        min_age_bucket = False
        for (age, items, size) in distribution:
            if bucket_size >= target_size \
                or (age < MIN_AGE and not min_age_bucket):
                if bucket_size < target_size and len(cutoffs) > 0:
                    # Merge an undersized bucket with the previous one.
                    cutoffs.pop()
                cutoffs.append(age)
                bucket_size = 0

            bucket_size += size
            if age < MIN_AGE:
                min_age_bucket = True

        # The last (youngest) bucket will be group 0, unless it has enough data
        # to be of size min_size by itself, or there happen to be no objects
        # less than MIN_AGE at all.
        if bucket_size >= min_size or not min_age_bucket:
            # Give the youngest objects their own bucket instead of merging
            # them into group 0 (the group shared with completely new objects).
            cutoffs.append(-1)
        cutoffs.append(-1)

        print "cutoffs:", cutoffs
        # Update the database to assign each object to the appropriate bucket.
        cutoffs.reverse()
        for i in range(len(cutoffs)):
            cur.execute("""update block_index set expired = ?
                           where round(? - timestamp) > ?
                           and expired is not null""",
                        (i, now, cutoffs[i]))
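        # For example (cutoffs invented): with cutoffs of [-1, 10, 50] after
        # the reverse, an object 5 days old is matched only by the i = 0 pass
        # (5 > -1) and lands in group 0; a 30-day-old object is last matched
        # by the i = 1 pass (30 > 10) and lands in group 1; a 90-day-old
        # object is matched by every pass and lands in group 2.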