1 /* Blue Sky: File Systems in the Cloud
3 * Copyright (C) 2010 The Regents of the University of California
4 * Written by Michael Vrable <mvrable@cs.ucsd.edu>
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions
9 * 1. Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * 2. Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * 3. Neither the name of the University nor the names of its contributors
15 * may be used to endorse or promote products derived from this software
16 * without specific prior written permission.
18 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
19 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21 * ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
22 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
24 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
25 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
27 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
31 /* Simple benchmark for Amazon S3: measures download speeds for
32 * differently-sized objects and with a variable number of parallel
// S3 bucket descriptor shared by all requests: main fills in its fields and
// do_get passes its address to S3_get_object.
49 S3BucketContext bucket;
// NOTE(review): the lines below appear to be members of struct thread_state
// (the field is accessed as ts->first_byte_timestamp elsewhere); the struct
// header and its other members are not visible in this excerpt.
// The timestamp is a get_ns() value, i.e. nanoseconds, and 0 is used as the
// "not yet received" marker by data_callback.
56 // Time when first bytes of the response were received
57 long long first_byte_timestamp;
59 // Statistics for computing mean and standard deviation
// Per-request state: do_get passes its address to S3_get_object as the final
// argument and data_callback receives it back as callbackData.
67 struct callback_state {
// Thread whose first_byte_timestamp is stamped by data_callback.
68 struct thread_state *ts;
// Seeded from the bytes argument of do_get and reduced by the size of each
// buffer delivered to data_callback.
69 size_t bytes_remaining;
// Capacity of the threads array; launch_test asserts thread_count does not
// exceed it.
72 #define MAX_THREADS 128
// One slot per worker thread, indexed by thread number (see launch_thread).
73 struct thread_state threads[MAX_THREADS];
// Experiment parameters, parsed from argv[1], argv[2] and argv[3] in main.
75 int experiment_threads, experiment_size, experiment_objects;
// Mutex and condition variable used together with barrier_val: workers signal
// barrier_cond and launch_test waits on it while barrier_val > 0.
78 pthread_mutex_t barrier_mutex;
79 pthread_cond_t barrier_cond;
// Test phases. launch_test moves the phase to MEASURE and then TERMINATE;
// workers loop until they observe TERMINATE.
82 enum phase { LAUNCH, MEASURE, TERMINATE };
// NOTE(review): volatile does not provide inter-thread synchronisation in C;
// an atomic type or access under barrier_mutex would be the portable choice.
83 volatile enum phase test_phase;
// Fragment of the barrier helper; its signature and any update of barrier_val
// are on lines not visible in this excerpt. While holding barrier_mutex it
// logs the current barrier_val and signals barrier_cond, the condition
// variable that launch_test waits on while barrier_val > 0.
// NOTE(review): presumably barrier_val is decremented between the lock and
// the printf; confirm against the full file.
87 pthread_mutex_lock(&barrier_mutex);
89 printf("Barrier: %d left\n", barrier_val);
91 pthread_cond_signal(&barrier_cond);
92 pthread_mutex_unlock(&barrier_mutex);
// Fragment of the clock helper (presumably get_ns; the function header and
// the declaration of ts are not visible in this excerpt).
// Reads CLOCK_MONOTONIC, so values are only meaningful as differences.
98 clock_gettime(CLOCK_MONOTONIC, &ts);
// Collapse seconds and nanoseconds into one nanosecond count; the LL suffix
// makes the multiplication happen in long long.
100 return ts.tv_sec * 1000000000LL + ts.tv_nsec;
// Data callback registered as getObjectDataCallback in do_get. For each
// delivered buffer it reduces the remaining byte count of the request and,
// if the owning thread has no first-byte timestamp yet (value 0), records the
// current time. The remaining parameters and the return statement are not
// visible in this excerpt.
103 static S3Status data_callback(int bufferSize, const char *buffer,
// callbackData is the struct callback_state that do_get passed to S3_get_object.
106 struct callback_state *state = (struct callback_state *)callbackData;
// NOTE(review): bytes_remaining is a size_t, so receiving more bytes than
// expected wraps around instead of going negative.
107 state->bytes_remaining -= bufferSize;
// benchmark_thread resets the timestamp to 0 before each request, so only the
// first buffer of a request is stamped.
108 if (state->ts->first_byte_timestamp == 0)
109 state->ts->first_byte_timestamp = get_ns();
// Response-properties callback; do_get installs it as
// responseHandler.propertiesCallback. Only the first line of the signature is
// visible in this excerpt, so its behaviour cannot be described from here.
113 static S3Status properties_callback(const S3ResponseProperties *properties,
// Request-completion callback; do_get installs it as
// responseHandler.completeCallback. It receives the final S3Status and a
// pointer to error details; the rest of the signature and the body are not
// visible in this excerpt.
119 static void complete_callback(S3Status status,
120 const S3ErrorDetails *errorDetails,
// Issue one GET request for object `key` in the global bucket.
//   key   - object name passed to S3_get_object
//   bytes - expected transfer size; seeds state.bytes_remaining
//   ts    - calling thread's state (presumably stored in state.ts on a line
//           not visible here; confirm)
//   offset (declared on a signature line not visible here) - passed to
//           S3_get_object together with the global range_size
125 static void do_get(const char *key, size_t bytes, struct thread_state *ts,
128 struct callback_state state;
129 struct S3GetObjectHandler handler;
131 state.bytes_remaining = bytes;
// Wire up the three callbacks defined in this file.
// NOTE(review): handler is not zero-initialised in the visible lines; this is
// fine only if these three members are all the handler has.
133 handler.responseHandler.propertiesCallback = properties_callback;
134 handler.responseHandler.completeCallback = complete_callback;
135 handler.getObjectDataCallback = data_callback;
// state lives on this stack frame, so the request must have finished by the
// time this function returns. NOTE(review): this relies on S3_get_object
// running synchronously when the request-context argument is NULL; confirm
// against the libs3 documentation.
137 S3_get_object(&bucket, key, NULL, offset, range_size, NULL, &handler, &state);
// Worker thread body: repeatedly fetches a randomly chosen object until
// test_phase becomes TERMINATE, timing each request. arg is the thread's
// struct thread_state slot (launch_thread passes &threads[n]).
// NOTE(review): several lines of this function are not visible in this
// excerpt, including the declarations of namebuf, i, stage, measuring and
// offset, the accumulator updates and the bodies of the stage branches.
140 void *benchmark_thread(void *arg)
142 struct thread_state *ts = (struct thread_state *)arg;
// Reset the running sums that launch_test later aggregates across threads.
149 ts->sum_x = ts->sum_x2 = ts->sum_f = 0.0;
// Start of the first request interval.
152 ts->timestamp = get_ns();
153 while (test_phase != TERMINATE) {
// Pick one of the experiment_objects objects; main asserts the count is
// positive, so the modulus is non-zero.
154 int object = random() % experiment_objects;
// NOTE(review): sprintf is unbounded and the size of namebuf is not visible
// here; a bounded snprintf would be safer.
156 sprintf(namebuf, "file-%d-%d", experiment_size, object);
// Random offset that is a multiple of range_size within the object.
// NOTE(review): this divides by range_size and takes a modulus by the
// quotient, so it needs 0 < range_size <= experiment_size. Presumably it is
// guarded by a line not visible here; confirm.
158 offset = (random() % (experiment_size / range_size)) * range_size;
// Cleared so that data_callback stamps the first buffer of this request.
160 ts->first_byte_timestamp = 0;
161 do_get(namebuf, experiment_size, ts, offset);
162 long long timestamp = get_ns();
// Duration measured from the end of the previous request (or thread start).
163 long long elapsed = timestamp - ts->timestamp;
165 printf("Elapsed[%d-%d]: %lld ns\n", ts->thread_num, i, elapsed);
// NOTE(review): if no data callback ran, first_byte_timestamp is still 0 and
// this difference is a large negative number.
166 printf(" first data after: %lld ns\n",
167 ts->first_byte_timestamp - ts->timestamp);
// Samples are only recorded while this thread is measuring and the global
// phase is MEASURE.
168 if (measuring && test_phase == MEASURE) {
// Convert nanoseconds to seconds: e is total request time, f is latency to
// the first byte.
169 double e = elapsed / 1e9;
170 double f = (ts->first_byte_timestamp - ts->timestamp) / 1e9;
// Credit the requested range, or the whole object when range_size is 0.
175 ts->bytes_sent += range_size ? range_size : experiment_size;
// Stage transitions keyed on the iteration count and on the number of
// recorded samples; what each branch does is not visible in this excerpt.
179 if (stage == 0 && i > 2) {
182 } else if (stage == 1 && ts->n >= 2) {
// The end of this request becomes the start of the next interval.
187 ts->timestamp = timestamp;
188 if (test_phase == MEASURE)
// Start worker number n: records the index in its threads[] slot and creates
// a pthread running benchmark_thread with that slot as its argument.
// n must be a valid index into threads[]; no bounds check is visible here.
195 void launch_thread(int n)
197 threads[n].thread_num = n;
198 if (pthread_create(&threads[n].thread, NULL, benchmark_thread, &threads[n]) != 0) {
// What happens after the message (exit or return) is not visible in this
// excerpt.
199 fprintf(stderr, "Error launching thread!\n");
// Block until worker number n has finished. The declaration of result and
// any use of it are not visible in this excerpt.
204 void wait_thread(int n)
// NOTE(review): the return value of pthread_join is not checked on this line.
207 pthread_join(threads[n].thread, &result);
// Run one experiment with thread_count workers: launch them, wait on the
// barrier until all are ready, switch to the MEASURE phase, wait on the
// barrier again, switch to TERMINATE, then aggregate per-thread statistics
// and print and log them.
// NOTE(review): several lines are not visible in this excerpt, including the
// bodies of the launch and join loops, the declaration and accumulation of n,
// and the tail of the statsfile fprintf.
210 void launch_test(int thread_count)
// Used for the overall wall-clock time reported as "Elapsed".
213 long long start_time = get_ns();
// Number of workers still expected at the first barrier.
216 barrier_val = thread_count;
217 assert(thread_count <= MAX_THREADS);
219 printf("Launching...\n");
// Loop body not visible; presumably it calls launch_thread(i).
221 for (i = 0; i < thread_count; i++)
224 /* Wait until all threads are ready. */
225 pthread_mutex_lock(&barrier_mutex);
// Loop guards against spurious wakeups and against signals that arrive while
// other workers are still outstanding.
226 while (barrier_val > 0) {
227 pthread_cond_wait(&barrier_cond, &barrier_mutex);
229 pthread_mutex_unlock(&barrier_mutex);
231 printf("Measuring...\n");
// NOTE(review): barrier_val and test_phase are written here without holding
// barrier_mutex while the workers are already running; this is a data race
// unless the workers cannot touch barrier_val before observing MEASURE.
232 barrier_val = thread_count;
233 test_phase = MEASURE;
235 /* Ensure all threads have measured some activity, then a bit more. */
236 pthread_mutex_lock(&barrier_mutex);
237 while (barrier_val > 0) {
238 pthread_cond_wait(&barrier_cond, &barrier_mutex);
240 pthread_mutex_unlock(&barrier_mutex);
241 printf("Data in from all threads...\n");
244 printf("Terminating...\n");
// Workers poll test_phase at the top of their loop and exit on TERMINATE.
245 test_phase = TERMINATE;
// Loop body not visible; presumably it calls wait_thread(i).
247 for (i = 0; i < thread_count; i++)
// Aggregate the per-thread sums into experiment-wide totals.
251 double sum_x = 0.0, sum_x2 = 0.0, sum_f = 0.0;
252 double bandwidth = 0.0;
253 for (i = 0; i < thread_count; i++) {
255 sum_x += threads[i].sum_x;
256 sum_x2 += threads[i].sum_x2;
257 sum_f += threads[i].sum_f;
// Per-thread throughput (bytes credited over summed request time), added up
// across threads.
// NOTE(review): a thread that recorded no samples has sum_x == 0, which makes
// this term infinite or NaN.
258 bandwidth += threads[i].bytes_sent / threads[i].sum_x;
261 double elapsed = (get_ns() - start_time) / 1e9;
262 printf("*** %d threads, %d byte objects, %d byte ranges\n",
263 experiment_threads, experiment_size, range_size);
264 printf("Elapsed: %f s\n", elapsed);
265 printf("Data points: %d\n", n);
// Mean request time.
266 double mx = sum_x / n;
// Sample standard deviation: the numerator is the expanded form of the sum
// of (x - mx)^2, divided by n - 1.
// NOTE(review): n == 0 or n == 1 leads to a division by zero here.
267 double sx = sqrt((sum_x2 - 2*sum_x*mx + n*mx*mx) / (n - 1));
268 printf("Time: %f ± %f s\n", mx, sx);
269 printf("Latency to first byte: %f\n", sum_f / n);
270 printf("Bandwidth: %f B/s\n", bandwidth);
// Tab-separated record written to the stats file opened in main; the
// remaining arguments are on lines not visible in this excerpt.
272 fprintf(statsfile, "%d\t%d\t%f\t%d\t%f\t%f\t%f\n",
273 experiment_threads, experiment_size, elapsed, n,
276 printf("Finished.\n");
// Entry point: opens the stats file, initialises the S3 library and bucket
// settings, parses the experiment parameters from the command line and runs
// one experiment via launch_test.
// NOTE(review): the argument-count checks, the error exits and the return
// statement are on lines not visible in this excerpt.
279 int main(int argc, char *argv[])
// Append mode, so results from successive runs accumulate in the same file.
281 statsfile = fopen("readbench.data", "a");
282 if (statsfile == NULL) {
283 perror("open stats file");
// NOTE(review): the return value of S3_initialize is not checked on this line.
287 S3_initialize(NULL, S3_INIT_ALL, NULL);
// Hard-coded bucket, accessed over plain HTTP with virtual-host style URIs.
289 bucket.bucketName = "mvrable-benchmark";
290 bucket.protocol = S3ProtocolHTTP;
291 bucket.uriStyle = S3UriStyleVirtualHost;
// NOTE(review): getenv returns NULL when the variable is unset; no check is
// visible here, so missing credentials reach the S3 library as NULL.
292 bucket.accessKeyId = getenv("AWS_ACCESS_KEY_ID");
293 bucket.secretAccessKey = getenv("AWS_SECRET_ACCESS_KEY");
295 pthread_mutex_init(&barrier_mutex, NULL);
296 pthread_cond_init(&barrier_cond, NULL);
// NOTE(review): the usage text lists three arguments, yet argv[4] is read
// below as range_size; presumably that read is guarded by an argc check on a
// line not visible here.
299 fprintf(stderr, "Usage: %s <threads> <size> <object-count>\n", argv[0]);
// NOTE(review): atoi reports no errors; non-numeric input silently becomes 0.
// strtol with end-pointer checking would allow validation.
303 experiment_threads = atoi(argv[1]);
304 experiment_size = atoi(argv[2]);
305 experiment_objects = atoi(argv[3]);
307 range_size = atoi(argv[4]);
// Required because benchmark_thread takes random() modulo this value.
309 assert(experiment_objects > 0);
310 launch_test(experiment_threads);