ac4185e9db78a094922eaf142d8f748e3633e20d
[bluesky.git] / cloudbench / readbench.c
1 /* Simple benchmark for Amazon S3: measures download speeds for
2  * differently-sized objects and with a variable number of parallel
3  * connections. */
4
5 #include <assert.h>
6 #include <stdio.h>
7 #include <stdlib.h>
8 #include <stdint.h>
9 #include <string.h>
10 #include <pthread.h>
11 #include <time.h>
12 #include <unistd.h>
13 #include <math.h>
14
15 #include "libs3.h"
16
17 FILE *statsfile;
18
19 S3BucketContext bucket;
20
21 struct thread_state {
22     pthread_t thread;
23     int thread_num;
24     long long timestamp;
25
26     // Time when first bytes of the response were received
27     long long first_byte_timestamp;
28
29     // Statistics for computing mean and standard deviation
30     int n;
31     size_t bytes_sent;
32     double sum_x, sum_x2;
33
34     double sum_f;
35 };
36
37 struct callback_state {
38     struct thread_state *ts;
39     size_t bytes_remaining;
40 };
41
42 #define MAX_THREADS 128
43 struct thread_state threads[MAX_THREADS];
44
45 int experiment_threads, experiment_size, experiment_objects;
46 int range_size = 0;
47
48 pthread_mutex_t barrier_mutex;
49 pthread_cond_t barrier_cond;
50 int barrier_val;
51
52 enum phase { LAUNCH, MEASURE, TERMINATE };
53 volatile enum phase test_phase;
54
55 void barrier_signal()
56 {
57     pthread_mutex_lock(&barrier_mutex);
58     barrier_val--;
59     printf("Barrier: %d left\n", barrier_val);
60     if (barrier_val == 0)
61         pthread_cond_signal(&barrier_cond);
62     pthread_mutex_unlock(&barrier_mutex);
63 }
64
65 long long get_ns()
66 {
67     struct timespec ts;
68     clock_gettime(CLOCK_MONOTONIC, &ts);
69
70     return ts.tv_sec * 1000000000LL + ts.tv_nsec;
71 }
72
73 static S3Status data_callback(int bufferSize, const char *buffer,
74                               void *callbackData)
75 {
76     struct callback_state *state = (struct callback_state *)callbackData;
77     state->bytes_remaining -= bufferSize;
78     if (state->ts->first_byte_timestamp == 0)
79         state->ts->first_byte_timestamp = get_ns();
80     return S3StatusOK;
81 }
82
83 static S3Status properties_callback(const S3ResponseProperties *properties,
84                                      void *callbackData)
85 {
86     return S3StatusOK;
87 }
88
89 static void complete_callback(S3Status status,
90                               const S3ErrorDetails *errorDetails,
91                               void *callbackData)
92 {
93 }
94
95 static void do_get(const char *key, size_t bytes, struct thread_state *ts,
96                    size_t offset)
97 {
98     struct callback_state state;
99     struct S3GetObjectHandler handler;
100
101     state.bytes_remaining = bytes;
102     state.ts = ts;
103     handler.responseHandler.propertiesCallback = properties_callback;
104     handler.responseHandler.completeCallback = complete_callback;
105     handler.getObjectDataCallback = data_callback;
106
107     S3_get_object(&bucket, key, NULL, offset, range_size, NULL, &handler, &state);
108 }
109
110 void *benchmark_thread(void *arg)
111 {
112     struct thread_state *ts = (struct thread_state *)arg;
113     char namebuf[64];
114     int i = 0;
115     int stage = 0;
116     int measuring = 0;
117
118     ts->n = 0;
119     ts->sum_x = ts->sum_x2 = ts->sum_f = 0.0;
120     ts->bytes_sent = 0;
121
122     ts->timestamp = get_ns();
123     while (test_phase != TERMINATE) {
124         int object = random() % experiment_objects;
125         int offset = 0;
126         sprintf(namebuf, "file-%d-%d", experiment_size, object);
127         if (range_size) {
128             offset = (random() % (experiment_size / range_size)) * range_size;
129         }
130         ts->first_byte_timestamp = 0;
131         do_get(namebuf, experiment_size, ts, offset);
132         long long timestamp = get_ns();
133         long long elapsed = timestamp - ts->timestamp;
134
135         printf("Elapsed[%d-%d]: %lld ns\n", ts->thread_num, i, elapsed);
136         printf("    first data after: %lld ns\n",
137                ts->first_byte_timestamp - ts->timestamp);
138         if (measuring && test_phase == MEASURE) {
139             double e = elapsed / 1e9;
140             double f = (ts->first_byte_timestamp - ts->timestamp) / 1e9;
141             ts->n++;
142             ts->sum_x += e;
143             ts->sum_x2 += e * e;
144             ts->sum_f += f;
145             ts->bytes_sent += range_size ? range_size : experiment_size;
146         }
147
148         i++;
149         if (stage == 0 && i > 2) {
150             barrier_signal();
151             stage = 1;
152         } else if (stage == 1 && ts->n >= 2) {
153             barrier_signal();
154             stage = 2;
155         }
156
157         ts->timestamp = timestamp;
158         if (test_phase == MEASURE)
159             measuring = 1;
160     }
161
162     return NULL;
163 }
164
165 void launch_thread(int n)
166 {
167     threads[n].thread_num = n;
168     if (pthread_create(&threads[n].thread, NULL, benchmark_thread, &threads[n]) != 0) {
169         fprintf(stderr, "Error launching thread!\n");
170         exit(1);
171     }
172 }
173
174 void wait_thread(int n)
175 {
176     void *result;
177     pthread_join(threads[n].thread, &result);
178 }
179
180 void launch_test(int thread_count)
181 {
182     int i;
183     long long start_time = get_ns();
184
185     test_phase = LAUNCH;
186     barrier_val = thread_count;
187     assert(thread_count <= MAX_THREADS);
188
189     printf("Launching...\n");
190
191     for (i = 0; i < thread_count; i++)
192         launch_thread(i);
193
194     /* Wait until all threads are ready. */
195     pthread_mutex_lock(&barrier_mutex);
196     while (barrier_val > 0) {
197         pthread_cond_wait(&barrier_cond, &barrier_mutex);
198     }
199     pthread_mutex_unlock(&barrier_mutex);
200
201     printf("Measuring...\n");
202     barrier_val = thread_count;
203     test_phase = MEASURE;
204
205     /* Ensure all threads have measured some activity, then a bit more. */
206     pthread_mutex_lock(&barrier_mutex);
207     while (barrier_val > 0) {
208         pthread_cond_wait(&barrier_cond, &barrier_mutex);
209     }
210     pthread_mutex_unlock(&barrier_mutex);
211     printf("Data in from all threads...\n");
212     sleep(5);
213
214     printf("Terminating...\n");
215     test_phase = TERMINATE;
216
217     for (i = 0; i < thread_count; i++)
218         wait_thread(i);
219
220     int n = 0;
221     double sum_x = 0.0, sum_x2 = 0.0, sum_f = 0.0;
222     double bandwidth = 0.0;
223     for (i = 0; i < thread_count; i++) {
224         n += threads[i].n;
225         sum_x += threads[i].sum_x;
226         sum_x2 += threads[i].sum_x2;
227         sum_f += threads[i].sum_f;
228         bandwidth += threads[i].bytes_sent / threads[i].sum_x;
229     }
230
231     double elapsed = (get_ns() - start_time) / 1e9;
232     printf("*** %d threads, %d byte objects, %d byte ranges\n",
233            experiment_threads, experiment_size, range_size);
234     printf("Elapsed: %f s\n", elapsed);
235     printf("Data points: %d\n", n);
236     double mx = sum_x / n;
237     double sx = sqrt((sum_x2 - 2*sum_x*mx + n*mx*mx) / (n - 1));
238     printf("Time: %f ± %f s\n", mx, sx);
239     printf("Latency to first byte: %f\n", sum_f / n);
240     printf("Bandwidth: %f B/s\n", bandwidth);
241
242     fprintf(statsfile, "%d\t%d\t%f\t%d\t%f\t%f\t%f\n",
243             experiment_threads, experiment_size, elapsed, n,
244             mx, sx, bandwidth);
245
246     printf("Finished.\n");
247 }
248
249 int main(int argc, char *argv[])
250 {
251     statsfile = fopen("readbench.data", "a");
252     if (statsfile == NULL) {
253         perror("open stats file");
254         return 1;
255     }
256
257     S3_initialize(NULL, S3_INIT_ALL, NULL);
258
259     bucket.bucketName = "mvrable-benchmark";
260     bucket.protocol = S3ProtocolHTTP;
261     bucket.uriStyle = S3UriStyleVirtualHost;
262     bucket.accessKeyId = getenv("AWS_ACCESS_KEY_ID");
263     bucket.secretAccessKey = getenv("AWS_SECRET_ACCESS_KEY");
264
265     pthread_mutex_init(&barrier_mutex, NULL);
266     pthread_cond_init(&barrier_cond, NULL);
267
268     if (argc < 4) {
269         fprintf(stderr, "Usage: %s <threads> <size> <object-count>\n", argv[0]);
270         return 1;
271     }
272
273     experiment_threads = atoi(argv[1]);
274     experiment_size = atoi(argv[2]);
275     experiment_objects = atoi(argv[3]);
276     if (argc > 4) {
277         range_size = atoi(argv[4]);
278     }
279     assert(experiment_objects > 0);
280     launch_test(experiment_threads);
281
282     printf("Done.\n");
283     fclose(statsfile);
284
285     return 0;
286 }