# A simple cleaner for the BlueSky cloud file system.  At the moment this is an
# offline cleaner--the file system must not be in use at the time that the
# cleaning is performed.  Later, it will be extended to be an online/concurrent
# cleaner, where cleaning is performed even while the file system is mounted.
#
# Copyright (C) 2010  The Regents of the University of California
# Written by Michael Vrable <mvrable@cs.ucsd.edu>

import base64, os, re, struct, sys
import boto
from boto.s3.key import Key

# The BlueSky 'struct cloudlog_header' data type.
HEADER_FORMAT = '<4s48sb16sQIII'
HEADER_CRYPTBYTES = 48
HEADER_MAGIC1 = 'AgI-'          # Unencrypted data
HEADER_MAGIC2 = 'AgI='          # Encrypted data
HEADER_SIZE = struct.calcsize(HEADER_FORMAT)
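# Field layout of the header, in the order unpacked by parse_item() below:
#   [0] magic      '4s'   -- HEADER_MAGIC1 or HEADER_MAGIC2
#   [1] cryptkeys  '48s'  -- per-item key material (opaque to the cleaner)
#   [2] type       'b'    -- item type code; see ITEM_TYPE below
#   [3] id         '16s'  -- unique item identifier
#   [4] inum       'Q'    -- inode number, or 0 for non-inode items
#   [5]-[7]        'III'  -- byte counts of the data, link ids, and link
#                            locations that follow the header
# With '<' (little-endian, no padding) HEADER_SIZE works out to 89 bytes.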
29 """An interface to BlueSky where the log segments are on local disk.
31 This is mainly intended for testing purposes, as the real cleaner would
32 operate where data is being stored in S3."""
34 def __init__(self, path):
38 """Return a listing of all log segments and their sizes."""
40 files = [f for f in os.listdir(self.path) if f.startswith('log-')]
43 return [(f, os.stat(os.path.join(self.path, f)).st_size)
46 def read(self, filename, offset=0, length=None):
47 fp = open(os.path.join(self.path, filename), 'rb')
53 return fp.read(length)
55 def write(self, filename, data):
56 fp = open(os.path.join(self.path, filename), 'wb')
60 def delete(self, filename):
61 os.unlink(os.path.join(self.path, filename))

    def loc_to_name(self, location):
        return "log-%08d-%08d" % (location)

    def name_to_loc(self, name):
        m = re.match(r"^log-(\d+)-(\d+)$", name)
        if m: return (int(m.group(1)), int(m.group(2)))
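
    # The two helpers above are inverses of each other; for example,
    # loc_to_name((0, 3)) yields "log-00000000-00000003" and
    # name_to_loc("log-00000000-00000003") yields (0, 3).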

def retry_wrap(method):
    """Decorator which retries a failed S3 operation a few times."""
    def wrapped(self, *args, **kwargs):
        for retries in range(3):
            try:
                return method(self, *args, **kwargs)
            except:
                print >>sys.stderr, "S3 operation failed, retrying..."
                self.connect()      # re-establish the connection first
        # Final attempt; this time let any exception propagate.
        return method(self, *args, **kwargs)
    return wrapped
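
# retry_wrap is applied below to the S3Backend methods which perform network
# I/O, for example:
#     @retry_wrap
#     def write(self, filename, data): ...
# so that a transient S3 failure is retried before the cleaner gives up.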
82 """An interface to BlueSky where the log segments are on in Amazon S3."""
84 def __init__(self, bucket, path='', cachedir="."):
85 self.bucket_name = bucket
87 self.cachedir = cachedir
92 self.conn = boto.connect_s3(is_secure=False)
93 self.bucket = self.conn.get_bucket(self.bucket_name)

    def list(self):
        files = []
        for k in self.bucket.list(self.path + 'log-'):
            files.append((k.key, k.size))
        return files

    @retry_wrap
    def read(self, filename, offset=0, length=None):
        if filename in self.cache:
            fp = open(os.path.join(self.cachedir, filename), 'rb')
            fp.seek(offset)
            if length is None:
                return fp.read()
            return fp.read(length)
        # Not cached yet: fetch from S3 and keep a local copy.
        k = Key(self.bucket)
        k.key = self.path + filename
        data = k.get_contents_as_string()
        fp = open(os.path.join(self.cachedir, filename), 'wb')
        fp.write(data)
        fp.close()
        self.cache[filename] = True
        if offset > 0:
            data = data[offset:]
        if length is not None:
            data = data[0:length]
        return data

    @retry_wrap
    def write(self, filename, data):
        k = Key(self.bucket)
        k.key = self.path + filename
        k.set_contents_from_string(data)
        # Invalidate any stale cached copy.
        if filename in self.cache:
            del self.cache[filename]

    @retry_wrap
    def delete(self, filename):
        k = Key(self.bucket)
        k.key = self.path + filename
        k.delete()
        if filename in self.cache:
            del self.cache[filename]

    def loc_to_name(self, location):
        return "log-%08d-%08d" % (location)

    def name_to_loc(self, name):
        m = re.match(r"^log-(\d+)-(\d+)$", name)
        if m: return (int(m.group(1)), int(m.group(2)))
149 """In-memory representation of a single item stored in a log file."""
152 self.cryptkeys = '\0' * HEADER_CRYPTBYTES
153 self.encrypted = False
156 return "<Item%s ty=%s location=%s size=%d id=%s...>" % (self.encrypted and '$' or '', self.type, self.location, self.size, base64.b16encode(self.id).lower()[0:8])

    @staticmethod
    def random_id():
        return open('/dev/urandom').read(16)

    def serialize(self):
        link_ids = []
        link_locs = []
        for (i, l) in self.links:
            link_ids.append(i)
            if l is not None:
                link_locs.append(struct.pack('<IIII', *l))
        link_ids = ''.join(link_ids)
        link_locs = ''.join(link_locs)
        if self.encrypted:
            magic = HEADER_MAGIC2
        else:
            magic = HEADER_MAGIC1
        header = struct.pack(HEADER_FORMAT,
                             magic, self.cryptkeys,
                             ord(self.type), self.id, self.inum,
                             len(self.data), len(link_ids), len(link_locs))
        return header + self.data + link_ids + link_locs

class LogSegment:
    """A new log segment under construction, buffered in memory."""

    def __init__(self, backend, location):
        self.backend = backend
        self.location = location
        self.data = []

    def __len__(self):
        return sum(len(s) for s in self.data)

    def write(self, item):
        data = item.serialize()
        offset = len(self)
        self.data.append(data)
        item.location = self.location + (offset, len(data))

    def close(self):
        data = ''.join(self.data)
        filename = self.backend.loc_to_name(self.location)
        print "Writing %d bytes of data to %s" % (len(data), filename)
        self.backend.write(filename, data)

class LogDirectory:
    TARGET_SIZE = 4 << 20       # Flush segments once they reach 4 MB

    def __init__(self, backend, dir):
        self.backend = backend
        self.dir_num = dir
        self.seq_num = 0
        for logname in backend.list():
            loc = backend.name_to_loc(logname[0])
            if loc is not None and loc[0] == dir:
                self.seq_num = max(self.seq_num, loc[1] + 1)
        self.groups = {}
        print "Starting sequence number is", self.seq_num

    def open_segment(self):
        seg = LogSegment(self.backend, (self.dir_num, self.seq_num))
        self.seq_num += 1
        return seg

    def write(self, item, segment_group=0):
        if segment_group not in self.groups:
            self.groups[segment_group] = self.open_segment()
        seg = self.groups[segment_group]
        seg.write(item)
        if len(seg) >= LogDirectory.TARGET_SIZE:
            seg.close()
            del self.groups[segment_group]

    def close_all(self):
        for k in list(self.groups.keys()):
            self.groups[k].close()
            del self.groups[k]
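
    # Sketch of intended usage (the group numbers follow the convention used
    # by rewrite_inode() and InodeMap.write() below):
    #     log_dir = LogDirectory(backend, 0)
    #     log_dir.write(data_item, 0)     # group 0: file data blocks
    #     log_dir.write(inode_item, 1)    # group 1: inodes
    #     log_dir.write(imap_item, 2)     # group 2: inode maps, checkpoints
    #     log_dir.close_all()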

class UtilizationTracker:
    """A simple object that tracks what fraction of each segment is used.

    This data can be used to guide segment cleaning decisions."""

    def __init__(self, backend):
        self.segments = {}
        for (segment, size) in backend.list():
            self.segments[segment] = [size, 0]

    def add_item(self, item):
        if isinstance(item, LogItem):
            item = item.location
        if item is None: return
        (dir, seq, offset, size) = item
        filename = "log-%08d-%08d" % (dir, seq)
        self.segments[filename][1] += size
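
# A segment's utilization is the ratio used/total of the [total, used] pair
# kept in self.segments; for example, a 4 MB segment with 1 MB of live items
# scores 0.25, well under the 0.6 cleaning threshold used by run_cleaner().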

def parse_item(data):
    """Decode a single log item from its serialized representation."""
    if len(data) < HEADER_SIZE: return
    header = struct.unpack_from(HEADER_FORMAT, data, 0)
    size = HEADER_SIZE + sum(header[5:8])

    if header[0] not in (HEADER_MAGIC1, HEADER_MAGIC2):
        print "Bad header magic!"
        return

    if len(data) != size:
        print "Item size does not match: %d != %d" % (size, len(data))
        return

    item = LogItem()
    if header[0] == HEADER_MAGIC2: item.encrypted = True
    item.cryptkeys = header[1]
    item.id = header[3]
    item.inum = header[4]
    item.location = None
    item.type = chr(header[2])
    item.size = size
    item.data = data[HEADER_SIZE : HEADER_SIZE + header[5]]
    links = []
    link_ids = data[HEADER_SIZE + header[5]
                    : HEADER_SIZE + header[5] + header[6]]
    link_locs = data[HEADER_SIZE + header[5] + header[6]
                     : HEADER_SIZE + sum(header[5:8])]
    for i in range(len(link_ids) // 16):
        id = link_ids[16*i : 16*i + 16]
        if id == '\0' * 16:
            # An all-zero id is a placeholder with no location stored.
            loc = None
        else:
            loc = struct.unpack('<IIII', link_locs[0:16])
            link_locs = link_locs[16:]
        links.append((id, loc))
    item.links = links
    return item
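
def _example_roundtrip():
    """Illustrative sanity check, not called by the cleaner: an item should
    survive LogItem.serialize() followed by parse_item() unchanged.  Assumes
    the ITEM_TYPE codes defined above."""
    item = LogItem()
    item.id = LogItem.random_id()
    item.inum = 1
    item.type = ITEM_TYPE.DATA
    item.data = "example payload"
    item.links = []
    copy = parse_item(item.serialize())
    assert copy.inum == 1 and copy.data == "example payload"
    assert copy.type == ITEM_TYPE.DATA and not copy.encrypted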

def load_item(backend, location):
    """Load the cloud item pointed at by the 4-tuple 'location'.

    The elements of the tuple are (directory, sequence, offset, size)."""

    filename = backend.loc_to_name((location[0], location[1]))
    data = backend.read(filename, location[2], location[3])
    item = parse_item(data)
    item.location = location
    return item

def parse_log(data, location=None):
    """Parse contents of a log file, yielding a sequence of log items."""

    # A log file name may be passed as the location; convert it to a
    # (directory, sequence) tuple so items can be tagged with full locations.
    if isinstance(location, str):
        m = re.match(r"^log-(\d+)-(\d+)$", location)
        if m:
            location = (int(m.group(1)), int(m.group(2)))
        else:
            location = None

    offset = 0
    while len(data) - offset >= HEADER_SIZE:
        header = struct.unpack_from(HEADER_FORMAT, data, offset)
        size = HEADER_SIZE + sum(header[5:8])
        if header[0] not in (HEADER_MAGIC1, HEADER_MAGIC2):
            print "Bad header magic!"
            break
        if size + offset > len(data):
            print "Short data record at end of log: %s < %s" \
                % (len(data) - offset, size)
            break
        item = parse_item(data[offset : offset + size])
        if item is not None:
            if location is not None:
                item.location = location + (offset, size)
            yield item
        offset += size
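
# Example (sketch; assumes a backend has been constructed and the named
# segment exists):
#     for item in parse_log(backend.read("log-00000000-00000000"),
#                           "log-00000000-00000000"):
#         print item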

def load_checkpoint_record(backend):
    """Return the most recent checkpoint record, scanning newest logs first."""
    for (log, size) in reversed(backend.list()):
        for item in reversed(list(parse_log(backend.read(log), log))):
            if item.type == ITEM_TYPE.CHECKPOINT:
                return item

class InodeMap:
    def build(self, backend, checkpoint_record):
        """Reconstruct the inode map from the checkpoint record given.

        This will also build up information about segment utilization."""

        self.checkpoint_record = checkpoint_record

        util = UtilizationTracker(backend)
        util.add_item(checkpoint_record)

        self.inodes = {}
        self.obsolete_segments = set()

        # The checkpoint's data is a list of 16-byte (start, end) inode
        # ranges; links[i] points at the inode map block for the i-th range.
        for i in range(len(checkpoint_record.data) // 16):
            (start, end) = struct.unpack_from("<QQ", checkpoint_record.data, 16*i)
            imap = load_item(backend, checkpoint_record.links[i][1])
            util.add_item(imap)
            print "[%d, %d]: %s" % (start, end, imap)
            for j in range(len(imap.data) // 8):
                (inum,) = struct.unpack_from("<Q", imap.data, 8*j)
                inode = load_item(backend, imap.links[j][1])
                self.inodes[inum] = inode
                util.add_item(inode)
                data_segments = set()
                for l in inode.links:
                    util.add_item(l[1])
                    data_segments.add(l[1][0:2])
                print "  %d: %s (%d data segments)" % (inum, inode, len(data_segments))

        print
        print "Segment utilizations:"
        for (s, u) in sorted(util.segments.items()):
            print "%s: %s %s" % (s, u, float(u[1]) / u[0])

        self.util = util
        self.updated_inodes = set()

    def mark_updated(self, inum):
        self.updated_inodes.add(inum)

    def write(self, backend, log):
        """Write out a new checkpoint and any updated inode map blocks."""
        updated_inodes = sorted(self.updated_inodes, reverse=True)

        new_checkpoint = LogItem()
        new_checkpoint.id = LogItem.random_id()
        new_checkpoint.inum = 0
        new_checkpoint.type = ITEM_TYPE.CHECKPOINT
        new_checkpoint.data = ""
        new_checkpoint.links = []

        for i in range(len(self.checkpoint_record.data) // 16):
            (start, end) = struct.unpack_from("<QQ",
                                              self.checkpoint_record.data, 16*i)

            new_checkpoint.data += self.checkpoint_record.data[16*i : 16*i + 16]

            # Case 1: No inodes in this range of the old inode map have
            # changed.  Simply emit a new pointer to the same inode map block.
            if len(updated_inodes) == 0 or updated_inodes[-1] > end:
                old_location = self.checkpoint_record.links[i][1][0:2]
                if old_location not in self.obsolete_segments:
                    new_checkpoint.links.append(self.checkpoint_record.links[i])
                    continue

            # Case 2: Some inodes have been updated.  Create a new inode map
            # block, write it out, and point the new checkpoint at it.
            inodes = [k for k in self.inodes if k >= start and k <= end]
            inodes.sort()

            block = LogItem()
            block.id = LogItem.random_id()
            block.inum = 0
            block.type = ITEM_TYPE.INODE_MAP
            block.data = ""
            block.links = []
            for j in inodes:
                block.data += struct.pack("<Q", j)
                block.links.append((self.inodes[j].id, self.inodes[j].location))
            log.write(block, 2)

            new_checkpoint.links.append((block.id, block.location))

            # These inodes are now covered by the new block; drop them from
            # the pending list.
            while len(updated_inodes) > 0 and updated_inodes[-1] <= end:
                updated_inodes.pop()

        log.write(new_checkpoint, 2)
        self.checkpoint_record = new_checkpoint

def rewrite_inode(backend, inode_map, inum, log, copy_data=True):
    inode = inode_map.inodes[inum]
    if copy_data:
        blocks = []
        for l in inode.links:
            data = load_item(backend, l[1])
            blocks.append(data)
            log.write(data, 0)
        inode.links = [(b.id, b.location) for b in blocks]
    log.write(inode, 1)
    inode_map.mark_updated(inum)
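
# For example, rewrite_inode(backend, imap, 5, log_dir) copies inode 5's data
# blocks into fresh segments (group 0), rewrites the inode itself (group 1),
# and marks it so the next InodeMap.write() emits an updated inode map block
# pointing at the new copies.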

def run_cleaner(backend, inode_map, log, repack_inodes=False):
    # Determine which segments are poorly utilized and should be cleaned.  We
    # need better heuristics here.  For now, clean any segment with some live
    # data that is under 60% utilized or holds less than 32 KB of live data.
    for (s, u) in sorted(inode_map.util.segments.items()):
        if (float(u[1]) / u[0] < 0.6 or u[1] < 32768) and u[1] > 0:
            print "Should clean segment", s
            loc = backend.name_to_loc(s)
            if loc is not None: inode_map.obsolete_segments.add(loc)

    # TODO: We probably also want heuristics that will find inodes with
    # badly-fragmented data and rewrite that to achieve better locality.

    # Given that list of segments to clean, scan through those segments to
    # find data which is still live and mark relevant inodes as needing to be
    # rewritten.
    if repack_inodes:
        dirty_inodes = set(inode_map.inodes)
    else:
        dirty_inodes = set()
    dirty_inode_data = set()
    for s in inode_map.obsolete_segments:
        filename = backend.loc_to_name(s)
        print "Scanning", filename, "for live data"
        for item in parse_log(backend.read(filename), filename):
            if item.type in (ITEM_TYPE.DATA, ITEM_TYPE.INODE):
                if item.inum != 0:
                    inode = inode_map.inodes[item.inum]
                    if s == inode.location[0:2]:
                        dirty_inodes.add(item.inum)
                    if item.inum not in dirty_inode_data:
                        # Does this inode still have data in the segment
                        # being cleaned?
                        for b in inode.links:
                            if s == b[1][0:2]:
                                dirty_inode_data.add(item.inum)
                                break

    print "Inodes to rewrite:", dirty_inodes
    print "Inodes with data to rewrite:", dirty_inode_data
    for i in sorted(dirty_inodes.union(dirty_inode_data)):
        rewrite_inode(backend, inode_map, i, log, i in dirty_inode_data)

if __name__ == '__main__':
    # "mvrable-bluesky" is the author's own test bucket.
    backend = S3Backend("mvrable-bluesky", cachedir=".")
    chkpt = load_checkpoint_record(backend)
    imap = InodeMap()
    imap.build(backend, chkpt)

    log_dir = LogDirectory(backend, 0)
    run_cleaner(backend, imap, log_dir)
    imap.write(backend, log_dir)
    log_dir.close_all()
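
    # For offline testing against a local copy of the log segments, the
    # S3Backend above could be swapped for the FileBackend, e.g.:
    #     backend = FileBackend(".")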