INODE_MAP = '3'
CHECKPOINT = '4'
-class FileBackend:
+class Backend:
+    """Common base for the BlueSky storage backends.
+
+    Holds the translation between a log segment location and the object
+    name under which that segment is stored, so that each concrete backend
+    does not need its own copy.
+    """
+
+    def loc_to_name(self, location):
+        """Build the "log-XXXXXXXX-XXXXXXXX" object name for a location.
+
+        location must be a 2-tuple of integers, since the format string
+        consumes two values.
+        """
+        return "log-%08d-%08d" % location
+
+    def name_to_loc(self, name):
+        """Parse an object name back into a pair of integers.
+
+        Returns None when name does not follow the log-<digits>-<digits>
+        pattern.
+        """
+        match = re.match(r"^log-(\d+)-(\d+)$", name)
+        if not match:
+            return None
+        first, second = match.groups()
+        return (int(first), int(second))
+
+
+class FileBackend(Backend):
"""An interface to BlueSky where the log segments are on local disk.
This is mainly intended for testing purposes, as the real cleaner would
def delete(self, filename):
os.unlink(os.path.join(self.path, filename))
- def loc_to_name(self, location):
- return "log-%08d-%08d" % (location)
-
- def name_to_loc(self, name):
- m = re.match(r"^log-(\d+)-(\d+)$", name)
- if m: return (int(m.group(1)), int(m.group(2)))
-
def retry_wrap(method):
def wrapped(self, *args, **kwargs):
for retries in range(3):
return method(self, *args, **kwargs)
return wrapped
-class S3Backend:
+class S3Backend(Backend):
"""An interface to BlueSky where the log segments are on in Amazon S3."""
def __init__(self, bucket, path='', cachedir="."):
if filename in self.cache:
del self.cache[filename]
- def loc_to_name(self, location):
- return "log-%08d-%08d" % (location)
+class SimpleBackend(Backend):
+ """An interface to the simple BlueSky test network server."""
- def name_to_loc(self, name):
- m = re.match(r"^log-(\d+)-(\d+)$", name)
- if m: return (int(m.group(1)), int(m.group(2)))
+ def __init__(self, server=('localhost', 12345), cachedir="."):
+     """Record where the test server lives and where cached segments go.
+
+     server   -- (host, port) pair handed to socket.create_connection
+     cachedir -- directory holding local copies of fetched segments
+     """
+     # This backend takes no bucket argument, so no bucket_name attribute
+     # is set (assigning it from an undefined name raised NameError).
+     self.server_address = server
+     self.cachedir = cachedir
+     # Maps filename -> True for every segment already saved in cachedir.
+     self.cache = {}
+
+ def _get_socket(self):
+     """Open a new connection to the server and return it as a file object."""
+     connection = socket.create_connection(self.server_address)
+     return connection.makefile()
+
+ def list(self, directory=0):
+     """Return (key, size) pairs for the segments of one log directory.
+
+     NOTE(review): this relies on self.bucket and self.path, neither of
+     which is assigned by this class's __init__ -- the body looks carried
+     over from a bucket-based backend.  Confirm how the test server is
+     supposed to be listed.
+     """
+     prefix = "log-%08d-" % (directory,)
+     entries = self.bucket.list(self.path + prefix)
+     return [(entry.key, entry.size) for entry in entries]
+
+ def read(self, filename, offset=0, length=None):
+     """Return the contents of a log segment, or a slice of it.
+
+     A segment that is not yet in the cache is fetched whole from the
+     server and saved under cachedir; later reads are answered from that
+     local copy.
+
+     filename -- name of the segment object
+     offset   -- number of leading bytes to skip (default 0)
+     length   -- maximum number of bytes to return; None reads to the end
+
+     Raises RuntimeError if the server replies with a negative length or
+     delivers fewer bytes than it announced.
+     """
+     cache_path = os.path.join(self.cachedir, filename)
+
+     if filename in self.cache:
+         # "with" closes the cached copy even if seek/read raises.
+         with open(cache_path, 'rb') as fp:
+             if offset > 0:
+                 fp.seek(offset)
+             if length is None:
+                 return fp.read()
+             return fp.read(length)
+
+     f = self._get_socket()
+     try:
+         # The request always carries "0 0"; the caller's range is cut out
+         # locally below.  NOTE(review): presumably "0 0" asks the server
+         # for the whole object -- confirm against the server.
+         f.write("GET %s %d %d\n" % (filename, 0, 0))
+         f.flush()
+         datalen = int(f.readline())
+         if datalen < 0:
+             raise RuntimeError("GET %s failed: server replied %d"
+                                % (filename, datalen))
+         data = f.read(datalen)
+     finally:
+         # Release the connection on success and on every error path.
+         f.close()
+
+     if len(data) != datalen:
+         # Never cache a truncated segment; it would corrupt later reads.
+         raise RuntimeError("GET %s failed: expected %d bytes, got %d"
+                            % (filename, datalen, len(data)))
+
+     with open(cache_path, 'wb') as fp:
+         fp.write(data)
+     self.cache[filename] = True
+
+     if offset > 0:
+         data = data[offset:]
+     if length is not None:
+         data = data[0:length]
+     return data
+
+ def write(self, filename, data):
+     """Upload data to the server as filename and drop any cached copy.
+
+     filename -- name of the segment object to store
+     data     -- payload; its len() is sent in the request line
+
+     Raises ValueError if the server's reply line is not an integer.
+     """
+     f = self._get_socket()
+     try:
+         # Two fields follow PUT: the object name and the payload length.
+         # A third %d with no matching value made every call raise
+         # TypeError.  NOTE(review): confirm the request line against the
+         # server's PUT syntax.
+         f.write("PUT %s %d\n" % (filename, len(data)))
+         f.write(data)
+         f.flush()
+         # Wait for the server's numeric reply.  NOTE(review): the value is
+         # parsed but not inspected; its meaning is not visible here.
+         int(f.readline())
+     finally:
+         # Release the connection on success and on every error path.
+         f.close()
+     # The stored object changed, so the next read must refetch it.
+     self.cache.pop(filename, None)
+
+ def delete(self, filename):
+ """Accept a delete request for filename and do nothing.
+
+ No request is sent to the server and the local cache is left untouched.
+ NOTE(review): presumably the test server has no delete operation --
+ confirm.
+ """
+ pass
class LogItem:
"""In-memory representation of a single item stored in a log file."""
for d in sorted(self.version_vector):
new_checkpoint.data += struct.pack('<II', d, self.version_vector[d])
- data = self.checkpoint_record.data[self.vvsize:]
+ data = self.checkpoint_record.data[self.vvsize + len(CHECKPOINT_MAGIC):]
for i in range(len(data) // 16):
(start, end) = struct.unpack_from("<QQ", data, 16*i)
# Determine which segments are poorly utilized and should be cleaned. We
# need better heuristics here.
for (s, u) in sorted(inode_map.util.segments.items()):
- if (float(u[1]) / u[0] < 0.6 or u[1] < 32768) and u[1] > 0:
+ if (float(u[1]) / u[0] < 0.6) and u[1] > 0:
print "Should clean segment", s
loc = backend.name_to_loc(s)
if s: inode_map.obsolete_segments.add(loc)
rewrite_inode(backend, inode_map, i, log, i in dirty_inode_data)
if __name__ == '__main__':
- #backend = S3Backend("mvrable-bluesky", cachedir=".")
- backend = FileBackend(".")
+ backend = S3Backend("mvrable-bluesky", cachedir=".")
+ #backend = FileBackend(".")
chkpt = load_checkpoint_record(backend)
print backend.list()
imap = InodeMap()