1 /* Blue Sky: File Systems in the Cloud
3 * Copyright (C) 2009 The Regents of the University of California
4 * Written by Michael Vrable <mvrable@cs.ucsd.edu>
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions
9 * 1. Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * 2. Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * 3. Neither the name of the University nor the names of its contributors
15 * may be used to endorse or promote products derived from this software
16 * without specific prior written permission.
18 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
19 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21 * ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
22 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
24 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
25 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
27 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
31 /* Interface to the simple BlueSky test storage server. */
38 #include <sys/types.h>
39 #include <sys/socket.h>
43 #include "bluesky-private.h"
45 #define MAX_IDLE_CONNECTIONS 8
48 GThreadPool *thread_pool;
49 struct sockaddr_in server_addr;
51 /* A pool of open file connections to the server which are not currently in
57 static int get_connection(SimpleStore *store)
61 g_mutex_lock(store->fd_pool_lock);
62 if (!g_queue_is_empty(store->fd_pool)) {
63 fd = GPOINTER_TO_INT(g_queue_pop_head(store->fd_pool));
65 g_mutex_unlock(store->fd_pool_lock);
69 fd = socket(PF_INET, SOCK_STREAM, 0);
71 g_warning("Error creating simplestore socket: %m");
75 if (connect(fd, (struct sockaddr *)&store->server_addr,
76 sizeof(store->server_addr)) < 0) {
77 g_warning("Error connecting to simplestore server: %m");
84 static void put_connection(SimpleStore *store, int fd)
86 g_mutex_lock(store->fd_pool_lock);
87 g_queue_push_head(store->fd_pool, GINT_TO_POINTER(fd));
88 while (g_queue_get_length(store->fd_pool) > MAX_IDLE_CONNECTIONS) {
89 fd = GPOINTER_TO_INT(g_queue_pop_tail(store->fd_pool));
92 g_mutex_unlock(store->fd_pool_lock);
95 static gboolean write_data(int fd, const char *buf, size_t len)
98 ssize_t bytes = write(fd, buf, len);
110 static ssize_t read_line(int fd, char *buf, size_t maxlen)
114 /* Leave room for a null byte at the end */
117 while (buflen == 0 || memchr(buf, '\n', buflen) == NULL) {
120 if (buflen == maxlen) {
124 bytes = read(fd, buf + buflen, maxlen - buflen);
134 g_assert(bytes <= maxlen - buflen);
142 static gboolean read_bytes(int fd, char *buf, size_t len)
147 bytes = read(fd, buf, len);
157 g_assert(bytes <= len);
165 /* TODO: Re-use a connection for multiple requests, and support pipelining. */
166 static void simplestore_task(gpointer a, gpointer b)
168 BlueSkyStoreAsync *async = (BlueSkyStoreAsync *)a;
169 SimpleStore *server = (SimpleStore *)bluesky_store_async_get_handle(async);
171 async->status = ASYNC_RUNNING;
173 int fd = get_connection(server);
175 bluesky_store_async_mark_complete(async);
176 bluesky_store_async_unref(async);
184 char *cmd = g_strdup_printf("GET %s %zd %zd\n",
185 async->key, async->start, async->len);
186 char result_buf[256];
188 if (!write_data(fd, cmd, strlen(cmd))) {
195 int bytes = read_line(fd, result_buf, sizeof(result_buf));
198 int result = atoi(result_buf);
202 char *data = g_malloc(result);
203 if (strchr(result_buf, '\n') != NULL) {
204 int header_size = strchr(result_buf, '\n') - result_buf + 1;
205 int data_bytes = bytes - header_size;
206 if (data_bytes > result)
208 memcpy(data, result_buf + header_size, data_bytes);
209 if (!read_bytes(fd, data + data_bytes, result - data_bytes))
212 if (!read_bytes(fd, data, result))
216 async->data = bluesky_string_new(data, result);
218 async->range_done = TRUE;
225 char *cmd = g_strdup_printf("PUT %s %zd\n",
226 async->key, async->data->len);
227 char result_buf[256];
229 if (!write_data(fd, cmd, strlen(cmd))) {
235 if (!write_data(fd, async->data->data, async->data->len)) {
239 if (read_line(fd, result_buf, sizeof(result_buf)) < 0)
241 if (atoi(result_buf) != 0)
252 int success = (async->result == 0);
253 bluesky_store_async_mark_complete(async);
254 bluesky_store_async_unref(async);
257 put_connection(server, fd);
263 static char *simplestore_lookup_last(gpointer s, const char *prefix)
268 static gpointer simplestore_new(const gchar *path)
270 SimpleStore *store = g_new0(SimpleStore, 1);
272 /* TODO: Right now we leak this memory. We should probably clean up in
273 * simplestore_destroy, but it's not a big deal. */
274 const gchar *host = "127.0.0.1", *port = "9541";
276 gchar **target = g_strsplit(path, ":", 0);
277 if (target[0] != NULL) {
279 if (target[1] != NULL) {
285 g_print("simplestore: %s port %s\n", host, port);
287 struct addrinfo hints;
288 struct addrinfo *lookup_result = NULL;
289 memset(&hints, 0, sizeof(hints));
290 hints.ai_family = AF_INET;
291 hints.ai_socktype = SOCK_STREAM;
292 int res = getaddrinfo(host, port, &hints, &lookup_result);
294 fprintf(stderr, "simplestore: cannot resolve target name: %s\n",
298 for (struct addrinfo *ai = lookup_result; ai != NULL; ai = ai->ai_next) {
299 printf("flags=%d family=%d socktype=%d proto=%d\n",
304 if (ai->ai_addrlen == sizeof(struct sockaddr_in)) {
305 memcpy(&store->server_addr, ai->ai_addr,
306 sizeof(struct sockaddr_in));
308 fprintf(stderr, "Warning: Bad address record size!\n");
311 freeaddrinfo(lookup_result);
313 store->fd_pool = g_queue_new();
314 store->fd_pool_lock = g_mutex_new();
316 store->thread_pool = g_thread_pool_new(simplestore_task, NULL,
317 bluesky_max_threads, FALSE, NULL);
322 static void simplestore_destroy(gpointer store)
326 static void simplestore_submit(gpointer store, BlueSkyStoreAsync *async)
328 SimpleStore *server = (SimpleStore *)store;
330 g_return_if_fail(async->status == ASYNC_NEW);
331 g_return_if_fail(async->op != STORE_OP_NONE);
336 async->status = ASYNC_PENDING;
337 bluesky_store_async_ref(async);
338 g_thread_pool_push(server->thread_pool, async, NULL);
342 g_warning("Uknown operation type for simplestore: %d\n", async->op);
343 bluesky_store_async_mark_complete(async);
348 static void simplestore_cleanup(gpointer store, BlueSkyStoreAsync *async)
352 static BlueSkyStoreImplementation store_impl = {
353 .create = simplestore_new,
354 .destroy = simplestore_destroy,
355 .submit = simplestore_submit,
356 .cleanup = simplestore_cleanup,
357 .lookup_last = simplestore_lookup_last,
360 void bluesky_store_init_simple(void)
362 bluesky_store_register(&store_impl, "simple");