# A simple cleaner for the BlueSky cloud file system. At the moment this is an
# offline cleaner--the file system must not be in use at the time that the
# cleaning is performed. Later, it will be extended to be an online/concurrent
# cleaner, where cleaning is performed even while the file system is mounted.
#
# Copyright (C) 2010 The Regents of the University of California
# Written by Michael Vrable <mvrable@cs.ucsd.edu>

import base64, os, re, struct, sys
import boto
from boto.s3.key import Key

# The BlueSky 'struct cloudlog_header' data type.
HEADER_FORMAT = '<4s48sb16sQIII'
HEADER_CRYPTBYTES = 48
HEADER_MAGIC = 'AgI-'       # Assumed magic value; must match what the BlueSky proxy writes
HEADER_SIZE = struct.calcsize(HEADER_FORMAT)
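
# Field layout of the header, as unpacked by parse_item() below:
#   [0] magic   [1] cryptkeys   [2] type   [3] id   [4] inum
#   [5] data length   [6] link-id bytes   [7] link-location bytes

# Symbolic names for the item type codes stored in the header.  The concrete
# values below are an assumption for this sketch; they must match the codes
# that the BlueSky proxy writes into the log.
class ITEM_TYPE:
    DATA = '1'
    INODE = '2'
    INODE_MAP = '3'
    CHECKPOINT = '4'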
28 """An interface to BlueSky where the log segments are on local disk.
30 This is mainly intended for testing purposes, as the real cleaner would
31 operate where data is being stored in S3."""
33 def __init__(self, path):
37 """Return a listing of all log segments and their sizes."""
39 files = [f for f in os.listdir(self.path) if f.startswith('log-')]
42 return [(f, os.stat(os.path.join(self.path, f)).st_size)

    def read(self, filename, offset=0, length=None):
        fp = open(os.path.join(self.path, filename), 'rb')
        fp.seek(offset)
        return fp.read() if length is None else fp.read(length)

    def write(self, filename, data):
        fp = open(os.path.join(self.path, filename), 'wb')
        fp.write(data)
        fp.close()

    def delete(self, filename):
        os.unlink(os.path.join(self.path, filename))

    def loc_to_name(self, location):
        return "log-%08d-%08d" % (location)

    def name_to_loc(self, name):
        m = re.match(r"^log-(\d+)-(\d+)$", name)
        if m: return (int(m.group(1)), int(m.group(2)))
70 """An interface to BlueSky where the log segments are on in Amazon S3."""
72 def __init__(self, bucket, path='', cachedir="."):
73 self.conn = boto.connect_s3(is_secure=False)
74 self.bucket = self.conn.get_bucket(bucket)
76 self.cachedir = cachedir

    def list(self):
        """Return a listing of all log segments and their sizes."""
        return [(k.key, k.size) for k in self.bucket.list(self.path + 'log-')]

    def read(self, filename, offset=0, length=None):
        if filename in self.cache:
            fp = open(os.path.join(self.cachedir, filename), 'rb')
            fp.seek(offset)
            return fp.read() if length is None else fp.read(length)
        # Not cached yet: download the whole segment and keep a local copy.
        k = Key(self.bucket)
        k.key = self.path + filename
        data = k.get_contents_as_string()
        fp = open(os.path.join(self.cachedir, filename), 'wb')
        fp.write(data)
        fp.close()
        self.cache[filename] = True
        data = data[offset:]
        if length is not None:
            data = data[0:length]
        return data

    def write(self, filename, data):
        k = Key(self.bucket)
        k.key = self.path + filename
        k.set_contents_from_string(data)
        if filename in self.cache:
            del self.cache[filename]

    def delete(self, filename):
        k = Key(self.bucket)
        k.key = self.path + filename
        k.delete()
        if filename in self.cache:
            del self.cache[filename]

    def loc_to_name(self, location):
        return "log-%08d-%08d" % (location)

    def name_to_loc(self, name):
        m = re.match(r"^log-(\d+)-(\d+)$", name)
        if m: return (int(m.group(1)), int(m.group(2)))
130 """In-memory representation of a single item stored in a log file."""
133 self.cryptkeys = '\0' * HEADER_CRYPTBYTES
136 return "<Item ty=%s location=%s size=%d id=%s...>" % (self.type, self.location, self.size, base64.b16encode(self.id).lower()[0:8])
140 return open('/dev/urandom').read(16)

    def serialize(self):
        link_ids = []
        link_locs = []
        for (i, l) in self.links:
            link_ids.append(i)
            # A null link id carries no location record.
            if i != '\0' * 16:
                link_locs.append(struct.pack('<IIII', *l))
        link_ids = ''.join(link_ids)
        link_locs = ''.join(link_locs)

        header = struct.pack(HEADER_FORMAT,
                             HEADER_MAGIC, self.cryptkeys,
                             ord(self.type), self.id, self.inum,
                             len(self.data), len(link_ids), len(link_locs))
        return header + self.data + link_ids + link_locs

class LogSegment:
    """A log segment being assembled in memory before being written out."""

    def __init__(self, backend, location):
        self.backend = backend
        self.location = location
        self.data = []

    def __len__(self):
        return sum(len(s) for s in self.data)

    def write(self, item):
        data = item.serialize()
        offset = len(self)
        self.data.append(data)
        item.location = self.location + (offset, len(data))

    def close(self):
        data = ''.join(self.data)
        filename = self.backend.loc_to_name(self.location)
        print "Would write %d bytes of data to %s" % (len(data), filename)
        self.backend.write(filename, data)

class LogDirectory:
    TARGET_SIZE = 4 << 20       # Close a segment once roughly 4 MiB has accumulated

    def __init__(self, backend, dir):
        self.backend = backend
        self.dir_num = dir
        self.seq_num = 0
        for logname in backend.list():
            loc = backend.name_to_loc(logname[0])
            if loc is not None and loc[0] == dir:
                self.seq_num = max(self.seq_num, loc[1] + 1)
        self.groups = {}
        print "Starting sequence number is", self.seq_num

    def open_segment(self):
        seg = LogSegment(self.backend, (self.dir_num, self.seq_num))
        self.seq_num += 1
        return seg

    def write(self, item, segment_group=0):
        if segment_group not in self.groups:
            self.groups[segment_group] = self.open_segment()
        seg = self.groups[segment_group]
        seg.write(item)
        if len(seg) >= LogDirectory.TARGET_SIZE:
            seg.close()
            del self.groups[segment_group]

    def close_all(self):
        for k in list(self.groups.keys()):
            self.groups[k].close()
            del self.groups[k]

class UtilizationTracker:
    """A simple object that tracks what fraction of each segment is used.

    This data can be used to guide segment cleaning decisions."""
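
    # Worked example (numbers illustrative): a 4 MiB segment in which only
    # 1 MiB of items is still referenced has utilization 1/4 = 0.25, which
    # falls below the 0.6 threshold used in run_cleaner() below and so would
    # be selected for cleaning.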

    def __init__(self, backend):
        self.segments = {}      # segment name -> [total size, bytes still referenced]
        for (segment, size) in backend.list():
            self.segments[segment] = [size, 0]

    def add_item(self, item):
        if isinstance(item, LogItem):
            item = item.location
        if item is None: return
        (dir, seq, offset, size) = item
        filename = "log-%08d-%08d" % (dir, seq)
        self.segments[filename][1] += size

def parse_item(data):
    if len(data) < HEADER_SIZE: return
    header = struct.unpack_from(HEADER_FORMAT, data, 0)
    size = HEADER_SIZE + sum(header[5:8])

    if header[0] != HEADER_MAGIC:
        print "Bad header magic!"
        return

    if len(data) != size:
        print "Item size does not match: %d != %d" % (size, len(data))
        return

    item = LogItem()
    item.cryptkeys = header[1]
    item.id = header[3]
    item.inum = header[4]
    item.location = None
    item.type = chr(header[2])
    item.size = size
    item.data = data[HEADER_SIZE : HEADER_SIZE + header[5]]

    links = []
    link_ids = data[HEADER_SIZE + header[5]
                    : HEADER_SIZE + header[5] + header[6]]
    link_locs = data[HEADER_SIZE + header[5] + header[6]
                     : HEADER_SIZE + sum(header[5:8])]
    for i in range(len(link_ids) // 16):
        id = link_ids[16*i : 16*i + 16]
        if id == '\0' * 16:
            # A null link id has no location record attached.
            loc = None
        else:
            loc = struct.unpack('<IIII', link_locs[0:16])
            link_locs = link_locs[16:]
        links.append((id, loc))
    item.links = links
    return item

def load_item(backend, location):
    """Load the cloud item pointed at by the 4-tuple 'location'.

    The elements of the tuple are (directory, sequence, offset, size)."""

    filename = backend.loc_to_name((location[0], location[1]))
    data = backend.read(filename, location[2], location[3])
    item = parse_item(data)
    item.location = location
    return item
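
# For illustration (values hypothetical): the location (0, 3, 4096, 512) names
# the 512-byte item starting at offset 4096 within segment "log-00000000-00000003".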

def parse_log(data, location=None):
    """Parse contents of a log file, yielding a sequence of log items."""

    if isinstance(location, str):
        m = re.match(r"^log-(\d+)-(\d+)$", location)
        if m:
            location = (int(m.group(1)), int(m.group(2)))
        else:
            location = None

    offset = 0
    while len(data) - offset >= HEADER_SIZE:
        header = struct.unpack_from(HEADER_FORMAT, data, offset)
        size = HEADER_SIZE + sum(header[5:8])
        if header[0] != HEADER_MAGIC:
            print "Bad header magic!"
            break
        if size + offset > len(data):
            print "Short data record at end of log: %s < %s" % (len(data) - offset, size)
            break
        item = parse_item(data[offset : offset + size])
        if item is not None:
            if location is not None:
                item.location = location + (offset, size)
            yield item
        offset += size

def load_checkpoint_record(backend):
    """Return the most recent checkpoint record found in the cloud log."""
    for (log, size) in reversed(backend.list()):
        for item in reversed(list(parse_log(backend.read(log), log))):
            if item.type == ITEM_TYPE.CHECKPOINT:
                return item

class InodeMap:
    def build(self, backend, checkpoint_record):
        """Reconstruct the inode map from the checkpoint record given.

        This will also build up information about segment utilization."""

        self.checkpoint_record = checkpoint_record

        util = UtilizationTracker(backend)
        util.add_item(checkpoint_record)
        inodes = {}
        self.obsolete_segments = set()

        for i in range(len(checkpoint_record.data) // 16):
            (start, end) = struct.unpack_from("<QQ", checkpoint_record.data, 16*i)
            imap = load_item(backend, checkpoint_record.links[i][1])
            util.add_item(imap)
            print "[%d, %d]: %s" % (start, end, imap)
            for j in range(len(imap.data) // 8):
                (inum,) = struct.unpack_from("<Q", imap.data, 8*j)
                inode = load_item(backend, imap.links[j][1])
                inodes[inum] = inode
                util.add_item(inode)
                data_segments = set()
                for l in inode.links:
                    util.add_item(l[1])
                    data_segments.add(l[1][0:2])
                print " %d: %s (%d data segments)" % (inum, inode, len(data_segments))

        print "Segment utilizations:"
        for (s, u) in sorted(util.segments.items()):
            print "%s: %s %s" % (s, u, float(u[1]) / u[0])

        self.inodes = inodes
        self.util = util
        self.updated_inodes = set()

    def mark_updated(self, inum):
        self.updated_inodes.add(inum)

    def write(self, backend, log):
        updated_inodes = sorted(self.updated_inodes, reverse=True)

        new_checkpoint = LogItem()
        new_checkpoint.id = LogItem.random_id()
        new_checkpoint.inum = 0
        new_checkpoint.type = ITEM_TYPE.CHECKPOINT
        new_checkpoint.data = ""
        new_checkpoint.links = []

        for i in range(len(self.checkpoint_record.data) // 16):
            (start, end) = struct.unpack_from("<QQ", self.checkpoint_record.data, 16*i)

            new_checkpoint.data += self.checkpoint_record.data[16*i : 16*i + 16]

            # Case 1: No inodes in this range of the old inode map have
            # changed. Simply emit a new pointer to the same inode map block.
            if len(updated_inodes) == 0 or updated_inodes[-1] > end:
                old_location = self.checkpoint_record.links[i][1][0:2]
                if old_location not in self.obsolete_segments:
                    new_checkpoint.links.append(self.checkpoint_record.links[i])
                    continue

            # Case 2: Some inodes have been updated. Create a new inode map
            # block, write it out, and point the new checkpoint at it.
            inodes = [k for k in self.inodes if k >= start and k <= end]
            inodes.sort()

            block = LogItem()
            block.id = LogItem.random_id()
            block.inum = 0
            block.type = ITEM_TYPE.INODE_MAP
            block.links = []
            block.data = ""
            for j in inodes:
                block.data += struct.pack("<Q", j)
                block.links.append((self.inodes[j].id, self.inodes[j].location))
            log.write(block, 2)

            new_checkpoint.links.append((block.id, block.location))

            # Drop the inode numbers covered by this range from the pending list.
            while len(updated_inodes) > 0 and updated_inodes[-1] <= end:
                updated_inodes.pop()

        log.write(new_checkpoint, 2)
        self.checkpoint_record = new_checkpoint

def rewrite_inode(backend, inode_map, inum, log, copy_data=True):
    inode = inode_map.inodes[inum]
    if copy_data:
        # Copy the inode's data blocks to fresh segments and relink the inode.
        blocks = []
        for l in inode.links:
            data = load_item(backend, l[1])
            blocks.append(data)
            log.write(data, 0)
        inode.links = [(b.id, b.location) for b in blocks]
    log.write(inode, 1)
    inode_map.mark_updated(inum)

def run_cleaner(backend, inode_map, log, repack_inodes=False):
    # Determine which segments are poorly utilized and should be cleaned. We
    # need better heuristics here.
    for (s, u) in sorted(inode_map.util.segments.items()):
        if (float(u[1]) / u[0] < 0.6 or u[1] < 32768) and u[1] > 0:
            print "Should clean segment", s
            loc = backend.name_to_loc(s)
            if loc is not None: inode_map.obsolete_segments.add(loc)

    # TODO: We probably also want heuristics that will find inodes with
    # badly-fragmented data and rewrite that to achieve better locality.

    # Given that list of segments to clean, scan through those segments to
    # find data which is still live and mark relevant inodes as needing to be
    # rewritten.
    if repack_inodes:
        dirty_inodes = set(inode_map.inodes)
    else:
        dirty_inodes = set()
    dirty_inode_data = set()
    for s in inode_map.obsolete_segments:
        filename = backend.loc_to_name(s)
        print "Scanning", filename, "for live data"
        for item in parse_log(backend.read(filename), filename):
            if item.type in (ITEM_TYPE.DATA, ITEM_TYPE.INODE):
                if item.inum != 0:
                    inode = inode_map.inodes[item.inum]
                    if s == inode.location[0:2]:
                        dirty_inodes.add(item.inum)
                    if item.inum not in dirty_inode_data:
                        for b in inode.links:
                            if s == b[1][0:2]:
                                dirty_inode_data.add(item.inum)
                                break

    print "Inodes to rewrite:", dirty_inodes
    print "Inodes with data to rewrite:", dirty_inode_data
    for i in sorted(dirty_inodes.union(dirty_inode_data)):
        rewrite_inode(backend, inode_map, i, log, i in dirty_inode_data)

if __name__ == '__main__':
    backend = S3Backend("mvrable-bluesky", cachedir=".")
    chkpt = load_checkpoint_record(backend)
    imap = InodeMap()
    imap.build(backend, chkpt)

    log_dir = LogDirectory(backend, 0)
    run_cleaner(backend, imap, log_dir)
    imap.write(backend, log_dir)
    log_dir.close_all()
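
# A minimal sketch of running the cleaner offline against a local copy of the
# log segments instead of S3 (the directory path below is hypothetical):
#
#     backend = FileBackend("/path/to/local/log/segments")
#     chkpt = load_checkpoint_record(backend)
#     imap = InodeMap()
#     imap.build(backend, chkpt)
#     log_dir = LogDirectory(backend, 0)
#     run_cleaner(backend, imap, log_dir)
#     imap.write(backend, log_dir)
#     log_dir.close_all()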