Update synthetic RPC client to record timing.
authorMichael Vrable <mvrable@cs.ucsd.edu>
Thu, 4 Feb 2010 19:36:13 +0000 (11:36 -0800)
committerMichael Vrable <mvrable@cs.ucsd.edu>
Thu, 4 Feb 2010 19:36:13 +0000 (11:36 -0800)
For now, both it and readbench and effectively testing the time to look up
an inode, which should be a single fetch from S3.

microbench/readbench.c
nfs3/synclient.c

index 84d6a82..043a2cf 100644 (file)
@@ -1,11 +1,13 @@
+#include <errno.h>
+#include <inttypes.h>
+#include <pthread.h>
+#include <stdint.h>
 #include <stdio.h>
 #include <stdlib.h>
-#include <stdint.h>
-#include <inttypes.h>
-#include <unistd.h>
-#include <errno.h>
+#include <sys/stat.h>
+#include <sys/types.h>
 #include <time.h>
-#include <pthread.h>
+#include <unistd.h>
 
 struct thread_state {
     pthread_t thread;
@@ -61,6 +63,11 @@ void *benchmark_thread(void *arg)
     int64_t start, end;
 
     start = now_hires();
+
+    struct stat stat_buf;
+    stat(namebuf, &stat_buf);
+
+#if 0
     FILE *f = fopen(namebuf, "rb");
     if (f == NULL) {
         perror("fopen");
@@ -69,11 +76,12 @@ void *benchmark_thread(void *arg)
 
     char buf[4096];
     fread(buf, 1, sizeof(buf), f);
+#endif
 
     end = now_hires();
     printf("Thread %d: Time = %"PRIi64"\n", ts->thread_num, end - start);
 
-    fclose(f);
+    // fclose(f);
 
     return NULL;
 }
index 9806e7b..d96ae13 100644 (file)
@@ -17,6 +17,7 @@
 #include "nfs3_prot.h"
 #include <stdio.h>
 #include <stdlib.h>
+#include <inttypes.h>
 #include <rpc/pmap_clnt.h>
 #include <string.h>
 #include <signal.h>
@@ -79,10 +80,31 @@ typedef struct {
     /* If frag_len is zero: the number of bytes of the fragment header that
      * have been read so far. */
     int frag_hdr_bytes;
+
+    /* Mapping of XID values to outstanding RPC calls. */
+    GHashTable *xid_table;
 } NFSConnection;
 
+typedef struct {
+    GFunc callback;
+    gpointer user_data;
+    int64_t start, end;
+} CallInfo;
+
 static GMainLoop *main_loop;
 
+int64_t now_hires()
+{
+    struct timespec time;
+
+    if (clock_gettime(CLOCK_REALTIME, &time) != 0) {
+        perror("clock_gettime");
+        return 0;
+    }
+
+    return (int64_t)(time.tv_sec) * 1000000000 + time.tv_nsec;
+}
+
 static void do_write(NFSConnection *conn, const char *buf, size_t len)
 {
     while (len > 0) {
@@ -102,7 +124,8 @@ static void do_write(NFSConnection *conn, const char *buf, size_t len)
     }
 }
 
-static void send_rpc(NFSConnection *nfs, int proc, GString *msg)
+static void send_rpc(NFSConnection *nfs, int proc, GString *msg,
+                     GFunc completion_handler, gpointer user_data)
 {
     static int xid_count = 0;
     struct rpc_call_header header;
@@ -118,6 +141,8 @@ static void send_rpc(NFSConnection *nfs, int proc, GString *msg)
     auth.flavor = GUINT32_TO_BE(AUTH_NULL);
     auth.len = 0;
 
+    CallInfo *info = g_new0(CallInfo, 1);
+
     uint32_t fragment = htonl(0x80000000
                               | (sizeof(header) + 2*sizeof(auth) + msg->len));
     do_write(nfs, (const char *)&fragment, sizeof(fragment));
@@ -126,6 +151,35 @@ static void send_rpc(NFSConnection *nfs, int proc, GString *msg)
     do_write(nfs, (const char *)&auth, sizeof(auth));
     do_write(nfs, msg->str, msg->len);
     g_io_channel_flush(nfs->channel, NULL);
+
+    info->start = now_hires();
+    info->callback = completion_handler;
+    info->user_data = user_data;
+    g_hash_table_insert(nfs->xid_table,
+                        GINT_TO_POINTER(GUINT32_FROM_BE(header.xid)), info);
+}
+
+static void process_reply(NFSConnection *nfs, GString *msg)
+{
+    struct rpc_reply *reply = (struct rpc_reply *)msg->str;
+
+    uint32_t xid = GUINT32_FROM_BE(reply->xid);
+
+    gpointer key = GINT_TO_POINTER(GUINT32_FROM_BE(reply->xid));
+    CallInfo *info = g_hash_table_lookup(nfs->xid_table, key);
+    if (info == NULL) {
+        g_print("Could not match reply XID %d with a call!\n", xid);
+        return;
+    }
+
+    info->end = now_hires();
+    g_print("Call(XID = %d) duration: %"PRIi64" ns\n",
+            xid, info->end - info->start);
+    if (info->callback != NULL)
+        info->callback(nfs, info->user_data);
+
+    g_hash_table_remove(nfs->xid_table, key);
+    g_free(info);
 }
 
 static gboolean read_handler(GIOChannel *channel,
@@ -197,7 +251,7 @@ static gboolean read_handler(GIOChannel *channel,
         nfs->frag_len -= bytes_read;
 
         if (nfs->frag_len = 0x80000000) {
-            g_print("Got a message of size %zd\n", nfs->msgbuf->len);
+            process_reply(nfs, nfs->msgbuf);
             nfs->frag_len = 0;
             g_string_set_size(nfs->msgbuf, 0);
         }
@@ -226,7 +280,7 @@ static gboolean idle_handler(gpointer data)
         XDR xdr;
         xdr_string_create(&xdr, str, XDR_ENCODE);
         xdr_diropargs3(&xdr, &lookup);
-        send_rpc(nfs, NFSPROC3_LOOKUP, str);
+        send_rpc(nfs, NFSPROC3_LOOKUP, str, NULL, NULL);
         g_string_free(str, TRUE);
     }
 
@@ -264,6 +318,7 @@ NFSConnection *nfs_connect(const char *hostname)
 
     NFSConnection *conn = g_new0(NFSConnection, 1);
     conn->msgbuf = g_string_new("");
+    conn->xid_table = g_hash_table_new(g_direct_hash, g_direct_equal);
 
     conn->channel = g_io_channel_unix_new(fd);
     g_io_channel_set_encoding(conn->channel, NULL, NULL);