b3516583d607b30ce20a2637ca421615b6d7007f
[cumulus.git] / python / cumulus / __init__.py
1 # Cumulus: Efficient Filesystem Backup to the Cloud
2 # Copyright (C) 2008-2009, 2012 The Cumulus Developers
3 # See the AUTHORS file for a list of contributors.
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License along
16 # with this program; if not, write to the Free Software Foundation, Inc.,
17 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18
19 """High-level interface for working with Cumulus archives.
20
21 This module provides an easy interface for reading from and manipulating
22 various parts of a Cumulus archive:
23   - listing the snapshots and segments present
24   - reading segment contents
25   - parsing snapshot descriptors and snapshot metadata logs
26   - reading and maintaining the local object database
27 """
28
29 from __future__ import division, print_function, unicode_literals
30
31 import codecs
32 import hashlib
33 import itertools
34 import os
35 import posixpath
36 import re
37 import sqlite3
38 import subprocess
39 import sys
40 import tarfile
41 import tempfile
42 try:
43     import _thread
44 except ImportError:
45     import thread as _thread
46
47 import cumulus.store
48 import cumulus.store.file
49
50 if sys.version < "3":
51     StringTypes = (str, unicode)
52 else:
53     StringTypes = (str,)
54
55 # The largest supported snapshot format that can be understood.
56 FORMAT_VERSION = (0, 11)        # Cumulus Snapshot v0.11
57
58 # Maximum number of nested indirect references allowed in a snapshot.
59 MAX_RECURSION_DEPTH = 3
60
61 # All segments which have been accessed this session.
62 accessed_segments = set()
63
64 # Table of methods used to filter segments before storage, and corresponding
65 # filename extensions.  These are listed in priority order (methods earlier in
66 # the list are tried first).
67 SEGMENT_FILTERS = [
68     (".gpg", "cumulus-filter-gpg --decrypt"),
69     (".gz", "gzip -dc"),
70     (".bz2", "bzip2 -dc"),
71     ("", None),
72 ]
73
74 def to_lines(data):
75     """Decode binary data from a file into a sequence of lines.
76
77     Newline markers are retained."""
78     return list(codecs.iterdecode(data.splitlines(True), "utf-8"))
79
80 def uri_decode(s):
81     """Decode a URI-encoded (%xx escapes) string."""
82     def hex_decode(m): return chr(int(m.group(1), 16))
83     return re.sub(r"%([0-9a-f]{2})", hex_decode, s)
84 def uri_encode(s):
85     """Encode a string to URI-encoded (%xx escapes) form."""
86     def hex_encode(c):
87         if c > '+' and c < '\x7f' and c != '@':
88             return c
89         else:
90             return "%%%02x" % (ord(c),)
91     return ''.join(hex_encode(c) for c in s)
92
93 class Struct:
94     """A class which merely acts as a data container.
95
96     Instances of this class (or its subclasses) are merely used to store data
97     in various attributes.  No methods are provided.
98     """
99
100     def __repr__(self):
101         return "<%s %s>" % (self.__class__, self.__dict__)
102
103 CHECKSUM_ALGORITHMS = {
104     'sha1': hashlib.sha1,
105     'sha224': hashlib.sha224,
106     'sha256': hashlib.sha256,
107 }
108
109 class ChecksumCreator:
110     """Compute a Cumulus checksum for provided data.
111
112     The algorithm used is selectable, but currently defaults to sha1.
113     """
114
115     def __init__(self, algorithm='sha1'):
116         self.algorithm = algorithm
117         self.hash = CHECKSUM_ALGORITHMS[algorithm]()
118
119     def update(self, data):
120         self.hash.update(data)
121         return self
122
123     def compute(self):
124         return "%s=%s" % (self.algorithm, self.hash.hexdigest())
125
126 class ChecksumVerifier:
127     """Verify whether a checksum from a snapshot matches the supplied data."""
128
129     def __init__(self, checksumstr):
130         """Create an object to check the supplied checksum."""
131
132         (algo, checksum) = checksumstr.split("=", 1)
133         self.checksum = checksum
134         self.hash = CHECKSUM_ALGORITHMS[algo]()
135
136     def update(self, data):
137         self.hash.update(data)
138
139     def valid(self):
140         """Return a boolean indicating whether the checksum matches."""
141
142         result = self.hash.hexdigest()
143         return result == self.checksum
144
145 class SearchPathEntry(object):
146     """Item representing a possible search location for Cumulus files.
147
148     Some Cumulus files might be stored in multiple possible file locations: due
149     to format (different compression mechanisms with different extensions),
150     locality (different segments might be placed in different directories to
151     control archiving policies), for backwards compatibility (default location
152     changed over time).  A SearchPathEntry describes a possible location for a
153     file.
154     """
155     def __init__(self, directory_prefix, suffix, context=None):
156         self._directory_prefix = directory_prefix
157         self._suffix = suffix
158         self._context = context
159
160     def __repr__(self):
161         return "%s(%r, %r, %r)" % (self.__class__.__name__,
162                                    self._directory_prefix, self._suffix,
163                                    self._context)
164
165     def build_path(self, basename):
166         """Construct the search path to use for a file with name basename.
167
168         Returns a tuple (pathname, context), where pathname is the path to try
169         and context is any additional data associated with this search entry
170         (if any).
171         """
172         return (posixpath.join(self._directory_prefix, basename + self._suffix),
173                 self._context)
174
175 class SearchPath(object):
176     """A collection of locations to search for files and lookup utilities.
177
178     For looking for a file in a Cumulus storage backend, a SearchPath object
179     contains a list of possible locations to try.  A SearchPath can be used to
180     perform the search as well; when a file is found the search path ordering
181     is updated (moving the successful SearchPathEntry to the front of the list
182     for future searches).
183     """
184     def __init__(self, name_regex, searchpath):
185         self._regex = re.compile(name_regex)
186         self._path = list(searchpath)
187
188     def add_search_entry(self, entry):
189         self._path.append(entry)
190
191     def directories(self):
192         """Return the set of directories to search for a file type."""
193         return set(entry._directory_prefix for entry in self._path)
194
195     def get(self, backend, basename):
196         for (i, entry) in enumerate(self._path):
197             try:
198                 (pathname, context) = entry.build_path(basename)
199                 fp = backend.get(pathname)
200                 # On success, move this entry to the front of the search path
201                 # to speed future searches.
202                 if i > 0:
203                     self._path.pop(i)
204                     self._path.insert(0, entry)
205                 return (fp, pathname, context)
206             except cumulus.store.NotFoundError:
207                 continue
208         raise cumulus.store.NotFoundError(basename)
209
210     def stat(self, backend, basename):
211         for (i, entry) in enumerate(self._path):
212             try:
213                 (pathname, context) = entry.build_path(basename)
214                 stat_data = backend.stat(pathname)
215                 # On success, move this entry to the front of the search path
216                 # to speed future searches.
217                 if i > 0:
218                     self._path.pop(i)
219                     self._path.insert(0, entry)
220                 result = {"path": pathname}
221                 result.update(stat_data)
222                 return result
223             except cumulus.store.NotFoundError:
224                 continue
225         raise cumulus.store.NotFoundError(basename)
226
227     def match(self, filename):
228         return self._regex.match(filename)
229
230     def list(self, backend):
231         success = False
232         for d in self.directories():
233             try:
234                 for f in backend.list(d):
235                     success = True
236                     m = self.match(f)
237                     if m: yield (posixpath.join(d, f), m)
238             except cumulus.store.NotFoundError:
239                 pass
240         if not success:
241             raise cumulus.store.NotFoundError(backend)
242
243 def _build_segments_searchpath(prefix):
244     for (extension, filter) in SEGMENT_FILTERS:
245         yield SearchPathEntry(prefix, extension, filter)
246
247 SEARCH_PATHS = {
248     "checksums": SearchPath(
249         r"^snapshot-(.*)\.(\w+)sums$",
250         [SearchPathEntry("meta", ".sha1sums"),
251          SearchPathEntry("checksums", ".sha1sums"),
252          SearchPathEntry("", ".sha1sums")]),
253     "meta": SearchPath(
254         r"^snapshot-(.*)\.meta(\.\S+)?$",
255         _build_segments_searchpath("meta")),
256     "segments": SearchPath(
257         (r"^([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})"
258          r"\.tar(\.\S+)?$"),
259         itertools.chain(
260             _build_segments_searchpath("segments0"),
261             _build_segments_searchpath("segments1"),
262             _build_segments_searchpath(""),
263             _build_segments_searchpath("segments"))),
264     "snapshots": SearchPath(
265         r"^snapshot-(.*)\.(cumulus|lbs)$",
266         [SearchPathEntry("snapshots", ".cumulus"),
267          SearchPathEntry("snapshots", ".lbs"),
268          SearchPathEntry("", ".cumulus"),
269          SearchPathEntry("", ".lbs")]),
270 }
271
272 class BackendWrapper(object):
273     """Wrapper around a Cumulus storage backend that understands file types.
274
275     The BackendWrapper class understands different Cumulus file types, such as
276     snapshots and segments, and implements higher-level operations such as
277     "retrieve a snapshot with a specific name" (hiding operations such as
278     searching for the correct file name).
279     """
280
281     def __init__(self, backend):
282         """Initializes a wrapper around the specified storage backend.
283
284         store may either be a Store object or URL.
285         """
286         if type(backend) in StringTypes:
287             self._backend = cumulus.store.open(backend)
288         else:
289             self._backend = backend
290
291     @property
292     def raw_backend(self):
293         return self._backend
294
295     def stat_generic(self, basename, filetype):
296         return SEARCH_PATHS[filetype].stat(self._backend, basename)
297
298     def open_generic(self, basename, filetype):
299         return SEARCH_PATHS[filetype].get(self._backend, basename)
300
301     def open_snapshot(self, name):
302         return self.open_generic("snapshot-" + name, "snapshots")
303
304     def open_segment(self, name):
305         return self.open_generic(name + ".tar", "segments")
306
307     def list_generic(self, filetype):
308         return ((x[1].group(1), x[0])
309                 for x in SEARCH_PATHS[filetype].list(self._backend))
310
311     def prefetch_generic(self):
312         """Calls scan on directories to prefetch file metadata."""
313         directories = set()
314         for typeinfo in SEARCH_PATHS.values():
315             directories.update(typeinfo.directories())
316         for d in directories:
317             print("Prefetch", d)
318             self._backend.scan(d)
319
320 class CumulusStore:
321     def __init__(self, backend):
322         if isinstance(backend, BackendWrapper):
323             self.backend = backend
324         else:
325             self.backend = BackendWrapper(backend)
326         self.cachedir = None
327         self.CACHE_SIZE = 16
328         self._lru_list = []
329
330     def get_cachedir(self):
331         if self.cachedir is None:
332             self.cachedir = tempfile.mkdtemp("-cumulus")
333         return self.cachedir
334
335     def cleanup(self):
336         if self.cachedir is not None:
337             # TODO: Avoid use of system, make this safer
338             os.system("rm -rf " + self.cachedir)
339         self.cachedir = None
340
341     @staticmethod
342     def parse_ref(refstr):
343         m = re.match(r"^zero\[(\d+)\]$", refstr)
344         if m:
345             return ("zero", None, None, (0, int(m.group(1)), False))
346
347         m = re.match(r"^([-0-9a-f]+)\/([0-9a-f]+)(\(\S+\))?(\[(=?(\d+)|(\d+)\+(\d+))\])?$", refstr)
348         if not m: return
349
350         segment = m.group(1)
351         object = m.group(2)
352         checksum = m.group(3)
353         slice = m.group(4)
354
355         if checksum is not None:
356             checksum = checksum.lstrip("(").rstrip(")")
357
358         if slice is not None:
359             if m.group(6) is not None:
360                 # Size-assertion slice
361                 slice = (0, int(m.group(6)), True)
362             else:
363                 slice = (int(m.group(7)), int(m.group(8)), False)
364
365         return (segment, object, checksum, slice)
366
367     def list_snapshots(self):
368         return set(x[0] for x in self.backend.list_generic("snapshots"))
369
370     def list_segments(self):
371         return set(x[0] for x in self.backend.list_generic("segments"))
372
373     def load_snapshot(self, snapshot):
374         snapshot_file = self.backend.open_snapshot(snapshot)[0]
375         return to_lines(snapshot_file.read())
376
377     @staticmethod
378     def filter_data(filehandle, filter_cmd):
379         if filter_cmd is None:
380             return filehandle
381         p = subprocess.Popen(filter_cmd, shell=True, stdin=subprocess.PIPE,
382                              stdout=subprocess.PIPE, close_fds=True)
383         input, output = p.stdin, p.stdout
384         def copy_thread(src, dst):
385             BLOCK_SIZE = 4096
386             while True:
387                 block = src.read(BLOCK_SIZE)
388                 if len(block) == 0: break
389                 dst.write(block)
390             src.close()
391             dst.close()
392             p.wait()
393         _thread.start_new_thread(copy_thread, (filehandle, input))
394         return output
395
396     def get_segment(self, segment):
397         accessed_segments.add(segment)
398
399         (segment_fp, path, filter_cmd) = self.backend.open_segment(segment)
400         return self.filter_data(segment_fp, filter_cmd)
401
402     def load_segment(self, segment):
403         seg = tarfile.open(segment, 'r|', self.get_segment(segment))
404         for item in seg:
405             data_obj = seg.extractfile(item)
406             path = item.name.split('/')
407             if len(path) == 2 and path[0] == segment:
408                 yield (path[1], data_obj.read())
409
410     def extract_segment(self, segment):
411         segdir = os.path.join(self.get_cachedir(), segment)
412         os.mkdir(segdir)
413         for (object, data) in self.load_segment(segment):
414             f = open(os.path.join(segdir, object), 'wb')
415             f.write(data)
416             f.close()
417
418     def load_object(self, segment, object):
419         accessed_segments.add(segment)
420         path = os.path.join(self.get_cachedir(), segment, object)
421         if not os.access(path, os.R_OK):
422             self.extract_segment(segment)
423         if segment in self._lru_list: self._lru_list.remove(segment)
424         self._lru_list.append(segment)
425         while len(self._lru_list) > self.CACHE_SIZE:
426             os.system("rm -rf " + os.path.join(self.cachedir,
427                                                self._lru_list[0]))
428             self._lru_list = self._lru_list[1:]
429         return open(path, 'rb').read()
430
431     def get(self, refstr):
432         """Fetch the given object and return it.
433
434         The input should be an object reference, in string form.
435         """
436
437         (segment, object, checksum, slice) = self.parse_ref(refstr)
438
439         if segment == "zero":
440             return "\0" * slice[1]
441
442         data = self.load_object(segment, object)
443
444         if checksum is not None:
445             verifier = ChecksumVerifier(checksum)
446             verifier.update(data)
447             if not verifier.valid():
448                 raise ValueError
449
450         if slice is not None:
451             (start, length, exact) = slice
452             # Note: The following assertion check may need to be commented out
453             # to restore from pre-v0.8 snapshots, as the syntax for
454             # size-assertion slices has changed.
455             if exact and len(data) != length: raise ValueError
456             data = data[start:start+length]
457             if len(data) != length: raise IndexError
458
459         return data
460
461     def prefetch(self):
462         self.backend.prefetch_generic()
463
464 def parse(lines, terminate=None):
465     """Generic parser for RFC822-style "Key: Value" data streams.
466
467     This parser can be used to read metadata logs and snapshot root descriptor
468     files.
469
470     lines must be an iterable object which yields a sequence of lines of input.
471
472     If terminate is specified, it is used as a predicate to determine when to
473     stop reading input lines.
474     """
475
476     result = {}
477     last_key = None
478
479     def make_result(result):
480         return dict((k, "".join(v)) for (k, v) in result.items())
481
482     for l in lines:
483         # Strip off a trailing newline, if present
484         if len(l) > 0 and l[-1] == "\n":
485             l = l[:-1]
486
487         if terminate is not None and terminate(l):
488             if len(result) > 0: yield make_result(result)
489             result = {}
490             last_key = None
491             continue
492
493         m = re.match(r"^([-\w]+):\s*(.*)$", l)
494         if m:
495             result[m.group(1)] = [m.group(2)]
496             last_key = m.group(1)
497         elif len(l) > 0 and l[0].isspace() and last_key is not None:
498             result[last_key].append(l)
499         else:
500             last_key = None
501
502     if len(result) > 0: yield make_result(result)
503
504 def parse_full(lines):
505     try:
506         return next(parse(lines))
507     except StopIteration:
508         return {}
509
510 def parse_metadata_version(s):
511     """Convert a string with the snapshot version format to a tuple."""
512
513     m = re.match(r"^(?:Cumulus|LBS) Snapshot v(\d+(\.\d+)*)$", s)
514     if m is None:
515         return ()
516     else:
517         return tuple([int(d) for d in m.group(1).split(".")])
518
519 def read_metadata(object_store, root):
520     """Iterate through all lines in the metadata log, following references."""
521
522     # Stack for keeping track of recursion when following references to
523     # portions of the log.  The last entry in the stack corresponds to the
524     # object currently being parsed.  Each entry is a list of lines which have
525     # been reversed, so that popping successive lines from the end of each list
526     # will return lines of the metadata log in order.
527     stack = []
528
529     def follow_ref(refstr):
530         if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
531         lines = to_lines(object_store.get(refstr))
532         lines.reverse()
533         stack.append(lines)
534
535     follow_ref(root)
536
537     while len(stack) > 0:
538         top = stack[-1]
539         if len(top) == 0:
540             stack.pop()
541             continue
542         line = top.pop()
543
544         # An indirect reference which we must follow?
545         if len(line) > 0 and line[0] == '@':
546             ref = line[1:]
547             ref.strip()
548             follow_ref(ref)
549         else:
550             yield line
551
552 class MetadataItem:
553     """Metadata for a single file (or directory or...) from a snapshot."""
554
555     # Functions for parsing various datatypes that can appear in a metadata log
556     # item.
557     @staticmethod
558     def decode_int(s):
559         """Decode an integer, expressed in decimal, octal, or hexadecimal."""
560         if s.startswith("0x"):
561             return int(s, 16)
562         elif s.startswith("0"):
563             return int(s, 8)
564         else:
565             return int(s, 10)
566
567     @staticmethod
568     def decode_str(s):
569         """Decode a URI-encoded (%xx escapes) string."""
570         return uri_decode(s)
571
572     @staticmethod
573     def raw_str(s):
574         """An unecoded string."""
575         return s
576
577     @staticmethod
578     def decode_user(s):
579         """Decode a user/group to a tuple of uid/gid followed by name."""
580         items = s.split()
581         uid = MetadataItem.decode_int(items[0])
582         name = None
583         if len(items) > 1:
584             if items[1].startswith("(") and items[1].endswith(")"):
585                 name = MetadataItem.decode_str(items[1][1:-1])
586         return (uid, name)
587
588     @staticmethod
589     def decode_device(s):
590         """Decode a device major/minor number."""
591         (major, minor) = map(MetadataItem.decode_int, s.split("/"))
592         return (major, minor)
593
594     class Items: pass
595
596     def __init__(self, fields, object_store):
597         """Initialize from a dictionary of key/value pairs from metadata log."""
598
599         self.fields = fields
600         self.object_store = object_store
601         self.keys = []
602         self.items = self.Items()
603         for (k, v) in fields.items():
604             if k in self.field_types:
605                 decoder = self.field_types[k]
606                 setattr(self.items, k, decoder(v))
607                 self.keys.append(k)
608
609     def data(self):
610         """Return an iterator for the data blocks that make up a file."""
611
612         # This traverses the list of blocks that make up a file, following
613         # indirect references.  It is implemented in much the same way as
614         # read_metadata, so see that function for details of the technique.
615
616         objects = self.fields['data'].split()
617         objects.reverse()
618         stack = [objects]
619
620         def follow_ref(refstr):
621             if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
622             objects = self.object_store.get(refstr).split()
623             objects.reverse()
624             stack.append(objects)
625
626         while len(stack) > 0:
627             top = stack[-1]
628             if len(top) == 0:
629                 stack.pop()
630                 continue
631             ref = top.pop()
632
633             # An indirect reference which we must follow?
634             if len(ref) > 0 and ref[0] == '@':
635                 follow_ref(ref[1:])
636             else:
637                 yield ref
638
639 # Description of fields that might appear, and how they should be parsed.
640 MetadataItem.field_types = {
641     'name': MetadataItem.decode_str,
642     'type': MetadataItem.raw_str,
643     'mode': MetadataItem.decode_int,
644     'device': MetadataItem.decode_device,
645     'user': MetadataItem.decode_user,
646     'group': MetadataItem.decode_user,
647     'ctime': MetadataItem.decode_int,
648     'mtime': MetadataItem.decode_int,
649     'links': MetadataItem.decode_int,
650     'inode': MetadataItem.raw_str,
651     'checksum': MetadataItem.decode_str,
652     'size': MetadataItem.decode_int,
653     'contents': MetadataItem.decode_str,
654     'target': MetadataItem.decode_str,
655 }
656
657 def iterate_metadata(object_store, root):
658     for d in parse(read_metadata(object_store, root), lambda l: len(l) == 0):
659         yield MetadataItem(d, object_store)
660
661 class LocalDatabase:
662     """Access to the local database of snapshot contents and object checksums.
663
664     The local database is consulted when creating a snapshot to determine what
665     data can be re-used from old snapshots.  Segment cleaning is performed by
666     manipulating the data in the local database; the local database also
667     includes enough data to guide the segment cleaning process.
668     """
669
670     def __init__(self, path, dbname="localdb.sqlite"):
671         self.db_connection = sqlite3.connect(path + "/" + dbname)
672
673     # Low-level database access.  Use these methods when there isn't a
674     # higher-level interface available.  Exception: do, however, remember to
675     # use the commit() method after making changes to make sure they are
676     # actually saved, even when going through higher-level interfaces.
677     def commit(self):
678         "Commit any pending changes to the local database."
679         self.db_connection.commit()
680
681     def rollback(self):
682         "Roll back any pending changes to the local database."
683         self.db_connection.rollback()
684
685     def cursor(self):
686         "Return a DB-API cursor for directly accessing the local database."
687         return self.db_connection.cursor()
688
689     def list_schemes(self):
690         """Return the list of snapshots found in the local database.
691
692         The returned value is a list of tuples (id, scheme, name, time, intent).
693         """
694
695         cur = self.cursor()
696         cur.execute("select distinct scheme from snapshots")
697         schemes = [row[0] for row in cur.fetchall()]
698         schemes.sort()
699         return schemes
700
701     def list_snapshots(self, scheme):
702         """Return a list of snapshots for the given scheme."""
703         cur = self.cursor()
704         cur.execute("select name from snapshots")
705         snapshots = [row[0] for row in cur.fetchall()]
706         snapshots.sort()
707         return snapshots
708
709     def delete_snapshot(self, scheme, name):
710         """Remove the specified snapshot from the database.
711
712         Warning: This does not garbage collect all dependent data in the
713         database, so it must be followed by a call to garbage_collect() to make
714         the database consistent.
715         """
716         cur = self.cursor()
717         cur.execute("delete from snapshots where scheme = ? and name = ?",
718                     (scheme, name))
719
720     def prune_old_snapshots(self, scheme, intent=1.0):
721         """Delete entries from old snapshots from the database.
722
723         Only snapshots with the specified scheme name will be deleted.  If
724         intent is given, it gives the intended next snapshot type, to determine
725         how aggressively to clean (for example, intent=7 could be used if the
726         next snapshot will be a weekly snapshot).
727         """
728
729         cur = self.cursor()
730
731         # Find the id of the last snapshot to be created.  This is used for
732         # measuring time in a way: we record this value in each segment we
733         # expire on this run, and then on a future run can tell if there have
734         # been intervening backups made.
735         cur.execute("select max(snapshotid) from snapshots")
736         last_snapshotid = cur.fetchone()[0]
737
738         # Get the list of old snapshots for this scheme.  Delete all the old
739         # ones.  Rules for what to keep:
740         #   - Always keep the most recent snapshot.
741         #   - If snapshot X is younger than Y, and X has higher intent, then Y
742         #     can be deleted.
743         cur.execute("""select snapshotid, name, intent,
744                               julianday('now') - timestamp as age
745                        from snapshots where scheme = ?
746                        order by age""", (scheme,))
747
748         first = True
749         max_intent = intent
750         for (id, name, snap_intent, snap_age) in cur.fetchall():
751             can_delete = False
752             if snap_intent < max_intent:
753                 # Delete small-intent snapshots if there is a more recent
754                 # large-intent snapshot.
755                 can_delete = True
756             elif snap_intent == intent:
757                 # Delete previous snapshots with the specified intent level.
758                 can_delete = True
759
760             if can_delete and not first:
761                 print("Delete snapshot %d (%s)" % (id, name))
762                 cur.execute("delete from snapshots where snapshotid = ?",
763                             (id,))
764             first = False
765             max_intent = max(max_intent, snap_intent)
766
767         self.garbage_collect()
768
769     def garbage_collect(self):
770         """Garbage-collect unreachable segment and object data.
771
772         Remove all segments and checksums which is not reachable from the
773         current set of snapshots stored in the local database.
774         """
775         cur = self.cursor()
776
777         # Delete entries in the segment_utilization table which are for
778         # non-existent snapshots.
779         cur.execute("""delete from segment_utilization
780                        where snapshotid not in
781                            (select snapshotid from snapshots)""")
782
783         # Delete segments not referenced by any current snapshots.
784         cur.execute("""delete from segments where segmentid not in
785                            (select segmentid from segment_utilization)""")
786
787         # Delete dangling objects in the block_index table.
788         cur.execute("""delete from block_index
789                        where segmentid not in
790                            (select segmentid from segments)""")
791
792         # Remove sub-block signatures for deleted objects.
793         cur.execute("""delete from subblock_signatures
794                        where blockid not in
795                            (select blockid from block_index)""")
796
797     # Segment cleaning.
798     class SegmentInfo(Struct): pass
799
800     def get_segment_cleaning_list(self, age_boost=0.0):
801         """Return a list of all current segments with information for cleaning.
802
803         Return all segments which are currently known in the local database
804         (there might be other, older segments in the archive itself), and
805         return usage statistics for each to help decide which segments to
806         clean.
807
808         The returned list will be sorted by estimated cleaning benefit, with
809         segments that are best to clean at the start of the list.
810
811         If specified, the age_boost parameter (measured in days) will added to
812         the age of each segment, as a way of adjusting the benefit computation
813         before a long-lived snapshot is taken (for example, age_boost might be
814         set to 7 when cleaning prior to taking a weekly snapshot).
815         """
816
817         cur = self.cursor()
818         segments = []
819         cur.execute("""select segmentid, used, size, mtime,
820                        julianday('now') - mtime as age from segment_info
821                        where expire_time is null""")
822         for row in cur:
823             info = self.SegmentInfo()
824             info.id = row[0]
825             info.used_bytes = row[1]
826             info.size_bytes = row[2]
827             info.mtime = row[3]
828             info.age_days = row[4]
829
830             # If data is not available for whatever reason, treat it as 0.0.
831             if info.age_days is None:
832                 info.age_days = 0.0
833             if info.used_bytes is None:
834                 info.used_bytes = 0.0
835
836             # Benefit calculation: u is the estimated fraction of each segment
837             # which is utilized (bytes belonging to objects still in use
838             # divided by total size; this doesn't take compression or storage
839             # overhead into account, but should give a reasonable estimate).
840             #
841             # The total benefit is a heuristic that combines several factors:
842             # the amount of space that can be reclaimed (1 - u), an ageing
843             # factor (info.age_days) that favors cleaning old segments to young
844             # ones and also is more likely to clean segments that will be
845             # rewritten for long-lived snapshots (age_boost), and finally a
846             # penalty factor for the cost of re-uploading data (u + 0.1).
847             u = info.used_bytes / info.size_bytes
848             info.cleaning_benefit \
849                 = (1 - u) * (info.age_days + age_boost) / (u + 0.1)
850
851             segments.append(info)
852
853         segments.sort(cmp, key=lambda s: s.cleaning_benefit, reverse=True)
854         return segments
855
856     def mark_segment_expired(self, segment):
857         """Mark a segment for cleaning in the local database.
858
859         The segment parameter should be either a SegmentInfo object or an
860         integer segment id.  Objects in the given segment will be marked as
861         expired, which means that any future snapshots that would re-use those
862         objects will instead write out a new copy of the object, and thus no
863         future snapshots will depend upon the given segment.
864         """
865
866         if isinstance(segment, int):
867             id = segment
868         elif isinstance(segment, self.SegmentInfo):
869             id = segment.id
870         else:
871             raise TypeError("Invalid segment: %s, must be of type int or SegmentInfo, not %s" % (segment, type(segment)))
872
873         cur = self.cursor()
874         cur.execute("select max(snapshotid) from snapshots")
875         last_snapshotid = cur.fetchone()[0]
876         cur.execute("update segments set expire_time = ? where segmentid = ?",
877                     (last_snapshotid, id))
878         cur.execute("update block_index set expired = 0 where segmentid = ?",
879                     (id,))
880
881     def balance_expired_objects(self):
882         """Analyze expired objects in segments to be cleaned and group by age.
883
884         Update the block_index table of the local database to group expired
885         objects by age.  The exact number of buckets and the cutoffs for each
886         are dynamically determined.  Calling this function after marking
887         segments expired will help in the segment cleaning process, by ensuring
888         that when active objects from clean segments are rewritten, they will
889         be placed into new segments roughly grouped by age.
890         """
891
892         # The expired column of the block_index table is used when generating a
893         # new Cumulus snapshot.  A null value indicates that an object may be
894         # re-used.  Otherwise, an object must be written into a new segment if
895         # needed.  Objects with distinct expired values will be written into
896         # distinct segments, to allow for some grouping by age.  The value 0 is
897         # somewhat special in that it indicates any rewritten objects can be
898         # placed in the same segment as completely new objects; this can be
899         # used for very young objects which have been expired, or objects not
900         # expected to be encountered.
901         #
902         # In the balancing process, all objects which are not used in any
903         # current snapshots will have expired set to 0.  Objects which have
904         # been seen will be sorted by age and will have expired values set to
905         # 0, 1, 2, and so on based on age (with younger objects being assigned
906         # lower values).  The number of buckets and the age cutoffs is
907         # determined by looking at the distribution of block ages.
908
909         cur = self.cursor()
910
911         # Mark all expired objects with expired = 0; these objects will later
912         # have values set to indicate groupings of objects when repacking.
913         cur.execute("""update block_index set expired = 0
914                        where expired is not null""")
915
916         # We will want to aim for at least one full segment for each bucket
917         # that we eventually create, but don't know how many bytes that should
918         # be due to compression.  So compute the average number of bytes in
919         # each expired segment as a rough estimate for the minimum size of each
920         # bucket.  (This estimate could be thrown off by many not-fully-packed
921         # segments, but for now don't worry too much about that.)  If we can't
922         # compute an average, it's probably because there are no expired
923         # segments, so we have no more work to do.
924         cur.execute("""select avg(size) from segments
925                        where segmentid in
926                            (select distinct segmentid from block_index
927                             where expired is not null)""")
928         segment_size_estimate = cur.fetchone()[0]
929         if not segment_size_estimate:
930             return
931
932         # Next, extract distribution of expired objects (number and size) by
933         # age.  Save the timestamp for "now" so that the classification of
934         # blocks into age buckets will not change later in the function, after
935         # time has passed.  Set any timestamps in the future to now, so we are
936         # guaranteed that for the rest of this function, age is always
937         # non-negative.
938         cur.execute("select julianday('now')")
939         now = cur.fetchone()[0]
940
941         cur.execute("""update block_index set timestamp = ?
942                        where timestamp > ? and expired is not null""",
943                     (now, now))
944
945         cur.execute("""select round(? - timestamp) as age, count(*), sum(size)
946                        from block_index where expired = 0
947                        group by age order by age""", (now,))
948         distribution = cur.fetchall()
949
950         # Start to determine the buckets for expired objects.  Heuristics used:
951         #   - An upper bound on the number of buckets is given by the number of
952         #     segments we estimate it will take to store all data.  In fact,
953         #     aim for a couple of segments per bucket.
954         #   - Place very young objects in bucket 0 (place with new objects)
955         #     unless there are enough of them to warrant a separate bucket.
956         #   - Try not to create unnecessarily many buckets, since fewer buckets
957         #     will allow repacked data to be grouped based on spatial locality
958         #     (while more buckets will group by temporal locality).  We want a
959         #     balance.
960         MIN_AGE = 4
961         total_bytes = sum([i[2] for i in distribution])
962         target_buckets = 2 * (total_bytes / segment_size_estimate) ** 0.4
963         min_size = 1.5 * segment_size_estimate
964         target_size = max(2 * segment_size_estimate,
965                           total_bytes / target_buckets)
966
967         print("segment_size:", segment_size_estimate)
968         print("distribution:", distribution)
969         print("total_bytes:", total_bytes)
970         print("target_buckets:", target_buckets)
971         print("min, target size:", min_size, target_size)
972
973         # Chosen cutoffs.  Each bucket consists of objects with age greater
974         # than one cutoff value, but not greater than the next largest cutoff.
975         cutoffs = []
976
977         # Starting with the oldest objects, begin grouping together into
978         # buckets of size at least target_size bytes.
979         distribution.reverse()
980         bucket_size = 0
981         min_age_bucket = False
982         for (age, items, size) in distribution:
983             if bucket_size >= target_size \
984                 or (age < MIN_AGE and not min_age_bucket):
985                 if bucket_size < target_size and len(cutoffs) > 0:
986                     cutoffs.pop()
987                 cutoffs.append(age)
988                 bucket_size = 0
989
990             bucket_size += size
991             if age < MIN_AGE:
992                 min_age_bucket = True
993
994         # The last (youngest) bucket will be group 0, unless it has enough data
995         # to be of size min_size by itself, or there happen to be no objects
996         # less than MIN_AGE at all.
997         if bucket_size >= min_size or not min_age_bucket:
998             cutoffs.append(-1)
999         cutoffs.append(-1)
1000
1001         print("cutoffs:", cutoffs)
1002
1003         # Update the database to assign each object to the appropriate bucket.
1004         cutoffs.reverse()
1005         for i in range(len(cutoffs)):
1006             cur.execute("""update block_index set expired = ?
1007                            where round(? - timestamp) > ?
1008                              and expired is not null""",
1009                         (i, now, cutoffs[i]))