Setup for the synthetic read benchmark
[bluesky.git] / microbench / mixedbench.c
1 /* A simple file system workload generator.
2  *
3  * Reads and writes a number of files in the current working directory.
4  *
5  * Command-line arguments:
6  *   File size (bytes)
7  *   File count
8  *   Write fraction (0.0 - 1.0)
9  *   Threads
10  *   Benchmark duration (seconds)
11  *   Target operations per second (aggregate across all threads)
12  *   Interval count (how many times to report results during the run)
13  *   Directory size (number of files per numbered subdirectory)
14  *   Read/write block size (0 for the entire file)
15  */
16
17 #include <errno.h>
18 #include <inttypes.h>
19 #include <pthread.h>
20 #include <stdint.h>
21 #include <stdio.h>
22 #include <stdlib.h>
23 #include <string.h>
24 #include <sys/stat.h>
25 #include <sys/types.h>
26 #include <time.h>
27 #include <unistd.h>
28 #include <math.h>
29
30 int opt_filesize, opt_filecount, opt_threads, opt_duration, opt_intervals, opt_dirsize, opt_blocksize;
31 double opt_writeratio, opt_ops;
32
33 int write_threads;
34
35 struct thread_state {
36     pthread_t thread;
37     pthread_mutex_t lock;
38     int thread_num;
39     int read_count, write_count;
40     double read_time, write_time, read_time2, write_time2;
41 };
42
43 static int64_t start_time;
44
45 #define MAX_THREADS 128
46 struct thread_state threads[MAX_THREADS];
47
48 static double sq(double x)
49 {
50     return x * x;
51 }
52
53 static double stddev(double x, double x2, int n)
54 {
55     if (n < 2)
56         return 0;
57     return sqrt((x2 / n - sq(x / n)) * n / (n - 1));
58 }
59
60 int64_t now_hires()
61 {
62     struct timespec time;
63
64     if (clock_gettime(CLOCK_REALTIME, &time) != 0) {
65         perror("clock_gettime");
66         return 0;
67     }
68
69     return (int64_t)(time.tv_sec) * 1000000000 + time.tv_nsec;
70 }
71
72 int get_random(int range)
73 {
74     return random() % range;
75 }
76
77 void sleep_micros(int duration)
78 {
79     struct timespec req;
80     if (duration <= 0)
81         return;
82
83     req.tv_sec = duration / 1000000;
84     req.tv_nsec = (duration % 1000000) * 1000;
85
86     while (nanosleep(&req, &req) < 0 && errno == EINTR)
87         ;
88 }
89
90 void benchmark_op(struct thread_state *ts)
91 {
92     int64_t start, end;
93
94     start = now_hires();
95
96     /* The space of all files is partitioned evenly based on the number of
97      * threads.  Pick a file out of our particular partition. */
98     int thread_num, thread_count;
99     if (ts->thread_num >= write_threads) {
100         /* Read */
101         thread_num = ts->thread_num - write_threads;
102         thread_count = opt_threads - write_threads;
103     } else {
104         /* Write */
105         thread_num = ts->thread_num;
106         thread_count = write_threads;
107     }
108
109     int n = get_random(opt_filecount / thread_count);
110     n += thread_num * (opt_filecount / thread_count);
111     int n1 = n / opt_dirsize, n2 = n % opt_dirsize;
112     char filename[256];
113     sprintf(filename, "%d/%d", n1, n2);
114
115     /* If a smaller blocksize was requested, choose a random offset within the
116      * file to use. */
117     int offset = 0;
118     if (opt_blocksize > 0) {
119         offset = get_random(opt_filesize / opt_blocksize) * opt_blocksize;
120     }
121
122     if (ts->thread_num >= write_threads) {
123         /* Read */
124         FILE *f = fopen(filename, "rb");
125         if (f == NULL) {
126             fprintf(stderr, "fopen(%s): %m\n", filename);
127             return;
128         }
129
130         if (offset != 0)
131             fseek(f, offset, SEEK_SET);
132
133         char buf[65536];
134         int bytes_left = opt_blocksize > 0 ? opt_blocksize : opt_filesize;
135         while (bytes_left > 0) {
136             size_t read = fread(buf, 1, bytes_left < sizeof(buf)
137                                          ? bytes_left : sizeof(buf), f);
138             if (ferror(f))
139                 return;
140             bytes_left -= read;
141         }
142         fclose(f);
143
144         end = now_hires();
145         pthread_mutex_lock(&ts->lock);
146         ts->read_count++;
147         ts->read_time += (end - start) / 1e9;
148         ts->read_time2 += sq((end - start) / 1e9);
149         pthread_mutex_unlock(&ts->lock);
150     } else {
151         /* Write */
152         FILE *f = fopen(filename, "r+b");
153         if (f == NULL) {
154             fprintf(stderr, "fopen(%s): %m\n", filename);
155             return;
156         }
157
158         if (offset != 0)
159             fseek(f, offset, SEEK_SET);
160
161         char buf[65536];
162         int bytes_left = opt_blocksize > 0 ? opt_blocksize : opt_filesize;
163         while (bytes_left > 0) {
164             size_t written = fwrite(buf, 1,
165                                     bytes_left < sizeof(buf)
166                                      ? bytes_left : sizeof(buf),
167                                     f);
168             if (ferror(f))
169                 return;
170             bytes_left -= written;
171         }
172         fclose(f);
173
174         end = now_hires();
175         pthread_mutex_lock(&ts->lock);
176         ts->write_count++;
177         ts->write_time += (end - start) / 1e9;
178         ts->write_time2 += sq((end - start) / 1e9);
179         pthread_mutex_unlock(&ts->lock);
180     }
181 }
182
183 void *benchmark_thread(void *arg)
184 {
185     struct thread_state *ts = (struct thread_state *)arg;
186
187     int target_delay = (opt_threads / opt_ops) * 1e6;
188
189     while (1) {
190         int64_t start = now_hires();
191         benchmark_op(ts);
192         int64_t end = now_hires();
193
194         int elapsed = (end - start) / 1000;
195         if (elapsed < target_delay)
196             sleep_micros(target_delay - elapsed);
197     }
198
199     return NULL;
200 }
201
202 void launch_thread(int i)
203 {
204     memset(&threads[i], 0, sizeof(struct thread_state));
205     threads[i].thread_num = i;
206     pthread_mutex_init(&threads[i].lock, NULL);
207     if (pthread_create(&threads[i].thread, NULL, benchmark_thread, &threads[i]) != 0) {
208         fprintf(stderr, "Error launching thread!\n");
209         exit(1);
210     }
211 }
212
213 void wait_thread(int n)
214 {
215     void *result;
216     pthread_join(threads[n].thread, &result);
217 }
218
219 void reset_stats(int print, double duration)
220 {
221     int read_count = 0, write_count = 0;
222     double read_time = 0, write_time = 0, read_time2 = 0, write_time2 = 0;
223
224     for (int i = 0; i < opt_threads; i++) {
225         pthread_mutex_lock(&threads[i].lock);
226         read_count += threads[i].read_count;
227         write_count += threads[i].write_count;
228         read_time += threads[i].read_time;
229         write_time += threads[i].write_time;
230         read_time2 += threads[i].read_time2;
231         write_time2 += threads[i].write_time2;
232         threads[i].read_count = threads[i].write_count = 0;
233         threads[i].read_time = threads[i].write_time = 0;
234         threads[i].read_time2 = threads[i].write_time2 = 0;
235         pthread_mutex_unlock(&threads[i].lock);
236     }
237
238     if (print) {
239         printf("read: [%g, %f, %f]\n",
240                read_count / duration, read_time / read_count,
241                stddev(read_time, read_time2, read_count));
242         printf("write: [%g, %f, %f]\n",
243                write_count / duration, write_time / write_count,
244                stddev(write_time, write_time2, write_count));
245         printf("\n");
246         fflush(stdout);
247     }
248 }
249
250 int main(int argc, char *argv[])
251 {
252     if (argc != 10) {
253         fprintf(stderr, "Usage: TODO\n");
254         return 1;
255     }
256
257     opt_filesize = atoi(argv[1]);
258     opt_filecount = atoi(argv[2]);
259     opt_writeratio = atof(argv[3]);
260     opt_threads = atoi(argv[4]);
261     opt_duration = atoi(argv[5]);
262     opt_ops = atof(argv[6]);
263     opt_intervals = atoi(argv[7]);
264     opt_dirsize = atoi(argv[8]);
265     opt_blocksize = atoi(argv[9]);
266
267     srandom(time(NULL));
268
269     start_time = now_hires();
270
271     /* Partition threads into those that should do reads and those for writes,
272      * as close as possible to the desired allocation. */
273     write_threads = (int)round(opt_threads * opt_writeratio);
274     fprintf(stderr, "Using %d threads for reads, %d for writes\n",
275             opt_threads - write_threads, write_threads);
276
277     for (int i = 0; i < opt_threads; i++) {
278         launch_thread(i);
279     }
280
281     for (int i = 0; i < opt_intervals; i++) {
282         sleep_micros(opt_duration * 1000000 / opt_intervals);
283         reset_stats(1, (double)opt_duration / opt_intervals);
284     }
285
286     return 0;
287 }