# A simple cleaner for the BlueSky cloud file system.  At the moment this is an
# offline cleaner--the file system must not be in use at the time that the
# cleaning is performed.  Later, it will be extended to be an online/concurrent
# cleaner, where cleaning is performed even while the file system is mounted.
#
# Copyright (C) 2010  The Regents of the University of California
# Written by Michael Vrable <mvrable@cs.ucsd.edu>

import base64, os, re, struct, sys, time
import boto
from boto.s3.key import Key

# The BlueSky 'struct cloudlog_header' data type.
HEADER_FORMAT = '<4s48sb16sQIII'
HEADER_CRYPTBYTES = 48
HEADER_MAGIC1 = 'AgI-'          # Unencrypted data
HEADER_MAGIC2 = 'AgI='          # Encrypted data
HEADER_SIZE = struct.calcsize(HEADER_FORMAT)
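# Field layout implied by HEADER_FORMAT, in order:
#   magic (4s), cryptkeys (48s), type (b), id (16s), inum (Q),
#   data_size (I), link_ids_size (I), link_locs_size (I)
# so header[5:8] in the parsing code below gives the sizes of the three
# variable-length regions (data, link ids, link locations) that follow the
# fixed-size header.
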
CHECKPOINT_MAGIC = struct.pack('<Q', 0x7ad7dafb42a498b4)

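# One-byte type codes for log items, referenced throughout this module.  The
# values are assumed to match those used by the BlueSky on-disk format.
class ITEM_TYPE:
    DATA = '1'
    INODE = '2'
    INODE_MAP = '3'
    CHECKPOINT = '4'
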
31 """An interface to BlueSky where the log segments are on local disk.
33 This is mainly intended for testing purposes, as the real cleaner would
34 operate where data is being stored in S3."""
36 def __init__(self, path):
    def list(self, directory=0):
        """Return a listing of all log segments and their sizes."""

        prefix = "log-%08d-" % (directory,)
        files = [f for f in os.listdir(self.path) if f.startswith(prefix)]
        files.sort()
        return [(f, os.stat(os.path.join(self.path, f)).st_size)
                for f in files]

    def read(self, filename, offset=0, length=None):
        fp = open(os.path.join(self.path, filename), 'rb')
        if offset > 0:
            fp.seek(offset)
        if length is None:
            return fp.read()
        else:
            return fp.read(length)

    def write(self, filename, data):
        fp = open(os.path.join(self.path, filename), 'wb')
        fp.write(data)
        fp.close()

    def delete(self, filename):
        os.unlink(os.path.join(self.path, filename))

    def loc_to_name(self, location):
        return "log-%08d-%08d" % (location)

    def name_to_loc(self, name):
        m = re.match(r"^log-(\d+)-(\d+)$", name)
        if m: return (int(m.group(1)), int(m.group(2)))

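# Example usage (the path is illustrative):
#   backend = FileBackend("/var/tmp/bluesky")
#   for (name, size) in backend.list(0):
#       print name, size
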
def retry_wrap(method):
    """Decorator that retries a failed S3 operation a few times before giving up."""
    def wrapped(self, *args, **kwargs):
        for retries in range(3):
            try:
                return method(self, *args, **kwargs)
            except:
                print >>sys.stderr, "S3 operation failed, retrying..."
                self.connect()
                time.sleep(1.0)
        return method(self, *args, **kwargs)
    return wrapped

86 """An interface to BlueSky where the log segments are on in Amazon S3."""
88 def __init__(self, bucket, path='', cachedir="."):
89 self.bucket_name = bucket
91 self.cachedir = cachedir
96 self.conn = boto.connect_s3(is_secure=False)
97 self.bucket = self.conn.get_bucket(self.bucket_name)
    @retry_wrap
    def list(self, directory=0):
        files = []
        prefix = "log-%08d-" % (directory,)
        for k in self.bucket.list(self.path + prefix):
            files.append((k.key, k.size))
        return files

    @retry_wrap
    def read(self, filename, offset=0, length=None):
        if filename in self.cache:
            fp = open(os.path.join(self.cachedir, filename), 'rb')
            if offset > 0:
                fp.seek(offset)
            if length is None:
                return fp.read()
            else:
                return fp.read(length)
        else:
            k = Key(self.bucket)
            k.key = self.path + filename
            data = k.get_contents_as_string()
            fp = open(os.path.join(self.cachedir, filename), 'wb')
            fp.write(data)
            fp.close()
            self.cache[filename] = True
            if offset > 0:
                data = data[offset:]
            if length is not None:
                data = data[0:length]
            return data

    @retry_wrap
    def write(self, filename, data):
        k = Key(self.bucket)
        k.key = self.path + filename
        k.set_contents_from_string(data)
        if filename in self.cache:
            del self.cache[filename]

    @retry_wrap
    def delete(self, filename):
        k = Key(self.bucket)
        k.key = self.path + filename
        k.delete()
        if filename in self.cache:
            del self.cache[filename]

    def loc_to_name(self, location):
        return "log-%08d-%08d" % (location)

    def name_to_loc(self, name):
        m = re.match(r"^log-(\d+)-(\d+)$", name)
        if m: return (int(m.group(1)), int(m.group(2)))

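# Example usage (bucket name and cache directory are illustrative):
#   backend = S3Backend("my-bluesky-bucket", cachedir="/tmp/bluesky-cache")
#   data = backend.read("log-00000000-00000000")
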
154 """In-memory representation of a single item stored in a log file."""
157 self.cryptkeys = '\0' * HEADER_CRYPTBYTES
158 self.encrypted = False
161 return "<Item%s ty=%s location=%s size=%d id=%s...>" % (self.encrypted and '$' or '', self.type, self.location, self.size, base64.b16encode(self.id).lower()[0:8])
165 return open('/dev/urandom').read(16)
    def serialize(self):
        link_ids = []
        link_locs = []
        for (i, l) in self.links:
            link_ids.append(i)
            if i != '\0' * 16:
                link_locs.append(struct.pack('<IIII', *l))
        link_ids = ''.join(link_ids)
        link_locs = ''.join(link_locs)
        if self.encrypted:
            magic = HEADER_MAGIC2
        else:
            magic = HEADER_MAGIC1
        header = struct.pack(HEADER_FORMAT,
                             magic, self.cryptkeys,
                             ord(self.type), self.id, self.inum,
                             len(self.data), len(link_ids), len(link_locs))
        return header + self.data + link_ids + link_locs

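# Example (values are made up for illustration): build a small unencrypted
# data item and check that parse_item() can read back its serialization.
#
#   item = LogItem()
#   item.id = LogItem.random_id()
#   item.inum = 1
#   item.type = ITEM_TYPE.DATA
#   item.data = "hello"
#   item.links = []
#   assert parse_item(item.serialize()).data == "hello"
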
class LogSegment:
    """A single log segment being written out, buffered in memory."""

    def __init__(self, backend, location):
        self.backend = backend
        self.location = location
        self.data = []

    def __len__(self):
        return sum(len(s) for s in self.data)

    def write(self, item):
        data = item.serialize()
        offset = len(self)
        self.data.append(data)
        item.location = self.location + (offset, len(data))

    def close(self):
        data = ''.join(self.data)
        filename = self.backend.loc_to_name(self.location)
        print "Writing %d bytes of data to %s" % (len(data), filename)
        self.backend.write(filename, data)

class LogDirectory:
    TARGET_SIZE = 4 << 20

    def __init__(self, backend, dir):
        self.backend = backend
        self.dir_num = dir
        self.seq_num = 0
        for logname in backend.list(dir):
            print "Old log file:", logname
            loc = backend.name_to_loc(logname[0])
            if loc is not None and loc[0] == dir:
                self.seq_num = max(self.seq_num, loc[1] + 1)
        self.groups = {}
        print "Starting sequence number is", self.seq_num

    def open_segment(self):
        seg = LogSegment(self.backend, (self.dir_num, self.seq_num))
        self.seq_num += 1
        return seg

    def write(self, item, segment_group=0):
        if segment_group not in self.groups:
            self.groups[segment_group] = self.open_segment()
        seg = self.groups[segment_group]
        seg.write(item)
        if len(seg) >= LogDirectory.TARGET_SIZE:
            seg.close()
            del self.groups[segment_group]

    def close_all(self):
        for k in list(self.groups.keys()):
            self.groups[k].close()
            del self.groups[k]

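# Note on segment groups: callers use the segment_group argument to keep
# different classes of data in separate segments.  In this module, group 0
# holds file data blocks, group 1 holds inodes, and group 2 holds inode map
# blocks and checkpoints (see rewrite_inode and InodeMap.write below).
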
class UtilizationTracker:
    """A simple object that tracks what fraction of each segment is used.

    This data can be used to guide segment cleaning decisions."""

    def __init__(self, backend):
        # Map from segment name to [total_bytes, live_bytes].
        self.segments = {}
        for (segment, size) in backend.list(0) + backend.list(1):
            self.segments[segment] = [size, 0]

    def add_item(self, item):
        if isinstance(item, LogItem):
            item = item.location
        if item is None: return
        (dir, seq, offset, size) = item
        filename = "log-%08d-%08d" % (dir, seq)
        self.segments[filename][1] += size

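# Example: if segment "log-00000000-00000003" is 4 MiB on disk and add_item
# has credited 1 MiB of live items to it, segments[...] is [4194304, 1048576]
# and its utilization is 1048576.0 / 4194304 = 0.25.
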
def parse_item(data):
    if len(data) < HEADER_SIZE: return
    header = struct.unpack_from(HEADER_FORMAT, data, 0)
    size = HEADER_SIZE + sum(header[5:8])

    if header[0] not in (HEADER_MAGIC1, HEADER_MAGIC2):
        print "Bad header magic!"
        return

    if len(data) != size:
        print "Item size does not match: %d != %d" % (size, len(data))
        return

    item = LogItem()
    if header[0] == HEADER_MAGIC2: item.encrypted = True
    item.cryptkeys = header[1]
    item.id = header[3]
    item.inum = header[4]
    item.location = None
    item.type = chr(header[2])
    item.size = size
    links = []
    item.data = data[HEADER_SIZE : HEADER_SIZE + header[5]]
    link_ids = data[HEADER_SIZE + header[5]
                    : HEADER_SIZE + header[5] + header[6]]
    link_locs = data[HEADER_SIZE + header[5] + header[6]
                     : HEADER_SIZE + sum(header[5:8])]
    for i in range(len(link_ids) // 16):
        id = link_ids[16*i : 16*i + 16]
        if id == '\0' * 16:
            # A null id is a link with no location attached.
            links.append((id, None))
        else:
            loc = struct.unpack('<IIII', link_locs[0:16])
            link_locs = link_locs[16:]
            links.append((id, loc))
    item.links = links
    return item

def load_item(backend, location):
    """Load the cloud item pointed at by the 4-tuple 'location'.

    The elements of the tuple are (directory, sequence, offset, size)."""

    filename = backend.loc_to_name((location[0], location[1]))
    data = backend.read(filename, location[2], location[3])
    item = parse_item(data)
    item.location = location
    return item

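# Example (hypothetical location): load_item(backend, (0, 3, 1024, 512))
# reads the 512-byte item at offset 1024 of segment "log-00000000-00000003".
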
def parse_log(data, location=None):
    """Parse contents of a log file, yielding a sequence of log items."""

    if isinstance(location, str):
        m = re.match(r"^log-(\d+)-(\d+)$", location)
        if m:
            location = (int(m.group(1)), int(m.group(2)))
        else:
            location = None

    offset = 0
    while len(data) - offset >= HEADER_SIZE:
        header = struct.unpack_from(HEADER_FORMAT, data, offset)
        size = HEADER_SIZE + sum(header[5:8])
        if header[0] not in (HEADER_MAGIC1, HEADER_MAGIC2):
            print "Bad header magic!"
            break
        if size + offset > len(data):
            print "Short data record at end of log: %s < %s" % (len(data) - offset, size)
            break
        item = parse_item(data[offset : offset + size])
        if item is not None:
            if location is not None:
                item.location = location + (offset, size)
            yield item
        offset += size

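# Example: iterate over every item in one segment file:
#   name = "log-00000000-00000001"
#   for item in parse_log(backend.read(name), name):
#       print item
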
def load_checkpoint_record(backend, directory=0):
    """Return the most recent checkpoint record in the given log directory."""
    for (log, size) in reversed(backend.list(directory)):
        for item in reversed(list(parse_log(backend.read(log), log))):
            if item.type == ITEM_TYPE.CHECKPOINT:
                return item

class InodeMap:
    def __init__(self):
        pass

    def build(self, backend, checkpoint_record):
        """Reconstruct the inode map from the checkpoint record given.

        This will also build up information about segment utilization."""

        self.inodes = {}
        self.version_vector = {}
        self.checkpoint_record = checkpoint_record

        util = UtilizationTracker(backend)
        util.add_item(checkpoint_record)

        self.obsolete_segments = set()

        data = checkpoint_record.data
        if not data.startswith(CHECKPOINT_MAGIC):
            raise ValueError, "Invalid checkpoint record!"
        data = data[len(CHECKPOINT_MAGIC):]
        (vvlen,) = struct.unpack_from("<I", data, 0)
        self.vvsize = 4 + 8*vvlen
        for i in range(vvlen):
            (v1, v2) = struct.unpack_from("<II", data, 4 + 8*i)
            self.version_vector[v1] = v2
        print self.version_vector
        self.version_vector[checkpoint_record.location[0]] \
            = checkpoint_record.location[1]
        print self.version_vector

        data = data[self.vvsize:]

        for i in range(len(data) // 16):
            (start, end) = struct.unpack_from("<QQ", data, 16*i)
            imap = load_item(backend, checkpoint_record.links[i][1])
            util.add_item(imap)
            print "[%d, %d]: %s" % (start, end, imap)
            for j in range(len(imap.data) // 8):
                (inum,) = struct.unpack_from("<Q", imap.data, 8*j)
                inode = load_item(backend, imap.links[j][1])
                self.inodes[inum] = inode
                data_segments = set()
                util.add_item(inode)
                for link in inode.links:
                    util.add_item(link[1])
                    data_segments.add(link[1][0:2])
                print " %d: %s (%d data segments)" % (inum, inode, len(data_segments))

        print "Segment utilizations:"
        for (s, u) in sorted(util.segments.items()):
            print "%s: %s %s" % (s, u, float(u[1]) / u[0])
            # Deletion of fully-dead segments is disabled for now:
            # print "Deleting..."

        self.util = util

        self.updated_inodes = set()

    def mark_updated(self, inum):
        self.updated_inodes.add(inum)

    def write(self, backend, log):
        updated_inodes = sorted(self.updated_inodes, reverse=True)

        new_checkpoint = LogItem()
        new_checkpoint.id = LogItem.random_id()
        new_checkpoint.inum = 0
        new_checkpoint.type = ITEM_TYPE.CHECKPOINT
        new_checkpoint.data = CHECKPOINT_MAGIC
        new_checkpoint.links = []

        new_checkpoint.data += struct.pack('<I', len(self.version_vector))
        for d in sorted(self.version_vector):
            new_checkpoint.data += struct.pack('<II', d, self.version_vector[d])

        data = self.checkpoint_record.data[len(CHECKPOINT_MAGIC) + self.vvsize:]
        for i in range(len(data) // 16):
            (start, end) = struct.unpack_from("<QQ", data, 16*i)

            new_checkpoint.data += data[16*i : 16*i + 16]

            # Case 1: No inodes in this range of the old inode map have
            # changed.  Simply emit a new pointer to the same inode map block.
            if len(updated_inodes) == 0 or updated_inodes[-1] > end:
                old_location = self.checkpoint_record.links[i][1][0:2]
                if old_location not in self.obsolete_segments:
                    new_checkpoint.links.append(self.checkpoint_record.links[i])
                    continue

            # Case 2: Some inodes have been updated.  Create a new inode map
            # block, write it out, and point the new checkpoint at it.
            inodes = [k for k in self.inodes if k >= start and k <= end]
            inodes.sort()

            block = LogItem()
            block.id = LogItem.random_id()
            block.inum = 0
            block.type = ITEM_TYPE.INODE_MAP
            block.links = []
            block.data = ""
            for j in inodes:
                block.data += struct.pack("<Q", j)
                block.links.append((self.inodes[j].id, self.inodes[j].location))
            log.write(block, 2)

            new_checkpoint.links.append((block.id, block.location))

            while len(updated_inodes) > 0 and updated_inodes[-1] <= end:
                updated_inodes.pop()

        log.write(new_checkpoint, 2)
        self.checkpoint_record = new_checkpoint

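# Checkpoint record layout, as parsed by InodeMap.build and rebuilt by
# InodeMap.write above:
#   8 bytes         CHECKPOINT_MAGIC
#   4 bytes         count of version-vector entries ('<I')
#   8 bytes each    (directory, sequence) version-vector pairs ('<II')
#   16 bytes each   (start, end) inode-number ranges ('<QQ'), where the
#                   checkpoint's i-th link points at the inode map block
#                   covering the i-th range.
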
def rewrite_inode(backend, inode_map, inum, log, copy_data=True):
    inode = inode_map.inodes[inum]
    if copy_data:
        blocks = []
        for l in inode.links:
            data = load_item(backend, l[1])
            blocks.append(data)
            log.write(data, 0)
        inode.links = [(b.id, b.location) for b in blocks]
    log.write(inode, 1)
    inode_map.mark_updated(inum)

def run_cleaner(backend, inode_map, log, repack_inodes=False):
    # Determine which segments are poorly utilized and should be cleaned.  We
    # need better heuristics here.
    for (s, u) in sorted(inode_map.util.segments.items()):
        if (float(u[1]) / u[0] < 0.6 or u[1] < 32768) and u[1] > 0:
            print "Should clean segment", s
            loc = backend.name_to_loc(s)
            if loc is not None: inode_map.obsolete_segments.add(loc)

    # TODO: We probably also want heuristics that will find inodes with
    # badly-fragmented data and rewrite that to achieve better locality.

    # Given that list of segments to clean, scan through those segments to find
    # data which is still live and mark relevant inodes as needing to be
    # rewritten.
    if repack_inodes:
        dirty_inodes = set(inode_map.inodes)
    else:
        dirty_inodes = set()
    dirty_inode_data = set()
    for s in inode_map.obsolete_segments:
        filename = backend.loc_to_name(s)
        print "Scanning", filename, "for live data"
        for item in parse_log(backend.read(filename), filename):
            if item.type in (ITEM_TYPE.DATA, ITEM_TYPE.INODE):
                if item.inum != 0:
                    inode = inode_map.inodes[item.inum]
                    if s == inode.location[0:2]:
                        dirty_inodes.add(item.inum)
                    if item.inum not in dirty_inode_data:
                        # Does the inode point at data in this segment?
                        for b in inode.links:
                            if s == b[1][0:2]:
                                dirty_inode_data.add(item.inum)
                                break

    print "Inodes to rewrite:", dirty_inodes
    print "Inodes with data to rewrite:", dirty_inode_data
    for i in sorted(dirty_inodes.union(dirty_inode_data)):
        rewrite_inode(backend, inode_map, i, log, i in dirty_inode_data)

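# Cleaning-threshold example: with the heuristics in run_cleaner, a segment
# whose [total, live] byte counts are [4194304, 1048576] is 25% utilized
# (below the 0.6 cutoff) and is selected for cleaning, as is any segment
# with fewer than 32768 bytes of live data.
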
if __name__ == '__main__':
    # backend = S3Backend("mvrable-bluesky", cachedir=".")
    backend = FileBackend(".")
    chkpt = load_checkpoint_record(backend)
    imap = InodeMap()
    imap.build(backend, chkpt)

    log_dir = LogDirectory(backend, 1)
    run_cleaner(backend, imap, log_dir)
    print "Version vector:", imap.version_vector
    imap.write(backend, log_dir)
    log_dir.close_all()