size_t len;
};
-const int queue_capacity = 1024;
-const int item_size = 1024;
+int queue_capacity = 1024;
+int item_size = 1024;
+int opt_threads = 1;
+int opt_batchsize = 1;
GAsyncQueue *queue;
int outstanding = 0;
void launch_fslog()
{
- dirfd = open("logdir", O_DIRECTORY);
+ dirfd = open(".", O_DIRECTORY);
g_assert(dirfd >= 0);
for (int i = 0; i < 1; i++)
writebuf(fd, item->data, item->len);
count++;
- if (count % (1 << 8) == 0)
+ if (count % opt_batchsize == 0)
fdatasync(fd);
finish_item(item);
res = db_env_create(&env, 0);
g_assert(res == 0);
- res = env->open(env, "bdb",
+ res = env->open(env, ".",
DB_CREATE | DB_RECOVER | DB_INIT_LOCK | DB_INIT_LOG
| DB_INIT_MPOOL | DB_INIT_TXN | DB_THREAD, 0644);
g_assert(res == 0);
g_assert(res == 0);
count++;
- if (count % (1 << 8) == 0) {
+ if (count % opt_batchsize == 0) {
txn->commit(txn, 0);
txn = NULL;
}
cond_empty = g_cond_new();
cond_full = g_cond_new();
- launch_fslog();
- // launch_flatlog();
- // launch_bdb();
+ int opt;
+ int backend = 0;
+ while ((opt = getopt(argc, argv, "t:s:b:BFD")) != -1) {
+ switch (opt) {
+ case 't':
+ // Set number of log worker threads
+ opt_threads = atoi(optarg);
+ break;
+ case 's':
+ // Set item size (in bytes)
+ item_size = atoi(optarg);
+ break;
+ case 'b':
+ // Set batch size
+ opt_batchsize = atoi(optarg);
+ break;
+ case 'B':
+ // Select BDB backend
+ backend = 'b';
+ break;
+ case 'F':
+ // Select flat file backend
+ backend = 'f';
+ break;
+ case 'D':
+ // Select file system directory backend
+ backend = 'd';
+ break;
+ default: /* '?' */
+ fprintf(stderr, "Usage: %s [-t threads] {-B|-F|-D}\n",
+ argv[0]);
+ return EXIT_FAILURE;
+ }
+ }
+
+ switch (backend) {
+ case 'b':
+ launch_bdb();
+ break;
+ case 'f':
+ launch_flatlog();
+ break;
+ case 'd':
+ launch_fslog();
+ break;
+ default:
+ fprintf(stderr, "Backend not selected!\n");
+ return EXIT_FAILURE;
+ }
for (int i = 0; i < (1 << 12); i++) {
struct item *item = g_new(struct item, 1);
item->len = item_size;
g_mutex_lock(lock);
- while (outstanding >= queue_capacity)
- g_cond_wait(cond_full, lock);
g_async_queue_push(queue, item);
outstanding++;
+ if (outstanding == opt_batchsize)
+ g_cond_wait(cond_empty, lock);
g_mutex_unlock(lock);
}