Try to improve parallelism of synreadbench warmup phase
[bluesky.git] / nfs3 / synreadbench.c
1 /* Blue Sky: File Systems in the Cloud
2  *
3  * Copyright (C) 2009  The Regents of the University of California
4  * Written by Michael Vrable <mvrable@cs.ucsd.edu>
5  *
6  * TODO: Licensing
7  */
8
9 /* Synthetic client for benchmarking: a tool for directly generating NFS
10  * requests and reading back the responses, so that we can exercise the server
11  * differently than the Linux kernel NFS client does.
12  *
13  * Much of this is copied from rpc.c and other BlueSky server code but is
14  * designed to run independently of BlueSky. */
15
16 #include "mount_prot.h"
17 #include "nfs3_prot.h"
18 #include <stdio.h>
19 #include <stdlib.h>
20 #include <inttypes.h>
21 #include <rpc/pmap_clnt.h>
22 #include <string.h>
23 #include <signal.h>
24 #include <memory.h>
25 #include <netdb.h>
26 #include <sys/socket.h>
27 #include <sys/types.h>
28 #include <netinet/in.h>
29 #include <netinet/ip.h>
30 #include <glib.h>
31
32 /* TCP port number to use for NFS protocol.  (Would be 2049 in standard NFS.) */
33 #define NFS_SERVICE_PORT 2051
34
35 /* Maximum size of a single RPC message that we will accept (8 MB). */
36 #define MAX_RPC_MSGSIZE (8 << 20)
37
38 FILE *logfile = NULL;
39
40 int warmup_mode = 0;
41 int threads;
42 int completed = 0;
43 int read_size = 32768;
44
45 struct bench_file {
46     uint64_t inum;
47     uint64_t size;
48 };
49
50 struct data_point {
51     uint32_t timestamp;     // Unix timestamp of completion
52     uint32_t latency;       // Latency, in microseconds
53 } __attribute__((packed));
54
55 GArray *bench_files;
56
57 struct rpc_reply {
58     uint32_t xid;
59     uint32_t type;
60     uint32_t stat;
61     uint32_t verf_flavor;
62     uint32_t verf_len;
63     uint32_t accept_stat;
64 };
65
66 struct rpc_fail_reply {
67     uint32_t xid;
68     uint32_t type;
69     uint32_t stat;
70     uint32_t verf_flavor;
71     uint32_t verf_len;
72     uint32_t accept_stat;
73 };
74
75 struct rpc_call_header {
76     uint32_t xid;
77     uint32_t mtype;
78     uint32_t rpcvers;
79     uint32_t prog;
80     uint32_t vers;
81     uint32_t proc;
82 };
83
84 struct rpc_auth {
85     uint32_t flavor;
86     uint32_t len;
87 };
88
89 typedef struct {
90     GIOChannel *channel;
91
92     /* The reassembled message, thus far. */
93     GString *msgbuf;
94
95     /* Remaining number of bytes in this message fragment; 0 if we next expect
96      * another fragment header. */
97     uint32_t frag_len;
98
99     /* If frag_len is zero: the number of bytes of the fragment header that
100      * have been read so far. */
101     int frag_hdr_bytes;
102
103     /* Mapping of XID values to outstanding RPC calls. */
104     GHashTable *xid_table;
105 } NFSConnection;
106
107
108 typedef void (*NFSFunc)(NFSConnection *nfs,
109                         gpointer user_data, const char *reply, size_t len);
110
111 typedef struct {
112     NFSFunc callback;
113     gpointer user_data;
114     int64_t start, end;
115 } CallInfo;
116
117 static GMainLoop *main_loop;
118
119 int64_t now_hires()
120 {
121     struct timespec time;
122
123     if (clock_gettime(CLOCK_REALTIME, &time) != 0) {
124         perror("clock_gettime");
125         return 0;
126     }
127
128     return (int64_t)(time.tv_sec) * 1000000000 + time.tv_nsec;
129 }
130
131 static void do_write(NFSConnection *conn, const char *buf, size_t len)
132 {
133     while (len > 0) {
134         gsize written = 0;
135         switch (g_io_channel_write_chars(conn->channel, buf, len,
136                                          &written, NULL)) {
137         case G_IO_STATUS_ERROR:
138         case G_IO_STATUS_EOF:
139         case G_IO_STATUS_AGAIN:
140             fprintf(stderr, "Error writing to socket!\n");
141             return;
142         case G_IO_STATUS_NORMAL:
143             len -= written;
144             buf += written;
145             break;
146         }
147     }
148 }
149
150 static void send_rpc(NFSConnection *nfs, int proc, GString *msg,
151                      NFSFunc completion_handler, gpointer user_data)
152 {
153     static int xid_count = 0;
154     struct rpc_call_header header;
155     struct rpc_auth auth;
156
157     header.xid = GUINT32_TO_BE(xid_count++);
158     header.mtype = 0;
159     header.rpcvers = GUINT32_TO_BE(2);
160     header.prog = GUINT32_TO_BE(NFS_PROGRAM);
161     header.vers = GUINT32_TO_BE(NFS_V3);
162     header.proc = GUINT32_TO_BE(proc);
163
164     auth.flavor = GUINT32_TO_BE(AUTH_NULL);
165     auth.len = 0;
166
167     CallInfo *info = g_new0(CallInfo, 1);
168
169     uint32_t fragment = htonl(0x80000000
170                               | (sizeof(header) + 2*sizeof(auth) + msg->len));
171     do_write(nfs, (const char *)&fragment, sizeof(fragment));
172     do_write(nfs, (const char *)&header, sizeof(header));
173     do_write(nfs, (const char *)&auth, sizeof(auth));
174     do_write(nfs, (const char *)&auth, sizeof(auth));
175     do_write(nfs, msg->str, msg->len);
176     g_io_channel_flush(nfs->channel, NULL);
177
178     info->start = now_hires();
179     info->callback = completion_handler;
180     info->user_data = user_data;
181     g_hash_table_insert(nfs->xid_table,
182                         GINT_TO_POINTER(GUINT32_FROM_BE(header.xid)), info);
183 }
184
185 static void process_reply(NFSConnection *nfs, GString *msg)
186 {
187     struct rpc_reply *reply = (struct rpc_reply *)msg->str;
188
189     uint32_t xid = GUINT32_FROM_BE(reply->xid);
190
191     gpointer key = GINT_TO_POINTER(GUINT32_FROM_BE(reply->xid));
192     CallInfo *info = g_hash_table_lookup(nfs->xid_table, key);
193     if (info == NULL) {
194         g_print("Could not match reply XID %d with a call!\n", xid);
195         return;
196     }
197
198     struct data_point d;
199     info->end = now_hires();
200     d.timestamp = info->end / 1000000000;
201     d.latency = (info->end - info->start + 500) / 1000; /* Round off */
202     //printf("XID %d: Time = %"PRIi64"\n", xid, info->end - info->start);
203     if (info->callback != NULL)
204         info->callback(nfs, info->user_data,
205                        msg->str + sizeof(*reply), msg->len - sizeof(*reply));
206
207     g_hash_table_remove(nfs->xid_table, key);
208     g_free(info);
209     if (logfile != NULL) {
210         fwrite(&d, sizeof(d), 1, logfile);
211         fflush(logfile);
212     }
213
214     completed++;
215     if (warmup_mode) {
216         printf("Done warming up %d\n", completed);
217         int scale = 1;
218         if (read_size > (1 << 20))
219             scale = read_size / (1 << 20);
220         if (completed == bench_files->len * scale)
221             g_main_loop_quit(main_loop);
222     }
223 }
224
225 static gboolean read_handler(GIOChannel *channel,
226                              GIOCondition condition,
227                              gpointer data)
228 {
229     NFSConnection *nfs = (NFSConnection *)data;
230
231     gsize bytes_to_read = 0;    /* Number of bytes to attempt to read. */
232
233     /* If we have not yet read in the fragment header, do that first.  This is
234      * 4 bytes that indicates the number of bytes in the message to follow
235      * (with the high bit set if this is the last fragment making up the
236      * message). */
237     if (nfs->frag_len == 0) {
238         bytes_to_read = 4 - nfs->frag_hdr_bytes;
239     } else {
240         bytes_to_read = nfs->frag_len & 0x7fffffff;
241     }
242
243     if (bytes_to_read > MAX_RPC_MSGSIZE
244         || nfs->msgbuf->len + bytes_to_read > MAX_RPC_MSGSIZE)
245     {
246         fprintf(stderr, "Excessive fragment size for RPC: %zd bytes\n",
247                 bytes_to_read);
248         g_io_channel_shutdown(nfs->channel, TRUE, NULL);
249         return FALSE;
250     }
251
252     gsize bytes_read = 0;
253     g_string_set_size(nfs->msgbuf, nfs->msgbuf->len + bytes_to_read);
254     char *buf = &nfs->msgbuf->str[nfs->msgbuf->len - bytes_to_read];
255     switch (g_io_channel_read_chars(nfs->channel, buf,
256                                     bytes_to_read, &bytes_read, NULL)) {
257     case G_IO_STATUS_NORMAL:
258         break;
259     case G_IO_STATUS_AGAIN:
260         return TRUE;
261     case G_IO_STATUS_EOF:
262         if (bytes_read == bytes_to_read)
263             break;
264         /* else fall through */
265     case G_IO_STATUS_ERROR:
266         fprintf(stderr, "Unexpected error or end of file on RPC stream %d!\n",
267                 g_io_channel_unix_get_fd(nfs->channel));
268         g_io_channel_shutdown(nfs->channel, TRUE, NULL);
269         /* TODO: Clean up connection object. */
270         return FALSE;
271     }
272
273     g_assert(bytes_read >= 0 && bytes_read <= bytes_to_read);
274
275     g_string_set_size(nfs->msgbuf,
276                       nfs->msgbuf->len - (bytes_to_read - bytes_read));
277
278     if (nfs->frag_len == 0) {
279         /* Handle reading in the fragment header.  If we've read the complete
280          * header, store the fragment size. */
281         nfs->frag_hdr_bytes += bytes_read;
282         if (nfs->frag_hdr_bytes == 4) {
283             memcpy((char *)&nfs->frag_len,
284                    &nfs->msgbuf->str[nfs->msgbuf->len - 4], 4);
285             nfs->frag_len = ntohl(nfs->frag_len);
286             g_string_set_size(nfs->msgbuf, nfs->msgbuf->len - 4);
287             nfs->frag_hdr_bytes = 0;
288         }
289     } else {
290         /* We were reading in the fragment body. */
291         nfs->frag_len -= bytes_read;
292
293         if (nfs->frag_len == 0x80000000) {
294             process_reply(nfs, nfs->msgbuf);
295             nfs->frag_len = 0;
296             g_string_set_size(nfs->msgbuf, 0);
297         }
298     }
299
300     return TRUE;
301 }
302
303 static void send_read_request(NFSConnection *nfs, uint64_t inum,
304                                uint64_t offset, uint64_t len);
305 static void submit_random_read(NFSConnection *nfs)
306 {
307     static int warmup_counter = 0;
308     struct bench_file *bf;
309
310     if (warmup_mode) {
311         int scale = 1;
312         if (read_size > (1 << 20)) {
313             scale = read_size / (1 << 20);
314         }
315         int filecount = bench_files->len;
316         printf("Warming up file %d\n", warmup_counter);
317         if (warmup_counter >= filecount * scale)
318             return;
319         bf = &g_array_index(bench_files, struct bench_file,
320                             warmup_counter % filecount);
321         send_read_request(nfs, bf->inum, (warmup_counter / filecount) << 20,
322                           read_size > (1 << 20) ? (1 << 20) : read_size);
323         warmup_counter++;
324         return;
325     }
326
327     bf = &g_array_index(bench_files, struct bench_file,
328                         g_random_int_range(0, bench_files->len));
329     int blocks = bf->size / read_size;
330     if (blocks == 0)
331         blocks = 1;
332
333     int offset = g_random_int_range(0, blocks);
334     send_read_request(nfs, bf->inum, offset * read_size, read_size);
335 }
336
337 static void finish_read_request(NFSConnection *nfs, gpointer user_data,
338                                 const char *reply, size_t len)
339 {
340     submit_random_read(nfs);
341 }
342
343 static void send_read_request(NFSConnection *nfs, uint64_t inum,
344                                uint64_t offset, uint64_t len)
345 {
346     struct nfs_fh3 fh;
347     uint64_t fhdata = GUINT64_TO_BE(inum);
348     fh.data.data_val = (char *)&fhdata;
349     fh.data.data_len = 8;
350     int i;
351
352     char buf[64];
353     struct read3args read;
354     memcpy(&read.file, &fh, sizeof(struct nfs_fh3));
355     read.offset = offset;
356     read.count = len;
357
358     GString *str = g_string_new("");
359     XDR xdr;
360     xdr_string_create(&xdr, str, XDR_ENCODE);
361     xdr_read3args(&xdr, &read);
362     send_rpc(nfs, NFSPROC3_READ, str, finish_read_request,
363              GINT_TO_POINTER((int)inum));
364     g_string_free(str, TRUE);
365 }
366
367 static gboolean idle_handler(gpointer data)
368 {
369     NFSConnection *nfs = (NFSConnection *)data;
370     int i;
371
372     for (i = 0; i < threads; i++) {
373         submit_random_read(nfs);
374     }
375
376 #if 0
377     g_print("Sending requests...\n");
378     for (i = 0; i < threads; i++) {
379         char buf[64];
380         struct diropargs3 lookup;
381         uint64_t rootfh = GUINT64_TO_BE(1);
382
383         sprintf(buf, "file-%d", i + 1);
384         lookup.dir.data.data_len = 8;
385         lookup.dir.data.data_val = (char *)&rootfh;
386         lookup.name = buf;
387
388         GString *str = g_string_new("");
389         XDR xdr;
390         xdr_string_create(&xdr, str, XDR_ENCODE);
391         xdr_diropargs3(&xdr, &lookup);
392         send_rpc(nfs, NFSPROC3_LOOKUP, str, store_fh, NULL);
393         g_string_free(str, TRUE);
394     }
395 #endif
396
397     return FALSE;
398 }
399
400 NFSConnection *nfs_connect(const char *hostname)
401 {
402     int result;
403     struct addrinfo hints;
404     struct addrinfo *ai = NULL;
405
406     int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
407     if (fd < 0) {
408         fprintf(stderr, "Unable to create NFS TCP socket: %m\n");
409         exit(1);
410     }
411
412     memset(&hints, 0, sizeof(hints));
413     hints.ai_family = AF_INET;
414     hints.ai_socktype = SOCK_STREAM;
415     hints.ai_protocol = IPPROTO_TCP;
416     result = getaddrinfo(hostname, "2051", NULL, &ai);
417     if (result < 0 || ai == NULL) {
418         fprintf(stderr, "Hostname lookup failure for %s: %s\n",
419                 hostname, gai_strerror(result));
420         exit(1);
421     }
422
423     if (connect(fd, ai->ai_addr, ai->ai_addrlen) < 0) {
424         fprintf(stderr, "Unable to connect to : %m\n");
425     }
426
427     freeaddrinfo(ai);
428
429     NFSConnection *conn = g_new0(NFSConnection, 1);
430     conn->msgbuf = g_string_new("");
431     conn->xid_table = g_hash_table_new(g_direct_hash, g_direct_equal);
432
433     conn->channel = g_io_channel_unix_new(fd);
434     g_io_channel_set_encoding(conn->channel, NULL, NULL);
435     g_io_channel_set_flags(conn->channel, G_IO_FLAG_NONBLOCK, NULL);
436     g_io_add_watch(conn->channel, G_IO_IN, read_handler, conn);
437
438     g_idle_add(idle_handler, conn);
439
440     return conn;
441 }
442
443 int main(int argc, char *argv[])
444 {
445     g_thread_init(NULL);
446     g_set_prgname("synclient");
447
448     bench_files = g_array_new(FALSE, TRUE, sizeof(struct bench_file));
449
450     FILE *inodes = fopen(argv[1], "r");
451     if (inodes == NULL) {
452         perror("fopen");
453         return 1;
454     }
455     while (!feof(inodes)) {
456         int i1 = -1, i2 = -1;
457         fscanf(inodes, "%d %d", &i1, &i2);
458         if (i1 < 0 || i2 < 0)
459             continue;
460
461         struct bench_file bf;
462         bf.inum = i1;
463         bf.size = i2;
464         g_array_append_val(bench_files, bf);
465     }
466     fclose(inodes);
467
468     threads = 8;
469     if (argc > 2)
470         threads = atoi(argv[2]);
471     if (argc > 3)
472         read_size = atoi(argv[3]);
473     if (argc > 4) {
474         if (strcmp(argv[4], "WARMUP") == 0) {
475             warmup_mode = 1;
476         } else {
477             logfile = fopen(argv[4], "wb");
478         }
479     }
480
481     main_loop = g_main_loop_new(NULL, FALSE);
482     nfs_connect("vrable2.sysnet.ucsd.edu");
483
484     g_main_loop_run(main_loop);
485
486     return 0;
487 }