Use the subprocess module, not os.popen2, for Python 3 compatibility.
[cumulus.git] / python / cumulus / __init__.py
1 # Cumulus: Efficient Filesystem Backup to the Cloud
2 # Copyright (C) 2008-2009, 2012 The Cumulus Developers
3 # See the AUTHORS file for a list of contributors.
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License along
16 # with this program; if not, write to the Free Software Foundation, Inc.,
17 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18
19 """High-level interface for working with Cumulus archives.
20
21 This module provides an easy interface for reading from and manipulating
22 various parts of a Cumulus archive:
23   - listing the snapshots and segments present
24   - reading segment contents
25   - parsing snapshot descriptors and snapshot metadata logs
26   - reading and maintaining the local object database
27 """
28
29 from __future__ import division, print_function, unicode_literals
30
31 import codecs
32 import hashlib
33 import itertools
34 import os
35 import re
36 import sqlite3
37 import subprocess
38 import sys
39 import tarfile
40 import tempfile
41 try:
42     import _thread
43 except ImportError:
44     import thread as _thread
45
46 import cumulus.store
47 import cumulus.store.file
48
49 if sys.version < "3":
50     StringTypes = (str, unicode)
51 else:
52     StringTypes = (str,)
53
54 # The largest supported snapshot format that can be understood.
55 FORMAT_VERSION = (0, 11)        # Cumulus Snapshot v0.11
56
57 # Maximum number of nested indirect references allowed in a snapshot.
58 MAX_RECURSION_DEPTH = 3
59
60 # All segments which have been accessed this session.
61 accessed_segments = set()
62
63 # Table of methods used to filter segments before storage, and corresponding
64 # filename extensions.  These are listed in priority order (methods earlier in
65 # the list are tried first).
66 SEGMENT_FILTERS = [
67     (".gpg", "cumulus-filter-gpg --decrypt"),
68     (".gz", "gzip -dc"),
69     (".bz2", "bzip2 -dc"),
70     ("", None),
71 ]
72
73 def to_lines(data):
74     """Decode binary data from a file into a sequence of lines.
75
76     Newline markers are retained."""
77     return list(codecs.iterdecode(data.splitlines(True), "utf-8"))
78
79 def uri_decode(s):
80     """Decode a URI-encoded (%xx escapes) string."""
81     def hex_decode(m): return chr(int(m.group(1), 16))
82     return re.sub(r"%([0-9a-f]{2})", hex_decode, s)
83 def uri_encode(s):
84     """Encode a string to URI-encoded (%xx escapes) form."""
85     def hex_encode(c):
86         if c > '+' and c < '\x7f' and c != '@':
87             return c
88         else:
89             return "%%%02x" % (ord(c),)
90     return ''.join(hex_encode(c) for c in s)
91
92 class Struct:
93     """A class which merely acts as a data container.
94
95     Instances of this class (or its subclasses) are merely used to store data
96     in various attributes.  No methods are provided.
97     """
98
99     def __repr__(self):
100         return "<%s %s>" % (self.__class__, self.__dict__)
101
102 CHECKSUM_ALGORITHMS = {
103     'sha1': hashlib.sha1,
104     'sha224': hashlib.sha224,
105     'sha256': hashlib.sha256,
106 }
107
108 class ChecksumCreator:
109     """Compute a Cumulus checksum for provided data.
110
111     The algorithm used is selectable, but currently defaults to sha1.
112     """
113
114     def __init__(self, algorithm='sha1'):
115         self.algorithm = algorithm
116         self.hash = CHECKSUM_ALGORITHMS[algorithm]()
117
118     def update(self, data):
119         self.hash.update(data)
120         return self
121
122     def compute(self):
123         return "%s=%s" % (self.algorithm, self.hash.hexdigest())
124
125 class ChecksumVerifier:
126     """Verify whether a checksum from a snapshot matches the supplied data."""
127
128     def __init__(self, checksumstr):
129         """Create an object to check the supplied checksum."""
130
131         (algo, checksum) = checksumstr.split("=", 1)
132         self.checksum = checksum
133         self.hash = CHECKSUM_ALGORITHMS[algo]()
134
135     def update(self, data):
136         self.hash.update(data)
137
138     def valid(self):
139         """Return a boolean indicating whether the checksum matches."""
140
141         result = self.hash.hexdigest()
142         return result == self.checksum
143
144 class SearchPathEntry(object):
145     """Item representing a possible search location for Cumulus files.
146
147     Some Cumulus files might be stored in multiple possible file locations: due
148     to format (different compression mechanisms with different extensions),
149     locality (different segments might be placed in different directories to
150     control archiving policies), for backwards compatibility (default location
151     changed over time).  A SearchPathEntry describes a possible location for a
152     file.
153     """
154     def __init__(self, directory_prefix, suffix, context=None):
155         self._directory_prefix = directory_prefix
156         self._suffix = suffix
157         self._context = context
158
159     def __repr__(self):
160         return "%s(%r, %r, %r)" % (self.__class__.__name__,
161                                    self._directory_prefix, self._suffix,
162                                    self._context)
163
164     def build_path(self, basename):
165         """Construct the search path to use for a file with name basename.
166
167         Returns a tuple (pathname, context), where pathname is the path to try
168         and context is any additional data associated with this search entry
169         (if any).
170         """
171         return (os.path.join(self._directory_prefix, basename + self._suffix),
172                 self._context)
173
174 class SearchPath(object):
175     """A collection of locations to search for files and lookup utilities.
176
177     For looking for a file in a Cumulus storage backend, a SearchPath object
178     contains a list of possible locations to try.  A SearchPath can be used to
179     perform the search as well; when a file is found the search path ordering
180     is updated (moving the successful SearchPathEntry to the front of the list
181     for future searches).
182     """
183     def __init__(self, name_regex, searchpath):
184         self._regex = re.compile(name_regex)
185         self._path = list(searchpath)
186
187     def add_search_entry(self, entry):
188         self._path.append(entry)
189
190     def directories(self):
191         """Return the set of directories to search for a file type."""
192         return set(entry._directory_prefix for entry in self._path)
193
194     def get(self, backend, basename):
195         for (i, entry) in enumerate(self._path):
196             try:
197                 (pathname, context) = entry.build_path(basename)
198                 fp = backend.get(pathname)
199                 # On success, move this entry to the front of the search path
200                 # to speed future searches.
201                 if i > 0:
202                     self._path.pop(i)
203                     self._path.insert(0, entry)
204                 return (fp, pathname, context)
205             except cumulus.store.NotFoundError:
206                 continue
207         raise cumulus.store.NotFoundError(basename)
208
209     def stat(self, backend, basename):
210         for (i, entry) in enumerate(self._path):
211             try:
212                 (pathname, context) = entry.build_path(basename)
213                 stat_data = backend.stat(pathname)
214                 # On success, move this entry to the front of the search path
215                 # to speed future searches.
216                 if i > 0:
217                     self._path.pop(i)
218                     self._path.insert(0, entry)
219                 result = {"path": pathname}
220                 result.update(stat_data)
221                 return result
222             except cumulus.store.NotFoundError:
223                 continue
224         raise cumulus.store.NotFoundError(basename)
225
226     def match(self, filename):
227         return self._regex.match(filename)
228
229     def list(self, backend):
230         success = False
231         for d in self.directories():
232             try:
233                 for f in backend.list(d):
234                     success = True
235                     m = self.match(f)
236                     if m: yield (os.path.join(d, f), m)
237             except cumulus.store.NotFoundError:
238                 pass
239         if not success:
240             raise cumulus.store.NotFoundError(backend)
241
242 def _build_segments_searchpath(prefix):
243     for (extension, filter) in SEGMENT_FILTERS:
244         yield SearchPathEntry(prefix, extension, filter)
245
246 SEARCH_PATHS = {
247     "checksums": SearchPath(
248         r"^snapshot-(.*)\.(\w+)sums$",
249         [SearchPathEntry("meta", ".sha1sums"),
250          SearchPathEntry("checksums", ".sha1sums"),
251          SearchPathEntry("", ".sha1sums")]),
252     "meta": SearchPath(
253         r"^snapshot-(.*)\.meta(\.\S+)?$",
254         _build_segments_searchpath("meta")),
255     "segments": SearchPath(
256         (r"^([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})"
257          r"\.tar(\.\S+)?$"),
258         itertools.chain(
259             _build_segments_searchpath("segments0"),
260             _build_segments_searchpath("segments1"),
261             _build_segments_searchpath(""),
262             _build_segments_searchpath("segments"))),
263     "snapshots": SearchPath(
264         r"^snapshot-(.*)\.(cumulus|lbs)$",
265         [SearchPathEntry("snapshots", ".cumulus"),
266          SearchPathEntry("snapshots", ".lbs"),
267          SearchPathEntry("", ".cumulus"),
268          SearchPathEntry("", ".lbs")]),
269 }
270
271 class BackendWrapper(object):
272     """Wrapper around a Cumulus storage backend that understands file types.
273
274     The BackendWrapper class understands different Cumulus file types, such as
275     snapshots and segments, and implements higher-level operations such as
276     "retrieve a snapshot with a specific name" (hiding operations such as
277     searching for the correct file name).
278     """
279
280     def __init__(self, backend):
281         """Initializes a wrapper around the specified storage backend.
282
283         store may either be a Store object or URL.
284         """
285         if type(backend) in StringTypes:
286             self._backend = cumulus.store.open(backend)
287         else:
288             self._backend = backend
289
290     @property
291     def raw_backend(self):
292         return self._backend
293
294     def stat_generic(self, basename, filetype):
295         return SEARCH_PATHS[filetype].stat(self._backend, basename)
296
297     def open_generic(self, basename, filetype):
298         return SEARCH_PATHS[filetype].get(self._backend, basename)
299
300     def open_snapshot(self, name):
301         return self.open_generic("snapshot-" + name, "snapshots")
302
303     def open_segment(self, name):
304         return self.open_generic(name + ".tar", "segments")
305
306     def list_generic(self, filetype):
307         return ((x[1].group(1), x[0])
308                 for x in SEARCH_PATHS[filetype].list(self._backend))
309
310     def prefetch_generic(self):
311         """Calls scan on directories to prefetch file metadata."""
312         directories = set()
313         for typeinfo in SEARCH_PATHS.values():
314             directories.update(typeinfo.directories())
315         for d in directories:
316             print("Prefetch", d)
317             self._backend.scan(d)
318
319 class CumulusStore:
320     def __init__(self, backend):
321         if isinstance(backend, BackendWrapper):
322             self.backend = backend
323         else:
324             self.backend = BackendWrapper(backend)
325         self.cachedir = None
326         self.CACHE_SIZE = 16
327         self._lru_list = []
328
329     def get_cachedir(self):
330         if self.cachedir is None:
331             self.cachedir = tempfile.mkdtemp("-cumulus")
332         return self.cachedir
333
334     def cleanup(self):
335         if self.cachedir is not None:
336             # TODO: Avoid use of system, make this safer
337             os.system("rm -rf " + self.cachedir)
338         self.cachedir = None
339
340     @staticmethod
341     def parse_ref(refstr):
342         m = re.match(r"^zero\[(\d+)\]$", refstr)
343         if m:
344             return ("zero", None, None, (0, int(m.group(1)), False))
345
346         m = re.match(r"^([-0-9a-f]+)\/([0-9a-f]+)(\(\S+\))?(\[(((\d+)\+)?(\d+)|=(\d+))\])?$", refstr)
347         if not m: return
348
349         segment = m.group(1)
350         object = m.group(2)
351         checksum = m.group(3)
352         slice = m.group(4)
353
354         if checksum is not None:
355             checksum = checksum.lstrip("(").rstrip(")")
356
357         if slice is not None:
358             if m.group(9) is not None:
359                 # Size-assertion slice
360                 slice = (0, int(m.group(9)), True)
361             elif m.group(6) is None:
362                 # Abbreviated slice
363                 slice = (0, int(m.group(8)), False)
364             else:
365                 slice = (int(m.group(7)), int(m.group(8)), False)
366
367         return (segment, object, checksum, slice)
368
369     def list_snapshots(self):
370         return set(x[0] for x in self.backend.list_generic("snapshots"))
371
372     def list_segments(self):
373         return set(x[0] for x in self.backend.list_generic("segments"))
374
375     def load_snapshot(self, snapshot):
376         snapshot_file = self.backend.open_snapshot(snapshot)[0]
377         return to_lines(snapshot_file.read())
378
379     @staticmethod
380     def filter_data(filehandle, filter_cmd):
381         if filter_cmd is None:
382             return filehandle
383         p = subprocess.Popen(filter_cmd, shell=True, stdin=subprocess.PIPE,
384                              stdout=subprocess.PIPE, close_fds=True)
385         input, output = p.stdin, p.stdout
386         def copy_thread(src, dst):
387             BLOCK_SIZE = 4096
388             while True:
389                 block = src.read(BLOCK_SIZE)
390                 if len(block) == 0: break
391                 dst.write(block)
392             src.close()
393             dst.close()
394             p.wait()
395         _thread.start_new_thread(copy_thread, (filehandle, input))
396         return output
397
398     def get_segment(self, segment):
399         accessed_segments.add(segment)
400
401         (segment_fp, path, filter_cmd) = self.backend.open_segment(segment)
402         return self.filter_data(segment_fp, filter_cmd)
403
404     def load_segment(self, segment):
405         seg = tarfile.open(segment, 'r|', self.get_segment(segment))
406         for item in seg:
407             data_obj = seg.extractfile(item)
408             path = item.name.split('/')
409             if len(path) == 2 and path[0] == segment:
410                 yield (path[1], data_obj.read())
411
412     def extract_segment(self, segment):
413         segdir = os.path.join(self.get_cachedir(), segment)
414         os.mkdir(segdir)
415         for (object, data) in self.load_segment(segment):
416             f = open(os.path.join(segdir, object), 'wb')
417             f.write(data)
418             f.close()
419
420     def load_object(self, segment, object):
421         accessed_segments.add(segment)
422         path = os.path.join(self.get_cachedir(), segment, object)
423         if not os.access(path, os.R_OK):
424             self.extract_segment(segment)
425         if segment in self._lru_list: self._lru_list.remove(segment)
426         self._lru_list.append(segment)
427         while len(self._lru_list) > self.CACHE_SIZE:
428             os.system("rm -rf " + os.path.join(self.cachedir,
429                                                self._lru_list[0]))
430             self._lru_list = self._lru_list[1:]
431         return open(path, 'rb').read()
432
433     def get(self, refstr):
434         """Fetch the given object and return it.
435
436         The input should be an object reference, in string form.
437         """
438
439         (segment, object, checksum, slice) = self.parse_ref(refstr)
440
441         if segment == "zero":
442             return "\0" * slice[1]
443
444         data = self.load_object(segment, object)
445
446         if checksum is not None:
447             verifier = ChecksumVerifier(checksum)
448             verifier.update(data)
449             if not verifier.valid():
450                 raise ValueError
451
452         if slice is not None:
453             (start, length, exact) = slice
454             if exact and len(data) != length: raise ValueError
455             data = data[start:start+length]
456             if len(data) != length: raise IndexError
457
458         return data
459
460     def prefetch(self):
461         self.backend.prefetch_generic()
462
463 def parse(lines, terminate=None):
464     """Generic parser for RFC822-style "Key: Value" data streams.
465
466     This parser can be used to read metadata logs and snapshot root descriptor
467     files.
468
469     lines must be an iterable object which yields a sequence of lines of input.
470
471     If terminate is specified, it is used as a predicate to determine when to
472     stop reading input lines.
473     """
474
475     dict = {}
476     last_key = None
477
478     for l in lines:
479         # Strip off a trailing newline, if present
480         if len(l) > 0 and l[-1] == "\n":
481             l = l[:-1]
482
483         if terminate is not None and terminate(l):
484             if len(dict) > 0: yield dict
485             dict = {}
486             last_key = None
487             continue
488
489         m = re.match(r"^([-\w]+):\s*(.*)$", l)
490         if m:
491             dict[m.group(1)] = m.group(2)
492             last_key = m.group(1)
493         elif len(l) > 0 and l[0].isspace() and last_key is not None:
494             dict[last_key] += l
495         else:
496             last_key = None
497
498     if len(dict) > 0: yield dict
499
500 def parse_full(lines):
501     try:
502         return next(parse(lines))
503     except StopIteration:
504         return {}
505
506 def parse_metadata_version(s):
507     """Convert a string with the snapshot version format to a tuple."""
508
509     m = re.match(r"^(?:Cumulus|LBS) Snapshot v(\d+(\.\d+)*)$", s)
510     if m is None:
511         return ()
512     else:
513         return tuple([int(d) for d in m.group(1).split(".")])
514
515 def read_metadata(object_store, root):
516     """Iterate through all lines in the metadata log, following references."""
517
518     # Stack for keeping track of recursion when following references to
519     # portions of the log.  The last entry in the stack corresponds to the
520     # object currently being parsed.  Each entry is a list of lines which have
521     # been reversed, so that popping successive lines from the end of each list
522     # will return lines of the metadata log in order.
523     stack = []
524
525     def follow_ref(refstr):
526         if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
527         lines = to_lines(object_store.get(refstr))
528         lines.reverse()
529         stack.append(lines)
530
531     follow_ref(root)
532
533     while len(stack) > 0:
534         top = stack[-1]
535         if len(top) == 0:
536             stack.pop()
537             continue
538         line = top.pop()
539
540         # An indirect reference which we must follow?
541         if len(line) > 0 and line[0] == '@':
542             ref = line[1:]
543             ref.strip()
544             follow_ref(ref)
545         else:
546             yield line
547
548 class MetadataItem:
549     """Metadata for a single file (or directory or...) from a snapshot."""
550
551     # Functions for parsing various datatypes that can appear in a metadata log
552     # item.
553     @staticmethod
554     def decode_int(s):
555         """Decode an integer, expressed in decimal, octal, or hexadecimal."""
556         if s.startswith("0x"):
557             return int(s, 16)
558         elif s.startswith("0"):
559             return int(s, 8)
560         else:
561             return int(s, 10)
562
563     @staticmethod
564     def decode_str(s):
565         """Decode a URI-encoded (%xx escapes) string."""
566         return uri_decode(s)
567
568     @staticmethod
569     def raw_str(s):
570         """An unecoded string."""
571         return s
572
573     @staticmethod
574     def decode_user(s):
575         """Decode a user/group to a tuple of uid/gid followed by name."""
576         items = s.split()
577         uid = MetadataItem.decode_int(items[0])
578         name = None
579         if len(items) > 1:
580             if items[1].startswith("(") and items[1].endswith(")"):
581                 name = MetadataItem.decode_str(items[1][1:-1])
582         return (uid, name)
583
584     @staticmethod
585     def decode_device(s):
586         """Decode a device major/minor number."""
587         (major, minor) = map(MetadataItem.decode_int, s.split("/"))
588         return (major, minor)
589
590     class Items: pass
591
592     def __init__(self, fields, object_store):
593         """Initialize from a dictionary of key/value pairs from metadata log."""
594
595         self.fields = fields
596         self.object_store = object_store
597         self.keys = []
598         self.items = self.Items()
599         for (k, v) in fields.items():
600             if k in self.field_types:
601                 decoder = self.field_types[k]
602                 setattr(self.items, k, decoder(v))
603                 self.keys.append(k)
604
605     def data(self):
606         """Return an iterator for the data blocks that make up a file."""
607
608         # This traverses the list of blocks that make up a file, following
609         # indirect references.  It is implemented in much the same way as
610         # read_metadata, so see that function for details of the technique.
611
612         objects = self.fields['data'].split()
613         objects.reverse()
614         stack = [objects]
615
616         def follow_ref(refstr):
617             if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
618             objects = self.object_store.get(refstr).split()
619             objects.reverse()
620             stack.append(objects)
621
622         while len(stack) > 0:
623             top = stack[-1]
624             if len(top) == 0:
625                 stack.pop()
626                 continue
627             ref = top.pop()
628
629             # An indirect reference which we must follow?
630             if len(ref) > 0 and ref[0] == '@':
631                 follow_ref(ref[1:])
632             else:
633                 yield ref
634
635 # Description of fields that might appear, and how they should be parsed.
636 MetadataItem.field_types = {
637     'name': MetadataItem.decode_str,
638     'type': MetadataItem.raw_str,
639     'mode': MetadataItem.decode_int,
640     'device': MetadataItem.decode_device,
641     'user': MetadataItem.decode_user,
642     'group': MetadataItem.decode_user,
643     'ctime': MetadataItem.decode_int,
644     'mtime': MetadataItem.decode_int,
645     'links': MetadataItem.decode_int,
646     'inode': MetadataItem.raw_str,
647     'checksum': MetadataItem.decode_str,
648     'size': MetadataItem.decode_int,
649     'contents': MetadataItem.decode_str,
650     'target': MetadataItem.decode_str,
651 }
652
653 def iterate_metadata(object_store, root):
654     for d in parse(read_metadata(object_store, root), lambda l: len(l) == 0):
655         yield MetadataItem(d, object_store)
656
657 class LocalDatabase:
658     """Access to the local database of snapshot contents and object checksums.
659
660     The local database is consulted when creating a snapshot to determine what
661     data can be re-used from old snapshots.  Segment cleaning is performed by
662     manipulating the data in the local database; the local database also
663     includes enough data to guide the segment cleaning process.
664     """
665
666     def __init__(self, path, dbname="localdb.sqlite"):
667         self.db_connection = sqlite3.connect(path + "/" + dbname)
668
669     # Low-level database access.  Use these methods when there isn't a
670     # higher-level interface available.  Exception: do, however, remember to
671     # use the commit() method after making changes to make sure they are
672     # actually saved, even when going through higher-level interfaces.
673     def commit(self):
674         "Commit any pending changes to the local database."
675         self.db_connection.commit()
676
677     def rollback(self):
678         "Roll back any pending changes to the local database."
679         self.db_connection.rollback()
680
681     def cursor(self):
682         "Return a DB-API cursor for directly accessing the local database."
683         return self.db_connection.cursor()
684
685     def list_schemes(self):
686         """Return the list of snapshots found in the local database.
687
688         The returned value is a list of tuples (id, scheme, name, time, intent).
689         """
690
691         cur = self.cursor()
692         cur.execute("select distinct scheme from snapshots")
693         schemes = [row[0] for row in cur.fetchall()]
694         schemes.sort()
695         return schemes
696
697     def list_snapshots(self, scheme):
698         """Return a list of snapshots for the given scheme."""
699         cur = self.cursor()
700         cur.execute("select name from snapshots")
701         snapshots = [row[0] for row in cur.fetchall()]
702         snapshots.sort()
703         return snapshots
704
705     def delete_snapshot(self, scheme, name):
706         """Remove the specified snapshot from the database.
707
708         Warning: This does not garbage collect all dependent data in the
709         database, so it must be followed by a call to garbage_collect() to make
710         the database consistent.
711         """
712         cur = self.cursor()
713         cur.execute("delete from snapshots where scheme = ? and name = ?",
714                     (scheme, name))
715
716     def prune_old_snapshots(self, scheme, intent=1.0):
717         """Delete entries from old snapshots from the database.
718
719         Only snapshots with the specified scheme name will be deleted.  If
720         intent is given, it gives the intended next snapshot type, to determine
721         how aggressively to clean (for example, intent=7 could be used if the
722         next snapshot will be a weekly snapshot).
723         """
724
725         cur = self.cursor()
726
727         # Find the id of the last snapshot to be created.  This is used for
728         # measuring time in a way: we record this value in each segment we
729         # expire on this run, and then on a future run can tell if there have
730         # been intervening backups made.
731         cur.execute("select max(snapshotid) from snapshots")
732         last_snapshotid = cur.fetchone()[0]
733
734         # Get the list of old snapshots for this scheme.  Delete all the old
735         # ones.  Rules for what to keep:
736         #   - Always keep the most recent snapshot.
737         #   - If snapshot X is younger than Y, and X has higher intent, then Y
738         #     can be deleted.
739         cur.execute("""select snapshotid, name, intent,
740                               julianday('now') - timestamp as age
741                        from snapshots where scheme = ?
742                        order by age""", (scheme,))
743
744         first = True
745         max_intent = intent
746         for (id, name, snap_intent, snap_age) in cur.fetchall():
747             can_delete = False
748             if snap_intent < max_intent:
749                 # Delete small-intent snapshots if there is a more recent
750                 # large-intent snapshot.
751                 can_delete = True
752             elif snap_intent == intent:
753                 # Delete previous snapshots with the specified intent level.
754                 can_delete = True
755
756             if can_delete and not first:
757                 print("Delete snapshot %d (%s)" % (id, name))
758                 cur.execute("delete from snapshots where snapshotid = ?",
759                             (id,))
760             first = False
761             max_intent = max(max_intent, snap_intent)
762
763         self.garbage_collect()
764
765     def garbage_collect(self):
766         """Garbage-collect unreachable segment and object data.
767
768         Remove all segments and checksums which is not reachable from the
769         current set of snapshots stored in the local database.
770         """
771         cur = self.cursor()
772
773         # Delete entries in the segment_utilization table which are for
774         # non-existent snapshots.
775         cur.execute("""delete from segment_utilization
776                        where snapshotid not in
777                            (select snapshotid from snapshots)""")
778
779         # Delete segments not referenced by any current snapshots.
780         cur.execute("""delete from segments where segmentid not in
781                            (select segmentid from segment_utilization)""")
782
783         # Delete dangling objects in the block_index table.
784         cur.execute("""delete from block_index
785                        where segmentid not in
786                            (select segmentid from segments)""")
787
788         # Remove sub-block signatures for deleted objects.
789         cur.execute("""delete from subblock_signatures
790                        where blockid not in
791                            (select blockid from block_index)""")
792
793     # Segment cleaning.
794     class SegmentInfo(Struct): pass
795
796     def get_segment_cleaning_list(self, age_boost=0.0):
797         """Return a list of all current segments with information for cleaning.
798
799         Return all segments which are currently known in the local database
800         (there might be other, older segments in the archive itself), and
801         return usage statistics for each to help decide which segments to
802         clean.
803
804         The returned list will be sorted by estimated cleaning benefit, with
805         segments that are best to clean at the start of the list.
806
807         If specified, the age_boost parameter (measured in days) will added to
808         the age of each segment, as a way of adjusting the benefit computation
809         before a long-lived snapshot is taken (for example, age_boost might be
810         set to 7 when cleaning prior to taking a weekly snapshot).
811         """
812
813         cur = self.cursor()
814         segments = []
815         cur.execute("""select segmentid, used, size, mtime,
816                        julianday('now') - mtime as age from segment_info
817                        where expire_time is null""")
818         for row in cur:
819             info = self.SegmentInfo()
820             info.id = row[0]
821             info.used_bytes = row[1]
822             info.size_bytes = row[2]
823             info.mtime = row[3]
824             info.age_days = row[4]
825
826             # If data is not available for whatever reason, treat it as 0.0.
827             if info.age_days is None:
828                 info.age_days = 0.0
829             if info.used_bytes is None:
830                 info.used_bytes = 0.0
831
832             # Benefit calculation: u is the estimated fraction of each segment
833             # which is utilized (bytes belonging to objects still in use
834             # divided by total size; this doesn't take compression or storage
835             # overhead into account, but should give a reasonable estimate).
836             #
837             # The total benefit is a heuristic that combines several factors:
838             # the amount of space that can be reclaimed (1 - u), an ageing
839             # factor (info.age_days) that favors cleaning old segments to young
840             # ones and also is more likely to clean segments that will be
841             # rewritten for long-lived snapshots (age_boost), and finally a
842             # penalty factor for the cost of re-uploading data (u + 0.1).
843             u = info.used_bytes / info.size_bytes
844             info.cleaning_benefit \
845                 = (1 - u) * (info.age_days + age_boost) / (u + 0.1)
846
847             segments.append(info)
848
849         segments.sort(cmp, key=lambda s: s.cleaning_benefit, reverse=True)
850         return segments
851
852     def mark_segment_expired(self, segment):
853         """Mark a segment for cleaning in the local database.
854
855         The segment parameter should be either a SegmentInfo object or an
856         integer segment id.  Objects in the given segment will be marked as
857         expired, which means that any future snapshots that would re-use those
858         objects will instead write out a new copy of the object, and thus no
859         future snapshots will depend upon the given segment.
860         """
861
862         if isinstance(segment, int):
863             id = segment
864         elif isinstance(segment, self.SegmentInfo):
865             id = segment.id
866         else:
867             raise TypeError("Invalid segment: %s, must be of type int or SegmentInfo, not %s" % (segment, type(segment)))
868
869         cur = self.cursor()
870         cur.execute("select max(snapshotid) from snapshots")
871         last_snapshotid = cur.fetchone()[0]
872         cur.execute("update segments set expire_time = ? where segmentid = ?",
873                     (last_snapshotid, id))
874         cur.execute("update block_index set expired = 0 where segmentid = ?",
875                     (id,))
876
877     def balance_expired_objects(self):
878         """Analyze expired objects in segments to be cleaned and group by age.
879
880         Update the block_index table of the local database to group expired
881         objects by age.  The exact number of buckets and the cutoffs for each
882         are dynamically determined.  Calling this function after marking
883         segments expired will help in the segment cleaning process, by ensuring
884         that when active objects from clean segments are rewritten, they will
885         be placed into new segments roughly grouped by age.
886         """
887
888         # The expired column of the block_index table is used when generating a
889         # new Cumulus snapshot.  A null value indicates that an object may be
890         # re-used.  Otherwise, an object must be written into a new segment if
891         # needed.  Objects with distinct expired values will be written into
892         # distinct segments, to allow for some grouping by age.  The value 0 is
893         # somewhat special in that it indicates any rewritten objects can be
894         # placed in the same segment as completely new objects; this can be
895         # used for very young objects which have been expired, or objects not
896         # expected to be encountered.
897         #
898         # In the balancing process, all objects which are not used in any
899         # current snapshots will have expired set to 0.  Objects which have
900         # been seen will be sorted by age and will have expired values set to
901         # 0, 1, 2, and so on based on age (with younger objects being assigned
902         # lower values).  The number of buckets and the age cutoffs is
903         # determined by looking at the distribution of block ages.
904
905         cur = self.cursor()
906
907         # Mark all expired objects with expired = 0; these objects will later
908         # have values set to indicate groupings of objects when repacking.
909         cur.execute("""update block_index set expired = 0
910                        where expired is not null""")
911
912         # We will want to aim for at least one full segment for each bucket
913         # that we eventually create, but don't know how many bytes that should
914         # be due to compression.  So compute the average number of bytes in
915         # each expired segment as a rough estimate for the minimum size of each
916         # bucket.  (This estimate could be thrown off by many not-fully-packed
917         # segments, but for now don't worry too much about that.)  If we can't
918         # compute an average, it's probably because there are no expired
919         # segments, so we have no more work to do.
920         cur.execute("""select avg(size) from segments
921                        where segmentid in
922                            (select distinct segmentid from block_index
923                             where expired is not null)""")
924         segment_size_estimate = cur.fetchone()[0]
925         if not segment_size_estimate:
926             return
927
928         # Next, extract distribution of expired objects (number and size) by
929         # age.  Save the timestamp for "now" so that the classification of
930         # blocks into age buckets will not change later in the function, after
931         # time has passed.  Set any timestamps in the future to now, so we are
932         # guaranteed that for the rest of this function, age is always
933         # non-negative.
934         cur.execute("select julianday('now')")
935         now = cur.fetchone()[0]
936
937         cur.execute("""update block_index set timestamp = ?
938                        where timestamp > ? and expired is not null""",
939                     (now, now))
940
941         cur.execute("""select round(? - timestamp) as age, count(*), sum(size)
942                        from block_index where expired = 0
943                        group by age order by age""", (now,))
944         distribution = cur.fetchall()
945
946         # Start to determine the buckets for expired objects.  Heuristics used:
947         #   - An upper bound on the number of buckets is given by the number of
948         #     segments we estimate it will take to store all data.  In fact,
949         #     aim for a couple of segments per bucket.
950         #   - Place very young objects in bucket 0 (place with new objects)
951         #     unless there are enough of them to warrant a separate bucket.
952         #   - Try not to create unnecessarily many buckets, since fewer buckets
953         #     will allow repacked data to be grouped based on spatial locality
954         #     (while more buckets will group by temporal locality).  We want a
955         #     balance.
956         MIN_AGE = 4
957         total_bytes = sum([i[2] for i in distribution])
958         target_buckets = 2 * (total_bytes / segment_size_estimate) ** 0.4
959         min_size = 1.5 * segment_size_estimate
960         target_size = max(2 * segment_size_estimate,
961                           total_bytes / target_buckets)
962
963         print("segment_size:", segment_size_estimate)
964         print("distribution:", distribution)
965         print("total_bytes:", total_bytes)
966         print("target_buckets:", target_buckets)
967         print("min, target size:", min_size, target_size)
968
969         # Chosen cutoffs.  Each bucket consists of objects with age greater
970         # than one cutoff value, but not greater than the next largest cutoff.
971         cutoffs = []
972
973         # Starting with the oldest objects, begin grouping together into
974         # buckets of size at least target_size bytes.
975         distribution.reverse()
976         bucket_size = 0
977         min_age_bucket = False
978         for (age, items, size) in distribution:
979             if bucket_size >= target_size \
980                 or (age < MIN_AGE and not min_age_bucket):
981                 if bucket_size < target_size and len(cutoffs) > 0:
982                     cutoffs.pop()
983                 cutoffs.append(age)
984                 bucket_size = 0
985
986             bucket_size += size
987             if age < MIN_AGE:
988                 min_age_bucket = True
989
990         # The last (youngest) bucket will be group 0, unless it has enough data
991         # to be of size min_size by itself, or there happen to be no objects
992         # less than MIN_AGE at all.
993         if bucket_size >= min_size or not min_age_bucket:
994             cutoffs.append(-1)
995         cutoffs.append(-1)
996
997         print("cutoffs:", cutoffs)
998
999         # Update the database to assign each object to the appropriate bucket.
1000         cutoffs.reverse()
1001         for i in range(len(cutoffs)):
1002             cur.execute("""update block_index set expired = ?
1003                            where round(? - timestamp) > ?
1004                              and expired is not null""",
1005                         (i, now, cutoffs[i]))