Spread test files for mixedbench across multiple subdirectories.
[bluesky.git] / cloudbench / parallelrangetest.py
1 #!/usr/bin/python
2 #
3 # Run a series of simple test requests against S3 for gathering some basic
4 # performance numbers.
5
6 import boto, time
7 from boto.s3.connection import SubdomainCallingFormat
8 from boto.s3.key import Key
9 import sys, threading, time, Queue
10 import azure
11
12 BUCKET_NAME = 'mvrable-benchmark'
13 SIZES = [(1 << s) for s in range(12, 23)]
14
15 class S3TestConnection:
16     def __init__(self):
17         self.conn = boto.connect_s3(is_secure=False,
18                                     calling_format=SubdomainCallingFormat())
19         self.bucket = self.conn.get_bucket(BUCKET_NAME)
20
21     def put_object(self, name, size):
22         buf = 'A' * size
23         k = Key(self.bucket, name)
24         start_time = time.time()
25         k.set_contents_from_string(buf)
26         #print "%s: %f" % (name, time.time() - start_time)
27
28     def get_object(self, name, byterange=None):
29         k = Key(self.bucket, name)
30         headers = {}
31         if byterange is not None:
32             headers['Range'] = 'bytes=%s-%s' % byterange
33         start_time = time.time()
34         buf = k.get_contents_as_string(headers=headers)
35         duration = time.time() - start_time
36         return (duration, len(buf))
37
38 def parallel_rangeget(name, size, connections, pieces, repeat=1):
39     requests = Queue.Queue()
40     results = [[threading.Lock(), None] for n in range(pieces)]
41     for _ in range(repeat):
42         for i in range(pieces):
43             requests.put((i, results[i]))
44     blocksize = size // pieces
45
46     threads = []
47     def launcher(c, requests):
48         while True:
49             try:
50                 (i, r) = requests.get(block=False)
51                 # Possible data race here but it should be harmless
52                 if r[1] is None:
53                     res = c.get_object(name, byterange=(blocksize * i,
54                                                         blocksize * (i+1) - 1))
55                     r[0].acquire()
56                     if r[1] is None: r[1] = time.time()
57                     r[0].release()
58                 requests.task_done()
59             except Queue.Empty:
60                 return
61     for i in range(len(connections)):
62         c = connections[i]
63         threads.append(threading.Thread(target=launcher, args=(c, requests)))
64     start_time = time.time()
65     for i in range(len(threads)):
66         threads[i].start()
67     requests.join()
68
69     return max(x[1] for x in results) - start_time
70
71 connections = [S3TestConnection() for _ in range(128)]
72
73 for i in [(1 << x) for x in range(8)]:
74     s = (1 << 22)
75     print i, parallel_rangeget('file-%d-0' % (s,), s, connections, i)
76     time.sleep(4.0)
77 sys.exit(0)
78
79 logfile = open('multifetch-simulation.data', 'a')
80 for s in [(1 << s) for s in range(16, 27)]:
81     print "Priming objects: %d-byte objects" % (s,)
82     run_test(s, 1, 100, open('/dev/null', 'w'), 0.0)
83
84     for blocksize in [x << 20 for x in (4, 8, 16, 32, 64, 128)]:
85         if s > blocksize: continue
86         for t in [4, 2, 1]:
87             for rep in range(10):
88                 count = blocksize // s
89                 print "Running tests: %d-byte blocks, %d-byte objects, %d parallel fetches" % (blocksize, s, t)
90                 print "Object count:", count
91                 if count * t > len(connections):
92                     conns = connections
93                 else:
94                     conns = connections[0 : count * t]
95
96                 objects = ['file-%d-%d' % (s, i % 100) for i in range(count)]
97                 r = parallel_multiget(objects, conns, t)
98                 print r
99                 logfile.write('%s\t%s\t%s\t%s\t%s\n' % (s, blocksize >> 20, t, len(conns), r))
100                 logfile.flush()
101                 time.sleep(2.0)