tipc: add smart nagle feature
We introduce a feature that works like a combination of TCP_NAGLE and
TCP_CORK, but without some of the weaknesses of those. In particular,
we will not observe long delivery delays because of delayed acks, since
the algorithm itself decides if and when acks are to be sent from the
receiving peer.
- The nagle property as such is determined by manipulating a new
'maxnagle' field in struct tipc_sock. If certain conditions are met,
'maxnagle' will define max size of the messages which can be bundled.
If it is set to zero no messages are ever bundled, implying that the
nagle property is disabled.
- A socket with the nagle property enabled enters nagle mode when more
than 4 messages have been sent out without receiving any data message
from the peer.
- A socket leaves nagle mode whenever it receives a data message from
the peer.
In nagle mode, messages smaller than 'maxnagle' are accumulated in the
socket write queue. The last buffer in the queue is marked with a new
'ack_required' bit, which forces the receiving peer to send a CONN_ACK
message back to the sender upon reception.
The accumulated contents of the write queue is transmitted when one of
the following events or conditions occur.
- A CONN_ACK message is received from the peer.
- A data message is received from the peer.
- A SOCK_WAKEUP pseudo message is received from the link level.
- The write queue contains more than 64 1k blocks of data.
- The connection is being shut down.
- There is no CONN_ACK message to expect. I.e., there is currently
no outstanding message where the 'ack_required' bit was set. As a
consequence, the first message added after we enter nagle mode
is always sent directly with this bit set.
This new feature gives a 50-100% improvement of throughput for small
(i.e., less than MTU size) messages, while it might add up to one RTT
to latency time when the socket is in nagle mode.
Acked-by: Ying Xue <ying.xue@windreiver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 2bcacd6..3e99a12 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -75,6 +75,7 @@ struct sockaddr_pair {
* @conn_instance: TIPC instance used when connection was established
* @published: non-zero if port has one or more associated names
* @max_pkt: maximum packet size "hint" used when building messages sent by port
+ * @maxnagle: maximum size of msg which can be subject to nagle
* @portid: unique port identity in TIPC socket hash table
* @phdr: preformatted message header used when sending messages
* #cong_links: list of congested links
@@ -97,6 +98,7 @@ struct tipc_sock {
u32 conn_instance;
int published;
u32 max_pkt;
+ u32 maxnagle;
u32 portid;
struct tipc_msg phdr;
struct list_head cong_links;
@@ -116,6 +118,10 @@ struct tipc_sock {
struct tipc_mc_method mc_method;
struct rcu_head rcu;
struct tipc_group *group;
+ u32 oneway;
+ u16 snd_backlog;
+ bool expect_ack;
+ bool nodelay;
bool group_is_open;
};
@@ -137,6 +143,7 @@ static int tipc_sk_insert(struct tipc_sock *tsk);
static void tipc_sk_remove(struct tipc_sock *tsk);
static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dsz);
static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz);
+static void tipc_sk_push_backlog(struct tipc_sock *tsk);
static const struct proto_ops packet_ops;
static const struct proto_ops stream_ops;
@@ -227,6 +234,26 @@ static u16 tsk_inc(struct tipc_sock *tsk, int msglen)
return 1;
}
+/* tsk_set_nagle - enable/disable nagle property by manipulating maxnagle
+ */
+static void tsk_set_nagle(struct tipc_sock *tsk)
+{
+ struct sock *sk = &tsk->sk;
+
+ tsk->maxnagle = 0;
+ if (sk->sk_type != SOCK_STREAM)
+ return;
+ if (tsk->nodelay)
+ return;
+ if (!(tsk->peer_caps & TIPC_NAGLE))
+ return;
+ /* Limit node local buffer size to avoid receive queue overflow */
+ if (tsk->max_pkt == MAX_MSG_SIZE)
+ tsk->maxnagle = 1500;
+ else
+ tsk->maxnagle = tsk->max_pkt;
+}
+
/**
* tsk_advance_rx_queue - discard first buffer in socket receive queue
*
@@ -446,6 +473,7 @@ static int tipc_sk_create(struct net *net, struct socket *sock,
tsk = tipc_sk(sk);
tsk->max_pkt = MAX_PKT_DEFAULT;
+ tsk->maxnagle = 0;
INIT_LIST_HEAD(&tsk->publications);
INIT_LIST_HEAD(&tsk->cong_links);
msg = &tsk->phdr;
@@ -512,8 +540,12 @@ static void __tipc_shutdown(struct socket *sock, int error)
tipc_wait_for_cond(sock, &timeout, (!tsk->cong_link_cnt &&
!tsk_conn_cong(tsk)));
- /* Remove any pending SYN message */
- __skb_queue_purge(&sk->sk_write_queue);
+ /* Push out unsent messages or remove if pending SYN */
+ skb = skb_peek(&sk->sk_write_queue);
+ if (skb && !msg_is_syn(buf_msg(skb)))
+ tipc_sk_push_backlog(tsk);
+ else
+ __skb_queue_purge(&sk->sk_write_queue);
/* Reject all unreceived messages, except on an active connection
* (which disconnects locally & sends a 'FIN+' to peer).
@@ -1208,6 +1240,27 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
tipc_sk_rcv(net, inputq);
}
+/* tipc_sk_push_backlog(): send accumulated buffers in socket write queue
+ * when socket is in Nagle mode
+ */
+static void tipc_sk_push_backlog(struct tipc_sock *tsk)
+{
+ struct sk_buff_head *txq = &tsk->sk.sk_write_queue;
+ struct net *net = sock_net(&tsk->sk);
+ u32 dnode = tsk_peer_node(tsk);
+ int rc;
+
+ if (skb_queue_empty(txq) || tsk->cong_link_cnt)
+ return;
+
+ tsk->snt_unacked += tsk->snd_backlog;
+ tsk->snd_backlog = 0;
+ tsk->expect_ack = true;
+ rc = tipc_node_xmit(net, txq, dnode, tsk->portid);
+ if (rc == -ELINKCONG)
+ tsk->cong_link_cnt = 1;
+}
+
/**
* tipc_sk_conn_proto_rcv - receive a connection mng protocol message
* @tsk: receiving socket
@@ -1221,7 +1274,7 @@ static void tipc_sk_conn_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb,
u32 onode = tsk_own_node(tsk);
struct sock *sk = &tsk->sk;
int mtyp = msg_type(hdr);
- bool conn_cong;
+ bool was_cong;
/* Ignore if connection cannot be validated: */
if (!tsk_peer_msg(tsk, hdr)) {
@@ -1254,11 +1307,13 @@ static void tipc_sk_conn_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb,
__skb_queue_tail(xmitq, skb);
return;
} else if (mtyp == CONN_ACK) {
- conn_cong = tsk_conn_cong(tsk);
+ was_cong = tsk_conn_cong(tsk);
+ tsk->expect_ack = false;
+ tipc_sk_push_backlog(tsk);
tsk->snt_unacked -= msg_conn_ack(hdr);
if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL)
tsk->snd_win = msg_adv_win(hdr);
- if (conn_cong)
+ if (was_cong && !tsk_conn_cong(tsk))
sk->sk_write_space(sk);
} else if (mtyp != CONN_PROBE_REPLY) {
pr_warn("Received unknown CONN_PROTO msg\n");
@@ -1437,15 +1492,15 @@ static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dlen)
struct sock *sk = sock->sk;
DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
+ struct sk_buff_head *txq = &sk->sk_write_queue;
struct tipc_sock *tsk = tipc_sk(sk);
struct tipc_msg *hdr = &tsk->phdr;
struct net *net = sock_net(sk);
- struct sk_buff_head pkts;
u32 dnode = tsk_peer_node(tsk);
+ int maxnagle = tsk->maxnagle;
+ int maxpkt = tsk->max_pkt;
int send, sent = 0;
- int rc = 0;
-
- __skb_queue_head_init(&pkts);
+ int blocks, rc = 0;
if (unlikely(dlen > INT_MAX))
return -EMSGSIZE;
@@ -1467,21 +1522,35 @@ static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dlen)
tipc_sk_connected(sk)));
if (unlikely(rc))
break;
-
send = min_t(size_t, dlen - sent, TIPC_MAX_USER_MSG_SIZE);
- rc = tipc_msg_build(hdr, m, sent, send, tsk->max_pkt, &pkts);
- if (unlikely(rc != send))
- break;
-
- trace_tipc_sk_sendstream(sk, skb_peek(&pkts),
+ blocks = tsk->snd_backlog;
+ if (tsk->oneway++ >= 4 && send <= maxnagle) {
+ rc = tipc_msg_append(hdr, m, send, maxnagle, txq);
+ if (unlikely(rc < 0))
+ break;
+ blocks += rc;
+ if (blocks <= 64 && tsk->expect_ack) {
+ tsk->snd_backlog = blocks;
+ sent += send;
+ break;
+ }
+ tsk->expect_ack = true;
+ } else {
+ rc = tipc_msg_build(hdr, m, sent, send, maxpkt, txq);
+ if (unlikely(rc != send))
+ break;
+ blocks += tsk_inc(tsk, send + MIN_H_SIZE);
+ }
+ trace_tipc_sk_sendstream(sk, skb_peek(txq),
TIPC_DUMP_SK_SNDQ, " ");
- rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid);
+ rc = tipc_node_xmit(net, txq, dnode, tsk->portid);
if (unlikely(rc == -ELINKCONG)) {
tsk->cong_link_cnt = 1;
rc = 0;
}
if (likely(!rc)) {
- tsk->snt_unacked += tsk_inc(tsk, send + MIN_H_SIZE);
+ tsk->snt_unacked += blocks;
+ tsk->snd_backlog = 0;
sent += send;
}
} while (sent < dlen && !rc);
@@ -1528,6 +1597,7 @@ static void tipc_sk_finish_conn(struct tipc_sock *tsk, u32 peer_port,
tipc_node_add_conn(net, peer_node, tsk->portid, peer_port);
tsk->max_pkt = tipc_node_get_mtu(net, peer_node, tsk->portid, true);
tsk->peer_caps = tipc_node_get_capabilities(net, peer_node);
+ tsk_set_nagle(tsk);
__skb_queue_purge(&sk->sk_write_queue);
if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL)
return;
@@ -1848,6 +1918,7 @@ static int tipc_recvstream(struct socket *sock, struct msghdr *m,
bool peek = flags & MSG_PEEK;
int offset, required, copy, copied = 0;
int hlen, dlen, err, rc;
+ bool ack = false;
long timeout;
/* Catch invalid receive attempts */
@@ -1892,6 +1963,7 @@ static int tipc_recvstream(struct socket *sock, struct msghdr *m,
/* Copy data if msg ok, otherwise return error/partial data */
if (likely(!err)) {
+ ack = msg_ack_required(hdr);
offset = skb_cb->bytes_read;
copy = min_t(int, dlen - offset, buflen - copied);
rc = skb_copy_datagram_msg(skb, hlen + offset, m, copy);
@@ -1919,7 +1991,7 @@ static int tipc_recvstream(struct socket *sock, struct msghdr *m,
/* Send connection flow control advertisement when applicable */
tsk->rcv_unacked += tsk_inc(tsk, hlen + dlen);
- if (unlikely(tsk->rcv_unacked >= tsk->rcv_win / TIPC_ACK_RATE))
+ if (ack || tsk->rcv_unacked >= tsk->rcv_win / TIPC_ACK_RATE)
tipc_sk_send_ack(tsk);
/* Exit if all requested data or FIN/error received */
@@ -1990,6 +2062,7 @@ static void tipc_sk_proto_rcv(struct sock *sk,
smp_wmb();
tsk->cong_link_cnt--;
wakeup = true;
+ tipc_sk_push_backlog(tsk);
break;
case GROUP_PROTOCOL:
tipc_group_proto_rcv(grp, &wakeup, hdr, inputq, xmitq);
@@ -2029,6 +2102,7 @@ static bool tipc_sk_filter_connect(struct tipc_sock *tsk, struct sk_buff *skb)
if (unlikely(msg_mcast(hdr)))
return false;
+ tsk->oneway = 0;
switch (sk->sk_state) {
case TIPC_CONNECTING:
@@ -2074,6 +2148,8 @@ static bool tipc_sk_filter_connect(struct tipc_sock *tsk, struct sk_buff *skb)
return true;
return false;
case TIPC_ESTABLISHED:
+ if (!skb_queue_empty(&sk->sk_write_queue))
+ tipc_sk_push_backlog(tsk);
/* Accept only connection-based messages sent by peer */
if (likely(con_msg && !err && pport == oport && pnode == onode))
return true;
@@ -2959,6 +3035,7 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
case TIPC_SRC_DROPPABLE:
case TIPC_DEST_DROPPABLE:
case TIPC_CONN_TIMEOUT:
+ case TIPC_NODELAY:
if (ol < sizeof(value))
return -EINVAL;
if (get_user(value, (u32 __user *)ov))
@@ -3007,6 +3084,10 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
case TIPC_GROUP_LEAVE:
res = tipc_sk_leave(tsk);
break;
+ case TIPC_NODELAY:
+ tsk->nodelay = !!value;
+ tsk_set_nagle(tsk);
+ break;
default:
res = -EINVAL;
}