X-Git-Url: http://git.vrable.net/?a=blobdiff_plain;f=bluesky%2Fstore-kv.cc;h=38051430ac32089ad5bb667eb1a78d866b4bd510;hb=8ff0fd08d6e1cc97cdb7e94b7cd97dc28c29e674;hp=7a6ac714ae7ec259b24a258f11419fac8cc41595;hpb=25ddfddd324493ee87cc3770245104420b35fd75;p=bluesky.git diff --git a/bluesky/store-kv.cc b/bluesky/store-kv.cc index 7a6ac71..3805143 100644 --- a/bluesky/store-kv.cc +++ b/bluesky/store-kv.cc @@ -3,7 +3,29 @@ * Copyright (C) 2009 The Regents of the University of California * Written by Michael Vrable * - * TODO: Licensing + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. Neither the name of the University nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS + * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY + * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. */ /* Interface to John McCullough's simple key/value store. */ @@ -21,24 +43,14 @@ using namespace boost; using namespace kvstore; using namespace std; -static gpointer kvstore_new() -{ - KeyValueClient *client = new KeyValueClient("127.0.0.1", "9090"); - return client; -} - -static void kvstore_destroy(gpointer store) -{ - KeyValueClient *client = (KeyValueClient *)store; - delete client; -} +static GThreadPool *thread_pool = NULL; -static void kvstore_submit(gpointer store, BlueSkyStoreAsync *async) +static void kvstore_task(gpointer a, gpointer b) { - KeyValueClient *client = (KeyValueClient *)store; + BlueSkyStoreAsync *async = (BlueSkyStoreAsync *)a; + KeyValueClient *client = (KeyValueClient *)bluesky_store_async_get_handle(async); - g_return_if_fail(async->status == ASYNC_NEW); - g_return_if_fail(async->op != STORE_OP_NONE); + async->status = ASYNC_RUNNING; switch (async->op) { case STORE_OP_GET: @@ -49,6 +61,8 @@ static void kvstore_submit(gpointer store, BlueSkyStoreAsync *async) value.length()), value.length()); async->result = 0; + } else { + g_warning("Failed to fetch key %s from kvstore", async->key); } break; } @@ -56,16 +70,73 @@ static void kvstore_submit(gpointer store, BlueSkyStoreAsync *async) case STORE_OP_PUT: { string value(async->data->data, async->data->len); - client->Put(async->key, value); + if (!client->Put(async->key, value)) { + g_warning("Failed to store key %s to kvstore", async->key); + } break; } default: - g_warning("Uknown operation type for MemStore: %d\n", async->op); - return; + break; } bluesky_store_async_mark_complete(async); + bluesky_store_async_unref(async); +} + +static gpointer kvstore_new(const gchar *path) +{ + /* TODO: Right now we leak this memory. We should probably clean up in + * kvstore_destroy, but it's not a big deal. */ + const gchar *host = "127.0.0.1", *port = "9090"; + if (path != NULL) { + gchar **target = g_strsplit(path, ":", 0); + if (target[0] != NULL) { + host = target[0]; + if (target[1] != NULL) { + port = target[1]; + } + } + } + + static volatile gsize once = 0; + if (g_once_init_enter(&once)) { + thread_pool = g_thread_pool_new(kvstore_task, NULL, + bluesky_max_threads, FALSE, NULL); + g_once_init_leave(&once, 1); + } + + g_print("kvstore: %s port %s\n", host, port); + KeyValueClient *client = new KeyValueClient(host, port); + return client; +} + +static void kvstore_destroy(gpointer store) +{ + KeyValueClient *client = (KeyValueClient *)store; + delete client; +} + +static void kvstore_submit(gpointer store, BlueSkyStoreAsync *async) +{ + KeyValueClient *client = (KeyValueClient *)store; + + g_return_if_fail(async->status == ASYNC_NEW); + g_return_if_fail(async->op != STORE_OP_NONE); + + switch (async->op) { + case STORE_OP_GET: + case STORE_OP_PUT: + async->status = ASYNC_PENDING; + bluesky_store_async_ref(async); + g_thread_pool_push(thread_pool, async, NULL); + break; + + default: + g_warning("Uknown operation type for kvstore: %d\n", async->op); + bluesky_store_async_mark_complete(async); + break; + } } static void kvstore_cleanup(gpointer store, BlueSkyStoreAsync *async)