8dc4c9875e0f77ef95232f866f37a28e7ae9949e
[cumulus.git] / python / cumulus / __init__.py
1 # Cumulus: Efficient Filesystem Backup to the Cloud
2 # Copyright (C) 2008-2009, 2012 The Cumulus Developers
3 # See the AUTHORS file for a list of contributors.
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License along
16 # with this program; if not, write to the Free Software Foundation, Inc.,
17 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18
19 """High-level interface for working with Cumulus archives.
20
21 This module provides an easy interface for reading from and manipulating
22 various parts of a Cumulus archive:
23   - listing the snapshots and segments present
24   - reading segment contents
25   - parsing snapshot descriptors and snapshot metadata logs
26   - reading and maintaining the local object database
27 """
28
29
30 import hashlib
31 import itertools
32 import os
33 import re
34 import sqlite3
35 import tarfile
36 import tempfile
37 import _thread
38
39 import cumulus.store
40 import cumulus.store.file
41
42 # The largest supported snapshot format that can be understood.
43 FORMAT_VERSION = (0, 11)        # Cumulus Snapshot v0.11
44
45 # Maximum number of nested indirect references allowed in a snapshot.
46 MAX_RECURSION_DEPTH = 3
47
48 # All segments which have been accessed this session.
49 accessed_segments = set()
50
51 # Table of methods used to filter segments before storage, and corresponding
52 # filename extensions.  These are listed in priority order (methods earlier in
53 # the list are tried first).
54 SEGMENT_FILTERS = [
55     (".gpg", "cumulus-filter-gpg --decrypt"),
56     (".gz", "gzip -dc"),
57     (".bz2", "bzip2 -dc"),
58     ("", None),
59 ]
60
61 def uri_decode(s):
62     """Decode a URI-encoded (%xx escapes) string."""
63     def hex_decode(m): return chr(int(m.group(1), 16))
64     return re.sub(r"%([0-9a-f]{2})", hex_decode, s)
65 def uri_encode(s):
66     """Encode a string to URI-encoded (%xx escapes) form."""
67     def hex_encode(c):
68         if c > '+' and c < '\x7f' and c != '@':
69             return c
70         else:
71             return "%%%02x" % (ord(c),)
72     return ''.join(hex_encode(c) for c in s)
73
74 class Struct:
75     """A class which merely acts as a data container.
76
77     Instances of this class (or its subclasses) are merely used to store data
78     in various attributes.  No methods are provided.
79     """
80
81     def __repr__(self):
82         return "<%s %s>" % (self.__class__, self.__dict__)
83
84 CHECKSUM_ALGORITHMS = {
85     'sha1': hashlib.sha1,
86     'sha224': hashlib.sha224,
87     'sha256': hashlib.sha256,
88 }
89
90 class ChecksumCreator:
91     """Compute a Cumulus checksum for provided data.
92
93     The algorithm used is selectable, but currently defaults to sha1.
94     """
95
96     def __init__(self, algorithm='sha1'):
97         self.algorithm = algorithm
98         self.hash = CHECKSUM_ALGORITHMS[algorithm]()
99
100     def update(self, data):
101         self.hash.update(data)
102         return self
103
104     def compute(self):
105         return "%s=%s" % (self.algorithm, self.hash.hexdigest())
106
107 class ChecksumVerifier:
108     """Verify whether a checksum from a snapshot matches the supplied data."""
109
110     def __init__(self, checksumstr):
111         """Create an object to check the supplied checksum."""
112
113         (algo, checksum) = checksumstr.split("=", 1)
114         self.checksum = checksum
115         self.hash = CHECKSUM_ALGORITHMS[algo]()
116
117     def update(self, data):
118         self.hash.update(data)
119
120     def valid(self):
121         """Return a boolean indicating whether the checksum matches."""
122
123         result = self.hash.hexdigest()
124         return result == self.checksum
125
126 class SearchPathEntry(object):
127     """Item representing a possible search location for Cumulus files.
128
129     Some Cumulus files might be stored in multiple possible file locations: due
130     to format (different compression mechanisms with different extensions),
131     locality (different segments might be placed in different directories to
132     control archiving policies), for backwards compatibility (default location
133     changed over time).  A SearchPathEntry describes a possible location for a
134     file.
135     """
136     def __init__(self, directory_prefix, suffix, context=None):
137         self._directory_prefix = directory_prefix
138         self._suffix = suffix
139         self._context = context
140
141     def __repr__(self):
142         return "%s(%r, %r, %r)" % (self.__class__.__name__,
143                                    self._directory_prefix, self._suffix,
144                                    self._context)
145
146     def build_path(self, basename):
147         """Construct the search path to use for a file with name basename.
148
149         Returns a tuple (pathname, context), where pathname is the path to try
150         and context is any additional data associated with this search entry
151         (if any).
152         """
153         return (os.path.join(self._directory_prefix, basename + self._suffix),
154                 self._context)
155
156 class SearchPath(object):
157     """A collection of locations to search for files and lookup utilities.
158
159     For looking for a file in a Cumulus storage backend, a SearchPath object
160     contains a list of possible locations to try.  A SearchPath can be used to
161     perform the search as well; when a file is found the search path ordering
162     is updated (moving the successful SearchPathEntry to the front of the list
163     for future searches).
164     """
165     def __init__(self, name_regex, searchpath):
166         self._regex = re.compile(name_regex)
167         self._path = list(searchpath)
168
169     def add_search_entry(self, entry):
170         self._path.append(entry)
171
172     def directories(self):
173         """Return the set of directories to search for a file type."""
174         return set(entry._directory_prefix for entry in self._path)
175
176     def get(self, backend, basename):
177         for (i, entry) in enumerate(self._path):
178             try:
179                 (pathname, context) = entry.build_path(basename)
180                 fp = backend.get(pathname)
181                 # On success, move this entry to the front of the search path
182                 # to speed future searches.
183                 if i > 0:
184                     self._path.pop(i)
185                     self._path.insert(0, entry)
186                 return (fp, pathname, context)
187             except cumulus.store.NotFoundError:
188                 continue
189         raise cumulus.store.NotFoundError(basename)
190
191     def stat(self, backend, basename):
192         for (i, entry) in enumerate(self._path):
193             try:
194                 (pathname, context) = entry.build_path(basename)
195                 stat_data = backend.stat(pathname)
196                 # On success, move this entry to the front of the search path
197                 # to speed future searches.
198                 if i > 0:
199                     self._path.pop(i)
200                     self._path.insert(0, entry)
201                 result = {"path": pathname}
202                 result.update(stat_data)
203                 return result
204             except cumulus.store.NotFoundError:
205                 continue
206         raise cumulus.store.NotFoundError(basename)
207
208     def match(self, filename):
209         return self._regex.match(filename)
210
211     def list(self, backend):
212         success = False
213         for d in self.directories():
214             try:
215                 for f in backend.list(d):
216                     success = True
217                     m = self.match(f)
218                     if m: yield (os.path.join(d, f), m)
219             except cumulus.store.NotFoundError:
220                 pass
221         if not success:
222             raise cumulus.store.NotFoundError(backend)
223
224 def _build_segments_searchpath(prefix):
225     for (extension, filter) in SEGMENT_FILTERS:
226         yield SearchPathEntry(prefix, extension, filter)
227
228 SEARCH_PATHS = {
229     "checksums": SearchPath(
230         r"^snapshot-(.*)\.(\w+)sums$",
231         [SearchPathEntry("meta", ".sha1sums"),
232          SearchPathEntry("checksums", ".sha1sums"),
233          SearchPathEntry("", ".sha1sums")]),
234     "meta": SearchPath(
235         r"^snapshot-(.*)\.meta(\.\S+)?$",
236         _build_segments_searchpath("meta")),
237     "segments": SearchPath(
238         (r"^([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})"
239          r"\.tar(\.\S+)?$"),
240         itertools.chain(
241             _build_segments_searchpath("segments0"),
242             _build_segments_searchpath("segments1"),
243             _build_segments_searchpath(""),
244             _build_segments_searchpath("segments"))),
245     "snapshots": SearchPath(
246         r"^snapshot-(.*)\.(cumulus|lbs)$",
247         [SearchPathEntry("snapshots", ".cumulus"),
248          SearchPathEntry("snapshots", ".lbs"),
249          SearchPathEntry("", ".cumulus"),
250          SearchPathEntry("", ".lbs")]),
251 }
252
253 class BackendWrapper(object):
254     """Wrapper around a Cumulus storage backend that understands file types.
255
256     The BackendWrapper class understands different Cumulus file types, such as
257     snapshots and segments, and implements higher-level operations such as
258     "retrieve a snapshot with a specific name" (hiding operations such as
259     searching for the correct file name).
260     """
261
262     def __init__(self, backend):
263         """Initializes a wrapper around the specified storage backend.
264
265         store may either be a Store object or URL.
266         """
267         if type(backend) in (str, str):
268             if backend.find(":") >= 0:
269                 self._backend = cumulus.store.open(backend)
270             else:
271                 self._backend = cumulus.store.file.FileStore(backend)
272         else:
273             self._backend = backend
274
275     @property
276     def raw_backend(self):
277         return self._backend
278
279     def stat_generic(self, basename, filetype):
280         return SEARCH_PATHS[filetype].stat(self._backend, basename)
281
282     def open_generic(self, basename, filetype):
283         return SEARCH_PATHS[filetype].get(self._backend, basename)
284
285     def open_snapshot(self, name):
286         return self.open_generic("snapshot-" + name, "snapshots")
287
288     def open_segment(self, name):
289         return self.open_generic(name + ".tar", "segments")
290
291     def list_generic(self, filetype):
292         return ((x[1].group(1), x[0])
293                 for x in SEARCH_PATHS[filetype].list(self._backend))
294
295     def prefetch_generic(self):
296         """Calls scan on directories to prefetch file metadata."""
297         directories = set()
298         for typeinfo in list(SEARCH_PATHS.values()):
299             directories.update(typeinfo.directories())
300         for d in directories:
301             print("Prefetch", d)
302             self._backend.scan(d)
303
304 class CumulusStore:
305     def __init__(self, backend):
306         if isinstance(backend, BackendWrapper):
307             self.backend = backend
308         else:
309             self.backend = BackendWrapper(backend)
310         self.cachedir = None
311         self.CACHE_SIZE = 16
312         self._lru_list = []
313
314     def get_cachedir(self):
315         if self.cachedir is None:
316             self.cachedir = tempfile.mkdtemp("-cumulus")
317         return self.cachedir
318
319     def cleanup(self):
320         if self.cachedir is not None:
321             # TODO: Avoid use of system, make this safer
322             os.system("rm -rf " + self.cachedir)
323         self.cachedir = None
324
325     @staticmethod
326     def parse_ref(refstr):
327         m = re.match(r"^zero\[(\d+)\]$", refstr)
328         if m:
329             return ("zero", None, None, (0, int(m.group(1)), False))
330
331         m = re.match(r"^([-0-9a-f]+)\/([0-9a-f]+)(\(\S+\))?(\[(((\d+)\+)?(\d+)|=(\d+))\])?$", refstr)
332         if not m: return
333
334         segment = m.group(1)
335         object = m.group(2)
336         checksum = m.group(3)
337         slice = m.group(4)
338
339         if checksum is not None:
340             checksum = checksum.lstrip("(").rstrip(")")
341
342         if slice is not None:
343             if m.group(9) is not None:
344                 # Size-assertion slice
345                 slice = (0, int(m.group(9)), True)
346             elif m.group(6) is None:
347                 # Abbreviated slice
348                 slice = (0, int(m.group(8)), False)
349             else:
350                 slice = (int(m.group(7)), int(m.group(8)), False)
351
352         return (segment, object, checksum, slice)
353
354     def list_snapshots(self):
355         return set(x[0] for x in self.backend.list_generic("snapshots"))
356
357     def list_segments(self):
358         return set(x[0] for x in self.backend.list_generic("segments"))
359
360     def load_snapshot(self, snapshot):
361         snapshot_file = self.backend.open_snapshot(snapshot)[0]
362         return snapshot_file.read().splitlines(True)
363
364     @staticmethod
365     def filter_data(filehandle, filter_cmd):
366         if filter_cmd is None:
367             return filehandle
368         (input, output) = os.popen2(filter_cmd)
369         def copy_thread(src, dst):
370             BLOCK_SIZE = 4096
371             while True:
372                 block = src.read(BLOCK_SIZE)
373                 if len(block) == 0: break
374                 dst.write(block)
375             src.close()
376             dst.close()
377         _thread.start_new_thread(copy_thread, (filehandle, input))
378         return output
379
380     def get_segment(self, segment):
381         accessed_segments.add(segment)
382
383         (segment_fp, path, filter_cmd) = self.backend.open_segment(segment)
384         return self.filter_data(segment_fp, filter_cmd)
385
386     def load_segment(self, segment):
387         seg = tarfile.open(segment, 'r|', self.get_segment(segment))
388         for item in seg:
389             data_obj = seg.extractfile(item)
390             path = item.name.split('/')
391             if len(path) == 2 and path[0] == segment:
392                 yield (path[1], data_obj.read())
393
394     def extract_segment(self, segment):
395         segdir = os.path.join(self.get_cachedir(), segment)
396         os.mkdir(segdir)
397         for (object, data) in self.load_segment(segment):
398             f = open(os.path.join(segdir, object), 'wb')
399             f.write(data)
400             f.close()
401
402     def load_object(self, segment, object):
403         accessed_segments.add(segment)
404         path = os.path.join(self.get_cachedir(), segment, object)
405         if not os.access(path, os.R_OK):
406             self.extract_segment(segment)
407         if segment in self._lru_list: self._lru_list.remove(segment)
408         self._lru_list.append(segment)
409         while len(self._lru_list) > self.CACHE_SIZE:
410             os.system("rm -rf " + os.path.join(self.cachedir,
411                                                self._lru_list[0]))
412             self._lru_list = self._lru_list[1:]
413         return open(path, 'rb').read()
414
415     def get(self, refstr):
416         """Fetch the given object and return it.
417
418         The input should be an object reference, in string form.
419         """
420
421         (segment, object, checksum, slice) = self.parse_ref(refstr)
422
423         if segment == "zero":
424             return "\0" * slice[1]
425
426         data = self.load_object(segment, object)
427
428         if checksum is not None:
429             verifier = ChecksumVerifier(checksum)
430             verifier.update(data)
431             if not verifier.valid():
432                 raise ValueError
433
434         if slice is not None:
435             (start, length, exact) = slice
436             if exact and len(data) != length: raise ValueError
437             data = data[start:start+length]
438             if len(data) != length: raise IndexError
439
440         return data
441
442     def prefetch(self):
443         self.backend.prefetch_generic()
444
445 def parse(lines, terminate=None):
446     """Generic parser for RFC822-style "Key: Value" data streams.
447
448     This parser can be used to read metadata logs and snapshot root descriptor
449     files.
450
451     lines must be an iterable object which yields a sequence of lines of input.
452
453     If terminate is specified, it is used as a predicate to determine when to
454     stop reading input lines.
455     """
456
457     dict = {}
458     last_key = None
459
460     for l in lines:
461         # Strip off a trailing newline, if present
462         if len(l) > 0 and l[-1] == "\n":
463             l = l[:-1]
464
465         if terminate is not None and terminate(l):
466             if len(dict) > 0: yield dict
467             dict = {}
468             last_key = None
469             continue
470
471         m = re.match(r"^([-\w]+):\s*(.*)$", l)
472         if m:
473             dict[m.group(1)] = m.group(2)
474             last_key = m.group(1)
475         elif len(l) > 0 and l[0].isspace() and last_key is not None:
476             dict[last_key] += l
477         else:
478             last_key = None
479
480     if len(dict) > 0: yield dict
481
482 def parse_full(lines):
483     try:
484         return next(parse(lines))
485     except StopIteration:
486         return {}
487
488 def parse_metadata_version(s):
489     """Convert a string with the snapshot version format to a tuple."""
490
491     m = re.match(r"^(?:Cumulus|LBS) Snapshot v(\d+(\.\d+)*)$", s)
492     if m is None:
493         return ()
494     else:
495         return tuple([int(d) for d in m.group(1).split(".")])
496
497 def read_metadata(object_store, root):
498     """Iterate through all lines in the metadata log, following references."""
499
500     # Stack for keeping track of recursion when following references to
501     # portions of the log.  The last entry in the stack corresponds to the
502     # object currently being parsed.  Each entry is a list of lines which have
503     # been reversed, so that popping successive lines from the end of each list
504     # will return lines of the metadata log in order.
505     stack = []
506
507     def follow_ref(refstr):
508         if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
509         lines = object_store.get(refstr).splitlines(True)
510         lines.reverse()
511         stack.append(lines)
512
513     follow_ref(root)
514
515     while len(stack) > 0:
516         top = stack[-1]
517         if len(top) == 0:
518             stack.pop()
519             continue
520         line = top.pop()
521
522         # An indirect reference which we must follow?
523         if len(line) > 0 and line[0] == '@':
524             ref = line[1:]
525             ref.strip()
526             follow_ref(ref)
527         else:
528             yield line
529
530 class MetadataItem:
531     """Metadata for a single file (or directory or...) from a snapshot."""
532
533     # Functions for parsing various datatypes that can appear in a metadata log
534     # item.
535     @staticmethod
536     def decode_int(s):
537         """Decode an integer, expressed in decimal, octal, or hexadecimal."""
538         if s.startswith("0x"):
539             return int(s, 16)
540         elif s.startswith("0"):
541             return int(s, 8)
542         else:
543             return int(s, 10)
544
545     @staticmethod
546     def decode_str(s):
547         """Decode a URI-encoded (%xx escapes) string."""
548         return uri_decode(s)
549
550     @staticmethod
551     def raw_str(s):
552         """An unecoded string."""
553         return s
554
555     @staticmethod
556     def decode_user(s):
557         """Decode a user/group to a tuple of uid/gid followed by name."""
558         items = s.split()
559         uid = MetadataItem.decode_int(items[0])
560         name = None
561         if len(items) > 1:
562             if items[1].startswith("(") and items[1].endswith(")"):
563                 name = MetadataItem.decode_str(items[1][1:-1])
564         return (uid, name)
565
566     @staticmethod
567     def decode_device(s):
568         """Decode a device major/minor number."""
569         (major, minor) = list(map(MetadataItem.decode_int, s.split("/")))
570         return (major, minor)
571
572     class Items: pass
573
574     def __init__(self, fields, object_store):
575         """Initialize from a dictionary of key/value pairs from metadata log."""
576
577         self.fields = fields
578         self.object_store = object_store
579         self.keys = []
580         self.items = self.Items()
581         for (k, v) in list(fields.items()):
582             if k in self.field_types:
583                 decoder = self.field_types[k]
584                 setattr(self.items, k, decoder(v))
585                 self.keys.append(k)
586
587     def data(self):
588         """Return an iterator for the data blocks that make up a file."""
589
590         # This traverses the list of blocks that make up a file, following
591         # indirect references.  It is implemented in much the same way as
592         # read_metadata, so see that function for details of the technique.
593
594         objects = self.fields['data'].split()
595         objects.reverse()
596         stack = [objects]
597
598         def follow_ref(refstr):
599             if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
600             objects = self.object_store.get(refstr).split()
601             objects.reverse()
602             stack.append(objects)
603
604         while len(stack) > 0:
605             top = stack[-1]
606             if len(top) == 0:
607                 stack.pop()
608                 continue
609             ref = top.pop()
610
611             # An indirect reference which we must follow?
612             if len(ref) > 0 and ref[0] == '@':
613                 follow_ref(ref[1:])
614             else:
615                 yield ref
616
617 # Description of fields that might appear, and how they should be parsed.
618 MetadataItem.field_types = {
619     'name': MetadataItem.decode_str,
620     'type': MetadataItem.raw_str,
621     'mode': MetadataItem.decode_int,
622     'device': MetadataItem.decode_device,
623     'user': MetadataItem.decode_user,
624     'group': MetadataItem.decode_user,
625     'ctime': MetadataItem.decode_int,
626     'mtime': MetadataItem.decode_int,
627     'links': MetadataItem.decode_int,
628     'inode': MetadataItem.raw_str,
629     'checksum': MetadataItem.decode_str,
630     'size': MetadataItem.decode_int,
631     'contents': MetadataItem.decode_str,
632     'target': MetadataItem.decode_str,
633 }
634
635 def iterate_metadata(object_store, root):
636     for d in parse(read_metadata(object_store, root), lambda l: len(l) == 0):
637         yield MetadataItem(d, object_store)
638
639 class LocalDatabase:
640     """Access to the local database of snapshot contents and object checksums.
641
642     The local database is consulted when creating a snapshot to determine what
643     data can be re-used from old snapshots.  Segment cleaning is performed by
644     manipulating the data in the local database; the local database also
645     includes enough data to guide the segment cleaning process.
646     """
647
648     def __init__(self, path, dbname="localdb.sqlite"):
649         self.db_connection = sqlite3.connect(path + "/" + dbname)
650
651     # Low-level database access.  Use these methods when there isn't a
652     # higher-level interface available.  Exception: do, however, remember to
653     # use the commit() method after making changes to make sure they are
654     # actually saved, even when going through higher-level interfaces.
655     def commit(self):
656         "Commit any pending changes to the local database."
657         self.db_connection.commit()
658
659     def rollback(self):
660         "Roll back any pending changes to the local database."
661         self.db_connection.rollback()
662
663     def cursor(self):
664         "Return a DB-API cursor for directly accessing the local database."
665         return self.db_connection.cursor()
666
667     def list_schemes(self):
668         """Return the list of snapshots found in the local database.
669
670         The returned value is a list of tuples (id, scheme, name, time, intent).
671         """
672
673         cur = self.cursor()
674         cur.execute("select distinct scheme from snapshots")
675         schemes = [row[0] for row in cur.fetchall()]
676         schemes.sort()
677         return schemes
678
679     def list_snapshots(self, scheme):
680         """Return a list of snapshots for the given scheme."""
681         cur = self.cursor()
682         cur.execute("select name from snapshots")
683         snapshots = [row[0] for row in cur.fetchall()]
684         snapshots.sort()
685         return snapshots
686
687     def delete_snapshot(self, scheme, name):
688         """Remove the specified snapshot from the database.
689
690         Warning: This does not garbage collect all dependent data in the
691         database, so it must be followed by a call to garbage_collect() to make
692         the database consistent.
693         """
694         cur = self.cursor()
695         cur.execute("delete from snapshots where scheme = ? and name = ?",
696                     (scheme, name))
697
698     def prune_old_snapshots(self, scheme, intent=1.0):
699         """Delete entries from old snapshots from the database.
700
701         Only snapshots with the specified scheme name will be deleted.  If
702         intent is given, it gives the intended next snapshot type, to determine
703         how aggressively to clean (for example, intent=7 could be used if the
704         next snapshot will be a weekly snapshot).
705         """
706
707         cur = self.cursor()
708
709         # Find the id of the last snapshot to be created.  This is used for
710         # measuring time in a way: we record this value in each segment we
711         # expire on this run, and then on a future run can tell if there have
712         # been intervening backups made.
713         cur.execute("select max(snapshotid) from snapshots")
714         last_snapshotid = cur.fetchone()[0]
715
716         # Get the list of old snapshots for this scheme.  Delete all the old
717         # ones.  Rules for what to keep:
718         #   - Always keep the most recent snapshot.
719         #   - If snapshot X is younger than Y, and X has higher intent, then Y
720         #     can be deleted.
721         cur.execute("""select snapshotid, name, intent,
722                               julianday('now') - timestamp as age
723                        from snapshots where scheme = ?
724                        order by age""", (scheme,))
725
726         first = True
727         max_intent = intent
728         for (id, name, snap_intent, snap_age) in cur.fetchall():
729             can_delete = False
730             if snap_intent < max_intent:
731                 # Delete small-intent snapshots if there is a more recent
732                 # large-intent snapshot.
733                 can_delete = True
734             elif snap_intent == intent:
735                 # Delete previous snapshots with the specified intent level.
736                 can_delete = True
737
738             if can_delete and not first:
739                 print("Delete snapshot %d (%s)" % (id, name))
740                 cur.execute("delete from snapshots where snapshotid = ?",
741                             (id,))
742             first = False
743             max_intent = max(max_intent, snap_intent)
744
745         self.garbage_collect()
746
747     def garbage_collect(self):
748         """Garbage-collect unreachable segment and object data.
749
750         Remove all segments and checksums which is not reachable from the
751         current set of snapshots stored in the local database.
752         """
753         cur = self.cursor()
754
755         # Delete entries in the segment_utilization table which are for
756         # non-existent snapshots.
757         cur.execute("""delete from segment_utilization
758                        where snapshotid not in
759                            (select snapshotid from snapshots)""")
760
761         # Delete segments not referenced by any current snapshots.
762         cur.execute("""delete from segments where segmentid not in
763                            (select segmentid from segment_utilization)""")
764
765         # Delete dangling objects in the block_index table.
766         cur.execute("""delete from block_index
767                        where segmentid not in
768                            (select segmentid from segments)""")
769
770         # Remove sub-block signatures for deleted objects.
771         cur.execute("""delete from subblock_signatures
772                        where blockid not in
773                            (select blockid from block_index)""")
774
775     # Segment cleaning.
776     class SegmentInfo(Struct): pass
777
778     def get_segment_cleaning_list(self, age_boost=0.0):
779         """Return a list of all current segments with information for cleaning.
780
781         Return all segments which are currently known in the local database
782         (there might be other, older segments in the archive itself), and
783         return usage statistics for each to help decide which segments to
784         clean.
785
786         The returned list will be sorted by estimated cleaning benefit, with
787         segments that are best to clean at the start of the list.
788
789         If specified, the age_boost parameter (measured in days) will added to
790         the age of each segment, as a way of adjusting the benefit computation
791         before a long-lived snapshot is taken (for example, age_boost might be
792         set to 7 when cleaning prior to taking a weekly snapshot).
793         """
794
795         cur = self.cursor()
796         segments = []
797         cur.execute("""select segmentid, used, size, mtime,
798                        julianday('now') - mtime as age from segment_info
799                        where expire_time is null""")
800         for row in cur:
801             info = self.SegmentInfo()
802             info.id = row[0]
803             info.used_bytes = row[1]
804             info.size_bytes = row[2]
805             info.mtime = row[3]
806             info.age_days = row[4]
807
808             # If data is not available for whatever reason, treat it as 0.0.
809             if info.age_days is None:
810                 info.age_days = 0.0
811             if info.used_bytes is None:
812                 info.used_bytes = 0.0
813
814             # Benefit calculation: u is the estimated fraction of each segment
815             # which is utilized (bytes belonging to objects still in use
816             # divided by total size; this doesn't take compression or storage
817             # overhead into account, but should give a reasonable estimate).
818             #
819             # The total benefit is a heuristic that combines several factors:
820             # the amount of space that can be reclaimed (1 - u), an ageing
821             # factor (info.age_days) that favors cleaning old segments to young
822             # ones and also is more likely to clean segments that will be
823             # rewritten for long-lived snapshots (age_boost), and finally a
824             # penalty factor for the cost of re-uploading data (u + 0.1).
825             u = info.used_bytes / info.size_bytes
826             info.cleaning_benefit \
827                 = (1 - u) * (info.age_days + age_boost) / (u + 0.1)
828
829             segments.append(info)
830
831         segments.sort(cmp, key=lambda s: s.cleaning_benefit, reverse=True)
832         return segments
833
834     def mark_segment_expired(self, segment):
835         """Mark a segment for cleaning in the local database.
836
837         The segment parameter should be either a SegmentInfo object or an
838         integer segment id.  Objects in the given segment will be marked as
839         expired, which means that any future snapshots that would re-use those
840         objects will instead write out a new copy of the object, and thus no
841         future snapshots will depend upon the given segment.
842         """
843
844         if isinstance(segment, int):
845             id = segment
846         elif isinstance(segment, self.SegmentInfo):
847             id = segment.id
848         else:
849             raise TypeError("Invalid segment: %s, must be of type int or SegmentInfo, not %s" % (segment, type(segment)))
850
851         cur = self.cursor()
852         cur.execute("select max(snapshotid) from snapshots")
853         last_snapshotid = cur.fetchone()[0]
854         cur.execute("update segments set expire_time = ? where segmentid = ?",
855                     (last_snapshotid, id))
856         cur.execute("update block_index set expired = 0 where segmentid = ?",
857                     (id,))
858
859     def balance_expired_objects(self):
860         """Analyze expired objects in segments to be cleaned and group by age.
861
862         Update the block_index table of the local database to group expired
863         objects by age.  The exact number of buckets and the cutoffs for each
864         are dynamically determined.  Calling this function after marking
865         segments expired will help in the segment cleaning process, by ensuring
866         that when active objects from clean segments are rewritten, they will
867         be placed into new segments roughly grouped by age.
868         """
869
870         # The expired column of the block_index table is used when generating a
871         # new Cumulus snapshot.  A null value indicates that an object may be
872         # re-used.  Otherwise, an object must be written into a new segment if
873         # needed.  Objects with distinct expired values will be written into
874         # distinct segments, to allow for some grouping by age.  The value 0 is
875         # somewhat special in that it indicates any rewritten objects can be
876         # placed in the same segment as completely new objects; this can be
877         # used for very young objects which have been expired, or objects not
878         # expected to be encountered.
879         #
880         # In the balancing process, all objects which are not used in any
881         # current snapshots will have expired set to 0.  Objects which have
882         # been seen will be sorted by age and will have expired values set to
883         # 0, 1, 2, and so on based on age (with younger objects being assigned
884         # lower values).  The number of buckets and the age cutoffs is
885         # determined by looking at the distribution of block ages.
886
887         cur = self.cursor()
888
889         # Mark all expired objects with expired = 0; these objects will later
890         # have values set to indicate groupings of objects when repacking.
891         cur.execute("""update block_index set expired = 0
892                        where expired is not null""")
893
894         # We will want to aim for at least one full segment for each bucket
895         # that we eventually create, but don't know how many bytes that should
896         # be due to compression.  So compute the average number of bytes in
897         # each expired segment as a rough estimate for the minimum size of each
898         # bucket.  (This estimate could be thrown off by many not-fully-packed
899         # segments, but for now don't worry too much about that.)  If we can't
900         # compute an average, it's probably because there are no expired
901         # segments, so we have no more work to do.
902         cur.execute("""select avg(size) from segments
903                        where segmentid in
904                            (select distinct segmentid from block_index
905                             where expired is not null)""")
906         segment_size_estimate = cur.fetchone()[0]
907         if not segment_size_estimate:
908             return
909
910         # Next, extract distribution of expired objects (number and size) by
911         # age.  Save the timestamp for "now" so that the classification of
912         # blocks into age buckets will not change later in the function, after
913         # time has passed.  Set any timestamps in the future to now, so we are
914         # guaranteed that for the rest of this function, age is always
915         # non-negative.
916         cur.execute("select julianday('now')")
917         now = cur.fetchone()[0]
918
919         cur.execute("""update block_index set timestamp = ?
920                        where timestamp > ? and expired is not null""",
921                     (now, now))
922
923         cur.execute("""select round(? - timestamp) as age, count(*), sum(size)
924                        from block_index where expired = 0
925                        group by age order by age""", (now,))
926         distribution = cur.fetchall()
927
928         # Start to determine the buckets for expired objects.  Heuristics used:
929         #   - An upper bound on the number of buckets is given by the number of
930         #     segments we estimate it will take to store all data.  In fact,
931         #     aim for a couple of segments per bucket.
932         #   - Place very young objects in bucket 0 (place with new objects)
933         #     unless there are enough of them to warrant a separate bucket.
934         #   - Try not to create unnecessarily many buckets, since fewer buckets
935         #     will allow repacked data to be grouped based on spatial locality
936         #     (while more buckets will group by temporal locality).  We want a
937         #     balance.
938         MIN_AGE = 4
939         total_bytes = sum([i[2] for i in distribution])
940         target_buckets = 2 * (total_bytes / segment_size_estimate) ** 0.4
941         min_size = 1.5 * segment_size_estimate
942         target_size = max(2 * segment_size_estimate,
943                           total_bytes / target_buckets)
944
945         print("segment_size:", segment_size_estimate)
946         print("distribution:", distribution)
947         print("total_bytes:", total_bytes)
948         print("target_buckets:", target_buckets)
949         print("min, target size:", min_size, target_size)
950
951         # Chosen cutoffs.  Each bucket consists of objects with age greater
952         # than one cutoff value, but not greater than the next largest cutoff.
953         cutoffs = []
954
955         # Starting with the oldest objects, begin grouping together into
956         # buckets of size at least target_size bytes.
957         distribution.reverse()
958         bucket_size = 0
959         min_age_bucket = False
960         for (age, items, size) in distribution:
961             if bucket_size >= target_size \
962                 or (age < MIN_AGE and not min_age_bucket):
963                 if bucket_size < target_size and len(cutoffs) > 0:
964                     cutoffs.pop()
965                 cutoffs.append(age)
966                 bucket_size = 0
967
968             bucket_size += size
969             if age < MIN_AGE:
970                 min_age_bucket = True
971
972         # The last (youngest) bucket will be group 0, unless it has enough data
973         # to be of size min_size by itself, or there happen to be no objects
974         # less than MIN_AGE at all.
975         if bucket_size >= min_size or not min_age_bucket:
976             cutoffs.append(-1)
977         cutoffs.append(-1)
978
979         print("cutoffs:", cutoffs)
980
981         # Update the database to assign each object to the appropriate bucket.
982         cutoffs.reverse()
983         for i in range(len(cutoffs)):
984             cur.execute("""update block_index set expired = ?
985                            where round(? - timestamp) > ?
986                              and expired is not null""",
987                         (i, now, cutoffs[i]))