# A simple cleaner for the BlueSky cloud file system.  At the moment this is an
# offline cleaner--the file system must not be in use at the time that the
# cleaning is performed.  Later, it will be extended to be an online/concurrent
# cleaner, where cleaning is performed even while the file system is mounted.
#
# Copyright (C) 2010  The Regents of the University of California
# Written by Michael Vrable <mvrable@cs.ucsd.edu>

import base64, os, re, socket, struct, sys, time
import boto
from boto.s3.key import Key

# The BlueSky 'struct cloudlog_header' data type.
HEADER_FORMAT = '<4s48sb16sQIII'
HEADER_CRYPTBYTES = 48

HEADER_MAGIC1 = 'AgI-'     # Unencrypted data
HEADER_MAGIC2 = 'AgI='     # Encrypted data
HEADER_SIZE = struct.calcsize(HEADER_FORMAT)

CHECKPOINT_MAGIC = struct.pack('<Q', 0x7ad7dafb42a498b4)
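
# For reference, a cloudlog header unpacked with HEADER_FORMAT yields the
# following fields (meanings inferred from parse_item() below):
#   header[0]  magic      -- HEADER_MAGIC1 or HEADER_MAGIC2
#   header[1]  cryptkeys  -- HEADER_CRYPTBYTES bytes of key material
#   header[2]  type       -- single type byte (see ITEM_TYPE)
#   header[3]  id         -- 16-byte object identifier
#   header[4]  inum       -- inode number (0 for non-inode items)
#   header[5:8]           -- byte counts of data, link ids, link locations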
31 """Base class for BlueSky storage backends."""

    def loc_to_name(self, location):
        return "log-%08d-%08d" % (location)

    def name_to_loc(self, name):
        m = re.match(r"^log-(\d+)-(\d+)$", name)
        if m: return (int(m.group(1)), int(m.group(2)))
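
# Example of the mapping above: loc_to_name((0, 152)) produces
# "log-00000000-00000152", and name_to_loc() inverts it, returning None for
# names that do not match the pattern.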

class FileBackend(Backend):
    """An interface to BlueSky where the log segments are on local disk.

    This is mainly intended for testing purposes, as the real cleaner would
    operate where data is being stored in S3."""

    def __init__(self, path):
        self.path = path

    def list(self, directory=0):
        """Return a listing of all log segments and their sizes."""

        prefix = "log-%08d-" % (directory,)
        files = [f for f in os.listdir(self.path) if f.startswith(prefix)]
        files.sort()

        return [(f, os.stat(os.path.join(self.path, f)).st_size)
                for f in files]

    def read(self, filename, offset=0, length=None):
        fp = open(os.path.join(self.path, filename), 'rb')
        if offset > 0:
            fp.seek(offset)
        if length is None:
            return fp.read()
        else:
            return fp.read(length)

    def write(self, filename, data):
        fp = open(os.path.join(self.path, filename), 'wb')
        fp.write(data)
        fp.close()

    def delete(self, filename):
        os.unlink(os.path.join(self.path, filename))

def retry_wrap(method):
    def wrapped(self, *args, **kwargs):
        for retries in range(3):
            try:
                return method(self, *args, **kwargs)
            except:
                print >>sys.stderr, "S3 operation failed, retrying..."
                # Re-establish the connection and back off briefly before
                # the next attempt.
                self.connect()
                time.sleep(1.0)
        return method(self, *args, **kwargs)
    return wrapped
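
# retry_wrap is applied as a decorator to the S3Backend operations below, so
# a transient S3 failure is retried a few times before being allowed to
# propagate.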

class S3Backend(Backend):
    """An interface to BlueSky where the log segments are stored in Amazon S3."""

    def __init__(self, bucket, path='', cachedir="."):
        self.bucket_name = bucket
        self.path = path
        self.cachedir = cachedir
        self.cache = {}
        for f in os.listdir(cachedir):
            self.cache[f] = True
        print "Initial cache contents:", list(self.cache.keys())
        self.connect()

        self.stats_get = [0, 0]
        self.stats_put = [0, 0]

    def connect(self):
        self.conn = boto.connect_s3(is_secure=False)
        self.bucket = self.conn.get_bucket(self.bucket_name)

    def list(self, directory=0):
        files = []
        prefix = "log-%08d-" % (directory,)
        for k in self.bucket.list(self.path + prefix):
            files.append((k.key, k.size))
        return files

    @retry_wrap
    def read(self, filename, offset=0, length=None):
        if filename in self.cache:
            fp = open(os.path.join(self.cachedir, filename), 'rb')
            if offset > 0:
                fp.seek(offset)
            if length is None:
                return fp.read()
            else:
                return fp.read(length)
        else:
            k = Key(self.bucket)
            k.key = self.path + filename
            data = k.get_contents_as_string()
            # Cache the full segment locally before applying offset/length.
            fp = open(os.path.join(self.cachedir, filename), 'wb')
            fp.write(data)
            fp.close()
            self.cache[filename] = True
            self.stats_get[0] += 1
            self.stats_get[1] += len(data)
            if offset > 0:
                data = data[offset:]
            if length is not None:
                data = data[0:length]
            return data

    @retry_wrap
    def write(self, filename, data):
        k = Key(self.bucket)
        k.key = self.path + filename
        k.set_contents_from_string(data)
        self.stats_put[0] += 1
        self.stats_put[1] += len(data)
        if filename in self.cache:
            del self.cache[filename]

    @retry_wrap
    def delete(self, filename):
        k = Key(self.bucket)
        k.key = self.path + filename
        k.delete()
        if filename in self.cache:
            del self.cache[filename]

    def dump_stats(self):
        print "S3 statistics:"
        print "GET: %d ops / %d bytes" % tuple(self.stats_get)
        print "PUT: %d ops / %d bytes" % tuple(self.stats_put)

class SimpleBackend(Backend):
    """An interface to the simple BlueSky test network server."""

    def __init__(self, server=('localhost', 12345), cachedir="."):
        self.server_address = server
        self.cachedir = cachedir
        self.cache = {}

    def _get_socket(self):
        return socket.create_connection(self.server_address).makefile()

    def list(self, directory=0):
        # The original code reused the S3 listing logic here, which cannot
        # work: this backend has no S3 bucket, and the simple test server
        # offers no listing operation that this cleaner knows about.
        raise NotImplementedError("SimpleBackend does not support list()")

    def read(self, filename, offset=0, length=None):
        if filename in self.cache:
            fp = open(os.path.join(self.cachedir, filename), 'rb')
            if offset > 0:
                fp.seek(offset)
            if length is None:
                return fp.read()
            else:
                return fp.read(length)
        else:
            f = self._get_socket()
            f.write("GET %s %d %d\n" % (filename, 0, 0))
            f.flush()
            datalen = int(f.readline())
            data = f.read(datalen)
            # Cache the full object locally before applying offset/length.
            fp = open(os.path.join(self.cachedir, filename), 'wb')
            fp.write(data)
            fp.close()
            self.cache[filename] = True
            if offset > 0:
                data = data[offset:]
            if length is not None:
                data = data[0:length]
            return data

    def write(self, filename, data):
        f = self._get_socket()
        # The original format string had three fields but only two arguments;
        # PUT sends the filename and the payload size.
        f.write("PUT %s %d\n" % (filename, len(data)))
        f.write(data)
        f.flush()
        result = int(f.readline())
        if filename in self.cache:
            del self.cache[filename]

    def delete(self, filename):
        # Deletion is not implemented for the simple test server.
        pass
225 """In-memory representation of a single item stored in a log file."""
228 self.cryptkeys = '\0' * HEADER_CRYPTBYTES
229 self.encrypted = False
232 return "<Item%s ty=%s location=%s size=%d id=%s...>" % (self.encrypted and '$' or '', self.type, self.location, self.size, base64.b16encode(self.id).lower()[0:8])
236 return open('/dev/urandom').read(16)

    def serialize(self):
        link_ids = []
        link_locs = []
        for (i, l) in self.links:
            link_ids.append(i)
            if l is not None:
                link_locs.append(struct.pack('<IIII', *l))
        link_ids = ''.join(link_ids)
        link_locs = ''.join(link_locs)
        if self.encrypted:
            magic = HEADER_MAGIC2
        else:
            magic = HEADER_MAGIC1
        header = struct.pack(HEADER_FORMAT,
                             magic, self.cryptkeys,
                             ord(self.type), self.id, self.inum,
                             len(self.data), len(link_ids), len(link_locs))
        return header + self.data + link_ids + link_locs
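
    # Serialized layout: the packed header, then the item data, then the
    # concatenated 16-byte link ids, then one '<IIII' location per non-null
    # link (a link whose id is all zero bytes carries no location; see
    # parse_item() below).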

class LogSegment:
    def __init__(self, backend, location):
        self.backend = backend
        self.location = location
        self.data = []

    def __len__(self):
        return sum(len(s) for s in self.data)

    def write(self, item):
        data = item.serialize()
        offset = len(self)
        self.data.append(data)
        item.location = self.location + (offset, len(data))

    def close(self):
        data = ''.join(self.data)
        filename = self.backend.loc_to_name(self.location)
        print "Would write %d bytes of data to %s" % (len(data), filename)
        self.backend.write(filename, data)

class LogDirectory:
    TARGET_SIZE = 4 << 20     # 4 MiB segments

    def __init__(self, backend, dir):
        self.backend = backend
        self.dir_num = dir
        self.seq_num = 0
        self.groups = {}
        for logname in backend.list(dir):
            print "Old log file:", logname
            loc = backend.name_to_loc(logname[0])
            if loc is not None and loc[0] == dir:
                self.seq_num = max(self.seq_num, loc[1] + 1)
        print "Starting sequence number is", self.seq_num

    def open_segment(self):
        seg = LogSegment(self.backend, (self.dir_num, self.seq_num))
        self.seq_num += 1
        return seg

    def write(self, item, segment_group=0):
        if segment_group not in self.groups:
            self.groups[segment_group] = self.open_segment()
        seg = self.groups[segment_group]
        seg.write(item)
        if len(seg) >= LogDirectory.TARGET_SIZE:
            seg.close()
            del self.groups[segment_group]

    def close_all(self):
        for k in list(self.groups.keys()):
            self.groups[k].close()
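
# Note on segment groups: the callers below pass group 0 for data blocks,
# group 1 for inodes, and group 2 for inode maps and checkpoints, so each
# class of object is packed into its own output segments.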

class UtilizationTracker:
    """A simple object that tracks what fraction of each segment is used.

    This data can be used to guide segment cleaning decisions."""

    def __init__(self, backend):
        self.segments = {}
        for (segment, size) in backend.list(0) + backend.list(1):
            self.segments[segment] = [size, 0]

    def add_item(self, item):
        if isinstance(item, LogItem):
            item = item.location
        if item is None: return
        (dir, seq, offset, size) = item
        filename = "log-%08d-%08d" % (dir, seq)
        self.segments[filename][1] += size

def parse_item(data):
    if len(data) < HEADER_SIZE: return
    header = struct.unpack_from(HEADER_FORMAT, data, 0)
    size = HEADER_SIZE + sum(header[5:8])

    if header[0] not in (HEADER_MAGIC1, HEADER_MAGIC2):
        print "Bad header magic!"
        return

    if len(data) != size:
        print "Item size does not match: %d != %d" % (size, len(data))
        return

    item = LogItem()
    if header[0] == HEADER_MAGIC2: item.encrypted = True
    item.cryptkeys = header[1]
    item.id = header[3]
    item.inum = header[4]
    item.size = size
    item.type = chr(header[2])

    item.data = data[HEADER_SIZE : HEADER_SIZE + header[5]]
    links = []
    link_ids = data[HEADER_SIZE + header[5]
                    : HEADER_SIZE + header[5] + header[6]]
    link_locs = data[HEADER_SIZE + header[5] + header[6]
                     : HEADER_SIZE + sum(header[5:8])]
    for i in range(len(link_ids) // 16):
        id = link_ids[16*i : 16*i + 16]
        if id == '\0' * 16:
            loc = None
        else:
            loc = struct.unpack('<IIII', link_locs[0:16])
            link_locs = link_locs[16:]
        links.append((id, loc))
    item.links = links
    return item
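
# A minimal round-trip sketch (illustrative only; this helper is not part of
# the original cleaner): for an unencrypted item with no links, parse_item()
# should exactly invert LogItem.serialize().
def _roundtrip_example():
    item = LogItem()
    item.id = LogItem.random_id()
    item.inum = 0
    item.type = ITEM_TYPE.DATA
    item.data = "hello"
    item.links = []
    parsed = parse_item(item.serialize())
    assert parsed.data == "hello"
    assert parsed.type == ITEM_TYPE.DATA
    assert parsed.links == []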

def load_item(backend, location):
    """Load the cloud item pointed at by the 4-tuple 'location'.

    The elements of the tuple are (directory, sequence, offset, size)."""

    filename = backend.loc_to_name((location[0], location[1]))
    data = backend.read(filename, location[2], location[3])
    item = parse_item(data)
    item.location = location
    return item

def parse_log(data, location=None):
    """Parse contents of a log file, yielding a sequence of log items."""

    if isinstance(location, str):
        m = re.match(r"^log-(\d+)-(\d+)$", location)
        if m:
            location = (int(m.group(1)), int(m.group(2)))
        else:
            location = None

    offset = 0
    while len(data) - offset >= HEADER_SIZE:
        header = struct.unpack_from(HEADER_FORMAT, data, offset)
        size = HEADER_SIZE + sum(header[5:8])
        if header[0] not in (HEADER_MAGIC1, HEADER_MAGIC2):
            print "Bad header magic!"
            break
        if size + offset > len(data):
            print "Short data record at end of log: %s < %s" \
                % (len(data) - offset, size)
            break
        item = parse_item(data[offset : offset + size])
        if item is not None:
            if location is not None:
                item.location = location + (offset, size)
            yield item
        offset += size

def load_checkpoint_record(backend, directory=0):
    # Scan segments newest-first, and the items within each segment
    # last-first, so the most recent checkpoint is returned.
    for (log, size) in reversed(backend.list(directory)):
        for item in reversed(list(parse_log(backend.read(log), log))):
            if item.type == ITEM_TYPE.CHECKPOINT:
                return item

class InodeMap:
    def __init__(self):
        pass

    def build(self, backend, checkpoint_record):
        """Reconstruct the inode map from the checkpoint record given.

        This will also build up information about segment utilization."""

        self.inodes = {}
        self.version_vector = {}
        self.checkpoint_record = checkpoint_record

        util = UtilizationTracker(backend)
        util.add_item(checkpoint_record)
        self.util = util

        self.obsolete_segments = set()
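
        # Checkpoint record layout, as parsed below: the 8-byte
        # CHECKPOINT_MAGIC, a '<I' count of version-vector entries, that many
        # '<II' (directory, sequence) pairs, and finally one '<QQ'
        # (start, end) inode-number range per link to an inode-map block.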

        data = checkpoint_record.data
        if not data.startswith(CHECKPOINT_MAGIC):
            raise ValueError("Invalid checkpoint record!")
        data = data[len(CHECKPOINT_MAGIC):]
        (vvlen,) = struct.unpack_from("<I", data, 0)
        self.vvsize = 4 + 8*vvlen
        for i in range(vvlen):
            (v1, v2) = struct.unpack_from("<II", data, 4 + 8*i)
            self.version_vector[v1] = v2
        print self.version_vector
        self.version_vector[checkpoint_record.location[0]] \
            = checkpoint_record.location[1]
        print self.version_vector

        data = data[self.vvsize:]

        for i in range(len(data) // 16):
            (start, end) = struct.unpack_from("<QQ", data, 16*i)
            imap = load_item(backend, checkpoint_record.links[i][1])
            util.add_item(imap)
            print "[%d, %d]: %s" % (start, end, imap)
            for j in range(len(imap.data) // 8):
                (inum,) = struct.unpack_from("<Q", imap.data, 8*j)
                inode = load_item(backend, imap.links[j][1])
                util.add_item(inode)
                data_segments = set()
                for l in inode.links:
                    util.add_item(l[1])
                    if l[1] is not None:
                        data_segments.add(l[1][0:2])
                print "  %d: %s (%d data segments)" % (inum, inode, len(data_segments))
                self.inodes[inum] = inode
464 print "Segment utilizations:"
467 for (s, u) in sorted(util.segments.items()):
468 for i in range(2): total_data[i] += u[i]
469 print "%s: %s %s" % (s, u, float(u[1]) / u[0])
471 print "Would delete..."
478 self.updated_inodes = set()
480 print "%d bytes total / %d bytes used" % tuple(total_data)
481 print "would delete %d segments (%d bytes)" % tuple(deletions)

    def mark_updated(self, inum):
        self.updated_inodes.add(inum)

    def write(self, backend, log):
        updated_inodes = sorted(self.updated_inodes, reverse=True)

        new_checkpoint = LogItem()
        new_checkpoint.id = LogItem.random_id()
        new_checkpoint.inum = 0
        new_checkpoint.type = ITEM_TYPE.CHECKPOINT
        new_checkpoint.data = CHECKPOINT_MAGIC
        new_checkpoint.links = []

        new_checkpoint.data += struct.pack('<I', len(self.version_vector))
        for d in sorted(self.version_vector):
            new_checkpoint.data += struct.pack('<II', d, self.version_vector[d])

        data = self.checkpoint_record.data[self.vvsize + len(CHECKPOINT_MAGIC):]
        for i in range(len(data) // 16):
            (start, end) = struct.unpack_from("<QQ", data, 16*i)
            new_checkpoint.data += data[16*i : 16*i + 16]

            # Case 1: No inodes in this range of the old inode map have
            # changed.  Simply emit a new pointer to the same inode map block,
            # unless that block lives in a segment slated for deletion.
            if len(updated_inodes) == 0 or updated_inodes[-1] > end:
                old_location = self.checkpoint_record.links[i][1][0:2]
                if old_location not in self.obsolete_segments:
                    new_checkpoint.links.append(self.checkpoint_record.links[i])
                    continue

            # Case 2: Some inodes have been updated.  Create a new inode map
            # block, write it out, and point the new checkpoint at it.
            inodes = [k for k in self.inodes if k >= start and k <= end]

            block = LogItem()
            block.id = LogItem.random_id()
            block.inum = 0
            block.type = ITEM_TYPE.INODE_MAP
            block.links = []
            block.data = ''
            for j in sorted(inodes):
                block.data += struct.pack("<Q", j)
                block.links.append((self.inodes[j].id, self.inodes[j].location))
            log.write(block, 2)

            new_checkpoint.links.append((block.id, block.location))

            # Drop the range of inodes just handled from the (reverse-sorted)
            # work list.
            while len(updated_inodes) > 0 and updated_inodes[-1] <= end:
                updated_inodes.pop()

        log.write(new_checkpoint, 2)
        self.checkpoint_record = new_checkpoint

def rewrite_inode(backend, inode_map, inum, log, copy_data=True):
    inode = inode_map.inodes[inum]
    if copy_data:
        # Copy all of the inode's data blocks into the new log, then point
        # the inode at the fresh copies.
        blocks = []
        for l in inode.links:
            data = load_item(backend, l[1])
            blocks.append(data)
            log.write(data, 0)
        inode.links = [(b.id, b.location) for b in blocks]
    log.write(inode, 1)
    inode_map.mark_updated(inum)

def run_cleaner(backend, inode_map, log, repack_inodes=False):
    # Determine which segments are poorly utilized and should be cleaned.  We
    # need better heuristics here.
    for (s, u) in sorted(inode_map.util.segments.items()):
        if (float(u[1]) / u[0] < 0.6) and u[1] > 0:
            print "Should clean segment", s
            loc = backend.name_to_loc(s)
            if loc is not None: inode_map.obsolete_segments.add(loc)

    # TODO: We probably also want heuristics that will find inodes with
    # badly-fragmented data and rewrite that to achieve better locality.

    # Given that list of segments to clean, scan through those segments to
    # find data which is still live and mark the relevant inodes (and, where
    # needed, their data) as needing to be rewritten.
    if repack_inodes:
        dirty_inodes = set(inode_map.inodes)
    else:
        dirty_inodes = set()
    dirty_inode_data = set()
    for s in inode_map.obsolete_segments:
        filename = backend.loc_to_name(s)
        print "Scanning", filename, "for live data"
        for item in parse_log(backend.read(filename), filename):
            if item.type in (ITEM_TYPE.DATA, ITEM_TYPE.INODE):
                if item.inum in inode_map.inodes:
                    inode = inode_map.inodes[item.inum]
                    if s == inode.location[0:2]:
                        dirty_inodes.add(item.inum)
                    if item.inum not in dirty_inode_data:
                        # Data blocks are rewritten only if at least one of
                        # them still lives in a segment being cleaned.
                        for b in inode.links:
                            if b[1] is not None and s == b[1][0:2]:
                                dirty_inode_data.add(item.inum)
                                break
585 print "Inodes to rewrite:", dirty_inodes
586 print "Inodes with data to rewrite:", dirty_inode_data
587 for i in sorted(dirty_inodes.union(dirty_inode_data)):
588 rewrite_inode(backend, inode_map, i, log, i in dirty_inode_data)

if __name__ == '__main__':
    start_time = time.time()
    backend = S3Backend("mvrable-bluesky-west", cachedir="/tmp/bluesky-cache")
    #backend = FileBackend(".")
    chkpt = load_checkpoint_record(backend)
    imap = InodeMap()
    imap.build(backend, chkpt)

    log_dir = LogDirectory(backend, 1)
    run_cleaner(backend, imap, log_dir)
    print "Version vector:", imap.version_vector
    imap.write(backend, log_dir)
    log_dir.close_all()
    end_time = time.time()
    print "Cleaner running time:", end_time - start_time
    backend.dump_stats()