rcu: Allow rcu_do_batch() to dynamically adjust batch sizes
Bimodal behavior of rcu_do_batch() is not really suited to Google
applications like gfe servers.
When a process with millions of sockets exits, closing all files
queues two rcu callbacks per socket.
This eventually reaches the point where RCU enters an emergency
mode, where rcu_do_batch() do not return until whole queue is flushed.
Each rcu callback lasts at least 70 nsec, so with millions of
elements, we easily spend more than 100 msec without rescheduling.
Goal of this patch is to avoid the infamous message like following
"need_resched set for > 51999388 ns (52 ticks) without schedule"
We dynamically adjust the number of elements we process, instead
of 10 / INFINITE choices, we use a floor of ~1 % of current entries.
If the number is above 1000, we switch to a time based limit of 3 msec
per batch, adjustable with /sys/module/rcutree/parameters/rcu_resched_ns
Signed-off-by: Eric Dumazet <edumazet@google.com>
[ paulmck: Forward-port and remove debug statements. ]
Signed-off-by: Paul E. McKenney <paulmck@linux.ibm.com>
diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
index 3e89b5b..71395e9 100644
--- a/kernel/rcu/tree.c
+++ b/kernel/rcu/tree.c
@@ -56,6 +56,7 @@
#include <linux/smpboot.h>
#include <linux/jiffies.h>
#include <linux/sched/isolation.h>
+#include <linux/sched/clock.h>
#include "../time/tick-internal.h"
#include "tree.h"
@@ -416,6 +417,12 @@ module_param(qlowmark, long, 0444);
static ulong jiffies_till_first_fqs = ULONG_MAX;
static ulong jiffies_till_next_fqs = ULONG_MAX;
static bool rcu_kick_kthreads;
+static int rcu_divisor = 7;
+module_param(rcu_divisor, int, 0644);
+
+/* Force an exit from rcu_do_batch() after 3 milliseconds. */
+static long rcu_resched_ns = 3 * NSEC_PER_MSEC;
+module_param(rcu_resched_ns, long, 0644);
/*
* How long the grace period must be before we start recruiting
@@ -2109,6 +2116,7 @@ static void rcu_do_batch(struct rcu_data *rdp)
struct rcu_head *rhp;
struct rcu_cblist rcl = RCU_CBLIST_INITIALIZER(rcl);
long bl, count;
+ long pending, tlimit = 0;
/* If no callbacks are ready, just return. */
if (!rcu_segcblist_ready_cbs(&rdp->cblist)) {
@@ -2130,7 +2138,10 @@ static void rcu_do_batch(struct rcu_data *rdp)
local_irq_save(flags);
rcu_nocb_lock(rdp);
WARN_ON_ONCE(cpu_is_offline(smp_processor_id()));
- bl = rdp->blimit;
+ pending = rcu_segcblist_n_cbs(&rdp->cblist);
+ bl = max(rdp->blimit, pending >> rcu_divisor);
+ if (unlikely(bl > 100))
+ tlimit = local_clock() + rcu_resched_ns;
trace_rcu_batch_start(rcu_state.name,
rcu_segcblist_n_lazy_cbs(&rdp->cblist),
rcu_segcblist_n_cbs(&rdp->cblist), bl);
@@ -2153,6 +2164,13 @@ static void rcu_do_batch(struct rcu_data *rdp)
(need_resched() ||
(!is_idle_task(current) && !rcu_is_callbacks_kthread())))
break;
+ if (unlikely(tlimit)) {
+ /* only call local_clock() every 32 callbacks */
+ if (likely((-rcl.len & 31) || local_clock() < tlimit))
+ continue;
+ /* Exceeded the time limit, so leave. */
+ break;
+ }
if (offloaded) {
WARN_ON_ONCE(in_serving_softirq());
local_bh_enable();