From fae5b159f9526d9903121fea3d5da7ba4b4ba73b Mon Sep 17 00:00:00 2001 From: Michael Vrable Date: Mon, 17 Feb 2014 16:09:40 -0800 Subject: [PATCH] Implement a new Python wrapper for the local database. This provides a cleaned-up wrapper for the local Cumulus database, in a separate module. This also currently functions as a simple segment cleaner (to be used until a more comprehensive cleaner is available). --- python/cumulus/localdb.py | 223 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 223 insertions(+) create mode 100644 python/cumulus/localdb.py diff --git a/python/cumulus/localdb.py b/python/cumulus/localdb.py new file mode 100644 index 0000000..2e9fdd9 --- /dev/null +++ b/python/cumulus/localdb.py @@ -0,0 +1,223 @@ +# Cumulus: Efficient Filesystem Backup to the Cloud +# Copyright (C) 2014 The Cumulus Developers +# See the AUTHORS file for a list of contributors. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +"""Python interface to the Cumulus local database of snapshot data. + +Includes interfaces for iterating over data and implementing database and +segment cleaning. +""" + +from __future__ import division, print_function, unicode_literals + +import collections +import datetime +import os +import sqlite3 + +SnapshotInfo = collections.namedtuple( + "SnapshotInfo", + ["id", "scheme", "name", "timestamp"]) + +SegmentInfo = collections.namedtuple( + "SegmentInfo", + ["id", "name", "timestamp", "data_size", "disk_size", "type"]) + +SegmentStatistics = collections.namedtuple( + "SegmentStatistics", + ["id", "name", "timestamp", "data_size", "disk_size", "type", + "bytes_referenced", "utilization"]) + +class Database: + """Access to the local database of snapshot contents and object checksums. + + The local database is consulted when creating a snapshot to determine what + data can be re-used from old snapshots. Segment cleaning is performed by + manipulating the data in the local database; the local database also + includes enough data to guide the segment cleaning process. + """ + + def __init__(self, path, dbname="localdb.sqlite"): + self.db_connection = sqlite3.connect( + os.path.join(path, dbname), + detect_types=sqlite3.PARSE_COLNAMES) + + @staticmethod + def _get_id(item): + """Fetch the id of a database object. + + If the input is an integer return it directly, otherwise try to fetch + the .id field on it. + """ + if isinstance(item, int): + return item + else: + return item.id + + # Low-level database access. Use these methods when there isn't a + # higher-level interface available. Exception: do, however, remember to + # use the commit() method after making changes to make sure they are + # actually saved, even when going through higher-level interfaces. + def commit(self): + "Commit any pending changes to the local database." + self.db_connection.commit() + + def rollback(self): + "Roll back any pending changes to the local database." + self.db_connection.rollback() + + def cursor(self): + "Return a DB-API cursor for directly accessing the local database." + return self.db_connection.cursor() + + def get_snapshots(self): + """Returns information about all snapshots in the local database. + + The returned value is a dictionary mapping scheme names to lists of + snapshots for that scheme. Each list entry is a SnapshotInfo instance. + """ + cur = self.cursor() + cur.execute("""select snapshotid, scheme, name, + datetime(timestamp) as "timestamp [timestamp]" + from snapshots order by scheme, name""") + snapshots = {} + for row in cur.fetchall(): + info = SnapshotInfo(*row) + snapshots.setdefault(info.scheme, []).append(info) + return snapshots + + def delete_snapshot(self, snapshot): + """Remove the specified snapshot from the database. + + Returns a boolean indicating whether the snapshot was deleted. + + Warning: This does not garbage collect all dependent data in the + database, so it must be followed by a call to garbage_collect() to make + the database consistent. + """ + cur = self.cursor() + cur.execute("delete from snapshots where snapshotid = ?", + (self._get_id(snapshot),)) + return cur.rowcount > 0 + + def garbage_collect(self): + """Garbage-collect unreachable segment and object data. + + Remove all segments and checksums which is not reachable from the + current set of snapshots stored in the local database. + """ + cur = self.cursor() + + # Delete entries in the segment_utilization table which are for + # non-existent snapshots. + cur.execute("""delete from segment_utilization + where snapshotid not in + (select snapshotid from snapshots)""") + + # Delete segments not referenced by any current snapshots. + cur.execute("""delete from segments where segmentid not in + (select segmentid from segment_utilization)""") + + # Delete dangling objects in the block_index table. + cur.execute("""delete from block_index + where segmentid not in + (select segmentid from segments)""") + + # Remove sub-block signatures for deleted objects. + cur.execute("""delete from subblock_signatures + where blockid not in + (select blockid from block_index)""") + + def get_segment_info(self): + """Retrieve statistics about segments for cleaning decisions.""" + cur = self.cursor() + cur.execute("""select segmentid, segment, + datetime(timestamp) as "timestamp [timestamp]", + data_size, disk_size, type + from segments""") + return dict((x[0], SegmentInfo(*x)) for x in cur.fetchall()) + + def get_segment_utilizations(self, snapshots): + """Computes estimates for the data referenced in each segment. + + Computes a lower bound of the amount of data that is referenced in + segments by the specified set of snapshots. + """ + cur = self.cursor() + segment_info = self.get_segment_info() + snapshots = [self._get_id(s) for s in snapshots] + + query = """select segmentid, max(bytes_referenced) + from segment_utilization where snapshotid in (%s) + group by segmentid""" % (",".join(["?"] * len(snapshots))) + segments = {} + for row in cur.execute(query, snapshots): + info = segment_info[row[0]] + segments[row[0]] = SegmentStatistics( + bytes_referenced=row[1], + utilization=row[1]/info.data_size, + **info._asdict()) + return segments + + def mark_segment_expired(self, segment): + """Mark a segment for cleaning in the local database. + + The segment parameter should be either a SegmentInfo object or an + integer segment id. Objects in the given segment will be marked as + expired, which means that any future snapshots that would re-use those + objects will instead write out a new copy of the object, and thus no + future snapshots will depend upon the given segment. + """ + cur = self.cursor() + cur.execute("update block_index set expired = 0 where segmentid = ?", + (self._get_id(segment),)) + +def run_cleaner(database): + # Find the most recent snapshot for each backup scheme, then delete all + # older snapshots from the database. + kept_snapshots = [] + for snapshots in database.get_snapshots().values(): + snapshots = sorted(snapshots, key=lambda s: s.id) + kept_snapshots.append(snapshots[-1]) + for s in snapshots[:-1]: + print("Deleting snapshot", s) + database.delete_snapshot(s) + print("Keeping snapshots:", kept_snapshots) + database.garbage_collect() + + # TODO: Look at adding more complex policies later, perhaps under user + # control. Cleaning policies to consider (which can be combined): + # - clean older than specified age + # - do not clean younger than specified age + # - clean below utilization threshold + # - do not allow data from previously-unreferenced segments + # - gradual: clean a fraction of segments which match a rule + # - minimum segment size (disk or data) + # - benefit calculation? + + for segment in database.get_segment_utilizations(kept_snapshots).values(): + if segment.utilization < 0.4: + print("Clean segment:", segment) + database.mark_segment_expired(segment) + + database.commit() + +if __name__ == "__main__": + # Demo usage: runs a cleaner on the database located in the current + # directory. This should be removed later when cleaning is properly hooked + # up to the main tool. + run_cleaner(Database(".")) -- 2.20.1