Add sha224/sha256 as supported hash algorithms.
[cumulus.git] / python / cumulus / __init__.py
1 """High-level interface for working with Cumulus archives.
2
3 This module provides an easy interface for reading from and manipulating
4 various parts of a Cumulus archive:
5   - listing the snapshots and segments present
6   - reading segment contents
7   - parsing snapshot descriptors and snapshot metadata logs
8   - reading and maintaining the local object database
9 """
10
11 from __future__ import division
12 import hashlib, os, re, tarfile, tempfile, thread
13 from pysqlite2 import dbapi2 as sqlite3
14
15 import cumulus.store, cumulus.store.file
16
17 # The largest supported snapshot format that can be understood.
18 FORMAT_VERSION = (0, 11)        # Cumulus Snapshot v0.11
19
20 # Maximum number of nested indirect references allowed in a snapshot.
21 MAX_RECURSION_DEPTH = 3
22
23 # All segments which have been accessed this session.
24 accessed_segments = set()
25
26 # Table of methods used to filter segments before storage, and corresponding
27 # filename extensions.  These are listed in priority order (methods earlier in
28 # the list are tried first).
29 SEGMENT_FILTERS = [
30     (".gpg", "cumulus-filter-gpg --decrypt"),
31     (".gz", "gzip -dc"),
32     (".bz2", "bzip2 -dc"),
33 ]
34
35 def uri_decode(s):
36     """Decode a URI-encoded (%xx escapes) string."""
37     def hex_decode(m): return chr(int(m.group(1), 16))
38     return re.sub(r"%([0-9a-f]{2})", hex_decode, s)
39 def uri_encode(s):
40     """Encode a string to URI-encoded (%xx escapes) form."""
41     def hex_encode(c):
42         if c > '+' and c < '\x7f' and c != '@':
43             return c
44         else:
45             return "%%%02x" % (ord(c),)
46     return ''.join(hex_encode(c) for c in s)
47
48 class Struct:
49     """A class which merely acts as a data container.
50
51     Instances of this class (or its subclasses) are merely used to store data
52     in various attributes.  No methods are provided.
53     """
54
55     def __repr__(self):
56         return "<%s %s>" % (self.__class__, self.__dict__)
57
58 CHECKSUM_ALGORITHMS = {
59     'sha1': hashlib.sha1,
60     'sha224': hashlib.sha224,
61     'sha256': hashlib.sha256,
62 }
63
64 class ChecksumCreator:
65     """Compute a Cumulus checksum for provided data.
66
67     The algorithm used is selectable, but currently defaults to sha1.
68     """
69
70     def __init__(self, algorithm='sha1'):
71         self.algorithm = algorithm
72         self.hash = CHECKSUM_ALGORITHMS[algorithm]()
73
74     def update(self, data):
75         self.hash.update(data)
76         return self
77
78     def compute(self):
79         return "%s=%s" % (self.algorithm, self.hash.hexdigest())
80
81 class ChecksumVerifier:
82     """Verify whether a checksum from a snapshot matches the supplied data."""
83
84     def __init__(self, checksumstr):
85         """Create an object to check the supplied checksum."""
86
87         (algo, checksum) = checksumstr.split("=", 1)
88         self.checksum = checksum
89         self.hash = CHECKSUM_ALGORITHMS[algo]()
90
91     def update(self, data):
92         self.hash.update(data)
93
94     def valid(self):
95         """Return a boolean indicating whether the checksum matches."""
96
97         result = self.hash.hexdigest()
98         return result == self.checksum
99
100 class LowlevelDataStore:
101     """Access to the backup store containing segments and snapshot descriptors.
102
103     Instances of this class are used to get direct filesystem-level access to
104     the backup data.  To read a backup, a caller will ordinarily not care about
105     direct access to backup segments, but will instead merely need to access
106     objects from those segments.  The ObjectStore class provides a suitable
107     wrapper around a DataStore to give this high-level access.
108     """
109
110     def __init__(self, path):
111         if isinstance(path, cumulus.store.Store):
112             self.store = path
113         elif path.find(":") >= 0:
114             self.store = cumulus.store.open(path)
115         else:
116             self.store = cumulus.store.file.FileStore(path)
117
118     def _classify(self, filename):
119         for (t, r) in cumulus.store.type_patterns.items():
120             if r.match(filename):
121                 return (t, filename)
122         return (None, filename)
123
124     def scan(self):
125         self.store.scan()
126
127     def lowlevel_open(self, filename):
128         """Return a file-like object for reading data from the given file."""
129
130         (type, filename) = self._classify(filename)
131         return self.store.get(type, filename)
132
133     def lowlevel_stat(self, filename):
134         """Return a dictionary of information about the given file.
135
136         Currently, the only defined field is 'size', giving the size of the
137         file in bytes.
138         """
139
140         (type, filename) = self._classify(filename)
141         return self.store.stat(type, filename)
142
143     # Slightly higher-level list methods.
144     def list_snapshots(self):
145         for f in self.store.list('snapshots'):
146             m = cumulus.store.type_patterns['snapshots'].match(f)
147             if m: yield m.group(1)
148
149     def list_segments(self):
150         for f in self.store.list('segments'):
151             m = cumulus.store.type_patterns['segments'].match(f)
152             if m: yield m.group(1)
153
154 class ObjectStore:
155     def __init__(self, data_store):
156         self.store = data_store
157         self.cachedir = None
158         self.CACHE_SIZE = 16
159         self.lru_list = []
160
161     def get_cachedir(self):
162         if self.cachedir is None:
163             self.cachedir = tempfile.mkdtemp(".lbs")
164         return self.cachedir
165
166     def cleanup(self):
167         if self.cachedir is not None:
168             # TODO: Avoid use of system, make this safer
169             os.system("rm -rf " + self.cachedir)
170         self.cachedir = None
171
172     @staticmethod
173     def parse_ref(refstr):
174         m = re.match(r"^zero\[(\d+)\]$", refstr)
175         if m:
176             return ("zero", None, None, (0, int(m.group(1))))
177
178         m = re.match(r"^([-0-9a-f]+)\/([0-9a-f]+)(\(\S+\))?(\[(((\d+)\+)?(\d+)|=(\d+))\])?$", refstr)
179         if not m: return
180
181         segment = m.group(1)
182         object = m.group(2)
183         checksum = m.group(3)
184         slice = m.group(4)
185
186         if checksum is not None:
187             checksum = checksum.lstrip("(").rstrip(")")
188
189         if slice is not None:
190             if m.group(9) is not None:
191                 # Size-assertion slice
192                 slice = (0, int(m.group(9)), True)
193             elif m.group(6) is None:
194                 # Abbreviated slice
195                 slice = (0, int(m.group(8)), False)
196             else:
197                 slice = (int(m.group(7)), int(m.group(8)), False)
198
199         return (segment, object, checksum, slice)
200
201     def get_segment(self, segment):
202         accessed_segments.add(segment)
203
204         for (extension, filter) in SEGMENT_FILTERS:
205             try:
206                 raw = self.store.lowlevel_open(segment + ".tar" + extension)
207
208                 (input, output) = os.popen2(filter)
209                 def copy_thread(src, dst):
210                     BLOCK_SIZE = 4096
211                     while True:
212                         block = src.read(BLOCK_SIZE)
213                         if len(block) == 0: break
214                         dst.write(block)
215                     dst.close()
216
217                 thread.start_new_thread(copy_thread, (raw, input))
218                 return output
219             except:
220                 pass
221
222         raise cumulus.store.NotFoundError
223
224     def load_segment(self, segment):
225         seg = tarfile.open(segment, 'r|', self.get_segment(segment))
226         for item in seg:
227             data_obj = seg.extractfile(item)
228             path = item.name.split('/')
229             if len(path) == 2 and path[0] == segment:
230                 yield (path[1], data_obj.read())
231
232     def load_snapshot(self, snapshot):
233         file = self.store.lowlevel_open("snapshot-" + snapshot + ".lbs")
234         return file.read().splitlines(True)
235
236     def extract_segment(self, segment):
237         segdir = os.path.join(self.get_cachedir(), segment)
238         os.mkdir(segdir)
239         for (object, data) in self.load_segment(segment):
240             f = open(os.path.join(segdir, object), 'wb')
241             f.write(data)
242             f.close()
243
244     def load_object(self, segment, object):
245         accessed_segments.add(segment)
246         path = os.path.join(self.get_cachedir(), segment, object)
247         if not os.access(path, os.R_OK):
248             self.extract_segment(segment)
249         if segment in self.lru_list: self.lru_list.remove(segment)
250         self.lru_list.append(segment)
251         while len(self.lru_list) > self.CACHE_SIZE:
252             os.system("rm -rf " + os.path.join(self.cachedir, self.lru_list[0]))
253             self.lru_list = self.lru_list[1:]
254         return open(path, 'rb').read()
255
256     def get(self, refstr):
257         """Fetch the given object and return it.
258
259         The input should be an object reference, in string form.
260         """
261
262         (segment, object, checksum, slice) = self.parse_ref(refstr)
263
264         if segment == "zero":
265             return "\0" * slice[1]
266
267         data = self.load_object(segment, object)
268
269         if checksum is not None:
270             verifier = ChecksumVerifier(checksum)
271             verifier.update(data)
272             if not verifier.valid():
273                 raise ValueError
274
275         if slice is not None:
276             (start, length, exact) = slice
277             if exact and len(data) != length: raise ValueError
278             data = data[start:start+length]
279             if len(data) != length: raise IndexError
280
281         return data
282
283 def parse(lines, terminate=None):
284     """Generic parser for RFC822-style "Key: Value" data streams.
285
286     This parser can be used to read metadata logs and snapshot root descriptor
287     files.
288
289     lines must be an iterable object which yields a sequence of lines of input.
290
291     If terminate is specified, it is used as a predicate to determine when to
292     stop reading input lines.
293     """
294
295     dict = {}
296     last_key = None
297
298     for l in lines:
299         # Strip off a trailing newline, if present
300         if len(l) > 0 and l[-1] == "\n":
301             l = l[:-1]
302
303         if terminate is not None and terminate(l):
304             if len(dict) > 0: yield dict
305             dict = {}
306             last_key = None
307             continue
308
309         m = re.match(r"^([-\w]+):\s*(.*)$", l)
310         if m:
311             dict[m.group(1)] = m.group(2)
312             last_key = m.group(1)
313         elif len(l) > 0 and l[0].isspace() and last_key is not None:
314             dict[last_key] += l
315         else:
316             last_key = None
317
318     if len(dict) > 0: yield dict
319
320 def parse_full(lines):
321     try:
322         return parse(lines).next()
323     except StopIteration:
324         return {}
325
326 def parse_metadata_version(s):
327     """Convert a string with the snapshot version format to a tuple."""
328
329     m = re.match(r"^(?:Cumulus|LBS) Snapshot v(\d+(\.\d+)*)$", s)
330     if m is None:
331         return ()
332     else:
333         return tuple([int(d) for d in m.group(1).split(".")])
334
335 def read_metadata(object_store, root):
336     """Iterate through all lines in the metadata log, following references."""
337
338     # Stack for keeping track of recursion when following references to
339     # portions of the log.  The last entry in the stack corresponds to the
340     # object currently being parsed.  Each entry is a list of lines which have
341     # been reversed, so that popping successive lines from the end of each list
342     # will return lines of the metadata log in order.
343     stack = []
344
345     def follow_ref(refstr):
346         if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
347         lines = object_store.get(refstr).splitlines(True)
348         lines.reverse()
349         stack.append(lines)
350
351     follow_ref(root)
352
353     while len(stack) > 0:
354         top = stack[-1]
355         if len(top) == 0:
356             stack.pop()
357             continue
358         line = top.pop()
359
360         # An indirect reference which we must follow?
361         if len(line) > 0 and line[0] == '@':
362             ref = line[1:]
363             ref.strip()
364             follow_ref(ref)
365         else:
366             yield line
367
368 class MetadataItem:
369     """Metadata for a single file (or directory or...) from a snapshot."""
370
371     # Functions for parsing various datatypes that can appear in a metadata log
372     # item.
373     @staticmethod
374     def decode_int(s):
375         """Decode an integer, expressed in decimal, octal, or hexadecimal."""
376         if s.startswith("0x"):
377             return int(s, 16)
378         elif s.startswith("0"):
379             return int(s, 8)
380         else:
381             return int(s, 10)
382
383     @staticmethod
384     def decode_str(s):
385         """Decode a URI-encoded (%xx escapes) string."""
386         return uri_decode(s)
387
388     @staticmethod
389     def raw_str(s):
390         """An unecoded string."""
391         return s
392
393     @staticmethod
394     def decode_user(s):
395         """Decode a user/group to a tuple of uid/gid followed by name."""
396         items = s.split()
397         uid = MetadataItem.decode_int(items[0])
398         name = None
399         if len(items) > 1:
400             if items[1].startswith("(") and items[1].endswith(")"):
401                 name = MetadataItem.decode_str(items[1][1:-1])
402         return (uid, name)
403
404     @staticmethod
405     def decode_device(s):
406         """Decode a device major/minor number."""
407         (major, minor) = map(MetadataItem.decode_int, s.split("/"))
408         return (major, minor)
409
410     class Items: pass
411
412     def __init__(self, fields, object_store):
413         """Initialize from a dictionary of key/value pairs from metadata log."""
414
415         self.fields = fields
416         self.object_store = object_store
417         self.keys = []
418         self.items = self.Items()
419         for (k, v) in fields.items():
420             if k in self.field_types:
421                 decoder = self.field_types[k]
422                 setattr(self.items, k, decoder(v))
423                 self.keys.append(k)
424
425     def data(self):
426         """Return an iterator for the data blocks that make up a file."""
427
428         # This traverses the list of blocks that make up a file, following
429         # indirect references.  It is implemented in much the same way as
430         # read_metadata, so see that function for details of the technique.
431
432         objects = self.fields['data'].split()
433         objects.reverse()
434         stack = [objects]
435
436         def follow_ref(refstr):
437             if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
438             objects = self.object_store.get(refstr).split()
439             objects.reverse()
440             stack.append(objects)
441
442         while len(stack) > 0:
443             top = stack[-1]
444             if len(top) == 0:
445                 stack.pop()
446                 continue
447             ref = top.pop()
448
449             # An indirect reference which we must follow?
450             if len(ref) > 0 and ref[0] == '@':
451                 follow_ref(ref[1:])
452             else:
453                 yield ref
454
455 # Description of fields that might appear, and how they should be parsed.
456 MetadataItem.field_types = {
457     'name': MetadataItem.decode_str,
458     'type': MetadataItem.raw_str,
459     'mode': MetadataItem.decode_int,
460     'device': MetadataItem.decode_device,
461     'user': MetadataItem.decode_user,
462     'group': MetadataItem.decode_user,
463     'ctime': MetadataItem.decode_int,
464     'mtime': MetadataItem.decode_int,
465     'links': MetadataItem.decode_int,
466     'inode': MetadataItem.raw_str,
467     'checksum': MetadataItem.decode_str,
468     'size': MetadataItem.decode_int,
469     'contents': MetadataItem.decode_str,
470     'target': MetadataItem.decode_str,
471 }
472
473 def iterate_metadata(object_store, root):
474     for d in parse(read_metadata(object_store, root), lambda l: len(l) == 0):
475         yield MetadataItem(d, object_store)
476
477 class LocalDatabase:
478     """Access to the local database of snapshot contents and object checksums.
479
480     The local database is consulted when creating a snapshot to determine what
481     data can be re-used from old snapshots.  Segment cleaning is performed by
482     manipulating the data in the local database; the local database also
483     includes enough data to guide the segment cleaning process.
484     """
485
486     def __init__(self, path, dbname="localdb.sqlite"):
487         self.db_connection = sqlite3.connect(path + "/" + dbname)
488
489     # Low-level database access.  Use these methods when there isn't a
490     # higher-level interface available.  Exception: do, however, remember to
491     # use the commit() method after making changes to make sure they are
492     # actually saved, even when going through higher-level interfaces.
493     def commit(self):
494         "Commit any pending changes to the local database."
495         self.db_connection.commit()
496
497     def rollback(self):
498         "Roll back any pending changes to the local database."
499         self.db_connection.rollback()
500
501     def cursor(self):
502         "Return a DB-API cursor for directly accessing the local database."
503         return self.db_connection.cursor()
504
505     def list_schemes(self):
506         """Return the list of snapshots found in the local database.
507
508         The returned value is a list of tuples (id, scheme, name, time, intent).
509         """
510
511         cur = self.cursor()
512         cur.execute("select distinct scheme from snapshots")
513         schemes = [row[0] for row in cur.fetchall()]
514         schemes.sort()
515         return schemes
516
517     def garbage_collect(self, scheme, intent=1.0):
518         """Delete entries from old snapshots from the database.
519
520         Only snapshots with the specified scheme name will be deleted.  If
521         intent is given, it gives the intended next snapshot type, to determine
522         how aggressively to clean (for example, intent=7 could be used if the
523         next snapshot will be a weekly snapshot).
524         """
525
526         cur = self.cursor()
527
528         # Find the id of the last snapshot to be created.  This is used for
529         # measuring time in a way: we record this value in each segment we
530         # expire on this run, and then on a future run can tell if there have
531         # been intervening backups made.
532         cur.execute("select max(snapshotid) from snapshots")
533         last_snapshotid = cur.fetchone()[0]
534
535         # Get the list of old snapshots for this scheme.  Delete all the old
536         # ones.  Rules for what to keep:
537         #   - Always keep the most recent snapshot.
538         #   - If snapshot X is younger than Y, and X has higher intent, then Y
539         #     can be deleted.
540         cur.execute("""select snapshotid, name, intent,
541                               julianday('now') - timestamp as age
542                        from snapshots where scheme = ?
543                        order by age""", (scheme,))
544
545         first = True
546         max_intent = intent
547         for (id, name, snap_intent, snap_age) in cur.fetchall():
548             can_delete = False
549             if snap_intent < max_intent:
550                 # Delete small-intent snapshots if there is a more recent
551                 # large-intent snapshot.
552                 can_delete = True
553             elif snap_intent == intent:
554                 # Delete previous snapshots with the specified intent level.
555                 can_delete = True
556
557             if can_delete and not first:
558                 print "Delete snapshot %d (%s)" % (id, name)
559                 cur.execute("delete from snapshots where snapshotid = ?",
560                             (id,))
561             first = False
562             max_intent = max(max_intent, snap_intent)
563
564         # Delete entries in the segments_used table which are for non-existent
565         # snapshots.
566         cur.execute("""delete from segments_used
567                        where snapshotid not in
568                            (select snapshotid from snapshots)""")
569
570         # Find segments which contain no objects used by any current snapshots,
571         # and delete them from the segment table.
572         cur.execute("""delete from segments where segmentid not in
573                            (select segmentid from segments_used)""")
574
575         # Delete unused objects in the block_index table.  By "unused", we mean
576         # any object which was stored in a segment which has been deleted, and
577         # any object in a segment which was marked for cleaning and has had
578         # cleaning performed already (the expired time is less than the current
579         # largest snapshot id).
580         cur.execute("""delete from block_index
581                        where segmentid not in (select segmentid from segments)
582                           or segmentid in (select segmentid from segments
583                                            where expire_time < ?)""",
584                     (last_snapshotid,))
585
586         # Remove sub-block signatures for deleted objects.
587         cur.execute("""delete from subblock_signatures
588                        where blockid not in
589                            (select blockid from block_index)""")
590
591     # Segment cleaning.
592     class SegmentInfo(Struct): pass
593
594     def get_segment_cleaning_list(self, age_boost=0.0):
595         """Return a list of all current segments with information for cleaning.
596
597         Return all segments which are currently known in the local database
598         (there might be other, older segments in the archive itself), and
599         return usage statistics for each to help decide which segments to
600         clean.
601
602         The returned list will be sorted by estimated cleaning benefit, with
603         segments that are best to clean at the start of the list.
604
605         If specified, the age_boost parameter (measured in days) will added to
606         the age of each segment, as a way of adjusting the benefit computation
607         before a long-lived snapshot is taken (for example, age_boost might be
608         set to 7 when cleaning prior to taking a weekly snapshot).
609         """
610
611         cur = self.cursor()
612         segments = []
613         cur.execute("""select segmentid, used, size, mtime,
614                        julianday('now') - mtime as age from segment_info
615                        where expire_time is null""")
616         for row in cur:
617             info = self.SegmentInfo()
618             info.id = row[0]
619             info.used_bytes = row[1]
620             info.size_bytes = row[2]
621             info.mtime = row[3]
622             info.age_days = row[4]
623
624             # If data is not available for whatever reason, treat it as 0.0.
625             if info.age_days is None:
626                 info.age_days = 0.0
627             if info.used_bytes is None:
628                 info.used_bytes = 0.0
629
630             # Benefit calculation: u is the estimated fraction of each segment
631             # which is utilized (bytes belonging to objects still in use
632             # divided by total size; this doesn't take compression or storage
633             # overhead into account, but should give a reasonable estimate).
634             #
635             # The total benefit is a heuristic that combines several factors:
636             # the amount of space that can be reclaimed (1 - u), an ageing
637             # factor (info.age_days) that favors cleaning old segments to young
638             # ones and also is more likely to clean segments that will be
639             # rewritten for long-lived snapshots (age_boost), and finally a
640             # penalty factor for the cost of re-uploading data (u + 0.1).
641             u = info.used_bytes / info.size_bytes
642             info.cleaning_benefit \
643                 = (1 - u) * (info.age_days + age_boost) / (u + 0.1)
644
645             segments.append(info)
646
647         segments.sort(cmp, key=lambda s: s.cleaning_benefit, reverse=True)
648         return segments
649
650     def mark_segment_expired(self, segment):
651         """Mark a segment for cleaning in the local database.
652
653         The segment parameter should be either a SegmentInfo object or an
654         integer segment id.  Objects in the given segment will be marked as
655         expired, which means that any future snapshots that would re-use those
656         objects will instead write out a new copy of the object, and thus no
657         future snapshots will depend upon the given segment.
658         """
659
660         if isinstance(segment, int):
661             id = segment
662         elif isinstance(segment, self.SegmentInfo):
663             id = segment.id
664         else:
665             raise TypeError("Invalid segment: %s, must be of type int or SegmentInfo, not %s" % (segment, type(segment)))
666
667         cur = self.cursor()
668         cur.execute("select max(snapshotid) from snapshots")
669         last_snapshotid = cur.fetchone()[0]
670         cur.execute("update segments set expire_time = ? where segmentid = ?",
671                     (last_snapshotid, id))
672         cur.execute("update block_index set expired = 0 where segmentid = ?",
673                     (id,))
674
675     def balance_expired_objects(self):
676         """Analyze expired objects in segments to be cleaned and group by age.
677
678         Update the block_index table of the local database to group expired
679         objects by age.  The exact number of buckets and the cutoffs for each
680         are dynamically determined.  Calling this function after marking
681         segments expired will help in the segment cleaning process, by ensuring
682         that when active objects from clean segments are rewritten, they will
683         be placed into new segments roughly grouped by age.
684         """
685
686         # The expired column of the block_index table is used when generating a
687         # new Cumulus snapshot.  A null value indicates that an object may be
688         # re-used.  Otherwise, an object must be written into a new segment if
689         # needed.  Objects with distinct expired values will be written into
690         # distinct segments, to allow for some grouping by age.  The value 0 is
691         # somewhat special in that it indicates any rewritten objects can be
692         # placed in the same segment as completely new objects; this can be
693         # used for very young objects which have been expired, or objects not
694         # expected to be encountered.
695         #
696         # In the balancing process, all objects which are not used in any
697         # current snapshots will have expired set to 0.  Objects which have
698         # been seen will be sorted by age and will have expired values set to
699         # 0, 1, 2, and so on based on age (with younger objects being assigned
700         # lower values).  The number of buckets and the age cutoffs is
701         # determined by looking at the distribution of block ages.
702
703         cur = self.cursor()
704
705         # Mark all expired objects with expired = 0; these objects will later
706         # have values set to indicate groupings of objects when repacking.
707         cur.execute("""update block_index set expired = 0
708                        where expired is not null""")
709
710         # We will want to aim for at least one full segment for each bucket
711         # that we eventually create, but don't know how many bytes that should
712         # be due to compression.  So compute the average number of bytes in
713         # each expired segment as a rough estimate for the minimum size of each
714         # bucket.  (This estimate could be thrown off by many not-fully-packed
715         # segments, but for now don't worry too much about that.)  If we can't
716         # compute an average, it's probably because there are no expired
717         # segments, so we have no more work to do.
718         cur.execute("""select avg(size) from segments
719                        where segmentid in
720                            (select distinct segmentid from block_index
721                             where expired is not null)""")
722         segment_size_estimate = cur.fetchone()[0]
723         if not segment_size_estimate:
724             return
725
726         # Next, extract distribution of expired objects (number and size) by
727         # age.  Save the timestamp for "now" so that the classification of
728         # blocks into age buckets will not change later in the function, after
729         # time has passed.  Set any timestamps in the future to now, so we are
730         # guaranteed that for the rest of this function, age is always
731         # non-negative.
732         cur.execute("select julianday('now')")
733         now = cur.fetchone()[0]
734
735         cur.execute("""update block_index set timestamp = ?
736                        where timestamp > ? and expired is not null""",
737                     (now, now))
738
739         cur.execute("""select round(? - timestamp) as age, count(*), sum(size)
740                        from block_index where expired = 0
741                        group by age order by age""", (now,))
742         distribution = cur.fetchall()
743
744         # Start to determine the buckets for expired objects.  Heuristics used:
745         #   - An upper bound on the number of buckets is given by the number of
746         #     segments we estimate it will take to store all data.  In fact,
747         #     aim for a couple of segments per bucket.
748         #   - Place very young objects in bucket 0 (place with new objects)
749         #     unless there are enough of them to warrant a separate bucket.
750         #   - Try not to create unnecessarily many buckets, since fewer buckets
751         #     will allow repacked data to be grouped based on spatial locality
752         #     (while more buckets will group by temporal locality).  We want a
753         #     balance.
754         MIN_AGE = 4
755         total_bytes = sum([i[2] for i in distribution])
756         target_buckets = 2 * (total_bytes / segment_size_estimate) ** 0.4
757         min_size = 1.5 * segment_size_estimate
758         target_size = max(2 * segment_size_estimate,
759                           total_bytes / target_buckets)
760
761         print "segment_size:", segment_size_estimate
762         print "distribution:", distribution
763         print "total_bytes:", total_bytes
764         print "target_buckets:", target_buckets
765         print "min, target size:", min_size, target_size
766
767         # Chosen cutoffs.  Each bucket consists of objects with age greater
768         # than one cutoff value, but not greater than the next largest cutoff.
769         cutoffs = []
770
771         # Starting with the oldest objects, begin grouping together into
772         # buckets of size at least target_size bytes.
773         distribution.reverse()
774         bucket_size = 0
775         min_age_bucket = False
776         for (age, items, size) in distribution:
777             if bucket_size >= target_size \
778                 or (age < MIN_AGE and not min_age_bucket):
779                 if bucket_size < target_size and len(cutoffs) > 0:
780                     cutoffs.pop()
781                 cutoffs.append(age)
782                 bucket_size = 0
783
784             bucket_size += size
785             if age < MIN_AGE:
786                 min_age_bucket = True
787
788         # The last (youngest) bucket will be group 0, unless it has enough data
789         # to be of size min_size by itself, or there happen to be no objects
790         # less than MIN_AGE at all.
791         if bucket_size >= min_size or not min_age_bucket:
792             cutoffs.append(-1)
793         cutoffs.append(-1)
794
795         print "cutoffs:", cutoffs
796
797         # Update the database to assign each object to the appropriate bucket.
798         cutoffs.reverse()
799         for i in range(len(cutoffs)):
800             cur.execute("""update block_index set expired = ?
801                            where round(? - timestamp) > ?
802                              and expired is not null""",
803                         (i, now, cutoffs[i]))