# A simple cleaner for the BlueSky cloud file system.  At the moment this is an
# offline cleaner--the file system must not be in use at the time that the
# cleaning is performed.  Later, it will be extended to be an online/concurrent
# cleaner, where cleaning is performed even while the file system is mounted.
#
# Copyright (C) 2010  The Regents of the University of California
# Written by Michael Vrable <mvrable@cs.ucsd.edu>
import base64, os, re, socket, struct, sys, time

import boto
from boto.s3.key import Key
# The BlueSky 'struct cloudlog_header' data type.
HEADER_FORMAT = '<4s48sb16sQIII'
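# The header fields, in order, are: magic (4s), encryption keys (48s,
# HEADER_CRYPTBYTES of them), item type code (b), item id (16s), inode number
# (Q), and then the byte lengths of the data, link-id, and link-location
# regions that follow the header (III).  parse_item() and LogItem.serialize()
# below depend on this layout.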
HEADER_CRYPTBYTES = 48
HEADER_MAGIC1 = 'AgI-'          # Unencrypted data
HEADER_MAGIC2 = 'AgI='          # Encrypted data
HEADER_SIZE = struct.calcsize(HEADER_FORMAT)
CHECKPOINT_MAGIC = struct.pack('<Q', 0x7ad7dafb42a498b4)

# Single-character type codes stored in the cloudlog item headers.
class ITEM_TYPE:
    DATA = '1'
    INODE = '2'
    INODE_MAP = '3'
    CHECKPOINT = '4'

class Backend:
    """Base class for BlueSky storage backends."""
    def loc_to_name(self, location):
        return "log-%08d-%08d" % (location)

    def name_to_loc(self, name):
        m = re.match(r"^log-(\d+)-(\d+)$", name)
        if m: return (int(m.group(1)), int(m.group(2)))
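# Log segments are named "log-<directory>-<sequence>", with both numbers
# zero-padded to eight decimal digits; a "location" in this file is the
# corresponding (directory, sequence) pair.  For example, loc_to_name((0, 3))
# gives "log-00000000-00000003", and name_to_loc() inverts the mapping.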
class FileBackend(Backend):
    """An interface to BlueSky where the log segments are on local disk.

    This is mainly intended for testing purposes, as the real cleaner would
    operate where data is being stored in S3."""
    def __init__(self, path):
        self.path = path
    def list(self, directory=0):
        """Return a listing of all log segments and their sizes."""

        prefix = "log-%08d-" % (directory,)
        files = [f for f in os.listdir(self.path) if f.startswith(prefix)]
        files.sort()

        return [(f, os.stat(os.path.join(self.path, f)).st_size)
                for f in files]
    def read(self, filename, offset=0, length=None):
        fp = open(os.path.join(self.path, filename), 'rb')
        if offset > 0:
            fp.seek(offset)
        if length is None:
            return fp.read()
        else:
            return fp.read(length)
    def write(self, filename, data):
        fp = open(os.path.join(self.path, filename), 'wb')
        fp.write(data)
        fp.close()
    def delete(self, filename):
        os.unlink(os.path.join(self.path, filename))
def retry_wrap(method):
    def wrapped(self, *args, **kwargs):
        for retries in range(3):
            try:
                return method(self, *args, **kwargs)
            except:
                print >>sys.stderr, "S3 operation failed, retrying..."
                time.sleep(1.0)
        return method(self, *args, **kwargs)
    return wrapped
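# retry_wrap is meant to be applied as a decorator to the S3 operations below,
# so that transient S3 failures are retried a few times before the final
# attempt is allowed to raise.  A minimal usage sketch:
#
#     class S3Backend(Backend):
#         @retry_wrap
#         def read(self, filename, offset=0, length=None):
#             ...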
class S3Backend(Backend):
    """An interface to BlueSky where the log segments are stored in Amazon S3."""

    def __init__(self, bucket, path='', cachedir="."):
        self.bucket_name = bucket
        self.path = path
        self.cachedir = cachedir
        # Names of segments already downloaded to the local cache directory.
        self.cache = {}
        self.conn = boto.connect_s3(is_secure=False)
        self.bucket = self.conn.get_bucket(self.bucket_name)
    def list(self, directory=0):
        files = []
        prefix = "log-%08d-" % (directory,)
        for k in self.bucket.list(self.path + prefix):
            files.append((k.key, k.size))
        return files
    def read(self, filename, offset=0, length=None):
        if filename in self.cache:
            fp = open(os.path.join(self.cachedir, filename), 'rb')
            if offset > 0:
                fp.seek(offset)
            if length is None:
                return fp.read()
            return fp.read(length)
        else:
            k = Key(self.bucket)
            k.key = self.path + filename
            data = k.get_contents_as_string()
            fp = open(os.path.join(self.cachedir, filename), 'wb')
            fp.write(data)
            fp.close()
            self.cache[filename] = True
            if offset > 0:
                data = data[offset:]
            if length is not None:
                data = data[0:length]
            return data
    def write(self, filename, data):
        k = Key(self.bucket)
        k.key = self.path + filename
        k.set_contents_from_string(data)
        if filename in self.cache:
            del self.cache[filename]
    def delete(self, filename):
        k = Key(self.bucket)
        k.key = self.path + filename
        k.delete()
        if filename in self.cache:
            del self.cache[filename]
class SimpleBackend(Backend):
    """An interface to the simple BlueSky test network server."""

    def __init__(self, server=('localhost', 12345), cachedir="."):
        self.server_address = server
        self.cachedir = cachedir
        # Names of segments already downloaded to the local cache directory.
        self.cache = {}
    def _get_socket(self):
        return socket.create_connection(self.server_address).makefile()
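    # The test server speaks a simple line-oriented protocol, used by read()
    # and write() below: the client sends "GET <name> <offset> <length>" and
    # reads back a length line followed by that many bytes, or sends
    # "PUT <name> <length>" followed by the data and reads back a result code
    # line.  (This description is inferred from the requests issued here, not
    # from a protocol specification.)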
    def list(self, directory=0):
        # FIXME: This looks copied from S3Backend; SimpleBackend has no
        # self.bucket or self.path, so listing over the network backend does
        # not currently work.
        files = []
        prefix = "log-%08d-" % (directory,)
        for k in self.bucket.list(self.path + prefix):
            files.append((k.key, k.size))
        return files
    def read(self, filename, offset=0, length=None):
        if filename in self.cache:
            fp = open(os.path.join(self.cachedir, filename), 'rb')
            if offset > 0:
                fp.seek(offset)
            if length is None:
                return fp.read()
            return fp.read(length)
        else:
            f = self._get_socket()
            f.write("GET %s %d %d\n" % (filename, 0, 0))
            f.flush()
            datalen = int(f.readline())
            data = f.read(datalen)
            fp = open(os.path.join(self.cachedir, filename), 'wb')
            fp.write(data)
            fp.close()
            self.cache[filename] = True
            if offset > 0:
                data = data[offset:]
            if length is not None:
                data = data[0:length]
            return data
    def write(self, filename, data):
        f = self._get_socket()
        f.write("PUT %s %d\n" % (filename, len(data)))
        f.write(data)
        f.flush()
        result = int(f.readline())
        if filename in self.cache:
            del self.cache[filename]
    def delete(self, filename):
        # The simple test server is not known to support deletion, so this is
        # left as a no-op here.
        pass
209 """In-memory representation of a single item stored in a log file."""
212 self.cryptkeys = '\0' * HEADER_CRYPTBYTES
213 self.encrypted = False
216 return "<Item%s ty=%s location=%s size=%d id=%s...>" % (self.encrypted and '$' or '', self.type, self.location, self.size, base64.b16encode(self.id).lower()[0:8])
220 return open('/dev/urandom').read(16)
    def serialize(self):
        link_ids = []
        link_locs = []
        for (i, l) in self.links:
            link_ids.append(i)
            if i != '\0' * 16:
                link_locs.append(struct.pack('<IIII', *l))
        link_ids = ''.join(link_ids)
        link_locs = ''.join(link_locs)
        if self.encrypted:
            magic = HEADER_MAGIC2
        else:
            magic = HEADER_MAGIC1
        header = struct.pack(HEADER_FORMAT,
                             magic, self.cryptkeys,
                             ord(self.type), self.id, self.inum,
                             len(self.data), len(link_ids), len(link_locs))
        return header + self.data + link_ids + link_locs
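# An illustrative round trip through serialize()/parse_item(); the values used
# here are arbitrary test data, not anything the cleaner itself relies on:
#
#     item = LogItem()
#     item.id = LogItem.random_id()
#     item.inum = 0
#     item.type = ITEM_TYPE.DATA
#     item.data = "example"
#     item.links = []
#     raw = item.serialize()
#     assert parse_item(raw).data == "example"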
class LogSegment:
    def __init__(self, backend, location):
        self.backend = backend
        self.location = location
        self.data = []

    def __len__(self):
        return sum(len(s) for s in self.data)

    def write(self, item):
        data = item.serialize()
        offset = len(self)
        self.data.append(data)
        item.location = self.location + (offset, len(data))

    def close(self):
        data = ''.join(self.data)
        filename = self.backend.loc_to_name(self.location)
        print "Writing %d bytes of data to %s" % (len(data), filename)
        self.backend.write(filename, data)
class LogDirectory:
    TARGET_SIZE = 4 << 20       # Size, in bytes, at which a segment is closed

    def __init__(self, backend, dir):
        self.backend = backend
        self.dir_num = dir
        self.seq_num = 0
        for logname in backend.list(dir):
            print "Old log file:", logname
            loc = backend.name_to_loc(logname[0])
            if loc is not None and loc[0] == dir:
                self.seq_num = max(self.seq_num, loc[1] + 1)
        self.groups = {}
        print "Starting sequence number is", self.seq_num

    def open_segment(self):
        seg = LogSegment(self.backend, (self.dir_num, self.seq_num))
        self.seq_num += 1
        return seg
    def write(self, item, segment_group=0):
        if segment_group not in self.groups:
            self.groups[segment_group] = self.open_segment()
        seg = self.groups[segment_group]
        seg.write(item)
        if len(seg) >= LogDirectory.TARGET_SIZE:
            seg.close()
            del self.groups[segment_group]

    def close_all(self):
        for k in list(self.groups.keys()):
            self.groups[k].close()
            del self.groups[k]
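# A LogDirectory hands out fresh segments within a single log directory and
# lets callers split items across "segment groups"; the cleaner uses separate
# groups for rewritten data blocks, inodes, and inode map/checkpoint items
# (see rewrite_inode() and InodeMap.write() below) so that each class of item
# tends to land in its own segments.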
class UtilizationTracker:
    """A simple object that tracks what fraction of each segment is used.

    This data can be used to guide segment cleaning decisions."""

    def __init__(self, backend):
        # Maps segment name -> [total segment size, bytes referenced by live items]
        self.segments = {}
        for (segment, size) in backend.list(0) + backend.list(1):
            self.segments[segment] = [size, 0]
    def add_item(self, item):
        if isinstance(item, LogItem):
            item = item.location
        if item is None: return
        (dir, seq, offset, size) = item
        filename = "log-%08d-%08d" % (dir, seq)
        self.segments[filename][1] += size
def parse_item(data):
    if len(data) < HEADER_SIZE: return
    header = struct.unpack_from(HEADER_FORMAT, data, 0)
    size = HEADER_SIZE + sum(header[5:8])

    if header[0] not in (HEADER_MAGIC1, HEADER_MAGIC2):
        print "Bad header magic!"
        return

    if len(data) != size:
        print "Item size does not match: %d != %d" % (size, len(data))
        return

    item = LogItem()
    if header[0] == HEADER_MAGIC2: item.encrypted = True
    item.cryptkeys = header[1]
    item.id = header[3]
    item.inum = header[4]
    item.location = None
    item.type = chr(header[2])
    item.size = size
    item.data = data[HEADER_SIZE : HEADER_SIZE + header[5]]
    links = []
    link_ids = data[HEADER_SIZE + header[5]
                    : HEADER_SIZE + header[5] + header[6]]
    link_locs = data[HEADER_SIZE + header[5] + header[6]
                     : HEADER_SIZE + sum(header[5:8])]
    for i in range(len(link_ids) // 16):
        id = link_ids[16*i : 16*i + 16]
        if id == '\0' * 16:
            loc = None
        else:
            loc = struct.unpack('<IIII', link_locs[0:16])
            link_locs = link_locs[16:]
        links.append((id, loc))
    item.links = links
    return item
def load_item(backend, location):
    """Load the cloud item pointed at by the 4-tuple 'location'.

    The elements of the tuple are (directory, sequence, offset, size)."""

    filename = backend.loc_to_name((location[0], location[1]))
    data = backend.read(filename, location[2], location[3])
    item = parse_item(data)
    item.location = location
    return item
def parse_log(data, location=None):
    """Parse contents of a log file, yielding a sequence of log items."""

    if isinstance(location, str):
        m = re.match(r"^log-(\d+)-(\d+)$", location)
        if m:
            location = (int(m.group(1)), int(m.group(2)))
        else:
            location = None

    offset = 0
    while len(data) - offset >= HEADER_SIZE:
        header = struct.unpack_from(HEADER_FORMAT, data, offset)
        size = HEADER_SIZE + sum(header[5:8])
        if header[0] not in (HEADER_MAGIC1, HEADER_MAGIC2):
            print "Bad header magic!"
            break
        if size + offset > len(data):
            print "Short data record at end of log: %s < %s" % (len(data) - offset, size)
            break
        item = parse_item(data[offset : offset + size])
        if location is not None and item is not None:
            item.location = location + (offset, size)
        if item is not None: yield item
        offset += size
def load_checkpoint_record(backend, directory=0):
    for (log, size) in reversed(backend.list(directory)):
        for item in reversed(list(parse_log(backend.read(log), log))):
            if item.type == ITEM_TYPE.CHECKPOINT:
                return item
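# Layout of a checkpoint record's data, as parsed by InodeMap.build() below:
# CHECKPOINT_MAGIC, then a '<I' count of version-vector entries, then that many
# '<II' (directory, sequence) pairs, then a series of '<QQ' (start, end) inode
# number ranges.  Each range corresponds, in order, to one link in the
# checkpoint item, pointing at the inode map block that covers those inodes.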
class InodeMap:
    def build(self, backend, checkpoint_record):
        """Reconstruct the inode map from the checkpoint record given.

        This will also build up information about segment utilization."""

        self.version_vector = {}
        self.checkpoint_record = checkpoint_record

        util = UtilizationTracker(backend)
        util.add_item(checkpoint_record)
        inodes = {}
        self.obsolete_segments = set()

        data = checkpoint_record.data
        if not data.startswith(CHECKPOINT_MAGIC):
            raise ValueError("Invalid checkpoint record!")
        data = data[len(CHECKPOINT_MAGIC):]
        (vvlen,) = struct.unpack_from("<I", data, 0)
        self.vvsize = 4 + 8*vvlen
        for i in range(vvlen):
            (v1, v2) = struct.unpack_from("<II", data, 4 + 8*i)
            self.version_vector[v1] = v2
        print self.version_vector
        self.version_vector[checkpoint_record.location[0]] \
            = checkpoint_record.location[1]
        print self.version_vector

        data = data[self.vvsize:]

        for i in range(len(data) // 16):
            (start, end) = struct.unpack_from("<QQ", data, 16*i)
            imap = load_item(backend, checkpoint_record.links[i][1])
            util.add_item(imap)
            print "[%d, %d]: %s" % (start, end, imap)
            for j in range(len(imap.data) // 8):
                (inum,) = struct.unpack_from("<Q", imap.data, 8*j)
                inode = load_item(backend, imap.links[j][1])
                inodes[inum] = inode
                data_segments = set()
                util.add_item(inode)
                for i in inode.links:
                    util.add_item(i[1])
                    data_segments.add(i[1][0:2])
                print " %d: %s (%d data segments)" % (inum, inode, len(data_segments))

        print
        print "Segment utilizations:"
        for (s, u) in sorted(util.segments.items()):
            print "%s: %s %s" % (s, u, float(u[1]) / u[0])
            # print "Deleting..."

        self.inodes = inodes
        self.util = util

        self.updated_inodes = set()
    def mark_updated(self, inum):
        self.updated_inodes.add(inum)
    def write(self, backend, log):
        updated_inodes = sorted(self.updated_inodes, reverse=True)

        new_checkpoint = LogItem()
        new_checkpoint.id = LogItem.random_id()
        new_checkpoint.inum = 0
        new_checkpoint.type = ITEM_TYPE.CHECKPOINT
        new_checkpoint.data = CHECKPOINT_MAGIC
        new_checkpoint.links = []

        new_checkpoint.data += struct.pack('<I', len(self.version_vector))
        for d in sorted(self.version_vector):
            new_checkpoint.data += struct.pack('<II', d, self.version_vector[d])

        data = self.checkpoint_record.data[self.vvsize + len(CHECKPOINT_MAGIC):]
        for i in range(len(data) // 16):
            (start, end) = struct.unpack_from("<QQ", data, 16*i)

            new_checkpoint.data += data[16*i : 16*i + 16]

            # Case 1: No inodes in this range of the old inode map have
            # changed.  Simply emit a new pointer to the same inode map block.
            if len(updated_inodes) == 0 or updated_inodes[-1] > end:
                old_location = self.checkpoint_record.links[i][1][0:2]
                if old_location not in self.obsolete_segments:
                    new_checkpoint.links.append(self.checkpoint_record.links[i])
                    continue

            # Case 2: Some inodes have been updated.  Create a new inode map
            # block, write it out, and point the new checkpoint at it.
            inodes = [k for k in self.inodes if k >= start and k <= end]
            inodes.sort()

            block = LogItem()
            block.id = LogItem.random_id()
            block.inum = 0
            block.type = ITEM_TYPE.INODE_MAP
            block.links = []
            block.data = ""
            for j in inodes:
                block.data += struct.pack("<Q", j)
                block.links.append((self.inodes[j].id, self.inodes[j].location))
            log.write(block, 2)

            new_checkpoint.links.append((block.id, block.location))

            while len(updated_inodes) > 0 and updated_inodes[-1] <= end:
                updated_inodes.pop()

        log.write(new_checkpoint, 2)
        self.checkpoint_record = new_checkpoint
def rewrite_inode(backend, inode_map, inum, log, copy_data=True):
    inode = inode_map.inodes[inum]
    if copy_data:
        blocks = []
        for l in inode.links:
            data = load_item(backend, l[1])
            blocks.append(data)
            log.write(data, 0)
        inode.links = [(b.id, b.location) for b in blocks]
    log.write(inode, 1)
    inode_map.mark_updated(inum)
def run_cleaner(backend, inode_map, log, repack_inodes=False):
    # Determine which segments are poorly utilized and should be cleaned.  We
    # need better heuristics here.
    for (s, u) in sorted(inode_map.util.segments.items()):
        if (float(u[1]) / u[0] < 0.6) and u[1] > 0:
            print "Should clean segment", s
            loc = backend.name_to_loc(s)
            if loc is not None: inode_map.obsolete_segments.add(loc)

    # TODO: We probably also want heuristics that will find inodes with
    # badly-fragmented data and rewrite that to achieve better locality.

    # Given that list of segments to clean, scan through those segments to find
    # data which is still live and mark relevant inodes as needing to be
    # rewritten.
    if repack_inodes:
        dirty_inodes = set(inode_map.inodes)
    else:
        dirty_inodes = set()
    dirty_inode_data = set()
    for s in inode_map.obsolete_segments:
        filename = backend.loc_to_name(s)
        print "Scanning", filename, "for live data"
        for item in parse_log(backend.read(filename), filename):
            if item.type in (ITEM_TYPE.DATA, ITEM_TYPE.INODE):
                if item.inum in inode_map.inodes:
                    inode = inode_map.inodes[item.inum]
                    if s == inode.location[0:2]:
                        dirty_inodes.add(item.inum)
                    if item.inum not in dirty_inode_data:
                        for b in inode.links:
                            if s == b[1][0:2]:
                                dirty_inode_data.add(item.inum)

    print "Inodes to rewrite:", dirty_inodes
    print "Inodes with data to rewrite:", dirty_inode_data
    for i in sorted(dirty_inodes.union(dirty_inode_data)):
        rewrite_inode(backend, inode_map, i, log, i in dirty_inode_data)
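# Driver: load the most recent checkpoint from log directory 0, rebuild the
# inode map and segment utilization statistics from it, run the cleaner
# (writing rewritten items into log directory 1), and finally write out an
# updated inode map and checkpoint.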
if __name__ == '__main__':
    backend = S3Backend("mvrable-bluesky", cachedir=".")
    #backend = FileBackend(".")
    chkpt = load_checkpoint_record(backend)
    imap = InodeMap()
    imap.build(backend, chkpt)

    log_dir = LogDirectory(backend, 1)
    run_cleaner(backend, imap, log_dir)
    print "Version vector:", imap.version_vector
    imap.write(backend, log_dir)
    log_dir.close_all()