tipc: guarantee that group broadcast doesn't bypass group unicast
We need a mechanism guaranteeing that group unicasts sent out from a
socket are not bypassed by later sent broadcasts from the same socket.
We do this as follows:
- Each time a unicast is sent, we set a the broadcast method for the
socket to "replicast" and "mandatory". This forces the first
subsequent broadcast message to follow the same network and data path
as the preceding unicast to a destination, hence preventing it from
overtaking the latter.
- In order to make the 'same data path' statement above true, we let
group unicasts pass through the multicast link input queue, instead
of as previously through the unicast link input queue.
- In the first broadcast following a unicast, we set a new header flag,
requiring all recipients to immediately acknowledge its reception.
- During the period before all the expected acknowledges are received,
the socket refuses to accept any more broadcast attempts, i.e., by
blocking or returning EAGAIN. This period should typically not be
longer than a few microseconds.
- When all acknowledges have been received, the sending socket will
open up for subsequent broadcasts, this time giving the link layer
freedom to itself select the best transmission method.
- The forced and/or abrupt transmission method changes described above
may lead to broadcasts arriving out of order to the recipients. We
remedy this by introducing code that checks and if necessary
re-orders such messages at the receiving end.
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Acked-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 3276b7a..b1f1c3c 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -831,6 +831,7 @@ static int tipc_send_group_msg(struct net *net, struct tipc_sock *tsk,
u32 dnode, u32 dport, int dlen)
{
u16 bc_snd_nxt = tipc_group_bc_snd_nxt(tsk->group);
+ struct tipc_mc_method *method = &tsk->mc_method;
int blks = tsk_blocks(GROUP_H_SIZE + dlen);
struct tipc_msg *hdr = &tsk->phdr;
struct sk_buff_head pkts;
@@ -857,9 +858,12 @@ static int tipc_send_group_msg(struct net *net, struct tipc_sock *tsk,
tsk->cong_link_cnt++;
}
- /* Update send window and sequence number */
+ /* Update send window */
tipc_group_update_member(mb, blks);
+ /* A broadcast sent within next EXPIRE period must follow same path */
+ method->rcast = true;
+ method->mandatory = true;
return dlen;
}
@@ -1008,6 +1012,7 @@ static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m,
struct tipc_group *grp = tsk->group;
struct tipc_nlist *dsts = tipc_group_dests(grp);
struct tipc_mc_method *method = &tsk->mc_method;
+ bool ack = method->mandatory && method->rcast;
int blks = tsk_blocks(MCAST_H_SIZE + dlen);
struct tipc_msg *hdr = &tsk->phdr;
int mtu = tipc_bcast_get_mtu(net);
@@ -1036,6 +1041,9 @@ static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m,
msg_set_destnode(hdr, 0);
msg_set_grp_bc_seqno(hdr, tipc_group_bc_snd_nxt(grp));
+ /* Avoid getting stuck with repeated forced replicasts */
+ msg_set_grp_bc_ack_req(hdr, ack);
+
/* Build message as chain of buffers */
skb_queue_head_init(&pkts);
rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);
@@ -1043,13 +1051,17 @@ static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m,
return rc;
/* Send message */
- rc = tipc_mcast_xmit(net, &pkts, method, dsts,
- &tsk->cong_link_cnt);
+ rc = tipc_mcast_xmit(net, &pkts, method, dsts, &tsk->cong_link_cnt);
if (unlikely(rc))
return rc;
/* Update broadcast sequence number and send windows */
- tipc_group_update_bc_members(tsk->group, blks);
+ tipc_group_update_bc_members(tsk->group, blks, ack);
+
+ /* Broadcast link is now free to choose method for next broadcast */
+ method->mandatory = false;
+ method->expires = jiffies;
+
return dlen;
}
@@ -1113,7 +1125,7 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
u32 portid, oport, onode;
struct list_head dports;
struct tipc_msg *msg;
- int hsz;
+ int user, mtyp, hsz;
__skb_queue_head_init(&tmpq);
INIT_LIST_HEAD(&dports);
@@ -1121,6 +1133,18 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
skb = tipc_skb_peek(arrvq, &inputq->lock);
for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) {
msg = buf_msg(skb);
+ user = msg_user(msg);
+ mtyp = msg_type(msg);
+ if (mtyp == TIPC_GRP_UCAST_MSG || user == GROUP_PROTOCOL) {
+ spin_lock_bh(&inputq->lock);
+ if (skb_peek(arrvq) == skb) {
+ __skb_dequeue(arrvq);
+ __skb_queue_tail(inputq, skb);
+ }
+ refcount_dec(&skb->users);
+ spin_unlock_bh(&inputq->lock);
+ continue;
+ }
hsz = skb_headroom(skb) + msg_hdr_sz(msg);
oport = msg_origport(msg);
onode = msg_orignode(msg);