05fbaa71b6dc77c8c0eb46c919b2c0416016d7df
[bluesky.git] / cloudbench / readlatency.c
1 /* Simple benchmark for Amazon S3: measures download speeds for
2  * differently-sized objects and with a variable number of parallel
3  * connections. */
4
5 #include <assert.h>
6 #include <stdio.h>
7 #include <stdlib.h>
8 #include <stdint.h>
9 #include <string.h>
10 #include <pthread.h>
11 #include <time.h>
12 #include <unistd.h>
13 #include <math.h>
14
15 #include "libs3.h"
16
17 FILE *statsfile;
18
19 S3BucketContext bucket;
20
21 struct thread_state {
22     pthread_t thread;
23     int thread_num;
24     long long timestamp;
25
26     // Time when first bytes of the response were received
27     long long first_byte_timestamp;
28
29     // Statistics for computing mean and standard deviation
30     int n;
31     size_t bytes_sent;
32     double sum_x, sum_x2;
33
34     double sum_f;
35 };
36
37 struct callback_state {
38     struct thread_state *ts;
39     size_t bytes_remaining;
40 };
41
42 #define MAX_THREADS 1024
43 struct thread_state threads[MAX_THREADS];
44
45 int experiment_threads, experiment_size, experiment_objects;
46
47 pthread_mutex_t barrier_mutex;
48 pthread_cond_t barrier_cond;
49 int barrier_val;
50
51 pthread_mutex_t wait_mutex;
52 pthread_cond_t wait_cond;
53 int wait_val = 0;
54
55 enum phase { LAUNCH, MEASURE, TERMINATE };
56 volatile enum phase test_phase;
57
58 void barrier_signal()
59 {
60     pthread_mutex_lock(&barrier_mutex);
61     barrier_val--;
62     printf("Barrier: %d left\n", barrier_val);
63     if (barrier_val == 0)
64         pthread_cond_signal(&barrier_cond);
65     pthread_mutex_unlock(&barrier_mutex);
66 }
67
68 long long get_ns()
69 {
70     struct timespec ts;
71     clock_gettime(CLOCK_MONOTONIC, &ts);
72
73     return ts.tv_sec * 1000000000LL + ts.tv_nsec;
74 }
75
76 static S3Status data_callback(int bufferSize, const char *buffer,
77                               void *callbackData)
78 {
79     struct callback_state *state = (struct callback_state *)callbackData;
80     state->bytes_remaining -= bufferSize;
81     if (state->ts->first_byte_timestamp == 0)
82         state->ts->first_byte_timestamp = get_ns();
83     return S3StatusOK;
84 }
85
86 static S3Status properties_callback(const S3ResponseProperties *properties,
87                                      void *callbackData)
88 {
89     return S3StatusOK;
90 }
91
92 static void complete_callback(S3Status status,
93                               const S3ErrorDetails *errorDetails,
94                               void *callbackData)
95 {
96 }
97
98 static void do_get(const char *key, size_t bytes, struct thread_state *ts)
99 {
100     struct callback_state state;
101     struct S3GetObjectHandler handler;
102
103     state.bytes_remaining = bytes;
104     state.ts = ts;
105     handler.responseHandler.propertiesCallback = properties_callback;
106     handler.responseHandler.completeCallback = complete_callback;
107     handler.getObjectDataCallback = data_callback;
108
109     S3_get_object(&bucket, key, NULL, 0, 0, NULL, &handler, &state);
110 }
111
112 void *benchmark_thread(void *arg)
113 {
114     struct thread_state *ts = (struct thread_state *)arg;
115     char namebuf[64];
116
117     printf("Warming up...\n");
118     do_get("file-1048576-0", 0, ts);
119     printf("Ready.\n");
120     barrier_signal();
121
122     pthread_mutex_lock(&wait_mutex);
123     while (wait_val == 0)
124         pthread_cond_wait(&wait_cond, &wait_mutex);
125     pthread_mutex_unlock(&wait_mutex);
126
127     ts->timestamp = get_ns();
128     sprintf(namebuf, "file-%d-%d", experiment_size, ts->thread_num);
129     ts->first_byte_timestamp = 0;
130     do_get(namebuf, experiment_size, ts);
131     long long timestamp = get_ns();
132     long long elapsed = timestamp - ts->timestamp;
133
134     barrier_signal();
135
136     printf("Thread %d: %f elapsed\n", ts->thread_num, elapsed / 1e9);
137
138     return NULL;
139 }
140
141 void launch_thread(int n)
142 {
143     threads[n].thread_num = n;
144     if (pthread_create(&threads[n].thread, NULL, benchmark_thread, &threads[n]) != 0) {
145         fprintf(stderr, "Error launching thread!\n");
146         exit(1);
147     }
148 }
149
150 void wait_thread(int n)
151 {
152     void *result;
153     pthread_join(threads[n].thread, &result);
154 }
155
156 void launch_test(int thread_count)
157 {
158     int i;
159
160     barrier_val = thread_count;
161     assert(thread_count <= MAX_THREADS);
162
163     for (i = 0; i < thread_count; i++)
164         launch_thread(i);
165
166     /* Wait until all threads are ready. */
167     pthread_mutex_lock(&barrier_mutex);
168     while (barrier_val > 0) {
169         pthread_cond_wait(&barrier_cond, &barrier_mutex);
170     }
171     pthread_mutex_unlock(&barrier_mutex);
172
173     sleep(2);
174     barrier_val = thread_count;
175     pthread_mutex_lock(&wait_mutex);
176     printf("Launching test\n");
177     long long start_time = get_ns();
178     wait_val = 1;
179     pthread_cond_broadcast(&wait_cond);
180     pthread_mutex_unlock(&wait_mutex);
181
182     /* Wait until all threads are ready. */
183     pthread_mutex_lock(&barrier_mutex);
184     while (barrier_val > 0) {
185         pthread_cond_wait(&barrier_cond, &barrier_mutex);
186     }
187     pthread_mutex_unlock(&barrier_mutex);
188
189     long long end_time = get_ns();
190
191     printf("Elapsed time: %f\n", (end_time - start_time) / 1e9);
192     fprintf(statsfile, "%d\t%d\t%f\n", experiment_threads, experiment_size,
193             (end_time - start_time) / 1e9);
194 }
195
196 int main(int argc, char *argv[])
197 {
198     statsfile = fopen("readlatency.data", "a");
199     if (statsfile == NULL) {
200         perror("open stats file");
201         return 1;
202     }
203
204     S3_initialize(NULL, S3_INIT_ALL, NULL);
205
206     bucket.bucketName = "mvrable-benchmark";
207     bucket.protocol = S3ProtocolHTTP;
208     bucket.uriStyle = S3UriStyleVirtualHost;
209     bucket.accessKeyId = getenv("AWS_ACCESS_KEY_ID");
210     bucket.secretAccessKey = getenv("AWS_SECRET_ACCESS_KEY");
211
212     pthread_mutex_init(&barrier_mutex, NULL);
213     pthread_cond_init(&barrier_cond, NULL);
214     pthread_mutex_init(&wait_mutex, NULL);
215     pthread_cond_init(&wait_cond, NULL);
216
217     if (argc < 3) {
218         fprintf(stderr, "Usage: %s <threads> <size>\n", argv[0]);
219         return 1;
220     }
221
222     experiment_threads = atoi(argv[1]);
223     experiment_size = atoi(argv[2]);
224     launch_test(experiment_threads);
225
226     printf("Done.\n");
227     fclose(statsfile);
228
229     return 0;
230 }