1 """High-level interface for working with LBS archives.
3 This module provides an easy interface for reading from and manipulating
4 various parts of an LBS archive:
5 - listing the snapshots and segments present
6 - reading segment contents
7 - parsing snapshot descriptors and snapshot metadata logs
8 - reading and maintaining the local object database
11 from __future__ import division
12 import os, re, sha, tarfile, tempfile, thread
13 from pysqlite2 import dbapi2 as sqlite3
15 # The largest supported snapshot format that can be understood.
16 FORMAT_VERSION = (0, 2) # LBS Snapshot v0.2
18 # Maximum number of nested indirect references allowed in a snapshot.
# (Consulted by read_metadata and MetadataItem.get_data when following
# "@"-prefixed indirect references.)
19 MAX_RECURSION_DEPTH = 3
class Struct:
    """A class which merely acts as a data container.

    Instances of this class (or its subclasses) are merely used to store data
    in various attributes.  No methods are provided.
    """

    def __repr__(self):
        return "<%s %s>" % (self.__class__, self.__dict__)
31 CHECKSUM_ALGORITHMS = {
class ChecksumCreator:
    """Compute an LBS checksum for provided data.

    The algorithm used is selectable, but currently defaults to sha1.
    """

    def __init__(self, algorithm='sha1'):
        self.algorithm = algorithm
        self.hash = CHECKSUM_ALGORITHMS[algorithm]()

    def update(self, data):
        self.hash.update(data)
        return self             # allow chaining: c.update(a).update(b)

    def compute(self):
        """Return the checksum of all data seen so far, as "algo=hexdigest"."""
        return "%s=%s" % (self.algorithm, self.hash.hexdigest())
class ChecksumVerifier:
    """Verify whether a checksum from a snapshot matches the supplied data."""

    def __init__(self, checksumstr):
        """Create an object to check the supplied checksum."""

        # checksumstr has the form "algorithm=hexdigest".
        (algo, checksum) = checksumstr.split("=", 1)
        self.checksum = checksum
        self.hash = CHECKSUM_ALGORITHMS[algo]()

    def update(self, data):
        self.hash.update(data)

    def valid(self):
        """Return a boolean indicating whether the checksum matches."""

        result = self.hash.hexdigest()
        return result == self.checksum
class LowlevelDataStore:
    """Access to the backup store containing segments and snapshot descriptors.

    Instances of this class are used to get direct filesystem-level access to
    the backup data.  To read a backup, a caller will ordinarily not care about
    direct access to backup segments, but will instead merely need to access
    objects from those segments.  The ObjectStore class provides a suitable
    wrapper around a DataStore to give this high-level access.
    """

    def __init__(self, path):
        self.path = path

    # Low-level filesystem access.  These methods could be overwritten to
    # provide access to remote data stores.
    def lowlevel_list(self):
        """Get a listing of files stored."""

        return os.listdir(self.path)

    def lowlevel_open(self, filename):
        """Return a file-like object for reading data from the given file."""

        return open(os.path.join(self.path, filename), 'rb')

    def lowlevel_stat(self, filename):
        """Return a dictionary of information about the given file.

        Currently, the only defined field is 'size', giving the size of the
        file in bytes.
        """

        stat = os.stat(os.path.join(self.path, filename))
        return {'size': stat.st_size}

    # Slightly higher-level list methods.
    def list_snapshots(self):
        """Yield the name of each snapshot descriptor present in the store."""
        for f in self.lowlevel_list():
            m = re.match(r"^snapshot-(.*)\.lbs$", f)
            if m:
                yield m.group(1)

    def list_segments(self):
        """Yield the UUID of each segment file present in the store."""
        for f in self.lowlevel_list():
            m = re.match(r"^([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})(\.\S+)?$", f)
            if m:
                yield m.group(1)
120 def __init__(self, data_store):
121 self.store = data_store
126 def get_cachedir(self):
127 if self.cachedir is None:
128 self.cachedir = tempfile.mkdtemp(".lbs")
132 if self.cachedir is not None:
133 # TODO: Avoid use of system, make this safer
134 os.system("rm -rf " + self.cachedir)
138 def parse_ref(refstr):
139 m = re.match(r"^([-0-9a-f]+)\/([0-9a-f]+)(\(\S+\))?(\[(\d+)\+(\d+)\])?$", refstr)
144 checksum = m.group(3)
147 if checksum is not None:
148 checksum = checksum.lstrip("(").rstrip(")")
150 if slice is not None:
151 slice = (int(m.group(5)), int(m.group(6)))
153 return (segment, object, checksum, slice)
155 def get_segment(self, segment):
156 raw = self.store.lowlevel_open(segment + ".tar.gpg")
158 (input, output) = os.popen2("lbs-filter-gpg --decrypt")
159 def copy_thread(src, dst):
162 block = src.read(BLOCK_SIZE)
163 if len(block) == 0: break
167 thread.start_new_thread(copy_thread, (raw, input))
170 def load_segment(self, segment):
171 seg = tarfile.open(segment, 'r|', self.get_segment(segment))
173 data_obj = seg.extractfile(item)
174 path = item.name.split('/')
175 if len(path) == 2 and path[0] == segment:
176 yield (path[1], data_obj.read())
178 def load_snapshot(self, snapshot):
179 file = self.store.lowlevel_open("snapshot-" + snapshot + ".lbs")
180 return file.read().splitlines(True)
182 def extract_segment(self, segment):
183 segdir = os.path.join(self.get_cachedir(), segment)
185 for (object, data) in self.load_segment(segment):
186 f = open(os.path.join(segdir, object), 'wb')
190 def load_object(self, segment, object):
191 path = os.path.join(self.get_cachedir(), segment, object)
192 if not os.access(path, os.R_OK):
193 self.extract_segment(segment)
194 if segment in self.lru_list: self.lru_list.remove(segment)
195 self.lru_list.append(segment)
196 while len(self.lru_list) > self.CACHE_SIZE:
197 os.system("rm -rf " + os.path.join(self.cachedir, self.lru_list[0]))
198 self.lru_list = self.lru_list[1:]
199 return open(path, 'rb').read()
201 def get(self, refstr):
202 """Fetch the given object and return it.
204 The input should be an object reference, in string form.
207 (segment, object, checksum, slice) = self.parse_ref(refstr)
209 data = self.load_object(segment, object)
211 if checksum is not None:
212 verifier = ChecksumVerifier(checksum)
213 verifier.update(data)
214 if not verifier.valid():
217 if slice is not None:
218 (start, length) = slice
219 data = data[start:start+length]
220 if len(data) != length: raise IndexError
def parse(lines, terminate=None):
    """Generic parser for RFC822-style "Key: Value" data streams.

    This parser can be used to read metadata logs and snapshot root descriptor
    files.  It yields one dictionary of key/value pairs for each paragraph of
    input.

    lines must be an iterable object which yields a sequence of lines of input.

    If terminate is specified, it is used as a predicate to determine when to
    stop reading input lines.
    """

    # Renamed from the original `dict` to avoid shadowing the builtin.
    paragraph = {}
    last_key = None

    for l in lines:
        # Strip off a trailing newline, if present
        if len(l) > 0 and l[-1] == "\n":
            l = l[:-1]

        # A terminator line ends the current paragraph: emit it (if it has
        # any content) and start collecting a fresh one.
        if terminate is not None and terminate(l):
            if len(paragraph) > 0: yield paragraph
            paragraph = {}
            last_key = None
            continue

        m = re.match(r"^(\w+):\s*(.*)$", l)
        if m:
            paragraph[m.group(1)] = m.group(2)
            last_key = m.group(1)
        elif len(l) > 0 and l[0].isspace() and last_key is not None:
            # Continuation line: append (leading whitespace included) to the
            # value of the most recently seen key.
            paragraph[last_key] += l

    if len(paragraph) > 0: yield paragraph
def parse_full(lines):
    """Parse the initial paragraph of key/value pairs from the input.

    Returns an empty dictionary if the input contains no paragraphs at all.
    """
    try:
        return parse(lines).next()
    except StopIteration:
        return {}
def parse_metadata_version(s):
    """Convert a string with the snapshot version format to a tuple."""

    m = re.match(r"^LBS Snapshot v(\d+(\.\d+)*)$", s)
    if m is None:
        # Unrecognized header: return an empty tuple, which compares less
        # than any real FORMAT_VERSION tuple.
        return ()
    else:
        return tuple([int(d) for d in m.group(1).split(".")])
def read_metadata(object_store, root):
    """Iterate through all lines in the metadata log, following references."""

    # Stack for keeping track of recursion when following references to
    # portions of the log.  The last entry in the stack corresponds to the
    # object currently being parsed.  Each entry is a list of lines which have
    # been reversed, so that popping successive lines from the end of each list
    # will return lines of the metadata log in order.
    stack = []

    def follow_ref(refstr):
        if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
        lines = object_store.get(refstr).splitlines(True)
        lines.reverse()
        stack.append(lines)

    follow_ref(root)

    while len(stack) > 0:
        top = stack[-1]
        if len(top) == 0:
            # Finished with this object; resume the enclosing one.
            stack.pop()
            continue
        line = top.pop()

        # An indirect reference which we must follow?
        if len(line) > 0 and line[0] == '@':
            ref = line[1:].strip()
            follow_ref(ref)
        else:
            yield line
310 """Metadata for a single file (or directory or...) from a snapshot."""
312 # Functions for parsing various datatypes that can appear in a metadata log
316 """Decode an integer, expressed in decimal, octal, or hexadecimal."""
317 if s.startswith("0x"):
319 elif s.startswith("0"):
326 """Decode a URI-encoded (%xx escapes) string."""
327 def hex_decode(m): return chr(int(m.group(1), 16))
328 return re.sub(r"%([0-9a-f]{2})", hex_decode, s)
332 """An unecoded string."""
337 """Decode a user/group to a tuple of uid/gid followed by name."""
339 uid = MetadataItem.decode_int(items[0])
342 if items[1].startswith("(") and items[1].endswith(")"):
343 name = MetadataItem.decode_str(items[1][1:-1])
347 def decode_device(s):
348 """Decode a device major/minor number."""
349 (major, minor) = map(MetadataItem.decode_int, s.split("/"))
350 return (major, minor)
354 def __init__(self, fields, object_store):
355 """Initialize from a dictionary of key/value pairs from metadata log."""
358 self.object_store = object_store
360 self.items = self.Items()
361 for (k, v) in fields.items():
362 if k in self.field_types:
363 decoder = self.field_types[k]
364 setattr(self.items, k, decoder(v))
368 """Return an iterator for the data blocks that make up a file."""
370 # This traverses the list of blocks that make up a file, following
371 # indirect references. It is implemented in much the same way as
372 # read_metadata, so see that function for details of the technique.
374 objects = self.fields['data'].split()
378 def follow_ref(refstr):
379 if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
380 objects = self.object_store.get(refstr).split()
382 stack.append(objects)
384 while len(stack) > 0:
391 # An indirect reference which we must follow?
392 if len(ref) > 0 and ref[0] == '@':
# Description of fields that might appear, and how they should be parsed.
# Attached after the class body so the decoder functions can be referenced.
MetadataItem.field_types = {
    'name': MetadataItem.decode_str,
    'type': MetadataItem.raw_str,
    'mode': MetadataItem.decode_int,
    'device': MetadataItem.decode_device,
    'user': MetadataItem.decode_user,
    'group': MetadataItem.decode_user,
    'mtime': MetadataItem.decode_int,
    'links': MetadataItem.decode_int,
    'inode': MetadataItem.raw_str,
    'checksum': MetadataItem.decode_str,
    'size': MetadataItem.decode_int,
    'contents': MetadataItem.decode_str,
}
def iterate_metadata(object_store, root):
    """Yield a MetadataItem for every entry in the metadata log at root."""
    paragraphs = parse(read_metadata(object_store, root),
                       lambda l: len(l) == 0)
    for fields in paragraphs:
        yield MetadataItem(fields, object_store)
418 """Access to the local database of snapshot contents and object checksums.
420 The local database is consulted when creating a snapshot to determine what
421 data can be re-used from old snapshots. Segment cleaning is performed by
422 manipulating the data in the local database; the local database also
423 includes enough data to guide the segment cleaning process.
426 def __init__(self, path, dbname="localdb.sqlite"):
427 self.db_connection = sqlite3.connect(path + "/" + dbname)
429 # Low-level database access. Use these methods when there isn't a
430 # higher-level interface available. Exception: do, however, remember to
431 # use the commit() method after making changes to make sure they are
432 # actually saved, even when going through higher-level interfaces.
434 "Commit any pending changes to the local database."
435 self.db_connection.commit()
438 "Roll back any pending changes to the local database."
439 self.db_connection.rollback()
442 "Return a DB-API cursor for directly accessing the local database."
443 return self.db_connection.cursor()
445 def garbage_collect(self):
446 """Delete entries from old snapshots from the database."""
450 # Delete old snapshots.
451 cur.execute("""delete from snapshots
452 where snapshotid < (select max(snapshotid)
455 # Delete entries in the snapshot_contents table which are for
456 # non-existent snapshots.
457 cur.execute("""delete from snapshot_contents
458 where snapshotid not in
459 (select snapshotid from snapshots)""")
461 # Find segments which contain no objects used by any current snapshots,
462 # and delete them from the segment table.
463 cur.execute("""delete from segments where segmentid not in
464 (select distinct segmentid from snapshot_contents
465 natural join block_index)""")
467 # Finally, delete objects contained in non-existent segments. We can't
468 # simply delete unused objects, since we use the set of unused objects
469 # to determine the used/free ratio of segments.
470 cur.execute("""delete from block_index
471 where segmentid not in
472 (select segmentid from segments)""")
475 class SegmentInfo(Struct): pass
477 def get_segment_cleaning_list(self, age_boost=0.0):
478 """Return a list of all current segments with information for cleaning.
480 Return all segments which are currently known in the local database
481 (there might be other, older segments in the archive itself), and
482 return usage statistics for each to help decide which segments to
485 The returned list will be sorted by estimated cleaning benefit, with
486 segments that are best to clean at the start of the list.
488 If specified, the age_boost parameter (measured in days) will added to
489 the age of each segment, as a way of adjusting the benefit computation
490 before a long-lived snapshot is taken (for example, age_boost might be
491 set to 7 when cleaning prior to taking a weekly snapshot).
496 cur.execute("""select segmentid, used, size, mtime,
497 julianday('now') - mtime as age from segment_info""")
499 info = self.SegmentInfo()
501 info.used_bytes = row[1]
502 info.size_bytes = row[2]
504 info.age_days = row[4]
506 # Benefit calculation: u is the estimated fraction of each segment
507 # which is utilized (bytes belonging to objects still in use
508 # divided by total size; this doesn't take compression or storage
509 # overhead into account, but should give a reasonable estimate).
511 # The total benefit is a heuristic that combines several factors:
512 # the amount of space that can be reclaimed (1 - u), an ageing
513 # factor (info.age_days) that favors cleaning old segments to young
514 # ones and also is more likely to clean segments that will be
515 # rewritten for long-lived snapshots (age_boost), and finally a
516 # penalty factor for the cost of re-uploading data (u + 0.1).
517 u = info.used_bytes / info.size_bytes
518 info.cleaning_benefit \
519 = (1 - u) * (info.age_days + age_boost) / (u + 0.1)
521 segments.append(info)
523 segments.sort(cmp, key=lambda s: s.cleaning_benefit, reverse=True)
526 def mark_segment_expired(self, segment):
527 """Mark a segment for cleaning in the local database.
529 The segment parameter should be either a SegmentInfo object or an
530 integer segment id. Objects in the given segment will be marked as
531 expired, which means that any future snapshots that would re-use those
532 objects will instead write out a new copy of the object, and thus no
533 future snapshots will depend upon the given segment.
536 if isinstance(segment, int):
538 elif isinstance(segment, self.SegmentInfo):
541 raise TypeError("Invalid segment: %s, must be of type int or SegmentInfo, not %s" % (segment, type(segment)))
544 cur.execute("update block_index set expired = 1 where segmentid = ?",
547 def balance_expired_objects(self):
548 """Analyze expired objects in segments to be cleaned and group by age.
550 Update the block_index table of the local database to group expired
551 objects by age. The exact number of buckets and the cutoffs for each
552 are dynamically determined. Calling this function after marking
553 segments expired will help in the segment cleaning process, by ensuring
554 that when active objects from clean segments are rewritten, they will
555 be placed into new segments roughly grouped by age.
# NOTE(review): this listing is an elided copy of the original file; the
# left-margin numbers are the original line numbers, and several statements
# (e.g. the docstring terminator, the "cur = self.cursor()" setup, and the
# cutoff-list bookkeeping around original lines 649-674) are missing from
# this view.  The code has intentionally been left untouched.
557 # The expired column of the block_index table is used when generating a
559 # new LBS snapshot. A null value indicates that an object may be
560 # re-used. Otherwise, an object must be written into a new segment if
561 # needed. Objects with distinct expired values will be written into
562 # distinct segments, to allow for some grouping by age. The value 0 is
563 # somewhat special in that it indicates any rewritten objects can be
564 # placed in the same segment as completely new objects; this can be
565 # used for very young objects which have been expired, or objects not
566 # expected to be encountered.
568 # In the balancing process, all objects which are not used in any
569 # current snapshots will have expired set to 0. Objects which have
570 # been seen will be sorted by age and will have expired values set to
571 # 0, 1, 2, and so on based on age (with younger objects being assigned
572 # lower values). The number of buckets and the age cutoffs is
573 # determined by looking at the distribution of block ages.
577 # First step: Mark all unused-and-expired objects with expired = -1,
578 # which will cause us to mostly ignore these objects when rebalancing.
579 # At the end, we will set these objects to be in group expired = 0.
580 # Mark expired objects which still seem to be in use with expired = 0;
581 # these objects will later have values set to indicate groupings of
582 # objects when repacking.
583 cur.execute("""update block_index set expired = -1
584 where expired is not null""")
586 cur.execute("""update block_index set expired = 0
587 where expired is not null and blockid in
588 (select blockid from snapshot_contents)""")
590 # We will want to aim for at least one full segment for each bucket
591 # that we eventually create, but don't know how many bytes that should
592 # be due to compression. So compute the average number of bytes in
593 # each expired segment as a rough estimate for the minimum size of each
594 # bucket. (This estimate could be thrown off by many not-fully-packed
595 # segments, but for now don't worry too much about that.) If we can't
596 # compute an average, it's probably because there are no expired
597 # segments, so we have no more work to do.
598 cur.execute("""select avg(size) from segment_info
# NOTE(review): the "where segmentid in" line of this query (original line
# 599) is elided here.
600 (select distinct segmentid from block_index
601 where expired is not null)""")
602 segment_size_estimate = cur.fetchone()[0]
603 if not segment_size_estimate:
606 # Next, extract distribution of expired objects (number and size) by
607 # age. Save the timestamp for "now" so that the classification of
608 # blocks into age buckets will not change later in the function, after
609 # time has passed. Set any timestamps in the future to now, so we are
610 # guaranteed that for the rest of this function, age is always
612 cur.execute("select julianday('now')")
613 now = cur.fetchone()[0]
615 cur.execute("""update block_index set timestamp = ?
616 where timestamp > ? and expired is not null""",
# NOTE(review): the parameter tuple for the update above (original lines
# 617-618, presumably (now, now)) is elided here.
619 cur.execute("""select round(? - timestamp) as age, count(*), sum(size)
620 from block_index where expired = 0
621 group by age order by age""", (now,))
622 distribution = cur.fetchall()
624 # Start to determine the buckets for expired objects. Heuristics used:
625 # - An upper bound on the number of buckets is given by the number of
626 # segments we estimate it will take to store all data. In fact,
627 # aim for a couple of segments per bucket.
628 # - Place very young objects in bucket 0 (place with new objects)
629 # unless there are enough of them to warrant a separate bucket.
630 # - Try not to create unnecessarily many buckets, since fewer buckets
631 # will allow repacked data to be grouped based on spatial locality
632 # (while more buckets will group by temporal locality). We want a
635 total_bytes = sum([i[2] for i in distribution])
636 target_buckets = 2 * (total_bytes / segment_size_estimate) ** 0.4
637 min_size = 1.5 * segment_size_estimate
638 target_size = max(2 * segment_size_estimate,
639 total_bytes / target_buckets)
# Debugging output describing the chosen bucket parameters (Python 2
# print statements).
641 print "segment_size:", segment_size_estimate
642 print "distribution:", distribution
643 print "total_bytes:", total_bytes
644 print "target_buckets:", target_buckets
645 print "min, target size:", min_size, target_size
647 # Chosen cutoffs. Each bucket consists of objects with age greater
648 # than one cutoff value, but not greater than the next largest cutoff.
651 # Starting with the oldest objects, begin grouping together into
652 # buckets of size at least target_size bytes.
653 distribution.reverse()
655 min_age_bucket = False
656 for (age, items, size) in distribution:
657 if bucket_size >= target_size \
658 or (age < MIN_AGE and not min_age_bucket):
659 if bucket_size < target_size and len(cutoffs) > 0:
# NOTE(review): the body that appends to cutoffs and accumulates
# bucket_size (original lines 660-665) is elided here.
666 min_age_bucket = True
668 # The last (youngest) bucket will be group 0, unless it has enough data
669 # to be of size min_size by itself, or there happen to be no objects
670 # less than MIN_AGE at all.
671 if bucket_size >= min_size or not min_age_bucket:
675 print "cutoffs:", cutoffs
677 # Update the database to assign each object to the appropriate bucket.
679 for i in range(len(cutoffs)):
680 cur.execute("""update block_index set expired = ?
681 where round(? - timestamp) > ? and expired >= 0""",
682 (i, now, cutoffs[i]))
683 cur.execute("update block_index set expired = 0 where expired = -1")