59ba530ad9ec7ecb88215da3b9f5ada4a3cec912
[bluesky.git] / bluesky / store-kv.cc
1 /* Blue Sky: File Systems in the Cloud
2  *
3  * Copyright (C) 2009  The Regents of the University of California
4  * Written by Michael Vrable <mvrable@cs.ucsd.edu>
5  *
6  * TODO: Licensing
7  */
8
9 /* Interface to John McCullough's simple key/value store. */
10
11 #include <stdint.h>
12 #include <stdlib.h>
13 #include <glib.h>
14 #include <string.h>
15
16 #include "bluesky-private.h"
17 #include "kvservice.h"
18 #include "kvclient.h"
19
20 using namespace boost;
21 using namespace kvstore;
22 using namespace std;
23
24 static GThreadPool *thread_pool = NULL;
25
26 static void kvstore_task(gpointer a, gpointer b)
27 {
28     BlueSkyStoreAsync *async = (BlueSkyStoreAsync *)a;
29     KeyValueClient *client = (KeyValueClient *)bluesky_store_async_get_handle(async);
30
31     async->status = ASYNC_RUNNING;
32
33     switch (async->op) {
34     case STORE_OP_GET:
35     {
36         string value;
37         if (client->Get(async->key, &value)) {
38             async->data = bluesky_string_new(g_memdup(value.c_str(),
39                                                     value.length()),
40                                             value.length());
41             async->result = 0;
42         } else {
43             g_warning("Failed to fetch key %s from kvstore", async->key);
44         }
45         break;
46     }
47
48     case STORE_OP_PUT:
49     {
50         string value(async->data->data, async->data->len);
51         if (!client->Put(async->key, value)) {
52             g_warning("Failed to store key %s to kvstore", async->key);
53         }
54         break;
55     }
56
57     default:
58         break;
59     }
60
61     bluesky_store_async_mark_complete(async);
62     bluesky_store_async_unref(async);
63 }
64
65 static gpointer kvstore_new(const gchar *path)
66 {
67     /* TODO: Right now we leak this memory.  We should probably clean up in
68      * kvstore_destroy, but it's not a big deal. */
69     const gchar *host = "127.0.0.1", *port = "9090";
70     if (path != NULL) {
71         gchar **target = g_strsplit(path, ":", 0);
72         if (target[0] != NULL) {
73             host = target[0];
74             if (target[1] != NULL) {
75                 port = target[1];
76             }
77         }
78     }
79
80     static volatile gsize once = 0;
81     if (g_once_init_enter(&once)) {
82         thread_pool = g_thread_pool_new(kvstore_task, NULL,
83                                         bluesky_max_threads, FALSE, NULL);
84         g_once_init_leave(&once, 1);
85     }
86
87     g_print("kvstore: %s port %s\n", host, port);
88     KeyValueClient *client = new KeyValueClient(host, port);
89     return client;
90 }
91
92 static void kvstore_destroy(gpointer store)
93 {
94     KeyValueClient *client = (KeyValueClient *)store;
95     delete client;
96 }
97
98 static void kvstore_submit(gpointer store, BlueSkyStoreAsync *async)
99 {
100     KeyValueClient *client = (KeyValueClient *)store;
101
102     g_return_if_fail(async->status == ASYNC_NEW);
103     g_return_if_fail(async->op != STORE_OP_NONE);
104
105     switch (async->op) {
106     case STORE_OP_GET:
107     case STORE_OP_PUT:
108         async->status = ASYNC_PENDING;
109         bluesky_store_async_ref(async);
110         g_thread_pool_push(thread_pool, async, NULL);
111         break;
112
113     default:
114         g_warning("Uknown operation type for kvstore: %d\n", async->op);
115         bluesky_store_async_mark_complete(async);
116         break;
117     }
118 }
119
120 static void kvstore_cleanup(gpointer store, BlueSkyStoreAsync *async)
121 {
122     KeyValueClient *client = (KeyValueClient *)store;
123 }
124
125 static BlueSkyStoreImplementation store_impl = {
126     kvstore_new,
127     kvstore_destroy,
128     kvstore_submit,
129     kvstore_cleanup,
130 };
131
132 extern "C" void bluesky_store_init_kv(void)
133 {
134     bluesky_store_register(&store_impl, "kv");
135 }