47216b151194a80af06e0fa7902838dd810bb24f
[bluesky.git] / cloudbench / s3pipeline.py
1 #!/usr/bin/python2.6
2
3 import boto, httplib, socket, sys, time
4 from boto.s3.key import Key
5 from threading import Thread
6
# Name of the S3 bucket that the pipelined requests in this script target.
bucket_name = "mvrable-benchmark"

# Shared boto S3 connection object.  It is created with is_secure=False, and
# the rest of the file uses it only to build host names / paths and to add
# the AWS authentication header to hand-built requests.
conn = boto.connect_s3(is_secure=False)
9
class HttpResponseParser:
    """Incrementally track HTTP response boundaries in a stream of data.

    Data read from a connection is handed to process() in arbitrary chunks.
    The parser alternates between two states:

    * header mode (self.header is True): the buffer is consumed one
      CRLF-terminated line at a time, remembering any Content-Length value;
    * body mode (self.header is False): exactly self.content_length bytes
      are discarded, after which the parser returns to header mode.

    Response bodies are thrown away, never stored.  A message is printed
    when a Content-Length header is seen and when a body has been fully
    consumed.

    Limitations: only Content-Length framing is understood.  A response
    without that header is treated as having no body, so chunked or
    close-delimited bodies are not handled correctly.

    The print calls use the single-argument parenthesised form so the
    output is the same whether print is a statement or a function.
    """

    def __init__(self):
        self.buf = ""               # received data not yet consumed
        self.header = True          # True while reading header lines
        self.content_length = None  # body bytes still expected, if known

    def process(self, data):
        """Append newly received data and consume as much as possible."""
        self.buf = self.buf + data
        while self.parse():
            pass

    def parse(self):
        """Perform one parsing step.

        Returns True if the parser state advanced (so the caller should call
        again), or False if more data is needed before progress can be made.
        Raises ValueError if a Content-Length value is not an integer.
        """
        if not self.buf:
            return False

        if not self.header and self.content_length is not None:
            self._consume_body()
            return True

        return self._parse_header_line()

    def _consume_body(self):
        """Discard buffered body bytes, finishing the body when it is done."""
        consumed = min(self.content_length, len(self.buf))
        self.content_length -= consumed
        self.buf = self.buf[consumed:]
        if self.content_length == 0:
            self._finish_body()

    def _finish_body(self):
        """Report a completed body and reset state for the next response."""
        print("Completed reading body")
        self.content_length = None
        self.header = True

    def _parse_header_line(self):
        """Consume one header line; return False if no full line is buffered."""
        crlf = self.buf.find("\r\n")
        if crlf < 0:
            return False

        line = self.buf[:crlf]
        self.buf = self.buf[crlf + 2:]

        if line == "":
            # Blank line: end of this response's headers.
            if self.content_length is None:
                # No declared length: stay in header mode so that the next
                # response's Content-Length cannot switch us into body mode
                # while its headers are still being read.
                pass
            elif self.content_length == 0:
                # Empty body: complete now instead of waiting for the first
                # bytes of a later response to arrive.
                self._finish_body()
            else:
                self.header = False
            return True

        # Split on the first colon so optional whitespace around the value
        # is tolerated and only the exact header name matches.
        name, sep, value = line.partition(":")
        if sep and name.strip().lower() == "content-length":
            self.content_length = int(value.strip())
            print("Content length: %d" % self.content_length)
        return True
51
class PipelinedRequester:
    """Issue GET requests for keys of one bucket over a single raw socket.

    Requests are written back-to-back on the same connection without waiting
    for earlier responses.  A background daemon thread reads whatever the
    server sends and feeds it to an HttpResponseParser.
    """

    def __init__(self, bucket):
        """Connect to the bucket's host on port 80 and start the reader."""
        self.bucket = bucket
        self.host = conn.calling_format.build_host(conn.server_name(), bucket)
        self.sock = socket.create_connection((self.host, 80))

        # Daemon thread, so it does not keep the process alive on its own.
        reader = Thread(target=self.reader_thread)
        reader.daemon = True
        reader.start()

    def reader_thread(self):
        """Feed received data to a response parser until recv returns nothing."""
        parser = HttpResponseParser()
        chunk = self.sock.recv(4096)
        while chunk:
            parser.process(chunk)
            chunk = self.sock.recv(4096)

    def send_request(self, key):
        """Build a GET request for `key` by hand and write it to the socket."""
        method = 'GET'
        calling_format = conn.calling_format
        path = calling_format.build_path_base(self.bucket, key)
        auth_path = calling_format.build_auth_path(self.bucket, key)

        headers = {'User-Agent': boto.UserAgent + " (pipelined)",
                   'Content-Length': '0'}
        # boto adds its AWS authentication header(s) to the dict in place.
        conn.add_aws_auth_header(headers, method, auth_path)

        pieces = ["%s %s HTTP/1.1\r\n" % (method, path),
                  "Host: %s\r\n" % self.host]
        for name, value in headers.items():
            pieces.append("%s: %s\r\n" % (name, value))
        pieces.append("\r\n")  # blank line terminates the header section
        self.sock.sendall(''.join(pieces))
81
# Benchmark driver: open one pipelined connection to the bucket.
requester = PipelinedRequester(bucket_name)

# Send the first request (file-4096-1) on its own and pause for two seconds
# before pipelining the rest.
requester.send_request('file-%d-1' % (1 << 12))
time.sleep(2)

# Remaining power-of-two keys: file-8192-1 through file-131072-1.
for i in range(13, 18):
    requester.send_request('file-%d-1' % (1 << i))

# Then 32 further requests for keys file-8192-0 .. file-8192-31.
for i in range(32):
    requester.send_request('file-8192-%d' % i)

# Keep the main thread alive for a while; the reader is a daemon thread and
# would otherwise be cut off when the script exits.
time.sleep(5)