# Cumulus: Efficient Filesystem Backup to the Cloud
# Copyright (C) 2008-2009, 2012 The Cumulus Developers
# See the AUTHORS file for a list of contributors.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

19 """High-level interface for working with Cumulus archives.
21 This module provides an easy interface for reading from and manipulating
22 various parts of a Cumulus archive:
23 - listing the snapshots and segments present
24 - reading segment contents
25 - parsing snapshot descriptors and snapshot metadata logs
26 - reading and maintaining the local object database
from __future__ import division

import hashlib
import itertools
import os
import re
import tarfile
import tempfile
import thread

try:
    import sqlite3
except ImportError:
    from pysqlite2 import dbapi2 as sqlite3

import cumulus.store
import cumulus.store.file

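# Example usage (an illustrative sketch, not a tested recipe; the archive
# location "/backup" and the snapshot name "example-20120101T120000" are
# hypothetical):
#
#     import cumulus
#     store = cumulus.CumulusStore("/backup")
#     print store.list_snapshots()
#     descriptor = cumulus.parse_full(
#         store.load_snapshot("example-20120101T120000"))
#     for item in cumulus.iterate_metadata(store, descriptor["Root"]):
#         print item.fields["name"]
#     store.cleanup()
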
# The largest supported snapshot format that can be understood.
FORMAT_VERSION = (0, 11)        # Cumulus Snapshot v0.11

# Maximum number of nested indirect references allowed in a snapshot.
MAX_RECURSION_DEPTH = 3

# All segments which have been accessed this session.
accessed_segments = set()

# Table of methods used to filter segments before storage, and corresponding
# filename extensions.  These are listed in priority order (methods earlier in
# the list are tried first).
SEGMENT_FILTERS = [
    (".gpg", "cumulus-filter-gpg --decrypt"),
    (".gz", "gzip -dc"),
    (".bz2", "bzip2 -dc"),
    ("", None),
]

61 """Decode a URI-encoded (%xx escapes) string."""
62 def hex_decode(m): return chr(int(m.group(1), 16))
63 return re.sub(r"%([0-9a-f]{2})", hex_decode, s)
65 """Encode a string to URI-encoded (%xx escapes) form."""
67 if c > '+' and c < '\x7f' and c != '@':
70 return "%%%02x" % (ord(c),)
71 return ''.join(hex_encode(c) for c in s)
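# For example (a quick round-trip sketch):
#     uri_encode("backup file.txt")  =>  "backup%20file.txt"
#     uri_decode("backup%20file.txt")  =>  "backup file.txt"
# Characters at or below '+', the character '@', and anything outside the
# printable ASCII range are escaped as lowercase %xx sequences.
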
74 """A class which merely acts as a data container.
76 Instances of this class (or its subclasses) are merely used to store data
77 in various attributes. No methods are provided.
81 return "<%s %s>" % (self.__class__, self.__dict__)
CHECKSUM_ALGORITHMS = {
    'sha1': hashlib.sha1,
    'sha224': hashlib.sha224,
    'sha256': hashlib.sha256,
}

class ChecksumCreator:
    """Compute a Cumulus checksum for provided data.

    The algorithm used is selectable, but currently defaults to sha1.
    """

    def __init__(self, algorithm='sha1'):
        self.algorithm = algorithm
        self.hash = CHECKSUM_ALGORITHMS[algorithm]()

    def update(self, data):
        self.hash.update(data)
        return self

    def compute(self):
        return "%s=%s" % (self.algorithm, self.hash.hexdigest())

class ChecksumVerifier:
    """Verify whether a checksum from a snapshot matches the supplied data."""

    def __init__(self, checksumstr):
        """Create an object to check the supplied checksum."""
        (algo, checksum) = checksumstr.split("=", 1)
        self.checksum = checksum
        self.hash = CHECKSUM_ALGORITHMS[algo]()

    def update(self, data):
        self.hash.update(data)

    def valid(self):
        """Return a boolean indicating whether the checksum matches."""
        result = self.hash.hexdigest()
        return result == self.checksum

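# Example round trip (illustrative; the digest shown is the well-known SHA-1
# of "hello"):
#     csum = ChecksumCreator("sha1").update("hello").compute()
#     # csum == "sha1=aaf4c61ddcc5e8a2dabede0f3b482cd9aea9434d"
#     v = ChecksumVerifier(csum)
#     v.update("hello")
#     assert v.valid()
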
class SearchPathEntry(object):
    """Item representing a possible search location for Cumulus files.

    Some Cumulus files might be stored in multiple possible file locations: due
    to format (different compression mechanisms with different extensions),
    locality (different segments might be placed in different directories to
    control archiving policies), or backwards compatibility (the default
    location changed over time).  A SearchPathEntry describes a possible
    location for a file.
    """

    def __init__(self, directory_prefix, suffix, context=None):
        self._directory_prefix = directory_prefix
        self._suffix = suffix
        self._context = context

    def __repr__(self):
        return "%s(%r, %r, %r)" % (self.__class__.__name__,
                                   self._directory_prefix, self._suffix,
                                   self._context)

    def build_path(self, basename):
        """Construct the search path to use for a file with name basename.

        Returns a tuple (pathname, context), where pathname is the path to try
        and context is any additional data associated with this search entry
        (if any).
        """
        return (os.path.join(self._directory_prefix, basename + self._suffix),
                self._context)

class SearchPath(object):
    """A collection of locations to search for files and lookup utilities.

    For looking for a file in a Cumulus storage backend, a SearchPath object
    contains a list of possible locations to try.  A SearchPath can be used to
    perform the search as well; when a file is found the search path ordering
    is updated (moving the successful SearchPathEntry to the front of the list
    for future searches).
    """

    def __init__(self, name_regex, searchpath):
        self._regex = re.compile(name_regex)
        self._path = list(searchpath)

    def add_search_entry(self, entry):
        self._path.append(entry)

    def directories(self):
        """Return the set of directories to search for a file type."""
        return set(entry._directory_prefix for entry in self._path)

    def get(self, backend, basename):
        for (i, entry) in enumerate(self._path):
            try:
                (pathname, context) = entry.build_path(basename)
                fp = backend.get(pathname)
                # On success, move this entry to the front of the search path
                # to speed future searches.
                if i > 0:
                    self._path.pop(i)
                    self._path.insert(0, entry)
                return (fp, pathname, context)
            except cumulus.store.NotFoundError:
                continue
        raise cumulus.store.NotFoundError(basename)

    def stat(self, backend, basename):
        for (i, entry) in enumerate(self._path):
            try:
                (pathname, context) = entry.build_path(basename)
                stat_data = backend.stat(pathname)
                # On success, move this entry to the front of the search path
                # to speed future searches.
                if i > 0:
                    self._path.pop(i)
                    self._path.insert(0, entry)
                result = {"path": pathname}
                result.update(stat_data)
                return result
            except cumulus.store.NotFoundError:
                continue
        raise cumulus.store.NotFoundError(basename)

    def list(self, backend):
        success = False
        for d in self.directories():
            try:
                for f in backend.list(d):
                    success = True
                    m = self._regex.match(f)
                    if m: yield (os.path.join(d, f), m)
            except cumulus.store.NotFoundError:
                pass
        if not success:
            raise cumulus.store.NotFoundError(backend)

def _build_segments_searchpath(prefix):
    for (extension, filter) in SEGMENT_FILTERS:
        yield SearchPathEntry(prefix, extension, filter)

225 "checksums": SearchPath(
226 r"^snapshot-(.*)\.(\w+)sums$",
227 [SearchPathEntry("meta", ".sha1sums"),
228 SearchPathEntry("checksums", ".sha1sums"),
229 SearchPathEntry("", ".sha1sums")]),
230 "segments": SearchPath(
231 (r"^([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})"
234 _build_segments_searchpath("segments0"),
235 _build_segments_searchpath("segments1"),
236 _build_segments_searchpath(""),
237 _build_segments_searchpath("segments"))),
238 "snapshots": SearchPath(
239 r"^snapshot-(.*)\.(cumulus|lbs)$",
240 [SearchPathEntry("snapshots", ".cumulus"),
241 SearchPathEntry("snapshots", ".lbs"),
242 SearchPathEntry("", ".cumulus"),
243 SearchPathEntry("", ".lbs")]),
class BackendWrapper(object):
    """Wrapper around a Cumulus storage backend that understands file types.

    The BackendWrapper class understands different Cumulus file types, such as
    snapshots and segments, and implements higher-level operations such as
    "retrieve a snapshot with a specific name" (hiding operations such as
    searching for the correct file name).
    """

    def __init__(self, backend):
        """Initializes a wrapper around the specified storage backend.

        backend may be either a Store object or a URL.
        """
        if type(backend) in (str, unicode):
            if backend.find(":") >= 0:
                self._backend = cumulus.store.open(backend)
            else:
                self._backend = cumulus.store.file.FileStore(backend)
        else:
            self._backend = backend

    @property
    def raw_backend(self):
        return self._backend

    def stat_generic(self, basename, filetype):
        return SEARCH_PATHS[filetype].stat(self._backend, basename)

    def open_generic(self, basename, filetype):
        return SEARCH_PATHS[filetype].get(self._backend, basename)

    def open_snapshot(self, name):
        return self.open_generic("snapshot-" + name, "snapshots")

    def open_segment(self, name):
        return self.open_generic(name + ".tar", "segments")

    def list_generic(self, filetype):
        return ((x[1].group(1), x[0])
                for x in SEARCH_PATHS[filetype].list(self._backend))

class CumulusStore:
    def __init__(self, backend):
        if isinstance(backend, BackendWrapper):
            self.backend = backend
        else:
            self.backend = BackendWrapper(backend)
        self.cachedir = None
        self.CACHE_SIZE = 16
        self._lru_list = []

    def get_cachedir(self):
        if self.cachedir is None:
            self.cachedir = tempfile.mkdtemp("-cumulus")
        return self.cachedir

    def cleanup(self):
        if self.cachedir is not None:
            # TODO: Avoid use of system, make this safer
            os.system("rm -rf " + self.cachedir)
        self.cachedir = None

    @staticmethod
    def parse_ref(refstr):
        m = re.match(r"^zero\[(\d+)\]$", refstr)
        if m:
            return ("zero", None, None, (0, int(m.group(1)), False))

        m = re.match(r"^([-0-9a-f]+)\/([0-9a-f]+)(\(\S+\))?(\[(((\d+)\+)?(\d+)|=(\d+))\])?$", refstr)
        if not m:
            raise ValueError("Malformed object reference: %s" % (refstr,))

        segment = m.group(1)
        object = m.group(2)
        checksum = m.group(3)
        slice = m.group(4)

        if checksum is not None:
            checksum = checksum.lstrip("(").rstrip(")")

        if slice is not None:
            if m.group(9) is not None:
                # Size-assertion slice
                slice = (0, int(m.group(9)), True)
            elif m.group(6) is None:
                # Abbreviated slice (length only, no start offset)
                slice = (0, int(m.group(8)), False)
            else:
                slice = (int(m.group(7)), int(m.group(8)), False)

        return (segment, object, checksum, slice)

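    # Examples of references accepted by parse_ref (segment/object names are
    # illustrative):
    #     "zero[1024]"             => ("zero", None, None, (0, 1024, False))
    #     "<seg>/000000ab"         => ("<seg>", "000000ab", None, None)
    #     "<seg>/000000ab(sha1=...)[8+16]"
    #                              => (..., "sha1=...", (8, 16, False))
    #     "<seg>/000000ab[=4096]"  => size assertion: (0, 4096, True)
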
    def list_snapshots(self):
        return set(x[0] for x in self.backend.list_generic("snapshots"))

    def list_segments(self):
        return set(x[0] for x in self.backend.list_generic("segments"))

    def load_snapshot(self, snapshot):
        snapshot_file = self.backend.open_snapshot(snapshot)[0]
        return snapshot_file.read().splitlines(True)

    def get_segment(self, segment):
        accessed_segments.add(segment)

        (segment_fp, path, filter_cmd) = self.backend.open_segment(segment)
        if filter_cmd is None:
            return segment_fp
        (input, output) = os.popen2(filter_cmd)
        def copy_thread(src, dst):
            BLOCK_SIZE = 4096
            while True:
                block = src.read(BLOCK_SIZE)
                if len(block) == 0: break
                dst.write(block)
            src.close()
            dst.close()
        thread.start_new_thread(copy_thread, (segment_fp, input))
        return output

    def load_segment(self, segment):
        seg = tarfile.open(segment, 'r|', self.get_segment(segment))
        for item in seg:
            data_obj = seg.extractfile(item)
            path = item.name.split('/')
            if len(path) == 2 and path[0] == segment:
                yield (path[1], data_obj.read())

    def extract_segment(self, segment):
        segdir = os.path.join(self.get_cachedir(), segment)
        os.mkdir(segdir)
        for (object, data) in self.load_segment(segment):
            f = open(os.path.join(segdir, object), 'wb')
            f.write(data)
            f.close()

    def load_object(self, segment, object):
        accessed_segments.add(segment)
        path = os.path.join(self.get_cachedir(), segment, object)
        if not os.access(path, os.R_OK):
            self.extract_segment(segment)
        if segment in self._lru_list: self._lru_list.remove(segment)
        self._lru_list.append(segment)
        while len(self._lru_list) > self.CACHE_SIZE:
            os.system("rm -rf " + os.path.join(self.cachedir,
                                               self._lru_list[0]))
            self._lru_list = self._lru_list[1:]
        return open(path, 'rb').read()

    def get(self, refstr):
        """Fetch the given object and return it.

        The input should be an object reference, in string form.
        """

        (segment, object, checksum, slice) = self.parse_ref(refstr)

        if segment == "zero":
            return "\0" * slice[1]

        data = self.load_object(segment, object)

        if checksum is not None:
            verifier = ChecksumVerifier(checksum)
            verifier.update(data)
            if not verifier.valid():
                raise ValueError("Bad checksum for reference %s" % (refstr,))

        if slice is not None:
            (start, length, exact) = slice
            if exact and len(data) != length: raise ValueError
            data = data[start:start+length]
            if len(data) != length: raise IndexError

        return data

def parse(lines, terminate=None):
    """Generic parser for RFC822-style "Key: Value" data streams.

    This parser can be used to read metadata logs and snapshot root descriptor
    files.

    lines must be an iterable object which yields a sequence of lines of input.

    If terminate is specified, it is used as a predicate to determine when to
    stop reading input lines.
    """

    dict = {}
    last_key = None

    for l in lines:
        # Strip off a trailing newline, if present
        if len(l) > 0 and l[-1] == "\n":
            l = l[:-1]

        if terminate is not None and terminate(l):
            if len(dict) > 0: yield dict
            dict = {}
            last_key = None
            continue

        m = re.match(r"^([-\w]+):\s*(.*)$", l)
        if m:
            dict[m.group(1)] = m.group(2)
            last_key = m.group(1)
        elif len(l) > 0 and l[0].isspace() and last_key is not None:
            dict[last_key] += l

    if len(dict) > 0: yield dict

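# For example (illustrative), parsing the lines
#     ["Format: Cumulus Snapshot v0.11\n", "Root: seg/0\n", "\n", "Name: x\n"]
# with terminate=lambda l: len(l) == 0 yields two dictionaries in turn:
#     {"Format": "Cumulus Snapshot v0.11", "Root": "seg/0"} and {"Name": "x"}
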
def parse_full(lines):
    try:
        return parse(lines).next()
    except StopIteration:
        return {}

def parse_metadata_version(s):
    """Convert a string with the snapshot version format to a tuple."""

    m = re.match(r"^(?:Cumulus|LBS) Snapshot v(\d+(\.\d+)*)$", s)
    if m is None:
        return ()
    else:
        return tuple([int(d) for d in m.group(1).split(".")])

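# For example:
#     parse_metadata_version("Cumulus Snapshot v0.11")  =>  (0, 11)
#     parse_metadata_version("LBS Snapshot v0.8")       =>  (0, 8)
#     parse_metadata_version("something else entirely") =>  ()
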
def read_metadata(object_store, root):
    """Iterate through all lines in the metadata log, following references."""

    # Stack for keeping track of recursion when following references to
    # portions of the log.  The last entry in the stack corresponds to the
    # object currently being parsed.  Each entry is a list of lines which have
    # been reversed, so that popping successive lines from the end of each list
    # will return lines of the metadata log in order.
    stack = []

    def follow_ref(refstr):
        if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
        lines = object_store.get(refstr).splitlines(True)
        lines.reverse()
        stack.append(lines)

    follow_ref(root)

    while len(stack) > 0:
        top = stack[-1]
        if len(top) == 0:
            stack.pop()
            continue
        line = top.pop()

        # An indirect reference which we must follow?
        if len(line) > 0 and line[0] == '@':
            ref = line[1:]
            ref = ref.strip()
            follow_ref(ref)
        else:
            yield line

508 """Metadata for a single file (or directory or...) from a snapshot."""
510 # Functions for parsing various datatypes that can appear in a metadata log
514 """Decode an integer, expressed in decimal, octal, or hexadecimal."""
515 if s.startswith("0x"):
517 elif s.startswith("0"):
524 """Decode a URI-encoded (%xx escapes) string."""
529 """An unecoded string."""
534 """Decode a user/group to a tuple of uid/gid followed by name."""
536 uid = MetadataItem.decode_int(items[0])
539 if items[1].startswith("(") and items[1].endswith(")"):
540 name = MetadataItem.decode_str(items[1][1:-1])
544 def decode_device(s):
545 """Decode a device major/minor number."""
546 (major, minor) = map(MetadataItem.decode_int, s.split("/"))
547 return (major, minor)
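    # Examples (illustrative) of the decoders above:
    #     decode_int("0x1b") => 27; decode_int("017") => 15; decode_int("9") => 9
    #     decode_user("1000 (alice)") => (1000, "alice")
    #     decode_device("8/1") => (8, 1)
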
    class Items: pass

    def __init__(self, fields, object_store):
        """Initialize from a dictionary of key/value pairs from metadata log."""

        self.fields = fields
        self.object_store = object_store

        self.items = self.Items()
        for (k, v) in fields.items():
            if k in self.field_types:
                decoder = self.field_types[k]
                setattr(self.items, k, decoder(v))

565 """Return an iterator for the data blocks that make up a file."""
567 # This traverses the list of blocks that make up a file, following
568 # indirect references. It is implemented in much the same way as
569 # read_metadata, so see that function for details of the technique.
571 objects = self.fields['data'].split()
575 def follow_ref(refstr):
576 if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
577 objects = self.object_store.get(refstr).split()
579 stack.append(objects)
581 while len(stack) > 0:
588 # An indirect reference which we must follow?
589 if len(ref) > 0 and ref[0] == '@':
# Description of fields that might appear, and how they should be parsed.
MetadataItem.field_types = {
    'name': MetadataItem.decode_str,
    'type': MetadataItem.raw_str,
    'mode': MetadataItem.decode_int,
    'device': MetadataItem.decode_device,
    'user': MetadataItem.decode_user,
    'group': MetadataItem.decode_user,
    'ctime': MetadataItem.decode_int,
    'mtime': MetadataItem.decode_int,
    'links': MetadataItem.decode_int,
    'inode': MetadataItem.raw_str,
    'checksum': MetadataItem.decode_str,
    'size': MetadataItem.decode_int,
    'contents': MetadataItem.decode_str,
    'target': MetadataItem.decode_str,
}

def iterate_metadata(object_store, root):
    for d in parse(read_metadata(object_store, root), lambda l: len(l) == 0):
        yield MetadataItem(d, object_store)

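# For example (hypothetical log entry), the stanza
#     name: home/alice/notes.txt
#     type: f
#     mode: 0644
#     size: 1432
#     data: <seg>/000000ab
# yields a MetadataItem where items.name == "home/alice/notes.txt",
# items.mode == 420 (octal 0644), and items.size == 1432; data_blocks()
# iterates over the object references holding the file contents.
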
617 """Access to the local database of snapshot contents and object checksums.
619 The local database is consulted when creating a snapshot to determine what
620 data can be re-used from old snapshots. Segment cleaning is performed by
621 manipulating the data in the local database; the local database also
622 includes enough data to guide the segment cleaning process.
625 def __init__(self, path, dbname="localdb.sqlite"):
626 self.db_connection = sqlite3.connect(path + "/" + dbname)
    # Low-level database access.  Use these methods when there isn't a
    # higher-level interface available.  Exception: do, however, remember to
    # use the commit() method after making changes to make sure they are
    # actually saved, even when going through higher-level interfaces.
    def commit(self):
        "Commit any pending changes to the local database."
        self.db_connection.commit()

    def rollback(self):
        "Roll back any pending changes to the local database."
        self.db_connection.rollback()

    def cursor(self):
        "Return a DB-API cursor for directly accessing the local database."
        return self.db_connection.cursor()

    def list_schemes(self):
        """Return the sorted list of snapshot scheme names in the database."""
        cur = self.cursor()
        cur.execute("select distinct scheme from snapshots")
        schemes = [row[0] for row in cur.fetchall()]
        schemes.sort()
        return schemes

    def list_snapshots(self, scheme):
        """Return a list of snapshots for the given scheme."""
        cur = self.cursor()
        cur.execute("select name from snapshots where scheme = ?", (scheme,))
        snapshots = [row[0] for row in cur.fetchall()]
        snapshots.sort()
        return snapshots

    def delete_snapshot(self, scheme, name):
        """Remove the specified snapshot from the database.

        Warning: This does not garbage collect all dependent data in the
        database, so it must be followed by a call to garbage_collect() to make
        the database consistent.
        """
        cur = self.cursor()
        cur.execute("delete from snapshots where scheme = ? and name = ?",
                    (scheme, name))

    def prune_old_snapshots(self, scheme, intent=1.0):
        """Delete old snapshot entries from the database.

        Only snapshots with the specified scheme name will be deleted.  If
        intent is given, it gives the intended next snapshot type, to determine
        how aggressively to clean (for example, intent=7 could be used if the
        next snapshot will be a weekly snapshot).
        """

        cur = self.cursor()

        # Find the id of the last snapshot to be created.  This is used for
        # measuring time in a way: we record this value in each segment we
        # expire on this run, and then on a future run can tell if there have
        # been intervening backups made.
        cur.execute("select max(snapshotid) from snapshots")
        last_snapshotid = cur.fetchone()[0]

        # Get the list of old snapshots for this scheme.  Delete all the old
        # ones.  Rules for what to keep:
        #   - Always keep the most recent snapshot.
        #   - If snapshot X is younger than Y, and X has higher intent, then Y
        #     can be deleted.
        cur.execute("""select snapshotid, name, intent,
                              julianday('now') - timestamp as age
                       from snapshots where scheme = ?
                       order by age""", (scheme,))

        first = True
        max_intent = intent
        for (id, name, snap_intent, snap_age) in cur.fetchall():
            can_delete = False
            if snap_intent < max_intent:
                # Delete small-intent snapshots if there is a more recent
                # large-intent snapshot.
                can_delete = True
            elif snap_intent == intent:
                # Delete previous snapshots with the specified intent level.
                can_delete = True

            if can_delete and not first:
                print "Delete snapshot %d (%s)" % (id, name)
                cur.execute("delete from snapshots where snapshotid = ?",
                            (id,))
            first = False
            max_intent = max(max_intent, snap_intent)

        self.garbage_collect()

    def garbage_collect(self):
        """Garbage-collect unreachable segment and object data.

        Remove all segments and checksums which are not reachable from the
        current set of snapshots stored in the local database.
        """
        cur = self.cursor()

        # Delete entries in the segments_used table which are for non-existent
        # snapshots.
        cur.execute("""delete from segments_used
                       where snapshotid not in
                           (select snapshotid from snapshots)""")

        # Find segments which contain no objects used by any current snapshots,
        # and delete them from the segment table.
        cur.execute("""delete from segments where segmentid not in
                           (select segmentid from segments_used)""")

        # Delete dangling objects in the block_index table.
        cur.execute("""delete from block_index
                       where segmentid not in
                           (select segmentid from segments)""")

        # Remove sub-block signatures for deleted objects.
        cur.execute("""delete from subblock_signatures
                       where blockid not in
                           (select blockid from block_index)""")

    class SegmentInfo(Struct): pass

    def get_segment_cleaning_list(self, age_boost=0.0):
        """Return a list of all current segments with information for cleaning.

        Return all segments which are currently known in the local database
        (there might be other, older segments in the archive itself), and
        return usage statistics for each to help decide which segments to
        clean.

        The returned list will be sorted by estimated cleaning benefit, with
        segments that are best to clean at the start of the list.

        If specified, the age_boost parameter (measured in days) will be added
        to the age of each segment, as a way of adjusting the benefit
        computation before a long-lived snapshot is taken (for example,
        age_boost might be set to 7 when cleaning prior to taking a weekly
        snapshot).
        """

        cur = self.cursor()
        segments = []
        cur.execute("""select segmentid, used, size, mtime,
                       julianday('now') - mtime as age from segment_info
                       where expire_time is null""")
        for row in cur.fetchall():
            info = self.SegmentInfo()
            info.segmentid = row[0]
            info.used_bytes = row[1]
            info.size_bytes = row[2]
            info.mtime = row[3]
            info.age_days = row[4]

            # If data is not available for whatever reason, treat it as 0.0.
            if info.age_days is None:
                info.age_days = 0.0
            if info.used_bytes is None:
                info.used_bytes = 0.0

            # Benefit calculation: u is the estimated fraction of each segment
            # which is utilized (bytes belonging to objects still in use
            # divided by total size; this doesn't take compression or storage
            # overhead into account, but should give a reasonable estimate).
            #
            # The total benefit is a heuristic that combines several factors:
            # the amount of space that can be reclaimed (1 - u), an ageing
            # factor (info.age_days) that favors cleaning old segments to young
            # ones and also is more likely to clean segments that will be
            # rewritten for long-lived snapshots (age_boost), and finally a
            # penalty factor for the cost of re-uploading data (u + 0.1).
            u = info.used_bytes / info.size_bytes
            info.cleaning_benefit \
                = (1 - u) * (info.age_days + age_boost) / (u + 0.1)
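
            # For example (hypothetical numbers): a 20%-utilized segment
            # (u = 0.2) that is 10 days old scores
            # (1 - 0.2) * 10 / (0.2 + 0.1) ~= 26.7, while a fully-utilized
            # segment of the same age scores 0 and is never worth cleaning.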
            segments.append(info)

        segments.sort(key=lambda s: s.cleaning_benefit, reverse=True)
        return segments

    def mark_segment_expired(self, segment):
        """Mark a segment for cleaning in the local database.

        The segment parameter should be either a SegmentInfo object or an
        integer segment id.  Objects in the given segment will be marked as
        expired, which means that any future snapshots that would re-use those
        objects will instead write out a new copy of the object, and thus no
        future snapshots will depend upon the given segment.
        """

        if isinstance(segment, int):
            id = segment
        elif isinstance(segment, self.SegmentInfo):
            id = segment.segmentid
        else:
            raise TypeError("Invalid segment: %s, must be of type int or SegmentInfo, not %s" % (segment, type(segment)))

        cur = self.cursor()
        cur.execute("select max(snapshotid) from snapshots")
        last_snapshotid = cur.fetchone()[0]
        cur.execute("update segments set expire_time = ? where segmentid = ?",
                    (last_snapshotid, id))
        cur.execute("update block_index set expired = 0 where segmentid = ?",
                    (id,))

    def balance_expired_objects(self):
        """Analyze expired objects in segments to be cleaned and group by age.

        Update the block_index table of the local database to group expired
        objects by age.  The exact number of buckets and the cutoffs for each
        are dynamically determined.  Calling this function after marking
        segments expired will help in the segment cleaning process, by ensuring
        that when active objects from clean segments are rewritten, they will
        be placed into new segments roughly grouped by age.
        """

        # The expired column of the block_index table is used when generating a
        # new Cumulus snapshot.  A null value indicates that an object may be
        # re-used.  Otherwise, an object must be written into a new segment if
        # needed.  Objects with distinct expired values will be written into
        # distinct segments, to allow for some grouping by age.  The value 0 is
        # somewhat special in that it indicates any rewritten objects can be
        # placed in the same segment as completely new objects; this can be
        # used for very young objects which have been expired, or objects not
        # expected to be encountered.
        #
        # In the balancing process, all objects which are not used in any
        # current snapshots will have expired set to 0.  Objects which have
        # been seen will be sorted by age and will have expired values set to
        # 0, 1, 2, and so on based on age (with younger objects being assigned
        # lower values).  The number of buckets and the age cutoffs is
        # determined by looking at the distribution of block ages.

        cur = self.cursor()

        # Mark all expired objects with expired = 0; these objects will later
        # have values set to indicate groupings of objects when repacking.
        cur.execute("""update block_index set expired = 0
                       where expired is not null""")

        # We will want to aim for at least one full segment for each bucket
        # that we eventually create, but don't know how many bytes that should
        # be due to compression.  So compute the average number of bytes in
        # each expired segment as a rough estimate for the minimum size of each
        # bucket.  (This estimate could be thrown off by many not-fully-packed
        # segments, but for now don't worry too much about that.)  If we can't
        # compute an average, it's probably because there are no expired
        # segments, so we have no more work to do.
        cur.execute("""select avg(size) from segments
                       where segmentid in
                           (select distinct segmentid from block_index
                            where expired is not null)""")
        segment_size_estimate = cur.fetchone()[0]
        if not segment_size_estimate:
            return

        # Next, extract distribution of expired objects (number and size) by
        # age.  Save the timestamp for "now" so that the classification of
        # blocks into age buckets will not change later in the function, after
        # time has passed.  Set any timestamps in the future to now, so we are
        # guaranteed that for the rest of this function, age is always
        # non-negative.
        cur.execute("select julianday('now')")
        now = cur.fetchone()[0]

        cur.execute("""update block_index set timestamp = ?
                       where timestamp > ? and expired is not null""",
                    (now, now))

        cur.execute("""select round(? - timestamp) as age, count(*), sum(size)
                       from block_index where expired = 0
                       group by age order by age""", (now,))
        distribution = cur.fetchall()

        # Start to determine the buckets for expired objects.  Heuristics used:
        #   - An upper bound on the number of buckets is given by the number of
        #     segments we estimate it will take to store all data.  In fact,
        #     aim for a couple of segments per bucket.
        #   - Place very young objects in bucket 0 (place with new objects)
        #     unless there are enough of them to warrant a separate bucket.
        #   - Try not to create unnecessarily many buckets, since fewer buckets
        #     will allow repacked data to be grouped based on spatial locality
        #     (while more buckets will group by temporal locality).  We want a
        #     balance.
        MIN_AGE = 4
        total_bytes = sum([i[2] for i in distribution])
        target_buckets = 2 * (total_bytes / segment_size_estimate) ** 0.4
        min_size = 1.5 * segment_size_estimate
        target_size = max(2 * segment_size_estimate,
                          total_bytes / target_buckets)

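        # Worked example (hypothetical numbers): with an 8 MB average expired
        # segment and 800 MB of expired data, target_buckets
        # = 2 * 100 ** 0.4 ~= 12.6, so target_size = max(16 MB, 800 MB / 12.6)
        # ~= 63 MB per bucket.
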
923 print "segment_size:", segment_size_estimate
924 print "distribution:", distribution
925 print "total_bytes:", total_bytes
926 print "target_buckets:", target_buckets
927 print "min, target size:", min_size, target_size
        # Chosen cutoffs.  Each bucket consists of objects with age greater
        # than one cutoff value, but not greater than the next largest cutoff.
        cutoffs = []

        # Starting with the oldest objects, begin grouping together into
        # buckets of size at least target_size bytes.
        distribution.reverse()
        bucket_size = 0
        min_age_bucket = False
        for (age, items, size) in distribution:
            if bucket_size >= target_size \
                or (age < MIN_AGE and not min_age_bucket):
                if bucket_size < target_size and len(cutoffs) > 0:
                    cutoffs.pop()
                cutoffs.append(age)
                bucket_size = 0

            bucket_size += size
            if age < MIN_AGE:
                min_age_bucket = True

        # The last (youngest) bucket will be group 0, unless it has enough data
        # to be of size min_size by itself, or there happen to be no objects
        # less than MIN_AGE at all.
        if bucket_size >= min_size or not min_age_bucket:
            cutoffs.append(-1)
        cutoffs.append(-1)

        print "cutoffs:", cutoffs

        # Update the database to assign each object to the appropriate bucket.
        cutoffs.reverse()
        for i in range(len(cutoffs)):
            cur.execute("""update block_index set expired = ?
                           where round(? - timestamp) > ?
                             and expired is not null""",
                        (i, now, cutoffs[i]))
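
        # For example (hypothetical): with cutoffs == [-1, 7, 30] after the
        # reverse, a 3-day-old object lands in bucket 0 (only 3 > -1 holds),
        # a 10-day-old object in bucket 1, and a 45-day-old object in bucket
        # 2: the last i for which age > cutoffs[i] wins, since later updates
        # overwrite earlier ones.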