LBS v0.5 release.
[cumulus.git] / lbs.py
1 """High-level interface for working with LBS archives.
2
3 This module provides an easy interface for reading from and manipulating
4 various parts of an LBS archive:
5   - listing the snapshots and segments present
6   - reading segment contents
7   - parsing snapshot descriptors and snapshot metadata logs
8   - reading and maintaining the local object database
9 """
10
11 from __future__ import division
12 import os, re, sha, tarfile, tempfile, thread
13 from pysqlite2 import dbapi2 as sqlite3
14
15 # Maximum number of nested indirect references allowed in a snapshot.
16 MAX_RECURSION_DEPTH = 3
17
18 class Struct:
19     """A class which merely acts as a data container.
20
21     Instances of this class (or its subclasses) are merely used to store data
22     in various attributes.  No methods are provided.
23     """
24
25     def __repr__(self):
26         return "<%s %s>" % (self.__class__, self.__dict__)
27
28 CHECKSUM_ALGORITHMS = {
29     'sha1': sha.new
30 }
31
32 class ChecksumCreator:
33     """Compute an LBS checksum for provided data.
34
35     The algorithm used is selectable, but currently defaults to sha1.
36     """
37
38     def __init__(self, algorithm='sha1'):
39         self.algorithm = algorithm
40         self.hash = CHECKSUM_ALGORITHMS[algorithm]()
41
42     def update(self, data):
43         self.hash.update(data)
44         return self
45
46     def compute(self):
47         return "%s=%s" % (self.algorithm, self.hash.hexdigest())
48
49 class ChecksumVerifier:
50     """Verify whether a checksum from a snapshot matches the supplied data."""
51
52     def __init__(self, checksumstr):
53         """Create an object to check the supplied checksum."""
54
55         (algo, checksum) = checksumstr.split("=", 1)
56         self.checksum = checksum
57         self.hash = CHECKSUM_ALGORITHMS[algo]()
58
59     def update(self, data):
60         self.hash.update(data)
61
62     def valid(self):
63         """Return a boolean indicating whether the checksum matches."""
64
65         result = self.hash.hexdigest()
66         return result == self.checksum
67
68 class LowlevelDataStore:
69     """Access to the backup store containing segments and snapshot descriptors.
70
71     Instances of this class are used to get direct filesystem-level access to
72     the backup data.  To read a backup, a caller will ordinarily not care about
73     direct access to backup segments, but will instead merely need to access
74     objects from those segments.  The ObjectStore class provides a suitable
75     wrapper around a DataStore to give this high-level access.
76     """
77
78     def __init__(self, path):
79         self.path = path
80
81     # Low-level filesystem access.  These methods could be overwritten to
82     # provide access to remote data stores.
83     def lowlevel_list(self):
84         """Get a listing of files stored."""
85
86         return os.listdir(self.path)
87
88     def lowlevel_open(self, filename):
89         """Return a file-like object for reading data from the given file."""
90
91         return open(os.path.join(self.path, filename), 'rb')
92
93     def lowlevel_stat(self, filename):
94         """Return a dictionary of information about the given file.
95
96         Currently, the only defined field is 'size', giving the size of the
97         file in bytes.
98         """
99
100         stat = os.stat(os.path.join(self.path, filename))
101         return {'size': stat.st_size}
102
103     # Slightly higher-level list methods.
104     def list_snapshots(self):
105         for f in self.lowlevel_list():
106             m = re.match(r"^snapshot-(.*)\.lbs$", f)
107             if m:
108                 yield m.group(1)
109
110     def list_segments(self):
111         for f in self.lowlevel_list():
112             m = re.match(r"^([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})(\.\S+)?$", f)
113             if m:
114                 yield m.group(1)
115
116 class ObjectStore:
117     def __init__(self, data_store):
118         self.store = data_store
119         self.cachedir = None
120         self.CACHE_SIZE = 16
121         self.lru_list = []
122
123     def get_cachedir(self):
124         if self.cachedir is None:
125             self.cachedir = tempfile.mkdtemp(".lbs")
126         return self.cachedir
127
128     def cleanup(self):
129         if self.cachedir is not None:
130             # TODO: Avoid use of system, make this safer
131             os.system("rm -rf " + self.cachedir)
132         self.cachedir = None
133
134     @staticmethod
135     def parse_ref(refstr):
136         m = re.match(r"^([-0-9a-f]+)\/([0-9a-f]+)(\(\S+\))?(\[(\d+)\+(\d+)\])?$", refstr)
137         if not m: return
138
139         segment = m.group(1)
140         object = m.group(2)
141         checksum = m.group(3)
142         slice = m.group(4)
143
144         if checksum is not None:
145             checksum = checksum.lstrip("(").rstrip(")")
146
147         if slice is not None:
148             slice = (int(m.group(5)), int(m.group(6)))
149
150         return (segment, object, checksum, slice)
151
152     def get_segment(self, segment):
153         raw = self.store.lowlevel_open(segment + ".tar.gpg")
154
155         (input, output) = os.popen2("lbs-filter-gpg --decrypt")
156         def copy_thread(src, dst):
157             BLOCK_SIZE = 4096
158             while True:
159                 block = src.read(BLOCK_SIZE)
160                 if len(block) == 0: break
161                 dst.write(block)
162             dst.close()
163
164         thread.start_new_thread(copy_thread, (raw, input))
165         return output
166
167     def load_segment(self, segment):
168         seg = tarfile.open(segment, 'r|', self.get_segment(segment))
169         for item in seg:
170             data_obj = seg.extractfile(item)
171             path = item.name.split('/')
172             if len(path) == 2 and path[0] == segment:
173                 yield (path[1], data_obj.read())
174
175     def load_snapshot(self, snapshot):
176         file = self.store.lowlevel_open("snapshot-" + snapshot + ".lbs")
177         return file.read().splitlines(True)
178
179     def extract_segment(self, segment):
180         segdir = os.path.join(self.get_cachedir(), segment)
181         os.mkdir(segdir)
182         for (object, data) in self.load_segment(segment):
183             f = open(os.path.join(segdir, object), 'wb')
184             f.write(data)
185             f.close()
186
187     def load_object(self, segment, object):
188         path = os.path.join(self.get_cachedir(), segment, object)
189         if not os.access(path, os.R_OK):
190             self.extract_segment(segment)
191         if segment in self.lru_list: self.lru_list.remove(segment)
192         self.lru_list.append(segment)
193         while len(self.lru_list) > self.CACHE_SIZE:
194             os.system("rm -rf " + os.path.join(self.cachedir, self.lru_list[0]))
195             self.lru_list = self.lru_list[1:]
196         return open(path, 'rb').read()
197
198     def get(self, refstr):
199         """Fetch the given object and return it.
200
201         The input should be an object reference, in string form.
202         """
203
204         (segment, object, checksum, slice) = self.parse_ref(refstr)
205
206         data = self.load_object(segment, object)
207
208         if checksum is not None:
209             verifier = ChecksumVerifier(checksum)
210             verifier.update(data)
211             if not verifier.valid():
212                 raise ValueError
213
214         if slice is not None:
215             (start, length) = slice
216             data = data[start:start+length]
217             if len(data) != length: raise IndexError
218
219         return data
220
221 def parse(lines, terminate=None):
222     """Generic parser for RFC822-style "Key: Value" data streams.
223
224     This parser can be used to read metadata logs and snapshot root descriptor
225     files.
226
227     lines must be an iterable object which yields a sequence of lines of input.
228
229     If terminate is specified, it is used as a predicate to determine when to
230     stop reading input lines.
231     """
232
233     dict = {}
234     last_key = None
235
236     for l in lines:
237         # Strip off a trailing newline, if present
238         if len(l) > 0 and l[-1] == "\n":
239             l = l[:-1]
240
241         if terminate is not None and terminate(l):
242             if len(dict) > 0: yield dict
243             dict = {}
244             last_key = None
245             continue
246
247         m = re.match(r"^(\w+):\s*(.*)$", l)
248         if m:
249             dict[m.group(1)] = m.group(2)
250             last_key = m.group(1)
251         elif len(l) > 0 and l[0].isspace() and last_key is not None:
252             dict[last_key] += l
253         else:
254             last_key = None
255
256     if len(dict) > 0: yield dict
257
258 def parse_full(lines):
259     try:
260         return parse(lines).next()
261     except StopIteration:
262         return {}
263
264 def read_metadata(object_store, root):
265     """Iterate through all lines in the metadata log, following references."""
266
267     # Stack for keeping track of recursion when following references to
268     # portions of the log.  The last entry in the stack corresponds to the
269     # object currently being parsed.  Each entry is a list of lines which have
270     # been reversed, so that popping successive lines from the end of each list
271     # will return lines of the metadata log in order.
272     stack = []
273
274     def follow_ref(refstr):
275         if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
276         lines = object_store.get(refstr).splitlines(True)
277         lines.reverse()
278         stack.append(lines)
279
280     follow_ref(root)
281
282     while len(stack) > 0:
283         top = stack[-1]
284         if len(top) == 0:
285             stack.pop()
286             continue
287         line = top.pop()
288
289         # An indirect reference which we must follow?
290         if len(line) > 0 and line[0] == '@':
291             ref = line[1:]
292             ref.strip()
293             follow_ref(ref)
294         else:
295             yield line
296
297 class MetadataItem:
298     """Metadata for a single file (or directory or...) from a snapshot."""
299
300     # Functions for parsing various datatypes that can appear in a metadata log
301     # item.
302     @staticmethod
303     def decode_int(s):
304         """Decode an integer, expressed in decimal, octal, or hexadecimal."""
305         if s.startswith("0x"):
306             return int(s, 16)
307         elif s.startswith("0"):
308             return int(s, 8)
309         else:
310             return int(s, 10)
311
312     @staticmethod
313     def decode_str(s):
314         """Decode a URI-encoded (%xx escapes) string."""
315         def hex_decode(m): return chr(int(m.group(1), 16))
316         return re.sub(r"%([0-9a-f]{2})", hex_decode, s)
317
318     @staticmethod
319     def raw_str(s):
320         """An unecoded string."""
321         return s
322
323     @staticmethod
324     def decode_user(s):
325         """Decode a user/group to a tuple of uid/gid followed by name."""
326         items = s.split()
327         uid = MetadataItem.decode_int(items[0])
328         name = None
329         if len(items) > 1:
330             if items[1].startswith("(") and items[1].endswith(")"):
331                 name = MetadataItem.decode_str(items[1][1:-1])
332         return (uid, name)
333
334     @staticmethod
335     def decode_device(s):
336         """Decode a device major/minor number."""
337         (major, minor) = map(MetadataItem.decode_int, s.split("/"))
338         return (major, minor)
339
340     class Items: pass
341
342     def __init__(self, fields, object_store):
343         """Initialize from a dictionary of key/value pairs from metadata log."""
344
345         self.fields = fields
346         self.object_store = object_store
347         self.keys = []
348         self.items = self.Items()
349         for (k, v) in fields.items():
350             if k in self.field_types:
351                 decoder = self.field_types[k]
352                 setattr(self.items, k, decoder(v))
353                 self.keys.append(k)
354
355     def data(self):
356         """Return an iterator for the data blocks that make up a file."""
357
358         # This traverses the list of blocks that make up a file, following
359         # indirect references.  It is implemented in much the same way as
360         # read_metadata, so see that function for details of the technique.
361
362         objects = self.fields['data'].split()
363         objects.reverse()
364         stack = [objects]
365
366         def follow_ref(refstr):
367             if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
368             objects = self.object_store.get(refstr).split()
369             objects.reverse()
370             stack.append(objects)
371
372         while len(stack) > 0:
373             top = stack[-1]
374             if len(top) == 0:
375                 stack.pop()
376                 continue
377             ref = top.pop()
378
379             # An indirect reference which we must follow?
380             if len(ref) > 0 and ref[0] == '@':
381                 follow_ref(ref[1:])
382             else:
383                 yield ref
384
385 # Description of fields that might appear, and how they should be parsed.
386 MetadataItem.field_types = {
387     'name': MetadataItem.decode_str,
388     'type': MetadataItem.raw_str,
389     'mode': MetadataItem.decode_int,
390     'device': MetadataItem.decode_device,
391     'user': MetadataItem.decode_user,
392     'group': MetadataItem.decode_user,
393     'mtime': MetadataItem.decode_int,
394     'links': MetadataItem.decode_int,
395     'inode': MetadataItem.raw_str,
396     'checksum': MetadataItem.decode_str,
397     'size': MetadataItem.decode_int,
398     'contents': MetadataItem.decode_str,
399 }
400
401 def iterate_metadata(object_store, root):
402     for d in parse(read_metadata(object_store, root), lambda l: len(l) == 0):
403         yield MetadataItem(d, object_store)
404
405 class LocalDatabase:
406     """Access to the local database of snapshot contents and object checksums.
407
408     The local database is consulted when creating a snapshot to determine what
409     data can be re-used from old snapshots.  Segment cleaning is performed by
410     manipulating the data in the local database; the local database also
411     includes enough data to guide the segment cleaning process.
412     """
413
414     def __init__(self, path, dbname="localdb.sqlite"):
415         self.db_connection = sqlite3.connect(path + "/" + dbname)
416
417     # Low-level database access.  Use these methods when there isn't a
418     # higher-level interface available.  Exception: do, however, remember to
419     # use the commit() method after making changes to make sure they are
420     # actually saved, even when going through higher-level interfaces.
421     def commit(self):
422         "Commit any pending changes to the local database."
423         self.db_connection.commit()
424
425     def rollback(self):
426         "Roll back any pending changes to the local database."
427         self.db_connection.rollback()
428
429     def cursor(self):
430         "Return a DB-API cursor for directly accessing the local database."
431         return self.db_connection.cursor()
432
433     def garbage_collect(self):
434         """Delete entries from old snapshots from the database."""
435
436         cur = self.cursor()
437
438         # Delete old snapshots.
439         cur.execute("""delete from snapshots
440                        where snapshotid < (select max(snapshotid)
441                                            from snapshots)""")
442
443         # Delete entries in the snapshot_contents table which are for
444         # non-existent snapshots.
445         cur.execute("""delete from snapshot_contents
446                        where snapshotid not in
447                            (select snapshotid from snapshots)""")
448
449         # Find segments which contain no objects used by any current snapshots,
450         # and delete them from the segment table.
451         cur.execute("""delete from segments where segmentid not in
452                            (select distinct segmentid from snapshot_contents
453                                 natural join block_index)""")
454
455         # Finally, delete objects contained in non-existent segments.  We can't
456         # simply delete unused objects, since we use the set of unused objects
457         # to determine the used/free ratio of segments.
458         cur.execute("""delete from block_index
459                        where segmentid not in
460                            (select segmentid from segments)""")
461
462     # Segment cleaning.
463     class SegmentInfo(Struct): pass
464
465     def get_segment_cleaning_list(self, age_boost=0.0):
466         """Return a list of all current segments with information for cleaning.
467
468         Return all segments which are currently known in the local database
469         (there might be other, older segments in the archive itself), and
470         return usage statistics for each to help decide which segments to
471         clean.
472
473         The returned list will be sorted by estimated cleaning benefit, with
474         segments that are best to clean at the start of the list.
475
476         If specified, the age_boost parameter (measured in days) will added to
477         the age of each segment, as a way of adjusting the benefit computation
478         before a long-lived snapshot is taken (for example, age_boost might be
479         set to 7 when cleaning prior to taking a weekly snapshot).
480         """
481
482         cur = self.cursor()
483         segments = []
484         cur.execute("""select segmentid, used, size, mtime,
485                        julianday('now') - mtime as age from segment_info""")
486         for row in cur:
487             info = self.SegmentInfo()
488             info.id = row[0]
489             info.used_bytes = row[1]
490             info.size_bytes = row[2]
491             info.mtime = row[3]
492             info.age_days = row[4]
493
494             # Benefit calculation: u is the estimated fraction of each segment
495             # which is utilized (bytes belonging to objects still in use
496             # divided by total size; this doesn't take compression or storage
497             # overhead into account, but should give a reasonable estimate).
498             #
499             # The total benefit is a heuristic that combines several factors:
500             # the amount of space that can be reclaimed (1 - u), an ageing
501             # factor (info.age_days) that favors cleaning old segments to young
502             # ones and also is more likely to clean segments that will be
503             # rewritten for long-lived snapshots (age_boost), and finally a
504             # penalty factor for the cost of re-uploading data (u + 0.1).
505             u = info.used_bytes / info.size_bytes
506             info.cleaning_benefit \
507                 = (1 - u) * (info.age_days + age_boost) / (u + 0.1)
508
509             segments.append(info)
510
511         segments.sort(cmp, key=lambda s: s.cleaning_benefit, reverse=True)
512         return segments
513
514     def mark_segment_expired(self, segment):
515         """Mark a segment for cleaning in the local database.
516
517         The segment parameter should be either a SegmentInfo object or an
518         integer segment id.  Objects in the given segment will be marked as
519         expired, which means that any future snapshots that would re-use those
520         objects will instead write out a new copy of the object, and thus no
521         future snapshots will depend upon the given segment.
522         """
523
524         if isinstance(segment, int):
525             id = segment
526         elif isinstance(segment, self.SegmentInfo):
527             id = segment.id
528         else:
529             raise TypeError("Invalid segment: %s, must be of type int or SegmentInfo, not %s" % (segment, type(segment)))
530
531         cur = self.cursor()
532         cur.execute("update block_index set expired = 1 where segmentid = ?",
533                     (id,))
534
535     def balance_expired_objects(self):
536         """Analyze expired objects in segments to be cleaned and group by age.
537
538         Update the block_index table of the local database to group expired
539         objects by age.  The exact number of buckets and the cutoffs for each
540         are dynamically determined.  Calling this function after marking
541         segments expired will help in the segment cleaning process, by ensuring
542         that when active objects from clean segments are rewritten, they will
543         be placed into new segments roughly grouped by age.
544         """
545
546         # The expired column of the block_index table is used when generating a
547         # new LBS snapshot.  A null value indicates that an object may be
548         # re-used.  Otherwise, an object must be written into a new segment if
549         # needed.  Objects with distinct expired values will be written into
550         # distinct segments, to allow for some grouping by age.  The value 0 is
551         # somewhat special in that it indicates any rewritten objects can be
552         # placed in the same segment as completely new objects; this can be
553         # used for very young objects which have been expired, or objects not
554         # expected to be encountered.
555         #
556         # In the balancing process, all objects which are not used in any
557         # current snapshots will have expired set to 0.  Objects which have
558         # been seen will be sorted by age and will have expired values set to
559         # 0, 1, 2, and so on based on age (with younger objects being assigned
560         # lower values).  The number of buckets and the age cutoffs is
561         # determined by looking at the distribution of block ages.
562
563         cur = self.cursor()
564
565         # First step: Mark all unused-and-expired objects with expired = -1,
566         # which will cause us to mostly ignore these objects when rebalancing.
567         # At the end, we will set these objects to be in group expired = 0.
568         # Mark expired objects which still seem to be in use with expired = 0;
569         # these objects will later have values set to indicate groupings of
570         # objects when repacking.
571         cur.execute("""update block_index set expired = -1
572                        where expired is not null""")
573
574         cur.execute("""update block_index set expired = 0
575                        where expired is not null and blockid in
576                            (select blockid from snapshot_contents)""")
577
578         # We will want to aim for at least one full segment for each bucket
579         # that we eventually create, but don't know how many bytes that should
580         # be due to compression.  So compute the average number of bytes in
581         # each expired segment as a rough estimate for the minimum size of each
582         # bucket.  (This estimate could be thrown off by many not-fully-packed
583         # segments, but for now don't worry too much about that.)  If we can't
584         # compute an average, it's probably because there are no expired
585         # segments, so we have no more work to do.
586         cur.execute("""select avg(size) from segment_info
587                        where segmentid in
588                            (select distinct segmentid from block_index
589                             where expired is not null)""")
590         segment_size_estimate = cur.fetchone()[0]
591         if not segment_size_estimate:
592             return
593
594         # Next, extract distribution of expired objects (number and size) by
595         # age.  Save the timestamp for "now" so that the classification of
596         # blocks into age buckets will not change later in the function, after
597         # time has passed.  Set any timestamps in the future to now, so we are
598         # guaranteed that for the rest of this function, age is always
599         # non-negative.
600         cur.execute("select julianday('now')")
601         now = cur.fetchone()[0]
602
603         cur.execute("""update block_index set timestamp = ?
604                        where timestamp > ? and expired is not null""",
605                     (now, now))
606
607         cur.execute("""select round(? - timestamp) as age, count(*), sum(size)
608                        from block_index where expired = 0
609                        group by age order by age""", (now,))
610         distribution = cur.fetchall()
611
612         # Start to determine the buckets for expired objects.  Heuristics used:
613         #   - An upper bound on the number of buckets is given by the number of
614         #     segments we estimate it will take to store all data.  In fact,
615         #     aim for a couple of segments per bucket.
616         #   - Place very young objects in bucket 0 (place with new objects)
617         #     unless there are enough of them to warrant a separate bucket.
618         #   - Try not to create unnecessarily many buckets, since fewer buckets
619         #     will allow repacked data to be grouped based on spatial locality
620         #     (while more buckets will group by temporal locality).  We want a
621         #     balance.
622         MIN_AGE = 4
623         total_bytes = sum([i[2] for i in distribution])
624         target_buckets = 2 * (total_bytes / segment_size_estimate) ** 0.4
625         min_size = 1.5 * segment_size_estimate
626         target_size = max(2 * segment_size_estimate,
627                           total_bytes / target_buckets)
628
629         print "segment_size:", segment_size_estimate
630         print "distribution:", distribution
631         print "total_bytes:", total_bytes
632         print "target_buckets:", target_buckets
633         print "min, target size:", min_size, target_size
634
635         # Chosen cutoffs.  Each bucket consists of objects with age greater
636         # than one cutoff value, but not greater than the next largest cutoff.
637         cutoffs = []
638
639         # Starting with the oldest objects, begin grouping together into
640         # buckets of size at least target_size bytes.
641         distribution.reverse()
642         bucket_size = 0
643         min_age_bucket = False
644         for (age, items, size) in distribution:
645             if bucket_size >= target_size \
646                 or (age < MIN_AGE and not min_age_bucket):
647                 if bucket_size < target_size and len(cutoffs) > 0:
648                     cutoffs.pop()
649                 cutoffs.append(age)
650                 bucket_size = 0
651
652             bucket_size += size
653             if age < MIN_AGE:
654                 min_age_bucket = True
655
656         # The last (youngest) bucket will be group 0, unless it has enough data
657         # to be of size min_size by itself, or there happen to be no objects
658         # less than MIN_AGE at all.
659         if bucket_size >= min_size or not min_age_bucket:
660             cutoffs.append(-1)
661         cutoffs.append(-1)
662
663         print "cutoffs:", cutoffs
664
665         # Update the database to assign each object to the appropriate bucket.
666         cutoffs.reverse()
667         for i in range(len(cutoffs)):
668             cur.execute("""update block_index set expired = ?
669                            where round(? - timestamp) > ? and expired >= 0""",
670                         (i, now, cutoffs[i]))
671         cur.execute("update block_index set expired = 0 where expired = -1")