Add retries to the S3 backend in the cleaner.
[bluesky.git] / cleaner / cleaner
1 #!/usr/bin/env python
2 #
3 # A simple cleaner for the BlueSky cloud file system.  At the moment this is an
4 # offline cleaner--the file system must not be in use at the time that the
5 # cleaning is performed.  Later, it will be extended to be an online/concurrent
6 # cleaner, where cleaning is performed even while the file system is mounted.
7 #
8 # Copyright (C) 2010  The Regents of the University of California
9 # Written by Michael Vrable <mvrable@cs.ucsd.edu>
10
11 import base64, os, re, struct, sys
12 import boto
13 from boto.s3.key import Key
14
15 # The BlueSky 'struct cloudlog_header' data type.
16 HEADER_FORMAT = '<4s48sb16sQIII'
17 HEADER_CRYPTBYTES = 48
18 HEADER_MAGIC1 = 'AgI-'          # Unencrypted data
19 HEADER_MAGIC2 = 'AgI='          # Encrypted data
20 HEADER_SIZE = struct.calcsize(HEADER_FORMAT)
21
22 class ITEM_TYPE:
23     DATA = '1'
24     INODE = '2'
25     INODE_MAP = '3'
26     CHECKPOINT = '4'
27
28 class FileBackend:
29     """An interface to BlueSky where the log segments are on local disk.
30
31     This is mainly intended for testing purposes, as the real cleaner would
32     operate where data is being stored in S3."""
33
34     def __init__(self, path):
35         self.path = path
36
37     def list(self):
38         """Return a listing of all log segments and their sizes."""
39
40         files = [f for f in os.listdir(self.path) if f.startswith('log-')]
41         files.sort()
42
43         return [(f, os.stat(os.path.join(self.path, f)).st_size)
44                 for f in files]
45
46     def read(self, filename, offset=0, length=None):
47         fp = open(os.path.join(self.path, filename), 'rb')
48         if offset > 0:
49             fp.seek(offset)
50         if legnth is None:
51             return fp.read()
52         else:
53             return fp.read(length)
54
55     def write(self, filename, data):
56         fp = open(os.path.join(self.path, filename), 'wb')
57         fp.write(data)
58         fp.close()
59
60     def delete(self, filename):
61         os.unlink(os.path.join(self.path, filename))
62
63     def loc_to_name(self, location):
64         return "log-%08d-%08d" % (location)
65
66     def name_to_loc(self, name):
67         m = re.match(r"^log-(\d+)-(\d+)$", name)
68         if m: return (int(m.group(1)), int(m.group(2)))
69
70 def retry_wrap(method):
71     def wrapped(self, *args, **kwargs):
72         for retries in range(3):
73             try:
74                 return method(self, *args, **kwargs)
75             except:
76                 print >>sys.stderr, "S3 operation failed, retrying..."
77                 self.connect()
78         return method(self, *args, **kwargs)
79     return wrapped
80
81 class S3Backend:
82     """An interface to BlueSky where the log segments are on in Amazon S3."""
83
84     def __init__(self, bucket, path='', cachedir="."):
85         self.bucket_name = bucket
86         self.path = path
87         self.cachedir = cachedir
88         self.cache = {}
89         self.connect()
90
91     def connect(self):
92         self.conn = boto.connect_s3(is_secure=False)
93         self.bucket = self.conn.get_bucket(self.bucket_name)
94
95     def list(self):
96         files = []
97         for k in self.bucket.list(self.path + 'log-'):
98             files.append((k.key, k.size))
99         return files
100
101     @retry_wrap
102     def read(self, filename, offset=0, length=None):
103         if filename in self.cache:
104             fp = open(os.path.join(self.cachedir, filename), 'rb')
105             if offset > 0:
106                 fp.seek(offset)
107             if length is None:
108                 return fp.read()
109             else:
110                 return fp.read(length)
111         else:
112             k = Key(self.bucket)
113             k.key = self.path + filename
114             data = k.get_contents_as_string()
115             fp = open(os.path.join(self.cachedir, filename), 'wb')
116             fp.write(data)
117             fp.close()
118             self.cache[filename] = True
119             if offset > 0:
120                 data = data[offset:]
121             if length is not None:
122                 data = data[0:length]
123             return data
124
125     @retry_wrap
126     def write(self, filename, data):
127         k = Key(self.bucket)
128         k.key = self.path + filename
129         k.set_contents_from_string(data)
130         if filename in self.cache:
131             del self.cache[filename]
132
133     @retry_wrap
134     def delete(self, filename):
135         k = Key(self.bucket)
136         k.key = self.path + filename
137         k.delete()
138         if filename in self.cache:
139             del self.cache[filename]
140
141     def loc_to_name(self, location):
142         return "log-%08d-%08d" % (location)
143
144     def name_to_loc(self, name):
145         m = re.match(r"^log-(\d+)-(\d+)$", name)
146         if m: return (int(m.group(1)), int(m.group(2)))
147
148 class LogItem:
149     """In-memory representation of a single item stored in a log file."""
150
151     def __init__(self):
152         self.cryptkeys = '\0' * HEADER_CRYPTBYTES
153         self.encrypted = False
154
155     def __str__(self):
156         return "<Item%s ty=%s location=%s size=%d id=%s...>" % (self.encrypted and '$' or '', self.type, self.location, self.size, base64.b16encode(self.id).lower()[0:8])
157
158     @staticmethod
159     def random_id():
160         return open('/dev/urandom').read(16)
161
162     def serialize(self):
163         link_ids = []
164         link_locs = []
165         for (i, l) in self.links:
166             link_ids.append(i)
167             if i != '\0' * 16:
168                 link_locs.append(struct.pack('<IIII', *l))
169         link_ids = ''.join(link_ids)
170         link_locs = ''.join(link_locs)
171
172         if self.encrypted:
173             magic = HEADER_MAGIC2
174         else:
175             magic = HEADER_MAGIC1
176         header = struct.pack(HEADER_FORMAT,
177                              magic, self.cryptkeys,
178                              ord(self.type), self.id, self.inum,
179                              len(self.data), len(link_ids), len(link_locs))
180         return header + self.data + link_ids + link_locs
181
182 class LogSegment:
183     def __init__(self, backend, location):
184         self.backend = backend
185         self.location = location
186         self.data = []
187
188     def __len__(self):
189         return sum(len(s) for s in self.data)
190
191     def write(self, item):
192         data = item.serialize()
193         offset = len(self)
194         self.data.append(data)
195         item.location = self.location + (offset, len(data))
196
197     def close(self):
198         data = ''.join(self.data)
199         filename = self.backend.loc_to_name(self.location)
200         print "Would write %d bytes of data to %s" % (len(data), filename)
201         self.backend.write(filename, data)
202
203 class LogDirectory:
204     TARGET_SIZE = 4 << 20
205
206     def __init__(self, backend, dir):
207         self.backend = backend
208         self.dir_num = dir
209         self.seq_num = 0
210         for logname in backend.list():
211             loc = backend.name_to_loc(logname[0])
212             if loc is not None and loc[0] == dir:
213                 self.seq_num = max(self.seq_num, loc[1] + 1)
214         self.groups = {}
215         print "Starting sequence number is", self.seq_num
216
217     def open_segment(self):
218         seg = LogSegment(self.backend, (self.dir_num, self.seq_num))
219         self.seq_num += 1
220         return seg
221
222     def write(self, item, segment_group=0):
223         if segment_group not in self.groups:
224             self.groups[segment_group] = self.open_segment()
225         seg = self.groups[segment_group]
226         seg.write(item)
227         if len(seg) >= LogDirectory.TARGET_SIZE:
228             seg.close()
229             del self.groups[segment_group]
230
231     def close_all(self):
232         for k in list(self.groups.keys()):
233             self.groups[k].close()
234             del self.groups[k]
235
236 class UtilizationTracker:
237     """A simple object that tracks what fraction of each segment is used.
238
239     This data can be used to guide segment cleaning decisions."""
240
241     def __init__(self, backend):
242         self.segments = {}
243         for (segment, size) in backend.list():
244             self.segments[segment] = [size, 0]
245
246     def add_item(self, item):
247         if isinstance(item, LogItem):
248             item = item.location
249         if item is None: return
250         (dir, seq, offset, size) = item
251         filename = "log-%08d-%08d" % (dir, seq)
252         self.segments[filename][1] += size
253
254 def parse_item(data):
255     if len(data) < HEADER_SIZE: return
256     header = struct.unpack_from(HEADER_FORMAT, data, 0)
257     size = HEADER_SIZE + sum(header[5:8])
258
259     if header[0] not in (HEADER_MAGIC1, HEADER_MAGIC2):
260         print "Bad header magic!"
261         return
262
263     if len(data) != size:
264         print "Item size does not match: %d != %d" % (size, len(data))
265         return
266
267     item = LogItem()
268     if header[0] == HEADER_MAGIC2: item.encrypted = True
269     item.cryptkeys = header[1]
270     item.id = header[3]
271     item.inum = header[4]
272     item.location = None
273     item.type = chr(header[2])
274     item.size = size
275     item.data = data[HEADER_SIZE : HEADER_SIZE + header[5]]
276     links = []
277     link_ids = data[HEADER_SIZE + header[5]
278                     : HEADER_SIZE + header[5] + header[6]]
279     link_locs = data[HEADER_SIZE + header[5] + header[6]
280                      : HEADER_SIZE + sum(header[5:8])]
281     for i in range(len(link_ids) // 16):
282         id = link_ids[16*i : 16*i + 16]
283         if id == '\0' * 16:
284             loc = None
285         else:
286             loc = struct.unpack('<IIII', link_locs[0:16])
287             link_locs = link_locs[16:]
288         links.append((id, loc))
289     item.links = links
290     return item
291
292 def load_item(backend, location):
293     """Load the cloud item pointed at by the 4-tuple 'location'.
294
295     The elements of the tuple are (directory, sequence, offset, size)."""
296
297     filename = backend.loc_to_name((location[0], location[1]))
298     data = backend.read(filename, location[2], location[3])
299     item = parse_item(data)
300     item.location = location
301     return item
302
303 def parse_log(data, location=None):
304     """Parse contents of a log file, yielding a sequence of log items."""
305
306     if isinstance(location, str):
307         m = re.match(r"^log-(\d+)-(\d+)$", location)
308         if m:
309             location = (int(m.group(1)), int(m.group(2)))
310         else:
311             location = None
312
313     offset = 0
314     while len(data) - offset >= HEADER_SIZE:
315         header = struct.unpack_from(HEADER_FORMAT, data, offset)
316         size = HEADER_SIZE + sum(header[5:8])
317         if header[0] not in (HEADER_MAGIC1, HEADER_MAGIC2):
318             print "Bad header magic!"
319             break
320         if size + offset > len(data):
321             print "Short data record at end of log: %s < %s" % (len(data) - offset, size)
322             break
323         item = parse_item(data[offset : offset + size])
324         if location is not None:
325             item.location = location + (offset, size)
326         if item is not None: yield item
327         offset += size
328
329 def load_checkpoint_record(backend):
330     for (log, size) in reversed(backend.list()):
331         for item in reversed(list(parse_log(backend.read(log), log))):
332             print item
333             if item.type == ITEM_TYPE.CHECKPOINT:
334                 return item
335
336 class InodeMap:
337     def __init__(self):
338         pass
339
340     def build(self, backend, checkpoint_record):
341         """Reconstruct the inode map from the checkpoint record given.
342
343         This will also build up information about segment utilization."""
344
345         self.checkpoint_record = checkpoint_record
346
347         util = UtilizationTracker(backend)
348         util.add_item(checkpoint_record)
349         inodes = {}
350         self.obsolete_segments = set()
351
352         print "Inode map:"
353         for i in range(len(checkpoint_record.data) // 16):
354             (start, end) = struct.unpack_from("<QQ", checkpoint_record.data, 16*i)
355             imap = load_item(backend, checkpoint_record.links[i][1])
356             util.add_item(imap)
357             print "[%d, %d]: %s" % (start, end, imap)
358             for j in range(len(imap.data) // 8):
359                 (inum,) = struct.unpack_from("<Q", imap.data, 8*j)
360                 inode = load_item(backend, imap.links[j][1])
361                 inodes[inum] = inode
362                 data_segments = set()
363                 util.add_item(inode)
364                 for i in inode.links:
365                     util.add_item(i[1])
366                     data_segments.add(i[1][0:2])
367                 print "  %d: %s (%d data segments)" % (inum, inode, len(data_segments))
368
369         print
370         print "Segment utilizations:"
371         for (s, u) in sorted(util.segments.items()):
372             print "%s: %s %s" % (s, u, float(u[1]) / u[0])
373             if u[1] == 0:
374                 print "Deleting..."
375                 backend.delete(s)
376
377         self.inodes = inodes
378         self.util = util
379         self.updated_inodes = set()
380
381     def mark_updated(self, inum):
382         self.updated_inodes.add(inum)
383
384     def write(self, backend, log):
385         updated_inodes = sorted(self.updated_inodes, reverse=True)
386
387         new_checkpoint = LogItem()
388         new_checkpoint.id = LogItem.random_id()
389         new_checkpoint.inum = 0
390         new_checkpoint.type = ITEM_TYPE.CHECKPOINT
391         new_checkpoint.data = ""
392         new_checkpoint.links = []
393
394         for i in range(len(self.checkpoint_record.data) // 16):
395             (start, end) = struct.unpack_from("<QQ", self.checkpoint_record.data, 16*i)
396
397             new_checkpoint.data += self.checkpoint_record.data[16*i : 16*i + 16]
398
399             # Case 1: No inodes in this range of the old inode map have
400             # changed.  Simply emit a new pointer to the same inode map block.
401             if len(updated_inodes) == 0 or updated_inodes[-1] > end:
402                 old_location = self.checkpoint_record.links[i][1][0:2]
403                 if old_location not in self.obsolete_segments:
404                     new_checkpoint.links.append(self.checkpoint_record.links[i])
405                     continue
406
407             # Case 2: Some inodes have been updated.  Create a new inode map
408             # block, write it out, and point the new checkpoint at it.
409             inodes = [k for k in self.inodes if k >= start and k <= end]
410             inodes.sort()
411
412             block = LogItem()
413             block.id = LogItem.random_id()
414             block.inum = 0
415             block.type = ITEM_TYPE.INODE_MAP
416             block.links = []
417             block.data = ""
418             for j in inodes:
419                 block.data += struct.pack("<Q", j)
420                 block.links.append((self.inodes[j].id, self.inodes[j].location))
421             log.write(block, 2)
422
423             new_checkpoint.links.append((block.id, block.location))
424
425             while len(updated_inodes) > 0 and updated_inodes[-1] <= end:
426                 updated_inodes.pop()
427
428         log.write(new_checkpoint, 2)
429         self.checkpoint_record = new_checkpoint
430
431 def rewrite_inode(backend, inode_map, inum, log, copy_data=True):
432     inode = inode_map.inodes[inum]
433     if copy_data:
434         blocks = []
435         for l in inode.links:
436             data = load_item(backend, l[1])
437             blocks.append(data)
438             log.write(data, 0)
439         inode.links = [(b.id, b.location) for b in blocks]
440     log.write(inode, 1)
441     inode_map.mark_updated(inum)
442
443 def run_cleaner(backend, inode_map, log, repack_inodes=False):
444     # Determine which segments are poorly utilized and should be cleaned.  We
445     # need better heuristics here.
446     for (s, u) in sorted(inode_map.util.segments.items()):
447         if (float(u[1]) / u[0] < 0.6 or u[1] < 32768) and u[1] > 0:
448             print "Should clean segment", s
449             loc = backend.name_to_loc(s)
450             if s: inode_map.obsolete_segments.add(loc)
451
452     # TODO: We probably also want heuristics that will find inodes with
453     # badly-fragmented data and rewrite that to achieve better locality.
454
455     # Given that list of segments to clean, scan through those segments to find
456     # data which is still live and mark relevant inodes as needing to be
457     # rewritten.
458     if repack_inodes:
459         dirty_inodes = set(inode_map.inodes)
460     else:
461         dirty_inodes = set()
462     dirty_inode_data = set()
463     for s in inode_map.obsolete_segments:
464         filename = backend.loc_to_name(s)
465         print "Scanning", filename, "for live data"
466         for item in parse_log(backend.read(filename), filename):
467             if item.type in (ITEM_TYPE.DATA, ITEM_TYPE.INODE):
468                 if item.inum != 0:
469                     inode = inode_map.inodes[item.inum]
470                     if s == inode.location[0:2]:
471                         dirty_inodes.add(item.inum)
472                     if item.inum not in dirty_inode_data:
473                         for b in inode.links:
474                             if s == b[1][0:2]:
475                                 dirty_inode_data.add(item.inum)
476                                 break
477
478     print "Inodes to rewrite:", dirty_inodes
479     print "Inodes with data to rewrite:", dirty_inode_data
480     for i in sorted(dirty_inodes.union(dirty_inode_data)):
481         rewrite_inode(backend, inode_map, i, log, i in dirty_inode_data)
482
483 if __name__ == '__main__':
484     backend = S3Backend("mvrable-bluesky", cachedir=".")
485     chkpt = load_checkpoint_record(backend)
486     print backend.list()
487     imap = InodeMap()
488     imap.build(backend, chkpt)
489     print chkpt
490
491     log_dir = LogDirectory(backend, 0)
492     run_cleaner(backend, imap, log_dir)
493     imap.write(backend, log_dir)
494     log_dir.close_all()