SLOW_WORK: Wait for outstanding work items belonging to a module to clear

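Wait for outstanding slow work items belonging to a module to clear when
unregistering that module as a user of the facility.  This prevents the module's
code - in particular a work item's put_ref() operation - from being unloaded
whilst a slow-work thread is still executing it.

To make this possible, slow_work_register_user() and
slow_work_unregister_user() now take a pointer to the calling module.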

Signed-off-by: David Howells <dhowells@redhat.com>
diff --git a/kernel/slow-work.c b/kernel/slow-work.c
index 0d31135..dd08f37 100644
--- a/kernel/slow-work.c
+++ b/kernel/slow-work.c
@@ -22,6 +22,8 @@
 #define SLOW_WORK_OOM_TIMEOUT (5 * HZ)	/* can't start new threads for 5s after
 					 * OOM */
 
+#define SLOW_WORK_THREAD_LIMIT	255	/* abs maximum number of slow-work threads */
+
 static void slow_work_cull_timeout(unsigned long);
 static void slow_work_oom_timeout(unsigned long);
 
@@ -46,7 +48,7 @@
 
 #ifdef CONFIG_SYSCTL
 static const int slow_work_min_min_threads = 2;
-static int slow_work_max_max_threads = 255;
+static int slow_work_max_max_threads = SLOW_WORK_THREAD_LIMIT;
 static const int slow_work_min_vslow = 1;
 static const int slow_work_max_vslow = 99;
 
@@ -98,6 +100,23 @@
 static struct slow_work slow_work_new_thread; /* new thread starter */
 
 /*
+ * slow work ID allocation (use slow_work_queue_lock)
+ */
+static DECLARE_BITMAP(slow_work_ids, SLOW_WORK_THREAD_LIMIT);
+
+/*
+ * Unregistration tracking to prevent put_ref() from disappearing during module
+ * unload
+ */
+#ifdef CONFIG_MODULES
+static struct module *slow_work_thread_processing[SLOW_WORK_THREAD_LIMIT];
+static struct module *slow_work_unreg_module;
+static struct slow_work *slow_work_unreg_work_item;
+static DECLARE_WAIT_QUEUE_HEAD(slow_work_unreg_wq);
+static DEFINE_MUTEX(slow_work_unreg_sync_lock);
+#endif
+
+/*
  * The queues of work items and the lock governing access to them.  These are
  * shared between all the CPUs.  It doesn't make sense to have per-CPU queues
  * as the number of threads bears no relation to the number of CPUs.
@@ -149,8 +168,11 @@
  * Attempt to execute stuff queued on a slow thread.  Return true if we managed
  * it, false if there was nothing to do.
  */
-static bool slow_work_execute(void)
+static bool slow_work_execute(int id)
 {
+#ifdef CONFIG_MODULES
+	struct module *module;
+#endif
 	struct slow_work *work = NULL;
 	unsigned vsmax;
 	bool very_slow;
@@ -186,6 +208,12 @@
 	} else {
 		very_slow = false; /* avoid the compiler warning */
 	}
+
+#ifdef CONFIG_MODULES
+	if (work)
+		slow_work_thread_processing[id] = work->owner;
+#endif
+
 	spin_unlock_irq(&slow_work_queue_lock);
 
 	if (!work)
@@ -219,7 +247,18 @@
 		spin_unlock_irq(&slow_work_queue_lock);
 	}
 
+	/* sort out the race between module unloading and put_ref() */
 	work->ops->put_ref(work);
+
+#ifdef CONFIG_MODULES
+	module = slow_work_thread_processing[id];
+	slow_work_thread_processing[id] = NULL;
+	smp_mb();
+	if (slow_work_unreg_work_item == work ||
+	    slow_work_unreg_module == module)
+		wake_up_all(&slow_work_unreg_wq);
+#endif
+
 	return true;
 
 auto_requeue:
@@ -232,6 +271,13 @@
 	else
 		list_add_tail(&work->link, &slow_work_queue);
 	spin_unlock_irq(&slow_work_queue_lock);
+
+#ifdef CONFIG_MODULES
+	/* the item has been requeued, so this thread is no longer processing
+	 * anything on behalf of its owner */
+	slow_work_thread_processing[id] = NULL;
+#endif
+
 	return true;
 }
 
@@ -368,13 +408,22 @@
  */
 static int slow_work_thread(void *_data)
 {
-	int vsmax;
+	int vsmax, id;
 
 	DEFINE_WAIT(wait);
 
 	set_freezable();
 	set_user_nice(current, -5);
 
+	/* allocate ourselves an ID */
+	spin_lock_irq(&slow_work_queue_lock);
+	id = find_first_zero_bit(slow_work_ids, SLOW_WORK_THREAD_LIMIT);
+	BUG_ON(id < 0 || id >= SLOW_WORK_THREAD_LIMIT);
+	__set_bit(id, slow_work_ids);
+	spin_unlock_irq(&slow_work_queue_lock);
+
+	sprintf(current->comm, "kslowd%03u", id);
+
 	for (;;) {
 		vsmax = vslow_work_proportion;
 		vsmax *= atomic_read(&slow_work_thread_count);
@@ -395,7 +444,7 @@
 		vsmax *= atomic_read(&slow_work_thread_count);
 		vsmax /= 100;
 
-		if (slow_work_available(vsmax) && slow_work_execute()) {
+		if (slow_work_available(vsmax) && slow_work_execute(id)) {
 			cond_resched();
 			if (list_empty(&slow_work_queue) &&
 			    list_empty(&vslow_work_queue) &&
@@ -412,6 +461,10 @@
 			break;
 	}
 
+	spin_lock_irq(&slow_work_queue_lock);
+	__clear_bit(id, slow_work_ids);
+	spin_unlock_irq(&slow_work_queue_lock);
+
 	if (atomic_dec_and_test(&slow_work_thread_count))
 		complete_and_exit(&slow_work_last_thread_exited, 0);
 	return 0;
@@ -475,6 +528,7 @@
 }
 
 static const struct slow_work_ops slow_work_new_thread_ops = {
+	.owner		= THIS_MODULE,
 	.get_ref	= slow_work_new_thread_get_ref,
 	.put_ref	= slow_work_new_thread_put_ref,
 	.execute	= slow_work_new_thread_execute,
@@ -546,12 +600,13 @@
 
 /**
  * slow_work_register_user - Register a user of the facility
+ * @module: The module about to make use of the facility
  *
  * Register a user of the facility, starting up the initial threads if there
  * aren't any other users at this point.  This will return 0 if successful, or
  * an error if not.
  */
-int slow_work_register_user(void)
+int slow_work_register_user(struct module *module)
 {
 	struct task_struct *p;
 	int loop;
@@ -598,14 +653,90 @@
 }
 EXPORT_SYMBOL(slow_work_register_user);
 
+/*
+ * Wait for all outstanding items from the calling module to complete
+ * - note that more items may be queued whilst we're waiting
+ * - waiters are serialised by slow_work_unreg_sync_lock as there is only one
+ *   slow_work_unreg_work_item/slow_work_unreg_module pair to watch
+ * - does nothing without CONFIG_MODULES as the unregistration tracking state
+ *   is only defined when modules are configured
+ */
+static void slow_work_wait_for_items(struct module *module)
+{
+#ifdef CONFIG_MODULES
+	DECLARE_WAITQUEUE(myself, current);
+	struct slow_work *work;
+	int loop;
+
+	mutex_lock(&slow_work_unreg_sync_lock);
+	add_wait_queue(&slow_work_unreg_wq, &myself);
+
+	for (;;) {
+		spin_lock_irq(&slow_work_queue_lock);
+
+		/* first of all, we wait for the last queued item in each list
+		 * to be processed */
+		list_for_each_entry_reverse(work, &vslow_work_queue, link) {
+			if (work->owner == module) {
+				set_current_state(TASK_UNINTERRUPTIBLE);
+				slow_work_unreg_work_item = work;
+				goto do_wait;
+			}
+		}
+		list_for_each_entry_reverse(work, &slow_work_queue, link) {
+			if (work->owner == module) {
+				set_current_state(TASK_UNINTERRUPTIBLE);
+				slow_work_unreg_work_item = work;
+				goto do_wait;
+			}
+		}
+
+		/* then we wait for the items being processed to finish; the
+		 * module is published first as slow_work_execute() clears its
+		 * slot and then checks slow_work_unreg_module */
+		slow_work_unreg_module = module;
+		smp_mb();
+		for (loop = 0; loop < SLOW_WORK_THREAD_LIMIT; loop++) {
+			if (slow_work_thread_processing[loop] == module)
+				goto do_wait;
+		}
+		spin_unlock_irq(&slow_work_queue_lock);
+		break; /* okay, we're done */
+
+	do_wait:
+		/* NOTE(review): the in-progress path arrives here without
+		 * setting TASK_UNINTERRUPTIBLE, so schedule() merely yields
+		 * and the loop polls - confirm this is intended */
+		spin_unlock_irq(&slow_work_queue_lock);
+		schedule();
+		slow_work_unreg_work_item = NULL;
+		slow_work_unreg_module = NULL;
+	}
+
+	remove_wait_queue(&slow_work_unreg_wq, &myself);
+	mutex_unlock(&slow_work_unreg_sync_lock);
+#endif /* CONFIG_MODULES */
+}
+
 /**
  * slow_work_unregister_user - Unregister a user of the facility
+ * @module: The module whose items should be cleared
  *
  * Unregister a user of the facility, killing all the threads if this was the
  * last one.
+ *
+ * This waits for all the work items belonging to the nominated module to go
+ * away before proceeding.
  */
-void slow_work_unregister_user(void)
+void slow_work_unregister_user(struct module *module)
 {
+	/* first of all, wait for all outstanding items from the calling module
+	 * to complete */
+	if (module)
+		slow_work_wait_for_items(module);
+
+	/* then we can actually go about shutting down the facility if need
+	 * be */
 	mutex_lock(&slow_work_user_lock);
 
 	BUG_ON(slow_work_user_count <= 0);