# Cumulus: Efficient Filesystem Backup to the Cloud
# Copyright (C) 2008-2009, 2012 The Cumulus Developers
# See the AUTHORS file for a list of contributors.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19 """High-level interface for working with Cumulus archives.
21 This module provides an easy interface for reading from and manipulating
22 various parts of a Cumulus archive:
23 - listing the snapshots and segments present
24 - reading segment contents
25 - parsing snapshot descriptors and snapshot metadata logs
26 - reading and maintaining the local object database
from __future__ import division, print_function, unicode_literals

import codecs
import hashlib
import itertools
import os
import re
import shutil
import sqlite3
import subprocess
import sys
import tarfile
import tempfile
try:
    import _thread
except ImportError:
    import thread as _thread

import cumulus.store
import cumulus.store.file

if sys.version < "3":
    StringTypes = (str, unicode)
else:
    StringTypes = (str,)
# The largest supported snapshot format that can be understood.
FORMAT_VERSION = (0, 11)        # Cumulus Snapshot v0.11

# Maximum number of nested indirect references allowed in a snapshot.
MAX_RECURSION_DEPTH = 3

# All segments which have been accessed this session.
accessed_segments = set()
# Table of methods used to filter segments before storage, and corresponding
# filename extensions.  These are listed in priority order (methods earlier in
# the list are tried first).
SEGMENT_FILTERS = [
    (".gpg", "cumulus-filter-gpg --decrypt"),
    (".gz", "gzip -dc"),
    (".bz2", "bzip2 -dc"),
    ("", None),
]
74 """Decode binary data from a file into a sequence of lines.
76 Newline markers are retained."""
77 return list(codecs.iterdecode(data.splitlines(True), "utf-8"))
80 """Decode a URI-encoded (%xx escapes) string."""
81 def hex_decode(m): return chr(int(m.group(1), 16))
82 return re.sub(r"%([0-9a-f]{2})", hex_decode, s)
84 """Encode a string to URI-encoded (%xx escapes) form."""
86 if c > '+' and c < '\x7f' and c != '@':
89 return "%%%02x" % (ord(c),)
90 return ''.join(hex_encode(c) for c in s)
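# Example round trip (illustrative): printable ASCII characters other than a
# few reserved ones pass through unchanged, while everything else is escaped:
#     uri_encode("file name") == "file%20name"
#     uri_decode("file%20name") == "file name"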
93 """A class which merely acts as a data container.
95 Instances of this class (or its subclasses) are merely used to store data
96 in various attributes. No methods are provided.
100 return "<%s %s>" % (self.__class__, self.__dict__)
CHECKSUM_ALGORITHMS = {
    'sha1': hashlib.sha1,
    'sha224': hashlib.sha224,
    'sha256': hashlib.sha256,
}
class ChecksumCreator:
    """Compute a Cumulus checksum for provided data.

    The algorithm used is selectable, but currently defaults to sha1.
    """

    def __init__(self, algorithm='sha1'):
        self.algorithm = algorithm
        self.hash = CHECKSUM_ALGORITHMS[algorithm]()

    def update(self, data):
        self.hash.update(data)
        return self

    def compute(self):
        return "%s=%s" % (self.algorithm, self.hash.hexdigest())
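# Example (illustrative; update() returning self and the compute() spelling
# follow the reconstruction above):
#     ChecksumCreator().update(b"data").compute()
# yields a string of the form "sha1=<hex digest>", the same "algorithm=digest"
# format that ChecksumVerifier splits back apart.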
class ChecksumVerifier:
    """Verify whether a checksum from a snapshot matches the supplied data."""

    def __init__(self, checksumstr):
        """Create an object to check the supplied checksum."""

        (algo, checksum) = checksumstr.split("=", 1)
        self.checksum = checksum
        self.hash = CHECKSUM_ALGORITHMS[algo]()

    def update(self, data):
        self.hash.update(data)

    def valid(self):
        """Return a boolean indicating whether the checksum matches."""

        result = self.hash.hexdigest()
        return result == self.checksum
class SearchPathEntry(object):
    """Item representing a possible search location for Cumulus files.

    Some Cumulus files might be stored in multiple possible file locations:
    due to format (different compression mechanisms with different
    extensions), locality (different segments might be placed in different
    directories to control archiving policies), or backwards compatibility
    (the default location changed over time).  A SearchPathEntry describes a
    possible location for a file.
    """
    def __init__(self, directory_prefix, suffix, context=None):
        self._directory_prefix = directory_prefix
        self._suffix = suffix
        self._context = context

    def __repr__(self):
        return "%s(%r, %r, %r)" % (self.__class__.__name__,
                                   self._directory_prefix, self._suffix,
                                   self._context)

    def build_path(self, basename):
        """Construct the search path to use for a file with name basename.

        Returns a tuple (pathname, context), where pathname is the path to try
        and context is any additional data associated with this search entry
        (if any).
        """
        return (os.path.join(self._directory_prefix, basename + self._suffix),
                self._context)
class SearchPath(object):
    """A collection of locations to search for files and lookup utilities.

    For looking for a file in a Cumulus storage backend, a SearchPath object
    contains a list of possible locations to try.  A SearchPath can be used to
    perform the search as well; when a file is found the search path ordering
    is updated (moving the successful SearchPathEntry to the front of the list
    for future searches).
    """
    def __init__(self, name_regex, searchpath):
        self._regex = re.compile(name_regex)
        self._path = list(searchpath)

    def add_search_entry(self, entry):
        self._path.append(entry)

    def directories(self):
        """Return the set of directories to search for a file type."""
        return set(entry._directory_prefix for entry in self._path)

    def get(self, backend, basename):
        for (i, entry) in enumerate(self._path):
            try:
                (pathname, context) = entry.build_path(basename)
                fp = backend.get(pathname)
                # On success, move this entry to the front of the search path
                # to speed future searches.
                if i > 0:
                    self._path.pop(i)
                    self._path.insert(0, entry)
                return (fp, pathname, context)
            except cumulus.store.NotFoundError:
                continue
        raise cumulus.store.NotFoundError(basename)

    def stat(self, backend, basename):
        for (i, entry) in enumerate(self._path):
            try:
                (pathname, context) = entry.build_path(basename)
                stat_data = backend.stat(pathname)
                # On success, move this entry to the front of the search path
                # to speed future searches.
                if i > 0:
                    self._path.pop(i)
                    self._path.insert(0, entry)
                result = {"path": pathname}
                result.update(stat_data)
                return result
            except cumulus.store.NotFoundError:
                continue
        raise cumulus.store.NotFoundError(basename)

    def match(self, filename):
        return self._regex.match(filename)

    def list(self, backend):
        success = False
        for d in self.directories():
            try:
                for f in backend.list(d):
                    success = True
                    m = self.match(f)
                    if m: yield (os.path.join(d, f), m)
            except cumulus.store.NotFoundError:
                pass
        if not success:
            raise cumulus.store.NotFoundError(backend)
def _build_segments_searchpath(prefix):
    for (extension, filter) in SEGMENT_FILTERS:
        yield SearchPathEntry(prefix, extension, filter)
247 "checksums": SearchPath(
248 r"^snapshot-(.*)\.(\w+)sums$",
249 [SearchPathEntry("meta", ".sha1sums"),
250 SearchPathEntry("checksums", ".sha1sums"),
251 SearchPathEntry("", ".sha1sums")]),
253 r"^snapshot-(.*)\.meta(\.\S+)?$",
254 _build_segments_searchpath("meta")),
255 "segments": SearchPath(
256 (r"^([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})"
259 _build_segments_searchpath("segments0"),
260 _build_segments_searchpath("segments1"),
261 _build_segments_searchpath(""),
262 _build_segments_searchpath("segments"))),
263 "snapshots": SearchPath(
264 r"^snapshot-(.*)\.(cumulus|lbs)$",
265 [SearchPathEntry("snapshots", ".cumulus"),
266 SearchPathEntry("snapshots", ".lbs"),
267 SearchPathEntry("", ".cumulus"),
268 SearchPathEntry("", ".lbs")]),
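# Example lookup (illustrative): opening snapshot "test-20120101T0000" tries
# "snapshots/snapshot-test-20120101T0000.cumulus" first, then the ".lbs"
# variant, then the same two names at the repository root, and returns the
# first file the backend can supply.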
class BackendWrapper(object):
    """Wrapper around a Cumulus storage backend that understands file types.

    The BackendWrapper class understands different Cumulus file types, such as
    snapshots and segments, and implements higher-level operations such as
    "retrieve a snapshot with a specific name" (hiding operations such as
    searching for the correct file name).
    """

    def __init__(self, backend):
        """Initializes a wrapper around the specified storage backend.

        backend may either be a Store object or a URL.
        """
        if type(backend) in StringTypes:
            self._backend = cumulus.store.open(backend)
        else:
            self._backend = backend

    @property
    def raw_backend(self):
        return self._backend

    def stat_generic(self, basename, filetype):
        return SEARCH_PATHS[filetype].stat(self._backend, basename)

    def open_generic(self, basename, filetype):
        return SEARCH_PATHS[filetype].get(self._backend, basename)

    def open_snapshot(self, name):
        return self.open_generic("snapshot-" + name, "snapshots")

    def open_segment(self, name):
        return self.open_generic(name + ".tar", "segments")

    def list_generic(self, filetype):
        return ((x[1].group(1), x[0])
                for x in SEARCH_PATHS[filetype].list(self._backend))

    def prefetch_generic(self):
        """Calls scan on directories to prefetch file metadata."""
        directories = set()
        for typeinfo in SEARCH_PATHS.values():
            directories.update(typeinfo.directories())
        for d in directories:
            print("Prefetch", d)
            self._backend.scan(d)
class CumulusStore:
    def __init__(self, backend):
        if isinstance(backend, BackendWrapper):
            self.backend = backend
        else:
            self.backend = BackendWrapper(backend)
        self.cachedir = None
        self.CACHE_SIZE = 16
        self._lru_list = []

    def get_cachedir(self):
        if self.cachedir is None:
            self.cachedir = tempfile.mkdtemp("-cumulus")
        return self.cachedir

    def cleanup(self):
        if self.cachedir is not None:
            shutil.rmtree(self.cachedir)
        self.cachedir = None
        self._lru_list = []
    @staticmethod
    def parse_ref(refstr):
        m = re.match(r"^zero\[(\d+)\]$", refstr)
        if m:
            return ("zero", None, None, (0, int(m.group(1)), False))

        m = re.match(r"^([-0-9a-f]+)\/([0-9a-f]+)(\(\S+\))?(\[(((\d+)\+)?(\d+)|=(\d+))\])?$", refstr)
        if m is None: return

        segment = m.group(1)
        object = m.group(2)
        checksum = m.group(3)
        slice = m.group(4)

        if checksum is not None:
            checksum = checksum.lstrip("(").rstrip(")")

        if slice is not None:
            if m.group(9) is not None:
                # Size-assertion slice
                slice = (0, int(m.group(9)), True)
            elif m.group(6) is None:
                # Abbreviated slice: length only, offset zero implied
                slice = (0, int(m.group(8)), False)
            else:
                slice = (int(m.group(7)), int(m.group(8)), False)

        return (segment, object, checksum, slice)
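    # Example reference strings (illustrative) and their parsed forms:
    #     "zero[100]"             -> ("zero", None, None, (0, 100, False))
    #     "<seg>/00000123[10+20]" -> (segment, object, None, (10, 20, False))
    #     "<seg>/00000123[=20]"   -> size assertion, slice (0, 20, True)
    # where <seg> stands for a segment UUID and an optional "(sha1=...)"
    # group carries the object checksum.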
    def list_snapshots(self):
        return set(x[0] for x in self.backend.list_generic("snapshots"))

    def list_segments(self):
        return set(x[0] for x in self.backend.list_generic("segments"))

    def load_snapshot(self, snapshot):
        snapshot_file = self.backend.open_snapshot(snapshot)[0]
        return to_lines(snapshot_file.read())
    @staticmethod
    def filter_data(filehandle, filter_cmd):
        if filter_cmd is None:
            return filehandle
        p = subprocess.Popen(filter_cmd, shell=True, stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE, close_fds=True)
        input, output = p.stdin, p.stdout
        def copy_thread(src, dst):
            BLOCK_SIZE = 4096
            while True:
                block = src.read(BLOCK_SIZE)
                if len(block) == 0: break
                dst.write(block)
            src.close()
            dst.close()
        _thread.start_new_thread(copy_thread, (filehandle, input))
        return output
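    # Design note: the filter command runs as a shell pipeline, and a helper
    # thread feeds the raw file into its stdin so the caller can stream from
    # its stdout without buffering the whole segment.  For example
    # (illustrative), a segment stored as "<uuid>.tar.bz2" arrives with
    # filter_cmd "bzip2 -dc", so the returned handle yields the decompressed
    # tar stream.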
    def get_segment(self, segment):
        accessed_segments.add(segment)

        (segment_fp, path, filter_cmd) = self.backend.open_segment(segment)
        return self.filter_data(segment_fp, filter_cmd)

    def load_segment(self, segment):
        seg = tarfile.open(segment, 'r|', self.get_segment(segment))
        for item in seg:
            data_obj = seg.extractfile(item)
            path = item.name.split('/')
            if len(path) == 2 and path[0] == segment:
                yield (path[1], data_obj.read())
    def extract_segment(self, segment):
        segdir = os.path.join(self.get_cachedir(), segment)
        os.mkdir(segdir)
        for (object, data) in self.load_segment(segment):
            f = open(os.path.join(segdir, object), 'wb')
            f.write(data)
            f.close()

    def load_object(self, segment, object):
        accessed_segments.add(segment)
        path = os.path.join(self.get_cachedir(), segment, object)
        if not os.access(path, os.R_OK):
            self.extract_segment(segment)
        if segment in self._lru_list: self._lru_list.remove(segment)
        self._lru_list.append(segment)
        while len(self._lru_list) > self.CACHE_SIZE:
            shutil.rmtree(os.path.join(self.cachedir, self._lru_list[0]))
            self._lru_list = self._lru_list[1:]
        return open(path, 'rb').read()
    def get(self, refstr):
        """Fetch the given object and return it.

        The input should be an object reference, in string form.
        """

        (segment, object, checksum, slice) = self.parse_ref(refstr)

        if segment == "zero":
            return b"\0" * slice[1]

        data = self.load_object(segment, object)

        if checksum is not None:
            verifier = ChecksumVerifier(checksum)
            verifier.update(data)
            if not verifier.valid():
                raise ValueError

        if slice is not None:
            (start, length, exact) = slice
            if exact and len(data) != length: raise ValueError
            data = data[start:start+length]
            if len(data) != length: raise IndexError

        return data
    def prefetch(self):
        self.backend.prefetch_generic()
def parse(lines, terminate=None):
    """Generic parser for RFC822-style "Key: Value" data streams.

    This parser can be used to read metadata logs and snapshot root descriptor
    files.

    lines must be an iterable object which yields a sequence of lines of input.

    If terminate is specified, it is used as a predicate to determine when to
    stop reading input lines.
    """

    dict = {}
    last_key = None

    for l in lines:
        # Strip off a trailing newline, if present
        if len(l) > 0 and l[-1] == "\n":
            l = l[:-1]

        if terminate is not None and terminate(l):
            if len(dict) > 0: yield dict
            dict = {}
            last_key = None
            continue

        m = re.match(r"^([-\w]+):\s*(.*)$", l)
        if m:
            dict[m.group(1)] = m.group(2)
            last_key = m.group(1)
        elif len(l) > 0 and l[0].isspace() and last_key is not None:
            dict[last_key] += l
        else:
            last_key = None

    if len(dict) > 0: yield dict
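# Example (illustrative): each stanza of "Key: Value" lines becomes one
# dictionary; with terminate=lambda l: len(l) == 0, blank lines split stanzas:
#     list(parse(["Format: Cumulus Snapshot v0.11\n", "Date: 2012-01-01\n"]))
#     == [{"Format": "Cumulus Snapshot v0.11", "Date": "2012-01-01"}]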
def parse_full(lines):
    try:
        return next(parse(lines))
    except StopIteration:
        return {}
def parse_metadata_version(s):
    """Convert a string with the snapshot version format to a tuple."""

    m = re.match(r"^(?:Cumulus|LBS) Snapshot v(\d+(\.\d+)*)$", s)
    if m is None:
        return ()
    else:
        return tuple([int(d) for d in m.group(1).split(".")])
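# Example: parse_metadata_version("Cumulus Snapshot v0.11") returns (0, 11),
# which can be compared directly against FORMAT_VERSION; any unrecognized
# string yields the empty tuple.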
def read_metadata(object_store, root):
    """Iterate through all lines in the metadata log, following references."""

    # Stack for keeping track of recursion when following references to
    # portions of the log.  The last entry in the stack corresponds to the
    # object currently being parsed.  Each entry is a list of lines which have
    # been reversed, so that popping successive lines from the end of each list
    # will return lines of the metadata log in order.
    stack = []

    def follow_ref(refstr):
        if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
        lines = to_lines(object_store.get(refstr))
        lines.reverse()
        stack.append(lines)

    follow_ref(root)

    while len(stack) > 0:
        top = stack[-1]
        if len(top) == 0:
            stack.pop()
            continue
        line = top.pop()

        # An indirect reference which we must follow?
        if len(line) > 0 and line[0] == '@':
            ref = line[1:].strip()
            follow_ref(ref)
        else:
            yield line
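# Example (illustrative): a metadata log line of the form "@<segment>/<object>"
# is an indirect reference; read_metadata fetches that object and splices its
# lines into the output in place of the reference, nesting at most
# MAX_RECURSION_DEPTH levels deep.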
549 """Metadata for a single file (or directory or...) from a snapshot."""
551 # Functions for parsing various datatypes that can appear in a metadata log
555 """Decode an integer, expressed in decimal, octal, or hexadecimal."""
556 if s.startswith("0x"):
558 elif s.startswith("0"):
565 """Decode a URI-encoded (%xx escapes) string."""
570 """An unecoded string."""
575 """Decode a user/group to a tuple of uid/gid followed by name."""
577 uid = MetadataItem.decode_int(items[0])
580 if items[1].startswith("(") and items[1].endswith(")"):
581 name = MetadataItem.decode_str(items[1][1:-1])
585 def decode_device(s):
586 """Decode a device major/minor number."""
587 (major, minor) = map(MetadataItem.decode_int, s.split("/"))
588 return (major, minor)
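    # Examples (illustrative):
    #     decode_int("0x1f") == 31; decode_int("017") == 15; decode_int("17") == 17
    #     decode_user("1000 (alice)") == (1000, "alice")
    #     decode_device("8/1") == (8, 1)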
    class Items: pass

    def __init__(self, fields, object_store):
        """Initialize from a dictionary of key/value pairs from metadata log."""

        self.fields = fields
        self.object_store = object_store
        self.keys = []
        self.items = self.Items()
        for (k, v) in fields.items():
            if k in self.field_types:
                decoder = self.field_types[k]
                setattr(self.items, k, decoder(v))
                self.keys.append(k)
606 """Return an iterator for the data blocks that make up a file."""
608 # This traverses the list of blocks that make up a file, following
609 # indirect references. It is implemented in much the same way as
610 # read_metadata, so see that function for details of the technique.
612 objects = self.fields['data'].split()
616 def follow_ref(refstr):
617 if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
618 objects = self.object_store.get(refstr).split()
620 stack.append(objects)
622 while len(stack) > 0:
629 # An indirect reference which we must follow?
630 if len(ref) > 0 and ref[0] == '@':
# Description of fields that might appear, and how they should be parsed.
MetadataItem.field_types = {
    'name': MetadataItem.decode_str,
    'type': MetadataItem.raw_str,
    'mode': MetadataItem.decode_int,
    'device': MetadataItem.decode_device,
    'user': MetadataItem.decode_user,
    'group': MetadataItem.decode_user,
    'ctime': MetadataItem.decode_int,
    'mtime': MetadataItem.decode_int,
    'links': MetadataItem.decode_int,
    'inode': MetadataItem.raw_str,
    'checksum': MetadataItem.decode_str,
    'size': MetadataItem.decode_int,
    'contents': MetadataItem.decode_str,
    'target': MetadataItem.decode_str,
}
def iterate_metadata(object_store, root):
    for d in parse(read_metadata(object_store, root), lambda l: len(l) == 0):
        yield MetadataItem(d, object_store)
658 """Access to the local database of snapshot contents and object checksums.
660 The local database is consulted when creating a snapshot to determine what
661 data can be re-used from old snapshots. Segment cleaning is performed by
662 manipulating the data in the local database; the local database also
663 includes enough data to guide the segment cleaning process.
666 def __init__(self, path, dbname="localdb.sqlite"):
667 self.db_connection = sqlite3.connect(path + "/" + dbname)
    # Low-level database access.  Use these methods when there isn't a
    # higher-level interface available.  Exception: do, however, remember to
    # use the commit() method after making changes to make sure they are
    # actually saved, even when going through higher-level interfaces.
    def commit(self):
        "Commit any pending changes to the local database."
        self.db_connection.commit()

    def rollback(self):
        "Roll back any pending changes to the local database."
        self.db_connection.rollback()

    def cursor(self):
        "Return a DB-API cursor for directly accessing the local database."
        return self.db_connection.cursor()
    def list_schemes(self):
        """Return the list of snapshot scheme names in the local database."""

        cur = self.cursor()
        cur.execute("select distinct scheme from snapshots")
        schemes = [row[0] for row in cur.fetchall()]
        schemes.sort()
        return schemes

    def list_snapshots(self, scheme):
        """Return a list of snapshots for the given scheme."""

        cur = self.cursor()
        cur.execute("select name from snapshots where scheme = ?", (scheme,))
        snapshots = [row[0] for row in cur.fetchall()]
        snapshots.sort()
        return snapshots
    def delete_snapshot(self, scheme, name):
        """Remove the specified snapshot from the database.

        Warning: This does not garbage collect all dependent data in the
        database, so it must be followed by a call to garbage_collect() to make
        the database consistent.
        """
        cur = self.cursor()
        cur.execute("delete from snapshots where scheme = ? and name = ?",
                    (scheme, name))
    def prune_old_snapshots(self, scheme, intent=1.0):
        """Delete entries from old snapshots from the database.

        Only snapshots with the specified scheme name will be deleted.  If
        intent is given, it gives the intended next snapshot type, to determine
        how aggressively to clean (for example, intent=7 could be used if the
        next snapshot will be a weekly snapshot).
        """

        cur = self.cursor()

        # Find the id of the last snapshot to be created.  This is used for
        # measuring time in a way: we record this value in each segment we
        # expire on this run, and then on a future run can tell if there have
        # been intervening backups made.
        cur.execute("select max(snapshotid) from snapshots")
        last_snapshotid = cur.fetchone()[0]

        # Get the list of old snapshots for this scheme.  Delete all the old
        # ones.  Rules for what to keep:
        #   - Always keep the most recent snapshot.
        #   - If snapshot X is younger than Y, and X has higher intent, then Y
        #     can be deleted.
        cur.execute("""select snapshotid, name, intent,
                              julianday('now') - timestamp as age
                       from snapshots where scheme = ?
                       order by age""", (scheme,))

        first = True
        max_intent = -1
        for (id, name, snap_intent, snap_age) in cur.fetchall():
            can_delete = False
            if snap_intent < max_intent:
                # Delete small-intent snapshots if there is a more recent
                # large-intent snapshot.
                can_delete = True
            elif snap_intent == intent:
                # Delete previous snapshots with the specified intent level.
                can_delete = True

            if can_delete and not first:
                print("Delete snapshot %d (%s)" % (id, name))
                cur.execute("delete from snapshots where snapshotid = ?",
                            (id,))
            first = False
            max_intent = max(max_intent, snap_intent)

        self.garbage_collect()
    def garbage_collect(self):
        """Garbage-collect unreachable segment and object data.

        Remove all segments and checksums which are not reachable from the
        current set of snapshots stored in the local database.
        """
        cur = self.cursor()

        # Delete entries in the segment_utilization table which are for
        # non-existent snapshots.
        cur.execute("""delete from segment_utilization
                       where snapshotid not in
                           (select snapshotid from snapshots)""")

        # Delete segments not referenced by any current snapshots.
        cur.execute("""delete from segments where segmentid not in
                           (select segmentid from segment_utilization)""")

        # Delete dangling objects in the block_index table.
        cur.execute("""delete from block_index
                       where segmentid not in
                           (select segmentid from segments)""")

        # Remove sub-block signatures for deleted objects.
        cur.execute("""delete from subblock_signatures
                       where blockid not in
                           (select blockid from block_index)""")
    # Segment cleaning.
    class SegmentInfo(Struct): pass
    def get_segment_cleaning_list(self, age_boost=0.0):
        """Return a list of all current segments with information for cleaning.

        Return all segments which are currently known in the local database
        (there might be other, older segments in the archive itself), and
        return usage statistics for each to help decide which segments to
        clean.

        The returned list will be sorted by estimated cleaning benefit, with
        segments that are best to clean at the start of the list.

        If specified, the age_boost parameter (measured in days) will be added
        to the age of each segment, as a way of adjusting the benefit
        computation before a long-lived snapshot is taken (for example,
        age_boost might be set to 7 when cleaning prior to taking a weekly
        snapshot).
        """

        cur = self.cursor()
        segments = []
        cur.execute("""select segmentid, used, size, mtime,
                       julianday('now') - mtime as age from segment_info
                       where expire_time is null""")
        for row in cur.fetchall():
            info = self.SegmentInfo()
            info.id = row[0]
            info.used_bytes = row[1]
            info.size_bytes = row[2]
            info.mtime = row[3]
            info.age_days = row[4]

            # If data is not available for whatever reason, treat it as 0.0.
            if info.age_days is None:
                info.age_days = 0.0
            if info.used_bytes is None:
                info.used_bytes = 0.0

            # Benefit calculation: u is the estimated fraction of each segment
            # which is utilized (bytes belonging to objects still in use
            # divided by total size; this doesn't take compression or storage
            # overhead into account, but should give a reasonable estimate).
            #
            # The total benefit is a heuristic that combines several factors:
            # the amount of space that can be reclaimed (1 - u), an ageing
            # factor (info.age_days) that favors cleaning old segments to young
            # ones and also is more likely to clean segments that will be
            # rewritten for long-lived snapshots (age_boost), and finally a
            # penalty factor for the cost of re-uploading data (u + 0.1).
            u = info.used_bytes / info.size_bytes
            info.cleaning_benefit \
                = (1 - u) * (info.age_days + age_boost) / (u + 0.1)

            segments.append(info)

        segments.sort(key=lambda s: s.cleaning_benefit, reverse=True)
        return segments
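    # Worked example (illustrative numbers): a segment with u = 0.2 (20% of
    # its bytes still referenced) that is 30 days old scores
    # (1 - 0.2) * 30 / (0.2 + 0.1) = 80, while a fully utilized segment of
    # the same age scores 0, so the mostly-empty segment sorts to the front.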
    def mark_segment_expired(self, segment):
        """Mark a segment for cleaning in the local database.

        The segment parameter should be either a SegmentInfo object or an
        integer segment id.  Objects in the given segment will be marked as
        expired, which means that any future snapshots that would re-use those
        objects will instead write out a new copy of the object, and thus no
        future snapshots will depend upon the given segment.
        """

        if isinstance(segment, int):
            id = segment
        elif isinstance(segment, self.SegmentInfo):
            id = segment.id
        else:
            raise TypeError("Invalid segment: %s, must be of type int or "
                            "SegmentInfo, not %s" % (segment, type(segment)))

        cur = self.cursor()
        cur.execute("select max(snapshotid) from snapshots")
        last_snapshotid = cur.fetchone()[0]
        cur.execute("update segments set expire_time = ? where segmentid = ?",
                    (last_snapshotid, id))
        cur.execute("update block_index set expired = 0 where segmentid = ?",
                    (id,))
    def balance_expired_objects(self):
        """Analyze expired objects in segments to be cleaned and group by age.

        Update the block_index table of the local database to group expired
        objects by age.  The exact number of buckets and the cutoffs for each
        are dynamically determined.  Calling this function after marking
        segments expired will help in the segment cleaning process, by ensuring
        that when active objects from clean segments are rewritten, they will
        be placed into new segments roughly grouped by age.
        """

        # The expired column of the block_index table is used when generating a
        # new Cumulus snapshot.  A null value indicates that an object may be
        # re-used.  Otherwise, an object must be written into a new segment if
        # needed.  Objects with distinct expired values will be written into
        # distinct segments, to allow for some grouping by age.  The value 0 is
        # somewhat special in that it indicates any rewritten objects can be
        # placed in the same segment as completely new objects; this can be
        # used for very young objects which have been expired, or objects not
        # expected to be encountered.
        #
        # In the balancing process, all objects which are not used in any
        # current snapshots will have expired set to 0.  Objects which have
        # been seen will be sorted by age and will have expired values set to
        # 0, 1, 2, and so on based on age (with younger objects being assigned
        # lower values).  The number of buckets and the age cutoffs is
        # determined by looking at the distribution of block ages.

        cur = self.cursor()

        # Mark all expired objects with expired = 0; these objects will later
        # have values set to indicate groupings of objects when repacking.
        cur.execute("""update block_index set expired = 0
                       where expired is not null""")
        # We will want to aim for at least one full segment for each bucket
        # that we eventually create, but don't know how many bytes that should
        # be due to compression.  So compute the average number of bytes in
        # each expired segment as a rough estimate for the minimum size of each
        # bucket.  (This estimate could be thrown off by many not-fully-packed
        # segments, but for now don't worry too much about that.)  If we can't
        # compute an average, it's probably because there are no expired
        # segments, so we have no more work to do.
        cur.execute("""select avg(size) from segments
                       where segmentid in
                           (select distinct segmentid from block_index
                            where expired is not null)""")
        segment_size_estimate = cur.fetchone()[0]
        if not segment_size_estimate:
            return

        # Next, extract distribution of expired objects (number and size) by
        # age.  Save the timestamp for "now" so that the classification of
        # blocks into age buckets will not change later in the function, after
        # time has passed.  Set any timestamps in the future to now, so we are
        # guaranteed that for the rest of this function, age is always
        # non-negative.
        cur.execute("select julianday('now')")
        now = cur.fetchone()[0]

        cur.execute("""update block_index set timestamp = ?
                       where timestamp > ? and expired is not null""",
                    (now, now))

        cur.execute("""select round(? - timestamp) as age, count(*), sum(size)
                       from block_index where expired = 0
                       group by age order by age""", (now,))
        distribution = cur.fetchall()
        # Start to determine the buckets for expired objects.  Heuristics used:
        #   - An upper bound on the number of buckets is given by the number of
        #     segments we estimate it will take to store all data.  In fact,
        #     aim for a couple of segments per bucket.
        #   - Place very young objects in bucket 0 (place with new objects)
        #     unless there are enough of them to warrant a separate bucket.
        #   - Try not to create unnecessarily many buckets, since fewer buckets
        #     will allow repacked data to be grouped based on spatial locality
        #     (while more buckets will group by temporal locality).  We want a
        #     balance.
        MIN_AGE = 4
        total_bytes = sum([i[2] for i in distribution])
        target_buckets = 2 * (total_bytes / segment_size_estimate) ** 0.4
        min_size = 1.5 * segment_size_estimate
        target_size = max(2 * segment_size_estimate,
                          total_bytes / target_buckets)

        print("segment_size:", segment_size_estimate)
        print("distribution:", distribution)
        print("total_bytes:", total_bytes)
        print("target_buckets:", target_buckets)
        print("min, target size:", min_size, target_size)
        # Chosen cutoffs.  Each bucket consists of objects with age greater
        # than one cutoff value, but not greater than the next largest cutoff.
        cutoffs = []

        # Starting with the oldest objects, begin grouping together into
        # buckets of size at least target_size bytes.
        distribution.reverse()
        bucket_size = 0
        min_age_bucket = False
        for (age, items, size) in distribution:
            if bucket_size >= target_size \
                or (age < MIN_AGE and not min_age_bucket):
                if bucket_size < target_size and len(cutoffs) > 0:
                    cutoffs.pop()
                cutoffs.append(age)
                bucket_size = 0

            bucket_size += size
            if age < MIN_AGE:
                min_age_bucket = True

        # The last (youngest) bucket will be group 0, unless it has enough data
        # to be of size min_size by itself, or there happen to be no objects
        # less than MIN_AGE at all.
        if bucket_size >= min_size or not min_age_bucket:
            cutoffs.append(-1)
        cutoffs.append(-1)

        print("cutoffs:", cutoffs)
        # Update the database to assign each object to the appropriate bucket.
        cutoffs.reverse()
        for i in range(len(cutoffs)):
            cur.execute("""update block_index set expired = ?
                           where round(? - timestamp) > ?
                             and expired is not null""",
                        (i, now, cutoffs[i]))
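        # Worked example (illustrative): if the loop above chose age cutoffs
        # [30, 7] plus two -1 sentinels, the reversed list is [-1, -1, 7, 30];
        # each object keeps the highest matching index, so ages <= 7 days fall
        # in group 1, 8-30 days in group 2, and anything older in group 3,
        # leaving group 0 free for brand-new objects.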