Implement a new Python wrapper for the local database.
[cumulus.git] / python / cumulus / localdb.py
1 # Cumulus: Efficient Filesystem Backup to the Cloud
2 # Copyright (C) 2014 The Cumulus Developers
3 # See the AUTHORS file for a list of contributors.
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License along
16 # with this program; if not, write to the Free Software Foundation, Inc.,
17 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18
19 """Python interface to the Cumulus local database of snapshot data.
20
21 Includes interfaces for iterating over data and implementing database and
22 segment cleaning.
23 """
24
25 from __future__ import division, print_function, unicode_literals
26
27 import collections
28 import datetime
29 import os
30 import sqlite3
31
32 SnapshotInfo = collections.namedtuple(
33     "SnapshotInfo",
34     ["id", "scheme", "name", "timestamp"])
35
36 SegmentInfo = collections.namedtuple(
37     "SegmentInfo",
38     ["id", "name", "timestamp", "data_size", "disk_size", "type"])
39
40 SegmentStatistics = collections.namedtuple(
41     "SegmentStatistics",
42     ["id", "name", "timestamp", "data_size", "disk_size", "type",
43      "bytes_referenced", "utilization"])
44
45 class Database:
46     """Access to the local database of snapshot contents and object checksums.
47
48     The local database is consulted when creating a snapshot to determine what
49     data can be re-used from old snapshots.  Segment cleaning is performed by
50     manipulating the data in the local database; the local database also
51     includes enough data to guide the segment cleaning process.
52     """
53
54     def __init__(self, path, dbname="localdb.sqlite"):
55         self.db_connection = sqlite3.connect(
56             os.path.join(path, dbname),
57             detect_types=sqlite3.PARSE_COLNAMES)
58
59     @staticmethod
60     def _get_id(item):
61         """Fetch the id of a database object.
62
63         If the input is an integer return it directly, otherwise try to fetch
64         the .id field on it.
65         """
66         if isinstance(item, int):
67             return item
68         else:
69             return item.id
70
71     # Low-level database access.  Use these methods when there isn't a
72     # higher-level interface available.  Exception: do, however, remember to
73     # use the commit() method after making changes to make sure they are
74     # actually saved, even when going through higher-level interfaces.
75     def commit(self):
76         "Commit any pending changes to the local database."
77         self.db_connection.commit()
78
79     def rollback(self):
80         "Roll back any pending changes to the local database."
81         self.db_connection.rollback()
82
83     def cursor(self):
84         "Return a DB-API cursor for directly accessing the local database."
85         return self.db_connection.cursor()
86
87     def get_snapshots(self):
88         """Returns information about all snapshots in the local database.
89
90         The returned value is a dictionary mapping scheme names to lists of
91         snapshots for that scheme.  Each list entry is a SnapshotInfo instance.
92         """
93         cur = self.cursor()
94         cur.execute("""select snapshotid, scheme, name,
95                               datetime(timestamp) as "timestamp [timestamp]"
96                        from snapshots order by scheme, name""")
97         snapshots = {}
98         for row in cur.fetchall():
99             info = SnapshotInfo(*row)
100             snapshots.setdefault(info.scheme, []).append(info)
101         return snapshots
102
103     def delete_snapshot(self, snapshot):
104         """Remove the specified snapshot from the database.
105
106         Returns a boolean indicating whether the snapshot was deleted.
107
108         Warning: This does not garbage collect all dependent data in the
109         database, so it must be followed by a call to garbage_collect() to make
110         the database consistent.
111         """
112         cur = self.cursor()
113         cur.execute("delete from snapshots where snapshotid = ?",
114                     (self._get_id(snapshot),))
115         return cur.rowcount > 0
116
117     def garbage_collect(self):
118         """Garbage-collect unreachable segment and object data.
119
120         Remove all segments and checksums which is not reachable from the
121         current set of snapshots stored in the local database.
122         """
123         cur = self.cursor()
124
125         # Delete entries in the segment_utilization table which are for
126         # non-existent snapshots.
127         cur.execute("""delete from segment_utilization
128                        where snapshotid not in
129                            (select snapshotid from snapshots)""")
130
131         # Delete segments not referenced by any current snapshots.
132         cur.execute("""delete from segments where segmentid not in
133                            (select segmentid from segment_utilization)""")
134
135         # Delete dangling objects in the block_index table.
136         cur.execute("""delete from block_index
137                        where segmentid not in
138                            (select segmentid from segments)""")
139
140         # Remove sub-block signatures for deleted objects.
141         cur.execute("""delete from subblock_signatures
142                        where blockid not in
143                            (select blockid from block_index)""")
144
145     def get_segment_info(self):
146         """Retrieve statistics about segments for cleaning decisions."""
147         cur = self.cursor()
148         cur.execute("""select segmentid, segment,
149                               datetime(timestamp) as "timestamp [timestamp]",
150                               data_size, disk_size, type
151                        from segments""")
152         return dict((x[0], SegmentInfo(*x)) for x in cur.fetchall())
153
154     def get_segment_utilizations(self, snapshots):
155         """Computes estimates for the data referenced in each segment.
156
157         Computes a lower bound of the amount of data that is referenced in
158         segments by the specified set of snapshots.
159         """
160         cur = self.cursor()
161         segment_info = self.get_segment_info()
162         snapshots = [self._get_id(s) for s in snapshots]
163
164         query = """select segmentid, max(bytes_referenced)
165                    from segment_utilization where snapshotid in (%s)
166                    group by segmentid""" % (",".join(["?"] * len(snapshots)))
167         segments = {}
168         for row in cur.execute(query, snapshots):
169             info = segment_info[row[0]]
170             segments[row[0]] = SegmentStatistics(
171                 bytes_referenced=row[1],
172                 utilization=row[1]/info.data_size,
173                 **info._asdict())
174         return segments
175
176     def mark_segment_expired(self, segment):
177         """Mark a segment for cleaning in the local database.
178
179         The segment parameter should be either a SegmentInfo object or an
180         integer segment id.  Objects in the given segment will be marked as
181         expired, which means that any future snapshots that would re-use those
182         objects will instead write out a new copy of the object, and thus no
183         future snapshots will depend upon the given segment.
184         """
185         cur = self.cursor()
186         cur.execute("update block_index set expired = 0 where segmentid = ?",
187                     (self._get_id(segment),))
188
189 def run_cleaner(database):
190     # Find the most recent snapshot for each backup scheme, then delete all
191     # older snapshots from the database.
192     kept_snapshots = []
193     for snapshots in database.get_snapshots().values():
194         snapshots = sorted(snapshots, key=lambda s: s.id)
195         kept_snapshots.append(snapshots[-1])
196         for s in snapshots[:-1]:
197             print("Deleting snapshot", s)
198             database.delete_snapshot(s)
199     print("Keeping snapshots:", kept_snapshots)
200     database.garbage_collect()
201
202     # TODO: Look at adding more complex policies later, perhaps under user
203     # control.  Cleaning policies to consider (which can be combined):
204     #   - clean older than specified age
205     #   - do not clean younger than specified age
206     #   - clean below utilization threshold
207     #   - do not allow data from previously-unreferenced segments
208     #   - gradual: clean a fraction of segments which match a rule
209     #   - minimum segment size (disk or data)
210     #   - benefit calculation?
211
212     for segment in database.get_segment_utilizations(kept_snapshots).values():
213         if segment.utilization < 0.4:
214             print("Clean segment:", segment)
215             database.mark_segment_expired(segment)
216
217     database.commit()
218
219 if __name__ == "__main__":
220     # Demo usage: runs a cleaner on the database located in the current
221     # directory.  This should be removed later when cleaning is properly hooked
222     # up to the main tool.
223     run_cleaner(Database("."))