NFS: Add an iocounter wait function for async RPC tasks

By sleeping on a new NFS Unlock-On-Close waitqueue, rpc tasks may wait for
a lock context's iocounter to reach zero.  The rpc waitqueue is only woken
when the open_context has the NFS_CONTEXT_UNLOCK flag set in order to
mitigate spurious wake-ups for any iocounter reaching zero.

Signed-off-by: Benjamin Coddington <bcodding@redhat.com>
Reviewed-by: Jeff Layton <jlayton@redhat.com>
Signed-off-by: Trond Myklebust <trond.myklebust@primarydata.com>
diff --git a/fs/nfs/client.c b/fs/nfs/client.c
index 3ffbffe..3e7b2e6 100644
--- a/fs/nfs/client.c
+++ b/fs/nfs/client.c
@@ -218,6 +218,7 @@ static void nfs_cb_idr_remove_locked(struct nfs_client *clp)
 static void pnfs_init_server(struct nfs_server *server)
 {
 	rpc_init_wait_queue(&server->roc_rpcwaitq, "pNFS ROC");
+	rpc_init_wait_queue(&server->uoc_rpcwaitq, "NFS UOC");
 }
 
 #else
diff --git a/fs/nfs/pagelist.c b/fs/nfs/pagelist.c
index f536106..ad92b40 100644
--- a/fs/nfs/pagelist.c
+++ b/fs/nfs/pagelist.c
@@ -102,6 +102,35 @@ nfs_iocounter_wait(struct nfs_lock_context *l_ctx)
 			TASK_KILLABLE);
 }
 
+/**
+ * nfs_async_iocounter_wait - wait on a rpc_waitqueue for I/O
+ * to complete
+ * @task: the rpc_task that should wait
+ * @l_ctx: nfs_lock_context with io_counter to check
+ *
+ * Returns true if there is outstanding I/O to wait on and the
+ * task has been put to sleep.
+ */
+bool
+nfs_async_iocounter_wait(struct rpc_task *task, struct nfs_lock_context *l_ctx)
+{
+	struct inode *inode = d_inode(l_ctx->open_context->dentry);
+	bool ret = false;
+
+	if (atomic_read(&l_ctx->io_count) > 0) {
+		rpc_sleep_on(&NFS_SERVER(inode)->uoc_rpcwaitq, task, NULL);
+		ret = true;
+	}
+
+	if (atomic_read(&l_ctx->io_count) == 0) {
+		rpc_wake_up_queued_task(&NFS_SERVER(inode)->uoc_rpcwaitq, task);
+		ret = false;
+	}
+
+	return ret;
+}
+EXPORT_SYMBOL_GPL(nfs_async_iocounter_wait);
+
 /*
  * nfs_page_group_lock - lock the head of the page group
  * @req - request in group that is to be locked
@@ -385,8 +414,11 @@ static void nfs_clear_request(struct nfs_page *req)
 		req->wb_page = NULL;
 	}
 	if (l_ctx != NULL) {
-		if (atomic_dec_and_test(&l_ctx->io_count))
+		if (atomic_dec_and_test(&l_ctx->io_count)) {
 			wake_up_atomic_t(&l_ctx->io_count);
+			if (test_bit(NFS_CONTEXT_UNLOCK, &ctx->flags))
+				rpc_wake_up(&NFS_SERVER(d_inode(ctx->dentry))->uoc_rpcwaitq);
+		}
 		nfs_put_lock_context(l_ctx);
 		req->wb_lock_context = NULL;
 	}