When verifying a snapshot, check that the segment list is accurate.
[cumulus.git] / lbs.py
1 """High-level interface for working with LBS archives.
2
3 This module provides an easy interface for reading from and manipulating
4 various parts of an LBS archive:
5   - listing the snapshots and segments present
6   - reading segment contents
7   - parsing snapshot descriptors and snapshot metadata logs
8   - reading and maintaining the local object database
9 """
10
11 from __future__ import division
12 import os, re, sha, tarfile, tempfile, thread
13 from pysqlite2 import dbapi2 as sqlite3
14
15 # The largest supported snapshot format that can be understood.
16 FORMAT_VERSION = (0, 6)         # LBS Snapshot v0.6
17
18 # Maximum number of nested indirect references allowed in a snapshot.
19 MAX_RECURSION_DEPTH = 3
20
21 # All segments which have been accessed this session.
22 accessed_segments = set()
23
24 class Struct:
25     """A class which merely acts as a data container.
26
27     Instances of this class (or its subclasses) are merely used to store data
28     in various attributes.  No methods are provided.
29     """
30
31     def __repr__(self):
32         return "<%s %s>" % (self.__class__, self.__dict__)
33
34 CHECKSUM_ALGORITHMS = {
35     'sha1': sha.new
36 }
37
38 class ChecksumCreator:
39     """Compute an LBS checksum for provided data.
40
41     The algorithm used is selectable, but currently defaults to sha1.
42     """
43
44     def __init__(self, algorithm='sha1'):
45         self.algorithm = algorithm
46         self.hash = CHECKSUM_ALGORITHMS[algorithm]()
47
48     def update(self, data):
49         self.hash.update(data)
50         return self
51
52     def compute(self):
53         return "%s=%s" % (self.algorithm, self.hash.hexdigest())
54
55 class ChecksumVerifier:
56     """Verify whether a checksum from a snapshot matches the supplied data."""
57
58     def __init__(self, checksumstr):
59         """Create an object to check the supplied checksum."""
60
61         (algo, checksum) = checksumstr.split("=", 1)
62         self.checksum = checksum
63         self.hash = CHECKSUM_ALGORITHMS[algo]()
64
65     def update(self, data):
66         self.hash.update(data)
67
68     def valid(self):
69         """Return a boolean indicating whether the checksum matches."""
70
71         result = self.hash.hexdigest()
72         return result == self.checksum
73
74 class LowlevelDataStore:
75     """Access to the backup store containing segments and snapshot descriptors.
76
77     Instances of this class are used to get direct filesystem-level access to
78     the backup data.  To read a backup, a caller will ordinarily not care about
79     direct access to backup segments, but will instead merely need to access
80     objects from those segments.  The ObjectStore class provides a suitable
81     wrapper around a DataStore to give this high-level access.
82     """
83
84     def __init__(self, path):
85         self.path = path
86
87     # Low-level filesystem access.  These methods could be overwritten to
88     # provide access to remote data stores.
89     def lowlevel_list(self):
90         """Get a listing of files stored."""
91
92         return os.listdir(self.path)
93
94     def lowlevel_open(self, filename):
95         """Return a file-like object for reading data from the given file."""
96
97         return open(os.path.join(self.path, filename), 'rb')
98
99     def lowlevel_stat(self, filename):
100         """Return a dictionary of information about the given file.
101
102         Currently, the only defined field is 'size', giving the size of the
103         file in bytes.
104         """
105
106         stat = os.stat(os.path.join(self.path, filename))
107         return {'size': stat.st_size}
108
109     # Slightly higher-level list methods.
110     def list_snapshots(self):
111         for f in self.lowlevel_list():
112             m = re.match(r"^snapshot-(.*)\.lbs$", f)
113             if m:
114                 yield m.group(1)
115
116     def list_segments(self):
117         for f in self.lowlevel_list():
118             m = re.match(r"^([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})(\.\S+)?$", f)
119             if m:
120                 yield m.group(1)
121
122 class ObjectStore:
123     def __init__(self, data_store):
124         self.store = data_store
125         self.cachedir = None
126         self.CACHE_SIZE = 16
127         self.lru_list = []
128
129     def get_cachedir(self):
130         if self.cachedir is None:
131             self.cachedir = tempfile.mkdtemp(".lbs")
132         return self.cachedir
133
134     def cleanup(self):
135         if self.cachedir is not None:
136             # TODO: Avoid use of system, make this safer
137             os.system("rm -rf " + self.cachedir)
138         self.cachedir = None
139
140     @staticmethod
141     def parse_ref(refstr):
142         m = re.match(r"^zero\[(\d+)\+(\d+)\]$", refstr)
143         if m:
144             return ("zero", None, None, (int(m.group(1)), int(m.group(2))))
145
146         m = re.match(r"^([-0-9a-f]+)\/([0-9a-f]+)(\(\S+\))?(\[(\d+)\+(\d+)\])?$", refstr)
147         if not m: return
148
149         segment = m.group(1)
150         object = m.group(2)
151         checksum = m.group(3)
152         slice = m.group(4)
153
154         if checksum is not None:
155             checksum = checksum.lstrip("(").rstrip(")")
156
157         if slice is not None:
158             slice = (int(m.group(5)), int(m.group(6)))
159
160         return (segment, object, checksum, slice)
161
162     def get_segment(self, segment):
163         accessed_segments.add(segment)
164         raw = self.store.lowlevel_open(segment + ".tar.gpg")
165
166         (input, output) = os.popen2("lbs-filter-gpg --decrypt")
167         def copy_thread(src, dst):
168             BLOCK_SIZE = 4096
169             while True:
170                 block = src.read(BLOCK_SIZE)
171                 if len(block) == 0: break
172                 dst.write(block)
173             dst.close()
174
175         thread.start_new_thread(copy_thread, (raw, input))
176         return output
177
178     def load_segment(self, segment):
179         seg = tarfile.open(segment, 'r|', self.get_segment(segment))
180         for item in seg:
181             data_obj = seg.extractfile(item)
182             path = item.name.split('/')
183             if len(path) == 2 and path[0] == segment:
184                 yield (path[1], data_obj.read())
185
186     def load_snapshot(self, snapshot):
187         file = self.store.lowlevel_open("snapshot-" + snapshot + ".lbs")
188         return file.read().splitlines(True)
189
190     def extract_segment(self, segment):
191         segdir = os.path.join(self.get_cachedir(), segment)
192         os.mkdir(segdir)
193         for (object, data) in self.load_segment(segment):
194             f = open(os.path.join(segdir, object), 'wb')
195             f.write(data)
196             f.close()
197
198     def load_object(self, segment, object):
199         accessed_segments.add(segment)
200         path = os.path.join(self.get_cachedir(), segment, object)
201         if not os.access(path, os.R_OK):
202             self.extract_segment(segment)
203         if segment in self.lru_list: self.lru_list.remove(segment)
204         self.lru_list.append(segment)
205         while len(self.lru_list) > self.CACHE_SIZE:
206             os.system("rm -rf " + os.path.join(self.cachedir, self.lru_list[0]))
207             self.lru_list = self.lru_list[1:]
208         return open(path, 'rb').read()
209
210     def get(self, refstr):
211         """Fetch the given object and return it.
212
213         The input should be an object reference, in string form.
214         """
215
216         (segment, object, checksum, slice) = self.parse_ref(refstr)
217
218         if segment == "zero":
219             return "\0" * slice[1]
220
221         data = self.load_object(segment, object)
222
223         if checksum is not None:
224             verifier = ChecksumVerifier(checksum)
225             verifier.update(data)
226             if not verifier.valid():
227                 raise ValueError
228
229         if slice is not None:
230             (start, length) = slice
231             data = data[start:start+length]
232             if len(data) != length: raise IndexError
233
234         return data
235
236 def parse(lines, terminate=None):
237     """Generic parser for RFC822-style "Key: Value" data streams.
238
239     This parser can be used to read metadata logs and snapshot root descriptor
240     files.
241
242     lines must be an iterable object which yields a sequence of lines of input.
243
244     If terminate is specified, it is used as a predicate to determine when to
245     stop reading input lines.
246     """
247
248     dict = {}
249     last_key = None
250
251     for l in lines:
252         # Strip off a trailing newline, if present
253         if len(l) > 0 and l[-1] == "\n":
254             l = l[:-1]
255
256         if terminate is not None and terminate(l):
257             if len(dict) > 0: yield dict
258             dict = {}
259             last_key = None
260             continue
261
262         m = re.match(r"^(\w+):\s*(.*)$", l)
263         if m:
264             dict[m.group(1)] = m.group(2)
265             last_key = m.group(1)
266         elif len(l) > 0 and l[0].isspace() and last_key is not None:
267             dict[last_key] += l
268         else:
269             last_key = None
270
271     if len(dict) > 0: yield dict
272
273 def parse_full(lines):
274     try:
275         return parse(lines).next()
276     except StopIteration:
277         return {}
278
279 def parse_metadata_version(s):
280     """Convert a string with the snapshot version format to a tuple."""
281
282     m = re.match(r"^LBS Snapshot v(\d+(\.\d+)*)$", s)
283     if m is None:
284         return ()
285     else:
286         return tuple([int(d) for d in m.group(1).split(".")])
287
288 def read_metadata(object_store, root):
289     """Iterate through all lines in the metadata log, following references."""
290
291     # Stack for keeping track of recursion when following references to
292     # portions of the log.  The last entry in the stack corresponds to the
293     # object currently being parsed.  Each entry is a list of lines which have
294     # been reversed, so that popping successive lines from the end of each list
295     # will return lines of the metadata log in order.
296     stack = []
297
298     def follow_ref(refstr):
299         if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
300         lines = object_store.get(refstr).splitlines(True)
301         lines.reverse()
302         stack.append(lines)
303
304     follow_ref(root)
305
306     while len(stack) > 0:
307         top = stack[-1]
308         if len(top) == 0:
309             stack.pop()
310             continue
311         line = top.pop()
312
313         # An indirect reference which we must follow?
314         if len(line) > 0 and line[0] == '@':
315             ref = line[1:]
316             ref.strip()
317             follow_ref(ref)
318         else:
319             yield line
320
321 class MetadataItem:
322     """Metadata for a single file (or directory or...) from a snapshot."""
323
324     # Functions for parsing various datatypes that can appear in a metadata log
325     # item.
326     @staticmethod
327     def decode_int(s):
328         """Decode an integer, expressed in decimal, octal, or hexadecimal."""
329         if s.startswith("0x"):
330             return int(s, 16)
331         elif s.startswith("0"):
332             return int(s, 8)
333         else:
334             return int(s, 10)
335
336     @staticmethod
337     def decode_str(s):
338         """Decode a URI-encoded (%xx escapes) string."""
339         def hex_decode(m): return chr(int(m.group(1), 16))
340         return re.sub(r"%([0-9a-f]{2})", hex_decode, s)
341
342     @staticmethod
343     def raw_str(s):
344         """An unecoded string."""
345         return s
346
347     @staticmethod
348     def decode_user(s):
349         """Decode a user/group to a tuple of uid/gid followed by name."""
350         items = s.split()
351         uid = MetadataItem.decode_int(items[0])
352         name = None
353         if len(items) > 1:
354             if items[1].startswith("(") and items[1].endswith(")"):
355                 name = MetadataItem.decode_str(items[1][1:-1])
356         return (uid, name)
357
358     @staticmethod
359     def decode_device(s):
360         """Decode a device major/minor number."""
361         (major, minor) = map(MetadataItem.decode_int, s.split("/"))
362         return (major, minor)
363
364     class Items: pass
365
366     def __init__(self, fields, object_store):
367         """Initialize from a dictionary of key/value pairs from metadata log."""
368
369         self.fields = fields
370         self.object_store = object_store
371         self.keys = []
372         self.items = self.Items()
373         for (k, v) in fields.items():
374             if k in self.field_types:
375                 decoder = self.field_types[k]
376                 setattr(self.items, k, decoder(v))
377                 self.keys.append(k)
378
379     def data(self):
380         """Return an iterator for the data blocks that make up a file."""
381
382         # This traverses the list of blocks that make up a file, following
383         # indirect references.  It is implemented in much the same way as
384         # read_metadata, so see that function for details of the technique.
385
386         objects = self.fields['data'].split()
387         objects.reverse()
388         stack = [objects]
389
390         def follow_ref(refstr):
391             if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
392             objects = self.object_store.get(refstr).split()
393             objects.reverse()
394             stack.append(objects)
395
396         while len(stack) > 0:
397             top = stack[-1]
398             if len(top) == 0:
399                 stack.pop()
400                 continue
401             ref = top.pop()
402
403             # An indirect reference which we must follow?
404             if len(ref) > 0 and ref[0] == '@':
405                 follow_ref(ref[1:])
406             else:
407                 yield ref
408
409 # Description of fields that might appear, and how they should be parsed.
410 MetadataItem.field_types = {
411     'name': MetadataItem.decode_str,
412     'type': MetadataItem.raw_str,
413     'mode': MetadataItem.decode_int,
414     'device': MetadataItem.decode_device,
415     'user': MetadataItem.decode_user,
416     'group': MetadataItem.decode_user,
417     'ctime': MetadataItem.decode_int,
418     'mtime': MetadataItem.decode_int,
419     'links': MetadataItem.decode_int,
420     'inode': MetadataItem.raw_str,
421     'checksum': MetadataItem.decode_str,
422     'size': MetadataItem.decode_int,
423     'contents': MetadataItem.decode_str,
424     'target': MetadataItem.decode_str,
425 }
426
427 def iterate_metadata(object_store, root):
428     for d in parse(read_metadata(object_store, root), lambda l: len(l) == 0):
429         yield MetadataItem(d, object_store)
430
431 class LocalDatabase:
432     """Access to the local database of snapshot contents and object checksums.
433
434     The local database is consulted when creating a snapshot to determine what
435     data can be re-used from old snapshots.  Segment cleaning is performed by
436     manipulating the data in the local database; the local database also
437     includes enough data to guide the segment cleaning process.
438     """
439
440     def __init__(self, path, dbname="localdb.sqlite"):
441         self.db_connection = sqlite3.connect(path + "/" + dbname)
442
443     # Low-level database access.  Use these methods when there isn't a
444     # higher-level interface available.  Exception: do, however, remember to
445     # use the commit() method after making changes to make sure they are
446     # actually saved, even when going through higher-level interfaces.
447     def commit(self):
448         "Commit any pending changes to the local database."
449         self.db_connection.commit()
450
451     def rollback(self):
452         "Roll back any pending changes to the local database."
453         self.db_connection.rollback()
454
455     def cursor(self):
456         "Return a DB-API cursor for directly accessing the local database."
457         return self.db_connection.cursor()
458
459     def garbage_collect(self):
460         """Delete entries from old snapshots from the database."""
461
462         cur = self.cursor()
463
464         # Delete old snapshots.
465         cur.execute("""delete from snapshots
466                        where snapshotid < (select max(snapshotid)
467                                            from snapshots)""")
468
469         # Delete entries in the segments_used table which are for non-existent
470         # snapshots.
471         cur.execute("""delete from segments_used
472                        where snapshotid not in
473                            (select snapshotid from snapshots)""")
474
475         # Find segments which contain no objects used by any current snapshots,
476         # and delete them from the segment table.
477         cur.execute("""delete from segments where segmentid not in
478                            (select segmentid from segments_used)""")
479
480         # Finally, delete objects contained in non-existent segments.  We can't
481         # simply delete unused objects, since we use the set of unused objects
482         # to determine the used/free ratio of segments.
483         cur.execute("""delete from block_index
484                        where segmentid not in
485                            (select segmentid from segments)""")
486
487     # Segment cleaning.
488     class SegmentInfo(Struct): pass
489
490     def get_segment_cleaning_list(self, age_boost=0.0):
491         """Return a list of all current segments with information for cleaning.
492
493         Return all segments which are currently known in the local database
494         (there might be other, older segments in the archive itself), and
495         return usage statistics for each to help decide which segments to
496         clean.
497
498         The returned list will be sorted by estimated cleaning benefit, with
499         segments that are best to clean at the start of the list.
500
501         If specified, the age_boost parameter (measured in days) will added to
502         the age of each segment, as a way of adjusting the benefit computation
503         before a long-lived snapshot is taken (for example, age_boost might be
504         set to 7 when cleaning prior to taking a weekly snapshot).
505         """
506
507         cur = self.cursor()
508         segments = []
509         cur.execute("""select segmentid, used, size, mtime,
510                        julianday('now') - mtime as age from segment_info""")
511         for row in cur:
512             info = self.SegmentInfo()
513             info.id = row[0]
514             info.used_bytes = row[1]
515             info.size_bytes = row[2]
516             info.mtime = row[3]
517             info.age_days = row[4]
518
519             # Benefit calculation: u is the estimated fraction of each segment
520             # which is utilized (bytes belonging to objects still in use
521             # divided by total size; this doesn't take compression or storage
522             # overhead into account, but should give a reasonable estimate).
523             #
524             # The total benefit is a heuristic that combines several factors:
525             # the amount of space that can be reclaimed (1 - u), an ageing
526             # factor (info.age_days) that favors cleaning old segments to young
527             # ones and also is more likely to clean segments that will be
528             # rewritten for long-lived snapshots (age_boost), and finally a
529             # penalty factor for the cost of re-uploading data (u + 0.1).
530             u = info.used_bytes / info.size_bytes
531             info.cleaning_benefit \
532                 = (1 - u) * (info.age_days + age_boost) / (u + 0.1)
533
534             segments.append(info)
535
536         segments.sort(cmp, key=lambda s: s.cleaning_benefit, reverse=True)
537         return segments
538
539     def mark_segment_expired(self, segment):
540         """Mark a segment for cleaning in the local database.
541
542         The segment parameter should be either a SegmentInfo object or an
543         integer segment id.  Objects in the given segment will be marked as
544         expired, which means that any future snapshots that would re-use those
545         objects will instead write out a new copy of the object, and thus no
546         future snapshots will depend upon the given segment.
547         """
548
549         if isinstance(segment, int):
550             id = segment
551         elif isinstance(segment, self.SegmentInfo):
552             id = segment.id
553         else:
554             raise TypeError("Invalid segment: %s, must be of type int or SegmentInfo, not %s" % (segment, type(segment)))
555
556         cur = self.cursor()
557         cur.execute("update block_index set expired = 1 where segmentid = ?",
558                     (id,))
559
560     def balance_expired_objects(self):
561         """Analyze expired objects in segments to be cleaned and group by age.
562
563         Update the block_index table of the local database to group expired
564         objects by age.  The exact number of buckets and the cutoffs for each
565         are dynamically determined.  Calling this function after marking
566         segments expired will help in the segment cleaning process, by ensuring
567         that when active objects from clean segments are rewritten, they will
568         be placed into new segments roughly grouped by age.
569         """
570
571         # The expired column of the block_index table is used when generating a
572         # new LBS snapshot.  A null value indicates that an object may be
573         # re-used.  Otherwise, an object must be written into a new segment if
574         # needed.  Objects with distinct expired values will be written into
575         # distinct segments, to allow for some grouping by age.  The value 0 is
576         # somewhat special in that it indicates any rewritten objects can be
577         # placed in the same segment as completely new objects; this can be
578         # used for very young objects which have been expired, or objects not
579         # expected to be encountered.
580         #
581         # In the balancing process, all objects which are not used in any
582         # current snapshots will have expired set to 0.  Objects which have
583         # been seen will be sorted by age and will have expired values set to
584         # 0, 1, 2, and so on based on age (with younger objects being assigned
585         # lower values).  The number of buckets and the age cutoffs is
586         # determined by looking at the distribution of block ages.
587
588         cur = self.cursor()
589
590         # Mark all expired objects with expired = 0; these objects will later
591         # have values set to indicate groupings of objects when repacking.
592         cur.execute("""update block_index set expired = 0
593                        where expired is not null""")
594
595         # We will want to aim for at least one full segment for each bucket
596         # that we eventually create, but don't know how many bytes that should
597         # be due to compression.  So compute the average number of bytes in
598         # each expired segment as a rough estimate for the minimum size of each
599         # bucket.  (This estimate could be thrown off by many not-fully-packed
600         # segments, but for now don't worry too much about that.)  If we can't
601         # compute an average, it's probably because there are no expired
602         # segments, so we have no more work to do.
603         cur.execute("""select avg(size) from segments
604                        where segmentid in
605                            (select distinct segmentid from block_index
606                             where expired is not null)""")
607         segment_size_estimate = cur.fetchone()[0]
608         if not segment_size_estimate:
609             return
610
611         # Next, extract distribution of expired objects (number and size) by
612         # age.  Save the timestamp for "now" so that the classification of
613         # blocks into age buckets will not change later in the function, after
614         # time has passed.  Set any timestamps in the future to now, so we are
615         # guaranteed that for the rest of this function, age is always
616         # non-negative.
617         cur.execute("select julianday('now')")
618         now = cur.fetchone()[0]
619
620         cur.execute("""update block_index set timestamp = ?
621                        where timestamp > ? and expired is not null""",
622                     (now, now))
623
624         cur.execute("""select round(? - timestamp) as age, count(*), sum(size)
625                        from block_index where expired = 0
626                        group by age order by age""", (now,))
627         distribution = cur.fetchall()
628
629         # Start to determine the buckets for expired objects.  Heuristics used:
630         #   - An upper bound on the number of buckets is given by the number of
631         #     segments we estimate it will take to store all data.  In fact,
632         #     aim for a couple of segments per bucket.
633         #   - Place very young objects in bucket 0 (place with new objects)
634         #     unless there are enough of them to warrant a separate bucket.
635         #   - Try not to create unnecessarily many buckets, since fewer buckets
636         #     will allow repacked data to be grouped based on spatial locality
637         #     (while more buckets will group by temporal locality).  We want a
638         #     balance.
639         MIN_AGE = 4
640         total_bytes = sum([i[2] for i in distribution])
641         target_buckets = 2 * (total_bytes / segment_size_estimate) ** 0.4
642         min_size = 1.5 * segment_size_estimate
643         target_size = max(2 * segment_size_estimate,
644                           total_bytes / target_buckets)
645
646         print "segment_size:", segment_size_estimate
647         print "distribution:", distribution
648         print "total_bytes:", total_bytes
649         print "target_buckets:", target_buckets
650         print "min, target size:", min_size, target_size
651
652         # Chosen cutoffs.  Each bucket consists of objects with age greater
653         # than one cutoff value, but not greater than the next largest cutoff.
654         cutoffs = []
655
656         # Starting with the oldest objects, begin grouping together into
657         # buckets of size at least target_size bytes.
658         distribution.reverse()
659         bucket_size = 0
660         min_age_bucket = False
661         for (age, items, size) in distribution:
662             if bucket_size >= target_size \
663                 or (age < MIN_AGE and not min_age_bucket):
664                 if bucket_size < target_size and len(cutoffs) > 0:
665                     cutoffs.pop()
666                 cutoffs.append(age)
667                 bucket_size = 0
668
669             bucket_size += size
670             if age < MIN_AGE:
671                 min_age_bucket = True
672
673         # The last (youngest) bucket will be group 0, unless it has enough data
674         # to be of size min_size by itself, or there happen to be no objects
675         # less than MIN_AGE at all.
676         if bucket_size >= min_size or not min_age_bucket:
677             cutoffs.append(-1)
678         cutoffs.append(-1)
679
680         print "cutoffs:", cutoffs
681
682         # Update the database to assign each object to the appropriate bucket.
683         cutoffs.reverse()
684         for i in range(len(cutoffs)):
685             cur.execute("""update block_index set expired = ?
686                            where round(? - timestamp) > ?""",
687                         (i, now, cutoffs[i]))