Preview of a new Python-based management tool; includes segment cleaning.
[cumulus.git] / lbs.py
1 """High-level interface for working with LBS archives.
2
3 This module provides an easy interface for reading from and manipulating
4 various parts of an LBS archive:
5   - listing the snapshots and segments present
6   - reading segment contents
7   - parsing snapshot descriptors and snapshot metadata logs
8   - reading and maintaining the local object database
9 """
10
11 from __future__ import division
12 from pysqlite2 import dbapi2 as sqlite3
13
14 class Struct:
15     """A class which merely acts as a data container.
16
17     Instances of this class (or its subclasses) are merely used to store data
18     in various attributes.  No methods are provided.
19     """
20
21     def __repr__(self):
22         return "<%s %s>" % (self.__class__, self.__dict__)
23
24 class LocalDatabase:
25     """Access to the local database of snapshot contents and object checksums.
26
27     The local database is consulted when creating a snapshot to determine what
28     data can be re-used from old snapshots.  Segment cleaning is performed by
29     manipulating the data in the local database; the local database also
30     includes enough data to guide the segment cleaning process.
31     """
32
33     def __init__(self, path, dbname="localdb.sqlite"):
34         self.db_connection = sqlite3.connect(path + "/" + dbname)
35
36     # Low-level database access.  Use these methods when there isn't a
37     # higher-level interface available.  Exception: do, however, remember to
38     # use the commit() method after making changes to make sure they are
39     # actually saved, even when going through higher-level interfaces.
40     def commit(self):
41         "Commit any pending changes to the local database."
42         self.db_connection.commit()
43
44     def rollback(self):
45         "Roll back any pending changes to the local database."
46         self.db_connection.rollback()
47
48     def cursor(self):
49         "Return a DB-API cursor for directly accessing the local database."
50         return self.db_connection.cursor()
51
52     def garbage_collect(self):
53         """Delete entries from old snapshots from the database."""
54
55         cur = self.cursor()
56
57         # Delete old snapshots.
58         cur.execute("""delete from snapshots
59                        where snapshotid < (select max(snapshotid)
60                                            from snapshots)""")
61
62         # Delete entries in the snapshot_contents table which are for
63         # non-existent snapshots.
64         cur.execute("""delete from snapshot_contents
65                        where snapshotid not in
66                            (select snapshotid from snapshots)""")
67
68         # Find segments which contain no objects used by any current snapshots,
69         # and delete them from the segment table.
70         cur.execute("""delete from segments where segmentid not in
71                            (select distinct segmentid from snapshot_contents
72                                 natural join block_index)""")
73
74         # Finally, delete objects contained in non-existent segments.  We can't
75         # simply delete unused objects, since we use the set of unused objects
76         # to determine the used/free ratio of segments.
77         cur.execute("""delete from block_index
78                        where segmentid not in
79                            (select segmentid from segments)""")
80
81     # Segment cleaning.
82     class SegmentInfo(Struct): pass
83
84     def get_segment_cleaning_list(self, age_boost=0.0):
85         """Return a list of all current segments with information for cleaning.
86
87         Return all segments which are currently known in the local database
88         (there might be other, older segments in the archive itself), and
89         return usage statistics for each to help decide which segments to
90         clean.
91
92         The returned list will be sorted by estimated cleaning benefit, with
93         segments that are best to clean at the start of the list.
94
95         If specified, the age_boost parameter (measured in days) will added to
96         the age of each segment, as a way of adjusting the benefit computation
97         before a long-lived snapshot is taken (for example, age_boost might be
98         set to 7 when cleaning prior to taking a weekly snapshot).
99         """
100
101         cur = self.cursor()
102         segments = []
103         cur.execute("""select segmentid, used, size, mtime,
104                        julianday('now') - mtime as age from segment_info""")
105         for row in cur:
106             info = self.SegmentInfo()
107             info.id = row[0]
108             info.used_bytes = row[1]
109             info.size_bytes = row[2]
110             info.mtime = row[3]
111             info.age_days = row[4]
112
113             # Benefit calculation: u is the estimated fraction of each segment
114             # which is utilized (bytes belonging to objects still in use
115             # divided by total size; this doesn't take compression or storage
116             # overhead into account, but should give a reasonable estimate).
117             #
118             # The total benefit is a heuristic that combines several factors:
119             # the amount of space that can be reclaimed (1 - u), an ageing
120             # factor (info.age_days) that favors cleaning old segments to young
121             # ones and also is more likely to clean segments that will be
122             # rewritten for long-lived snapshots (age_boost), and finally a
123             # penalty factor for the cost of re-uploading data (u + 0.1).
124             u = info.used_bytes / info.size_bytes
125             info.cleaning_benefit \
126                 = (1 - u) * (info.age_days + age_boost) / (u + 0.1)
127
128             segments.append(info)
129
130         segments.sort(cmp, key=lambda s: s.cleaning_benefit, reverse=True)
131         return segments
132
133     def mark_segment_expired(self, segment):
134         """Mark a segment for cleaning in the local database.
135
136         The segment parameter should be either a SegmentInfo object or an
137         integer segment id.  Objects in the given segment will be marked as
138         expired, which means that any future snapshots that would re-use those
139         objects will instead write out a new copy of the object, and thus no
140         future snapshots will depend upon the given segment.
141         """
142
143         if isinstance(segment, int):
144             id = segment
145         elif isinstance(segment, self.SegmentInfo):
146             id = segment.id
147         else:
148             raise TypeError("Invalid segment: %s, must be of type int or SegmentInfo, not %s" % (segment, type(segment)))
149
150         cur = self.cursor()
151         cur.execute("update block_index set expired = 1 where segmentid = ?",
152                     (id,))
153
154     def balance_expired_objects(self):
155         """Analyze expired objects in segments to be cleaned and group by age.
156
157         Update the block_index table of the local database to group expired
158         objects by age.  The exact number of buckets and the cutoffs for each
159         are dynamically determined.  Calling this function after marking
160         segments expired will help in the segment cleaning process, by ensuring
161         that when active objects from clean segments are rewritten, they will
162         be placed into new segments roughly grouped by age.
163         """
164
165         # The expired column of the block_index table is used when generating a
166         # new LBS snapshot.  A null value indicates that an object may be
167         # re-used.  Otherwise, an object must be written into a new segment if
168         # needed.  Objects with distinct expired values will be written into
169         # distinct segments, to allow for some grouping by age.  The value 0 is
170         # somewhat special in that it indicates any rewritten objects can be
171         # placed in the same segment as completely new objects; this can be
172         # used for very young objects which have been expired, or objects not
173         # expected to be encountered.
174         #
175         # In the balancing process, all objects which are not used in any
176         # current snapshots will have expired set to 0.  Objects which have
177         # been seen will be sorted by age and will have expired values set to
178         # 0, 1, 2, and so on based on age (with younger objects being assigned
179         # lower values).  The number of buckets and the age cutoffs is
180         # determined by looking at the distribution of block ages.
181
182         cur = self.cursor()
183
184         # First step: Mark all unused-and-expired objects with expired = -1,
185         # which will cause us to mostly ignore these objects when rebalancing.
186         # At the end, we will set these objects to be in group expired = 0.
187         # Mark expired objects which still seem to be in use with expired = 0;
188         # these objects will later have values set to indicate groupings of
189         # objects when repacking.
190         cur.execute("""update block_index set expired = -1
191                        where expired is not null""")
192
193         cur.execute("""update block_index set expired = 0
194                        where expired is not null and blockid in
195                            (select blockid from snapshot_contents)""")
196
197         # We will want to aim for at least one full segment for each bucket
198         # that we eventually create, but don't know how many bytes that should
199         # be due to compression.  So compute the average number of bytes in
200         # each expired segment as a rough estimate for the minimum size of each
201         # bucket.  (This estimate could be thrown off by many not-fully-packed
202         # segments, but for now don't worry too much about that.)  If we can't
203         # compute an average, it's probably because there are no expired
204         # segments, so we have no more work to do.
205         cur.execute("""select avg(size) from segment_info
206                        where segmentid in
207                            (select distinct segmentid from block_index
208                             where expired is not null)""")
209         segment_size_estimate = cur.fetchone()[0]
210         if not segment_size_estimate:
211             return
212
213         # Next, extract distribution of expired objects (number and size) by
214         # age.  Save the timestamp for "now" so that the classification of
215         # blocks into age buckets will not change later in the function, after
216         # time has passed.  Set any timestamps in the future to now, so we are
217         # guaranteed that for the rest of this function, age is always
218         # non-negative.
219         cur.execute("select julianday('now')")
220         now = cur.fetchone()[0]
221
222         cur.execute("""update block_index set timestamp = ?
223                        where timestamp > ? and expired is not null""",
224                     (now, now))
225
226         cur.execute("""select round(? - timestamp) as age, count(*), sum(size)
227                        from block_index where expired = 0
228                        group by age order by age""", (now,))
229         distribution = cur.fetchall()
230
231         # Start to determine the buckets for expired objects.  Heuristics used:
232         #   - An upper bound on the number of buckets is given by the number of
233         #     segments we estimate it will take to store all data.  In fact,
234         #     aim for a couple of segments per bucket.
235         #   - Place very young objects in bucket 0 (place with new objects)
236         #     unless there are enough of them to warrant a separate bucket.
237         #   - Try not to create unnecessarily many buckets, since fewer buckets
238         #     will allow repacked data to be grouped based on spatial locality
239         #     (while more buckets will group by temporal locality).  We want a
240         #     balance.
241         MIN_AGE = 4
242         total_bytes = sum([i[2] for i in distribution])
243         target_buckets = 2 * (total_bytes / segment_size_estimate) ** 0.4
244         min_size = 1.5 * segment_size_estimate
245         target_size = max(2 * segment_size_estimate,
246                           total_bytes / target_buckets)
247
248         print "segment_size:", segment_size_estimate
249         print "distribution:", distribution
250         print "total_bytes:", total_bytes
251         print "target_buckets:", target_buckets
252         print "min, target size:", min_size, target_size
253
254         # Chosen cutoffs.  Each bucket consists of objects with age greater
255         # than one cutoff value, but not greater than the next largest cutoff.
256         cutoffs = []
257
258         # Starting with the oldest objects, begin grouping together into
259         # buckets of size at least target_size bytes.
260         distribution.reverse()
261         bucket_size = 0
262         min_age_bucket = False
263         for (age, items, size) in distribution:
264             if bucket_size >= target_size \
265                 or (age < MIN_AGE and not min_age_bucket):
266                 if bucket_size < target_size and len(cutoffs) > 0:
267                     cutoffs.pop()
268                 cutoffs.append(age)
269                 bucket_size = 0
270
271             bucket_size += size
272             if age < MIN_AGE:
273                 min_age_bucket = True
274
275         # The last (youngest) bucket will be group 0, unless it has enough data
276         # to be of size min_size by itself, or there happen to be no objects
277         # less than MIN_AGE at all.
278         if bucket_size >= min_size or not min_age_bucket:
279             cutoffs.append(-1)
280         cutoffs.append(-1)
281
282         print "cutoffs:", cutoffs
283
284         # Update the database to assign each object to the appropriate bucket.
285         cutoffs.reverse()
286         for i in range(len(cutoffs)):
287             cur.execute("""update block_index set expired = ?
288                            where round(? - timestamp) > ? and expired >= 0""",
289                         (i, now, cutoffs[i]))
290         cur.execute("update block_index set expired = 0 where expired = -1")