More cleanups to move RPC layer towards being asynchronous.
authorMichael Vrable <mvrable@cs.ucsd.edu>
Wed, 13 Jan 2010 19:05:39 +0000 (11:05 -0800)
committerMichael Vrable <mvrable@cs.ucsd.edu>
Wed, 13 Jan 2010 19:05:39 +0000 (11:05 -0800)
nfs3/nfs3_prot.h
nfs3/rpc.c

index d5152a1..7f75928 100644 (file)
@@ -10,7 +10,6 @@
 extern "C" {
 #endif
 
-
 typedef u_quad_t uint64;
 
 typedef quad_t int64;
@@ -639,6 +638,51 @@ struct commit3res {
 };
 typedef struct commit3res commit3res;
 
+/* Structure for tracking a single incoming TCP connection for RPCs.  For now,
+ * used for NFS only. */
+typedef struct {
+    GIOChannel *channel;
+
+    /* The reassembled message, thus far. */
+    GString *msgbuf;
+
+    /* Remaining number of bytes in this message fragment; 0 if we next expect
+     * another fragment header. */
+    uint32_t frag_len;
+
+    /* If frag_len is zero: the number of bytes of the fragment header that
+     * have been read so far. */
+    int frag_hdr_bytes;
+} RPCConnection;
+
+/* Used to track a single outstanding RPC request.  Not all of the fields are
+ * initially filled in, but more are filled in as the request is processed. */
+typedef struct {
+    /* The corresponding connection on which the request was made. */
+    RPCConnection *connection;
+
+    /* Transaction ID of the request, in host byte order. */
+    uint32_t xid;
+
+    /* Raw XDR arguments for the call, including headers (everything except the
+     * fragment headers).  Also, the offset to the actual arguments (number of
+     * bytes making up the headers). */
+    GString *raw_args;
+    size_t raw_args_header_bytes;
+
+    /* Decoded header information. */
+    int req_proc;
+
+    /* The XDR-decoded argument of the call, and the procedure to use for
+     * freeing these arguments.  The actual freeing is done automatically when
+     * the response is sent. */
+    void *args;
+    xdrproc_t xdr_args_free;
+
+    /* Procedure to be used for encoding the eventual return value into XDR. */
+    xdrproc_t xdr_result;
+} RPCRequest;
+
 #define NFS_PROGRAM 100003
 #define NFS_V3 3
 
index 7376535..d47943c 100644 (file)
@@ -32,22 +32,6 @@ extern BlueSkyFS *fs;
 /* Maximum size of a single RPC message that we will accept (8 MB). */
 #define MAX_RPC_MSGSIZE (8 << 20)
 
-/* For now, used for NFS only. */
-typedef struct {
-    GIOChannel *channel;
-
-    /* The reassembled message, thus far. */
-    GString *msgbuf;
-
-    /* Remaining number of bytes in this message fragment; 0 if we next expect
-     * another fragment header. */
-    uint32_t frag_len;
-
-    /* If frag_len is zero: the number of bytes of the fragment header that
-     * have been read so far. */
-    int frag_hdr_bytes;
-} RPCConnection;
-
 static void
 mount_program_3(struct svc_req *rqstp, register SVCXPRT *transp)
 {
@@ -138,13 +122,13 @@ struct rpc_fail_reply {
 };
 
 static void
-async_rpc_send_failure(RPCConnection *rpc, uint32_t xid, enum accept_stat stat)
+async_rpc_send_failure(RPCRequest *req, enum accept_stat stat)
 {
     struct rpc_fail_reply header;
 
     fprintf(stderr, "Sending RPC failure status %d\n", stat);
 
-    header.xid = htonl(xid);
+    header.xid = htonl(req->xid);
     header.type = htonl(1);     /* REPLY */
     header.stat = htonl(MSG_ACCEPTED);
     header.verf_flavor = 0;
@@ -152,16 +136,34 @@ async_rpc_send_failure(RPCConnection *rpc, uint32_t xid, enum accept_stat stat)
     header.accept_stat = htonl(stat);
 
     uint32_t fragment = htonl(sizeof(header) | 0x80000000);
-    async_rpc_write(rpc, (const char *)&fragment, sizeof(fragment));
-    async_rpc_write(rpc, (const char *)&header, sizeof(header));
-    g_io_channel_flush(rpc->channel, NULL);
+    async_rpc_write(req->connection, (const char *)&fragment, sizeof(fragment));
+    async_rpc_write(req->connection, (const char *)&header, sizeof(header));
+    g_io_channel_flush(req->connection->channel, NULL);
+
+    if (req->raw_args != NULL)
+        g_string_free(req->raw_args, TRUE);
+
+    if (req->args != NULL) {
+        char buf[4];
+        XDR xdr;
+        xdrmem_create(&xdr, buf, sizeof(buf), XDR_FREE);
+        if (!req->xdr_args_free(&xdr, req->args)) {
+            fprintf(stderr, "unable to free arguments");
+        }
+    }
+
+    g_free(req);
 }
 
 static void
-nfs_program_3(struct svc_req *rqstp, RPCConnection *connection, uint32_t xid,
-              const char *msg_buf, size_t msg_len)
+nfs_program_3(RPCRequest *req)
 {
-    union {
+    RPCConnection *connection = req->connection;
+    uint32_t xid = req->xid;
+    const char *msg_buf = req->raw_args->str + req->raw_args_header_bytes;
+    size_t msg_len = req->raw_args->len - req->raw_args_header_bytes;
+
+    union argtype {
         nfs_fh3 nfsproc3_getattr_3_arg;
         setattr3args nfsproc3_setattr_3_arg;
         diropargs3 nfsproc3_lookup_3_arg;
@@ -183,12 +185,12 @@ nfs_program_3(struct svc_req *rqstp, RPCConnection *connection, uint32_t xid,
         nfs_fh3 nfsproc3_fsinfo_3_arg;
         nfs_fh3 nfsproc3_pathconf_3_arg;
         commit3args nfsproc3_commit_3_arg;
-    } argument;
+    };
     char *result;
     xdrproc_t _xdr_argument, _xdr_result;
     char *(*local)(char *, struct svc_req *);
 
-    switch (rqstp->rq_proc) {
+    switch (req->req_proc) {
     case NFSPROC3_NULL:
         _xdr_argument = (xdrproc_t) xdr_void;
         _xdr_result = (xdrproc_t) xdr_void;
@@ -322,35 +324,30 @@ nfs_program_3(struct svc_req *rqstp, RPCConnection *connection, uint32_t xid,
         break;
 
     default:
-        async_rpc_send_failure(connection, xid, PROC_UNAVAIL);
+        async_rpc_send_failure(req, PROC_UNAVAIL);
         return;
     }
 
     /* Decode incoming message */
-    memset ((char *)&argument, 0, sizeof (argument));
+    req->xdr_args_free = _xdr_argument;
+    req->args = g_new0(union argtype, 1);
     XDR xdr_in;
     xdrmem_create(&xdr_in, (char *)msg_buf, msg_len, XDR_DECODE);
-    int i;
-    printf("Call XDR: ");
-    for (i = 0; i < msg_len; i++) {
-        printf("%02x ", (uint8_t)msg_buf[i]);
-    }
-    printf("\n");
-    if (!_xdr_argument(&xdr_in, (caddr_t)&argument)) {
-        async_rpc_send_failure(connection, xid, GARBAGE_ARGS);
+    if (!_xdr_argument(&xdr_in, req->args)) {
+        async_rpc_send_failure(req, GARBAGE_ARGS);
         fprintf(stderr, "RPC decode error!\n");
         return;
     }
 
     /* Perform the call. */
-    result = (*local)((char *)&argument, rqstp);
+    result = (*local)((char *)req->args, NULL);
 
     /* Encode result and send reply. */
     static char reply_buf[MAX_RPC_MSGSIZE];
     XDR xdr_out;
     xdrmem_create(&xdr_out, reply_buf, MAX_RPC_MSGSIZE, XDR_ENCODE);
     if (result != NULL && !_xdr_result(&xdr_out, result)) {
-        async_rpc_send_failure(connection, xid, SYSTEM_ERR);
+        async_rpc_send_failure(req, SYSTEM_ERR);
     }
 
     struct rpc_reply header;
@@ -371,10 +368,11 @@ nfs_program_3(struct svc_req *rqstp, RPCConnection *connection, uint32_t xid,
 
     /* Clean up. */
     xdr_in.x_op = XDR_FREE;
-    if (!_xdr_argument(&xdr_in, (caddr_t)&argument)) {
+    if (!_xdr_argument(&xdr_in, (caddr_t)req->args)) {
         fprintf (stderr, "%s", "unable to free arguments");
         exit (1);
     }
+    g_free(req->args);
 
     bluesky_flushd_invoke(fs);
 
@@ -431,13 +429,19 @@ static gboolean async_rpc_dispatch(RPCConnection *rpc)
 
     if (ntohl(header->rpcvers) != 2) {
         return FALSE;
-    } else if (ntohl(header->prog) != NFS_PROGRAM) {
-        async_rpc_send_failure(rpc, xid, PROG_UNAVAIL);
+    }
+
+    RPCRequest *req = g_new0(RPCRequest, 1);
+    req->connection = rpc;
+    req->xid = xid;
+
+    if (ntohl(header->prog) != NFS_PROGRAM) {
+        async_rpc_send_failure(req, PROG_UNAVAIL);
         return TRUE;
     } else if (ntohl(header->vers) != NFS_V3) {
         /* FIXME: Should be PROG_MISMATCH */
-        async_rpc_send_failure(rpc, xid, PROG_UNAVAIL);
-        return FALSE;
+        async_rpc_send_failure(req, PROG_UNAVAIL);
+        return TRUE;
     }
 
     uint32_t proc = ntohl(header->proc);
@@ -461,18 +465,12 @@ static gboolean async_rpc_dispatch(RPCConnection *rpc)
 
     printf("Dispatching RPC procedure %d...\n", proc);
 
-    struct svc_req req;
-    req.rq_prog = ntohl(header->prog);
-    req.rq_vers = ntohl(header->vers);
-    req.rq_proc = ntohl(header->proc);
-    req.rq_cred.oa_flavor = 0;
-    req.rq_cred.oa_base = NULL;
-    req.rq_cred.oa_length = 0;
-    req.rq_clntcred = NULL;
-    req.rq_xprt = NULL;
-
-    nfs_program_3(&req, rpc, ntohl(header->xid), buf,
-                  (msg->str + msg->len) - buf);
+    req->raw_args = msg;
+    req->raw_args_header_bytes = buf - msg->str;
+    req->req_proc = ntohl(header->proc);
+    rpc->msgbuf = g_string_new("");
+
+    nfs_program_3(req);
 
     return TRUE;
 }