Enable range requests in the simple storage backend
[bluesky.git] / bluesky / store-simple.c
1 /* Blue Sky: File Systems in the Cloud
2  *
3  * Copyright (C) 2009  The Regents of the University of California
4  * Written by Michael Vrable <mvrable@cs.ucsd.edu>
5  *
6  * TODO: Licensing
7  */
8
9 /* Interface to the simple BlueSky test storage server. */
10
11 #include <stdint.h>
12 #include <stdlib.h>
13 #include <errno.h>
14 #include <glib.h>
15 #include <string.h>
16 #include <sys/types.h>
17 #include <sys/socket.h>
18 #include <netdb.h>
19 #include <unistd.h>
20
21 #include "bluesky-private.h"
22
23 typedef struct {
24     GThreadPool *thread_pool;
25     struct sockaddr_in server_addr;
26 } SimpleStore;
27
28 static gboolean write_data(int fd, const char *buf, size_t len)
29 {
30     while (len > 0) {
31         ssize_t bytes = write(fd, buf, len);
32         if (bytes < 0) {
33             if (errno == EINTR)
34                 continue;
35             return FALSE;
36         }
37         buf += bytes;
38         len -= bytes;
39     }
40     return TRUE;
41 }
42
43 static ssize_t read_line(int fd, char *buf, size_t maxlen)
44 {
45     size_t buflen = 0;
46
47     /* Leave room for a null byte at the end */
48     maxlen--;
49
50     while (buflen == 0 || memchr(buf, '\n', buflen) == NULL) {
51         ssize_t bytes;
52
53         if (buflen == maxlen) {
54             return -1;
55         }
56
57         bytes = read(fd, buf + buflen, maxlen - buflen);
58         if (bytes < 0) {
59             if (errno == EINTR)
60                 continue;
61             else
62                 return -1;
63         }
64         if (bytes == 0)
65             break;
66
67         g_assert(bytes <= maxlen - buflen);
68         buflen += bytes;
69     }
70
71     buf[buflen] = '\0';
72     return buflen;
73 }
74
75 static gboolean read_bytes(int fd, char *buf, size_t len)
76 {
77     while (len > 0) {
78         ssize_t bytes;
79
80         bytes = read(fd, buf, len);
81         if (bytes < 0) {
82             if (errno == EINTR)
83                 continue;
84             else
85                 return FALSE;
86         }
87         if (bytes == 0)
88             return FALSE;
89
90         g_assert(bytes <= len);
91         len -= bytes;
92         buf += bytes;
93     }
94
95     return TRUE;
96 }
97
98 /* TODO: Re-use a connection for multiple requests, and support pipelining. */
99 static void simplestore_task(gpointer a, gpointer b)
100 {
101     BlueSkyStoreAsync *async = (BlueSkyStoreAsync *)a;
102     SimpleStore *server = (SimpleStore *)bluesky_store_async_get_handle(async);
103
104     async->status = ASYNC_RUNNING;
105
106     int fd = socket(PF_INET, SOCK_STREAM, 0);
107     if (fd < 0) {
108         g_warning("Error creating simplestore socket: %m");
109         bluesky_store_async_mark_complete(async);
110         bluesky_store_async_unref(async);
111         return;
112     }
113
114     if (connect(fd, (struct sockaddr *)&server->server_addr,
115                 sizeof(server->server_addr)) < 0) {
116         g_warning("Error connecting to simplestore server: %m");
117         bluesky_store_async_mark_complete(async);
118         bluesky_store_async_unref(async);
119         return;
120     }
121
122     switch (async->op) {
123     case STORE_OP_GET:
124     {
125         async->result = -1;
126         char *cmd = g_strdup_printf("GET %s %zd %zd\n",
127                                     async->key, async->start, async->len);
128         char result_buf[256];
129
130         if (!write_data(fd, cmd, strlen(cmd))) {
131             g_free(cmd);
132             break;
133         }
134
135         g_free(cmd);
136
137         int bytes = read_line(fd, result_buf, sizeof(result_buf));
138         if (bytes < 0)
139             break;
140         int result = atoi(result_buf);
141         if (result < 0)
142             break;
143
144         char *data = g_malloc(result);
145         if (strchr(result_buf, '\n') != NULL) {
146             int header_size = strchr(result_buf, '\n') - result_buf + 1;
147             int data_bytes = bytes - header_size;
148             if (data_bytes > result)
149                 data_bytes = result;
150             memcpy(data, result_buf + header_size, data_bytes);
151             if (!read_bytes(fd, data + data_bytes, result - data_bytes))
152                 break;
153         } else {
154             if (!read_bytes(fd, data, result))
155                 break;
156         }
157
158         async->data = bluesky_string_new(data, result);
159         async->result = 0;
160         async->range_done = TRUE;
161         break;
162     }
163
164     case STORE_OP_PUT:
165     {
166         async->result = -1;
167         char *cmd = g_strdup_printf("PUT %s %zd\n",
168                                     async->key, async->data->len);
169         char result_buf[256];
170
171         if (!write_data(fd, cmd, strlen(cmd))) {
172             g_free(cmd);
173             break;
174         }
175
176         g_free(cmd);
177         if (!write_data(fd, async->data->data, async->data->len)) {
178             break;
179         }
180
181         if (read_line(fd, result_buf, sizeof(result_buf)) < 0)
182             break;
183         if (atoi(result_buf) != 0)
184             break;
185
186         async->result = 0;
187         break;
188     }
189
190     default:
191         break;
192     }
193
194     bluesky_store_async_mark_complete(async);
195     bluesky_store_async_unref(async);
196
197     close(fd);
198 }
199
200 static char *simplestore_lookup_last(gpointer s, const char *prefix)
201 {
202     return NULL;
203 }
204
205 static gpointer simplestore_new(const gchar *path)
206 {
207     SimpleStore *store = g_new0(SimpleStore, 1);
208
209     /* TODO: Right now we leak this memory.  We should probably clean up in
210      * simplestore_destroy, but it's not a big deal. */
211     const gchar *host = "127.0.0.1", *port = "8257";
212     if (path != NULL) {
213         gchar **target = g_strsplit(path, ":", 0);
214         if (target[0] != NULL) {
215             host = target[0];
216             if (target[1] != NULL) {
217                 port = target[1];
218             }
219         }
220     }
221
222     g_print("simplestore: %s port %s\n", host, port);
223
224     struct addrinfo hints;
225     struct addrinfo *lookup_result = NULL;
226     memset(&hints, 0, sizeof(hints));
227     hints.ai_family = AF_INET;
228     hints.ai_socktype = SOCK_STREAM;
229     int res = getaddrinfo(host, port, &hints, &lookup_result);
230     if (res != 0) {
231         fprintf(stderr, "simplestore: cannot resolve target name: %s\n",
232                 gai_strerror(res));
233         return NULL;
234     }
235     freeaddrinfo(lookup_result);
236     for (struct addrinfo *ai = lookup_result; ai != NULL; ai = ai->ai_next) {
237         printf("flags=%d family=%d socktype=%d proto=%d\n",
238                ai->ai_flags,
239                ai->ai_family,
240                ai->ai_socktype,
241                ai->ai_protocol);
242         if (ai->ai_addrlen == sizeof(struct sockaddr_in)) {
243             memcpy(&store->server_addr, ai->ai_addr,
244                    sizeof(struct sockaddr_in));
245         } else {
246             fprintf(stderr, "Warning: Bad address record size!\n");
247         }
248     }
249
250     store->thread_pool = g_thread_pool_new(simplestore_task, NULL,
251                                            bluesky_max_threads, FALSE, NULL);
252
253     return store;
254 }
255
256 static void simplestore_destroy(gpointer store)
257 {
258 }
259
260 static void simplestore_submit(gpointer store, BlueSkyStoreAsync *async)
261 {
262     SimpleStore *server = (SimpleStore *)store;
263
264     g_return_if_fail(async->status == ASYNC_NEW);
265     g_return_if_fail(async->op != STORE_OP_NONE);
266
267     switch (async->op) {
268     case STORE_OP_GET:
269     case STORE_OP_PUT:
270         async->status = ASYNC_PENDING;
271         bluesky_store_async_ref(async);
272         g_thread_pool_push(server->thread_pool, async, NULL);
273         break;
274
275     default:
276         g_warning("Uknown operation type for simplestore: %d\n", async->op);
277         bluesky_store_async_mark_complete(async);
278         break;
279     }
280 }
281
282 static void simplestore_cleanup(gpointer store, BlueSkyStoreAsync *async)
283 {
284 }
285
286 static BlueSkyStoreImplementation store_impl = {
287     .create = simplestore_new,
288     .destroy = simplestore_destroy,
289     .submit = simplestore_submit,
290     .cleanup = simplestore_cleanup,
291     .lookup_last = simplestore_lookup_last,
292 };
293
294 void bluesky_store_init_simple(void)
295 {
296     bluesky_store_register(&store_impl, "simple");
297 }