51755c70683aa0a8f129a10f223b84cb50df006b
[cumulus.git] / python / cumulus / __init__.py
1 # Cumulus: Efficient Filesystem Backup to the Cloud
2 # Copyright (C) 2008-2009, 2012 The Cumulus Developers
3 # See the AUTHORS file for a list of contributors.
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License along
16 # with this program; if not, write to the Free Software Foundation, Inc.,
17 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18
19 """High-level interface for working with Cumulus archives.
20
21 This module provides an easy interface for reading from and manipulating
22 various parts of a Cumulus archive:
23   - listing the snapshots and segments present
24   - reading segment contents
25   - parsing snapshot descriptors and snapshot metadata logs
26   - reading and maintaining the local object database
27 """
28
29 from __future__ import division
30 import hashlib, os, re, tarfile, tempfile, thread
31 from pysqlite2 import dbapi2 as sqlite3
32
33 import cumulus.store, cumulus.store.file
34
35 # The largest supported snapshot format that can be understood.
36 FORMAT_VERSION = (0, 11)        # Cumulus Snapshot v0.11
37
38 # Maximum number of nested indirect references allowed in a snapshot.
39 MAX_RECURSION_DEPTH = 3
40
41 # All segments which have been accessed this session.
42 accessed_segments = set()
43
44 # Table of methods used to filter segments before storage, and corresponding
45 # filename extensions.  These are listed in priority order (methods earlier in
46 # the list are tried first).
47 SEGMENT_FILTERS = [
48     (".gpg", "cumulus-filter-gpg --decrypt"),
49     (".gz", "gzip -dc"),
50     (".bz2", "bzip2 -dc"),
51 ]
52
53 def uri_decode(s):
54     """Decode a URI-encoded (%xx escapes) string."""
55     def hex_decode(m): return chr(int(m.group(1), 16))
56     return re.sub(r"%([0-9a-f]{2})", hex_decode, s)
57 def uri_encode(s):
58     """Encode a string to URI-encoded (%xx escapes) form."""
59     def hex_encode(c):
60         if c > '+' and c < '\x7f' and c != '@':
61             return c
62         else:
63             return "%%%02x" % (ord(c),)
64     return ''.join(hex_encode(c) for c in s)
65
66 class Struct:
67     """A class which merely acts as a data container.
68
69     Instances of this class (or its subclasses) are merely used to store data
70     in various attributes.  No methods are provided.
71     """
72
73     def __repr__(self):
74         return "<%s %s>" % (self.__class__, self.__dict__)
75
76 CHECKSUM_ALGORITHMS = {
77     'sha1': hashlib.sha1,
78     'sha224': hashlib.sha224,
79     'sha256': hashlib.sha256,
80 }
81
82 class ChecksumCreator:
83     """Compute a Cumulus checksum for provided data.
84
85     The algorithm used is selectable, but currently defaults to sha1.
86     """
87
88     def __init__(self, algorithm='sha1'):
89         self.algorithm = algorithm
90         self.hash = CHECKSUM_ALGORITHMS[algorithm]()
91
92     def update(self, data):
93         self.hash.update(data)
94         return self
95
96     def compute(self):
97         return "%s=%s" % (self.algorithm, self.hash.hexdigest())
98
99 class ChecksumVerifier:
100     """Verify whether a checksum from a snapshot matches the supplied data."""
101
102     def __init__(self, checksumstr):
103         """Create an object to check the supplied checksum."""
104
105         (algo, checksum) = checksumstr.split("=", 1)
106         self.checksum = checksum
107         self.hash = CHECKSUM_ALGORITHMS[algo]()
108
109     def update(self, data):
110         self.hash.update(data)
111
112     def valid(self):
113         """Return a boolean indicating whether the checksum matches."""
114
115         result = self.hash.hexdigest()
116         return result == self.checksum
117
118 class LowlevelDataStore:
119     """Access to the backup store containing segments and snapshot descriptors.
120
121     Instances of this class are used to get direct filesystem-level access to
122     the backup data.  To read a backup, a caller will ordinarily not care about
123     direct access to backup segments, but will instead merely need to access
124     objects from those segments.  The ObjectStore class provides a suitable
125     wrapper around a DataStore to give this high-level access.
126     """
127
128     def __init__(self, path):
129         if isinstance(path, cumulus.store.Store):
130             self.store = path
131         elif path.find(":") >= 0:
132             self.store = cumulus.store.open(path)
133         else:
134             self.store = cumulus.store.file.FileStore(path)
135
136     def _classify(self, filename):
137         for (t, r) in cumulus.store.type_patterns.items():
138             if r.match(filename):
139                 return (t, filename)
140         return (None, filename)
141
142     def scan(self):
143         self.store.scan()
144
145     def lowlevel_open(self, filename):
146         """Return a file-like object for reading data from the given file."""
147
148         (type, filename) = self._classify(filename)
149         return self.store.get(type, filename)
150
151     def lowlevel_stat(self, filename):
152         """Return a dictionary of information about the given file.
153
154         Currently, the only defined field is 'size', giving the size of the
155         file in bytes.
156         """
157
158         (type, filename) = self._classify(filename)
159         return self.store.stat(type, filename)
160
161     # Slightly higher-level list methods.
162     def list_snapshots(self):
163         for f in self.store.list('snapshots'):
164             m = cumulus.store.type_patterns['snapshots'].match(f)
165             if m: yield m.group(1)
166
167     def list_segments(self):
168         for f in self.store.list('segments'):
169             m = cumulus.store.type_patterns['segments'].match(f)
170             if m: yield m.group(1)
171
172 class ObjectStore:
173     def __init__(self, data_store):
174         self.store = data_store
175         self.cachedir = None
176         self.CACHE_SIZE = 16
177         self.lru_list = []
178
179     def get_cachedir(self):
180         if self.cachedir is None:
181             self.cachedir = tempfile.mkdtemp(".lbs")
182         return self.cachedir
183
184     def cleanup(self):
185         if self.cachedir is not None:
186             # TODO: Avoid use of system, make this safer
187             os.system("rm -rf " + self.cachedir)
188         self.cachedir = None
189
190     @staticmethod
191     def parse_ref(refstr):
192         m = re.match(r"^zero\[(\d+)\]$", refstr)
193         if m:
194             return ("zero", None, None, (0, int(m.group(1))))
195
196         m = re.match(r"^([-0-9a-f]+)\/([0-9a-f]+)(\(\S+\))?(\[(((\d+)\+)?(\d+)|=(\d+))\])?$", refstr)
197         if not m: return
198
199         segment = m.group(1)
200         object = m.group(2)
201         checksum = m.group(3)
202         slice = m.group(4)
203
204         if checksum is not None:
205             checksum = checksum.lstrip("(").rstrip(")")
206
207         if slice is not None:
208             if m.group(9) is not None:
209                 # Size-assertion slice
210                 slice = (0, int(m.group(9)), True)
211             elif m.group(6) is None:
212                 # Abbreviated slice
213                 slice = (0, int(m.group(8)), False)
214             else:
215                 slice = (int(m.group(7)), int(m.group(8)), False)
216
217         return (segment, object, checksum, slice)
218
219     def get_segment(self, segment):
220         accessed_segments.add(segment)
221
222         for (extension, filter) in SEGMENT_FILTERS:
223             try:
224                 raw = self.store.lowlevel_open(segment + ".tar" + extension)
225
226                 (input, output) = os.popen2(filter)
227                 def copy_thread(src, dst):
228                     BLOCK_SIZE = 4096
229                     while True:
230                         block = src.read(BLOCK_SIZE)
231                         if len(block) == 0: break
232                         dst.write(block)
233                     dst.close()
234
235                 thread.start_new_thread(copy_thread, (raw, input))
236                 return output
237             except:
238                 pass
239
240         raise cumulus.store.NotFoundError
241
242     def load_segment(self, segment):
243         seg = tarfile.open(segment, 'r|', self.get_segment(segment))
244         for item in seg:
245             data_obj = seg.extractfile(item)
246             path = item.name.split('/')
247             if len(path) == 2 and path[0] == segment:
248                 yield (path[1], data_obj.read())
249
250     def load_snapshot(self, snapshot):
251         file = self.store.lowlevel_open("snapshot-" + snapshot + ".lbs")
252         return file.read().splitlines(True)
253
254     def extract_segment(self, segment):
255         segdir = os.path.join(self.get_cachedir(), segment)
256         os.mkdir(segdir)
257         for (object, data) in self.load_segment(segment):
258             f = open(os.path.join(segdir, object), 'wb')
259             f.write(data)
260             f.close()
261
262     def load_object(self, segment, object):
263         accessed_segments.add(segment)
264         path = os.path.join(self.get_cachedir(), segment, object)
265         if not os.access(path, os.R_OK):
266             self.extract_segment(segment)
267         if segment in self.lru_list: self.lru_list.remove(segment)
268         self.lru_list.append(segment)
269         while len(self.lru_list) > self.CACHE_SIZE:
270             os.system("rm -rf " + os.path.join(self.cachedir, self.lru_list[0]))
271             self.lru_list = self.lru_list[1:]
272         return open(path, 'rb').read()
273
274     def get(self, refstr):
275         """Fetch the given object and return it.
276
277         The input should be an object reference, in string form.
278         """
279
280         (segment, object, checksum, slice) = self.parse_ref(refstr)
281
282         if segment == "zero":
283             return "\0" * slice[1]
284
285         data = self.load_object(segment, object)
286
287         if checksum is not None:
288             verifier = ChecksumVerifier(checksum)
289             verifier.update(data)
290             if not verifier.valid():
291                 raise ValueError
292
293         if slice is not None:
294             (start, length, exact) = slice
295             if exact and len(data) != length: raise ValueError
296             data = data[start:start+length]
297             if len(data) != length: raise IndexError
298
299         return data
300
301 def parse(lines, terminate=None):
302     """Generic parser for RFC822-style "Key: Value" data streams.
303
304     This parser can be used to read metadata logs and snapshot root descriptor
305     files.
306
307     lines must be an iterable object which yields a sequence of lines of input.
308
309     If terminate is specified, it is used as a predicate to determine when to
310     stop reading input lines.
311     """
312
313     dict = {}
314     last_key = None
315
316     for l in lines:
317         # Strip off a trailing newline, if present
318         if len(l) > 0 and l[-1] == "\n":
319             l = l[:-1]
320
321         if terminate is not None and terminate(l):
322             if len(dict) > 0: yield dict
323             dict = {}
324             last_key = None
325             continue
326
327         m = re.match(r"^([-\w]+):\s*(.*)$", l)
328         if m:
329             dict[m.group(1)] = m.group(2)
330             last_key = m.group(1)
331         elif len(l) > 0 and l[0].isspace() and last_key is not None:
332             dict[last_key] += l
333         else:
334             last_key = None
335
336     if len(dict) > 0: yield dict
337
338 def parse_full(lines):
339     try:
340         return parse(lines).next()
341     except StopIteration:
342         return {}
343
344 def parse_metadata_version(s):
345     """Convert a string with the snapshot version format to a tuple."""
346
347     m = re.match(r"^(?:Cumulus|LBS) Snapshot v(\d+(\.\d+)*)$", s)
348     if m is None:
349         return ()
350     else:
351         return tuple([int(d) for d in m.group(1).split(".")])
352
353 def read_metadata(object_store, root):
354     """Iterate through all lines in the metadata log, following references."""
355
356     # Stack for keeping track of recursion when following references to
357     # portions of the log.  The last entry in the stack corresponds to the
358     # object currently being parsed.  Each entry is a list of lines which have
359     # been reversed, so that popping successive lines from the end of each list
360     # will return lines of the metadata log in order.
361     stack = []
362
363     def follow_ref(refstr):
364         if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
365         lines = object_store.get(refstr).splitlines(True)
366         lines.reverse()
367         stack.append(lines)
368
369     follow_ref(root)
370
371     while len(stack) > 0:
372         top = stack[-1]
373         if len(top) == 0:
374             stack.pop()
375             continue
376         line = top.pop()
377
378         # An indirect reference which we must follow?
379         if len(line) > 0 and line[0] == '@':
380             ref = line[1:]
381             ref.strip()
382             follow_ref(ref)
383         else:
384             yield line
385
386 class MetadataItem:
387     """Metadata for a single file (or directory or...) from a snapshot."""
388
389     # Functions for parsing various datatypes that can appear in a metadata log
390     # item.
391     @staticmethod
392     def decode_int(s):
393         """Decode an integer, expressed in decimal, octal, or hexadecimal."""
394         if s.startswith("0x"):
395             return int(s, 16)
396         elif s.startswith("0"):
397             return int(s, 8)
398         else:
399             return int(s, 10)
400
401     @staticmethod
402     def decode_str(s):
403         """Decode a URI-encoded (%xx escapes) string."""
404         return uri_decode(s)
405
406     @staticmethod
407     def raw_str(s):
408         """An unecoded string."""
409         return s
410
411     @staticmethod
412     def decode_user(s):
413         """Decode a user/group to a tuple of uid/gid followed by name."""
414         items = s.split()
415         uid = MetadataItem.decode_int(items[0])
416         name = None
417         if len(items) > 1:
418             if items[1].startswith("(") and items[1].endswith(")"):
419                 name = MetadataItem.decode_str(items[1][1:-1])
420         return (uid, name)
421
422     @staticmethod
423     def decode_device(s):
424         """Decode a device major/minor number."""
425         (major, minor) = map(MetadataItem.decode_int, s.split("/"))
426         return (major, minor)
427
428     class Items: pass
429
430     def __init__(self, fields, object_store):
431         """Initialize from a dictionary of key/value pairs from metadata log."""
432
433         self.fields = fields
434         self.object_store = object_store
435         self.keys = []
436         self.items = self.Items()
437         for (k, v) in fields.items():
438             if k in self.field_types:
439                 decoder = self.field_types[k]
440                 setattr(self.items, k, decoder(v))
441                 self.keys.append(k)
442
443     def data(self):
444         """Return an iterator for the data blocks that make up a file."""
445
446         # This traverses the list of blocks that make up a file, following
447         # indirect references.  It is implemented in much the same way as
448         # read_metadata, so see that function for details of the technique.
449
450         objects = self.fields['data'].split()
451         objects.reverse()
452         stack = [objects]
453
454         def follow_ref(refstr):
455             if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
456             objects = self.object_store.get(refstr).split()
457             objects.reverse()
458             stack.append(objects)
459
460         while len(stack) > 0:
461             top = stack[-1]
462             if len(top) == 0:
463                 stack.pop()
464                 continue
465             ref = top.pop()
466
467             # An indirect reference which we must follow?
468             if len(ref) > 0 and ref[0] == '@':
469                 follow_ref(ref[1:])
470             else:
471                 yield ref
472
473 # Description of fields that might appear, and how they should be parsed.
474 MetadataItem.field_types = {
475     'name': MetadataItem.decode_str,
476     'type': MetadataItem.raw_str,
477     'mode': MetadataItem.decode_int,
478     'device': MetadataItem.decode_device,
479     'user': MetadataItem.decode_user,
480     'group': MetadataItem.decode_user,
481     'ctime': MetadataItem.decode_int,
482     'mtime': MetadataItem.decode_int,
483     'links': MetadataItem.decode_int,
484     'inode': MetadataItem.raw_str,
485     'checksum': MetadataItem.decode_str,
486     'size': MetadataItem.decode_int,
487     'contents': MetadataItem.decode_str,
488     'target': MetadataItem.decode_str,
489 }
490
491 def iterate_metadata(object_store, root):
492     for d in parse(read_metadata(object_store, root), lambda l: len(l) == 0):
493         yield MetadataItem(d, object_store)
494
495 class LocalDatabase:
496     """Access to the local database of snapshot contents and object checksums.
497
498     The local database is consulted when creating a snapshot to determine what
499     data can be re-used from old snapshots.  Segment cleaning is performed by
500     manipulating the data in the local database; the local database also
501     includes enough data to guide the segment cleaning process.
502     """
503
504     def __init__(self, path, dbname="localdb.sqlite"):
505         self.db_connection = sqlite3.connect(path + "/" + dbname)
506
507     # Low-level database access.  Use these methods when there isn't a
508     # higher-level interface available.  Exception: do, however, remember to
509     # use the commit() method after making changes to make sure they are
510     # actually saved, even when going through higher-level interfaces.
511     def commit(self):
512         "Commit any pending changes to the local database."
513         self.db_connection.commit()
514
515     def rollback(self):
516         "Roll back any pending changes to the local database."
517         self.db_connection.rollback()
518
519     def cursor(self):
520         "Return a DB-API cursor for directly accessing the local database."
521         return self.db_connection.cursor()
522
523     def list_schemes(self):
524         """Return the list of snapshots found in the local database.
525
526         The returned value is a list of tuples (id, scheme, name, time, intent).
527         """
528
529         cur = self.cursor()
530         cur.execute("select distinct scheme from snapshots")
531         schemes = [row[0] for row in cur.fetchall()]
532         schemes.sort()
533         return schemes
534
535     def garbage_collect(self, scheme, intent=1.0):
536         """Delete entries from old snapshots from the database.
537
538         Only snapshots with the specified scheme name will be deleted.  If
539         intent is given, it gives the intended next snapshot type, to determine
540         how aggressively to clean (for example, intent=7 could be used if the
541         next snapshot will be a weekly snapshot).
542         """
543
544         cur = self.cursor()
545
546         # Find the id of the last snapshot to be created.  This is used for
547         # measuring time in a way: we record this value in each segment we
548         # expire on this run, and then on a future run can tell if there have
549         # been intervening backups made.
550         cur.execute("select max(snapshotid) from snapshots")
551         last_snapshotid = cur.fetchone()[0]
552
553         # Get the list of old snapshots for this scheme.  Delete all the old
554         # ones.  Rules for what to keep:
555         #   - Always keep the most recent snapshot.
556         #   - If snapshot X is younger than Y, and X has higher intent, then Y
557         #     can be deleted.
558         cur.execute("""select snapshotid, name, intent,
559                               julianday('now') - timestamp as age
560                        from snapshots where scheme = ?
561                        order by age""", (scheme,))
562
563         first = True
564         max_intent = intent
565         for (id, name, snap_intent, snap_age) in cur.fetchall():
566             can_delete = False
567             if snap_intent < max_intent:
568                 # Delete small-intent snapshots if there is a more recent
569                 # large-intent snapshot.
570                 can_delete = True
571             elif snap_intent == intent:
572                 # Delete previous snapshots with the specified intent level.
573                 can_delete = True
574
575             if can_delete and not first:
576                 print "Delete snapshot %d (%s)" % (id, name)
577                 cur.execute("delete from snapshots where snapshotid = ?",
578                             (id,))
579             first = False
580             max_intent = max(max_intent, snap_intent)
581
582         # Delete entries in the segments_used table which are for non-existent
583         # snapshots.
584         cur.execute("""delete from segments_used
585                        where snapshotid not in
586                            (select snapshotid from snapshots)""")
587
588         # Find segments which contain no objects used by any current snapshots,
589         # and delete them from the segment table.
590         cur.execute("""delete from segments where segmentid not in
591                            (select segmentid from segments_used)""")
592
593         # Delete unused objects in the block_index table.  By "unused", we mean
594         # any object which was stored in a segment which has been deleted, and
595         # any object in a segment which was marked for cleaning and has had
596         # cleaning performed already (the expired time is less than the current
597         # largest snapshot id).
598         cur.execute("""delete from block_index
599                        where segmentid not in (select segmentid from segments)
600                           or segmentid in (select segmentid from segments
601                                            where expire_time < ?)""",
602                     (last_snapshotid,))
603
604         # Remove sub-block signatures for deleted objects.
605         cur.execute("""delete from subblock_signatures
606                        where blockid not in
607                            (select blockid from block_index)""")
608
609     # Segment cleaning.
610     class SegmentInfo(Struct): pass
611
612     def get_segment_cleaning_list(self, age_boost=0.0):
613         """Return a list of all current segments with information for cleaning.
614
615         Return all segments which are currently known in the local database
616         (there might be other, older segments in the archive itself), and
617         return usage statistics for each to help decide which segments to
618         clean.
619
620         The returned list will be sorted by estimated cleaning benefit, with
621         segments that are best to clean at the start of the list.
622
623         If specified, the age_boost parameter (measured in days) will added to
624         the age of each segment, as a way of adjusting the benefit computation
625         before a long-lived snapshot is taken (for example, age_boost might be
626         set to 7 when cleaning prior to taking a weekly snapshot).
627         """
628
629         cur = self.cursor()
630         segments = []
631         cur.execute("""select segmentid, used, size, mtime,
632                        julianday('now') - mtime as age from segment_info
633                        where expire_time is null""")
634         for row in cur:
635             info = self.SegmentInfo()
636             info.id = row[0]
637             info.used_bytes = row[1]
638             info.size_bytes = row[2]
639             info.mtime = row[3]
640             info.age_days = row[4]
641
642             # If data is not available for whatever reason, treat it as 0.0.
643             if info.age_days is None:
644                 info.age_days = 0.0
645             if info.used_bytes is None:
646                 info.used_bytes = 0.0
647
648             # Benefit calculation: u is the estimated fraction of each segment
649             # which is utilized (bytes belonging to objects still in use
650             # divided by total size; this doesn't take compression or storage
651             # overhead into account, but should give a reasonable estimate).
652             #
653             # The total benefit is a heuristic that combines several factors:
654             # the amount of space that can be reclaimed (1 - u), an ageing
655             # factor (info.age_days) that favors cleaning old segments to young
656             # ones and also is more likely to clean segments that will be
657             # rewritten for long-lived snapshots (age_boost), and finally a
658             # penalty factor for the cost of re-uploading data (u + 0.1).
659             u = info.used_bytes / info.size_bytes
660             info.cleaning_benefit \
661                 = (1 - u) * (info.age_days + age_boost) / (u + 0.1)
662
663             segments.append(info)
664
665         segments.sort(cmp, key=lambda s: s.cleaning_benefit, reverse=True)
666         return segments
667
668     def mark_segment_expired(self, segment):
669         """Mark a segment for cleaning in the local database.
670
671         The segment parameter should be either a SegmentInfo object or an
672         integer segment id.  Objects in the given segment will be marked as
673         expired, which means that any future snapshots that would re-use those
674         objects will instead write out a new copy of the object, and thus no
675         future snapshots will depend upon the given segment.
676         """
677
678         if isinstance(segment, int):
679             id = segment
680         elif isinstance(segment, self.SegmentInfo):
681             id = segment.id
682         else:
683             raise TypeError("Invalid segment: %s, must be of type int or SegmentInfo, not %s" % (segment, type(segment)))
684
685         cur = self.cursor()
686         cur.execute("select max(snapshotid) from snapshots")
687         last_snapshotid = cur.fetchone()[0]
688         cur.execute("update segments set expire_time = ? where segmentid = ?",
689                     (last_snapshotid, id))
690         cur.execute("update block_index set expired = 0 where segmentid = ?",
691                     (id,))
692
693     def balance_expired_objects(self):
694         """Analyze expired objects in segments to be cleaned and group by age.
695
696         Update the block_index table of the local database to group expired
697         objects by age.  The exact number of buckets and the cutoffs for each
698         are dynamically determined.  Calling this function after marking
699         segments expired will help in the segment cleaning process, by ensuring
700         that when active objects from clean segments are rewritten, they will
701         be placed into new segments roughly grouped by age.
702         """
703
704         # The expired column of the block_index table is used when generating a
705         # new Cumulus snapshot.  A null value indicates that an object may be
706         # re-used.  Otherwise, an object must be written into a new segment if
707         # needed.  Objects with distinct expired values will be written into
708         # distinct segments, to allow for some grouping by age.  The value 0 is
709         # somewhat special in that it indicates any rewritten objects can be
710         # placed in the same segment as completely new objects; this can be
711         # used for very young objects which have been expired, or objects not
712         # expected to be encountered.
713         #
714         # In the balancing process, all objects which are not used in any
715         # current snapshots will have expired set to 0.  Objects which have
716         # been seen will be sorted by age and will have expired values set to
717         # 0, 1, 2, and so on based on age (with younger objects being assigned
718         # lower values).  The number of buckets and the age cutoffs is
719         # determined by looking at the distribution of block ages.
720
721         cur = self.cursor()
722
723         # Mark all expired objects with expired = 0; these objects will later
724         # have values set to indicate groupings of objects when repacking.
725         cur.execute("""update block_index set expired = 0
726                        where expired is not null""")
727
728         # We will want to aim for at least one full segment for each bucket
729         # that we eventually create, but don't know how many bytes that should
730         # be due to compression.  So compute the average number of bytes in
731         # each expired segment as a rough estimate for the minimum size of each
732         # bucket.  (This estimate could be thrown off by many not-fully-packed
733         # segments, but for now don't worry too much about that.)  If we can't
734         # compute an average, it's probably because there are no expired
735         # segments, so we have no more work to do.
736         cur.execute("""select avg(size) from segments
737                        where segmentid in
738                            (select distinct segmentid from block_index
739                             where expired is not null)""")
740         segment_size_estimate = cur.fetchone()[0]
741         if not segment_size_estimate:
742             return
743
744         # Next, extract distribution of expired objects (number and size) by
745         # age.  Save the timestamp for "now" so that the classification of
746         # blocks into age buckets will not change later in the function, after
747         # time has passed.  Set any timestamps in the future to now, so we are
748         # guaranteed that for the rest of this function, age is always
749         # non-negative.
750         cur.execute("select julianday('now')")
751         now = cur.fetchone()[0]
752
753         cur.execute("""update block_index set timestamp = ?
754                        where timestamp > ? and expired is not null""",
755                     (now, now))
756
757         cur.execute("""select round(? - timestamp) as age, count(*), sum(size)
758                        from block_index where expired = 0
759                        group by age order by age""", (now,))
760         distribution = cur.fetchall()
761
762         # Start to determine the buckets for expired objects.  Heuristics used:
763         #   - An upper bound on the number of buckets is given by the number of
764         #     segments we estimate it will take to store all data.  In fact,
765         #     aim for a couple of segments per bucket.
766         #   - Place very young objects in bucket 0 (place with new objects)
767         #     unless there are enough of them to warrant a separate bucket.
768         #   - Try not to create unnecessarily many buckets, since fewer buckets
769         #     will allow repacked data to be grouped based on spatial locality
770         #     (while more buckets will group by temporal locality).  We want a
771         #     balance.
772         MIN_AGE = 4
773         total_bytes = sum([i[2] for i in distribution])
774         target_buckets = 2 * (total_bytes / segment_size_estimate) ** 0.4
775         min_size = 1.5 * segment_size_estimate
776         target_size = max(2 * segment_size_estimate,
777                           total_bytes / target_buckets)
778
779         print "segment_size:", segment_size_estimate
780         print "distribution:", distribution
781         print "total_bytes:", total_bytes
782         print "target_buckets:", target_buckets
783         print "min, target size:", min_size, target_size
784
785         # Chosen cutoffs.  Each bucket consists of objects with age greater
786         # than one cutoff value, but not greater than the next largest cutoff.
787         cutoffs = []
788
789         # Starting with the oldest objects, begin grouping together into
790         # buckets of size at least target_size bytes.
791         distribution.reverse()
792         bucket_size = 0
793         min_age_bucket = False
794         for (age, items, size) in distribution:
795             if bucket_size >= target_size \
796                 or (age < MIN_AGE and not min_age_bucket):
797                 if bucket_size < target_size and len(cutoffs) > 0:
798                     cutoffs.pop()
799                 cutoffs.append(age)
800                 bucket_size = 0
801
802             bucket_size += size
803             if age < MIN_AGE:
804                 min_age_bucket = True
805
806         # The last (youngest) bucket will be group 0, unless it has enough data
807         # to be of size min_size by itself, or there happen to be no objects
808         # less than MIN_AGE at all.
809         if bucket_size >= min_size or not min_age_bucket:
810             cutoffs.append(-1)
811         cutoffs.append(-1)
812
813         print "cutoffs:", cutoffs
814
815         # Update the database to assign each object to the appropriate bucket.
816         cutoffs.reverse()
817         for i in range(len(cutoffs)):
818             cur.execute("""update block_index set expired = ?
819                            where round(? - timestamp) > ?
820                              and expired is not null""",
821                         (i, now, cutoffs[i]))