tipc: eliminate race condition at multicast reception
In a previous commit in this series we resolved a race problem during
unicast message reception.
Here, we resolve the same problem at multicast reception. We apply the
same technique: an input queue serializing the delivery of arriving
buffers. The main difference is that here we do it in two steps.
First, the broadcast link feeds arriving buffers into the tail of an
arrival queue, which head is consumed at the socket level, and where
destination lookup is performed. Second, if the lookup is successful,
the resulting buffer clones are fed into a second queue, the input
queue. This queue is consumed at reception in the socket just like
in the unicast case. Both queues are protected by the same lock, -the
one of the input queue.
Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 26aec841..6666680 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -776,44 +776,60 @@
return rc;
}
-/* tipc_sk_mcast_rcv - Deliver multicast message to all destination sockets
+/**
+ * tipc_sk_mcast_rcv - Deliver multicast messages to all destination sockets
+ * @arrvq: queue with arriving messages, to be cloned after destination lookup
+ * @inputq: queue with cloned messages, delivered to socket after dest lookup
+ *
+ * Multi-threaded: parallel calls with reference to same queues may occur
*/
-void tipc_sk_mcast_rcv(struct net *net, struct sk_buff *skb)
+void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
+ struct sk_buff_head *inputq)
{
- struct tipc_msg *msg = buf_msg(skb);
+ struct tipc_msg *msg;
struct tipc_plist dports;
- struct sk_buff *cskb;
u32 portid;
u32 scope = TIPC_CLUSTER_SCOPE;
- struct sk_buff_head msgq;
- uint hsz = skb_headroom(skb) + msg_hdr_sz(msg);
+ struct sk_buff_head tmpq;
+ uint hsz;
+ struct sk_buff *skb, *_skb;
- skb_queue_head_init(&msgq);
+ __skb_queue_head_init(&tmpq);
tipc_plist_init(&dports);
- if (in_own_node(net, msg_orignode(msg)))
- scope = TIPC_NODE_SCOPE;
+ skb = tipc_skb_peek(arrvq, &inputq->lock);
+ for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) {
+ msg = buf_msg(skb);
+ hsz = skb_headroom(skb) + msg_hdr_sz(msg);
- if (unlikely(!msg_mcast(msg))) {
- pr_warn("Received non-multicast msg in multicast\n");
- goto exit;
- }
- /* Create destination port list: */
- tipc_nametbl_mc_translate(net, msg_nametype(msg), msg_namelower(msg),
- msg_nameupper(msg), scope, &dports);
- portid = tipc_plist_pop(&dports);
- for (; portid; portid = tipc_plist_pop(&dports)) {
- cskb = __pskb_copy(skb, hsz, GFP_ATOMIC);
- if (!cskb) {
- pr_warn("Failed do clone mcast rcv buffer\n");
- continue;
+ if (in_own_node(net, msg_orignode(msg)))
+ scope = TIPC_NODE_SCOPE;
+
+ /* Create destination port list and message clones: */
+ tipc_nametbl_mc_translate(net,
+ msg_nametype(msg), msg_namelower(msg),
+ msg_nameupper(msg), scope, &dports);
+ portid = tipc_plist_pop(&dports);
+ for (; portid; portid = tipc_plist_pop(&dports)) {
+ _skb = __pskb_copy(skb, hsz, GFP_ATOMIC);
+ if (_skb) {
+ msg_set_destport(buf_msg(_skb), portid);
+ __skb_queue_tail(&tmpq, _skb);
+ continue;
+ }
+ pr_warn("Failed to clone mcast rcv buffer\n");
}
- msg_set_destport(buf_msg(cskb), portid);
- skb_queue_tail(&msgq, cskb);
+ /* Append to inputq if not already done by other thread */
+ spin_lock_bh(&inputq->lock);
+ if (skb_peek(arrvq) == skb) {
+ skb_queue_splice_tail_init(&tmpq, inputq);
+ kfree_skb(__skb_dequeue(arrvq));
+ }
+ spin_unlock_bh(&inputq->lock);
+ __skb_queue_purge(&tmpq);
+ kfree_skb(skb);
}
- tipc_sk_rcv(net, &msgq);
-exit:
- kfree_skb(skb);
+ tipc_sk_rcv(net, inputq);
}
/**