Import TBBT (NFS trace replay).
[bluesky.git] / TBBT / trace_play / rpc / sfs_ctcp.c
1 #ifndef lint
2 static char sfs_ctcp_id[] = "@(#)sfs_ctcp.c     2.1     97/10/23";
3 #endif
4 /* @(#)clnt_tcp.c       2.2 88/08/01 4.0 RPCSRC */
5 /*
6  *   Copyright (c) 1992-1997,2001 by Standard Performance Evaluation Corporation
7  *      All rights reserved.
8  *              Standard Performance Evaluation Corporation (SPEC)
9  *              6585 Merchant Place, Suite 100
10  *              Warrenton, VA 20187
11  *
12  *      This product contains benchmarks acquired from several sources who
13  *      understand and agree with SPEC's goal of creating fair and objective
14  *      benchmarks to measure computer performance.
15  *
16  *      This copyright notice is placed here only to protect SPEC in the
17  *      event the source is misused in any manner that is contrary to the
18  *      spirit, the goals and the intent of SPEC.
19  *
20  *      The source code is provided to the user or company under the license
21  *      agreement for the SPEC Benchmark Suite for this product.
22  */
23 /*
24  * Sun RPC is a product of Sun Microsystems, Inc. and is provided for
25  * unrestricted use provided that this legend is included on all tape
26  * media and as a part of the software program in whole or part.  Users
27  * may copy or modify Sun RPC without charge, but are not authorized
28  * to license or distribute it to anyone else except as part of a product or
29  * program developed by the user.
30  * 
31  * SUN RPC IS PROVIDED AS IS WITH NO WARRANTIES OF ANY KIND INCLUDING THE
32  * WARRANTIES OF DESIGN, MERCHANTIBILITY AND FITNESS FOR A PARTICULAR
33  * PURPOSE, OR ARISING FROM A COURSE OF DEALING, USAGE OR TRADE PRACTICE.
34  * 
35  * Sun RPC is provided with no support and without any obligation on the
36  * part of Sun Microsystems, Inc. to assist in its use, correction,
37  * modification or enhancement.
38  * 
39  * SUN MICROSYSTEMS, INC. SHALL HAVE NO LIABILITY WITH RESPECT TO THE
40  * INFRINGEMENT OF COPYRIGHTS, TRADE SECRETS OR ANY PATENTS BY SUN RPC
41  * OR ANY PART THEREOF.
42  * 
43  * In no event will Sun Microsystems, Inc. be liable for any lost revenue
44  * or profits or other special, indirect and consequential damages, even if
45  * Sun has been advised of the possibility of such damages.
46  * 
47  * Sun Microsystems, Inc.
48  * 2550 Garcia Avenue
49  * Mountain View, California  94043
50  */
51 #if !defined(lint) && defined(SCCSIDS)
52 static char sccsid[] = "@(#)clnt_tcp.c 1.37 87/10/05 Copyr 1984 Sun Micro";
53 #endif
54  
55 /*
56  * clnt_tcp.c, Implements a TCP/IP based, client side RPC.
57  *
58  * Copyright (C) 1984, Sun Microsystems, Inc.
59  *
60  * TCP based RPC supports 'batched calls'.
61  * A sequence of calls may be batched-up in a send buffer.  The rpc call
62  * return immediately to the client even though the call was not necessarily
63  * sent.  The batching occurs if the results' xdr routine is NULL (0) AND
64  * the rpc timeout value is zero (see clnt.h, rpc).
65  *
66  * Clients should NOT casually batch calls that in fact return results; that is,
67  * the server side should be aware that a call is batched and not produce any
68  * return message.  Batched calls that produce many result messages can
69  * deadlock (netlock) the client and the server....
70  *
71  * Now go hang yourself.
72  */
73
74
75 #include <stdio.h>
76 #include <stdlib.h>
77 #include <unistd.h>
78 #include <string.h>
79 #include "rpc/rpc.h"
80 #include "rpc/osdep.h"
81 #include <netdb.h>
82 #include <errno.h>
83 #include "rpc/pmap_clnt.h"
84
85 #define MCALL_MSG_SIZE 24
86
87 struct ct_data {
88         int             ct_sock;
89         bool_t          ct_closeit;
90         struct timeval  ct_wait;
91         struct sockaddr_in ct_addr; 
92         struct sockaddr_in ct_oaddr; 
93         struct rpc_err  ct_error;
94         char            ct_mcall[MCALL_MSG_SIZE];       /* marshalled callmsg */
95         uint_t          ct_mpos;                        /* pos after marshal */
96         XDR             ct_xdrs;
97         int             ct_first;
98         uint32_t        ct_prog;
99         uint32_t        ct_vers;
100         uint_t          ct_sendsz;
101         uint_t          ct_recvsz;
102 };
103
104 static int      readtcp(struct ct_data *, char *, int);
105 static int      writetcp(struct ct_data *ct, char * buf, int len);
106
107 static enum clnt_stat   sfs_ctcp_call(CLIENT *, uint32_t, xdrproc_t, void *,
108                                         xdrproc_t, void *, struct timeval);
109 static void             sfs_ctcp_abort(CLIENT *);
110 static void             sfs_ctcp_geterr(CLIENT *, struct rpc_err *);
111 static bool_t           sfs_ctcp_freeres(CLIENT *, xdrproc_t, void *);
112 static bool_t           sfs_ctcp_control(CLIENT *, uint_t, void *);
113 static void             sfs_ctcp_destroy(CLIENT *);
114 static bool_t           sfs_ctcp_getreply(CLIENT *, xdrproc_t, void *,
115                                 int, uint32_t *, uint32_t *, struct timeval *);
116 static int              sfs_ctcp_poll(CLIENT *, uint32_t);
117
118 static struct clnt_ops tcp_ops = {
119         sfs_ctcp_call,
120         sfs_ctcp_abort,
121         sfs_ctcp_geterr,
122         sfs_ctcp_freeres,
123         sfs_ctcp_destroy,
124         sfs_ctcp_control,
125         sfs_ctcp_getreply,
126         sfs_ctcp_poll
127 };
128
129 static int
130 sfs_ctcp_make_conn(
131         struct ct_data *ct,
132         struct sockaddr_in *raddr,
133         int *sockp)
134 {
135         int i;
136         int min_buf_sz;
137         int new_buf_sz;
138         int type;
139         int error;
140         int setopt = 1;
141 #if defined(UNIXWARE) || defined(AIX)
142         size_t optlen;
143 #else
144         int optlen;
145 #endif
146
147         if (ct->ct_first == 0)
148                 ct->ct_first++;
149
150 #ifdef DEBUG
151         if (ct->ct_first)
152                 (void) fprintf(stderr, "Re-establishing connection.\n");
153 #endif
154
155         /*
156          * If no port number given ask the pmap for one
157          */
158         if (raddr->sin_port == 0) {
159                 uint16_t port;
160                 if ((port = pmap_getport(raddr, ct->ct_prog, ct->ct_vers,
161                                                         IPPROTO_TCP)) == 0) {
162                         return (-1);
163                 }
164                 raddr->sin_port = htons(port);
165         }
166
167         /*
168          * If no socket given, open one
169          */
170         if (*sockp >= 0) {
171                 ct->ct_closeit = FALSE;
172                 return (*sockp);
173         }
174
175         ct->ct_closeit = TRUE;
176
177         *sockp = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
178         (void)bindresvport(*sockp, (struct sockaddr_in *)0);
179         if ((*sockp < 0)
180             || (connect(*sockp, (struct sockaddr *)raddr,
181             sizeof(struct sockaddr_in)) < 0)) {
182                 return (-1);
183         }
184
185         /*
186          * Need to try to size the socket buffers based on the number of
187          * outstanding requests desired.  NFS reads and writes can do as
188          * much as 8K per request which can quickly run us out of space
189          * on the socket buffer queue.  Use the maximum number of bio style
190          * requests * NFS_MAXDATA plus a pad as a starting point for desired
191          * socket buffer size and then back off by NFS_MAXDATA until the buffer
192          * sizes are successfully set.  Note, the algorithm never sets the
193          * buffer size to less than the OS default.
194          */
195         type = SO_SNDBUF;
196         for (i = 0; i < 2; i++) {
197                 optlen = sizeof(min_buf_sz);
198 #ifdef UNIXWARE
199                 if (getsockopt(*sockp, SOL_SOCKET, type,
200                                 (void *)&min_buf_sz, &optlen) < 0) {
201                         /* guess the default */
202                         min_buf_sz = 18 * 1024;
203                 }
204 #else
205                 if (getsockopt(*sockp, SOL_SOCKET, type,
206                                 (char *)&min_buf_sz, &optlen) < 0) {
207                         /* guess the default */
208                         min_buf_sz = 18 * 1024;
209                 }
210 #endif
211
212                 new_buf_sz = 512 * 1024;
213                 if (new_buf_sz > min_buf_sz) {
214                         do {
215                                 error = setsockopt(*sockp, SOL_SOCKET,
216                                                 type, (char *)&new_buf_sz,
217                                                 sizeof(int));
218                                 new_buf_sz -= (8 * 1024);
219                         } while (error != 0 && new_buf_sz > min_buf_sz);
220                 }
221
222                 type = SO_RCVBUF;
223         }
224
225 #ifdef TCP_NODELAY
226         setsockopt(*sockp, IPPROTO_TCP, TCP_NODELAY, (char *) &setopt,
227                 sizeof(setopt));
228 #endif /* TCP_NODELAY */
229
230         return (*sockp);
231 }
232
233 /*
234  * Create a client handle for a tcp/ip connection.
235  * If *sockp<0, *sockp is set to a newly created TCP socket and it is
236  * connected to raddr.  If *sockp non-negative then
237  * raddr is ignored.  The rpc/tcp package does buffering
238  * similar to stdio, so the client must pick send and receive buffer sizes,];
239  * 0 => use the default.
240  * If raddr->sin_port is 0, then a binder on the remote machine is
241  * consulted for the right port number.
242  * NB: *sockp is copied into a private area.
243  * NB: It is the clients responsibility to close *sockp.
244  * NB: The rpch->cl_auth is set null authentication.  Caller may wish to set this
245  * something more useful.
246  */
247 CLIENT *
248 sfs_ctcp_create(
249         struct sockaddr_in *raddr,
250         uint32_t prog,
251         uint32_t vers,
252         int *sockp,
253         uint_t sendsz,
254         uint_t recvsz)
255 {
256         CLIENT *h;
257         struct ct_data *ct;
258         struct timeval now;
259         struct rpc_msg call_msg;
260
261         h  = (CLIENT *)mem_alloc(sizeof(CLIENT));
262         if (h == NULL) {
263                 (void)fprintf(stderr, "clnttcp_create: out of memory\n");
264                 rpc_createerr.cf_stat = RPC_SYSTEMERROR;
265                 rpc_createerr.cf_error.re_errno = errno;
266                 goto fooy;
267         }
268         ct = (struct ct_data *)mem_alloc(sizeof(struct ct_data));
269         if (ct == NULL) {
270                 (void)fprintf(stderr, "clnttcp_create: out of memory\n");
271                 rpc_createerr.cf_stat = RPC_SYSTEMERROR;
272                 rpc_createerr.cf_error.re_errno = errno;
273                 goto fooy;
274         }
275         (void) memset(ct, '\0', sizeof (struct ct_data));
276
277         ct->ct_oaddr = *raddr;
278         ct->ct_prog = prog;
279         ct->ct_vers = vers;
280
281         if (sfs_ctcp_make_conn(ct, raddr, sockp) < 0) {
282                 rpc_createerr.cf_stat = RPC_SYSTEMERROR;
283                 rpc_createerr.cf_error.re_errno = errno;
284                 (void)close(*sockp);
285                 goto fooy;
286         }
287
288         /*
289          * Set up private data struct
290          */
291         ct->ct_sock = *sockp;
292         ct->ct_wait.tv_sec = 0;
293         ct->ct_wait.tv_usec = 0;
294         ct->ct_addr = *raddr;
295
296         /*
297          * Initialize call message
298          */
299         (void)gettimeofday(&now, (struct timezone *)0);
300         call_msg.rm_xid = getpid() ^ now.tv_sec ^ now.tv_usec;
301         call_msg.rm_direction = CALL;
302         call_msg.rm_call.cb_rpcvers = RPC_MSG_VERSION;
303         call_msg.rm_call.cb_prog = prog;
304         call_msg.rm_call.cb_vers = vers;
305
306         /*
307          * pre-serialize the static part of the call msg and stash it away
308          */
309         xdrmem_create(&(ct->ct_xdrs), ct->ct_mcall, MCALL_MSG_SIZE,
310             XDR_ENCODE);
311         if (! xdr_callhdr(&(ct->ct_xdrs), &call_msg)) {
312                 if (ct->ct_closeit) {
313                         (void)close(*sockp);
314                 }
315                 goto fooy;
316         }
317         ct->ct_mpos = XDR_GETPOS(&(ct->ct_xdrs));
318         XDR_DESTROY(&(ct->ct_xdrs));
319
320         /*
321          * Create a client handle which uses xdrrec for serialization
322          * and authnone for authentication.
323          */
324         xdrrec_create(&(ct->ct_xdrs), sendsz, recvsz,
325             (void *)ct, readtcp, writetcp);
326         ct->ct_sendsz = sendsz;
327         ct->ct_recvsz = recvsz;
328         h->cl_ops = &tcp_ops;
329         h->cl_private = (void *) ct;
330         h->cl_auth = authnone_create();
331         return (h);
332
333 fooy:
334         /*
335          * Something goofed, free stuff and barf
336          */
337         mem_free((void *)ct, sizeof(struct ct_data));
338         mem_free((void *)h, sizeof(CLIENT));
339         return ((CLIENT *)NULL);
340 }
341
342 static enum clnt_stat
343 get_areply(
344         CLIENT *h,
345         struct rpc_msg *reply_msg)
346 {
347         struct ct_data *ct = (struct ct_data *) h->cl_private;
348         XDR *xdrs = &(ct->ct_xdrs);
349
350         /* CONSTCOND */
351         while (TRUE) {
352                 reply_msg->acpted_rply.ar_verf = _null_auth;
353                 reply_msg->acpted_rply.ar_results.where = NULL;
354                 reply_msg->acpted_rply.ar_results.proc = xdr_void;
355                 if (! xdrrec_skiprecord(xdrs)) {
356                         return (ct->ct_error.re_status);
357                 }
358                 /* now decode and validate the response header */
359                 if (! xdr_replymsg(xdrs, reply_msg)) {
360                         if (ct->ct_error.re_status == RPC_SUCCESS)
361                                 continue;
362                 }
363                 return (ct->ct_error.re_status);
364         }
365 }
366
367 static enum clnt_stat
368 proc_header(
369         CLIENT *h,
370         struct rpc_msg *reply_msg,
371         xdrproc_t xdr_results,
372         void * results_ptr)
373 {
374         struct ct_data *ct = (struct ct_data *) h->cl_private;
375         XDR *xdrs = &(ct->ct_xdrs);
376
377         /*
378          * process header
379          */
380         _seterr_reply(reply_msg, &(ct->ct_error));
381         if (ct->ct_error.re_status == RPC_SUCCESS) {
382                 if (! AUTH_VALIDATE(h->cl_auth,
383                                         &reply_msg->acpted_rply.ar_verf)) {
384                         ct->ct_error.re_status = RPC_AUTHERROR;
385                         ct->ct_error.re_why = AUTH_INVALIDRESP;
386                 } else if (! (*xdr_results)(xdrs, results_ptr)) {
387                         if (ct->ct_error.re_status == RPC_SUCCESS)
388                                 ct->ct_error.re_status = RPC_CANTDECODERES;
389                 }
390                 /* free verifier ... */
391                 if (reply_msg->acpted_rply.ar_verf.oa_base != NULL) {
392                         xdrs->x_op = XDR_FREE;
393                         (void)xdr_opaque_auth(xdrs,
394                                         &(reply_msg->acpted_rply.ar_verf));
395                 }
396         }  /* end successful completion */
397         return (ct->ct_error.re_status);
398 }
399
400 static enum clnt_stat
401 sfs_ctcp_call(
402         CLIENT *h,
403         uint32_t proc,
404         xdrproc_t xdr_args,
405         void * args_ptr,
406         xdrproc_t xdr_results,
407         void * results_ptr,
408         struct timeval timeout)
409 {
410         struct ct_data *ct = (struct ct_data *) h->cl_private;
411         XDR *xdrs = &(ct->ct_xdrs);
412         struct rpc_msg reply_msg;
413         uint32_t x_id;
414         uint32_t *msg_x_id = (uint32_t *)(ct->ct_mcall);        /* yuk */
415         bool_t shipnow;
416
417         ct->ct_wait = timeout;
418
419         shipnow =
420             (xdr_results == (xdrproc_t)0 && timeout.tv_sec == 0
421             && timeout.tv_usec == 0) ? FALSE : TRUE;
422
423         xdrs->x_op = XDR_ENCODE;
424         ct->ct_error.re_status = RPC_SUCCESS;
425         x_id = ntohl(--(*msg_x_id));
426         if ((! XDR_PUTBYTES(xdrs, ct->ct_mcall, ct->ct_mpos)) ||
427             (! XDR_PUTLONG(xdrs, (int32_t *)&proc)) ||
428             (! AUTH_MARSHALL(h->cl_auth, xdrs)) ||
429             (! (*xdr_args)(xdrs, args_ptr))) {
430                 if (ct->ct_error.re_status == RPC_SUCCESS)
431                         ct->ct_error.re_status = RPC_CANTENCODEARGS;
432                 (void)xdrrec_endofrecord(xdrs, TRUE);
433                 return (ct->ct_error.re_status);
434         }
435         if (! xdrrec_endofrecord(xdrs, TRUE))
436                 return (ct->ct_error.re_status = RPC_CANTSEND);
437         /*
438          * Hack to provide rpc-based message passing
439          */
440         if (timeout.tv_sec == 0 && timeout.tv_usec == 0) {
441                 if (! shipnow && results_ptr == NULL)
442                         return (RPC_SUCCESS);
443
444                 /*
445                  * Double hack, send back xid in results_prt if non-NULL
446                  */
447                 *(uint32_t *)results_ptr = x_id;
448                 return(ct->ct_error.re_status = RPC_TIMEDOUT);
449         }
450
451
452         /*
453          * Keep receiving until we get a valid transaction id
454          */
455         xdrs->x_op = XDR_DECODE;
456         /* CONSTCOND */
457         while (TRUE) {
458                 enum clnt_stat res;
459
460                 if ((res = get_areply(h, &reply_msg)) != RPC_SUCCESS)
461                         return (res);
462
463                 if (reply_msg.rm_xid == x_id)
464                         break;
465         }
466
467         /*
468          * process header
469          */
470         return (proc_header(h, &reply_msg, xdr_results, results_ptr));
471 }
472
473 static void
474 sfs_ctcp_geterr(
475         CLIENT *h,
476         struct rpc_err *errp)
477 {
478         struct ct_data *ct =
479             (struct ct_data *) h->cl_private;
480
481         *errp = ct->ct_error;
482 }
483
484 static bool_t
485 sfs_ctcp_freeres(
486         CLIENT *cl,
487         xdrproc_t xdr_res,
488         void * res_ptr)
489 {
490         struct ct_data *ct = (struct ct_data *)cl->cl_private;
491         XDR *xdrs = &(ct->ct_xdrs);
492
493         xdrs->x_op = XDR_FREE;
494         return ((*xdr_res)(xdrs, res_ptr));
495 }
496
497 /* ARGSUSED */
498 static void
499 sfs_ctcp_abort(CLIENT *h)
500 {
501 }
502
503 static bool_t
504 sfs_ctcp_control(
505         CLIENT *cl,
506         uint_t request,
507         void *info)
508 {
509         struct ct_data *ct = (struct ct_data *)cl->cl_private;
510
511         switch (request) {
512         case CLGET_SERVER_ADDR:
513                 *(struct sockaddr_in *)info = ct->ct_addr;
514                 break;
515         default:
516                 return (FALSE);
517         }
518         return (TRUE);
519 }
520
521
522 static void
523 sfs_ctcp_destroy(
524         CLIENT *h)
525 {
526         struct ct_data *ct =
527             (struct ct_data *) h->cl_private;
528
529         if (ct->ct_closeit) {
530                 (void)close(ct->ct_sock);
531         }
532         XDR_DESTROY(&(ct->ct_xdrs));
533         mem_free((void *)ct, sizeof(struct ct_data));
534         mem_free((void *)h, sizeof(CLIENT));
535 }
536
537 /*
538  * Interface between xdr serializer and tcp connection.
539  * Behaves like the system calls, read & write, but keeps some error state
540  * around for the rpc level.
541  */
542 static int
543 readtcp(struct ct_data *ct,
544         char * buf,
545         int len)
546 {
547 #ifdef FD_SETSIZE
548         fd_set mask;
549         fd_set readfds;
550
551         FD_ZERO(&mask);
552         FD_SET(ct->ct_sock, &mask);
553 #else
554         int mask = 1 << (ct->ct_sock);
555         int readfds;
556 #endif /* def FD_SETSIZE */
557
558         if (len == 0)
559                 return (0);
560
561         /* CONSTCOND */
562         while (TRUE) {
563                 readfds = mask;
564                 switch (select(_rpc_dtablesize(), &readfds, NULL, NULL,
565                                &(ct->ct_wait))) {
566                 case 0:
567                         ct->ct_error.re_status = RPC_TIMEDOUT;
568                         return (-1);
569
570                 case -1:
571                         if (errno == EINTR)
572                                 continue;
573                         ct->ct_error.re_status = RPC_CANTRECV;
574                         ct->ct_error.re_errno = errno;
575                         goto lost;
576                 }
577                 break;
578         }
579         switch (len = read(ct->ct_sock, buf, len)) {
580
581         case 0:
582                 /* premature eof */
583                 ct->ct_error.re_errno = ECONNRESET;
584                 ct->ct_error.re_status = RPC_CANTRECV;
585                 len = -1;  /* it's really an error */
586                 goto lost;
587
588         case -1:
589                 ct->ct_error.re_errno = errno;
590                 ct->ct_error.re_status = RPC_CANTRECV;
591                 goto lost;
592         }
593         return (len);
594 lost:
595         /*
596          * We have lost our connection to the server.  Try and
597          * reestablish it.
598          */
599         (void) close(ct->ct_sock);
600         ct->ct_addr = ct->ct_oaddr;
601         ct->ct_sock = -1;
602         XDR_DESTROY(&(ct->ct_xdrs));
603
604         (void) sfs_ctcp_make_conn(ct, &ct->ct_addr, &ct->ct_sock);
605         /*
606          * Create a client handle which uses xdrrec for
607          * serialization and authnone for authentication.
608          */
609         xdrrec_create(&(ct->ct_xdrs), ct->ct_sendsz, ct->ct_recvsz,
610                                                 (void *)ct, readtcp, writetcp);
611         return (-1);
612 }
613
614 static int
615 writetcp(
616         struct ct_data *ct,
617         char * buf,
618         int len)
619 {
620         int i, cnt;
621
622         for (cnt = len; cnt > 0; cnt -= i, buf += i) {
623                 if ((i = write(ct->ct_sock, buf, cnt)) == -1) {
624                         /*
625                          * We have lost our connection to the server.  Try and
626                          * reestablish it.
627                          */
628                         (void) close(ct->ct_sock);
629                         ct->ct_addr = ct->ct_oaddr;
630                         ct->ct_sock = -1;
631                         XDR_DESTROY(&(ct->ct_xdrs));
632
633                         (void) sfs_ctcp_make_conn(ct, &ct->ct_addr,
634                                                                 &ct->ct_sock);
635                         /*
636                          * Create a client handle which uses xdrrec for
637                          * serialization and authnone for authentication.
638                          */
639                         xdrrec_create(&(ct->ct_xdrs), ct->ct_sendsz,
640                                                 ct->ct_recvsz,
641                                                 (void *)ct, readtcp, writetcp);
642                         ct->ct_error.re_errno = errno;
643                         ct->ct_error.re_status = RPC_CANTSEND;
644                         return (-1);
645                 }
646         }
647         return (len);
648 }
649
650 /* ARGSUSED */
651 static bool_t
652 sfs_ctcp_getreply(
653         CLIENT *cl,
654         xdrproc_t xdr_results,
655         void *results_ptr,
656         int cnt,
657         uint32_t *xids,
658         uint32_t *xid,
659         struct timeval *tv)
660 {
661         struct ct_data *ct = (struct ct_data *) cl->cl_private;
662         XDR *xdrs = &(ct->ct_xdrs);
663         struct rpc_msg reply_msg;
664         enum clnt_stat res;
665         int i;
666
667         /*
668          * Receive just one returning transaction id
669          */
670         xdrs->x_op = XDR_DECODE;
671         ct->ct_error.re_status = RPC_SUCCESS;
672         ct->ct_wait.tv_sec = tv->tv_sec;
673         ct->ct_wait.tv_usec = tv->tv_usec;
674
675         if ((res = get_areply(cl, &reply_msg)) != RPC_SUCCESS)
676                 return (res);
677
678         *xid = reply_msg.rm_xid;
679
680         /*
681          * Check to make sure xid matchs one that we are interested in
682          */
683         for (i = 0; i < cnt; i++) {
684                 if (xids[i] == *xid)
685                         break;
686         }
687
688         if (i == cnt)
689                 return (RPC_CANTDECODERES);
690
691         /*
692          * process header
693          */
694         return (proc_header(cl, &reply_msg, xdr_results, results_ptr));
695 }
696  
697 /* ARGSUSED */
698 static int
699 sfs_ctcp_poll(
700         CLIENT *cl,
701         uint32_t usecs)
702 {
703         struct ct_data *ct = (struct ct_data *)cl->cl_private;
704         XDR *xdrs = &(ct->ct_xdrs);
705         struct timeval tv;
706 #ifdef FD_SETSIZE
707         fd_set mask;
708         fd_set readfds;
709
710         FD_ZERO(&mask);
711         FD_SET(ct->ct_sock, &mask);
712 #else
713         int mask = 1 << (ct->ct_sock);
714         int readfds;
715 #endif /* def FD_SETSIZE */
716
717         if (xdrrec_eof(xdrs) == FALSE)
718                 return (1);
719
720         tv.tv_sec = 0;
721         if (usecs > 1000000)
722                 tv.tv_sec = usecs / 1000000;
723         tv.tv_usec = usecs % 1000000;
724
725         readfds = mask;
726         return (select(_rpc_dtablesize(), &readfds, NULL, NULL, &tv));
727 }