def retry_wrap(method):
    """Decorator: retry a failing S3 operation up to three extra times.

    After each failure the backend reconnects via ``self.connect()`` and
    pauses briefly before retrying.  The final attempt is made outside
    the loop so that its exception propagates to the caller.
    """
    import functools

    @functools.wraps(method)
    def wrapped(self, *args, **kwargs):
        for _ in range(3):
            try:
                return method(self, *args, **kwargs)
            except Exception:
                # Narrowed from a bare ``except:`` so KeyboardInterrupt
                # and SystemExit are not swallowed by the retry loop.
                sys.stderr.write("S3 operation failed, retrying...\n")
                self.connect()
                time.sleep(1.0)
        # Last chance: let any exception escape to the caller.
        return method(self, *args, **kwargs)
    return wrapped
+
class S3Backend(Backend):
    """An interface to BlueSky where the log segments are in Amazon S3.

    Downloaded segments are mirrored into ``cachedir`` so subsequent
    reads are served from local disk instead of S3.
    """

    def __init__(self, bucket, path='', cachedir="."):
        self.bucket_name = bucket
        self.path = path
        self.cachedir = cachedir
        # Maps filename -> True for segments already mirrored locally.
        self.cache = {}
        self.connect()

    def connect(self):
        """(Re-)establish the S3 connection and bucket handle."""
        self.conn = boto.connect_s3(is_secure=False)
        self.bucket = self.conn.get_bucket(self.bucket_name)

    def list(self, directory=0):
        """Return ``(key, size)`` pairs for log segments in *directory*."""
        prefix = "log-%08d-" % (directory,)
        files = []
        for k in self.bucket.list(self.path + prefix):
            files.append((k.key, k.size))
        return files

    @retry_wrap
    def read(self, filename, offset=0, length=None):
        """Read a log segment, serving from the local cache when possible.

        On a cache miss the whole object is fetched from S3, written to
        the cache directory, and the requested slice returned.
        """
        if filename in self.cache:
            # Cache hit: the file handle is closed by the context manager
            # (the original leaked it).
            with open(os.path.join(self.cachedir, filename), 'rb') as fp:
                if offset > 0:
                    fp.seek(offset)
                if length is None:
                    return fp.read()
                return fp.read(length)
        else:
            k = Key(self.bucket)
            k.key = self.path + filename
            data = k.get_contents_as_string()
            with open(os.path.join(self.cachedir, filename), 'wb') as fp:
                fp.write(data)
            self.cache[filename] = True
            if offset > 0:
                data = data[offset:]
            if length is not None:
                data = data[0:length]
            return data

    @retry_wrap
    def write(self, filename, data):
        """Upload a log segment to S3 and drop any stale cached copy."""
        k = Key(self.bucket)
        k.key = self.path + filename
        k.set_contents_from_string(data)
        if filename in self.cache:
            del self.cache[filename]