tipc: improve TIPC throughput by Gap ACK blocks

During unicast link transmission, it is observed very often that because
of one or a few lost/out-of-order packets, the sending side quickly
reaches the send window limit and must wait for the packets to arrive at
the receiving side or, in the worst case, a retransmission must be done
first. The sending side cannot release a lot of subsequent packets in
its transmq even though all of them might have already been received by
the receiving side.
That is, when one or two packets are lost or out of order, dozens of
packets have to wait, which obviously reduces the overall throughput!

This commit introduces an algorithm to overcome this by using "Gap ACK
blocks". Basically, a Gap ACK block consists of an <ack, gap> pair of
numbers describing the link deferdq, where packets have been received
by the receiving side but with gaps, for example:

      link deferdq: [1 2 3 4      10 11      13 14 15       20]
--> Gap ACK blocks:       <4, 5>,   <11, 1>,      <15, 4>, <20, 0>

The Gap ACK blocks will be sent to the sending side along with the
traditional ACK or NACK message. Immediately upon receiving the message,
the sending side will now release from its transmq not only the packets
acked by the ACK but also those acked by the Gap ACK blocks! So, more
packets can be enqueued and transmitted.
In addition, the sending side can now do "multi-retransmissions"
according to the gaps reported in the Gap ACK blocks.

As verified, the new algorithm greatly improves the TIPC throughput,
especially under packet loss conditions.

So far, a maximum of 32 blocks has been quite enough, with no "Too few
Gap ACK blocks" reports at a 5.0% packet loss rate; however, this number
can be increased in the future if needed.

Also, the patch is backward compatible.

Acked-by: Ying Xue <ying.xue@windriver.com>
Acked-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: Tuong Lien <tuong.t.lien@dektech.com.au>
Signed-off-by: David S. Miller <davem@davemloft.net>
diff --git a/net/tipc/link.c b/net/tipc/link.c
index 52d23b3..5aee1ed 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -246,6 +246,10 @@ static int tipc_link_build_nack_msg(struct tipc_link *l,
 static void tipc_link_build_bc_init_msg(struct tipc_link *l,
 					struct sk_buff_head *xmitq);
 static bool tipc_link_release_pkts(struct tipc_link *l, u16 to);
+static u16 tipc_build_gap_ack_blks(struct tipc_link *l, void *data);
+static void tipc_link_advance_transmq(struct tipc_link *l, u16 acked, u16 gap,
+				      struct tipc_gap_ack_blks *ga,
+				      struct sk_buff_head *xmitq);
 
 /*
  *  Simple non-static link routines (i.e. referenced outside this file)
@@ -1226,6 +1230,102 @@ static bool tipc_link_release_pkts(struct tipc_link *l, u16 acked)
 	return released;
 }
 
+/* tipc_build_gap_ack_blks - build Gap ACK blocks describing l->deferdq
+ * @l: tipc link that data have come with gaps in sequence if any
+ * @data: buffer for the record: len, gack_cnt and up to MAX_GAP_ACK_BLKS
+ *        <ack, gap> entries (building stops early when that limit is hit)
+ * returns the record length as computed by tipc_gap_ack_blks_sz(gack_cnt)
+ */
+static u16 tipc_build_gap_ack_blks(struct tipc_link *l, void *data)
+{
+	struct sk_buff *skb = skb_peek(&l->deferdq);
+	struct tipc_gap_ack_blks *ga = data;
+	u16 len, expect, seqno = 0;
+	u8 n = 0;
+
+	if (!skb)
+		goto exit;
+
+	expect = buf_seqno(skb);
+	skb_queue_walk(&l->deferdq, skb) {
+		seqno = buf_seqno(skb);
+		if (unlikely(more(seqno, expect))) {
+			ga->gacks[n].ack = htons(expect - 1);
+			ga->gacks[n].gap = htons(seqno - expect);
+			if (++n >= MAX_GAP_ACK_BLKS) {
+				pr_info_ratelimited("Too few Gap ACK blocks!\n");
+				goto exit;
+			}
+		} else if (unlikely(less(seqno, expect))) {
+			pr_warn("Unexpected skb in deferdq!\n");
+			continue;
+		}
+		expect = seqno + 1;
+	}
+
+	/* last block: acks the seqno of the last skb walked, with no gap */
+	ga->gacks[n].ack = htons(seqno);
+	ga->gacks[n].gap = 0;
+	n++;
+
+exit:
+	len = tipc_gap_ack_blks_sz(n);
+	ga->len = htons(len);
+	ga->gack_cnt = n;
+	return len;
+}
+
+/* tipc_link_advance_transmq - advance TIPC link transmq queue by releasing
+ *			       acked packets, also doing retransmissions if
+ *			       gaps found
+ * @l: tipc link with transmq queue to be advanced
+ * @acked: seqno of last packet acked by peer without any gaps before
+ * @gap: # of gap packets following @acked, i.e. packets to be retransmitted
+ * @ga: buffer pointer to Gap ACK blocks from peer, may be NULL if none
+ * @xmitq: queue for accumulating the retransmitted packets if any
+ */
+static void tipc_link_advance_transmq(struct tipc_link *l, u16 acked, u16 gap,
+				      struct tipc_gap_ack_blks *ga,
+				      struct sk_buff_head *xmitq)
+{
+	struct sk_buff *skb, *_skb, *tmp;
+	struct tipc_msg *hdr;
+	u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1;
+	u16 ack = l->rcv_nxt - 1;
+	u16 seqno;
+	u16 n = 0;
+
+	skb_queue_walk_safe(&l->transmq, skb, tmp) {
+		seqno = buf_seqno(skb);
+
+next_gap_ack:
+		if (less_eq(seqno, acked)) {
+			/* acked by peer: unlink and free skb */
+			__skb_unlink(skb, &l->transmq);
+			kfree_skb(skb);
+		} else if (less_eq(seqno, acked + gap)) {
+			/* in gap: queue a copy of skb on xmitq */
+			_skb = __pskb_copy(skb, MIN_H_SIZE, GFP_ATOMIC);
+			if (!_skb)
+				continue;
+			hdr = buf_msg(_skb);
+			msg_set_ack(hdr, ack);
+			msg_set_bcast_ack(hdr, bc_ack);
+			_skb->priority = TC_PRIO_CONTROL;
+			__skb_queue_tail(xmitq, _skb);
+			l->stats.retransmitted++;
+		} else {
+			/* retry skb with next Gap ACK block if any */
+			if (!ga || n >= ga->gack_cnt)
+				break;
+			acked = ntohs(ga->gacks[n].ack);
+			gap = ntohs(ga->gacks[n].gap);
+			n++;
+			goto next_gap_ack;
+		}
+	}
+}
+
 /* tipc_link_build_state_msg: prepare link state message for transmission
  *
  * Note that sending of broadcast ack is coordinated among nodes, to reduce
@@ -1378,6 +1478,7 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
 	struct tipc_mon_state *mstate = &l->mon_state;
 	int dlen = 0;
 	void *data;
+	u16 glen = 0;
 
 	/* Don't send protocol message during reset or link failover */
 	if (tipc_link_is_blocked(l))
@@ -1390,8 +1491,8 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
 		rcvgap = buf_seqno(skb_peek(dfq)) - l->rcv_nxt;
 
 	skb = tipc_msg_create(LINK_PROTOCOL, mtyp, INT_H_SIZE,
-			      tipc_max_domain_size, l->addr,
-			      tipc_own_addr(l->net), 0, 0, 0);
+			      tipc_max_domain_size + MAX_GAP_ACK_BLKS_SZ,
+			      l->addr, tipc_own_addr(l->net), 0, 0, 0);
 	if (!skb)
 		return;
 
@@ -1418,9 +1519,11 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
 		msg_set_bc_gap(hdr, link_bc_rcv_gap(bcl));
 		msg_set_probe(hdr, probe);
 		msg_set_is_keepalive(hdr, probe || probe_reply);
-		tipc_mon_prep(l->net, data, &dlen, mstate, l->bearer_id);
-		msg_set_size(hdr, INT_H_SIZE + dlen);
-		skb_trim(skb, INT_H_SIZE + dlen);
+		if (l->peer_caps & TIPC_GAP_ACK_BLOCK)
+			glen = tipc_build_gap_ack_blks(l, data);
+		tipc_mon_prep(l->net, data + glen, &dlen, mstate, l->bearer_id);
+		msg_set_size(hdr, INT_H_SIZE + glen + dlen);
+		skb_trim(skb, INT_H_SIZE + glen + dlen);
 		l->stats.sent_states++;
 		l->rcv_unacked = 0;
 	} else {
@@ -1590,6 +1693,7 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb,
 			       struct sk_buff_head *xmitq)
 {
 	struct tipc_msg *hdr = buf_msg(skb);
+	struct tipc_gap_ack_blks *ga = NULL;
 	u16 rcvgap = 0;
 	u16 ack = msg_ack(hdr);
 	u16 gap = msg_seq_gap(hdr);
@@ -1600,6 +1704,7 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb,
 	u16 dlen = msg_data_sz(hdr);
 	int mtyp = msg_type(hdr);
 	bool reply = msg_probe(hdr);
+	u16 glen = 0;
 	void *data;
 	char *if_name;
 	int rc = 0;
@@ -1697,7 +1802,17 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb,
 				rc = TIPC_LINK_UP_EVT;
 			break;
 		}
-		tipc_mon_rcv(l->net, data, dlen, l->addr,
+
+		/* Receive Gap ACK blocks from peer if any */
+		if (l->peer_caps & TIPC_GAP_ACK_BLOCK) {
+			ga = (struct tipc_gap_ack_blks *)data;
+			glen = ntohs(ga->len);
+			/* sanity check: if failed, ignore Gap ACK blocks */
+			if (glen != tipc_gap_ack_blks_sz(ga->gack_cnt))
+				ga = NULL;
+		}
+
+		tipc_mon_rcv(l->net, data + glen, dlen - glen, l->addr,
 			     &l->mon_state, l->bearer_id);
 
 		/* Send NACK if peer has sent pkts we haven't received yet */
@@ -1706,13 +1821,12 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb,
 		if (rcvgap || reply)
 			tipc_link_build_proto_msg(l, STATE_MSG, 0, reply,
 						  rcvgap, 0, 0, xmitq);
-		tipc_link_release_pkts(l, ack);
+
+		tipc_link_advance_transmq(l, ack, gap, ga, xmitq);
 
 		/* If NACK, retransmit will now start at right position */
-		if (gap) {
-			rc = tipc_link_retrans(l, l, ack + 1, ack + gap, xmitq);
+		if (gap)
 			l->stats.recv_nacks++;
-		}
 
 		tipc_link_advance_backlog(l, xmitq);
 		if (unlikely(!skb_queue_empty(&l->wakeupq)))