X-Git-Url: http://git.vrable.net/?a=blobdiff_plain;f=logbench%2Flogbench.c;h=6f3a858d223a1c208ba2b8afce60d0137e4157a3;hb=f25981daece838a116a21ba8a7dc89582f1641d5;hp=d811dc962f43b9680b076816e3c4a105adfd35b0;hpb=4f279c7107dd917c58c31ab310b0eb6afd45644f;p=bluesky.git diff --git a/logbench/logbench.c b/logbench/logbench.c index d811dc9..6f3a858 100644 --- a/logbench/logbench.c +++ b/logbench/logbench.c @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include @@ -29,14 +30,26 @@ struct item { size_t len; }; -const int queue_capacity = 1024; -const int item_size = 1024; +int queue_capacity = 1024; +int item_size = 1024; +int opt_threads = 1; +int opt_batchsize = 1; +int opt_writes = (1 << 12); +int opt_bdb_async = FALSE; GAsyncQueue *queue; int outstanding = 0; GMutex *lock; GCond *cond_empty, *cond_full; +int64_t get_ns() +{ + struct timespec ts; + clock_gettime(CLOCK_MONOTONIC, &ts); + + return ts.tv_sec * 1000000000LL + ts.tv_nsec; +} + struct item *get_item() { return (struct item *)g_async_queue_pop(queue); @@ -75,8 +88,6 @@ static int dirfd = -1; gpointer fslog_thread(gpointer d) { - g_print("Launching filesystem writer thread...\n"); - while (TRUE) { struct item *item = get_item(); @@ -97,7 +108,7 @@ gpointer fslog_thread(gpointer d) void launch_fslog() { - dirfd = open("logdir", O_DIRECTORY); + dirfd = open(".", O_DIRECTORY); g_assert(dirfd >= 0); for (int i = 0; i < 1; i++) @@ -107,8 +118,6 @@ void launch_fslog() /****************************** Single-File Log ******************************/ gpointer flatlog_thread(gpointer d) { - g_print("Launching flat log writer thread...\n"); - int fd = open("logfile", O_CREAT|O_WRONLY|O_TRUNC, 0666); g_assert(fd >= 0); @@ -122,7 +131,7 @@ gpointer flatlog_thread(gpointer d) writebuf(fd, item->data, item->len); count++; - if (count % (1 << 8) == 0) + if (count % opt_batchsize == 0) fdatasync(fd); finish_item(item); @@ -139,8 +148,6 @@ void launch_flatlog() /************************* Transactional Berkeley DB *************************/ gpointer bdb_thread(gpointer d) { - g_print("Launching BDB log writer thread...\n"); - int res; DB_ENV *env; DB *db; @@ -150,11 +157,16 @@ gpointer bdb_thread(gpointer d) res = db_env_create(&env, 0); g_assert(res == 0); - res = env->open(env, "bdb", + res = env->open(env, ".", DB_CREATE | DB_RECOVER | DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL | DB_INIT_TXN | DB_THREAD, 0644); g_assert(res == 0); + if (opt_bdb_async) { + res = env->set_flags(env, DB_TXN_WRITE_NOSYNC, 1); + g_assert(res == 0); + } + res = db_create(&db, env, 0); g_assert(res == 0); @@ -163,7 +175,7 @@ gpointer bdb_thread(gpointer d) g_assert(res == 0); while (TRUE) { - if (txn == NULL) { + if (txn == NULL && !opt_bdb_async) { res = env->txn_begin(env, NULL, &txn, 0); g_assert(res == 0); } @@ -180,13 +192,17 @@ gpointer bdb_thread(gpointer d) value.data = item->data; value.size = item->len; - res = db->put(db, NULL, &key, &value, 0); + res = db->put(db, opt_bdb_async ? NULL : txn, &key, &value, 0); g_assert(res == 0); count++; - if (count % (1 << 8) == 0) { - txn->commit(txn, 0); - txn = NULL; + if (count % opt_batchsize == 0) { + if (opt_bdb_async) { + env->txn_checkpoint(env, 0, 0, 0); + } else { + txn->commit(txn, 0); + txn = NULL; + } } finish_item(item); @@ -202,27 +218,84 @@ void launch_bdb() int main(int argc, char *argv[]) { + int64_t time_start, time_end; + g_thread_init(NULL); queue = g_async_queue_new(); lock = g_mutex_new(); cond_empty = g_cond_new(); cond_full = g_cond_new(); - launch_fslog(); - // launch_flatlog(); - // launch_bdb(); + int opt; + int backend = 0; + while ((opt = getopt(argc, argv, "at:s:b:n:BFD")) != -1) { + switch (opt) { + case 'a': + // Make BDB log writes more asynchronous + opt_bdb_async = TRUE; + break; + case 't': + // Set number of log worker threads + opt_threads = atoi(optarg); + break; + case 's': + // Set item size (in bytes) + item_size = atoi(optarg); + break; + case 'b': + // Set batch size + opt_batchsize = atoi(optarg); + break; + case 'n': + // Set object count + opt_writes = atoi(optarg); + break; + case 'B': + // Select BDB backend + backend = 'b'; + break; + case 'F': + // Select flat file backend + backend = 'f'; + break; + case 'D': + // Select file system directory backend + backend = 'd'; + break; + default: /* '?' */ + fprintf(stderr, "Usage: %s [-t threads] {-B|-F|-D}\n", + argv[0]); + return EXIT_FAILURE; + } + } + + switch (backend) { + case 'b': + launch_bdb(); + break; + case 'f': + launch_flatlog(); + break; + case 'd': + launch_fslog(); + break; + default: + fprintf(stderr, "Backend not selected!\n"); + return EXIT_FAILURE; + } - for (int i = 0; i < (1 << 12); i++) { + time_start = get_ns(); + for (int i = 0; i < opt_writes; i++) { struct item *item = g_new(struct item, 1); item->key = g_strdup_printf("item-%06d", i); item->data = g_malloc(item_size); item->len = item_size; g_mutex_lock(lock); - while (outstanding >= queue_capacity) - g_cond_wait(cond_full, lock); g_async_queue_push(queue, item); outstanding++; + if (outstanding == opt_batchsize) + g_cond_wait(cond_empty, lock); g_mutex_unlock(lock); } @@ -230,6 +303,23 @@ int main(int argc, char *argv[]) while (outstanding > 0) g_cond_wait(cond_empty, lock); g_mutex_unlock(lock); + time_end = get_ns(); + + double elapsed = (time_end - time_start) / 1e9; + printf("Elapsed: %f s\nThroughput: %f txn/s, %f MiB/s\n", + elapsed, opt_writes / elapsed, + opt_writes / elapsed * item_size / (1 << 20)); + + if (backend == 'b' && opt_bdb_async) + backend = 'B'; + + FILE *f = fopen("../logbench.data", "a"); + g_assert(f != NULL); + fprintf(f, "%c\t%d\t%d\t%d\t%f\t%f\t%f\n", + backend, item_size, opt_writes, opt_batchsize, + elapsed, opt_writes / elapsed, + opt_writes / elapsed * item_size / (1 << 20)); + fclose(f); return 0; }