1 # Cumulus: Efficient Filesystem Backup to the Cloud
2 # Copyright (C) 2008-2009, 2012 The Cumulus Developers
3 # See the AUTHORS file for a list of contributors.
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License along
16 # with this program; if not, write to the Free Software Foundation, Inc.,
17 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19 """High-level interface for working with Cumulus archives.
21 This module provides an easy interface for reading from and manipulating
22 various parts of a Cumulus archive:
23 - listing the snapshots and segments present
24 - reading segment contents
25 - parsing snapshot descriptors and snapshot metadata logs
26 - reading and maintaining the local object database
29 from __future__ import division, print_function, unicode_literals
42 import thread as _thread
45 import cumulus.store.file
48 StringTypes = (str, unicode)
# The largest supported snapshot format that can be understood.
FORMAT_VERSION = (0, 11)        # Cumulus Snapshot v0.11

# Maximum number of nested indirect references allowed in a snapshot.
MAX_RECURSION_DEPTH = 3

# All segments which have been accessed this session.
accessed_segments = set()
61 # Table of methods used to filter segments before storage, and corresponding
62 # filename extensions. These are listed in priority order (methods earlier in
63 # the list are tried first).
65 (".gpg", "cumulus-filter-gpg --decrypt"),
67 (".bz2", "bzip2 -dc"),
72 """Decode a URI-encoded (%xx escapes) string."""
73 def hex_decode(m): return chr(int(m.group(1), 16))
74 return re.sub(r"%([0-9a-f]{2})", hex_decode, s)
76 """Encode a string to URI-encoded (%xx escapes) form."""
78 if c > '+' and c < '\x7f' and c != '@':
81 return "%%%02x" % (ord(c),)
82 return ''.join(hex_encode(c) for c in s)
85 """A class which merely acts as a data container.
87 Instances of this class (or its subclasses) are merely used to store data
88 in various attributes. No methods are provided.
92 return "<%s %s>" % (self.__class__, self.__dict__)
# Maps the algorithm name that prefixes a checksum string ("<name>=<hex>")
# to the hashlib constructor used to compute it.  'sha1' must be present
# because it is the default algorithm of ChecksumCreator.
CHECKSUM_ALGORITHMS = {
    'sha1': hashlib.sha1,
    'sha224': hashlib.sha224,
    'sha256': hashlib.sha256,
}
class ChecksumCreator:
    """Compute a Cumulus checksum for provided data.

    The algorithm used is selectable, but currently defaults to sha1.
    """

    def __init__(self, algorithm='sha1'):
        """Start a new digest.

        algorithm must be a key of CHECKSUM_ALGORITHMS; any other value
        raises KeyError.
        """
        self.algorithm = algorithm
        self.hash = CHECKSUM_ALGORITHMS[algorithm]()

    def update(self, data):
        """Feed data into the digest and return self so calls can be chained."""
        self.hash.update(data)
        return self

    def compute(self):
        """Return the checksum in the form "<algorithm>=<hexdigest>"."""
        return "%s=%s" % (self.algorithm, self.hash.hexdigest())
class ChecksumVerifier:
    """Verify whether a checksum from a snapshot matches the supplied data."""

    def __init__(self, checksumstr):
        """Create an object to check the supplied checksum.

        checksumstr has the form "<algorithm>=<hexdigest>".  A string
        without "=" raises ValueError; an algorithm that is not a key of
        CHECKSUM_ALGORITHMS raises KeyError.
        """
        (algo, checksum) = checksumstr.split("=", 1)
        self.checksum = checksum
        self.hash = CHECKSUM_ALGORITHMS[algo]()

    def update(self, data):
        """Feed another piece of the data being verified into the digest."""
        self.hash.update(data)

    def valid(self):
        """Return a boolean indicating whether the checksum matches."""
        return self.hash.hexdigest() == self.checksum
class SearchPathEntry(object):
    """Item representing a possible search location for Cumulus files.

    Some Cumulus files might be stored in multiple possible file locations: due
    to format (different compression mechanisms with different extensions),
    locality (different segments might be placed in different directories to
    control archiving policies), for backwards compatibility (default location
    changed over time).  A SearchPathEntry describes a possible location for a
    file.
    """

    def __init__(self, directory_prefix, suffix, context=None):
        """Record the directory, the filename suffix and optional context."""
        self._directory_prefix = directory_prefix
        self._suffix = suffix
        self._context = context

    def __repr__(self):
        return "%s(%r, %r, %r)" % (self.__class__.__name__,
                                   self._directory_prefix, self._suffix,
                                   self._context)

    def build_path(self, basename):
        """Construct the search path to use for a file with name basename.

        Returns a tuple (pathname, context), where pathname is the path to try
        and context is any additional data associated with this search entry
        (if any).
        """
        pathname = os.path.join(self._directory_prefix,
                                basename + self._suffix)
        return (pathname, self._context)
class SearchPath(object):
    """A collection of locations to search for files and lookup utilities.

    For looking for a file in a Cumulus storage backend, a SearchPath object
    contains a list of possible locations to try.  A SearchPath can be used to
    perform the search as well; when a file is found the search path ordering
    is updated (moving the successful SearchPathEntry to the front of the list
    for future searches).
    """

    def __init__(self, name_regex, searchpath):
        """Compile name_regex and copy the iterable of search entries."""
        self._regex = re.compile(name_regex)
        self._path = list(searchpath)

    def add_search_entry(self, entry):
        """Append entry as the last location to try."""
        self._path.append(entry)

    def directories(self):
        """Return the set of directories to search for a file type."""
        return set(entry._directory_prefix for entry in self._path)

    def _promote(self, index):
        """Move the entry at index to the front to speed future searches."""
        if index > 0:
            self._path.insert(0, self._path.pop(index))

    def get(self, backend, basename):
        """Open basename from the first search location that has it.

        Returns (fp, pathname, context).  Raises cumulus.store.NotFoundError
        if backend.get() reports NotFoundError for every location.
        """
        for (i, entry) in enumerate(self._path):
            try:
                (pathname, context) = entry.build_path(basename)
                fp = backend.get(pathname)
            except cumulus.store.NotFoundError:
                continue
            # Safe to reorder here: the loop is abandoned immediately.
            self._promote(i)
            return (fp, pathname, context)
        raise cumulus.store.NotFoundError(basename)

    def stat(self, backend, basename):
        """Stat basename at the first search location that has it.

        Returns a dictionary holding "path" plus whatever backend.stat()
        returned.  Raises cumulus.store.NotFoundError if no location works.
        """
        for (i, entry) in enumerate(self._path):
            try:
                (pathname, context) = entry.build_path(basename)
                stat_data = backend.stat(pathname)
            except cumulus.store.NotFoundError:
                continue
            self._promote(i)
            result = {"path": pathname}
            result.update(stat_data)
            return result
        raise cumulus.store.NotFoundError(basename)

    def match(self, filename):
        """Match filename against the name regex (anchored at the start)."""
        return self._regex.match(filename)

    def list(self, backend):
        """Yield (pathname, match) for every matching file in any directory.

        Directories for which backend.list() raises NotFoundError are
        skipped.  If no directory produced even a single file name,
        cumulus.store.NotFoundError is raised once iteration finishes.
        """
        success = False
        for d in self.directories():
            try:
                for f in backend.list(d):
                    success = True
                    m = self.match(f)
                    if m:
                        yield (os.path.join(d, f), m)
            except cumulus.store.NotFoundError:
                pass
        if not success:
            raise cumulus.store.NotFoundError(backend)
def _build_segments_searchpath(prefix):
    """Yield a SearchPathEntry under prefix for each SEGMENT_FILTERS row.

    Entries come out in table order; each carries the row's filename
    extension as its suffix and the row's filter command as its context.
    """
    for (extension, filter_cmd) in SEGMENT_FILTERS:
        yield SearchPathEntry(prefix, extension, filter_cmd)
239 "checksums": SearchPath(
240 r"^snapshot-(.*)\.(\w+)sums$",
241 [SearchPathEntry("meta", ".sha1sums"),
242 SearchPathEntry("checksums", ".sha1sums"),
243 SearchPathEntry("", ".sha1sums")]),
245 r"^snapshot-(.*)\.meta(\.\S+)?$",
246 _build_segments_searchpath("meta")),
247 "segments": SearchPath(
248 (r"^([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})"
251 _build_segments_searchpath("segments0"),
252 _build_segments_searchpath("segments1"),
253 _build_segments_searchpath(""),
254 _build_segments_searchpath("segments"))),
255 "snapshots": SearchPath(
256 r"^snapshot-(.*)\.(cumulus|lbs)$",
257 [SearchPathEntry("snapshots", ".cumulus"),
258 SearchPathEntry("snapshots", ".lbs"),
259 SearchPathEntry("", ".cumulus"),
260 SearchPathEntry("", ".lbs")]),
263 class BackendWrapper(object):
264 """Wrapper around a Cumulus storage backend that understands file types.
266 The BackendWrapper class understands different Cumulus file types, such as
267 snapshots and segments, and implements higher-level operations such as
268 "retrieve a snapshot with a specific name" (hiding operations such as
269 searching for the correct file name).
def __init__(self, backend):
    """Initializes a wrapper around the specified storage backend.

    backend may either be a store object or a string.  A string containing
    ":" is opened with cumulus.store.open(); any other string is handed to
    cumulus.store.file.FileStore.  Anything that is not a string is used as
    the backend as-is.
    """
    # isinstance (rather than an exact type test) also accepts subclasses
    # of the string types.
    if not isinstance(backend, StringTypes):
        self._backend = backend
    elif ":" in backend:
        self._backend = cumulus.store.open(backend)
    else:
        self._backend = cumulus.store.file.FileStore(backend)
286 def raw_backend(self):
def stat_generic(self, basename, filetype):
    """Stat basename using the SEARCH_PATHS entry registered for filetype.

    Returns whatever that search path's stat() returns; a filetype that is
    not a key of SEARCH_PATHS raises KeyError.
    """
    search_path = SEARCH_PATHS[filetype]
    return search_path.stat(self._backend, basename)
def open_generic(self, basename, filetype):
    """Open basename using the SEARCH_PATHS entry registered for filetype.

    Returns whatever that search path's get() returns; a filetype that is
    not a key of SEARCH_PATHS raises KeyError.
    """
    search_path = SEARCH_PATHS[filetype]
    return search_path.get(self._backend, basename)
def open_snapshot(self, name):
    """Open the snapshot called name (stored as "snapshot-<name>")."""
    basename = "snapshot-" + name
    return self.open_generic(basename, "snapshots")
def open_segment(self, name):
    """Open the segment called name (stored as "<name>.tar" plus suffix)."""
    basename = name + ".tar"
    return self.open_generic(basename, "segments")
def list_generic(self, filetype):
    """Return a lazy sequence of (name, pathname) for files of filetype.

    name is the first group captured by the file type's name regex and
    pathname is the path at which the file was found.
    """
    return ((match.group(1), pathname)
            for (pathname, match) in SEARCH_PATHS[filetype].list(self._backend))
305 def prefetch_generic(self):
306 """Calls scan on directories to prefetch file metadata."""
308 for typeinfo in SEARCH_PATHS.values():
309 directories.update(typeinfo.directories())
310 for d in directories:
312 self._backend.scan(d)
315 def __init__(self, backend):
316 if isinstance(backend, BackendWrapper):
317 self.backend = backend
319 self.backend = BackendWrapper(backend)
def get_cachedir(self):
    """Return the cache directory, creating a temporary one on first use.

    The directory is made with tempfile.mkdtemp() using the suffix
    "-cumulus" and remembered in self.cachedir for later calls.
    """
    if self.cachedir is None:
        self.cachedir = tempfile.mkdtemp("-cumulus")
    return self.cachedir
330 if self.cachedir is not None:
331 # TODO: Avoid use of system, make this safer
332 os.system("rm -rf " + self.cachedir)
def parse_ref(refstr):
    """Split an object reference string into its components.

    Returns (segment, object, checksum, slice), or None when refstr is not
    a valid reference.  checksum is the text between the parentheses, or
    None.  slice is None or a tuple (start, length, exact), where exact is
    True for a size-assertion slice "[=N]".  The special reference
    "zero[N]" yields ("zero", None, None, (0, N, False)).
    """
    m = re.match(r"^zero\[(\d+)\]$", refstr)
    if m:
        return ("zero", None, None, (0, int(m.group(1)), False))

    m = re.match(r"^([-0-9a-f]+)\/([0-9a-f]+)(\(\S+\))?(\[(((\d+)\+)?(\d+)|=(\d+))\])?$", refstr)
    if not m:
        return None

    segment = m.group(1)
    object_name = m.group(2)

    checksum = m.group(3)
    if checksum is not None:
        checksum = checksum.lstrip("(").rstrip(")")

    slice_spec = m.group(4)
    if slice_spec is not None:
        if m.group(9) is not None:
            # Size-assertion slice: "[=length]"
            slice_spec = (0, int(m.group(9)), True)
        elif m.group(6) is None:
            # Abbreviated slice: "[length]", starting at offset 0
            slice_spec = (0, int(m.group(8)), False)
        else:
            # Full slice: "[start+length]"
            slice_spec = (int(m.group(7)), int(m.group(8)), False)

    return (segment, object_name, checksum, slice_spec)
def list_snapshots(self):
    """Return the set of snapshot names found by the backend."""
    return set(name for (name, _path) in self.backend.list_generic("snapshots"))
def list_segments(self):
    """Return the set of segment names found by the backend."""
    return set(name for (name, _path) in self.backend.list_generic("segments"))
def load_snapshot(self, snapshot):
    """Return the lines of the named snapshot, line endings preserved."""
    snapshot_file = self.backend.open_snapshot(snapshot)[0]
    # Close the handle once the contents are read instead of leaking it.
    try:
        return snapshot_file.read().splitlines(True)
    finally:
        snapshot_file.close()
def filter_data(filehandle, filter_cmd):
    """Pipe the contents of filehandle through an external filter command.

    If filter_cmd is None, filehandle is returned unchanged.  Otherwise
    filter_cmd is run through the shell, a background thread copies
    filehandle into the command's standard input, and the command's
    standard output is returned as a readable file object.
    """
    if filter_cmd is None:
        return filehandle

    # os.popen2 does not exist on Python 3; subprocess provides the same
    # pair of pipes on both major versions.
    import subprocess

    # NOTE(review): filter_cmd is executed by the shell, as it was with
    # os.popen2; it is expected to come from the module's filter table,
    # not from untrusted input.
    proc = subprocess.Popen(filter_cmd, shell=True,
                            stdin=subprocess.PIPE, stdout=subprocess.PIPE)

    def copy_thread(src, dst):
        BLOCK_SIZE = 4096
        try:
            while True:
                block = src.read(BLOCK_SIZE)
                if len(block) == 0: break
                dst.write(block)
        finally:
            # Closing dst is what signals end-of-input to the filter.
            src.close()
            dst.close()

    _thread.start_new_thread(copy_thread, (filehandle, proc.stdin))
    return proc.stdout
def get_segment(self, segment):
    """Open the named segment and return a handle on its filtered data.

    The segment is recorded in accessed_segments, opened through the
    backend, and passed through the filter command attached to the
    location where it was found.
    """
    accessed_segments.add(segment)

    (segment_fp, _path, filter_cmd) = self.backend.open_segment(segment)
    return self.filter_data(segment_fp, filter_cmd)
def load_segment(self, segment):
    """Yield (object_name, data) for each object stored in a segment.

    The segment is read as a tar stream.  Only members named
    "<segment>/<object_name>" are returned; other members, and members
    that are not regular files, are skipped.
    """
    seg = tarfile.open(segment, 'r|', self.get_segment(segment))
    for item in seg:
        path = item.name.split('/')
        if len(path) != 2 or path[0] != segment:
            continue
        data_obj = seg.extractfile(item)
        if data_obj is None:
            # extractfile() gives None for members such as directories,
            # which have no contents to read.
            continue
        yield (path[1], data_obj.read())
def extract_segment(self, segment):
    """Unpack every object of a segment into the cache directory.

    A directory named after the segment is created below the cache
    directory and each object is written to a file named after the object.
    """
    segdir = os.path.join(self.get_cachedir(), segment)
    os.mkdir(segdir)
    for (object, data) in self.load_segment(segment):
        # The with-statement closes the file even if the write fails.
        with open(os.path.join(segdir, object), 'wb') as f:
            f.write(data)
def load_object(self, segment, object):
    """Return the raw bytes of one object, extracting its segment if needed.

    Extracted segments are kept in the cache directory.  Segments are
    tracked in least-recently-used order, and once more than
    self.CACHE_SIZE are cached the oldest ones are deleted from disk.
    """
    import shutil

    accessed_segments.add(segment)
    path = os.path.join(self.get_cachedir(), segment, object)
    if not os.access(path, os.R_OK):
        self.extract_segment(segment)

    # Move this segment to the most-recently-used end of the list.
    if segment in self._lru_list:
        self._lru_list.remove(segment)
    self._lru_list.append(segment)

    while len(self._lru_list) > self.CACHE_SIZE:
        # rmtree replaces a shell-built "rm -rf" command, so names with
        # spaces or shell metacharacters cannot be misinterpreted.
        shutil.rmtree(os.path.join(self.cachedir, self._lru_list[0]),
                      ignore_errors=True)
        self._lru_list = self._lru_list[1:]

    with open(path, 'rb') as f:
        return f.read()
def get(self, refstr):
    """Fetch the given object and return it.

    The input should be an object reference, in string form.

    The reference is split with parse_ref().  A "zero" reference yields
    that many NUL bytes.  Otherwise the object is loaded, checked against
    the checksum if the reference carries one, and cut down to the slice
    if the reference carries one.

    Raises ValueError if the checksum does not match or a size-assertion
    slice disagrees with the object size, and IndexError if a slice
    extends past the end of the object.
    """
    (segment, object_name, checksum, obj_slice) = self.parse_ref(refstr)

    if segment == "zero":
        # Objects are read from disk in binary mode, so the zero block is
        # produced as bytes as well.
        return b"\0" * obj_slice[1]

    data = self.load_object(segment, object_name)

    if checksum is not None:
        verifier = ChecksumVerifier(checksum)
        verifier.update(data)
        if not verifier.valid():
            raise ValueError

    if obj_slice is not None:
        (start, length, exact) = obj_slice
        if exact and len(data) != length:
            raise ValueError
        data = data[start:start + length]
        if len(data) != length:
            raise IndexError

    return data
453 self.backend.prefetch_generic()
def parse(lines, terminate=None):
    """Generic parser for RFC822-style "Key: Value" data streams.

    This parser can be used to read metadata logs and snapshot root descriptor
    files.

    lines must be an iterable object which yields a sequence of lines of input.

    If terminate is specified, it is used as a predicate to determine when to
    stop reading input lines.  Each time terminate(line) is true, the fields
    gathered so far are yielded as one dictionary (if there are any) and a
    new dictionary is started.  Whatever remains at the end of the input is
    yielded as a final dictionary.

    A line beginning with whitespace that follows a "Key: Value" line is
    appended to that key's value.
    """
    fields = {}
    last_key = None

    for line in lines:
        # Strip off a trailing newline, if present
        if len(line) > 0 and line[-1] == "\n":
            line = line[:-1]

        if terminate is not None and terminate(line):
            if len(fields) > 0:
                yield fields
            fields = {}
            last_key = None
            continue

        m = re.match(r"^([-\w]+):\s*(.*)$", line)
        if m:
            fields[m.group(1)] = m.group(2)
            last_key = m.group(1)
        elif len(line) > 0 and line[0].isspace() and last_key is not None:
            fields[last_key] += line
        else:
            # Not a continuation: later indented lines attach to nothing.
            last_key = None

    if len(fields) > 0:
        yield fields
def parse_full(lines):
    """Parse lines as one "Key: Value" record and return it as a dictionary.

    An empty dictionary is returned when the input holds no fields.
    """
    try:
        return next(parse(lines))
    except StopIteration:
        return {}
def parse_metadata_version(s):
    """Convert a string with the snapshot version format to a tuple.

    "Cumulus Snapshot v0.11" (or the older "LBS Snapshot ..." spelling)
    becomes (0, 11).  A string in any other format yields the empty tuple.
    """
    m = re.match(r"^(?:Cumulus|LBS) Snapshot v(\d+(\.\d+)*)$", s)
    if m is None:
        return ()
    return tuple(int(d) for d in m.group(1).split("."))
507 def read_metadata(object_store, root):
508 """Iterate through all lines in the metadata log, following references."""
510 # Stack for keeping track of recursion when following references to
511 # portions of the log. The last entry in the stack corresponds to the
512 # object currently being parsed. Each entry is a list of lines which have
513 # been reversed, so that popping successive lines from the end of each list
514 # will return lines of the metadata log in order.
517 def follow_ref(refstr):
518 if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
519 lines = object_store.get(refstr).splitlines(True)
525 while len(stack) > 0:
532 # An indirect reference which we must follow?
533 if len(line) > 0 and line[0] == '@':
541 """Metadata for a single file (or directory or...) from a snapshot."""
543 # Functions for parsing various datatypes that can appear in a metadata log
547 """Decode an integer, expressed in decimal, octal, or hexadecimal."""
548 if s.startswith("0x"):
550 elif s.startswith("0"):
557 """Decode a URI-encoded (%xx escapes) string."""
562 """An unencoded string."""
567 """Decode a user/group to a tuple of uid/gid followed by name."""
569 uid = MetadataItem.decode_int(items[0])
572 if items[1].startswith("(") and items[1].endswith(")"):
573 name = MetadataItem.decode_str(items[1][1:-1])
def decode_device(s):
    """Decode a device major/minor number.

    s has the form "<major>/<minor>"; each half is decoded with
    MetadataItem.decode_int.  Returns the tuple (major, minor), and raises
    ValueError unless there are exactly two parts.
    """
    (major, minor) = [MetadataItem.decode_int(part) for part in s.split("/")]
    return (major, minor)
584 def __init__(self, fields, object_store):
585 """Initialize from a dictionary of key/value pairs from metadata log."""
588 self.object_store = object_store
590 self.items = self.Items()
591 for (k, v) in fields.items():
592 if k in self.field_types:
593 decoder = self.field_types[k]
594 setattr(self.items, k, decoder(v))
598 """Return an iterator for the data blocks that make up a file."""
600 # This traverses the list of blocks that make up a file, following
601 # indirect references. It is implemented in much the same way as
602 # read_metadata, so see that function for details of the technique.
604 objects = self.fields['data'].split()
608 def follow_ref(refstr):
609 if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
610 objects = self.object_store.get(refstr).split()
612 stack.append(objects)
614 while len(stack) > 0:
621 # An indirect reference which we must follow?
622 if len(ref) > 0 and ref[0] == '@':
627 # Description of fields that might appear, and how they should be parsed.
# Maps each metadata-log key to the function that decodes its value.  Keys
# that are absent from this table are not decoded.
MetadataItem.field_types = {
    'name': MetadataItem.decode_str,
    'type': MetadataItem.raw_str,
    'mode': MetadataItem.decode_int,
    'device': MetadataItem.decode_device,
    'user': MetadataItem.decode_user,
    'group': MetadataItem.decode_user,
    'ctime': MetadataItem.decode_int,
    'mtime': MetadataItem.decode_int,
    'links': MetadataItem.decode_int,
    'inode': MetadataItem.raw_str,
    'checksum': MetadataItem.decode_str,
    'size': MetadataItem.decode_int,
    'contents': MetadataItem.decode_str,
    'target': MetadataItem.decode_str,
}
def iterate_metadata(object_store, root):
    """Yield a MetadataItem for each entry of the metadata log rooted at root.

    The log lines come from read_metadata(); an empty line ends one entry
    and starts the next.
    """
    def is_blank(line):
        return len(line) == 0

    for d in parse(read_metadata(object_store, root), is_blank):
        yield MetadataItem(d, object_store)
650 """Access to the local database of snapshot contents and object checksums.
652 The local database is consulted when creating a snapshot to determine what
653 data can be re-used from old snapshots. Segment cleaning is performed by
654 manipulating the data in the local database; the local database also
655 includes enough data to guide the segment cleaning process.
def __init__(self, path, dbname="localdb.sqlite"):
    """Open the SQLite database file dbname located in directory path."""
    self.db_connection = sqlite3.connect(os.path.join(path, dbname))
661 # Low-level database access. Use these methods when there isn't a
662 # higher-level interface available. Exception: do, however, remember to
663 # use the commit() method after making changes to make sure they are
664 # actually saved, even when going through higher-level interfaces.
666 "Commit any pending changes to the local database."
667 self.db_connection.commit()
670 "Roll back any pending changes to the local database."
671 self.db_connection.rollback()
674 "Return a DB-API cursor for directly accessing the local database."
675 return self.db_connection.cursor()
def list_schemes(self):
    """Return the snapshot schemes found in the local database.

    The returned value is a sorted list of the distinct scheme names that
    appear in the snapshots table.
    """
    cur = self.cursor()
    cur.execute("select distinct scheme from snapshots")
    return sorted(row[0] for row in cur.fetchall())
def list_snapshots(self, scheme):
    """Return a sorted list of snapshot names for the given scheme."""
    cur = self.cursor()
    # Restrict the query to the requested scheme; without the where clause
    # the names of snapshots from every scheme were returned.
    cur.execute("select name from snapshots where scheme = ?", (scheme,))
    return sorted(row[0] for row in cur.fetchall())
def delete_snapshot(self, scheme, name):
    """Remove the specified snapshot from the database.

    Warning: This does not garbage collect all dependent data in the
    database, so it must be followed by a call to garbage_collect() to make
    the database consistent.
    """
    cur = self.cursor()
    cur.execute("delete from snapshots where scheme = ? and name = ?",
                (scheme, name))
708 def prune_old_snapshots(self, scheme, intent=1.0):
709 """Delete entries from old snapshots from the database.
711 Only snapshots with the specified scheme name will be deleted. If
712 intent is given, it gives the intended next snapshot type, to determine
713 how aggressively to clean (for example, intent=7 could be used if the
714 next snapshot will be a weekly snapshot).
719 # Find the id of the last snapshot to be created. This is used for
720 # measuring time in a way: we record this value in each segment we
721 # expire on this run, and then on a future run can tell if there have
722 # been intervening backups made.
723 cur.execute("select max(snapshotid) from snapshots")
724 last_snapshotid = cur.fetchone()[0]
726 # Get the list of old snapshots for this scheme. Delete all the old
727 # ones. Rules for what to keep:
728 # - Always keep the most recent snapshot.
729 # - If snapshot X is younger than Y, and X has higher intent, then Y
731 cur.execute("""select snapshotid, name, intent,
732 julianday('now') - timestamp as age
733 from snapshots where scheme = ?
734 order by age""", (scheme,))
738 for (id, name, snap_intent, snap_age) in cur.fetchall():
740 if snap_intent < max_intent:
741 # Delete small-intent snapshots if there is a more recent
742 # large-intent snapshot.
744 elif snap_intent == intent:
745 # Delete previous snapshots with the specified intent level.
748 if can_delete and not first:
749 print("Delete snapshot %d (%s)" % (id, name))
750 cur.execute("delete from snapshots where snapshotid = ?",
753 max_intent = max(max_intent, snap_intent)
755 self.garbage_collect()
def garbage_collect(self):
    """Garbage-collect unreachable segment and object data.

    Remove all segments and checksums which are not reachable from the
    current set of snapshots stored in the local database.  The deletes
    run in dependency order, so each step sees the result of the one
    before it.
    """
    cur = self.cursor()

    # Delete entries in the segment_utilization table which are for
    # non-existent snapshots.
    cur.execute("""delete from segment_utilization
                   where snapshotid not in
                       (select snapshotid from snapshots)""")

    # Delete segments not referenced by any current snapshots.
    cur.execute("""delete from segments where segmentid not in
                       (select segmentid from segment_utilization)""")

    # Delete dangling objects in the block_index table.
    cur.execute("""delete from block_index
                   where segmentid not in
                       (select segmentid from segments)""")

    # Remove sub-block signatures for deleted objects.
    cur.execute("""delete from subblock_signatures
                   where blockid not in
                       (select blockid from block_index)""")
class SegmentInfo(Struct):
    """Plain data container describing one segment considered for cleaning."""
788 def get_segment_cleaning_list(self, age_boost=0.0):
789 """Return a list of all current segments with information for cleaning.
791 Return all segments which are currently known in the local database
792 (there might be other, older segments in the archive itself), and
793 return usage statistics for each to help decide which segments to
796 The returned list will be sorted by estimated cleaning benefit, with
797 segments that are best to clean at the start of the list.
799 If specified, the age_boost parameter (measured in days) will added to
800 the age of each segment, as a way of adjusting the benefit computation
801 before a long-lived snapshot is taken (for example, age_boost might be
802 set to 7 when cleaning prior to taking a weekly snapshot).
807 cur.execute("""select segmentid, used, size, mtime,
808 julianday('now') - mtime as age from segment_info
809 where expire_time is null""")
811 info = self.SegmentInfo()
813 info.used_bytes = row[1]
814 info.size_bytes = row[2]
816 info.age_days = row[4]
818 # If data is not available for whatever reason, treat it as 0.0.
819 if info.age_days is None:
821 if info.used_bytes is None:
822 info.used_bytes = 0.0
824 # Benefit calculation: u is the estimated fraction of each segment
825 # which is utilized (bytes belonging to objects still in use
826 # divided by total size; this doesn't take compression or storage
827 # overhead into account, but should give a reasonable estimate).
829 # The total benefit is a heuristic that combines several factors:
830 # the amount of space that can be reclaimed (1 - u), an ageing
831 # factor (info.age_days) that favors cleaning old segments to young
832 # ones and also is more likely to clean segments that will be
833 # rewritten for long-lived snapshots (age_boost), and finally a
834 # penalty factor for the cost of re-uploading data (u + 0.1).
835 u = info.used_bytes / info.size_bytes
836 info.cleaning_benefit \
837 = (1 - u) * (info.age_days + age_boost) / (u + 0.1)
839 segments.append(info)
841 segments.sort(cmp, key=lambda s: s.cleaning_benefit, reverse=True)
def mark_segment_expired(self, segment):
    """Mark a segment for cleaning in the local database.

    The segment parameter should be either a SegmentInfo object or an
    integer segment id.  Objects in the given segment will be marked as
    expired, which means that any future snapshots that would re-use those
    objects will instead write out a new copy of the object, and thus no
    future snapshots will depend upon the given segment.

    Raises TypeError if segment is of any other type.
    """
    if isinstance(segment, int):
        segment_id = segment
    elif isinstance(segment, self.SegmentInfo):
        segment_id = segment.id
    else:
        raise TypeError("Invalid segment: %s, must be of type int or SegmentInfo, not %s" % (segment, type(segment)))

    cur = self.cursor()
    # The segment's expire_time records the newest snapshot id that exists
    # at the moment of expiry.
    cur.execute("select max(snapshotid) from snapshots")
    last_snapshotid = cur.fetchone()[0]
    cur.execute("update segments set expire_time = ? where segmentid = ?",
                (last_snapshotid, segment_id))
    cur.execute("update block_index set expired = 0 where segmentid = ?",
                (segment_id,))
869 def balance_expired_objects(self):
870 """Analyze expired objects in segments to be cleaned and group by age.
872 Update the block_index table of the local database to group expired
873 objects by age. The exact number of buckets and the cutoffs for each
874 are dynamically determined. Calling this function after marking
875 segments expired will help in the segment cleaning process, by ensuring
876 that when active objects from clean segments are rewritten, they will
877 be placed into new segments roughly grouped by age.
880 # The expired column of the block_index table is used when generating a
881 # new Cumulus snapshot. A null value indicates that an object may be
882 # re-used. Otherwise, an object must be written into a new segment if
883 # needed. Objects with distinct expired values will be written into
884 # distinct segments, to allow for some grouping by age. The value 0 is
885 # somewhat special in that it indicates any rewritten objects can be
886 # placed in the same segment as completely new objects; this can be
887 # used for very young objects which have been expired, or objects not
888 # expected to be encountered.
890 # In the balancing process, all objects which are not used in any
891 # current snapshots will have expired set to 0. Objects which have
892 # been seen will be sorted by age and will have expired values set to
893 # 0, 1, 2, and so on based on age (with younger objects being assigned
894 # lower values). The number of buckets and the age cutoffs is
895 # determined by looking at the distribution of block ages.
899 # Mark all expired objects with expired = 0; these objects will later
900 # have values set to indicate groupings of objects when repacking.
901 cur.execute("""update block_index set expired = 0
902 where expired is not null""")
904 # We will want to aim for at least one full segment for each bucket
905 # that we eventually create, but don't know how many bytes that should
906 # be due to compression. So compute the average number of bytes in
907 # each expired segment as a rough estimate for the minimum size of each
908 # bucket. (This estimate could be thrown off by many not-fully-packed
909 # segments, but for now don't worry too much about that.) If we can't
910 # compute an average, it's probably because there are no expired
911 # segments, so we have no more work to do.
912 cur.execute("""select avg(size) from segments
914 (select distinct segmentid from block_index
915 where expired is not null)""")
916 segment_size_estimate = cur.fetchone()[0]
917 if not segment_size_estimate:
920 # Next, extract distribution of expired objects (number and size) by
921 # age. Save the timestamp for "now" so that the classification of
922 # blocks into age buckets will not change later in the function, after
923 # time has passed. Set any timestamps in the future to now, so we are
924 # guaranteed that for the rest of this function, age is always
926 cur.execute("select julianday('now')")
927 now = cur.fetchone()[0]
929 cur.execute("""update block_index set timestamp = ?
930 where timestamp > ? and expired is not null""",
933 cur.execute("""select round(? - timestamp) as age, count(*), sum(size)
934 from block_index where expired = 0
935 group by age order by age""", (now,))
936 distribution = cur.fetchall()
938 # Start to determine the buckets for expired objects. Heuristics used:
939 # - An upper bound on the number of buckets is given by the number of
940 # segments we estimate it will take to store all data. In fact,
941 # aim for a couple of segments per bucket.
942 # - Place very young objects in bucket 0 (place with new objects)
943 # unless there are enough of them to warrant a separate bucket.
944 # - Try not to create unnecessarily many buckets, since fewer buckets
945 # will allow repacked data to be grouped based on spatial locality
946 # (while more buckets will group by temporal locality). We want a
949 total_bytes = sum([i[2] for i in distribution])
950 target_buckets = 2 * (total_bytes / segment_size_estimate) ** 0.4
951 min_size = 1.5 * segment_size_estimate
952 target_size = max(2 * segment_size_estimate,
953 total_bytes / target_buckets)
955 print("segment_size:", segment_size_estimate)
956 print("distribution:", distribution)
957 print("total_bytes:", total_bytes)
958 print("target_buckets:", target_buckets)
959 print("min, target size:", min_size, target_size)
961 # Chosen cutoffs. Each bucket consists of objects with age greater
962 # than one cutoff value, but not greater than the next largest cutoff.
965 # Starting with the oldest objects, begin grouping together into
966 # buckets of size at least target_size bytes.
967 distribution.reverse()
969 min_age_bucket = False
970 for (age, items, size) in distribution:
971 if bucket_size >= target_size \
972 or (age < MIN_AGE and not min_age_bucket):
973 if bucket_size < target_size and len(cutoffs) > 0:
980 min_age_bucket = True
982 # The last (youngest) bucket will be group 0, unless it has enough data
983 # to be of size min_size by itself, or there happen to be no objects
984 # less than MIN_AGE at all.
985 if bucket_size >= min_size or not min_age_bucket:
989 print("cutoffs:", cutoffs)
991 # Update the database to assign each object to the appropriate bucket.
993 for i in range(len(cutoffs)):
994 cur.execute("""update block_index set expired = ?
995 where round(? - timestamp) > ?
996 and expired is not null""",
997 (i, now, cutoffs[i]))