NTB: Fix ntb_transport out-of-order RX update

It was possible for a synchronous update of the RX index in the error
case to get ahead of the asynchronous RX index update in the normal
case.  Change the RX processing to preserve an RX completion order.

There were two error cases.  First, if a buffer is not present to
receive data, there would be no queue entry to preserve the RX
completion order.  Instead of dropping the RX frame, leave the RX frame
in the ring.  Schedule RX processing when RX entries are enqueued, in
case there are RX frames waiting in the ring to be received.

Second, if a buffer is too small to receive data, drop the frame in the
ring, mark the RX entry as done, and indicate the error in the RX entry
length.  Check for a negative length in the receive callback in
ntb_netdev, and count occurrences as rx_length_errors.

Signed-off-by: Allen Hubbe <Allen.Hubbe@emc.com>
Signed-off-by: Jon Mason <jdmason@kudzu.us>
diff --git a/drivers/ntb/ntb_transport.c b/drivers/ntb/ntb_transport.c
index efe3ad4..98e58c7 100644
--- a/drivers/ntb/ntb_transport.c
+++ b/drivers/ntb/ntb_transport.c
@@ -142,10 +142,11 @@
 
 	void (*rx_handler)(struct ntb_transport_qp *qp, void *qp_data,
 			   void *data, int len);
+	struct list_head rx_post_q;
 	struct list_head rx_pend_q;
 	struct list_head rx_free_q;
-	spinlock_t ntb_rx_pend_q_lock;
-	spinlock_t ntb_rx_free_q_lock;
+	/* ntb_rx_q_lock: synchronize access to rx_XXXX_q */
+	spinlock_t ntb_rx_q_lock;
 	void *rx_buff;
 	unsigned int rx_index;
 	unsigned int rx_max_entry;
@@ -534,6 +535,27 @@
 	return entry;
 }
 
+static struct ntb_queue_entry *ntb_list_mv(spinlock_t *lock,
+					   struct list_head *list,
+					   struct list_head *to_list)
+{
+	struct ntb_queue_entry *entry;
+	unsigned long flags;
+
+	spin_lock_irqsave(lock, flags);
+
+	if (list_empty(list)) {
+		entry = NULL;
+	} else {
+		entry = list_first_entry(list, struct ntb_queue_entry, entry);
+		list_move_tail(&entry->entry, to_list);
+	}
+
+	spin_unlock_irqrestore(lock, flags);
+
+	return entry;
+}
+
 static int ntb_transport_setup_qp_mw(struct ntb_transport_ctx *nt,
 				     unsigned int qp_num)
 {
@@ -941,10 +963,10 @@
 	INIT_DELAYED_WORK(&qp->link_work, ntb_qp_link_work);
 	INIT_WORK(&qp->link_cleanup, ntb_qp_link_cleanup_work);
 
-	spin_lock_init(&qp->ntb_rx_pend_q_lock);
-	spin_lock_init(&qp->ntb_rx_free_q_lock);
+	spin_lock_init(&qp->ntb_rx_q_lock);
 	spin_lock_init(&qp->ntb_tx_free_q_lock);
 
+	INIT_LIST_HEAD(&qp->rx_post_q);
 	INIT_LIST_HEAD(&qp->rx_pend_q);
 	INIT_LIST_HEAD(&qp->rx_free_q);
 	INIT_LIST_HEAD(&qp->tx_free_q);
@@ -1107,22 +1129,47 @@
 	kfree(nt);
 }
 
+static void ntb_complete_rxc(struct ntb_transport_qp *qp)
+{
+	struct ntb_queue_entry *entry;
+	void *cb_data;
+	unsigned int len;
+	unsigned long irqflags;
+
+	spin_lock_irqsave(&qp->ntb_rx_q_lock, irqflags);
+
+	while (!list_empty(&qp->rx_post_q)) {
+		entry = list_first_entry(&qp->rx_post_q,
+					 struct ntb_queue_entry, entry);
+		if (!(entry->flags & DESC_DONE_FLAG))
+			break;
+
+		entry->rx_hdr->flags = 0;
+		iowrite32(entry->index, &qp->rx_info->entry);
+
+		cb_data = entry->cb_data;
+		len = entry->len;
+
+		list_move_tail(&entry->entry, &qp->rx_free_q);
+
+		spin_unlock_irqrestore(&qp->ntb_rx_q_lock, irqflags);
+
+		if (qp->rx_handler && qp->client_ready)
+			qp->rx_handler(qp, qp->cb_data, cb_data, len);
+
+		spin_lock_irqsave(&qp->ntb_rx_q_lock, irqflags);
+	}
+
+	spin_unlock_irqrestore(&qp->ntb_rx_q_lock, irqflags);
+}
+
 static void ntb_rx_copy_callback(void *data)
 {
 	struct ntb_queue_entry *entry = data;
-	struct ntb_transport_qp *qp = entry->qp;
-	void *cb_data = entry->cb_data;
-	unsigned int len = entry->len;
-	struct ntb_payload_header *hdr = entry->rx_hdr;
 
-	hdr->flags = 0;
+	entry->flags |= DESC_DONE_FLAG;
 
-	iowrite32(entry->index, &qp->rx_info->entry);
-
-	ntb_list_add(&qp->ntb_rx_free_q_lock, &entry->entry, &qp->rx_free_q);
-
-	if (qp->rx_handler && qp->client_ready)
-		qp->rx_handler(qp, qp->cb_data, cb_data, len);
+	ntb_complete_rxc(entry->qp);
 }
 
 static void ntb_memcpy_rx(struct ntb_queue_entry *entry, void *offset)
@@ -1138,19 +1185,18 @@
 	ntb_rx_copy_callback(entry);
 }
 
-static void ntb_async_rx(struct ntb_queue_entry *entry, void *offset,
-			 size_t len)
+static void ntb_async_rx(struct ntb_queue_entry *entry, void *offset)
 {
 	struct dma_async_tx_descriptor *txd;
 	struct ntb_transport_qp *qp = entry->qp;
 	struct dma_chan *chan = qp->dma_chan;
 	struct dma_device *device;
-	size_t pay_off, buff_off;
+	size_t pay_off, buff_off, len;
 	struct dmaengine_unmap_data *unmap;
 	dma_cookie_t cookie;
 	void *buf = entry->buf;
 
-	entry->len = len;
+	len = entry->len;
 
 	if (!chan)
 		goto err;
@@ -1226,7 +1272,6 @@
 	struct ntb_payload_header *hdr;
 	struct ntb_queue_entry *entry;
 	void *offset;
-	int rc;
 
 	offset = qp->rx_buff + qp->rx_max_frame * qp->rx_index;
 	hdr = offset + qp->rx_max_frame - sizeof(struct ntb_payload_header);
@@ -1255,65 +1300,43 @@
 		return -EIO;
 	}
 
-	entry = ntb_list_rm(&qp->ntb_rx_pend_q_lock, &qp->rx_pend_q);
+	entry = ntb_list_mv(&qp->ntb_rx_q_lock, &qp->rx_pend_q, &qp->rx_post_q);
 	if (!entry) {
 		dev_dbg(&qp->ndev->pdev->dev, "no receive buffer\n");
 		qp->rx_err_no_buf++;
-
-		rc = -ENOMEM;
-		goto err;
+		return -EAGAIN;
 	}
 
+	entry->rx_hdr = hdr;
+	entry->index = qp->rx_index;
+
 	if (hdr->len > entry->len) {
 		dev_dbg(&qp->ndev->pdev->dev,
 			"receive buffer overflow! Wanted %d got %d\n",
 			hdr->len, entry->len);
 		qp->rx_err_oflow++;
 
-		rc = -EIO;
-		goto err;
+		entry->len = -EIO;
+		entry->flags |= DESC_DONE_FLAG;
+
+		ntb_complete_rxc(qp);
+	} else {
+		dev_dbg(&qp->ndev->pdev->dev,
+			"RX OK index %u ver %u size %d into buf size %d\n",
+			qp->rx_index, hdr->ver, hdr->len, entry->len);
+
+		qp->rx_bytes += hdr->len;
+		qp->rx_pkts++;
+
+		entry->len = hdr->len;
+
+		ntb_async_rx(entry, offset);
 	}
 
-	dev_dbg(&qp->ndev->pdev->dev,
-		"RX OK index %u ver %u size %d into buf size %d\n",
-		qp->rx_index, hdr->ver, hdr->len, entry->len);
-
-	qp->rx_bytes += hdr->len;
-	qp->rx_pkts++;
-
-	entry->index = qp->rx_index;
-	entry->rx_hdr = hdr;
-
-	ntb_async_rx(entry, offset, hdr->len);
-
 	qp->rx_index++;
 	qp->rx_index %= qp->rx_max_entry;
 
 	return 0;
-
-err:
-	/* FIXME: if this syncrhonous update of the rx_index gets ahead of
-	 * asyncrhonous ntb_rx_copy_callback of previous entry, there are three
-	 * scenarios:
-	 *
-	 * 1) The peer might miss this update, but observe the update
-	 * from the memcpy completion callback.  In this case, the buffer will
-	 * not be freed on the peer to be reused for a different packet.  The
-	 * successful rx of a later packet would clear the condition, but the
-	 * condition could persist if several rx fail in a row.
-	 *
-	 * 2) The peer may observe this update before the asyncrhonous copy of
-	 * prior packets is completed.  The peer may overwrite the buffers of
-	 * the prior packets before they are copied.
-	 *
-	 * 3) Both: the peer may observe the update, and then observe the index
-	 * decrement by the asynchronous completion callback.  Who knows what
-	 * badness that will cause.
-	 */
-	hdr->flags = 0;
-	iowrite32(qp->rx_index, &qp->rx_info->entry);
-
-	return rc;
 }
 
 static void ntb_transport_rxc_db(unsigned long data)
@@ -1333,7 +1356,7 @@
 			break;
 	}
 
-	if (qp->dma_chan)
+	if (i && qp->dma_chan)
 		dma_async_issue_pending(qp->dma_chan);
 
 	if (i == qp->rx_max_entry) {
@@ -1609,7 +1632,7 @@
 			goto err1;
 
 		entry->qp = qp;
-		ntb_list_add(&qp->ntb_rx_free_q_lock, &entry->entry,
+		ntb_list_add(&qp->ntb_rx_q_lock, &entry->entry,
 			     &qp->rx_free_q);
 	}
 
@@ -1634,7 +1657,7 @@
 	while ((entry = ntb_list_rm(&qp->ntb_tx_free_q_lock, &qp->tx_free_q)))
 		kfree(entry);
 err1:
-	while ((entry = ntb_list_rm(&qp->ntb_rx_free_q_lock, &qp->rx_free_q)))
+	while ((entry = ntb_list_rm(&qp->ntb_rx_q_lock, &qp->rx_free_q)))
 		kfree(entry);
 	if (qp->dma_chan)
 		dma_release_channel(qp->dma_chan);
@@ -1689,11 +1712,16 @@
 	qp->tx_handler = NULL;
 	qp->event_handler = NULL;
 
-	while ((entry = ntb_list_rm(&qp->ntb_rx_free_q_lock, &qp->rx_free_q)))
+	while ((entry = ntb_list_rm(&qp->ntb_rx_q_lock, &qp->rx_free_q)))
 		kfree(entry);
 
-	while ((entry = ntb_list_rm(&qp->ntb_rx_pend_q_lock, &qp->rx_pend_q))) {
-		dev_warn(&pdev->dev, "Freeing item from a non-empty queue\n");
+	while ((entry = ntb_list_rm(&qp->ntb_rx_q_lock, &qp->rx_pend_q))) {
+		dev_warn(&pdev->dev, "Freeing item from non-empty rx_pend_q\n");
+		kfree(entry);
+	}
+
+	while ((entry = ntb_list_rm(&qp->ntb_rx_q_lock, &qp->rx_post_q))) {
+		dev_warn(&pdev->dev, "Freeing item from non-empty rx_post_q\n");
 		kfree(entry);
 	}
 
@@ -1724,14 +1752,14 @@
 	if (!qp || qp->client_ready)
 		return NULL;
 
-	entry = ntb_list_rm(&qp->ntb_rx_pend_q_lock, &qp->rx_pend_q);
+	entry = ntb_list_rm(&qp->ntb_rx_q_lock, &qp->rx_pend_q);
 	if (!entry)
 		return NULL;
 
 	buf = entry->cb_data;
 	*len = entry->len;
 
-	ntb_list_add(&qp->ntb_rx_free_q_lock, &entry->entry, &qp->rx_free_q);
+	ntb_list_add(&qp->ntb_rx_q_lock, &entry->entry, &qp->rx_free_q);
 
 	return buf;
 }
@@ -1757,15 +1785,18 @@
 	if (!qp)
 		return -EINVAL;
 
-	entry = ntb_list_rm(&qp->ntb_rx_free_q_lock, &qp->rx_free_q);
+	entry = ntb_list_rm(&qp->ntb_rx_q_lock, &qp->rx_free_q);
 	if (!entry)
 		return -ENOMEM;
 
 	entry->cb_data = cb;
 	entry->buf = data;
 	entry->len = len;
+	entry->flags = 0;
 
-	ntb_list_add(&qp->ntb_rx_pend_q_lock, &entry->entry, &qp->rx_pend_q);
+	ntb_list_add(&qp->ntb_rx_q_lock, &entry->entry, &qp->rx_pend_q);
+
+	tasklet_schedule(&qp->rxc_db_work);
 
 	return 0;
 }