Work on a client for generating a synthetic stream of NFS operations.
[bluesky.git] / nfs3 / synclient.c
1 /* Blue Sky: File Systems in the Cloud
2  *
3  * Copyright (C) 2009  The Regents of the University of California
4  * Written by Michael Vrable <mvrable@cs.ucsd.edu>
5  *
6  * TODO: Licensing
7  */
8
9 /* Synthetic client for benchmarking: a tool for directly generating NFS
10  * requests and reading back the responses, so that we can exercise the server
11  * differently than the Linux kernel NFS client does.
12  *
13  * Much of this is copied from rpc.c and other BlueSky server code but is
14  * designed to run independently of BlueSky. */
15
16 #include "mount_prot.h"
17 #include "nfs3_prot.h"
18 #include <stdio.h>
19 #include <stdlib.h>
20 #include <rpc/pmap_clnt.h>
21 #include <string.h>
22 #include <signal.h>
23 #include <memory.h>
24 #include <netdb.h>
25 #include <sys/socket.h>
26 #include <sys/types.h>
27 #include <netinet/in.h>
28 #include <netinet/ip.h>
29 #include <glib.h>
30
31 /* TCP port number to use for NFS protocol.  (Should be 2049.) */
32 #define NFS_SERVICE_PORT 2051
33
34 /* Maximum size of a single RPC message that we will accept (8 MB). */
35 #define MAX_RPC_MSGSIZE (8 << 20)
36
37 struct rpc_reply {
38     uint32_t xid;
39     uint32_t type;
40     uint32_t stat;
41     uint32_t verf_flavor;
42     uint32_t verf_len;
43     uint32_t accept_stat;
44 };
45
46 struct rpc_fail_reply {
47     uint32_t xid;
48     uint32_t type;
49     uint32_t stat;
50     uint32_t verf_flavor;
51     uint32_t verf_len;
52     uint32_t accept_stat;
53 };
54
55 struct rpc_call_header {
56     uint32_t xid;
57     uint32_t mtype;
58     uint32_t rpcvers;
59     uint32_t prog;
60     uint32_t vers;
61     uint32_t proc;
62 };
63
64 struct rpc_auth {
65     uint32_t flavor;
66     uint32_t len;
67 };
68
69 typedef struct {
70     GIOChannel *channel;
71
72     /* The reassembled message, thus far. */
73     GString *msgbuf;
74
75     /* Remaining number of bytes in this message fragment; 0 if we next expect
76      * another fragment header. */
77     uint32_t frag_len;
78
79     /* If frag_len is zero: the number of bytes of the fragment header that
80      * have been read so far. */
81     int frag_hdr_bytes;
82 } NFSConnection;
83
84 static GMainLoop *main_loop;
85
86 static void do_write(NFSConnection *conn, const char *buf, size_t len)
87 {
88     while (len > 0) {
89         gsize written = 0;
90         switch (g_io_channel_write_chars(conn->channel, buf, len,
91                                          &written, NULL)) {
92         case G_IO_STATUS_ERROR:
93         case G_IO_STATUS_EOF:
94         case G_IO_STATUS_AGAIN:
95             fprintf(stderr, "Error writing to socket!\n");
96             return;
97         case G_IO_STATUS_NORMAL:
98             len -= written;
99             buf += written;
100             break;
101         }
102     }
103 }
104
105 static void send_rpc(NFSConnection *nfs, int proc, GString *msg)
106 {
107     static int xid_count = 0;
108     struct rpc_call_header header;
109     struct rpc_auth auth;
110
111     header.xid = GUINT32_TO_BE(xid_count++);
112     header.mtype = 0;
113     header.rpcvers = GUINT32_TO_BE(2);
114     header.prog = GUINT32_TO_BE(NFS_PROGRAM);
115     header.vers = GUINT32_TO_BE(NFS_V3);
116     header.proc = GUINT32_TO_BE(proc);
117
118     auth.flavor = GUINT32_TO_BE(AUTH_NULL);
119     auth.len = 0;
120
121     uint32_t fragment = htonl(0x80000000
122                               | (sizeof(header) + 2*sizeof(auth) + msg->len));
123     do_write(nfs, (const char *)&fragment, sizeof(fragment));
124     do_write(nfs, (const char *)&header, sizeof(header));
125     do_write(nfs, (const char *)&auth, sizeof(auth));
126     do_write(nfs, (const char *)&auth, sizeof(auth));
127     do_write(nfs, msg->str, msg->len);
128     g_io_channel_flush(nfs->channel, NULL);
129 }
130
131 static gboolean read_handler(GIOChannel *channel,
132                              GIOCondition condition,
133                              gpointer data)
134 {
135     NFSConnection *nfs = (NFSConnection *)data;
136
137     gsize bytes_to_read = 0;    /* Number of bytes to attempt to read. */
138
139     /* If we have not yet read in the fragment header, do that first.  This is
140      * 4 bytes that indicates the number of bytes in the message to follow
141      * (with the high bit set if this is the last fragment making up the
142      * message). */
143     if (nfs->frag_len == 0) {
144         bytes_to_read = 4 - nfs->frag_hdr_bytes;
145     } else {
146         bytes_to_read = nfs->frag_len & 0x7fffffff;
147     }
148
149     if (bytes_to_read > MAX_RPC_MSGSIZE
150         || nfs->msgbuf->len + bytes_to_read > MAX_RPC_MSGSIZE)
151     {
152         fprintf(stderr, "Excessive fragment size for RPC: %zd bytes\n",
153                 bytes_to_read);
154         g_io_channel_shutdown(nfs->channel, TRUE, NULL);
155         return FALSE;
156     }
157
158     gsize bytes_read = 0;
159     g_string_set_size(nfs->msgbuf, nfs->msgbuf->len + bytes_to_read);
160     char *buf = &nfs->msgbuf->str[nfs->msgbuf->len - bytes_to_read];
161     switch (g_io_channel_read_chars(nfs->channel, buf,
162                                     bytes_to_read, &bytes_read, NULL)) {
163     case G_IO_STATUS_NORMAL:
164         break;
165     case G_IO_STATUS_AGAIN:
166         return TRUE;
167     case G_IO_STATUS_EOF:
168         if (bytes_read == bytes_to_read)
169             break;
170         /* else fall through */
171     case G_IO_STATUS_ERROR:
172         fprintf(stderr, "Unexpected error or end of file on RPC stream %d!\n",
173                 g_io_channel_unix_get_fd(nfs->channel));
174         g_io_channel_shutdown(nfs->channel, TRUE, NULL);
175         /* TODO: Clean up connection object. */
176         return FALSE;
177     }
178
179     g_assert(bytes_read >= 0 && bytes_read <= bytes_to_read);
180
181     g_string_set_size(nfs->msgbuf,
182                       nfs->msgbuf->len - (bytes_to_read - bytes_read));
183
184     if (nfs->frag_len == 0) {
185         /* Handle reading in the fragment header.  If we've read the complete
186          * header, store the fragment size. */
187         nfs->frag_hdr_bytes += bytes_read;
188         if (nfs->frag_hdr_bytes == 4) {
189             memcpy((char *)&nfs->frag_len,
190                    &nfs->msgbuf->str[nfs->msgbuf->len - 4], 4);
191             nfs->frag_len = ntohl(nfs->frag_len);
192             g_string_set_size(nfs->msgbuf, nfs->msgbuf->len - 4);
193             nfs->frag_hdr_bytes = 0;
194         }
195     } else {
196         /* We were reading in the fragment body. */
197         nfs->frag_len -= bytes_read;
198
199         if (nfs->frag_len = 0x80000000) {
200             g_print("Got a message of size %zd\n", nfs->msgbuf->len);
201             nfs->frag_len = 0;
202             g_string_set_size(nfs->msgbuf, 0);
203         }
204     }
205
206     return TRUE;
207 }
208
209 static gboolean idle_handler(gpointer data)
210 {
211     NFSConnection *nfs = (NFSConnection *)data;
212     int i;
213
214     g_print("Sending requests...\n");
215     for (i = 0; i < 8; i++) {
216         char buf[64];
217         struct diropargs3 lookup;
218         uint64_t rootfh = GUINT64_TO_BE(1);
219
220         sprintf(buf, "file-%d", i + 1);
221         lookup.dir.data.data_len = 8;
222         lookup.dir.data.data_val = (char *)&rootfh;
223         lookup.name = buf;
224
225         GString *str = g_string_new("");
226         XDR xdr;
227         xdr_string_create(&xdr, str, XDR_ENCODE);
228         xdr_diropargs3(&xdr, &lookup);
229         send_rpc(nfs, NFSPROC3_LOOKUP, str);
230         g_string_free(str, TRUE);
231     }
232
233     return FALSE;
234 }
235
236 NFSConnection *nfs_connect(const char *hostname)
237 {
238     int result;
239     struct addrinfo hints;
240     struct addrinfo *ai = NULL;
241
242     int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
243     if (fd < 0) {
244         fprintf(stderr, "Unable to create NFS TCP socket: %m\n");
245         exit(1);
246     }
247
248     memset(&hints, 0, sizeof(hints));
249     hints.ai_family = AF_INET;
250     hints.ai_socktype = SOCK_STREAM;
251     hints.ai_protocol = IPPROTO_TCP;
252     result = getaddrinfo(hostname, "2051", NULL, &ai);
253     if (result < 0 || ai == NULL) {
254         fprintf(stderr, "Hostname lookup failure for %s: %s\n",
255                 hostname, gai_strerror(result));
256         exit(1);
257     }
258
259     if (connect(fd, ai->ai_addr, ai->ai_addrlen) < 0) {
260         fprintf(stderr, "Unable to connect to : %m\n");
261     }
262
263     freeaddrinfo(ai);
264
265     NFSConnection *conn = g_new0(NFSConnection, 1);
266     conn->msgbuf = g_string_new("");
267
268     conn->channel = g_io_channel_unix_new(fd);
269     g_io_channel_set_encoding(conn->channel, NULL, NULL);
270     g_io_channel_set_flags(conn->channel, G_IO_FLAG_NONBLOCK, NULL);
271     g_io_add_watch(conn->channel, G_IO_IN, read_handler, conn);
272
273     g_idle_add(idle_handler, conn);
274
275     return conn;
276 }
277
278 int main(int argc, char *argv[])
279 {
280     g_thread_init(NULL);
281     g_set_prgname("synclient");
282     g_print("Launching synthetic NFS RPC client...\n");
283
284     main_loop = g_main_loop_new(NULL, FALSE);
285     nfs_connect("niniel.sysnet.ucsd.edu");
286
287     g_main_loop_run(main_loop);
288
289     return 0;
290 }