# A simple cleaner for the BlueSky cloud file system.  At the moment this is
# an offline cleaner--the file system must not be in use at the time that the
# cleaning is performed.  Later, it will be extended to be an online/concurrent
# cleaner, where cleaning is performed even while the file system is mounted.
#
# Copyright (C) 2010  The Regents of the University of California
# Written by Michael Vrable <mvrable@cs.ucsd.edu>

import base64, os, re, struct, sys
import boto
from boto.s3.key import Key

# The BlueSky 'struct cloudlog_header' data type.
HEADER_FORMAT = '<4s48sb16sQIII'
HEADER_CRYPTBYTES = 48
HEADER_MAGIC = 'AgI-'           # 4-byte magic at the start of every log item
HEADER_SIZE = struct.calcsize(HEADER_FORMAT)
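
# For reference, the fields of HEADER_FORMAT in order, matching how
# parse_item() below unpacks them:
#   <     little-endian, no padding
#   4s    magic bytes                 (header[0])
#   48s   encryption keys             (header[1], HEADER_CRYPTBYTES bytes)
#   b     item type                   (header[2])
#   16s   unique item id              (header[3])
#   Q     inode number                (header[4])
#   I     length of data              (header[5])
#   I     length of link ids          (header[6])
#   I     length of link locations    (header[7])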
28 """An interface to BlueSky where the log segments are on local disk.
30 This is mainly intended for testing purposes, as the real cleaner would
31 operate where data is being stored in S3."""

    def __init__(self, path):
        self.path = path

    def list(self):
        """Return a listing of all log segments and their sizes."""

        files = [f for f in os.listdir(self.path) if f.startswith('log-')]
        files.sort()

        return [(f, os.stat(os.path.join(self.path, f)).st_size)
                for f in files]

    def read(self, filename):
        fp = open(os.path.join(self.path, filename), 'rb')
        return fp.read()

    def write(self, filename, data):
        fp = open(os.path.join(self.path, filename), 'wb')
        fp.write(data)
        fp.close()

    def delete(self, filename):
        os.unlink(os.path.join(self.path, filename))

    def loc_to_name(self, location):
        return "log-%08d-%08d" % location

    def name_to_loc(self, name):
        m = re.match(r"^log-(\d+)-(\d+)$", name)
        if m: return (int(m.group(1)), int(m.group(2)))
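
# A minimal usage sketch (hypothetical directory name): the backend stores
# each log segment as a single file named log-<directory>-<sequence>.
#   backend = FileBackend('/tmp/bluesky-logs')
#   backend.write('log-00000000-00000001', 'example segment contents')
#   backend.list()                    # -> [('log-00000000-00000001', 24)]
#   backend.name_to_loc('log-00000000-00000001')    # -> (0, 1)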
65 """An interface to BlueSky where the log segments are on in Amazon S3."""
67 def __init__(self, bucket, path='', cachedir="."):
68 self.conn = boto.connect_s3(is_secure=False)
69 self.bucket = self.conn.get_bucket(bucket)
71 self.cachedir = cachedir

    def list(self):
        files = []
        for k in self.bucket.list(self.path + 'log-'):
            files.append((k.key, k.size))
        return files

    def read(self, filename):
        if filename in self.cache:
            fp = open(os.path.join(self.cachedir, filename), 'rb')
            return fp.read()
        else:
            k = Key(self.bucket)
            k.key = self.path + filename
            data = k.get_contents_as_string()
            fp = open(os.path.join(self.cachedir, filename), 'wb')
            fp.write(data)
            fp.close()
            self.cache[filename] = True
            return data

    def write(self, filename, data):
        k = Key(self.bucket)
        k.key = self.path + filename
        k.set_contents_from_string(data)
        if filename in self.cache:
            del self.cache[filename]

    def delete(self, filename):
        k = Key(self.bucket)
        k.key = self.path + filename
        k.delete()
        if filename in self.cache:
            del self.cache[filename]

    def loc_to_name(self, location):
        return "log-%08d-%08d" % location

    def name_to_loc(self, name):
        m = re.match(r"^log-(\d+)-(\d+)$", name)
        if m: return (int(m.group(1)), int(m.group(2)))
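
# A minimal usage sketch (hypothetical bucket and prefix): segment objects
# are stored under keys of the form <path>log-*, and every read is cached in
# cachedir so repeated scans do not refetch from S3.
#   backend = S3Backend('example-bucket', path='fs/', cachedir='/tmp/cache')
#   for (name, size) in backend.list():
#       print name, size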
116 """In-memory representation of a single item stored in a log file."""
119 self.cryptkeys = '\0' * HEADER_CRYPTBYTES
122 return "<Item ty=%s location=%s size=%d id=%s...>" % (self.type, self.location, self.size, base64.b16encode(self.id).lower()[0:8])
126 return open('/dev/urandom').read(16)

    def serialize(self):
        link_ids = []
        link_locs = []
        for (i, l) in self.links:
            link_ids.append(i)
            # A null link id carries no location; only real links contribute
            # to the packed location list (mirroring parse_item below).
            if i != '\0' * 16:
                link_locs.append(struct.pack('<IIII', *l))
        link_ids = ''.join(link_ids)
        link_locs = ''.join(link_locs)

        header = struct.pack(HEADER_FORMAT,
                             HEADER_MAGIC, self.cryptkeys,
                             ord(self.type), self.id, self.inum,
                             len(self.data), len(link_ids), len(link_locs))
        return header + self.data + link_ids + link_locs
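
# A round-trip sketch (hypothetical values): parse_item(), defined below,
# inverts serialize() for a well-formed item.
#   item = LogItem()
#   item.id = LogItem.random_id()
#   item.inum = 1
#   item.type = ITEM_TYPE.INODE
#   item.data = 'payload'
#   item.links = []
#   parse_item(item.serialize()).data    # -> 'payload'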

class LogSegment:
    def __init__(self, backend, location):
        self.backend = backend
        self.location = location
        self.data = []

    def __len__(self):
        return sum(len(s) for s in self.data)

    def write(self, item):
        data = item.serialize()
        offset = len(self)
        self.data.append(data)
        item.location = self.location + (offset, len(data))

    def close(self):
        data = ''.join(self.data)
        filename = self.backend.loc_to_name(self.location)
        print "Writing %d bytes of data to %s" % (len(data), filename)
        self.backend.write(filename, data)

class LogDirectory:
    TARGET_SIZE = 4 << 20       # Segments are closed once they reach 4 MB

    def __init__(self, backend, dir):
        self.backend = backend
        self.dir_num = dir
        self.seq_num = 0
        for logname in backend.list():
            # Scan for the highest-numbered segment in this log directory so
            # that new segments are assigned fresh sequence numbers.
            loc = backend.name_to_loc(logname[0])
            if loc is not None and loc[0] == dir:
                self.seq_num = max(self.seq_num, loc[1] + 1)
        self.groups = {}
        print "Starting sequence number is", self.seq_num

    def open_segment(self):
        seg = LogSegment(self.backend, (self.dir_num, self.seq_num))
        self.seq_num += 1
        return seg

    def write(self, item, segment_group=0):
        if segment_group not in self.groups:
            self.groups[segment_group] = self.open_segment()
        seg = self.groups[segment_group]
        seg.write(item)
        if len(seg) >= LogDirectory.TARGET_SIZE:
            seg.close()
            del self.groups[segment_group]

    def close_all(self):
        for k in list(self.groups.keys()):
            self.groups[k].close()
            del self.groups[k]
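
# The segment_group argument keeps different classes of items in separate
# segments: the cleaner below uses group 0 for data blocks, 1 for inodes, and
# 2 for inode map blocks and checkpoints, so that metadata and file data are
# not mixed into the same segments.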

class UtilizationTracker:
    """A simple object that tracks what fraction of each segment is used.

    This data can be used to guide segment cleaning decisions."""

    def __init__(self, backend):
        self.segments = {}
        for (segment, size) in backend.list():
            self.segments[segment] = [size, 0]

    def add_item(self, item):
        if isinstance(item, LogItem):
            item = item.location
        if item is None: return
        (dir, seq, offset, size) = item
        filename = "log-%08d-%08d" % (dir, seq)
        self.segments[filename][1] += size
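
# A minimal sketch of the intended use, assuming 'backend' lists a segment
# named 'log-00000000-00000000':
#   util = UtilizationTracker(backend)
#   util.add_item(item)        # counts item.location's size as live data
#   (size, used) = util.segments['log-00000000-00000000']
#   float(used) / size         # fraction of the segment still live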

def parse_item(data):
    if len(data) < HEADER_SIZE: return
    header = struct.unpack_from(HEADER_FORMAT, data, 0)
    size = HEADER_SIZE + sum(header[5:8])

    if header[0] != HEADER_MAGIC:
        print "Bad header magic!"
        return

    if len(data) != size:
        print "Item size does not match: %d != %d" % (size, len(data))
        return

    item = LogItem()
    item.cryptkeys = header[1]
    item.id = header[3]
    item.inum = header[4]
    item.location = None
    item.type = chr(header[2])
    item.size = size
    item.data = data[HEADER_SIZE : HEADER_SIZE + header[5]]
    link_ids = data[HEADER_SIZE + header[5]
                    : HEADER_SIZE + header[5] + header[6]]
    link_locs = data[HEADER_SIZE + header[5] + header[6]
                     : HEADER_SIZE + sum(header[5:8])]
    links = []
    for i in range(len(link_ids) // 16):
        id = link_ids[16*i : 16*i + 16]
        # A null id means the link carries no location and consumes nothing
        # from the packed location list.
        if id == '\0' * 16:
            loc = None
        else:
            loc = struct.unpack('<IIII', link_locs[0:16])
            link_locs = link_locs[16:]
        links.append((id, loc))
    item.links = links
    return item

def load_item(backend, location):
    """Load the cloud item pointed at by the 4-tuple 'location'.

    The elements of the tuple are (directory, sequence, offset, size)."""

    filename = backend.loc_to_name((location[0], location[1]))
    data = backend.read(filename)[location[2] : location[2] + location[3]]
    item = parse_item(data)
    item.location = location
    return item
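
# Example (hypothetical location): load the 128-byte item at offset 4096 of
# segment log-00000000-00000003.
#   item = load_item(backend, (0, 3, 4096, 128))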

def parse_log(data, location=None):
    """Parse contents of a log file, yielding a sequence of log items."""

    # The location may be given as a segment name; convert it to a
    # (directory, sequence) tuple.
    if isinstance(location, str):
        m = re.match(r"^log-(\d+)-(\d+)$", location)
        if m is None:
            location = None
        else:
            location = (int(m.group(1)), int(m.group(2)))

    offset = 0
    while len(data) - offset >= HEADER_SIZE:
        header = struct.unpack_from(HEADER_FORMAT, data, offset)
        size = HEADER_SIZE + sum(header[5:8])
        if header[0] != HEADER_MAGIC:
            print "Bad header magic!"
            break
        if size + offset > len(data):
            print "Short data record at end of log: %s < %s" % (len(data) - offset, size)
            break
        item = parse_item(data[offset : offset + size])
        if item is not None:
            if location is not None:
                item.location = location + (offset, size)
            yield item
        offset += size
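
# Example (hypothetical segment name): iterate over every item in a segment,
# with item locations filled in from the segment's name.
#   for item in parse_log(backend.read('log-00000000-00000002'),
#                         'log-00000000-00000002'):
#       print item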

def load_checkpoint_record(backend):
    # Scan segments from newest to oldest, returning the most recently
    # written checkpoint record.
    for (log, size) in reversed(backend.list()):
        for item in reversed(list(parse_log(backend.read(log), log))):
            if item.type == ITEM_TYPE.CHECKPOINT:
                return item

class InodeMap:
    def build(self, backend, checkpoint_record):
        """Reconstruct the inode map from the checkpoint record given.

        This will also build up information about segment utilization."""
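
        # Layout assumed here, matching the unpacking below: the checkpoint's
        # data is an array of 16-byte (start, end) inode number ranges, and
        # links[i] points to the inode map block for range i.  Each inode map
        # block's data is an array of 8-byte inode numbers, and its links[j]
        # points to the current log item for inode j.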

        self.checkpoint_record = checkpoint_record

        util = UtilizationTracker(backend)
        util.add_item(checkpoint_record)
        inodes = {}
        self.obsolete_segments = set()

        for i in range(len(checkpoint_record.data) // 16):
            (start, end) = struct.unpack_from("<QQ", checkpoint_record.data, 16*i)
            imap = load_item(backend, checkpoint_record.links[i][1])
            util.add_item(imap)
            print "[%d, %d]: %s" % (start, end, imap)
            for j in range(len(imap.data) // 8):
                (inum,) = struct.unpack_from("<Q", imap.data, 8*j)
                inode = load_item(backend, imap.links[j][1])
                inodes[inum] = inode
                data_segments = set()
                util.add_item(inode)
                for l in inode.links:
                    util.add_item(l[1])
                    if l[1] is not None:
                        data_segments.add(l[1][0:2])
                print "  %d: %s (%d data segments)" % (inum, inode, len(data_segments))
331 print "Segment utilizations:"
332 for (s, u) in sorted(util.segments.items()):
333 print "%s: %s %s" % (s, u, float(u[1]) / u[0])
340 self.updated_inodes = set()

    def mark_updated(self, inum):
        self.updated_inodes.add(inum)

    def write(self, backend, log):
        # Sort in reverse order so that updated_inodes[-1] is always the
        # smallest inode number not yet processed.
        updated_inodes = sorted(self.updated_inodes, reverse=True)

        new_checkpoint = LogItem()
        new_checkpoint.id = LogItem.random_id()
        new_checkpoint.inum = 0
        new_checkpoint.type = ITEM_TYPE.CHECKPOINT
        new_checkpoint.data = ""
        new_checkpoint.links = []

        for i in range(len(self.checkpoint_record.data) // 16):
            (start, end) = struct.unpack_from("<QQ", self.checkpoint_record.data, 16*i)

            new_checkpoint.data += self.checkpoint_record.data[16*i : 16*i + 16]

            # Case 1: No inodes in this range of the old inode map have
            # changed.  Simply emit a new pointer to the same inode map block,
            # as long as that block does not live in a segment scheduled for
            # cleaning.
            if len(updated_inodes) == 0 or updated_inodes[-1] > end:
                old_location = self.checkpoint_record.links[i][1][0:2]
                if old_location not in self.obsolete_segments:
                    new_checkpoint.links.append(self.checkpoint_record.links[i])
                    continue

            # Case 2: Some inodes have been updated.  Create a new inode map
            # block, write it out, and point the new checkpoint at it.
            inodes = sorted(k for k in self.inodes if k >= start and k <= end)

            block = LogItem()
            block.id = LogItem.random_id()
            block.inum = 0
            block.type = ITEM_TYPE.INODE_MAP
            block.links = []
            block.data = ""
            for j in inodes:
                block.data += struct.pack("<Q", j)
                block.links.append((self.inodes[j].id, self.inodes[j].location))
            # Writing the block assigns it a location, which the new
            # checkpoint's link must record.
            log.write(block, 2)

            new_checkpoint.links.append((block.id, block.location))

            # Drop the inodes in this range from the pending-update list.
            while len(updated_inodes) > 0 and updated_inodes[-1] <= end:
                updated_inodes.pop()

        log.write(new_checkpoint, 2)
        self.checkpoint_record = new_checkpoint

def rewrite_inode(backend, inode_map, inum, log, copy_data=True):
    inode = inode_map.inodes[inum]
    if copy_data:
        # Copy all data blocks to new segments, then update the inode to
        # point at the new locations.
        blocks = []
        for l in inode.links:
            data = load_item(backend, l[1])
            blocks.append(data)
            log.write(data, 0)
        inode.links = [(b.id, b.location) for b in blocks]
    log.write(inode, 1)
    inode_map.mark_updated(inum)

def run_cleaner(backend, inode_map, log):
    # Determine which segments are poorly utilized and should be cleaned.  We
    # need better heuristics here.
    for (s, u) in sorted(inode_map.util.segments.items()):
        if (float(u[1]) / u[0] < 0.6 or u[1] < 32768) and u[1] > 0:
            print "Should clean segment", s
            loc = backend.name_to_loc(s)
            if loc is not None: inode_map.obsolete_segments.add(loc)
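
    # For example (hypothetical numbers): a segment with u == [1048576, 262144]
    # is only 25% live and falls below the 0.6 threshold, so it is selected for
    # cleaning; a segment with no live data at all (u[1] == 0) is skipped here,
    # since it holds nothing that needs to be copied out.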

    # TODO: We probably also want heuristics that will find inodes with
    # badly-fragmented data and rewrite that to achieve better locality.

    # Given that list of segments to clean, scan through those segments to
    # find data which is still live and mark relevant inodes as needing to be
    # rewritten.
    dirty_inodes = set()
    dirty_inode_data = set()
    for s in inode_map.obsolete_segments:
        filename = backend.loc_to_name(s)
        print "Scanning", filename, "for live data"
        for item in parse_log(backend.read(filename), filename):
            if item.type in (ITEM_TYPE.DATA, ITEM_TYPE.INODE):
                if item.inum != 0:
                    inode = inode_map.inodes[item.inum]
                    # The inode itself lives in this segment and must be
                    # rewritten.
                    if s == inode.location[0:2]:
                        dirty_inodes.add(item.inum)
                    # The inode may also point at data blocks stored in this
                    # segment; those blocks must be copied out as well.
                    if item.inum not in dirty_inode_data:
                        for b in inode.links:
                            if b[1] is not None and s == b[1][0:2]:
                                dirty_inode_data.add(item.inum)
                                break
436 print "Inodes to rewrite:", dirty_inodes
437 print "Inodes with data to rewrite:", dirty_inode_data
438 for i in sorted(dirty_inodes.union(dirty_inode_data)):
439 rewrite_inode(backend, inode_map, i, log, i in dirty_inode_data)

if __name__ == '__main__':
    backend = S3Backend("mvrable-bluesky", cachedir=".")
    chkpt = load_checkpoint_record(backend)
    imap = InodeMap()
    imap.build(backend, chkpt)

    log_dir = LogDirectory(backend, 0)
    run_cleaner(backend, imap, log_dir)
    imap.write(backend, log_dir)
    log_dir.close_all()