projects
/
bluesky.git
/ commitdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
| commitdiff |
tree
raw
|
patch
|
inline
| side by side (parent:
1d1d743
)
Add retries to the S3 backend in the cleaner.
author
Michael Vrable
<mvrable@cs.ucsd.edu>
Mon, 18 Oct 2010 21:41:44 +0000
(14:41 -0700)
committer
Michael Vrable
<mvrable@cs.ucsd.edu>
Mon, 18 Oct 2010 21:41:44 +0000
(14:41 -0700)
cleaner/cleaner
patch
|
blob
|
history
diff --git
a/cleaner/cleaner
b/cleaner/cleaner
index
95bbb08
..
9103769
100755
(executable)
--- a/
cleaner/cleaner
+++ b/
cleaner/cleaner
@@
-67,15
+67,30
@@
class FileBackend:
m = re.match(r"^log-(\d+)-(\d+)$", name)
if m: return (int(m.group(1)), int(m.group(2)))
m = re.match(r"^log-(\d+)-(\d+)$", name)
if m: return (int(m.group(1)), int(m.group(2)))
+def retry_wrap(method):
+ def wrapped(self, *args, **kwargs):
+ for retries in range(3):
+ try:
+ return method(self, *args, **kwargs)
+ except:
+ print >>sys.stderr, "S3 operation failed, retrying..."
+ self.connect()
+ return method(self, *args, **kwargs)
+ return wrapped
+
class S3Backend:
"""An interface to BlueSky where the log segments are on in Amazon S3."""
def __init__(self, bucket, path='', cachedir="."):
class S3Backend:
"""An interface to BlueSky where the log segments are on in Amazon S3."""
def __init__(self, bucket, path='', cachedir="."):
- self.conn = boto.connect_s3(is_secure=False)
- self.bucket = self.conn.get_bucket(bucket)
+ self.bucket_name = bucket
self.path = path
self.cachedir = cachedir
self.cache = {}
self.path = path
self.cachedir = cachedir
self.cache = {}
+ self.connect()
+
+ def connect(self):
+ self.conn = boto.connect_s3(is_secure=False)
+ self.bucket = self.conn.get_bucket(self.bucket_name)
def list(self):
files = []
def list(self):
files = []
@@
-83,6
+98,7
@@
class S3Backend:
files.append((k.key, k.size))
return files
files.append((k.key, k.size))
return files
+ @retry_wrap
def read(self, filename, offset=0, length=None):
if filename in self.cache:
fp = open(os.path.join(self.cachedir, filename), 'rb')
def read(self, filename, offset=0, length=None):
if filename in self.cache:
fp = open(os.path.join(self.cachedir, filename), 'rb')
@@
-106,6
+122,7
@@
class S3Backend:
data = data[0:length]
return data
data = data[0:length]
return data
+ @retry_wrap
def write(self, filename, data):
k = Key(self.bucket)
k.key = self.path + filename
def write(self, filename, data):
k = Key(self.bucket)
k.key = self.path + filename
@@
-113,6
+130,7
@@
class S3Backend:
if filename in self.cache:
del self.cache[filename]
if filename in self.cache:
del self.cache[filename]
+ @retry_wrap
def delete(self, filename):
k = Key(self.bucket)
k.key = self.path + filename
def delete(self, filename):
k = Key(self.bucket)
k.key = self.path + filename