X-Git-Url: http://git.vrable.net/?a=blobdiff_plain;f=nfs3%2Frpc.c;h=05d9722480d24fdc5f711b86dd7172dd50e83d16;hb=2f8003e3de64956cec66cf3f642c35795c236a17;hp=e1cb4d135b2397c3591cfd9d2763b8ed836a0172;hpb=d8cf6ebaaa04e7dece0633f6242fe67632015d23;p=bluesky.git diff --git a/nfs3/rpc.c b/nfs3/rpc.c index e1cb4d1..05d9722 100644 --- a/nfs3/rpc.c +++ b/nfs3/rpc.c @@ -27,6 +27,9 @@ #include "bluesky.h" extern BlueSkyFS *fs; +static int outstanding_rpcs = 0; +static struct bluesky_stats *rpc_recv_stats, *rpc_send_stats; + /* TCP port number to use for NFS protocol. (Should be 2049.) */ #define NFS_SERVICE_PORT 2051 @@ -128,6 +131,8 @@ async_rpc_send_failure(RPCRequest *req, enum accept_stat stat) { struct rpc_fail_reply header; + g_atomic_int_add(&outstanding_rpcs, -1); + header.xid = htonl(req->xid); header.type = htonl(1); /* REPLY */ header.stat = htonl(MSG_ACCEPTED); @@ -144,6 +149,8 @@ async_rpc_send_failure(RPCRequest *req, enum accept_stat stat) async_rpc_flush(req->connection); g_mutex_unlock(req->connection->send_lock); + bluesky_profile_free(req->profile); + if (req->args != NULL) { char buf[4]; XDR xdr; @@ -168,6 +175,7 @@ async_rpc_send_failure(RPCRequest *req, enum accept_stat stat) /* For UDP, a connection only exists for the duration of a single * message. */ g_mutex_free(req->connection->send_lock); + g_string_free(req->connection->msgbuf, TRUE); g_string_free(req->connection->sendbuf, TRUE); g_free(req->connection); } @@ -180,6 +188,9 @@ async_rpc_send_reply(RPCRequest *req, void *result) { bluesky_time_hires time_end; + bluesky_profile_add_event(req->profile, + g_strdup("Start encoding NFS response")); + GString *str = g_string_new(""); XDR xdr_out; xdr_string_create(&xdr_out, str, XDR_ENCODE); @@ -189,6 +200,9 @@ async_rpc_send_reply(RPCRequest *req, void *result) return; } + g_atomic_int_add(&outstanding_rpcs, -1); + bluesky_stats_add(rpc_send_stats, str->len); + struct rpc_reply header; header.xid = htonl(req->xid); header.type = htonl(1); /* REPLY */ @@ -210,10 +224,17 @@ async_rpc_send_reply(RPCRequest *req, void *result) time_end = bluesky_now_hires(); +#if 0 printf("RPC[%"PRIx32"]: time = %"PRId64" ns\n", req->xid, time_end - req->time_start); +#endif + + bluesky_profile_add_event(req->profile, + g_strdup("NFS reply sent")); + bluesky_profile_print(req->profile); /* Clean up. */ + bluesky_profile_free(req->profile); g_string_free(str, TRUE); if (req->args != NULL) { @@ -240,6 +261,7 @@ async_rpc_send_reply(RPCRequest *req, void *result) /* For UDP, a connection only exists for the duration of a single * message. */ g_mutex_free(req->connection->send_lock); + g_string_free(req->connection->msgbuf, TRUE); g_string_free(req->connection->sendbuf, TRUE); g_free(req->connection); } @@ -307,11 +329,14 @@ nfs_program_3(RPCRequest *req) xdrproc_t _xdr_argument, _xdr_result; char *(*local)(char *, RPCRequest *); + bluesky_profile_set(req->profile); + if (req->req_proc < sizeof(nfs_proc_names) / sizeof(const char *)) { - printf("Dispatched NFS RPC message type %s\n", - nfs_proc_names[req->req_proc]); - } else { - printf("Dispatched unknown NFS RPC message type %d\n", req->req_proc); + bluesky_profile_add_event( + req->profile, + g_strdup_printf("Dispatching NFS %s request", + nfs_proc_names[req->req_proc]) + ); } switch (req->req_proc) { @@ -489,8 +514,16 @@ static void sig_handler(int sig) static gboolean async_flushd(gpointer data) { +#if 0 + int rpc_count = g_atomic_int_get(&outstanding_rpcs); + if (rpc_count != 0) { + g_print("Currently outstanding RPC requests: %d\n", rpc_count); + } +#endif + if (fs_dump_requested) { bluesky_debug_dump(fs); + bluesky_stats_dump_all(); fs_dump_requested = 0; } @@ -508,7 +541,8 @@ static async_rpc_init() main_context = g_main_context_new(); main_loop = g_main_loop_new(main_context, FALSE); - rpc_thread_pool = g_thread_pool_new(async_rpc_task, NULL, -1, FALSE, NULL); + rpc_thread_pool = g_thread_pool_new(async_rpc_task, NULL, + bluesky_max_threads, FALSE, NULL); /* Arrange to have the cache writeback code run every five seconds. */ GSource *source = g_timeout_source_new_seconds(5); @@ -550,6 +584,8 @@ static gboolean async_rpc_dispatch(RPCConnection *rpc) GString *msg = rpc->msgbuf; const char *buf = msg->str; + bluesky_stats_add(rpc_recv_stats, msg->len); + if (msg->len < sizeof(struct rpc_call_header)) { fprintf(stderr, "Short RPC message: only %zd bytes!\n", msg->len); return FALSE; @@ -567,9 +603,12 @@ static gboolean async_rpc_dispatch(RPCConnection *rpc) return FALSE; } + g_atomic_int_add(&outstanding_rpcs, 1); + RPCRequest *req = g_new0(RPCRequest, 1); req->connection = rpc; - req->time_start = time_start; + req->profile = bluesky_profile_new(); + bluesky_profile_add_event(req->profile, g_strdup("Receive NFS request")); req->xid = xid; if (ntohl(header->prog) != NFS_PROGRAM) { @@ -587,18 +626,24 @@ static gboolean async_rpc_dispatch(RPCConnection *rpc) buf += sizeof(struct rpc_call_header); for (i = 0; i < 2; i++) { struct rpc_auth *auth = (struct rpc_auth *)buf; - if (buf - msg->str + sizeof(struct rpc_auth) > msg->len) + if (buf - msg->str + sizeof(struct rpc_auth) > msg->len) { + g_atomic_int_add(&outstanding_rpcs, -1); return FALSE; + } gsize authsize = ntohl(auth->len) + sizeof(struct rpc_auth); - if (authsize > MAX_RPC_MSGSIZE) + if (authsize > MAX_RPC_MSGSIZE) { + g_atomic_int_add(&outstanding_rpcs, -1); return FALSE; + } buf += authsize; } - if (buf - msg->str > msg->len) + if (buf - msg->str > msg->len) { + g_atomic_int_add(&outstanding_rpcs, -1); return FALSE; + } req->raw_args = msg; req->raw_args_header_bytes = buf - msg->str; @@ -834,6 +879,9 @@ void register_rpc() { SVCXPRT *transp; + rpc_recv_stats = bluesky_stats_new("NFS RPC Messages In"); + rpc_send_stats = bluesky_stats_new("NFS RPC Messages Out"); + async_rpc_init(); /* MOUNT protocol */