static gpointer bdbstore_thread(gpointer data)
{
BDBStore *store = (BDBStore *)data;
+ DB_TXN *txn = NULL;
+
+ // Number of operations in the current transaction
+ int transaction_size = 0;
while (TRUE) {
int res;
BlueSkyStoreAsync *async;
+ if (txn == NULL) {
+ res = store->env->txn_begin(store->env, NULL, &txn, 0);
+ if (res != 0) {
+ fprintf(stderr, "Unable to begin transaction!\n");
+ return NULL;
+ }
+ }
+
async = (BlueSkyStoreAsync *)g_async_queue_pop(store->operations);
async->status = ASYNC_RUNNING;
async->exec_time = bluesky_now_hires();
if (async->op == STORE_OP_GET) {
value.flags = DB_DBT_MALLOC;
- res = store->db->get(store->db, NULL, &key, &value, 0);
+ res = store->db->get(store->db, txn, &key, &value, 0);
async->result = res;
async->data = NULL;
value.data = async->data->data;
value.size = async->data->len;
- res = store->db->put(store->db, NULL, &key, &value, 0);
+ res = store->db->put(store->db, txn, &key, &value, 0);
if (res != 0) {
fprintf(stderr, "BDB write failure: %s\n", db_strerror(res));
bluesky_store_async_mark_complete(async);
bluesky_store_async_unref(async);
+ transaction_size++;
+
+ if (transaction_size >= 64) {
+ txn->commit(txn, 0);
+ txn = NULL;
+ transaction_size = 0;
+ }
}
return NULL;