tipc: reduce risk of user starvation during link congestion

The socket code currently handles link congestion by either blocking
and trying to send again when the congestion has abated, or just
returning to the user with -EAGAIN and letting him retry later.

This mechanism is prone to starvation, because the wakeup algorithm is
non-atomic. Between the time the link issues a wakeup signal and the
time the socket wakes up and re-attempts sending, other senders may have
come in and occupied the free buffer space in the link. This in turn
may lead to a socket having to make many send attempts before it is
successful. In extremely loaded systems we have observed latency times
of several seconds before a low-priority socket is able to send out a
message.

In this commit, we simplify this mechanism and reduce the risk of the
described scenario happening. When an attempt is made to send a message
via a congested link, we now let it be added to the link's backlog queue
anyway, thus permitting an oversubscription of one message per source
socket. We still create a wakeup item and return an error code, hence
instructing the sender to block or stop sending. Only when enough space
has been freed up in the link's backlog queue do we issue a wakeup event
that allows the sender to continue with the next message, if any.

The fact that a socket now can consider a message sent even when the
link returns a congestion code means that the sending socket code can
be simplified. Also, since this is a good opportunity to get rid of the
obsolete 'mtu change' condition in the three socket send functions, we
now choose to refactor those functions completely.

Signed-off-by: Parthasarathy Bhuvaragan <parthasarathy.bhuvaragan@ericsson.com>
Acked-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index fae6a55..d2f3539 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -67,12 +67,14 @@ enum {
  * @max_pkt: maximum packet size "hint" used when building messages sent by port
  * @portid: unique port identity in TIPC socket hash table
  * @phdr: preformatted message header used when sending messages
+ * @cong_links: list of congested links
  * @publications: list of publications for port
+ * @blocking_link: address of the congested link we are currently sleeping on
  * @pub_count: total # of publications port has made during its lifetime
  * @probing_state:
  * @conn_timeout: the time we can wait for an unresponded setup request
  * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue
- * @link_cong: non-zero if owner must sleep because of link congestion
+ * @cong_link_cnt: number of congested links
  * @sent_unacked: # messages sent by socket, and not yet acked by peer
  * @rcv_unacked: # messages read by user, but not yet acked back to peer
  * @peer: 'connected' peer for dgram/rdm
@@ -87,13 +89,13 @@ struct tipc_sock {
 	u32 max_pkt;
 	u32 portid;
 	struct tipc_msg phdr;
-	struct list_head sock_list;
+	struct list_head cong_links;
 	struct list_head publications;
 	u32 pub_count;
 	uint conn_timeout;
 	atomic_t dupl_rcvcnt;
 	bool probe_unacked;
-	bool link_cong;
+	u16 cong_link_cnt;
 	u16 snt_unacked;
 	u16 snd_win;
 	u16 peer_caps;
@@ -118,8 +120,7 @@ static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,
 static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid);
 static int tipc_sk_insert(struct tipc_sock *tsk);
 static void tipc_sk_remove(struct tipc_sock *tsk);
-static int __tipc_send_stream(struct socket *sock, struct msghdr *m,
-			      size_t dsz);
+static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dsz);
 static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz);
 
 static const struct proto_ops packet_ops;
@@ -424,6 +425,7 @@ static int tipc_sk_create(struct net *net, struct socket *sock,
 	tsk = tipc_sk(sk);
 	tsk->max_pkt = MAX_PKT_DEFAULT;
 	INIT_LIST_HEAD(&tsk->publications);
+	INIT_LIST_HEAD(&tsk->cong_links);
 	msg = &tsk->phdr;
 	tn = net_generic(sock_net(sk), tipc_net_id);
 	tipc_msg_init(tn->own_addr, msg, TIPC_LOW_IMPORTANCE, TIPC_NAMED_MSG,
@@ -474,9 +476,14 @@ static void __tipc_shutdown(struct socket *sock, int error)
 	struct sock *sk = sock->sk;
 	struct tipc_sock *tsk = tipc_sk(sk);
 	struct net *net = sock_net(sk);
+	long timeout = CONN_TIMEOUT_DEFAULT;
 	u32 dnode = tsk_peer_node(tsk);
 	struct sk_buff *skb;
 
+	/* Avoid that hi-prio shutdown msgs bypass msgs in link wakeup queue */
+	tipc_wait_for_cond(sock, &timeout, (!tsk->cong_link_cnt &&
+					    !tsk_conn_cong(tsk)));
+
 	/* Reject all unreceived messages, except on an active connection
 	 * (which disconnects locally & sends a 'FIN+' to peer).
 	 */
@@ -547,7 +554,8 @@ static int tipc_release(struct socket *sock)
 
 	/* Reject any messages that accumulated in backlog queue */
 	release_sock(sk);
-
+	u32_list_purge(&tsk->cong_links);
+	tsk->cong_link_cnt = 0;
 	call_rcu(&tsk->rcu, tipc_sk_callback);
 	sock->sk = NULL;
 
@@ -690,7 +698,7 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,
 
 	switch (sk->sk_state) {
 	case TIPC_ESTABLISHED:
-		if (!tsk->link_cong && !tsk_conn_cong(tsk))
+		if (!tsk->cong_link_cnt && !tsk_conn_cong(tsk))
 			mask |= POLLOUT;
 		/* fall thru' */
 	case TIPC_LISTEN:
@@ -699,7 +707,7 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,
 			mask |= (POLLIN | POLLRDNORM);
 		break;
 	case TIPC_OPEN:
-		if (!tsk->link_cong)
+		if (!tsk->cong_link_cnt)
 			mask |= POLLOUT;
 		if (tipc_sk_type_connectionless(sk) &&
 		    (!skb_queue_empty(&sk->sk_receive_queue)))
@@ -718,63 +726,48 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,
  * @sock: socket structure
  * @seq: destination address
  * @msg: message to send
- * @dsz: total length of message data
- * @timeo: timeout to wait for wakeup
+ * @dlen: length of data to send
+ * @timeout: timeout to wait for wakeup
  *
  * Called from function tipc_sendmsg(), which has done all sanity checks
  * Returns the number of bytes sent on success, or errno
  */
 static int tipc_sendmcast(struct  socket *sock, struct tipc_name_seq *seq,
-			  struct msghdr *msg, size_t dsz, long timeo)
+			  struct msghdr *msg, size_t dlen, long timeout)
 {
 	struct sock *sk = sock->sk;
 	struct tipc_sock *tsk = tipc_sk(sk);
+	struct tipc_msg *hdr = &tsk->phdr;
 	struct net *net = sock_net(sk);
-	struct tipc_msg *mhdr = &tsk->phdr;
-	struct sk_buff_head pktchain;
-	struct iov_iter save = msg->msg_iter;
-	uint mtu;
+	int mtu = tipc_bcast_get_mtu(net);
+	struct sk_buff_head pkts;
 	int rc;
 
-	if (!timeo && tsk->link_cong)
-		return -ELINKCONG;
-
-	msg_set_type(mhdr, TIPC_MCAST_MSG);
-	msg_set_lookup_scope(mhdr, TIPC_CLUSTER_SCOPE);
-	msg_set_destport(mhdr, 0);
-	msg_set_destnode(mhdr, 0);
-	msg_set_nametype(mhdr, seq->type);
-	msg_set_namelower(mhdr, seq->lower);
-	msg_set_nameupper(mhdr, seq->upper);
-	msg_set_hdr_sz(mhdr, MCAST_H_SIZE);
-
-	skb_queue_head_init(&pktchain);
-
-new_mtu:
-	mtu = tipc_bcast_get_mtu(net);
-	rc = tipc_msg_build(mhdr, msg, 0, dsz, mtu, &pktchain);
-	if (unlikely(rc < 0))
+	rc = tipc_wait_for_cond(sock, &timeout, !tsk->cong_link_cnt);
+	if (unlikely(rc))
 		return rc;
 
-	do {
-		rc = tipc_bcast_xmit(net, &pktchain);
-		if (likely(!rc))
-			return dsz;
+	msg_set_type(hdr, TIPC_MCAST_MSG);
+	msg_set_lookup_scope(hdr, TIPC_CLUSTER_SCOPE);
+	msg_set_destport(hdr, 0);
+	msg_set_destnode(hdr, 0);
+	msg_set_nametype(hdr, seq->type);
+	msg_set_namelower(hdr, seq->lower);
+	msg_set_nameupper(hdr, seq->upper);
+	msg_set_hdr_sz(hdr, MCAST_H_SIZE);
 
-		if (rc == -ELINKCONG) {
-			tsk->link_cong = 1;
-			rc = tipc_wait_for_cond(sock, &timeo, !tsk->link_cong);
-			if (!rc)
-				continue;
-		}
-		__skb_queue_purge(&pktchain);
-		if (rc == -EMSGSIZE) {
-			msg->msg_iter = save;
-			goto new_mtu;
-		}
-		break;
-	} while (1);
-	return rc;
+	skb_queue_head_init(&pkts);
+	rc = tipc_msg_build(hdr, msg, 0, dlen, mtu, &pkts);
+	if (unlikely(rc != dlen))
+		return rc;
+
+	rc = tipc_bcast_xmit(net, &pkts);
+	if (unlikely(rc == -ELINKCONG)) {
+		tsk->cong_link_cnt = 1;
+		rc = 0;
+	}
+
+	return rc ? rc : dlen;
 }
 
 /**
@@ -898,35 +891,38 @@ static int tipc_sendmsg(struct socket *sock,
 	return ret;
 }
 
-static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz)
+static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)
 {
-	DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
 	struct sock *sk = sock->sk;
-	struct tipc_sock *tsk = tipc_sk(sk);
 	struct net *net = sock_net(sk);
-	struct tipc_msg *mhdr = &tsk->phdr;
-	u32 dnode, dport;
-	struct sk_buff_head pktchain;
-	bool is_connectionless = tipc_sk_type_connectionless(sk);
-	struct sk_buff *skb;
+	struct tipc_sock *tsk = tipc_sk(sk);
+	DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
+	long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
+	struct list_head *clinks = &tsk->cong_links;
+	bool syn = !tipc_sk_type_connectionless(sk);
+	struct tipc_msg *hdr = &tsk->phdr;
 	struct tipc_name_seq *seq;
-	struct iov_iter save;
-	u32 mtu;
-	long timeo;
-	int rc;
+	struct sk_buff_head pkts;
+	u32 type, inst, domain;
+	u32 dnode, dport;
+	int mtu, rc;
 
-	if (dsz > TIPC_MAX_USER_MSG_SIZE)
+	if (unlikely(dlen > TIPC_MAX_USER_MSG_SIZE))
 		return -EMSGSIZE;
+
 	if (unlikely(!dest)) {
-		if (is_connectionless && tsk->peer.family == AF_TIPC)
-			dest = &tsk->peer;
-		else
+		dest = &tsk->peer;
+		if (!syn || dest->family != AF_TIPC)
 			return -EDESTADDRREQ;
-	} else if (unlikely(m->msg_namelen < sizeof(*dest)) ||
-		   dest->family != AF_TIPC) {
-		return -EINVAL;
 	}
-	if (!is_connectionless) {
+
+	if (unlikely(m->msg_namelen < sizeof(*dest)))
+		return -EINVAL;
+
+	if (unlikely(dest->family != AF_TIPC))
+		return -EINVAL;
+
+	if (unlikely(syn)) {
 		if (sk->sk_state == TIPC_LISTEN)
 			return -EPIPE;
 		if (sk->sk_state != TIPC_OPEN)
@@ -938,72 +934,62 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz)
 			tsk->conn_instance = dest->addr.name.name.instance;
 		}
 	}
+
 	seq = &dest->addr.nameseq;
-	timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
+	if (dest->addrtype == TIPC_ADDR_MCAST)
+		return tipc_sendmcast(sock, seq, m, dlen, timeout);
 
-	if (dest->addrtype == TIPC_ADDR_MCAST) {
-		return tipc_sendmcast(sock, seq, m, dsz, timeo);
-	} else if (dest->addrtype == TIPC_ADDR_NAME) {
-		u32 type = dest->addr.name.name.type;
-		u32 inst = dest->addr.name.name.instance;
-		u32 domain = dest->addr.name.domain;
-
+	if (dest->addrtype == TIPC_ADDR_NAME) {
+		type = dest->addr.name.name.type;
+		inst = dest->addr.name.name.instance;
+		domain = dest->addr.name.domain;
 		dnode = domain;
-		msg_set_type(mhdr, TIPC_NAMED_MSG);
-		msg_set_hdr_sz(mhdr, NAMED_H_SIZE);
-		msg_set_nametype(mhdr, type);
-		msg_set_nameinst(mhdr, inst);
-		msg_set_lookup_scope(mhdr, tipc_addr_scope(domain));
+		msg_set_type(hdr, TIPC_NAMED_MSG);
+		msg_set_hdr_sz(hdr, NAMED_H_SIZE);
+		msg_set_nametype(hdr, type);
+		msg_set_nameinst(hdr, inst);
+		msg_set_lookup_scope(hdr, tipc_addr_scope(domain));
 		dport = tipc_nametbl_translate(net, type, inst, &dnode);
-		msg_set_destnode(mhdr, dnode);
-		msg_set_destport(mhdr, dport);
+		msg_set_destnode(hdr, dnode);
+		msg_set_destport(hdr, dport);
 		if (unlikely(!dport && !dnode))
 			return -EHOSTUNREACH;
+
 	} else if (dest->addrtype == TIPC_ADDR_ID) {
 		dnode = dest->addr.id.node;
-		msg_set_type(mhdr, TIPC_DIRECT_MSG);
-		msg_set_lookup_scope(mhdr, 0);
-		msg_set_destnode(mhdr, dnode);
-		msg_set_destport(mhdr, dest->addr.id.ref);
-		msg_set_hdr_sz(mhdr, BASIC_H_SIZE);
+		msg_set_type(hdr, TIPC_DIRECT_MSG);
+		msg_set_lookup_scope(hdr, 0);
+		msg_set_destnode(hdr, dnode);
+		msg_set_destport(hdr, dest->addr.id.ref);
+		msg_set_hdr_sz(hdr, BASIC_H_SIZE);
 	}
 
-	skb_queue_head_init(&pktchain);
-	save = m->msg_iter;
-new_mtu:
-	mtu = tipc_node_get_mtu(net, dnode, tsk->portid);
-	rc = tipc_msg_build(mhdr, m, 0, dsz, mtu, &pktchain);
-	if (rc < 0)
+	/* Block or return if destination link is congested */
+	rc = tipc_wait_for_cond(sock, &timeout, !u32_find(clinks, dnode));
+	if (unlikely(rc))
 		return rc;
 
-	do {
-		skb = skb_peek(&pktchain);
-		TIPC_SKB_CB(skb)->wakeup_pending = tsk->link_cong;
-		rc = tipc_node_xmit(net, &pktchain, dnode, tsk->portid);
-		if (likely(!rc)) {
-			if (!is_connectionless)
-				tipc_set_sk_state(sk, TIPC_CONNECTING);
-			return dsz;
-		}
-		if (rc == -ELINKCONG) {
-			tsk->link_cong = 1;
-			rc = tipc_wait_for_cond(sock, &timeo, !tsk->link_cong);
-			if (!rc)
-				continue;
-		}
-		__skb_queue_purge(&pktchain);
-		if (rc == -EMSGSIZE) {
-			m->msg_iter = save;
-			goto new_mtu;
-		}
-		break;
-	} while (1);
+	skb_queue_head_init(&pkts);
+	mtu = tipc_node_get_mtu(net, dnode, tsk->portid);
+	rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);
+	if (unlikely(rc != dlen))
+		return rc;
 
-	return rc;
+	rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid);
+	if (unlikely(rc == -ELINKCONG)) {
+		u32_push(clinks, dnode);
+		tsk->cong_link_cnt++;
+		rc = 0;
+	}
+
+	if (unlikely(syn && !rc))
+		tipc_set_sk_state(sk, TIPC_CONNECTING);
+
+	return rc ? rc : dlen;
 }
 
 /**
- * tipc_send_stream - send stream-oriented data
+ * tipc_sendstream - send stream-oriented data
  * @sock: socket structure
  * @m: data to send
  * @dsz: total length of data to be transmitted
@@ -1013,97 +999,69 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz)
  * Returns the number of bytes sent on success (or partial success),
  * or errno if no data sent
  */
-static int tipc_send_stream(struct socket *sock, struct msghdr *m, size_t dsz)
+static int tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dsz)
 {
 	struct sock *sk = sock->sk;
 	int ret;
 
 	lock_sock(sk);
-	ret = __tipc_send_stream(sock, m, dsz);
+	ret = __tipc_sendstream(sock, m, dsz);
 	release_sock(sk);
 
 	return ret;
 }
 
-static int __tipc_send_stream(struct socket *sock, struct msghdr *m, size_t dsz)
+static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dlen)
 {
 	struct sock *sk = sock->sk;
-	struct net *net = sock_net(sk);
-	struct tipc_sock *tsk = tipc_sk(sk);
-	struct tipc_msg *mhdr = &tsk->phdr;
-	struct sk_buff_head pktchain;
 	DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
-	u32 portid = tsk->portid;
-	int rc = -EINVAL;
-	long timeo;
-	u32 dnode;
-	uint mtu, send, sent = 0;
-	struct iov_iter save;
-	int hlen = MIN_H_SIZE;
+	long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
+	struct tipc_sock *tsk = tipc_sk(sk);
+	struct tipc_msg *hdr = &tsk->phdr;
+	struct net *net = sock_net(sk);
+	struct sk_buff_head pkts;
+	u32 dnode = tsk_peer_node(tsk);
+	int send, sent = 0;
+	int rc = 0;
 
-	/* Handle implied connection establishment */
-	if (unlikely(dest)) {
-		rc = __tipc_sendmsg(sock, m, dsz);
-		hlen = msg_hdr_sz(mhdr);
-		if (dsz && (dsz == rc))
-			tsk->snt_unacked = tsk_inc(tsk, dsz + hlen);
-		return rc;
-	}
-	if (dsz > (uint)INT_MAX)
+	skb_queue_head_init(&pkts);
+
+	if (unlikely(dlen > INT_MAX))
 		return -EMSGSIZE;
 
-	if (unlikely(!tipc_sk_connected(sk))) {
-		if (sk->sk_state == TIPC_DISCONNECTING)
-			return -EPIPE;
-		else
-			return -ENOTCONN;
+	/* Handle implicit connection setup */
+	if (unlikely(dest)) {
+		rc = __tipc_sendmsg(sock, m, dlen);
+		if (dlen && (dlen == rc))
+			tsk->snt_unacked = tsk_inc(tsk, dlen + msg_hdr_sz(hdr));
+		return rc;
 	}
 
-	timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
-	if (!timeo && tsk->link_cong)
-		return -ELINKCONG;
-
-	dnode = tsk_peer_node(tsk);
-	skb_queue_head_init(&pktchain);
-
-next:
-	save = m->msg_iter;
-	mtu = tsk->max_pkt;
-	send = min_t(uint, dsz - sent, TIPC_MAX_USER_MSG_SIZE);
-	rc = tipc_msg_build(mhdr, m, sent, send, mtu, &pktchain);
-	if (unlikely(rc < 0))
-		return rc;
-
 	do {
-		if (likely(!tsk_conn_cong(tsk))) {
-			rc = tipc_node_xmit(net, &pktchain, dnode, portid);
-			if (likely(!rc)) {
-				tsk->snt_unacked += tsk_inc(tsk, send + hlen);
-				sent += send;
-				if (sent == dsz)
-					return dsz;
-				goto next;
-			}
-			if (rc == -EMSGSIZE) {
-				__skb_queue_purge(&pktchain);
-				tsk->max_pkt = tipc_node_get_mtu(net, dnode,
-								 portid);
-				m->msg_iter = save;
-				goto next;
-			}
-			if (rc != -ELINKCONG)
-				break;
-
-			tsk->link_cong = 1;
-		}
-		rc = tipc_wait_for_cond(sock, &timeo,
-					(!tsk->link_cong &&
+		rc = tipc_wait_for_cond(sock, &timeout,
+					(!tsk->cong_link_cnt &&
 					 !tsk_conn_cong(tsk) &&
 					 tipc_sk_connected(sk)));
-	} while (!rc);
+		if (unlikely(rc))
+			break;
 
-	__skb_queue_purge(&pktchain);
-	return sent ? sent : rc;
+		send = min_t(size_t, dlen - sent, TIPC_MAX_USER_MSG_SIZE);
+		rc = tipc_msg_build(hdr, m, sent, send, tsk->max_pkt, &pkts);
+		if (unlikely(rc != send))
+			break;
+
+		rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid);
+		if (unlikely(rc == -ELINKCONG)) {
+			tsk->cong_link_cnt = 1;
+			rc = 0;
+		}
+		if (likely(!rc)) {
+			tsk->snt_unacked += tsk_inc(tsk, send + MIN_H_SIZE);
+			sent += send;
+		}
+	} while (sent < dlen && !rc);
+
+	return rc ? rc : sent;
 }
 
 /**
@@ -1121,7 +1079,7 @@ static int tipc_send_packet(struct socket *sock, struct msghdr *m, size_t dsz)
 	if (dsz > TIPC_MAX_USER_MSG_SIZE)
 		return -EMSGSIZE;
 
-	return tipc_send_stream(sock, m, dsz);
+	return tipc_sendstream(sock, m, dsz);
 }
 
 /* tipc_sk_finish_conn - complete the setup of a connection
@@ -1688,6 +1646,7 @@ static bool filter_rcv(struct sock *sk, struct sk_buff *skb,
 	unsigned int limit = rcvbuf_limit(sk, skb);
 	int err = TIPC_OK;
 	int usr = msg_user(hdr);
+	u32 onode;
 
 	if (unlikely(msg_user(hdr) == CONN_MANAGER)) {
 		tipc_sk_proto_rcv(tsk, skb, xmitq);
@@ -1695,8 +1654,10 @@ static bool filter_rcv(struct sock *sk, struct sk_buff *skb,
 	}
 
 	if (unlikely(usr == SOCK_WAKEUP)) {
+		onode = msg_orignode(hdr);
 		kfree_skb(skb);
-		tsk->link_cong = 0;
+		u32_del(&tsk->cong_links, onode);
+		tsk->cong_link_cnt--;
 		sk->sk_write_space(sk);
 		return false;
 	}
@@ -2104,7 +2065,7 @@ static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags)
 		struct msghdr m = {NULL,};
 
 		tsk_advance_rx_queue(sk);
-		__tipc_send_stream(new_sock, &m, 0);
+		__tipc_sendstream(new_sock, &m, 0);
 	} else {
 		__skb_dequeue(&sk->sk_receive_queue);
 		__skb_queue_head(&new_sk->sk_receive_queue, buf);
@@ -2565,7 +2526,7 @@ static const struct proto_ops stream_ops = {
 	.shutdown	= tipc_shutdown,
 	.setsockopt	= tipc_setsockopt,
 	.getsockopt	= tipc_getsockopt,
-	.sendmsg	= tipc_send_stream,
+	.sendmsg	= tipc_sendstream,
 	.recvmsg	= tipc_recv_stream,
 	.mmap		= sock_no_mmap,
 	.sendpage	= sock_no_sendpage