1 /* Blue Sky: File Systems in the Cloud
3 * Copyright (C) 2009 The Regents of the University of California
4 * Written by Michael Vrable <mvrable@cs.ucsd.edu>
9 /* Interface to the simple BlueSky test storage server. */
16 #include <sys/types.h>
17 #include <sys/socket.h>
21 #include "bluesky-private.h"
24 GThreadPool *thread_pool;
25 struct sockaddr_in server_addr;
28 static gboolean write_data(int fd, const char *buf, size_t len)
31 ssize_t bytes = write(fd, buf, len);
43 static ssize_t read_line(int fd, char *buf, size_t maxlen)
47 /* Leave room for a null byte at the end */
50 while (buflen == 0 || memchr(buf, '\n', buflen) == NULL) {
53 if (buflen == maxlen) {
57 bytes = read(fd, buf + buflen, maxlen - buflen);
67 g_assert(bytes <= maxlen - buflen);
75 static gboolean read_bytes(int fd, char *buf, size_t len)
80 bytes = read(fd, buf, len);
90 g_assert(bytes <= len);
98 /* TODO: Re-use a connection for multiple requests, and support pipelining. */
99 static void simplestore_task(gpointer a, gpointer b)
101 BlueSkyStoreAsync *async = (BlueSkyStoreAsync *)a;
102 SimpleStore *server = (SimpleStore *)bluesky_store_async_get_handle(async);
104 async->status = ASYNC_RUNNING;
106 int fd = socket(PF_INET, SOCK_STREAM, 0);
108 g_warning("Error creating simplestore socket: %m");
109 bluesky_store_async_mark_complete(async);
110 bluesky_store_async_unref(async);
114 if (connect(fd, (struct sockaddr *)&server->server_addr,
115 sizeof(server->server_addr)) < 0) {
116 g_warning("Error connecting to simplestore server: %m");
117 bluesky_store_async_mark_complete(async);
118 bluesky_store_async_unref(async);
126 char *cmd = g_strdup_printf("GET %s %zd %zd\n",
127 async->key, async->start, async->len);
128 char result_buf[256];
130 if (!write_data(fd, cmd, strlen(cmd))) {
137 int bytes = read_line(fd, result_buf, sizeof(result_buf));
140 int result = atoi(result_buf);
144 char *data = g_malloc(result);
145 if (strchr(result_buf, '\n') != NULL) {
146 int header_size = strchr(result_buf, '\n') - result_buf + 1;
147 int data_bytes = bytes - header_size;
148 if (data_bytes > result)
150 memcpy(data, result_buf + header_size, data_bytes);
151 if (!read_bytes(fd, data + data_bytes, result - data_bytes))
154 if (!read_bytes(fd, data, result))
158 async->data = bluesky_string_new(data, result);
160 async->range_done = TRUE;
167 char *cmd = g_strdup_printf("PUT %s %zd\n",
168 async->key, async->data->len);
169 char result_buf[256];
171 if (!write_data(fd, cmd, strlen(cmd))) {
177 if (!write_data(fd, async->data->data, async->data->len)) {
181 if (read_line(fd, result_buf, sizeof(result_buf)) < 0)
183 if (atoi(result_buf) != 0)
194 bluesky_store_async_mark_complete(async);
195 bluesky_store_async_unref(async);
200 static char *simplestore_lookup_last(gpointer s, const char *prefix)
205 static gpointer simplestore_new(const gchar *path)
207 SimpleStore *store = g_new0(SimpleStore, 1);
209 /* TODO: Right now we leak this memory. We should probably clean up in
210 * simplestore_destroy, but it's not a big deal. */
211 const gchar *host = "127.0.0.1", *port = "8257";
213 gchar **target = g_strsplit(path, ":", 0);
214 if (target[0] != NULL) {
216 if (target[1] != NULL) {
222 g_print("simplestore: %s port %s\n", host, port);
224 struct addrinfo hints;
225 struct addrinfo *lookup_result = NULL;
226 memset(&hints, 0, sizeof(hints));
227 hints.ai_family = AF_INET;
228 hints.ai_socktype = SOCK_STREAM;
229 int res = getaddrinfo(host, port, &hints, &lookup_result);
231 fprintf(stderr, "simplestore: cannot resolve target name: %s\n",
235 freeaddrinfo(lookup_result);
236 for (struct addrinfo *ai = lookup_result; ai != NULL; ai = ai->ai_next) {
237 printf("flags=%d family=%d socktype=%d proto=%d\n",
242 if (ai->ai_addrlen == sizeof(struct sockaddr_in)) {
243 memcpy(&store->server_addr, ai->ai_addr,
244 sizeof(struct sockaddr_in));
246 fprintf(stderr, "Warning: Bad address record size!\n");
250 store->thread_pool = g_thread_pool_new(simplestore_task, NULL,
251 bluesky_max_threads, FALSE, NULL);
256 static void simplestore_destroy(gpointer store)
260 static void simplestore_submit(gpointer store, BlueSkyStoreAsync *async)
262 SimpleStore *server = (SimpleStore *)store;
264 g_return_if_fail(async->status == ASYNC_NEW);
265 g_return_if_fail(async->op != STORE_OP_NONE);
270 async->status = ASYNC_PENDING;
271 bluesky_store_async_ref(async);
272 g_thread_pool_push(server->thread_pool, async, NULL);
276 g_warning("Uknown operation type for simplestore: %d\n", async->op);
277 bluesky_store_async_mark_complete(async);
282 static void simplestore_cleanup(gpointer store, BlueSkyStoreAsync *async)
286 static BlueSkyStoreImplementation store_impl = {
287 .create = simplestore_new,
288 .destroy = simplestore_destroy,
289 .submit = simplestore_submit,
290 .cleanup = simplestore_cleanup,
291 .lookup_last = simplestore_lookup_last,
294 void bluesky_store_init_simple(void)
296 bluesky_store_register(&store_impl, "simple");