pnfs: support multiple verfs per direct req

Support direct requests that span multiple pnfs data servers by
comparing nfs_pgio_header->verf to a cached verf in pnfs_commit_bucket.
Continue to use dreq->verf if the MDS is used / non-pNFS.

Signed-off-by: Weston Andros Adamson <dros@primarydata.com>
Signed-off-by: Trond Myklebust <trond.myklebust@primarydata.com>
diff --git a/fs/nfs/direct.c b/fs/nfs/direct.c
index 2c0e08f..4ad7bc3 100644
--- a/fs/nfs/direct.c
+++ b/fs/nfs/direct.c
@@ -108,6 +108,97 @@
 	return atomic_dec_and_test(&dreq->io_count);
 }
 
+/*
+ * nfs_direct_select_verf - select the right verifier
+ * @dreq - direct request possibly spanning multiple servers
+ * @ds_clp - nfs_client of data server or NULL if MDS / non-pnfs
+ * @ds_idx - index of data server in data server list, only valid if ds_clp set
+ *
+ * returns the correct verifier to use given the role of the server
+ */
+static struct nfs_writeverf *
+nfs_direct_select_verf(struct nfs_direct_req *dreq,
+		       struct nfs_client *ds_clp,
+		       int ds_idx)
+{
+	struct nfs_writeverf *verfp = &dreq->verf;
+
+#ifdef CONFIG_NFS_V4_1
+	if (ds_clp) {
+		/* pNFS is in use, use the DS verf */
+		if (ds_idx >= 0 && ds_idx < dreq->ds_cinfo.nbuckets)
+			verfp = &dreq->ds_cinfo.buckets[ds_idx].direct_verf;
+		else
+			WARN_ON_ONCE(1);
+	}
+#endif
+	return verfp;
+}
+
+
+/*
+ * nfs_direct_set_hdr_verf - set the write/commit verifier
+ * @dreq - direct request possibly spanning multiple servers
+ * @hdr - pageio header to validate against previously seen verfs
+ *
+ * Set the server's (MDS or DS) "seen" verifier
+ */
+static void nfs_direct_set_hdr_verf(struct nfs_direct_req *dreq,
+				    struct nfs_pgio_header *hdr)
+{
+	struct nfs_writeverf *verfp;
+
+	verfp = nfs_direct_select_verf(dreq, hdr->data->ds_clp,
+				      hdr->data->ds_idx);
+	WARN_ON_ONCE(verfp->committed >= 0);
+	memcpy(verfp, &hdr->verf, sizeof(struct nfs_writeverf));
+	WARN_ON_ONCE(verfp->committed < 0);
+}
+
+/*
+ * nfs_direct_cmp_hdr_verf - compare verifier for pgio header
+ * @dreq - direct request possibly spanning multiple servers
+ * @hdr - pageio header to validate against previously seen verf
+ *
+ * set the server's "seen" verf if not initialized.
+ * returns result of comparison between @hdr->verf and the "seen"
+ * verf of the server used by @hdr (DS or MDS)
+ */
+static int nfs_direct_set_or_cmp_hdr_verf(struct nfs_direct_req *dreq,
+					  struct nfs_pgio_header *hdr)
+{
+	struct nfs_writeverf *verfp;
+
+	verfp = nfs_direct_select_verf(dreq, hdr->data->ds_clp,
+					 hdr->data->ds_idx);
+	if (verfp->committed < 0) {
+		nfs_direct_set_hdr_verf(dreq, hdr);
+		return 0;
+	}
+	return memcmp(verfp, &hdr->verf, sizeof(struct nfs_writeverf));
+}
+
+#if IS_ENABLED(CONFIG_NFS_V3) || IS_ENABLED(CONFIG_NFS_V4)
+/*
+ * nfs_direct_cmp_commit_data_verf - compare verifier for commit data
+ * @dreq - direct request possibly spanning multiple servers
+ * @data - commit data to validate against previously seen verf
+ *
+ * returns result of comparison between @data->verf and the verf of
+ * the server used by @data (DS or MDS)
+ */
+static int nfs_direct_cmp_commit_data_verf(struct nfs_direct_req *dreq,
+					   struct nfs_commit_data *data)
+{
+	struct nfs_writeverf *verfp;
+
+	verfp = nfs_direct_select_verf(dreq, data->ds_clp,
+					 data->ds_commit_index);
+	WARN_ON_ONCE(verfp->committed < 0);
+	return memcmp(verfp, &data->verf, sizeof(struct nfs_writeverf));
+}
+#endif
+
 /**
  * nfs_direct_IO - NFS address space operation for direct I/O
  * @rw: direction (read or write)
@@ -168,6 +259,7 @@
 	kref_get(&dreq->kref);
 	init_completion(&dreq->completion);
 	INIT_LIST_HEAD(&dreq->mds_cinfo.list);
+	dreq->verf.committed = NFS_INVALID_STABLE_HOW;	/* not set yet */
 	INIT_WORK(&dreq->work, nfs_direct_write_schedule_work);
 	spin_lock_init(&dreq->lock);
 
@@ -602,7 +694,7 @@
 		dprintk("NFS: %5u commit failed with error %d.\n",
 			data->task.tk_pid, status);
 		dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
-	} else if (memcmp(&dreq->verf, &data->verf, sizeof(data->verf))) {
+	} else if (nfs_direct_cmp_commit_data_verf(dreq, data)) {
 		dprintk("NFS: %5u commit verify failed\n", data->task.tk_pid);
 		dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
 	}
@@ -811,13 +903,13 @@
 			if (dreq->flags == NFS_ODIRECT_RESCHED_WRITES)
 				bit = NFS_IOHDR_NEED_RESCHED;
 			else if (dreq->flags == 0) {
-				memcpy(&dreq->verf, &hdr->verf,
-				       sizeof(dreq->verf));
+				nfs_direct_set_hdr_verf(dreq, hdr);
 				bit = NFS_IOHDR_NEED_COMMIT;
 				dreq->flags = NFS_ODIRECT_DO_COMMIT;
 			} else if (dreq->flags == NFS_ODIRECT_DO_COMMIT) {
-				if (memcmp(&dreq->verf, &hdr->verf, sizeof(dreq->verf))) {
-					dreq->flags = NFS_ODIRECT_RESCHED_WRITES;
+				if (nfs_direct_set_or_cmp_hdr_verf(dreq, hdr)) {
+					dreq->flags =
+						NFS_ODIRECT_RESCHED_WRITES;
 					bit = NFS_IOHDR_NEED_RESCHED;
 				} else
 					bit = NFS_IOHDR_NEED_COMMIT;
diff --git a/fs/nfs/nfs4filelayout.c b/fs/nfs/nfs4filelayout.c
index 7a665e0..0ebc521 100644
--- a/fs/nfs/nfs4filelayout.c
+++ b/fs/nfs/nfs4filelayout.c
@@ -560,6 +560,7 @@
 	/* No multipath support. Use first DS */
 	atomic_inc(&ds->ds_clp->cl_count);
 	data->ds_clp = ds->ds_clp;
+	data->ds_idx = idx;
 	fh = nfs4_fl_select_ds_fh(lseg, j);
 	if (fh)
 		data->args.fh = fh;
@@ -603,6 +604,7 @@
 	data->pgio_done_cb = filelayout_write_done_cb;
 	atomic_inc(&ds->ds_clp->cl_count);
 	data->ds_clp = ds->ds_clp;
+	data->ds_idx = idx;
 	fh = nfs4_fl_select_ds_fh(lseg, j);
 	if (fh)
 		data->args.fh = fh;
@@ -875,6 +877,8 @@
 	for (i = 0; i < size; i++) {
 		INIT_LIST_HEAD(&buckets[i].written);
 		INIT_LIST_HEAD(&buckets[i].committing);
+		/* mark direct verifier as unset */
+		buckets[i].direct_verf.committed = NFS_INVALID_STABLE_HOW;
 	}
 
 	spin_lock(cinfo->lock);
@@ -885,6 +889,8 @@
 			    &buckets[i].written);
 		list_splice(&cinfo->ds->buckets[i].committing,
 			    &buckets[i].committing);
+		buckets[i].direct_verf.committed =
+			cinfo->ds->buckets[i].direct_verf.committed;
 		buckets[i].wlseg = cinfo->ds->buckets[i].wlseg;
 		buckets[i].clseg = cinfo->ds->buckets[i].clseg;
 	}
diff --git a/include/linux/nfs.h b/include/linux/nfs.h
index 3e794c1..610af51 100644
--- a/include/linux/nfs.h
+++ b/include/linux/nfs.h
@@ -46,6 +46,9 @@
 enum nfs3_stable_how {
 	NFS_UNSTABLE = 0,
 	NFS_DATA_SYNC = 1,
-	NFS_FILE_SYNC = 2
+	NFS_FILE_SYNC = 2,
+
+	/* used by direct.c to mark verf as invalid */
+	NFS_INVALID_STABLE_HOW = -1
 };
 #endif /* _LINUX_NFS_H */
diff --git a/include/linux/nfs_xdr.h b/include/linux/nfs_xdr.h
index ae63601..9a1396e 100644
--- a/include/linux/nfs_xdr.h
+++ b/include/linux/nfs_xdr.h
@@ -1112,6 +1112,7 @@
 	struct list_head committing;
 	struct pnfs_layout_segment *wlseg;
 	struct pnfs_layout_segment *clseg;
+	struct nfs_writeverf direct_verf;
 };
 
 struct pnfs_ds_commit_info {
@@ -1294,6 +1295,7 @@
 	__u64			mds_offset;	/* Filelayout dense stripe */
 	struct nfs_page_array	pages;
 	struct nfs_client	*ds_clp;	/* pNFS data server */
+	int			ds_idx;		/* ds index if ds_clp is set */
 };
 
 struct nfs_rw_header {