rcu: Defer reporting RCU-preempt quiescent states when disabled

This commit defers reporting of RCU-preempt quiescent states at
rcu_read_unlock_special() time when any of interrupts, softirq, or
preemption are disabled.  These deferred quiescent states are reported
at a later RCU_SOFTIRQ, context switch, idle entry, or CPU-hotplug
offline operation.  Of course, if another RCU read-side critical
section has started in the meantime, the reporting of the quiescent
state will be further deferred.

This also means that disabling preemption, interrupts, and/or
softirqs will act as an RCU-preempt read-side critical section.
This is enforced by checking preempt_count() as needed.

Some special cases must be handled on an ad-hoc basis, for example,
context switch is a quiescent state even though both the scheduler and
do_exit() disable preemption.  In these cases, additional calls to
rcu_preempt_deferred_qs() override the preemption disabling.  Similar
logic overrides disabled interrupts in rcu_preempt_check_callbacks()
because in this case the quiescent state happened just before the
corresponding scheduling-clock interrupt.

In theory, this change lifts a long-standing restriction: if interrupts
were disabled across a call to rcu_read_unlock(), the matching
rcu_read_lock() also had to be contained within that same
interrupts-disabled region of code.  Because the reporting of the
corresponding RCU-preempt quiescent state is now deferred until
after interrupts have been enabled, it is no longer possible for this
situation to result in deadlocks involving the scheduler's runqueue and
priority-inheritance locks.  This may allow some code simplification that
might reduce interrupt latency a bit.  Unfortunately, in practice this
would also defer deboosting a low-priority task that had been subjected
to RCU priority boosting, so real-time-response considerations might
well force this restriction to remain in place.

Because RCU-preempt grace periods are now blocked not only by RCU
read-side critical sections, but also by disabling of interrupts,
preemption, and softirqs, it will be possible to eliminate RCU-bh and
RCU-sched in favor of RCU-preempt in CONFIG_PREEMPT=y kernels.  This may
require some additional plumbing to provide the network denial-of-service
guarantees that have been traditionally provided by RCU-bh.  Once these
guarantees are in place, CONFIG_PREEMPT=n kernels will be able to fold RCU-bh
into RCU-sched.  This would mean that all kernels would have but
one flavor of RCU, which would open the door to significant code
cleanup.

Moving to a single flavor of RCU would also have the beneficial effect
of reducing the number of NOCB kthreads by at least a factor of two.

Signed-off-by: Paul E. McKenney <paulmck@linux.vnet.ibm.com>
[ paulmck: Apply rcu_read_unlock_special() preempt_count() feedback
  from Joel Fernandes. ]
[ paulmck: Adjust rcu_eqs_enter() call to rcu_preempt_deferred_qs() in
  response to bug reports from kbuild test robot. ]
[ paulmck: Fix bug located by kbuild test robot involving recursion
  via rcu_preempt_deferred_qs(). ]
diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
index 3678678..3466247 100644
--- a/kernel/rcu/tree.c
+++ b/kernel/rcu/tree.c
@@ -422,6 +422,7 @@ static void rcu_momentary_dyntick_idle(void)
 	special = atomic_add_return(2 * RCU_DYNTICK_CTRL_CTR, &rdtp->dynticks);
 	/* It is illegal to call this from idle state. */
 	WARN_ON_ONCE(!(special & RCU_DYNTICK_CTRL_CTR));
+	rcu_preempt_deferred_qs(current);
 }
 
 /*
@@ -729,6 +730,7 @@ static void rcu_eqs_enter(bool user)
 		do_nocb_deferred_wakeup(rdp);
 	}
 	rcu_prepare_for_idle();
+	rcu_preempt_deferred_qs(current);
 	WRITE_ONCE(rdtp->dynticks_nesting, 0); /* Avoid irq-access tearing. */
 	rcu_dynticks_eqs_enter();
 	rcu_dynticks_task_enter();
@@ -2850,6 +2852,12 @@ __rcu_process_callbacks(struct rcu_state *rsp)
 
 	WARN_ON_ONCE(!rdp->beenonline);
 
+	/* Report any deferred quiescent states if preemption enabled. */
+	if (!(preempt_count() & PREEMPT_MASK))
+		rcu_preempt_deferred_qs(current);
+	else if (rcu_preempt_need_deferred_qs(current))
+		resched_cpu(rdp->cpu); /* Provoke future context switch. */
+
 	/* Update RCU state based on any recent quiescent states. */
 	rcu_check_quiescent_state(rsp, rdp);
 
@@ -3823,6 +3831,7 @@ void rcu_report_dead(unsigned int cpu)
 	rcu_report_exp_rdp(&rcu_sched_state,
 			   this_cpu_ptr(rcu_sched_state.rda), true);
 	preempt_enable();
+	rcu_preempt_deferred_qs(current);
 	for_each_rcu_flavor(rsp)
 		rcu_cleanup_dying_idle_cpu(cpu, rsp);
 
diff --git a/kernel/rcu/tree.h b/kernel/rcu/tree.h
index 4e74df7..025bd2e 100644
--- a/kernel/rcu/tree.h
+++ b/kernel/rcu/tree.h
@@ -195,6 +195,7 @@ struct rcu_data {
 	bool		core_needs_qs;	/* Core waits for quiesc state. */
 	bool		beenonline;	/* CPU online at least once. */
 	bool		gpwrap;		/* Possible ->gp_seq wrap. */
+	bool		deferred_qs;	/* This CPU awaiting a deferred QS? */
 	struct rcu_node *mynode;	/* This CPU's leaf of hierarchy */
 	unsigned long grpmask;		/* Mask to apply to leaf qsmask. */
 	unsigned long	ticks_this_gp;	/* The number of scheduling-clock */
@@ -461,6 +462,8 @@ static void rcu_cleanup_after_idle(void);
 static void rcu_prepare_for_idle(void);
 static void rcu_idle_count_callbacks_posted(void);
 static bool rcu_preempt_has_tasks(struct rcu_node *rnp);
+static bool rcu_preempt_need_deferred_qs(struct task_struct *t);
+static void rcu_preempt_deferred_qs(struct task_struct *t);
 static void print_cpu_stall_info_begin(void);
 static void print_cpu_stall_info(struct rcu_state *rsp, int cpu);
 static void print_cpu_stall_info_end(void);
diff --git a/kernel/rcu/tree_exp.h b/kernel/rcu/tree_exp.h
index 0b2c2ad..f9d5bbd8 100644
--- a/kernel/rcu/tree_exp.h
+++ b/kernel/rcu/tree_exp.h
@@ -262,6 +262,7 @@ static void rcu_report_exp_cpu_mult(struct rcu_state *rsp, struct rcu_node *rnp,
 static void rcu_report_exp_rdp(struct rcu_state *rsp, struct rcu_data *rdp,
 			       bool wake)
 {
+	WRITE_ONCE(rdp->deferred_qs, false);
 	rcu_report_exp_cpu_mult(rsp, rdp->mynode, rdp->grpmask, wake);
 }
 
@@ -735,32 +736,70 @@ EXPORT_SYMBOL_GPL(synchronize_sched_expedited);
  */
 static void sync_rcu_exp_handler(void *info)
 {
-	struct rcu_data *rdp;
+	unsigned long flags;
 	struct rcu_state *rsp = info;
+	struct rcu_data *rdp = this_cpu_ptr(rsp->rda);
+	struct rcu_node *rnp = rdp->mynode;
 	struct task_struct *t = current;
 
 	/*
-	 * Within an RCU read-side critical section, request that the next
-	 * rcu_read_unlock() report.  Unless this RCU read-side critical
-	 * section has already blocked, in which case it is already set
-	 * up for the expedited grace period to wait on it.
+	 * First, the common case of not being in an RCU read-side
+	 * critical section.  If also enabled or idle, immediately
+	 * report the quiescent state, otherwise defer.
 	 */
-	if (t->rcu_read_lock_nesting > 0 &&
-	    !t->rcu_read_unlock_special.b.blocked) {
-		t->rcu_read_unlock_special.b.exp_need_qs = true;
+	if (!t->rcu_read_lock_nesting) {
+		if (!(preempt_count() & (PREEMPT_MASK | SOFTIRQ_MASK)) ||
+		    rcu_dynticks_curr_cpu_in_eqs()) {
+			rcu_report_exp_rdp(rsp, rdp, true);
+		} else {
+			rdp->deferred_qs = true;
+			resched_cpu(rdp->cpu);
+		}
 		return;
 	}
 
 	/*
-	 * We are either exiting an RCU read-side critical section (negative
-	 * values of t->rcu_read_lock_nesting) or are not in one at all
-	 * (zero value of t->rcu_read_lock_nesting).  Or we are in an RCU
-	 * read-side critical section that blocked before this expedited
-	 * grace period started.  Either way, we can immediately report
-	 * the quiescent state.
+	 * Second, the less-common case of being in an RCU read-side
+	 * critical section.  In this case we can count on a future
+	 * rcu_read_unlock().  However, this rcu_read_unlock() might
+	 * execute on some other CPU, but in that case there will be
+	 * a future context switch.  Either way, if the expedited
+	 * grace period is still waiting on this CPU, set ->deferred_qs
+	 * so that the eventual quiescent state will be reported.
+	 * Note that there is a large group of race conditions that
+	 * can have caused this quiescent state to already have been
+	 * reported, so we really do need to check ->expmask.
 	 */
-	rdp = this_cpu_ptr(rsp->rda);
-	rcu_report_exp_rdp(rsp, rdp, true);
+	if (t->rcu_read_lock_nesting > 0) {
+		raw_spin_lock_irqsave_rcu_node(rnp, flags);
+		if (rnp->expmask & rdp->grpmask)
+			rdp->deferred_qs = true;
+		raw_spin_unlock_irqrestore_rcu_node(rnp, flags);
+	}
+
+	/*
+	 * The final and least likely case is where the interrupted
+	 * code was just about to or just finished exiting the RCU-preempt
+	 * read-side critical section, and no, we can't tell which.
+	 * So either way, set ->deferred_qs to flag later code that
+	 * a quiescent state is required.
+	 *
+	 * If the CPU is fully enabled (or if some buggy RCU-preempt
+	 * read-side critical section is being used from idle), just
+	 * invoke rcu_preempt_deferred_qs() to immediately report the
+	 * quiescent state.  We cannot use rcu_read_unlock_special()
+	 * because we are in an interrupt handler, which will cause that
+	 * function to take an early exit without doing anything.
+	 *
+	 * Otherwise, use resched_cpu() to force a context switch after
+	 * the CPU enables everything.
+	 */
+	rdp->deferred_qs = true;
+	if (!(preempt_count() & (PREEMPT_MASK | SOFTIRQ_MASK)) ||
+	    WARN_ON_ONCE(rcu_dynticks_curr_cpu_in_eqs()))
+		rcu_preempt_deferred_qs(t);
+	else
+		resched_cpu(rdp->cpu);
 }
 
 /**
diff --git a/kernel/rcu/tree_plugin.h b/kernel/rcu/tree_plugin.h
index a97c20e..5427913 100644
--- a/kernel/rcu/tree_plugin.h
+++ b/kernel/rcu/tree_plugin.h
@@ -371,6 +371,9 @@ static void rcu_preempt_note_context_switch(bool preempt)
 		 * behalf of preempted instance of __rcu_read_unlock().
 		 */
 		rcu_read_unlock_special(t);
+		rcu_preempt_deferred_qs(t);
+	} else {
+		rcu_preempt_deferred_qs(t);
 	}
 
 	/*
@@ -464,54 +467,51 @@ static bool rcu_preempt_has_tasks(struct rcu_node *rnp)
 }
 
 /*
- * Handle special cases during rcu_read_unlock(), such as needing to
- * notify RCU core processing or task having blocked during the RCU
- * read-side critical section.
+ * Report deferred quiescent states.  The deferral time can
+ * be quite short, for example, in the case of the call from
+ * rcu_read_unlock_special().
  */
-static void rcu_read_unlock_special(struct task_struct *t)
+static void
+rcu_preempt_deferred_qs_irqrestore(struct task_struct *t, unsigned long flags)
 {
 	bool empty_exp;
 	bool empty_norm;
 	bool empty_exp_now;
-	unsigned long flags;
 	struct list_head *np;
 	bool drop_boost_mutex = false;
 	struct rcu_data *rdp;
 	struct rcu_node *rnp;
 	union rcu_special special;
 
-	/* NMI handlers cannot block and cannot safely manipulate state. */
-	if (in_nmi())
-		return;
-
-	local_irq_save(flags);
-
 	/*
 	 * If RCU core is waiting for this CPU to exit its critical section,
 	 * report the fact that it has exited.  Because irqs are disabled,
 	 * t->rcu_read_unlock_special cannot change.
 	 */
 	special = t->rcu_read_unlock_special;
+	rdp = this_cpu_ptr(rcu_state_p->rda);
+	if (!special.s && !rdp->deferred_qs) {
+		local_irq_restore(flags);
+		return;
+	}
 	if (special.b.need_qs) {
 		rcu_preempt_qs();
 		t->rcu_read_unlock_special.b.need_qs = false;
-		if (!t->rcu_read_unlock_special.s) {
+		if (!t->rcu_read_unlock_special.s && !rdp->deferred_qs) {
 			local_irq_restore(flags);
 			return;
 		}
 	}
 
 	/*
-	 * Respond to a request for an expedited grace period, but only if
-	 * we were not preempted, meaning that we were running on the same
-	 * CPU throughout.  If we were preempted, the exp_need_qs flag
-	 * would have been cleared at the time of the first preemption,
-	 * and the quiescent state would be reported when we were dequeued.
+	 * Respond to a request by an expedited grace period for a
+	 * quiescent state from this CPU.  Note that requests from
+	 * tasks are handled when removing the task from the
+	 * blocked-tasks list below.
 	 */
-	if (special.b.exp_need_qs) {
-		WARN_ON_ONCE(special.b.blocked);
+	if (special.b.exp_need_qs || rdp->deferred_qs) {
 		t->rcu_read_unlock_special.b.exp_need_qs = false;
-		rdp = this_cpu_ptr(rcu_state_p->rda);
+		rdp->deferred_qs = false;
 		rcu_report_exp_rdp(rcu_state_p, rdp, true);
 		if (!t->rcu_read_unlock_special.s) {
 			local_irq_restore(flags);
@@ -519,19 +519,6 @@ static void rcu_read_unlock_special(struct task_struct *t)
 		}
 	}
 
-	/* Hardware IRQ handlers cannot block, complain if they get here. */
-	if (in_irq() || in_serving_softirq()) {
-		lockdep_rcu_suspicious(__FILE__, __LINE__,
-				       "rcu_read_unlock() from irq or softirq with blocking in critical section!!!\n");
-		pr_alert("->rcu_read_unlock_special: %#x (b: %d, enq: %d nq: %d)\n",
-			 t->rcu_read_unlock_special.s,
-			 t->rcu_read_unlock_special.b.blocked,
-			 t->rcu_read_unlock_special.b.exp_need_qs,
-			 t->rcu_read_unlock_special.b.need_qs);
-		local_irq_restore(flags);
-		return;
-	}
-
 	/* Clean up if blocked during RCU read-side critical section. */
 	if (special.b.blocked) {
 		t->rcu_read_unlock_special.b.blocked = false;
@@ -603,6 +590,72 @@ static void rcu_read_unlock_special(struct task_struct *t)
 }
 
 /*
+ * Is a deferred quiescent-state pending, and are we also not in
+ * an RCU read-side critical section?  It is the caller's responsibility
+ * to ensure it is otherwise safe to report any deferred quiescent
+ * states.  The reason for this is that it is safe to report a
+ * quiescent state during context switch even though preemption
+ * is disabled.  This function cannot be expected to understand these
+ * nuances, so the caller must handle them.
+ */
+static bool rcu_preempt_need_deferred_qs(struct task_struct *t)
+{
+	return (this_cpu_ptr(&rcu_preempt_data)->deferred_qs ||
+		READ_ONCE(t->rcu_read_unlock_special.s)) &&
+	       !t->rcu_read_lock_nesting;
+}
+
+/*
+ * Report a deferred quiescent state if needed and safe to do so.
+ * As with rcu_preempt_need_deferred_qs(), "safe" involves only
+ * not being in an RCU read-side critical section.  The caller must
+ * evaluate safety in terms of interrupt, softirq, and preemption
+ * disabling.
+ */
+static void rcu_preempt_deferred_qs(struct task_struct *t)
+{
+	unsigned long flags;
+	bool couldrecurse = t->rcu_read_lock_nesting >= 0;
+
+	if (!rcu_preempt_need_deferred_qs(t))
+		return;
+	if (couldrecurse)
+		t->rcu_read_lock_nesting -= INT_MIN;
+	local_irq_save(flags);
+	rcu_preempt_deferred_qs_irqrestore(t, flags);
+	if (couldrecurse)
+		t->rcu_read_lock_nesting += INT_MIN;
+}
+
+/*
+ * Handle special cases during rcu_read_unlock(), such as needing to
+ * notify RCU core processing or task having blocked during the RCU
+ * read-side critical section.
+ */
+static void rcu_read_unlock_special(struct task_struct *t)
+{
+	unsigned long flags;
+	bool preempt_bh_were_disabled =
+			!!(preempt_count() & (PREEMPT_MASK | SOFTIRQ_MASK));
+	bool irqs_were_disabled;
+
+	/* NMI handlers cannot block and cannot safely manipulate state. */
+	if (in_nmi())
+		return;
+
+	local_irq_save(flags);
+	irqs_were_disabled = irqs_disabled_flags(flags);
+	if ((preempt_bh_were_disabled || irqs_were_disabled) &&
+	    t->rcu_read_unlock_special.b.blocked) {
+		/* Need to defer quiescent state until everything is enabled. */
+		raise_softirq_irqoff(RCU_SOFTIRQ);
+		local_irq_restore(flags);
+		return;
+	}
+	rcu_preempt_deferred_qs_irqrestore(t, flags);
+}
+
+/*
  * Dump detailed information for all tasks blocking the current RCU
  * grace period on the specified rcu_node structure.
  */
@@ -737,10 +790,20 @@ static void rcu_preempt_check_callbacks(void)
 	struct rcu_state *rsp = &rcu_preempt_state;
 	struct task_struct *t = current;
 
-	if (t->rcu_read_lock_nesting == 0) {
-		rcu_preempt_qs();
+	if (t->rcu_read_lock_nesting > 0 ||
+	    (preempt_count() & (PREEMPT_MASK | SOFTIRQ_MASK))) {
+		/* No QS, force context switch if deferred. */
+		if (rcu_preempt_need_deferred_qs(t))
+			resched_cpu(smp_processor_id());
+	} else if (rcu_preempt_need_deferred_qs(t)) {
+		rcu_preempt_deferred_qs(t); /* Report deferred QS. */
+		return;
+	} else if (!t->rcu_read_lock_nesting) {
+		rcu_preempt_qs(); /* Report immediate QS. */
 		return;
 	}
+
+	/* If GP is oldish, ask for help from rcu_read_unlock_special(). */
 	if (t->rcu_read_lock_nesting > 0 &&
 	    __this_cpu_read(rcu_data_p->core_needs_qs) &&
 	    __this_cpu_read(rcu_data_p->cpu_no_qs.b.norm) &&
@@ -859,6 +922,7 @@ void exit_rcu(void)
 	barrier();
 	t->rcu_read_unlock_special.b.blocked = true;
 	__rcu_read_unlock();
+	rcu_preempt_deferred_qs(current);
 }
 
 /*
@@ -941,6 +1005,16 @@ static bool rcu_preempt_has_tasks(struct rcu_node *rnp)
 }
 
 /*
+ * Because there is no preemptible RCU, there can be no deferred quiescent
+ * states.
+ */
+static bool rcu_preempt_need_deferred_qs(struct task_struct *t)
+{
+	return false;
+}
+static void rcu_preempt_deferred_qs(struct task_struct *t) { }
+
+/*
  * Because preemptible RCU does not exist, we never have to check for
  * tasks blocked within RCU read-side critical sections.
  */