mptcp: handle pending data on closed subflow
The PM can close an active subflow, e.g. due to an ingress RM_ADDR
option. Such a subflow could carry data still unacked at the
MPTCP-level, both in the write and the rtx_queue, which has
never reached the other peer.
Currently the mptcp-level retransmission will deliver such data,
but at a very low rate (at most 1 DSM for each MPTCP rtx interval).
We can speed up the recovery a lot by moving all the unacked data in
the tcp write_queue, so that it will be pushed again via the other
subflows, at the speed allowed by them.
Also make the new helper available for later patches.
Closes: https://github.com/multipath-tcp/mptcp_net-next/issues/207
Signed-off-by: Paolo Abeni <pabeni@redhat.com>
Signed-off-by: Mat Martineau <mathew.j.martineau@linux.intel.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
diff --git a/net/mptcp/protocol.c b/net/mptcp/protocol.c
index decbb42..5fafa7a 100644
--- a/net/mptcp/protocol.c
+++ b/net/mptcp/protocol.c
@@ -1055,8 +1055,14 @@ static void __mptcp_clean_una(struct sock *sk)
if (after64(dfrag->data_seq + dfrag->data_len, snd_una))
break;
- if (WARN_ON_ONCE(dfrag == msk->first_pending))
- break;
+ if (unlikely(dfrag == msk->first_pending)) {
+ /* in recovery mode can see ack after the current snd head */
+ if (WARN_ON_ONCE(!msk->recovery))
+ break;
+
+ WRITE_ONCE(msk->first_pending, mptcp_send_next(sk));
+ }
+
dfrag_clear(sk, dfrag);
cleaned = true;
}
@@ -1065,8 +1071,14 @@ static void __mptcp_clean_una(struct sock *sk)
if (dfrag && after64(snd_una, dfrag->data_seq)) {
u64 delta = snd_una - dfrag->data_seq;
- if (WARN_ON_ONCE(delta > dfrag->already_sent))
- goto out;
+ /* prevent wrap around in recovery mode */
+ if (unlikely(delta > dfrag->already_sent)) {
+ if (WARN_ON_ONCE(!msk->recovery))
+ goto out;
+ if (WARN_ON_ONCE(delta > dfrag->data_len))
+ goto out;
+ dfrag->already_sent += delta - dfrag->already_sent;
+ }
dfrag->data_seq += delta;
dfrag->offset += delta;
@@ -1077,6 +1089,10 @@ static void __mptcp_clean_una(struct sock *sk)
cleaned = true;
}
+ /* all retransmitted data acked, recovery completed */
+ if (unlikely(msk->recovery) && after64(msk->snd_una, msk->recovery_snd_nxt))
+ msk->recovery = false;
+
out:
if (cleaned) {
if (tcp_under_memory_pressure(sk)) {
@@ -1085,7 +1101,7 @@ static void __mptcp_clean_una(struct sock *sk)
}
}
- if (snd_una == READ_ONCE(msk->snd_nxt)) {
+ if (snd_una == READ_ONCE(msk->snd_nxt) && !msk->recovery) {
if (mptcp_timer_pending(sk) && !mptcp_data_fin_enabled(msk))
mptcp_stop_timer(sk);
} else {
@@ -2148,6 +2164,50 @@ static void mptcp_dispose_initial_subflow(struct mptcp_sock *msk)
}
}
+bool __mptcp_retransmit_pending_data(struct sock *sk)
+{
+ struct mptcp_data_frag *cur, *rtx_head;
+ struct mptcp_sock *msk = mptcp_sk(sk);
+
+ if (__mptcp_check_fallback(mptcp_sk(sk)))
+ return false;
+
+ if (tcp_rtx_and_write_queues_empty(sk))
+ return false;
+
+ /* the closing socket has some data untransmitted and/or unacked:
+ * some data in the mptcp rtx queue has not really been transmitted
+ * yet. Keep it simple and re-inject the whole mptcp level rtx queue.
+ *
+ * Under the data lock: run the una cleanup first, then pick the
+ * head of the rtx queue; if there is no head there is nothing to
+ * re-inject and the caller is told no push is needed (false).
+ */
+ mptcp_data_lock(sk);
+ __mptcp_clean_una_wakeup(sk);
+ rtx_head = mptcp_rtx_head(sk);
+ if (!rtx_head) {
+ mptcp_data_unlock(sk);
+ return false;
+ }
+
+ /* will accept acks for re-injected data before re-sending it:
+ * enter recovery mode and remember the current snd_nxt as the
+ * recovery end point. If a recovery is already in progress, the
+ * recorded recovery_snd_nxt is only ever moved forward.
+ */
+ if (!msk->recovery || after64(msk->snd_nxt, msk->recovery_snd_nxt))
+ msk->recovery_snd_nxt = msk->snd_nxt;
+ msk->recovery = true;
+ mptcp_data_unlock(sk);
+
+ msk->first_pending = rtx_head;
+ msk->tx_pending_data += msk->snd_nxt - rtx_head->data_seq;
+ msk->snd_nxt = rtx_head->data_seq;
+ msk->snd_burst = 0;
+
+ /* be sure to clear the "sent status" on all re-injected fragments:
+ * walk the rtx queue and stop at the first fragment whose
+ * already_sent counter is zero
+ */
+ list_for_each_entry(cur, &msk->rtx_queue, list) {
+ if (!cur->already_sent)
+ break;
+ cur->already_sent = 0;
+ }
+
+ return true;
+}
+
/* subflow sockets can be either outgoing (connect) or incoming
* (accept).
*
@@ -2160,6 +2220,7 @@ static void __mptcp_close_ssk(struct sock *sk, struct sock *ssk,
struct mptcp_subflow_context *subflow)
{
struct mptcp_sock *msk = mptcp_sk(sk);
+ bool need_push;
list_del(&subflow->node);
@@ -2171,6 +2232,7 @@ static void __mptcp_close_ssk(struct sock *sk, struct sock *ssk,
if (ssk->sk_socket)
sock_orphan(ssk);
+ need_push = __mptcp_retransmit_pending_data(sk);
subflow->disposable = 1;
/* if ssk hit tcp_done(), tcp_cleanup_ulp() cleared the related ops
@@ -2198,6 +2260,9 @@ static void __mptcp_close_ssk(struct sock *sk, struct sock *ssk,
if (msk->subflow && ssk == msk->subflow->sk)
mptcp_dispose_initial_subflow(msk);
+
+ if (need_push)
+ __mptcp_push_pending(sk, 0);
}
void mptcp_close_ssk(struct sock *sk, struct sock *ssk,
@@ -2410,6 +2475,7 @@ static int __mptcp_init_sock(struct sock *sk)
msk->first = NULL;
inet_csk(sk)->icsk_sync_mss = mptcp_sync_mss;
WRITE_ONCE(msk->csum_enabled, mptcp_is_checksum_enabled(sock_net(sk)));
+ msk->recovery = false;
mptcp_pm_data_init(msk);