1 # Cumulus: Efficient Filesystem Backup to the Cloud
2 # Copyright (C) 2008-2009, 2012 The Cumulus Developers
3 # See the AUTHORS file for a list of contributors.
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License along
16 # with this program; if not, write to the Free Software Foundation, Inc.,
17 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19 """High-level interface for working with Cumulus archives.
21 This module provides an easy interface for reading from and manipulating
22 various parts of a Cumulus archive:
23 - listing the snapshots and segments present
24 - reading segment contents
25 - parsing snapshot descriptors and snapshot metadata logs
26 - reading and maintaining the local object database
29 from __future__ import division, print_function, unicode_literals
45 import thread as _thread
48 import cumulus.store.file
51 StringTypes = (str, unicode)
55 # The largest supported snapshot format that can be understood.
56 FORMAT_VERSION = (0, 11) # Cumulus Snapshot v0.11
58 # Maximum number of nested indirect references allowed in a snapshot.
59 MAX_RECURSION_DEPTH = 3
61 # All segments which have been accessed this session.
62 accessed_segments = set()
64 # Table of methods used to filter segments before storage, and corresponding
65 # filename extensions. These are listed in priority order (methods earlier in
66 # the list are tried first).
68 (".gpg", "cumulus-filter-gpg --decrypt"),
70 (".bz2", "bzip2 -dc"),
75 """Decode binary data from a file into a sequence of lines.
77 Newline markers are retained."""
78 return list(codecs.iterdecode(data.splitlines(True), "utf-8"))
81 """Decode a URI-encoded (%xx escapes) string."""
82 def hex_decode(m): return chr(int(m.group(1), 16))
83 return re.sub(r"%([0-9a-f]{2})", hex_decode, s)
85 """Encode a string to URI-encoded (%xx escapes) form."""
87 if c > '+' and c < '\x7f' and c != '@':
90 return "%%%02x" % (ord(c),)
91 return ''.join(hex_encode(c) for c in s)
94 """A class which merely acts as a data container.
96 Instances of this class (or its subclasses) are merely used to store data
97 in various attributes. No methods are provided.
101 return "<%s %s>" % (self.__class__, self.__dict__)
103 CHECKSUM_ALGORITHMS = {
104 'sha1': hashlib.sha1,
105 'sha224': hashlib.sha224,
106 'sha256': hashlib.sha256,
class ChecksumCreator:
    """Compute a Cumulus checksum for provided data.

    The algorithm used is selectable, but currently defaults to sha1.
    """

    def __init__(self, algorithm='sha1'):
        """Create a new checksum context.

        Raises KeyError if algorithm is not listed in CHECKSUM_ALGORITHMS.
        """
        self.algorithm = algorithm
        self.hash = CHECKSUM_ALGORITHMS[algorithm]()

    def update(self, data):
        """Feed more bytes into the running hash computation."""
        self.hash.update(data)

    def checksum(self):
        """Return the checksum string, formatted "<algorithm>=<hexdigest>"."""
        # Restored: the "def checksum(self):" header was lost in this copy,
        # leaving the return statement below dangling after update().
        return "%s=%s" % (self.algorithm, self.hash.hexdigest())
class ChecksumVerifier:
    """Verify whether a checksum from a snapshot matches the supplied data."""

    def __init__(self, checksumstr):
        """Create an object to check the supplied checksum.

        checksumstr has the form "<algorithm>=<hexdigest>"; raises KeyError
        if the algorithm is not listed in CHECKSUM_ALGORITHMS.
        """
        (algo, checksum) = checksumstr.split("=", 1)
        self.checksum = checksum
        self.hash = CHECKSUM_ALGORITHMS[algo]()

    def update(self, data):
        """Feed more bytes into the running hash computation."""
        self.hash.update(data)

    def valid(self):
        """Return a boolean indicating whether the checksum matches."""
        # Restored: the "def valid(self):" header was lost in this copy.
        result = self.hash.hexdigest()
        return result == self.checksum
class SearchPathEntry(object):
    """Item representing a possible search location for Cumulus files.

    Some Cumulus files might be stored in multiple possible file locations: due
    to format (different compression mechanisms with different extensions),
    locality (different segments might be placed in different directories to
    control archiving policies), for backwards compatibility (default location
    changed over time).  A SearchPathEntry describes a possible location for a
    file.
    """

    def __init__(self, directory_prefix, suffix, context=None):
        self._directory_prefix = directory_prefix
        self._suffix = suffix
        self._context = context

    def __repr__(self):
        # Restored: the "def __repr__(self):" header and the final
        # continuation line of this expression were lost in this copy.
        return "%s(%r, %r, %r)" % (self.__class__.__name__,
                                   self._directory_prefix, self._suffix,
                                   self._context)

    def build_path(self, basename):
        """Construct the search path to use for a file with name basename.

        Returns a tuple (pathname, context), where pathname is the path to try
        and context is any additional data associated with this search entry
        (if any).
        """
        return (posixpath.join(self._directory_prefix, basename + self._suffix),
                self._context)
class SearchPath(object):
    """A collection of locations to search for files and lookup utilities.

    For looking for a file in a Cumulus storage backend, a SearchPath object
    contains a list of possible locations to try.  A SearchPath can be used to
    perform the search as well; when a file is found the search path ordering
    is updated (moving the successful SearchPathEntry to the front of the list
    for future searches).
    """

    def __init__(self, name_regex, searchpath):
        # name_regex recognizes filenames of this file type; searchpath is an
        # iterable of SearchPathEntry objects (copied, so later mutation of
        # the caller's sequence does not affect this object).
        self._regex = re.compile(name_regex)
        self._path = list(searchpath)

    def add_search_entry(self, entry):
        # Append a new SearchPathEntry as a lowest-priority location.
        self._path.append(entry)

    def directories(self):
        """Return the set of directories to search for a file type."""
        return set(entry._directory_prefix for entry in self._path)

    def get(self, backend, basename):
        # Try each entry in priority order and return the first hit as a
        # (file object, pathname, context) tuple.
        for (i, entry) in enumerate(self._path):
            # NOTE(review): a "try:" line appears to be missing here in this
            # copy -- the "except" below has no matching "try".
            (pathname, context) = entry.build_path(basename)
            fp = backend.get(pathname)
            # On success, move this entry to the front of the search path
            # to speed future searches.
            # NOTE(review): lines removing the entry from its old position
            # appear to be missing; as written the entry would be duplicated
            # at the front of the list.  TODO confirm against upstream.
            self._path.insert(0, entry)
            return (fp, pathname, context)
            except cumulus.store.NotFoundError:
        # No entry matched: report the file as not found.
        raise cumulus.store.NotFoundError(basename)

    def stat(self, backend, basename):
        # Like get(), but fetch only metadata; result is the backend's stat
        # dictionary augmented with the resolved "path".
        for (i, entry) in enumerate(self._path):
            # NOTE(review): a "try:" line appears to be missing here.
            (pathname, context) = entry.build_path(basename)
            stat_data = backend.stat(pathname)
            # On success, move this entry to the front of the search path
            # to speed future searches.
            self._path.insert(0, entry)
            result = {"path": pathname}
            result.update(stat_data)
            # NOTE(review): a "return result" line appears to be missing here.
            except cumulus.store.NotFoundError:
        raise cumulus.store.NotFoundError(basename)

    def match(self, filename):
        # Test whether filename belongs to this file type; returns the regex
        # match object (with capture groups) or None.
        return self._regex.match(filename)

    def list(self, backend):
        # Enumerate all files of this type: scan every search directory and
        # yield (full path, regex match) for each matching name.
        for d in self.directories():
            # NOTE(review): a "try:" line appears to be missing here.
            for f in backend.list(d):
                # NOTE(review): the assignment of "m" (presumably
                # "m = self.match(f)") appears to be missing here.
                if m: yield (posixpath.join(d, f), m)
            except cumulus.store.NotFoundError:
        # NOTE(review): guard lines appear to be missing; upstream likely
        # raises only when no directory could be listed at all.
        raise cumulus.store.NotFoundError(backend)
def _build_segments_searchpath(prefix):
    """Yield one SearchPathEntry under prefix for each segment filter.

    Entries come out in the same priority order as SEGMENT_FILTERS, with the
    filter command attached as the entry's context.
    """
    for (extension, filter_command) in SEGMENT_FILTERS:
        yield SearchPathEntry(prefix, extension, filter_command)
248 "checksums": SearchPath(
249 r"^snapshot-(.*)\.(\w+)sums$",
250 [SearchPathEntry("meta", ".sha1sums"),
251 SearchPathEntry("checksums", ".sha1sums"),
252 SearchPathEntry("", ".sha1sums")]),
254 r"^snapshot-(.*)\.meta(\.\S+)?$",
255 _build_segments_searchpath("meta")),
256 "segments": SearchPath(
257 (r"^([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})"
260 _build_segments_searchpath("segments0"),
261 _build_segments_searchpath("segments1"),
262 _build_segments_searchpath(""),
263 _build_segments_searchpath("segments"))),
264 "snapshots": SearchPath(
265 r"^snapshot-(.*)\.(cumulus|lbs)$",
266 [SearchPathEntry("snapshots", ".cumulus"),
267 SearchPathEntry("snapshots", ".lbs"),
268 SearchPathEntry("", ".cumulus"),
269 SearchPathEntry("", ".lbs")]),
class BackendWrapper(object):
    """Wrapper around a Cumulus storage backend that understands file types.

    The BackendWrapper class understands different Cumulus file types, such as
    snapshots and segments, and implements higher-level operations such as
    "retrieve a snapshot with a specific name" (hiding operations such as
    searching for the correct file name).
    """

    def __init__(self, backend):
        """Initializes a wrapper around the specified storage backend.

        store may either be a Store object or URL.
        """
        if type(backend) in StringTypes:
            self._backend = cumulus.store.open(backend)
            # NOTE(review): an "else:" line appears to be missing here in this
            # copy; as written, the assignment below unconditionally
            # overwrites the store opened from the URL above.
            self._backend = backend

    def raw_backend(self):
        # NOTE(review): the body of this accessor (presumably
        # "return self._backend", possibly with an @property decorator)
        # appears to be missing in this copy.

    def stat_generic(self, basename, filetype):
        # Look up metadata (size/mtime/path) for the named file of this type.
        return SEARCH_PATHS[filetype].stat(self._backend, basename)

    def open_generic(self, basename, filetype):
        # Open the named file; returns a (file object, pathname, context)
        # tuple from the file type's search path.
        return SEARCH_PATHS[filetype].get(self._backend, basename)

    def open_snapshot(self, name):
        return self.open_generic("snapshot-" + name, "snapshots")

    def open_segment(self, name):
        return self.open_generic(name + ".tar", "segments")

    def list_generic(self, filetype):
        # Yield (name, path) for every file of the given type; the name is
        # the first capture group of the type's filename regex.
        return ((x[1].group(1), x[0])
                for x in SEARCH_PATHS[filetype].list(self._backend))

    def prefetch_generic(self):
        """Calls scan on directories to prefetch file metadata."""
        # NOTE(review): the initialization of "directories" (a set) appears
        # to be missing in this copy.
        for typeinfo in SEARCH_PATHS.values():
            directories.update(typeinfo.directories())
        for d in directories:
            # NOTE(review): a line appears to be missing before scan() here
            # (possibly a "try:" guard) -- TODO confirm against upstream.
            self._backend.scan(d)
    def __init__(self, backend):
        # Accept either a ready-made BackendWrapper or a raw backend/URL to
        # be wrapped.
        if isinstance(backend, BackendWrapper):
            self.backend = backend
            # NOTE(review): an "else:" line appears to be missing here in
            # this copy, and additional initialization (cache directory, LRU
            # list) also seems to have been truncated.
            self.backend = BackendWrapper(backend)
    def get_cachedir(self):
        # Lazily create a temporary directory used to cache extracted
        # segment contents.
        if self.cachedir is None:
            self.cachedir = tempfile.mkdtemp("-cumulus")
        # NOTE(review): the final "return self.cachedir" appears to be
        # missing in this copy.
336 if self.cachedir is not None:
337 # TODO: Avoid use of system, make this safer
338 os.system("rm -rf " + self.cachedir)
    def parse_ref(refstr):
        """Parse an object reference string into its components.

        Returns a tuple (segment, object, checksum, slice); for a "zero"
        reference the segment is the literal string "zero".
        NOTE(review): likely a @staticmethod upstream -- the decorator is not
        present in this copy.
        """
        m = re.match(r"^zero\[(\d+)\]$", refstr)
        # NOTE(review): the guard for this return (presumably "if m:")
        # appears to be missing in this copy.
            return ("zero", None, None, (0, int(m.group(1)), False))

        m = re.match(r"^([-0-9a-f]+)\/([0-9a-f]+)(\(\S+\))?(\[(=?(\d+)|(\d+)\+(\d+))\])?$", refstr)
        # NOTE(review): extraction of "segment", "object" and "slice" from
        # the match groups appears to be missing in this copy.
        checksum = m.group(3)

        if checksum is not None:
            # Strip the parentheses surrounding "(algo=hexdigest)".
            checksum = checksum.lstrip("(").rstrip(")")

        if slice is not None:
            if m.group(6) is not None:
                # Size-assertion slice
                slice = (0, int(m.group(6)), True)
                # NOTE(review): an "else:" line appears to be missing before
                # the start+length slice form below.
                slice = (int(m.group(7)), int(m.group(8)), False)

        return (segment, object, checksum, slice)
367 def list_snapshots(self):
368 return set(x[0] for x in self.backend.list_generic("snapshots"))
370 def list_segments(self):
371 return set(x[0] for x in self.backend.list_generic("segments"))
373 def load_snapshot(self, snapshot):
374 snapshot_file = self.backend.open_snapshot(snapshot)[0]
375 return to_lines(snapshot_file.read())
    def filter_data(filehandle, filter_cmd):
        # Pipe the contents of filehandle through the given shell filter
        # command (decompression/decryption) and return a readable file
        # object.  NOTE(review): likely a @staticmethod upstream.
        if filter_cmd is None:
            # NOTE(review): the body of this branch (presumably
            # "return filehandle") appears to be missing in this copy.
        p = subprocess.Popen(filter_cmd, shell=True, stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE, close_fds=True)
        input, output = p.stdin, p.stdout
        def copy_thread(src, dst):
            # Feed src into the filter's stdin in fixed-size blocks.
            # NOTE(review): the BLOCK_SIZE definition and the loop header
            # ("while True:") appear to be missing in this copy.
                block = src.read(BLOCK_SIZE)
                if len(block) == 0: break
                # NOTE(review): the write and close calls appear missing.
        _thread.start_new_thread(copy_thread, (filehandle, input))
        # NOTE(review): the final "return output" appears to be missing.
396 def get_segment(self, segment):
397 accessed_segments.add(segment)
399 (segment_fp, path, filter_cmd) = self.backend.open_segment(segment)
400 return self.filter_data(segment_fp, filter_cmd)
    def load_segment(self, segment):
        # Stream the segment as a tar file, yielding (object name, data) for
        # each member stored under the segment's own directory.
        seg = tarfile.open(segment, 'r|', self.get_segment(segment))
        # NOTE(review): the loop header ("for item in seg:") appears to be
        # missing in this copy.
            data_obj = seg.extractfile(item)
            path = item.name.split('/')
            if len(path) == 2 and path[0] == segment:
                yield (path[1], data_obj.read())
    def extract_segment(self, segment):
        # Unpack every object in the segment into the local cache directory.
        segdir = os.path.join(self.get_cachedir(), segment)
        # NOTE(review): creation of segdir (os.mkdir/os.makedirs) appears to
        # be missing in this copy.
        for (object, data) in self.load_segment(segment):
            f = open(os.path.join(segdir, object), 'wb')
            # NOTE(review): the write and close of f appear to be missing.
    def load_object(self, segment, object):
        # Return the raw bytes of one object, extracting its containing
        # segment into the local cache first if needed; a simple LRU list
        # bounds the number of cached segments.
        accessed_segments.add(segment)
        path = os.path.join(self.get_cachedir(), segment, object)
        if not os.access(path, os.R_OK):
            self.extract_segment(segment)
        if segment in self._lru_list: self._lru_list.remove(segment)
        self._lru_list.append(segment)
        while len(self._lru_list) > self.CACHE_SIZE:
            # Evict the least-recently-used cached segment.
            # TODO: Avoid os.system with string concatenation -- unsafe if a
            # cache path ever contains shell metacharacters.
            os.system("rm -rf " + os.path.join(self.cachedir,
            # NOTE(review): the continuation closing this call (presumably
            # "self._lru_list[0]))") appears to be missing in this copy.
            self._lru_list = self._lru_list[1:]
        # Note: the file object is never explicitly closed; it is released
        # only when garbage-collected.
        return open(path, 'rb').read()
    def get(self, refstr):
        """Fetch the given object and return it.

        The input should be an object reference, in string form.
        """
        (segment, object, checksum, slice) = self.parse_ref(refstr)

        if segment == "zero":
            # A run of zero bytes: synthesized, no storage access needed.
            return "\0" * slice[1]

        data = self.load_object(segment, object)

        if checksum is not None:
            verifier = ChecksumVerifier(checksum)
            verifier.update(data)
            if not verifier.valid():
                # NOTE(review): the raise statement for a checksum mismatch
                # appears to be missing in this copy.

        if slice is not None:
            (start, length, exact) = slice
            # Note: The following assertion check may need to be commented out
            # to restore from pre-v0.8 snapshots, as the syntax for
            # size-assertion slices has changed.
            if exact and len(data) != length: raise ValueError
            data = data[start:start+length]
            if len(data) != length: raise IndexError
        # NOTE(review): the final "return data" appears to be missing in this
        # copy.
462 self.backend.prefetch_generic()
def parse(lines, terminate=None):
    """Generic parser for RFC822-style "Key: Value" data streams.

    This parser can be used to read metadata logs and snapshot root descriptor
    files.

    lines must be an iterable object which yields a sequence of lines of input.

    If terminate is specified, it is used as a predicate to determine when to
    stop reading input lines.
    """
    # NOTE(review): initialization of the accumulator state (e.g.
    # "result = {}", "last_key = None") and the "for l in lines:" loop header
    # appear to be missing in this copy; the indented lines below are the
    # surviving loop body.

    def make_result(result):
        # Join each key's accumulated continuation lines into one string.
        return dict((k, "".join(v)) for (k, v) in result.items())

        # Strip off a trailing newline, if present
        if len(l) > 0 and l[-1] == "\n":
            # NOTE(review): the truncation ("l = l[:-1]") appears missing.

        if terminate is not None and terminate(l):
            # Terminator seen: emit the stanza gathered so far.
            if len(result) > 0: yield make_result(result)
            # NOTE(review): the state reset and "continue" appear missing.

        m = re.match(r"^([-\w]+):\s*(.*)$", l)
        # NOTE(review): the "if m:" guard appears to be missing here.
            result[m.group(1)] = [m.group(2)]
            last_key = m.group(1)
        elif len(l) > 0 and l[0].isspace() and last_key is not None:
            # Continuation line: append to the most recently seen key.
            result[last_key].append(l)

    # Emit any final stanza not followed by a terminator.
    if len(result) > 0: yield make_result(result)
def parse_full(lines):
    """Return the first stanza parsed from lines, or {} if there is none.

    Convenience wrapper around parse() for files (such as snapshot root
    descriptors) that contain a single Key: Value stanza.
    """
    # Restored: the "try:" and fallback "return {}" lines were lost in this
    # copy, leaving the except clause without a matching try.
    try:
        return next(parse(lines))
    except StopIteration:
        return {}
def parse_metadata_version(s):
    """Convert a string with the snapshot version format to a tuple.

    For example, "Cumulus Snapshot v0.11" yields (0, 11).  Returns the empty
    tuple for a string that does not match the expected format.
    """
    m = re.match(r"^(?:Cumulus|LBS) Snapshot v(\d+(\.\d+)*)$", s)
    # Restored: the "if m is None" guard was lost in this copy; without it the
    # final line would raise AttributeError on unrecognized input.
    if m is None:
        return ()
    else:
        return tuple([int(d) for d in m.group(1).split(".")])
def read_metadata(object_store, root):
    """Iterate through all lines in the metadata log, following references."""

    # Stack for keeping track of recursion when following references to
    # portions of the log.  The last entry in the stack corresponds to the
    # object currently being parsed.  Each entry is a list of lines which have
    # been reversed, so that popping successive lines from the end of each list
    # will return lines of the metadata log in order.
    # NOTE(review): the "stack = []" initialization appears to be missing in
    # this copy.

    def follow_ref(refstr):
        # Guard against runaway indirection in a malformed snapshot.
        if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
        lines = to_lines(object_store.get(refstr))
        # NOTE(review): reversing the lines and pushing them onto the stack
        # appear to be missing here, as does the initial "follow_ref(root)"
        # call before the loop below.

    while len(stack) > 0:
        # NOTE(review): popping the next line ("line") from the top of the
        # stack appears to be missing in this copy.

        # An indirect reference which we must follow?
        if len(line) > 0 and line[0] == '@':
            # NOTE(review): the body (following the reference) and the else
            # branch (yielding the line) appear to be missing in this copy.
553 """Metadata for a single file (or directory or...) from a snapshot."""
555 # Functions for parsing various datatypes that can appear in a metadata log
559 """Decode an integer, expressed in decimal, octal, or hexadecimal."""
560 if s.startswith("0x"):
562 elif s.startswith("0"):
569 """Decode a URI-encoded (%xx escapes) string."""
574 """An unecoded string."""
579 """Decode a user/group to a tuple of uid/gid followed by name."""
581 uid = MetadataItem.decode_int(items[0])
584 if items[1].startswith("(") and items[1].endswith(")"):
585 name = MetadataItem.decode_str(items[1][1:-1])
589 def decode_device(s):
590 """Decode a device major/minor number."""
591 (major, minor) = map(MetadataItem.decode_int, s.split("/"))
592 return (major, minor)
    def __init__(self, fields, object_store):
        """Initialize from a dictionary of key/value pairs from metadata log."""
        # NOTE(review): storage of the raw dictionary (presumably
        # "self.fields = fields") appears to be missing in this copy; data()
        # reads self.fields['data'].
        self.object_store = object_store
        self.items = self.Items()
        # Decode each recognized field with its registered decoder from
        # field_types; unrecognized keys are ignored.
        for (k, v) in fields.items():
            if k in self.field_types:
                decoder = self.field_types[k]
                setattr(self.items, k, decoder(v))
610 """Return an iterator for the data blocks that make up a file."""
612 # This traverses the list of blocks that make up a file, following
613 # indirect references. It is implemented in much the same way as
614 # read_metadata, so see that function for details of the technique.
616 objects = self.fields['data'].split()
620 def follow_ref(refstr):
621 if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
622 objects = self.object_store.get(refstr).split()
624 stack.append(objects)
626 while len(stack) > 0:
633 # An indirect reference which we must follow?
634 if len(ref) > 0 and ref[0] == '@':
639 # Description of fields that might appear, and how they should be parsed.
640 MetadataItem.field_types = {
641 'name': MetadataItem.decode_str,
642 'type': MetadataItem.raw_str,
643 'mode': MetadataItem.decode_int,
644 'device': MetadataItem.decode_device,
645 'user': MetadataItem.decode_user,
646 'group': MetadataItem.decode_user,
647 'ctime': MetadataItem.decode_int,
648 'mtime': MetadataItem.decode_int,
649 'links': MetadataItem.decode_int,
650 'inode': MetadataItem.raw_str,
651 'checksum': MetadataItem.decode_str,
652 'size': MetadataItem.decode_int,
653 'contents': MetadataItem.decode_str,
654 'target': MetadataItem.decode_str,
def iterate_metadata(object_store, root):
    """Yield a MetadataItem for each stanza in the metadata log at root.

    Stanzas in the log are separated by blank lines, which is what the
    terminate predicate passed to parse() detects.
    """
    stanzas = parse(read_metadata(object_store, root), lambda l: len(l) == 0)
    for stanza in stanzas:
        yield MetadataItem(stanza, object_store)
662 """Access to the local database of snapshot contents and object checksums.
664 The local database is consulted when creating a snapshot to determine what
665 data can be re-used from old snapshots. Segment cleaning is performed by
666 manipulating the data in the local database; the local database also
667 includes enough data to guide the segment cleaning process.
670 def __init__(self, path, dbname="localdb.sqlite"):
671 self.db_connection = sqlite3.connect(path + "/" + dbname)
673 # Low-level database access. Use these methods when there isn't a
674 # higher-level interface available. Exception: do, however, remember to
675 # use the commit() method after making changes to make sure they are
676 # actually saved, even when going through higher-level interfaces.
678 "Commit any pending changes to the local database."
679 self.db_connection.commit()
682 "Roll back any pending changes to the local database."
683 self.db_connection.rollback()
686 "Return a DB-API cursor for directly accessing the local database."
687 return self.db_connection.cursor()
    def list_schemes(self):
        """Return the list of snapshots found in the local database.

        The returned value is a list of tuples (id, scheme, name, time, intent).
        """
        # NOTE(review): the docstring above does not match the body, which
        # returns only the distinct scheme names -- confirm and fix upstream.
        # Also, cursor creation ("cur = self.cursor()") and the trailing
        # sort/return appear to be missing in this copy.
        cur.execute("select distinct scheme from snapshots")
        schemes = [row[0] for row in cur.fetchall()]
701 def list_snapshots(self, scheme):
702 """Return a list of snapshots for the given scheme."""
704 cur.execute("select name from snapshots")
705 snapshots = [row[0] for row in cur.fetchall()]
    def delete_snapshot(self, scheme, name):
        """Remove the specified snapshot from the database.

        Warning: This does not garbage collect all dependent data in the
        database, so it must be followed by a call to garbage_collect() to make
        the database consistent.
        """
        # NOTE(review): cursor creation ("cur = self.cursor()") appears to be
        # missing in this copy.
        cur.execute("delete from snapshots where scheme = ? and name = ?",
        # NOTE(review): the parameter tuple line ("(scheme, name))") appears
        # to be missing, leaving this call unclosed.
    def prune_old_snapshots(self, scheme, intent=1.0):
        """Delete entries from old snapshots from the database.

        Only snapshots with the specified scheme name will be deleted.  If
        intent is given, it gives the intended next snapshot type, to determine
        how aggressively to clean (for example, intent=7 could be used if the
        next snapshot will be a weekly snapshot).
        """
        # NOTE(review): cursor creation appears to be missing in this copy.

        # Find the id of the last snapshot to be created.  This is used for
        # measuring time in a way: we record this value in each segment we
        # expire on this run, and then on a future run can tell if there have
        # been intervening backups made.
        cur.execute("select max(snapshotid) from snapshots")
        last_snapshotid = cur.fetchone()[0]

        # Get the list of old snapshots for this scheme.  Delete all the old
        # ones.  Rules for what to keep:
        #   - Always keep the most recent snapshot.
        #   - If snapshot X is younger than Y, and X has higher intent, then Y
        #     can be deleted.
        cur.execute("""select snapshotid, name, intent,
                              julianday('now') - timestamp as age
                       from snapshots where scheme = ?
                       order by age""", (scheme,))

        # NOTE(review): initialization of the loop state (at least "first"
        # and "max_intent") appears to be missing in this copy.
        for (id, name, snap_intent, snap_age) in cur.fetchall():
            # NOTE(review): a "can_delete = False" reset appears missing.
            if snap_intent < max_intent:
                # Delete small-intent snapshots if there is a more recent
                # large-intent snapshot.
                # NOTE(review): the body ("can_delete = True") appears
                # to be missing in this copy.
            elif snap_intent == intent:
                # Delete previous snapshots with the specified intent level.
                # NOTE(review): the body ("can_delete = True") appears
                # to be missing in this copy.

            if can_delete and not first:
                print("Delete snapshot %d (%s)" % (id, name))
                cur.execute("delete from snapshots where snapshotid = ?",
                # NOTE(review): the parameter tuple line ("(id,))") and the
                # update of "first" appear to be missing in this copy.
            max_intent = max(max_intent, snap_intent)

        # Clean up data left dangling by the deleted snapshot rows.
        self.garbage_collect()
    def garbage_collect(self):
        """Garbage-collect unreachable segment and object data.

        Remove all segments and checksums which is not reachable from the
        current set of snapshots stored in the local database.
        """
        # NOTE(review): cursor creation appears to be missing in this copy.

        # Delete entries in the segment_utilization table which are for
        # non-existent snapshots.
        cur.execute("""delete from segment_utilization
                       where snapshotid not in
                           (select snapshotid from snapshots)""")

        # Delete segments not referenced by any current snapshots.
        cur.execute("""delete from segments where segmentid not in
                           (select segmentid from segment_utilization)""")

        # Delete dangling objects in the block_index table.
        cur.execute("""delete from block_index
                       where segmentid not in
                           (select segmentid from segments)""")

        # Remove sub-block signatures for deleted objects.
        # NOTE(review): a line of the SQL below ("where blockid not in")
        # appears to be missing in this copy, making the statement invalid.
        cur.execute("""delete from subblock_signatures
                           (select blockid from block_index)""")
798 class SegmentInfo(Struct): pass
    def get_segment_cleaning_list(self, age_boost=0.0):
        """Return a list of all current segments with information for cleaning.

        Return all segments which are currently known in the local database
        (there might be other, older segments in the archive itself), and
        return usage statistics for each to help decide which segments to
        clean.

        The returned list will be sorted by estimated cleaning benefit, with
        segments that are best to clean at the start of the list.

        If specified, the age_boost parameter (measured in days) will added to
        the age of each segment, as a way of adjusting the benefit computation
        before a long-lived snapshot is taken (for example, age_boost might be
        set to 7 when cleaning prior to taking a weekly snapshot).
        """
        # NOTE(review): initialization ("cur = self.cursor()" and
        # "segments = []") appears to be missing in this copy.
        cur.execute("""select segmentid, used, size, mtime,
                       julianday('now') - mtime as age from segment_info
                       where expire_time is null""")
        # NOTE(review): the "for row in cur.fetchall():" loop header and the
        # assignments of info.id / info.mtime appear to be missing here.
        info = self.SegmentInfo()
        info.used_bytes = row[1]
        info.size_bytes = row[2]
        info.age_days = row[4]

        # If data is not available for whatever reason, treat it as 0.0.
        if info.age_days is None:
            # NOTE(review): the body ("info.age_days = 0.0") appears missing.
        if info.used_bytes is None:
            info.used_bytes = 0.0

        # Benefit calculation: u is the estimated fraction of each segment
        # which is utilized (bytes belonging to objects still in use
        # divided by total size; this doesn't take compression or storage
        # overhead into account, but should give a reasonable estimate).
        #
        # The total benefit is a heuristic that combines several factors:
        # the amount of space that can be reclaimed (1 - u), an ageing
        # factor (info.age_days) that favors cleaning old segments to young
        # ones and also is more likely to clean segments that will be
        # rewritten for long-lived snapshots (age_boost), and finally a
        # penalty factor for the cost of re-uploading data (u + 0.1).
        u = info.used_bytes / info.size_bytes
        info.cleaning_benefit \
            = (1 - u) * (info.age_days + age_boost) / (u + 0.1)

        segments.append(info)

        # NOTE(review): passing "cmp" positionally is a Python 2 idiom; under
        # Python 3 (which this file targets via its __future__ imports)
        # list.sort() accepts no positional arguments and the cmp builtin is
        # gone, so this line would raise an error.  Upstream likely reads
        # "segments.sort(key=lambda s: s.cleaning_benefit, reverse=True)".
        segments.sort(cmp, key=lambda s: s.cleaning_benefit, reverse=True)
        # NOTE(review): the final "return segments" appears to be missing.
    def mark_segment_expired(self, segment):
        """Mark a segment for cleaning in the local database.

        The segment parameter should be either a SegmentInfo object or an
        integer segment id.  Objects in the given segment will be marked as
        expired, which means that any future snapshots that would re-use those
        objects will instead write out a new copy of the object, and thus no
        future snapshots will depend upon the given segment.
        """

        if isinstance(segment, int):
            # NOTE(review): the body ("id = segment") appears to be missing
            # in this copy.
        elif isinstance(segment, self.SegmentInfo):
            # NOTE(review): the body ("id = segment.segmentid") and an
            # "else:" line appear to be missing before the raise below.
            raise TypeError("Invalid segment: %s, must be of type int or SegmentInfo, not %s" % (segment, type(segment)))

        # NOTE(review): cursor creation appears to be missing in this copy.
        # Record the current latest snapshot id as the segment's expire time,
        # so a later run can detect whether backups have intervened.
        cur.execute("select max(snapshotid) from snapshots")
        last_snapshotid = cur.fetchone()[0]
        cur.execute("update segments set expire_time = ? where segmentid = ?",
                    (last_snapshotid, id))
        cur.execute("update block_index set expired = 0 where segmentid = ?",
        # NOTE(review): the parameter tuple line ("(id,))") appears to be
        # missing, leaving this call unclosed.
    def balance_expired_objects(self):
        """Analyze expired objects in segments to be cleaned and group by age.

        Update the block_index table of the local database to group expired
        objects by age.  The exact number of buckets and the cutoffs for each
        are dynamically determined.  Calling this function after marking
        segments expired will help in the segment cleaning process, by ensuring
        that when active objects from clean segments are rewritten, they will
        be placed into new segments roughly grouped by age.
        """

        # The expired column of the block_index table is used when generating a
        # new Cumulus snapshot.  A null value indicates that an object may be
        # re-used.  Otherwise, an object must be written into a new segment if
        # needed.  Objects with distinct expired values will be written into
        # distinct segments, to allow for some grouping by age.  The value 0 is
        # somewhat special in that it indicates any rewritten objects can be
        # placed in the same segment as completely new objects; this can be
        # used for very young objects which have been expired, or objects not
        # expected to be encountered.
        #
        # In the balancing process, all objects which are not used in any
        # current snapshots will have expired set to 0.  Objects which have
        # been seen will be sorted by age and will have expired values set to
        # 0, 1, 2, and so on based on age (with younger objects being assigned
        # lower values).  The number of buckets and the age cutoffs is
        # determined by looking at the distribution of block ages.
        # NOTE(review): cursor creation appears to be missing in this copy.

        # Mark all expired objects with expired = 0; these objects will later
        # have values set to indicate groupings of objects when repacking.
        cur.execute("""update block_index set expired = 0
                       where expired is not null""")

        # We will want to aim for at least one full segment for each bucket
        # that we eventually create, but don't know how many bytes that should
        # be due to compression.  So compute the average number of bytes in
        # each expired segment as a rough estimate for the minimum size of each
        # bucket.  (This estimate could be thrown off by many not-fully-packed
        # segments, but for now don't worry too much about that.)  If we can't
        # compute an average, it's probably because there are no expired
        # segments, so we have no more work to do.
        # NOTE(review): a line of the SQL below (likely "where segmentid in")
        # appears to be missing in this copy.
        cur.execute("""select avg(size) from segments
                           (select distinct segmentid from block_index
                            where expired is not null)""")
        segment_size_estimate = cur.fetchone()[0]
        if not segment_size_estimate:
            # NOTE(review): the body (presumably "return") appears missing.

        # Next, extract distribution of expired objects (number and size) by
        # age.  Save the timestamp for "now" so that the classification of
        # blocks into age buckets will not change later in the function, after
        # time has passed.  Set any timestamps in the future to now, so we are
        # guaranteed that for the rest of this function, age is always
        # non-negative.
        cur.execute("select julianday('now')")
        now = cur.fetchone()[0]

        cur.execute("""update block_index set timestamp = ?
                       where timestamp > ? and expired is not null""",
        # NOTE(review): the parameter tuple line (presumably "(now, now))")
        # appears to be missing, leaving this call unclosed.

        cur.execute("""select round(? - timestamp) as age, count(*), sum(size)
                       from block_index where expired = 0
                       group by age order by age""", (now,))
        distribution = cur.fetchall()

        # Start to determine the buckets for expired objects.  Heuristics used:
        #   - An upper bound on the number of buckets is given by the number of
        #     segments we estimate it will take to store all data.  In fact,
        #     aim for a couple of segments per bucket.
        #   - Place very young objects in bucket 0 (place with new objects)
        #     unless there are enough of them to warrant a separate bucket.
        #   - Try not to create unnecessarily many buckets, since fewer buckets
        #     will allow repacked data to be grouped based on spatial locality
        #     (while more buckets will group by temporal locality).
        # NOTE(review): the definition of MIN_AGE (used below) appears to be
        # missing in this copy.
        total_bytes = sum([i[2] for i in distribution])
        target_buckets = 2 * (total_bytes / segment_size_estimate) ** 0.4
        min_size = 1.5 * segment_size_estimate
        target_size = max(2 * segment_size_estimate,
                          total_bytes / target_buckets)

        print("segment_size:", segment_size_estimate)
        print("distribution:", distribution)
        print("total_bytes:", total_bytes)
        print("target_buckets:", target_buckets)
        print("min, target size:", min_size, target_size)

        # Chosen cutoffs.  Each bucket consists of objects with age greater
        # than one cutoff value, but not greater than the next largest cutoff.
        # NOTE(review): the "cutoffs = []" initialization appears missing.

        # Starting with the oldest objects, begin grouping together into
        # buckets of size at least target_size bytes.
        distribution.reverse()
        # NOTE(review): the "bucket_size = 0" initialization appears missing.
        min_age_bucket = False
        for (age, items, size) in distribution:
            if bucket_size >= target_size \
                or (age < MIN_AGE and not min_age_bucket):
                if bucket_size < target_size and len(cutoffs) > 0:
                    # NOTE(review): several lines (merging the undersized
                    # bucket, appending the cutoff, accumulating bucket_size)
                    # appear to be missing in this copy.
                min_age_bucket = True

        # The last (youngest) bucket will be group 0, unless it has enough data
        # to be of size min_size by itself, or there happen to be no objects
        # less than MIN_AGE at all.
        if bucket_size >= min_size or not min_age_bucket:
            # NOTE(review): the body (appending a final cutoff) appears to be
            # missing in this copy.

        print("cutoffs:", cutoffs)

        # Update the database to assign each object to the appropriate bucket.
        for i in range(len(cutoffs)):
            cur.execute("""update block_index set expired = ?
                           where round(? - timestamp) > ?
                                 and expired is not null""",
                        (i, now, cutoffs[i]))