Rework uri_encode/uri_decode to more cleanly work with bytes/strings.
[cumulus.git] / python / cumulus / __init__.py
1 # Cumulus: Efficient Filesystem Backup to the Cloud
2 # Copyright (C) 2008-2009, 2012 The Cumulus Developers
3 # See the AUTHORS file for a list of contributors.
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License along
16 # with this program; if not, write to the Free Software Foundation, Inc.,
17 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18
19 """High-level interface for working with Cumulus archives.
20
21 This module provides an easy interface for reading from and manipulating
22 various parts of a Cumulus archive:
23   - listing the snapshots and segments present
24   - reading segment contents
25   - parsing snapshot descriptors and snapshot metadata logs
26   - reading and maintaining the local object database
27 """
28
29 from __future__ import division, print_function, unicode_literals
30
31 import codecs
32 import hashlib
33 import itertools
34 import os
35 import posixpath
36 import re
37 import six
38 import sqlite3
39 import subprocess
40 import sys
41 import tarfile
42 import tempfile
43 try:
44     import _thread
45 except ImportError:
46     import thread as _thread
47
48 import cumulus.store
49 import cumulus.store.file
50 import cumulus.util
51
52 # The largest supported snapshot format that can be understood.
53 FORMAT_VERSION = (0, 11)        # Cumulus Snapshot v0.11
54
55 # Maximum number of nested indirect references allowed in a snapshot.
56 MAX_RECURSION_DEPTH = 3
57
58 # All segments which have been accessed this session.
59 accessed_segments = set()
60
61 # Table of methods used to filter segments before storage, and corresponding
62 # filename extensions.  These are listed in priority order (methods earlier in
63 # the list are tried first).
64 SEGMENT_FILTERS = [
65     (".gpg", "cumulus-filter-gpg --decrypt"),
66     (".gz", "gzip -dc"),
67     (".bz2", "bzip2 -dc"),
68     ("", None),
69 ]
70
71 def to_lines(data):
72     """Decode binary data from a file into a sequence of lines.
73
74     Newline markers are retained."""
75     return list(codecs.iterdecode(data.splitlines(True), "utf-8"))
76
77 class Struct:
78     """A class which merely acts as a data container.
79
80     Instances of this class (or its subclasses) are merely used to store data
81     in various attributes.  No methods are provided.
82     """
83
84     def __repr__(self):
85         return "<%s %s>" % (self.__class__, self.__dict__)
86
87 CHECKSUM_ALGORITHMS = {
88     'sha1': hashlib.sha1,
89     'sha224': hashlib.sha224,
90     'sha256': hashlib.sha256,
91 }
92
93 class ChecksumCreator:
94     """Compute a Cumulus checksum for provided data.
95
96     The algorithm used is selectable, but currently defaults to sha1.
97     """
98
99     def __init__(self, algorithm='sha1'):
100         self.algorithm = algorithm
101         self.hash = CHECKSUM_ALGORITHMS[algorithm]()
102
103     def update(self, data):
104         self.hash.update(data)
105         return self
106
107     def compute(self):
108         return "%s=%s" % (self.algorithm, self.hash.hexdigest())
109
110 class ChecksumVerifier:
111     """Verify whether a checksum from a snapshot matches the supplied data."""
112
113     def __init__(self, checksumstr):
114         """Create an object to check the supplied checksum."""
115
116         (algo, checksum) = checksumstr.split("=", 1)
117         self.checksum = checksum
118         self.hash = CHECKSUM_ALGORITHMS[algo]()
119
120     def update(self, data):
121         self.hash.update(data)
122
123     def valid(self):
124         """Return a boolean indicating whether the checksum matches."""
125
126         result = self.hash.hexdigest()
127         return result == self.checksum
128
129 class SearchPathEntry(object):
130     """Item representing a possible search location for Cumulus files.
131
132     Some Cumulus files might be stored in multiple possible file locations: due
133     to format (different compression mechanisms with different extensions),
134     locality (different segments might be placed in different directories to
135     control archiving policies), for backwards compatibility (default location
136     changed over time).  A SearchPathEntry describes a possible location for a
137     file.
138     """
139     def __init__(self, directory_prefix, suffix, context=None):
140         self._directory_prefix = directory_prefix
141         self._suffix = suffix
142         self._context = context
143
144     def __repr__(self):
145         return "%s(%r, %r, %r)" % (self.__class__.__name__,
146                                    self._directory_prefix, self._suffix,
147                                    self._context)
148
149     def build_path(self, basename):
150         """Construct the search path to use for a file with name basename.
151
152         Returns a tuple (pathname, context), where pathname is the path to try
153         and context is any additional data associated with this search entry
154         (if any).
155         """
156         return (posixpath.join(self._directory_prefix, basename + self._suffix),
157                 self._context)
158
159 class SearchPath(object):
160     """A collection of locations to search for files and lookup utilities.
161
162     For looking for a file in a Cumulus storage backend, a SearchPath object
163     contains a list of possible locations to try.  A SearchPath can be used to
164     perform the search as well; when a file is found the search path ordering
165     is updated (moving the successful SearchPathEntry to the front of the list
166     for future searches).
167     """
168     def __init__(self, name_regex, searchpath):
169         self._regex = re.compile(name_regex)
170         self._path = list(searchpath)
171
172     def add_search_entry(self, entry):
173         self._path.append(entry)
174
175     def directories(self):
176         """Return the set of directories to search for a file type."""
177         return set(entry._directory_prefix for entry in self._path)
178
179     def get(self, backend, basename):
180         for (i, entry) in enumerate(self._path):
181             try:
182                 (pathname, context) = entry.build_path(basename)
183                 fp = backend.get(pathname)
184                 # On success, move this entry to the front of the search path
185                 # to speed future searches.
186                 if i > 0:
187                     self._path.pop(i)
188                     self._path.insert(0, entry)
189                 return (fp, pathname, context)
190             except cumulus.store.NotFoundError:
191                 continue
192         raise cumulus.store.NotFoundError(basename)
193
194     def stat(self, backend, basename):
195         for (i, entry) in enumerate(self._path):
196             try:
197                 (pathname, context) = entry.build_path(basename)
198                 stat_data = backend.stat(pathname)
199                 # On success, move this entry to the front of the search path
200                 # to speed future searches.
201                 if i > 0:
202                     self._path.pop(i)
203                     self._path.insert(0, entry)
204                 result = {"path": pathname}
205                 result.update(stat_data)
206                 return result
207             except cumulus.store.NotFoundError:
208                 continue
209         raise cumulus.store.NotFoundError(basename)
210
211     def match(self, filename):
212         return self._regex.match(filename)
213
214     def list(self, backend):
215         success = False
216         for d in self.directories():
217             try:
218                 for f in backend.list(d):
219                     success = True
220                     m = self.match(f)
221                     if m: yield (posixpath.join(d, f), m)
222             except cumulus.store.NotFoundError:
223                 pass
224         if not success:
225             raise cumulus.store.NotFoundError(backend)
226
227 def _build_segments_searchpath(prefix):
228     for (extension, filter) in SEGMENT_FILTERS:
229         yield SearchPathEntry(prefix, extension, filter)
230
231 SEARCH_PATHS = {
232     "checksums": SearchPath(
233         r"^snapshot-(.*)\.(\w+)sums$",
234         [SearchPathEntry("meta", ".sha1sums"),
235          SearchPathEntry("checksums", ".sha1sums"),
236          SearchPathEntry("", ".sha1sums")]),
237     "meta": SearchPath(
238         r"^snapshot-(.*)\.meta(\.\S+)?$",
239         _build_segments_searchpath("meta")),
240     "segments": SearchPath(
241         (r"^([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})"
242          r"\.tar(\.\S+)?$"),
243         itertools.chain(
244             _build_segments_searchpath("segments0"),
245             _build_segments_searchpath("segments1"),
246             _build_segments_searchpath(""),
247             _build_segments_searchpath("segments"))),
248     "snapshots": SearchPath(
249         r"^snapshot-(.*)\.(cumulus|lbs)$",
250         [SearchPathEntry("snapshots", ".cumulus"),
251          SearchPathEntry("snapshots", ".lbs"),
252          SearchPathEntry("", ".cumulus"),
253          SearchPathEntry("", ".lbs")]),
254 }
255
256 class BackendWrapper(object):
257     """Wrapper around a Cumulus storage backend that understands file types.
258
259     The BackendWrapper class understands different Cumulus file types, such as
260     snapshots and segments, and implements higher-level operations such as
261     "retrieve a snapshot with a specific name" (hiding operations such as
262     searching for the correct file name).
263     """
264
265     def __init__(self, backend):
266         """Initializes a wrapper around the specified storage backend.
267
268         store may either be a Store object or URL.
269         """
270         if isinstance(backend, six.string_types):
271             self._backend = cumulus.store.open(backend)
272         else:
273             self._backend = backend
274
275     @property
276     def raw_backend(self):
277         return self._backend
278
279     def stat_generic(self, basename, filetype):
280         return SEARCH_PATHS[filetype].stat(self._backend, basename)
281
282     def open_generic(self, basename, filetype):
283         return SEARCH_PATHS[filetype].get(self._backend, basename)
284
285     def open_snapshot(self, name):
286         return self.open_generic("snapshot-" + name, "snapshots")
287
288     def open_segment(self, name):
289         return self.open_generic(name + ".tar", "segments")
290
291     def list_generic(self, filetype):
292         return ((x[1].group(1), x[0])
293                 for x in SEARCH_PATHS[filetype].list(self._backend))
294
295     def prefetch_generic(self):
296         """Calls scan on directories to prefetch file metadata."""
297         directories = set()
298         for typeinfo in SEARCH_PATHS.values():
299             directories.update(typeinfo.directories())
300         for d in directories:
301             print("Prefetch", d)
302             self._backend.scan(d)
303
304 class CumulusStore:
305     def __init__(self, backend):
306         if isinstance(backend, BackendWrapper):
307             self.backend = backend
308         else:
309             self.backend = BackendWrapper(backend)
310         self.cachedir = None
311         self.CACHE_SIZE = 16
312         self._lru_list = []
313
314     def get_cachedir(self):
315         if self.cachedir is None:
316             self.cachedir = tempfile.mkdtemp("-cumulus")
317         return self.cachedir
318
319     def cleanup(self):
320         if self.cachedir is not None:
321             # TODO: Avoid use of system, make this safer
322             os.system("rm -rf " + self.cachedir)
323         self.cachedir = None
324
325     @staticmethod
326     def parse_ref(refstr):
327         m = re.match(r"^zero\[(\d+)\]$", refstr)
328         if m:
329             return ("zero", None, None, (0, int(m.group(1)), False))
330
331         m = re.match(r"^([-0-9a-f]+)\/([0-9a-f]+)(\(\S+\))?(\[(=?(\d+)|(\d+)\+(\d+))\])?$", refstr)
332         if not m: return
333
334         segment = m.group(1)
335         object = m.group(2)
336         checksum = m.group(3)
337         slice = m.group(4)
338
339         if checksum is not None:
340             checksum = checksum.lstrip("(").rstrip(")")
341
342         if slice is not None:
343             if m.group(6) is not None:
344                 # Size-assertion slice
345                 slice = (0, int(m.group(6)), True)
346             else:
347                 slice = (int(m.group(7)), int(m.group(8)), False)
348
349         return (segment, object, checksum, slice)
350
351     def list_snapshots(self):
352         return set(x[0] for x in self.backend.list_generic("snapshots"))
353
354     def list_segments(self):
355         return set(x[0] for x in self.backend.list_generic("segments"))
356
357     def load_snapshot(self, snapshot):
358         snapshot_file = self.backend.open_snapshot(snapshot)[0]
359         return to_lines(snapshot_file.read())
360
361     @staticmethod
362     def filter_data(filehandle, filter_cmd):
363         if filter_cmd is None:
364             return filehandle
365         p = subprocess.Popen(filter_cmd, shell=True, stdin=subprocess.PIPE,
366                              stdout=subprocess.PIPE, close_fds=True)
367         input, output = p.stdin, p.stdout
368         def copy_thread(src, dst):
369             BLOCK_SIZE = 4096
370             while True:
371                 block = src.read(BLOCK_SIZE)
372                 if len(block) == 0: break
373                 dst.write(block)
374             src.close()
375             dst.close()
376             p.wait()
377         _thread.start_new_thread(copy_thread, (filehandle, input))
378         return output
379
380     def get_segment(self, segment):
381         accessed_segments.add(segment)
382
383         (segment_fp, path, filter_cmd) = self.backend.open_segment(segment)
384         return self.filter_data(segment_fp, filter_cmd)
385
386     def load_segment(self, segment):
387         seg = tarfile.open(segment, 'r|', self.get_segment(segment))
388         for item in seg:
389             data_obj = seg.extractfile(item)
390             path = item.name.split('/')
391             if len(path) == 2 and path[0] == segment:
392                 yield (path[1], data_obj.read())
393
394     def extract_segment(self, segment):
395         segdir = os.path.join(self.get_cachedir(), segment)
396         os.mkdir(segdir)
397         for (object, data) in self.load_segment(segment):
398             f = open(os.path.join(segdir, object), 'wb')
399             f.write(data)
400             f.close()
401
402     def load_object(self, segment, object):
403         accessed_segments.add(segment)
404         path = os.path.join(self.get_cachedir(), segment, object)
405         if not os.access(path, os.R_OK):
406             self.extract_segment(segment)
407         if segment in self._lru_list: self._lru_list.remove(segment)
408         self._lru_list.append(segment)
409         while len(self._lru_list) > self.CACHE_SIZE:
410             os.system("rm -rf " + os.path.join(self.cachedir,
411                                                self._lru_list[0]))
412             self._lru_list = self._lru_list[1:]
413         return open(path, 'rb').read()
414
415     def get(self, refstr):
416         """Fetch the given object and return it.
417
418         The input should be an object reference, in string form.
419         """
420
421         (segment, object, checksum, slice) = self.parse_ref(refstr)
422
423         if segment == "zero":
424             return "\0" * slice[1]
425
426         data = self.load_object(segment, object)
427
428         if checksum is not None:
429             verifier = ChecksumVerifier(checksum)
430             verifier.update(data)
431             if not verifier.valid():
432                 raise ValueError
433
434         if slice is not None:
435             (start, length, exact) = slice
436             # Note: The following assertion check may need to be commented out
437             # to restore from pre-v0.8 snapshots, as the syntax for
438             # size-assertion slices has changed.
439             if exact and len(data) != length: raise ValueError
440             data = data[start:start+length]
441             if len(data) != length: raise IndexError
442
443         return data
444
445     def prefetch(self):
446         self.backend.prefetch_generic()
447
448 def parse(lines, terminate=None):
449     """Generic parser for RFC822-style "Key: Value" data streams.
450
451     This parser can be used to read metadata logs and snapshot root descriptor
452     files.
453
454     lines must be an iterable object which yields a sequence of lines of input.
455
456     If terminate is specified, it is used as a predicate to determine when to
457     stop reading input lines.
458     """
459
460     result = {}
461     last_key = None
462
463     def make_result(result):
464         return dict((k, "".join(v)) for (k, v) in result.items())
465
466     for l in lines:
467         # Strip off a trailing newline, if present
468         if len(l) > 0 and l[-1] == "\n":
469             l = l[:-1]
470
471         if terminate is not None and terminate(l):
472             if len(result) > 0: yield make_result(result)
473             result = {}
474             last_key = None
475             continue
476
477         m = re.match(r"^([-\w]+):\s*(.*)$", l)
478         if m:
479             result[m.group(1)] = [m.group(2)]
480             last_key = m.group(1)
481         elif len(l) > 0 and l[0].isspace() and last_key is not None:
482             result[last_key].append(l)
483         else:
484             last_key = None
485
486     if len(result) > 0: yield make_result(result)
487
488 def parse_full(lines):
489     try:
490         return next(parse(lines))
491     except StopIteration:
492         return {}
493
494 def parse_metadata_version(s):
495     """Convert a string with the snapshot version format to a tuple."""
496
497     m = re.match(r"^(?:Cumulus|LBS) Snapshot v(\d+(\.\d+)*)$", s)
498     if m is None:
499         return ()
500     else:
501         return tuple([int(d) for d in m.group(1).split(".")])
502
503 def read_metadata(object_store, root):
504     """Iterate through all lines in the metadata log, following references."""
505
506     # Stack for keeping track of recursion when following references to
507     # portions of the log.  The last entry in the stack corresponds to the
508     # object currently being parsed.  Each entry is a list of lines which have
509     # been reversed, so that popping successive lines from the end of each list
510     # will return lines of the metadata log in order.
511     stack = []
512
513     def follow_ref(refstr):
514         if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
515         lines = to_lines(object_store.get(refstr))
516         lines.reverse()
517         stack.append(lines)
518
519     follow_ref(root)
520
521     while len(stack) > 0:
522         top = stack[-1]
523         if len(top) == 0:
524             stack.pop()
525             continue
526         line = top.pop()
527
528         # An indirect reference which we must follow?
529         if len(line) > 0 and line[0] == '@':
530             ref = line[1:]
531             ref.strip()
532             follow_ref(ref)
533         else:
534             yield line
535
536 class MetadataItem:
537     """Metadata for a single file (or directory or...) from a snapshot."""
538
539     # Functions for parsing various datatypes that can appear in a metadata log
540     # item.
541     @staticmethod
542     def decode_int(s):
543         """Decode an integer, expressed in decimal, octal, or hexadecimal."""
544         if s.startswith("0x"):
545             return int(s, 16)
546         elif s.startswith("0"):
547             return int(s, 8)
548         else:
549             return int(s, 10)
550
551     @staticmethod
552     def decode_str(s):
553         """Decode a URI-encoded (%xx escapes) string."""
554         return cumulus.util.uri_decode_pathname(s)
555
556     @staticmethod
557     def raw_str(s):
558         """An unecoded string."""
559         return s
560
561     @staticmethod
562     def decode_user(s):
563         """Decode a user/group to a tuple of uid/gid followed by name."""
564         items = s.split()
565         uid = MetadataItem.decode_int(items[0])
566         name = None
567         if len(items) > 1:
568             if items[1].startswith("(") and items[1].endswith(")"):
569                 name = MetadataItem.decode_str(items[1][1:-1])
570         return (uid, name)
571
572     @staticmethod
573     def decode_device(s):
574         """Decode a device major/minor number."""
575         (major, minor) = map(MetadataItem.decode_int, s.split("/"))
576         return (major, minor)
577
578     class Items: pass
579
580     def __init__(self, fields, object_store):
581         """Initialize from a dictionary of key/value pairs from metadata log."""
582
583         self.fields = fields
584         self.object_store = object_store
585         self.keys = []
586         self.items = self.Items()
587         for (k, v) in fields.items():
588             if k in self.field_types:
589                 decoder = self.field_types[k]
590                 setattr(self.items, k, decoder(v))
591                 self.keys.append(k)
592
593     def data(self):
594         """Return an iterator for the data blocks that make up a file."""
595
596         # This traverses the list of blocks that make up a file, following
597         # indirect references.  It is implemented in much the same way as
598         # read_metadata, so see that function for details of the technique.
599
600         objects = self.fields['data'].split()
601         objects.reverse()
602         stack = [objects]
603
604         def follow_ref(refstr):
605             if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
606             objects = self.object_store.get(refstr).split()
607             objects.reverse()
608             stack.append(objects)
609
610         while len(stack) > 0:
611             top = stack[-1]
612             if len(top) == 0:
613                 stack.pop()
614                 continue
615             ref = top.pop()
616
617             # An indirect reference which we must follow?
618             if len(ref) > 0 and ref[0] == '@':
619                 follow_ref(ref[1:])
620             else:
621                 yield ref
622
623 # Description of fields that might appear, and how they should be parsed.
624 MetadataItem.field_types = {
625     'name': MetadataItem.decode_str,
626     'type': MetadataItem.raw_str,
627     'mode': MetadataItem.decode_int,
628     'device': MetadataItem.decode_device,
629     'user': MetadataItem.decode_user,
630     'group': MetadataItem.decode_user,
631     'ctime': MetadataItem.decode_int,
632     'mtime': MetadataItem.decode_int,
633     'links': MetadataItem.decode_int,
634     'inode': MetadataItem.raw_str,
635     'checksum': MetadataItem.decode_str,
636     'size': MetadataItem.decode_int,
637     'contents': MetadataItem.decode_str,
638     'target': MetadataItem.decode_str,
639 }
640
641 def iterate_metadata(object_store, root):
642     for d in parse(read_metadata(object_store, root), lambda l: len(l) == 0):
643         yield MetadataItem(d, object_store)
644
645 class LocalDatabase:
646     """Access to the local database of snapshot contents and object checksums.
647
648     The local database is consulted when creating a snapshot to determine what
649     data can be re-used from old snapshots.  Segment cleaning is performed by
650     manipulating the data in the local database; the local database also
651     includes enough data to guide the segment cleaning process.
652     """
653
654     def __init__(self, path, dbname="localdb.sqlite"):
655         self.db_connection = sqlite3.connect(path + "/" + dbname)
656
657     # Low-level database access.  Use these methods when there isn't a
658     # higher-level interface available.  Exception: do, however, remember to
659     # use the commit() method after making changes to make sure they are
660     # actually saved, even when going through higher-level interfaces.
661     def commit(self):
662         "Commit any pending changes to the local database."
663         self.db_connection.commit()
664
665     def rollback(self):
666         "Roll back any pending changes to the local database."
667         self.db_connection.rollback()
668
669     def cursor(self):
670         "Return a DB-API cursor for directly accessing the local database."
671         return self.db_connection.cursor()
672
673     def list_schemes(self):
674         """Return the list of snapshots found in the local database.
675
676         The returned value is a list of tuples (id, scheme, name, time, intent).
677         """
678
679         cur = self.cursor()
680         cur.execute("select distinct scheme from snapshots")
681         schemes = [row[0] for row in cur.fetchall()]
682         schemes.sort()
683         return schemes
684
685     def list_snapshots(self, scheme):
686         """Return a list of snapshots for the given scheme."""
687         cur = self.cursor()
688         cur.execute("select name from snapshots")
689         snapshots = [row[0] for row in cur.fetchall()]
690         snapshots.sort()
691         return snapshots
692
693     def delete_snapshot(self, scheme, name):
694         """Remove the specified snapshot from the database.
695
696         Warning: This does not garbage collect all dependent data in the
697         database, so it must be followed by a call to garbage_collect() to make
698         the database consistent.
699         """
700         cur = self.cursor()
701         cur.execute("delete from snapshots where scheme = ? and name = ?",
702                     (scheme, name))
703
704     def prune_old_snapshots(self, scheme, intent=1.0):
705         """Delete entries from old snapshots from the database.
706
707         Only snapshots with the specified scheme name will be deleted.  If
708         intent is given, it gives the intended next snapshot type, to determine
709         how aggressively to clean (for example, intent=7 could be used if the
710         next snapshot will be a weekly snapshot).
711         """
712
713         cur = self.cursor()
714
715         # Find the id of the last snapshot to be created.  This is used for
716         # measuring time in a way: we record this value in each segment we
717         # expire on this run, and then on a future run can tell if there have
718         # been intervening backups made.
719         cur.execute("select max(snapshotid) from snapshots")
720         last_snapshotid = cur.fetchone()[0]
721
722         # Get the list of old snapshots for this scheme.  Delete all the old
723         # ones.  Rules for what to keep:
724         #   - Always keep the most recent snapshot.
725         #   - If snapshot X is younger than Y, and X has higher intent, then Y
726         #     can be deleted.
727         cur.execute("""select snapshotid, name, intent,
728                               julianday('now') - timestamp as age
729                        from snapshots where scheme = ?
730                        order by age""", (scheme,))
731
732         first = True
733         max_intent = intent
734         for (id, name, snap_intent, snap_age) in cur.fetchall():
735             can_delete = False
736             if snap_intent < max_intent:
737                 # Delete small-intent snapshots if there is a more recent
738                 # large-intent snapshot.
739                 can_delete = True
740             elif snap_intent == intent:
741                 # Delete previous snapshots with the specified intent level.
742                 can_delete = True
743
744             if can_delete and not first:
745                 print("Delete snapshot %d (%s)" % (id, name))
746                 cur.execute("delete from snapshots where snapshotid = ?",
747                             (id,))
748             first = False
749             max_intent = max(max_intent, snap_intent)
750
751         self.garbage_collect()
752
753     def garbage_collect(self):
754         """Garbage-collect unreachable segment and object data.
755
756         Remove all segments and checksums which is not reachable from the
757         current set of snapshots stored in the local database.
758         """
759         cur = self.cursor()
760
761         # Delete entries in the segment_utilization table which are for
762         # non-existent snapshots.
763         cur.execute("""delete from segment_utilization
764                        where snapshotid not in
765                            (select snapshotid from snapshots)""")
766
767         # Delete segments not referenced by any current snapshots.
768         cur.execute("""delete from segments where segmentid not in
769                            (select segmentid from segment_utilization)""")
770
771         # Delete dangling objects in the block_index table.
772         cur.execute("""delete from block_index
773                        where segmentid not in
774                            (select segmentid from segments)""")
775
776         # Remove sub-block signatures for deleted objects.
777         cur.execute("""delete from subblock_signatures
778                        where blockid not in
779                            (select blockid from block_index)""")
780
781     # Segment cleaning.
782     class SegmentInfo(Struct): pass
783
784     def get_segment_cleaning_list(self, age_boost=0.0):
785         """Return a list of all current segments with information for cleaning.
786
787         Return all segments which are currently known in the local database
788         (there might be other, older segments in the archive itself), and
789         return usage statistics for each to help decide which segments to
790         clean.
791
792         The returned list will be sorted by estimated cleaning benefit, with
793         segments that are best to clean at the start of the list.
794
795         If specified, the age_boost parameter (measured in days) will added to
796         the age of each segment, as a way of adjusting the benefit computation
797         before a long-lived snapshot is taken (for example, age_boost might be
798         set to 7 when cleaning prior to taking a weekly snapshot).
799         """
800
801         cur = self.cursor()
802         segments = []
803         cur.execute("""select segmentid, used, size, mtime,
804                        julianday('now') - mtime as age from segment_info
805                        where expire_time is null""")
806         for row in cur:
807             info = self.SegmentInfo()
808             info.id = row[0]
809             info.used_bytes = row[1]
810             info.size_bytes = row[2]
811             info.mtime = row[3]
812             info.age_days = row[4]
813
814             # If data is not available for whatever reason, treat it as 0.0.
815             if info.age_days is None:
816                 info.age_days = 0.0
817             if info.used_bytes is None:
818                 info.used_bytes = 0.0
819
820             # Benefit calculation: u is the estimated fraction of each segment
821             # which is utilized (bytes belonging to objects still in use
822             # divided by total size; this doesn't take compression or storage
823             # overhead into account, but should give a reasonable estimate).
824             #
825             # The total benefit is a heuristic that combines several factors:
826             # the amount of space that can be reclaimed (1 - u), an ageing
827             # factor (info.age_days) that favors cleaning old segments to young
828             # ones and also is more likely to clean segments that will be
829             # rewritten for long-lived snapshots (age_boost), and finally a
830             # penalty factor for the cost of re-uploading data (u + 0.1).
831             u = info.used_bytes / info.size_bytes
832             info.cleaning_benefit \
833                 = (1 - u) * (info.age_days + age_boost) / (u + 0.1)
834
835             segments.append(info)
836
837         segments.sort(cmp, key=lambda s: s.cleaning_benefit, reverse=True)
838         return segments
839
840     def mark_segment_expired(self, segment):
841         """Mark a segment for cleaning in the local database.
842
843         The segment parameter should be either a SegmentInfo object or an
844         integer segment id.  Objects in the given segment will be marked as
845         expired, which means that any future snapshots that would re-use those
846         objects will instead write out a new copy of the object, and thus no
847         future snapshots will depend upon the given segment.
848         """
849
850         if isinstance(segment, int):
851             id = segment
852         elif isinstance(segment, self.SegmentInfo):
853             id = segment.id
854         else:
855             raise TypeError("Invalid segment: %s, must be of type int or SegmentInfo, not %s" % (segment, type(segment)))
856
857         cur = self.cursor()
858         cur.execute("select max(snapshotid) from snapshots")
859         last_snapshotid = cur.fetchone()[0]
860         cur.execute("update segments set expire_time = ? where segmentid = ?",
861                     (last_snapshotid, id))
862         cur.execute("update block_index set expired = 0 where segmentid = ?",
863                     (id,))
864
865     def balance_expired_objects(self):
866         """Analyze expired objects in segments to be cleaned and group by age.
867
868         Update the block_index table of the local database to group expired
869         objects by age.  The exact number of buckets and the cutoffs for each
870         are dynamically determined.  Calling this function after marking
871         segments expired will help in the segment cleaning process, by ensuring
872         that when active objects from clean segments are rewritten, they will
873         be placed into new segments roughly grouped by age.
874         """
875
876         # The expired column of the block_index table is used when generating a
877         # new Cumulus snapshot.  A null value indicates that an object may be
878         # re-used.  Otherwise, an object must be written into a new segment if
879         # needed.  Objects with distinct expired values will be written into
880         # distinct segments, to allow for some grouping by age.  The value 0 is
881         # somewhat special in that it indicates any rewritten objects can be
882         # placed in the same segment as completely new objects; this can be
883         # used for very young objects which have been expired, or objects not
884         # expected to be encountered.
885         #
886         # In the balancing process, all objects which are not used in any
887         # current snapshots will have expired set to 0.  Objects which have
888         # been seen will be sorted by age and will have expired values set to
889         # 0, 1, 2, and so on based on age (with younger objects being assigned
890         # lower values).  The number of buckets and the age cutoffs is
891         # determined by looking at the distribution of block ages.
892
893         cur = self.cursor()
894
895         # Mark all expired objects with expired = 0; these objects will later
896         # have values set to indicate groupings of objects when repacking.
897         cur.execute("""update block_index set expired = 0
898                        where expired is not null""")
899
900         # We will want to aim for at least one full segment for each bucket
901         # that we eventually create, but don't know how many bytes that should
902         # be due to compression.  So compute the average number of bytes in
903         # each expired segment as a rough estimate for the minimum size of each
904         # bucket.  (This estimate could be thrown off by many not-fully-packed
905         # segments, but for now don't worry too much about that.)  If we can't
906         # compute an average, it's probably because there are no expired
907         # segments, so we have no more work to do.
908         cur.execute("""select avg(size) from segments
909                        where segmentid in
910                            (select distinct segmentid from block_index
911                             where expired is not null)""")
912         segment_size_estimate = cur.fetchone()[0]
913         if not segment_size_estimate:
914             return
915
916         # Next, extract distribution of expired objects (number and size) by
917         # age.  Save the timestamp for "now" so that the classification of
918         # blocks into age buckets will not change later in the function, after
919         # time has passed.  Set any timestamps in the future to now, so we are
920         # guaranteed that for the rest of this function, age is always
921         # non-negative.
922         cur.execute("select julianday('now')")
923         now = cur.fetchone()[0]
924
925         cur.execute("""update block_index set timestamp = ?
926                        where timestamp > ? and expired is not null""",
927                     (now, now))
928
929         cur.execute("""select round(? - timestamp) as age, count(*), sum(size)
930                        from block_index where expired = 0
931                        group by age order by age""", (now,))
932         distribution = cur.fetchall()
933
934         # Start to determine the buckets for expired objects.  Heuristics used:
935         #   - An upper bound on the number of buckets is given by the number of
936         #     segments we estimate it will take to store all data.  In fact,
937         #     aim for a couple of segments per bucket.
938         #   - Place very young objects in bucket 0 (place with new objects)
939         #     unless there are enough of them to warrant a separate bucket.
940         #   - Try not to create unnecessarily many buckets, since fewer buckets
941         #     will allow repacked data to be grouped based on spatial locality
942         #     (while more buckets will group by temporal locality).  We want a
943         #     balance.
944         MIN_AGE = 4
945         total_bytes = sum([i[2] for i in distribution])
946         target_buckets = 2 * (total_bytes / segment_size_estimate) ** 0.4
947         min_size = 1.5 * segment_size_estimate
948         target_size = max(2 * segment_size_estimate,
949                           total_bytes / target_buckets)
950
951         print("segment_size:", segment_size_estimate)
952         print("distribution:", distribution)
953         print("total_bytes:", total_bytes)
954         print("target_buckets:", target_buckets)
955         print("min, target size:", min_size, target_size)
956
957         # Chosen cutoffs.  Each bucket consists of objects with age greater
958         # than one cutoff value, but not greater than the next largest cutoff.
959         cutoffs = []
960
961         # Starting with the oldest objects, begin grouping together into
962         # buckets of size at least target_size bytes.
963         distribution.reverse()
964         bucket_size = 0
965         min_age_bucket = False
966         for (age, items, size) in distribution:
967             if bucket_size >= target_size \
968                 or (age < MIN_AGE and not min_age_bucket):
969                 if bucket_size < target_size and len(cutoffs) > 0:
970                     cutoffs.pop()
971                 cutoffs.append(age)
972                 bucket_size = 0
973
974             bucket_size += size
975             if age < MIN_AGE:
976                 min_age_bucket = True
977
978         # The last (youngest) bucket will be group 0, unless it has enough data
979         # to be of size min_size by itself, or there happen to be no objects
980         # less than MIN_AGE at all.
981         if bucket_size >= min_size or not min_age_bucket:
982             cutoffs.append(-1)
983         cutoffs.append(-1)
984
985         print("cutoffs:", cutoffs)
986
987         # Update the database to assign each object to the appropriate bucket.
988         cutoffs.reverse()
989         for i in range(len(cutoffs)):
990             cur.execute("""update block_index set expired = ?
991                            where round(? - timestamp) > ?
992                              and expired is not null""",
993                         (i, now, cutoffs[i]))