159cbe4d77b442f1322e71b03211082e64583a85
[bluesky.git] / cloudbench / rangetest.py
1 #!/usr/bin/python
2 #
3 # Run a series of simple test requests against S3 for gathering some basic
4 # performance numbers.
5
6 import boto, time
7 from boto.s3.connection import SubdomainCallingFormat
8 from boto.s3.key import Key
9 import sys, threading, time, Queue
10 import azure
11
12 BUCKET_NAME = 'mvrable-benchmark'
13 SIZES = [(1 << s) for s in range(12, 23)]
14
15 class S3TestConnection:
16     def __init__(self):
17         self.conn = boto.connect_s3(is_secure=False,
18                                     calling_format=SubdomainCallingFormat())
19         self.bucket = self.conn.get_bucket(BUCKET_NAME)
20
21     def put_object(self, name, size):
22         buf = 'A' * size
23         k = Key(self.bucket, name)
24         start_time = time.time()
25         k.set_contents_from_string(buf)
26         #print "%s: %f" % (name, time.time() - start_time)
27
28     def get_object(self, name, byterange=None):
29         k = Key(self.bucket, name)
30         headers = {}
31         if byterange is not None:
32             headers['Range'] = 'bytes=%s-%s' % byterange
33         start_time = time.time()
34         buf = k.get_contents_as_string(headers=headers)
35         duration = time.time() - start_time
36         return (duration, len(buf))
37
38 def parallel_get(name, connections, delay1=0.0):
39     #print "Get: %s x %d" % (name, len(connections))
40     threads = []
41     q = Queue.Queue()
42     def launcher(c, name, result_queue):
43         result_queue.put(c.get_object(name))
44     for i in range(len(connections)):
45         c = connections[i]
46         threads.append(threading.Thread(target=launcher, args=(c, name, q)))
47     for i in range(len(threads)):
48         threads[i].start()
49     for t in threads: t.join()
50     res = []
51     while not q.empty():
52         res.append(q.get())
53
54     if len(res) == len(connections):
55         return res
56
57 def parallel_multiget(names, connections, repeat=1):
58     requests = Queue.Queue()
59     results = [[threading.Lock(), None] for n in names]
60     for i in range(len(names)):
61         for _ in range(repeat):
62             requests.put((names[i], results[i]))
63
64     threads = []
65     def launcher(c, requests):
66         while True:
67             try:
68                 (n, r) = requests.get(block=False)
69                 # Possible data race here but it should be harmless
70                 if r[1] is None:
71                     res = c.get_object(n)
72                     r[0].acquire()
73                     if r[1] is None: r[1] = time.time()
74                     r[0].release()
75                 requests.task_done()
76             except Queue.Empty:
77                 return
78     for i in range(len(connections)):
79         c = connections[i]
80         threads.append(threading.Thread(target=launcher, args=(c, requests)))
81     start_time = time.time()
82     for i in range(len(threads)):
83         threads[i].start()
84     requests.join()
85
86     return max(x[1] for x in results) - start_time
87
88 def run_test(size, threads, num, logfile=sys.stdout, delay=1.0):
89     connections = [S3TestConnection() for _ in range(threads)]
90     for i in range(num):
91         print "    ...test", i
92         res = parallel_get('file-%d-%d' % (size, i), connections)
93         if res is not None:
94             logfile.write(str(min(res)) + "\n")
95         if delay > 0:
96             time.sleep(delay)
97
98 # Ranges are specified as a start and a length.  Fractional values are
99 # multiplied by the total size of the object.  Negative start values measure
100 # from the end of the object.
101 TESTRANGES = [[None, None],                 # Cold read, then hot read
102               [None, (0, 256)],             # Cold read, hot partial read
103               [(0, 256), None],             # Cold partial, hot full
104               [(-256, 256), None],          # Cold partial end, hot full
105               [(0, 256), (0, 256)],         # Repeated range
106               [(0, 256), (256, 256)],       # Consecutive ranges
107               [(0, 256), (-256, 256)],      # Discontiguous ranges
108               [(-256, 256), (0, 256)]]      # Discontiguous ranges
109
110 connection = S3TestConnection()
111 logfile = open('multifetch-simulation.data', 'a')
112 RANGE = 100
113 for s in SIZES:
114     for i in range(RANGE):
115         name = 'file-%d-%d' % (s, i)
116         for r in TESTRANGES[i * len(TESTRANGES) // RANGE]:
117             if r is not None:
118                 (x, y) = r
119                 if abs(x) < 1: x = int(x * s)
120                 if abs(y) < 1: x = int(y * s)
121                 if x < 0: x += s
122                 r = (x, x + y - 1)
123             t = connection.get_object(name, r)
124             print "%s %s: %s" % (name, r or "", t)
125
126 sys.exit(0)