From 3ec75fa45814e987f77224887350a9f736ed5b92 Mon Sep 17 00:00:00 2001 From: Michael Vrable Date: Wed, 13 Jan 2010 11:05:39 -0800 Subject: [PATCH] More cleanups to move RPC layer towards being asynchronous. --- nfs3/nfs3_prot.h | 46 +++++++++++++++++++- nfs3/rpc.c | 108 +++++++++++++++++++++++------------------------ 2 files changed, 98 insertions(+), 56 deletions(-) diff --git a/nfs3/nfs3_prot.h b/nfs3/nfs3_prot.h index d5152a1..7f75928 100644 --- a/nfs3/nfs3_prot.h +++ b/nfs3/nfs3_prot.h @@ -10,7 +10,6 @@ extern "C" { #endif - typedef u_quad_t uint64; typedef quad_t int64; @@ -639,6 +638,51 @@ struct commit3res { }; typedef struct commit3res commit3res; +/* Structure for tracking a single incoming TCP connection for RPCs. For now, + * used for NFS only. */ +typedef struct { + GIOChannel *channel; + + /* The reassembled message, thus far. */ + GString *msgbuf; + + /* Remaining number of bytes in this message fragment; 0 if we next expect + * another fragment header. */ + uint32_t frag_len; + + /* If frag_len is zero: the number of bytes of the fragment header that + * have been read so far. */ + int frag_hdr_bytes; +} RPCConnection; + +/* Used to track a single outstanding RPC request. Not all of the fields are + * initially filled in, but more are filled in as the request is processed. */ +typedef struct { + /* The corresponding connection on which the request was made. */ + RPCConnection *connection; + + /* Transaction ID of the request, in host byte order. */ + uint32_t xid; + + /* Raw XDR arguments for the call, including headers (everything except the + * fragment headers). Also, the offset to the actual arguments (number of + * bytes making up the headers). */ + GString *raw_args; + size_t raw_args_header_bytes; + + /* Decoded header information. */ + int req_proc; + + /* The XDR-decoded argument of the call, and the procedure to use for + * freeing these arguments. The actual freeing is done automatically when + * the response is sent. */ + void *args; + xdrproc_t xdr_args_free; + + /* Procedure to be used for encoding the eventual return value into XDR. */ + xdrproc_t xdr_result; +} RPCRequest; + #define NFS_PROGRAM 100003 #define NFS_V3 3 diff --git a/nfs3/rpc.c b/nfs3/rpc.c index 7376535..d47943c 100644 --- a/nfs3/rpc.c +++ b/nfs3/rpc.c @@ -32,22 +32,6 @@ extern BlueSkyFS *fs; /* Maximum size of a single RPC message that we will accept (8 MB). */ #define MAX_RPC_MSGSIZE (8 << 20) -/* For now, used for NFS only. */ -typedef struct { - GIOChannel *channel; - - /* The reassembled message, thus far. */ - GString *msgbuf; - - /* Remaining number of bytes in this message fragment; 0 if we next expect - * another fragment header. */ - uint32_t frag_len; - - /* If frag_len is zero: the number of bytes of the fragment header that - * have been read so far. */ - int frag_hdr_bytes; -} RPCConnection; - static void mount_program_3(struct svc_req *rqstp, register SVCXPRT *transp) { @@ -138,13 +122,13 @@ struct rpc_fail_reply { }; static void -async_rpc_send_failure(RPCConnection *rpc, uint32_t xid, enum accept_stat stat) +async_rpc_send_failure(RPCRequest *req, enum accept_stat stat) { struct rpc_fail_reply header; fprintf(stderr, "Sending RPC failure status %d\n", stat); - header.xid = htonl(xid); + header.xid = htonl(req->xid); header.type = htonl(1); /* REPLY */ header.stat = htonl(MSG_ACCEPTED); header.verf_flavor = 0; @@ -152,16 +136,34 @@ async_rpc_send_failure(RPCConnection *rpc, uint32_t xid, enum accept_stat stat) header.accept_stat = htonl(stat); uint32_t fragment = htonl(sizeof(header) | 0x80000000); - async_rpc_write(rpc, (const char *)&fragment, sizeof(fragment)); - async_rpc_write(rpc, (const char *)&header, sizeof(header)); - g_io_channel_flush(rpc->channel, NULL); + async_rpc_write(req->connection, (const char *)&fragment, sizeof(fragment)); + async_rpc_write(req->connection, (const char *)&header, sizeof(header)); + g_io_channel_flush(req->connection->channel, NULL); + + if (req->raw_args != NULL) + g_string_free(req->raw_args, TRUE); + + if (req->args != NULL) { + char buf[4]; + XDR xdr; + xdrmem_create(&xdr, buf, sizeof(buf), XDR_FREE); + if (!req->xdr_args_free(&xdr, req->args)) { + fprintf(stderr, "unable to free arguments"); + } + } + + g_free(req); } static void -nfs_program_3(struct svc_req *rqstp, RPCConnection *connection, uint32_t xid, - const char *msg_buf, size_t msg_len) +nfs_program_3(RPCRequest *req) { - union { + RPCConnection *connection = req->connection; + uint32_t xid = req->xid; + const char *msg_buf = req->raw_args->str + req->raw_args_header_bytes; + size_t msg_len = req->raw_args->len - req->raw_args_header_bytes; + + union argtype { nfs_fh3 nfsproc3_getattr_3_arg; setattr3args nfsproc3_setattr_3_arg; diropargs3 nfsproc3_lookup_3_arg; @@ -183,12 +185,12 @@ nfs_program_3(struct svc_req *rqstp, RPCConnection *connection, uint32_t xid, nfs_fh3 nfsproc3_fsinfo_3_arg; nfs_fh3 nfsproc3_pathconf_3_arg; commit3args nfsproc3_commit_3_arg; - } argument; + }; char *result; xdrproc_t _xdr_argument, _xdr_result; char *(*local)(char *, struct svc_req *); - switch (rqstp->rq_proc) { + switch (req->req_proc) { case NFSPROC3_NULL: _xdr_argument = (xdrproc_t) xdr_void; _xdr_result = (xdrproc_t) xdr_void; @@ -322,35 +324,30 @@ nfs_program_3(struct svc_req *rqstp, RPCConnection *connection, uint32_t xid, break; default: - async_rpc_send_failure(connection, xid, PROC_UNAVAIL); + async_rpc_send_failure(req, PROC_UNAVAIL); return; } /* Decode incoming message */ - memset ((char *)&argument, 0, sizeof (argument)); + req->xdr_args_free = _xdr_argument; + req->args = g_new0(union argtype, 1); XDR xdr_in; xdrmem_create(&xdr_in, (char *)msg_buf, msg_len, XDR_DECODE); - int i; - printf("Call XDR: "); - for (i = 0; i < msg_len; i++) { - printf("%02x ", (uint8_t)msg_buf[i]); - } - printf("\n"); - if (!_xdr_argument(&xdr_in, (caddr_t)&argument)) { - async_rpc_send_failure(connection, xid, GARBAGE_ARGS); + if (!_xdr_argument(&xdr_in, req->args)) { + async_rpc_send_failure(req, GARBAGE_ARGS); fprintf(stderr, "RPC decode error!\n"); return; } /* Perform the call. */ - result = (*local)((char *)&argument, rqstp); + result = (*local)((char *)req->args, NULL); /* Encode result and send reply. */ static char reply_buf[MAX_RPC_MSGSIZE]; XDR xdr_out; xdrmem_create(&xdr_out, reply_buf, MAX_RPC_MSGSIZE, XDR_ENCODE); if (result != NULL && !_xdr_result(&xdr_out, result)) { - async_rpc_send_failure(connection, xid, SYSTEM_ERR); + async_rpc_send_failure(req, SYSTEM_ERR); } struct rpc_reply header; @@ -371,10 +368,11 @@ nfs_program_3(struct svc_req *rqstp, RPCConnection *connection, uint32_t xid, /* Clean up. */ xdr_in.x_op = XDR_FREE; - if (!_xdr_argument(&xdr_in, (caddr_t)&argument)) { + if (!_xdr_argument(&xdr_in, (caddr_t)req->args)) { fprintf (stderr, "%s", "unable to free arguments"); exit (1); } + g_free(req->args); bluesky_flushd_invoke(fs); @@ -431,13 +429,19 @@ static gboolean async_rpc_dispatch(RPCConnection *rpc) if (ntohl(header->rpcvers) != 2) { return FALSE; - } else if (ntohl(header->prog) != NFS_PROGRAM) { - async_rpc_send_failure(rpc, xid, PROG_UNAVAIL); + } + + RPCRequest *req = g_new0(RPCRequest, 1); + req->connection = rpc; + req->xid = xid; + + if (ntohl(header->prog) != NFS_PROGRAM) { + async_rpc_send_failure(req, PROG_UNAVAIL); return TRUE; } else if (ntohl(header->vers) != NFS_V3) { /* FIXME: Should be PROG_MISMATCH */ - async_rpc_send_failure(rpc, xid, PROG_UNAVAIL); - return FALSE; + async_rpc_send_failure(req, PROG_UNAVAIL); + return TRUE; } uint32_t proc = ntohl(header->proc); @@ -461,18 +465,12 @@ static gboolean async_rpc_dispatch(RPCConnection *rpc) printf("Dispatching RPC procedure %d...\n", proc); - struct svc_req req; - req.rq_prog = ntohl(header->prog); - req.rq_vers = ntohl(header->vers); - req.rq_proc = ntohl(header->proc); - req.rq_cred.oa_flavor = 0; - req.rq_cred.oa_base = NULL; - req.rq_cred.oa_length = 0; - req.rq_clntcred = NULL; - req.rq_xprt = NULL; - - nfs_program_3(&req, rpc, ntohl(header->xid), buf, - (msg->str + msg->len) - buf); + req->raw_args = msg; + req->raw_args_header_bytes = buf - msg->str; + req->req_proc = ntohl(header->proc); + rpc->msgbuf = g_string_new(""); + + nfs_program_3(req); return TRUE; } -- 2.20.1