pipe: Use head and tail pointers for the ring, not cursor and length

Convert pipes to use head and tail pointers for the buffer ring rather than
pointer and length as the latter requires two atomic ops to update (or a
combined op) whereas the former only requires one.

 (1) The head pointer is the point at which production occurs and points to
     the slot in which the next buffer will be placed.  This is equivalent
     to pipe->curbuf + pipe->nrbufs.

     The head pointer belongs to the write-side.

 (2) The tail pointer is the point at which consumption occurs.  It points
     to the next slot to be consumed.  This is equivalent to pipe->curbuf.

     The tail pointer belongs to the read-side.

 (3) head and tail are allowed to run to UINT_MAX and wrap naturally.  They
     are only masked off when the array is being accessed, e.g.:

	pipe->bufs[head & mask]

     This means that it is not necessary to have a dead slot in the ring as
     head == tail isn't ambiguous.

 (4) The ring is empty if "head == tail".

     A helper, pipe_empty(), is provided for this.

 (5) The occupancy of the ring is "head - tail".

     A helper, pipe_occupancy(), is provided for this.

 (6) The number of free slots in the ring is "pipe->ring_size - occupancy".

     A helper, pipe_space_for_user() is provided to indicate how many slots
     userspace may use.

 (7) The ring is full if "head - tail >= pipe->ring_size".

     A helper, pipe_full(), is provided for this.

Signed-off-by: David Howells <dhowells@redhat.com>
diff --git a/fs/splice.c b/fs/splice.c
index 9841272..22b0a47 100644
--- a/fs/splice.c
+++ b/fs/splice.c
@@ -185,6 +185,9 @@ ssize_t splice_to_pipe(struct pipe_inode_info *pipe,
 		       struct splice_pipe_desc *spd)
 {
 	unsigned int spd_pages = spd->nr_pages;
+	unsigned int tail = pipe->tail;
+	unsigned int head = pipe->head;
+	unsigned int mask = pipe->ring_size - 1;
 	int ret = 0, page_nr = 0;
 
 	if (!spd_pages)
@@ -196,9 +199,8 @@ ssize_t splice_to_pipe(struct pipe_inode_info *pipe,
 		goto out;
 	}
 
-	while (pipe->nrbufs < pipe->buffers) {
-		int newbuf = (pipe->curbuf + pipe->nrbufs) & (pipe->buffers - 1);
-		struct pipe_buffer *buf = pipe->bufs + newbuf;
+	while (!pipe_full(head, tail, pipe->ring_size)) {
+		struct pipe_buffer *buf = &pipe->bufs[head & mask];
 
 		buf->page = spd->pages[page_nr];
 		buf->offset = spd->partial[page_nr].offset;
@@ -207,7 +209,8 @@ ssize_t splice_to_pipe(struct pipe_inode_info *pipe,
 		buf->ops = spd->ops;
 		buf->flags = 0;
 
-		pipe->nrbufs++;
+		head++;
+		pipe->head = head;
 		page_nr++;
 		ret += buf->len;
 
@@ -228,17 +231,19 @@ EXPORT_SYMBOL_GPL(splice_to_pipe);
 
 ssize_t add_to_pipe(struct pipe_inode_info *pipe, struct pipe_buffer *buf)
 {
+	unsigned int head = pipe->head;
+	unsigned int tail = pipe->tail;
+	unsigned int mask = pipe->ring_size - 1;
 	int ret;
 
 	if (unlikely(!pipe->readers)) {
 		send_sig(SIGPIPE, current, 0);
 		ret = -EPIPE;
-	} else if (pipe->nrbufs == pipe->buffers) {
+	} else if (pipe_full(head, tail, pipe->ring_size)) {
 		ret = -EAGAIN;
 	} else {
-		int newbuf = (pipe->curbuf + pipe->nrbufs) & (pipe->buffers - 1);
-		pipe->bufs[newbuf] = *buf;
-		pipe->nrbufs++;
+		pipe->bufs[head & mask] = *buf;
+		pipe->head = head + 1;
 		return buf->len;
 	}
 	pipe_buf_release(pipe, buf);
@@ -252,14 +257,14 @@ EXPORT_SYMBOL(add_to_pipe);
  */
 int splice_grow_spd(const struct pipe_inode_info *pipe, struct splice_pipe_desc *spd)
 {
-	unsigned int buffers = READ_ONCE(pipe->buffers);
+	unsigned int max_usage = READ_ONCE(pipe->ring_size);
 
-	spd->nr_pages_max = buffers;
-	if (buffers <= PIPE_DEF_BUFFERS)
+	spd->nr_pages_max = max_usage;
+	if (max_usage <= PIPE_DEF_BUFFERS)
 		return 0;
 
-	spd->pages = kmalloc_array(buffers, sizeof(struct page *), GFP_KERNEL);
-	spd->partial = kmalloc_array(buffers, sizeof(struct partial_page),
+	spd->pages = kmalloc_array(max_usage, sizeof(struct page *), GFP_KERNEL);
+	spd->partial = kmalloc_array(max_usage, sizeof(struct partial_page),
 				     GFP_KERNEL);
 
 	if (spd->pages && spd->partial)
@@ -298,10 +303,11 @@ ssize_t generic_file_splice_read(struct file *in, loff_t *ppos,
 {
 	struct iov_iter to;
 	struct kiocb kiocb;
-	int idx, ret;
+	unsigned int i_head;
+	int ret;
 
 	iov_iter_pipe(&to, READ, pipe, len);
-	idx = to.idx;
+	i_head = to.head;
 	init_sync_kiocb(&kiocb, in);
 	kiocb.ki_pos = *ppos;
 	ret = call_read_iter(in, &kiocb, &to);
@@ -309,7 +315,7 @@ ssize_t generic_file_splice_read(struct file *in, loff_t *ppos,
 		*ppos = kiocb.ki_pos;
 		file_accessed(in);
 	} else if (ret < 0) {
-		to.idx = idx;
+		to.head = i_head;
 		to.iov_offset = 0;
 		iov_iter_advance(&to, 0); /* to free what was emitted */
 		/*
@@ -370,11 +376,12 @@ static ssize_t default_file_splice_read(struct file *in, loff_t *ppos,
 	struct iov_iter to;
 	struct page **pages;
 	unsigned int nr_pages;
+	unsigned int mask;
 	size_t offset, base, copied = 0;
 	ssize_t res;
 	int i;
 
-	if (pipe->nrbufs == pipe->buffers)
+	if (pipe_full(pipe->head, pipe->tail, pipe->ring_size))
 		return -EAGAIN;
 
 	/*
@@ -400,8 +407,9 @@ static ssize_t default_file_splice_read(struct file *in, loff_t *ppos,
 		}
 	}
 
-	pipe->bufs[to.idx].offset = offset;
-	pipe->bufs[to.idx].len -= offset;
+	mask = pipe->ring_size - 1;
+	pipe->bufs[to.head & mask].offset = offset;
+	pipe->bufs[to.head & mask].len -= offset;
 
 	for (i = 0; i < nr_pages; i++) {
 		size_t this_len = min_t(size_t, len, PAGE_SIZE - offset);
@@ -443,7 +451,8 @@ static int pipe_to_sendpage(struct pipe_inode_info *pipe,
 
 	more = (sd->flags & SPLICE_F_MORE) ? MSG_MORE : 0;
 
-	if (sd->len < sd->total_len && pipe->nrbufs > 1)
+	if (sd->len < sd->total_len &&
+	    pipe_occupancy(pipe->head, pipe->tail) > 1)
 		more |= MSG_SENDPAGE_NOTLAST;
 
 	return file->f_op->sendpage(file, buf->page, buf->offset,
@@ -481,10 +490,13 @@ static void wakeup_pipe_writers(struct pipe_inode_info *pipe)
 static int splice_from_pipe_feed(struct pipe_inode_info *pipe, struct splice_desc *sd,
 			  splice_actor *actor)
 {
+	unsigned int head = pipe->head;
+	unsigned int tail = pipe->tail;
+	unsigned int mask = pipe->ring_size - 1;
 	int ret;
 
-	while (pipe->nrbufs) {
-		struct pipe_buffer *buf = pipe->bufs + pipe->curbuf;
+	while (!pipe_empty(tail, head)) {
+		struct pipe_buffer *buf = &pipe->bufs[tail & mask];
 
 		sd->len = buf->len;
 		if (sd->len > sd->total_len)
@@ -511,8 +523,8 @@ static int splice_from_pipe_feed(struct pipe_inode_info *pipe, struct splice_des
 
 		if (!buf->len) {
 			pipe_buf_release(pipe, buf);
-			pipe->curbuf = (pipe->curbuf + 1) & (pipe->buffers - 1);
-			pipe->nrbufs--;
+			tail++;
+			pipe->tail = tail;
 			if (pipe->files)
 				sd->need_wakeup = true;
 		}
@@ -543,7 +555,7 @@ static int splice_from_pipe_next(struct pipe_inode_info *pipe, struct splice_des
 	if (signal_pending(current))
 		return -ERESTARTSYS;
 
-	while (!pipe->nrbufs) {
+	while (pipe_empty(pipe->head, pipe->tail)) {
 		if (!pipe->writers)
 			return 0;
 
@@ -686,7 +698,7 @@ iter_file_splice_write(struct pipe_inode_info *pipe, struct file *out,
 		.pos = *ppos,
 		.u.file = out,
 	};
-	int nbufs = pipe->buffers;
+	int nbufs = pipe->ring_size;
 	struct bio_vec *array = kcalloc(nbufs, sizeof(struct bio_vec),
 					GFP_KERNEL);
 	ssize_t ret;
@@ -699,16 +711,19 @@ iter_file_splice_write(struct pipe_inode_info *pipe, struct file *out,
 	splice_from_pipe_begin(&sd);
 	while (sd.total_len) {
 		struct iov_iter from;
+		unsigned int head = pipe->head;
+		unsigned int tail = pipe->tail;
+		unsigned int mask = pipe->ring_size - 1;
 		size_t left;
-		int n, idx;
+		int n;
 
 		ret = splice_from_pipe_next(pipe, &sd);
 		if (ret <= 0)
 			break;
 
-		if (unlikely(nbufs < pipe->buffers)) {
+		if (unlikely(nbufs < pipe->ring_size)) {
 			kfree(array);
-			nbufs = pipe->buffers;
+			nbufs = pipe->ring_size;
 			array = kcalloc(nbufs, sizeof(struct bio_vec),
 					GFP_KERNEL);
 			if (!array) {
@@ -719,16 +734,13 @@ iter_file_splice_write(struct pipe_inode_info *pipe, struct file *out,
 
 		/* build the vector */
 		left = sd.total_len;
-		for (n = 0, idx = pipe->curbuf; left && n < pipe->nrbufs; n++, idx++) {
-			struct pipe_buffer *buf = pipe->bufs + idx;
+		for (n = 0; !pipe_empty(head, tail) && left && n < nbufs; tail++, n++) {
+			struct pipe_buffer *buf = &pipe->bufs[tail & mask];
 			size_t this_len = buf->len;
 
 			if (this_len > left)
 				this_len = left;
 
-			if (idx == pipe->buffers - 1)
-				idx = -1;
-
 			ret = pipe_buf_confirm(pipe, buf);
 			if (unlikely(ret)) {
 				if (ret == -ENODATA)
@@ -752,14 +764,15 @@ iter_file_splice_write(struct pipe_inode_info *pipe, struct file *out,
 		*ppos = sd.pos;
 
 		/* dismiss the fully eaten buffers, adjust the partial one */
+		tail = pipe->tail;
 		while (ret) {
-			struct pipe_buffer *buf = pipe->bufs + pipe->curbuf;
+			struct pipe_buffer *buf = &pipe->bufs[tail & mask];
 			if (ret >= buf->len) {
 				ret -= buf->len;
 				buf->len = 0;
 				pipe_buf_release(pipe, buf);
-				pipe->curbuf = (pipe->curbuf + 1) & (pipe->buffers - 1);
-				pipe->nrbufs--;
+				tail++;
+				pipe->tail = tail;
 				if (pipe->files)
 					sd.need_wakeup = true;
 			} else {
@@ -942,15 +955,17 @@ ssize_t splice_direct_to_actor(struct file *in, struct splice_desc *sd,
 	sd->flags &= ~SPLICE_F_NONBLOCK;
 	more = sd->flags & SPLICE_F_MORE;
 
-	WARN_ON_ONCE(pipe->nrbufs != 0);
+	WARN_ON_ONCE(!pipe_empty(pipe->head, pipe->tail));
 
 	while (len) {
+		unsigned int p_space;
 		size_t read_len;
 		loff_t pos = sd->pos, prev_pos = pos;
 
 		/* Don't try to read more the pipe has space for. */
-		read_len = min_t(size_t, len,
-				 (pipe->buffers - pipe->nrbufs) << PAGE_SHIFT);
+		p_space = pipe->ring_size -
+			pipe_occupancy(pipe->head, pipe->tail);
+		read_len = min_t(size_t, len, p_space << PAGE_SHIFT);
 		ret = do_splice_to(in, &pos, pipe, read_len, flags);
 		if (unlikely(ret <= 0))
 			goto out_release;
@@ -989,7 +1004,7 @@ ssize_t splice_direct_to_actor(struct file *in, struct splice_desc *sd,
 	}
 
 done:
-	pipe->nrbufs = pipe->curbuf = 0;
+	pipe->tail = pipe->head = 0;
 	file_accessed(in);
 	return bytes;
 
@@ -998,8 +1013,8 @@ ssize_t splice_direct_to_actor(struct file *in, struct splice_desc *sd,
 	 * If we did an incomplete transfer we must release
 	 * the pipe buffers in question:
 	 */
-	for (i = 0; i < pipe->buffers; i++) {
-		struct pipe_buffer *buf = pipe->bufs + i;
+	for (i = 0; i < pipe->ring_size; i++) {
+		struct pipe_buffer *buf = &pipe->bufs[i];
 
 		if (buf->ops)
 			pipe_buf_release(pipe, buf);
@@ -1075,7 +1090,7 @@ static int wait_for_space(struct pipe_inode_info *pipe, unsigned flags)
 			send_sig(SIGPIPE, current, 0);
 			return -EPIPE;
 		}
-		if (pipe->nrbufs != pipe->buffers)
+		if (!pipe_full(pipe->head, pipe->tail, pipe->ring_size))
 			return 0;
 		if (flags & SPLICE_F_NONBLOCK)
 			return -EAGAIN;
@@ -1442,16 +1457,16 @@ static int ipipe_prep(struct pipe_inode_info *pipe, unsigned int flags)
 	int ret;
 
 	/*
-	 * Check ->nrbufs without the inode lock first. This function
+	 * Check the pipe occupancy without the inode lock first. This function
 	 * is speculative anyways, so missing one is ok.
 	 */
-	if (pipe->nrbufs)
+	if (!pipe_empty(pipe->head, pipe->tail))
 		return 0;
 
 	ret = 0;
 	pipe_lock(pipe);
 
-	while (!pipe->nrbufs) {
+	while (pipe_empty(pipe->head, pipe->tail)) {
 		if (signal_pending(current)) {
 			ret = -ERESTARTSYS;
 			break;
@@ -1480,16 +1495,16 @@ static int opipe_prep(struct pipe_inode_info *pipe, unsigned int flags)
 	int ret;
 
 	/*
-	 * Check ->nrbufs without the inode lock first. This function
+	 * Check pipe occupancy without the inode lock first. This function
 	 * is speculative anyways, so missing one is ok.
 	 */
-	if (pipe->nrbufs < pipe->buffers)
+	if (pipe_full(pipe->head, pipe->tail, pipe->ring_size))
 		return 0;
 
 	ret = 0;
 	pipe_lock(pipe);
 
-	while (pipe->nrbufs >= pipe->buffers) {
+	while (pipe_full(pipe->head, pipe->tail, pipe->ring_size)) {
 		if (!pipe->readers) {
 			send_sig(SIGPIPE, current, 0);
 			ret = -EPIPE;
@@ -1520,7 +1535,10 @@ static int splice_pipe_to_pipe(struct pipe_inode_info *ipipe,
 			       size_t len, unsigned int flags)
 {
 	struct pipe_buffer *ibuf, *obuf;
-	int ret = 0, nbuf;
+	unsigned int i_head, o_head;
+	unsigned int i_tail, o_tail;
+	unsigned int i_mask, o_mask;
+	int ret = 0;
 	bool input_wakeup = false;
 
 
@@ -1540,7 +1558,14 @@ static int splice_pipe_to_pipe(struct pipe_inode_info *ipipe,
 	 */
 	pipe_double_lock(ipipe, opipe);
 
+	i_tail = ipipe->tail;
+	i_mask = ipipe->ring_size - 1;
+	o_head = opipe->head;
+	o_mask = opipe->ring_size - 1;
+
 	do {
+		size_t o_len;
+
 		if (!opipe->readers) {
 			send_sig(SIGPIPE, current, 0);
 			if (!ret)
@@ -1548,14 +1573,18 @@ static int splice_pipe_to_pipe(struct pipe_inode_info *ipipe,
 			break;
 		}
 
-		if (!ipipe->nrbufs && !ipipe->writers)
+		i_head = ipipe->head;
+		o_tail = opipe->tail;
+
+		if (pipe_empty(i_head, i_tail) && !ipipe->writers)
 			break;
 
 		/*
 		 * Cannot make any progress, because either the input
 		 * pipe is empty or the output pipe is full.
 		 */
-		if (!ipipe->nrbufs || opipe->nrbufs >= opipe->buffers) {
+		if (pipe_empty(i_head, i_tail) ||
+		    pipe_full(o_head, o_tail, opipe->ring_size)) {
 			/* Already processed some buffers, break */
 			if (ret)
 				break;
@@ -1575,9 +1604,8 @@ static int splice_pipe_to_pipe(struct pipe_inode_info *ipipe,
 			goto retry;
 		}
 
-		ibuf = ipipe->bufs + ipipe->curbuf;
-		nbuf = (opipe->curbuf + opipe->nrbufs) & (opipe->buffers - 1);
-		obuf = opipe->bufs + nbuf;
+		ibuf = &ipipe->bufs[i_tail & i_mask];
+		obuf = &opipe->bufs[o_head & o_mask];
 
 		if (len >= ibuf->len) {
 			/*
@@ -1585,10 +1613,12 @@ static int splice_pipe_to_pipe(struct pipe_inode_info *ipipe,
 			 */
 			*obuf = *ibuf;
 			ibuf->ops = NULL;
-			opipe->nrbufs++;
-			ipipe->curbuf = (ipipe->curbuf + 1) & (ipipe->buffers - 1);
-			ipipe->nrbufs--;
+			i_tail++;
+			ipipe->tail = i_tail;
 			input_wakeup = true;
+			o_len = obuf->len;
+			o_head++;
+			opipe->head = o_head;
 		} else {
 			/*
 			 * Get a reference to this pipe buffer,
@@ -1610,12 +1640,14 @@ static int splice_pipe_to_pipe(struct pipe_inode_info *ipipe,
 			pipe_buf_mark_unmergeable(obuf);
 
 			obuf->len = len;
-			opipe->nrbufs++;
-			ibuf->offset += obuf->len;
-			ibuf->len -= obuf->len;
+			ibuf->offset += len;
+			ibuf->len -= len;
+			o_len = len;
+			o_head++;
+			opipe->head = o_head;
 		}
-		ret += obuf->len;
-		len -= obuf->len;
+		ret += o_len;
+		len -= o_len;
 	} while (len);
 
 	pipe_unlock(ipipe);
@@ -1641,7 +1673,10 @@ static int link_pipe(struct pipe_inode_info *ipipe,
 		     size_t len, unsigned int flags)
 {
 	struct pipe_buffer *ibuf, *obuf;
-	int ret = 0, i = 0, nbuf;
+	unsigned int i_head, o_head;
+	unsigned int i_tail, o_tail;
+	unsigned int i_mask, o_mask;
+	int ret = 0;
 
 	/*
 	 * Potential ABBA deadlock, work around it by ordering lock
@@ -1650,6 +1685,11 @@ static int link_pipe(struct pipe_inode_info *ipipe,
 	 */
 	pipe_double_lock(ipipe, opipe);
 
+	i_tail = ipipe->tail;
+	i_mask = ipipe->ring_size - 1;
+	o_head = opipe->head;
+	o_mask = opipe->ring_size - 1;
+
 	do {
 		if (!opipe->readers) {
 			send_sig(SIGPIPE, current, 0);
@@ -1658,15 +1698,19 @@ static int link_pipe(struct pipe_inode_info *ipipe,
 			break;
 		}
 
+		i_head = ipipe->head;
+		o_tail = opipe->tail;
+
 		/*
-		 * If we have iterated all input buffers or ran out of
+		 * If we have iterated all input buffers or run out of
 		 * output room, break.
 		 */
-		if (i >= ipipe->nrbufs || opipe->nrbufs >= opipe->buffers)
+		if (pipe_empty(i_head, i_tail) ||
+		    pipe_full(o_head, o_tail, opipe->ring_size))
 			break;
 
-		ibuf = ipipe->bufs + ((ipipe->curbuf + i) & (ipipe->buffers-1));
-		nbuf = (opipe->curbuf + opipe->nrbufs) & (opipe->buffers - 1);
+		ibuf = &ipipe->bufs[i_tail & i_mask];
+		obuf = &opipe->bufs[o_head & o_mask];
 
 		/*
 		 * Get a reference to this pipe buffer,
@@ -1678,7 +1722,6 @@ static int link_pipe(struct pipe_inode_info *ipipe,
 			break;
 		}
 
-		obuf = opipe->bufs + nbuf;
 		*obuf = *ibuf;
 
 		/*
@@ -1691,11 +1734,12 @@ static int link_pipe(struct pipe_inode_info *ipipe,
 
 		if (obuf->len > len)
 			obuf->len = len;
-
-		opipe->nrbufs++;
 		ret += obuf->len;
 		len -= obuf->len;
-		i++;
+
+		o_head++;
+		opipe->head = o_head;
+		i_tail++;
 	} while (len);
 
 	/*