1 """High-level interface for working with LBS archives.
3 This module provides an easy interface for reading from and manipulating
4 various parts of an LBS archive:
5 - listing the snapshots and segments present
6 - reading segment contents
7 - parsing snapshot descriptors and snapshot metadata logs
8 - reading and maintaining the local object database

from __future__ import division
import os, re, sha, tarfile, tempfile, thread
from pysqlite2 import dbapi2 as sqlite3

# The most recent snapshot format version that this module understands.
FORMAT_VERSION = (0, 8)         # LBS Snapshot v0.8

# Maximum number of nested indirect references allowed in a snapshot.
MAX_RECURSION_DEPTH = 3

# All segments which have been accessed this session.
accessed_segments = set()
25 """A class which merely acts as a data container.
27 Instances of this class (or its subclasses) are merely used to store data
28 in various attributes. No methods are provided.
32 return "<%s %s>" % (self.__class__, self.__dict__)

CHECKSUM_ALGORITHMS = {
    'sha1': sha.new,
}

class ChecksumCreator:
    """Compute an LBS checksum for provided data.

    The algorithm used is selectable, but currently defaults to sha1.
    """

    def __init__(self, algorithm='sha1'):
        self.algorithm = algorithm
        self.hash = CHECKSUM_ALGORITHMS[algorithm]()

    def update(self, data):
        self.hash.update(data)
        return self

    def compute(self):
        return "%s=%s" % (self.algorithm, self.hash.hexdigest())

class ChecksumVerifier:
    """Verify whether a checksum from a snapshot matches the supplied data."""

    def __init__(self, checksumstr):
        """Create an object to check the supplied checksum."""

        (algo, checksum) = checksumstr.split("=", 1)
        self.checksum = checksum
        self.hash = CHECKSUM_ALGORITHMS[algo]()

    def update(self, data):
        self.hash.update(data)

    def valid(self):
        """Return a boolean indicating whether the checksum matches."""

        result = self.hash.hexdigest()
        return result == self.checksum
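
# Example usage (a sketch, not part of the library): computing a checksum
# string for some data and then verifying it against the same data.  The
# "algorithm=hexdigest" string form is the one stored in snapshots.
#
#     creator = ChecksumCreator('sha1')
#     creator.update("some file data")
#     checksum = creator.compute()            # e.g. "sha1=..."
#
#     verifier = ChecksumVerifier(checksum)
#     verifier.update("some file data")
#     assert verifier.valid()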

class LowlevelDataStore:
    """Access to the backup store containing segments and snapshot descriptors.

    Instances of this class are used to get direct filesystem-level access to
    the backup data.  To read a backup, a caller will ordinarily not care about
    direct access to backup segments, but will instead merely need to access
    objects from those segments.  The ObjectStore class provides a suitable
    wrapper around a LowlevelDataStore to give this high-level access.
    """

    def __init__(self, path):
        self.path = path

    # Low-level filesystem access.  These methods could be overridden to
    # provide access to remote data stores.
    def lowlevel_list(self):
        """Get a listing of files stored."""

        return os.listdir(self.path)

    def lowlevel_open(self, filename):
        """Return a file-like object for reading data from the given file."""

        return open(os.path.join(self.path, filename), 'rb')

    def lowlevel_stat(self, filename):
        """Return a dictionary of information about the given file.

        Currently, the only defined field is 'size', giving the size of the
        file, in bytes.
        """

        stat = os.stat(os.path.join(self.path, filename))
        return {'size': stat.st_size}

    # Slightly higher-level list methods.
    def list_snapshots(self):
        for f in self.lowlevel_list():
            m = re.match(r"^snapshot-(.*)\.lbs$", f)
            if m:
                yield m.group(1)

    def list_segments(self):
        for f in self.lowlevel_list():
            m = re.match(r"^([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})(\.\S+)?$", f)
            if m:
                yield m.group(1)
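
# Example usage (sketch): listing the contents of a backup store on local
# disk.  The path "/backup/store" is hypothetical.
#
#     store = LowlevelDataStore("/backup/store")
#     for s in store.list_snapshots():
#         print "snapshot:", s
#     for s in store.list_segments():
#         print "segment:", s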

class ObjectStore:
    def __init__(self, data_store):
        self.store = data_store
        self.cachedir = None
        self.lru_list = []
        self.CACHE_SIZE = 16    # Number of unpacked segments to keep cached

    def get_cachedir(self):
        if self.cachedir is None:
            self.cachedir = tempfile.mkdtemp(".lbs")
        return self.cachedir

    def cleanup(self):
        if self.cachedir is not None:
            # TODO: Avoid use of system, make this safer
            os.system("rm -rf " + self.cachedir)
        self.cachedir = None

    @staticmethod
    def parse_ref(refstr):
        m = re.match(r"^zero\[(\d+)\]$", refstr)
        if m:
            return ("zero", None, None, (0, int(m.group(1))))

        m = re.match(r"^([-0-9a-f]+)\/([0-9a-f]+)(\(\S+\))?(\[(((\d+)\+)?(\d+)|=(\d+))\])?$", refstr)
        if not m: raise ValueError

        segment = m.group(1)
        object = m.group(2)
        checksum = m.group(3)
        slice = m.group(4)

        if checksum is not None:
            checksum = checksum.lstrip("(").rstrip(")")

        if slice is not None:
            if m.group(9) is not None:
                # Size-assertion slice
                slice = (0, int(m.group(9)), True)
            elif m.group(6) is None:
                # Abbreviated slice: a length only, starting at offset 0
                slice = (0, int(m.group(8)), False)
            else:
                slice = (int(m.group(7)), int(m.group(8)), False)

        return (segment, object, checksum, slice)
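
    # Example (sketch) of the reference strings accepted by parse_ref; the
    # segment UUID and object name below are made up for illustration.
    #
    #     ObjectStore.parse_ref("zero[1024]")
    #         => ("zero", None, None, (0, 1024))
    #     ObjectStore.parse_ref("01234567-89ab-cdef-0123-456789abcdef/00000001[16+32]")
    #         => ("01234567-89ab-cdef-0123-456789abcdef", "00000001", None,
    #             (16, 32, False))
    #
    # A slice written as "[=32]" is a size assertion: the object must be
    # exactly 32 bytes long.  A checksum may also be given in parentheses
    # between the object name and the slice.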

    def get_segment(self, segment):
        accessed_segments.add(segment)
        raw = self.store.lowlevel_open(segment + ".tar.gpg")

        (input, output) = os.popen2("lbs-filter-gpg --decrypt")
        def copy_thread(src, dst):
            BLOCK_SIZE = 4096
            while True:
                block = src.read(BLOCK_SIZE)
                if len(block) == 0: break
                dst.write(block)
            dst.close()

        thread.start_new_thread(copy_thread, (raw, input))
        return output

    def load_segment(self, segment):
        seg = tarfile.open(segment, 'r|', self.get_segment(segment))
        for item in seg:
            data_obj = seg.extractfile(item)
            path = item.name.split('/')
            if len(path) == 2 and path[0] == segment:
                yield (path[1], data_obj.read())

    def load_snapshot(self, snapshot):
        file = self.store.lowlevel_open("snapshot-" + snapshot + ".lbs")
        return file.read().splitlines(True)

    def extract_segment(self, segment):
        segdir = os.path.join(self.get_cachedir(), segment)
        os.mkdir(segdir)
        for (object, data) in self.load_segment(segment):
            f = open(os.path.join(segdir, object), 'wb')
            f.write(data)
            f.close()

    def load_object(self, segment, object):
        accessed_segments.add(segment)
        path = os.path.join(self.get_cachedir(), segment, object)
        if not os.access(path, os.R_OK):
            self.extract_segment(segment)
        if segment in self.lru_list: self.lru_list.remove(segment)
        self.lru_list.append(segment)
        while len(self.lru_list) > self.CACHE_SIZE:
            os.system("rm -rf " + os.path.join(self.cachedir, self.lru_list[0]))
            self.lru_list = self.lru_list[1:]
        return open(path, 'rb').read()

    def get(self, refstr):
        """Fetch the given object and return it.

        The input should be an object reference, in string form.
        """

        (segment, object, checksum, slice) = self.parse_ref(refstr)

        if segment == "zero":
            return "\0" * slice[1]

        data = self.load_object(segment, object)

        if checksum is not None:
            verifier = ChecksumVerifier(checksum)
            verifier.update(data)
            if not verifier.valid():
                raise ValueError

        if slice is not None:
            (start, length, exact) = slice
            if exact and len(data) != length: raise ValueError
            data = data[start:start+length]
            if len(data) != length: raise IndexError

        return data
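
# Example usage (sketch): reading a single object out of a backup.  The store
# path and object reference are hypothetical; real references normally come
# from a snapshot descriptor or a metadata log.
#
#     store = ObjectStore(LowlevelDataStore("/backup/store"))
#     data = store.get("01234567-89ab-cdef-0123-456789abcdef/00000001")
#     store.cleanup()    # remove the temporary segment cache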

def parse(lines, terminate=None):
    """Generic parser for RFC822-style "Key: Value" data streams.

    This parser can be used to read metadata logs and snapshot root descriptor
    files.

    lines must be an iterable object which yields a sequence of lines of input.

    If terminate is specified, it is used as a predicate to determine when to
    stop reading input lines.
    """

    dict = {}
    last_key = None

    for l in lines:
        # Strip off a trailing newline, if present
        if len(l) > 0 and l[-1] == "\n":
            l = l[:-1]

        if terminate is not None and terminate(l):
            if len(dict) > 0: yield dict
            dict = {}
            last_key = None
            continue

        m = re.match(r"^([-\w]+):\s*(.*)$", l)
        if m:
            dict[m.group(1)] = m.group(2)
            last_key = m.group(1)
        elif len(l) > 0 and l[0].isspace() and last_key is not None:
            dict[last_key] += l
        else:
            last_key = None

    if len(dict) > 0: yield dict
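
# Example (sketch): parse() turns "Key: Value" text into dictionaries.  A
# continuation line (one starting with whitespace) is appended to the value of
# the previous key, and a terminate predicate can split records on blank
# lines.  The field names below are purely illustrative.
#
#     text = ["Format: LBS Snapshot v0.8\n",
#             "Name: first record\n",
#             "\n",
#             "Name: second record\n"]
#     for d in parse(text, lambda l: len(l) == 0):
#         print d
#     # {'Format': 'LBS Snapshot v0.8', 'Name': 'first record'}
#     # {'Name': 'second record'}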

def parse_full(lines):
    try:
        return parse(lines).next()
    except StopIteration:
        return {}

def parse_metadata_version(s):
    """Convert a string with the snapshot version format to a tuple."""

    m = re.match(r"^LBS Snapshot v(\d+(\.\d+)*)$", s)
    if m is None:
        return ()
    else:
        return tuple([int(d) for d in m.group(1).split(".")])
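
# Example: the version string named in the FORMAT_VERSION comment above parses
# to a tuple that can be compared directly against FORMAT_VERSION.
#
#     parse_metadata_version("LBS Snapshot v0.8")    # => (0, 8)
#     parse_metadata_version("not a version")        # => ()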

def read_metadata(object_store, root):
    """Iterate through all lines in the metadata log, following references."""

    # Stack for keeping track of recursion when following references to
    # portions of the log.  The last entry in the stack corresponds to the
    # object currently being parsed.  Each entry is a list of lines which have
    # been reversed, so that popping successive lines from the end of each list
    # will return lines of the metadata log in order.
    stack = []

    def follow_ref(refstr):
        if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
        lines = object_store.get(refstr).splitlines(True)
        lines.reverse()
        stack.append(lines)

    follow_ref(root)

    while len(stack) > 0:
        top = stack[-1]
        if len(top) == 0:
            stack.pop()
            continue
        line = top.pop()

        # An indirect reference which we must follow?
        if len(line) > 0 and line[0] == '@':
            ref = line[1:]
            ref = ref.strip()
            follow_ref(ref)
        else:
            yield line
330 """Metadata for a single file (or directory or...) from a snapshot."""
332 # Functions for parsing various datatypes that can appear in a metadata log
336 """Decode an integer, expressed in decimal, octal, or hexadecimal."""
337 if s.startswith("0x"):
339 elif s.startswith("0"):
346 """Decode a URI-encoded (%xx escapes) string."""
347 def hex_decode(m): return chr(int(m.group(1), 16))
348 return re.sub(r"%([0-9a-f]{2})", hex_decode, s)
352 """An unecoded string."""
357 """Decode a user/group to a tuple of uid/gid followed by name."""
359 uid = MetadataItem.decode_int(items[0])
362 if items[1].startswith("(") and items[1].endswith(")"):
363 name = MetadataItem.decode_str(items[1][1:-1])
367 def decode_device(s):
368 """Decode a device major/minor number."""
369 (major, minor) = map(MetadataItem.decode_int, s.split("/"))
370 return (major, minor)
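
    # Example (sketch) of how the decoders above interpret raw metadata-log
    # values:
    #
    #     MetadataItem.decode_int("0755")           # => 493 (octal)
    #     MetadataItem.decode_int("0x1f")           # => 31 (hexadecimal)
    #     MetadataItem.decode_str("a%20b")          # => "a b"
    #     MetadataItem.decode_user("1000 (alice)")  # => (1000, "alice")
    #     MetadataItem.decode_device("8/1")         # => (8, 1)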

    class Items: pass

    def __init__(self, fields, object_store):
        """Initialize from a dictionary of key/value pairs from metadata log."""

        self.fields = fields
        self.object_store = object_store
        self.keys = []
        self.items = self.Items()
        for (k, v) in fields.items():
            if k in self.field_types:
                decoder = self.field_types[k]
                setattr(self.items, k, decoder(v))
                self.keys.append(k)

    def data(self):
        """Return an iterator for the data blocks that make up a file."""

        # This traverses the list of blocks that make up a file, following
        # indirect references.  It is implemented in much the same way as
        # read_metadata, so see that function for details of the technique.

        objects = self.fields['data'].split()
        objects.reverse()
        stack = [objects]

        def follow_ref(refstr):
            if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
            objects = self.object_store.get(refstr).split()
            objects.reverse()
            stack.append(objects)

        while len(stack) > 0:
            top = stack[-1]
            if len(top) == 0:
                stack.pop()
                continue
            ref = top.pop()

            # An indirect reference which we must follow?
            if len(ref) > 0 and ref[0] == '@':
                follow_ref(ref[1:])
            else:
                yield ref

# Description of fields that might appear, and how they should be parsed.
MetadataItem.field_types = {
    'name': MetadataItem.decode_str,
    'type': MetadataItem.raw_str,
    'mode': MetadataItem.decode_int,
    'device': MetadataItem.decode_device,
    'user': MetadataItem.decode_user,
    'group': MetadataItem.decode_user,
    'ctime': MetadataItem.decode_int,
    'mtime': MetadataItem.decode_int,
    'links': MetadataItem.decode_int,
    'inode': MetadataItem.raw_str,
    'checksum': MetadataItem.decode_str,
    'size': MetadataItem.decode_int,
    'contents': MetadataItem.decode_str,
    'target': MetadataItem.decode_str,
}

def iterate_metadata(object_store, root):
    for d in parse(read_metadata(object_store, root), lambda l: len(l) == 0):
        yield MetadataItem(d, object_store)
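
# Example usage (sketch): walking the metadata log of a snapshot and reading
# back file contents.  The store path is hypothetical, and root_ref stands for
# the root object reference taken from the snapshot descriptor.
#
#     store = ObjectStore(LowlevelDataStore("/backup/store"))
#     for item in iterate_metadata(store, root_ref):
#         print item.fields.get('name')
#         if 'data' in item.fields:
#             contents = "".join([store.get(ref) for ref in item.data()])
#     store.cleanup()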
440 """Access to the local database of snapshot contents and object checksums.
442 The local database is consulted when creating a snapshot to determine what
443 data can be re-used from old snapshots. Segment cleaning is performed by
444 manipulating the data in the local database; the local database also
445 includes enough data to guide the segment cleaning process.
448 def __init__(self, path, dbname="localdb.sqlite"):
449 self.db_connection = sqlite3.connect(path + "/" + dbname)

    # Low-level database access.  Use these methods when there isn't a
    # higher-level interface available.  Even when going through the
    # higher-level interfaces, remember to call commit() after making changes
    # so that they are actually saved.
    def commit(self):
        "Commit any pending changes to the local database."
        self.db_connection.commit()

    def rollback(self):
        "Roll back any pending changes to the local database."
        self.db_connection.rollback()

    def cursor(self):
        "Return a DB-API cursor for directly accessing the local database."
        return self.db_connection.cursor()

    def list_schemes(self):
        """Return the list of snapshot scheme names found in the local database.

        The returned value is a list of distinct scheme name strings.
        """

        cur = self.cursor()
        cur.execute("select distinct scheme from snapshots")
        schemes = [row[0] for row in cur.fetchall()]
        schemes.sort()
        return schemes
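
    # Example usage (sketch): opening the local database and inspecting it.
    # The path "/backup/localdb" is hypothetical.
    #
    #     db = LocalDatabase("/backup/localdb")
    #     print db.list_schemes()
    #     cur = db.cursor()
    #     cur.execute("select count(*) from snapshots")
    #     print cur.fetchone()[0]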

    def garbage_collect(self, scheme, intent=1.0):
        """Delete entries for old snapshots from the database.

        Only snapshots with the specified scheme name will be deleted.  If
        intent is given, it gives the intended next snapshot type, to determine
        how aggressively to clean (for example, intent=7 could be used if the
        next snapshot will be a weekly snapshot).
        """

        cur = self.cursor()

        # Find the id of the last snapshot to be created.  This is used for
        # measuring time in a way: we record this value in each segment we
        # expire on this run, and then on a future run can tell if there have
        # been intervening backups made.
        cur.execute("select max(snapshotid) from snapshots")
        last_snapshotid = cur.fetchone()[0]

        # Get the list of old snapshots for this scheme.  Delete all the old
        # ones.  Rules for what to keep:
        #   - Always keep the most recent snapshot.
        #   - If snapshot X is younger than Y, and X has higher intent, then Y
        #     can be deleted.
        cur.execute("""select snapshotid, name, intent,
                              julianday('now') - timestamp as age
                       from snapshots where scheme = ?
                       order by age""", (scheme,))

        first = True
        max_intent = -1
        for (id, name, snap_intent, snap_age) in cur.fetchall():
            can_delete = False
            if snap_intent < max_intent:
                # Delete small-intent snapshots if there is a more recent
                # large-intent snapshot.
                can_delete = True
            elif snap_intent == intent:
                # Delete previous snapshots with the specified intent level.
                can_delete = True

            if can_delete and not first:
                print "Delete snapshot %d (%s)" % (id, name)
                cur.execute("delete from snapshots where snapshotid = ?",
                            (id,))
            first = False
            max_intent = max(max_intent, snap_intent)

        # Delete entries in the segments_used table which are for non-existent
        # snapshots.
        cur.execute("""delete from segments_used
                       where snapshotid not in
                           (select snapshotid from snapshots)""")

        # Find segments which contain no objects used by any current snapshots,
        # and delete them from the segment table.
        cur.execute("""delete from segments where segmentid not in
                           (select segmentid from segments_used)""")

        # Delete unused objects in the block_index table.  By "unused", we mean
        # any object which was stored in a segment which has been deleted, and
        # any object in a segment which was marked for cleaning and has had
        # cleaning performed already (the expired time is less than the current
        # largest snapshot id).
        cur.execute("""delete from block_index
                       where segmentid not in (select segmentid from segments)
                          or segmentid in (select segmentid from segments
                                           where expire_time < ?)""",
                    (last_snapshotid,))

        # Remove sub-block signatures for deleted objects.
        cur.execute("""delete from subblock_signatures
                       where blockid not in
                           (select blockid from block_index)""")
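
    # Example usage (sketch): expiring old snapshots for one scheme before a
    # new weekly (intent=7) backup is taken.  The scheme name "home" is
    # hypothetical; changes are not saved until commit() is called.
    #
    #     db = LocalDatabase("/backup/localdb")
    #     db.garbage_collect("home", intent=7)
    #     db.commit()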

    class SegmentInfo(Struct): pass

    def get_segment_cleaning_list(self, age_boost=0.0):
        """Return a list of all current segments with information for cleaning.

        Return all segments which are currently known in the local database
        (there might be other, older segments in the archive itself), and
        return usage statistics for each to help decide which segments to
        clean.

        The returned list will be sorted by estimated cleaning benefit, with
        segments that are best to clean at the start of the list.

        If specified, the age_boost parameter (measured in days) will be added
        to the age of each segment, as a way of adjusting the benefit
        computation before a long-lived snapshot is taken (for example,
        age_boost might be set to 7 when cleaning prior to taking a weekly
        snapshot).
        """

        cur = self.cursor()
        segments = []
        cur.execute("""select segmentid, used, size, mtime,
                              julianday('now') - mtime as age from segment_info
                       where expire_time is null""")
        for row in cur.fetchall():
            info = self.SegmentInfo()
            info.id = row[0]
            info.used_bytes = row[1]
            info.size_bytes = row[2]
            info.mtime = row[3]
            info.age_days = row[4]

            # If age is not available for whatever reason, treat it as 0.0.
            if info.age_days is None:
                info.age_days = 0.0

            # Benefit calculation: u is the estimated fraction of each segment
            # which is utilized (bytes belonging to objects still in use
            # divided by total size; this doesn't take compression or storage
            # overhead into account, but should give a reasonable estimate).
            #
            # The total benefit is a heuristic that combines several factors:
            # the amount of space that can be reclaimed (1 - u), an ageing
            # factor (info.age_days) that favors cleaning old segments over
            # young ones and also is more likely to clean segments that will be
            # rewritten for long-lived snapshots (age_boost), and finally a
            # penalty factor for the cost of re-uploading data (u + 0.1).
            u = info.used_bytes / info.size_bytes
            info.cleaning_benefit \
                = (1 - u) * (info.age_days + age_boost) / (u + 0.1)

            segments.append(info)

        segments.sort(key=lambda s: s.cleaning_benefit, reverse=True)
        return segments
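
    # Worked example of the benefit heuristic above: a segment that is 25%
    # utilized (u = 0.25) and 10 days old scores
    #     (1 - 0.25) * (10 + 0) / (0.25 + 0.1) ~= 21.4,
    # while a fully-utilized segment of the same age scores
    #     (1 - 1.0) * (10 + 0) / (1.0 + 0.1) = 0,
    # so old, mostly-empty segments sort to the front of the returned list.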

    def mark_segment_expired(self, segment):
        """Mark a segment for cleaning in the local database.

        The segment parameter should be either a SegmentInfo object or an
        integer segment id.  Objects in the given segment will be marked as
        expired, which means that any future snapshots that would re-use those
        objects will instead write out a new copy of the object, and thus no
        future snapshots will depend upon the given segment.
        """

        if isinstance(segment, int):
            id = segment
        elif isinstance(segment, self.SegmentInfo):
            id = segment.id
        else:
            raise TypeError("Invalid segment: %s, must be of type int or "
                            "SegmentInfo, not %s" % (segment, type(segment)))

        cur = self.cursor()
        cur.execute("select max(snapshotid) from snapshots")
        last_snapshotid = cur.fetchone()[0]
        cur.execute("update segments set expire_time = ? where segmentid = ?",
                    (last_snapshotid, id))
        cur.execute("update block_index set expired = 0 where segmentid = ?",
                    (id,))
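
    # Example usage (sketch): a simple cleaning pass built from the methods in
    # this class.  The 60% utilization threshold is arbitrary and only for
    # illustration.
    #
    #     db = LocalDatabase("/backup/localdb")
    #     for info in db.get_segment_cleaning_list(age_boost=7.0):
    #         if info.used_bytes < 0.6 * info.size_bytes:
    #             db.mark_segment_expired(info)
    #     db.balance_expired_objects()
    #     db.commit()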

    def balance_expired_objects(self):
        """Analyze expired objects in segments to be cleaned and group by age.

        Update the block_index table of the local database to group expired
        objects by age.  The exact number of buckets and the cutoffs for each
        are dynamically determined.  Calling this function after marking
        segments expired will help in the segment cleaning process, by ensuring
        that when active objects from clean segments are rewritten, they will
        be placed into new segments roughly grouped by age.
        """

        # The expired column of the block_index table is used when generating a
        # new LBS snapshot.  A null value indicates that an object may be
        # re-used.  Otherwise, an object must be written into a new segment if
        # needed.  Objects with distinct expired values will be written into
        # distinct segments, to allow for some grouping by age.  The value 0 is
        # somewhat special in that it indicates any rewritten objects can be
        # placed in the same segment as completely new objects; this can be
        # used for very young objects which have been expired, or objects not
        # expected to be encountered.
        #
        # In the balancing process, all objects which are not used in any
        # current snapshots will have expired set to 0.  Objects which have
        # been seen will be sorted by age and will have expired values set to
        # 0, 1, 2, and so on based on age (with younger objects being assigned
        # lower values).  The number of buckets and the age cutoffs are
        # determined by looking at the distribution of block ages.

        cur = self.cursor()

        # Mark all expired objects with expired = 0; these objects will later
        # have values set to indicate groupings of objects when repacking.
        cur.execute("""update block_index set expired = 0
                       where expired is not null""")

        # We will want to aim for at least one full segment for each bucket
        # that we eventually create, but don't know how many bytes that should
        # be due to compression.  So compute the average number of bytes in
        # each expired segment as a rough estimate for the minimum size of each
        # bucket.  (This estimate could be thrown off by many not-fully-packed
        # segments, but for now don't worry too much about that.)  If we can't
        # compute an average, it's probably because there are no expired
        # segments, so we have no more work to do.
        cur.execute("""select avg(size) from segments
                       where segmentid in
                           (select distinct segmentid from block_index
                            where expired is not null)""")
        segment_size_estimate = cur.fetchone()[0]
        if not segment_size_estimate:
            return

        # Next, extract distribution of expired objects (number and size) by
        # age.  Save the timestamp for "now" so that the classification of
        # blocks into age buckets will not change later in the function, after
        # time has passed.  Set any timestamps in the future to now, so we are
        # guaranteed that for the rest of this function, age is always
        # non-negative.
        cur.execute("select julianday('now')")
        now = cur.fetchone()[0]
        cur.execute("""update block_index set timestamp = ?
                       where timestamp > ? and expired is not null""",
                    (now, now))

        cur.execute("""select round(? - timestamp) as age, count(*), sum(size)
                       from block_index where expired = 0
                       group by age order by age""", (now,))
        distribution = cur.fetchall()

        # Start to determine the buckets for expired objects.  Heuristics used:
        #   - An upper bound on the number of buckets is given by the number of
        #     segments we estimate it will take to store all data.  In fact,
        #     aim for a couple of segments per bucket.
        #   - Place very young objects in bucket 0 (place with new objects)
        #     unless there are enough of them to warrant a separate bucket.
        #   - Try not to create unnecessarily many buckets, since fewer buckets
        #     will allow repacked data to be grouped based on spatial locality
        #     (while more buckets will group by temporal locality).  We want a
        #     balance.
        MIN_AGE = 4.0
        total_bytes = sum([i[2] for i in distribution])
        target_buckets = 2 * (total_bytes / segment_size_estimate) ** 0.4
        min_size = 1.5 * segment_size_estimate
        target_size = max(2 * segment_size_estimate,
                          total_bytes / target_buckets)

        print "segment_size:", segment_size_estimate
        print "distribution:", distribution
        print "total_bytes:", total_bytes
        print "target_buckets:", target_buckets
        print "min, target size:", min_size, target_size

        # Chosen cutoffs.  Each bucket consists of objects with age greater
        # than one cutoff value, but not greater than the next largest cutoff.
        cutoffs = []

        # Starting with the oldest objects, begin grouping together into
        # buckets of size at least target_size bytes.
        distribution.reverse()
        bucket_size = 0
        min_age_bucket = False
        for (age, items, size) in distribution:
            if bucket_size >= target_size \
                or (age < MIN_AGE and not min_age_bucket):
                if bucket_size < target_size and len(cutoffs) > 0:
                    cutoffs.pop()
                cutoffs.append(age)
                bucket_size = 0
            bucket_size += size
            if age < MIN_AGE:
                min_age_bucket = True

        # The last (youngest) bucket will be group 0, unless it has enough data
        # to be of size min_size by itself, or there happen to be no objects
        # less than MIN_AGE at all.
        if bucket_size >= min_size or not min_age_bucket:
            cutoffs.append(-1)

        print "cutoffs:", cutoffs

        # Update the database to assign each object to the appropriate bucket.
        cutoffs.reverse()
        for i in range(len(cutoffs)):
            cur.execute("""update block_index set expired = ?
                           where round(? - timestamp) > ?
                             and expired is not null""",
                        (i, now, cutoffs[i]))