Improve cleaner performance.
[bluesky.git] / cleaner / cleaner
1 #!/usr/bin/env python
2 #
3 # A simple cleaner for the BlueSky cloud file system.  At the moment this is an
4 # offline cleaner--the file system must not be in use at the time that the
5 # cleaning is performed.  Later, it will be extended to be an online/concurrent
6 # cleaner, where cleaning is performed even while the file system is mounted.
7 #
8 # Copyright (C) 2010  The Regents of the University of California
9 # Written by Michael Vrable <mvrable@cs.ucsd.edu>
10
11 import base64, os, re, struct, sys
12 import boto
13 from boto.s3.key import Key
14
15 # The BlueSky 'struct cloudlog_header' data type.
16 HEADER_FORMAT = '<4s48sb16sQIII'
17 HEADER_CRYPTBYTES = 48
18 HEADER_MAGIC = 'AgI-'
19 HEADER_SIZE = struct.calcsize(HEADER_FORMAT)
20
21 class ITEM_TYPE:
22     DATA = '1'
23     INODE = '2'
24     INODE_MAP = '3'
25     CHECKPOINT = '4'
26
27 class FileBackend:
28     """An interface to BlueSky where the log segments are on local disk.
29
30     This is mainly intended for testing purposes, as the real cleaner would
31     operate where data is being stored in S3."""
32
33     def __init__(self, path):
34         self.path = path
35
36     def list(self):
37         """Return a listing of all log segments and their sizes."""
38
39         files = [f for f in os.listdir(self.path) if f.startswith('log-')]
40         files.sort()
41
42         return [(f, os.stat(os.path.join(self.path, f)).st_size)
43                 for f in files]
44
45     def read(self, filename, offset=0, length=None):
46         fp = open(os.path.join(self.path, filename), 'rb')
47         if offset > 0:
48             fp.seek(offset)
49         if legnth is None:
50             return fp.read()
51         else:
52             return fp.read(length)
53
54     def write(self, filename, data):
55         fp = open(os.path.join(self.path, filename), 'wb')
56         fp.write(data)
57         fp.close()
58
59     def delete(self, filename):
60         os.unlink(os.path.join(self.path, filename))
61
62     def loc_to_name(self, location):
63         return "log-%08d-%08d" % (location)
64
65     def name_to_loc(self, name):
66         m = re.match(r"^log-(\d+)-(\d+)$", name)
67         if m: return (int(m.group(1)), int(m.group(2)))
68
69 class S3Backend:
70     """An interface to BlueSky where the log segments are on in Amazon S3."""
71
72     def __init__(self, bucket, path='', cachedir="."):
73         self.conn = boto.connect_s3(is_secure=False)
74         self.bucket = self.conn.get_bucket(bucket)
75         self.path = path
76         self.cachedir = cachedir
77         self.cache = {}
78
79     def list(self):
80         files = []
81         for k in self.bucket.list(self.path + 'log-'):
82             files.append((k.key, k.size))
83         return files
84
85     def read(self, filename, offset=0, length=None):
86         if filename in self.cache:
87             fp = open(os.path.join(self.cachedir, filename), 'rb')
88             if offset > 0:
89                 fp.seek(offset)
90             if length is None:
91                 return fp.read()
92             else:
93                 return fp.read(length)
94         else:
95             k = Key(self.bucket)
96             k.key = self.path + filename
97             data = k.get_contents_as_string()
98             fp = open(os.path.join(self.cachedir, filename), 'wb')
99             fp.write(data)
100             fp.close()
101             self.cache[filename] = True
102             if offset > 0:
103                 data = data[offset:]
104             if length is not None:
105                 data = data[0:length]
106             return data
107
108     def write(self, filename, data):
109         k = Key(self.bucket)
110         k.key = self.path + filename
111         k.set_contents_from_string(data)
112         if filename in self.cache:
113             del self.cache[filename]
114
115     def delete(self, filename):
116         k = Key(self.bucket)
117         k.key = self.path + filename
118         k.delete()
119         if filename in self.cache:
120             del self.cache[filename]
121
122     def loc_to_name(self, location):
123         return "log-%08d-%08d" % (location)
124
125     def name_to_loc(self, name):
126         m = re.match(r"^log-(\d+)-(\d+)$", name)
127         if m: return (int(m.group(1)), int(m.group(2)))
128
129 class LogItem:
130     """In-memory representation of a single item stored in a log file."""
131
132     def __init__(self):
133         self.cryptkeys = '\0' * HEADER_CRYPTBYTES
134
135     def __str__(self):
136         return "<Item ty=%s location=%s size=%d id=%s...>" % (self.type, self.location, self.size, base64.b16encode(self.id).lower()[0:8])
137
138     @staticmethod
139     def random_id():
140         return open('/dev/urandom').read(16)
141
142     def serialize(self):
143         link_ids = []
144         link_locs = []
145         for (i, l) in self.links:
146             link_ids.append(i)
147             if i != '\0' * 16:
148                 link_locs.append(struct.pack('<IIII', *l))
149         link_ids = ''.join(link_ids)
150         link_locs = ''.join(link_locs)
151
152         header = struct.pack(HEADER_FORMAT,
153                              HEADER_MAGIC, self.cryptkeys,
154                              ord(self.type), self.id, self.inum,
155                              len(self.data), len(link_ids), len(link_locs))
156         return header + self.data + link_ids + link_locs
157
158 class LogSegment:
159     def __init__(self, backend, location):
160         self.backend = backend
161         self.location = location
162         self.data = []
163
164     def __len__(self):
165         return sum(len(s) for s in self.data)
166
167     def write(self, item):
168         data = item.serialize()
169         offset = len(self)
170         self.data.append(data)
171         item.location = self.location + (offset, len(data))
172
173     def close(self):
174         data = ''.join(self.data)
175         filename = self.backend.loc_to_name(self.location)
176         print "Would write %d bytes of data to %s" % (len(data), filename)
177         self.backend.write(filename, data)
178
179 class LogDirectory:
180     TARGET_SIZE = 4 << 20
181
182     def __init__(self, backend, dir):
183         self.backend = backend
184         self.dir_num = dir
185         self.seq_num = 0
186         for logname in backend.list():
187             loc = backend.name_to_loc(logname[0])
188             if loc is not None and loc[0] == dir:
189                 self.seq_num = max(self.seq_num, loc[1] + 1)
190         self.groups = {}
191         print "Starting sequence number is", self.seq_num
192
193     def open_segment(self):
194         seg = LogSegment(self.backend, (self.dir_num, self.seq_num))
195         self.seq_num += 1
196         return seg
197
198     def write(self, item, segment_group=0):
199         if segment_group not in self.groups:
200             self.groups[segment_group] = self.open_segment()
201         seg = self.groups[segment_group]
202         seg.write(item)
203         if len(seg) >= LogDirectory.TARGET_SIZE:
204             seg.close()
205             del self.groups[segment_group]
206
207     def close_all(self):
208         for k in list(self.groups.keys()):
209             self.groups[k].close()
210             del self.groups[k]
211
212 class UtilizationTracker:
213     """A simple object that tracks what fraction of each segment is used.
214
215     This data can be used to guide segment cleaning decisions."""
216
217     def __init__(self, backend):
218         self.segments = {}
219         for (segment, size) in backend.list():
220             self.segments[segment] = [size, 0]
221
222     def add_item(self, item):
223         if isinstance(item, LogItem):
224             item = item.location
225         if item is None: return
226         (dir, seq, offset, size) = item
227         filename = "log-%08d-%08d" % (dir, seq)
228         self.segments[filename][1] += size
229
230 def parse_item(data):
231     if len(data) < HEADER_SIZE: return
232     header = struct.unpack_from(HEADER_FORMAT, data, 0)
233     size = HEADER_SIZE + sum(header[5:8])
234
235     if header[0] != HEADER_MAGIC:
236         print "Bad header magic!"
237         return
238
239     if len(data) != size:
240         print "Item size does not match: %d != %d" % (size, len(data))
241         return
242
243     item = LogItem()
244     item.cryptkeys = header[1]
245     item.id = header[3]
246     item.inum = header[4]
247     item.location = None
248     item.type = chr(header[2])
249     item.size = size
250     item.data = data[HEADER_SIZE : HEADER_SIZE + header[5]]
251     links = []
252     link_ids = data[HEADER_SIZE + header[5]
253                     : HEADER_SIZE + header[5] + header[6]]
254     link_locs = data[HEADER_SIZE + header[5] + header[6]
255                      : HEADER_SIZE + sum(header[5:8])]
256     for i in range(len(link_ids) // 16):
257         id = link_ids[16*i : 16*i + 16]
258         if id == '\0' * 16:
259             loc = None
260         else:
261             loc = struct.unpack('<IIII', link_locs[0:16])
262             link_locs = link_locs[16:]
263         links.append((id, loc))
264     item.links = links
265     return item
266
267 def load_item(backend, location):
268     """Load the cloud item pointed at by the 4-tuple 'location'.
269
270     The elements of the tuple are (directory, sequence, offset, size)."""
271
272     filename = backend.loc_to_name((location[0], location[1]))
273     data = backend.read(filename, location[2], location[3])
274     item = parse_item(data)
275     item.location = location
276     return item
277
278 def parse_log(data, location=None):
279     """Parse contents of a log file, yielding a sequence of log items."""
280
281     if isinstance(location, str):
282         m = re.match(r"^log-(\d+)-(\d+)$", location)
283         if m:
284             location = (int(m.group(1)), int(m.group(2)))
285         else:
286             location = None
287
288     offset = 0
289     while len(data) - offset >= HEADER_SIZE:
290         header = struct.unpack_from(HEADER_FORMAT, data, offset)
291         size = HEADER_SIZE + sum(header[5:8])
292         if header[0] != HEADER_MAGIC:
293             print "Bad header magic!"
294             break
295         if size + offset > len(data):
296             print "Short data record at end of log: %s < %s" % (len(data) - offset, size)
297             break
298         item = parse_item(data[offset : offset + size])
299         if location is not None:
300             item.location = location + (offset, size)
301         if item is not None: yield item
302         offset += size
303
304 def load_checkpoint_record(backend):
305     for (log, size) in reversed(backend.list()):
306         for item in reversed(list(parse_log(backend.read(log), log))):
307             print item
308             if item.type == ITEM_TYPE.CHECKPOINT:
309                 return item
310
311 class InodeMap:
312     def __init__(self):
313         pass
314
315     def build(self, backend, checkpoint_record):
316         """Reconstruct the inode map from the checkpoint record given.
317
318         This will also build up information about segment utilization."""
319
320         self.checkpoint_record = checkpoint_record
321
322         util = UtilizationTracker(backend)
323         util.add_item(checkpoint_record)
324         inodes = {}
325         self.obsolete_segments = set()
326
327         print "Inode map:"
328         for i in range(len(checkpoint_record.data) // 16):
329             (start, end) = struct.unpack_from("<QQ", checkpoint_record.data, 16*i)
330             imap = load_item(backend, checkpoint_record.links[i][1])
331             util.add_item(imap)
332             print "[%d, %d]: %s" % (start, end, imap)
333             for j in range(len(imap.data) // 8):
334                 (inum,) = struct.unpack_from("<Q", imap.data, 8*j)
335                 inode = load_item(backend, imap.links[j][1])
336                 inodes[inum] = inode
337                 data_segments = set()
338                 util.add_item(inode)
339                 for i in inode.links:
340                     util.add_item(i[1])
341                     data_segments.add(i[1][0:2])
342                 print "  %d: %s (%d data segments)" % (inum, inode, len(data_segments))
343
344         print
345         print "Segment utilizations:"
346         for (s, u) in sorted(util.segments.items()):
347             print "%s: %s %s" % (s, u, float(u[1]) / u[0])
348             if u[1] == 0:
349                 print "Deleting..."
350                 backend.delete(s)
351
352         self.inodes = inodes
353         self.util = util
354         self.updated_inodes = set()
355
356     def mark_updated(self, inum):
357         self.updated_inodes.add(inum)
358
359     def write(self, backend, log):
360         updated_inodes = sorted(self.updated_inodes, reverse=True)
361
362         new_checkpoint = LogItem()
363         new_checkpoint.id = LogItem.random_id()
364         new_checkpoint.inum = 0
365         new_checkpoint.type = ITEM_TYPE.CHECKPOINT
366         new_checkpoint.data = ""
367         new_checkpoint.links = []
368
369         for i in range(len(self.checkpoint_record.data) // 16):
370             (start, end) = struct.unpack_from("<QQ", self.checkpoint_record.data, 16*i)
371
372             new_checkpoint.data += self.checkpoint_record.data[16*i : 16*i + 16]
373
374             # Case 1: No inodes in this range of the old inode map have
375             # changed.  Simply emit a new pointer to the same inode map block.
376             if len(updated_inodes) == 0 or updated_inodes[-1] > end:
377                 old_location = self.checkpoint_record.links[i][1][0:2]
378                 if old_location not in self.obsolete_segments:
379                     new_checkpoint.links.append(self.checkpoint_record.links[i])
380                     continue
381
382             # Case 2: Some inodes have been updated.  Create a new inode map
383             # block, write it out, and point the new checkpoint at it.
384             inodes = [k for k in self.inodes if k >= start and k <= end]
385             inodes.sort()
386
387             block = LogItem()
388             block.id = LogItem.random_id()
389             block.inum = 0
390             block.type = ITEM_TYPE.INODE_MAP
391             block.links = []
392             block.data = ""
393             for j in inodes:
394                 block.data += struct.pack("<Q", j)
395                 block.links.append((self.inodes[j].id, self.inodes[j].location))
396             log.write(block, 2)
397
398             new_checkpoint.links.append((block.id, block.location))
399
400             while len(updated_inodes) > 0 and updated_inodes[-1] <= end:
401                 updated_inodes.pop()
402
403         log.write(new_checkpoint, 2)
404         self.checkpoint_record = new_checkpoint
405
406 def rewrite_inode(backend, inode_map, inum, log, copy_data=True):
407     inode = inode_map.inodes[inum]
408     if copy_data:
409         blocks = []
410         for l in inode.links:
411             data = load_item(backend, l[1])
412             blocks.append(data)
413             log.write(data, 0)
414         inode.links = [(b.id, b.location) for b in blocks]
415     log.write(inode, 1)
416     inode_map.mark_updated(inum)
417
418 def run_cleaner(backend, inode_map, log, repack_inodes=False):
419     # Determine which segments are poorly utilized and should be cleaned.  We
420     # need better heuristics here.
421     for (s, u) in sorted(inode_map.util.segments.items()):
422         if (float(u[1]) / u[0] < 0.6 or u[1] < 32768) and u[1] > 0:
423             print "Should clean segment", s
424             loc = backend.name_to_loc(s)
425             if s: inode_map.obsolete_segments.add(loc)
426
427     # TODO: We probably also want heuristics that will find inodes with
428     # badly-fragmented data and rewrite that to achieve better locality.
429
430     # Given that list of segments to clean, scan through those segments to find
431     # data which is still live and mark relevant inodes as needing to be
432     # rewritten.
433     if repack_inodes:
434         dirty_inodes = set(inode_map.inodes)
435     else:
436         dirty_inodes = set()
437     dirty_inode_data = set()
438     for s in inode_map.obsolete_segments:
439         filename = backend.loc_to_name(s)
440         print "Scanning", filename, "for live data"
441         for item in parse_log(backend.read(filename), filename):
442             if item.type in (ITEM_TYPE.DATA, ITEM_TYPE.INODE):
443                 if item.inum != 0:
444                     inode = inode_map.inodes[item.inum]
445                     if s == inode.location[0:2]:
446                         dirty_inodes.add(item.inum)
447                     if item.inum not in dirty_inode_data:
448                         for b in inode.links:
449                             if s == b[1][0:2]:
450                                 dirty_inode_data.add(item.inum)
451                                 break
452
453     print "Inodes to rewrite:", dirty_inodes
454     print "Inodes with data to rewrite:", dirty_inode_data
455     for i in sorted(dirty_inodes.union(dirty_inode_data)):
456         rewrite_inode(backend, inode_map, i, log, i in dirty_inode_data)
457
458 if __name__ == '__main__':
459     backend = S3Backend("mvrable-bluesky", cachedir=".")
460     chkpt = load_checkpoint_record(backend)
461     print backend.list()
462     imap = InodeMap()
463     imap.build(backend, chkpt)
464     print chkpt
465
466     log_dir = LogDirectory(backend, 0)
467     run_cleaner(backend, imap, log_dir)
468     imap.write(backend, log_dir)
469     log_dir.close_all()