1 """High-level interface for working with LBS archives.
3 This module provides an easy interface for reading from and manipulating
4 various parts of an LBS archive:
5 - listing the snapshots and segments present
6 - reading segment contents
7 - parsing snapshot descriptors and snapshot metadata logs
8 - reading and maintaining the local object database
from __future__ import division
import os, re, sha, tarfile, tempfile, thread
from pysqlite2 import dbapi2 as sqlite3
# The highest snapshot format version that this module can understand.
FORMAT_VERSION = (0, 6)         # LBS Snapshot v0.6

# Maximum number of nested indirect references allowed in a snapshot.
MAX_RECURSION_DEPTH = 3

# All segments which have been accessed this session.
accessed_segments = set()
25 """A class which merely acts as a data container.
27 Instances of this class (or its subclasses) are merely used to store data
28 in various attributes. No methods are provided.
32 return "<%s %s>" % (self.__class__, self.__dict__)
CHECKSUM_ALGORITHMS = {
    'sha1': sha.new,
}
class ChecksumCreator:
    """Compute an LBS checksum for provided data.

    The algorithm used is selectable, but currently defaults to sha1.
    """

    def __init__(self, algorithm='sha1'):
        self.algorithm = algorithm
        self.hash = CHECKSUM_ALGORITHMS[algorithm]()

    def update(self, data):
        self.hash.update(data)

    def compute(self):
        """Return the checksum of all data seen so far, as a string."""
        return "%s=%s" % (self.algorithm, self.hash.hexdigest())
class ChecksumVerifier:
    """Verify whether a checksum from a snapshot matches the supplied data."""

    def __init__(self, checksumstr):
        """Create an object to check the supplied checksum."""

        (algo, checksum) = checksumstr.split("=", 1)
        self.checksum = checksum
        self.hash = CHECKSUM_ALGORITHMS[algo]()

    def update(self, data):
        self.hash.update(data)

    def valid(self):
        """Return a boolean indicating whether the checksum matches."""

        result = self.hash.hexdigest()
        return result == self.checksum
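
# A minimal sketch (not part of the library) of how the two classes above pair
# up: compute a checksum string for a block of data, then verify it.  The
# sample data value is purely illustrative.
def _example_checksum_roundtrip(data="example block"):
    creator = ChecksumCreator()
    creator.update(data)
    checksumstr = creator.compute()       # e.g. "sha1=ca40..."

    verifier = ChecksumVerifier(checksumstr)
    verifier.update(data)
    return verifier.valid()               # True when the data is unchanged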
class LowlevelDataStore:
    """Access to the backup store containing segments and snapshot descriptors.

    Instances of this class are used to get direct filesystem-level access to
    the backup data.  To read a backup, a caller will ordinarily not care about
    direct access to backup segments, but will instead merely need to access
    objects from those segments.  The ObjectStore class provides a suitable
    wrapper around a DataStore to give this high-level access.
    """

    def __init__(self, path):
        self.path = path
    # Low-level filesystem access.  These methods could be overwritten to
    # provide access to remote data stores.
    def lowlevel_list(self):
        """Get a listing of files stored."""

        return os.listdir(self.path)

    def lowlevel_open(self, filename):
        """Return a file-like object for reading data from the given file."""

        return open(os.path.join(self.path, filename), 'rb')

    def lowlevel_stat(self, filename):
        """Return a dictionary of information about the given file.

        Currently, the only defined field is 'size', giving the size of the
        file, in bytes.
        """

        stat = os.stat(os.path.join(self.path, filename))
        return {'size': stat.st_size}
    # Slightly higher-level list methods.
    def list_snapshots(self):
        for f in self.lowlevel_list():
            m = re.match(r"^snapshot-(.*)\.lbs$", f)
            if m:
                yield m.group(1)

    def list_segments(self):
        for f in self.lowlevel_list():
            m = re.match(r"^([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})(\.\S+)?$", f)
            if m:
                yield m.group(1)
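
# Sketch of how the lowlevel_* methods can be overridden to read backup data
# from somewhere other than a local directory.  This in-memory variant (handy
# for tests) is illustrative only and not part of LBS.
import StringIO

class _DictDataStore(LowlevelDataStore):
    def __init__(self, files):
        # files maps a filename to its contents, as ordinary strings
        self.files = files

    def lowlevel_list(self):
        return self.files.keys()

    def lowlevel_open(self, filename):
        return StringIO.StringIO(self.files[filename])

    def lowlevel_stat(self, filename):
        return {'size': len(self.files[filename])}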
class ObjectStore:
    def __init__(self, data_store):
        self.store = data_store
        self.cachedir = None
        self.CACHE_SIZE = 16            # Number of extracted segments to cache
        self.lru_list = []

    def get_cachedir(self):
        if self.cachedir is None:
            self.cachedir = tempfile.mkdtemp(".lbs")
        return self.cachedir

    def destroy_cache(self):
        if self.cachedir is not None:
            # TODO: Avoid use of system, make this safer
            os.system("rm -rf " + self.cachedir)
            self.cachedir = None
            self.lru_list = []
    @staticmethod
    def parse_ref(refstr):
        m = re.match(r"^zero\[(\d+)\+(\d+)\]$", refstr)
        if m:
            return ("zero", None, None, (int(m.group(1)), int(m.group(2))))

        m = re.match(r"^([-0-9a-f]+)\/([0-9a-f]+)(\(\S+\))?(\[(\d+)\+(\d+)\])?$", refstr)
        if not m: raise ValueError

        segment = m.group(1)
        object = m.group(2)
        checksum = m.group(3)
        slice = m.group(4)

        if checksum is not None:
            checksum = checksum.lstrip("(").rstrip(")")

        if slice is not None:
            slice = (int(m.group(5)), int(m.group(6)))

        return (segment, object, checksum, slice)
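
    # Illustrative reference strings accepted by parse_ref (values made up):
    #   "zero[0+4096]"
    #       => ("zero", None, None, (0, 4096))
    #   "9e0be287-a74c-4527-aa26-0a10e8f40f7d/000000a3"
    #       => ("9e0be287-a74c-4527-aa26-0a10e8f40f7d", "000000a3", None, None)
    #   "9e0be287-a74c-4527-aa26-0a10e8f40f7d/000000a3(sha1=3da5...)[16+1024]"
    #       => (segment, object, "sha1=3da5...", (16, 1024))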
    def get_segment(self, segment):
        accessed_segments.add(segment)
        raw = self.store.lowlevel_open(segment + ".tar.gpg")

        (input, output) = os.popen2("lbs-filter-gpg --decrypt")
        def copy_thread(src, dst):
            BLOCK_SIZE = 4096
            while True:
                block = src.read(BLOCK_SIZE)
                if len(block) == 0: break
                dst.write(block)
            dst.close()

        thread.start_new_thread(copy_thread, (raw, input))
        return output
    def load_segment(self, segment):
        seg = tarfile.open(segment, 'r|', self.get_segment(segment))
        for item in seg:
            data_obj = seg.extractfile(item)
            path = item.name.split('/')
            if len(path) == 2 and path[0] == segment:
                yield (path[1], data_obj.read())
    def load_snapshot(self, snapshot):
        file = self.store.lowlevel_open("snapshot-" + snapshot + ".lbs")
        return file.read().splitlines(True)
    def extract_segment(self, segment):
        segdir = os.path.join(self.get_cachedir(), segment)
        os.mkdir(segdir)
        for (object, data) in self.load_segment(segment):
            f = open(os.path.join(segdir, object), 'wb')
            f.write(data)
            f.close()
    def load_object(self, segment, object):
        accessed_segments.add(segment)
        path = os.path.join(self.get_cachedir(), segment, object)
        if not os.access(path, os.R_OK):
            self.extract_segment(segment)
        if segment in self.lru_list: self.lru_list.remove(segment)
        self.lru_list.append(segment)
        while len(self.lru_list) > self.CACHE_SIZE:
            os.system("rm -rf " + os.path.join(self.cachedir, self.lru_list[0]))
            self.lru_list = self.lru_list[1:]
        return open(path, 'rb').read()
    def get(self, refstr):
        """Fetch the given object and return it.

        The input should be an object reference, in string form.
        """

        (segment, object, checksum, slice) = self.parse_ref(refstr)

        if segment == "zero":
            return "\0" * slice[1]

        data = self.load_object(segment, object)

        if checksum is not None:
            verifier = ChecksumVerifier(checksum)
            verifier.update(data)
            if not verifier.valid():
                raise ValueError

        if slice is not None:
            (start, length) = slice
            data = data[start:start+length]
            if len(data) != length: raise IndexError

        return data
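
# Usage sketch for the classes above: open an archive directory, wrap it in an
# ObjectStore, and fetch a single object by reference.  The path and reference
# arguments are placeholders supplied by the caller.
def _example_fetch_object(archive_path, refstr):
    store = LowlevelDataStore(archive_path)
    object_store = ObjectStore(store)
    try:
        # get() extracts the segment into the cache, verifies the checksum (if
        # present in the reference) and applies any slice before returning.
        return object_store.get(refstr)
    finally:
        object_store.destroy_cache()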
def parse(lines, terminate=None):
    """Generic parser for RFC822-style "Key: Value" data streams.

    This parser can be used to read metadata logs and snapshot root descriptor
    files.

    lines must be an iterable object which yields a sequence of lines of input.

    If terminate is specified, it is used as a predicate to determine when to
    stop reading input lines.
    """

    dict = {}
    last_key = None

    for l in lines:
        # Strip off a trailing newline, if present
        if len(l) > 0 and l[-1] == "\n":
            l = l[:-1]

        if terminate is not None and terminate(l):
            if len(dict) > 0: yield dict
            dict = {}
            last_key = None
            continue

        m = re.match(r"^(\w+):\s*(.*)$", l)
        if m:
            dict[m.group(1)] = m.group(2)
            last_key = m.group(1)
        elif len(l) > 0 and l[0].isspace() and last_key is not None:
            dict[last_key] += l
        else:
            last_key = None

    if len(dict) > 0: yield dict
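
# A short illustration of the parser above: stanzas separated by blank lines
# (as in the metadata log) each produce one dictionary.  The field values here
# are made up.
def _example_parse():
    lines = ["name: etc/passwd\n", "mode: 0644\n", "\n", "name: etc/group\n"]
    return list(parse(lines, lambda l: len(l) == 0))
    # => [{'name': 'etc/passwd', 'mode': '0644'}, {'name': 'etc/group'}]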
def parse_full(lines):
    try:
        return parse(lines).next()
    except StopIteration:
        return {}
def parse_metadata_version(s):
    """Convert a string with the snapshot version format to a tuple."""

    m = re.match(r"^LBS Snapshot v(\d+(\.\d+)*)$", s)
    if m is None:
        return ()
    else:
        return tuple([int(d) for d in m.group(1).split(".")])
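
# For example, parse_metadata_version("LBS Snapshot v0.6") returns (0, 6).  A
# sketch of a compatibility check against FORMAT_VERSION (assuming the snapshot
# descriptor stores the format string under a "Format" key):
def _example_format_supported(descriptor):
    version = parse_metadata_version(descriptor.get("Format", ""))
    return version != () and version <= FORMAT_VERSION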
def read_metadata(object_store, root):
    """Iterate through all lines in the metadata log, following references."""

    # Stack for keeping track of recursion when following references to
    # portions of the log.  The last entry in the stack corresponds to the
    # object currently being parsed.  Each entry is a list of lines which have
    # been reversed, so that popping successive lines from the end of each list
    # will return lines of the metadata log in order.
    stack = []

    def follow_ref(refstr):
        if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
        lines = object_store.get(refstr).splitlines(True)
        lines.reverse()
        stack.append(lines)

    follow_ref(root)

    while len(stack) > 0:
        top = stack[-1]
        if len(top) == 0:
            stack.pop()
            continue
        line = top.pop()

        # An indirect reference which we must follow?
        if len(line) > 0 and line[0] == '@':
            follow_ref(line[1:].strip())
        else:
            yield line
322 """Metadata for a single file (or directory or...) from a snapshot."""
    # Functions for parsing various datatypes that can appear in a metadata log
    # item.
    @staticmethod
    def decode_int(s):
        """Decode an integer, expressed in decimal, octal, or hexadecimal."""
        if s.startswith("0x"):
            return int(s, 16)
        elif s.startswith("0"):
            return int(s, 8)
        else:
            return int(s, 10)

    @staticmethod
    def decode_str(s):
        """Decode a URI-encoded (%xx escapes) string."""
        def hex_decode(m): return chr(int(m.group(1), 16))
        return re.sub(r"%([0-9a-f]{2})", hex_decode, s)

    @staticmethod
    def raw_str(s):
        """An unencoded string."""
        return s

    @staticmethod
    def decode_user(items):
        """Decode a user/group to a tuple of uid/gid followed by name."""
        items = items.split()
        uid = MetadataItem.decode_int(items[0])
        name = None
        if len(items) > 1:
            if items[1].startswith("(") and items[1].endswith(")"):
                name = MetadataItem.decode_str(items[1][1:-1])
        return (uid, name)

    @staticmethod
    def decode_device(s):
        """Decode a device major/minor number."""
        (major, minor) = map(MetadataItem.decode_int, s.split("/"))
        return (major, minor)
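
    # Examples of the encodings handled by the decoders above (values made up):
    #   decode_int("0644")          => 420   (octal)
    #   decode_int("0x1ff")         => 511   (hexadecimal)
    #   decode_str("a%20b")         => "a b" (URI-style %xx escape)
    #   decode_user("1000 (alice)") => (1000, "alice")
    #   decode_device("8/1")        => (8, 1)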
    class Items: pass

    def __init__(self, fields, object_store):
        """Initialize from a dictionary of key/value pairs from metadata log."""

        self.fields = fields
        self.object_store = object_store
        self.items = self.Items()
        for (k, v) in fields.items():
            if k in self.field_types:
                decoder = self.field_types[k]
                setattr(self.items, k, decoder(v))
380 """Return an iterator for the data blocks that make up a file."""
382 # This traverses the list of blocks that make up a file, following
383 # indirect references. It is implemented in much the same way as
384 # read_metadata, so see that function for details of the technique.
386 objects = self.fields['data'].split()
390 def follow_ref(refstr):
391 if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
392 objects = self.object_store.get(refstr).split()
394 stack.append(objects)
396 while len(stack) > 0:
403 # An indirect reference which we must follow?
404 if len(ref) > 0 and ref[0] == '@':
# Description of fields that might appear, and how they should be parsed.
MetadataItem.field_types = {
    'name': MetadataItem.decode_str,
    'type': MetadataItem.raw_str,
    'mode': MetadataItem.decode_int,
    'device': MetadataItem.decode_device,
    'user': MetadataItem.decode_user,
    'group': MetadataItem.decode_user,
    'ctime': MetadataItem.decode_int,
    'mtime': MetadataItem.decode_int,
    'links': MetadataItem.decode_int,
    'inode': MetadataItem.raw_str,
    'checksum': MetadataItem.decode_str,
    'size': MetadataItem.decode_int,
    'contents': MetadataItem.decode_str,
    'target': MetadataItem.decode_str,
}
def iterate_metadata(object_store, root):
    for d in parse(read_metadata(object_store, root), lambda l: len(l) == 0):
        yield MetadataItem(d, object_store)
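
# End-to-end sketch: walk the metadata log of one snapshot and read back file
# contents.  The archive path and snapshot name are placeholders; the "Root"
# descriptor key is assumed to hold the reference to the start of the metadata
# log.
def _example_walk_snapshot(archive_path, snapshot_name):
    store = LowlevelDataStore(archive_path)
    object_store = ObjectStore(store)
    try:
        descriptor = parse_full(object_store.load_snapshot(snapshot_name))
        for item in iterate_metadata(object_store, descriptor["Root"]):
            if 'data' in item.fields:
                # Concatenating the data blocks reconstructs the file contents.
                data = "".join(object_store.get(ref)
                               for ref in item.data_blocks())
                print item.items.name, len(data)
    finally:
        object_store.destroy_cache()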
432 """Access to the local database of snapshot contents and object checksums.
434 The local database is consulted when creating a snapshot to determine what
435 data can be re-used from old snapshots. Segment cleaning is performed by
436 manipulating the data in the local database; the local database also
437 includes enough data to guide the segment cleaning process.
440 def __init__(self, path, dbname="localdb.sqlite"):
441 self.db_connection = sqlite3.connect(path + "/" + dbname)
    # Low-level database access.  Use these methods when there isn't a
    # higher-level interface available.  Exception: do, however, remember to
    # use the commit() method after making changes to make sure they are
    # actually saved, even when going through higher-level interfaces.
    def commit(self):
        "Commit any pending changes to the local database."
        self.db_connection.commit()

    def rollback(self):
        "Roll back any pending changes to the local database."
        self.db_connection.rollback()

    def cursor(self):
        "Return a DB-API cursor for directly accessing the local database."
        return self.db_connection.cursor()
    def garbage_collect(self):
        """Delete entries from old snapshots from the database."""

        cur = self.cursor()

        # Delete old snapshots.
        cur.execute("""delete from snapshots
                       where snapshotid < (select max(snapshotid)
                                           from snapshots)""")

        # Delete entries in the segments_used table which are for non-existent
        # snapshots.
        cur.execute("""delete from segments_used
                       where snapshotid not in
                           (select snapshotid from snapshots)""")

        # Find segments which contain no objects used by any current snapshots,
        # and delete them from the segment table.
        cur.execute("""delete from segments where segmentid not in
                           (select segmentid from segments_used)""")

        # Finally, delete objects contained in non-existent segments.  We can't
        # simply delete unused objects, since we use the set of unused objects
        # to determine the used/free ratio of segments.
        cur.execute("""delete from block_index
                       where segmentid not in
                           (select segmentid from segments)""")
    class SegmentInfo(Struct): pass
    def get_segment_cleaning_list(self, age_boost=0.0):
        """Return a list of all current segments with information for cleaning.

        Return all segments which are currently known in the local database
        (there might be other, older segments in the archive itself), and
        return usage statistics for each to help decide which segments to
        clean.

        The returned list will be sorted by estimated cleaning benefit, with
        segments that are best to clean at the start of the list.

        If specified, the age_boost parameter (measured in days) will be added
        to the age of each segment, as a way of adjusting the benefit
        computation before a long-lived snapshot is taken (for example,
        age_boost might be set to 7 when cleaning prior to taking a weekly
        snapshot).
        """

        cur = self.cursor()
        segments = []
        cur.execute("""select segmentid, used, size, mtime,
                       julianday('now') - mtime as age from segment_info""")
        for row in cur:
            info = self.SegmentInfo()
            info.id = row[0]
            info.used_bytes = row[1]
            info.size_bytes = row[2]
            info.mtime = row[3]
            info.age_days = row[4]

            # Benefit calculation: u is the estimated fraction of each segment
            # which is utilized (bytes belonging to objects still in use
            # divided by total size; this doesn't take compression or storage
            # overhead into account, but should give a reasonable estimate).
            #
            # The total benefit is a heuristic that combines several factors:
            # the amount of space that can be reclaimed (1 - u), an ageing
            # factor (info.age_days) that favors cleaning old segments over
            # young ones and also is more likely to clean segments that will be
            # rewritten for long-lived snapshots (age_boost), and finally a
            # penalty factor for the cost of re-uploading data (u + 0.1).
            u = info.used_bytes / info.size_bytes
            info.cleaning_benefit \
                = (1 - u) * (info.age_days + age_boost) / (u + 0.1)

            segments.append(info)

        segments.sort(key=lambda s: s.cleaning_benefit, reverse=True)
        return segments
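
    # Worked example of the cleaning-benefit heuristic (illustrative numbers):
    # a segment that is 25% used and 10 days old scores
    # (1 - 0.25) * 10 / (0.25 + 0.1) ~= 21.4, while a 90%-used segment of the
    # same age scores (1 - 0.9) * 10 / (0.9 + 0.1) = 1.0, so the sparse segment
    # sorts toward the front of the returned list.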
    def mark_segment_expired(self, segment):
        """Mark a segment for cleaning in the local database.

        The segment parameter should be either a SegmentInfo object or an
        integer segment id.  Objects in the given segment will be marked as
        expired, which means that any future snapshots that would re-use those
        objects will instead write out a new copy of the object, and thus no
        future snapshots will depend upon the given segment.
        """

        if isinstance(segment, int):
            id = segment
        elif isinstance(segment, self.SegmentInfo):
            id = segment.id
        else:
            raise TypeError("Invalid segment: %s, must be of type int or SegmentInfo, not %s" % (segment, type(segment)))

        cur = self.cursor()
        cur.execute("update block_index set expired = 1 where segmentid = ?",
                    (id,))
    def balance_expired_objects(self):
        """Analyze expired objects in segments to be cleaned and group by age.

        Update the block_index table of the local database to group expired
        objects by age.  The exact number of buckets and the cutoffs for each
        are dynamically determined.  Calling this function after marking
        segments expired will help in the segment cleaning process, by ensuring
        that when active objects from clean segments are rewritten, they will
        be placed into new segments roughly grouped by age.
        """

        # The expired column of the block_index table is used when generating a
        # new LBS snapshot.  A null value indicates that an object may be
        # re-used.  Otherwise, an object must be written into a new segment if
        # needed.  Objects with distinct expired values will be written into
        # distinct segments, to allow for some grouping by age.  The value 0 is
        # somewhat special in that it indicates any rewritten objects can be
        # placed in the same segment as completely new objects; this can be
        # used for very young objects which have been expired, or objects not
        # expected to be encountered.
        #
        # In the balancing process, all objects which are not used in any
        # current snapshots will have expired set to 0.  Objects which have
        # been seen will be sorted by age and will have expired values set to
        # 0, 1, 2, and so on based on age (with younger objects being assigned
        # lower values).  The number of buckets and the age cutoffs are
        # determined by looking at the distribution of block ages.

        cur = self.cursor()

        # Mark all expired objects with expired = 0; these objects will later
        # have values set to indicate groupings of objects when repacking.
        cur.execute("""update block_index set expired = 0
                       where expired is not null""")
        # We will want to aim for at least one full segment for each bucket
        # that we eventually create, but don't know how many bytes that should
        # be due to compression.  So compute the average number of bytes in
        # each expired segment as a rough estimate for the minimum size of each
        # bucket.  (This estimate could be thrown off by many not-fully-packed
        # segments, but for now don't worry too much about that.)  If we can't
        # compute an average, it's probably because there are no expired
        # segments, so we have no more work to do.
        cur.execute("""select avg(size) from segments
                       where segmentid in
                           (select distinct segmentid from block_index
                            where expired is not null)""")
        segment_size_estimate = cur.fetchone()[0]
        if not segment_size_estimate:
            return
        # Next, extract distribution of expired objects (number and size) by
        # age.  Save the timestamp for "now" so that the classification of
        # blocks into age buckets will not change later in the function, after
        # time has passed.  Set any timestamps in the future to now, so we are
        # guaranteed that for the rest of this function, age is always
        # non-negative.
        cur.execute("select julianday('now')")
        now = cur.fetchone()[0]

        cur.execute("""update block_index set timestamp = ?
                       where timestamp > ? and expired is not null""",
                    (now, now))

        cur.execute("""select round(? - timestamp) as age, count(*), sum(size)
                       from block_index where expired = 0
                       group by age order by age""", (now,))
        distribution = cur.fetchall()
        # Start to determine the buckets for expired objects.  Heuristics used:
        #   - An upper bound on the number of buckets is given by the number of
        #     segments we estimate it will take to store all data.  In fact,
        #     aim for a couple of segments per bucket.
        #   - Place very young objects in bucket 0 (place with new objects)
        #     unless there are enough of them to warrant a separate bucket.
        #   - Try not to create unnecessarily many buckets, since fewer buckets
        #     will allow repacked data to be grouped based on spatial locality
        #     (while more buckets will group by temporal locality).  We want a
        #     balance.
        MIN_AGE = 4     # Age (in days) below which objects count as very young
        total_bytes = sum([i[2] for i in distribution])
        target_buckets = 2 * (total_bytes / segment_size_estimate) ** 0.4
        min_size = 1.5 * segment_size_estimate
        target_size = max(2 * segment_size_estimate,
                          total_bytes / target_buckets)

        print "segment_size:", segment_size_estimate
        print "distribution:", distribution
        print "total_bytes:", total_bytes
        print "target_buckets:", target_buckets
        print "min, target size:", min_size, target_size
        # Chosen cutoffs.  Each bucket consists of objects with age greater
        # than one cutoff value, but not greater than the next largest cutoff.
        cutoffs = []

        # Starting with the oldest objects, begin grouping together into
        # buckets of size at least target_size bytes.
        distribution.reverse()
        bucket_size = 0
        min_age_bucket = False
        for (age, items, size) in distribution:
            if bucket_size >= target_size \
                or (age < MIN_AGE and not min_age_bucket):
                if bucket_size < target_size and len(cutoffs) > 0:
                    cutoffs.pop()
                cutoffs.append(age)
                bucket_size = 0

            bucket_size += size
            if age < MIN_AGE:
                min_age_bucket = True

        # The last (youngest) bucket will be group 0, unless it has enough data
        # to be of size min_size by itself, or there happen to be no objects
        # less than MIN_AGE at all.
        if bucket_size >= min_size or not min_age_bucket:
            cutoffs.append(-1)
        cutoffs.append(-1)

        print "cutoffs:", cutoffs
        # Update the database to assign each object to the appropriate bucket.
        cutoffs.reverse()
        for i in range(len(cutoffs)):
            cur.execute("""update block_index set expired = ?
                           where round(? - timestamp) > ?""",
                        (i, now, cutoffs[i]))
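
# Sketch of how the cleaning-related methods above fit together (not part of
# the library).  The database path and the 1 GB reclaim target are arbitrary
# illustrative values.
def _example_clean(database_path, reclaim_target=1024 ** 3):
    db = LocalDatabase(database_path)
    db.garbage_collect()

    reclaimed = 0
    for seg in db.get_segment_cleaning_list():
        if reclaimed >= reclaim_target:
            break
        db.mark_segment_expired(seg)
        reclaimed += seg.size_bytes - seg.used_bytes

    db.balance_expired_objects()
    db.commit()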