Fix to the cleaner when writing out a new inode map.
[bluesky.git] / cleaner / cleaner
1 #!/usr/bin/env python
2 #
3 # A simple cleaner for the BlueSky cloud file system.  At the moment this is an
4 # offline cleaner--the file system must not be in use at the time that the
5 # cleaning is performed.  Later, it will be extended to be an online/concurrent
6 # cleaner, where cleaning is performed even while the file system is mounted.
7 #
8 # Copyright (C) 2010  The Regents of the University of California
9 # Written by Michael Vrable <mvrable@cs.ucsd.edu>
10
11 import base64, os, re, struct, sys, time
12 import boto
13 from boto.s3.key import Key
14
15 # The BlueSky 'struct cloudlog_header' data type.
16 HEADER_FORMAT = '<4s48sb16sQIII'
17 HEADER_CRYPTBYTES = 48
18 HEADER_MAGIC1 = 'AgI-'          # Unencrypted data
19 HEADER_MAGIC2 = 'AgI='          # Encrypted data
20 HEADER_SIZE = struct.calcsize(HEADER_FORMAT)
21
22 CHECKPOINT_MAGIC = struct.pack('<Q', 0x7ad7dafb42a498b4)
23
24 class ITEM_TYPE:
25     DATA = '1'
26     INODE = '2'
27     INODE_MAP = '3'
28     CHECKPOINT = '4'
29
30 class FileBackend:
31     """An interface to BlueSky where the log segments are on local disk.
32
33     This is mainly intended for testing purposes, as the real cleaner would
34     operate where data is being stored in S3."""
35
36     def __init__(self, path):
37         self.path = path
38
39     def list(self, directory=0):
40         """Return a listing of all log segments and their sizes."""
41
42         prefix = "log-%08d-" % (directory,)
43         files = [f for f in os.listdir(self.path) if f.startswith(prefix)]
44         files.sort()
45
46         return [(f, os.stat(os.path.join(self.path, f)).st_size)
47                 for f in files]
48
49     def read(self, filename, offset=0, length=None):
50         fp = open(os.path.join(self.path, filename), 'rb')
51         if offset > 0:
52             fp.seek(offset)
53         if length is None:
54             return fp.read()
55         else:
56             return fp.read(length)
57
58     def write(self, filename, data):
59         fp = open(os.path.join(self.path, filename), 'wb')
60         fp.write(data)
61         fp.close()
62
63     def delete(self, filename):
64         os.unlink(os.path.join(self.path, filename))
65
66     def loc_to_name(self, location):
67         return "log-%08d-%08d" % (location)
68
69     def name_to_loc(self, name):
70         m = re.match(r"^log-(\d+)-(\d+)$", name)
71         if m: return (int(m.group(1)), int(m.group(2)))
72
73 def retry_wrap(method):
74     def wrapped(self, *args, **kwargs):
75         for retries in range(3):
76             try:
77                 return method(self, *args, **kwargs)
78             except:
79                 print >>sys.stderr, "S3 operation failed, retrying..."
80                 self.connect()
81                 time.sleep(1.0)
82         return method(self, *args, **kwargs)
83     return wrapped
84
85 class S3Backend:
86     """An interface to BlueSky where the log segments are on in Amazon S3."""
87
88     def __init__(self, bucket, path='', cachedir="."):
89         self.bucket_name = bucket
90         self.path = path
91         self.cachedir = cachedir
92         self.cache = {}
93         self.connect()
94
95     def connect(self):
96         self.conn = boto.connect_s3(is_secure=False)
97         self.bucket = self.conn.get_bucket(self.bucket_name)
98
99     def list(self, directory=0):
100         files = []
101         prefix = "log-%08d-" % (directory,)
102         for k in self.bucket.list(self.path + prefix):
103             files.append((k.key, k.size))
104         return files
105
106     @retry_wrap
107     def read(self, filename, offset=0, length=None):
108         if filename in self.cache:
109             fp = open(os.path.join(self.cachedir, filename), 'rb')
110             if offset > 0:
111                 fp.seek(offset)
112             if length is None:
113                 return fp.read()
114             else:
115                 return fp.read(length)
116         else:
117             k = Key(self.bucket)
118             k.key = self.path + filename
119             data = k.get_contents_as_string()
120             fp = open(os.path.join(self.cachedir, filename), 'wb')
121             fp.write(data)
122             fp.close()
123             self.cache[filename] = True
124             if offset > 0:
125                 data = data[offset:]
126             if length is not None:
127                 data = data[0:length]
128             return data
129
130     @retry_wrap
131     def write(self, filename, data):
132         k = Key(self.bucket)
133         k.key = self.path + filename
134         k.set_contents_from_string(data)
135         if filename in self.cache:
136             del self.cache[filename]
137
138     @retry_wrap
139     def delete(self, filename):
140         k = Key(self.bucket)
141         k.key = self.path + filename
142         k.delete()
143         if filename in self.cache:
144             del self.cache[filename]
145
146     def loc_to_name(self, location):
147         return "log-%08d-%08d" % (location)
148
149     def name_to_loc(self, name):
150         m = re.match(r"^log-(\d+)-(\d+)$", name)
151         if m: return (int(m.group(1)), int(m.group(2)))
152
153 class LogItem:
154     """In-memory representation of a single item stored in a log file."""
155
156     def __init__(self):
157         self.cryptkeys = '\0' * HEADER_CRYPTBYTES
158         self.encrypted = False
159
160     def __str__(self):
161         return "<Item%s ty=%s location=%s size=%d id=%s...>" % (self.encrypted and '$' or '', self.type, self.location, self.size, base64.b16encode(self.id).lower()[0:8])
162
163     @staticmethod
164     def random_id():
165         return open('/dev/urandom').read(16)
166
167     def serialize(self):
168         link_ids = []
169         link_locs = []
170         for (i, l) in self.links:
171             link_ids.append(i)
172             if i != '\0' * 16:
173                 link_locs.append(struct.pack('<IIII', *l))
174         link_ids = ''.join(link_ids)
175         link_locs = ''.join(link_locs)
176
177         if self.encrypted:
178             magic = HEADER_MAGIC2
179         else:
180             magic = HEADER_MAGIC1
181         header = struct.pack(HEADER_FORMAT,
182                              magic, self.cryptkeys,
183                              ord(self.type), self.id, self.inum,
184                              len(self.data), len(link_ids), len(link_locs))
185         return header + self.data + link_ids + link_locs
186
187 class LogSegment:
188     def __init__(self, backend, location):
189         self.backend = backend
190         self.location = location
191         self.data = []
192
193     def __len__(self):
194         return sum(len(s) for s in self.data)
195
196     def write(self, item):
197         data = item.serialize()
198         offset = len(self)
199         self.data.append(data)
200         item.location = self.location + (offset, len(data))
201
202     def close(self):
203         data = ''.join(self.data)
204         filename = self.backend.loc_to_name(self.location)
205         print "Would write %d bytes of data to %s" % (len(data), filename)
206         self.backend.write(filename, data)
207
208 class LogDirectory:
209     TARGET_SIZE = 4 << 20
210
211     def __init__(self, backend, dir):
212         self.backend = backend
213         self.dir_num = dir
214         self.seq_num = 0
215         for logname in backend.list(dir):
216             print "Old log file:", logname
217             loc = backend.name_to_loc(logname[0])
218             if loc is not None and loc[0] == dir:
219                 self.seq_num = max(self.seq_num, loc[1] + 1)
220         self.groups = {}
221         print "Starting sequence number is", self.seq_num
222
223     def open_segment(self):
224         seg = LogSegment(self.backend, (self.dir_num, self.seq_num))
225         self.seq_num += 1
226         return seg
227
228     def write(self, item, segment_group=0):
229         if segment_group not in self.groups:
230             self.groups[segment_group] = self.open_segment()
231         seg = self.groups[segment_group]
232         seg.write(item)
233         if len(seg) >= LogDirectory.TARGET_SIZE:
234             seg.close()
235             del self.groups[segment_group]
236
237     def close_all(self):
238         for k in list(self.groups.keys()):
239             self.groups[k].close()
240             del self.groups[k]
241
242 class UtilizationTracker:
243     """A simple object that tracks what fraction of each segment is used.
244
245     This data can be used to guide segment cleaning decisions."""
246
247     def __init__(self, backend):
248         self.segments = {}
249         for (segment, size) in backend.list(0) + backend.list(1):
250             self.segments[segment] = [size, 0]
251
252     def add_item(self, item):
253         if isinstance(item, LogItem):
254             item = item.location
255         if item is None: return
256         (dir, seq, offset, size) = item
257         filename = "log-%08d-%08d" % (dir, seq)
258         self.segments[filename][1] += size
259
260 def parse_item(data):
261     if len(data) < HEADER_SIZE: return
262     header = struct.unpack_from(HEADER_FORMAT, data, 0)
263     size = HEADER_SIZE + sum(header[5:8])
264
265     if header[0] not in (HEADER_MAGIC1, HEADER_MAGIC2):
266         print "Bad header magic!"
267         return
268
269     if len(data) != size:
270         print "Item size does not match: %d != %d" % (size, len(data))
271         return
272
273     item = LogItem()
274     if header[0] == HEADER_MAGIC2: item.encrypted = True
275     item.cryptkeys = header[1]
276     item.id = header[3]
277     item.inum = header[4]
278     item.location = None
279     item.type = chr(header[2])
280     item.size = size
281     item.data = data[HEADER_SIZE : HEADER_SIZE + header[5]]
282     links = []
283     link_ids = data[HEADER_SIZE + header[5]
284                     : HEADER_SIZE + header[5] + header[6]]
285     link_locs = data[HEADER_SIZE + header[5] + header[6]
286                      : HEADER_SIZE + sum(header[5:8])]
287     for i in range(len(link_ids) // 16):
288         id = link_ids[16*i : 16*i + 16]
289         if id == '\0' * 16:
290             loc = None
291         else:
292             loc = struct.unpack('<IIII', link_locs[0:16])
293             link_locs = link_locs[16:]
294         links.append((id, loc))
295     item.links = links
296     return item
297
298 def load_item(backend, location):
299     """Load the cloud item pointed at by the 4-tuple 'location'.
300
301     The elements of the tuple are (directory, sequence, offset, size)."""
302
303     filename = backend.loc_to_name((location[0], location[1]))
304     data = backend.read(filename, location[2], location[3])
305     item = parse_item(data)
306     item.location = location
307     return item
308
309 def parse_log(data, location=None):
310     """Parse contents of a log file, yielding a sequence of log items."""
311
312     if isinstance(location, str):
313         m = re.match(r"^log-(\d+)-(\d+)$", location)
314         if m:
315             location = (int(m.group(1)), int(m.group(2)))
316         else:
317             location = None
318
319     offset = 0
320     while len(data) - offset >= HEADER_SIZE:
321         header = struct.unpack_from(HEADER_FORMAT, data, offset)
322         size = HEADER_SIZE + sum(header[5:8])
323         if header[0] not in (HEADER_MAGIC1, HEADER_MAGIC2):
324             print "Bad header magic!"
325             break
326         if size + offset > len(data):
327             print "Short data record at end of log: %s < %s" % (len(data) - offset, size)
328             break
329         item = parse_item(data[offset : offset + size])
330         if location is not None:
331             item.location = location + (offset, size)
332         if item is not None: yield item
333         offset += size
334
335 def load_checkpoint_record(backend, directory=0):
336     for (log, size) in reversed(backend.list(directory)):
337         for item in reversed(list(parse_log(backend.read(log), log))):
338             print item
339             if item.type == ITEM_TYPE.CHECKPOINT:
340                 return item
341
342 class InodeMap:
343     def __init__(self):
344         pass
345
346     def build(self, backend, checkpoint_record):
347         """Reconstruct the inode map from the checkpoint record given.
348
349         This will also build up information about segment utilization."""
350
351         self.version_vector = {}
352         self.checkpoint_record = checkpoint_record
353
354         util = UtilizationTracker(backend)
355         util.add_item(checkpoint_record)
356         inodes = {}
357         self.obsolete_segments = set()
358
359         data = checkpoint_record.data
360         if not data.startswith(CHECKPOINT_MAGIC):
361             raise ValueError, "Invalid checkpoint record!"
362         data = data[len(CHECKPOINT_MAGIC):]
363         (vvlen,) = struct.unpack_from("<I", data, 0)
364         self.vvsize = 4 + 8*vvlen
365         for i in range(vvlen):
366             (v1, v2) = struct.unpack_from("<II", data, 4 + 8*i)
367             self.version_vector[v1] = v2
368         print self.version_vector
369         self.version_vector[checkpoint_record.location[0]] \
370             = checkpoint_record.location[1]
371         print self.version_vector
372
373         data = data[self.vvsize:]
374
375         print "Inode map:"
376         for i in range(len(data) // 16):
377             (start, end) = struct.unpack_from("<QQ", data, 16*i)
378             imap = load_item(backend, checkpoint_record.links[i][1])
379             util.add_item(imap)
380             print "[%d, %d]: %s" % (start, end, imap)
381             for j in range(len(imap.data) // 8):
382                 (inum,) = struct.unpack_from("<Q", imap.data, 8*j)
383                 inode = load_item(backend, imap.links[j][1])
384                 inodes[inum] = inode
385                 data_segments = set()
386                 util.add_item(inode)
387                 for i in inode.links:
388                     util.add_item(i[1])
389                     data_segments.add(i[1][0:2])
390                 print "  %d: %s (%d data segments)" % (inum, inode, len(data_segments))
391
392         print
393         print "Segment utilizations:"
394         for (s, u) in sorted(util.segments.items()):
395             print "%s: %s %s" % (s, u, float(u[1]) / u[0])
396             if u[1] == 0:
397                 # print "Deleting..."
398                 # backend.delete(s)
399                 pass
400
401         self.inodes = inodes
402         self.util = util
403         self.updated_inodes = set()
404
405     def mark_updated(self, inum):
406         self.updated_inodes.add(inum)
407
408     def write(self, backend, log):
409         updated_inodes = sorted(self.updated_inodes, reverse=True)
410
411         new_checkpoint = LogItem()
412         new_checkpoint.id = LogItem.random_id()
413         new_checkpoint.inum = 0
414         new_checkpoint.type = ITEM_TYPE.CHECKPOINT
415         new_checkpoint.data = CHECKPOINT_MAGIC
416         new_checkpoint.links = []
417
418         new_checkpoint.data += struct.pack('<I', len(self.version_vector))
419         for d in sorted(self.version_vector):
420             new_checkpoint.data += struct.pack('<II', d, self.version_vector[d])
421
422         data = self.checkpoint_record.data[self.vvsize + len(CHECKPOINT_MAGIC):]
423         for i in range(len(data) // 16):
424             (start, end) = struct.unpack_from("<QQ", data, 16*i)
425
426             new_checkpoint.data += data[16*i : 16*i + 16]
427
428             # Case 1: No inodes in this range of the old inode map have
429             # changed.  Simply emit a new pointer to the same inode map block.
430             if len(updated_inodes) == 0 or updated_inodes[-1] > end:
431                 old_location = self.checkpoint_record.links[i][1][0:2]
432                 if old_location not in self.obsolete_segments:
433                     new_checkpoint.links.append(self.checkpoint_record.links[i])
434                     continue
435
436             # Case 2: Some inodes have been updated.  Create a new inode map
437             # block, write it out, and point the new checkpoint at it.
438             inodes = [k for k in self.inodes if k >= start and k <= end]
439             inodes.sort()
440
441             block = LogItem()
442             block.id = LogItem.random_id()
443             block.inum = 0
444             block.type = ITEM_TYPE.INODE_MAP
445             block.links = []
446             block.data = ""
447             for j in inodes:
448                 block.data += struct.pack("<Q", j)
449                 block.links.append((self.inodes[j].id, self.inodes[j].location))
450             log.write(block, 2)
451
452             new_checkpoint.links.append((block.id, block.location))
453
454             while len(updated_inodes) > 0 and updated_inodes[-1] <= end:
455                 updated_inodes.pop()
456
457         log.write(new_checkpoint, 2)
458         self.checkpoint_record = new_checkpoint
459
460 def rewrite_inode(backend, inode_map, inum, log, copy_data=True):
461     inode = inode_map.inodes[inum]
462     if copy_data:
463         blocks = []
464         for l in inode.links:
465             data = load_item(backend, l[1])
466             blocks.append(data)
467             log.write(data, 0)
468         inode.links = [(b.id, b.location) for b in blocks]
469     log.write(inode, 1)
470     inode_map.mark_updated(inum)
471
472 def run_cleaner(backend, inode_map, log, repack_inodes=False):
473     # Determine which segments are poorly utilized and should be cleaned.  We
474     # need better heuristics here.
475     for (s, u) in sorted(inode_map.util.segments.items()):
476         if (float(u[1]) / u[0] < 0.6 or u[1] < 32768) and u[1] > 0:
477             print "Should clean segment", s
478             loc = backend.name_to_loc(s)
479             if s: inode_map.obsolete_segments.add(loc)
480
481     # TODO: We probably also want heuristics that will find inodes with
482     # badly-fragmented data and rewrite that to achieve better locality.
483
484     # Given that list of segments to clean, scan through those segments to find
485     # data which is still live and mark relevant inodes as needing to be
486     # rewritten.
487     if repack_inodes:
488         dirty_inodes = set(inode_map.inodes)
489     else:
490         dirty_inodes = set()
491     dirty_inode_data = set()
492     for s in inode_map.obsolete_segments:
493         filename = backend.loc_to_name(s)
494         print "Scanning", filename, "for live data"
495         for item in parse_log(backend.read(filename), filename):
496             if item.type in (ITEM_TYPE.DATA, ITEM_TYPE.INODE):
497                 if item.inum != 0:
498                     inode = inode_map.inodes[item.inum]
499                     if s == inode.location[0:2]:
500                         dirty_inodes.add(item.inum)
501                     if item.inum not in dirty_inode_data:
502                         for b in inode.links:
503                             if s == b[1][0:2]:
504                                 dirty_inode_data.add(item.inum)
505                                 break
506
507     print "Inodes to rewrite:", dirty_inodes
508     print "Inodes with data to rewrite:", dirty_inode_data
509     for i in sorted(dirty_inodes.union(dirty_inode_data)):
510         rewrite_inode(backend, inode_map, i, log, i in dirty_inode_data)
511
512 if __name__ == '__main__':
513     backend = S3Backend("mvrable-bluesky", cachedir=".")
514     #backend = FileBackend(".")
515     chkpt = load_checkpoint_record(backend)
516     print backend.list()
517     imap = InodeMap()
518     imap.build(backend, chkpt)
519     print chkpt
520
521     log_dir = LogDirectory(backend, 1)
522     run_cleaner(backend, imap, log_dir)
523     print "Version vector:", imap.version_vector
524     imap.write(backend, log_dir)
525     log_dir.close_all()