A new microbenchmark tool to figure out what format to use for logs.
[bluesky.git] / logbench / logbench.c
1 /* A simple tool for benchmarking various logging strategies.
2  *
3  * We want to log a series of key/value pairs.  Approaches that we try include:
4  *  - Data written directly into the filesystem.
5  *  - Data is written to a Berkeley DB.
6  *  - Data is appended to a log file.
7  * In all cases we want to ensure that data is persistent on disk so it could
8  * be used for crash recovery.  We measure how many log records we can write
9  * per second to gauge performance. */
10
11 #define _GNU_SOURCE
12 #define _ATFILE_SOURCE
13
14 #include <stdio.h>
15 #include <stdlib.h>
16 #include <string.h>
17 #include <errno.h>
18 #include <sys/types.h>
19 #include <sys/stat.h>
20 #include <fcntl.h>
21 #include <unistd.h>
22
23 #include <db.h>
24 #include <glib.h>
25
26 struct item {
27     char *key;
28     char *data;
29     size_t len;
30 };
31
32 const int queue_capacity = 1024;
33 const int item_size = 1024;
34
35 GAsyncQueue *queue;
36 int outstanding = 0;
37 GMutex *lock;
38 GCond *cond_empty, *cond_full;
39
40 struct item *get_item()
41 {
42     return (struct item *)g_async_queue_pop(queue);
43 }
44
45 void finish_item(struct item *item)
46 {
47     g_free(item->key);
48     g_free(item->data);
49     g_free(item);
50
51     g_mutex_lock(lock);
52     outstanding--;
53     if (outstanding == 0)
54         g_cond_signal(cond_empty);
55     if (outstanding < queue_capacity)
56         g_cond_signal(cond_full);
57     g_mutex_unlock(lock);
58 }
59
60 void writebuf(int fd, const char *buf, size_t len)
61 {
62     while (len > 0) {
63         ssize_t written;
64         written = write(fd, buf, len);
65         if (written < 0 && errno == EINTR)
66             continue;
67         g_assert(written >= 0);
68         buf += written;
69         len -= written;
70     }
71 }
72
73 /************************ Direct-to-filesystem logging ***********************/
74 static int dirfd = -1;
75
76 gpointer fslog_thread(gpointer d)
77 {
78     g_print("Launching filesystem writer thread...\n");
79
80     while (TRUE) {
81         struct item *item = get_item();
82
83         int fd = openat(dirfd, item->key, O_CREAT|O_WRONLY|O_TRUNC, 0666);
84         g_assert(fd >= 0);
85
86         writebuf(fd, item->data, item->len);
87
88         finish_item(item);
89
90         fsync(fd);
91         fsync(dirfd);
92         close(fd);
93     }
94
95     return NULL;
96 }
97
98 void launch_fslog()
99 {
100     dirfd = open("logdir", O_DIRECTORY);
101     g_assert(dirfd >= 0);
102
103     for (int i = 0; i < 1; i++)
104         g_thread_create(fslog_thread, NULL, FALSE, NULL);
105 }
106
107 /****************************** Single-File Log ******************************/
108 gpointer flatlog_thread(gpointer d)
109 {
110     g_print("Launching flat log writer thread...\n");
111
112     int fd = open("logfile", O_CREAT|O_WRONLY|O_TRUNC, 0666);
113     g_assert(fd >= 0);
114
115     int count = 0;
116
117     while (TRUE) {
118         struct item *item = get_item();
119
120         writebuf(fd, item->key, strlen(item->key) + 1);
121         writebuf(fd, (char *)&item->len, sizeof(item->len));
122         writebuf(fd, item->data, item->len);
123
124         count++;
125         if (count % (1 << 8) == 0)
126             fdatasync(fd);
127
128         finish_item(item);
129     }
130
131     return NULL;
132 }
133
134 void launch_flatlog()
135 {
136     g_thread_create(flatlog_thread, NULL, FALSE, NULL);
137 }
138
139 /************************* Transactional Berkeley DB *************************/
140 gpointer bdb_thread(gpointer d)
141 {
142     g_print("Launching BDB log writer thread...\n");
143
144     int res;
145     DB_ENV *env;
146     DB *db;
147     DB_TXN *txn = NULL;
148     int count = 0;
149
150     res = db_env_create(&env, 0);
151     g_assert(res == 0);
152
153     res = env->open(env, "bdb",
154                     DB_CREATE | DB_RECOVER | DB_INIT_LOCK | DB_INIT_LOG
155                      | DB_INIT_MPOOL | DB_INIT_TXN | DB_THREAD, 0644);
156     g_assert(res == 0);
157
158     res = db_create(&db, env, 0);
159     g_assert(res == 0);
160
161     res = db->open(db, NULL, "log.db", "log", DB_BTREE,
162                    DB_CREATE | DB_THREAD | DB_AUTO_COMMIT, 0644);
163     g_assert(res == 0);
164
165     while (TRUE) {
166         if (txn == NULL) {
167             res = env->txn_begin(env, NULL, &txn, 0);
168             g_assert(res == 0);
169         }
170
171         struct item *item = get_item();
172
173         DBT key, value;
174         memset(&key, 0, sizeof(key));
175         memset(&value, 0, sizeof(value));
176
177         key.data = item->key;
178         key.size = strlen(item->key);
179
180         value.data = item->data;
181         value.size = item->len;
182
183         res = db->put(db, NULL, &key, &value, 0);
184         g_assert(res == 0);
185
186         count++;
187         if (count % (1 << 8) == 0) {
188             txn->commit(txn, 0);
189             txn = NULL;
190         }
191
192         finish_item(item);
193     }
194
195     return NULL;
196 }
197
198 void launch_bdb()
199 {
200     g_thread_create(bdb_thread, NULL, FALSE, NULL);
201 }
202
203 int main(int argc, char *argv[])
204 {
205     g_thread_init(NULL);
206     queue = g_async_queue_new();
207     lock = g_mutex_new();
208     cond_empty = g_cond_new();
209     cond_full = g_cond_new();
210
211     launch_fslog();
212     // launch_flatlog();
213     // launch_bdb();
214
215     for (int i = 0; i < (1 << 12); i++) {
216         struct item *item = g_new(struct item, 1);
217         item->key = g_strdup_printf("item-%06d", i);
218         item->data = g_malloc(item_size);
219         item->len = item_size;
220
221         g_mutex_lock(lock);
222         while (outstanding >= queue_capacity)
223             g_cond_wait(cond_full, lock);
224         g_async_queue_push(queue, item);
225         outstanding++;
226         g_mutex_unlock(lock);
227     }
228
229     g_mutex_lock(lock);
230     while (outstanding > 0)
231         g_cond_wait(cond_empty, lock);
232     g_mutex_unlock(lock);
233
234     return 0;
235 }