Manual 2to3 fixups.
[cumulus.git] / python / cumulus / __init__.py
1 # Cumulus: Efficient Filesystem Backup to the Cloud
2 # Copyright (C) 2008-2009, 2012 The Cumulus Developers
3 # See the AUTHORS file for a list of contributors.
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License along
16 # with this program; if not, write to the Free Software Foundation, Inc.,
17 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18
19 """High-level interface for working with Cumulus archives.
20
21 This module provides an easy interface for reading from and manipulating
22 various parts of a Cumulus archive:
23   - listing the snapshots and segments present
24   - reading segment contents
25   - parsing snapshot descriptors and snapshot metadata logs
26   - reading and maintaining the local object database
27 """
28
29 from __future__ import division, print_function, unicode_literals
30
31 import hashlib
32 import itertools
33 import os
34 import re
35 import sqlite3
36 import sys
37 import tarfile
38 import tempfile
39 try:
40     import _thread
41 except ImportError:
42     import thread as _thread
43
44 import cumulus.store
45 import cumulus.store.file
46
47 if sys.version < '3':
48     StringTypes = (str, unicode)
49 else:
50     StringTypes = (str,)
51
52 # The largest supported snapshot format that can be understood.
53 FORMAT_VERSION = (0, 11)        # Cumulus Snapshot v0.11
54
55 # Maximum number of nested indirect references allowed in a snapshot.
56 MAX_RECURSION_DEPTH = 3
57
58 # All segments which have been accessed this session.
59 accessed_segments = set()
60
61 # Table of methods used to filter segments before storage, and corresponding
62 # filename extensions.  These are listed in priority order (methods earlier in
63 # the list are tried first).
64 SEGMENT_FILTERS = [
65     (".gpg", "cumulus-filter-gpg --decrypt"),
66     (".gz", "gzip -dc"),
67     (".bz2", "bzip2 -dc"),
68     ("", None),
69 ]
70
71 def uri_decode(s):
72     """Decode a URI-encoded (%xx escapes) string."""
73     def hex_decode(m): return chr(int(m.group(1), 16))
74     return re.sub(r"%([0-9a-f]{2})", hex_decode, s)
75 def uri_encode(s):
76     """Encode a string to URI-encoded (%xx escapes) form."""
77     def hex_encode(c):
78         if c > '+' and c < '\x7f' and c != '@':
79             return c
80         else:
81             return "%%%02x" % (ord(c),)
82     return ''.join(hex_encode(c) for c in s)
83
84 class Struct:
85     """A class which merely acts as a data container.
86
87     Instances of this class (or its subclasses) are merely used to store data
88     in various attributes.  No methods are provided.
89     """
90
91     def __repr__(self):
92         return "<%s %s>" % (self.__class__, self.__dict__)
93
94 CHECKSUM_ALGORITHMS = {
95     'sha1': hashlib.sha1,
96     'sha224': hashlib.sha224,
97     'sha256': hashlib.sha256,
98 }
99
100 class ChecksumCreator:
101     """Compute a Cumulus checksum for provided data.
102
103     The algorithm used is selectable, but currently defaults to sha1.
104     """
105
106     def __init__(self, algorithm='sha1'):
107         self.algorithm = algorithm
108         self.hash = CHECKSUM_ALGORITHMS[algorithm]()
109
110     def update(self, data):
111         self.hash.update(data)
112         return self
113
114     def compute(self):
115         return "%s=%s" % (self.algorithm, self.hash.hexdigest())
116
117 class ChecksumVerifier:
118     """Verify whether a checksum from a snapshot matches the supplied data."""
119
120     def __init__(self, checksumstr):
121         """Create an object to check the supplied checksum."""
122
123         (algo, checksum) = checksumstr.split("=", 1)
124         self.checksum = checksum
125         self.hash = CHECKSUM_ALGORITHMS[algo]()
126
127     def update(self, data):
128         self.hash.update(data)
129
130     def valid(self):
131         """Return a boolean indicating whether the checksum matches."""
132
133         result = self.hash.hexdigest()
134         return result == self.checksum
135
136 class SearchPathEntry(object):
137     """Item representing a possible search location for Cumulus files.
138
139     Some Cumulus files might be stored in multiple possible file locations: due
140     to format (different compression mechanisms with different extensions),
141     locality (different segments might be placed in different directories to
142     control archiving policies), for backwards compatibility (default location
143     changed over time).  A SearchPathEntry describes a possible location for a
144     file.
145     """
146     def __init__(self, directory_prefix, suffix, context=None):
147         self._directory_prefix = directory_prefix
148         self._suffix = suffix
149         self._context = context
150
151     def __repr__(self):
152         return "%s(%r, %r, %r)" % (self.__class__.__name__,
153                                    self._directory_prefix, self._suffix,
154                                    self._context)
155
156     def build_path(self, basename):
157         """Construct the search path to use for a file with name basename.
158
159         Returns a tuple (pathname, context), where pathname is the path to try
160         and context is any additional data associated with this search entry
161         (if any).
162         """
163         return (os.path.join(self._directory_prefix, basename + self._suffix),
164                 self._context)
165
166 class SearchPath(object):
167     """A collection of locations to search for files and lookup utilities.
168
169     For looking for a file in a Cumulus storage backend, a SearchPath object
170     contains a list of possible locations to try.  A SearchPath can be used to
171     perform the search as well; when a file is found the search path ordering
172     is updated (moving the successful SearchPathEntry to the front of the list
173     for future searches).
174     """
175     def __init__(self, name_regex, searchpath):
176         self._regex = re.compile(name_regex)
177         self._path = list(searchpath)
178
179     def add_search_entry(self, entry):
180         self._path.append(entry)
181
182     def directories(self):
183         """Return the set of directories to search for a file type."""
184         return set(entry._directory_prefix for entry in self._path)
185
186     def get(self, backend, basename):
187         for (i, entry) in enumerate(self._path):
188             try:
189                 (pathname, context) = entry.build_path(basename)
190                 fp = backend.get(pathname)
191                 # On success, move this entry to the front of the search path
192                 # to speed future searches.
193                 if i > 0:
194                     self._path.pop(i)
195                     self._path.insert(0, entry)
196                 return (fp, pathname, context)
197             except cumulus.store.NotFoundError:
198                 continue
199         raise cumulus.store.NotFoundError(basename)
200
201     def stat(self, backend, basename):
202         for (i, entry) in enumerate(self._path):
203             try:
204                 (pathname, context) = entry.build_path(basename)
205                 stat_data = backend.stat(pathname)
206                 # On success, move this entry to the front of the search path
207                 # to speed future searches.
208                 if i > 0:
209                     self._path.pop(i)
210                     self._path.insert(0, entry)
211                 result = {"path": pathname}
212                 result.update(stat_data)
213                 return result
214             except cumulus.store.NotFoundError:
215                 continue
216         raise cumulus.store.NotFoundError(basename)
217
218     def match(self, filename):
219         return self._regex.match(filename)
220
221     def list(self, backend):
222         success = False
223         for d in self.directories():
224             try:
225                 for f in backend.list(d):
226                     success = True
227                     m = self.match(f)
228                     if m: yield (os.path.join(d, f), m)
229             except cumulus.store.NotFoundError:
230                 pass
231         if not success:
232             raise cumulus.store.NotFoundError(backend)
233
234 def _build_segments_searchpath(prefix):
235     for (extension, filter) in SEGMENT_FILTERS:
236         yield SearchPathEntry(prefix, extension, filter)
237
238 SEARCH_PATHS = {
239     "checksums": SearchPath(
240         r"^snapshot-(.*)\.(\w+)sums$",
241         [SearchPathEntry("meta", ".sha1sums"),
242          SearchPathEntry("checksums", ".sha1sums"),
243          SearchPathEntry("", ".sha1sums")]),
244     "meta": SearchPath(
245         r"^snapshot-(.*)\.meta(\.\S+)?$",
246         _build_segments_searchpath("meta")),
247     "segments": SearchPath(
248         (r"^([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})"
249          r"\.tar(\.\S+)?$"),
250         itertools.chain(
251             _build_segments_searchpath("segments0"),
252             _build_segments_searchpath("segments1"),
253             _build_segments_searchpath(""),
254             _build_segments_searchpath("segments"))),
255     "snapshots": SearchPath(
256         r"^snapshot-(.*)\.(cumulus|lbs)$",
257         [SearchPathEntry("snapshots", ".cumulus"),
258          SearchPathEntry("snapshots", ".lbs"),
259          SearchPathEntry("", ".cumulus"),
260          SearchPathEntry("", ".lbs")]),
261 }
262
263 class BackendWrapper(object):
264     """Wrapper around a Cumulus storage backend that understands file types.
265
266     The BackendWrapper class understands different Cumulus file types, such as
267     snapshots and segments, and implements higher-level operations such as
268     "retrieve a snapshot with a specific name" (hiding operations such as
269     searching for the correct file name).
270     """
271
272     def __init__(self, backend):
273         """Initializes a wrapper around the specified storage backend.
274
275         store may either be a Store object or URL.
276         """
277         if type(backend) in StringTypes:
278             if backend.find(":") >= 0:
279                 self._backend = cumulus.store.open(backend)
280             else:
281                 self._backend = cumulus.store.file.FileStore(backend)
282         else:
283             self._backend = backend
284
285     @property
286     def raw_backend(self):
287         return self._backend
288
289     def stat_generic(self, basename, filetype):
290         return SEARCH_PATHS[filetype].stat(self._backend, basename)
291
292     def open_generic(self, basename, filetype):
293         return SEARCH_PATHS[filetype].get(self._backend, basename)
294
295     def open_snapshot(self, name):
296         return self.open_generic("snapshot-" + name, "snapshots")
297
298     def open_segment(self, name):
299         return self.open_generic(name + ".tar", "segments")
300
301     def list_generic(self, filetype):
302         return ((x[1].group(1), x[0])
303                 for x in SEARCH_PATHS[filetype].list(self._backend))
304
305     def prefetch_generic(self):
306         """Calls scan on directories to prefetch file metadata."""
307         directories = set()
308         for typeinfo in SEARCH_PATHS.values():
309             directories.update(typeinfo.directories())
310         for d in directories:
311             print("Prefetch", d)
312             self._backend.scan(d)
313
314 class CumulusStore:
315     def __init__(self, backend):
316         if isinstance(backend, BackendWrapper):
317             self.backend = backend
318         else:
319             self.backend = BackendWrapper(backend)
320         self.cachedir = None
321         self.CACHE_SIZE = 16
322         self._lru_list = []
323
324     def get_cachedir(self):
325         if self.cachedir is None:
326             self.cachedir = tempfile.mkdtemp("-cumulus")
327         return self.cachedir
328
329     def cleanup(self):
330         if self.cachedir is not None:
331             # TODO: Avoid use of system, make this safer
332             os.system("rm -rf " + self.cachedir)
333         self.cachedir = None
334
335     @staticmethod
336     def parse_ref(refstr):
337         m = re.match(r"^zero\[(\d+)\]$", refstr)
338         if m:
339             return ("zero", None, None, (0, int(m.group(1)), False))
340
341         m = re.match(r"^([-0-9a-f]+)\/([0-9a-f]+)(\(\S+\))?(\[(((\d+)\+)?(\d+)|=(\d+))\])?$", refstr)
342         if not m: return
343
344         segment = m.group(1)
345         object = m.group(2)
346         checksum = m.group(3)
347         slice = m.group(4)
348
349         if checksum is not None:
350             checksum = checksum.lstrip("(").rstrip(")")
351
352         if slice is not None:
353             if m.group(9) is not None:
354                 # Size-assertion slice
355                 slice = (0, int(m.group(9)), True)
356             elif m.group(6) is None:
357                 # Abbreviated slice
358                 slice = (0, int(m.group(8)), False)
359             else:
360                 slice = (int(m.group(7)), int(m.group(8)), False)
361
362         return (segment, object, checksum, slice)
363
364     def list_snapshots(self):
365         return set(x[0] for x in self.backend.list_generic("snapshots"))
366
367     def list_segments(self):
368         return set(x[0] for x in self.backend.list_generic("segments"))
369
370     def load_snapshot(self, snapshot):
371         snapshot_file = self.backend.open_snapshot(snapshot)[0]
372         return snapshot_file.read().splitlines(True)
373
374     @staticmethod
375     def filter_data(filehandle, filter_cmd):
376         if filter_cmd is None:
377             return filehandle
378         (input, output) = os.popen2(filter_cmd)
379         def copy_thread(src, dst):
380             BLOCK_SIZE = 4096
381             while True:
382                 block = src.read(BLOCK_SIZE)
383                 if len(block) == 0: break
384                 dst.write(block)
385             src.close()
386             dst.close()
387         _thread.start_new_thread(copy_thread, (filehandle, input))
388         return output
389
390     def get_segment(self, segment):
391         accessed_segments.add(segment)
392
393         (segment_fp, path, filter_cmd) = self.backend.open_segment(segment)
394         return self.filter_data(segment_fp, filter_cmd)
395
396     def load_segment(self, segment):
397         seg = tarfile.open(segment, 'r|', self.get_segment(segment))
398         for item in seg:
399             data_obj = seg.extractfile(item)
400             path = item.name.split('/')
401             if len(path) == 2 and path[0] == segment:
402                 yield (path[1], data_obj.read())
403
404     def extract_segment(self, segment):
405         segdir = os.path.join(self.get_cachedir(), segment)
406         os.mkdir(segdir)
407         for (object, data) in self.load_segment(segment):
408             f = open(os.path.join(segdir, object), 'wb')
409             f.write(data)
410             f.close()
411
412     def load_object(self, segment, object):
413         accessed_segments.add(segment)
414         path = os.path.join(self.get_cachedir(), segment, object)
415         if not os.access(path, os.R_OK):
416             self.extract_segment(segment)
417         if segment in self._lru_list: self._lru_list.remove(segment)
418         self._lru_list.append(segment)
419         while len(self._lru_list) > self.CACHE_SIZE:
420             os.system("rm -rf " + os.path.join(self.cachedir,
421                                                self._lru_list[0]))
422             self._lru_list = self._lru_list[1:]
423         return open(path, 'rb').read()
424
425     def get(self, refstr):
426         """Fetch the given object and return it.
427
428         The input should be an object reference, in string form.
429         """
430
431         (segment, object, checksum, slice) = self.parse_ref(refstr)
432
433         if segment == "zero":
434             return "\0" * slice[1]
435
436         data = self.load_object(segment, object)
437
438         if checksum is not None:
439             verifier = ChecksumVerifier(checksum)
440             verifier.update(data)
441             if not verifier.valid():
442                 raise ValueError
443
444         if slice is not None:
445             (start, length, exact) = slice
446             if exact and len(data) != length: raise ValueError
447             data = data[start:start+length]
448             if len(data) != length: raise IndexError
449
450         return data
451
452     def prefetch(self):
453         self.backend.prefetch_generic()
454
455 def parse(lines, terminate=None):
456     """Generic parser for RFC822-style "Key: Value" data streams.
457
458     This parser can be used to read metadata logs and snapshot root descriptor
459     files.
460
461     lines must be an iterable object which yields a sequence of lines of input.
462
463     If terminate is specified, it is used as a predicate to determine when to
464     stop reading input lines.
465     """
466
467     dict = {}
468     last_key = None
469
470     for l in lines:
471         # Strip off a trailing newline, if present
472         if len(l) > 0 and l[-1] == "\n":
473             l = l[:-1]
474
475         if terminate is not None and terminate(l):
476             if len(dict) > 0: yield dict
477             dict = {}
478             last_key = None
479             continue
480
481         m = re.match(r"^([-\w]+):\s*(.*)$", l)
482         if m:
483             dict[m.group(1)] = m.group(2)
484             last_key = m.group(1)
485         elif len(l) > 0 and l[0].isspace() and last_key is not None:
486             dict[last_key] += l
487         else:
488             last_key = None
489
490     if len(dict) > 0: yield dict
491
492 def parse_full(lines):
493     try:
494         return next(parse(lines))
495     except StopIteration:
496         return {}
497
498 def parse_metadata_version(s):
499     """Convert a string with the snapshot version format to a tuple."""
500
501     m = re.match(r"^(?:Cumulus|LBS) Snapshot v(\d+(\.\d+)*)$", s)
502     if m is None:
503         return ()
504     else:
505         return tuple([int(d) for d in m.group(1).split(".")])
506
507 def read_metadata(object_store, root):
508     """Iterate through all lines in the metadata log, following references."""
509
510     # Stack for keeping track of recursion when following references to
511     # portions of the log.  The last entry in the stack corresponds to the
512     # object currently being parsed.  Each entry is a list of lines which have
513     # been reversed, so that popping successive lines from the end of each list
514     # will return lines of the metadata log in order.
515     stack = []
516
517     def follow_ref(refstr):
518         if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
519         lines = object_store.get(refstr).splitlines(True)
520         lines.reverse()
521         stack.append(lines)
522
523     follow_ref(root)
524
525     while len(stack) > 0:
526         top = stack[-1]
527         if len(top) == 0:
528             stack.pop()
529             continue
530         line = top.pop()
531
532         # An indirect reference which we must follow?
533         if len(line) > 0 and line[0] == '@':
534             ref = line[1:]
535             ref.strip()
536             follow_ref(ref)
537         else:
538             yield line
539
540 class MetadataItem:
541     """Metadata for a single file (or directory or...) from a snapshot."""
542
543     # Functions for parsing various datatypes that can appear in a metadata log
544     # item.
545     @staticmethod
546     def decode_int(s):
547         """Decode an integer, expressed in decimal, octal, or hexadecimal."""
548         if s.startswith("0x"):
549             return int(s, 16)
550         elif s.startswith("0"):
551             return int(s, 8)
552         else:
553             return int(s, 10)
554
555     @staticmethod
556     def decode_str(s):
557         """Decode a URI-encoded (%xx escapes) string."""
558         return uri_decode(s)
559
560     @staticmethod
561     def raw_str(s):
562         """An unecoded string."""
563         return s
564
565     @staticmethod
566     def decode_user(s):
567         """Decode a user/group to a tuple of uid/gid followed by name."""
568         items = s.split()
569         uid = MetadataItem.decode_int(items[0])
570         name = None
571         if len(items) > 1:
572             if items[1].startswith("(") and items[1].endswith(")"):
573                 name = MetadataItem.decode_str(items[1][1:-1])
574         return (uid, name)
575
576     @staticmethod
577     def decode_device(s):
578         """Decode a device major/minor number."""
579         (major, minor) = map(MetadataItem.decode_int, s.split("/"))
580         return (major, minor)
581
582     class Items: pass
583
584     def __init__(self, fields, object_store):
585         """Initialize from a dictionary of key/value pairs from metadata log."""
586
587         self.fields = fields
588         self.object_store = object_store
589         self.keys = []
590         self.items = self.Items()
591         for (k, v) in fields.items():
592             if k in self.field_types:
593                 decoder = self.field_types[k]
594                 setattr(self.items, k, decoder(v))
595                 self.keys.append(k)
596
597     def data(self):
598         """Return an iterator for the data blocks that make up a file."""
599
600         # This traverses the list of blocks that make up a file, following
601         # indirect references.  It is implemented in much the same way as
602         # read_metadata, so see that function for details of the technique.
603
604         objects = self.fields['data'].split()
605         objects.reverse()
606         stack = [objects]
607
608         def follow_ref(refstr):
609             if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
610             objects = self.object_store.get(refstr).split()
611             objects.reverse()
612             stack.append(objects)
613
614         while len(stack) > 0:
615             top = stack[-1]
616             if len(top) == 0:
617                 stack.pop()
618                 continue
619             ref = top.pop()
620
621             # An indirect reference which we must follow?
622             if len(ref) > 0 and ref[0] == '@':
623                 follow_ref(ref[1:])
624             else:
625                 yield ref
626
627 # Description of fields that might appear, and how they should be parsed.
628 MetadataItem.field_types = {
629     'name': MetadataItem.decode_str,
630     'type': MetadataItem.raw_str,
631     'mode': MetadataItem.decode_int,
632     'device': MetadataItem.decode_device,
633     'user': MetadataItem.decode_user,
634     'group': MetadataItem.decode_user,
635     'ctime': MetadataItem.decode_int,
636     'mtime': MetadataItem.decode_int,
637     'links': MetadataItem.decode_int,
638     'inode': MetadataItem.raw_str,
639     'checksum': MetadataItem.decode_str,
640     'size': MetadataItem.decode_int,
641     'contents': MetadataItem.decode_str,
642     'target': MetadataItem.decode_str,
643 }
644
645 def iterate_metadata(object_store, root):
646     for d in parse(read_metadata(object_store, root), lambda l: len(l) == 0):
647         yield MetadataItem(d, object_store)
648
649 class LocalDatabase:
650     """Access to the local database of snapshot contents and object checksums.
651
652     The local database is consulted when creating a snapshot to determine what
653     data can be re-used from old snapshots.  Segment cleaning is performed by
654     manipulating the data in the local database; the local database also
655     includes enough data to guide the segment cleaning process.
656     """
657
658     def __init__(self, path, dbname="localdb.sqlite"):
659         self.db_connection = sqlite3.connect(path + "/" + dbname)
660
661     # Low-level database access.  Use these methods when there isn't a
662     # higher-level interface available.  Exception: do, however, remember to
663     # use the commit() method after making changes to make sure they are
664     # actually saved, even when going through higher-level interfaces.
665     def commit(self):
666         "Commit any pending changes to the local database."
667         self.db_connection.commit()
668
669     def rollback(self):
670         "Roll back any pending changes to the local database."
671         self.db_connection.rollback()
672
673     def cursor(self):
674         "Return a DB-API cursor for directly accessing the local database."
675         return self.db_connection.cursor()
676
677     def list_schemes(self):
678         """Return the list of snapshots found in the local database.
679
680         The returned value is a list of tuples (id, scheme, name, time, intent).
681         """
682
683         cur = self.cursor()
684         cur.execute("select distinct scheme from snapshots")
685         schemes = [row[0] for row in cur.fetchall()]
686         schemes.sort()
687         return schemes
688
689     def list_snapshots(self, scheme):
690         """Return a list of snapshots for the given scheme."""
691         cur = self.cursor()
692         cur.execute("select name from snapshots")
693         snapshots = [row[0] for row in cur.fetchall()]
694         snapshots.sort()
695         return snapshots
696
697     def delete_snapshot(self, scheme, name):
698         """Remove the specified snapshot from the database.
699
700         Warning: This does not garbage collect all dependent data in the
701         database, so it must be followed by a call to garbage_collect() to make
702         the database consistent.
703         """
704         cur = self.cursor()
705         cur.execute("delete from snapshots where scheme = ? and name = ?",
706                     (scheme, name))
707
708     def prune_old_snapshots(self, scheme, intent=1.0):
709         """Delete entries from old snapshots from the database.
710
711         Only snapshots with the specified scheme name will be deleted.  If
712         intent is given, it gives the intended next snapshot type, to determine
713         how aggressively to clean (for example, intent=7 could be used if the
714         next snapshot will be a weekly snapshot).
715         """
716
717         cur = self.cursor()
718
719         # Find the id of the last snapshot to be created.  This is used for
720         # measuring time in a way: we record this value in each segment we
721         # expire on this run, and then on a future run can tell if there have
722         # been intervening backups made.
723         cur.execute("select max(snapshotid) from snapshots")
724         last_snapshotid = cur.fetchone()[0]
725
726         # Get the list of old snapshots for this scheme.  Delete all the old
727         # ones.  Rules for what to keep:
728         #   - Always keep the most recent snapshot.
729         #   - If snapshot X is younger than Y, and X has higher intent, then Y
730         #     can be deleted.
731         cur.execute("""select snapshotid, name, intent,
732                               julianday('now') - timestamp as age
733                        from snapshots where scheme = ?
734                        order by age""", (scheme,))
735
736         first = True
737         max_intent = intent
738         for (id, name, snap_intent, snap_age) in cur.fetchall():
739             can_delete = False
740             if snap_intent < max_intent:
741                 # Delete small-intent snapshots if there is a more recent
742                 # large-intent snapshot.
743                 can_delete = True
744             elif snap_intent == intent:
745                 # Delete previous snapshots with the specified intent level.
746                 can_delete = True
747
748             if can_delete and not first:
749                 print("Delete snapshot %d (%s)" % (id, name))
750                 cur.execute("delete from snapshots where snapshotid = ?",
751                             (id,))
752             first = False
753             max_intent = max(max_intent, snap_intent)
754
755         self.garbage_collect()
756
757     def garbage_collect(self):
758         """Garbage-collect unreachable segment and object data.
759
760         Remove all segments and checksums which is not reachable from the
761         current set of snapshots stored in the local database.
762         """
763         cur = self.cursor()
764
765         # Delete entries in the segment_utilization table which are for
766         # non-existent snapshots.
767         cur.execute("""delete from segment_utilization
768                        where snapshotid not in
769                            (select snapshotid from snapshots)""")
770
771         # Delete segments not referenced by any current snapshots.
772         cur.execute("""delete from segments where segmentid not in
773                            (select segmentid from segment_utilization)""")
774
775         # Delete dangling objects in the block_index table.
776         cur.execute("""delete from block_index
777                        where segmentid not in
778                            (select segmentid from segments)""")
779
780         # Remove sub-block signatures for deleted objects.
781         cur.execute("""delete from subblock_signatures
782                        where blockid not in
783                            (select blockid from block_index)""")
784
785     # Segment cleaning.
786     class SegmentInfo(Struct): pass
787
788     def get_segment_cleaning_list(self, age_boost=0.0):
789         """Return a list of all current segments with information for cleaning.
790
791         Return all segments which are currently known in the local database
792         (there might be other, older segments in the archive itself), and
793         return usage statistics for each to help decide which segments to
794         clean.
795
796         The returned list will be sorted by estimated cleaning benefit, with
797         segments that are best to clean at the start of the list.
798
799         If specified, the age_boost parameter (measured in days) will added to
800         the age of each segment, as a way of adjusting the benefit computation
801         before a long-lived snapshot is taken (for example, age_boost might be
802         set to 7 when cleaning prior to taking a weekly snapshot).
803         """
804
805         cur = self.cursor()
806         segments = []
807         cur.execute("""select segmentid, used, size, mtime,
808                        julianday('now') - mtime as age from segment_info
809                        where expire_time is null""")
810         for row in cur:
811             info = self.SegmentInfo()
812             info.id = row[0]
813             info.used_bytes = row[1]
814             info.size_bytes = row[2]
815             info.mtime = row[3]
816             info.age_days = row[4]
817
818             # If data is not available for whatever reason, treat it as 0.0.
819             if info.age_days is None:
820                 info.age_days = 0.0
821             if info.used_bytes is None:
822                 info.used_bytes = 0.0
823
824             # Benefit calculation: u is the estimated fraction of each segment
825             # which is utilized (bytes belonging to objects still in use
826             # divided by total size; this doesn't take compression or storage
827             # overhead into account, but should give a reasonable estimate).
828             #
829             # The total benefit is a heuristic that combines several factors:
830             # the amount of space that can be reclaimed (1 - u), an ageing
831             # factor (info.age_days) that favors cleaning old segments to young
832             # ones and also is more likely to clean segments that will be
833             # rewritten for long-lived snapshots (age_boost), and finally a
834             # penalty factor for the cost of re-uploading data (u + 0.1).
835             u = info.used_bytes / info.size_bytes
836             info.cleaning_benefit \
837                 = (1 - u) * (info.age_days + age_boost) / (u + 0.1)
838
839             segments.append(info)
840
841         segments.sort(cmp, key=lambda s: s.cleaning_benefit, reverse=True)
842         return segments
843
844     def mark_segment_expired(self, segment):
845         """Mark a segment for cleaning in the local database.
846
847         The segment parameter should be either a SegmentInfo object or an
848         integer segment id.  Objects in the given segment will be marked as
849         expired, which means that any future snapshots that would re-use those
850         objects will instead write out a new copy of the object, and thus no
851         future snapshots will depend upon the given segment.
852         """
853
854         if isinstance(segment, int):
855             id = segment
856         elif isinstance(segment, self.SegmentInfo):
857             id = segment.id
858         else:
859             raise TypeError("Invalid segment: %s, must be of type int or SegmentInfo, not %s" % (segment, type(segment)))
860
861         cur = self.cursor()
862         cur.execute("select max(snapshotid) from snapshots")
863         last_snapshotid = cur.fetchone()[0]
864         cur.execute("update segments set expire_time = ? where segmentid = ?",
865                     (last_snapshotid, id))
866         cur.execute("update block_index set expired = 0 where segmentid = ?",
867                     (id,))
868
869     def balance_expired_objects(self):
870         """Analyze expired objects in segments to be cleaned and group by age.
871
872         Update the block_index table of the local database to group expired
873         objects by age.  The exact number of buckets and the cutoffs for each
874         are dynamically determined.  Calling this function after marking
875         segments expired will help in the segment cleaning process, by ensuring
876         that when active objects from clean segments are rewritten, they will
877         be placed into new segments roughly grouped by age.
878         """
879
880         # The expired column of the block_index table is used when generating a
881         # new Cumulus snapshot.  A null value indicates that an object may be
882         # re-used.  Otherwise, an object must be written into a new segment if
883         # needed.  Objects with distinct expired values will be written into
884         # distinct segments, to allow for some grouping by age.  The value 0 is
885         # somewhat special in that it indicates any rewritten objects can be
886         # placed in the same segment as completely new objects; this can be
887         # used for very young objects which have been expired, or objects not
888         # expected to be encountered.
889         #
890         # In the balancing process, all objects which are not used in any
891         # current snapshots will have expired set to 0.  Objects which have
892         # been seen will be sorted by age and will have expired values set to
893         # 0, 1, 2, and so on based on age (with younger objects being assigned
894         # lower values).  The number of buckets and the age cutoffs is
895         # determined by looking at the distribution of block ages.
896
897         cur = self.cursor()
898
899         # Mark all expired objects with expired = 0; these objects will later
900         # have values set to indicate groupings of objects when repacking.
901         cur.execute("""update block_index set expired = 0
902                        where expired is not null""")
903
904         # We will want to aim for at least one full segment for each bucket
905         # that we eventually create, but don't know how many bytes that should
906         # be due to compression.  So compute the average number of bytes in
907         # each expired segment as a rough estimate for the minimum size of each
908         # bucket.  (This estimate could be thrown off by many not-fully-packed
909         # segments, but for now don't worry too much about that.)  If we can't
910         # compute an average, it's probably because there are no expired
911         # segments, so we have no more work to do.
912         cur.execute("""select avg(size) from segments
913                        where segmentid in
914                            (select distinct segmentid from block_index
915                             where expired is not null)""")
916         segment_size_estimate = cur.fetchone()[0]
917         if not segment_size_estimate:
918             return
919
920         # Next, extract distribution of expired objects (number and size) by
921         # age.  Save the timestamp for "now" so that the classification of
922         # blocks into age buckets will not change later in the function, after
923         # time has passed.  Set any timestamps in the future to now, so we are
924         # guaranteed that for the rest of this function, age is always
925         # non-negative.
926         cur.execute("select julianday('now')")
927         now = cur.fetchone()[0]
928
929         cur.execute("""update block_index set timestamp = ?
930                        where timestamp > ? and expired is not null""",
931                     (now, now))
932
933         cur.execute("""select round(? - timestamp) as age, count(*), sum(size)
934                        from block_index where expired = 0
935                        group by age order by age""", (now,))
936         distribution = cur.fetchall()
937
938         # Start to determine the buckets for expired objects.  Heuristics used:
939         #   - An upper bound on the number of buckets is given by the number of
940         #     segments we estimate it will take to store all data.  In fact,
941         #     aim for a couple of segments per bucket.
942         #   - Place very young objects in bucket 0 (place with new objects)
943         #     unless there are enough of them to warrant a separate bucket.
944         #   - Try not to create unnecessarily many buckets, since fewer buckets
945         #     will allow repacked data to be grouped based on spatial locality
946         #     (while more buckets will group by temporal locality).  We want a
947         #     balance.
948         MIN_AGE = 4
949         total_bytes = sum([i[2] for i in distribution])
950         target_buckets = 2 * (total_bytes / segment_size_estimate) ** 0.4
951         min_size = 1.5 * segment_size_estimate
952         target_size = max(2 * segment_size_estimate,
953                           total_bytes / target_buckets)
954
955         print("segment_size:", segment_size_estimate)
956         print("distribution:", distribution)
957         print("total_bytes:", total_bytes)
958         print("target_buckets:", target_buckets)
959         print("min, target size:", min_size, target_size)
960
961         # Chosen cutoffs.  Each bucket consists of objects with age greater
962         # than one cutoff value, but not greater than the next largest cutoff.
963         cutoffs = []
964
965         # Starting with the oldest objects, begin grouping together into
966         # buckets of size at least target_size bytes.
967         distribution.reverse()
968         bucket_size = 0
969         min_age_bucket = False
970         for (age, items, size) in distribution:
971             if bucket_size >= target_size \
972                 or (age < MIN_AGE and not min_age_bucket):
973                 if bucket_size < target_size and len(cutoffs) > 0:
974                     cutoffs.pop()
975                 cutoffs.append(age)
976                 bucket_size = 0
977
978             bucket_size += size
979             if age < MIN_AGE:
980                 min_age_bucket = True
981
982         # The last (youngest) bucket will be group 0, unless it has enough data
983         # to be of size min_size by itself, or there happen to be no objects
984         # less than MIN_AGE at all.
985         if bucket_size >= min_size or not min_age_bucket:
986             cutoffs.append(-1)
987         cutoffs.append(-1)
988
989         print("cutoffs:", cutoffs)
990
991         # Update the database to assign each object to the appropriate bucket.
992         cutoffs.reverse()
993         for i in range(len(cutoffs)):
994             cur.execute("""update block_index set expired = ?
995                            where round(? - timestamp) > ?
996                              and expired is not null""",
997                         (i, now, cutoffs[i]))