# Copyright (C) 2010 The Regents of the University of California
# Written by Michael Vrable <mvrable@cs.ucsd.edu>
-import base64, os, re, struct, sys
+import base64, os, re, struct, sys, time
import boto
from boto.s3.key import Key
# The BlueSky 'struct cloudlog_header' data type.
HEADER_FORMAT = '<4s48sb16sQIII'
HEADER_CRYPTBYTES = 48
-HEADER_MAGIC = 'AgI-'
+HEADER_MAGIC1 = 'AgI-' # Unencrypted data
+HEADER_MAGIC2 = 'AgI=' # Encrypted data
HEADER_SIZE = struct.calcsize(HEADER_FORMAT)
class ITEM_TYPE:
m = re.match(r"^log-(\d+)-(\d+)$", name)
if m: return (int(m.group(1)), int(m.group(2)))
+def retry_wrap(method):
+ def wrapped(self, *args, **kwargs):
+ for retries in range(3):
+ try:
+ return method(self, *args, **kwargs)
+ except:
+ print >>sys.stderr, "S3 operation failed, retrying..."
+ self.connect()
+ time.sleep(1.0)
+ return method(self, *args, **kwargs)
+ return wrapped
+
class S3Backend:
"""An interface to BlueSky where the log segments are on in Amazon S3."""
def __init__(self, bucket, path='', cachedir="."):
- self.conn = boto.connect_s3(is_secure=False)
- self.bucket = self.conn.get_bucket(bucket)
+ self.bucket_name = bucket
self.path = path
self.cachedir = cachedir
self.cache = {}
+ self.connect()
+
+ def connect(self):
+ self.conn = boto.connect_s3(is_secure=False)
+ self.bucket = self.conn.get_bucket(self.bucket_name)
def list(self):
files = []
files.append((k.key, k.size))
return files
+ @retry_wrap
def read(self, filename, offset=0, length=None):
if filename in self.cache:
fp = open(os.path.join(self.cachedir, filename), 'rb')
data = data[0:length]
return data
+ @retry_wrap
def write(self, filename, data):
k = Key(self.bucket)
k.key = self.path + filename
if filename in self.cache:
del self.cache[filename]
+ @retry_wrap
def delete(self, filename):
k = Key(self.bucket)
k.key = self.path + filename
def __init__(self):
self.cryptkeys = '\0' * HEADER_CRYPTBYTES
+ self.encrypted = False
def __str__(self):
- return "<Item ty=%s location=%s size=%d id=%s...>" % (self.type, self.location, self.size, base64.b16encode(self.id).lower()[0:8])
+ return "<Item%s ty=%s location=%s size=%d id=%s...>" % (self.encrypted and '$' or '', self.type, self.location, self.size, base64.b16encode(self.id).lower()[0:8])
@staticmethod
def random_id():
link_ids = ''.join(link_ids)
link_locs = ''.join(link_locs)
+ if self.encrypted:
+ magic = HEADER_MAGIC2
+ else:
+ magic = HEADER_MAGIC1
header = struct.pack(HEADER_FORMAT,
- HEADER_MAGIC, self.cryptkeys,
+ magic, self.cryptkeys,
ord(self.type), self.id, self.inum,
len(self.data), len(link_ids), len(link_locs))
return header + self.data + link_ids + link_locs
header = struct.unpack_from(HEADER_FORMAT, data, 0)
size = HEADER_SIZE + sum(header[5:8])
- if header[0] != HEADER_MAGIC:
+ if header[0] not in (HEADER_MAGIC1, HEADER_MAGIC2):
print "Bad header magic!"
return
return
item = LogItem()
+ if header[0] == HEADER_MAGIC2: item.encrypted = True
item.cryptkeys = header[1]
item.id = header[3]
item.inum = header[4]
while len(data) - offset >= HEADER_SIZE:
header = struct.unpack_from(HEADER_FORMAT, data, offset)
size = HEADER_SIZE + sum(header[5:8])
- if header[0] != HEADER_MAGIC:
+ if header[0] not in (HEADER_MAGIC1, HEADER_MAGIC2):
print "Bad header magic!"
break
if size + offset > len(data):