1 """High-level interface for working with LBS archives.
3 This module provides an easy interface for reading from and manipulating
4 various parts of an LBS archive:
5 - listing the snapshots and segments present
6 - reading segment contents
7 - parsing snapshot descriptors and snapshot metadata logs
8 - reading and maintaining the local object database
11 from __future__ import division
12 import os, re, sha, tarfile, tempfile, thread
13 from pysqlite2 import dbapi2 as sqlite3
15 # Maximum number of nested indirect references allowed in a snapshot.
16 MAX_RECURSION_DEPTH = 3
19 """A class which merely acts as a data container.
21 Instances of this class (or its subclasses) are merely used to store data
22 in various attributes. No methods are provided.
26 return "<%s %s>" % (self.__class__, self.__dict__)
CHECKSUM_ALGORITHMS = {
class ChecksumCreator:
    """Compute an LBS checksum for provided data.

    The algorithm used is selectable, but currently defaults to sha1.
    """

    def __init__(self, algorithm='sha1'):
        self.algorithm = algorithm
        self.hash = CHECKSUM_ALGORITHMS[algorithm]()

    def update(self, data):
        self.hash.update(data)

        return "%s=%s" % (self.algorithm, self.hash.hexdigest())
class ChecksumVerifier:
    """Verify whether a checksum from a snapshot matches the supplied data."""

    def __init__(self, checksumstr):
        """Create an object to check the supplied checksum."""

        (algo, checksum) = checksumstr.split("=", 1)
        self.checksum = checksum
        self.hash = CHECKSUM_ALGORITHMS[algo]()

    def update(self, data):
        self.hash.update(data)

    def valid(self):
        """Return a boolean indicating whether the checksum matches."""

        result = self.hash.hexdigest()
        return result == self.checksum
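
# Illustrative sketch (not part of the original module): verifying a block of
# data against a checksum string in the "algorithm=hexdigest" form produced
# above.  The helper name is made up for illustration only.
def _example_verify(checksumstr, data):
    verifier = ChecksumVerifier(checksumstr)
    verifier.update(data)
    return verifier.valid()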
class LowlevelDataStore:
    """Access to the backup store containing segments and snapshot descriptors.

    Instances of this class are used to get direct filesystem-level access to
    the backup data.  To read a backup, a caller will ordinarily not care
    about direct access to backup segments, but will instead merely need to
    access objects from those segments.  The ObjectStore class provides a
    suitable wrapper around a LowlevelDataStore to give this high-level
    access.
    """

    def __init__(self, path):
        self.path = path

    # Low-level filesystem access.  These methods could be overridden to
    # provide access to remote data stores.
    def lowlevel_list(self):
        """Get a listing of files stored."""

        return os.listdir(self.path)

    def lowlevel_open(self, filename):
        """Return a file-like object for reading data from the given file."""

        return open(os.path.join(self.path, filename), 'rb')

    def lowlevel_stat(self, filename):
        """Return a dictionary of information about the given file.

        Currently, the only defined field is 'size', giving the size of the
        file, in bytes.
        """

        stat = os.stat(os.path.join(self.path, filename))
        return {'size': stat.st_size}

    # Slightly higher-level list methods.
    def list_snapshots(self):
        for f in self.lowlevel_list():
            m = re.match(r"^snapshot-(.*)\.lbs$", f)

    def list_segments(self):
        for f in self.lowlevel_list():
            m = re.match(r"^([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})(\.\S+)?$", f)
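
# Illustrative sketch (not part of the original module): enumerating the
# snapshot names and segment UUIDs stored in a backup directory.  The helper
# name and the idea of materializing the generators into lists are made up
# for illustration only.
def _example_list_store(path):
    store = LowlevelDataStore(path)
    return (list(store.list_snapshots()), list(store.list_segments()))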
class ObjectStore:
    def __init__(self, data_store):
        self.store = data_store
        self.cachedir = None
        self.lru_list = []

    def get_cachedir(self):
        if self.cachedir is None:
            self.cachedir = tempfile.mkdtemp(".lbs")
        return self.cachedir

        if self.cachedir is not None:
            # Delete the temporary cache directory and any segments extracted
            # into it.
            shutil.rmtree(self.cachedir)
    @staticmethod
    def parse_ref(refstr):
        m = re.match(r"^([-0-9a-f]+)\/([0-9a-f]+)(\(\S+\))?(\[(\d+)\+(\d+)\])?$", refstr)

        checksum = m.group(3)

        if checksum is not None:
            checksum = checksum.lstrip("(").rstrip(")")

        if slice is not None:
            slice = (int(m.group(5)), int(m.group(6)))

        return (segment, object, checksum, slice)
    def get_segment(self, segment):
        raw = self.store.lowlevel_open(segment + ".tar.gpg")

        (input, output) = os.popen2("lbs-filter-gpg --decrypt")
        def copy_thread(src, dst):
            while True:
                block = src.read(BLOCK_SIZE)
                if len(block) == 0: break

        thread.start_new_thread(copy_thread, (raw, input))
        return output

    def load_segment(self, segment):
        seg = tarfile.open(segment, 'r|', self.get_segment(segment))
        for item in seg:
            data_obj = seg.extractfile(item)
            path = item.name.split('/')
            if len(path) == 2 and path[0] == segment:
                yield (path[1], data_obj.read())

    def load_snapshot(self, snapshot):
        file = self.store.lowlevel_open("snapshot-" + snapshot + ".lbs")
        return file.read().splitlines(True)
    def extract_segment(self, segment):
        segdir = os.path.join(self.get_cachedir(), segment)
        for (object, data) in self.load_segment(segment):
            f = open(os.path.join(segdir, object), 'wb')

    def load_object(self, segment, object):
        path = os.path.join(self.get_cachedir(), segment, object)
        if not os.access(path, os.R_OK):
            print "Extracting", segment
            self.extract_segment(segment)
        if segment in self.lru_list: self.lru_list.remove(segment)
        self.lru_list.append(segment)
        while len(self.lru_list) > self.CACHE_SIZE:
            # Evict the least-recently-used segment from the local cache.
            shutil.rmtree(os.path.join(self.cachedir, self.lru_list[0]))
            self.lru_list = self.lru_list[1:]
        return open(path, 'rb').read()
    def get(self, refstr):
        """Fetch the given object and return it.

        The input should be an object reference, in string form.
        """

        (segment, object, checksum, slice) = self.parse_ref(refstr)

        data = self.load_object(segment, object)

        if checksum is not None:
            verifier = ChecksumVerifier(checksum)
            verifier.update(data)
            if not verifier.valid():

        if slice is not None:
            (start, length) = slice
            data = data[start:start+length]
            if len(data) != length: raise IndexError

        return data
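
# Illustrative sketch (not part of the original module): fetching a single
# object given its reference string.  A reference has roughly the form
#   "<segment-uuid>/<object-id>(<checksum>)[<start>+<length>]"
# where the checksum and slice suffixes are optional (see parse_ref above).
# The helper name is made up for illustration only.
def _example_fetch(store_path, refstr):
    store = ObjectStore(LowlevelDataStore(store_path))
    return store.get(refstr)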
def parse(lines, terminate=None):
    """Generic parser for RFC822-style "Key: Value" data streams.

    This parser can be used to read metadata logs and snapshot root
    descriptor files.

    lines must be an iterable object which yields a sequence of lines of
    input.

    If terminate is specified, it is used as a predicate to determine when to
    stop reading input lines.
    """

    for l in lines:
        # Strip off a trailing newline, if present
        if len(l) > 0 and l[-1] == "\n":

        if terminate is not None and terminate(l):
            if len(dict) > 0: yield dict

        m = re.match(r"^(\w+):\s*(.*)$", l)
        if m:
            dict[m.group(1)] = m.group(2)
            last_key = m.group(1)
        elif len(l) > 0 and l[0].isspace() and last_key is not None:

    if len(dict) > 0: yield dict
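
# Illustrative sketch (not part of the original module): parsing a short
# "Key: Value" stream.  With a terminate predicate that fires on blank lines,
# each blank-line-separated stanza is returned as its own dictionary.
def _example_parse():
    lines = ["name: foo\n", "size: 4096\n", "\n", "name: bar\n"]
    return list(parse(lines, lambda l: len(l) == 0))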
def parse_full(lines):
    try:
        return parse(lines).next()
    except StopIteration:
        return {}
def read_metadata(object_store, root):
    """Iterate through all lines in the metadata log, following references."""

    # Stack for keeping track of recursion when following references to
    # portions of the log.  The last entry in the stack corresponds to the
    # object currently being parsed.  Each entry is a list of lines which
    # have been reversed, so that popping successive lines from the end of
    # each list will return lines of the metadata log in order.

    def follow_ref(refstr):
        if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
        lines = object_store.get(refstr).splitlines(True)

    while len(stack) > 0:

        # An indirect reference which we must follow?
        if len(line) > 0 and line[0] == '@':
299 """Metadata for a single file (or directory or...) from a snapshot."""
301 # Functions for parsing various datatypes that can appear in a metadata log
305 """Decode an integer, expressed in decimal, octal, or hexadecimal."""
306 if s.startswith("0x"):
308 elif s.startswith("0"):
315 """Decode a URI-encoded (%xx escapes) string."""
316 def hex_decode(m): return chr(int(m.group(1), 16))
317 return re.sub(r"%([0-9a-f]{2})", hex_decode, s)
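    # Illustrative example (not in the original source): decode_str("My%20Documents")
    # returns "My Documents"; characters outside the %xx escapes pass through
    # unchanged.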
321 """An unecoded string."""
326 """Decode a user/group to a tuple of uid/gid followed by name."""
328 uid = MetadataItem.decode_int(items[0])
331 if items[1].startswith("(") and items[1].endswith(")"):
332 name = MetadataItem.decode_str(items[1][1:-1])
336 def decode_device(s):
337 """Decode a device major/minor number."""
338 (major, minor) = map(MetadataItem.decode_int, s.split("/"))
339 return (major, minor)
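    # Illustrative example (not in the original source): decode_device("8/0")
    # returns (8, 0), the major and minor numbers as integers.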
    def __init__(self, fields, object_store):
        """Initialize from a dictionary of key/value pairs from a metadata log."""

        self.fields = fields
        self.object_store = object_store
        self.items = self.Items()
        for (k, v) in fields.items():
            if k in self.field_types:
                decoder = self.field_types[k]
                setattr(self.items, k, decoder(v))

    def data(self):
        """Return an iterator for the data blocks that make up a file."""

        # This traverses the list of blocks that make up a file, following
        # indirect references.  It is implemented in much the same way as
        # read_metadata, so see that function for details of the technique.

        objects = self.fields['data'].split()

        def follow_ref(refstr):
            if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
            objects = self.object_store.get(refstr).split()
            stack.append(objects)

        while len(stack) > 0:

            # An indirect reference which we must follow?
            if len(ref) > 0 and ref[0] == '@':
# Description of fields that might appear, and how they should be parsed.
MetadataItem.field_types = {
    'name': MetadataItem.decode_str,
    'type': MetadataItem.raw_str,
    'mode': MetadataItem.decode_int,
    'device': MetadataItem.decode_device,
    'user': MetadataItem.decode_user,
    'group': MetadataItem.decode_user,
    'mtime': MetadataItem.decode_int,
    'links': MetadataItem.decode_int,
    'inode': MetadataItem.raw_str,
    'checksum': MetadataItem.decode_str,
    'size': MetadataItem.decode_int,
    'contents': MetadataItem.decode_str,
}
def iterate_metadata(object_store, root):
    for d in parse(read_metadata(object_store, root), lambda l: len(l) == 0):
        yield MetadataItem(d, object_store)
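
# Illustrative sketch (not part of the original module): walking the metadata
# log of one snapshot.  The root object reference is assumed to have been
# read from the snapshot descriptor by the caller (for instance with
# parse_full above); the helper name is made up for illustration only.
def _example_walk_metadata(store_path, root_ref):
    store = ObjectStore(LowlevelDataStore(store_path))
    for item in iterate_metadata(store, root_ref):
        print item.fields.get('name')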
407 """Access to the local database of snapshot contents and object checksums.
409 The local database is consulted when creating a snapshot to determine what
410 data can be re-used from old snapshots. Segment cleaning is performed by
411 manipulating the data in the local database; the local database also
412 includes enough data to guide the segment cleaning process.
    def __init__(self, path, dbname="localdb.sqlite"):
        self.db_connection = sqlite3.connect(os.path.join(path, dbname))

    # Low-level database access.  Use these methods when there isn't a
    # higher-level interface available.  Even when going through the
    # higher-level interfaces, remember to call the commit() method after
    # making changes so that they are actually saved.
    def commit(self):
        "Commit any pending changes to the local database."
        self.db_connection.commit()

    def rollback(self):
        "Roll back any pending changes to the local database."
        self.db_connection.rollback()

    def cursor(self):
        "Return a DB-API cursor for directly accessing the local database."
        return self.db_connection.cursor()
    def garbage_collect(self):
        """Delete entries from old snapshots from the database."""

        cur = self.cursor()

        # Delete old snapshots.
        cur.execute("""delete from snapshots
                       where snapshotid < (select max(snapshotid)
                                               from snapshots)""")

        # Delete entries in the snapshot_contents table which are for
        # non-existent snapshots.
        cur.execute("""delete from snapshot_contents
                       where snapshotid not in
                           (select snapshotid from snapshots)""")

        # Find segments which contain no objects used by any current
        # snapshots, and delete them from the segment table.
        cur.execute("""delete from segments where segmentid not in
                           (select distinct segmentid from snapshot_contents
                                natural join block_index)""")

        # Finally, delete objects contained in non-existent segments.  We
        # can't simply delete unused objects, since we use the set of unused
        # objects to determine the used/free ratio of segments.
        cur.execute("""delete from block_index
                       where segmentid not in
                           (select segmentid from segments)""")
    class SegmentInfo(Struct): pass
    def get_segment_cleaning_list(self, age_boost=0.0):
        """Return a list of all current segments with information for cleaning.

        Return all segments which are currently known in the local database
        (there might be other, older segments in the archive itself), and
        return usage statistics for each to help decide which segments to
        clean.

        The returned list will be sorted by estimated cleaning benefit, with
        segments that are best to clean at the start of the list.

        If specified, the age_boost parameter (measured in days) will be
        added to the age of each segment, as a way of adjusting the benefit
        computation before a long-lived snapshot is taken (for example,
        age_boost might be set to 7 when cleaning prior to taking a weekly
        snapshot).
        """

        cur = self.cursor()
        segments = []
        cur.execute("""select segmentid, used, size, mtime,
                       julianday('now') - mtime as age from segment_info""")
        for row in cur.fetchall():
            info = self.SegmentInfo()
            info.used_bytes = row[1]
            info.size_bytes = row[2]
            info.age_days = row[4]

            # Benefit calculation: u is the estimated fraction of each
            # segment which is utilized (bytes belonging to objects still in
            # use divided by total size; this doesn't take compression or
            # storage overhead into account, but should give a reasonable
            # estimate).
            #
            # The total benefit is a heuristic that combines several factors:
            # the amount of space that can be reclaimed (1 - u), an ageing
            # factor (info.age_days) that favors cleaning old segments over
            # young ones and also is more likely to clean segments that will
            # be rewritten for long-lived snapshots (age_boost), and finally
            # a penalty factor for the cost of re-uploading data (u + 0.1).
            u = info.used_bytes / info.size_bytes
            info.cleaning_benefit \
                = (1 - u) * (info.age_days + age_boost) / (u + 0.1)

            segments.append(info)

        segments.sort(key=lambda s: s.cleaning_benefit, reverse=True)
        return segments
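    # Worked example (illustrative, not in the original source): with the
    # formula above, a segment that is 40% utilized (u = 0.4) and 10 days old
    # with age_boost = 0 scores (1 - 0.4) * 10 / 0.5 = 12.0, while a fully
    # utilized segment scores 0 and sorts to the end of the list.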
    def mark_segment_expired(self, segment):
        """Mark a segment for cleaning in the local database.

        The segment parameter should be either a SegmentInfo object or an
        integer segment id.  Objects in the given segment will be marked as
        expired, which means that any future snapshots that would re-use
        those objects will instead write out a new copy of the object, and
        thus no future snapshots will depend upon the given segment.
        """

        if isinstance(segment, int):
        elif isinstance(segment, self.SegmentInfo):
        else:
            raise TypeError("Invalid segment: %s, must be of type int or SegmentInfo, not %s" % (segment, type(segment)))

        cur = self.cursor()
        cur.execute("update block_index set expired = 1 where segmentid = ?",
    def balance_expired_objects(self):
        """Analyze expired objects in segments to be cleaned and group by age.

        Update the block_index table of the local database to group expired
        objects by age.  The exact number of buckets and the cutoffs for each
        are dynamically determined.  Calling this function after marking
        segments expired will help in the segment cleaning process, by
        ensuring that when active objects from cleaned segments are
        rewritten, they will be placed into new segments roughly grouped by
        age.
        """

        # The expired column of the block_index table is used when generating
        # a new LBS snapshot.  A null value indicates that an object may be
        # re-used.  Otherwise, an object must be written into a new segment
        # if needed.  Objects with distinct expired values will be written
        # into distinct segments, to allow for some grouping by age.  The
        # value 0 is somewhat special in that it indicates any rewritten
        # objects can be placed in the same segment as completely new
        # objects; this can be used for very young objects which have been
        # expired, or objects not expected to be encountered.
        #
        # In the balancing process, all objects which are not used in any
        # current snapshots will have expired set to 0.  Objects which have
        # been seen will be sorted by age and will have expired values set to
        # 0, 1, 2, and so on based on age (with younger objects being
        # assigned lower values).  The number of buckets and the age cutoffs
        # are determined by looking at the distribution of block ages.
        cur = self.cursor()

        # First step: Mark all unused-and-expired objects with expired = -1,
        # which will cause us to mostly ignore these objects when
        # rebalancing.  At the end, we will set these objects to be in group
        # expired = 0.  Mark expired objects which still seem to be in use
        # with expired = 0; these objects will later have values set to
        # indicate groupings of objects when repacking.
        cur.execute("""update block_index set expired = -1
                       where expired is not null""")

        cur.execute("""update block_index set expired = 0
                       where expired is not null and blockid in
                           (select blockid from snapshot_contents)""")

        # We will want to aim for at least one full segment for each bucket
        # that we eventually create, but don't know how many bytes that
        # should be due to compression.  So compute the average number of
        # bytes in each expired segment as a rough estimate for the minimum
        # size of each bucket.  (This estimate could be thrown off by many
        # not-fully-packed segments, but for now don't worry too much about
        # that.)  If we can't compute an average, it's probably because there
        # are no expired segments, so we have no more work to do.
        cur.execute("""select avg(size) from segment_info
                       where segmentid in
                           (select distinct segmentid from block_index
                                where expired is not null)""")
        segment_size_estimate = cur.fetchone()[0]
        if not segment_size_estimate:
            return
        # Next, extract distribution of expired objects (number and size) by
        # age.  Save the timestamp for "now" so that the classification of
        # blocks into age buckets will not change later in the function,
        # after time has passed.  Set any timestamps in the future to now, so
        # we are guaranteed that for the rest of this function, age is always
        # non-negative.
        cur.execute("select julianday('now')")
        now = cur.fetchone()[0]

        cur.execute("""update block_index set timestamp = ?
                       where timestamp > ? and expired is not null""",
                    (now, now))

        cur.execute("""select round(? - timestamp) as age, count(*), sum(size)
                       from block_index where expired = 0
                       group by age order by age""", (now,))
        distribution = cur.fetchall()
        # Start to determine the buckets for expired objects.  Heuristics
        # used:
        #   - An upper bound on the number of buckets is given by the number
        #     of segments we estimate it will take to store all data.  In
        #     fact, aim for a couple of segments per bucket.
        #   - Place very young objects in bucket 0 (place with new objects)
        #     unless there are enough of them to warrant a separate bucket.
        #   - Try not to create unnecessarily many buckets, since fewer
        #     buckets will allow repacked data to be grouped based on spatial
        #     locality (while more buckets will group by temporal locality).
        #     We want a balance.
        total_bytes = sum([i[2] for i in distribution])
        target_buckets = 2 * (total_bytes / segment_size_estimate) ** 0.4
        min_size = 1.5 * segment_size_estimate
        target_size = max(2 * segment_size_estimate,
                          total_bytes / target_buckets)

        print "segment_size:", segment_size_estimate
        print "distribution:", distribution
        print "total_bytes:", total_bytes
        print "target_buckets:", target_buckets
        print "min, target size:", min_size, target_size
        # Chosen cutoffs.  Each bucket consists of objects with age greater
        # than one cutoff value, but not greater than the next largest
        # cutoff.
        cutoffs = []

        # Starting with the oldest objects, begin grouping together into
        # buckets of size at least target_size bytes.
        distribution.reverse()
        bucket_size = 0
        min_age_bucket = False
        for (age, items, size) in distribution:
            if bucket_size >= target_size \
                    or (age < MIN_AGE and not min_age_bucket):
                if bucket_size < target_size and len(cutoffs) > 0:

                min_age_bucket = True

        # The last (youngest) bucket will be group 0, unless it has enough
        # data to be of size min_size by itself, or there happen to be no
        # objects less than MIN_AGE at all.
        if bucket_size >= min_size or not min_age_bucket:

        print "cutoffs:", cutoffs
        # Update the database to assign each object to the appropriate
        # bucket.
        for i in range(len(cutoffs)):
            cur.execute("""update block_index set expired = ?
                           where round(? - timestamp) > ? and expired >= 0""",
                        (i, now, cutoffs[i]))
        cur.execute("update block_index set expired = 0 where expired = -1")
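
# Illustrative sketch (not part of the original module): one possible segment
# cleaning pass over the local database.  Here db is an instance of the local
# database class above, and budget (the number of segments to expire) is a
# made-up parameter for illustration only.
def _example_cleaning_pass(db, budget):
    for seg in db.get_segment_cleaning_list(age_boost=7.0)[:budget]:
        db.mark_segment_expired(seg)
    db.balance_expired_objects()
    db.commit()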