1 /* Simple benchmark for Amazon S3: measures download speeds for
2 * differently-sized objects and with a variable number of parallel
// Bucket description passed by address to S3_get_object() in do_get();
// main() fills in its name, protocol, URI style and credentials.
19 S3BucketContext bucket;
26 // Time when first bytes of the response were received
// Zeroed by benchmark_thread() before each request and set by data_callback()
// on the first chunk, so 0 means "no data seen yet for this request".
27 long long first_byte_timestamp;
29 // Statistics for computing mean and standard deviation
// Per-request state handed to S3_get_object() as its callback data and
// recovered in data_callback() by casting callbackData.
37 struct callback_state {
// Thread that issued the request; data_callback() stamps
// ts->first_byte_timestamp through this pointer.
38 struct thread_state *ts;
// Set from do_get()'s `bytes` argument and decremented by bufferSize in
// data_callback() each time it is invoked.
39 size_t bytes_remaining;
// Capacity of threads[]; launch_test() asserts thread_count <= MAX_THREADS.
42 #define MAX_THREADS 128
// One slot per worker; launch_thread(n) uses threads[n] and passes its
// address to benchmark_thread().
43 struct thread_state threads[MAX_THREADS];
// Experiment parameters, parsed by main() from argv[1], argv[2] and argv[3].
45 int experiment_threads, experiment_size, experiment_objects;
// Mutex/condition pair that launch_test() waits on until barrier_val drops
// to zero.
48 pthread_mutex_t barrier_mutex;
49 pthread_cond_t barrier_cond;
// Test phases; launch_test() moves test_phase to MEASURE and then TERMINATE,
// and benchmark_thread() polls it.
52 enum phase { LAUNCH, MEASURE, TERMINATE };
// NOTE(review): volatile is used here for cross-thread signalling; it does not
// provide atomicity or ordering guarantees in C11 — consider an atomic type or
// reading/writing it under barrier_mutex.
53 volatile enum phase test_phase;
// Barrier check-in (the function header is not visible in this excerpt).
// Runs with barrier_mutex held, the same mutex launch_test() holds while it
// tests barrier_val > 0.
57 pthread_mutex_lock(&barrier_mutex);
// NOTE(review): the statement that changes barrier_val is not visible here;
// presumably it is decremented just before this message — confirm.
59 printf("Barrier: %d left\n", barrier_val);
// Wake the thread blocked in pthread_cond_wait() in launch_test(), which then
// re-checks barrier_val.
61 pthread_cond_signal(&barrier_cond);
62 pthread_mutex_unlock(&barrier_mutex);
// Body of the nanosecond clock helper (header not visible; presumably get_ns(),
// which the rest of the file calls for timestamps — confirm).
// Sample CLOCK_MONOTONIC into ts.
68 clock_gettime(CLOCK_MONOTONIC, &ts);
// Combine seconds and nanoseconds into one count of nanoseconds; the LL
// suffix makes the multiplication happen in long long.
70 return ts.tv_sec * 1000000000LL + ts.tv_nsec;
// Data callback installed by do_get() as handler.getObjectDataCallback.
// Presumably invoked once per received chunk of bufferSize bytes — confirm
// against the libs3 documentation.
73 static S3Status data_callback(int bufferSize, const char *buffer,
// callbackData is the struct callback_state that do_get() passed as the last
// argument of S3_get_object().
76 struct callback_state *state = (struct callback_state *)callbackData;
// Count down the bytes still expected.
// NOTE(review): bytes_remaining is a size_t, so this wraps around instead of
// going negative if more than the expected number of bytes is delivered.
77 state->bytes_remaining -= bufferSize;
// Record the arrival time only once per request: benchmark_thread() resets
// the field to 0 before calling do_get().
78 if (state->ts->first_byte_timestamp == 0)
79 state->ts->first_byte_timestamp = get_ns();
// Installed by do_get() as handler.responseHandler.propertiesCallback.
// The rest of the signature and the body are not visible in this excerpt.
83 static S3Status properties_callback(const S3ResponseProperties *properties,
// Installed by do_get() as handler.responseHandler.completeCallback; receives
// the request's S3Status and error details.
// The remaining parameters and the body are not visible in this excerpt.
89 static void complete_callback(S3Status status,
90 const S3ErrorDetails *errorDetails,
// Issue one S3 GET for `key` from the global bucket on behalf of thread `ts`.
// `bytes` is the number of bytes expected; benchmark_thread() also passes a
// fourth argument, the byte offset (that parameter is on a line not visible
// here).
95 static void do_get(const char *key, size_t bytes, struct thread_state *ts,
// Both objects live in this stack frame and are passed by address to
// S3_get_object() below.
98 struct callback_state state;
99 struct S3GetObjectHandler handler;
// Expected size, counted down by data_callback().
101 state.bytes_remaining = bytes;
// Register the three callbacks defined earlier in this file.
103 handler.responseHandler.propertiesCallback = properties_callback;
104 handler.responseHandler.completeCallback = complete_callback;
105 handler.getObjectDataCallback = data_callback;
// NOTE(review): benchmark_thread() takes its end timestamp immediately after
// do_get() returns, so this call is presumably synchronous when the sixth
// argument is NULL — confirm against the libs3 documentation.
// NOTE(review): range_size is not declared in the visible part of this
// function; presumably a file-scope variable set in main() — confirm.
107 S3_get_object(&bucket, key, NULL, offset, range_size, NULL, &handler, &state);
// Worker thread entry point, started by launch_thread() via pthread_create().
// Repeatedly fetches a randomly chosen object (or range of it) and records
// timing statistics in its own thread_state until test_phase is TERMINATE.
110 void *benchmark_thread(void *arg)
// arg is the address of this worker's slot in threads[].
112 struct thread_state *ts = (struct thread_state *)arg;
// Start with empty accumulators; launch_test() sums these across threads.
119 ts->sum_x = ts->sum_x2 = ts->sum_f = 0.0;
// Reference point for the first request's elapsed time.
122 ts->timestamp = get_ns();
// launch_test() sets test_phase to TERMINATE to end this loop.
123 while (test_phase != TERMINATE) {
// Pick an object index in [0, experiment_objects); main() asserts that
// experiment_objects > 0, so the modulus is non-zero.
124 int object = random() % experiment_objects;
// Object key format: file-<object size>-<object index>.
// NOTE(review): sprintf is unbounded and namebuf's declaration is not visible
// here; snprintf with the buffer size would rule out an overflow.
126 sprintf(namebuf, "file-%d-%d", experiment_size, object);
// Choose a random offset that is a multiple of range_size.
// NOTE(review): this divides by range_size and then takes a modulus by
// experiment_size / range_size; either is a division by zero when range_size
// is 0 or larger than experiment_size. A guard may exist on a line not visible
// here (the byte accounting below treats range_size == 0 as "whole object")
// — confirm.
128 offset = (random() % (experiment_size / range_size)) * range_size;
// Reset so data_callback() records the first chunk of this request.
130 ts->first_byte_timestamp = 0;
131 do_get(namebuf, experiment_size, ts, offset);
// Elapsed time is measured from the end of the previous request (or from
// thread start for the first one) to the return of do_get().
132 long long timestamp = get_ns();
133 long long elapsed = timestamp - ts->timestamp;
135 printf("Elapsed[%d-%d]: %lld ns\n", ts->thread_num, i, elapsed);
// NOTE(review): if data_callback() never ran for this request,
// first_byte_timestamp is still 0 and this difference is negative.
136 printf(" first data after: %lld ns\n",
137 ts->first_byte_timestamp - ts->timestamp);
// Only accumulate statistics while this thread is measuring and the global
// phase is MEASURE.
138 if (measuring && test_phase == MEASURE) {
// Convert nanoseconds to seconds.
139 double e = elapsed / 1e9;
140 double f = (ts->first_byte_timestamp - ts->timestamp) / 1e9;
// Credit range_size bytes when a range is configured, otherwise the whole
// object size.
145 ts->bytes_sent += range_size ? range_size : experiment_size;
// Stage transitions; the bodies of both branches are not visible in this
// excerpt.
149 if (stage == 0 && i > 2) {
152 } else if (stage == 1 && ts->n >= 2) {
// The end of this request becomes the start of the next interval.
157 ts->timestamp = timestamp;
// The statement controlled by this test is not visible in this excerpt.
158 if (test_phase == MEASURE)
// Start worker number n using slot threads[n].
165 void launch_thread(int n)
// Record the index so the worker can label its log output with thread_num.
167 threads[n].thread_num = n;
// The worker receives a pointer to its own slot as its argument.
168 if (pthread_create(&threads[n].thread, NULL, benchmark_thread, &threads[n]) != 0) {
// What happens after this message is not visible in this excerpt.
169 fprintf(stderr, "Error launching thread!\n");
// Block until worker number n has exited.
174 void wait_thread(int n)
// result receives the worker's return value; its declaration is on a line not
// visible in this excerpt.
177 pthread_join(threads[n].thread, &result);
// Run one experiment with thread_count workers: launch them, wait on the
// barrier twice (once before and once after switching to MEASURE), signal
// TERMINATE, then aggregate and report the per-thread statistics.
180 void launch_test(int thread_count)
// Reference point for the overall elapsed time reported below.
183 long long start_time = get_ns();
// Arm the barrier: one check-in is expected per worker.
186 barrier_val = thread_count;
// NOTE(review): assert is compiled out when NDEBUG is defined, which would
// leave threads[] unprotected against thread_count > MAX_THREADS.
187 assert(thread_count <= MAX_THREADS);
189 printf("Launching...\n");
// Loop body not visible in this excerpt; presumably launch_thread(i) — confirm.
191 for (i = 0; i < thread_count; i++)
194 /* Wait until all threads are ready. */
// The while loop re-checks the counter after every wakeup, so spurious
// wakeups and early signals are tolerated.
195 pthread_mutex_lock(&barrier_mutex);
196 while (barrier_val > 0) {
197 pthread_cond_wait(&barrier_cond, &barrier_mutex);
199 pthread_mutex_unlock(&barrier_mutex);
201 printf("Measuring...\n");
// Re-arm the barrier for the second rendezvous, then switch phase.
// NOTE(review): in the visible code barrier_val is assigned here after
// barrier_mutex has been released, while workers are running — confirm this
// cannot race with a worker's check-in.
202 barrier_val = thread_count;
203 test_phase = MEASURE;
205 /* Ensure all threads have measured some activity, then a bit more. */
206 pthread_mutex_lock(&barrier_mutex);
207 while (barrier_val > 0) {
208 pthread_cond_wait(&barrier_cond, &barrier_mutex);
210 pthread_mutex_unlock(&barrier_mutex);
211 printf("Data in from all threads...\n");
214 printf("Terminating...\n");
// Workers test this value in their loop condition and exit.
215 test_phase = TERMINATE;
// Loop body not visible in this excerpt; presumably wait_thread(i) — confirm.
217 for (i = 0; i < thread_count; i++)
// Totals across all workers.
221 double sum_x = 0.0, sum_x2 = 0.0, sum_f = 0.0;
222 double bandwidth = 0.0;
223 for (i = 0; i < thread_count; i++) {
225 sum_x += threads[i].sum_x;
226 sum_x2 += threads[i].sum_x2;
227 sum_f += threads[i].sum_f;
// Per-thread throughput (bytes over accumulated request time), summed over
// threads.
// NOTE(review): divides by threads[i].sum_x, which is 0.0 for a worker that
// recorded no samples, giving inf or NaN.
228 bandwidth += threads[i].bytes_sent / threads[i].sum_x;
// Wall-clock duration of the whole experiment, in seconds.
231 double elapsed = (get_ns() - start_time) / 1e9;
232 printf("*** %d threads, %d byte objects, %d byte ranges\n",
233 experiment_threads, experiment_size, range_size);
234 printf("Elapsed: %f s\n", elapsed);
// n is declared and accumulated on lines not visible in this excerpt;
// presumably the total sample count across threads — confirm.
235 printf("Data points: %d\n", n);
// Mean request time.
236 double mx = sum_x / n;
// Sample standard deviation: the numerator is the expansion of
// sum((x - mx)^2) in terms of the running sums.
// NOTE(review): divides by n - 1 (and by n above), so n <= 1 yields a division
// by zero or NaN.
237 double sx = sqrt((sum_x2 - 2*sum_x*mx + n*mx*mx) / (n - 1));
238 printf("Time: %f ± %f s\n", mx, sx);
239 printf("Latency to first byte: %f\n", sum_f / n);
240 printf("Bandwidth: %f B/s\n", bandwidth);
// Write one tab-separated record to the stats file opened in main(); the
// remaining arguments are on lines not visible in this excerpt.
242 fprintf(statsfile, "%d\t%d\t%f\t%d\t%f\t%f\t%f\n",
243 experiment_threads, experiment_size, elapsed, n,
246 printf("Finished.\n");
// Program entry point: open the stats file, initialize libs3 and the bucket
// description, parse the experiment parameters and run one experiment.
249 int main(int argc, char *argv[])
// Opened in append mode; launch_test() writes one record per run to it.
251 statsfile = fopen("readbench.data", "a");
252 if (statsfile == NULL) {
// What happens after this message is not visible in this excerpt.
253 perror("open stats file");
257 S3_initialize(NULL, S3_INIT_ALL);
// Hard-coded bucket, plain HTTP, virtual-host style URIs; credentials come
// from the environment.
259 bucket.bucketName = "mvrable-benchmark";
260 bucket.protocol = S3ProtocolHTTP;
261 bucket.uriStyle = S3UriStyleVirtualHost;
// NOTE(review): getenv() returns NULL when the variable is unset; no check is
// visible in this excerpt.
262 bucket.accessKeyId = getenv("AWS_ACCESS_KEY_ID");
263 bucket.secretAccessKey = getenv("AWS_SECRET_ACCESS_KEY");
// Barrier primitives used by launch_test() and the workers.
265 pthread_mutex_init(&barrier_mutex, NULL);
266 pthread_cond_init(&barrier_cond, NULL);
// The condition that triggers this usage message is not visible in this
// excerpt.
269 fprintf(stderr, "Usage: %s <threads> <size> <object-count>\n", argv[0]);
// NOTE(review): atoi() cannot report malformed or out-of-range input; strtol
// with end-pointer and errno checks would.
273 experiment_threads = atoi(argv[1]);
274 experiment_size = atoi(argv[2]);
275 experiment_objects = atoi(argv[3]);
// NOTE(review): argv[4] is not mentioned in the usage text above; presumably
// this assignment is guarded by an argc check on a line not visible here —
// confirm.
277 range_size = atoi(argv[4]);
// benchmark_thread() computes random() % experiment_objects, so zero must be
// rejected.
279 assert(experiment_objects > 0);
280 launch_test(experiment_threads);