RDS/IB: Add caching of frags and incs

This patch is based heavily on an initial patch by Chris Mason.
Instead of freeing slab memory and pages, it keeps them, and
funnels them back to be reused.

The lock minimization strategy uses xchg and cmpxchg atomic ops
for manipulation of pointers to list heads. We anchor the lists with a
pointer to a list_head struct instead of a static list_head struct.
We just have to carefully use the existing primitives with
the difference between a pointer and a static head struct.

For example, 'list_empty()' means that our anchor pointer points to a list with
a single item instead of meaning that our static head element doesn't point to
any list items.

Original patch by Chris, with significant mods and fixes by Andy and Zach.

Signed-off-by: Chris Mason <chris.mason@oracle.com>
Signed-off-by: Andy Grover <andy.grover@oracle.com>
Signed-off-by: Zach Brown <zach.brown@oracle.com>
diff --git a/net/rds/ib_recv.c b/net/rds/ib_recv.c
index 48add10..5b429b7 100644
--- a/net/rds/ib_recv.c
+++ b/net/rds/ib_recv.c
@@ -43,14 +43,6 @@
 static struct kmem_cache *rds_ib_frag_slab;
 static atomic_t	rds_ib_allocation = ATOMIC_INIT(0);
 
-/* Free frag and attached recv buffer f_sg */
-static void rds_ib_frag_free(struct rds_page_frag *frag)
-{
-	rdsdebug("frag %p page %p\n", frag, sg_page(&frag->f_sg));
-	__free_page(sg_page(&frag->f_sg));
-	kmem_cache_free(rds_ib_frag_slab, frag);
-}
-
 void rds_ib_recv_init_ring(struct rds_ib_connection *ic)
 {
 	struct rds_ib_recv_work *recv;
@@ -79,6 +71,151 @@
 	}
 }
 
+/*
+ * The entire 'from' list, including the from element itself, is put on
+ * to the tail of the 'to' list.
+ */
+static void list_splice_entire_tail(struct list_head *from,
+				    struct list_head *to)
+{
+	struct list_head *from_last = from->prev;
+
+	list_splice_tail(from_last, to);
+	list_add_tail(from_last, to);
+}
+
+static void rds_ib_cache_xfer_to_ready(struct rds_ib_refill_cache *cache)
+{
+	struct list_head *tmp;
+
+	tmp = xchg(&cache->xfer, NULL);
+	if (tmp) {
+		if (cache->ready)
+			list_splice_entire_tail(tmp, cache->ready);
+		else
+			cache->ready = tmp;
+	}
+}
+
+static int rds_ib_recv_alloc_cache(struct rds_ib_refill_cache *cache)
+{
+	struct rds_ib_cache_head *head;
+	int cpu;
+
+	cache->percpu = alloc_percpu(struct rds_ib_cache_head);
+	if (!cache->percpu)
+	       return -ENOMEM;
+
+	for_each_possible_cpu(cpu) {
+		head = per_cpu_ptr(cache->percpu, cpu);
+		head->first = NULL;
+		head->count = 0;
+	}
+	cache->xfer = NULL;
+	cache->ready = NULL;
+
+	return 0;
+}
+
+int rds_ib_recv_alloc_caches(struct rds_ib_connection *ic)
+{
+	int ret;
+
+	ret = rds_ib_recv_alloc_cache(&ic->i_cache_incs);
+	if (!ret) {
+		ret = rds_ib_recv_alloc_cache(&ic->i_cache_frags);
+		if (ret)
+			free_percpu(ic->i_cache_incs.percpu);
+	}
+
+	return ret;
+}
+
+static void rds_ib_cache_splice_all_lists(struct rds_ib_refill_cache *cache,
+					  struct list_head *caller_list)
+{
+	struct rds_ib_cache_head *head;
+	int cpu;
+
+	for_each_possible_cpu(cpu) {
+		head = per_cpu_ptr(cache->percpu, cpu);
+		if (head->first) {
+			list_splice_entire_tail(head->first, caller_list);
+			head->first = NULL;
+		}
+	}
+
+	if (cache->ready) {
+		list_splice_entire_tail(cache->ready, caller_list);
+		cache->ready = NULL;
+	}
+}
+
+void rds_ib_recv_free_caches(struct rds_ib_connection *ic)
+{
+	struct rds_ib_incoming *inc;
+	struct rds_ib_incoming *inc_tmp;
+	struct rds_page_frag *frag;
+	struct rds_page_frag *frag_tmp;
+	LIST_HEAD(list);
+
+	rds_ib_cache_xfer_to_ready(&ic->i_cache_incs);
+	rds_ib_cache_splice_all_lists(&ic->i_cache_incs, &list);
+	free_percpu(ic->i_cache_incs.percpu);
+
+	list_for_each_entry_safe(inc, inc_tmp, &list, ii_cache_entry) {
+		list_del(&inc->ii_cache_entry);
+		WARN_ON(!list_empty(&inc->ii_frags));
+		kmem_cache_free(rds_ib_incoming_slab, inc);
+	}
+
+	rds_ib_cache_xfer_to_ready(&ic->i_cache_frags);
+	rds_ib_cache_splice_all_lists(&ic->i_cache_frags, &list);
+	free_percpu(ic->i_cache_frags.percpu);
+
+	list_for_each_entry_safe(frag, frag_tmp, &list, f_cache_entry) {
+		list_del(&frag->f_cache_entry);
+		WARN_ON(!list_empty(&frag->f_item));
+		kmem_cache_free(rds_ib_frag_slab, frag);
+	}
+}
+
+/* fwd decl */
+static void rds_ib_recv_cache_put(struct list_head *new_item,
+				  struct rds_ib_refill_cache *cache);
+static struct list_head *rds_ib_recv_cache_get(struct rds_ib_refill_cache *cache);
+
+
+/* Recycle frag and attached recv buffer f_sg */
+static void rds_ib_frag_free(struct rds_ib_connection *ic,
+			     struct rds_page_frag *frag)
+{
+	rdsdebug("frag %p page %p\n", frag, sg_page(&frag->f_sg));
+
+	rds_ib_recv_cache_put(&frag->f_cache_entry, &ic->i_cache_frags);
+}
+
+/* Recycle inc after freeing attached frags */
+void rds_ib_inc_free(struct rds_incoming *inc)
+{
+	struct rds_ib_incoming *ibinc;
+	struct rds_page_frag *frag;
+	struct rds_page_frag *pos;
+	struct rds_ib_connection *ic = inc->i_conn->c_transport_data;
+
+	ibinc = container_of(inc, struct rds_ib_incoming, ii_inc);
+
+	/* Free attached frags */
+	list_for_each_entry_safe(frag, pos, &ibinc->ii_frags, f_item) {
+		list_del_init(&frag->f_item);
+		rds_ib_frag_free(ic, frag);
+	}
+	BUG_ON(!list_empty(&ibinc->ii_frags));
+
+	rdsdebug("freeing ibinc %p inc %p\n", ibinc, inc);
+	rds_ib_recv_cache_put(&ibinc->ii_cache_entry, &ic->i_cache_incs);
+}
+
 static void rds_ib_recv_clear_one(struct rds_ib_connection *ic,
 				  struct rds_ib_recv_work *recv)
 {
@@ -88,7 +225,7 @@
 	}
 	if (recv->r_frag) {
 		ib_dma_unmap_sg(ic->i_cm_id->device, &recv->r_frag->f_sg, 1, DMA_FROM_DEVICE);
-		rds_ib_frag_free(recv->r_frag);
+		rds_ib_frag_free(ic, recv->r_frag);
 		recv->r_frag = NULL;
 	}
 }
@@ -101,6 +238,61 @@
 		rds_ib_recv_clear_one(ic, &ic->i_recvs[i]);
 }
 
+static struct rds_ib_incoming *rds_ib_refill_one_inc(struct rds_ib_connection *ic)
+{
+	struct rds_ib_incoming *ibinc;
+	struct list_head *cache_item;
+	int avail_allocs;
+
+	cache_item = rds_ib_recv_cache_get(&ic->i_cache_incs);
+	if (cache_item) {
+		ibinc = container_of(cache_item, struct rds_ib_incoming, ii_cache_entry);
+	} else {
+		avail_allocs = atomic_add_unless(&rds_ib_allocation,
+						 1, rds_ib_sysctl_max_recv_allocation);
+		if (!avail_allocs) {
+			rds_ib_stats_inc(s_ib_rx_alloc_limit);
+			return NULL;
+		}
+		ibinc = kmem_cache_alloc(rds_ib_incoming_slab, GFP_NOWAIT);
+		if (!ibinc) {
+			atomic_dec(&rds_ib_allocation);
+			return NULL;
+		}
+	}
+	INIT_LIST_HEAD(&ibinc->ii_frags);
+	rds_inc_init(&ibinc->ii_inc, ic->conn, ic->conn->c_faddr);
+
+	return ibinc;
+}
+
+static struct rds_page_frag *rds_ib_refill_one_frag(struct rds_ib_connection *ic)
+{
+	struct rds_page_frag *frag;
+	struct list_head *cache_item;
+	int ret;
+
+	cache_item = rds_ib_recv_cache_get(&ic->i_cache_frags);
+	if (cache_item) {
+		frag = container_of(cache_item, struct rds_page_frag, f_cache_entry);
+	} else {
+		frag = kmem_cache_alloc(rds_ib_frag_slab, GFP_NOWAIT);
+		if (!frag)
+			return NULL;
+
+		ret = rds_page_remainder_alloc(&frag->f_sg,
+					       RDS_FRAG_SIZE, GFP_NOWAIT);
+		if (ret) {
+			kmem_cache_free(rds_ib_frag_slab, frag);
+			return NULL;
+		}
+	}
+
+	INIT_LIST_HEAD(&frag->f_item);
+
+	return frag;
+}
+
 static int rds_ib_recv_refill_one(struct rds_connection *conn,
 				  struct rds_ib_recv_work *recv)
 {
@@ -108,37 +300,25 @@
 	struct ib_sge *sge;
 	int ret = -ENOMEM;
 
+	if (!ic->i_cache_incs.ready)
+		rds_ib_cache_xfer_to_ready(&ic->i_cache_incs);
+	if (!ic->i_cache_frags.ready)
+		rds_ib_cache_xfer_to_ready(&ic->i_cache_frags);
+
 	/*
 	 * ibinc was taken from recv if recv contained the start of a message.
 	 * recvs that were continuations will still have this allocated.
 	 */
 	if (!recv->r_ibinc) {
-		if (!atomic_add_unless(&rds_ib_allocation, 1, rds_ib_sysctl_max_recv_allocation)) {
-			rds_ib_stats_inc(s_ib_rx_alloc_limit);
+		recv->r_ibinc = rds_ib_refill_one_inc(ic);
+		if (!recv->r_ibinc)
 			goto out;
-		}
-		recv->r_ibinc = kmem_cache_alloc(rds_ib_incoming_slab, GFP_NOWAIT);
-		if (!recv->r_ibinc) {
-			atomic_dec(&rds_ib_allocation);
-			goto out;
-		}
-		INIT_LIST_HEAD(&recv->r_ibinc->ii_frags);
-		rds_inc_init(&recv->r_ibinc->ii_inc, conn, conn->c_faddr);
 	}
 
 	WARN_ON(recv->r_frag); /* leak! */
-	recv->r_frag = kmem_cache_alloc(rds_ib_frag_slab, GFP_NOWAIT);
+	recv->r_frag = rds_ib_refill_one_frag(ic);
 	if (!recv->r_frag)
 		goto out;
-	INIT_LIST_HEAD(&recv->r_frag->f_item);
-	sg_init_table(&recv->r_frag->f_sg, 1);
-	ret = rds_page_remainder_alloc(&recv->r_frag->f_sg,
-				       RDS_FRAG_SIZE, GFP_NOWAIT);
-	if (ret) {
-		kmem_cache_free(rds_ib_frag_slab, recv->r_frag);
-		recv->r_frag = NULL;
-		goto out;
-	}
 
 	ret = ib_dma_map_sg(ic->i_cm_id->device, &recv->r_frag->f_sg,
 			    1, DMA_FROM_DEVICE);
@@ -160,8 +340,7 @@
 /*
  * This tries to allocate and post unused work requests after making sure that
  * they have all the allocations they need to queue received fragments into
- * sockets.  The i_recv_mutex is held here so that ring_alloc and _unalloc
- * pairs don't go unmatched.
+ * sockets.
  *
  * -1 is returned if posting fails due to temporary resource exhaustion.
  */
@@ -216,33 +395,71 @@
 	return ret;
 }
 
-static void rds_ib_inc_purge(struct rds_incoming *inc)
+/*
+ * We want to recycle several types of recv allocations, like incs and frags.
+ * To use this, the *_free() function passes in the ptr to a list_head within
+ * the recyclee, as well as the cache to put it on.
+ *
+ * First, we put the memory on a percpu list. When this reaches a certain size,
+ * We move it to an intermediate non-percpu list in a lockless manner, with some
+ * xchg/compxchg wizardry.
+ *
+ * N.B. Instead of a list_head as the anchor, we use a single pointer, which can
+ * be NULL and xchg'd. The list is actually empty when the pointer is NULL, and
+ * list_empty() will return true with one element is actually present.
+ */
+static void rds_ib_recv_cache_put(struct list_head *new_item,
+				 struct rds_ib_refill_cache *cache)
 {
-	struct rds_ib_incoming *ibinc;
-	struct rds_page_frag *frag;
-	struct rds_page_frag *pos;
+	unsigned long flags;
+	struct rds_ib_cache_head *chp;
+	struct list_head *old;
 
-	ibinc = container_of(inc, struct rds_ib_incoming, ii_inc);
-	rdsdebug("purging ibinc %p inc %p\n", ibinc, inc);
+	local_irq_save(flags);
 
-	list_for_each_entry_safe(frag, pos, &ibinc->ii_frags, f_item) {
-		list_del_init(&frag->f_item);
-		rds_ib_frag_free(frag);
-	}
+	chp = per_cpu_ptr(cache->percpu, smp_processor_id());
+	if (!chp->first)
+		INIT_LIST_HEAD(new_item);
+	else /* put on front */
+		list_add_tail(new_item, chp->first);
+	chp->first = new_item;
+	chp->count++;
+
+	if (chp->count < RDS_IB_RECYCLE_BATCH_COUNT)
+		goto end;
+
+	/*
+	 * Return our per-cpu first list to the cache's xfer by atomically
+	 * grabbing the current xfer list, appending it to our per-cpu list,
+	 * and then atomically returning that entire list back to the
+	 * cache's xfer list as long as it's still empty.
+	 */
+	do {
+		old = xchg(&cache->xfer, NULL);
+		if (old)
+			list_splice_entire_tail(old, chp->first);
+		old = cmpxchg(&cache->xfer, NULL, chp->first);
+	} while (old);
+
+	chp->first = NULL;
+	chp->count = 0;
+end:
+	local_irq_restore(flags);
 }
 
-void rds_ib_inc_free(struct rds_incoming *inc)
+static struct list_head *rds_ib_recv_cache_get(struct rds_ib_refill_cache *cache)
 {
-	struct rds_ib_incoming *ibinc;
+	struct list_head *head = cache->ready;
 
-	ibinc = container_of(inc, struct rds_ib_incoming, ii_inc);
+	if (head) {
+		if (!list_empty(head)) {
+			cache->ready = head->next;
+			list_del_init(head);
+		} else
+			cache->ready = NULL;
+	}
 
-	rds_ib_inc_purge(inc);
-	rdsdebug("freeing ibinc %p inc %p\n", ibinc, inc);
-	BUG_ON(!list_empty(&ibinc->ii_frags));
-	kmem_cache_free(rds_ib_incoming_slab, ibinc);
-	atomic_dec(&rds_ib_allocation);
-	BUG_ON(atomic_read(&rds_ib_allocation) < 0);
+	return head;
 }
 
 int rds_ib_inc_copy_to_user(struct rds_incoming *inc, struct iovec *first_iov,
@@ -647,7 +864,7 @@
 		 *
 		 * FIXME: Fold this into the code path below.
 		 */
-		rds_ib_frag_free(recv->r_frag);
+		rds_ib_frag_free(ic, recv->r_frag);
 		recv->r_frag = NULL;
 		return;
 	}