def delete(self, filename):
    """Remove the named log segment from this backend's directory."""
    target = os.path.join(self.path, filename)
    os.unlink(target)
+
def retry_wrap(method):
    """Decorator: retry a backend operation on failure.

    The operation is attempted up to three times; after each failure the
    backend connection is re-established (``self.connect()``) and we pause
    briefly before trying again.  A fourth, final attempt is made outside
    the loop so that a persistent error propagates to the caller.
    """
    import functools

    @functools.wraps(method)
    def wrapped(self, *args, **kwargs):
        for attempt in range(3):
            try:
                return method(self, *args, **kwargs)
            # BUG FIX: was a bare "except:", which also swallowed
            # KeyboardInterrupt/SystemExit; catch Exception only.
            except Exception:
                sys.stderr.write("S3 operation failed, retrying...\n")
                self.connect()
                time.sleep(1.0)
        # Final attempt: let any exception propagate.
        return method(self, *args, **kwargs)
    return wrapped
+
+class S3Backend(Backend):
+ """An interface to BlueSky where the log segments are on in Amazon S3."""
+
+ def __init__(self, bucket, path='', cachedir="."):
+ self.bucket_name = bucket
+ self.path = path
+ self.cachedir = cachedir
+ self.cache = {}
+ self.connect()
+ self.stats_get = [0, 0]
+ self.stats_put = [0, 0]
+
+ def connect(self):
+ self.conn = boto.connect_s3(is_secure=False)
+ self.bucket = self.conn.get_bucket(self.bucket_name)
+
+ def list(self, directory=0):
+ files = []
+ prefix = "log-%08d-" % (directory,)
+ for k in self.bucket.list(self.path + prefix):
+ files.append((k.key, k.size))
+ return files
+
+ @retry_wrap
+ def read(self, filename, offset=0, length=None):
+ if filename in self.cache:
+ fp = open(os.path.join(self.cachedir, filename), 'rb')
+ if offset > 0:
+ fp.seek(offset)
+ if length is None:
+ return fp.read()
+ else:
+ return fp.read(length)
+ else:
+ k = Key(self.bucket)
+ k.key = self.path + filename
+ data = k.get_contents_as_string()
+ fp = open(os.path.join(self.cachedir, filename), 'wb')
+ fp.write(data)
+ fp.close()
+ self.cache[filename] = True
+ self.stats_get[0] += 1
+ self.stats_get[1] += len(data)
+ if offset > 0:
+ data = data[offset:]
+ if length is not None:
+ data = data[0:length]
+ return data
+
+ @retry_wrap
+ def write(self, filename, data):
+ k = Key(self.bucket)
+ k.key = self.path + filename
+ k.set_contents_from_string(data)
+ self.stats_put[0] += 1
+ self.stats_put[1] += len(data)
+ if filename in self.cache:
+ del self.cache[filename]
+
+ @retry_wrap
+ def delete(self, filename):
+ k = Key(self.bucket)
+ k.key = self.path + filename
+ k.delete()
+ if filename in self.cache:
+ del self.cache[filename]
+
+ def dump_stats(self):
+ print "S3 statistics:"
+ print "GET: %d ops / %d bytes" % tuple(self.stats_get)
+ print "PUT: %d ops / %d bytes" % tuple(self.stats_put)
+
class SimpleBackend(Backend):
    """An interface to the simple BlueSky test network server."""

    def __init__(self, server=('localhost', 12345), cachedir="."):
        # BUG FIX: the original assigned self.bucket_name = bucket, but this
        # backend has no 'bucket' parameter, so every construction raised
        # NameError.  This backend addresses a (host, port) server instead.
        self.server_address = server
        self.cachedir = cachedir
        self.cache = {}              # filenames known to exist in cachedir

    def _get_socket(self):
        """Open a fresh connection to the test server, wrapped as a file object."""
        return socket.create_connection(self.server_address).makefile()

    def list(self, directory=0):
        # NOTE(review): copied verbatim from S3Backend; self.bucket and
        # self.path are never set on this class, so this always raises
        # AttributeError.  The wire protocol visible here (GET/PUT) shows no
        # LIST command -- TODO confirm against the server and implement.
        files = []
        prefix = "log-%08d-" % (directory,)
        for k in self.bucket.list(self.path + prefix):
            files.append((k.key, k.size))
        return files

    def read(self, filename, offset=0, length=None):
        """Read (a slice of) a log segment, preferring the local cache.

        On a cache miss the whole object is fetched from the server with a
        GET request, written to the cache directory, and the requested slice
        is returned.
        """
        if filename in self.cache:
            fp = open(os.path.join(self.cachedir, filename), 'rb')
            # BUG FIX: the file handle was leaked (never closed) here.
            try:
                if offset > 0:
                    fp.seek(offset)
                if length is None:
                    return fp.read()
                else:
                    return fp.read(length)
            finally:
                fp.close()
        else:
            f = self._get_socket()
            try:
                f.write("GET %s %d %d\n" % (filename, 0, 0))
                f.flush()
                datalen = int(f.readline())
                if datalen < 0:
                    raise RuntimeError("server error reading %s" % (filename,))
                data = f.read(datalen)
            finally:
                f.close()   # was leaked: one socket per call, never closed
            fp = open(os.path.join(self.cachedir, filename), 'wb')
            try:
                fp.write(data)
            finally:
                fp.close()
            self.cache[filename] = True
            if offset > 0:
                data = data[offset:]
            if length is not None:
                data = data[0:length]
            return data

    def write(self, filename, data):
        """Upload a log segment via a PUT request and invalidate the cache."""
        f = self._get_socket()
        try:
            # BUG FIX: the format string had three conversions ("%s %d %d")
            # but only two values, raising TypeError on every call.
            f.write("PUT %s %d\n" % (filename, len(data)))
            f.write(data)
            f.flush()
            result = int(f.readline())
            # BUG FIX: a failure result from the server was silently ignored.
            if result < 0:
                raise RuntimeError("server error writing %s" % (filename,))
        finally:
            f.close()
        if filename in self.cache:
            del self.cache[filename]

    def delete(self, filename):
        # Deletion is not supported by the simple test server; deliberately a no-op.
        pass
+