libceph: multiple workspaces for CRUSH computations

Replace a global map->crush_workspace (protected by a global mutex)
with a list of workspaces, up to the number of CPUs + 1.

This is based on a patch from Robin Geuze <robing@nl.team.blue>.
Robin and his team have observed a 10-20% increase in IOPS on all
queue depths and lower CPU usage as well on a high-end all-NVMe
100GbE cluster.

Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c
index 96c25f5..fa08c15 100644
--- a/net/ceph/osdmap.c
+++ b/net/ceph/osdmap.c
@@ -965,6 +965,143 @@ static int decode_pool_names(void **p, void *end, struct ceph_osdmap *map)
 }
 
 /*
+ * CRUSH workspaces
+ *
+ * workspace_manager framework borrowed from fs/btrfs/compression.c.
+ * Two simplifications: there is only one type of workspace and there
+ * is always at least one workspace.
+ */
+static struct crush_work *alloc_workspace(const struct crush_map *c)
+{
+	struct crush_work *work;
+	size_t work_size;
+
+	WARN_ON(!c->working_size);
+	work_size = crush_work_size(c, CEPH_PG_MAX_SIZE);
+	dout("%s work_size %zu bytes\n", __func__, work_size);
+
+	work = ceph_kvmalloc(work_size, GFP_NOIO);
+	if (!work)
+		return NULL;	/* allocation failed */
+
+	INIT_LIST_HEAD(&work->item);	/* not linked into any list yet */
+	crush_init_workspace(c, work);
+	return work;
+}
+
+static void free_workspace(struct crush_work *work)
+{
+	WARN_ON(!list_empty(&work->item));	/* must not be on a list */
+	kvfree(work);
+}
+
+static void init_workspace_manager(struct workspace_manager *wsm)
+{
+	INIT_LIST_HEAD(&wsm->idle_ws);
+	spin_lock_init(&wsm->ws_lock);
+	atomic_set(&wsm->total_ws, 0);	/* starts with no workspaces */
+	wsm->free_ws = 0;		/* ... and so none idle */
+	init_waitqueue_head(&wsm->ws_wait);
+}
+
+static void add_initial_workspace(struct workspace_manager *wsm,
+				  struct crush_work *work)
+{
+	WARN_ON(!list_empty(&wsm->idle_ws));	/* idle list must be empty */
+
+	list_add(&work->item, &wsm->idle_ws);
+	atomic_set(&wsm->total_ws, 1);
+	wsm->free_ws = 1;	/* the single workspace is idle */
+}
+
+static void cleanup_workspace_manager(struct workspace_manager *wsm)
+{
+	struct crush_work *work;
+
+	while (!list_empty(&wsm->idle_ws)) {
+		work = list_first_entry(&wsm->idle_ws, struct crush_work,
+					item);
+		list_del_init(&work->item);	/* unlink before freeing */
+		free_workspace(work);
+	}
+	atomic_set(&wsm->total_ws, 0);	/* only idle ones were freed above */
+	wsm->free_ws = 0;
+}
+
+/*
+ * Takes an idle workspace, else allocates one if total_ws <= online CPUs,
+ * else sleeps until woken.  Allocation failure retries; never returns NULL.
+ */
+static struct crush_work *get_workspace(struct workspace_manager *wsm,
+					const struct crush_map *c)
+{
+	struct crush_work *work;
+	int cpus = num_online_cpus();
+
+again:
+	spin_lock(&wsm->ws_lock);
+	if (!list_empty(&wsm->idle_ws)) {
+		work = list_first_entry(&wsm->idle_ws, struct crush_work,
+					item);
+		list_del_init(&work->item);
+		wsm->free_ws--;
+		spin_unlock(&wsm->ws_lock);
+		return work;
+
+	}
+	if (atomic_read(&wsm->total_ws) > cpus) {
+		DEFINE_WAIT(wait);
+
+		spin_unlock(&wsm->ws_lock);
+		prepare_to_wait(&wsm->ws_wait, &wait, TASK_UNINTERRUPTIBLE);
+		if (atomic_read(&wsm->total_ws) > cpus && !wsm->free_ws)
+			schedule();
+		finish_wait(&wsm->ws_wait, &wait);
+		goto again;
+	}
+	atomic_inc(&wsm->total_ws);	/* counted before it is allocated */
+	spin_unlock(&wsm->ws_lock);
+
+	work = alloc_workspace(c);
+	if (!work) {
+		atomic_dec(&wsm->total_ws);	/* undo the increment above */
+		wake_up(&wsm->ws_wait);
+
+		/*
+		 * Do not return the error but go back to waiting.  We
+		 * have the initial workspace and the CRUSH computation
+		 * time is bounded so we will get it eventually.
+		 */
+		WARN_ON(atomic_read(&wsm->total_ws) < 1);
+		goto again;
+	}
+	return work;
+}
+
+/*
+ * Puts a workspace back on the idle list, or frees it if more than
+ * num_online_cpus() are already idle.  Wakes up waiters either way.
+ */
+static void put_workspace(struct workspace_manager *wsm,
+			  struct crush_work *work)
+{
+	spin_lock(&wsm->ws_lock);
+	if (wsm->free_ws <= num_online_cpus()) {
+		list_add(&work->item, &wsm->idle_ws);
+		wsm->free_ws++;
+		spin_unlock(&wsm->ws_lock);
+		goto wake;
+	}
+	spin_unlock(&wsm->ws_lock);
+
+	free_workspace(work);
+	atomic_dec(&wsm->total_ws);	/* freed, so no longer counted */
+wake:
+	if (wq_has_sleeper(&wsm->ws_wait))
+		wake_up(&wsm->ws_wait);
+}
+
+/*
  * osd map
  */
 struct ceph_osdmap *ceph_osdmap_alloc(void)
@@ -981,7 +1118,8 @@ struct ceph_osdmap *ceph_osdmap_alloc(void)
 	map->primary_temp = RB_ROOT;
 	map->pg_upmap = RB_ROOT;
 	map->pg_upmap_items = RB_ROOT;
-	mutex_init(&map->crush_workspace_mutex);
+
+	init_workspace_manager(&map->crush_wsm);
 
 	return map;
 }
@@ -989,8 +1127,11 @@ struct ceph_osdmap *ceph_osdmap_alloc(void)
 void ceph_osdmap_destroy(struct ceph_osdmap *map)
 {
 	dout("osdmap_destroy %p\n", map);
+
 	if (map->crush)
 		crush_destroy(map->crush);
+	cleanup_workspace_manager(&map->crush_wsm);
+
 	while (!RB_EMPTY_ROOT(&map->pg_temp)) {
 		struct ceph_pg_mapping *pg =
 			rb_entry(rb_first(&map->pg_temp),
@@ -1029,7 +1170,6 @@ void ceph_osdmap_destroy(struct ceph_osdmap *map)
 	kvfree(map->osd_weight);
 	kvfree(map->osd_addr);
 	kvfree(map->osd_primary_affinity);
-	kvfree(map->crush_workspace);
 	kfree(map);
 }
 
@@ -1104,26 +1244,22 @@ static int osdmap_set_max_osd(struct ceph_osdmap *map, u32 max)
 
 static int osdmap_set_crush(struct ceph_osdmap *map, struct crush_map *crush)
 {
-	void *workspace;
-	size_t work_size;
+	struct crush_work *work;
 
 	if (IS_ERR(crush))
 		return PTR_ERR(crush);
 
-	work_size = crush_work_size(crush, CEPH_PG_MAX_SIZE);
-	dout("%s work_size %zu bytes\n", __func__, work_size);
-	workspace = ceph_kvmalloc(work_size, GFP_NOIO);
-	if (!workspace) {
+	work = alloc_workspace(crush);
+	if (!work) {
 		crush_destroy(crush);
 		return -ENOMEM;
 	}
-	crush_init_workspace(crush, workspace);
 
 	if (map->crush)
 		crush_destroy(map->crush);
-	kvfree(map->crush_workspace);
+	cleanup_workspace_manager(&map->crush_wsm);
 	map->crush = crush;
-	map->crush_workspace = workspace;
+	add_initial_workspace(&map->crush_wsm, work);
 	return 0;
 }
 
@@ -2322,6 +2458,7 @@ static int do_crush(struct ceph_osdmap *map, int ruleno, int x,
 		    s64 choose_args_index)
 {
 	struct crush_choose_arg_map *arg_map;
+	struct crush_work *work;
 	int r;
 
 	BUG_ON(result_max > CEPH_PG_MAX_SIZE);
@@ -2332,12 +2469,11 @@ static int do_crush(struct ceph_osdmap *map, int ruleno, int x,
 		arg_map = lookup_choose_arg_map(&map->crush->choose_args,
 						CEPH_DEFAULT_CHOOSE_ARGS);
 
-	mutex_lock(&map->crush_workspace_mutex);
+	work = get_workspace(&map->crush_wsm, map->crush);
 	r = crush_do_rule(map->crush, ruleno, x, result, result_max,
-			  weight, weight_max, map->crush_workspace,
+			  weight, weight_max, work,
 			  arg_map ? arg_map->args : NULL);
-	mutex_unlock(&map->crush_workspace_mutex);
-
+	put_workspace(&map->crush_wsm, work);
 	return r;
 }