X-Git-Url: http://git.vrable.net/?a=blobdiff_plain;f=nfs3%2Frpc.c;h=468c58c2823d44077f59b4cde8ecf88933e3e47b;hb=7bb8900e1209a2abf3d83e7f06e2dfb314cef61e;hp=7376535cf86182db0cf3ee89efd042b83199ed96;hpb=cb5c460ba6a31c27ac0d6803c33e9e6c5a140acd;p=bluesky.git diff --git a/nfs3/rpc.c b/nfs3/rpc.c index 7376535..468c58c 100644 --- a/nfs3/rpc.c +++ b/nfs3/rpc.c @@ -32,22 +32,6 @@ extern BlueSkyFS *fs; /* Maximum size of a single RPC message that we will accept (8 MB). */ #define MAX_RPC_MSGSIZE (8 << 20) -/* For now, used for NFS only. */ -typedef struct { - GIOChannel *channel; - - /* The reassembled message, thus far. */ - GString *msgbuf; - - /* Remaining number of bytes in this message fragment; 0 if we next expect - * another fragment header. */ - uint32_t frag_len; - - /* If frag_len is zero: the number of bytes of the fragment header that - * have been read so far. */ - int frag_hdr_bytes; -} RPCConnection; - static void mount_program_3(struct svc_req *rqstp, register SVCXPRT *transp) { @@ -137,14 +121,74 @@ struct rpc_fail_reply { uint32_t accept_stat; }; +/* Routines for XDR-encoding to a growable string. */ +static bool_t xdr_string_putlong(XDR *xdrs, const long *lp) +{ + GString *str = (GString *)xdrs->x_private; + uint32_t data = htonl(*lp); + g_string_set_size(str, str->len + 4); + memcpy(str->str + str->len - 4, &data, 4); + return TRUE; +} + +static bool_t xdr_string_putbytes(XDR *xdrs, const char *addr, u_int len) +{ + GString *str = (GString *)xdrs->x_private; + g_string_set_size(str, str->len + len); + memcpy(str->str + str->len - len, addr, len); + return TRUE; +} + +static u_int xdr_string_getpos(const XDR *xdrs) +{ + GString *str = (GString *)xdrs->x_private; + return str->len; +} + +static bool_t xdr_string_putint32(XDR *xdrs, const int32_t *ip) +{ + GString *str = (GString *)xdrs->x_private; + uint32_t data = htonl(*ip); + g_string_set_size(str, str->len + 4); + memcpy(str->str + str->len - 4, &data, 4); + return TRUE; +} + +static int32_t *xdr_string_inline(XDR *xdrs, u_int len) +{ + GString *str = (GString *)xdrs->x_private; + g_string_set_size(str, str->len + len); + return (int32_t *)(str->str + str->len - len); +} + +static void xdr_string_destroy(XDR *xdrs) +{ +} + +static struct xdr_ops xdr_string_ops = { + .x_putlong = xdr_string_putlong, + .x_putbytes = xdr_string_putbytes, + .x_getpostn = xdr_string_getpos, + .x_putint32 = xdr_string_putint32, + .x_inline = xdr_string_inline, + .x_destroy = xdr_string_destroy, +}; + +static void xdr_string_create(XDR *xdrs, GString *string, enum xdr_op op) +{ + xdrs->x_op = op; + xdrs->x_ops = &xdr_string_ops; + xdrs->x_private = (char *)string; + xdrs->x_base = NULL; + xdrs->x_handy = 0; +} + static void -async_rpc_send_failure(RPCConnection *rpc, uint32_t xid, enum accept_stat stat) +async_rpc_send_failure(RPCRequest *req, enum accept_stat stat) { struct rpc_fail_reply header; - fprintf(stderr, "Sending RPC failure status %d\n", stat); - - header.xid = htonl(xid); + header.xid = htonl(req->xid); header.type = htonl(1); /* REPLY */ header.stat = htonl(MSG_ACCEPTED); header.verf_flavor = 0; @@ -152,16 +196,79 @@ async_rpc_send_failure(RPCConnection *rpc, uint32_t xid, enum accept_stat stat) header.accept_stat = htonl(stat); uint32_t fragment = htonl(sizeof(header) | 0x80000000); - async_rpc_write(rpc, (const char *)&fragment, sizeof(fragment)); - async_rpc_write(rpc, (const char *)&header, sizeof(header)); - g_io_channel_flush(rpc->channel, NULL); + async_rpc_write(req->connection, (const char *)&fragment, sizeof(fragment)); + async_rpc_write(req->connection, (const char *)&header, sizeof(header)); + g_io_channel_flush(req->connection->channel, NULL); + + if (req->args != NULL) { + char buf[4]; + XDR xdr; + xdrmem_create(&xdr, buf, sizeof(buf), XDR_FREE); + if (!req->xdr_args_free(&xdr, req->args)) { + fprintf(stderr, "unable to free arguments"); + } + } + + if (req->raw_args != NULL) + g_string_free(req->raw_args, TRUE); + + g_free(req); +} + +void +async_rpc_send_reply(RPCRequest *req, void *result) +{ + GString *str = g_string_new(""); + XDR xdr_out; + xdr_string_create(&xdr_out, str, XDR_ENCODE); + if (!req->xdr_result(&xdr_out, result)) { + async_rpc_send_failure(req, SYSTEM_ERR); + g_string_free(str, TRUE); + return; + } + + struct rpc_reply header; + header.xid = htonl(req->xid); + header.type = htonl(1); /* REPLY */ + header.stat = htonl(MSG_ACCEPTED); + header.verf_flavor = 0; + header.verf_len = 0; + header.accept_stat = 0; + + gsize msg_size = str->len; + uint32_t fragment = htonl((msg_size + sizeof(header)) | 0x80000000); + async_rpc_write(req->connection, (const char *)&fragment, sizeof(fragment)); + async_rpc_write(req->connection, (const char *)&header, sizeof(header)); + async_rpc_write(req->connection, str->str, str->len); + g_io_channel_flush(req->connection->channel, NULL); + + /* Clean up. */ + g_string_free(str, TRUE); + + if (req->args != NULL) { + char buf[4]; + XDR xdr; + xdrmem_create(&xdr, buf, sizeof(buf), XDR_FREE); + if (!req->xdr_args_free(&xdr, req->args)) { + fprintf(stderr, "unable to free arguments"); + } + } + + if (req->raw_args != NULL) + g_string_free(req->raw_args, TRUE); + + g_free(req); } static void -nfs_program_3(struct svc_req *rqstp, RPCConnection *connection, uint32_t xid, - const char *msg_buf, size_t msg_len) +nfs_program_3(RPCRequest *req) { - union { + RPCConnection *connection = req->connection; + uint32_t xid = req->xid; + const char *msg_buf = req->raw_args->str + req->raw_args_header_bytes; + size_t msg_len = req->raw_args->len - req->raw_args_header_bytes; + + union argtype { nfs_fh3 nfsproc3_getattr_3_arg; setattr3args nfsproc3_setattr_3_arg; diropargs3 nfsproc3_lookup_3_arg; @@ -183,200 +290,168 @@ nfs_program_3(struct svc_req *rqstp, RPCConnection *connection, uint32_t xid, nfs_fh3 nfsproc3_fsinfo_3_arg; nfs_fh3 nfsproc3_pathconf_3_arg; commit3args nfsproc3_commit_3_arg; - } argument; + }; char *result; xdrproc_t _xdr_argument, _xdr_result; - char *(*local)(char *, struct svc_req *); + char *(*local)(char *, RPCRequest *); - switch (rqstp->rq_proc) { + printf("Dispatched NFS RPC message type %d\n", req->req_proc); + + switch (req->req_proc) { case NFSPROC3_NULL: _xdr_argument = (xdrproc_t) xdr_void; _xdr_result = (xdrproc_t) xdr_void; - local = (char *(*)(char *, struct svc_req *)) nfsproc3_null_3_svc; + local = (char *(*)(char *, RPCRequest *)) nfsproc3_null_3_svc; break; case NFSPROC3_GETATTR: _xdr_argument = (xdrproc_t) xdr_nfs_fh3; _xdr_result = (xdrproc_t) xdr_getattr3res; - local = (char *(*)(char *, struct svc_req *)) nfsproc3_getattr_3_svc; + local = (char *(*)(char *, RPCRequest *)) nfsproc3_getattr_3_svc; break; case NFSPROC3_SETATTR: _xdr_argument = (xdrproc_t) xdr_setattr3args; _xdr_result = (xdrproc_t) xdr_wccstat3; - local = (char *(*)(char *, struct svc_req *)) nfsproc3_setattr_3_svc; + local = (char *(*)(char *, RPCRequest *)) nfsproc3_setattr_3_svc; break; case NFSPROC3_LOOKUP: _xdr_argument = (xdrproc_t) xdr_diropargs3; _xdr_result = (xdrproc_t) xdr_lookup3res; - local = (char *(*)(char *, struct svc_req *)) nfsproc3_lookup_3_svc; + local = (char *(*)(char *, RPCRequest *)) nfsproc3_lookup_3_svc; break; case NFSPROC3_ACCESS: _xdr_argument = (xdrproc_t) xdr_access3args; _xdr_result = (xdrproc_t) xdr_access3res; - local = (char *(*)(char *, struct svc_req *)) nfsproc3_access_3_svc; + local = (char *(*)(char *, RPCRequest *)) nfsproc3_access_3_svc; break; case NFSPROC3_READLINK: _xdr_argument = (xdrproc_t) xdr_nfs_fh3; _xdr_result = (xdrproc_t) xdr_readlink3res; - local = (char *(*)(char *, struct svc_req *)) nfsproc3_readlink_3_svc; + local = (char *(*)(char *, RPCRequest *)) nfsproc3_readlink_3_svc; break; case NFSPROC3_READ: _xdr_argument = (xdrproc_t) xdr_read3args; _xdr_result = (xdrproc_t) xdr_read3res; - local = (char *(*)(char *, struct svc_req *)) nfsproc3_read_3_svc; + local = (char *(*)(char *, RPCRequest *)) nfsproc3_read_3_svc; break; case NFSPROC3_WRITE: _xdr_argument = (xdrproc_t) xdr_write3args; _xdr_result = (xdrproc_t) xdr_write3res; - local = (char *(*)(char *, struct svc_req *)) nfsproc3_write_3_svc; + local = (char *(*)(char *, RPCRequest *)) nfsproc3_write_3_svc; break; case NFSPROC3_CREATE: _xdr_argument = (xdrproc_t) xdr_create3args; _xdr_result = (xdrproc_t) xdr_diropres3; - local = (char *(*)(char *, struct svc_req *)) nfsproc3_create_3_svc; + local = (char *(*)(char *, RPCRequest *)) nfsproc3_create_3_svc; break; case NFSPROC3_MKDIR: _xdr_argument = (xdrproc_t) xdr_mkdir3args; _xdr_result = (xdrproc_t) xdr_diropres3; - local = (char *(*)(char *, struct svc_req *)) nfsproc3_mkdir_3_svc; + local = (char *(*)(char *, RPCRequest *)) nfsproc3_mkdir_3_svc; break; case NFSPROC3_SYMLINK: _xdr_argument = (xdrproc_t) xdr_symlink3args; _xdr_result = (xdrproc_t) xdr_diropres3; - local = (char *(*)(char *, struct svc_req *)) nfsproc3_symlink_3_svc; + local = (char *(*)(char *, RPCRequest *)) nfsproc3_symlink_3_svc; break; case NFSPROC3_MKNOD: _xdr_argument = (xdrproc_t) xdr_mknod3args; _xdr_result = (xdrproc_t) xdr_diropres3; - local = (char *(*)(char *, struct svc_req *)) nfsproc3_mknod_3_svc; + local = (char *(*)(char *, RPCRequest *)) nfsproc3_mknod_3_svc; break; case NFSPROC3_REMOVE: _xdr_argument = (xdrproc_t) xdr_diropargs3; _xdr_result = (xdrproc_t) xdr_wccstat3; - local = (char *(*)(char *, struct svc_req *)) nfsproc3_remove_3_svc; + local = (char *(*)(char *, RPCRequest *)) nfsproc3_remove_3_svc; break; case NFSPROC3_RMDIR: _xdr_argument = (xdrproc_t) xdr_diropargs3; _xdr_result = (xdrproc_t) xdr_wccstat3; - local = (char *(*)(char *, struct svc_req *)) nfsproc3_rmdir_3_svc; + local = (char *(*)(char *, RPCRequest *)) nfsproc3_rmdir_3_svc; break; case NFSPROC3_RENAME: _xdr_argument = (xdrproc_t) xdr_rename3args; _xdr_result = (xdrproc_t) xdr_rename3res; - local = (char *(*)(char *, struct svc_req *)) nfsproc3_rename_3_svc; + local = (char *(*)(char *, RPCRequest *)) nfsproc3_rename_3_svc; break; case NFSPROC3_LINK: _xdr_argument = (xdrproc_t) xdr_link3args; _xdr_result = (xdrproc_t) xdr_link3res; - local = (char *(*)(char *, struct svc_req *)) nfsproc3_link_3_svc; + local = (char *(*)(char *, RPCRequest *)) nfsproc3_link_3_svc; break; case NFSPROC3_READDIR: _xdr_argument = (xdrproc_t) xdr_readdir3args; _xdr_result = (xdrproc_t) xdr_readdir3res; - local = (char *(*)(char *, struct svc_req *)) nfsproc3_readdir_3_svc; + local = (char *(*)(char *, RPCRequest *)) nfsproc3_readdir_3_svc; break; case NFSPROC3_READDIRPLUS: _xdr_argument = (xdrproc_t) xdr_readdirplus3args; _xdr_result = (xdrproc_t) xdr_readdirplus3res; - local = (char *(*)(char *, struct svc_req *)) nfsproc3_readdirplus_3_svc; + local = (char *(*)(char *, RPCRequest *)) nfsproc3_readdirplus_3_svc; break; case NFSPROC3_FSSTAT: _xdr_argument = (xdrproc_t) xdr_nfs_fh3; _xdr_result = (xdrproc_t) xdr_fsstat3res; - local = (char *(*)(char *, struct svc_req *)) nfsproc3_fsstat_3_svc; + local = (char *(*)(char *, RPCRequest *)) nfsproc3_fsstat_3_svc; break; case NFSPROC3_FSINFO: _xdr_argument = (xdrproc_t) xdr_nfs_fh3; _xdr_result = (xdrproc_t) xdr_fsinfo3res; - local = (char *(*)(char *, struct svc_req *)) nfsproc3_fsinfo_3_svc; + local = (char *(*)(char *, RPCRequest *)) nfsproc3_fsinfo_3_svc; break; case NFSPROC3_PATHCONF: _xdr_argument = (xdrproc_t) xdr_nfs_fh3; _xdr_result = (xdrproc_t) xdr_pathconf3res; - local = (char *(*)(char *, struct svc_req *)) nfsproc3_pathconf_3_svc; + local = (char *(*)(char *, RPCRequest *)) nfsproc3_pathconf_3_svc; break; case NFSPROC3_COMMIT: _xdr_argument = (xdrproc_t) xdr_commit3args; _xdr_result = (xdrproc_t) xdr_commit3res; - local = (char *(*)(char *, struct svc_req *)) nfsproc3_commit_3_svc; + local = (char *(*)(char *, RPCRequest *)) nfsproc3_commit_3_svc; break; default: - async_rpc_send_failure(connection, xid, PROC_UNAVAIL); + async_rpc_send_failure(req, PROC_UNAVAIL); return; } /* Decode incoming message */ - memset ((char *)&argument, 0, sizeof (argument)); + req->xdr_args_free = _xdr_argument; + req->args = g_new0(union argtype, 1); XDR xdr_in; xdrmem_create(&xdr_in, (char *)msg_buf, msg_len, XDR_DECODE); - int i; - printf("Call XDR: "); - for (i = 0; i < msg_len; i++) { - printf("%02x ", (uint8_t)msg_buf[i]); - } - printf("\n"); - if (!_xdr_argument(&xdr_in, (caddr_t)&argument)) { - async_rpc_send_failure(connection, xid, GARBAGE_ARGS); + if (!_xdr_argument(&xdr_in, req->args)) { + async_rpc_send_failure(req, GARBAGE_ARGS); fprintf(stderr, "RPC decode error!\n"); return; } /* Perform the call. */ - result = (*local)((char *)&argument, rqstp); - - /* Encode result and send reply. */ - static char reply_buf[MAX_RPC_MSGSIZE]; - XDR xdr_out; - xdrmem_create(&xdr_out, reply_buf, MAX_RPC_MSGSIZE, XDR_ENCODE); - if (result != NULL && !_xdr_result(&xdr_out, result)) { - async_rpc_send_failure(connection, xid, SYSTEM_ERR); - } - - struct rpc_reply header; - header.xid = htonl(xid); - header.type = htonl(1); /* REPLY */ - header.stat = htonl(MSG_ACCEPTED); - header.verf_flavor = 0; - header.verf_len = 0; - header.accept_stat = 0; - - gsize msg_size = xdr_out.x_ops->x_getpostn(&xdr_out); - printf("Have an RPC reply of size %zd bytes\n", msg_size); - uint32_t fragment = htonl((msg_size + sizeof(header)) | 0x80000000); - async_rpc_write(connection, (const char *)&fragment, sizeof(fragment)); - async_rpc_write(connection, (const char *)&header, sizeof(header)); - async_rpc_write(connection, reply_buf, msg_size); - g_io_channel_flush(connection->channel, NULL); - - /* Clean up. */ - xdr_in.x_op = XDR_FREE; - if (!_xdr_argument(&xdr_in, (caddr_t)&argument)) { - fprintf (stderr, "%s", "unable to free arguments"); - exit (1); - } + req->xdr_result = _xdr_result; + result = (*local)((char *)req->args, req); bluesky_flushd_invoke(fs); + bluesky_debug_dump(fs); return; } @@ -431,13 +506,19 @@ static gboolean async_rpc_dispatch(RPCConnection *rpc) if (ntohl(header->rpcvers) != 2) { return FALSE; - } else if (ntohl(header->prog) != NFS_PROGRAM) { - async_rpc_send_failure(rpc, xid, PROG_UNAVAIL); + } + + RPCRequest *req = g_new0(RPCRequest, 1); + req->connection = rpc; + req->xid = xid; + + if (ntohl(header->prog) != NFS_PROGRAM) { + async_rpc_send_failure(req, PROG_UNAVAIL); return TRUE; } else if (ntohl(header->vers) != NFS_V3) { /* FIXME: Should be PROG_MISMATCH */ - async_rpc_send_failure(rpc, xid, PROG_UNAVAIL); - return FALSE; + async_rpc_send_failure(req, PROG_UNAVAIL); + return TRUE; } uint32_t proc = ntohl(header->proc); @@ -459,20 +540,12 @@ static gboolean async_rpc_dispatch(RPCConnection *rpc) if (buf - msg->str > msg->len) return FALSE; - printf("Dispatching RPC procedure %d...\n", proc); - - struct svc_req req; - req.rq_prog = ntohl(header->prog); - req.rq_vers = ntohl(header->vers); - req.rq_proc = ntohl(header->proc); - req.rq_cred.oa_flavor = 0; - req.rq_cred.oa_base = NULL; - req.rq_cred.oa_length = 0; - req.rq_clntcred = NULL; - req.rq_xprt = NULL; + req->raw_args = msg; + req->raw_args_header_bytes = buf - msg->str; + req->req_proc = ntohl(header->proc); + rpc->msgbuf = g_string_new(""); - nfs_program_3(&req, rpc, ntohl(header->xid), buf, - (msg->str + msg->len) - buf); + nfs_program_3(req); return TRUE; } @@ -562,7 +635,6 @@ static gboolean async_rpc_do_read(GIOChannel *channel, rpc->frag_len = ntohl(rpc->frag_len); g_string_set_size(rpc->msgbuf, rpc->msgbuf->len - 4); rpc->frag_hdr_bytes = 0; - g_print("RPC fragment header: %08x\n", rpc->frag_len); } } else { /* We were reading in the fragment body. */ @@ -571,7 +643,6 @@ static gboolean async_rpc_do_read(GIOChannel *channel, if (rpc->frag_len = 0x80000000) { /* We have a complete message since this was the last fragment and * there are no more bytes in it. Dispatch the message. */ - g_print("Complete RPC message: %zd bytes\n", rpc->msgbuf->len); if (!async_rpc_dispatch(rpc)) { fprintf(stderr, "Invalid RPC message, closing channel\n"); g_io_channel_shutdown(rpc->channel, TRUE, NULL);