cumulus-sync: Create a tool for copying snapshots between locations.
[cumulus.git] / python / cumulus / __init__.py
1 """High-level interface for working with LBS archives.
2
3 This module provides an easy interface for reading from and manipulating
4 various parts of an LBS archive:
5   - listing the snapshots and segments present
6   - reading segment contents
7   - parsing snapshot descriptors and snapshot metadata logs
8   - reading and maintaining the local object database
9 """
10
11 from __future__ import division
12 import os, re, sha, tarfile, tempfile, thread
13 from pysqlite2 import dbapi2 as sqlite3
14
15 import cumulus.store, cumulus.store.file
16
17 # The largest supported snapshot format that can be understood.
18 FORMAT_VERSION = (0, 8)         # LBS Snapshot v0.8
19
20 # Maximum number of nested indirect references allowed in a snapshot.
21 MAX_RECURSION_DEPTH = 3
22
23 # All segments which have been accessed this session.
24 accessed_segments = set()
25
26 def uri_decode(s):
27     """Decode a URI-encoded (%xx escapes) string."""
28     def hex_decode(m): return chr(int(m.group(1), 16))
29     return re.sub(r"%([0-9a-f]{2})", hex_decode, s)
30 def uri_encode(s):
31     """Encode a string to URI-encoded (%xx escapes) form."""
32     def hex_encode(c):
33         if c > '+' and c < '\x7f' and c != '@':
34             return c
35         else:
36             return "%%%02x" % (ord(c),)
37     return ''.join(hex_encode(c) for c in s)
38
39 class Struct:
40     """A class which merely acts as a data container.
41
42     Instances of this class (or its subclasses) are merely used to store data
43     in various attributes.  No methods are provided.
44     """
45
46     def __repr__(self):
47         return "<%s %s>" % (self.__class__, self.__dict__)
48
49 CHECKSUM_ALGORITHMS = {
50     'sha1': sha.new
51 }
52
53 class ChecksumCreator:
54     """Compute an LBS checksum for provided data.
55
56     The algorithm used is selectable, but currently defaults to sha1.
57     """
58
59     def __init__(self, algorithm='sha1'):
60         self.algorithm = algorithm
61         self.hash = CHECKSUM_ALGORITHMS[algorithm]()
62
63     def update(self, data):
64         self.hash.update(data)
65         return self
66
67     def compute(self):
68         return "%s=%s" % (self.algorithm, self.hash.hexdigest())
69
70 class ChecksumVerifier:
71     """Verify whether a checksum from a snapshot matches the supplied data."""
72
73     def __init__(self, checksumstr):
74         """Create an object to check the supplied checksum."""
75
76         (algo, checksum) = checksumstr.split("=", 1)
77         self.checksum = checksum
78         self.hash = CHECKSUM_ALGORITHMS[algo]()
79
80     def update(self, data):
81         self.hash.update(data)
82
83     def valid(self):
84         """Return a boolean indicating whether the checksum matches."""
85
86         result = self.hash.hexdigest()
87         return result == self.checksum
88
89 class LowlevelDataStore:
90     """Access to the backup store containing segments and snapshot descriptors.
91
92     Instances of this class are used to get direct filesystem-level access to
93     the backup data.  To read a backup, a caller will ordinarily not care about
94     direct access to backup segments, but will instead merely need to access
95     objects from those segments.  The ObjectStore class provides a suitable
96     wrapper around a DataStore to give this high-level access.
97     """
98
99     def __init__(self, path):
100         if isinstance(path, cumulus.store.Store):
101             self.store = path
102         elif path.find(":") >= 0:
103             self.store = cumulus.store.open(path)
104         else:
105             self.store = cumulus.store.file.FileStore(path)
106
107     def _classify(self, filename):
108         for (t, r) in cumulus.store.type_patterns.items():
109             if r.match(filename):
110                 return (t, filename)
111         return (None, filename)
112
113     def lowlevel_open(self, filename):
114         """Return a file-like object for reading data from the given file."""
115
116         (type, filename) = self._classify(filename)
117         return self.store.get(type, filename)
118
119     def lowlevel_stat(self, filename):
120         """Return a dictionary of information about the given file.
121
122         Currently, the only defined field is 'size', giving the size of the
123         file in bytes.
124         """
125
126         (type, filename) = self._classify(filename)
127         return self.store.stat(type, filename)
128
129     # Slightly higher-level list methods.
130     def list_snapshots(self):
131         for f in self.store.list('snapshots'):
132             m = cumulus.store.type_patterns['snapshots'].match(f)
133             if m: yield m.group(1)
134
135     def list_segments(self):
136         for f in self.store.list('segments'):
137             m = cumulus.store.type_patterns['segments'].match(f)
138             if m: yield m.group(1)
139
140 class ObjectStore:
141     def __init__(self, data_store):
142         self.store = data_store
143         self.cachedir = None
144         self.CACHE_SIZE = 16
145         self.lru_list = []
146
147     def get_cachedir(self):
148         if self.cachedir is None:
149             self.cachedir = tempfile.mkdtemp(".lbs")
150         return self.cachedir
151
152     def cleanup(self):
153         if self.cachedir is not None:
154             # TODO: Avoid use of system, make this safer
155             os.system("rm -rf " + self.cachedir)
156         self.cachedir = None
157
158     @staticmethod
159     def parse_ref(refstr):
160         m = re.match(r"^zero\[(\d+)\]$", refstr)
161         if m:
162             return ("zero", None, None, (0, int(m.group(1))))
163
164         m = re.match(r"^([-0-9a-f]+)\/([0-9a-f]+)(\(\S+\))?(\[(((\d+)\+)?(\d+)|=(\d+))\])?$", refstr)
165         if not m: return
166
167         segment = m.group(1)
168         object = m.group(2)
169         checksum = m.group(3)
170         slice = m.group(4)
171
172         if checksum is not None:
173             checksum = checksum.lstrip("(").rstrip(")")
174
175         if slice is not None:
176             if m.group(9) is not None:
177                 # Size-assertion slice
178                 slice = (0, int(m.group(9)), True)
179             elif m.group(6) is None:
180                 # Abbreviated slice
181                 slice = (0, int(m.group(8)), False)
182             else:
183                 slice = (int(m.group(7)), int(m.group(8)), False)
184
185         return (segment, object, checksum, slice)
186
187     def get_segment(self, segment):
188         accessed_segments.add(segment)
189         raw = self.store.lowlevel_open(segment + ".tar.gpg")
190
191         (input, output) = os.popen2("lbs-filter-gpg --decrypt")
192         def copy_thread(src, dst):
193             BLOCK_SIZE = 4096
194             while True:
195                 block = src.read(BLOCK_SIZE)
196                 if len(block) == 0: break
197                 dst.write(block)
198             dst.close()
199
200         thread.start_new_thread(copy_thread, (raw, input))
201         return output
202
203     def load_segment(self, segment):
204         seg = tarfile.open(segment, 'r|', self.get_segment(segment))
205         for item in seg:
206             data_obj = seg.extractfile(item)
207             path = item.name.split('/')
208             if len(path) == 2 and path[0] == segment:
209                 yield (path[1], data_obj.read())
210
211     def load_snapshot(self, snapshot):
212         file = self.store.lowlevel_open("snapshot-" + snapshot + ".lbs")
213         return file.read().splitlines(True)
214
215     def extract_segment(self, segment):
216         segdir = os.path.join(self.get_cachedir(), segment)
217         os.mkdir(segdir)
218         for (object, data) in self.load_segment(segment):
219             f = open(os.path.join(segdir, object), 'wb')
220             f.write(data)
221             f.close()
222
223     def load_object(self, segment, object):
224         accessed_segments.add(segment)
225         path = os.path.join(self.get_cachedir(), segment, object)
226         if not os.access(path, os.R_OK):
227             self.extract_segment(segment)
228         if segment in self.lru_list: self.lru_list.remove(segment)
229         self.lru_list.append(segment)
230         while len(self.lru_list) > self.CACHE_SIZE:
231             os.system("rm -rf " + os.path.join(self.cachedir, self.lru_list[0]))
232             self.lru_list = self.lru_list[1:]
233         return open(path, 'rb').read()
234
235     def get(self, refstr):
236         """Fetch the given object and return it.
237
238         The input should be an object reference, in string form.
239         """
240
241         (segment, object, checksum, slice) = self.parse_ref(refstr)
242
243         if segment == "zero":
244             return "\0" * slice[1]
245
246         data = self.load_object(segment, object)
247
248         if checksum is not None:
249             verifier = ChecksumVerifier(checksum)
250             verifier.update(data)
251             if not verifier.valid():
252                 raise ValueError
253
254         if slice is not None:
255             (start, length, exact) = slice
256             if exact and len(data) != length: raise ValueError
257             data = data[start:start+length]
258             if len(data) != length: raise IndexError
259
260         return data
261
262 def parse(lines, terminate=None):
263     """Generic parser for RFC822-style "Key: Value" data streams.
264
265     This parser can be used to read metadata logs and snapshot root descriptor
266     files.
267
268     lines must be an iterable object which yields a sequence of lines of input.
269
270     If terminate is specified, it is used as a predicate to determine when to
271     stop reading input lines.
272     """
273
274     dict = {}
275     last_key = None
276
277     for l in lines:
278         # Strip off a trailing newline, if present
279         if len(l) > 0 and l[-1] == "\n":
280             l = l[:-1]
281
282         if terminate is not None and terminate(l):
283             if len(dict) > 0: yield dict
284             dict = {}
285             last_key = None
286             continue
287
288         m = re.match(r"^([-\w]+):\s*(.*)$", l)
289         if m:
290             dict[m.group(1)] = m.group(2)
291             last_key = m.group(1)
292         elif len(l) > 0 and l[0].isspace() and last_key is not None:
293             dict[last_key] += l
294         else:
295             last_key = None
296
297     if len(dict) > 0: yield dict
298
299 def parse_full(lines):
300     try:
301         return parse(lines).next()
302     except StopIteration:
303         return {}
304
305 def parse_metadata_version(s):
306     """Convert a string with the snapshot version format to a tuple."""
307
308     m = re.match(r"^LBS Snapshot v(\d+(\.\d+)*)$", s)
309     if m is None:
310         return ()
311     else:
312         return tuple([int(d) for d in m.group(1).split(".")])
313
314 def read_metadata(object_store, root):
315     """Iterate through all lines in the metadata log, following references."""
316
317     # Stack for keeping track of recursion when following references to
318     # portions of the log.  The last entry in the stack corresponds to the
319     # object currently being parsed.  Each entry is a list of lines which have
320     # been reversed, so that popping successive lines from the end of each list
321     # will return lines of the metadata log in order.
322     stack = []
323
324     def follow_ref(refstr):
325         if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
326         lines = object_store.get(refstr).splitlines(True)
327         lines.reverse()
328         stack.append(lines)
329
330     follow_ref(root)
331
332     while len(stack) > 0:
333         top = stack[-1]
334         if len(top) == 0:
335             stack.pop()
336             continue
337         line = top.pop()
338
339         # An indirect reference which we must follow?
340         if len(line) > 0 and line[0] == '@':
341             ref = line[1:]
342             ref.strip()
343             follow_ref(ref)
344         else:
345             yield line
346
347 class MetadataItem:
348     """Metadata for a single file (or directory or...) from a snapshot."""
349
350     # Functions for parsing various datatypes that can appear in a metadata log
351     # item.
352     @staticmethod
353     def decode_int(s):
354         """Decode an integer, expressed in decimal, octal, or hexadecimal."""
355         if s.startswith("0x"):
356             return int(s, 16)
357         elif s.startswith("0"):
358             return int(s, 8)
359         else:
360             return int(s, 10)
361
362     @staticmethod
363     def decode_str(s):
364         """Decode a URI-encoded (%xx escapes) string."""
365         return uri_decode(s)
366
367     @staticmethod
368     def raw_str(s):
369         """An unecoded string."""
370         return s
371
372     @staticmethod
373     def decode_user(s):
374         """Decode a user/group to a tuple of uid/gid followed by name."""
375         items = s.split()
376         uid = MetadataItem.decode_int(items[0])
377         name = None
378         if len(items) > 1:
379             if items[1].startswith("(") and items[1].endswith(")"):
380                 name = MetadataItem.decode_str(items[1][1:-1])
381         return (uid, name)
382
383     @staticmethod
384     def decode_device(s):
385         """Decode a device major/minor number."""
386         (major, minor) = map(MetadataItem.decode_int, s.split("/"))
387         return (major, minor)
388
389     class Items: pass
390
391     def __init__(self, fields, object_store):
392         """Initialize from a dictionary of key/value pairs from metadata log."""
393
394         self.fields = fields
395         self.object_store = object_store
396         self.keys = []
397         self.items = self.Items()
398         for (k, v) in fields.items():
399             if k in self.field_types:
400                 decoder = self.field_types[k]
401                 setattr(self.items, k, decoder(v))
402                 self.keys.append(k)
403
404     def data(self):
405         """Return an iterator for the data blocks that make up a file."""
406
407         # This traverses the list of blocks that make up a file, following
408         # indirect references.  It is implemented in much the same way as
409         # read_metadata, so see that function for details of the technique.
410
411         objects = self.fields['data'].split()
412         objects.reverse()
413         stack = [objects]
414
415         def follow_ref(refstr):
416             if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
417             objects = self.object_store.get(refstr).split()
418             objects.reverse()
419             stack.append(objects)
420
421         while len(stack) > 0:
422             top = stack[-1]
423             if len(top) == 0:
424                 stack.pop()
425                 continue
426             ref = top.pop()
427
428             # An indirect reference which we must follow?
429             if len(ref) > 0 and ref[0] == '@':
430                 follow_ref(ref[1:])
431             else:
432                 yield ref
433
434 # Description of fields that might appear, and how they should be parsed.
435 MetadataItem.field_types = {
436     'name': MetadataItem.decode_str,
437     'type': MetadataItem.raw_str,
438     'mode': MetadataItem.decode_int,
439     'device': MetadataItem.decode_device,
440     'user': MetadataItem.decode_user,
441     'group': MetadataItem.decode_user,
442     'ctime': MetadataItem.decode_int,
443     'mtime': MetadataItem.decode_int,
444     'links': MetadataItem.decode_int,
445     'inode': MetadataItem.raw_str,
446     'checksum': MetadataItem.decode_str,
447     'size': MetadataItem.decode_int,
448     'contents': MetadataItem.decode_str,
449     'target': MetadataItem.decode_str,
450 }
451
452 def iterate_metadata(object_store, root):
453     for d in parse(read_metadata(object_store, root), lambda l: len(l) == 0):
454         yield MetadataItem(d, object_store)
455
456 class LocalDatabase:
457     """Access to the local database of snapshot contents and object checksums.
458
459     The local database is consulted when creating a snapshot to determine what
460     data can be re-used from old snapshots.  Segment cleaning is performed by
461     manipulating the data in the local database; the local database also
462     includes enough data to guide the segment cleaning process.
463     """
464
465     def __init__(self, path, dbname="localdb.sqlite"):
466         self.db_connection = sqlite3.connect(path + "/" + dbname)
467
468     # Low-level database access.  Use these methods when there isn't a
469     # higher-level interface available.  Exception: do, however, remember to
470     # use the commit() method after making changes to make sure they are
471     # actually saved, even when going through higher-level interfaces.
472     def commit(self):
473         "Commit any pending changes to the local database."
474         self.db_connection.commit()
475
476     def rollback(self):
477         "Roll back any pending changes to the local database."
478         self.db_connection.rollback()
479
480     def cursor(self):
481         "Return a DB-API cursor for directly accessing the local database."
482         return self.db_connection.cursor()
483
484     def list_schemes(self):
485         """Return the list of snapshots found in the local database.
486
487         The returned value is a list of tuples (id, scheme, name, time, intent).
488         """
489
490         cur = self.cursor()
491         cur.execute("select distinct scheme from snapshots")
492         schemes = [row[0] for row in cur.fetchall()]
493         schemes.sort()
494         return schemes
495
496     def garbage_collect(self, scheme, intent=1.0):
497         """Delete entries from old snapshots from the database.
498
499         Only snapshots with the specified scheme name will be deleted.  If
500         intent is given, it gives the intended next snapshot type, to determine
501         how aggressively to clean (for example, intent=7 could be used if the
502         next snapshot will be a weekly snapshot).
503         """
504
505         cur = self.cursor()
506
507         # Find the id of the last snapshot to be created.  This is used for
508         # measuring time in a way: we record this value in each segment we
509         # expire on this run, and then on a future run can tell if there have
510         # been intervening backups made.
511         cur.execute("select max(snapshotid) from snapshots")
512         last_snapshotid = cur.fetchone()[0]
513
514         # Get the list of old snapshots for this scheme.  Delete all the old
515         # ones.  Rules for what to keep:
516         #   - Always keep the most recent snapshot.
517         #   - If snapshot X is younger than Y, and X has higher intent, then Y
518         #     can be deleted.
519         cur.execute("""select snapshotid, name, intent,
520                               julianday('now') - timestamp as age
521                        from snapshots where scheme = ?
522                        order by age""", (scheme,))
523
524         first = True
525         max_intent = intent
526         for (id, name, snap_intent, snap_age) in cur.fetchall():
527             can_delete = False
528             if snap_intent < max_intent:
529                 # Delete small-intent snapshots if there is a more recent
530                 # large-intent snapshot.
531                 can_delete = True
532             elif snap_intent == intent:
533                 # Delete previous snapshots with the specified intent level.
534                 can_delete = True
535
536             if can_delete and not first:
537                 print "Delete snapshot %d (%s)" % (id, name)
538                 cur.execute("delete from snapshots where snapshotid = ?",
539                             (id,))
540             first = False
541             max_intent = max(max_intent, snap_intent)
542
543         # Delete entries in the segments_used table which are for non-existent
544         # snapshots.
545         cur.execute("""delete from segments_used
546                        where snapshotid not in
547                            (select snapshotid from snapshots)""")
548
549         # Find segments which contain no objects used by any current snapshots,
550         # and delete them from the segment table.
551         cur.execute("""delete from segments where segmentid not in
552                            (select segmentid from segments_used)""")
553
554         # Delete unused objects in the block_index table.  By "unused", we mean
555         # any object which was stored in a segment which has been deleted, and
556         # any object in a segment which was marked for cleaning and has had
557         # cleaning performed already (the expired time is less than the current
558         # largest snapshot id).
559         cur.execute("""delete from block_index
560                        where segmentid not in (select segmentid from segments)
561                           or segmentid in (select segmentid from segments
562                                            where expire_time < ?)""",
563                     (last_snapshotid,))
564
565         # Remove sub-block signatures for deleted objects.
566         cur.execute("""delete from subblock_signatures
567                        where blockid not in
568                            (select blockid from block_index)""")
569
570     # Segment cleaning.
571     class SegmentInfo(Struct): pass
572
573     def get_segment_cleaning_list(self, age_boost=0.0):
574         """Return a list of all current segments with information for cleaning.
575
576         Return all segments which are currently known in the local database
577         (there might be other, older segments in the archive itself), and
578         return usage statistics for each to help decide which segments to
579         clean.
580
581         The returned list will be sorted by estimated cleaning benefit, with
582         segments that are best to clean at the start of the list.
583
584         If specified, the age_boost parameter (measured in days) will added to
585         the age of each segment, as a way of adjusting the benefit computation
586         before a long-lived snapshot is taken (for example, age_boost might be
587         set to 7 when cleaning prior to taking a weekly snapshot).
588         """
589
590         cur = self.cursor()
591         segments = []
592         cur.execute("""select segmentid, used, size, mtime,
593                        julianday('now') - mtime as age from segment_info
594                        where expire_time is null""")
595         for row in cur:
596             info = self.SegmentInfo()
597             info.id = row[0]
598             info.used_bytes = row[1]
599             info.size_bytes = row[2]
600             info.mtime = row[3]
601             info.age_days = row[4]
602
603             # If data is not available for whatever reason, treat it as 0.0.
604             if info.age_days is None:
605                 info.age_days = 0.0
606             if info.used_bytes is None:
607                 info.used_bytes = 0.0
608
609             # Benefit calculation: u is the estimated fraction of each segment
610             # which is utilized (bytes belonging to objects still in use
611             # divided by total size; this doesn't take compression or storage
612             # overhead into account, but should give a reasonable estimate).
613             #
614             # The total benefit is a heuristic that combines several factors:
615             # the amount of space that can be reclaimed (1 - u), an ageing
616             # factor (info.age_days) that favors cleaning old segments to young
617             # ones and also is more likely to clean segments that will be
618             # rewritten for long-lived snapshots (age_boost), and finally a
619             # penalty factor for the cost of re-uploading data (u + 0.1).
620             u = info.used_bytes / info.size_bytes
621             info.cleaning_benefit \
622                 = (1 - u) * (info.age_days + age_boost) / (u + 0.1)
623
624             segments.append(info)
625
626         segments.sort(cmp, key=lambda s: s.cleaning_benefit, reverse=True)
627         return segments
628
629     def mark_segment_expired(self, segment):
630         """Mark a segment for cleaning in the local database.
631
632         The segment parameter should be either a SegmentInfo object or an
633         integer segment id.  Objects in the given segment will be marked as
634         expired, which means that any future snapshots that would re-use those
635         objects will instead write out a new copy of the object, and thus no
636         future snapshots will depend upon the given segment.
637         """
638
639         if isinstance(segment, int):
640             id = segment
641         elif isinstance(segment, self.SegmentInfo):
642             id = segment.id
643         else:
644             raise TypeError("Invalid segment: %s, must be of type int or SegmentInfo, not %s" % (segment, type(segment)))
645
646         cur = self.cursor()
647         cur.execute("select max(snapshotid) from snapshots")
648         last_snapshotid = cur.fetchone()[0]
649         cur.execute("update segments set expire_time = ? where segmentid = ?",
650                     (last_snapshotid, id))
651         cur.execute("update block_index set expired = 0 where segmentid = ?",
652                     (id,))
653
654     def balance_expired_objects(self):
655         """Analyze expired objects in segments to be cleaned and group by age.
656
657         Update the block_index table of the local database to group expired
658         objects by age.  The exact number of buckets and the cutoffs for each
659         are dynamically determined.  Calling this function after marking
660         segments expired will help in the segment cleaning process, by ensuring
661         that when active objects from clean segments are rewritten, they will
662         be placed into new segments roughly grouped by age.
663         """
664
665         # The expired column of the block_index table is used when generating a
666         # new LBS snapshot.  A null value indicates that an object may be
667         # re-used.  Otherwise, an object must be written into a new segment if
668         # needed.  Objects with distinct expired values will be written into
669         # distinct segments, to allow for some grouping by age.  The value 0 is
670         # somewhat special in that it indicates any rewritten objects can be
671         # placed in the same segment as completely new objects; this can be
672         # used for very young objects which have been expired, or objects not
673         # expected to be encountered.
674         #
675         # In the balancing process, all objects which are not used in any
676         # current snapshots will have expired set to 0.  Objects which have
677         # been seen will be sorted by age and will have expired values set to
678         # 0, 1, 2, and so on based on age (with younger objects being assigned
679         # lower values).  The number of buckets and the age cutoffs is
680         # determined by looking at the distribution of block ages.
681
682         cur = self.cursor()
683
684         # Mark all expired objects with expired = 0; these objects will later
685         # have values set to indicate groupings of objects when repacking.
686         cur.execute("""update block_index set expired = 0
687                        where expired is not null""")
688
689         # We will want to aim for at least one full segment for each bucket
690         # that we eventually create, but don't know how many bytes that should
691         # be due to compression.  So compute the average number of bytes in
692         # each expired segment as a rough estimate for the minimum size of each
693         # bucket.  (This estimate could be thrown off by many not-fully-packed
694         # segments, but for now don't worry too much about that.)  If we can't
695         # compute an average, it's probably because there are no expired
696         # segments, so we have no more work to do.
697         cur.execute("""select avg(size) from segments
698                        where segmentid in
699                            (select distinct segmentid from block_index
700                             where expired is not null)""")
701         segment_size_estimate = cur.fetchone()[0]
702         if not segment_size_estimate:
703             return
704
705         # Next, extract distribution of expired objects (number and size) by
706         # age.  Save the timestamp for "now" so that the classification of
707         # blocks into age buckets will not change later in the function, after
708         # time has passed.  Set any timestamps in the future to now, so we are
709         # guaranteed that for the rest of this function, age is always
710         # non-negative.
711         cur.execute("select julianday('now')")
712         now = cur.fetchone()[0]
713
714         cur.execute("""update block_index set timestamp = ?
715                        where timestamp > ? and expired is not null""",
716                     (now, now))
717
718         cur.execute("""select round(? - timestamp) as age, count(*), sum(size)
719                        from block_index where expired = 0
720                        group by age order by age""", (now,))
721         distribution = cur.fetchall()
722
723         # Start to determine the buckets for expired objects.  Heuristics used:
724         #   - An upper bound on the number of buckets is given by the number of
725         #     segments we estimate it will take to store all data.  In fact,
726         #     aim for a couple of segments per bucket.
727         #   - Place very young objects in bucket 0 (place with new objects)
728         #     unless there are enough of them to warrant a separate bucket.
729         #   - Try not to create unnecessarily many buckets, since fewer buckets
730         #     will allow repacked data to be grouped based on spatial locality
731         #     (while more buckets will group by temporal locality).  We want a
732         #     balance.
733         MIN_AGE = 4
734         total_bytes = sum([i[2] for i in distribution])
735         target_buckets = 2 * (total_bytes / segment_size_estimate) ** 0.4
736         min_size = 1.5 * segment_size_estimate
737         target_size = max(2 * segment_size_estimate,
738                           total_bytes / target_buckets)
739
740         print "segment_size:", segment_size_estimate
741         print "distribution:", distribution
742         print "total_bytes:", total_bytes
743         print "target_buckets:", target_buckets
744         print "min, target size:", min_size, target_size
745
746         # Chosen cutoffs.  Each bucket consists of objects with age greater
747         # than one cutoff value, but not greater than the next largest cutoff.
748         cutoffs = []
749
750         # Starting with the oldest objects, begin grouping together into
751         # buckets of size at least target_size bytes.
752         distribution.reverse()
753         bucket_size = 0
754         min_age_bucket = False
755         for (age, items, size) in distribution:
756             if bucket_size >= target_size \
757                 or (age < MIN_AGE and not min_age_bucket):
758                 if bucket_size < target_size and len(cutoffs) > 0:
759                     cutoffs.pop()
760                 cutoffs.append(age)
761                 bucket_size = 0
762
763             bucket_size += size
764             if age < MIN_AGE:
765                 min_age_bucket = True
766
767         # The last (youngest) bucket will be group 0, unless it has enough data
768         # to be of size min_size by itself, or there happen to be no objects
769         # less than MIN_AGE at all.
770         if bucket_size >= min_size or not min_age_bucket:
771             cutoffs.append(-1)
772         cutoffs.append(-1)
773
774         print "cutoffs:", cutoffs
775
776         # Update the database to assign each object to the appropriate bucket.
777         cutoffs.reverse()
778         for i in range(len(cutoffs)):
779             cur.execute("""update block_index set expired = ?
780                            where round(? - timestamp) > ?
781                              and expired is not null""",
782                         (i, now, cutoffs[i]))