rcu: Make callers awaken grace-period kthread

The rcu_start_gp_advanced() function currently uses irq_work_queue()
to defer wakeups of the RCU grace-period kthread.  This deferring
is necessary to avoid RCU-scheduler deadlocks involving the rcu_node
structure's lock, meaning that RCU cannot call any of the scheduler's
wake-up functions while holding one of these locks.

Unfortunately, the second and subsequent calls to irq_work_queue() are
ignored, and the first call will be ignored (aside from queuing the work
item) if the scheduler-clock tick is turned off.  This is OK for many
uses, especially those where irq_work_queue() is called from an interrupt
or softirq handler, because in those cases the scheduler-clock-tick state
will be re-evaluated, which will turn the scheduler-clock tick back on.
On the next tick, any deferred work will then be processed.

However, this strategy does not always work for RCU, which can be invoked
at process level from idle CPUs.  In this case, the tick might never
be turned back on, indefinitely defering a grace-period start request.
Note that the RCU CPU stall detector cannot see this condition, because
there is no RCU grace period in progress.  Therefore, we can (and do!)
see long tens-of-seconds stalls in grace-period handling.  In theory,
we could see a full grace-period hang, but rcutorture testing to date
has seen only the tens-of-seconds stalls.  Event tracing demonstrates
that irq_work_queue() is being called repeatedly to no effect during
these stalls: The "newreq" event appears repeatedly from a task that is
not one of the grace-period kthreads.

In theory, irq_work_queue() might be fixed to avoid this sort of issue,
but RCU's requirements are unusual and it is quite straightforward to pass
wake-up responsibility up through RCU's call chain, so that the wakeup
happens when the offending locks are released.

This commit therefore makes this change.  The rcu_start_gp_advanced(),
rcu_start_future_gp(), rcu_accelerate_cbs(), rcu_advance_cbs(),
__note_gp_changes(), and rcu_start_gp() functions now return a boolean
which indicates when a wake-up is needed.  A new rcu_gp_kthread_wake()
does the wakeup when it is necessary and safe to do so: No self-wakes,
no wake-ups if the ->gp_flags field indicates there is no need (as in
someone else did the wake-up before we got around to it), and no wake-ups
before the grace-period kthread has been created.

Signed-off-by: Paul E. McKenney <paulmck@linux.vnet.ibm.com>
Cc: Peter Zijlstra <peterz@infradead.org>
Cc: Steven Rostedt <rostedt@goodmis.org>
Cc: Frederic Weisbecker <fweisbec@gmail.com>
Reviewed-by: Josh Triplett <josh@joshtriplett.org>
diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
index c624415..fca911b 100644
--- a/kernel/rcu/tree.c
+++ b/kernel/rcu/tree.c
@@ -243,7 +243,7 @@
 module_param(jiffies_till_first_fqs, ulong, 0644);
 module_param(jiffies_till_next_fqs, ulong, 0644);
 
-static void rcu_start_gp_advanced(struct rcu_state *rsp, struct rcu_node *rnp,
+static bool rcu_start_gp_advanced(struct rcu_state *rsp, struct rcu_node *rnp,
 				  struct rcu_data *rdp);
 static void force_qs_rnp(struct rcu_state *rsp,
 			 int (*f)(struct rcu_data *rsp, bool *isidle,
@@ -1138,15 +1138,18 @@
 /*
  * Start some future grace period, as needed to handle newly arrived
  * callbacks.  The required future grace periods are recorded in each
- * rcu_node structure's ->need_future_gp field.
+ * rcu_node structure's ->need_future_gp field.  Returns true if there
+ * is reason to awaken the grace-period kthread.
  *
  * The caller must hold the specified rcu_node structure's ->lock.
  */
-static unsigned long __maybe_unused
-rcu_start_future_gp(struct rcu_node *rnp, struct rcu_data *rdp)
+static bool __maybe_unused
+rcu_start_future_gp(struct rcu_node *rnp, struct rcu_data *rdp,
+		    unsigned long *c_out)
 {
 	unsigned long c;
 	int i;
+	bool ret = false;
 	struct rcu_node *rnp_root = rcu_get_root(rdp->rsp);
 
 	/*
@@ -1157,7 +1160,7 @@
 	trace_rcu_future_gp(rnp, rdp, c, TPS("Startleaf"));
 	if (rnp->need_future_gp[c & 0x1]) {
 		trace_rcu_future_gp(rnp, rdp, c, TPS("Prestartleaf"));
-		return c;
+		goto out;
 	}
 
 	/*
@@ -1171,7 +1174,7 @@
 	    ACCESS_ONCE(rnp->gpnum) != ACCESS_ONCE(rnp->completed)) {
 		rnp->need_future_gp[c & 0x1]++;
 		trace_rcu_future_gp(rnp, rdp, c, TPS("Startedleaf"));
-		return c;
+		goto out;
 	}
 
 	/*
@@ -1212,12 +1215,15 @@
 		trace_rcu_future_gp(rnp, rdp, c, TPS("Startedleafroot"));
 	} else {
 		trace_rcu_future_gp(rnp, rdp, c, TPS("Startedroot"));
-		rcu_start_gp_advanced(rdp->rsp, rnp_root, rdp);
+		ret = rcu_start_gp_advanced(rdp->rsp, rnp_root, rdp);
 	}
 unlock_out:
 	if (rnp != rnp_root)
 		raw_spin_unlock(&rnp_root->lock);
-	return c;
+out:
+	if (c_out != NULL)
+		*c_out = c;
+	return ret;
 }
 
 /*
@@ -1241,25 +1247,43 @@
 }
 
 /*
+ * Awaken the grace-period kthread for the specified flavor of RCU.
+ * Don't do a self-awaken, and don't bother awakening when there is
+ * nothing for the grace-period kthread to do (as in several CPUs
+ * raced to awaken, and we lost), and finally don't try to awaken
+ * a kthread that has not yet been created.
+ */
+static void rcu_gp_kthread_wake(struct rcu_state *rsp)
+{
+	if (current == rsp->gp_kthread ||
+	    !ACCESS_ONCE(rsp->gp_flags) ||
+	    !rsp->gp_kthread)
+		return;
+	wake_up(&rsp->gp_wq);
+}
+
+/*
  * If there is room, assign a ->completed number to any callbacks on
  * this CPU that have not already been assigned.  Also accelerate any
  * callbacks that were previously assigned a ->completed number that has
  * since proven to be too conservative, which can happen if callbacks get
  * assigned a ->completed number while RCU is idle, but with reference to
  * a non-root rcu_node structure.  This function is idempotent, so it does
- * not hurt to call it repeatedly.
+ * not hurt to call it repeatedly.  Returns an flag saying that we should
+ * awaken the RCU grace-period kthread.
  *
  * The caller must hold rnp->lock with interrupts disabled.
  */
-static void rcu_accelerate_cbs(struct rcu_state *rsp, struct rcu_node *rnp,
+static bool rcu_accelerate_cbs(struct rcu_state *rsp, struct rcu_node *rnp,
 			       struct rcu_data *rdp)
 {
 	unsigned long c;
 	int i;
+	bool ret;
 
 	/* If the CPU has no callbacks, nothing to do. */
 	if (!rdp->nxttail[RCU_NEXT_TAIL] || !*rdp->nxttail[RCU_DONE_TAIL])
-		return;
+		return false;
 
 	/*
 	 * Starting from the sublist containing the callbacks most
@@ -1288,7 +1312,7 @@
 	 * be grouped into.
 	 */
 	if (++i >= RCU_NEXT_TAIL)
-		return;
+		return false;
 
 	/*
 	 * Assign all subsequent callbacks' ->completed number to the next
@@ -1300,13 +1324,14 @@
 		rdp->nxtcompleted[i] = c;
 	}
 	/* Record any needed additional grace periods. */
-	rcu_start_future_gp(rnp, rdp);
+	ret = rcu_start_future_gp(rnp, rdp, NULL);
 
 	/* Trace depending on how much we were able to accelerate. */
 	if (!*rdp->nxttail[RCU_WAIT_TAIL])
 		trace_rcu_grace_period(rsp->name, rdp->gpnum, TPS("AccWaitCB"));
 	else
 		trace_rcu_grace_period(rsp->name, rdp->gpnum, TPS("AccReadyCB"));
+	return ret;
 }
 
 /*
@@ -1315,17 +1340,18 @@
  * assign ->completed numbers to any callbacks in the RCU_NEXT_TAIL
  * sublist.  This function is idempotent, so it does not hurt to
  * invoke it repeatedly.  As long as it is not invoked -too- often...
+ * Returns true if the RCU grace-period kthread needs to be awakened.
  *
  * The caller must hold rnp->lock with interrupts disabled.
  */
-static void rcu_advance_cbs(struct rcu_state *rsp, struct rcu_node *rnp,
+static bool rcu_advance_cbs(struct rcu_state *rsp, struct rcu_node *rnp,
 			    struct rcu_data *rdp)
 {
 	int i, j;
 
 	/* If the CPU has no callbacks, nothing to do. */
 	if (!rdp->nxttail[RCU_NEXT_TAIL] || !*rdp->nxttail[RCU_DONE_TAIL])
-		return;
+		return false;
 
 	/*
 	 * Find all callbacks whose ->completed numbers indicate that they
@@ -1349,26 +1375,30 @@
 	}
 
 	/* Classify any remaining callbacks. */
-	rcu_accelerate_cbs(rsp, rnp, rdp);
+	return rcu_accelerate_cbs(rsp, rnp, rdp);
 }
 
 /*
  * Update CPU-local rcu_data state to record the beginnings and ends of
  * grace periods.  The caller must hold the ->lock of the leaf rcu_node
  * structure corresponding to the current CPU, and must have irqs disabled.
+ * Returns true if the grace-period kthread needs to be awakened.
  */
-static void __note_gp_changes(struct rcu_state *rsp, struct rcu_node *rnp, struct rcu_data *rdp)
+static bool __note_gp_changes(struct rcu_state *rsp, struct rcu_node *rnp,
+			      struct rcu_data *rdp)
 {
+	bool ret;
+
 	/* Handle the ends of any preceding grace periods first. */
 	if (rdp->completed == rnp->completed) {
 
 		/* No grace period end, so just accelerate recent callbacks. */
-		rcu_accelerate_cbs(rsp, rnp, rdp);
+		ret = rcu_accelerate_cbs(rsp, rnp, rdp);
 
 	} else {
 
 		/* Advance callbacks. */
-		rcu_advance_cbs(rsp, rnp, rdp);
+		ret = rcu_advance_cbs(rsp, rnp, rdp);
 
 		/* Remember that we saw this grace-period completion. */
 		rdp->completed = rnp->completed;
@@ -1387,11 +1417,13 @@
 		rdp->qs_pending = !!(rnp->qsmask & rdp->grpmask);
 		zero_cpu_stall_ticks(rdp);
 	}
+	return ret;
 }
 
 static void note_gp_changes(struct rcu_state *rsp, struct rcu_data *rdp)
 {
 	unsigned long flags;
+	bool needwake;
 	struct rcu_node *rnp;
 
 	local_irq_save(flags);
@@ -1403,8 +1435,10 @@
 		return;
 	}
 	smp_mb__after_unlock_lock();
-	__note_gp_changes(rsp, rnp, rdp);
+	needwake = __note_gp_changes(rsp, rnp, rdp);
 	raw_spin_unlock_irqrestore(&rnp->lock, flags);
+	if (needwake)
+		rcu_gp_kthread_wake(rsp);
 }
 
 /*
@@ -1468,7 +1502,7 @@
 		WARN_ON_ONCE(rnp->completed != rsp->completed);
 		ACCESS_ONCE(rnp->completed) = rsp->completed;
 		if (rnp == rdp->mynode)
-			__note_gp_changes(rsp, rnp, rdp);
+			(void)__note_gp_changes(rsp, rnp, rdp);
 		rcu_preempt_boost_start_gp(rnp);
 		trace_rcu_grace_period_init(rsp->name, rnp->gpnum,
 					    rnp->level, rnp->grplo,
@@ -1528,6 +1562,7 @@
 static void rcu_gp_cleanup(struct rcu_state *rsp)
 {
 	unsigned long gp_duration;
+	bool needgp = false;
 	int nocb = 0;
 	struct rcu_data *rdp;
 	struct rcu_node *rnp = rcu_get_root(rsp);
@@ -1563,7 +1598,7 @@
 		ACCESS_ONCE(rnp->completed) = rsp->gpnum;
 		rdp = this_cpu_ptr(rsp->rda);
 		if (rnp == rdp->mynode)
-			__note_gp_changes(rsp, rnp, rdp);
+			needgp = __note_gp_changes(rsp, rnp, rdp) || needgp;
 		/* smp_mb() provided by prior unlock-lock pair. */
 		nocb += rcu_future_gp_cleanup(rsp, rnp);
 		raw_spin_unlock_irq(&rnp->lock);
@@ -1579,8 +1614,9 @@
 	trace_rcu_grace_period(rsp->name, rsp->completed, TPS("end"));
 	rsp->fqs_state = RCU_GP_IDLE;
 	rdp = this_cpu_ptr(rsp->rda);
-	rcu_advance_cbs(rsp, rnp, rdp);  /* Reduce false positives below. */
-	if (cpu_needs_another_gp(rsp, rdp)) {
+	/* Advance CBs to reduce false positives below. */
+	needgp = rcu_advance_cbs(rsp, rnp, rdp) || needgp;
+	if (needgp || cpu_needs_another_gp(rsp, rdp)) {
 		ACCESS_ONCE(rsp->gp_flags) = RCU_GP_FLAG_INIT;
 		trace_rcu_grace_period(rsp->name,
 				       ACCESS_ONCE(rsp->gpnum),
@@ -1680,16 +1716,6 @@
 	}
 }
 
-static void rsp_wakeup(struct irq_work *work)
-{
-	struct rcu_state *rsp = container_of(work, struct rcu_state, wakeup_work);
-
-	/* Wake up rcu_gp_kthread() to start the grace period. */
-	wake_up(&rsp->gp_wq);
-	trace_rcu_grace_period(rsp->name, ACCESS_ONCE(rsp->gpnum),
-			       "Workqueuewoken");
-}
-
 /*
  * Start a new RCU grace period if warranted, re-initializing the hierarchy
  * in preparation for detecting the next grace period.  The caller must hold
@@ -1698,8 +1724,10 @@
  * Note that it is legal for a dying CPU (which is marked as offline) to
  * invoke this function.  This can happen when the dying CPU reports its
  * quiescent state.
+ *
+ * Returns true if the grace-period kthread must be awakened.
  */
-static void
+static bool
 rcu_start_gp_advanced(struct rcu_state *rsp, struct rcu_node *rnp,
 		      struct rcu_data *rdp)
 {
@@ -1710,7 +1738,7 @@
 		 * or a grace period is already in progress.
 		 * Either way, don't start a new grace period.
 		 */
-		return;
+		return false;
 	}
 	ACCESS_ONCE(rsp->gp_flags) = RCU_GP_FLAG_INIT;
 	trace_rcu_grace_period(rsp->name, ACCESS_ONCE(rsp->gpnum),
@@ -1719,14 +1747,9 @@
 	/*
 	 * We can't do wakeups while holding the rnp->lock, as that
 	 * could cause possible deadlocks with the rq->lock. Defer
-	 * the wakeup to interrupt context.  And don't bother waking
-	 * up the running kthread.
+	 * the wakeup to our caller.
 	 */
-	if (current != rsp->gp_kthread) {
-		trace_rcu_grace_period(rsp->name, ACCESS_ONCE(rsp->gpnum),
-				       "Workqueuewake");
-		irq_work_queue(&rsp->wakeup_work);
-	}
+	return true;
 }
 
 /*
@@ -1735,12 +1758,14 @@
  * is invoked indirectly from rcu_advance_cbs(), which would result in
  * endless recursion -- or would do so if it wasn't for the self-deadlock
  * that is encountered beforehand.
+ *
+ * Returns true if the grace-period kthread needs to be awakened.
  */
-static void
-rcu_start_gp(struct rcu_state *rsp)
+static bool rcu_start_gp(struct rcu_state *rsp)
 {
 	struct rcu_data *rdp = this_cpu_ptr(rsp->rda);
 	struct rcu_node *rnp = rcu_get_root(rsp);
+	bool ret = false;
 
 	/*
 	 * If there is no grace period in progress right now, any
@@ -1750,8 +1775,9 @@
 	 * resulting in pointless grace periods.  So, advance callbacks
 	 * then start the grace period!
 	 */
-	rcu_advance_cbs(rsp, rnp, rdp);
-	rcu_start_gp_advanced(rsp, rnp, rdp);
+	ret = rcu_advance_cbs(rsp, rnp, rdp) || ret;
+	ret = rcu_start_gp_advanced(rsp, rnp, rdp) || ret;
+	return ret;
 }
 
 /*
@@ -1840,6 +1866,7 @@
 {
 	unsigned long flags;
 	unsigned long mask;
+	bool needwake;
 	struct rcu_node *rnp;
 
 	rnp = rdp->mynode;
@@ -1868,9 +1895,11 @@
 		 * This GP can't end until cpu checks in, so all of our
 		 * callbacks can be processed during the next GP.
 		 */
-		rcu_accelerate_cbs(rsp, rnp, rdp);
+		needwake = rcu_accelerate_cbs(rsp, rnp, rdp);
 
 		rcu_report_qs_rnp(mask, rsp, rnp, flags); /* rlses rnp->lock */
+		if (needwake)
+			rcu_gp_kthread_wake(rsp);
 	}
 }
 
@@ -2354,6 +2383,7 @@
 __rcu_process_callbacks(struct rcu_state *rsp)
 {
 	unsigned long flags;
+	bool needwake;
 	struct rcu_data *rdp = __this_cpu_ptr(rsp->rda);
 
 	WARN_ON_ONCE(rdp->beenonline == 0);
@@ -2365,8 +2395,10 @@
 	local_irq_save(flags);
 	if (cpu_needs_another_gp(rsp, rdp)) {
 		raw_spin_lock(&rcu_get_root(rsp)->lock); /* irqs disabled. */
-		rcu_start_gp(rsp);
+		needwake = rcu_start_gp(rsp);
 		raw_spin_unlock_irqrestore(&rcu_get_root(rsp)->lock, flags);
+		if (needwake)
+			rcu_gp_kthread_wake(rsp);
 	} else {
 		local_irq_restore(flags);
 	}
@@ -2424,6 +2456,8 @@
 static void __call_rcu_core(struct rcu_state *rsp, struct rcu_data *rdp,
 			    struct rcu_head *head, unsigned long flags)
 {
+	bool needwake;
+
 	/*
 	 * If called from an extended quiescent state, invoke the RCU
 	 * core in order to force a re-evaluation of RCU's idleness.
@@ -2453,8 +2487,10 @@
 
 			raw_spin_lock(&rnp_root->lock);
 			smp_mb__after_unlock_lock();
-			rcu_start_gp(rsp);
+			needwake = rcu_start_gp(rsp);
 			raw_spin_unlock(&rnp_root->lock);
+			if (needwake)
+				rcu_gp_kthread_wake(rsp);
 		} else {
 			/* Give the grace period a kick. */
 			rdp->blimit = LONG_MAX;
@@ -3440,7 +3476,6 @@
 
 	rsp->rda = rda;
 	init_waitqueue_head(&rsp->gp_wq);
-	init_irq_work(&rsp->wakeup_work, rsp_wakeup);
 	rnp = rsp->level[rcu_num_lvls - 1];
 	for_each_possible_cpu(i) {
 		while (i > rnp->grphi)