1 """High-level interface for working with LBS archives.
3 This module provides an easy interface for reading from and manipulating
4 various parts of an LBS archive:
5 - listing the snapshots and segments present
6 - reading segment contents
7 - parsing snapshot descriptors and snapshot metadata logs
8 - reading and maintaining the local object database
11 from __future__ import division
12 import os, re, sha, tarfile, tempfile, thread
13 from pysqlite2 import dbapi2 as sqlite3
15 import cumulus.store, cumulus.store.file
17 # The largest supported snapshot format that can be understood.
18 FORMAT_VERSION = (0, 8) # LBS Snapshot v0.8
20 # Maximum number of nested indirect references allowed in a snapshot.
21 MAX_RECURSION_DEPTH = 3
23 # All segments which have been accessed this session.
24 accessed_segments = set()
27 """Decode a URI-encoded (%xx escapes) string."""
28 def hex_decode(m): return chr(int(m.group(1), 16))
29 return re.sub(r"%([0-9a-f]{2})", hex_decode, s)
31 """Encode a string to URI-encoded (%xx escapes) form."""
33 if c > '+' and c < '\x7f' and c != '@':
36 return "%%%02x" % (ord(c),)
37 return ''.join(hex_encode(c) for c in s)
40 """A class which merely acts as a data container.
42 Instances of this class (or its subclasses) are merely used to store data
43 in various attributes. No methods are provided.
47 return "<%s %s>" % (self.__class__, self.__dict__)
49 CHECKSUM_ALGORITHMS = {
53 class ChecksumCreator:
54 """Compute an LBS checksum for provided data.
56 The algorithm used is selectable, but currently defaults to sha1.
59 def __init__(self, algorithm='sha1'):
60 self.algorithm = algorithm
61 self.hash = CHECKSUM_ALGORITHMS[algorithm]()
63 def update(self, data):
64 self.hash.update(data)
68 return "%s=%s" % (self.algorithm, self.hash.hexdigest())
70 class ChecksumVerifier:
71 """Verify whether a checksum from a snapshot matches the supplied data."""
73 def __init__(self, checksumstr):
74 """Create an object to check the supplied checksum."""
76 (algo, checksum) = checksumstr.split("=", 1)
77 self.checksum = checksum
78 self.hash = CHECKSUM_ALGORITHMS[algo]()
80 def update(self, data):
81 self.hash.update(data)
84 """Return a boolean indicating whether the checksum matches."""
86 result = self.hash.hexdigest()
87 return result == self.checksum
89 class LowlevelDataStore:
90 """Access to the backup store containing segments and snapshot descriptors.
92 Instances of this class are used to get direct filesystem-level access to
93 the backup data. To read a backup, a caller will ordinarily not care about
94 direct access to backup segments, but will instead merely need to access
95 objects from those segments. The ObjectStore class provides a suitable
96 wrapper around a DataStore to give this high-level access.
99 def __init__(self, path):
100 if isinstance(path, cumulus.store.Store):
102 elif path.find(":") >= 0:
103 self.store = cumulus.store.open(path)
105 self.store = cumulus.store.file.FileStore(path)
107 def _classify(self, filename):
108 for (t, r) in cumulus.store.type_patterns.items():
109 if r.match(filename):
111 return (None, filename)
113 def lowlevel_open(self, filename):
114 """Return a file-like object for reading data from the given file."""
116 (type, filename) = self._classify(filename)
117 return self.store.get(type, filename)
119 def lowlevel_stat(self, filename):
120 """Return a dictionary of information about the given file.
122 Currently, the only defined field is 'size', giving the size of the
126 (type, filename) = self._classify(filename)
127 return self.store.stat(type, filename)
129 # Slightly higher-level list methods.
130 def list_snapshots(self):
131 for f in self.store.list('snapshots'):
132 m = cumulus.store.type_patterns['snapshots'].match(f)
133 if m: yield m.group(1)
135 def list_segments(self):
136 for f in self.store.list('segments'):
137 m = cumulus.store.type_patterns['segments'].match(f)
138 if m: yield m.group(1)
141 def __init__(self, data_store):
142 self.store = data_store
147 def get_cachedir(self):
148 if self.cachedir is None:
149 self.cachedir = tempfile.mkdtemp(".lbs")
153 if self.cachedir is not None:
154 # TODO: Avoid use of system, make this safer
155 os.system("rm -rf " + self.cachedir)
159 def parse_ref(refstr):
160 m = re.match(r"^zero\[(\d+)\]$", refstr)
162 return ("zero", None, None, (0, int(m.group(1))))
164 m = re.match(r"^([-0-9a-f]+)\/([0-9a-f]+)(\(\S+\))?(\[(((\d+)\+)?(\d+)|=(\d+))\])?$", refstr)
169 checksum = m.group(3)
172 if checksum is not None:
173 checksum = checksum.lstrip("(").rstrip(")")
175 if slice is not None:
176 if m.group(9) is not None:
177 # Size-assertion slice
178 slice = (0, int(m.group(9)), True)
179 elif m.group(6) is None:
181 slice = (0, int(m.group(8)), False)
183 slice = (int(m.group(7)), int(m.group(8)), False)
185 return (segment, object, checksum, slice)
187 def get_segment(self, segment):
188 accessed_segments.add(segment)
189 raw = self.store.lowlevel_open(segment + ".tar.gpg")
191 (input, output) = os.popen2("lbs-filter-gpg --decrypt")
192 def copy_thread(src, dst):
195 block = src.read(BLOCK_SIZE)
196 if len(block) == 0: break
200 thread.start_new_thread(copy_thread, (raw, input))
203 def load_segment(self, segment):
204 seg = tarfile.open(segment, 'r|', self.get_segment(segment))
206 data_obj = seg.extractfile(item)
207 path = item.name.split('/')
208 if len(path) == 2 and path[0] == segment:
209 yield (path[1], data_obj.read())
211 def load_snapshot(self, snapshot):
212 file = self.store.lowlevel_open("snapshot-" + snapshot + ".lbs")
213 return file.read().splitlines(True)
215 def extract_segment(self, segment):
216 segdir = os.path.join(self.get_cachedir(), segment)
218 for (object, data) in self.load_segment(segment):
219 f = open(os.path.join(segdir, object), 'wb')
223 def load_object(self, segment, object):
224 accessed_segments.add(segment)
225 path = os.path.join(self.get_cachedir(), segment, object)
226 if not os.access(path, os.R_OK):
227 self.extract_segment(segment)
228 if segment in self.lru_list: self.lru_list.remove(segment)
229 self.lru_list.append(segment)
230 while len(self.lru_list) > self.CACHE_SIZE:
231 os.system("rm -rf " + os.path.join(self.cachedir, self.lru_list[0]))
232 self.lru_list = self.lru_list[1:]
233 return open(path, 'rb').read()
235 def get(self, refstr):
236 """Fetch the given object and return it.
238 The input should be an object reference, in string form.
241 (segment, object, checksum, slice) = self.parse_ref(refstr)
243 if segment == "zero":
244 return "\0" * slice[1]
246 data = self.load_object(segment, object)
248 if checksum is not None:
249 verifier = ChecksumVerifier(checksum)
250 verifier.update(data)
251 if not verifier.valid():
254 if slice is not None:
255 (start, length, exact) = slice
256 if exact and len(data) != length: raise ValueError
257 data = data[start:start+length]
258 if len(data) != length: raise IndexError
262 def parse(lines, terminate=None):
263 """Generic parser for RFC822-style "Key: Value" data streams.
265 This parser can be used to read metadata logs and snapshot root descriptor
268 lines must be an iterable object which yields a sequence of lines of input.
270 If terminate is specified, it is used as a predicate to determine when to
271 stop reading input lines.
278 # Strip off a trailing newline, if present
279 if len(l) > 0 and l[-1] == "\n":
282 if terminate is not None and terminate(l):
283 if len(dict) > 0: yield dict
288 m = re.match(r"^([-\w]+):\s*(.*)$", l)
290 dict[m.group(1)] = m.group(2)
291 last_key = m.group(1)
292 elif len(l) > 0 and l[0].isspace() and last_key is not None:
297 if len(dict) > 0: yield dict
299 def parse_full(lines):
301 return parse(lines).next()
302 except StopIteration:
305 def parse_metadata_version(s):
306 """Convert a string with the snapshot version format to a tuple."""
308 m = re.match(r"^LBS Snapshot v(\d+(\.\d+)*)$", s)
312 return tuple([int(d) for d in m.group(1).split(".")])
314 def read_metadata(object_store, root):
315 """Iterate through all lines in the metadata log, following references."""
317 # Stack for keeping track of recursion when following references to
318 # portions of the log. The last entry in the stack corresponds to the
319 # object currently being parsed. Each entry is a list of lines which have
320 # been reversed, so that popping successive lines from the end of each list
321 # will return lines of the metadata log in order.
324 def follow_ref(refstr):
325 if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
326 lines = object_store.get(refstr).splitlines(True)
332 while len(stack) > 0:
339 # An indirect reference which we must follow?
340 if len(line) > 0 and line[0] == '@':
348 """Metadata for a single file (or directory or...) from a snapshot."""
350 # Functions for parsing various datatypes that can appear in a metadata log
354 """Decode an integer, expressed in decimal, octal, or hexadecimal."""
355 if s.startswith("0x"):
357 elif s.startswith("0"):
364 """Decode a URI-encoded (%xx escapes) string."""
369 """An unecoded string."""
374 """Decode a user/group to a tuple of uid/gid followed by name."""
376 uid = MetadataItem.decode_int(items[0])
379 if items[1].startswith("(") and items[1].endswith(")"):
380 name = MetadataItem.decode_str(items[1][1:-1])
384 def decode_device(s):
385 """Decode a device major/minor number."""
386 (major, minor) = map(MetadataItem.decode_int, s.split("/"))
387 return (major, minor)
391 def __init__(self, fields, object_store):
392 """Initialize from a dictionary of key/value pairs from metadata log."""
395 self.object_store = object_store
397 self.items = self.Items()
398 for (k, v) in fields.items():
399 if k in self.field_types:
400 decoder = self.field_types[k]
401 setattr(self.items, k, decoder(v))
405 """Return an iterator for the data blocks that make up a file."""
407 # This traverses the list of blocks that make up a file, following
408 # indirect references. It is implemented in much the same way as
409 # read_metadata, so see that function for details of the technique.
411 objects = self.fields['data'].split()
415 def follow_ref(refstr):
416 if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
417 objects = self.object_store.get(refstr).split()
419 stack.append(objects)
421 while len(stack) > 0:
428 # An indirect reference which we must follow?
429 if len(ref) > 0 and ref[0] == '@':
434 # Description of fields that might appear, and how they should be parsed.
435 MetadataItem.field_types = {
436 'name': MetadataItem.decode_str,
437 'type': MetadataItem.raw_str,
438 'mode': MetadataItem.decode_int,
439 'device': MetadataItem.decode_device,
440 'user': MetadataItem.decode_user,
441 'group': MetadataItem.decode_user,
442 'ctime': MetadataItem.decode_int,
443 'mtime': MetadataItem.decode_int,
444 'links': MetadataItem.decode_int,
445 'inode': MetadataItem.raw_str,
446 'checksum': MetadataItem.decode_str,
447 'size': MetadataItem.decode_int,
448 'contents': MetadataItem.decode_str,
449 'target': MetadataItem.decode_str,
452 def iterate_metadata(object_store, root):
453 for d in parse(read_metadata(object_store, root), lambda l: len(l) == 0):
454 yield MetadataItem(d, object_store)
457 """Access to the local database of snapshot contents and object checksums.
459 The local database is consulted when creating a snapshot to determine what
460 data can be re-used from old snapshots. Segment cleaning is performed by
461 manipulating the data in the local database; the local database also
462 includes enough data to guide the segment cleaning process.
465 def __init__(self, path, dbname="localdb.sqlite"):
466 self.db_connection = sqlite3.connect(path + "/" + dbname)
468 # Low-level database access. Use these methods when there isn't a
469 # higher-level interface available. Exception: do, however, remember to
470 # use the commit() method after making changes to make sure they are
471 # actually saved, even when going through higher-level interfaces.
473 "Commit any pending changes to the local database."
474 self.db_connection.commit()
477 "Roll back any pending changes to the local database."
478 self.db_connection.rollback()
481 "Return a DB-API cursor for directly accessing the local database."
482 return self.db_connection.cursor()
484 def list_schemes(self):
485 """Return the list of snapshots found in the local database.
487 The returned value is a list of tuples (id, scheme, name, time, intent).
491 cur.execute("select distinct scheme from snapshots")
492 schemes = [row[0] for row in cur.fetchall()]
496 def garbage_collect(self, scheme, intent=1.0):
497 """Delete entries from old snapshots from the database.
499 Only snapshots with the specified scheme name will be deleted. If
500 intent is given, it gives the intended next snapshot type, to determine
501 how aggressively to clean (for example, intent=7 could be used if the
502 next snapshot will be a weekly snapshot).
507 # Find the id of the last snapshot to be created. This is used for
508 # measuring time in a way: we record this value in each segment we
509 # expire on this run, and then on a future run can tell if there have
510 # been intervening backups made.
511 cur.execute("select max(snapshotid) from snapshots")
512 last_snapshotid = cur.fetchone()[0]
514 # Get the list of old snapshots for this scheme. Delete all the old
515 # ones. Rules for what to keep:
516 # - Always keep the most recent snapshot.
517 # - If snapshot X is younger than Y, and X has higher intent, then Y
519 cur.execute("""select snapshotid, name, intent,
520 julianday('now') - timestamp as age
521 from snapshots where scheme = ?
522 order by age""", (scheme,))
526 for (id, name, snap_intent, snap_age) in cur.fetchall():
528 if snap_intent < max_intent:
529 # Delete small-intent snapshots if there is a more recent
530 # large-intent snapshot.
532 elif snap_intent == intent:
533 # Delete previous snapshots with the specified intent level.
536 if can_delete and not first:
537 print "Delete snapshot %d (%s)" % (id, name)
538 cur.execute("delete from snapshots where snapshotid = ?",
541 max_intent = max(max_intent, snap_intent)
543 # Delete entries in the segments_used table which are for non-existent
545 cur.execute("""delete from segments_used
546 where snapshotid not in
547 (select snapshotid from snapshots)""")
549 # Find segments which contain no objects used by any current snapshots,
550 # and delete them from the segment table.
551 cur.execute("""delete from segments where segmentid not in
552 (select segmentid from segments_used)""")
554 # Delete unused objects in the block_index table. By "unused", we mean
555 # any object which was stored in a segment which has been deleted, and
556 # any object in a segment which was marked for cleaning and has had
557 # cleaning performed already (the expired time is less than the current
558 # largest snapshot id).
559 cur.execute("""delete from block_index
560 where segmentid not in (select segmentid from segments)
561 or segmentid in (select segmentid from segments
562 where expire_time < ?)""",
565 # Remove sub-block signatures for deleted objects.
566 cur.execute("""delete from subblock_signatures
568 (select blockid from block_index)""")
571 class SegmentInfo(Struct): pass
573 def get_segment_cleaning_list(self, age_boost=0.0):
574 """Return a list of all current segments with information for cleaning.
576 Return all segments which are currently known in the local database
577 (there might be other, older segments in the archive itself), and
578 return usage statistics for each to help decide which segments to
581 The returned list will be sorted by estimated cleaning benefit, with
582 segments that are best to clean at the start of the list.
584 If specified, the age_boost parameter (measured in days) will added to
585 the age of each segment, as a way of adjusting the benefit computation
586 before a long-lived snapshot is taken (for example, age_boost might be
587 set to 7 when cleaning prior to taking a weekly snapshot).
592 cur.execute("""select segmentid, used, size, mtime,
593 julianday('now') - mtime as age from segment_info
594 where expire_time is null""")
596 info = self.SegmentInfo()
598 info.used_bytes = row[1]
599 info.size_bytes = row[2]
601 info.age_days = row[4]
603 # If data is not available for whatever reason, treat it as 0.0.
604 if info.age_days is None:
606 if info.used_bytes is None:
607 info.used_bytes = 0.0
609 # Benefit calculation: u is the estimated fraction of each segment
610 # which is utilized (bytes belonging to objects still in use
611 # divided by total size; this doesn't take compression or storage
612 # overhead into account, but should give a reasonable estimate).
614 # The total benefit is a heuristic that combines several factors:
615 # the amount of space that can be reclaimed (1 - u), an ageing
616 # factor (info.age_days) that favors cleaning old segments to young
617 # ones and also is more likely to clean segments that will be
618 # rewritten for long-lived snapshots (age_boost), and finally a
619 # penalty factor for the cost of re-uploading data (u + 0.1).
620 u = info.used_bytes / info.size_bytes
621 info.cleaning_benefit \
622 = (1 - u) * (info.age_days + age_boost) / (u + 0.1)
624 segments.append(info)
626 segments.sort(cmp, key=lambda s: s.cleaning_benefit, reverse=True)
629 def mark_segment_expired(self, segment):
630 """Mark a segment for cleaning in the local database.
632 The segment parameter should be either a SegmentInfo object or an
633 integer segment id. Objects in the given segment will be marked as
634 expired, which means that any future snapshots that would re-use those
635 objects will instead write out a new copy of the object, and thus no
636 future snapshots will depend upon the given segment.
639 if isinstance(segment, int):
641 elif isinstance(segment, self.SegmentInfo):
644 raise TypeError("Invalid segment: %s, must be of type int or SegmentInfo, not %s" % (segment, type(segment)))
647 cur.execute("select max(snapshotid) from snapshots")
648 last_snapshotid = cur.fetchone()[0]
649 cur.execute("update segments set expire_time = ? where segmentid = ?",
650 (last_snapshotid, id))
651 cur.execute("update block_index set expired = 0 where segmentid = ?",
654 def balance_expired_objects(self):
655 """Analyze expired objects in segments to be cleaned and group by age.
657 Update the block_index table of the local database to group expired
658 objects by age. The exact number of buckets and the cutoffs for each
659 are dynamically determined. Calling this function after marking
660 segments expired will help in the segment cleaning process, by ensuring
661 that when active objects from clean segments are rewritten, they will
662 be placed into new segments roughly grouped by age.
665 # The expired column of the block_index table is used when generating a
666 # new LBS snapshot. A null value indicates that an object may be
667 # re-used. Otherwise, an object must be written into a new segment if
668 # needed. Objects with distinct expired values will be written into
669 # distinct segments, to allow for some grouping by age. The value 0 is
670 # somewhat special in that it indicates any rewritten objects can be
671 # placed in the same segment as completely new objects; this can be
672 # used for very young objects which have been expired, or objects not
673 # expected to be encountered.
675 # In the balancing process, all objects which are not used in any
676 # current snapshots will have expired set to 0. Objects which have
677 # been seen will be sorted by age and will have expired values set to
678 # 0, 1, 2, and so on based on age (with younger objects being assigned
679 # lower values). The number of buckets and the age cutoffs is
680 # determined by looking at the distribution of block ages.
684 # Mark all expired objects with expired = 0; these objects will later
685 # have values set to indicate groupings of objects when repacking.
686 cur.execute("""update block_index set expired = 0
687 where expired is not null""")
689 # We will want to aim for at least one full segment for each bucket
690 # that we eventually create, but don't know how many bytes that should
691 # be due to compression. So compute the average number of bytes in
692 # each expired segment as a rough estimate for the minimum size of each
693 # bucket. (This estimate could be thrown off by many not-fully-packed
694 # segments, but for now don't worry too much about that.) If we can't
695 # compute an average, it's probably because there are no expired
696 # segments, so we have no more work to do.
697 cur.execute("""select avg(size) from segments
699 (select distinct segmentid from block_index
700 where expired is not null)""")
701 segment_size_estimate = cur.fetchone()[0]
702 if not segment_size_estimate:
705 # Next, extract distribution of expired objects (number and size) by
706 # age. Save the timestamp for "now" so that the classification of
707 # blocks into age buckets will not change later in the function, after
708 # time has passed. Set any timestamps in the future to now, so we are
709 # guaranteed that for the rest of this function, age is always
711 cur.execute("select julianday('now')")
712 now = cur.fetchone()[0]
714 cur.execute("""update block_index set timestamp = ?
715 where timestamp > ? and expired is not null""",
718 cur.execute("""select round(? - timestamp) as age, count(*), sum(size)
719 from block_index where expired = 0
720 group by age order by age""", (now,))
721 distribution = cur.fetchall()
723 # Start to determine the buckets for expired objects. Heuristics used:
724 # - An upper bound on the number of buckets is given by the number of
725 # segments we estimate it will take to store all data. In fact,
726 # aim for a couple of segments per bucket.
727 # - Place very young objects in bucket 0 (place with new objects)
728 # unless there are enough of them to warrant a separate bucket.
729 # - Try not to create unnecessarily many buckets, since fewer buckets
730 # will allow repacked data to be grouped based on spatial locality
731 # (while more buckets will group by temporal locality). We want a
734 total_bytes = sum([i[2] for i in distribution])
735 target_buckets = 2 * (total_bytes / segment_size_estimate) ** 0.4
736 min_size = 1.5 * segment_size_estimate
737 target_size = max(2 * segment_size_estimate,
738 total_bytes / target_buckets)
740 print "segment_size:", segment_size_estimate
741 print "distribution:", distribution
742 print "total_bytes:", total_bytes
743 print "target_buckets:", target_buckets
744 print "min, target size:", min_size, target_size
746 # Chosen cutoffs. Each bucket consists of objects with age greater
747 # than one cutoff value, but not greater than the next largest cutoff.
750 # Starting with the oldest objects, begin grouping together into
751 # buckets of size at least target_size bytes.
752 distribution.reverse()
754 min_age_bucket = False
755 for (age, items, size) in distribution:
756 if bucket_size >= target_size \
757 or (age < MIN_AGE and not min_age_bucket):
758 if bucket_size < target_size and len(cutoffs) > 0:
765 min_age_bucket = True
767 # The last (youngest) bucket will be group 0, unless it has enough data
768 # to be of size min_size by itself, or there happen to be no objects
769 # less than MIN_AGE at all.
770 if bucket_size >= min_size or not min_age_bucket:
774 print "cutoffs:", cutoffs
776 # Update the database to assign each object to the appropriate bucket.
778 for i in range(len(cutoffs)):
779 cur.execute("""update block_index set expired = ?
780 where round(? - timestamp) > ?
781 and expired is not null""",
782 (i, now, cutoffs[i]))