X-Git-Url: http://git.vrable.net/?a=blobdiff_plain;f=bluesky%2Fstore.c;h=5cc475efcdbd2dc7c8ae0a08cc4b947a5f0641cb;hb=83fd6b61a6e092a22d4d5e59ed95f05f5e287f11;hp=48a69b478097d2937e199a9d863e45243d81869b;hpb=cf3d405919942d3adf810bba0ed5df28a91b6d2d;p=bluesky.git diff --git a/bluesky/store.c b/bluesky/store.c index 48a69b4..5cc475e 100644 --- a/bluesky/store.c +++ b/bluesky/store.c @@ -24,6 +24,8 @@ struct _BlueSkyStore { GMutex *lock; GCond *cond_idle; int pending; /* Count of operations not yet complete. */ + + struct bluesky_stats *stats_get, *stats_put; }; GHashTable *store_implementations; @@ -71,6 +73,10 @@ BlueSkyStore *bluesky_store_new(const gchar *type) store->lock = g_mutex_new(); store->cond_idle = g_cond_new(); store->pending = 0; + store->stats_get = bluesky_stats_new(g_strdup_printf("Store[%s]: GETS", + type)); + store->stats_put = bluesky_stats_new(g_strdup_printf("Store[%s]: PUTS", + type)); g_free(scheme); return store; } @@ -219,9 +225,19 @@ void bluesky_store_async_mark_complete(BlueSkyStoreAsync *async) g_thread_pool_push(notifier_thread_pool, nl, NULL); } - g_log("bluesky/store", G_LOG_LEVEL_DEBUG, - "[%p] complete: elapsed = %"PRIi64" ns, latency = %"PRIi64" ns", - async, elapsed, latency); + if (bluesky_verbose) { + g_log("bluesky/store", G_LOG_LEVEL_DEBUG, + "[%p] complete: elapsed = %"PRIi64" ns, latency = %"PRIi64" ns", + async, elapsed, latency); + } + + if (async->data) { + if (async->op == STORE_OP_GET) { + bluesky_stats_add(async->store->stats_get, async->data->len); + } else if (async->op == STORE_OP_PUT) { + bluesky_stats_add(async->store->stats_put, async->data->len); + } + } } void bluesky_store_async_submit(BlueSkyStoreAsync *async) @@ -234,13 +250,15 @@ void bluesky_store_async_submit(BlueSkyStoreAsync *async) // processing was started, if there could be a delay from submission time. async->exec_time = bluesky_now_hires(); - g_log("bluesky/store", G_LOG_LEVEL_DEBUG, "[%p] submit: %s %s", - async, - async->op == STORE_OP_GET ? "GET" - : async->op == STORE_OP_PUT ? "PUT" - : async->op == STORE_OP_DELETE ? "DELETE" - : async->op == STORE_OP_BARRIER ? "BARRIER" : "???", - async->key); + if (bluesky_verbose) { + g_log("bluesky/store", G_LOG_LEVEL_DEBUG, "[%p] submit: %s %s", + async, + async->op == STORE_OP_GET ? "GET" + : async->op == STORE_OP_PUT ? "PUT" + : async->op == STORE_OP_DELETE ? "DELETE" + : async->op == STORE_OP_BARRIER ? "BARRIER" : "???", + async->key); + } /* Barriers are handled specially, and not handed down the storage * implementation layer. */ @@ -274,13 +292,15 @@ void bluesky_store_add_barrier(BlueSkyStoreAsync *barrier, g_mutex_unlock(barrier->lock); g_mutex_lock(async->lock); - if (async->barrier == NULL) { + if (async->barrier == NULL && async->status != ASYNC_COMPLETE) { async->barrier = barrier; + g_mutex_unlock(async->lock); } else { - g_warning("Adding async to more than one barrier!\n"); + if (async->barrier != NULL) + g_warning("Adding async to more than one barrier!\n"); + g_mutex_unlock(async->lock); bluesky_store_async_add_notifier(async, op_complete, barrier); } - g_mutex_unlock(async->lock); } static void notifier_task(gpointer n, gpointer s) @@ -302,12 +322,17 @@ static void notifier_task(gpointer n, gpointer s) void bluesky_store_sync(BlueSkyStore *store) { g_mutex_lock(store->lock); - g_print("Waiting for pending store operations to complete...\n"); + if (bluesky_verbose) { + g_log("bluesky/store", G_LOG_LEVEL_DEBUG, + "Waiting for pending store operations to complete..."); + } while (store->pending > 0) { g_cond_wait(store->cond_idle, store->lock); } g_mutex_unlock(store->lock); - g_print("Operations are complete.\n"); + if (bluesky_verbose) { + g_log("bluesky/store", G_LOG_LEVEL_DEBUG, "Operations are complete."); + } } /* Convenience wrappers that perform a single operation synchronously. */ @@ -476,6 +501,32 @@ static BlueSkyStoreImplementation filestore_impl = { .cleanup = filestore_cleanup, }; +/* A store implementation which simply discards all data, for testing. */ +static gpointer nullstore_create(const gchar *path) +{ + return (gpointer)nullstore_create; +} + +static void nullstore_destroy(gpointer store) +{ +} + +static void nullstore_submit(gpointer s, BlueSkyStoreAsync *async) +{ + bluesky_store_async_mark_complete(async); +} + +static void nullstore_cleanup(gpointer store, BlueSkyStoreAsync *async) +{ +} + +static BlueSkyStoreImplementation nullstore_impl = { + .create = nullstore_create, + .destroy = nullstore_destroy, + .submit = nullstore_submit, + .cleanup = nullstore_cleanup, +}; + void bluesky_store_init() { store_implementations = g_hash_table_new(g_str_hash, g_str_equal); @@ -483,4 +534,5 @@ void bluesky_store_init() bluesky_max_threads, FALSE, NULL); bluesky_store_register(&memstore_impl, "mem"); bluesky_store_register(&filestore_impl, "file"); + bluesky_store_register(&nullstore_impl, "null"); }