Merge tag 'notifications-pipe-prep-20191115' of git://git.kernel.org/pub/scm/linux/kernel/git/dhowells/linux-fs

Pull pipe rework from David Howells:
 "This is my set of preparatory patches for building a general
  notification queue on top of pipes. It makes a number of significant
  changes:

   - It removes the nr_exclusive argument from __wake_up_sync_key() as
     this is always 1. This prepares for the next step:

   - Adds wake_up_interruptible_sync_poll_locked() so that poll can be
     woken up from a function that's holding the poll waitqueue
     spinlock.

   - Change the pipe buffer ring to be managed in terms of unbounded
     head and tail indices rather than bounded index and length. This
     means that reading the pipe only needs to modify one index, not
     two.

   - A selection of helper functions are provided to query the state of
     the pipe buffer, plus a couple to apply updates to the pipe
     indices.

   - The pipe ring is allowed to have kernel-reserved slots. This allows
     many notification messages to be spliced in by the kernel without
     allowing userspace to pin too many pages if it writes to the same
     pipe.

   - Advance the head and tail indices inside the pipe waitqueue lock
     and use wake_up_interruptible_sync_poll_locked() to poke poll
     without having to take the lock twice.

   - Rearrange pipe_write() to preallocate the buffer it is going to
     write into and then drop the spinlock. This allows kernel
     notifications to then be added the ring whilst it is filling the
     buffer it allocated. The read side is stalled because the pipe
     mutex is still held.

   - Don't wake up readers on a pipe if there was already data in it
     when we added more.

   - Don't wake up writers on a pipe if the ring wasn't full before we
     removed a buffer"

* tag 'notifications-pipe-prep-20191115' of git://git.kernel.org/pub/scm/linux/kernel/git/dhowells/linux-fs:
  pipe: Remove sync on wake_ups
  pipe: Increase the writer-wakeup threshold to reduce context-switch count
  pipe: Check for ring full inside of the spinlock in pipe_write()
  pipe: Remove redundant wakeup from pipe_write()
  pipe: Rearrange sequence in pipe_write() to preallocate slot
  pipe: Conditionalise wakeup in pipe_read()
  pipe: Advance tail pointer inside of wait spinlock in pipe_read()
  pipe: Allow pipes to have kernel-reserved slots
  pipe: Use head and tail pointers for the ring, not cursor and length
  Add wake_up_interruptible_sync_poll_locked()
  Remove the nr_exclusive argument from __wake_up_sync_key()
  pipe: Reduce #inclusion of pipe_fs_i.h
diff --git a/fs/splice.c b/fs/splice.c
index e509239..f2400ce 100644
--- a/fs/splice.c
+++ b/fs/splice.c
@@ -185,6 +185,9 @@ ssize_t splice_to_pipe(struct pipe_inode_info *pipe,
 		       struct splice_pipe_desc *spd)
 {
 	unsigned int spd_pages = spd->nr_pages;
+	unsigned int tail = pipe->tail;
+	unsigned int head = pipe->head;
+	unsigned int mask = pipe->ring_size - 1;
 	int ret = 0, page_nr = 0;
 
 	if (!spd_pages)
@@ -196,9 +199,8 @@ ssize_t splice_to_pipe(struct pipe_inode_info *pipe,
 		goto out;
 	}
 
-	while (pipe->nrbufs < pipe->buffers) {
-		int newbuf = (pipe->curbuf + pipe->nrbufs) & (pipe->buffers - 1);
-		struct pipe_buffer *buf = pipe->bufs + newbuf;
+	while (!pipe_full(head, tail, pipe->max_usage)) {
+		struct pipe_buffer *buf = &pipe->bufs[head & mask];
 
 		buf->page = spd->pages[page_nr];
 		buf->offset = spd->partial[page_nr].offset;
@@ -207,7 +209,8 @@ ssize_t splice_to_pipe(struct pipe_inode_info *pipe,
 		buf->ops = spd->ops;
 		buf->flags = 0;
 
-		pipe->nrbufs++;
+		head++;
+		pipe->head = head;
 		page_nr++;
 		ret += buf->len;
 
@@ -228,17 +231,19 @@ EXPORT_SYMBOL_GPL(splice_to_pipe);
 
 ssize_t add_to_pipe(struct pipe_inode_info *pipe, struct pipe_buffer *buf)
 {
+	unsigned int head = pipe->head;
+	unsigned int tail = pipe->tail;
+	unsigned int mask = pipe->ring_size - 1;
 	int ret;
 
 	if (unlikely(!pipe->readers)) {
 		send_sig(SIGPIPE, current, 0);
 		ret = -EPIPE;
-	} else if (pipe->nrbufs == pipe->buffers) {
+	} else if (pipe_full(head, tail, pipe->max_usage)) {
 		ret = -EAGAIN;
 	} else {
-		int newbuf = (pipe->curbuf + pipe->nrbufs) & (pipe->buffers - 1);
-		pipe->bufs[newbuf] = *buf;
-		pipe->nrbufs++;
+		pipe->bufs[head & mask] = *buf;
+		pipe->head = head + 1;
 		return buf->len;
 	}
 	pipe_buf_release(pipe, buf);
@@ -252,14 +257,14 @@ EXPORT_SYMBOL(add_to_pipe);
  */
 int splice_grow_spd(const struct pipe_inode_info *pipe, struct splice_pipe_desc *spd)
 {
-	unsigned int buffers = READ_ONCE(pipe->buffers);
+	unsigned int max_usage = READ_ONCE(pipe->max_usage);
 
-	spd->nr_pages_max = buffers;
-	if (buffers <= PIPE_DEF_BUFFERS)
+	spd->nr_pages_max = max_usage;
+	if (max_usage <= PIPE_DEF_BUFFERS)
 		return 0;
 
-	spd->pages = kmalloc_array(buffers, sizeof(struct page *), GFP_KERNEL);
-	spd->partial = kmalloc_array(buffers, sizeof(struct partial_page),
+	spd->pages = kmalloc_array(max_usage, sizeof(struct page *), GFP_KERNEL);
+	spd->partial = kmalloc_array(max_usage, sizeof(struct partial_page),
 				     GFP_KERNEL);
 
 	if (spd->pages && spd->partial)
@@ -298,10 +303,11 @@ ssize_t generic_file_splice_read(struct file *in, loff_t *ppos,
 {
 	struct iov_iter to;
 	struct kiocb kiocb;
-	int idx, ret;
+	unsigned int i_head;
+	int ret;
 
 	iov_iter_pipe(&to, READ, pipe, len);
-	idx = to.idx;
+	i_head = to.head;
 	init_sync_kiocb(&kiocb, in);
 	kiocb.ki_pos = *ppos;
 	ret = call_read_iter(in, &kiocb, &to);
@@ -309,7 +315,7 @@ ssize_t generic_file_splice_read(struct file *in, loff_t *ppos,
 		*ppos = kiocb.ki_pos;
 		file_accessed(in);
 	} else if (ret < 0) {
-		to.idx = idx;
+		to.head = i_head;
 		to.iov_offset = 0;
 		iov_iter_advance(&to, 0); /* to free what was emitted */
 		/*
@@ -370,11 +376,12 @@ static ssize_t default_file_splice_read(struct file *in, loff_t *ppos,
 	struct iov_iter to;
 	struct page **pages;
 	unsigned int nr_pages;
+	unsigned int mask;
 	size_t offset, base, copied = 0;
 	ssize_t res;
 	int i;
 
-	if (pipe->nrbufs == pipe->buffers)
+	if (pipe_full(pipe->head, pipe->tail, pipe->max_usage))
 		return -EAGAIN;
 
 	/*
@@ -400,8 +407,9 @@ static ssize_t default_file_splice_read(struct file *in, loff_t *ppos,
 		}
 	}
 
-	pipe->bufs[to.idx].offset = offset;
-	pipe->bufs[to.idx].len -= offset;
+	mask = pipe->ring_size - 1;
+	pipe->bufs[to.head & mask].offset = offset;
+	pipe->bufs[to.head & mask].len -= offset;
 
 	for (i = 0; i < nr_pages; i++) {
 		size_t this_len = min_t(size_t, len, PAGE_SIZE - offset);
@@ -443,7 +451,8 @@ static int pipe_to_sendpage(struct pipe_inode_info *pipe,
 
 	more = (sd->flags & SPLICE_F_MORE) ? MSG_MORE : 0;
 
-	if (sd->len < sd->total_len && pipe->nrbufs > 1)
+	if (sd->len < sd->total_len &&
+	    pipe_occupancy(pipe->head, pipe->tail) > 1)
 		more |= MSG_SENDPAGE_NOTLAST;
 
 	return file->f_op->sendpage(file, buf->page, buf->offset,
@@ -481,10 +490,13 @@ static void wakeup_pipe_writers(struct pipe_inode_info *pipe)
 static int splice_from_pipe_feed(struct pipe_inode_info *pipe, struct splice_desc *sd,
 			  splice_actor *actor)
 {
+	unsigned int head = pipe->head;
+	unsigned int tail = pipe->tail;
+	unsigned int mask = pipe->ring_size - 1;
 	int ret;
 
-	while (pipe->nrbufs) {
-		struct pipe_buffer *buf = pipe->bufs + pipe->curbuf;
+	while (!pipe_empty(tail, head)) {
+		struct pipe_buffer *buf = &pipe->bufs[tail & mask];
 
 		sd->len = buf->len;
 		if (sd->len > sd->total_len)
@@ -511,8 +523,8 @@ static int splice_from_pipe_feed(struct pipe_inode_info *pipe, struct splice_des
 
 		if (!buf->len) {
 			pipe_buf_release(pipe, buf);
-			pipe->curbuf = (pipe->curbuf + 1) & (pipe->buffers - 1);
-			pipe->nrbufs--;
+			tail++;
+			pipe->tail = tail;
 			if (pipe->files)
 				sd->need_wakeup = true;
 		}
@@ -543,7 +555,7 @@ static int splice_from_pipe_next(struct pipe_inode_info *pipe, struct splice_des
 	if (signal_pending(current))
 		return -ERESTARTSYS;
 
-	while (!pipe->nrbufs) {
+	while (pipe_empty(pipe->head, pipe->tail)) {
 		if (!pipe->writers)
 			return 0;
 
@@ -686,7 +698,7 @@ iter_file_splice_write(struct pipe_inode_info *pipe, struct file *out,
 		.pos = *ppos,
 		.u.file = out,
 	};
-	int nbufs = pipe->buffers;
+	int nbufs = pipe->max_usage;
 	struct bio_vec *array = kcalloc(nbufs, sizeof(struct bio_vec),
 					GFP_KERNEL);
 	ssize_t ret;
@@ -699,16 +711,19 @@ iter_file_splice_write(struct pipe_inode_info *pipe, struct file *out,
 	splice_from_pipe_begin(&sd);
 	while (sd.total_len) {
 		struct iov_iter from;
+		unsigned int head = pipe->head;
+		unsigned int tail = pipe->tail;
+		unsigned int mask = pipe->ring_size - 1;
 		size_t left;
-		int n, idx;
+		int n;
 
 		ret = splice_from_pipe_next(pipe, &sd);
 		if (ret <= 0)
 			break;
 
-		if (unlikely(nbufs < pipe->buffers)) {
+		if (unlikely(nbufs < pipe->max_usage)) {
 			kfree(array);
-			nbufs = pipe->buffers;
+			nbufs = pipe->max_usage;
 			array = kcalloc(nbufs, sizeof(struct bio_vec),
 					GFP_KERNEL);
 			if (!array) {
@@ -719,16 +734,13 @@ iter_file_splice_write(struct pipe_inode_info *pipe, struct file *out,
 
 		/* build the vector */
 		left = sd.total_len;
-		for (n = 0, idx = pipe->curbuf; left && n < pipe->nrbufs; n++, idx++) {
-			struct pipe_buffer *buf = pipe->bufs + idx;
+		for (n = 0; !pipe_empty(head, tail) && left && n < nbufs; tail++, n++) {
+			struct pipe_buffer *buf = &pipe->bufs[tail & mask];
 			size_t this_len = buf->len;
 
 			if (this_len > left)
 				this_len = left;
 
-			if (idx == pipe->buffers - 1)
-				idx = -1;
-
 			ret = pipe_buf_confirm(pipe, buf);
 			if (unlikely(ret)) {
 				if (ret == -ENODATA)
@@ -752,14 +764,15 @@ iter_file_splice_write(struct pipe_inode_info *pipe, struct file *out,
 		*ppos = sd.pos;
 
 		/* dismiss the fully eaten buffers, adjust the partial one */
+		tail = pipe->tail;
 		while (ret) {
-			struct pipe_buffer *buf = pipe->bufs + pipe->curbuf;
+			struct pipe_buffer *buf = &pipe->bufs[tail & mask];
 			if (ret >= buf->len) {
 				ret -= buf->len;
 				buf->len = 0;
 				pipe_buf_release(pipe, buf);
-				pipe->curbuf = (pipe->curbuf + 1) & (pipe->buffers - 1);
-				pipe->nrbufs--;
+				tail++;
+				pipe->tail = tail;
 				if (pipe->files)
 					sd.need_wakeup = true;
 			} else {
@@ -942,16 +955,17 @@ ssize_t splice_direct_to_actor(struct file *in, struct splice_desc *sd,
 	sd->flags &= ~SPLICE_F_NONBLOCK;
 	more = sd->flags & SPLICE_F_MORE;
 
-	WARN_ON_ONCE(pipe->nrbufs != 0);
+	WARN_ON_ONCE(!pipe_empty(pipe->head, pipe->tail));
 
 	while (len) {
-		unsigned int pipe_pages;
+		unsigned int p_space;
 		size_t read_len;
 		loff_t pos = sd->pos, prev_pos = pos;
 
 		/* Don't try to read more the pipe has space for. */
-		pipe_pages = pipe->buffers - pipe->nrbufs;
-		read_len = min(len, (size_t)pipe_pages << PAGE_SHIFT);
+		p_space = pipe->max_usage -
+			pipe_occupancy(pipe->head, pipe->tail);
+		read_len = min_t(size_t, len, p_space << PAGE_SHIFT);
 		ret = do_splice_to(in, &pos, pipe, read_len, flags);
 		if (unlikely(ret <= 0))
 			goto out_release;
@@ -990,7 +1004,7 @@ ssize_t splice_direct_to_actor(struct file *in, struct splice_desc *sd,
 	}
 
 done:
-	pipe->nrbufs = pipe->curbuf = 0;
+	pipe->tail = pipe->head = 0;
 	file_accessed(in);
 	return bytes;
 
@@ -999,8 +1013,8 @@ ssize_t splice_direct_to_actor(struct file *in, struct splice_desc *sd,
 	 * If we did an incomplete transfer we must release
 	 * the pipe buffers in question:
 	 */
-	for (i = 0; i < pipe->buffers; i++) {
-		struct pipe_buffer *buf = pipe->bufs + i;
+	for (i = 0; i < pipe->ring_size; i++) {
+		struct pipe_buffer *buf = &pipe->bufs[i];
 
 		if (buf->ops)
 			pipe_buf_release(pipe, buf);
@@ -1076,7 +1090,7 @@ static int wait_for_space(struct pipe_inode_info *pipe, unsigned flags)
 			send_sig(SIGPIPE, current, 0);
 			return -EPIPE;
 		}
-		if (pipe->nrbufs != pipe->buffers)
+		if (!pipe_full(pipe->head, pipe->tail, pipe->max_usage))
 			return 0;
 		if (flags & SPLICE_F_NONBLOCK)
 			return -EAGAIN;
@@ -1182,11 +1196,11 @@ static long do_splice(struct file *in, loff_t __user *off_in,
 		pipe_lock(opipe);
 		ret = wait_for_space(opipe, flags);
 		if (!ret) {
-			unsigned int pipe_pages;
+			unsigned int p_space;
 
 			/* Don't try to read more the pipe has space for. */
-			pipe_pages = opipe->buffers - opipe->nrbufs;
-			len = min(len, (size_t)pipe_pages << PAGE_SHIFT);
+			p_space = opipe->max_usage - pipe_occupancy(opipe->head, opipe->tail);
+			len = min_t(size_t, len, p_space << PAGE_SHIFT);
 
 			ret = do_splice_to(in, &offset, opipe, len, flags);
 		}
@@ -1450,16 +1464,16 @@ static int ipipe_prep(struct pipe_inode_info *pipe, unsigned int flags)
 	int ret;
 
 	/*
-	 * Check ->nrbufs without the inode lock first. This function
+	 * Check the pipe occupancy without the inode lock first. This function
 	 * is speculative anyways, so missing one is ok.
 	 */
-	if (pipe->nrbufs)
+	if (!pipe_empty(pipe->head, pipe->tail))
 		return 0;
 
 	ret = 0;
 	pipe_lock(pipe);
 
-	while (!pipe->nrbufs) {
+	while (pipe_empty(pipe->head, pipe->tail)) {
 		if (signal_pending(current)) {
 			ret = -ERESTARTSYS;
 			break;
@@ -1488,16 +1502,16 @@ static int opipe_prep(struct pipe_inode_info *pipe, unsigned int flags)
 	int ret;
 
 	/*
-	 * Check ->nrbufs without the inode lock first. This function
+	 * Check pipe occupancy without the inode lock first. This function
 	 * is speculative anyways, so missing one is ok.
 	 */
-	if (pipe->nrbufs < pipe->buffers)
+	if (pipe_full(pipe->head, pipe->tail, pipe->max_usage))
 		return 0;
 
 	ret = 0;
 	pipe_lock(pipe);
 
-	while (pipe->nrbufs >= pipe->buffers) {
+	while (pipe_full(pipe->head, pipe->tail, pipe->max_usage)) {
 		if (!pipe->readers) {
 			send_sig(SIGPIPE, current, 0);
 			ret = -EPIPE;
@@ -1528,7 +1542,10 @@ static int splice_pipe_to_pipe(struct pipe_inode_info *ipipe,
 			       size_t len, unsigned int flags)
 {
 	struct pipe_buffer *ibuf, *obuf;
-	int ret = 0, nbuf;
+	unsigned int i_head, o_head;
+	unsigned int i_tail, o_tail;
+	unsigned int i_mask, o_mask;
+	int ret = 0;
 	bool input_wakeup = false;
 
 
@@ -1548,7 +1565,14 @@ static int splice_pipe_to_pipe(struct pipe_inode_info *ipipe,
 	 */
 	pipe_double_lock(ipipe, opipe);
 
+	i_tail = ipipe->tail;
+	i_mask = ipipe->ring_size - 1;
+	o_head = opipe->head;
+	o_mask = opipe->ring_size - 1;
+
 	do {
+		size_t o_len;
+
 		if (!opipe->readers) {
 			send_sig(SIGPIPE, current, 0);
 			if (!ret)
@@ -1556,14 +1580,18 @@ static int splice_pipe_to_pipe(struct pipe_inode_info *ipipe,
 			break;
 		}
 
-		if (!ipipe->nrbufs && !ipipe->writers)
+		i_head = ipipe->head;
+		o_tail = opipe->tail;
+
+		if (pipe_empty(i_head, i_tail) && !ipipe->writers)
 			break;
 
 		/*
 		 * Cannot make any progress, because either the input
 		 * pipe is empty or the output pipe is full.
 		 */
-		if (!ipipe->nrbufs || opipe->nrbufs >= opipe->buffers) {
+		if (pipe_empty(i_head, i_tail) ||
+		    pipe_full(o_head, o_tail, opipe->max_usage)) {
 			/* Already processed some buffers, break */
 			if (ret)
 				break;
@@ -1583,9 +1611,8 @@ static int splice_pipe_to_pipe(struct pipe_inode_info *ipipe,
 			goto retry;
 		}
 
-		ibuf = ipipe->bufs + ipipe->curbuf;
-		nbuf = (opipe->curbuf + opipe->nrbufs) & (opipe->buffers - 1);
-		obuf = opipe->bufs + nbuf;
+		ibuf = &ipipe->bufs[i_tail & i_mask];
+		obuf = &opipe->bufs[o_head & o_mask];
 
 		if (len >= ibuf->len) {
 			/*
@@ -1593,10 +1620,12 @@ static int splice_pipe_to_pipe(struct pipe_inode_info *ipipe,
 			 */
 			*obuf = *ibuf;
 			ibuf->ops = NULL;
-			opipe->nrbufs++;
-			ipipe->curbuf = (ipipe->curbuf + 1) & (ipipe->buffers - 1);
-			ipipe->nrbufs--;
+			i_tail++;
+			ipipe->tail = i_tail;
 			input_wakeup = true;
+			o_len = obuf->len;
+			o_head++;
+			opipe->head = o_head;
 		} else {
 			/*
 			 * Get a reference to this pipe buffer,
@@ -1618,12 +1647,14 @@ static int splice_pipe_to_pipe(struct pipe_inode_info *ipipe,
 			pipe_buf_mark_unmergeable(obuf);
 
 			obuf->len = len;
-			opipe->nrbufs++;
-			ibuf->offset += obuf->len;
-			ibuf->len -= obuf->len;
+			ibuf->offset += len;
+			ibuf->len -= len;
+			o_len = len;
+			o_head++;
+			opipe->head = o_head;
 		}
-		ret += obuf->len;
-		len -= obuf->len;
+		ret += o_len;
+		len -= o_len;
 	} while (len);
 
 	pipe_unlock(ipipe);
@@ -1649,7 +1680,10 @@ static int link_pipe(struct pipe_inode_info *ipipe,
 		     size_t len, unsigned int flags)
 {
 	struct pipe_buffer *ibuf, *obuf;
-	int ret = 0, i = 0, nbuf;
+	unsigned int i_head, o_head;
+	unsigned int i_tail, o_tail;
+	unsigned int i_mask, o_mask;
+	int ret = 0;
 
 	/*
 	 * Potential ABBA deadlock, work around it by ordering lock
@@ -1658,6 +1692,11 @@ static int link_pipe(struct pipe_inode_info *ipipe,
 	 */
 	pipe_double_lock(ipipe, opipe);
 
+	i_tail = ipipe->tail;
+	i_mask = ipipe->ring_size - 1;
+	o_head = opipe->head;
+	o_mask = opipe->ring_size - 1;
+
 	do {
 		if (!opipe->readers) {
 			send_sig(SIGPIPE, current, 0);
@@ -1666,15 +1705,19 @@ static int link_pipe(struct pipe_inode_info *ipipe,
 			break;
 		}
 
+		i_head = ipipe->head;
+		o_tail = opipe->tail;
+
 		/*
-		 * If we have iterated all input buffers or ran out of
+		 * If we have iterated all input buffers or run out of
 		 * output room, break.
 		 */
-		if (i >= ipipe->nrbufs || opipe->nrbufs >= opipe->buffers)
+		if (pipe_empty(i_head, i_tail) ||
+		    pipe_full(o_head, o_tail, opipe->max_usage))
 			break;
 
-		ibuf = ipipe->bufs + ((ipipe->curbuf + i) & (ipipe->buffers-1));
-		nbuf = (opipe->curbuf + opipe->nrbufs) & (opipe->buffers - 1);
+		ibuf = &ipipe->bufs[i_tail & i_mask];
+		obuf = &opipe->bufs[o_head & o_mask];
 
 		/*
 		 * Get a reference to this pipe buffer,
@@ -1686,7 +1729,6 @@ static int link_pipe(struct pipe_inode_info *ipipe,
 			break;
 		}
 
-		obuf = opipe->bufs + nbuf;
 		*obuf = *ibuf;
 
 		/*
@@ -1699,11 +1741,12 @@ static int link_pipe(struct pipe_inode_info *ipipe,
 
 		if (obuf->len > len)
 			obuf->len = len;
-
-		opipe->nrbufs++;
 		ret += obuf->len;
 		len -= obuf->len;
-		i++;
+
+		o_head++;
+		opipe->head = o_head;
+		i_tail++;
 	} while (len);
 
 	/*