Add format support for efficient sparse file handling.
[cumulus.git] / lbs.py
1 """High-level interface for working with LBS archives.
2
3 This module provides an easy interface for reading from and manipulating
4 various parts of an LBS archive:
5   - listing the snapshots and segments present
6   - reading segment contents
7   - parsing snapshot descriptors and snapshot metadata logs
8   - reading and maintaining the local object database
9 """
10
11 from __future__ import division
12 import os, re, sha, tarfile, tempfile, thread
13 from pysqlite2 import dbapi2 as sqlite3
14
15 # The largest supported snapshot format that can be understood.
16 FORMAT_VERSION = (0, 6)         # LBS Snapshot v0.6
17
18 # Maximum number of nested indirect references allowed in a snapshot.
19 MAX_RECURSION_DEPTH = 3
20
21 class Struct:
22     """A class which merely acts as a data container.
23
24     Instances of this class (or its subclasses) are merely used to store data
25     in various attributes.  No methods are provided.
26     """
27
28     def __repr__(self):
29         return "<%s %s>" % (self.__class__, self.__dict__)
30
31 CHECKSUM_ALGORITHMS = {
32     'sha1': sha.new
33 }
34
35 class ChecksumCreator:
36     """Compute an LBS checksum for provided data.
37
38     The algorithm used is selectable, but currently defaults to sha1.
39     """
40
41     def __init__(self, algorithm='sha1'):
42         self.algorithm = algorithm
43         self.hash = CHECKSUM_ALGORITHMS[algorithm]()
44
45     def update(self, data):
46         self.hash.update(data)
47         return self
48
49     def compute(self):
50         return "%s=%s" % (self.algorithm, self.hash.hexdigest())
51
52 class ChecksumVerifier:
53     """Verify whether a checksum from a snapshot matches the supplied data."""
54
55     def __init__(self, checksumstr):
56         """Create an object to check the supplied checksum."""
57
58         (algo, checksum) = checksumstr.split("=", 1)
59         self.checksum = checksum
60         self.hash = CHECKSUM_ALGORITHMS[algo]()
61
62     def update(self, data):
63         self.hash.update(data)
64
65     def valid(self):
66         """Return a boolean indicating whether the checksum matches."""
67
68         result = self.hash.hexdigest()
69         return result == self.checksum
70
71 class LowlevelDataStore:
72     """Access to the backup store containing segments and snapshot descriptors.
73
74     Instances of this class are used to get direct filesystem-level access to
75     the backup data.  To read a backup, a caller will ordinarily not care about
76     direct access to backup segments, but will instead merely need to access
77     objects from those segments.  The ObjectStore class provides a suitable
78     wrapper around a DataStore to give this high-level access.
79     """
80
81     def __init__(self, path):
82         self.path = path
83
84     # Low-level filesystem access.  These methods could be overwritten to
85     # provide access to remote data stores.
86     def lowlevel_list(self):
87         """Get a listing of files stored."""
88
89         return os.listdir(self.path)
90
91     def lowlevel_open(self, filename):
92         """Return a file-like object for reading data from the given file."""
93
94         return open(os.path.join(self.path, filename), 'rb')
95
96     def lowlevel_stat(self, filename):
97         """Return a dictionary of information about the given file.
98
99         Currently, the only defined field is 'size', giving the size of the
100         file in bytes.
101         """
102
103         stat = os.stat(os.path.join(self.path, filename))
104         return {'size': stat.st_size}
105
106     # Slightly higher-level list methods.
107     def list_snapshots(self):
108         for f in self.lowlevel_list():
109             m = re.match(r"^snapshot-(.*)\.lbs$", f)
110             if m:
111                 yield m.group(1)
112
113     def list_segments(self):
114         for f in self.lowlevel_list():
115             m = re.match(r"^([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})(\.\S+)?$", f)
116             if m:
117                 yield m.group(1)
118
119 class ObjectStore:
120     def __init__(self, data_store):
121         self.store = data_store
122         self.cachedir = None
123         self.CACHE_SIZE = 16
124         self.lru_list = []
125
126     def get_cachedir(self):
127         if self.cachedir is None:
128             self.cachedir = tempfile.mkdtemp(".lbs")
129         return self.cachedir
130
131     def cleanup(self):
132         if self.cachedir is not None:
133             # TODO: Avoid use of system, make this safer
134             os.system("rm -rf " + self.cachedir)
135         self.cachedir = None
136
137     @staticmethod
138     def parse_ref(refstr):
139         m = re.match(r"^zero\[(\d+)\+(\d+)\]$", refstr)
140         if m:
141             return ("zero", None, None, (int(m.group(1)), int(m.group(2))))
142
143         m = re.match(r"^([-0-9a-f]+)\/([0-9a-f]+)(\(\S+\))?(\[(\d+)\+(\d+)\])?$", refstr)
144         if not m: return
145
146         segment = m.group(1)
147         object = m.group(2)
148         checksum = m.group(3)
149         slice = m.group(4)
150
151         if checksum is not None:
152             checksum = checksum.lstrip("(").rstrip(")")
153
154         if slice is not None:
155             slice = (int(m.group(5)), int(m.group(6)))
156
157         return (segment, object, checksum, slice)
158
159     def get_segment(self, segment):
160         raw = self.store.lowlevel_open(segment + ".tar.gpg")
161
162         (input, output) = os.popen2("lbs-filter-gpg --decrypt")
163         def copy_thread(src, dst):
164             BLOCK_SIZE = 4096
165             while True:
166                 block = src.read(BLOCK_SIZE)
167                 if len(block) == 0: break
168                 dst.write(block)
169             dst.close()
170
171         thread.start_new_thread(copy_thread, (raw, input))
172         return output
173
174     def load_segment(self, segment):
175         seg = tarfile.open(segment, 'r|', self.get_segment(segment))
176         for item in seg:
177             data_obj = seg.extractfile(item)
178             path = item.name.split('/')
179             if len(path) == 2 and path[0] == segment:
180                 yield (path[1], data_obj.read())
181
182     def load_snapshot(self, snapshot):
183         file = self.store.lowlevel_open("snapshot-" + snapshot + ".lbs")
184         return file.read().splitlines(True)
185
186     def extract_segment(self, segment):
187         segdir = os.path.join(self.get_cachedir(), segment)
188         os.mkdir(segdir)
189         for (object, data) in self.load_segment(segment):
190             f = open(os.path.join(segdir, object), 'wb')
191             f.write(data)
192             f.close()
193
194     def load_object(self, segment, object):
195         path = os.path.join(self.get_cachedir(), segment, object)
196         if not os.access(path, os.R_OK):
197             self.extract_segment(segment)
198         if segment in self.lru_list: self.lru_list.remove(segment)
199         self.lru_list.append(segment)
200         while len(self.lru_list) > self.CACHE_SIZE:
201             os.system("rm -rf " + os.path.join(self.cachedir, self.lru_list[0]))
202             self.lru_list = self.lru_list[1:]
203         return open(path, 'rb').read()
204
205     def get(self, refstr):
206         """Fetch the given object and return it.
207
208         The input should be an object reference, in string form.
209         """
210
211         (segment, object, checksum, slice) = self.parse_ref(refstr)
212
213         if segment == "zero":
214             return "\0" * slice[1]
215
216         data = self.load_object(segment, object)
217
218         if checksum is not None:
219             verifier = ChecksumVerifier(checksum)
220             verifier.update(data)
221             if not verifier.valid():
222                 raise ValueError
223
224         if slice is not None:
225             (start, length) = slice
226             data = data[start:start+length]
227             if len(data) != length: raise IndexError
228
229         return data
230
231 def parse(lines, terminate=None):
232     """Generic parser for RFC822-style "Key: Value" data streams.
233
234     This parser can be used to read metadata logs and snapshot root descriptor
235     files.
236
237     lines must be an iterable object which yields a sequence of lines of input.
238
239     If terminate is specified, it is used as a predicate to determine when to
240     stop reading input lines.
241     """
242
243     dict = {}
244     last_key = None
245
246     for l in lines:
247         # Strip off a trailing newline, if present
248         if len(l) > 0 and l[-1] == "\n":
249             l = l[:-1]
250
251         if terminate is not None and terminate(l):
252             if len(dict) > 0: yield dict
253             dict = {}
254             last_key = None
255             continue
256
257         m = re.match(r"^(\w+):\s*(.*)$", l)
258         if m:
259             dict[m.group(1)] = m.group(2)
260             last_key = m.group(1)
261         elif len(l) > 0 and l[0].isspace() and last_key is not None:
262             dict[last_key] += l
263         else:
264             last_key = None
265
266     if len(dict) > 0: yield dict
267
268 def parse_full(lines):
269     try:
270         return parse(lines).next()
271     except StopIteration:
272         return {}
273
274 def parse_metadata_version(s):
275     """Convert a string with the snapshot version format to a tuple."""
276
277     m = re.match(r"^LBS Snapshot v(\d+(\.\d+)*)$", s)
278     if m is None:
279         return ()
280     else:
281         return tuple([int(d) for d in m.group(1).split(".")])
282
283 def read_metadata(object_store, root):
284     """Iterate through all lines in the metadata log, following references."""
285
286     # Stack for keeping track of recursion when following references to
287     # portions of the log.  The last entry in the stack corresponds to the
288     # object currently being parsed.  Each entry is a list of lines which have
289     # been reversed, so that popping successive lines from the end of each list
290     # will return lines of the metadata log in order.
291     stack = []
292
293     def follow_ref(refstr):
294         if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
295         lines = object_store.get(refstr).splitlines(True)
296         lines.reverse()
297         stack.append(lines)
298
299     follow_ref(root)
300
301     while len(stack) > 0:
302         top = stack[-1]
303         if len(top) == 0:
304             stack.pop()
305             continue
306         line = top.pop()
307
308         # An indirect reference which we must follow?
309         if len(line) > 0 and line[0] == '@':
310             ref = line[1:]
311             ref.strip()
312             follow_ref(ref)
313         else:
314             yield line
315
316 class MetadataItem:
317     """Metadata for a single file (or directory or...) from a snapshot."""
318
319     # Functions for parsing various datatypes that can appear in a metadata log
320     # item.
321     @staticmethod
322     def decode_int(s):
323         """Decode an integer, expressed in decimal, octal, or hexadecimal."""
324         if s.startswith("0x"):
325             return int(s, 16)
326         elif s.startswith("0"):
327             return int(s, 8)
328         else:
329             return int(s, 10)
330
331     @staticmethod
332     def decode_str(s):
333         """Decode a URI-encoded (%xx escapes) string."""
334         def hex_decode(m): return chr(int(m.group(1), 16))
335         return re.sub(r"%([0-9a-f]{2})", hex_decode, s)
336
337     @staticmethod
338     def raw_str(s):
339         """An unecoded string."""
340         return s
341
342     @staticmethod
343     def decode_user(s):
344         """Decode a user/group to a tuple of uid/gid followed by name."""
345         items = s.split()
346         uid = MetadataItem.decode_int(items[0])
347         name = None
348         if len(items) > 1:
349             if items[1].startswith("(") and items[1].endswith(")"):
350                 name = MetadataItem.decode_str(items[1][1:-1])
351         return (uid, name)
352
353     @staticmethod
354     def decode_device(s):
355         """Decode a device major/minor number."""
356         (major, minor) = map(MetadataItem.decode_int, s.split("/"))
357         return (major, minor)
358
359     class Items: pass
360
361     def __init__(self, fields, object_store):
362         """Initialize from a dictionary of key/value pairs from metadata log."""
363
364         self.fields = fields
365         self.object_store = object_store
366         self.keys = []
367         self.items = self.Items()
368         for (k, v) in fields.items():
369             if k in self.field_types:
370                 decoder = self.field_types[k]
371                 setattr(self.items, k, decoder(v))
372                 self.keys.append(k)
373
374     def data(self):
375         """Return an iterator for the data blocks that make up a file."""
376
377         # This traverses the list of blocks that make up a file, following
378         # indirect references.  It is implemented in much the same way as
379         # read_metadata, so see that function for details of the technique.
380
381         objects = self.fields['data'].split()
382         objects.reverse()
383         stack = [objects]
384
385         def follow_ref(refstr):
386             if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
387             objects = self.object_store.get(refstr).split()
388             objects.reverse()
389             stack.append(objects)
390
391         while len(stack) > 0:
392             top = stack[-1]
393             if len(top) == 0:
394                 stack.pop()
395                 continue
396             ref = top.pop()
397
398             # An indirect reference which we must follow?
399             if len(ref) > 0 and ref[0] == '@':
400                 follow_ref(ref[1:])
401             else:
402                 yield ref
403
404 # Description of fields that might appear, and how they should be parsed.
405 MetadataItem.field_types = {
406     'name': MetadataItem.decode_str,
407     'type': MetadataItem.raw_str,
408     'mode': MetadataItem.decode_int,
409     'device': MetadataItem.decode_device,
410     'user': MetadataItem.decode_user,
411     'group': MetadataItem.decode_user,
412     'ctime': MetadataItem.decode_int,
413     'mtime': MetadataItem.decode_int,
414     'links': MetadataItem.decode_int,
415     'inode': MetadataItem.raw_str,
416     'checksum': MetadataItem.decode_str,
417     'size': MetadataItem.decode_int,
418     'contents': MetadataItem.decode_str,
419     'target': MetadataItem.decode_str,
420 }
421
422 def iterate_metadata(object_store, root):
423     for d in parse(read_metadata(object_store, root), lambda l: len(l) == 0):
424         yield MetadataItem(d, object_store)
425
426 class LocalDatabase:
427     """Access to the local database of snapshot contents and object checksums.
428
429     The local database is consulted when creating a snapshot to determine what
430     data can be re-used from old snapshots.  Segment cleaning is performed by
431     manipulating the data in the local database; the local database also
432     includes enough data to guide the segment cleaning process.
433     """
434
435     def __init__(self, path, dbname="localdb.sqlite"):
436         self.db_connection = sqlite3.connect(path + "/" + dbname)
437
438     # Low-level database access.  Use these methods when there isn't a
439     # higher-level interface available.  Exception: do, however, remember to
440     # use the commit() method after making changes to make sure they are
441     # actually saved, even when going through higher-level interfaces.
442     def commit(self):
443         "Commit any pending changes to the local database."
444         self.db_connection.commit()
445
446     def rollback(self):
447         "Roll back any pending changes to the local database."
448         self.db_connection.rollback()
449
450     def cursor(self):
451         "Return a DB-API cursor for directly accessing the local database."
452         return self.db_connection.cursor()
453
454     def garbage_collect(self):
455         """Delete entries from old snapshots from the database."""
456
457         cur = self.cursor()
458
459         # Delete old snapshots.
460         cur.execute("""delete from snapshots
461                        where snapshotid < (select max(snapshotid)
462                                            from snapshots)""")
463
464         # Delete entries in the segments_used table which are for non-existent
465         # snapshots.
466         cur.execute("""delete from segments_used
467                        where snapshotid not in
468                            (select snapshotid from snapshots)""")
469
470         # Find segments which contain no objects used by any current snapshots,
471         # and delete them from the segment table.
472         cur.execute("""delete from segments where segmentid not in
473                            (select segmentid from segments_used)""")
474
475         # Finally, delete objects contained in non-existent segments.  We can't
476         # simply delete unused objects, since we use the set of unused objects
477         # to determine the used/free ratio of segments.
478         cur.execute("""delete from block_index
479                        where segmentid not in
480                            (select segmentid from segments)""")
481
482     # Segment cleaning.
483     class SegmentInfo(Struct): pass
484
485     def get_segment_cleaning_list(self, age_boost=0.0):
486         """Return a list of all current segments with information for cleaning.
487
488         Return all segments which are currently known in the local database
489         (there might be other, older segments in the archive itself), and
490         return usage statistics for each to help decide which segments to
491         clean.
492
493         The returned list will be sorted by estimated cleaning benefit, with
494         segments that are best to clean at the start of the list.
495
496         If specified, the age_boost parameter (measured in days) will added to
497         the age of each segment, as a way of adjusting the benefit computation
498         before a long-lived snapshot is taken (for example, age_boost might be
499         set to 7 when cleaning prior to taking a weekly snapshot).
500         """
501
502         cur = self.cursor()
503         segments = []
504         cur.execute("""select segmentid, used, size, mtime,
505                        julianday('now') - mtime as age from segment_info""")
506         for row in cur:
507             info = self.SegmentInfo()
508             info.id = row[0]
509             info.used_bytes = row[1]
510             info.size_bytes = row[2]
511             info.mtime = row[3]
512             info.age_days = row[4]
513
514             # Benefit calculation: u is the estimated fraction of each segment
515             # which is utilized (bytes belonging to objects still in use
516             # divided by total size; this doesn't take compression or storage
517             # overhead into account, but should give a reasonable estimate).
518             #
519             # The total benefit is a heuristic that combines several factors:
520             # the amount of space that can be reclaimed (1 - u), an ageing
521             # factor (info.age_days) that favors cleaning old segments to young
522             # ones and also is more likely to clean segments that will be
523             # rewritten for long-lived snapshots (age_boost), and finally a
524             # penalty factor for the cost of re-uploading data (u + 0.1).
525             u = info.used_bytes / info.size_bytes
526             info.cleaning_benefit \
527                 = (1 - u) * (info.age_days + age_boost) / (u + 0.1)
528
529             segments.append(info)
530
531         segments.sort(cmp, key=lambda s: s.cleaning_benefit, reverse=True)
532         return segments
533
534     def mark_segment_expired(self, segment):
535         """Mark a segment for cleaning in the local database.
536
537         The segment parameter should be either a SegmentInfo object or an
538         integer segment id.  Objects in the given segment will be marked as
539         expired, which means that any future snapshots that would re-use those
540         objects will instead write out a new copy of the object, and thus no
541         future snapshots will depend upon the given segment.
542         """
543
544         if isinstance(segment, int):
545             id = segment
546         elif isinstance(segment, self.SegmentInfo):
547             id = segment.id
548         else:
549             raise TypeError("Invalid segment: %s, must be of type int or SegmentInfo, not %s" % (segment, type(segment)))
550
551         cur = self.cursor()
552         cur.execute("update block_index set expired = 1 where segmentid = ?",
553                     (id,))
554
555     def balance_expired_objects(self):
556         """Analyze expired objects in segments to be cleaned and group by age.
557
558         Update the block_index table of the local database to group expired
559         objects by age.  The exact number of buckets and the cutoffs for each
560         are dynamically determined.  Calling this function after marking
561         segments expired will help in the segment cleaning process, by ensuring
562         that when active objects from clean segments are rewritten, they will
563         be placed into new segments roughly grouped by age.
564         """
565
566         # The expired column of the block_index table is used when generating a
567         # new LBS snapshot.  A null value indicates that an object may be
568         # re-used.  Otherwise, an object must be written into a new segment if
569         # needed.  Objects with distinct expired values will be written into
570         # distinct segments, to allow for some grouping by age.  The value 0 is
571         # somewhat special in that it indicates any rewritten objects can be
572         # placed in the same segment as completely new objects; this can be
573         # used for very young objects which have been expired, or objects not
574         # expected to be encountered.
575         #
576         # In the balancing process, all objects which are not used in any
577         # current snapshots will have expired set to 0.  Objects which have
578         # been seen will be sorted by age and will have expired values set to
579         # 0, 1, 2, and so on based on age (with younger objects being assigned
580         # lower values).  The number of buckets and the age cutoffs is
581         # determined by looking at the distribution of block ages.
582
583         cur = self.cursor()
584
585         # Mark all expired objects with expired = 0; these objects will later
586         # have values set to indicate groupings of objects when repacking.
587         cur.execute("""update block_index set expired = 0
588                        where expired is not null""")
589
590         # We will want to aim for at least one full segment for each bucket
591         # that we eventually create, but don't know how many bytes that should
592         # be due to compression.  So compute the average number of bytes in
593         # each expired segment as a rough estimate for the minimum size of each
594         # bucket.  (This estimate could be thrown off by many not-fully-packed
595         # segments, but for now don't worry too much about that.)  If we can't
596         # compute an average, it's probably because there are no expired
597         # segments, so we have no more work to do.
598         cur.execute("""select avg(size) from segments
599                        where segmentid in
600                            (select distinct segmentid from block_index
601                             where expired is not null)""")
602         segment_size_estimate = cur.fetchone()[0]
603         if not segment_size_estimate:
604             return
605
606         # Next, extract distribution of expired objects (number and size) by
607         # age.  Save the timestamp for "now" so that the classification of
608         # blocks into age buckets will not change later in the function, after
609         # time has passed.  Set any timestamps in the future to now, so we are
610         # guaranteed that for the rest of this function, age is always
611         # non-negative.
612         cur.execute("select julianday('now')")
613         now = cur.fetchone()[0]
614
615         cur.execute("""update block_index set timestamp = ?
616                        where timestamp > ? and expired is not null""",
617                     (now, now))
618
619         cur.execute("""select round(? - timestamp) as age, count(*), sum(size)
620                        from block_index where expired = 0
621                        group by age order by age""", (now,))
622         distribution = cur.fetchall()
623
624         # Start to determine the buckets for expired objects.  Heuristics used:
625         #   - An upper bound on the number of buckets is given by the number of
626         #     segments we estimate it will take to store all data.  In fact,
627         #     aim for a couple of segments per bucket.
628         #   - Place very young objects in bucket 0 (place with new objects)
629         #     unless there are enough of them to warrant a separate bucket.
630         #   - Try not to create unnecessarily many buckets, since fewer buckets
631         #     will allow repacked data to be grouped based on spatial locality
632         #     (while more buckets will group by temporal locality).  We want a
633         #     balance.
634         MIN_AGE = 4
635         total_bytes = sum([i[2] for i in distribution])
636         target_buckets = 2 * (total_bytes / segment_size_estimate) ** 0.4
637         min_size = 1.5 * segment_size_estimate
638         target_size = max(2 * segment_size_estimate,
639                           total_bytes / target_buckets)
640
641         print "segment_size:", segment_size_estimate
642         print "distribution:", distribution
643         print "total_bytes:", total_bytes
644         print "target_buckets:", target_buckets
645         print "min, target size:", min_size, target_size
646
647         # Chosen cutoffs.  Each bucket consists of objects with age greater
648         # than one cutoff value, but not greater than the next largest cutoff.
649         cutoffs = []
650
651         # Starting with the oldest objects, begin grouping together into
652         # buckets of size at least target_size bytes.
653         distribution.reverse()
654         bucket_size = 0
655         min_age_bucket = False
656         for (age, items, size) in distribution:
657             if bucket_size >= target_size \
658                 or (age < MIN_AGE and not min_age_bucket):
659                 if bucket_size < target_size and len(cutoffs) > 0:
660                     cutoffs.pop()
661                 cutoffs.append(age)
662                 bucket_size = 0
663
664             bucket_size += size
665             if age < MIN_AGE:
666                 min_age_bucket = True
667
668         # The last (youngest) bucket will be group 0, unless it has enough data
669         # to be of size min_size by itself, or there happen to be no objects
670         # less than MIN_AGE at all.
671         if bucket_size >= min_size or not min_age_bucket:
672             cutoffs.append(-1)
673         cutoffs.append(-1)
674
675         print "cutoffs:", cutoffs
676
677         # Update the database to assign each object to the appropriate bucket.
678         cutoffs.reverse()
679         for i in range(len(cutoffs)):
680             cur.execute("""update block_index set expired = ?
681                            where round(? - timestamp) > ?""",
682                         (i, now, cutoffs[i]))