# A simple cleaner for the BlueSky cloud file system. At the moment this is an
# offline cleaner--the file system must not be in use at the time that the
# cleaning is performed. Later, it will be extended to be an online/concurrent
# cleaner, where cleaning is performed even while the file system is mounted.
#
# Copyright (C) 2010 The Regents of the University of California
# Written by Michael Vrable <mvrable@cs.ucsd.edu>

import base64, os, re, struct, sys

# The BlueSky 'struct cloudlog_header' data type.
HEADER_FORMAT = '<4sb16sQIII'
HEADER_MAGIC = 'AgI-'
HEADER_SIZE = struct.calcsize(HEADER_FORMAT)

# Type codes stored in the header 'type' field, used throughout the cleaner.
class ITEM_TYPE:
    DATA = '1'
    INODE = '2'
    INODE_MAP = '3'
    CHECKPOINT = '4'

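# For reference, a sketch of how HEADER_FORMAT lays out on disk (the field
# names here are descriptive only; parse_item below accesses fields by index):
#
#   offset  0   4s   magic (HEADER_MAGIC)
#   offset  4   b    item type
#   offset  5   16s  unique item id
#   offset 21   Q    inode number
#   offset 29   III  sizes of the data, link-id, and link-location regions
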
26 """An interface to BlueSky where the log segments are on local disk.
28 This is mainly intended for testing purposes, as the real cleaner would
29 operate where data is being stored in S3."""
31 def __init__(self, path):
35 """Return a listing of all log segments and their sizes."""
37 files = [f for f in os.listdir(self.path) if f.startswith('log-')]
40 return [(f, os.stat(os.path.join(self.path, f)).st_size)
43 def read(self, filename):
44 fp = open(os.path.join(self.path, filename), 'rb')
47 def write(self, filename, data):
48 fp = open(os.path.join(self.path, filename), 'wb')
53 """In-memory representation of a single item stored in a log file."""
56 return "<Item ty=%s location=%s size=%d id=%s...>" % (self.type, self.location, self.size, base64.b16encode(self.id).lower()[0:8])
60 return open('/dev/urandom').read(16)
65 for (i, l) in self.links:
68 link_locs.append(struct.pack('<IIII', *l))
69 link_ids = ''.join(link_ids)
70 link_locs = ''.join(link_locs)
72 header = struct.pack(HEADER_FORMAT,
73 HEADER_MAGIC, ord(self.type), self.id, self.inum,
74 len(self.data), len(link_ids), len(link_locs))
75 return header + self.data + link_ids + link_locs
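# An item as serialized above is laid out on disk as:
#
#   header (HEADER_SIZE bytes) || data || link ids (16 bytes each)
#                              || link locations (16 bytes each)
#
# parse_item below relies on this layout, including the rule that null link
# ids carry no packed location.
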
class LogSegment:
    def __init__(self, backend, location):
        self.backend = backend
        self.location = location
        self.data = []

    def __len__(self):
        return sum(len(s) for s in self.data)

    def write(self, item):
        data = item.serialize()
        offset = len(self)
        self.data.append(data)
        item.location = self.location + (offset, len(data))

    def close(self):
        data = ''.join(self.data)
        filename = "log-%08d-%08d" % self.location
        print "Would write %d bytes of data to %s" % (len(data), filename)
        self.backend.write(filename, data)

class LogDirectory:
    TARGET_SIZE = 4 << 20       # Close a segment once it reaches 4 MB

    def __init__(self, backend, dir, seq):
        self.backend = backend
        self.dir_num = dir
        self.seq_num = seq
        self.groups = {}

    def open_segment(self):
        seg = LogSegment(self.backend, (self.dir_num, self.seq_num))
        self.seq_num += 1
        return seg

    def write(self, item, segment_group=0):
        if segment_group not in self.groups:
            self.groups[segment_group] = self.open_segment()
        seg = self.groups[segment_group]
        seg.write(item)
        if len(seg) >= LogDirectory.TARGET_SIZE:
            seg.close()
            del self.groups[segment_group]

    def close_all(self):
        for k in list(self.groups.keys()):
            self.groups[k].close()

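# A note on segment groups: LogDirectory.write sorts items into separate
# output segments by group number. As used by this cleaner, group 0 holds
# data blocks, group 1 inodes, and group 2 inode map blocks and checkpoints
# (see rewrite_inode and InodeMap.write below).
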
class UtilizationTracker:
    """A simple object that tracks what fraction of each segment is used.

    This data can be used to guide segment cleaning decisions."""

    def __init__(self, backend):
        self.segments = {}
        for (segment, size) in backend.list():
            self.segments[segment] = [size, 0]

    def add_item(self, item):
        if isinstance(item, LogItem):
            item = item.location
        if item is None: return
        (dir, seq, offset, size) = item
        filename = "log-%08d-%08d" % (dir, seq)
        self.segments[filename][1] += size

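# Example: after a scan, a segment whose entry is [size, used] =
# [4194304, 1048576] is only 25% utilized, making it a candidate for cleaning
# under the threshold test in run_cleaner below.
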
def parse_item(data):
    if len(data) < HEADER_SIZE: return
    header = struct.unpack_from(HEADER_FORMAT, data, 0)
    size = HEADER_SIZE + sum(header[4:7])

    if header[0] != HEADER_MAGIC:
        print "Bad header magic!"
        return

    if len(data) != size:
        print "Item size does not match!"
        return

    item = LogItem()
    item.id = header[2]
    item.inum = header[3]
    item.location = None
    item.type = chr(header[1])
    item.size = size
    item.data = data[HEADER_SIZE : HEADER_SIZE + header[4]]
    links = []
    link_ids = data[HEADER_SIZE + header[4]
                    : HEADER_SIZE + header[4] + header[5]]
    link_locs = data[HEADER_SIZE + header[4] + header[5]
                     : HEADER_SIZE + sum(header[4:7])]
    for i in range(len(link_ids) // 16):
        id = link_ids[16*i : 16*i + 16]
        if id == '\0' * 16:
            # Null ids carry no packed location.
            loc = None
        else:
            loc = struct.unpack('<IIII', link_locs[0:16])
            link_locs = link_locs[16:]
        links.append((id, loc))
    item.links = links
    return item

def load_item(backend, location):
    """Load the cloud item pointed at by the 4-tuple 'location'.

    The elements of the tuple are (directory, sequence, offset, size)."""

    filename = "log-%08d-%08d" % (location[0], location[1])
    data = backend.read(filename)[location[2] : location[2] + location[3]]
    item = parse_item(data)
    item.location = location
    return item

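# A minimal usage sketch (the location values here are hypothetical): an item
# occupying 512 bytes at offset 4096 of segment log-00000001-00000003 would
# be fetched with:
#
#   item = load_item(backend, (1, 3, 4096, 512))
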
def parse_log(data, location=None):
    """Parse contents of a log file, yielding a sequence of log items."""

    if isinstance(location, str):
        m = re.match(r"^log-(\d+)-(\d+)$", location)
        if m:
            location = (int(m.group(1)), int(m.group(2)))
        else:
            location = None

    offset = 0
    while len(data) - offset >= HEADER_SIZE:
        header = struct.unpack_from(HEADER_FORMAT, data, offset)
        size = HEADER_SIZE + sum(header[4:7])
        if header[0] != HEADER_MAGIC:
            print "Bad header magic!"
            break
        if size + offset > len(data):
            print "Short data record at end of log: %s < %s" % (len(data) - offset, size)
            break
        item = parse_item(data[offset : offset + size])
        if item is not None:
            if location is not None:
                item.location = location + (offset, size)
            yield item
        offset += size

def load_checkpoint_record(backend):
    """Return the most recent checkpoint record found in the logs."""
    for (log, size) in reversed(backend.list()):
        for item in reversed(list(parse_log(backend.read(log), log))):
            if item.type == ITEM_TYPE.CHECKPOINT:
                return item

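# Layout of a checkpoint record, as consumed by InodeMap.build below: 'data'
# is a packed array of little-endian (start, end) inode-number ranges, 16
# bytes per entry, and links[i] points at the inode map block covering the
# i-th range.
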
class InodeMap:
    def build(self, backend, checkpoint_record):
        """Reconstruct the inode map from the checkpoint record given.

        This will also build up information about segment utilization."""

        self.checkpoint_record = checkpoint_record

        util = UtilizationTracker(backend)
        util.add_item(checkpoint_record)
        inodes = {}
        self.obsolete_segments = set()

        print "Inode map:"
        for i in range(len(checkpoint_record.data) // 16):
            (start, end) = struct.unpack_from("<QQ", checkpoint_record.data, 16*i)
            imap = load_item(backend, checkpoint_record.links[i][1])
            util.add_item(imap)
            print "[%d, %d]: %s" % (start, end, imap)
            for j in range(len(imap.data) // 8):
                (inum,) = struct.unpack_from("<Q", imap.data, 8*j)
                inode = load_item(backend, imap.links[j][1])
                inodes[inum] = inode
                data_segments = set()
                util.add_item(inode)
                for link in inode.links:
                    util.add_item(link[1])
                    data_segments.add(link[1][0:2])
                print "  %d: %s (%d data segments)" % (inum, inode, len(data_segments))

        print
        print "Segment utilizations:"
        for (s, u) in sorted(util.segments.items()):
            print "%s: %s %s" % (s, u, float(u[1]) / u[0])

        self.inodes = inodes
        self.util = util
        self.updated_inodes = set()

    def mark_updated(self, inum):
        self.updated_inodes.add(inum)

    def write(self, backend, log):
        updated_inodes = sorted(self.updated_inodes, reverse=True)

        new_checkpoint = LogItem()
        new_checkpoint.id = LogItem.random_id()
        new_checkpoint.inum = 0
        new_checkpoint.type = ITEM_TYPE.CHECKPOINT
        new_checkpoint.data = ""
        new_checkpoint.links = []

        for i in range(len(self.checkpoint_record.data) // 16):
            (start, end) = struct.unpack_from("<QQ", self.checkpoint_record.data, 16*i)

            new_checkpoint.data += self.checkpoint_record.data[16*i : 16*i + 16]

            # Case 1: No inodes in this range of the old inode map have
            # changed.  Simply emit a new pointer to the same inode map block.
            if len(updated_inodes) == 0 or updated_inodes[-1] > end:
                old_location = self.checkpoint_record.links[i][1][0:2]
                if old_location not in self.obsolete_segments:
                    new_checkpoint.links.append(self.checkpoint_record.links[i])
                    continue

            # Case 2: Some inodes have been updated.  Create a new inode map
            # block, write it out, and point the new checkpoint at it.
            inodes = [k for k in self.inodes if k >= start and k <= end]
            inodes.sort()

            block = LogItem()
            block.id = LogItem.random_id()
            block.inum = 0
            block.type = ITEM_TYPE.INODE_MAP
            block.links = []
            block.data = ""
            for j in inodes:
                block.data += struct.pack("<Q", j)
                block.links.append((self.inodes[j].id, self.inodes[j].location))
            log.write(block, 2)

            new_checkpoint.links.append((block.id, block.location))

            while len(updated_inodes) > 0 and updated_inodes[-1] <= end:
                updated_inodes.pop()

        log.write(new_checkpoint, 2)
        self.checkpoint_record = new_checkpoint

def rewrite_inode(backend, inode_map, inum, log):
    inode = inode_map.inodes[inum]
    blocks = []
    for l in inode.links:
        data = load_item(backend, l[1])
        blocks.append(data)
        log.write(data, 0)
    inode.links = [(b.id, b.location) for b in blocks]
    log.write(inode, 1)
    inode_map.mark_updated(inum)

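# A hypothetical walk-through: rewriting inode 5 copies each of its live data
# blocks into the current group-0 segment, rewrites the inode itself (group 1)
# with links pointing at the new copies, and marks inode 5 as updated so that
# InodeMap.write will emit a fresh inode map block referencing it.
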
def run_cleaner(backend, inode_map, log):
    # Determine which segments are poorly utilized and should be cleaned.  We
    # need better heuristics here.
    for (s, u) in sorted(inode_map.util.segments.items()):
        if float(u[1]) / u[0] < 0.99 and u[1] > 0:
            print "Should clean segment", s
            m = re.match(r"^log-(\d+)-(\d+)$", s)
            if m: inode_map.obsolete_segments.add((int(m.group(1)), int(m.group(2))))

    # Given that list of segments to clean, scan through those segments to
    # find data which is still live and mark relevant inodes as needing to be
    # rewritten.
    dirty_inodes = set()
    for s in inode_map.obsolete_segments:
        filename = "log-%08d-%08d" % s
        print "Scanning", filename, "for live data"
        for item in parse_log(backend.read(filename), filename):
            if item.type in (ITEM_TYPE.DATA, ITEM_TYPE.INODE):
                if item.inum != 0:
                    dirty_inodes.add(item.inum)

    print "Inodes to rewrite:", dirty_inodes
    for i in sorted(dirty_inodes):
        rewrite_inode(backend, inode_map, i, log)

if __name__ == '__main__':
    backend = FileBackend(".")
    chkpt = load_checkpoint_record(backend)
    imap = InodeMap()
    imap.build(backend, chkpt)

    log_dir = LogDirectory(backend, 1, 0)
    run_cleaner(backend, imap, log_dir)
    imap.write(backend, log_dir)
    log_dir.close_all()