#!/usr/bin/env python
#
# A simple cleaner for the BlueSky cloud file system.  At the moment this is an
# offline cleaner--the file system must not be in use at the time that the
# cleaning is performed.  Later, it will be extended to be an online/concurrent
# cleaner, where cleaning is performed even while the file system is mounted.
#
# Copyright (C) 2010  The Regents of the University of California
# Written by Michael Vrable <mvrable@cs.ucsd.edu>

import base64, os, re, struct, sys
import boto
from boto.s3.key import Key

# The BlueSky 'struct cloudlog_header' data type.
HEADER_FORMAT = '<4sb16sQIII'
HEADER_MAGIC = 'AgI-'
HEADER_SIZE = struct.calcsize(HEADER_FORMAT)

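# Field-by-field, HEADER_FORMAT describes (all little-endian):
#   4s  magic          -- must equal HEADER_MAGIC
#   b   type           -- item type code (see ITEM_TYPE below)
#   16s id             -- unique 128-bit identifier for the item
#   Q   inum           -- inode number the item belongs to (0 if none)
#   I   len(data)      -- size in bytes of the item payload
#   I   len(link_ids)  -- size in bytes of the packed link identifiers
#   I   len(link_locs) -- size in bytes of the packed link locations

class ITEM_TYPE:
    DATA = '1'
    INODE = '2'
    INODE_MAP = '3'
    CHECKPOINT = '4'
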
27 """An interface to BlueSky where the log segments are on local disk.
29 This is mainly intended for testing purposes, as the real cleaner would
30 operate where data is being stored in S3."""
    def __init__(self, path):
        self.path = path

36 """Return a listing of all log segments and their sizes."""
        files = [f for f in os.listdir(self.path) if f.startswith('log-')]
        files.sort()
        return [(f, os.stat(os.path.join(self.path, f)).st_size)
                for f in files]

    def read(self, filename):
        fp = open(os.path.join(self.path, filename), 'rb')
        return fp.read()

    def write(self, filename, data):
        fp = open(os.path.join(self.path, filename), 'wb')
        fp.write(data)
        fp.close()

    def delete(self, filename):
        os.unlink(os.path.join(self.path, filename))

    def loc_to_name(self, location):
        return "log-%08d-%08d" % location

    def name_to_loc(self, name):
        m = re.match(r"^log-(\d+)-(\d+)$", name)
        if m: return (int(m.group(1)), int(m.group(2)))

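# A quick local smoke test of FileBackend might look like the following
# (hypothetical directory containing log-XXXXXXXX-XXXXXXXX segment files):
#   backend = FileBackend('/tmp/bluesky-logs')
#   for (name, size) in backend.list(): print name, size
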
64 """An interface to BlueSky where the log segments are on in Amazon S3."""
    def __init__(self, bucket, path='', cachedir=None):
        # boto picks up AWS credentials from the environment
        # (AWS_ACCESS_KEY_ID/AWS_SECRET_ACCESS_KEY) or its config files.
        # cachedir is accepted but currently unused.
        self.conn = boto.connect_s3(is_secure=False)
        self.bucket = self.conn.get_bucket(bucket)
        self.path = path

    def list(self):
        files = []
        for k in self.bucket.list(self.path + 'log-'):
            files.append((k.key, k.size))
        return files

    def read(self, filename):
        k = Key(self.bucket)
        k.key = self.path + filename
        return k.get_contents_as_string()

    def write(self, filename, data):
        k = Key(self.bucket)
        k.key = self.path + filename
        k.set_contents_from_string(data)

    def delete(self, filename):
        k = Key(self.bucket)
        k.key = self.path + filename
        k.delete()

    def loc_to_name(self, location):
        return "log-%08d-%08d" % location

    def name_to_loc(self, name):
        m = re.match(r"^log-(\d+)-(\d+)$", name)
        if m: return (int(m.group(1)), int(m.group(2)))

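# Example setup (hypothetical bucket name and key prefix):
#   backend = S3Backend('my-bluesky-bucket', path='fs0/')
#   print backend.list()
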
100 """In-memory representation of a single item stored in a log file."""
103 return "<Item ty=%s location=%s size=%d id=%s...>" % (self.type, self.location, self.size, base64.b16encode(self.id).lower()[0:8])
    @staticmethod
    def random_id():
        return open('/dev/urandom').read(16)

    def serialize(self):
        link_ids = []
        link_locs = []
        for (i, l) in self.links:
            link_ids.append(i)
            # Null identifiers carry no location, so nothing is packed.
            if i != '\0' * 16:
                link_locs.append(struct.pack('<IIII', *l))
        link_ids = ''.join(link_ids)
        link_locs = ''.join(link_locs)

        header = struct.pack(HEADER_FORMAT,
                             HEADER_MAGIC, ord(self.type), self.id, self.inum,
                             len(self.data), len(link_ids), len(link_locs))
        return header + self.data + link_ids + link_locs

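    # On-disk layout produced by serialize() and consumed by parse_item():
    #   [header][data][16-byte link ids][16-byte locs for non-null ids]
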
class LogSegment:
    def __init__(self, backend, location):
        self.backend = backend
        self.location = location
        self.data = []

    def __len__(self):
        return sum(len(s) for s in self.data)

    def write(self, item):
        data = item.serialize()
        offset = len(self)
        self.data.append(data)
        # Record where the item will live once this segment is flushed.
        item.location = self.location + (offset, len(data))

    def close(self):
        data = ''.join(self.data)
        filename = self.backend.loc_to_name(self.location)
        print "Writing %d bytes of data to %s" % (len(data), filename)
        self.backend.write(filename, data)

class LogDirectory:
    # Close out a segment once it accumulates 4 MiB of data.
    TARGET_SIZE = 4 << 20

    def __init__(self, backend, dir):
        self.backend = backend
        self.dir_num = dir
        self.seq_num = 0
        for logname in backend.list():
            loc = backend.name_to_loc(logname[0])
            if loc is not None and loc[0] == dir:
                self.seq_num = max(self.seq_num, loc[1] + 1)
        self.groups = {}
        print "Starting sequence number is", self.seq_num

    def open_segment(self):
        seg = LogSegment(self.backend, (self.dir_num, self.seq_num))
        self.seq_num += 1
        return seg

    def write(self, item, segment_group=0):
        # Items in different groups go to separate segments; the cleaner
        # below uses group 0 for data blocks, 1 for inodes, and 2 for
        # inode map and checkpoint records.
        if segment_group not in self.groups:
            self.groups[segment_group] = self.open_segment()
        seg = self.groups[segment_group]
        seg.write(item)
        if len(seg) >= LogDirectory.TARGET_SIZE:
            seg.close()
            del self.groups[segment_group]

    def close_all(self):
        for k in list(self.groups.keys()):
            self.groups[k].close()
            del self.groups[k]

class UtilizationTracker:
    """A simple object that tracks what fraction of each segment is used.

    This data can be used to guide segment cleaning decisions."""

    def __init__(self, backend):
        # Maps segment name -> [total size in bytes, bytes known to be live].
        self.segments = {}
        for (segment, size) in backend.list():
            self.segments[segment] = [size, 0]

    def add_item(self, item):
        if isinstance(item, LogItem):
            item = item.location
        if item is None: return
        (dir, seq, offset, size) = item
        filename = "log-%08d-%08d" % (dir, seq)
        self.segments[filename][1] += size

def parse_item(data):
    if len(data) < HEADER_SIZE: return
    header = struct.unpack_from(HEADER_FORMAT, data, 0)
    size = HEADER_SIZE + sum(header[4:7])

    if header[0] != HEADER_MAGIC:
        print "Bad header magic!"
        return

    if len(data) != size:
        print "Item size does not match!"
        return
    item = LogItem()
    item.id = header[2]
    item.inum = header[3]
    item.location = None
    item.type = chr(header[1])
    item.size = size
    item.data = data[HEADER_SIZE : HEADER_SIZE + header[4]]
    links = []
    link_ids = data[HEADER_SIZE + header[4]
                    : HEADER_SIZE + header[4] + header[5]]
    link_locs = data[HEADER_SIZE + header[4] + header[5]
                     : HEADER_SIZE + sum(header[4:7])]
    for i in range(len(link_ids) // 16):
        id = link_ids[16*i : 16*i + 16]
        if id == '\0' * 16:
            loc = None
        else:
            loc = struct.unpack('<IIII', link_locs[0:16])
            link_locs = link_locs[16:]
        links.append((id, loc))
    item.links = links
    return item

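# Round-trip sanity check (sketch; assumes 'item' has its type, id, inum,
# data, and links fields populated):
#   item2 = parse_item(item.serialize())
#   assert item2.id == item.id and item2.data == item.data
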
def load_item(backend, location):
    """Load the cloud item pointed at by the 4-tuple 'location'.

    The elements of the tuple are (directory, sequence, offset, size)."""

    filename = backend.loc_to_name((location[0], location[1]))
    data = backend.read(filename)[location[2] : location[2] + location[3]]
    item = parse_item(data)
    item.location = location
    return item

def parse_log(data, location=None):
    """Parse contents of a log file, yielding a sequence of log items."""

    if isinstance(location, str):
        m = re.match(r"^log-(\d+)-(\d+)$", location)
        location = (int(m.group(1)), int(m.group(2))) if m else None

    offset = 0
    while len(data) - offset >= HEADER_SIZE:
        header = struct.unpack_from(HEADER_FORMAT, data, offset)
        size = HEADER_SIZE + sum(header[4:7])
        if header[0] != HEADER_MAGIC:
            print "Bad header magic!"
            break
        if size + offset > len(data):
            print "Short data record at end of log: %s < %s" % (len(data) - offset, size)
            break
        item = parse_item(data[offset : offset + size])
        if item is not None:
            if location is not None:
                item.location = location + (offset, size)
            yield item
        offset += size

def load_checkpoint_record(backend):
    """Return the most recent checkpoint record, searching newest-first."""
    for (log, size) in reversed(backend.list()):
        for item in reversed(list(parse_log(backend.read(log), log))):
            if item.type == ITEM_TYPE.CHECKPOINT:
                return item

class InodeMap:
    def build(self, backend, checkpoint_record):
        """Reconstruct the inode map from the checkpoint record given.

        This will also build up information about segment utilization."""

        self.checkpoint_record = checkpoint_record

        util = UtilizationTracker(backend)
        util.add_item(checkpoint_record)
        inodes = {}
        self.obsolete_segments = set()

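        # The checkpoint data is a packed sequence of <QQ> (start, end)
        # inode-number ranges; links[i] points at the inode map block
        # covering the i-th range.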
        for i in range(len(checkpoint_record.data) // 16):
            (start, end) = struct.unpack_from("<QQ", checkpoint_record.data, 16*i)
            imap = load_item(backend, checkpoint_record.links[i][1])
            util.add_item(imap)
            print "[%d, %d]: %s" % (start, end, imap)
            for j in range(len(imap.data) // 8):
                (inum,) = struct.unpack_from("<Q", imap.data, 8*j)
                inode = load_item(backend, imap.links[j][1])
                inodes[inum] = inode
                util.add_item(inode)
                data_segments = set()
                for link in inode.links:
                    util.add_item(link[1])
                    data_segments.add(link[1][0:2])
                print "  %d: %s (%d data segments)" % (inum, inode, len(data_segments))

310 print "Segment utilizations:"
311 for (s, u) in sorted(util.segments.items()):
312 print "%s: %s %s" % (s, u, float(u[1]) / u[0])
        self.inodes = inodes
        self.util = util
        self.updated_inodes = set()

    def mark_updated(self, inum):
        self.updated_inodes.add(inum)

    def write(self, backend, log):
        updated_inodes = sorted(self.updated_inodes, reverse=True)

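        # Reverse order puts the smallest inode number at the end of the
        # list, so the loop below can consume it cheaply with pop().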
        new_checkpoint = LogItem()
        new_checkpoint.id = LogItem.random_id()
        new_checkpoint.inum = 0
        new_checkpoint.type = ITEM_TYPE.CHECKPOINT
        new_checkpoint.data = ""
        new_checkpoint.links = []

        for i in range(len(self.checkpoint_record.data) // 16):
            (start, end) = struct.unpack_from("<QQ", self.checkpoint_record.data, 16*i)

            new_checkpoint.data += self.checkpoint_record.data[16*i : 16*i + 16]

            # Case 1: No inodes in this range of the old inode map have
            # changed.  Simply emit a new pointer to the same inode map block.
            if len(updated_inodes) == 0 or updated_inodes[-1] > end:
                old_location = self.checkpoint_record.links[i][1][0:2]
                if old_location not in self.obsolete_segments:
                    new_checkpoint.links.append(self.checkpoint_record.links[i])
                    continue

            # Case 2: Some inodes have been updated.  Create a new inode map
            # block, write it out, and point the new checkpoint at it.
            inodes = [k for k in self.inodes if k >= start and k <= end]
            inodes.sort()
            block = LogItem()
            block.id = LogItem.random_id()
            block.inum = 0
            block.type = ITEM_TYPE.INODE_MAP
            block.data = ""
            block.links = []
            for j in inodes:
                block.data += struct.pack("<Q", j)
                block.links.append((self.inodes[j].id, self.inodes[j].location))
            log.write(block, 2)
            new_checkpoint.links.append((block.id, block.location))

            # Drop the inodes in this range from the pending-updates list.
            while len(updated_inodes) > 0 and updated_inodes[-1] <= end:
                updated_inodes.pop()

        log.write(new_checkpoint, 2)
        self.checkpoint_record = new_checkpoint

def rewrite_inode(backend, inode_map, inum, log, copy_data=True):
    inode = inode_map.inodes[inum]
    if copy_data:
        # Copy the inode's data blocks to fresh segments, then repoint it.
        blocks = []
        for l in inode.links:
            data = load_item(backend, l[1])
            blocks.append(data)
            log.write(data, 0)
        inode.links = [(b.id, b.location) for b in blocks]
    log.write(inode, 1)
    inode_map.mark_updated(inum)

def run_cleaner(backend, inode_map, log):
    # Determine which segments are poorly utilized and should be cleaned.
    # We need better heuristics here; for now, clean any segment with some
    # live data that is under 60% utilized or holds less than 32 KiB live.
    for (s, u) in sorted(inode_map.util.segments.items()):
        if (float(u[1]) / u[0] < 0.6 or u[1] < 32768) and u[1] > 0:
            print "Should clean segment", s
            loc = backend.name_to_loc(s)
            if loc is not None: inode_map.obsolete_segments.add(loc)

    # TODO: We probably also want heuristics that will find inodes with
    # badly-fragmented data and rewrite that to achieve better locality.

    # Given that list of segments to clean, scan through those segments to
    # find data which is still live and mark the relevant inodes as needing
    # to be rewritten.
    dirty_inodes = set()
    dirty_inode_data = set()
    for s in inode_map.obsolete_segments:
        filename = backend.loc_to_name(s)
        print "Scanning", filename, "for live data"
        for item in parse_log(backend.read(filename), filename):
            if item.type in (ITEM_TYPE.DATA, ITEM_TYPE.INODE):
                if item.inum != 0:
                    inode = inode_map.inodes[item.inum]
                    # The inode itself must be rewritten if the inode map
                    # still points at a copy in this segment.
                    if s == inode.location[0:2]:
                        dirty_inodes.add(item.inum)
                    # The inode's data must be rewritten if any of its data
                    # blocks still live in this segment.
                    if item.inum not in dirty_inode_data:
                        for b in inode.links:
                            if s == b[1][0:2]:
                                dirty_inode_data.add(item.inum)
                                break

415 print "Inodes to rewrite:", dirty_inodes
416 print "Inodes with data to rewrite:", dirty_inode_data
417 for i in sorted(dirty_inodes.union(dirty_inode_data)):
418 rewrite_inode(backend, inode_map, i, log, i in dirty_inode_data)
if __name__ == '__main__':
    backend = S3Backend("mvrable-bluesky", cachedir=".")
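    # For offline testing against a local mirror of the log segments, a
    # FileBackend could be substituted here (hypothetical path):
    #backend = FileBackend(".")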
    chkpt = load_checkpoint_record(backend)
    imap = InodeMap()
    imap.build(backend, chkpt)

    log_dir = LogDirectory(backend, 0)
    run_cleaner(backend, imap, log_dir)
    imap.write(backend, log_dir)
    log_dir.close_all()