Amazon S3 test script: add parallel_multiget() and a multi-fetch simulation.
author Michael Vrable <mvrable@cs.ucsd.edu>
Tue, 15 Jun 2010 18:29:29 +0000 (11:29 -0700)
committer Michael Vrable <mvrable@cs.ucsd.edu>
Tue, 15 Jun 2010 18:29:29 +0000 (11:29 -0700)
cloudbench/paralleltest.py

index b447938..8af4cb4 100755 (executable)
@@ -52,6 +52,47 @@ def parallel_get(name, connections, delay1=0.0):
     if len(res) == len(connections):
         return res
 
+def parallel_multiget(names, connections, repeat=1):
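+    # Fetch every object in names over the given connections in parallel.
+    # Each name is enqueued repeat times and one worker thread per
+    # connection races to fetch it; only the first completion time is
+    # recorded.  Returns the elapsed time until the last object's first
+    # fetch completes.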
+    requests = Queue.Queue()
+    results = [[threading.Lock(), None] for _ in names]
+    for (n, r) in zip(names, results):
+        for _ in range(repeat):
+            requests.put((n, r))
+
+    threads = []
+    def launcher(c, requests):
+        while True:
+            try:
+                (n, r) = requests.get(block=False)
+            except Queue.Empty:
+                return
+            try:
+                # The unlocked peek at r[1] is a benign race: the locked
+                # re-check below ensures only the first fetch records a time.
+                if r[1] is None:
+                    c.get_object(n)
+                    r[0].acquire()
+                    if r[1] is None: r[1] = time.time()
+                    r[0].release()
+            finally:
+                # Mark the task done even if the fetch raised, so that
+                # requests.join() cannot hang.
+                requests.task_done()
+    for c in connections:
+        threads.append(threading.Thread(target=launcher, args=(c, requests)))
+    start_time = time.time()
+    for t in threads:
+        t.start()
+    requests.join()
+
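+    # Elapsed time until the last object's first fetch completed.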
+    return max(x[1] for x in results) - start_time
+
 def run_test(size, threads, num, logfile=sys.stdout, delay=1.0):
     connections = [S3TestConnection() for _ in range(threads)]
     for i in range(num):
@@ -62,6 +103,40 @@ def run_test(size, threads, num, logfile=sys.stdout, delay=1.0):
         if delay > 0:
             time.sleep(delay)
 
+connections = [S3TestConnection() for _ in range(128)]
+logfile = open('multifetch-simulation.data', 'a')
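+# Multi-fetch simulation: measure how long it takes to fetch all of the
+# s-byte objects making up one blocksize-byte block, requesting each
+# object t times so that the fastest connection wins.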
+for s in SIZES:
+    print "Priming objects: %d-byte objects" % (s,)
+    run_test(s, 1, 100, open('/dev/null', 'w'), 0.0)
+
+    for blocksize in [1 << 20, 2 << 20, 4 << 20, 8 << 20]:
+        if s > blocksize: continue
+        for t in [4, 2, 1]:
+            for rep in range(10):
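+                # Number of s-byte objects needed to cover one block.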
+                count = blocksize // s
+                print "Running tests: %d-byte blocks, %d-byte objects, %d parallel fetches" % (blocksize, s, t)
+                print "Object count:", count
+                if count * t > len(connections):
+                    conns = connections
+                else:
+                    conns = connections[0 : count * t]
+
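+                # Wrap at 100: run_test primed only 100 objects of each size.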
+                objects = ['file-%d-%d' % (s, i % 100) for i in range(count)]
+                r = parallel_multiget(objects, conns, t)
+                print r
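+                # Log columns: object size, block size (MiB), repeat factor,
+                # connections used, elapsed seconds.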
+                logfile.write('%s\t%s\t%s\t%s\t%s\n' % (s, blocksize >> 20, t, len(conns), r))
+                logfile.flush()
+                time.sleep(2.0)
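+# Stop after the simulation; the original test loop below never runs.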
+sys.exit(0)
+
 for s in SIZES:
     print "Priming objects: %d-byte objects" % (s,)
     logfile = open('/dev/null', 'w')