Try to improve parallelism of synreadbench warmup phase
[bluesky.git] / nfs3 / synreadbench.c
index c3593d2..908363f 100644 (file)
 /* Maximum size of a single RPC message that we will accept (8 MB). */
 #define MAX_RPC_MSGSIZE (8 << 20)
 
+FILE *logfile = NULL;
+
+int warmup_mode = 0;
 int threads;
 int completed = 0;
+int read_size = 32768;
 
 struct bench_file {
     uint64_t inum;
     uint64_t size;
 };
 
+struct data_point {
+    uint32_t timestamp;     // Unix timestamp of completion
+    uint32_t latency;       // Latency, in microseconds
+} __attribute__((packed));
+
 GArray *bench_files;
 
 struct rpc_reply {
@@ -186,18 +195,30 @@ static void process_reply(NFSConnection *nfs, GString *msg)
         return;
     }
 
+    struct data_point d;
     info->end = now_hires();
-    printf("XID %d: Time = %"PRIi64"\n", xid, info->end - info->start);
+    d.timestamp = info->end / 1000000000;
+    d.latency = (info->end - info->start + 500) / 1000; /* Round off */
+    //printf("XID %d: Time = %"PRIi64"\n", xid, info->end - info->start);
     if (info->callback != NULL)
         info->callback(nfs, info->user_data,
                        msg->str + sizeof(*reply), msg->len - sizeof(*reply));
 
     g_hash_table_remove(nfs->xid_table, key);
     g_free(info);
+    if (logfile != NULL) {
+        fwrite(&d, sizeof(d), 1, logfile);
+        fflush(logfile);
+    }
 
     completed++;
-    if (completed == 128 * threads) {
-        g_main_loop_quit(main_loop);
+    if (warmup_mode) {
+        printf("Done warming up %d\n", completed);
+        int scale = 1;
+        if (read_size > (1 << 20))
+            scale = read_size / (1 << 20);
+        if (completed == bench_files->len * scale)
+            g_main_loop_quit(main_loop);
     }
 }
 
@@ -281,16 +302,42 @@ static gboolean read_handler(GIOChannel *channel,
 
 static void send_read_request(NFSConnection *nfs, uint64_t inum,
                                uint64_t offset, uint64_t len);
-
-static void finish_read_request(NFSConnection *nfs, gpointer user_data,
-                                const char *reply, size_t len)
+static void submit_random_read(NFSConnection *nfs)
 {
-    printf("Done reading inode %d\n", GPOINTER_TO_INT(user_data));
-
+    static int warmup_counter = 0;
     struct bench_file *bf;
+
+    if (warmup_mode) {
+        int scale = 1;
+        if (read_size > (1 << 20)) {
+            scale = read_size / (1 << 20);
+        }
+        int filecount = bench_files->len;
+        printf("Warming up file %d\n", warmup_counter);
+        if (warmup_counter >= filecount * scale)
+            return;
+        bf = &g_array_index(bench_files, struct bench_file,
+                            warmup_counter % filecount);
+        send_read_request(nfs, bf->inum, (warmup_counter / filecount) << 20,
+                          read_size > (1 << 20) ? (1 << 20) : read_size);
+        warmup_counter++;
+        return;
+    }
+
     bf = &g_array_index(bench_files, struct bench_file,
                         g_random_int_range(0, bench_files->len));
-    send_read_request(nfs, bf->inum, 0, 1048576);
+    int blocks = bf->size / read_size;
+    if (blocks == 0)
+        blocks = 1;
+
+    int offset = g_random_int_range(0, blocks);
+    send_read_request(nfs, bf->inum, offset * read_size, read_size);
+}
+
+static void finish_read_request(NFSConnection *nfs, gpointer user_data,
+                                const char *reply, size_t len)
+{
+    submit_random_read(nfs);
 }
 
 static void send_read_request(NFSConnection *nfs, uint64_t inum,
@@ -323,10 +370,7 @@ static gboolean idle_handler(gpointer data)
     int i;
 
     for (i = 0; i < threads; i++) {
-        struct bench_file *bf;
-        bf = &g_array_index(bench_files, struct bench_file,
-                            g_random_int_range(0, bench_files->len));
-        send_read_request(nfs, bf->inum, 0, 1048576);
+        submit_random_read(nfs);
     }
 
 #if 0
@@ -400,7 +444,6 @@ int main(int argc, char *argv[])
 {
     g_thread_init(NULL);
     g_set_prgname("synclient");
-    g_print("Launching synthetic NFS RPC client...\n");
 
     bench_files = g_array_new(FALSE, TRUE, sizeof(struct bench_file));
 
@@ -420,13 +463,23 @@ int main(int argc, char *argv[])
         bf.size = i2;
         g_array_append_val(bench_files, bf);
     }
+    fclose(inodes);
 
     threads = 8;
     if (argc > 2)
         threads = atoi(argv[2]);
+    if (argc > 3)
+        read_size = atoi(argv[3]);
+    if (argc > 4) {
+        if (strcmp(argv[4], "WARMUP") == 0) {
+            warmup_mode = 1;
+        } else {
+            logfile = fopen(argv[4], "wb");
+        }
+    }
 
     main_loop = g_main_loop_new(NULL, FALSE);
-    nfs_connect("niniel.sysnet.ucsd.edu");
+    nfs_connect("vrable2.sysnet.ucsd.edu");
 
     g_main_loop_run(main_loop);