#!/usr/bin/env python
#
# A simple cleaner for the BlueSky cloud file system.  At the moment this is
# an offline cleaner--the file system must not be in use at the time that the
# cleaning is performed.  Later, it will be extended to be an online/concurrent
# cleaner, where cleaning is performed even while the file system is mounted.
#
# Copyright (C) 2010  The Regents of the University of California
# Written by Michael Vrable <mvrable@cs.ucsd.edu>
import base64, os, re, struct, sys
# The BlueSky 'struct cloudlog_header' data type.
HEADER_FORMAT = '<4sb16sQIII'
HEADER_MAGIC = 'AgI-'           # Magic bytes at the start of every log item
HEADER_SIZE = struct.calcsize(HEADER_FORMAT)
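# Field-by-field, HEADER_FORMAT describes (all little-endian): a 4-byte magic
# string, a 1-byte item type, a 16-byte item id, a 64-bit inode number, and
# three 32-bit lengths (data, link ids, link locations), for a total of
# HEADER_SIZE = 41 bytes.  This annotation mirrors how serialize() and
# parse_item() below use the fields.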
26 """An interface to BlueSky where the log segments are on local disk.
28 This is mainly intended for testing purposes, as the real cleaner would
29 operate where data is being stored in S3."""
    def __init__(self, path):
        self.path = path

    def list(self):
        """Return a listing of all log segments and their sizes."""

        files = [f for f in os.listdir(self.path) if f.startswith('log-')]
        files.sort()

        return [(f, os.stat(os.path.join(self.path, f)).st_size)
                for f in files]
    def read(self, filename):
        fp = open(os.path.join(self.path, filename), 'rb')
        return fp.read()

    def write(self, filename, data):
        fp = open(os.path.join(self.path, filename), 'wb')
        fp.write(data)
        fp.close()
    def loc_to_name(self, location):
        return "log-%08d-%08d" % location

    def name_to_loc(self, name):
        m = re.match(r"^log-(\d+)-(\d+)$", name)
        if m: return (int(m.group(1)), int(m.group(2)))
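# A quick illustration of the naming scheme (values hypothetical):
# loc_to_name((0, 3)) returns "log-00000000-00000003", and
# name_to_loc("log-00000000-00000003") returns (0, 3).  name_to_loc returns
# None for filenames that do not match the log-NNNNNNNN-NNNNNNNN pattern.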
60 """In-memory representation of a single item stored in a log file."""
63 return "<Item ty=%s location=%s size=%d id=%s...>" % (self.type, self.location, self.size, base64.b16encode(self.id).lower()[0:8])
67 return open('/dev/urandom').read(16)
    def serialize(self):
        link_ids = []
        link_locs = []
        for (i, l) in self.links:
            link_ids.append(i)
            # Only links with a non-null id carry a packed location.
            if i != '\0' * 16:
                link_locs.append(struct.pack('<IIII', *l))
        link_ids = ''.join(link_ids)
        link_locs = ''.join(link_locs)

        header = struct.pack(HEADER_FORMAT,
                             HEADER_MAGIC, ord(self.type), self.id, self.inum,
                             len(self.data), len(link_ids), len(link_locs))
        return header + self.data + link_ids + link_locs
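# Worked example (hypothetical numbers): an item with 4096 bytes of data and
# two links, one of which has a null id, serializes to 41 bytes of header,
# 4096 bytes of data, 32 bytes of link ids, and 16 bytes of link locations,
# since only the non-null link contributes a packed <IIII> location.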
class LogSegment:
    def __init__(self, backend, location):
        self.backend = backend
        self.location = location
        self.data = []

    def __len__(self):
        return sum(len(s) for s in self.data)
    def write(self, item):
        data = item.serialize()
        offset = len(self)
        self.data.append(data)
        item.location = self.location + (offset, len(data))
    def close(self):
        data = ''.join(self.data)
        filename = self.backend.loc_to_name(self.location)

        print "Writing %d bytes of data to %s" % (len(data), filename)
        self.backend.write(filename, data)
class LogDirectory:
    TARGET_SIZE = 4 << 20           # Nominal segment size: 4 MiB
    def __init__(self, backend, dir):
        self.backend = backend
        self.dir_num = dir
        self.seq_num = 0
        for logname in backend.list():
            loc = backend.name_to_loc(logname[0])
            if loc is not None and loc[0] == dir:
                self.seq_num = max(self.seq_num, loc[1] + 1)
        self.groups = {}
        print "Starting sequence number is", self.seq_num
    def open_segment(self):
        seg = LogSegment(self.backend, (self.dir_num, self.seq_num))
        self.seq_num += 1
        return seg
    def write(self, item, segment_group=0):
        if segment_group not in self.groups:
            self.groups[segment_group] = self.open_segment()
        seg = self.groups[segment_group]
        seg.write(item)
        if len(seg) >= LogDirectory.TARGET_SIZE:
            seg.close()
            del self.groups[segment_group]
    def close_all(self):
        for k in list(self.groups.keys()):
            self.groups[k].close()
            del self.groups[k]
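# A minimal usage sketch, assuming the current directory already holds log
# segments (some_item here is a hypothetical LogItem):
#
#     log_dir = LogDirectory(FileBackend("."), 0)
#     log_dir.write(some_item)       # buffered into segment group 0
#     log_dir.close_all()            # flush any partially filled segments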
class UtilizationTracker:
    """A simple object that tracks what fraction of each segment is used.

    This data can be used to guide segment cleaning decisions."""

    def __init__(self, backend):
        self.segments = {}
        for (segment, size) in backend.list():
            self.segments[segment] = [size, 0]
    def add_item(self, item):
        if isinstance(item, LogItem):
            item = item.location
        if item is None: return
        (dir, seq, offset, size) = item
        filename = "log-%08d-%08d" % (dir, seq)
        self.segments[filename][1] += size
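# Example of the bookkeeping (hypothetical numbers): after construction,
# self.segments["log-00000000-00000002"] might be [4194304, 0]; each
# add_item() call for an item stored in that segment then increases the
# second element, so [4194304, 1048576] means the segment is 25% live.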
def parse_item(data):
    if len(data) < HEADER_SIZE: return
    header = struct.unpack_from(HEADER_FORMAT, data, 0)
    size = HEADER_SIZE + sum(header[4:7])

    if header[0] != HEADER_MAGIC:
        print "Bad header magic!"
        return

    if len(data) != size:
        print "Item size does not match!"
        return

    item = LogItem()
    item.id = header[2]
    item.inum = header[3]
    item.location = None
    item.type = chr(header[1])
    item.size = size
    item.data = data[HEADER_SIZE : HEADER_SIZE + header[4]]
    links = []
    link_ids = data[HEADER_SIZE + header[4]
                    : HEADER_SIZE + header[4] + header[5]]
    link_locs = data[HEADER_SIZE + header[4] + header[5]
                     : HEADER_SIZE + sum(header[4:7])]
    for i in range(len(link_ids) // 16):
        id = link_ids[16*i : 16*i + 16]
        if id == '\0' * 16:
            # An all-zero id marks a link with no location recorded.
            loc = None
        else:
            loc = struct.unpack('<IIII', link_locs[0:16])
            link_locs = link_locs[16:]
        links.append((id, loc))
    item.links = links
    return item
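# A minimal self-check sketch: serialize() and parse_item() should be inverses
# for the fields they share.  _demo_roundtrip is a hypothetical helper added
# for illustration only; it is not part of the original cleaner and is never
# called by it.
def _demo_roundtrip():
    item = LogItem()
    item.id = LogItem.random_id()
    item.inum = 5
    item.type = ITEM_TYPE.DATA
    item.data = "hello"
    item.links = []
    parsed = parse_item(item.serialize())
    assert parsed.inum == 5
    assert parsed.data == "hello"
    assert parsed.type == ITEM_TYPE.DATA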
def load_item(backend, location):
    """Load the cloud item pointed at by the 4-tuple 'location'.

    The elements of the tuple are (directory, sequence, offset, size)."""

    filename = backend.loc_to_name((location[0], location[1]))
    data = backend.read(filename)[location[2] : location[2] + location[3]]
    item = parse_item(data)
    item.location = location
    return item
def parse_log(data, location=None):
    """Parse contents of a log file, yielding a sequence of log items."""

    if isinstance(location, str):
        m = re.match(r"^log-(\d+)-(\d+)$", location)
        if m is None:
            location = None
        else:
            location = (int(m.group(1)), int(m.group(2)))

    offset = 0
    while len(data) - offset >= HEADER_SIZE:
        header = struct.unpack_from(HEADER_FORMAT, data, offset)
        size = HEADER_SIZE + sum(header[4:7])
        if header[0] != HEADER_MAGIC:
            print "Bad header magic!"
            break
        if size + offset > len(data):
            print "Short data record at end of log: %s < %s" \
                % (len(data) - offset, size)
            break
        item = parse_item(data[offset : offset + size])
        if item is not None:
            if location is not None:
                item.location = location + (offset, size)
            yield item
        offset += size
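# Usage sketch (segment name hypothetical): passing the filename as
# 'location' lets parse_log stamp each yielded item with its full
# (directory, sequence, offset, size) location tuple:
#
#     for item in parse_log(backend.read("log-00000000-00000000"),
#                           "log-00000000-00000000"):
#         print item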
def load_checkpoint_record(backend):
    """Return the most recent checkpoint record found in the logs."""
    for (log, size) in reversed(backend.list()):
        for item in reversed(list(parse_log(backend.read(log), log))):
            if item.type == ITEM_TYPE.CHECKPOINT:
                return item
class InodeMap:
    def build(self, backend, checkpoint_record):
        """Reconstruct the inode map from the checkpoint record given.

        This will also build up information about segment utilization."""

        self.checkpoint_record = checkpoint_record

        util = UtilizationTracker(backend)
        util.add_item(checkpoint_record)
        inodes = {}
        self.obsolete_segments = set()

        print "Inode map:"
        for i in range(len(checkpoint_record.data) // 16):
            (start, end) = struct.unpack_from("<QQ", checkpoint_record.data,
                                              16*i)
            imap = load_item(backend, checkpoint_record.links[i][1])
            util.add_item(imap)
            print "[%d, %d]: %s" % (start, end, imap)
            for j in range(len(imap.data) // 8):
                (inum,) = struct.unpack_from("<Q", imap.data, 8*j)
                inode = load_item(backend, imap.links[j][1])
                inodes[inum] = inode
                data_segments = set()
                util.add_item(inode)
                for l in inode.links:
                    util.add_item(l[1])
                    data_segments.add(l[1][0:2])
                print "  %d: %s (%d data segments)" % (inum, inode,
                                                       len(data_segments))

        print
        print "Segment utilizations:"
        for (s, u) in sorted(util.segments.items()):
            print "%s: %s %s" % (s, u, float(u[1]) / u[0])

        self.inodes = inodes
        self.util = util
        self.updated_inodes = set()
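    # Layout recap, as read by build() above: a checkpoint's data is a
    # sequence of 16-byte (start, end) inode-number ranges whose i-th entry
    # corresponds to links[i], a pointer to the inode map block covering that
    # range; an inode map block's data is a sequence of 8-byte inode numbers
    # whose j-th entry corresponds to links[j], a pointer to that inode.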
    def mark_updated(self, inum):
        self.updated_inodes.add(inum)
    def write(self, backend, log):
        # Sorted in reverse so the smallest pending inode number is at the
        # end of the list, where it can be popped off cheaply.
        updated_inodes = sorted(self.updated_inodes, reverse=True)

        new_checkpoint = LogItem()
        new_checkpoint.id = LogItem.random_id()
        new_checkpoint.inum = 0
        new_checkpoint.type = ITEM_TYPE.CHECKPOINT
        new_checkpoint.data = ""
        new_checkpoint.links = []

        for i in range(len(self.checkpoint_record.data) // 16):
            (start, end) = struct.unpack_from("<QQ",
                                              self.checkpoint_record.data,
                                              16*i)

            new_checkpoint.data += self.checkpoint_record.data[16*i : 16*i + 16]

            # Case 1: No inodes in this range of the old inode map have
            # changed.  Simply emit a new pointer to the same inode map block.
            if len(updated_inodes) == 0 or updated_inodes[-1] > end:
                old_location = self.checkpoint_record.links[i][1][0:2]
                if old_location not in self.obsolete_segments:
                    new_checkpoint.links.append(self.checkpoint_record.links[i])
                    continue

            # Case 2: Some inodes have been updated.  Create a new inode map
            # block, write it out, and point the new checkpoint at it.
            inodes = [k for k in self.inodes if k >= start and k <= end]
            inodes.sort()

            block = LogItem()
            block.id = LogItem.random_id()
            block.inum = 0
            block.type = ITEM_TYPE.INODE_MAP
            block.links = []
            block.data = ""
            for j in inodes:
                block.data += struct.pack("<Q", j)
                block.links.append((self.inodes[j].id, self.inodes[j].location))
            log.write(block, 2)

            new_checkpoint.links.append((block.id, block.location))

            # Drop the inode numbers covered by this range from the pending
            # list before moving on to the next range.
            while len(updated_inodes) > 0 and updated_inodes[-1] <= end:
                updated_inodes.pop()

        log.write(new_checkpoint, 2)
        self.checkpoint_record = new_checkpoint
def rewrite_inode(backend, inode_map, inum, log):
    """Copy an inode and all of its data blocks to new log segments."""
    inode = inode_map.inodes[inum]
    blocks = []
    for l in inode.links:
        data = load_item(backend, l[1])
        blocks.append(data)
        log.write(data, 0)
    inode.links = [(b.id, b.location) for b in blocks]
    log.write(inode, 1)
    inode_map.mark_updated(inum)
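# Note on the segment_group arguments used above and in InodeMap.write():
# data blocks go to group 0, inodes to group 1, and inode map blocks plus
# checkpoints to group 2, presumably so that items with similar lifetimes
# end up together in the same cloud log segments.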
def run_cleaner(backend, inode_map, log):
    # Determine which segments are poorly utilized and should be cleaned.  We
    # need better heuristics here.
    for (s, u) in sorted(inode_map.util.segments.items()):
        if float(u[1]) / u[0] < 0.99 and u[1] > 0:
            print "Should clean segment", s
            loc = backend.name_to_loc(s)
            if loc is not None: inode_map.obsolete_segments.add(loc)
    # Given that list of segments to clean, scan through those segments to
    # find data which is still live and mark relevant inodes as needing to be
    # rewritten.
    dirty_inodes = set()
    for s in inode_map.obsolete_segments:
        filename = backend.loc_to_name(s)
        print "Scanning", filename, "for live data"
        for item in parse_log(backend.read(filename), filename):
            if item.type in (ITEM_TYPE.DATA, ITEM_TYPE.INODE):
                if item.inum != 0:
                    dirty_inodes.add(item.inum)
360 print "Inodes to rewrite:", dirty_inodes
361 for i in sorted(dirty_inodes):
362 rewrite_inode(backend, inode_map, i, log)
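# Example of the heuristic above (hypothetical numbers): a segment tracked as
# [4194304, 1048576] is 25% utilized, which is below the 0.99 threshold, so
# it would be queued for cleaning; a fully live segment ([4194304, 4194304],
# utilization 1.0) would be left alone.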
if __name__ == '__main__':
    backend = FileBackend(".")
    chkpt = load_checkpoint_record(backend)
    imap = InodeMap()
    imap.build(backend, chkpt)

    log_dir = LogDirectory(backend, 0)
    run_cleaner(backend, imap, log_dir)
    imap.write(backend, log_dir)
    log_dir.close_all()