Add proper per-file copyright notices/licenses and top-level license.
[bluesky.git] / nfs3 / synreadbench.c
1 /* Blue Sky: File Systems in the Cloud
2  *
3  * Copyright (C) 2009  The Regents of the University of California
4  * Written by Michael Vrable <mvrable@cs.ucsd.edu>
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions
8  * are met:
9  * 1. Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  * 2. Redistributions in binary form must reproduce the above copyright
12  *    notice, this list of conditions and the following disclaimer in the
13  *    documentation and/or other materials provided with the distribution.
14  * 3. Neither the name of the University nor the names of its contributors
15  *    may be used to endorse or promote products derived from this software
16  *    without specific prior written permission.
17  *
18  * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
19  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21  * ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
22  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
24  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
25  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
27  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
28  * SUCH DAMAGE.
29  */
30
31 /* Synthetic client for benchmarking: a tool for directly generating NFS
32  * requests and reading back the responses, so that we can exercise the server
33  * differently than the Linux kernel NFS client does.
34  *
35  * Much of this is copied from rpc.c and other BlueSky server code but is
36  * designed to run independently of BlueSky. */
37
38 #include "mount_prot.h"
39 #include "nfs3_prot.h"
40 #include <stdio.h>
41 #include <stdlib.h>
42 #include <inttypes.h>
43 #include <rpc/pmap_clnt.h>
44 #include <string.h>
45 #include <signal.h>
46 #include <memory.h>
47 #include <netdb.h>
48 #include <sys/socket.h>
49 #include <sys/types.h>
50 #include <netinet/in.h>
51 #include <netinet/ip.h>
52 #include <glib.h>
53
54 /* TCP port number to use for NFS protocol.  (Would be 2049 in standard NFS.) */
55 #define NFS_SERVICE_PORT 2051
56
57 /* Maximum size of a single RPC message that we will accept (8 MB). */
58 #define MAX_RPC_MSGSIZE (8 << 20)
59
60 FILE *logfile = NULL;
61
62 int warmup_mode = 0;
63 int threads;
64 int completed = 0;
65 int read_size = 32768;
66
67 struct bench_file {
68     uint64_t inum;
69     uint64_t size;
70 };
71
72 struct data_point {
73     uint32_t timestamp;     // Unix timestamp of completion
74     uint32_t latency;       // Latency, in microseconds
75 } __attribute__((packed));
76
77 GArray *bench_files;
78
79 struct rpc_reply {
80     uint32_t xid;
81     uint32_t type;
82     uint32_t stat;
83     uint32_t verf_flavor;
84     uint32_t verf_len;
85     uint32_t accept_stat;
86 };
87
88 struct rpc_fail_reply {
89     uint32_t xid;
90     uint32_t type;
91     uint32_t stat;
92     uint32_t verf_flavor;
93     uint32_t verf_len;
94     uint32_t accept_stat;
95 };
96
97 struct rpc_call_header {
98     uint32_t xid;
99     uint32_t mtype;
100     uint32_t rpcvers;
101     uint32_t prog;
102     uint32_t vers;
103     uint32_t proc;
104 };
105
106 struct rpc_auth {
107     uint32_t flavor;
108     uint32_t len;
109 };
110
111 typedef struct {
112     GIOChannel *channel;
113
114     /* The reassembled message, thus far. */
115     GString *msgbuf;
116
117     /* Remaining number of bytes in this message fragment; 0 if we next expect
118      * another fragment header. */
119     uint32_t frag_len;
120
121     /* If frag_len is zero: the number of bytes of the fragment header that
122      * have been read so far. */
123     int frag_hdr_bytes;
124
125     /* Mapping of XID values to outstanding RPC calls. */
126     GHashTable *xid_table;
127 } NFSConnection;
128
129
130 typedef void (*NFSFunc)(NFSConnection *nfs,
131                         gpointer user_data, const char *reply, size_t len);
132
133 typedef struct {
134     NFSFunc callback;
135     gpointer user_data;
136     int64_t start, end;
137 } CallInfo;
138
139 static GMainLoop *main_loop;
140
141 int64_t now_hires()
142 {
143     struct timespec time;
144
145     if (clock_gettime(CLOCK_REALTIME, &time) != 0) {
146         perror("clock_gettime");
147         return 0;
148     }
149
150     return (int64_t)(time.tv_sec) * 1000000000 + time.tv_nsec;
151 }
152
153 static void do_write(NFSConnection *conn, const char *buf, size_t len)
154 {
155     while (len > 0) {
156         gsize written = 0;
157         switch (g_io_channel_write_chars(conn->channel, buf, len,
158                                          &written, NULL)) {
159         case G_IO_STATUS_ERROR:
160         case G_IO_STATUS_EOF:
161         case G_IO_STATUS_AGAIN:
162             fprintf(stderr, "Error writing to socket!\n");
163             return;
164         case G_IO_STATUS_NORMAL:
165             len -= written;
166             buf += written;
167             break;
168         }
169     }
170 }
171
172 static void send_rpc(NFSConnection *nfs, int proc, GString *msg,
173                      NFSFunc completion_handler, gpointer user_data)
174 {
175     static int xid_count = 0;
176     struct rpc_call_header header;
177     struct rpc_auth auth;
178
179     header.xid = GUINT32_TO_BE(xid_count++);
180     header.mtype = 0;
181     header.rpcvers = GUINT32_TO_BE(2);
182     header.prog = GUINT32_TO_BE(NFS_PROGRAM);
183     header.vers = GUINT32_TO_BE(NFS_V3);
184     header.proc = GUINT32_TO_BE(proc);
185
186     auth.flavor = GUINT32_TO_BE(AUTH_NULL);
187     auth.len = 0;
188
189     CallInfo *info = g_new0(CallInfo, 1);
190
191     uint32_t fragment = htonl(0x80000000
192                               | (sizeof(header) + 2*sizeof(auth) + msg->len));
193     do_write(nfs, (const char *)&fragment, sizeof(fragment));
194     do_write(nfs, (const char *)&header, sizeof(header));
195     do_write(nfs, (const char *)&auth, sizeof(auth));
196     do_write(nfs, (const char *)&auth, sizeof(auth));
197     do_write(nfs, msg->str, msg->len);
198     g_io_channel_flush(nfs->channel, NULL);
199
200     info->start = now_hires();
201     info->callback = completion_handler;
202     info->user_data = user_data;
203     g_hash_table_insert(nfs->xid_table,
204                         GINT_TO_POINTER(GUINT32_FROM_BE(header.xid)), info);
205 }
206
207 static void process_reply(NFSConnection *nfs, GString *msg)
208 {
209     struct rpc_reply *reply = (struct rpc_reply *)msg->str;
210
211     uint32_t xid = GUINT32_FROM_BE(reply->xid);
212
213     gpointer key = GINT_TO_POINTER(GUINT32_FROM_BE(reply->xid));
214     CallInfo *info = g_hash_table_lookup(nfs->xid_table, key);
215     if (info == NULL) {
216         g_print("Could not match reply XID %d with a call!\n", xid);
217         return;
218     }
219
220     struct data_point d;
221     info->end = now_hires();
222     d.timestamp = info->end / 1000000000;
223     d.latency = (info->end - info->start + 500) / 1000; /* Round off */
224     //printf("XID %d: Time = %"PRIi64"\n", xid, info->end - info->start);
225     if (info->callback != NULL)
226         info->callback(nfs, info->user_data,
227                        msg->str + sizeof(*reply), msg->len - sizeof(*reply));
228
229     g_hash_table_remove(nfs->xid_table, key);
230     g_free(info);
231     if (logfile != NULL) {
232         fwrite(&d, sizeof(d), 1, logfile);
233         fflush(logfile);
234     }
235
236     completed++;
237     if (warmup_mode) {
238         printf("Done warming up %d\n", completed);
239         int scale = 1;
240         if (read_size > (1 << 20))
241             scale = read_size / (1 << 20);
242         if (completed == bench_files->len * scale)
243             g_main_loop_quit(main_loop);
244     }
245 }
246
247 static gboolean read_handler(GIOChannel *channel,
248                              GIOCondition condition,
249                              gpointer data)
250 {
251     NFSConnection *nfs = (NFSConnection *)data;
252
253     gsize bytes_to_read = 0;    /* Number of bytes to attempt to read. */
254
255     /* If we have not yet read in the fragment header, do that first.  This is
256      * 4 bytes that indicates the number of bytes in the message to follow
257      * (with the high bit set if this is the last fragment making up the
258      * message). */
259     if (nfs->frag_len == 0) {
260         bytes_to_read = 4 - nfs->frag_hdr_bytes;
261     } else {
262         bytes_to_read = nfs->frag_len & 0x7fffffff;
263     }
264
265     if (bytes_to_read > MAX_RPC_MSGSIZE
266         || nfs->msgbuf->len + bytes_to_read > MAX_RPC_MSGSIZE)
267     {
268         fprintf(stderr, "Excessive fragment size for RPC: %zd bytes\n",
269                 bytes_to_read);
270         g_io_channel_shutdown(nfs->channel, TRUE, NULL);
271         return FALSE;
272     }
273
274     gsize bytes_read = 0;
275     g_string_set_size(nfs->msgbuf, nfs->msgbuf->len + bytes_to_read);
276     char *buf = &nfs->msgbuf->str[nfs->msgbuf->len - bytes_to_read];
277     switch (g_io_channel_read_chars(nfs->channel, buf,
278                                     bytes_to_read, &bytes_read, NULL)) {
279     case G_IO_STATUS_NORMAL:
280         break;
281     case G_IO_STATUS_AGAIN:
282         return TRUE;
283     case G_IO_STATUS_EOF:
284         if (bytes_read == bytes_to_read)
285             break;
286         /* else fall through */
287     case G_IO_STATUS_ERROR:
288         fprintf(stderr, "Unexpected error or end of file on RPC stream %d!\n",
289                 g_io_channel_unix_get_fd(nfs->channel));
290         g_io_channel_shutdown(nfs->channel, TRUE, NULL);
291         /* TODO: Clean up connection object. */
292         return FALSE;
293     }
294
295     g_assert(bytes_read >= 0 && bytes_read <= bytes_to_read);
296
297     g_string_set_size(nfs->msgbuf,
298                       nfs->msgbuf->len - (bytes_to_read - bytes_read));
299
300     if (nfs->frag_len == 0) {
301         /* Handle reading in the fragment header.  If we've read the complete
302          * header, store the fragment size. */
303         nfs->frag_hdr_bytes += bytes_read;
304         if (nfs->frag_hdr_bytes == 4) {
305             memcpy((char *)&nfs->frag_len,
306                    &nfs->msgbuf->str[nfs->msgbuf->len - 4], 4);
307             nfs->frag_len = ntohl(nfs->frag_len);
308             g_string_set_size(nfs->msgbuf, nfs->msgbuf->len - 4);
309             nfs->frag_hdr_bytes = 0;
310         }
311     } else {
312         /* We were reading in the fragment body. */
313         nfs->frag_len -= bytes_read;
314
315         if (nfs->frag_len == 0x80000000) {
316             process_reply(nfs, nfs->msgbuf);
317             nfs->frag_len = 0;
318             g_string_set_size(nfs->msgbuf, 0);
319         }
320     }
321
322     return TRUE;
323 }
324
325 static void send_read_request(NFSConnection *nfs, uint64_t inum,
326                                uint64_t offset, uint64_t len);
327 static void submit_random_read(NFSConnection *nfs)
328 {
329     static int warmup_counter = 0;
330     struct bench_file *bf;
331
332     if (warmup_mode) {
333         int scale = 1;
334         if (read_size > (1 << 20)) {
335             scale = read_size / (1 << 20);
336         }
337         int filecount = bench_files->len;
338         printf("Warming up file %d\n", warmup_counter);
339         if (warmup_counter >= filecount * scale)
340             return;
341         bf = &g_array_index(bench_files, struct bench_file,
342                             warmup_counter % filecount);
343         send_read_request(nfs, bf->inum, (warmup_counter / filecount) << 20,
344                           read_size > (1 << 20) ? (1 << 20) : read_size);
345         warmup_counter++;
346         return;
347     }
348
349     bf = &g_array_index(bench_files, struct bench_file,
350                         g_random_int_range(0, bench_files->len));
351     int blocks = bf->size / read_size;
352     if (blocks == 0)
353         blocks = 1;
354
355     int offset = g_random_int_range(0, blocks);
356     send_read_request(nfs, bf->inum, offset * read_size, read_size);
357 }
358
359 static void finish_read_request(NFSConnection *nfs, gpointer user_data,
360                                 const char *reply, size_t len)
361 {
362     submit_random_read(nfs);
363 }
364
365 static void send_read_request(NFSConnection *nfs, uint64_t inum,
366                                uint64_t offset, uint64_t len)
367 {
368     struct nfs_fh3 fh;
369     uint64_t fhdata = GUINT64_TO_BE(inum);
370     fh.data.data_val = (char *)&fhdata;
371     fh.data.data_len = 8;
372     int i;
373
374     char buf[64];
375     struct read3args read;
376     memcpy(&read.file, &fh, sizeof(struct nfs_fh3));
377     read.offset = offset;
378     read.count = len;
379
380     GString *str = g_string_new("");
381     XDR xdr;
382     xdr_string_create(&xdr, str, XDR_ENCODE);
383     xdr_read3args(&xdr, &read);
384     send_rpc(nfs, NFSPROC3_READ, str, finish_read_request,
385              GINT_TO_POINTER((int)inum));
386     g_string_free(str, TRUE);
387 }
388
389 static gboolean idle_handler(gpointer data)
390 {
391     NFSConnection *nfs = (NFSConnection *)data;
392     int i;
393
394     for (i = 0; i < threads; i++) {
395         submit_random_read(nfs);
396     }
397
398 #if 0
399     g_print("Sending requests...\n");
400     for (i = 0; i < threads; i++) {
401         char buf[64];
402         struct diropargs3 lookup;
403         uint64_t rootfh = GUINT64_TO_BE(1);
404
405         sprintf(buf, "file-%d", i + 1);
406         lookup.dir.data.data_len = 8;
407         lookup.dir.data.data_val = (char *)&rootfh;
408         lookup.name = buf;
409
410         GString *str = g_string_new("");
411         XDR xdr;
412         xdr_string_create(&xdr, str, XDR_ENCODE);
413         xdr_diropargs3(&xdr, &lookup);
414         send_rpc(nfs, NFSPROC3_LOOKUP, str, store_fh, NULL);
415         g_string_free(str, TRUE);
416     }
417 #endif
418
419     return FALSE;
420 }
421
422 NFSConnection *nfs_connect(const char *hostname)
423 {
424     int result;
425     struct addrinfo hints;
426     struct addrinfo *ai = NULL;
427
428     int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
429     if (fd < 0) {
430         fprintf(stderr, "Unable to create NFS TCP socket: %m\n");
431         exit(1);
432     }
433
434     memset(&hints, 0, sizeof(hints));
435     hints.ai_family = AF_INET;
436     hints.ai_socktype = SOCK_STREAM;
437     hints.ai_protocol = IPPROTO_TCP;
438     result = getaddrinfo(hostname, "2051", NULL, &ai);
439     if (result < 0 || ai == NULL) {
440         fprintf(stderr, "Hostname lookup failure for %s: %s\n",
441                 hostname, gai_strerror(result));
442         exit(1);
443     }
444
445     if (connect(fd, ai->ai_addr, ai->ai_addrlen) < 0) {
446         fprintf(stderr, "Unable to connect to %s: %m\n", hostname);
447     }
448
449     freeaddrinfo(ai);
450
451     NFSConnection *conn = g_new0(NFSConnection, 1);
452     conn->msgbuf = g_string_new("");
453     conn->xid_table = g_hash_table_new(g_direct_hash, g_direct_equal);
454
455     conn->channel = g_io_channel_unix_new(fd);
456     g_io_channel_set_encoding(conn->channel, NULL, NULL);
457     g_io_channel_set_flags(conn->channel, G_IO_FLAG_NONBLOCK, NULL);
458     g_io_add_watch(conn->channel, G_IO_IN, read_handler, conn);
459
460     g_idle_add(idle_handler, conn);
461
462     return conn;
463 }
464
465 int main(int argc, char *argv[])
466 {
467     g_thread_init(NULL);
468     g_set_prgname("synclient");
469
470     bench_files = g_array_new(FALSE, TRUE, sizeof(struct bench_file));
471
472     FILE *inodes = fopen(argv[1], "r");
473     if (inodes == NULL) {
474         perror("fopen");
475         return 1;
476     }
477     while (!feof(inodes)) {
478         int i1 = -1, i2 = -1;
479         fscanf(inodes, "%d %d", &i1, &i2);
480         if (i1 < 0 || i2 < 0)
481             continue;
482
483         struct bench_file bf;
484         bf.inum = i1;
485         bf.size = i2;
486         g_array_append_val(bench_files, bf);
487     }
488     fclose(inodes);
489
490     threads = 8;
491     if (argc > 2)
492         threads = atoi(argv[2]);
493     if (argc > 3)
494         read_size = atoi(argv[3]);
495     if (argc > 4) {
496         if (strcmp(argv[4], "WARMUP") == 0) {
497             warmup_mode = 1;
498         } else {
499             logfile = fopen(argv[4], "wb");
500         }
501     }
502
503     main_loop = g_main_loop_new(NULL, FALSE);
504     nfs_connect("vrable2.sysnet.ucsd.edu");
505
506     g_main_loop_run(main_loop);
507
508     return 0;
509 }