srcu: Abstract multi-tail callback list handling

RCU has only one multi-tail callback list, which is implemented via
the nxtlist, nxttail, nxtcompleted, qlen_lazy, and qlen fields in the
rcu_data structure, and whose operations are open-code throughout the
Tree RCU implementation.  This has been more or less OK in the past,
but upcoming callback-list optimizations in SRCU could really use
a multi-tail callback list there as well.

This commit therefore abstracts the multi-tail callback list handling
into a new kernel/rcu/rcu_segcblist.h file, and uses this new API.
The simple head-and-tail pointer callback list is also abstracted and
applied everywhere except for the NOCB callback-offload lists.  (Yes,
the plan is to apply them there as well, but this commit is already
bigger than would be good.)

Signed-off-by: Paul E. McKenney <paulmck@linux.vnet.ibm.com>
diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
index 530ab6c..8cc9d40 100644
--- a/kernel/rcu/tree.c
+++ b/kernel/rcu/tree.c
@@ -97,8 +97,8 @@ struct rcu_state sname##_state = { \
 	.gpnum = 0UL - 300UL, \
 	.completed = 0UL - 300UL, \
 	.orphan_lock = __RAW_SPIN_LOCK_UNLOCKED(&sname##_state.orphan_lock), \
-	.orphan_nxttail = &sname##_state.orphan_nxtlist, \
-	.orphan_donetail = &sname##_state.orphan_donelist, \
+	.orphan_pend = RCU_CBLIST_INITIALIZER(sname##_state.orphan_pend), \
+	.orphan_done = RCU_CBLIST_INITIALIZER(sname##_state.orphan_done), \
 	.barrier_mutex = __MUTEX_INITIALIZER(sname##_state.barrier_mutex), \
 	.name = RCU_STATE_NAME(sname), \
 	.abbr = sabbr, \
@@ -726,16 +726,6 @@ void rcutorture_record_progress(unsigned long vernum)
 EXPORT_SYMBOL_GPL(rcutorture_record_progress);
 
 /*
- * Does the CPU have callbacks ready to be invoked?
- */
-static int
-cpu_has_callbacks_ready_to_invoke(struct rcu_data *rdp)
-{
-	return &rdp->nxtlist != rdp->nxttail[RCU_DONE_TAIL] &&
-	       rdp->nxttail[RCU_NEXT_TAIL] != NULL;
-}
-
-/*
  * Return the root node of the specified rcu_state structure.
  */
 static struct rcu_node *rcu_get_root(struct rcu_state *rsp)
@@ -765,21 +755,17 @@ static int rcu_future_needs_gp(struct rcu_state *rsp)
 static bool
 cpu_needs_another_gp(struct rcu_state *rsp, struct rcu_data *rdp)
 {
-	int i;
-
 	if (rcu_gp_in_progress(rsp))
 		return false;  /* No, a grace period is already in progress. */
 	if (rcu_future_needs_gp(rsp))
 		return true;  /* Yes, a no-CBs CPU needs one. */
-	if (!rdp->nxttail[RCU_NEXT_TAIL])
+	if (!rcu_segcblist_is_enabled(&rdp->cblist))
 		return false;  /* No, this is a no-CBs (or offline) CPU. */
-	if (*rdp->nxttail[RCU_NEXT_READY_TAIL])
+	if (!rcu_segcblist_restempty(&rdp->cblist, RCU_NEXT_READY_TAIL))
 		return true;  /* Yes, CPU has newly registered callbacks. */
-	for (i = RCU_WAIT_TAIL; i < RCU_NEXT_TAIL; i++)
-		if (rdp->nxttail[i - 1] != rdp->nxttail[i] &&
-		    ULONG_CMP_LT(READ_ONCE(rsp->completed),
-				 rdp->nxtcompleted[i]))
-			return true;  /* Yes, CBs for future grace period. */
+	if (rcu_segcblist_future_gp_needed(&rdp->cblist,
+					   READ_ONCE(rsp->completed)))
+		return true;  /* Yes, CBs for future grace period. */
 	return false; /* No grace period needed. */
 }
 
@@ -1490,7 +1476,8 @@ static void print_other_cpu_stall(struct rcu_state *rsp, unsigned long gpnum)
 
 	print_cpu_stall_info_end();
 	for_each_possible_cpu(cpu)
-		totqlen += per_cpu_ptr(rsp->rda, cpu)->qlen;
+		totqlen += rcu_segcblist_n_cbs(&per_cpu_ptr(rsp->rda,
+							    cpu)->cblist);
 	pr_cont("(detected by %d, t=%ld jiffies, g=%ld, c=%ld, q=%lu)\n",
 	       smp_processor_id(), (long)(jiffies - rsp->gp_start),
 	       (long)rsp->gpnum, (long)rsp->completed, totqlen);
@@ -1544,7 +1531,8 @@ static void print_cpu_stall(struct rcu_state *rsp)
 	print_cpu_stall_info(rsp, smp_processor_id());
 	print_cpu_stall_info_end();
 	for_each_possible_cpu(cpu)
-		totqlen += per_cpu_ptr(rsp->rda, cpu)->qlen;
+		totqlen += rcu_segcblist_n_cbs(&per_cpu_ptr(rsp->rda,
+							    cpu)->cblist);
 	pr_cont(" (t=%lu jiffies g=%ld c=%ld q=%lu)\n",
 		jiffies - rsp->gp_start,
 		(long)rsp->gpnum, (long)rsp->completed, totqlen);
@@ -1647,30 +1635,6 @@ void rcu_cpu_stall_reset(void)
 }
 
 /*
- * Initialize the specified rcu_data structure's default callback list
- * to empty.  The default callback list is the one that is not used by
- * no-callbacks CPUs.
- */
-static void init_default_callback_list(struct rcu_data *rdp)
-{
-	int i;
-
-	rdp->nxtlist = NULL;
-	for (i = 0; i < RCU_NEXT_SIZE; i++)
-		rdp->nxttail[i] = &rdp->nxtlist;
-}
-
-/*
- * Initialize the specified rcu_data structure's callback list to empty.
- */
-static void init_callback_list(struct rcu_data *rdp)
-{
-	if (init_nocb_callback_list(rdp))
-		return;
-	init_default_callback_list(rdp);
-}
-
-/*
  * Determine the value that ->completed will have at the end of the
  * next subsequent grace period.  This is used to tag callbacks so that
  * a CPU can invoke callbacks in a timely fashion even if that CPU has
@@ -1724,7 +1688,6 @@ rcu_start_future_gp(struct rcu_node *rnp, struct rcu_data *rdp,
 		    unsigned long *c_out)
 {
 	unsigned long c;
-	int i;
 	bool ret = false;
 	struct rcu_node *rnp_root = rcu_get_root(rdp->rsp);
 
@@ -1770,13 +1733,11 @@ rcu_start_future_gp(struct rcu_node *rnp, struct rcu_data *rdp,
 	/*
 	 * Get a new grace-period number.  If there really is no grace
 	 * period in progress, it will be smaller than the one we obtained
-	 * earlier.  Adjust callbacks as needed.  Note that even no-CBs
-	 * CPUs have a ->nxtcompleted[] array, so no no-CBs checks needed.
+	 * earlier.  Adjust callbacks as needed.
 	 */
 	c = rcu_cbs_completed(rdp->rsp, rnp_root);
-	for (i = RCU_DONE_TAIL; i < RCU_NEXT_TAIL; i++)
-		if (ULONG_CMP_LT(c, rdp->nxtcompleted[i]))
-			rdp->nxtcompleted[i] = c;
+	if (!rcu_is_nocb_cpu(rdp->cpu))
+		(void)rcu_segcblist_accelerate(&rdp->cblist, c);
 
 	/*
 	 * If the needed for the required grace period is already
@@ -1856,57 +1817,27 @@ static void rcu_gp_kthread_wake(struct rcu_state *rsp)
 static bool rcu_accelerate_cbs(struct rcu_state *rsp, struct rcu_node *rnp,
 			       struct rcu_data *rdp)
 {
-	unsigned long c;
-	int i;
-	bool ret;
+	bool ret = false;
 
-	/* If the CPU has no callbacks, nothing to do. */
-	if (!rdp->nxttail[RCU_NEXT_TAIL] || !*rdp->nxttail[RCU_DONE_TAIL])
+	/* If no pending (not yet ready to invoke) callbacks, nothing to do. */
+	if (!rcu_segcblist_pend_cbs(&rdp->cblist))
 		return false;
 
 	/*
-	 * Starting from the sublist containing the callbacks most
-	 * recently assigned a ->completed number and working down, find the
-	 * first sublist that is not assignable to an upcoming grace period.
-	 * Such a sublist has something in it (first two tests) and has
-	 * a ->completed number assigned that will complete sooner than
-	 * the ->completed number for newly arrived callbacks (last test).
-	 *
-	 * The key point is that any later sublist can be assigned the
-	 * same ->completed number as the newly arrived callbacks, which
-	 * means that the callbacks in any of these later sublist can be
-	 * grouped into a single sublist, whether or not they have already
-	 * been assigned a ->completed number.
+	 * Callbacks are often registered with incomplete grace-period
+	 * information.  Something about the fact that getting exact
+	 * information requires acquiring a global lock...  RCU therefore
+	 * makes a conservative estimate of the grace period number at which
+	 * a given callback will become ready to invoke.	The following
+	 * code checks this estimate and improves it when possible, thus
+	 * accelerating callback invocation to an earlier grace-period
+	 * number.
 	 */
-	c = rcu_cbs_completed(rsp, rnp);
-	for (i = RCU_NEXT_TAIL - 1; i > RCU_DONE_TAIL; i--)
-		if (rdp->nxttail[i] != rdp->nxttail[i - 1] &&
-		    !ULONG_CMP_GE(rdp->nxtcompleted[i], c))
-			break;
-
-	/*
-	 * If there are no sublist for unassigned callbacks, leave.
-	 * At the same time, advance "i" one sublist, so that "i" will
-	 * index into the sublist where all the remaining callbacks should
-	 * be grouped into.
-	 */
-	if (++i >= RCU_NEXT_TAIL)
-		return false;
-
-	/*
-	 * Assign all subsequent callbacks' ->completed number to the next
-	 * full grace period and group them all in the sublist initially
-	 * indexed by "i".
-	 */
-	for (; i <= RCU_NEXT_TAIL; i++) {
-		rdp->nxttail[i] = rdp->nxttail[RCU_NEXT_TAIL];
-		rdp->nxtcompleted[i] = c;
-	}
-	/* Record any needed additional grace periods. */
-	ret = rcu_start_future_gp(rnp, rdp, NULL);
+	if (rcu_segcblist_accelerate(&rdp->cblist, rcu_cbs_completed(rsp, rnp)))
+		ret = rcu_start_future_gp(rnp, rdp, NULL);
 
 	/* Trace depending on how much we were able to accelerate. */
-	if (!*rdp->nxttail[RCU_WAIT_TAIL])
+	if (rcu_segcblist_restempty(&rdp->cblist, RCU_WAIT_TAIL))
 		trace_rcu_grace_period(rsp->name, rdp->gpnum, TPS("AccWaitCB"));
 	else
 		trace_rcu_grace_period(rsp->name, rdp->gpnum, TPS("AccReadyCB"));
@@ -1926,32 +1857,15 @@ static bool rcu_accelerate_cbs(struct rcu_state *rsp, struct rcu_node *rnp,
 static bool rcu_advance_cbs(struct rcu_state *rsp, struct rcu_node *rnp,
 			    struct rcu_data *rdp)
 {
-	int i, j;
-
-	/* If the CPU has no callbacks, nothing to do. */
-	if (!rdp->nxttail[RCU_NEXT_TAIL] || !*rdp->nxttail[RCU_DONE_TAIL])
+	/* If no pending (not yet ready to invoke) callbacks, nothing to do. */
+	if (!rcu_segcblist_pend_cbs(&rdp->cblist))
 		return false;
 
 	/*
 	 * Find all callbacks whose ->completed numbers indicate that they
 	 * are ready to invoke, and put them into the RCU_DONE_TAIL sublist.
 	 */
-	for (i = RCU_WAIT_TAIL; i < RCU_NEXT_TAIL; i++) {
-		if (ULONG_CMP_LT(rnp->completed, rdp->nxtcompleted[i]))
-			break;
-		rdp->nxttail[RCU_DONE_TAIL] = rdp->nxttail[i];
-	}
-	/* Clean up any sublist tail pointers that were misordered above. */
-	for (j = RCU_WAIT_TAIL; j < i; j++)
-		rdp->nxttail[j] = rdp->nxttail[RCU_DONE_TAIL];
-
-	/* Copy down callbacks to fill in empty sublists. */
-	for (j = RCU_WAIT_TAIL; i < RCU_NEXT_TAIL; i++, j++) {
-		if (rdp->nxttail[j] == rdp->nxttail[RCU_NEXT_TAIL])
-			break;
-		rdp->nxttail[j] = rdp->nxttail[i];
-		rdp->nxtcompleted[j] = rdp->nxtcompleted[i];
-	}
+	rcu_segcblist_advance(&rdp->cblist, rnp->completed);
 
 	/* Classify any remaining callbacks. */
 	return rcu_accelerate_cbs(rsp, rnp, rdp);
@@ -2668,13 +2582,8 @@ rcu_send_cbs_to_orphanage(int cpu, struct rcu_state *rsp,
 	 * because _rcu_barrier() excludes CPU-hotplug operations, so it
 	 * cannot be running now.  Thus no memory barrier is required.
 	 */
-	if (rdp->nxtlist != NULL) {
-		rsp->qlen_lazy += rdp->qlen_lazy;
-		rsp->qlen += rdp->qlen;
-		rdp->n_cbs_orphaned += rdp->qlen;
-		rdp->qlen_lazy = 0;
-		WRITE_ONCE(rdp->qlen, 0);
-	}
+	rdp->n_cbs_orphaned += rcu_segcblist_n_cbs(&rdp->cblist);
+	rcu_segcblist_extract_count(&rdp->cblist, &rsp->orphan_done);
 
 	/*
 	 * Next, move those callbacks still needing a grace period to
@@ -2682,31 +2591,18 @@ rcu_send_cbs_to_orphanage(int cpu, struct rcu_state *rsp,
 	 * Some of the callbacks might have gone partway through a grace
 	 * period, but that is too bad.  They get to start over because we
 	 * cannot assume that grace periods are synchronized across CPUs.
-	 * We don't bother updating the ->nxttail[] array yet, instead
-	 * we just reset the whole thing later on.
 	 */
-	if (*rdp->nxttail[RCU_DONE_TAIL] != NULL) {
-		*rsp->orphan_nxttail = *rdp->nxttail[RCU_DONE_TAIL];
-		rsp->orphan_nxttail = rdp->nxttail[RCU_NEXT_TAIL];
-		*rdp->nxttail[RCU_DONE_TAIL] = NULL;
-	}
+	rcu_segcblist_extract_pend_cbs(&rdp->cblist, &rsp->orphan_pend);
 
 	/*
 	 * Then move the ready-to-invoke callbacks to the orphanage,
 	 * where some other CPU will pick them up.  These will not be
 	 * required to pass though another grace period: They are done.
 	 */
-	if (rdp->nxtlist != NULL) {
-		*rsp->orphan_donetail = rdp->nxtlist;
-		rsp->orphan_donetail = rdp->nxttail[RCU_DONE_TAIL];
-	}
+	rcu_segcblist_extract_done_cbs(&rdp->cblist, &rsp->orphan_done);
 
-	/*
-	 * Finally, initialize the rcu_data structure's list to empty and
-	 * disallow further callbacks on this CPU.
-	 */
-	init_callback_list(rdp);
-	rdp->nxttail[RCU_NEXT_TAIL] = NULL;
+	/* Finally, disallow further callbacks on this CPU.  */
+	rcu_segcblist_disable(&rdp->cblist);
 }
 
 /*
@@ -2715,7 +2611,6 @@ rcu_send_cbs_to_orphanage(int cpu, struct rcu_state *rsp,
  */
 static void rcu_adopt_orphan_cbs(struct rcu_state *rsp, unsigned long flags)
 {
-	int i;
 	struct rcu_data *rdp = raw_cpu_ptr(rsp->rda);
 
 	/* No-CBs CPUs are handled specially. */
@@ -2724,13 +2619,11 @@ static void rcu_adopt_orphan_cbs(struct rcu_state *rsp, unsigned long flags)
 		return;
 
 	/* Do the accounting first. */
-	rdp->qlen_lazy += rsp->qlen_lazy;
-	rdp->qlen += rsp->qlen;
-	rdp->n_cbs_adopted += rsp->qlen;
-	if (rsp->qlen_lazy != rsp->qlen)
+	rdp->n_cbs_adopted += rcu_cblist_n_cbs(&rsp->orphan_done);
+	if (rcu_cblist_n_lazy_cbs(&rsp->orphan_done) !=
+	    rcu_cblist_n_cbs(&rsp->orphan_done))
 		rcu_idle_count_callbacks_posted();
-	rsp->qlen_lazy = 0;
-	rsp->qlen = 0;
+	rcu_segcblist_insert_count(&rdp->cblist, &rsp->orphan_done);
 
 	/*
 	 * We do not need a memory barrier here because the only way we
@@ -2738,24 +2631,13 @@ static void rcu_adopt_orphan_cbs(struct rcu_state *rsp, unsigned long flags)
 	 * we are the task doing the rcu_barrier().
 	 */
 
-	/* First adopt the ready-to-invoke callbacks. */
-	if (rsp->orphan_donelist != NULL) {
-		*rsp->orphan_donetail = *rdp->nxttail[RCU_DONE_TAIL];
-		*rdp->nxttail[RCU_DONE_TAIL] = rsp->orphan_donelist;
-		for (i = RCU_NEXT_SIZE - 1; i >= RCU_DONE_TAIL; i--)
-			if (rdp->nxttail[i] == rdp->nxttail[RCU_DONE_TAIL])
-				rdp->nxttail[i] = rsp->orphan_donetail;
-		rsp->orphan_donelist = NULL;
-		rsp->orphan_donetail = &rsp->orphan_donelist;
-	}
-
-	/* And then adopt the callbacks that still need a grace period. */
-	if (rsp->orphan_nxtlist != NULL) {
-		*rdp->nxttail[RCU_NEXT_TAIL] = rsp->orphan_nxtlist;
-		rdp->nxttail[RCU_NEXT_TAIL] = rsp->orphan_nxttail;
-		rsp->orphan_nxtlist = NULL;
-		rsp->orphan_nxttail = &rsp->orphan_nxtlist;
-	}
+	/* First adopt the ready-to-invoke callbacks, then the done ones. */
+	rcu_segcblist_insert_done_cbs(&rdp->cblist, &rsp->orphan_done);
+	WARN_ON_ONCE(!rcu_cblist_empty(&rsp->orphan_done));
+	rcu_segcblist_insert_pend_cbs(&rdp->cblist, &rsp->orphan_pend);
+	WARN_ON_ONCE(!rcu_cblist_empty(&rsp->orphan_pend));
+	WARN_ON_ONCE(rcu_segcblist_empty(&rdp->cblist) !=
+		     !rcu_segcblist_n_cbs(&rdp->cblist));
 }
 
 /*
@@ -2843,9 +2725,11 @@ static void rcu_cleanup_dead_cpu(int cpu, struct rcu_state *rsp)
 	rcu_adopt_orphan_cbs(rsp, flags);
 	raw_spin_unlock_irqrestore(&rsp->orphan_lock, flags);
 
-	WARN_ONCE(rdp->qlen != 0 || rdp->nxtlist != NULL,
-		  "rcu_cleanup_dead_cpu: Callbacks on offline CPU %d: qlen=%lu, nxtlist=%p\n",
-		  cpu, rdp->qlen, rdp->nxtlist);
+	WARN_ONCE(rcu_segcblist_n_cbs(&rdp->cblist) != 0 ||
+		  !rcu_segcblist_empty(&rdp->cblist),
+		  "rcu_cleanup_dead_cpu: Callbacks on offline CPU %d: qlen=%lu, 1stCB=%p\n",
+		  cpu, rcu_segcblist_n_cbs(&rdp->cblist),
+		  rcu_segcblist_first_cb(&rdp->cblist));
 }
 
 /*
@@ -2855,14 +2739,17 @@ static void rcu_cleanup_dead_cpu(int cpu, struct rcu_state *rsp)
 static void rcu_do_batch(struct rcu_state *rsp, struct rcu_data *rdp)
 {
 	unsigned long flags;
-	struct rcu_head *next, *list, **tail;
-	long bl, count, count_lazy;
-	int i;
+	struct rcu_head *rhp;
+	struct rcu_cblist rcl = RCU_CBLIST_INITIALIZER(rcl);
+	long bl, count;
 
 	/* If no callbacks are ready, just return. */
-	if (!cpu_has_callbacks_ready_to_invoke(rdp)) {
-		trace_rcu_batch_start(rsp->name, rdp->qlen_lazy, rdp->qlen, 0);
-		trace_rcu_batch_end(rsp->name, 0, !!READ_ONCE(rdp->nxtlist),
+	if (!rcu_segcblist_ready_cbs(&rdp->cblist)) {
+		trace_rcu_batch_start(rsp->name,
+				      rcu_segcblist_n_lazy_cbs(&rdp->cblist),
+				      rcu_segcblist_n_cbs(&rdp->cblist), 0);
+		trace_rcu_batch_end(rsp->name, 0,
+				    !rcu_segcblist_empty(&rdp->cblist),
 				    need_resched(), is_idle_task(current),
 				    rcu_is_callbacks_kthread());
 		return;
@@ -2870,73 +2757,62 @@ static void rcu_do_batch(struct rcu_state *rsp, struct rcu_data *rdp)
 
 	/*
 	 * Extract the list of ready callbacks, disabling to prevent
-	 * races with call_rcu() from interrupt handlers.
+	 * races with call_rcu() from interrupt handlers.  Leave the
+	 * callback counts, as rcu_barrier() needs to be conservative.
 	 */
 	local_irq_save(flags);
 	WARN_ON_ONCE(cpu_is_offline(smp_processor_id()));
 	bl = rdp->blimit;
-	trace_rcu_batch_start(rsp->name, rdp->qlen_lazy, rdp->qlen, bl);
-	list = rdp->nxtlist;
-	rdp->nxtlist = *rdp->nxttail[RCU_DONE_TAIL];
-	*rdp->nxttail[RCU_DONE_TAIL] = NULL;
-	tail = rdp->nxttail[RCU_DONE_TAIL];
-	for (i = RCU_NEXT_SIZE - 1; i >= 0; i--)
-		if (rdp->nxttail[i] == rdp->nxttail[RCU_DONE_TAIL])
-			rdp->nxttail[i] = &rdp->nxtlist;
+	trace_rcu_batch_start(rsp->name, rcu_segcblist_n_lazy_cbs(&rdp->cblist),
+			      rcu_segcblist_n_cbs(&rdp->cblist), bl);
+	rcu_segcblist_extract_done_cbs(&rdp->cblist, &rcl);
 	local_irq_restore(flags);
 
 	/* Invoke callbacks. */
-	count = count_lazy = 0;
-	while (list) {
-		next = list->next;
-		prefetch(next);
-		debug_rcu_head_unqueue(list);
-		if (__rcu_reclaim(rsp->name, list))
-			count_lazy++;
-		list = next;
-		/* Stop only if limit reached and CPU has something to do. */
-		if (++count >= bl &&
+	rhp = rcu_cblist_dequeue(&rcl);
+	for (; rhp; rhp = rcu_cblist_dequeue(&rcl)) {
+		debug_rcu_head_unqueue(rhp);
+		if (__rcu_reclaim(rsp->name, rhp))
+			rcu_cblist_dequeued_lazy(&rcl);
+		/*
+		 * Stop only if limit reached and CPU has something to do.
+		 * Note: The rcl structure counts down from zero.
+		 */
+		if (-rcu_cblist_n_cbs(&rcl) >= bl &&
 		    (need_resched() ||
 		     (!is_idle_task(current) && !rcu_is_callbacks_kthread())))
 			break;
 	}
 
 	local_irq_save(flags);
-	trace_rcu_batch_end(rsp->name, count, !!list, need_resched(),
-			    is_idle_task(current),
+	count = -rcu_cblist_n_cbs(&rcl);
+	trace_rcu_batch_end(rsp->name, count, !rcu_cblist_empty(&rcl),
+			    need_resched(), is_idle_task(current),
 			    rcu_is_callbacks_kthread());
 
-	/* Update count, and requeue any remaining callbacks. */
-	if (list != NULL) {
-		*tail = rdp->nxtlist;
-		rdp->nxtlist = list;
-		for (i = 0; i < RCU_NEXT_SIZE; i++)
-			if (&rdp->nxtlist == rdp->nxttail[i])
-				rdp->nxttail[i] = tail;
-			else
-				break;
-	}
+	/* Update counts and requeue any remaining callbacks. */
+	rcu_segcblist_insert_done_cbs(&rdp->cblist, &rcl);
 	smp_mb(); /* List handling before counting for rcu_barrier(). */
-	rdp->qlen_lazy -= count_lazy;
-	WRITE_ONCE(rdp->qlen, rdp->qlen - count);
 	rdp->n_cbs_invoked += count;
+	rcu_segcblist_insert_count(&rdp->cblist, &rcl);
 
 	/* Reinstate batch limit if we have worked down the excess. */
-	if (rdp->blimit == LONG_MAX && rdp->qlen <= qlowmark)
+	count = rcu_segcblist_n_cbs(&rdp->cblist);
+	if (rdp->blimit == LONG_MAX && count <= qlowmark)
 		rdp->blimit = blimit;
 
 	/* Reset ->qlen_last_fqs_check trigger if enough CBs have drained. */
-	if (rdp->qlen == 0 && rdp->qlen_last_fqs_check != 0) {
+	if (count == 0 && rdp->qlen_last_fqs_check != 0) {
 		rdp->qlen_last_fqs_check = 0;
 		rdp->n_force_qs_snap = rsp->n_force_qs;
-	} else if (rdp->qlen < rdp->qlen_last_fqs_check - qhimark)
-		rdp->qlen_last_fqs_check = rdp->qlen;
-	WARN_ON_ONCE((rdp->nxtlist == NULL) != (rdp->qlen == 0));
+	} else if (count < rdp->qlen_last_fqs_check - qhimark)
+		rdp->qlen_last_fqs_check = count;
+	WARN_ON_ONCE(rcu_segcblist_empty(&rdp->cblist) != (count == 0));
 
 	local_irq_restore(flags);
 
 	/* Re-invoke RCU core processing if there are callbacks remaining. */
-	if (cpu_has_callbacks_ready_to_invoke(rdp))
+	if (rcu_segcblist_ready_cbs(&rdp->cblist))
 		invoke_rcu_core();
 }
 
@@ -3120,7 +2996,7 @@ __rcu_process_callbacks(struct rcu_state *rsp)
 	}
 
 	/* If there are callbacks ready, invoke them. */
-	if (cpu_has_callbacks_ready_to_invoke(rdp))
+	if (rcu_segcblist_ready_cbs(&rdp->cblist))
 		invoke_rcu_callbacks(rsp, rdp);
 
 	/* Do any needed deferred wakeups of rcuo kthreads. */
@@ -3192,7 +3068,8 @@ static void __call_rcu_core(struct rcu_state *rsp, struct rcu_data *rdp,
 	 * invoking force_quiescent_state() if the newly enqueued callback
 	 * is the only one waiting for a grace period to complete.
 	 */
-	if (unlikely(rdp->qlen > rdp->qlen_last_fqs_check + qhimark)) {
+	if (unlikely(rcu_segcblist_n_cbs(&rdp->cblist) >
+		     rdp->qlen_last_fqs_check + qhimark)) {
 
 		/* Are we ignoring a completed grace period? */
 		note_gp_changes(rsp, rdp);
@@ -3210,10 +3087,10 @@ static void __call_rcu_core(struct rcu_state *rsp, struct rcu_data *rdp,
 			/* Give the grace period a kick. */
 			rdp->blimit = LONG_MAX;
 			if (rsp->n_force_qs == rdp->n_force_qs_snap &&
-			    *rdp->nxttail[RCU_DONE_TAIL] != head)
+			    rcu_segcblist_first_pend_cb(&rdp->cblist) != head)
 				force_quiescent_state(rsp);
 			rdp->n_force_qs_snap = rsp->n_force_qs;
-			rdp->qlen_last_fqs_check = rdp->qlen;
+			rdp->qlen_last_fqs_check = rcu_segcblist_n_cbs(&rdp->cblist);
 		}
 	}
 }
@@ -3253,7 +3130,7 @@ __call_rcu(struct rcu_head *head, rcu_callback_t func,
 	rdp = this_cpu_ptr(rsp->rda);
 
 	/* Add the callback to our list. */
-	if (unlikely(rdp->nxttail[RCU_NEXT_TAIL] == NULL) || cpu != -1) {
+	if (unlikely(!rcu_segcblist_is_enabled(&rdp->cblist)) || cpu != -1) {
 		int offline;
 
 		if (cpu != -1)
@@ -3272,23 +3149,21 @@ __call_rcu(struct rcu_head *head, rcu_callback_t func,
 		 */
 		BUG_ON(cpu != -1);
 		WARN_ON_ONCE(!rcu_is_watching());
-		if (!likely(rdp->nxtlist))
-			init_default_callback_list(rdp);
+		if (rcu_segcblist_empty(&rdp->cblist))
+			rcu_segcblist_init(&rdp->cblist);
 	}
-	WRITE_ONCE(rdp->qlen, rdp->qlen + 1);
-	if (lazy)
-		rdp->qlen_lazy++;
-	else
+	rcu_segcblist_enqueue(&rdp->cblist, head, lazy);
+	if (!lazy)
 		rcu_idle_count_callbacks_posted();
-	smp_mb();  /* Count before adding callback for rcu_barrier(). */
-	*rdp->nxttail[RCU_NEXT_TAIL] = head;
-	rdp->nxttail[RCU_NEXT_TAIL] = &head->next;
 
 	if (__is_kfree_rcu_offset((unsigned long)func))
 		trace_rcu_kfree_callback(rsp->name, head, (unsigned long)func,
-					 rdp->qlen_lazy, rdp->qlen);
+					 rcu_segcblist_n_lazy_cbs(&rdp->cblist),
+					 rcu_segcblist_n_cbs(&rdp->cblist));
 	else
-		trace_rcu_callback(rsp->name, head, rdp->qlen_lazy, rdp->qlen);
+		trace_rcu_callback(rsp->name, head,
+				   rcu_segcblist_n_lazy_cbs(&rdp->cblist),
+				   rcu_segcblist_n_cbs(&rdp->cblist));
 
 	/* Go handle any RCU core processing required. */
 	__call_rcu_core(rsp, rdp, head, flags);
@@ -3600,7 +3475,7 @@ static int __rcu_pending(struct rcu_state *rsp, struct rcu_data *rdp)
 	}
 
 	/* Does this CPU have callbacks ready to invoke? */
-	if (cpu_has_callbacks_ready_to_invoke(rdp)) {
+	if (rcu_segcblist_ready_cbs(&rdp->cblist)) {
 		rdp->n_rp_cb_ready++;
 		return 1;
 	}
@@ -3664,10 +3539,10 @@ static bool __maybe_unused rcu_cpu_has_callbacks(bool *all_lazy)
 
 	for_each_rcu_flavor(rsp) {
 		rdp = this_cpu_ptr(rsp->rda);
-		if (!rdp->nxtlist)
+		if (rcu_segcblist_empty(&rdp->cblist))
 			continue;
 		hc = true;
-		if (rdp->qlen != rdp->qlen_lazy || !all_lazy) {
+		if (rcu_segcblist_n_nonlazy_cbs(&rdp->cblist) || !all_lazy) {
 			al = false;
 			break;
 		}
@@ -3776,7 +3651,7 @@ static void _rcu_barrier(struct rcu_state *rsp)
 				__call_rcu(&rdp->barrier_head,
 					   rcu_barrier_callback, rsp, cpu, 0);
 			}
-		} else if (READ_ONCE(rdp->qlen)) {
+		} else if (rcu_segcblist_n_cbs(&rdp->cblist)) {
 			_rcu_barrier_trace(rsp, "OnlineQ", cpu,
 					   rsp->barrier_sequence);
 			smp_call_function_single(cpu, rcu_barrier_func, rsp, 1);
@@ -3885,8 +3760,9 @@ rcu_init_percpu_data(int cpu, struct rcu_state *rsp)
 	rdp->qlen_last_fqs_check = 0;
 	rdp->n_force_qs_snap = rsp->n_force_qs;
 	rdp->blimit = blimit;
-	if (!rdp->nxtlist)
-		init_callback_list(rdp);  /* Re-enable callbacks on this CPU. */
+	if (rcu_segcblist_empty(&rdp->cblist) && /* No early-boot CBs? */
+	    !init_nocb_callback_list(rdp))
+		rcu_segcblist_init(&rdp->cblist);  /* Re-enable callbacks. */
 	rdp->dynticks->dynticks_nesting = DYNTICK_TASK_EXIT_IDLE;
 	rcu_sysidle_init_percpu_data(rdp->dynticks);
 	rcu_dynticks_eqs_online();