Add proper per-file copyright notices/licenses and top-level license.
[bluesky.git] / nfs3 / synclient.c
1 /* Blue Sky: File Systems in the Cloud
2  *
3  * Copyright (C) 2009  The Regents of the University of California
4  * Written by Michael Vrable <mvrable@cs.ucsd.edu>
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions
8  * are met:
9  * 1. Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  * 2. Redistributions in binary form must reproduce the above copyright
12  *    notice, this list of conditions and the following disclaimer in the
13  *    documentation and/or other materials provided with the distribution.
14  * 3. Neither the name of the University nor the names of its contributors
15  *    may be used to endorse or promote products derived from this software
16  *    without specific prior written permission.
17  *
18  * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
19  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21  * ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
22  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
24  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
25  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
27  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
28  * SUCH DAMAGE.
29  */
30
31 /* Synthetic client for benchmarking: a tool for directly generating NFS
32  * requests and reading back the responses, so that we can exercise the server
33  * differently than the Linux kernel NFS client does.
34  *
35  * Much of this is copied from rpc.c and other BlueSky server code but is
36  * designed to run independently of BlueSky. */
37
38 #include "mount_prot.h"
39 #include "nfs3_prot.h"
40 #include <stdio.h>
41 #include <stdlib.h>
42 #include <inttypes.h>
43 #include <rpc/pmap_clnt.h>
44 #include <string.h>
45 #include <signal.h>
46 #include <memory.h>
47 #include <netdb.h>
48 #include <sys/socket.h>
49 #include <sys/types.h>
50 #include <netinet/in.h>
51 #include <netinet/ip.h>
52 #include <glib.h>
53
54 /* TCP port number to use for NFS protocol.  (Should be 2049.) */
55 #define NFS_SERVICE_PORT 2051
56
57 /* Maximum size of a single RPC message that we will accept (8 MB). */
58 #define MAX_RPC_MSGSIZE (8 << 20)
59
60 int threads;
61 int completed = 0;
62
63 struct rpc_reply {
64     uint32_t xid;
65     uint32_t type;
66     uint32_t stat;
67     uint32_t verf_flavor;
68     uint32_t verf_len;
69     uint32_t accept_stat;
70 };
71
72 struct rpc_fail_reply {
73     uint32_t xid;
74     uint32_t type;
75     uint32_t stat;
76     uint32_t verf_flavor;
77     uint32_t verf_len;
78     uint32_t accept_stat;
79 };
80
81 struct rpc_call_header {
82     uint32_t xid;
83     uint32_t mtype;
84     uint32_t rpcvers;
85     uint32_t prog;
86     uint32_t vers;
87     uint32_t proc;
88 };
89
90 struct rpc_auth {
91     uint32_t flavor;
92     uint32_t len;
93 };
94
95 typedef struct {
96     GIOChannel *channel;
97
98     /* The reassembled message, thus far. */
99     GString *msgbuf;
100
101     /* Remaining number of bytes in this message fragment; 0 if we next expect
102      * another fragment header. */
103     uint32_t frag_len;
104
105     /* If frag_len is zero: the number of bytes of the fragment header that
106      * have been read so far. */
107     int frag_hdr_bytes;
108
109     /* Mapping of XID values to outstanding RPC calls. */
110     GHashTable *xid_table;
111 } NFSConnection;
112
113
114 typedef void (*NFSFunc)(NFSConnection *nfs,
115                         gpointer user_data, const char *reply, size_t len);
116
117 typedef struct {
118     NFSFunc callback;
119     gpointer user_data;
120     int64_t start, end;
121 } CallInfo;
122
123 static GMainLoop *main_loop;
124
125 int64_t now_hires()
126 {
127     struct timespec time;
128
129     if (clock_gettime(CLOCK_REALTIME, &time) != 0) {
130         perror("clock_gettime");
131         return 0;
132     }
133
134     return (int64_t)(time.tv_sec) * 1000000000 + time.tv_nsec;
135 }
136
137 static void do_write(NFSConnection *conn, const char *buf, size_t len)
138 {
139     while (len > 0) {
140         gsize written = 0;
141         switch (g_io_channel_write_chars(conn->channel, buf, len,
142                                          &written, NULL)) {
143         case G_IO_STATUS_ERROR:
144         case G_IO_STATUS_EOF:
145         case G_IO_STATUS_AGAIN:
146             fprintf(stderr, "Error writing to socket!\n");
147             return;
148         case G_IO_STATUS_NORMAL:
149             len -= written;
150             buf += written;
151             break;
152         }
153     }
154 }
155
156 static void send_rpc(NFSConnection *nfs, int proc, GString *msg,
157                      NFSFunc completion_handler, gpointer user_data)
158 {
159     static int xid_count = 0;
160     struct rpc_call_header header;
161     struct rpc_auth auth;
162
163     header.xid = GUINT32_TO_BE(xid_count++);
164     header.mtype = 0;
165     header.rpcvers = GUINT32_TO_BE(2);
166     header.prog = GUINT32_TO_BE(NFS_PROGRAM);
167     header.vers = GUINT32_TO_BE(NFS_V3);
168     header.proc = GUINT32_TO_BE(proc);
169
170     auth.flavor = GUINT32_TO_BE(AUTH_NULL);
171     auth.len = 0;
172
173     CallInfo *info = g_new0(CallInfo, 1);
174
175     uint32_t fragment = htonl(0x80000000
176                               | (sizeof(header) + 2*sizeof(auth) + msg->len));
177     do_write(nfs, (const char *)&fragment, sizeof(fragment));
178     do_write(nfs, (const char *)&header, sizeof(header));
179     do_write(nfs, (const char *)&auth, sizeof(auth));
180     do_write(nfs, (const char *)&auth, sizeof(auth));
181     do_write(nfs, msg->str, msg->len);
182     g_io_channel_flush(nfs->channel, NULL);
183
184     info->start = now_hires();
185     info->callback = completion_handler;
186     info->user_data = user_data;
187     g_hash_table_insert(nfs->xid_table,
188                         GINT_TO_POINTER(GUINT32_FROM_BE(header.xid)), info);
189 }
190
191 static void process_reply(NFSConnection *nfs, GString *msg)
192 {
193     struct rpc_reply *reply = (struct rpc_reply *)msg->str;
194
195     uint32_t xid = GUINT32_FROM_BE(reply->xid);
196
197     gpointer key = GINT_TO_POINTER(GUINT32_FROM_BE(reply->xid));
198     CallInfo *info = g_hash_table_lookup(nfs->xid_table, key);
199     if (info == NULL) {
200         g_print("Could not match reply XID %d with a call!\n", xid);
201         return;
202     }
203
204     info->end = now_hires();
205     printf("XID %d: Time = %"PRIi64"\n", xid, info->end - info->start);
206     if (info->callback != NULL)
207         info->callback(nfs, info->user_data,
208                        msg->str + sizeof(*reply), msg->len - sizeof(*reply));
209
210     g_hash_table_remove(nfs->xid_table, key);
211     g_free(info);
212
213     completed++;
214     if (completed == 5 * threads) {
215         g_main_loop_quit(main_loop);
216     }
217 }
218
219 static gboolean read_handler(GIOChannel *channel,
220                              GIOCondition condition,
221                              gpointer data)
222 {
223     NFSConnection *nfs = (NFSConnection *)data;
224
225     gsize bytes_to_read = 0;    /* Number of bytes to attempt to read. */
226
227     /* If we have not yet read in the fragment header, do that first.  This is
228      * 4 bytes that indicates the number of bytes in the message to follow
229      * (with the high bit set if this is the last fragment making up the
230      * message). */
231     if (nfs->frag_len == 0) {
232         bytes_to_read = 4 - nfs->frag_hdr_bytes;
233     } else {
234         bytes_to_read = nfs->frag_len & 0x7fffffff;
235     }
236
237     if (bytes_to_read > MAX_RPC_MSGSIZE
238         || nfs->msgbuf->len + bytes_to_read > MAX_RPC_MSGSIZE)
239     {
240         fprintf(stderr, "Excessive fragment size for RPC: %zd bytes\n",
241                 bytes_to_read);
242         g_io_channel_shutdown(nfs->channel, TRUE, NULL);
243         return FALSE;
244     }
245
246     gsize bytes_read = 0;
247     g_string_set_size(nfs->msgbuf, nfs->msgbuf->len + bytes_to_read);
248     char *buf = &nfs->msgbuf->str[nfs->msgbuf->len - bytes_to_read];
249     switch (g_io_channel_read_chars(nfs->channel, buf,
250                                     bytes_to_read, &bytes_read, NULL)) {
251     case G_IO_STATUS_NORMAL:
252         break;
253     case G_IO_STATUS_AGAIN:
254         return TRUE;
255     case G_IO_STATUS_EOF:
256         if (bytes_read == bytes_to_read)
257             break;
258         /* else fall through */
259     case G_IO_STATUS_ERROR:
260         fprintf(stderr, "Unexpected error or end of file on RPC stream %d!\n",
261                 g_io_channel_unix_get_fd(nfs->channel));
262         g_io_channel_shutdown(nfs->channel, TRUE, NULL);
263         /* TODO: Clean up connection object. */
264         return FALSE;
265     }
266
267     g_assert(bytes_read >= 0 && bytes_read <= bytes_to_read);
268
269     g_string_set_size(nfs->msgbuf,
270                       nfs->msgbuf->len - (bytes_to_read - bytes_read));
271
272     if (nfs->frag_len == 0) {
273         /* Handle reading in the fragment header.  If we've read the complete
274          * header, store the fragment size. */
275         nfs->frag_hdr_bytes += bytes_read;
276         if (nfs->frag_hdr_bytes == 4) {
277             memcpy((char *)&nfs->frag_len,
278                    &nfs->msgbuf->str[nfs->msgbuf->len - 4], 4);
279             nfs->frag_len = ntohl(nfs->frag_len);
280             g_string_set_size(nfs->msgbuf, nfs->msgbuf->len - 4);
281             nfs->frag_hdr_bytes = 0;
282         }
283     } else {
284         /* We were reading in the fragment body. */
285         nfs->frag_len -= bytes_read;
286
287         if (nfs->frag_len == 0x80000000) {
288             process_reply(nfs, nfs->msgbuf);
289             nfs->frag_len = 0;
290             g_string_set_size(nfs->msgbuf, 0);
291         }
292     }
293
294     return TRUE;
295 }
296
297 static void send_read_requests(NFSConnection *nfs, const struct nfs_fh3 *fh)
298 {
299     int i;
300
301     g_print("Sending read requests...\n");
302     for (i = 0; i < 4; i++) {
303         char buf[64];
304         struct read3args read;
305         memcpy(&read.file, fh, sizeof(struct nfs_fh3));
306         read.offset = (1 << 20) * i;
307         read.count = (1 << 20);
308
309         GString *str = g_string_new("");
310         XDR xdr;
311         xdr_string_create(&xdr, str, XDR_ENCODE);
312         xdr_read3args(&xdr, &read);
313         send_rpc(nfs, NFSPROC3_READ, str, NULL, NULL);
314         g_string_free(str, TRUE);
315     }
316 }
317
318 static void store_fh(NFSConnection *nfs, gpointer user_data,
319                      const char *reply, size_t len)
320 {
321     struct lookup3res res;
322     XDR xdr;
323     memset(&res, 0, sizeof(res));
324     xdrmem_create(&xdr, (char *)reply, len, XDR_DECODE);
325     if (!xdr_lookup3res(&xdr, &res)) {
326         g_print("Decode error for lookup3res!\n");
327         return;
328     }
329     if (res.status != NFS3_OK) {
330         g_print("Response not NFS3_OK\n");
331         return;
332     }
333
334     struct nfs_fh3 *fh = g_new0(struct nfs_fh3, 1);
335     fh->data.data_len = res.lookup3res_u.resok.object.data.data_len;
336     fh->data.data_val = g_memdup(res.lookup3res_u.resok.object.data.data_val,
337                                  fh->data.data_len);
338
339     xdr.x_op = XDR_FREE;
340     xdr_lookup3res(&xdr, &res);
341
342     send_read_requests(nfs, fh);
343 }
344
345 static gboolean idle_handler(gpointer data)
346 {
347     NFSConnection *nfs = (NFSConnection *)data;
348     int i;
349
350     g_print("Sending requests...\n");
351     for (i = 0; i < threads; i++) {
352         char buf[64];
353         struct diropargs3 lookup;
354         uint64_t rootfh = GUINT64_TO_BE(1);
355
356         sprintf(buf, "file-%d", i + 1);
357         lookup.dir.data.data_len = 8;
358         lookup.dir.data.data_val = (char *)&rootfh;
359         lookup.name = buf;
360
361         GString *str = g_string_new("");
362         XDR xdr;
363         xdr_string_create(&xdr, str, XDR_ENCODE);
364         xdr_diropargs3(&xdr, &lookup);
365         send_rpc(nfs, NFSPROC3_LOOKUP, str, store_fh, NULL);
366         g_string_free(str, TRUE);
367     }
368
369     return FALSE;
370 }
371
372 NFSConnection *nfs_connect(const char *hostname)
373 {
374     int result;
375     struct addrinfo hints;
376     struct addrinfo *ai = NULL;
377
378     int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
379     if (fd < 0) {
380         fprintf(stderr, "Unable to create NFS TCP socket: %m\n");
381         exit(1);
382     }
383
384     memset(&hints, 0, sizeof(hints));
385     hints.ai_family = AF_INET;
386     hints.ai_socktype = SOCK_STREAM;
387     hints.ai_protocol = IPPROTO_TCP;
388     result = getaddrinfo(hostname, "2051", NULL, &ai);
389     if (result < 0 || ai == NULL) {
390         fprintf(stderr, "Hostname lookup failure for %s: %s\n",
391                 hostname, gai_strerror(result));
392         exit(1);
393     }
394
395     if (connect(fd, ai->ai_addr, ai->ai_addrlen) < 0) {
396         fprintf(stderr, "Unable to connect to : %m\n");
397     }
398
399     freeaddrinfo(ai);
400
401     NFSConnection *conn = g_new0(NFSConnection, 1);
402     conn->msgbuf = g_string_new("");
403     conn->xid_table = g_hash_table_new(g_direct_hash, g_direct_equal);
404
405     conn->channel = g_io_channel_unix_new(fd);
406     g_io_channel_set_encoding(conn->channel, NULL, NULL);
407     g_io_channel_set_flags(conn->channel, G_IO_FLAG_NONBLOCK, NULL);
408     g_io_add_watch(conn->channel, G_IO_IN, read_handler, conn);
409
410     g_idle_add(idle_handler, conn);
411
412     return conn;
413 }
414
415 int main(int argc, char *argv[])
416 {
417     g_thread_init(NULL);
418     g_set_prgname("synclient");
419     g_print("Launching synthetic NFS RPC client...\n");
420
421     threads = 8;
422     if (argc > 1)
423         threads = atoi(argv[1]);
424
425     main_loop = g_main_loop_new(NULL, FALSE);
426     nfs_connect("niniel.sysnet.ucsd.edu");
427
428     g_main_loop_run(main_loop);
429
430     return 0;
431 }