X-Git-Url: http://git.vrable.net/?p=bluesky.git;a=blobdiff_plain;f=cloudbench%2Fparalleltest.py;fp=cloudbench%2Fparalleltest.py;h=8af4cb49b35fa6303753b4565027b059140bfff1;hp=b447938ecdbc77397e096e708b84c919e0082683;hb=710fc5d5d9401b7c99b9e85021bace515b600546;hpb=9840abd370c4679739b2ae0e9384276bd99c2591 diff --git a/cloudbench/paralleltest.py b/cloudbench/paralleltest.py index b447938..8af4cb4 100755 --- a/cloudbench/paralleltest.py +++ b/cloudbench/paralleltest.py @@ -52,6 +52,37 @@ def parallel_get(name, connections, delay1=0.0): if len(res) == len(connections): return res +def parallel_multiget(names, connections, repeat=1): + requests = Queue.Queue() + results = [[threading.Lock(), None] for n in names] + for i in range(len(names)): + for _ in range(repeat): + requests.put((names[i], results[i])) + + threads = [] + def launcher(c, requests): + while True: + try: + (n, r) = requests.get(block=False) + # Possible data race here but it should be harmless + if r[1] is None: + res = c.get_object(n) + r[0].acquire() + if r[1] is None: r[1] = time.time() + r[0].release() + requests.task_done() + except Queue.Empty: + return + for i in range(len(connections)): + c = connections[i] + threads.append(threading.Thread(target=launcher, args=(c, requests))) + start_time = time.time() + for i in range(len(threads)): + threads[i].start() + requests.join() + + return max(x[1] for x in results) - start_time + def run_test(size, threads, num, logfile=sys.stdout, delay=1.0): connections = [S3TestConnection() for _ in range(threads)] for i in range(num): @@ -62,6 +93,32 @@ def run_test(size, threads, num, logfile=sys.stdout, delay=1.0): if delay > 0: time.sleep(delay) +connections = [S3TestConnection() for _ in range(128)] +logfile = open('multifetch-simulation.data', 'a') +for s in SIZES: + print "Priming objects: %d-byte objects" % (s,) + run_test(s, 1, 100, open('/dev/null', 'w'), 0.0) + + for blocksize in [1 << 20, 2 << 20, 4 << 20, 8 << 20]: + if s > blocksize: continue + for t in [4, 2, 1]: + for rep in range(10): + count = blocksize // s + print "Running tests: %d-byte blocks, %d-byte objects, %d parallel fetches" % (blocksize, s, t) + print "Object count:", count + if count * t > len(connections): + conns = connections + else: + conns = connections[0 : count * t] + + objects = ['file-%d-%d' % (s, i % 100) for i in range(count)] + r = parallel_multiget(objects, conns, t) + print r + logfile.write('%s\t%s\t%s\t%s\t%s\n' % (s, blocksize >> 20, t, len(conns), r)) + logfile.flush() + time.sleep(2.0) +sys.exit(0) + for s in SIZES: print "Priming objects: %d-byte objects" % (s,) logfile = open('/dev/null', 'w')