From: Michael Vrable Date: Tue, 18 Jan 2011 05:35:38 +0000 (-0800) Subject: Check in a new very simple client/server storage protocol implementation. X-Git-Url: https://git.vrable.net/?a=commitdiff_plain;h=c5521806064224d3275d7a54ff95d329905804d7;p=bluesky.git Check in a new very simple client/server storage protocol implementation. This is meant for local testing; the server writes data to a local disk without synchronization so data isn't guaranteed to be durable but this should provide better performance for benchmarking (where we care about the BlueSky performance and not the storage sever itself). --- diff --git a/bluesky/CMakeLists.txt b/bluesky/CMakeLists.txt index f890b33..2891f93 100644 --- a/bluesky/CMakeLists.txt +++ b/bluesky/CMakeLists.txt @@ -5,7 +5,7 @@ link_directories("${LIBS3_BUILD_DIR}/lib" ${KVSTORE_DIR}) add_library(bluesky SHARED cache.c cleaner.c cloudlog.c crc32c.c crypto.c debug.c dir.c file.c imap.c init.c inode.c log.c serialize.c store.c store-bdb.c - store-kv.cc store-multi.c store-s3.c util.c) + store-kv.cc store-multi.c store-s3.c store-simple.c util.c) add_executable(bluesky-test main.c) set(CMAKE_C_FLAGS "-Wall -std=gnu99 ${CMAKE_C_FLAGS}") diff --git a/bluesky/init.c b/bluesky/init.c index 2ac2f23..df33709 100644 --- a/bluesky/init.c +++ b/bluesky/init.c @@ -59,6 +59,7 @@ void bluesky_store_init_s3(void); void bluesky_store_init_kv(void); void bluesky_store_init_multi(void); void bluesky_store_init_bdb(void); +void bluesky_store_init_simple(void); /* Initialize the BlueSky library and dependent libraries. */ void bluesky_init(void) @@ -80,4 +81,5 @@ void bluesky_init(void) bluesky_store_init_s3(); bluesky_store_init_multi(); bluesky_store_init_bdb(); + bluesky_store_init_simple(); } diff --git a/bluesky/store-simple.c b/bluesky/store-simple.c new file mode 100644 index 0000000..ef68e11 --- /dev/null +++ b/bluesky/store-simple.c @@ -0,0 +1,296 @@ +/* Blue Sky: File Systems in the Cloud + * + * Copyright (C) 2009 The Regents of the University of California + * Written by Michael Vrable + * + * TODO: Licensing + */ + +/* Interface to the simple BlueSky test storage server. */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "bluesky-private.h" + +typedef struct { + GThreadPool *thread_pool; + struct sockaddr_in server_addr; +} SimpleStore; + +static gboolean write_data(int fd, const char *buf, size_t len) +{ + while (len > 0) { + ssize_t bytes = write(fd, buf, len); + if (bytes < 0) { + if (errno == EINTR) + continue; + return FALSE; + } + buf += bytes; + len -= bytes; + } + return TRUE; +} + +static ssize_t read_line(int fd, char *buf, size_t maxlen) +{ + size_t buflen = 0; + + /* Leave room for a null byte at the end */ + maxlen--; + + while (buflen == 0 || memchr(buf, '\n', buflen) == NULL) { + ssize_t bytes; + + if (buflen == maxlen) { + return -1; + } + + bytes = read(fd, buf + buflen, maxlen - buflen); + if (bytes < 0) { + if (errno == EINTR) + continue; + else + return -1; + } + if (bytes == 0) + break; + + g_assert(bytes <= maxlen - buflen); + buflen += bytes; + } + + buf[buflen] = '\0'; + return buflen; +} + +static gboolean read_bytes(int fd, char *buf, size_t len) +{ + while (len > 0) { + ssize_t bytes; + + bytes = read(fd, buf, len); + if (bytes < 0) { + if (errno == EINTR) + continue; + else + return FALSE; + } + if (bytes == 0) + return FALSE; + + g_assert(bytes <= len); + len -= bytes; + buf += bytes; + } + + return TRUE; +} + +/* TODO: Re-use a connection for multiple requests, and support pipelining. */ +static void simplestore_task(gpointer a, gpointer b) +{ + BlueSkyStoreAsync *async = (BlueSkyStoreAsync *)a; + SimpleStore *server = (SimpleStore *)bluesky_store_async_get_handle(async); + + async->status = ASYNC_RUNNING; + + int fd = socket(PF_INET, SOCK_STREAM, 0); + if (fd < 0) { + g_warning("Error creating simplestore socket: %m"); + bluesky_store_async_mark_complete(async); + bluesky_store_async_unref(async); + return; + } + + if (connect(fd, (struct sockaddr *)&server->server_addr, + sizeof(server->server_addr)) < 0) { + g_warning("Error connecting to simplestore server: %m"); + bluesky_store_async_mark_complete(async); + bluesky_store_async_unref(async); + return; + } + + switch (async->op) { + case STORE_OP_GET: + { + async->result = -1; + char *cmd = g_strdup_printf("GET %s %zd %zd\n", + async->key, async->start, async->len); + char result_buf[256]; + + if (!write_data(fd, cmd, strlen(cmd))) { + g_free(cmd); + break; + } + + g_free(cmd); + + int bytes = read_line(fd, result_buf, sizeof(result_buf)); + if (bytes < 0) + break; + int result = atoi(result_buf); + if (result < 0) + break; + + char *data = g_malloc(result); + if (strchr(result_buf, '\n') != NULL) { + int header_size = strchr(result_buf, '\n') - result_buf; + int data_bytes = bytes - header_size; + if (data_bytes > result) + data_bytes = result; + memcpy(data, result_buf + header_size, data_bytes); + if (!read_bytes(fd, data + data_bytes, result - data_bytes)) + break; + } else { + if (!read_bytes(fd, data, result)) + break; + } + + async->data = bluesky_string_new(data, result); + async->result = 0; + break; + } + + case STORE_OP_PUT: + { + async->result = -1; + char *cmd = g_strdup_printf("PUT %s %zd\n", + async->key, async->data->len); + char result_buf[256]; + + if (!write_data(fd, cmd, strlen(cmd))) { + g_free(cmd); + break; + } + + g_free(cmd); + if (!write_data(fd, async->data->data, async->data->len)) { + break; + } + + if (read_line(fd, result_buf, sizeof(result_buf)) < 0) + break; + if (atoi(result_buf) != 0) + break; + + async->result = 0; + break; + } + + default: + break; + } + + bluesky_store_async_mark_complete(async); + bluesky_store_async_unref(async); + + close(fd); +} + +static char *simplestore_lookup_last(gpointer s, const char *prefix) +{ + return NULL; +} + +static gpointer simplestore_new(const gchar *path) +{ + SimpleStore *store = g_new0(SimpleStore, 1); + + /* TODO: Right now we leak this memory. We should probably clean up in + * simplestore_destroy, but it's not a big deal. */ + const gchar *host = "127.0.0.1", *port = "8257"; + if (path != NULL) { + gchar **target = g_strsplit(path, ":", 0); + if (target[0] != NULL) { + host = target[0]; + if (target[1] != NULL) { + port = target[1]; + } + } + } + + g_print("simplestore: %s port %s\n", host, port); + + struct addrinfo hints; + struct addrinfo *lookup_result = NULL; + memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_INET; + hints.ai_socktype = SOCK_STREAM; + int res = getaddrinfo(host, port, &hints, &lookup_result); + if (res != 0) { + fprintf(stderr, "simplestore: cannot resolve target name: %s\n", + gai_strerror(res)); + return NULL; + } + freeaddrinfo(lookup_result); + for (struct addrinfo *ai = lookup_result; ai != NULL; ai = ai->ai_next) { + printf("flags=%d family=%d socktype=%d proto=%d\n", + ai->ai_flags, + ai->ai_family, + ai->ai_socktype, + ai->ai_protocol); + if (ai->ai_addrlen == sizeof(struct sockaddr_in)) { + memcpy(&store->server_addr, ai->ai_addr, + sizeof(struct sockaddr_in)); + } else { + fprintf(stderr, "Warning: Bad address record size!\n"); + } + } + + store->thread_pool = g_thread_pool_new(simplestore_task, NULL, + bluesky_max_threads, FALSE, NULL); + + return store; +} + +static void simplestore_destroy(gpointer store) +{ +} + +static void simplestore_submit(gpointer store, BlueSkyStoreAsync *async) +{ + SimpleStore *server = (SimpleStore *)store; + + g_return_if_fail(async->status == ASYNC_NEW); + g_return_if_fail(async->op != STORE_OP_NONE); + + switch (async->op) { + case STORE_OP_GET: + case STORE_OP_PUT: + async->status = ASYNC_PENDING; + bluesky_store_async_ref(async); + g_thread_pool_push(server->thread_pool, async, NULL); + break; + + default: + g_warning("Uknown operation type for simplestore: %d\n", async->op); + bluesky_store_async_mark_complete(async); + break; + } +} + +static void simplestore_cleanup(gpointer store, BlueSkyStoreAsync *async) +{ +} + +static BlueSkyStoreImplementation store_impl = { + .create = simplestore_new, + .destroy = simplestore_destroy, + .submit = simplestore_submit, + .cleanup = simplestore_cleanup, + .lookup_last = simplestore_lookup_last, +}; + +void bluesky_store_init_simple(void) +{ + bluesky_store_register(&store_impl, "simple"); +} diff --git a/filestore/server.c b/filestore/server.c new file mode 100644 index 0000000..6bdc60f --- /dev/null +++ b/filestore/server.c @@ -0,0 +1,464 @@ +/* A very simple AWS-like server, without any authentication. This is meant + * performance testing in a local environment. The server offers weak + * guarantees on data durability--data is stored directly to the file system + * without synchronization, so data might be lost in a crash. This should + * offer good performance though for benchmarking. + * + * Protocol: Each request is a whitespace-separated (typically, a single space) + * list of parameters terminated by a newline character. The response is a + * line containing a response code and a body length (again white-separated and + * newline-terminated) followed by the response body. Requests: + * GET filename offset length + * PUT filename length + * DELETE filename + * LIST prefix + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +/* Maximum number of connections the server will queue waiting for us to call + * accept(). */ +#define TCP_BACKLOG 32 + +#define WHITESPACE " \t\r\n" +#define MAX_ARGS 4 + +/* Maximum length of a command that we will accept. */ +#define MAX_CMD_LEN 4096 + +#define MAX_OBJECT_SIZE (256 << 20) + +enum command { GET, PUT, LIST, DELETE }; + +void write_data(int fd, const char *buf, size_t len) +{ + while (len > 0) { + ssize_t bytes = write(fd, buf, len); + if (bytes < 0) { + if (errno == EINTR) + continue; + exit(1); + } + buf += bytes; + len -= bytes; + } +} + +/* SIGCHLD handler, used to clean up processes that handle the connections. */ +static void sigchld_handler(int signal) +{ + int pid; + int reaped = 0; + + while ((pid = waitpid(WAIT_ANY, NULL, WNOHANG)) > 0) { + reaped++; + } + + if (pid < 0) { + if (errno == ECHILD && reaped) { + /* Don't print an error for the caes that we successfully cleaned + * up after all children. */ + } else { + perror("waitpid"); + } + } +} + +/* Return a text representation of a socket address. Returns a pointer to a + * static buffer so it is non-reentrant. */ +const char *sockname(struct sockaddr_in *addr, socklen_t len) +{ + static char buf[128]; + if (len < sizeof(struct sockaddr_in)) + return NULL; + if (addr->sin_family != AF_INET) + return NULL; + + uint32_t ip = ntohl(addr->sin_addr.s_addr); + sprintf(buf, "%d.%d.%d.%d:%d", + (int)((ip >> 24) & 0xff), + (int)((ip >> 16) & 0xff), + (int)((ip >> 8) & 0xff), + (int)(ip & 0xff), + ntohs(addr->sin_port)); + + return buf; +} + +/* Convert a path from a client to the actual filename used. Returns a string + * that must be freed. */ +char *normalize_path(const char *in) +{ + char *out = malloc(2 * strlen(in) + 1); + int i, j; + for (i = 0, j = 0; in[i] != '\0'; i++) { + if (in[i] == '/') { + out[j++] = '_'; + out[j++] = 's'; + } else if (in[i] == '_') { + out[j++] = '_'; + out[j++] = 'u'; + } else { + out[j++] = in[i]; + } + } + out[j++] = '\0'; + return out; +} + +void cmd_get(int fd, char *path, size_t start, ssize_t len) +{ + char buf[65536]; + + char *response = "-1\n"; + int file = open(path, O_RDONLY); + if (file < 0) { + write_data(fd, response, strlen(response)); + return; + } + + struct stat statbuf; + if (fstat(file, &statbuf) < 0) { + write_data(fd, response, strlen(response)); + return; + } + + size_t filelen = statbuf.st_size; + sprintf(buf, "%zd\n", filelen); + write_data(fd, buf, strlen(buf)); + + while (filelen > 0) { + size_t needed = filelen > sizeof(buf) ? sizeof(buf) : filelen; + ssize_t bytes = read(file, buf, needed); + if (bytes < 0 && errno == EINTR) + continue; + if (bytes <= 0) { + /* Error reading necessary data, but we already told the client the + * file size so pad the data to the client with null bytes. */ + memset(buf, 0, needed); + bytes = needed; + } + write_data(fd, buf, bytes); + filelen -= bytes; + } + + close(file); +} + +void cmd_put(int fd, char *path, char *buf, + size_t object_size, size_t buf_used) +{ + while (buf_used < object_size) { + ssize_t bytes; + + bytes = read(fd, buf + buf_used, object_size - buf_used); + if (bytes < 0) { + if (errno == EINTR) + continue; + else + exit(1); + } + + if (bytes == 0) + exit(1); + + assert(bytes <= object_size - buf_used); + buf_used += bytes; + + continue; + } + + printf("Got %zd bytes for object '%s'\n", buf_used, path); + + char *response = "-1\n"; + int file = open(path, O_WRONLY|O_CREAT|O_TRUNC, 0600); + if (file >= 0) { + write_data(file, buf, object_size); + response = "0\n"; + close(file); + } + + write_data(fd, response, strlen(response)); +} + +/* The core handler for processing requests from the client. This can be + * single-threaded since each connection is handled in a separate process. */ +void handle_connection(int fd) +{ + char cmdbuf[MAX_CMD_LEN]; + size_t buflen = 0; + + while (1) { + /* Keep reading data until reaching a newline, so that a complete + * command can be parsed. */ + if (buflen == 0 || memchr(cmdbuf, '\n', buflen) == NULL) { + ssize_t bytes; + + if (buflen == MAX_CMD_LEN) { + /* Command is too long and thus malformed; close the + * connection. */ + return; + } + + bytes = read(fd, cmdbuf + buflen, MAX_CMD_LEN - buflen); + if (bytes < 0) { + if (errno == EINTR) + continue; + else + return; + } + if (bytes == 0) + return; + + assert(bytes <= MAX_CMD_LEN - buflen); + buflen += bytes; + + continue; + } + + size_t cmdlen = (char *)memchr(cmdbuf, '\n', buflen) - cmdbuf + 1; + cmdbuf[cmdlen - 1] = '\0'; + char *token; + int arg_count; + char *args[MAX_ARGS]; + int arg_int[MAX_ARGS]; + for (token = strtok(cmdbuf, WHITESPACE), arg_count = 0; + token != NULL; + token = strtok(NULL, WHITESPACE), arg_count++) + { + args[arg_count] = token; + arg_int[arg_count] = atoi(token); + } + + if (arg_count < 2) { + return; + } + char *path = normalize_path(args[1]); + enum command cmd; + if (strcmp(args[0], "GET") == 0 && arg_count == 4) { + cmd = GET; + } else if (strcmp(args[0], "PUT") == 0 && arg_count == 3) { + cmd = PUT; + } else if (strcmp(args[0], "DELETE") == 0 && arg_count == 2) { + cmd = DELETE; + } else { + return; + } + + if (cmdlen < buflen) + memmove(cmdbuf, cmdbuf + cmdlen, buflen - cmdlen); + buflen -= cmdlen; + + switch (cmd) { + case GET: + cmd_get(fd, path, arg_int[2], arg_int[3]); + break; + case PUT: { + size_t object_size = arg_int[2]; + if (object_size > MAX_OBJECT_SIZE) + return; + char *data_buf = malloc(object_size); + if (data_buf == NULL) + return; + size_t data_buflen = buflen > object_size ? object_size : buflen; + if (data_buflen > 0) + memcpy(data_buf, cmdbuf, data_buflen); + if (data_buflen < buflen) { + memmove(cmdbuf, cmdbuf + data_buflen, buflen - data_buflen); + buflen -= cmdlen; + } else { + buflen = 0; + } + cmd_put(fd, path, data_buf, object_size, data_buflen); + break; + } + case DELETE: + //cmd_delete(fd, path); + break; + default: + return; + } + + free(path); + } +} + +/* Create a listening TCP socket on a new address (we do not use a fixed port). + * Return the file descriptor of the listening socket. */ +int server_init() +{ + int fd; + struct sockaddr_in server_addr; + socklen_t addr_len = sizeof(server_addr); + + fd = socket(PF_INET, SOCK_STREAM, 0); + if (fd < 0) { + perror("socket"); + exit(1); + } + + if (listen(fd, TCP_BACKLOG) < 0) { + perror("listen"); + exit(1); + } + + if (getsockname(fd, (struct sockaddr *)&server_addr, &addr_len) < 0) { + perror("getsockname"); + exit(1); + } + + printf("Server listening on %s ...\n", sockname(&server_addr, addr_len)); + fflush(stdout); + + return fd; +} + +/* Process-based server main loop. Wait for a connection, accept it, fork a + * child process to handle the connection, and repeat. Child processes are + * reaped in the SIGCHLD handler. */ +void server_main(int listen_fd) +{ + struct sigaction handler; + + /* Install signal handler for SIGCHLD. */ + handler.sa_handler = sigchld_handler; + sigemptyset(&handler.sa_mask); + handler.sa_flags = SA_RESTART; + if (sigaction(SIGCHLD, &handler, NULL) < 0) { + perror("sigaction"); + exit(1); + } + + while (1) { + struct sockaddr_in client_addr; + socklen_t addr_len = sizeof(client_addr); + int client_fd = accept(listen_fd, (struct sockaddr *)&client_addr, + &addr_len); + int pid; + + /* Very simple error handling. Exit if something goes wrong. Later, + * might want to look into not killing off current connections abruptly + * if we encounter an error in the accept(). */ + if (client_fd < 0) { + if (errno == EINTR) + continue; + + perror("accept"); + exit(1); + } + + printf("Accepted connection from %s ...\n", + sockname(&client_addr, addr_len)); + fflush(stdout); + + pid = fork(); + if (pid < 0) { + perror("fork"); + } else if (pid == 0) { + handle_connection(client_fd); + printf("Closing connection %s ...\n", + sockname(&client_addr, addr_len)); + close(client_fd); + exit(0); + } + + close(client_fd); + } +} + +/* Print a help message describing command-line options to stdout. */ +static void usage(char *argv0) +{ + printf("Usage: %s [options...]\n" + "A simple key-value storage server.\n", argv0); +} + +int main(int argc, char *argv[]) +{ + int fd; + //int i; + int display_help = 0, cmdline_error = 0; + +#if 0 + for (i = 1; i < argc; i++) { + if (strcmp(argv[i], "-help") == 0) { + display_help = 1; + } else if (strcmp(argv[i], "-document_root") == 0) { + i++; + if (i >= argc) { + fprintf(stderr, + "Error: Argument to -document_root expected.\n"); + cmdline_error = 1; + } else { + document_root = argv[i]; + } + } else if (strcmp(argv[i], "-port") == 0) { + i++; + if (i >= argc) { + fprintf(stderr, + "Error: Expected port number after -port.\n"); + cmdline_error = 1; + } else { + server_port = atoi(argv[i]); + if (server_port < 1 || server_port > 65535) { + fprintf(stderr, + "Error: Port must be between 1 and 65535.\n"); + cmdline_error = 1; + } + } + } else if (strcmp(argv[i], "-mime_types") == 0) { + i++; + if (i >= argc) { + fprintf(stderr, + "Error: Argument to -mime_types expected.\n"); + cmdline_error = 1; + } else { + mime_types_file = argv[i]; + } + } else if (strcmp(argv[i], "-log") == 0) { + i++; + if (i >= argc) { + fprintf(stderr, + "Error: Argument to -log expected.\n"); + cmdline_error = 1; + } else { + log_fname = argv[i]; + } + } else { + fprintf(stderr, "Error: Unrecognized option '%s'\n", argv[i]); + cmdline_error = 1; + } + } +#endif + + /* Display a help message if requested, or let the user know to look at the + * help message if there was an error parsing the command line. In either + * case, the server never starts. */ + if (display_help) { + usage(argv[0]); + exit(0); + } else if (cmdline_error) { + fprintf(stderr, "Run '%s -help' for a summary of options.\n", argv[0]); + exit(2); + } + + fd = server_init(); + if (fd >= 0) { + server_main(fd); + } + + return 0; +}