More work on the synthetic read benchmark
[bluesky.git] / nfs3 / synreadbench.c
1 /* Blue Sky: File Systems in the Cloud
2  *
3  * Copyright (C) 2009  The Regents of the University of California
4  * Written by Michael Vrable <mvrable@cs.ucsd.edu>
5  *
6  * TODO: Licensing
7  */
8
9 /* Synthetic client for benchmarking: a tool for directly generating NFS
10  * requests and reading back the responses, so that we can exercise the server
11  * differently than the Linux kernel NFS client does.
12  *
13  * Much of this is copied from rpc.c and other BlueSky server code but is
14  * designed to run independently of BlueSky. */
15
16 #include "mount_prot.h"
17 #include "nfs3_prot.h"
18 #include <stdio.h>
19 #include <stdlib.h>
20 #include <inttypes.h>
21 #include <rpc/pmap_clnt.h>
22 #include <string.h>
23 #include <signal.h>
24 #include <memory.h>
25 #include <netdb.h>
26 #include <sys/socket.h>
27 #include <sys/types.h>
28 #include <netinet/in.h>
29 #include <netinet/ip.h>
30 #include <glib.h>
31
32 /* TCP port number to use for NFS protocol.  (Would be 2049 in standard NFS.) */
33 #define NFS_SERVICE_PORT 2051
34
35 /* Maximum size of a single RPC message that we will accept (8 MB). */
36 #define MAX_RPC_MSGSIZE (8 << 20)
37
38 FILE *logfile = NULL;
39
40 int threads;
41 int completed = 0;
42 int read_size = 32768;
43
44 struct bench_file {
45     uint64_t inum;
46     uint64_t size;
47 };
48
49 struct data_point {
50     uint32_t timestamp;     // Unix timestamp of completion
51     uint32_t latency;       // Latency, in microseconds
52 } __attribute__((packed));
53
54 GArray *bench_files;
55
56 struct rpc_reply {
57     uint32_t xid;
58     uint32_t type;
59     uint32_t stat;
60     uint32_t verf_flavor;
61     uint32_t verf_len;
62     uint32_t accept_stat;
63 };
64
65 struct rpc_fail_reply {
66     uint32_t xid;
67     uint32_t type;
68     uint32_t stat;
69     uint32_t verf_flavor;
70     uint32_t verf_len;
71     uint32_t accept_stat;
72 };
73
74 struct rpc_call_header {
75     uint32_t xid;
76     uint32_t mtype;
77     uint32_t rpcvers;
78     uint32_t prog;
79     uint32_t vers;
80     uint32_t proc;
81 };
82
83 struct rpc_auth {
84     uint32_t flavor;
85     uint32_t len;
86 };
87
88 typedef struct {
89     GIOChannel *channel;
90
91     /* The reassembled message, thus far. */
92     GString *msgbuf;
93
94     /* Remaining number of bytes in this message fragment; 0 if we next expect
95      * another fragment header. */
96     uint32_t frag_len;
97
98     /* If frag_len is zero: the number of bytes of the fragment header that
99      * have been read so far. */
100     int frag_hdr_bytes;
101
102     /* Mapping of XID values to outstanding RPC calls. */
103     GHashTable *xid_table;
104 } NFSConnection;
105
106
107 typedef void (*NFSFunc)(NFSConnection *nfs,
108                         gpointer user_data, const char *reply, size_t len);
109
110 typedef struct {
111     NFSFunc callback;
112     gpointer user_data;
113     int64_t start, end;
114 } CallInfo;
115
116 static GMainLoop *main_loop;
117
118 int64_t now_hires()
119 {
120     struct timespec time;
121
122     if (clock_gettime(CLOCK_REALTIME, &time) != 0) {
123         perror("clock_gettime");
124         return 0;
125     }
126
127     return (int64_t)(time.tv_sec) * 1000000000 + time.tv_nsec;
128 }
129
130 static void do_write(NFSConnection *conn, const char *buf, size_t len)
131 {
132     while (len > 0) {
133         gsize written = 0;
134         switch (g_io_channel_write_chars(conn->channel, buf, len,
135                                          &written, NULL)) {
136         case G_IO_STATUS_ERROR:
137         case G_IO_STATUS_EOF:
138         case G_IO_STATUS_AGAIN:
139             fprintf(stderr, "Error writing to socket!\n");
140             return;
141         case G_IO_STATUS_NORMAL:
142             len -= written;
143             buf += written;
144             break;
145         }
146     }
147 }
148
149 static void send_rpc(NFSConnection *nfs, int proc, GString *msg,
150                      NFSFunc completion_handler, gpointer user_data)
151 {
152     static int xid_count = 0;
153     struct rpc_call_header header;
154     struct rpc_auth auth;
155
156     header.xid = GUINT32_TO_BE(xid_count++);
157     header.mtype = 0;
158     header.rpcvers = GUINT32_TO_BE(2);
159     header.prog = GUINT32_TO_BE(NFS_PROGRAM);
160     header.vers = GUINT32_TO_BE(NFS_V3);
161     header.proc = GUINT32_TO_BE(proc);
162
163     auth.flavor = GUINT32_TO_BE(AUTH_NULL);
164     auth.len = 0;
165
166     CallInfo *info = g_new0(CallInfo, 1);
167
168     uint32_t fragment = htonl(0x80000000
169                               | (sizeof(header) + 2*sizeof(auth) + msg->len));
170     do_write(nfs, (const char *)&fragment, sizeof(fragment));
171     do_write(nfs, (const char *)&header, sizeof(header));
172     do_write(nfs, (const char *)&auth, sizeof(auth));
173     do_write(nfs, (const char *)&auth, sizeof(auth));
174     do_write(nfs, msg->str, msg->len);
175     g_io_channel_flush(nfs->channel, NULL);
176
177     info->start = now_hires();
178     info->callback = completion_handler;
179     info->user_data = user_data;
180     g_hash_table_insert(nfs->xid_table,
181                         GINT_TO_POINTER(GUINT32_FROM_BE(header.xid)), info);
182 }
183
184 static void process_reply(NFSConnection *nfs, GString *msg)
185 {
186     struct rpc_reply *reply = (struct rpc_reply *)msg->str;
187
188     uint32_t xid = GUINT32_FROM_BE(reply->xid);
189
190     gpointer key = GINT_TO_POINTER(GUINT32_FROM_BE(reply->xid));
191     CallInfo *info = g_hash_table_lookup(nfs->xid_table, key);
192     if (info == NULL) {
193         g_print("Could not match reply XID %d with a call!\n", xid);
194         return;
195     }
196
197     struct data_point d;
198     info->end = now_hires();
199     d.timestamp = info->end / 1000000000;
200     d.latency = (info->end - info->start + 500) / 1000; /* Round off */
201     //printf("XID %d: Time = %"PRIi64"\n", xid, info->end - info->start);
202     if (info->callback != NULL)
203         info->callback(nfs, info->user_data,
204                        msg->str + sizeof(*reply), msg->len - sizeof(*reply));
205
206     g_hash_table_remove(nfs->xid_table, key);
207     g_free(info);
208     if (logfile != NULL) {
209         fwrite(&d, sizeof(d), 1, logfile);
210         fflush(logfile);
211     }
212
213     completed++;
214     /*
215     if (completed == 128 * threads) {
216         g_main_loop_quit(main_loop);
217     } */
218 }
219
220 static gboolean read_handler(GIOChannel *channel,
221                              GIOCondition condition,
222                              gpointer data)
223 {
224     NFSConnection *nfs = (NFSConnection *)data;
225
226     gsize bytes_to_read = 0;    /* Number of bytes to attempt to read. */
227
228     /* If we have not yet read in the fragment header, do that first.  This is
229      * 4 bytes that indicates the number of bytes in the message to follow
230      * (with the high bit set if this is the last fragment making up the
231      * message). */
232     if (nfs->frag_len == 0) {
233         bytes_to_read = 4 - nfs->frag_hdr_bytes;
234     } else {
235         bytes_to_read = nfs->frag_len & 0x7fffffff;
236     }
237
238     if (bytes_to_read > MAX_RPC_MSGSIZE
239         || nfs->msgbuf->len + bytes_to_read > MAX_RPC_MSGSIZE)
240     {
241         fprintf(stderr, "Excessive fragment size for RPC: %zd bytes\n",
242                 bytes_to_read);
243         g_io_channel_shutdown(nfs->channel, TRUE, NULL);
244         return FALSE;
245     }
246
247     gsize bytes_read = 0;
248     g_string_set_size(nfs->msgbuf, nfs->msgbuf->len + bytes_to_read);
249     char *buf = &nfs->msgbuf->str[nfs->msgbuf->len - bytes_to_read];
250     switch (g_io_channel_read_chars(nfs->channel, buf,
251                                     bytes_to_read, &bytes_read, NULL)) {
252     case G_IO_STATUS_NORMAL:
253         break;
254     case G_IO_STATUS_AGAIN:
255         return TRUE;
256     case G_IO_STATUS_EOF:
257         if (bytes_read == bytes_to_read)
258             break;
259         /* else fall through */
260     case G_IO_STATUS_ERROR:
261         fprintf(stderr, "Unexpected error or end of file on RPC stream %d!\n",
262                 g_io_channel_unix_get_fd(nfs->channel));
263         g_io_channel_shutdown(nfs->channel, TRUE, NULL);
264         /* TODO: Clean up connection object. */
265         return FALSE;
266     }
267
268     g_assert(bytes_read >= 0 && bytes_read <= bytes_to_read);
269
270     g_string_set_size(nfs->msgbuf,
271                       nfs->msgbuf->len - (bytes_to_read - bytes_read));
272
273     if (nfs->frag_len == 0) {
274         /* Handle reading in the fragment header.  If we've read the complete
275          * header, store the fragment size. */
276         nfs->frag_hdr_bytes += bytes_read;
277         if (nfs->frag_hdr_bytes == 4) {
278             memcpy((char *)&nfs->frag_len,
279                    &nfs->msgbuf->str[nfs->msgbuf->len - 4], 4);
280             nfs->frag_len = ntohl(nfs->frag_len);
281             g_string_set_size(nfs->msgbuf, nfs->msgbuf->len - 4);
282             nfs->frag_hdr_bytes = 0;
283         }
284     } else {
285         /* We were reading in the fragment body. */
286         nfs->frag_len -= bytes_read;
287
288         if (nfs->frag_len == 0x80000000) {
289             process_reply(nfs, nfs->msgbuf);
290             nfs->frag_len = 0;
291             g_string_set_size(nfs->msgbuf, 0);
292         }
293     }
294
295     return TRUE;
296 }
297
298 static void send_read_request(NFSConnection *nfs, uint64_t inum,
299                                uint64_t offset, uint64_t len);
300 static void submit_random_read(NFSConnection *nfs)
301 {
302     struct bench_file *bf;
303     bf = &g_array_index(bench_files, struct bench_file,
304                         g_random_int_range(0, bench_files->len));
305     int blocks = bf->size / read_size;
306     if (blocks == 0)
307         blocks = 1;
308
309     int offset = g_random_int_range(0, blocks);
310     send_read_request(nfs, bf->inum, offset * read_size, read_size);
311 }
312
313 static void finish_read_request(NFSConnection *nfs, gpointer user_data,
314                                 const char *reply, size_t len)
315 {
316     submit_random_read(nfs);
317 }
318
319 static void send_read_request(NFSConnection *nfs, uint64_t inum,
320                                uint64_t offset, uint64_t len)
321 {
322     struct nfs_fh3 fh;
323     uint64_t fhdata = GUINT64_TO_BE(inum);
324     fh.data.data_val = (char *)&fhdata;
325     fh.data.data_len = 8;
326     int i;
327
328     char buf[64];
329     struct read3args read;
330     memcpy(&read.file, &fh, sizeof(struct nfs_fh3));
331     read.offset = offset;
332     read.count = len;
333
334     GString *str = g_string_new("");
335     XDR xdr;
336     xdr_string_create(&xdr, str, XDR_ENCODE);
337     xdr_read3args(&xdr, &read);
338     send_rpc(nfs, NFSPROC3_READ, str, finish_read_request,
339              GINT_TO_POINTER((int)inum));
340     g_string_free(str, TRUE);
341 }
342
343 static gboolean idle_handler(gpointer data)
344 {
345     NFSConnection *nfs = (NFSConnection *)data;
346     int i;
347
348     for (i = 0; i < threads; i++) {
349         submit_random_read(nfs);
350     }
351
352 #if 0
353     g_print("Sending requests...\n");
354     for (i = 0; i < threads; i++) {
355         char buf[64];
356         struct diropargs3 lookup;
357         uint64_t rootfh = GUINT64_TO_BE(1);
358
359         sprintf(buf, "file-%d", i + 1);
360         lookup.dir.data.data_len = 8;
361         lookup.dir.data.data_val = (char *)&rootfh;
362         lookup.name = buf;
363
364         GString *str = g_string_new("");
365         XDR xdr;
366         xdr_string_create(&xdr, str, XDR_ENCODE);
367         xdr_diropargs3(&xdr, &lookup);
368         send_rpc(nfs, NFSPROC3_LOOKUP, str, store_fh, NULL);
369         g_string_free(str, TRUE);
370     }
371 #endif
372
373     return FALSE;
374 }
375
376 NFSConnection *nfs_connect(const char *hostname)
377 {
378     int result;
379     struct addrinfo hints;
380     struct addrinfo *ai = NULL;
381
382     int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
383     if (fd < 0) {
384         fprintf(stderr, "Unable to create NFS TCP socket: %m\n");
385         exit(1);
386     }
387
388     memset(&hints, 0, sizeof(hints));
389     hints.ai_family = AF_INET;
390     hints.ai_socktype = SOCK_STREAM;
391     hints.ai_protocol = IPPROTO_TCP;
392     result = getaddrinfo(hostname, "2051", NULL, &ai);
393     if (result < 0 || ai == NULL) {
394         fprintf(stderr, "Hostname lookup failure for %s: %s\n",
395                 hostname, gai_strerror(result));
396         exit(1);
397     }
398
399     if (connect(fd, ai->ai_addr, ai->ai_addrlen) < 0) {
400         fprintf(stderr, "Unable to connect to : %m\n");
401     }
402
403     freeaddrinfo(ai);
404
405     NFSConnection *conn = g_new0(NFSConnection, 1);
406     conn->msgbuf = g_string_new("");
407     conn->xid_table = g_hash_table_new(g_direct_hash, g_direct_equal);
408
409     conn->channel = g_io_channel_unix_new(fd);
410     g_io_channel_set_encoding(conn->channel, NULL, NULL);
411     g_io_channel_set_flags(conn->channel, G_IO_FLAG_NONBLOCK, NULL);
412     g_io_add_watch(conn->channel, G_IO_IN, read_handler, conn);
413
414     g_idle_add(idle_handler, conn);
415
416     return conn;
417 }
418
419 int main(int argc, char *argv[])
420 {
421     g_thread_init(NULL);
422     g_set_prgname("synclient");
423     g_print("Launching synthetic NFS RPC client...\n");
424
425     bench_files = g_array_new(FALSE, TRUE, sizeof(struct bench_file));
426
427     FILE *inodes = fopen(argv[1], "r");
428     if (inodes == NULL) {
429         perror("fopen");
430         return 1;
431     }
432     while (!feof(inodes)) {
433         int i1 = -1, i2 = -1;
434         fscanf(inodes, "%d %d", &i1, &i2);
435         if (i1 < 0 || i2 < 0)
436             continue;
437
438         struct bench_file bf;
439         bf.inum = i1;
440         bf.size = i2;
441         g_array_append_val(bench_files, bf);
442     }
443     fclose(inodes);
444
445     threads = 8;
446     if (argc > 2)
447         threads = atoi(argv[2]);
448     if (argc > 3)
449         read_size = atoi(argv[3]);
450     if (argc > 4)
451         logfile = fopen(argv[4], "wb");
452
453     main_loop = g_main_loop_new(NULL, FALSE);
454     nfs_connect("niniel.sysnet.ucsd.edu");
455
456     g_main_loop_run(main_loop);
457
458     return 0;
459 }