1 # Cumulus: Efficient Filesystem Backup to the Cloud
2 # Copyright (C) 2008-2009, 2012 The Cumulus Developers
3 # See the AUTHORS file for a list of contributors.
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License along
16 # with this program; if not, write to the Free Software Foundation, Inc.,
17 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19 """High-level interface for working with Cumulus archives.
21 This module provides an easy interface for reading from and manipulating
22 various parts of a Cumulus archive:
23 - listing the snapshots and segments present
24 - reading segment contents
25 - parsing snapshot descriptors and snapshot metadata logs
26 - reading and maintaining the local object database
29 from __future__ import division, print_function, unicode_literals
46 import thread as _thread
49 import cumulus.store.file
52 # The largest supported snapshot format that can be understood.
53 FORMAT_VERSION = (0, 11) # Cumulus Snapshot v0.11
55 # Maximum number of nested indirect references allowed in a snapshot.
56 MAX_RECURSION_DEPTH = 3
58 # All segments which have been accessed this session.
59 accessed_segments = set()
61 # Table of methods used to filter segments before storage, and corresponding
62 # filename extensions. These are listed in priority order (methods earlier in
63 # the list are tried first).
65 (".gpg", "cumulus-filter-gpg --decrypt"),
67 (".bz2", "bzip2 -dc"),
72 """Decode binary data from a file into a sequence of lines.
74 Newline markers are retained."""
75 return list(codecs.iterdecode(data.splitlines(True), "utf-8"))
78 """A class which merely acts as a data container.
80 Instances of this class (or its subclasses) are merely used to store data
81 in various attributes. No methods are provided.
85 return "<%s %s>" % (self.__class__, self.__dict__)
87 CHECKSUM_ALGORITHMS = {
89 'sha224': hashlib.sha224,
90 'sha256': hashlib.sha256,
93 class ChecksumCreator:
94 """Compute a Cumulus checksum for provided data.
96 The algorithm used is selectable, but currently defaults to sha1.
99 def __init__(self, algorithm='sha1'):
100 self.algorithm = algorithm
101 self.hash = CHECKSUM_ALGORITHMS[algorithm]()
103 def update(self, data):
104 self.hash.update(data)
108 return "%s=%s" % (self.algorithm, self.hash.hexdigest())
110 class ChecksumVerifier:
111 """Verify whether a checksum from a snapshot matches the supplied data."""
113 def __init__(self, checksumstr):
114 """Create an object to check the supplied checksum."""
116 (algo, checksum) = checksumstr.split("=", 1)
117 self.checksum = checksum
118 self.hash = CHECKSUM_ALGORITHMS[algo]()
120 def update(self, data):
121 self.hash.update(data)
124 """Return a boolean indicating whether the checksum matches."""
126 result = self.hash.hexdigest()
127 return result == self.checksum
129 class SearchPathEntry(object):
130 """Item representing a possible search location for Cumulus files.
132 Some Cumulus files might be stored in multiple possible file locations: due
133 to format (different compression mechanisms with different extensions),
134 locality (different segments might be placed in different directories to
135 control archiving policies), for backwards compatibility (default location
136 changed over time). A SearchPathEntry describes a possible location for a
139 def __init__(self, directory_prefix, suffix, context=None):
140 self._directory_prefix = directory_prefix
141 self._suffix = suffix
142 self._context = context
145 return "%s(%r, %r, %r)" % (self.__class__.__name__,
146 self._directory_prefix, self._suffix,
149 def build_path(self, basename):
150 """Construct the search path to use for a file with name basename.
152 Returns a tuple (pathname, context), where pathname is the path to try
153 and context is any additional data associated with this search entry
156 return (posixpath.join(self._directory_prefix, basename + self._suffix),
159 class SearchPath(object):
160 """A collection of locations to search for files and lookup utilities.
162 For looking for a file in a Cumulus storage backend, a SearchPath object
163 contains a list of possible locations to try. A SearchPath can be used to
164 perform the search as well; when a file is found the search path ordering
165 is updated (moving the successful SearchPathEntry to the front of the list
166 for future searches).
168 def __init__(self, name_regex, searchpath):
169 self._regex = re.compile(name_regex)
170 self._path = list(searchpath)
172 def add_search_entry(self, entry):
173 self._path.append(entry)
175 def directories(self):
176 """Return the set of directories to search for a file type."""
177 return set(entry._directory_prefix for entry in self._path)
179 def get(self, backend, basename):
180 for (i, entry) in enumerate(self._path):
182 (pathname, context) = entry.build_path(basename)
183 fp = backend.get(pathname)
184 # On success, move this entry to the front of the search path
185 # to speed future searches.
188 self._path.insert(0, entry)
189 return (fp, pathname, context)
190 except cumulus.store.NotFoundError:
192 raise cumulus.store.NotFoundError(basename)
194 def stat(self, backend, basename):
195 for (i, entry) in enumerate(self._path):
197 (pathname, context) = entry.build_path(basename)
198 stat_data = backend.stat(pathname)
199 # On success, move this entry to the front of the search path
200 # to speed future searches.
203 self._path.insert(0, entry)
204 result = {"path": pathname}
205 result.update(stat_data)
207 except cumulus.store.NotFoundError:
209 raise cumulus.store.NotFoundError(basename)
211 def match(self, filename):
212 return self._regex.match(filename)
214 def list(self, backend):
216 for d in self.directories():
218 for f in backend.list(d):
221 if m: yield (posixpath.join(d, f), m)
222 except cumulus.store.NotFoundError:
225 raise cumulus.store.NotFoundError(backend)
227 def _build_segments_searchpath(prefix):
228 for (extension, filter) in SEGMENT_FILTERS:
229 yield SearchPathEntry(prefix, extension, filter)
232 "checksums": SearchPath(
233 r"^snapshot-(.*)\.(\w+)sums$",
234 [SearchPathEntry("meta", ".sha1sums"),
235 SearchPathEntry("checksums", ".sha1sums"),
236 SearchPathEntry("", ".sha1sums")]),
238 r"^snapshot-(.*)\.meta(\.\S+)?$",
239 _build_segments_searchpath("meta")),
240 "segments": SearchPath(
241 (r"^([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})"
244 _build_segments_searchpath("segments0"),
245 _build_segments_searchpath("segments1"),
246 _build_segments_searchpath(""),
247 _build_segments_searchpath("segments"))),
248 "snapshots": SearchPath(
249 r"^snapshot-(.*)\.(cumulus|lbs)$",
250 [SearchPathEntry("snapshots", ".cumulus"),
251 SearchPathEntry("snapshots", ".lbs"),
252 SearchPathEntry("", ".cumulus"),
253 SearchPathEntry("", ".lbs")]),
256 class BackendWrapper(object):
257 """Wrapper around a Cumulus storage backend that understands file types.
259 The BackendWrapper class understands different Cumulus file types, such as
260 snapshots and segments, and implements higher-level operations such as
261 "retrieve a snapshot with a specific name" (hiding operations such as
262 searching for the correct file name).
265 def __init__(self, backend):
266 """Initializes a wrapper around the specified storage backend.
268 store may either be a Store object or URL.
270 if isinstance(backend, six.string_types):
271 self._backend = cumulus.store.open(backend)
273 self._backend = backend
276 def raw_backend(self):
279 def stat_generic(self, basename, filetype):
280 return SEARCH_PATHS[filetype].stat(self._backend, basename)
282 def open_generic(self, basename, filetype):
283 return SEARCH_PATHS[filetype].get(self._backend, basename)
285 def open_snapshot(self, name):
286 return self.open_generic("snapshot-" + name, "snapshots")
288 def open_segment(self, name):
289 return self.open_generic(name + ".tar", "segments")
291 def list_generic(self, filetype):
292 return ((x[1].group(1), x[0])
293 for x in SEARCH_PATHS[filetype].list(self._backend))
295 def prefetch_generic(self):
296 """Calls scan on directories to prefetch file metadata."""
298 for typeinfo in SEARCH_PATHS.values():
299 directories.update(typeinfo.directories())
300 for d in directories:
302 self._backend.scan(d)
305 def __init__(self, backend):
306 if isinstance(backend, BackendWrapper):
307 self.backend = backend
309 self.backend = BackendWrapper(backend)
314 def get_cachedir(self):
315 if self.cachedir is None:
316 self.cachedir = tempfile.mkdtemp("-cumulus")
320 if self.cachedir is not None:
321 # TODO: Avoid use of system, make this safer
322 os.system("rm -rf " + self.cachedir)
326 def parse_ref(refstr):
327 m = re.match(r"^zero\[(\d+)\]$", refstr)
329 return ("zero", None, None, (0, int(m.group(1)), False))
331 m = re.match(r"^([-0-9a-f]+)\/([0-9a-f]+)(\(\S+\))?(\[(=?(\d+)|(\d+)\+(\d+))\])?$", refstr)
336 checksum = m.group(3)
339 if checksum is not None:
340 checksum = checksum.lstrip("(").rstrip(")")
342 if slice is not None:
343 if m.group(6) is not None:
344 # Size-assertion slice
345 slice = (0, int(m.group(6)), True)
347 slice = (int(m.group(7)), int(m.group(8)), False)
349 return (segment, object, checksum, slice)
351 def list_snapshots(self):
352 return set(x[0] for x in self.backend.list_generic("snapshots"))
354 def list_segments(self):
355 return set(x[0] for x in self.backend.list_generic("segments"))
357 def load_snapshot(self, snapshot):
358 snapshot_file = self.backend.open_snapshot(snapshot)[0]
359 return to_lines(snapshot_file.read())
362 def filter_data(filehandle, filter_cmd):
363 if filter_cmd is None:
365 p = subprocess.Popen(filter_cmd, shell=True, stdin=subprocess.PIPE,
366 stdout=subprocess.PIPE, close_fds=True)
367 input, output = p.stdin, p.stdout
368 def copy_thread(src, dst):
371 block = src.read(BLOCK_SIZE)
372 if len(block) == 0: break
377 _thread.start_new_thread(copy_thread, (filehandle, input))
380 def get_segment(self, segment):
381 accessed_segments.add(segment)
383 (segment_fp, path, filter_cmd) = self.backend.open_segment(segment)
384 return self.filter_data(segment_fp, filter_cmd)
386 def load_segment(self, segment):
387 seg = tarfile.open(segment, 'r|', self.get_segment(segment))
389 data_obj = seg.extractfile(item)
390 path = item.name.split('/')
391 if len(path) == 2 and path[0] == segment:
392 yield (path[1], data_obj.read())
394 def extract_segment(self, segment):
395 segdir = os.path.join(self.get_cachedir(), segment)
397 for (object, data) in self.load_segment(segment):
398 f = open(os.path.join(segdir, object), 'wb')
402 def load_object(self, segment, object):
403 accessed_segments.add(segment)
404 path = os.path.join(self.get_cachedir(), segment, object)
405 if not os.access(path, os.R_OK):
406 self.extract_segment(segment)
407 if segment in self._lru_list: self._lru_list.remove(segment)
408 self._lru_list.append(segment)
409 while len(self._lru_list) > self.CACHE_SIZE:
410 os.system("rm -rf " + os.path.join(self.cachedir,
412 self._lru_list = self._lru_list[1:]
413 return open(path, 'rb').read()
415 def get(self, refstr):
416 """Fetch the given object and return it.
418 The input should be an object reference, in string form.
421 (segment, object, checksum, slice) = self.parse_ref(refstr)
423 if segment == "zero":
424 return "\0" * slice[1]
426 data = self.load_object(segment, object)
428 if checksum is not None:
429 verifier = ChecksumVerifier(checksum)
430 verifier.update(data)
431 if not verifier.valid():
434 if slice is not None:
435 (start, length, exact) = slice
436 # Note: The following assertion check may need to be commented out
437 # to restore from pre-v0.8 snapshots, as the syntax for
438 # size-assertion slices has changed.
439 if exact and len(data) != length: raise ValueError
440 data = data[start:start+length]
441 if len(data) != length: raise IndexError
446 self.backend.prefetch_generic()
448 def parse(lines, terminate=None):
449 """Generic parser for RFC822-style "Key: Value" data streams.
451 This parser can be used to read metadata logs and snapshot root descriptor
454 lines must be an iterable object which yields a sequence of lines of input.
456 If terminate is specified, it is used as a predicate to determine when to
457 stop reading input lines.
463 def make_result(result):
464 return dict((k, "".join(v)) for (k, v) in result.items())
467 # Strip off a trailing newline, if present
468 if len(l) > 0 and l[-1] == "\n":
471 if terminate is not None and terminate(l):
472 if len(result) > 0: yield make_result(result)
477 m = re.match(r"^([-\w]+):\s*(.*)$", l)
479 result[m.group(1)] = [m.group(2)]
480 last_key = m.group(1)
481 elif len(l) > 0 and l[0].isspace() and last_key is not None:
482 result[last_key].append(l)
486 if len(result) > 0: yield make_result(result)
488 def parse_full(lines):
490 return next(parse(lines))
491 except StopIteration:
494 def parse_metadata_version(s):
495 """Convert a string with the snapshot version format to a tuple."""
497 m = re.match(r"^(?:Cumulus|LBS) Snapshot v(\d+(\.\d+)*)$", s)
501 return tuple([int(d) for d in m.group(1).split(".")])
503 def read_metadata(object_store, root):
504 """Iterate through all lines in the metadata log, following references."""
506 # Stack for keeping track of recursion when following references to
507 # portions of the log. The last entry in the stack corresponds to the
508 # object currently being parsed. Each entry is a list of lines which have
509 # been reversed, so that popping successive lines from the end of each list
510 # will return lines of the metadata log in order.
513 def follow_ref(refstr):
514 if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
515 lines = to_lines(object_store.get(refstr))
521 while len(stack) > 0:
528 # An indirect reference which we must follow?
529 if len(line) > 0 and line[0] == '@':
537 """Metadata for a single file (or directory or...) from a snapshot."""
539 # Functions for parsing various datatypes that can appear in a metadata log
543 """Decode an integer, expressed in decimal, octal, or hexadecimal."""
544 if s.startswith("0x"):
546 elif s.startswith("0"):
553 """Decode a URI-encoded (%xx escapes) string."""
554 return cumulus.util.uri_decode_pathname(s)
558 """An unecoded string."""
563 """Decode a user/group to a tuple of uid/gid followed by name."""
565 uid = MetadataItem.decode_int(items[0])
568 if items[1].startswith("(") and items[1].endswith(")"):
569 name = MetadataItem.decode_str(items[1][1:-1])
573 def decode_device(s):
574 """Decode a device major/minor number."""
575 (major, minor) = map(MetadataItem.decode_int, s.split("/"))
576 return (major, minor)
580 def __init__(self, fields, object_store):
581 """Initialize from a dictionary of key/value pairs from metadata log."""
584 self.object_store = object_store
586 self.items = self.Items()
587 for (k, v) in fields.items():
588 if k in self.field_types:
589 decoder = self.field_types[k]
590 setattr(self.items, k, decoder(v))
594 """Return an iterator for the data blocks that make up a file."""
596 # This traverses the list of blocks that make up a file, following
597 # indirect references. It is implemented in much the same way as
598 # read_metadata, so see that function for details of the technique.
600 objects = self.fields['data'].split()
604 def follow_ref(refstr):
605 if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
606 objects = self.object_store.get(refstr).split()
608 stack.append(objects)
610 while len(stack) > 0:
617 # An indirect reference which we must follow?
618 if len(ref) > 0 and ref[0] == '@':
623 # Description of fields that might appear, and how they should be parsed.
624 MetadataItem.field_types = {
625 'name': MetadataItem.decode_str,
626 'type': MetadataItem.raw_str,
627 'mode': MetadataItem.decode_int,
628 'device': MetadataItem.decode_device,
629 'user': MetadataItem.decode_user,
630 'group': MetadataItem.decode_user,
631 'ctime': MetadataItem.decode_int,
632 'mtime': MetadataItem.decode_int,
633 'links': MetadataItem.decode_int,
634 'inode': MetadataItem.raw_str,
635 'checksum': MetadataItem.decode_str,
636 'size': MetadataItem.decode_int,
637 'contents': MetadataItem.decode_str,
638 'target': MetadataItem.decode_str,
641 def iterate_metadata(object_store, root):
642 for d in parse(read_metadata(object_store, root), lambda l: len(l) == 0):
643 yield MetadataItem(d, object_store)
646 """Access to the local database of snapshot contents and object checksums.
648 The local database is consulted when creating a snapshot to determine what
649 data can be re-used from old snapshots. Segment cleaning is performed by
650 manipulating the data in the local database; the local database also
651 includes enough data to guide the segment cleaning process.
654 def __init__(self, path, dbname="localdb.sqlite"):
655 self.db_connection = sqlite3.connect(path + "/" + dbname)
657 # Low-level database access. Use these methods when there isn't a
658 # higher-level interface available. Exception: do, however, remember to
659 # use the commit() method after making changes to make sure they are
660 # actually saved, even when going through higher-level interfaces.
662 "Commit any pending changes to the local database."
663 self.db_connection.commit()
666 "Roll back any pending changes to the local database."
667 self.db_connection.rollback()
670 "Return a DB-API cursor for directly accessing the local database."
671 return self.db_connection.cursor()
673 def list_schemes(self):
674 """Return the list of snapshots found in the local database.
676 The returned value is a list of tuples (id, scheme, name, time, intent).
680 cur.execute("select distinct scheme from snapshots")
681 schemes = [row[0] for row in cur.fetchall()]
685 def list_snapshots(self, scheme):
686 """Return a list of snapshots for the given scheme."""
688 cur.execute("select name from snapshots")
689 snapshots = [row[0] for row in cur.fetchall()]
693 def delete_snapshot(self, scheme, name):
694 """Remove the specified snapshot from the database.
696 Warning: This does not garbage collect all dependent data in the
697 database, so it must be followed by a call to garbage_collect() to make
698 the database consistent.
701 cur.execute("delete from snapshots where scheme = ? and name = ?",
704 def prune_old_snapshots(self, scheme, intent=1.0):
705 """Delete entries from old snapshots from the database.
707 Only snapshots with the specified scheme name will be deleted. If
708 intent is given, it gives the intended next snapshot type, to determine
709 how aggressively to clean (for example, intent=7 could be used if the
710 next snapshot will be a weekly snapshot).
715 # Find the id of the last snapshot to be created. This is used for
716 # measuring time in a way: we record this value in each segment we
717 # expire on this run, and then on a future run can tell if there have
718 # been intervening backups made.
719 cur.execute("select max(snapshotid) from snapshots")
720 last_snapshotid = cur.fetchone()[0]
722 # Get the list of old snapshots for this scheme. Delete all the old
723 # ones. Rules for what to keep:
724 # - Always keep the most recent snapshot.
725 # - If snapshot X is younger than Y, and X has higher intent, then Y
727 cur.execute("""select snapshotid, name, intent,
728 julianday('now') - timestamp as age
729 from snapshots where scheme = ?
730 order by age""", (scheme,))
734 for (id, name, snap_intent, snap_age) in cur.fetchall():
736 if snap_intent < max_intent:
737 # Delete small-intent snapshots if there is a more recent
738 # large-intent snapshot.
740 elif snap_intent == intent:
741 # Delete previous snapshots with the specified intent level.
744 if can_delete and not first:
745 print("Delete snapshot %d (%s)" % (id, name))
746 cur.execute("delete from snapshots where snapshotid = ?",
749 max_intent = max(max_intent, snap_intent)
751 self.garbage_collect()
753 def garbage_collect(self):
754 """Garbage-collect unreachable segment and object data.
756 Remove all segments and checksums which is not reachable from the
757 current set of snapshots stored in the local database.
761 # Delete entries in the segment_utilization table which are for
762 # non-existent snapshots.
763 cur.execute("""delete from segment_utilization
764 where snapshotid not in
765 (select snapshotid from snapshots)""")
767 # Delete segments not referenced by any current snapshots.
768 cur.execute("""delete from segments where segmentid not in
769 (select segmentid from segment_utilization)""")
771 # Delete dangling objects in the block_index table.
772 cur.execute("""delete from block_index
773 where segmentid not in
774 (select segmentid from segments)""")
776 # Remove sub-block signatures for deleted objects.
777 cur.execute("""delete from subblock_signatures
779 (select blockid from block_index)""")
782 class SegmentInfo(Struct): pass
784 def get_segment_cleaning_list(self, age_boost=0.0):
785 """Return a list of all current segments with information for cleaning.
787 Return all segments which are currently known in the local database
788 (there might be other, older segments in the archive itself), and
789 return usage statistics for each to help decide which segments to
792 The returned list will be sorted by estimated cleaning benefit, with
793 segments that are best to clean at the start of the list.
795 If specified, the age_boost parameter (measured in days) will added to
796 the age of each segment, as a way of adjusting the benefit computation
797 before a long-lived snapshot is taken (for example, age_boost might be
798 set to 7 when cleaning prior to taking a weekly snapshot).
803 cur.execute("""select segmentid, used, size, mtime,
804 julianday('now') - mtime as age from segment_info
805 where expire_time is null""")
807 info = self.SegmentInfo()
809 info.used_bytes = row[1]
810 info.size_bytes = row[2]
812 info.age_days = row[4]
814 # If data is not available for whatever reason, treat it as 0.0.
815 if info.age_days is None:
817 if info.used_bytes is None:
818 info.used_bytes = 0.0
820 # Benefit calculation: u is the estimated fraction of each segment
821 # which is utilized (bytes belonging to objects still in use
822 # divided by total size; this doesn't take compression or storage
823 # overhead into account, but should give a reasonable estimate).
825 # The total benefit is a heuristic that combines several factors:
826 # the amount of space that can be reclaimed (1 - u), an ageing
827 # factor (info.age_days) that favors cleaning old segments to young
828 # ones and also is more likely to clean segments that will be
829 # rewritten for long-lived snapshots (age_boost), and finally a
830 # penalty factor for the cost of re-uploading data (u + 0.1).
831 u = info.used_bytes / info.size_bytes
832 info.cleaning_benefit \
833 = (1 - u) * (info.age_days + age_boost) / (u + 0.1)
835 segments.append(info)
837 segments.sort(cmp, key=lambda s: s.cleaning_benefit, reverse=True)
840 def mark_segment_expired(self, segment):
841 """Mark a segment for cleaning in the local database.
843 The segment parameter should be either a SegmentInfo object or an
844 integer segment id. Objects in the given segment will be marked as
845 expired, which means that any future snapshots that would re-use those
846 objects will instead write out a new copy of the object, and thus no
847 future snapshots will depend upon the given segment.
850 if isinstance(segment, int):
852 elif isinstance(segment, self.SegmentInfo):
855 raise TypeError("Invalid segment: %s, must be of type int or SegmentInfo, not %s" % (segment, type(segment)))
858 cur.execute("select max(snapshotid) from snapshots")
859 last_snapshotid = cur.fetchone()[0]
860 cur.execute("update segments set expire_time = ? where segmentid = ?",
861 (last_snapshotid, id))
862 cur.execute("update block_index set expired = 0 where segmentid = ?",
865 def balance_expired_objects(self):
866 """Analyze expired objects in segments to be cleaned and group by age.
868 Update the block_index table of the local database to group expired
869 objects by age. The exact number of buckets and the cutoffs for each
870 are dynamically determined. Calling this function after marking
871 segments expired will help in the segment cleaning process, by ensuring
872 that when active objects from clean segments are rewritten, they will
873 be placed into new segments roughly grouped by age.
876 # The expired column of the block_index table is used when generating a
877 # new Cumulus snapshot. A null value indicates that an object may be
878 # re-used. Otherwise, an object must be written into a new segment if
879 # needed. Objects with distinct expired values will be written into
880 # distinct segments, to allow for some grouping by age. The value 0 is
881 # somewhat special in that it indicates any rewritten objects can be
882 # placed in the same segment as completely new objects; this can be
883 # used for very young objects which have been expired, or objects not
884 # expected to be encountered.
886 # In the balancing process, all objects which are not used in any
887 # current snapshots will have expired set to 0. Objects which have
888 # been seen will be sorted by age and will have expired values set to
889 # 0, 1, 2, and so on based on age (with younger objects being assigned
890 # lower values). The number of buckets and the age cutoffs is
891 # determined by looking at the distribution of block ages.
895 # Mark all expired objects with expired = 0; these objects will later
896 # have values set to indicate groupings of objects when repacking.
897 cur.execute("""update block_index set expired = 0
898 where expired is not null""")
900 # We will want to aim for at least one full segment for each bucket
901 # that we eventually create, but don't know how many bytes that should
902 # be due to compression. So compute the average number of bytes in
903 # each expired segment as a rough estimate for the minimum size of each
904 # bucket. (This estimate could be thrown off by many not-fully-packed
905 # segments, but for now don't worry too much about that.) If we can't
906 # compute an average, it's probably because there are no expired
907 # segments, so we have no more work to do.
908 cur.execute("""select avg(size) from segments
910 (select distinct segmentid from block_index
911 where expired is not null)""")
912 segment_size_estimate = cur.fetchone()[0]
913 if not segment_size_estimate:
916 # Next, extract distribution of expired objects (number and size) by
917 # age. Save the timestamp for "now" so that the classification of
918 # blocks into age buckets will not change later in the function, after
919 # time has passed. Set any timestamps in the future to now, so we are
920 # guaranteed that for the rest of this function, age is always
922 cur.execute("select julianday('now')")
923 now = cur.fetchone()[0]
925 cur.execute("""update block_index set timestamp = ?
926 where timestamp > ? and expired is not null""",
929 cur.execute("""select round(? - timestamp) as age, count(*), sum(size)
930 from block_index where expired = 0
931 group by age order by age""", (now,))
932 distribution = cur.fetchall()
934 # Start to determine the buckets for expired objects. Heuristics used:
935 # - An upper bound on the number of buckets is given by the number of
936 # segments we estimate it will take to store all data. In fact,
937 # aim for a couple of segments per bucket.
938 # - Place very young objects in bucket 0 (place with new objects)
939 # unless there are enough of them to warrant a separate bucket.
940 # - Try not to create unnecessarily many buckets, since fewer buckets
941 # will allow repacked data to be grouped based on spatial locality
942 # (while more buckets will group by temporal locality). We want a
945 total_bytes = sum([i[2] for i in distribution])
946 target_buckets = 2 * (total_bytes / segment_size_estimate) ** 0.4
947 min_size = 1.5 * segment_size_estimate
948 target_size = max(2 * segment_size_estimate,
949 total_bytes / target_buckets)
951 print("segment_size:", segment_size_estimate)
952 print("distribution:", distribution)
953 print("total_bytes:", total_bytes)
954 print("target_buckets:", target_buckets)
955 print("min, target size:", min_size, target_size)
957 # Chosen cutoffs. Each bucket consists of objects with age greater
958 # than one cutoff value, but not greater than the next largest cutoff.
961 # Starting with the oldest objects, begin grouping together into
962 # buckets of size at least target_size bytes.
963 distribution.reverse()
965 min_age_bucket = False
966 for (age, items, size) in distribution:
967 if bucket_size >= target_size \
968 or (age < MIN_AGE and not min_age_bucket):
969 if bucket_size < target_size and len(cutoffs) > 0:
976 min_age_bucket = True
978 # The last (youngest) bucket will be group 0, unless it has enough data
979 # to be of size min_size by itself, or there happen to be no objects
980 # less than MIN_AGE at all.
981 if bucket_size >= min_size or not min_age_bucket:
985 print("cutoffs:", cutoffs)
987 # Update the database to assign each object to the appropriate bucket.
989 for i in range(len(cutoffs)):
990 cur.execute("""update block_index set expired = ?
991 where round(? - timestamp) > ?
992 and expired is not null""",
993 (i, now, cutoffs[i]))