Add code to rebuild_database.py to recompute segment metadata.
[cumulus.git] / python / cumulus / __init__.py
1 # Cumulus: Efficient Filesystem Backup to the Cloud
2 # Copyright (C) 2008-2009, 2012 The Cumulus Developers
3 # See the AUTHORS file for a list of contributors.
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License along
16 # with this program; if not, write to the Free Software Foundation, Inc.,
17 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18
19 """High-level interface for working with Cumulus archives.
20
21 This module provides an easy interface for reading from and manipulating
22 various parts of a Cumulus archive:
23   - listing the snapshots and segments present
24   - reading segment contents
25   - parsing snapshot descriptors and snapshot metadata logs
26   - reading and maintaining the local object database
27 """
28
29 from __future__ import division
30 import hashlib
31 import itertools
32 import os
33 import re
34 import sqlite3
35 import tarfile
36 import tempfile
37 import thread
38
39 import cumulus.store
40 import cumulus.store.file
41
42 # The largest supported snapshot format that can be understood.
43 FORMAT_VERSION = (0, 11)        # Cumulus Snapshot v0.11
44
45 # Maximum number of nested indirect references allowed in a snapshot.
46 MAX_RECURSION_DEPTH = 3
47
48 # All segments which have been accessed this session.
49 accessed_segments = set()
50
51 # Table of methods used to filter segments before storage, and corresponding
52 # filename extensions.  These are listed in priority order (methods earlier in
53 # the list are tried first).
54 SEGMENT_FILTERS = [
55     (".gpg", "cumulus-filter-gpg --decrypt"),
56     (".gz", "gzip -dc"),
57     (".bz2", "bzip2 -dc"),
58     ("", None),
59 ]
60
61 def uri_decode(s):
62     """Decode a URI-encoded (%xx escapes) string."""
63     def hex_decode(m): return chr(int(m.group(1), 16))
64     return re.sub(r"%([0-9a-f]{2})", hex_decode, s)
65 def uri_encode(s):
66     """Encode a string to URI-encoded (%xx escapes) form."""
67     def hex_encode(c):
68         if c > '+' and c < '\x7f' and c != '@':
69             return c
70         else:
71             return "%%%02x" % (ord(c),)
72     return ''.join(hex_encode(c) for c in s)
73
74 class Struct:
75     """A class which merely acts as a data container.
76
77     Instances of this class (or its subclasses) are merely used to store data
78     in various attributes.  No methods are provided.
79     """
80
81     def __repr__(self):
82         return "<%s %s>" % (self.__class__, self.__dict__)
83
84 CHECKSUM_ALGORITHMS = {
85     'sha1': hashlib.sha1,
86     'sha224': hashlib.sha224,
87     'sha256': hashlib.sha256,
88 }
89
90 class ChecksumCreator:
91     """Compute a Cumulus checksum for provided data.
92
93     The algorithm used is selectable, but currently defaults to sha1.
94     """
95
96     def __init__(self, algorithm='sha1'):
97         self.algorithm = algorithm
98         self.hash = CHECKSUM_ALGORITHMS[algorithm]()
99
100     def update(self, data):
101         self.hash.update(data)
102         return self
103
104     def compute(self):
105         return "%s=%s" % (self.algorithm, self.hash.hexdigest())
106
107 class ChecksumVerifier:
108     """Verify whether a checksum from a snapshot matches the supplied data."""
109
110     def __init__(self, checksumstr):
111         """Create an object to check the supplied checksum."""
112
113         (algo, checksum) = checksumstr.split("=", 1)
114         self.checksum = checksum
115         self.hash = CHECKSUM_ALGORITHMS[algo]()
116
117     def update(self, data):
118         self.hash.update(data)
119
120     def valid(self):
121         """Return a boolean indicating whether the checksum matches."""
122
123         result = self.hash.hexdigest()
124         return result == self.checksum
125
126 class SearchPathEntry(object):
127     """Item representing a possible search location for Cumulus files.
128
129     Some Cumulus files might be stored in multiple possible file locations: due
130     to format (different compression mechanisms with different extensions),
131     locality (different segments might be placed in different directories to
132     control archiving policies), for backwards compatibility (default location
133     changed over time).  A SearchPathEntry describes a possible location for a
134     file.
135     """
136     def __init__(self, directory_prefix, suffix, context=None):
137         self._directory_prefix = directory_prefix
138         self._suffix = suffix
139         self._context = context
140
141     def __repr__(self):
142         return "%s(%r, %r, %r)" % (self.__class__.__name__,
143                                    self._directory_prefix, self._suffix,
144                                    self._context)
145
146     def build_path(self, basename):
147         """Construct the search path to use for a file with name basename.
148
149         Returns a tuple (pathname, context), where pathname is the path to try
150         and context is any additional data associated with this search entry
151         (if any).
152         """
153         return (os.path.join(self._directory_prefix, basename + self._suffix),
154                 self._context)
155
156 class SearchPath(object):
157     """A collection of locations to search for files and lookup utilities.
158
159     For looking for a file in a Cumulus storage backend, a SearchPath object
160     contains a list of possible locations to try.  A SearchPath can be used to
161     perform the search as well; when a file is found the search path ordering
162     is updated (moving the successful SearchPathEntry to the front of the list
163     for future searches).
164     """
165     def __init__(self, name_regex, searchpath):
166         self._regex = re.compile(name_regex)
167         self._path = list(searchpath)
168
169     def add_search_entry(self, entry):
170         self._path.append(entry)
171
172     def directories(self):
173         """Return the set of directories to search for a file type."""
174         return set(entry._directory_prefix for entry in self._path)
175
176     def get(self, backend, basename):
177         for (i, entry) in enumerate(self._path):
178             try:
179                 (pathname, context) = entry.build_path(basename)
180                 fp = backend.get(pathname)
181                 # On success, move this entry to the front of the search path
182                 # to speed future searches.
183                 if i > 0:
184                     self._path.pop(i)
185                     self._path.insert(0, entry)
186                 return (fp, pathname, context)
187             except cumulus.store.NotFoundError:
188                 continue
189         raise cumulus.store.NotFoundError(basename)
190
191     def stat(self, backend, basename):
192         for (i, entry) in enumerate(self._path):
193             try:
194                 (pathname, context) = entry.build_path(basename)
195                 stat_data = backend.stat(pathname)
196                 # On success, move this entry to the front of the search path
197                 # to speed future searches.
198                 if i > 0:
199                     self._path.pop(i)
200                     self._path.insert(0, entry)
201                 result = {"path": pathname}
202                 result.update(stat_data)
203                 return result
204             except cumulus.store.NotFoundError:
205                 continue
206         raise cumulus.store.NotFoundError(basename)
207
208     def match(self, filename):
209         return self._regex.match(filename)
210
211     def list(self, backend):
212         success = False
213         for d in self.directories():
214             try:
215                 for f in backend.list(d):
216                     success = True
217                     m = self.match(f)
218                     if m: yield (os.path.join(d, f), m)
219             except cumulus.store.NotFoundError:
220                 pass
221         if not success:
222             raise cumulus.store.NotFoundError(basename)
223
224 def _build_segments_searchpath(prefix):
225     for (extension, filter) in SEGMENT_FILTERS:
226         yield SearchPathEntry(prefix, extension, filter)
227
228 SEARCH_PATHS = {
229     "checksums": SearchPath(
230         r"^snapshot-(.*)\.(\w+)sums$",
231         [SearchPathEntry("meta", ".sha1sums"),
232          SearchPathEntry("checksums", ".sha1sums"),
233          SearchPathEntry("", ".sha1sums")]),
234     "segments": SearchPath(
235         (r"^([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})"
236          r"\.tar(\.\S+)?$"),
237         itertools.chain(
238             _build_segments_searchpath("segments0"),
239             _build_segments_searchpath("segments1"),
240             _build_segments_searchpath(""),
241             _build_segments_searchpath("segments"))),
242     "snapshots": SearchPath(
243         r"^snapshot-(.*)\.(cumulus|lbs)$",
244         [SearchPathEntry("snapshots", ".cumulus"),
245          SearchPathEntry("snapshots", ".lbs"),
246          SearchPathEntry("", ".cumulus"),
247          SearchPathEntry("", ".lbs")]),
248 }
249
250 class BackendWrapper(object):
251     """Wrapper around a Cumulus storage backend that understands file types.
252
253     The BackendWrapper class understands different Cumulus file types, such as
254     snapshots and segments, and implements higher-level operations such as
255     "retrieve a snapshot with a specific name" (hiding operations such as
256     searching for the correct file name).
257     """
258
259     def __init__(self, backend):
260         """Initializes a wrapper around the specified storage backend.
261
262         store may either be a Store object or URL.
263         """
264         if type(backend) in (str, unicode):
265             if backend.find(":") >= 0:
266                 self._backend = cumulus.store.open(backend)
267             else:
268                 self._backend = cumulus.store.file.FileStore(backend)
269         else:
270             self._backend = backend
271
272     @property
273     def raw_backend(self):
274         return self._backend
275
276     def stat_generic(self, basename, filetype):
277         return SEARCH_PATHS[filetype].stat(self._backend, basename)
278
279     def open_generic(self, basename, filetype):
280         return SEARCH_PATHS[filetype].get(self._backend, basename)
281
282     def open_snapshot(self, name):
283         return self.open_generic("snapshot-" + name, "snapshots")
284
285     def open_segment(self, name):
286         return self.open_generic(name + ".tar", "segments")
287
288     def list_generic(self, filetype):
289         return ((x[1].group(1), x[0])
290                 for x in SEARCH_PATHS[filetype].list(self._backend))
291
292 class CumulusStore:
293     def __init__(self, backend):
294         if isinstance(backend, BackendWrapper):
295             self.backend = backend
296         else:
297             self.backend = BackendWrapper(backend)
298         self.cachedir = None
299         self.CACHE_SIZE = 16
300         self._lru_list = []
301
302     def get_cachedir(self):
303         if self.cachedir is None:
304             self.cachedir = tempfile.mkdtemp("-cumulus")
305         return self.cachedir
306
307     def cleanup(self):
308         if self.cachedir is not None:
309             # TODO: Avoid use of system, make this safer
310             os.system("rm -rf " + self.cachedir)
311         self.cachedir = None
312
313     @staticmethod
314     def parse_ref(refstr):
315         m = re.match(r"^zero\[(\d+)\]$", refstr)
316         if m:
317             return ("zero", None, None, (0, int(m.group(1)), False))
318
319         m = re.match(r"^([-0-9a-f]+)\/([0-9a-f]+)(\(\S+\))?(\[(((\d+)\+)?(\d+)|=(\d+))\])?$", refstr)
320         if not m: return
321
322         segment = m.group(1)
323         object = m.group(2)
324         checksum = m.group(3)
325         slice = m.group(4)
326
327         if checksum is not None:
328             checksum = checksum.lstrip("(").rstrip(")")
329
330         if slice is not None:
331             if m.group(9) is not None:
332                 # Size-assertion slice
333                 slice = (0, int(m.group(9)), True)
334             elif m.group(6) is None:
335                 # Abbreviated slice
336                 slice = (0, int(m.group(8)), False)
337             else:
338                 slice = (int(m.group(7)), int(m.group(8)), False)
339
340         return (segment, object, checksum, slice)
341
342     def list_snapshots(self):
343         return set(x[0] for x in self.backend.list_generic("snapshots"))
344
345     def list_segments(self):
346         return set(x[0] for x in self.backend.list_generic("segments"))
347
348     def load_snapshot(self, snapshot):
349         snapshot_file = self.backend.open_snapshot(snapshot)[0]
350         return snapshot_file.read().splitlines(True)
351
352     @staticmethod
353     def filter_data(filehandle, filter_cmd):
354         if filter_cmd is None:
355             return filehandle
356         (input, output) = os.popen2(filter_cmd)
357         def copy_thread(src, dst):
358             BLOCK_SIZE = 4096
359             while True:
360                 block = src.read(BLOCK_SIZE)
361                 if len(block) == 0: break
362                 dst.write(block)
363             src.close()
364             dst.close()
365         thread.start_new_thread(copy_thread, (filehandle, input))
366         return output
367
368     def get_segment(self, segment):
369         accessed_segments.add(segment)
370
371         (segment_fp, path, filter_cmd) = self.backend.open_segment(segment)
372         return self.filter_data(segment_fp, filter_cmd)
373
374     def load_segment(self, segment):
375         seg = tarfile.open(segment, 'r|', self.get_segment(segment))
376         for item in seg:
377             data_obj = seg.extractfile(item)
378             path = item.name.split('/')
379             if len(path) == 2 and path[0] == segment:
380                 yield (path[1], data_obj.read())
381
382     def extract_segment(self, segment):
383         segdir = os.path.join(self.get_cachedir(), segment)
384         os.mkdir(segdir)
385         for (object, data) in self.load_segment(segment):
386             f = open(os.path.join(segdir, object), 'wb')
387             f.write(data)
388             f.close()
389
390     def load_object(self, segment, object):
391         accessed_segments.add(segment)
392         path = os.path.join(self.get_cachedir(), segment, object)
393         if not os.access(path, os.R_OK):
394             self.extract_segment(segment)
395         if segment in self._lru_list: self._lru_list.remove(segment)
396         self._lru_list.append(segment)
397         while len(self._lru_list) > self.CACHE_SIZE:
398             os.system("rm -rf " + os.path.join(self.cachedir,
399                                                self._lru_list[0]))
400             self._lru_list = self._lru_list[1:]
401         return open(path, 'rb').read()
402
403     def get(self, refstr):
404         """Fetch the given object and return it.
405
406         The input should be an object reference, in string form.
407         """
408
409         (segment, object, checksum, slice) = self.parse_ref(refstr)
410
411         if segment == "zero":
412             return "\0" * slice[1]
413
414         data = self.load_object(segment, object)
415
416         if checksum is not None:
417             verifier = ChecksumVerifier(checksum)
418             verifier.update(data)
419             if not verifier.valid():
420                 raise ValueError
421
422         if slice is not None:
423             (start, length, exact) = slice
424             if exact and len(data) != length: raise ValueError
425             data = data[start:start+length]
426             if len(data) != length: raise IndexError
427
428         return data
429
430 def parse(lines, terminate=None):
431     """Generic parser for RFC822-style "Key: Value" data streams.
432
433     This parser can be used to read metadata logs and snapshot root descriptor
434     files.
435
436     lines must be an iterable object which yields a sequence of lines of input.
437
438     If terminate is specified, it is used as a predicate to determine when to
439     stop reading input lines.
440     """
441
442     dict = {}
443     last_key = None
444
445     for l in lines:
446         # Strip off a trailing newline, if present
447         if len(l) > 0 and l[-1] == "\n":
448             l = l[:-1]
449
450         if terminate is not None and terminate(l):
451             if len(dict) > 0: yield dict
452             dict = {}
453             last_key = None
454             continue
455
456         m = re.match(r"^([-\w]+):\s*(.*)$", l)
457         if m:
458             dict[m.group(1)] = m.group(2)
459             last_key = m.group(1)
460         elif len(l) > 0 and l[0].isspace() and last_key is not None:
461             dict[last_key] += l
462         else:
463             last_key = None
464
465     if len(dict) > 0: yield dict
466
467 def parse_full(lines):
468     try:
469         return parse(lines).next()
470     except StopIteration:
471         return {}
472
473 def parse_metadata_version(s):
474     """Convert a string with the snapshot version format to a tuple."""
475
476     m = re.match(r"^(?:Cumulus|LBS) Snapshot v(\d+(\.\d+)*)$", s)
477     if m is None:
478         return ()
479     else:
480         return tuple([int(d) for d in m.group(1).split(".")])
481
482 def read_metadata(object_store, root):
483     """Iterate through all lines in the metadata log, following references."""
484
485     # Stack for keeping track of recursion when following references to
486     # portions of the log.  The last entry in the stack corresponds to the
487     # object currently being parsed.  Each entry is a list of lines which have
488     # been reversed, so that popping successive lines from the end of each list
489     # will return lines of the metadata log in order.
490     stack = []
491
492     def follow_ref(refstr):
493         if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
494         lines = object_store.get(refstr).splitlines(True)
495         lines.reverse()
496         stack.append(lines)
497
498     follow_ref(root)
499
500     while len(stack) > 0:
501         top = stack[-1]
502         if len(top) == 0:
503             stack.pop()
504             continue
505         line = top.pop()
506
507         # An indirect reference which we must follow?
508         if len(line) > 0 and line[0] == '@':
509             ref = line[1:]
510             ref.strip()
511             follow_ref(ref)
512         else:
513             yield line
514
515 class MetadataItem:
516     """Metadata for a single file (or directory or...) from a snapshot."""
517
518     # Functions for parsing various datatypes that can appear in a metadata log
519     # item.
520     @staticmethod
521     def decode_int(s):
522         """Decode an integer, expressed in decimal, octal, or hexadecimal."""
523         if s.startswith("0x"):
524             return int(s, 16)
525         elif s.startswith("0"):
526             return int(s, 8)
527         else:
528             return int(s, 10)
529
530     @staticmethod
531     def decode_str(s):
532         """Decode a URI-encoded (%xx escapes) string."""
533         return uri_decode(s)
534
535     @staticmethod
536     def raw_str(s):
537         """An unecoded string."""
538         return s
539
540     @staticmethod
541     def decode_user(s):
542         """Decode a user/group to a tuple of uid/gid followed by name."""
543         items = s.split()
544         uid = MetadataItem.decode_int(items[0])
545         name = None
546         if len(items) > 1:
547             if items[1].startswith("(") and items[1].endswith(")"):
548                 name = MetadataItem.decode_str(items[1][1:-1])
549         return (uid, name)
550
551     @staticmethod
552     def decode_device(s):
553         """Decode a device major/minor number."""
554         (major, minor) = map(MetadataItem.decode_int, s.split("/"))
555         return (major, minor)
556
557     class Items: pass
558
559     def __init__(self, fields, object_store):
560         """Initialize from a dictionary of key/value pairs from metadata log."""
561
562         self.fields = fields
563         self.object_store = object_store
564         self.keys = []
565         self.items = self.Items()
566         for (k, v) in fields.items():
567             if k in self.field_types:
568                 decoder = self.field_types[k]
569                 setattr(self.items, k, decoder(v))
570                 self.keys.append(k)
571
572     def data(self):
573         """Return an iterator for the data blocks that make up a file."""
574
575         # This traverses the list of blocks that make up a file, following
576         # indirect references.  It is implemented in much the same way as
577         # read_metadata, so see that function for details of the technique.
578
579         objects = self.fields['data'].split()
580         objects.reverse()
581         stack = [objects]
582
583         def follow_ref(refstr):
584             if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
585             objects = self.object_store.get(refstr).split()
586             objects.reverse()
587             stack.append(objects)
588
589         while len(stack) > 0:
590             top = stack[-1]
591             if len(top) == 0:
592                 stack.pop()
593                 continue
594             ref = top.pop()
595
596             # An indirect reference which we must follow?
597             if len(ref) > 0 and ref[0] == '@':
598                 follow_ref(ref[1:])
599             else:
600                 yield ref
601
602 # Description of fields that might appear, and how they should be parsed.
603 MetadataItem.field_types = {
604     'name': MetadataItem.decode_str,
605     'type': MetadataItem.raw_str,
606     'mode': MetadataItem.decode_int,
607     'device': MetadataItem.decode_device,
608     'user': MetadataItem.decode_user,
609     'group': MetadataItem.decode_user,
610     'ctime': MetadataItem.decode_int,
611     'mtime': MetadataItem.decode_int,
612     'links': MetadataItem.decode_int,
613     'inode': MetadataItem.raw_str,
614     'checksum': MetadataItem.decode_str,
615     'size': MetadataItem.decode_int,
616     'contents': MetadataItem.decode_str,
617     'target': MetadataItem.decode_str,
618 }
619
620 def iterate_metadata(object_store, root):
621     for d in parse(read_metadata(object_store, root), lambda l: len(l) == 0):
622         yield MetadataItem(d, object_store)
623
624 class LocalDatabase:
625     """Access to the local database of snapshot contents and object checksums.
626
627     The local database is consulted when creating a snapshot to determine what
628     data can be re-used from old snapshots.  Segment cleaning is performed by
629     manipulating the data in the local database; the local database also
630     includes enough data to guide the segment cleaning process.
631     """
632
633     def __init__(self, path, dbname="localdb.sqlite"):
634         self.db_connection = sqlite3.connect(path + "/" + dbname)
635
636     # Low-level database access.  Use these methods when there isn't a
637     # higher-level interface available.  Exception: do, however, remember to
638     # use the commit() method after making changes to make sure they are
639     # actually saved, even when going through higher-level interfaces.
640     def commit(self):
641         "Commit any pending changes to the local database."
642         self.db_connection.commit()
643
644     def rollback(self):
645         "Roll back any pending changes to the local database."
646         self.db_connection.rollback()
647
648     def cursor(self):
649         "Return a DB-API cursor for directly accessing the local database."
650         return self.db_connection.cursor()
651
652     def list_schemes(self):
653         """Return the list of snapshots found in the local database.
654
655         The returned value is a list of tuples (id, scheme, name, time, intent).
656         """
657
658         cur = self.cursor()
659         cur.execute("select distinct scheme from snapshots")
660         schemes = [row[0] for row in cur.fetchall()]
661         schemes.sort()
662         return schemes
663
664     def list_snapshots(self, scheme):
665         """Return a list of snapshots for the given scheme."""
666         cur = self.cursor()
667         cur.execute("select name from snapshots")
668         snapshots = [row[0] for row in cur.fetchall()]
669         snapshots.sort()
670         return snapshots
671
672     def delete_snapshot(self, scheme, name):
673         """Remove the specified snapshot from the database.
674
675         Warning: This does not garbage collect all dependent data in the
676         database, so it must be followed by a call to garbage_collect() to make
677         the database consistent.
678         """
679         cur = self.cursor()
680         cur.execute("delete from snapshots where scheme = ? and name = ?",
681                     (scheme, name))
682
683     def prune_old_snapshots(self, scheme, intent=1.0):
684         """Delete entries from old snapshots from the database.
685
686         Only snapshots with the specified scheme name will be deleted.  If
687         intent is given, it gives the intended next snapshot type, to determine
688         how aggressively to clean (for example, intent=7 could be used if the
689         next snapshot will be a weekly snapshot).
690         """
691
692         cur = self.cursor()
693
694         # Find the id of the last snapshot to be created.  This is used for
695         # measuring time in a way: we record this value in each segment we
696         # expire on this run, and then on a future run can tell if there have
697         # been intervening backups made.
698         cur.execute("select max(snapshotid) from snapshots")
699         last_snapshotid = cur.fetchone()[0]
700
701         # Get the list of old snapshots for this scheme.  Delete all the old
702         # ones.  Rules for what to keep:
703         #   - Always keep the most recent snapshot.
704         #   - If snapshot X is younger than Y, and X has higher intent, then Y
705         #     can be deleted.
706         cur.execute("""select snapshotid, name, intent,
707                               julianday('now') - timestamp as age
708                        from snapshots where scheme = ?
709                        order by age""", (scheme,))
710
711         first = True
712         max_intent = intent
713         for (id, name, snap_intent, snap_age) in cur.fetchall():
714             can_delete = False
715             if snap_intent < max_intent:
716                 # Delete small-intent snapshots if there is a more recent
717                 # large-intent snapshot.
718                 can_delete = True
719             elif snap_intent == intent:
720                 # Delete previous snapshots with the specified intent level.
721                 can_delete = True
722
723             if can_delete and not first:
724                 print "Delete snapshot %d (%s)" % (id, name)
725                 cur.execute("delete from snapshots where snapshotid = ?",
726                             (id,))
727             first = False
728             max_intent = max(max_intent, snap_intent)
729
730         self.garbage_collect()
731
732     def garbage_collect(self):
733         """Garbage-collect unreachable segment and object data.
734
735         Remove all segments and checksums which is not reachable from the
736         current set of snapshots stored in the local database.
737         """
738         cur = self.cursor()
739
740         # Delete entries in the segment_utilization table which are for
741         # non-existent snapshots.
742         cur.execute("""delete from segment_utilization
743                        where snapshotid not in
744                            (select snapshotid from snapshots)""")
745
746         # Delete segments not referenced by any current snapshots.
747         cur.execute("""delete from segments where segmentid not in
748                            (select segmentid from segment_utilization)""")
749
750         # Delete dangling objects in the block_index table.
751         cur.execute("""delete from block_index
752                        where segmentid not in
753                            (select segmentid from segments)""")
754
755         # Remove sub-block signatures for deleted objects.
756         cur.execute("""delete from subblock_signatures
757                        where blockid not in
758                            (select blockid from block_index)""")
759
760     # Segment cleaning.
761     class SegmentInfo(Struct): pass
762
763     def get_segment_cleaning_list(self, age_boost=0.0):
764         """Return a list of all current segments with information for cleaning.
765
766         Return all segments which are currently known in the local database
767         (there might be other, older segments in the archive itself), and
768         return usage statistics for each to help decide which segments to
769         clean.
770
771         The returned list will be sorted by estimated cleaning benefit, with
772         segments that are best to clean at the start of the list.
773
774         If specified, the age_boost parameter (measured in days) will added to
775         the age of each segment, as a way of adjusting the benefit computation
776         before a long-lived snapshot is taken (for example, age_boost might be
777         set to 7 when cleaning prior to taking a weekly snapshot).
778         """
779
780         cur = self.cursor()
781         segments = []
782         cur.execute("""select segmentid, used, size, mtime,
783                        julianday('now') - mtime as age from segment_info
784                        where expire_time is null""")
785         for row in cur:
786             info = self.SegmentInfo()
787             info.id = row[0]
788             info.used_bytes = row[1]
789             info.size_bytes = row[2]
790             info.mtime = row[3]
791             info.age_days = row[4]
792
793             # If data is not available for whatever reason, treat it as 0.0.
794             if info.age_days is None:
795                 info.age_days = 0.0
796             if info.used_bytes is None:
797                 info.used_bytes = 0.0
798
799             # Benefit calculation: u is the estimated fraction of each segment
800             # which is utilized (bytes belonging to objects still in use
801             # divided by total size; this doesn't take compression or storage
802             # overhead into account, but should give a reasonable estimate).
803             #
804             # The total benefit is a heuristic that combines several factors:
805             # the amount of space that can be reclaimed (1 - u), an ageing
806             # factor (info.age_days) that favors cleaning old segments to young
807             # ones and also is more likely to clean segments that will be
808             # rewritten for long-lived snapshots (age_boost), and finally a
809             # penalty factor for the cost of re-uploading data (u + 0.1).
810             u = info.used_bytes / info.size_bytes
811             info.cleaning_benefit \
812                 = (1 - u) * (info.age_days + age_boost) / (u + 0.1)
813
814             segments.append(info)
815
816         segments.sort(cmp, key=lambda s: s.cleaning_benefit, reverse=True)
817         return segments
818
819     def mark_segment_expired(self, segment):
820         """Mark a segment for cleaning in the local database.
821
822         The segment parameter should be either a SegmentInfo object or an
823         integer segment id.  Objects in the given segment will be marked as
824         expired, which means that any future snapshots that would re-use those
825         objects will instead write out a new copy of the object, and thus no
826         future snapshots will depend upon the given segment.
827         """
828
829         if isinstance(segment, int):
830             id = segment
831         elif isinstance(segment, self.SegmentInfo):
832             id = segment.id
833         else:
834             raise TypeError("Invalid segment: %s, must be of type int or SegmentInfo, not %s" % (segment, type(segment)))
835
836         cur = self.cursor()
837         cur.execute("select max(snapshotid) from snapshots")
838         last_snapshotid = cur.fetchone()[0]
839         cur.execute("update segments set expire_time = ? where segmentid = ?",
840                     (last_snapshotid, id))
841         cur.execute("update block_index set expired = 0 where segmentid = ?",
842                     (id,))
843
844     def balance_expired_objects(self):
845         """Analyze expired objects in segments to be cleaned and group by age.
846
847         Update the block_index table of the local database to group expired
848         objects by age.  The exact number of buckets and the cutoffs for each
849         are dynamically determined.  Calling this function after marking
850         segments expired will help in the segment cleaning process, by ensuring
851         that when active objects from clean segments are rewritten, they will
852         be placed into new segments roughly grouped by age.
853         """
854
855         # The expired column of the block_index table is used when generating a
856         # new Cumulus snapshot.  A null value indicates that an object may be
857         # re-used.  Otherwise, an object must be written into a new segment if
858         # needed.  Objects with distinct expired values will be written into
859         # distinct segments, to allow for some grouping by age.  The value 0 is
860         # somewhat special in that it indicates any rewritten objects can be
861         # placed in the same segment as completely new objects; this can be
862         # used for very young objects which have been expired, or objects not
863         # expected to be encountered.
864         #
865         # In the balancing process, all objects which are not used in any
866         # current snapshots will have expired set to 0.  Objects which have
867         # been seen will be sorted by age and will have expired values set to
868         # 0, 1, 2, and so on based on age (with younger objects being assigned
869         # lower values).  The number of buckets and the age cutoffs is
870         # determined by looking at the distribution of block ages.
871
872         cur = self.cursor()
873
874         # Mark all expired objects with expired = 0; these objects will later
875         # have values set to indicate groupings of objects when repacking.
876         cur.execute("""update block_index set expired = 0
877                        where expired is not null""")
878
879         # We will want to aim for at least one full segment for each bucket
880         # that we eventually create, but don't know how many bytes that should
881         # be due to compression.  So compute the average number of bytes in
882         # each expired segment as a rough estimate for the minimum size of each
883         # bucket.  (This estimate could be thrown off by many not-fully-packed
884         # segments, but for now don't worry too much about that.)  If we can't
885         # compute an average, it's probably because there are no expired
886         # segments, so we have no more work to do.
887         cur.execute("""select avg(size) from segments
888                        where segmentid in
889                            (select distinct segmentid from block_index
890                             where expired is not null)""")
891         segment_size_estimate = cur.fetchone()[0]
892         if not segment_size_estimate:
893             return
894
895         # Next, extract distribution of expired objects (number and size) by
896         # age.  Save the timestamp for "now" so that the classification of
897         # blocks into age buckets will not change later in the function, after
898         # time has passed.  Set any timestamps in the future to now, so we are
899         # guaranteed that for the rest of this function, age is always
900         # non-negative.
901         cur.execute("select julianday('now')")
902         now = cur.fetchone()[0]
903
904         cur.execute("""update block_index set timestamp = ?
905                        where timestamp > ? and expired is not null""",
906                     (now, now))
907
908         cur.execute("""select round(? - timestamp) as age, count(*), sum(size)
909                        from block_index where expired = 0
910                        group by age order by age""", (now,))
911         distribution = cur.fetchall()
912
913         # Start to determine the buckets for expired objects.  Heuristics used:
914         #   - An upper bound on the number of buckets is given by the number of
915         #     segments we estimate it will take to store all data.  In fact,
916         #     aim for a couple of segments per bucket.
917         #   - Place very young objects in bucket 0 (place with new objects)
918         #     unless there are enough of them to warrant a separate bucket.
919         #   - Try not to create unnecessarily many buckets, since fewer buckets
920         #     will allow repacked data to be grouped based on spatial locality
921         #     (while more buckets will group by temporal locality).  We want a
922         #     balance.
923         MIN_AGE = 4
924         total_bytes = sum([i[2] for i in distribution])
925         target_buckets = 2 * (total_bytes / segment_size_estimate) ** 0.4
926         min_size = 1.5 * segment_size_estimate
927         target_size = max(2 * segment_size_estimate,
928                           total_bytes / target_buckets)
929
930         print "segment_size:", segment_size_estimate
931         print "distribution:", distribution
932         print "total_bytes:", total_bytes
933         print "target_buckets:", target_buckets
934         print "min, target size:", min_size, target_size
935
936         # Chosen cutoffs.  Each bucket consists of objects with age greater
937         # than one cutoff value, but not greater than the next largest cutoff.
938         cutoffs = []
939
940         # Starting with the oldest objects, begin grouping together into
941         # buckets of size at least target_size bytes.
942         distribution.reverse()
943         bucket_size = 0
944         min_age_bucket = False
945         for (age, items, size) in distribution:
946             if bucket_size >= target_size \
947                 or (age < MIN_AGE and not min_age_bucket):
948                 if bucket_size < target_size and len(cutoffs) > 0:
949                     cutoffs.pop()
950                 cutoffs.append(age)
951                 bucket_size = 0
952
953             bucket_size += size
954             if age < MIN_AGE:
955                 min_age_bucket = True
956
957         # The last (youngest) bucket will be group 0, unless it has enough data
958         # to be of size min_size by itself, or there happen to be no objects
959         # less than MIN_AGE at all.
960         if bucket_size >= min_size or not min_age_bucket:
961             cutoffs.append(-1)
962         cutoffs.append(-1)
963
964         print "cutoffs:", cutoffs
965
966         # Update the database to assign each object to the appropriate bucket.
967         cutoffs.reverse()
968         for i in range(len(cutoffs)):
969             cur.execute("""update block_index set expired = ?
970                            where round(? - timestamp) > ?
971                              and expired is not null""",
972                         (i, now, cutoffs[i]))