Start a cleanup on the storage backends.
[cumulus.git] / python / cumulus / __init__.py
1 # Cumulus: Efficient Filesystem Backup to the Cloud
2 # Copyright (C) 2008-2009, 2012 The Cumulus Developers
3 # See the AUTHORS file for a list of contributors.
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License along
16 # with this program; if not, write to the Free Software Foundation, Inc.,
17 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18
19 """High-level interface for working with Cumulus archives.
20
21 This module provides an easy interface for reading from and manipulating
22 various parts of a Cumulus archive:
23   - listing the snapshots and segments present
24   - reading segment contents
25   - parsing snapshot descriptors and snapshot metadata logs
26   - reading and maintaining the local object database
27 """
28
29 from __future__ import division, print_function, unicode_literals
30
31 import hashlib
32 import itertools
33 import os
34 import re
35 import sqlite3
36 import sys
37 import tarfile
38 import tempfile
39 try:
40     import _thread
41 except ImportError:
42     import thread as _thread
43
44 import cumulus.store
45 import cumulus.store.file
46
47 if sys.version < '3':
48     StringTypes = (str, unicode)
49 else:
50     StringTypes = (str,)
51
52 # The largest supported snapshot format that can be understood.
53 FORMAT_VERSION = (0, 11)        # Cumulus Snapshot v0.11
54
55 # Maximum number of nested indirect references allowed in a snapshot.
56 MAX_RECURSION_DEPTH = 3
57
58 # All segments which have been accessed this session.
59 accessed_segments = set()
60
61 # Table of methods used to filter segments before storage, and corresponding
62 # filename extensions.  These are listed in priority order (methods earlier in
63 # the list are tried first).
64 SEGMENT_FILTERS = [
65     (".gpg", "cumulus-filter-gpg --decrypt"),
66     (".gz", "gzip -dc"),
67     (".bz2", "bzip2 -dc"),
68     ("", None),
69 ]
70
71 def uri_decode(s):
72     """Decode a URI-encoded (%xx escapes) string."""
73     def hex_decode(m): return chr(int(m.group(1), 16))
74     return re.sub(r"%([0-9a-f]{2})", hex_decode, s)
75 def uri_encode(s):
76     """Encode a string to URI-encoded (%xx escapes) form."""
77     def hex_encode(c):
78         if c > '+' and c < '\x7f' and c != '@':
79             return c
80         else:
81             return "%%%02x" % (ord(c),)
82     return ''.join(hex_encode(c) for c in s)
83
84 class Struct:
85     """A class which merely acts as a data container.
86
87     Instances of this class (or its subclasses) are merely used to store data
88     in various attributes.  No methods are provided.
89     """
90
91     def __repr__(self):
92         return "<%s %s>" % (self.__class__, self.__dict__)
93
94 CHECKSUM_ALGORITHMS = {
95     'sha1': hashlib.sha1,
96     'sha224': hashlib.sha224,
97     'sha256': hashlib.sha256,
98 }
99
100 class ChecksumCreator:
101     """Compute a Cumulus checksum for provided data.
102
103     The algorithm used is selectable, but currently defaults to sha1.
104     """
105
106     def __init__(self, algorithm='sha1'):
107         self.algorithm = algorithm
108         self.hash = CHECKSUM_ALGORITHMS[algorithm]()
109
110     def update(self, data):
111         self.hash.update(data)
112         return self
113
114     def compute(self):
115         return "%s=%s" % (self.algorithm, self.hash.hexdigest())
116
117 class ChecksumVerifier:
118     """Verify whether a checksum from a snapshot matches the supplied data."""
119
120     def __init__(self, checksumstr):
121         """Create an object to check the supplied checksum."""
122
123         (algo, checksum) = checksumstr.split("=", 1)
124         self.checksum = checksum
125         self.hash = CHECKSUM_ALGORITHMS[algo]()
126
127     def update(self, data):
128         self.hash.update(data)
129
130     def valid(self):
131         """Return a boolean indicating whether the checksum matches."""
132
133         result = self.hash.hexdigest()
134         return result == self.checksum
135
136 class SearchPathEntry(object):
137     """Item representing a possible search location for Cumulus files.
138
139     Some Cumulus files might be stored in multiple possible file locations: due
140     to format (different compression mechanisms with different extensions),
141     locality (different segments might be placed in different directories to
142     control archiving policies), for backwards compatibility (default location
143     changed over time).  A SearchPathEntry describes a possible location for a
144     file.
145     """
146     def __init__(self, directory_prefix, suffix, context=None):
147         self._directory_prefix = directory_prefix
148         self._suffix = suffix
149         self._context = context
150
151     def __repr__(self):
152         return "%s(%r, %r, %r)" % (self.__class__.__name__,
153                                    self._directory_prefix, self._suffix,
154                                    self._context)
155
156     def build_path(self, basename):
157         """Construct the search path to use for a file with name basename.
158
159         Returns a tuple (pathname, context), where pathname is the path to try
160         and context is any additional data associated with this search entry
161         (if any).
162         """
163         return (os.path.join(self._directory_prefix, basename + self._suffix),
164                 self._context)
165
166 class SearchPath(object):
167     """A collection of locations to search for files and lookup utilities.
168
169     For looking for a file in a Cumulus storage backend, a SearchPath object
170     contains a list of possible locations to try.  A SearchPath can be used to
171     perform the search as well; when a file is found the search path ordering
172     is updated (moving the successful SearchPathEntry to the front of the list
173     for future searches).
174     """
175     def __init__(self, name_regex, searchpath):
176         self._regex = re.compile(name_regex)
177         self._path = list(searchpath)
178
179     def add_search_entry(self, entry):
180         self._path.append(entry)
181
182     def directories(self):
183         """Return the set of directories to search for a file type."""
184         return set(entry._directory_prefix for entry in self._path)
185
186     def get(self, backend, basename):
187         for (i, entry) in enumerate(self._path):
188             try:
189                 (pathname, context) = entry.build_path(basename)
190                 fp = backend.get(pathname)
191                 # On success, move this entry to the front of the search path
192                 # to speed future searches.
193                 if i > 0:
194                     self._path.pop(i)
195                     self._path.insert(0, entry)
196                 return (fp, pathname, context)
197             except cumulus.store.NotFoundError:
198                 continue
199         raise cumulus.store.NotFoundError(basename)
200
201     def stat(self, backend, basename):
202         for (i, entry) in enumerate(self._path):
203             try:
204                 (pathname, context) = entry.build_path(basename)
205                 stat_data = backend.stat(pathname)
206                 # On success, move this entry to the front of the search path
207                 # to speed future searches.
208                 if i > 0:
209                     self._path.pop(i)
210                     self._path.insert(0, entry)
211                 result = {"path": pathname}
212                 result.update(stat_data)
213                 return result
214             except cumulus.store.NotFoundError:
215                 continue
216         raise cumulus.store.NotFoundError(basename)
217
218     def match(self, filename):
219         return self._regex.match(filename)
220
221     def list(self, backend):
222         success = False
223         for d in self.directories():
224             try:
225                 for f in backend.list(d):
226                     success = True
227                     m = self.match(f)
228                     if m: yield (os.path.join(d, f), m)
229             except cumulus.store.NotFoundError:
230                 pass
231         if not success:
232             raise cumulus.store.NotFoundError(backend)
233
234 def _build_segments_searchpath(prefix):
235     for (extension, filter) in SEGMENT_FILTERS:
236         yield SearchPathEntry(prefix, extension, filter)
237
238 SEARCH_PATHS = {
239     "checksums": SearchPath(
240         r"^snapshot-(.*)\.(\w+)sums$",
241         [SearchPathEntry("meta", ".sha1sums"),
242          SearchPathEntry("checksums", ".sha1sums"),
243          SearchPathEntry("", ".sha1sums")]),
244     "meta": SearchPath(
245         r"^snapshot-(.*)\.meta(\.\S+)?$",
246         _build_segments_searchpath("meta")),
247     "segments": SearchPath(
248         (r"^([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})"
249          r"\.tar(\.\S+)?$"),
250         itertools.chain(
251             _build_segments_searchpath("segments0"),
252             _build_segments_searchpath("segments1"),
253             _build_segments_searchpath(""),
254             _build_segments_searchpath("segments"))),
255     "snapshots": SearchPath(
256         r"^snapshot-(.*)\.(cumulus|lbs)$",
257         [SearchPathEntry("snapshots", ".cumulus"),
258          SearchPathEntry("snapshots", ".lbs"),
259          SearchPathEntry("", ".cumulus"),
260          SearchPathEntry("", ".lbs")]),
261 }
262
263 class BackendWrapper(object):
264     """Wrapper around a Cumulus storage backend that understands file types.
265
266     The BackendWrapper class understands different Cumulus file types, such as
267     snapshots and segments, and implements higher-level operations such as
268     "retrieve a snapshot with a specific name" (hiding operations such as
269     searching for the correct file name).
270     """
271
272     def __init__(self, backend):
273         """Initializes a wrapper around the specified storage backend.
274
275         store may either be a Store object or URL.
276         """
277         if type(backend) in StringTypes:
278             self._backend = cumulus.store.open(backend)
279         else:
280             self._backend = backend
281
282     @property
283     def raw_backend(self):
284         return self._backend
285
286     def stat_generic(self, basename, filetype):
287         return SEARCH_PATHS[filetype].stat(self._backend, basename)
288
289     def open_generic(self, basename, filetype):
290         return SEARCH_PATHS[filetype].get(self._backend, basename)
291
292     def open_snapshot(self, name):
293         return self.open_generic("snapshot-" + name, "snapshots")
294
295     def open_segment(self, name):
296         return self.open_generic(name + ".tar", "segments")
297
298     def list_generic(self, filetype):
299         return ((x[1].group(1), x[0])
300                 for x in SEARCH_PATHS[filetype].list(self._backend))
301
302     def prefetch_generic(self):
303         """Calls scan on directories to prefetch file metadata."""
304         directories = set()
305         for typeinfo in SEARCH_PATHS.values():
306             directories.update(typeinfo.directories())
307         for d in directories:
308             print("Prefetch", d)
309             self._backend.scan(d)
310
311 class CumulusStore:
312     def __init__(self, backend):
313         if isinstance(backend, BackendWrapper):
314             self.backend = backend
315         else:
316             self.backend = BackendWrapper(backend)
317         self.cachedir = None
318         self.CACHE_SIZE = 16
319         self._lru_list = []
320
321     def get_cachedir(self):
322         if self.cachedir is None:
323             self.cachedir = tempfile.mkdtemp("-cumulus")
324         return self.cachedir
325
326     def cleanup(self):
327         if self.cachedir is not None:
328             # TODO: Avoid use of system, make this safer
329             os.system("rm -rf " + self.cachedir)
330         self.cachedir = None
331
332     @staticmethod
333     def parse_ref(refstr):
334         m = re.match(r"^zero\[(\d+)\]$", refstr)
335         if m:
336             return ("zero", None, None, (0, int(m.group(1)), False))
337
338         m = re.match(r"^([-0-9a-f]+)\/([0-9a-f]+)(\(\S+\))?(\[(((\d+)\+)?(\d+)|=(\d+))\])?$", refstr)
339         if not m: return
340
341         segment = m.group(1)
342         object = m.group(2)
343         checksum = m.group(3)
344         slice = m.group(4)
345
346         if checksum is not None:
347             checksum = checksum.lstrip("(").rstrip(")")
348
349         if slice is not None:
350             if m.group(9) is not None:
351                 # Size-assertion slice
352                 slice = (0, int(m.group(9)), True)
353             elif m.group(6) is None:
354                 # Abbreviated slice
355                 slice = (0, int(m.group(8)), False)
356             else:
357                 slice = (int(m.group(7)), int(m.group(8)), False)
358
359         return (segment, object, checksum, slice)
360
361     def list_snapshots(self):
362         return set(x[0] for x in self.backend.list_generic("snapshots"))
363
364     def list_segments(self):
365         return set(x[0] for x in self.backend.list_generic("segments"))
366
367     def load_snapshot(self, snapshot):
368         snapshot_file = self.backend.open_snapshot(snapshot)[0]
369         return snapshot_file.read().splitlines(True)
370
371     @staticmethod
372     def filter_data(filehandle, filter_cmd):
373         if filter_cmd is None:
374             return filehandle
375         (input, output) = os.popen2(filter_cmd)
376         def copy_thread(src, dst):
377             BLOCK_SIZE = 4096
378             while True:
379                 block = src.read(BLOCK_SIZE)
380                 if len(block) == 0: break
381                 dst.write(block)
382             src.close()
383             dst.close()
384         _thread.start_new_thread(copy_thread, (filehandle, input))
385         return output
386
387     def get_segment(self, segment):
388         accessed_segments.add(segment)
389
390         (segment_fp, path, filter_cmd) = self.backend.open_segment(segment)
391         return self.filter_data(segment_fp, filter_cmd)
392
393     def load_segment(self, segment):
394         seg = tarfile.open(segment, 'r|', self.get_segment(segment))
395         for item in seg:
396             data_obj = seg.extractfile(item)
397             path = item.name.split('/')
398             if len(path) == 2 and path[0] == segment:
399                 yield (path[1], data_obj.read())
400
401     def extract_segment(self, segment):
402         segdir = os.path.join(self.get_cachedir(), segment)
403         os.mkdir(segdir)
404         for (object, data) in self.load_segment(segment):
405             f = open(os.path.join(segdir, object), 'wb')
406             f.write(data)
407             f.close()
408
409     def load_object(self, segment, object):
410         accessed_segments.add(segment)
411         path = os.path.join(self.get_cachedir(), segment, object)
412         if not os.access(path, os.R_OK):
413             self.extract_segment(segment)
414         if segment in self._lru_list: self._lru_list.remove(segment)
415         self._lru_list.append(segment)
416         while len(self._lru_list) > self.CACHE_SIZE:
417             os.system("rm -rf " + os.path.join(self.cachedir,
418                                                self._lru_list[0]))
419             self._lru_list = self._lru_list[1:]
420         return open(path, 'rb').read()
421
422     def get(self, refstr):
423         """Fetch the given object and return it.
424
425         The input should be an object reference, in string form.
426         """
427
428         (segment, object, checksum, slice) = self.parse_ref(refstr)
429
430         if segment == "zero":
431             return "\0" * slice[1]
432
433         data = self.load_object(segment, object)
434
435         if checksum is not None:
436             verifier = ChecksumVerifier(checksum)
437             verifier.update(data)
438             if not verifier.valid():
439                 raise ValueError
440
441         if slice is not None:
442             (start, length, exact) = slice
443             if exact and len(data) != length: raise ValueError
444             data = data[start:start+length]
445             if len(data) != length: raise IndexError
446
447         return data
448
449     def prefetch(self):
450         self.backend.prefetch_generic()
451
452 def parse(lines, terminate=None):
453     """Generic parser for RFC822-style "Key: Value" data streams.
454
455     This parser can be used to read metadata logs and snapshot root descriptor
456     files.
457
458     lines must be an iterable object which yields a sequence of lines of input.
459
460     If terminate is specified, it is used as a predicate to determine when to
461     stop reading input lines.
462     """
463
464     dict = {}
465     last_key = None
466
467     for l in lines:
468         # Strip off a trailing newline, if present
469         if len(l) > 0 and l[-1] == "\n":
470             l = l[:-1]
471
472         if terminate is not None and terminate(l):
473             if len(dict) > 0: yield dict
474             dict = {}
475             last_key = None
476             continue
477
478         m = re.match(r"^([-\w]+):\s*(.*)$", l)
479         if m:
480             dict[m.group(1)] = m.group(2)
481             last_key = m.group(1)
482         elif len(l) > 0 and l[0].isspace() and last_key is not None:
483             dict[last_key] += l
484         else:
485             last_key = None
486
487     if len(dict) > 0: yield dict
488
489 def parse_full(lines):
490     try:
491         return next(parse(lines))
492     except StopIteration:
493         return {}
494
495 def parse_metadata_version(s):
496     """Convert a string with the snapshot version format to a tuple."""
497
498     m = re.match(r"^(?:Cumulus|LBS) Snapshot v(\d+(\.\d+)*)$", s)
499     if m is None:
500         return ()
501     else:
502         return tuple([int(d) for d in m.group(1).split(".")])
503
504 def read_metadata(object_store, root):
505     """Iterate through all lines in the metadata log, following references."""
506
507     # Stack for keeping track of recursion when following references to
508     # portions of the log.  The last entry in the stack corresponds to the
509     # object currently being parsed.  Each entry is a list of lines which have
510     # been reversed, so that popping successive lines from the end of each list
511     # will return lines of the metadata log in order.
512     stack = []
513
514     def follow_ref(refstr):
515         if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
516         lines = object_store.get(refstr).splitlines(True)
517         lines.reverse()
518         stack.append(lines)
519
520     follow_ref(root)
521
522     while len(stack) > 0:
523         top = stack[-1]
524         if len(top) == 0:
525             stack.pop()
526             continue
527         line = top.pop()
528
529         # An indirect reference which we must follow?
530         if len(line) > 0 and line[0] == '@':
531             ref = line[1:]
532             ref.strip()
533             follow_ref(ref)
534         else:
535             yield line
536
537 class MetadataItem:
538     """Metadata for a single file (or directory or...) from a snapshot."""
539
540     # Functions for parsing various datatypes that can appear in a metadata log
541     # item.
542     @staticmethod
543     def decode_int(s):
544         """Decode an integer, expressed in decimal, octal, or hexadecimal."""
545         if s.startswith("0x"):
546             return int(s, 16)
547         elif s.startswith("0"):
548             return int(s, 8)
549         else:
550             return int(s, 10)
551
552     @staticmethod
553     def decode_str(s):
554         """Decode a URI-encoded (%xx escapes) string."""
555         return uri_decode(s)
556
557     @staticmethod
558     def raw_str(s):
559         """An unecoded string."""
560         return s
561
562     @staticmethod
563     def decode_user(s):
564         """Decode a user/group to a tuple of uid/gid followed by name."""
565         items = s.split()
566         uid = MetadataItem.decode_int(items[0])
567         name = None
568         if len(items) > 1:
569             if items[1].startswith("(") and items[1].endswith(")"):
570                 name = MetadataItem.decode_str(items[1][1:-1])
571         return (uid, name)
572
573     @staticmethod
574     def decode_device(s):
575         """Decode a device major/minor number."""
576         (major, minor) = map(MetadataItem.decode_int, s.split("/"))
577         return (major, minor)
578
579     class Items: pass
580
581     def __init__(self, fields, object_store):
582         """Initialize from a dictionary of key/value pairs from metadata log."""
583
584         self.fields = fields
585         self.object_store = object_store
586         self.keys = []
587         self.items = self.Items()
588         for (k, v) in fields.items():
589             if k in self.field_types:
590                 decoder = self.field_types[k]
591                 setattr(self.items, k, decoder(v))
592                 self.keys.append(k)
593
594     def data(self):
595         """Return an iterator for the data blocks that make up a file."""
596
597         # This traverses the list of blocks that make up a file, following
598         # indirect references.  It is implemented in much the same way as
599         # read_metadata, so see that function for details of the technique.
600
601         objects = self.fields['data'].split()
602         objects.reverse()
603         stack = [objects]
604
605         def follow_ref(refstr):
606             if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
607             objects = self.object_store.get(refstr).split()
608             objects.reverse()
609             stack.append(objects)
610
611         while len(stack) > 0:
612             top = stack[-1]
613             if len(top) == 0:
614                 stack.pop()
615                 continue
616             ref = top.pop()
617
618             # An indirect reference which we must follow?
619             if len(ref) > 0 and ref[0] == '@':
620                 follow_ref(ref[1:])
621             else:
622                 yield ref
623
624 # Description of fields that might appear, and how they should be parsed.
625 MetadataItem.field_types = {
626     'name': MetadataItem.decode_str,
627     'type': MetadataItem.raw_str,
628     'mode': MetadataItem.decode_int,
629     'device': MetadataItem.decode_device,
630     'user': MetadataItem.decode_user,
631     'group': MetadataItem.decode_user,
632     'ctime': MetadataItem.decode_int,
633     'mtime': MetadataItem.decode_int,
634     'links': MetadataItem.decode_int,
635     'inode': MetadataItem.raw_str,
636     'checksum': MetadataItem.decode_str,
637     'size': MetadataItem.decode_int,
638     'contents': MetadataItem.decode_str,
639     'target': MetadataItem.decode_str,
640 }
641
642 def iterate_metadata(object_store, root):
643     for d in parse(read_metadata(object_store, root), lambda l: len(l) == 0):
644         yield MetadataItem(d, object_store)
645
646 class LocalDatabase:
647     """Access to the local database of snapshot contents and object checksums.
648
649     The local database is consulted when creating a snapshot to determine what
650     data can be re-used from old snapshots.  Segment cleaning is performed by
651     manipulating the data in the local database; the local database also
652     includes enough data to guide the segment cleaning process.
653     """
654
655     def __init__(self, path, dbname="localdb.sqlite"):
656         self.db_connection = sqlite3.connect(path + "/" + dbname)
657
658     # Low-level database access.  Use these methods when there isn't a
659     # higher-level interface available.  Exception: do, however, remember to
660     # use the commit() method after making changes to make sure they are
661     # actually saved, even when going through higher-level interfaces.
662     def commit(self):
663         "Commit any pending changes to the local database."
664         self.db_connection.commit()
665
666     def rollback(self):
667         "Roll back any pending changes to the local database."
668         self.db_connection.rollback()
669
670     def cursor(self):
671         "Return a DB-API cursor for directly accessing the local database."
672         return self.db_connection.cursor()
673
674     def list_schemes(self):
675         """Return the list of snapshots found in the local database.
676
677         The returned value is a list of tuples (id, scheme, name, time, intent).
678         """
679
680         cur = self.cursor()
681         cur.execute("select distinct scheme from snapshots")
682         schemes = [row[0] for row in cur.fetchall()]
683         schemes.sort()
684         return schemes
685
686     def list_snapshots(self, scheme):
687         """Return a list of snapshots for the given scheme."""
688         cur = self.cursor()
689         cur.execute("select name from snapshots")
690         snapshots = [row[0] for row in cur.fetchall()]
691         snapshots.sort()
692         return snapshots
693
694     def delete_snapshot(self, scheme, name):
695         """Remove the specified snapshot from the database.
696
697         Warning: This does not garbage collect all dependent data in the
698         database, so it must be followed by a call to garbage_collect() to make
699         the database consistent.
700         """
701         cur = self.cursor()
702         cur.execute("delete from snapshots where scheme = ? and name = ?",
703                     (scheme, name))
704
705     def prune_old_snapshots(self, scheme, intent=1.0):
706         """Delete entries from old snapshots from the database.
707
708         Only snapshots with the specified scheme name will be deleted.  If
709         intent is given, it gives the intended next snapshot type, to determine
710         how aggressively to clean (for example, intent=7 could be used if the
711         next snapshot will be a weekly snapshot).
712         """
713
714         cur = self.cursor()
715
716         # Find the id of the last snapshot to be created.  This is used for
717         # measuring time in a way: we record this value in each segment we
718         # expire on this run, and then on a future run can tell if there have
719         # been intervening backups made.
720         cur.execute("select max(snapshotid) from snapshots")
721         last_snapshotid = cur.fetchone()[0]
722
723         # Get the list of old snapshots for this scheme.  Delete all the old
724         # ones.  Rules for what to keep:
725         #   - Always keep the most recent snapshot.
726         #   - If snapshot X is younger than Y, and X has higher intent, then Y
727         #     can be deleted.
728         cur.execute("""select snapshotid, name, intent,
729                               julianday('now') - timestamp as age
730                        from snapshots where scheme = ?
731                        order by age""", (scheme,))
732
733         first = True
734         max_intent = intent
735         for (id, name, snap_intent, snap_age) in cur.fetchall():
736             can_delete = False
737             if snap_intent < max_intent:
738                 # Delete small-intent snapshots if there is a more recent
739                 # large-intent snapshot.
740                 can_delete = True
741             elif snap_intent == intent:
742                 # Delete previous snapshots with the specified intent level.
743                 can_delete = True
744
745             if can_delete and not first:
746                 print("Delete snapshot %d (%s)" % (id, name))
747                 cur.execute("delete from snapshots where snapshotid = ?",
748                             (id,))
749             first = False
750             max_intent = max(max_intent, snap_intent)
751
752         self.garbage_collect()
753
754     def garbage_collect(self):
755         """Garbage-collect unreachable segment and object data.
756
757         Remove all segments and checksums which is not reachable from the
758         current set of snapshots stored in the local database.
759         """
760         cur = self.cursor()
761
762         # Delete entries in the segment_utilization table which are for
763         # non-existent snapshots.
764         cur.execute("""delete from segment_utilization
765                        where snapshotid not in
766                            (select snapshotid from snapshots)""")
767
768         # Delete segments not referenced by any current snapshots.
769         cur.execute("""delete from segments where segmentid not in
770                            (select segmentid from segment_utilization)""")
771
772         # Delete dangling objects in the block_index table.
773         cur.execute("""delete from block_index
774                        where segmentid not in
775                            (select segmentid from segments)""")
776
777         # Remove sub-block signatures for deleted objects.
778         cur.execute("""delete from subblock_signatures
779                        where blockid not in
780                            (select blockid from block_index)""")
781
782     # Segment cleaning.
783     class SegmentInfo(Struct): pass
784
785     def get_segment_cleaning_list(self, age_boost=0.0):
786         """Return a list of all current segments with information for cleaning.
787
788         Return all segments which are currently known in the local database
789         (there might be other, older segments in the archive itself), and
790         return usage statistics for each to help decide which segments to
791         clean.
792
793         The returned list will be sorted by estimated cleaning benefit, with
794         segments that are best to clean at the start of the list.
795
796         If specified, the age_boost parameter (measured in days) will added to
797         the age of each segment, as a way of adjusting the benefit computation
798         before a long-lived snapshot is taken (for example, age_boost might be
799         set to 7 when cleaning prior to taking a weekly snapshot).
800         """
801
802         cur = self.cursor()
803         segments = []
804         cur.execute("""select segmentid, used, size, mtime,
805                        julianday('now') - mtime as age from segment_info
806                        where expire_time is null""")
807         for row in cur:
808             info = self.SegmentInfo()
809             info.id = row[0]
810             info.used_bytes = row[1]
811             info.size_bytes = row[2]
812             info.mtime = row[3]
813             info.age_days = row[4]
814
815             # If data is not available for whatever reason, treat it as 0.0.
816             if info.age_days is None:
817                 info.age_days = 0.0
818             if info.used_bytes is None:
819                 info.used_bytes = 0.0
820
821             # Benefit calculation: u is the estimated fraction of each segment
822             # which is utilized (bytes belonging to objects still in use
823             # divided by total size; this doesn't take compression or storage
824             # overhead into account, but should give a reasonable estimate).
825             #
826             # The total benefit is a heuristic that combines several factors:
827             # the amount of space that can be reclaimed (1 - u), an ageing
828             # factor (info.age_days) that favors cleaning old segments to young
829             # ones and also is more likely to clean segments that will be
830             # rewritten for long-lived snapshots (age_boost), and finally a
831             # penalty factor for the cost of re-uploading data (u + 0.1).
832             u = info.used_bytes / info.size_bytes
833             info.cleaning_benefit \
834                 = (1 - u) * (info.age_days + age_boost) / (u + 0.1)
835
836             segments.append(info)
837
838         segments.sort(cmp, key=lambda s: s.cleaning_benefit, reverse=True)
839         return segments
840
841     def mark_segment_expired(self, segment):
842         """Mark a segment for cleaning in the local database.
843
844         The segment parameter should be either a SegmentInfo object or an
845         integer segment id.  Objects in the given segment will be marked as
846         expired, which means that any future snapshots that would re-use those
847         objects will instead write out a new copy of the object, and thus no
848         future snapshots will depend upon the given segment.
849         """
850
851         if isinstance(segment, int):
852             id = segment
853         elif isinstance(segment, self.SegmentInfo):
854             id = segment.id
855         else:
856             raise TypeError("Invalid segment: %s, must be of type int or SegmentInfo, not %s" % (segment, type(segment)))
857
858         cur = self.cursor()
859         cur.execute("select max(snapshotid) from snapshots")
860         last_snapshotid = cur.fetchone()[0]
861         cur.execute("update segments set expire_time = ? where segmentid = ?",
862                     (last_snapshotid, id))
863         cur.execute("update block_index set expired = 0 where segmentid = ?",
864                     (id,))
865
866     def balance_expired_objects(self):
867         """Analyze expired objects in segments to be cleaned and group by age.
868
869         Update the block_index table of the local database to group expired
870         objects by age.  The exact number of buckets and the cutoffs for each
871         are dynamically determined.  Calling this function after marking
872         segments expired will help in the segment cleaning process, by ensuring
873         that when active objects from clean segments are rewritten, they will
874         be placed into new segments roughly grouped by age.
875         """
876
877         # The expired column of the block_index table is used when generating a
878         # new Cumulus snapshot.  A null value indicates that an object may be
879         # re-used.  Otherwise, an object must be written into a new segment if
880         # needed.  Objects with distinct expired values will be written into
881         # distinct segments, to allow for some grouping by age.  The value 0 is
882         # somewhat special in that it indicates any rewritten objects can be
883         # placed in the same segment as completely new objects; this can be
884         # used for very young objects which have been expired, or objects not
885         # expected to be encountered.
886         #
887         # In the balancing process, all objects which are not used in any
888         # current snapshots will have expired set to 0.  Objects which have
889         # been seen will be sorted by age and will have expired values set to
890         # 0, 1, 2, and so on based on age (with younger objects being assigned
891         # lower values).  The number of buckets and the age cutoffs is
892         # determined by looking at the distribution of block ages.
893
894         cur = self.cursor()
895
896         # Mark all expired objects with expired = 0; these objects will later
897         # have values set to indicate groupings of objects when repacking.
898         cur.execute("""update block_index set expired = 0
899                        where expired is not null""")
900
901         # We will want to aim for at least one full segment for each bucket
902         # that we eventually create, but don't know how many bytes that should
903         # be due to compression.  So compute the average number of bytes in
904         # each expired segment as a rough estimate for the minimum size of each
905         # bucket.  (This estimate could be thrown off by many not-fully-packed
906         # segments, but for now don't worry too much about that.)  If we can't
907         # compute an average, it's probably because there are no expired
908         # segments, so we have no more work to do.
909         cur.execute("""select avg(size) from segments
910                        where segmentid in
911                            (select distinct segmentid from block_index
912                             where expired is not null)""")
913         segment_size_estimate = cur.fetchone()[0]
914         if not segment_size_estimate:
915             return
916
917         # Next, extract distribution of expired objects (number and size) by
918         # age.  Save the timestamp for "now" so that the classification of
919         # blocks into age buckets will not change later in the function, after
920         # time has passed.  Set any timestamps in the future to now, so we are
921         # guaranteed that for the rest of this function, age is always
922         # non-negative.
923         cur.execute("select julianday('now')")
924         now = cur.fetchone()[0]
925
926         cur.execute("""update block_index set timestamp = ?
927                        where timestamp > ? and expired is not null""",
928                     (now, now))
929
930         cur.execute("""select round(? - timestamp) as age, count(*), sum(size)
931                        from block_index where expired = 0
932                        group by age order by age""", (now,))
933         distribution = cur.fetchall()
934
935         # Start to determine the buckets for expired objects.  Heuristics used:
936         #   - An upper bound on the number of buckets is given by the number of
937         #     segments we estimate it will take to store all data.  In fact,
938         #     aim for a couple of segments per bucket.
939         #   - Place very young objects in bucket 0 (place with new objects)
940         #     unless there are enough of them to warrant a separate bucket.
941         #   - Try not to create unnecessarily many buckets, since fewer buckets
942         #     will allow repacked data to be grouped based on spatial locality
943         #     (while more buckets will group by temporal locality).  We want a
944         #     balance.
945         MIN_AGE = 4
946         total_bytes = sum([i[2] for i in distribution])
947         target_buckets = 2 * (total_bytes / segment_size_estimate) ** 0.4
948         min_size = 1.5 * segment_size_estimate
949         target_size = max(2 * segment_size_estimate,
950                           total_bytes / target_buckets)
951
952         print("segment_size:", segment_size_estimate)
953         print("distribution:", distribution)
954         print("total_bytes:", total_bytes)
955         print("target_buckets:", target_buckets)
956         print("min, target size:", min_size, target_size)
957
958         # Chosen cutoffs.  Each bucket consists of objects with age greater
959         # than one cutoff value, but not greater than the next largest cutoff.
960         cutoffs = []
961
962         # Starting with the oldest objects, begin grouping together into
963         # buckets of size at least target_size bytes.
964         distribution.reverse()
965         bucket_size = 0
966         min_age_bucket = False
967         for (age, items, size) in distribution:
968             if bucket_size >= target_size \
969                 or (age < MIN_AGE and not min_age_bucket):
970                 if bucket_size < target_size and len(cutoffs) > 0:
971                     cutoffs.pop()
972                 cutoffs.append(age)
973                 bucket_size = 0
974
975             bucket_size += size
976             if age < MIN_AGE:
977                 min_age_bucket = True
978
979         # The last (youngest) bucket will be group 0, unless it has enough data
980         # to be of size min_size by itself, or there happen to be no objects
981         # less than MIN_AGE at all.
982         if bucket_size >= min_size or not min_age_bucket:
983             cutoffs.append(-1)
984         cutoffs.append(-1)
985
986         print("cutoffs:", cutoffs)
987
988         # Update the database to assign each object to the appropriate bucket.
989         cutoffs.reverse()
990         for i in range(len(cutoffs)):
991             cur.execute("""update block_index set expired = ?
992                            where round(? - timestamp) > ?
993                              and expired is not null""",
994                         (i, now, cutoffs[i]))