# Cumulus: Efficient Filesystem Backup to the Cloud
# Copyright (C) 2008-2009, 2012 The Cumulus Developers
# See the AUTHORS file for a list of contributors.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

19 """High-level interface for working with Cumulus archives.
21 This module provides an easy interface for reading from and manipulating
22 various parts of a Cumulus archive:
23 - listing the snapshots and segments present
24 - reading segment contents
25 - parsing snapshot descriptors and snapshot metadata logs
26 - reading and maintaining the local object database
29 from __future__ import division
30 import hashlib, os, re, tarfile, tempfile, thread
31 from pysqlite2 import dbapi2 as sqlite3
33 import cumulus.store, cumulus.store.file
# The largest supported snapshot format version that can be understood.
FORMAT_VERSION = (0, 11)        # Cumulus Snapshot v0.11

# Maximum number of nested indirect references allowed in a snapshot.
MAX_RECURSION_DEPTH = 3

# All segments which have been accessed this session.
accessed_segments = set()

# Table of methods used to filter segments before storage, and corresponding
# filename extensions.  These are listed in priority order (methods earlier in
# the list are tried first).
SEGMENT_FILTERS = [
    (".gpg", "cumulus-filter-gpg --decrypt"),
    (".gz", "gzip -dc"),
    (".bz2", "bzip2 -dc"),
]

54 """Decode a URI-encoded (%xx escapes) string."""
55 def hex_decode(m): return chr(int(m.group(1), 16))
56 return re.sub(r"%([0-9a-f]{2})", hex_decode, s)
58 """Encode a string to URI-encoded (%xx escapes) form."""
60 if c > '+' and c < '\x7f' and c != '@':
63 return "%%%02x" % (ord(c),)
64 return ''.join(hex_encode(c) for c in s)
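# Illustrative round-trip sketch (the filename is hypothetical): characters
# outside the safe printable range, such as spaces, are written as lowercase
# %xx escapes, and uri_decode reverses the transformation.
#
#     uri_encode("backup file.txt")    # -> "backup%20file.txt"
#     uri_decode("backup%20file.txt")  # -> "backup file.txt"
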
67 """A class which merely acts as a data container.
69 Instances of this class (or its subclasses) are merely used to store data
70 in various attributes. No methods are provided.
74 return "<%s %s>" % (self.__class__, self.__dict__)
76 CHECKSUM_ALGORITHMS = {
78 'sha224': hashlib.sha224,
79 'sha256': hashlib.sha256,
class ChecksumCreator:
    """Compute a Cumulus checksum for provided data.

    The algorithm used is selectable, but currently defaults to sha1.
    """

    def __init__(self, algorithm='sha1'):
        self.algorithm = algorithm
        self.hash = CHECKSUM_ALGORITHMS[algorithm]()

    def update(self, data):
        self.hash.update(data)
        return self

    def compute(self):
        return "%s=%s" % (self.algorithm, self.hash.hexdigest())

class ChecksumVerifier:
    """Verify whether a checksum from a snapshot matches the supplied data."""

    def __init__(self, checksumstr):
        """Create an object to check the supplied checksum."""

        (algo, checksum) = checksumstr.split("=", 1)
        self.checksum = checksum
        self.hash = CHECKSUM_ALGORITHMS[algo]()

    def update(self, data):
        self.hash.update(data)

    def valid(self):
        """Return a boolean indicating whether the checksum matches."""

        result = self.hash.hexdigest()
        return result == self.checksum

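# Illustrative usage sketch of the "algorithm=hexdigest" checksum strings (the
# digest shown is the well-known SHA-1 of the empty string):
#
#     c = ChecksumCreator('sha1')
#     c.update("")
#     c.compute()   # -> "sha1=da39a3ee5e6b4b0d3255bfef95601890afd80709"
#
#     v = ChecksumVerifier("sha1=da39a3ee5e6b4b0d3255bfef95601890afd80709")
#     v.update("")
#     v.valid()     # -> True
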
class LowlevelDataStore:
    """Access to the backup store containing segments and snapshot descriptors.

    Instances of this class are used to get direct filesystem-level access to
    the backup data.  To read a backup, a caller will ordinarily not care about
    direct access to backup segments, but will instead merely need to access
    objects from those segments.  The ObjectStore class provides a suitable
    wrapper around a DataStore to give this high-level access.
    """

    def __init__(self, path):
        if isinstance(path, cumulus.store.Store):
            self.store = path
        elif path.find(":") >= 0:
            self.store = cumulus.store.open(path)
        else:
            self.store = cumulus.store.file.FileStore(path)

    def _classify(self, filename):
        for (t, r) in cumulus.store.type_patterns.items():
            if r.match(filename):
                return (t, filename)
        return (None, filename)

    def lowlevel_open(self, filename):
        """Return a file-like object for reading data from the given file."""

        (type, filename) = self._classify(filename)
        return self.store.get(type, filename)

    def lowlevel_stat(self, filename):
        """Return a dictionary of information about the given file.

        Currently, the only defined field is 'size', giving the size of the
        file, in bytes.
        """

        (type, filename) = self._classify(filename)
        return self.store.stat(type, filename)

    # Slightly higher-level list methods.
    def list_snapshots(self):
        for f in self.store.list('snapshots'):
            m = cumulus.store.type_patterns['snapshots'].match(f)
            if m: yield m.group(1)

    def list_segments(self):
        for f in self.store.list('segments'):
            m = cumulus.store.type_patterns['segments'].match(f)
            if m: yield m.group(1)

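# Illustrative sketch (the local store path is hypothetical): open a store
# rooted at a filesystem directory and enumerate what it contains.
#
#     store = LowlevelDataStore("/srv/backups/cumulus")
#     for s in store.list_snapshots():
#         print "snapshot:", s
#     for s in store.list_segments():
#         print "segment:", s
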
class ObjectStore:
    def __init__(self, data_store):
        self.store = data_store
        self.cachedir = None
        self.CACHE_SIZE = 16
        self.lru_list = []

    def get_cachedir(self):
        if self.cachedir is None:
            self.cachedir = tempfile.mkdtemp(".lbs")
        return self.cachedir

    def cleanup(self):
        if self.cachedir is not None:
            # TODO: Avoid use of system, make this safer
            os.system("rm -rf " + self.cachedir)
        self.cachedir = None

    @staticmethod
    def parse_ref(refstr):
        m = re.match(r"^zero\[(\d+)\]$", refstr)
        if m:
            return ("zero", None, None, (0, int(m.group(1)), False))

        m = re.match(r"^([-0-9a-f]+)\/([0-9a-f]+)(\(\S+\))?(\[(((\d+)\+)?(\d+)|=(\d+))\])?$", refstr)
        if not m: raise ValueError

        segment = m.group(1)
        object = m.group(2)
        checksum = m.group(3)
        slice = m.group(4)

        if checksum is not None:
            checksum = checksum.lstrip("(").rstrip(")")

        if slice is not None:
            if m.group(9) is not None:
                # Size-assertion slice
                slice = (0, int(m.group(9)), True)
            elif m.group(6) is None:
                # Slice with only a length specified
                slice = (0, int(m.group(8)), False)
            else:
                slice = (int(m.group(7)), int(m.group(8)), False)

        return (segment, object, checksum, slice)

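    # Illustrative reference strings (segment and object names are
    # hypothetical); the return value is (segment, object, checksum, slice):
    #
    #     parse_ref("zero[1024]")
    #         # -> ("zero", None, None, (0, 1024, False))
    #     parse_ref("a1b2c3d4/000000f0[264+132]")
    #         # -> ("a1b2c3d4", "000000f0", None, (264, 132, False))
    #     parse_ref("a1b2c3d4/000000f0(sha1=3da5...)[=4096]")
    #         # -> checksum "sha1=3da5..." and a size-assertion slice
    #         #    (0, 4096, True)
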
    def get_segment(self, segment):
        accessed_segments.add(segment)

        for (extension, filter) in SEGMENT_FILTERS:
            try:
                raw = self.store.lowlevel_open(segment + ".tar" + extension)
            except: continue

            (input, output) = os.popen2(filter)
            def copy_thread(src, dst):
                BLOCK_SIZE = 4096
                while True:
                    block = src.read(BLOCK_SIZE)
                    if len(block) == 0: break
                    dst.write(block)
                dst.close()

            thread.start_new_thread(copy_thread, (raw, input))
            return output

        raise cumulus.store.NotFoundError

    def load_segment(self, segment):
        seg = tarfile.open(segment, 'r|', self.get_segment(segment))
        for item in seg:
            data_obj = seg.extractfile(item)
            path = item.name.split('/')
            if len(path) == 2 and path[0] == segment:
                yield (path[1], data_obj.read())

    def load_snapshot(self, snapshot):
        file = self.store.lowlevel_open("snapshot-" + snapshot + ".lbs")
        return file.read().splitlines(True)

    def extract_segment(self, segment):
        segdir = os.path.join(self.get_cachedir(), segment)
        os.mkdir(segdir)
        for (object, data) in self.load_segment(segment):
            f = open(os.path.join(segdir, object), 'wb')
            f.write(data)
            f.close()

    def load_object(self, segment, object):
        accessed_segments.add(segment)
        path = os.path.join(self.get_cachedir(), segment, object)
        if not os.access(path, os.R_OK):
            self.extract_segment(segment)
        if segment in self.lru_list: self.lru_list.remove(segment)
        self.lru_list.append(segment)
        while len(self.lru_list) > self.CACHE_SIZE:
            os.system("rm -rf " + os.path.join(self.cachedir, self.lru_list[0]))
            self.lru_list = self.lru_list[1:]
        return open(path, 'rb').read()

    def get(self, refstr):
        """Fetch the given object and return it.

        The input should be an object reference, in string form.
        """

        (segment, object, checksum, slice) = self.parse_ref(refstr)

        if segment == "zero":
            return "\0" * slice[1]

        data = self.load_object(segment, object)

        if checksum is not None:
            verifier = ChecksumVerifier(checksum)
            verifier.update(data)
            if not verifier.valid():
                raise ValueError

        if slice is not None:
            (start, length, exact) = slice
            if exact and len(data) != length: raise ValueError
            data = data[start:start+length]
            if len(data) != length: raise IndexError

        return data

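# Illustrative sketch of fetching objects through an ObjectStore (the
# reference strings are hypothetical; "zero" references need no backing
# segment at all):
#
#     objects = ObjectStore(LowlevelDataStore("/srv/backups/cumulus"))
#     objects.get("zero[16]")                    # -> 16 NUL bytes
#     data = objects.get("a1b2c3d4/000000f0[264+132]")
#     objects.cleanup()                          # remove the segment cache
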
def parse(lines, terminate=None):
    """Generic parser for RFC822-style "Key: Value" data streams.

    This parser can be used to read metadata logs and snapshot root descriptor
    files.

    lines must be an iterable object which yields a sequence of lines of input.

    If terminate is specified, it is used as a predicate to determine when to
    stop reading input lines.
    """

    dict = {}
    last_key = None

    for l in lines:
        # Strip off a trailing newline, if present
        if len(l) > 0 and l[-1] == "\n":
            l = l[:-1]

        if terminate is not None and terminate(l):
            if len(dict) > 0: yield dict
            dict = {}
            last_key = None
            continue

        m = re.match(r"^([-\w]+):\s*(.*)$", l)
        if m:
            dict[m.group(1)] = m.group(2)
            last_key = m.group(1)
        elif len(l) > 0 and l[0].isspace() and last_key is not None:
            dict[last_key] += l
        else:
            last_key = None

    if len(dict) > 0: yield dict

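# Illustrative input/output sketch (the field names are just examples): each
# stanza of "Key: Value" lines yields one dictionary, stanzas are split where
# the terminate predicate matches (for example a blank-line test), and
# continuation lines starting with whitespace are appended to the previous key.
#
#     lines = ["Format: Cumulus Snapshot v0.11\n",
#              "Producer: cumulus\n"]
#     parse_full(lines)
#         # -> {"Format": "Cumulus Snapshot v0.11", "Producer": "cumulus"}
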
def parse_full(lines):
    try:
        return parse(lines).next()
    except StopIteration:
        return {}

def parse_metadata_version(s):
    """Convert a string with the snapshot version format to a tuple."""

    m = re.match(r"^(?:Cumulus|LBS) Snapshot v(\d+(\.\d+)*)$", s)
    if m is None:
        return ()
    else:
        return tuple([int(d) for d in m.group(1).split(".")])

def read_metadata(object_store, root):
    """Iterate through all lines in the metadata log, following references."""

    # Stack for keeping track of recursion when following references to
    # portions of the log.  The last entry in the stack corresponds to the
    # object currently being parsed.  Each entry is a list of lines which have
    # been reversed, so that popping successive lines from the end of each list
    # will return lines of the metadata log in order.
    stack = []

    def follow_ref(refstr):
        if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
        lines = object_store.get(refstr).splitlines(True)
        lines.reverse()
        stack.append(lines)

    follow_ref(root)

    while len(stack) > 0:
        top = stack[-1]
        if len(top) == 0:
            stack.pop()
            continue
        line = top.pop()

        # An indirect reference which we must follow?
        if len(line) > 0 and line[0] == '@':
            ref = line[1:]
            ref = ref.strip()
            follow_ref(ref)
        else:
            yield line

387 """Metadata for a single file (or directory or...) from a snapshot."""
389 # Functions for parsing various datatypes that can appear in a metadata log
393 """Decode an integer, expressed in decimal, octal, or hexadecimal."""
394 if s.startswith("0x"):
396 elif s.startswith("0"):
403 """Decode a URI-encoded (%xx escapes) string."""
408 """An unecoded string."""
413 """Decode a user/group to a tuple of uid/gid followed by name."""
415 uid = MetadataItem.decode_int(items[0])
418 if items[1].startswith("(") and items[1].endswith(")"):
419 name = MetadataItem.decode_str(items[1][1:-1])
423 def decode_device(s):
424 """Decode a device major/minor number."""
425 (major, minor) = map(MetadataItem.decode_int, s.split("/"))
426 return (major, minor)
    def __init__(self, fields, object_store):
        """Initialize from a dictionary of key/value pairs from metadata log."""

        self.fields = fields
        self.object_store = object_store
        self.keys = []
        self.items = self.Items()
        for (k, v) in fields.items():
            if k in self.field_types:
                decoder = self.field_types[k]
                setattr(self.items, k, decoder(v))
                self.keys.append(k)

444 """Return an iterator for the data blocks that make up a file."""
446 # This traverses the list of blocks that make up a file, following
447 # indirect references. It is implemented in much the same way as
448 # read_metadata, so see that function for details of the technique.
450 objects = self.fields['data'].split()
454 def follow_ref(refstr):
455 if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
456 objects = self.object_store.get(refstr).split()
458 stack.append(objects)
460 while len(stack) > 0:
467 # An indirect reference which we must follow?
468 if len(ref) > 0 and ref[0] == '@':
# Description of fields that might appear, and how they should be parsed.
MetadataItem.field_types = {
    'name': MetadataItem.decode_str,
    'type': MetadataItem.raw_str,
    'mode': MetadataItem.decode_int,
    'device': MetadataItem.decode_device,
    'user': MetadataItem.decode_user,
    'group': MetadataItem.decode_user,
    'ctime': MetadataItem.decode_int,
    'mtime': MetadataItem.decode_int,
    'links': MetadataItem.decode_int,
    'inode': MetadataItem.raw_str,
    'checksum': MetadataItem.decode_str,
    'size': MetadataItem.decode_int,
    'contents': MetadataItem.decode_str,
    'target': MetadataItem.decode_str,
}

def iterate_metadata(object_store, root):
    for d in parse(read_metadata(object_store, root), lambda l: len(l) == 0):
        yield MetadataItem(d, object_store)

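# Illustrative sketch of walking a snapshot's metadata log (names are
# hypothetical; the root reference would come from the snapshot descriptor's
# "Root:" field): each item exposes raw fields and decoded attributes, and
# get_data() yields references to the blocks making up a file's contents.
#
#     for item in iterate_metadata(objects, root_ref):
#         print item.items.name
#         if 'data' in item.fields:
#             contents = "".join(objects.get(ref) for ref in item.get_data())
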
496 """Access to the local database of snapshot contents and object checksums.
498 The local database is consulted when creating a snapshot to determine what
499 data can be re-used from old snapshots. Segment cleaning is performed by
500 manipulating the data in the local database; the local database also
501 includes enough data to guide the segment cleaning process.
504 def __init__(self, path, dbname="localdb.sqlite"):
505 self.db_connection = sqlite3.connect(path + "/" + dbname)
    # Low-level database access.  Use these methods when there isn't a
    # higher-level interface available.  Exception: do, however, remember to
    # use the commit() method after making changes to make sure they are
    # actually saved, even when going through higher-level interfaces.
    def commit(self):
        "Commit any pending changes to the local database."
        self.db_connection.commit()

    def rollback(self):
        "Roll back any pending changes to the local database."
        self.db_connection.rollback()

    def cursor(self):
        "Return a DB-API cursor for directly accessing the local database."
        return self.db_connection.cursor()

    def list_schemes(self):
        """Return the list of distinct snapshot scheme names in the local database."""

        cur = self.cursor()
        cur.execute("select distinct scheme from snapshots")
        schemes = [row[0] for row in cur.fetchall()]
        schemes.sort()
        return schemes

    def list_snapshots(self, scheme):
        """Return a list of snapshots for the given scheme."""

        cur = self.cursor()
        cur.execute("select name from snapshots where scheme = ?", (scheme,))
        snapshots = [row[0] for row in cur.fetchall()]
        snapshots.sort()
        return snapshots

    def delete_snapshot(self, scheme, name):
        """Remove the specified snapshot from the database.

        Warning: This does not garbage-collect all dependent data in the
        database, so it must be followed by a call to garbage_collect() to make
        the database consistent.
        """

        cur = self.cursor()
        cur.execute("delete from snapshots where scheme = ? and name = ?",
                    (scheme, name))

    def prune_old_snapshots(self, scheme, intent=1.0):
        """Delete entries for old snapshots from the database.

        Only snapshots with the specified scheme name will be deleted.  If
        intent is given, it gives the intended next snapshot type, to determine
        how aggressively to clean (for example, intent=7 could be used if the
        next snapshot will be a weekly snapshot).
        """

        cur = self.cursor()

        # Find the id of the last snapshot to be created.  This is used for
        # measuring time in a way: we record this value in each segment we
        # expire on this run, and then on a future run can tell if there have
        # been intervening backups made.
        cur.execute("select max(snapshotid) from snapshots")
        last_snapshotid = cur.fetchone()[0]

        # Get the list of old snapshots for this scheme.  Delete all the old
        # ones.  Rules for what to keep:
        #   - Always keep the most recent snapshot.
        #   - If snapshot X is younger than Y, and X has higher intent, then Y
        #     can be deleted.
        cur.execute("""select snapshotid, name, intent,
                              julianday('now') - timestamp as age
                       from snapshots where scheme = ?
                       order by age""", (scheme,))

        first = True
        max_intent = None
        for (id, name, snap_intent, snap_age) in cur.fetchall():
            can_delete = False
            if snap_intent < max_intent:
                # Delete small-intent snapshots if there is a more recent
                # large-intent snapshot.
                can_delete = True
            elif snap_intent == intent:
                # Delete previous snapshots with the specified intent level.
                can_delete = True

            if can_delete and not first:
                print "Delete snapshot %d (%s)" % (id, name)
                cur.execute("delete from snapshots where snapshotid = ?",
                            (id,))
            first = False
            max_intent = max(max_intent, snap_intent)

        self.garbage_collect()

    def garbage_collect(self):
        """Garbage-collect unreachable segment and object data.

        Remove all segments and checksums which are not reachable from the
        current set of snapshots stored in the local database.
        """

        cur = self.cursor()

        # Delete entries in the segments_used table which are for non-existent
        # snapshots.
        cur.execute("""delete from segments_used
                       where snapshotid not in
                           (select snapshotid from snapshots)""")

        # Find segments which contain no objects used by any current snapshots,
        # and delete them from the segment table.
        cur.execute("""delete from segments where segmentid not in
                           (select segmentid from segments_used)""")

        # Delete dangling objects in the block_index table.
        cur.execute("""delete from block_index
                       where segmentid not in
                           (select segmentid from segments)""")

        # Remove sub-block signatures for deleted objects.
        cur.execute("""delete from subblock_signatures
                       where blockid not in
                           (select blockid from block_index)""")

    # Segment cleaning.
    class SegmentInfo(Struct): pass

    def get_segment_cleaning_list(self, age_boost=0.0):
        """Return a list of all current segments with information for cleaning.

        Return all segments which are currently known in the local database
        (there might be other, older segments in the archive itself), and
        return usage statistics for each to help decide which segments to
        clean.

        The returned list will be sorted by estimated cleaning benefit, with
        segments that are best to clean at the start of the list.

        If specified, the age_boost parameter (measured in days) will be added
        to the age of each segment, as a way of adjusting the benefit
        computation before a long-lived snapshot is taken (for example,
        age_boost might be set to 7 when cleaning prior to taking a weekly
        snapshot).
        """

        cur = self.cursor()
        segments = []
        cur.execute("""select segmentid, used, size, mtime,
                       julianday('now') - mtime as age from segment_info
                       where expire_time is null""")
        for row in cur.fetchall():
            info = self.SegmentInfo()
            info.id = row[0]
            info.used_bytes = row[1]
            info.size_bytes = row[2]
            info.mtime = row[3]
            info.age_days = row[4]

            # If data is not available for whatever reason, treat it as 0.0.
            if info.age_days is None:
                info.age_days = 0.0
            if info.used_bytes is None:
                info.used_bytes = 0.0

            # Benefit calculation: u is the estimated fraction of each segment
            # which is utilized (bytes belonging to objects still in use
            # divided by total size; this doesn't take compression or storage
            # overhead into account, but should give a reasonable estimate).
            #
            # The total benefit is a heuristic that combines several factors:
            # the amount of space that can be reclaimed (1 - u), an ageing
            # factor (info.age_days) that favors cleaning old segments over
            # young ones and also is more likely to clean segments that will be
            # rewritten for long-lived snapshots (age_boost), and finally a
            # penalty factor for the cost of re-uploading data (u + 0.1).
            u = info.used_bytes / info.size_bytes
            info.cleaning_benefit \
                = (1 - u) * (info.age_days + age_boost) / (u + 0.1)

            segments.append(info)

        segments.sort(key=lambda s: s.cleaning_benefit, reverse=True)
        return segments

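    # Worked example of the benefit heuristic (numbers are hypothetical): a
    # segment that is 25% utilized (u = 0.25) and 30 days old, with no
    # age_boost, scores (1 - 0.25) * 30 / (0.25 + 0.1) ~= 64.3, so it sorts
    # well ahead of a 90%-utilized segment of the same age, which scores
    # (1 - 0.9) * 30 / (0.9 + 0.1) = 3.0.
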
    def mark_segment_expired(self, segment):
        """Mark a segment for cleaning in the local database.

        The segment parameter should be either a SegmentInfo object or an
        integer segment id.  Objects in the given segment will be marked as
        expired, which means that any future snapshots that would re-use those
        objects will instead write out a new copy of the object, and thus no
        future snapshots will depend upon the given segment.
        """

        if isinstance(segment, int):
            id = segment
        elif isinstance(segment, self.SegmentInfo):
            id = segment.id
        else:
            raise TypeError("Invalid segment: %s, must be of type int or SegmentInfo, not %s" % (segment, type(segment)))

        cur = self.cursor()
        cur.execute("select max(snapshotid) from snapshots")
        last_snapshotid = cur.fetchone()[0]
        cur.execute("update segments set expire_time = ? where segmentid = ?",
                    (last_snapshotid, id))
        cur.execute("update block_index set expired = 0 where segmentid = ?",
                    (id,))

    def balance_expired_objects(self):
        """Analyze expired objects in segments to be cleaned and group by age.

        Update the block_index table of the local database to group expired
        objects by age.  The exact number of buckets and the cutoffs for each
        are dynamically determined.  Calling this function after marking
        segments expired will help in the segment cleaning process, by ensuring
        that when active objects from cleaned segments are rewritten, they will
        be placed into new segments roughly grouped by age.
        """

        # The expired column of the block_index table is used when generating a
        # new Cumulus snapshot.  A null value indicates that an object may be
        # re-used.  Otherwise, an object must be written into a new segment if
        # needed.  Objects with distinct expired values will be written into
        # distinct segments, to allow for some grouping by age.  The value 0 is
        # somewhat special in that it indicates any rewritten objects can be
        # placed in the same segment as completely new objects; this can be
        # used for very young objects which have been expired, or objects not
        # expected to be encountered.
        #
        # In the balancing process, all objects which are not used in any
        # current snapshots will have expired set to 0.  Objects which have
        # been seen will be sorted by age and will have expired values set to
        # 0, 1, 2, and so on based on age (with younger objects being assigned
        # lower values).  The number of buckets and the age cutoffs are
        # determined by looking at the distribution of block ages.

        cur = self.cursor()

        # Mark all expired objects with expired = 0; these objects will later
        # have values set to indicate groupings of objects when repacking.
        cur.execute("""update block_index set expired = 0
                       where expired is not null""")

        # We will want to aim for at least one full segment for each bucket
        # that we eventually create, but don't know how many bytes that should
        # be due to compression.  So compute the average number of bytes in
        # each expired segment as a rough estimate for the minimum size of each
        # bucket.  (This estimate could be thrown off by many not-fully-packed
        # segments, but for now don't worry too much about that.)  If we can't
        # compute an average, it's probably because there are no expired
        # segments, so we have no more work to do.
        cur.execute("""select avg(size) from segments
                       where segmentid in
                           (select distinct segmentid from block_index
                            where expired is not null)""")
        segment_size_estimate = cur.fetchone()[0]
        if not segment_size_estimate:
            return

        # Next, extract distribution of expired objects (number and size) by
        # age.  Save the timestamp for "now" so that the classification of
        # blocks into age buckets will not change later in the function, after
        # time has passed.  Set any timestamps in the future to now, so we are
        # guaranteed that for the rest of this function, age is always
        # non-negative.
        cur.execute("select julianday('now')")
        now = cur.fetchone()[0]
        cur.execute("""update block_index set timestamp = ?
                       where timestamp > ? and expired is not null""",
                    (now, now))

        cur.execute("""select round(? - timestamp) as age, count(*), sum(size)
                       from block_index where expired = 0
                       group by age order by age""", (now,))
        distribution = cur.fetchall()

        # Start to determine the buckets for expired objects.  Heuristics used:
        #   - An upper bound on the number of buckets is given by the number of
        #     segments we estimate it will take to store all data.  In fact,
        #     aim for a couple of segments per bucket.
        #   - Place very young objects in bucket 0 (place with new objects)
        #     unless there are enough of them to warrant a separate bucket.
        #   - Try not to create unnecessarily many buckets, since fewer buckets
        #     will allow repacked data to be grouped based on spatial locality
        #     (while more buckets will group by temporal locality).  We want a
        #     balance.
        MIN_AGE = 4
        total_bytes = sum([i[2] for i in distribution])
        target_buckets = 2 * (total_bytes / segment_size_estimate) ** 0.4
        min_size = 1.5 * segment_size_estimate
        target_size = max(2 * segment_size_estimate,
                          total_bytes / target_buckets)

        print "segment_size:", segment_size_estimate
        print "distribution:", distribution
        print "total_bytes:", total_bytes
        print "target_buckets:", target_buckets
        print "min, target size:", min_size, target_size

        # Chosen cutoffs.  Each bucket consists of objects with age greater
        # than one cutoff value, but not greater than the next largest cutoff.
        cutoffs = []

        # Starting with the oldest objects, begin grouping together into
        # buckets of size at least target_size bytes.
        distribution.reverse()
        bucket_size = 0
        min_age_bucket = False
        for (age, items, size) in distribution:
            if bucket_size >= target_size \
                or (age < MIN_AGE and not min_age_bucket):
                if bucket_size < target_size and len(cutoffs) > 0:
                    cutoffs.pop()
                cutoffs.append(age)
                bucket_size = 0

            bucket_size += size
            if age < MIN_AGE:
                min_age_bucket = True

        # The last (youngest) bucket will be group 0, unless it has enough data
        # to be of size min_size by itself, or there happen to be no objects
        # less than MIN_AGE at all.
        if bucket_size >= min_size or not min_age_bucket:
            cutoffs.append(-1)
        cutoffs.append(-1)

        print "cutoffs:", cutoffs

        # Update the database to assign each object to the appropriate bucket.
        cutoffs.reverse()
        for i in range(len(cutoffs)):
            cur.execute("""update block_index set expired = ?
                           where round(? - timestamp) > ?
                             and expired is not null""",
                        (i, now, cutoffs[i]))