#include "bluesky.h"
extern BlueSkyFS *fs;
+static int outstanding_rpcs = 0;
+static struct bluesky_stats *rpc_recv_stats, *rpc_send_stats;
+
/* TCP port number to use for NFS protocol. (Should be 2049.) */
#define NFS_SERVICE_PORT 2051
{
struct rpc_fail_reply header;
+ g_atomic_int_add(&outstanding_rpcs, -1);
+
header.xid = htonl(req->xid);
header.type = htonl(1); /* REPLY */
header.stat = htonl(MSG_ACCEPTED);
/* For UDP, a connection only exists for the duration of a single
* message. */
g_mutex_free(req->connection->send_lock);
+ g_string_free(req->connection->msgbuf, TRUE);
g_string_free(req->connection->sendbuf, TRUE);
g_free(req->connection);
}
return;
}
+ g_atomic_int_add(&outstanding_rpcs, -1);
+ bluesky_stats_add(rpc_send_stats, str->len);
+
struct rpc_reply header;
header.xid = htonl(req->xid);
header.type = htonl(1); /* REPLY */
time_end = bluesky_now_hires();
+#if 0
printf("RPC[%"PRIx32"]: time = %"PRId64" ns\n",
req->xid, time_end - req->time_start);
+#endif
/* Clean up. */
g_string_free(str, TRUE);
/* For UDP, a connection only exists for the duration of a single
* message. */
g_mutex_free(req->connection->send_lock);
+ g_string_free(req->connection->msgbuf, TRUE);
g_string_free(req->connection->sendbuf, TRUE);
g_free(req->connection);
}
xdrproc_t _xdr_argument, _xdr_result;
char *(*local)(char *, RPCRequest *);
+#if 0
if (req->req_proc < sizeof(nfs_proc_names) / sizeof(const char *)) {
printf("Dispatched NFS RPC message type %s\n",
nfs_proc_names[req->req_proc]);
} else {
printf("Dispatched unknown NFS RPC message type %d\n", req->req_proc);
}
+#endif
switch (req->req_proc) {
case NFSPROC3_NULL:
static gboolean async_flushd(gpointer data)
{
+#if 0
+ int rpc_count = g_atomic_int_get(&outstanding_rpcs);
+ if (rpc_count != 0) {
+ g_print("Currently outstanding RPC requests: %d\n", rpc_count);
+ }
+#endif
+
if (fs_dump_requested) {
bluesky_debug_dump(fs);
+ bluesky_stats_dump_all();
fs_dump_requested = 0;
}
main_context = g_main_context_new();
main_loop = g_main_loop_new(main_context, FALSE);
- rpc_thread_pool = g_thread_pool_new(async_rpc_task, NULL, -1, FALSE, NULL);
+ rpc_thread_pool = g_thread_pool_new(async_rpc_task, NULL,
+ bluesky_max_threads, FALSE, NULL);
/* Arrange to have the cache writeback code run every five seconds. */
GSource *source = g_timeout_source_new_seconds(5);
GString *msg = rpc->msgbuf;
const char *buf = msg->str;
+ bluesky_stats_add(rpc_recv_stats, msg->len);
+
if (msg->len < sizeof(struct rpc_call_header)) {
fprintf(stderr, "Short RPC message: only %zd bytes!\n", msg->len);
return FALSE;
return FALSE;
}
+ g_atomic_int_add(&outstanding_rpcs, 1);
+
RPCRequest *req = g_new0(RPCRequest, 1);
req->connection = rpc;
req->time_start = time_start;
buf += sizeof(struct rpc_call_header);
for (i = 0; i < 2; i++) {
struct rpc_auth *auth = (struct rpc_auth *)buf;
- if (buf - msg->str + sizeof(struct rpc_auth) > msg->len)
+ if (buf - msg->str + sizeof(struct rpc_auth) > msg->len) {
+ g_atomic_int_add(&outstanding_rpcs, -1);
return FALSE;
+ }
gsize authsize = ntohl(auth->len) + sizeof(struct rpc_auth);
- if (authsize > MAX_RPC_MSGSIZE)
+ if (authsize > MAX_RPC_MSGSIZE) {
+ g_atomic_int_add(&outstanding_rpcs, -1);
return FALSE;
+ }
buf += authsize;
}
- if (buf - msg->str > msg->len)
+ if (buf - msg->str > msg->len) {
+ g_atomic_int_add(&outstanding_rpcs, -1);
return FALSE;
+ }
req->raw_args = msg;
req->raw_args_header_bytes = buf - msg->str;
{
SVCXPRT *transp;
+ rpc_recv_stats = bluesky_stats_new("NFS RPC Messages In");
+ rpc_send_stats = bluesky_stats_new("NFS RPC Messages Out");
+
async_rpc_init();
/* MOUNT protocol */