7eab075b3628ef1a8d1e867d10ee27ed81fbf16e
[cumulus.git] / lbs.py
1 """High-level interface for working with LBS archives.
2
3 This module provides an easy interface for reading from and manipulating
4 various parts of an LBS archive:
5   - listing the snapshots and segments present
6   - reading segment contents
7   - parsing snapshot descriptors and snapshot metadata logs
8   - reading and maintaining the local object database
9 """
10
11 from __future__ import division
12 import os, re, sha, tarfile, tempfile, thread
13 from pysqlite2 import dbapi2 as sqlite3
14
15 # Maximum number of nested indirect references allowed in a snapshot.
16 MAX_RECURSION_DEPTH = 3
17
18 class Struct:
19     """A class which merely acts as a data container.
20
21     Instances of this class (or its subclasses) are merely used to store data
22     in various attributes.  No methods are provided.
23     """
24
25     def __repr__(self):
26         return "<%s %s>" % (self.__class__, self.__dict__)
27
28 CHECKSUM_ALGORITHMS = {
29     'sha1': sha.new
30 }
31
32 class ChecksumCreator:
33     """Compute an LBS checksum for provided data.
34
35     The algorithm used is selectable, but currently defaults to sha1.
36     """
37
38     def __init__(self, algorithm='sha1'):
39         self.algorithm = algorithm
40         self.hash = CHECKSUM_ALGORITHMS[algorithm]()
41
42     def update(self, data):
43         self.hash.update(data)
44         return self
45
46     def compute(self):
47         return "%s=%s" % (self.algorithm, self.hash.hexdigest())
48
49 class ChecksumVerifier:
50     """Verify whether a checksum from a snapshot matches the supplied data."""
51
52     def __init__(self, checksumstr):
53         """Create an object to check the supplied checksum."""
54
55         (algo, checksum) = checksumstr.split("=", 1)
56         self.checksum = checksum
57         self.hash = CHECKSUM_ALGORITHMS[algo]()
58
59     def update(self, data):
60         self.hash.update(data)
61
62     def valid(self):
63         """Return a boolean indicating whether the checksum matches."""
64
65         result = self.hash.hexdigest()
66         return result == self.checksum
67
68 class LowlevelDataStore:
69     """Access to the backup store containing segments and snapshot descriptors.
70
71     Instances of this class are used to get direct filesystem-level access to
72     the backup data.  To read a backup, a caller will ordinarily not care about
73     direct access to backup segments, but will instead merely need to access
74     objects from those segments.  The ObjectStore class provides a suitable
75     wrapper around a DataStore to give this high-level access.
76     """
77
78     def __init__(self, path):
79         self.path = path
80
81     # Low-level filesystem access.  These methods could be overwritten to
82     # provide access to remote data stores.
83     def lowlevel_list(self):
84         """Get a listing of files stored."""
85
86         return os.listdir(self.path)
87
88     def lowlevel_open(self, filename):
89         """Return a file-like object for reading data from the given file."""
90
91         return open(os.path.join(self.path, filename), 'rb')
92
93     def lowlevel_stat(self, filename):
94         """Return a dictionary of information about the given file.
95
96         Currently, the only defined field is 'size', giving the size of the
97         file in bytes.
98         """
99
100         stat = os.stat(os.path.join(self.path, filename))
101         return {'size': stat.st_size}
102
103     # Slightly higher-level list methods.
104     def list_snapshots(self):
105         for f in self.lowlevel_list():
106             m = re.match(r"^snapshot-(.*)\.lbs$", f)
107             if m:
108                 yield m.group(1)
109
110     def list_segments(self):
111         for f in self.lowlevel_list():
112             m = re.match(r"^([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})(\.\S+)?$", f)
113             if m:
114                 yield m.group(1)
115
116 class ObjectStore:
117     def __init__(self, data_store):
118         self.store = data_store
119         self.cachedir = None
120         self.CACHE_SIZE = 16
121         self.lru_list = []
122
123     def get_cachedir(self):
124         if self.cachedir is None:
125             self.cachedir = tempfile.mkdtemp(".lbs")
126         return self.cachedir
127
128     def cleanup(self):
129         if self.cachedir is not None:
130             # TODO: Avoid use of system, make this safer
131             os.system("rm -rf " + self.cachedir)
132         self.cachedir = None
133
134     @staticmethod
135     def parse_ref(refstr):
136         m = re.match(r"^([-0-9a-f]+)\/([0-9a-f]+)(\(\S+\))?(\[(\d+)\+(\d+)\])?$", refstr)
137         if not m: return
138
139         segment = m.group(1)
140         object = m.group(2)
141         checksum = m.group(3)
142         slice = m.group(4)
143
144         if checksum is not None:
145             checksum = checksum.lstrip("(").rstrip(")")
146
147         if slice is not None:
148             slice = (int(m.group(5)), int(m.group(6)))
149
150         return (segment, object, checksum, slice)
151
152     def get_segment(self, segment):
153         raw = self.store.lowlevel_open(segment + ".tar.gpg")
154
155         (input, output) = os.popen2("lbs-filter-gpg --decrypt")
156         def copy_thread(src, dst):
157             BLOCK_SIZE = 4096
158             while True:
159                 block = src.read(BLOCK_SIZE)
160                 if len(block) == 0: break
161                 dst.write(block)
162             dst.close()
163
164         thread.start_new_thread(copy_thread, (raw, input))
165         return output
166
167     def load_segment(self, segment):
168         seg = tarfile.open(segment, 'r|', self.get_segment(segment))
169         for item in seg:
170             data_obj = seg.extractfile(item)
171             path = item.name.split('/')
172             if len(path) == 2 and path[0] == segment:
173                 yield (path[1], data_obj.read())
174
175     def load_snapshot(self, snapshot):
176         file = self.store.lowlevel_open("snapshot-" + snapshot + ".lbs")
177         return file.read().splitlines(True)
178
179     def extract_segment(self, segment):
180         segdir = os.path.join(self.get_cachedir(), segment)
181         os.mkdir(segdir)
182         for (object, data) in self.load_segment(segment):
183             f = open(os.path.join(segdir, object), 'wb')
184             f.write(data)
185             f.close()
186
187     def load_object(self, segment, object):
188         path = os.path.join(self.get_cachedir(), segment, object)
189         if not os.access(path, os.R_OK):
190             print "Extracting", segment
191             self.extract_segment(segment)
192         if segment in self.lru_list: self.lru_list.remove(segment)
193         self.lru_list.append(segment)
194         while len(self.lru_list) > self.CACHE_SIZE:
195             os.system("rm -rf " + os.path.join(self.cachedir, self.lru_list[0]))
196             self.lru_list = self.lru_list[1:]
197         return open(path, 'rb').read()
198
199     def get(self, refstr):
200         """Fetch the given object and return it.
201
202         The input should be an object reference, in string form.
203         """
204
205         (segment, object, checksum, slice) = self.parse_ref(refstr)
206
207         data = self.load_object(segment, object)
208
209         if checksum is not None:
210             verifier = ChecksumVerifier(checksum)
211             verifier.update(data)
212             if not verifier.valid():
213                 raise ValueError
214
215         if slice is not None:
216             (start, length) = slice
217             data = data[start:start+length]
218             if len(data) != length: raise IndexError
219
220         return data
221
222 def parse(lines, terminate=None):
223     """Generic parser for RFC822-style "Key: Value" data streams.
224
225     This parser can be used to read metadata logs and snapshot root descriptor
226     files.
227
228     lines must be an iterable object which yields a sequence of lines of input.
229
230     If terminate is specified, it is used as a predicate to determine when to
231     stop reading input lines.
232     """
233
234     dict = {}
235     last_key = None
236
237     for l in lines:
238         # Strip off a trailing newline, if present
239         if len(l) > 0 and l[-1] == "\n":
240             l = l[:-1]
241
242         if terminate is not None and terminate(l):
243             if len(dict) > 0: yield dict
244             dict = {}
245             last_key = None
246             continue
247
248         m = re.match(r"^(\w+):\s*(.*)$", l)
249         if m:
250             dict[m.group(1)] = m.group(2)
251             last_key = m.group(1)
252         elif len(l) > 0 and l[0].isspace() and last_key is not None:
253             dict[last_key] += l
254         else:
255             last_key = None
256
257     if len(dict) > 0: yield dict
258
259 def parse_full(lines):
260     try:
261         return parse(lines).next()
262     except StopIteration:
263         return {}
264
265 def read_metadata(object_store, root):
266     """Iterate through all lines in the metadata log, following references."""
267
268     # Stack for keeping track of recursion when following references to
269     # portions of the log.  The last entry in the stack corresponds to the
270     # object currently being parsed.  Each entry is a list of lines which have
271     # been reversed, so that popping successive lines from the end of each list
272     # will return lines of the metadata log in order.
273     stack = []
274
275     def follow_ref(refstr):
276         if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
277         lines = object_store.get(refstr).splitlines(True)
278         lines.reverse()
279         stack.append(lines)
280
281     follow_ref(root)
282
283     while len(stack) > 0:
284         top = stack[-1]
285         if len(top) == 0:
286             stack.pop()
287             continue
288         line = top.pop()
289
290         # An indirect reference which we must follow?
291         if len(line) > 0 and line[0] == '@':
292             ref = line[1:]
293             ref.strip()
294             follow_ref(ref)
295         else:
296             yield line
297
298 class MetadataItem:
299     """Metadata for a single file (or directory or...) from a snapshot."""
300
301     # Functions for parsing various datatypes that can appear in a metadata log
302     # item.
303     @staticmethod
304     def decode_int(s):
305         """Decode an integer, expressed in decimal, octal, or hexadecimal."""
306         if s.startswith("0x"):
307             return int(s, 16)
308         elif s.startswith("0"):
309             return int(s, 8)
310         else:
311             return int(s, 10)
312
313     @staticmethod
314     def decode_str(s):
315         """Decode a URI-encoded (%xx escapes) string."""
316         def hex_decode(m): return chr(int(m.group(1), 16))
317         return re.sub(r"%([0-9a-f]{2})", hex_decode, s)
318
319     @staticmethod
320     def raw_str(s):
321         """An unecoded string."""
322         return s
323
324     @staticmethod
325     def decode_user(s):
326         """Decode a user/group to a tuple of uid/gid followed by name."""
327         items = s.split()
328         uid = MetadataItem.decode_int(items[0])
329         name = None
330         if len(items) > 1:
331             if items[1].startswith("(") and items[1].endswith(")"):
332                 name = MetadataItem.decode_str(items[1][1:-1])
333         return (uid, name)
334
335     @staticmethod
336     def decode_device(s):
337         """Decode a device major/minor number."""
338         (major, minor) = map(MetadataItem.decode_int, s.split("/"))
339         return (major, minor)
340
341     class Items: pass
342
343     def __init__(self, fields, object_store):
344         """Initialize from a dictionary of key/value pairs from metadata log."""
345
346         self.fields = fields
347         self.object_store = object_store
348         self.keys = []
349         self.items = self.Items()
350         for (k, v) in fields.items():
351             if k in self.field_types:
352                 decoder = self.field_types[k]
353                 setattr(self.items, k, decoder(v))
354                 self.keys.append(k)
355
356     def data(self):
357         """Return an iterator for the data blocks that make up a file."""
358
359         # This traverses the list of blocks that make up a file, following
360         # indirect references.  It is implemented in much the same way as
361         # read_metadata, so see that function for details of the technique.
362
363         objects = self.fields['data'].split()
364         objects.reverse()
365         stack = [objects]
366
367         def follow_ref(refstr):
368             if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
369             objects = self.object_store.get(refstr).split()
370             objects.reverse()
371             stack.append(objects)
372
373         while len(stack) > 0:
374             top = stack[-1]
375             if len(top) == 0:
376                 stack.pop()
377                 continue
378             ref = top.pop()
379
380             # An indirect reference which we must follow?
381             if len(ref) > 0 and ref[0] == '@':
382                 follow_ref(ref[1:])
383             else:
384                 yield ref
385
386 # Description of fields that might appear, and how they should be parsed.
387 MetadataItem.field_types = {
388     'name': MetadataItem.decode_str,
389     'type': MetadataItem.raw_str,
390     'mode': MetadataItem.decode_int,
391     'device': MetadataItem.decode_device,
392     'user': MetadataItem.decode_user,
393     'group': MetadataItem.decode_user,
394     'mtime': MetadataItem.decode_int,
395     'links': MetadataItem.decode_int,
396     'inode': MetadataItem.raw_str,
397     'checksum': MetadataItem.decode_str,
398     'size': MetadataItem.decode_int,
399     'contents': MetadataItem.decode_str,
400 }
401
402 def iterate_metadata(object_store, root):
403     for d in parse(read_metadata(object_store, root), lambda l: len(l) == 0):
404         yield MetadataItem(d, object_store)
405
406 class LocalDatabase:
407     """Access to the local database of snapshot contents and object checksums.
408
409     The local database is consulted when creating a snapshot to determine what
410     data can be re-used from old snapshots.  Segment cleaning is performed by
411     manipulating the data in the local database; the local database also
412     includes enough data to guide the segment cleaning process.
413     """
414
415     def __init__(self, path, dbname="localdb.sqlite"):
416         self.db_connection = sqlite3.connect(path + "/" + dbname)
417
418     # Low-level database access.  Use these methods when there isn't a
419     # higher-level interface available.  Exception: do, however, remember to
420     # use the commit() method after making changes to make sure they are
421     # actually saved, even when going through higher-level interfaces.
422     def commit(self):
423         "Commit any pending changes to the local database."
424         self.db_connection.commit()
425
426     def rollback(self):
427         "Roll back any pending changes to the local database."
428         self.db_connection.rollback()
429
430     def cursor(self):
431         "Return a DB-API cursor for directly accessing the local database."
432         return self.db_connection.cursor()
433
434     def garbage_collect(self):
435         """Delete entries from old snapshots from the database."""
436
437         cur = self.cursor()
438
439         # Delete old snapshots.
440         cur.execute("""delete from snapshots
441                        where snapshotid < (select max(snapshotid)
442                                            from snapshots)""")
443
444         # Delete entries in the snapshot_contents table which are for
445         # non-existent snapshots.
446         cur.execute("""delete from snapshot_contents
447                        where snapshotid not in
448                            (select snapshotid from snapshots)""")
449
450         # Find segments which contain no objects used by any current snapshots,
451         # and delete them from the segment table.
452         cur.execute("""delete from segments where segmentid not in
453                            (select distinct segmentid from snapshot_contents
454                                 natural join block_index)""")
455
456         # Finally, delete objects contained in non-existent segments.  We can't
457         # simply delete unused objects, since we use the set of unused objects
458         # to determine the used/free ratio of segments.
459         cur.execute("""delete from block_index
460                        where segmentid not in
461                            (select segmentid from segments)""")
462
463     # Segment cleaning.
464     class SegmentInfo(Struct): pass
465
466     def get_segment_cleaning_list(self, age_boost=0.0):
467         """Return a list of all current segments with information for cleaning.
468
469         Return all segments which are currently known in the local database
470         (there might be other, older segments in the archive itself), and
471         return usage statistics for each to help decide which segments to
472         clean.
473
474         The returned list will be sorted by estimated cleaning benefit, with
475         segments that are best to clean at the start of the list.
476
477         If specified, the age_boost parameter (measured in days) will added to
478         the age of each segment, as a way of adjusting the benefit computation
479         before a long-lived snapshot is taken (for example, age_boost might be
480         set to 7 when cleaning prior to taking a weekly snapshot).
481         """
482
483         cur = self.cursor()
484         segments = []
485         cur.execute("""select segmentid, used, size, mtime,
486                        julianday('now') - mtime as age from segment_info""")
487         for row in cur:
488             info = self.SegmentInfo()
489             info.id = row[0]
490             info.used_bytes = row[1]
491             info.size_bytes = row[2]
492             info.mtime = row[3]
493             info.age_days = row[4]
494
495             # Benefit calculation: u is the estimated fraction of each segment
496             # which is utilized (bytes belonging to objects still in use
497             # divided by total size; this doesn't take compression or storage
498             # overhead into account, but should give a reasonable estimate).
499             #
500             # The total benefit is a heuristic that combines several factors:
501             # the amount of space that can be reclaimed (1 - u), an ageing
502             # factor (info.age_days) that favors cleaning old segments to young
503             # ones and also is more likely to clean segments that will be
504             # rewritten for long-lived snapshots (age_boost), and finally a
505             # penalty factor for the cost of re-uploading data (u + 0.1).
506             u = info.used_bytes / info.size_bytes
507             info.cleaning_benefit \
508                 = (1 - u) * (info.age_days + age_boost) / (u + 0.1)
509
510             segments.append(info)
511
512         segments.sort(cmp, key=lambda s: s.cleaning_benefit, reverse=True)
513         return segments
514
515     def mark_segment_expired(self, segment):
516         """Mark a segment for cleaning in the local database.
517
518         The segment parameter should be either a SegmentInfo object or an
519         integer segment id.  Objects in the given segment will be marked as
520         expired, which means that any future snapshots that would re-use those
521         objects will instead write out a new copy of the object, and thus no
522         future snapshots will depend upon the given segment.
523         """
524
525         if isinstance(segment, int):
526             id = segment
527         elif isinstance(segment, self.SegmentInfo):
528             id = segment.id
529         else:
530             raise TypeError("Invalid segment: %s, must be of type int or SegmentInfo, not %s" % (segment, type(segment)))
531
532         cur = self.cursor()
533         cur.execute("update block_index set expired = 1 where segmentid = ?",
534                     (id,))
535
536     def balance_expired_objects(self):
537         """Analyze expired objects in segments to be cleaned and group by age.
538
539         Update the block_index table of the local database to group expired
540         objects by age.  The exact number of buckets and the cutoffs for each
541         are dynamically determined.  Calling this function after marking
542         segments expired will help in the segment cleaning process, by ensuring
543         that when active objects from clean segments are rewritten, they will
544         be placed into new segments roughly grouped by age.
545         """
546
547         # The expired column of the block_index table is used when generating a
548         # new LBS snapshot.  A null value indicates that an object may be
549         # re-used.  Otherwise, an object must be written into a new segment if
550         # needed.  Objects with distinct expired values will be written into
551         # distinct segments, to allow for some grouping by age.  The value 0 is
552         # somewhat special in that it indicates any rewritten objects can be
553         # placed in the same segment as completely new objects; this can be
554         # used for very young objects which have been expired, or objects not
555         # expected to be encountered.
556         #
557         # In the balancing process, all objects which are not used in any
558         # current snapshots will have expired set to 0.  Objects which have
559         # been seen will be sorted by age and will have expired values set to
560         # 0, 1, 2, and so on based on age (with younger objects being assigned
561         # lower values).  The number of buckets and the age cutoffs is
562         # determined by looking at the distribution of block ages.
563
564         cur = self.cursor()
565
566         # First step: Mark all unused-and-expired objects with expired = -1,
567         # which will cause us to mostly ignore these objects when rebalancing.
568         # At the end, we will set these objects to be in group expired = 0.
569         # Mark expired objects which still seem to be in use with expired = 0;
570         # these objects will later have values set to indicate groupings of
571         # objects when repacking.
572         cur.execute("""update block_index set expired = -1
573                        where expired is not null""")
574
575         cur.execute("""update block_index set expired = 0
576                        where expired is not null and blockid in
577                            (select blockid from snapshot_contents)""")
578
579         # We will want to aim for at least one full segment for each bucket
580         # that we eventually create, but don't know how many bytes that should
581         # be due to compression.  So compute the average number of bytes in
582         # each expired segment as a rough estimate for the minimum size of each
583         # bucket.  (This estimate could be thrown off by many not-fully-packed
584         # segments, but for now don't worry too much about that.)  If we can't
585         # compute an average, it's probably because there are no expired
586         # segments, so we have no more work to do.
587         cur.execute("""select avg(size) from segment_info
588                        where segmentid in
589                            (select distinct segmentid from block_index
590                             where expired is not null)""")
591         segment_size_estimate = cur.fetchone()[0]
592         if not segment_size_estimate:
593             return
594
595         # Next, extract distribution of expired objects (number and size) by
596         # age.  Save the timestamp for "now" so that the classification of
597         # blocks into age buckets will not change later in the function, after
598         # time has passed.  Set any timestamps in the future to now, so we are
599         # guaranteed that for the rest of this function, age is always
600         # non-negative.
601         cur.execute("select julianday('now')")
602         now = cur.fetchone()[0]
603
604         cur.execute("""update block_index set timestamp = ?
605                        where timestamp > ? and expired is not null""",
606                     (now, now))
607
608         cur.execute("""select round(? - timestamp) as age, count(*), sum(size)
609                        from block_index where expired = 0
610                        group by age order by age""", (now,))
611         distribution = cur.fetchall()
612
613         # Start to determine the buckets for expired objects.  Heuristics used:
614         #   - An upper bound on the number of buckets is given by the number of
615         #     segments we estimate it will take to store all data.  In fact,
616         #     aim for a couple of segments per bucket.
617         #   - Place very young objects in bucket 0 (place with new objects)
618         #     unless there are enough of them to warrant a separate bucket.
619         #   - Try not to create unnecessarily many buckets, since fewer buckets
620         #     will allow repacked data to be grouped based on spatial locality
621         #     (while more buckets will group by temporal locality).  We want a
622         #     balance.
623         MIN_AGE = 4
624         total_bytes = sum([i[2] for i in distribution])
625         target_buckets = 2 * (total_bytes / segment_size_estimate) ** 0.4
626         min_size = 1.5 * segment_size_estimate
627         target_size = max(2 * segment_size_estimate,
628                           total_bytes / target_buckets)
629
630         print "segment_size:", segment_size_estimate
631         print "distribution:", distribution
632         print "total_bytes:", total_bytes
633         print "target_buckets:", target_buckets
634         print "min, target size:", min_size, target_size
635
636         # Chosen cutoffs.  Each bucket consists of objects with age greater
637         # than one cutoff value, but not greater than the next largest cutoff.
638         cutoffs = []
639
640         # Starting with the oldest objects, begin grouping together into
641         # buckets of size at least target_size bytes.
642         distribution.reverse()
643         bucket_size = 0
644         min_age_bucket = False
645         for (age, items, size) in distribution:
646             if bucket_size >= target_size \
647                 or (age < MIN_AGE and not min_age_bucket):
648                 if bucket_size < target_size and len(cutoffs) > 0:
649                     cutoffs.pop()
650                 cutoffs.append(age)
651                 bucket_size = 0
652
653             bucket_size += size
654             if age < MIN_AGE:
655                 min_age_bucket = True
656
657         # The last (youngest) bucket will be group 0, unless it has enough data
658         # to be of size min_size by itself, or there happen to be no objects
659         # less than MIN_AGE at all.
660         if bucket_size >= min_size or not min_age_bucket:
661             cutoffs.append(-1)
662         cutoffs.append(-1)
663
664         print "cutoffs:", cutoffs
665
666         # Update the database to assign each object to the appropriate bucket.
667         cutoffs.reverse()
668         for i in range(len(cutoffs)):
669             cur.execute("""update block_index set expired = ?
670                            where round(? - timestamp) > ? and expired >= 0""",
671                         (i, now, cutoffs[i]))
672         cur.execute("update block_index set expired = 0 where expired = -1")