1 # Cumulus: Efficient Filesystem Backup to the Cloud
2 # Copyright (C) 2008-2009, 2012 The Cumulus Developers
3 # See the AUTHORS file for a list of contributors.
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License along
16 # with this program; if not, write to the Free Software Foundation, Inc.,
17 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19 """High-level interface for working with Cumulus archives.
21 This module provides an easy interface for reading from and manipulating
22 various parts of a Cumulus archive:
23 - listing the snapshots and segments present
24 - reading segment contents
25 - parsing snapshot descriptors and snapshot metadata logs
26 - reading and maintaining the local object database
29 from __future__ import division, print_function, unicode_literals
43 import thread as _thread
46 import cumulus.store.file
49 StringTypes = (str, unicode)
53 # The largest supported snapshot format that can be understood.
54 FORMAT_VERSION = (0, 11) # Cumulus Snapshot v0.11
56 # Maximum number of nested indirect references allowed in a snapshot.
57 MAX_RECURSION_DEPTH = 3
59 # All segments which have been accessed this session.
60 accessed_segments = set()
62 # Table of methods used to filter segments before storage, and corresponding
63 # filename extensions. These are listed in priority order (methods earlier in
64 # the list are tried first).
66 (".gpg", "cumulus-filter-gpg --decrypt"),
68 (".bz2", "bzip2 -dc"),
73 """Decode binary data from a file into a sequence of lines.
75 Newline markers are retained."""
76 return list(codecs.iterdecode(data.splitlines(True), "utf-8"))
79 """Decode a URI-encoded (%xx escapes) string."""
80 def hex_decode(m): return chr(int(m.group(1), 16))
81 return re.sub(r"%([0-9a-f]{2})", hex_decode, s)
83 """Encode a string to URI-encoded (%xx escapes) form."""
85 if c > '+' and c < '\x7f' and c != '@':
88 return "%%%02x" % (ord(c),)
89 return ''.join(hex_encode(c) for c in s)
92 """A class which merely acts as a data container.
94 Instances of this class (or its subclasses) are merely used to store data
95 in various attributes. No methods are provided.
99 return "<%s %s>" % (self.__class__, self.__dict__)
101 CHECKSUM_ALGORITHMS = {
102 'sha1': hashlib.sha1,
103 'sha224': hashlib.sha224,
104 'sha256': hashlib.sha256,
107 class ChecksumCreator:
108 """Compute a Cumulus checksum for provided data.
110 The algorithm used is selectable, but currently defaults to sha1.
113 def __init__(self, algorithm='sha1'):
114 self.algorithm = algorithm
115 self.hash = CHECKSUM_ALGORITHMS[algorithm]()
117 def update(self, data):
118 self.hash.update(data)
122 return "%s=%s" % (self.algorithm, self.hash.hexdigest())
124 class ChecksumVerifier:
125 """Verify whether a checksum from a snapshot matches the supplied data."""
127 def __init__(self, checksumstr):
128 """Create an object to check the supplied checksum."""
130 (algo, checksum) = checksumstr.split("=", 1)
131 self.checksum = checksum
132 self.hash = CHECKSUM_ALGORITHMS[algo]()
134 def update(self, data):
135 self.hash.update(data)
138 """Return a boolean indicating whether the checksum matches."""
140 result = self.hash.hexdigest()
141 return result == self.checksum
143 class SearchPathEntry(object):
144 """Item representing a possible search location for Cumulus files.
146 Some Cumulus files might be stored in multiple possible file locations: due
147 to format (different compression mechanisms with different extensions),
148 locality (different segments might be placed in different directories to
149 control archiving policies), for backwards compatibility (default location
150 changed over time). A SearchPathEntry describes a possible location for a
153 def __init__(self, directory_prefix, suffix, context=None):
154 self._directory_prefix = directory_prefix
155 self._suffix = suffix
156 self._context = context
159 return "%s(%r, %r, %r)" % (self.__class__.__name__,
160 self._directory_prefix, self._suffix,
163 def build_path(self, basename):
164 """Construct the search path to use for a file with name basename.
166 Returns a tuple (pathname, context), where pathname is the path to try
167 and context is any additional data associated with this search entry
170 return (os.path.join(self._directory_prefix, basename + self._suffix),
173 class SearchPath(object):
174 """A collection of locations to search for files and lookup utilities.
176 For looking for a file in a Cumulus storage backend, a SearchPath object
177 contains a list of possible locations to try. A SearchPath can be used to
178 perform the search as well; when a file is found the search path ordering
179 is updated (moving the successful SearchPathEntry to the front of the list
180 for future searches).
182 def __init__(self, name_regex, searchpath):
183 self._regex = re.compile(name_regex)
184 self._path = list(searchpath)
186 def add_search_entry(self, entry):
187 self._path.append(entry)
189 def directories(self):
190 """Return the set of directories to search for a file type."""
191 return set(entry._directory_prefix for entry in self._path)
193 def get(self, backend, basename):
194 for (i, entry) in enumerate(self._path):
196 (pathname, context) = entry.build_path(basename)
197 fp = backend.get(pathname)
198 # On success, move this entry to the front of the search path
199 # to speed future searches.
202 self._path.insert(0, entry)
203 return (fp, pathname, context)
204 except cumulus.store.NotFoundError:
206 raise cumulus.store.NotFoundError(basename)
208 def stat(self, backend, basename):
209 for (i, entry) in enumerate(self._path):
211 (pathname, context) = entry.build_path(basename)
212 stat_data = backend.stat(pathname)
213 # On success, move this entry to the front of the search path
214 # to speed future searches.
217 self._path.insert(0, entry)
218 result = {"path": pathname}
219 result.update(stat_data)
221 except cumulus.store.NotFoundError:
223 raise cumulus.store.NotFoundError(basename)
225 def match(self, filename):
226 return self._regex.match(filename)
228 def list(self, backend):
230 for d in self.directories():
232 for f in backend.list(d):
235 if m: yield (os.path.join(d, f), m)
236 except cumulus.store.NotFoundError:
239 raise cumulus.store.NotFoundError(backend)
241 def _build_segments_searchpath(prefix):
242 for (extension, filter) in SEGMENT_FILTERS:
243 yield SearchPathEntry(prefix, extension, filter)
246 "checksums": SearchPath(
247 r"^snapshot-(.*)\.(\w+)sums$",
248 [SearchPathEntry("meta", ".sha1sums"),
249 SearchPathEntry("checksums", ".sha1sums"),
250 SearchPathEntry("", ".sha1sums")]),
252 r"^snapshot-(.*)\.meta(\.\S+)?$",
253 _build_segments_searchpath("meta")),
254 "segments": SearchPath(
255 (r"^([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})"
258 _build_segments_searchpath("segments0"),
259 _build_segments_searchpath("segments1"),
260 _build_segments_searchpath(""),
261 _build_segments_searchpath("segments"))),
262 "snapshots": SearchPath(
263 r"^snapshot-(.*)\.(cumulus|lbs)$",
264 [SearchPathEntry("snapshots", ".cumulus"),
265 SearchPathEntry("snapshots", ".lbs"),
266 SearchPathEntry("", ".cumulus"),
267 SearchPathEntry("", ".lbs")]),
270 class BackendWrapper(object):
271 """Wrapper around a Cumulus storage backend that understands file types.
273 The BackendWrapper class understands different Cumulus file types, such as
274 snapshots and segments, and implements higher-level operations such as
275 "retrieve a snapshot with a specific name" (hiding operations such as
276 searching for the correct file name).
279 def __init__(self, backend):
280 """Initializes a wrapper around the specified storage backend.
282 store may either be a Store object or URL.
284 if type(backend) in StringTypes:
285 self._backend = cumulus.store.open(backend)
287 self._backend = backend
290 def raw_backend(self):
293 def stat_generic(self, basename, filetype):
294 return SEARCH_PATHS[filetype].stat(self._backend, basename)
296 def open_generic(self, basename, filetype):
297 return SEARCH_PATHS[filetype].get(self._backend, basename)
299 def open_snapshot(self, name):
300 return self.open_generic("snapshot-" + name, "snapshots")
302 def open_segment(self, name):
303 return self.open_generic(name + ".tar", "segments")
305 def list_generic(self, filetype):
306 return ((x[1].group(1), x[0])
307 for x in SEARCH_PATHS[filetype].list(self._backend))
309 def prefetch_generic(self):
310 """Calls scan on directories to prefetch file metadata."""
312 for typeinfo in SEARCH_PATHS.values():
313 directories.update(typeinfo.directories())
314 for d in directories:
316 self._backend.scan(d)
319 def __init__(self, backend):
320 if isinstance(backend, BackendWrapper):
321 self.backend = backend
323 self.backend = BackendWrapper(backend)
328 def get_cachedir(self):
329 if self.cachedir is None:
330 self.cachedir = tempfile.mkdtemp("-cumulus")
334 if self.cachedir is not None:
335 # TODO: Avoid use of system, make this safer
336 os.system("rm -rf " + self.cachedir)
340 def parse_ref(refstr):
341 m = re.match(r"^zero\[(\d+)\]$", refstr)
343 return ("zero", None, None, (0, int(m.group(1)), False))
345 m = re.match(r"^([-0-9a-f]+)\/([0-9a-f]+)(\(\S+\))?(\[(((\d+)\+)?(\d+)|=(\d+))\])?$", refstr)
350 checksum = m.group(3)
353 if checksum is not None:
354 checksum = checksum.lstrip("(").rstrip(")")
356 if slice is not None:
357 if m.group(9) is not None:
358 # Size-assertion slice
359 slice = (0, int(m.group(9)), True)
360 elif m.group(6) is None:
362 slice = (0, int(m.group(8)), False)
364 slice = (int(m.group(7)), int(m.group(8)), False)
366 return (segment, object, checksum, slice)
368 def list_snapshots(self):
369 return set(x[0] for x in self.backend.list_generic("snapshots"))
371 def list_segments(self):
372 return set(x[0] for x in self.backend.list_generic("segments"))
374 def load_snapshot(self, snapshot):
375 snapshot_file = self.backend.open_snapshot(snapshot)[0]
376 return to_lines(snapshot_file.read())
379 def filter_data(filehandle, filter_cmd):
380 if filter_cmd is None:
382 (input, output) = os.popen2(filter_cmd)
383 def copy_thread(src, dst):
386 block = src.read(BLOCK_SIZE)
387 if len(block) == 0: break
391 _thread.start_new_thread(copy_thread, (filehandle, input))
394 def get_segment(self, segment):
395 accessed_segments.add(segment)
397 (segment_fp, path, filter_cmd) = self.backend.open_segment(segment)
398 return self.filter_data(segment_fp, filter_cmd)
400 def load_segment(self, segment):
401 seg = tarfile.open(segment, 'r|', self.get_segment(segment))
403 data_obj = seg.extractfile(item)
404 path = item.name.split('/')
405 if len(path) == 2 and path[0] == segment:
406 yield (path[1], data_obj.read())
408 def extract_segment(self, segment):
409 segdir = os.path.join(self.get_cachedir(), segment)
411 for (object, data) in self.load_segment(segment):
412 f = open(os.path.join(segdir, object), 'wb')
416 def load_object(self, segment, object):
417 accessed_segments.add(segment)
418 path = os.path.join(self.get_cachedir(), segment, object)
419 if not os.access(path, os.R_OK):
420 self.extract_segment(segment)
421 if segment in self._lru_list: self._lru_list.remove(segment)
422 self._lru_list.append(segment)
423 while len(self._lru_list) > self.CACHE_SIZE:
424 os.system("rm -rf " + os.path.join(self.cachedir,
426 self._lru_list = self._lru_list[1:]
427 return open(path, 'rb').read()
429 def get(self, refstr):
430 """Fetch the given object and return it.
432 The input should be an object reference, in string form.
435 (segment, object, checksum, slice) = self.parse_ref(refstr)
437 if segment == "zero":
438 return "\0" * slice[1]
440 data = self.load_object(segment, object)
442 if checksum is not None:
443 verifier = ChecksumVerifier(checksum)
444 verifier.update(data)
445 if not verifier.valid():
448 if slice is not None:
449 (start, length, exact) = slice
450 if exact and len(data) != length: raise ValueError
451 data = data[start:start+length]
452 if len(data) != length: raise IndexError
457 self.backend.prefetch_generic()
459 def parse(lines, terminate=None):
460 """Generic parser for RFC822-style "Key: Value" data streams.
462 This parser can be used to read metadata logs and snapshot root descriptor
465 lines must be an iterable object which yields a sequence of lines of input.
467 If terminate is specified, it is used as a predicate to determine when to
468 stop reading input lines.
475 # Strip off a trailing newline, if present
476 if len(l) > 0 and l[-1] == "\n":
479 if terminate is not None and terminate(l):
480 if len(dict) > 0: yield dict
485 m = re.match(r"^([-\w]+):\s*(.*)$", l)
487 dict[m.group(1)] = m.group(2)
488 last_key = m.group(1)
489 elif len(l) > 0 and l[0].isspace() and last_key is not None:
494 if len(dict) > 0: yield dict
496 def parse_full(lines):
498 return next(parse(lines))
499 except StopIteration:
502 def parse_metadata_version(s):
503 """Convert a string with the snapshot version format to a tuple."""
505 m = re.match(r"^(?:Cumulus|LBS) Snapshot v(\d+(\.\d+)*)$", s)
509 return tuple([int(d) for d in m.group(1).split(".")])
511 def read_metadata(object_store, root):
512 """Iterate through all lines in the metadata log, following references."""
514 # Stack for keeping track of recursion when following references to
515 # portions of the log. The last entry in the stack corresponds to the
516 # object currently being parsed. Each entry is a list of lines which have
517 # been reversed, so that popping successive lines from the end of each list
518 # will return lines of the metadata log in order.
521 def follow_ref(refstr):
522 if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
523 lines = to_lines(object_store.get(refstr))
529 while len(stack) > 0:
536 # An indirect reference which we must follow?
537 if len(line) > 0 and line[0] == '@':
545 """Metadata for a single file (or directory or...) from a snapshot."""
547 # Functions for parsing various datatypes that can appear in a metadata log
551 """Decode an integer, expressed in decimal, octal, or hexadecimal."""
552 if s.startswith("0x"):
554 elif s.startswith("0"):
561 """Decode a URI-encoded (%xx escapes) string."""
566 """An unecoded string."""
571 """Decode a user/group to a tuple of uid/gid followed by name."""
573 uid = MetadataItem.decode_int(items[0])
576 if items[1].startswith("(") and items[1].endswith(")"):
577 name = MetadataItem.decode_str(items[1][1:-1])
581 def decode_device(s):
582 """Decode a device major/minor number."""
583 (major, minor) = map(MetadataItem.decode_int, s.split("/"))
584 return (major, minor)
588 def __init__(self, fields, object_store):
589 """Initialize from a dictionary of key/value pairs from metadata log."""
592 self.object_store = object_store
594 self.items = self.Items()
595 for (k, v) in fields.items():
596 if k in self.field_types:
597 decoder = self.field_types[k]
598 setattr(self.items, k, decoder(v))
602 """Return an iterator for the data blocks that make up a file."""
604 # This traverses the list of blocks that make up a file, following
605 # indirect references. It is implemented in much the same way as
606 # read_metadata, so see that function for details of the technique.
608 objects = self.fields['data'].split()
612 def follow_ref(refstr):
613 if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
614 objects = self.object_store.get(refstr).split()
616 stack.append(objects)
618 while len(stack) > 0:
625 # An indirect reference which we must follow?
626 if len(ref) > 0 and ref[0] == '@':
631 # Description of fields that might appear, and how they should be parsed.
632 MetadataItem.field_types = {
633 'name': MetadataItem.decode_str,
634 'type': MetadataItem.raw_str,
635 'mode': MetadataItem.decode_int,
636 'device': MetadataItem.decode_device,
637 'user': MetadataItem.decode_user,
638 'group': MetadataItem.decode_user,
639 'ctime': MetadataItem.decode_int,
640 'mtime': MetadataItem.decode_int,
641 'links': MetadataItem.decode_int,
642 'inode': MetadataItem.raw_str,
643 'checksum': MetadataItem.decode_str,
644 'size': MetadataItem.decode_int,
645 'contents': MetadataItem.decode_str,
646 'target': MetadataItem.decode_str,
649 def iterate_metadata(object_store, root):
650 for d in parse(read_metadata(object_store, root), lambda l: len(l) == 0):
651 yield MetadataItem(d, object_store)
654 """Access to the local database of snapshot contents and object checksums.
656 The local database is consulted when creating a snapshot to determine what
657 data can be re-used from old snapshots. Segment cleaning is performed by
658 manipulating the data in the local database; the local database also
659 includes enough data to guide the segment cleaning process.
662 def __init__(self, path, dbname="localdb.sqlite"):
663 self.db_connection = sqlite3.connect(path + "/" + dbname)
665 # Low-level database access. Use these methods when there isn't a
666 # higher-level interface available. Exception: do, however, remember to
667 # use the commit() method after making changes to make sure they are
668 # actually saved, even when going through higher-level interfaces.
670 "Commit any pending changes to the local database."
671 self.db_connection.commit()
674 "Roll back any pending changes to the local database."
675 self.db_connection.rollback()
678 "Return a DB-API cursor for directly accessing the local database."
679 return self.db_connection.cursor()
681 def list_schemes(self):
682 """Return the list of snapshots found in the local database.
684 The returned value is a list of tuples (id, scheme, name, time, intent).
688 cur.execute("select distinct scheme from snapshots")
689 schemes = [row[0] for row in cur.fetchall()]
693 def list_snapshots(self, scheme):
694 """Return a list of snapshots for the given scheme."""
696 cur.execute("select name from snapshots")
697 snapshots = [row[0] for row in cur.fetchall()]
701 def delete_snapshot(self, scheme, name):
702 """Remove the specified snapshot from the database.
704 Warning: This does not garbage collect all dependent data in the
705 database, so it must be followed by a call to garbage_collect() to make
706 the database consistent.
709 cur.execute("delete from snapshots where scheme = ? and name = ?",
712 def prune_old_snapshots(self, scheme, intent=1.0):
713 """Delete entries from old snapshots from the database.
715 Only snapshots with the specified scheme name will be deleted. If
716 intent is given, it gives the intended next snapshot type, to determine
717 how aggressively to clean (for example, intent=7 could be used if the
718 next snapshot will be a weekly snapshot).
723 # Find the id of the last snapshot to be created. This is used for
724 # measuring time in a way: we record this value in each segment we
725 # expire on this run, and then on a future run can tell if there have
726 # been intervening backups made.
727 cur.execute("select max(snapshotid) from snapshots")
728 last_snapshotid = cur.fetchone()[0]
730 # Get the list of old snapshots for this scheme. Delete all the old
731 # ones. Rules for what to keep:
732 # - Always keep the most recent snapshot.
733 # - If snapshot X is younger than Y, and X has higher intent, then Y
735 cur.execute("""select snapshotid, name, intent,
736 julianday('now') - timestamp as age
737 from snapshots where scheme = ?
738 order by age""", (scheme,))
742 for (id, name, snap_intent, snap_age) in cur.fetchall():
744 if snap_intent < max_intent:
745 # Delete small-intent snapshots if there is a more recent
746 # large-intent snapshot.
748 elif snap_intent == intent:
749 # Delete previous snapshots with the specified intent level.
752 if can_delete and not first:
753 print("Delete snapshot %d (%s)" % (id, name))
754 cur.execute("delete from snapshots where snapshotid = ?",
757 max_intent = max(max_intent, snap_intent)
759 self.garbage_collect()
761 def garbage_collect(self):
762 """Garbage-collect unreachable segment and object data.
764 Remove all segments and checksums which is not reachable from the
765 current set of snapshots stored in the local database.
769 # Delete entries in the segment_utilization table which are for
770 # non-existent snapshots.
771 cur.execute("""delete from segment_utilization
772 where snapshotid not in
773 (select snapshotid from snapshots)""")
775 # Delete segments not referenced by any current snapshots.
776 cur.execute("""delete from segments where segmentid not in
777 (select segmentid from segment_utilization)""")
779 # Delete dangling objects in the block_index table.
780 cur.execute("""delete from block_index
781 where segmentid not in
782 (select segmentid from segments)""")
784 # Remove sub-block signatures for deleted objects.
785 cur.execute("""delete from subblock_signatures
787 (select blockid from block_index)""")
790 class SegmentInfo(Struct): pass
792 def get_segment_cleaning_list(self, age_boost=0.0):
793 """Return a list of all current segments with information for cleaning.
795 Return all segments which are currently known in the local database
796 (there might be other, older segments in the archive itself), and
797 return usage statistics for each to help decide which segments to
800 The returned list will be sorted by estimated cleaning benefit, with
801 segments that are best to clean at the start of the list.
803 If specified, the age_boost parameter (measured in days) will added to
804 the age of each segment, as a way of adjusting the benefit computation
805 before a long-lived snapshot is taken (for example, age_boost might be
806 set to 7 when cleaning prior to taking a weekly snapshot).
811 cur.execute("""select segmentid, used, size, mtime,
812 julianday('now') - mtime as age from segment_info
813 where expire_time is null""")
815 info = self.SegmentInfo()
817 info.used_bytes = row[1]
818 info.size_bytes = row[2]
820 info.age_days = row[4]
822 # If data is not available for whatever reason, treat it as 0.0.
823 if info.age_days is None:
825 if info.used_bytes is None:
826 info.used_bytes = 0.0
828 # Benefit calculation: u is the estimated fraction of each segment
829 # which is utilized (bytes belonging to objects still in use
830 # divided by total size; this doesn't take compression or storage
831 # overhead into account, but should give a reasonable estimate).
833 # The total benefit is a heuristic that combines several factors:
834 # the amount of space that can be reclaimed (1 - u), an ageing
835 # factor (info.age_days) that favors cleaning old segments to young
836 # ones and also is more likely to clean segments that will be
837 # rewritten for long-lived snapshots (age_boost), and finally a
838 # penalty factor for the cost of re-uploading data (u + 0.1).
839 u = info.used_bytes / info.size_bytes
840 info.cleaning_benefit \
841 = (1 - u) * (info.age_days + age_boost) / (u + 0.1)
843 segments.append(info)
845 segments.sort(cmp, key=lambda s: s.cleaning_benefit, reverse=True)
848 def mark_segment_expired(self, segment):
849 """Mark a segment for cleaning in the local database.
851 The segment parameter should be either a SegmentInfo object or an
852 integer segment id. Objects in the given segment will be marked as
853 expired, which means that any future snapshots that would re-use those
854 objects will instead write out a new copy of the object, and thus no
855 future snapshots will depend upon the given segment.
858 if isinstance(segment, int):
860 elif isinstance(segment, self.SegmentInfo):
863 raise TypeError("Invalid segment: %s, must be of type int or SegmentInfo, not %s" % (segment, type(segment)))
866 cur.execute("select max(snapshotid) from snapshots")
867 last_snapshotid = cur.fetchone()[0]
868 cur.execute("update segments set expire_time = ? where segmentid = ?",
869 (last_snapshotid, id))
870 cur.execute("update block_index set expired = 0 where segmentid = ?",
873 def balance_expired_objects(self):
874 """Analyze expired objects in segments to be cleaned and group by age.
876 Update the block_index table of the local database to group expired
877 objects by age. The exact number of buckets and the cutoffs for each
878 are dynamically determined. Calling this function after marking
879 segments expired will help in the segment cleaning process, by ensuring
880 that when active objects from clean segments are rewritten, they will
881 be placed into new segments roughly grouped by age.
884 # The expired column of the block_index table is used when generating a
885 # new Cumulus snapshot. A null value indicates that an object may be
886 # re-used. Otherwise, an object must be written into a new segment if
887 # needed. Objects with distinct expired values will be written into
888 # distinct segments, to allow for some grouping by age. The value 0 is
889 # somewhat special in that it indicates any rewritten objects can be
890 # placed in the same segment as completely new objects; this can be
891 # used for very young objects which have been expired, or objects not
892 # expected to be encountered.
894 # In the balancing process, all objects which are not used in any
895 # current snapshots will have expired set to 0. Objects which have
896 # been seen will be sorted by age and will have expired values set to
897 # 0, 1, 2, and so on based on age (with younger objects being assigned
898 # lower values). The number of buckets and the age cutoffs is
899 # determined by looking at the distribution of block ages.
903 # Mark all expired objects with expired = 0; these objects will later
904 # have values set to indicate groupings of objects when repacking.
905 cur.execute("""update block_index set expired = 0
906 where expired is not null""")
908 # We will want to aim for at least one full segment for each bucket
909 # that we eventually create, but don't know how many bytes that should
910 # be due to compression. So compute the average number of bytes in
911 # each expired segment as a rough estimate for the minimum size of each
912 # bucket. (This estimate could be thrown off by many not-fully-packed
913 # segments, but for now don't worry too much about that.) If we can't
914 # compute an average, it's probably because there are no expired
915 # segments, so we have no more work to do.
916 cur.execute("""select avg(size) from segments
918 (select distinct segmentid from block_index
919 where expired is not null)""")
920 segment_size_estimate = cur.fetchone()[0]
921 if not segment_size_estimate:
924 # Next, extract distribution of expired objects (number and size) by
925 # age. Save the timestamp for "now" so that the classification of
926 # blocks into age buckets will not change later in the function, after
927 # time has passed. Set any timestamps in the future to now, so we are
928 # guaranteed that for the rest of this function, age is always
930 cur.execute("select julianday('now')")
931 now = cur.fetchone()[0]
933 cur.execute("""update block_index set timestamp = ?
934 where timestamp > ? and expired is not null""",
937 cur.execute("""select round(? - timestamp) as age, count(*), sum(size)
938 from block_index where expired = 0
939 group by age order by age""", (now,))
940 distribution = cur.fetchall()
942 # Start to determine the buckets for expired objects. Heuristics used:
943 # - An upper bound on the number of buckets is given by the number of
944 # segments we estimate it will take to store all data. In fact,
945 # aim for a couple of segments per bucket.
946 # - Place very young objects in bucket 0 (place with new objects)
947 # unless there are enough of them to warrant a separate bucket.
948 # - Try not to create unnecessarily many buckets, since fewer buckets
949 # will allow repacked data to be grouped based on spatial locality
950 # (while more buckets will group by temporal locality). We want a
953 total_bytes = sum([i[2] for i in distribution])
954 target_buckets = 2 * (total_bytes / segment_size_estimate) ** 0.4
955 min_size = 1.5 * segment_size_estimate
956 target_size = max(2 * segment_size_estimate,
957 total_bytes / target_buckets)
959 print("segment_size:", segment_size_estimate)
960 print("distribution:", distribution)
961 print("total_bytes:", total_bytes)
962 print("target_buckets:", target_buckets)
963 print("min, target size:", min_size, target_size)
965 # Chosen cutoffs. Each bucket consists of objects with age greater
966 # than one cutoff value, but not greater than the next largest cutoff.
969 # Starting with the oldest objects, begin grouping together into
970 # buckets of size at least target_size bytes.
971 distribution.reverse()
973 min_age_bucket = False
974 for (age, items, size) in distribution:
975 if bucket_size >= target_size \
976 or (age < MIN_AGE and not min_age_bucket):
977 if bucket_size < target_size and len(cutoffs) > 0:
984 min_age_bucket = True
986 # The last (youngest) bucket will be group 0, unless it has enough data
987 # to be of size min_size by itself, or there happen to be no objects
988 # less than MIN_AGE at all.
989 if bucket_size >= min_size or not min_age_bucket:
993 print("cutoffs:", cutoffs)
995 # Update the database to assign each object to the appropriate bucket.
997 for i in range(len(cutoffs)):
998 cur.execute("""update block_index set expired = ?
999 where round(? - timestamp) > ?
1000 and expired is not null""",
1001 (i, now, cutoffs[i]))