1 # Cumulus: Efficient Filesystem Backup to the Cloud
2 # Copyright (C) 2014 The Cumulus Developers
3 # See the AUTHORS file for a list of contributors.
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License along
16 # with this program; if not, write to the Free Software Foundation, Inc.,
17 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""Python interface to the Cumulus local database of snapshot data.

Includes interfaces for iterating over data and implementing database and
segment cleaning.
"""
from __future__ import division, print_function, unicode_literals

import collections
import os
import sqlite3
# One row of the snapshots table, as returned by Database.get_snapshots().
SnapshotInfo = collections.namedtuple(
    "SnapshotInfo",
    ["id", "scheme", "name", "timestamp"])

# One row of the segments table, as returned by Database.get_segment_info().
SegmentInfo = collections.namedtuple(
    "SegmentInfo",
    ["id", "name", "timestamp", "data_size", "disk_size", "type"])

# SegmentInfo extended with utilization estimates.  The field list is derived
# from SegmentInfo so that SegmentStatistics(**info._asdict(), ...) always
# lines up with the SegmentInfo columns.
SegmentStatistics = collections.namedtuple(
    "SegmentStatistics",
    list(SegmentInfo._fields) + ["bytes_referenced", "utilization"])
class Database(object):
    """Access to the local database of snapshot contents and object checksums.

    The local database is consulted when creating a snapshot to determine what
    data can be re-used from old snapshots.  Segment cleaning is performed by
    manipulating the data in the local database; the local database also
    includes enough data to guide the segment cleaning process.
    """

    def __init__(self, path, dbname="localdb.sqlite"):
        """Open the SQLite database file ``dbname`` inside directory ``path``.

        PARSE_COLNAMES lets queries request type conversion through a column
        alias such as ``"timestamp [timestamp]"``.
        """
        self.db_connection = sqlite3.connect(
            os.path.join(path, dbname),
            detect_types=sqlite3.PARSE_COLNAMES)

    def _get_id(self, item):
        """Fetch the id of a database object.

        If the input is an integer return it directly, otherwise fetch its
        ``id`` attribute (e.g. from a SnapshotInfo or SegmentInfo tuple).
        """
        if isinstance(item, int):
            return item
        return item.id

    # Low-level database access.  Use these methods when there isn't a
    # higher-level interface available.  Exception: do, however, remember to
    # use the commit() method after making changes to make sure they are
    # actually saved, even when going through higher-level interfaces.
    def commit(self):
        "Commit any pending changes to the local database."
        self.db_connection.commit()

    def rollback(self):
        "Roll back any pending changes to the local database."
        self.db_connection.rollback()

    def cursor(self):
        "Return a DB-API cursor for directly accessing the local database."
        return self.db_connection.cursor()

    def get_snapshots(self):
        """Returns information about all snapshots in the local database.

        The returned value is a dictionary mapping scheme names to lists of
        snapshots for that scheme.  Each list entry is a SnapshotInfo instance.
        Within each list, snapshots appear in order of name.
        """
        cur = self.cursor()
        cur.execute("""select snapshotid, scheme, name,
                           datetime(timestamp) as "timestamp [timestamp]"
                       from snapshots order by scheme, name""")
        snapshots = {}
        for row in cur.fetchall():
            info = SnapshotInfo(*row)
            snapshots.setdefault(info.scheme, []).append(info)
        return snapshots

    def delete_snapshot(self, snapshot):
        """Remove the specified snapshot from the database.

        ``snapshot`` may be an integer id or an object with an ``id``
        attribute.  Returns a boolean indicating whether a row was deleted.

        Warning: This does not garbage collect all dependent data in the
        database, so it must be followed by a call to garbage_collect() to make
        the database consistent.
        """
        cur = self.cursor()
        cur.execute("delete from snapshots where snapshotid = ?",
                    (self._get_id(snapshot),))
        return cur.rowcount > 0

    def garbage_collect(self):
        """Garbage-collect unreachable segment and object data.

        Remove all segments and checksums which are not reachable from the
        current set of snapshots stored in the local database.  The deletes
        cascade in order: utilization rows -> segments -> block_index rows ->
        sub-block signatures.
        """
        cur = self.cursor()

        # Delete entries in the segment_utilization table which are for
        # non-existent snapshots.
        cur.execute("""delete from segment_utilization
                       where snapshotid not in
                           (select snapshotid from snapshots)""")

        # Delete segments not referenced by any current snapshots.
        cur.execute("""delete from segments where segmentid not in
                           (select segmentid from segment_utilization)""")

        # Delete dangling objects in the block_index table.
        cur.execute("""delete from block_index
                       where segmentid not in
                           (select segmentid from segments)""")

        # Remove sub-block signatures for deleted objects.
        cur.execute("""delete from subblock_signatures
                       where blockid not in
                           (select blockid from block_index)""")

    def get_segment_info(self):
        """Retrieve statistics about segments for cleaning decisions.

        Returns a dict mapping segment id to a SegmentInfo instance.
        """
        cur = self.cursor()
        cur.execute("""select segmentid, segment,
                           datetime(timestamp) as "timestamp [timestamp]",
                           data_size, disk_size, type
                       from segments""")
        return dict((x[0], SegmentInfo(*x)) for x in cur.fetchall())

    @staticmethod
    def _utilization(bytes_referenced, data_size):
        """Return the fraction ``bytes_referenced / data_size``.

        A segment whose data_size is zero or NULL (None), or whose referenced
        byte count is NULL, is reported as 0.0 (fully unreferenced) instead of
        raising ZeroDivisionError/TypeError.
        """
        if not data_size or bytes_referenced is None:
            return 0.0
        return bytes_referenced / data_size

    def get_segment_utilizations(self, snapshots):
        """Computes estimates for the data referenced in each segment.

        Computes a lower bound of the amount of data that is referenced in
        segments by the specified set of snapshots (ids or objects with an
        ``id`` attribute).  For each segment the maximum bytes_referenced over
        those snapshots is used.

        Returns a dict mapping segment id to a SegmentStatistics instance.
        """
        cur = self.cursor()
        segment_info = self.get_segment_info()
        snapshots = [self._get_id(s) for s in snapshots]

        # One "?" placeholder per snapshot id keeps the query parameterized.
        query = """select segmentid, max(bytes_referenced)
                   from segment_utilization where snapshotid in (%s)
                   group by segmentid""" % (",".join(["?"] * len(snapshots)))
        segments = {}
        for row in cur.execute(query, snapshots):
            info = segment_info[row[0]]
            segments[row[0]] = SegmentStatistics(
                bytes_referenced=row[1],
                utilization=self._utilization(row[1], info.data_size),
                **info._asdict())
        return segments

    def mark_segment_expired(self, segment):
        """Mark a segment for cleaning in the local database.

        The segment parameter should be either a SegmentInfo object or an
        integer segment id.  Objects in the given segment will be marked as
        expired, which means that any future snapshots that would re-use those
        objects will instead write out a new copy of the object, and thus no
        future snapshots will depend upon the given segment.
        """
        cur = self.cursor()
        # NOTE(review): writes 0 into block_index.expired; presumably any
        # non-NULL value means "expired" to the snapshot writer -- confirm.
        cur.execute("update block_index set expired = 0 where segmentid = ?",
                    (self._get_id(segment),))
def run_cleaner(database, utilization_threshold=0.4):
    """Prune old snapshots and expire poorly-utilized segments, then commit.

    For each backup scheme, the snapshot with the largest id is kept (treated
    as the most recent) and all others are deleted, after which the database
    is garbage-collected.  Every segment whose estimated utilization by the
    kept snapshots is strictly below ``utilization_threshold`` is then marked
    expired.  Finally all changes are committed, since the database methods
    only stage changes.

    Args:
        database: a Database instance (or any object providing the same
            get_snapshots/delete_snapshot/garbage_collect/
            get_segment_utilizations/mark_segment_expired/commit methods).
        utilization_threshold: fraction below which a segment is cleaned.
            Defaults to 0.4, the value previously hard-coded here.
    """
    # Find the most recent snapshot for each backup scheme, then delete all
    # older snapshots from the database.
    kept_snapshots = []
    for snapshots in database.get_snapshots().values():
        snapshots = sorted(snapshots, key=lambda s: s.id)
        kept_snapshots.append(snapshots[-1])
        for s in snapshots[:-1]:
            print("Deleting snapshot", s)
            database.delete_snapshot(s)
    print("Keeping snapshots:", kept_snapshots)
    database.garbage_collect()

    # TODO: Look at adding more complex policies later, perhaps under user
    # control.  Cleaning policies to consider (which can be combined):
    #   - clean older than specified age
    #   - do not clean younger than specified age
    #   - clean below utilization threshold
    #   - do not allow data from previously-unreferenced segments
    #   - gradual: clean a fraction of segments which match a rule
    #   - minimum segment size (disk or data)
    #   - benefit calculation?

    for segment in database.get_segment_utilizations(kept_snapshots).values():
        if segment.utilization < utilization_threshold:
            print("Clean segment:", segment)
            database.mark_segment_expired(segment)

    # Without an explicit commit the deletions and expirations are lost.
    database.commit()
if __name__ == "__main__":
    # Demo entry point: open the local database stored in the current working
    # directory and run the cleaner against it.  Intended to be removed once
    # cleaning is wired into the main tool.
    local_db = Database(".")
    run_cleaner(local_db)