Make the log benchmark configurable and make a parameter sweep script.
[bluesky.git] / logbench / logbench.c
1 /* A simple tool for benchmarking various logging strategies.
2  *
3  * We want to log a series of key/value pairs.  Approaches that we try include:
4  *  - Data written directly into the filesystem.
5  *  - Data is written to a Berkeley DB.
6  *  - Data is appended to a log file.
7  * In all cases we want to ensure that data is persistent on disk so it could
8  * be used for crash recovery.  We measure how many log records we can write
9  * per second to gauge performance. */
10
11 #define _GNU_SOURCE
12 #define _ATFILE_SOURCE
13
14 #include <stdio.h>
15 #include <stdlib.h>
16 #include <string.h>
17 #include <errno.h>
18 #include <sys/types.h>
19 #include <sys/stat.h>
20 #include <fcntl.h>
21 #include <unistd.h>
22
23 #include <db.h>
24 #include <glib.h>
25
26 struct item {
27     char *key;
28     char *data;
29     size_t len;
30 };
31
32 int queue_capacity = 1024;
33 int item_size = 1024;
34 int opt_threads = 1;
35 int opt_batchsize = 1;
36
37 GAsyncQueue *queue;
38 int outstanding = 0;
39 GMutex *lock;
40 GCond *cond_empty, *cond_full;
41
42 struct item *get_item()
43 {
44     return (struct item *)g_async_queue_pop(queue);
45 }
46
47 void finish_item(struct item *item)
48 {
49     g_free(item->key);
50     g_free(item->data);
51     g_free(item);
52
53     g_mutex_lock(lock);
54     outstanding--;
55     if (outstanding == 0)
56         g_cond_signal(cond_empty);
57     if (outstanding < queue_capacity)
58         g_cond_signal(cond_full);
59     g_mutex_unlock(lock);
60 }
61
62 void writebuf(int fd, const char *buf, size_t len)
63 {
64     while (len > 0) {
65         ssize_t written;
66         written = write(fd, buf, len);
67         if (written < 0 && errno == EINTR)
68             continue;
69         g_assert(written >= 0);
70         buf += written;
71         len -= written;
72     }
73 }
74
75 /************************ Direct-to-filesystem logging ***********************/
76 static int dirfd = -1;
77
78 gpointer fslog_thread(gpointer d)
79 {
80     g_print("Launching filesystem writer thread...\n");
81
82     while (TRUE) {
83         struct item *item = get_item();
84
85         int fd = openat(dirfd, item->key, O_CREAT|O_WRONLY|O_TRUNC, 0666);
86         g_assert(fd >= 0);
87
88         writebuf(fd, item->data, item->len);
89
90         finish_item(item);
91
92         fsync(fd);
93         fsync(dirfd);
94         close(fd);
95     }
96
97     return NULL;
98 }
99
100 void launch_fslog()
101 {
102     dirfd = open(".", O_DIRECTORY);
103     g_assert(dirfd >= 0);
104
105     for (int i = 0; i < 1; i++)
106         g_thread_create(fslog_thread, NULL, FALSE, NULL);
107 }
108
109 /****************************** Single-File Log ******************************/
110 gpointer flatlog_thread(gpointer d)
111 {
112     g_print("Launching flat log writer thread...\n");
113
114     int fd = open("logfile", O_CREAT|O_WRONLY|O_TRUNC, 0666);
115     g_assert(fd >= 0);
116
117     int count = 0;
118
119     while (TRUE) {
120         struct item *item = get_item();
121
122         writebuf(fd, item->key, strlen(item->key) + 1);
123         writebuf(fd, (char *)&item->len, sizeof(item->len));
124         writebuf(fd, item->data, item->len);
125
126         count++;
127         if (count % opt_batchsize == 0)
128             fdatasync(fd);
129
130         finish_item(item);
131     }
132
133     return NULL;
134 }
135
136 void launch_flatlog()
137 {
138     g_thread_create(flatlog_thread, NULL, FALSE, NULL);
139 }
140
141 /************************* Transactional Berkeley DB *************************/
142 gpointer bdb_thread(gpointer d)
143 {
144     g_print("Launching BDB log writer thread...\n");
145
146     int res;
147     DB_ENV *env;
148     DB *db;
149     DB_TXN *txn = NULL;
150     int count = 0;
151
152     res = db_env_create(&env, 0);
153     g_assert(res == 0);
154
155     res = env->open(env, ".",
156                     DB_CREATE | DB_RECOVER | DB_INIT_LOCK | DB_INIT_LOG
157                      | DB_INIT_MPOOL | DB_INIT_TXN | DB_THREAD, 0644);
158     g_assert(res == 0);
159
160     res = db_create(&db, env, 0);
161     g_assert(res == 0);
162
163     res = db->open(db, NULL, "log.db", "log", DB_BTREE,
164                    DB_CREATE | DB_THREAD | DB_AUTO_COMMIT, 0644);
165     g_assert(res == 0);
166
167     while (TRUE) {
168         if (txn == NULL) {
169             res = env->txn_begin(env, NULL, &txn, 0);
170             g_assert(res == 0);
171         }
172
173         struct item *item = get_item();
174
175         DBT key, value;
176         memset(&key, 0, sizeof(key));
177         memset(&value, 0, sizeof(value));
178
179         key.data = item->key;
180         key.size = strlen(item->key);
181
182         value.data = item->data;
183         value.size = item->len;
184
185         res = db->put(db, NULL, &key, &value, 0);
186         g_assert(res == 0);
187
188         count++;
189         if (count % opt_batchsize == 0) {
190             txn->commit(txn, 0);
191             txn = NULL;
192         }
193
194         finish_item(item);
195     }
196
197     return NULL;
198 }
199
200 void launch_bdb()
201 {
202     g_thread_create(bdb_thread, NULL, FALSE, NULL);
203 }
204
205 int main(int argc, char *argv[])
206 {
207     g_thread_init(NULL);
208     queue = g_async_queue_new();
209     lock = g_mutex_new();
210     cond_empty = g_cond_new();
211     cond_full = g_cond_new();
212
213     int opt;
214     int backend = 0;
215     while ((opt = getopt(argc, argv, "t:s:b:BFD")) != -1) {
216         switch (opt) {
217         case 't':
218             // Set number of log worker threads
219             opt_threads = atoi(optarg);
220             break;
221         case 's':
222             // Set item size (in bytes)
223             item_size = atoi(optarg);
224             break;
225         case 'b':
226             // Set batch size
227             opt_batchsize = atoi(optarg);
228             break;
229         case 'B':
230             // Select BDB backend
231             backend = 'b';
232             break;
233         case 'F':
234             // Select flat file backend
235             backend = 'f';
236             break;
237         case 'D':
238             // Select file system directory backend
239             backend = 'd';
240             break;
241         default: /* '?' */
242             fprintf(stderr, "Usage: %s [-t threads] {-B|-F|-D}\n",
243                     argv[0]);
244             return EXIT_FAILURE;
245         }
246     }
247
248     switch (backend) {
249     case 'b':
250         launch_bdb();
251         break;
252     case 'f':
253         launch_flatlog();
254         break;
255     case 'd':
256         launch_fslog();
257         break;
258     default:
259         fprintf(stderr, "Backend not selected!\n");
260         return EXIT_FAILURE;
261     }
262
263     for (int i = 0; i < (1 << 12); i++) {
264         struct item *item = g_new(struct item, 1);
265         item->key = g_strdup_printf("item-%06d", i);
266         item->data = g_malloc(item_size);
267         item->len = item_size;
268
269         g_mutex_lock(lock);
270         g_async_queue_push(queue, item);
271         outstanding++;
272         if (outstanding == opt_batchsize)
273             g_cond_wait(cond_empty, lock);
274         g_mutex_unlock(lock);
275     }
276
277     g_mutex_lock(lock);
278     while (outstanding > 0)
279         g_cond_wait(cond_empty, lock);
280     g_mutex_unlock(lock);
281
282     return 0;
283 }