# A simple cleaner for the BlueSky cloud file system.  At the moment this is
# an offline cleaner--the file system must not be in use at the time that the
# cleaning is performed.  Later, it will be extended to be an online/concurrent
# cleaner, where cleaning is performed even while the file system is mounted.
#
# Copyright (C) 2010  The Regents of the University of California
# Written by Michael Vrable <mvrable@cs.ucsd.edu>

import base64, os, re, socket, struct, sys, time
import boto
from boto.s3.key import Key

# The BlueSky 'struct cloudlog_header' data type.
HEADER_FORMAT = '<4s48sb16sQIII'
HEADER_CRYPTBYTES = 48
HEADER_MAGIC1 = 'AgI-'          # Unencrypted data
HEADER_MAGIC2 = 'AgI='          # Encrypted data
HEADER_SIZE = struct.calcsize(HEADER_FORMAT)

CHECKPOINT_MAGIC = struct.pack('<Q', 0x7ad7dafb42a498b4)

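# Layout of the cloudlog header, following HEADER_FORMAT above:
#   4s   magic     (HEADER_MAGIC1 or HEADER_MAGIC2)
#   48s  cryptkeys (encryption key material)
#   b    type      (single-byte log item type code)
#   16s  id        (unique item identifier)
#   Q    inum      (inode number, or 0 for non-inode items)
#   III  sizes of the data, link ids, and link locations that follow

# Codes for the header's type field.  The values here are assumed to match
# the single-byte type codes used by the BlueSky file system proper.
class ITEM_TYPE:
    DATA = '1'
    INODE = '2'
    INODE_MAP = '3'
    CHECKPOINT = '4'
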
31 """Base class for BlueSky storage backends."""
33 def loc_to_name(self, location):
34 return "log-%08d-%08d" % (location)
36 def name_to_loc(self, name):
37 m = re.match(r"^log-(\d+)-(\d+)$", name)
38 if m: return (int(m.group(1)), int(m.group(2)))
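    # For example, loc_to_name((0, 3)) returns "log-00000000-00000003", and
    # name_to_loc("log-00000000-00000003") returns (0, 3); names that do not
    # match the pattern yield None.
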
class FileBackend(Backend):
    """An interface to BlueSky where the log segments are on local disk.

    This is mainly intended for testing purposes, as the real cleaner would
    operate where data is being stored in S3."""

    def __init__(self, path):
        self.path = path

    def list(self, directory=0):
        """Return a listing of all log segments and their sizes."""
        prefix = "log-%08d-" % (directory,)
        files = [f for f in os.listdir(self.path) if f.startswith(prefix)]
        files.sort()
        return [(f, os.stat(os.path.join(self.path, f)).st_size)
                for f in files]

    def read(self, filename, offset=0, length=None):
        fp = open(os.path.join(self.path, filename), 'rb')
        if offset > 0:
            fp.seek(offset)
        if length is None:
            return fp.read()
        else:
            return fp.read(length)

    def write(self, filename, data):
        fp = open(os.path.join(self.path, filename), 'wb')
        fp.write(data)
        fp.close()

    def delete(self, filename):
        os.unlink(os.path.join(self.path, filename))

def retry_wrap(method):
    def wrapped(self, *args, **kwargs):
        for retries in range(3):
            try:
                return method(self, *args, **kwargs)
            except:
                print >>sys.stderr, "S3 operation failed, retrying..."
                print >>sys.stderr, "  %s %s %s" % (method, args, kwargs)
                time.sleep(1.0)
                self.connect()
        return method(self, *args, **kwargs)
    return wrapped

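# retry_wrap can be applied as a decorator ("@retry_wrap" above an S3Backend
# method) to give that operation automatic retries; it assumes the backend
# provides a connect() method for re-establishing the S3 connection.
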
class S3Backend(Backend):
    """An interface to BlueSky where the log segments are in Amazon S3."""

    def __init__(self, bucket, path='', cachedir="."):
        self.bucket_name = bucket
        self.path = path
        self.cachedir = cachedir
        self.cache = {}
        for f in os.listdir(cachedir):
            self.cache[f] = True
        #print "Initial cache contents:", list(self.cache.keys())
        self.connect()
        self.stats_get = [0, 0]
        self.stats_put = [0, 0]

    def connect(self):
        self.conn = boto.connect_s3(is_secure=False)
        self.bucket = self.conn.get_bucket(self.bucket_name)

    def list(self, directory=0):
        files = []
        prefix = "log-%08d-" % (directory,)
        for k in self.bucket.list(self.path + prefix):
            files.append((k.key, k.size))
        return files
    def read(self, filename, offset=0, length=None):
        if filename in self.cache:
            fp = open(os.path.join(self.cachedir, filename), 'rb')
            if offset > 0:
                fp.seek(offset)
            if length is None:
                return fp.read()
            else:
                return fp.read(length)
        else:
            k = Key(self.bucket)
            k.key = self.path + filename
            data = k.get_contents_as_string()
            fp = open(os.path.join(self.cachedir, filename), 'wb')
            fp.write(data)
            fp.close()
            self.cache[filename] = True
            self.stats_get[0] += 1
            self.stats_get[1] += len(data)
            if offset > 0:
                data = data[offset:]
            if length is not None:
                data = data[0:length]
            return data
    def write(self, filename, data):
        k = Key(self.bucket)
        k.key = self.path + filename
        k.set_contents_from_string(data)
        self.stats_put[0] += 1
        self.stats_put[1] += len(data)
        if filename in self.cache:
            del self.cache[filename]
    def delete(self, filename):
        k = Key(self.bucket)
        k.key = self.path + filename
        k.delete()
        if filename in self.cache:
            del self.cache[filename]
    def dump_stats(self):
        print "S3 statistics:"
        print "GET: %d ops / %d bytes" % tuple(self.stats_get)
        print "PUT: %d ops / %d bytes" % tuple(self.stats_put)

class SimpleBackend(Backend):
    """An interface to the simple BlueSky test network server."""

    def __init__(self, server=('localhost', 12345), cachedir="."):
        self.server_address = server
        self.cachedir = cachedir
        self.cache = {}

    def _get_socket(self):
        return socket.create_connection(self.server_address).makefile()
    def list(self, directory=0):
        # The GET/PUT protocol spoken by the test server offers no listing
        # command, so a directory listing cannot be produced here.
        raise NotImplementedError("SimpleBackend does not support list()")
    def read(self, filename, offset=0, length=None):
        if filename in self.cache:
            fp = open(os.path.join(self.cachedir, filename), 'rb')
            if offset > 0:
                fp.seek(offset)
            if length is None:
                return fp.read()
            else:
                return fp.read(length)
        else:
            f = self._get_socket()
            f.write("GET %s %d %d\n" % (filename, 0, 0))
            f.flush()
            datalen = int(f.readline())
            data = f.read(datalen)
            fp = open(os.path.join(self.cachedir, filename), 'wb')
            fp.write(data)
            fp.close()
            self.cache[filename] = True
            if offset > 0:
                data = data[offset:]
            if length is not None:
                data = data[0:length]
            return data
    def write(self, filename, data):
        f = self._get_socket()
        f.write("PUT %s %d\n" % (filename, len(data)))
        f.write(data)
        f.flush()
        result = int(f.readline())      # Wait for the server's acknowledgment
        if filename in self.cache:
            del self.cache[filename]

    def delete(self, filename):
        pass

226 """In-memory representation of a single item stored in a log file."""
229 self.cryptkeys = '\0' * HEADER_CRYPTBYTES
230 self.encrypted = False
233 return "<Item%s ty=%s location=%s size=%d id=%s...>" % (self.encrypted and '$' or '', self.type, self.location, self.size, base64.b16encode(self.id).lower()[0:8])
237 return open('/dev/urandom').read(16)
242 for (i, l) in self.links:
245 link_locs.append(struct.pack('<IIII', *l))
246 link_ids = ''.join(link_ids)
247 link_locs = ''.join(link_locs)
250 magic = HEADER_MAGIC2
252 magic = HEADER_MAGIC1
253 header = struct.pack(HEADER_FORMAT,
254 magic, self.cryptkeys,
255 ord(self.type), self.id, self.inum,
256 len(self.data), len(link_ids), len(link_locs))
257 return header + self.data + link_ids + link_locs
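# A serialized item is the packed header followed by the data, link ids, and
# link locations.  Round-trip sketch (hypothetical values):
#
#   item = LogItem()
#   item.type = ITEM_TYPE.DATA; item.id = LogItem.random_id(); item.inum = 5
#   item.data = 'hello'; item.links = []
#   assert parse_item(item.serialize()).data == 'hello'
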
class LogSegment:
    def __init__(self, backend, location):
        self.backend = backend
        self.location = location
        self.data = []

    def __len__(self):
        return sum(len(s) for s in self.data)

    def write(self, item):
        data = item.serialize()
        offset = len(self)
        self.data.append(data)
        item.location = self.location + (offset, len(data))
    def close(self):
        data = ''.join(self.data)
        filename = self.backend.loc_to_name(self.location)
        print "Writing %d bytes of data to %s" % (len(data), filename)
        self.backend.write(filename, data)

class LogDirectory:
    TARGET_SIZE = 4 << 20       # Close a segment once it reaches 4 MiB

    def __init__(self, backend, dir):
        self.backend = backend
        self.dir_num = dir
        self.seq_num = 0
        for logname in backend.list(dir):
            #print "Old log file:", logname
            loc = backend.name_to_loc(logname[0])
            if loc is not None and loc[0] == dir:
                self.seq_num = max(self.seq_num, loc[1] + 1)
        self.groups = {}
        print "Starting sequence number is", self.seq_num
    def open_segment(self):
        seg = LogSegment(self.backend, (self.dir_num, self.seq_num))
        self.seq_num += 1
        return seg
    def write(self, item, segment_group=0):
        if segment_group not in self.groups:
            self.groups[segment_group] = self.open_segment()
        seg = self.groups[segment_group]
        seg.write(item)
        if len(seg) >= LogDirectory.TARGET_SIZE:
            seg.close()
            del self.groups[segment_group]

    def close_all(self):
        for k in list(self.groups.keys()):
            self.groups[k].close()
            del self.groups[k]

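# A segment group keeps one log segment open per class of item so that
# different kinds of data accumulate in separate segments; the cleaner below
# writes file data with group 0, inodes with group 1, and inode-map and
# checkpoint records with group 2.
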
class UtilizationTracker:
    """A simple object that tracks what fraction of each segment is used.

    This data can be used to guide segment cleaning decisions."""

    def __init__(self, backend):
        self.segments = {}
        for (segment, size) in backend.list(0) + backend.list(1):
            self.segments[segment] = [size, 0]
    def add_item(self, item):
        if isinstance(item, LogItem):
            item = item.location
        if item is None: return
        (dir, seq, offset, size) = item
        filename = "log-%08d-%08d" % (dir, seq)
        self.segments[filename][1] += size

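# Each entry in the segments dictionary maps a segment name to a two-element
# list [segment size in bytes, bytes of live data]; the ratio of the second
# value to the first is the utilization consulted by run_cleaner() below.
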
def parse_item(data):
    if len(data) < HEADER_SIZE: return
    header = struct.unpack_from(HEADER_FORMAT, data, 0)
    size = HEADER_SIZE + sum(header[5:8])

    if header[0] not in (HEADER_MAGIC1, HEADER_MAGIC2):
        print "Bad header magic!"
        return

    if len(data) != size:
        print "Item size does not match: %d != %d" % (size, len(data))
        return

    item = LogItem()
    if header[0] == HEADER_MAGIC2: item.encrypted = True
    item.cryptkeys = header[1]
    item.id = header[3]
    item.inum = header[4]
    item.location = None
    item.type = chr(header[2])
    item.size = size
    item.data = data[HEADER_SIZE : HEADER_SIZE + header[5]]
    link_ids = data[HEADER_SIZE + header[5]
                    : HEADER_SIZE + header[5] + header[6]]
    link_locs = data[HEADER_SIZE + header[5] + header[6]
                     : HEADER_SIZE + sum(header[5:8])]
    links = []
    for i in range(len(link_ids) // 16):
        id = link_ids[16*i : 16*i + 16]
        if id == '\0' * 16:
            # An all-zero id marks a null link with no stored location
            links.append((id, None))
        else:
            loc = struct.unpack('<IIII', link_locs[0:16])
            link_locs = link_locs[16:]
            links.append((id, loc))
    item.links = links
    return item

def load_item(backend, location):
    """Load the cloud item pointed at by the 4-tuple 'location'.

    The elements of the tuple are (directory, sequence, offset, size)."""
    filename = backend.loc_to_name((location[0], location[1]))
    data = backend.read(filename, location[2], location[3])
    item = parse_item(data)
    item.location = location
    return item

def parse_log(data, location=None):
    """Parse contents of a log file, yielding a sequence of log items."""

    if isinstance(location, str):
        m = re.match(r"^log-(\d+)-(\d+)$", location)
        if m:
            location = (int(m.group(1)), int(m.group(2)))
        else:
            location = None

    offset = 0
    while len(data) - offset >= HEADER_SIZE:
        header = struct.unpack_from(HEADER_FORMAT, data, offset)
        size = HEADER_SIZE + sum(header[5:8])
        if header[0] not in (HEADER_MAGIC1, HEADER_MAGIC2):
            print "Bad header magic!"
            break
        if size + offset > len(data):
            print "Short data record at end of log: %s < %s" \
                % (len(data) - offset, size)
            break
        item = parse_item(data[offset : offset + size])
        if item is not None:
            if location is not None:
                item.location = location + (offset, size)
            yield item
        offset += size

def load_checkpoint_record(backend, directory=0):
    """Return the most recent checkpoint record in the given log directory."""
    for (log, size) in reversed(backend.list(directory)):
        for item in reversed(list(parse_log(backend.read(log), log))):
            if item.type == ITEM_TYPE.CHECKPOINT:
                return item

class InodeMap:
    def __init__(self):
        pass

    def build(self, backend, checkpoint_record):
        """Reconstruct the inode map from the checkpoint record given.

        This will also build up information about segment utilization."""

        self.inodes = {}
        self.version_vector = {}
        self.checkpoint_record = checkpoint_record

        util = UtilizationTracker(backend)
        util.add_item(checkpoint_record)
        self.obsolete_segments = set()

        data = checkpoint_record.data
        if not data.startswith(CHECKPOINT_MAGIC):
            raise ValueError("Invalid checkpoint record!")
        data = data[len(CHECKPOINT_MAGIC):]
        (vvlen,) = struct.unpack_from("<I", data, 0)
        self.vvsize = 4 + 8*vvlen
        for i in range(vvlen):
            (v1, v2) = struct.unpack_from("<II", data, 4 + 8*i)
            self.version_vector[v1] = v2
        print self.version_vector
        self.version_vector[checkpoint_record.location[0]] \
            = checkpoint_record.location[1]
        print self.version_vector

        data = data[self.vvsize:]
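        # What remains of the checkpoint data is a sequence of 16-byte
        # (start, end) pairs giving the range of inode numbers covered by
        # each inode-map block, in the same order as the checkpoint's links.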
        for i in range(len(data) // 16):
            (start, end) = struct.unpack_from("<QQ", data, 16*i)
            imap = load_item(backend, checkpoint_record.links[i][1])
            util.add_item(imap)
            #print "[%d, %d]: %s" % (start, end, imap)
            for j in range(len(imap.data) // 8):
                (inum,) = struct.unpack_from("<Q", imap.data, 8*j)
                inode = load_item(backend, imap.links[j][1])
                self.inodes[inum] = inode
                util.add_item(inode)
                data_segments = set()
                for link in inode.links:
                    if link[1] is not None:
                        util.add_item(link[1])
                        data_segments.add(link[1][0:2])
                #print "  %d: %s (%d data segments)" % (inum, inode, len(data_segments))

466 print "Segment utilizations:"
469 for (s, u) in sorted(util.segments.items()):
470 for i in range(2): total_data[i] += u[i]
471 print "%s: %s %s" % (s, u, float(u[1]) / u[0])
473 print "Would delete..."
474 (d, n) = backend.name_to_loc(s)
476 if n < self.version_vector[d]:
481 print "Not deleting log file newer than checkpoint!"
483 print "Error determining age of log segment, keeping"
487 self.updated_inodes = set()
489 print "%d bytes total / %d bytes used" % tuple(total_data)
490 print "would delete %d segments (%d bytes)" % tuple(deletions)
    def mark_updated(self, inum):
        self.updated_inodes.add(inum)
    def write(self, backend, log):
        # Sorted in reverse so the smallest updated inode number sits at the
        # end of the list and can be consumed with pop().
        updated_inodes = sorted(self.updated_inodes, reverse=True)

        new_checkpoint = LogItem()
        new_checkpoint.id = LogItem.random_id()
        new_checkpoint.inum = 0
        new_checkpoint.type = ITEM_TYPE.CHECKPOINT
        new_checkpoint.data = CHECKPOINT_MAGIC
        new_checkpoint.links = []

        new_checkpoint.data += struct.pack('<I', len(self.version_vector))
        for d in sorted(self.version_vector):
            new_checkpoint.data += struct.pack('<II', d, self.version_vector[d])

        data = self.checkpoint_record.data[self.vvsize + len(CHECKPOINT_MAGIC):]
        for i in range(len(data) // 16):
            (start, end) = struct.unpack_from("<QQ", data, 16*i)
            new_checkpoint.data += data[16*i : 16*i + 16]

            # Case 1: No inodes in this range of the old inode map have
            # changed.  Simply emit a new pointer to the same inode map block,
            # unless that block lives in a segment being cleaned, in which
            # case fall through and rewrite it.
            if len(updated_inodes) == 0 or updated_inodes[-1] > end:
                old_location = self.checkpoint_record.links[i][1][0:2]
                if old_location not in self.obsolete_segments:
                    new_checkpoint.links.append(self.checkpoint_record.links[i])
                    continue

            # Case 2: Some inodes have been updated.  Create a new inode map
            # block, write it out, and point the new checkpoint at it.
            inodes = [k for k in self.inodes if k >= start and k <= end]
            inodes.sort()

            block = LogItem()
            block.id = LogItem.random_id()
            block.inum = 0
            block.type = ITEM_TYPE.INODE_MAP
            block.links = []
            block.data = ''
            for j in inodes:
                block.data += struct.pack("<Q", j)
                block.links.append((self.inodes[j].id, self.inodes[j].location))
            log.write(block, 2)

            new_checkpoint.links.append((block.id, block.location))

            while len(updated_inodes) > 0 and updated_inodes[-1] <= end:
                updated_inodes.pop()

        log.write(new_checkpoint, 2)
        self.checkpoint_record = new_checkpoint

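# The on-disk metadata forms a tree: the checkpoint record points to inode-map
# blocks, each inode-map block points to inodes, and each inode points to its
# data blocks.  Rewriting an inode therefore dirties only the inode-map range
# containing it, which InodeMap.write() regenerates while reusing the rest.
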
def rewrite_inode(backend, inode_map, inum, log, copy_data=True):
    inode = inode_map.inodes[inum]
    if copy_data:
        newlinks = []
        for l in inode.links:
            if l[1] is not None:
                data = load_item(backend, l[1])
                log.write(data, 0)
                newlinks.append((data.id, data.location))
            else:
                newlinks.append(l)
        inode.links = newlinks
    log.write(inode, 1)
    inode_map.mark_updated(inum)

def run_cleaner(backend, inode_map, log, repack_inodes=False):
    # Determine which segments are poorly utilized and should be cleaned.  We
    # need better heuristics here.
    for (s, u) in sorted(inode_map.util.segments.items()):
        if (float(u[1]) / u[0] < 0.6) and u[1] > 0:
            print "Should clean segment", s
            loc = backend.name_to_loc(s)
            if loc is not None: inode_map.obsolete_segments.add(loc)

    # TODO: We probably also want heuristics that will find inodes with
    # badly-fragmented data and rewrite that to achieve better locality.

    # Given that list of segments to clean, scan through those segments to
    # find data which is still live and mark relevant inodes as needing to be
    # rewritten.
    if repack_inodes:
        dirty_inodes = set(inode_map.inodes)
    else:
        dirty_inodes = set()
    dirty_inode_data = set()
    for s in inode_map.obsolete_segments:
        filename = backend.loc_to_name(s)
        #print "Scanning", filename, "for live data"
        for item in parse_log(backend.read(filename), filename):
            if item.type in (ITEM_TYPE.DATA, ITEM_TYPE.INODE):
                if item.inum != 0:
                    inode = inode_map.inodes[item.inum]
                    if s == inode.location[0:2]:
                        dirty_inodes.add(item.inum)
                    if item.inum not in dirty_inode_data:
                        for b in inode.links:
                            if b[1] is not None and s == b[1][0:2]:
                                dirty_inode_data.add(item.inum)
                                break

    #print "Inodes to rewrite:", dirty_inodes
    #print "Inodes with data to rewrite:", dirty_inode_data
    for i in sorted(dirty_inodes.union(dirty_inode_data)):
        rewrite_inode(backend, inode_map, i, log, i in dirty_inode_data)

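# Taken together: run_cleaner() picks poorly-utilized segments, finds the
# inodes whose metadata or data still live in them, and rewrites those items
# into fresh segments; the inode map and checkpoint written afterwards by
# InodeMap.write() commit the moves, leaving the old segments unused so a
# later pass can delete them.
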
if __name__ == '__main__':
    start_time = time.time()
    backend = S3Backend("mvrable-bluesky-west", cachedir="/tmp/bluesky-cache")
    #backend = FileBackend(".")
    chkpt = load_checkpoint_record(backend)
    #print backend.list()
    log_dir = LogDirectory(backend, 1)
    imap = InodeMap()
    imap.build(backend, chkpt)

    run_cleaner(backend, imap, log_dir)
    print "Version vector:", imap.version_vector
    imap.write(backend, log_dir)
    log_dir.close_all()
    end_time = time.time()
    print "Cleaner running time:", end_time - start_time
    backend.dump_stats()