Add proper per-file copyright notices/licenses and top-level license.
[bluesky.git] / cloudbench / paralleltest.py
1 #!/usr/bin/python
2 #
3 # Copyright (C) 2010  The Regents of the University of California
4 # Written by Michael Vrable <mvrable@cs.ucsd.edu>
5 #
6 # Redistribution and use in source and binary forms, with or without
7 # modification, are permitted provided that the following conditions
8 # are met:
9 # 1. Redistributions of source code must retain the above copyright
10 #    notice, this list of conditions and the following disclaimer.
11 # 2. Redistributions in binary form must reproduce the above copyright
12 #    notice, this list of conditions and the following disclaimer in the
13 #    documentation and/or other materials provided with the distribution.
14 # 3. Neither the name of the University nor the names of its contributors
15 #    may be used to endorse or promote products derived from this software
16 #    without specific prior written permission.
17 #
18 # THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
19 # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21 # ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
22 # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23 # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
24 # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
25 # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
27 # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
28 # SUCH DAMAGE.
29
30 # Run a series of simple test requests against S3 for gathering some basic
31 # performance numbers.
32
33 import boto, time
34 from boto.s3.connection import SubdomainCallingFormat
35 from boto.s3.key import Key
36 import sys, threading, time, Queue
37 import azure
38
39 BUCKET_NAME = 'mvrable-benchmark'
40 SIZES = [(1 << s) for s in range(12, 23)]
41
42 class S3TestConnection:
43     def __init__(self):
44         self.conn = boto.connect_s3(is_secure=False,
45                                     calling_format=SubdomainCallingFormat())
46         self.bucket = self.conn.get_bucket(BUCKET_NAME)
47
48     def put_object(self, name, size):
49         buf = 'A' * size
50         k = Key(self.bucket, name)
51         start_time = time.time()
52         k.set_contents_from_string(buf)
53         #print "%s: %f" % (name, time.time() - start_time)
54
55     def get_object(self, name):
56         k = Key(self.bucket, name)
57         start_time = time.time()
58         buf = k.get_contents_as_string()
59         duration = time.time() - start_time
60         #print "%s: %f" % (name, duration)
61         return duration
62
63 def parallel_get(name, connections, delay1=0.0):
64     #print "Get: %s x %d" % (name, len(connections))
65     threads = []
66     q = Queue.Queue()
67     def launcher(c, name, result_queue):
68         result_queue.put(c.get_object(name))
69     for i in range(len(connections)):
70         c = connections[i]
71         threads.append(threading.Thread(target=launcher, args=(c, name, q)))
72     for i in range(len(threads)):
73         threads[i].start()
74     for t in threads: t.join()
75     res = []
76     while not q.empty():
77         res.append(q.get())
78
79     if len(res) == len(connections):
80         return res
81
82 def parallel_multiget(names, connections, repeat=1):
83     requests = Queue.Queue()
84     results = [[threading.Lock(), None] for n in names]
85     for i in range(len(names)):
86         for _ in range(repeat):
87             requests.put((names[i], results[i]))
88
89     threads = []
90     def launcher(c, requests):
91         while True:
92             try:
93                 (n, r) = requests.get(block=False)
94                 # Possible data race here but it should be harmless
95                 if r[1] is None:
96                     res = c.get_object(n)
97                     r[0].acquire()
98                     if r[1] is None: r[1] = time.time()
99                     r[0].release()
100                 requests.task_done()
101             except Queue.Empty:
102                 return
103     for i in range(len(connections)):
104         c = connections[i]
105         threads.append(threading.Thread(target=launcher, args=(c, requests)))
106     start_time = time.time()
107     for i in range(len(threads)):
108         threads[i].start()
109     requests.join()
110
111     return max(x[1] for x in results) - start_time
112
113 def run_test(size, threads, num, logfile=sys.stdout, delay=1.0):
114     connections = [S3TestConnection() for _ in range(threads)]
115     for i in range(num):
116         print "    ...test", i
117         res = parallel_get('file-%d-%d' % (size, i), connections)
118         if res is not None:
119             logfile.write(str(min(res)) + "\n")
120         if delay > 0:
121             time.sleep(delay)
122
123 connections = [S3TestConnection() for _ in range(128)]
124 logfile = open('multifetch-simulation.data', 'a')
125 for s in [(1 << s) for s in range(16, 27)]:
126     print "Priming objects: %d-byte objects" % (s,)
127     run_test(s, 1, 100, open('/dev/null', 'w'), 0.0)
128
129     for blocksize in [x << 20 for x in (4, 8, 16, 32, 64, 128)]:
130         if s > blocksize: continue
131         for t in [4, 2, 1]:
132             for rep in range(10):
133                 count = blocksize // s
134                 print "Running tests: %d-byte blocks, %d-byte objects, %d parallel fetches" % (blocksize, s, t)
135                 print "Object count:", count
136                 if count * t > len(connections):
137                     conns = connections
138                 else:
139                     conns = connections[0 : count * t]
140
141                 objects = ['file-%d-%d' % (s, i % 100) for i in range(count)]
142                 r = parallel_multiget(objects, conns, t)
143                 print r
144                 logfile.write('%s\t%s\t%s\t%s\t%s\n' % (s, blocksize >> 20, t, len(conns), r))
145                 logfile.flush()
146                 time.sleep(2.0)
147 sys.exit(0)
148
149 for s in SIZES:
150     print "Priming objects: %d-byte objects" % (s,)
151     logfile = open('/dev/null', 'w')
152     run_test(s, 1, 100, logfile, 0.0)
153
154     for t in [4, 2, 1]:
155         print "Running tests: %d-byte objects, %d parallel fetches" % (s, t)
156         logfile = open('parallel-%d-%d.data' % (s, t), 'w')
157         run_test(s, t, 100, logfile)
158 sys.exit(0)
159
160 if __name__ == '__main__':
161     # Pass 1: Identical downloads in parallel
162     connections = [S3TestConnection() for _ in range(8)]
163     SIZES = [4096, 32 << 10, 256 << 10, 1 << 20, 4 << 20]
164     PRIME = (1 << 20) + (1 << 10)
165     c = S3TestConnection()
166     for size in SIZES:
167         for i in range(32):
168             parallel_get('file-%d-%d' % (size, i), connections)
169
170     # Pass 1: Downloads in parallel, but downloads staggered so one request
171     # arrives earlier
172     connections = [S3TestConnection() for _ in range(8)]
173     SIZES = [4096, 32 << 10, 256 << 10, 1 << 20, 4 << 20]
174     PRIME = (1 << 20) + (1 << 10)
175     c = S3TestConnection()
176     for size in SIZES:
177         for i in range(32):
178             parallel_get('file-%d-%d' % (size, i), connections, delay1=1.0)