3f74b2fa8b050c8ac9e171e64334991c03f47059
[bluesky.git] / cloudbench / paralleltest.py
1 #!/usr/bin/python
2 #
3 # Run a series of simple test requests against S3 for gathering some basic
4 # performance numbers.
5
6 import boto, time
7 from boto.s3.connection import SubdomainCallingFormat
8 from boto.s3.key import Key
9 import sys, threading, time, Queue
10 import azure
11
12 BUCKET_NAME = 'mvrable-benchmark'
13 SIZES = [(1 << s) for s in range(12, 23)]
14
15 class S3TestConnection:
16     def __init__(self):
17         self.conn = boto.connect_s3(is_secure=False,
18                                     calling_format=SubdomainCallingFormat())
19         self.bucket = self.conn.get_bucket(BUCKET_NAME)
20
21     def put_object(self, name, size):
22         buf = 'A' * size
23         k = Key(self.bucket, name)
24         start_time = time.time()
25         k.set_contents_from_string(buf)
26         #print "%s: %f" % (name, time.time() - start_time)
27
28     def get_object(self, name):
29         k = Key(self.bucket, name)
30         start_time = time.time()
31         buf = k.get_contents_as_string()
32         duration = time.time() - start_time
33         #print "%s: %f" % (name, duration)
34         return duration
35
36 def parallel_get(name, connections, delay1=0.0):
37     #print "Get: %s x %d" % (name, len(connections))
38     threads = []
39     q = Queue.Queue()
40     def launcher(c, name, result_queue):
41         result_queue.put(c.get_object(name))
42     for i in range(len(connections)):
43         c = connections[i]
44         threads.append(threading.Thread(target=launcher, args=(c, name, q)))
45     for i in range(len(threads)):
46         threads[i].start()
47     for t in threads: t.join()
48     res = []
49     while not q.empty():
50         res.append(q.get())
51
52     if len(res) == len(connections):
53         return res
54
55 def parallel_multiget(names, connections, repeat=1):
56     requests = Queue.Queue()
57     results = [[threading.Lock(), None] for n in names]
58     for i in range(len(names)):
59         for _ in range(repeat):
60             requests.put((names[i], results[i]))
61
62     threads = []
63     def launcher(c, requests):
64         while True:
65             try:
66                 (n, r) = requests.get(block=False)
67                 # Possible data race here but it should be harmless
68                 if r[1] is None:
69                     res = c.get_object(n)
70                     r[0].acquire()
71                     if r[1] is None: r[1] = time.time()
72                     r[0].release()
73                 requests.task_done()
74             except Queue.Empty:
75                 return
76     for i in range(len(connections)):
77         c = connections[i]
78         threads.append(threading.Thread(target=launcher, args=(c, requests)))
79     start_time = time.time()
80     for i in range(len(threads)):
81         threads[i].start()
82     requests.join()
83
84     return max(x[1] for x in results) - start_time
85
86 def run_test(size, threads, num, logfile=sys.stdout, delay=1.0):
87     connections = [S3TestConnection() for _ in range(threads)]
88     for i in range(num):
89         print "    ...test", i
90         res = parallel_get('file-%d-%d' % (size, i), connections)
91         if res is not None:
92             logfile.write(str(min(res)) + "\n")
93         if delay > 0:
94             time.sleep(delay)
95
96 connections = [S3TestConnection() for _ in range(128)]
97 logfile = open('multifetch-simulation.data', 'a')
98 for s in [(1 << s) for s in range(16, 27)]:
99     print "Priming objects: %d-byte objects" % (s,)
100     run_test(s, 1, 100, open('/dev/null', 'w'), 0.0)
101
102     for blocksize in [x << 20 for x in (4, 8, 16, 32, 64, 128)]:
103         if s > blocksize: continue
104         for t in [4, 2, 1]:
105             for rep in range(10):
106                 count = blocksize // s
107                 print "Running tests: %d-byte blocks, %d-byte objects, %d parallel fetches" % (blocksize, s, t)
108                 print "Object count:", count
109                 if count * t > len(connections):
110                     conns = connections
111                 else:
112                     conns = connections[0 : count * t]
113
114                 objects = ['file-%d-%d' % (s, i % 100) for i in range(count)]
115                 r = parallel_multiget(objects, conns, t)
116                 print r
117                 logfile.write('%s\t%s\t%s\t%s\t%s\n' % (s, blocksize >> 20, t, len(conns), r))
118                 logfile.flush()
119                 time.sleep(2.0)
120 sys.exit(0)
121
122 for s in SIZES:
123     print "Priming objects: %d-byte objects" % (s,)
124     logfile = open('/dev/null', 'w')
125     run_test(s, 1, 100, logfile, 0.0)
126
127     for t in [4, 2, 1]:
128         print "Running tests: %d-byte objects, %d parallel fetches" % (s, t)
129         logfile = open('parallel-%d-%d.data' % (s, t), 'w')
130         run_test(s, t, 100, logfile)
131 sys.exit(0)
132
133 if __name__ == '__main__':
134     # Pass 1: Identical downloads in parallel
135     connections = [S3TestConnection() for _ in range(8)]
136     SIZES = [4096, 32 << 10, 256 << 10, 1 << 20, 4 << 20]
137     PRIME = (1 << 20) + (1 << 10)
138     c = S3TestConnection()
139     for size in SIZES:
140         for i in range(32):
141             parallel_get('file-%d-%d' % (size, i), connections)
142
143     # Pass 1: Downloads in parallel, but downloads staggered so one request
144     # arrives earlier
145     connections = [S3TestConnection() for _ in range(8)]
146     SIZES = [4096, 32 << 10, 256 << 10, 1 << 20, 4 << 20]
147     PRIME = (1 << 20) + (1 << 10)
148     c = S3TestConnection()
149     for size in SIZES:
150         for i in range(32):
151             parallel_get('file-%d-%d' % (size, i), connections, delay1=1.0)