Add retries to the S3 backend in the cleaner.
author Michael Vrable <mvrable@cs.ucsd.edu>
Mon, 18 Oct 2010 21:41:44 +0000 (14:41 -0700)
committer Michael Vrable <mvrable@cs.ucsd.edu>
Mon, 18 Oct 2010 21:41:44 +0000 (14:41 -0700)
cleaner/cleaner

index 95bbb08..9103769 100755 (executable)
@@ -67,15 +67,30 @@ class FileBackend:
         m = re.match(r"^log-(\d+)-(\d+)$", name)
         if m: return (int(m.group(1)), int(m.group(2)))
 
+def retry_wrap(method):
+    def wrapped(self, *args, **kwargs):
+        for retries in range(3):
+            try:
+                return method(self, *args, **kwargs)
+            except:
+                print >>sys.stderr, "S3 operation failed, retrying..."
+                self.connect()
+        return method(self, *args, **kwargs)
+    return wrapped
+
 class S3Backend:
     """An interface to BlueSky where the log segments are on in Amazon S3."""
 
     def __init__(self, bucket, path='', cachedir="."):
-        self.conn = boto.connect_s3(is_secure=False)
-        self.bucket = self.conn.get_bucket(bucket)
+        self.bucket_name = bucket
         self.path = path
         self.cachedir = cachedir
         self.cache = {}
+        self.connect()
+
+    def connect(self):
+        self.conn = boto.connect_s3(is_secure=False)
+        self.bucket = self.conn.get_bucket(self.bucket_name)
 
     def list(self):
         files = []
@@ -83,6 +98,7 @@ class S3Backend:
             files.append((k.key, k.size))
         return files
 
+    @retry_wrap
     def read(self, filename, offset=0, length=None):
         if filename in self.cache:
             fp = open(os.path.join(self.cachedir, filename), 'rb')
@@ -106,6 +122,7 @@ class S3Backend:
                 data = data[0:length]
             return data
 
+    @retry_wrap
     def write(self, filename, data):
         k = Key(self.bucket)
         k.key = self.path + filename
@@ -113,6 +130,7 @@ class S3Backend:
         if filename in self.cache:
             del self.cache[filename]
 
+    @retry_wrap
     def delete(self, filename):
         k = Key(self.bucket)
         k.key = self.path + filename