CIFS: Add asynchronous write support through kernel AIO

This patch adds support for processing write calls passed by io_submit()
asynchronously. It is based on the previously introduced async context,
which allows i/o responses to be processed in a separate thread and
returns to the caller immediately for asynchronous calls.

This improves the write performance of single-threaded applications
as the i/o queue depth increases.

Signed-off-by: Pavel Shilovsky <pshilov@microsoft.com>
Signed-off-by: Steve French <smfrench@gmail.com>
diff --git a/fs/cifs/file.c b/fs/cifs/file.c
index 9564e2c..6ef78ad 100644
--- a/fs/cifs/file.c
+++ b/fs/cifs/file.c
@@ -2458,11 +2458,14 @@ cifs_uncached_writedata_release(struct kref *refcount)
 	struct cifs_writedata *wdata = container_of(refcount,
 					struct cifs_writedata, refcount);
 
+	kref_put(&wdata->ctx->refcount, cifs_aio_ctx_release);
 	for (i = 0; i < wdata->nr_pages; i++)
 		put_page(wdata->pages[i]);
 	cifs_writedata_release(refcount);
 }
 
+static void collect_uncached_write_data(struct cifs_aio_ctx *ctx);
+
 static void
 cifs_uncached_writev_complete(struct work_struct *work)
 {
@@ -2478,7 +2481,8 @@ cifs_uncached_writev_complete(struct work_struct *work)
 	spin_unlock(&inode->i_lock);
 
 	complete(&wdata->done);
-
+	collect_uncached_write_data(wdata->ctx);
+	/* the below call can possibly free the last ref to aio ctx */
 	kref_put(&wdata->refcount, cifs_uncached_writedata_release);
 }
 
@@ -2527,7 +2531,8 @@ wdata_fill_from_iovec(struct cifs_writedata *wdata, struct iov_iter *from,
 static int
 cifs_write_from_iter(loff_t offset, size_t len, struct iov_iter *from,
 		     struct cifsFileInfo *open_file,
-		     struct cifs_sb_info *cifs_sb, struct list_head *wdata_list)
+		     struct cifs_sb_info *cifs_sb, struct list_head *wdata_list,
+		     struct cifs_aio_ctx *ctx)
 {
 	int rc = 0;
 	size_t cur_len;
@@ -2595,6 +2600,8 @@ cifs_write_from_iter(loff_t offset, size_t len, struct iov_iter *from,
 		wdata->pagesz = PAGE_SIZE;
 		wdata->tailsz = cur_len - ((nr_pages - 1) * PAGE_SIZE);
 		wdata->credits = credits;
+		wdata->ctx = ctx;
+		kref_get(&ctx->refcount);
 
 		if (!wdata->cfile->invalidHandle ||
 		    !(rc = cifs_reopen_file(wdata->cfile, false)))
@@ -2620,15 +2627,95 @@ cifs_write_from_iter(loff_t offset, size_t len, struct iov_iter *from,
 	return rc;
 }
 
+static void collect_uncached_write_data(struct cifs_aio_ctx *ctx)
+{
+	struct cifs_writedata *wdata, *tmp;
+	struct cifs_tcon *tcon;
+	struct cifs_sb_info *cifs_sb;
+	struct dentry *dentry = ctx->cfile->dentry;
+	unsigned int i;
+	int rc;
+
+	tcon = tlink_tcon(ctx->cfile->tlink);
+	cifs_sb = CIFS_SB(dentry->d_sb);
+
+	mutex_lock(&ctx->aio_mutex);
+
+	if (list_empty(&ctx->list)) {
+		mutex_unlock(&ctx->aio_mutex);
+		return;
+	}
+
+	rc = ctx->rc;
+	/*
+	 * Wait for and collect replies for any successful sends in order of
+	 * increasing offset. Once an error is hit, then return without waiting
+	 * for any more replies.
+	 */
+restart_loop:
+	list_for_each_entry_safe(wdata, tmp, &ctx->list, list) {
+		if (!rc) {
+			if (!try_wait_for_completion(&wdata->done)) {
+				mutex_unlock(&ctx->aio_mutex);
+				return;
+			}
+
+			if (wdata->result)
+				rc = wdata->result;
+			else
+				ctx->total_len += wdata->bytes;
+
+			/* resend call if it's a retryable error */
+			if (rc == -EAGAIN) {
+				struct list_head tmp_list;
+				struct iov_iter tmp_from = ctx->iter;
+
+				INIT_LIST_HEAD(&tmp_list);
+				list_del_init(&wdata->list);
+
+				iov_iter_advance(&tmp_from,
+						 wdata->offset - ctx->pos);
+
+				rc = cifs_write_from_iter(wdata->offset,
+						wdata->bytes, &tmp_from,
+						ctx->cfile, cifs_sb, &tmp_list,
+						ctx);
+
+				list_splice(&tmp_list, &ctx->list);
+
+				kref_put(&wdata->refcount,
+					 cifs_uncached_writedata_release);
+				goto restart_loop;
+			}
+		}
+		list_del_init(&wdata->list);
+		kref_put(&wdata->refcount, cifs_uncached_writedata_release);
+	}
+
+	for (i = 0; i < ctx->npages; i++)
+		put_page(ctx->bv[i].bv_page);
+
+	cifs_stats_bytes_written(tcon, ctx->total_len);
+	set_bit(CIFS_INO_INVALID_MAPPING, &CIFS_I(dentry->d_inode)->flags);
+
+	ctx->rc = (rc == 0) ? ctx->total_len : rc;
+
+	mutex_unlock(&ctx->aio_mutex);
+
+	if (ctx->iocb && ctx->iocb->ki_complete)
+		ctx->iocb->ki_complete(ctx->iocb, ctx->rc, 0);
+	else
+		complete(&ctx->done);
+}
+
 ssize_t cifs_user_writev(struct kiocb *iocb, struct iov_iter *from)
 {
 	struct file *file = iocb->ki_filp;
 	ssize_t total_written = 0;
-	struct cifsFileInfo *open_file;
+	struct cifsFileInfo *cfile;
 	struct cifs_tcon *tcon;
 	struct cifs_sb_info *cifs_sb;
-	struct cifs_writedata *wdata, *tmp;
-	struct list_head wdata_list;
+	struct cifs_aio_ctx *ctx;
 	struct iov_iter saved_from = *from;
 	int rc;
 
@@ -2642,16 +2729,35 @@ ssize_t cifs_user_writev(struct kiocb *iocb, struct iov_iter *from)
 	if (rc <= 0)
 		return rc;
 
-	INIT_LIST_HEAD(&wdata_list);
 	cifs_sb = CIFS_FILE_SB(file);
-	open_file = file->private_data;
-	tcon = tlink_tcon(open_file->tlink);
+	cfile = file->private_data;
+	tcon = tlink_tcon(cfile->tlink);
 
 	if (!tcon->ses->server->ops->async_writev)
 		return -ENOSYS;
 
-	rc = cifs_write_from_iter(iocb->ki_pos, iov_iter_count(from), from,
-				  open_file, cifs_sb, &wdata_list);
+	ctx = cifs_aio_ctx_alloc();
+	if (!ctx)
+		return -ENOMEM;
+
+	ctx->cfile = cifsFileInfo_get(cfile);
+
+	if (!is_sync_kiocb(iocb))
+		ctx->iocb = iocb;
+
+	ctx->pos = iocb->ki_pos;
+
+	rc = setup_aio_ctx_iter(ctx, from, WRITE);
+	if (rc) {
+		kref_put(&ctx->refcount, cifs_aio_ctx_release);
+		return rc;
+	}
+
+	/* grab a lock here because write response handlers can access ctx */
+	mutex_lock(&ctx->aio_mutex);
+
+	rc = cifs_write_from_iter(iocb->ki_pos, ctx->len, &saved_from,
+				  cfile, cifs_sb, &ctx->list, ctx);
 
 	/*
 	 * If at least one write was successfully sent, then discard any rc
@@ -2659,58 +2765,38 @@ ssize_t cifs_user_writev(struct kiocb *iocb, struct iov_iter *from)
 	 * we'll end up returning whatever was written. If it fails, then
 	 * we'll get a new rc value from that.
 	 */
-	if (!list_empty(&wdata_list))
+	if (!list_empty(&ctx->list))
 		rc = 0;
 
-	/*
-	 * Wait for and collect replies for any successful sends in order of
-	 * increasing offset. Once an error is hit or we get a fatal signal
-	 * while waiting, then return without waiting for any more replies.
-	 */
-restart_loop:
-	list_for_each_entry_safe(wdata, tmp, &wdata_list, list) {
-		if (!rc) {
-			/* FIXME: freezable too? */
-			rc = wait_for_completion_killable(&wdata->done);
-			if (rc)
-				rc = -EINTR;
-			else if (wdata->result)
-				rc = wdata->result;
-			else
-				total_written += wdata->bytes;
+	mutex_unlock(&ctx->aio_mutex);
 
-			/* resend call if it's a retryable error */
-			if (rc == -EAGAIN) {
-				struct list_head tmp_list;
-				struct iov_iter tmp_from = saved_from;
-
-				INIT_LIST_HEAD(&tmp_list);
-				list_del_init(&wdata->list);
-
-				iov_iter_advance(&tmp_from,
-						 wdata->offset - iocb->ki_pos);
-
-				rc = cifs_write_from_iter(wdata->offset,
-						wdata->bytes, &tmp_from,
-						open_file, cifs_sb, &tmp_list);
-
-				list_splice(&tmp_list, &wdata_list);
-
-				kref_put(&wdata->refcount,
-					 cifs_uncached_writedata_release);
-				goto restart_loop;
-			}
-		}
-		list_del_init(&wdata->list);
-		kref_put(&wdata->refcount, cifs_uncached_writedata_release);
+	if (rc) {
+		kref_put(&ctx->refcount, cifs_aio_ctx_release);
+		return rc;
 	}
 
+	if (!is_sync_kiocb(iocb)) {
+		kref_put(&ctx->refcount, cifs_aio_ctx_release);
+		return -EIOCBQUEUED;
+	}
+
+	rc = wait_for_completion_killable(&ctx->done);
+	if (rc) {
+		mutex_lock(&ctx->aio_mutex);
+		ctx->rc = rc = -EINTR;
+		total_written = ctx->total_len;
+		mutex_unlock(&ctx->aio_mutex);
+	} else {
+		rc = ctx->rc;
+		total_written = ctx->total_len;
+	}
+
+	kref_put(&ctx->refcount, cifs_aio_ctx_release);
+
 	if (unlikely(!total_written))
 		return rc;
 
 	iocb->ki_pos += total_written;
-	set_bit(CIFS_INO_INVALID_MAPPING, &CIFS_I(file_inode(file))->flags);
-	cifs_stats_bytes_written(tcon, total_written);
 	return total_written;
 }