tipc: improve groupcast scope handling

When a member joins a group, it also indicates a binding scope. This
makes it possible to create both node local groups, invisible to other
nodes, as well as cluster global groups, visible everywhere.

In order to prevent different members from ending up with permanently
differing views of group size and membership, we must inhibit locally
and globally bound members from joining the same group.

We do this by using the binding scope as an additional separator between
groups. I.e., a member must ignore all membership events from sockets
using a different scope than itself, and all lookups for message
destinations must require an exact match between the message's lookup
scope and the potential target's binding scope.

Apart from making it possible to create local groups using the same
identity on different nodes, a side effect of this is that it now also
becomes possible to create a cluster global group with the same identity
across the same nodes, without interfering with the local groups.

Acked-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
diff --git a/net/tipc/group.c b/net/tipc/group.c
index cf996bd..1908773 100644
--- a/net/tipc/group.c
+++ b/net/tipc/group.c
@@ -87,7 +87,6 @@ struct tipc_group {
 	int subid;
 	u32 type;
 	u32 instance;
-	u32 domain;
 	u32 scope;
 	u32 portid;
 	u16 member_cnt;
@@ -158,6 +157,8 @@ int tipc_group_size(struct tipc_group *grp)
 struct tipc_group *tipc_group_create(struct net *net, u32 portid,
 				     struct tipc_group_req *mreq)
 {
+	u32 filter = TIPC_SUB_PORTS | TIPC_SUB_NO_STATUS;
+	bool global = mreq->scope != TIPC_NODE_SCOPE;
 	struct tipc_group *grp;
 	u32 type = mreq->type;
 
@@ -171,15 +172,14 @@ struct tipc_group *tipc_group_create(struct net *net, u32 portid,
 	grp->members = RB_ROOT;
 	grp->net = net;
 	grp->portid = portid;
-	grp->domain = addr_domain(net, mreq->scope);
 	grp->type = type;
 	grp->instance = mreq->instance;
 	grp->scope = mreq->scope;
 	grp->loopback = mreq->flags & TIPC_GROUP_LOOPBACK;
 	grp->events = mreq->flags & TIPC_GROUP_MEMBER_EVTS;
-	if (tipc_topsrv_kern_subscr(net, portid, type,
-				    TIPC_SUB_PORTS | TIPC_SUB_NO_STATUS,
-				    0, ~0, &grp->subid))
+	filter |= global ? TIPC_SUB_CLUSTER_SCOPE : TIPC_SUB_NODE_SCOPE;
+	if (tipc_topsrv_kern_subscr(net, portid, type, 0, ~0,
+				    filter, &grp->subid))
 		return grp;
 	kfree(grp);
 	return NULL;
@@ -732,6 +732,9 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,
 	if (!grp)
 		return;
 
+	if (grp->scope == TIPC_NODE_SCOPE && node != tipc_own_addr(grp->net))
+		return;
+
 	m = tipc_group_find_member(grp, node, port);
 
 	switch (msg_type(hdr)) {
diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c
index 60af9885..64cdd3c 100644
--- a/net/tipc/name_table.c
+++ b/net/tipc/name_table.c
@@ -328,7 +328,8 @@ static struct publication *tipc_nameseq_insert_publ(struct net *net,
 	list_for_each_entry_safe(s, st, &nseq->subscriptions, nameseq_list) {
 		tipc_subscrp_report_overlap(s, publ->lower, publ->upper,
 					    TIPC_PUBLISHED, publ->ref,
-					    publ->node, created_subseq);
+					    publ->node, publ->scope,
+					    created_subseq);
 	}
 	return publ;
 }
@@ -398,7 +399,8 @@ static struct publication *tipc_nameseq_remove_publ(struct net *net,
 	list_for_each_entry_safe(s, st, &nseq->subscriptions, nameseq_list) {
 		tipc_subscrp_report_overlap(s, publ->lower, publ->upper,
 					    TIPC_WITHDRAWN, publ->ref,
-					    publ->node, removed_subseq);
+					    publ->node, publ->scope,
+					    removed_subseq);
 	}
 
 	return publ;
@@ -435,6 +437,7 @@ static void tipc_nameseq_subscribe(struct name_seq *nseq,
 							    sseq->upper,
 							    TIPC_PUBLISHED,
 							    crs->ref, crs->node,
+							    crs->scope,
 							    must_report);
 				must_report = 0;
 			}
@@ -598,7 +601,7 @@ u32 tipc_nametbl_translate(struct net *net, u32 type, u32 instance,
 	return ref;
 }
 
-bool tipc_nametbl_lookup(struct net *net, u32 type, u32 instance, u32 domain,
+bool tipc_nametbl_lookup(struct net *net, u32 type, u32 instance, u32 scope,
 			 struct list_head *dsts, int *dstcnt, u32 exclude,
 			 bool all)
 {
@@ -608,9 +611,6 @@ bool tipc_nametbl_lookup(struct net *net, u32 type, u32 instance, u32 domain,
 	struct name_seq *seq;
 	struct sub_seq *sseq;
 
-	if (!tipc_in_scope(domain, self))
-		return false;
-
 	*dstcnt = 0;
 	rcu_read_lock();
 	seq = nametbl_find_seq(net, type);
@@ -621,7 +621,7 @@ bool tipc_nametbl_lookup(struct net *net, u32 type, u32 instance, u32 domain,
 	if (likely(sseq)) {
 		info = sseq->info;
 		list_for_each_entry(publ, &info->zone_list, zone_list) {
-			if (!tipc_in_scope(domain, publ->node))
+			if (publ->scope != scope)
 				continue;
 			if (publ->ref == exclude && publ->node == self)
 				continue;
@@ -639,13 +639,14 @@ bool tipc_nametbl_lookup(struct net *net, u32 type, u32 instance, u32 domain,
 	return !list_empty(dsts);
 }
 
-int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper,
-			      u32 limit, struct list_head *dports)
+int tipc_nametbl_mc_lookup(struct net *net, u32 type, u32 lower, u32 upper,
+			   u32 scope, bool exact, struct list_head *dports)
 {
-	struct name_seq *seq;
-	struct sub_seq *sseq;
 	struct sub_seq *sseq_stop;
 	struct name_info *info;
+	struct publication *p;
+	struct name_seq *seq;
+	struct sub_seq *sseq;
 	int res = 0;
 
 	rcu_read_lock();
@@ -657,15 +658,12 @@ int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper,
 	sseq = seq->sseqs + nameseq_locate_subseq(seq, lower);
 	sseq_stop = seq->sseqs + seq->first_free;
 	for (; sseq != sseq_stop; sseq++) {
-		struct publication *publ;
-
 		if (sseq->lower > upper)
 			break;
-
 		info = sseq->info;
-		list_for_each_entry(publ, &info->node_list, node_list) {
-			if (publ->scope <= limit)
-				tipc_dest_push(dports, 0, publ->ref);
+		list_for_each_entry(p, &info->node_list, node_list) {
+			if (p->scope == scope || (!exact && p->scope < scope))
+				tipc_dest_push(dports, 0, p->ref);
 		}
 
 		if (info->cluster_list_size != info->node_list_size)
@@ -682,7 +680,7 @@ int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper,
  * - Determines if any node local ports overlap
  */
 void tipc_nametbl_lookup_dst_nodes(struct net *net, u32 type, u32 lower,
-				   u32 upper, u32 domain,
+				   u32 upper, u32 scope,
 				   struct tipc_nlist *nodes)
 {
 	struct sub_seq *sseq, *stop;
@@ -701,7 +699,7 @@ void tipc_nametbl_lookup_dst_nodes(struct net *net, u32 type, u32 lower,
 	for (; sseq != stop && sseq->lower <= upper; sseq++) {
 		info = sseq->info;
 		list_for_each_entry(publ, &info->zone_list, zone_list) {
-			if (tipc_in_scope(domain, publ->node))
+			if (publ->scope == scope)
 				tipc_nlist_add(nodes, publ->node);
 		}
 	}
@@ -713,7 +711,7 @@ void tipc_nametbl_lookup_dst_nodes(struct net *net, u32 type, u32 lower,
 /* tipc_nametbl_build_group - build list of communication group members
  */
 void tipc_nametbl_build_group(struct net *net, struct tipc_group *grp,
-			      u32 type, u32 domain)
+			      u32 type, u32 scope)
 {
 	struct sub_seq *sseq, *stop;
 	struct name_info *info;
@@ -731,7 +729,7 @@ void tipc_nametbl_build_group(struct net *net, struct tipc_group *grp,
 	for (; sseq != stop; sseq++) {
 		info = sseq->info;
 		list_for_each_entry(p, &info->zone_list, zone_list) {
-			if (!tipc_in_scope(domain, p->node))
+			if (p->scope != scope)
 				continue;
 			tipc_group_add_member(grp, p->node, p->ref, p->lower);
 		}
diff --git a/net/tipc/name_table.h b/net/tipc/name_table.h
index 73a148c..b595d8a 100644
--- a/net/tipc/name_table.h
+++ b/net/tipc/name_table.h
@@ -100,8 +100,8 @@ struct name_table {
 int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb);
 
 u32 tipc_nametbl_translate(struct net *net, u32 type, u32 instance, u32 *node);
-int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper,
-			      u32 limit, struct list_head *dports);
+int tipc_nametbl_mc_lookup(struct net *net, u32 type, u32 lower, u32 upper,
+			   u32 scope, bool exact, struct list_head *dports);
 void tipc_nametbl_build_group(struct net *net, struct tipc_group *grp,
 			      u32 type, u32 domain);
 void tipc_nametbl_lookup_dst_nodes(struct net *net, u32 type, u32 lower,
diff --git a/net/tipc/server.c b/net/tipc/server.c
index 950c54c..8ee5e86 100644
--- a/net/tipc/server.c
+++ b/net/tipc/server.c
@@ -489,8 +489,8 @@ void tipc_conn_terminate(struct tipc_server *s, int conid)
 	}
 }
 
-bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type,
-			     u32 filter, u32 lower, u32 upper, int *conid)
+bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower,
+			     u32 upper, u32 filter, int *conid)
 {
 	struct tipc_subscriber *scbr;
 	struct tipc_subscr sub;
diff --git a/net/tipc/server.h b/net/tipc/server.h
index ea1effb..17f49ee 100644
--- a/net/tipc/server.h
+++ b/net/tipc/server.h
@@ -41,6 +41,8 @@
 #include <net/net_namespace.h>
 
 #define TIPC_SERVER_NAME_LEN	32
+#define TIPC_SUB_CLUSTER_SCOPE  0x20
+#define TIPC_SUB_NODE_SCOPE     0x40
 #define TIPC_SUB_NO_STATUS      0x80
 
 /**
@@ -84,8 +86,8 @@ struct tipc_server {
 int tipc_conn_sendmsg(struct tipc_server *s, int conid,
 		      struct sockaddr_tipc *addr, void *data, size_t len);
 
-bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type,
-			     u32 filter, u32 lower, u32 upper, int *conid);
+bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower,
+			     u32 upper, u32 filter, int *conid);
 void tipc_topsrv_kern_unsubscr(struct net *net, int conid);
 
 /**
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index e3a02f1f..b24dab3 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -928,21 +928,22 @@ static int tipc_send_group_anycast(struct socket *sock, struct msghdr *m,
 	struct list_head *cong_links = &tsk->cong_links;
 	int blks = tsk_blocks(GROUP_H_SIZE + dlen);
 	struct tipc_group *grp = tsk->group;
+	struct tipc_msg *hdr = &tsk->phdr;
 	struct tipc_member *first = NULL;
 	struct tipc_member *mbr = NULL;
 	struct net *net = sock_net(sk);
 	u32 node, port, exclude;
-	u32 type, inst, domain;
 	struct list_head dsts;
+	u32 type, inst, scope;
 	int lookups = 0;
 	int dstcnt, rc;
 	bool cong;
 
 	INIT_LIST_HEAD(&dsts);
 
-	type = dest->addr.name.name.type;
+	type = msg_nametype(hdr);
 	inst = dest->addr.name.name.instance;
-	domain = addr_domain(net, dest->scope);
+	scope = msg_lookup_scope(hdr);
 	exclude = tipc_group_exclude(grp);
 
 	while (++lookups < 4) {
@@ -950,7 +951,7 @@ static int tipc_send_group_anycast(struct socket *sock, struct msghdr *m,
 
 		/* Look for a non-congested destination member, if any */
 		while (1) {
-			if (!tipc_nametbl_lookup(net, type, inst, domain, &dsts,
+			if (!tipc_nametbl_lookup(net, type, inst, scope, &dsts,
 						 &dstcnt, exclude, false))
 				return -EHOSTUNREACH;
 			tipc_dest_pop(&dsts, &node, &port);
@@ -1079,22 +1080,23 @@ static int tipc_send_group_mcast(struct socket *sock, struct msghdr *m,
 {
 	struct sock *sk = sock->sk;
 	DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
-	struct tipc_name_seq *seq = &dest->addr.nameseq;
 	struct tipc_sock *tsk = tipc_sk(sk);
 	struct tipc_group *grp = tsk->group;
+	struct tipc_msg *hdr = &tsk->phdr;
 	struct net *net = sock_net(sk);
-	u32 domain, exclude, dstcnt;
+	u32 type, inst, scope, exclude;
 	struct list_head dsts;
+	u32 dstcnt;
 
 	INIT_LIST_HEAD(&dsts);
 
-	if (seq->lower != seq->upper)
-		return -ENOTSUPP;
-
-	domain = addr_domain(net, dest->scope);
+	type = msg_nametype(hdr);
+	inst = dest->addr.name.name.instance;
+	scope = msg_lookup_scope(hdr);
 	exclude = tipc_group_exclude(grp);
-	if (!tipc_nametbl_lookup(net, seq->type, seq->lower, domain,
-				 &dsts, &dstcnt, exclude, true))
+
+	if (!tipc_nametbl_lookup(net, type, inst, scope, &dsts,
+				 &dstcnt, exclude, true))
 		return -EHOSTUNREACH;
 
 	if (dstcnt == 1) {
@@ -1116,24 +1118,29 @@ static int tipc_send_group_mcast(struct socket *sock, struct msghdr *m,
 void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
 		       struct sk_buff_head *inputq)
 {
-	u32 scope = TIPC_CLUSTER_SCOPE;
 	u32 self = tipc_own_addr(net);
+	u32 type, lower, upper, scope;
 	struct sk_buff *skb, *_skb;
-	u32 lower = 0, upper = ~0;
-	struct sk_buff_head tmpq;
 	u32 portid, oport, onode;
+	struct sk_buff_head tmpq;
 	struct list_head dports;
-	struct tipc_msg *msg;
-	int user, mtyp, hsz;
+	struct tipc_msg *hdr;
+	int user, mtyp, hlen;
+	bool exact;
 
 	__skb_queue_head_init(&tmpq);
 	INIT_LIST_HEAD(&dports);
 
 	skb = tipc_skb_peek(arrvq, &inputq->lock);
 	for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) {
-		msg = buf_msg(skb);
-		user = msg_user(msg);
-		mtyp = msg_type(msg);
+		hdr = buf_msg(skb);
+		user = msg_user(hdr);
+		mtyp = msg_type(hdr);
+		hlen = skb_headroom(skb) + msg_hdr_sz(hdr);
+		oport = msg_origport(hdr);
+		onode = msg_orignode(hdr);
+		type = msg_nametype(hdr);
+
 		if (mtyp == TIPC_GRP_UCAST_MSG || user == GROUP_PROTOCOL) {
 			spin_lock_bh(&inputq->lock);
 			if (skb_peek(arrvq) == skb) {
@@ -1144,21 +1151,31 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
 			spin_unlock_bh(&inputq->lock);
 			continue;
 		}
-		hsz = skb_headroom(skb) + msg_hdr_sz(msg);
-		oport = msg_origport(msg);
-		onode = msg_orignode(msg);
-		if (onode == self)
-			scope = TIPC_NODE_SCOPE;
 
-		/* Create destination port list and message clones: */
-		if (!msg_in_group(msg)) {
-			lower = msg_namelower(msg);
-			upper = msg_nameupper(msg);
+		/* Group messages require exact scope match */
+		if (msg_in_group(hdr)) {
+			lower = 0;
+			upper = ~0;
+			scope = msg_lookup_scope(hdr);
+			exact = true;
+		} else {
+			/* TIPC_NODE_SCOPE means "any scope" in this context */
+			if (onode == self)
+				scope = TIPC_NODE_SCOPE;
+			else
+				scope = TIPC_CLUSTER_SCOPE;
+			exact = false;
+			lower = msg_namelower(hdr);
+			upper = msg_nameupper(hdr);
 		}
-		tipc_nametbl_mc_translate(net, msg_nametype(msg), lower, upper,
-					  scope, &dports);
+
+		/* Create destination port list: */
+		tipc_nametbl_mc_lookup(net, type, lower, upper,
+				       scope, exact, &dports);
+
+		/* Clone message per destination */
 		while (tipc_dest_pop(&dports, NULL, &portid)) {
-			_skb = __pskb_copy(skb, hsz, GFP_ATOMIC);
+			_skb = __pskb_copy(skb, hlen, GFP_ATOMIC);
 			if (_skb) {
 				msg_set_destport(buf_msg(_skb), portid);
 				__skb_queue_tail(&tmpq, _skb);
@@ -2731,7 +2748,6 @@ void tipc_sk_rht_destroy(struct net *net)
 static int tipc_sk_join(struct tipc_sock *tsk, struct tipc_group_req *mreq)
 {
 	struct net *net = sock_net(&tsk->sk);
-	u32 domain = addr_domain(net, mreq->scope);
 	struct tipc_group *grp = tsk->group;
 	struct tipc_msg *hdr = &tsk->phdr;
 	struct tipc_name_seq seq;
@@ -2739,6 +2755,8 @@ static int tipc_sk_join(struct tipc_sock *tsk, struct tipc_group_req *mreq)
 
 	if (mreq->type < TIPC_RESERVED_TYPES)
 		return -EACCES;
+	if (mreq->scope > TIPC_NODE_SCOPE)
+		return -EINVAL;
 	if (grp)
 		return -EACCES;
 	grp = tipc_group_create(net, tsk->portid, mreq);
@@ -2751,7 +2769,7 @@ static int tipc_sk_join(struct tipc_sock *tsk, struct tipc_group_req *mreq)
 	seq.type = mreq->type;
 	seq.lower = mreq->instance;
 	seq.upper = seq.lower;
-	tipc_nametbl_build_group(net, grp, mreq->type, domain);
+	tipc_nametbl_build_group(net, grp, mreq->type, mreq->scope);
 	rc = tipc_sk_publish(tsk, mreq->scope, &seq);
 	if (rc) {
 		tipc_group_delete(net, grp);
diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c
index 1052341..44df528 100644
--- a/net/tipc/subscr.c
+++ b/net/tipc/subscr.c
@@ -118,15 +118,19 @@ void tipc_subscrp_convert_seq(struct tipc_name_seq *in, int swap,
 
 void tipc_subscrp_report_overlap(struct tipc_subscription *sub, u32 found_lower,
 				 u32 found_upper, u32 event, u32 port_ref,
-				 u32 node, int must)
+				 u32 node, u32 scope, int must)
 {
+	u32 filter = htohl(sub->evt.s.filter, sub->swap);
 	struct tipc_name_seq seq;
 
 	tipc_subscrp_convert_seq(&sub->evt.s.seq, sub->swap, &seq);
 	if (!tipc_subscrp_check_overlap(&seq, found_lower, found_upper))
 		return;
-	if (!must &&
-	    !(htohl(sub->evt.s.filter, sub->swap) & TIPC_SUB_PORTS))
+	if (!must && !(filter & TIPC_SUB_PORTS))
+		return;
+	if (filter & TIPC_SUB_CLUSTER_SCOPE && scope == TIPC_NODE_SCOPE)
+		return;
+	if (filter & TIPC_SUB_NODE_SCOPE && scope != TIPC_NODE_SCOPE)
 		return;
 
 	tipc_subscrp_send_event(sub, found_lower, found_upper, event, port_ref,
diff --git a/net/tipc/subscr.h b/net/tipc/subscr.h
index ee52957..f3edca7 100644
--- a/net/tipc/subscr.h
+++ b/net/tipc/subscr.h
@@ -71,7 +71,7 @@ int tipc_subscrp_check_overlap(struct tipc_name_seq *seq, u32 found_lower,
 			       u32 found_upper);
 void tipc_subscrp_report_overlap(struct tipc_subscription *sub,
 				 u32 found_lower, u32 found_upper, u32 event,
-				 u32 port_ref, u32 node, int must);
+				 u32 port_ref, u32 node, u32 scope, int must);
 void tipc_subscrp_convert_seq(struct tipc_name_seq *in, int swap,
 			      struct tipc_name_seq *out);
 u32 tipc_subscrp_convert_seq_type(u32 type, int swap);