blob: da94f3c101af77985272b72861bca34db1c853aa [file] [log] [blame]
/* Worker thread pool for slow items, such as filesystem lookups or mkdirs
 *
 * Copyright (C) 2008 Red Hat, Inc. All Rights Reserved.
 * Written by David Howells (dhowells@redhat.com)
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public Licence
 * as published by the Free Software Foundation; either version
 * 2 of the Licence, or (at your option) any later version.
 *
 * See Documentation/slow-work.txt
 */
13
14#include <linux/module.h>
15#include <linux/slow-work.h>
16#include <linux/kthread.h>
17#include <linux/freezer.h>
18#include <linux/wait.h>
David Howells8fba10a2009-11-19 18:10:51 +000019#include <linux/proc_fs.h>
20#include "slow-work.h"
David Howells3d7a6412009-11-19 18:10:23 +000021
/*
 * Forward declarations: the timer callbacks are referenced by the
 * DEFINE_TIMER() initialisers and the sysctl handlers by the sysctl table,
 * both of which appear before the function bodies.
 */
static void slow_work_cull_timeout(unsigned long data);
static void slow_work_oom_timeout(unsigned long data);

#ifdef CONFIG_SYSCTL
static int slow_work_min_threads_sysctl(struct ctl_table *table, int write,
					void __user *buffer,
					size_t *lenp, loff_t *ppos);

static int slow_work_max_threads_sysctl(struct ctl_table *table, int write,
					void __user *buffer,
					size_t *lenp, loff_t *ppos);
#endif
32
/*
 * Pool sizing.  Whilst the facility has users, the pool holds at least
 * slow_work_min_threads threads and may grow to slow_work_max_threads.
 *
 * vslow_work_proportion is the percentage of the pool's threads that may be
 * processing very slow work at any one time.
 */
static unsigned int slow_work_min_threads = 2;
static unsigned int slow_work_max_threads = 4;
static unsigned int vslow_work_proportion = 50;
David Howells12e22c52009-04-03 16:42:35 +010043
#ifdef CONFIG_SYSCTL
/* Range limits handed to the sysctl handlers through extra1/extra2 */
static const int slow_work_min_min_threads = 2;
static int slow_work_max_max_threads = SLOW_WORK_THREAD_LIMIT;
static const int slow_work_min_vslow = 1;
static const int slow_work_max_vslow = 99;

/*
 * Tunables for the thread pool.  The two thread-count entries bound each
 * other: min-threads uses the current maximum as its upper limit and
 * max-threads uses the current minimum as its lower limit.
 */
ctl_table slow_work_sysctls[] = {
	{
		/* minimum pool size */
		.ctl_name	= CTL_UNNUMBERED,
		.procname	= "min-threads",
		.data		= &slow_work_min_threads,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= slow_work_min_threads_sysctl,
		.extra1		= (void *) &slow_work_min_min_threads,
		.extra2		= &slow_work_max_threads,
	},
	{
		/* maximum pool size */
		.ctl_name	= CTL_UNNUMBERED,
		.procname	= "max-threads",
		.data		= &slow_work_max_threads,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= slow_work_max_threads_sysctl,
		.extra1		= &slow_work_min_threads,
		.extra2		= (void *) &slow_work_max_max_threads,
	},
	{
		/* percentage of threads allowed to run very slow work */
		.ctl_name	= CTL_UNNUMBERED,
		.procname	= "vslow-percentage",
		.data		= &vslow_work_proportion,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= proc_dointvec_minmax,
		.extra1		= (void *) &slow_work_min_vslow,
		.extra2		= (void *) &slow_work_max_vslow,
	},
	{ .ctl_name = 0 }	/* table terminator */
};
#endif
84
85/*
86 * The active state of the thread pool
87 */
David Howells07fe7cb2009-04-03 16:42:35 +010088static atomic_t slow_work_thread_count;
89static atomic_t vslow_work_executing_count;
90
David Howells109d9272009-04-03 16:42:35 +010091static bool slow_work_may_not_start_new_thread;
92static bool slow_work_cull; /* cull a thread due to lack of activity */
93static DEFINE_TIMER(slow_work_cull_timer, slow_work_cull_timeout, 0, 0);
94static DEFINE_TIMER(slow_work_oom_timer, slow_work_oom_timeout, 0, 0);
95static struct slow_work slow_work_new_thread; /* new thread starter */
96
David Howells07fe7cb2009-04-03 16:42:35 +010097/*
David Howells3d7a6412009-11-19 18:10:23 +000098 * slow work ID allocation (use slow_work_queue_lock)
99 */
100static DECLARE_BITMAP(slow_work_ids, SLOW_WORK_THREAD_LIMIT);
101
/*
 * Unregistration tracking to prevent put_ref() from disappearing during module
 * unload.  slow_work_thread_processing[] is indexed by thread ID and records
 * the owner of the item each thread is currently working on.
 */
#ifdef CONFIG_MODULES
static struct module *slow_work_thread_processing[SLOW_WORK_THREAD_LIMIT];
static struct module *slow_work_unreg_module;
static struct slow_work *slow_work_unreg_work_item;
static DECLARE_WAIT_QUEUE_HEAD(slow_work_unreg_wq);
static DEFINE_MUTEX(slow_work_unreg_sync_lock);
#endif
113
/*
 * Data for tracking currently executing items for indication through /proc.
 * Both arrays are indexed by slow work thread ID.
 */
#ifdef CONFIG_SLOW_WORK_PROC
struct slow_work *slow_work_execs[SLOW_WORK_THREAD_LIMIT];
pid_t slow_work_pids[SLOW_WORK_THREAD_LIMIT];
DEFINE_RWLOCK(slow_work_execs_lock);
#endif
122
/*
 * The queues of work items and the lock governing access to them.  These are
 * shared between all the CPUs.  It doesn't make sense to have per-CPU queues
 * as the number of threads bears no relation to the number of CPUs.
 *
 * There are two queues of work items: one for slow work items, and one for
 * very slow work items.
 */
LIST_HEAD(slow_work_queue);
LIST_HEAD(vslow_work_queue);
DEFINE_SPINLOCK(slow_work_queue_lock);

/*
 * One wait queue per work queue, pinged when a work item is placed on an
 * empty queue.  These allow work items that are hogging a thread by sleeping
 * in a way that could be deferred to yield their thread and enqueue
 * themselves.
 */
static DECLARE_WAIT_QUEUE_HEAD(slow_work_queue_waits_for_occupation);
static DECLARE_WAIT_QUEUE_HEAD(vslow_work_queue_waits_for_occupation);
143
144/*
David Howells07fe7cb2009-04-03 16:42:35 +0100145 * The thread controls. A variable used to signal to the threads that they
146 * should exit when the queue is empty, a waitqueue used by the threads to wait
147 * for signals, and a completion set by the last thread to exit.
148 */
149static bool slow_work_threads_should_exit;
150static DECLARE_WAIT_QUEUE_HEAD(slow_work_thread_wq);
151static DECLARE_COMPLETION(slow_work_last_thread_exited);
152
153/*
154 * The number of users of the thread pool and its lock. Whilst this is zero we
155 * have no threads hanging around, and when this reaches zero, we wait for all
156 * active or queued work items to complete and kill all the threads we do have.
157 */
158static int slow_work_user_count;
159static DEFINE_MUTEX(slow_work_user_lock);
160
Jens Axboe4d8bb2c2009-11-19 18:10:39 +0000161static inline int slow_work_get_ref(struct slow_work *work)
162{
163 if (work->ops->get_ref)
164 return work->ops->get_ref(work);
165
166 return 0;
167}
168
169static inline void slow_work_put_ref(struct slow_work *work)
170{
171 if (work->ops->put_ref)
172 work->ops->put_ref(work);
173}
174
David Howells07fe7cb2009-04-03 16:42:35 +0100175/*
176 * Calculate the maximum number of active threads in the pool that are
177 * permitted to process very slow work items.
178 *
179 * The answer is rounded up to at least 1, but may not equal or exceed the
180 * maximum number of the threads in the pool. This means we always have at
181 * least one thread that can process slow work items, and we always have at
182 * least one thread that won't get tied up doing so.
183 */
184static unsigned slow_work_calc_vsmax(void)
185{
186 unsigned vsmax;
187
188 vsmax = atomic_read(&slow_work_thread_count) * vslow_work_proportion;
189 vsmax /= 100;
190 vsmax = max(vsmax, 1U);
191 return min(vsmax, slow_work_max_threads - 1);
192}
193
194/*
195 * Attempt to execute stuff queued on a slow thread. Return true if we managed
196 * it, false if there was nothing to do.
197 */
David Howells8fba10a2009-11-19 18:10:51 +0000198static noinline bool slow_work_execute(int id)
David Howells07fe7cb2009-04-03 16:42:35 +0100199{
David Howells3d7a6412009-11-19 18:10:23 +0000200#ifdef CONFIG_MODULES
201 struct module *module;
202#endif
David Howells07fe7cb2009-04-03 16:42:35 +0100203 struct slow_work *work = NULL;
204 unsigned vsmax;
205 bool very_slow;
206
207 vsmax = slow_work_calc_vsmax();
208
David Howells109d9272009-04-03 16:42:35 +0100209 /* see if we can schedule a new thread to be started if we're not
210 * keeping up with the work */
211 if (!waitqueue_active(&slow_work_thread_wq) &&
212 (!list_empty(&slow_work_queue) || !list_empty(&vslow_work_queue)) &&
213 atomic_read(&slow_work_thread_count) < slow_work_max_threads &&
214 !slow_work_may_not_start_new_thread)
215 slow_work_enqueue(&slow_work_new_thread);
216
David Howells07fe7cb2009-04-03 16:42:35 +0100217 /* find something to execute */
218 spin_lock_irq(&slow_work_queue_lock);
219 if (!list_empty(&vslow_work_queue) &&
220 atomic_read(&vslow_work_executing_count) < vsmax) {
221 work = list_entry(vslow_work_queue.next,
222 struct slow_work, link);
223 if (test_and_set_bit_lock(SLOW_WORK_EXECUTING, &work->flags))
224 BUG();
225 list_del_init(&work->link);
226 atomic_inc(&vslow_work_executing_count);
227 very_slow = true;
228 } else if (!list_empty(&slow_work_queue)) {
229 work = list_entry(slow_work_queue.next,
230 struct slow_work, link);
231 if (test_and_set_bit_lock(SLOW_WORK_EXECUTING, &work->flags))
232 BUG();
233 list_del_init(&work->link);
234 very_slow = false;
235 } else {
236 very_slow = false; /* avoid the compiler warning */
237 }
David Howells3d7a6412009-11-19 18:10:23 +0000238
239#ifdef CONFIG_MODULES
240 if (work)
241 slow_work_thread_processing[id] = work->owner;
242#endif
David Howells8fba10a2009-11-19 18:10:51 +0000243 if (work) {
244 slow_work_mark_time(work);
245 slow_work_begin_exec(id, work);
246 }
David Howells3d7a6412009-11-19 18:10:23 +0000247
David Howells07fe7cb2009-04-03 16:42:35 +0100248 spin_unlock_irq(&slow_work_queue_lock);
249
250 if (!work)
251 return false;
252
253 if (!test_and_clear_bit(SLOW_WORK_PENDING, &work->flags))
254 BUG();
255
Jens Axboe01609502009-11-19 18:10:43 +0000256 /* don't execute if the work is in the process of being cancelled */
257 if (!test_bit(SLOW_WORK_CANCELLING, &work->flags))
258 work->ops->execute(work);
David Howells07fe7cb2009-04-03 16:42:35 +0100259
260 if (very_slow)
261 atomic_dec(&vslow_work_executing_count);
262 clear_bit_unlock(SLOW_WORK_EXECUTING, &work->flags);
263
Jens Axboe01609502009-11-19 18:10:43 +0000264 /* wake up anyone waiting for this work to be complete */
265 wake_up_bit(&work->flags, SLOW_WORK_EXECUTING);
266
David Howells8fba10a2009-11-19 18:10:51 +0000267 slow_work_end_exec(id, work);
268
David Howells07fe7cb2009-04-03 16:42:35 +0100269 /* if someone tried to enqueue the item whilst we were executing it,
270 * then it'll be left unenqueued to avoid multiple threads trying to
271 * execute it simultaneously
272 *
273 * there is, however, a race between us testing the pending flag and
274 * getting the spinlock, and between the enqueuer setting the pending
275 * flag and getting the spinlock, so we use a deferral bit to tell us
276 * if the enqueuer got there first
277 */
278 if (test_bit(SLOW_WORK_PENDING, &work->flags)) {
279 spin_lock_irq(&slow_work_queue_lock);
280
281 if (!test_bit(SLOW_WORK_EXECUTING, &work->flags) &&
282 test_and_clear_bit(SLOW_WORK_ENQ_DEFERRED, &work->flags))
283 goto auto_requeue;
284
285 spin_unlock_irq(&slow_work_queue_lock);
286 }
287
David Howells3d7a6412009-11-19 18:10:23 +0000288 /* sort out the race between module unloading and put_ref() */
Jens Axboe4d8bb2c2009-11-19 18:10:39 +0000289 slow_work_put_ref(work);
David Howells3d7a6412009-11-19 18:10:23 +0000290
291#ifdef CONFIG_MODULES
292 module = slow_work_thread_processing[id];
293 slow_work_thread_processing[id] = NULL;
294 smp_mb();
295 if (slow_work_unreg_work_item == work ||
296 slow_work_unreg_module == module)
297 wake_up_all(&slow_work_unreg_wq);
298#endif
299
David Howells07fe7cb2009-04-03 16:42:35 +0100300 return true;
301
302auto_requeue:
303 /* we must complete the enqueue operation
304 * - we transfer our ref on the item back to the appropriate queue
305 * - don't wake another thread up as we're awake already
306 */
David Howells8fba10a2009-11-19 18:10:51 +0000307 slow_work_mark_time(work);
David Howells07fe7cb2009-04-03 16:42:35 +0100308 if (test_bit(SLOW_WORK_VERY_SLOW, &work->flags))
309 list_add_tail(&work->link, &vslow_work_queue);
310 else
311 list_add_tail(&work->link, &slow_work_queue);
312 spin_unlock_irq(&slow_work_queue_lock);
David Howells3d7a6412009-11-19 18:10:23 +0000313 slow_work_thread_processing[id] = NULL;
David Howells07fe7cb2009-04-03 16:42:35 +0100314 return true;
315}
316
317/**
David Howells3bde31a2009-11-19 18:10:57 +0000318 * slow_work_sleep_till_thread_needed - Sleep till thread needed by other work
319 * work: The work item under execution that wants to sleep
320 * _timeout: Scheduler sleep timeout
321 *
322 * Allow a requeueable work item to sleep on a slow-work processor thread until
323 * that thread is needed to do some other work or the sleep is interrupted by
324 * some other event.
325 *
326 * The caller must set up a wake up event before calling this and must have set
327 * the appropriate sleep mode (such as TASK_UNINTERRUPTIBLE) and tested its own
328 * condition before calling this function as no test is made here.
329 *
330 * False is returned if there is nothing on the queue; true is returned if the
331 * work item should be requeued
332 */
333bool slow_work_sleep_till_thread_needed(struct slow_work *work,
334 signed long *_timeout)
335{
336 wait_queue_head_t *wfo_wq;
337 struct list_head *queue;
338
339 DEFINE_WAIT(wait);
340
341 if (test_bit(SLOW_WORK_VERY_SLOW, &work->flags)) {
342 wfo_wq = &vslow_work_queue_waits_for_occupation;
343 queue = &vslow_work_queue;
344 } else {
345 wfo_wq = &slow_work_queue_waits_for_occupation;
346 queue = &slow_work_queue;
347 }
348
349 if (!list_empty(queue))
350 return true;
351
352 add_wait_queue_exclusive(wfo_wq, &wait);
353 if (list_empty(queue))
354 *_timeout = schedule_timeout(*_timeout);
355 finish_wait(wfo_wq, &wait);
356
357 return !list_empty(queue);
358}
359EXPORT_SYMBOL(slow_work_sleep_till_thread_needed);
360
361/**
David Howells07fe7cb2009-04-03 16:42:35 +0100362 * slow_work_enqueue - Schedule a slow work item for processing
363 * @work: The work item to queue
364 *
365 * Schedule a slow work item for processing. If the item is already undergoing
366 * execution, this guarantees not to re-enter the execution routine until the
367 * first execution finishes.
368 *
369 * The item is pinned by this function as it retains a reference to it, managed
370 * through the item operations. The item is unpinned once it has been
371 * executed.
372 *
373 * An item may hog the thread that is running it for a relatively large amount
374 * of time, sufficient, for example, to perform several lookup, mkdir, create
375 * and setxattr operations. It may sleep on I/O and may sleep to obtain locks.
376 *
377 * Conversely, if a number of items are awaiting processing, it may take some
378 * time before any given item is given attention. The number of threads in the
379 * pool may be increased to deal with demand, but only up to a limit.
380 *
381 * If SLOW_WORK_VERY_SLOW is set on the work item, then it will be placed in
382 * the very slow queue, from which only a portion of the threads will be
383 * allowed to pick items to execute. This ensures that very slow items won't
384 * overly block ones that are just ordinarily slow.
385 *
Jens Axboe01609502009-11-19 18:10:43 +0000386 * Returns 0 if successful, -EAGAIN if not (or -ECANCELED if cancelled work is
387 * attempted queued)
David Howells07fe7cb2009-04-03 16:42:35 +0100388 */
389int slow_work_enqueue(struct slow_work *work)
390{
David Howells3bde31a2009-11-19 18:10:57 +0000391 wait_queue_head_t *wfo_wq;
392 struct list_head *queue;
David Howells07fe7cb2009-04-03 16:42:35 +0100393 unsigned long flags;
Jens Axboe01609502009-11-19 18:10:43 +0000394 int ret;
395
396 if (test_bit(SLOW_WORK_CANCELLING, &work->flags))
397 return -ECANCELED;
David Howells07fe7cb2009-04-03 16:42:35 +0100398
399 BUG_ON(slow_work_user_count <= 0);
400 BUG_ON(!work);
401 BUG_ON(!work->ops);
David Howells07fe7cb2009-04-03 16:42:35 +0100402
403 /* when honouring an enqueue request, we only promise that we will run
404 * the work function in the future; we do not promise to run it once
405 * per enqueue request
406 *
407 * we use the PENDING bit to merge together repeat requests without
408 * having to disable IRQs and take the spinlock, whilst still
409 * maintaining our promise
410 */
411 if (!test_and_set_bit_lock(SLOW_WORK_PENDING, &work->flags)) {
David Howells3bde31a2009-11-19 18:10:57 +0000412 if (test_bit(SLOW_WORK_VERY_SLOW, &work->flags)) {
413 wfo_wq = &vslow_work_queue_waits_for_occupation;
414 queue = &vslow_work_queue;
415 } else {
416 wfo_wq = &slow_work_queue_waits_for_occupation;
417 queue = &slow_work_queue;
418 }
419
David Howells07fe7cb2009-04-03 16:42:35 +0100420 spin_lock_irqsave(&slow_work_queue_lock, flags);
421
Jens Axboe01609502009-11-19 18:10:43 +0000422 if (unlikely(test_bit(SLOW_WORK_CANCELLING, &work->flags)))
423 goto cancelled;
424
David Howells07fe7cb2009-04-03 16:42:35 +0100425 /* we promise that we will not attempt to execute the work
426 * function in more than one thread simultaneously
427 *
428 * this, however, leaves us with a problem if we're asked to
429 * enqueue the work whilst someone is executing the work
430 * function as simply queueing the work immediately means that
431 * another thread may try executing it whilst it is already
432 * under execution
433 *
434 * to deal with this, we set the ENQ_DEFERRED bit instead of
435 * enqueueing, and the thread currently executing the work
436 * function will enqueue the work item when the work function
437 * returns and it has cleared the EXECUTING bit
438 */
439 if (test_bit(SLOW_WORK_EXECUTING, &work->flags)) {
440 set_bit(SLOW_WORK_ENQ_DEFERRED, &work->flags);
441 } else {
Jens Axboe01609502009-11-19 18:10:43 +0000442 ret = slow_work_get_ref(work);
443 if (ret < 0)
444 goto failed;
David Howells8fba10a2009-11-19 18:10:51 +0000445 slow_work_mark_time(work);
David Howells3bde31a2009-11-19 18:10:57 +0000446 list_add_tail(&work->link, queue);
David Howells07fe7cb2009-04-03 16:42:35 +0100447 wake_up(&slow_work_thread_wq);
David Howells3bde31a2009-11-19 18:10:57 +0000448
449 /* if someone who could be requeued is sleeping on a
450 * thread, then ask them to yield their thread */
451 if (work->link.prev == queue)
452 wake_up(wfo_wq);
David Howells07fe7cb2009-04-03 16:42:35 +0100453 }
454
455 spin_unlock_irqrestore(&slow_work_queue_lock, flags);
456 }
457 return 0;
458
Jens Axboe01609502009-11-19 18:10:43 +0000459cancelled:
460 ret = -ECANCELED;
461failed:
David Howells07fe7cb2009-04-03 16:42:35 +0100462 spin_unlock_irqrestore(&slow_work_queue_lock, flags);
Jens Axboe01609502009-11-19 18:10:43 +0000463 return ret;
David Howells07fe7cb2009-04-03 16:42:35 +0100464}
465EXPORT_SYMBOL(slow_work_enqueue);
466
/*
 * Sleep action passed to wait_on_bit() by slow_work_cancel(): yield the CPU
 * and report success (0) once rescheduled.  @word is not used.
 */
static int slow_work_wait(void *word)
{
	schedule();

	return 0;
}
472
473/**
474 * slow_work_cancel - Cancel a slow work item
475 * @work: The work item to cancel
476 *
477 * This function will cancel a previously enqueued work item. If we cannot
478 * cancel the work item, it is guarenteed to have run when this function
479 * returns.
480 */
481void slow_work_cancel(struct slow_work *work)
482{
483 bool wait = true, put = false;
484
485 set_bit(SLOW_WORK_CANCELLING, &work->flags);
Jens Axboe6b8268b2009-11-19 18:10:47 +0000486 smp_mb();
487
488 /* if the work item is a delayed work item with an active timer, we
489 * need to wait for the timer to finish _before_ getting the spinlock,
490 * lest we deadlock against the timer routine
491 *
492 * the timer routine will leave DELAYED set if it notices the
493 * CANCELLING flag in time
494 */
495 if (test_bit(SLOW_WORK_DELAYED, &work->flags)) {
496 struct delayed_slow_work *dwork =
497 container_of(work, struct delayed_slow_work, work);
498 del_timer_sync(&dwork->timer);
499 }
Jens Axboe01609502009-11-19 18:10:43 +0000500
501 spin_lock_irq(&slow_work_queue_lock);
502
Jens Axboe6b8268b2009-11-19 18:10:47 +0000503 if (test_bit(SLOW_WORK_DELAYED, &work->flags)) {
504 /* the timer routine aborted or never happened, so we are left
505 * holding the timer's reference on the item and should just
506 * drop the pending flag and wait for any ongoing execution to
507 * finish */
508 struct delayed_slow_work *dwork =
509 container_of(work, struct delayed_slow_work, work);
510
511 BUG_ON(timer_pending(&dwork->timer));
512 BUG_ON(!list_empty(&work->link));
513
514 clear_bit(SLOW_WORK_DELAYED, &work->flags);
515 put = true;
516 clear_bit(SLOW_WORK_PENDING, &work->flags);
517
518 } else if (test_bit(SLOW_WORK_PENDING, &work->flags) &&
519 !list_empty(&work->link)) {
Jens Axboe01609502009-11-19 18:10:43 +0000520 /* the link in the pending queue holds a reference on the item
521 * that we will need to release */
522 list_del_init(&work->link);
523 wait = false;
524 put = true;
525 clear_bit(SLOW_WORK_PENDING, &work->flags);
526
527 } else if (test_and_clear_bit(SLOW_WORK_ENQ_DEFERRED, &work->flags)) {
528 /* the executor is holding our only reference on the item, so
529 * we merely need to wait for it to finish executing */
530 clear_bit(SLOW_WORK_PENDING, &work->flags);
531 }
532
533 spin_unlock_irq(&slow_work_queue_lock);
534
535 /* the EXECUTING flag is set by the executor whilst the spinlock is set
536 * and before the item is dequeued - so assuming the above doesn't
537 * actually dequeue it, simply waiting for the EXECUTING flag to be
538 * released here should be sufficient */
539 if (wait)
540 wait_on_bit(&work->flags, SLOW_WORK_EXECUTING, slow_work_wait,
541 TASK_UNINTERRUPTIBLE);
542
543 clear_bit(SLOW_WORK_CANCELLING, &work->flags);
544 if (put)
545 slow_work_put_ref(work);
546}
547EXPORT_SYMBOL(slow_work_cancel);
548
David Howells07fe7cb2009-04-03 16:42:35 +0100549/*
Jens Axboe6b8268b2009-11-19 18:10:47 +0000550 * Handle expiry of the delay timer, indicating that a delayed slow work item
551 * should now be queued if not cancelled
552 */
553static void delayed_slow_work_timer(unsigned long data)
554{
David Howells3bde31a2009-11-19 18:10:57 +0000555 wait_queue_head_t *wfo_wq;
556 struct list_head *queue;
Jens Axboe6b8268b2009-11-19 18:10:47 +0000557 struct slow_work *work = (struct slow_work *) data;
558 unsigned long flags;
David Howells3bde31a2009-11-19 18:10:57 +0000559 bool queued = false, put = false, first = false;
560
561 if (test_bit(SLOW_WORK_VERY_SLOW, &work->flags)) {
562 wfo_wq = &vslow_work_queue_waits_for_occupation;
563 queue = &vslow_work_queue;
564 } else {
565 wfo_wq = &slow_work_queue_waits_for_occupation;
566 queue = &slow_work_queue;
567 }
Jens Axboe6b8268b2009-11-19 18:10:47 +0000568
569 spin_lock_irqsave(&slow_work_queue_lock, flags);
570 if (likely(!test_bit(SLOW_WORK_CANCELLING, &work->flags))) {
571 clear_bit(SLOW_WORK_DELAYED, &work->flags);
572
573 if (test_bit(SLOW_WORK_EXECUTING, &work->flags)) {
574 /* we discard the reference the timer was holding in
575 * favour of the one the executor holds */
576 set_bit(SLOW_WORK_ENQ_DEFERRED, &work->flags);
577 put = true;
578 } else {
David Howells8fba10a2009-11-19 18:10:51 +0000579 slow_work_mark_time(work);
David Howells3bde31a2009-11-19 18:10:57 +0000580 list_add_tail(&work->link, queue);
Jens Axboe6b8268b2009-11-19 18:10:47 +0000581 queued = true;
David Howells3bde31a2009-11-19 18:10:57 +0000582 if (work->link.prev == queue)
583 first = true;
Jens Axboe6b8268b2009-11-19 18:10:47 +0000584 }
585 }
586
587 spin_unlock_irqrestore(&slow_work_queue_lock, flags);
588 if (put)
589 slow_work_put_ref(work);
David Howells3bde31a2009-11-19 18:10:57 +0000590 if (first)
591 wake_up(wfo_wq);
Jens Axboe6b8268b2009-11-19 18:10:47 +0000592 if (queued)
593 wake_up(&slow_work_thread_wq);
594}
595
596/**
597 * delayed_slow_work_enqueue - Schedule a delayed slow work item for processing
598 * @dwork: The delayed work item to queue
599 * @delay: When to start executing the work, in jiffies from now
600 *
601 * This is similar to slow_work_enqueue(), but it adds a delay before the work
602 * is actually queued for processing.
603 *
604 * The item can have delayed processing requested on it whilst it is being
605 * executed. The delay will begin immediately, and if it expires before the
606 * item finishes executing, the item will be placed back on the queue when it
607 * has done executing.
608 */
609int delayed_slow_work_enqueue(struct delayed_slow_work *dwork,
610 unsigned long delay)
611{
612 struct slow_work *work = &dwork->work;
613 unsigned long flags;
614 int ret;
615
616 if (delay == 0)
617 return slow_work_enqueue(&dwork->work);
618
619 BUG_ON(slow_work_user_count <= 0);
620 BUG_ON(!work);
621 BUG_ON(!work->ops);
622
623 if (test_bit(SLOW_WORK_CANCELLING, &work->flags))
624 return -ECANCELED;
625
626 if (!test_and_set_bit_lock(SLOW_WORK_PENDING, &work->flags)) {
627 spin_lock_irqsave(&slow_work_queue_lock, flags);
628
629 if (test_bit(SLOW_WORK_CANCELLING, &work->flags))
630 goto cancelled;
631
632 /* the timer holds a reference whilst it is pending */
633 ret = work->ops->get_ref(work);
634 if (ret < 0)
635 goto cant_get_ref;
636
637 if (test_and_set_bit(SLOW_WORK_DELAYED, &work->flags))
638 BUG();
639 dwork->timer.expires = jiffies + delay;
640 dwork->timer.data = (unsigned long) work;
641 dwork->timer.function = delayed_slow_work_timer;
642 add_timer(&dwork->timer);
643
644 spin_unlock_irqrestore(&slow_work_queue_lock, flags);
645 }
646
647 return 0;
648
649cancelled:
650 ret = -ECANCELED;
651cant_get_ref:
652 spin_unlock_irqrestore(&slow_work_queue_lock, flags);
653 return ret;
654}
655EXPORT_SYMBOL(delayed_slow_work_enqueue);
656
657/*
Chris Peterson009789f2009-06-16 15:33:43 -0700658 * Schedule a cull of the thread pool at some time in the near future
659 */
660static void slow_work_schedule_cull(void)
661{
662 mod_timer(&slow_work_cull_timer,
663 round_jiffies(jiffies + SLOW_WORK_CULL_TIMEOUT));
664}
665
666/*
David Howells109d9272009-04-03 16:42:35 +0100667 * Worker thread culling algorithm
668 */
669static bool slow_work_cull_thread(void)
670{
671 unsigned long flags;
672 bool do_cull = false;
673
674 spin_lock_irqsave(&slow_work_queue_lock, flags);
675
676 if (slow_work_cull) {
677 slow_work_cull = false;
678
679 if (list_empty(&slow_work_queue) &&
680 list_empty(&vslow_work_queue) &&
681 atomic_read(&slow_work_thread_count) >
682 slow_work_min_threads) {
Chris Peterson009789f2009-06-16 15:33:43 -0700683 slow_work_schedule_cull();
David Howells109d9272009-04-03 16:42:35 +0100684 do_cull = true;
685 }
686 }
687
688 spin_unlock_irqrestore(&slow_work_queue_lock, flags);
689 return do_cull;
690}
691
692/*
David Howells07fe7cb2009-04-03 16:42:35 +0100693 * Determine if there is slow work available for dispatch
694 */
695static inline bool slow_work_available(int vsmax)
696{
697 return !list_empty(&slow_work_queue) ||
698 (!list_empty(&vslow_work_queue) &&
699 atomic_read(&vslow_work_executing_count) < vsmax);
700}
701
702/*
703 * Worker thread dispatcher
704 */
705static int slow_work_thread(void *_data)
706{
David Howells3d7a6412009-11-19 18:10:23 +0000707 int vsmax, id;
David Howells07fe7cb2009-04-03 16:42:35 +0100708
709 DEFINE_WAIT(wait);
710
711 set_freezable();
712 set_user_nice(current, -5);
713
David Howells3d7a6412009-11-19 18:10:23 +0000714 /* allocate ourselves an ID */
715 spin_lock_irq(&slow_work_queue_lock);
716 id = find_first_zero_bit(slow_work_ids, SLOW_WORK_THREAD_LIMIT);
717 BUG_ON(id < 0 || id >= SLOW_WORK_THREAD_LIMIT);
718 __set_bit(id, slow_work_ids);
David Howells8fba10a2009-11-19 18:10:51 +0000719 slow_work_set_thread_pid(id, current->pid);
David Howells3d7a6412009-11-19 18:10:23 +0000720 spin_unlock_irq(&slow_work_queue_lock);
721
722 sprintf(current->comm, "kslowd%03u", id);
723
David Howells07fe7cb2009-04-03 16:42:35 +0100724 for (;;) {
725 vsmax = vslow_work_proportion;
726 vsmax *= atomic_read(&slow_work_thread_count);
727 vsmax /= 100;
728
Oleg Nesterovb415c492009-06-11 13:12:55 +0100729 prepare_to_wait_exclusive(&slow_work_thread_wq, &wait,
730 TASK_INTERRUPTIBLE);
David Howells07fe7cb2009-04-03 16:42:35 +0100731 if (!freezing(current) &&
732 !slow_work_threads_should_exit &&
David Howells109d9272009-04-03 16:42:35 +0100733 !slow_work_available(vsmax) &&
734 !slow_work_cull)
David Howells07fe7cb2009-04-03 16:42:35 +0100735 schedule();
736 finish_wait(&slow_work_thread_wq, &wait);
737
738 try_to_freeze();
739
740 vsmax = vslow_work_proportion;
741 vsmax *= atomic_read(&slow_work_thread_count);
742 vsmax /= 100;
743
David Howells3d7a6412009-11-19 18:10:23 +0000744 if (slow_work_available(vsmax) && slow_work_execute(id)) {
David Howells07fe7cb2009-04-03 16:42:35 +0100745 cond_resched();
David Howells109d9272009-04-03 16:42:35 +0100746 if (list_empty(&slow_work_queue) &&
747 list_empty(&vslow_work_queue) &&
748 atomic_read(&slow_work_thread_count) >
749 slow_work_min_threads)
Chris Peterson009789f2009-06-16 15:33:43 -0700750 slow_work_schedule_cull();
David Howells07fe7cb2009-04-03 16:42:35 +0100751 continue;
752 }
753
754 if (slow_work_threads_should_exit)
755 break;
David Howells109d9272009-04-03 16:42:35 +0100756
757 if (slow_work_cull && slow_work_cull_thread())
758 break;
David Howells07fe7cb2009-04-03 16:42:35 +0100759 }
760
David Howells3d7a6412009-11-19 18:10:23 +0000761 spin_lock_irq(&slow_work_queue_lock);
David Howells8fba10a2009-11-19 18:10:51 +0000762 slow_work_set_thread_pid(id, 0);
David Howells3d7a6412009-11-19 18:10:23 +0000763 __clear_bit(id, slow_work_ids);
764 spin_unlock_irq(&slow_work_queue_lock);
765
David Howells07fe7cb2009-04-03 16:42:35 +0100766 if (atomic_dec_and_test(&slow_work_thread_count))
767 complete_and_exit(&slow_work_last_thread_exited, 0);
768 return 0;
769}
770
David Howells109d9272009-04-03 16:42:35 +0100771/*
772 * Handle thread cull timer expiration
773 */
774static void slow_work_cull_timeout(unsigned long data)
775{
776 slow_work_cull = true;
777 wake_up(&slow_work_thread_wq);
778}
779
780/*
David Howells109d9272009-04-03 16:42:35 +0100781 * Start a new slow work thread
782 */
783static void slow_work_new_thread_execute(struct slow_work *work)
784{
785 struct task_struct *p;
786
787 if (slow_work_threads_should_exit)
788 return;
789
790 if (atomic_read(&slow_work_thread_count) >= slow_work_max_threads)
791 return;
792
793 if (!mutex_trylock(&slow_work_user_lock))
794 return;
795
796 slow_work_may_not_start_new_thread = true;
797 atomic_inc(&slow_work_thread_count);
798 p = kthread_run(slow_work_thread, NULL, "kslowd");
799 if (IS_ERR(p)) {
800 printk(KERN_DEBUG "Slow work thread pool: OOM\n");
801 if (atomic_dec_and_test(&slow_work_thread_count))
802 BUG(); /* we're running on a slow work thread... */
803 mod_timer(&slow_work_oom_timer,
Chris Peterson009789f2009-06-16 15:33:43 -0700804 round_jiffies(jiffies + SLOW_WORK_OOM_TIMEOUT));
David Howells109d9272009-04-03 16:42:35 +0100805 } else {
806 /* ratelimit the starting of new threads */
807 mod_timer(&slow_work_oom_timer, jiffies + 1);
808 }
809
810 mutex_unlock(&slow_work_user_lock);
811}
812
813static const struct slow_work_ops slow_work_new_thread_ops = {
David Howells3d7a6412009-11-19 18:10:23 +0000814 .owner = THIS_MODULE,
David Howells109d9272009-04-03 16:42:35 +0100815 .execute = slow_work_new_thread_execute,
David Howells8fba10a2009-11-19 18:10:51 +0000816#ifdef CONFIG_SLOW_WORK_PROC
817 .desc = slow_work_new_thread_desc,
818#endif
David Howells109d9272009-04-03 16:42:35 +0100819};
820
821/*
822 * post-OOM new thread start suppression expiration
823 */
824static void slow_work_oom_timeout(unsigned long data)
825{
826 slow_work_may_not_start_new_thread = false;
827}
828
David Howells12e22c52009-04-03 16:42:35 +0100829#ifdef CONFIG_SYSCTL
830/*
831 * Handle adjustment of the minimum number of threads
832 */
833static int slow_work_min_threads_sysctl(struct ctl_table *table, int write,
Alexey Dobriyan8d65af72009-09-23 15:57:19 -0700834 void __user *buffer,
David Howells12e22c52009-04-03 16:42:35 +0100835 size_t *lenp, loff_t *ppos)
836{
Alexey Dobriyan8d65af72009-09-23 15:57:19 -0700837 int ret = proc_dointvec_minmax(table, write, buffer, lenp, ppos);
David Howells12e22c52009-04-03 16:42:35 +0100838 int n;
839
840 if (ret == 0) {
841 mutex_lock(&slow_work_user_lock);
842 if (slow_work_user_count > 0) {
843 /* see if we need to start or stop threads */
844 n = atomic_read(&slow_work_thread_count) -
845 slow_work_min_threads;
846
847 if (n < 0 && !slow_work_may_not_start_new_thread)
848 slow_work_enqueue(&slow_work_new_thread);
849 else if (n > 0)
Chris Peterson009789f2009-06-16 15:33:43 -0700850 slow_work_schedule_cull();
David Howells12e22c52009-04-03 16:42:35 +0100851 }
852 mutex_unlock(&slow_work_user_lock);
853 }
854
855 return ret;
856}
857
858/*
859 * Handle adjustment of the maximum number of threads
860 */
861static int slow_work_max_threads_sysctl(struct ctl_table *table, int write,
Alexey Dobriyan8d65af72009-09-23 15:57:19 -0700862 void __user *buffer,
David Howells12e22c52009-04-03 16:42:35 +0100863 size_t *lenp, loff_t *ppos)
864{
Alexey Dobriyan8d65af72009-09-23 15:57:19 -0700865 int ret = proc_dointvec_minmax(table, write, buffer, lenp, ppos);
David Howells12e22c52009-04-03 16:42:35 +0100866 int n;
867
868 if (ret == 0) {
869 mutex_lock(&slow_work_user_lock);
870 if (slow_work_user_count > 0) {
871 /* see if we need to stop threads */
872 n = slow_work_max_threads -
873 atomic_read(&slow_work_thread_count);
874
875 if (n < 0)
Chris Peterson009789f2009-06-16 15:33:43 -0700876 slow_work_schedule_cull();
David Howells12e22c52009-04-03 16:42:35 +0100877 }
878 mutex_unlock(&slow_work_user_lock);
879 }
880
881 return ret;
882}
883#endif /* CONFIG_SYSCTL */
884
David Howells07fe7cb2009-04-03 16:42:35 +0100885/**
886 * slow_work_register_user - Register a user of the facility
David Howells3d7a6412009-11-19 18:10:23 +0000887 * @module: The module about to make use of the facility
David Howells07fe7cb2009-04-03 16:42:35 +0100888 *
889 * Register a user of the facility, starting up the initial threads if there
890 * aren't any other users at this point. This will return 0 if successful, or
891 * an error if not.
892 */
David Howells3d7a6412009-11-19 18:10:23 +0000893int slow_work_register_user(struct module *module)
David Howells07fe7cb2009-04-03 16:42:35 +0100894{
895 struct task_struct *p;
896 int loop;
897
898 mutex_lock(&slow_work_user_lock);
899
900 if (slow_work_user_count == 0) {
901 printk(KERN_NOTICE "Slow work thread pool: Starting up\n");
902 init_completion(&slow_work_last_thread_exited);
903
904 slow_work_threads_should_exit = false;
David Howells109d9272009-04-03 16:42:35 +0100905 slow_work_init(&slow_work_new_thread,
906 &slow_work_new_thread_ops);
907 slow_work_may_not_start_new_thread = false;
908 slow_work_cull = false;
David Howells07fe7cb2009-04-03 16:42:35 +0100909
910 /* start the minimum number of threads */
911 for (loop = 0; loop < slow_work_min_threads; loop++) {
912 atomic_inc(&slow_work_thread_count);
913 p = kthread_run(slow_work_thread, NULL, "kslowd");
914 if (IS_ERR(p))
915 goto error;
916 }
917 printk(KERN_NOTICE "Slow work thread pool: Ready\n");
918 }
919
920 slow_work_user_count++;
921 mutex_unlock(&slow_work_user_lock);
922 return 0;
923
924error:
925 if (atomic_dec_and_test(&slow_work_thread_count))
926 complete(&slow_work_last_thread_exited);
927 if (loop > 0) {
928 printk(KERN_ERR "Slow work thread pool:"
929 " Aborting startup on ENOMEM\n");
930 slow_work_threads_should_exit = true;
931 wake_up_all(&slow_work_thread_wq);
932 wait_for_completion(&slow_work_last_thread_exited);
933 printk(KERN_ERR "Slow work thread pool: Aborted\n");
934 }
935 mutex_unlock(&slow_work_user_lock);
936 return PTR_ERR(p);
937}
938EXPORT_SYMBOL(slow_work_register_user);
939
David Howells3d7a6412009-11-19 18:10:23 +0000940/*
941 * wait for all outstanding items from the calling module to complete
942 * - note that more items may be queued whilst we're waiting
943 */
944static void slow_work_wait_for_items(struct module *module)
945{
946 DECLARE_WAITQUEUE(myself, current);
947 struct slow_work *work;
948 int loop;
949
950 mutex_lock(&slow_work_unreg_sync_lock);
951 add_wait_queue(&slow_work_unreg_wq, &myself);
952
953 for (;;) {
954 spin_lock_irq(&slow_work_queue_lock);
955
956 /* first of all, we wait for the last queued item in each list
957 * to be processed */
958 list_for_each_entry_reverse(work, &vslow_work_queue, link) {
959 if (work->owner == module) {
960 set_current_state(TASK_UNINTERRUPTIBLE);
961 slow_work_unreg_work_item = work;
962 goto do_wait;
963 }
964 }
965 list_for_each_entry_reverse(work, &slow_work_queue, link) {
966 if (work->owner == module) {
967 set_current_state(TASK_UNINTERRUPTIBLE);
968 slow_work_unreg_work_item = work;
969 goto do_wait;
970 }
971 }
972
973 /* then we wait for the items being processed to finish */
974 slow_work_unreg_module = module;
975 smp_mb();
976 for (loop = 0; loop < SLOW_WORK_THREAD_LIMIT; loop++) {
977 if (slow_work_thread_processing[loop] == module)
978 goto do_wait;
979 }
980 spin_unlock_irq(&slow_work_queue_lock);
981 break; /* okay, we're done */
982
983 do_wait:
984 spin_unlock_irq(&slow_work_queue_lock);
985 schedule();
986 slow_work_unreg_work_item = NULL;
987 slow_work_unreg_module = NULL;
988 }
989
990 remove_wait_queue(&slow_work_unreg_wq, &myself);
991 mutex_unlock(&slow_work_unreg_sync_lock);
992}
993
David Howells07fe7cb2009-04-03 16:42:35 +0100994/**
995 * slow_work_unregister_user - Unregister a user of the facility
David Howells3d7a6412009-11-19 18:10:23 +0000996 * @module: The module whose items should be cleared
David Howells07fe7cb2009-04-03 16:42:35 +0100997 *
998 * Unregister a user of the facility, killing all the threads if this was the
999 * last one.
David Howells3d7a6412009-11-19 18:10:23 +00001000 *
1001 * This waits for all the work items belonging to the nominated module to go
1002 * away before proceeding.
David Howells07fe7cb2009-04-03 16:42:35 +01001003 */
David Howells3d7a6412009-11-19 18:10:23 +00001004void slow_work_unregister_user(struct module *module)
David Howells07fe7cb2009-04-03 16:42:35 +01001005{
David Howells3d7a6412009-11-19 18:10:23 +00001006 /* first of all, wait for all outstanding items from the calling module
1007 * to complete */
1008 if (module)
1009 slow_work_wait_for_items(module);
1010
1011 /* then we can actually go about shutting down the facility if need
1012 * be */
David Howells07fe7cb2009-04-03 16:42:35 +01001013 mutex_lock(&slow_work_user_lock);
1014
1015 BUG_ON(slow_work_user_count <= 0);
1016
1017 slow_work_user_count--;
1018 if (slow_work_user_count == 0) {
1019 printk(KERN_NOTICE "Slow work thread pool: Shutting down\n");
1020 slow_work_threads_should_exit = true;
Jonathan Corbet418df63c2009-04-22 12:01:49 +01001021 del_timer_sync(&slow_work_cull_timer);
1022 del_timer_sync(&slow_work_oom_timer);
David Howells07fe7cb2009-04-03 16:42:35 +01001023 wake_up_all(&slow_work_thread_wq);
1024 wait_for_completion(&slow_work_last_thread_exited);
1025 printk(KERN_NOTICE "Slow work thread pool:"
1026 " Shut down complete\n");
1027 }
1028
David Howells07fe7cb2009-04-03 16:42:35 +01001029 mutex_unlock(&slow_work_user_lock);
1030}
1031EXPORT_SYMBOL(slow_work_unregister_user);
1032
1033/*
1034 * Initialise the slow work facility
1035 */
1036static int __init init_slow_work(void)
1037{
1038 unsigned nr_cpus = num_possible_cpus();
1039
David Howells12e22c52009-04-03 16:42:35 +01001040 if (slow_work_max_threads < nr_cpus)
David Howells07fe7cb2009-04-03 16:42:35 +01001041 slow_work_max_threads = nr_cpus;
David Howells12e22c52009-04-03 16:42:35 +01001042#ifdef CONFIG_SYSCTL
1043 if (slow_work_max_max_threads < nr_cpus * 2)
1044 slow_work_max_max_threads = nr_cpus * 2;
1045#endif
David Howells8fba10a2009-11-19 18:10:51 +00001046#ifdef CONFIG_SLOW_WORK_PROC
1047 proc_create("slow_work_rq", S_IFREG | 0400, NULL,
1048 &slow_work_runqueue_fops);
1049#endif
David Howells07fe7cb2009-04-03 16:42:35 +01001050 return 0;
1051}
1052
1053subsys_initcall(init_slow_work);