ad93a05ec11b67a8634d9782143824173da9ccca
[bluesky.git] / bluesky / store-simple.c
1 /* Blue Sky: File Systems in the Cloud
2  *
3  * Copyright (C) 2009  The Regents of the University of California
4  * Written by Michael Vrable <mvrable@cs.ucsd.edu>
5  *
6  * TODO: Licensing
7  */
8
9 /* Interface to the simple BlueSky test storage server. */
10
11 #include <stdint.h>
12 #include <stdlib.h>
13 #include <errno.h>
14 #include <glib.h>
15 #include <string.h>
16 #include <sys/types.h>
17 #include <sys/socket.h>
18 #include <netdb.h>
19 #include <unistd.h>
20
21 #include "bluesky-private.h"
22
23 #define MAX_IDLE_CONNECTIONS 8
24
25 typedef struct {
26     GThreadPool *thread_pool;
27     struct sockaddr_in server_addr;
28
29     /* A pool of open file connections to the server which are not currently in
30      * use. */
31     GQueue *fd_pool;
32     GMutex *fd_pool_lock;
33 } SimpleStore;
34
35 static int get_connection(SimpleStore *store)
36 {
37     int fd = -1;
38
39     g_mutex_lock(store->fd_pool_lock);
40     if (!g_queue_is_empty(store->fd_pool)) {
41         fd = GPOINTER_TO_INT(g_queue_pop_head(store->fd_pool));
42     }
43     g_mutex_unlock(store->fd_pool_lock);
44     if (fd != -1)
45         return fd;
46
47     fd = socket(PF_INET, SOCK_STREAM, 0);
48     if (fd < 0) {
49         g_warning("Error creating simplestore socket: %m");
50         return -1;
51     }
52
53     if (connect(fd, (struct sockaddr *)&store->server_addr,
54                 sizeof(store->server_addr)) < 0) {
55         g_warning("Error connecting to simplestore server: %m");
56         return -1;
57     }
58
59     return fd;
60 }
61
62 static void put_connection(SimpleStore *store, int fd)
63 {
64     g_mutex_lock(store->fd_pool_lock);
65     g_queue_push_head(store->fd_pool, GINT_TO_POINTER(fd));
66     while (g_queue_get_length(store->fd_pool) > MAX_IDLE_CONNECTIONS) {
67         fd = GPOINTER_TO_INT(g_queue_pop_tail(store->fd_pool));
68         close(fd);
69     }
70     g_mutex_unlock(store->fd_pool_lock);
71 }
72
73 static gboolean write_data(int fd, const char *buf, size_t len)
74 {
75     while (len > 0) {
76         ssize_t bytes = write(fd, buf, len);
77         if (bytes < 0) {
78             if (errno == EINTR)
79                 continue;
80             return FALSE;
81         }
82         buf += bytes;
83         len -= bytes;
84     }
85     return TRUE;
86 }
87
88 static ssize_t read_line(int fd, char *buf, size_t maxlen)
89 {
90     size_t buflen = 0;
91
92     /* Leave room for a null byte at the end */
93     maxlen--;
94
95     while (buflen == 0 || memchr(buf, '\n', buflen) == NULL) {
96         ssize_t bytes;
97
98         if (buflen == maxlen) {
99             return -1;
100         }
101
102         bytes = read(fd, buf + buflen, maxlen - buflen);
103         if (bytes < 0) {
104             if (errno == EINTR)
105                 continue;
106             else
107                 return -1;
108         }
109         if (bytes == 0)
110             break;
111
112         g_assert(bytes <= maxlen - buflen);
113         buflen += bytes;
114     }
115
116     buf[buflen] = '\0';
117     return buflen;
118 }
119
120 static gboolean read_bytes(int fd, char *buf, size_t len)
121 {
122     while (len > 0) {
123         ssize_t bytes;
124
125         bytes = read(fd, buf, len);
126         if (bytes < 0) {
127             if (errno == EINTR)
128                 continue;
129             else
130                 return FALSE;
131         }
132         if (bytes == 0)
133             return FALSE;
134
135         g_assert(bytes <= len);
136         len -= bytes;
137         buf += bytes;
138     }
139
140     return TRUE;
141 }
142
143 /* TODO: Re-use a connection for multiple requests, and support pipelining. */
144 static void simplestore_task(gpointer a, gpointer b)
145 {
146     BlueSkyStoreAsync *async = (BlueSkyStoreAsync *)a;
147     SimpleStore *server = (SimpleStore *)bluesky_store_async_get_handle(async);
148
149     async->status = ASYNC_RUNNING;
150
151     int fd = get_connection(server);
152     if (fd < 0) {
153         bluesky_store_async_mark_complete(async);
154         bluesky_store_async_unref(async);
155         return;
156     }
157
158     switch (async->op) {
159     case STORE_OP_GET:
160     {
161         async->result = -1;
162         char *cmd = g_strdup_printf("GET %s %zd %zd\n",
163                                     async->key, async->start, async->len);
164         char result_buf[256];
165
166         if (!write_data(fd, cmd, strlen(cmd))) {
167             g_free(cmd);
168             break;
169         }
170
171         g_free(cmd);
172
173         int bytes = read_line(fd, result_buf, sizeof(result_buf));
174         if (bytes < 0)
175             break;
176         int result = atoi(result_buf);
177         if (result < 0)
178             break;
179
180         char *data = g_malloc(result);
181         if (strchr(result_buf, '\n') != NULL) {
182             int header_size = strchr(result_buf, '\n') - result_buf + 1;
183             int data_bytes = bytes - header_size;
184             if (data_bytes > result)
185                 data_bytes = result;
186             memcpy(data, result_buf + header_size, data_bytes);
187             if (!read_bytes(fd, data + data_bytes, result - data_bytes))
188                 break;
189         } else {
190             if (!read_bytes(fd, data, result))
191                 break;
192         }
193
194         async->data = bluesky_string_new(data, result);
195         async->result = 0;
196         async->range_done = TRUE;
197         break;
198     }
199
200     case STORE_OP_PUT:
201     {
202         async->result = -1;
203         char *cmd = g_strdup_printf("PUT %s %zd\n",
204                                     async->key, async->data->len);
205         char result_buf[256];
206
207         if (!write_data(fd, cmd, strlen(cmd))) {
208             g_free(cmd);
209             break;
210         }
211
212         g_free(cmd);
213         if (!write_data(fd, async->data->data, async->data->len)) {
214             break;
215         }
216
217         if (read_line(fd, result_buf, sizeof(result_buf)) < 0)
218             break;
219         if (atoi(result_buf) != 0)
220             break;
221
222         async->result = 0;
223         break;
224     }
225
226     default:
227         break;
228     }
229
230     int success = (async->result == 0);
231     bluesky_store_async_mark_complete(async);
232     bluesky_store_async_unref(async);
233
234     if (success) {
235         put_connection(server, fd);
236     } else {
237         close(fd);
238     }
239 }
240
241 static char *simplestore_lookup_last(gpointer s, const char *prefix)
242 {
243     return NULL;
244 }
245
246 static gpointer simplestore_new(const gchar *path)
247 {
248     SimpleStore *store = g_new0(SimpleStore, 1);
249
250     /* TODO: Right now we leak this memory.  We should probably clean up in
251      * simplestore_destroy, but it's not a big deal. */
252     const gchar *host = "127.0.0.1", *port = "9541";
253     if (path != NULL) {
254         gchar **target = g_strsplit(path, ":", 0);
255         if (target[0] != NULL) {
256             host = target[0];
257             if (target[1] != NULL) {
258                 port = target[1];
259             }
260         }
261     }
262
263     g_print("simplestore: %s port %s\n", host, port);
264
265     struct addrinfo hints;
266     struct addrinfo *lookup_result = NULL;
267     memset(&hints, 0, sizeof(hints));
268     hints.ai_family = AF_INET;
269     hints.ai_socktype = SOCK_STREAM;
270     int res = getaddrinfo(host, port, &hints, &lookup_result);
271     if (res != 0) {
272         fprintf(stderr, "simplestore: cannot resolve target name: %s\n",
273                 gai_strerror(res));
274         return NULL;
275     }
276     for (struct addrinfo *ai = lookup_result; ai != NULL; ai = ai->ai_next) {
277         printf("flags=%d family=%d socktype=%d proto=%d\n",
278                ai->ai_flags,
279                ai->ai_family,
280                ai->ai_socktype,
281                ai->ai_protocol);
282         if (ai->ai_addrlen == sizeof(struct sockaddr_in)) {
283             memcpy(&store->server_addr, ai->ai_addr,
284                    sizeof(struct sockaddr_in));
285         } else {
286             fprintf(stderr, "Warning: Bad address record size!\n");
287         }
288     }
289     freeaddrinfo(lookup_result);
290
291     store->fd_pool = g_queue_new();
292     store->fd_pool_lock = g_mutex_new();
293
294     store->thread_pool = g_thread_pool_new(simplestore_task, NULL,
295                                            bluesky_max_threads, FALSE, NULL);
296
297     return store;
298 }
299
300 static void simplestore_destroy(gpointer store)
301 {
302 }
303
304 static void simplestore_submit(gpointer store, BlueSkyStoreAsync *async)
305 {
306     SimpleStore *server = (SimpleStore *)store;
307
308     g_return_if_fail(async->status == ASYNC_NEW);
309     g_return_if_fail(async->op != STORE_OP_NONE);
310
311     switch (async->op) {
312     case STORE_OP_GET:
313     case STORE_OP_PUT:
314         async->status = ASYNC_PENDING;
315         bluesky_store_async_ref(async);
316         g_thread_pool_push(server->thread_pool, async, NULL);
317         break;
318
319     default:
320         g_warning("Uknown operation type for simplestore: %d\n", async->op);
321         bluesky_store_async_mark_complete(async);
322         break;
323     }
324 }
325
326 static void simplestore_cleanup(gpointer store, BlueSkyStoreAsync *async)
327 {
328 }
329
330 static BlueSkyStoreImplementation store_impl = {
331     .create = simplestore_new,
332     .destroy = simplestore_destroy,
333     .submit = simplestore_submit,
334     .cleanup = simplestore_cleanup,
335     .lookup_last = simplestore_lookup_last,
336 };
337
338 void bluesky_store_init_simple(void)
339 {
340     bluesky_store_register(&store_impl, "simple");
341 }