# A simple cleaner for the BlueSky cloud file system.  At the moment this is
# an offline cleaner--the file system must not be in use at the time that the
# cleaning is performed.  Later, it will be extended to be an online/concurrent
# cleaner, where cleaning is performed even while the file system is mounted.
#
# Copyright (C) 2010  The Regents of the University of California
# Written by Michael Vrable <mvrable@cs.ucsd.edu>

import base64, os, re, struct, sys
import boto
from boto.s3.key import Key

# The BlueSky 'struct cloudlog_header' data type.
HEADER_FORMAT = '<4sb16sQIII'
HEADER_MAGIC = 'AgI-'
HEADER_SIZE = struct.calcsize(HEADER_FORMAT)

# Type codes used in the 'type' field of a cloudlog header.
class ITEM_TYPE:
    DATA = '1'
    INODE = '2'
    INODE_MAP = '3'
    CHECKPOINT = '4'
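
# Illustrative only: a hedged sketch (not part of the original BlueSky code)
# showing how the header fields unpack.  In HEADER_FORMAT order these are the
# magic string, item type code, 16-byte item id, inode number, and the byte
# lengths of the data, link-id, and link-location sections that follow.
def _describe_header(data):
    (magic, ty, id, inum, data_len,
     id_len, loc_len) = struct.unpack_from(HEADER_FORMAT, data, 0)
    return {'magic': magic, 'type': chr(ty), 'id': id, 'inum': inum,
            'data_len': data_len, 'id_len': id_len, 'loc_len': loc_len}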
27 """An interface to BlueSky where the log segments are on local disk.
29 This is mainly intended for testing purposes, as the real cleaner would
30 operate where data is being stored in S3."""

    def __init__(self, path):
        self.path = path
36 """Return a listing of all log segments and their sizes."""
38 files = [f for f in os.listdir(self.path) if f.startswith('log-')]
41 return [(f, os.stat(os.path.join(self.path, f)).st_size)

    def read(self, filename):
        fp = open(os.path.join(self.path, filename), 'rb')
        return fp.read()

    def write(self, filename, data):
        fp = open(os.path.join(self.path, filename), 'wb')
        fp.write(data)
        fp.close()

    def delete(self, filename):
        os.unlink(os.path.join(self.path, filename))

    def loc_to_name(self, location):
        return "log-%08d-%08d" % (location)

    def name_to_loc(self, name):
        m = re.match(r"^log-(\d+)-(\d+)$", name)
        if m: return (int(m.group(1)), int(m.group(2)))
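
# Note: segment names encode a (directory, sequence) pair; for example,
# loc_to_name((0, 3)) yields "log-00000000-00000003" and name_to_loc maps
# that name back to (0, 3).  Names not matching the pattern map to None.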
64 """An interface to BlueSky where the log segments are on in Amazon S3."""

    def __init__(self, bucket, path='', cachedir="."):
        self.conn = boto.connect_s3(is_secure=False)
        self.bucket = self.conn.get_bucket(bucket)
        self.path = path
        self.cachedir = cachedir
        self.cache = {}

    def list(self):
        files = []
        for k in self.bucket.list(self.path + 'log-'):
            files.append((k.key, k.size))
        return files

    def read(self, filename):
        if filename in self.cache:
            fp = open(os.path.join(self.cachedir, filename), 'rb')
            return fp.read()
        else:
            k = Key(self.bucket)
            k.key = self.path + filename
            data = k.get_contents_as_string()
            fp = open(os.path.join(self.cachedir, filename), 'wb')
            fp.write(data)
            fp.close()
            self.cache[filename] = True
            return data

    def write(self, filename, data):
        k = Key(self.bucket)
        k.key = self.path + filename
        k.set_contents_from_string(data)
        if filename in self.cache:
            del self.cache[filename]

    def delete(self, filename):
        k = Key(self.bucket)
        k.key = self.path + filename
        k.delete()
        if filename in self.cache:
            del self.cache[filename]

    def loc_to_name(self, location):
        return "log-%08d-%08d" % (location)

    def name_to_loc(self, name):
        m = re.match(r"^log-(\d+)-(\d+)$", name)
        if m: return (int(m.group(1)), int(m.group(2)))
115 """In-memory representation of a single item stored in a log file."""
118 return "<Item ty=%s location=%s size=%d id=%s...>" % (self.type, self.location, self.size, base64.b16encode(self.id).lower()[0:8])

    @staticmethod
    def random_id():
        return open('/dev/urandom').read(16)

    def serialize(self):
        link_ids = []
        link_locs = []
        for (i, l) in self.links:
            link_ids.append(i)
            if i != '\0' * 16:
                link_locs.append(struct.pack('<IIII', *l))
        link_ids = ''.join(link_ids)
        link_locs = ''.join(link_locs)

        header = struct.pack(HEADER_FORMAT,
                             HEADER_MAGIC, ord(self.type), self.id, self.inum,
                             len(self.data), len(link_ids), len(link_locs))
        return header + self.data + link_ids + link_locs

class LogSegment:
    def __init__(self, backend, location):
        self.backend = backend
        self.location = location
        self.data = []

    def __len__(self):
        return sum(len(s) for s in self.data)

    def write(self, item):
        data = item.serialize()
        offset = len(self)
        self.data.append(data)
        item.location = self.location + (offset, len(data))

    def close(self):
        data = ''.join(self.data)
        filename = self.backend.loc_to_name(self.location)
        print "Writing %d bytes of data to %s" % (len(data), filename)
        self.backend.write(filename, data)

class LogDirectory:
    TARGET_SIZE = 4 << 20       # 4 MB per segment

    def __init__(self, backend, dir):
        self.backend = backend
        self.dir_num = dir
        self.seq_num = 0
        for logname in backend.list():
            loc = backend.name_to_loc(logname[0])
            if loc is not None and loc[0] == dir:
                self.seq_num = max(self.seq_num, loc[1] + 1)
        self.groups = {}
        print "Starting sequence number is", self.seq_num

    def open_segment(self):
        seg = LogSegment(self.backend, (self.dir_num, self.seq_num))
        self.seq_num += 1
        return seg

    def write(self, item, segment_group=0):
        if segment_group not in self.groups:
            self.groups[segment_group] = self.open_segment()
        seg = self.groups[segment_group]
        seg.write(item)
        if len(seg) >= LogDirectory.TARGET_SIZE:
            seg.close()
            del self.groups[segment_group]

    def close_all(self):
        for k in list(self.groups.keys()):
            self.groups[k].close()
            del self.groups[k]
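
# A note on the segment_group parameter to LogDirectory.write: the cleaner
# keeps a separate open segment per group so that different classes of items
# are not mixed in one cloud segment.  As used below, group 0 holds file
# data, group 1 holds inodes, and group 2 holds inode map blocks and
# checkpoint records.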

class UtilizationTracker:
    """A simple object that tracks what fraction of each segment is used.

    This data can be used to guide segment cleaning decisions."""

    def __init__(self, backend):
        self.segments = {}
        for (segment, size) in backend.list():
            self.segments[segment] = [size, 0]

    def add_item(self, item):
        if isinstance(item, LogItem):
            item = item.location
        if item is None: return
        (dir, seq, offset, size) = item
        filename = "log-%08d-%08d" % (dir, seq)
        self.segments[filename][1] += size

def parse_item(data):
    if len(data) < HEADER_SIZE: return
    header = struct.unpack_from(HEADER_FORMAT, data, 0)
    size = HEADER_SIZE + sum(header[4:7])

    if header[0] != HEADER_MAGIC:
        print "Bad header magic!"
        return

    if len(data) != size:
        print "Item size does not match!"
        return

    item = LogItem()
    item.id = header[2]
    item.inum = header[3]
    item.location = None
    item.type = chr(header[1])
    item.size = size
    item.data = data[HEADER_SIZE : HEADER_SIZE + header[4]]
    links = []
    link_ids = data[HEADER_SIZE + header[4]
                    : HEADER_SIZE + header[4] + header[5]]
    link_locs = data[HEADER_SIZE + header[4] + header[5]
                     : HEADER_SIZE + sum(header[4:7])]
    for i in range(len(link_ids) // 16):
        id = link_ids[16*i : 16*i + 16]
        if id == '\0' * 16:
            loc = None
        else:
            loc = struct.unpack('<IIII', link_locs[0:16])
            link_locs = link_locs[16:]
        links.append((id, loc))
    item.links = links
    return item
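
# Illustrative only: a minimal round-trip sketch (not part of the original
# code) checking that parse_item() recovers what LogItem.serialize() wrote.
# The helper name is hypothetical.
def _roundtrip_check():
    item = LogItem()
    item.id = LogItem.random_id()
    item.inum = 0
    item.type = ITEM_TYPE.DATA
    item.data = "example payload"
    item.links = []
    parsed = parse_item(item.serialize())
    assert parsed is not None
    assert parsed.data == item.data and parsed.inum == item.inum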

def load_item(backend, location):
    """Load the cloud item pointed at by the 4-tuple 'location'.

    The elements of the tuple are (directory, sequence, offset, size)."""

    filename = backend.loc_to_name((location[0], location[1]))
    data = backend.read(filename)[location[2] : location[2] + location[3]]
    item = parse_item(data)
    item.location = location
    return item

def parse_log(data, location=None):
    """Parse contents of a log file, yielding a sequence of log items."""

    if isinstance(location, str):
        m = re.match(r"^log-(\d+)-(\d+)$", location)
        if m:
            location = (int(m.group(1)), int(m.group(2)))
        else:
            location = None

    offset = 0
    while len(data) - offset >= HEADER_SIZE:
        header = struct.unpack_from(HEADER_FORMAT, data, offset)
        size = HEADER_SIZE + sum(header[4:7])
        if header[0] != HEADER_MAGIC:
            print "Bad header magic!"
            break
        if size + offset > len(data):
            print "Short data record at end of log: %s < %s" % (len(data) - offset, size)
            break
        item = parse_item(data[offset : offset + size])
        if item is not None:
            if location is not None:
                item.location = location + (offset, size)
            yield item
        offset += size

def load_checkpoint_record(backend):
    for (log, size) in reversed(backend.list()):
        for item in reversed(list(parse_log(backend.read(log), log))):
            if item.type == ITEM_TYPE.CHECKPOINT:
                return item
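
# The checkpoint record found above has a simple layout: its data is a
# sequence of 16-byte (start, end) inode-number ranges, and links[i] points
# at the inode map block covering the i-th range.  Each inode map block in
# turn packs 8-byte inode numbers into its data, with links[j] giving the
# location of the current version of inode j.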

class InodeMap:
    def __init__(self):
        pass

    def build(self, backend, checkpoint_record):
        """Reconstruct the inode map from the checkpoint record given.

        This will also build up information about segment utilization."""

        self.checkpoint_record = checkpoint_record

        util = UtilizationTracker(backend)
        util.add_item(checkpoint_record)
        inodes = {}
        self.obsolete_segments = set()

        for i in range(len(checkpoint_record.data) // 16):
            (start, end) = struct.unpack_from("<QQ", checkpoint_record.data, 16*i)
            imap = load_item(backend, checkpoint_record.links[i][1])
            util.add_item(imap)
            print "[%d, %d]: %s" % (start, end, imap)
            for j in range(len(imap.data) // 8):
                (inum,) = struct.unpack_from("<Q", imap.data, 8*j)
                inode = load_item(backend, imap.links[j][1])
                inodes[inum] = inode
                data_segments = set()
                util.add_item(inode)
                for l in inode.links:
                    util.add_item(l[1])
                    data_segments.add(l[1][0:2])
                print "  %d: %s (%d data segments)" % (inum, inode, len(data_segments))
325 print "Segment utilizations:"
326 for (s, u) in sorted(util.segments.items()):
327 print "%s: %s %s" % (s, u, float(u[1]) / u[0])
334 self.updated_inodes = set()

    def mark_updated(self, inum):
        self.updated_inodes.add(inum)

    def write(self, backend, log):
        updated_inodes = sorted(self.updated_inodes, reverse=True)

        new_checkpoint = LogItem()
        new_checkpoint.id = LogItem.random_id()
        new_checkpoint.inum = 0
        new_checkpoint.type = ITEM_TYPE.CHECKPOINT
        new_checkpoint.data = ""
        new_checkpoint.links = []

        for i in range(len(self.checkpoint_record.data) // 16):
            (start, end) = struct.unpack_from("<QQ", self.checkpoint_record.data, 16*i)

            new_checkpoint.data += self.checkpoint_record.data[16*i : 16*i + 16]

            # Case 1: No inodes in this range of the old inode map have
            # changed.  Simply emit a new pointer to the same inode map block.
            if len(updated_inodes) == 0 or updated_inodes[-1] > end:
                old_location = self.checkpoint_record.links[i][1][0:2]
                if old_location not in self.obsolete_segments:
                    new_checkpoint.links.append(self.checkpoint_record.links[i])
                    continue

            # Case 2: Some inodes have been updated.  Create a new inode map
            # block, write it out, and point the new checkpoint at it.
            inodes = [k for k in self.inodes if k >= start and k <= end]
            inodes.sort()

            block = LogItem()
            block.id = LogItem.random_id()
            block.inum = 0
            block.type = ITEM_TYPE.INODE_MAP
            block.links = []
            block.data = ""
            for j in inodes:
                block.data += struct.pack("<Q", j)
                block.links.append((self.inodes[j].id, self.inodes[j].location))
            log.write(block, 2)

            new_checkpoint.links.append((block.id, block.location))

            while len(updated_inodes) > 0 and updated_inodes[-1] <= end:
                updated_inodes.pop()

        log.write(new_checkpoint, 2)
        self.checkpoint_record = new_checkpoint

def rewrite_inode(backend, inode_map, inum, log, copy_data=True):
    inode = inode_map.inodes[inum]
    if copy_data:
        blocks = []
        for l in inode.links:
            data = load_item(backend, l[1])
            blocks.append(data)
            log.write(data, 0)
        inode.links = [(b.id, b.location) for b in blocks]
    log.write(inode, 1)
    inode_map.mark_updated(inum)

def run_cleaner(backend, inode_map, log):
    # Determine which segments are poorly utilized and should be cleaned.  We
    # need better heuristics here.
    for (s, u) in sorted(inode_map.util.segments.items()):
        if (float(u[1]) / u[0] < 0.6 or u[1] < 32768) and u[1] > 0:
            print "Should clean segment", s
            loc = backend.name_to_loc(s)
            if loc is not None: inode_map.obsolete_segments.add(loc)
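
    # The heuristic above selects segments that are less than 60% live or
    # that hold under 32 KB of live data, skipping segments with no live
    # data at all (u is the pair [segment size in bytes, live bytes]).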

    # TODO: We probably also want heuristics that will find inodes with
    # badly-fragmented data and rewrite that to achieve better locality.

    # Given that list of segments to clean, scan through those segments to
    # find data which is still live and mark relevant inodes as needing to be
    # rewritten.
    dirty_inodes = set()
    dirty_inode_data = set()
    for s in inode_map.obsolete_segments:
        filename = backend.loc_to_name(s)
        print "Scanning", filename, "for live data"
        for item in parse_log(backend.read(filename), filename):
            if item.type in (ITEM_TYPE.DATA, ITEM_TYPE.INODE):
                if item.inum != 0:
                    inode = inode_map.inodes[item.inum]
                    if s == inode.location[0:2]:
                        dirty_inodes.add(item.inum)
                    if item.inum not in dirty_inode_data:
                        for b in inode.links:
                            if s == b[1][0:2]:
                                dirty_inode_data.add(item.inum)
                                break
430 print "Inodes to rewrite:", dirty_inodes
431 print "Inodes with data to rewrite:", dirty_inode_data
432 for i in sorted(dirty_inodes.union(dirty_inode_data)):
433 rewrite_inode(backend, inode_map, i, log, i in dirty_inode_data)
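
# For local experimentation, the FileBackend can stand in for S3Backend in
# the driver below, e.g. backend = FileBackend("/path/to/logs") followed by
# the same load/build/clean/write sequence.  (Sketch only; the path shown is
# hypothetical.)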

if __name__ == '__main__':
    backend = S3Backend("mvrable-bluesky", cachedir=".")
    chkpt = load_checkpoint_record(backend)
    imap = InodeMap()
    imap.build(backend, chkpt)

    log_dir = LogDirectory(backend, 0)
    run_cleaner(backend, imap, log_dir)
    imap.write(backend, log_dir)
    log_dir.close_all()