From bb92882c6f3b517f8a30c268792642870ac896d2 Mon Sep 17 00:00:00 2001 From: Michael Vrable Date: Wed, 17 Feb 2010 14:53:23 -0800 Subject: [PATCH] Make kvstore backend in BlueSky asynchronous. Also, add synthetic delays to the kvstore server for latency testing. --- bluesky/bluesky-private.h | 1 + bluesky/store-kv.cc | 64 ++++++++++++++++++++++++++++----------- bluesky/store.c | 5 +++ kvstore/backend.cc | 4 +-- kvstore/kvservice.cc | 44 +++++++++++++++++++++++++++ 5 files changed, 99 insertions(+), 19 deletions(-) diff --git a/bluesky/bluesky-private.h b/bluesky/bluesky-private.h index a4c8a5e..320886c 100644 --- a/bluesky/bluesky-private.h +++ b/bluesky/bluesky-private.h @@ -100,6 +100,7 @@ void bluesky_store_register(const BlueSkyStoreImplementation *impl, const gchar *name); BlueSkyStoreAsync *bluesky_store_async_new(BlueSkyStore *store); +gpointer bluesky_store_async_get_handle(BlueSkyStoreAsync *async); void bluesky_store_async_ref(BlueSkyStoreAsync *async); void bluesky_store_async_unref(BlueSkyStoreAsync *async); void bluesky_store_async_wait(BlueSkyStoreAsync *async); diff --git a/bluesky/store-kv.cc b/bluesky/store-kv.cc index 7a6ac71..c1b0524 100644 --- a/bluesky/store-kv.cc +++ b/bluesky/store-kv.cc @@ -21,24 +21,14 @@ using namespace boost; using namespace kvstore; using namespace std; -static gpointer kvstore_new() -{ - KeyValueClient *client = new KeyValueClient("127.0.0.1", "9090"); - return client; -} - -static void kvstore_destroy(gpointer store) -{ - KeyValueClient *client = (KeyValueClient *)store; - delete client; -} +static GThreadPool *thread_pool = NULL; -static void kvstore_submit(gpointer store, BlueSkyStoreAsync *async) +static void kvstore_task(gpointer a, gpointer b) { - KeyValueClient *client = (KeyValueClient *)store; + BlueSkyStoreAsync *async = (BlueSkyStoreAsync *)a; + KeyValueClient *client = (KeyValueClient *)bluesky_store_async_get_handle(async); - g_return_if_fail(async->status == ASYNC_NEW); - g_return_if_fail(async->op != STORE_OP_NONE); + async->status = ASYNC_RUNNING; switch (async->op) { case STORE_OP_GET: @@ -61,11 +51,51 @@ static void kvstore_submit(gpointer store, BlueSkyStoreAsync *async) } default: - g_warning("Uknown operation type for MemStore: %d\n", async->op); - return; + break; } bluesky_store_async_mark_complete(async); + bluesky_store_async_unref(async); +} + +static gpointer kvstore_new() +{ + static volatile gsize once = 0; + if (g_once_init_enter(&once)) { + thread_pool = g_thread_pool_new(kvstore_task, NULL, -1, FALSE, NULL); + g_once_init_leave(&once, 1); + } + + KeyValueClient *client = new KeyValueClient("127.0.0.1", "9090"); + return client; +} + +static void kvstore_destroy(gpointer store) +{ + KeyValueClient *client = (KeyValueClient *)store; + delete client; +} + +static void kvstore_submit(gpointer store, BlueSkyStoreAsync *async) +{ + KeyValueClient *client = (KeyValueClient *)store; + + g_return_if_fail(async->status == ASYNC_NEW); + g_return_if_fail(async->op != STORE_OP_NONE); + + switch (async->op) { + case STORE_OP_GET: + case STORE_OP_PUT: + async->status = ASYNC_PENDING; + bluesky_store_async_ref(async); + g_thread_pool_push(thread_pool, async, NULL); + break; + + default: + g_warning("Uknown operation type for kvstore: %d\n", async->op); + bluesky_store_async_mark_complete(async); + break; + } } static void kvstore_cleanup(gpointer store, BlueSkyStoreAsync *async) diff --git a/bluesky/store.c b/bluesky/store.c index 785372b..8cf9f0e 100644 --- a/bluesky/store.c +++ b/bluesky/store.c @@ -88,6 +88,11 @@ BlueSkyStoreAsync *bluesky_store_async_new(BlueSkyStore *store) return async; } +gpointer bluesky_store_async_get_handle(BlueSkyStoreAsync *async) +{ + return async->store->handle; +} + void bluesky_store_async_ref(BlueSkyStoreAsync *async) { if (async == NULL) diff --git a/kvstore/backend.cc b/kvstore/backend.cc index 04a6764..c757560 100644 --- a/kvstore/backend.cc +++ b/kvstore/backend.cc @@ -15,7 +15,7 @@ extern "C" { namespace kvstore { -MemoryBackend::~MemoryBackend() +MemoryBackend::~MemoryBackend() { } @@ -118,7 +118,7 @@ public: 0644); - if (res != 0) + if (res != 0) { cerr << db_strerror(res) << endl; throw std::runtime_error("BDB ENV Open Fail"); diff --git a/kvstore/kvservice.cc b/kvstore/kvservice.cc index e241bc9..59e194a 100644 --- a/kvstore/kvservice.cc +++ b/kvstore/kvservice.cc @@ -1,8 +1,43 @@ #include "kvservice.h" #include +extern "C" { +#include +#include +} + using namespace std; +/* Timing and delay functionality. We allow the get and put operations to have + * a specified minimum latency, and will sleep if the operation would have + * completed before that time. This can be used in benchmarking to see the + * effect of increasing latency on an application. + * + * Call gettimeofday at the start of the operation to get the starting time, + * and then use minimum_delay to wait until at least the specified number of + * microseconds have elapsed. */ +static void minimum_delay(const struct timeval *tv, unsigned int min_usec) +{ + struct timeval now; + if (gettimeofday(&now, NULL) < 0) + return; + + int64_t t1, t2; /* Times converted to straight microseconds */ + t1 = (int64_t)tv->tv_sec * 1000000 + tv->tv_usec; + t2 = (int64_t)now.tv_sec * 1000000 + now.tv_usec; + + unsigned int elapsed = t2 - t1; + if (elapsed >= min_usec) + return; + + struct timespec delay; + delay.tv_sec = (min_usec - elapsed) / 1000000; + delay.tv_nsec = ((min_usec - elapsed) % 1000000) * 1000; + + while (nanosleep(&delay, &delay) != 0 && errno == EINTR) + ; +} + namespace kvstore { @@ -21,6 +56,9 @@ void KeyValueRpcService::PutValue( ::kvrpc::PutReply* response, ::google::protobuf::Closure* done) { + struct timeval start; + gettimeofday(&start, NULL); + if (_backend->Put(request->key(), request->value())) { response->set_result(kvrpc::SUCCESS); @@ -30,6 +68,7 @@ void KeyValueRpcService::PutValue( response->set_result(kvrpc::FAILURE); } + minimum_delay(&start, 1000000); done->Run(); } @@ -39,6 +78,9 @@ void KeyValueRpcService::GetValue( ::kvrpc::GetReply* response, ::google::protobuf::Closure* done) { + struct timeval start; + gettimeofday(&start, NULL); + string value; if (_backend->Get(request->key(), &value)) { @@ -49,6 +91,8 @@ void KeyValueRpcService::GetValue( { response->set_result(kvrpc::FAILURE); } + + minimum_delay(&start, 1000000); done->Run(); } -- 2.20.1