1 /* Blue Sky: File Systems in the Cloud
3 * Copyright (C) 2009 The Regents of the University of California
4 * Written by Michael Vrable <mvrable@cs.ucsd.edu>
9 /* Synthetic client for benchmarking: a tool for directly generating NFS
10 * requests and reading back the responses, so that we can exercise the server
11 * differently than the Linux kernel NFS client does.
13 * Much of this is copied from rpc.c and other BlueSky server code but is
14 * designed to run independently of BlueSky. */
16 #include "mount_prot.h"
17 #include "nfs3_prot.h"
21 #include <rpc/pmap_clnt.h>
26 #include <sys/socket.h>
27 #include <sys/types.h>
28 #include <netinet/in.h>
29 #include <netinet/ip.h>
32 /* TCP port number to use for NFS protocol. (Would be 2049 in standard NFS.) */
/* NOTE(review): nfs_connect passes the service to getaddrinfo as the literal
 * string 2051 instead of deriving it from this macro, so the two values have
 * to be changed together. */
33 #define NFS_SERVICE_PORT 2051
35 /* Maximum size of a single RPC message that we will accept (8 MB). */
/* read_handler shuts the channel down when a fragment, or the message
 * reassembled so far plus that fragment, would exceed this limit. */
36 #define MAX_RPC_MSGSIZE (8 << 20)
/* NOTE(review): presumably the wire layout of a rejected-call reply; no code
 * visible in this listing references the type, so confirm before relying on
 * that reading. */
57 struct rpc_fail_reply {
/* Fixed-size header written at the start of every outgoing call.  send_rpc
 * fills the xid, rpcvers, prog, vers and proc members, each converted to
 * big-endian with GUINT32_TO_BE, and writes the structure to the socket
 * byte-for-byte. */
66 struct rpc_call_header {
83 /* The reassembled message, thus far. */
86 /* Remaining number of bytes in this message fragment; 0 if we next expect
87 * another fragment header. */
90 /* If frag_len is zero: the number of bytes of the fragment header that
91 * have been read so far. */
94 /* Mapping of XID values to outstanding RPC calls. */
/* Keys are host-order XIDs packed with GINT_TO_POINTER and values are
 * CallInfo records; send_rpc inserts entries and process_reply looks them up
 * and removes them.  nfs_connect creates the table with g_direct_hash and
 * g_direct_equal and without destroy callbacks, so removing an entry does not
 * by itself free the CallInfo. */
95 GHashTable *xid_table;
/* Completion callback for an RPC issued through send_rpc.  process_reply
 * invokes it with the connection, the user_data that was registered with the
 * call, and a pointer/length pair covering the reply bytes that follow the
 * struct rpc_reply header. */
99 typedef void (*NFSFunc)(NFSConnection *nfs,
100 gpointer user_data, const char *reply, size_t len);
/* GLib main loop that drives the client.  main creates and runs it;
 * process_reply quits it once the completed counter equals 128 * threads. */
108 static GMainLoop *main_loop;
/* Reads CLOCK_REALTIME and converts the timespec to a single int64_t count
 * of nanoseconds (seconds * 1e9 + nanoseconds).  A clock_gettime failure is
 * reported with perror.
 * NOTE(review): the visible callers only subtract two readings to time an
 * RPC; CLOCK_REALTIME can be stepped by the system, so a monotonic clock may
 * be the better fit for that use - confirm nothing needs wall-clock time. */
112 struct timespec time;
114 if (clock_gettime(CLOCK_REALTIME, &time) != 0) {
115 perror("clock_gettime");
119 return (int64_t)(time.tv_sec) * 1000000000 + time.tv_nsec;
/* Write len bytes from buf to the connection channel using
 * g_io_channel_write_chars.
 *
 * conn: connection whose channel is written to.
 * buf:  bytes to send.
 * len:  number of bytes in buf.
 *
 * Any status other than G_IO_STATUS_NORMAL is reported on stderr as a write
 * error. */
122 static void do_write(NFSConnection *conn, const char *buf, size_t len)
126 switch (g_io_channel_write_chars(conn->channel, buf, len,
128 case G_IO_STATUS_ERROR:
129 case G_IO_STATUS_EOF:
/* NOTE(review): nfs_connect sets G_IO_FLAG_NONBLOCK on the channel, so
 * G_IO_STATUS_AGAIN can be a transient condition rather than a failure;
 * confirm that treating it as an error here is intended. */
130 case G_IO_STATUS_AGAIN:
131 fprintf(stderr, "Error writing to socket!\n");
133 case G_IO_STATUS_NORMAL:
/* Send one NFSv3 RPC call on the connection and register it so that the
 * reply can be matched later.
 *
 * nfs:                connection to send on.
 * proc:               NFS procedure number placed in the call header.
 * msg:                already-encoded procedure arguments; the bytes are
 *                     written out before this function returns.
 * completion_handler: callback stored for process_reply to invoke (may be
 *                     NULL, which process_reply checks for).
 * user_data:          opaque pointer handed back to the callback. */
141 static void send_rpc(NFSConnection *nfs, int proc, GString *msg,
142 NFSFunc completion_handler, gpointer user_data)
/* XIDs come from a function-local counter shared by every connection. */
144 static int xid_count = 0;
145 struct rpc_call_header header;
146 struct rpc_auth auth;
/* All header fields are sent big-endian. */
148 header.xid = GUINT32_TO_BE(xid_count++);
150 header.rpcvers = GUINT32_TO_BE(2);
151 header.prog = GUINT32_TO_BE(NFS_PROGRAM);
152 header.vers = GUINT32_TO_BE(NFS_V3);
153 header.proc = GUINT32_TO_BE(proc);
155 auth.flavor = GUINT32_TO_BE(AUTH_NULL);
158 CallInfo *info = g_new0(CallInfo, 1);
/* Fragment marker: high bit set (single, final fragment) plus the total
 * length of header, two auth structures and the argument payload. */
160 uint32_t fragment = htonl(0x80000000
161 | (sizeof(header) + 2*sizeof(auth) + msg->len));
162 do_write(nfs, (const char *)&fragment, sizeof(fragment));
163 do_write(nfs, (const char *)&header, sizeof(header));
/* The same AUTH_NULL structure is written twice (presumably credential then
 * verifier - confirm against the RPC specification). */
164 do_write(nfs, (const char *)&auth, sizeof(auth));
165 do_write(nfs, (const char *)&auth, sizeof(auth));
166 do_write(nfs, msg->str, msg->len);
167 g_io_channel_flush(nfs->channel, NULL);
/* The start time is sampled after the flush, so the latency printed by
 * process_reply does not include the time spent writing the request. */
169 info->start = now_hires();
170 info->callback = completion_handler;
171 info->user_data = user_data;
/* Key is the XID converted back to host order, matching process_reply. */
172 g_hash_table_insert(nfs->xid_table,
173 GINT_TO_POINTER(GUINT32_FROM_BE(header.xid)), info);
/* Handle one fully reassembled RPC reply.
 *
 * nfs: connection the reply arrived on.
 * msg: the complete reply message; its first bytes are read as a
 *      struct rpc_reply.
 *
 * The XID from the reply header is looked up in the connection xid_table.
 * For a matching call the elapsed time between send_rpc and now is printed,
 * the registered callback (if any) is invoked with the bytes that follow the
 * reply header, and the table entry is removed.  The main loop is asked to
 * quit once the completed counter equals 128 * threads.
 *
 * NOTE(review): no check that msg->len is at least sizeof(*reply) is visible
 * in this listing before the header is read - confirm one exists. */
176 static void process_reply(NFSConnection *nfs, GString *msg)
178 struct rpc_reply *reply = (struct rpc_reply *)msg->str;
180 uint32_t xid = GUINT32_FROM_BE(reply->xid);
/* Same host-order key that send_rpc inserted. */
182 gpointer key = GINT_TO_POINTER(xid);
183 CallInfo *info = g_hash_table_lookup(nfs->xid_table, key);
/* The XID is an unsigned 32-bit value taken from the wire, so it is printed
 * with an unsigned conversion. */
185 g_print("Could not match reply XID %"PRIu32" with a call!\n", xid);
189 info->end = now_hires();
190 printf("XID %"PRIu32": Time = %"PRIi64"\n", xid, info->end - info->start);
191 if (info->callback != NULL)
192 info->callback(nfs, info->user_data,
193 msg->str + sizeof(*reply), msg->len - sizeof(*reply));
195 g_hash_table_remove(nfs->xid_table, key);
/* Stop the benchmark after a fixed number of completed calls per thread. */
199 if (completed == 128 * threads) {
200 g_main_loop_quit(main_loop);
/* GLib watch callback, registered by nfs_connect for G_IO_IN on the
 * connection channel, that reassembles record-marked RPC replies.
 *
 * Each call attempts one read.  When frag_len is zero the remaining bytes of
 * the 4-byte fragment header are requested; otherwise the remaining bytes of
 * the current fragment body (frag_len with the top bit masked off) are
 * requested.  A request that would exceed MAX_RPC_MSGSIZE, either by itself
 * or together with the data already buffered, shuts the channel down.
 *
 * Data is read directly into the tail of msgbuf: the buffer is grown by the
 * requested amount first and trimmed back afterwards by whatever was not
 * actually read.  Header bytes are accumulated in msgbuf too and are removed
 * again once all four have arrived and frag_len has been decoded.
 *
 * NOTE(review): the oversized-fragment message uses the zd length modifier;
 * if the value printed is one of the gsize variables the unsigned form zu
 * would be the matching conversion - confirm against the argument. */
204 static gboolean read_handler(GIOChannel *channel,
205 GIOCondition condition,
208 NFSConnection *nfs = (NFSConnection *)data;
210 gsize bytes_to_read = 0; /* Number of bytes to attempt to read. */
212 /* If we have not yet read in the fragment header, do that first. This is
213 * 4 bytes that indicates the number of bytes in the message to follow
214 * (with the high bit set if this is the last fragment making up the
216 if (nfs->frag_len == 0) {
217 bytes_to_read = 4 - nfs->frag_hdr_bytes;
219 bytes_to_read = nfs->frag_len & 0x7fffffff;
222 if (bytes_to_read > MAX_RPC_MSGSIZE
223 || nfs->msgbuf->len + bytes_to_read > MAX_RPC_MSGSIZE)
225 fprintf(stderr, "Excessive fragment size for RPC: %zd bytes\n",
227 g_io_channel_shutdown(nfs->channel, TRUE, NULL);
231 gsize bytes_read = 0;
232 g_string_set_size(nfs->msgbuf, nfs->msgbuf->len + bytes_to_read);
233 char *buf = &nfs->msgbuf->str[nfs->msgbuf->len - bytes_to_read];
234 switch (g_io_channel_read_chars(nfs->channel, buf,
235 bytes_to_read, &bytes_read, NULL)) {
236 case G_IO_STATUS_NORMAL:
238 case G_IO_STATUS_AGAIN:
240 case G_IO_STATUS_EOF:
241 if (bytes_read == bytes_to_read)
243 /* else fall through */
244 case G_IO_STATUS_ERROR:
245 fprintf(stderr, "Unexpected error or end of file on RPC stream %d!\n",
246 g_io_channel_unix_get_fd(nfs->channel));
247 g_io_channel_shutdown(nfs->channel, TRUE, NULL);
248 /* TODO: Clean up connection object. */
/* NOTE(review): bytes_read is a gsize, which is unsigned, so the >= 0 half
 * of this assertion can never fail; only the upper bound is a real check. */
252 g_assert(bytes_read >= 0 && bytes_read <= bytes_to_read);
/* Drop the part of the pre-grown buffer that the read did not fill. */
254 g_string_set_size(nfs->msgbuf,
255 nfs->msgbuf->len - (bytes_to_read - bytes_read));
257 if (nfs->frag_len == 0) {
258 /* Handle reading in the fragment header. If we've read the complete
259 * header, store the fragment size. */
260 nfs->frag_hdr_bytes += bytes_read;
261 if (nfs->frag_hdr_bytes == 4) {
262 memcpy((char *)&nfs->frag_len,
263 &nfs->msgbuf->str[nfs->msgbuf->len - 4], 4);
264 nfs->frag_len = ntohl(nfs->frag_len);
265 g_string_set_size(nfs->msgbuf, nfs->msgbuf->len - 4);
266 nfs->frag_hdr_bytes = 0;
269 /* We were reading in the fragment body. */
270 nfs->frag_len -= bytes_read;
/* Only the last-fragment bit is left: the final fragment has been consumed
 * in full, so the buffered message is complete.  It is handed to
 * process_reply and the buffer is emptied for the next message. */
272 if (nfs->frag_len == 0x80000000) {
273 process_reply(nfs, nfs->msgbuf);
275 g_string_set_size(nfs->msgbuf, 0);
/* Forward declaration: finish_read_request issues a follow-up read and is
 * defined before send_read_request. */
282 static void send_read_request(NFSConnection *nfs, uint64_t inum,
283 uint64_t offset, uint64_t len);
/* Completion callback for READ calls issued by send_read_request.
 *
 * nfs:       connection the reply arrived on.
 * user_data: inode number of the finished read, packed with GINT_TO_POINTER.
 * reply/len: reply payload; not used by the lines visible in this listing.
 *
 * Logs the finished inode, then keeps the request stream going by picking a
 * random entry of bench_files and reading 1048576 bytes from offset 0. */
285 static void finish_read_request(NFSConnection *nfs, gpointer user_data,
286 const char *reply, size_t len)
288 printf("Done reading inode %d\n", GPOINTER_TO_INT(user_data));
290 struct bench_file *bf;
/* NOTE(review): the upper bound is bench_files->len; if the inode list
 * loaded in main is empty the range is empty - confirm that case is excluded
 * before the benchmark starts. */
291 bf = &g_array_index(bench_files, struct bench_file,
292 g_random_int_range(0, bench_files->len));
293 send_read_request(nfs, bf->inum, 0, 1048576);
/* Encode and send an NFSv3 READ call for one file.
 *
 * nfs:    connection to send on.
 * inum:   inode number; its 8-byte big-endian encoding is the file handle.
 * offset: byte offset stored in the READ arguments.
 * len:    requested length (the line that consumes it is not visible in
 *         this listing).
 *
 * finish_read_request is registered as the completion callback. */
296 static void send_read_request(NFSConnection *nfs, uint64_t inum,
297 uint64_t offset, uint64_t len)
/* The handle data points at this local variable; that is sufficient because
 * the arguments are serialized below, before the function returns. */
300 uint64_t fhdata = GUINT64_TO_BE(inum);
301 fh.data.data_val = (char *)&fhdata;
302 fh.data.data_len = 8;
306 struct read3args read;
/* Shallow copy: read.file shares the data pointer set up above. */
307 memcpy(&read.file, &fh, sizeof(struct nfs_fh3));
308 read.offset = offset;
311 GString *str = g_string_new("");
313 xdr_string_create(&xdr, str, XDR_ENCODE);
314 xdr_read3args(&xdr, &read);
/* The inode number is narrowed to int so it fits the pointer-sized user_data
 * slot; values outside the int range are not preserved. */
315 send_rpc(nfs, NFSPROC3_READ, str, finish_read_request,
316 GINT_TO_POINTER((int)inum));
/* send_rpc has already written the bytes out, so the buffer can go. */
317 g_string_free(str, TRUE);
/* Idle callback queued by nfs_connect that starts the benchmark traffic.
 *
 * data: the NFSConnection to send on.
 *
 * The visible lines contain two loops over the thread count: the first sends
 * one 1048576-byte READ at offset 0 for a randomly chosen bench_files entry,
 * the second sends a LOOKUP for the names file-1, file-2 and so on in the
 * directory whose handle is inode 1.
 * NOTE(review): the statements between the two loops are not visible here,
 * so whether both run on the same invocation cannot be told from this
 * listing. */
320 static gboolean idle_handler(gpointer data)
322 NFSConnection *nfs = (NFSConnection *)data;
325 for (i = 0; i < threads; i++) {
326 struct bench_file *bf;
327 bf = &g_array_index(bench_files, struct bench_file,
328 g_random_int_range(0, bench_files->len));
329 send_read_request(nfs, bf->inum, 0, 1048576);
333 g_print("Sending requests...\n");
334 for (i = 0; i < threads; i++) {
336 struct diropargs3 lookup;
/* Directory handle: the 8-byte big-endian encoding of inode 1. */
337 uint64_t rootfh = GUINT64_TO_BE(1);
/* NOTE(review): sprintf does not bound the write and the declaration of buf
 * is not visible here; confirm it can hold the longest generated name or
 * switch to a size-limited formatter. */
339 sprintf(buf, "file-%d", i + 1);
340 lookup.dir.data.data_len = 8;
341 lookup.dir.data.data_val = (char *)&rootfh;
344 GString *str = g_string_new("");
346 xdr_string_create(&xdr, str, XDR_ENCODE);
347 xdr_diropargs3(&xdr, &lookup);
/* store_fh receives the reply; no user data is attached. */
348 send_rpc(nfs, NFSPROC3_LOOKUP, str, store_fh, NULL);
349 g_string_free(str, TRUE);
/* Open a TCP connection to the NFS server on hostname and attach it to the
 * GLib main loop.
 *
 * hostname: name or address of the server; the service is fixed at 2051.
 *
 * An IPv4 TCP socket is created, the host is resolved with getaddrinfo and
 * the socket is connected to the first result.  A new NFSConnection is then
 * allocated with an empty message buffer and an empty XID table, and the
 * descriptor is wrapped in a GIOChannel configured for binary data (NULL
 * encoding) and non-blocking I/O.  read_handler is registered for readable
 * events and idle_handler is queued; both receive the new connection as
 * their data argument.
 *
 * NOTE(review): no freeaddrinfo call for ai is visible in this listing -
 * confirm the result list is released. */
356 NFSConnection *nfs_connect(const char *hostname)
359 struct addrinfo hints;
360 struct addrinfo *ai = NULL;
362 int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
364 fprintf(stderr, "Unable to create NFS TCP socket: %m\n");
368 memset(&hints, 0, sizeof(hints));
369 hints.ai_family = AF_INET;
370 hints.ai_socktype = SOCK_STREAM;
371 hints.ai_protocol = IPPROTO_TCP;
/* Pass the hints that were just filled in, so the lookup only returns
 * IPv4/TCP addresses that match the AF_INET socket created above. */
372 result = getaddrinfo(hostname, "2051", &hints, &ai);
/* getaddrinfo signals failure with any nonzero code, not only negative
 * ones. */
373 if (result != 0 || ai == NULL) {
374 fprintf(stderr, "Hostname lookup failure for %s: %s\n",
375 hostname, gai_strerror(result));
379 if (connect(fd, ai->ai_addr, ai->ai_addrlen) < 0) {
/* Name the host that could not be reached. */
380 fprintf(stderr, "Unable to connect to %s: %m\n", hostname);
385 NFSConnection *conn = g_new0(NFSConnection, 1);
386 conn->msgbuf = g_string_new("");
/* Keys are integers stored directly in the pointer value. */
387 conn->xid_table = g_hash_table_new(g_direct_hash, g_direct_equal);
389 conn->channel = g_io_channel_unix_new(fd);
/* NULL encoding makes the channel safe for raw binary RPC data. */
390 g_io_channel_set_encoding(conn->channel, NULL, NULL);
391 g_io_channel_set_flags(conn->channel, G_IO_FLAG_NONBLOCK, NULL);
392 g_io_add_watch(conn->channel, G_IO_IN, read_handler, conn);
/* Request generation starts once the main loop is idle. */
394 g_idle_add(idle_handler, conn);
/* Program entry point for the synthetic NFS client.
 *
 * argv[1] names a text file of integer pairs that is loaded into
 * bench_files; argv[2] is the number of concurrent request streams, stored
 * in threads.  After loading, the GLib main loop is created, a connection to
 * a fixed server host is opened, and the loop is run.
 *
 * NOTE(review): argv[1] and argv[2] are used, but no check of argc is
 * visible in this listing - confirm one exists. */
399 int main(int argc, char *argv[])
402 g_set_prgname("synclient");
403 g_print("Launching synthetic NFS RPC client...\n");
405 bench_files = g_array_new(FALSE, TRUE, sizeof(struct bench_file));
407 FILE *inodes = fopen(argv[1], "r");
408 if (inodes == NULL) {
/* NOTE(review): the fscanf result is not examined; the loop relies on i1 and
 * i2 keeping their -1 initial values when a pair cannot be parsed.  If the
 * file contains a token that is not a number, fscanf will not consume it and
 * end-of-file is never reached - confirm how the branch below leaves the
 * loop. */
412 while (!feof(inodes)) {
413 int i1 = -1, i2 = -1;
414 fscanf(inodes, "%d %d", &i1, &i2);
415 if (i1 < 0 || i2 < 0)
418 struct bench_file bf;
421 g_array_append_val(bench_files, bf);
/* NOTE(review): atoi cannot report malformed input; a non-numeric argument
 * yields 0 threads. */
426 threads = atoi(argv[2]);
428 main_loop = g_main_loop_new(NULL, FALSE);
/* The server host name is hard-coded. */
429 nfs_connect("niniel.sysnet.ucsd.edu");
431 g_main_loop_run(main_loop);