3 # A simple cleaner for the BlueSky cloud file system. At the moment this is an
4 # offline cleaner--the file system must not be in use at the time that the
5 # cleaning is performed. Later, it will be extended to be an online/concurrent
6 # cleaner, where cleaning is performed even while the file system is mounted.
8 # Copyright (C) 2010 The Regents of the University of California
9 # Written by Michael Vrable <mvrable@cs.ucsd.edu>
import base64, os, re, socket, struct, sys, time

import boto
from boto.s3.key import Key
15 # The BlueSky 'struct cloudlog_header' data type.
16 HEADER_FORMAT = '<4s48sb16sQIII'
17 HEADER_CRYPTBYTES = 48
18 HEADER_MAGIC1 = 'AgI-' # Unencrypted data
19 HEADER_MAGIC2 = 'AgI=' # Encrypted data
20 HEADER_SIZE = struct.calcsize(HEADER_FORMAT)
22 CHECKPOINT_MAGIC = struct.pack('<Q', 0x7ad7dafb42a498b4)
24 # Log file to write benchmark data to
26 def benchlog_write(msg, *args):
    # Append a printf-style formatted record to the global benchmark log.
    # `benchlog` is a module-level file handle opened in the __main__ block
    # at the bottom of this file; when no log is open this is a no-op.
    # NOTE(review): lines are elided from this excerpt (the original likely
    # also echoes the message and/or appends a newline -- confirm upstream).
29 if benchlog is not None:
30 benchlog.write(msg % args)
40 """Base class for BlueSky storage backends."""
def loc_to_name(self, location):
    """Return the log segment file name for a (directory, sequence) pair."""
    directory, sequence = location
    return "log-%08d-%08d" % (directory, sequence)
def name_to_loc(self, name):
    """Parse a segment file name back into its (directory, sequence) pair.

    Returns None when the name does not match the log-NNNNNNNN-NNNNNNNN
    pattern produced by loc_to_name.
    """
    match = re.match(r"^log-(\d+)-(\d+)$", name)
    if match is None:
        return None
    directory, sequence = match.groups()
    return (int(directory), int(sequence))
52 class FileBackend(Backend):
53 """An interface to BlueSky where the log segments are on local disk.
55 This is mainly intended for testing purposes, as the real cleaner would
56 operate where data is being stored in S3."""
    # Store the directory that holds the log segments.
    # NOTE(review): the body (presumably `self.path = path`) is elided from
    # this excerpt.
58 def __init__(self, path):
61 def list(self, directory=0):
62 """Return a listing of all log segments and their sizes."""
64 prefix = "log-%08d-" % (directory,)
65 files = [f for f in os.listdir(self.path) if f.startswith(prefix)]
    # Pair each segment name with its on-disk size in bytes.
    # NOTE(review): the tail of this comprehension (the `for f in files]`
    # line, and likely a sort) is elided from this excerpt.
68 return [(f, os.stat(os.path.join(self.path, f)).st_size)
    # Read `length` bytes from `filename` starting at `offset`.
    # NOTE(review): the seek-to-offset and length-defaulting lines are elided
    # here; as shown the offset parameter would be ignored -- confirm.
71 def read(self, filename, offset=0, length=None):
72 fp = open(os.path.join(self.path, filename), 'rb')
78 return fp.read(length)
    # Write `data` as the full contents of `filename`.
    # NOTE(review): the fp.write(data)/fp.close() lines are elided here.
80 def write(self, filename, data):
81 fp = open(os.path.join(self.path, filename), 'wb')
    # Remove a log segment from disk.
85 def delete(self, filename):
86 os.unlink(os.path.join(self.path, filename))
# Decorator: retry a flaky S3 operation a few times before giving up.
# NOTE(review): the try/except lines (and likely a time.sleep between
# retries) are elided from this excerpt; as shown the loop body would
# return on the first iteration unconditionally.
88 def retry_wrap(method):
89 def wrapped(self, *args, **kwargs):
90 for retries in range(3):
92 return method(self, *args, **kwargs)
94 print >>sys.stderr, "S3 operation failed, retrying..."
95 print >>sys.stderr, "  %s %s %s" % (method, args, kwargs)
        # Final attempt outside the retry loop; exceptions propagate.
98 return method(self, *args, **kwargs)
101 class S3Backend(Backend):
102 """An interface to BlueSky where the log segments are on in Amazon S3."""
    # Connect to the named bucket and prime a local on-disk cache of
    # previously downloaded segments.
    # NOTE(review): lines initializing self.path and self.cache are elided
    # from this excerpt -- both are used below.
104 def __init__(self, bucket, path='', cachedir="."):
105 self.bucket_name = bucket
107 self.cachedir = cachedir
        # Pre-populate the cache index from files already in cachedir.
109 for f in os.listdir(cachedir):
111 #print "Initial cache contents:", list(self.cache.keys())
        # [op count, byte count] for GET and PUT, reported by dump_stats().
113 self.stats_get = [0, 0]
114 self.stats_put = [0, 0]
117 self.conn = boto.connect_s3(is_secure=False)
118 self.bucket = self.conn.get_bucket(self.bucket_name)
    # List (key, size) pairs for all segments in the given log directory.
120 def list(self, directory=0):
122 prefix = "log-%08d-" % (directory,)
123 for k in self.bucket.list(self.path + prefix):
124 files.append((k.key, k.size))
    # Read a byte range of a segment, serving from the local cache when
    # possible and caching the full object after a GET.
    # NOTE(review): seek/offset handling lines are elided in both the cache
    # and network paths of this excerpt.
128 def read(self, filename, offset=0, length=None):
129 if filename in self.cache:
130 fp = open(os.path.join(self.cachedir, filename), 'rb')
136 return fp.read(length)
139 k.key = self.path + filename
140 data = k.get_contents_as_string()
        # Cache the downloaded object for subsequent reads.
141 fp = open(os.path.join(self.cachedir, filename), 'wb')
144 self.cache[filename] = True
145 self.stats_get[0] += 1
146 self.stats_get[1] += len(data)
149 if length is not None:
150 data = data[0:length]
    # Upload a segment and invalidate any stale cached copy.
154 def write(self, filename, data):
156 k.key = self.path + filename
157 k.set_contents_from_string(data)
158 self.stats_put[0] += 1
159 self.stats_put[1] += len(data)
160 if filename in self.cache:
161 del self.cache[filename]
    # Delete a segment from S3 and drop it from the local cache.
164 def delete(self, filename):
166 k.key = self.path + filename
168 if filename in self.cache:
169 del self.cache[filename]
    # Print and benchlog the accumulated GET/PUT operation statistics.
171 def dump_stats(self):
172 print "S3 statistics:"
173 print "GET: %d ops / %d bytes" % tuple(self.stats_get)
174 print "PUT: %d ops / %d bytes" % tuple(self.stats_put)
175 benchlog_write("s3_get: %d", self.stats_get[1])
176 benchlog_write("s3_put: %d", self.stats_put[1])
178 class SimpleBackend(Backend):
179 """An interface to the simple BlueSky test network server."""
181 def __init__(self, server=('localhost', 12345), cachedir="."):
        # BUG(review): `bucket` is not a parameter of this __init__ -- this
        # line raises NameError and looks copy-pasted from S3Backend.
182 self.bucket_name = bucket
183 self.server_address = server
184 self.cachedir = cachedir
    # Open a fresh connection to the test server, wrapped as a file object.
187 def _get_socket(self):
188 return socket.create_connection(self.server_address).makefile()
190 def list(self, directory=0):
192 prefix = "log-%08d-" % (directory,)
        # BUG(review): self.bucket/self.path do not exist on this class --
        # this body was copied from S3Backend and never adapted to the
        # network protocol.
193 for k in self.bucket.list(self.path + prefix):
194 files.append((k.key, k.size))
    # Read a segment via the GET protocol command, caching the result
    # locally, or serve it from the cache when already present.
197 def read(self, filename, offset=0, length=None):
198 if filename in self.cache:
199 fp = open(os.path.join(self.cachedir, filename), 'rb')
205 return fp.read(length)
207 f = self._get_socket()
208 f.write("GET %s %d %d\n" % (filename, 0, 0))
210 datalen = int(f.readline())
213 data = f.read(datalen)
214 fp = open(os.path.join(self.cachedir, filename), 'wb')
217 self.cache[filename] = True
220 if length is not None:
221 data = data[0:length]
    # Upload a segment via the PUT protocol command.
224 def write(self, filename, data):
225 f = self._get_socket()
        # BUG(review): three %-specifiers but only two arguments -- this
        # raises TypeError; a third field (or a "%s %d" format) is needed.
226 f.write("PUT %s %d %d\n" % (filename, len(data)))
229 result = int(f.readline())
230 if filename in self.cache:
231 del self.cache[filename]
    # NOTE(review): the body of delete is elided from this excerpt
    # (presumably a no-op or a DELETE protocol command).
233 def delete(self, filename):
# Interior of class LogItem; the class statement and several method
# headers (__init__, __str__, random_id, serialize) are elided from this
# excerpt.
237 """In-memory representation of a single item stored in a log file."""
    # __init__ fragment: default crypto header bytes and encryption flag.
240 self.cryptkeys = '\0' * HEADER_CRYPTBYTES
241 self.encrypted = False
    # __str__ fragment: '$' suffix marks encrypted items; id shown as a
    # truncated hex prefix.
244 return "<Item%s ty=%s location=%s size=%d id=%s...>" % (self.encrypted and '$' or '', self.type, self.location, self.size, base64.b16encode(self.id).lower()[0:8])
    # random_id fragment: 16 random bytes for a fresh object identifier.
248 return open('/dev/urandom').read(16)
    # serialize fragment: flatten (id, location) link pairs into the two
    # byte strings that follow the item data on disk.
253 for (i, l) in self.links:
256 link_locs.append(struct.pack('<IIII', *l))
257 link_ids = ''.join(link_ids)
258 link_locs = ''.join(link_locs)
    # Choose the header magic according to the encryption flag, then emit
    # the packed cloudlog_header followed by data and link sections.
261 magic = HEADER_MAGIC2
263 magic = HEADER_MAGIC1
264 header = struct.pack(HEADER_FORMAT,
265 magic, self.cryptkeys,
266 ord(self.type), self.id, self.inum,
267 len(self.data), len(link_ids), len(link_locs))
268 return header + self.data + link_ids + link_locs
# Interior of class LogSegment (class statement elided): an in-memory
# buffer of serialized items destined for one log segment file.
271 def __init__(self, backend, location):
272 self.backend = backend
273 self.location = location
    # __len__ fragment: total bytes buffered so far.
    # NOTE(review): the `self.data = []` initialization and the __len__
    # header are elided from this excerpt.
277 return sum(len(s) for s in self.data)
    # Serialize an item into the buffer and record where it will land
    # ((dir, seq, offset, size) 4-tuple).
    # NOTE(review): the line computing `offset` is elided here.
279 def write(self, item):
280 data = item.serialize()
282 self.data.append(data)
283 item.location = self.location + (offset, len(data))
    # close() fragment: concatenate the buffer and push it to the backend.
286 data = ''.join(self.data)
287 filename = self.backend.loc_to_name(self.location)
288 print "Would write %d bytes of data to %s" % (len(data), filename)
289 self.backend.write(filename, data)
# Interior of class LogDirectory (class statement elided): manages open
# LogSegments for one cleaner log directory, rolling to a new segment
# once one reaches TARGET_SIZE.
292 TARGET_SIZE = 4 << 20
    # Scan existing segments to pick the next unused sequence number.
    # NOTE(review): initialization of self.dir_num/self.seq_num/self.groups
    # is elided from this excerpt.
294 def __init__(self, backend, dir):
295 self.backend = backend
298 for logname in backend.list(dir):
299 #print "Old log file:", logname
300 loc = backend.name_to_loc(logname[0])
301 if loc is not None and loc[0] == dir:
302 self.seq_num = max(self.seq_num, loc[1] + 1)
304 print "Starting sequence number is", self.seq_num
    # Create a LogSegment at the next sequence number.
    # NOTE(review): the seq_num increment and `return seg` are elided.
306 def open_segment(self):
307 seg = LogSegment(self.backend, (self.dir_num, self.seq_num))
    # Append an item to the open segment of its group, closing and
    # replacing the segment once it reaches TARGET_SIZE.
311 def write(self, item, segment_group=0):
312 if segment_group not in self.groups:
313 self.groups[segment_group] = self.open_segment()
314 seg = self.groups[segment_group]
316 if len(seg) >= LogDirectory.TARGET_SIZE:
318 del self.groups[segment_group]
    # close_all() fragment: flush every open segment group.
321 for k in list(self.groups.keys()):
322 self.groups[k].close()
325 class UtilizationTracker:
326 """A simple object that tracks what fraction of each segment is used.
328 This data can be used to guide segment cleaning decisions."""
    # Map each segment name to [total size, live bytes].
    # NOTE(review): the `self.segments = {}` line is elided here.
330 def __init__(self, backend):
332 for (segment, size) in backend.list(0) + backend.list(1):
333 self.segments[segment] = [size, 0]
    # Credit the item's size to the segment it lives in.  `item` is either
    # a LogItem (use its .location) or a raw location 4-tuple.
335 def add_item(self, item):
336 if isinstance(item, LogItem):
338 if item is None: return
339 (dir, seq, offset, size) = item
340 filename = "log-%08d-%08d" % (dir, seq)
341 self.segments[filename][1] += size
# Parse a single serialized cloudlog item (header + data + links) out of
# `data`, returning a populated LogItem, or None on a short/invalid buffer.
# NOTE(review): the LogItem construction, early returns after the error
# prints, and the final `item.links = links; return item` lines are elided
# from this excerpt.
343 def parse_item(data):
344 if len(data) < HEADER_SIZE: return
345 header = struct.unpack_from(HEADER_FORMAT, data, 0)
    # Total item size: header plus the data/link_ids/link_locs sections.
346 size = HEADER_SIZE + sum(header[5:8])
348 if header[0] not in (HEADER_MAGIC1, HEADER_MAGIC2):
349 print "Bad header magic!"
352 if len(data) != size:
353 print "Item size does not match: %d != %d" % (size, len(data))
    # HEADER_MAGIC2 marks an encrypted item (see constants at top of file).
357 if header[0] == HEADER_MAGIC2: item.encrypted = True
358 item.cryptkeys = header[1]
360 item.inum = header[4]
362 item.type = chr(header[2])
    # Slice out the three variable-length sections following the header.
364 item.data = data[HEADER_SIZE : HEADER_SIZE + header[5]]
366 link_ids = data[HEADER_SIZE + header[5]
367 : HEADER_SIZE + header[5] + header[6]]
368 link_locs = data[HEADER_SIZE + header[5] + header[6]
369 : HEADER_SIZE + sum(header[5:8])]
    # Rebuild (id, location) link pairs; ids are 16 bytes, locations are
    # four little-endian 32-bit ints.
370 for i in range(len(link_ids) // 16):
371 id = link_ids[16*i : 16*i + 16]
375 loc = struct.unpack('<IIII', link_locs[0:16])
376 link_locs = link_locs[16:]
377 links.append((id, loc))
381 def load_item(backend, location):
382 """Load the cloud item pointed at by the 4-tuple 'location'.
384 The elements of the tuple are (directory, sequence, offset, size)."""
386 filename = backend.loc_to_name((location[0], location[1]))
387 data = backend.read(filename, location[2], location[3])
388 item = parse_item(data)
    # Record where this item came from so callers can track segment usage.
    # NOTE(review): the trailing `return item` line is elided from this
    # excerpt.
389 item.location = location
392 def parse_log(data, location=None):
393 """Parse contents of a log file, yielding a sequence of log items."""
    # `location` may be a "log-NNN-NNN" file name; convert it to the
    # (directory, sequence) pair used for item locations.
395 if isinstance(location, str):
396 m = re.match(r"^log-(\d+)-(\d+)$", location)
398 location = (int(m.group(1)), int(m.group(2)))
    # Walk the buffer item by item; each record is header + payload.
    # NOTE(review): the `offset = 0` initialization, the `break`s after the
    # two error prints, and the `offset += size` advance are elided from
    # this excerpt.
403 while len(data) - offset >= HEADER_SIZE:
404 header = struct.unpack_from(HEADER_FORMAT, data, offset)
405 size = HEADER_SIZE + sum(header[5:8])
406 if header[0] not in (HEADER_MAGIC1, HEADER_MAGIC2):
407 print "Bad header magic!"
409 if size + offset > len(data):
410 print "Short data record at end of log: %s < %s" % (len(data) - offset, size)
412 item = parse_item(data[offset : offset + size])
413 if location is not None:
        # Extend the segment location into a full (dir, seq, offset, size).
414 item.location = location + (offset, size)
415 if item is not None: yield item
# Find the most recent checkpoint record in the given log directory by
# scanning segments newest-first and items within each segment in reverse.
# NOTE(review): the `return item` line is elided from this excerpt.
418 def load_checkpoint_record(backend, directory=0):
419 for (log, size) in reversed(backend.list(directory)):
420 for item in reversed(list(parse_log(backend.read(log), log))):
422 if item.type == ITEM_TYPE.CHECKPOINT:
429 def build(self, backend, checkpoint_record):
430 """Reconstruct the inode map from the checkpoint record given.
432 This will also build up information about segment utilization."""
434 self.version_vector = {}
435 self.checkpoint_record = checkpoint_record
437 util = UtilizationTracker(backend)
438 util.add_item(checkpoint_record)
440 self.obsolete_segments = set()
        # The checkpoint payload is CHECKPOINT_MAGIC, then a version vector
        # (count + (directory, sequence) pairs), then inode-map ranges.
442 data = checkpoint_record.data
443 if not data.startswith(CHECKPOINT_MAGIC):
444 raise ValueError, "Invalid checkpoint record!"
445 data = data[len(CHECKPOINT_MAGIC):]
446 (vvlen,) = struct.unpack_from("<I", data, 0)
447 self.vvsize = 4 + 8*vvlen
448 for i in range(vvlen):
449 (v1, v2) = struct.unpack_from("<II", data, 4 + 8*i)
450 self.version_vector[v1] = v2
451 print self.version_vector
        # The checkpoint's own location supersedes any stale entry for its
        # directory in the stored version vector.
452 self.version_vector[checkpoint_record.location[0]] \
453 = checkpoint_record.location[1]
454 print self.version_vector
456 data = data[self.vvsize:]
        # Each 16-byte entry is an (start, end) inode-number range whose
        # inode map block is pointed to by the matching checkpoint link.
459 for i in range(len(data) // 16):
460 (start, end) = struct.unpack_from("<QQ", data, 16*i)
461 imap = load_item(backend, checkpoint_record.links[i][1])
463 #print "[%d, %d]: %s" % (start, end, imap)
464 for j in range(len(imap.data) // 8):
465 (inum,) = struct.unpack_from("<Q", imap.data, 8*j)
466 inode = load_item(backend, imap.links[j][1])
                # Track which segments hold this inode's data blocks.
468 data_segments = set()
470 for i in inode.links:
473 data_segments.add(i[1][0:2])
474 #print "  %d: %s (%d data segments)" % (inum, inode, len(data_segments))
        # Report utilization and which fully-dead segments could be deleted.
        # NOTE(review): the accumulator initializations (total_data,
        # deletions), the actual-delete branch, and the try/except around
        # the age check are elided from this excerpt.
477 print "Segment utilizations:"
480 for (s, u) in sorted(util.segments.items()):
481 for i in range(2): total_data[i] += u[i]
482 print "%s: %s %s" % (s, u, float(u[1]) / u[0])
484 print "Would delete..."
485 (d, n) = backend.name_to_loc(s)
                    # Only delete segments older than the checkpointed
                    # sequence number for their directory.
487 if n < self.version_vector[d]:
492 print "Not deleting log file newer than checkpoint!"
494 print "Error determining age of log segment, keeping"
498 self.updated_inodes = set()
500 print "%d bytes total / %d bytes used" % tuple(total_data)
501 print "would delete %d segments (%d bytes)" % tuple(deletions)
502 benchlog_write("bytes_used: %d", total_data[1])
503 benchlog_write("bytes_wasted: %d", total_data[0] - total_data[1])
504 benchlog_write("bytes_freed: %d", deletions[1])
    # Record that inode `inum` changed and must be re-emitted by write().
506 def mark_updated(self, inum):
507 self.updated_inodes.add(inum)
    # Emit a new checkpoint record: reuse inode-map blocks for ranges with
    # no updated inodes, and rewrite blocks for ranges that changed.
    # NOTE(review): several lines are elided from this excerpt (the Case 1
    # `continue`, block construction/inum fields, the per-range log.write
    # of the new block, and popping consumed entries from updated_inodes).
509 def write(self, backend, log):
        # Reverse-sorted so the smallest pending inode is at the tail.
510 updated_inodes = sorted(self.updated_inodes, reverse=True)
512 new_checkpoint = LogItem()
513 new_checkpoint.id = LogItem.random_id()
514 new_checkpoint.inum = 0
515 new_checkpoint.type = ITEM_TYPE.CHECKPOINT
516 new_checkpoint.data = CHECKPOINT_MAGIC
517 new_checkpoint.links = []
        # Serialize the version vector (count, then sorted pairs).
519 new_checkpoint.data += struct.pack('<I', len(self.version_vector))
520 for d in sorted(self.version_vector):
521 new_checkpoint.data += struct.pack('<II', d, self.version_vector[d])
        # Copy the old checkpoint's inode-number ranges verbatim.
523 data = self.checkpoint_record.data[self.vvsize + len(CHECKPOINT_MAGIC):]
524 for i in range(len(data) // 16):
525 (start, end) = struct.unpack_from("<QQ", data, 16*i)
527 new_checkpoint.data += data[16*i : 16*i + 16]
529 # Case 1: No inodes in this range of the old inode map have
530 # changed.  Simply emit a new pointer to the same inode map block.
531 if len(updated_inodes) == 0 or updated_inodes[-1] > end:
532 old_location = self.checkpoint_record.links[i][1][0:2]
533 if old_location not in self.obsolete_segments:
534 new_checkpoint.links.append(self.checkpoint_record.links[i])
537 # Case 2: Some inodes have been updated.  Create a new inode map
538 # block, write it out, and point the new checkpoint at it.
539 inodes = [k for k in self.inodes if k >= start and k <= end]
543 block.id = LogItem.random_id()
545 block.type = ITEM_TYPE.INODE_MAP
549 block.data += struct.pack("<Q", j)
550 block.links.append((self.inodes[j].id, self.inodes[j].location))
553 new_checkpoint.links.append((block.id, block.location))
            # Drop updated inodes now covered by the rewritten block.
555 while len(updated_inodes) > 0 and updated_inodes[-1] <= end:
        # Segment group 2 is reserved for checkpoint records.
558 log.write(new_checkpoint, 2)
559 self.checkpoint_record = new_checkpoint
# Rewrite a single inode (and, when copy_data is set, all the data blocks
# it links to) into the cleaner's log, then mark the inode map dirty.
# NOTE(review): lines are elided from this excerpt (the copy_data guard,
# the log.write of each reloaded data block, and the log.write of the
# inode itself).
561 def rewrite_inode(backend, inode_map, inum, log, copy_data=True):
562 inode = inode_map.inodes[inum]
565 for l in inode.links:
            # Reload each linked data block and re-log it at a new location.
567 data = load_item(backend, l[1])
569 newlinks.append((data.id, data.location))
572 inode.links = newlinks
574 inode_map.mark_updated(inum)
576 def run_cleaner(backend, inode_map, log, repack_inodes=False):
577 # Determine which segments are poorly utilized and should be cleaned.  We
578 # need better heuristics here.
    # Threshold: clean any segment less than 60% live that still holds data.
579 for (s, u) in sorted(inode_map.util.segments.items()):
580 if (float(u[1]) / u[0] < 0.6) and u[1] > 0:
581 print "Should clean segment", s
582 loc = backend.name_to_loc(s)
            # NOTE(review): guarding on `s` (always truthy here) looks wrong;
            # `if loc is not None:` was probably intended -- confirm.
583 if s: inode_map.obsolete_segments.add(loc)
585 # TODO: We probably also want heuristics that will find inodes with
586 # badly-fragmented data and rewrite that to achieve better locality.
588 # Given that list of segments to clean, scan through those segments to find
589 # data which is still live and mark relevant inodes as needing to be
    # NOTE(review): the `if repack_inodes:`/`else:` lines around these two
    # initializations are elided from this excerpt.
592 dirty_inodes = set(inode_map.inodes)
595 dirty_inode_data = set()
596 for s in inode_map.obsolete_segments:
597 filename = backend.loc_to_name(s)
598 #print "Scanning", filename, "for live data"
599 for item in parse_log(backend.read(filename), filename):
600 if item.type in (ITEM_TYPE.DATA, ITEM_TYPE.INODE):
601 inode = inode_map.inodes[item.inum]
                # An inode stored in a doomed segment must be rewritten.
603 if s == inode.location[0:2]:
604 dirty_inodes.add(item.inum)
                # An inode whose data lives in a doomed segment must have
                # that data copied out as well.
605 if item.inum not in dirty_inode_data:
606 for b in inode.links:
607 if b[1] is not None and s == b[1][0:2]:
608 dirty_inode_data.add(item.inum)
611 #print "Inodes to rewrite:", dirty_inodes
612 #print "Inodes with data to rewrite:", dirty_inode_data
613 for i in sorted(dirty_inodes.union(dirty_inode_data)):
614 rewrite_inode(backend, inode_map, i, log, i in dirty_inode_data)
# Entry point: run one offline cleaning pass against the configured
# backend, logging timing/throughput numbers to cleaner.log.
# NOTE(review): several lines are elided from this excerpt, including the
# `imap = InodeMap()` construction and the early-exit when the proxy has
# not yet picked up the previous cleaner segment.
616 if __name__ == '__main__':
617 benchlog = open('cleaner.log', 'a')
618 benchlog_write("*** START CLEANER RUN ***")
619 start_time = time.time()
620 backend = S3Backend("mvrable-bluesky-west", cachedir="/tmp/bluesky-cache")
621 #backend = FileBackend(".")
622 chkpt = load_checkpoint_record(backend)
623 #print backend.list()
    # Directory 1 is the cleaner's own log directory.
624 log_dir = LogDirectory(backend, 1)
626 imap.build(backend, chkpt)
629 print "Version vector:", imap.version_vector
630 print "Last cleaner log file:", log_dir.seq_num - 1
    # Refuse to clean until the proxy has consumed the last cleaner segment.
631 if imap.version_vector.get(1, -1) != log_dir.seq_num - 1:
632 print "Proxy hasn't updated to latest cleaner segment yet!"
633 benchlog_write("waiting for proxy...")
636 run_cleaner(backend, imap, log_dir)
637 print "Version vector:", imap.version_vector
    # Write the new checkpoint reflecting the rewritten inodes.
638 imap.write(backend, log_dir)
640 end_time = time.time()
642 benchlog_write("running_time: %s", end_time - start_time)