# A simple cleaner for the BlueSky cloud file system.  At the moment this is an
# offline cleaner--the file system must not be in use at the time that the
# cleaning is performed.  Later, it will be extended to be an online/concurrent
# cleaner, where cleaning is performed even while the file system is mounted.

# Copyright (C) 2010  The Regents of the University of California
# Written by Michael Vrable <mvrable@cs.ucsd.edu>

import base64, os, re, struct, sys
# The BlueSky 'struct cloudlog_header' data type.
HEADER_FORMAT = '<4sb16sQIII'
HEADER_MAGIC = 'AgI-'           # Must match the magic written by BlueSky itself
HEADER_SIZE = struct.calcsize(HEADER_FORMAT)
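# For orientation, a header unpacks into seven fields; a sketch mirroring what
# parse_item() does below, assuming 'raw' holds the bytes of one log item:
#
#     magic, ty, id, inum, dsize, isize, lsize = \
#         struct.unpack_from(HEADER_FORMAT, raw, 0)
#     total_size = HEADER_SIZE + dsize + isize + lsize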
26 """An interface to BlueSky where the log segments are on local disk.
28 This is mainly intended for testing purposes, as the real cleaner would
29 operate where data is being stored in S3."""
    def __init__(self, path):
        self.path = path

    def list(self):
        """Return a listing of all log segments and their sizes."""

        files = [f for f in os.listdir(self.path) if f.startswith('log-')]
        files.sort()

        return [(f, os.stat(os.path.join(self.path, f)).st_size)
                for f in files]
    def read(self, filename):
        fp = open(os.path.join(self.path, filename), 'rb')
        return fp.read()

    def write(self, filename, data):
        fp = open(os.path.join(self.path, filename), 'wb')
        fp.write(data)
        fp.close()

    def delete(self, filename):
        os.unlink(os.path.join(self.path, filename))
    def loc_to_name(self, location):
        return "log-%08d-%08d" % location

    def name_to_loc(self, name):
        m = re.match(r"^log-(\d+)-(\d+)$", name)
        if m: return (int(m.group(1)), int(m.group(2)))
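# Example round trip between segment names and (directory, sequence)
# locations (a sketch; the numbers are arbitrary):
#
#     backend = FileBackend('.')
#     backend.loc_to_name((0, 3))                   # -> 'log-00000000-00000003'
#     backend.name_to_loc('log-00000000-00000003')  # -> (0, 3)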
63 """In-memory representation of a single item stored in a log file."""
66 return "<Item ty=%s location=%s size=%d id=%s...>" % (self.type, self.location, self.size, base64.b16encode(self.id).lower()[0:8])
70 return open('/dev/urandom').read(16)
    def serialize(self):
        link_ids = []
        link_locs = []
        for (i, l) in self.links:
            link_ids.append(i)
            if i != '\0' * 16:
                link_locs.append(struct.pack('<IIII', *l))
        link_ids = ''.join(link_ids)
        link_locs = ''.join(link_locs)

        # Layout: header, item data, packed link ids, then packed link
        # locations; parse_item() below reverses this.
        header = struct.pack(HEADER_FORMAT,
                             HEADER_MAGIC, ord(self.type), self.id, self.inum,
                             len(self.data), len(link_ids), len(link_locs))
        return header + self.data + link_ids + link_locs
class LogSegment:
    def __init__(self, backend, location):
        self.backend = backend
        self.location = location
        self.data = []

    def __len__(self):
        return sum(len(s) for s in self.data)
    def write(self, item):
        data = item.serialize()
        offset = len(self)
        self.data.append(data)
        item.location = self.location + (offset, len(data))
    def close(self):
        data = ''.join(self.data)
        filename = self.backend.loc_to_name(self.location)
        print "Writing %d bytes of data to %s" % (len(data), filename)
        self.backend.write(filename, data)
class LogDirectory:
    TARGET_SIZE = 4 << 20       # Target size of 4 MB for each log segment

    def __init__(self, backend, dir):
        self.backend = backend
        self.dir_num = dir
        self.seq_num = 0
        for logname in backend.list():
            loc = backend.name_to_loc(logname[0])
            if loc is not None and loc[0] == dir:
                self.seq_num = max(self.seq_num, loc[1] + 1)
        self.groups = {}
        print "Starting sequence number is", self.seq_num
    def open_segment(self):
        seg = LogSegment(self.backend, (self.dir_num, self.seq_num))
        self.seq_num += 1
        return seg
    def write(self, item, segment_group=0):
        if segment_group not in self.groups:
            self.groups[segment_group] = self.open_segment()
        seg = self.groups[segment_group]
        seg.write(item)
        if len(seg) >= LogDirectory.TARGET_SIZE:
            seg.close()
            del self.groups[segment_group]

    def close_all(self):
        for k in list(self.groups.keys()):
            self.groups[k].close()
            del self.groups[k]
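# Typical use of LogDirectory (a sketch, mirroring the driver code at the
# bottom of this file): route items into per-group segments, then flush any
# partially-filled segments.
#
#     log_dir = LogDirectory(backend, 0)
#     log_dir.write(item, segment_group=0)
#     log_dir.close_all()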
class UtilizationTracker:
    """A simple object that tracks what fraction of each segment is used.

    This data can be used to guide segment cleaning decisions."""

    def __init__(self, backend):
        self.segments = {}
        for (segment, size) in backend.list():
            # Map segment name to [total bytes, bytes still referenced].
            self.segments[segment] = [size, 0]
    def add_item(self, item):
        if isinstance(item, LogItem):
            item = item.location
        if item is None: return
        (dir, seq, offset, size) = item
        filename = "log-%08d-%08d" % (dir, seq)
        self.segments[filename][1] += size
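# Sketch of how the tracker is used by run_cleaner() below: charge each live
# item against its segment, then compare bytes referenced with segment size.
#
#     util = UtilizationTracker(backend)
#     util.add_item(item)
#     for (seg, (size, used)) in sorted(util.segments.items()):
#         print seg, float(used) / size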
def parse_item(data):
    if len(data) < HEADER_SIZE: return
    header = struct.unpack_from(HEADER_FORMAT, data, 0)
    size = HEADER_SIZE + sum(header[4:7])

    if header[0] != HEADER_MAGIC:
        print "Bad header magic!"
        return

    if len(data) != size:
        print "Item size does not match: %d != %d" % (size, len(data))
        return

    item = LogItem()
    item.id = header[2]
    item.inum = header[3]
    item.location = None
    item.type = chr(header[1])
    item.size = size
    item.data = data[HEADER_SIZE : HEADER_SIZE + header[4]]
    links = []
    link_ids = data[HEADER_SIZE + header[4]
                    : HEADER_SIZE + header[4] + header[5]]
    link_locs = data[HEADER_SIZE + header[4] + header[5]
                     : HEADER_SIZE + sum(header[4:7])]
    for i in range(len(link_ids) // 16):
        id = link_ids[16*i : 16*i + 16]
        if id == '\0' * 16:
            # An all-zero id marks a link with no stored location.
            id = None
            loc = None
        else:
            loc = struct.unpack('<IIII', link_locs[0:16])
            link_locs = link_locs[16:]
        links.append((id, loc))
    item.links = links
    return item
def load_item(backend, location):
    """Load the cloud item pointed at by the 4-tuple 'location'.

    The elements of the tuple are (directory, sequence, offset, size)."""

    filename = backend.loc_to_name((location[0], location[1]))
    data = backend.read(filename)[location[2] : location[2] + location[3]]
    item = parse_item(data)
    item.location = location
    return item
def parse_log(data, location=None):
    """Parse contents of a log file, yielding a sequence of log items."""

    # The location may be given as a log file name; convert it to a
    # (directory, sequence) tuple.
    if isinstance(location, str):
        m = re.match(r"^log-(\d+)-(\d+)$", location)
        if m:
            location = (int(m.group(1)), int(m.group(2)))
        else:
            location = None

    offset = 0
    while len(data) - offset >= HEADER_SIZE:
        header = struct.unpack_from(HEADER_FORMAT, data, offset)
        size = HEADER_SIZE + sum(header[4:7])
        if header[0] != HEADER_MAGIC:
            print "Bad header magic!"
            break
        if size + offset > len(data):
            print "Short data record at end of log: %d < %d" % (len(data) - offset, size)
            break
        item = parse_item(data[offset : offset + size])
        if item is not None:
            if location is not None:
                item.location = location + (offset, size)
            yield item
        offset += size
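# Example (a sketch; the segment name is arbitrary): scan one log segment and
# print every item it contains.
#
#     name = 'log-00000000-00000000'
#     for item in parse_log(backend.read(name), name):
#         print item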
def load_checkpoint_record(backend):
    """Return the most recent checkpoint record found in the logs."""
    for (log, size) in reversed(backend.list()):
        for item in reversed(list(parse_log(backend.read(log), log))):
            if item.type == ITEM_TYPE.CHECKPOINT:
                return item
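# The double reversed() above scans segments newest-first, and items within
# each segment last-first, so the first checkpoint encountered is the most
# recent one written.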
class InodeMap:
    def __init__(self):
        pass

    def build(self, backend, checkpoint_record):
        """Reconstruct the inode map from the checkpoint record given.

        This will also build up information about segment utilization."""

        self.checkpoint_record = checkpoint_record

        util = UtilizationTracker(backend)
        util.add_item(checkpoint_record)

        self.inodes = {}
        self.obsolete_segments = set()
        for i in range(len(checkpoint_record.data) // 16):
            (start, end) = struct.unpack_from("<QQ", checkpoint_record.data, 16*i)
            imap = load_item(backend, checkpoint_record.links[i][1])
            util.add_item(imap)
            print "[%d, %d]: %s" % (start, end, imap)
            for j in range(len(imap.data) // 8):
                (inum,) = struct.unpack_from("<Q", imap.data, 8*j)
                inode = load_item(backend, imap.links[j][1])
                util.add_item(inode)
                data_segments = set()
                self.inodes[inum] = inode
                for l in inode.links:
                    if l[1] is not None:
                        data_segments.add(l[1][0:2])
                print "  %d: %s (%d data segments)" % (inum, inode, len(data_segments))
272 print "Segment utilizations:"
273 for (s, u) in sorted(util.segments.items()):
274 print "%s: %s %s" % (s, u, float(u[1]) / u[0])
281 self.updated_inodes = set()
    def mark_updated(self, inum):
        self.updated_inodes.add(inum)
    def write(self, backend, log):
        updated_inodes = sorted(self.updated_inodes, reverse=True)

        new_checkpoint = LogItem()
        new_checkpoint.id = LogItem.random_id()
        new_checkpoint.inum = 0
        new_checkpoint.type = ITEM_TYPE.CHECKPOINT
        new_checkpoint.data = ""
        new_checkpoint.links = []
        for i in range(len(self.checkpoint_record.data) // 16):
            (start, end) = struct.unpack_from("<QQ", self.checkpoint_record.data, 16*i)

            new_checkpoint.data += self.checkpoint_record.data[16*i : 16*i + 16]

            # Case 1: No inodes in this range of the old inode map have
            # changed.  Simply emit a new pointer to the same inode map block,
            # unless that block lives in a segment scheduled for cleaning.
            if len(updated_inodes) == 0 or updated_inodes[-1] > end:
                old_location = self.checkpoint_record.links[i][1][0:2]
                if old_location not in self.obsolete_segments:
                    new_checkpoint.links.append(self.checkpoint_record.links[i])
                    continue
            # Case 2: Some inodes have been updated (or the old inode map
            # block must be relocated).  Create a new inode map block, write
            # it out, and point the new checkpoint at it.
            inodes = [k for k in self.inodes if k >= start and k <= end]

            block = LogItem()
            block.id = LogItem.random_id()
            block.inum = 0
            block.type = ITEM_TYPE.INODE_MAP
            block.data = ""
            block.links = []
            for j in inodes:
                block.data += struct.pack("<Q", j)
                block.links.append((self.inodes[j].id, self.inodes[j].location))
            log.write(block, 2)

            new_checkpoint.links.append((block.id, block.location))

            # Drop the inode numbers covered by this range from the pending
            # list.
            while len(updated_inodes) > 0 and updated_inodes[-1] <= end:
                updated_inodes.pop()
        log.write(new_checkpoint, 2)
        self.checkpoint_record = new_checkpoint
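# Layout assumed by build() and write() above: a checkpoint's data is a
# sequence of 16-byte <QQ> (start, end) inode-number ranges, with links[i]
# pointing at the inode map block covering range i; an inode map block's data
# is a sequence of 8-byte <Q> inode numbers, with links[j] pointing at the
# inode item itself.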
def rewrite_inode(backend, inode_map, inum, log, copy_data=True):
    inode = inode_map.inodes[inum]
    if copy_data:
        blocks = []
        for l in inode.links:
            data = load_item(backend, l[1])
            blocks.append(data)
            log.write(data, 0)
        inode.links = [(b.id, b.location) for b in blocks]
    log.write(inode, 1)
    inode_map.mark_updated(inum)
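# Note on segment groups: rewritten data blocks go to group 0, inodes to group
# 1, and inode map blocks and checkpoints to group 2 (see InodeMap.write),
# presumably so items with similar roles and lifetimes end up in separate
# output segments.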
def run_cleaner(backend, inode_map, log):
    # Determine which segments are poorly utilized and should be cleaned.  We
    # need better heuristics here.
    for (s, u) in sorted(inode_map.util.segments.items()):
        if float(u[1]) / u[0] < 0.6 and u[1] > 0:
            print "Should clean segment", s
            loc = backend.name_to_loc(s)
            if loc is not None: inode_map.obsolete_segments.add(loc)
    # TODO: We probably also want heuristics that will find inodes with
    # badly-fragmented data and rewrite that to achieve better locality.
    # Given that list of segments to clean, scan through those segments to
    # find data which is still live and mark the relevant inodes as needing
    # to be rewritten.
    dirty_inodes = set()
    dirty_inode_data = set()
    for s in inode_map.obsolete_segments:
        filename = backend.loc_to_name(s)
        print "Scanning", filename, "for live data"
        for item in parse_log(backend.read(filename), filename):
            if item.type in (ITEM_TYPE.DATA, ITEM_TYPE.INODE):
                if item.inum != 0:
                    inode = inode_map.inodes[item.inum]
                    if s == inode.location[0:2]:
                        dirty_inodes.add(item.inum)
                    if item.inum not in dirty_inode_data:
                        for b in inode.links:
                            if b[1] is not None and s == b[1][0:2]:
                                dirty_inode_data.add(item.inum)
                                break
377 print "Inodes to rewrite:", dirty_inodes
378 print "Inodes with data to rewrite:", dirty_inode_data
379 for i in sorted(dirty_inodes.union(dirty_inode_data)):
380 rewrite_inode(backend, inode_map, i, log, i in dirty_inode_data)
if __name__ == '__main__':
    backend = FileBackend(".")
    chkpt = load_checkpoint_record(backend)
    imap = InodeMap()
    imap.build(backend, chkpt)

    log_dir = LogDirectory(backend, 0)
    run_cleaner(backend, imap, log_dir)
    imap.write(backend, log_dir)
    log_dir.close_all()