rcu-tasks: Wait for trc_read_check_handler() IPIs

Currently, RCU Tasks Trace initializes the trc_n_readers_need_end counter
to the value one, increments it before each trc_read_check_handler()
IPI, then decrements it within trc_read_check_handler() if the target
task was in a quiescent state (or if the target task moved to some other
CPU while the IPI was in flight), complaining if the new value was zero.
The rationale for complaining is that the initial value of one must be
decremented away before zero can be reached, and this decrement has not
yet happened.

However, trc_read_check_handler() is invoked by an asynchronous
smp_call_function_single(), so the handler might be significantly delayed.
If a handler runs only after the initial value of one has been decremented
away, its own decrement can bring the counter to zero, resulting in a
false-positive complaint.

This commit therefore waits for in-flight IPI handlers to complete before
decrementing away the initial value of one from the trc_n_readers_need_end
counter.
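
The ordering problem can be illustrated with a small single-threaded
user-space model.  This is only a sketch and not kernel code: struct
trc_model and the model_*() functions are hypothetical names, a plain
int stands in for the atomic trc_n_readers_need_end counter, and the
"drain" loop stands in for waiting on in-flight IPI handlers.

```c
#include <stdbool.h>

/* Hypothetical user-space model; names are illustrative, not kernel code. */
struct trc_model {
	int n_readers_need_end;  /* stands in for trc_n_readers_need_end */
	int ipis_in_flight;      /* handlers sent but not yet run */
	bool complained;         /* a handler saw the counter reach zero */
};

static void model_init(struct trc_model *m)
{
	m->n_readers_need_end = 1;  /* the initial "safety" count */
	m->ipis_in_flight = 0;
	m->complained = false;
}

/* Sender side: count the reader before the asynchronous IPI is sent. */
static void model_send_ipi(struct trc_model *m)
{
	m->n_readers_need_end++;
	m->ipis_in_flight++;
}

/* Handler side, target found quiescent: decrement, complain on zero. */
static void model_run_handler(struct trc_model *m)
{
	m->ipis_in_flight--;
	if (--m->n_readers_need_end == 0)
		m->complained = true;
}

/* Post-grace-period step: optionally drain handlers, then drop the safety count. */
static void model_postgp(struct trc_model *m, bool drain_first)
{
	if (drain_first)
		while (m->ipis_in_flight > 0)
			model_run_handler(m);
	m->n_readers_need_end--;
}

/* Returns true if a handler complained for one IPI under the given ordering. */
static bool model_scenario(bool drain_first)
{
	struct trc_model m;

	model_init(&m);
	model_send_ipi(&m);
	model_postgp(&m, drain_first);
	while (m.ipis_in_flight > 0)  /* a delayed handler finally runs */
		model_run_handler(&m);
	return m.complained;
}
```

In this model, model_scenario(false) lets the delayed handler run after
the safety count is gone, so the handler's decrement reaches zero and it
complains.  With model_scenario(true), the handler runs first, so only
the final decrement of the safety count reaches zero.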

Signed-off-by: Paul E. McKenney <paulmck@kernel.org>
diff --git a/kernel/rcu/tasks.h b/kernel/rcu/tasks.h
index 806160c..3b2f803 100644
--- a/kernel/rcu/tasks.h
+++ b/kernel/rcu/tasks.h
@@ -1150,14 +1150,28 @@ static void check_all_holdout_tasks_trace(struct list_head *hop,
 	}
 }
 
+static void rcu_tasks_trace_empty_fn(void *unused)
+{
+}
+
 /* Wait for grace period to complete and provide ordering. */
 static void rcu_tasks_trace_postgp(struct rcu_tasks *rtp)
 {
+	int cpu;
 	bool firstreport;
 	struct task_struct *g, *t;
 	LIST_HEAD(holdouts);
 	long ret;
 
+	// Wait for any lingering IPI handlers to complete.  Note that
+	// if a CPU has gone offline or transitioned to userspace in the
+	// meantime, all IPI handlers should have been drained beforehand.
+	// Yes, this assumes that CPUs process IPIs in order.  If that ever
+	// changes, there will need to be a recheck and/or timed wait.
+	for_each_online_cpu(cpu)
+		if (smp_load_acquire(per_cpu_ptr(&trc_ipi_to_cpu, cpu)))
+			smp_call_function_single(cpu, rcu_tasks_trace_empty_fn, NULL, 1);
+
 	// Remove the safety count.
 	smp_mb__before_atomic();  // Order vs. earlier atomics
 	atomic_dec(&trc_n_readers_need_end);