Extend object reference syntax with size assertions.
[cumulus.git] / lbs.py
1 """High-level interface for working with LBS archives.
2
3 This module provides an easy interface for reading from and manipulating
4 various parts of an LBS archive:
5   - listing the snapshots and segments present
6   - reading segment contents
7   - parsing snapshot descriptors and snapshot metadata logs
8   - reading and maintaining the local object database
9 """
10
11 from __future__ import division
12 import os, re, sha, tarfile, tempfile, thread
13 from pysqlite2 import dbapi2 as sqlite3
14
15 # The largest supported snapshot format that can be understood.
16 FORMAT_VERSION = (0, 8)         # LBS Snapshot v0.8
17
18 # Maximum number of nested indirect references allowed in a snapshot.
19 MAX_RECURSION_DEPTH = 3
20
21 # All segments which have been accessed this session.
22 accessed_segments = set()
23
24 class Struct:
25     """A class which merely acts as a data container.
26
27     Instances of this class (or its subclasses) are merely used to store data
28     in various attributes.  No methods are provided.
29     """
30
31     def __repr__(self):
32         return "<%s %s>" % (self.__class__, self.__dict__)
33
34 CHECKSUM_ALGORITHMS = {
35     'sha1': sha.new
36 }
37
38 class ChecksumCreator:
39     """Compute an LBS checksum for provided data.
40
41     The algorithm used is selectable, but currently defaults to sha1.
42     """
43
44     def __init__(self, algorithm='sha1'):
45         self.algorithm = algorithm
46         self.hash = CHECKSUM_ALGORITHMS[algorithm]()
47
48     def update(self, data):
49         self.hash.update(data)
50         return self
51
52     def compute(self):
53         return "%s=%s" % (self.algorithm, self.hash.hexdigest())
54
55 class ChecksumVerifier:
56     """Verify whether a checksum from a snapshot matches the supplied data."""
57
58     def __init__(self, checksumstr):
59         """Create an object to check the supplied checksum."""
60
61         (algo, checksum) = checksumstr.split("=", 1)
62         self.checksum = checksum
63         self.hash = CHECKSUM_ALGORITHMS[algo]()
64
65     def update(self, data):
66         self.hash.update(data)
67
68     def valid(self):
69         """Return a boolean indicating whether the checksum matches."""
70
71         result = self.hash.hexdigest()
72         return result == self.checksum
73
74 class LowlevelDataStore:
75     """Access to the backup store containing segments and snapshot descriptors.
76
77     Instances of this class are used to get direct filesystem-level access to
78     the backup data.  To read a backup, a caller will ordinarily not care about
79     direct access to backup segments, but will instead merely need to access
80     objects from those segments.  The ObjectStore class provides a suitable
81     wrapper around a DataStore to give this high-level access.
82     """
83
84     def __init__(self, path):
85         self.path = path
86
87     # Low-level filesystem access.  These methods could be overwritten to
88     # provide access to remote data stores.
89     def lowlevel_list(self):
90         """Get a listing of files stored."""
91
92         return os.listdir(self.path)
93
94     def lowlevel_open(self, filename):
95         """Return a file-like object for reading data from the given file."""
96
97         return open(os.path.join(self.path, filename), 'rb')
98
99     def lowlevel_stat(self, filename):
100         """Return a dictionary of information about the given file.
101
102         Currently, the only defined field is 'size', giving the size of the
103         file in bytes.
104         """
105
106         stat = os.stat(os.path.join(self.path, filename))
107         return {'size': stat.st_size}
108
109     # Slightly higher-level list methods.
110     def list_snapshots(self):
111         for f in self.lowlevel_list():
112             m = re.match(r"^snapshot-(.*)\.lbs$", f)
113             if m:
114                 yield m.group(1)
115
116     def list_segments(self):
117         for f in self.lowlevel_list():
118             m = re.match(r"^([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})(\.\S+)?$", f)
119             if m:
120                 yield m.group(1)
121
122 class ObjectStore:
123     def __init__(self, data_store):
124         self.store = data_store
125         self.cachedir = None
126         self.CACHE_SIZE = 16
127         self.lru_list = []
128
129     def get_cachedir(self):
130         if self.cachedir is None:
131             self.cachedir = tempfile.mkdtemp(".lbs")
132         return self.cachedir
133
134     def cleanup(self):
135         if self.cachedir is not None:
136             # TODO: Avoid use of system, make this safer
137             os.system("rm -rf " + self.cachedir)
138         self.cachedir = None
139
140     @staticmethod
141     def parse_ref(refstr):
142         m = re.match(r"^zero\[(\d+)\]$", refstr)
143         if m:
144             return ("zero", None, None, (0, int(m.group(1))))
145
146         m = re.match(r"^([-0-9a-f]+)\/([0-9a-f]+)(\(\S+\))?(\[(((\d+)\+)?(\d+)|=(\d+))\])?$", refstr)
147         if not m: return
148
149         segment = m.group(1)
150         object = m.group(2)
151         checksum = m.group(3)
152         slice = m.group(4)
153
154         if checksum is not None:
155             checksum = checksum.lstrip("(").rstrip(")")
156
157         if slice is not None:
158             if m.group(9) is not None:
159                 # Size-assertion slice
160                 slice = (0, int(m.group(9)), True)
161             elif m.group(6) is None:
162                 # Abbreviated slice
163                 slice = (0, int(m.group(8)), False)
164             else:
165                 slice = (int(m.group(7)), int(m.group(8)), False)
166
167         return (segment, object, checksum, slice)
168
169     def get_segment(self, segment):
170         accessed_segments.add(segment)
171         raw = self.store.lowlevel_open(segment + ".tar.gpg")
172
173         (input, output) = os.popen2("lbs-filter-gpg --decrypt")
174         def copy_thread(src, dst):
175             BLOCK_SIZE = 4096
176             while True:
177                 block = src.read(BLOCK_SIZE)
178                 if len(block) == 0: break
179                 dst.write(block)
180             dst.close()
181
182         thread.start_new_thread(copy_thread, (raw, input))
183         return output
184
185     def load_segment(self, segment):
186         seg = tarfile.open(segment, 'r|', self.get_segment(segment))
187         for item in seg:
188             data_obj = seg.extractfile(item)
189             path = item.name.split('/')
190             if len(path) == 2 and path[0] == segment:
191                 yield (path[1], data_obj.read())
192
193     def load_snapshot(self, snapshot):
194         file = self.store.lowlevel_open("snapshot-" + snapshot + ".lbs")
195         return file.read().splitlines(True)
196
197     def extract_segment(self, segment):
198         segdir = os.path.join(self.get_cachedir(), segment)
199         os.mkdir(segdir)
200         for (object, data) in self.load_segment(segment):
201             f = open(os.path.join(segdir, object), 'wb')
202             f.write(data)
203             f.close()
204
205     def load_object(self, segment, object):
206         accessed_segments.add(segment)
207         path = os.path.join(self.get_cachedir(), segment, object)
208         if not os.access(path, os.R_OK):
209             self.extract_segment(segment)
210         if segment in self.lru_list: self.lru_list.remove(segment)
211         self.lru_list.append(segment)
212         while len(self.lru_list) > self.CACHE_SIZE:
213             os.system("rm -rf " + os.path.join(self.cachedir, self.lru_list[0]))
214             self.lru_list = self.lru_list[1:]
215         return open(path, 'rb').read()
216
217     def get(self, refstr):
218         """Fetch the given object and return it.
219
220         The input should be an object reference, in string form.
221         """
222
223         (segment, object, checksum, slice) = self.parse_ref(refstr)
224
225         if segment == "zero":
226             return "\0" * slice[1]
227
228         data = self.load_object(segment, object)
229
230         if checksum is not None:
231             verifier = ChecksumVerifier(checksum)
232             verifier.update(data)
233             if not verifier.valid():
234                 raise ValueError
235
236         if slice is not None:
237             (start, length, exact) = slice
238             if exact and len(data) != length: raise ValueError
239             data = data[start:start+length]
240             if len(data) != length: raise IndexError
241
242         return data
243
244 def parse(lines, terminate=None):
245     """Generic parser for RFC822-style "Key: Value" data streams.
246
247     This parser can be used to read metadata logs and snapshot root descriptor
248     files.
249
250     lines must be an iterable object which yields a sequence of lines of input.
251
252     If terminate is specified, it is used as a predicate to determine when to
253     stop reading input lines.
254     """
255
256     dict = {}
257     last_key = None
258
259     for l in lines:
260         # Strip off a trailing newline, if present
261         if len(l) > 0 and l[-1] == "\n":
262             l = l[:-1]
263
264         if terminate is not None and terminate(l):
265             if len(dict) > 0: yield dict
266             dict = {}
267             last_key = None
268             continue
269
270         m = re.match(r"^([-\w]+):\s*(.*)$", l)
271         if m:
272             dict[m.group(1)] = m.group(2)
273             last_key = m.group(1)
274         elif len(l) > 0 and l[0].isspace() and last_key is not None:
275             dict[last_key] += l
276         else:
277             last_key = None
278
279     if len(dict) > 0: yield dict
280
281 def parse_full(lines):
282     try:
283         return parse(lines).next()
284     except StopIteration:
285         return {}
286
287 def parse_metadata_version(s):
288     """Convert a string with the snapshot version format to a tuple."""
289
290     m = re.match(r"^LBS Snapshot v(\d+(\.\d+)*)$", s)
291     if m is None:
292         return ()
293     else:
294         return tuple([int(d) for d in m.group(1).split(".")])
295
296 def read_metadata(object_store, root):
297     """Iterate through all lines in the metadata log, following references."""
298
299     # Stack for keeping track of recursion when following references to
300     # portions of the log.  The last entry in the stack corresponds to the
301     # object currently being parsed.  Each entry is a list of lines which have
302     # been reversed, so that popping successive lines from the end of each list
303     # will return lines of the metadata log in order.
304     stack = []
305
306     def follow_ref(refstr):
307         if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
308         lines = object_store.get(refstr).splitlines(True)
309         lines.reverse()
310         stack.append(lines)
311
312     follow_ref(root)
313
314     while len(stack) > 0:
315         top = stack[-1]
316         if len(top) == 0:
317             stack.pop()
318             continue
319         line = top.pop()
320
321         # An indirect reference which we must follow?
322         if len(line) > 0 and line[0] == '@':
323             ref = line[1:]
324             ref.strip()
325             follow_ref(ref)
326         else:
327             yield line
328
329 class MetadataItem:
330     """Metadata for a single file (or directory or...) from a snapshot."""
331
332     # Functions for parsing various datatypes that can appear in a metadata log
333     # item.
334     @staticmethod
335     def decode_int(s):
336         """Decode an integer, expressed in decimal, octal, or hexadecimal."""
337         if s.startswith("0x"):
338             return int(s, 16)
339         elif s.startswith("0"):
340             return int(s, 8)
341         else:
342             return int(s, 10)
343
344     @staticmethod
345     def decode_str(s):
346         """Decode a URI-encoded (%xx escapes) string."""
347         def hex_decode(m): return chr(int(m.group(1), 16))
348         return re.sub(r"%([0-9a-f]{2})", hex_decode, s)
349
350     @staticmethod
351     def raw_str(s):
352         """An unecoded string."""
353         return s
354
355     @staticmethod
356     def decode_user(s):
357         """Decode a user/group to a tuple of uid/gid followed by name."""
358         items = s.split()
359         uid = MetadataItem.decode_int(items[0])
360         name = None
361         if len(items) > 1:
362             if items[1].startswith("(") and items[1].endswith(")"):
363                 name = MetadataItem.decode_str(items[1][1:-1])
364         return (uid, name)
365
366     @staticmethod
367     def decode_device(s):
368         """Decode a device major/minor number."""
369         (major, minor) = map(MetadataItem.decode_int, s.split("/"))
370         return (major, minor)
371
372     class Items: pass
373
374     def __init__(self, fields, object_store):
375         """Initialize from a dictionary of key/value pairs from metadata log."""
376
377         self.fields = fields
378         self.object_store = object_store
379         self.keys = []
380         self.items = self.Items()
381         for (k, v) in fields.items():
382             if k in self.field_types:
383                 decoder = self.field_types[k]
384                 setattr(self.items, k, decoder(v))
385                 self.keys.append(k)
386
387     def data(self):
388         """Return an iterator for the data blocks that make up a file."""
389
390         # This traverses the list of blocks that make up a file, following
391         # indirect references.  It is implemented in much the same way as
392         # read_metadata, so see that function for details of the technique.
393
394         objects = self.fields['data'].split()
395         objects.reverse()
396         stack = [objects]
397
398         def follow_ref(refstr):
399             if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
400             objects = self.object_store.get(refstr).split()
401             objects.reverse()
402             stack.append(objects)
403
404         while len(stack) > 0:
405             top = stack[-1]
406             if len(top) == 0:
407                 stack.pop()
408                 continue
409             ref = top.pop()
410
411             # An indirect reference which we must follow?
412             if len(ref) > 0 and ref[0] == '@':
413                 follow_ref(ref[1:])
414             else:
415                 yield ref
416
417 # Description of fields that might appear, and how they should be parsed.
418 MetadataItem.field_types = {
419     'name': MetadataItem.decode_str,
420     'type': MetadataItem.raw_str,
421     'mode': MetadataItem.decode_int,
422     'device': MetadataItem.decode_device,
423     'user': MetadataItem.decode_user,
424     'group': MetadataItem.decode_user,
425     'ctime': MetadataItem.decode_int,
426     'mtime': MetadataItem.decode_int,
427     'links': MetadataItem.decode_int,
428     'inode': MetadataItem.raw_str,
429     'checksum': MetadataItem.decode_str,
430     'size': MetadataItem.decode_int,
431     'contents': MetadataItem.decode_str,
432     'target': MetadataItem.decode_str,
433 }
434
435 def iterate_metadata(object_store, root):
436     for d in parse(read_metadata(object_store, root), lambda l: len(l) == 0):
437         yield MetadataItem(d, object_store)
438
439 class LocalDatabase:
440     """Access to the local database of snapshot contents and object checksums.
441
442     The local database is consulted when creating a snapshot to determine what
443     data can be re-used from old snapshots.  Segment cleaning is performed by
444     manipulating the data in the local database; the local database also
445     includes enough data to guide the segment cleaning process.
446     """
447
448     def __init__(self, path, dbname="localdb.sqlite"):
449         self.db_connection = sqlite3.connect(path + "/" + dbname)
450
451     # Low-level database access.  Use these methods when there isn't a
452     # higher-level interface available.  Exception: do, however, remember to
453     # use the commit() method after making changes to make sure they are
454     # actually saved, even when going through higher-level interfaces.
455     def commit(self):
456         "Commit any pending changes to the local database."
457         self.db_connection.commit()
458
459     def rollback(self):
460         "Roll back any pending changes to the local database."
461         self.db_connection.rollback()
462
463     def cursor(self):
464         "Return a DB-API cursor for directly accessing the local database."
465         return self.db_connection.cursor()
466
467     def list_schemes(self):
468         """Return the list of snapshots found in the local database.
469
470         The returned value is a list of tuples (id, scheme, name, time, intent).
471         """
472
473         cur = self.cursor()
474         cur.execute("select distinct scheme from snapshots")
475         schemes = [row[0] for row in cur.fetchall()]
476         schemes.sort()
477         return schemes
478
479     def garbage_collect(self, scheme, intent=1.0):
480         """Delete entries from old snapshots from the database.
481
482         Only snapshots with the specified scheme name will be deleted.  If
483         intent is given, it gives the intended next snapshot type, to determine
484         how aggressively to clean (for example, intent=7 could be used if the
485         next snapshot will be a weekly snapshot).
486         """
487
488         cur = self.cursor()
489
490         # Find the id of the last snapshot to be created.  This is used for
491         # measuring time in a way: we record this value in each segment we
492         # expire on this run, and then on a future run can tell if there have
493         # been intervening backups made.
494         cur.execute("select max(snapshotid) from snapshots")
495         last_snapshotid = cur.fetchone()[0]
496
497         # Get the list of old snapshots for this scheme.  Delete all the old
498         # ones.  Rules for what to keep:
499         #   - Always keep the most recent snapshot.
500         #   - If snapshot X is younger than Y, and X has higher intent, then Y
501         #     can be deleted.
502         cur.execute("""select snapshotid, name, intent,
503                               julianday('now') - timestamp as age
504                        from snapshots where scheme = ?
505                        order by age""", (scheme,))
506
507         first = True
508         max_intent = intent
509         for (id, name, snap_intent, snap_age) in cur.fetchall():
510             can_delete = False
511             if snap_intent < max_intent:
512                 # Delete small-intent snapshots if there is a more recent
513                 # large-intent snapshot.
514                 can_delete = True
515             elif snap_intent == intent:
516                 # Delete previous snapshots with the specified intent level.
517                 can_delete = True
518
519             if can_delete and not first:
520                 print "Delete snapshot %d (%s)" % (id, name)
521                 cur.execute("delete from snapshots where snapshotid = ?",
522                             (id,))
523             first = False
524             max_intent = max(max_intent, snap_intent)
525
526         # Delete entries in the segments_used table which are for non-existent
527         # snapshots.
528         cur.execute("""delete from segments_used
529                        where snapshotid not in
530                            (select snapshotid from snapshots)""")
531
532         # Find segments which contain no objects used by any current snapshots,
533         # and delete them from the segment table.
534         cur.execute("""delete from segments where segmentid not in
535                            (select segmentid from segments_used)""")
536
537         # Delete unused objects in the block_index table.  By "unused", we mean
538         # any object which was stored in a segment which has been deleted, and
539         # any object in a segment which was marked for cleaning and has had
540         # cleaning performed already (the expired time is less than the current
541         # largest snapshot id).
542         cur.execute("""delete from block_index
543                        where segmentid not in (select segmentid from segments)
544                           or segmentid in (select segmentid from segments
545                                            where expire_time < ?)""",
546                     (last_snapshotid,))
547
548         # Remove sub-block signatures for deleted objects.
549         cur.execute("""delete from subblock_signatures
550                        where blockid not in
551                            (select blockid from block_index)""")
552
553     # Segment cleaning.
554     class SegmentInfo(Struct): pass
555
556     def get_segment_cleaning_list(self, age_boost=0.0):
557         """Return a list of all current segments with information for cleaning.
558
559         Return all segments which are currently known in the local database
560         (there might be other, older segments in the archive itself), and
561         return usage statistics for each to help decide which segments to
562         clean.
563
564         The returned list will be sorted by estimated cleaning benefit, with
565         segments that are best to clean at the start of the list.
566
567         If specified, the age_boost parameter (measured in days) will added to
568         the age of each segment, as a way of adjusting the benefit computation
569         before a long-lived snapshot is taken (for example, age_boost might be
570         set to 7 when cleaning prior to taking a weekly snapshot).
571         """
572
573         cur = self.cursor()
574         segments = []
575         cur.execute("""select segmentid, used, size, mtime,
576                        julianday('now') - mtime as age from segment_info
577                        where expire_time is null""")
578         for row in cur:
579             info = self.SegmentInfo()
580             info.id = row[0]
581             info.used_bytes = row[1]
582             info.size_bytes = row[2]
583             info.mtime = row[3]
584             info.age_days = row[4]
585
586             # If age is not available for whatever reason, treat it as 0.0.
587             if info.age_days is None:
588                 info.age_days = 0.0
589
590             # Benefit calculation: u is the estimated fraction of each segment
591             # which is utilized (bytes belonging to objects still in use
592             # divided by total size; this doesn't take compression or storage
593             # overhead into account, but should give a reasonable estimate).
594             #
595             # The total benefit is a heuristic that combines several factors:
596             # the amount of space that can be reclaimed (1 - u), an ageing
597             # factor (info.age_days) that favors cleaning old segments to young
598             # ones and also is more likely to clean segments that will be
599             # rewritten for long-lived snapshots (age_boost), and finally a
600             # penalty factor for the cost of re-uploading data (u + 0.1).
601             u = info.used_bytes / info.size_bytes
602             info.cleaning_benefit \
603                 = (1 - u) * (info.age_days + age_boost) / (u + 0.1)
604
605             segments.append(info)
606
607         segments.sort(cmp, key=lambda s: s.cleaning_benefit, reverse=True)
608         return segments
609
610     def mark_segment_expired(self, segment):
611         """Mark a segment for cleaning in the local database.
612
613         The segment parameter should be either a SegmentInfo object or an
614         integer segment id.  Objects in the given segment will be marked as
615         expired, which means that any future snapshots that would re-use those
616         objects will instead write out a new copy of the object, and thus no
617         future snapshots will depend upon the given segment.
618         """
619
620         if isinstance(segment, int):
621             id = segment
622         elif isinstance(segment, self.SegmentInfo):
623             id = segment.id
624         else:
625             raise TypeError("Invalid segment: %s, must be of type int or SegmentInfo, not %s" % (segment, type(segment)))
626
627         cur = self.cursor()
628         cur.execute("select max(snapshotid) from snapshots")
629         last_snapshotid = cur.fetchone()[0]
630         cur.execute("update segments set expire_time = ? where segmentid = ?",
631                     (last_snapshotid, id))
632         cur.execute("update block_index set expired = 0 where segmentid = ?",
633                     (id,))
634
635     def balance_expired_objects(self):
636         """Analyze expired objects in segments to be cleaned and group by age.
637
638         Update the block_index table of the local database to group expired
639         objects by age.  The exact number of buckets and the cutoffs for each
640         are dynamically determined.  Calling this function after marking
641         segments expired will help in the segment cleaning process, by ensuring
642         that when active objects from clean segments are rewritten, they will
643         be placed into new segments roughly grouped by age.
644         """
645
646         # The expired column of the block_index table is used when generating a
647         # new LBS snapshot.  A null value indicates that an object may be
648         # re-used.  Otherwise, an object must be written into a new segment if
649         # needed.  Objects with distinct expired values will be written into
650         # distinct segments, to allow for some grouping by age.  The value 0 is
651         # somewhat special in that it indicates any rewritten objects can be
652         # placed in the same segment as completely new objects; this can be
653         # used for very young objects which have been expired, or objects not
654         # expected to be encountered.
655         #
656         # In the balancing process, all objects which are not used in any
657         # current snapshots will have expired set to 0.  Objects which have
658         # been seen will be sorted by age and will have expired values set to
659         # 0, 1, 2, and so on based on age (with younger objects being assigned
660         # lower values).  The number of buckets and the age cutoffs is
661         # determined by looking at the distribution of block ages.
662
663         cur = self.cursor()
664
665         # Mark all expired objects with expired = 0; these objects will later
666         # have values set to indicate groupings of objects when repacking.
667         cur.execute("""update block_index set expired = 0
668                        where expired is not null""")
669
670         # We will want to aim for at least one full segment for each bucket
671         # that we eventually create, but don't know how many bytes that should
672         # be due to compression.  So compute the average number of bytes in
673         # each expired segment as a rough estimate for the minimum size of each
674         # bucket.  (This estimate could be thrown off by many not-fully-packed
675         # segments, but for now don't worry too much about that.)  If we can't
676         # compute an average, it's probably because there are no expired
677         # segments, so we have no more work to do.
678         cur.execute("""select avg(size) from segments
679                        where segmentid in
680                            (select distinct segmentid from block_index
681                             where expired is not null)""")
682         segment_size_estimate = cur.fetchone()[0]
683         if not segment_size_estimate:
684             return
685
686         # Next, extract distribution of expired objects (number and size) by
687         # age.  Save the timestamp for "now" so that the classification of
688         # blocks into age buckets will not change later in the function, after
689         # time has passed.  Set any timestamps in the future to now, so we are
690         # guaranteed that for the rest of this function, age is always
691         # non-negative.
692         cur.execute("select julianday('now')")
693         now = cur.fetchone()[0]
694
695         cur.execute("""update block_index set timestamp = ?
696                        where timestamp > ? and expired is not null""",
697                     (now, now))
698
699         cur.execute("""select round(? - timestamp) as age, count(*), sum(size)
700                        from block_index where expired = 0
701                        group by age order by age""", (now,))
702         distribution = cur.fetchall()
703
704         # Start to determine the buckets for expired objects.  Heuristics used:
705         #   - An upper bound on the number of buckets is given by the number of
706         #     segments we estimate it will take to store all data.  In fact,
707         #     aim for a couple of segments per bucket.
708         #   - Place very young objects in bucket 0 (place with new objects)
709         #     unless there are enough of them to warrant a separate bucket.
710         #   - Try not to create unnecessarily many buckets, since fewer buckets
711         #     will allow repacked data to be grouped based on spatial locality
712         #     (while more buckets will group by temporal locality).  We want a
713         #     balance.
714         MIN_AGE = 4
715         total_bytes = sum([i[2] for i in distribution])
716         target_buckets = 2 * (total_bytes / segment_size_estimate) ** 0.4
717         min_size = 1.5 * segment_size_estimate
718         target_size = max(2 * segment_size_estimate,
719                           total_bytes / target_buckets)
720
721         print "segment_size:", segment_size_estimate
722         print "distribution:", distribution
723         print "total_bytes:", total_bytes
724         print "target_buckets:", target_buckets
725         print "min, target size:", min_size, target_size
726
727         # Chosen cutoffs.  Each bucket consists of objects with age greater
728         # than one cutoff value, but not greater than the next largest cutoff.
729         cutoffs = []
730
731         # Starting with the oldest objects, begin grouping together into
732         # buckets of size at least target_size bytes.
733         distribution.reverse()
734         bucket_size = 0
735         min_age_bucket = False
736         for (age, items, size) in distribution:
737             if bucket_size >= target_size \
738                 or (age < MIN_AGE and not min_age_bucket):
739                 if bucket_size < target_size and len(cutoffs) > 0:
740                     cutoffs.pop()
741                 cutoffs.append(age)
742                 bucket_size = 0
743
744             bucket_size += size
745             if age < MIN_AGE:
746                 min_age_bucket = True
747
748         # The last (youngest) bucket will be group 0, unless it has enough data
749         # to be of size min_size by itself, or there happen to be no objects
750         # less than MIN_AGE at all.
751         if bucket_size >= min_size or not min_age_bucket:
752             cutoffs.append(-1)
753         cutoffs.append(-1)
754
755         print "cutoffs:", cutoffs
756
757         # Update the database to assign each object to the appropriate bucket.
758         cutoffs.reverse()
759         for i in range(len(cutoffs)):
760             cur.execute("""update block_index set expired = ?
761                            where round(? - timestamp) > ?
762                              and expired is not null""",
763                         (i, now, cutoffs[i]))