Introduce a script to provide access to remote repositories.
[cumulus.git] / python / cumulus / __init__.py
1 """High-level interface for working with LBS archives.
2
3 This module provides an easy interface for reading from and manipulating
4 various parts of an LBS archive:
5   - listing the snapshots and segments present
6   - reading segment contents
7   - parsing snapshot descriptors and snapshot metadata logs
8   - reading and maintaining the local object database
9 """
10
11 from __future__ import division
12 import os, re, sha, tarfile, tempfile, thread
13 from pysqlite2 import dbapi2 as sqlite3
14
15 import cumulus.store, cumulus.store.file
16
17 # The largest supported snapshot format that can be understood.
18 FORMAT_VERSION = (0, 8)         # LBS Snapshot v0.8
19
20 # Maximum number of nested indirect references allowed in a snapshot.
21 MAX_RECURSION_DEPTH = 3
22
23 # All segments which have been accessed this session.
24 accessed_segments = set()
25
26 def uri_decode(s):
27     """Decode a URI-encoded (%xx escapes) string."""
28     def hex_decode(m): return chr(int(m.group(1), 16))
29     return re.sub(r"%([0-9a-f]{2})", hex_decode, s)
30 def uri_encode(s):
31     """Encode a string to URI-encoded (%xx escapes) form."""
32     def hex_encode(c):
33         if c > '+' and c < '\x7f' and c != '@':
34             return c
35         else:
36             return "%%%02x" % (ord(c),)
37     return ''.join(hex_encode(c) for c in s)
38
39 class Struct:
40     """A class which merely acts as a data container.
41
42     Instances of this class (or its subclasses) are merely used to store data
43     in various attributes.  No methods are provided.
44     """
45
46     def __repr__(self):
47         return "<%s %s>" % (self.__class__, self.__dict__)
48
49 CHECKSUM_ALGORITHMS = {
50     'sha1': sha.new
51 }
52
53 class ChecksumCreator:
54     """Compute an LBS checksum for provided data.
55
56     The algorithm used is selectable, but currently defaults to sha1.
57     """
58
59     def __init__(self, algorithm='sha1'):
60         self.algorithm = algorithm
61         self.hash = CHECKSUM_ALGORITHMS[algorithm]()
62
63     def update(self, data):
64         self.hash.update(data)
65         return self
66
67     def compute(self):
68         return "%s=%s" % (self.algorithm, self.hash.hexdigest())
69
70 class ChecksumVerifier:
71     """Verify whether a checksum from a snapshot matches the supplied data."""
72
73     def __init__(self, checksumstr):
74         """Create an object to check the supplied checksum."""
75
76         (algo, checksum) = checksumstr.split("=", 1)
77         self.checksum = checksum
78         self.hash = CHECKSUM_ALGORITHMS[algo]()
79
80     def update(self, data):
81         self.hash.update(data)
82
83     def valid(self):
84         """Return a boolean indicating whether the checksum matches."""
85
86         result = self.hash.hexdigest()
87         return result == self.checksum
88
89 class LowlevelDataStore:
90     """Access to the backup store containing segments and snapshot descriptors.
91
92     Instances of this class are used to get direct filesystem-level access to
93     the backup data.  To read a backup, a caller will ordinarily not care about
94     direct access to backup segments, but will instead merely need to access
95     objects from those segments.  The ObjectStore class provides a suitable
96     wrapper around a DataStore to give this high-level access.
97     """
98
99     def __init__(self, path):
100         if path.find(":") >= 0:
101             self.store = cumulus.store.open(path)
102         else:
103             self.store = cumulus.store.file.FileStore(path)
104
105     def _classify(self, filename):
106         for (t, r) in cumulus.store.type_patterns.items():
107             if r.match(filename):
108                 return (t, filename)
109         return (None, filename)
110
111     def lowlevel_open(self, filename):
112         """Return a file-like object for reading data from the given file."""
113
114         (type, filename) = self._classify(filename)
115         return self.store.get(type, filename)
116
117     def lowlevel_stat(self, filename):
118         """Return a dictionary of information about the given file.
119
120         Currently, the only defined field is 'size', giving the size of the
121         file in bytes.
122         """
123
124         (type, filename) = self._classify(filename)
125         return self.store.stat(type, filename)
126
127     # Slightly higher-level list methods.
128     def list_snapshots(self):
129         for f in self.store.list('snapshots'):
130             m = cumulus.store.type_patterns['snapshots'].match(f)
131             if m: yield m.group(1)
132
133     def list_segments(self):
134         for f in self.store.list('segments'):
135             m = cumulus.store.type_patterns['segments'].match(f)
136             if m: yield m.group(1)
137
138 class ObjectStore:
139     def __init__(self, data_store):
140         self.store = data_store
141         self.cachedir = None
142         self.CACHE_SIZE = 16
143         self.lru_list = []
144
145     def get_cachedir(self):
146         if self.cachedir is None:
147             self.cachedir = tempfile.mkdtemp(".lbs")
148         return self.cachedir
149
150     def cleanup(self):
151         if self.cachedir is not None:
152             # TODO: Avoid use of system, make this safer
153             os.system("rm -rf " + self.cachedir)
154         self.cachedir = None
155
156     @staticmethod
157     def parse_ref(refstr):
158         m = re.match(r"^zero\[(\d+)\]$", refstr)
159         if m:
160             return ("zero", None, None, (0, int(m.group(1))))
161
162         m = re.match(r"^([-0-9a-f]+)\/([0-9a-f]+)(\(\S+\))?(\[(((\d+)\+)?(\d+)|=(\d+))\])?$", refstr)
163         if not m: return
164
165         segment = m.group(1)
166         object = m.group(2)
167         checksum = m.group(3)
168         slice = m.group(4)
169
170         if checksum is not None:
171             checksum = checksum.lstrip("(").rstrip(")")
172
173         if slice is not None:
174             if m.group(9) is not None:
175                 # Size-assertion slice
176                 slice = (0, int(m.group(9)), True)
177             elif m.group(6) is None:
178                 # Abbreviated slice
179                 slice = (0, int(m.group(8)), False)
180             else:
181                 slice = (int(m.group(7)), int(m.group(8)), False)
182
183         return (segment, object, checksum, slice)
184
185     def get_segment(self, segment):
186         accessed_segments.add(segment)
187         raw = self.store.lowlevel_open(segment + ".tar.gpg")
188
189         (input, output) = os.popen2("lbs-filter-gpg --decrypt")
190         def copy_thread(src, dst):
191             BLOCK_SIZE = 4096
192             while True:
193                 block = src.read(BLOCK_SIZE)
194                 if len(block) == 0: break
195                 dst.write(block)
196             dst.close()
197
198         thread.start_new_thread(copy_thread, (raw, input))
199         return output
200
201     def load_segment(self, segment):
202         seg = tarfile.open(segment, 'r|', self.get_segment(segment))
203         for item in seg:
204             data_obj = seg.extractfile(item)
205             path = item.name.split('/')
206             if len(path) == 2 and path[0] == segment:
207                 yield (path[1], data_obj.read())
208
209     def load_snapshot(self, snapshot):
210         file = self.store.lowlevel_open("snapshot-" + snapshot + ".lbs")
211         return file.read().splitlines(True)
212
213     def extract_segment(self, segment):
214         segdir = os.path.join(self.get_cachedir(), segment)
215         os.mkdir(segdir)
216         for (object, data) in self.load_segment(segment):
217             f = open(os.path.join(segdir, object), 'wb')
218             f.write(data)
219             f.close()
220
221     def load_object(self, segment, object):
222         accessed_segments.add(segment)
223         path = os.path.join(self.get_cachedir(), segment, object)
224         if not os.access(path, os.R_OK):
225             self.extract_segment(segment)
226         if segment in self.lru_list: self.lru_list.remove(segment)
227         self.lru_list.append(segment)
228         while len(self.lru_list) > self.CACHE_SIZE:
229             os.system("rm -rf " + os.path.join(self.cachedir, self.lru_list[0]))
230             self.lru_list = self.lru_list[1:]
231         return open(path, 'rb').read()
232
233     def get(self, refstr):
234         """Fetch the given object and return it.
235
236         The input should be an object reference, in string form.
237         """
238
239         (segment, object, checksum, slice) = self.parse_ref(refstr)
240
241         if segment == "zero":
242             return "\0" * slice[1]
243
244         data = self.load_object(segment, object)
245
246         if checksum is not None:
247             verifier = ChecksumVerifier(checksum)
248             verifier.update(data)
249             if not verifier.valid():
250                 raise ValueError
251
252         if slice is not None:
253             (start, length, exact) = slice
254             if exact and len(data) != length: raise ValueError
255             data = data[start:start+length]
256             if len(data) != length: raise IndexError
257
258         return data
259
260 def parse(lines, terminate=None):
261     """Generic parser for RFC822-style "Key: Value" data streams.
262
263     This parser can be used to read metadata logs and snapshot root descriptor
264     files.
265
266     lines must be an iterable object which yields a sequence of lines of input.
267
268     If terminate is specified, it is used as a predicate to determine when to
269     stop reading input lines.
270     """
271
272     dict = {}
273     last_key = None
274
275     for l in lines:
276         # Strip off a trailing newline, if present
277         if len(l) > 0 and l[-1] == "\n":
278             l = l[:-1]
279
280         if terminate is not None and terminate(l):
281             if len(dict) > 0: yield dict
282             dict = {}
283             last_key = None
284             continue
285
286         m = re.match(r"^([-\w]+):\s*(.*)$", l)
287         if m:
288             dict[m.group(1)] = m.group(2)
289             last_key = m.group(1)
290         elif len(l) > 0 and l[0].isspace() and last_key is not None:
291             dict[last_key] += l
292         else:
293             last_key = None
294
295     if len(dict) > 0: yield dict
296
297 def parse_full(lines):
298     try:
299         return parse(lines).next()
300     except StopIteration:
301         return {}
302
303 def parse_metadata_version(s):
304     """Convert a string with the snapshot version format to a tuple."""
305
306     m = re.match(r"^LBS Snapshot v(\d+(\.\d+)*)$", s)
307     if m is None:
308         return ()
309     else:
310         return tuple([int(d) for d in m.group(1).split(".")])
311
312 def read_metadata(object_store, root):
313     """Iterate through all lines in the metadata log, following references."""
314
315     # Stack for keeping track of recursion when following references to
316     # portions of the log.  The last entry in the stack corresponds to the
317     # object currently being parsed.  Each entry is a list of lines which have
318     # been reversed, so that popping successive lines from the end of each list
319     # will return lines of the metadata log in order.
320     stack = []
321
322     def follow_ref(refstr):
323         if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
324         lines = object_store.get(refstr).splitlines(True)
325         lines.reverse()
326         stack.append(lines)
327
328     follow_ref(root)
329
330     while len(stack) > 0:
331         top = stack[-1]
332         if len(top) == 0:
333             stack.pop()
334             continue
335         line = top.pop()
336
337         # An indirect reference which we must follow?
338         if len(line) > 0 and line[0] == '@':
339             ref = line[1:]
340             ref.strip()
341             follow_ref(ref)
342         else:
343             yield line
344
345 class MetadataItem:
346     """Metadata for a single file (or directory or...) from a snapshot."""
347
348     # Functions for parsing various datatypes that can appear in a metadata log
349     # item.
350     @staticmethod
351     def decode_int(s):
352         """Decode an integer, expressed in decimal, octal, or hexadecimal."""
353         if s.startswith("0x"):
354             return int(s, 16)
355         elif s.startswith("0"):
356             return int(s, 8)
357         else:
358             return int(s, 10)
359
360     @staticmethod
361     def decode_str(s):
362         """Decode a URI-encoded (%xx escapes) string."""
363         return uri_decode(s)
364
365     @staticmethod
366     def raw_str(s):
367         """An unecoded string."""
368         return s
369
370     @staticmethod
371     def decode_user(s):
372         """Decode a user/group to a tuple of uid/gid followed by name."""
373         items = s.split()
374         uid = MetadataItem.decode_int(items[0])
375         name = None
376         if len(items) > 1:
377             if items[1].startswith("(") and items[1].endswith(")"):
378                 name = MetadataItem.decode_str(items[1][1:-1])
379         return (uid, name)
380
381     @staticmethod
382     def decode_device(s):
383         """Decode a device major/minor number."""
384         (major, minor) = map(MetadataItem.decode_int, s.split("/"))
385         return (major, minor)
386
387     class Items: pass
388
389     def __init__(self, fields, object_store):
390         """Initialize from a dictionary of key/value pairs from metadata log."""
391
392         self.fields = fields
393         self.object_store = object_store
394         self.keys = []
395         self.items = self.Items()
396         for (k, v) in fields.items():
397             if k in self.field_types:
398                 decoder = self.field_types[k]
399                 setattr(self.items, k, decoder(v))
400                 self.keys.append(k)
401
402     def data(self):
403         """Return an iterator for the data blocks that make up a file."""
404
405         # This traverses the list of blocks that make up a file, following
406         # indirect references.  It is implemented in much the same way as
407         # read_metadata, so see that function for details of the technique.
408
409         objects = self.fields['data'].split()
410         objects.reverse()
411         stack = [objects]
412
413         def follow_ref(refstr):
414             if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
415             objects = self.object_store.get(refstr).split()
416             objects.reverse()
417             stack.append(objects)
418
419         while len(stack) > 0:
420             top = stack[-1]
421             if len(top) == 0:
422                 stack.pop()
423                 continue
424             ref = top.pop()
425
426             # An indirect reference which we must follow?
427             if len(ref) > 0 and ref[0] == '@':
428                 follow_ref(ref[1:])
429             else:
430                 yield ref
431
432 # Description of fields that might appear, and how they should be parsed.
433 MetadataItem.field_types = {
434     'name': MetadataItem.decode_str,
435     'type': MetadataItem.raw_str,
436     'mode': MetadataItem.decode_int,
437     'device': MetadataItem.decode_device,
438     'user': MetadataItem.decode_user,
439     'group': MetadataItem.decode_user,
440     'ctime': MetadataItem.decode_int,
441     'mtime': MetadataItem.decode_int,
442     'links': MetadataItem.decode_int,
443     'inode': MetadataItem.raw_str,
444     'checksum': MetadataItem.decode_str,
445     'size': MetadataItem.decode_int,
446     'contents': MetadataItem.decode_str,
447     'target': MetadataItem.decode_str,
448 }
449
450 def iterate_metadata(object_store, root):
451     for d in parse(read_metadata(object_store, root), lambda l: len(l) == 0):
452         yield MetadataItem(d, object_store)
453
454 class LocalDatabase:
455     """Access to the local database of snapshot contents and object checksums.
456
457     The local database is consulted when creating a snapshot to determine what
458     data can be re-used from old snapshots.  Segment cleaning is performed by
459     manipulating the data in the local database; the local database also
460     includes enough data to guide the segment cleaning process.
461     """
462
463     def __init__(self, path, dbname="localdb.sqlite"):
464         self.db_connection = sqlite3.connect(path + "/" + dbname)
465
466     # Low-level database access.  Use these methods when there isn't a
467     # higher-level interface available.  Exception: do, however, remember to
468     # use the commit() method after making changes to make sure they are
469     # actually saved, even when going through higher-level interfaces.
470     def commit(self):
471         "Commit any pending changes to the local database."
472         self.db_connection.commit()
473
474     def rollback(self):
475         "Roll back any pending changes to the local database."
476         self.db_connection.rollback()
477
478     def cursor(self):
479         "Return a DB-API cursor for directly accessing the local database."
480         return self.db_connection.cursor()
481
482     def list_schemes(self):
483         """Return the list of snapshots found in the local database.
484
485         The returned value is a list of tuples (id, scheme, name, time, intent).
486         """
487
488         cur = self.cursor()
489         cur.execute("select distinct scheme from snapshots")
490         schemes = [row[0] for row in cur.fetchall()]
491         schemes.sort()
492         return schemes
493
494     def garbage_collect(self, scheme, intent=1.0):
495         """Delete entries from old snapshots from the database.
496
497         Only snapshots with the specified scheme name will be deleted.  If
498         intent is given, it gives the intended next snapshot type, to determine
499         how aggressively to clean (for example, intent=7 could be used if the
500         next snapshot will be a weekly snapshot).
501         """
502
503         cur = self.cursor()
504
505         # Find the id of the last snapshot to be created.  This is used for
506         # measuring time in a way: we record this value in each segment we
507         # expire on this run, and then on a future run can tell if there have
508         # been intervening backups made.
509         cur.execute("select max(snapshotid) from snapshots")
510         last_snapshotid = cur.fetchone()[0]
511
512         # Get the list of old snapshots for this scheme.  Delete all the old
513         # ones.  Rules for what to keep:
514         #   - Always keep the most recent snapshot.
515         #   - If snapshot X is younger than Y, and X has higher intent, then Y
516         #     can be deleted.
517         cur.execute("""select snapshotid, name, intent,
518                               julianday('now') - timestamp as age
519                        from snapshots where scheme = ?
520                        order by age""", (scheme,))
521
522         first = True
523         max_intent = intent
524         for (id, name, snap_intent, snap_age) in cur.fetchall():
525             can_delete = False
526             if snap_intent < max_intent:
527                 # Delete small-intent snapshots if there is a more recent
528                 # large-intent snapshot.
529                 can_delete = True
530             elif snap_intent == intent:
531                 # Delete previous snapshots with the specified intent level.
532                 can_delete = True
533
534             if can_delete and not first:
535                 print "Delete snapshot %d (%s)" % (id, name)
536                 cur.execute("delete from snapshots where snapshotid = ?",
537                             (id,))
538             first = False
539             max_intent = max(max_intent, snap_intent)
540
541         # Delete entries in the segments_used table which are for non-existent
542         # snapshots.
543         cur.execute("""delete from segments_used
544                        where snapshotid not in
545                            (select snapshotid from snapshots)""")
546
547         # Find segments which contain no objects used by any current snapshots,
548         # and delete them from the segment table.
549         cur.execute("""delete from segments where segmentid not in
550                            (select segmentid from segments_used)""")
551
552         # Delete unused objects in the block_index table.  By "unused", we mean
553         # any object which was stored in a segment which has been deleted, and
554         # any object in a segment which was marked for cleaning and has had
555         # cleaning performed already (the expired time is less than the current
556         # largest snapshot id).
557         cur.execute("""delete from block_index
558                        where segmentid not in (select segmentid from segments)
559                           or segmentid in (select segmentid from segments
560                                            where expire_time < ?)""",
561                     (last_snapshotid,))
562
563         # Remove sub-block signatures for deleted objects.
564         cur.execute("""delete from subblock_signatures
565                        where blockid not in
566                            (select blockid from block_index)""")
567
568     # Segment cleaning.
569     class SegmentInfo(Struct): pass
570
571     def get_segment_cleaning_list(self, age_boost=0.0):
572         """Return a list of all current segments with information for cleaning.
573
574         Return all segments which are currently known in the local database
575         (there might be other, older segments in the archive itself), and
576         return usage statistics for each to help decide which segments to
577         clean.
578
579         The returned list will be sorted by estimated cleaning benefit, with
580         segments that are best to clean at the start of the list.
581
582         If specified, the age_boost parameter (measured in days) will added to
583         the age of each segment, as a way of adjusting the benefit computation
584         before a long-lived snapshot is taken (for example, age_boost might be
585         set to 7 when cleaning prior to taking a weekly snapshot).
586         """
587
588         cur = self.cursor()
589         segments = []
590         cur.execute("""select segmentid, used, size, mtime,
591                        julianday('now') - mtime as age from segment_info
592                        where expire_time is null""")
593         for row in cur:
594             info = self.SegmentInfo()
595             info.id = row[0]
596             info.used_bytes = row[1]
597             info.size_bytes = row[2]
598             info.mtime = row[3]
599             info.age_days = row[4]
600
601             # If data is not available for whatever reason, treat it as 0.0.
602             if info.age_days is None:
603                 info.age_days = 0.0
604             if info.used_bytes is None:
605                 info.used_bytes = 0.0
606
607             # Benefit calculation: u is the estimated fraction of each segment
608             # which is utilized (bytes belonging to objects still in use
609             # divided by total size; this doesn't take compression or storage
610             # overhead into account, but should give a reasonable estimate).
611             #
612             # The total benefit is a heuristic that combines several factors:
613             # the amount of space that can be reclaimed (1 - u), an ageing
614             # factor (info.age_days) that favors cleaning old segments to young
615             # ones and also is more likely to clean segments that will be
616             # rewritten for long-lived snapshots (age_boost), and finally a
617             # penalty factor for the cost of re-uploading data (u + 0.1).
618             u = info.used_bytes / info.size_bytes
619             info.cleaning_benefit \
620                 = (1 - u) * (info.age_days + age_boost) / (u + 0.1)
621
622             segments.append(info)
623
624         segments.sort(cmp, key=lambda s: s.cleaning_benefit, reverse=True)
625         return segments
626
627     def mark_segment_expired(self, segment):
628         """Mark a segment for cleaning in the local database.
629
630         The segment parameter should be either a SegmentInfo object or an
631         integer segment id.  Objects in the given segment will be marked as
632         expired, which means that any future snapshots that would re-use those
633         objects will instead write out a new copy of the object, and thus no
634         future snapshots will depend upon the given segment.
635         """
636
637         if isinstance(segment, int):
638             id = segment
639         elif isinstance(segment, self.SegmentInfo):
640             id = segment.id
641         else:
642             raise TypeError("Invalid segment: %s, must be of type int or SegmentInfo, not %s" % (segment, type(segment)))
643
644         cur = self.cursor()
645         cur.execute("select max(snapshotid) from snapshots")
646         last_snapshotid = cur.fetchone()[0]
647         cur.execute("update segments set expire_time = ? where segmentid = ?",
648                     (last_snapshotid, id))
649         cur.execute("update block_index set expired = 0 where segmentid = ?",
650                     (id,))
651
652     def balance_expired_objects(self):
653         """Analyze expired objects in segments to be cleaned and group by age.
654
655         Update the block_index table of the local database to group expired
656         objects by age.  The exact number of buckets and the cutoffs for each
657         are dynamically determined.  Calling this function after marking
658         segments expired will help in the segment cleaning process, by ensuring
659         that when active objects from clean segments are rewritten, they will
660         be placed into new segments roughly grouped by age.
661         """
662
663         # The expired column of the block_index table is used when generating a
664         # new LBS snapshot.  A null value indicates that an object may be
665         # re-used.  Otherwise, an object must be written into a new segment if
666         # needed.  Objects with distinct expired values will be written into
667         # distinct segments, to allow for some grouping by age.  The value 0 is
668         # somewhat special in that it indicates any rewritten objects can be
669         # placed in the same segment as completely new objects; this can be
670         # used for very young objects which have been expired, or objects not
671         # expected to be encountered.
672         #
673         # In the balancing process, all objects which are not used in any
674         # current snapshots will have expired set to 0.  Objects which have
675         # been seen will be sorted by age and will have expired values set to
676         # 0, 1, 2, and so on based on age (with younger objects being assigned
677         # lower values).  The number of buckets and the age cutoffs is
678         # determined by looking at the distribution of block ages.
679
680         cur = self.cursor()
681
682         # Mark all expired objects with expired = 0; these objects will later
683         # have values set to indicate groupings of objects when repacking.
684         cur.execute("""update block_index set expired = 0
685                        where expired is not null""")
686
687         # We will want to aim for at least one full segment for each bucket
688         # that we eventually create, but don't know how many bytes that should
689         # be due to compression.  So compute the average number of bytes in
690         # each expired segment as a rough estimate for the minimum size of each
691         # bucket.  (This estimate could be thrown off by many not-fully-packed
692         # segments, but for now don't worry too much about that.)  If we can't
693         # compute an average, it's probably because there are no expired
694         # segments, so we have no more work to do.
695         cur.execute("""select avg(size) from segments
696                        where segmentid in
697                            (select distinct segmentid from block_index
698                             where expired is not null)""")
699         segment_size_estimate = cur.fetchone()[0]
700         if not segment_size_estimate:
701             return
702
703         # Next, extract distribution of expired objects (number and size) by
704         # age.  Save the timestamp for "now" so that the classification of
705         # blocks into age buckets will not change later in the function, after
706         # time has passed.  Set any timestamps in the future to now, so we are
707         # guaranteed that for the rest of this function, age is always
708         # non-negative.
709         cur.execute("select julianday('now')")
710         now = cur.fetchone()[0]
711
712         cur.execute("""update block_index set timestamp = ?
713                        where timestamp > ? and expired is not null""",
714                     (now, now))
715
716         cur.execute("""select round(? - timestamp) as age, count(*), sum(size)
717                        from block_index where expired = 0
718                        group by age order by age""", (now,))
719         distribution = cur.fetchall()
720
721         # Start to determine the buckets for expired objects.  Heuristics used:
722         #   - An upper bound on the number of buckets is given by the number of
723         #     segments we estimate it will take to store all data.  In fact,
724         #     aim for a couple of segments per bucket.
725         #   - Place very young objects in bucket 0 (place with new objects)
726         #     unless there are enough of them to warrant a separate bucket.
727         #   - Try not to create unnecessarily many buckets, since fewer buckets
728         #     will allow repacked data to be grouped based on spatial locality
729         #     (while more buckets will group by temporal locality).  We want a
730         #     balance.
731         MIN_AGE = 4
732         total_bytes = sum([i[2] for i in distribution])
733         target_buckets = 2 * (total_bytes / segment_size_estimate) ** 0.4
734         min_size = 1.5 * segment_size_estimate
735         target_size = max(2 * segment_size_estimate,
736                           total_bytes / target_buckets)
737
738         print "segment_size:", segment_size_estimate
739         print "distribution:", distribution
740         print "total_bytes:", total_bytes
741         print "target_buckets:", target_buckets
742         print "min, target size:", min_size, target_size
743
744         # Chosen cutoffs.  Each bucket consists of objects with age greater
745         # than one cutoff value, but not greater than the next largest cutoff.
746         cutoffs = []
747
748         # Starting with the oldest objects, begin grouping together into
749         # buckets of size at least target_size bytes.
750         distribution.reverse()
751         bucket_size = 0
752         min_age_bucket = False
753         for (age, items, size) in distribution:
754             if bucket_size >= target_size \
755                 or (age < MIN_AGE and not min_age_bucket):
756                 if bucket_size < target_size and len(cutoffs) > 0:
757                     cutoffs.pop()
758                 cutoffs.append(age)
759                 bucket_size = 0
760
761             bucket_size += size
762             if age < MIN_AGE:
763                 min_age_bucket = True
764
765         # The last (youngest) bucket will be group 0, unless it has enough data
766         # to be of size min_size by itself, or there happen to be no objects
767         # less than MIN_AGE at all.
768         if bucket_size >= min_size or not min_age_bucket:
769             cutoffs.append(-1)
770         cutoffs.append(-1)
771
772         print "cutoffs:", cutoffs
773
774         # Update the database to assign each object to the appropriate bucket.
775         cutoffs.reverse()
776         for i in range(len(cutoffs)):
777             cur.execute("""update block_index set expired = ?
778                            where round(? - timestamp) > ?
779                              and expired is not null""",
780                         (i, now, cutoffs[i]))