NFS: merge _full and _partial write rpc_ops

Decouple nfs_pgio_header and nfs_write_data, and have (possibly
multiple) nfs_write_datas each take a refcount on nfs_pgio_header.

For the moment keeps nfs_write_header as a way to preallocate a single
nfs_write_data with the nfs_pgio_header.  The code doesn't need this,
and would be prettier without, but given the amount of churn I am
already introducing I didn't want to play with tuning new mempools.

This also fixes bug in pnfs_ld_handle_write_error.  In the case of
desc->pg_bsize < PAGE_CACHE_SIZE, the pages list was empty, causing
replay attempt to do nothing.

Signed-off-by: Fred Isaman <iisaman@netapp.com>
Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
diff --git a/fs/nfs/pnfs.c b/fs/nfs/pnfs.c
index d1a91db..d515f00 100644
--- a/fs/nfs/pnfs.c
+++ b/fs/nfs/pnfs.c
@@ -1199,7 +1199,9 @@
 		clear_bit(NFS_INO_LAYOUTCOMMIT, &NFS_I(hdr->inode)->flags);
 		pnfs_return_layout(hdr->inode);
 	}
-	data->task.tk_status = pnfs_write_done_resend_to_mds(hdr->inode, &hdr->pages);
+	if (!test_and_set_bit(NFS_IOHDR_REDO, &hdr->flags))
+		data->task.tk_status = pnfs_write_done_resend_to_mds(hdr->inode,
+								&hdr->pages);
 }
 
 /*
@@ -1214,7 +1216,6 @@
 		hdr->mds_ops->rpc_call_done(&data->task, data);
 	} else
 		pnfs_ld_handle_write_error(data);
-	put_lseg(hdr->lseg);
 	hdr->mds_ops->rpc_release(data);
 }
 EXPORT_SYMBOL_GPL(pnfs_ld_write_done);
@@ -1225,12 +1226,11 @@
 {
 	struct nfs_pgio_header *hdr = data->header;
 
-	list_splice_tail_init(&hdr->pages, &desc->pg_list);
-	if (hdr->req && list_empty(&hdr->req->wb_list))
-		nfs_list_add_request(hdr->req, &desc->pg_list);
-	nfs_pageio_reset_write_mds(desc);
-	desc->pg_recoalesce = 1;
-	put_lseg(hdr->lseg);
+	if (!test_and_set_bit(NFS_IOHDR_REDO, &hdr->flags)) {
+		list_splice_tail_init(&hdr->pages, &desc->pg_list);
+		nfs_pageio_reset_write_mds(desc);
+		desc->pg_recoalesce = 1;
+	}
 	nfs_writedata_release(data);
 }
 
@@ -1246,18 +1246,12 @@
 	struct nfs_server *nfss = NFS_SERVER(inode);
 
 	hdr->mds_ops = call_ops;
-	hdr->lseg = get_lseg(lseg);
 
 	dprintk("%s: Writing ino:%lu %u@%llu (how %d)\n", __func__,
 		inode->i_ino, wdata->args.count, wdata->args.offset, how);
-
 	trypnfs = nfss->pnfs_curr_ld->write_pagelist(wdata, how);
-	if (trypnfs == PNFS_NOT_ATTEMPTED) {
-		put_lseg(hdr->lseg);
-		hdr->lseg = NULL;
-	} else
+	if (trypnfs != PNFS_NOT_ATTEMPTED)
 		nfs_inc_stats(inode, NFSIOS_PNFS_WRITE);
-
 	dprintk("%s End (trypnfs:%d)\n", __func__, trypnfs);
 	return trypnfs;
 }
@@ -1273,7 +1267,7 @@
 	while (!list_empty(head)) {
 		enum pnfs_try_status trypnfs;
 
-		data = list_entry(head->next, struct nfs_write_data, list);
+		data = list_first_entry(head, struct nfs_write_data, list);
 		list_del_init(&data->list);
 
 		trypnfs = pnfs_try_to_write_data(data, call_ops, lseg, how);
@@ -1283,20 +1277,40 @@
 	put_lseg(lseg);
 }
 
+static void pnfs_writehdr_free(struct nfs_pgio_header *hdr)
+{
+	put_lseg(hdr->lseg);
+	nfs_writehdr_free(hdr);
+}
+
 int
 pnfs_generic_pg_writepages(struct nfs_pageio_descriptor *desc)
 {
-	LIST_HEAD(head);
+	struct nfs_write_header *whdr;
+	struct nfs_pgio_header *hdr;
 	int ret;
 
-	ret = nfs_generic_flush(desc, &head);
+	whdr = nfs_writehdr_alloc();
+	if (!whdr) {
+		nfs_async_write_error(&desc->pg_list);
+		put_lseg(desc->pg_lseg);
+		desc->pg_lseg = NULL;
+		return -ENOMEM;
+	}
+	hdr = &whdr->header;
+	nfs_pgheader_init(desc, hdr, pnfs_writehdr_free);
+	hdr->lseg = get_lseg(desc->pg_lseg);
+	atomic_inc(&hdr->refcnt);
+	ret = nfs_generic_flush(desc, hdr);
 	if (ret != 0) {
 		put_lseg(desc->pg_lseg);
 		desc->pg_lseg = NULL;
-		return ret;
-	}
-	pnfs_do_multiple_writes(desc, &head, desc->pg_ioflags);
-	return 0;
+		set_bit(NFS_IOHDR_REDO, &hdr->flags);
+	} else
+		pnfs_do_multiple_writes(desc, &hdr->rpc_list, desc->pg_ioflags);
+	if (atomic_dec_and_test(&hdr->refcnt))
+		nfs_write_completion(hdr);
+	return ret;
 }
 EXPORT_SYMBOL_GPL(pnfs_generic_pg_writepages);