1 """High-level interface for working with LBS archives.
3 This module provides an easy interface for reading from and manipulating
4 various parts of an LBS archive:
5 - listing the snapshots and segments present
6 - reading segment contents
7 - parsing snapshot descriptors and snapshot metadata logs
8 - reading and maintaining the local object database
11 from __future__ import division
12 import os, re, sha, tarfile, tempfile, thread
13 from pysqlite2 import dbapi2 as sqlite3
15 import cumulus.store, cumulus.store.file
17 # The largest supported snapshot format that can be understood.
18 FORMAT_VERSION = (0, 8) # LBS Snapshot v0.8
20 # Maximum number of nested indirect references allowed in a snapshot.
21 MAX_RECURSION_DEPTH = 3
23 # All segments which have been accessed this session.
24 accessed_segments = set()
27 """A class which merely acts as a data container.
29 Instances of this class (or its subclasses) are merely used to store data
30 in various attributes. No methods are provided.
34 return "<%s %s>" % (self.__class__, self.__dict__)
36 CHECKSUM_ALGORITHMS = {
40 class ChecksumCreator:
41 """Compute an LBS checksum for provided data.
43 The algorithm used is selectable, but currently defaults to sha1.
def __init__(self, algorithm='sha1'):
    """Begin a new checksum computation.

    algorithm must be a key of CHECKSUM_ALGORITHMS; the matching entry is
    called with no arguments to create the underlying hash object.  An
    unknown algorithm name raises KeyError.
    """
    make_hash = CHECKSUM_ALGORITHMS[algorithm]
    self.algorithm = algorithm
    self.hash = make_hash()
50 def update(self, data):
51 self.hash.update(data)
55 return "%s=%s" % (self.algorithm, self.hash.hexdigest())
57 class ChecksumVerifier:
58 """Verify whether a checksum from a snapshot matches the supplied data."""
def __init__(self, checksumstr):
    """Create an object to check the supplied checksum.

    checksumstr has the form "ALGORITHM=DIGEST".  It is split at the first
    "=": the left part selects an entry of CHECKSUM_ALGORITHMS and the right
    part is kept as the expected digest.  A string without "=" raises
    ValueError; an unknown algorithm raises KeyError.
    """
    pieces = checksumstr.split("=", 1)
    (algo, expected) = pieces
    self.checksum = expected
    self.hash = CHECKSUM_ALGORITHMS[algo]()
67 def update(self, data):
68 self.hash.update(data)
71 """Return a boolean indicating whether the checksum matches."""
73 result = self.hash.hexdigest()
74 return result == self.checksum
76 class LowlevelDataStore:
77 """Access to the backup store containing segments and snapshot descriptors.
79 Instances of this class are used to get direct filesystem-level access to
80 the backup data. To read a backup, a caller will ordinarily not care about
81 direct access to backup segments, but will instead merely need to access
82 objects from those segments. The ObjectStore class provides a suitable
83 wrapper around a DataStore to give this high-level access.
86 def __init__(self, path):
87 if path.find(":") >= 0:
88 self.store = cumulus.store.open(path)
90 self.store = cumulus.store.file.FileStore(path)
92 def _classify(self, filename):
93 for (t, r) in cumulus.store.type_patterns.items():
96 return (None, filename)
def lowlevel_open(self, filename):
    """Return a file-like object for reading data from the given file.

    The filename is first passed through self._classify(), and the resulting
    (type, name) pair is handed to the backing store's get() method.
    """
    kind, name = self._classify(filename)
    handle = self.store.get(kind, name)
    return handle
104 def lowlevel_stat(self, filename):
105 """Return a dictionary of information about the given file.
107 Currently, the only defined field is 'size', giving the size of the
111 (type, filename) = self._classify(filename)
112 return self.store.stat(type, filename)
114 # Slightly higher-level list methods.
def list_snapshots(self):
    """Iterate over the snapshots present in the backing store.

    Each filename returned by self.store.list('snapshots') is matched against
    cumulus.store.type_patterns['snapshots'].  Filenames that do not match
    are skipped; for the rest, the first capture group is yielded.
    """
    for filename in self.store.list('snapshots'):
        match = cumulus.store.type_patterns['snapshots'].match(filename)
        if not match:
            continue
        yield match.group(1)
def list_segments(self):
    """Iterate over the segments present in the backing store.

    Each filename returned by self.store.list('segments') is matched against
    cumulus.store.type_patterns['segments'].  Filenames that do not match are
    skipped; for the rest, the first capture group is yielded.
    """
    for filename in self.store.list('segments'):
        match = cumulus.store.type_patterns['segments'].match(filename)
        if not match:
            continue
        yield match.group(1)
126 def __init__(self, data_store):
127 self.store = data_store
132 def get_cachedir(self):
133 if self.cachedir is None:
134 self.cachedir = tempfile.mkdtemp(".lbs")
138 if self.cachedir is not None:
139 # TODO: Avoid use of system, make this safer
140 os.system("rm -rf " + self.cachedir)
144 def parse_ref(refstr):
145 m = re.match(r"^zero\[(\d+)\]$", refstr)
147 return ("zero", None, None, (0, int(m.group(1))))
149 m = re.match(r"^([-0-9a-f]+)\/([0-9a-f]+)(\(\S+\))?(\[(((\d+)\+)?(\d+)|=(\d+))\])?$", refstr)
154 checksum = m.group(3)
157 if checksum is not None:
158 checksum = checksum.lstrip("(").rstrip(")")
160 if slice is not None:
161 if m.group(9) is not None:
162 # Size-assertion slice
163 slice = (0, int(m.group(9)), True)
164 elif m.group(6) is None:
166 slice = (0, int(m.group(8)), False)
168 slice = (int(m.group(7)), int(m.group(8)), False)
170 return (segment, object, checksum, slice)
172 def get_segment(self, segment):
173 accessed_segments.add(segment)
174 raw = self.store.lowlevel_open(segment + ".tar.gpg")
176 (input, output) = os.popen2("lbs-filter-gpg --decrypt")
177 def copy_thread(src, dst):
180 block = src.read(BLOCK_SIZE)
181 if len(block) == 0: break
185 thread.start_new_thread(copy_thread, (raw, input))
188 def load_segment(self, segment):
189 seg = tarfile.open(segment, 'r|', self.get_segment(segment))
191 data_obj = seg.extractfile(item)
192 path = item.name.split('/')
193 if len(path) == 2 and path[0] == segment:
194 yield (path[1], data_obj.read())
def load_snapshot(self, snapshot):
    """Read a snapshot descriptor and return its contents as a list of lines.

    snapshot is the snapshot name; the file "snapshot-<name>.lbs" is opened
    through the low-level store.  Line terminators are kept on each returned
    line.  The file-like object is closed before returning, even if reading
    fails, so that handles do not accumulate when many snapshots are loaded.
    """
    descriptor = self.store.lowlevel_open("snapshot-" + snapshot + ".lbs")
    try:
        return descriptor.read().splitlines(True)
    finally:
        descriptor.close()
200 def extract_segment(self, segment):
201 segdir = os.path.join(self.get_cachedir(), segment)
203 for (object, data) in self.load_segment(segment):
204 f = open(os.path.join(segdir, object), 'wb')
def load_object(self, segment, object):
    """Return the raw contents of one object stored in a segment.

    The segment is recorded in accessed_segments.  If the object is not
    already readable under the cache directory, the whole segment is first
    unpacked there with extract_segment().  The segment is then moved to the
    most-recently-used end of self.lru_list, and the least-recently-used
    segment directories are deleted until at most CACHE_SIZE remain.

    Returns the bytes read from the cached object file.  Errors raised while
    opening or reading that file propagate to the caller.
    """
    import shutil

    accessed_segments.add(segment)
    path = os.path.join(self.get_cachedir(), segment, object)
    if not os.access(path, os.R_OK):
        self.extract_segment(segment)

    # Move this segment to the most-recently-used position.
    if segment in self.lru_list:
        self.lru_list.remove(segment)
    self.lru_list.append(segment)

    while len(self.lru_list) > self.CACHE_SIZE:
        stale_dir = os.path.join(self.cachedir, self.lru_list[0])
        # Remove the directory directly instead of building an "rm -rf"
        # shell command: segment names containing spaces or shell
        # metacharacters would otherwise be misinterpreted by the shell.
        # ignore_errors keeps the old best-effort behaviour.
        shutil.rmtree(stale_dir, ignore_errors=True)
        self.lru_list = self.lru_list[1:]

    # Close the handle explicitly rather than relying on garbage collection.
    cached = open(path, 'rb')
    try:
        return cached.read()
    finally:
        cached.close()
220 def get(self, refstr):
221 """Fetch the given object and return it.
223 The input should be an object reference, in string form.
226 (segment, object, checksum, slice) = self.parse_ref(refstr)
228 if segment == "zero":
229 return "\0" * slice[1]
231 data = self.load_object(segment, object)
233 if checksum is not None:
234 verifier = ChecksumVerifier(checksum)
235 verifier.update(data)
236 if not verifier.valid():
239 if slice is not None:
240 (start, length, exact) = slice
241 if exact and len(data) != length: raise ValueError
242 data = data[start:start+length]
243 if len(data) != length: raise IndexError
247 def parse(lines, terminate=None):
248 """Generic parser for RFC822-style "Key: Value" data streams.
250 This parser can be used to read metadata logs and snapshot root descriptor
253 lines must be an iterable object which yields a sequence of lines of input.
255 If terminate is specified, it is used as a predicate to determine when to
256 stop reading input lines.
263 # Strip off a trailing newline, if present
264 if len(l) > 0 and l[-1] == "\n":
267 if terminate is not None and terminate(l):
268 if len(dict) > 0: yield dict
273 m = re.match(r"^([-\w]+):\s*(.*)$", l)
275 dict[m.group(1)] = m.group(2)
276 last_key = m.group(1)
277 elif len(l) > 0 and l[0].isspace() and last_key is not None:
282 if len(dict) > 0: yield dict
284 def parse_full(lines):
286 return parse(lines).next()
287 except StopIteration:
290 def parse_metadata_version(s):
291 """Convert a string with the snapshot version format to a tuple."""
293 m = re.match(r"^LBS Snapshot v(\d+(\.\d+)*)$", s)
297 return tuple([int(d) for d in m.group(1).split(".")])
299 def read_metadata(object_store, root):
300 """Iterate through all lines in the metadata log, following references."""
302 # Stack for keeping track of recursion when following references to
303 # portions of the log. The last entry in the stack corresponds to the
304 # object currently being parsed. Each entry is a list of lines which have
305 # been reversed, so that popping successive lines from the end of each list
306 # will return lines of the metadata log in order.
309 def follow_ref(refstr):
310 if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
311 lines = object_store.get(refstr).splitlines(True)
317 while len(stack) > 0:
324 # An indirect reference which we must follow?
325 if len(line) > 0 and line[0] == '@':
333 """Metadata for a single file (or directory or...) from a snapshot."""
335 # Functions for parsing various datatypes that can appear in a metadata log
339 """Decode an integer, expressed in decimal, octal, or hexadecimal."""
340 if s.startswith("0x"):
342 elif s.startswith("0"):
349 """Decode a URI-encoded (%xx escapes) string."""
350 def hex_decode(m): return chr(int(m.group(1), 16))
351 return re.sub(r"%([0-9a-f]{2})", hex_decode, s)
"""An unencoded string."""
360 """Decode a user/group to a tuple of uid/gid followed by name."""
362 uid = MetadataItem.decode_int(items[0])
365 if items[1].startswith("(") and items[1].endswith(")"):
366 name = MetadataItem.decode_str(items[1][1:-1])
def decode_device(s):
    """Decode a device major/minor number.

    s is split on "/" and each part is decoded with
    MetadataItem.decode_int.  Exactly two parts are required (otherwise the
    unpacking raises ValueError); they are returned as (major, minor).
    """
    numbers = [MetadataItem.decode_int(part) for part in s.split("/")]
    major, minor = numbers
    return (major, minor)
377 def __init__(self, fields, object_store):
378 """Initialize from a dictionary of key/value pairs from metadata log."""
381 self.object_store = object_store
383 self.items = self.Items()
384 for (k, v) in fields.items():
385 if k in self.field_types:
386 decoder = self.field_types[k]
387 setattr(self.items, k, decoder(v))
391 """Return an iterator for the data blocks that make up a file."""
393 # This traverses the list of blocks that make up a file, following
394 # indirect references. It is implemented in much the same way as
395 # read_metadata, so see that function for details of the technique.
397 objects = self.fields['data'].split()
401 def follow_ref(refstr):
402 if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
403 objects = self.object_store.get(refstr).split()
405 stack.append(objects)
407 while len(stack) > 0:
414 # An indirect reference which we must follow?
415 if len(ref) > 0 and ref[0] == '@':
420 # Description of fields that might appear, and how they should be parsed.
421 MetadataItem.field_types = {
422 'name': MetadataItem.decode_str,
423 'type': MetadataItem.raw_str,
424 'mode': MetadataItem.decode_int,
425 'device': MetadataItem.decode_device,
426 'user': MetadataItem.decode_user,
427 'group': MetadataItem.decode_user,
428 'ctime': MetadataItem.decode_int,
429 'mtime': MetadataItem.decode_int,
430 'links': MetadataItem.decode_int,
431 'inode': MetadataItem.raw_str,
432 'checksum': MetadataItem.decode_str,
433 'size': MetadataItem.decode_int,
434 'contents': MetadataItem.decode_str,
435 'target': MetadataItem.decode_str,
def iterate_metadata(object_store, root):
    """Yield a MetadataItem for each stanza of a snapshot's metadata log.

    The log lines come from read_metadata(object_store, root) and are grouped
    into dictionaries by parse(), which is told to end a stanza at every
    empty line.  Each dictionary is wrapped in a MetadataItem bound to
    object_store.
    """
    def is_blank(line):
        return len(line) == 0

    log_lines = read_metadata(object_store, root)
    for fields in parse(log_lines, is_blank):
        yield MetadataItem(fields, object_store)
443 """Access to the local database of snapshot contents and object checksums.
445 The local database is consulted when creating a snapshot to determine what
446 data can be re-used from old snapshots. Segment cleaning is performed by
447 manipulating the data in the local database; the local database also
448 includes enough data to guide the segment cleaning process.
def __init__(self, path, dbname="localdb.sqlite"):
    """Open a connection to the local database.

    The database file is located by joining path and dbname with a "/".
    The connection is stored in self.db_connection.
    """
    db_file = "/".join([path, dbname])
    self.db_connection = sqlite3.connect(db_file)
454 # Low-level database access. Use these methods when there isn't a
455 # higher-level interface available. Exception: do, however, remember to
456 # use the commit() method after making changes to make sure they are
457 # actually saved, even when going through higher-level interfaces.
459 "Commit any pending changes to the local database."
460 self.db_connection.commit()
463 "Roll back any pending changes to the local database."
464 self.db_connection.rollback()
467 "Return a DB-API cursor for directly accessing the local database."
468 return self.db_connection.cursor()
470 def list_schemes(self):
471 """Return the list of snapshots found in the local database.
473 The returned value is a list of tuples (id, scheme, name, time, intent).
477 cur.execute("select distinct scheme from snapshots")
478 schemes = [row[0] for row in cur.fetchall()]
482 def garbage_collect(self, scheme, intent=1.0):
483 """Delete entries from old snapshots from the database.
485 Only snapshots with the specified scheme name will be deleted. If
486 intent is given, it gives the intended next snapshot type, to determine
487 how aggressively to clean (for example, intent=7 could be used if the
488 next snapshot will be a weekly snapshot).
493 # Find the id of the last snapshot to be created. This is used for
494 # measuring time in a way: we record this value in each segment we
495 # expire on this run, and then on a future run can tell if there have
496 # been intervening backups made.
497 cur.execute("select max(snapshotid) from snapshots")
498 last_snapshotid = cur.fetchone()[0]
500 # Get the list of old snapshots for this scheme. Delete all the old
501 # ones. Rules for what to keep:
502 # - Always keep the most recent snapshot.
503 # - If snapshot X is younger than Y, and X has higher intent, then Y
505 cur.execute("""select snapshotid, name, intent,
506 julianday('now') - timestamp as age
507 from snapshots where scheme = ?
508 order by age""", (scheme,))
512 for (id, name, snap_intent, snap_age) in cur.fetchall():
514 if snap_intent < max_intent:
515 # Delete small-intent snapshots if there is a more recent
516 # large-intent snapshot.
518 elif snap_intent == intent:
519 # Delete previous snapshots with the specified intent level.
522 if can_delete and not first:
523 print "Delete snapshot %d (%s)" % (id, name)
524 cur.execute("delete from snapshots where snapshotid = ?",
527 max_intent = max(max_intent, snap_intent)
529 # Delete entries in the segments_used table which are for non-existent
531 cur.execute("""delete from segments_used
532 where snapshotid not in
533 (select snapshotid from snapshots)""")
535 # Find segments which contain no objects used by any current snapshots,
536 # and delete them from the segment table.
537 cur.execute("""delete from segments where segmentid not in
538 (select segmentid from segments_used)""")
540 # Delete unused objects in the block_index table. By "unused", we mean
541 # any object which was stored in a segment which has been deleted, and
542 # any object in a segment which was marked for cleaning and has had
543 # cleaning performed already (the expired time is less than the current
544 # largest snapshot id).
545 cur.execute("""delete from block_index
546 where segmentid not in (select segmentid from segments)
547 or segmentid in (select segmentid from segments
548 where expire_time < ?)""",
551 # Remove sub-block signatures for deleted objects.
552 cur.execute("""delete from subblock_signatures
554 (select blockid from block_index)""")
# Plain attribute container (a Struct subclass adding nothing of its own)
# describing one segment.  get_segment_cleaning_list() creates instances and
# sets used_bytes, size_bytes, age_days and cleaning_benefit on them, and
# mark_segment_expired() accepts an instance in place of an integer id.
class SegmentInfo(Struct): pass
559 def get_segment_cleaning_list(self, age_boost=0.0):
560 """Return a list of all current segments with information for cleaning.
562 Return all segments which are currently known in the local database
563 (there might be other, older segments in the archive itself), and
564 return usage statistics for each to help decide which segments to
567 The returned list will be sorted by estimated cleaning benefit, with
568 segments that are best to clean at the start of the list.
If specified, the age_boost parameter (measured in days) will be added to
571 the age of each segment, as a way of adjusting the benefit computation
572 before a long-lived snapshot is taken (for example, age_boost might be
573 set to 7 when cleaning prior to taking a weekly snapshot).
578 cur.execute("""select segmentid, used, size, mtime,
579 julianday('now') - mtime as age from segment_info
580 where expire_time is null""")
582 info = self.SegmentInfo()
584 info.used_bytes = row[1]
585 info.size_bytes = row[2]
587 info.age_days = row[4]
589 # If data is not available for whatever reason, treat it as 0.0.
590 if info.age_days is None:
592 if info.used_bytes is None:
593 info.used_bytes = 0.0
595 # Benefit calculation: u is the estimated fraction of each segment
596 # which is utilized (bytes belonging to objects still in use
597 # divided by total size; this doesn't take compression or storage
598 # overhead into account, but should give a reasonable estimate).
600 # The total benefit is a heuristic that combines several factors:
601 # the amount of space that can be reclaimed (1 - u), an ageing
# factor (info.age_days) that favors cleaning old segments over young
603 # ones and also is more likely to clean segments that will be
604 # rewritten for long-lived snapshots (age_boost), and finally a
605 # penalty factor for the cost of re-uploading data (u + 0.1).
606 u = info.used_bytes / info.size_bytes
607 info.cleaning_benefit \
608 = (1 - u) * (info.age_days + age_boost) / (u + 0.1)
610 segments.append(info)
612 segments.sort(cmp, key=lambda s: s.cleaning_benefit, reverse=True)
615 def mark_segment_expired(self, segment):
616 """Mark a segment for cleaning in the local database.
618 The segment parameter should be either a SegmentInfo object or an
619 integer segment id. Objects in the given segment will be marked as
620 expired, which means that any future snapshots that would re-use those
621 objects will instead write out a new copy of the object, and thus no
622 future snapshots will depend upon the given segment.
625 if isinstance(segment, int):
627 elif isinstance(segment, self.SegmentInfo):
630 raise TypeError("Invalid segment: %s, must be of type int or SegmentInfo, not %s" % (segment, type(segment)))
633 cur.execute("select max(snapshotid) from snapshots")
634 last_snapshotid = cur.fetchone()[0]
635 cur.execute("update segments set expire_time = ? where segmentid = ?",
636 (last_snapshotid, id))
637 cur.execute("update block_index set expired = 0 where segmentid = ?",
640 def balance_expired_objects(self):
641 """Analyze expired objects in segments to be cleaned and group by age.
643 Update the block_index table of the local database to group expired
644 objects by age. The exact number of buckets and the cutoffs for each
645 are dynamically determined. Calling this function after marking
646 segments expired will help in the segment cleaning process, by ensuring
647 that when active objects from clean segments are rewritten, they will
648 be placed into new segments roughly grouped by age.
651 # The expired column of the block_index table is used when generating a
652 # new LBS snapshot. A null value indicates that an object may be
653 # re-used. Otherwise, an object must be written into a new segment if
654 # needed. Objects with distinct expired values will be written into
655 # distinct segments, to allow for some grouping by age. The value 0 is
656 # somewhat special in that it indicates any rewritten objects can be
657 # placed in the same segment as completely new objects; this can be
658 # used for very young objects which have been expired, or objects not
659 # expected to be encountered.
661 # In the balancing process, all objects which are not used in any
662 # current snapshots will have expired set to 0. Objects which have
663 # been seen will be sorted by age and will have expired values set to
664 # 0, 1, 2, and so on based on age (with younger objects being assigned
# lower values). The number of buckets and the age cutoffs are
666 # determined by looking at the distribution of block ages.
670 # Mark all expired objects with expired = 0; these objects will later
671 # have values set to indicate groupings of objects when repacking.
672 cur.execute("""update block_index set expired = 0
673 where expired is not null""")
675 # We will want to aim for at least one full segment for each bucket
676 # that we eventually create, but don't know how many bytes that should
677 # be due to compression. So compute the average number of bytes in
678 # each expired segment as a rough estimate for the minimum size of each
679 # bucket. (This estimate could be thrown off by many not-fully-packed
680 # segments, but for now don't worry too much about that.) If we can't
681 # compute an average, it's probably because there are no expired
682 # segments, so we have no more work to do.
683 cur.execute("""select avg(size) from segments
685 (select distinct segmentid from block_index
686 where expired is not null)""")
687 segment_size_estimate = cur.fetchone()[0]
688 if not segment_size_estimate:
691 # Next, extract distribution of expired objects (number and size) by
692 # age. Save the timestamp for "now" so that the classification of
693 # blocks into age buckets will not change later in the function, after
694 # time has passed. Set any timestamps in the future to now, so we are
695 # guaranteed that for the rest of this function, age is always
697 cur.execute("select julianday('now')")
698 now = cur.fetchone()[0]
700 cur.execute("""update block_index set timestamp = ?
701 where timestamp > ? and expired is not null""",
704 cur.execute("""select round(? - timestamp) as age, count(*), sum(size)
705 from block_index where expired = 0
706 group by age order by age""", (now,))
707 distribution = cur.fetchall()
709 # Start to determine the buckets for expired objects. Heuristics used:
710 # - An upper bound on the number of buckets is given by the number of
711 # segments we estimate it will take to store all data. In fact,
712 # aim for a couple of segments per bucket.
713 # - Place very young objects in bucket 0 (place with new objects)
714 # unless there are enough of them to warrant a separate bucket.
715 # - Try not to create unnecessarily many buckets, since fewer buckets
716 # will allow repacked data to be grouped based on spatial locality
717 # (while more buckets will group by temporal locality). We want a
720 total_bytes = sum([i[2] for i in distribution])
721 target_buckets = 2 * (total_bytes / segment_size_estimate) ** 0.4
722 min_size = 1.5 * segment_size_estimate
723 target_size = max(2 * segment_size_estimate,
724 total_bytes / target_buckets)
726 print "segment_size:", segment_size_estimate
727 print "distribution:", distribution
728 print "total_bytes:", total_bytes
729 print "target_buckets:", target_buckets
730 print "min, target size:", min_size, target_size
732 # Chosen cutoffs. Each bucket consists of objects with age greater
733 # than one cutoff value, but not greater than the next largest cutoff.
736 # Starting with the oldest objects, begin grouping together into
737 # buckets of size at least target_size bytes.
738 distribution.reverse()
740 min_age_bucket = False
741 for (age, items, size) in distribution:
742 if bucket_size >= target_size \
743 or (age < MIN_AGE and not min_age_bucket):
744 if bucket_size < target_size and len(cutoffs) > 0:
751 min_age_bucket = True
753 # The last (youngest) bucket will be group 0, unless it has enough data
754 # to be of size min_size by itself, or there happen to be no objects
755 # less than MIN_AGE at all.
756 if bucket_size >= min_size or not min_age_bucket:
760 print "cutoffs:", cutoffs
762 # Update the database to assign each object to the appropriate bucket.
764 for i in range(len(cutoffs)):
765 cur.execute("""update block_index set expired = ?
766 where round(? - timestamp) > ?
767 and expired is not null""",
768 (i, now, cutoffs[i]))