Also, add synthetic delays to the kvstore server for latency testing.
const gchar *name);
BlueSkyStoreAsync *bluesky_store_async_new(BlueSkyStore *store);
+gpointer bluesky_store_async_get_handle(BlueSkyStoreAsync *async);
void bluesky_store_async_ref(BlueSkyStoreAsync *async);
void bluesky_store_async_unref(BlueSkyStoreAsync *async);
void bluesky_store_async_wait(BlueSkyStoreAsync *async);
using namespace kvstore;
using namespace std;
-static gpointer kvstore_new()
-{
- KeyValueClient *client = new KeyValueClient("127.0.0.1", "9090");
- return client;
-}
-
-static void kvstore_destroy(gpointer store)
-{
- KeyValueClient *client = (KeyValueClient *)store;
- delete client;
-}
+static GThreadPool *thread_pool = NULL;
-static void kvstore_submit(gpointer store, BlueSkyStoreAsync *async)
+static void kvstore_task(gpointer a, gpointer b)
{
- KeyValueClient *client = (KeyValueClient *)store;
+ BlueSkyStoreAsync *async = (BlueSkyStoreAsync *)a;
+ KeyValueClient *client = (KeyValueClient *)bluesky_store_async_get_handle(async);
- g_return_if_fail(async->status == ASYNC_NEW);
- g_return_if_fail(async->op != STORE_OP_NONE);
+ async->status = ASYNC_RUNNING;
switch (async->op) {
case STORE_OP_GET:
}
default:
- g_warning("Uknown operation type for MemStore: %d\n", async->op);
- return;
+ break;
}
bluesky_store_async_mark_complete(async);
+ bluesky_store_async_unref(async);
+}
+
+static gpointer kvstore_new()
+{
+ static volatile gsize once = 0;
+ if (g_once_init_enter(&once)) {
+ thread_pool = g_thread_pool_new(kvstore_task, NULL, -1, FALSE, NULL);
+ g_once_init_leave(&once, 1);
+ }
+
+ KeyValueClient *client = new KeyValueClient("127.0.0.1", "9090");
+ return client;
+}
+
+static void kvstore_destroy(gpointer store)
+{
+ KeyValueClient *client = (KeyValueClient *)store;
+ delete client;
+}
+
+static void kvstore_submit(gpointer store, BlueSkyStoreAsync *async)
+{
+ KeyValueClient *client = (KeyValueClient *)store;
+
+ g_return_if_fail(async->status == ASYNC_NEW);
+ g_return_if_fail(async->op != STORE_OP_NONE);
+
+ switch (async->op) {
+ case STORE_OP_GET:
+ case STORE_OP_PUT:
+ async->status = ASYNC_PENDING;
+ bluesky_store_async_ref(async);
+ g_thread_pool_push(thread_pool, async, NULL);
+ break;
+
+ default:
+ g_warning("Uknown operation type for kvstore: %d\n", async->op);
+ bluesky_store_async_mark_complete(async);
+ break;
+ }
}
static void kvstore_cleanup(gpointer store, BlueSkyStoreAsync *async)
return async;
}
+gpointer bluesky_store_async_get_handle(BlueSkyStoreAsync *async)
+{
+ return async->store->handle;
+}
+
void bluesky_store_async_ref(BlueSkyStoreAsync *async)
{
if (async == NULL)
namespace kvstore
{
-MemoryBackend::~MemoryBackend()
+MemoryBackend::~MemoryBackend()
{
}
0644);
- if (res != 0)
+ if (res != 0)
{
cerr << db_strerror(res) << endl;
throw std::runtime_error("BDB ENV Open Fail");
#include "kvservice.h"
#include <iostream>
+extern "C" {
+#include <time.h>
+#include <sys/time.h>
+}
+
using namespace std;
+/* Timing and delay functionality. We allow the get and put operations to have
+ * a specified minimum latency, and will sleep if the operation would have
+ * completed before that time. This can be used in benchmarking to see the
+ * effect of increasing latency on an application.
+ *
+ * Call gettimeofday at the start of the operation to get the starting time,
+ * and then use minimum_delay to wait until at least the specified number of
+ * microseconds have elapsed. */
+static void minimum_delay(const struct timeval *tv, unsigned int min_usec)
+{
+ struct timeval now;
+ if (gettimeofday(&now, NULL) < 0)
+ return;
+
+ int64_t t1, t2; /* Times converted to straight microseconds */
+ t1 = (int64_t)tv->tv_sec * 1000000 + tv->tv_usec;
+ t2 = (int64_t)now.tv_sec * 1000000 + now.tv_usec;
+
+ unsigned int elapsed = t2 - t1;
+ if (elapsed >= min_usec)
+ return;
+
+ struct timespec delay;
+ delay.tv_sec = (min_usec - elapsed) / 1000000;
+ delay.tv_nsec = ((min_usec - elapsed) % 1000000) * 1000;
+
+ while (nanosleep(&delay, &delay) != 0 && errno == EINTR)
+ ;
+}
+
namespace kvstore
{
::kvrpc::PutReply* response,
::google::protobuf::Closure* done)
{
+ struct timeval start;
+ gettimeofday(&start, NULL);
+
if (_backend->Put(request->key(), request->value()))
{
response->set_result(kvrpc::SUCCESS);
response->set_result(kvrpc::FAILURE);
}
+ minimum_delay(&start, 1000000);
done->Run();
}
::kvrpc::GetReply* response,
::google::protobuf::Closure* done)
{
+ struct timeval start;
+ gettimeofday(&start, NULL);
+
string value;
if (_backend->Get(request->key(), &value))
{
{
response->set_result(kvrpc::FAILURE);
}
+
+ minimum_delay(&start, 1000000);
done->Run();
}