1 /* Blue Sky: File Systems in the Cloud
3 * Copyright (C) 2009 The Regents of the University of California
4 * Written by Michael Vrable <mvrable@cs.ucsd.edu>
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions
9 * 1. Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * 2. Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * 3. Neither the name of the University nor the names of its contributors
15 * may be used to endorse or promote products derived from this software
16 * without specific prior written permission.
18 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
19 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21 * ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
22 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
24 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
25 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
27 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
31 /* Synthetic client for benchmarking: a tool for directly generating NFS
32 * requests and reading back the responses, so that we can exercise the server
33 * differently than the Linux kernel NFS client does.
35 * Much of this is copied from rpc.c and other BlueSky server code but is
36 * designed to run independently of BlueSky. */
38 #include "mount_prot.h"
39 #include "nfs3_prot.h"
43 #include <rpc/pmap_clnt.h>
48 #include <sys/socket.h>
49 #include <sys/types.h>
50 #include <netinet/in.h>
51 #include <netinet/ip.h>
54 /* TCP port number to use for NFS protocol. (Would be 2049 in standard NFS.) */
55 #define NFS_SERVICE_PORT 2051
57 /* Maximum size of a single RPC message that we will accept (8 MB). */
58 #define MAX_RPC_MSGSIZE (8 << 20)
65 int read_size = 32768;
73 uint32_t timestamp; // Unix timestamp of completion
74 uint32_t latency; // Latency, in microseconds
75 } __attribute__((packed));
88 struct rpc_fail_reply {
97 struct rpc_call_header {
114 /* The reassembled message, thus far. */
117 /* Remaining number of bytes in this message fragment; 0 if we next expect
118 * another fragment header. */
121 /* If frag_len is zero: the number of bytes of the fragment header that
122 * have been read so far. */
125 /* Mapping of XID values to outstanding RPC calls. */
126 GHashTable *xid_table;
/* Completion callback type for an RPC issued with send_rpc.  process_reply
 * invokes it with the connection, the user_data pointer that was registered
 * with the call, and a pointer/length pair covering the reply bytes that
 * follow the struct rpc_reply header. */
130 typedef void (*NFSFunc)(NFSConnection *nfs,
131 gpointer user_data, const char *reply, size_t len);
139 static GMainLoop *main_loop;
143 struct timespec time;
145 if (clock_gettime(CLOCK_REALTIME, &time) != 0) {
146 perror("clock_gettime");
150 return (int64_t)(time.tv_sec) * 1000000000 + time.tv_nsec;
/* Write len bytes from buf to the GIOChannel of conn using
 * g_io_channel_write_chars.  In the visible lines, every status other than
 * G_IO_STATUS_NORMAL (error, EOF, and would-block) shares one case that
 * reports a write error on stderr.  Several lines of this function are not
 * visible in this listing, so any loop around the write and whatever follows
 * each case cannot be described here.
 *
 * NOTE(review): nfs_connect sets G_IO_FLAG_NONBLOCK on the channel, so
 * G_IO_STATUS_AGAIN may be an ordinary would-block condition rather than a
 * failure -- confirm that treating it as an error is intended. */
153 static void do_write(NFSConnection *conn, const char *buf, size_t len)
157 switch (g_io_channel_write_chars(conn->channel, buf, len,
159 case G_IO_STATUS_ERROR:
160 case G_IO_STATUS_EOF:
161 case G_IO_STATUS_AGAIN:
162 fprintf(stderr, "Error writing to socket!\n");
164 case G_IO_STATUS_NORMAL:
/* Send one NFS version 3 RPC call on the connection and register a
 * completion callback for it.
 *
 *   nfs                 connection to write the call to
 *   proc                NFS procedure number placed in the call header
 *   msg                 already-encoded procedure arguments; not freed here
 *   completion_handler  stored in the CallInfo; process_reply calls it when a
 *                       reply with the same XID arrives (may be NULL, since
 *                       process_reply checks for that)
 *   user_data           opaque pointer handed back to completion_handler
 *
 * Some lines of this function are not visible in this listing, so header and
 * auth fields other than the ones assigned below are not described. */
172 static void send_rpc(NFSConnection *nfs, int proc, GString *msg,
173 NFSFunc completion_handler, gpointer user_data)
/* The XID counter is a function-level static, so it is shared by every
 * connection in the process. */
175 static int xid_count = 0;
176 struct rpc_call_header header;
177 struct rpc_auth auth;
179 header.xid = GUINT32_TO_BE(xid_count++);
181 header.rpcvers = GUINT32_TO_BE(2);
182 header.prog = GUINT32_TO_BE(NFS_PROGRAM);
183 header.vers = GUINT32_TO_BE(NFS_V3);
184 header.proc = GUINT32_TO_BE(proc);
186 auth.flavor = GUINT32_TO_BE(AUTH_NULL);
189 CallInfo *info = g_new0(CallInfo, 1);
/* Fragment header: the high bit marks this as the last fragment, and the low
 * bits carry the byte count of everything written after it (call header, two
 * auth structures, and the argument bytes). */
191 uint32_t fragment = htonl(0x80000000
192 | (sizeof(header) + 2*sizeof(auth) + msg->len));
193 do_write(nfs, (const char *)&fragment, sizeof(fragment));
194 do_write(nfs, (const char *)&header, sizeof(header));
/* The same auth structure is written twice -- presumably once as the
 * credential and once as the verifier; confirm against struct rpc_auth. */
195 do_write(nfs, (const char *)&auth, sizeof(auth));
196 do_write(nfs, (const char *)&auth, sizeof(auth));
197 do_write(nfs, msg->str, msg->len);
/* NOTE(review): the status returned by g_io_channel_flush is ignored. */
198 g_io_channel_flush(nfs->channel, NULL);
/* The start time is taken after the flush, so the latency computed in
 * process_reply does not include the time spent in the writes above. */
200 info->start = now_hires();
201 info->callback = completion_handler;
202 info->user_data = user_data;
/* Key the outstanding call by its host-order XID so that process_reply can
 * find it again. */
203 g_hash_table_insert(nfs->xid_table,
204 GINT_TO_POINTER(GUINT32_FROM_BE(header.xid)), info);
/* Handle one reassembled RPC reply held in msg: look up the outstanding call
 * by XID, record its completion time and latency, invoke the registered
 * callback with the bytes after the struct rpc_reply header, remove the call
 * from the XID table, and append a record to logfile when one is open.  The
 * trailing lines quit the main loop once the completed count reaches the
 * number of benchmark files times a scale factor.
 *
 * Some lines are not visible in this listing, including the declaration of d,
 * the test that guards the unmatched-XID message, and the lines between the
 * fwrite and the warmup message.
 *
 * NOTE(review): no check that msg->len is at least sizeof(*reply) is visible
 * before msg->str is read through the reply pointer; a shorter message would
 * also make msg->len - sizeof(*reply) wrap around.  Confirm whether a length
 * check exists in the lines not shown. */
207 static void process_reply(NFSConnection *nfs, GString *msg)
209 struct rpc_reply *reply = (struct rpc_reply *)msg->str;
211 uint32_t xid = GUINT32_FROM_BE(reply->xid);
213 gpointer key = GINT_TO_POINTER(GUINT32_FROM_BE(reply->xid));
214 CallInfo *info = g_hash_table_lookup(nfs->xid_table, key);
/* NOTE(review): xid is a uint32_t but is printed with a signed int format. */
216 g_print("Could not match reply XID %d with a call!\n", xid);
/* The arithmetic below treats now_hires values as nanoseconds: dividing by
 * 1000000000 yields a timestamp in seconds, and adding 500 before dividing by
 * 1000 rounds the latency to the nearest microsecond. */
221 info->end = now_hires();
222 d.timestamp = info->end / 1000000000;
223 d.latency = (info->end - info->start + 500) / 1000; /* Round off */
224 //printf("XID %d: Time = %"PRIi64"\n", xid, info->end - info->start);
225 if (info->callback != NULL)
226 info->callback(nfs, info->user_data,
227 msg->str + sizeof(*reply), msg->len - sizeof(*reply));
/* NOTE(review): nfs_connect creates the table with g_hash_table_new, which
 * installs no value destroy function, so this removal does not free the
 * CallInfo; whether it is freed in the lines not shown needs confirming. */
229 g_hash_table_remove(nfs->xid_table, key);
/* The record is written as raw bytes of d. */
231 if (logfile != NULL) {
232 fwrite(&d, sizeof(d), 1, logfile);
238 printf("Done warming up %d\n", completed);
/* Same scale computation as in submit_random_read: one unit per MiB of
 * read_size when read_size exceeds 1 MiB. */
240 if (read_size > (1 << 20))
241 scale = read_size / (1 << 20);
242 if (completed == bench_files->len * scale)
243 g_main_loop_quit(main_loop);
/* GIOChannel watch callback; nfs_connect registers it for G_IO_IN with the
 * NFSConnection as its data pointer.  Each call performs one read into
 * nfs->msgbuf:
 *
 *   - While nfs->frag_len is zero the handler is collecting the 4-byte
 *     fragment header and asks for the header bytes still missing.
 *   - Otherwise it asks for the remaining fragment body, taking the length
 *     from frag_len with the top (last-fragment) bit masked off.
 *
 * A request that exceeds MAX_RPC_MSGSIZE, alone or together with the bytes
 * already buffered, is reported and the channel is shut down.  The buffer is
 * grown by the requested amount, the read lands directly in the new tail, and
 * the buffer is then trimmed back to what was actually received.  When a
 * final fragment has been read completely the message is passed to
 * process_reply and the buffer is reset.
 *
 * Some lines of this function, including its return statements, are not
 * visible in this listing. */
247 static gboolean read_handler(GIOChannel *channel,
248 GIOCondition condition,
251 NFSConnection *nfs = (NFSConnection *)data;
253 gsize bytes_to_read = 0; /* Number of bytes to attempt to read. */
255 /* If we have not yet read in the fragment header, do that first. This is
256 * 4 bytes that indicates the number of bytes in the message to follow
257 * (with the high bit set if this is the last fragment making up the
259 if (nfs->frag_len == 0) {
260 bytes_to_read = 4 - nfs->frag_hdr_bytes;
262 bytes_to_read = nfs->frag_len & 0x7fffffff;
265 if (bytes_to_read > MAX_RPC_MSGSIZE
266 || nfs->msgbuf->len + bytes_to_read > MAX_RPC_MSGSIZE)
268 fprintf(stderr, "Excessive fragment size for RPC: %zd bytes\n",
270 g_io_channel_shutdown(nfs->channel, TRUE, NULL);
274 gsize bytes_read = 0;
275 g_string_set_size(nfs->msgbuf, nfs->msgbuf->len + bytes_to_read);
276 char *buf = &nfs->msgbuf->str[nfs->msgbuf->len - bytes_to_read];
277 switch (g_io_channel_read_chars(nfs->channel, buf,
278 bytes_to_read, &bytes_read, NULL)) {
279 case G_IO_STATUS_NORMAL:
281 case G_IO_STATUS_AGAIN:
283 case G_IO_STATUS_EOF:
284 if (bytes_read == bytes_to_read)
286 /* else fall through */
287 case G_IO_STATUS_ERROR:
288 fprintf(stderr, "Unexpected error or end of file on RPC stream %d!\n",
289 g_io_channel_unix_get_fd(nfs->channel));
290 g_io_channel_shutdown(nfs->channel, TRUE, NULL);
291 /* TODO: Clean up connection object. */
/* NOTE(review): gsize is an unsigned type, so the bytes_read >= 0 half of
 * this assertion is always true. */
295 g_assert(bytes_read >= 0 && bytes_read <= bytes_to_read);
/* Drop the part of the pre-grown tail that the read did not fill. */
297 g_string_set_size(nfs->msgbuf,
298 nfs->msgbuf->len - (bytes_to_read - bytes_read));
300 if (nfs->frag_len == 0) {
301 /* Handle reading in the fragment header. If we've read the complete
302 * header, store the fragment size. */
303 nfs->frag_hdr_bytes += bytes_read;
304 if (nfs->frag_hdr_bytes == 4) {
/* The header bytes sit at the tail of msgbuf: copy them out, convert from
 * network byte order, and remove them from the buffer so that only message
 * payload remains. */
305 memcpy((char *)&nfs->frag_len,
306 &nfs->msgbuf->str[nfs->msgbuf->len - 4], 4);
307 nfs->frag_len = ntohl(nfs->frag_len);
308 g_string_set_size(nfs->msgbuf, nfs->msgbuf->len - 4);
309 nfs->frag_hdr_bytes = 0;
312 /* We were reading in the fragment body. */
313 nfs->frag_len -= bytes_read;
/* A value of exactly 0x80000000 means the last-fragment bit is set and no
 * body bytes remain, so the reassembled message is complete. */
315 if (nfs->frag_len == 0x80000000) {
316 process_reply(nfs, nfs->msgbuf);
318 g_string_set_size(nfs->msgbuf, 0);
325 static void send_read_request(NFSConnection *nfs, uint64_t inum,
326 uint64_t offset, uint64_t len);
/* Issue one READ request on nfs.  Two paths are visible:
 *
 *   - A warmup path that uses a static counter to walk the benchmark files in
 *     order, reading at most 1 MiB per request, with the offset advancing by
 *     1 MiB each time the counter has passed over every file once.
 *   - A random path that picks a random benchmark file and a random
 *     read_size-aligned block within it.
 *
 * The condition that selects between the paths, the updates to the warmup
 * counter, and the lines between the block count and the offset draw are not
 * visible in this listing.
 *
 * NOTE(review): read_size is set from the command line in main; a value of
 * zero would divide by zero where blocks is computed. */
327 static void submit_random_read(NFSConnection *nfs)
329 static int warmup_counter = 0;
330 struct bench_file *bf;
/* One warmup pass per MiB of read_size when read_size exceeds 1 MiB. */
334 if (read_size > (1 << 20)) {
335 scale = read_size / (1 << 20);
337 int filecount = bench_files->len;
338 printf("Warming up file %d\n", warmup_counter);
339 if (warmup_counter >= filecount * scale)
341 bf = &g_array_index(bench_files, struct bench_file,
342 warmup_counter % filecount);
343 send_read_request(nfs, bf->inum, (warmup_counter / filecount) << 20,
344 read_size > (1 << 20) ? (1 << 20) : read_size);
349 bf = &g_array_index(bench_files, struct bench_file,
350 g_random_int_range(0, bench_files->len));
351 int blocks = bf->size / read_size;
/* NOTE(review): g_random_int_range expects its end argument to be greater
 * than its begin argument; if blocks can still be zero here (file smaller
 * than read_size) the call is invalid -- confirm the lines not shown guard
 * against that. */
355 int offset = g_random_int_range(0, blocks);
/* NOTE(review): offset and read_size are both int, so the product is computed
 * in int before being widened to the uint64_t parameter and can overflow for
 * byte offsets of 2 GiB or more. */
356 send_read_request(nfs, bf->inum, offset * read_size, read_size);
/* Completion callback passed to send_rpc by send_read_request.  In the
 * visible lines it does not look at user_data or the reply bytes; it just
 * submits the next read on the same connection, so each finished request is
 * replaced by a new one. */
359 static void finish_read_request(NFSConnection *nfs, gpointer user_data,
360 const char *reply, size_t len)
362 submit_random_read(nfs);
/* Encode and send an NFS READ call.
 *
 *   nfs     connection to send on
 *   inum    inode number; its 8-byte big-endian form is used as the file handle
 *   offset  byte offset stored in the READ arguments
 *   len     requested length
 *
 * finish_read_request is registered as the completion callback.
 *
 * NOTE(review): the line that stores len into the READ arguments is not among
 * the lines visible in this listing -- confirm it is present. */
365 static void send_read_request(NFSConnection *nfs, uint64_t inum,
366 uint64_t offset, uint64_t len)
/* fhdata is a local; the handle only points at it, which is sufficient
 * because the arguments are encoded and written before this function
 * returns. */
369 uint64_t fhdata = GUINT64_TO_BE(inum);
370 fh.data.data_val = (char *)&fhdata;
371 fh.data.data_len = 8;
375 struct read3args read;
376 memcpy(&read.file, &fh, sizeof(struct nfs_fh3));
377 read.offset = offset;
380 GString *str = g_string_new("");
/* xdr_string_create is defined outside the visible listing; here it ties the
 * XDR stream to str, which is then passed to send_rpc as the encoded
 * arguments.
 * NOTE(review): the result of xdr_read3args is not checked. */
382 xdr_string_create(&xdr, str, XDR_ENCODE);
383 xdr_read3args(&xdr, &read);
/* NOTE(review): inum is narrowed to int for the user_data pointer, so the
 * upper bits of a large inode number are lost. */
384 send_rpc(nfs, NFSPROC3_READ, str, finish_read_request,
385 GINT_TO_POINTER((int)inum));
386 g_string_free(str, TRUE);
/* GLib idle callback; nfs_connect registers it with the NFSConnection as its
 * data pointer.  The first loop submits one read per configured thread, which
 * sets the number of requests in flight (each completion then submits a
 * replacement through finish_read_request).
 *
 * A second loop is also visible that sends one LOOKUP per thread for names of
 * the form file-N in the directory whose 8-byte handle is the big-endian
 * value 1, with store_fh as the callback.  Lines between and around the two
 * loops, including the return statement, are not visible in this listing, and
 * store_fh is not defined in the visible source -- NOTE(review): confirm
 * whether the second loop is compiled in or disabled. */
389 static gboolean idle_handler(gpointer data)
391 NFSConnection *nfs = (NFSConnection *)data;
394 for (i = 0; i < threads; i++) {
395 submit_random_read(nfs);
399 g_print("Sending requests...\n");
400 for (i = 0; i < threads; i++) {
402 struct diropargs3 lookup;
403 uint64_t rootfh = GUINT64_TO_BE(1);
/* NOTE(review): the declaration of buf is not visible; sprintf does not bound
 * the write, so confirm buf is large enough or switch to snprintf. */
405 sprintf(buf, "file-%d", i + 1);
406 lookup.dir.data.data_len = 8;
407 lookup.dir.data.data_val = (char *)&rootfh;
410 GString *str = g_string_new("");
412 xdr_string_create(&xdr, str, XDR_ENCODE);
413 xdr_diropargs3(&xdr, &lookup);
414 send_rpc(nfs, NFSPROC3_LOOKUP, str, store_fh, NULL);
415 g_string_free(str, TRUE);
/* Open a TCP connection to hostname and set it up for RPC traffic.
 *
 * Creates an IPv4 TCP socket, resolves hostname, connects, and allocates an
 * NFSConnection with an empty message buffer and an XID table keyed by direct
 * pointer value.  The socket is wrapped in a GIOChannel with a NULL encoding
 * (raw bytes) in non-blocking mode, read_handler is registered for G_IO_IN,
 * and idle_handler is queued to start issuing requests.
 *
 * The actions taken after each failure message and the return statement are
 * not visible in this listing.  The %m conversions in the messages are a
 * glibc printf extension that expands to the errno description. */
422 NFSConnection *nfs_connect(const char *hostname)
425 struct addrinfo hints;
426 struct addrinfo *ai = NULL;
428 int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
430 fprintf(stderr, "Unable to create NFS TCP socket: %m\n");
434 memset(&hints, 0, sizeof(hints));
435 hints.ai_family = AF_INET;
436 hints.ai_socktype = SOCK_STREAM;
437 hints.ai_protocol = IPPROTO_TCP;
/* NOTE(review): hints is filled in above but NULL is passed as the hints
 * argument, so the IPv4/TCP restriction is never applied and the first result
 * may not match the AF_INET socket -- confirm whether &hints was intended.
 * The port string also duplicates the NFS_SERVICE_PORT constant. */
438 result = getaddrinfo(hostname, "2051", NULL, &ai);
/* NOTE(review): getaddrinfo signals failure with any nonzero value; testing
 * only for negative results depends on the platform using negative error
 * codes. */
439 if (result < 0 || ai == NULL) {
440 fprintf(stderr, "Hostname lookup failure for %s: %s\n",
441 hostname, gai_strerror(result));
/* Only the first address in the result list is tried. */
445 if (connect(fd, ai->ai_addr, ai->ai_addrlen) < 0) {
446 fprintf(stderr, "Unable to connect to %s: %m\n", hostname);
451 NFSConnection *conn = g_new0(NFSConnection, 1);
452 conn->msgbuf = g_string_new("");
453 conn->xid_table = g_hash_table_new(g_direct_hash, g_direct_equal);
455 conn->channel = g_io_channel_unix_new(fd);
456 g_io_channel_set_encoding(conn->channel, NULL, NULL);
457 g_io_channel_set_flags(conn->channel, G_IO_FLAG_NONBLOCK, NULL);
458 g_io_add_watch(conn->channel, G_IO_IN, read_handler, conn);
460 g_idle_add(idle_handler, conn);
/* Program entry point.  As far as the visible lines show, the arguments are:
 *
 *   argv[1]  text file of integer pairs, one pair per benchmark file
 *   argv[2]  number of concurrent requests (threads)
 *   argv[3]  read size in bytes
 *   argv[4]  either the literal WARMUP or the path of a binary log file
 *
 * After parsing, it connects to a hard-coded host name and runs the GLib main
 * loop until something quits it (process_reply does so in the visible code).
 *
 * NOTE(review): no check of argc is visible before argv[1] through argv[4]
 * are used, and several lines (the bodies of the error and WARMUP branches,
 * the filling of bf, and the return) are not visible in this listing. */
465 int main(int argc, char *argv[])
468 g_set_prgname("synclient");
470 bench_files = g_array_new(FALSE, TRUE, sizeof(struct bench_file));
472 FILE *inodes = fopen(argv[1], "r");
473 if (inodes == NULL) {
/* NOTE(review): the fscanf result is not checked; the loop relies on the -1
 * sentinels staying in place when a conversion fails.  If the line not shown
 * after the sentinel test does not leave the loop, unparsable input that is
 * not at end of file would never be consumed -- confirm. */
477 while (!feof(inodes)) {
478 int i1 = -1, i2 = -1;
479 fscanf(inodes, "%d %d", &i1, &i2);
480 if (i1 < 0 || i2 < 0)
483 struct bench_file bf;
486 g_array_append_val(bench_files, bf);
/* NOTE(review): atoi reports no errors; a non-numeric or zero read size leads
 * to a division by zero in submit_random_read. */
492 threads = atoi(argv[2]);
494 read_size = atoi(argv[3]);
496 if (strcmp(argv[4], "WARMUP") == 0) {
/* No check of this fopen result is visible; process_reply only writes when
 * logfile is not NULL, so a failed open would silently disable logging. */
499 logfile = fopen(argv[4], "wb");
503 main_loop = g_main_loop_new(NULL, FALSE);
/* NOTE(review): the server host name is hard-coded and the returned
 * connection pointer is discarded. */
504 nfs_connect("vrable2.sysnet.ucsd.edu");
506 g_main_loop_run(main_loop);