# Copyright (C) 2010 The Regents of the University of California
# Written by Michael Vrable <mvrable@cs.ucsd.edu>
-import base64, os, re, struct, sys
+import base64, os, re, struct, sys, time
import boto
from boto.s3.key import Key
HEADER_MAGIC2 = 'AgI=' # Encrypted data
HEADER_SIZE = struct.calcsize(HEADER_FORMAT)
+CHECKPOINT_MAGIC = struct.pack('<Q', 0x7ad7dafb42a498b4)
+
+# Log file to write benchmark data to; None disables file logging.
+benchlog = None
+
+def benchlog_write(msg, *args):
+    """Print a log message and append it to the benchmark log, if open.
+
+    msg is a %-style format string; args are its substitution values.
+    """
+    # Format once and reuse the result (the old code re-evaluated
+    # msg % args a second time for the file write).
+    m = msg % args
+    print "LOG:", m
+    if benchlog is not None:
+        benchlog.write(m)
+        benchlog.write("\n")
+
class ITEM_TYPE:
DATA = '1'
INODE = '2'
INODE_MAP = '3'
CHECKPOINT = '4'
-class FileBackend:
+class Backend:
+    """Base class for BlueSky storage backends.
+
+    Supplies the shared mapping between (directory, sequence) location
+    pairs and log-segment file names; subclasses implement actual I/O.
+    """
+
+    def loc_to_name(self, location):
+        """Render a (directory, sequence) pair as a segment file name."""
+        return "log-%08d-%08d" % (location[0], location[1])
+
+    def name_to_loc(self, name):
+        """Parse a segment file name back into (directory, sequence).
+
+        Returns None when the name is not a log segment."""
+        m = re.match(r"^log-(\d+)-(\d+)$", name)
+        if m:
+            return (int(m.group(1)), int(m.group(2)))
+
+    def dump_stats(self):
+        # Hook for subclasses that track I/O statistics; no-op by default.
+        pass
+
+class FileBackend(Backend):
"""An interface to BlueSky where the log segments are on local disk.
This is mainly intended for testing purposes, as the real cleaner would
def __init__(self, path):
self.path = path
- def list(self):
+ def list(self, directory=0):
"""Return a listing of all log segments and their sizes."""
- files = [f for f in os.listdir(self.path) if f.startswith('log-')]
+ prefix = "log-%08d-" % (directory,)
+ files = [f for f in os.listdir(self.path) if f.startswith(prefix)]
files.sort()
return [(f, os.stat(os.path.join(self.path, f)).st_size)
fp = open(os.path.join(self.path, filename), 'rb')
if offset > 0:
fp.seek(offset)
- if legnth is None:
+ if length is None:
return fp.read()
else:
return fp.read(length)
def delete(self, filename):
os.unlink(os.path.join(self.path, filename))
- def loc_to_name(self, location):
- return "log-%08d-%08d" % (location)
-
- def name_to_loc(self, name):
- m = re.match(r"^log-(\d+)-(\d+)$", name)
- if m: return (int(m.group(1)), int(m.group(2)))
-
def retry_wrap(method):
def wrapped(self, *args, **kwargs):
for retries in range(3):
return method(self, *args, **kwargs)
except:
print >>sys.stderr, "S3 operation failed, retrying..."
+ print >>sys.stderr, " %s %s %s" % (method, args, kwargs)
self.connect()
+ time.sleep(1.0)
return method(self, *args, **kwargs)
return wrapped
-class S3Backend:
+class S3Backend(Backend):
"""An interface to BlueSky where the log segments are on in Amazon S3."""
def __init__(self, bucket, path='', cachedir="."):
self.path = path
self.cachedir = cachedir
self.cache = {}
+ for f in os.listdir(cachedir):
+ self.cache[f] = True
+ #print "Initial cache contents:", list(self.cache.keys())
self.connect()
+ self.stats_get = [0, 0]
+ self.stats_put = [0, 0]
def connect(self):
self.conn = boto.connect_s3(is_secure=False)
self.bucket = self.conn.get_bucket(self.bucket_name)
- def list(self):
+ def list(self, directory=0):
files = []
- for k in self.bucket.list(self.path + 'log-'):
+ prefix = "log-%08d-" % (directory,)
+ for k in self.bucket.list(self.path + prefix):
files.append((k.key, k.size))
return files
fp.write(data)
fp.close()
self.cache[filename] = True
+ self.stats_get[0] += 1
+ self.stats_get[1] += len(data)
if offset > 0:
data = data[offset:]
if length is not None:
k = Key(self.bucket)
k.key = self.path + filename
k.set_contents_from_string(data)
+ self.stats_put[0] += 1
+ self.stats_put[1] += len(data)
if filename in self.cache:
del self.cache[filename]
if filename in self.cache:
del self.cache[filename]
- def loc_to_name(self, location):
- return "log-%08d-%08d" % (location)
+    def dump_stats(self):
+        """Print S3 operation counts and record byte totals to the benchmark log."""
+        print "S3 statistics:"
+        get_ops, get_bytes = self.stats_get
+        put_ops, put_bytes = self.stats_put
+        print "GET: %d ops / %d bytes" % (get_ops, get_bytes)
+        print "PUT: %d ops / %d bytes" % (put_ops, put_bytes)
+        benchlog_write("s3_get: %d", get_bytes)
+        benchlog_write("s3_put: %d", put_bytes)
- def name_to_loc(self, name):
- m = re.match(r"^log-(\d+)-(\d+)$", name)
- if m: return (int(m.group(1)), int(m.group(2)))
+class SimpleBackend(Backend):
+    """An interface to the simple BlueSky test network server."""
+
+    def __init__(self, server=('localhost', 12345), cachedir="."):
+        # Fixed: the previous version assigned self.bucket_name from an
+        # undefined name "bucket", raising NameError on construction.
+        self.server_address = server
+        self.cachedir = cachedir
+        self.cache = {}
+
+    def _get_socket(self):
+        # One fresh connection per request, wrapped as a buffered file.
+        return socket.create_connection(self.server_address).makefile()
+
+    def list(self, directory=0):
+        # FIXME: copy-pasted from S3Backend -- self.bucket and self.path do
+        # not exist on this class, so this raises AttributeError.  The test
+        # server presumably needs a LIST command; confirm the protocol
+        # before relying on this method.
+        files = []
+        prefix = "log-%08d-" % (directory,)
+        for k in self.bucket.list(self.path + prefix):
+            files.append((k.key, k.size))
+        return files
+
+    def read(self, filename, offset=0, length=None):
+        """Return (a slice of) a log segment, consulting the local cache first."""
+        if filename in self.cache:
+            fp = open(os.path.join(self.cachedir, filename), 'rb')
+            if offset > 0:
+                fp.seek(offset)
+            if length is None:
+                return fp.read()
+            else:
+                return fp.read(length)
+        else:
+            f = self._get_socket()
+            f.write("GET %s %d %d\n" % (filename, 0, 0))
+            f.flush()
+            datalen = int(f.readline())
+            if datalen < 0:
+                raise RuntimeError
+            data = f.read(datalen)
+            # Cache the complete object locally before slicing.
+            fp = open(os.path.join(self.cachedir, filename), 'wb')
+            fp.write(data)
+            fp.close()
+            self.cache[filename] = True
+            if offset > 0:
+                data = data[offset:]
+            if length is not None:
+                data = data[0:length]
+            return data
+
+    def write(self, filename, data):
+        """Upload a log segment to the test server and invalidate the cache."""
+        f = self._get_socket()
+        # Fixed: the format string had three conversions ("PUT %s %d %d")
+        # but only two arguments, raising TypeError on every write.
+        f.write("PUT %s %d\n" % (filename, len(data)))
+        f.write(data)
+        f.flush()
+        result = int(f.readline())
+        if filename in self.cache:
+            del self.cache[filename]
+
+    def delete(self, filename):
+        # The simple test server offers no delete operation; ignore requests.
+        pass
class LogItem:
"""In-memory representation of a single item stored in a log file."""
self.backend = backend
self.dir_num = dir
self.seq_num = 0
- for logname in backend.list():
+ for logname in backend.list(dir):
+ #print "Old log file:", logname
loc = backend.name_to_loc(logname[0])
if loc is not None and loc[0] == dir:
self.seq_num = max(self.seq_num, loc[1] + 1)
def __init__(self, backend):
self.segments = {}
- for (segment, size) in backend.list():
+ for (segment, size) in backend.list(0) + backend.list(1):
self.segments[segment] = [size, 0]
def add_item(self, item):
if item is not None: yield item
offset += size
-def load_checkpoint_record(backend):
- for (log, size) in reversed(backend.list()):
+def load_checkpoint_record(backend, directory=0):
+ for (log, size) in reversed(backend.list(directory)):
for item in reversed(list(parse_log(backend.read(log), log))):
print item
if item.type == ITEM_TYPE.CHECKPOINT:
This will also build up information about segment utilization."""
+ self.version_vector = {}
self.checkpoint_record = checkpoint_record
util = UtilizationTracker(backend)
inodes = {}
self.obsolete_segments = set()
- print "Inode map:"
- for i in range(len(checkpoint_record.data) // 16):
- (start, end) = struct.unpack_from("<QQ", checkpoint_record.data, 16*i)
+ data = checkpoint_record.data
+ if not data.startswith(CHECKPOINT_MAGIC):
+ raise ValueError, "Invalid checkpoint record!"
+ data = data[len(CHECKPOINT_MAGIC):]
+ (vvlen,) = struct.unpack_from("<I", data, 0)
+ self.vvsize = 4 + 8*vvlen
+ for i in range(vvlen):
+ (v1, v2) = struct.unpack_from("<II", data, 4 + 8*i)
+ self.version_vector[v1] = v2
+ print self.version_vector
+ self.version_vector[checkpoint_record.location[0]] \
+ = checkpoint_record.location[1]
+ print self.version_vector
+
+ data = data[self.vvsize:]
+
+ #print "Inode map:"
+ for i in range(len(data) // 16):
+ (start, end) = struct.unpack_from("<QQ", data, 16*i)
imap = load_item(backend, checkpoint_record.links[i][1])
util.add_item(imap)
- print "[%d, %d]: %s" % (start, end, imap)
+ #print "[%d, %d]: %s" % (start, end, imap)
for j in range(len(imap.data) // 8):
(inum,) = struct.unpack_from("<Q", imap.data, 8*j)
inode = load_item(backend, imap.links[j][1])
util.add_item(inode)
for i in inode.links:
util.add_item(i[1])
- data_segments.add(i[1][0:2])
- print " %d: %s (%d data segments)" % (inum, inode, len(data_segments))
+ if i[1] is not None:
+ data_segments.add(i[1][0:2])
+ #print " %d: %s (%d data segments)" % (inum, inode, len(data_segments))
- print
+ #print
print "Segment utilizations:"
+ total_data = [0, 0]
+ deletions = [0, 0]
for (s, u) in sorted(util.segments.items()):
+ for i in range(2): total_data[i] += u[i]
print "%s: %s %s" % (s, u, float(u[1]) / u[0])
if u[1] == 0:
- print "Deleting..."
- backend.delete(s)
+ print "Would delete..."
+ (d, n) = backend.name_to_loc(s)
+ try:
+ if n < self.version_vector[d]:
+ backend.delete(s)
+ deletions[0] += 1
+ deletions[1] += u[0]
+ else:
+ print "Not deleting log file newer than checkpoint!"
+ except:
+ print "Error determining age of log segment, keeping"
self.inodes = inodes
self.util = util
self.updated_inodes = set()
+ print "%d bytes total / %d bytes used" % tuple(total_data)
+ print "would delete %d segments (%d bytes)" % tuple(deletions)
+ benchlog_write("bytes_used: %d", total_data[1])
+ benchlog_write("bytes_wasted: %d", total_data[0] - total_data[1])
+ benchlog_write("bytes_freed: %d", deletions[1])
+
def mark_updated(self, inum):
self.updated_inodes.add(inum)
new_checkpoint.id = LogItem.random_id()
new_checkpoint.inum = 0
new_checkpoint.type = ITEM_TYPE.CHECKPOINT
- new_checkpoint.data = ""
+ new_checkpoint.data = CHECKPOINT_MAGIC
new_checkpoint.links = []
- for i in range(len(self.checkpoint_record.data) // 16):
- (start, end) = struct.unpack_from("<QQ", self.checkpoint_record.data, 16*i)
+ new_checkpoint.data += struct.pack('<I', len(self.version_vector))
+ for d in sorted(self.version_vector):
+ new_checkpoint.data += struct.pack('<II', d, self.version_vector[d])
+
+ data = self.checkpoint_record.data[self.vvsize + len(CHECKPOINT_MAGIC):]
+ for i in range(len(data) // 16):
+ (start, end) = struct.unpack_from("<QQ", data, 16*i)
- new_checkpoint.data += self.checkpoint_record.data[16*i : 16*i + 16]
+ new_checkpoint.data += data[16*i : 16*i + 16]
# Case 1: No inodes in this range of the old inode map have
# changed. Simply emit a new pointer to the same inode map block.
def rewrite_inode(backend, inode_map, inum, log, copy_data=True):
inode = inode_map.inodes[inum]
if copy_data:
- blocks = []
+ newlinks = []
for l in inode.links:
- data = load_item(backend, l[1])
- blocks.append(data)
- log.write(data, 0)
- inode.links = [(b.id, b.location) for b in blocks]
+ if l[1] is not None:
+ data = load_item(backend, l[1])
+ log.write(data, 0)
+ newlinks.append((data.id, data.location))
+ else:
+ newlinks.append(l)
+ inode.links = newlinks
log.write(inode, 1)
inode_map.mark_updated(inum)
# Determine which segments are poorly utilized and should be cleaned. We
# need better heuristics here.
for (s, u) in sorted(inode_map.util.segments.items()):
- if (float(u[1]) / u[0] < 0.6 or u[1] < 32768) and u[1] > 0:
+ if (float(u[1]) / u[0] < 0.6) and u[1] > 0:
print "Should clean segment", s
loc = backend.name_to_loc(s)
if s: inode_map.obsolete_segments.add(loc)
dirty_inode_data = set()
for s in inode_map.obsolete_segments:
filename = backend.loc_to_name(s)
- print "Scanning", filename, "for live data"
+ #print "Scanning", filename, "for live data"
for item in parse_log(backend.read(filename), filename):
if item.type in (ITEM_TYPE.DATA, ITEM_TYPE.INODE):
if item.inum != 0:
dirty_inodes.add(item.inum)
if item.inum not in dirty_inode_data:
for b in inode.links:
- if s == b[1][0:2]:
+ if b[1] is not None and s == b[1][0:2]:
dirty_inode_data.add(item.inum)
break
- print "Inodes to rewrite:", dirty_inodes
- print "Inodes with data to rewrite:", dirty_inode_data
+ #print "Inodes to rewrite:", dirty_inodes
+ #print "Inodes with data to rewrite:", dirty_inode_data
for i in sorted(dirty_inodes.union(dirty_inode_data)):
rewrite_inode(backend, inode_map, i, log, i in dirty_inode_data)
if __name__ == '__main__':
- backend = S3Backend("mvrable-bluesky", cachedir=".")
+ benchlog = open('cleaner.log', 'a')
+ benchlog_write("*** START CLEANER RUN ***")
+ start_time = time.time()
+ backend = S3Backend("mvrable-bluesky-west", cachedir="/tmp/bluesky-cache")
+ #backend = FileBackend(".")
chkpt = load_checkpoint_record(backend)
- print backend.list()
+ #print backend.list()
+ log_dir = LogDirectory(backend, 1)
imap = InodeMap()
imap.build(backend, chkpt)
print chkpt
- log_dir = LogDirectory(backend, 0)
+ print "Version vector:", imap.version_vector
+ print "Last cleaner log file:", log_dir.seq_num - 1
+ if imap.version_vector.get(1, -1) != log_dir.seq_num - 1:
+ print "Proxy hasn't updated to latest cleaner segment yet!"
+ benchlog_write("waiting for proxy...")
+ sys.exit(0)
+
run_cleaner(backend, imap, log_dir)
+ print "Version vector:", imap.version_vector
imap.write(backend, log_dir)
log_dir.close_all()
+ end_time = time.time()
+ backend.dump_stats()
+ benchlog_write("running_time: %s", end_time - start_time)
+ benchlog_write("")