Check in a new very simple client/server storage protocol implementation.
[bluesky.git] / bluesky / store-simple.c
1 /* Blue Sky: File Systems in the Cloud
2  *
3  * Copyright (C) 2009  The Regents of the University of California
4  * Written by Michael Vrable <mvrable@cs.ucsd.edu>
5  *
6  * TODO: Licensing
7  */
8
9 /* Interface to the simple BlueSky test storage server. */
10
11 #include <stdint.h>
12 #include <stdlib.h>
13 #include <errno.h>
14 #include <glib.h>
15 #include <string.h>
16 #include <sys/types.h>
17 #include <sys/socket.h>
18 #include <netdb.h>
19 #include <unistd.h>
20
21 #include "bluesky-private.h"
22
23 typedef struct {
24     GThreadPool *thread_pool;
25     struct sockaddr_in server_addr;
26 } SimpleStore;
27
28 static gboolean write_data(int fd, const char *buf, size_t len)
29 {
30     while (len > 0) {
31         ssize_t bytes = write(fd, buf, len);
32         if (bytes < 0) {
33             if (errno == EINTR)
34                 continue;
35             return FALSE;
36         }
37         buf += bytes;
38         len -= bytes;
39     }
40     return TRUE;
41 }
42
43 static ssize_t read_line(int fd, char *buf, size_t maxlen)
44 {
45     size_t buflen = 0;
46
47     /* Leave room for a null byte at the end */
48     maxlen--;
49
50     while (buflen == 0 || memchr(buf, '\n', buflen) == NULL) {
51         ssize_t bytes;
52
53         if (buflen == maxlen) {
54             return -1;
55         }
56
57         bytes = read(fd, buf + buflen, maxlen - buflen);
58         if (bytes < 0) {
59             if (errno == EINTR)
60                 continue;
61             else
62                 return -1;
63         }
64         if (bytes == 0)
65             break;
66
67         g_assert(bytes <= maxlen - buflen);
68         buflen += bytes;
69     }
70
71     buf[buflen] = '\0';
72     return buflen;
73 }
74
75 static gboolean read_bytes(int fd, char *buf, size_t len)
76 {
77     while (len > 0) {
78         ssize_t bytes;
79
80         bytes = read(fd, buf, len);
81         if (bytes < 0) {
82             if (errno == EINTR)
83                 continue;
84             else
85                 return FALSE;
86         }
87         if (bytes == 0)
88             return FALSE;
89
90         g_assert(bytes <= len);
91         len -= bytes;
92         buf += bytes;
93     }
94
95     return TRUE;
96 }
97
98 /* TODO: Re-use a connection for multiple requests, and support pipelining. */
99 static void simplestore_task(gpointer a, gpointer b)
100 {
101     BlueSkyStoreAsync *async = (BlueSkyStoreAsync *)a;
102     SimpleStore *server = (SimpleStore *)bluesky_store_async_get_handle(async);
103
104     async->status = ASYNC_RUNNING;
105
106     int fd = socket(PF_INET, SOCK_STREAM, 0);
107     if (fd < 0) {
108         g_warning("Error creating simplestore socket: %m");
109         bluesky_store_async_mark_complete(async);
110         bluesky_store_async_unref(async);
111         return;
112     }
113
114     if (connect(fd, (struct sockaddr *)&server->server_addr,
115                 sizeof(server->server_addr)) < 0) {
116         g_warning("Error connecting to simplestore server: %m");
117         bluesky_store_async_mark_complete(async);
118         bluesky_store_async_unref(async);
119         return;
120     }
121
122     switch (async->op) {
123     case STORE_OP_GET:
124     {
125         async->result = -1;
126         char *cmd = g_strdup_printf("GET %s %zd %zd\n",
127                                     async->key, async->start, async->len);
128         char result_buf[256];
129
130         if (!write_data(fd, cmd, strlen(cmd))) {
131             g_free(cmd);
132             break;
133         }
134
135         g_free(cmd);
136
137         int bytes = read_line(fd, result_buf, sizeof(result_buf));
138         if (bytes < 0)
139             break;
140         int result = atoi(result_buf);
141         if (result < 0)
142             break;
143
144         char *data = g_malloc(result);
145         if (strchr(result_buf, '\n') != NULL) {
146             int header_size = strchr(result_buf, '\n') - result_buf;
147             int data_bytes = bytes - header_size;
148             if (data_bytes > result)
149                 data_bytes = result;
150             memcpy(data, result_buf + header_size, data_bytes);
151             if (!read_bytes(fd, data + data_bytes, result - data_bytes))
152                 break;
153         } else {
154             if (!read_bytes(fd, data, result))
155                 break;
156         }
157
158         async->data = bluesky_string_new(data, result);
159         async->result = 0;
160         break;
161     }
162
163     case STORE_OP_PUT:
164     {
165         async->result = -1;
166         char *cmd = g_strdup_printf("PUT %s %zd\n",
167                                     async->key, async->data->len);
168         char result_buf[256];
169
170         if (!write_data(fd, cmd, strlen(cmd))) {
171             g_free(cmd);
172             break;
173         }
174
175         g_free(cmd);
176         if (!write_data(fd, async->data->data, async->data->len)) {
177             break;
178         }
179
180         if (read_line(fd, result_buf, sizeof(result_buf)) < 0)
181             break;
182         if (atoi(result_buf) != 0)
183             break;
184
185         async->result = 0;
186         break;
187     }
188
189     default:
190         break;
191     }
192
193     bluesky_store_async_mark_complete(async);
194     bluesky_store_async_unref(async);
195
196     close(fd);
197 }
198
199 static char *simplestore_lookup_last(gpointer s, const char *prefix)
200 {
201     return NULL;
202 }
203
204 static gpointer simplestore_new(const gchar *path)
205 {
206     SimpleStore *store = g_new0(SimpleStore, 1);
207
208     /* TODO: Right now we leak this memory.  We should probably clean up in
209      * simplestore_destroy, but it's not a big deal. */
210     const gchar *host = "127.0.0.1", *port = "8257";
211     if (path != NULL) {
212         gchar **target = g_strsplit(path, ":", 0);
213         if (target[0] != NULL) {
214             host = target[0];
215             if (target[1] != NULL) {
216                 port = target[1];
217             }
218         }
219     }
220
221     g_print("simplestore: %s port %s\n", host, port);
222
223     struct addrinfo hints;
224     struct addrinfo *lookup_result = NULL;
225     memset(&hints, 0, sizeof(hints));
226     hints.ai_family = AF_INET;
227     hints.ai_socktype = SOCK_STREAM;
228     int res = getaddrinfo(host, port, &hints, &lookup_result);
229     if (res != 0) {
230         fprintf(stderr, "simplestore: cannot resolve target name: %s\n",
231                 gai_strerror(res));
232         return NULL;
233     }
234     freeaddrinfo(lookup_result);
235     for (struct addrinfo *ai = lookup_result; ai != NULL; ai = ai->ai_next) {
236         printf("flags=%d family=%d socktype=%d proto=%d\n",
237                ai->ai_flags,
238                ai->ai_family,
239                ai->ai_socktype,
240                ai->ai_protocol);
241         if (ai->ai_addrlen == sizeof(struct sockaddr_in)) {
242             memcpy(&store->server_addr, ai->ai_addr,
243                    sizeof(struct sockaddr_in));
244         } else {
245             fprintf(stderr, "Warning: Bad address record size!\n");
246         }
247     }
248
249     store->thread_pool = g_thread_pool_new(simplestore_task, NULL,
250                                            bluesky_max_threads, FALSE, NULL);
251
252     return store;
253 }
254
255 static void simplestore_destroy(gpointer store)
256 {
257 }
258
259 static void simplestore_submit(gpointer store, BlueSkyStoreAsync *async)
260 {
261     SimpleStore *server = (SimpleStore *)store;
262
263     g_return_if_fail(async->status == ASYNC_NEW);
264     g_return_if_fail(async->op != STORE_OP_NONE);
265
266     switch (async->op) {
267     case STORE_OP_GET:
268     case STORE_OP_PUT:
269         async->status = ASYNC_PENDING;
270         bluesky_store_async_ref(async);
271         g_thread_pool_push(server->thread_pool, async, NULL);
272         break;
273
274     default:
275         g_warning("Uknown operation type for simplestore: %d\n", async->op);
276         bluesky_store_async_mark_complete(async);
277         break;
278     }
279 }
280
281 static void simplestore_cleanup(gpointer store, BlueSkyStoreAsync *async)
282 {
283 }
284
285 static BlueSkyStoreImplementation store_impl = {
286     .create = simplestore_new,
287     .destroy = simplestore_destroy,
288     .submit = simplestore_submit,
289     .cleanup = simplestore_cleanup,
290     .lookup_last = simplestore_lookup_last,
291 };
292
293 void bluesky_store_init_simple(void)
294 {
295     bluesky_store_register(&store_impl, "simple");
296 }