using namespace kvstore;
using namespace std;
-static gpointer kvstore_new()
-{
- KeyValueClient *client = new KeyValueClient("127.0.0.1", "9090");
- return client;
-}
-
-static void kvstore_destroy(gpointer store)
-{
- KeyValueClient *client = (KeyValueClient *)store;
- delete client;
-}
+static GThreadPool *thread_pool = NULL;
-static void kvstore_submit(gpointer store, BlueSkyStoreAsync *async)
+static void kvstore_task(gpointer a, gpointer b)
{
- KeyValueClient *client = (KeyValueClient *)store;
+ BlueSkyStoreAsync *async = (BlueSkyStoreAsync *)a;
+ KeyValueClient *client = (KeyValueClient *)bluesky_store_async_get_handle(async);
- g_return_if_fail(async->status == ASYNC_NEW);
- g_return_if_fail(async->op != STORE_OP_NONE);
+ async->status = ASYNC_RUNNING;
switch (async->op) {
case STORE_OP_GET:
value.length()),
value.length());
async->result = 0;
+ } else {
+ g_warning("Failed to fetch key %s from kvstore", async->key);
}
break;
}
case STORE_OP_PUT:
{
string value(async->data->data, async->data->len);
- client->Put(async->key, value);
+ if (!client->Put(async->key, value)) {
+ g_warning("Failed to store key %s to kvstore", async->key);
+ }
break;
}
default:
- g_warning("Uknown operation type for MemStore: %d\n", async->op);
- return;
+ break;
}
bluesky_store_async_mark_complete(async);
+ bluesky_store_async_unref(async);
+}
+
+static gpointer kvstore_new(const gchar *path)
+{
+ /* TODO: Right now we leak this memory. We should probably clean up in
+ * kvstore_destroy, but it's not a big deal. */
+ const gchar *host = "127.0.0.1", *port = "9090";
+ if (path != NULL) {
+ gchar **target = g_strsplit(path, ":", 0);
+ if (target[0] != NULL) {
+ host = target[0];
+ if (target[1] != NULL) {
+ port = target[1];
+ }
+ }
+ }
+
+ static volatile gsize once = 0;
+ if (g_once_init_enter(&once)) {
+ thread_pool = g_thread_pool_new(kvstore_task, NULL,
+ bluesky_max_threads, FALSE, NULL);
+ g_once_init_leave(&once, 1);
+ }
+
+ g_print("kvstore: %s port %s\n", host, port);
+ KeyValueClient *client = new KeyValueClient(host, port);
+ return client;
+}
+
+static void kvstore_destroy(gpointer store)
+{
+ KeyValueClient *client = (KeyValueClient *)store;
+ delete client;
+}
+
+static void kvstore_submit(gpointer store, BlueSkyStoreAsync *async)
+{
+ KeyValueClient *client = (KeyValueClient *)store;
+
+ g_return_if_fail(async->status == ASYNC_NEW);
+ g_return_if_fail(async->op != STORE_OP_NONE);
+
+ switch (async->op) {
+ case STORE_OP_GET:
+ case STORE_OP_PUT:
+ async->status = ASYNC_PENDING;
+ bluesky_store_async_ref(async);
+ g_thread_pool_push(thread_pool, async, NULL);
+ break;
+
+ default:
+ g_warning("Uknown operation type for kvstore: %d\n", async->op);
+ bluesky_store_async_mark_complete(async);
+ break;
+ }
}
static void kvstore_cleanup(gpointer store, BlueSkyStoreAsync *async)