Work on a client for generating a synthetic stream of NFS operations.
authorMichael Vrable <mvrable@cs.ucsd.edu>
Wed, 3 Feb 2010 22:01:19 +0000 (14:01 -0800)
committerMichael Vrable <mvrable@cs.ucsd.edu>
Wed, 3 Feb 2010 22:01:19 +0000 (14:01 -0800)
This should provide another means of testing performance in addition to the
kernel NFS client.

.gitignore
nfs3/CMakeLists.txt
nfs3/common.c [new file with mode: 0644]
nfs3/nfs3_prot.h
nfs3/rpc.c
nfs3/synclient.c [new file with mode: 0644]

index fc1b865..9a995ed 100644 (file)
@@ -6,4 +6,5 @@ bluesky/libbluesky.so
 bluesky/bluesky-test
 microbench/bench
 microbench/readbench
+microbench/synclient
 nfs3/nfsproxy
index de355d8..810eccb 100644 (file)
@@ -2,7 +2,11 @@ include_directories("${LIBS3_BUILD_DIR}/include")
 link_directories("${LIBS3_BUILD_DIR}/lib")
 
 add_executable(nfsproxy
-               nfsd.c rpc.c mount.c nfs3.c mount_prot_xdr.c nfs3_prot_xdr.c)
+               common.c nfsd.c rpc.c mount.c nfs3.c
+               mount_prot_xdr.c nfs3_prot_xdr.c)
+
+add_executable(synclient synclient.c common.c nfs3_prot_xdr.c)
 
 include_directories(${GLIB_INCLUDE_DIRS} "${CMAKE_SOURCE_DIR}/bluesky")
 target_link_libraries(nfsproxy bluesky ${GLIB_LIBRARIES})
+target_link_libraries(synclient ${GLIB_LIBRARIES})
diff --git a/nfs3/common.c b/nfs3/common.c
new file mode 100644 (file)
index 0000000..36b14f5
--- /dev/null
@@ -0,0 +1,85 @@
+/* Blue Sky: File Systems in the Cloud
+ *
+ * Copyright (C) 2010  The Regents of the University of California
+ * Written by Michael Vrable <mvrable@cs.ucsd.edu>
+ *
+ * TODO: Licensing
+ */
+
+/* Common RPC code shared between the BlueSky server and test client code. */
+
+#include "mount_prot.h"
+#include "nfs3_prot.h"
+#include <stdio.h>
+#include <stdlib.h>
+#include <rpc/pmap_clnt.h>
+#include <string.h>
+#include <signal.h>
+#include <memory.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <netinet/ip.h>
+
+#include "bluesky.h"
+
+/* Routines for XDR-encoding to a growable string. */
+static bool_t xdr_string_putlong(XDR *xdrs, const long *lp)
+{
+    GString *str = (GString *)xdrs->x_private;
+    uint32_t data = htonl(*lp);
+    g_string_set_size(str, str->len + 4);
+    memcpy(str->str + str->len - 4, &data, 4);
+    return TRUE;
+}
+
+static bool_t xdr_string_putbytes(XDR *xdrs, const char *addr, u_int len)
+{
+    GString *str = (GString *)xdrs->x_private;
+    g_string_set_size(str, str->len + len);
+    memcpy(str->str + str->len - len, addr, len);
+    return TRUE;
+}
+
+static u_int xdr_string_getpos(const XDR *xdrs)
+{
+    GString *str = (GString *)xdrs->x_private;
+    return str->len;
+}
+
+static bool_t xdr_string_putint32(XDR *xdrs, const int32_t *ip)
+{
+    GString *str = (GString *)xdrs->x_private;
+    uint32_t data = htonl(*ip);
+    g_string_set_size(str, str->len + 4);
+    memcpy(str->str + str->len - 4, &data, 4);
+    return TRUE;
+}
+
+static int32_t *xdr_string_inline(XDR *xdrs, u_int len)
+{
+    GString *str = (GString *)xdrs->x_private;
+    g_string_set_size(str, str->len + len);
+    return (int32_t *)(str->str + str->len - len);
+}
+
+static void xdr_string_destroy(XDR *xdrs)
+{
+}
+
+static struct xdr_ops xdr_string_ops = {
+    .x_putlong = xdr_string_putlong,
+    .x_putbytes = xdr_string_putbytes,
+    .x_getpostn = xdr_string_getpos,
+    .x_putint32 = xdr_string_putint32,
+    .x_inline = xdr_string_inline,
+    .x_destroy = xdr_string_destroy,
+};
+
+void xdr_string_create(XDR *xdrs, GString *string, enum xdr_op op)
+{
+    xdrs->x_op = op;
+    xdrs->x_ops = &xdr_string_ops;
+    xdrs->x_private = (char *)string;
+    xdrs->x_base = NULL;
+    xdrs->x_handy = 0;
+}
index 1a442ba..77fe5ad 100644 (file)
@@ -834,6 +834,8 @@ extern  bool_t xdr_commit3args (XDR *, commit3args*);
 extern  bool_t xdr_commit3resok (XDR *, commit3resok*);
 extern  bool_t xdr_commit3res (XDR *, commit3res*);
 
+extern void xdr_string_create(XDR *xdrs, GString *string, enum xdr_op op);
+
 #ifdef __cplusplus
 }
 #endif
index a01408f..465bcd3 100644 (file)
@@ -122,68 +122,6 @@ struct rpc_fail_reply {
     uint32_t accept_stat;
 };
 
-/* Routines for XDR-encoding to a growable string. */
-static bool_t xdr_string_putlong(XDR *xdrs, const long *lp)
-{
-    GString *str = (GString *)xdrs->x_private;
-    uint32_t data = htonl(*lp);
-    g_string_set_size(str, str->len + 4);
-    memcpy(str->str + str->len - 4, &data, 4);
-    return TRUE;
-}
-
-static bool_t xdr_string_putbytes(XDR *xdrs, const char *addr, u_int len)
-{
-    GString *str = (GString *)xdrs->x_private;
-    g_string_set_size(str, str->len + len);
-    memcpy(str->str + str->len - len, addr, len);
-    return TRUE;
-}
-
-static u_int xdr_string_getpos(const XDR *xdrs)
-{
-    GString *str = (GString *)xdrs->x_private;
-    return str->len;
-}
-
-static bool_t xdr_string_putint32(XDR *xdrs, const int32_t *ip)
-{
-    GString *str = (GString *)xdrs->x_private;
-    uint32_t data = htonl(*ip);
-    g_string_set_size(str, str->len + 4);
-    memcpy(str->str + str->len - 4, &data, 4);
-    return TRUE;
-}
-
-static int32_t *xdr_string_inline(XDR *xdrs, u_int len)
-{
-    GString *str = (GString *)xdrs->x_private;
-    g_string_set_size(str, str->len + len);
-    return (int32_t *)(str->str + str->len - len);
-}
-
-static void xdr_string_destroy(XDR *xdrs)
-{
-}
-
-static struct xdr_ops xdr_string_ops = {
-    .x_putlong = xdr_string_putlong,
-    .x_putbytes = xdr_string_putbytes,
-    .x_getpostn = xdr_string_getpos,
-    .x_putint32 = xdr_string_putint32,
-    .x_inline = xdr_string_inline,
-    .x_destroy = xdr_string_destroy,
-};
-
-static void xdr_string_create(XDR *xdrs, GString *string, enum xdr_op op)
-{
-    xdrs->x_op = op;
-    xdrs->x_ops = &xdr_string_ops;
-    xdrs->x_private = (char *)string;
-    xdrs->x_base = NULL;
-    xdrs->x_handy = 0;
-}
-
 static void
 async_rpc_send_failure(RPCRequest *req, enum accept_stat stat)
 {
diff --git a/nfs3/synclient.c b/nfs3/synclient.c
new file mode 100644 (file)
index 0000000..9806e7b
--- /dev/null
@@ -0,0 +1,290 @@
+/* Blue Sky: File Systems in the Cloud
+ *
+ * Copyright (C) 2009  The Regents of the University of California
+ * Written by Michael Vrable <mvrable@cs.ucsd.edu>
+ *
+ * TODO: Licensing
+ */
+
+/* Synthetic client for benchmarking: a tool for directly generating NFS
+ * requests and reading back the responses, so that we can exercise the server
+ * differently than the Linux kernel NFS client does.
+ *
+ * Much of this is copied from rpc.c and other BlueSky server code but is
+ * designed to run independently of BlueSky. */
+
+#include "mount_prot.h"
+#include "nfs3_prot.h"
+#include <stdio.h>
+#include <stdlib.h>
+#include <rpc/pmap_clnt.h>
+#include <string.h>
+#include <signal.h>
+#include <memory.h>
+#include <netdb.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <netinet/in.h>
+#include <netinet/ip.h>
+#include <glib.h>
+
+/* TCP port number to use for NFS protocol.  (Should be 2049.) */
+#define NFS_SERVICE_PORT 2051
+
+/* Maximum size of a single RPC message that we will accept (8 MB). */
+#define MAX_RPC_MSGSIZE (8 << 20)
+
+struct rpc_reply {
+    uint32_t xid;
+    uint32_t type;
+    uint32_t stat;
+    uint32_t verf_flavor;
+    uint32_t verf_len;
+    uint32_t accept_stat;
+};
+
+struct rpc_fail_reply {
+    uint32_t xid;
+    uint32_t type;
+    uint32_t stat;
+    uint32_t verf_flavor;
+    uint32_t verf_len;
+    uint32_t accept_stat;
+};
+
+struct rpc_call_header {
+    uint32_t xid;
+    uint32_t mtype;
+    uint32_t rpcvers;
+    uint32_t prog;
+    uint32_t vers;
+    uint32_t proc;
+};
+
+struct rpc_auth {
+    uint32_t flavor;
+    uint32_t len;
+};
+
+typedef struct {
+    GIOChannel *channel;
+
+    /* The reassembled message, thus far. */
+    GString *msgbuf;
+
+    /* Remaining number of bytes in this message fragment; 0 if we next expect
+     * another fragment header. */
+    uint32_t frag_len;
+
+    /* If frag_len is zero: the number of bytes of the fragment header that
+     * have been read so far. */
+    int frag_hdr_bytes;
+} NFSConnection;
+
+static GMainLoop *main_loop;
+
+static void do_write(NFSConnection *conn, const char *buf, size_t len)
+{
+    while (len > 0) {
+        gsize written = 0;
+        switch (g_io_channel_write_chars(conn->channel, buf, len,
+                                         &written, NULL)) {
+        case G_IO_STATUS_ERROR:
+        case G_IO_STATUS_EOF:
+        case G_IO_STATUS_AGAIN:
+            fprintf(stderr, "Error writing to socket!\n");
+            return;
+        case G_IO_STATUS_NORMAL:
+            len -= written;
+            buf += written;
+            break;
+        }
+    }
+}
+
+static void send_rpc(NFSConnection *nfs, int proc, GString *msg)
+{
+    static int xid_count = 0;
+    struct rpc_call_header header;
+    struct rpc_auth auth;
+
+    header.xid = GUINT32_TO_BE(xid_count++);
+    header.mtype = 0;
+    header.rpcvers = GUINT32_TO_BE(2);
+    header.prog = GUINT32_TO_BE(NFS_PROGRAM);
+    header.vers = GUINT32_TO_BE(NFS_V3);
+    header.proc = GUINT32_TO_BE(proc);
+
+    auth.flavor = GUINT32_TO_BE(AUTH_NULL);
+    auth.len = 0;
+
+    uint32_t fragment = htonl(0x80000000
+                              | (sizeof(header) + 2*sizeof(auth) + msg->len));
+    do_write(nfs, (const char *)&fragment, sizeof(fragment));
+    do_write(nfs, (const char *)&header, sizeof(header));
+    do_write(nfs, (const char *)&auth, sizeof(auth));
+    do_write(nfs, (const char *)&auth, sizeof(auth));
+    do_write(nfs, msg->str, msg->len);
+    g_io_channel_flush(nfs->channel, NULL);
+}
+
+static gboolean read_handler(GIOChannel *channel,
+                             GIOCondition condition,
+                             gpointer data)
+{
+    NFSConnection *nfs = (NFSConnection *)data;
+
+    gsize bytes_to_read = 0;    /* Number of bytes to attempt to read. */
+
+    /* If we have not yet read in the fragment header, do that first.  This is
+     * 4 bytes that indicates the number of bytes in the message to follow
+     * (with the high bit set if this is the last fragment making up the
+     * message). */
+    if (nfs->frag_len == 0) {
+        bytes_to_read = 4 - nfs->frag_hdr_bytes;
+    } else {
+        bytes_to_read = nfs->frag_len & 0x7fffffff;
+    }
+
+    if (bytes_to_read > MAX_RPC_MSGSIZE
+        || nfs->msgbuf->len + bytes_to_read > MAX_RPC_MSGSIZE)
+    {
+        fprintf(stderr, "Excessive fragment size for RPC: %zd bytes\n",
+                bytes_to_read);
+        g_io_channel_shutdown(nfs->channel, TRUE, NULL);
+        return FALSE;
+    }
+
+    gsize bytes_read = 0;
+    g_string_set_size(nfs->msgbuf, nfs->msgbuf->len + bytes_to_read);
+    char *buf = &nfs->msgbuf->str[nfs->msgbuf->len - bytes_to_read];
+    switch (g_io_channel_read_chars(nfs->channel, buf,
+                                    bytes_to_read, &bytes_read, NULL)) {
+    case G_IO_STATUS_NORMAL:
+        break;
+    case G_IO_STATUS_AGAIN:
+        return TRUE;
+    case G_IO_STATUS_EOF:
+        if (bytes_read == bytes_to_read)
+            break;
+        /* else fall through */
+    case G_IO_STATUS_ERROR:
+        fprintf(stderr, "Unexpected error or end of file on RPC stream %d!\n",
+                g_io_channel_unix_get_fd(nfs->channel));
+        g_io_channel_shutdown(nfs->channel, TRUE, NULL);
+        /* TODO: Clean up connection object. */
+        return FALSE;
+    }
+
+    g_assert(bytes_read >= 0 && bytes_read <= bytes_to_read);
+
+    g_string_set_size(nfs->msgbuf,
+                      nfs->msgbuf->len - (bytes_to_read - bytes_read));
+
+    if (nfs->frag_len == 0) {
+        /* Handle reading in the fragment header.  If we've read the complete
+         * header, store the fragment size. */
+        nfs->frag_hdr_bytes += bytes_read;
+        if (nfs->frag_hdr_bytes == 4) {
+            memcpy((char *)&nfs->frag_len,
+                   &nfs->msgbuf->str[nfs->msgbuf->len - 4], 4);
+            nfs->frag_len = ntohl(nfs->frag_len);
+            g_string_set_size(nfs->msgbuf, nfs->msgbuf->len - 4);
+            nfs->frag_hdr_bytes = 0;
+        }
+    } else {
+        /* We were reading in the fragment body. */
+        nfs->frag_len -= bytes_read;
+
+        if (nfs->frag_len = 0x80000000) {
+            g_print("Got a message of size %zd\n", nfs->msgbuf->len);
+            nfs->frag_len = 0;
+            g_string_set_size(nfs->msgbuf, 0);
+        }
+    }
+
+    return TRUE;
+}
+
+static gboolean idle_handler(gpointer data)
+{
+    NFSConnection *nfs = (NFSConnection *)data;
+    int i;
+
+    g_print("Sending requests...\n");
+    for (i = 0; i < 8; i++) {
+        char buf[64];
+        struct diropargs3 lookup;
+        uint64_t rootfh = GUINT64_TO_BE(1);
+
+        sprintf(buf, "file-%d", i + 1);
+        lookup.dir.data.data_len = 8;
+        lookup.dir.data.data_val = (char *)&rootfh;
+        lookup.name = buf;
+
+        GString *str = g_string_new("");
+        XDR xdr;
+        xdr_string_create(&xdr, str, XDR_ENCODE);
+        xdr_diropargs3(&xdr, &lookup);
+        send_rpc(nfs, NFSPROC3_LOOKUP, str);
+        g_string_free(str, TRUE);
+    }
+
+    return FALSE;
+}
+
+NFSConnection *nfs_connect(const char *hostname)
+{
+    int result;
+    struct addrinfo hints;
+    struct addrinfo *ai = NULL;
+
+    int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
+    if (fd < 0) {
+        fprintf(stderr, "Unable to create NFS TCP socket: %m\n");
+        exit(1);
+    }
+
+    memset(&hints, 0, sizeof(hints));
+    hints.ai_family = AF_INET;
+    hints.ai_socktype = SOCK_STREAM;
+    hints.ai_protocol = IPPROTO_TCP;
+    result = getaddrinfo(hostname, "2051", NULL, &ai);
+    if (result < 0 || ai == NULL) {
+        fprintf(stderr, "Hostname lookup failure for %s: %s\n",
+                hostname, gai_strerror(result));
+        exit(1);
+    }
+
+    if (connect(fd, ai->ai_addr, ai->ai_addrlen) < 0) {
+        fprintf(stderr, "Unable to connect to : %m\n");
+    }
+
+    freeaddrinfo(ai);
+
+    NFSConnection *conn = g_new0(NFSConnection, 1);
+    conn->msgbuf = g_string_new("");
+
+    conn->channel = g_io_channel_unix_new(fd);
+    g_io_channel_set_encoding(conn->channel, NULL, NULL);
+    g_io_channel_set_flags(conn->channel, G_IO_FLAG_NONBLOCK, NULL);
+    g_io_add_watch(conn->channel, G_IO_IN, read_handler, conn);
+
+    g_idle_add(idle_handler, conn);
+
+    return conn;
+}
+
+int main(int argc, char *argv[])
+{
+    g_thread_init(NULL);
+    g_set_prgname("synclient");
+    g_print("Launching synthetic NFS RPC client...\n");
+
+    main_loop = g_main_loop_new(NULL, FALSE);
+    nfs_connect("niniel.sysnet.ucsd.edu");
+
+    g_main_loop_run(main_loop);
+
+    return 0;
+}