RPC header file cleanup.
[bluesky.git] / nfs3 / rpc.c
1 /* Blue Sky: File Systems in the Cloud
2  *
3  * Copyright (C) 2009  The Regents of the University of California
4  * Written by Michael Vrable <mvrable@cs.ucsd.edu>
5  *
6  * TODO: Licensing
7  */
8
9 /* RPC handling: registration, marshalling and unmarshalling of messages.  For
10  * now this uses the standard Sun RPC mechanisms in the standard C library.
11  * Later, it might be changed to use something better.  Much of this code was
12  * generated with rpcgen from the XDR specifications, but has been hand-edited
13  * slightly. */
14
15 #include "mount_prot.h"
16 #include "nfs3_prot.h"
17 #include <stdio.h>
18 #include <stdlib.h>
19 #include <rpc/pmap_clnt.h>
20 #include <string.h>
21 #include <memory.h>
22 #include <sys/socket.h>
23 #include <netinet/in.h>
24 #include <netinet/ip.h>
25
26 #include "bluesky.h"
27 extern BlueSkyFS *fs;
28
29 /* TCP port number to use for NFS protocol.  (Should be 2049.) */
30 #define NFS_SERVICE_PORT 2051
31
32 /* Maximum size of a single RPC message that we will accept (8 MB). */
33 #define MAX_RPC_MSGSIZE (8 << 20)
34
35 /* For now, used for NFS only. */
36 typedef struct {
37     GIOChannel *channel;
38
39     /* The reassembled message, thus far. */
40     GString *msgbuf;
41
42     /* Remaining number of bytes in this message fragment; 0 if we next expect
43      * another fragment header. */
44     uint32_t frag_len;
45
46     /* If frag_len is zero: the number of bytes of the fragment header that
47      * have been read so far. */
48     int frag_hdr_bytes;
49 } RPCConnection;
50
51 static void
52 mount_program_3(struct svc_req *rqstp, register SVCXPRT *transp)
53 {
54     union {
55         dirpath mountproc3_mnt_3_arg;
56         dirpath mountproc3_umnt_3_arg;
57     } argument;
58     char *result;
59     xdrproc_t _xdr_argument, _xdr_result;
60     char *(*local)(char *, struct svc_req *);
61
62     switch (rqstp->rq_proc) {
63     case MOUNTPROC3_NULL:
64         _xdr_argument = (xdrproc_t) xdr_void;
65         _xdr_result = (xdrproc_t) xdr_void;
66         local = (char *(*)(char *, struct svc_req *)) mountproc3_null_3_svc;
67         break;
68
69     case MOUNTPROC3_MNT:
70         _xdr_argument = (xdrproc_t) xdr_dirpath;
71         _xdr_result = (xdrproc_t) xdr_mountres3;
72         local = (char *(*)(char *, struct svc_req *)) mountproc3_mnt_3_svc;
73         break;
74
75     case MOUNTPROC3_DUMP:
76         _xdr_argument = (xdrproc_t) xdr_void;
77         _xdr_result = (xdrproc_t) xdr_mountlist;
78         local = (char *(*)(char *, struct svc_req *)) mountproc3_dump_3_svc;
79         break;
80
81     case MOUNTPROC3_UMNT:
82         _xdr_argument = (xdrproc_t) xdr_dirpath;
83         _xdr_result = (xdrproc_t) xdr_void;
84         local = (char *(*)(char *, struct svc_req *)) mountproc3_umnt_3_svc;
85         break;
86
87     case MOUNTPROC3_UMNTALL:
88         _xdr_argument = (xdrproc_t) xdr_void;
89         _xdr_result = (xdrproc_t) xdr_void;
90         local = (char *(*)(char *, struct svc_req *)) mountproc3_umntall_3_svc;
91         break;
92
93     case MOUNTPROC3_EXPORT:
94         _xdr_argument = (xdrproc_t) xdr_void;
95         _xdr_result = (xdrproc_t) xdr_exports;
96         local = (char *(*)(char *, struct svc_req *)) mountproc3_export_3_svc;
97         break;
98
99     default:
100         svcerr_noproc (transp);
101         return;
102     }
103     memset ((char *)&argument, 0, sizeof (argument));
104     if (!svc_getargs (transp, (xdrproc_t) _xdr_argument, (caddr_t) &argument)) {
105         svcerr_decode (transp);
106         return;
107     }
108     result = (*local)((char *)&argument, rqstp);
109     if (result != NULL && !svc_sendreply(transp, (xdrproc_t) _xdr_result, result)) {
110         svcerr_systemerr (transp);
111     }
112     if (!svc_freeargs (transp, (xdrproc_t) _xdr_argument, (caddr_t) &argument)) {
113         fprintf (stderr, "%s", "unable to free arguments");
114         exit (1);
115     }
116     return;
117 }
118
119 struct rpc_reply {
120     uint32_t xid;
121     uint32_t type;
122     uint32_t stat;
123     uint32_t verf_flavor;
124     uint32_t verf_len;
125     uint32_t accept_stat;
126 };
127
128 static void async_rpc_write(RPCConnection *rpc,
129                             const char *buf, gsize len);
130
131 struct rpc_fail_reply {
132     uint32_t xid;
133     uint32_t type;
134     uint32_t stat;
135     uint32_t verf_flavor;
136     uint32_t verf_len;
137     uint32_t accept_stat;
138 };
139
140 static void
141 async_rpc_send_failure(RPCConnection *rpc, uint32_t xid, enum accept_stat stat)
142 {
143     struct rpc_fail_reply header;
144
145     fprintf(stderr, "Sending RPC failure status %d\n", stat);
146
147     header.xid = htonl(xid);
148     header.type = htonl(1);     /* REPLY */
149     header.stat = htonl(MSG_ACCEPTED);
150     header.verf_flavor = 0;
151     header.verf_len = 0;
152     header.accept_stat = htonl(stat);
153
154     uint32_t fragment = htonl(sizeof(header) | 0x80000000);
155     async_rpc_write(rpc, (const char *)&fragment, sizeof(fragment));
156     async_rpc_write(rpc, (const char *)&header, sizeof(header));
157     g_io_channel_flush(rpc->channel, NULL);
158 }
159
160 static void
161 nfs_program_3(struct svc_req *rqstp, RPCConnection *connection, uint32_t xid,
162               const char *msg_buf, size_t msg_len)
163 {
164     union {
165         nfs_fh3 nfsproc3_getattr_3_arg;
166         setattr3args nfsproc3_setattr_3_arg;
167         diropargs3 nfsproc3_lookup_3_arg;
168         access3args nfsproc3_access_3_arg;
169         nfs_fh3 nfsproc3_readlink_3_arg;
170         read3args nfsproc3_read_3_arg;
171         write3args nfsproc3_write_3_arg;
172         create3args nfsproc3_create_3_arg;
173         mkdir3args nfsproc3_mkdir_3_arg;
174         symlink3args nfsproc3_symlink_3_arg;
175         mknod3args nfsproc3_mknod_3_arg;
176         diropargs3 nfsproc3_remove_3_arg;
177         diropargs3 nfsproc3_rmdir_3_arg;
178         rename3args nfsproc3_rename_3_arg;
179         link3args nfsproc3_link_3_arg;
180         readdir3args nfsproc3_readdir_3_arg;
181         readdirplus3args nfsproc3_readdirplus_3_arg;
182         nfs_fh3 nfsproc3_fsstat_3_arg;
183         nfs_fh3 nfsproc3_fsinfo_3_arg;
184         nfs_fh3 nfsproc3_pathconf_3_arg;
185         commit3args nfsproc3_commit_3_arg;
186     } argument;
187     char *result;
188     xdrproc_t _xdr_argument, _xdr_result;
189     char *(*local)(char *, struct svc_req *);
190
191     switch (rqstp->rq_proc) {
192     case NFSPROC3_NULL:
193         _xdr_argument = (xdrproc_t) xdr_void;
194         _xdr_result = (xdrproc_t) xdr_void;
195         local = (char *(*)(char *, struct svc_req *)) nfsproc3_null_3_svc;
196         break;
197
198     case NFSPROC3_GETATTR:
199         _xdr_argument = (xdrproc_t) xdr_nfs_fh3;
200         _xdr_result = (xdrproc_t) xdr_getattr3res;
201         local = (char *(*)(char *, struct svc_req *)) nfsproc3_getattr_3_svc;
202         break;
203
204     case NFSPROC3_SETATTR:
205         _xdr_argument = (xdrproc_t) xdr_setattr3args;
206         _xdr_result = (xdrproc_t) xdr_wccstat3;
207         local = (char *(*)(char *, struct svc_req *)) nfsproc3_setattr_3_svc;
208         break;
209
210     case NFSPROC3_LOOKUP:
211         _xdr_argument = (xdrproc_t) xdr_diropargs3;
212         _xdr_result = (xdrproc_t) xdr_lookup3res;
213         local = (char *(*)(char *, struct svc_req *)) nfsproc3_lookup_3_svc;
214         break;
215
216     case NFSPROC3_ACCESS:
217         _xdr_argument = (xdrproc_t) xdr_access3args;
218         _xdr_result = (xdrproc_t) xdr_access3res;
219         local = (char *(*)(char *, struct svc_req *)) nfsproc3_access_3_svc;
220         break;
221
222     case NFSPROC3_READLINK:
223         _xdr_argument = (xdrproc_t) xdr_nfs_fh3;
224         _xdr_result = (xdrproc_t) xdr_readlink3res;
225         local = (char *(*)(char *, struct svc_req *)) nfsproc3_readlink_3_svc;
226         break;
227
228     case NFSPROC3_READ:
229         _xdr_argument = (xdrproc_t) xdr_read3args;
230         _xdr_result = (xdrproc_t) xdr_read3res;
231         local = (char *(*)(char *, struct svc_req *)) nfsproc3_read_3_svc;
232         break;
233
234     case NFSPROC3_WRITE:
235         _xdr_argument = (xdrproc_t) xdr_write3args;
236         _xdr_result = (xdrproc_t) xdr_write3res;
237         local = (char *(*)(char *, struct svc_req *)) nfsproc3_write_3_svc;
238         break;
239
240     case NFSPROC3_CREATE:
241         _xdr_argument = (xdrproc_t) xdr_create3args;
242         _xdr_result = (xdrproc_t) xdr_diropres3;
243         local = (char *(*)(char *, struct svc_req *)) nfsproc3_create_3_svc;
244         break;
245
246     case NFSPROC3_MKDIR:
247         _xdr_argument = (xdrproc_t) xdr_mkdir3args;
248         _xdr_result = (xdrproc_t) xdr_diropres3;
249         local = (char *(*)(char *, struct svc_req *)) nfsproc3_mkdir_3_svc;
250         break;
251
252     case NFSPROC3_SYMLINK:
253         _xdr_argument = (xdrproc_t) xdr_symlink3args;
254         _xdr_result = (xdrproc_t) xdr_diropres3;
255         local = (char *(*)(char *, struct svc_req *)) nfsproc3_symlink_3_svc;
256         break;
257
258     case NFSPROC3_MKNOD:
259         _xdr_argument = (xdrproc_t) xdr_mknod3args;
260         _xdr_result = (xdrproc_t) xdr_diropres3;
261         local = (char *(*)(char *, struct svc_req *)) nfsproc3_mknod_3_svc;
262         break;
263
264     case NFSPROC3_REMOVE:
265         _xdr_argument = (xdrproc_t) xdr_diropargs3;
266         _xdr_result = (xdrproc_t) xdr_wccstat3;
267         local = (char *(*)(char *, struct svc_req *)) nfsproc3_remove_3_svc;
268         break;
269
270     case NFSPROC3_RMDIR:
271         _xdr_argument = (xdrproc_t) xdr_diropargs3;
272         _xdr_result = (xdrproc_t) xdr_wccstat3;
273         local = (char *(*)(char *, struct svc_req *)) nfsproc3_rmdir_3_svc;
274         break;
275
276     case NFSPROC3_RENAME:
277         _xdr_argument = (xdrproc_t) xdr_rename3args;
278         _xdr_result = (xdrproc_t) xdr_rename3res;
279         local = (char *(*)(char *, struct svc_req *)) nfsproc3_rename_3_svc;
280         break;
281
282     case NFSPROC3_LINK:
283         _xdr_argument = (xdrproc_t) xdr_link3args;
284         _xdr_result = (xdrproc_t) xdr_link3res;
285         local = (char *(*)(char *, struct svc_req *)) nfsproc3_link_3_svc;
286         break;
287
288     case NFSPROC3_READDIR:
289         _xdr_argument = (xdrproc_t) xdr_readdir3args;
290         _xdr_result = (xdrproc_t) xdr_readdir3res;
291         local = (char *(*)(char *, struct svc_req *)) nfsproc3_readdir_3_svc;
292         break;
293
294     case NFSPROC3_READDIRPLUS:
295         _xdr_argument = (xdrproc_t) xdr_readdirplus3args;
296         _xdr_result = (xdrproc_t) xdr_readdirplus3res;
297         local = (char *(*)(char *, struct svc_req *)) nfsproc3_readdirplus_3_svc;
298         break;
299
300     case NFSPROC3_FSSTAT:
301         _xdr_argument = (xdrproc_t) xdr_nfs_fh3;
302         _xdr_result = (xdrproc_t) xdr_fsstat3res;
303         local = (char *(*)(char *, struct svc_req *)) nfsproc3_fsstat_3_svc;
304         break;
305
306     case NFSPROC3_FSINFO:
307         _xdr_argument = (xdrproc_t) xdr_nfs_fh3;
308         _xdr_result = (xdrproc_t) xdr_fsinfo3res;
309         local = (char *(*)(char *, struct svc_req *)) nfsproc3_fsinfo_3_svc;
310         break;
311
312     case NFSPROC3_PATHCONF:
313         _xdr_argument = (xdrproc_t) xdr_nfs_fh3;
314         _xdr_result = (xdrproc_t) xdr_pathconf3res;
315         local = (char *(*)(char *, struct svc_req *)) nfsproc3_pathconf_3_svc;
316         break;
317
318     case NFSPROC3_COMMIT:
319         _xdr_argument = (xdrproc_t) xdr_commit3args;
320         _xdr_result = (xdrproc_t) xdr_commit3res;
321         local = (char *(*)(char *, struct svc_req *)) nfsproc3_commit_3_svc;
322         break;
323
324     default:
325         async_rpc_send_failure(connection, xid, PROC_UNAVAIL);
326         return;
327     }
328
329     /* Decode incoming message */
330     memset ((char *)&argument, 0, sizeof (argument));
331     XDR xdr_in;
332     xdrmem_create(&xdr_in, (char *)msg_buf, msg_len, XDR_DECODE);
333     int i;
334     printf("Call XDR: ");
335     for (i = 0; i < msg_len; i++) {
336         printf("%02x ", (uint8_t)msg_buf[i]);
337     }
338     printf("\n");
339     if (!_xdr_argument(&xdr_in, (caddr_t)&argument)) {
340         async_rpc_send_failure(connection, xid, GARBAGE_ARGS);
341         fprintf(stderr, "RPC decode error!\n");
342         return;
343     }
344
345     /* Perform the call. */
346     result = (*local)((char *)&argument, rqstp);
347
348     /* Encode result and send reply. */
349     static char reply_buf[MAX_RPC_MSGSIZE];
350     XDR xdr_out;
351     xdrmem_create(&xdr_out, reply_buf, MAX_RPC_MSGSIZE, XDR_ENCODE);
352     if (result != NULL && !_xdr_result(&xdr_out, result)) {
353         async_rpc_send_failure(connection, xid, SYSTEM_ERR);
354     }
355
356     struct rpc_reply header;
357     header.xid = htonl(xid);
358     header.type = htonl(1);     /* REPLY */
359     header.stat = htonl(MSG_ACCEPTED);
360     header.verf_flavor = 0;
361     header.verf_len = 0;
362     header.accept_stat = 0;
363
364     gsize msg_size = xdr_out.x_ops->x_getpostn(&xdr_out);
365     printf("Have an RPC reply of size %zd bytes\n", msg_size);
366     uint32_t fragment = htonl((msg_size + sizeof(header)) | 0x80000000);
367     async_rpc_write(connection, (const char *)&fragment, sizeof(fragment));
368     async_rpc_write(connection, (const char *)&header, sizeof(header));
369     async_rpc_write(connection, reply_buf, msg_size);
370     g_io_channel_flush(connection->channel, NULL);
371
372     /* Clean up. */
373     xdr_in.x_op = XDR_FREE;
374     if (!_xdr_argument(&xdr_in, (caddr_t)&argument)) {
375         fprintf (stderr, "%s", "unable to free arguments");
376         exit (1);
377     }
378
379     bluesky_flushd_invoke(fs);
380
381     return;
382 }
383
384 /* Enhanced, asynchronous-friendly RPC layer.  This is a replacement for the
385  * built-in sunrpc parsing and dispatch that will allow for processing multiple
386  * requests at the same time. */
387 static GMainContext *main_context;
388 static GMainLoop *main_loop;
389
390 static async_rpc_init()
391 {
392     main_context = g_main_context_new();
393     main_loop = g_main_loop_new(main_context, FALSE);
394 }
395
396 struct rpc_call_header {
397     uint32_t xid;
398     uint32_t mtype;
399     uint32_t rpcvers;
400     uint32_t prog;
401     uint32_t vers;
402     uint32_t proc;
403 };
404
405 struct rpc_auth {
406     uint32_t flavor;
407     uint32_t len;
408 };
409
410 /* Decode an RPC message and process it.  Returns a boolean indicating whether
411  * the message could be processed; if false, an unrecoverable error occurred
412  * and the transport should be closed. */
413 static gboolean async_rpc_dispatch(RPCConnection *rpc)
414 {
415     int i;
416     GString *msg = rpc->msgbuf;
417     const char *buf = msg->str;
418
419     if (msg->len < sizeof(struct rpc_call_header)) {
420         fprintf(stderr, "Short RPC message: only %zd bytes!\n", msg->len);
421         return FALSE;
422     }
423
424     struct rpc_call_header *header = (struct rpc_call_header *)(msg->str);
425     uint32_t xid = ntohl(header->xid);
426
427     if (ntohl(header->mtype) != 0) {
428         /* Not an RPC call */
429         return FALSE;
430     }
431
432     if (ntohl(header->rpcvers) != 2) {
433         return FALSE;
434     } else if (ntohl(header->prog) != NFS_PROGRAM) {
435         async_rpc_send_failure(rpc, xid, PROG_UNAVAIL);
436         return TRUE;
437     } else if (ntohl(header->vers) != NFS_V3) {
438         /* FIXME: Should be PROG_MISMATCH */
439         async_rpc_send_failure(rpc, xid, PROG_UNAVAIL);
440         return FALSE;
441     }
442
443     uint32_t proc = ntohl(header->proc);
444
445     /* Next, skip over authentication headers. */
446     buf += sizeof(struct rpc_call_header);
447     for (i = 0; i < 2; i++) {
448         struct rpc_auth *auth = (struct rpc_auth *)buf;
449         if (buf - msg->str + sizeof(struct rpc_auth) > msg->len)
450             return FALSE;
451
452         gsize authsize = ntohl(auth->len) + sizeof(struct rpc_auth);
453         if (authsize > MAX_RPC_MSGSIZE)
454             return FALSE;
455
456         buf += authsize;
457     }
458
459     if (buf - msg->str > msg->len)
460         return FALSE;
461
462     printf("Dispatching RPC procedure %d...\n", proc);
463
464     struct svc_req req;
465     req.rq_prog = ntohl(header->prog);
466     req.rq_vers = ntohl(header->vers);
467     req.rq_proc = ntohl(header->proc);
468     req.rq_cred.oa_flavor = 0;
469     req.rq_cred.oa_base = NULL;
470     req.rq_cred.oa_length = 0;
471     req.rq_clntcred = NULL;
472     req.rq_xprt = NULL;
473
474     nfs_program_3(&req, rpc, ntohl(header->xid), buf,
475                   (msg->str + msg->len) - buf);
476
477     return TRUE;
478 }
479
480 /* Write the given data to the RPC socket. */
481 static void async_rpc_write(RPCConnection *rpc,
482                             const char *buf, gsize len)
483 {
484     while (len > 0) {
485         gsize written = 0;
486         switch (g_io_channel_write_chars(rpc->channel, buf, len,
487                                          &written, NULL)) {
488         case G_IO_STATUS_ERROR:
489         case G_IO_STATUS_EOF:
490         case G_IO_STATUS_AGAIN:
491             fprintf(stderr, "Error writing to socket!\n");
492             return;
493         case G_IO_STATUS_NORMAL:
494             len -= written;
495             buf += written;
496             break;
497         }
498     }
499
500     // g_io_channel_flush(rpc->channel, NULL);
501 }
502
503 static gboolean async_rpc_do_read(GIOChannel *channel,
504                                   GIOCondition condition,
505                                   gpointer data)
506 {
507     RPCConnection *rpc = (RPCConnection *)data;
508
509     gsize bytes_to_read = 0;    /* Number of bytes to attempt to read. */
510
511     /* If we have not yet read in the fragment header, do that first.  This is
512      * 4 bytes that indicates the number of bytes in the message to follow
513      * (with the high bit set if this is the last fragment making up the
514      * message). */
515     if (rpc->frag_len == 0) {
516         bytes_to_read = 4 - rpc->frag_hdr_bytes;
517     } else {
518         bytes_to_read = rpc->frag_len & 0x7fffffff;
519     }
520
521     if (bytes_to_read > MAX_RPC_MSGSIZE
522         || rpc->msgbuf->len + bytes_to_read > MAX_RPC_MSGSIZE)
523     {
524         fprintf(stderr, "Excessive fragment size for RPC: %zd bytes\n",
525                 bytes_to_read);
526         g_io_channel_shutdown(rpc->channel, TRUE, NULL);
527         return FALSE;
528     }
529
530     gsize bytes_read = 0;
531     g_string_set_size(rpc->msgbuf, rpc->msgbuf->len + bytes_to_read);
532     char *buf = &rpc->msgbuf->str[rpc->msgbuf->len - bytes_to_read];
533     switch (g_io_channel_read_chars(rpc->channel, buf,
534                                     bytes_to_read, &bytes_read, NULL)) {
535     case G_IO_STATUS_NORMAL:
536         break;
537     case G_IO_STATUS_AGAIN:
538         return TRUE;
539     case G_IO_STATUS_EOF:
540         if (bytes_read == bytes_to_read)
541             break;
542         /* else fall through */
543     case G_IO_STATUS_ERROR:
544         fprintf(stderr, "Unexpected error or end of file on RPC stream %d!\n",
545                 g_io_channel_unix_get_fd(rpc->channel));
546         g_io_channel_shutdown(rpc->channel, TRUE, NULL);
547         return FALSE;
548     }
549
550     g_assert(bytes_read >= 0 && bytes_read <= bytes_to_read);
551
552     g_string_set_size(rpc->msgbuf,
553                       rpc->msgbuf->len - (bytes_to_read - bytes_read));
554
555     if (rpc->frag_len == 0) {
556         /* Handle reading in the fragment header.  If we've read the complete
557          * header, store the fragment size. */
558         rpc->frag_hdr_bytes += bytes_read;
559         if (rpc->frag_hdr_bytes == 4) {
560             memcpy((char *)&rpc->frag_len,
561                    &rpc->msgbuf->str[rpc->msgbuf->len - 4], 4);
562             rpc->frag_len = ntohl(rpc->frag_len);
563             g_string_set_size(rpc->msgbuf, rpc->msgbuf->len - 4);
564             rpc->frag_hdr_bytes = 0;
565             g_print("RPC fragment header: %08x\n", rpc->frag_len);
566         }
567     } else {
568         /* We were reading in the fragment body. */
569         rpc->frag_len -= bytes_read;
570
571         if (rpc->frag_len = 0x80000000) {
572             /* We have a complete message since this was the last fragment and
573              * there are no more bytes in it.  Dispatch the message. */
574             g_print("Complete RPC message: %zd bytes\n", rpc->msgbuf->len);
575             if (!async_rpc_dispatch(rpc)) {
576                 fprintf(stderr, "Invalid RPC message, closing channel\n");
577                 g_io_channel_shutdown(rpc->channel, TRUE, NULL);
578                 return FALSE;
579             }
580             rpc->frag_len = 0;
581             g_string_set_size(rpc->msgbuf, 0);
582         }
583     }
584
585     return TRUE;
586 }
587
588 static gboolean async_rpc_do_accept(GIOChannel *channel,
589                                     GIOCondition condition,
590                                     gpointer data)
591 {
592     int fd = g_io_channel_unix_get_fd(channel);
593     struct sockaddr_in addr;
594     socklen_t addrlen = sizeof(addr);
595
596     g_print("Received new connection on fd %d!\n", fd);
597     int nfd = accept(fd, (struct sockaddr *)&addr, &addrlen);
598     if (nfd < 0) {
599         fprintf(stderr, "Error accepting connection: %m\n");
600         return TRUE;
601     }
602
603     RPCConnection *rpc = g_new0(RPCConnection, 1);
604     rpc->channel = g_io_channel_unix_new(nfd);
605     rpc->msgbuf = g_string_new("");
606     g_io_channel_set_encoding(rpc->channel, NULL, NULL);
607     GSource *source = g_io_create_watch(rpc->channel, G_IO_IN);
608     g_source_set_callback(source, (GSourceFunc)async_rpc_do_read,
609                           rpc, NULL);
610     g_source_attach(source, main_context);
611     g_source_unref(source);
612
613     return TRUE;
614 }
615
616 static async_rpc_register_listening(int fd)
617 {
618     GIOChannel *channel = g_io_channel_unix_new(fd);
619     g_io_channel_set_encoding(channel, NULL, NULL);
620     GSource *source = g_io_create_watch(channel, G_IO_IN);
621     g_source_set_callback(source, (GSourceFunc)async_rpc_do_accept,
622                           NULL, NULL);
623     g_source_attach(source, main_context);
624     g_source_unref(source);
625 }
626
627 static gpointer async_rpc_run(gpointer data)
628 {
629     g_print("Starting NFS main loop...\n");
630     g_main_loop_run(main_loop);
631 }
632
633 void register_rpc()
634 {
635     SVCXPRT *transp;
636
637     async_rpc_init();
638
639     /* MOUNT protocol */
640     pmap_unset (MOUNT_PROGRAM, MOUNT_V3);
641
642     transp = svcudp_create(RPC_ANYSOCK);
643     if (transp == NULL) {
644         fprintf(stderr, "%s", "cannot create udp service.");
645         exit(1);
646     }
647     if (!svc_register(transp, MOUNT_PROGRAM, MOUNT_V3, mount_program_3, IPPROTO_UDP)) {
648         fprintf(stderr, "%s", "unable to register (MOUNT_PROGRAM, MOUNT_V3, udp).");
649         exit(1);
650     }
651
652     transp = svctcp_create(RPC_ANYSOCK, 0, 0);
653     if (transp == NULL) {
654         fprintf(stderr, "%s", "cannot create tcp service.");
655         exit(1);
656     }
657     if (!svc_register(transp, MOUNT_PROGRAM, MOUNT_V3, mount_program_3, IPPROTO_TCP)) {
658         fprintf(stderr, "%s", "unable to register (MOUNT_PROGRAM, MOUNT_V3, tcp).");
659         exit(1);
660     }
661
662     /* NFS protocol (version 3) */
663     pmap_unset (NFS_PROGRAM, NFS_V3);
664
665     int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
666     if (fd < 0) {
667         fprintf(stderr, "Unable to create NFS TCP socket: %m\n");
668         exit(1);
669     }
670
671     int n = 1;
672     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *)&n, sizeof(n));
673
674     struct sockaddr_in addr;
675     addr.sin_family = AF_INET;
676     addr.sin_port = htons(NFS_SERVICE_PORT);
677     addr.sin_addr.s_addr = INADDR_ANY;
678     if (bind(fd, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
679         fprintf(stderr, "Unable to bind to NFS TCP address: %m\n");
680         exit(1);
681     }
682
683     if (listen(fd, SOMAXCONN) < 0) {
684         fprintf(stderr, "Unable to listen on NFS TCP socket: %m\n");
685         exit(1);
686     }
687
688     if (!pmap_set(NFS_PROGRAM, NFS_V3, IPPROTO_TCP, NFS_SERVICE_PORT)) {
689         fprintf(stderr, "Could not register NFS RPC service!\n");
690         exit(1);
691     }
692
693     async_rpc_register_listening(fd);
694
695     g_thread_create(async_rpc_run, NULL, TRUE, NULL);
696 }