import boto
import sys
import threading
import time
import Queue
import azure

from boto.s3.connection import SubdomainCallingFormat
from boto.s3.key import Key
# S3 bucket holding the benchmark objects.
BUCKET_NAME = 'mvrable-benchmark'
# Object sizes under test: powers of two from 4 KiB (1<<12) to 4 MiB (1<<22).
SIZES = [(1 << s) for s in range(12, 23)]
# NOTE(review): this class is incomplete in the visible chunk — the lines under
# __init__ reference `name` and `buf`, which reads like the body of a
# `put_object` method whose `def` line was elided by the diff context.
# Confirm against the full file before editing.
class S3TestConnection:
def __init__(self):
k = Key(self.bucket, name)
start_time = time.time()
k.set_contents_from_string(buf)
- print "%s: %f" % (name, time.time() - start_time)
+ #print "%s: %f" % (name, time.time() - start_time)
def get_object(self, name):
    """Download object `name` from self.bucket; return the transfer time in seconds."""
    k = Key(self.bucket, name)
    start_time = time.time()
    buf = k.get_contents_as_string()
    duration = time.time() - start_time
    #print "%s: %f" % (name, duration)
    return duration
def parallel_get(name, connections, delay1=0.0):
    """Fetch object `name` once on each connection, all in parallel.

    Returns the list of per-connection transfer times (seconds) when every
    worker reported a result; returns None if any worker failed to report.
    `delay1` is kept for backward compatibility but is no longer used.
    """
    #print "Get: %s x %d" % (name, len(connections))
    q = Queue.Queue()

    # Worker: fetch the object and push its duration onto the result queue.
    def launcher(c, name, result_queue):
        result_queue.put(c.get_object(name))

    threads = [threading.Thread(target=launcher, args=(c, name, q))
               for c in connections]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    res = []
    while not q.empty():
        res.append(q.get())
    # A short result list means some worker died before reporting; in that
    # case fall through and return None so callers can skip the sample.
    if len(res) == len(connections):
        return res
def parallel_multiget(names, connections, repeat=1):
    """Fetch every object in `names` (each queued `repeat` times) across `connections`.

    Each name is paired with a [lock, finish_time] slot; the first worker to
    complete a given name records its completion time, and later duplicates
    are skipped. Returns the elapsed time from thread start until the last
    name first completed.  Raises ValueError if `names` is empty (max() of
    an empty sequence), same as the original behavior.
    """
    requests = Queue.Queue()
    results = [[threading.Lock(), None] for n in names]
    for nm, slot in zip(names, results):
        for _ in range(repeat):
            requests.put((nm, slot))

    def launcher(c, requests):
        # Drain the request queue; exit when it is empty.
        while True:
            try:
                (n, r) = requests.get(block=False)
                # Possible data race here but it should be harmless
                if r[1] is None:
                    c.get_object(n)
                    with r[0]:
                        if r[1] is None:
                            r[1] = time.time()
                requests.task_done()
            except Queue.Empty:
                return

    threads = [threading.Thread(target=launcher, args=(c, requests))
               for c in connections]
    start_time = time.time()
    for t in threads:
        t.start()
    # Wait until every queued request has been marked done.
    requests.join()

    return max(x[1] for x in results) - start_time
+
+def run_test(size, threads, num, logfile=sys.stdout, delay=1.0):
+ connections = [S3TestConnection() for _ in range(threads)]
+ for i in range(num):
+ print " ...test", i
+ res = parallel_get('file-%d-%d' % (size, i), connections)
+ if res is not None:
+ logfile.write(str(min(res)) + "\n")
+ if delay > 0:
+ time.sleep(delay)
+
+connections = [S3TestConnection() for _ in range(128)]
+logfile = open('multifetch-simulation.data', 'a')
+for s in [(1 << s) for s in range(16, 27)]:
+ print "Priming objects: %d-byte objects" % (s,)
+ run_test(s, 1, 100, open('/dev/null', 'w'), 0.0)
+
+ for blocksize in [x << 20 for x in (4, 8, 16, 32, 64, 128)]:
+ if s > blocksize: continue
+ for t in [4, 2, 1]:
+ for rep in range(10):
+ count = blocksize // s
+ print "Running tests: %d-byte blocks, %d-byte objects, %d parallel fetches" % (blocksize, s, t)
+ print "Object count:", count
+ if count * t > len(connections):
+ conns = connections
+ else:
+ conns = connections[0 : count * t]
+
+ objects = ['file-%d-%d' % (s, i % 100) for i in range(count)]
+ r = parallel_multiget(objects, conns, t)
+ print r
+ logfile.write('%s\t%s\t%s\t%s\t%s\n' % (s, blocksize >> 20, t, len(conns), r))
+ logfile.flush()
+ time.sleep(2.0)
+sys.exit(0)
+
+for s in SIZES:
+ print "Priming objects: %d-byte objects" % (s,)
+ logfile = open('/dev/null', 'w')
+ run_test(s, 1, 100, logfile, 0.0)
+
+ for t in [4, 2, 1]:
+ print "Running tests: %d-byte objects, %d parallel fetches" % (s, t)
+ logfile = open('parallel-%d-%d.data' % (s, t), 'w')
+ run_test(s, t, 100, logfile)
+sys.exit(0)
if __name__ == '__main__':
# Pass 1: Identical downloads in parallel