Update synthetic RPC client to record timing.
[bluesky.git] / nfs3 / synclient.c
1 /* Blue Sky: File Systems in the Cloud
2  *
3  * Copyright (C) 2009  The Regents of the University of California
4  * Written by Michael Vrable <mvrable@cs.ucsd.edu>
5  *
6  * TODO: Licensing
7  */
8
9 /* Synthetic client for benchmarking: a tool for directly generating NFS
10  * requests and reading back the responses, so that we can exercise the server
11  * differently than the Linux kernel NFS client does.
12  *
13  * Much of this is copied from rpc.c and other BlueSky server code but is
14  * designed to run independently of BlueSky. */
15
16 #include "mount_prot.h"
17 #include "nfs3_prot.h"
18 #include <stdio.h>
19 #include <stdlib.h>
20 #include <inttypes.h>
21 #include <rpc/pmap_clnt.h>
22 #include <string.h>
23 #include <signal.h>
24 #include <memory.h>
25 #include <netdb.h>
26 #include <sys/socket.h>
27 #include <sys/types.h>
28 #include <netinet/in.h>
29 #include <netinet/ip.h>
30 #include <glib.h>
31
32 /* TCP port number to use for NFS protocol.  (Should be 2049.) */
33 #define NFS_SERVICE_PORT 2051
34
35 /* Maximum size of a single RPC message that we will accept (8 MB). */
36 #define MAX_RPC_MSGSIZE (8 << 20)
37
38 struct rpc_reply {
39     uint32_t xid;
40     uint32_t type;
41     uint32_t stat;
42     uint32_t verf_flavor;
43     uint32_t verf_len;
44     uint32_t accept_stat;
45 };
46
47 struct rpc_fail_reply {
48     uint32_t xid;
49     uint32_t type;
50     uint32_t stat;
51     uint32_t verf_flavor;
52     uint32_t verf_len;
53     uint32_t accept_stat;
54 };
55
56 struct rpc_call_header {
57     uint32_t xid;
58     uint32_t mtype;
59     uint32_t rpcvers;
60     uint32_t prog;
61     uint32_t vers;
62     uint32_t proc;
63 };
64
65 struct rpc_auth {
66     uint32_t flavor;
67     uint32_t len;
68 };
69
70 typedef struct {
71     GIOChannel *channel;
72
73     /* The reassembled message, thus far. */
74     GString *msgbuf;
75
76     /* Remaining number of bytes in this message fragment; 0 if we next expect
77      * another fragment header. */
78     uint32_t frag_len;
79
80     /* If frag_len is zero: the number of bytes of the fragment header that
81      * have been read so far. */
82     int frag_hdr_bytes;
83
84     /* Mapping of XID values to outstanding RPC calls. */
85     GHashTable *xid_table;
86 } NFSConnection;
87
88 typedef struct {
89     GFunc callback;
90     gpointer user_data;
91     int64_t start, end;
92 } CallInfo;
93
94 static GMainLoop *main_loop;
95
96 int64_t now_hires()
97 {
98     struct timespec time;
99
100     if (clock_gettime(CLOCK_REALTIME, &time) != 0) {
101         perror("clock_gettime");
102         return 0;
103     }
104
105     return (int64_t)(time.tv_sec) * 1000000000 + time.tv_nsec;
106 }
107
108 static void do_write(NFSConnection *conn, const char *buf, size_t len)
109 {
110     while (len > 0) {
111         gsize written = 0;
112         switch (g_io_channel_write_chars(conn->channel, buf, len,
113                                          &written, NULL)) {
114         case G_IO_STATUS_ERROR:
115         case G_IO_STATUS_EOF:
116         case G_IO_STATUS_AGAIN:
117             fprintf(stderr, "Error writing to socket!\n");
118             return;
119         case G_IO_STATUS_NORMAL:
120             len -= written;
121             buf += written;
122             break;
123         }
124     }
125 }
126
127 static void send_rpc(NFSConnection *nfs, int proc, GString *msg,
128                      GFunc completion_handler, gpointer user_data)
129 {
130     static int xid_count = 0;
131     struct rpc_call_header header;
132     struct rpc_auth auth;
133
134     header.xid = GUINT32_TO_BE(xid_count++);
135     header.mtype = 0;
136     header.rpcvers = GUINT32_TO_BE(2);
137     header.prog = GUINT32_TO_BE(NFS_PROGRAM);
138     header.vers = GUINT32_TO_BE(NFS_V3);
139     header.proc = GUINT32_TO_BE(proc);
140
141     auth.flavor = GUINT32_TO_BE(AUTH_NULL);
142     auth.len = 0;
143
144     CallInfo *info = g_new0(CallInfo, 1);
145
146     uint32_t fragment = htonl(0x80000000
147                               | (sizeof(header) + 2*sizeof(auth) + msg->len));
148     do_write(nfs, (const char *)&fragment, sizeof(fragment));
149     do_write(nfs, (const char *)&header, sizeof(header));
150     do_write(nfs, (const char *)&auth, sizeof(auth));
151     do_write(nfs, (const char *)&auth, sizeof(auth));
152     do_write(nfs, msg->str, msg->len);
153     g_io_channel_flush(nfs->channel, NULL);
154
155     info->start = now_hires();
156     info->callback = completion_handler;
157     info->user_data = user_data;
158     g_hash_table_insert(nfs->xid_table,
159                         GINT_TO_POINTER(GUINT32_FROM_BE(header.xid)), info);
160 }
161
162 static void process_reply(NFSConnection *nfs, GString *msg)
163 {
164     struct rpc_reply *reply = (struct rpc_reply *)msg->str;
165
166     uint32_t xid = GUINT32_FROM_BE(reply->xid);
167
168     gpointer key = GINT_TO_POINTER(GUINT32_FROM_BE(reply->xid));
169     CallInfo *info = g_hash_table_lookup(nfs->xid_table, key);
170     if (info == NULL) {
171         g_print("Could not match reply XID %d with a call!\n", xid);
172         return;
173     }
174
175     info->end = now_hires();
176     g_print("Call(XID = %d) duration: %"PRIi64" ns\n",
177             xid, info->end - info->start);
178     if (info->callback != NULL)
179         info->callback(nfs, info->user_data);
180
181     g_hash_table_remove(nfs->xid_table, key);
182     g_free(info);
183 }
184
185 static gboolean read_handler(GIOChannel *channel,
186                              GIOCondition condition,
187                              gpointer data)
188 {
189     NFSConnection *nfs = (NFSConnection *)data;
190
191     gsize bytes_to_read = 0;    /* Number of bytes to attempt to read. */
192
193     /* If we have not yet read in the fragment header, do that first.  This is
194      * 4 bytes that indicates the number of bytes in the message to follow
195      * (with the high bit set if this is the last fragment making up the
196      * message). */
197     if (nfs->frag_len == 0) {
198         bytes_to_read = 4 - nfs->frag_hdr_bytes;
199     } else {
200         bytes_to_read = nfs->frag_len & 0x7fffffff;
201     }
202
203     if (bytes_to_read > MAX_RPC_MSGSIZE
204         || nfs->msgbuf->len + bytes_to_read > MAX_RPC_MSGSIZE)
205     {
206         fprintf(stderr, "Excessive fragment size for RPC: %zd bytes\n",
207                 bytes_to_read);
208         g_io_channel_shutdown(nfs->channel, TRUE, NULL);
209         return FALSE;
210     }
211
212     gsize bytes_read = 0;
213     g_string_set_size(nfs->msgbuf, nfs->msgbuf->len + bytes_to_read);
214     char *buf = &nfs->msgbuf->str[nfs->msgbuf->len - bytes_to_read];
215     switch (g_io_channel_read_chars(nfs->channel, buf,
216                                     bytes_to_read, &bytes_read, NULL)) {
217     case G_IO_STATUS_NORMAL:
218         break;
219     case G_IO_STATUS_AGAIN:
220         return TRUE;
221     case G_IO_STATUS_EOF:
222         if (bytes_read == bytes_to_read)
223             break;
224         /* else fall through */
225     case G_IO_STATUS_ERROR:
226         fprintf(stderr, "Unexpected error or end of file on RPC stream %d!\n",
227                 g_io_channel_unix_get_fd(nfs->channel));
228         g_io_channel_shutdown(nfs->channel, TRUE, NULL);
229         /* TODO: Clean up connection object. */
230         return FALSE;
231     }
232
233     g_assert(bytes_read >= 0 && bytes_read <= bytes_to_read);
234
235     g_string_set_size(nfs->msgbuf,
236                       nfs->msgbuf->len - (bytes_to_read - bytes_read));
237
238     if (nfs->frag_len == 0) {
239         /* Handle reading in the fragment header.  If we've read the complete
240          * header, store the fragment size. */
241         nfs->frag_hdr_bytes += bytes_read;
242         if (nfs->frag_hdr_bytes == 4) {
243             memcpy((char *)&nfs->frag_len,
244                    &nfs->msgbuf->str[nfs->msgbuf->len - 4], 4);
245             nfs->frag_len = ntohl(nfs->frag_len);
246             g_string_set_size(nfs->msgbuf, nfs->msgbuf->len - 4);
247             nfs->frag_hdr_bytes = 0;
248         }
249     } else {
250         /* We were reading in the fragment body. */
251         nfs->frag_len -= bytes_read;
252
253         if (nfs->frag_len = 0x80000000) {
254             process_reply(nfs, nfs->msgbuf);
255             nfs->frag_len = 0;
256             g_string_set_size(nfs->msgbuf, 0);
257         }
258     }
259
260     return TRUE;
261 }
262
263 static gboolean idle_handler(gpointer data)
264 {
265     NFSConnection *nfs = (NFSConnection *)data;
266     int i;
267
268     g_print("Sending requests...\n");
269     for (i = 0; i < 8; i++) {
270         char buf[64];
271         struct diropargs3 lookup;
272         uint64_t rootfh = GUINT64_TO_BE(1);
273
274         sprintf(buf, "file-%d", i + 1);
275         lookup.dir.data.data_len = 8;
276         lookup.dir.data.data_val = (char *)&rootfh;
277         lookup.name = buf;
278
279         GString *str = g_string_new("");
280         XDR xdr;
281         xdr_string_create(&xdr, str, XDR_ENCODE);
282         xdr_diropargs3(&xdr, &lookup);
283         send_rpc(nfs, NFSPROC3_LOOKUP, str, NULL, NULL);
284         g_string_free(str, TRUE);
285     }
286
287     return FALSE;
288 }
289
290 NFSConnection *nfs_connect(const char *hostname)
291 {
292     int result;
293     struct addrinfo hints;
294     struct addrinfo *ai = NULL;
295
296     int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
297     if (fd < 0) {
298         fprintf(stderr, "Unable to create NFS TCP socket: %m\n");
299         exit(1);
300     }
301
302     memset(&hints, 0, sizeof(hints));
303     hints.ai_family = AF_INET;
304     hints.ai_socktype = SOCK_STREAM;
305     hints.ai_protocol = IPPROTO_TCP;
306     result = getaddrinfo(hostname, "2051", NULL, &ai);
307     if (result < 0 || ai == NULL) {
308         fprintf(stderr, "Hostname lookup failure for %s: %s\n",
309                 hostname, gai_strerror(result));
310         exit(1);
311     }
312
313     if (connect(fd, ai->ai_addr, ai->ai_addrlen) < 0) {
314         fprintf(stderr, "Unable to connect to : %m\n");
315     }
316
317     freeaddrinfo(ai);
318
319     NFSConnection *conn = g_new0(NFSConnection, 1);
320     conn->msgbuf = g_string_new("");
321     conn->xid_table = g_hash_table_new(g_direct_hash, g_direct_equal);
322
323     conn->channel = g_io_channel_unix_new(fd);
324     g_io_channel_set_encoding(conn->channel, NULL, NULL);
325     g_io_channel_set_flags(conn->channel, G_IO_FLAG_NONBLOCK, NULL);
326     g_io_add_watch(conn->channel, G_IO_IN, read_handler, conn);
327
328     g_idle_add(idle_handler, conn);
329
330     return conn;
331 }
332
333 int main(int argc, char *argv[])
334 {
335     g_thread_init(NULL);
336     g_set_prgname("synclient");
337     g_print("Launching synthetic NFS RPC client...\n");
338
339     main_loop = g_main_loop_new(NULL, FALSE);
340     nfs_connect("niniel.sysnet.ucsd.edu");
341
342     g_main_loop_run(main_loop);
343
344     return 0;
345 }