From: Michael Vrable Date: Wed, 30 Jun 2010 19:25:14 +0000 (-0700) Subject: Add another S3 benchmark tool. X-Git-Url: https://git.vrable.net/?a=commitdiff_plain;h=05de79169bf3ef0a3ba0d3f0c4d9c8dcaef383e6;p=bluesky.git Add another S3 benchmark tool. --- diff --git a/cloudbench/parallelrangetest.py b/cloudbench/parallelrangetest.py new file mode 100755 index 0000000..a57ac87 --- /dev/null +++ b/cloudbench/parallelrangetest.py @@ -0,0 +1,101 @@ +#!/usr/bin/python +# +# Run a series of simple test requests against S3 for gathering some basic +# performance numbers. + +import boto, time +from boto.s3.connection import SubdomainCallingFormat +from boto.s3.key import Key +import sys, threading, time, Queue +import azure + +BUCKET_NAME = 'mvrable-benchmark' +SIZES = [(1 << s) for s in range(12, 23)] + +class S3TestConnection: + def __init__(self): + self.conn = boto.connect_s3(is_secure=False, + calling_format=SubdomainCallingFormat()) + self.bucket = self.conn.get_bucket(BUCKET_NAME) + + def put_object(self, name, size): + buf = 'A' * size + k = Key(self.bucket, name) + start_time = time.time() + k.set_contents_from_string(buf) + #print "%s: %f" % (name, time.time() - start_time) + + def get_object(self, name, byterange=None): + k = Key(self.bucket, name) + headers = {} + if byterange is not None: + headers['Range'] = 'bytes=%s-%s' % byterange + start_time = time.time() + buf = k.get_contents_as_string(headers=headers) + duration = time.time() - start_time + return (duration, len(buf)) + +def parallel_rangeget(name, size, connections, pieces, repeat=1): + requests = Queue.Queue() + results = [[threading.Lock(), None] for n in range(pieces)] + for _ in range(repeat): + for i in range(pieces): + requests.put((i, results[i])) + blocksize = size // pieces + + threads = [] + def launcher(c, requests): + while True: + try: + (i, r) = requests.get(block=False) + # Possible data race here but it should be harmless + if r[1] is None: + res = c.get_object(name, byterange=(blocksize * i, + blocksize * (i+1) - 1)) + r[0].acquire() + if r[1] is None: r[1] = time.time() + r[0].release() + requests.task_done() + except Queue.Empty: + return + for i in range(len(connections)): + c = connections[i] + threads.append(threading.Thread(target=launcher, args=(c, requests))) + start_time = time.time() + for i in range(len(threads)): + threads[i].start() + requests.join() + + return max(x[1] for x in results) - start_time + +connections = [S3TestConnection() for _ in range(128)] + +for i in [(1 << x) for x in range(8)]: + s = (1 << 22) + print i, parallel_rangeget('file-%d-0' % (s,), s, connections, i) + time.sleep(4.0) +sys.exit(0) + +logfile = open('multifetch-simulation.data', 'a') +for s in [(1 << s) for s in range(16, 27)]: + print "Priming objects: %d-byte objects" % (s,) + run_test(s, 1, 100, open('/dev/null', 'w'), 0.0) + + for blocksize in [x << 20 for x in (4, 8, 16, 32, 64, 128)]: + if s > blocksize: continue + for t in [4, 2, 1]: + for rep in range(10): + count = blocksize // s + print "Running tests: %d-byte blocks, %d-byte objects, %d parallel fetches" % (blocksize, s, t) + print "Object count:", count + if count * t > len(connections): + conns = connections + else: + conns = connections[0 : count * t] + + objects = ['file-%d-%d' % (s, i % 100) for i in range(count)] + r = parallel_multiget(objects, conns, t) + print r + logfile.write('%s\t%s\t%s\t%s\t%s\n' % (s, blocksize >> 20, t, len(conns), r)) + logfile.flush() + time.sleep(2.0)