Reworking cleanup.
[cumulus.git] / python / cumulus / __init__.py
1 # Cumulus: Efficient Filesystem Backup to the Cloud
2 # Copyright (C) 2008-2009, 2012 The Cumulus Developers
3 # See the AUTHORS file for a list of contributors.
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License along
16 # with this program; if not, write to the Free Software Foundation, Inc.,
17 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18
19 """High-level interface for working with Cumulus archives.
20
21 This module provides an easy interface for reading from and manipulating
22 various parts of a Cumulus archive:
23   - listing the snapshots and segments present
24   - reading segment contents
25   - parsing snapshot descriptors and snapshot metadata logs
26   - reading and maintaining the local object database
27 """
28
29 from __future__ import division
30 import hashlib
31 import itertools
32 import os
33 import re
34 import tarfile
35 import tempfile
36 import thread
37 from pysqlite2 import dbapi2 as sqlite3
38
39 import cumulus.store
40 import cumulus.store.file
41
42 # The largest supported snapshot format that can be understood.
43 FORMAT_VERSION = (0, 11)        # Cumulus Snapshot v0.11
44
45 # Maximum number of nested indirect references allowed in a snapshot.
46 MAX_RECURSION_DEPTH = 3
47
48 # All segments which have been accessed this session.
49 accessed_segments = set()
50
51 # Table of methods used to filter segments before storage, and corresponding
52 # filename extensions.  These are listed in priority order (methods earlier in
53 # the list are tried first).
54 SEGMENT_FILTERS = [
55     (".gpg", "cumulus-filter-gpg --decrypt"),
56     (".gz", "gzip -dc"),
57     (".bz2", "bzip2 -dc"),
58 ]
59
60 def uri_decode(s):
61     """Decode a URI-encoded (%xx escapes) string."""
62     def hex_decode(m): return chr(int(m.group(1), 16))
63     return re.sub(r"%([0-9a-f]{2})", hex_decode, s)
64 def uri_encode(s):
65     """Encode a string to URI-encoded (%xx escapes) form."""
66     def hex_encode(c):
67         if c > '+' and c < '\x7f' and c != '@':
68             return c
69         else:
70             return "%%%02x" % (ord(c),)
71     return ''.join(hex_encode(c) for c in s)
72
73 class Struct:
74     """A class which merely acts as a data container.
75
76     Instances of this class (or its subclasses) are merely used to store data
77     in various attributes.  No methods are provided.
78     """
79
80     def __repr__(self):
81         return "<%s %s>" % (self.__class__, self.__dict__)
82
83 CHECKSUM_ALGORITHMS = {
84     'sha1': hashlib.sha1,
85     'sha224': hashlib.sha224,
86     'sha256': hashlib.sha256,
87 }
88
89 class ChecksumCreator:
90     """Compute a Cumulus checksum for provided data.
91
92     The algorithm used is selectable, but currently defaults to sha1.
93     """
94
95     def __init__(self, algorithm='sha1'):
96         self.algorithm = algorithm
97         self.hash = CHECKSUM_ALGORITHMS[algorithm]()
98
99     def update(self, data):
100         self.hash.update(data)
101         return self
102
103     def compute(self):
104         return "%s=%s" % (self.algorithm, self.hash.hexdigest())
105
106 class ChecksumVerifier:
107     """Verify whether a checksum from a snapshot matches the supplied data."""
108
109     def __init__(self, checksumstr):
110         """Create an object to check the supplied checksum."""
111
112         (algo, checksum) = checksumstr.split("=", 1)
113         self.checksum = checksum
114         self.hash = CHECKSUM_ALGORITHMS[algo]()
115
116     def update(self, data):
117         self.hash.update(data)
118
119     def valid(self):
120         """Return a boolean indicating whether the checksum matches."""
121
122         result = self.hash.hexdigest()
123         return result == self.checksum
124
125 class SearchPathEntry(object):
126     """Item representing a possible search location for Cumulus files.
127
128     Some Cumulus files might be stored in multiple possible file locations: due
129     to format (different compression mechanisms with different extensions),
130     locality (different segments might be placed in different directories to
131     control archiving policies), for backwards compatibility (default location
132     changed over time).  A SearchPathEntry describes a possible location for a
133     file.
134     """
135     def __init__(self, directory_prefix, suffix, context=None):
136         self._directory_prefix = directory_prefix
137         self._suffix = suffix
138         self._context = context
139
140     def __repr__(self):
141         return "%s(%r, %r, %r)" % (self.__class__.__name__,
142                                    self._directory_prefix, self._suffix,
143                                    self._context)
144
145     def build_path(self, basename):
146         """Construct the search path to use for a file with name basename.
147
148         Returns a tuple (pathname, context), where pathname is the path to try
149         and context is any additional data associated with this search entry
150         (if any).
151         """
152         return (os.path.join(self._directory_prefix, basename + self._suffix),
153                 self._context)
154
155 class SearchPath(object):
156     """A collection of locations to search for files and lookup utilities.
157
158     For looking for a file in a Cumulus storage backend, a SearchPath object
159     contains a list of possible locations to try.  A SearchPath can be used to
160     perform the search as well; when a file is found the search path ordering
161     is updated (moving the successful SearchPathEntry to the front of the list
162     for future searches).
163     """
164     def __init__(self, name_regex, searchpath):
165         self._regex = re.compile(name_regex)
166         self._path = list(searchpath)
167
168     def add_search_entry(self, entry):
169         self._path.append(entry)
170
171     def directories(self):
172         """Return the set of directories to search for a file type."""
173         return set(entry._directory_prefix for entry in self._path)
174
175     def get(self, backend, basename):
176         for (i, entry) in enumerate(self._path):
177             try:
178                 (pathname, context) = entry.build_path(basename)
179                 fp = backend.get(pathname)
180                 # On success, move this entry to the front of the search path
181                 # to speed future searches.
182                 if i > 0:
183                     self._path.pop(i)
184                     self._path.insert(0, entry)
185                 return (fp, pathname, context)
186             except cumulus.store.NotFoundError:
187                 continue
188         raise cumulus.store.NotFoundError(basename)
189
190     def stat(self, backend, basename):
191         for (i, entry) in enumerate(self._path):
192             try:
193                 (pathname, context) = entry.build_path(basename)
194                 stat_data = backend.stat(pathname)
195                 # On success, move this entry to the front of the search path
196                 # to speed future searches.
197                 if i > 0:
198                     self._path.pop(i)
199                     self._path.insert(0, entry)
200                 result = {"path": pathname}
201                 result.update(stat_data)
202                 return result
203             except cumulus.store.NotFoundError:
204                 continue
205         raise cumulus.store.NotFoundError(basename)
206
207     def list(self, backend):
208         success = False
209         for d in self.directories():
210             try:
211                 for f in backend.list(d):
212                     success = True
213                     m = self._regex.match(f)
214                     if m: yield (os.path.join(d, f), m)
215             except cumulus.store.NotFoundError:
216                 pass
217         if not success:
218             raise cumulus.store.NotFoundError(basename)
219
220 def _build_segments_searchpath(prefix):
221     for (extension, filter) in SEGMENT_FILTERS:
222         yield SearchPathEntry(prefix, extension, filter)
223
224 SEARCH_PATHS = {
225     "checksums": SearchPath(
226         r"^snapshot-(.*)\.(\w+)sums$",
227         [SearchPathEntry("meta", ".sha1sums"),
228          SearchPathEntry("checksums", ".sha1sums"),
229          SearchPathEntry("", ".sha1sums")]),
230     "segments": SearchPath(
231         (r"^([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})"
232          r"(\.\S+)?$"),
233         itertools.chain(
234             _build_segments_searchpath("segments0"),
235             _build_segments_searchpath("segments1"),
236             _build_segments_searchpath(""),
237             _build_segments_searchpath("segments"))),
238     "snapshots": SearchPath(
239         r"^snapshot-(.*)\.(cumulus|lbs)$",
240         [SearchPathEntry("snapshots", ".cumulus"),
241          SearchPathEntry("snapshots", ".lbs"),
242          SearchPathEntry("", ".cumulus"),
243          SearchPathEntry("", ".lbs")]),
244 }
245
246 class BackendWrapper(object):
247     """Wrapper around a Cumulus storage backend that understands file types.
248
249     The BackendWrapper class understands different Cumulus file types, such as
250     snapshots and segments, and implements higher-level operations such as
251     "retrieve a snapshot with a specific name" (hiding operations such as
252     searching for the correct file name).
253     """
254
255     def __init__(self, backend):
256         """Initializes a wrapper around the specified storage backend.
257
258         store may either be a Store object or URL.
259         """
260         if type(backend) in (str, unicode):
261             if backend.find(":") >= 0:
262                 self._backend = cumulus.store.open(backend)
263             else:
264                 self._backend = cumulus.store.file.FileStore(backend)
265         else:
266             self._backend = backend
267
268     @property
269     def raw_backend(self):
270         return self._backend
271
272     def stat_generic(self, basename, filetype):
273         return SEARCH_PATHS[filetype].stat(self._backend, basename)
274
275     def open_generic(self, basename, filetype):
276         return SEARCH_PATHS[filetype].get(self._backend, basename)
277
278     def open_snapshot(self, name):
279         return self.open_generic("snapshot-" + name, "snapshots")
280
281     def open_segment(self, name):
282         return self.open_generic(name + ".tar", "segments")
283
284     def list_generic(self, filetype):
285         return ((x[1].group(1), x[0])
286                 for x in SEARCH_PATHS[filetype].list(self._backend))
287
288 class CumulusStore:
289     def __init__(self, backend):
290         if isinstance(backend, BackendWrapper):
291             self.backend = backend
292         else:
293             self.backend = BackendWrapper(backend)
294         self.cachedir = None
295         self.CACHE_SIZE = 16
296         self._lru_list = []
297
298     def get_cachedir(self):
299         if self.cachedir is None:
300             self.cachedir = tempfile.mkdtemp("-cumulus")
301         return self.cachedir
302
303     def cleanup(self):
304         if self.cachedir is not None:
305             # TODO: Avoid use of system, make this safer
306             os.system("rm -rf " + self.cachedir)
307         self.cachedir = None
308
309     @staticmethod
310     def parse_ref(refstr):
311         m = re.match(r"^zero\[(\d+)\]$", refstr)
312         if m:
313             return ("zero", None, None, (0, int(m.group(1)), False))
314
315         m = re.match(r"^([-0-9a-f]+)\/([0-9a-f]+)(\(\S+\))?(\[(((\d+)\+)?(\d+)|=(\d+))\])?$", refstr)
316         if not m: return
317
318         segment = m.group(1)
319         object = m.group(2)
320         checksum = m.group(3)
321         slice = m.group(4)
322
323         if checksum is not None:
324             checksum = checksum.lstrip("(").rstrip(")")
325
326         if slice is not None:
327             if m.group(9) is not None:
328                 # Size-assertion slice
329                 slice = (0, int(m.group(9)), True)
330             elif m.group(6) is None:
331                 # Abbreviated slice
332                 slice = (0, int(m.group(8)), False)
333             else:
334                 slice = (int(m.group(7)), int(m.group(8)), False)
335
336         return (segment, object, checksum, slice)
337
338     def list_snapshots(self):
339         return set(x[0] for x in self.backend.list_generic("snapshots"))
340
341     def list_segments(self):
342         return set(x[0] for x in self.backend.list_generic("segments"))
343
344     def load_snapshot(self, snapshot):
345         snapshot_file = self.backend.open_snapshot(snapshot)[0]
346         return snapshot_file.read().splitlines(True)
347
348     def get_segment(self, segment):
349         accessed_segments.add(segment)
350
351         (segment_fp, path, filter_cmd) = self.backend.open_segment(segment)
352         if filter_cmd is None:
353             return segment_fp
354         (input, output) = os.popen2(filter_cmd)
355         def copy_thread(src, dst):
356             BLOCK_SIZE = 4096
357             while True:
358                 block = src.read(BLOCK_SIZE)
359                 if len(block) == 0: break
360                 dst.write(block)
361             src.close()
362             dst.close()
363         thread.start_new_thread(copy_thread, (segment_fp, input))
364         return output
365
366     def load_segment(self, segment):
367         seg = tarfile.open(segment, 'r|', self.get_segment(segment))
368         for item in seg:
369             data_obj = seg.extractfile(item)
370             path = item.name.split('/')
371             if len(path) == 2 and path[0] == segment:
372                 yield (path[1], data_obj.read())
373
374     def extract_segment(self, segment):
375         segdir = os.path.join(self.get_cachedir(), segment)
376         os.mkdir(segdir)
377         for (object, data) in self.load_segment(segment):
378             f = open(os.path.join(segdir, object), 'wb')
379             f.write(data)
380             f.close()
381
382     def load_object(self, segment, object):
383         accessed_segments.add(segment)
384         path = os.path.join(self.get_cachedir(), segment, object)
385         if not os.access(path, os.R_OK):
386             self.extract_segment(segment)
387         if segment in self._lru_list: self._lru_list.remove(segment)
388         self._lru_list.append(segment)
389         while len(self._lru_list) > self.CACHE_SIZE:
390             os.system("rm -rf " + os.path.join(self.cachedir,
391                                                self._lru_list[0]))
392             self._lru_list = self._lru_list[1:]
393         return open(path, 'rb').read()
394
395     def get(self, refstr):
396         """Fetch the given object and return it.
397
398         The input should be an object reference, in string form.
399         """
400
401         (segment, object, checksum, slice) = self.parse_ref(refstr)
402
403         if segment == "zero":
404             return "\0" * slice[1]
405
406         data = self.load_object(segment, object)
407
408         if checksum is not None:
409             verifier = ChecksumVerifier(checksum)
410             verifier.update(data)
411             if not verifier.valid():
412                 raise ValueError
413
414         if slice is not None:
415             (start, length, exact) = slice
416             if exact and len(data) != length: raise ValueError
417             data = data[start:start+length]
418             if len(data) != length: raise IndexError
419
420         return data
421
422 def parse(lines, terminate=None):
423     """Generic parser for RFC822-style "Key: Value" data streams.
424
425     This parser can be used to read metadata logs and snapshot root descriptor
426     files.
427
428     lines must be an iterable object which yields a sequence of lines of input.
429
430     If terminate is specified, it is used as a predicate to determine when to
431     stop reading input lines.
432     """
433
434     dict = {}
435     last_key = None
436
437     for l in lines:
438         # Strip off a trailing newline, if present
439         if len(l) > 0 and l[-1] == "\n":
440             l = l[:-1]
441
442         if terminate is not None and terminate(l):
443             if len(dict) > 0: yield dict
444             dict = {}
445             last_key = None
446             continue
447
448         m = re.match(r"^([-\w]+):\s*(.*)$", l)
449         if m:
450             dict[m.group(1)] = m.group(2)
451             last_key = m.group(1)
452         elif len(l) > 0 and l[0].isspace() and last_key is not None:
453             dict[last_key] += l
454         else:
455             last_key = None
456
457     if len(dict) > 0: yield dict
458
459 def parse_full(lines):
460     try:
461         return parse(lines).next()
462     except StopIteration:
463         return {}
464
465 def parse_metadata_version(s):
466     """Convert a string with the snapshot version format to a tuple."""
467
468     m = re.match(r"^(?:Cumulus|LBS) Snapshot v(\d+(\.\d+)*)$", s)
469     if m is None:
470         return ()
471     else:
472         return tuple([int(d) for d in m.group(1).split(".")])
473
474 def read_metadata(object_store, root):
475     """Iterate through all lines in the metadata log, following references."""
476
477     # Stack for keeping track of recursion when following references to
478     # portions of the log.  The last entry in the stack corresponds to the
479     # object currently being parsed.  Each entry is a list of lines which have
480     # been reversed, so that popping successive lines from the end of each list
481     # will return lines of the metadata log in order.
482     stack = []
483
484     def follow_ref(refstr):
485         if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
486         lines = object_store.get(refstr).splitlines(True)
487         lines.reverse()
488         stack.append(lines)
489
490     follow_ref(root)
491
492     while len(stack) > 0:
493         top = stack[-1]
494         if len(top) == 0:
495             stack.pop()
496             continue
497         line = top.pop()
498
499         # An indirect reference which we must follow?
500         if len(line) > 0 and line[0] == '@':
501             ref = line[1:]
502             ref.strip()
503             follow_ref(ref)
504         else:
505             yield line
506
507 class MetadataItem:
508     """Metadata for a single file (or directory or...) from a snapshot."""
509
510     # Functions for parsing various datatypes that can appear in a metadata log
511     # item.
512     @staticmethod
513     def decode_int(s):
514         """Decode an integer, expressed in decimal, octal, or hexadecimal."""
515         if s.startswith("0x"):
516             return int(s, 16)
517         elif s.startswith("0"):
518             return int(s, 8)
519         else:
520             return int(s, 10)
521
522     @staticmethod
523     def decode_str(s):
524         """Decode a URI-encoded (%xx escapes) string."""
525         return uri_decode(s)
526
527     @staticmethod
528     def raw_str(s):
529         """An unecoded string."""
530         return s
531
532     @staticmethod
533     def decode_user(s):
534         """Decode a user/group to a tuple of uid/gid followed by name."""
535         items = s.split()
536         uid = MetadataItem.decode_int(items[0])
537         name = None
538         if len(items) > 1:
539             if items[1].startswith("(") and items[1].endswith(")"):
540                 name = MetadataItem.decode_str(items[1][1:-1])
541         return (uid, name)
542
543     @staticmethod
544     def decode_device(s):
545         """Decode a device major/minor number."""
546         (major, minor) = map(MetadataItem.decode_int, s.split("/"))
547         return (major, minor)
548
549     class Items: pass
550
551     def __init__(self, fields, object_store):
552         """Initialize from a dictionary of key/value pairs from metadata log."""
553
554         self.fields = fields
555         self.object_store = object_store
556         self.keys = []
557         self.items = self.Items()
558         for (k, v) in fields.items():
559             if k in self.field_types:
560                 decoder = self.field_types[k]
561                 setattr(self.items, k, decoder(v))
562                 self.keys.append(k)
563
564     def data(self):
565         """Return an iterator for the data blocks that make up a file."""
566
567         # This traverses the list of blocks that make up a file, following
568         # indirect references.  It is implemented in much the same way as
569         # read_metadata, so see that function for details of the technique.
570
571         objects = self.fields['data'].split()
572         objects.reverse()
573         stack = [objects]
574
575         def follow_ref(refstr):
576             if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
577             objects = self.object_store.get(refstr).split()
578             objects.reverse()
579             stack.append(objects)
580
581         while len(stack) > 0:
582             top = stack[-1]
583             if len(top) == 0:
584                 stack.pop()
585                 continue
586             ref = top.pop()
587
588             # An indirect reference which we must follow?
589             if len(ref) > 0 and ref[0] == '@':
590                 follow_ref(ref[1:])
591             else:
592                 yield ref
593
594 # Description of fields that might appear, and how they should be parsed.
595 MetadataItem.field_types = {
596     'name': MetadataItem.decode_str,
597     'type': MetadataItem.raw_str,
598     'mode': MetadataItem.decode_int,
599     'device': MetadataItem.decode_device,
600     'user': MetadataItem.decode_user,
601     'group': MetadataItem.decode_user,
602     'ctime': MetadataItem.decode_int,
603     'mtime': MetadataItem.decode_int,
604     'links': MetadataItem.decode_int,
605     'inode': MetadataItem.raw_str,
606     'checksum': MetadataItem.decode_str,
607     'size': MetadataItem.decode_int,
608     'contents': MetadataItem.decode_str,
609     'target': MetadataItem.decode_str,
610 }
611
612 def iterate_metadata(object_store, root):
613     for d in parse(read_metadata(object_store, root), lambda l: len(l) == 0):
614         yield MetadataItem(d, object_store)
615
616 class LocalDatabase:
617     """Access to the local database of snapshot contents and object checksums.
618
619     The local database is consulted when creating a snapshot to determine what
620     data can be re-used from old snapshots.  Segment cleaning is performed by
621     manipulating the data in the local database; the local database also
622     includes enough data to guide the segment cleaning process.
623     """
624
625     def __init__(self, path, dbname="localdb.sqlite"):
626         self.db_connection = sqlite3.connect(path + "/" + dbname)
627
628     # Low-level database access.  Use these methods when there isn't a
629     # higher-level interface available.  Exception: do, however, remember to
630     # use the commit() method after making changes to make sure they are
631     # actually saved, even when going through higher-level interfaces.
632     def commit(self):
633         "Commit any pending changes to the local database."
634         self.db_connection.commit()
635
636     def rollback(self):
637         "Roll back any pending changes to the local database."
638         self.db_connection.rollback()
639
640     def cursor(self):
641         "Return a DB-API cursor for directly accessing the local database."
642         return self.db_connection.cursor()
643
644     def list_schemes(self):
645         """Return the list of snapshots found in the local database.
646
647         The returned value is a list of tuples (id, scheme, name, time, intent).
648         """
649
650         cur = self.cursor()
651         cur.execute("select distinct scheme from snapshots")
652         schemes = [row[0] for row in cur.fetchall()]
653         schemes.sort()
654         return schemes
655
656     def list_snapshots(self, scheme):
657         """Return a list of snapshots for the given scheme."""
658         cur = self.cursor()
659         cur.execute("select name from snapshots")
660         snapshots = [row[0] for row in cur.fetchall()]
661         snapshots.sort()
662         return snapshots
663
664     def delete_snapshot(self, scheme, name):
665         """Remove the specified snapshot from the database.
666
667         Warning: This does not garbage collect all dependent data in the
668         database, so it must be followed by a call to garbage_collect() to make
669         the database consistent.
670         """
671         cur = self.cursor()
672         cur.execute("delete from snapshots where scheme = ? and name = ?",
673                     (scheme, name))
674
675     def prune_old_snapshots(self, scheme, intent=1.0):
676         """Delete entries from old snapshots from the database.
677
678         Only snapshots with the specified scheme name will be deleted.  If
679         intent is given, it gives the intended next snapshot type, to determine
680         how aggressively to clean (for example, intent=7 could be used if the
681         next snapshot will be a weekly snapshot).
682         """
683
684         cur = self.cursor()
685
686         # Find the id of the last snapshot to be created.  This is used for
687         # measuring time in a way: we record this value in each segment we
688         # expire on this run, and then on a future run can tell if there have
689         # been intervening backups made.
690         cur.execute("select max(snapshotid) from snapshots")
691         last_snapshotid = cur.fetchone()[0]
692
693         # Get the list of old snapshots for this scheme.  Delete all the old
694         # ones.  Rules for what to keep:
695         #   - Always keep the most recent snapshot.
696         #   - If snapshot X is younger than Y, and X has higher intent, then Y
697         #     can be deleted.
698         cur.execute("""select snapshotid, name, intent,
699                               julianday('now') - timestamp as age
700                        from snapshots where scheme = ?
701                        order by age""", (scheme,))
702
703         first = True
704         max_intent = intent
705         for (id, name, snap_intent, snap_age) in cur.fetchall():
706             can_delete = False
707             if snap_intent < max_intent:
708                 # Delete small-intent snapshots if there is a more recent
709                 # large-intent snapshot.
710                 can_delete = True
711             elif snap_intent == intent:
712                 # Delete previous snapshots with the specified intent level.
713                 can_delete = True
714
715             if can_delete and not first:
716                 print "Delete snapshot %d (%s)" % (id, name)
717                 cur.execute("delete from snapshots where snapshotid = ?",
718                             (id,))
719             first = False
720             max_intent = max(max_intent, snap_intent)
721
722         self.garbage_collect()
723
724     def garbage_collect(self):
725         """Garbage-collect unreachable segment and object data.
726
727         Remove all segments and checksums which is not reachable from the
728         current set of snapshots stored in the local database.
729         """
730         cur = self.cursor()
731
732         # Delete entries in the segment_utilization table which are for
733         # non-existent snapshots.
734         cur.execute("""delete from segment_utilization
735                        where snapshotid not in
736                            (select snapshotid from snapshots)""")
737
738         # Delete segments not referenced by any current snapshots.
739         cur.execute("""delete from segments where segmentid not in
740                            (select segmentid from segment_utilization)""")
741
742         # Delete dangling objects in the block_index table.
743         cur.execute("""delete from block_index
744                        where segmentid not in
745                            (select segmentid from segments)""")
746
747         # Remove sub-block signatures for deleted objects.
748         cur.execute("""delete from subblock_signatures
749                        where blockid not in
750                            (select blockid from block_index)""")
751
752     # Segment cleaning.
753     class SegmentInfo(Struct): pass
754
755     def get_segment_cleaning_list(self, age_boost=0.0):
756         """Return a list of all current segments with information for cleaning.
757
758         Return all segments which are currently known in the local database
759         (there might be other, older segments in the archive itself), and
760         return usage statistics for each to help decide which segments to
761         clean.
762
763         The returned list will be sorted by estimated cleaning benefit, with
764         segments that are best to clean at the start of the list.
765
766         If specified, the age_boost parameter (measured in days) will added to
767         the age of each segment, as a way of adjusting the benefit computation
768         before a long-lived snapshot is taken (for example, age_boost might be
769         set to 7 when cleaning prior to taking a weekly snapshot).
770         """
771
772         cur = self.cursor()
773         segments = []
774         cur.execute("""select segmentid, used, size, mtime,
775                        julianday('now') - mtime as age from segment_info
776                        where expire_time is null""")
777         for row in cur:
778             info = self.SegmentInfo()
779             info.id = row[0]
780             info.used_bytes = row[1]
781             info.size_bytes = row[2]
782             info.mtime = row[3]
783             info.age_days = row[4]
784
785             # If data is not available for whatever reason, treat it as 0.0.
786             if info.age_days is None:
787                 info.age_days = 0.0
788             if info.used_bytes is None:
789                 info.used_bytes = 0.0
790
791             # Benefit calculation: u is the estimated fraction of each segment
792             # which is utilized (bytes belonging to objects still in use
793             # divided by total size; this doesn't take compression or storage
794             # overhead into account, but should give a reasonable estimate).
795             #
796             # The total benefit is a heuristic that combines several factors:
797             # the amount of space that can be reclaimed (1 - u), an ageing
798             # factor (info.age_days) that favors cleaning old segments to young
799             # ones and also is more likely to clean segments that will be
800             # rewritten for long-lived snapshots (age_boost), and finally a
801             # penalty factor for the cost of re-uploading data (u + 0.1).
802             u = info.used_bytes / info.size_bytes
803             info.cleaning_benefit \
804                 = (1 - u) * (info.age_days + age_boost) / (u + 0.1)
805
806             segments.append(info)
807
808         segments.sort(cmp, key=lambda s: s.cleaning_benefit, reverse=True)
809         return segments
810
811     def mark_segment_expired(self, segment):
812         """Mark a segment for cleaning in the local database.
813
814         The segment parameter should be either a SegmentInfo object or an
815         integer segment id.  Objects in the given segment will be marked as
816         expired, which means that any future snapshots that would re-use those
817         objects will instead write out a new copy of the object, and thus no
818         future snapshots will depend upon the given segment.
819         """
820
821         if isinstance(segment, int):
822             id = segment
823         elif isinstance(segment, self.SegmentInfo):
824             id = segment.id
825         else:
826             raise TypeError("Invalid segment: %s, must be of type int or SegmentInfo, not %s" % (segment, type(segment)))
827
828         cur = self.cursor()
829         cur.execute("select max(snapshotid) from snapshots")
830         last_snapshotid = cur.fetchone()[0]
831         cur.execute("update segments set expire_time = ? where segmentid = ?",
832                     (last_snapshotid, id))
833         cur.execute("update block_index set expired = 0 where segmentid = ?",
834                     (id,))
835
836     def balance_expired_objects(self):
837         """Analyze expired objects in segments to be cleaned and group by age.
838
839         Update the block_index table of the local database to group expired
840         objects by age.  The exact number of buckets and the cutoffs for each
841         are dynamically determined.  Calling this function after marking
842         segments expired will help in the segment cleaning process, by ensuring
843         that when active objects from clean segments are rewritten, they will
844         be placed into new segments roughly grouped by age.
845         """
846
847         # The expired column of the block_index table is used when generating a
848         # new Cumulus snapshot.  A null value indicates that an object may be
849         # re-used.  Otherwise, an object must be written into a new segment if
850         # needed.  Objects with distinct expired values will be written into
851         # distinct segments, to allow for some grouping by age.  The value 0 is
852         # somewhat special in that it indicates any rewritten objects can be
853         # placed in the same segment as completely new objects; this can be
854         # used for very young objects which have been expired, or objects not
855         # expected to be encountered.
856         #
857         # In the balancing process, all objects which are not used in any
858         # current snapshots will have expired set to 0.  Objects which have
859         # been seen will be sorted by age and will have expired values set to
860         # 0, 1, 2, and so on based on age (with younger objects being assigned
861         # lower values).  The number of buckets and the age cutoffs is
862         # determined by looking at the distribution of block ages.
863
864         cur = self.cursor()
865
866         # Mark all expired objects with expired = 0; these objects will later
867         # have values set to indicate groupings of objects when repacking.
868         cur.execute("""update block_index set expired = 0
869                        where expired is not null""")
870
871         # We will want to aim for at least one full segment for each bucket
872         # that we eventually create, but don't know how many bytes that should
873         # be due to compression.  So compute the average number of bytes in
874         # each expired segment as a rough estimate for the minimum size of each
875         # bucket.  (This estimate could be thrown off by many not-fully-packed
876         # segments, but for now don't worry too much about that.)  If we can't
877         # compute an average, it's probably because there are no expired
878         # segments, so we have no more work to do.
879         cur.execute("""select avg(size) from segments
880                        where segmentid in
881                            (select distinct segmentid from block_index
882                             where expired is not null)""")
883         segment_size_estimate = cur.fetchone()[0]
884         if not segment_size_estimate:
885             return
886
887         # Next, extract distribution of expired objects (number and size) by
888         # age.  Save the timestamp for "now" so that the classification of
889         # blocks into age buckets will not change later in the function, after
890         # time has passed.  Set any timestamps in the future to now, so we are
891         # guaranteed that for the rest of this function, age is always
892         # non-negative.
893         cur.execute("select julianday('now')")
894         now = cur.fetchone()[0]
895
896         cur.execute("""update block_index set timestamp = ?
897                        where timestamp > ? and expired is not null""",
898                     (now, now))
899
900         cur.execute("""select round(? - timestamp) as age, count(*), sum(size)
901                        from block_index where expired = 0
902                        group by age order by age""", (now,))
903         distribution = cur.fetchall()
904
905         # Start to determine the buckets for expired objects.  Heuristics used:
906         #   - An upper bound on the number of buckets is given by the number of
907         #     segments we estimate it will take to store all data.  In fact,
908         #     aim for a couple of segments per bucket.
909         #   - Place very young objects in bucket 0 (place with new objects)
910         #     unless there are enough of them to warrant a separate bucket.
911         #   - Try not to create unnecessarily many buckets, since fewer buckets
912         #     will allow repacked data to be grouped based on spatial locality
913         #     (while more buckets will group by temporal locality).  We want a
914         #     balance.
915         MIN_AGE = 4
916         total_bytes = sum([i[2] for i in distribution])
917         target_buckets = 2 * (total_bytes / segment_size_estimate) ** 0.4
918         min_size = 1.5 * segment_size_estimate
919         target_size = max(2 * segment_size_estimate,
920                           total_bytes / target_buckets)
921
922         print "segment_size:", segment_size_estimate
923         print "distribution:", distribution
924         print "total_bytes:", total_bytes
925         print "target_buckets:", target_buckets
926         print "min, target size:", min_size, target_size
927
928         # Chosen cutoffs.  Each bucket consists of objects with age greater
929         # than one cutoff value, but not greater than the next largest cutoff.
930         cutoffs = []
931
932         # Starting with the oldest objects, begin grouping together into
933         # buckets of size at least target_size bytes.
934         distribution.reverse()
935         bucket_size = 0
936         min_age_bucket = False
937         for (age, items, size) in distribution:
938             if bucket_size >= target_size \
939                 or (age < MIN_AGE and not min_age_bucket):
940                 if bucket_size < target_size and len(cutoffs) > 0:
941                     cutoffs.pop()
942                 cutoffs.append(age)
943                 bucket_size = 0
944
945             bucket_size += size
946             if age < MIN_AGE:
947                 min_age_bucket = True
948
949         # The last (youngest) bucket will be group 0, unless it has enough data
950         # to be of size min_size by itself, or there happen to be no objects
951         # less than MIN_AGE at all.
952         if bucket_size >= min_size or not min_age_bucket:
953             cutoffs.append(-1)
954         cutoffs.append(-1)
955
956         print "cutoffs:", cutoffs
957
958         # Update the database to assign each object to the appropriate bucket.
959         cutoffs.reverse()
960         for i in range(len(cutoffs)):
961             cur.execute("""update block_index set expired = ?
962                            where round(? - timestamp) > ?
963                              and expired is not null""",
964                         (i, now, cutoffs[i]))