From: Michael Vrable Date: Tue, 26 Jan 2010 02:31:07 +0000 (-0800) Subject: Implement multi-threaded request processing and locking for NFS. X-Git-Url: https://git.vrable.net/?a=commitdiff_plain;h=50c08ba526a6638e8e3c4eec0503365a2c110a85;p=bluesky.git Implement multi-threaded request processing and locking for NFS. --- diff --git a/bluesky/bluesky.h b/bluesky/bluesky.h index 201cc17..6324ce2 100644 --- a/bluesky/bluesky.h +++ b/bluesky/bluesky.h @@ -121,6 +121,10 @@ bluesky_time_hires bluesky_now_hires(); * dropping from the cache. * - Any pending operations should hold extra references to the inode as * appropriate to keep it available until the operation completes. + * - Locking dependency order is, when multiple locks are to be acquired, to + * acquire locks on parents in the filesystem tree before children. + * (TODO: What about rename when we acquire locks in unrelated parts of the + * filesystem?) * */ typedef struct { GMutex *lock; diff --git a/bluesky/inode.c b/bluesky/inode.c index 4a158c7..9b6160a 100644 --- a/bluesky/inode.c +++ b/bluesky/inode.c @@ -172,8 +172,8 @@ BlueSkyInode *bluesky_new_inode(uint64_t inum, BlueSkyFS *fs, /* Retrieve an inode from the filesystem. Eventually this will be a cache and * so we might need to go fetch the inode from elsewhere; for now all - * filesystem state is stored here. inode is returned locked with a reference - * held. */ + * filesystem state is stored here. inode is returned with a reference held + * but not locked. */ BlueSkyInode *bluesky_get_inode(BlueSkyFS *fs, uint64_t inum) { BlueSkyInode *inode = NULL; diff --git a/nfs3/nfs3.c b/nfs3/nfs3.c index 8be68bd..72ec9cb 100644 --- a/nfs3/nfs3.c +++ b/nfs3/nfs3.c @@ -104,8 +104,7 @@ void set_attributes(BlueSkyInode *inode, sattr3 *attributes) break; } - inode->ctime = now; - inode->change_count++; + bluesky_inode_update_ctime(inode, FALSE); } /* Copy inode attributes into NFS response. The BlueSkyInode should be locked @@ -162,7 +161,9 @@ void nfsproc3_getattr_3_svc(nfs_fh3 *argp, RPCRequest *req) BlueSkyInode *inode = lookup_fh(req, argp); if (inode != NULL) { result.status = NFS3_OK; + g_mutex_lock(inode->lock); encode_fattr3(&result.getattr3res_u.attributes, inode); + g_mutex_unlock(inode->lock); } else { result.status = NFS3ERR_STALE; } @@ -184,12 +185,14 @@ void nfsproc3_setattr_3_svc(setattr3args *argp, RPCRequest *req) return; } + g_mutex_lock(inode->lock); encode_pre_wcc(&result.wccstat3_u.wcc, inode); if (argp->guard.check) { if (inode->ctime != decode_nfstime3(&argp->guard.sattrguard3_u.ctime)) { result.status = NFS3ERR_NOT_SYNC; result.wccstat3_u.wcc.after.present = TRUE; encode_fattr3(&result.wccstat3_u.wcc.after.post_op_attr_u.attributes, inode); + g_mutex_unlock(inode->lock); async_rpc_send_reply(req, &result); return; } @@ -202,6 +205,7 @@ void nfsproc3_setattr_3_svc(setattr3args *argp, RPCRequest *req) inode); result.status = NFS3_OK; + g_mutex_unlock(inode->lock); async_rpc_send_reply(req, &result); } @@ -218,6 +222,7 @@ void nfsproc3_lookup_3_svc(diropargs3 *argp, RPCRequest *req) return; } + g_mutex_lock(dir->lock); result.lookup3res_u.resfail.present = TRUE; encode_fattr3(&result.lookup3res_u.resfail.post_op_attr_u.attributes, dir); if (!validate_filename(argp->name)) { @@ -225,6 +230,7 @@ void nfsproc3_lookup_3_svc(diropargs3 *argp, RPCRequest *req) result.status = NFS3ERR_NAMETOOLONG; else result.status = NFS3ERR_NOENT; + g_mutex_unlock(dir->lock); async_rpc_send_reply(req, &result); return; } @@ -233,15 +239,18 @@ void nfsproc3_lookup_3_svc(diropargs3 *argp, RPCRequest *req) uint64_t inum = bluesky_directory_lookup(dir, argp->name); if (inum == 0) { result.status = NFS3ERR_NOENT; + g_mutex_unlock(dir->lock); async_rpc_send_reply(req, &result); return; } BlueSkyInode *inode = bluesky_get_inode(fs, inum); if (inode == NULL) { result.status = NFS3ERR_NOENT; + g_mutex_unlock(dir->lock); async_rpc_send_reply(req, &result); return; } + g_mutex_lock(inode->lock); schedule_inode_unref(req, inode); result.status = NFS3_OK; @@ -255,6 +264,8 @@ void nfsproc3_lookup_3_svc(diropargs3 *argp, RPCRequest *req) result.lookup3res_u.resok.object.data.data_len = 8; result.lookup3res_u.resok.object.data.data_val = (char *)&fh_bytes; + g_mutex_unlock(inode->lock); + g_mutex_unlock(dir->lock); async_rpc_send_reply(req, &result); } @@ -271,10 +282,12 @@ void nfsproc3_access_3_svc(access3args *argp, RPCRequest *req) return; } + g_mutex_lock(inode->lock); result.status = NFS3_OK; result.access3res_u.resok.obj_attributes.present = TRUE; encode_fattr3(&result.access3res_u.resok.obj_attributes.post_op_attr_u.attributes, inode); result.access3res_u.resok.access = argp->access; + g_mutex_unlock(inode->lock); async_rpc_send_reply(req, &result); } @@ -286,6 +299,7 @@ void nfsproc3_readlink_3_svc(nfs_fh3 *argp, RPCRequest *req) BlueSkyInode *inode = lookup_fh(req, argp); if (inode != NULL) { + g_mutex_lock(inode->lock); if (inode->type == BLUESKY_SYMLINK) { result.status = NFS3_OK; result.readlink3res_u.resok.symlink_attributes.present = TRUE; @@ -296,6 +310,7 @@ void nfsproc3_readlink_3_svc(nfs_fh3 *argp, RPCRequest *req) result.readlink3res_u.resfail.present = TRUE; encode_fattr3(&result.readlink3res_u.resfail.post_op_attr_u.attributes, inode); } + g_mutex_unlock(inode->lock); } else { result.status = NFS3ERR_STALE; } @@ -317,6 +332,8 @@ void nfsproc3_read_3_svc(read3args *argp, RPCRequest *req) return; } + g_mutex_lock(inode->lock); + int count = argp->count; if (argp->offset >= inode->size) { count = 0; @@ -338,6 +355,8 @@ void nfsproc3_read_3_svc(read3args *argp, RPCRequest *req) result.read3res_u.resok.data.data_val = buf; result.read3res_u.resok.data.data_len = count; + g_mutex_unlock(inode->lock); + async_rpc_send_reply(req, &result); } @@ -356,10 +375,13 @@ void nfsproc3_write_3_svc(write3args *argp, RPCRequest *req) return; } + g_mutex_lock(inode->lock); + encode_pre_wcc(&wcc, inode); if (inode->type != BLUESKY_REGULAR) { result.status = NFS3ERR_INVAL; result.write3res_u.resfail = wcc; + g_mutex_unlock(inode->lock); async_rpc_send_reply(req, &result); return; } @@ -382,6 +404,8 @@ void nfsproc3_write_3_svc(write3args *argp, RPCRequest *req) result.write3res_u.resok.count = argp->count; result.write3res_u.resok.committed = FILE_SYNC; + g_mutex_unlock(inode->lock); + async_rpc_send_reply(req, &result); } @@ -400,10 +424,13 @@ void nfsproc3_create_3_svc(create3args *argp, RPCRequest *req) return; } + g_mutex_lock(dir->lock); + encode_pre_wcc(&wcc, dir); if (dir->type != BLUESKY_DIRECTORY) { result.status = NFS3ERR_NOTDIR; result.diropres3_u.resfail = wcc; + g_mutex_unlock(dir->lock); async_rpc_send_reply(req, &result); return; } @@ -414,6 +441,7 @@ void nfsproc3_create_3_svc(create3args *argp, RPCRequest *req) { result.status = NFS3ERR_EXIST; result.diropres3_u.resfail = wcc; + g_mutex_unlock(dir->lock); async_rpc_send_reply(req, &result); return; } @@ -428,11 +456,11 @@ void nfsproc3_create_3_svc(create3args *argp, RPCRequest *req) file->ctime = time; file->atime = time; file->ntime = time; + g_mutex_lock(file->lock); bluesky_insert_inode(fs, file); bluesky_directory_insert(dir, argp->where.name, file->inum); - dir->mtime = dir->ctime = bluesky_get_current_time(); - dir->change_count++; + bluesky_inode_update_ctime(dir, TRUE); wcc.after.present = TRUE; encode_fattr3(&wcc.after.post_op_attr_u.attributes, dir); @@ -446,6 +474,9 @@ void nfsproc3_create_3_svc(create3args *argp, RPCRequest *req) result.diropres3_u.resok.obj.post_op_fh3_u.handle.data.data_len = 8; result.diropres3_u.resok.obj.post_op_fh3_u.handle.data.data_val = (char *)&fh_bytes; + g_mutex_unlock(file->lock); + g_mutex_unlock(dir->lock); + async_rpc_send_reply(req, &result); } @@ -464,10 +495,13 @@ void nfsproc3_mkdir_3_svc(mkdir3args *argp, RPCRequest *req) return; } + g_mutex_lock(dir->lock); + encode_pre_wcc(&wcc, dir); if (dir->type != BLUESKY_DIRECTORY) { result.status = NFS3ERR_NOTDIR; result.diropres3_u.resfail = wcc; + g_mutex_unlock(dir->lock); async_rpc_send_reply(req, &result); return; } @@ -478,6 +512,7 @@ void nfsproc3_mkdir_3_svc(mkdir3args *argp, RPCRequest *req) { result.status = NFS3ERR_EXIST; result.diropres3_u.resfail = wcc; + g_mutex_unlock(dir->lock); async_rpc_send_reply(req, &result); return; } @@ -491,12 +526,12 @@ void nfsproc3_mkdir_3_svc(mkdir3args *argp, RPCRequest *req) file->ctime = time; file->atime = time; file->ntime = time; + g_mutex_lock(file->lock); bluesky_insert_inode(fs, file); bluesky_directory_insert(dir, argp->where.name, file->inum); set_attributes(file, &argp->attributes); - dir->mtime = dir->ctime = bluesky_get_current_time(); - dir->change_count++; + bluesky_inode_update_ctime(dir, TRUE); wcc.after.present = TRUE; encode_fattr3(&wcc.after.post_op_attr_u.attributes, dir); @@ -510,6 +545,8 @@ void nfsproc3_mkdir_3_svc(mkdir3args *argp, RPCRequest *req) result.diropres3_u.resok.obj.post_op_fh3_u.handle.data.data_len = 8; result.diropres3_u.resok.obj.post_op_fh3_u.handle.data.data_val = (char *)&fh_bytes; + g_mutex_unlock(file->lock); + g_mutex_unlock(dir->lock); async_rpc_send_reply(req, &result); } @@ -527,11 +564,13 @@ void nfsproc3_symlink_3_svc(symlink3args *argp, RPCRequest *req) async_rpc_send_reply(req, &result); return; } + g_mutex_lock(dir->lock); encode_pre_wcc(&wcc, dir); if (dir->type != BLUESKY_DIRECTORY) { result.status = NFS3ERR_NOTDIR; result.diropres3_u.resfail = wcc; + g_mutex_unlock(dir->lock); async_rpc_send_reply(req, &result); return; } @@ -542,6 +581,7 @@ void nfsproc3_symlink_3_svc(symlink3args *argp, RPCRequest *req) { result.status = NFS3ERR_EXIST; result.diropres3_u.resfail = wcc; + g_mutex_unlock(dir->lock); async_rpc_send_reply(req, &result); return; } @@ -556,11 +596,11 @@ void nfsproc3_symlink_3_svc(symlink3args *argp, RPCRequest *req) file->atime = time; file->ntime = time; file->symlink_contents = g_strdup(argp->symlink.symlink_data); + g_mutex_lock(file->lock); bluesky_insert_inode(fs, file); bluesky_directory_insert(dir, argp->where.name, file->inum); - dir->mtime = dir->ctime = bluesky_get_current_time(); - dir->change_count++; + bluesky_inode_update_ctime(dir, TRUE); wcc.after.present = TRUE; encode_fattr3(&wcc.after.post_op_attr_u.attributes, dir); @@ -574,6 +614,8 @@ void nfsproc3_symlink_3_svc(symlink3args *argp, RPCRequest *req) result.diropres3_u.resok.obj.post_op_fh3_u.handle.data.data_len = 8; result.diropres3_u.resok.obj.post_op_fh3_u.handle.data.data_val = (char *)&fh_bytes; + g_mutex_unlock(file->lock); + g_mutex_unlock(dir->lock); async_rpc_send_reply(req, &result); } @@ -599,6 +641,8 @@ void nfsproc3_remove_3_svc(diropargs3 *argp, RPCRequest *req) return; } + g_mutex_lock(dir->lock); + encode_pre_wcc(&result.wccstat3_u.wcc, dir); if (!validate_filename(argp->name) @@ -606,6 +650,7 @@ void nfsproc3_remove_3_svc(diropargs3 *argp, RPCRequest *req) || strcmp(argp->name, "..") == 0) { result.status = NFS3ERR_NOENT; + g_mutex_unlock(dir->lock); async_rpc_send_reply(req, &result); return; } @@ -619,6 +664,7 @@ void nfsproc3_remove_3_svc(diropargs3 *argp, RPCRequest *req) encode_fattr3(&result.wccstat3_u.wcc.after.post_op_attr_u.attributes, dir); + g_mutex_unlock(dir->lock); async_rpc_send_reply(req, &result); } @@ -634,6 +680,8 @@ void nfsproc3_rmdir_3_svc(diropargs3 *argp, RPCRequest *req) return; } + g_mutex_lock(dir->lock); + encode_pre_wcc(&result.wccstat3_u.wcc, dir); if (!validate_filename(argp->name) @@ -641,6 +689,7 @@ void nfsproc3_rmdir_3_svc(diropargs3 *argp, RPCRequest *req) || strcmp(argp->name, "..") == 0) { result.status = NFS3ERR_NOENT; + g_mutex_unlock(dir->lock); async_rpc_send_reply(req, &result); return; } @@ -649,13 +698,17 @@ void nfsproc3_rmdir_3_svc(diropargs3 *argp, RPCRequest *req) BlueSkyInode *inode = bluesky_get_inode(fs, inum); if (inode == NULL) { result.status = NFS3ERR_NOENT; + g_mutex_unlock(dir->lock); async_rpc_send_reply(req, &result); return; } + g_mutex_lock(inode->lock); schedule_inode_unref(req, inode); if (inode->type != BLUESKY_DIRECTORY) { result.status = NFS3ERR_NOTDIR; + g_mutex_unlock(inode->lock); + g_mutex_unlock(dir->lock); async_rpc_send_reply(req, &result); return; } @@ -663,6 +716,8 @@ void nfsproc3_rmdir_3_svc(diropargs3 *argp, RPCRequest *req) printf("Directory not empty: %d entries\n", g_sequence_get_length(inode->dirents)); result.status = NFS3ERR_NOTEMPTY; + g_mutex_unlock(inode->lock); + g_mutex_unlock(dir->lock); async_rpc_send_reply(req, &result); return; } @@ -676,6 +731,8 @@ void nfsproc3_rmdir_3_svc(diropargs3 *argp, RPCRequest *req) encode_fattr3(&result.wccstat3_u.wcc.after.post_op_attr_u.attributes, dir); + g_mutex_unlock(inode->lock); + g_mutex_unlock(dir->lock); async_rpc_send_reply(req, &result); } @@ -692,14 +749,17 @@ void nfsproc3_rename_3_svc(rename3args *argp, RPCRequest *req) async_rpc_send_reply(req, &result); return; } + g_mutex_lock(dir1->lock); encode_pre_wcc(wcc1, dir1); BlueSkyInode *dir2 = lookup_fh(req, &argp->to.dir); if (dir2 == NULL) { result.status = NFS3ERR_STALE; + g_mutex_unlock(dir1->lock); async_rpc_send_reply(req, &result); return; } + g_mutex_lock(dir2->lock); encode_pre_wcc(wcc2, dir1); gboolean status = bluesky_rename(dir1, argp->from.name, @@ -715,6 +775,8 @@ void nfsproc3_rename_3_svc(rename3args *argp, RPCRequest *req) else result.status = NFS3ERR_PERM; + g_mutex_unlock(dir2->lock); + g_mutex_unlock(dir1->lock); async_rpc_send_reply(req, &result); } @@ -732,19 +794,24 @@ void nfsproc3_link_3_svc(link3args *argp, RPCRequest *req) async_rpc_send_reply(req, &result); return; } + g_mutex_lock(inode->lock); BlueSkyInode *dir = lookup_fh(req, &argp->link.dir); if (dir == NULL) { result.status = NFS3ERR_STALE; result.link3res_u.res.linkdir_wcc = wcc; + g_mutex_unlock(inode->lock); async_rpc_send_reply(req, &result); return; } + g_mutex_lock(dir->lock); encode_pre_wcc(&wcc, dir); if (dir->type != BLUESKY_DIRECTORY) { result.status = NFS3ERR_NOTDIR; result.link3res_u.res.linkdir_wcc = wcc; + g_mutex_unlock(inode->lock); + g_mutex_unlock(dir->lock); async_rpc_send_reply(req, &result); return; } @@ -756,6 +823,8 @@ void nfsproc3_link_3_svc(link3args *argp, RPCRequest *req) { result.status = NFS3ERR_EXIST; result.link3res_u.res.linkdir_wcc = wcc; + g_mutex_unlock(inode->lock); + g_mutex_unlock(dir->lock); async_rpc_send_reply(req, &result); return; } @@ -763,11 +832,14 @@ void nfsproc3_link_3_svc(link3args *argp, RPCRequest *req) if (!bluesky_directory_insert(dir, argp->link.name, inode->inum)) { result.status = NFS3ERR_EXIST; result.link3res_u.res.linkdir_wcc = wcc; + g_mutex_unlock(inode->lock); + g_mutex_unlock(dir->lock); async_rpc_send_reply(req, &result); return; } inode->nlink++; - bluesky_inode_update_ctime(inode, 0); + bluesky_inode_update_ctime(inode, FALSE); + bluesky_inode_update_ctime(dir, TRUE); result.status = NFS3_OK; wcc.after.present = TRUE; @@ -776,6 +848,8 @@ void nfsproc3_link_3_svc(link3args *argp, RPCRequest *req) encode_fattr3(&result.link3res_u.res.file_attributes.post_op_attr_u.attributes, inode); result.link3res_u.res.linkdir_wcc = wcc; + g_mutex_unlock(inode->lock); + g_mutex_unlock(dir->lock); async_rpc_send_reply(req, &result); } @@ -795,6 +869,7 @@ void nfsproc3_readdir_3_svc(readdir3args *argp, RPCRequest *req) async_rpc_send_reply(req, &result); return; } + g_mutex_lock(dir->lock); result.status = NFS3_OK; result.readdir3res_u.resok.dir_attributes.present = TRUE; @@ -827,6 +902,7 @@ void nfsproc3_readdir_3_svc(readdir3args *argp, RPCRequest *req) result.readdir3res_u.resok.reply.entries = NULL; result.readdir3res_u.resok.reply.eof = g_sequence_iter_is_end(i); + g_mutex_unlock(dir->lock); async_rpc_send_reply(req, &result); } @@ -849,6 +925,7 @@ void nfsproc3_readdirplus_3_svc(readdirplus3args *argp, RPCRequest *req) async_rpc_send_reply(req, &result); return; } + g_mutex_lock(dir->lock); result.status = NFS3_OK; result.readdirplus3res_u.resok.dir_attributes.present = TRUE; @@ -869,12 +946,17 @@ void nfsproc3_readdirplus_3_svc(readdirplus3args *argp, RPCRequest *req) while (count < MAX_READDIR_DIRENTS && !g_sequence_iter_is_end(i)) { BlueSkyDirent *d = g_sequence_get(i); BlueSkyInode *inode = bluesky_get_inode(fs, d->inum); + g_mutex_lock(inode->lock); if (inode != NULL) { dircount += 24 + ((strlen(d->name) + 3) & ~3); attrcount += 88 + 8 + 8; if (dircount > argp->dircount || dircount + attrcount > argp->maxcount) + { + g_mutex_unlock(inode->lock); + bluesky_inode_unref(inode); break; + } dirents[count].fileid = d->inum; dirents[count].name = d->name; dirents[count].cookie = d->cookie; @@ -889,6 +971,7 @@ void nfsproc3_readdirplus_3_svc(readdirplus3args *argp, RPCRequest *req) if (count > 0) dirents[count - 1].nextentry = &dirents[count]; count++; + g_mutex_unlock(inode->lock); bluesky_inode_unref(inode); } i = g_sequence_iter_next(i); @@ -900,6 +983,7 @@ void nfsproc3_readdirplus_3_svc(readdirplus3args *argp, RPCRequest *req) result.readdirplus3res_u.resok.reply.entries = NULL; result.readdirplus3res_u.resok.reply.eof = g_sequence_iter_is_end(i); + g_mutex_unlock(dir->lock); async_rpc_send_reply(req, &result); } @@ -915,6 +999,7 @@ void nfsproc3_fsstat_3_svc(nfs_fh3 *argp, RPCRequest *req) async_rpc_send_reply(req, &result); return; } + g_mutex_lock(inode->lock); result.status = NFS3_OK; result.fsstat3res_u.resok.obj_attributes.present = TRUE; @@ -928,6 +1013,7 @@ void nfsproc3_fsstat_3_svc(nfs_fh3 *argp, RPCRequest *req) result.fsstat3res_u.resok.afiles = 0; result.fsstat3res_u.resok.invarsec = 0; + g_mutex_unlock(inode->lock); async_rpc_send_reply(req, &result); } @@ -937,6 +1023,7 @@ void nfsproc3_fsinfo_3_svc(nfs_fh3 *argp, RPCRequest *req) memset(&result, 0, sizeof(result)); BlueSkyInode *inode = bluesky_get_inode(fs, 1); + g_mutex_lock(inode->lock); result.status = NFS3_OK; result.fsinfo3res_u.resok.obj_attributes.present = TRUE; encode_fattr3(&result.fsinfo3res_u.resok.obj_attributes.post_op_attr_u.attributes, inode); @@ -952,8 +1039,9 @@ void nfsproc3_fsinfo_3_svc(nfs_fh3 *argp, RPCRequest *req) result.fsinfo3res_u.resok.time_delta.nseconds = 1000; result.fsinfo3res_u.resok.properties = FSF3_LINK | FSF3_SYMLINK | FSF3_HOMOGENEOUS | FSF3_CANSETTIME; - bluesky_inode_unref(inode); + g_mutex_unlock(inode->lock); + bluesky_inode_unref(inode); async_rpc_send_reply(req, &result); } @@ -963,6 +1051,7 @@ void nfsproc3_pathconf_3_svc(nfs_fh3 *argp, RPCRequest *req) memset(&result, 0, sizeof(result)); BlueSkyInode *inode = bluesky_get_inode(fs, 1); + g_mutex_lock(inode->lock); result.status = NFS3_OK; result.pathconf3res_u.resok.obj_attributes.present = TRUE; encode_fattr3(&result.pathconf3res_u.resok.obj_attributes.post_op_attr_u.attributes, inode); @@ -972,8 +1061,9 @@ void nfsproc3_pathconf_3_svc(nfs_fh3 *argp, RPCRequest *req) result.pathconf3res_u.resok.chown_restricted = TRUE; result.pathconf3res_u.resok.case_insensitive = FALSE; result.pathconf3res_u.resok.case_preserving = TRUE; - bluesky_inode_unref(inode); + g_mutex_unlock(inode->lock); + bluesky_inode_unref(inode); async_rpc_send_reply(req, &result); } diff --git a/nfs3/nfs3_prot.h b/nfs3/nfs3_prot.h index 01ed7b5..1a442ba 100644 --- a/nfs3/nfs3_prot.h +++ b/nfs3/nfs3_prot.h @@ -653,6 +653,10 @@ typedef struct { /* If frag_len is zero: the number of bytes of the fragment header that * have been read so far. */ int frag_hdr_bytes; + + /* Mutex protecting send operations on the socket (to ensure that replies + * are not accidentally interleaved). */ + GMutex *send_lock; } RPCConnection; /* Linked list of cleanup functions to call when a request is completed. */ diff --git a/nfs3/rpc.c b/nfs3/rpc.c index a7d108c..07c5d93 100644 --- a/nfs3/rpc.c +++ b/nfs3/rpc.c @@ -195,10 +195,12 @@ async_rpc_send_failure(RPCRequest *req, enum accept_stat stat) header.verf_len = 0; header.accept_stat = htonl(stat); + g_mutex_lock(req->connection->send_lock); uint32_t fragment = htonl(sizeof(header) | 0x80000000); async_rpc_write(req->connection, (const char *)&fragment, sizeof(fragment)); async_rpc_write(req->connection, (const char *)&header, sizeof(header)); g_io_channel_flush(req->connection->channel, NULL); + g_mutex_unlock(req->connection->send_lock); if (req->args != NULL) { char buf[4]; @@ -244,12 +246,14 @@ async_rpc_send_reply(RPCRequest *req, void *result) header.verf_len = 0; header.accept_stat = 0; + g_mutex_lock(req->connection->send_lock); gsize msg_size = str->len; uint32_t fragment = htonl((msg_size + sizeof(header)) | 0x80000000); async_rpc_write(req->connection, (const char *)&fragment, sizeof(fragment)); async_rpc_write(req->connection, (const char *)&header, sizeof(header)); async_rpc_write(req->connection, str->str, str->len); g_io_channel_flush(req->connection->channel, NULL); + g_mutex_unlock(req->connection->send_lock); time_end = bluesky_now_hires(); @@ -281,6 +285,31 @@ async_rpc_send_reply(RPCRequest *req, void *result) g_free(req); } +static const char *nfs_proc_names[] = { + [NFSPROC3_NULL] = "NULL", + [NFSPROC3_GETATTR] = "GETATTR", + [NFSPROC3_SETATTR] = "SETATTR", + [NFSPROC3_LOOKUP] = "LOOKUP", + [NFSPROC3_ACCESS] = "ACCESS", + [NFSPROC3_READLINK] = "READLINK", + [NFSPROC3_READ] = "READ", + [NFSPROC3_WRITE] = "WRITE", + [NFSPROC3_CREATE] = "CREATE", + [NFSPROC3_MKDIR] = "MKDIR", + [NFSPROC3_SYMLINK] = "SYMLINK", + [NFSPROC3_MKNOD] = "MKNOD", + [NFSPROC3_REMOVE] = "REMOVE", + [NFSPROC3_RMDIR] = "RMDIR", + [NFSPROC3_RENAME] = "RENAME", + [NFSPROC3_LINK] = "LINK", + [NFSPROC3_READDIR] = "READDIR", + [NFSPROC3_READDIRPLUS] = "READDIRPLUS", + [NFSPROC3_FSSTAT] = "FSSTAT", + [NFSPROC3_FSINFO] = "FSINFO", + [NFSPROC3_PATHCONF] = "PATHCONF", + [NFSPROC3_COMMIT] = "COMMIT", +}; + static void nfs_program_3(RPCRequest *req) { @@ -316,7 +345,12 @@ nfs_program_3(RPCRequest *req) xdrproc_t _xdr_argument, _xdr_result; char *(*local)(char *, RPCRequest *); - printf("Dispatched NFS RPC message type %d\n", req->req_proc); + if (req->req_proc < sizeof(nfs_proc_names) / sizeof(const char *)) { + printf("Dispatched NFS RPC message type %s\n", + nfs_proc_names[req->req_proc]); + } else { + printf("Dispatched unknown NFS RPC message type %d\n", req->req_proc); + } switch (req->req_proc) { case NFSPROC3_NULL: @@ -482,17 +516,26 @@ nfs_program_3(RPCRequest *req) static GMainContext *main_context; static GMainLoop *main_loop; +static GThreadPool *rpc_thread_pool; + static gboolean async_flushd(gpointer data) { bluesky_flushd_invoke(fs); return TRUE; } +static void async_rpc_task(gpointer data, gpointer user_data) +{ + nfs_program_3((RPCRequest *)data); +} + static async_rpc_init() { main_context = g_main_context_new(); main_loop = g_main_loop_new(main_context, FALSE); + rpc_thread_pool = g_thread_pool_new(async_rpc_task, NULL, -1, FALSE, NULL); + /* Arrange to have the cache writeback code run every five seconds. */ GSource *source = g_timeout_source_new_seconds(5); g_source_set_callback(source, async_flushd, NULL, NULL); @@ -579,7 +622,7 @@ static gboolean async_rpc_dispatch(RPCConnection *rpc) req->req_proc = ntohl(header->proc); rpc->msgbuf = g_string_new(""); - nfs_program_3(req); + g_thread_pool_push(rpc_thread_pool, req, NULL); return TRUE; } @@ -651,6 +694,7 @@ static gboolean async_rpc_do_read(GIOChannel *channel, fprintf(stderr, "Unexpected error or end of file on RPC stream %d!\n", g_io_channel_unix_get_fd(rpc->channel)); g_io_channel_shutdown(rpc->channel, TRUE, NULL); + /* TODO: Clean up connection object. */ return FALSE; } @@ -709,6 +753,7 @@ static gboolean async_rpc_do_accept(GIOChannel *channel, rpc->channel = g_io_channel_unix_new(nfd); rpc->msgbuf = g_string_new(""); g_io_channel_set_encoding(rpc->channel, NULL, NULL); + rpc->send_lock = g_mutex_new(); GSource *source = g_io_create_watch(rpc->channel, G_IO_IN); g_source_set_callback(source, (GSourceFunc)async_rpc_do_read, rpc, NULL);