Work on a tool for a synthetic read benchmark
[bluesky.git] / nfs3 / synreadbench.c
1 /* Blue Sky: File Systems in the Cloud
2  *
3  * Copyright (C) 2009  The Regents of the University of California
4  * Written by Michael Vrable <mvrable@cs.ucsd.edu>
5  *
6  * TODO: Licensing
7  */
8
9 /* Synthetic client for benchmarking: a tool for directly generating NFS
10  * requests and reading back the responses, so that we can exercise the server
11  * differently than the Linux kernel NFS client does.
12  *
13  * Much of this is copied from rpc.c and other BlueSky server code but is
14  * designed to run independently of BlueSky. */
15
16 #include "mount_prot.h"
17 #include "nfs3_prot.h"
18 #include <stdio.h>
19 #include <stdlib.h>
20 #include <inttypes.h>
21 #include <rpc/pmap_clnt.h>
22 #include <string.h>
23 #include <signal.h>
24 #include <memory.h>
25 #include <netdb.h>
26 #include <sys/socket.h>
27 #include <sys/types.h>
28 #include <netinet/in.h>
29 #include <netinet/ip.h>
30 #include <glib.h>
31
32 /* TCP port number to use for NFS protocol.  (Would be 2049 in standard NFS.) */
33 #define NFS_SERVICE_PORT 2051
34
35 /* Maximum size of a single RPC message that we will accept (8 MB). */
36 #define MAX_RPC_MSGSIZE (8 << 20)
37
38 int threads;
39 int completed = 0;
40
41 struct bench_file {
42     uint64_t inum;
43     uint64_t size;
44 };
45
46 GArray *bench_files;
47
48 struct rpc_reply {
49     uint32_t xid;
50     uint32_t type;
51     uint32_t stat;
52     uint32_t verf_flavor;
53     uint32_t verf_len;
54     uint32_t accept_stat;
55 };
56
57 struct rpc_fail_reply {
58     uint32_t xid;
59     uint32_t type;
60     uint32_t stat;
61     uint32_t verf_flavor;
62     uint32_t verf_len;
63     uint32_t accept_stat;
64 };
65
66 struct rpc_call_header {
67     uint32_t xid;
68     uint32_t mtype;
69     uint32_t rpcvers;
70     uint32_t prog;
71     uint32_t vers;
72     uint32_t proc;
73 };
74
75 struct rpc_auth {
76     uint32_t flavor;
77     uint32_t len;
78 };
79
80 typedef struct {
81     GIOChannel *channel;
82
83     /* The reassembled message, thus far. */
84     GString *msgbuf;
85
86     /* Remaining number of bytes in this message fragment; 0 if we next expect
87      * another fragment header. */
88     uint32_t frag_len;
89
90     /* If frag_len is zero: the number of bytes of the fragment header that
91      * have been read so far. */
92     int frag_hdr_bytes;
93
94     /* Mapping of XID values to outstanding RPC calls. */
95     GHashTable *xid_table;
96 } NFSConnection;
97
98
99 typedef void (*NFSFunc)(NFSConnection *nfs,
100                         gpointer user_data, const char *reply, size_t len);
101
102 typedef struct {
103     NFSFunc callback;
104     gpointer user_data;
105     int64_t start, end;
106 } CallInfo;
107
108 static GMainLoop *main_loop;
109
110 int64_t now_hires()
111 {
112     struct timespec time;
113
114     if (clock_gettime(CLOCK_REALTIME, &time) != 0) {
115         perror("clock_gettime");
116         return 0;
117     }
118
119     return (int64_t)(time.tv_sec) * 1000000000 + time.tv_nsec;
120 }
121
122 static void do_write(NFSConnection *conn, const char *buf, size_t len)
123 {
124     while (len > 0) {
125         gsize written = 0;
126         switch (g_io_channel_write_chars(conn->channel, buf, len,
127                                          &written, NULL)) {
128         case G_IO_STATUS_ERROR:
129         case G_IO_STATUS_EOF:
130         case G_IO_STATUS_AGAIN:
131             fprintf(stderr, "Error writing to socket!\n");
132             return;
133         case G_IO_STATUS_NORMAL:
134             len -= written;
135             buf += written;
136             break;
137         }
138     }
139 }
140
141 static void send_rpc(NFSConnection *nfs, int proc, GString *msg,
142                      NFSFunc completion_handler, gpointer user_data)
143 {
144     static int xid_count = 0;
145     struct rpc_call_header header;
146     struct rpc_auth auth;
147
148     header.xid = GUINT32_TO_BE(xid_count++);
149     header.mtype = 0;
150     header.rpcvers = GUINT32_TO_BE(2);
151     header.prog = GUINT32_TO_BE(NFS_PROGRAM);
152     header.vers = GUINT32_TO_BE(NFS_V3);
153     header.proc = GUINT32_TO_BE(proc);
154
155     auth.flavor = GUINT32_TO_BE(AUTH_NULL);
156     auth.len = 0;
157
158     CallInfo *info = g_new0(CallInfo, 1);
159
160     uint32_t fragment = htonl(0x80000000
161                               | (sizeof(header) + 2*sizeof(auth) + msg->len));
162     do_write(nfs, (const char *)&fragment, sizeof(fragment));
163     do_write(nfs, (const char *)&header, sizeof(header));
164     do_write(nfs, (const char *)&auth, sizeof(auth));
165     do_write(nfs, (const char *)&auth, sizeof(auth));
166     do_write(nfs, msg->str, msg->len);
167     g_io_channel_flush(nfs->channel, NULL);
168
169     info->start = now_hires();
170     info->callback = completion_handler;
171     info->user_data = user_data;
172     g_hash_table_insert(nfs->xid_table,
173                         GINT_TO_POINTER(GUINT32_FROM_BE(header.xid)), info);
174 }
175
176 static void process_reply(NFSConnection *nfs, GString *msg)
177 {
178     struct rpc_reply *reply = (struct rpc_reply *)msg->str;
179
180     uint32_t xid = GUINT32_FROM_BE(reply->xid);
181
182     gpointer key = GINT_TO_POINTER(GUINT32_FROM_BE(reply->xid));
183     CallInfo *info = g_hash_table_lookup(nfs->xid_table, key);
184     if (info == NULL) {
185         g_print("Could not match reply XID %d with a call!\n", xid);
186         return;
187     }
188
189     info->end = now_hires();
190     printf("XID %d: Time = %"PRIi64"\n", xid, info->end - info->start);
191     if (info->callback != NULL)
192         info->callback(nfs, info->user_data,
193                        msg->str + sizeof(*reply), msg->len - sizeof(*reply));
194
195     g_hash_table_remove(nfs->xid_table, key);
196     g_free(info);
197
198     completed++;
199     if (completed == 128 * threads) {
200         g_main_loop_quit(main_loop);
201     }
202 }
203
204 static gboolean read_handler(GIOChannel *channel,
205                              GIOCondition condition,
206                              gpointer data)
207 {
208     NFSConnection *nfs = (NFSConnection *)data;
209
210     gsize bytes_to_read = 0;    /* Number of bytes to attempt to read. */
211
212     /* If we have not yet read in the fragment header, do that first.  This is
213      * 4 bytes that indicates the number of bytes in the message to follow
214      * (with the high bit set if this is the last fragment making up the
215      * message). */
216     if (nfs->frag_len == 0) {
217         bytes_to_read = 4 - nfs->frag_hdr_bytes;
218     } else {
219         bytes_to_read = nfs->frag_len & 0x7fffffff;
220     }
221
222     if (bytes_to_read > MAX_RPC_MSGSIZE
223         || nfs->msgbuf->len + bytes_to_read > MAX_RPC_MSGSIZE)
224     {
225         fprintf(stderr, "Excessive fragment size for RPC: %zd bytes\n",
226                 bytes_to_read);
227         g_io_channel_shutdown(nfs->channel, TRUE, NULL);
228         return FALSE;
229     }
230
231     gsize bytes_read = 0;
232     g_string_set_size(nfs->msgbuf, nfs->msgbuf->len + bytes_to_read);
233     char *buf = &nfs->msgbuf->str[nfs->msgbuf->len - bytes_to_read];
234     switch (g_io_channel_read_chars(nfs->channel, buf,
235                                     bytes_to_read, &bytes_read, NULL)) {
236     case G_IO_STATUS_NORMAL:
237         break;
238     case G_IO_STATUS_AGAIN:
239         return TRUE;
240     case G_IO_STATUS_EOF:
241         if (bytes_read == bytes_to_read)
242             break;
243         /* else fall through */
244     case G_IO_STATUS_ERROR:
245         fprintf(stderr, "Unexpected error or end of file on RPC stream %d!\n",
246                 g_io_channel_unix_get_fd(nfs->channel));
247         g_io_channel_shutdown(nfs->channel, TRUE, NULL);
248         /* TODO: Clean up connection object. */
249         return FALSE;
250     }
251
252     g_assert(bytes_read >= 0 && bytes_read <= bytes_to_read);
253
254     g_string_set_size(nfs->msgbuf,
255                       nfs->msgbuf->len - (bytes_to_read - bytes_read));
256
257     if (nfs->frag_len == 0) {
258         /* Handle reading in the fragment header.  If we've read the complete
259          * header, store the fragment size. */
260         nfs->frag_hdr_bytes += bytes_read;
261         if (nfs->frag_hdr_bytes == 4) {
262             memcpy((char *)&nfs->frag_len,
263                    &nfs->msgbuf->str[nfs->msgbuf->len - 4], 4);
264             nfs->frag_len = ntohl(nfs->frag_len);
265             g_string_set_size(nfs->msgbuf, nfs->msgbuf->len - 4);
266             nfs->frag_hdr_bytes = 0;
267         }
268     } else {
269         /* We were reading in the fragment body. */
270         nfs->frag_len -= bytes_read;
271
272         if (nfs->frag_len == 0x80000000) {
273             process_reply(nfs, nfs->msgbuf);
274             nfs->frag_len = 0;
275             g_string_set_size(nfs->msgbuf, 0);
276         }
277     }
278
279     return TRUE;
280 }
281
282 static void send_read_request(NFSConnection *nfs, uint64_t inum,
283                                uint64_t offset, uint64_t len);
284
285 static void finish_read_request(NFSConnection *nfs, gpointer user_data,
286                                 const char *reply, size_t len)
287 {
288     printf("Done reading inode %d\n", GPOINTER_TO_INT(user_data));
289
290     struct bench_file *bf;
291     bf = &g_array_index(bench_files, struct bench_file,
292                         g_random_int_range(0, bench_files->len));
293     send_read_request(nfs, bf->inum, 0, 1048576);
294 }
295
296 static void send_read_request(NFSConnection *nfs, uint64_t inum,
297                                uint64_t offset, uint64_t len)
298 {
299     struct nfs_fh3 fh;
300     uint64_t fhdata = GUINT64_TO_BE(inum);
301     fh.data.data_val = (char *)&fhdata;
302     fh.data.data_len = 8;
303     int i;
304
305     char buf[64];
306     struct read3args read;
307     memcpy(&read.file, &fh, sizeof(struct nfs_fh3));
308     read.offset = offset;
309     read.count = len;
310
311     GString *str = g_string_new("");
312     XDR xdr;
313     xdr_string_create(&xdr, str, XDR_ENCODE);
314     xdr_read3args(&xdr, &read);
315     send_rpc(nfs, NFSPROC3_READ, str, finish_read_request,
316              GINT_TO_POINTER((int)inum));
317     g_string_free(str, TRUE);
318 }
319
320 static gboolean idle_handler(gpointer data)
321 {
322     NFSConnection *nfs = (NFSConnection *)data;
323     int i;
324
325     for (i = 0; i < threads; i++) {
326         struct bench_file *bf;
327         bf = &g_array_index(bench_files, struct bench_file,
328                             g_random_int_range(0, bench_files->len));
329         send_read_request(nfs, bf->inum, 0, 1048576);
330     }
331
332 #if 0
333     g_print("Sending requests...\n");
334     for (i = 0; i < threads; i++) {
335         char buf[64];
336         struct diropargs3 lookup;
337         uint64_t rootfh = GUINT64_TO_BE(1);
338
339         sprintf(buf, "file-%d", i + 1);
340         lookup.dir.data.data_len = 8;
341         lookup.dir.data.data_val = (char *)&rootfh;
342         lookup.name = buf;
343
344         GString *str = g_string_new("");
345         XDR xdr;
346         xdr_string_create(&xdr, str, XDR_ENCODE);
347         xdr_diropargs3(&xdr, &lookup);
348         send_rpc(nfs, NFSPROC3_LOOKUP, str, store_fh, NULL);
349         g_string_free(str, TRUE);
350     }
351 #endif
352
353     return FALSE;
354 }
355
356 NFSConnection *nfs_connect(const char *hostname)
357 {
358     int result;
359     struct addrinfo hints;
360     struct addrinfo *ai = NULL;
361
362     int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
363     if (fd < 0) {
364         fprintf(stderr, "Unable to create NFS TCP socket: %m\n");
365         exit(1);
366     }
367
368     memset(&hints, 0, sizeof(hints));
369     hints.ai_family = AF_INET;
370     hints.ai_socktype = SOCK_STREAM;
371     hints.ai_protocol = IPPROTO_TCP;
372     result = getaddrinfo(hostname, "2051", NULL, &ai);
373     if (result < 0 || ai == NULL) {
374         fprintf(stderr, "Hostname lookup failure for %s: %s\n",
375                 hostname, gai_strerror(result));
376         exit(1);
377     }
378
379     if (connect(fd, ai->ai_addr, ai->ai_addrlen) < 0) {
380         fprintf(stderr, "Unable to connect to : %m\n");
381     }
382
383     freeaddrinfo(ai);
384
385     NFSConnection *conn = g_new0(NFSConnection, 1);
386     conn->msgbuf = g_string_new("");
387     conn->xid_table = g_hash_table_new(g_direct_hash, g_direct_equal);
388
389     conn->channel = g_io_channel_unix_new(fd);
390     g_io_channel_set_encoding(conn->channel, NULL, NULL);
391     g_io_channel_set_flags(conn->channel, G_IO_FLAG_NONBLOCK, NULL);
392     g_io_add_watch(conn->channel, G_IO_IN, read_handler, conn);
393
394     g_idle_add(idle_handler, conn);
395
396     return conn;
397 }
398
399 int main(int argc, char *argv[])
400 {
401     g_thread_init(NULL);
402     g_set_prgname("synclient");
403     g_print("Launching synthetic NFS RPC client...\n");
404
405     bench_files = g_array_new(FALSE, TRUE, sizeof(struct bench_file));
406
407     FILE *inodes = fopen(argv[1], "r");
408     if (inodes == NULL) {
409         perror("fopen");
410         return 1;
411     }
412     while (!feof(inodes)) {
413         int i1 = -1, i2 = -1;
414         fscanf(inodes, "%d %d", &i1, &i2);
415         if (i1 < 0 || i2 < 0)
416             continue;
417
418         struct bench_file bf;
419         bf.inum = i1;
420         bf.size = i2;
421         g_array_append_val(bench_files, bf);
422     }
423
424     threads = 8;
425     if (argc > 2)
426         threads = atoi(argv[2]);
427
428     main_loop = g_main_loop_new(NULL, FALSE);
429     nfs_connect("niniel.sysnet.ucsd.edu");
430
431     g_main_loop_run(main_loop);
432
433     return 0;
434 }