tipc: clean up handling of link congestion

After the recent changes in message importance handling it becomes
possible to simplify handling of messages and sockets when we
encounter link congestion.

We merge the function tipc_link_cong() into link_schedule_user(),
and simplify the code of the latter. The code should now be
easier to follow, especially regarding return codes and handling
of the message that caused the situation.

In case the scheduling function is unable to pre-allocate a wakeup
message buffer, it now returns -ENOBUFS, which is a more correct
code than the previously used -EHOSTUNREACH.

Reviewed-by: Ying Xue <ying.xue@windriver.com>
Reviewed-by: Erik Hugne <erik.hugne@ericsson.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
diff --git a/net/tipc/link.c b/net/tipc/link.c
index b9325a1..58e2460 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -367,28 +367,43 @@
 }
 
 /**
- * link_schedule_user - schedule user for wakeup after congestion
+ * link_schedule_user - schedule a message sender for wakeup after congestion
  * @link: congested link
- * @oport: sending port
- * @chain_sz: size of buffer chain that was attempted sent
- * @imp: importance of message attempted sent
+ * @list: message that was attempted sent
  * Create pseudo msg to send back to user when congestion abates
+ * Only consumes message if there is an error
  */
-static bool link_schedule_user(struct tipc_link *link, u32 oport,
-			       uint chain_sz, uint imp)
+static int link_schedule_user(struct tipc_link *link, struct sk_buff_head *list)
 {
-	struct sk_buff *buf;
+	struct tipc_msg *msg = buf_msg(skb_peek(list));
+	int imp = msg_importance(msg);
+	u32 oport = msg_origport(msg);
+	u32 addr = link_own_addr(link);
+	struct sk_buff *skb;
 
-	buf = tipc_msg_create(SOCK_WAKEUP, 0, INT_H_SIZE, 0,
-			      link_own_addr(link), link_own_addr(link),
-			      oport, 0, 0);
-	if (!buf)
-		return false;
-	TIPC_SKB_CB(buf)->chain_sz = chain_sz;
-	TIPC_SKB_CB(buf)->chain_imp = imp;
-	skb_queue_tail(&link->wakeupq, buf);
+	/* This really cannot happen...  */
+	if (unlikely(imp > TIPC_CRITICAL_IMPORTANCE)) {
+		pr_warn("%s<%s>, send queue full", link_rst_msg, link->name);
+		tipc_link_reset(link);
+		goto err;
+	}
+	/* Non-blocking sender: */
+	if (TIPC_SKB_CB(skb_peek(list))->wakeup_pending)
+		return -ELINKCONG;
+
+	/* Create and schedule wakeup pseudo message */
+	skb = tipc_msg_create(SOCK_WAKEUP, 0, INT_H_SIZE, 0,
+			      addr, addr, oport, 0, 0);
+	if (!skb)
+		goto err;
+	TIPC_SKB_CB(skb)->chain_sz = skb_queue_len(list);
+	TIPC_SKB_CB(skb)->chain_imp = imp;
+	skb_queue_tail(&link->wakeupq, skb);
 	link->stats.link_congs++;
-	return true;
+	return -ELINKCONG;
+err:
+	__skb_queue_purge(list);
+	return -ENOBUFS;
 }
 
 /**
@@ -708,48 +723,15 @@
 	}
 }
 
-/* tipc_link_cong: determine return value and how to treat the
- * sent buffer during link congestion.
- * - For plain, errorless user data messages we keep the buffer and
- *   return -ELINKONG.
- * - For all other messages we discard the buffer and return -EHOSTUNREACH
- * - For TIPC internal messages we also reset the link
- */
-static int tipc_link_cong(struct tipc_link *link, struct sk_buff_head *list)
-{
-	struct sk_buff *skb = skb_peek(list);
-	struct tipc_msg *msg = buf_msg(skb);
-	int imp = msg_importance(msg);
-	u32 oport = msg_tot_origport(msg);
-
-	if (unlikely(imp > TIPC_CRITICAL_IMPORTANCE)) {
-		pr_warn("%s<%s>, send queue full", link_rst_msg, link->name);
-		tipc_link_reset(link);
-		goto drop;
-	}
-	if (unlikely(msg_errcode(msg)))
-		goto drop;
-	if (unlikely(msg_reroute_cnt(msg)))
-		goto drop;
-	if (TIPC_SKB_CB(skb)->wakeup_pending)
-		return -ELINKCONG;
-	if (link_schedule_user(link, oport, skb_queue_len(list), imp))
-		return -ELINKCONG;
-drop:
-	__skb_queue_purge(list);
-	return -EHOSTUNREACH;
-}
-
 /**
  * __tipc_link_xmit(): same as tipc_link_xmit, but destlink is known & locked
  * @link: link to use
  * @list: chain of buffers containing message
  *
- * Consumes the buffer chain, except when returning -ELINKCONG
- * Returns 0 if success, otherwise errno: -ELINKCONG, -EMSGSIZE (plain socket
- * user data messages) or -EHOSTUNREACH (all other messages/senders)
- * Only the socket functions tipc_send_stream() and tipc_send_packet() need
- * to act on the return value, since they may need to do more send attempts.
+ * Consumes the buffer chain, except when returning -ELINKCONG,
+ * since the caller then may want to make more send attempts.
+ * Returns 0 if success, or errno: -ELINKCONG, -EMSGSIZE or -ENOBUFS
+ * Messages at TIPC_SYSTEM_IMPORTANCE are always accepted
  */
 int __tipc_link_xmit(struct net *net, struct tipc_link *link,
 		     struct sk_buff_head *list)
@@ -768,7 +750,7 @@
 
 	/* Match backlog limit against msg importance: */
 	if (unlikely(link->backlog[imp].len >= link->backlog[imp].limit))
-		return tipc_link_cong(link, list);
+		return link_schedule_user(link, list);
 
 	if (unlikely(msg_size(msg) > mtu)) {
 		__skb_queue_purge(list);
@@ -820,13 +802,25 @@
 	return __tipc_link_xmit(link->owner->net, link, &head);
 }
 
+/* tipc_link_xmit_skb(): send single buffer to destination
+ * Buffers sent via this functon are generally TIPC_SYSTEM_IMPORTANCE
+ * messages, which will not be rejected
+ * The only exception is datagram messages rerouted after secondary
+ * lookup, which are rare and safe to dispose of anyway.
+ * TODO: Return real return value, and let callers use
+ * tipc_wait_for_sendpkt() where applicable
+ */
 int tipc_link_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode,
 		       u32 selector)
 {
 	struct sk_buff_head head;
+	int rc;
 
 	skb2list(skb, &head);
-	return tipc_link_xmit(net, &head, dnode, selector);
+	rc = tipc_link_xmit(net, &head, dnode, selector);
+	if (rc == -ELINKCONG)
+		kfree_skb(skb);
+	return 0;
 }
 
 /**