// SPDX-License-Identifier: GPL-2.0
/*
 * Basic worker thread pool for io_uring
 *
 * Copyright (C) 2019 Jens Axboe
 *
 */
#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/errno.h>
#include <linux/sched/signal.h>
#include <linux/mm.h>
#include <linux/mmu_context.h>
#include <linux/sched/mm.h>
#include <linux/percpu.h>
#include <linux/slab.h>
#include <linux/kthread.h>
#include <linux/rculist_nulls.h>

#include "io-wq.h"

#define WORKER_IDLE_TIMEOUT	(5 * HZ)

enum {
	IO_WORKER_F_UP		= 1,	/* up and active */
	IO_WORKER_F_RUNNING	= 2,	/* account as running */
	IO_WORKER_F_FREE	= 4,	/* worker on free list */
	IO_WORKER_F_EXITING	= 8,	/* worker exiting */
	IO_WORKER_F_FIXED	= 16,	/* static idle worker */
};

enum {
	IO_WQ_BIT_EXIT		= 0,	/* wq exiting */
	IO_WQ_BIT_CANCEL	= 1,	/* cancel work on list */
};

enum {
	IO_WQE_FLAG_STALLED	= 1,	/* stalled on hash */
};

/*
 * One for each thread in a wqe pool
 */
struct io_worker {
	refcount_t ref;
	unsigned flags;
	struct hlist_nulls_node nulls_node;
	struct task_struct *task;
	wait_queue_head_t wait;
	struct io_wqe *wqe;
	struct io_wq_work *cur_work;

	struct rcu_head rcu;
	struct mm_struct *mm;
	struct files_struct *restore_files;
};

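/*
 * Each list carries a distinct nulls value (0 for the free list, 1 for
 * the busy list, set up in io_wq_create()). An RCU walker that ends up
 * on the wrong list because a worker moved while it was iterating sees
 * an unexpected nulls value and can restart, as io_wq_for_each_worker()
 * does below.
 */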
struct io_wq_nulls_list {
	struct hlist_nulls_head head;
	unsigned long nulls;
};

#if BITS_PER_LONG == 64
#define IO_WQ_HASH_ORDER	6
#else
#define IO_WQ_HASH_ORDER	5
#endif

/*
 * Per-node worker thread pool
 */
struct io_wqe {
	struct {
		spinlock_t lock;
		struct list_head work_list;
		unsigned long hash_map;
		unsigned flags;
	} ____cacheline_aligned_in_smp;

	int node;
	unsigned nr_workers;
	unsigned max_workers;
	atomic_t nr_running;

	struct io_wq_nulls_list free_list;
	struct io_wq_nulls_list busy_list;

	struct io_wq *wq;
};

/*
 * Per io_wq state
 */
struct io_wq {
	struct io_wqe **wqes;
	unsigned long state;
	unsigned nr_wqes;

	struct task_struct *manager;
	struct mm_struct *mm;
	refcount_t refs;
	struct completion done;
};

static bool io_worker_get(struct io_worker *worker)
{
	return refcount_inc_not_zero(&worker->ref);
}

static void io_worker_release(struct io_worker *worker)
{
	if (refcount_dec_and_test(&worker->ref))
		wake_up_process(worker->task);
}

/*
 * Note: drops the wqe->lock if returning true! The caller must re-acquire
 * the lock in that case. Some callers need to restart handling if this
 * happens, so we can't just re-acquire the lock on behalf of the caller.
 */
static bool __io_worker_unuse(struct io_wqe *wqe, struct io_worker *worker)
{
	bool dropped_lock = false;

	if (current->files != worker->restore_files) {
		__acquire(&wqe->lock);
		spin_unlock_irq(&wqe->lock);
		dropped_lock = true;

		task_lock(current);
		current->files = worker->restore_files;
		task_unlock(current);
	}

	/*
	 * If we have an active mm, we need to drop the wq lock before unusing
	 * it. If we do, return true and let the caller retry the idle loop.
	 */
	if (worker->mm) {
		if (!dropped_lock) {
			__acquire(&wqe->lock);
			spin_unlock_irq(&wqe->lock);
			dropped_lock = true;
		}
		__set_current_state(TASK_RUNNING);
		set_fs(KERNEL_DS);
		unuse_mm(worker->mm);
		mmput(worker->mm);
		worker->mm = NULL;
	}

	return dropped_lock;
}
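
/*
 * Illustrative caller pattern for __io_worker_unuse() (io_worker_exit()
 * below does exactly this): if it returned true, the lock was dropped,
 * so re-annotate the release and take the lock again before touching
 * any wqe state:
 *
 *	if (__io_worker_unuse(wqe, worker)) {
 *		__release(&wqe->lock);
 *		spin_lock_irq(&wqe->lock);
 *	}
 */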

static void io_worker_exit(struct io_worker *worker)
{
	struct io_wqe *wqe = worker->wqe;
	bool all_done = false;

	/*
	 * If we're not at zero, someone else is holding a brief reference
	 * to the worker. Wait for that to go away.
	 */
	set_current_state(TASK_INTERRUPTIBLE);
	if (!refcount_dec_and_test(&worker->ref))
		schedule();
	__set_current_state(TASK_RUNNING);

	preempt_disable();
	current->flags &= ~PF_IO_WORKER;
	if (worker->flags & IO_WORKER_F_RUNNING)
		atomic_dec(&wqe->nr_running);
	worker->flags = 0;
	preempt_enable();

	spin_lock_irq(&wqe->lock);
	hlist_nulls_del_rcu(&worker->nulls_node);
	if (__io_worker_unuse(wqe, worker)) {
		__release(&wqe->lock);
		spin_lock_irq(&wqe->lock);
	}
	wqe->nr_workers--;
	all_done = !wqe->nr_workers;
	spin_unlock_irq(&wqe->lock);

	/* all workers gone, wq exit can proceed */
	if (all_done && refcount_dec_and_test(&wqe->wq->refs))
		complete(&wqe->wq->done);

	kfree_rcu(worker, rcu);
}

static void io_worker_start(struct io_wqe *wqe, struct io_worker *worker)
{
	allow_kernel_signal(SIGINT);

	current->flags |= PF_IO_WORKER;

	worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);
	worker->restore_files = current->files;
	atomic_inc(&wqe->nr_running);
}

/*
 * Worker will start processing some work. Move it to the busy list, if
 * it's currently on the freelist
 */
static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker,
			     struct io_wq_work *work)
	__must_hold(wqe->lock)
{
	if (worker->flags & IO_WORKER_F_FREE) {
		worker->flags &= ~IO_WORKER_F_FREE;
		hlist_nulls_del_init_rcu(&worker->nulls_node);
		hlist_nulls_add_head_rcu(&worker->nulls_node,
					 &wqe->busy_list.head);
	}
	worker->cur_work = work;
}

/*
 * No work, worker going to sleep. Move to freelist, and unuse mm if we
 * have one attached. Dropping the mm may potentially sleep, so we drop
 * the lock in that case and return success. Since the caller has to
 * retry the loop in that case (we changed task state), we don't regrab
 * the lock if we return success.
 */
static bool __io_worker_idle(struct io_wqe *wqe, struct io_worker *worker)
	__must_hold(wqe->lock)
{
	if (!(worker->flags & IO_WORKER_F_FREE)) {
		worker->flags |= IO_WORKER_F_FREE;
		hlist_nulls_del_init_rcu(&worker->nulls_node);
		hlist_nulls_add_head_rcu(&worker->nulls_node,
					 &wqe->free_list.head);
	}

	return __io_worker_unuse(wqe, worker);
}

static struct io_wq_work *io_get_next_work(struct io_wqe *wqe, unsigned *hash)
	__must_hold(wqe->lock)
{
	struct io_wq_work *work;

	list_for_each_entry(work, &wqe->work_list, list) {
		/* not hashed, can run anytime */
		if (!(work->flags & IO_WQ_WORK_HASHED)) {
			list_del(&work->list);
			return work;
		}

		/* hashed, can run if not already running */
		*hash = work->flags >> IO_WQ_HASH_SHIFT;
		if (!(wqe->hash_map & BIT_ULL(*hash))) {
			wqe->hash_map |= BIT_ULL(*hash);
			list_del(&work->list);
			return work;
		}
	}

	return NULL;
}
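
/*
 * Only one work item per hash value runs on a wqe at any time: the bit
 * stays set in ->hash_map from here until io_worker_handle_work() has
 * run the item, at which point it clears the bit (and the stalled flag)
 * so the next item with that hash can be picked up.
 */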

static void io_worker_handle_work(struct io_worker *worker)
	__releases(wqe->lock)
{
	struct io_wq_work *work, *old_work;
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;

	do {
		unsigned hash = -1U;

		/*
		 * Signals are either sent to cancel specific work, or to just
		 * cancel all work items. For the former, ->cur_work must
		 * match. ->cur_work is NULL at this point, since we haven't
		 * assigned any work, so it's safe to flush signals for that
		 * case. For the latter case of cancelling all work, the caller
		 * will have set IO_WQ_BIT_CANCEL.
		 */
		if (signal_pending(current))
			flush_signals(current);

		/*
		 * If we got some work, mark us as busy. If we didn't, but
		 * the list isn't empty, it means we stalled on hashed work.
		 * Mark us stalled so we don't keep looking for work when we
		 * can't make progress; any work completion or insertion will
		 * clear the stalled flag.
		 */
		work = io_get_next_work(wqe, &hash);
		if (work)
			__io_worker_busy(wqe, worker, work);
		else if (!list_empty(&wqe->work_list))
			wqe->flags |= IO_WQE_FLAG_STALLED;

		spin_unlock_irq(&wqe->lock);
		if (!work)
			break;
next:
		if ((work->flags & IO_WQ_WORK_NEEDS_FILES) &&
		    current->files != work->files) {
			task_lock(current);
			current->files = work->files;
			task_unlock(current);
		}
		if ((work->flags & IO_WQ_WORK_NEEDS_USER) && !worker->mm &&
		    wq->mm && mmget_not_zero(wq->mm)) {
			use_mm(wq->mm);
			set_fs(USER_DS);
			worker->mm = wq->mm;
		}
		if (test_bit(IO_WQ_BIT_CANCEL, &wq->state))
			work->flags |= IO_WQ_WORK_CANCEL;
		if (worker->mm)
			work->flags |= IO_WQ_WORK_HAS_MM;

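		/*
		 * ->func() is handed a pointer to the work pointer so it can
		 * replace it with a dependent work item (see io_wq_flush_func()
		 * for the signature). If it did, we loop back to 'next' and run
		 * the new item on this worker without requeueing it; dependent
		 * work is never hashed, hence hash is reset before the jump.
		 */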
		old_work = work;
		work->func(&work);

		spin_lock_irq(&wqe->lock);
		worker->cur_work = NULL;
		if (hash != -1U) {
			wqe->hash_map &= ~BIT_ULL(hash);
			wqe->flags &= ~IO_WQE_FLAG_STALLED;
		}
		if (work && work != old_work) {
			spin_unlock_irq(&wqe->lock);
			/* dependent work not hashed */
			hash = -1U;
			goto next;
		}
	} while (1);
}

static inline bool io_wqe_run_queue(struct io_wqe *wqe)
	__must_hold(wqe->lock)
{
	if (!list_empty_careful(&wqe->work_list) &&
	    !(wqe->flags & IO_WQE_FLAG_STALLED))
		return true;
	return false;
}

static int io_wqe_worker(void *data)
{
	struct io_worker *worker = data;
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;
	DEFINE_WAIT(wait);

	io_worker_start(wqe, worker);

	while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		prepare_to_wait(&worker->wait, &wait, TASK_INTERRUPTIBLE);

		spin_lock_irq(&wqe->lock);
		if (io_wqe_run_queue(wqe)) {
			__set_current_state(TASK_RUNNING);
			io_worker_handle_work(worker);
			continue;
		}
		/* drops the lock on success, retry */
		if (__io_worker_idle(wqe, worker)) {
			__release(&wqe->lock);
			continue;
		}
		spin_unlock_irq(&wqe->lock);
		if (signal_pending(current))
			flush_signals(current);
		if (schedule_timeout(WORKER_IDLE_TIMEOUT))
			continue;
		/* timed out, exit unless we're the fixed worker */
		if (test_bit(IO_WQ_BIT_EXIT, &wq->state) ||
		    !(worker->flags & IO_WORKER_F_FIXED))
			break;
	}

	finish_wait(&worker->wait, &wait);

	if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		spin_lock_irq(&wqe->lock);
		if (!list_empty(&wqe->work_list))
			io_worker_handle_work(worker);
		else
			spin_unlock_irq(&wqe->lock);
	}

	io_worker_exit(worker);
	return 0;
}

/*
 * Check head of free list for an available worker. If one isn't available,
 * caller must wake up the wq manager to create one.
 */
static bool io_wqe_activate_free_worker(struct io_wqe *wqe)
	__must_hold(RCU)
{
	struct hlist_nulls_node *n;
	struct io_worker *worker;

	n = rcu_dereference(hlist_nulls_first_rcu(&wqe->free_list.head));
	if (is_a_nulls(n))
		return false;

	worker = hlist_nulls_entry(n, struct io_worker, nulls_node);
	if (io_worker_get(worker)) {
		wake_up(&worker->wait);
		io_worker_release(worker);
		return true;
	}

	return false;
}

/*
 * We need a worker. If we find a free one, we're good. If not, and we're
 * below the max number of workers, wake up the manager to create one.
 */
static void io_wqe_wake_worker(struct io_wqe *wqe)
{
	bool ret;

	rcu_read_lock();
	ret = io_wqe_activate_free_worker(wqe);
	rcu_read_unlock();

	if (!ret && wqe->nr_workers < wqe->max_workers)
		wake_up_process(wqe->wq->manager);
}

/*
 * Called when a worker is scheduled in. Mark us as currently running.
 */
void io_wq_worker_running(struct task_struct *tsk)
{
	struct io_worker *worker = kthread_data(tsk);
	struct io_wqe *wqe = worker->wqe;

	if (!(worker->flags & IO_WORKER_F_UP))
		return;
	if (worker->flags & IO_WORKER_F_RUNNING)
		return;
	worker->flags |= IO_WORKER_F_RUNNING;
	atomic_inc(&wqe->nr_running);
}

/*
 * Called when worker is going to sleep. If there are no workers currently
 * running and we have work pending, wake up a free one or have the manager
 * set one up.
 */
void io_wq_worker_sleeping(struct task_struct *tsk)
{
	struct io_worker *worker = kthread_data(tsk);
	struct io_wqe *wqe = worker->wqe;

	if (!(worker->flags & IO_WORKER_F_UP))
		return;
	if (!(worker->flags & IO_WORKER_F_RUNNING))
		return;

	worker->flags &= ~IO_WORKER_F_RUNNING;

	spin_lock_irq(&wqe->lock);
	if (atomic_dec_and_test(&wqe->nr_running) && io_wqe_run_queue(wqe))
		io_wqe_wake_worker(wqe);
	spin_unlock_irq(&wqe->lock);
}
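
/*
 * io_wq_worker_running() and io_wq_worker_sleeping() above are hooks driven
 * by the scheduler for tasks carrying PF_IO_WORKER (set in io_worker_start()),
 * presumably wired up in the core scheduler next to the existing PF_WQ_WORKER
 * workqueue hooks; this file only provides the callbacks.
 */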

static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe)
{
	struct io_worker *worker;

	worker = kcalloc_node(1, sizeof(*worker), GFP_KERNEL, wqe->node);
	if (!worker)
		return;

	refcount_set(&worker->ref, 1);
	worker->nulls_node.pprev = NULL;
	init_waitqueue_head(&worker->wait);
	worker->wqe = wqe;

	worker->task = kthread_create_on_node(io_wqe_worker, worker, wqe->node,
				"io_wqe_worker-%d", wqe->node);
	if (IS_ERR(worker->task)) {
		kfree(worker);
		return;
	}

	spin_lock_irq(&wqe->lock);
	hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list.head);
	worker->flags |= IO_WORKER_F_FREE;
	if (!wqe->nr_workers)
		worker->flags |= IO_WORKER_F_FIXED;
	wqe->nr_workers++;
	spin_unlock_irq(&wqe->lock);

	wake_up_process(worker->task);
}

static inline bool io_wqe_need_new_worker(struct io_wqe *wqe)
	__must_hold(wqe->lock)
{
	if (!wqe->nr_workers)
		return true;
	if (hlist_nulls_empty(&wqe->free_list.head) &&
	    wqe->nr_workers < wqe->max_workers && io_wqe_run_queue(wqe))
		return true;

	return false;
}

/*
 * Manager thread. Tasked with creating new workers, if we need them.
 */
static int io_wq_manager(void *data)
{
	struct io_wq *wq = data;

	while (!kthread_should_stop()) {
		int i;

		for (i = 0; i < wq->nr_wqes; i++) {
			struct io_wqe *wqe = wq->wqes[i];
			bool fork_worker = false;

			spin_lock_irq(&wqe->lock);
			fork_worker = io_wqe_need_new_worker(wqe);
			spin_unlock_irq(&wqe->lock);
			if (fork_worker)
				create_io_worker(wq, wqe);
		}
		set_current_state(TASK_INTERRUPTIBLE);
		schedule_timeout(HZ);
	}

	return 0;
}

static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
{
	unsigned long flags;

	spin_lock_irqsave(&wqe->lock, flags);
	list_add_tail(&work->list, &wqe->work_list);
	wqe->flags &= ~IO_WQE_FLAG_STALLED;
	spin_unlock_irqrestore(&wqe->lock, flags);

	if (!atomic_read(&wqe->nr_running))
		io_wqe_wake_worker(wqe);
}

void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
{
	struct io_wqe *wqe = wq->wqes[numa_node_id()];

	io_wqe_enqueue(wqe, work);
}

/*
 * Enqueue work, hashed by some key. Work items that hash to the same value
 * will not be done in parallel. Used to limit concurrent writes, generally
 * hashed by inode.
 */
void io_wq_enqueue_hashed(struct io_wq *wq, struct io_wq_work *work, void *val)
{
	struct io_wqe *wqe = wq->wqes[numa_node_id()];
	unsigned bit;

	bit = hash_ptr(val, IO_WQ_HASH_ORDER);
	work->flags |= (IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT));
	io_wqe_enqueue(wqe, work);
}
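
/*
 * A minimal usage sketch (names hypothetical): a caller that wants all
 * buffered writes to the same inode serialized could hash on the inode,
 * e.g.
 *
 *	io_wq_enqueue_hashed(wq, &req->work, file_inode(req->file));
 *
 * Two items enqueued with the same pointer then never run concurrently on
 * a given wqe, since io_get_next_work() skips a hash that is already
 * marked running in ->hash_map.
 */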

static bool io_wqe_worker_send_sig(struct io_worker *worker, void *data)
{
	send_sig(SIGINT, worker->task, 1);
	return false;
}

/*
 * Iterate the passed in list and call the specific function for each
 * worker that isn't exiting
 */
static bool io_wq_for_each_worker(struct io_wqe *wqe,
				  struct io_wq_nulls_list *list,
				  bool (*func)(struct io_worker *, void *),
				  void *data)
{
	struct hlist_nulls_node *n;
	struct io_worker *worker;
	bool ret = false;

restart:
	hlist_nulls_for_each_entry_rcu(worker, n, &list->head, nulls_node) {
		if (io_worker_get(worker)) {
			ret = func(worker, data);
			io_worker_release(worker);
			if (ret)
				break;
		}
	}
	if (!ret && get_nulls_value(n) != list->nulls)
		goto restart;
	return ret;
}

void io_wq_cancel_all(struct io_wq *wq)
{
	int i;

	set_bit(IO_WQ_BIT_CANCEL, &wq->state);

	/*
	 * Browse both lists, as there's a gap between handing work off
	 * to a worker and the worker putting itself on the busy_list
	 */
	rcu_read_lock();
	for (i = 0; i < wq->nr_wqes; i++) {
		struct io_wqe *wqe = wq->wqes[i];

		io_wq_for_each_worker(wqe, &wqe->busy_list,
					io_wqe_worker_send_sig, NULL);
		io_wq_for_each_worker(wqe, &wqe->free_list,
					io_wqe_worker_send_sig, NULL);
	}
	rcu_read_unlock();
}

struct io_cb_cancel_data {
	struct io_wqe *wqe;
	work_cancel_fn *cancel;
	void *caller_data;
};

static bool io_work_cancel(struct io_worker *worker, void *cancel_data)
{
	struct io_cb_cancel_data *data = cancel_data;
	struct io_wqe *wqe = data->wqe;
	bool ret = false;

	/*
	 * Hold the lock to avoid ->cur_work going out of scope, as the
	 * caller may dereference the passed in work.
	 */
	spin_lock_irq(&wqe->lock);
	if (worker->cur_work &&
	    data->cancel(worker->cur_work, data->caller_data)) {
		send_sig(SIGINT, worker->task, 1);
		ret = true;
	}
	spin_unlock_irq(&wqe->lock);

	return ret;
}

static enum io_wq_cancel io_wqe_cancel_cb_work(struct io_wqe *wqe,
					       work_cancel_fn *cancel,
					       void *cancel_data)
{
	struct io_cb_cancel_data data = {
		.wqe = wqe,
		.cancel = cancel,
		.caller_data = cancel_data,
	};
	struct io_wq_work *work;
	bool found = false;

	spin_lock_irq(&wqe->lock);
	list_for_each_entry(work, &wqe->work_list, list) {
		if (cancel(work, cancel_data)) {
			list_del(&work->list);
			found = true;
			break;
		}
	}
	spin_unlock_irq(&wqe->lock);

	if (found) {
		work->flags |= IO_WQ_WORK_CANCEL;
		work->func(&work);
		return IO_WQ_CANCEL_OK;
	}

	rcu_read_lock();
	found = io_wq_for_each_worker(wqe, &wqe->free_list, io_work_cancel,
					&data);
	if (found)
		goto done;

	found = io_wq_for_each_worker(wqe, &wqe->busy_list, io_work_cancel,
					&data);
done:
	rcu_read_unlock();
	return found ? IO_WQ_CANCEL_RUNNING : IO_WQ_CANCEL_NOTFOUND;
}

enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
				  void *data)
{
	enum io_wq_cancel ret = IO_WQ_CANCEL_NOTFOUND;
	int i;

	for (i = 0; i < wq->nr_wqes; i++) {
		struct io_wqe *wqe = wq->wqes[i];

		ret = io_wqe_cancel_cb_work(wqe, cancel, data);
		if (ret != IO_WQ_CANCEL_NOTFOUND)
			break;
	}

	return ret;
}
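
/*
 * A minimal sketch of a work_cancel_fn for io_wq_cancel_cb(), assuming the
 * caller matches on the work item itself (any private tag reachable from
 * the work would do):
 *
 *	static bool my_work_match(struct io_wq_work *work, void *data)
 *	{
 *		return work == data;
 *	}
 *
 *	ret = io_wq_cancel_cb(wq, my_work_match, target_work);
 *
 * The callback is invoked under wqe->lock with interrupts disabled, both
 * for pending and for running work, so it must not sleep.
 */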

static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
{
	struct io_wq_work *work = data;

	if (worker->cur_work == work) {
		send_sig(SIGINT, worker->task, 1);
		return true;
	}

	return false;
}

static enum io_wq_cancel io_wqe_cancel_work(struct io_wqe *wqe,
					    struct io_wq_work *cwork)
{
	struct io_wq_work *work;
	bool found = false;

	cwork->flags |= IO_WQ_WORK_CANCEL;

	/*
	 * First check pending list, if we're lucky we can just remove it
	 * from there. CANCEL_OK means that the work is returned as-new,
	 * no completion will be posted for it.
	 */
	spin_lock_irq(&wqe->lock);
	list_for_each_entry(work, &wqe->work_list, list) {
		if (work == cwork) {
			list_del(&work->list);
			found = true;
			break;
		}
	}
	spin_unlock_irq(&wqe->lock);

	if (found) {
		work->flags |= IO_WQ_WORK_CANCEL;
		work->func(&work);
		return IO_WQ_CANCEL_OK;
	}

	/*
	 * Now check if a free (going busy) or busy worker has the work
	 * currently running. If we find it there, we'll return CANCEL_RUNNING
	 * as an indication that we attempted to signal cancellation. The
	 * completion will run normally in this case.
	 */
	rcu_read_lock();
	found = io_wq_for_each_worker(wqe, &wqe->free_list, io_wq_worker_cancel,
					cwork);
	if (found)
		goto done;

	found = io_wq_for_each_worker(wqe, &wqe->busy_list, io_wq_worker_cancel,
					cwork);
done:
	rcu_read_unlock();
	return found ? IO_WQ_CANCEL_RUNNING : IO_WQ_CANCEL_NOTFOUND;
}

enum io_wq_cancel io_wq_cancel_work(struct io_wq *wq, struct io_wq_work *cwork)
{
	enum io_wq_cancel ret = IO_WQ_CANCEL_NOTFOUND;
	int i;

	for (i = 0; i < wq->nr_wqes; i++) {
		struct io_wqe *wqe = wq->wqes[i];

		ret = io_wqe_cancel_work(wqe, cwork);
		if (ret != IO_WQ_CANCEL_NOTFOUND)
			break;
	}

	return ret;
}

struct io_wq_flush_data {
	struct io_wq_work work;
	struct completion done;
};

static void io_wq_flush_func(struct io_wq_work **workptr)
{
	struct io_wq_work *work = *workptr;
	struct io_wq_flush_data *data;

	data = container_of(work, struct io_wq_flush_data, work);
	complete(&data->done);
}

/*
 * Doesn't wait for previously queued work to finish. When this completes,
 * it just means that previously queued work was started.
 */
void io_wq_flush(struct io_wq *wq)
{
	struct io_wq_flush_data data;
	int i;

	for (i = 0; i < wq->nr_wqes; i++) {
		struct io_wqe *wqe = wq->wqes[i];

		init_completion(&data.done);
		INIT_IO_WORK(&data.work, io_wq_flush_func);
		io_wqe_enqueue(wqe, &data.work);
		wait_for_completion(&data.done);
	}
}

struct io_wq *io_wq_create(unsigned concurrency, struct mm_struct *mm)
{
	int ret = -ENOMEM, i, node;
	struct io_wq *wq;

	wq = kcalloc(1, sizeof(*wq), GFP_KERNEL);
	if (!wq)
		return ERR_PTR(-ENOMEM);

	wq->nr_wqes = num_online_nodes();
	wq->wqes = kcalloc(wq->nr_wqes, sizeof(struct io_wqe *), GFP_KERNEL);
	if (!wq->wqes) {
		kfree(wq);
		return ERR_PTR(-ENOMEM);
	}

	i = 0;
	refcount_set(&wq->refs, wq->nr_wqes);
	for_each_online_node(node) {
		struct io_wqe *wqe;

		wqe = kcalloc_node(1, sizeof(struct io_wqe), GFP_KERNEL, node);
		if (!wqe)
			break;
		wq->wqes[i] = wqe;
		wqe->node = node;
		wqe->max_workers = concurrency;
		wqe->wq = wq;
		spin_lock_init(&wqe->lock);
		INIT_LIST_HEAD(&wqe->work_list);
		INIT_HLIST_NULLS_HEAD(&wqe->free_list.head, 0);
		wqe->free_list.nulls = 0;
		INIT_HLIST_NULLS_HEAD(&wqe->busy_list.head, 1);
		wqe->busy_list.nulls = 1;
		atomic_set(&wqe->nr_running, 0);

		i++;
	}

	init_completion(&wq->done);

	if (i != wq->nr_wqes)
		goto err;

	/* caller must have already done mmgrab() on this mm */
	wq->mm = mm;

	wq->manager = kthread_create(io_wq_manager, wq, "io_wq_manager");
	if (!IS_ERR(wq->manager)) {
		wake_up_process(wq->manager);
		return wq;
	}

	ret = PTR_ERR(wq->manager);
	wq->manager = NULL;
err:
	complete(&wq->done);
	io_wq_destroy(wq);
	return ERR_PTR(ret);
}
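
/*
 * A minimal lifecycle sketch for a user of this pool (names hypothetical;
 * per the note above, the mm passed in must already have been pinned with
 * mmgrab()):
 *
 *	struct io_wq *wq = io_wq_create(max_workers, current->mm);
 *
 *	if (IS_ERR(wq))
 *		return PTR_ERR(wq);
 *	io_wq_enqueue(wq, &req->work);
 *	...
 *	io_wq_flush(wq);
 *	io_wq_destroy(wq);
 */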

static bool io_wq_worker_wake(struct io_worker *worker, void *data)
{
	wake_up_process(worker->task);
	return false;
}

void io_wq_destroy(struct io_wq *wq)
{
	int i;

	if (wq->manager) {
		set_bit(IO_WQ_BIT_EXIT, &wq->state);
		kthread_stop(wq->manager);
	}

	rcu_read_lock();
	for (i = 0; i < wq->nr_wqes; i++) {
		struct io_wqe *wqe = wq->wqes[i];

		if (!wqe)
			continue;
		io_wq_for_each_worker(wqe, &wqe->free_list, io_wq_worker_wake,
					NULL);
		io_wq_for_each_worker(wqe, &wqe->busy_list, io_wq_worker_wake,
					NULL);
	}
	rcu_read_unlock();

	wait_for_completion(&wq->done);

	for (i = 0; i < wq->nr_wqes; i++)
		kfree(wq->wqes[i]);
	kfree(wq->wqes);
	kfree(wq);
}