tipc: make name table distributor use new send function

In a previous commit series ("tipc: new unicast transmission code")
we introduced a new message sending function, tipc_link_xmit2(),
and moved the unicast data users over to use that function. We now
let the internal name table distributor do the same.

The interaction between the name distributor and the node/link
layer also becomes significantly simpler, so we can eliminate
the function tipc_link_names_xmit().

Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Reviewed-by: Erik Hugne <erik.hugne@ericsson.com>
Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
diff --git a/net/tipc/name_distr.c b/net/tipc/name_distr.c
index 8ce7309..d16f947 100644
--- a/net/tipc/name_distr.c
+++ b/net/tipc/name_distr.c
@@ -101,24 +101,22 @@
 
 void named_cluster_distribute(struct sk_buff *buf)
 {
-	struct sk_buff *buf_copy;
-	struct tipc_node *n_ptr;
-	struct tipc_link *l_ptr;
+	struct sk_buff *obuf;
+	struct tipc_node *node;
+	u32 dnode;
 
 	rcu_read_lock();
-	list_for_each_entry_rcu(n_ptr, &tipc_node_list, list) {
-		tipc_node_lock(n_ptr);
-		l_ptr = n_ptr->active_links[n_ptr->addr & 1];
-		if (l_ptr) {
-			buf_copy = skb_copy(buf, GFP_ATOMIC);
-			if (!buf_copy) {
-				tipc_node_unlock(n_ptr);
-				break;
-			}
-			msg_set_destnode(buf_msg(buf_copy), n_ptr->addr);
-			__tipc_link_xmit(l_ptr, buf_copy);
-		}
-		tipc_node_unlock(n_ptr);
+	list_for_each_entry_rcu(node, &tipc_node_list, list) {
+		dnode = node->addr;
+		if (in_own_node(dnode))
+			continue;
+		if (!tipc_node_active_links(node))
+			continue;
+		obuf = skb_copy(buf, GFP_ATOMIC);
+		if (!obuf)
+			break;
+		msg_set_destnode(buf_msg(obuf), dnode);
+		tipc_link_xmit2(obuf, dnode, dnode);
 	}
 	rcu_read_unlock();
 
@@ -175,34 +173,44 @@
 	return buf;
 }
 
-/*
+/**
  * named_distribute - prepare name info for bulk distribution to another node
+ * @msg_list: list of messages (buffers) to be returned from this function
+ * @dnode: node to be updated
+ * @pls: linked list of publication items to be packed into buffer chain
  */
-static void named_distribute(struct list_head *message_list, u32 node,
-			     struct publ_list *pls, u32 max_item_buf)
+static void named_distribute(struct list_head *msg_list, u32 dnode,
+			     struct publ_list *pls)
 {
 	struct publication *publ;
 	struct sk_buff *buf = NULL;
 	struct distr_item *item = NULL;
-	u32 left = 0;
-	u32 rest = pls->size * ITEM_SIZE;
+	uint dsz = pls->size * ITEM_SIZE;
+	uint msg_dsz = (tipc_node_get_mtu(dnode, 0) / ITEM_SIZE) * ITEM_SIZE;
+	uint rem = dsz;
+	uint msg_rem = 0;
 
 	list_for_each_entry(publ, &pls->list, local_list) {
+		/* Prepare next buffer: */
 		if (!buf) {
-			left = (rest <= max_item_buf) ? rest : max_item_buf;
-			rest -= left;
-			buf = named_prepare_buf(PUBLICATION, left, node);
+			msg_rem = min_t(uint, rem, msg_dsz);
+			rem -= msg_rem;
+			buf = named_prepare_buf(PUBLICATION, msg_rem, dnode);
 			if (!buf) {
 				pr_warn("Bulk publication failure\n");
 				return;
 			}
 			item = (struct distr_item *)msg_data(buf_msg(buf));
 		}
+
+		/* Pack publication into message: */
 		publ_to_item(item, publ);
 		item++;
-		left -= ITEM_SIZE;
-		if (!left) {
-			list_add_tail((struct list_head *)buf, message_list);
+		msg_rem -= ITEM_SIZE;
+
+		/* Append full buffer to list: */
+		if (!msg_rem) {
+			list_add_tail((struct list_head *)buf, msg_list);
 			buf = NULL;
 		}
 	}
@@ -211,16 +219,20 @@
 /**
  * tipc_named_node_up - tell specified node about all publications by this node
  */
-void tipc_named_node_up(u32 max_item_buf, u32 node)
+void tipc_named_node_up(u32 dnode)
 {
-	LIST_HEAD(message_list);
+	LIST_HEAD(msg_list);
+	struct sk_buff *buf_chain;
 
 	read_lock_bh(&tipc_nametbl_lock);
-	named_distribute(&message_list, node, &publ_cluster, max_item_buf);
-	named_distribute(&message_list, node, &publ_zone, max_item_buf);
+	named_distribute(&msg_list, dnode, &publ_cluster);
+	named_distribute(&msg_list, dnode, &publ_zone);
 	read_unlock_bh(&tipc_nametbl_lock);
 
-	tipc_link_names_xmit(&message_list, node);
+	/* Convert circular list to linear list and send: */
+	buf_chain = (struct sk_buff *)msg_list.next;
+	((struct sk_buff *)msg_list.prev)->next = NULL;
+	tipc_link_xmit2(buf_chain, dnode, dnode);
 }
 
 /**