507b9e13bcef1fb5ec078637ed1f95d588ca9245
[cumulus.git] / lbs.py
1 """High-level interface for working with LBS archives.
2
3 This module provides an easy interface for reading from and manipulating
4 various parts of an LBS archive:
5   - listing the snapshots and segments present
6   - reading segment contents
7   - parsing snapshot descriptors and snapshot metadata logs
8   - reading and maintaining the local object database
9 """
10
11 from __future__ import division
12 import os, re, sha, tarfile, tempfile, thread
13 from pysqlite2 import dbapi2 as sqlite3
14
15 # The largest supported snapshot format that can be understood.
16 FORMAT_VERSION = (0, 6)         # LBS Snapshot v0.6
17
18 # Maximum number of nested indirect references allowed in a snapshot.
19 MAX_RECURSION_DEPTH = 3
20
21 # All segments which have been accessed this session.
22 accessed_segments = set()
23
24 class Struct:
25     """A class which merely acts as a data container.
26
27     Instances of this class (or its subclasses) are merely used to store data
28     in various attributes.  No methods are provided.
29     """
30
31     def __repr__(self):
32         return "<%s %s>" % (self.__class__, self.__dict__)
33
34 CHECKSUM_ALGORITHMS = {
35     'sha1': sha.new
36 }
37
38 class ChecksumCreator:
39     """Compute an LBS checksum for provided data.
40
41     The algorithm used is selectable, but currently defaults to sha1.
42     """
43
44     def __init__(self, algorithm='sha1'):
45         self.algorithm = algorithm
46         self.hash = CHECKSUM_ALGORITHMS[algorithm]()
47
48     def update(self, data):
49         self.hash.update(data)
50         return self
51
52     def compute(self):
53         return "%s=%s" % (self.algorithm, self.hash.hexdigest())
54
55 class ChecksumVerifier:
56     """Verify whether a checksum from a snapshot matches the supplied data."""
57
58     def __init__(self, checksumstr):
59         """Create an object to check the supplied checksum."""
60
61         (algo, checksum) = checksumstr.split("=", 1)
62         self.checksum = checksum
63         self.hash = CHECKSUM_ALGORITHMS[algo]()
64
65     def update(self, data):
66         self.hash.update(data)
67
68     def valid(self):
69         """Return a boolean indicating whether the checksum matches."""
70
71         result = self.hash.hexdigest()
72         return result == self.checksum
73
74 class LowlevelDataStore:
75     """Access to the backup store containing segments and snapshot descriptors.
76
77     Instances of this class are used to get direct filesystem-level access to
78     the backup data.  To read a backup, a caller will ordinarily not care about
79     direct access to backup segments, but will instead merely need to access
80     objects from those segments.  The ObjectStore class provides a suitable
81     wrapper around a DataStore to give this high-level access.
82     """
83
84     def __init__(self, path):
85         self.path = path
86
87     # Low-level filesystem access.  These methods could be overwritten to
88     # provide access to remote data stores.
89     def lowlevel_list(self):
90         """Get a listing of files stored."""
91
92         return os.listdir(self.path)
93
94     def lowlevel_open(self, filename):
95         """Return a file-like object for reading data from the given file."""
96
97         return open(os.path.join(self.path, filename), 'rb')
98
99     def lowlevel_stat(self, filename):
100         """Return a dictionary of information about the given file.
101
102         Currently, the only defined field is 'size', giving the size of the
103         file in bytes.
104         """
105
106         stat = os.stat(os.path.join(self.path, filename))
107         return {'size': stat.st_size}
108
109     # Slightly higher-level list methods.
110     def list_snapshots(self):
111         for f in self.lowlevel_list():
112             m = re.match(r"^snapshot-(.*)\.lbs$", f)
113             if m:
114                 yield m.group(1)
115
116     def list_segments(self):
117         for f in self.lowlevel_list():
118             m = re.match(r"^([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})(\.\S+)?$", f)
119             if m:
120                 yield m.group(1)
121
122 class ObjectStore:
123     def __init__(self, data_store):
124         self.store = data_store
125         self.cachedir = None
126         self.CACHE_SIZE = 16
127         self.lru_list = []
128
129     def get_cachedir(self):
130         if self.cachedir is None:
131             self.cachedir = tempfile.mkdtemp(".lbs")
132         return self.cachedir
133
134     def cleanup(self):
135         if self.cachedir is not None:
136             # TODO: Avoid use of system, make this safer
137             os.system("rm -rf " + self.cachedir)
138         self.cachedir = None
139
140     @staticmethod
141     def parse_ref(refstr):
142         m = re.match(r"^zero\[(\d+)\]$", refstr)
143         if m:
144             return ("zero", None, None, (0, int(m.group(1))))
145
146         m = re.match(r"^([-0-9a-f]+)\/([0-9a-f]+)(\(\S+\))?(\[((\d+)\+)?(\d+)\])?$", refstr)
147         if not m: return
148
149         segment = m.group(1)
150         object = m.group(2)
151         checksum = m.group(3)
152         slice = m.group(4)
153
154         if checksum is not None:
155             checksum = checksum.lstrip("(").rstrip(")")
156
157         if slice is not None:
158             if m.group(5) is None:
159                 # Abbreviated slice
160                 slice = (0, int(m.group(7)))
161             else:
162                 slice = (int(m.group(6)), int(m.group(7)))
163
164         return (segment, object, checksum, slice)
165
166     def get_segment(self, segment):
167         accessed_segments.add(segment)
168         raw = self.store.lowlevel_open(segment + ".tar.gpg")
169
170         (input, output) = os.popen2("lbs-filter-gpg --decrypt")
171         def copy_thread(src, dst):
172             BLOCK_SIZE = 4096
173             while True:
174                 block = src.read(BLOCK_SIZE)
175                 if len(block) == 0: break
176                 dst.write(block)
177             dst.close()
178
179         thread.start_new_thread(copy_thread, (raw, input))
180         return output
181
182     def load_segment(self, segment):
183         seg = tarfile.open(segment, 'r|', self.get_segment(segment))
184         for item in seg:
185             data_obj = seg.extractfile(item)
186             path = item.name.split('/')
187             if len(path) == 2 and path[0] == segment:
188                 yield (path[1], data_obj.read())
189
190     def load_snapshot(self, snapshot):
191         file = self.store.lowlevel_open("snapshot-" + snapshot + ".lbs")
192         return file.read().splitlines(True)
193
194     def extract_segment(self, segment):
195         segdir = os.path.join(self.get_cachedir(), segment)
196         os.mkdir(segdir)
197         for (object, data) in self.load_segment(segment):
198             f = open(os.path.join(segdir, object), 'wb')
199             f.write(data)
200             f.close()
201
202     def load_object(self, segment, object):
203         accessed_segments.add(segment)
204         path = os.path.join(self.get_cachedir(), segment, object)
205         if not os.access(path, os.R_OK):
206             self.extract_segment(segment)
207         if segment in self.lru_list: self.lru_list.remove(segment)
208         self.lru_list.append(segment)
209         while len(self.lru_list) > self.CACHE_SIZE:
210             os.system("rm -rf " + os.path.join(self.cachedir, self.lru_list[0]))
211             self.lru_list = self.lru_list[1:]
212         return open(path, 'rb').read()
213
214     def get(self, refstr):
215         """Fetch the given object and return it.
216
217         The input should be an object reference, in string form.
218         """
219
220         (segment, object, checksum, slice) = self.parse_ref(refstr)
221
222         if segment == "zero":
223             return "\0" * slice[1]
224
225         data = self.load_object(segment, object)
226
227         if checksum is not None:
228             verifier = ChecksumVerifier(checksum)
229             verifier.update(data)
230             if not verifier.valid():
231                 raise ValueError
232
233         if slice is not None:
234             (start, length) = slice
235             data = data[start:start+length]
236             if len(data) != length: raise IndexError
237
238         return data
239
240 def parse(lines, terminate=None):
241     """Generic parser for RFC822-style "Key: Value" data streams.
242
243     This parser can be used to read metadata logs and snapshot root descriptor
244     files.
245
246     lines must be an iterable object which yields a sequence of lines of input.
247
248     If terminate is specified, it is used as a predicate to determine when to
249     stop reading input lines.
250     """
251
252     dict = {}
253     last_key = None
254
255     for l in lines:
256         # Strip off a trailing newline, if present
257         if len(l) > 0 and l[-1] == "\n":
258             l = l[:-1]
259
260         if terminate is not None and terminate(l):
261             if len(dict) > 0: yield dict
262             dict = {}
263             last_key = None
264             continue
265
266         m = re.match(r"^([-\w]+):\s*(.*)$", l)
267         if m:
268             dict[m.group(1)] = m.group(2)
269             last_key = m.group(1)
270         elif len(l) > 0 and l[0].isspace() and last_key is not None:
271             dict[last_key] += l
272         else:
273             last_key = None
274
275     if len(dict) > 0: yield dict
276
277 def parse_full(lines):
278     try:
279         return parse(lines).next()
280     except StopIteration:
281         return {}
282
283 def parse_metadata_version(s):
284     """Convert a string with the snapshot version format to a tuple."""
285
286     m = re.match(r"^LBS Snapshot v(\d+(\.\d+)*)$", s)
287     if m is None:
288         return ()
289     else:
290         return tuple([int(d) for d in m.group(1).split(".")])
291
292 def read_metadata(object_store, root):
293     """Iterate through all lines in the metadata log, following references."""
294
295     # Stack for keeping track of recursion when following references to
296     # portions of the log.  The last entry in the stack corresponds to the
297     # object currently being parsed.  Each entry is a list of lines which have
298     # been reversed, so that popping successive lines from the end of each list
299     # will return lines of the metadata log in order.
300     stack = []
301
302     def follow_ref(refstr):
303         if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
304         lines = object_store.get(refstr).splitlines(True)
305         lines.reverse()
306         stack.append(lines)
307
308     follow_ref(root)
309
310     while len(stack) > 0:
311         top = stack[-1]
312         if len(top) == 0:
313             stack.pop()
314             continue
315         line = top.pop()
316
317         # An indirect reference which we must follow?
318         if len(line) > 0 and line[0] == '@':
319             ref = line[1:]
320             ref.strip()
321             follow_ref(ref)
322         else:
323             yield line
324
325 class MetadataItem:
326     """Metadata for a single file (or directory or...) from a snapshot."""
327
328     # Functions for parsing various datatypes that can appear in a metadata log
329     # item.
330     @staticmethod
331     def decode_int(s):
332         """Decode an integer, expressed in decimal, octal, or hexadecimal."""
333         if s.startswith("0x"):
334             return int(s, 16)
335         elif s.startswith("0"):
336             return int(s, 8)
337         else:
338             return int(s, 10)
339
340     @staticmethod
341     def decode_str(s):
342         """Decode a URI-encoded (%xx escapes) string."""
343         def hex_decode(m): return chr(int(m.group(1), 16))
344         return re.sub(r"%([0-9a-f]{2})", hex_decode, s)
345
346     @staticmethod
347     def raw_str(s):
348         """An unecoded string."""
349         return s
350
351     @staticmethod
352     def decode_user(s):
353         """Decode a user/group to a tuple of uid/gid followed by name."""
354         items = s.split()
355         uid = MetadataItem.decode_int(items[0])
356         name = None
357         if len(items) > 1:
358             if items[1].startswith("(") and items[1].endswith(")"):
359                 name = MetadataItem.decode_str(items[1][1:-1])
360         return (uid, name)
361
362     @staticmethod
363     def decode_device(s):
364         """Decode a device major/minor number."""
365         (major, minor) = map(MetadataItem.decode_int, s.split("/"))
366         return (major, minor)
367
368     class Items: pass
369
370     def __init__(self, fields, object_store):
371         """Initialize from a dictionary of key/value pairs from metadata log."""
372
373         self.fields = fields
374         self.object_store = object_store
375         self.keys = []
376         self.items = self.Items()
377         for (k, v) in fields.items():
378             if k in self.field_types:
379                 decoder = self.field_types[k]
380                 setattr(self.items, k, decoder(v))
381                 self.keys.append(k)
382
383     def data(self):
384         """Return an iterator for the data blocks that make up a file."""
385
386         # This traverses the list of blocks that make up a file, following
387         # indirect references.  It is implemented in much the same way as
388         # read_metadata, so see that function for details of the technique.
389
390         objects = self.fields['data'].split()
391         objects.reverse()
392         stack = [objects]
393
394         def follow_ref(refstr):
395             if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
396             objects = self.object_store.get(refstr).split()
397             objects.reverse()
398             stack.append(objects)
399
400         while len(stack) > 0:
401             top = stack[-1]
402             if len(top) == 0:
403                 stack.pop()
404                 continue
405             ref = top.pop()
406
407             # An indirect reference which we must follow?
408             if len(ref) > 0 and ref[0] == '@':
409                 follow_ref(ref[1:])
410             else:
411                 yield ref
412
413 # Description of fields that might appear, and how they should be parsed.
414 MetadataItem.field_types = {
415     'name': MetadataItem.decode_str,
416     'type': MetadataItem.raw_str,
417     'mode': MetadataItem.decode_int,
418     'device': MetadataItem.decode_device,
419     'user': MetadataItem.decode_user,
420     'group': MetadataItem.decode_user,
421     'ctime': MetadataItem.decode_int,
422     'mtime': MetadataItem.decode_int,
423     'links': MetadataItem.decode_int,
424     'inode': MetadataItem.raw_str,
425     'checksum': MetadataItem.decode_str,
426     'size': MetadataItem.decode_int,
427     'contents': MetadataItem.decode_str,
428     'target': MetadataItem.decode_str,
429 }
430
431 def iterate_metadata(object_store, root):
432     for d in parse(read_metadata(object_store, root), lambda l: len(l) == 0):
433         yield MetadataItem(d, object_store)
434
435 class LocalDatabase:
436     """Access to the local database of snapshot contents and object checksums.
437
438     The local database is consulted when creating a snapshot to determine what
439     data can be re-used from old snapshots.  Segment cleaning is performed by
440     manipulating the data in the local database; the local database also
441     includes enough data to guide the segment cleaning process.
442     """
443
444     def __init__(self, path, dbname="localdb.sqlite"):
445         self.db_connection = sqlite3.connect(path + "/" + dbname)
446
447     # Low-level database access.  Use these methods when there isn't a
448     # higher-level interface available.  Exception: do, however, remember to
449     # use the commit() method after making changes to make sure they are
450     # actually saved, even when going through higher-level interfaces.
451     def commit(self):
452         "Commit any pending changes to the local database."
453         self.db_connection.commit()
454
455     def rollback(self):
456         "Roll back any pending changes to the local database."
457         self.db_connection.rollback()
458
459     def cursor(self):
460         "Return a DB-API cursor for directly accessing the local database."
461         return self.db_connection.cursor()
462
463     def list_schemes(self):
464         """Return the list of snapshots found in the local database.
465
466         The returned value is a list of tuples (id, scheme, name, time, intent).
467         """
468
469         cur = self.cursor()
470         cur.execute("select distinct scheme from snapshots")
471         schemes = [row[0] for row in cur.fetchall()]
472         schemes.sort()
473         return schemes
474
475     def garbage_collect(self, scheme, intent=1.0):
476         """Delete entries from old snapshots from the database.
477
478         Only snapshots with the specified scheme name will be deleted.  If
479         intent is given, it gives the intended next snapshot type, to determine
480         how aggressively to clean (for example, intent=7 could be used if the
481         next snapshot will be a weekly snapshot).
482         """
483
484         cur = self.cursor()
485
486         # Find the id of the last snapshot to be created.  This is used for
487         # measuring time in a way: we record this value in each segment we
488         # expire on this run, and then on a future run can tell if there have
489         # been intervening backups made.
490         cur.execute("select max(snapshotid) from snapshots")
491         last_snapshotid = cur.fetchone()[0]
492
493         # Get the list of old snapshots for this scheme.  Delete all the old
494         # ones.  Rules for what to keep:
495         #   - Always keep the most recent snapshot.
496         #   - If snapshot X is younger than Y, and X has higher intent, then Y
497         #     can be deleted.
498         cur.execute("""select snapshotid, name, intent,
499                               julianday('now') - timestamp as age
500                        from snapshots where scheme = ?
501                        order by age""", (scheme,))
502
503         first = True
504         max_intent = intent
505         for (id, name, snap_intent, snap_age) in cur.fetchall():
506             can_delete = False
507             if snap_intent < max_intent:
508                 # Delete small-intent snapshots if there is a more recent
509                 # large-intent snapshot.
510                 can_delete = True
511             elif snap_intent == intent:
512                 # Delete previous snapshots with the specified intent level.
513                 can_delete = True
514
515             if can_delete and not first:
516                 print "Delete snapshot %d (%s)" % (id, name)
517                 cur.execute("delete from snapshots where snapshotid = ?",
518                             (id,))
519             first = False
520             max_intent = max(max_intent, snap_intent)
521
522         # Delete entries in the segments_used table which are for non-existent
523         # snapshots.
524         cur.execute("""delete from segments_used
525                        where snapshotid not in
526                            (select snapshotid from snapshots)""")
527
528         # Find segments which contain no objects used by any current snapshots,
529         # and delete them from the segment table.
530         cur.execute("""delete from segments where segmentid not in
531                            (select segmentid from segments_used)""")
532
533         # Delete unused objects in the block_index table.  By "unused", we mean
534         # any object which was stored in a segment which has been deleted, and
535         # any object in a segment which was marked for cleaning and has had
536         # cleaning performed already (the expired time is less than the current
537         # largest snapshot id).
538         cur.execute("""delete from block_index
539                        where segmentid not in (select segmentid from segments)
540                           or segmentid in (select segmentid from segments
541                                            where expire_time < ?)""",
542                     (last_snapshotid,))
543
544         # Remove sub-block signatures for deleted objects.
545         cur.execute("""delete from subblock_signatures
546                        where blockid not in
547                            (select blockid from block_index)""")
548
549     # Segment cleaning.
550     class SegmentInfo(Struct): pass
551
552     def get_segment_cleaning_list(self, age_boost=0.0):
553         """Return a list of all current segments with information for cleaning.
554
555         Return all segments which are currently known in the local database
556         (there might be other, older segments in the archive itself), and
557         return usage statistics for each to help decide which segments to
558         clean.
559
560         The returned list will be sorted by estimated cleaning benefit, with
561         segments that are best to clean at the start of the list.
562
563         If specified, the age_boost parameter (measured in days) will added to
564         the age of each segment, as a way of adjusting the benefit computation
565         before a long-lived snapshot is taken (for example, age_boost might be
566         set to 7 when cleaning prior to taking a weekly snapshot).
567         """
568
569         cur = self.cursor()
570         segments = []
571         cur.execute("""select segmentid, used, size, mtime,
572                        julianday('now') - mtime as age from segment_info
573                        where expire_time is null""")
574         for row in cur:
575             info = self.SegmentInfo()
576             info.id = row[0]
577             info.used_bytes = row[1]
578             info.size_bytes = row[2]
579             info.mtime = row[3]
580             info.age_days = row[4]
581
582             # If age is not available for whatever reason, treat it as 0.0.
583             if info.age_days is None:
584                 info.age_days = 0.0
585
586             # Benefit calculation: u is the estimated fraction of each segment
587             # which is utilized (bytes belonging to objects still in use
588             # divided by total size; this doesn't take compression or storage
589             # overhead into account, but should give a reasonable estimate).
590             #
591             # The total benefit is a heuristic that combines several factors:
592             # the amount of space that can be reclaimed (1 - u), an ageing
593             # factor (info.age_days) that favors cleaning old segments to young
594             # ones and also is more likely to clean segments that will be
595             # rewritten for long-lived snapshots (age_boost), and finally a
596             # penalty factor for the cost of re-uploading data (u + 0.1).
597             u = info.used_bytes / info.size_bytes
598             info.cleaning_benefit \
599                 = (1 - u) * (info.age_days + age_boost) / (u + 0.1)
600
601             segments.append(info)
602
603         segments.sort(cmp, key=lambda s: s.cleaning_benefit, reverse=True)
604         return segments
605
606     def mark_segment_expired(self, segment):
607         """Mark a segment for cleaning in the local database.
608
609         The segment parameter should be either a SegmentInfo object or an
610         integer segment id.  Objects in the given segment will be marked as
611         expired, which means that any future snapshots that would re-use those
612         objects will instead write out a new copy of the object, and thus no
613         future snapshots will depend upon the given segment.
614         """
615
616         if isinstance(segment, int):
617             id = segment
618         elif isinstance(segment, self.SegmentInfo):
619             id = segment.id
620         else:
621             raise TypeError("Invalid segment: %s, must be of type int or SegmentInfo, not %s" % (segment, type(segment)))
622
623         cur = self.cursor()
624         cur.execute("select max(snapshotid) from snapshots")
625         last_snapshotid = cur.fetchone()[0]
626         cur.execute("update segments set expire_time = ? where segmentid = ?",
627                     (last_snapshotid, id))
628         cur.execute("update block_index set expired = 0 where segmentid = ?",
629                     (id,))
630
631     def balance_expired_objects(self):
632         """Analyze expired objects in segments to be cleaned and group by age.
633
634         Update the block_index table of the local database to group expired
635         objects by age.  The exact number of buckets and the cutoffs for each
636         are dynamically determined.  Calling this function after marking
637         segments expired will help in the segment cleaning process, by ensuring
638         that when active objects from clean segments are rewritten, they will
639         be placed into new segments roughly grouped by age.
640         """
641
642         # The expired column of the block_index table is used when generating a
643         # new LBS snapshot.  A null value indicates that an object may be
644         # re-used.  Otherwise, an object must be written into a new segment if
645         # needed.  Objects with distinct expired values will be written into
646         # distinct segments, to allow for some grouping by age.  The value 0 is
647         # somewhat special in that it indicates any rewritten objects can be
648         # placed in the same segment as completely new objects; this can be
649         # used for very young objects which have been expired, or objects not
650         # expected to be encountered.
651         #
652         # In the balancing process, all objects which are not used in any
653         # current snapshots will have expired set to 0.  Objects which have
654         # been seen will be sorted by age and will have expired values set to
655         # 0, 1, 2, and so on based on age (with younger objects being assigned
656         # lower values).  The number of buckets and the age cutoffs is
657         # determined by looking at the distribution of block ages.
658
659         cur = self.cursor()
660
661         # Mark all expired objects with expired = 0; these objects will later
662         # have values set to indicate groupings of objects when repacking.
663         cur.execute("""update block_index set expired = 0
664                        where expired is not null""")
665
666         # We will want to aim for at least one full segment for each bucket
667         # that we eventually create, but don't know how many bytes that should
668         # be due to compression.  So compute the average number of bytes in
669         # each expired segment as a rough estimate for the minimum size of each
670         # bucket.  (This estimate could be thrown off by many not-fully-packed
671         # segments, but for now don't worry too much about that.)  If we can't
672         # compute an average, it's probably because there are no expired
673         # segments, so we have no more work to do.
674         cur.execute("""select avg(size) from segments
675                        where segmentid in
676                            (select distinct segmentid from block_index
677                             where expired is not null)""")
678         segment_size_estimate = cur.fetchone()[0]
679         if not segment_size_estimate:
680             return
681
682         # Next, extract distribution of expired objects (number and size) by
683         # age.  Save the timestamp for "now" so that the classification of
684         # blocks into age buckets will not change later in the function, after
685         # time has passed.  Set any timestamps in the future to now, so we are
686         # guaranteed that for the rest of this function, age is always
687         # non-negative.
688         cur.execute("select julianday('now')")
689         now = cur.fetchone()[0]
690
691         cur.execute("""update block_index set timestamp = ?
692                        where timestamp > ? and expired is not null""",
693                     (now, now))
694
695         cur.execute("""select round(? - timestamp) as age, count(*), sum(size)
696                        from block_index where expired = 0
697                        group by age order by age""", (now,))
698         distribution = cur.fetchall()
699
700         # Start to determine the buckets for expired objects.  Heuristics used:
701         #   - An upper bound on the number of buckets is given by the number of
702         #     segments we estimate it will take to store all data.  In fact,
703         #     aim for a couple of segments per bucket.
704         #   - Place very young objects in bucket 0 (place with new objects)
705         #     unless there are enough of them to warrant a separate bucket.
706         #   - Try not to create unnecessarily many buckets, since fewer buckets
707         #     will allow repacked data to be grouped based on spatial locality
708         #     (while more buckets will group by temporal locality).  We want a
709         #     balance.
710         MIN_AGE = 4
711         total_bytes = sum([i[2] for i in distribution])
712         target_buckets = 2 * (total_bytes / segment_size_estimate) ** 0.4
713         min_size = 1.5 * segment_size_estimate
714         target_size = max(2 * segment_size_estimate,
715                           total_bytes / target_buckets)
716
717         print "segment_size:", segment_size_estimate
718         print "distribution:", distribution
719         print "total_bytes:", total_bytes
720         print "target_buckets:", target_buckets
721         print "min, target size:", min_size, target_size
722
723         # Chosen cutoffs.  Each bucket consists of objects with age greater
724         # than one cutoff value, but not greater than the next largest cutoff.
725         cutoffs = []
726
727         # Starting with the oldest objects, begin grouping together into
728         # buckets of size at least target_size bytes.
729         distribution.reverse()
730         bucket_size = 0
731         min_age_bucket = False
732         for (age, items, size) in distribution:
733             if bucket_size >= target_size \
734                 or (age < MIN_AGE and not min_age_bucket):
735                 if bucket_size < target_size and len(cutoffs) > 0:
736                     cutoffs.pop()
737                 cutoffs.append(age)
738                 bucket_size = 0
739
740             bucket_size += size
741             if age < MIN_AGE:
742                 min_age_bucket = True
743
744         # The last (youngest) bucket will be group 0, unless it has enough data
745         # to be of size min_size by itself, or there happen to be no objects
746         # less than MIN_AGE at all.
747         if bucket_size >= min_size or not min_age_bucket:
748             cutoffs.append(-1)
749         cutoffs.append(-1)
750
751         print "cutoffs:", cutoffs
752
753         # Update the database to assign each object to the appropriate bucket.
754         cutoffs.reverse()
755         for i in range(len(cutoffs)):
756             cur.execute("""update block_index set expired = ?
757                            where round(? - timestamp) > ?
758                              and expired is not null""",
759                         (i, now, cutoffs[i]))