rcu: Account for nocb-CPU callback counts in RCU CPU stall warnings

The RCU CPU stall warnings print an estimate of the total number of
RCU callbacks queued in the system, but this estimate leaves out
the callbacks queued for nocbs CPUs.  This commit therefore introduces
rcu_get_n_cbs_cpu(), which gives an accurate callback estimate for
both nocbs and normal CPUs, and uses this new function as needed.

This commit also introduces a rcu_get_n_cbs_nocb_cpu() helper function
that returns the number of callbacks for nocbs CPUs or zero otherwise,
and also uses this function in place of direct access to ->nocb_q_count
while in the area (fewer characters, you see).

Signed-off-by: Paul E. McKenney <paulmck@linux.ibm.com>
diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
index 853b79a..0933d86 100644
--- a/kernel/rcu/tree.c
+++ b/kernel/rcu/tree.c
@@ -207,6 +207,19 @@ static int rcu_gp_in_progress(void)
 	return rcu_seq_state(rcu_seq_current(&rcu_state.gp_seq));
 }
 
+/*
+ * Return the number of callbacks queued on the specified CPU.
+ * Handles both the nocbs and normal cases.
+ */
+static long rcu_get_n_cbs_cpu(int cpu)
+{
+	struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu);
+
+	if (rcu_segcblist_is_enabled(&rdp->cblist)) /* Online normal CPU? */
+		return rcu_segcblist_n_cbs(&rdp->cblist);
+	return rcu_get_n_cbs_nocb_cpu(rdp); /* Works for offline, too. */
+}
+
 void rcu_softirq_qs(void)
 {
 	rcu_qs();
@@ -1265,8 +1278,7 @@ static void print_other_cpu_stall(unsigned long gp_seq)
 
 	print_cpu_stall_info_end();
 	for_each_possible_cpu(cpu)
-		totqlen += rcu_segcblist_n_cbs(&per_cpu_ptr(&rcu_data,
-							    cpu)->cblist);
+		totqlen += rcu_get_n_cbs_cpu(cpu);
 	pr_cont("(detected by %d, t=%ld jiffies, g=%ld, q=%lu)\n",
 	       smp_processor_id(), (long)(jiffies - rcu_state.gp_start),
 	       (long)rcu_seq_current(&rcu_state.gp_seq), totqlen);
@@ -1326,8 +1338,7 @@ static void print_cpu_stall(void)
 	raw_spin_unlock_irqrestore_rcu_node(rdp->mynode, flags);
 	print_cpu_stall_info_end();
 	for_each_possible_cpu(cpu)
-		totqlen += rcu_segcblist_n_cbs(&per_cpu_ptr(&rcu_data,
-							    cpu)->cblist);
+		totqlen += rcu_get_n_cbs_cpu(cpu);
 	pr_cont(" (t=%lu jiffies g=%ld q=%lu)\n",
 		jiffies - rcu_state.gp_start,
 		(long)rcu_seq_current(&rcu_state.gp_seq), totqlen);
diff --git a/kernel/rcu/tree.h b/kernel/rcu/tree.h
index c3e2807..a8f82b7 100644
--- a/kernel/rcu/tree.h
+++ b/kernel/rcu/tree.h
@@ -455,6 +455,7 @@ static void __init rcu_spawn_nocb_kthreads(void);
 static void __init rcu_organize_nocb_kthreads(void);
 #endif /* #ifdef CONFIG_RCU_NOCB_CPU */
 static bool init_nocb_callback_list(struct rcu_data *rdp);
+static unsigned long rcu_get_n_cbs_nocb_cpu(struct rcu_data *rdp);
 static void rcu_bind_gp_kthread(void);
 static bool rcu_nohz_full_cpu(void);
 static void rcu_dynticks_task_enter(void);
diff --git a/kernel/rcu/tree_plugin.h b/kernel/rcu/tree_plugin.h
index 25fe26c..1b3dd2f 100644
--- a/kernel/rcu/tree_plugin.h
+++ b/kernel/rcu/tree_plugin.h
@@ -2011,7 +2011,7 @@ static bool rcu_nocb_cpu_needs_barrier(int cpu)
 	 * (if a callback is in fact needed).  This is associated with an
 	 * atomic_inc() in the caller.
 	 */
-	ret = atomic_long_read(&rdp->nocb_q_count);
+	ret = rcu_get_n_cbs_nocb_cpu(rdp);
 
 #ifdef CONFIG_PROVE_RCU
 	rhp = READ_ONCE(rdp->nocb_head);
@@ -2066,7 +2066,7 @@ static void __call_rcu_nocb_enqueue(struct rcu_data *rdp,
 				    TPS("WakeNotPoll"));
 		return;
 	}
-	len = atomic_long_read(&rdp->nocb_q_count);
+	len = rcu_get_n_cbs_nocb_cpu(rdp);
 	if (old_rhpp == &rdp->nocb_head) {
 		if (!irqs_disabled_flags(flags)) {
 			/* ... if queue was empty ... */
@@ -2115,11 +2115,11 @@ static bool __call_rcu_nocb(struct rcu_data *rdp, struct rcu_head *rhp,
 		trace_rcu_kfree_callback(rcu_state.name, rhp,
 					 (unsigned long)rhp->func,
 					 -atomic_long_read(&rdp->nocb_q_count_lazy),
-					 -atomic_long_read(&rdp->nocb_q_count));
+					 -rcu_get_n_cbs_nocb_cpu(rdp));
 	else
 		trace_rcu_callback(rcu_state.name, rhp,
 				   -atomic_long_read(&rdp->nocb_q_count_lazy),
-				   -atomic_long_read(&rdp->nocb_q_count));
+				   -rcu_get_n_cbs_nocb_cpu(rdp));
 
 	/*
 	 * If called from an extended quiescent state with interrupts
@@ -2343,7 +2343,7 @@ static int rcu_nocb_kthread(void *arg)
 		/* Each pass through the following loop invokes a callback. */
 		trace_rcu_batch_start(rcu_state.name,
 				      atomic_long_read(&rdp->nocb_q_count_lazy),
-				      atomic_long_read(&rdp->nocb_q_count), -1);
+				      rcu_get_n_cbs_nocb_cpu(rdp), -1);
 		c = cl = 0;
 		while (list) {
 			next = list->next;
@@ -2614,6 +2614,15 @@ void rcu_bind_current_to_nocb(void)
 }
 EXPORT_SYMBOL_GPL(rcu_bind_current_to_nocb);
 
+/*
+ * Return the number of RCU callbacks still queued from the specified
+ * CPU, which must be a nocbs CPU.
+ */
+static unsigned long rcu_get_n_cbs_nocb_cpu(struct rcu_data *rdp)
+{
+	return atomic_long_read(&rdp->nocb_q_count);
+}
+
 #else /* #ifdef CONFIG_RCU_NOCB_CPU */
 
 static bool rcu_nocb_cpu_needs_barrier(int cpu)
@@ -2674,6 +2683,11 @@ static bool init_nocb_callback_list(struct rcu_data *rdp)
 	return false;
 }
 
+static unsigned long rcu_get_n_cbs_nocb_cpu(struct rcu_data *rdp)
+{
+	return 0;
+}
+
 #endif /* #else #ifdef CONFIG_RCU_NOCB_CPU */
 
 /*