Merge git+ssh://c09-44.sysnet.ucsd.edu/scratch/bluesky
[bluesky.git] / cleaner / cleaner
1 #!/usr/bin/env python
2 #
3 # A simple cleaner for the BlueSky cloud file system.  At the moment this is an
4 # offline cleaner--the file system must not be in use at the time that the
5 # cleaning is performed.  Later, it will be extended to be an online/concurrent
6 # cleaner, where cleaning is performed even while the file system is mounted.
7 #
8 # Copyright (C) 2010  The Regents of the University of California
9 # Written by Michael Vrable <mvrable@cs.ucsd.edu>
10
11 import base64, os, re, struct, sys, time
12 import boto
13 from boto.s3.key import Key
14
15 # The BlueSky 'struct cloudlog_header' data type.
16 HEADER_FORMAT = '<4s48sb16sQIII'
17 HEADER_CRYPTBYTES = 48
18 HEADER_MAGIC1 = 'AgI-'          # Unencrypted data
19 HEADER_MAGIC2 = 'AgI='          # Encrypted data
20 HEADER_SIZE = struct.calcsize(HEADER_FORMAT)
21
22 class ITEM_TYPE:
23     DATA = '1'
24     INODE = '2'
25     INODE_MAP = '3'
26     CHECKPOINT = '4'
27
28 class FileBackend:
29     """An interface to BlueSky where the log segments are on local disk.
30
31     This is mainly intended for testing purposes, as the real cleaner would
32     operate where data is being stored in S3."""
33
34     def __init__(self, path):
35         self.path = path
36
37     def list(self):
38         """Return a listing of all log segments and their sizes."""
39
40         files = [f for f in os.listdir(self.path) if f.startswith('log-')]
41         files.sort()
42
43         return [(f, os.stat(os.path.join(self.path, f)).st_size)
44                 for f in files]
45
46     def read(self, filename, offset=0, length=None):
47         fp = open(os.path.join(self.path, filename), 'rb')
48         if offset > 0:
49             fp.seek(offset)
50         if legnth is None:
51             return fp.read()
52         else:
53             return fp.read(length)
54
55     def write(self, filename, data):
56         fp = open(os.path.join(self.path, filename), 'wb')
57         fp.write(data)
58         fp.close()
59
60     def delete(self, filename):
61         os.unlink(os.path.join(self.path, filename))
62
63     def loc_to_name(self, location):
64         return "log-%08d-%08d" % (location)
65
66     def name_to_loc(self, name):
67         m = re.match(r"^log-(\d+)-(\d+)$", name)
68         if m: return (int(m.group(1)), int(m.group(2)))
69
70 def retry_wrap(method):
71     def wrapped(self, *args, **kwargs):
72         for retries in range(3):
73             try:
74                 return method(self, *args, **kwargs)
75             except:
76                 print >>sys.stderr, "S3 operation failed, retrying..."
77                 self.connect()
78                 time.sleep(1.0)
79         return method(self, *args, **kwargs)
80     return wrapped
81
82 class S3Backend:
83     """An interface to BlueSky where the log segments are on in Amazon S3."""
84
85     def __init__(self, bucket, path='', cachedir="."):
86         self.bucket_name = bucket
87         self.path = path
88         self.cachedir = cachedir
89         self.cache = {}
90         self.connect()
91
92     def connect(self):
93         self.conn = boto.connect_s3(is_secure=False)
94         self.bucket = self.conn.get_bucket(self.bucket_name)
95
96     def list(self):
97         files = []
98         for k in self.bucket.list(self.path + 'log-'):
99             files.append((k.key, k.size))
100         return files
101
102     @retry_wrap
103     def read(self, filename, offset=0, length=None):
104         if filename in self.cache:
105             fp = open(os.path.join(self.cachedir, filename), 'rb')
106             if offset > 0:
107                 fp.seek(offset)
108             if length is None:
109                 return fp.read()
110             else:
111                 return fp.read(length)
112         else:
113             k = Key(self.bucket)
114             k.key = self.path + filename
115             data = k.get_contents_as_string()
116             fp = open(os.path.join(self.cachedir, filename), 'wb')
117             fp.write(data)
118             fp.close()
119             self.cache[filename] = True
120             if offset > 0:
121                 data = data[offset:]
122             if length is not None:
123                 data = data[0:length]
124             return data
125
126     @retry_wrap
127     def write(self, filename, data):
128         k = Key(self.bucket)
129         k.key = self.path + filename
130         k.set_contents_from_string(data)
131         if filename in self.cache:
132             del self.cache[filename]
133
134     @retry_wrap
135     def delete(self, filename):
136         k = Key(self.bucket)
137         k.key = self.path + filename
138         k.delete()
139         if filename in self.cache:
140             del self.cache[filename]
141
142     def loc_to_name(self, location):
143         return "log-%08d-%08d" % (location)
144
145     def name_to_loc(self, name):
146         m = re.match(r"^log-(\d+)-(\d+)$", name)
147         if m: return (int(m.group(1)), int(m.group(2)))
148
149 class LogItem:
150     """In-memory representation of a single item stored in a log file."""
151
152     def __init__(self):
153         self.cryptkeys = '\0' * HEADER_CRYPTBYTES
154         self.encrypted = False
155
156     def __str__(self):
157         return "<Item%s ty=%s location=%s size=%d id=%s...>" % (self.encrypted and '$' or '', self.type, self.location, self.size, base64.b16encode(self.id).lower()[0:8])
158
159     @staticmethod
160     def random_id():
161         return open('/dev/urandom').read(16)
162
163     def serialize(self):
164         link_ids = []
165         link_locs = []
166         for (i, l) in self.links:
167             link_ids.append(i)
168             if i != '\0' * 16:
169                 link_locs.append(struct.pack('<IIII', *l))
170         link_ids = ''.join(link_ids)
171         link_locs = ''.join(link_locs)
172
173         if self.encrypted:
174             magic = HEADER_MAGIC2
175         else:
176             magic = HEADER_MAGIC1
177         header = struct.pack(HEADER_FORMAT,
178                              magic, self.cryptkeys,
179                              ord(self.type), self.id, self.inum,
180                              len(self.data), len(link_ids), len(link_locs))
181         return header + self.data + link_ids + link_locs
182
183 class LogSegment:
184     def __init__(self, backend, location):
185         self.backend = backend
186         self.location = location
187         self.data = []
188
189     def __len__(self):
190         return sum(len(s) for s in self.data)
191
192     def write(self, item):
193         data = item.serialize()
194         offset = len(self)
195         self.data.append(data)
196         item.location = self.location + (offset, len(data))
197
198     def close(self):
199         data = ''.join(self.data)
200         filename = self.backend.loc_to_name(self.location)
201         print "Would write %d bytes of data to %s" % (len(data), filename)
202         self.backend.write(filename, data)
203
204 class LogDirectory:
205     TARGET_SIZE = 4 << 20
206
207     def __init__(self, backend, dir):
208         self.backend = backend
209         self.dir_num = dir
210         self.seq_num = 0
211         for logname in backend.list():
212             loc = backend.name_to_loc(logname[0])
213             if loc is not None and loc[0] == dir:
214                 self.seq_num = max(self.seq_num, loc[1] + 1)
215         self.groups = {}
216         print "Starting sequence number is", self.seq_num
217
218     def open_segment(self):
219         seg = LogSegment(self.backend, (self.dir_num, self.seq_num))
220         self.seq_num += 1
221         return seg
222
223     def write(self, item, segment_group=0):
224         if segment_group not in self.groups:
225             self.groups[segment_group] = self.open_segment()
226         seg = self.groups[segment_group]
227         seg.write(item)
228         if len(seg) >= LogDirectory.TARGET_SIZE:
229             seg.close()
230             del self.groups[segment_group]
231
232     def close_all(self):
233         for k in list(self.groups.keys()):
234             self.groups[k].close()
235             del self.groups[k]
236
237 class UtilizationTracker:
238     """A simple object that tracks what fraction of each segment is used.
239
240     This data can be used to guide segment cleaning decisions."""
241
242     def __init__(self, backend):
243         self.segments = {}
244         for (segment, size) in backend.list():
245             self.segments[segment] = [size, 0]
246
247     def add_item(self, item):
248         if isinstance(item, LogItem):
249             item = item.location
250         if item is None: return
251         (dir, seq, offset, size) = item
252         filename = "log-%08d-%08d" % (dir, seq)
253         self.segments[filename][1] += size
254
255 def parse_item(data):
256     if len(data) < HEADER_SIZE: return
257     header = struct.unpack_from(HEADER_FORMAT, data, 0)
258     size = HEADER_SIZE + sum(header[5:8])
259
260     if header[0] not in (HEADER_MAGIC1, HEADER_MAGIC2):
261         print "Bad header magic!"
262         return
263
264     if len(data) != size:
265         print "Item size does not match: %d != %d" % (size, len(data))
266         return
267
268     item = LogItem()
269     if header[0] == HEADER_MAGIC2: item.encrypted = True
270     item.cryptkeys = header[1]
271     item.id = header[3]
272     item.inum = header[4]
273     item.location = None
274     item.type = chr(header[2])
275     item.size = size
276     item.data = data[HEADER_SIZE : HEADER_SIZE + header[5]]
277     links = []
278     link_ids = data[HEADER_SIZE + header[5]
279                     : HEADER_SIZE + header[5] + header[6]]
280     link_locs = data[HEADER_SIZE + header[5] + header[6]
281                      : HEADER_SIZE + sum(header[5:8])]
282     for i in range(len(link_ids) // 16):
283         id = link_ids[16*i : 16*i + 16]
284         if id == '\0' * 16:
285             loc = None
286         else:
287             loc = struct.unpack('<IIII', link_locs[0:16])
288             link_locs = link_locs[16:]
289         links.append((id, loc))
290     item.links = links
291     return item
292
293 def load_item(backend, location):
294     """Load the cloud item pointed at by the 4-tuple 'location'.
295
296     The elements of the tuple are (directory, sequence, offset, size)."""
297
298     filename = backend.loc_to_name((location[0], location[1]))
299     data = backend.read(filename, location[2], location[3])
300     item = parse_item(data)
301     item.location = location
302     return item
303
304 def parse_log(data, location=None):
305     """Parse contents of a log file, yielding a sequence of log items."""
306
307     if isinstance(location, str):
308         m = re.match(r"^log-(\d+)-(\d+)$", location)
309         if m:
310             location = (int(m.group(1)), int(m.group(2)))
311         else:
312             location = None
313
314     offset = 0
315     while len(data) - offset >= HEADER_SIZE:
316         header = struct.unpack_from(HEADER_FORMAT, data, offset)
317         size = HEADER_SIZE + sum(header[5:8])
318         if header[0] not in (HEADER_MAGIC1, HEADER_MAGIC2):
319             print "Bad header magic!"
320             break
321         if size + offset > len(data):
322             print "Short data record at end of log: %s < %s" % (len(data) - offset, size)
323             break
324         item = parse_item(data[offset : offset + size])
325         if location is not None:
326             item.location = location + (offset, size)
327         if item is not None: yield item
328         offset += size
329
330 def load_checkpoint_record(backend):
331     for (log, size) in reversed(backend.list()):
332         for item in reversed(list(parse_log(backend.read(log), log))):
333             print item
334             if item.type == ITEM_TYPE.CHECKPOINT:
335                 return item
336
337 class InodeMap:
338     def __init__(self):
339         pass
340
341     def build(self, backend, checkpoint_record):
342         """Reconstruct the inode map from the checkpoint record given.
343
344         This will also build up information about segment utilization."""
345
346         self.checkpoint_record = checkpoint_record
347
348         util = UtilizationTracker(backend)
349         util.add_item(checkpoint_record)
350         inodes = {}
351         self.obsolete_segments = set()
352
353         print "Inode map:"
354         for i in range(len(checkpoint_record.data) // 16):
355             (start, end) = struct.unpack_from("<QQ", checkpoint_record.data, 16*i)
356             imap = load_item(backend, checkpoint_record.links[i][1])
357             util.add_item(imap)
358             print "[%d, %d]: %s" % (start, end, imap)
359             for j in range(len(imap.data) // 8):
360                 (inum,) = struct.unpack_from("<Q", imap.data, 8*j)
361                 inode = load_item(backend, imap.links[j][1])
362                 inodes[inum] = inode
363                 data_segments = set()
364                 util.add_item(inode)
365                 for i in inode.links:
366                     util.add_item(i[1])
367                     data_segments.add(i[1][0:2])
368                 print "  %d: %s (%d data segments)" % (inum, inode, len(data_segments))
369
370         print
371         print "Segment utilizations:"
372         for (s, u) in sorted(util.segments.items()):
373             print "%s: %s %s" % (s, u, float(u[1]) / u[0])
374             if u[1] == 0:
375                 print "Deleting..."
376                 backend.delete(s)
377
378         self.inodes = inodes
379         self.util = util
380         self.updated_inodes = set()
381
382     def mark_updated(self, inum):
383         self.updated_inodes.add(inum)
384
385     def write(self, backend, log):
386         updated_inodes = sorted(self.updated_inodes, reverse=True)
387
388         new_checkpoint = LogItem()
389         new_checkpoint.id = LogItem.random_id()
390         new_checkpoint.inum = 0
391         new_checkpoint.type = ITEM_TYPE.CHECKPOINT
392         new_checkpoint.data = ""
393         new_checkpoint.links = []
394
395         for i in range(len(self.checkpoint_record.data) // 16):
396             (start, end) = struct.unpack_from("<QQ", self.checkpoint_record.data, 16*i)
397
398             new_checkpoint.data += self.checkpoint_record.data[16*i : 16*i + 16]
399
400             # Case 1: No inodes in this range of the old inode map have
401             # changed.  Simply emit a new pointer to the same inode map block.
402             if len(updated_inodes) == 0 or updated_inodes[-1] > end:
403                 old_location = self.checkpoint_record.links[i][1][0:2]
404                 if old_location not in self.obsolete_segments:
405                     new_checkpoint.links.append(self.checkpoint_record.links[i])
406                     continue
407
408             # Case 2: Some inodes have been updated.  Create a new inode map
409             # block, write it out, and point the new checkpoint at it.
410             inodes = [k for k in self.inodes if k >= start and k <= end]
411             inodes.sort()
412
413             block = LogItem()
414             block.id = LogItem.random_id()
415             block.inum = 0
416             block.type = ITEM_TYPE.INODE_MAP
417             block.links = []
418             block.data = ""
419             for j in inodes:
420                 block.data += struct.pack("<Q", j)
421                 block.links.append((self.inodes[j].id, self.inodes[j].location))
422             log.write(block, 2)
423
424             new_checkpoint.links.append((block.id, block.location))
425
426             while len(updated_inodes) > 0 and updated_inodes[-1] <= end:
427                 updated_inodes.pop()
428
429         log.write(new_checkpoint, 2)
430         self.checkpoint_record = new_checkpoint
431
432 def rewrite_inode(backend, inode_map, inum, log, copy_data=True):
433     inode = inode_map.inodes[inum]
434     if copy_data:
435         blocks = []
436         for l in inode.links:
437             data = load_item(backend, l[1])
438             blocks.append(data)
439             log.write(data, 0)
440         inode.links = [(b.id, b.location) for b in blocks]
441     log.write(inode, 1)
442     inode_map.mark_updated(inum)
443
444 def run_cleaner(backend, inode_map, log, repack_inodes=False):
445     # Determine which segments are poorly utilized and should be cleaned.  We
446     # need better heuristics here.
447     for (s, u) in sorted(inode_map.util.segments.items()):
448         if (float(u[1]) / u[0] < 0.6 or u[1] < 32768) and u[1] > 0:
449             print "Should clean segment", s
450             loc = backend.name_to_loc(s)
451             if s: inode_map.obsolete_segments.add(loc)
452
453     # TODO: We probably also want heuristics that will find inodes with
454     # badly-fragmented data and rewrite that to achieve better locality.
455
456     # Given that list of segments to clean, scan through those segments to find
457     # data which is still live and mark relevant inodes as needing to be
458     # rewritten.
459     if repack_inodes:
460         dirty_inodes = set(inode_map.inodes)
461     else:
462         dirty_inodes = set()
463     dirty_inode_data = set()
464     for s in inode_map.obsolete_segments:
465         filename = backend.loc_to_name(s)
466         print "Scanning", filename, "for live data"
467         for item in parse_log(backend.read(filename), filename):
468             if item.type in (ITEM_TYPE.DATA, ITEM_TYPE.INODE):
469                 if item.inum != 0:
470                     inode = inode_map.inodes[item.inum]
471                     if s == inode.location[0:2]:
472                         dirty_inodes.add(item.inum)
473                     if item.inum not in dirty_inode_data:
474                         for b in inode.links:
475                             if s == b[1][0:2]:
476                                 dirty_inode_data.add(item.inum)
477                                 break
478
479     print "Inodes to rewrite:", dirty_inodes
480     print "Inodes with data to rewrite:", dirty_inode_data
481     for i in sorted(dirty_inodes.union(dirty_inode_data)):
482         rewrite_inode(backend, inode_map, i, log, i in dirty_inode_data)
483
484 if __name__ == '__main__':
485     backend = S3Backend("mvrable-bluesky", cachedir=".")
486     chkpt = load_checkpoint_record(backend)
487     print backend.list()
488     imap = InodeMap()
489     imap.build(backend, chkpt)
490     print chkpt
491
492     log_dir = LogDirectory(backend, 0)
493     run_cleaner(backend, imap, log_dir)
494     imap.write(backend, log_dir)
495     log_dir.close_all()