# A simple cleaner for the BlueSky cloud file system.  At the moment this is an
# offline cleaner--the file system must not be in use at the time that the
# cleaning is performed.  Later, it will be extended to be an online/concurrent
# cleaner, where cleaning is performed even while the file system is mounted.
#
# Copyright (C) 2010  The Regents of the University of California
# Written by Michael Vrable <mvrable@cs.ucsd.edu>
import base64, os, re, socket, struct, sys, time

import boto
from boto.s3.key import Key
15 # The BlueSky 'struct cloudlog_header' data type.
16 HEADER_FORMAT = '<4s48sb16sQIII'
17 HEADER_CRYPTBYTES = 48
18 HEADER_MAGIC1 = 'AgI-' # Unencrypted data
19 HEADER_MAGIC2 = 'AgI=' # Encrypted data
20 HEADER_SIZE = struct.calcsize(HEADER_FORMAT)
22 CHECKPOINT_MAGIC = struct.pack('<Q', 0x7ad7dafb42a498b4)
31 """Base class for BlueSky storage backends."""
def loc_to_name(self, location):
    """Return the segment file name for a (directory, sequence) pair."""
    directory, sequence = location
    return "log-%08d-%08d" % (directory, sequence)
def name_to_loc(self, name):
    """Parse a segment file name back into a (directory, sequence) pair.

    Returns None if the name is not a valid log segment name."""
    match = re.match(r"^log-(\d+)-(\d+)$", name)
    if match is None:
        return None
    return (int(match.group(1)), int(match.group(2)))
class FileBackend(Backend):
    """An interface to BlueSky where the log segments are on local disk.

    This is mainly intended for testing purposes, as the real cleaner would
    operate where data is being stored in S3."""

    def __init__(self, path):
        # Directory containing the log-XXXXXXXX-XXXXXXXX segment files.
        self.path = path

    def list(self, directory=0):
        """Return a listing of all log segments and their sizes."""
        prefix = "log-%08d-" % (directory,)
        files = [f for f in os.listdir(self.path) if f.startswith(prefix)]
        files.sort()
        return [(f, os.stat(os.path.join(self.path, f)).st_size)
                for f in files]

    def read(self, filename, offset=0, length=None):
        """Read a segment, or a byte range of it if offset/length given."""
        # Use a context manager so the file handle is always closed (the
        # previous version left it for the garbage collector to reclaim).
        with open(os.path.join(self.path, filename), 'rb') as fp:
            if offset > 0:
                fp.seek(offset)
            if length is None:
                return fp.read()
            return fp.read(length)

    def write(self, filename, data):
        """Write a complete segment to disk."""
        with open(os.path.join(self.path, filename), 'wb') as fp:
            fp.write(data)

    def delete(self, filename):
        """Remove a segment file from disk."""
        os.unlink(os.path.join(self.path, filename))
def retry_wrap(method):
    """Decorator for storage operations: retry transient failures.

    The wrapped method is attempted up to three times with a reconnect and
    short sleep between attempts; a final attempt is then made with no
    exception handler so a persistent error propagates to the caller."""
    def wrapped(self, *args, **kwargs):
        for retries in range(3):
            try:
                return method(self, *args, **kwargs)
            except:
                sys.stderr.write("S3 operation failed, retrying...\n")
                self.connect()
                time.sleep(1.0)
        return method(self, *args, **kwargs)
    return wrapped
91 class S3Backend(Backend):
92 """An interface to BlueSky where the log segments are on in Amazon S3."""
94 def __init__(self, bucket, path='', cachedir="."):
95 self.bucket_name = bucket
97 self.cachedir = cachedir
100 self.stats_get = [0, 0]
101 self.stats_put = [0, 0]
104 self.conn = boto.connect_s3(is_secure=False)
105 self.bucket = self.conn.get_bucket(self.bucket_name)
107 def list(self, directory=0):
109 prefix = "log-%08d-" % (directory,)
110 for k in self.bucket.list(self.path + prefix):
111 files.append((k.key, k.size))
115 def read(self, filename, offset=0, length=None):
116 if filename in self.cache:
117 fp = open(os.path.join(self.cachedir, filename), 'rb')
123 return fp.read(length)
126 k.key = self.path + filename
127 data = k.get_contents_as_string()
128 fp = open(os.path.join(self.cachedir, filename), 'wb')
131 self.cache[filename] = True
132 self.stats_get[0] += 1
133 self.stats_get[1] += len(data)
136 if length is not None:
137 data = data[0:length]
141 def write(self, filename, data):
143 k.key = self.path + filename
144 k.set_contents_from_string(data)
145 self.stats_put[0] += 1
146 self.stats_put[1] += len(data)
147 if filename in self.cache:
148 del self.cache[filename]
151 def delete(self, filename):
153 k.key = self.path + filename
155 if filename in self.cache:
156 del self.cache[filename]
158 def dump_stats(self):
159 print "S3 statistics:"
160 print "GET: %d ops / %d bytes" % tuple(self.stats_get)
161 print "PUT: %d ops / %d bytes" % tuple(self.stats_put)
class SimpleBackend(Backend):
    """An interface to the simple BlueSky test network server."""

    def __init__(self, server=('localhost', 12345), cachedir="."):
        # BUG FIX: dropped "self.bucket_name = bucket" -- 'bucket' was an
        # undefined name, so constructing this backend always raised
        # NameError.
        self.server_address = server
        self.cachedir = cachedir
        self.cache = {}

    def _get_socket(self):
        # One connection per request; the file object wraps the socket for
        # line-oriented I/O.
        return socket.create_connection(self.server_address).makefile()

    def list(self, directory=0):
        # NOTE(review): this body looks copy-pasted from S3Backend --
        # self.bucket and self.path are never set on this class, so calling
        # list() will raise AttributeError.  A LIST request in the server
        # protocol is needed here; confirm against the test server.
        files = []
        prefix = "log-%08d-" % (directory,)
        for k in self.bucket.list(self.path + prefix):
            files.append((k.key, k.size))
        return files

    def read(self, filename, offset=0, length=None):
        """Read a segment (or byte range), caching full segments locally."""
        if filename in self.cache:
            with open(os.path.join(self.cachedir, filename), 'rb') as fp:
                if offset > 0:
                    fp.seek(offset)
                if length is None:
                    return fp.read()
                return fp.read(length)
        else:
            f = self._get_socket()
            f.write("GET %s %d %d\n" % (filename, 0, 0))
            f.flush()
            datalen = int(f.readline())
            data = f.read(datalen)
            with open(os.path.join(self.cachedir, filename), 'wb') as fp:
                fp.write(data)
            self.cache[filename] = True
            if offset > 0:
                data = data[offset:]
            if length is not None:
                data = data[0:length]
            return data

    def write(self, filename, data):
        """Upload a segment to the test server."""
        f = self._get_socket()
        # BUG FIX: the format string had three conversions
        # ("PUT %s %d %d\n") but only two arguments, which raised
        # TypeError before any data was sent.
        f.write("PUT %s %d\n" % (filename, len(data)))
        f.write(data)
        f.flush()
        result = int(f.readline())
        if filename in self.cache:
            del self.cache[filename]

    def delete(self, filename):
        # Deletion is not supported by the simple test server.
        pass
222 """In-memory representation of a single item stored in a log file."""
def __init__(self):
    # New items start out unencrypted with an all-zero crypto header.
    self.cryptkeys = '\0' * HEADER_CRYPTBYTES
    self.encrypted = False
229 return "<Item%s ty=%s location=%s size=%d id=%s...>" % (self.encrypted and '$' or '', self.type, self.location, self.size, base64.b16encode(self.id).lower()[0:8])
233 return open('/dev/urandom').read(16)
def serialize(self):
    """Pack this item into its on-disk 'cloudlog' representation:
    header, data payload, link ids, then link locations."""
    link_ids = []
    link_locs = []
    for (i, l) in self.links:
        link_ids.append(i)
        # A null id has no location record behind it.
        if i != '\0' * 16:
            link_locs.append(struct.pack('<IIII', *l))
    link_ids = ''.join(link_ids)
    link_locs = ''.join(link_locs)
    if self.encrypted:
        magic = HEADER_MAGIC2
    else:
        magic = HEADER_MAGIC1
    header = struct.pack(HEADER_FORMAT,
                         magic, self.cryptkeys,
                         ord(self.type), self.id, self.inum,
                         len(self.data), len(link_ids), len(link_locs))
    return header + self.data + link_ids + link_locs
def __init__(self, backend, location):
    self.backend = backend
    self.location = location    # (directory, sequence) pair for this segment
    self.data = []              # serialized items buffered until close()
262 return sum(len(s) for s in self.data)
def write(self, item):
    """Serialize an item into this segment and record its new location.

    The location stored back on the item is the 4-tuple
    (directory, sequence, offset, size)."""
    serialized = item.serialize()
    offset = len(self)
    self.data.append(serialized)
    item.location = self.location + (offset, len(serialized))
271 data = ''.join(self.data)
272 filename = self.backend.loc_to_name(self.location)
273 print "Would write %d bytes of data to %s" % (len(data), filename)
274 self.backend.write(filename, data)
277 TARGET_SIZE = 4 << 20
279 def __init__(self, backend, dir):
280 self.backend = backend
283 for logname in backend.list(dir):
284 print "Old log file:", logname
285 loc = backend.name_to_loc(logname[0])
286 if loc is not None and loc[0] == dir:
287 self.seq_num = max(self.seq_num, loc[1] + 1)
289 print "Starting sequence number is", self.seq_num
def open_segment(self):
    """Create a new log segment, consuming the next sequence number."""
    seg = LogSegment(self.backend, (self.dir_num, self.seq_num))
    self.seq_num += 1
    return seg
def write(self, item, segment_group=0):
    """Append an item to the open segment for segment_group, opening a
    segment if needed and rolling over once it reaches the target size."""
    if segment_group not in self.groups:
        self.groups[segment_group] = self.open_segment()
    seg = self.groups[segment_group]
    seg.write(item)
    if len(seg) >= LogDirectory.TARGET_SIZE:
        seg.close()
        del self.groups[segment_group]
def close(self):
    """Flush and close every still-open log segment."""
    # Iterate over a snapshot of the keys since entries are removed while
    # looping.
    for k in list(self.groups.keys()):
        self.groups[k].close()
        del self.groups[k]
class UtilizationTracker:
    """A simple object that tracks what fraction of each segment is used.

    This data can be used to guide segment cleaning decisions."""

    def __init__(self, backend):
        # Map: segment name -> [total bytes, bytes known to be live].
        # Both cloud log directories (0 and 1) are scanned.
        self.segments = {}
        for (segment, size) in backend.list(0) + backend.list(1):
            self.segments[segment] = [size, 0]

    def add_item(self, item):
        """Credit an item's size as live data in its containing segment.

        Accepts either a LogItem or a raw (dir, seq, offset, size) tuple."""
        if isinstance(item, LogItem):
            item = item.location
        if item is None: return
        (dir, seq, offset, size) = item
        filename = "log-%08d-%08d" % (dir, seq)
        self.segments[filename][1] += size
328 def parse_item(data):
329 if len(data) < HEADER_SIZE: return
330 header = struct.unpack_from(HEADER_FORMAT, data, 0)
331 size = HEADER_SIZE + sum(header[5:8])
333 if header[0] not in (HEADER_MAGIC1, HEADER_MAGIC2):
334 print "Bad header magic!"
337 if len(data) != size:
338 print "Item size does not match: %d != %d" % (size, len(data))
342 if header[0] == HEADER_MAGIC2: item.encrypted = True
343 item.cryptkeys = header[1]
345 item.inum = header[4]
347 item.type = chr(header[2])
349 item.data = data[HEADER_SIZE : HEADER_SIZE + header[5]]
351 link_ids = data[HEADER_SIZE + header[5]
352 : HEADER_SIZE + header[5] + header[6]]
353 link_locs = data[HEADER_SIZE + header[5] + header[6]
354 : HEADER_SIZE + sum(header[5:8])]
355 for i in range(len(link_ids) // 16):
356 id = link_ids[16*i : 16*i + 16]
360 loc = struct.unpack('<IIII', link_locs[0:16])
361 link_locs = link_locs[16:]
362 links.append((id, loc))
def load_item(backend, location):
    """Load the cloud item pointed at by the 4-tuple 'location'.

    The elements of the tuple are (directory, sequence, offset, size)."""
    filename = backend.loc_to_name((location[0], location[1]))
    data = backend.read(filename, location[2], location[3])
    item = parse_item(data)
    item.location = location
    return item
377 def parse_log(data, location=None):
378 """Parse contents of a log file, yielding a sequence of log items."""
380 if isinstance(location, str):
381 m = re.match(r"^log-(\d+)-(\d+)$", location)
383 location = (int(m.group(1)), int(m.group(2)))
388 while len(data) - offset >= HEADER_SIZE:
389 header = struct.unpack_from(HEADER_FORMAT, data, offset)
390 size = HEADER_SIZE + sum(header[5:8])
391 if header[0] not in (HEADER_MAGIC1, HEADER_MAGIC2):
392 print "Bad header magic!"
394 if size + offset > len(data):
395 print "Short data record at end of log: %s < %s" % (len(data) - offset, size)
397 item = parse_item(data[offset : offset + size])
398 if location is not None:
399 item.location = location + (offset, size)
400 if item is not None: yield item
def load_checkpoint_record(backend, directory=0):
    """Return the most recent checkpoint item in the given log directory.

    Segments are scanned newest-first, and within each segment the last
    checkpoint item wins."""
    for (log, size) in reversed(backend.list(directory)):
        for item in reversed(list(parse_log(backend.read(log), log))):
            if item.type == ITEM_TYPE.CHECKPOINT:
                return item
414 def build(self, backend, checkpoint_record):
415 """Reconstruct the inode map from the checkpoint record given.
417 This will also build up information about segment utilization."""
419 self.version_vector = {}
420 self.checkpoint_record = checkpoint_record
422 util = UtilizationTracker(backend)
423 util.add_item(checkpoint_record)
425 self.obsolete_segments = set()
427 data = checkpoint_record.data
428 if not data.startswith(CHECKPOINT_MAGIC):
429 raise ValueError, "Invalid checkpoint record!"
430 data = data[len(CHECKPOINT_MAGIC):]
431 (vvlen,) = struct.unpack_from("<I", data, 0)
432 self.vvsize = 4 + 8*vvlen
433 for i in range(vvlen):
434 (v1, v2) = struct.unpack_from("<II", data, 4 + 8*i)
435 self.version_vector[v1] = v2
436 print self.version_vector
437 self.version_vector[checkpoint_record.location[0]] \
438 = checkpoint_record.location[1]
439 print self.version_vector
441 data = data[self.vvsize:]
444 for i in range(len(data) // 16):
445 (start, end) = struct.unpack_from("<QQ", data, 16*i)
446 imap = load_item(backend, checkpoint_record.links[i][1])
448 print "[%d, %d]: %s" % (start, end, imap)
449 for j in range(len(imap.data) // 8):
450 (inum,) = struct.unpack_from("<Q", imap.data, 8*j)
451 inode = load_item(backend, imap.links[j][1])
453 data_segments = set()
455 for i in inode.links:
457 data_segments.add(i[1][0:2])
458 print " %d: %s (%d data segments)" % (inum, inode, len(data_segments))
461 print "Segment utilizations:"
464 for (s, u) in sorted(util.segments.items()):
465 for i in range(2): total_data[i] += u[i]
466 print "%s: %s %s" % (s, u, float(u[1]) / u[0])
468 print "Would delete..."
475 self.updated_inodes = set()
477 print "%d bytes total / %d bytes used" % tuple(total_data)
478 print "would delete %d segments (%d bytes)" % tuple(deletions)
def mark_updated(self, inum):
    """Record that inode 'inum' changed and must be included in the next
    checkpoint written out."""
    self.updated_inodes.add(inum)
def write(self, backend, log):
    """Write out a new checkpoint, re-emitting only the inode-map blocks
    whose inode ranges contain updated (or relocated) inodes."""
    # Reverse-sorted so the smallest pending inode is at the end and can be
    # consumed cheaply with pop().
    updated_inodes = sorted(self.updated_inodes, reverse=True)

    new_checkpoint = LogItem()
    new_checkpoint.id = LogItem.random_id()
    new_checkpoint.inum = 0
    new_checkpoint.type = ITEM_TYPE.CHECKPOINT
    new_checkpoint.data = CHECKPOINT_MAGIC
    new_checkpoint.links = []

    new_checkpoint.data += struct.pack('<I', len(self.version_vector))
    for d in sorted(self.version_vector):
        new_checkpoint.data += struct.pack('<II', d, self.version_vector[d])

    data = self.checkpoint_record.data[self.vvsize + len(CHECKPOINT_MAGIC):]
    for i in range(len(data) // 16):
        (start, end) = struct.unpack_from("<QQ", data, 16*i)

        new_checkpoint.data += data[16*i : 16*i + 16]

        # Case 1: No inodes in this range of the old inode map have
        # changed.  Simply emit a new pointer to the same inode map block.
        if len(updated_inodes) == 0 or updated_inodes[-1] > end:
            old_location = self.checkpoint_record.links[i][1][0:2]
            if old_location not in self.obsolete_segments:
                new_checkpoint.links.append(self.checkpoint_record.links[i])
                continue

        # Case 2: Some inodes have been updated.  Create a new inode map
        # block, write it out, and point the new checkpoint at it.
        inodes = [k for k in self.inodes if k >= start and k <= end]
        inodes.sort()

        block = LogItem()
        block.id = LogItem.random_id()
        block.inum = 0
        block.type = ITEM_TYPE.INODE_MAP
        block.links = []
        block.data = ""
        for j in inodes:
            block.data += struct.pack("<Q", j)
            block.links.append((self.inodes[j].id, self.inodes[j].location))
        log.write(block, 2)

        new_checkpoint.links.append((block.id, block.location))

        while len(updated_inodes) > 0 and updated_inodes[-1] <= end:
            updated_inodes.pop()

    log.write(new_checkpoint, 2)
    self.checkpoint_record = new_checkpoint
def rewrite_inode(backend, inode_map, inum, log, copy_data=True):
    """Copy an inode (and, if copy_data, all its data blocks) into the new
    log, updating the inode's links and marking it updated."""
    inode = inode_map.inodes[inum]
    if copy_data:
        blocks = []
        for l in inode.links:
            data = load_item(backend, l[1])
            blocks.append(data)
            log.write(data, 0)
        inode.links = [(b.id, b.location) for b in blocks]
    log.write(inode, 1)
    inode_map.mark_updated(inum)
547 def run_cleaner(backend, inode_map, log, repack_inodes=False):
548 # Determine which segments are poorly utilized and should be cleaned. We
549 # need better heuristics here.
550 for (s, u) in sorted(inode_map.util.segments.items()):
551 if (float(u[1]) / u[0] < 0.6) and u[1] > 0:
552 print "Should clean segment", s
553 loc = backend.name_to_loc(s)
554 if s: inode_map.obsolete_segments.add(loc)
556 # TODO: We probably also want heuristics that will find inodes with
557 # badly-fragmented data and rewrite that to achieve better locality.
559 # Given that list of segments to clean, scan through those segments to find
560 # data which is still live and mark relevant inodes as needing to be
563 dirty_inodes = set(inode_map.inodes)
566 dirty_inode_data = set()
567 for s in inode_map.obsolete_segments:
568 filename = backend.loc_to_name(s)
569 print "Scanning", filename, "for live data"
570 for item in parse_log(backend.read(filename), filename):
571 if item.type in (ITEM_TYPE.DATA, ITEM_TYPE.INODE):
573 inode = inode_map.inodes[item.inum]
574 if s == inode.location[0:2]:
575 dirty_inodes.add(item.inum)
576 if item.inum not in dirty_inode_data:
577 for b in inode.links:
579 dirty_inode_data.add(item.inum)
582 print "Inodes to rewrite:", dirty_inodes
583 print "Inodes with data to rewrite:", dirty_inode_data
584 for i in sorted(dirty_inodes.union(dirty_inode_data)):
585 rewrite_inode(backend, inode_map, i, log, i in dirty_inode_data)
587 if __name__ == '__main__':
588 start_time = time.time()
589 backend = S3Backend("mvrable-bluesky-west", cachedir="/export/cache")
590 #backend = FileBackend(".")
591 chkpt = load_checkpoint_record(backend)
594 imap.build(backend, chkpt)
597 log_dir = LogDirectory(backend, 1)
598 run_cleaner(backend, imap, log_dir)
599 print "Version vector:", imap.version_vector
600 imap.write(backend, log_dir)
602 end_time = time.time()
603 print "Cleaner running time:", end_time - start_time