Detect decompression script needed for segments based on extension.
[cumulus.git] / python / cumulus / __init__.py
1 """High-level interface for working with LBS archives.
2
3 This module provides an easy interface for reading from and manipulating
4 various parts of an LBS archive:
5   - listing the snapshots and segments present
6   - reading segment contents
7   - parsing snapshot descriptors and snapshot metadata logs
8   - reading and maintaining the local object database
9 """
10
11 from __future__ import division
12 import os, re, sha, tarfile, tempfile, thread
13 from pysqlite2 import dbapi2 as sqlite3
14
15 import cumulus.store, cumulus.store.file
16
17 # The largest supported snapshot format that can be understood.
18 FORMAT_VERSION = (0, 8)         # LBS Snapshot v0.8
19
20 # Maximum number of nested indirect references allowed in a snapshot.
21 MAX_RECURSION_DEPTH = 3
22
23 # All segments which have been accessed this session.
24 accessed_segments = set()
25
26 # Table of methods used to filter segments before storage, and corresponding
27 # filename extensions.  These are listed in priority order (methods earlier in
28 # the list are tried first).
29 SEGMENT_FILTERS = [
30     (".gpg", "lbs-filter-gpg --decrypt"),
31     (".gz", "gzip -dc"),
32     (".bz2", "bzip2 -dc"),
33 ]
34
35 def uri_decode(s):
36     """Decode a URI-encoded (%xx escapes) string."""
37     def hex_decode(m): return chr(int(m.group(1), 16))
38     return re.sub(r"%([0-9a-f]{2})", hex_decode, s)
39 def uri_encode(s):
40     """Encode a string to URI-encoded (%xx escapes) form."""
41     def hex_encode(c):
42         if c > '+' and c < '\x7f' and c != '@':
43             return c
44         else:
45             return "%%%02x" % (ord(c),)
46     return ''.join(hex_encode(c) for c in s)
47
48 class Struct:
49     """A class which merely acts as a data container.
50
51     Instances of this class (or its subclasses) are merely used to store data
52     in various attributes.  No methods are provided.
53     """
54
55     def __repr__(self):
56         return "<%s %s>" % (self.__class__, self.__dict__)
57
58 CHECKSUM_ALGORITHMS = {
59     'sha1': sha.new
60 }
61
62 class ChecksumCreator:
63     """Compute an LBS checksum for provided data.
64
65     The algorithm used is selectable, but currently defaults to sha1.
66     """
67
68     def __init__(self, algorithm='sha1'):
69         self.algorithm = algorithm
70         self.hash = CHECKSUM_ALGORITHMS[algorithm]()
71
72     def update(self, data):
73         self.hash.update(data)
74         return self
75
76     def compute(self):
77         return "%s=%s" % (self.algorithm, self.hash.hexdigest())
78
79 class ChecksumVerifier:
80     """Verify whether a checksum from a snapshot matches the supplied data."""
81
82     def __init__(self, checksumstr):
83         """Create an object to check the supplied checksum."""
84
85         (algo, checksum) = checksumstr.split("=", 1)
86         self.checksum = checksum
87         self.hash = CHECKSUM_ALGORITHMS[algo]()
88
89     def update(self, data):
90         self.hash.update(data)
91
92     def valid(self):
93         """Return a boolean indicating whether the checksum matches."""
94
95         result = self.hash.hexdigest()
96         return result == self.checksum
97
98 class LowlevelDataStore:
99     """Access to the backup store containing segments and snapshot descriptors.
100
101     Instances of this class are used to get direct filesystem-level access to
102     the backup data.  To read a backup, a caller will ordinarily not care about
103     direct access to backup segments, but will instead merely need to access
104     objects from those segments.  The ObjectStore class provides a suitable
105     wrapper around a DataStore to give this high-level access.
106     """
107
108     def __init__(self, path):
109         if isinstance(path, cumulus.store.Store):
110             self.store = path
111         elif path.find(":") >= 0:
112             self.store = cumulus.store.open(path)
113         else:
114             self.store = cumulus.store.file.FileStore(path)
115
116     def _classify(self, filename):
117         for (t, r) in cumulus.store.type_patterns.items():
118             if r.match(filename):
119                 return (t, filename)
120         return (None, filename)
121
122     def lowlevel_open(self, filename):
123         """Return a file-like object for reading data from the given file."""
124
125         (type, filename) = self._classify(filename)
126         return self.store.get(type, filename)
127
128     def lowlevel_stat(self, filename):
129         """Return a dictionary of information about the given file.
130
131         Currently, the only defined field is 'size', giving the size of the
132         file in bytes.
133         """
134
135         (type, filename) = self._classify(filename)
136         return self.store.stat(type, filename)
137
138     # Slightly higher-level list methods.
139     def list_snapshots(self):
140         for f in self.store.list('snapshots'):
141             m = cumulus.store.type_patterns['snapshots'].match(f)
142             if m: yield m.group(1)
143
144     def list_segments(self):
145         for f in self.store.list('segments'):
146             m = cumulus.store.type_patterns['segments'].match(f)
147             if m: yield m.group(1)
148
149 class ObjectStore:
150     def __init__(self, data_store):
151         self.store = data_store
152         self.cachedir = None
153         self.CACHE_SIZE = 16
154         self.lru_list = []
155
156     def get_cachedir(self):
157         if self.cachedir is None:
158             self.cachedir = tempfile.mkdtemp(".lbs")
159         return self.cachedir
160
161     def cleanup(self):
162         if self.cachedir is not None:
163             # TODO: Avoid use of system, make this safer
164             os.system("rm -rf " + self.cachedir)
165         self.cachedir = None
166
167     @staticmethod
168     def parse_ref(refstr):
169         m = re.match(r"^zero\[(\d+)\]$", refstr)
170         if m:
171             return ("zero", None, None, (0, int(m.group(1))))
172
173         m = re.match(r"^([-0-9a-f]+)\/([0-9a-f]+)(\(\S+\))?(\[(((\d+)\+)?(\d+)|=(\d+))\])?$", refstr)
174         if not m: return
175
176         segment = m.group(1)
177         object = m.group(2)
178         checksum = m.group(3)
179         slice = m.group(4)
180
181         if checksum is not None:
182             checksum = checksum.lstrip("(").rstrip(")")
183
184         if slice is not None:
185             if m.group(9) is not None:
186                 # Size-assertion slice
187                 slice = (0, int(m.group(9)), True)
188             elif m.group(6) is None:
189                 # Abbreviated slice
190                 slice = (0, int(m.group(8)), False)
191             else:
192                 slice = (int(m.group(7)), int(m.group(8)), False)
193
194         return (segment, object, checksum, slice)
195
196     def get_segment(self, segment):
197         accessed_segments.add(segment)
198
199         for (extension, filter) in SEGMENT_FILTERS:
200             try:
201                 raw = self.store.lowlevel_open(segment + ".tar" + extension)
202
203                 (input, output) = os.popen2(filter)
204                 def copy_thread(src, dst):
205                     BLOCK_SIZE = 4096
206                     while True:
207                         block = src.read(BLOCK_SIZE)
208                         if len(block) == 0: break
209                         dst.write(block)
210                     dst.close()
211
212                 thread.start_new_thread(copy_thread, (raw, input))
213                 return output
214             except:
215                 pass
216
217         raise cumulus.store.NotFoundError
218
219     def load_segment(self, segment):
220         seg = tarfile.open(segment, 'r|', self.get_segment(segment))
221         for item in seg:
222             data_obj = seg.extractfile(item)
223             path = item.name.split('/')
224             if len(path) == 2 and path[0] == segment:
225                 yield (path[1], data_obj.read())
226
227     def load_snapshot(self, snapshot):
228         file = self.store.lowlevel_open("snapshot-" + snapshot + ".lbs")
229         return file.read().splitlines(True)
230
231     def extract_segment(self, segment):
232         segdir = os.path.join(self.get_cachedir(), segment)
233         os.mkdir(segdir)
234         for (object, data) in self.load_segment(segment):
235             f = open(os.path.join(segdir, object), 'wb')
236             f.write(data)
237             f.close()
238
239     def load_object(self, segment, object):
240         accessed_segments.add(segment)
241         path = os.path.join(self.get_cachedir(), segment, object)
242         if not os.access(path, os.R_OK):
243             self.extract_segment(segment)
244         if segment in self.lru_list: self.lru_list.remove(segment)
245         self.lru_list.append(segment)
246         while len(self.lru_list) > self.CACHE_SIZE:
247             os.system("rm -rf " + os.path.join(self.cachedir, self.lru_list[0]))
248             self.lru_list = self.lru_list[1:]
249         return open(path, 'rb').read()
250
251     def get(self, refstr):
252         """Fetch the given object and return it.
253
254         The input should be an object reference, in string form.
255         """
256
257         (segment, object, checksum, slice) = self.parse_ref(refstr)
258
259         if segment == "zero":
260             return "\0" * slice[1]
261
262         data = self.load_object(segment, object)
263
264         if checksum is not None:
265             verifier = ChecksumVerifier(checksum)
266             verifier.update(data)
267             if not verifier.valid():
268                 raise ValueError
269
270         if slice is not None:
271             (start, length, exact) = slice
272             if exact and len(data) != length: raise ValueError
273             data = data[start:start+length]
274             if len(data) != length: raise IndexError
275
276         return data
277
278 def parse(lines, terminate=None):
279     """Generic parser for RFC822-style "Key: Value" data streams.
280
281     This parser can be used to read metadata logs and snapshot root descriptor
282     files.
283
284     lines must be an iterable object which yields a sequence of lines of input.
285
286     If terminate is specified, it is used as a predicate to determine when to
287     stop reading input lines.
288     """
289
290     dict = {}
291     last_key = None
292
293     for l in lines:
294         # Strip off a trailing newline, if present
295         if len(l) > 0 and l[-1] == "\n":
296             l = l[:-1]
297
298         if terminate is not None and terminate(l):
299             if len(dict) > 0: yield dict
300             dict = {}
301             last_key = None
302             continue
303
304         m = re.match(r"^([-\w]+):\s*(.*)$", l)
305         if m:
306             dict[m.group(1)] = m.group(2)
307             last_key = m.group(1)
308         elif len(l) > 0 and l[0].isspace() and last_key is not None:
309             dict[last_key] += l
310         else:
311             last_key = None
312
313     if len(dict) > 0: yield dict
314
315 def parse_full(lines):
316     try:
317         return parse(lines).next()
318     except StopIteration:
319         return {}
320
321 def parse_metadata_version(s):
322     """Convert a string with the snapshot version format to a tuple."""
323
324     m = re.match(r"^LBS Snapshot v(\d+(\.\d+)*)$", s)
325     if m is None:
326         return ()
327     else:
328         return tuple([int(d) for d in m.group(1).split(".")])
329
330 def read_metadata(object_store, root):
331     """Iterate through all lines in the metadata log, following references."""
332
333     # Stack for keeping track of recursion when following references to
334     # portions of the log.  The last entry in the stack corresponds to the
335     # object currently being parsed.  Each entry is a list of lines which have
336     # been reversed, so that popping successive lines from the end of each list
337     # will return lines of the metadata log in order.
338     stack = []
339
340     def follow_ref(refstr):
341         if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
342         lines = object_store.get(refstr).splitlines(True)
343         lines.reverse()
344         stack.append(lines)
345
346     follow_ref(root)
347
348     while len(stack) > 0:
349         top = stack[-1]
350         if len(top) == 0:
351             stack.pop()
352             continue
353         line = top.pop()
354
355         # An indirect reference which we must follow?
356         if len(line) > 0 and line[0] == '@':
357             ref = line[1:]
358             ref.strip()
359             follow_ref(ref)
360         else:
361             yield line
362
363 class MetadataItem:
364     """Metadata for a single file (or directory or...) from a snapshot."""
365
366     # Functions for parsing various datatypes that can appear in a metadata log
367     # item.
368     @staticmethod
369     def decode_int(s):
370         """Decode an integer, expressed in decimal, octal, or hexadecimal."""
371         if s.startswith("0x"):
372             return int(s, 16)
373         elif s.startswith("0"):
374             return int(s, 8)
375         else:
376             return int(s, 10)
377
378     @staticmethod
379     def decode_str(s):
380         """Decode a URI-encoded (%xx escapes) string."""
381         return uri_decode(s)
382
383     @staticmethod
384     def raw_str(s):
385         """An unecoded string."""
386         return s
387
388     @staticmethod
389     def decode_user(s):
390         """Decode a user/group to a tuple of uid/gid followed by name."""
391         items = s.split()
392         uid = MetadataItem.decode_int(items[0])
393         name = None
394         if len(items) > 1:
395             if items[1].startswith("(") and items[1].endswith(")"):
396                 name = MetadataItem.decode_str(items[1][1:-1])
397         return (uid, name)
398
399     @staticmethod
400     def decode_device(s):
401         """Decode a device major/minor number."""
402         (major, minor) = map(MetadataItem.decode_int, s.split("/"))
403         return (major, minor)
404
405     class Items: pass
406
407     def __init__(self, fields, object_store):
408         """Initialize from a dictionary of key/value pairs from metadata log."""
409
410         self.fields = fields
411         self.object_store = object_store
412         self.keys = []
413         self.items = self.Items()
414         for (k, v) in fields.items():
415             if k in self.field_types:
416                 decoder = self.field_types[k]
417                 setattr(self.items, k, decoder(v))
418                 self.keys.append(k)
419
420     def data(self):
421         """Return an iterator for the data blocks that make up a file."""
422
423         # This traverses the list of blocks that make up a file, following
424         # indirect references.  It is implemented in much the same way as
425         # read_metadata, so see that function for details of the technique.
426
427         objects = self.fields['data'].split()
428         objects.reverse()
429         stack = [objects]
430
431         def follow_ref(refstr):
432             if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
433             objects = self.object_store.get(refstr).split()
434             objects.reverse()
435             stack.append(objects)
436
437         while len(stack) > 0:
438             top = stack[-1]
439             if len(top) == 0:
440                 stack.pop()
441                 continue
442             ref = top.pop()
443
444             # An indirect reference which we must follow?
445             if len(ref) > 0 and ref[0] == '@':
446                 follow_ref(ref[1:])
447             else:
448                 yield ref
449
450 # Description of fields that might appear, and how they should be parsed.
451 MetadataItem.field_types = {
452     'name': MetadataItem.decode_str,
453     'type': MetadataItem.raw_str,
454     'mode': MetadataItem.decode_int,
455     'device': MetadataItem.decode_device,
456     'user': MetadataItem.decode_user,
457     'group': MetadataItem.decode_user,
458     'ctime': MetadataItem.decode_int,
459     'mtime': MetadataItem.decode_int,
460     'links': MetadataItem.decode_int,
461     'inode': MetadataItem.raw_str,
462     'checksum': MetadataItem.decode_str,
463     'size': MetadataItem.decode_int,
464     'contents': MetadataItem.decode_str,
465     'target': MetadataItem.decode_str,
466 }
467
468 def iterate_metadata(object_store, root):
469     for d in parse(read_metadata(object_store, root), lambda l: len(l) == 0):
470         yield MetadataItem(d, object_store)
471
472 class LocalDatabase:
473     """Access to the local database of snapshot contents and object checksums.
474
475     The local database is consulted when creating a snapshot to determine what
476     data can be re-used from old snapshots.  Segment cleaning is performed by
477     manipulating the data in the local database; the local database also
478     includes enough data to guide the segment cleaning process.
479     """
480
481     def __init__(self, path, dbname="localdb.sqlite"):
482         self.db_connection = sqlite3.connect(path + "/" + dbname)
483
484     # Low-level database access.  Use these methods when there isn't a
485     # higher-level interface available.  Exception: do, however, remember to
486     # use the commit() method after making changes to make sure they are
487     # actually saved, even when going through higher-level interfaces.
488     def commit(self):
489         "Commit any pending changes to the local database."
490         self.db_connection.commit()
491
492     def rollback(self):
493         "Roll back any pending changes to the local database."
494         self.db_connection.rollback()
495
496     def cursor(self):
497         "Return a DB-API cursor for directly accessing the local database."
498         return self.db_connection.cursor()
499
500     def list_schemes(self):
501         """Return the list of snapshots found in the local database.
502
503         The returned value is a list of tuples (id, scheme, name, time, intent).
504         """
505
506         cur = self.cursor()
507         cur.execute("select distinct scheme from snapshots")
508         schemes = [row[0] for row in cur.fetchall()]
509         schemes.sort()
510         return schemes
511
512     def garbage_collect(self, scheme, intent=1.0):
513         """Delete entries from old snapshots from the database.
514
515         Only snapshots with the specified scheme name will be deleted.  If
516         intent is given, it gives the intended next snapshot type, to determine
517         how aggressively to clean (for example, intent=7 could be used if the
518         next snapshot will be a weekly snapshot).
519         """
520
521         cur = self.cursor()
522
523         # Find the id of the last snapshot to be created.  This is used for
524         # measuring time in a way: we record this value in each segment we
525         # expire on this run, and then on a future run can tell if there have
526         # been intervening backups made.
527         cur.execute("select max(snapshotid) from snapshots")
528         last_snapshotid = cur.fetchone()[0]
529
530         # Get the list of old snapshots for this scheme.  Delete all the old
531         # ones.  Rules for what to keep:
532         #   - Always keep the most recent snapshot.
533         #   - If snapshot X is younger than Y, and X has higher intent, then Y
534         #     can be deleted.
535         cur.execute("""select snapshotid, name, intent,
536                               julianday('now') - timestamp as age
537                        from snapshots where scheme = ?
538                        order by age""", (scheme,))
539
540         first = True
541         max_intent = intent
542         for (id, name, snap_intent, snap_age) in cur.fetchall():
543             can_delete = False
544             if snap_intent < max_intent:
545                 # Delete small-intent snapshots if there is a more recent
546                 # large-intent snapshot.
547                 can_delete = True
548             elif snap_intent == intent:
549                 # Delete previous snapshots with the specified intent level.
550                 can_delete = True
551
552             if can_delete and not first:
553                 print "Delete snapshot %d (%s)" % (id, name)
554                 cur.execute("delete from snapshots where snapshotid = ?",
555                             (id,))
556             first = False
557             max_intent = max(max_intent, snap_intent)
558
559         # Delete entries in the segments_used table which are for non-existent
560         # snapshots.
561         cur.execute("""delete from segments_used
562                        where snapshotid not in
563                            (select snapshotid from snapshots)""")
564
565         # Find segments which contain no objects used by any current snapshots,
566         # and delete them from the segment table.
567         cur.execute("""delete from segments where segmentid not in
568                            (select segmentid from segments_used)""")
569
570         # Delete unused objects in the block_index table.  By "unused", we mean
571         # any object which was stored in a segment which has been deleted, and
572         # any object in a segment which was marked for cleaning and has had
573         # cleaning performed already (the expired time is less than the current
574         # largest snapshot id).
575         cur.execute("""delete from block_index
576                        where segmentid not in (select segmentid from segments)
577                           or segmentid in (select segmentid from segments
578                                            where expire_time < ?)""",
579                     (last_snapshotid,))
580
581         # Remove sub-block signatures for deleted objects.
582         cur.execute("""delete from subblock_signatures
583                        where blockid not in
584                            (select blockid from block_index)""")
585
586     # Segment cleaning.
587     class SegmentInfo(Struct): pass
588
589     def get_segment_cleaning_list(self, age_boost=0.0):
590         """Return a list of all current segments with information for cleaning.
591
592         Return all segments which are currently known in the local database
593         (there might be other, older segments in the archive itself), and
594         return usage statistics for each to help decide which segments to
595         clean.
596
597         The returned list will be sorted by estimated cleaning benefit, with
598         segments that are best to clean at the start of the list.
599
600         If specified, the age_boost parameter (measured in days) will added to
601         the age of each segment, as a way of adjusting the benefit computation
602         before a long-lived snapshot is taken (for example, age_boost might be
603         set to 7 when cleaning prior to taking a weekly snapshot).
604         """
605
606         cur = self.cursor()
607         segments = []
608         cur.execute("""select segmentid, used, size, mtime,
609                        julianday('now') - mtime as age from segment_info
610                        where expire_time is null""")
611         for row in cur:
612             info = self.SegmentInfo()
613             info.id = row[0]
614             info.used_bytes = row[1]
615             info.size_bytes = row[2]
616             info.mtime = row[3]
617             info.age_days = row[4]
618
619             # If data is not available for whatever reason, treat it as 0.0.
620             if info.age_days is None:
621                 info.age_days = 0.0
622             if info.used_bytes is None:
623                 info.used_bytes = 0.0
624
625             # Benefit calculation: u is the estimated fraction of each segment
626             # which is utilized (bytes belonging to objects still in use
627             # divided by total size; this doesn't take compression or storage
628             # overhead into account, but should give a reasonable estimate).
629             #
630             # The total benefit is a heuristic that combines several factors:
631             # the amount of space that can be reclaimed (1 - u), an ageing
632             # factor (info.age_days) that favors cleaning old segments to young
633             # ones and also is more likely to clean segments that will be
634             # rewritten for long-lived snapshots (age_boost), and finally a
635             # penalty factor for the cost of re-uploading data (u + 0.1).
636             u = info.used_bytes / info.size_bytes
637             info.cleaning_benefit \
638                 = (1 - u) * (info.age_days + age_boost) / (u + 0.1)
639
640             segments.append(info)
641
642         segments.sort(cmp, key=lambda s: s.cleaning_benefit, reverse=True)
643         return segments
644
645     def mark_segment_expired(self, segment):
646         """Mark a segment for cleaning in the local database.
647
648         The segment parameter should be either a SegmentInfo object or an
649         integer segment id.  Objects in the given segment will be marked as
650         expired, which means that any future snapshots that would re-use those
651         objects will instead write out a new copy of the object, and thus no
652         future snapshots will depend upon the given segment.
653         """
654
655         if isinstance(segment, int):
656             id = segment
657         elif isinstance(segment, self.SegmentInfo):
658             id = segment.id
659         else:
660             raise TypeError("Invalid segment: %s, must be of type int or SegmentInfo, not %s" % (segment, type(segment)))
661
662         cur = self.cursor()
663         cur.execute("select max(snapshotid) from snapshots")
664         last_snapshotid = cur.fetchone()[0]
665         cur.execute("update segments set expire_time = ? where segmentid = ?",
666                     (last_snapshotid, id))
667         cur.execute("update block_index set expired = 0 where segmentid = ?",
668                     (id,))
669
670     def balance_expired_objects(self):
671         """Analyze expired objects in segments to be cleaned and group by age.
672
673         Update the block_index table of the local database to group expired
674         objects by age.  The exact number of buckets and the cutoffs for each
675         are dynamically determined.  Calling this function after marking
676         segments expired will help in the segment cleaning process, by ensuring
677         that when active objects from clean segments are rewritten, they will
678         be placed into new segments roughly grouped by age.
679         """
680
681         # The expired column of the block_index table is used when generating a
682         # new LBS snapshot.  A null value indicates that an object may be
683         # re-used.  Otherwise, an object must be written into a new segment if
684         # needed.  Objects with distinct expired values will be written into
685         # distinct segments, to allow for some grouping by age.  The value 0 is
686         # somewhat special in that it indicates any rewritten objects can be
687         # placed in the same segment as completely new objects; this can be
688         # used for very young objects which have been expired, or objects not
689         # expected to be encountered.
690         #
691         # In the balancing process, all objects which are not used in any
692         # current snapshots will have expired set to 0.  Objects which have
693         # been seen will be sorted by age and will have expired values set to
694         # 0, 1, 2, and so on based on age (with younger objects being assigned
695         # lower values).  The number of buckets and the age cutoffs is
696         # determined by looking at the distribution of block ages.
697
698         cur = self.cursor()
699
700         # Mark all expired objects with expired = 0; these objects will later
701         # have values set to indicate groupings of objects when repacking.
702         cur.execute("""update block_index set expired = 0
703                        where expired is not null""")
704
705         # We will want to aim for at least one full segment for each bucket
706         # that we eventually create, but don't know how many bytes that should
707         # be due to compression.  So compute the average number of bytes in
708         # each expired segment as a rough estimate for the minimum size of each
709         # bucket.  (This estimate could be thrown off by many not-fully-packed
710         # segments, but for now don't worry too much about that.)  If we can't
711         # compute an average, it's probably because there are no expired
712         # segments, so we have no more work to do.
713         cur.execute("""select avg(size) from segments
714                        where segmentid in
715                            (select distinct segmentid from block_index
716                             where expired is not null)""")
717         segment_size_estimate = cur.fetchone()[0]
718         if not segment_size_estimate:
719             return
720
721         # Next, extract distribution of expired objects (number and size) by
722         # age.  Save the timestamp for "now" so that the classification of
723         # blocks into age buckets will not change later in the function, after
724         # time has passed.  Set any timestamps in the future to now, so we are
725         # guaranteed that for the rest of this function, age is always
726         # non-negative.
727         cur.execute("select julianday('now')")
728         now = cur.fetchone()[0]
729
730         cur.execute("""update block_index set timestamp = ?
731                        where timestamp > ? and expired is not null""",
732                     (now, now))
733
734         cur.execute("""select round(? - timestamp) as age, count(*), sum(size)
735                        from block_index where expired = 0
736                        group by age order by age""", (now,))
737         distribution = cur.fetchall()
738
739         # Start to determine the buckets for expired objects.  Heuristics used:
740         #   - An upper bound on the number of buckets is given by the number of
741         #     segments we estimate it will take to store all data.  In fact,
742         #     aim for a couple of segments per bucket.
743         #   - Place very young objects in bucket 0 (place with new objects)
744         #     unless there are enough of them to warrant a separate bucket.
745         #   - Try not to create unnecessarily many buckets, since fewer buckets
746         #     will allow repacked data to be grouped based on spatial locality
747         #     (while more buckets will group by temporal locality).  We want a
748         #     balance.
749         MIN_AGE = 4
750         total_bytes = sum([i[2] for i in distribution])
751         target_buckets = 2 * (total_bytes / segment_size_estimate) ** 0.4
752         min_size = 1.5 * segment_size_estimate
753         target_size = max(2 * segment_size_estimate,
754                           total_bytes / target_buckets)
755
756         print "segment_size:", segment_size_estimate
757         print "distribution:", distribution
758         print "total_bytes:", total_bytes
759         print "target_buckets:", target_buckets
760         print "min, target size:", min_size, target_size
761
762         # Chosen cutoffs.  Each bucket consists of objects with age greater
763         # than one cutoff value, but not greater than the next largest cutoff.
764         cutoffs = []
765
766         # Starting with the oldest objects, begin grouping together into
767         # buckets of size at least target_size bytes.
768         distribution.reverse()
769         bucket_size = 0
770         min_age_bucket = False
771         for (age, items, size) in distribution:
772             if bucket_size >= target_size \
773                 or (age < MIN_AGE and not min_age_bucket):
774                 if bucket_size < target_size and len(cutoffs) > 0:
775                     cutoffs.pop()
776                 cutoffs.append(age)
777                 bucket_size = 0
778
779             bucket_size += size
780             if age < MIN_AGE:
781                 min_age_bucket = True
782
783         # The last (youngest) bucket will be group 0, unless it has enough data
784         # to be of size min_size by itself, or there happen to be no objects
785         # less than MIN_AGE at all.
786         if bucket_size >= min_size or not min_age_bucket:
787             cutoffs.append(-1)
788         cutoffs.append(-1)
789
790         print "cutoffs:", cutoffs
791
792         # Update the database to assign each object to the appropriate bucket.
793         cutoffs.reverse()
794         for i in range(len(cutoffs)):
795             cur.execute("""update block_index set expired = ?
796                            where round(? - timestamp) > ?
797                              and expired is not null""",
798                         (i, now, cutoffs[i]))