Add proper per-file copyright notices/licenses and top-level license.
[bluesky.git] / cloudbench / paralleltest.py
index f1b0be2..19ee695 100755 (executable)
@@ -1,16 +1,43 @@
 #!/usr/bin/python
 #
+# Copyright (C) 2010  The Regents of the University of California
+# Written by Michael Vrable <mvrable@cs.ucsd.edu>
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+# 1. Redistributions of source code must retain the above copyright
+#    notice, this list of conditions and the following disclaimer.
+# 2. Redistributions in binary form must reproduce the above copyright
+#    notice, this list of conditions and the following disclaimer in the
+#    documentation and/or other materials provided with the distribution.
+# 3. Neither the name of the University nor the names of its contributors
+#    may be used to endorse or promote products derived from this software
+#    without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
+# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+# ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
+# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+# SUCH DAMAGE.
+
 # Run a series of simple test requests against S3 for gathering some basic
 # performance numbers.
 
 import boto, time
 from boto.s3.connection import SubdomainCallingFormat
 from boto.s3.key import Key
-import sys, threading, time
+import sys, threading, time, Queue
 import azure
 
-BUCKET_NAME = 'mvrable-benchmark-west'
-SIZES = [64, 4096, 32 << 10, 256 << 10, 1 << 20, 4 << 20, 32 << 20]
+BUCKET_NAME = 'mvrable-benchmark'
+SIZES = [(1 << s) for s in range(12, 23)]
 
 class S3TestConnection:
     def __init__(self):
@@ -23,38 +50,112 @@ class S3TestConnection:
         k = Key(self.bucket, name)
         start_time = time.time()
         k.set_contents_from_string(buf)
-        print "%s: %f" % (name, time.time() - start_time)
+        #print "%s: %f" % (name, time.time() - start_time)
 
     def get_object(self, name):
         k = Key(self.bucket, name)
         start_time = time.time()
         buf = k.get_contents_as_string()
-        print "%s: %f" % (name, time.time() - start_time)
+        duration = time.time() - start_time
+        #print "%s: %f" % (name, duration)
+        return duration
 
 def parallel_get(name, connections, delay1=0.0):
-    print "Get: %s x %d" % (name, len(connections))
+    #print "Get: %s x %d" % (name, len(connections))
     threads = []
+    q = Queue.Queue()
+    def launcher(c, name, result_queue):
+        result_queue.put(c.get_object(name))
     for i in range(len(connections)):
         c = connections[i]
-        threads.append(threading.Thread(target=c.get_object, args=(name,)))
+        threads.append(threading.Thread(target=launcher, args=(c, name, q)))
     for i in range(len(threads)):
         threads[i].start()
-        if i == 0: time.sleep(delay1)
     for t in threads: t.join()
-    time.sleep(1.0)
+    res = []
+    while not q.empty():
+        res.append(q.get())
 
-def run_test():
-    print "==== S3 ===="
-    c = S3TestConnection()
-    for repeat in range(4):
-        for size in SIZES:
-            #c.put_object('file-%d-%d' % (size, repeat), size)
-            pass
+    if len(res) == len(connections):
+        return res
 
-    c = S3TestConnection()
-    for repeat in range(4):
-        for size in SIZES:
-            c.get_object('file-%d-%d' % (size, repeat))
+def parallel_multiget(names, connections, repeat=1):
+    requests = Queue.Queue()
+    results = [[threading.Lock(), None] for n in names]
+    for i in range(len(names)):
+        for _ in range(repeat):
+            requests.put((names[i], results[i]))
+
+    threads = []
+    def launcher(c, requests):
+        while True:
+            try:
+                (n, r) = requests.get(block=False)
+                # Possible data race here but it should be harmless
+                if r[1] is None:
+                    res = c.get_object(n)
+                    r[0].acquire()
+                    if r[1] is None: r[1] = time.time()
+                    r[0].release()
+                requests.task_done()
+            except Queue.Empty:
+                return
+    for i in range(len(connections)):
+        c = connections[i]
+        threads.append(threading.Thread(target=launcher, args=(c, requests)))
+    start_time = time.time()
+    for i in range(len(threads)):
+        threads[i].start()
+    requests.join()
+
+    return max(x[1] for x in results) - start_time
+
+def run_test(size, threads, num, logfile=sys.stdout, delay=1.0):
+    connections = [S3TestConnection() for _ in range(threads)]
+    for i in range(num):
+        print "    ...test", i
+        res = parallel_get('file-%d-%d' % (size, i), connections)
+        if res is not None:
+            logfile.write(str(min(res)) + "\n")
+        if delay > 0:
+            time.sleep(delay)
+
+connections = [S3TestConnection() for _ in range(128)]
+logfile = open('multifetch-simulation.data', 'a')
+for s in [(1 << s) for s in range(16, 27)]:
+    print "Priming objects: %d-byte objects" % (s,)
+    run_test(s, 1, 100, open('/dev/null', 'w'), 0.0)
+
+    for blocksize in [x << 20 for x in (4, 8, 16, 32, 64, 128)]:
+        if s > blocksize: continue
+        for t in [4, 2, 1]:
+            for rep in range(10):
+                count = blocksize // s
+                print "Running tests: %d-byte blocks, %d-byte objects, %d parallel fetches" % (blocksize, s, t)
+                print "Object count:", count
+                if count * t > len(connections):
+                    conns = connections
+                else:
+                    conns = connections[0 : count * t]
+
+                objects = ['file-%d-%d' % (s, i % 100) for i in range(count)]
+                r = parallel_multiget(objects, conns, t)
+                print r
+                logfile.write('%s\t%s\t%s\t%s\t%s\n' % (s, blocksize >> 20, t, len(conns), r))
+                logfile.flush()
+                time.sleep(2.0)
+sys.exit(0)
+
+for s in SIZES:
+    print "Priming objects: %d-byte objects" % (s,)
+    logfile = open('/dev/null', 'w')
+    run_test(s, 1, 100, logfile, 0.0)
+
+    for t in [4, 2, 1]:
+        print "Running tests: %d-byte objects, %d parallel fetches" % (s, t)
+        logfile = open('parallel-%d-%d.data' % (s, t), 'w')
+        run_test(s, t, 100, logfile)
+sys.exit(0)
 
 if __name__ == '__main__':
     # Pass 1: Identical downloads in parallel