Add proper per-file copyright notices/licenses and top-level license.
[bluesky.git] / cloudbench / readbench.c
1 /* Blue Sky: File Systems in the Cloud
2  *
3  * Copyright (C) 2010  The Regents of the University of California
4  * Written by Michael Vrable <mvrable@cs.ucsd.edu>
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions
8  * are met:
9  * 1. Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  * 2. Redistributions in binary form must reproduce the above copyright
12  *    notice, this list of conditions and the following disclaimer in the
13  *    documentation and/or other materials provided with the distribution.
14  * 3. Neither the name of the University nor the names of its contributors
15  *    may be used to endorse or promote products derived from this software
16  *    without specific prior written permission.
17  *
18  * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
19  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21  * ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
22  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
24  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
25  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
27  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
28  * SUCH DAMAGE.
29  */
30
31 /* Simple benchmark for Amazon S3: measures download speeds for
32  * differently-sized objects and with a variable number of parallel
33  * connections. */
34
35 #include <assert.h>
36 #include <stdio.h>
37 #include <stdlib.h>
38 #include <stdint.h>
39 #include <string.h>
40 #include <pthread.h>
41 #include <time.h>
42 #include <unistd.h>
43 #include <math.h>
44
45 #include "libs3.h"
46
47 FILE *statsfile;
48
49 S3BucketContext bucket;
50
51 struct thread_state {
52     pthread_t thread;
53     int thread_num;
54     long long timestamp;
55
56     // Time when first bytes of the response were received
57     long long first_byte_timestamp;
58
59     // Statistics for computing mean and standard deviation
60     int n;
61     size_t bytes_sent;
62     double sum_x, sum_x2;
63
64     double sum_f;
65 };
66
67 struct callback_state {
68     struct thread_state *ts;
69     size_t bytes_remaining;
70 };
71
72 #define MAX_THREADS 128
73 struct thread_state threads[MAX_THREADS];
74
75 int experiment_threads, experiment_size, experiment_objects;
76 int range_size = 0;
77
78 pthread_mutex_t barrier_mutex;
79 pthread_cond_t barrier_cond;
80 int barrier_val;
81
82 enum phase { LAUNCH, MEASURE, TERMINATE };
83 volatile enum phase test_phase;
84
85 void barrier_signal()
86 {
87     pthread_mutex_lock(&barrier_mutex);
88     barrier_val--;
89     printf("Barrier: %d left\n", barrier_val);
90     if (barrier_val == 0)
91         pthread_cond_signal(&barrier_cond);
92     pthread_mutex_unlock(&barrier_mutex);
93 }
94
95 long long get_ns()
96 {
97     struct timespec ts;
98     clock_gettime(CLOCK_MONOTONIC, &ts);
99
100     return ts.tv_sec * 1000000000LL + ts.tv_nsec;
101 }
102
103 static S3Status data_callback(int bufferSize, const char *buffer,
104                               void *callbackData)
105 {
106     struct callback_state *state = (struct callback_state *)callbackData;
107     state->bytes_remaining -= bufferSize;
108     if (state->ts->first_byte_timestamp == 0)
109         state->ts->first_byte_timestamp = get_ns();
110     return S3StatusOK;
111 }
112
113 static S3Status properties_callback(const S3ResponseProperties *properties,
114                                      void *callbackData)
115 {
116     return S3StatusOK;
117 }
118
119 static void complete_callback(S3Status status,
120                               const S3ErrorDetails *errorDetails,
121                               void *callbackData)
122 {
123 }
124
125 static void do_get(const char *key, size_t bytes, struct thread_state *ts,
126                    size_t offset)
127 {
128     struct callback_state state;
129     struct S3GetObjectHandler handler;
130
131     state.bytes_remaining = bytes;
132     state.ts = ts;
133     handler.responseHandler.propertiesCallback = properties_callback;
134     handler.responseHandler.completeCallback = complete_callback;
135     handler.getObjectDataCallback = data_callback;
136
137     S3_get_object(&bucket, key, NULL, offset, range_size, NULL, &handler, &state);
138 }
139
140 void *benchmark_thread(void *arg)
141 {
142     struct thread_state *ts = (struct thread_state *)arg;
143     char namebuf[64];
144     int i = 0;
145     int stage = 0;
146     int measuring = 0;
147
148     ts->n = 0;
149     ts->sum_x = ts->sum_x2 = ts->sum_f = 0.0;
150     ts->bytes_sent = 0;
151
152     ts->timestamp = get_ns();
153     while (test_phase != TERMINATE) {
154         int object = random() % experiment_objects;
155         int offset = 0;
156         sprintf(namebuf, "file-%d-%d", experiment_size, object);
157         if (range_size) {
158             offset = (random() % (experiment_size / range_size)) * range_size;
159         }
160         ts->first_byte_timestamp = 0;
161         do_get(namebuf, experiment_size, ts, offset);
162         long long timestamp = get_ns();
163         long long elapsed = timestamp - ts->timestamp;
164
165         printf("Elapsed[%d-%d]: %lld ns\n", ts->thread_num, i, elapsed);
166         printf("    first data after: %lld ns\n",
167                ts->first_byte_timestamp - ts->timestamp);
168         if (measuring && test_phase == MEASURE) {
169             double e = elapsed / 1e9;
170             double f = (ts->first_byte_timestamp - ts->timestamp) / 1e9;
171             ts->n++;
172             ts->sum_x += e;
173             ts->sum_x2 += e * e;
174             ts->sum_f += f;
175             ts->bytes_sent += range_size ? range_size : experiment_size;
176         }
177
178         i++;
179         if (stage == 0 && i > 2) {
180             barrier_signal();
181             stage = 1;
182         } else if (stage == 1 && ts->n >= 2) {
183             barrier_signal();
184             stage = 2;
185         }
186
187         ts->timestamp = timestamp;
188         if (test_phase == MEASURE)
189             measuring = 1;
190     }
191
192     return NULL;
193 }
194
195 void launch_thread(int n)
196 {
197     threads[n].thread_num = n;
198     if (pthread_create(&threads[n].thread, NULL, benchmark_thread, &threads[n]) != 0) {
199         fprintf(stderr, "Error launching thread!\n");
200         exit(1);
201     }
202 }
203
204 void wait_thread(int n)
205 {
206     void *result;
207     pthread_join(threads[n].thread, &result);
208 }
209
210 void launch_test(int thread_count)
211 {
212     int i;
213     long long start_time = get_ns();
214
215     test_phase = LAUNCH;
216     barrier_val = thread_count;
217     assert(thread_count <= MAX_THREADS);
218
219     printf("Launching...\n");
220
221     for (i = 0; i < thread_count; i++)
222         launch_thread(i);
223
224     /* Wait until all threads are ready. */
225     pthread_mutex_lock(&barrier_mutex);
226     while (barrier_val > 0) {
227         pthread_cond_wait(&barrier_cond, &barrier_mutex);
228     }
229     pthread_mutex_unlock(&barrier_mutex);
230
231     printf("Measuring...\n");
232     barrier_val = thread_count;
233     test_phase = MEASURE;
234
235     /* Ensure all threads have measured some activity, then a bit more. */
236     pthread_mutex_lock(&barrier_mutex);
237     while (barrier_val > 0) {
238         pthread_cond_wait(&barrier_cond, &barrier_mutex);
239     }
240     pthread_mutex_unlock(&barrier_mutex);
241     printf("Data in from all threads...\n");
242     sleep(5);
243
244     printf("Terminating...\n");
245     test_phase = TERMINATE;
246
247     for (i = 0; i < thread_count; i++)
248         wait_thread(i);
249
250     int n = 0;
251     double sum_x = 0.0, sum_x2 = 0.0, sum_f = 0.0;
252     double bandwidth = 0.0;
253     for (i = 0; i < thread_count; i++) {
254         n += threads[i].n;
255         sum_x += threads[i].sum_x;
256         sum_x2 += threads[i].sum_x2;
257         sum_f += threads[i].sum_f;
258         bandwidth += threads[i].bytes_sent / threads[i].sum_x;
259     }
260
261     double elapsed = (get_ns() - start_time) / 1e9;
262     printf("*** %d threads, %d byte objects, %d byte ranges\n",
263            experiment_threads, experiment_size, range_size);
264     printf("Elapsed: %f s\n", elapsed);
265     printf("Data points: %d\n", n);
266     double mx = sum_x / n;
267     double sx = sqrt((sum_x2 - 2*sum_x*mx + n*mx*mx) / (n - 1));
268     printf("Time: %f ± %f s\n", mx, sx);
269     printf("Latency to first byte: %f\n", sum_f / n);
270     printf("Bandwidth: %f B/s\n", bandwidth);
271
272     fprintf(statsfile, "%d\t%d\t%f\t%d\t%f\t%f\t%f\n",
273             experiment_threads, experiment_size, elapsed, n,
274             mx, sx, bandwidth);
275
276     printf("Finished.\n");
277 }
278
279 int main(int argc, char *argv[])
280 {
281     statsfile = fopen("readbench.data", "a");
282     if (statsfile == NULL) {
283         perror("open stats file");
284         return 1;
285     }
286
287     S3_initialize(NULL, S3_INIT_ALL, NULL);
288
289     bucket.bucketName = "mvrable-benchmark";
290     bucket.protocol = S3ProtocolHTTP;
291     bucket.uriStyle = S3UriStyleVirtualHost;
292     bucket.accessKeyId = getenv("AWS_ACCESS_KEY_ID");
293     bucket.secretAccessKey = getenv("AWS_SECRET_ACCESS_KEY");
294
295     pthread_mutex_init(&barrier_mutex, NULL);
296     pthread_cond_init(&barrier_cond, NULL);
297
298     if (argc < 4) {
299         fprintf(stderr, "Usage: %s <threads> <size> <object-count>\n", argv[0]);
300         return 1;
301     }
302
303     experiment_threads = atoi(argv[1]);
304     experiment_size = atoi(argv[2]);
305     experiment_objects = atoi(argv[3]);
306     if (argc > 4) {
307         range_size = atoi(argv[4]);
308     }
309     assert(experiment_objects > 0);
310     launch_test(experiment_threads);
311
312     printf("Done.\n");
313     fclose(statsfile);
314
315     return 0;
316 }