3 # A simple cleaner for the BlueSky cloud file system. At the moment this is an
4 # offline cleaner--the file system must not be in use at the time that the
5 # cleaning is performed. Later, it will be extended to be an online/concurrent
6 # cleaner, where cleaning is performed even while the file system is mounted.
8 # Copyright (C) 2010 The Regents of the University of California
9 # Written by Michael Vrable <mvrable@cs.ucsd.edu>
11 # Redistribution and use in source and binary forms, with or without
12 # modification, are permitted provided that the following conditions
14 # 1. Redistributions of source code must retain the above copyright
15 # notice, this list of conditions and the following disclaimer.
16 # 2. Redistributions in binary form must reproduce the above copyright
17 # notice, this list of conditions and the following disclaimer in the
18 # documentation and/or other materials provided with the distribution.
19 # 3. Neither the name of the University nor the names of its contributors
20 # may be used to endorse or promote products derived from this software
21 # without specific prior written permission.
23 # THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
24 # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
25 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
26 # ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
27 # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
28 # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
29 # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
30 # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
31 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
32 # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
35 import base64, os, re, struct, sys, time
37 from boto.s3.key import Key
# The BlueSky 'struct cloudlog_header' data type.
# Little-endian layout (matches its use in parse_item/serialize):
#   4s   magic tag (HEADER_MAGIC1 or HEADER_MAGIC2)
#   48s  encryption key material (cryptkeys)
#   b    item type byte
#   16s  unique object id
#   Q    inode number
#   III  byte lengths of the data payload, link-id table, link-location table
HEADER_FORMAT = '<4s48sb16sQIII'
HEADER_CRYPTBYTES = 48
HEADER_MAGIC1 = 'AgI-'                  # Unencrypted data
HEADER_MAGIC2 = 'AgI='                  # Encrypted data
HEADER_SIZE = struct.calcsize(HEADER_FORMAT)

# Magic prefix identifying the payload of a checkpoint record.
CHECKPOINT_MAGIC = struct.pack('<Q', 0x7ad7dafb42a498b4)
# Log file to write benchmark data to
def benchlog_write(msg, *args):
    """Write one printf-style formatted line to the benchmark log.

    A no-op when no benchmark log is open.  `benchlog` is a module global
    that is only assigned when this file runs as a script, so look it up
    defensively instead of risking a NameError when the module is imported.
    """
    log = globals().get('benchlog')
    if log is not None:
        # Newline-terminate so each statistic lands on its own log line.
        log.write((msg % args) + "\n")
64 """Base class for BlueSky storage backends."""
66 def loc_to_name(self, location):
67 return "log-%08d-%08d" % (location)
69 def name_to_loc(self, name):
70 m = re.match(r"^log-(\d+)-(\d+)$", name)
71 if m: return (int(m.group(1)), int(m.group(2)))
class FileBackend(Backend):
    """An interface to BlueSky where the log segments are on local disk.

    This is mainly intended for testing purposes, as the real cleaner would
    operate where data is being stored in S3."""

    def __init__(self, path):
        # Directory holding the log-NNNNNNNN-NNNNNNNN segment files.
        self.path = path

    def list(self, directory=0):
        """Return a listing of all log segments and their sizes.

        Returns a sorted list of (filename, size-in-bytes) pairs for the
        given log directory number; sorting keeps the ordering consistent
        with the S3 backend's key-ordered listing."""
        prefix = "log-%08d-" % (directory,)
        files = sorted(f for f in os.listdir(self.path)
                       if f.startswith(prefix))
        return [(f, os.stat(os.path.join(self.path, f)).st_size)
                for f in files]

    def read(self, filename, offset=0, length=None):
        """Return bytes from a segment file.

        offset/length select a byte range; length=None reads to EOF.
        Uses a context manager so the descriptor is closed even on error
        (the previous version leaked the file handle and ignored offset)."""
        with open(os.path.join(self.path, filename), 'rb') as fp:
            if offset > 0:
                fp.seek(offset)
            if length is None:
                return fp.read()
            return fp.read(length)

    def write(self, filename, data):
        """Write data out as a segment file, replacing any existing file."""
        with open(os.path.join(self.path, filename), 'wb') as fp:
            fp.write(data)

    def delete(self, filename):
        """Remove a segment file from disk."""
        os.unlink(os.path.join(self.path, filename))
def retry_wrap(method):
    """Decorator for backend operations: retry transient failures.

    The wrapped method is attempted up to three times; each failure is
    logged to stderr and swallowed.  A final attempt is then made outside
    the loop so that a persistent failure propagates to the caller.
    (The visible original had lost its try/except structure, making the
    retry prints unreachable; also narrow the catch to Exception so
    KeyboardInterrupt/SystemExit still abort.)
    """
    def wrapped(self, *args, **kwargs):
        for retries in range(3):
            try:
                return method(self, *args, **kwargs)
            except Exception:
                sys.stderr.write("S3 operation failed, retrying...\n")
                sys.stderr.write("  %s %s %s\n" % (method, args, kwargs))
        # Last chance: let any exception escape to the caller.
        return method(self, *args, **kwargs)
    return wrapped
class S3Backend(Backend):
    """An interface to BlueSky where the log segments are on in Amazon S3."""

    def __init__(self, bucket, path='', cachedir="."):
        # Name of the S3 bucket holding the log segments.
        self.bucket_name = bucket
        # Local directory caching downloaded segments between runs.
        self.cachedir = cachedir
        # NOTE(review): the cache dict (self.cache) and self.path are
        # assigned on lines not visible here; this loop presumably seeds
        # the cache from files already present in cachedir -- confirm.
        for f in os.listdir(cachedir):
        #print "Initial cache contents:", list(self.cache.keys())
        # [operation count, byte count] for GET and PUT traffic,
        # reported by dump_stats at the end of the run.
        self.stats_get = [0, 0]
        self.stats_put = [0, 0]
        # NOTE(review): is_secure=False disables TLS -- confirm intentional.
        self.conn = boto.connect_s3(is_secure=False)
        self.bucket = self.conn.get_bucket(self.bucket_name)

    def list(self, directory=0):
        """Return (key, size) pairs for all log segments in `directory`."""
        prefix = "log-%08d-" % (directory,)
        # NOTE(review): `files` is initialized (and returned) on lines not
        # visible here.
        for k in self.bucket.list(self.path + prefix):
            files.append((k.key, k.size))

    def read(self, filename, offset=0, length=None):
        """Read (part of) a segment, preferring the local cache over S3."""
        if filename in self.cache:
            fp = open(os.path.join(self.cachedir, filename), 'rb')
            # NOTE(review): the seek-to-offset appears on a line not
            # visible here; as shown, `offset` would be ignored.
            return fp.read(length)
        # Cache miss: download the whole object and populate the cache.
        # NOTE(review): `k` (a boto Key) is created on a line not visible.
        k.key = self.path + filename
        data = k.get_contents_as_string()
        fp = open(os.path.join(self.cachedir, filename), 'wb')
        self.cache[filename] = True
        self.stats_get[0] += 1
        self.stats_get[1] += len(data)
        # Trim to the requested range; the full object stays cached.
        if length is not None:
            data = data[0:length]

    def write(self, filename, data):
        """Upload a segment to S3 and drop any stale cached copy."""
        # NOTE(review): `k` (a boto Key) is created on a line not visible.
        k.key = self.path + filename
        k.set_contents_from_string(data)
        self.stats_put[0] += 1
        self.stats_put[1] += len(data)
        if filename in self.cache:
            del self.cache[filename]

    def delete(self, filename):
        """Delete a segment from S3 and from the local cache."""
        # NOTE(review): Key creation and the actual S3 delete call are on
        # lines not visible here.
        k.key = self.path + filename
        if filename in self.cache:
            del self.cache[filename]

    def dump_stats(self):
        """Print GET/PUT operation/byte counts; mirror bytes to benchlog."""
        print "S3 statistics:"
        print "GET: %d ops / %d bytes" % tuple(self.stats_get)
        print "PUT: %d ops / %d bytes" % tuple(self.stats_put)
        benchlog_write("s3_get: %d", self.stats_get[1])
        benchlog_write("s3_put: %d", self.stats_put[1])
class SimpleBackend(Backend):
    """An interface to the simple BlueSky test network server."""

    def __init__(self, server=('localhost', 12345), cachedir="."):
        # NOTE(review): `bucket` is not a parameter of this constructor --
        # this line raises NameError at runtime; it looks copy-pasted from
        # S3Backend.__init__ and should probably be removed.
        self.bucket_name = bucket
        # (host, port) of the test server.
        self.server_address = server
        # Local directory caching downloaded segments.
        self.cachedir = cachedir

    def _get_socket(self):
        # One fresh TCP connection per request, wrapped as a file object.
        return socket.create_connection(self.server_address).makefile()

    def list(self, directory=0):
        """Return (name, size) pairs for all log segments in `directory`."""
        prefix = "log-%08d-" % (directory,)
        # NOTE(review): self.bucket/self.path do not exist on this class
        # (again copied from S3Backend); a LIST request over the socket is
        # presumably intended -- confirm against the server protocol.
        for k in self.bucket.list(self.path + prefix):
            files.append((k.key, k.size))

    def read(self, filename, offset=0, length=None):
        """Read (part of) a segment, preferring the local cache."""
        if filename in self.cache:
            fp = open(os.path.join(self.cachedir, filename), 'rb')
            return fp.read(length)
        # Cache miss: fetch the whole object from the server.
        f = self._get_socket()
        f.write("GET %s %d %d\n" % (filename, 0, 0))
        datalen = int(f.readline())
        data = f.read(datalen)
        fp = open(os.path.join(self.cachedir, filename), 'wb')
        self.cache[filename] = True
        # Trim to the requested range; the full object stays cached.
        if length is not None:
            data = data[0:length]

    def write(self, filename, data):
        """Upload a segment to the test server."""
        f = self._get_socket()
        # NOTE(review): format string has three placeholders but only two
        # values -- this raises TypeError; an offset/size argument appears
        # to be missing.
        f.write("PUT %s %d %d\n" % (filename, len(data)))
        result = int(f.readline())
        if filename in self.cache:
            del self.cache[filename]

    def delete(self, filename):
        # NOTE(review): method body not visible here.
    """In-memory representation of a single item stored in a log file."""

    # --- fragment of __init__ (its def line is not visible here) ---
    # Encryption metadata defaults: blank key material, plaintext item.
    self.cryptkeys = '\0' * HEADER_CRYPTBYTES
    self.encrypted = False

    # --- fragment of the str/repr method (def line not visible) ---
    # '$' marks encrypted items; the id is shown as truncated lowercase hex.
    return "<Item%s ty=%s location=%s size=%d id=%s...>" % (self.encrypted and '$' or '', self.type, self.location, self.size, base64.b16encode(self.id).lower()[0:8])

    # --- fragment of random_id (def line not visible) ---
    # 16 random bytes used as a unique object identifier.
    # NOTE(review): the /dev/urandom file handle is never closed.
    return open('/dev/urandom').read(16)

    # --- fragment of serialize (def line not visible) ---
    # Build the link tables: 16-byte ids and <IIII-packed
    # (dir, seq, offset, size) location tuples, concatenated.
    for (i, l) in self.links:
        link_locs.append(struct.pack('<IIII', *l))
    link_ids = ''.join(link_ids)
    link_locs = ''.join(link_locs)

    # Choose the magic tag matching the encryption state of the item.
    # NOTE(review): these two assignments were presumably on if/else
    # branches whose lines are not visible here.
    magic = HEADER_MAGIC2
    magic = HEADER_MAGIC1
    # Fixed-size cloudlog header, then payload, then the link tables.
    header = struct.pack(HEADER_FORMAT,
                         magic, self.cryptkeys,
                         ord(self.type), self.id, self.inum,
                         len(self.data), len(link_ids), len(link_locs))
    return header + self.data + link_ids + link_locs
    def __init__(self, backend, location):
        # Backend the finished segment will be written through, and the
        # (directory, sequence) pair naming this segment.
        self.backend = backend
        self.location = location

    # --- fragment of __len__ (its def line is not visible here) ---
    # Total bytes buffered so far across all serialized items.
    return sum(len(s) for s in self.data)

    def write(self, item):
        """Append a serialized item to this segment's buffer and record
        the item's new (dir, seq, offset, size) location."""
        data = item.serialize()
        # NOTE(review): `offset` is computed on a line not visible here
        # (presumably the current buffered length before the append).
        self.data.append(data)
        item.location = self.location + (offset, len(data))

    # --- fragment of close (its def line is not visible here) ---
    # Flush the buffered items as one segment file via the backend.
    data = ''.join(self.data)
    filename = self.backend.loc_to_name(self.location)
    print "Would write %d bytes of data to %s" % (len(data), filename)
    self.backend.write(filename, data)
    # Rotate to a new segment once the current one reaches 4 MiB.
    TARGET_SIZE = 4 << 20

    def __init__(self, backend, dir):
        self.backend = backend
        # Scan existing segments so new ones continue the sequence numbers.
        # NOTE(review): self.dir_num/self.seq_num/self.groups appear to be
        # initialized on lines not visible here.
        for logname in backend.list(dir):
            #print "Old log file:", logname
            loc = backend.name_to_loc(logname[0])
            if loc is not None and loc[0] == dir:
                self.seq_num = max(self.seq_num, loc[1] + 1)
        print "Starting sequence number is", self.seq_num

    def open_segment(self):
        # NOTE(review): the sequence-number increment and return appear on
        # lines not visible here.
        seg = LogSegment(self.backend, (self.dir_num, self.seq_num))

    def write(self, item, segment_group=0):
        """Write an item into the open segment of the given group,
        rotating the segment when it exceeds TARGET_SIZE."""
        if segment_group not in self.groups:
            self.groups[segment_group] = self.open_segment()
        seg = self.groups[segment_group]
        # NOTE(review): the seg.write(item) call and the segment close on
        # rotation appear on lines not visible here.
        if len(seg) >= LogDirectory.TARGET_SIZE:
            del self.groups[segment_group]

    # --- fragment of close (its def line is not visible here) ---
    # Flush and drop every open segment group.
    for k in list(self.groups.keys()):
        self.groups[k].close()
class UtilizationTracker:
    """A simple object that tracks what fraction of each segment is used.

    This data can be used to guide segment cleaning decisions."""

    def __init__(self, backend):
        # Map: segment filename -> [total size, bytes known to be live].
        # NOTE(review): the self.segments dict itself is initialized on a
        # line not visible here.  Directories 0 (proxy) and 1 (cleaner)
        # are both scanned.
        for (segment, size) in backend.list(0) + backend.list(1):
            self.segments[segment] = [size, 0]

    def add_item(self, item):
        """Credit an item's size to the segment that stores it.

        Accepts either a LogItem or a raw (dir, seq, offset, size) tuple."""
        # NOTE(review): the branch body is not visible here -- presumably
        # it replaces `item` with item.location; confirm upstream.
        if isinstance(item, LogItem):
        if item is None: return
        (dir, seq, offset, size) = item
        filename = "log-%08d-%08d" % (dir, seq)
        self.segments[filename][1] += size
def parse_item(data):
    """Decode one serialized log item from a byte string.

    Expects exactly one item: a cloudlog header followed by the payload
    and the link-id/link-location tables.  Validation failures are
    reported on stdout."""
    if len(data) < HEADER_SIZE: return
    header = struct.unpack_from(HEADER_FORMAT, data, 0)
    # Total item size = fixed header + payload + both link tables.
    size = HEADER_SIZE + sum(header[5:8])

    if header[0] not in (HEADER_MAGIC1, HEADER_MAGIC2):
        print "Bad header magic!"

    if len(data) != size:
        print "Item size does not match: %d != %d" % (size, len(data))

    # NOTE(review): construction of the LogItem (and the early returns for
    # the failure cases above) appear on lines not visible here.
    if header[0] == HEADER_MAGIC2: item.encrypted = True
    item.cryptkeys = header[1]
    item.inum = header[4]
    item.type = chr(header[2])

    # Slice payload and link tables out of the buffer using the lengths
    # carried in the header.
    item.data = data[HEADER_SIZE : HEADER_SIZE + header[5]]
    link_ids = data[HEADER_SIZE + header[5]
                    : HEADER_SIZE + header[5] + header[6]]
    link_locs = data[HEADER_SIZE + header[5] + header[6]
                     : HEADER_SIZE + sum(header[5:8])]
    # Each link is a 16-byte id plus a <IIII (dir, seq, offset, size)
    # location; consume link_locs 16 bytes at a time in step.
    for i in range(len(link_ids) // 16):
        id = link_ids[16*i : 16*i + 16]
        loc = struct.unpack('<IIII', link_locs[0:16])
        link_locs = link_locs[16:]
        links.append((id, loc))
def load_item(backend, location):
    """Load the cloud item pointed at by the 4-tuple 'location'.

    The elements of the tuple are (directory, sequence, offset, size).
    Returns the parsed LogItem with its .location filled in; callers
    (e.g. InodeMap.build and rewrite_inode) rely on the return value."""
    filename = backend.loc_to_name((location[0], location[1]))
    data = backend.read(filename, location[2], location[3])
    item = parse_item(data)
    item.location = location
    return item
def parse_log(data, location=None):
    """Parse contents of a log file, yielding a sequence of log items.

    `location` may be a (dir, seq) tuple or the segment's file name; it is
    used to stamp each yielded item with a full 4-tuple location."""
    # Accept a file name and convert it to a (dir, seq) pair.
    if isinstance(location, str):
        m = re.match(r"^log-(\d+)-(\d+)$", location)
        # NOTE(review): the match-success guard around this line is not
        # visible here; as shown, a non-matching name would raise.
        location = (int(m.group(1)), int(m.group(2)))

    # NOTE(review): `offset` initialization and its per-item advance are
    # on lines not visible here.
    while len(data) - offset >= HEADER_SIZE:
        header = struct.unpack_from(HEADER_FORMAT, data, offset)
        # Total item size = header + payload + both link tables.
        size = HEADER_SIZE + sum(header[5:8])
        if header[0] not in (HEADER_MAGIC1, HEADER_MAGIC2):
            print "Bad header magic!"
        if size + offset > len(data):
            print "Short data record at end of log: %s < %s" % (len(data) - offset, size)
        item = parse_item(data[offset : offset + size])
        # Stamp the item with its absolute (dir, seq, offset, size).
        if location is not None:
            item.location = location + (offset, size)
        if item is not None: yield item
def load_checkpoint_record(backend, directory=0):
    """Return the most recent checkpoint item in the given log directory.

    Segments are scanned newest-first, and within each segment the items
    are scanned last-first, so the first checkpoint found is the latest
    one written.  Returns None when no checkpoint exists."""
    for (log, size) in reversed(backend.list(directory)):
        for item in reversed(list(parse_log(backend.read(log), log))):
            if item.type == ITEM_TYPE.CHECKPOINT:
                # Caller (the __main__ driver) consumes this return value.
                return item
    def build(self, backend, checkpoint_record):
        """Reconstruct the inode map from the checkpoint record given.

        This will also build up information about segment utilization."""
        self.version_vector = {}
        self.checkpoint_record = checkpoint_record

        util = UtilizationTracker(backend)
        util.add_item(checkpoint_record)
        self.obsolete_segments = set()

        # Checkpoint payload: CHECKPOINT_MAGIC, then the version vector,
        # then (start, end) inode ranges matching the checkpoint's links.
        data = checkpoint_record.data
        if not data.startswith(CHECKPOINT_MAGIC):
            raise ValueError, "Invalid checkpoint record!"
        data = data[len(CHECKPOINT_MAGIC):]
        # Version vector: a count, then (directory, sequence) pairs.
        (vvlen,) = struct.unpack_from("<I", data, 0)
        self.vvsize = 4 + 8*vvlen
        for i in range(vvlen):
            (v1, v2) = struct.unpack_from("<II", data, 4 + 8*i)
            self.version_vector[v1] = v2
        print self.version_vector
        # The checkpoint itself is the newest item in its own directory.
        self.version_vector[checkpoint_record.location[0]] \
            = checkpoint_record.location[1]
        print self.version_vector

        data = data[self.vvsize:]

        # Walk the inode-map blocks: each 16-byte entry is an inclusive
        # (start, end) inode-number range, paired with a link to the block.
        for i in range(len(data) // 16):
            (start, end) = struct.unpack_from("<QQ", data, 16*i)
            imap = load_item(backend, checkpoint_record.links[i][1])
            #print "[%d, %d]: %s" % (start, end, imap)
            # Each inode-map block holds 8-byte inode numbers, with links
            # to the inode items themselves.
            for j in range(len(imap.data) // 8):
                (inum,) = struct.unpack_from("<Q", imap.data, 8*j)
                inode = load_item(backend, imap.links[j][1])

                # Track which distinct segments hold this inode's data.
                # NOTE(review): this inner loop reuses the name `i`,
                # shadowing the outer loop variable.
                data_segments = set()
                for i in inode.links:
                    data_segments.add(i[1][0:2])
                #print "  %d: %s (%d data segments)" % (inum, inode, len(data_segments))

        # Report utilization and candidate deletions.
        # NOTE(review): `total_data` and `deletions` are initialized, and
        # several surrounding conditionals appear, on lines not visible
        # here; the indentation below is a best-effort reconstruction.
        print "Segment utilizations:"
        for (s, u) in sorted(util.segments.items()):
            for i in range(2): total_data[i] += u[i]
            print "%s: %s %s" % (s, u, float(u[1]) / u[0])
            print "Would delete..."
            (d, n) = backend.name_to_loc(s)
            # A segment newer than the checkpoint may hold live data the
            # checkpoint does not yet reference -- keep it.
            if n < self.version_vector[d]:
                print "Not deleting log file newer than checkpoint!"
                print "Error determining age of log segment, keeping"

        self.updated_inodes = set()

        print "%d bytes total / %d bytes used" % tuple(total_data)
        print "would delete %d segments (%d bytes)" % tuple(deletions)
        benchlog_write("bytes_used: %d", total_data[1])
        benchlog_write("bytes_wasted: %d", total_data[0] - total_data[1])
        benchlog_write("bytes_freed: %d", deletions[1])
530 def mark_updated(self, inum):
531 self.updated_inodes.add(inum)
    def write(self, backend, log):
        """Write out a new checkpoint (and any changed inode-map blocks)
        reflecting the inodes recorded by mark_updated()."""
        # Reverse-sorted so the smallest pending inode is at the end and
        # can be popped cheaply as ranges are consumed in order.
        updated_inodes = sorted(self.updated_inodes, reverse=True)

        new_checkpoint = LogItem()
        new_checkpoint.id = LogItem.random_id()
        new_checkpoint.inum = 0
        new_checkpoint.type = ITEM_TYPE.CHECKPOINT
        new_checkpoint.data = CHECKPOINT_MAGIC
        new_checkpoint.links = []

        # Serialize the version vector: count, then (dir, seq) pairs.
        new_checkpoint.data += struct.pack('<I', len(self.version_vector))
        for d in sorted(self.version_vector):
            new_checkpoint.data += struct.pack('<II', d, self.version_vector[d])

        # Walk the old checkpoint's (start, end) inode ranges, reusing or
        # rewriting each inode-map block as needed.
        data = self.checkpoint_record.data[self.vvsize + len(CHECKPOINT_MAGIC):]
        for i in range(len(data) // 16):
            (start, end) = struct.unpack_from("<QQ", data, 16*i)
            new_checkpoint.data += data[16*i : 16*i + 16]

            # Case 1: No inodes in this range of the old inode map have
            # changed.  Simply emit a new pointer to the same inode map block.
            # NOTE(review): the control flow joining Case 1 and Case 2 is
            # on lines not visible here; indentation below is best-effort.
            if len(updated_inodes) == 0 or updated_inodes[-1] > end:
                old_location = self.checkpoint_record.links[i][1][0:2]
                if old_location not in self.obsolete_segments:
                    new_checkpoint.links.append(self.checkpoint_record.links[i])

            # Case 2: Some inodes have been updated.  Create a new inode map
            # block, write it out, and point the new checkpoint at it.
            # NOTE(review): construction of `block` and the loop over `j`
            # are on lines not visible here.
            inodes = [k for k in self.inodes if k >= start and k <= end]
            block.id = LogItem.random_id()
            block.type = ITEM_TYPE.INODE_MAP
            block.data += struct.pack("<Q", j)
            block.links.append((self.inodes[j].id, self.inodes[j].location))
            new_checkpoint.links.append((block.id, block.location))

            # Drop the pending inodes consumed by this range.
            while len(updated_inodes) > 0 and updated_inodes[-1] <= end:

        # Segment group 2 is used for checkpoint data.
        log.write(new_checkpoint, 2)
        self.checkpoint_record = new_checkpoint
def rewrite_inode(backend, inode_map, inum, log, copy_data=True):
    """Copy inode `inum` (and, when copy_data, its data blocks) into the
    cleaner's log so the copies in obsolete segments become garbage."""
    inode = inode_map.inodes[inum]
    # NOTE(review): `newlinks` initialization and the `copy_data` guard
    # around this loop appear on lines not visible here.
    for l in inode.links:
        # Reload each linked block, rewrite it through the log, and record
        # its new (id, location) pair.
        data = load_item(backend, l[1])
        newlinks.append((data.id, data.location))
    inode.links = newlinks
    # The inode itself now points at new locations, so it must be included
    # in the next checkpoint.
    inode_map.mark_updated(inum)
def run_cleaner(backend, inode_map, log, repack_inodes=False):
    """One cleaning pass: choose under-utilized segments and rewrite the
    inodes (and any of their data) still live in them."""
    # Determine which segments are poorly utilized and should be cleaned.  We
    # need better heuristics here.
    for (s, u) in sorted(inode_map.util.segments.items()):
        # Clean segments under 60% utilization that still hold live data.
        if (float(u[1]) / u[0] < 0.6) and u[1] > 0:
            print "Should clean segment", s
            loc = backend.name_to_loc(s)
            # NOTE(review): `s` is a non-empty string, so this condition is
            # always true; `if loc is not None:` looks intended, since
            # name_to_loc returns None for unparseable names.
            if s: inode_map.obsolete_segments.add(loc)

    # TODO: We probably also want heuristics that will find inodes with
    # badly-fragmented data and rewrite that to achieve better locality.

    # Given that list of segments to clean, scan through those segments to find
    # data which is still live and mark relevant inodes as needing to be
    # NOTE(review): this assignment appears to belong under a
    # `if repack_inodes:` guard on lines not visible here.
    dirty_inodes = set(inode_map.inodes)
    dirty_inode_data = set()
    for s in inode_map.obsolete_segments:
        filename = backend.loc_to_name(s)
        #print "Scanning", filename, "for live data"
        for item in parse_log(backend.read(filename), filename):
            if item.type in (ITEM_TYPE.DATA, ITEM_TYPE.INODE):
                inode = inode_map.inodes[item.inum]
                # The inode record itself lives here: must be rewritten.
                if s == inode.location[0:2]:
                    dirty_inodes.add(item.inum)
                # Any data block in this segment forces a data rewrite too.
                if item.inum not in dirty_inode_data:
                    for b in inode.links:
                        if b[1] is not None and s == b[1][0:2]:
                            dirty_inode_data.add(item.inum)

    #print "Inodes to rewrite:", dirty_inodes
    #print "Inodes with data to rewrite:", dirty_inode_data
    for i in sorted(dirty_inodes.union(dirty_inode_data)):
        rewrite_inode(backend, inode_map, i, log, i in dirty_inode_data)
if __name__ == '__main__':
    # Append mode so successive runs accumulate in one benchmark log.
    benchlog = open('cleaner.log', 'a')
    benchlog_write("*** START CLEANER RUN ***")
    start_time = time.time()
    backend = S3Backend("mvrable-bluesky-west", cachedir="/tmp/bluesky-cache")
    #backend = FileBackend(".")
    chkpt = load_checkpoint_record(backend)
    #print backend.list()
    # Directory 1 appears to be the cleaner's own log directory (the
    # version-vector check below compares key 1 against it) -- confirm.
    log_dir = LogDirectory(backend, 1)
    # NOTE(review): `imap` is constructed on a line not visible here
    # (presumably imap = InodeMap()).
    imap.build(backend, chkpt)

    print "Version vector:", imap.version_vector
    print "Last cleaner log file:", log_dir.seq_num - 1
    # Only clean once the proxy has incorporated the cleaner's most recent
    # segment; otherwise the two could conflict.
    # NOTE(review): this branch appears truncated -- presumably the script
    # waits or exits here on lines not visible.
    if imap.version_vector.get(1, -1) != log_dir.seq_num - 1:
        print "Proxy hasn't updated to latest cleaner segment yet!"
        benchlog_write("waiting for proxy...")
    run_cleaner(backend, imap, log_dir)
    print "Version vector:", imap.version_vector
    imap.write(backend, log_dir)
    end_time = time.time()
    benchlog_write("running_time: %s", end_time - start_time)