1 # Cumulus: Efficient Filesystem Backup to the Cloud
2 # Copyright (C) 2008-2009, 2012 The Cumulus Developers
3 # See the AUTHORS file for a list of contributors.
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License along
16 # with this program; if not, write to the Free Software Foundation, Inc.,
17 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19 """High-level interface for working with Cumulus archives.
21 This module provides an easy interface for reading from and manipulating
22 various parts of a Cumulus archive:
23 - listing the snapshots and segments present
24 - reading segment contents
25 - parsing snapshot descriptors and snapshot metadata logs
26 - reading and maintaining the local object database
29 from __future__ import division
40 import cumulus.store.file
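# Illustrative sketch (not part of the original module): the high-level usage
# described in the module docstring.  Wrap a storage location (the archive
# path below is hypothetical) in a BackendWrapper, defined later in this
# module, and enumerate the snapshots it contains.
def _example_list_snapshots():
    wrapper = BackendWrapper("/srv/backups/cumulus")
    # list_generic("snapshots") yields (snapshot name, storage path) pairs.
    return sorted(name for (name, path) in wrapper.list_generic("snapshots"))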
42 # The largest snapshot format version that this code understands.
43 FORMAT_VERSION = (0, 11) # Cumulus Snapshot v0.11
45 # Maximum number of nested indirect references allowed in a snapshot.
46 MAX_RECURSION_DEPTH = 3
48 # All segments which have been accessed this session.
49 accessed_segments = set()
51 # Table of methods used to filter segments before storage, and corresponding
52 # filename extensions. These are listed in priority order (methods earlier in
53 # the list are tried first).
55 (".gpg", "cumulus-filter-gpg --decrypt"),
57 (".bz2", "bzip2 -dc"),
62 """Decode a URI-encoded (%xx escapes) string."""
63 def hex_decode(m): return chr(int(m.group(1), 16))
64 return re.sub(r"%([0-9a-f]{2})", hex_decode, s)
66 """Encode a string to URI-encoded (%xx escapes) form."""
68 if c > '+' and c < '\x7f' and c != '@':
71 return "%%%02x" % (ord(c),)
72 return ''.join(hex_encode(c) for c in s)
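# Illustrative sketch (not part of the original module) of the %xx escaping
# scheme used above for filenames: bytes outside the range '+' < c < 0x7f
# (plus '@') are written as a lowercase two-digit hex escape.  The sample
# filename is made up.
def _example_uri_escaping():
    encoded = "backup%20dir/caf%c3%a9.txt"
    decoded = re.sub(r"%([0-9a-f]{2})",
                     lambda m: chr(int(m.group(1), 16)), encoded)
    return decoded   # "backup dir/caf\xc3\xa9.txt"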
75 """A class which merely acts as a data container.
77 Instances of this class (or its subclasses) simply store data
78 in various attributes. No methods are provided.
82 return "<%s %s>" % (self.__class__, self.__dict__)
84 CHECKSUM_ALGORITHMS = {
86 'sha224': hashlib.sha224,
87 'sha256': hashlib.sha256,
90 class ChecksumCreator:
91 """Compute a Cumulus checksum for provided data.
93 The algorithm used is selectable, but currently defaults to sha1.
96 def __init__(self, algorithm='sha1'):
97 self.algorithm = algorithm
98 self.hash = CHECKSUM_ALGORITHMS[algorithm]()
100 def update(self, data):
101 self.hash.update(data)
105 return "%s=%s" % (self.algorithm, self.hash.hexdigest())
107 class ChecksumVerifier:
108 """Verify whether a checksum from a snapshot matches the supplied data."""
110 def __init__(self, checksumstr):
111 """Create an object to check the supplied checksum."""
113 (algo, checksum) = checksumstr.split("=", 1)
114 self.checksum = checksum
115 self.hash = CHECKSUM_ALGORITHMS[algo]()
117 def update(self, data):
118 self.hash.update(data)
121 """Return a boolean indicating whether the checksum matches."""
123 result = self.hash.hexdigest()
124 return result == self.checksum
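# Illustrative sketch (not part of the original module): round-tripping data
# through a Cumulus-style "algorithm=hexdigest" checksum string and checking
# it with ChecksumVerifier.  The block contents are hypothetical.
def _example_verify_checksum():
    data = "example block contents"
    checksumstr = "sha256=" + hashlib.sha256(data).hexdigest()
    verifier = ChecksumVerifier(checksumstr)
    verifier.update(data)
    return verifier.valid()    # True: the digest of data matches checksumstr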
126 class SearchPathEntry(object):
127 """Item representing a possible search location for Cumulus files.
129 Some Cumulus files might be stored in multiple possible locations: due to
130 format (different compression mechanisms with different extensions),
131 locality (different segments might be placed in different directories to
132 control archiving policies), or backwards compatibility (the default
133 location has changed over time). A SearchPathEntry describes a possible location for a file.
136 def __init__(self, directory_prefix, suffix, context=None):
137 self._directory_prefix = directory_prefix
138 self._suffix = suffix
139 self._context = context
142 return "%s(%r, %r, %r)" % (self.__class__.__name__,
143 self._directory_prefix, self._suffix,
146 def build_path(self, basename):
147 """Construct the search path to use for a file with name basename.
149 Returns a tuple (pathname, context), where pathname is the path to try
150 and context is any additional data associated with this search entry
153 return (os.path.join(self._directory_prefix, basename + self._suffix),
156 class SearchPath(object):
157 """A collection of locations to search for files and lookup utilities.
159 When looking for a file in a Cumulus storage backend, a SearchPath object
160 contains a list of possible locations to try. A SearchPath can be used to
161 perform the search as well; when a file is found the search path ordering
162 is updated (moving the successful SearchPathEntry to the front of the list
163 for future searches).
165 def __init__(self, name_regex, searchpath):
166 self._regex = re.compile(name_regex)
167 self._path = list(searchpath)
169 def add_search_entry(self, entry):
170 self._path.append(entry)
172 def directories(self):
173 """Return the set of directories to search for a file type."""
174 return set(entry._directory_prefix for entry in self._path)
176 def get(self, backend, basename):
177 for (i, entry) in enumerate(self._path):
179 (pathname, context) = entry.build_path(basename)
180 fp = backend.get(pathname)
181 # On success, move this entry to the front of the search path
182 # to speed future searches.
185 self._path.insert(0, entry)
186 return (fp, pathname, context)
187 except cumulus.store.NotFoundError:
189 raise cumulus.store.NotFoundError(basename)
191 def stat(self, backend, basename):
192 for (i, entry) in enumerate(self._path):
194 (pathname, context) = entry.build_path(basename)
195 stat_data = backend.stat(pathname)
196 # On success, move this entry to the front of the search path
197 # to speed future searches.
200 self._path.insert(0, entry)
201 result = {"path": pathname}
202 result.update(stat_data)
204 except cumulus.store.NotFoundError:
206 raise cumulus.store.NotFoundError(basename)
208 def match(self, filename):
209 return self._regex.match(filename)
211 def list(self, backend):
213 for d in self.directories():
215 for f in backend.list(d):
218 if m: yield (os.path.join(d, f), m)
219 except cumulus.store.NotFoundError:
222 raise cumulus.store.NotFoundError(backend)
224 def _build_segments_searchpath(prefix):
225 for (extension, filter) in SEGMENT_FILTERS:
226 yield SearchPathEntry(prefix, extension, filter)
229 "checksums": SearchPath(
230 r"^snapshot-(.*)\.(\w+)sums$",
231 [SearchPathEntry("meta", ".sha1sums"),
232 SearchPathEntry("checksums", ".sha1sums"),
233 SearchPathEntry("", ".sha1sums")]),
234 "segments": SearchPath(
235 (r"^([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})"
238 _build_segments_searchpath("segments0"),
239 _build_segments_searchpath("segments1"),
240 _build_segments_searchpath(""),
241 _build_segments_searchpath("segments"))),
242 "snapshots": SearchPath(
243 r"^snapshot-(.*)\.(cumulus|lbs)$",
244 [SearchPathEntry("snapshots", ".cumulus"),
245 SearchPathEntry("snapshots", ".lbs"),
246 SearchPathEntry("", ".cumulus"),
247 SearchPathEntry("", ".lbs")]),
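# Illustrative sketch (not part of the original module): the candidate paths
# that the "snapshots" search entries above produce for a descriptor named
# "snapshot-20120101T000000" (the name is made up).  SearchPath.get() probes
# these in order and promotes the successful entry to the front of the list.
def _example_snapshot_candidate_paths():
    entries = [SearchPathEntry("snapshots", ".cumulus"),
               SearchPathEntry("snapshots", ".lbs"),
               SearchPathEntry("", ".cumulus"),
               SearchPathEntry("", ".lbs")]
    # [("snapshots/snapshot-20120101T000000.cumulus", None), ...]
    return [e.build_path("snapshot-20120101T000000") for e in entries]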
250 class BackendWrapper(object):
251 """Wrapper around a Cumulus storage backend that understands file types.
253 The BackendWrapper class understands different Cumulus file types, such as
254 snapshots and segments, and implements higher-level operations such as
255 "retrieve a snapshot with a specific name" (hiding operations such as
256 searching for the correct file name).
259 def __init__(self, backend):
260 """Initializes a wrapper around the specified storage backend.
262 backend may be either a Store object or a URL string.
264 if type(backend) in (str, unicode):
265 if backend.find(":") >= 0:
266 self._backend = cumulus.store.open(backend)
268 self._backend = cumulus.store.file.FileStore(backend)
270 self._backend = backend
273 def raw_backend(self):
276 def stat_generic(self, basename, filetype):
277 return SEARCH_PATHS[filetype].stat(self._backend, basename)
279 def open_generic(self, basename, filetype):
280 return SEARCH_PATHS[filetype].get(self._backend, basename)
282 def open_snapshot(self, name):
283 return self.open_generic("snapshot-" + name, "snapshots")
285 def open_segment(self, name):
286 return self.open_generic(name + ".tar", "segments")
288 def list_generic(self, filetype):
289 return ((x[1].group(1), x[0])
290 for x in SEARCH_PATHS[filetype].list(self._backend))
293 def __init__(self, backend):
294 if isinstance(backend, BackendWrapper):
295 self.backend = backend
297 self.backend = BackendWrapper(backend)
302 def get_cachedir(self):
303 if self.cachedir is None:
304 self.cachedir = tempfile.mkdtemp("-cumulus")
308 if self.cachedir is not None:
309 # TODO: Avoid use of system, make this safer
310 os.system("rm -rf " + self.cachedir)
314 def parse_ref(refstr):
315 m = re.match(r"^zero\[(\d+)\]$", refstr)
317 return ("zero", None, None, (0, int(m.group(1)), False))
319 m = re.match(r"^([-0-9a-f]+)\/([0-9a-f]+)(\(\S+\))?(\[(((\d+)\+)?(\d+)|=(\d+))\])?$", refstr)
324 checksum = m.group(3)
327 if checksum is not None:
328 checksum = checksum.lstrip("(").rstrip(")")
330 if slice is not None:
331 if m.group(9) is not None:
332 # Size-assertion slice
333 slice = (0, int(m.group(9)), True)
334 elif m.group(6) is None:
336 slice = (0, int(m.group(8)), False)
338 slice = (int(m.group(7)), int(m.group(8)), False)
340 return (segment, object, checksum, slice)
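# Illustrative sketch (not part of the original module): sample return values
# from parse_ref above.  The segment UUID, object number, and checksum are
# made up; the helper takes the parse_ref callable as an argument since the
# enclosing class name does not appear in this excerpt.
def _example_parse_ref(parse_ref):
    seg = "01234567-89ab-cdef-0123-456789abcdef"
    # Whole object with an embedded checksum:
    #   (seg, "000001ab", "sha1=da39a3ee...", None)
    whole = parse_ref(seg + "/000001ab(sha1=da39a3ee5e6b4b0d3255bfef95601890afd80709)")
    # A 200-byte slice starting at offset 100 within the object:
    #   the slice component is (100, 200, False)
    sliced = parse_ref(seg + "/000001ab[100+200]")
    # 512 bytes of zeroes, not stored in any segment:
    #   ("zero", None, None, (0, 512, False))
    zeroes = parse_ref("zero[512]")
    return (whole, sliced, zeroes)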
342 def list_snapshots(self):
343 return set(x[0] for x in self.backend.list_generic("snapshots"))
345 def list_segments(self):
346 return set(x[0] for x in self.backend.list_generic("segments"))
348 def load_snapshot(self, snapshot):
349 snapshot_file = self.backend.open_snapshot(snapshot)[0]
350 return snapshot_file.read().splitlines(True)
353 def filter_data(filehandle, filter_cmd):
354 if filter_cmd is None:
356 (input, output) = os.popen2(filter_cmd)
357 def copy_thread(src, dst):
360 block = src.read(BLOCK_SIZE)
361 if len(block) == 0: break
365 thread.start_new_thread(copy_thread, (filehandle, input))
368 def get_segment(self, segment):
369 accessed_segments.add(segment)
371 (segment_fp, path, filter_cmd) = self.backend.open_segment(segment)
372 return self.filter_data(segment_fp, filter_cmd)
374 def load_segment(self, segment):
375 seg = tarfile.open(segment, 'r|', self.get_segment(segment))
377 data_obj = seg.extractfile(item)
378 path = item.name.split('/')
379 if len(path) == 2 and path[0] == segment:
380 yield (path[1], data_obj.read())
382 def extract_segment(self, segment):
383 segdir = os.path.join(self.get_cachedir(), segment)
385 for (object, data) in self.load_segment(segment):
386 f = open(os.path.join(segdir, object), 'wb')
390 def load_object(self, segment, object):
391 accessed_segments.add(segment)
392 path = os.path.join(self.get_cachedir(), segment, object)
393 if not os.access(path, os.R_OK):
394 self.extract_segment(segment)
395 if segment in self._lru_list: self._lru_list.remove(segment)
396 self._lru_list.append(segment)
397 while len(self._lru_list) > self.CACHE_SIZE:
398 os.system("rm -rf " + os.path.join(self.cachedir,
400 self._lru_list = self._lru_list[1:]
401 return open(path, 'rb').read()
403 def get(self, refstr):
404 """Fetch the given object and return it.
406 The input should be an object reference, in string form.
409 (segment, object, checksum, slice) = self.parse_ref(refstr)
411 if segment == "zero":
412 return "\0" * slice[1]
414 data = self.load_object(segment, object)
416 if checksum is not None:
417 verifier = ChecksumVerifier(checksum)
418 verifier.update(data)
419 if not verifier.valid():
422 if slice is not None:
423 (start, length, exact) = slice
424 if exact and len(data) != length: raise ValueError
425 data = data[start:start+length]
426 if len(data) != length: raise IndexError
430 def parse(lines, terminate=None):
431 """Generic parser for RFC822-style "Key: Value" data streams.
433 This parser can be used to read metadata logs and snapshot root descriptor files.
436 lines must be an iterable object which yields a sequence of lines of input.
438 If terminate is specified, it is used as a predicate to determine when to
439 stop reading input lines.
446 # Strip off a trailing newline, if present
447 if len(l) > 0 and l[-1] == "\n":
450 if terminate is not None and terminate(l):
451 if len(dict) > 0: yield dict
456 m = re.match(r"^([-\w]+):\s*(.*)$", l)
458 dict[m.group(1)] = m.group(2)
459 last_key = m.group(1)
460 elif len(l) > 0 and l[0].isspace() and last_key is not None:
465 if len(dict) > 0: yield dict
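# Illustrative sketch (not part of the original module): parsing a short
# "Key: Value" stream with parse(), using blank lines as record separators in
# the same way iterate_metadata does below.  The file entries are made up.
def _example_parse_metadata_lines():
    sample = ["name: etc/hosts\n",
              "type: -\n",
              "size: 284\n",
              "\n",
              "name: etc/passwd\n",
              "type: -\n"]
    records = list(parse(sample, lambda l: len(l) == 0))
    # records == [{"name": "etc/hosts", "type": "-", "size": "284"},
    #             {"name": "etc/passwd", "type": "-"}]
    return records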
467 def parse_full(lines):
469 return parse(lines).next()
470 except StopIteration:
473 def parse_metadata_version(s):
474 """Convert a string with the snapshot version format to a tuple."""
476 m = re.match(r"^(?:Cumulus|LBS) Snapshot v(\d+(\.\d+)*)$", s)
480 return tuple([int(d) for d in m.group(1).split(".")])
482 def read_metadata(object_store, root):
483 """Iterate through all lines in the metadata log, following references."""
485 # Stack for keeping track of recursion when following references to
486 # portions of the log. The last entry in the stack corresponds to the
487 # object currently being parsed. Each entry is a list of lines which have
488 # been reversed, so that popping successive lines from the end of each list
489 # will return lines of the metadata log in order.
492 def follow_ref(refstr):
493 if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
494 lines = object_store.get(refstr).splitlines(True)
500 while len(stack) > 0:
507 # An indirect reference which we must follow?
508 if len(line) > 0 and line[0] == '@':
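# Illustrative sketch (not part of the original module): the shape of an
# indirect reference in a metadata log.  Instead of more "Key: Value" lines,
# a line starting with '@' names another metadata object; read_metadata
# fetches it and splices its lines in, up to MAX_RECURSION_DEPTH levels deep.
# The segment UUID and object number below are made up.
def _example_indirect_metadata_line():
    return "@01234567-89ab-cdef-0123-456789abcdef/000000a1\n"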
516 """Metadata for a single file (or directory or...) from a snapshot."""
518 # Functions for parsing various datatypes that can appear in a metadata log
522 """Decode an integer, expressed in decimal, octal, or hexadecimal."""
523 if s.startswith("0x"):
525 elif s.startswith("0"):
532 """Decode a URI-encoded (%xx escapes) string."""
537 """An unecoded string."""
542 """Decode a user/group to a tuple of uid/gid followed by name."""
544 uid = MetadataItem.decode_int(items[0])
547 if items[1].startswith("(") and items[1].endswith(")"):
548 name = MetadataItem.decode_str(items[1][1:-1])
552 def decode_device(s):
553 """Decode a device major/minor number."""
554 (major, minor) = map(MetadataItem.decode_int, s.split("/"))
555 return (major, minor)
559 def __init__(self, fields, object_store):
560 """Initialize from a dictionary of key/value pairs from metadata log."""
563 self.object_store = object_store
565 self.items = self.Items()
566 for (k, v) in fields.items():
567 if k in self.field_types:
568 decoder = self.field_types[k]
569 setattr(self.items, k, decoder(v))
573 """Return an iterator for the data blocks that make up a file."""
575 # This traverses the list of blocks that make up a file, following
576 # indirect references. It is implemented in much the same way as
577 # read_metadata, so see that function for details of the technique.
579 objects = self.fields['data'].split()
583 def follow_ref(refstr):
584 if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
585 objects = self.object_store.get(refstr).split()
587 stack.append(objects)
589 while len(stack) > 0:
596 # An indirect reference which we must follow?
597 if len(ref) > 0 and ref[0] == '@':
602 # Description of fields that might appear, and how they should be parsed.
603 MetadataItem.field_types = {
604 'name': MetadataItem.decode_str,
605 'type': MetadataItem.raw_str,
606 'mode': MetadataItem.decode_int,
607 'device': MetadataItem.decode_device,
608 'user': MetadataItem.decode_user,
609 'group': MetadataItem.decode_user,
610 'ctime': MetadataItem.decode_int,
611 'mtime': MetadataItem.decode_int,
612 'links': MetadataItem.decode_int,
613 'inode': MetadataItem.raw_str,
614 'checksum': MetadataItem.decode_str,
615 'size': MetadataItem.decode_int,
616 'contents': MetadataItem.decode_str,
617 'target': MetadataItem.decode_str,
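# Illustrative sketch (not part of the original module): the integer decoder
# used for the mode/mtime/size fields above accepts decimal, octal, and
# hexadecimal spellings.
def _example_decode_int():
    assert MetadataItem.decode_int("493") == 493     # decimal
    assert MetadataItem.decode_int("0755") == 493    # octal (leading "0")
    assert MetadataItem.decode_int("0x1ed") == 493   # hexadecimal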
620 def iterate_metadata(object_store, root):
621 for d in parse(read_metadata(object_store, root), lambda l: len(l) == 0):
622 yield MetadataItem(d, object_store)
625 """Access to the local database of snapshot contents and object checksums.
627 The local database is consulted when creating a snapshot to determine what
628 data can be re-used from old snapshots. Segment cleaning is performed by
629 manipulating the data in the local database; the local database also
630 includes enough data to guide the segment cleaning process.
633 def __init__(self, path, dbname="localdb.sqlite"):
634 self.db_connection = sqlite3.connect(path + "/" + dbname)
636 # Low-level database access. Use these methods when there isn't a
637 # higher-level interface available. Exception: do, however, remember to
638 # use the commit() method after making changes to make sure they are
639 # actually saved, even when going through higher-level interfaces.
641 "Commit any pending changes to the local database."
642 self.db_connection.commit()
645 "Roll back any pending changes to the local database."
646 self.db_connection.rollback()
649 "Return a DB-API cursor for directly accessing the local database."
650 return self.db_connection.cursor()
652 def list_schemes(self):
653 """Return the list of snapshots found in the local database.
655 The returned value is a list of tuples (id, scheme, name, time, intent).
659 cur.execute("select distinct scheme from snapshots")
660 schemes = [row[0] for row in cur.fetchall()]
664 def list_snapshots(self, scheme):
665 """Return a list of snapshots for the given scheme."""
667 cur.execute("select name from snapshots where scheme = ?", (scheme,))
668 snapshots = [row[0] for row in cur.fetchall()]
672 def delete_snapshot(self, scheme, name):
673 """Remove the specified snapshot from the database.
675 Warning: This does not garbage collect all dependent data in the
676 database, so it must be followed by a call to garbage_collect() to make
677 the database consistent.
680 cur.execute("delete from snapshots where scheme = ? and name = ?",
683 def prune_old_snapshots(self, scheme, intent=1.0):
684 """Delete entries from old snapshots from the database.
686 Only snapshots with the specified scheme name will be deleted. If
687 intent is given, it gives the intended next snapshot type, to determine
688 how aggressively to clean (for example, intent=7 could be used if the
689 next snapshot will be a weekly snapshot).
694 # Find the id of the last snapshot to be created. This is used as a
695 # rough measure of time: we record this value in each segment we
696 # expire on this run, so a future run can tell whether intervening
697 # backups have been made.
698 cur.execute("select max(snapshotid) from snapshots")
699 last_snapshotid = cur.fetchone()[0]
701 # Get the list of old snapshots for this scheme. Delete all the old
702 # ones. Rules for what to keep:
703 # - Always keep the most recent snapshot.
704 # - If snapshot X is younger than Y, and X has higher intent, then Y can be deleted.
706 cur.execute("""select snapshotid, name, intent,
707 julianday('now') - timestamp as age
708 from snapshots where scheme = ?
709 order by age""", (scheme,))
713 for (id, name, snap_intent, snap_age) in cur.fetchall():
715 if snap_intent < max_intent:
716 # Delete small-intent snapshots if there is a more recent
717 # large-intent snapshot.
719 elif snap_intent == intent:
720 # Delete previous snapshots with the specified intent level.
723 if can_delete and not first:
724 print "Delete snapshot %d (%s)" % (id, name)
725 cur.execute("delete from snapshots where snapshotid = ?",
728 max_intent = max(max_intent, snap_intent)
730 self.garbage_collect()
732 def garbage_collect(self):
733 """Garbage-collect unreachable segment and object data.
735 Remove all segments and checksums which are not reachable from the
736 current set of snapshots stored in the local database.
740 # Delete entries in the segment_utilization table which are for
741 # non-existent snapshots.
742 cur.execute("""delete from segment_utilization
743 where snapshotid not in
744 (select snapshotid from snapshots)""")
746 # Delete segments not referenced by any current snapshots.
747 cur.execute("""delete from segments where segmentid not in
748 (select segmentid from segment_utilization)""")
750 # Delete dangling objects in the block_index table.
751 cur.execute("""delete from block_index
752 where segmentid not in
753 (select segmentid from segments)""")
755 # Remove sub-block signatures for deleted objects.
756 cur.execute("""delete from subblock_signatures
758 (select blockid from block_index)""")
761 class SegmentInfo(Struct): pass
763 def get_segment_cleaning_list(self, age_boost=0.0):
764 """Return a list of all current segments with information for cleaning.
766 Return all segments which are currently known in the local database
767 (there might be other, older segments in the archive itself), and
768 return usage statistics for each to help decide which segments to clean.
771 The returned list will be sorted by estimated cleaning benefit, with
772 segments that are best to clean at the start of the list.
774 If specified, the age_boost parameter (measured in days) will be added to
775 the age of each segment, as a way of adjusting the benefit computation
776 before a long-lived snapshot is taken (for example, age_boost might be
777 set to 7 when cleaning prior to taking a weekly snapshot).
782 cur.execute("""select segmentid, used, size, mtime,
783 julianday('now') - mtime as age from segment_info
784 where expire_time is null""")
786 info = self.SegmentInfo()
788 info.used_bytes = row[1]
789 info.size_bytes = row[2]
791 info.age_days = row[4]
793 # If data is not available for whatever reason, treat it as 0.0.
794 if info.age_days is None:
796 if info.used_bytes is None:
797 info.used_bytes = 0.0
799 # Benefit calculation: u is the estimated fraction of each segment
800 # which is utilized (bytes belonging to objects still in use
801 # divided by total size; this doesn't take compression or storage
802 # overhead into account, but should give a reasonable estimate).
804 # The total benefit is a heuristic that combines several factors:
805 # the amount of space that can be reclaimed (1 - u), an ageing
806 # factor (info.age_days) that favors cleaning old segments to young
807 # ones and also is more likely to clean segments that will be
808 # rewritten for long-lived snapshots (age_boost), and finally a
809 # penalty factor for the cost of re-uploading data (u + 0.1).
810 u = info.used_bytes / info.size_bytes
811 info.cleaning_benefit \
812 = (1 - u) * (info.age_days + age_boost) / (u + 0.1)
814 segments.append(info)
816 segments.sort(key=lambda s: s.cleaning_benefit, reverse=True)
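# Worked example (not part of the original module) of the cleaning-benefit
# heuristic above, with made-up numbers: a 25%-utilized, 10-day-old segment
# scores far higher than a 90%-utilized segment of the same age, so it sorts
# toward the front of the cleaning list.
def _example_cleaning_benefit():
    def benefit(used_bytes, size_bytes, age_days, age_boost=0.0):
        u = used_bytes / size_bytes
        return (1 - u) * (age_days + age_boost) / (u + 0.1)
    mostly_empty = benefit(1.0 * 2**20, 4.0 * 2**20, 10.0)   # ~21.4
    mostly_full = benefit(3.6 * 2**20, 4.0 * 2**20, 10.0)    # ~1.0
    return mostly_empty > mostly_full                        # True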
819 def mark_segment_expired(self, segment):
820 """Mark a segment for cleaning in the local database.
822 The segment parameter should be either a SegmentInfo object or an
823 integer segment id. Objects in the given segment will be marked as
824 expired, which means that any future snapshots that would re-use those
825 objects will instead write out a new copy of the object, and thus no
826 future snapshots will depend upon the given segment.
829 if isinstance(segment, int):
831 elif isinstance(segment, self.SegmentInfo):
834 raise TypeError("Invalid segment: %s, must be of type int or SegmentInfo, not %s" % (segment, type(segment)))
837 cur.execute("select max(snapshotid) from snapshots")
838 last_snapshotid = cur.fetchone()[0]
839 cur.execute("update segments set expire_time = ? where segmentid = ?",
840 (last_snapshotid, id))
841 cur.execute("update block_index set expired = 0 where segmentid = ?",
844 def balance_expired_objects(self):
845 """Analyze expired objects in segments to be cleaned and group by age.
847 Update the block_index table of the local database to group expired
848 objects by age. The exact number of buckets and the cutoffs for each
849 are dynamically determined. Calling this function after marking
850 segments expired will help in the segment cleaning process, by ensuring
851 that when active objects from clean segments are rewritten, they will
852 be placed into new segments roughly grouped by age.
855 # The expired column of the block_index table is used when generating a
856 # new Cumulus snapshot. A null value indicates that an object may be
857 # re-used. Otherwise, an object must be written into a new segment if
858 # needed. Objects with distinct expired values will be written into
859 # distinct segments, to allow for some grouping by age. The value 0 is
860 # somewhat special in that it indicates any rewritten objects can be
861 # placed in the same segment as completely new objects; this can be
862 # used for very young objects which have been expired, or objects not
863 # expected to be encountered.
865 # In the balancing process, all objects which are not used in any
866 # current snapshots will have expired set to 0. Objects which have
867 # been seen will be sorted by age and will have expired values set to
868 # 0, 1, 2, and so on based on age (with younger objects being assigned
869 # lower values). The number of buckets and the age cutoffs are
870 # determined by looking at the distribution of block ages.
874 # Mark all expired objects with expired = 0; these objects will later
875 # have values set to indicate groupings of objects when repacking.
876 cur.execute("""update block_index set expired = 0
877 where expired is not null""")
879 # We will want to aim for at least one full segment for each bucket
880 # that we eventually create, but don't know how many bytes that should
881 # be due to compression. So compute the average number of bytes in
882 # each expired segment as a rough estimate for the minimum size of each
883 # bucket. (This estimate could be thrown off by many not-fully-packed
884 # segments, but for now don't worry too much about that.) If we can't
885 # compute an average, it's probably because there are no expired
886 # segments, so we have no more work to do.
887 cur.execute("""select avg(size) from segments
889 (select distinct segmentid from block_index
890 where expired is not null)""")
891 segment_size_estimate = cur.fetchone()[0]
892 if not segment_size_estimate:
895 # Next, extract distribution of expired objects (number and size) by
896 # age. Save the timestamp for "now" so that the classification of
897 # blocks into age buckets will not change later in the function, after
898 # time has passed. Set any timestamps in the future to now, so we are
899 # guaranteed that for the rest of this function, age is always non-negative.
901 cur.execute("select julianday('now')")
902 now = cur.fetchone()[0]
904 cur.execute("""update block_index set timestamp = ?
905 where timestamp > ? and expired is not null""",
908 cur.execute("""select round(? - timestamp) as age, count(*), sum(size)
909 from block_index where expired = 0
910 group by age order by age""", (now,))
911 distribution = cur.fetchall()
913 # Start to determine the buckets for expired objects. Heuristics used:
914 # - An upper bound on the number of buckets is given by the number of
915 # segments we estimate it will take to store all data. In fact,
916 # aim for a couple of segments per bucket.
917 # - Place very young objects in bucket 0 (place with new objects)
918 # unless there are enough of them to warrant a separate bucket.
919 # - Try not to create unnecessarily many buckets, since fewer buckets
920 # will allow repacked data to be grouped based on spatial locality
921 # (while more buckets will group by temporal locality). We want a balance.
924 total_bytes = sum([i[2] for i in distribution])
925 target_buckets = 2 * (total_bytes / segment_size_estimate) ** 0.4
926 min_size = 1.5 * segment_size_estimate
927 target_size = max(2 * segment_size_estimate,
928 total_bytes / target_buckets)
930 print "segment_size:", segment_size_estimate
931 print "distribution:", distribution
932 print "total_bytes:", total_bytes
933 print "target_buckets:", target_buckets
934 print "min, target size:", min_size, target_size
936 # Chosen cutoffs. Each bucket consists of objects with age greater
937 # than one cutoff value, but not greater than the next largest cutoff.
940 # Starting with the oldest objects, begin grouping together into
941 # buckets of size at least target_size bytes.
942 distribution.reverse()
944 min_age_bucket = False
945 for (age, items, size) in distribution:
946 if bucket_size >= target_size \
947 or (age < MIN_AGE and not min_age_bucket):
948 if bucket_size < target_size and len(cutoffs) > 0:
955 min_age_bucket = True
957 # The last (youngest) bucket will be group 0, unless it has enough data
958 # to be of size min_size by itself, or there happen to be no objects
959 # less than MIN_AGE at all.
960 if bucket_size >= min_size or not min_age_bucket:
964 print "cutoffs:", cutoffs
966 # Update the database to assign each object to the appropriate bucket.
968 for i in range(len(cutoffs)):
969 cur.execute("""update block_index set expired = ?
970 where round(? - timestamp) > ?
971 and expired is not null""",
972 (i, now, cutoffs[i]))
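# Worked example (not part of the original module) of the bucket sizing used
# above, with made-up numbers: 10 GiB of expired data and a 4 MiB average
# expired segment size give target_buckets = 2 * (2560 ** 0.4), roughly 46
# buckets, and a target bucket size of max(8 MiB, 10 GiB / 46), roughly 222 MiB.
def _example_bucket_sizing():
    segment_size_estimate = 4.0 * 2**20
    total_bytes = 10.0 * 2**30
    target_buckets = 2 * (total_bytes / segment_size_estimate) ** 0.4
    target_size = max(2 * segment_size_estimate, total_bytes / target_buckets)
    return (target_buckets, target_size)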