tipc: simplify connection congestion handling
As a consequence of the recently introduced serialized access
to the socket in commit 8d94168a761819d10252bab1f8de6d7b202c3baa
("tipc: same receive code path for connection protocol and data
messages") we can make a number of simplifications in the
detection and handling of connection congestion situations.
- We don't need to keep two counters, one for sent messages and one
for acked messages. There is no longer any risk for races between
acknowledge messages arriving in BH and data message sending
running in user context. So we merge this into one counter,
'sent_unacked', which is incremented at sending and subtracted
from at acknowledge reception.
- We don't need to set the 'congested' field in tipc_port to
true before we sent the message, and clear it when sending
is successful. (As a matter of fact, it was never necessary;
the field was set in link_schedule_port() before any wakeup
could arrive anyway.)
- We keep the conditions for link congestion and connection connection
congestion separated. There would otherwise be a risk that an arriving
acknowledge message may wake up a user sleeping because of link
congestion.
- We can simplify reception of acknowledge messages.
We also make some cosmetic/structural changes:
- We rename the 'congested' field to the more correct 'link_congĀ“.
- We rename 'conn_unacked' to 'rcv_unacked'
- We move the above mentioned fields from struct tipc_port to
struct tipc_sock.
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Reviewed-by: Erik Hugne <erik.hugne@ericsson.com>
Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 1762323..ede78b1 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -207,7 +207,7 @@
sk->sk_data_ready = tipc_data_ready;
sk->sk_write_space = tipc_write_space;
tsk->conn_timeout = CONN_TIMEOUT_DEFAULT;
- tsk->port.sent = 0;
+ tsk->sent_unacked = 0;
atomic_set(&tsk->dupl_rcvcnt, 0);
tipc_port_unlock(port);
@@ -513,12 +513,12 @@
switch ((int)sock->state) {
case SS_UNCONNECTED:
- if (!tsk->port.congested)
+ if (!tsk->link_cong)
mask |= POLLOUT;
break;
case SS_READY:
case SS_CONNECTED:
- if (!tsk->port.congested)
+ if (!tsk->link_cong && !tipc_sk_conn_cong(tsk))
mask |= POLLOUT;
/* fall thru' */
case SS_CONNECTING:
@@ -546,7 +546,7 @@
{
struct tipc_msg *msg = buf_msg(buf);
struct tipc_port *port = &tsk->port;
- int wakeable;
+ int conn_cong;
/* Ignore if connection cannot be validated: */
if (!port->connected || !tipc_port_peer_msg(port, msg))
@@ -555,13 +555,10 @@
port->probing_state = TIPC_CONN_OK;
if (msg_type(msg) == CONN_ACK) {
- wakeable = tipc_port_congested(port) && port->congested;
- port->acked += msg_msgcnt(msg);
- if (!tipc_port_congested(port)) {
- port->congested = 0;
- if (wakeable)
- tipc_port_wakeup(port);
- }
+ conn_cong = tipc_sk_conn_cong(tsk);
+ tsk->sent_unacked -= msg_msgcnt(msg);
+ if (conn_cong)
+ tipc_sock_wakeup(tsk);
} else if (msg_type(msg) == CONN_PROBE) {
if (!tipc_msg_reverse(buf, dnode, TIPC_OK))
return TIPC_OK;
@@ -626,7 +623,7 @@
return sock_intr_errno(*timeo_p);
prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
- done = sk_wait_event(sk, timeo_p, !tsk->port.congested);
+ done = sk_wait_event(sk, timeo_p, !tsk->link_cong);
finish_wait(sk_sleep(sk), &wait);
} while (!done);
return 0;
@@ -800,7 +797,6 @@
{
struct sock *sk = sock->sk;
struct tipc_sock *tsk = tipc_sk(sk);
- struct tipc_port *port = &tsk->port;
DEFINE_WAIT(wait);
int done;
@@ -819,7 +815,9 @@
prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
done = sk_wait_event(sk, timeo_p,
- (!port->congested || !port->connected));
+ (!tsk->link_cong &&
+ !tipc_sk_conn_cong(tsk)) ||
+ !tsk->port.connected);
finish_wait(sk_sleep(sk), &wait);
} while (!done);
return 0;
@@ -856,7 +854,7 @@
if (unlikely(dest)) {
rc = tipc_sendmsg(iocb, sock, m, dsz);
if (dsz && (dsz == rc))
- tsk->port.sent = 1;
+ tsk->sent_unacked = 1;
return rc;
}
if (dsz > (uint)INT_MAX)
@@ -875,7 +873,6 @@
timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
dnode = tipc_port_peernode(port);
- port->congested = 1;
next:
mtu = port->max_pkt;
@@ -884,11 +881,10 @@
if (unlikely(rc < 0))
goto exit;
do {
- port->congested = 1;
- if (likely(!tipc_port_congested(port))) {
+ if (likely(!tipc_sk_conn_cong(tsk))) {
rc = tipc_link_xmit2(buf, dnode, ref);
if (likely(!rc)) {
- port->sent++;
+ tsk->sent_unacked++;
sent += send;
if (sent == dsz)
break;
@@ -903,8 +899,6 @@
}
rc = tipc_wait_for_sndpkt(sock, &timeo);
} while (!rc);
-
- port->congested = 0;
exit:
if (iocb)
release_sock(sk);
@@ -1169,8 +1163,10 @@
/* Consume received message (optional) */
if (likely(!(flags & MSG_PEEK))) {
if ((sock->state != SS_READY) &&
- (++port->conn_unacked >= TIPC_CONNACK_INTV))
- tipc_acknowledge(port->ref, port->conn_unacked);
+ (++tsk->rcv_unacked >= TIPC_CONNACK_INTV)) {
+ tipc_acknowledge(port->ref, tsk->rcv_unacked);
+ tsk->rcv_unacked = 0;
+ }
advance_rx_queue(sk);
}
exit:
@@ -1278,8 +1274,10 @@
/* Consume received message (optional) */
if (likely(!(flags & MSG_PEEK))) {
- if (unlikely(++port->conn_unacked >= TIPC_CONNACK_INTV))
- tipc_acknowledge(port->ref, port->conn_unacked);
+ if (unlikely(++tsk->rcv_unacked >= TIPC_CONNACK_INTV)) {
+ tipc_acknowledge(port->ref, tsk->rcv_unacked);
+ tsk->rcv_unacked = 0;
+ }
advance_rx_queue(sk);
}