1 """High-level interface for working with LBS archives.
3 This module provides an easy interface for reading from and manipulating
4 various parts of an LBS archive:
5 - listing the snapshots and segments present
6 - reading segment contents
7 - parsing snapshot descriptors and snapshot metadata logs
8 - reading and maintaining the local object database
11 from __future__ import division
12 from pysqlite2 import dbapi2 as sqlite3
class Struct:
    """A class which merely acts as a data container.

    Instances of this class (or its subclasses) are merely used to store data
    in various attributes. No methods are provided.
    """

    def __repr__(self):
        # Show the class and every stored attribute for easy debugging.
        return "<%s %s>" % (self.__class__, self.__dict__)
class LocalDatabase:
    """Access to the local database of snapshot contents and object checksums.

    The local database is consulted when creating a snapshot to determine what
    data can be re-used from old snapshots. Segment cleaning is performed by
    manipulating the data in the local database; the local database also
    includes enough data to guide the segment cleaning process.
    """
33 def __init__(self, path, dbname="localdb.sqlite"):
34 self.db_connection = sqlite3.connect(path + "/" + dbname)
36 # Low-level database access. Use these methods when there isn't a
37 # higher-level interface available. Exception: do, however, remember to
38 # use the commit() method after making changes to make sure they are
39 # actually saved, even when going through higher-level interfaces.
41 "Commit any pending changes to the local database."
42 self.db_connection.commit()
45 "Roll back any pending changes to the local database."
46 self.db_connection.rollback()
49 "Return a DB-API cursor for directly accessing the local database."
50 return self.db_connection.cursor()
52 def garbage_collect(self):
53 """Delete entries from old snapshots from the database."""
57 # Delete old snapshots.
58 cur.execute("""delete from snapshots
59 where snapshotid < (select max(snapshotid)
62 # Delete entries in the snapshot_contents table which are for
63 # non-existent snapshots.
64 cur.execute("""delete from snapshot_contents
65 where snapshotid not in
66 (select snapshotid from snapshots)""")
68 # Find segments which contain no objects used by any current snapshots,
69 # and delete them from the segment table.
70 cur.execute("""delete from segments where segmentid not in
71 (select distinct segmentid from snapshot_contents
72 natural join block_index)""")
74 # Finally, delete objects contained in non-existent segments. We can't
75 # simply delete unused objects, since we use the set of unused objects
76 # to determine the used/free ratio of segments.
77 cur.execute("""delete from block_index
78 where segmentid not in
79 (select segmentid from segments)""")
    # Simple record type (attributes set dynamically) holding per-segment
    # statistics; instances are produced by get_segment_cleaning_list().
    class SegmentInfo(Struct): pass
84 def get_segment_cleaning_list(self, age_boost=0.0):
85 """Return a list of all current segments with information for cleaning.
87 Return all segments which are currently known in the local database
88 (there might be other, older segments in the archive itself), and
89 return usage statistics for each to help decide which segments to
92 The returned list will be sorted by estimated cleaning benefit, with
93 segments that are best to clean at the start of the list.
95 If specified, the age_boost parameter (measured in days) will added to
96 the age of each segment, as a way of adjusting the benefit computation
97 before a long-lived snapshot is taken (for example, age_boost might be
98 set to 7 when cleaning prior to taking a weekly snapshot).
103 cur.execute("""select segmentid, used, size, mtime,
104 julianday('now') - mtime as age from segment_info""")
106 info = self.SegmentInfo()
108 info.used_bytes = row[1]
109 info.size_bytes = row[2]
111 info.age_days = row[4]
113 # Benefit calculation: u is the estimated fraction of each segment
114 # which is utilized (bytes belonging to objects still in use
115 # divided by total size; this doesn't take compression or storage
116 # overhead into account, but should give a reasonable estimate).
118 # The total benefit is a heuristic that combines several factors:
119 # the amount of space that can be reclaimed (1 - u), an ageing
120 # factor (info.age_days) that favors cleaning old segments to young
121 # ones and also is more likely to clean segments that will be
122 # rewritten for long-lived snapshots (age_boost), and finally a
123 # penalty factor for the cost of re-uploading data (u + 0.1).
124 u = info.used_bytes / info.size_bytes
125 info.cleaning_benefit \
126 = (1 - u) * (info.age_days + age_boost) / (u + 0.1)
128 segments.append(info)
130 segments.sort(cmp, key=lambda s: s.cleaning_benefit, reverse=True)
133 def mark_segment_expired(self, segment):
134 """Mark a segment for cleaning in the local database.
136 The segment parameter should be either a SegmentInfo object or an
137 integer segment id. Objects in the given segment will be marked as
138 expired, which means that any future snapshots that would re-use those
139 objects will instead write out a new copy of the object, and thus no
140 future snapshots will depend upon the given segment.
143 if isinstance(segment, int):
145 elif isinstance(segment, self.SegmentInfo):
148 raise TypeError("Invalid segment: %s, must be of type int or SegmentInfo, not %s" % (segment, type(segment)))
151 cur.execute("update block_index set expired = 1 where segmentid = ?",
154 def balance_expired_objects(self):
155 """Analyze expired objects in segments to be cleaned and group by age.
157 Update the block_index table of the local database to group expired
158 objects by age. The exact number of buckets and the cutoffs for each
159 are dynamically determined. Calling this function after marking
160 segments expired will help in the segment cleaning process, by ensuring
161 that when active objects from clean segments are rewritten, they will
162 be placed into new segments roughly grouped by age.
165 # The expired column of the block_index table is used when generating a
166 # new LBS snapshot. A null value indicates that an object may be
167 # re-used. Otherwise, an object must be written into a new segment if
168 # needed. Objects with distinct expired values will be written into
169 # distinct segments, to allow for some grouping by age. The value 0 is
170 # somewhat special in that it indicates any rewritten objects can be
171 # placed in the same segment as completely new objects; this can be
172 # used for very young objects which have been expired, or objects not
173 # expected to be encountered.
175 # In the balancing process, all objects which are not used in any
176 # current snapshots will have expired set to 0. Objects which have
177 # been seen will be sorted by age and will have expired values set to
178 # 0, 1, 2, and so on based on age (with younger objects being assigned
179 # lower values). The number of buckets and the age cutoffs is
180 # determined by looking at the distribution of block ages.
184 # First step: Mark all unused-and-expired objects with expired = -1,
185 # which will cause us to mostly ignore these objects when rebalancing.
186 # At the end, we will set these objects to be in group expired = 0.
187 # Mark expired objects which still seem to be in use with expired = 0;
188 # these objects will later have values set to indicate groupings of
189 # objects when repacking.
190 cur.execute("""update block_index set expired = -1
191 where expired is not null""")
193 cur.execute("""update block_index set expired = 0
194 where expired is not null and blockid in
195 (select blockid from snapshot_contents)""")
197 # We will want to aim for at least one full segment for each bucket
198 # that we eventually create, but don't know how many bytes that should
199 # be due to compression. So compute the average number of bytes in
200 # each expired segment as a rough estimate for the minimum size of each
201 # bucket. (This estimate could be thrown off by many not-fully-packed
202 # segments, but for now don't worry too much about that.) If we can't
203 # compute an average, it's probably because there are no expired
204 # segments, so we have no more work to do.
205 cur.execute("""select avg(size) from segment_info
207 (select distinct segmentid from block_index
208 where expired is not null)""")
209 segment_size_estimate = cur.fetchone()[0]
210 if not segment_size_estimate:
213 # Next, extract distribution of expired objects (number and size) by
214 # age. Save the timestamp for "now" so that the classification of
215 # blocks into age buckets will not change later in the function, after
216 # time has passed. Set any timestamps in the future to now, so we are
217 # guaranteed that for the rest of this function, age is always
219 cur.execute("select julianday('now')")
220 now = cur.fetchone()[0]
222 cur.execute("""update block_index set timestamp = ?
223 where timestamp > ? and expired is not null""",
226 cur.execute("""select round(? - timestamp) as age, count(*), sum(size)
227 from block_index where expired = 0
228 group by age order by age""", (now,))
229 distribution = cur.fetchall()
231 # Start to determine the buckets for expired objects. Heuristics used:
232 # - An upper bound on the number of buckets is given by the number of
233 # segments we estimate it will take to store all data. In fact,
234 # aim for a couple of segments per bucket.
235 # - Place very young objects in bucket 0 (place with new objects)
236 # unless there are enough of them to warrant a separate bucket.
237 # - Try not to create unnecessarily many buckets, since fewer buckets
238 # will allow repacked data to be grouped based on spatial locality
239 # (while more buckets will group by temporal locality). We want a
242 total_bytes = sum([i[2] for i in distribution])
243 target_buckets = 2 * (total_bytes / segment_size_estimate) ** 0.4
244 min_size = 1.5 * segment_size_estimate
245 target_size = max(2 * segment_size_estimate,
246 total_bytes / target_buckets)
248 print "segment_size:", segment_size_estimate
249 print "distribution:", distribution
250 print "total_bytes:", total_bytes
251 print "target_buckets:", target_buckets
252 print "min, target size:", min_size, target_size
254 # Chosen cutoffs. Each bucket consists of objects with age greater
255 # than one cutoff value, but not greater than the next largest cutoff.
258 # Starting with the oldest objects, begin grouping together into
259 # buckets of size at least target_size bytes.
260 distribution.reverse()
262 min_age_bucket = False
263 for (age, items, size) in distribution:
264 if bucket_size >= target_size \
265 or (age < MIN_AGE and not min_age_bucket):
266 if bucket_size < target_size and len(cutoffs) > 0:
273 min_age_bucket = True
275 # The last (youngest) bucket will be group 0, unless it has enough data
276 # to be of size min_size by itself, or there happen to be no objects
277 # less than MIN_AGE at all.
278 if bucket_size >= min_size or not min_age_bucket:
282 print "cutoffs:", cutoffs
284 # Update the database to assign each object to the appropriate bucket.
286 for i in range(len(cutoffs)):
287 cur.execute("""update block_index set expired = ?
288 where round(? - timestamp) > ? and expired >= 0""",
289 (i, now, cutoffs[i]))
290 cur.execute("update block_index set expired = 0 where expired = -1")