1 /* Blue Sky: File Systems in the Cloud
3 * Copyright (C) 2009 The Regents of the University of California
4 * Written by Michael Vrable <mvrable@cs.ucsd.edu>
9 /* Synthetic client for benchmarking: a tool for directly generating NFS
10 * requests and reading back the responses, so that we can exercise the server
11 * differently than the Linux kernel NFS client does.
13 * Much of this is copied from rpc.c and other BlueSky server code but is
14 * designed to run independently of BlueSky. */
16 #include "mount_prot.h"
17 #include "nfs3_prot.h"
21 #include <rpc/pmap_clnt.h>
26 #include <sys/socket.h>
27 #include <sys/types.h>
28 #include <netinet/in.h>
29 #include <netinet/ip.h>
32 /* TCP port number to use for NFS protocol. (Would be 2049 in standard NFS.) */
/* NOTE(review): nfs_connect() passes the literal service string "2051" to
 * getaddrinfo() instead of using this macro; the two must be kept in sync. */
33 #define NFS_SERVICE_PORT 2051
35 /* Maximum size of a single RPC message that we will accept (8 MB). */
36 #define MAX_RPC_MSGSIZE (8 << 20)
/* Length passed to send_read_request() for each random read (warm-up reads
 * are capped at 1 << 20).  32768 is only the default: main() overwrites it
 * with atoi(argv[3]). */
43 int read_size = 32768;
/* NOTE(review): the opening line of this packed record is not visible in this
 * listing.  process_reply() fills fields with these two names on a local
 * variable and fwrite()s it to logfile, so this is presumably the layout of
 * one log record -- confirm against the full declaration. */
51 uint32_t timestamp; // Unix timestamp of completion
52 uint32_t latency; // Latency, in microseconds
53 } __attribute__((packed));
/* NOTE(review): the members of this struct are not visible here, and nothing
 * in the visible code refers to it. */
66 struct rpc_fail_reply {
/* Header placed at the start of every outgoing call.  send_rpc() fills in at
 * least xid, rpcvers, prog, vers and proc, each converted to big-endian; the
 * member list itself is not visible in this listing. */
75 struct rpc_call_header {
/* Members of the per-connection state (presumably the NFSConnection type; the
 * declaration's opening line and most member declarations are not visible in
 * this listing).  nfs_connect() allocates the object, and read_handler() uses
 * msgbuf, frag_len and frag_hdr_bytes to reassemble replies. */
92 /* The reassembled message, thus far. */
95 /* Remaining number of bytes in this message fragment; 0 if we next expect
96 * another fragment header. */
99 /* If frag_len is zero: the number of bytes of the fragment header that
100 * have been read so far. */
103 /* Mapping of XID values to outstanding RPC calls. */
/* Keys are host-order XIDs packed with GINT_TO_POINTER; values are the
 * CallInfo records that send_rpc() inserts and process_reply() looks up. */
104 GHashTable *xid_table;
/* Completion callback for an RPC call.  process_reply() invokes it with the
 * connection, the user_data that was handed to send_rpc(), and a pointer and
 * length covering the reply bytes that follow the struct rpc_reply header. */
108 typedef void (*NFSFunc)(NFSConnection *nfs,
109 gpointer user_data, const char *reply, size_t len);
/* Event loop created and run by main(); process_reply() quits it once its
 * completion count matches the expected number of requests. */
117 static GMainLoop *main_loop;
/* Body of the high-resolution clock helper: reads CLOCK_REALTIME and returns
 * it as one 64-bit count of nanoseconds.
 * NOTE(review): the function signature is not visible in this listing;
 * send_rpc() and process_reply() call now_hires() for their timestamps, which
 * is presumably this function. */
121 struct timespec time;
/* A failing clock read is reported with perror(); what happens after the
 * report is not visible here. */
123 if (clock_gettime(CLOCK_REALTIME, &time) != 0) {
124 perror("clock_gettime");
/* tv_sec is widened to int64_t before the multiply so the product is computed
 * in 64 bits. */
128 return (int64_t)(time.tv_sec) * 1000000000 + time.tv_nsec;
/* Write len bytes from buf to the GIOChannel of connection conn.
 *
 * Only G_IO_STATUS_NORMAL is treated as success; error, EOF and AGAIN all
 * take the same branch and are reported on stderr as a write error.
 *
 * NOTE(review): nfs_connect() sets G_IO_FLAG_NONBLOCK on the channel, so
 * G_IO_STATUS_AGAIN may be returned on a healthy connection whose send buffer
 * is full; confirm that treating it as a failure is intended.
 * NOTE(review): the remaining arguments of the write call and the statements
 * following each case label are not visible in this listing. */
131 static void do_write(NFSConnection *conn, const char *buf, size_t len)
135 switch (g_io_channel_write_chars(conn->channel, buf, len,
137 case G_IO_STATUS_ERROR:
138 case G_IO_STATUS_EOF:
139 case G_IO_STATUS_AGAIN:
140 fprintf(stderr, "Error writing to socket!\n");
142 case G_IO_STATUS_NORMAL:
/* Build and transmit one NFSv3 RPC call and register it so that the reply can
 * be matched to it later.
 *
 *   nfs                 connection to write to; its xid_table gains an entry
 *   proc                procedure number placed in the call header
 *   msg                 already-encoded procedure arguments; not freed here
 *                       (the visible callers free it after this returns)
 *   completion_handler  stored in the CallInfo; process_reply() invokes it
 *                       when it is non-NULL
 *   user_data           stored with the handler and passed back to it
 *
 * NOTE(review): some statements of this function are not visible in this
 * listing; the comments describe only the lines shown. */
150 static void send_rpc(NFSConnection *nfs, int proc, GString *msg,
151 NFSFunc completion_handler, gpointer user_data)
/* The XID counter is a function-level static, so it is shared by every
 * connection in the process and advances by one per call. */
153 static int xid_count = 0;
154 struct rpc_call_header header;
155 struct rpc_auth auth;
/* All header fields go on the wire in big-endian order. */
157 header.xid = GUINT32_TO_BE(xid_count++);
159 header.rpcvers = GUINT32_TO_BE(2);
160 header.prog = GUINT32_TO_BE(NFS_PROGRAM);
161 header.vers = GUINT32_TO_BE(NFS_V3);
162 header.proc = GUINT32_TO_BE(proc);
/* AUTH_NULL flavor; the same rpc_auth value is written twice below and is
 * counted twice in the fragment length. */
164 auth.flavor = GUINT32_TO_BE(AUTH_NULL);
167 CallInfo *info = g_new0(CallInfo, 1);
/* Fragment header word: high bit set (marks the last fragment of a message)
 * OR'd with the number of bytes that follow it. */
169 uint32_t fragment = htonl(0x80000000
170 | (sizeof(header) + 2*sizeof(auth) + msg->len));
171 do_write(nfs, (const char *)&fragment, sizeof(fragment));
172 do_write(nfs, (const char *)&header, sizeof(header));
173 do_write(nfs, (const char *)&auth, sizeof(auth));
174 do_write(nfs, (const char *)&auth, sizeof(auth));
175 do_write(nfs, msg->str, msg->len);
176 g_io_channel_flush(nfs->channel, NULL);
/* The start timestamp is taken after the flush, so time spent in the writes
 * above is not included in the latency computed by process_reply(). */
178 info->start = now_hires();
179 info->callback = completion_handler;
180 info->user_data = user_data;
/* The key is the XID converted back to host order, which is the same form
 * process_reply() uses for its lookup. */
181 g_hash_table_insert(nfs->xid_table,
182 GINT_TO_POINTER(GUINT32_FROM_BE(header.xid)), info);
/* Handle one fully reassembled RPC reply held in msg.
 *
 * The reply XID is matched against nfs->xid_table.  For a matched call the
 * completion time is recorded, the stored callback (if any) is run on the
 * bytes after the struct rpc_reply header, the table entry is removed, and a
 * timestamp/latency record is written to logfile when logfile is non-NULL.
 * The trailing lines quit main_loop once the completion count reaches the
 * expected total.
 *
 * NOTE(review): several statements (including the branch conditions around
 * the "Could not match" and "Done warming up" messages and the declaration of
 * d) are not visible in this listing.
 * NOTE(review): no check that msg->len >= sizeof(struct rpc_reply) is visible
 * before msg->str is reinterpreted; confirm one exists. */
185 static void process_reply(NFSConnection *nfs, GString *msg)
187 struct rpc_reply *reply = (struct rpc_reply *)msg->str;
189 uint32_t xid = GUINT32_FROM_BE(reply->xid);
/* Same host-order key form that send_rpc() used when inserting. */
191 gpointer key = GINT_TO_POINTER(GUINT32_FROM_BE(reply->xid));
192 CallInfo *info = g_hash_table_lookup(nfs->xid_table, key);
/* NOTE(review): xid is a uint32_t but is printed with %d; %u (or PRIu32)
 * matches the type. */
194 g_print("Could not match reply XID %d with a call!\n", xid);
/* now_hires() values are nanoseconds: dividing by 1e9 gives whole seconds,
 * and (delta + 500) / 1000 gives microseconds rounded to nearest. */
199 info->end = now_hires();
200 d.timestamp = info->end / 1000000000;
201 d.latency = (info->end - info->start + 500) / 1000; /* Round off */
202 //printf("XID %d: Time = %"PRIi64"\n", xid, info->end - info->start);
/* The callback sees only the payload that follows the reply header. */
203 if (info->callback != NULL)
204 info->callback(nfs, info->user_data,
205 msg->str + sizeof(*reply), msg->len - sizeof(*reply));
/* NOTE(review): xid_table is created with g_hash_table_new() (no destroy
 * functions), so removing the entry does not free info; confirm that info is
 * released in a line not visible here. */
207 g_hash_table_remove(nfs->xid_table, key);
209 if (logfile != NULL) {
210 fwrite(&d, sizeof(d), 1, logfile);
216 printf("Done warming up %d\n", completed);
/* For read sizes above 1 << 20 the expected count is multiplied by the number
 * of 1 << 20 chunks per read, mirroring the warm-up loop in
 * submit_random_read(). */
218 if (read_size > (1 << 20))
219 scale = read_size / (1 << 20);
220 if (completed == bench_files->len * scale)
221 g_main_loop_quit(main_loop);
/* GIOChannel watch callback (registered in nfs_connect() for G_IO_IN) that
 * reads reply bytes and reassembles them into complete RPC messages.
 *
 * data is the NFSConnection for the channel.  Each invocation performs one
 * read: either the unread part of the 4-byte fragment header, or the unread
 * part of the current fragment body.  Body bytes accumulate in nfs->msgbuf;
 * when the last fragment of a message has been read in full, the message is
 * passed to process_reply() and msgbuf is emptied.
 *
 * NOTE(review): some lines of this function, including its return statements
 * and the end of the first block comment, are not visible in this listing. */
225 static gboolean read_handler(GIOChannel *channel,
226 GIOCondition condition,
229 NFSConnection *nfs = (NFSConnection *)data;
231 gsize bytes_to_read = 0; /* Number of bytes to attempt to read. */
233 /* If we have not yet read in the fragment header, do that first. This is
234 * 4 bytes that indicates the number of bytes in the message to follow
235 * (with the high bit set if this is the last fragment making up the
// frag_len == 0 means a fragment header is expected next, so ask for the rest
// of its 4 bytes; otherwise ask for the rest of the body, whose remaining
// length is the low 31 bits of frag_len.
237 if (nfs->frag_len == 0) {
238 bytes_to_read = 4 - nfs->frag_hdr_bytes;
240 bytes_to_read = nfs->frag_len & 0x7fffffff;
// Refuse a fragment that is itself too large or that would push the
// reassembled message past MAX_RPC_MSGSIZE; the channel is shut down.
243 if (bytes_to_read > MAX_RPC_MSGSIZE
244 || nfs->msgbuf->len + bytes_to_read > MAX_RPC_MSGSIZE)
// NOTE(review): the argument printed here is not visible; if it is a gsize,
// the matching conversion is %zu rather than %zd.
246 fprintf(stderr, "Excessive fragment size for RPC: %zd bytes\n",
248 g_io_channel_shutdown(nfs->channel, TRUE, NULL);
// Grow msgbuf by the requested amount and read straight into the new tail;
// the part the read does not fill is trimmed off again further down.
252 gsize bytes_read = 0;
253 g_string_set_size(nfs->msgbuf, nfs->msgbuf->len + bytes_to_read);
254 char *buf = &nfs->msgbuf->str[nfs->msgbuf->len - bytes_to_read];
255 switch (g_io_channel_read_chars(nfs->channel, buf,
256 bytes_to_read, &bytes_read, NULL)) {
257 case G_IO_STATUS_NORMAL:
259 case G_IO_STATUS_AGAIN:
// EOF is tolerated only when the whole request was satisfied.
261 case G_IO_STATUS_EOF:
262 if (bytes_read == bytes_to_read)
264 /* else fall through */
265 case G_IO_STATUS_ERROR:
266 fprintf(stderr, "Unexpected error or end of file on RPC stream %d!\n",
267 g_io_channel_unix_get_fd(nfs->channel));
268 g_io_channel_shutdown(nfs->channel, TRUE, NULL);
269 /* TODO: Clean up connection object. */
/* NOTE(review): bytes_read is a gsize (unsigned), so the ">= 0" half of this
 * assertion is always true. */
273 g_assert(bytes_read >= 0 && bytes_read <= bytes_to_read);
/* Give back the part of msgbuf that the read did not fill. */
275 g_string_set_size(nfs->msgbuf,
276 nfs->msgbuf->len - (bytes_to_read - bytes_read));
278 if (nfs->frag_len == 0) {
279 /* Handle reading in the fragment header. If we've read the complete
280 * header, store the fragment size. */
281 nfs->frag_hdr_bytes += bytes_read;
282 if (nfs->frag_hdr_bytes == 4) {
/* The header bytes were appended to msgbuf like any other data: copy them
 * out, convert from network order, then strip them from the message. */
283 memcpy((char *)&nfs->frag_len,
284 &nfs->msgbuf->str[nfs->msgbuf->len - 4], 4);
285 nfs->frag_len = ntohl(nfs->frag_len);
286 g_string_set_size(nfs->msgbuf, nfs->msgbuf->len - 4);
287 nfs->frag_hdr_bytes = 0;
290 /* We were reading in the fragment body. */
291 nfs->frag_len -= bytes_read;
/* Only the last-fragment flag is left and no body bytes remain: the message
 * is complete, so dispatch it and start a fresh buffer. */
293 if (nfs->frag_len == 0x80000000) {
294 process_reply(nfs, nfs->msgbuf);
296 g_string_set_size(nfs->msgbuf, 0);
/* Forward declaration: submit_random_read() below needs send_read_request(),
 * which is defined after it. */
303 static void send_read_request(NFSConnection *nfs, uint64_t inum,
304 uint64_t offset, uint64_t len);
/* Issue the next benchmark READ on connection nfs.
 *
 * Two request patterns are visible.  The first walks the files in
 * bench_files sequentially, driven by a counter that persists across calls,
 * and prints "Warming up file N".  The second picks a random file and a
 * random read_size-aligned block within it.
 *
 * NOTE(review): the condition that selects between the two patterns, the
 * initial value of scale, the counter increment and any handling of
 * blocks == 0 are not visible in this listing. */
305 static void submit_random_read(NFSConnection *nfs)
307 static int warmup_counter = 0;
308 struct bench_file *bf;
/* Reads larger than 1 << 20 are issued as several 1 << 20 chunks during
 * warm-up; scale is the number of chunks per file. */
312 if (read_size > (1 << 20)) {
313 scale = read_size / (1 << 20);
315 int filecount = bench_files->len;
316 printf("Warming up file %d\n", warmup_counter);
317 if (warmup_counter >= filecount * scale)
/* The counter cycles through the files (modulo) and advances to the next
 * 1 << 20 chunk after each full pass (quotient shifted left by 20). */
319 bf = &g_array_index(bench_files, struct bench_file,
320 warmup_counter % filecount);
321 send_read_request(nfs, bf->inum, (warmup_counter / filecount) << 20,
322 read_size > (1 << 20) ? (1 << 20) : read_size);
327 bf = &g_array_index(bench_files, struct bench_file,
328 g_random_int_range(0, bench_files->len));
/* NOTE(review): this divides by read_size, which main() sets from atoi() with
 * no visible validation; a value of 0 would divide by zero. */
329 int blocks = bf->size / read_size;
/* NOTE(review): g_random_int_range() needs a non-empty range, so blocks must
 * be positive here; confirm the lines not shown guarantee that. */
333 int offset = g_random_int_range(0, blocks);
/* NOTE(review): offset and read_size are both int, so the product is formed
 * in int before widening to uint64_t and can overflow for byte offsets of
 * 2 GiB or more. */
334 send_read_request(nfs, bf->inum, offset * read_size, read_size);
/* Completion callback that send_read_request() registers for each READ.
 * In the visible code the reply contents and user_data are not examined; the
 * handler submits another read on the same connection, so each completed
 * read is replaced by a new one. */
337 static void finish_read_request(NFSConnection *nfs, gpointer user_data,
338 const char *reply, size_t len)
340 submit_random_read(nfs);
/* Encode and send an NFSv3 READ for the file identified by inum.
 *
 *   nfs     connection to send on
 *   inum    inode number; its 8-byte big-endian form is used as the handle
 *   offset  copied into the read3args offset field
 *   len     requested length (the assignment that consumes it is not visible
 *           in this listing; presumably it sets the read3args count field)
 *
 * The arguments are XDR-encoded into a temporary GString, handed to
 * send_rpc() with finish_read_request() as the completion handler, and the
 * string is freed before returning. */
343 static void send_read_request(NFSConnection *nfs, uint64_t inum,
344 uint64_t offset, uint64_t len)
/* The handle points at a local variable; that is sufficient here because
 * the arguments are encoded into str before this function returns. */
347 uint64_t fhdata = GUINT64_TO_BE(inum);
348 fh.data.data_val = (char *)&fhdata;
349 fh.data.data_len = 8;
353 struct read3args read;
/* Shallow copy: read.file shares the data_val pointer set above. */
354 memcpy(&read.file, &fh, sizeof(struct nfs_fh3));
355 read.offset = offset;
358 GString *str = g_string_new("");
360 xdr_string_create(&xdr, str, XDR_ENCODE);
361 xdr_read3args(&xdr, &read);
/* NOTE(review): inum is narrowed to int before being stored as user_data, so
 * inode numbers that do not fit in an int are not preserved there. */
362 send_rpc(nfs, NFSPROC3_READ, str, finish_read_request,
363 GINT_TO_POINTER((int)inum));
364 g_string_free(str, TRUE);
/* Idle callback scheduled by nfs_connect(); data is the NFSConnection.
 *
 * The first loop starts `threads` reads with submit_random_read(); since each
 * completion submits another read, this sets the number of requests kept in
 * flight.  The second loop sends one LOOKUP per index for the names "file-1",
 * "file-2", ... in the directory whose handle is the big-endian value 1, with
 * store_fh as the completion handler.
 *
 * NOTE(review): the lines between the two loops and the return statements are
 * not visible in this listing, so it cannot be told from here whether the
 * LOOKUP loop is reachable -- confirm. */
367 static gboolean idle_handler(gpointer data)
369 NFSConnection *nfs = (NFSConnection *)data;
372 for (i = 0; i < threads; i++) {
373 submit_random_read(nfs);
377 g_print("Sending requests...\n");
378 for (i = 0; i < threads; i++) {
380 struct diropargs3 lookup;
381 uint64_t rootfh = GUINT64_TO_BE(1);
/* NOTE(review): sprintf() does not bound the write and the declaration of buf
 * is not visible; snprintf() with the buffer size would be safer. */
383 sprintf(buf, "file-%d", i + 1);
384 lookup.dir.data.data_len = 8;
385 lookup.dir.data.data_val = (char *)&rootfh;
388 GString *str = g_string_new("");
390 xdr_string_create(&xdr, str, XDR_ENCODE);
391 xdr_diropargs3(&xdr, &lookup);
/* No user_data is passed with the LOOKUP. */
392 send_rpc(nfs, NFSPROC3_LOOKUP, str, store_fh, NULL);
393 g_string_free(str, TRUE);
/* Open a TCP connection to the NFS service on hostname and attach it to the
 * GLib main loop.
 *
 * Steps visible here: create an IPv4 TCP socket, resolve hostname with
 * service "2051", connect, allocate the NFSConnection with an empty message
 * buffer and an XID table, wrap the socket in a non-blocking GIOChannel with
 * no text encoding, watch it for input with read_handler(), and schedule
 * idle_handler().
 *
 * NOTE(review): the failure-path statements after each error message and the
 * function's return statement are not visible in this listing.  No
 * freeaddrinfo() or close() of fd is visible on any path -- confirm. */
400 NFSConnection *nfs_connect(const char *hostname)
403 struct addrinfo hints;
404 struct addrinfo *ai = NULL;
406 int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
408 fprintf(stderr, "Unable to create NFS TCP socket: %m\n");
412 memset(&hints, 0, sizeof(hints));
413 hints.ai_family = AF_INET;
414 hints.ai_socktype = SOCK_STREAM;
415 hints.ai_protocol = IPPROTO_TCP;
/* NOTE(review): hints is filled in above but NULL is passed as the hints
 * argument, so the AF_INET/TCP restriction is not applied to the lookup and
 * the first result may not match the AF_INET socket.  Passing &hints looks
 * like the intent.  The service string also duplicates NFS_SERVICE_PORT. */
416 result = getaddrinfo(hostname, "2051", NULL, &ai);
/* NOTE(review): getaddrinfo() reports failure with a non-zero return, so a
 * portable check is result != 0 rather than result < 0. */
417 if (result < 0 || ai == NULL) {
418 fprintf(stderr, "Hostname lookup failure for %s: %s\n",
419 hostname, gai_strerror(result));
/* Only the first address in the result list is tried. */
423 if (connect(fd, ai->ai_addr, ai->ai_addrlen) < 0) {
424 fprintf(stderr, "Unable to connect to %s: %m\n", hostname);
429 NFSConnection *conn = g_new0(NFSConnection, 1);
430 conn->msgbuf = g_string_new("");
/* Direct hashing: keys are integer XIDs packed into pointers, and no destroy
 * functions are installed for keys or values. */
431 conn->xid_table = g_hash_table_new(g_direct_hash, g_direct_equal);
433 conn->channel = g_io_channel_unix_new(fd);
/* NULL encoding makes the channel pass bytes through untranslated. */
434 g_io_channel_set_encoding(conn->channel, NULL, NULL);
435 g_io_channel_set_flags(conn->channel, G_IO_FLAG_NONBLOCK, NULL);
436 g_io_add_watch(conn->channel, G_IO_IN, read_handler, conn);
438 g_idle_add(idle_handler, conn);
/* Program entry point.
 *
 * Arguments as used by the visible code:
 *   argv[1]  path of a text file holding pairs of integers, one pair per
 *            benchmark file
 *   argv[2]  number of requests to start (stored in threads)
 *   argv[3]  read size (stored in read_size)
 *   argv[4]  either the literal "WARMUP" or the path of a binary log file
 *
 * After parsing, the main loop is created, a connection to a fixed host name
 * is opened, and the loop is run.
 *
 * NOTE(review): no check of argc is visible before argv[1]..argv[4] are
 * used, and the statements that copy i1/i2 into bf (presumably inode number
 * and size) are not visible in this listing. */
443 int main(int argc, char *argv[])
446 g_set_prgname("synclient");
448 bench_files = g_array_new(FALSE, TRUE, sizeof(struct bench_file));
450 FILE *inodes = fopen(argv[1], "r");
451 if (inodes == NULL) {
/* NOTE(review): feof() only becomes true after a read has hit end of file,
 * and the fscanf() result is not checked; the loop relies on i1/i2 keeping
 * their -1 initial values to detect a failed parse.  A non-numeric token
 * that fscanf() cannot consume would also take that path. */
455 while (!feof(inodes)) {
456 int i1 = -1, i2 = -1;
457 fscanf(inodes, "%d %d", &i1, &i2);
458 if (i1 < 0 || i2 < 0)
461 struct bench_file bf;
464 g_array_append_val(bench_files, bf);
/* NOTE(review): atoi() gives no error indication, so malformed arguments
 * silently become 0; read_size == 0 later feeds a division in
 * submit_random_read(). */
470 threads = atoi(argv[2]);
472 read_size = atoi(argv[3]);
474 if (strcmp(argv[4], "WARMUP") == 0) {
/* NOTE(review): no check of this fopen() result is visible; process_reply()
 * does skip logging when logfile is NULL. */
477 logfile = fopen(argv[4], "wb");
481 main_loop = g_main_loop_new(NULL, FALSE);
/* The server host name is hard-coded and the returned connection is not
 * kept. */
482 nfs_connect("vrable2.sysnet.ucsd.edu");
484 g_main_loop_run(main_loop);