Change how mixed read/write workloads are done in mixedbench
[bluesky.git] / microbench / mixedbench.c
1 /* A simple file system workload generator.
2  *
3  * Reads and writes a number of files in the current working directory.
4  *
5  * Command-line arguments:
6  *   File size (bytes)
7  *   File count
8  *   Write fraction (0.0 - 1.0)
9  *   Threads
10  *   Benchmark duration (seconds)
11  *   Target operations per second (aggregate across all threads)
12  *   Interval count (how many times to report results during the run)
13  *   Directory size (number of files per numbered subdirectory)
14  */
15
16 #include <errno.h>
17 #include <inttypes.h>
18 #include <pthread.h>
19 #include <stdint.h>
20 #include <stdio.h>
21 #include <stdlib.h>
22 #include <string.h>
23 #include <sys/stat.h>
24 #include <sys/types.h>
25 #include <time.h>
26 #include <unistd.h>
27 #include <math.h>
28
29 int opt_filesize, opt_filecount, opt_threads, opt_duration, opt_intervals, opt_dirsize;
30 double opt_writeratio, opt_ops;
31
32 int write_threads;
33
34 struct thread_state {
35     pthread_t thread;
36     pthread_mutex_t lock;
37     int thread_num;
38     int read_count, write_count;
39     double read_time, write_time, read_time2, write_time2;
40 };
41
42 static int64_t start_time;
43
44 #define MAX_THREADS 128
45 struct thread_state threads[MAX_THREADS];
46
47 static double sq(double x)
48 {
49     return x * x;
50 }
51
52 static double stddev(double x, double x2, int n)
53 {
54     if (n < 2)
55         return 0;
56     return sqrt((x2 / n - sq(x / n)) * n / (n - 1));
57 }
58
59 int64_t now_hires()
60 {
61     struct timespec time;
62
63     if (clock_gettime(CLOCK_REALTIME, &time) != 0) {
64         perror("clock_gettime");
65         return 0;
66     }
67
68     return (int64_t)(time.tv_sec) * 1000000000 + time.tv_nsec;
69 }
70
71 int get_random(int range)
72 {
73     return random() % range;
74 }
75
76 void sleep_micros(int duration)
77 {
78     struct timespec req;
79     if (duration <= 0)
80         return;
81
82     req.tv_sec = duration / 1000000;
83     req.tv_nsec = (duration % 1000000) * 1000;
84
85     while (nanosleep(&req, &req) < 0 && errno == EINTR)
86         ;
87 }
88
89 void benchmark_op(struct thread_state *ts)
90 {
91     int64_t start, end;
92
93     start = now_hires();
94
95     char filename[256];
96     int n = get_random(opt_filecount);
97     int n1 = n / opt_dirsize, n2 = n % opt_dirsize;
98
99     if (ts->thread_num >= write_threads) {
100         /* Read */
101         sprintf(filename, "t%d/%d/%d", ts->thread_num - write_threads, n1, n2);
102         FILE *f = fopen(filename, "rb");
103         if (f == NULL) {
104             perror("fopen");
105             return;
106         }
107
108         char buf[65536];
109         while (fread(buf, 1, sizeof(buf), f) > 0) { }
110         fclose(f);
111
112         end = now_hires();
113         pthread_mutex_lock(&ts->lock);
114         ts->read_count++;
115         ts->read_time += (end - start) / 1e9;
116         ts->read_time2 += sq((end - start) / 1e9);
117         pthread_mutex_unlock(&ts->lock);
118     } else {
119         /* Write */
120         sprintf(filename, "t%d/%d/%d", ts->thread_num, n1, n2);
121         FILE *f = fopen(filename, "wb");
122         if (f == NULL) {
123             perror("fopen");
124             return;
125         }
126
127         char buf[65536];
128         int bytes_left = opt_filesize;
129         while (bytes_left > 0) {
130             size_t written = fwrite(buf, 1,
131                                     bytes_left < sizeof(buf)
132                                      ? bytes_left : sizeof(buf),
133                                     f);
134             if (ferror(f))
135                 return;
136             bytes_left -= written;
137         }
138         fclose(f);
139
140         end = now_hires();
141         pthread_mutex_lock(&ts->lock);
142         ts->write_count++;
143         ts->write_time += (end - start) / 1e9;
144         ts->write_time2 += sq((end - start) / 1e9);
145         pthread_mutex_unlock(&ts->lock);
146     }
147 }
148
149 void *benchmark_thread(void *arg)
150 {
151     struct thread_state *ts = (struct thread_state *)arg;
152
153     int target_delay = (opt_threads / opt_ops) * 1e6;
154
155     while (1) {
156         int64_t start = now_hires();
157         benchmark_op(ts);
158         int64_t end = now_hires();
159
160         int elapsed = (end - start) / 1000;
161         if (elapsed < target_delay)
162             sleep_micros(target_delay - elapsed);
163     }
164
165     return NULL;
166 }
167
168 void launch_thread(int i)
169 {
170     memset(&threads[i], 0, sizeof(struct thread_state));
171     threads[i].thread_num = i;
172     pthread_mutex_init(&threads[i].lock, NULL);
173     if (pthread_create(&threads[i].thread, NULL, benchmark_thread, &threads[i]) != 0) {
174         fprintf(stderr, "Error launching thread!\n");
175         exit(1);
176     }
177 }
178
179 void wait_thread(int n)
180 {
181     void *result;
182     pthread_join(threads[n].thread, &result);
183 }
184
185 void reset_stats(int print, double duration)
186 {
187     int read_count = 0, write_count = 0;
188     double read_time = 0, write_time = 0, read_time2 = 0, write_time2 = 0;
189
190     for (int i = 0; i < opt_threads; i++) {
191         pthread_mutex_lock(&threads[i].lock);
192         read_count += threads[i].read_count;
193         write_count += threads[i].write_count;
194         read_time += threads[i].read_time;
195         write_time += threads[i].write_time;
196         read_time2 += threads[i].read_time2;
197         write_time2 += threads[i].write_time2;
198         threads[i].read_count = threads[i].write_count = 0;
199         threads[i].read_time = threads[i].write_time = 0;
200         threads[i].read_time2 = threads[i].write_time2 = 0;
201         pthread_mutex_unlock(&threads[i].lock);
202     }
203
204     if (print) {
205         printf("read: [%g, %f, %f]\n",
206                read_count / duration, read_time / read_count,
207                stddev(read_time, read_time2, read_count));
208         printf("write: [%g, %f, %f]\n",
209                write_count / duration, write_time / write_count,
210                stddev(write_time, write_time2, write_count));
211         printf("\n");
212         fflush(stdout);
213     }
214 }
215
216 int main(int argc, char *argv[])
217 {
218     if (argc != 9) {
219         fprintf(stderr, "Usage: TODO\n");
220         return 1;
221     }
222
223     opt_filesize = atoi(argv[1]);
224     opt_filecount = atoi(argv[2]);
225     opt_writeratio = atof(argv[3]);
226     opt_threads = atoi(argv[4]);
227     opt_duration = atoi(argv[5]);
228     opt_ops = atof(argv[6]);
229     opt_intervals = atoi(argv[7]);
230     opt_dirsize = atoi(argv[8]);
231
232     srandom(time(NULL));
233
234     start_time = now_hires();
235
236     /* Partition threads into those that should do reads and those for writes,
237      * as close as possible to the desired allocation. */
238     write_threads = (int)round(opt_threads * opt_writeratio);
239     fprintf(stderr, "Using %d threads for reads, %d for writes\n",
240             opt_threads - write_threads, write_threads);
241
242     for (int i = 0; i < opt_threads; i++) {
243         launch_thread(i);
244     }
245
246     for (int i = 0; i < opt_intervals; i++) {
247         sleep_micros(opt_duration * 1000000 / opt_intervals);
248         reset_stats(1, (double)opt_duration / opt_intervals);
249     }
250
251     return 0;
252 }