7a391c7c033ff0baf4daa1829c5f585089020b45
[cumulus.git] / lbs.py
1 """High-level interface for working with LBS archives.
2
3 This module provides an easy interface for reading from and manipulating
4 various parts of an LBS archive:
5   - listing the snapshots and segments present
6   - reading segment contents
7   - parsing snapshot descriptors and snapshot metadata logs
8   - reading and maintaining the local object database
9 """
10
11 from __future__ import division
12 import os, re, sha, tarfile, tempfile, thread
13 from pysqlite2 import dbapi2 as sqlite3
14
15 # The largest supported snapshot format that can be understood.
16 FORMAT_VERSION = (0, 6)         # LBS Snapshot v0.6
17
18 # Maximum number of nested indirect references allowed in a snapshot.
19 MAX_RECURSION_DEPTH = 3
20
21 class Struct:
22     """A class which merely acts as a data container.
23
24     Instances of this class (or its subclasses) are merely used to store data
25     in various attributes.  No methods are provided.
26     """
27
28     def __repr__(self):
29         return "<%s %s>" % (self.__class__, self.__dict__)
30
31 CHECKSUM_ALGORITHMS = {
32     'sha1': sha.new
33 }
34
35 class ChecksumCreator:
36     """Compute an LBS checksum for provided data.
37
38     The algorithm used is selectable, but currently defaults to sha1.
39     """
40
41     def __init__(self, algorithm='sha1'):
42         self.algorithm = algorithm
43         self.hash = CHECKSUM_ALGORITHMS[algorithm]()
44
45     def update(self, data):
46         self.hash.update(data)
47         return self
48
49     def compute(self):
50         return "%s=%s" % (self.algorithm, self.hash.hexdigest())
51
52 class ChecksumVerifier:
53     """Verify whether a checksum from a snapshot matches the supplied data."""
54
55     def __init__(self, checksumstr):
56         """Create an object to check the supplied checksum."""
57
58         (algo, checksum) = checksumstr.split("=", 1)
59         self.checksum = checksum
60         self.hash = CHECKSUM_ALGORITHMS[algo]()
61
62     def update(self, data):
63         self.hash.update(data)
64
65     def valid(self):
66         """Return a boolean indicating whether the checksum matches."""
67
68         result = self.hash.hexdigest()
69         return result == self.checksum
70
71 class LowlevelDataStore:
72     """Access to the backup store containing segments and snapshot descriptors.
73
74     Instances of this class are used to get direct filesystem-level access to
75     the backup data.  To read a backup, a caller will ordinarily not care about
76     direct access to backup segments, but will instead merely need to access
77     objects from those segments.  The ObjectStore class provides a suitable
78     wrapper around a DataStore to give this high-level access.
79     """
80
81     def __init__(self, path):
82         self.path = path
83
84     # Low-level filesystem access.  These methods could be overwritten to
85     # provide access to remote data stores.
86     def lowlevel_list(self):
87         """Get a listing of files stored."""
88
89         return os.listdir(self.path)
90
91     def lowlevel_open(self, filename):
92         """Return a file-like object for reading data from the given file."""
93
94         return open(os.path.join(self.path, filename), 'rb')
95
96     def lowlevel_stat(self, filename):
97         """Return a dictionary of information about the given file.
98
99         Currently, the only defined field is 'size', giving the size of the
100         file in bytes.
101         """
102
103         stat = os.stat(os.path.join(self.path, filename))
104         return {'size': stat.st_size}
105
106     # Slightly higher-level list methods.
107     def list_snapshots(self):
108         for f in self.lowlevel_list():
109             m = re.match(r"^snapshot-(.*)\.lbs$", f)
110             if m:
111                 yield m.group(1)
112
113     def list_segments(self):
114         for f in self.lowlevel_list():
115             m = re.match(r"^([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})(\.\S+)?$", f)
116             if m:
117                 yield m.group(1)
118
119 class ObjectStore:
120     def __init__(self, data_store):
121         self.store = data_store
122         self.cachedir = None
123         self.CACHE_SIZE = 16
124         self.lru_list = []
125
126     def get_cachedir(self):
127         if self.cachedir is None:
128             self.cachedir = tempfile.mkdtemp(".lbs")
129         return self.cachedir
130
131     def cleanup(self):
132         if self.cachedir is not None:
133             # TODO: Avoid use of system, make this safer
134             os.system("rm -rf " + self.cachedir)
135         self.cachedir = None
136
137     @staticmethod
138     def parse_ref(refstr):
139         m = re.match(r"^([-0-9a-f]+)\/([0-9a-f]+)(\(\S+\))?(\[(\d+)\+(\d+)\])?$", refstr)
140         if not m: return
141
142         segment = m.group(1)
143         object = m.group(2)
144         checksum = m.group(3)
145         slice = m.group(4)
146
147         if checksum is not None:
148             checksum = checksum.lstrip("(").rstrip(")")
149
150         if slice is not None:
151             slice = (int(m.group(5)), int(m.group(6)))
152
153         return (segment, object, checksum, slice)
154
155     def get_segment(self, segment):
156         raw = self.store.lowlevel_open(segment + ".tar.gpg")
157
158         (input, output) = os.popen2("lbs-filter-gpg --decrypt")
159         def copy_thread(src, dst):
160             BLOCK_SIZE = 4096
161             while True:
162                 block = src.read(BLOCK_SIZE)
163                 if len(block) == 0: break
164                 dst.write(block)
165             dst.close()
166
167         thread.start_new_thread(copy_thread, (raw, input))
168         return output
169
170     def load_segment(self, segment):
171         seg = tarfile.open(segment, 'r|', self.get_segment(segment))
172         for item in seg:
173             data_obj = seg.extractfile(item)
174             path = item.name.split('/')
175             if len(path) == 2 and path[0] == segment:
176                 yield (path[1], data_obj.read())
177
178     def load_snapshot(self, snapshot):
179         file = self.store.lowlevel_open("snapshot-" + snapshot + ".lbs")
180         return file.read().splitlines(True)
181
182     def extract_segment(self, segment):
183         segdir = os.path.join(self.get_cachedir(), segment)
184         os.mkdir(segdir)
185         for (object, data) in self.load_segment(segment):
186             f = open(os.path.join(segdir, object), 'wb')
187             f.write(data)
188             f.close()
189
190     def load_object(self, segment, object):
191         path = os.path.join(self.get_cachedir(), segment, object)
192         if not os.access(path, os.R_OK):
193             self.extract_segment(segment)
194         if segment in self.lru_list: self.lru_list.remove(segment)
195         self.lru_list.append(segment)
196         while len(self.lru_list) > self.CACHE_SIZE:
197             os.system("rm -rf " + os.path.join(self.cachedir, self.lru_list[0]))
198             self.lru_list = self.lru_list[1:]
199         return open(path, 'rb').read()
200
201     def get(self, refstr):
202         """Fetch the given object and return it.
203
204         The input should be an object reference, in string form.
205         """
206
207         (segment, object, checksum, slice) = self.parse_ref(refstr)
208
209         data = self.load_object(segment, object)
210
211         if checksum is not None:
212             verifier = ChecksumVerifier(checksum)
213             verifier.update(data)
214             if not verifier.valid():
215                 raise ValueError
216
217         if slice is not None:
218             (start, length) = slice
219             data = data[start:start+length]
220             if len(data) != length: raise IndexError
221
222         return data
223
224 def parse(lines, terminate=None):
225     """Generic parser for RFC822-style "Key: Value" data streams.
226
227     This parser can be used to read metadata logs and snapshot root descriptor
228     files.
229
230     lines must be an iterable object which yields a sequence of lines of input.
231
232     If terminate is specified, it is used as a predicate to determine when to
233     stop reading input lines.
234     """
235
236     dict = {}
237     last_key = None
238
239     for l in lines:
240         # Strip off a trailing newline, if present
241         if len(l) > 0 and l[-1] == "\n":
242             l = l[:-1]
243
244         if terminate is not None and terminate(l):
245             if len(dict) > 0: yield dict
246             dict = {}
247             last_key = None
248             continue
249
250         m = re.match(r"^(\w+):\s*(.*)$", l)
251         if m:
252             dict[m.group(1)] = m.group(2)
253             last_key = m.group(1)
254         elif len(l) > 0 and l[0].isspace() and last_key is not None:
255             dict[last_key] += l
256         else:
257             last_key = None
258
259     if len(dict) > 0: yield dict
260
261 def parse_full(lines):
262     try:
263         return parse(lines).next()
264     except StopIteration:
265         return {}
266
267 def parse_metadata_version(s):
268     """Convert a string with the snapshot version format to a tuple."""
269
270     m = re.match(r"^LBS Snapshot v(\d+(\.\d+)*)$", s)
271     if m is None:
272         return ()
273     else:
274         return tuple([int(d) for d in m.group(1).split(".")])
275
276 def read_metadata(object_store, root):
277     """Iterate through all lines in the metadata log, following references."""
278
279     # Stack for keeping track of recursion when following references to
280     # portions of the log.  The last entry in the stack corresponds to the
281     # object currently being parsed.  Each entry is a list of lines which have
282     # been reversed, so that popping successive lines from the end of each list
283     # will return lines of the metadata log in order.
284     stack = []
285
286     def follow_ref(refstr):
287         if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
288         lines = object_store.get(refstr).splitlines(True)
289         lines.reverse()
290         stack.append(lines)
291
292     follow_ref(root)
293
294     while len(stack) > 0:
295         top = stack[-1]
296         if len(top) == 0:
297             stack.pop()
298             continue
299         line = top.pop()
300
301         # An indirect reference which we must follow?
302         if len(line) > 0 and line[0] == '@':
303             ref = line[1:]
304             ref.strip()
305             follow_ref(ref)
306         else:
307             yield line
308
309 class MetadataItem:
310     """Metadata for a single file (or directory or...) from a snapshot."""
311
312     # Functions for parsing various datatypes that can appear in a metadata log
313     # item.
314     @staticmethod
315     def decode_int(s):
316         """Decode an integer, expressed in decimal, octal, or hexadecimal."""
317         if s.startswith("0x"):
318             return int(s, 16)
319         elif s.startswith("0"):
320             return int(s, 8)
321         else:
322             return int(s, 10)
323
324     @staticmethod
325     def decode_str(s):
326         """Decode a URI-encoded (%xx escapes) string."""
327         def hex_decode(m): return chr(int(m.group(1), 16))
328         return re.sub(r"%([0-9a-f]{2})", hex_decode, s)
329
330     @staticmethod
331     def raw_str(s):
332         """An unecoded string."""
333         return s
334
335     @staticmethod
336     def decode_user(s):
337         """Decode a user/group to a tuple of uid/gid followed by name."""
338         items = s.split()
339         uid = MetadataItem.decode_int(items[0])
340         name = None
341         if len(items) > 1:
342             if items[1].startswith("(") and items[1].endswith(")"):
343                 name = MetadataItem.decode_str(items[1][1:-1])
344         return (uid, name)
345
346     @staticmethod
347     def decode_device(s):
348         """Decode a device major/minor number."""
349         (major, minor) = map(MetadataItem.decode_int, s.split("/"))
350         return (major, minor)
351
352     class Items: pass
353
354     def __init__(self, fields, object_store):
355         """Initialize from a dictionary of key/value pairs from metadata log."""
356
357         self.fields = fields
358         self.object_store = object_store
359         self.keys = []
360         self.items = self.Items()
361         for (k, v) in fields.items():
362             if k in self.field_types:
363                 decoder = self.field_types[k]
364                 setattr(self.items, k, decoder(v))
365                 self.keys.append(k)
366
367     def data(self):
368         """Return an iterator for the data blocks that make up a file."""
369
370         # This traverses the list of blocks that make up a file, following
371         # indirect references.  It is implemented in much the same way as
372         # read_metadata, so see that function for details of the technique.
373
374         objects = self.fields['data'].split()
375         objects.reverse()
376         stack = [objects]
377
378         def follow_ref(refstr):
379             if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
380             objects = self.object_store.get(refstr).split()
381             objects.reverse()
382             stack.append(objects)
383
384         while len(stack) > 0:
385             top = stack[-1]
386             if len(top) == 0:
387                 stack.pop()
388                 continue
389             ref = top.pop()
390
391             # An indirect reference which we must follow?
392             if len(ref) > 0 and ref[0] == '@':
393                 follow_ref(ref[1:])
394             else:
395                 yield ref
396
397 # Description of fields that might appear, and how they should be parsed.
398 MetadataItem.field_types = {
399     'name': MetadataItem.decode_str,
400     'type': MetadataItem.raw_str,
401     'mode': MetadataItem.decode_int,
402     'device': MetadataItem.decode_device,
403     'user': MetadataItem.decode_user,
404     'group': MetadataItem.decode_user,
405     'ctime': MetadataItem.decode_int,
406     'mtime': MetadataItem.decode_int,
407     'links': MetadataItem.decode_int,
408     'inode': MetadataItem.raw_str,
409     'checksum': MetadataItem.decode_str,
410     'size': MetadataItem.decode_int,
411     'contents': MetadataItem.decode_str,
412     'target': MetadataItem.decode_str,
413 }
414
415 def iterate_metadata(object_store, root):
416     for d in parse(read_metadata(object_store, root), lambda l: len(l) == 0):
417         yield MetadataItem(d, object_store)
418
419 class LocalDatabase:
420     """Access to the local database of snapshot contents and object checksums.
421
422     The local database is consulted when creating a snapshot to determine what
423     data can be re-used from old snapshots.  Segment cleaning is performed by
424     manipulating the data in the local database; the local database also
425     includes enough data to guide the segment cleaning process.
426     """
427
428     def __init__(self, path, dbname="localdb.sqlite"):
429         self.db_connection = sqlite3.connect(path + "/" + dbname)
430
431     # Low-level database access.  Use these methods when there isn't a
432     # higher-level interface available.  Exception: do, however, remember to
433     # use the commit() method after making changes to make sure they are
434     # actually saved, even when going through higher-level interfaces.
435     def commit(self):
436         "Commit any pending changes to the local database."
437         self.db_connection.commit()
438
439     def rollback(self):
440         "Roll back any pending changes to the local database."
441         self.db_connection.rollback()
442
443     def cursor(self):
444         "Return a DB-API cursor for directly accessing the local database."
445         return self.db_connection.cursor()
446
447     def garbage_collect(self):
448         """Delete entries from old snapshots from the database."""
449
450         cur = self.cursor()
451
452         # Delete old snapshots.
453         cur.execute("""delete from snapshots
454                        where snapshotid < (select max(snapshotid)
455                                            from snapshots)""")
456
457         # Delete entries in the segments_used table which are for non-existent
458         # snapshots.
459         cur.execute("""delete from segments_used
460                        where snapshotid not in
461                            (select snapshotid from snapshots)""")
462
463         # Find segments which contain no objects used by any current snapshots,
464         # and delete them from the segment table.
465         cur.execute("""delete from segments where segmentid not in
466                            (select segmentid from segments_used)""")
467
468         # Finally, delete objects contained in non-existent segments.  We can't
469         # simply delete unused objects, since we use the set of unused objects
470         # to determine the used/free ratio of segments.
471         cur.execute("""delete from block_index
472                        where segmentid not in
473                            (select segmentid from segments)""")
474
475     # Segment cleaning.
476     class SegmentInfo(Struct): pass
477
478     def get_segment_cleaning_list(self, age_boost=0.0):
479         """Return a list of all current segments with information for cleaning.
480
481         Return all segments which are currently known in the local database
482         (there might be other, older segments in the archive itself), and
483         return usage statistics for each to help decide which segments to
484         clean.
485
486         The returned list will be sorted by estimated cleaning benefit, with
487         segments that are best to clean at the start of the list.
488
489         If specified, the age_boost parameter (measured in days) will added to
490         the age of each segment, as a way of adjusting the benefit computation
491         before a long-lived snapshot is taken (for example, age_boost might be
492         set to 7 when cleaning prior to taking a weekly snapshot).
493         """
494
495         cur = self.cursor()
496         segments = []
497         cur.execute("""select segmentid, used, size, mtime,
498                        julianday('now') - mtime as age from segment_info""")
499         for row in cur:
500             info = self.SegmentInfo()
501             info.id = row[0]
502             info.used_bytes = row[1]
503             info.size_bytes = row[2]
504             info.mtime = row[3]
505             info.age_days = row[4]
506
507             # Benefit calculation: u is the estimated fraction of each segment
508             # which is utilized (bytes belonging to objects still in use
509             # divided by total size; this doesn't take compression or storage
510             # overhead into account, but should give a reasonable estimate).
511             #
512             # The total benefit is a heuristic that combines several factors:
513             # the amount of space that can be reclaimed (1 - u), an ageing
514             # factor (info.age_days) that favors cleaning old segments to young
515             # ones and also is more likely to clean segments that will be
516             # rewritten for long-lived snapshots (age_boost), and finally a
517             # penalty factor for the cost of re-uploading data (u + 0.1).
518             u = info.used_bytes / info.size_bytes
519             info.cleaning_benefit \
520                 = (1 - u) * (info.age_days + age_boost) / (u + 0.1)
521
522             segments.append(info)
523
524         segments.sort(cmp, key=lambda s: s.cleaning_benefit, reverse=True)
525         return segments
526
527     def mark_segment_expired(self, segment):
528         """Mark a segment for cleaning in the local database.
529
530         The segment parameter should be either a SegmentInfo object or an
531         integer segment id.  Objects in the given segment will be marked as
532         expired, which means that any future snapshots that would re-use those
533         objects will instead write out a new copy of the object, and thus no
534         future snapshots will depend upon the given segment.
535         """
536
537         if isinstance(segment, int):
538             id = segment
539         elif isinstance(segment, self.SegmentInfo):
540             id = segment.id
541         else:
542             raise TypeError("Invalid segment: %s, must be of type int or SegmentInfo, not %s" % (segment, type(segment)))
543
544         cur = self.cursor()
545         cur.execute("update block_index set expired = 1 where segmentid = ?",
546                     (id,))
547
548     def balance_expired_objects(self):
549         """Analyze expired objects in segments to be cleaned and group by age.
550
551         Update the block_index table of the local database to group expired
552         objects by age.  The exact number of buckets and the cutoffs for each
553         are dynamically determined.  Calling this function after marking
554         segments expired will help in the segment cleaning process, by ensuring
555         that when active objects from clean segments are rewritten, they will
556         be placed into new segments roughly grouped by age.
557         """
558
559         # The expired column of the block_index table is used when generating a
560         # new LBS snapshot.  A null value indicates that an object may be
561         # re-used.  Otherwise, an object must be written into a new segment if
562         # needed.  Objects with distinct expired values will be written into
563         # distinct segments, to allow for some grouping by age.  The value 0 is
564         # somewhat special in that it indicates any rewritten objects can be
565         # placed in the same segment as completely new objects; this can be
566         # used for very young objects which have been expired, or objects not
567         # expected to be encountered.
568         #
569         # In the balancing process, all objects which are not used in any
570         # current snapshots will have expired set to 0.  Objects which have
571         # been seen will be sorted by age and will have expired values set to
572         # 0, 1, 2, and so on based on age (with younger objects being assigned
573         # lower values).  The number of buckets and the age cutoffs is
574         # determined by looking at the distribution of block ages.
575
576         cur = self.cursor()
577
578         # Mark all expired objects with expired = 0; these objects will later
579         # have values set to indicate groupings of objects when repacking.
580         cur.execute("""update block_index set expired = 0
581                        where expired is not null""")
582
583         # We will want to aim for at least one full segment for each bucket
584         # that we eventually create, but don't know how many bytes that should
585         # be due to compression.  So compute the average number of bytes in
586         # each expired segment as a rough estimate for the minimum size of each
587         # bucket.  (This estimate could be thrown off by many not-fully-packed
588         # segments, but for now don't worry too much about that.)  If we can't
589         # compute an average, it's probably because there are no expired
590         # segments, so we have no more work to do.
591         cur.execute("""select avg(size) from segments
592                        where segmentid in
593                            (select distinct segmentid from block_index
594                             where expired is not null)""")
595         segment_size_estimate = cur.fetchone()[0]
596         if not segment_size_estimate:
597             return
598
599         # Next, extract distribution of expired objects (number and size) by
600         # age.  Save the timestamp for "now" so that the classification of
601         # blocks into age buckets will not change later in the function, after
602         # time has passed.  Set any timestamps in the future to now, so we are
603         # guaranteed that for the rest of this function, age is always
604         # non-negative.
605         cur.execute("select julianday('now')")
606         now = cur.fetchone()[0]
607
608         cur.execute("""update block_index set timestamp = ?
609                        where timestamp > ? and expired is not null""",
610                     (now, now))
611
612         cur.execute("""select round(? - timestamp) as age, count(*), sum(size)
613                        from block_index where expired = 0
614                        group by age order by age""", (now,))
615         distribution = cur.fetchall()
616
617         # Start to determine the buckets for expired objects.  Heuristics used:
618         #   - An upper bound on the number of buckets is given by the number of
619         #     segments we estimate it will take to store all data.  In fact,
620         #     aim for a couple of segments per bucket.
621         #   - Place very young objects in bucket 0 (place with new objects)
622         #     unless there are enough of them to warrant a separate bucket.
623         #   - Try not to create unnecessarily many buckets, since fewer buckets
624         #     will allow repacked data to be grouped based on spatial locality
625         #     (while more buckets will group by temporal locality).  We want a
626         #     balance.
627         MIN_AGE = 4
628         total_bytes = sum([i[2] for i in distribution])
629         target_buckets = 2 * (total_bytes / segment_size_estimate) ** 0.4
630         min_size = 1.5 * segment_size_estimate
631         target_size = max(2 * segment_size_estimate,
632                           total_bytes / target_buckets)
633
634         print "segment_size:", segment_size_estimate
635         print "distribution:", distribution
636         print "total_bytes:", total_bytes
637         print "target_buckets:", target_buckets
638         print "min, target size:", min_size, target_size
639
640         # Chosen cutoffs.  Each bucket consists of objects with age greater
641         # than one cutoff value, but not greater than the next largest cutoff.
642         cutoffs = []
643
644         # Starting with the oldest objects, begin grouping together into
645         # buckets of size at least target_size bytes.
646         distribution.reverse()
647         bucket_size = 0
648         min_age_bucket = False
649         for (age, items, size) in distribution:
650             if bucket_size >= target_size \
651                 or (age < MIN_AGE and not min_age_bucket):
652                 if bucket_size < target_size and len(cutoffs) > 0:
653                     cutoffs.pop()
654                 cutoffs.append(age)
655                 bucket_size = 0
656
657             bucket_size += size
658             if age < MIN_AGE:
659                 min_age_bucket = True
660
661         # The last (youngest) bucket will be group 0, unless it has enough data
662         # to be of size min_size by itself, or there happen to be no objects
663         # less than MIN_AGE at all.
664         if bucket_size >= min_size or not min_age_bucket:
665             cutoffs.append(-1)
666         cutoffs.append(-1)
667
668         print "cutoffs:", cutoffs
669
670         # Update the database to assign each object to the appropriate bucket.
671         cutoffs.reverse()
672         for i in range(len(cutoffs)):
673             cur.execute("""update block_index set expired = ?
674                            where round(? - timestamp) > ?""",
675                         (i, now, cutoffs[i]))