#!/usr/bin/python
#
# Run a series of simple test requests against S3 for gathering some basic
# performance numbers.

import boto
from boto.s3.connection import SubdomainCallingFormat
from boto.s3.key import Key
import sys, threading, time, Queue
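
# Note: this script targets Python 2 (print statements, the Queue module) and
# the boto 2.x S3 API.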

BUCKET_NAME = 'mvrable-benchmark'
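# Object sizes to benchmark: powers of two from 4 KiB (1 << 12) to 4 MiB (1 << 22)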
SIZES = [(1 << exp) for exp in range(12, 23)]
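
# A single boto S3 connection bound to the benchmark bucket.  Each worker
# thread gets its own S3TestConnection so that transfers do not serialize on
# one HTTP connection.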
class S3TestConnection:
    def __init__(self):
        self.conn = boto.connect_s3(is_secure=False,
                                    calling_format=SubdomainCallingFormat())
        self.bucket = self.conn.get_bucket(BUCKET_NAME)

    def put_object(self, name, size):
        buf = 'A' * size        # dummy payload of the requested size
        k = Key(self.bucket, name)
        start_time = time.time()
        k.set_contents_from_string(buf)
        #print "%s: %f" % (name, time.time() - start_time)

    def get_object(self, name):
        k = Key(self.bucket, name)
        start_time = time.time()
        buf = k.get_contents_as_string()    # read the full body to time the transfer
        duration = time.time() - start_time
        #print "%s: %f" % (name, duration)
        return duration
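
# Fetch the same object simultaneously over each of the given connections.
# The first request starts immediately; with a nonzero delay1 the remaining
# requests start delay1 seconds later.  Returns the list of per-request
# durations, or None if any request failed to report one.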
def parallel_get(name, connections, delay1=0.0):
    #print "Get: %s x %d" % (name, len(connections))
    threads = []
    q = Queue.Queue()
    def launcher(c, name, result_queue):
        result_queue.put(c.get_object(name))
    for i in range(len(connections)):
        c = connections[i]
        threads.append(threading.Thread(target=launcher, args=(c, name, q)))
    for i in range(len(threads)):
        if i == 1: time.sleep(delay1)   # stagger all requests after the first
        threads[i].start()
    for t in threads: t.join()

    res = []
    while not q.empty():
        res.append(q.get())
    if len(res) == len(connections):
        return res
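
# Simulate fetching a large block stored as many small objects.  Each name is
# queued repeat times and one worker thread per connection races to serve the
# queue; an object counts as done when the first fetch of it completes.
# Returns the elapsed time until every object had been fetched at least once.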
def parallel_multiget(names, connections, repeat=1):
    requests = Queue.Queue()
    results = [[threading.Lock(), None] for n in names]
    for i in range(len(names)):
        for _ in range(repeat):
            requests.put((names[i], results[i]))

    threads = []
    def launcher(c, requests):
        while True:
            try:
                (n, r) = requests.get(block=False)
                # Possible data race here but it should be harmless
                if r[1] is None:
                    c.get_object(n)
                    r[0].acquire()
                    # Record only the first completion time for this object
                    if r[1] is None: r[1] = time.time()
                    r[0].release()
            except Queue.Empty:
                return

    for i in range(len(connections)):
        c = connections[i]
        threads.append(threading.Thread(target=launcher, args=(c, requests)))
    start_time = time.time()
    for i in range(len(threads)):
        threads[i].start()
    for t in threads: t.join()

    return max(x[1] for x in results) - start_time
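
# For each of num objects of the given size, fetch the object simultaneously
# over the given number of connections and log the fastest single-fetch time,
# pausing delay seconds between rounds.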
def run_test(size, threads, num, logfile=sys.stdout, delay=1.0):
    connections = [S3TestConnection() for _ in range(threads)]
    for i in range(num):
        res = parallel_get('file-%d-%d' % (size, i), connections)
        if res is not None:
            logfile.write(str(min(res)) + "\n")
        if delay > 0: time.sleep(delay)
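
# Multi-fetch simulation: treat a blocksize-byte block as blocksize/s objects
# of size s, fetched in parallel with each object requested over t connections
# at once, and measure how long until the whole block is available.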
connections = [S3TestConnection() for _ in range(128)]
logfile = open('multifetch-simulation.data', 'a')
for s in [(1 << exp) for exp in range(16, 27)]:     # 64 KiB up to 64 MiB
    print "Priming objects: %d-byte objects" % (s,)
    run_test(s, 1, 100, open('/dev/null', 'w'), 0.0)

    for blocksize in [x << 20 for x in (4, 8, 16, 32, 64, 128)]:
        if s > blocksize: continue
        for t in (4, 2, 1):             # redundant parallel fetches per object
            for rep in range(10):
                count = blocksize // s
                print "Running tests: %d-byte blocks, %d-byte objects, %d parallel fetches" % (blocksize, s, t)
                print "Object count:", count
                if count * t > len(connections):
                    print "  Not enough connections (%d needed); skipping" % (count * t,)
                    continue

                conns = connections[0 : count * t]

                objects = ['file-%d-%d' % (s, i % 100) for i in range(count)]
                r = parallel_multiget(objects, conns, t)
                logfile.write('%s\t%s\t%s\t%s\t%s\n' % (s, blocksize >> 20, t, len(conns), r))
                logfile.flush()
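
# Baseline parallel-fetch tests: for each object size, time simultaneous
# fetches of the same object at several levels of parallelism.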
123 print "Priming objects: %d-byte objects" % (s,)
124 logfile = open('/dev/null', 'w')
125 run_test(s, 1, 100, logfile, 0.0)
128 print "Running tests: %d-byte objects, %d parallel fetches" % (s, t)
129 logfile = open('parallel-%d-%d.data' % (s, t), 'w')
130 run_test(s, t, 100, logfile)
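
# Two quick interactive passes: identical downloads in parallel, then the same
# downloads with all but the first request staggered.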
if __name__ == '__main__':
    # Pass 1: Identical downloads in parallel
    connections = [S3TestConnection() for _ in range(8)]
    SIZES = [4096, 32 << 10, 256 << 10, 1 << 20, 4 << 20]
    PRIME = (1 << 20) + (1 << 10)
    c = S3TestConnection()
    for size in SIZES:
        for i in range(100):
            parallel_get('file-%d-%d' % (size, i), connections)

    # Pass 2: Downloads in parallel, but staggered so one request starts
    # ahead of the others
    connections = [S3TestConnection() for _ in range(8)]
    SIZES = [4096, 32 << 10, 256 << 10, 1 << 20, 4 << 20]
    PRIME = (1 << 20) + (1 << 10)
    c = S3TestConnection()
    for size in SIZES:
        for i in range(100):
            parallel_get('file-%d-%d' % (size, i), connections, delay1=1.0)