3f6b6f7104b6ba664e3236c6591e7ed0dd55e882
[cumulus.git] / python / cumulus / __init__.py
1 """High-level interface for working with Cumulus archives.
2
3 This module provides an easy interface for reading from and manipulating
4 various parts of a Cumulus archive:
5   - listing the snapshots and segments present
6   - reading segment contents
7   - parsing snapshot descriptors and snapshot metadata logs
8   - reading and maintaining the local object database
9 """
10
11 from __future__ import division
12 import hashlib, os, re, tarfile, tempfile, thread
13 from pysqlite2 import dbapi2 as sqlite3
14
15 import cumulus.store, cumulus.store.file
16
17 # The largest supported snapshot format that can be understood.
18 FORMAT_VERSION = (0, 11)        # Cumulus Snapshot v0.11
19
20 # Maximum number of nested indirect references allowed in a snapshot.
21 MAX_RECURSION_DEPTH = 3
22
23 # All segments which have been accessed this session.
24 accessed_segments = set()
25
26 # Table of methods used to filter segments before storage, and corresponding
27 # filename extensions.  These are listed in priority order (methods earlier in
28 # the list are tried first).
29 SEGMENT_FILTERS = [
30     (".gpg", "cumulus-filter-gpg --decrypt"),
31     (".gz", "gzip -dc"),
32     (".bz2", "bzip2 -dc"),
33 ]
34
35 def uri_decode(s):
36     """Decode a URI-encoded (%xx escapes) string."""
37     def hex_decode(m): return chr(int(m.group(1), 16))
38     return re.sub(r"%([0-9a-f]{2})", hex_decode, s)
39 def uri_encode(s):
40     """Encode a string to URI-encoded (%xx escapes) form."""
41     def hex_encode(c):
42         if c > '+' and c < '\x7f' and c != '@':
43             return c
44         else:
45             return "%%%02x" % (ord(c),)
46     return ''.join(hex_encode(c) for c in s)
47
48 class Struct:
49     """A class which merely acts as a data container.
50
51     Instances of this class (or its subclasses) are merely used to store data
52     in various attributes.  No methods are provided.
53     """
54
55     def __repr__(self):
56         return "<%s %s>" % (self.__class__, self.__dict__)
57
58 CHECKSUM_ALGORITHMS = {
59     'sha1': hashlib.sha1,
60     'sha256': hashlib.sha256,
61 }
62
63 class ChecksumCreator:
64     """Compute a Cumulus checksum for provided data.
65
66     The algorithm used is selectable, but currently defaults to sha1.
67     """
68
69     def __init__(self, algorithm='sha1'):
70         self.algorithm = algorithm
71         self.hash = CHECKSUM_ALGORITHMS[algorithm]()
72
73     def update(self, data):
74         self.hash.update(data)
75         return self
76
77     def compute(self):
78         return "%s=%s" % (self.algorithm, self.hash.hexdigest())
79
80 class ChecksumVerifier:
81     """Verify whether a checksum from a snapshot matches the supplied data."""
82
83     def __init__(self, checksumstr):
84         """Create an object to check the supplied checksum."""
85
86         (algo, checksum) = checksumstr.split("=", 1)
87         self.checksum = checksum
88         self.hash = CHECKSUM_ALGORITHMS[algo]()
89
90     def update(self, data):
91         self.hash.update(data)
92
93     def valid(self):
94         """Return a boolean indicating whether the checksum matches."""
95
96         result = self.hash.hexdigest()
97         return result == self.checksum
98
99 class LowlevelDataStore:
100     """Access to the backup store containing segments and snapshot descriptors.
101
102     Instances of this class are used to get direct filesystem-level access to
103     the backup data.  To read a backup, a caller will ordinarily not care about
104     direct access to backup segments, but will instead merely need to access
105     objects from those segments.  The ObjectStore class provides a suitable
106     wrapper around a DataStore to give this high-level access.
107     """
108
109     def __init__(self, path):
110         if isinstance(path, cumulus.store.Store):
111             self.store = path
112         elif path.find(":") >= 0:
113             self.store = cumulus.store.open(path)
114         else:
115             self.store = cumulus.store.file.FileStore(path)
116
117     def _classify(self, filename):
118         for (t, r) in cumulus.store.type_patterns.items():
119             if r.match(filename):
120                 return (t, filename)
121         return (None, filename)
122
123     def scan(self):
124         self.store.scan()
125
126     def lowlevel_open(self, filename):
127         """Return a file-like object for reading data from the given file."""
128
129         (type, filename) = self._classify(filename)
130         return self.store.get(type, filename)
131
132     def lowlevel_stat(self, filename):
133         """Return a dictionary of information about the given file.
134
135         Currently, the only defined field is 'size', giving the size of the
136         file in bytes.
137         """
138
139         (type, filename) = self._classify(filename)
140         return self.store.stat(type, filename)
141
142     # Slightly higher-level list methods.
143     def list_snapshots(self):
144         for f in self.store.list('snapshots'):
145             m = cumulus.store.type_patterns['snapshots'].match(f)
146             if m: yield m.group(1)
147
148     def list_segments(self):
149         for f in self.store.list('segments'):
150             m = cumulus.store.type_patterns['segments'].match(f)
151             if m: yield m.group(1)
152
153 class ObjectStore:
154     def __init__(self, data_store):
155         self.store = data_store
156         self.cachedir = None
157         self.CACHE_SIZE = 16
158         self.lru_list = []
159
160     def get_cachedir(self):
161         if self.cachedir is None:
162             self.cachedir = tempfile.mkdtemp(".lbs")
163         return self.cachedir
164
165     def cleanup(self):
166         if self.cachedir is not None:
167             # TODO: Avoid use of system, make this safer
168             os.system("rm -rf " + self.cachedir)
169         self.cachedir = None
170
171     @staticmethod
172     def parse_ref(refstr):
173         m = re.match(r"^zero\[(\d+)\]$", refstr)
174         if m:
175             return ("zero", None, None, (0, int(m.group(1))))
176
177         m = re.match(r"^([-0-9a-f]+)\/([0-9a-f]+)(\(\S+\))?(\[(((\d+)\+)?(\d+)|=(\d+))\])?$", refstr)
178         if not m: return
179
180         segment = m.group(1)
181         object = m.group(2)
182         checksum = m.group(3)
183         slice = m.group(4)
184
185         if checksum is not None:
186             checksum = checksum.lstrip("(").rstrip(")")
187
188         if slice is not None:
189             if m.group(9) is not None:
190                 # Size-assertion slice
191                 slice = (0, int(m.group(9)), True)
192             elif m.group(6) is None:
193                 # Abbreviated slice
194                 slice = (0, int(m.group(8)), False)
195             else:
196                 slice = (int(m.group(7)), int(m.group(8)), False)
197
198         return (segment, object, checksum, slice)
199
200     def get_segment(self, segment):
201         accessed_segments.add(segment)
202
203         for (extension, filter) in SEGMENT_FILTERS:
204             try:
205                 raw = self.store.lowlevel_open(segment + ".tar" + extension)
206
207                 (input, output) = os.popen2(filter)
208                 def copy_thread(src, dst):
209                     BLOCK_SIZE = 4096
210                     while True:
211                         block = src.read(BLOCK_SIZE)
212                         if len(block) == 0: break
213                         dst.write(block)
214                     dst.close()
215
216                 thread.start_new_thread(copy_thread, (raw, input))
217                 return output
218             except:
219                 pass
220
221         raise cumulus.store.NotFoundError
222
223     def load_segment(self, segment):
224         seg = tarfile.open(segment, 'r|', self.get_segment(segment))
225         for item in seg:
226             data_obj = seg.extractfile(item)
227             path = item.name.split('/')
228             if len(path) == 2 and path[0] == segment:
229                 yield (path[1], data_obj.read())
230
231     def load_snapshot(self, snapshot):
232         file = self.store.lowlevel_open("snapshot-" + snapshot + ".lbs")
233         return file.read().splitlines(True)
234
235     def extract_segment(self, segment):
236         segdir = os.path.join(self.get_cachedir(), segment)
237         os.mkdir(segdir)
238         for (object, data) in self.load_segment(segment):
239             f = open(os.path.join(segdir, object), 'wb')
240             f.write(data)
241             f.close()
242
243     def load_object(self, segment, object):
244         accessed_segments.add(segment)
245         path = os.path.join(self.get_cachedir(), segment, object)
246         if not os.access(path, os.R_OK):
247             self.extract_segment(segment)
248         if segment in self.lru_list: self.lru_list.remove(segment)
249         self.lru_list.append(segment)
250         while len(self.lru_list) > self.CACHE_SIZE:
251             os.system("rm -rf " + os.path.join(self.cachedir, self.lru_list[0]))
252             self.lru_list = self.lru_list[1:]
253         return open(path, 'rb').read()
254
255     def get(self, refstr):
256         """Fetch the given object and return it.
257
258         The input should be an object reference, in string form.
259         """
260
261         (segment, object, checksum, slice) = self.parse_ref(refstr)
262
263         if segment == "zero":
264             return "\0" * slice[1]
265
266         data = self.load_object(segment, object)
267
268         if checksum is not None:
269             verifier = ChecksumVerifier(checksum)
270             verifier.update(data)
271             if not verifier.valid():
272                 raise ValueError
273
274         if slice is not None:
275             (start, length, exact) = slice
276             if exact and len(data) != length: raise ValueError
277             data = data[start:start+length]
278             if len(data) != length: raise IndexError
279
280         return data
281
282 def parse(lines, terminate=None):
283     """Generic parser for RFC822-style "Key: Value" data streams.
284
285     This parser can be used to read metadata logs and snapshot root descriptor
286     files.
287
288     lines must be an iterable object which yields a sequence of lines of input.
289
290     If terminate is specified, it is used as a predicate to determine when to
291     stop reading input lines.
292     """
293
294     dict = {}
295     last_key = None
296
297     for l in lines:
298         # Strip off a trailing newline, if present
299         if len(l) > 0 and l[-1] == "\n":
300             l = l[:-1]
301
302         if terminate is not None and terminate(l):
303             if len(dict) > 0: yield dict
304             dict = {}
305             last_key = None
306             continue
307
308         m = re.match(r"^([-\w]+):\s*(.*)$", l)
309         if m:
310             dict[m.group(1)] = m.group(2)
311             last_key = m.group(1)
312         elif len(l) > 0 and l[0].isspace() and last_key is not None:
313             dict[last_key] += l
314         else:
315             last_key = None
316
317     if len(dict) > 0: yield dict
318
319 def parse_full(lines):
320     try:
321         return parse(lines).next()
322     except StopIteration:
323         return {}
324
325 def parse_metadata_version(s):
326     """Convert a string with the snapshot version format to a tuple."""
327
328     m = re.match(r"^(?:Cumulus|LBS) Snapshot v(\d+(\.\d+)*)$", s)
329     if m is None:
330         return ()
331     else:
332         return tuple([int(d) for d in m.group(1).split(".")])
333
334 def read_metadata(object_store, root):
335     """Iterate through all lines in the metadata log, following references."""
336
337     # Stack for keeping track of recursion when following references to
338     # portions of the log.  The last entry in the stack corresponds to the
339     # object currently being parsed.  Each entry is a list of lines which have
340     # been reversed, so that popping successive lines from the end of each list
341     # will return lines of the metadata log in order.
342     stack = []
343
344     def follow_ref(refstr):
345         if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
346         lines = object_store.get(refstr).splitlines(True)
347         lines.reverse()
348         stack.append(lines)
349
350     follow_ref(root)
351
352     while len(stack) > 0:
353         top = stack[-1]
354         if len(top) == 0:
355             stack.pop()
356             continue
357         line = top.pop()
358
359         # An indirect reference which we must follow?
360         if len(line) > 0 and line[0] == '@':
361             ref = line[1:]
362             ref.strip()
363             follow_ref(ref)
364         else:
365             yield line
366
367 class MetadataItem:
368     """Metadata for a single file (or directory or...) from a snapshot."""
369
370     # Functions for parsing various datatypes that can appear in a metadata log
371     # item.
372     @staticmethod
373     def decode_int(s):
374         """Decode an integer, expressed in decimal, octal, or hexadecimal."""
375         if s.startswith("0x"):
376             return int(s, 16)
377         elif s.startswith("0"):
378             return int(s, 8)
379         else:
380             return int(s, 10)
381
382     @staticmethod
383     def decode_str(s):
384         """Decode a URI-encoded (%xx escapes) string."""
385         return uri_decode(s)
386
387     @staticmethod
388     def raw_str(s):
389         """An unecoded string."""
390         return s
391
392     @staticmethod
393     def decode_user(s):
394         """Decode a user/group to a tuple of uid/gid followed by name."""
395         items = s.split()
396         uid = MetadataItem.decode_int(items[0])
397         name = None
398         if len(items) > 1:
399             if items[1].startswith("(") and items[1].endswith(")"):
400                 name = MetadataItem.decode_str(items[1][1:-1])
401         return (uid, name)
402
403     @staticmethod
404     def decode_device(s):
405         """Decode a device major/minor number."""
406         (major, minor) = map(MetadataItem.decode_int, s.split("/"))
407         return (major, minor)
408
409     class Items: pass
410
411     def __init__(self, fields, object_store):
412         """Initialize from a dictionary of key/value pairs from metadata log."""
413
414         self.fields = fields
415         self.object_store = object_store
416         self.keys = []
417         self.items = self.Items()
418         for (k, v) in fields.items():
419             if k in self.field_types:
420                 decoder = self.field_types[k]
421                 setattr(self.items, k, decoder(v))
422                 self.keys.append(k)
423
424     def data(self):
425         """Return an iterator for the data blocks that make up a file."""
426
427         # This traverses the list of blocks that make up a file, following
428         # indirect references.  It is implemented in much the same way as
429         # read_metadata, so see that function for details of the technique.
430
431         objects = self.fields['data'].split()
432         objects.reverse()
433         stack = [objects]
434
435         def follow_ref(refstr):
436             if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
437             objects = self.object_store.get(refstr).split()
438             objects.reverse()
439             stack.append(objects)
440
441         while len(stack) > 0:
442             top = stack[-1]
443             if len(top) == 0:
444                 stack.pop()
445                 continue
446             ref = top.pop()
447
448             # An indirect reference which we must follow?
449             if len(ref) > 0 and ref[0] == '@':
450                 follow_ref(ref[1:])
451             else:
452                 yield ref
453
454 # Description of fields that might appear, and how they should be parsed.
455 MetadataItem.field_types = {
456     'name': MetadataItem.decode_str,
457     'type': MetadataItem.raw_str,
458     'mode': MetadataItem.decode_int,
459     'device': MetadataItem.decode_device,
460     'user': MetadataItem.decode_user,
461     'group': MetadataItem.decode_user,
462     'ctime': MetadataItem.decode_int,
463     'mtime': MetadataItem.decode_int,
464     'links': MetadataItem.decode_int,
465     'inode': MetadataItem.raw_str,
466     'checksum': MetadataItem.decode_str,
467     'size': MetadataItem.decode_int,
468     'contents': MetadataItem.decode_str,
469     'target': MetadataItem.decode_str,
470 }
471
472 def iterate_metadata(object_store, root):
473     for d in parse(read_metadata(object_store, root), lambda l: len(l) == 0):
474         yield MetadataItem(d, object_store)
475
476 class LocalDatabase:
477     """Access to the local database of snapshot contents and object checksums.
478
479     The local database is consulted when creating a snapshot to determine what
480     data can be re-used from old snapshots.  Segment cleaning is performed by
481     manipulating the data in the local database; the local database also
482     includes enough data to guide the segment cleaning process.
483     """
484
485     def __init__(self, path, dbname="localdb.sqlite"):
486         self.db_connection = sqlite3.connect(path + "/" + dbname)
487
488     # Low-level database access.  Use these methods when there isn't a
489     # higher-level interface available.  Exception: do, however, remember to
490     # use the commit() method after making changes to make sure they are
491     # actually saved, even when going through higher-level interfaces.
492     def commit(self):
493         "Commit any pending changes to the local database."
494         self.db_connection.commit()
495
496     def rollback(self):
497         "Roll back any pending changes to the local database."
498         self.db_connection.rollback()
499
500     def cursor(self):
501         "Return a DB-API cursor for directly accessing the local database."
502         return self.db_connection.cursor()
503
504     def list_schemes(self):
505         """Return the list of snapshots found in the local database.
506
507         The returned value is a list of tuples (id, scheme, name, time, intent).
508         """
509
510         cur = self.cursor()
511         cur.execute("select distinct scheme from snapshots")
512         schemes = [row[0] for row in cur.fetchall()]
513         schemes.sort()
514         return schemes
515
516     def garbage_collect(self, scheme, intent=1.0):
517         """Delete entries from old snapshots from the database.
518
519         Only snapshots with the specified scheme name will be deleted.  If
520         intent is given, it gives the intended next snapshot type, to determine
521         how aggressively to clean (for example, intent=7 could be used if the
522         next snapshot will be a weekly snapshot).
523         """
524
525         cur = self.cursor()
526
527         # Find the id of the last snapshot to be created.  This is used for
528         # measuring time in a way: we record this value in each segment we
529         # expire on this run, and then on a future run can tell if there have
530         # been intervening backups made.
531         cur.execute("select max(snapshotid) from snapshots")
532         last_snapshotid = cur.fetchone()[0]
533
534         # Get the list of old snapshots for this scheme.  Delete all the old
535         # ones.  Rules for what to keep:
536         #   - Always keep the most recent snapshot.
537         #   - If snapshot X is younger than Y, and X has higher intent, then Y
538         #     can be deleted.
539         cur.execute("""select snapshotid, name, intent,
540                               julianday('now') - timestamp as age
541                        from snapshots where scheme = ?
542                        order by age""", (scheme,))
543
544         first = True
545         max_intent = intent
546         for (id, name, snap_intent, snap_age) in cur.fetchall():
547             can_delete = False
548             if snap_intent < max_intent:
549                 # Delete small-intent snapshots if there is a more recent
550                 # large-intent snapshot.
551                 can_delete = True
552             elif snap_intent == intent:
553                 # Delete previous snapshots with the specified intent level.
554                 can_delete = True
555
556             if can_delete and not first:
557                 print "Delete snapshot %d (%s)" % (id, name)
558                 cur.execute("delete from snapshots where snapshotid = ?",
559                             (id,))
560             first = False
561             max_intent = max(max_intent, snap_intent)
562
563         # Delete entries in the segments_used table which are for non-existent
564         # snapshots.
565         cur.execute("""delete from segments_used
566                        where snapshotid not in
567                            (select snapshotid from snapshots)""")
568
569         # Find segments which contain no objects used by any current snapshots,
570         # and delete them from the segment table.
571         cur.execute("""delete from segments where segmentid not in
572                            (select segmentid from segments_used)""")
573
574         # Delete unused objects in the block_index table.  By "unused", we mean
575         # any object which was stored in a segment which has been deleted, and
576         # any object in a segment which was marked for cleaning and has had
577         # cleaning performed already (the expired time is less than the current
578         # largest snapshot id).
579         cur.execute("""delete from block_index
580                        where segmentid not in (select segmentid from segments)
581                           or segmentid in (select segmentid from segments
582                                            where expire_time < ?)""",
583                     (last_snapshotid,))
584
585         # Remove sub-block signatures for deleted objects.
586         cur.execute("""delete from subblock_signatures
587                        where blockid not in
588                            (select blockid from block_index)""")
589
590     # Segment cleaning.
591     class SegmentInfo(Struct): pass
592
593     def get_segment_cleaning_list(self, age_boost=0.0):
594         """Return a list of all current segments with information for cleaning.
595
596         Return all segments which are currently known in the local database
597         (there might be other, older segments in the archive itself), and
598         return usage statistics for each to help decide which segments to
599         clean.
600
601         The returned list will be sorted by estimated cleaning benefit, with
602         segments that are best to clean at the start of the list.
603
604         If specified, the age_boost parameter (measured in days) will added to
605         the age of each segment, as a way of adjusting the benefit computation
606         before a long-lived snapshot is taken (for example, age_boost might be
607         set to 7 when cleaning prior to taking a weekly snapshot).
608         """
609
610         cur = self.cursor()
611         segments = []
612         cur.execute("""select segmentid, used, size, mtime,
613                        julianday('now') - mtime as age from segment_info
614                        where expire_time is null""")
615         for row in cur:
616             info = self.SegmentInfo()
617             info.id = row[0]
618             info.used_bytes = row[1]
619             info.size_bytes = row[2]
620             info.mtime = row[3]
621             info.age_days = row[4]
622
623             # If data is not available for whatever reason, treat it as 0.0.
624             if info.age_days is None:
625                 info.age_days = 0.0
626             if info.used_bytes is None:
627                 info.used_bytes = 0.0
628
629             # Benefit calculation: u is the estimated fraction of each segment
630             # which is utilized (bytes belonging to objects still in use
631             # divided by total size; this doesn't take compression or storage
632             # overhead into account, but should give a reasonable estimate).
633             #
634             # The total benefit is a heuristic that combines several factors:
635             # the amount of space that can be reclaimed (1 - u), an ageing
636             # factor (info.age_days) that favors cleaning old segments to young
637             # ones and also is more likely to clean segments that will be
638             # rewritten for long-lived snapshots (age_boost), and finally a
639             # penalty factor for the cost of re-uploading data (u + 0.1).
640             u = info.used_bytes / info.size_bytes
641             info.cleaning_benefit \
642                 = (1 - u) * (info.age_days + age_boost) / (u + 0.1)
643
644             segments.append(info)
645
646         segments.sort(cmp, key=lambda s: s.cleaning_benefit, reverse=True)
647         return segments
648
649     def mark_segment_expired(self, segment):
650         """Mark a segment for cleaning in the local database.
651
652         The segment parameter should be either a SegmentInfo object or an
653         integer segment id.  Objects in the given segment will be marked as
654         expired, which means that any future snapshots that would re-use those
655         objects will instead write out a new copy of the object, and thus no
656         future snapshots will depend upon the given segment.
657         """
658
659         if isinstance(segment, int):
660             id = segment
661         elif isinstance(segment, self.SegmentInfo):
662             id = segment.id
663         else:
664             raise TypeError("Invalid segment: %s, must be of type int or SegmentInfo, not %s" % (segment, type(segment)))
665
666         cur = self.cursor()
667         cur.execute("select max(snapshotid) from snapshots")
668         last_snapshotid = cur.fetchone()[0]
669         cur.execute("update segments set expire_time = ? where segmentid = ?",
670                     (last_snapshotid, id))
671         cur.execute("update block_index set expired = 0 where segmentid = ?",
672                     (id,))
673
674     def balance_expired_objects(self):
675         """Analyze expired objects in segments to be cleaned and group by age.
676
677         Update the block_index table of the local database to group expired
678         objects by age.  The exact number of buckets and the cutoffs for each
679         are dynamically determined.  Calling this function after marking
680         segments expired will help in the segment cleaning process, by ensuring
681         that when active objects from clean segments are rewritten, they will
682         be placed into new segments roughly grouped by age.
683         """
684
685         # The expired column of the block_index table is used when generating a
686         # new Cumulus snapshot.  A null value indicates that an object may be
687         # re-used.  Otherwise, an object must be written into a new segment if
688         # needed.  Objects with distinct expired values will be written into
689         # distinct segments, to allow for some grouping by age.  The value 0 is
690         # somewhat special in that it indicates any rewritten objects can be
691         # placed in the same segment as completely new objects; this can be
692         # used for very young objects which have been expired, or objects not
693         # expected to be encountered.
694         #
695         # In the balancing process, all objects which are not used in any
696         # current snapshots will have expired set to 0.  Objects which have
697         # been seen will be sorted by age and will have expired values set to
698         # 0, 1, 2, and so on based on age (with younger objects being assigned
699         # lower values).  The number of buckets and the age cutoffs is
700         # determined by looking at the distribution of block ages.
701
702         cur = self.cursor()
703
704         # Mark all expired objects with expired = 0; these objects will later
705         # have values set to indicate groupings of objects when repacking.
706         cur.execute("""update block_index set expired = 0
707                        where expired is not null""")
708
709         # We will want to aim for at least one full segment for each bucket
710         # that we eventually create, but don't know how many bytes that should
711         # be due to compression.  So compute the average number of bytes in
712         # each expired segment as a rough estimate for the minimum size of each
713         # bucket.  (This estimate could be thrown off by many not-fully-packed
714         # segments, but for now don't worry too much about that.)  If we can't
715         # compute an average, it's probably because there are no expired
716         # segments, so we have no more work to do.
717         cur.execute("""select avg(size) from segments
718                        where segmentid in
719                            (select distinct segmentid from block_index
720                             where expired is not null)""")
721         segment_size_estimate = cur.fetchone()[0]
722         if not segment_size_estimate:
723             return
724
725         # Next, extract distribution of expired objects (number and size) by
726         # age.  Save the timestamp for "now" so that the classification of
727         # blocks into age buckets will not change later in the function, after
728         # time has passed.  Set any timestamps in the future to now, so we are
729         # guaranteed that for the rest of this function, age is always
730         # non-negative.
731         cur.execute("select julianday('now')")
732         now = cur.fetchone()[0]
733
734         cur.execute("""update block_index set timestamp = ?
735                        where timestamp > ? and expired is not null""",
736                     (now, now))
737
738         cur.execute("""select round(? - timestamp) as age, count(*), sum(size)
739                        from block_index where expired = 0
740                        group by age order by age""", (now,))
741         distribution = cur.fetchall()
742
743         # Start to determine the buckets for expired objects.  Heuristics used:
744         #   - An upper bound on the number of buckets is given by the number of
745         #     segments we estimate it will take to store all data.  In fact,
746         #     aim for a couple of segments per bucket.
747         #   - Place very young objects in bucket 0 (place with new objects)
748         #     unless there are enough of them to warrant a separate bucket.
749         #   - Try not to create unnecessarily many buckets, since fewer buckets
750         #     will allow repacked data to be grouped based on spatial locality
751         #     (while more buckets will group by temporal locality).  We want a
752         #     balance.
753         MIN_AGE = 4
754         total_bytes = sum([i[2] for i in distribution])
755         target_buckets = 2 * (total_bytes / segment_size_estimate) ** 0.4
756         min_size = 1.5 * segment_size_estimate
757         target_size = max(2 * segment_size_estimate,
758                           total_bytes / target_buckets)
759
760         print "segment_size:", segment_size_estimate
761         print "distribution:", distribution
762         print "total_bytes:", total_bytes
763         print "target_buckets:", target_buckets
764         print "min, target size:", min_size, target_size
765
766         # Chosen cutoffs.  Each bucket consists of objects with age greater
767         # than one cutoff value, but not greater than the next largest cutoff.
768         cutoffs = []
769
770         # Starting with the oldest objects, begin grouping together into
771         # buckets of size at least target_size bytes.
772         distribution.reverse()
773         bucket_size = 0
774         min_age_bucket = False
775         for (age, items, size) in distribution:
776             if bucket_size >= target_size \
777                 or (age < MIN_AGE and not min_age_bucket):
778                 if bucket_size < target_size and len(cutoffs) > 0:
779                     cutoffs.pop()
780                 cutoffs.append(age)
781                 bucket_size = 0
782
783             bucket_size += size
784             if age < MIN_AGE:
785                 min_age_bucket = True
786
787         # The last (youngest) bucket will be group 0, unless it has enough data
788         # to be of size min_size by itself, or there happen to be no objects
789         # less than MIN_AGE at all.
790         if bucket_size >= min_size or not min_age_bucket:
791             cutoffs.append(-1)
792         cutoffs.append(-1)
793
794         print "cutoffs:", cutoffs
795
796         # Update the database to assign each object to the appropriate bucket.
797         cutoffs.reverse()
798         for i in range(len(cutoffs)):
799             cur.execute("""update block_index set expired = ?
800                            where round(? - timestamp) > ?
801                              and expired is not null""",
802                         (i, now, cutoffs[i]))