#!/usr/bin/env python
#
# A simple cleaner for the BlueSky cloud file system. At the moment this is an
# offline cleaner--the file system must not be in use at the time that the
# cleaning is performed. Later, it will be extended to be an online/concurrent
# cleaner, where cleaning is performed even while the file system is mounted.
#
# Copyright (C) 2010 The Regents of the University of California
# Written by Michael Vrable <mvrable@cs.ucsd.edu>

import base64, os, re, struct, sys

import boto
from boto.s3.key import Key

# The BlueSky 'struct cloudlog_header' data type.
HEADER_FORMAT = '<4s48sb16sQIII'
HEADER_CRYPTBYTES = 48
HEADER_MAGIC1 = 'AgI-'          # Unencrypted data
HEADER_MAGIC2 = 'AgI='          # Encrypted data
HEADER_SIZE = struct.calcsize(HEADER_FORMAT)
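
# For reference, the header fields packed by HEADER_FORMAT, in the order they
# are indexed by parse_item() below:
#   header[0]  4s   magic     (HEADER_MAGIC1 or HEADER_MAGIC2)
#   header[1]  48s  cryptkeys
#   header[2]  b    type code (see ITEM_TYPE)
#   header[3]  16s  item id
#   header[4]  Q    inode number
#   header[5]  I    length of the data payload
#   header[6]  I    length of the link id array
#   header[7]  I    length of the link location array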
29 """An interface to BlueSky where the log segments are on local disk.
31 This is mainly intended for testing purposes, as the real cleaner would
32 operate where data is being stored in S3."""

    def __init__(self, path):
        self.path = path

    def list(self):
        """Return a listing of all log segments and their sizes."""

        files = [f for f in os.listdir(self.path) if f.startswith('log-')]
        files.sort()
        return [(f, os.stat(os.path.join(self.path, f)).st_size)
                for f in files]

    def read(self, filename, offset=0, length=None):
        fp = open(os.path.join(self.path, filename), 'rb')
        if offset > 0:
            fp.seek(offset)
        if length is None:
            return fp.read()
        return fp.read(length)

    def write(self, filename, data):
        fp = open(os.path.join(self.path, filename), 'wb')
        fp.write(data)
        fp.close()

    def delete(self, filename):
        os.unlink(os.path.join(self.path, filename))

    def loc_to_name(self, location):
        return "log-%08d-%08d" % (location)

    def name_to_loc(self, name):
        m = re.match(r"^log-(\d+)-(\d+)$", name)
        if m: return (int(m.group(1)), int(m.group(2)))
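
    # Example of the naming scheme: loc_to_name((0, 3)) yields
    # "log-00000000-00000003", and name_to_loc maps that name back to the
    # tuple (0, 3); names not matching the pattern map to None.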
71 """An interface to BlueSky where the log segments are on in Amazon S3."""
73 def __init__(self, bucket, path='', cachedir="."):
74 self.conn = boto.connect_s3(is_secure=False)
75 self.bucket = self.conn.get_bucket(bucket)
77 self.cachedir = cachedir

    def list(self):
        files = []
        for k in self.bucket.list(self.path + 'log-'):
            files.append((k.key, k.size))
        return files

    def read(self, filename, offset=0, length=None):
        if filename in self.cache:
            fp = open(os.path.join(self.cachedir, filename), 'rb')
            if offset > 0:
                fp.seek(offset)
            if length is None:
                return fp.read()
            return fp.read(length)
        k = Key(self.bucket)
        k.key = self.path + filename
        data = k.get_contents_as_string()
        fp = open(os.path.join(self.cachedir, filename), 'wb')
        fp.write(data)
        fp.close()
        self.cache[filename] = True
        if offset > 0:
            data = data[offset:]
        if length is not None:
            data = data[0:length]
        return data

    def write(self, filename, data):
        k = Key(self.bucket)
        k.key = self.path + filename
        k.set_contents_from_string(data)
        if filename in self.cache:
            del self.cache[filename]

    def delete(self, filename):
        k = Key(self.bucket)
        k.key = self.path + filename
        k.delete()
        if filename in self.cache:
            del self.cache[filename]

    def loc_to_name(self, location):
        return "log-%08d-%08d" % (location)

    def name_to_loc(self, name):
        m = re.match(r"^log-(\d+)-(\d+)$", name)
        if m: return (int(m.group(1)), int(m.group(2)))
131 """In-memory representation of a single item stored in a log file."""
134 self.cryptkeys = '\0' * HEADER_CRYPTBYTES
135 self.encrypted = False
138 return "<Item%s ty=%s location=%s size=%d id=%s...>" % (self.encrypted and '$' or '', self.type, self.location, self.size, base64.b16encode(self.id).lower()[0:8])

    @staticmethod
    def random_id():
        return open('/dev/urandom').read(16)

    def serialize(self):
        link_ids = []
        link_locs = []
        for (i, l) in self.links:
            link_ids.append(i)
            if i != '\0' * 16:
                link_locs.append(struct.pack('<IIII', *l))
        link_ids = ''.join(link_ids)
        link_locs = ''.join(link_locs)

        if self.encrypted:
            magic = HEADER_MAGIC2
        else:
            magic = HEADER_MAGIC1
        header = struct.pack(HEADER_FORMAT,
                             magic, self.cryptkeys,
                             ord(self.type), self.id, self.inum,
                             len(self.data), len(link_ids), len(link_locs))
        return header + self.data + link_ids + link_locs
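
    # A minimal sketch of how serialize() pairs with parse_item() below; the
    # field values are illustrative only:
    #   item = LogItem()
    #   item.id = LogItem.random_id(); item.inum = 0
    #   item.type = ITEM_TYPE.DATA; item.data = "example"; item.links = []
    #   assert parse_item(item.serialize()).data == "example"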

class LogSegment:
    def __init__(self, backend, location):
        self.backend = backend
        self.location = location
        self.data = []

    def __len__(self):
        return sum(len(s) for s in self.data)

    def write(self, item):
        data = item.serialize()
        offset = len(self)
        self.data.append(data)
        item.location = self.location + (offset, len(data))

    def close(self):
        data = ''.join(self.data)
        filename = self.backend.loc_to_name(self.location)
        print "Writing %d bytes of data to %s" % (len(data), filename)
        self.backend.write(filename, data)

class LogDirectory:
    TARGET_SIZE = 4 << 20       # Size at which a segment is closed, in bytes

    def __init__(self, backend, dir):
        self.backend = backend
        self.dir_num = dir
        self.seq_num = 0
        for logname in backend.list():
            loc = backend.name_to_loc(logname[0])
            if loc is not None and loc[0] == dir:
                self.seq_num = max(self.seq_num, loc[1] + 1)
        self.groups = {}
        print "Starting sequence number is", self.seq_num

    def open_segment(self):
        seg = LogSegment(self.backend, (self.dir_num, self.seq_num))
        self.seq_num += 1
        return seg

    def write(self, item, segment_group=0):
        if segment_group not in self.groups:
            self.groups[segment_group] = self.open_segment()
        seg = self.groups[segment_group]
        seg.write(item)
        if len(seg) >= LogDirectory.TARGET_SIZE:
            seg.close()
            del self.groups[segment_group]

    def close_all(self):
        for k in list(self.groups.keys()):
            self.groups[k].close()
            del self.groups[k]

class UtilizationTracker:
    """A simple object that tracks what fraction of each segment is used.

    This data can be used to guide segment cleaning decisions."""

    def __init__(self, backend):
        self.segments = {}
        for (segment, size) in backend.list():
            self.segments[segment] = [size, 0]

    def add_item(self, item):
        if isinstance(item, LogItem):
            item = item.location
        if item is None: return
        (dir, seq, offset, size) = item
        filename = "log-%08d-%08d" % (dir, seq)
        self.segments[filename][1] += size
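
    # Typical use (see InodeMap.build below): feed every live item through
    # add_item(); afterwards segments[name] holds [total_bytes, live_bytes],
    # so a segment's utilization is float(live_bytes) / total_bytes.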

def parse_item(data):
    if len(data) < HEADER_SIZE: return
    header = struct.unpack_from(HEADER_FORMAT, data, 0)
    size = HEADER_SIZE + sum(header[5:8])

    if header[0] not in (HEADER_MAGIC1, HEADER_MAGIC2):
        print "Bad header magic!"
        return

    if len(data) != size:
        print "Item size does not match: %d != %d" % (size, len(data))
        return

    item = LogItem()
    if header[0] == HEADER_MAGIC2: item.encrypted = True
    item.cryptkeys = header[1]
    item.id = header[3]
    item.inum = header[4]
    item.location = None
    item.type = chr(header[2])
    item.size = size
    item.data = data[HEADER_SIZE : HEADER_SIZE + header[5]]
    links = []
    link_ids = data[HEADER_SIZE + header[5]
                    : HEADER_SIZE + header[5] + header[6]]
    link_locs = data[HEADER_SIZE + header[5] + header[6]
                     : HEADER_SIZE + sum(header[5:8])]
    for i in range(len(link_ids) // 16):
        id = link_ids[16*i : 16*i + 16]
        if id == '\0' * 16:
            loc = None
        else:
            loc = struct.unpack('<IIII', link_locs[0:16])
            link_locs = link_locs[16:]
        links.append((id, loc))
    item.links = links
    return item

def load_item(backend, location):
    """Load the cloud item pointed at by the 4-tuple 'location'.

    The elements of the tuple are (directory, sequence, offset, size)."""

    filename = backend.loc_to_name((location[0], location[1]))
    data = backend.read(filename, location[2], location[3])
    item = parse_item(data)
    item.location = location
    return item
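
# Illustrative call (the numbers are hypothetical): load_item(backend,
# (0, 3, 1024, 512)) reads 512 bytes at offset 1024 within segment
# "log-00000000-00000003" and parses them as a single item.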

def parse_log(data, location=None):
    """Parse contents of a log file, yielding a sequence of log items."""

    if isinstance(location, str):
        m = re.match(r"^log-(\d+)-(\d+)$", location)
        if m:
            location = (int(m.group(1)), int(m.group(2)))
        else:
            location = None

    offset = 0
    while len(data) - offset >= HEADER_SIZE:
        header = struct.unpack_from(HEADER_FORMAT, data, offset)
        size = HEADER_SIZE + sum(header[5:8])
        if header[0] not in (HEADER_MAGIC1, HEADER_MAGIC2):
            print "Bad header magic!"
            break
        if size + offset > len(data):
            print "Short data record at end of log: %s < %s" \
                % (len(data) - offset, size)
            break
        item = parse_item(data[offset : offset + size])
        if location is not None:
            item.location = location + (offset, size)
        if item is not None: yield item
        offset += size

def load_checkpoint_record(backend):
    """Return the most recent checkpoint record found in the log segments."""
    for (log, size) in reversed(backend.list()):
        for item in reversed(list(parse_log(backend.read(log), log))):
            if item.type == ITEM_TYPE.CHECKPOINT:
                return item

class InodeMap:
    def __init__(self):
        pass

    def build(self, backend, checkpoint_record):
        """Reconstruct the inode map from the checkpoint record given.

        This will also build up information about segment utilization."""

        self.checkpoint_record = checkpoint_record

        util = UtilizationTracker(backend)
        util.add_item(checkpoint_record)
        inodes = {}
        self.obsolete_segments = set()

        print "Inode map:"
        for i in range(len(checkpoint_record.data) // 16):
            (start, end) = struct.unpack_from("<QQ", checkpoint_record.data, 16*i)
            imap = load_item(backend, checkpoint_record.links[i][1])
            util.add_item(imap)
            print "[%d, %d]: %s" % (start, end, imap)
            for j in range(len(imap.data) // 8):
                (inum,) = struct.unpack_from("<Q", imap.data, 8*j)
                inode = load_item(backend, imap.links[j][1])
                inodes[inum] = inode
                data_segments = set()
                util.add_item(inode)
                for l in inode.links:
                    util.add_item(l[1])
                    data_segments.add(l[1][0:2])
                print "  %d: %s (%d data segments)" % (inum, inode, len(data_segments))

        print
        print "Segment utilizations:"
        for (s, u) in sorted(util.segments.items()):
            print "%s: %s %s" % (s, u, float(u[1]) / u[0])
            if u[1] == 0:
                print "Deleting..."
                backend.delete(s)

        self.inodes = inodes
        self.util = util
        self.updated_inodes = set()

    def mark_updated(self, inum):
        self.updated_inodes.add(inum)

    def write(self, backend, log):
        updated_inodes = sorted(self.updated_inodes, reverse=True)

        new_checkpoint = LogItem()
        new_checkpoint.id = LogItem.random_id()
        new_checkpoint.inum = 0
        new_checkpoint.type = ITEM_TYPE.CHECKPOINT
        new_checkpoint.data = ""
        new_checkpoint.links = []

        for i in range(len(self.checkpoint_record.data) // 16):
            (start, end) = struct.unpack_from("<QQ", self.checkpoint_record.data, 16*i)

            new_checkpoint.data += self.checkpoint_record.data[16*i : 16*i + 16]

            # Case 1: No inodes in this range of the old inode map have
            # changed. Simply emit a new pointer to the same inode map block,
            # unless that block lives in a segment scheduled for cleaning.
            if len(updated_inodes) == 0 or updated_inodes[-1] > end:
                old_location = self.checkpoint_record.links[i][1][0:2]
                if old_location not in self.obsolete_segments:
                    new_checkpoint.links.append(self.checkpoint_record.links[i])
                    continue

            # Case 2: Some inodes have been updated. Create a new inode map
            # block, write it out, and point the new checkpoint at it.
            inodes = [k for k in self.inodes if k >= start and k <= end]
            inodes.sort()
            block = LogItem()
            block.id = LogItem.random_id()
            block.inum = 0
            block.type = ITEM_TYPE.INODE_MAP
            block.links = []
            block.data = ""
            for j in inodes:
                block.data += struct.pack("<Q", j)
                block.links.append((self.inodes[j].id, self.inodes[j].location))
            log.write(block, 2)
            new_checkpoint.links.append((block.id, block.location))
            while len(updated_inodes) > 0 and updated_inodes[-1] <= end:
                updated_inodes.pop()

        log.write(new_checkpoint, 2)
        self.checkpoint_record = new_checkpoint
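
# Note on segment groups: the cleaner segregates its output by item type,
# using group 0 for data blocks and group 1 for inodes (in rewrite_inode
# below), and group 2 for inode map blocks and checkpoints (in InodeMap.write
# above), so that each kind of item accumulates in its own log segments.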

def rewrite_inode(backend, inode_map, inum, log, copy_data=True):
    inode = inode_map.inodes[inum]
    if copy_data:
        blocks = []
        for l in inode.links:
            data = load_item(backend, l[1])
            blocks.append(data)
            log.write(data, 0)
        inode.links = [(b.id, b.location) for b in blocks]
    log.write(inode, 1)
    inode_map.mark_updated(inum)

def run_cleaner(backend, inode_map, log, repack_inodes=False):
    # Determine which segments are poorly utilized and should be cleaned. We
    # need better heuristics here.
    for (s, u) in sorted(inode_map.util.segments.items()):
        if (float(u[1]) / u[0] < 0.6 or u[1] < 32768) and u[1] > 0:
            print "Should clean segment", s
            loc = backend.name_to_loc(s)
            if loc is not None: inode_map.obsolete_segments.add(loc)

    # TODO: We probably also want heuristics that will find inodes with
    # badly-fragmented data and rewrite that to achieve better locality.

    # Given that list of segments to clean, scan through those segments to find
    # data which is still live and mark relevant inodes as needing to be
    # rewritten.
    if repack_inodes:
        dirty_inodes = set(inode_map.inodes)
    else:
        dirty_inodes = set()
    dirty_inode_data = set()
    for s in inode_map.obsolete_segments:
        filename = backend.loc_to_name(s)
        print "Scanning", filename, "for live data"
        for item in parse_log(backend.read(filename), filename):
            if item.type in (ITEM_TYPE.DATA, ITEM_TYPE.INODE):
                if item.inum != 0:
                    inode = inode_map.inodes[item.inum]
                    if s == inode.location[0:2]:
                        dirty_inodes.add(item.inum)
                    if item.inum not in dirty_inode_data:
                        for b in inode.links:
                            if s == b[1][0:2]:
                                dirty_inode_data.add(item.inum)
                                break
460 print "Inodes to rewrite:", dirty_inodes
461 print "Inodes with data to rewrite:", dirty_inode_data
462 for i in sorted(dirty_inodes.union(dirty_inode_data)):
463 rewrite_inode(backend, inode_map, i, log, i in dirty_inode_data)

if __name__ == '__main__':
    backend = S3Backend("mvrable-bluesky", cachedir=".")
    chkpt = load_checkpoint_record(backend)
    print
    imap = InodeMap()
    imap.build(backend, chkpt)
    print

    log_dir = LogDirectory(backend, 0)
    run_cleaner(backend, imap, log_dir)
    imap.write(backend, log_dir)
    log_dir.close_all()
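
    # To run against local files instead of S3 (see the FileBackend docstring
    # above), the backend could instead be constructed as:
    #   backend = FileBackend(".")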