51d3ee82f49ec0e072714d343f1e9b38cd552ac3
[cumulus.git] / python / cumulus / __init__.py
1 """High-level interface for working with LBS archives.
2
3 This module provides an easy interface for reading from and manipulating
4 various parts of an LBS archive:
5   - listing the snapshots and segments present
6   - reading segment contents
7   - parsing snapshot descriptors and snapshot metadata logs
8   - reading and maintaining the local object database
9 """
10
11 from __future__ import division
12 import os, re, sha, tarfile, tempfile, thread
13 from pysqlite2 import dbapi2 as sqlite3
14
15 import cumulus.store, cumulus.store.file
16
17 # The largest supported snapshot format that can be understood.
18 FORMAT_VERSION = (0, 8)         # LBS Snapshot v0.8
19
20 # Maximum number of nested indirect references allowed in a snapshot.
21 MAX_RECURSION_DEPTH = 3
22
23 # All segments which have been accessed this session.
24 accessed_segments = set()
25
26 class Struct:
27     """A class which merely acts as a data container.
28
29     Instances of this class (or its subclasses) are merely used to store data
30     in various attributes.  No methods are provided.
31     """
32
33     def __repr__(self):
34         return "<%s %s>" % (self.__class__, self.__dict__)
35
36 CHECKSUM_ALGORITHMS = {
37     'sha1': sha.new
38 }
39
40 class ChecksumCreator:
41     """Compute an LBS checksum for provided data.
42
43     The algorithm used is selectable, but currently defaults to sha1.
44     """
45
46     def __init__(self, algorithm='sha1'):
47         self.algorithm = algorithm
48         self.hash = CHECKSUM_ALGORITHMS[algorithm]()
49
50     def update(self, data):
51         self.hash.update(data)
52         return self
53
54     def compute(self):
55         return "%s=%s" % (self.algorithm, self.hash.hexdigest())
56
57 class ChecksumVerifier:
58     """Verify whether a checksum from a snapshot matches the supplied data."""
59
60     def __init__(self, checksumstr):
61         """Create an object to check the supplied checksum."""
62
63         (algo, checksum) = checksumstr.split("=", 1)
64         self.checksum = checksum
65         self.hash = CHECKSUM_ALGORITHMS[algo]()
66
67     def update(self, data):
68         self.hash.update(data)
69
70     def valid(self):
71         """Return a boolean indicating whether the checksum matches."""
72
73         result = self.hash.hexdigest()
74         return result == self.checksum
75
76 class LowlevelDataStore:
77     """Access to the backup store containing segments and snapshot descriptors.
78
79     Instances of this class are used to get direct filesystem-level access to
80     the backup data.  To read a backup, a caller will ordinarily not care about
81     direct access to backup segments, but will instead merely need to access
82     objects from those segments.  The ObjectStore class provides a suitable
83     wrapper around a DataStore to give this high-level access.
84     """
85
86     def __init__(self, path):
87         if path.find(":") >= 0:
88             self.store = cumulus.store.open(path)
89         else:
90             self.store = cumulus.store.file.FileStore(path)
91
92     def _classify(self, filename):
93         for (t, r) in cumulus.store.type_patterns.items():
94             if r.match(filename):
95                 return (t, filename)
96         return (None, filename)
97
98     def lowlevel_open(self, filename):
99         """Return a file-like object for reading data from the given file."""
100
101         (type, filename) = self._classify(filename)
102         return self.store.get(type, filename)
103
104     def lowlevel_stat(self, filename):
105         """Return a dictionary of information about the given file.
106
107         Currently, the only defined field is 'size', giving the size of the
108         file in bytes.
109         """
110
111         (type, filename) = self._classify(filename)
112         return self.store.stat(type, filename)
113
114     # Slightly higher-level list methods.
115     def list_snapshots(self):
116         for f in self.store.list('snapshots'):
117             m = cumulus.store.type_patterns['snapshots'].match(f)
118             if m: yield m.group(1)
119
120     def list_segments(self):
121         for f in self.store.list('segments'):
122             m = cumulus.store.type_patterns['segments'].match(f)
123             if m: yield m.group(1)
124
125 class ObjectStore:
126     def __init__(self, data_store):
127         self.store = data_store
128         self.cachedir = None
129         self.CACHE_SIZE = 16
130         self.lru_list = []
131
132     def get_cachedir(self):
133         if self.cachedir is None:
134             self.cachedir = tempfile.mkdtemp(".lbs")
135         return self.cachedir
136
137     def cleanup(self):
138         if self.cachedir is not None:
139             # TODO: Avoid use of system, make this safer
140             os.system("rm -rf " + self.cachedir)
141         self.cachedir = None
142
143     @staticmethod
144     def parse_ref(refstr):
145         m = re.match(r"^zero\[(\d+)\]$", refstr)
146         if m:
147             return ("zero", None, None, (0, int(m.group(1))))
148
149         m = re.match(r"^([-0-9a-f]+)\/([0-9a-f]+)(\(\S+\))?(\[(((\d+)\+)?(\d+)|=(\d+))\])?$", refstr)
150         if not m: return
151
152         segment = m.group(1)
153         object = m.group(2)
154         checksum = m.group(3)
155         slice = m.group(4)
156
157         if checksum is not None:
158             checksum = checksum.lstrip("(").rstrip(")")
159
160         if slice is not None:
161             if m.group(9) is not None:
162                 # Size-assertion slice
163                 slice = (0, int(m.group(9)), True)
164             elif m.group(6) is None:
165                 # Abbreviated slice
166                 slice = (0, int(m.group(8)), False)
167             else:
168                 slice = (int(m.group(7)), int(m.group(8)), False)
169
170         return (segment, object, checksum, slice)
171
172     def get_segment(self, segment):
173         accessed_segments.add(segment)
174         raw = self.store.lowlevel_open(segment + ".tar.gpg")
175
176         (input, output) = os.popen2("lbs-filter-gpg --decrypt")
177         def copy_thread(src, dst):
178             BLOCK_SIZE = 4096
179             while True:
180                 block = src.read(BLOCK_SIZE)
181                 if len(block) == 0: break
182                 dst.write(block)
183             dst.close()
184
185         thread.start_new_thread(copy_thread, (raw, input))
186         return output
187
188     def load_segment(self, segment):
189         seg = tarfile.open(segment, 'r|', self.get_segment(segment))
190         for item in seg:
191             data_obj = seg.extractfile(item)
192             path = item.name.split('/')
193             if len(path) == 2 and path[0] == segment:
194                 yield (path[1], data_obj.read())
195
196     def load_snapshot(self, snapshot):
197         file = self.store.lowlevel_open("snapshot-" + snapshot + ".lbs")
198         return file.read().splitlines(True)
199
200     def extract_segment(self, segment):
201         segdir = os.path.join(self.get_cachedir(), segment)
202         os.mkdir(segdir)
203         for (object, data) in self.load_segment(segment):
204             f = open(os.path.join(segdir, object), 'wb')
205             f.write(data)
206             f.close()
207
208     def load_object(self, segment, object):
209         accessed_segments.add(segment)
210         path = os.path.join(self.get_cachedir(), segment, object)
211         if not os.access(path, os.R_OK):
212             self.extract_segment(segment)
213         if segment in self.lru_list: self.lru_list.remove(segment)
214         self.lru_list.append(segment)
215         while len(self.lru_list) > self.CACHE_SIZE:
216             os.system("rm -rf " + os.path.join(self.cachedir, self.lru_list[0]))
217             self.lru_list = self.lru_list[1:]
218         return open(path, 'rb').read()
219
220     def get(self, refstr):
221         """Fetch the given object and return it.
222
223         The input should be an object reference, in string form.
224         """
225
226         (segment, object, checksum, slice) = self.parse_ref(refstr)
227
228         if segment == "zero":
229             return "\0" * slice[1]
230
231         data = self.load_object(segment, object)
232
233         if checksum is not None:
234             verifier = ChecksumVerifier(checksum)
235             verifier.update(data)
236             if not verifier.valid():
237                 raise ValueError
238
239         if slice is not None:
240             (start, length, exact) = slice
241             if exact and len(data) != length: raise ValueError
242             data = data[start:start+length]
243             if len(data) != length: raise IndexError
244
245         return data
246
247 def parse(lines, terminate=None):
248     """Generic parser for RFC822-style "Key: Value" data streams.
249
250     This parser can be used to read metadata logs and snapshot root descriptor
251     files.
252
253     lines must be an iterable object which yields a sequence of lines of input.
254
255     If terminate is specified, it is used as a predicate to determine when to
256     stop reading input lines.
257     """
258
259     dict = {}
260     last_key = None
261
262     for l in lines:
263         # Strip off a trailing newline, if present
264         if len(l) > 0 and l[-1] == "\n":
265             l = l[:-1]
266
267         if terminate is not None and terminate(l):
268             if len(dict) > 0: yield dict
269             dict = {}
270             last_key = None
271             continue
272
273         m = re.match(r"^([-\w]+):\s*(.*)$", l)
274         if m:
275             dict[m.group(1)] = m.group(2)
276             last_key = m.group(1)
277         elif len(l) > 0 and l[0].isspace() and last_key is not None:
278             dict[last_key] += l
279         else:
280             last_key = None
281
282     if len(dict) > 0: yield dict
283
284 def parse_full(lines):
285     try:
286         return parse(lines).next()
287     except StopIteration:
288         return {}
289
290 def parse_metadata_version(s):
291     """Convert a string with the snapshot version format to a tuple."""
292
293     m = re.match(r"^LBS Snapshot v(\d+(\.\d+)*)$", s)
294     if m is None:
295         return ()
296     else:
297         return tuple([int(d) for d in m.group(1).split(".")])
298
299 def read_metadata(object_store, root):
300     """Iterate through all lines in the metadata log, following references."""
301
302     # Stack for keeping track of recursion when following references to
303     # portions of the log.  The last entry in the stack corresponds to the
304     # object currently being parsed.  Each entry is a list of lines which have
305     # been reversed, so that popping successive lines from the end of each list
306     # will return lines of the metadata log in order.
307     stack = []
308
309     def follow_ref(refstr):
310         if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
311         lines = object_store.get(refstr).splitlines(True)
312         lines.reverse()
313         stack.append(lines)
314
315     follow_ref(root)
316
317     while len(stack) > 0:
318         top = stack[-1]
319         if len(top) == 0:
320             stack.pop()
321             continue
322         line = top.pop()
323
324         # An indirect reference which we must follow?
325         if len(line) > 0 and line[0] == '@':
326             ref = line[1:]
327             ref.strip()
328             follow_ref(ref)
329         else:
330             yield line
331
332 class MetadataItem:
333     """Metadata for a single file (or directory or...) from a snapshot."""
334
335     # Functions for parsing various datatypes that can appear in a metadata log
336     # item.
337     @staticmethod
338     def decode_int(s):
339         """Decode an integer, expressed in decimal, octal, or hexadecimal."""
340         if s.startswith("0x"):
341             return int(s, 16)
342         elif s.startswith("0"):
343             return int(s, 8)
344         else:
345             return int(s, 10)
346
347     @staticmethod
348     def decode_str(s):
349         """Decode a URI-encoded (%xx escapes) string."""
350         def hex_decode(m): return chr(int(m.group(1), 16))
351         return re.sub(r"%([0-9a-f]{2})", hex_decode, s)
352
353     @staticmethod
354     def raw_str(s):
355         """An unecoded string."""
356         return s
357
358     @staticmethod
359     def decode_user(s):
360         """Decode a user/group to a tuple of uid/gid followed by name."""
361         items = s.split()
362         uid = MetadataItem.decode_int(items[0])
363         name = None
364         if len(items) > 1:
365             if items[1].startswith("(") and items[1].endswith(")"):
366                 name = MetadataItem.decode_str(items[1][1:-1])
367         return (uid, name)
368
369     @staticmethod
370     def decode_device(s):
371         """Decode a device major/minor number."""
372         (major, minor) = map(MetadataItem.decode_int, s.split("/"))
373         return (major, minor)
374
375     class Items: pass
376
377     def __init__(self, fields, object_store):
378         """Initialize from a dictionary of key/value pairs from metadata log."""
379
380         self.fields = fields
381         self.object_store = object_store
382         self.keys = []
383         self.items = self.Items()
384         for (k, v) in fields.items():
385             if k in self.field_types:
386                 decoder = self.field_types[k]
387                 setattr(self.items, k, decoder(v))
388                 self.keys.append(k)
389
390     def data(self):
391         """Return an iterator for the data blocks that make up a file."""
392
393         # This traverses the list of blocks that make up a file, following
394         # indirect references.  It is implemented in much the same way as
395         # read_metadata, so see that function for details of the technique.
396
397         objects = self.fields['data'].split()
398         objects.reverse()
399         stack = [objects]
400
401         def follow_ref(refstr):
402             if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
403             objects = self.object_store.get(refstr).split()
404             objects.reverse()
405             stack.append(objects)
406
407         while len(stack) > 0:
408             top = stack[-1]
409             if len(top) == 0:
410                 stack.pop()
411                 continue
412             ref = top.pop()
413
414             # An indirect reference which we must follow?
415             if len(ref) > 0 and ref[0] == '@':
416                 follow_ref(ref[1:])
417             else:
418                 yield ref
419
420 # Description of fields that might appear, and how they should be parsed.
421 MetadataItem.field_types = {
422     'name': MetadataItem.decode_str,
423     'type': MetadataItem.raw_str,
424     'mode': MetadataItem.decode_int,
425     'device': MetadataItem.decode_device,
426     'user': MetadataItem.decode_user,
427     'group': MetadataItem.decode_user,
428     'ctime': MetadataItem.decode_int,
429     'mtime': MetadataItem.decode_int,
430     'links': MetadataItem.decode_int,
431     'inode': MetadataItem.raw_str,
432     'checksum': MetadataItem.decode_str,
433     'size': MetadataItem.decode_int,
434     'contents': MetadataItem.decode_str,
435     'target': MetadataItem.decode_str,
436 }
437
438 def iterate_metadata(object_store, root):
439     for d in parse(read_metadata(object_store, root), lambda l: len(l) == 0):
440         yield MetadataItem(d, object_store)
441
442 class LocalDatabase:
443     """Access to the local database of snapshot contents and object checksums.
444
445     The local database is consulted when creating a snapshot to determine what
446     data can be re-used from old snapshots.  Segment cleaning is performed by
447     manipulating the data in the local database; the local database also
448     includes enough data to guide the segment cleaning process.
449     """
450
451     def __init__(self, path, dbname="localdb.sqlite"):
452         self.db_connection = sqlite3.connect(path + "/" + dbname)
453
454     # Low-level database access.  Use these methods when there isn't a
455     # higher-level interface available.  Exception: do, however, remember to
456     # use the commit() method after making changes to make sure they are
457     # actually saved, even when going through higher-level interfaces.
458     def commit(self):
459         "Commit any pending changes to the local database."
460         self.db_connection.commit()
461
462     def rollback(self):
463         "Roll back any pending changes to the local database."
464         self.db_connection.rollback()
465
466     def cursor(self):
467         "Return a DB-API cursor for directly accessing the local database."
468         return self.db_connection.cursor()
469
470     def list_schemes(self):
471         """Return the list of snapshots found in the local database.
472
473         The returned value is a list of tuples (id, scheme, name, time, intent).
474         """
475
476         cur = self.cursor()
477         cur.execute("select distinct scheme from snapshots")
478         schemes = [row[0] for row in cur.fetchall()]
479         schemes.sort()
480         return schemes
481
482     def garbage_collect(self, scheme, intent=1.0):
483         """Delete entries from old snapshots from the database.
484
485         Only snapshots with the specified scheme name will be deleted.  If
486         intent is given, it gives the intended next snapshot type, to determine
487         how aggressively to clean (for example, intent=7 could be used if the
488         next snapshot will be a weekly snapshot).
489         """
490
491         cur = self.cursor()
492
493         # Find the id of the last snapshot to be created.  This is used for
494         # measuring time in a way: we record this value in each segment we
495         # expire on this run, and then on a future run can tell if there have
496         # been intervening backups made.
497         cur.execute("select max(snapshotid) from snapshots")
498         last_snapshotid = cur.fetchone()[0]
499
500         # Get the list of old snapshots for this scheme.  Delete all the old
501         # ones.  Rules for what to keep:
502         #   - Always keep the most recent snapshot.
503         #   - If snapshot X is younger than Y, and X has higher intent, then Y
504         #     can be deleted.
505         cur.execute("""select snapshotid, name, intent,
506                               julianday('now') - timestamp as age
507                        from snapshots where scheme = ?
508                        order by age""", (scheme,))
509
510         first = True
511         max_intent = intent
512         for (id, name, snap_intent, snap_age) in cur.fetchall():
513             can_delete = False
514             if snap_intent < max_intent:
515                 # Delete small-intent snapshots if there is a more recent
516                 # large-intent snapshot.
517                 can_delete = True
518             elif snap_intent == intent:
519                 # Delete previous snapshots with the specified intent level.
520                 can_delete = True
521
522             if can_delete and not first:
523                 print "Delete snapshot %d (%s)" % (id, name)
524                 cur.execute("delete from snapshots where snapshotid = ?",
525                             (id,))
526             first = False
527             max_intent = max(max_intent, snap_intent)
528
529         # Delete entries in the segments_used table which are for non-existent
530         # snapshots.
531         cur.execute("""delete from segments_used
532                        where snapshotid not in
533                            (select snapshotid from snapshots)""")
534
535         # Find segments which contain no objects used by any current snapshots,
536         # and delete them from the segment table.
537         cur.execute("""delete from segments where segmentid not in
538                            (select segmentid from segments_used)""")
539
540         # Delete unused objects in the block_index table.  By "unused", we mean
541         # any object which was stored in a segment which has been deleted, and
542         # any object in a segment which was marked for cleaning and has had
543         # cleaning performed already (the expired time is less than the current
544         # largest snapshot id).
545         cur.execute("""delete from block_index
546                        where segmentid not in (select segmentid from segments)
547                           or segmentid in (select segmentid from segments
548                                            where expire_time < ?)""",
549                     (last_snapshotid,))
550
551         # Remove sub-block signatures for deleted objects.
552         cur.execute("""delete from subblock_signatures
553                        where blockid not in
554                            (select blockid from block_index)""")
555
556     # Segment cleaning.
557     class SegmentInfo(Struct): pass
558
559     def get_segment_cleaning_list(self, age_boost=0.0):
560         """Return a list of all current segments with information for cleaning.
561
562         Return all segments which are currently known in the local database
563         (there might be other, older segments in the archive itself), and
564         return usage statistics for each to help decide which segments to
565         clean.
566
567         The returned list will be sorted by estimated cleaning benefit, with
568         segments that are best to clean at the start of the list.
569
570         If specified, the age_boost parameter (measured in days) will added to
571         the age of each segment, as a way of adjusting the benefit computation
572         before a long-lived snapshot is taken (for example, age_boost might be
573         set to 7 when cleaning prior to taking a weekly snapshot).
574         """
575
576         cur = self.cursor()
577         segments = []
578         cur.execute("""select segmentid, used, size, mtime,
579                        julianday('now') - mtime as age from segment_info
580                        where expire_time is null""")
581         for row in cur:
582             info = self.SegmentInfo()
583             info.id = row[0]
584             info.used_bytes = row[1]
585             info.size_bytes = row[2]
586             info.mtime = row[3]
587             info.age_days = row[4]
588
589             # If data is not available for whatever reason, treat it as 0.0.
590             if info.age_days is None:
591                 info.age_days = 0.0
592             if info.used_bytes is None:
593                 info.used_bytes = 0.0
594
595             # Benefit calculation: u is the estimated fraction of each segment
596             # which is utilized (bytes belonging to objects still in use
597             # divided by total size; this doesn't take compression or storage
598             # overhead into account, but should give a reasonable estimate).
599             #
600             # The total benefit is a heuristic that combines several factors:
601             # the amount of space that can be reclaimed (1 - u), an ageing
602             # factor (info.age_days) that favors cleaning old segments to young
603             # ones and also is more likely to clean segments that will be
604             # rewritten for long-lived snapshots (age_boost), and finally a
605             # penalty factor for the cost of re-uploading data (u + 0.1).
606             u = info.used_bytes / info.size_bytes
607             info.cleaning_benefit \
608                 = (1 - u) * (info.age_days + age_boost) / (u + 0.1)
609
610             segments.append(info)
611
612         segments.sort(cmp, key=lambda s: s.cleaning_benefit, reverse=True)
613         return segments
614
615     def mark_segment_expired(self, segment):
616         """Mark a segment for cleaning in the local database.
617
618         The segment parameter should be either a SegmentInfo object or an
619         integer segment id.  Objects in the given segment will be marked as
620         expired, which means that any future snapshots that would re-use those
621         objects will instead write out a new copy of the object, and thus no
622         future snapshots will depend upon the given segment.
623         """
624
625         if isinstance(segment, int):
626             id = segment
627         elif isinstance(segment, self.SegmentInfo):
628             id = segment.id
629         else:
630             raise TypeError("Invalid segment: %s, must be of type int or SegmentInfo, not %s" % (segment, type(segment)))
631
632         cur = self.cursor()
633         cur.execute("select max(snapshotid) from snapshots")
634         last_snapshotid = cur.fetchone()[0]
635         cur.execute("update segments set expire_time = ? where segmentid = ?",
636                     (last_snapshotid, id))
637         cur.execute("update block_index set expired = 0 where segmentid = ?",
638                     (id,))
639
640     def balance_expired_objects(self):
641         """Analyze expired objects in segments to be cleaned and group by age.
642
643         Update the block_index table of the local database to group expired
644         objects by age.  The exact number of buckets and the cutoffs for each
645         are dynamically determined.  Calling this function after marking
646         segments expired will help in the segment cleaning process, by ensuring
647         that when active objects from clean segments are rewritten, they will
648         be placed into new segments roughly grouped by age.
649         """
650
651         # The expired column of the block_index table is used when generating a
652         # new LBS snapshot.  A null value indicates that an object may be
653         # re-used.  Otherwise, an object must be written into a new segment if
654         # needed.  Objects with distinct expired values will be written into
655         # distinct segments, to allow for some grouping by age.  The value 0 is
656         # somewhat special in that it indicates any rewritten objects can be
657         # placed in the same segment as completely new objects; this can be
658         # used for very young objects which have been expired, or objects not
659         # expected to be encountered.
660         #
661         # In the balancing process, all objects which are not used in any
662         # current snapshots will have expired set to 0.  Objects which have
663         # been seen will be sorted by age and will have expired values set to
664         # 0, 1, 2, and so on based on age (with younger objects being assigned
665         # lower values).  The number of buckets and the age cutoffs is
666         # determined by looking at the distribution of block ages.
667
668         cur = self.cursor()
669
670         # Mark all expired objects with expired = 0; these objects will later
671         # have values set to indicate groupings of objects when repacking.
672         cur.execute("""update block_index set expired = 0
673                        where expired is not null""")
674
675         # We will want to aim for at least one full segment for each bucket
676         # that we eventually create, but don't know how many bytes that should
677         # be due to compression.  So compute the average number of bytes in
678         # each expired segment as a rough estimate for the minimum size of each
679         # bucket.  (This estimate could be thrown off by many not-fully-packed
680         # segments, but for now don't worry too much about that.)  If we can't
681         # compute an average, it's probably because there are no expired
682         # segments, so we have no more work to do.
683         cur.execute("""select avg(size) from segments
684                        where segmentid in
685                            (select distinct segmentid from block_index
686                             where expired is not null)""")
687         segment_size_estimate = cur.fetchone()[0]
688         if not segment_size_estimate:
689             return
690
691         # Next, extract distribution of expired objects (number and size) by
692         # age.  Save the timestamp for "now" so that the classification of
693         # blocks into age buckets will not change later in the function, after
694         # time has passed.  Set any timestamps in the future to now, so we are
695         # guaranteed that for the rest of this function, age is always
696         # non-negative.
697         cur.execute("select julianday('now')")
698         now = cur.fetchone()[0]
699
700         cur.execute("""update block_index set timestamp = ?
701                        where timestamp > ? and expired is not null""",
702                     (now, now))
703
704         cur.execute("""select round(? - timestamp) as age, count(*), sum(size)
705                        from block_index where expired = 0
706                        group by age order by age""", (now,))
707         distribution = cur.fetchall()
708
709         # Start to determine the buckets for expired objects.  Heuristics used:
710         #   - An upper bound on the number of buckets is given by the number of
711         #     segments we estimate it will take to store all data.  In fact,
712         #     aim for a couple of segments per bucket.
713         #   - Place very young objects in bucket 0 (place with new objects)
714         #     unless there are enough of them to warrant a separate bucket.
715         #   - Try not to create unnecessarily many buckets, since fewer buckets
716         #     will allow repacked data to be grouped based on spatial locality
717         #     (while more buckets will group by temporal locality).  We want a
718         #     balance.
719         MIN_AGE = 4
720         total_bytes = sum([i[2] for i in distribution])
721         target_buckets = 2 * (total_bytes / segment_size_estimate) ** 0.4
722         min_size = 1.5 * segment_size_estimate
723         target_size = max(2 * segment_size_estimate,
724                           total_bytes / target_buckets)
725
726         print "segment_size:", segment_size_estimate
727         print "distribution:", distribution
728         print "total_bytes:", total_bytes
729         print "target_buckets:", target_buckets
730         print "min, target size:", min_size, target_size
731
732         # Chosen cutoffs.  Each bucket consists of objects with age greater
733         # than one cutoff value, but not greater than the next largest cutoff.
734         cutoffs = []
735
736         # Starting with the oldest objects, begin grouping together into
737         # buckets of size at least target_size bytes.
738         distribution.reverse()
739         bucket_size = 0
740         min_age_bucket = False
741         for (age, items, size) in distribution:
742             if bucket_size >= target_size \
743                 or (age < MIN_AGE and not min_age_bucket):
744                 if bucket_size < target_size and len(cutoffs) > 0:
745                     cutoffs.pop()
746                 cutoffs.append(age)
747                 bucket_size = 0
748
749             bucket_size += size
750             if age < MIN_AGE:
751                 min_age_bucket = True
752
753         # The last (youngest) bucket will be group 0, unless it has enough data
754         # to be of size min_size by itself, or there happen to be no objects
755         # less than MIN_AGE at all.
756         if bucket_size >= min_size or not min_age_bucket:
757             cutoffs.append(-1)
758         cutoffs.append(-1)
759
760         print "cutoffs:", cutoffs
761
762         # Update the database to assign each object to the appropriate bucket.
763         cutoffs.reverse()
764         for i in range(len(cutoffs)):
765             cur.execute("""update block_index set expired = ?
766                            where round(? - timestamp) > ?
767                              and expired is not null""",
768                         (i, now, cutoffs[i]))