Implement multi-threaded request processing and locking for NFS.
authorMichael Vrable <mvrable@cs.ucsd.edu>
Tue, 26 Jan 2010 02:31:07 +0000 (18:31 -0800)
committerMichael Vrable <mvrable@cs.ucsd.edu>
Tue, 26 Jan 2010 02:31:07 +0000 (18:31 -0800)
bluesky/bluesky.h
bluesky/inode.c
nfs3/nfs3.c
nfs3/nfs3_prot.h
nfs3/rpc.c

index 201cc17..6324ce2 100644 (file)
@@ -121,6 +121,10 @@ bluesky_time_hires bluesky_now_hires();
  *     dropping from the cache.
  *   - Any pending operations should hold extra references to the inode as
  *     appropriate to keep it available until the operation completes.
+ *   - Locking dependency order is, when multiple locks are to be acquired, to
+ *     acquire locks on parents in the filesystem tree before children.
+ *     (TODO: What about rename when we acquire locks in unrelated parts of the
+ *     filesystem?)
  * */
 typedef struct {
     GMutex *lock;
index 4a158c7..9b6160a 100644 (file)
@@ -172,8 +172,8 @@ BlueSkyInode *bluesky_new_inode(uint64_t inum, BlueSkyFS *fs,
 
 /* Retrieve an inode from the filesystem.  Eventually this will be a cache and
  * so we might need to go fetch the inode from elsewhere; for now all
- * filesystem state is stored here.  inode is returned locked with a reference
- * held. */
+ * filesystem state is stored here.  inode is returned with a reference held
+ * but not locked. */
 BlueSkyInode *bluesky_get_inode(BlueSkyFS *fs, uint64_t inum)
 {
     BlueSkyInode *inode = NULL;
index 8be68bd..72ec9cb 100644 (file)
@@ -104,8 +104,7 @@ void set_attributes(BlueSkyInode *inode, sattr3 *attributes)
         break;
     }
 
-    inode->ctime = now;
-    inode->change_count++;
+    bluesky_inode_update_ctime(inode, FALSE);
 }
 
 /* Copy inode attributes into NFS response.  The BlueSkyInode should be locked
@@ -162,7 +161,9 @@ void nfsproc3_getattr_3_svc(nfs_fh3 *argp, RPCRequest *req)
     BlueSkyInode *inode = lookup_fh(req, argp);
     if (inode != NULL) {
         result.status = NFS3_OK;
+        g_mutex_lock(inode->lock);
         encode_fattr3(&result.getattr3res_u.attributes, inode);
+        g_mutex_unlock(inode->lock);
     } else {
         result.status = NFS3ERR_STALE;
     }
@@ -184,12 +185,14 @@ void nfsproc3_setattr_3_svc(setattr3args *argp, RPCRequest *req)
         return;
     }
 
+    g_mutex_lock(inode->lock);
     encode_pre_wcc(&result.wccstat3_u.wcc, inode);
     if (argp->guard.check) {
         if (inode->ctime != decode_nfstime3(&argp->guard.sattrguard3_u.ctime)) {
             result.status = NFS3ERR_NOT_SYNC;
             result.wccstat3_u.wcc.after.present = TRUE;
             encode_fattr3(&result.wccstat3_u.wcc.after.post_op_attr_u.attributes, inode);
+            g_mutex_unlock(inode->lock);
             async_rpc_send_reply(req, &result);
             return;
         }
@@ -202,6 +205,7 @@ void nfsproc3_setattr_3_svc(setattr3args *argp, RPCRequest *req)
                   inode);
     result.status = NFS3_OK;
 
+    g_mutex_unlock(inode->lock);
     async_rpc_send_reply(req, &result);
 }
 
@@ -218,6 +222,7 @@ void nfsproc3_lookup_3_svc(diropargs3 *argp, RPCRequest *req)
         return;
     }
 
+    g_mutex_lock(dir->lock);
     result.lookup3res_u.resfail.present = TRUE;
     encode_fattr3(&result.lookup3res_u.resfail.post_op_attr_u.attributes, dir);
     if (!validate_filename(argp->name)) {
@@ -225,6 +230,7 @@ void nfsproc3_lookup_3_svc(diropargs3 *argp, RPCRequest *req)
             result.status = NFS3ERR_NAMETOOLONG;
         else
             result.status = NFS3ERR_NOENT;
+        g_mutex_unlock(dir->lock);
         async_rpc_send_reply(req, &result);
         return;
     }
@@ -233,15 +239,18 @@ void nfsproc3_lookup_3_svc(diropargs3 *argp, RPCRequest *req)
     uint64_t inum = bluesky_directory_lookup(dir, argp->name);
     if (inum == 0) {
         result.status = NFS3ERR_NOENT;
+        g_mutex_unlock(dir->lock);
         async_rpc_send_reply(req, &result);
         return;
     }
     BlueSkyInode *inode = bluesky_get_inode(fs, inum);
     if (inode == NULL) {
         result.status = NFS3ERR_NOENT;
+        g_mutex_unlock(dir->lock);
         async_rpc_send_reply(req, &result);
         return;
     }
+    g_mutex_lock(inode->lock);
     schedule_inode_unref(req, inode);
 
     result.status = NFS3_OK;
@@ -255,6 +264,8 @@ void nfsproc3_lookup_3_svc(diropargs3 *argp, RPCRequest *req)
     result.lookup3res_u.resok.object.data.data_len = 8;
     result.lookup3res_u.resok.object.data.data_val = (char *)&fh_bytes;
 
+    g_mutex_unlock(inode->lock);
+    g_mutex_unlock(dir->lock);
     async_rpc_send_reply(req, &result);
 }
 
@@ -271,10 +282,12 @@ void nfsproc3_access_3_svc(access3args *argp, RPCRequest *req)
         return;
     }
 
+    g_mutex_lock(inode->lock);
     result.status = NFS3_OK;
     result.access3res_u.resok.obj_attributes.present = TRUE;
     encode_fattr3(&result.access3res_u.resok.obj_attributes.post_op_attr_u.attributes, inode);
     result.access3res_u.resok.access = argp->access;
+    g_mutex_unlock(inode->lock);
 
     async_rpc_send_reply(req, &result);
 }
@@ -286,6 +299,7 @@ void nfsproc3_readlink_3_svc(nfs_fh3 *argp, RPCRequest *req)
 
     BlueSkyInode *inode = lookup_fh(req, argp);
     if (inode != NULL) {
+        g_mutex_lock(inode->lock);
         if (inode->type == BLUESKY_SYMLINK) {
             result.status = NFS3_OK;
             result.readlink3res_u.resok.symlink_attributes.present = TRUE;
@@ -296,6 +310,7 @@ void nfsproc3_readlink_3_svc(nfs_fh3 *argp, RPCRequest *req)
             result.readlink3res_u.resfail.present = TRUE;
             encode_fattr3(&result.readlink3res_u.resfail.post_op_attr_u.attributes, inode);
         }
+        g_mutex_unlock(inode->lock);
     } else {
         result.status = NFS3ERR_STALE;
     }
@@ -317,6 +332,8 @@ void nfsproc3_read_3_svc(read3args *argp, RPCRequest *req)
         return;
     }
 
+    g_mutex_lock(inode->lock);
+
     int count = argp->count;
     if (argp->offset >= inode->size) {
         count = 0;
@@ -338,6 +355,8 @@ void nfsproc3_read_3_svc(read3args *argp, RPCRequest *req)
     result.read3res_u.resok.data.data_val = buf;
     result.read3res_u.resok.data.data_len = count;
 
+    g_mutex_unlock(inode->lock);
+
     async_rpc_send_reply(req, &result);
 }
 
@@ -356,10 +375,13 @@ void nfsproc3_write_3_svc(write3args *argp, RPCRequest *req)
         return;
     }
 
+    g_mutex_lock(inode->lock);
+
     encode_pre_wcc(&wcc, inode);
     if (inode->type != BLUESKY_REGULAR) {
         result.status = NFS3ERR_INVAL;
         result.write3res_u.resfail = wcc;
+        g_mutex_unlock(inode->lock);
         async_rpc_send_reply(req, &result);
         return;
     }
@@ -382,6 +404,8 @@ void nfsproc3_write_3_svc(write3args *argp, RPCRequest *req)
     result.write3res_u.resok.count = argp->count;
     result.write3res_u.resok.committed = FILE_SYNC;
 
+    g_mutex_unlock(inode->lock);
+
     async_rpc_send_reply(req, &result);
 }
 
@@ -400,10 +424,13 @@ void nfsproc3_create_3_svc(create3args *argp, RPCRequest *req)
         return;
     }
 
+    g_mutex_lock(dir->lock);
+
     encode_pre_wcc(&wcc, dir);
     if (dir->type != BLUESKY_DIRECTORY) {
         result.status = NFS3ERR_NOTDIR;
         result.diropres3_u.resfail = wcc;
+        g_mutex_unlock(dir->lock);
         async_rpc_send_reply(req, &result);
         return;
     }
@@ -414,6 +441,7 @@ void nfsproc3_create_3_svc(create3args *argp, RPCRequest *req)
     {
         result.status = NFS3ERR_EXIST;
         result.diropres3_u.resfail = wcc;
+        g_mutex_unlock(dir->lock);
         async_rpc_send_reply(req, &result);
         return;
     }
@@ -428,11 +456,11 @@ void nfsproc3_create_3_svc(create3args *argp, RPCRequest *req)
     file->ctime = time;
     file->atime = time;
     file->ntime = time;
+    g_mutex_lock(file->lock);
     bluesky_insert_inode(fs, file);
     bluesky_directory_insert(dir, argp->where.name, file->inum);
 
-    dir->mtime = dir->ctime = bluesky_get_current_time();
-    dir->change_count++;
+    bluesky_inode_update_ctime(dir, TRUE);
 
     wcc.after.present = TRUE;
     encode_fattr3(&wcc.after.post_op_attr_u.attributes, dir);
@@ -446,6 +474,9 @@ void nfsproc3_create_3_svc(create3args *argp, RPCRequest *req)
     result.diropres3_u.resok.obj.post_op_fh3_u.handle.data.data_len = 8;
     result.diropres3_u.resok.obj.post_op_fh3_u.handle.data.data_val = (char *)&fh_bytes;
 
+    g_mutex_unlock(file->lock);
+    g_mutex_unlock(dir->lock);
+
     async_rpc_send_reply(req, &result);
 }
 
@@ -464,10 +495,13 @@ void nfsproc3_mkdir_3_svc(mkdir3args *argp, RPCRequest *req)
         return;
     }
 
+    g_mutex_lock(dir->lock);
+
     encode_pre_wcc(&wcc, dir);
     if (dir->type != BLUESKY_DIRECTORY) {
         result.status = NFS3ERR_NOTDIR;
         result.diropres3_u.resfail = wcc;
+        g_mutex_unlock(dir->lock);
         async_rpc_send_reply(req, &result);
         return;
     }
@@ -478,6 +512,7 @@ void nfsproc3_mkdir_3_svc(mkdir3args *argp, RPCRequest *req)
     {
         result.status = NFS3ERR_EXIST;
         result.diropres3_u.resfail = wcc;
+        g_mutex_unlock(dir->lock);
         async_rpc_send_reply(req, &result);
         return;
     }
@@ -491,12 +526,12 @@ void nfsproc3_mkdir_3_svc(mkdir3args *argp, RPCRequest *req)
     file->ctime = time;
     file->atime = time;
     file->ntime = time;
+    g_mutex_lock(file->lock);
     bluesky_insert_inode(fs, file);
     bluesky_directory_insert(dir, argp->where.name, file->inum);
     set_attributes(file, &argp->attributes);
 
-    dir->mtime = dir->ctime = bluesky_get_current_time();
-    dir->change_count++;
+    bluesky_inode_update_ctime(dir, TRUE);
 
     wcc.after.present = TRUE;
     encode_fattr3(&wcc.after.post_op_attr_u.attributes, dir);
@@ -510,6 +545,8 @@ void nfsproc3_mkdir_3_svc(mkdir3args *argp, RPCRequest *req)
     result.diropres3_u.resok.obj.post_op_fh3_u.handle.data.data_len = 8;
     result.diropres3_u.resok.obj.post_op_fh3_u.handle.data.data_val = (char *)&fh_bytes;
 
+    g_mutex_unlock(file->lock);
+    g_mutex_unlock(dir->lock);
     async_rpc_send_reply(req, &result);
 }
 
@@ -527,11 +564,13 @@ void nfsproc3_symlink_3_svc(symlink3args *argp, RPCRequest *req)
         async_rpc_send_reply(req, &result);
         return;
     }
+    g_mutex_lock(dir->lock);
 
     encode_pre_wcc(&wcc, dir);
     if (dir->type != BLUESKY_DIRECTORY) {
         result.status = NFS3ERR_NOTDIR;
         result.diropres3_u.resfail = wcc;
+        g_mutex_unlock(dir->lock);
         async_rpc_send_reply(req, &result);
         return;
     }
@@ -542,6 +581,7 @@ void nfsproc3_symlink_3_svc(symlink3args *argp, RPCRequest *req)
     {
         result.status = NFS3ERR_EXIST;
         result.diropres3_u.resfail = wcc;
+        g_mutex_unlock(dir->lock);
         async_rpc_send_reply(req, &result);
         return;
     }
@@ -556,11 +596,11 @@ void nfsproc3_symlink_3_svc(symlink3args *argp, RPCRequest *req)
     file->atime = time;
     file->ntime = time;
     file->symlink_contents = g_strdup(argp->symlink.symlink_data);
+    g_mutex_lock(file->lock);
     bluesky_insert_inode(fs, file);
     bluesky_directory_insert(dir, argp->where.name, file->inum);
 
-    dir->mtime = dir->ctime = bluesky_get_current_time();
-    dir->change_count++;
+    bluesky_inode_update_ctime(dir, TRUE);
 
     wcc.after.present = TRUE;
     encode_fattr3(&wcc.after.post_op_attr_u.attributes, dir);
@@ -574,6 +614,8 @@ void nfsproc3_symlink_3_svc(symlink3args *argp, RPCRequest *req)
     result.diropres3_u.resok.obj.post_op_fh3_u.handle.data.data_len = 8;
     result.diropres3_u.resok.obj.post_op_fh3_u.handle.data.data_val = (char *)&fh_bytes;
 
+    g_mutex_unlock(file->lock);
+    g_mutex_unlock(dir->lock);
     async_rpc_send_reply(req, &result);
 }
 
@@ -599,6 +641,8 @@ void nfsproc3_remove_3_svc(diropargs3 *argp, RPCRequest *req)
         return;
     }
 
+    g_mutex_lock(dir->lock);
+
     encode_pre_wcc(&result.wccstat3_u.wcc, dir);
 
     if (!validate_filename(argp->name)
@@ -606,6 +650,7 @@ void nfsproc3_remove_3_svc(diropargs3 *argp, RPCRequest *req)
         || strcmp(argp->name, "..") == 0)
     {
         result.status = NFS3ERR_NOENT;
+        g_mutex_unlock(dir->lock);
         async_rpc_send_reply(req, &result);
         return;
     }
@@ -619,6 +664,7 @@ void nfsproc3_remove_3_svc(diropargs3 *argp, RPCRequest *req)
     encode_fattr3(&result.wccstat3_u.wcc.after.post_op_attr_u.attributes,
                   dir);
 
+    g_mutex_unlock(dir->lock);
     async_rpc_send_reply(req, &result);
 }
 
@@ -634,6 +680,8 @@ void nfsproc3_rmdir_3_svc(diropargs3 *argp, RPCRequest *req)
         return;
     }
 
+    g_mutex_lock(dir->lock);
+
     encode_pre_wcc(&result.wccstat3_u.wcc, dir);
 
     if (!validate_filename(argp->name)
@@ -641,6 +689,7 @@ void nfsproc3_rmdir_3_svc(diropargs3 *argp, RPCRequest *req)
         || strcmp(argp->name, "..") == 0)
     {
         result.status = NFS3ERR_NOENT;
+        g_mutex_unlock(dir->lock);
         async_rpc_send_reply(req, &result);
         return;
     }
@@ -649,13 +698,17 @@ void nfsproc3_rmdir_3_svc(diropargs3 *argp, RPCRequest *req)
     BlueSkyInode *inode = bluesky_get_inode(fs, inum);
     if (inode == NULL) {
         result.status = NFS3ERR_NOENT;
+        g_mutex_unlock(dir->lock);
         async_rpc_send_reply(req, &result);
         return;
     }
+    g_mutex_lock(inode->lock);
     schedule_inode_unref(req, inode);
 
     if (inode->type != BLUESKY_DIRECTORY) {
         result.status = NFS3ERR_NOTDIR;
+        g_mutex_unlock(inode->lock);
+        g_mutex_unlock(dir->lock);
         async_rpc_send_reply(req, &result);
         return;
     }
@@ -663,6 +716,8 @@ void nfsproc3_rmdir_3_svc(diropargs3 *argp, RPCRequest *req)
         printf("Directory not empty: %d entries\n",
                g_sequence_get_length(inode->dirents));
         result.status = NFS3ERR_NOTEMPTY;
+        g_mutex_unlock(inode->lock);
+        g_mutex_unlock(dir->lock);
         async_rpc_send_reply(req, &result);
         return;
     }
@@ -676,6 +731,8 @@ void nfsproc3_rmdir_3_svc(diropargs3 *argp, RPCRequest *req)
     encode_fattr3(&result.wccstat3_u.wcc.after.post_op_attr_u.attributes,
                   dir);
 
+    g_mutex_unlock(inode->lock);
+    g_mutex_unlock(dir->lock);
     async_rpc_send_reply(req, &result);
 }
 
@@ -692,14 +749,17 @@ void nfsproc3_rename_3_svc(rename3args *argp, RPCRequest *req)
         async_rpc_send_reply(req, &result);
         return;
     }
+    g_mutex_lock(dir1->lock);
     encode_pre_wcc(wcc1, dir1);
 
     BlueSkyInode *dir2 = lookup_fh(req, &argp->to.dir);
     if (dir2 == NULL) {
         result.status = NFS3ERR_STALE;
+        g_mutex_unlock(dir1->lock);
         async_rpc_send_reply(req, &result);
         return;
     }
+    g_mutex_lock(dir2->lock);
     encode_pre_wcc(wcc2, dir1);
 
     gboolean status = bluesky_rename(dir1, argp->from.name,
@@ -715,6 +775,8 @@ void nfsproc3_rename_3_svc(rename3args *argp, RPCRequest *req)
     else
         result.status = NFS3ERR_PERM;
 
+    g_mutex_unlock(dir2->lock);
+    g_mutex_unlock(dir1->lock);
     async_rpc_send_reply(req, &result);
 }
 
@@ -732,19 +794,24 @@ void nfsproc3_link_3_svc(link3args *argp, RPCRequest *req)
         async_rpc_send_reply(req, &result);
         return;
     }
+    g_mutex_lock(inode->lock);
 
     BlueSkyInode *dir = lookup_fh(req, &argp->link.dir);
     if (dir == NULL) {
         result.status = NFS3ERR_STALE;
         result.link3res_u.res.linkdir_wcc = wcc;
+        g_mutex_unlock(inode->lock);
         async_rpc_send_reply(req, &result);
         return;
     }
+    g_mutex_lock(dir->lock);
 
     encode_pre_wcc(&wcc, dir);
     if (dir->type != BLUESKY_DIRECTORY) {
         result.status = NFS3ERR_NOTDIR;
         result.link3res_u.res.linkdir_wcc = wcc;
+        g_mutex_unlock(inode->lock);
+        g_mutex_unlock(dir->lock);
         async_rpc_send_reply(req, &result);
         return;
     }
@@ -756,6 +823,8 @@ void nfsproc3_link_3_svc(link3args *argp, RPCRequest *req)
     {
         result.status = NFS3ERR_EXIST;
         result.link3res_u.res.linkdir_wcc = wcc;
+        g_mutex_unlock(inode->lock);
+        g_mutex_unlock(dir->lock);
         async_rpc_send_reply(req, &result);
         return;
     }
@@ -763,11 +832,14 @@ void nfsproc3_link_3_svc(link3args *argp, RPCRequest *req)
     if (!bluesky_directory_insert(dir, argp->link.name, inode->inum)) {
         result.status = NFS3ERR_EXIST;
         result.link3res_u.res.linkdir_wcc = wcc;
+        g_mutex_unlock(inode->lock);
+        g_mutex_unlock(dir->lock);
         async_rpc_send_reply(req, &result);
         return;
     }
     inode->nlink++;
-    bluesky_inode_update_ctime(inode, 0);
+    bluesky_inode_update_ctime(inode, FALSE);
+    bluesky_inode_update_ctime(dir, TRUE);
 
     result.status = NFS3_OK;
     wcc.after.present = TRUE;
@@ -776,6 +848,8 @@ void nfsproc3_link_3_svc(link3args *argp, RPCRequest *req)
     encode_fattr3(&result.link3res_u.res.file_attributes.post_op_attr_u.attributes, inode);
     result.link3res_u.res.linkdir_wcc = wcc;
 
+    g_mutex_unlock(inode->lock);
+    g_mutex_unlock(dir->lock);
     async_rpc_send_reply(req, &result);
 }
 
@@ -795,6 +869,7 @@ void nfsproc3_readdir_3_svc(readdir3args *argp, RPCRequest *req)
         async_rpc_send_reply(req, &result);
         return;
     }
+    g_mutex_lock(dir->lock);
 
     result.status = NFS3_OK;
     result.readdir3res_u.resok.dir_attributes.present = TRUE;
@@ -827,6 +902,7 @@ void nfsproc3_readdir_3_svc(readdir3args *argp, RPCRequest *req)
         result.readdir3res_u.resok.reply.entries = NULL;
     result.readdir3res_u.resok.reply.eof = g_sequence_iter_is_end(i);
 
+    g_mutex_unlock(dir->lock);
     async_rpc_send_reply(req, &result);
 }
 
@@ -849,6 +925,7 @@ void nfsproc3_readdirplus_3_svc(readdirplus3args *argp, RPCRequest *req)
         async_rpc_send_reply(req, &result);
         return;
     }
+    g_mutex_lock(dir->lock);
 
     result.status = NFS3_OK;
     result.readdirplus3res_u.resok.dir_attributes.present = TRUE;
@@ -869,12 +946,17 @@ void nfsproc3_readdirplus_3_svc(readdirplus3args *argp, RPCRequest *req)
     while (count < MAX_READDIR_DIRENTS && !g_sequence_iter_is_end(i)) {
         BlueSkyDirent *d = g_sequence_get(i);
         BlueSkyInode *inode = bluesky_get_inode(fs, d->inum);
+        g_mutex_lock(inode->lock);
         if (inode != NULL) {
             dircount += 24 + ((strlen(d->name) + 3) & ~3);
             attrcount += 88 + 8 + 8;
             if (dircount > argp->dircount
                 || dircount + attrcount > argp->maxcount)
+            {
+                g_mutex_unlock(inode->lock);
+                bluesky_inode_unref(inode);
                 break;
+            }
             dirents[count].fileid = d->inum;
             dirents[count].name = d->name;
             dirents[count].cookie = d->cookie;
@@ -889,6 +971,7 @@ void nfsproc3_readdirplus_3_svc(readdirplus3args *argp, RPCRequest *req)
             if (count > 0)
                 dirents[count - 1].nextentry = &dirents[count];
             count++;
+            g_mutex_unlock(inode->lock);
             bluesky_inode_unref(inode);
         }
         i = g_sequence_iter_next(i);
@@ -900,6 +983,7 @@ void nfsproc3_readdirplus_3_svc(readdirplus3args *argp, RPCRequest *req)
         result.readdirplus3res_u.resok.reply.entries = NULL;
     result.readdirplus3res_u.resok.reply.eof = g_sequence_iter_is_end(i);
 
+    g_mutex_unlock(dir->lock);
     async_rpc_send_reply(req, &result);
 }
 
@@ -915,6 +999,7 @@ void nfsproc3_fsstat_3_svc(nfs_fh3 *argp, RPCRequest *req)
         async_rpc_send_reply(req, &result);
         return;
     }
+    g_mutex_lock(inode->lock);
 
     result.status = NFS3_OK;
     result.fsstat3res_u.resok.obj_attributes.present = TRUE;
@@ -928,6 +1013,7 @@ void nfsproc3_fsstat_3_svc(nfs_fh3 *argp, RPCRequest *req)
     result.fsstat3res_u.resok.afiles = 0;
     result.fsstat3res_u.resok.invarsec = 0;
 
+    g_mutex_unlock(inode->lock);
     async_rpc_send_reply(req, &result);
 }
 
@@ -937,6 +1023,7 @@ void nfsproc3_fsinfo_3_svc(nfs_fh3 *argp, RPCRequest *req)
     memset(&result, 0, sizeof(result));
 
     BlueSkyInode *inode = bluesky_get_inode(fs, 1);
+    g_mutex_lock(inode->lock);
     result.status = NFS3_OK;
     result.fsinfo3res_u.resok.obj_attributes.present = TRUE;
     encode_fattr3(&result.fsinfo3res_u.resok.obj_attributes.post_op_attr_u.attributes, inode);
@@ -952,8 +1039,9 @@ void nfsproc3_fsinfo_3_svc(nfs_fh3 *argp, RPCRequest *req)
     result.fsinfo3res_u.resok.time_delta.nseconds = 1000;
     result.fsinfo3res_u.resok.properties
         = FSF3_LINK | FSF3_SYMLINK | FSF3_HOMOGENEOUS | FSF3_CANSETTIME;
-    bluesky_inode_unref(inode);
 
+    g_mutex_unlock(inode->lock);
+    bluesky_inode_unref(inode);
     async_rpc_send_reply(req, &result);
 }
 
@@ -963,6 +1051,7 @@ void nfsproc3_pathconf_3_svc(nfs_fh3 *argp, RPCRequest *req)
     memset(&result, 0, sizeof(result));
 
     BlueSkyInode *inode = bluesky_get_inode(fs, 1);
+    g_mutex_lock(inode->lock);
     result.status = NFS3_OK;
     result.pathconf3res_u.resok.obj_attributes.present = TRUE;
     encode_fattr3(&result.pathconf3res_u.resok.obj_attributes.post_op_attr_u.attributes, inode);
@@ -972,8 +1061,9 @@ void nfsproc3_pathconf_3_svc(nfs_fh3 *argp, RPCRequest *req)
     result.pathconf3res_u.resok.chown_restricted = TRUE;
     result.pathconf3res_u.resok.case_insensitive = FALSE;
     result.pathconf3res_u.resok.case_preserving = TRUE;
-    bluesky_inode_unref(inode);
 
+    g_mutex_unlock(inode->lock);
+    bluesky_inode_unref(inode);
     async_rpc_send_reply(req, &result);
 }
 
index 01ed7b5..1a442ba 100644 (file)
@@ -653,6 +653,10 @@ typedef struct {
     /* If frag_len is zero: the number of bytes of the fragment header that
      * have been read so far. */
     int frag_hdr_bytes;
+
+    /* Mutex protecting send operations on the socket (to ensure that replies
+     * are not accidentally interleaved). */
+    GMutex *send_lock;
 } RPCConnection;
 
 /* Linked list of cleanup functions to call when a request is completed. */
index a7d108c..07c5d93 100644 (file)
@@ -195,10 +195,12 @@ async_rpc_send_failure(RPCRequest *req, enum accept_stat stat)
     header.verf_len = 0;
     header.accept_stat = htonl(stat);
 
+    g_mutex_lock(req->connection->send_lock);
     uint32_t fragment = htonl(sizeof(header) | 0x80000000);
     async_rpc_write(req->connection, (const char *)&fragment, sizeof(fragment));
     async_rpc_write(req->connection, (const char *)&header, sizeof(header));
     g_io_channel_flush(req->connection->channel, NULL);
+    g_mutex_unlock(req->connection->send_lock);
 
     if (req->args != NULL) {
         char buf[4];
@@ -244,12 +246,14 @@ async_rpc_send_reply(RPCRequest *req, void *result)
     header.verf_len = 0;
     header.accept_stat = 0;
 
+    g_mutex_lock(req->connection->send_lock);
     gsize msg_size = str->len;
     uint32_t fragment = htonl((msg_size + sizeof(header)) | 0x80000000);
     async_rpc_write(req->connection, (const char *)&fragment, sizeof(fragment));
     async_rpc_write(req->connection, (const char *)&header, sizeof(header));
     async_rpc_write(req->connection, str->str, str->len);
     g_io_channel_flush(req->connection->channel, NULL);
+    g_mutex_unlock(req->connection->send_lock);
 
     time_end = bluesky_now_hires();
 
@@ -281,6 +285,31 @@ async_rpc_send_reply(RPCRequest *req, void *result)
     g_free(req);
 }
 
+static const char *nfs_proc_names[] = {
+    [NFSPROC3_NULL] = "NULL",
+    [NFSPROC3_GETATTR] = "GETATTR",
+    [NFSPROC3_SETATTR] = "SETATTR",
+    [NFSPROC3_LOOKUP] = "LOOKUP",
+    [NFSPROC3_ACCESS] = "ACCESS",
+    [NFSPROC3_READLINK] = "READLINK",
+    [NFSPROC3_READ] = "READ",
+    [NFSPROC3_WRITE] = "WRITE",
+    [NFSPROC3_CREATE] = "CREATE",
+    [NFSPROC3_MKDIR] = "MKDIR",
+    [NFSPROC3_SYMLINK] = "SYMLINK",
+    [NFSPROC3_MKNOD] = "MKNOD",
+    [NFSPROC3_REMOVE] = "REMOVE",
+    [NFSPROC3_RMDIR] = "RMDIR",
+    [NFSPROC3_RENAME] = "RENAME",
+    [NFSPROC3_LINK] = "LINK",
+    [NFSPROC3_READDIR] = "READDIR",
+    [NFSPROC3_READDIRPLUS] = "READDIRPLUS",
+    [NFSPROC3_FSSTAT] = "FSSTAT",
+    [NFSPROC3_FSINFO] = "FSINFO",
+    [NFSPROC3_PATHCONF] = "PATHCONF",
+    [NFSPROC3_COMMIT] = "COMMIT",
+};
+
 static void
 nfs_program_3(RPCRequest *req)
 {
@@ -316,7 +345,12 @@ nfs_program_3(RPCRequest *req)
     xdrproc_t _xdr_argument, _xdr_result;
     char *(*local)(char *, RPCRequest *);
 
-    printf("Dispatched NFS RPC message type %d\n", req->req_proc);
+    if (req->req_proc < sizeof(nfs_proc_names) / sizeof(const char *)) {
+        printf("Dispatched NFS RPC message type %s\n",
+               nfs_proc_names[req->req_proc]);
+    } else {
+        printf("Dispatched unknown NFS RPC message type %d\n", req->req_proc);
+    }
 
     switch (req->req_proc) {
     case NFSPROC3_NULL:
@@ -482,17 +516,26 @@ nfs_program_3(RPCRequest *req)
 static GMainContext *main_context;
 static GMainLoop *main_loop;
 
+static GThreadPool *rpc_thread_pool;
+
 static gboolean async_flushd(gpointer data)
 {
     bluesky_flushd_invoke(fs);
     return TRUE;
 }
 
+static void async_rpc_task(gpointer data, gpointer user_data)
+{
+    nfs_program_3((RPCRequest *)data);
+}
+
 static async_rpc_init()
 {
     main_context = g_main_context_new();
     main_loop = g_main_loop_new(main_context, FALSE);
 
+    rpc_thread_pool = g_thread_pool_new(async_rpc_task, NULL, -1, FALSE, NULL);
+
     /* Arrange to have the cache writeback code run every five seconds. */
     GSource *source = g_timeout_source_new_seconds(5);
     g_source_set_callback(source, async_flushd, NULL, NULL);
@@ -579,7 +622,7 @@ static gboolean async_rpc_dispatch(RPCConnection *rpc)
     req->req_proc = ntohl(header->proc);
     rpc->msgbuf = g_string_new("");
 
-    nfs_program_3(req);
+    g_thread_pool_push(rpc_thread_pool, req, NULL);
 
     return TRUE;
 }
@@ -651,6 +694,7 @@ static gboolean async_rpc_do_read(GIOChannel *channel,
         fprintf(stderr, "Unexpected error or end of file on RPC stream %d!\n",
                 g_io_channel_unix_get_fd(rpc->channel));
         g_io_channel_shutdown(rpc->channel, TRUE, NULL);
+        /* TODO: Clean up connection object. */
         return FALSE;
     }
 
@@ -709,6 +753,7 @@ static gboolean async_rpc_do_accept(GIOChannel *channel,
     rpc->channel = g_io_channel_unix_new(nfd);
     rpc->msgbuf = g_string_new("");
     g_io_channel_set_encoding(rpc->channel, NULL, NULL);
+    rpc->send_lock = g_mutex_new();
     GSource *source = g_io_create_watch(rpc->channel, G_IO_IN);
     g_source_set_callback(source, (GSourceFunc)async_rpc_do_read,
                           rpc, NULL);