X-Git-Url: http://git.vrable.net/?a=blobdiff_plain;f=nfs3%2Fsynclient.c;h=ef614d3444c0913a188e7fa148ce2d2613ff6bbc;hb=388030970805a70cb4fad34ade5e3de7a3607a57;hp=9806e7bb64383209b0befe5ee61b4c9e18abff88;hpb=e1b0a756fd5ee4c3959a82ba96a6f6e1b73e1049;p=bluesky.git diff --git a/nfs3/synclient.c b/nfs3/synclient.c index 9806e7b..ef614d3 100644 --- a/nfs3/synclient.c +++ b/nfs3/synclient.c @@ -17,6 +17,7 @@ #include "nfs3_prot.h" #include #include +#include #include #include #include @@ -34,6 +35,9 @@ /* Maximum size of a single RPC message that we will accept (8 MB). */ #define MAX_RPC_MSGSIZE (8 << 20) +int threads; +int completed = 0; + struct rpc_reply { uint32_t xid; uint32_t type; @@ -79,10 +83,35 @@ typedef struct { /* If frag_len is zero: the number of bytes of the fragment header that * have been read so far. */ int frag_hdr_bytes; + + /* Mapping of XID values to outstanding RPC calls. */ + GHashTable *xid_table; } NFSConnection; + +typedef void (*NFSFunc)(NFSConnection *nfs, + gpointer user_data, const char *reply, size_t len); + +typedef struct { + NFSFunc callback; + gpointer user_data; + int64_t start, end; +} CallInfo; + static GMainLoop *main_loop; +int64_t now_hires() +{ + struct timespec time; + + if (clock_gettime(CLOCK_REALTIME, &time) != 0) { + perror("clock_gettime"); + return 0; + } + + return (int64_t)(time.tv_sec) * 1000000000 + time.tv_nsec; +} + static void do_write(NFSConnection *conn, const char *buf, size_t len) { while (len > 0) { @@ -102,7 +131,8 @@ static void do_write(NFSConnection *conn, const char *buf, size_t len) } } -static void send_rpc(NFSConnection *nfs, int proc, GString *msg) +static void send_rpc(NFSConnection *nfs, int proc, GString *msg, + NFSFunc completion_handler, gpointer user_data) { static int xid_count = 0; struct rpc_call_header header; @@ -118,6 +148,8 @@ static void send_rpc(NFSConnection *nfs, int proc, GString *msg) auth.flavor = GUINT32_TO_BE(AUTH_NULL); auth.len = 0; + CallInfo *info = g_new0(CallInfo, 1); + uint32_t fragment = htonl(0x80000000 | (sizeof(header) + 2*sizeof(auth) + msg->len)); do_write(nfs, (const char *)&fragment, sizeof(fragment)); @@ -126,6 +158,40 @@ static void send_rpc(NFSConnection *nfs, int proc, GString *msg) do_write(nfs, (const char *)&auth, sizeof(auth)); do_write(nfs, msg->str, msg->len); g_io_channel_flush(nfs->channel, NULL); + + info->start = now_hires(); + info->callback = completion_handler; + info->user_data = user_data; + g_hash_table_insert(nfs->xid_table, + GINT_TO_POINTER(GUINT32_FROM_BE(header.xid)), info); +} + +static void process_reply(NFSConnection *nfs, GString *msg) +{ + struct rpc_reply *reply = (struct rpc_reply *)msg->str; + + uint32_t xid = GUINT32_FROM_BE(reply->xid); + + gpointer key = GINT_TO_POINTER(GUINT32_FROM_BE(reply->xid)); + CallInfo *info = g_hash_table_lookup(nfs->xid_table, key); + if (info == NULL) { + g_print("Could not match reply XID %d with a call!\n", xid); + return; + } + + info->end = now_hires(); + printf("XID %d: Time = %"PRIi64"\n", xid, info->end - info->start); + if (info->callback != NULL) + info->callback(nfs, info->user_data, + msg->str + sizeof(*reply), msg->len - sizeof(*reply)); + + g_hash_table_remove(nfs->xid_table, key); + g_free(info); + + completed++; + if (completed == 5 * threads) { + g_main_loop_quit(main_loop); + } } static gboolean read_handler(GIOChannel *channel, @@ -196,8 +262,8 @@ static gboolean read_handler(GIOChannel *channel, /* We were reading in the fragment body. */ nfs->frag_len -= bytes_read; - if (nfs->frag_len = 0x80000000) { - g_print("Got a message of size %zd\n", nfs->msgbuf->len); + if (nfs->frag_len == 0x80000000) { + process_reply(nfs, nfs->msgbuf); nfs->frag_len = 0; g_string_set_size(nfs->msgbuf, 0); } @@ -206,13 +272,61 @@ static gboolean read_handler(GIOChannel *channel, return TRUE; } +static void send_read_requests(NFSConnection *nfs, const struct nfs_fh3 *fh) +{ + int i; + + g_print("Sending read requests...\n"); + for (i = 0; i < 4; i++) { + char buf[64]; + struct read3args read; + memcpy(&read.file, fh, sizeof(struct nfs_fh3)); + read.offset = (1 << 20) * i; + read.count = (1 << 20); + + GString *str = g_string_new(""); + XDR xdr; + xdr_string_create(&xdr, str, XDR_ENCODE); + xdr_read3args(&xdr, &read); + send_rpc(nfs, NFSPROC3_READ, str, NULL, NULL); + g_string_free(str, TRUE); + } +} + +static void store_fh(NFSConnection *nfs, gpointer user_data, + const char *reply, size_t len) +{ + struct lookup3res res; + XDR xdr; + memset(&res, 0, sizeof(res)); + xdrmem_create(&xdr, (char *)reply, len, XDR_DECODE); + if (!xdr_lookup3res(&xdr, &res)) { + g_print("Decode error for lookup3res!\n"); + return; + } + if (res.status != NFS3_OK) { + g_print("Response not NFS3_OK\n"); + return; + } + + struct nfs_fh3 *fh = g_new0(struct nfs_fh3, 1); + fh->data.data_len = res.lookup3res_u.resok.object.data.data_len; + fh->data.data_val = g_memdup(res.lookup3res_u.resok.object.data.data_val, + fh->data.data_len); + + xdr.x_op = XDR_FREE; + xdr_lookup3res(&xdr, &res); + + send_read_requests(nfs, fh); +} + static gboolean idle_handler(gpointer data) { NFSConnection *nfs = (NFSConnection *)data; int i; g_print("Sending requests...\n"); - for (i = 0; i < 8; i++) { + for (i = 0; i < threads; i++) { char buf[64]; struct diropargs3 lookup; uint64_t rootfh = GUINT64_TO_BE(1); @@ -226,7 +340,7 @@ static gboolean idle_handler(gpointer data) XDR xdr; xdr_string_create(&xdr, str, XDR_ENCODE); xdr_diropargs3(&xdr, &lookup); - send_rpc(nfs, NFSPROC3_LOOKUP, str); + send_rpc(nfs, NFSPROC3_LOOKUP, str, store_fh, NULL); g_string_free(str, TRUE); } @@ -264,6 +378,7 @@ NFSConnection *nfs_connect(const char *hostname) NFSConnection *conn = g_new0(NFSConnection, 1); conn->msgbuf = g_string_new(""); + conn->xid_table = g_hash_table_new(g_direct_hash, g_direct_equal); conn->channel = g_io_channel_unix_new(fd); g_io_channel_set_encoding(conn->channel, NULL, NULL); @@ -281,6 +396,10 @@ int main(int argc, char *argv[]) g_set_prgname("synclient"); g_print("Launching synthetic NFS RPC client...\n"); + threads = 8; + if (argc > 1) + threads = atoi(argv[1]); + main_loop = g_main_loop_new(NULL, FALSE); nfs_connect("niniel.sysnet.ucsd.edu");