net/smc: register RMB-related memory region

A memory region created for a new RMB must be registered explicitly,
before the peer can make use of it for remote DMA transfer.

Signed-off-by: Ursula Braun <ubraun@linux.vnet.ibm.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
diff --git a/net/smc/af_smc.c b/net/smc/af_smc.c
index 120a7b9..e0a95d5 100644
--- a/net/smc/af_smc.c
+++ b/net/smc/af_smc.c
@@ -338,6 +338,12 @@ static int smc_clnt_conf_first_link(struct smc_sock *smc, union ib_gid *gid)
 		return SMC_CLC_DECL_INTERR;
 
 	smc_wr_remember_qp_attr(link);
+
+	rc = smc_wr_reg_send(link,
+			     smc->conn.rmb_desc->mr_rx[SMC_SINGLE_LINK]);
+	if (rc)
+		return SMC_CLC_DECL_INTERR;
+
 	/* send CONFIRM LINK response over RoCE fabric */
 	rc = smc_llc_send_confirm_link(link,
 				       link->smcibdev->mac[link->ibport - 1],
@@ -459,6 +465,18 @@ static int smc_connect_rdma(struct smc_sock *smc)
 			reason_code = SMC_CLC_DECL_INTERR;
 			goto decline_rdma_unlock;
 		}
+	} else {
+		struct smc_buf_desc *buf_desc = smc->conn.rmb_desc;
+
+		if (!buf_desc->reused) {
+			/* register memory region for new rmb */
+			rc = smc_wr_reg_send(link,
+					     buf_desc->mr_rx[SMC_SINGLE_LINK]);
+			if (rc) {
+				reason_code = SMC_CLC_DECL_INTERR;
+				goto decline_rdma_unlock;
+			}
+		}
 	}
 
 	rc = smc_clc_send_confirm(smc);
@@ -692,6 +710,12 @@ static int smc_serv_conf_first_link(struct smc_sock *smc)
 	int rc;
 
 	link = &lgr->lnk[SMC_SINGLE_LINK];
+
+	rc = smc_wr_reg_send(link,
+			     smc->conn.rmb_desc->mr_rx[SMC_SINGLE_LINK]);
+	if (rc)
+		return SMC_CLC_DECL_INTERR;
+
 	/* send CONFIRM LINK request to client over the RoCE fabric */
 	rc = smc_llc_send_confirm_link(link,
 				       link->smcibdev->mac[link->ibport - 1],
@@ -803,6 +827,20 @@ static void smc_listen_work(struct work_struct *work)
 	smc_close_init(new_smc);
 	smc_rx_init(new_smc);
 
+	if (local_contact != SMC_FIRST_CONTACT) {
+		struct smc_buf_desc *buf_desc = new_smc->conn.rmb_desc;
+
+		if (!buf_desc->reused) {
+			/* register memory region for new rmb */
+			rc = smc_wr_reg_send(link,
+					     buf_desc->mr_rx[SMC_SINGLE_LINK]);
+			if (rc) {
+				reason_code = SMC_CLC_DECL_INTERR;
+				goto decline_rdma;
+			}
+		}
+	}
+
 	rc = smc_clc_send_accept(new_smc, local_contact);
 	if (rc)
 		goto out_err;
diff --git a/net/smc/smc_core.c b/net/smc/smc_core.c
index f1dd4e1..87bb3e4 100644
--- a/net/smc/smc_core.c
+++ b/net/smc/smc_core.c
@@ -175,7 +175,6 @@ static int smc_lgr_create(struct smc_sock *smc, __be32 peer_in_addr,
 	rc = smc_wr_alloc_link_mem(lnk);
 	if (rc)
 		goto free_lgr;
-	init_waitqueue_head(&lnk->wr_tx_wait);
 	rc = smc_ib_create_protection_domain(lnk);
 	if (rc)
 		goto free_link_mem;
diff --git a/net/smc/smc_core.h b/net/smc/smc_core.h
index 17b5fea..f7b40bd 100644
--- a/net/smc/smc_core.h
+++ b/net/smc/smc_core.h
@@ -37,6 +37,14 @@ struct smc_wr_buf {
 	u8	raw[SMC_WR_BUF_SIZE];
 };
 
+#define SMC_WR_REG_MR_WAIT_TIME	(5 * HZ)/* wait time for ib_wr_reg_mr result */
+
+enum smc_wr_reg_state {
+	POSTED,		/* IB_WR_REG_MR posted, completion outstanding */
+	CONFIRMED,	/* IB_WR_REG_MR completed successfully */
+	FAILED		/* IB_WR_REG_MR completed with error status */
+};
+
 struct smc_link {
 	struct smc_ib_device	*smcibdev;	/* ib-device */
 	u8			ibport;		/* port - values 1 | 2 */
@@ -65,6 +73,10 @@ struct smc_link {
 	u64			wr_rx_id;	/* seq # of last recv WR */
 	u32			wr_rx_cnt;	/* number of WR recv buffers */
 
+	struct ib_reg_wr	wr_reg;		/* WR register memory region */
+	wait_queue_head_t	wr_reg_wait;	/* wait for wr_reg result */
+	enum smc_wr_reg_state	wr_reg_state;	/* state of wr_reg request */
+
 	union ib_gid		gid;		/* gid matching used vlan id */
 	u32			peer_qpn;	/* QP number of peer */
 	enum ib_mtu		path_mtu;	/* used mtu */
diff --git a/net/smc/smc_ib.c b/net/smc/smc_ib.c
index 0823349..85e1831 100644
--- a/net/smc/smc_ib.c
+++ b/net/smc/smc_ib.c
@@ -231,10 +231,10 @@ int smc_ib_create_queue_pair(struct smc_link *lnk)
 		.recv_cq = lnk->smcibdev->roce_cq_recv,
 		.srq = NULL,
 		.cap = {
-			.max_send_wr = SMC_WR_BUF_CNT,
 				/* include unsolicited rdma_writes as well,
 				 * there are max. 2 RDMA_WRITE per 1 WR_SEND
 				 */
+			.max_send_wr = SMC_WR_BUF_CNT * 3,
 			.max_recv_wr = SMC_WR_BUF_CNT * 3,
 			.max_send_sge = SMC_IB_MAX_SEND_SGE,
 			.max_recv_sge = 1,
diff --git a/net/smc/smc_wr.c b/net/smc/smc_wr.c
index 874ee9f..ab56bda 100644
--- a/net/smc/smc_wr.c
+++ b/net/smc/smc_wr.c
@@ -68,6 +68,16 @@ static inline void smc_wr_tx_process_cqe(struct ib_wc *wc)
 	int i;
 
 	link = wc->qp->qp_context;
+
+	if (wc->opcode == IB_WC_REG_MR) {
+		if (wc->status)
+			link->wr_reg_state = FAILED;
+		else
+			link->wr_reg_state = CONFIRMED;
+		wake_up(&link->wr_reg_wait);
+		return;
+	}
+
 	pnd_snd_idx = smc_wr_tx_find_pending_index(link, wc->wr_id);
 	if (pnd_snd_idx == link->wr_tx_cnt)
 		return;
@@ -243,6 +253,52 @@ int smc_wr_tx_send(struct smc_link *link, struct smc_wr_tx_pend_priv *priv)
 	return rc;
 }
 
+/* Post an IB_WR_REG_MR request for @mr on @link and wait for its result. */
+int smc_wr_reg_send(struct smc_link *link, struct ib_mr *mr)
+{
+	struct ib_send_wr *failed_wr = NULL;
+	int rc;
+
+	ib_req_notify_cq(link->smcibdev->roce_cq_send,
+			 IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
+	link->wr_reg_state = POSTED;	/* changed by REG_MR completion */
+	link->wr_reg.wr.wr_id = (u64)(uintptr_t)mr;
+	link->wr_reg.mr = mr;
+	link->wr_reg.key = mr->rkey;
+	failed_wr = &link->wr_reg.wr;
+	rc = ib_post_send(link->roce_qp, &link->wr_reg.wr, &failed_wr);
+	WARN_ON(failed_wr != &link->wr_reg.wr);
+	if (rc)
+		return rc;	/* posting failed - nothing to wait for */
+
+	rc = wait_event_interruptible_timeout(link->wr_reg_wait,
+					      (link->wr_reg_state != POSTED),
+					      SMC_WR_REG_MR_WAIT_TIME);
+	if (!rc) {
+		/* timed out - terminate the link group */
+		struct smc_link_group *lgr;
+
+		lgr = container_of(link, struct smc_link_group,
+				   lnk[SMC_SINGLE_LINK]);
+		smc_lgr_terminate(lgr);
+		return -EPIPE;
+	}
+	if (rc == -ERESTARTSYS)
+		return -EINTR;	/* wait was interrupted */
+	switch (link->wr_reg_state) {
+	case CONFIRMED:		/* completion reported success */
+		rc = 0;
+		break;
+	case FAILED:		/* completion reported an error status */
+		rc = -EIO;
+		break;
+	case POSTED:		/* no result recorded */
+		rc = -EPIPE;
+		break;
+	}
+	return rc;
+}
+
 void smc_wr_tx_dismiss_slots(struct smc_link *link, u8 wr_rx_hdr_type,
 			     smc_wr_tx_filter filter,
 			     smc_wr_tx_dismisser dismisser,
@@ -458,6 +514,11 @@ static void smc_wr_init_sge(struct smc_link *lnk)
 		lnk->wr_rx_ibs[i].sg_list = &lnk->wr_rx_sges[i];
 		lnk->wr_rx_ibs[i].num_sge = 1;
 	}
+	lnk->wr_reg.wr.next = NULL;
+	lnk->wr_reg.wr.num_sge = 0;
+	lnk->wr_reg.wr.send_flags = IB_SEND_SIGNALED;
+	lnk->wr_reg.wr.opcode = IB_WR_REG_MR;
+	lnk->wr_reg.access = IB_ACCESS_LOCAL_WRITE | IB_ACCESS_REMOTE_WRITE;
 }
 
 void smc_wr_free_link(struct smc_link *lnk)
@@ -602,6 +663,8 @@ int smc_wr_create_link(struct smc_link *lnk)
 	smc_wr_init_sge(lnk);
 	memset(lnk->wr_tx_mask, 0,
 	       BITS_TO_LONGS(SMC_WR_BUF_CNT) * sizeof(*lnk->wr_tx_mask));
+	init_waitqueue_head(&lnk->wr_tx_wait);
+	init_waitqueue_head(&lnk->wr_reg_wait);
 	return rc;
 
 dma_unmap:
diff --git a/net/smc/smc_wr.h b/net/smc/smc_wr.h
index 0b9beed..45eb538 100644
--- a/net/smc/smc_wr.h
+++ b/net/smc/smc_wr.h
@@ -102,5 +102,6 @@ void smc_wr_tx_dismiss_slots(struct smc_link *lnk, u8 wr_rx_hdr_type,
 int smc_wr_rx_register_handler(struct smc_wr_rx_handler *handler);
 int smc_wr_rx_post_init(struct smc_link *link);
 void smc_wr_rx_cq_handler(struct ib_cq *ib_cq, void *cq_context);
+int smc_wr_reg_send(struct smc_link *link, struct ib_mr *mr);
 
 #endif /* SMC_WR_H */