Make kvstore backend in BlueSky asynchronous.
authorMichael Vrable <mvrable@cs.ucsd.edu>
Wed, 17 Feb 2010 22:53:23 +0000 (14:53 -0800)
committerMichael Vrable <mvrable@cs.ucsd.edu>
Wed, 17 Feb 2010 22:53:23 +0000 (14:53 -0800)
Also, add synthetic delays to the kvstore server for latency testing.

bluesky/bluesky-private.h
bluesky/store-kv.cc
bluesky/store.c
kvstore/backend.cc
kvstore/kvservice.cc

index a4c8a5e..320886c 100644 (file)
@@ -100,6 +100,7 @@ void bluesky_store_register(const BlueSkyStoreImplementation *impl,
                             const gchar *name);
 
 BlueSkyStoreAsync *bluesky_store_async_new(BlueSkyStore *store);
+gpointer bluesky_store_async_get_handle(BlueSkyStoreAsync *async);
 void bluesky_store_async_ref(BlueSkyStoreAsync *async);
 void bluesky_store_async_unref(BlueSkyStoreAsync *async);
 void bluesky_store_async_wait(BlueSkyStoreAsync *async);
index 7a6ac71..c1b0524 100644 (file)
@@ -21,24 +21,14 @@ using namespace boost;
 using namespace kvstore;
 using namespace std;
 
-static gpointer kvstore_new()
-{
-    KeyValueClient *client = new KeyValueClient("127.0.0.1", "9090");
-    return client;
-}
-
-static void kvstore_destroy(gpointer store)
-{
-    KeyValueClient *client = (KeyValueClient *)store;
-    delete client;
-}
+static GThreadPool *thread_pool = NULL;
 
-static void kvstore_submit(gpointer store, BlueSkyStoreAsync *async)
+static void kvstore_task(gpointer a, gpointer b)
 {
-    KeyValueClient *client = (KeyValueClient *)store;
+    BlueSkyStoreAsync *async = (BlueSkyStoreAsync *)a;
+    KeyValueClient *client = (KeyValueClient *)bluesky_store_async_get_handle(async);
 
-    g_return_if_fail(async->status == ASYNC_NEW);
-    g_return_if_fail(async->op != STORE_OP_NONE);
+    async->status = ASYNC_RUNNING;
 
     switch (async->op) {
     case STORE_OP_GET:
@@ -61,11 +51,51 @@ static void kvstore_submit(gpointer store, BlueSkyStoreAsync *async)
     }
 
     default:
-        g_warning("Uknown operation type for MemStore: %d\n", async->op);
-        return;
+        break;
     }
 
     bluesky_store_async_mark_complete(async);
+    bluesky_store_async_unref(async);
+}
+
+static gpointer kvstore_new()
+{
+    static volatile gsize once = 0;
+    if (g_once_init_enter(&once)) {
+        thread_pool = g_thread_pool_new(kvstore_task, NULL, -1, FALSE, NULL);
+        g_once_init_leave(&once, 1);
+    }
+
+    KeyValueClient *client = new KeyValueClient("127.0.0.1", "9090");
+    return client;
+}
+
+static void kvstore_destroy(gpointer store)
+{
+    KeyValueClient *client = (KeyValueClient *)store;
+    delete client;
+}
+
+static void kvstore_submit(gpointer store, BlueSkyStoreAsync *async)
+{
+    KeyValueClient *client = (KeyValueClient *)store;
+
+    g_return_if_fail(async->status == ASYNC_NEW);
+    g_return_if_fail(async->op != STORE_OP_NONE);
+
+    switch (async->op) {
+    case STORE_OP_GET:
+    case STORE_OP_PUT:
+        async->status = ASYNC_PENDING;
+        bluesky_store_async_ref(async);
+        g_thread_pool_push(thread_pool, async, NULL);
+        break;
+
+    default:
+        g_warning("Uknown operation type for kvstore: %d\n", async->op);
+        bluesky_store_async_mark_complete(async);
+        break;
+    }
 }
 
 static void kvstore_cleanup(gpointer store, BlueSkyStoreAsync *async)
index 785372b..8cf9f0e 100644 (file)
@@ -88,6 +88,11 @@ BlueSkyStoreAsync *bluesky_store_async_new(BlueSkyStore *store)
     return async;
 }
 
+gpointer bluesky_store_async_get_handle(BlueSkyStoreAsync *async)
+{
+    return async->store->handle;
+}
+
 void bluesky_store_async_ref(BlueSkyStoreAsync *async)
 {
     if (async == NULL)
index 04a6764..c757560 100644 (file)
@@ -15,7 +15,7 @@ extern "C" {
 namespace kvstore
 {
 
-MemoryBackend::~MemoryBackend() 
+MemoryBackend::~MemoryBackend()
 {
 }
 
@@ -118,7 +118,7 @@ public:
                            0644);
 
 
-        if (res != 0) 
+        if (res != 0)
         {
             cerr << db_strerror(res) << endl;
             throw std::runtime_error("BDB ENV Open Fail");
index e241bc9..59e194a 100644 (file)
@@ -1,8 +1,43 @@
 #include "kvservice.h"
 #include <iostream>
 
+extern "C" {
+#include <time.h>
+#include <sys/time.h>
+}
+
 using namespace std;
 
+/* Timing and delay functionality.  We allow the get and put operations to have
+ * a specified minimum latency, and will sleep if the operation would have
+ * completed before that time.  This can be used in benchmarking to see the
+ * effect of increasing latency on an application.
+ *
+ * Call gettimeofday at the start of the operation to get the starting time,
+ * and then use minimum_delay to wait until at least the specified number of
+ * microseconds have elapsed. */
+static void minimum_delay(const struct timeval *tv, unsigned int min_usec)
+{
+    struct timeval now;
+    if (gettimeofday(&now, NULL) < 0)
+        return;
+
+    int64_t t1, t2;             /* Times converted to straight microseconds */
+    t1 = (int64_t)tv->tv_sec * 1000000 + tv->tv_usec;
+    t2 = (int64_t)now.tv_sec * 1000000 + now.tv_usec;
+
+    unsigned int elapsed = t2 - t1;
+    if (elapsed >= min_usec)
+        return;
+
+    struct timespec delay;
+    delay.tv_sec = (min_usec - elapsed) / 1000000;
+    delay.tv_nsec = ((min_usec - elapsed) % 1000000) * 1000;
+
+    while (nanosleep(&delay, &delay) != 0 && errno == EINTR)
+        ;
+}
+
 namespace kvstore
 {
 
@@ -21,6 +56,9 @@ void KeyValueRpcService::PutValue(
                       ::kvrpc::PutReply* response,
                       ::google::protobuf::Closure* done)
 {
+    struct timeval start;
+    gettimeofday(&start, NULL);
+
     if (_backend->Put(request->key(), request->value()))
     {
         response->set_result(kvrpc::SUCCESS);
@@ -30,6 +68,7 @@ void KeyValueRpcService::PutValue(
         response->set_result(kvrpc::FAILURE);
     }
 
+    minimum_delay(&start, 1000000);
     done->Run();
 }
 
@@ -39,6 +78,9 @@ void KeyValueRpcService::GetValue(
                       ::kvrpc::GetReply* response,
                       ::google::protobuf::Closure* done)
 {
+    struct timeval start;
+    gettimeofday(&start, NULL);
+
     string value;
     if (_backend->Get(request->key(), &value))
     {
@@ -49,6 +91,8 @@ void KeyValueRpcService::GetValue(
     {
         response->set_result(kvrpc::FAILURE);
     }
+
+    minimum_delay(&start, 1000000);
     done->Run();
 }