SUNRPC: split SUNRPC PipeFS pipe data and inode creation

Generally, pipe data is used only for pipes, and thus allocating space for it
on every RPC inode allocation is redundant. This patch splits the private
SUNRPC PipeFS pipe data from the inode, so that pipe data is allocated only
for pipe inodes. This patch is also a next step towards removing PipeFS inode
references from kernel code other than PipeFS itself.

Signed-off-by: Stanislav Kinsbursky <skinsbursky@parallels.com>
Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
diff --git a/net/sunrpc/rpc_pipe.c b/net/sunrpc/rpc_pipe.c
index 16d9b9a..b6f6555 100644
--- a/net/sunrpc/rpc_pipe.c
+++ b/net/sunrpc/rpc_pipe.c
@@ -61,7 +61,7 @@
 }
 EXPORT_SYMBOL_GPL(rpc_pipefs_notifier_unregister);
 
-static void rpc_purge_list(struct rpc_inode *rpci, struct list_head *head,
+static void rpc_purge_list(struct rpc_pipe *pipe, struct list_head *head,
 		void (*destroy_msg)(struct rpc_pipe_msg *), int err)
 {
 	struct rpc_pipe_msg *msg;
@@ -74,29 +74,29 @@
 		msg->errno = err;
 		destroy_msg(msg);
 	} while (!list_empty(head));
-	wake_up(&rpci->waitq);
+	wake_up(&pipe->waitq);
 }
 
 static void
 rpc_timeout_upcall_queue(struct work_struct *work)
 {
 	LIST_HEAD(free_list);
-	struct rpc_inode *rpci =
-		container_of(work, struct rpc_inode, queue_timeout.work);
+	struct rpc_pipe *pipe =
+		container_of(work, struct rpc_pipe, queue_timeout.work);
 	void (*destroy_msg)(struct rpc_pipe_msg *);
 
-	spin_lock(&rpci->lock);
-	if (rpci->ops == NULL) {
-		spin_unlock(&rpci->lock);
+	spin_lock(&pipe->lock);
+	if (pipe->ops == NULL) {
+		spin_unlock(&pipe->lock);
 		return;
 	}
-	destroy_msg = rpci->ops->destroy_msg;
-	if (rpci->nreaders == 0) {
-		list_splice_init(&rpci->pipe, &free_list);
-		rpci->pipelen = 0;
+	destroy_msg = pipe->ops->destroy_msg;
+	if (pipe->nreaders == 0) {
+		list_splice_init(&pipe->pipe, &free_list);
+		pipe->pipelen = 0;
 	}
-	spin_unlock(&rpci->lock);
-	rpc_purge_list(rpci, &free_list, destroy_msg, -ETIMEDOUT);
+	spin_unlock(&pipe->lock);
+	rpc_purge_list(pipe, &free_list, destroy_msg, -ETIMEDOUT);
 }
 
 ssize_t rpc_pipe_generic_upcall(struct file *filp, struct rpc_pipe_msg *msg,
@@ -135,25 +135,25 @@
 	struct rpc_inode *rpci = RPC_I(inode);
 	int res = -EPIPE;
 
-	spin_lock(&rpci->lock);
-	if (rpci->ops == NULL)
+	spin_lock(&rpci->pipe->lock);
+	if (rpci->pipe->ops == NULL)
 		goto out;
-	if (rpci->nreaders) {
-		list_add_tail(&msg->list, &rpci->pipe);
-		rpci->pipelen += msg->len;
+	if (rpci->pipe->nreaders) {
+		list_add_tail(&msg->list, &rpci->pipe->pipe);
+		rpci->pipe->pipelen += msg->len;
 		res = 0;
-	} else if (rpci->flags & RPC_PIPE_WAIT_FOR_OPEN) {
-		if (list_empty(&rpci->pipe))
+	} else if (rpci->pipe->flags & RPC_PIPE_WAIT_FOR_OPEN) {
+		if (list_empty(&rpci->pipe->pipe))
 			queue_delayed_work(rpciod_workqueue,
-					&rpci->queue_timeout,
+					&rpci->pipe->queue_timeout,
 					RPC_UPCALL_TIMEOUT);
-		list_add_tail(&msg->list, &rpci->pipe);
-		rpci->pipelen += msg->len;
+		list_add_tail(&msg->list, &rpci->pipe->pipe);
+		rpci->pipe->pipelen += msg->len;
 		res = 0;
 	}
 out:
-	spin_unlock(&rpci->lock);
-	wake_up(&rpci->waitq);
+	spin_unlock(&rpci->pipe->lock);
+	wake_up(&rpci->pipe->waitq);
 	return res;
 }
 EXPORT_SYMBOL_GPL(rpc_queue_upcall);
@@ -167,27 +167,27 @@
 static void
 rpc_close_pipes(struct inode *inode)
 {
-	struct rpc_inode *rpci = RPC_I(inode);
+	struct rpc_pipe *pipe = RPC_I(inode)->pipe;
 	const struct rpc_pipe_ops *ops;
 	int need_release;
 
 	mutex_lock(&inode->i_mutex);
-	ops = rpci->ops;
+	ops = pipe->ops;
 	if (ops != NULL) {
 		LIST_HEAD(free_list);
-		spin_lock(&rpci->lock);
-		need_release = rpci->nreaders != 0 || rpci->nwriters != 0;
-		rpci->nreaders = 0;
-		list_splice_init(&rpci->in_upcall, &free_list);
-		list_splice_init(&rpci->pipe, &free_list);
-		rpci->pipelen = 0;
-		rpci->ops = NULL;
-		spin_unlock(&rpci->lock);
-		rpc_purge_list(rpci, &free_list, ops->destroy_msg, -EPIPE);
-		rpci->nwriters = 0;
+		spin_lock(&pipe->lock);
+		need_release = pipe->nreaders != 0 || pipe->nwriters != 0;
+		pipe->nreaders = 0;
+		list_splice_init(&pipe->in_upcall, &free_list);
+		list_splice_init(&pipe->pipe, &free_list);
+		pipe->pipelen = 0;
+		pipe->ops = NULL;
+		spin_unlock(&pipe->lock);
+		rpc_purge_list(pipe, &free_list, ops->destroy_msg, -EPIPE);
+		pipe->nwriters = 0;
 		if (need_release && ops->release_pipe)
 			ops->release_pipe(inode);
-		cancel_delayed_work_sync(&rpci->queue_timeout);
+		cancel_delayed_work_sync(&pipe->queue_timeout);
 	}
 	rpc_inode_setowner(inode, NULL);
 	mutex_unlock(&inode->i_mutex);
@@ -207,6 +207,7 @@
 rpc_i_callback(struct rcu_head *head)
 {
 	struct inode *inode = container_of(head, struct inode, i_rcu);
+	kfree(RPC_I(inode)->pipe);
 	kmem_cache_free(rpc_inode_cachep, RPC_I(inode));
 }
 
@@ -224,18 +225,18 @@
 	int res = -ENXIO;
 
 	mutex_lock(&inode->i_mutex);
-	if (rpci->ops == NULL)
+	if (rpci->pipe->ops == NULL)
 		goto out;
-	first_open = rpci->nreaders == 0 && rpci->nwriters == 0;
-	if (first_open && rpci->ops->open_pipe) {
-		res = rpci->ops->open_pipe(inode);
+	first_open = rpci->pipe->nreaders == 0 && rpci->pipe->nwriters == 0;
+	if (first_open && rpci->pipe->ops->open_pipe) {
+		res = rpci->pipe->ops->open_pipe(inode);
 		if (res)
 			goto out;
 	}
 	if (filp->f_mode & FMODE_READ)
-		rpci->nreaders++;
+		rpci->pipe->nreaders++;
 	if (filp->f_mode & FMODE_WRITE)
-		rpci->nwriters++;
+		rpci->pipe->nwriters++;
 	res = 0;
 out:
 	mutex_unlock(&inode->i_mutex);
@@ -245,38 +246,38 @@
 static int
 rpc_pipe_release(struct inode *inode, struct file *filp)
 {
-	struct rpc_inode *rpci = RPC_I(inode);
+	struct rpc_pipe *pipe = RPC_I(inode)->pipe;
 	struct rpc_pipe_msg *msg;
 	int last_close;
 
 	mutex_lock(&inode->i_mutex);
-	if (rpci->ops == NULL)
+	if (pipe->ops == NULL)
 		goto out;
 	msg = filp->private_data;
 	if (msg != NULL) {
-		spin_lock(&rpci->lock);
+		spin_lock(&pipe->lock);
 		msg->errno = -EAGAIN;
 		list_del_init(&msg->list);
-		spin_unlock(&rpci->lock);
-		rpci->ops->destroy_msg(msg);
+		spin_unlock(&pipe->lock);
+		pipe->ops->destroy_msg(msg);
 	}
 	if (filp->f_mode & FMODE_WRITE)
-		rpci->nwriters --;
+		pipe->nwriters --;
 	if (filp->f_mode & FMODE_READ) {
-		rpci->nreaders --;
-		if (rpci->nreaders == 0) {
+		pipe->nreaders --;
+		if (pipe->nreaders == 0) {
 			LIST_HEAD(free_list);
-			spin_lock(&rpci->lock);
-			list_splice_init(&rpci->pipe, &free_list);
-			rpci->pipelen = 0;
-			spin_unlock(&rpci->lock);
-			rpc_purge_list(rpci, &free_list,
-					rpci->ops->destroy_msg, -EAGAIN);
+			spin_lock(&pipe->lock);
+			list_splice_init(&pipe->pipe, &free_list);
+			pipe->pipelen = 0;
+			spin_unlock(&pipe->lock);
+			rpc_purge_list(pipe, &free_list,
+					pipe->ops->destroy_msg, -EAGAIN);
 		}
 	}
-	last_close = rpci->nwriters == 0 && rpci->nreaders == 0;
-	if (last_close && rpci->ops->release_pipe)
-		rpci->ops->release_pipe(inode);
+	last_close = pipe->nwriters == 0 && pipe->nreaders == 0;
+	if (last_close && pipe->ops->release_pipe)
+		pipe->ops->release_pipe(inode);
 out:
 	mutex_unlock(&inode->i_mutex);
 	return 0;
@@ -291,34 +292,34 @@
 	int res = 0;
 
 	mutex_lock(&inode->i_mutex);
-	if (rpci->ops == NULL) {
+	if (rpci->pipe->ops == NULL) {
 		res = -EPIPE;
 		goto out_unlock;
 	}
 	msg = filp->private_data;
 	if (msg == NULL) {
-		spin_lock(&rpci->lock);
-		if (!list_empty(&rpci->pipe)) {
-			msg = list_entry(rpci->pipe.next,
+		spin_lock(&rpci->pipe->lock);
+		if (!list_empty(&rpci->pipe->pipe)) {
+			msg = list_entry(rpci->pipe->pipe.next,
 					struct rpc_pipe_msg,
 					list);
-			list_move(&msg->list, &rpci->in_upcall);
-			rpci->pipelen -= msg->len;
+			list_move(&msg->list, &rpci->pipe->in_upcall);
+			rpci->pipe->pipelen -= msg->len;
 			filp->private_data = msg;
 			msg->copied = 0;
 		}
-		spin_unlock(&rpci->lock);
+		spin_unlock(&rpci->pipe->lock);
 		if (msg == NULL)
 			goto out_unlock;
 	}
 	/* NOTE: it is up to the callback to update msg->copied */
-	res = rpci->ops->upcall(filp, msg, buf, len);
+	res = rpci->pipe->ops->upcall(filp, msg, buf, len);
 	if (res < 0 || msg->len == msg->copied) {
 		filp->private_data = NULL;
-		spin_lock(&rpci->lock);
+		spin_lock(&rpci->pipe->lock);
 		list_del_init(&msg->list);
-		spin_unlock(&rpci->lock);
-		rpci->ops->destroy_msg(msg);
+		spin_unlock(&rpci->pipe->lock);
+		rpci->pipe->ops->destroy_msg(msg);
 	}
 out_unlock:
 	mutex_unlock(&inode->i_mutex);
@@ -334,8 +335,8 @@
 
 	mutex_lock(&inode->i_mutex);
 	res = -EPIPE;
-	if (rpci->ops != NULL)
-		res = rpci->ops->downcall(filp, buf, len);
+	if (rpci->pipe->ops != NULL)
+		res = rpci->pipe->ops->downcall(filp, buf, len);
 	mutex_unlock(&inode->i_mutex);
 	return res;
 }
@@ -347,12 +348,12 @@
 	unsigned int mask = 0;
 
 	rpci = RPC_I(filp->f_path.dentry->d_inode);
-	poll_wait(filp, &rpci->waitq, wait);
+	poll_wait(filp, &rpci->pipe->waitq, wait);
 
 	mask = POLLOUT | POLLWRNORM;
-	if (rpci->ops == NULL)
+	if (rpci->pipe->ops == NULL)
 		mask |= POLLERR | POLLHUP;
-	if (filp->private_data || !list_empty(&rpci->pipe))
+	if (filp->private_data || !list_empty(&rpci->pipe->pipe))
 		mask |= POLLIN | POLLRDNORM;
 	return mask;
 }
@@ -366,18 +367,18 @@
 
 	switch (cmd) {
 	case FIONREAD:
-		spin_lock(&rpci->lock);
-		if (rpci->ops == NULL) {
-			spin_unlock(&rpci->lock);
+		spin_lock(&rpci->pipe->lock);
+		if (rpci->pipe->ops == NULL) {
+			spin_unlock(&rpci->pipe->lock);
 			return -EPIPE;
 		}
-		len = rpci->pipelen;
+		len = rpci->pipe->pipelen;
 		if (filp->private_data) {
 			struct rpc_pipe_msg *msg;
 			msg = filp->private_data;
 			len += msg->len - msg->copied;
 		}
-		spin_unlock(&rpci->lock);
+		spin_unlock(&rpci->pipe->lock);
 		return put_user(len, (int __user *)arg);
 	default:
 		return -EINVAL;
@@ -562,6 +563,23 @@
 	return 0;
 }
 
+static void
+init_pipe(struct rpc_pipe *pipe)
+{
+	pipe->nreaders = 0;
+	pipe->nwriters = 0;
+	INIT_LIST_HEAD(&pipe->in_upcall);
+	INIT_LIST_HEAD(&pipe->in_downcall);
+	INIT_LIST_HEAD(&pipe->pipe);
+	pipe->pipelen = 0;
+	init_waitqueue_head(&pipe->waitq);
+	INIT_DELAYED_WORK(&pipe->queue_timeout,
+			    rpc_timeout_upcall_queue);
+	pipe->ops = NULL;
+	spin_lock_init(&pipe->lock);
+
+}
+
 static int __rpc_mkpipe(struct inode *dir, struct dentry *dentry,
 			umode_t mode,
 			const struct file_operations *i_fop,
@@ -569,16 +587,24 @@
 			const struct rpc_pipe_ops *ops,
 			int flags)
 {
+	struct rpc_pipe *pipe;
 	struct rpc_inode *rpci;
 	int err;
 
+	pipe = kzalloc(sizeof(struct rpc_pipe), GFP_KERNEL);
+	if (!pipe)
+		return -ENOMEM;
+	init_pipe(pipe);
 	err = __rpc_create_common(dir, dentry, S_IFIFO | mode, i_fop, private);
-	if (err)
+	if (err) {
+		kfree(pipe);
 		return err;
+	}
 	rpci = RPC_I(dentry->d_inode);
 	rpci->private = private;
-	rpci->flags = flags;
-	rpci->ops = ops;
+	rpci->pipe = pipe;
+	rpci->pipe->flags = flags;
+	rpci->pipe->ops = ops;
 	fsnotify_create(dir, dentry);
 	return 0;
 }
@@ -1142,17 +1168,7 @@
 
 	inode_init_once(&rpci->vfs_inode);
 	rpci->private = NULL;
-	rpci->nreaders = 0;
-	rpci->nwriters = 0;
-	INIT_LIST_HEAD(&rpci->in_upcall);
-	INIT_LIST_HEAD(&rpci->in_downcall);
-	INIT_LIST_HEAD(&rpci->pipe);
-	rpci->pipelen = 0;
-	init_waitqueue_head(&rpci->waitq);
-	INIT_DELAYED_WORK(&rpci->queue_timeout,
-			    rpc_timeout_upcall_queue);
-	rpci->ops = NULL;
-	spin_lock_init(&rpci->lock);
+	rpci->pipe = NULL;
 }
 
 int register_rpc_pipefs(void)