Update to a warning message to provide more debugging info
[bluesky.git] / nfs3 / synclient.c
1 /* Blue Sky: File Systems in the Cloud
2  *
3  * Copyright (C) 2009  The Regents of the University of California
4  * Written by Michael Vrable <mvrable@cs.ucsd.edu>
5  *
6  * TODO: Licensing
7  */
8
9 /* Synthetic client for benchmarking: a tool for directly generating NFS
10  * requests and reading back the responses, so that we can exercise the server
11  * differently than the Linux kernel NFS client does.
12  *
13  * Much of this is copied from rpc.c and other BlueSky server code but is
14  * designed to run independently of BlueSky. */
15
16 #include "mount_prot.h"
17 #include "nfs3_prot.h"
18 #include <stdio.h>
19 #include <stdlib.h>
20 #include <inttypes.h>
21 #include <rpc/pmap_clnt.h>
22 #include <string.h>
23 #include <signal.h>
24 #include <memory.h>
25 #include <netdb.h>
26 #include <sys/socket.h>
27 #include <sys/types.h>
28 #include <netinet/in.h>
29 #include <netinet/ip.h>
30 #include <glib.h>
31
32 /* TCP port number to use for NFS protocol.  (Should be 2049.) */
33 #define NFS_SERVICE_PORT 2051
34
35 /* Maximum size of a single RPC message that we will accept (8 MB). */
36 #define MAX_RPC_MSGSIZE (8 << 20)
37
38 int threads;
39 int completed = 0;
40
41 struct rpc_reply {
42     uint32_t xid;
43     uint32_t type;
44     uint32_t stat;
45     uint32_t verf_flavor;
46     uint32_t verf_len;
47     uint32_t accept_stat;
48 };
49
50 struct rpc_fail_reply {
51     uint32_t xid;
52     uint32_t type;
53     uint32_t stat;
54     uint32_t verf_flavor;
55     uint32_t verf_len;
56     uint32_t accept_stat;
57 };
58
59 struct rpc_call_header {
60     uint32_t xid;
61     uint32_t mtype;
62     uint32_t rpcvers;
63     uint32_t prog;
64     uint32_t vers;
65     uint32_t proc;
66 };
67
68 struct rpc_auth {
69     uint32_t flavor;
70     uint32_t len;
71 };
72
73 typedef struct {
74     GIOChannel *channel;
75
76     /* The reassembled message, thus far. */
77     GString *msgbuf;
78
79     /* Remaining number of bytes in this message fragment; 0 if we next expect
80      * another fragment header. */
81     uint32_t frag_len;
82
83     /* If frag_len is zero: the number of bytes of the fragment header that
84      * have been read so far. */
85     int frag_hdr_bytes;
86
87     /* Mapping of XID values to outstanding RPC calls. */
88     GHashTable *xid_table;
89 } NFSConnection;
90
91
92 typedef void (*NFSFunc)(NFSConnection *nfs,
93                         gpointer user_data, const char *reply, size_t len);
94
95 typedef struct {
96     NFSFunc callback;
97     gpointer user_data;
98     int64_t start, end;
99 } CallInfo;
100
101 static GMainLoop *main_loop;
102
103 int64_t now_hires()
104 {
105     struct timespec time;
106
107     if (clock_gettime(CLOCK_REALTIME, &time) != 0) {
108         perror("clock_gettime");
109         return 0;
110     }
111
112     return (int64_t)(time.tv_sec) * 1000000000 + time.tv_nsec;
113 }
114
115 static void do_write(NFSConnection *conn, const char *buf, size_t len)
116 {
117     while (len > 0) {
118         gsize written = 0;
119         switch (g_io_channel_write_chars(conn->channel, buf, len,
120                                          &written, NULL)) {
121         case G_IO_STATUS_ERROR:
122         case G_IO_STATUS_EOF:
123         case G_IO_STATUS_AGAIN:
124             fprintf(stderr, "Error writing to socket!\n");
125             return;
126         case G_IO_STATUS_NORMAL:
127             len -= written;
128             buf += written;
129             break;
130         }
131     }
132 }
133
134 static void send_rpc(NFSConnection *nfs, int proc, GString *msg,
135                      NFSFunc completion_handler, gpointer user_data)
136 {
137     static int xid_count = 0;
138     struct rpc_call_header header;
139     struct rpc_auth auth;
140
141     header.xid = GUINT32_TO_BE(xid_count++);
142     header.mtype = 0;
143     header.rpcvers = GUINT32_TO_BE(2);
144     header.prog = GUINT32_TO_BE(NFS_PROGRAM);
145     header.vers = GUINT32_TO_BE(NFS_V3);
146     header.proc = GUINT32_TO_BE(proc);
147
148     auth.flavor = GUINT32_TO_BE(AUTH_NULL);
149     auth.len = 0;
150
151     CallInfo *info = g_new0(CallInfo, 1);
152
153     uint32_t fragment = htonl(0x80000000
154                               | (sizeof(header) + 2*sizeof(auth) + msg->len));
155     do_write(nfs, (const char *)&fragment, sizeof(fragment));
156     do_write(nfs, (const char *)&header, sizeof(header));
157     do_write(nfs, (const char *)&auth, sizeof(auth));
158     do_write(nfs, (const char *)&auth, sizeof(auth));
159     do_write(nfs, msg->str, msg->len);
160     g_io_channel_flush(nfs->channel, NULL);
161
162     info->start = now_hires();
163     info->callback = completion_handler;
164     info->user_data = user_data;
165     g_hash_table_insert(nfs->xid_table,
166                         GINT_TO_POINTER(GUINT32_FROM_BE(header.xid)), info);
167 }
168
169 static void process_reply(NFSConnection *nfs, GString *msg)
170 {
171     struct rpc_reply *reply = (struct rpc_reply *)msg->str;
172
173     uint32_t xid = GUINT32_FROM_BE(reply->xid);
174
175     gpointer key = GINT_TO_POINTER(GUINT32_FROM_BE(reply->xid));
176     CallInfo *info = g_hash_table_lookup(nfs->xid_table, key);
177     if (info == NULL) {
178         g_print("Could not match reply XID %d with a call!\n", xid);
179         return;
180     }
181
182     info->end = now_hires();
183     printf("XID %d: Time = %"PRIi64"\n", xid, info->end - info->start);
184     if (info->callback != NULL)
185         info->callback(nfs, info->user_data,
186                        msg->str + sizeof(*reply), msg->len - sizeof(*reply));
187
188     g_hash_table_remove(nfs->xid_table, key);
189     g_free(info);
190
191     completed++;
192     if (completed == 5 * threads) {
193         g_main_loop_quit(main_loop);
194     }
195 }
196
197 static gboolean read_handler(GIOChannel *channel,
198                              GIOCondition condition,
199                              gpointer data)
200 {
201     NFSConnection *nfs = (NFSConnection *)data;
202
203     gsize bytes_to_read = 0;    /* Number of bytes to attempt to read. */
204
205     /* If we have not yet read in the fragment header, do that first.  This is
206      * 4 bytes that indicates the number of bytes in the message to follow
207      * (with the high bit set if this is the last fragment making up the
208      * message). */
209     if (nfs->frag_len == 0) {
210         bytes_to_read = 4 - nfs->frag_hdr_bytes;
211     } else {
212         bytes_to_read = nfs->frag_len & 0x7fffffff;
213     }
214
215     if (bytes_to_read > MAX_RPC_MSGSIZE
216         || nfs->msgbuf->len + bytes_to_read > MAX_RPC_MSGSIZE)
217     {
218         fprintf(stderr, "Excessive fragment size for RPC: %zd bytes\n",
219                 bytes_to_read);
220         g_io_channel_shutdown(nfs->channel, TRUE, NULL);
221         return FALSE;
222     }
223
224     gsize bytes_read = 0;
225     g_string_set_size(nfs->msgbuf, nfs->msgbuf->len + bytes_to_read);
226     char *buf = &nfs->msgbuf->str[nfs->msgbuf->len - bytes_to_read];
227     switch (g_io_channel_read_chars(nfs->channel, buf,
228                                     bytes_to_read, &bytes_read, NULL)) {
229     case G_IO_STATUS_NORMAL:
230         break;
231     case G_IO_STATUS_AGAIN:
232         return TRUE;
233     case G_IO_STATUS_EOF:
234         if (bytes_read == bytes_to_read)
235             break;
236         /* else fall through */
237     case G_IO_STATUS_ERROR:
238         fprintf(stderr, "Unexpected error or end of file on RPC stream %d!\n",
239                 g_io_channel_unix_get_fd(nfs->channel));
240         g_io_channel_shutdown(nfs->channel, TRUE, NULL);
241         /* TODO: Clean up connection object. */
242         return FALSE;
243     }
244
245     g_assert(bytes_read >= 0 && bytes_read <= bytes_to_read);
246
247     g_string_set_size(nfs->msgbuf,
248                       nfs->msgbuf->len - (bytes_to_read - bytes_read));
249
250     if (nfs->frag_len == 0) {
251         /* Handle reading in the fragment header.  If we've read the complete
252          * header, store the fragment size. */
253         nfs->frag_hdr_bytes += bytes_read;
254         if (nfs->frag_hdr_bytes == 4) {
255             memcpy((char *)&nfs->frag_len,
256                    &nfs->msgbuf->str[nfs->msgbuf->len - 4], 4);
257             nfs->frag_len = ntohl(nfs->frag_len);
258             g_string_set_size(nfs->msgbuf, nfs->msgbuf->len - 4);
259             nfs->frag_hdr_bytes = 0;
260         }
261     } else {
262         /* We were reading in the fragment body. */
263         nfs->frag_len -= bytes_read;
264
265         if (nfs->frag_len == 0x80000000) {
266             process_reply(nfs, nfs->msgbuf);
267             nfs->frag_len = 0;
268             g_string_set_size(nfs->msgbuf, 0);
269         }
270     }
271
272     return TRUE;
273 }
274
275 static void send_read_requests(NFSConnection *nfs, const struct nfs_fh3 *fh)
276 {
277     int i;
278
279     g_print("Sending read requests...\n");
280     for (i = 0; i < 4; i++) {
281         char buf[64];
282         struct read3args read;
283         memcpy(&read.file, fh, sizeof(struct nfs_fh3));
284         read.offset = (1 << 20) * i;
285         read.count = (1 << 20);
286
287         GString *str = g_string_new("");
288         XDR xdr;
289         xdr_string_create(&xdr, str, XDR_ENCODE);
290         xdr_read3args(&xdr, &read);
291         send_rpc(nfs, NFSPROC3_READ, str, NULL, NULL);
292         g_string_free(str, TRUE);
293     }
294 }
295
296 static void store_fh(NFSConnection *nfs, gpointer user_data,
297                      const char *reply, size_t len)
298 {
299     struct lookup3res res;
300     XDR xdr;
301     memset(&res, 0, sizeof(res));
302     xdrmem_create(&xdr, (char *)reply, len, XDR_DECODE);
303     if (!xdr_lookup3res(&xdr, &res)) {
304         g_print("Decode error for lookup3res!\n");
305         return;
306     }
307     if (res.status != NFS3_OK) {
308         g_print("Response not NFS3_OK\n");
309         return;
310     }
311
312     struct nfs_fh3 *fh = g_new0(struct nfs_fh3, 1);
313     fh->data.data_len = res.lookup3res_u.resok.object.data.data_len;
314     fh->data.data_val = g_memdup(res.lookup3res_u.resok.object.data.data_val,
315                                  fh->data.data_len);
316
317     xdr.x_op = XDR_FREE;
318     xdr_lookup3res(&xdr, &res);
319
320     send_read_requests(nfs, fh);
321 }
322
323 static gboolean idle_handler(gpointer data)
324 {
325     NFSConnection *nfs = (NFSConnection *)data;
326     int i;
327
328     g_print("Sending requests...\n");
329     for (i = 0; i < threads; i++) {
330         char buf[64];
331         struct diropargs3 lookup;
332         uint64_t rootfh = GUINT64_TO_BE(1);
333
334         sprintf(buf, "file-%d", i + 1);
335         lookup.dir.data.data_len = 8;
336         lookup.dir.data.data_val = (char *)&rootfh;
337         lookup.name = buf;
338
339         GString *str = g_string_new("");
340         XDR xdr;
341         xdr_string_create(&xdr, str, XDR_ENCODE);
342         xdr_diropargs3(&xdr, &lookup);
343         send_rpc(nfs, NFSPROC3_LOOKUP, str, store_fh, NULL);
344         g_string_free(str, TRUE);
345     }
346
347     return FALSE;
348 }
349
350 NFSConnection *nfs_connect(const char *hostname)
351 {
352     int result;
353     struct addrinfo hints;
354     struct addrinfo *ai = NULL;
355
356     int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
357     if (fd < 0) {
358         fprintf(stderr, "Unable to create NFS TCP socket: %m\n");
359         exit(1);
360     }
361
362     memset(&hints, 0, sizeof(hints));
363     hints.ai_family = AF_INET;
364     hints.ai_socktype = SOCK_STREAM;
365     hints.ai_protocol = IPPROTO_TCP;
366     result = getaddrinfo(hostname, "2051", NULL, &ai);
367     if (result < 0 || ai == NULL) {
368         fprintf(stderr, "Hostname lookup failure for %s: %s\n",
369                 hostname, gai_strerror(result));
370         exit(1);
371     }
372
373     if (connect(fd, ai->ai_addr, ai->ai_addrlen) < 0) {
374         fprintf(stderr, "Unable to connect to : %m\n");
375     }
376
377     freeaddrinfo(ai);
378
379     NFSConnection *conn = g_new0(NFSConnection, 1);
380     conn->msgbuf = g_string_new("");
381     conn->xid_table = g_hash_table_new(g_direct_hash, g_direct_equal);
382
383     conn->channel = g_io_channel_unix_new(fd);
384     g_io_channel_set_encoding(conn->channel, NULL, NULL);
385     g_io_channel_set_flags(conn->channel, G_IO_FLAG_NONBLOCK, NULL);
386     g_io_add_watch(conn->channel, G_IO_IN, read_handler, conn);
387
388     g_idle_add(idle_handler, conn);
389
390     return conn;
391 }
392
393 int main(int argc, char *argv[])
394 {
395     g_thread_init(NULL);
396     g_set_prgname("synclient");
397     g_print("Launching synthetic NFS RPC client...\n");
398
399     threads = 8;
400     if (argc > 1)
401         threads = atoi(argv[1]);
402
403     main_loop = g_main_loop_new(NULL, FALSE);
404     nfs_connect("niniel.sysnet.ucsd.edu");
405
406     g_main_loop_run(main_loop);
407
408     return 0;
409 }