More reworking of microbenchmark handling of reads/writes and file layout.
[bluesky.git] / microbench / mixedbench.c
1 /* A simple file system workload generator.
2  *
3  * Reads and writes a number of files in the current working directory.
4  *
5  * Command-line arguments:
6  *   File size (bytes)
7  *   File count
8  *   Write fraction (0.0 - 1.0)
9  *   Threads
10  *   Benchmark duration (seconds)
11  *   Target operations per second (aggregate across all threads)
12  *   Interval count (how many times to report results during the run)
13  *   Directory size (number of files per numbered subdirectory)
14  */
15
16 #include <errno.h>
17 #include <inttypes.h>
18 #include <pthread.h>
19 #include <stdint.h>
20 #include <stdio.h>
21 #include <stdlib.h>
22 #include <string.h>
23 #include <sys/stat.h>
24 #include <sys/types.h>
25 #include <time.h>
26 #include <unistd.h>
27 #include <math.h>
28
29 int opt_filesize, opt_filecount, opt_threads, opt_duration, opt_intervals, opt_dirsize;
30 double opt_writeratio, opt_ops;
31
32 int write_threads;
33
34 struct thread_state {
35     pthread_t thread;
36     pthread_mutex_t lock;
37     int thread_num;
38     int read_count, write_count;
39     double read_time, write_time, read_time2, write_time2;
40 };
41
42 static int64_t start_time;
43
44 #define MAX_THREADS 128
45 struct thread_state threads[MAX_THREADS];
46
47 static double sq(double x)
48 {
49     return x * x;
50 }
51
52 static double stddev(double x, double x2, int n)
53 {
54     if (n < 2)
55         return 0;
56     return sqrt((x2 / n - sq(x / n)) * n / (n - 1));
57 }
58
59 int64_t now_hires()
60 {
61     struct timespec time;
62
63     if (clock_gettime(CLOCK_REALTIME, &time) != 0) {
64         perror("clock_gettime");
65         return 0;
66     }
67
68     return (int64_t)(time.tv_sec) * 1000000000 + time.tv_nsec;
69 }
70
71 int get_random(int range)
72 {
73     return random() % range;
74 }
75
76 void sleep_micros(int duration)
77 {
78     struct timespec req;
79     if (duration <= 0)
80         return;
81
82     req.tv_sec = duration / 1000000;
83     req.tv_nsec = (duration % 1000000) * 1000;
84
85     while (nanosleep(&req, &req) < 0 && errno == EINTR)
86         ;
87 }
88
89 void benchmark_op(struct thread_state *ts)
90 {
91     int64_t start, end;
92
93     start = now_hires();
94
95     /* The space of all files is partitioned evenly based on the number of
96      * threads.  Pick a file out of our particular partition. */
97     int thread_num, thread_count;
98     if (ts->thread_num >= write_threads) {
99         /* Read */
100         thread_num = ts->thread_num - write_threads;
101         thread_count = opt_threads - write_threads;
102     } else {
103         /* Write */
104         thread_num = ts->thread_num;
105         thread_count = write_threads;
106     }
107
108     int n = get_random(opt_filecount / thread_count);
109     n += thread_num * (opt_filecount / thread_count);
110     int n1 = n / opt_dirsize, n2 = n % opt_dirsize;
111     char filename[256];
112     sprintf(filename, "%d/%d", n1, n2);
113
114     if (ts->thread_num >= write_threads) {
115         /* Read */
116         FILE *f = fopen(filename, "rb");
117         if (f == NULL) {
118             fprintf(stderr, "fopen(%s): %m\n", filename);
119             return;
120         }
121
122         char buf[65536];
123         while (fread(buf, 1, sizeof(buf), f) > 0) { }
124         fclose(f);
125
126         end = now_hires();
127         pthread_mutex_lock(&ts->lock);
128         ts->read_count++;
129         ts->read_time += (end - start) / 1e9;
130         ts->read_time2 += sq((end - start) / 1e9);
131         pthread_mutex_unlock(&ts->lock);
132     } else {
133         /* Write */
134         FILE *f = fopen(filename, "wb");
135         if (f == NULL) {
136             fprintf(stderr, "fopen(%s): %m\n", filename);
137             return;
138         }
139
140         char buf[65536];
141         int bytes_left = opt_filesize;
142         while (bytes_left > 0) {
143             size_t written = fwrite(buf, 1,
144                                     bytes_left < sizeof(buf)
145                                      ? bytes_left : sizeof(buf),
146                                     f);
147             if (ferror(f))
148                 return;
149             bytes_left -= written;
150         }
151         fclose(f);
152
153         end = now_hires();
154         pthread_mutex_lock(&ts->lock);
155         ts->write_count++;
156         ts->write_time += (end - start) / 1e9;
157         ts->write_time2 += sq((end - start) / 1e9);
158         pthread_mutex_unlock(&ts->lock);
159     }
160 }
161
162 void *benchmark_thread(void *arg)
163 {
164     struct thread_state *ts = (struct thread_state *)arg;
165
166     int target_delay = (opt_threads / opt_ops) * 1e6;
167
168     while (1) {
169         int64_t start = now_hires();
170         benchmark_op(ts);
171         int64_t end = now_hires();
172
173         int elapsed = (end - start) / 1000;
174         if (elapsed < target_delay)
175             sleep_micros(target_delay - elapsed);
176     }
177
178     return NULL;
179 }
180
181 void launch_thread(int i)
182 {
183     memset(&threads[i], 0, sizeof(struct thread_state));
184     threads[i].thread_num = i;
185     pthread_mutex_init(&threads[i].lock, NULL);
186     if (pthread_create(&threads[i].thread, NULL, benchmark_thread, &threads[i]) != 0) {
187         fprintf(stderr, "Error launching thread!\n");
188         exit(1);
189     }
190 }
191
192 void wait_thread(int n)
193 {
194     void *result;
195     pthread_join(threads[n].thread, &result);
196 }
197
198 void reset_stats(int print, double duration)
199 {
200     int read_count = 0, write_count = 0;
201     double read_time = 0, write_time = 0, read_time2 = 0, write_time2 = 0;
202
203     for (int i = 0; i < opt_threads; i++) {
204         pthread_mutex_lock(&threads[i].lock);
205         read_count += threads[i].read_count;
206         write_count += threads[i].write_count;
207         read_time += threads[i].read_time;
208         write_time += threads[i].write_time;
209         read_time2 += threads[i].read_time2;
210         write_time2 += threads[i].write_time2;
211         threads[i].read_count = threads[i].write_count = 0;
212         threads[i].read_time = threads[i].write_time = 0;
213         threads[i].read_time2 = threads[i].write_time2 = 0;
214         pthread_mutex_unlock(&threads[i].lock);
215     }
216
217     if (print) {
218         printf("read: [%g, %f, %f]\n",
219                read_count / duration, read_time / read_count,
220                stddev(read_time, read_time2, read_count));
221         printf("write: [%g, %f, %f]\n",
222                write_count / duration, write_time / write_count,
223                stddev(write_time, write_time2, write_count));
224         printf("\n");
225         fflush(stdout);
226     }
227 }
228
229 int main(int argc, char *argv[])
230 {
231     if (argc != 9) {
232         fprintf(stderr, "Usage: TODO\n");
233         return 1;
234     }
235
236     opt_filesize = atoi(argv[1]);
237     opt_filecount = atoi(argv[2]);
238     opt_writeratio = atof(argv[3]);
239     opt_threads = atoi(argv[4]);
240     opt_duration = atoi(argv[5]);
241     opt_ops = atof(argv[6]);
242     opt_intervals = atoi(argv[7]);
243     opt_dirsize = atoi(argv[8]);
244
245     srandom(time(NULL));
246
247     start_time = now_hires();
248
249     /* Partition threads into those that should do reads and those for writes,
250      * as close as possible to the desired allocation. */
251     write_threads = (int)round(opt_threads * opt_writeratio);
252     fprintf(stderr, "Using %d threads for reads, %d for writes\n",
253             opt_threads - write_threads, write_threads);
254
255     for (int i = 0; i < opt_threads; i++) {
256         launch_thread(i);
257     }
258
259     for (int i = 0; i < opt_intervals; i++) {
260         sleep_micros(opt_duration * 1000000 / opt_intervals);
261         reset_stats(1, (double)opt_duration / opt_intervals);
262     }
263
264     return 0;
265 }