3 # A simple cleaner for the BlueSky cloud file system. At the moment this is an
4 # offline cleaner--the file system must not be in use at the time that the
5 # cleaning is performed. Later, it will be extended to be an online/concurrent
6 # cleaner, where cleaning is performed even while the file system is mounted.
8 # Copyright (C) 2010 The Regents of the University of California
9 # Written by Michael Vrable <mvrable@cs.ucsd.edu>
11 import base64, os, re, struct, sys
14 # The BlueSky 'struct cloudlog_header' data type.
# struct format string: little-endian 4-byte magic, signed-byte type
# code, 16-byte item id, 64-bit inode number, then three 32-bit section
# lengths (data, link ids, link locations) -- unpacked as header[0..6]
# by parse_item()/parse_log() below.
15 HEADER_FORMAT = '<4sb16sQIII'
# NOTE(review): HEADER_MAGIC (compared against header[0] later in this
# file) is not visible in this listing -- source numbering jumps 15->17.
17 HEADER_SIZE = struct.calcsize(HEADER_FORMAT)
# NOTE(review): the class header (presumably ``class FileBackend``) is
# not visible in this listing -- numbering jumps from 17 to 26.
26 """An interface to BlueSky where the log segments are on local disk.
28 This is mainly intended for testing purposes, as the real cleaner would
29 operate where data is being stored in S3."""
# Constructor: records where the log segments live.  Its body is not
# visible here (presumably ``self.path = path``; self.path is read in
# the methods below -- TODO confirm).
31 def __init__(self, path):
# list(): return (filename, size) pairs for every log segment on disk.
# NOTE(review): the ``def list(self):`` line itself is not visible in
# this listing (numbering jumps 31->35).
35 """Return a listing of all log segments and their sizes."""
# Only files following the log-XXXXXXXX-XXXXXXXX naming scheme count.
37 files = [f for f in os.listdir(self.path) if f.startswith('log-')]
# NOTE(review): the continuation of this return expression (the
# ``for f in files]`` part, and possibly a sort of ``files``) is not
# visible in this listing (numbering jumps 40->43).
40 return [(f, os.stat(os.path.join(self.path, f)).st_size)
# Read and return the full contents of one log segment file.
# NOTE(review): only the open() is visible; the lines that read the
# data, close fp, and return it are missing from this listing
# (numbering jumps 44->47).
43 def read(self, filename):
44 fp = open(os.path.join(self.path, filename), 'rb')
# Write one complete log segment file under self.path.
# NOTE(review): the lines that write ``data`` into fp and close it are
# not visible in this listing (numbering jumps 48->52).
47 def write(self, filename, data):
48 fp = open(os.path.join(self.path, filename), 'wb')
def loc_to_name(self, location):
    """Map a (directory, sequence) pair to its log segment file name."""
    dir_num, seq_num = location
    return "log-%08d-%08d" % (dir_num, seq_num)
def name_to_loc(self, name):
    """Inverse of loc_to_name: parse a segment file name back into its
    (directory, sequence) tuple, or return None when the name does not
    follow the log-NNNNNNNN-NNNNNNNN scheme."""
    parsed = re.match(r"^log-(\d+)-(\d+)$", name)
    if parsed is None:
        return None
    return (int(parsed.group(1)), int(parsed.group(2)))
# NOTE(review): the class header (presumably ``class LogItem``) is not
# visible in this listing -- numbering jumps from 57 to 60.
60 """In-memory representation of a single item stored in a log file."""
# Debug representation; only the first 8 hex digits of the 16-byte id
# are shown.  The enclosing ``def`` line (presumably __str__) is not
# visible in this listing (numbering jumps 60->63).
63 return "<Item ty=%s location=%s size=%d id=%s...>" % (self.type, self.location, self.size, base64.b16encode(self.id).lower()[0:8])
# Produce a fresh random 16-byte item id.  The enclosing ``def
# random_id`` line (presumably a @staticmethod, given the
# LogItem.random_id() call sites below) is not visible in this listing.
# NOTE(review): the file handle is never closed -- os.urandom(16) would
# be equivalent and leak-free.
67 return open('/dev/urandom').read(16)
# Serialize this item into the on-disk log record layout: header, then
# data payload, then link ids, then link locations.
# NOTE(review): the ``def serialize(self)`` line and the initialization
# of link_ids/link_locs as empty lists are not visible in this listing
# (numbering jumps 67->72).
72 for (i, l) in self.links:
# NOTE(review): the lines appending ``i`` to link_ids (and presumably a
# null-location branch) are not visible here (numbering jumps 72->75).
75 link_locs.append(struct.pack('<IIII', *l))
76 link_ids = ''.join(link_ids)
77 link_locs = ''.join(link_locs)
# Header fields must match HEADER_FORMAT: magic, type byte, id, inode
# number, then the three section lengths.
79 header = struct.pack(HEADER_FORMAT,
80 HEADER_MAGIC, ord(self.type), self.id, self.inum,
81 len(self.data), len(link_ids), len(link_locs))
82 return header + self.data + link_ids + link_locs
# NOTE(review): the class header (presumably ``class LogSegment``) is
# not visible in this listing.
# Constructor: remember the backend and this segment's (dir, seq)
# location tuple.
85 def __init__(self, backend, location):
86 self.backend = backend
87 self.location = location
# NOTE(review): initialization of ``self.data`` (the list of serialized
# records used below and in write()) is not visible here (87->91).
# __len__: total bytes buffered so far; the ``def __len__(self)`` line
# itself is not visible in this listing.
91 return sum(len(s) for s in self.data)
# Buffer one serialized item into this segment and stamp the item with
# its new 4-tuple location (dir, seq, offset, size).
93 def write(self, item):
94 data = item.serialize()
# NOTE(review): the assignment of ``offset`` (presumably the current
# segment length, i.e. len(self), before the append) is not visible in
# this listing (numbering jumps 94->96).
96 self.data.append(data)
97 item.location = self.location + (offset, len(data))
# Flush all buffered records out through the backend as one segment
# file.  The enclosing ``def close(self)`` line is not visible in this
# listing (numbering jumps 97->100).
100 data = ''.join(self.data)
101 filename = self.backend.loc_to_name(self.location)
# Python 2 print statement; despite the wording the write below is
# actually performed.
102 print "Would write %d bytes of data to %s" % (len(data), filename)
103 self.backend.write(filename, data)
# NOTE(review): the class header (presumably ``class LogDirectory``) is
# not visible in this listing.
# Segments are rotated once they reach 4 MiB (see write() below).
106 TARGET_SIZE = 4 << 20
108 def __init__(self, backend, dir):
109 self.backend = backend
# NOTE(review): initialization of self.dir_num, self.seq_num and
# self.groups (all used elsewhere in this class) is not visible in this
# listing (numbering jumps 109->112).
# Scan existing segments in this directory so newly written segments
# continue after the highest sequence number already on disk.
112 for logname in backend.list():
113 loc = backend.name_to_loc(logname[0])
114 if loc is not None and loc[0] == dir:
115 self.seq_num = max(self.seq_num, loc[1] + 1)
117 print "Starting sequence number is", self.seq_num
# Start a fresh LogSegment at the next sequence number in this
# directory.
# NOTE(review): the lines that advance self.seq_num and return the new
# segment are not visible in this listing (numbering jumps 120->124).
119 def open_segment(self):
120 seg = LogSegment(self.backend, (self.dir_num, self.seq_num))
# Write ``item`` into the open segment for ``segment_group``, opening a
# segment for the group on demand and rotating it once it reaches
# TARGET_SIZE.
124 def write(self, item, segment_group=0):
125 if segment_group not in self.groups:
126 self.groups[segment_group] = self.open_segment()
127 seg = self.groups[segment_group]
# NOTE(review): the call that actually writes the item into ``seg`` is
# not visible here (127->129), nor is the seg.close() presumably made
# before the del (129->131).
129 if len(seg) >= LogDirectory.TARGET_SIZE:
131 del self.groups[segment_group]
# Flush every open segment.  The enclosing ``def close(self)`` line is
# not visible in this listing (numbering jumps 131->134); iterating a
# snapshot of the keys allows entries to be removed while closing.
134 for k in list(self.groups.keys()):
135 self.groups[k].close()
138 class UtilizationTracker:
139 """A simple object that tracks what fraction of each segment is used.
141 This data can be used to guide segment cleaning decisions."""
# self.segments maps segment filename -> [total_bytes, live_bytes].
# NOTE(review): the ``self.segments = {}`` initialization is not
# visible in this listing (numbering jumps 143->145).
143 def __init__(self, backend):
145 for (segment, size) in backend.list():
146 self.segments[segment] = [size, 0]
# Credit an item's size to the segment that stores it; accepts either a
# LogItem or a raw (dir, seq, offset, size) location tuple.
148 def add_item(self, item):
# NOTE(review): the body of this branch is not visible (149->151);
# presumably it replaces ``item`` with ``item.location``, since the
# code below unpacks a 4-tuple -- TODO confirm.
149 if isinstance(item, LogItem):
151 if item is None: return
152 (dir, seq, offset, size) = item
153 filename = "log-%08d-%08d" % (dir, seq)
154 self.segments[filename][1] += size
# Decode a single serialized log record back into a LogItem; falls off
# the end (returning None) on truncated input.
156 def parse_item(data):
157 if len(data) < HEADER_SIZE: return
158 header = struct.unpack_from(HEADER_FORMAT, data, 0)
# Total record size = fixed header + the three variable-length sections
# (data, link ids, link locations) whose lengths sit in header[4:7].
159 size = HEADER_SIZE + sum(header[4:7])
# NOTE(review): the ``return`` after each of the two error prints is
# not visible in this listing (162->165, 166->171).
161 if header[0] != HEADER_MAGIC:
162 print "Bad header magic!"
165 if len(data) != size:
166 print "Item size does not match!"
# NOTE(review): construction of ``item`` (LogItem(), item.id, item.size,
# item.location, and the ``links = []`` initialization) is not visible
# in this listing -- only the assignments below survived.
171 item.inum = header[3]
173 item.type = chr(header[1])
175 item.data = data[HEADER_SIZE : HEADER_SIZE + header[4]]
# The two link sections follow the data payload back to back.
177 link_ids = data[HEADER_SIZE + header[4]
178 : HEADER_SIZE + header[4] + header[5]]
179 link_locs = data[HEADER_SIZE + header[4] + header[5]
180 : HEADER_SIZE + sum(header[4:7])]
# One 16-byte id per link; each consumes one 16-byte location record
# from link_locs.
# NOTE(review): the branch between lines 182 and 186 (presumably a
# null-id case that skips consuming a location) is not visible here.
181 for i in range(len(link_ids) // 16):
182 id = link_ids[16*i : 16*i + 16]
186 loc = struct.unpack('<IIII', link_locs[0:16])
187 link_locs = link_locs[16:]
188 links.append((id, loc))
# NOTE(review): the final ``item.links = links`` / ``return item``
# lines are not visible in this listing (numbering jumps 188->192).
def load_item(backend, location):
    """Load the cloud item pointed at by the 4-tuple 'location'.

    The elements of the tuple are (directory, sequence, offset, size).
    Returns the parsed LogItem with its ``location`` attribute set."""
    filename = backend.loc_to_name((location[0], location[1]))
    # Slice out only this record's byte range from the segment.
    data = backend.read(filename)[location[2] : location[2] + location[3]]
    item = parse_item(data)
    item.location = location
    # BUG FIX: the item was built but never returned, yet every caller
    # (e.g. InodeMap.build) assigns the result of load_item().
    return item
# Generator that walks a whole segment buffer, yielding each decoded
# LogItem in order.
203 def parse_log(data, location=None):
204 """Parse contents of a log file, yielding a sequence of log items."""
# Accept a segment filename as ``location`` and convert it to the
# (directory, sequence) pair.
# NOTE(review): the guard using ``m`` after the match, and the
# ``offset = 0`` initialization, are not visible in this listing
# (numbering jumps 207->209, 209->214).
206 if isinstance(location, str):
207 m = re.match(r"^log-(\d+)-(\d+)$", location)
209 location = (int(m.group(1)), int(m.group(2)))
# Walk the buffer record by record while a full header remains.
214 while len(data) - offset >= HEADER_SIZE:
215 header = struct.unpack_from(HEADER_FORMAT, data, offset)
216 size = HEADER_SIZE + sum(header[4:7])
# NOTE(review): the ``return``/``break`` after each error print and the
# ``offset += size`` loop advance are not visible in this listing
# (218->220, 221->223, 226->229); as shown the loop would never
# terminate.
217 if header[0] != HEADER_MAGIC:
218 print "Bad header magic!"
220 if size + offset > len(data):
221 print "Short data record at end of log: %s < %s" % (len(data) - offset, size)
223 item = parse_item(data[offset : offset + size])
# Stamp each parsed item with its full 4-tuple location.
# NOTE(review): as shown, the location assignment dereferences ``item``
# before the None check on the next line -- presumably an enclosing
# ``if item is not None`` guard is among the missing lines.
224 if location is not None:
225 item.location = location + (offset, size)
226 if item is not None: yield item
def load_checkpoint_record(backend):
    """Return the most recent checkpoint item found in the cloud log.

    Segments, and the items within each segment, are scanned in
    reverse order so the first checkpoint encountered is the newest
    (assumes backend.list() returns segments oldest-first -- TODO
    confirm).  Returns None when no checkpoint record exists."""
    for (log, size) in reversed(backend.list()):
        for item in reversed(list(parse_log(backend.read(log), log))):
            if item.type == ITEM_TYPE.CHECKPOINT:
                # BUG FIX: the matching checkpoint was found but never
                # returned; the caller assigns this function's result.
                return item
# Rebuild the in-memory inode map by chasing the checkpoint record's
# links: checkpoint -> inode-map blocks -> individual inodes.
# NOTE(review): this is a method of a class whose header (presumably
# ``class InodeMap``) is not visible in this listing.
239 def build(self, backend, checkpoint_record):
240 """Reconstruct the inode map from the checkpoint record given.
242 This will also build up information about segment utilization."""
244 self.checkpoint_record = checkpoint_record
# NOTE(review): several lines are missing from this listing (gaps in
# the numbering): initialization of ``self.inodes``, the
# util.add_item()/self.inodes[inum] bookkeeping inside the loops, and
# presumably a final ``self.util = util`` (run_cleaner reads
# inode_map.util) -- TODO confirm.
246 util = UtilizationTracker(backend)
247 util.add_item(checkpoint_record)
249 self.obsolete_segments = set()
# The checkpoint data is a sequence of 16-byte (start, end) inode
# ranges, one per inode-map block linked from the checkpoint.
252 for i in range(len(checkpoint_record.data) // 16):
253 (start, end) = struct.unpack_from("<QQ", checkpoint_record.data, 16*i)
254 imap = load_item(backend, checkpoint_record.links[i][1])
256 print "[%d, %d]: %s" % (start, end, imap)
# Each inode-map block lists 8-byte inode numbers, one per link.
257 for j in range(len(imap.data) // 8):
258 (inum,) = struct.unpack_from("<Q", imap.data, 8*j)
259 inode = load_item(backend, imap.links[j][1])
# Count the distinct (dir, seq) segments holding this inode's data.
261 data_segments = set()
# NOTE(review): reusing ``i`` here shadows the outer loop index;
# harmless since the outer ``for`` rebinds it each iteration, but worth
# renaming for clarity.
263 for i in inode.links:
265 data_segments.add(i[1][0:2])
266 print " %d: %s (%d data segments)" % (inum, inode, len(data_segments))
269 print "Segment utilizations:"
270 for (s, u) in sorted(util.segments.items()):
272 print "%s: %s %s" % (s, u, float(u[1]) / u[0])
# No inodes have pending updates right after a rebuild.
276 self.updated_inodes = set()
def mark_updated(self, inum):
    """Record that inode *inum* changed; InodeMap.write consults this
    set when deciding which inode-map ranges must be rewritten."""
    self.updated_inodes |= {inum}
# Emit a new checkpoint record (and refreshed inode-map blocks) that
# reflects every inode flagged via mark_updated().
281 def write(self, backend, log):
# Sorted descending so the smallest pending inode number sits at the
# tail and can be tested/consumed cheaply.
282 updated_inodes = sorted(self.updated_inodes, reverse=True)
284 new_checkpoint = LogItem()
285 new_checkpoint.id = LogItem.random_id()
286 new_checkpoint.inum = 0
287 new_checkpoint.type = ITEM_TYPE.CHECKPOINT
288 new_checkpoint.data = ""
289 new_checkpoint.links = []
# Walk the old checkpoint's (start, end) inode ranges, copying each
# 16-byte range descriptor into the new checkpoint's data.
291 for i in range(len(self.checkpoint_record.data) // 16):
292 (start, end) = struct.unpack_from("<QQ", self.checkpoint_record.data, 16*i)
294 new_checkpoint.data += self.checkpoint_record.data[16*i : 16*i + 16]
296 # Case 1: No inodes in this range of the old inode map have
297 # changed. Simply emit a new pointer to the same inode map block.
# NOTE(review): the handling when the old block lives in an obsolete
# segment, and the ``continue`` that should end case 1, are not visible
# in this listing (numbering jumps 301->304).
298 if len(updated_inodes) == 0 or updated_inodes[-1] > end:
299 old_location = self.checkpoint_record.links[i][1][0:2]
300 if old_location not in self.obsolete_segments:
301 new_checkpoint.links.append(self.checkpoint_record.links[i])
304 # Case 2: Some inodes have been updated. Create a new inode map
305 # block, write it out, and point the new checkpoint at it.
306 inodes = [k for k in self.inodes if k >= start and k <= end]
# NOTE(review): creation of ``block`` (a LogItem), its inum/data/links
# initialization, the ``for j in inodes:`` loop header, and the
# log.write(block, ...) call are not visible in this listing (gaps at
# 307-309, 311, 313-315, 318-319).
310 block.id = LogItem.random_id()
312 block.type = ITEM_TYPE.INODE_MAP
316 block.data += struct.pack("<Q", j)
317 block.links.append((self.inodes[j].id, self.inodes[j].location))
320 new_checkpoint.links.append((block.id, block.location))
# Consume the pending inode numbers covered by this range.
# NOTE(review): the loop body (presumably updated_inodes.pop()) is not
# visible in this listing (numbering jumps 322->325).
322 while len(updated_inodes) > 0 and updated_inodes[-1] <= end:
# Segment group 2 keeps checkpoint/inode-map metadata apart from file
# data (assumption -- TODO confirm against the mounted file system).
325 log.write(new_checkpoint, 2)
326 self.checkpoint_record = new_checkpoint
# Re-log one inode (and, when copy_data is set, its data blocks) so it
# no longer references segments being cleaned.
328 def rewrite_inode(backend, inode_map, inum, log, copy_data=True):
329 inode = inode_map.inodes[inum]
# NOTE(review): several lines are missing from this listing: the
# initialization of ``blocks``, the guard on copy_data, and the
# log.write() calls that actually re-log each data block and the inode
# itself (numbering jumps 329->332, 333->336, 336->338).
332 for l in inode.links:
333 data = load_item(backend, l[1])
# Repoint the inode at the freshly written copies of its blocks.
336 inode.links = [(b.id, b.location) for b in blocks]
338 inode_map.mark_updated(inum)
# Top-level cleaning pass: pick under-utilized segments, find the live
# data still inside them, and rewrite the affected inodes elsewhere.
340 def run_cleaner(backend, inode_map, log):
341 # Determine which segments are poorly utilized and should be cleaned. We
342 # need better heuristics here.
343 for (s, u) in sorted(inode_map.util.segments.items()):
# u is [total_bytes, live_bytes]: clean anything under 95% utilized
# that still holds some live data.
344 if float(u[1]) / u[0] < 0.95 and u[1] > 0:
345 print "Should clean segment", s
346 loc = backend.name_to_loc(s)
# NOTE(review): ``if s:`` is always true here (s is a non-empty
# filename string); the guard was presumably meant to be
# ``if loc is not None:`` since name_to_loc returns None for
# unparsable names -- as written a None can enter obsolete_segments.
347 if s: inode_map.obsolete_segments.add(loc)
349 # TODO: We probably also want heuristics that will find inodes with
350 # badly-fragmented data and rewrite that to achieve better locality.
352 # Given that list of segments to clean, scan through those segments to find
353 # data which is still live and mark relevant inodes as needing to be
# NOTE(review): the ``dirty_inodes = set()`` initialization (used
# below) is not visible in this listing (numbering jumps 353->356).
356 dirty_inode_data = set()
357 for s in inode_map.obsolete_segments:
358 filename = backend.loc_to_name(s)
359 print "Scanning", filename, "for live data"
360 for item in parse_log(backend.read(filename), filename):
361 if item.type in (ITEM_TYPE.DATA, ITEM_TYPE.INODE):
# NOTE(review): a guard between lines 361 and 363 (presumably checking
# that item.inum is present in the inode map) is not visible here.
363 inode = inode_map.inodes[item.inum]
# The inode record itself lives in this segment: rewrite the inode.
364 if s == inode.location[0:2]:
365 dirty_inodes.add(item.inum)
# Any data block of this inode stored here forces a data rewrite too.
# NOTE(review): the condition inside the ``for b`` loop that tests
# whether block ``b`` resides in segment ``s`` is not visible in this
# listing (numbering jumps 367->369).
366 if item.inum not in dirty_inode_data:
367 for b in inode.links:
369 dirty_inode_data.add(item.inum)
372 print "Inodes to rewrite:", dirty_inodes
373 print "Inodes with data to rewrite:", dirty_inode_data
374 for i in sorted(dirty_inodes.union(dirty_inode_data)):
# copy_data is True only when the inode's data (not just its inode
# record) lives in a segment being cleaned.
375 rewrite_inode(backend, inode_map, i, log, i in dirty_inode_data)
# Offline cleaning entry point: operates on the log segments in the
# current working directory.
377 if __name__ == '__main__':
378 backend = FileBackend(".")
379 chkpt = load_checkpoint_record(backend)
# NOTE(review): the construction of ``imap`` (presumably InodeMap())
# is not visible in this listing (numbering jumps 379->381).
381 imap.build(backend, chkpt)
# Cleaned data goes into log directory 1, kept separate from the file
# system's own segments (assumption -- TODO confirm).
384 log_dir = LogDirectory(backend, 1)
385 run_cleaner(backend, imap, log_dir)
386 imap.write(backend, log_dir)