f4d5ad26c8b0463fe96fde99c8e4d0ea04618b18
[bluesky.git] / nfs3 / synreadbench.c
1 /* Blue Sky: File Systems in the Cloud
2  *
3  * Copyright (C) 2009  The Regents of the University of California
4  * Written by Michael Vrable <mvrable@cs.ucsd.edu>
5  *
6  * TODO: Licensing
7  */
8
9 /* Synthetic client for benchmarking: a tool for directly generating NFS
10  * requests and reading back the responses, so that we can exercise the server
11  * differently than the Linux kernel NFS client does.
12  *
13  * Much of this is copied from rpc.c and other BlueSky server code but is
14  * designed to run independently of BlueSky. */
15
16 #include "mount_prot.h"
17 #include "nfs3_prot.h"
18 #include <stdio.h>
19 #include <stdlib.h>
20 #include <inttypes.h>
21 #include <rpc/pmap_clnt.h>
22 #include <string.h>
23 #include <signal.h>
24 #include <memory.h>
25 #include <netdb.h>
26 #include <sys/socket.h>
27 #include <sys/types.h>
28 #include <netinet/in.h>
29 #include <netinet/ip.h>
30 #include <glib.h>
31
32 /* TCP port number to use for NFS protocol.  (Would be 2049 in standard NFS.) */
33 #define NFS_SERVICE_PORT 2051
34
35 /* Maximum size of a single RPC message that we will accept (8 MB). */
36 #define MAX_RPC_MSGSIZE (8 << 20)
37
38 int threads;
39 int completed = 0;
40 int read_size = 32768;
41
42 struct bench_file {
43     uint64_t inum;
44     uint64_t size;
45 };
46
47 GArray *bench_files;
48
49 struct rpc_reply {
50     uint32_t xid;
51     uint32_t type;
52     uint32_t stat;
53     uint32_t verf_flavor;
54     uint32_t verf_len;
55     uint32_t accept_stat;
56 };
57
58 struct rpc_fail_reply {
59     uint32_t xid;
60     uint32_t type;
61     uint32_t stat;
62     uint32_t verf_flavor;
63     uint32_t verf_len;
64     uint32_t accept_stat;
65 };
66
67 struct rpc_call_header {
68     uint32_t xid;
69     uint32_t mtype;
70     uint32_t rpcvers;
71     uint32_t prog;
72     uint32_t vers;
73     uint32_t proc;
74 };
75
76 struct rpc_auth {
77     uint32_t flavor;
78     uint32_t len;
79 };
80
81 typedef struct {
82     GIOChannel *channel;
83
84     /* The reassembled message, thus far. */
85     GString *msgbuf;
86
87     /* Remaining number of bytes in this message fragment; 0 if we next expect
88      * another fragment header. */
89     uint32_t frag_len;
90
91     /* If frag_len is zero: the number of bytes of the fragment header that
92      * have been read so far. */
93     int frag_hdr_bytes;
94
95     /* Mapping of XID values to outstanding RPC calls. */
96     GHashTable *xid_table;
97 } NFSConnection;
98
99
100 typedef void (*NFSFunc)(NFSConnection *nfs,
101                         gpointer user_data, const char *reply, size_t len);
102
103 typedef struct {
104     NFSFunc callback;
105     gpointer user_data;
106     int64_t start, end;
107 } CallInfo;
108
109 static GMainLoop *main_loop;
110
111 int64_t now_hires()
112 {
113     struct timespec time;
114
115     if (clock_gettime(CLOCK_REALTIME, &time) != 0) {
116         perror("clock_gettime");
117         return 0;
118     }
119
120     return (int64_t)(time.tv_sec) * 1000000000 + time.tv_nsec;
121 }
122
123 static void do_write(NFSConnection *conn, const char *buf, size_t len)
124 {
125     while (len > 0) {
126         gsize written = 0;
127         switch (g_io_channel_write_chars(conn->channel, buf, len,
128                                          &written, NULL)) {
129         case G_IO_STATUS_ERROR:
130         case G_IO_STATUS_EOF:
131         case G_IO_STATUS_AGAIN:
132             fprintf(stderr, "Error writing to socket!\n");
133             return;
134         case G_IO_STATUS_NORMAL:
135             len -= written;
136             buf += written;
137             break;
138         }
139     }
140 }
141
142 static void send_rpc(NFSConnection *nfs, int proc, GString *msg,
143                      NFSFunc completion_handler, gpointer user_data)
144 {
145     static int xid_count = 0;
146     struct rpc_call_header header;
147     struct rpc_auth auth;
148
149     header.xid = GUINT32_TO_BE(xid_count++);
150     header.mtype = 0;
151     header.rpcvers = GUINT32_TO_BE(2);
152     header.prog = GUINT32_TO_BE(NFS_PROGRAM);
153     header.vers = GUINT32_TO_BE(NFS_V3);
154     header.proc = GUINT32_TO_BE(proc);
155
156     auth.flavor = GUINT32_TO_BE(AUTH_NULL);
157     auth.len = 0;
158
159     CallInfo *info = g_new0(CallInfo, 1);
160
161     uint32_t fragment = htonl(0x80000000
162                               | (sizeof(header) + 2*sizeof(auth) + msg->len));
163     do_write(nfs, (const char *)&fragment, sizeof(fragment));
164     do_write(nfs, (const char *)&header, sizeof(header));
165     do_write(nfs, (const char *)&auth, sizeof(auth));
166     do_write(nfs, (const char *)&auth, sizeof(auth));
167     do_write(nfs, msg->str, msg->len);
168     g_io_channel_flush(nfs->channel, NULL);
169
170     info->start = now_hires();
171     info->callback = completion_handler;
172     info->user_data = user_data;
173     g_hash_table_insert(nfs->xid_table,
174                         GINT_TO_POINTER(GUINT32_FROM_BE(header.xid)), info);
175 }
176
177 static void process_reply(NFSConnection *nfs, GString *msg)
178 {
179     struct rpc_reply *reply = (struct rpc_reply *)msg->str;
180
181     uint32_t xid = GUINT32_FROM_BE(reply->xid);
182
183     gpointer key = GINT_TO_POINTER(GUINT32_FROM_BE(reply->xid));
184     CallInfo *info = g_hash_table_lookup(nfs->xid_table, key);
185     if (info == NULL) {
186         g_print("Could not match reply XID %d with a call!\n", xid);
187         return;
188     }
189
190     info->end = now_hires();
191     printf("XID %d: Time = %"PRIi64"\n", xid, info->end - info->start);
192     if (info->callback != NULL)
193         info->callback(nfs, info->user_data,
194                        msg->str + sizeof(*reply), msg->len - sizeof(*reply));
195
196     g_hash_table_remove(nfs->xid_table, key);
197     g_free(info);
198
199     completed++;
200     if (completed == 128 * threads) {
201         g_main_loop_quit(main_loop);
202     }
203 }
204
205 static gboolean read_handler(GIOChannel *channel,
206                              GIOCondition condition,
207                              gpointer data)
208 {
209     NFSConnection *nfs = (NFSConnection *)data;
210
211     gsize bytes_to_read = 0;    /* Number of bytes to attempt to read. */
212
213     /* If we have not yet read in the fragment header, do that first.  This is
214      * 4 bytes that indicates the number of bytes in the message to follow
215      * (with the high bit set if this is the last fragment making up the
216      * message). */
217     if (nfs->frag_len == 0) {
218         bytes_to_read = 4 - nfs->frag_hdr_bytes;
219     } else {
220         bytes_to_read = nfs->frag_len & 0x7fffffff;
221     }
222
223     if (bytes_to_read > MAX_RPC_MSGSIZE
224         || nfs->msgbuf->len + bytes_to_read > MAX_RPC_MSGSIZE)
225     {
226         fprintf(stderr, "Excessive fragment size for RPC: %zd bytes\n",
227                 bytes_to_read);
228         g_io_channel_shutdown(nfs->channel, TRUE, NULL);
229         return FALSE;
230     }
231
232     gsize bytes_read = 0;
233     g_string_set_size(nfs->msgbuf, nfs->msgbuf->len + bytes_to_read);
234     char *buf = &nfs->msgbuf->str[nfs->msgbuf->len - bytes_to_read];
235     switch (g_io_channel_read_chars(nfs->channel, buf,
236                                     bytes_to_read, &bytes_read, NULL)) {
237     case G_IO_STATUS_NORMAL:
238         break;
239     case G_IO_STATUS_AGAIN:
240         return TRUE;
241     case G_IO_STATUS_EOF:
242         if (bytes_read == bytes_to_read)
243             break;
244         /* else fall through */
245     case G_IO_STATUS_ERROR:
246         fprintf(stderr, "Unexpected error or end of file on RPC stream %d!\n",
247                 g_io_channel_unix_get_fd(nfs->channel));
248         g_io_channel_shutdown(nfs->channel, TRUE, NULL);
249         /* TODO: Clean up connection object. */
250         return FALSE;
251     }
252
253     g_assert(bytes_read >= 0 && bytes_read <= bytes_to_read);
254
255     g_string_set_size(nfs->msgbuf,
256                       nfs->msgbuf->len - (bytes_to_read - bytes_read));
257
258     if (nfs->frag_len == 0) {
259         /* Handle reading in the fragment header.  If we've read the complete
260          * header, store the fragment size. */
261         nfs->frag_hdr_bytes += bytes_read;
262         if (nfs->frag_hdr_bytes == 4) {
263             memcpy((char *)&nfs->frag_len,
264                    &nfs->msgbuf->str[nfs->msgbuf->len - 4], 4);
265             nfs->frag_len = ntohl(nfs->frag_len);
266             g_string_set_size(nfs->msgbuf, nfs->msgbuf->len - 4);
267             nfs->frag_hdr_bytes = 0;
268         }
269     } else {
270         /* We were reading in the fragment body. */
271         nfs->frag_len -= bytes_read;
272
273         if (nfs->frag_len == 0x80000000) {
274             process_reply(nfs, nfs->msgbuf);
275             nfs->frag_len = 0;
276             g_string_set_size(nfs->msgbuf, 0);
277         }
278     }
279
280     return TRUE;
281 }
282
283 static void send_read_request(NFSConnection *nfs, uint64_t inum,
284                                uint64_t offset, uint64_t len);
285 static void submit_random_read(NFSConnection *nfs)
286 {
287     struct bench_file *bf;
288     bf = &g_array_index(bench_files, struct bench_file,
289                         g_random_int_range(0, bench_files->len));
290     int blocks = bf->size / read_size;
291     if (blocks == 0)
292         blocks = 1;
293
294     int offset = g_random_int_range(0, blocks);
295     send_read_request(nfs, bf->inum, offset * read_size, read_size);
296 }
297
298 static void finish_read_request(NFSConnection *nfs, gpointer user_data,
299                                 const char *reply, size_t len)
300 {
301     printf("Done reading inode %d\n", GPOINTER_TO_INT(user_data));
302
303     submit_random_read(nfs);
304 }
305
306 static void send_read_request(NFSConnection *nfs, uint64_t inum,
307                                uint64_t offset, uint64_t len)
308 {
309     struct nfs_fh3 fh;
310     uint64_t fhdata = GUINT64_TO_BE(inum);
311     fh.data.data_val = (char *)&fhdata;
312     fh.data.data_len = 8;
313     int i;
314
315     char buf[64];
316     struct read3args read;
317     memcpy(&read.file, &fh, sizeof(struct nfs_fh3));
318     read.offset = offset;
319     read.count = len;
320
321     GString *str = g_string_new("");
322     XDR xdr;
323     xdr_string_create(&xdr, str, XDR_ENCODE);
324     xdr_read3args(&xdr, &read);
325     send_rpc(nfs, NFSPROC3_READ, str, finish_read_request,
326              GINT_TO_POINTER((int)inum));
327     g_string_free(str, TRUE);
328 }
329
330 static gboolean idle_handler(gpointer data)
331 {
332     NFSConnection *nfs = (NFSConnection *)data;
333     int i;
334
335     for (i = 0; i < threads; i++) {
336         submit_random_read(nfs);
337     }
338
339 #if 0
340     g_print("Sending requests...\n");
341     for (i = 0; i < threads; i++) {
342         char buf[64];
343         struct diropargs3 lookup;
344         uint64_t rootfh = GUINT64_TO_BE(1);
345
346         sprintf(buf, "file-%d", i + 1);
347         lookup.dir.data.data_len = 8;
348         lookup.dir.data.data_val = (char *)&rootfh;
349         lookup.name = buf;
350
351         GString *str = g_string_new("");
352         XDR xdr;
353         xdr_string_create(&xdr, str, XDR_ENCODE);
354         xdr_diropargs3(&xdr, &lookup);
355         send_rpc(nfs, NFSPROC3_LOOKUP, str, store_fh, NULL);
356         g_string_free(str, TRUE);
357     }
358 #endif
359
360     return FALSE;
361 }
362
363 NFSConnection *nfs_connect(const char *hostname)
364 {
365     int result;
366     struct addrinfo hints;
367     struct addrinfo *ai = NULL;
368
369     int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
370     if (fd < 0) {
371         fprintf(stderr, "Unable to create NFS TCP socket: %m\n");
372         exit(1);
373     }
374
375     memset(&hints, 0, sizeof(hints));
376     hints.ai_family = AF_INET;
377     hints.ai_socktype = SOCK_STREAM;
378     hints.ai_protocol = IPPROTO_TCP;
379     result = getaddrinfo(hostname, "2051", NULL, &ai);
380     if (result < 0 || ai == NULL) {
381         fprintf(stderr, "Hostname lookup failure for %s: %s\n",
382                 hostname, gai_strerror(result));
383         exit(1);
384     }
385
386     if (connect(fd, ai->ai_addr, ai->ai_addrlen) < 0) {
387         fprintf(stderr, "Unable to connect to : %m\n");
388     }
389
390     freeaddrinfo(ai);
391
392     NFSConnection *conn = g_new0(NFSConnection, 1);
393     conn->msgbuf = g_string_new("");
394     conn->xid_table = g_hash_table_new(g_direct_hash, g_direct_equal);
395
396     conn->channel = g_io_channel_unix_new(fd);
397     g_io_channel_set_encoding(conn->channel, NULL, NULL);
398     g_io_channel_set_flags(conn->channel, G_IO_FLAG_NONBLOCK, NULL);
399     g_io_add_watch(conn->channel, G_IO_IN, read_handler, conn);
400
401     g_idle_add(idle_handler, conn);
402
403     return conn;
404 }
405
406 int main(int argc, char *argv[])
407 {
408     g_thread_init(NULL);
409     g_set_prgname("synclient");
410     g_print("Launching synthetic NFS RPC client...\n");
411
412     bench_files = g_array_new(FALSE, TRUE, sizeof(struct bench_file));
413
414     FILE *inodes = fopen(argv[1], "r");
415     if (inodes == NULL) {
416         perror("fopen");
417         return 1;
418     }
419     while (!feof(inodes)) {
420         int i1 = -1, i2 = -1;
421         fscanf(inodes, "%d %d", &i1, &i2);
422         if (i1 < 0 || i2 < 0)
423             continue;
424
425         struct bench_file bf;
426         bf.inum = i1;
427         bf.size = i2;
428         g_array_append_val(bench_files, bf);
429     }
430
431     threads = 8;
432     if (argc > 2)
433         threads = atoi(argv[2]);
434     if (argc > 3)
435         read_size = atoi(argv[3]);
436
437     main_loop = g_main_loop_new(NULL, FALSE);
438     nfs_connect("niniel.sysnet.ucsd.edu");
439
440     g_main_loop_run(main_loop);
441
442     return 0;
443 }