Add proper per-file copyright notices/licenses and top-level license.
[bluesky.git] / bluesky / store-simple.c
1 /* Blue Sky: File Systems in the Cloud
2  *
3  * Copyright (C) 2009  The Regents of the University of California
4  * Written by Michael Vrable <mvrable@cs.ucsd.edu>
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions
8  * are met:
9  * 1. Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  * 2. Redistributions in binary form must reproduce the above copyright
12  *    notice, this list of conditions and the following disclaimer in the
13  *    documentation and/or other materials provided with the distribution.
14  * 3. Neither the name of the University nor the names of its contributors
15  *    may be used to endorse or promote products derived from this software
16  *    without specific prior written permission.
17  *
18  * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
19  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21  * ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
22  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
24  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
25  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
27  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
28  * SUCH DAMAGE.
29  */
30
31 /* Interface to the simple BlueSky test storage server. */
32
33 #include <stdint.h>
34 #include <stdlib.h>
35 #include <errno.h>
36 #include <glib.h>
37 #include <string.h>
38 #include <sys/types.h>
39 #include <sys/socket.h>
40 #include <netdb.h>
41 #include <unistd.h>
42
43 #include "bluesky-private.h"
44
45 #define MAX_IDLE_CONNECTIONS 8
46
47 typedef struct {
48     GThreadPool *thread_pool;
49     struct sockaddr_in server_addr;
50
51     /* A pool of open file connections to the server which are not currently in
52      * use. */
53     GQueue *fd_pool;
54     GMutex *fd_pool_lock;
55 } SimpleStore;
56
57 static int get_connection(SimpleStore *store)
58 {
59     int fd = -1;
60
61     g_mutex_lock(store->fd_pool_lock);
62     if (!g_queue_is_empty(store->fd_pool)) {
63         fd = GPOINTER_TO_INT(g_queue_pop_head(store->fd_pool));
64     }
65     g_mutex_unlock(store->fd_pool_lock);
66     if (fd != -1)
67         return fd;
68
69     fd = socket(PF_INET, SOCK_STREAM, 0);
70     if (fd < 0) {
71         g_warning("Error creating simplestore socket: %m");
72         return -1;
73     }
74
75     if (connect(fd, (struct sockaddr *)&store->server_addr,
76                 sizeof(store->server_addr)) < 0) {
77         g_warning("Error connecting to simplestore server: %m");
78         return -1;
79     }
80
81     return fd;
82 }
83
84 static void put_connection(SimpleStore *store, int fd)
85 {
86     g_mutex_lock(store->fd_pool_lock);
87     g_queue_push_head(store->fd_pool, GINT_TO_POINTER(fd));
88     while (g_queue_get_length(store->fd_pool) > MAX_IDLE_CONNECTIONS) {
89         fd = GPOINTER_TO_INT(g_queue_pop_tail(store->fd_pool));
90         close(fd);
91     }
92     g_mutex_unlock(store->fd_pool_lock);
93 }
94
95 static gboolean write_data(int fd, const char *buf, size_t len)
96 {
97     while (len > 0) {
98         ssize_t bytes = write(fd, buf, len);
99         if (bytes < 0) {
100             if (errno == EINTR)
101                 continue;
102             return FALSE;
103         }
104         buf += bytes;
105         len -= bytes;
106     }
107     return TRUE;
108 }
109
110 static ssize_t read_line(int fd, char *buf, size_t maxlen)
111 {
112     size_t buflen = 0;
113
114     /* Leave room for a null byte at the end */
115     maxlen--;
116
117     while (buflen == 0 || memchr(buf, '\n', buflen) == NULL) {
118         ssize_t bytes;
119
120         if (buflen == maxlen) {
121             return -1;
122         }
123
124         bytes = read(fd, buf + buflen, maxlen - buflen);
125         if (bytes < 0) {
126             if (errno == EINTR)
127                 continue;
128             else
129                 return -1;
130         }
131         if (bytes == 0)
132             break;
133
134         g_assert(bytes <= maxlen - buflen);
135         buflen += bytes;
136     }
137
138     buf[buflen] = '\0';
139     return buflen;
140 }
141
142 static gboolean read_bytes(int fd, char *buf, size_t len)
143 {
144     while (len > 0) {
145         ssize_t bytes;
146
147         bytes = read(fd, buf, len);
148         if (bytes < 0) {
149             if (errno == EINTR)
150                 continue;
151             else
152                 return FALSE;
153         }
154         if (bytes == 0)
155             return FALSE;
156
157         g_assert(bytes <= len);
158         len -= bytes;
159         buf += bytes;
160     }
161
162     return TRUE;
163 }
164
165 /* TODO: Re-use a connection for multiple requests, and support pipelining. */
166 static void simplestore_task(gpointer a, gpointer b)
167 {
168     BlueSkyStoreAsync *async = (BlueSkyStoreAsync *)a;
169     SimpleStore *server = (SimpleStore *)bluesky_store_async_get_handle(async);
170
171     async->status = ASYNC_RUNNING;
172
173     int fd = get_connection(server);
174     if (fd < 0) {
175         bluesky_store_async_mark_complete(async);
176         bluesky_store_async_unref(async);
177         return;
178     }
179
180     switch (async->op) {
181     case STORE_OP_GET:
182     {
183         async->result = -1;
184         char *cmd = g_strdup_printf("GET %s %zd %zd\n",
185                                     async->key, async->start, async->len);
186         char result_buf[256];
187
188         if (!write_data(fd, cmd, strlen(cmd))) {
189             g_free(cmd);
190             break;
191         }
192
193         g_free(cmd);
194
195         int bytes = read_line(fd, result_buf, sizeof(result_buf));
196         if (bytes < 0)
197             break;
198         int result = atoi(result_buf);
199         if (result < 0)
200             break;
201
202         char *data = g_malloc(result);
203         if (strchr(result_buf, '\n') != NULL) {
204             int header_size = strchr(result_buf, '\n') - result_buf + 1;
205             int data_bytes = bytes - header_size;
206             if (data_bytes > result)
207                 data_bytes = result;
208             memcpy(data, result_buf + header_size, data_bytes);
209             if (!read_bytes(fd, data + data_bytes, result - data_bytes))
210                 break;
211         } else {
212             if (!read_bytes(fd, data, result))
213                 break;
214         }
215
216         async->data = bluesky_string_new(data, result);
217         async->result = 0;
218         async->range_done = TRUE;
219         break;
220     }
221
222     case STORE_OP_PUT:
223     {
224         async->result = -1;
225         char *cmd = g_strdup_printf("PUT %s %zd\n",
226                                     async->key, async->data->len);
227         char result_buf[256];
228
229         if (!write_data(fd, cmd, strlen(cmd))) {
230             g_free(cmd);
231             break;
232         }
233
234         g_free(cmd);
235         if (!write_data(fd, async->data->data, async->data->len)) {
236             break;
237         }
238
239         if (read_line(fd, result_buf, sizeof(result_buf)) < 0)
240             break;
241         if (atoi(result_buf) != 0)
242             break;
243
244         async->result = 0;
245         break;
246     }
247
248     default:
249         break;
250     }
251
252     int success = (async->result == 0);
253     bluesky_store_async_mark_complete(async);
254     bluesky_store_async_unref(async);
255
256     if (success) {
257         put_connection(server, fd);
258     } else {
259         close(fd);
260     }
261 }
262
263 static char *simplestore_lookup_last(gpointer s, const char *prefix)
264 {
265     return NULL;
266 }
267
268 static gpointer simplestore_new(const gchar *path)
269 {
270     SimpleStore *store = g_new0(SimpleStore, 1);
271
272     /* TODO: Right now we leak this memory.  We should probably clean up in
273      * simplestore_destroy, but it's not a big deal. */
274     const gchar *host = "127.0.0.1", *port = "9541";
275     if (path != NULL) {
276         gchar **target = g_strsplit(path, ":", 0);
277         if (target[0] != NULL) {
278             host = target[0];
279             if (target[1] != NULL) {
280                 port = target[1];
281             }
282         }
283     }
284
285     g_print("simplestore: %s port %s\n", host, port);
286
287     struct addrinfo hints;
288     struct addrinfo *lookup_result = NULL;
289     memset(&hints, 0, sizeof(hints));
290     hints.ai_family = AF_INET;
291     hints.ai_socktype = SOCK_STREAM;
292     int res = getaddrinfo(host, port, &hints, &lookup_result);
293     if (res != 0) {
294         fprintf(stderr, "simplestore: cannot resolve target name: %s\n",
295                 gai_strerror(res));
296         return NULL;
297     }
298     for (struct addrinfo *ai = lookup_result; ai != NULL; ai = ai->ai_next) {
299         printf("flags=%d family=%d socktype=%d proto=%d\n",
300                ai->ai_flags,
301                ai->ai_family,
302                ai->ai_socktype,
303                ai->ai_protocol);
304         if (ai->ai_addrlen == sizeof(struct sockaddr_in)) {
305             memcpy(&store->server_addr, ai->ai_addr,
306                    sizeof(struct sockaddr_in));
307         } else {
308             fprintf(stderr, "Warning: Bad address record size!\n");
309         }
310     }
311     freeaddrinfo(lookup_result);
312
313     store->fd_pool = g_queue_new();
314     store->fd_pool_lock = g_mutex_new();
315
316     store->thread_pool = g_thread_pool_new(simplestore_task, NULL,
317                                            bluesky_max_threads, FALSE, NULL);
318
319     return store;
320 }
321
322 static void simplestore_destroy(gpointer store)
323 {
324 }
325
326 static void simplestore_submit(gpointer store, BlueSkyStoreAsync *async)
327 {
328     SimpleStore *server = (SimpleStore *)store;
329
330     g_return_if_fail(async->status == ASYNC_NEW);
331     g_return_if_fail(async->op != STORE_OP_NONE);
332
333     switch (async->op) {
334     case STORE_OP_GET:
335     case STORE_OP_PUT:
336         async->status = ASYNC_PENDING;
337         bluesky_store_async_ref(async);
338         g_thread_pool_push(server->thread_pool, async, NULL);
339         break;
340
341     default:
342         g_warning("Uknown operation type for simplestore: %d\n", async->op);
343         bluesky_store_async_mark_complete(async);
344         break;
345     }
346 }
347
348 static void simplestore_cleanup(gpointer store, BlueSkyStoreAsync *async)
349 {
350 }
351
352 static BlueSkyStoreImplementation store_impl = {
353     .create = simplestore_new,
354     .destroy = simplestore_destroy,
355     .submit = simplestore_submit,
356     .cleanup = simplestore_cleanup,
357     .lookup_last = simplestore_lookup_last,
358 };
359
360 void bluesky_store_init_simple(void)
361 {
362     bluesky_store_register(&store_impl, "simple");
363 }