tcp: TCP Small Queues

This introduces TSQ (TCP Small Queues).

The goal of TSQ is to reduce the number of TCP packets in the xmit
queues (qdisc & device queues), to reduce RTT and cwnd bias, part of
the bufferbloat problem.

sk->sk_wmem_alloc is not allowed to grow above a given limit, allowing
no more than ~128KB [1] per tcp socket in the qdisc/dev layers at a
given time.
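
For illustration, here is the throttling check, a simplified sketch
mirroring the tcp_write_xmit() hunk below (the default limit of
131072 bytes is what gives the ~128KB figure):

	/* Sketch: stop feeding the qdisc/device layers once the amount
	 * of truesize bytes owned by this socket reaches the limit;
	 * tcp_wfree() reschedules us at TX completion.
	 */
	if (atomic_read(&sk->sk_wmem_alloc) >= sysctl_tcp_limit_output_bytes) {
		set_bit(TSQ_THROTTLED, &tp->tsq_flags);
		break;
	}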

TSO packets are sized/capped to half the limit, so that we can have two
TSO packets in flight, allowing better bandwidth use.

As a side effect, setting the limit to 40000 automatically reduces the
standard GSO max size (65536) to 40000/2 = 20000: smaller TSO packets
can help reduce latencies of high prio packets.
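
This is the clamp added to tcp_xmit_size_goal() (see the tcp.c hunk
below), with the arithmetic spelled out for two example limits:

	/* Cap the size goal to half the TSQ limit so two TSO packets
	 * can be in flight:
	 *   limit = 131072 (default) -> goal capped at 65536 (no change)
	 *   limit =  40000           -> goal capped at 20000
	 */
	xmit_size_goal = min_t(u32, xmit_size_goal,
			       sysctl_tcp_limit_output_bytes >> 1);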

This means we divert sock_wfree() to a tcp_wfree() handler, which
queues/sends the following frames when skb_orphan() [2] is called for
the already queued skbs.
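
A simplified sketch of the diversion, as done in tcp_transmit_skb()
(see the tcp_output.c hunk below; the real code falls back to
sock_wfree() when the limit is set to 0):

	/* Replace the usual skb_set_owner_w() so that we get a callback
	 * (tcp_wfree) when the skb leaves the qdisc/device layers,
	 * typically via skb_orphan() at TX completion time.
	 */
	skb_orphan(skb);
	skb->sk = sk;
	skb->destructor = tcp_wfree;
	atomic_add(skb->truesize, &sk->sk_wmem_alloc);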

Results on my dev machines (tg3/ixgbe nics) are really impressive,
using standard pfifo_fast, and with or without TSO/GSO.

Without reduction of nominal bandwidth, we get a reduction of buffering
per bulk sender:
< 1ms on Gbit (instead of 50ms with TSO)
< 8ms on 100Mbit (instead of 132ms)

I no longer have 4 MBytes backlogged in the qdisc by a single netperf
session, and socket autotuning on both sides no longer uses 4 MBytes.

As the skb destructor cannot restart xmit itself (the qdisc lock might
be held at this point), we delegate the work to a tasklet. We use one
tasklet per cpu for performance reasons.

If the tasklet finds a socket owned by the user, it sets the TSQ_OWNED
flag. This flag is tested in a new protocol method called from
release_sock(), to eventually send new segments.
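
The corresponding release_sock() hook lives in net/core/sock.c and is
not part of the excerpt below; roughly, it is expected to look like
(a sketch, using the .release_cb method installed in tcp_ipv4.c):

	/* Sketch: let the protocol flush deferred work (here,
	 * tcp_release_cb) when the socket lock is released.
	 */
	if (sk->sk_prot->release_cb)
		sk->sk_prot->release_cb(sk);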

[1] New /proc/sys/net/ipv4/tcp_limit_output_bytes tunable
[2] skb_orphan() is usually called at TX completion time,
  but some drivers call it in their start_xmit() handler.
  These drivers should at least use BQL, or else a single TCP
  session can still fill the whole NIC TX ring, since TSQ will
  have no effect.

Signed-off-by: Eric Dumazet <edumazet@google.com>
Cc: Dave Taht <dave.taht@bufferbloat.net>
Cc: Tom Herbert <therbert@google.com>
Cc: Matt Mathis <mattmathis@google.com>
Cc: Yuchung Cheng <ycheng@google.com>
Cc: Nandita Dukkipati <nanditad@google.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
diff --git a/net/ipv4/sysctl_net_ipv4.c b/net/ipv4/sysctl_net_ipv4.c
index 12aa0c5..70730f7 100644
--- a/net/ipv4/sysctl_net_ipv4.c
+++ b/net/ipv4/sysctl_net_ipv4.c
@@ -598,6 +598,13 @@
 		.mode		= 0644,
 		.proc_handler	= proc_dointvec
 	},
+	{
+		.procname	= "tcp_limit_output_bytes",
+		.data		= &sysctl_tcp_limit_output_bytes,
+		.maxlen		= sizeof(int),
+		.mode		= 0644,
+		.proc_handler	= proc_dointvec
+	},
 #ifdef CONFIG_NET_DMA
 	{
 		.procname	= "tcp_dma_copybreak",
diff --git a/net/ipv4/tcp.c b/net/ipv4/tcp.c
index d902da9..4252cd8 100644
--- a/net/ipv4/tcp.c
+++ b/net/ipv4/tcp.c
@@ -376,6 +376,7 @@
 	skb_queue_head_init(&tp->out_of_order_queue);
 	tcp_init_xmit_timers(sk);
 	tcp_prequeue_init(tp);
+	INIT_LIST_HEAD(&tp->tsq_node);
 
 	icsk->icsk_rto = TCP_TIMEOUT_INIT;
 	tp->mdev = TCP_TIMEOUT_INIT;
@@ -796,6 +797,10 @@
 				  inet_csk(sk)->icsk_ext_hdr_len -
 				  tp->tcp_header_len);
 
+		/* TSQ : try to have two TSO segments in flight */
+		xmit_size_goal = min_t(u32, xmit_size_goal,
+				       sysctl_tcp_limit_output_bytes >> 1);
+
 		xmit_size_goal = tcp_bound_to_half_wnd(tp, xmit_size_goal);
 
 		/* We try hard to avoid divides here */
@@ -3574,4 +3579,5 @@
 	tcp_secret_primary = &tcp_secret_one;
 	tcp_secret_retiring = &tcp_secret_two;
 	tcp_secret_secondary = &tcp_secret_two;
+	tcp_tasklet_init();
 }
diff --git a/net/ipv4/tcp_ipv4.c b/net/ipv4/tcp_ipv4.c
index ddefd39..01545a3 100644
--- a/net/ipv4/tcp_ipv4.c
+++ b/net/ipv4/tcp_ipv4.c
@@ -2588,6 +2588,7 @@
 	.sendmsg		= tcp_sendmsg,
 	.sendpage		= tcp_sendpage,
 	.backlog_rcv		= tcp_v4_do_rcv,
+	.release_cb		= tcp_release_cb,
 	.hash			= inet_hash,
 	.unhash			= inet_unhash,
 	.get_port		= inet_csk_get_port,
diff --git a/net/ipv4/tcp_minisocks.c b/net/ipv4/tcp_minisocks.c
index 6560886..c66f2ed 100644
--- a/net/ipv4/tcp_minisocks.c
+++ b/net/ipv4/tcp_minisocks.c
@@ -424,6 +424,7 @@
 			treq->snt_isn + 1 + tcp_s_data_size(oldtp);
 
 		tcp_prequeue_init(newtp);
+		INIT_LIST_HEAD(&newtp->tsq_node);
 
 		tcp_init_wl(newtp, treq->rcv_isn);
 
diff --git a/net/ipv4/tcp_output.c b/net/ipv4/tcp_output.c
index c465d3e..03854ab 100644
--- a/net/ipv4/tcp_output.c
+++ b/net/ipv4/tcp_output.c
@@ -50,6 +50,9 @@
  */
 int sysctl_tcp_workaround_signed_windows __read_mostly = 0;
 
+/* Default TSQ limit of two TSO segments */
+int sysctl_tcp_limit_output_bytes __read_mostly = 131072;
+
 /* This limits the percentage of the congestion window which we
  * will allow a single TSO frame to consume.  Building TSO frames
  * which are too large can cause TCP streams to be bursty.
@@ -65,6 +68,8 @@
 int sysctl_tcp_cookie_size __read_mostly = 0; /* TCP_COOKIE_MAX */
 EXPORT_SYMBOL_GPL(sysctl_tcp_cookie_size);
 
+static bool tcp_write_xmit(struct sock *sk, unsigned int mss_now, int nonagle,
+			   int push_one, gfp_t gfp);
 
 /* Account for new data that has been sent to the network. */
 static void tcp_event_new_data_sent(struct sock *sk, const struct sk_buff *skb)
@@ -783,6 +788,140 @@
 	return size;
 }
 
+
+/* TCP SMALL QUEUES (TSQ)
+ *
+ * The goal of TSQ is to keep a small amount of skbs per tcp flow in the
+ * tx queues (qdisc + device) to reduce RTT and bufferbloat.
+ * We do this using a special skb destructor (tcp_wfree).
+ *
+ * It's important that tcp_wfree() can be replaced by sock_wfree() in the
+ * event an skb needs to be reallocated in a driver.
+ * The invariant is that skb->truesize is subtracted from sk->sk_wmem_alloc.
+ *
+ * Since transmit from skb destructor is forbidden, we use a tasklet
+ * to process all sockets that eventually need to send more skbs.
+ * We use one tasklet per cpu, with its own queue of sockets.
+ */
+struct tsq_tasklet {
+	struct tasklet_struct	tasklet;
+	struct list_head	head; /* queue of tcp sockets */
+};
+static DEFINE_PER_CPU(struct tsq_tasklet, tsq_tasklet);
+
+/*
+ * One tasklet per cpu tries to send more skbs.
+ * We run in tasklet context but need to disable irqs when
+ * transferring tsq->head because tcp_wfree() might
+ * interrupt us (non NAPI drivers)
+ */
+static void tcp_tasklet_func(unsigned long data)
+{
+	struct tsq_tasklet *tsq = (struct tsq_tasklet *)data;
+	LIST_HEAD(list);
+	unsigned long flags;
+	struct list_head *q, *n;
+	struct tcp_sock *tp;
+	struct sock *sk;
+
+	local_irq_save(flags);
+	list_splice_init(&tsq->head, &list);
+	local_irq_restore(flags);
+
+	list_for_each_safe(q, n, &list) {
+		tp = list_entry(q, struct tcp_sock, tsq_node);
+		list_del(&tp->tsq_node);
+
+		sk = (struct sock *)tp;
+		bh_lock_sock(sk);
+
+		if (!sock_owned_by_user(sk)) {
+			if ((1 << sk->sk_state) &
+			    (TCPF_ESTABLISHED | TCPF_FIN_WAIT1 |
+			     TCPF_CLOSING | TCPF_CLOSE_WAIT))
+				tcp_write_xmit(sk,
+					       tcp_current_mss(sk),
+					       0, 0,
+					       GFP_ATOMIC);
+		} else {
+			/* defer the work to tcp_release_cb() */
+			set_bit(TSQ_OWNED, &tp->tsq_flags);
+		}
+		bh_unlock_sock(sk);
+
+		clear_bit(TSQ_QUEUED, &tp->tsq_flags);
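+		/* release the reference kept by tcp_wfree() */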
+		sk_free(sk);
+	}
+}
+
+/**
+ * tcp_release_cb - tcp release_sock() callback
+ * @sk: socket
+ *
+ * called from release_sock() to perform protocol dependent
+ * actions before socket release.
+ */
+void tcp_release_cb(struct sock *sk)
+{
+	struct tcp_sock *tp = tcp_sk(sk);
+
+	if (test_and_clear_bit(TSQ_OWNED, &tp->tsq_flags)) {
+		if ((1 << sk->sk_state) &
+		    (TCPF_ESTABLISHED | TCPF_FIN_WAIT1 |
+		     TCPF_CLOSING | TCPF_CLOSE_WAIT))
+			tcp_write_xmit(sk,
+				       tcp_current_mss(sk),
+				       0, 0,
+				       GFP_ATOMIC);
+	}
+}
+EXPORT_SYMBOL(tcp_release_cb);
+
+void __init tcp_tasklet_init(void)
+{
+	int i;
+
+	for_each_possible_cpu(i) {
+		struct tsq_tasklet *tsq = &per_cpu(tsq_tasklet, i);
+
+		INIT_LIST_HEAD(&tsq->head);
+		tasklet_init(&tsq->tasklet,
+			     tcp_tasklet_func,
+			     (unsigned long)tsq);
+	}
+}
+
+/*
+ * Write buffer destructor automatically called from kfree_skb.
+ * We can't xmit new skbs from this context, as we might already
+ * hold qdisc lock.
+ */
+void tcp_wfree(struct sk_buff *skb)
+{
+	struct sock *sk = skb->sk;
+	struct tcp_sock *tp = tcp_sk(sk);
+
+	if (test_and_clear_bit(TSQ_THROTTLED, &tp->tsq_flags) &&
+	    !test_and_set_bit(TSQ_QUEUED, &tp->tsq_flags)) {
+		unsigned long flags;
+		struct tsq_tasklet *tsq;
+
+		/* Keep a ref on socket.
+		 * This last ref will be released in tcp_tasklet_func()
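+	 * (we subtract truesize - 1, not truesize, so sk_wmem_alloc
+	 * stays nonzero and the socket cannot be freed meanwhile)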
+		 */
+		atomic_sub(skb->truesize - 1, &sk->sk_wmem_alloc);
+
+		/* queue this socket to tasklet queue */
+		local_irq_save(flags);
+		tsq = &__get_cpu_var(tsq_tasklet);
+		list_add(&tp->tsq_node, &tsq->head);
+		tasklet_schedule(&tsq->tasklet);
+		local_irq_restore(flags);
+	} else {
+		sock_wfree(skb);
+	}
+}
+
 /* This routine actually transmits TCP packets queued in by
  * tcp_do_sendmsg().  This is used by both the initial
  * transmission and possible later retransmissions.
@@ -844,7 +983,12 @@
 
 	skb_push(skb, tcp_header_size);
 	skb_reset_transport_header(skb);
-	skb_set_owner_w(skb, sk);
+
+	skb_orphan(skb);
+	skb->sk = sk;
+	skb->destructor = (sysctl_tcp_limit_output_bytes > 0) ?
+			  tcp_wfree : sock_wfree;
+	atomic_add(skb->truesize, &sk->sk_wmem_alloc);
 
 	/* Build TCP header and checksum it. */
 	th = tcp_hdr(skb);
@@ -1780,6 +1924,7 @@
 	while ((skb = tcp_send_head(sk))) {
 		unsigned int limit;
 
 		tso_segs = tcp_init_tso_segs(sk, skb, mss_now);
 		BUG_ON(!tso_segs);
 
@@ -1800,6 +1945,13 @@
 				break;
 		}
 
+		/* TSQ : sk_wmem_alloc accounts for skb truesize,
+		 * including skb overhead. But that's OK.
+		 */
+		if (atomic_read(&sk->sk_wmem_alloc) >= sysctl_tcp_limit_output_bytes) {
+			set_bit(TSQ_THROTTLED, &tp->tsq_flags);
+			break;
+		}
 		limit = mss_now;
 		if (tso_segs > 1 && !tcp_urg_mode(tp))
 			limit = tcp_mss_split_point(sk, skb, mss_now,