net/smc: add SMC-D support in data transfer

The data transfer and CDC message headers differ in SMC-R and SMC-D.
This patch adds support for the SMC-D data transfer to the existing SMC
code. It consists of the following:

* SMC-D CDC support
* SMC-D tx support
* SMC-D rx support

The CDC header is stored at the beginning of the receive buffer. Thus, an
rx_off variable is added to account for the CDC header at the start of the
buffer (0 for SMC-R).

Signed-off-by: Hans Wippel <hwippel@linux.ibm.com>
Signed-off-by: Ursula Braun <ubraun@linux.ibm.com>
Suggested-by: Thomas Richter <tmricht@linux.ibm.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
diff --git a/net/smc/smc_cdc.c b/net/smc/smc_cdc.c
index a7e8d63..621d8cc 100644
--- a/net/smc/smc_cdc.c
+++ b/net/smc/smc_cdc.c
@@ -117,7 +117,7 @@ int smc_cdc_msg_send(struct smc_connection *conn,
 	return rc;
 }
 
-int smc_cdc_get_slot_and_msg_send(struct smc_connection *conn)
+static int smcr_cdc_get_slot_and_msg_send(struct smc_connection *conn)
 {
 	struct smc_cdc_tx_pend *pend;
 	struct smc_wr_buf *wr_buf;
@@ -130,6 +130,21 @@ int smc_cdc_get_slot_and_msg_send(struct smc_connection *conn)
 	return smc_cdc_msg_send(conn, wr_buf, pend);
 }
 
+int smc_cdc_get_slot_and_msg_send(struct smc_connection *conn)
+{
+	int rc;
+
+	if (conn->lgr->is_smcd) { /* SMC-D: send under conn->send_lock */
+		spin_lock_bh(&conn->send_lock);
+		rc = smcd_cdc_msg_send(conn);
+		spin_unlock_bh(&conn->send_lock);
+	} else { /* SMC-R: get a slot, then send */
+		rc = smcr_cdc_get_slot_and_msg_send(conn);
+	}
+
+	return rc;
+}
+
 static bool smc_cdc_tx_filter(struct smc_wr_tx_pend_priv *tx_pend,
 			      unsigned long data)
 {
@@ -157,6 +172,45 @@ void smc_cdc_tx_dismiss_slots(struct smc_connection *conn)
 				(unsigned long)conn);
 }
 
+/* Send an SMC-D CDC message built from conn->local_tx_ctrl. On success, store
+ * the sent consumer cursor in rx_curs_confirmed and return the bytes between
+ * tx_curs_fin and tx_curs_sent to sndbuf_space; on failure only return rc.
+ */
+int smcd_cdc_msg_send(struct smc_connection *conn)
+{
+	struct smc_sock *smc = container_of(conn, struct smc_sock, conn);
+	struct smcd_cdc_msg cdc;
+	int rc, diff;
+
+	memset(&cdc, 0, sizeof(cdc));
+	cdc.common.type = SMC_CDC_MSG_TYPE;
+	cdc.prod_wrap = conn->local_tx_ctrl.prod.wrap;
+	cdc.prod_count = conn->local_tx_ctrl.prod.count;

+	cdc.cons_wrap = conn->local_tx_ctrl.cons.wrap;
+	cdc.cons_count = conn->local_tx_ctrl.cons.count;
+	cdc.prod_flags = conn->local_tx_ctrl.prod_flags;
+	cdc.conn_state_flags = conn->local_tx_ctrl.conn_state_flags;
+	rc = smcd_tx_ism_write(conn, &cdc, sizeof(cdc), 0, 1);
+	if (rc)
+		return rc; /* write failed: no state update */
+	smc_curs_write(&conn->rx_curs_confirmed,
+		       smc_curs_read(&conn->local_tx_ctrl.cons, conn), conn);
+	/* Calculate transmitted data and increment free send buffer space */
+	diff = smc_curs_diff(conn->sndbuf_desc->len, &conn->tx_curs_fin,
+			     &conn->tx_curs_sent);
+	/* increased by confirmed number of bytes */
+	smp_mb__before_atomic();
+	atomic_add(diff, &conn->sndbuf_space);
+	/* guarantee 0 <= sndbuf_space <= sndbuf_desc->len */
+	smp_mb__after_atomic();
+	smc_curs_write(&conn->tx_curs_fin,
+		       smc_curs_read(&conn->tx_curs_sent, conn), conn);
+
+	smc_tx_sndbuf_nonfull(smc);
+	return rc; /* 0 here: the error case returned above */
+}
+
 /********************************* receive ***********************************/
 
 static inline bool smc_cdc_before(u16 seq1, u16 seq2)
@@ -178,7 +232,7 @@ static void smc_cdc_handle_urg_data_arrival(struct smc_sock *smc,
 	if (!sock_flag(&smc->sk, SOCK_URGINLINE))
 		/* we'll skip the urgent byte, so don't account for it */
 		(*diff_prod)--;
-	base = (char *)conn->rmb_desc->cpu_addr;
+	base = (char *)conn->rmb_desc->cpu_addr + conn->rx_off;
 	if (conn->urg_curs.count)
 		conn->urg_rx_byte = *(base + conn->urg_curs.count - 1);
 	else
@@ -276,6 +330,34 @@ static void smc_cdc_msg_recv(struct smc_sock *smc, struct smc_cdc_msg *cdc)
 	sock_put(&smc->sk); /* no free sk in softirq-context */
 }
 
+/* Receive tasklet for this connection. Triggered from the ISM device IRQ
+ * handler to indicate update in the DMBE. Copies the CDC message from the
+ * start of the receive buffer and passes the copy to smc_cdc_msg_recv().
+ * Context:
+ * - tasklet context
+ */
+static void smcd_cdc_rx_tsklet(unsigned long data)
+{
+	struct smc_connection *conn = (struct smc_connection *)data;
+	struct smcd_cdc_msg cdc;
+	struct smc_sock *smc;
+
+	if (!conn)
+		return;
+
+	memcpy(&cdc, conn->rmb_desc->cpu_addr, sizeof(cdc));
+	smc = container_of(conn, struct smc_sock, conn);
+	smc_cdc_msg_recv(smc, (struct smc_cdc_msg *)&cdc);
+}
+
+/* Set up conn->rx_tsklet to run smcd_cdc_rx_tsklet() with conn as argument.
+ * The tasklet is triggered from the ISM device IRQ handler (receiver side).
+ */
+void smcd_cdc_rx_init(struct smc_connection *conn)
+{
+	tasklet_init(&conn->rx_tsklet, smcd_cdc_rx_tsklet, (unsigned long)conn);
+}
+
 /***************************** init, exit, misc ******************************/
 
 static void smc_cdc_rx_handler(struct ib_wc *wc, void *buf)