Add proper per-file copyright notices/licenses and top-level license.
[bluesky.git] / cloudbench / readlatency.c
1 /* Blue Sky: File Systems in the Cloud
2  *
3  * Copyright (C) 2010  The Regents of the University of California
4  * Written by Michael Vrable <mvrable@cs.ucsd.edu>
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions
8  * are met:
9  * 1. Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  * 2. Redistributions in binary form must reproduce the above copyright
12  *    notice, this list of conditions and the following disclaimer in the
13  *    documentation and/or other materials provided with the distribution.
14  * 3. Neither the name of the University nor the names of its contributors
15  *    may be used to endorse or promote products derived from this software
16  *    without specific prior written permission.
17  *
18  * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
19  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21  * ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
22  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
24  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
25  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
27  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
28  * SUCH DAMAGE.
29  */
30
31 /* Simple benchmark for Amazon S3: measures download speeds for
32  * differently-sized objects and with a variable number of parallel
33  * connections. */
34
35 #include <assert.h>
36 #include <stdio.h>
37 #include <stdlib.h>
38 #include <stdint.h>
39 #include <string.h>
40 #include <pthread.h>
41 #include <time.h>
42 #include <unistd.h>
43 #include <math.h>
44
45 #include "libs3.h"
46
47 FILE *statsfile;
48
49 S3BucketContext bucket;
50
51 struct thread_state {
52     pthread_t thread;
53     int thread_num;
54     long long timestamp;
55
56     // Time when first bytes of the response were received
57     long long first_byte_timestamp;
58
59     // Statistics for computing mean and standard deviation
60     int n;
61     size_t bytes_sent;
62     double sum_x, sum_x2;
63
64     double sum_f;
65 };
66
67 struct callback_state {
68     struct thread_state *ts;
69     size_t bytes_remaining;
70 };
71
72 #define MAX_THREADS 1024
73 struct thread_state threads[MAX_THREADS];
74
75 int experiment_threads, experiment_size, experiment_objects;
76
77 pthread_mutex_t barrier_mutex;
78 pthread_cond_t barrier_cond;
79 int barrier_val;
80
81 pthread_mutex_t wait_mutex;
82 pthread_cond_t wait_cond;
83 int wait_val = 0;
84
85 enum phase { LAUNCH, MEASURE, TERMINATE };
86 volatile enum phase test_phase;
87
88 void barrier_signal()
89 {
90     pthread_mutex_lock(&barrier_mutex);
91     barrier_val--;
92     printf("Barrier: %d left\n", barrier_val);
93     if (barrier_val == 0)
94         pthread_cond_signal(&barrier_cond);
95     pthread_mutex_unlock(&barrier_mutex);
96 }
97
98 long long get_ns()
99 {
100     struct timespec ts;
101     clock_gettime(CLOCK_MONOTONIC, &ts);
102
103     return ts.tv_sec * 1000000000LL + ts.tv_nsec;
104 }
105
106 static S3Status data_callback(int bufferSize, const char *buffer,
107                               void *callbackData)
108 {
109     struct callback_state *state = (struct callback_state *)callbackData;
110     state->bytes_remaining -= bufferSize;
111     if (state->ts->first_byte_timestamp == 0)
112         state->ts->first_byte_timestamp = get_ns();
113     return S3StatusOK;
114 }
115
116 static S3Status properties_callback(const S3ResponseProperties *properties,
117                                      void *callbackData)
118 {
119     return S3StatusOK;
120 }
121
122 static void complete_callback(S3Status status,
123                               const S3ErrorDetails *errorDetails,
124                               void *callbackData)
125 {
126 }
127
128 static void do_get(const char *key, size_t bytes, struct thread_state *ts)
129 {
130     struct callback_state state;
131     struct S3GetObjectHandler handler;
132
133     state.bytes_remaining = bytes;
134     state.ts = ts;
135     handler.responseHandler.propertiesCallback = properties_callback;
136     handler.responseHandler.completeCallback = complete_callback;
137     handler.getObjectDataCallback = data_callback;
138
139     S3_get_object(&bucket, key, NULL, 0, 0, NULL, &handler, &state);
140 }
141
142 void *benchmark_thread(void *arg)
143 {
144     struct thread_state *ts = (struct thread_state *)arg;
145     char namebuf[64];
146
147     printf("Warming up...\n");
148     do_get("file-1048576-0", 0, ts);
149     printf("Ready.\n");
150     barrier_signal();
151
152     pthread_mutex_lock(&wait_mutex);
153     while (wait_val == 0)
154         pthread_cond_wait(&wait_cond, &wait_mutex);
155     pthread_mutex_unlock(&wait_mutex);
156
157     ts->timestamp = get_ns();
158     sprintf(namebuf, "file-%d-%d", experiment_size, ts->thread_num);
159     ts->first_byte_timestamp = 0;
160     do_get(namebuf, experiment_size, ts);
161     long long timestamp = get_ns();
162     long long elapsed = timestamp - ts->timestamp;
163
164     barrier_signal();
165
166     printf("Thread %d: %f elapsed\n", ts->thread_num, elapsed / 1e9);
167
168     return NULL;
169 }
170
171 void launch_thread(int n)
172 {
173     threads[n].thread_num = n;
174     if (pthread_create(&threads[n].thread, NULL, benchmark_thread, &threads[n]) != 0) {
175         fprintf(stderr, "Error launching thread!\n");
176         exit(1);
177     }
178 }
179
180 void wait_thread(int n)
181 {
182     void *result;
183     pthread_join(threads[n].thread, &result);
184 }
185
186 void launch_test(int thread_count)
187 {
188     int i;
189
190     barrier_val = thread_count;
191     assert(thread_count <= MAX_THREADS);
192
193     for (i = 0; i < thread_count; i++)
194         launch_thread(i);
195
196     /* Wait until all threads are ready. */
197     pthread_mutex_lock(&barrier_mutex);
198     while (barrier_val > 0) {
199         pthread_cond_wait(&barrier_cond, &barrier_mutex);
200     }
201     pthread_mutex_unlock(&barrier_mutex);
202
203     sleep(2);
204     barrier_val = thread_count;
205     pthread_mutex_lock(&wait_mutex);
206     printf("Launching test\n");
207     long long start_time = get_ns();
208     wait_val = 1;
209     pthread_cond_broadcast(&wait_cond);
210     pthread_mutex_unlock(&wait_mutex);
211
212     /* Wait until all threads are ready. */
213     pthread_mutex_lock(&barrier_mutex);
214     while (barrier_val > 0) {
215         pthread_cond_wait(&barrier_cond, &barrier_mutex);
216     }
217     pthread_mutex_unlock(&barrier_mutex);
218
219     long long end_time = get_ns();
220
221     printf("Elapsed time: %f\n", (end_time - start_time) / 1e9);
222     fprintf(statsfile, "%d\t%d\t%f\n", experiment_threads, experiment_size,
223             (end_time - start_time) / 1e9);
224 }
225
226 int main(int argc, char *argv[])
227 {
228     statsfile = fopen("readlatency.data", "a");
229     if (statsfile == NULL) {
230         perror("open stats file");
231         return 1;
232     }
233
234     S3_initialize(NULL, S3_INIT_ALL, NULL);
235
236     bucket.bucketName = "mvrable-benchmark";
237     bucket.protocol = S3ProtocolHTTP;
238     bucket.uriStyle = S3UriStyleVirtualHost;
239     bucket.accessKeyId = getenv("AWS_ACCESS_KEY_ID");
240     bucket.secretAccessKey = getenv("AWS_SECRET_ACCESS_KEY");
241
242     pthread_mutex_init(&barrier_mutex, NULL);
243     pthread_cond_init(&barrier_cond, NULL);
244     pthread_mutex_init(&wait_mutex, NULL);
245     pthread_cond_init(&wait_cond, NULL);
246
247     if (argc < 3) {
248         fprintf(stderr, "Usage: %s <threads> <size>\n", argv[0]);
249         return 1;
250     }
251
252     experiment_threads = atoi(argv[1]);
253     experiment_size = atoi(argv[2]);
254     launch_test(experiment_threads);
255
256     printf("Done.\n");
257     fclose(statsfile);
258
259     return 0;
260 }