Implement metadata caching for S3 backend.
[cumulus.git] / python / cumulus / __init__.py
1 """High-level interface for working with LBS archives.
2
3 This module provides an easy interface for reading from and manipulating
4 various parts of an LBS archive:
5   - listing the snapshots and segments present
6   - reading segment contents
7   - parsing snapshot descriptors and snapshot metadata logs
8   - reading and maintaining the local object database
9 """
10
11 from __future__ import division
12 import os, re, sha, tarfile, tempfile, thread
13 from pysqlite2 import dbapi2 as sqlite3
14
15 import cumulus.store, cumulus.store.file
16
17 # The largest supported snapshot format that can be understood.
18 FORMAT_VERSION = (0, 8)         # LBS Snapshot v0.8
19
20 # Maximum number of nested indirect references allowed in a snapshot.
21 MAX_RECURSION_DEPTH = 3
22
23 # All segments which have been accessed this session.
24 accessed_segments = set()
25
26 # Table of methods used to filter segments before storage, and corresponding
27 # filename extensions.  These are listed in priority order (methods earlier in
28 # the list are tried first).
29 SEGMENT_FILTERS = [
30     (".gpg", "cumulus-filter-gpg --decrypt"),
31     (".gz", "gzip -dc"),
32     (".bz2", "bzip2 -dc"),
33 ]
34
35 def uri_decode(s):
36     """Decode a URI-encoded (%xx escapes) string."""
37     def hex_decode(m): return chr(int(m.group(1), 16))
38     return re.sub(r"%([0-9a-f]{2})", hex_decode, s)
39 def uri_encode(s):
40     """Encode a string to URI-encoded (%xx escapes) form."""
41     def hex_encode(c):
42         if c > '+' and c < '\x7f' and c != '@':
43             return c
44         else:
45             return "%%%02x" % (ord(c),)
46     return ''.join(hex_encode(c) for c in s)
47
48 class Struct:
49     """A class which merely acts as a data container.
50
51     Instances of this class (or its subclasses) are merely used to store data
52     in various attributes.  No methods are provided.
53     """
54
55     def __repr__(self):
56         return "<%s %s>" % (self.__class__, self.__dict__)
57
58 CHECKSUM_ALGORITHMS = {
59     'sha1': sha.new
60 }
61
62 class ChecksumCreator:
63     """Compute an LBS checksum for provided data.
64
65     The algorithm used is selectable, but currently defaults to sha1.
66     """
67
68     def __init__(self, algorithm='sha1'):
69         self.algorithm = algorithm
70         self.hash = CHECKSUM_ALGORITHMS[algorithm]()
71
72     def update(self, data):
73         self.hash.update(data)
74         return self
75
76     def compute(self):
77         return "%s=%s" % (self.algorithm, self.hash.hexdigest())
78
79 class ChecksumVerifier:
80     """Verify whether a checksum from a snapshot matches the supplied data."""
81
82     def __init__(self, checksumstr):
83         """Create an object to check the supplied checksum."""
84
85         (algo, checksum) = checksumstr.split("=", 1)
86         self.checksum = checksum
87         self.hash = CHECKSUM_ALGORITHMS[algo]()
88
89     def update(self, data):
90         self.hash.update(data)
91
92     def valid(self):
93         """Return a boolean indicating whether the checksum matches."""
94
95         result = self.hash.hexdigest()
96         return result == self.checksum
97
98 class LowlevelDataStore:
99     """Access to the backup store containing segments and snapshot descriptors.
100
101     Instances of this class are used to get direct filesystem-level access to
102     the backup data.  To read a backup, a caller will ordinarily not care about
103     direct access to backup segments, but will instead merely need to access
104     objects from those segments.  The ObjectStore class provides a suitable
105     wrapper around a DataStore to give this high-level access.
106     """
107
108     def __init__(self, path):
109         if isinstance(path, cumulus.store.Store):
110             self.store = path
111         elif path.find(":") >= 0:
112             self.store = cumulus.store.open(path)
113         else:
114             self.store = cumulus.store.file.FileStore(path)
115
116     def _classify(self, filename):
117         for (t, r) in cumulus.store.type_patterns.items():
118             if r.match(filename):
119                 return (t, filename)
120         return (None, filename)
121
122     def scan(self):
123         self.store.scan()
124
125     def lowlevel_open(self, filename):
126         """Return a file-like object for reading data from the given file."""
127
128         (type, filename) = self._classify(filename)
129         return self.store.get(type, filename)
130
131     def lowlevel_stat(self, filename):
132         """Return a dictionary of information about the given file.
133
134         Currently, the only defined field is 'size', giving the size of the
135         file in bytes.
136         """
137
138         (type, filename) = self._classify(filename)
139         return self.store.stat(type, filename)
140
141     # Slightly higher-level list methods.
142     def list_snapshots(self):
143         for f in self.store.list('snapshots'):
144             m = cumulus.store.type_patterns['snapshots'].match(f)
145             if m: yield m.group(1)
146
147     def list_segments(self):
148         for f in self.store.list('segments'):
149             m = cumulus.store.type_patterns['segments'].match(f)
150             if m: yield m.group(1)
151
152 class ObjectStore:
153     def __init__(self, data_store):
154         self.store = data_store
155         self.cachedir = None
156         self.CACHE_SIZE = 16
157         self.lru_list = []
158
159     def get_cachedir(self):
160         if self.cachedir is None:
161             self.cachedir = tempfile.mkdtemp(".lbs")
162         return self.cachedir
163
164     def cleanup(self):
165         if self.cachedir is not None:
166             # TODO: Avoid use of system, make this safer
167             os.system("rm -rf " + self.cachedir)
168         self.cachedir = None
169
170     @staticmethod
171     def parse_ref(refstr):
172         m = re.match(r"^zero\[(\d+)\]$", refstr)
173         if m:
174             return ("zero", None, None, (0, int(m.group(1))))
175
176         m = re.match(r"^([-0-9a-f]+)\/([0-9a-f]+)(\(\S+\))?(\[(((\d+)\+)?(\d+)|=(\d+))\])?$", refstr)
177         if not m: return
178
179         segment = m.group(1)
180         object = m.group(2)
181         checksum = m.group(3)
182         slice = m.group(4)
183
184         if checksum is not None:
185             checksum = checksum.lstrip("(").rstrip(")")
186
187         if slice is not None:
188             if m.group(9) is not None:
189                 # Size-assertion slice
190                 slice = (0, int(m.group(9)), True)
191             elif m.group(6) is None:
192                 # Abbreviated slice
193                 slice = (0, int(m.group(8)), False)
194             else:
195                 slice = (int(m.group(7)), int(m.group(8)), False)
196
197         return (segment, object, checksum, slice)
198
199     def get_segment(self, segment):
200         accessed_segments.add(segment)
201
202         for (extension, filter) in SEGMENT_FILTERS:
203             try:
204                 raw = self.store.lowlevel_open(segment + ".tar" + extension)
205
206                 (input, output) = os.popen2(filter)
207                 def copy_thread(src, dst):
208                     BLOCK_SIZE = 4096
209                     while True:
210                         block = src.read(BLOCK_SIZE)
211                         if len(block) == 0: break
212                         dst.write(block)
213                     dst.close()
214
215                 thread.start_new_thread(copy_thread, (raw, input))
216                 return output
217             except:
218                 pass
219
220         raise cumulus.store.NotFoundError
221
222     def load_segment(self, segment):
223         seg = tarfile.open(segment, 'r|', self.get_segment(segment))
224         for item in seg:
225             data_obj = seg.extractfile(item)
226             path = item.name.split('/')
227             if len(path) == 2 and path[0] == segment:
228                 yield (path[1], data_obj.read())
229
230     def load_snapshot(self, snapshot):
231         file = self.store.lowlevel_open("snapshot-" + snapshot + ".lbs")
232         return file.read().splitlines(True)
233
234     def extract_segment(self, segment):
235         segdir = os.path.join(self.get_cachedir(), segment)
236         os.mkdir(segdir)
237         for (object, data) in self.load_segment(segment):
238             f = open(os.path.join(segdir, object), 'wb')
239             f.write(data)
240             f.close()
241
242     def load_object(self, segment, object):
243         accessed_segments.add(segment)
244         path = os.path.join(self.get_cachedir(), segment, object)
245         if not os.access(path, os.R_OK):
246             self.extract_segment(segment)
247         if segment in self.lru_list: self.lru_list.remove(segment)
248         self.lru_list.append(segment)
249         while len(self.lru_list) > self.CACHE_SIZE:
250             os.system("rm -rf " + os.path.join(self.cachedir, self.lru_list[0]))
251             self.lru_list = self.lru_list[1:]
252         return open(path, 'rb').read()
253
254     def get(self, refstr):
255         """Fetch the given object and return it.
256
257         The input should be an object reference, in string form.
258         """
259
260         (segment, object, checksum, slice) = self.parse_ref(refstr)
261
262         if segment == "zero":
263             return "\0" * slice[1]
264
265         data = self.load_object(segment, object)
266
267         if checksum is not None:
268             verifier = ChecksumVerifier(checksum)
269             verifier.update(data)
270             if not verifier.valid():
271                 raise ValueError
272
273         if slice is not None:
274             (start, length, exact) = slice
275             if exact and len(data) != length: raise ValueError
276             data = data[start:start+length]
277             if len(data) != length: raise IndexError
278
279         return data
280
281 def parse(lines, terminate=None):
282     """Generic parser for RFC822-style "Key: Value" data streams.
283
284     This parser can be used to read metadata logs and snapshot root descriptor
285     files.
286
287     lines must be an iterable object which yields a sequence of lines of input.
288
289     If terminate is specified, it is used as a predicate to determine when to
290     stop reading input lines.
291     """
292
293     dict = {}
294     last_key = None
295
296     for l in lines:
297         # Strip off a trailing newline, if present
298         if len(l) > 0 and l[-1] == "\n":
299             l = l[:-1]
300
301         if terminate is not None and terminate(l):
302             if len(dict) > 0: yield dict
303             dict = {}
304             last_key = None
305             continue
306
307         m = re.match(r"^([-\w]+):\s*(.*)$", l)
308         if m:
309             dict[m.group(1)] = m.group(2)
310             last_key = m.group(1)
311         elif len(l) > 0 and l[0].isspace() and last_key is not None:
312             dict[last_key] += l
313         else:
314             last_key = None
315
316     if len(dict) > 0: yield dict
317
318 def parse_full(lines):
319     try:
320         return parse(lines).next()
321     except StopIteration:
322         return {}
323
324 def parse_metadata_version(s):
325     """Convert a string with the snapshot version format to a tuple."""
326
327     m = re.match(r"^LBS Snapshot v(\d+(\.\d+)*)$", s)
328     if m is None:
329         return ()
330     else:
331         return tuple([int(d) for d in m.group(1).split(".")])
332
333 def read_metadata(object_store, root):
334     """Iterate through all lines in the metadata log, following references."""
335
336     # Stack for keeping track of recursion when following references to
337     # portions of the log.  The last entry in the stack corresponds to the
338     # object currently being parsed.  Each entry is a list of lines which have
339     # been reversed, so that popping successive lines from the end of each list
340     # will return lines of the metadata log in order.
341     stack = []
342
343     def follow_ref(refstr):
344         if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
345         lines = object_store.get(refstr).splitlines(True)
346         lines.reverse()
347         stack.append(lines)
348
349     follow_ref(root)
350
351     while len(stack) > 0:
352         top = stack[-1]
353         if len(top) == 0:
354             stack.pop()
355             continue
356         line = top.pop()
357
358         # An indirect reference which we must follow?
359         if len(line) > 0 and line[0] == '@':
360             ref = line[1:]
361             ref.strip()
362             follow_ref(ref)
363         else:
364             yield line
365
366 class MetadataItem:
367     """Metadata for a single file (or directory or...) from a snapshot."""
368
369     # Functions for parsing various datatypes that can appear in a metadata log
370     # item.
371     @staticmethod
372     def decode_int(s):
373         """Decode an integer, expressed in decimal, octal, or hexadecimal."""
374         if s.startswith("0x"):
375             return int(s, 16)
376         elif s.startswith("0"):
377             return int(s, 8)
378         else:
379             return int(s, 10)
380
381     @staticmethod
382     def decode_str(s):
383         """Decode a URI-encoded (%xx escapes) string."""
384         return uri_decode(s)
385
386     @staticmethod
387     def raw_str(s):
388         """An unecoded string."""
389         return s
390
391     @staticmethod
392     def decode_user(s):
393         """Decode a user/group to a tuple of uid/gid followed by name."""
394         items = s.split()
395         uid = MetadataItem.decode_int(items[0])
396         name = None
397         if len(items) > 1:
398             if items[1].startswith("(") and items[1].endswith(")"):
399                 name = MetadataItem.decode_str(items[1][1:-1])
400         return (uid, name)
401
402     @staticmethod
403     def decode_device(s):
404         """Decode a device major/minor number."""
405         (major, minor) = map(MetadataItem.decode_int, s.split("/"))
406         return (major, minor)
407
408     class Items: pass
409
410     def __init__(self, fields, object_store):
411         """Initialize from a dictionary of key/value pairs from metadata log."""
412
413         self.fields = fields
414         self.object_store = object_store
415         self.keys = []
416         self.items = self.Items()
417         for (k, v) in fields.items():
418             if k in self.field_types:
419                 decoder = self.field_types[k]
420                 setattr(self.items, k, decoder(v))
421                 self.keys.append(k)
422
423     def data(self):
424         """Return an iterator for the data blocks that make up a file."""
425
426         # This traverses the list of blocks that make up a file, following
427         # indirect references.  It is implemented in much the same way as
428         # read_metadata, so see that function for details of the technique.
429
430         objects = self.fields['data'].split()
431         objects.reverse()
432         stack = [objects]
433
434         def follow_ref(refstr):
435             if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
436             objects = self.object_store.get(refstr).split()
437             objects.reverse()
438             stack.append(objects)
439
440         while len(stack) > 0:
441             top = stack[-1]
442             if len(top) == 0:
443                 stack.pop()
444                 continue
445             ref = top.pop()
446
447             # An indirect reference which we must follow?
448             if len(ref) > 0 and ref[0] == '@':
449                 follow_ref(ref[1:])
450             else:
451                 yield ref
452
453 # Description of fields that might appear, and how they should be parsed.
454 MetadataItem.field_types = {
455     'name': MetadataItem.decode_str,
456     'type': MetadataItem.raw_str,
457     'mode': MetadataItem.decode_int,
458     'device': MetadataItem.decode_device,
459     'user': MetadataItem.decode_user,
460     'group': MetadataItem.decode_user,
461     'ctime': MetadataItem.decode_int,
462     'mtime': MetadataItem.decode_int,
463     'links': MetadataItem.decode_int,
464     'inode': MetadataItem.raw_str,
465     'checksum': MetadataItem.decode_str,
466     'size': MetadataItem.decode_int,
467     'contents': MetadataItem.decode_str,
468     'target': MetadataItem.decode_str,
469 }
470
471 def iterate_metadata(object_store, root):
472     for d in parse(read_metadata(object_store, root), lambda l: len(l) == 0):
473         yield MetadataItem(d, object_store)
474
475 class LocalDatabase:
476     """Access to the local database of snapshot contents and object checksums.
477
478     The local database is consulted when creating a snapshot to determine what
479     data can be re-used from old snapshots.  Segment cleaning is performed by
480     manipulating the data in the local database; the local database also
481     includes enough data to guide the segment cleaning process.
482     """
483
484     def __init__(self, path, dbname="localdb.sqlite"):
485         self.db_connection = sqlite3.connect(path + "/" + dbname)
486
487     # Low-level database access.  Use these methods when there isn't a
488     # higher-level interface available.  Exception: do, however, remember to
489     # use the commit() method after making changes to make sure they are
490     # actually saved, even when going through higher-level interfaces.
491     def commit(self):
492         "Commit any pending changes to the local database."
493         self.db_connection.commit()
494
495     def rollback(self):
496         "Roll back any pending changes to the local database."
497         self.db_connection.rollback()
498
499     def cursor(self):
500         "Return a DB-API cursor for directly accessing the local database."
501         return self.db_connection.cursor()
502
503     def list_schemes(self):
504         """Return the list of snapshots found in the local database.
505
506         The returned value is a list of tuples (id, scheme, name, time, intent).
507         """
508
509         cur = self.cursor()
510         cur.execute("select distinct scheme from snapshots")
511         schemes = [row[0] for row in cur.fetchall()]
512         schemes.sort()
513         return schemes
514
515     def garbage_collect(self, scheme, intent=1.0):
516         """Delete entries from old snapshots from the database.
517
518         Only snapshots with the specified scheme name will be deleted.  If
519         intent is given, it gives the intended next snapshot type, to determine
520         how aggressively to clean (for example, intent=7 could be used if the
521         next snapshot will be a weekly snapshot).
522         """
523
524         cur = self.cursor()
525
526         # Find the id of the last snapshot to be created.  This is used for
527         # measuring time in a way: we record this value in each segment we
528         # expire on this run, and then on a future run can tell if there have
529         # been intervening backups made.
530         cur.execute("select max(snapshotid) from snapshots")
531         last_snapshotid = cur.fetchone()[0]
532
533         # Get the list of old snapshots for this scheme.  Delete all the old
534         # ones.  Rules for what to keep:
535         #   - Always keep the most recent snapshot.
536         #   - If snapshot X is younger than Y, and X has higher intent, then Y
537         #     can be deleted.
538         cur.execute("""select snapshotid, name, intent,
539                               julianday('now') - timestamp as age
540                        from snapshots where scheme = ?
541                        order by age""", (scheme,))
542
543         first = True
544         max_intent = intent
545         for (id, name, snap_intent, snap_age) in cur.fetchall():
546             can_delete = False
547             if snap_intent < max_intent:
548                 # Delete small-intent snapshots if there is a more recent
549                 # large-intent snapshot.
550                 can_delete = True
551             elif snap_intent == intent:
552                 # Delete previous snapshots with the specified intent level.
553                 can_delete = True
554
555             if can_delete and not first:
556                 print "Delete snapshot %d (%s)" % (id, name)
557                 cur.execute("delete from snapshots where snapshotid = ?",
558                             (id,))
559             first = False
560             max_intent = max(max_intent, snap_intent)
561
562         # Delete entries in the segments_used table which are for non-existent
563         # snapshots.
564         cur.execute("""delete from segments_used
565                        where snapshotid not in
566                            (select snapshotid from snapshots)""")
567
568         # Find segments which contain no objects used by any current snapshots,
569         # and delete them from the segment table.
570         cur.execute("""delete from segments where segmentid not in
571                            (select segmentid from segments_used)""")
572
573         # Delete unused objects in the block_index table.  By "unused", we mean
574         # any object which was stored in a segment which has been deleted, and
575         # any object in a segment which was marked for cleaning and has had
576         # cleaning performed already (the expired time is less than the current
577         # largest snapshot id).
578         cur.execute("""delete from block_index
579                        where segmentid not in (select segmentid from segments)
580                           or segmentid in (select segmentid from segments
581                                            where expire_time < ?)""",
582                     (last_snapshotid,))
583
584         # Remove sub-block signatures for deleted objects.
585         cur.execute("""delete from subblock_signatures
586                        where blockid not in
587                            (select blockid from block_index)""")
588
589     # Segment cleaning.
590     class SegmentInfo(Struct): pass
591
592     def get_segment_cleaning_list(self, age_boost=0.0):
593         """Return a list of all current segments with information for cleaning.
594
595         Return all segments which are currently known in the local database
596         (there might be other, older segments in the archive itself), and
597         return usage statistics for each to help decide which segments to
598         clean.
599
600         The returned list will be sorted by estimated cleaning benefit, with
601         segments that are best to clean at the start of the list.
602
603         If specified, the age_boost parameter (measured in days) will added to
604         the age of each segment, as a way of adjusting the benefit computation
605         before a long-lived snapshot is taken (for example, age_boost might be
606         set to 7 when cleaning prior to taking a weekly snapshot).
607         """
608
609         cur = self.cursor()
610         segments = []
611         cur.execute("""select segmentid, used, size, mtime,
612                        julianday('now') - mtime as age from segment_info
613                        where expire_time is null""")
614         for row in cur:
615             info = self.SegmentInfo()
616             info.id = row[0]
617             info.used_bytes = row[1]
618             info.size_bytes = row[2]
619             info.mtime = row[3]
620             info.age_days = row[4]
621
622             # If data is not available for whatever reason, treat it as 0.0.
623             if info.age_days is None:
624                 info.age_days = 0.0
625             if info.used_bytes is None:
626                 info.used_bytes = 0.0
627
628             # Benefit calculation: u is the estimated fraction of each segment
629             # which is utilized (bytes belonging to objects still in use
630             # divided by total size; this doesn't take compression or storage
631             # overhead into account, but should give a reasonable estimate).
632             #
633             # The total benefit is a heuristic that combines several factors:
634             # the amount of space that can be reclaimed (1 - u), an ageing
635             # factor (info.age_days) that favors cleaning old segments to young
636             # ones and also is more likely to clean segments that will be
637             # rewritten for long-lived snapshots (age_boost), and finally a
638             # penalty factor for the cost of re-uploading data (u + 0.1).
639             u = info.used_bytes / info.size_bytes
640             info.cleaning_benefit \
641                 = (1 - u) * (info.age_days + age_boost) / (u + 0.1)
642
643             segments.append(info)
644
645         segments.sort(cmp, key=lambda s: s.cleaning_benefit, reverse=True)
646         return segments
647
648     def mark_segment_expired(self, segment):
649         """Mark a segment for cleaning in the local database.
650
651         The segment parameter should be either a SegmentInfo object or an
652         integer segment id.  Objects in the given segment will be marked as
653         expired, which means that any future snapshots that would re-use those
654         objects will instead write out a new copy of the object, and thus no
655         future snapshots will depend upon the given segment.
656         """
657
658         if isinstance(segment, int):
659             id = segment
660         elif isinstance(segment, self.SegmentInfo):
661             id = segment.id
662         else:
663             raise TypeError("Invalid segment: %s, must be of type int or SegmentInfo, not %s" % (segment, type(segment)))
664
665         cur = self.cursor()
666         cur.execute("select max(snapshotid) from snapshots")
667         last_snapshotid = cur.fetchone()[0]
668         cur.execute("update segments set expire_time = ? where segmentid = ?",
669                     (last_snapshotid, id))
670         cur.execute("update block_index set expired = 0 where segmentid = ?",
671                     (id,))
672
673     def balance_expired_objects(self):
674         """Analyze expired objects in segments to be cleaned and group by age.
675
676         Update the block_index table of the local database to group expired
677         objects by age.  The exact number of buckets and the cutoffs for each
678         are dynamically determined.  Calling this function after marking
679         segments expired will help in the segment cleaning process, by ensuring
680         that when active objects from clean segments are rewritten, they will
681         be placed into new segments roughly grouped by age.
682         """
683
684         # The expired column of the block_index table is used when generating a
685         # new LBS snapshot.  A null value indicates that an object may be
686         # re-used.  Otherwise, an object must be written into a new segment if
687         # needed.  Objects with distinct expired values will be written into
688         # distinct segments, to allow for some grouping by age.  The value 0 is
689         # somewhat special in that it indicates any rewritten objects can be
690         # placed in the same segment as completely new objects; this can be
691         # used for very young objects which have been expired, or objects not
692         # expected to be encountered.
693         #
694         # In the balancing process, all objects which are not used in any
695         # current snapshots will have expired set to 0.  Objects which have
696         # been seen will be sorted by age and will have expired values set to
697         # 0, 1, 2, and so on based on age (with younger objects being assigned
698         # lower values).  The number of buckets and the age cutoffs is
699         # determined by looking at the distribution of block ages.
700
701         cur = self.cursor()
702
703         # Mark all expired objects with expired = 0; these objects will later
704         # have values set to indicate groupings of objects when repacking.
705         cur.execute("""update block_index set expired = 0
706                        where expired is not null""")
707
708         # We will want to aim for at least one full segment for each bucket
709         # that we eventually create, but don't know how many bytes that should
710         # be due to compression.  So compute the average number of bytes in
711         # each expired segment as a rough estimate for the minimum size of each
712         # bucket.  (This estimate could be thrown off by many not-fully-packed
713         # segments, but for now don't worry too much about that.)  If we can't
714         # compute an average, it's probably because there are no expired
715         # segments, so we have no more work to do.
716         cur.execute("""select avg(size) from segments
717                        where segmentid in
718                            (select distinct segmentid from block_index
719                             where expired is not null)""")
720         segment_size_estimate = cur.fetchone()[0]
721         if not segment_size_estimate:
722             return
723
724         # Next, extract distribution of expired objects (number and size) by
725         # age.  Save the timestamp for "now" so that the classification of
726         # blocks into age buckets will not change later in the function, after
727         # time has passed.  Set any timestamps in the future to now, so we are
728         # guaranteed that for the rest of this function, age is always
729         # non-negative.
730         cur.execute("select julianday('now')")
731         now = cur.fetchone()[0]
732
733         cur.execute("""update block_index set timestamp = ?
734                        where timestamp > ? and expired is not null""",
735                     (now, now))
736
737         cur.execute("""select round(? - timestamp) as age, count(*), sum(size)
738                        from block_index where expired = 0
739                        group by age order by age""", (now,))
740         distribution = cur.fetchall()
741
742         # Start to determine the buckets for expired objects.  Heuristics used:
743         #   - An upper bound on the number of buckets is given by the number of
744         #     segments we estimate it will take to store all data.  In fact,
745         #     aim for a couple of segments per bucket.
746         #   - Place very young objects in bucket 0 (place with new objects)
747         #     unless there are enough of them to warrant a separate bucket.
748         #   - Try not to create unnecessarily many buckets, since fewer buckets
749         #     will allow repacked data to be grouped based on spatial locality
750         #     (while more buckets will group by temporal locality).  We want a
751         #     balance.
752         MIN_AGE = 4
753         total_bytes = sum([i[2] for i in distribution])
754         target_buckets = 2 * (total_bytes / segment_size_estimate) ** 0.4
755         min_size = 1.5 * segment_size_estimate
756         target_size = max(2 * segment_size_estimate,
757                           total_bytes / target_buckets)
758
759         print "segment_size:", segment_size_estimate
760         print "distribution:", distribution
761         print "total_bytes:", total_bytes
762         print "target_buckets:", target_buckets
763         print "min, target size:", min_size, target_size
764
765         # Chosen cutoffs.  Each bucket consists of objects with age greater
766         # than one cutoff value, but not greater than the next largest cutoff.
767         cutoffs = []
768
769         # Starting with the oldest objects, begin grouping together into
770         # buckets of size at least target_size bytes.
771         distribution.reverse()
772         bucket_size = 0
773         min_age_bucket = False
774         for (age, items, size) in distribution:
775             if bucket_size >= target_size \
776                 or (age < MIN_AGE and not min_age_bucket):
777                 if bucket_size < target_size and len(cutoffs) > 0:
778                     cutoffs.pop()
779                 cutoffs.append(age)
780                 bucket_size = 0
781
782             bucket_size += size
783             if age < MIN_AGE:
784                 min_age_bucket = True
785
786         # The last (youngest) bucket will be group 0, unless it has enough data
787         # to be of size min_size by itself, or there happen to be no objects
788         # less than MIN_AGE at all.
789         if bucket_size >= min_size or not min_age_bucket:
790             cutoffs.append(-1)
791         cutoffs.append(-1)
792
793         print "cutoffs:", cutoffs
794
795         # Update the database to assign each object to the appropriate bucket.
796         cutoffs.reverse()
797         for i in range(len(cutoffs)):
798             cur.execute("""update block_index set expired = ?
799                            where round(? - timestamp) > ?
800                              and expired is not null""",
801                         (i, now, cutoffs[i]))