1 /* Simple benchmark for Amazon S3: measures download speeds for
2 * differently-sized objects and with a variable number of parallel
19 S3BucketContext bucket;
26 // Time (in nanoseconds, as returned by get_ns()) when the first bytes of the response were received; 0 until then
27 long long first_byte_timestamp;
29 // Statistics for computing mean and standard deviation
37 struct callback_state {
38 struct thread_state *ts;
39 size_t bytes_remaining;
42 #define MAX_THREADS 1024
43 struct thread_state threads[MAX_THREADS];
45 int experiment_threads, experiment_size, experiment_objects;
47 pthread_mutex_t barrier_mutex;
48 pthread_cond_t barrier_cond;
51 pthread_mutex_t wait_mutex;
52 pthread_cond_t wait_cond;
55 enum phase { LAUNCH, MEASURE, TERMINATE };
56 volatile enum phase test_phase;
60 pthread_mutex_lock(&barrier_mutex);
62 printf("Barrier: %d left\n", barrier_val);
64 pthread_cond_signal(&barrier_cond);
65 pthread_mutex_unlock(&barrier_mutex);
71 clock_gettime(CLOCK_MONOTONIC, &ts);
73 return ts.tv_sec * 1000000000LL + ts.tv_nsec;
76 static S3Status data_callback(int bufferSize, const char *buffer,
79 struct callback_state *state = (struct callback_state *)callbackData;
80 state->bytes_remaining -= bufferSize;
81 if (state->ts->first_byte_timestamp == 0)
82 state->ts->first_byte_timestamp = get_ns();
86 static S3Status properties_callback(const S3ResponseProperties *properties,
92 static void complete_callback(S3Status status,
93 const S3ErrorDetails *errorDetails,
98 static void do_get(const char *key, size_t bytes, struct thread_state *ts)
100 struct callback_state state;
101 struct S3GetObjectHandler handler;
103 state.bytes_remaining = bytes;
105 handler.responseHandler.propertiesCallback = properties_callback;
106 handler.responseHandler.completeCallback = complete_callback;
107 handler.getObjectDataCallback = data_callback;
109 S3_get_object(&bucket, key, NULL, 0, 0, NULL, &handler, &state);
112 void *benchmark_thread(void *arg)
114 struct thread_state *ts = (struct thread_state *)arg;
117 printf("Warming up...\n");
118 do_get("file-1048576-0", 0, ts);
122 pthread_mutex_lock(&wait_mutex);
123 while (wait_val == 0)
124 pthread_cond_wait(&wait_cond, &wait_mutex);
125 pthread_mutex_unlock(&wait_mutex);
127 ts->timestamp = get_ns();
128 sprintf(namebuf, "file-%d-%d", experiment_size, ts->thread_num);
129 ts->first_byte_timestamp = 0;
130 do_get(namebuf, experiment_size, ts);
131 long long timestamp = get_ns();
132 long long elapsed = timestamp - ts->timestamp;
136 printf("Thread %d: %f elapsed\n", ts->thread_num, elapsed / 1e9);
141 void launch_thread(int n)
143 threads[n].thread_num = n;
144 if (pthread_create(&threads[n].thread, NULL, benchmark_thread, &threads[n]) != 0) {
145 fprintf(stderr, "Error launching thread!\n");
150 void wait_thread(int n)
153 pthread_join(threads[n].thread, &result);
156 void launch_test(int thread_count)
160 barrier_val = thread_count;
161 assert(thread_count <= MAX_THREADS);
163 for (i = 0; i < thread_count; i++)
166 /* Wait until all threads are ready. */
167 pthread_mutex_lock(&barrier_mutex);
168 while (barrier_val > 0) {
169 pthread_cond_wait(&barrier_cond, &barrier_mutex);
171 pthread_mutex_unlock(&barrier_mutex);
174 barrier_val = thread_count;
175 pthread_mutex_lock(&wait_mutex);
176 printf("Launching test\n");
177 long long start_time = get_ns();
179 pthread_cond_broadcast(&wait_cond);
180 pthread_mutex_unlock(&wait_mutex);
182 /* Wait until all threads have finished the test. */
183 pthread_mutex_lock(&barrier_mutex);
184 while (barrier_val > 0) {
185 pthread_cond_wait(&barrier_cond, &barrier_mutex);
187 pthread_mutex_unlock(&barrier_mutex);
189 long long end_time = get_ns();
191 printf("Elapsed time: %f\n", (end_time - start_time) / 1e9);
192 fprintf(statsfile, "%d\t%d\t%f\n", experiment_threads, experiment_size,
193 (end_time - start_time) / 1e9);
196 int main(int argc, char *argv[])
198 statsfile = fopen("readlatency.data", "a");
199 if (statsfile == NULL) {
200 perror("open stats file");
204 S3_initialize(NULL, S3_INIT_ALL);
206 bucket.bucketName = "mvrable-benchmark";
207 bucket.protocol = S3ProtocolHTTP;
208 bucket.uriStyle = S3UriStyleVirtualHost;
209 bucket.accessKeyId = getenv("AWS_ACCESS_KEY_ID");
210 bucket.secretAccessKey = getenv("AWS_SECRET_ACCESS_KEY");
212 pthread_mutex_init(&barrier_mutex, NULL);
213 pthread_cond_init(&barrier_cond, NULL);
214 pthread_mutex_init(&wait_mutex, NULL);
215 pthread_cond_init(&wait_cond, NULL);
218 fprintf(stderr, "Usage: %s <threads> <size>\n", argv[0]);
222 experiment_threads = atoi(argv[1]);
223 experiment_size = atoi(argv[2]);
224 launch_test(experiment_threads);