Compile fix for directory renaming; use virtual host access method for S3.
[bluesky.git] / cloudbench / readbench.c
1 /* Simple benchmark for Amazon S3: measures download speeds for
2  * differently-sized objects and with a variable number of parallel
3  * connections. */
4
5 #include <assert.h>
6 #include <stdio.h>
7 #include <stdlib.h>
8 #include <stdint.h>
9 #include <string.h>
10 #include <pthread.h>
11 #include <time.h>
12 #include <unistd.h>
13 #include <math.h>
14
15 #include "libs3.h"
16
17 FILE *statsfile;
18
19 S3BucketContext bucket;
20
21 struct thread_state {
22     pthread_t thread;
23     int thread_num;
24     long long timestamp;
25
26     // Time when first bytes of the response were received
27     long long first_byte_timestamp;
28
29     // Statistics for computing mean and standard deviation
30     int n;
31     size_t bytes_sent;
32     double sum_x, sum_x2;
33
34     double sum_f;
35 };
36
37 struct callback_state {
38     struct thread_state *ts;
39     size_t bytes_remaining;
40 };
41
42 #define MAX_THREADS 128
43 struct thread_state threads[MAX_THREADS];
44
45 int experiment_threads, experiment_size, experiment_objects;
46
47 pthread_mutex_t barrier_mutex;
48 pthread_cond_t barrier_cond;
49 int barrier_val;
50
51 enum phase { LAUNCH, MEASURE, TERMINATE };
52 volatile enum phase test_phase;
53
54 void barrier_signal()
55 {
56     pthread_mutex_lock(&barrier_mutex);
57     barrier_val--;
58     printf("Barrier: %d left\n", barrier_val);
59     if (barrier_val == 0)
60         pthread_cond_signal(&barrier_cond);
61     pthread_mutex_unlock(&barrier_mutex);
62 }
63
64 long long get_ns()
65 {
66     struct timespec ts;
67     clock_gettime(CLOCK_MONOTONIC, &ts);
68
69     return ts.tv_sec * 1000000000LL + ts.tv_nsec;
70 }
71
72 static S3Status data_callback(int bufferSize, const char *buffer,
73                               void *callbackData)
74 {
75     struct callback_state *state = (struct callback_state *)callbackData;
76     state->bytes_remaining -= bufferSize;
77     if (state->ts->first_byte_timestamp == 0)
78         state->ts->first_byte_timestamp = get_ns();
79     return S3StatusOK;
80 }
81
82 static S3Status properties_callback(const S3ResponseProperties *properties,
83                                      void *callbackData)
84 {
85     return S3StatusOK;
86 }
87
88 static void complete_callback(S3Status status,
89                               const S3ErrorDetails *errorDetails,
90                               void *callbackData)
91 {
92 }
93
94 static void do_get(const char *key, size_t bytes, struct thread_state *ts)
95 {
96     struct callback_state state;
97     struct S3GetObjectHandler handler;
98
99     state.bytes_remaining = bytes;
100     state.ts = ts;
101     handler.responseHandler.propertiesCallback = properties_callback;
102     handler.responseHandler.completeCallback = complete_callback;
103     handler.getObjectDataCallback = data_callback;
104
105     S3_get_object(&bucket, key, NULL, 0, 0, NULL, &handler, &state);
106 }
107
108 void *benchmark_thread(void *arg)
109 {
110     struct thread_state *ts = (struct thread_state *)arg;
111     char namebuf[64];
112     int i = 0;
113     int stage = 0;
114     int measuring = 0;
115
116     ts->n = 0;
117     ts->sum_x = ts->sum_x2 = ts->sum_f = 0.0;
118     ts->bytes_sent = 0;
119
120     ts->timestamp = get_ns();
121     while (test_phase != TERMINATE) {
122         int object = random() % experiment_objects;
123         sprintf(namebuf, "file-%d-%d", experiment_size, object);
124         ts->first_byte_timestamp = 0;
125         do_get(namebuf, experiment_size, ts);
126         long long timestamp = get_ns();
127         long long elapsed = timestamp - ts->timestamp;
128
129         printf("Elapsed[%d-%d]: %lld ns\n", ts->thread_num, i, elapsed);
130         printf("    first data after: %lld ns\n",
131                ts->first_byte_timestamp - ts->timestamp);
132         if (measuring && test_phase == MEASURE) {
133             double e = elapsed / 1e9;
134             double f = (ts->first_byte_timestamp - ts->timestamp) / 1e9;
135             ts->n++;
136             ts->sum_x += e;
137             ts->sum_x2 += e * e;
138             ts->sum_f += f;
139             ts->bytes_sent += experiment_size;
140         }
141
142         i++;
143         if (stage == 0 && i > 2) {
144             barrier_signal();
145             stage = 1;
146         } else if (stage == 1 && ts->n >= 2) {
147             barrier_signal();
148             stage = 2;
149         }
150
151         ts->timestamp = timestamp;
152         if (test_phase == MEASURE)
153             measuring = 1;
154     }
155
156     return NULL;
157 }
158
159 void launch_thread(int n)
160 {
161     threads[n].thread_num = n;
162     if (pthread_create(&threads[n].thread, NULL, benchmark_thread, &threads[n]) != 0) {
163         fprintf(stderr, "Error launching thread!\n");
164         exit(1);
165     }
166 }
167
168 void wait_thread(int n)
169 {
170     void *result;
171     pthread_join(threads[n].thread, &result);
172 }
173
174 void launch_test(int thread_count)
175 {
176     int i;
177     long long start_time = get_ns();
178
179     test_phase = LAUNCH;
180     barrier_val = thread_count;
181     assert(thread_count <= MAX_THREADS);
182
183     printf("Launching...\n");
184
185     for (i = 0; i < thread_count; i++)
186         launch_thread(i);
187
188     /* Wait until all threads are ready. */
189     pthread_mutex_lock(&barrier_mutex);
190     while (barrier_val > 0) {
191         pthread_cond_wait(&barrier_cond, &barrier_mutex);
192     }
193     pthread_mutex_unlock(&barrier_mutex);
194
195     printf("Measuring...\n");
196     barrier_val = thread_count;
197     test_phase = MEASURE;
198
199     /* Ensure all threads have measured some activity, then a bit more. */
200     pthread_mutex_lock(&barrier_mutex);
201     while (barrier_val > 0) {
202         pthread_cond_wait(&barrier_cond, &barrier_mutex);
203     }
204     pthread_mutex_unlock(&barrier_mutex);
205     printf("Data in from all threads...\n");
206     sleep(5);
207
208     printf("Terminating...\n");
209     test_phase = TERMINATE;
210
211     for (i = 0; i < thread_count; i++)
212         wait_thread(i);
213
214     int n = 0;
215     double sum_x = 0.0, sum_x2 = 0.0, sum_f = 0.0;
216     double bandwidth = 0.0;
217     for (i = 0; i < thread_count; i++) {
218         n += threads[i].n;
219         sum_x += threads[i].sum_x;
220         sum_x2 += threads[i].sum_x2;
221         sum_f += threads[i].sum_f;
222         bandwidth += threads[i].bytes_sent / threads[i].sum_x;
223     }
224
225     double elapsed = (get_ns() - start_time) / 1e9;
226     printf("*** %d threads, %d byte objects\n",
227            experiment_threads, experiment_size);
228     printf("Elapsed: %f s\n", elapsed);
229     printf("Data points: %d\n", n);
230     double mx = sum_x / n;
231     double sx = sqrt((sum_x2 - 2*sum_x*mx + n*mx*mx) / (n - 1));
232     printf("Time: %f ± %f s\n", mx, sx);
233     printf("Latency to first byte: %f\n", sum_f / n);
234     printf("Bandwidth: %f B/s\n", bandwidth);
235
236     fprintf(statsfile, "%d\t%d\t%f\t%d\t%f\t%f\t%f\n",
237             experiment_threads, experiment_size, elapsed, n,
238             mx, sx, bandwidth);
239
240     printf("Finished.\n");
241 }
242
243 int main(int argc, char *argv[])
244 {
245     statsfile = fopen("readbench.data", "a");
246     if (statsfile == NULL) {
247         perror("open stats file");
248         return 1;
249     }
250
251     S3_initialize(NULL, S3_INIT_ALL);
252
253     bucket.bucketName = "mvrable-benchmark";
254     bucket.protocol = S3ProtocolHTTP;
255     bucket.uriStyle = S3UriStyleVirtualHost;
256     bucket.accessKeyId = getenv("AWS_ACCESS_KEY_ID");
257     bucket.secretAccessKey = getenv("AWS_SECRET_ACCESS_KEY");
258
259     pthread_mutex_init(&barrier_mutex, NULL);
260     pthread_cond_init(&barrier_cond, NULL);
261
262     if (argc < 4) {
263         fprintf(stderr, "Usage: %s <threads> <size> <object-count>\n", argv[0]);
264         return 1;
265     }
266
267     experiment_threads = atoi(argv[1]);
268     experiment_size = atoi(argv[2]);
269     experiment_objects = atoi(argv[3]);
270     assert(experiment_objects > 0);
271     launch_test(experiment_threads);
272
273     printf("Done.\n");
274     fclose(statsfile);
275
276     return 0;
277 }