X-Git-Url: http://git.vrable.net/?p=bluesky.git;a=blobdiff_plain;f=cloudbench%2Fs3pipeline.py;fp=cloudbench%2Fs3pipeline.py;h=47216b151194a80af06e0fa7902838dd810bb24f;hp=0000000000000000000000000000000000000000;hb=4a16bafc74f36e1314a722544612ad7ac6a6cd05;hpb=9d32f47c84c4b8aae2ec6fd63d1a4f008228e82f diff --git a/cloudbench/s3pipeline.py b/cloudbench/s3pipeline.py new file mode 100755 index 0000000..47216b1 --- /dev/null +++ b/cloudbench/s3pipeline.py @@ -0,0 +1,90 @@ +#!/usr/bin/python2.6 + +import boto, httplib, socket, sys, time +from boto.s3.key import Key +from threading import Thread + +bucket_name = 'mvrable-benchmark' +conn = boto.connect_s3(is_secure=False) + +class HttpResponseParser: + def __init__(self): + self.buf = "" + self.header = True + self.content_length = None + + def process(self, data): + self.buf = self.buf + data + while self.parse(): + pass + + def parse(self): + if len(self.buf) == 0: + return False + + if not self.header and self.content_length is not None: + consumed = min(self.content_length, len(self.buf)) + #print "Got", consumed, "bytes of data" + self.content_length -= consumed + self.buf = self.buf[consumed:] + + if self.content_length == 0: + print "Completed reading body" + self.content_length = None + self.header = True + + return True + + crlf = self.buf.find("\r\n") + if crlf < 0: + return False + + line = self.buf[0:crlf] + self.buf = self.buf[crlf+2:] + #print "Header line:", line + if line.lower().startswith('content-length'): + self.content_length = int(line[16:]) + print "Content length:", self.content_length + if line == "": + self.header = False + return True + +class PipelinedRequester: + def __init__(self, bucket): + self.bucket = bucket + self.host = conn.calling_format.build_host(conn.server_name(), bucket) + self.sock = socket.create_connection((self.host, 80)) + + t = Thread(target=self.reader_thread) + t.setDaemon(True) + t.start() + + def reader_thread(self): + hrp = HttpResponseParser() + while True: + buf = self.sock.recv(4096) + if len(buf) == 0: break + hrp.process(buf) + + def send_request(self, key): + method = 'GET' + path = conn.calling_format.build_path_base(self.bucket, key) + auth_path = conn.calling_format.build_auth_path(self.bucket, key) + + headers = {'User-Agent': boto.UserAgent + " (pipelined)", + 'Content-Length': str(0)} + conn.add_aws_auth_header(headers, method, auth_path) + + req = "%s %s HTTP/1.1\r\nHost: %s\r\n" % (method, path, self.host) + req = req + ''.join("%s: %s\r\n" % h for h in headers.items()) + "\r\n" + self.sock.sendall(req) + +requester = PipelinedRequester(bucket_name) +for i in range(12, 18): + requester.send_request('file-%d-1' % (1 << i,)) + if i == 12: + time.sleep(2) +for i in range(32): + requester.send_request('file-8192-%d' % (i,)) + +time.sleep(5)