Improve tracking of segments and segment utilization.
[cumulus.git] / python / cumulus / __init__.py
1 # Cumulus: Efficient Filesystem Backup to the Cloud
2 # Copyright (C) 2008-2009, 2012 The Cumulus Developers
3 # See the AUTHORS file for a list of contributors.
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License along
16 # with this program; if not, write to the Free Software Foundation, Inc.,
17 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18
19 """High-level interface for working with Cumulus archives.
20
21 This module provides an easy interface for reading from and manipulating
22 various parts of a Cumulus archive:
23   - listing the snapshots and segments present
24   - reading segment contents
25   - parsing snapshot descriptors and snapshot metadata logs
26   - reading and maintaining the local object database
27 """
28
29 from __future__ import division
30 import hashlib, os, re, tarfile, tempfile, thread
31 from pysqlite2 import dbapi2 as sqlite3
32
33 import cumulus.store, cumulus.store.file
34
35 # The largest supported snapshot format that can be understood.
36 FORMAT_VERSION = (0, 11)        # Cumulus Snapshot v0.11
37
38 # Maximum number of nested indirect references allowed in a snapshot.
39 MAX_RECURSION_DEPTH = 3
40
41 # All segments which have been accessed this session.
42 accessed_segments = set()
43
44 # Table of methods used to filter segments before storage, and corresponding
45 # filename extensions.  These are listed in priority order (methods earlier in
46 # the list are tried first).
47 SEGMENT_FILTERS = [
48     (".gpg", "cumulus-filter-gpg --decrypt"),
49     (".gz", "gzip -dc"),
50     (".bz2", "bzip2 -dc"),
51 ]
52
53 def uri_decode(s):
54     """Decode a URI-encoded (%xx escapes) string."""
55     def hex_decode(m): return chr(int(m.group(1), 16))
56     return re.sub(r"%([0-9a-f]{2})", hex_decode, s)
57 def uri_encode(s):
58     """Encode a string to URI-encoded (%xx escapes) form."""
59     def hex_encode(c):
60         if c > '+' and c < '\x7f' and c != '@':
61             return c
62         else:
63             return "%%%02x" % (ord(c),)
64     return ''.join(hex_encode(c) for c in s)
65
66 class Struct:
67     """A class which merely acts as a data container.
68
69     Instances of this class (or its subclasses) are merely used to store data
70     in various attributes.  No methods are provided.
71     """
72
73     def __repr__(self):
74         return "<%s %s>" % (self.__class__, self.__dict__)
75
76 CHECKSUM_ALGORITHMS = {
77     'sha1': hashlib.sha1,
78     'sha224': hashlib.sha224,
79     'sha256': hashlib.sha256,
80 }
81
82 class ChecksumCreator:
83     """Compute a Cumulus checksum for provided data.
84
85     The algorithm used is selectable, but currently defaults to sha1.
86     """
87
88     def __init__(self, algorithm='sha1'):
89         self.algorithm = algorithm
90         self.hash = CHECKSUM_ALGORITHMS[algorithm]()
91
92     def update(self, data):
93         self.hash.update(data)
94         return self
95
96     def compute(self):
97         return "%s=%s" % (self.algorithm, self.hash.hexdigest())
98
99 class ChecksumVerifier:
100     """Verify whether a checksum from a snapshot matches the supplied data."""
101
102     def __init__(self, checksumstr):
103         """Create an object to check the supplied checksum."""
104
105         (algo, checksum) = checksumstr.split("=", 1)
106         self.checksum = checksum
107         self.hash = CHECKSUM_ALGORITHMS[algo]()
108
109     def update(self, data):
110         self.hash.update(data)
111
112     def valid(self):
113         """Return a boolean indicating whether the checksum matches."""
114
115         result = self.hash.hexdigest()
116         return result == self.checksum
117
118 class LowlevelDataStore:
119     """Access to the backup store containing segments and snapshot descriptors.
120
121     Instances of this class are used to get direct filesystem-level access to
122     the backup data.  To read a backup, a caller will ordinarily not care about
123     direct access to backup segments, but will instead merely need to access
124     objects from those segments.  The ObjectStore class provides a suitable
125     wrapper around a DataStore to give this high-level access.
126     """
127
128     def __init__(self, path):
129         if isinstance(path, cumulus.store.Store):
130             self.store = path
131         elif path.find(":") >= 0:
132             self.store = cumulus.store.open(path)
133         else:
134             self.store = cumulus.store.file.FileStore(path)
135
136     def _classify(self, filename):
137         for (t, r) in cumulus.store.type_patterns.items():
138             if r.match(filename):
139                 return (t, filename)
140         return (None, filename)
141
142     def scan(self):
143         self.store.scan()
144
145     def lowlevel_open(self, filename):
146         """Return a file-like object for reading data from the given file."""
147
148         (type, filename) = self._classify(filename)
149         return self.store.get(type, filename)
150
151     def lowlevel_stat(self, filename):
152         """Return a dictionary of information about the given file.
153
154         Currently, the only defined field is 'size', giving the size of the
155         file in bytes.
156         """
157
158         (type, filename) = self._classify(filename)
159         return self.store.stat(type, filename)
160
161     # Slightly higher-level list methods.
162     def list_snapshots(self):
163         for f in self.store.list('snapshots'):
164             m = cumulus.store.type_patterns['snapshots'].match(f)
165             if m: yield m.group(1)
166
167     def list_segments(self):
168         for f in self.store.list('segments'):
169             m = cumulus.store.type_patterns['segments'].match(f)
170             if m: yield m.group(1)
171
172 class ObjectStore:
173     def __init__(self, data_store):
174         self.store = data_store
175         self.cachedir = None
176         self.CACHE_SIZE = 16
177         self.lru_list = []
178
179     def get_cachedir(self):
180         if self.cachedir is None:
181             self.cachedir = tempfile.mkdtemp(".lbs")
182         return self.cachedir
183
184     def cleanup(self):
185         if self.cachedir is not None:
186             # TODO: Avoid use of system, make this safer
187             os.system("rm -rf " + self.cachedir)
188         self.cachedir = None
189
190     @staticmethod
191     def parse_ref(refstr):
192         m = re.match(r"^zero\[(\d+)\]$", refstr)
193         if m:
194             return ("zero", None, None, (0, int(m.group(1))))
195
196         m = re.match(r"^([-0-9a-f]+)\/([0-9a-f]+)(\(\S+\))?(\[(((\d+)\+)?(\d+)|=(\d+))\])?$", refstr)
197         if not m: return
198
199         segment = m.group(1)
200         object = m.group(2)
201         checksum = m.group(3)
202         slice = m.group(4)
203
204         if checksum is not None:
205             checksum = checksum.lstrip("(").rstrip(")")
206
207         if slice is not None:
208             if m.group(9) is not None:
209                 # Size-assertion slice
210                 slice = (0, int(m.group(9)), True)
211             elif m.group(6) is None:
212                 # Abbreviated slice
213                 slice = (0, int(m.group(8)), False)
214             else:
215                 slice = (int(m.group(7)), int(m.group(8)), False)
216
217         return (segment, object, checksum, slice)
218
219     def get_segment(self, segment):
220         accessed_segments.add(segment)
221
222         for (extension, filter) in SEGMENT_FILTERS:
223             try:
224                 raw = self.store.lowlevel_open(segment + ".tar" + extension)
225
226                 (input, output) = os.popen2(filter)
227                 def copy_thread(src, dst):
228                     BLOCK_SIZE = 4096
229                     while True:
230                         block = src.read(BLOCK_SIZE)
231                         if len(block) == 0: break
232                         dst.write(block)
233                     dst.close()
234
235                 thread.start_new_thread(copy_thread, (raw, input))
236                 return output
237             except:
238                 pass
239
240         raise cumulus.store.NotFoundError
241
242     def load_segment(self, segment):
243         seg = tarfile.open(segment, 'r|', self.get_segment(segment))
244         for item in seg:
245             data_obj = seg.extractfile(item)
246             path = item.name.split('/')
247             if len(path) == 2 and path[0] == segment:
248                 yield (path[1], data_obj.read())
249
250     def load_snapshot(self, snapshot):
251         file = self.store.lowlevel_open("snapshot-" + snapshot + ".lbs")
252         return file.read().splitlines(True)
253
254     def extract_segment(self, segment):
255         segdir = os.path.join(self.get_cachedir(), segment)
256         os.mkdir(segdir)
257         for (object, data) in self.load_segment(segment):
258             f = open(os.path.join(segdir, object), 'wb')
259             f.write(data)
260             f.close()
261
262     def load_object(self, segment, object):
263         accessed_segments.add(segment)
264         path = os.path.join(self.get_cachedir(), segment, object)
265         if not os.access(path, os.R_OK):
266             self.extract_segment(segment)
267         if segment in self.lru_list: self.lru_list.remove(segment)
268         self.lru_list.append(segment)
269         while len(self.lru_list) > self.CACHE_SIZE:
270             os.system("rm -rf " + os.path.join(self.cachedir, self.lru_list[0]))
271             self.lru_list = self.lru_list[1:]
272         return open(path, 'rb').read()
273
274     def get(self, refstr):
275         """Fetch the given object and return it.
276
277         The input should be an object reference, in string form.
278         """
279
280         (segment, object, checksum, slice) = self.parse_ref(refstr)
281
282         if segment == "zero":
283             return "\0" * slice[1]
284
285         data = self.load_object(segment, object)
286
287         if checksum is not None:
288             verifier = ChecksumVerifier(checksum)
289             verifier.update(data)
290             if not verifier.valid():
291                 raise ValueError
292
293         if slice is not None:
294             (start, length, exact) = slice
295             if exact and len(data) != length: raise ValueError
296             data = data[start:start+length]
297             if len(data) != length: raise IndexError
298
299         return data
300
301 def parse(lines, terminate=None):
302     """Generic parser for RFC822-style "Key: Value" data streams.
303
304     This parser can be used to read metadata logs and snapshot root descriptor
305     files.
306
307     lines must be an iterable object which yields a sequence of lines of input.
308
309     If terminate is specified, it is used as a predicate to determine when to
310     stop reading input lines.
311     """
312
313     dict = {}
314     last_key = None
315
316     for l in lines:
317         # Strip off a trailing newline, if present
318         if len(l) > 0 and l[-1] == "\n":
319             l = l[:-1]
320
321         if terminate is not None and terminate(l):
322             if len(dict) > 0: yield dict
323             dict = {}
324             last_key = None
325             continue
326
327         m = re.match(r"^([-\w]+):\s*(.*)$", l)
328         if m:
329             dict[m.group(1)] = m.group(2)
330             last_key = m.group(1)
331         elif len(l) > 0 and l[0].isspace() and last_key is not None:
332             dict[last_key] += l
333         else:
334             last_key = None
335
336     if len(dict) > 0: yield dict
337
338 def parse_full(lines):
339     try:
340         return parse(lines).next()
341     except StopIteration:
342         return {}
343
344 def parse_metadata_version(s):
345     """Convert a string with the snapshot version format to a tuple."""
346
347     m = re.match(r"^(?:Cumulus|LBS) Snapshot v(\d+(\.\d+)*)$", s)
348     if m is None:
349         return ()
350     else:
351         return tuple([int(d) for d in m.group(1).split(".")])
352
353 def read_metadata(object_store, root):
354     """Iterate through all lines in the metadata log, following references."""
355
356     # Stack for keeping track of recursion when following references to
357     # portions of the log.  The last entry in the stack corresponds to the
358     # object currently being parsed.  Each entry is a list of lines which have
359     # been reversed, so that popping successive lines from the end of each list
360     # will return lines of the metadata log in order.
361     stack = []
362
363     def follow_ref(refstr):
364         if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
365         lines = object_store.get(refstr).splitlines(True)
366         lines.reverse()
367         stack.append(lines)
368
369     follow_ref(root)
370
371     while len(stack) > 0:
372         top = stack[-1]
373         if len(top) == 0:
374             stack.pop()
375             continue
376         line = top.pop()
377
378         # An indirect reference which we must follow?
379         if len(line) > 0 and line[0] == '@':
380             ref = line[1:]
381             ref.strip()
382             follow_ref(ref)
383         else:
384             yield line
385
386 class MetadataItem:
387     """Metadata for a single file (or directory or...) from a snapshot."""
388
389     # Functions for parsing various datatypes that can appear in a metadata log
390     # item.
391     @staticmethod
392     def decode_int(s):
393         """Decode an integer, expressed in decimal, octal, or hexadecimal."""
394         if s.startswith("0x"):
395             return int(s, 16)
396         elif s.startswith("0"):
397             return int(s, 8)
398         else:
399             return int(s, 10)
400
401     @staticmethod
402     def decode_str(s):
403         """Decode a URI-encoded (%xx escapes) string."""
404         return uri_decode(s)
405
406     @staticmethod
407     def raw_str(s):
408         """An unecoded string."""
409         return s
410
411     @staticmethod
412     def decode_user(s):
413         """Decode a user/group to a tuple of uid/gid followed by name."""
414         items = s.split()
415         uid = MetadataItem.decode_int(items[0])
416         name = None
417         if len(items) > 1:
418             if items[1].startswith("(") and items[1].endswith(")"):
419                 name = MetadataItem.decode_str(items[1][1:-1])
420         return (uid, name)
421
422     @staticmethod
423     def decode_device(s):
424         """Decode a device major/minor number."""
425         (major, minor) = map(MetadataItem.decode_int, s.split("/"))
426         return (major, minor)
427
428     class Items: pass
429
430     def __init__(self, fields, object_store):
431         """Initialize from a dictionary of key/value pairs from metadata log."""
432
433         self.fields = fields
434         self.object_store = object_store
435         self.keys = []
436         self.items = self.Items()
437         for (k, v) in fields.items():
438             if k in self.field_types:
439                 decoder = self.field_types[k]
440                 setattr(self.items, k, decoder(v))
441                 self.keys.append(k)
442
443     def data(self):
444         """Return an iterator for the data blocks that make up a file."""
445
446         # This traverses the list of blocks that make up a file, following
447         # indirect references.  It is implemented in much the same way as
448         # read_metadata, so see that function for details of the technique.
449
450         objects = self.fields['data'].split()
451         objects.reverse()
452         stack = [objects]
453
454         def follow_ref(refstr):
455             if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
456             objects = self.object_store.get(refstr).split()
457             objects.reverse()
458             stack.append(objects)
459
460         while len(stack) > 0:
461             top = stack[-1]
462             if len(top) == 0:
463                 stack.pop()
464                 continue
465             ref = top.pop()
466
467             # An indirect reference which we must follow?
468             if len(ref) > 0 and ref[0] == '@':
469                 follow_ref(ref[1:])
470             else:
471                 yield ref
472
473 # Description of fields that might appear, and how they should be parsed.
474 MetadataItem.field_types = {
475     'name': MetadataItem.decode_str,
476     'type': MetadataItem.raw_str,
477     'mode': MetadataItem.decode_int,
478     'device': MetadataItem.decode_device,
479     'user': MetadataItem.decode_user,
480     'group': MetadataItem.decode_user,
481     'ctime': MetadataItem.decode_int,
482     'mtime': MetadataItem.decode_int,
483     'links': MetadataItem.decode_int,
484     'inode': MetadataItem.raw_str,
485     'checksum': MetadataItem.decode_str,
486     'size': MetadataItem.decode_int,
487     'contents': MetadataItem.decode_str,
488     'target': MetadataItem.decode_str,
489 }
490
491 def iterate_metadata(object_store, root):
492     for d in parse(read_metadata(object_store, root), lambda l: len(l) == 0):
493         yield MetadataItem(d, object_store)
494
495 class LocalDatabase:
496     """Access to the local database of snapshot contents and object checksums.
497
498     The local database is consulted when creating a snapshot to determine what
499     data can be re-used from old snapshots.  Segment cleaning is performed by
500     manipulating the data in the local database; the local database also
501     includes enough data to guide the segment cleaning process.
502     """
503
504     def __init__(self, path, dbname="localdb.sqlite"):
505         self.db_connection = sqlite3.connect(path + "/" + dbname)
506
507     # Low-level database access.  Use these methods when there isn't a
508     # higher-level interface available.  Exception: do, however, remember to
509     # use the commit() method after making changes to make sure they are
510     # actually saved, even when going through higher-level interfaces.
511     def commit(self):
512         "Commit any pending changes to the local database."
513         self.db_connection.commit()
514
515     def rollback(self):
516         "Roll back any pending changes to the local database."
517         self.db_connection.rollback()
518
519     def cursor(self):
520         "Return a DB-API cursor for directly accessing the local database."
521         return self.db_connection.cursor()
522
523     def list_schemes(self):
524         """Return the list of snapshots found in the local database.
525
526         The returned value is a list of tuples (id, scheme, name, time, intent).
527         """
528
529         cur = self.cursor()
530         cur.execute("select distinct scheme from snapshots")
531         schemes = [row[0] for row in cur.fetchall()]
532         schemes.sort()
533         return schemes
534
535     def list_snapshots(self, scheme):
536         """Return a list of snapshots for the given scheme."""
537         cur = self.cursor()
538         cur.execute("select name from snapshots")
539         snapshots = [row[0] for row in cur.fetchall()]
540         snapshots.sort()
541         return snapshots
542
543     def delete_snapshot(self, scheme, name):
544         """Remove the specified snapshot from the database.
545
546         Warning: This does not garbage collect all dependent data in the
547         database, so it must be followed by a call to garbage_collect() to make
548         the database consistent.
549         """
550         cur = self.cursor()
551         cur.execute("delete from snapshots where scheme = ? and name = ?",
552                     (scheme, name))
553
554     def prune_old_snapshots(self, scheme, intent=1.0):
555         """Delete entries from old snapshots from the database.
556
557         Only snapshots with the specified scheme name will be deleted.  If
558         intent is given, it gives the intended next snapshot type, to determine
559         how aggressively to clean (for example, intent=7 could be used if the
560         next snapshot will be a weekly snapshot).
561         """
562
563         cur = self.cursor()
564
565         # Find the id of the last snapshot to be created.  This is used for
566         # measuring time in a way: we record this value in each segment we
567         # expire on this run, and then on a future run can tell if there have
568         # been intervening backups made.
569         cur.execute("select max(snapshotid) from snapshots")
570         last_snapshotid = cur.fetchone()[0]
571
572         # Get the list of old snapshots for this scheme.  Delete all the old
573         # ones.  Rules for what to keep:
574         #   - Always keep the most recent snapshot.
575         #   - If snapshot X is younger than Y, and X has higher intent, then Y
576         #     can be deleted.
577         cur.execute("""select snapshotid, name, intent,
578                               julianday('now') - timestamp as age
579                        from snapshots where scheme = ?
580                        order by age""", (scheme,))
581
582         first = True
583         max_intent = intent
584         for (id, name, snap_intent, snap_age) in cur.fetchall():
585             can_delete = False
586             if snap_intent < max_intent:
587                 # Delete small-intent snapshots if there is a more recent
588                 # large-intent snapshot.
589                 can_delete = True
590             elif snap_intent == intent:
591                 # Delete previous snapshots with the specified intent level.
592                 can_delete = True
593
594             if can_delete and not first:
595                 print "Delete snapshot %d (%s)" % (id, name)
596                 cur.execute("delete from snapshots where snapshotid = ?",
597                             (id,))
598             first = False
599             max_intent = max(max_intent, snap_intent)
600
601         self.garbage_collect()
602
603     def garbage_collect(self):
604         """Garbage-collect unreachable segment and object data.
605
606         Remove all segments and checksums which is not reachable from the
607         current set of snapshots stored in the local database.
608         """
609         cur = self.cursor()
610
611         # Delete entries in the segments_used table which are for non-existent
612         # snapshots.
613         cur.execute("""delete from segments_used
614                        where snapshotid not in
615                            (select snapshotid from snapshots)""")
616
617         # Find segments which contain no objects used by any current snapshots,
618         # and delete them from the segment table.
619         cur.execute("""delete from segments where segmentid not in
620                            (select segmentid from segments_used)""")
621
622         # Delete dangling objects in the block_index table.
623         cur.execute("""delete from block_index
624                        where segmentid not in
625                            (select segmentid from segments)""")
626
627         # Remove sub-block signatures for deleted objects.
628         cur.execute("""delete from subblock_signatures
629                        where blockid not in
630                            (select blockid from block_index)""")
631
632     # Segment cleaning.
633     class SegmentInfo(Struct): pass
634
635     def get_segment_cleaning_list(self, age_boost=0.0):
636         """Return a list of all current segments with information for cleaning.
637
638         Return all segments which are currently known in the local database
639         (there might be other, older segments in the archive itself), and
640         return usage statistics for each to help decide which segments to
641         clean.
642
643         The returned list will be sorted by estimated cleaning benefit, with
644         segments that are best to clean at the start of the list.
645
646         If specified, the age_boost parameter (measured in days) will added to
647         the age of each segment, as a way of adjusting the benefit computation
648         before a long-lived snapshot is taken (for example, age_boost might be
649         set to 7 when cleaning prior to taking a weekly snapshot).
650         """
651
652         cur = self.cursor()
653         segments = []
654         cur.execute("""select segmentid, used, size, mtime,
655                        julianday('now') - mtime as age from segment_info
656                        where expire_time is null""")
657         for row in cur:
658             info = self.SegmentInfo()
659             info.id = row[0]
660             info.used_bytes = row[1]
661             info.size_bytes = row[2]
662             info.mtime = row[3]
663             info.age_days = row[4]
664
665             # If data is not available for whatever reason, treat it as 0.0.
666             if info.age_days is None:
667                 info.age_days = 0.0
668             if info.used_bytes is None:
669                 info.used_bytes = 0.0
670
671             # Benefit calculation: u is the estimated fraction of each segment
672             # which is utilized (bytes belonging to objects still in use
673             # divided by total size; this doesn't take compression or storage
674             # overhead into account, but should give a reasonable estimate).
675             #
676             # The total benefit is a heuristic that combines several factors:
677             # the amount of space that can be reclaimed (1 - u), an ageing
678             # factor (info.age_days) that favors cleaning old segments to young
679             # ones and also is more likely to clean segments that will be
680             # rewritten for long-lived snapshots (age_boost), and finally a
681             # penalty factor for the cost of re-uploading data (u + 0.1).
682             u = info.used_bytes / info.size_bytes
683             info.cleaning_benefit \
684                 = (1 - u) * (info.age_days + age_boost) / (u + 0.1)
685
686             segments.append(info)
687
688         segments.sort(cmp, key=lambda s: s.cleaning_benefit, reverse=True)
689         return segments
690
691     def mark_segment_expired(self, segment):
692         """Mark a segment for cleaning in the local database.
693
694         The segment parameter should be either a SegmentInfo object or an
695         integer segment id.  Objects in the given segment will be marked as
696         expired, which means that any future snapshots that would re-use those
697         objects will instead write out a new copy of the object, and thus no
698         future snapshots will depend upon the given segment.
699         """
700
701         if isinstance(segment, int):
702             id = segment
703         elif isinstance(segment, self.SegmentInfo):
704             id = segment.id
705         else:
706             raise TypeError("Invalid segment: %s, must be of type int or SegmentInfo, not %s" % (segment, type(segment)))
707
708         cur = self.cursor()
709         cur.execute("select max(snapshotid) from snapshots")
710         last_snapshotid = cur.fetchone()[0]
711         cur.execute("update segments set expire_time = ? where segmentid = ?",
712                     (last_snapshotid, id))
713         cur.execute("update block_index set expired = 0 where segmentid = ?",
714                     (id,))
715
716     def balance_expired_objects(self):
717         """Analyze expired objects in segments to be cleaned and group by age.
718
719         Update the block_index table of the local database to group expired
720         objects by age.  The exact number of buckets and the cutoffs for each
721         are dynamically determined.  Calling this function after marking
722         segments expired will help in the segment cleaning process, by ensuring
723         that when active objects from clean segments are rewritten, they will
724         be placed into new segments roughly grouped by age.
725         """
726
727         # The expired column of the block_index table is used when generating a
728         # new Cumulus snapshot.  A null value indicates that an object may be
729         # re-used.  Otherwise, an object must be written into a new segment if
730         # needed.  Objects with distinct expired values will be written into
731         # distinct segments, to allow for some grouping by age.  The value 0 is
732         # somewhat special in that it indicates any rewritten objects can be
733         # placed in the same segment as completely new objects; this can be
734         # used for very young objects which have been expired, or objects not
735         # expected to be encountered.
736         #
737         # In the balancing process, all objects which are not used in any
738         # current snapshots will have expired set to 0.  Objects which have
739         # been seen will be sorted by age and will have expired values set to
740         # 0, 1, 2, and so on based on age (with younger objects being assigned
741         # lower values).  The number of buckets and the age cutoffs is
742         # determined by looking at the distribution of block ages.
743
744         cur = self.cursor()
745
746         # Mark all expired objects with expired = 0; these objects will later
747         # have values set to indicate groupings of objects when repacking.
748         cur.execute("""update block_index set expired = 0
749                        where expired is not null""")
750
751         # We will want to aim for at least one full segment for each bucket
752         # that we eventually create, but don't know how many bytes that should
753         # be due to compression.  So compute the average number of bytes in
754         # each expired segment as a rough estimate for the minimum size of each
755         # bucket.  (This estimate could be thrown off by many not-fully-packed
756         # segments, but for now don't worry too much about that.)  If we can't
757         # compute an average, it's probably because there are no expired
758         # segments, so we have no more work to do.
759         cur.execute("""select avg(size) from segments
760                        where segmentid in
761                            (select distinct segmentid from block_index
762                             where expired is not null)""")
763         segment_size_estimate = cur.fetchone()[0]
764         if not segment_size_estimate:
765             return
766
767         # Next, extract distribution of expired objects (number and size) by
768         # age.  Save the timestamp for "now" so that the classification of
769         # blocks into age buckets will not change later in the function, after
770         # time has passed.  Set any timestamps in the future to now, so we are
771         # guaranteed that for the rest of this function, age is always
772         # non-negative.
773         cur.execute("select julianday('now')")
774         now = cur.fetchone()[0]
775
776         cur.execute("""update block_index set timestamp = ?
777                        where timestamp > ? and expired is not null""",
778                     (now, now))
779
780         cur.execute("""select round(? - timestamp) as age, count(*), sum(size)
781                        from block_index where expired = 0
782                        group by age order by age""", (now,))
783         distribution = cur.fetchall()
784
785         # Start to determine the buckets for expired objects.  Heuristics used:
786         #   - An upper bound on the number of buckets is given by the number of
787         #     segments we estimate it will take to store all data.  In fact,
788         #     aim for a couple of segments per bucket.
789         #   - Place very young objects in bucket 0 (place with new objects)
790         #     unless there are enough of them to warrant a separate bucket.
791         #   - Try not to create unnecessarily many buckets, since fewer buckets
792         #     will allow repacked data to be grouped based on spatial locality
793         #     (while more buckets will group by temporal locality).  We want a
794         #     balance.
795         MIN_AGE = 4
796         total_bytes = sum([i[2] for i in distribution])
797         target_buckets = 2 * (total_bytes / segment_size_estimate) ** 0.4
798         min_size = 1.5 * segment_size_estimate
799         target_size = max(2 * segment_size_estimate,
800                           total_bytes / target_buckets)
801
802         print "segment_size:", segment_size_estimate
803         print "distribution:", distribution
804         print "total_bytes:", total_bytes
805         print "target_buckets:", target_buckets
806         print "min, target size:", min_size, target_size
807
808         # Chosen cutoffs.  Each bucket consists of objects with age greater
809         # than one cutoff value, but not greater than the next largest cutoff.
810         cutoffs = []
811
812         # Starting with the oldest objects, begin grouping together into
813         # buckets of size at least target_size bytes.
814         distribution.reverse()
815         bucket_size = 0
816         min_age_bucket = False
817         for (age, items, size) in distribution:
818             if bucket_size >= target_size \
819                 or (age < MIN_AGE and not min_age_bucket):
820                 if bucket_size < target_size and len(cutoffs) > 0:
821                     cutoffs.pop()
822                 cutoffs.append(age)
823                 bucket_size = 0
824
825             bucket_size += size
826             if age < MIN_AGE:
827                 min_age_bucket = True
828
829         # The last (youngest) bucket will be group 0, unless it has enough data
830         # to be of size min_size by itself, or there happen to be no objects
831         # less than MIN_AGE at all.
832         if bucket_size >= min_size or not min_age_bucket:
833             cutoffs.append(-1)
834         cutoffs.append(-1)
835
836         print "cutoffs:", cutoffs
837
838         # Update the database to assign each object to the appropriate bucket.
839         cutoffs.reverse()
840         for i in range(len(cutoffs)):
841             cur.execute("""update block_index set expired = ?
842                            where round(? - timestamp) > ?
843                              and expired is not null""",
844                         (i, now, cutoffs[i]))