1 /* Blue Sky: File Systems in the Cloud
3 * Copyright (C) 2009 The Regents of the University of California
4 * Written by Michael Vrable <mvrable@cs.ucsd.edu>
9 /* Synthetic client for benchmarking: a tool for directly generating NFS
10 * requests and reading back the responses, so that we can exercise the server
11 * differently than the Linux kernel NFS client does.
13 * Much of this is copied from rpc.c and other BlueSky server code but is
14 * designed to run independently of BlueSky. */
16 #include "mount_prot.h"
17 #include "nfs3_prot.h"
21 #include <rpc/pmap_clnt.h>
26 #include <sys/socket.h>
27 #include <sys/types.h>
28 #include <netinet/in.h>
29 #include <netinet/ip.h>
32 /* TCP port number to use for NFS protocol. (Should be 2049.) */
33 #define NFS_SERVICE_PORT 2051
35 /* Maximum size of a single RPC message that we will accept (8 MB). */
36 #define MAX_RPC_MSGSIZE (8 << 20)
47 struct rpc_fail_reply {
56 struct rpc_call_header {
73 /* The reassembled message, thus far. */
76 /* Remaining number of bytes in this message fragment; 0 if we next expect
77 * another fragment header. */
80 /* If frag_len is zero: the number of bytes of the fragment header that
81 * have been read so far. */
84 /* Mapping of XID values to outstanding RPC calls. */
85 GHashTable *xid_table;
94 static GMainLoop *main_loop;
100 if (clock_gettime(CLOCK_REALTIME, &time) != 0) {
101 perror("clock_gettime");
105 return (int64_t)(time.tv_sec) * 1000000000 + time.tv_nsec;
108 static void do_write(NFSConnection *conn, const char *buf, size_t len)
112 switch (g_io_channel_write_chars(conn->channel, buf, len,
114 case G_IO_STATUS_ERROR:
115 case G_IO_STATUS_EOF:
116 case G_IO_STATUS_AGAIN:
117 fprintf(stderr, "Error writing to socket!\n");
119 case G_IO_STATUS_NORMAL:
127 static void send_rpc(NFSConnection *nfs, int proc, GString *msg,
128 GFunc completion_handler, gpointer user_data)
130 static int xid_count = 0;
131 struct rpc_call_header header;
132 struct rpc_auth auth;
134 header.xid = GUINT32_TO_BE(xid_count++);
136 header.rpcvers = GUINT32_TO_BE(2);
137 header.prog = GUINT32_TO_BE(NFS_PROGRAM);
138 header.vers = GUINT32_TO_BE(NFS_V3);
139 header.proc = GUINT32_TO_BE(proc);
141 auth.flavor = GUINT32_TO_BE(AUTH_NULL);
144 CallInfo *info = g_new0(CallInfo, 1);
146 uint32_t fragment = htonl(0x80000000
147 | (sizeof(header) + 2*sizeof(auth) + msg->len));
148 do_write(nfs, (const char *)&fragment, sizeof(fragment));
149 do_write(nfs, (const char *)&header, sizeof(header));
150 do_write(nfs, (const char *)&auth, sizeof(auth));
151 do_write(nfs, (const char *)&auth, sizeof(auth));
152 do_write(nfs, msg->str, msg->len);
153 g_io_channel_flush(nfs->channel, NULL);
155 info->start = now_hires();
156 info->callback = completion_handler;
157 info->user_data = user_data;
158 g_hash_table_insert(nfs->xid_table,
159 GINT_TO_POINTER(GUINT32_FROM_BE(header.xid)), info);
162 static void process_reply(NFSConnection *nfs, GString *msg)
164 struct rpc_reply *reply = (struct rpc_reply *)msg->str;
166 uint32_t xid = GUINT32_FROM_BE(reply->xid);
168 gpointer key = GINT_TO_POINTER(GUINT32_FROM_BE(reply->xid));
169 CallInfo *info = g_hash_table_lookup(nfs->xid_table, key);
171 g_print("Could not match reply XID %d with a call!\n", xid);
175 info->end = now_hires();
176 g_print("Call(XID = %d) duration: %"PRIi64" ns\n",
177 xid, info->end - info->start);
178 if (info->callback != NULL)
179 info->callback(nfs, info->user_data);
181 g_hash_table_remove(nfs->xid_table, key);
185 static gboolean read_handler(GIOChannel *channel,
186 GIOCondition condition,
189 NFSConnection *nfs = (NFSConnection *)data;
191 gsize bytes_to_read = 0; /* Number of bytes to attempt to read. */
193 /* If we have not yet read in the fragment header, do that first. This is
194 * 4 bytes that indicates the number of bytes in the message to follow
195 * (with the high bit set if this is the last fragment making up the
197 if (nfs->frag_len == 0) {
198 bytes_to_read = 4 - nfs->frag_hdr_bytes;
200 bytes_to_read = nfs->frag_len & 0x7fffffff;
203 if (bytes_to_read > MAX_RPC_MSGSIZE
204 || nfs->msgbuf->len + bytes_to_read > MAX_RPC_MSGSIZE)
206 fprintf(stderr, "Excessive fragment size for RPC: %zd bytes\n",
208 g_io_channel_shutdown(nfs->channel, TRUE, NULL);
212 gsize bytes_read = 0;
213 g_string_set_size(nfs->msgbuf, nfs->msgbuf->len + bytes_to_read);
214 char *buf = &nfs->msgbuf->str[nfs->msgbuf->len - bytes_to_read];
215 switch (g_io_channel_read_chars(nfs->channel, buf,
216 bytes_to_read, &bytes_read, NULL)) {
217 case G_IO_STATUS_NORMAL:
219 case G_IO_STATUS_AGAIN:
221 case G_IO_STATUS_EOF:
222 if (bytes_read == bytes_to_read)
224 /* else fall through */
225 case G_IO_STATUS_ERROR:
226 fprintf(stderr, "Unexpected error or end of file on RPC stream %d!\n",
227 g_io_channel_unix_get_fd(nfs->channel));
228 g_io_channel_shutdown(nfs->channel, TRUE, NULL);
229 /* TODO: Clean up connection object. */
233 g_assert(bytes_read >= 0 && bytes_read <= bytes_to_read);
235 g_string_set_size(nfs->msgbuf,
236 nfs->msgbuf->len - (bytes_to_read - bytes_read));
238 if (nfs->frag_len == 0) {
239 /* Handle reading in the fragment header. If we've read the complete
240 * header, store the fragment size. */
241 nfs->frag_hdr_bytes += bytes_read;
242 if (nfs->frag_hdr_bytes == 4) {
243 memcpy((char *)&nfs->frag_len,
244 &nfs->msgbuf->str[nfs->msgbuf->len - 4], 4);
245 nfs->frag_len = ntohl(nfs->frag_len);
246 g_string_set_size(nfs->msgbuf, nfs->msgbuf->len - 4);
247 nfs->frag_hdr_bytes = 0;
250 /* We were reading in the fragment body. */
251 nfs->frag_len -= bytes_read;
253 if (nfs->frag_len = 0x80000000) {
254 process_reply(nfs, nfs->msgbuf);
256 g_string_set_size(nfs->msgbuf, 0);
263 static gboolean idle_handler(gpointer data)
265 NFSConnection *nfs = (NFSConnection *)data;
268 g_print("Sending requests...\n");
269 for (i = 0; i < 64; i++) {
271 struct diropargs3 lookup;
272 uint64_t rootfh = GUINT64_TO_BE(1);
274 sprintf(buf, "file-%d", i + 1);
275 lookup.dir.data.data_len = 8;
276 lookup.dir.data.data_val = (char *)&rootfh;
279 GString *str = g_string_new("");
281 xdr_string_create(&xdr, str, XDR_ENCODE);
282 xdr_diropargs3(&xdr, &lookup);
283 send_rpc(nfs, NFSPROC3_LOOKUP, str, NULL, NULL);
284 g_string_free(str, TRUE);
290 NFSConnection *nfs_connect(const char *hostname)
293 struct addrinfo hints;
294 struct addrinfo *ai = NULL;
296 int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
298 fprintf(stderr, "Unable to create NFS TCP socket: %m\n");
302 memset(&hints, 0, sizeof(hints));
303 hints.ai_family = AF_INET;
304 hints.ai_socktype = SOCK_STREAM;
305 hints.ai_protocol = IPPROTO_TCP;
306 result = getaddrinfo(hostname, "2051", NULL, &ai);
307 if (result < 0 || ai == NULL) {
308 fprintf(stderr, "Hostname lookup failure for %s: %s\n",
309 hostname, gai_strerror(result));
313 if (connect(fd, ai->ai_addr, ai->ai_addrlen) < 0) {
314 fprintf(stderr, "Unable to connect to : %m\n");
319 NFSConnection *conn = g_new0(NFSConnection, 1);
320 conn->msgbuf = g_string_new("");
321 conn->xid_table = g_hash_table_new(g_direct_hash, g_direct_equal);
323 conn->channel = g_io_channel_unix_new(fd);
324 g_io_channel_set_encoding(conn->channel, NULL, NULL);
325 g_io_channel_set_flags(conn->channel, G_IO_FLAG_NONBLOCK, NULL);
326 g_io_add_watch(conn->channel, G_IO_IN, read_handler, conn);
328 g_idle_add(idle_handler, conn);
333 int main(int argc, char *argv[])
336 g_set_prgname("synclient");
337 g_print("Launching synthetic NFS RPC client...\n");
339 main_loop = g_main_loop_new(NULL, FALSE);
340 nfs_connect("niniel.sysnet.ucsd.edu");
342 g_main_loop_run(main_loop);