Byte/string handling fixes for Python 3.
[cumulus.git] / python / cumulus / __init__.py
1 # Cumulus: Efficient Filesystem Backup to the Cloud
2 # Copyright (C) 2008-2009, 2012 The Cumulus Developers
3 # See the AUTHORS file for a list of contributors.
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License along
16 # with this program; if not, write to the Free Software Foundation, Inc.,
17 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18
19 """High-level interface for working with Cumulus archives.
20
21 This module provides an easy interface for reading from and manipulating
22 various parts of a Cumulus archive:
23   - listing the snapshots and segments present
24   - reading segment contents
25   - parsing snapshot descriptors and snapshot metadata logs
26   - reading and maintaining the local object database
27 """
28
29 from __future__ import division, print_function, unicode_literals
30
31 import codecs
32 import hashlib
33 import itertools
34 import os
35 import re
36 import sqlite3
37 import sys
38 import tarfile
39 import tempfile
40 try:
41     import _thread
42 except ImportError:
43     import thread as _thread
44
45 import cumulus.store
46 import cumulus.store.file
47
48 if sys.version < "3":
49     StringTypes = (str, unicode)
50 else:
51     StringTypes = (str,)
52
53 # The largest supported snapshot format that can be understood.
54 FORMAT_VERSION = (0, 11)        # Cumulus Snapshot v0.11
55
56 # Maximum number of nested indirect references allowed in a snapshot.
57 MAX_RECURSION_DEPTH = 3
58
59 # All segments which have been accessed this session.
60 accessed_segments = set()
61
62 # Table of methods used to filter segments before storage, and corresponding
63 # filename extensions.  These are listed in priority order (methods earlier in
64 # the list are tried first).
65 SEGMENT_FILTERS = [
66     (".gpg", "cumulus-filter-gpg --decrypt"),
67     (".gz", "gzip -dc"),
68     (".bz2", "bzip2 -dc"),
69     ("", None),
70 ]
71
72 def to_lines(data):
73     """Decode binary data from a file into a sequence of lines.
74
75     Newline markers are retained."""
76     return list(codecs.iterdecode(data.splitlines(True), "utf-8"))
77
78 def uri_decode(s):
79     """Decode a URI-encoded (%xx escapes) string."""
80     def hex_decode(m): return chr(int(m.group(1), 16))
81     return re.sub(r"%([0-9a-f]{2})", hex_decode, s)
82 def uri_encode(s):
83     """Encode a string to URI-encoded (%xx escapes) form."""
84     def hex_encode(c):
85         if c > '+' and c < '\x7f' and c != '@':
86             return c
87         else:
88             return "%%%02x" % (ord(c),)
89     return ''.join(hex_encode(c) for c in s)
90
91 class Struct:
92     """A class which merely acts as a data container.
93
94     Instances of this class (or its subclasses) are merely used to store data
95     in various attributes.  No methods are provided.
96     """
97
98     def __repr__(self):
99         return "<%s %s>" % (self.__class__, self.__dict__)
100
101 CHECKSUM_ALGORITHMS = {
102     'sha1': hashlib.sha1,
103     'sha224': hashlib.sha224,
104     'sha256': hashlib.sha256,
105 }
106
107 class ChecksumCreator:
108     """Compute a Cumulus checksum for provided data.
109
110     The algorithm used is selectable, but currently defaults to sha1.
111     """
112
113     def __init__(self, algorithm='sha1'):
114         self.algorithm = algorithm
115         self.hash = CHECKSUM_ALGORITHMS[algorithm]()
116
117     def update(self, data):
118         self.hash.update(data)
119         return self
120
121     def compute(self):
122         return "%s=%s" % (self.algorithm, self.hash.hexdigest())
123
124 class ChecksumVerifier:
125     """Verify whether a checksum from a snapshot matches the supplied data."""
126
127     def __init__(self, checksumstr):
128         """Create an object to check the supplied checksum."""
129
130         (algo, checksum) = checksumstr.split("=", 1)
131         self.checksum = checksum
132         self.hash = CHECKSUM_ALGORITHMS[algo]()
133
134     def update(self, data):
135         self.hash.update(data)
136
137     def valid(self):
138         """Return a boolean indicating whether the checksum matches."""
139
140         result = self.hash.hexdigest()
141         return result == self.checksum
142
143 class SearchPathEntry(object):
144     """Item representing a possible search location for Cumulus files.
145
146     Some Cumulus files might be stored in multiple possible file locations: due
147     to format (different compression mechanisms with different extensions),
148     locality (different segments might be placed in different directories to
149     control archiving policies), for backwards compatibility (default location
150     changed over time).  A SearchPathEntry describes a possible location for a
151     file.
152     """
153     def __init__(self, directory_prefix, suffix, context=None):
154         self._directory_prefix = directory_prefix
155         self._suffix = suffix
156         self._context = context
157
158     def __repr__(self):
159         return "%s(%r, %r, %r)" % (self.__class__.__name__,
160                                    self._directory_prefix, self._suffix,
161                                    self._context)
162
163     def build_path(self, basename):
164         """Construct the search path to use for a file with name basename.
165
166         Returns a tuple (pathname, context), where pathname is the path to try
167         and context is any additional data associated with this search entry
168         (if any).
169         """
170         return (os.path.join(self._directory_prefix, basename + self._suffix),
171                 self._context)
172
173 class SearchPath(object):
174     """A collection of locations to search for files and lookup utilities.
175
176     For looking for a file in a Cumulus storage backend, a SearchPath object
177     contains a list of possible locations to try.  A SearchPath can be used to
178     perform the search as well; when a file is found the search path ordering
179     is updated (moving the successful SearchPathEntry to the front of the list
180     for future searches).
181     """
182     def __init__(self, name_regex, searchpath):
183         self._regex = re.compile(name_regex)
184         self._path = list(searchpath)
185
186     def add_search_entry(self, entry):
187         self._path.append(entry)
188
189     def directories(self):
190         """Return the set of directories to search for a file type."""
191         return set(entry._directory_prefix for entry in self._path)
192
193     def get(self, backend, basename):
194         for (i, entry) in enumerate(self._path):
195             try:
196                 (pathname, context) = entry.build_path(basename)
197                 fp = backend.get(pathname)
198                 # On success, move this entry to the front of the search path
199                 # to speed future searches.
200                 if i > 0:
201                     self._path.pop(i)
202                     self._path.insert(0, entry)
203                 return (fp, pathname, context)
204             except cumulus.store.NotFoundError:
205                 continue
206         raise cumulus.store.NotFoundError(basename)
207
208     def stat(self, backend, basename):
209         for (i, entry) in enumerate(self._path):
210             try:
211                 (pathname, context) = entry.build_path(basename)
212                 stat_data = backend.stat(pathname)
213                 # On success, move this entry to the front of the search path
214                 # to speed future searches.
215                 if i > 0:
216                     self._path.pop(i)
217                     self._path.insert(0, entry)
218                 result = {"path": pathname}
219                 result.update(stat_data)
220                 return result
221             except cumulus.store.NotFoundError:
222                 continue
223         raise cumulus.store.NotFoundError(basename)
224
225     def match(self, filename):
226         return self._regex.match(filename)
227
228     def list(self, backend):
229         success = False
230         for d in self.directories():
231             try:
232                 for f in backend.list(d):
233                     success = True
234                     m = self.match(f)
235                     if m: yield (os.path.join(d, f), m)
236             except cumulus.store.NotFoundError:
237                 pass
238         if not success:
239             raise cumulus.store.NotFoundError(backend)
240
241 def _build_segments_searchpath(prefix):
242     for (extension, filter) in SEGMENT_FILTERS:
243         yield SearchPathEntry(prefix, extension, filter)
244
245 SEARCH_PATHS = {
246     "checksums": SearchPath(
247         r"^snapshot-(.*)\.(\w+)sums$",
248         [SearchPathEntry("meta", ".sha1sums"),
249          SearchPathEntry("checksums", ".sha1sums"),
250          SearchPathEntry("", ".sha1sums")]),
251     "meta": SearchPath(
252         r"^snapshot-(.*)\.meta(\.\S+)?$",
253         _build_segments_searchpath("meta")),
254     "segments": SearchPath(
255         (r"^([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})"
256          r"\.tar(\.\S+)?$"),
257         itertools.chain(
258             _build_segments_searchpath("segments0"),
259             _build_segments_searchpath("segments1"),
260             _build_segments_searchpath(""),
261             _build_segments_searchpath("segments"))),
262     "snapshots": SearchPath(
263         r"^snapshot-(.*)\.(cumulus|lbs)$",
264         [SearchPathEntry("snapshots", ".cumulus"),
265          SearchPathEntry("snapshots", ".lbs"),
266          SearchPathEntry("", ".cumulus"),
267          SearchPathEntry("", ".lbs")]),
268 }
269
270 class BackendWrapper(object):
271     """Wrapper around a Cumulus storage backend that understands file types.
272
273     The BackendWrapper class understands different Cumulus file types, such as
274     snapshots and segments, and implements higher-level operations such as
275     "retrieve a snapshot with a specific name" (hiding operations such as
276     searching for the correct file name).
277     """
278
279     def __init__(self, backend):
280         """Initializes a wrapper around the specified storage backend.
281
282         store may either be a Store object or URL.
283         """
284         if type(backend) in StringTypes:
285             self._backend = cumulus.store.open(backend)
286         else:
287             self._backend = backend
288
289     @property
290     def raw_backend(self):
291         return self._backend
292
293     def stat_generic(self, basename, filetype):
294         return SEARCH_PATHS[filetype].stat(self._backend, basename)
295
296     def open_generic(self, basename, filetype):
297         return SEARCH_PATHS[filetype].get(self._backend, basename)
298
299     def open_snapshot(self, name):
300         return self.open_generic("snapshot-" + name, "snapshots")
301
302     def open_segment(self, name):
303         return self.open_generic(name + ".tar", "segments")
304
305     def list_generic(self, filetype):
306         return ((x[1].group(1), x[0])
307                 for x in SEARCH_PATHS[filetype].list(self._backend))
308
309     def prefetch_generic(self):
310         """Calls scan on directories to prefetch file metadata."""
311         directories = set()
312         for typeinfo in SEARCH_PATHS.values():
313             directories.update(typeinfo.directories())
314         for d in directories:
315             print("Prefetch", d)
316             self._backend.scan(d)
317
318 class CumulusStore:
319     def __init__(self, backend):
320         if isinstance(backend, BackendWrapper):
321             self.backend = backend
322         else:
323             self.backend = BackendWrapper(backend)
324         self.cachedir = None
325         self.CACHE_SIZE = 16
326         self._lru_list = []
327
328     def get_cachedir(self):
329         if self.cachedir is None:
330             self.cachedir = tempfile.mkdtemp("-cumulus")
331         return self.cachedir
332
333     def cleanup(self):
334         if self.cachedir is not None:
335             # TODO: Avoid use of system, make this safer
336             os.system("rm -rf " + self.cachedir)
337         self.cachedir = None
338
339     @staticmethod
340     def parse_ref(refstr):
341         m = re.match(r"^zero\[(\d+)\]$", refstr)
342         if m:
343             return ("zero", None, None, (0, int(m.group(1)), False))
344
345         m = re.match(r"^([-0-9a-f]+)\/([0-9a-f]+)(\(\S+\))?(\[(((\d+)\+)?(\d+)|=(\d+))\])?$", refstr)
346         if not m: return
347
348         segment = m.group(1)
349         object = m.group(2)
350         checksum = m.group(3)
351         slice = m.group(4)
352
353         if checksum is not None:
354             checksum = checksum.lstrip("(").rstrip(")")
355
356         if slice is not None:
357             if m.group(9) is not None:
358                 # Size-assertion slice
359                 slice = (0, int(m.group(9)), True)
360             elif m.group(6) is None:
361                 # Abbreviated slice
362                 slice = (0, int(m.group(8)), False)
363             else:
364                 slice = (int(m.group(7)), int(m.group(8)), False)
365
366         return (segment, object, checksum, slice)
367
368     def list_snapshots(self):
369         return set(x[0] for x in self.backend.list_generic("snapshots"))
370
371     def list_segments(self):
372         return set(x[0] for x in self.backend.list_generic("segments"))
373
374     def load_snapshot(self, snapshot):
375         snapshot_file = self.backend.open_snapshot(snapshot)[0]
376         return to_lines(snapshot_file.read())
377
378     @staticmethod
379     def filter_data(filehandle, filter_cmd):
380         if filter_cmd is None:
381             return filehandle
382         (input, output) = os.popen2(filter_cmd)
383         def copy_thread(src, dst):
384             BLOCK_SIZE = 4096
385             while True:
386                 block = src.read(BLOCK_SIZE)
387                 if len(block) == 0: break
388                 dst.write(block)
389             src.close()
390             dst.close()
391         _thread.start_new_thread(copy_thread, (filehandle, input))
392         return output
393
394     def get_segment(self, segment):
395         accessed_segments.add(segment)
396
397         (segment_fp, path, filter_cmd) = self.backend.open_segment(segment)
398         return self.filter_data(segment_fp, filter_cmd)
399
400     def load_segment(self, segment):
401         seg = tarfile.open(segment, 'r|', self.get_segment(segment))
402         for item in seg:
403             data_obj = seg.extractfile(item)
404             path = item.name.split('/')
405             if len(path) == 2 and path[0] == segment:
406                 yield (path[1], data_obj.read())
407
408     def extract_segment(self, segment):
409         segdir = os.path.join(self.get_cachedir(), segment)
410         os.mkdir(segdir)
411         for (object, data) in self.load_segment(segment):
412             f = open(os.path.join(segdir, object), 'wb')
413             f.write(data)
414             f.close()
415
416     def load_object(self, segment, object):
417         accessed_segments.add(segment)
418         path = os.path.join(self.get_cachedir(), segment, object)
419         if not os.access(path, os.R_OK):
420             self.extract_segment(segment)
421         if segment in self._lru_list: self._lru_list.remove(segment)
422         self._lru_list.append(segment)
423         while len(self._lru_list) > self.CACHE_SIZE:
424             os.system("rm -rf " + os.path.join(self.cachedir,
425                                                self._lru_list[0]))
426             self._lru_list = self._lru_list[1:]
427         return open(path, 'rb').read()
428
429     def get(self, refstr):
430         """Fetch the given object and return it.
431
432         The input should be an object reference, in string form.
433         """
434
435         (segment, object, checksum, slice) = self.parse_ref(refstr)
436
437         if segment == "zero":
438             return "\0" * slice[1]
439
440         data = self.load_object(segment, object)
441
442         if checksum is not None:
443             verifier = ChecksumVerifier(checksum)
444             verifier.update(data)
445             if not verifier.valid():
446                 raise ValueError
447
448         if slice is not None:
449             (start, length, exact) = slice
450             if exact and len(data) != length: raise ValueError
451             data = data[start:start+length]
452             if len(data) != length: raise IndexError
453
454         return data
455
456     def prefetch(self):
457         self.backend.prefetch_generic()
458
459 def parse(lines, terminate=None):
460     """Generic parser for RFC822-style "Key: Value" data streams.
461
462     This parser can be used to read metadata logs and snapshot root descriptor
463     files.
464
465     lines must be an iterable object which yields a sequence of lines of input.
466
467     If terminate is specified, it is used as a predicate to determine when to
468     stop reading input lines.
469     """
470
471     dict = {}
472     last_key = None
473
474     for l in lines:
475         # Strip off a trailing newline, if present
476         if len(l) > 0 and l[-1] == "\n":
477             l = l[:-1]
478
479         if terminate is not None and terminate(l):
480             if len(dict) > 0: yield dict
481             dict = {}
482             last_key = None
483             continue
484
485         m = re.match(r"^([-\w]+):\s*(.*)$", l)
486         if m:
487             dict[m.group(1)] = m.group(2)
488             last_key = m.group(1)
489         elif len(l) > 0 and l[0].isspace() and last_key is not None:
490             dict[last_key] += l
491         else:
492             last_key = None
493
494     if len(dict) > 0: yield dict
495
496 def parse_full(lines):
497     try:
498         return next(parse(lines))
499     except StopIteration:
500         return {}
501
502 def parse_metadata_version(s):
503     """Convert a string with the snapshot version format to a tuple."""
504
505     m = re.match(r"^(?:Cumulus|LBS) Snapshot v(\d+(\.\d+)*)$", s)
506     if m is None:
507         return ()
508     else:
509         return tuple([int(d) for d in m.group(1).split(".")])
510
511 def read_metadata(object_store, root):
512     """Iterate through all lines in the metadata log, following references."""
513
514     # Stack for keeping track of recursion when following references to
515     # portions of the log.  The last entry in the stack corresponds to the
516     # object currently being parsed.  Each entry is a list of lines which have
517     # been reversed, so that popping successive lines from the end of each list
518     # will return lines of the metadata log in order.
519     stack = []
520
521     def follow_ref(refstr):
522         if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
523         lines = to_lines(object_store.get(refstr))
524         lines.reverse()
525         stack.append(lines)
526
527     follow_ref(root)
528
529     while len(stack) > 0:
530         top = stack[-1]
531         if len(top) == 0:
532             stack.pop()
533             continue
534         line = top.pop()
535
536         # An indirect reference which we must follow?
537         if len(line) > 0 and line[0] == '@':
538             ref = line[1:]
539             ref.strip()
540             follow_ref(ref)
541         else:
542             yield line
543
544 class MetadataItem:
545     """Metadata for a single file (or directory or...) from a snapshot."""
546
547     # Functions for parsing various datatypes that can appear in a metadata log
548     # item.
549     @staticmethod
550     def decode_int(s):
551         """Decode an integer, expressed in decimal, octal, or hexadecimal."""
552         if s.startswith("0x"):
553             return int(s, 16)
554         elif s.startswith("0"):
555             return int(s, 8)
556         else:
557             return int(s, 10)
558
559     @staticmethod
560     def decode_str(s):
561         """Decode a URI-encoded (%xx escapes) string."""
562         return uri_decode(s)
563
564     @staticmethod
565     def raw_str(s):
566         """An unecoded string."""
567         return s
568
569     @staticmethod
570     def decode_user(s):
571         """Decode a user/group to a tuple of uid/gid followed by name."""
572         items = s.split()
573         uid = MetadataItem.decode_int(items[0])
574         name = None
575         if len(items) > 1:
576             if items[1].startswith("(") and items[1].endswith(")"):
577                 name = MetadataItem.decode_str(items[1][1:-1])
578         return (uid, name)
579
580     @staticmethod
581     def decode_device(s):
582         """Decode a device major/minor number."""
583         (major, minor) = map(MetadataItem.decode_int, s.split("/"))
584         return (major, minor)
585
586     class Items: pass
587
588     def __init__(self, fields, object_store):
589         """Initialize from a dictionary of key/value pairs from metadata log."""
590
591         self.fields = fields
592         self.object_store = object_store
593         self.keys = []
594         self.items = self.Items()
595         for (k, v) in fields.items():
596             if k in self.field_types:
597                 decoder = self.field_types[k]
598                 setattr(self.items, k, decoder(v))
599                 self.keys.append(k)
600
601     def data(self):
602         """Return an iterator for the data blocks that make up a file."""
603
604         # This traverses the list of blocks that make up a file, following
605         # indirect references.  It is implemented in much the same way as
606         # read_metadata, so see that function for details of the technique.
607
608         objects = self.fields['data'].split()
609         objects.reverse()
610         stack = [objects]
611
612         def follow_ref(refstr):
613             if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
614             objects = self.object_store.get(refstr).split()
615             objects.reverse()
616             stack.append(objects)
617
618         while len(stack) > 0:
619             top = stack[-1]
620             if len(top) == 0:
621                 stack.pop()
622                 continue
623             ref = top.pop()
624
625             # An indirect reference which we must follow?
626             if len(ref) > 0 and ref[0] == '@':
627                 follow_ref(ref[1:])
628             else:
629                 yield ref
630
631 # Description of fields that might appear, and how they should be parsed.
632 MetadataItem.field_types = {
633     'name': MetadataItem.decode_str,
634     'type': MetadataItem.raw_str,
635     'mode': MetadataItem.decode_int,
636     'device': MetadataItem.decode_device,
637     'user': MetadataItem.decode_user,
638     'group': MetadataItem.decode_user,
639     'ctime': MetadataItem.decode_int,
640     'mtime': MetadataItem.decode_int,
641     'links': MetadataItem.decode_int,
642     'inode': MetadataItem.raw_str,
643     'checksum': MetadataItem.decode_str,
644     'size': MetadataItem.decode_int,
645     'contents': MetadataItem.decode_str,
646     'target': MetadataItem.decode_str,
647 }
648
649 def iterate_metadata(object_store, root):
650     for d in parse(read_metadata(object_store, root), lambda l: len(l) == 0):
651         yield MetadataItem(d, object_store)
652
653 class LocalDatabase:
654     """Access to the local database of snapshot contents and object checksums.
655
656     The local database is consulted when creating a snapshot to determine what
657     data can be re-used from old snapshots.  Segment cleaning is performed by
658     manipulating the data in the local database; the local database also
659     includes enough data to guide the segment cleaning process.
660     """
661
662     def __init__(self, path, dbname="localdb.sqlite"):
663         self.db_connection = sqlite3.connect(path + "/" + dbname)
664
665     # Low-level database access.  Use these methods when there isn't a
666     # higher-level interface available.  Exception: do, however, remember to
667     # use the commit() method after making changes to make sure they are
668     # actually saved, even when going through higher-level interfaces.
669     def commit(self):
670         "Commit any pending changes to the local database."
671         self.db_connection.commit()
672
673     def rollback(self):
674         "Roll back any pending changes to the local database."
675         self.db_connection.rollback()
676
677     def cursor(self):
678         "Return a DB-API cursor for directly accessing the local database."
679         return self.db_connection.cursor()
680
681     def list_schemes(self):
682         """Return the list of snapshots found in the local database.
683
684         The returned value is a list of tuples (id, scheme, name, time, intent).
685         """
686
687         cur = self.cursor()
688         cur.execute("select distinct scheme from snapshots")
689         schemes = [row[0] for row in cur.fetchall()]
690         schemes.sort()
691         return schemes
692
693     def list_snapshots(self, scheme):
694         """Return a list of snapshots for the given scheme."""
695         cur = self.cursor()
696         cur.execute("select name from snapshots")
697         snapshots = [row[0] for row in cur.fetchall()]
698         snapshots.sort()
699         return snapshots
700
701     def delete_snapshot(self, scheme, name):
702         """Remove the specified snapshot from the database.
703
704         Warning: This does not garbage collect all dependent data in the
705         database, so it must be followed by a call to garbage_collect() to make
706         the database consistent.
707         """
708         cur = self.cursor()
709         cur.execute("delete from snapshots where scheme = ? and name = ?",
710                     (scheme, name))
711
712     def prune_old_snapshots(self, scheme, intent=1.0):
713         """Delete entries from old snapshots from the database.
714
715         Only snapshots with the specified scheme name will be deleted.  If
716         intent is given, it gives the intended next snapshot type, to determine
717         how aggressively to clean (for example, intent=7 could be used if the
718         next snapshot will be a weekly snapshot).
719         """
720
721         cur = self.cursor()
722
723         # Find the id of the last snapshot to be created.  This is used for
724         # measuring time in a way: we record this value in each segment we
725         # expire on this run, and then on a future run can tell if there have
726         # been intervening backups made.
727         cur.execute("select max(snapshotid) from snapshots")
728         last_snapshotid = cur.fetchone()[0]
729
730         # Get the list of old snapshots for this scheme.  Delete all the old
731         # ones.  Rules for what to keep:
732         #   - Always keep the most recent snapshot.
733         #   - If snapshot X is younger than Y, and X has higher intent, then Y
734         #     can be deleted.
735         cur.execute("""select snapshotid, name, intent,
736                               julianday('now') - timestamp as age
737                        from snapshots where scheme = ?
738                        order by age""", (scheme,))
739
740         first = True
741         max_intent = intent
742         for (id, name, snap_intent, snap_age) in cur.fetchall():
743             can_delete = False
744             if snap_intent < max_intent:
745                 # Delete small-intent snapshots if there is a more recent
746                 # large-intent snapshot.
747                 can_delete = True
748             elif snap_intent == intent:
749                 # Delete previous snapshots with the specified intent level.
750                 can_delete = True
751
752             if can_delete and not first:
753                 print("Delete snapshot %d (%s)" % (id, name))
754                 cur.execute("delete from snapshots where snapshotid = ?",
755                             (id,))
756             first = False
757             max_intent = max(max_intent, snap_intent)
758
759         self.garbage_collect()
760
761     def garbage_collect(self):
762         """Garbage-collect unreachable segment and object data.
763
764         Remove all segments and checksums which is not reachable from the
765         current set of snapshots stored in the local database.
766         """
767         cur = self.cursor()
768
769         # Delete entries in the segment_utilization table which are for
770         # non-existent snapshots.
771         cur.execute("""delete from segment_utilization
772                        where snapshotid not in
773                            (select snapshotid from snapshots)""")
774
775         # Delete segments not referenced by any current snapshots.
776         cur.execute("""delete from segments where segmentid not in
777                            (select segmentid from segment_utilization)""")
778
779         # Delete dangling objects in the block_index table.
780         cur.execute("""delete from block_index
781                        where segmentid not in
782                            (select segmentid from segments)""")
783
784         # Remove sub-block signatures for deleted objects.
785         cur.execute("""delete from subblock_signatures
786                        where blockid not in
787                            (select blockid from block_index)""")
788
789     # Segment cleaning.
790     class SegmentInfo(Struct): pass
791
792     def get_segment_cleaning_list(self, age_boost=0.0):
793         """Return a list of all current segments with information for cleaning.
794
795         Return all segments which are currently known in the local database
796         (there might be other, older segments in the archive itself), and
797         return usage statistics for each to help decide which segments to
798         clean.
799
800         The returned list will be sorted by estimated cleaning benefit, with
801         segments that are best to clean at the start of the list.
802
803         If specified, the age_boost parameter (measured in days) will added to
804         the age of each segment, as a way of adjusting the benefit computation
805         before a long-lived snapshot is taken (for example, age_boost might be
806         set to 7 when cleaning prior to taking a weekly snapshot).
807         """
808
809         cur = self.cursor()
810         segments = []
811         cur.execute("""select segmentid, used, size, mtime,
812                        julianday('now') - mtime as age from segment_info
813                        where expire_time is null""")
814         for row in cur:
815             info = self.SegmentInfo()
816             info.id = row[0]
817             info.used_bytes = row[1]
818             info.size_bytes = row[2]
819             info.mtime = row[3]
820             info.age_days = row[4]
821
822             # If data is not available for whatever reason, treat it as 0.0.
823             if info.age_days is None:
824                 info.age_days = 0.0
825             if info.used_bytes is None:
826                 info.used_bytes = 0.0
827
828             # Benefit calculation: u is the estimated fraction of each segment
829             # which is utilized (bytes belonging to objects still in use
830             # divided by total size; this doesn't take compression or storage
831             # overhead into account, but should give a reasonable estimate).
832             #
833             # The total benefit is a heuristic that combines several factors:
834             # the amount of space that can be reclaimed (1 - u), an ageing
835             # factor (info.age_days) that favors cleaning old segments to young
836             # ones and also is more likely to clean segments that will be
837             # rewritten for long-lived snapshots (age_boost), and finally a
838             # penalty factor for the cost of re-uploading data (u + 0.1).
839             u = info.used_bytes / info.size_bytes
840             info.cleaning_benefit \
841                 = (1 - u) * (info.age_days + age_boost) / (u + 0.1)
842
843             segments.append(info)
844
845         segments.sort(cmp, key=lambda s: s.cleaning_benefit, reverse=True)
846         return segments
847
848     def mark_segment_expired(self, segment):
849         """Mark a segment for cleaning in the local database.
850
851         The segment parameter should be either a SegmentInfo object or an
852         integer segment id.  Objects in the given segment will be marked as
853         expired, which means that any future snapshots that would re-use those
854         objects will instead write out a new copy of the object, and thus no
855         future snapshots will depend upon the given segment.
856         """
857
858         if isinstance(segment, int):
859             id = segment
860         elif isinstance(segment, self.SegmentInfo):
861             id = segment.id
862         else:
863             raise TypeError("Invalid segment: %s, must be of type int or SegmentInfo, not %s" % (segment, type(segment)))
864
865         cur = self.cursor()
866         cur.execute("select max(snapshotid) from snapshots")
867         last_snapshotid = cur.fetchone()[0]
868         cur.execute("update segments set expire_time = ? where segmentid = ?",
869                     (last_snapshotid, id))
870         cur.execute("update block_index set expired = 0 where segmentid = ?",
871                     (id,))
872
873     def balance_expired_objects(self):
874         """Analyze expired objects in segments to be cleaned and group by age.
875
876         Update the block_index table of the local database to group expired
877         objects by age.  The exact number of buckets and the cutoffs for each
878         are dynamically determined.  Calling this function after marking
879         segments expired will help in the segment cleaning process, by ensuring
880         that when active objects from clean segments are rewritten, they will
881         be placed into new segments roughly grouped by age.
882         """
883
884         # The expired column of the block_index table is used when generating a
885         # new Cumulus snapshot.  A null value indicates that an object may be
886         # re-used.  Otherwise, an object must be written into a new segment if
887         # needed.  Objects with distinct expired values will be written into
888         # distinct segments, to allow for some grouping by age.  The value 0 is
889         # somewhat special in that it indicates any rewritten objects can be
890         # placed in the same segment as completely new objects; this can be
891         # used for very young objects which have been expired, or objects not
892         # expected to be encountered.
893         #
894         # In the balancing process, all objects which are not used in any
895         # current snapshots will have expired set to 0.  Objects which have
896         # been seen will be sorted by age and will have expired values set to
897         # 0, 1, 2, and so on based on age (with younger objects being assigned
898         # lower values).  The number of buckets and the age cutoffs is
899         # determined by looking at the distribution of block ages.
900
901         cur = self.cursor()
902
903         # Mark all expired objects with expired = 0; these objects will later
904         # have values set to indicate groupings of objects when repacking.
905         cur.execute("""update block_index set expired = 0
906                        where expired is not null""")
907
908         # We will want to aim for at least one full segment for each bucket
909         # that we eventually create, but don't know how many bytes that should
910         # be due to compression.  So compute the average number of bytes in
911         # each expired segment as a rough estimate for the minimum size of each
912         # bucket.  (This estimate could be thrown off by many not-fully-packed
913         # segments, but for now don't worry too much about that.)  If we can't
914         # compute an average, it's probably because there are no expired
915         # segments, so we have no more work to do.
916         cur.execute("""select avg(size) from segments
917                        where segmentid in
918                            (select distinct segmentid from block_index
919                             where expired is not null)""")
920         segment_size_estimate = cur.fetchone()[0]
921         if not segment_size_estimate:
922             return
923
924         # Next, extract distribution of expired objects (number and size) by
925         # age.  Save the timestamp for "now" so that the classification of
926         # blocks into age buckets will not change later in the function, after
927         # time has passed.  Set any timestamps in the future to now, so we are
928         # guaranteed that for the rest of this function, age is always
929         # non-negative.
930         cur.execute("select julianday('now')")
931         now = cur.fetchone()[0]
932
933         cur.execute("""update block_index set timestamp = ?
934                        where timestamp > ? and expired is not null""",
935                     (now, now))
936
937         cur.execute("""select round(? - timestamp) as age, count(*), sum(size)
938                        from block_index where expired = 0
939                        group by age order by age""", (now,))
940         distribution = cur.fetchall()
941
942         # Start to determine the buckets for expired objects.  Heuristics used:
943         #   - An upper bound on the number of buckets is given by the number of
944         #     segments we estimate it will take to store all data.  In fact,
945         #     aim for a couple of segments per bucket.
946         #   - Place very young objects in bucket 0 (place with new objects)
947         #     unless there are enough of them to warrant a separate bucket.
948         #   - Try not to create unnecessarily many buckets, since fewer buckets
949         #     will allow repacked data to be grouped based on spatial locality
950         #     (while more buckets will group by temporal locality).  We want a
951         #     balance.
952         MIN_AGE = 4
953         total_bytes = sum([i[2] for i in distribution])
954         target_buckets = 2 * (total_bytes / segment_size_estimate) ** 0.4
955         min_size = 1.5 * segment_size_estimate
956         target_size = max(2 * segment_size_estimate,
957                           total_bytes / target_buckets)
958
959         print("segment_size:", segment_size_estimate)
960         print("distribution:", distribution)
961         print("total_bytes:", total_bytes)
962         print("target_buckets:", target_buckets)
963         print("min, target size:", min_size, target_size)
964
965         # Chosen cutoffs.  Each bucket consists of objects with age greater
966         # than one cutoff value, but not greater than the next largest cutoff.
967         cutoffs = []
968
969         # Starting with the oldest objects, begin grouping together into
970         # buckets of size at least target_size bytes.
971         distribution.reverse()
972         bucket_size = 0
973         min_age_bucket = False
974         for (age, items, size) in distribution:
975             if bucket_size >= target_size \
976                 or (age < MIN_AGE and not min_age_bucket):
977                 if bucket_size < target_size and len(cutoffs) > 0:
978                     cutoffs.pop()
979                 cutoffs.append(age)
980                 bucket_size = 0
981
982             bucket_size += size
983             if age < MIN_AGE:
984                 min_age_bucket = True
985
986         # The last (youngest) bucket will be group 0, unless it has enough data
987         # to be of size min_size by itself, or there happen to be no objects
988         # less than MIN_AGE at all.
989         if bucket_size >= min_size or not min_age_bucket:
990             cutoffs.append(-1)
991         cutoffs.append(-1)
992
993         print("cutoffs:", cutoffs)
994
995         # Update the database to assign each object to the appropriate bucket.
996         cutoffs.reverse()
997         for i in range(len(cutoffs)):
998             cur.execute("""update block_index set expired = ?
999                            where round(? - timestamp) > ?
1000                              and expired is not null""",
1001                         (i, now, cutoffs[i]))