Enable range requests in the simple storage backend
[bluesky.git] / filestore / server.c
1 /* A very simple AWS-like server, without any authentication.  This is meant
2  * performance testing in a local environment.  The server offers weak
3  * guarantees on data durability--data is stored directly to the file system
4  * without synchronization, so data might be lost in a crash.  This should
5  * offer good performance though for benchmarking.
6  *
7  * Protocol: Each request is a whitespace-separated (typically, a single space)
8  * list of parameters terminated by a newline character.  The response is a
9  * line containing a response code and a body length (again white-separated and
10  * newline-terminated) followed by the response body.  Requests:
11  *   GET filename offset length
12  *   PUT filename length
13  *   DELETE filename
14  *   LIST prefix
15  */
16
17 #include <stdio.h>
18 #include <stdlib.h>
19 #include <assert.h>
20 #include <errno.h>
21 #include <fcntl.h>
22 #include <signal.h>
23 #include <string.h>
24 #include <time.h>
25 #include <unistd.h>
26 #include <netinet/in.h>
27 #include <sys/socket.h>
28 #include <sys/stat.h>
29 #include <sys/types.h>
30 #include <sys/wait.h>
31
32 /* Maximum number of connections the server will queue waiting for us to call
33  * accept(). */
34 #define TCP_BACKLOG 32
35
36 #define WHITESPACE " \t\r\n"
37 #define MAX_ARGS 4
38
39 /* Maximum length of a command that we will accept. */
40 #define MAX_CMD_LEN 4096
41
42 #define MAX_OBJECT_SIZE (256 << 20)
43
44 enum command { GET, PUT, LIST, DELETE };
45
46 void write_data(int fd, const char *buf, size_t len)
47 {
48     while (len > 0) {
49         ssize_t bytes = write(fd, buf, len);
50         if (bytes < 0) {
51             if (errno == EINTR)
52                 continue;
53             exit(1);
54         }
55         buf += bytes;
56         len -= bytes;
57     }
58 }
59
60 /* SIGCHLD handler, used to clean up processes that handle the connections. */
61 static void sigchld_handler(int signal)
62 {
63     int pid;
64     int reaped = 0;
65
66     while ((pid = waitpid(WAIT_ANY, NULL, WNOHANG)) > 0) {
67         reaped++;
68     }
69
70     if (pid < 0) {
71         if (errno == ECHILD && reaped) {
72             /* Don't print an error for the caes that we successfully cleaned
73              * up after all children. */
74         } else {
75             perror("waitpid");
76         }
77     }
78 }
79
80 /* Return a text representation of a socket address.  Returns a pointer to a
81  * static buffer so it is non-reentrant. */
82 const char *sockname(struct sockaddr_in *addr, socklen_t len)
83 {
84     static char buf[128];
85     if (len < sizeof(struct sockaddr_in))
86         return NULL;
87     if (addr->sin_family != AF_INET)
88         return NULL;
89
90     uint32_t ip = ntohl(addr->sin_addr.s_addr);
91     sprintf(buf, "%d.%d.%d.%d:%d",
92             (int)((ip >> 24) & 0xff),
93             (int)((ip >> 16) & 0xff),
94             (int)((ip >> 8) & 0xff),
95             (int)(ip & 0xff),
96             ntohs(addr->sin_port));
97
98     return buf;
99 }
100
101 /* Convert a path from a client to the actual filename used.  Returns a string
102  * that must be freed. */
103 char *normalize_path(const char *in)
104 {
105     char *out = malloc(2 * strlen(in) + 1);
106     int i, j;
107     for (i = 0, j = 0; in[i] != '\0'; i++) {
108         if (in[i] == '/') {
109             out[j++] = '_';
110             out[j++] = 's';
111         } else if (in[i] == '_') {
112             out[j++] = '_';
113             out[j++] = 'u';
114         } else {
115             out[j++] = in[i];
116         }
117     }
118     out[j++] = '\0';
119     return out;
120 }
121
122 void cmd_get(int fd, char *path, size_t start, ssize_t len)
123 {
124     char buf[65536];
125
126     char *response = "-1\n";
127     int file = open(path, O_RDONLY);
128     if (file < 0) {
129         write_data(fd, response, strlen(response));
130         return;
131     }
132
133     struct stat statbuf;
134     if (fstat(file, &statbuf) < 0) {
135         write_data(fd, response, strlen(response));
136         return;
137     }
138
139     size_t datalen = statbuf.st_size;
140     if (start > 0) {
141         if (start >= datalen) {
142             datalen = 0;
143         } else {
144             lseek(file, start, SEEK_SET);
145             datalen -= start;
146         }
147     }
148     if (len > 0 && len < datalen) {
149         datalen = len;
150     }
151
152     sprintf(buf, "%zd\n", datalen);
153     write_data(fd, buf, strlen(buf));
154
155     while (datalen > 0) {
156         size_t needed = datalen > sizeof(buf) ? sizeof(buf) : datalen;
157         ssize_t bytes = read(file, buf, needed);
158         if (bytes < 0 && errno == EINTR)
159             continue;
160         if (bytes <= 0) {
161             /* Error reading necessary data, but we already told the client the
162              * file size so pad the data to the client with null bytes. */
163             memset(buf, 0, needed);
164             bytes = needed;
165         }
166         write_data(fd, buf, bytes);
167         datalen -= bytes;
168     }
169
170     close(file);
171 }
172
173 void cmd_put(int fd, char *path, char *buf,
174              size_t object_size, size_t buf_used)
175 {
176     while (buf_used < object_size) {
177         ssize_t bytes;
178
179         bytes = read(fd, buf + buf_used, object_size - buf_used);
180         if (bytes < 0) {
181             if (errno == EINTR)
182                 continue;
183             else
184                 exit(1);
185         }
186
187         if (bytes == 0)
188             exit(1);
189
190         assert(bytes <= object_size - buf_used);
191         buf_used += bytes;
192
193         continue;
194     }
195
196     printf("Got %zd bytes for object '%s'\n", buf_used, path);
197
198     char *response = "-1\n";
199     int file = open(path, O_WRONLY|O_CREAT|O_TRUNC, 0600);
200     if (file >= 0) {
201         write_data(file, buf, object_size);
202         response = "0\n";
203         close(file);
204     }
205
206     write_data(fd, response, strlen(response));
207 }
208
209 /* The core handler for processing requests from the client.  This can be
210  * single-threaded since each connection is handled in a separate process. */
211 void handle_connection(int fd)
212 {
213     char cmdbuf[MAX_CMD_LEN];
214     size_t buflen = 0;
215
216     while (1) {
217         /* Keep reading data until reaching a newline, so that a complete
218          * command can be parsed. */
219         if (buflen == 0 || memchr(cmdbuf, '\n', buflen) == NULL) {
220             ssize_t bytes;
221
222             if (buflen == MAX_CMD_LEN) {
223                 /* Command is too long and thus malformed; close the
224                  * connection. */
225                 return;
226             }
227
228             bytes = read(fd, cmdbuf + buflen, MAX_CMD_LEN - buflen);
229             if (bytes < 0) {
230                 if (errno == EINTR)
231                     continue;
232                 else
233                     return;
234             }
235             if (bytes == 0)
236                 return;
237
238             assert(bytes <= MAX_CMD_LEN - buflen);
239             buflen += bytes;
240
241             continue;
242         }
243
244         size_t cmdlen = (char *)memchr(cmdbuf, '\n', buflen) - cmdbuf + 1;
245         cmdbuf[cmdlen - 1] = '\0';
246         char *token;
247         int arg_count;
248         char *args[MAX_ARGS];
249         int arg_int[MAX_ARGS];
250         for (token = strtok(cmdbuf, WHITESPACE), arg_count = 0;
251              token != NULL;
252              token = strtok(NULL, WHITESPACE), arg_count++)
253         {
254             args[arg_count] = token;
255             arg_int[arg_count] = atoi(token);
256         }
257
258         if (arg_count < 2) {
259             return;
260         }
261         char *path = normalize_path(args[1]);
262         enum command cmd;
263         if (strcmp(args[0], "GET") == 0 && arg_count == 4) {
264             cmd = GET;
265         } else if (strcmp(args[0], "PUT") == 0 && arg_count == 3) {
266             cmd = PUT;
267         } else if (strcmp(args[0], "DELETE") == 0 && arg_count == 2) {
268             cmd = DELETE;
269         } else {
270             return;
271         }
272
273         if (cmdlen < buflen)
274             memmove(cmdbuf, cmdbuf + cmdlen, buflen - cmdlen);
275         buflen -= cmdlen;
276
277         switch (cmd) {
278         case GET:
279             cmd_get(fd, path, arg_int[2], arg_int[3]);
280             break;
281         case PUT: {
282             size_t object_size = arg_int[2];
283             if (object_size > MAX_OBJECT_SIZE)
284                 return;
285             char *data_buf = malloc(object_size);
286             if (data_buf == NULL)
287                 return;
288             size_t data_buflen = buflen > object_size ? object_size : buflen;
289             if (data_buflen > 0)
290                 memcpy(data_buf, cmdbuf, data_buflen);
291             if (data_buflen < buflen) {
292                 memmove(cmdbuf, cmdbuf + data_buflen, buflen - data_buflen);
293                 buflen -= cmdlen;
294             } else {
295                 buflen = 0;
296             }
297             cmd_put(fd, path, data_buf, object_size, data_buflen);
298             break;
299         }
300         case DELETE:
301             //cmd_delete(fd, path);
302             break;
303         default:
304             return;
305         }
306
307         free(path);
308     }
309 }
310
311 /* Create a listening TCP socket on a new address (we do not use a fixed port).
312  * Return the file descriptor of the listening socket. */
313 int server_init()
314 {
315     int fd;
316     struct sockaddr_in server_addr;
317     socklen_t addr_len = sizeof(server_addr);
318
319     fd = socket(PF_INET, SOCK_STREAM, 0);
320     if (fd < 0) {
321         perror("socket");
322         exit(1);
323     }
324
325     if (listen(fd, TCP_BACKLOG) < 0) {
326         perror("listen");
327         exit(1);
328     }
329
330     if (getsockname(fd, (struct sockaddr *)&server_addr, &addr_len) < 0) {
331         perror("getsockname");
332         exit(1);
333     }
334
335     printf("Server listening on %s ...\n", sockname(&server_addr, addr_len));
336     fflush(stdout);
337
338     return fd;
339 }
340
341 /* Process-based server main loop.  Wait for a connection, accept it, fork a
342  * child process to handle the connection, and repeat.  Child processes are
343  * reaped in the SIGCHLD handler. */
344 void server_main(int listen_fd)
345 {
346     struct sigaction handler;
347
348     /* Install signal handler for SIGCHLD. */
349     handler.sa_handler = sigchld_handler;
350     sigemptyset(&handler.sa_mask);
351     handler.sa_flags = SA_RESTART;
352     if (sigaction(SIGCHLD, &handler, NULL) < 0) {
353         perror("sigaction");
354         exit(1);
355     }
356
357     while (1) {
358         struct sockaddr_in client_addr;
359         socklen_t addr_len = sizeof(client_addr);
360         int client_fd = accept(listen_fd, (struct sockaddr *)&client_addr,
361                                &addr_len);
362         int pid;
363
364         /* Very simple error handling.  Exit if something goes wrong.  Later,
365          * might want to look into not killing off current connections abruptly
366          * if we encounter an error in the accept(). */
367         if (client_fd < 0) {
368             if (errno == EINTR)
369                 continue;
370
371             perror("accept");
372             exit(1);
373         }
374
375         printf("Accepted connection from %s ...\n",
376                sockname(&client_addr, addr_len));
377         fflush(stdout);
378
379         pid = fork();
380         if (pid < 0) {
381             perror("fork");
382         } else if (pid == 0) {
383             handle_connection(client_fd);
384             printf("Closing connection %s ...\n",
385                    sockname(&client_addr, addr_len));
386             close(client_fd);
387             exit(0);
388         }
389
390         close(client_fd);
391     }
392 }
393
394 /* Print a help message describing command-line options to stdout. */
395 static void usage(char *argv0)
396 {
397     printf("Usage: %s [options...]\n"
398            "A simple key-value storage server.\n", argv0);
399 }
400
401 int main(int argc, char *argv[])
402 {
403     int fd;
404     //int i;
405     int display_help = 0, cmdline_error = 0;
406
407 #if 0
408     for (i = 1; i < argc; i++) {
409         if (strcmp(argv[i], "-help") == 0) {
410             display_help = 1;
411         } else if (strcmp(argv[i], "-document_root") == 0) {
412             i++;
413             if (i >= argc) {
414                 fprintf(stderr,
415                         "Error: Argument to -document_root expected.\n");
416                 cmdline_error = 1;
417             } else {
418                 document_root = argv[i];
419             }
420         } else if (strcmp(argv[i], "-port") == 0) {
421             i++;
422             if (i >= argc) {
423                 fprintf(stderr,
424                         "Error: Expected port number after -port.\n");
425                 cmdline_error = 1;
426             } else {
427                 server_port = atoi(argv[i]);
428                 if (server_port < 1 || server_port > 65535) {
429                     fprintf(stderr,
430                             "Error: Port must be between 1 and 65535.\n");
431                     cmdline_error = 1;
432                 }
433             }
434         } else if (strcmp(argv[i], "-mime_types") == 0) {
435             i++;
436             if (i >= argc) {
437                 fprintf(stderr,
438                         "Error: Argument to -mime_types expected.\n");
439                 cmdline_error = 1;
440             } else {
441                 mime_types_file = argv[i];
442             }
443         } else if (strcmp(argv[i], "-log") == 0) {
444             i++;
445             if (i >= argc) {
446                 fprintf(stderr,
447                         "Error: Argument to -log expected.\n");
448                 cmdline_error = 1;
449             } else {
450                 log_fname = argv[i];
451             }
452         } else {
453             fprintf(stderr, "Error: Unrecognized option '%s'\n", argv[i]);
454             cmdline_error = 1;
455         }
456     }
457 #endif
458
459     /* Display a help message if requested, or let the user know to look at the
460      * help message if there was an error parsing the command line.  In either
461      * case, the server never starts. */
462     if (display_help) {
463         usage(argv[0]);
464         exit(0);
465     } else if (cmdline_error) {
466         fprintf(stderr, "Run '%s -help' for a summary of options.\n", argv[0]);
467         exit(2);
468     }
469
470     fd = server_init();
471     if (fd >= 0) {
472         server_main(fd);
473     }
474
475     return 0;
476 }