Update cleaner experiment plot
[bluesky.git] / logbench / logbench.c
1 /* A simple tool for benchmarking various logging strategies.
2  *
3  * We want to log a series of key/value pairs.  Approaches that we try include:
4  *  - Data written directly into the filesystem.
5  *  - Data is written to a Berkeley DB.
6  *  - Data is appended to a log file.
7  * In all cases we want to ensure that data is persistent on disk so it could
8  * be used for crash recovery.  We measure how many log records we can write
9  * per second to gauge performance. */
10
11 #define _GNU_SOURCE
12 #define _ATFILE_SOURCE
13
14 #include <stdio.h>
15 #include <stdlib.h>
16 #include <string.h>
17 #include <errno.h>
18 #include <time.h>
19 #include <sys/types.h>
20 #include <sys/stat.h>
21 #include <fcntl.h>
22 #include <unistd.h>
23
24 #include <db.h>
25 #include <glib.h>
26
27 struct item {
28     char *key;
29     char *data;
30     size_t len;
31 };
32
33 int queue_capacity = 1024;
34 int item_size = 1024;
35 int opt_threads = 1;
36 int opt_batchsize = 1;
37 int opt_writes = (1 << 12);
38 int opt_bdb_async = FALSE;
39
40 GAsyncQueue *queue;
41 int outstanding = 0;
42 GMutex *lock;
43 GCond *cond_empty, *cond_full;
44
45 int64_t get_ns()
46 {
47     struct timespec ts;
48     clock_gettime(CLOCK_MONOTONIC, &ts);
49
50     return ts.tv_sec * 1000000000LL + ts.tv_nsec;
51 }
52
53 struct item *get_item()
54 {
55     return (struct item *)g_async_queue_pop(queue);
56 }
57
58 void finish_item(struct item *item)
59 {
60     g_free(item->key);
61     g_free(item->data);
62     g_free(item);
63
64     g_mutex_lock(lock);
65     outstanding--;
66     if (outstanding == 0)
67         g_cond_signal(cond_empty);
68     if (outstanding < queue_capacity)
69         g_cond_signal(cond_full);
70     g_mutex_unlock(lock);
71 }
72
73 void writebuf(int fd, const char *buf, size_t len)
74 {
75     while (len > 0) {
76         ssize_t written;
77         written = write(fd, buf, len);
78         if (written < 0 && errno == EINTR)
79             continue;
80         g_assert(written >= 0);
81         buf += written;
82         len -= written;
83     }
84 }
85
86 /************************ Direct-to-filesystem logging ***********************/
87 static int dirfd = -1;
88
89 gpointer fslog_thread(gpointer d)
90 {
91     while (TRUE) {
92         struct item *item = get_item();
93
94         int fd = openat(dirfd, item->key, O_CREAT|O_WRONLY|O_TRUNC, 0666);
95         g_assert(fd >= 0);
96
97         writebuf(fd, item->data, item->len);
98
99         finish_item(item);
100
101         fsync(fd);
102         fsync(dirfd);
103         close(fd);
104     }
105
106     return NULL;
107 }
108
109 void launch_fslog()
110 {
111     dirfd = open(".", O_DIRECTORY);
112     g_assert(dirfd >= 0);
113
114     for (int i = 0; i < 1; i++)
115         g_thread_create(fslog_thread, NULL, FALSE, NULL);
116 }
117
118 /****************************** Single-File Log ******************************/
119 gpointer flatlog_thread(gpointer d)
120 {
121     int fd = open("logfile", O_CREAT|O_WRONLY|O_TRUNC, 0666);
122     g_assert(fd >= 0);
123
124     int count = 0;
125
126     while (TRUE) {
127         struct item *item = get_item();
128
129         writebuf(fd, item->key, strlen(item->key) + 1);
130         writebuf(fd, (char *)&item->len, sizeof(item->len));
131         writebuf(fd, item->data, item->len);
132
133         count++;
134         if (count % opt_batchsize == 0)
135             fdatasync(fd);
136
137         finish_item(item);
138     }
139
140     return NULL;
141 }
142
143 void launch_flatlog()
144 {
145     g_thread_create(flatlog_thread, NULL, FALSE, NULL);
146 }
147
148 /************************* Transactional Berkeley DB *************************/
149 gpointer bdb_thread(gpointer d)
150 {
151     int res;
152     DB_ENV *env;
153     DB *db;
154     DB_TXN *txn = NULL;
155     int count = 0;
156
157     res = db_env_create(&env, 0);
158     g_assert(res == 0);
159
160     res = env->open(env, ".",
161                     DB_CREATE | DB_RECOVER | DB_INIT_LOCK | DB_INIT_LOG
162                      | DB_INIT_MPOOL | DB_INIT_TXN | DB_THREAD, 0644);
163     g_assert(res == 0);
164
165     if (opt_bdb_async) {
166         res = env->set_flags(env, DB_TXN_WRITE_NOSYNC, 1);
167         g_assert(res == 0);
168     }
169
170     res = db_create(&db, env, 0);
171     g_assert(res == 0);
172
173     res = db->open(db, NULL, "log.db", "log", DB_BTREE,
174                    DB_CREATE | DB_THREAD | DB_AUTO_COMMIT, 0644);
175     g_assert(res == 0);
176
177     while (TRUE) {
178         if (txn == NULL && !opt_bdb_async) {
179             res = env->txn_begin(env, NULL, &txn, 0);
180             g_assert(res == 0);
181         }
182
183         struct item *item = get_item();
184
185         DBT key, value;
186         memset(&key, 0, sizeof(key));
187         memset(&value, 0, sizeof(value));
188
189         key.data = item->key;
190         key.size = strlen(item->key);
191
192         value.data = item->data;
193         value.size = item->len;
194
195         res = db->put(db, opt_bdb_async ? NULL : txn, &key, &value, 0);
196         g_assert(res == 0);
197
198         count++;
199         if (count % opt_batchsize == 0) {
200             if (opt_bdb_async) {
201                 env->txn_checkpoint(env, 0, 0, 0);
202             } else {
203                 txn->commit(txn, 0);
204                 txn = NULL;
205             }
206         }
207
208         finish_item(item);
209     }
210
211     return NULL;
212 }
213
214 void launch_bdb()
215 {
216     g_thread_create(bdb_thread, NULL, FALSE, NULL);
217 }
218
219 int main(int argc, char *argv[])
220 {
221     int64_t time_start, time_end;
222
223     g_thread_init(NULL);
224     queue = g_async_queue_new();
225     lock = g_mutex_new();
226     cond_empty = g_cond_new();
227     cond_full = g_cond_new();
228
229     int opt;
230     int backend = 0;
231     while ((opt = getopt(argc, argv, "at:s:b:n:BFD")) != -1) {
232         switch (opt) {
233         case 'a':
234             // Make BDB log writes more asynchronous
235             opt_bdb_async = TRUE;
236             break;
237         case 't':
238             // Set number of log worker threads
239             opt_threads = atoi(optarg);
240             break;
241         case 's':
242             // Set item size (in bytes)
243             item_size = atoi(optarg);
244             break;
245         case 'b':
246             // Set batch size
247             opt_batchsize = atoi(optarg);
248             break;
249         case 'n':
250             // Set object count
251             opt_writes = atoi(optarg);
252             break;
253         case 'B':
254             // Select BDB backend
255             backend = 'b';
256             break;
257         case 'F':
258             // Select flat file backend
259             backend = 'f';
260             break;
261         case 'D':
262             // Select file system directory backend
263             backend = 'd';
264             break;
265         default: /* '?' */
266             fprintf(stderr, "Usage: %s [-t threads] {-B|-F|-D}\n",
267                     argv[0]);
268             return EXIT_FAILURE;
269         }
270     }
271
272     switch (backend) {
273     case 'b':
274         launch_bdb();
275         break;
276     case 'f':
277         launch_flatlog();
278         break;
279     case 'd':
280         launch_fslog();
281         break;
282     default:
283         fprintf(stderr, "Backend not selected!\n");
284         return EXIT_FAILURE;
285     }
286
287     time_start = get_ns();
288     for (int i = 0; i < opt_writes; i++) {
289         struct item *item = g_new(struct item, 1);
290         item->key = g_strdup_printf("item-%06d", i);
291         item->data = g_malloc(item_size);
292         item->len = item_size;
293
294         g_mutex_lock(lock);
295         g_async_queue_push(queue, item);
296         outstanding++;
297         if (outstanding == opt_batchsize)
298             g_cond_wait(cond_empty, lock);
299         g_mutex_unlock(lock);
300     }
301
302     g_mutex_lock(lock);
303     while (outstanding > 0)
304         g_cond_wait(cond_empty, lock);
305     g_mutex_unlock(lock);
306     time_end = get_ns();
307
308     double elapsed = (time_end - time_start) / 1e9;
309     printf("Elapsed: %f s\nThroughput: %f txn/s, %f MiB/s\n",
310            elapsed, opt_writes / elapsed,
311            opt_writes / elapsed * item_size / (1 << 20));
312
313     if (backend == 'b' && opt_bdb_async)
314         backend = 'B';
315
316     FILE *f = fopen("../logbench.data", "a");
317     g_assert(f != NULL);
318     fprintf(f, "%c\t%d\t%d\t%d\t%f\t%f\t%f\n",
319             backend, item_size, opt_writes, opt_batchsize,
320             elapsed, opt_writes / elapsed,
321             opt_writes / elapsed * item_size / (1 << 20));
322     fclose(f);
323
324     return 0;
325 }