tipc: improve message bundling algorithm

As mentioned in commit e95584a889e1 ("tipc: fix unlimited bundling of
small messages"), the current message bundling algorithm is inefficient:
it can generate bundles of only one payload message, which causes
unnecessary overhead for both the sender and the receiver.

This commit redesigns the 'tipc_msg_make_bundle()' function (now named
'tipc_msg_try_bundle()'), so that when the first message arrives, we
just check whether it is suitable for bundling and, if so, keep a
reference to it. The message buffer is put into the link backlog queue
and processed as normal. Later on, when another message arrives, we
make a bundle with the first message if possible, and so on...
This way, a bundle, if really needed, will always consist of at least
two payload messages. Otherwise, we let the first buffer go its way
without any bundling, which reduces the overhead to zero.

Moreover, since we now have both messages in hand, we can even
optimize the 'tipc_msg_bundle()' function to bundle a very large
(size ~ MSS) message with a small one, which is not possible with the
current algorithm, e.g. [1400-byte message] + [10-byte message]
(MTU = 1500).

Acked-by: Ying Xue <ying.xue@windriver.com>
Acked-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: Tuong Lien <tuong.t.lien@dektech.com.au>
Signed-off-by: David S. Miller <davem@davemloft.net>
diff --git a/net/tipc/link.c b/net/tipc/link.c
index 7d7a6617..038861ba 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -940,16 +940,17 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
 		   struct sk_buff_head *xmitq)
 {
 	struct tipc_msg *hdr = buf_msg(skb_peek(list));
-	unsigned int maxwin = l->window;
-	int imp = msg_importance(hdr);
-	unsigned int mtu = l->mtu;
+	struct sk_buff_head *backlogq = &l->backlogq;
+	struct sk_buff_head *transmq = &l->transmq;
+	struct sk_buff *skb, *_skb;
+	u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1;
 	u16 ack = l->rcv_nxt - 1;
 	u16 seqno = l->snd_nxt;
-	u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1;
-	struct sk_buff_head *transmq = &l->transmq;
-	struct sk_buff_head *backlogq = &l->backlogq;
-	struct sk_buff *skb, *_skb, **tskb;
 	int pkt_cnt = skb_queue_len(list);
+	int imp = msg_importance(hdr);
+	unsigned int maxwin = l->window;
+	unsigned int mtu = l->mtu;
+	bool new_bundle;
 	int rc = 0;
 
 	if (unlikely(msg_size(hdr) > mtu)) {
@@ -975,20 +976,18 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
 	}
 
 	/* Prepare each packet for sending, and add to relevant queue: */
-	while (skb_queue_len(list)) {
-		skb = skb_peek(list);
-		hdr = buf_msg(skb);
-		msg_set_seqno(hdr, seqno);
-		msg_set_ack(hdr, ack);
-		msg_set_bcast_ack(hdr, bc_ack);
-
+	while ((skb = __skb_dequeue(list))) {
 		if (likely(skb_queue_len(transmq) < maxwin)) {
+			hdr = buf_msg(skb);
+			msg_set_seqno(hdr, seqno);
+			msg_set_ack(hdr, ack);
+			msg_set_bcast_ack(hdr, bc_ack);
 			_skb = skb_clone(skb, GFP_ATOMIC);
 			if (!_skb) {
+				kfree_skb(skb);
 				__skb_queue_purge(list);
 				return -ENOBUFS;
 			}
-			__skb_dequeue(list);
 			__skb_queue_tail(transmq, skb);
 			/* next retransmit attempt */
 			if (link_is_bc_sndlink(l))
@@ -1000,22 +999,26 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
 			seqno++;
 			continue;
 		}
-		tskb = &l->backlog[imp].target_bskb;
-		if (tipc_msg_bundle(*tskb, hdr, mtu)) {
-			kfree_skb(__skb_dequeue(list));
-			l->stats.sent_bundled++;
-			continue;
-		}
-		if (tipc_msg_make_bundle(tskb, hdr, mtu, l->addr)) {
-			kfree_skb(__skb_dequeue(list));
-			__skb_queue_tail(backlogq, *tskb);
-			l->backlog[imp].len++;
-			l->stats.sent_bundled++;
-			l->stats.sent_bundles++;
+		if (tipc_msg_try_bundle(l->backlog[imp].target_bskb, &skb,
+					mtu - INT_H_SIZE, l->addr,
+					&new_bundle)) {
+			if (skb) {
+				/* Keep a ref. to the skb for next try */
+				l->backlog[imp].target_bskb = skb;
+				l->backlog[imp].len++;
+				__skb_queue_tail(backlogq, skb);
+			} else {
+				if (new_bundle) {
+					l->stats.sent_bundles++;
+					l->stats.sent_bundled++;
+				}
+				l->stats.sent_bundled++;
+			}
 			continue;
 		}
 		l->backlog[imp].target_bskb = NULL;
-		l->backlog[imp].len += skb_queue_len(list);
+		l->backlog[imp].len += (1 + skb_queue_len(list));
+		__skb_queue_tail(backlogq, skb);
 		skb_queue_splice_tail_init(list, backlogq);
 	}
 	l->snd_nxt = seqno;