From e1b0a756fd5ee4c3959a82ba96a6f6e1b73e1049 Mon Sep 17 00:00:00 2001 From: Michael Vrable Date: Wed, 3 Feb 2010 14:01:19 -0800 Subject: [PATCH] Work on a client for generating a synthetic stream of NFS operations. This should provide another means of testing performance in addition to the kernel NFS client. --- .gitignore | 1 + nfs3/CMakeLists.txt | 6 +- nfs3/common.c | 85 +++++++++++++ nfs3/nfs3_prot.h | 2 + nfs3/rpc.c | 62 ---------- nfs3/synclient.c | 290 ++++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 383 insertions(+), 63 deletions(-) create mode 100644 nfs3/common.c create mode 100644 nfs3/synclient.c diff --git a/.gitignore b/.gitignore index fc1b865..9a995ed 100644 --- a/.gitignore +++ b/.gitignore @@ -6,4 +6,5 @@ bluesky/libbluesky.so bluesky/bluesky-test microbench/bench microbench/readbench +microbench/synclient nfs3/nfsproxy diff --git a/nfs3/CMakeLists.txt b/nfs3/CMakeLists.txt index de355d8..810eccb 100644 --- a/nfs3/CMakeLists.txt +++ b/nfs3/CMakeLists.txt @@ -2,7 +2,11 @@ include_directories("${LIBS3_BUILD_DIR}/include") link_directories("${LIBS3_BUILD_DIR}/lib") add_executable(nfsproxy - nfsd.c rpc.c mount.c nfs3.c mount_prot_xdr.c nfs3_prot_xdr.c) + common.c nfsd.c rpc.c mount.c nfs3.c + mount_prot_xdr.c nfs3_prot_xdr.c) + +add_executable(synclient synclient.c common.c nfs3_prot_xdr.c) include_directories(${GLIB_INCLUDE_DIRS} "${CMAKE_SOURCE_DIR}/bluesky") target_link_libraries(nfsproxy bluesky ${GLIB_LIBRARIES}) +target_link_libraries(synclient ${GLIB_LIBRARIES}) diff --git a/nfs3/common.c b/nfs3/common.c new file mode 100644 index 0000000..36b14f5 --- /dev/null +++ b/nfs3/common.c @@ -0,0 +1,85 @@ +/* Blue Sky: File Systems in the Cloud + * + * Copyright (C) 2010 The Regents of the University of California + * Written by Michael Vrable + * + * TODO: Licensing + */ + +/* Common RPC code shared between the BlueSky server and test client code. */ + +#include "mount_prot.h" +#include "nfs3_prot.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "bluesky.h" + +/* Routines for XDR-encoding to a growable string. */ +static bool_t xdr_string_putlong(XDR *xdrs, const long *lp) +{ + GString *str = (GString *)xdrs->x_private; + uint32_t data = htonl(*lp); + g_string_set_size(str, str->len + 4); + memcpy(str->str + str->len - 4, &data, 4); + return TRUE; +} + +static bool_t xdr_string_putbytes(XDR *xdrs, const char *addr, u_int len) +{ + GString *str = (GString *)xdrs->x_private; + g_string_set_size(str, str->len + len); + memcpy(str->str + str->len - len, addr, len); + return TRUE; +} + +static u_int xdr_string_getpos(const XDR *xdrs) +{ + GString *str = (GString *)xdrs->x_private; + return str->len; +} + +static bool_t xdr_string_putint32(XDR *xdrs, const int32_t *ip) +{ + GString *str = (GString *)xdrs->x_private; + uint32_t data = htonl(*ip); + g_string_set_size(str, str->len + 4); + memcpy(str->str + str->len - 4, &data, 4); + return TRUE; +} + +static int32_t *xdr_string_inline(XDR *xdrs, u_int len) +{ + GString *str = (GString *)xdrs->x_private; + g_string_set_size(str, str->len + len); + return (int32_t *)(str->str + str->len - len); +} + +static void xdr_string_destroy(XDR *xdrs) +{ +} + +static struct xdr_ops xdr_string_ops = { + .x_putlong = xdr_string_putlong, + .x_putbytes = xdr_string_putbytes, + .x_getpostn = xdr_string_getpos, + .x_putint32 = xdr_string_putint32, + .x_inline = xdr_string_inline, + .x_destroy = xdr_string_destroy, +}; + +void xdr_string_create(XDR *xdrs, GString *string, enum xdr_op op) +{ + xdrs->x_op = op; + xdrs->x_ops = &xdr_string_ops; + xdrs->x_private = (char *)string; + xdrs->x_base = NULL; + xdrs->x_handy = 0; +} diff --git a/nfs3/nfs3_prot.h b/nfs3/nfs3_prot.h index 1a442ba..77fe5ad 100644 --- a/nfs3/nfs3_prot.h +++ b/nfs3/nfs3_prot.h @@ -834,6 +834,8 @@ extern bool_t xdr_commit3args (XDR *, commit3args*); extern bool_t xdr_commit3resok (XDR *, commit3resok*); extern bool_t xdr_commit3res (XDR *, commit3res*); +extern void xdr_string_create(XDR *xdrs, GString *string, enum xdr_op op); + #ifdef __cplusplus } #endif diff --git a/nfs3/rpc.c b/nfs3/rpc.c index a01408f..465bcd3 100644 --- a/nfs3/rpc.c +++ b/nfs3/rpc.c @@ -122,68 +122,6 @@ struct rpc_fail_reply { uint32_t accept_stat; }; -/* Routines for XDR-encoding to a growable string. */ -static bool_t xdr_string_putlong(XDR *xdrs, const long *lp) -{ - GString *str = (GString *)xdrs->x_private; - uint32_t data = htonl(*lp); - g_string_set_size(str, str->len + 4); - memcpy(str->str + str->len - 4, &data, 4); - return TRUE; -} - -static bool_t xdr_string_putbytes(XDR *xdrs, const char *addr, u_int len) -{ - GString *str = (GString *)xdrs->x_private; - g_string_set_size(str, str->len + len); - memcpy(str->str + str->len - len, addr, len); - return TRUE; -} - -static u_int xdr_string_getpos(const XDR *xdrs) -{ - GString *str = (GString *)xdrs->x_private; - return str->len; -} - -static bool_t xdr_string_putint32(XDR *xdrs, const int32_t *ip) -{ - GString *str = (GString *)xdrs->x_private; - uint32_t data = htonl(*ip); - g_string_set_size(str, str->len + 4); - memcpy(str->str + str->len - 4, &data, 4); - return TRUE; -} - -static int32_t *xdr_string_inline(XDR *xdrs, u_int len) -{ - GString *str = (GString *)xdrs->x_private; - g_string_set_size(str, str->len + len); - return (int32_t *)(str->str + str->len - len); -} - -static void xdr_string_destroy(XDR *xdrs) -{ -} - -static struct xdr_ops xdr_string_ops = { - .x_putlong = xdr_string_putlong, - .x_putbytes = xdr_string_putbytes, - .x_getpostn = xdr_string_getpos, - .x_putint32 = xdr_string_putint32, - .x_inline = xdr_string_inline, - .x_destroy = xdr_string_destroy, -}; - -static void xdr_string_create(XDR *xdrs, GString *string, enum xdr_op op) -{ - xdrs->x_op = op; - xdrs->x_ops = &xdr_string_ops; - xdrs->x_private = (char *)string; - xdrs->x_base = NULL; - xdrs->x_handy = 0; -} - static void async_rpc_send_failure(RPCRequest *req, enum accept_stat stat) { diff --git a/nfs3/synclient.c b/nfs3/synclient.c new file mode 100644 index 0000000..9806e7b --- /dev/null +++ b/nfs3/synclient.c @@ -0,0 +1,290 @@ +/* Blue Sky: File Systems in the Cloud + * + * Copyright (C) 2009 The Regents of the University of California + * Written by Michael Vrable + * + * TODO: Licensing + */ + +/* Synthetic client for benchmarking: a tool for directly generating NFS + * requests and reading back the responses, so that we can exercise the server + * differently than the Linux kernel NFS client does. + * + * Much of this is copied from rpc.c and other BlueSky server code but is + * designed to run independently of BlueSky. */ + +#include "mount_prot.h" +#include "nfs3_prot.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +/* TCP port number to use for NFS protocol. (Should be 2049.) */ +#define NFS_SERVICE_PORT 2051 + +/* Maximum size of a single RPC message that we will accept (8 MB). */ +#define MAX_RPC_MSGSIZE (8 << 20) + +struct rpc_reply { + uint32_t xid; + uint32_t type; + uint32_t stat; + uint32_t verf_flavor; + uint32_t verf_len; + uint32_t accept_stat; +}; + +struct rpc_fail_reply { + uint32_t xid; + uint32_t type; + uint32_t stat; + uint32_t verf_flavor; + uint32_t verf_len; + uint32_t accept_stat; +}; + +struct rpc_call_header { + uint32_t xid; + uint32_t mtype; + uint32_t rpcvers; + uint32_t prog; + uint32_t vers; + uint32_t proc; +}; + +struct rpc_auth { + uint32_t flavor; + uint32_t len; +}; + +typedef struct { + GIOChannel *channel; + + /* The reassembled message, thus far. */ + GString *msgbuf; + + /* Remaining number of bytes in this message fragment; 0 if we next expect + * another fragment header. */ + uint32_t frag_len; + + /* If frag_len is zero: the number of bytes of the fragment header that + * have been read so far. */ + int frag_hdr_bytes; +} NFSConnection; + +static GMainLoop *main_loop; + +static void do_write(NFSConnection *conn, const char *buf, size_t len) +{ + while (len > 0) { + gsize written = 0; + switch (g_io_channel_write_chars(conn->channel, buf, len, + &written, NULL)) { + case G_IO_STATUS_ERROR: + case G_IO_STATUS_EOF: + case G_IO_STATUS_AGAIN: + fprintf(stderr, "Error writing to socket!\n"); + return; + case G_IO_STATUS_NORMAL: + len -= written; + buf += written; + break; + } + } +} + +static void send_rpc(NFSConnection *nfs, int proc, GString *msg) +{ + static int xid_count = 0; + struct rpc_call_header header; + struct rpc_auth auth; + + header.xid = GUINT32_TO_BE(xid_count++); + header.mtype = 0; + header.rpcvers = GUINT32_TO_BE(2); + header.prog = GUINT32_TO_BE(NFS_PROGRAM); + header.vers = GUINT32_TO_BE(NFS_V3); + header.proc = GUINT32_TO_BE(proc); + + auth.flavor = GUINT32_TO_BE(AUTH_NULL); + auth.len = 0; + + uint32_t fragment = htonl(0x80000000 + | (sizeof(header) + 2*sizeof(auth) + msg->len)); + do_write(nfs, (const char *)&fragment, sizeof(fragment)); + do_write(nfs, (const char *)&header, sizeof(header)); + do_write(nfs, (const char *)&auth, sizeof(auth)); + do_write(nfs, (const char *)&auth, sizeof(auth)); + do_write(nfs, msg->str, msg->len); + g_io_channel_flush(nfs->channel, NULL); +} + +static gboolean read_handler(GIOChannel *channel, + GIOCondition condition, + gpointer data) +{ + NFSConnection *nfs = (NFSConnection *)data; + + gsize bytes_to_read = 0; /* Number of bytes to attempt to read. */ + + /* If we have not yet read in the fragment header, do that first. This is + * 4 bytes that indicates the number of bytes in the message to follow + * (with the high bit set if this is the last fragment making up the + * message). */ + if (nfs->frag_len == 0) { + bytes_to_read = 4 - nfs->frag_hdr_bytes; + } else { + bytes_to_read = nfs->frag_len & 0x7fffffff; + } + + if (bytes_to_read > MAX_RPC_MSGSIZE + || nfs->msgbuf->len + bytes_to_read > MAX_RPC_MSGSIZE) + { + fprintf(stderr, "Excessive fragment size for RPC: %zd bytes\n", + bytes_to_read); + g_io_channel_shutdown(nfs->channel, TRUE, NULL); + return FALSE; + } + + gsize bytes_read = 0; + g_string_set_size(nfs->msgbuf, nfs->msgbuf->len + bytes_to_read); + char *buf = &nfs->msgbuf->str[nfs->msgbuf->len - bytes_to_read]; + switch (g_io_channel_read_chars(nfs->channel, buf, + bytes_to_read, &bytes_read, NULL)) { + case G_IO_STATUS_NORMAL: + break; + case G_IO_STATUS_AGAIN: + return TRUE; + case G_IO_STATUS_EOF: + if (bytes_read == bytes_to_read) + break; + /* else fall through */ + case G_IO_STATUS_ERROR: + fprintf(stderr, "Unexpected error or end of file on RPC stream %d!\n", + g_io_channel_unix_get_fd(nfs->channel)); + g_io_channel_shutdown(nfs->channel, TRUE, NULL); + /* TODO: Clean up connection object. */ + return FALSE; + } + + g_assert(bytes_read >= 0 && bytes_read <= bytes_to_read); + + g_string_set_size(nfs->msgbuf, + nfs->msgbuf->len - (bytes_to_read - bytes_read)); + + if (nfs->frag_len == 0) { + /* Handle reading in the fragment header. If we've read the complete + * header, store the fragment size. */ + nfs->frag_hdr_bytes += bytes_read; + if (nfs->frag_hdr_bytes == 4) { + memcpy((char *)&nfs->frag_len, + &nfs->msgbuf->str[nfs->msgbuf->len - 4], 4); + nfs->frag_len = ntohl(nfs->frag_len); + g_string_set_size(nfs->msgbuf, nfs->msgbuf->len - 4); + nfs->frag_hdr_bytes = 0; + } + } else { + /* We were reading in the fragment body. */ + nfs->frag_len -= bytes_read; + + if (nfs->frag_len = 0x80000000) { + g_print("Got a message of size %zd\n", nfs->msgbuf->len); + nfs->frag_len = 0; + g_string_set_size(nfs->msgbuf, 0); + } + } + + return TRUE; +} + +static gboolean idle_handler(gpointer data) +{ + NFSConnection *nfs = (NFSConnection *)data; + int i; + + g_print("Sending requests...\n"); + for (i = 0; i < 8; i++) { + char buf[64]; + struct diropargs3 lookup; + uint64_t rootfh = GUINT64_TO_BE(1); + + sprintf(buf, "file-%d", i + 1); + lookup.dir.data.data_len = 8; + lookup.dir.data.data_val = (char *)&rootfh; + lookup.name = buf; + + GString *str = g_string_new(""); + XDR xdr; + xdr_string_create(&xdr, str, XDR_ENCODE); + xdr_diropargs3(&xdr, &lookup); + send_rpc(nfs, NFSPROC3_LOOKUP, str); + g_string_free(str, TRUE); + } + + return FALSE; +} + +NFSConnection *nfs_connect(const char *hostname) +{ + int result; + struct addrinfo hints; + struct addrinfo *ai = NULL; + + int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); + if (fd < 0) { + fprintf(stderr, "Unable to create NFS TCP socket: %m\n"); + exit(1); + } + + memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_INET; + hints.ai_socktype = SOCK_STREAM; + hints.ai_protocol = IPPROTO_TCP; + result = getaddrinfo(hostname, "2051", NULL, &ai); + if (result < 0 || ai == NULL) { + fprintf(stderr, "Hostname lookup failure for %s: %s\n", + hostname, gai_strerror(result)); + exit(1); + } + + if (connect(fd, ai->ai_addr, ai->ai_addrlen) < 0) { + fprintf(stderr, "Unable to connect to : %m\n"); + } + + freeaddrinfo(ai); + + NFSConnection *conn = g_new0(NFSConnection, 1); + conn->msgbuf = g_string_new(""); + + conn->channel = g_io_channel_unix_new(fd); + g_io_channel_set_encoding(conn->channel, NULL, NULL); + g_io_channel_set_flags(conn->channel, G_IO_FLAG_NONBLOCK, NULL); + g_io_add_watch(conn->channel, G_IO_IN, read_handler, conn); + + g_idle_add(idle_handler, conn); + + return conn; +} + +int main(int argc, char *argv[]) +{ + g_thread_init(NULL); + g_set_prgname("synclient"); + g_print("Launching synthetic NFS RPC client...\n"); + + main_loop = g_main_loop_new(NULL, FALSE); + nfs_connect("niniel.sysnet.ucsd.edu"); + + g_main_loop_run(main_loop); + + return 0; +} -- 2.20.1