blob: 253c04a40db57f35e4bcf25b8c4193c1755851ac [file] [log] [blame]
Jens Axboe771b53d02019-10-22 10:25:58 -06001// SPDX-License-Identifier: GPL-2.0
2/*
3 * Basic worker thread pool for io_uring
4 *
5 * Copyright (C) 2019 Jens Axboe
6 *
7 */
8#include <linux/kernel.h>
9#include <linux/init.h>
10#include <linux/errno.h>
11#include <linux/sched/signal.h>
12#include <linux/mm.h>
13#include <linux/mmu_context.h>
14#include <linux/sched/mm.h>
15#include <linux/percpu.h>
16#include <linux/slab.h>
17#include <linux/kthread.h>
18#include <linux/rculist_nulls.h>
19
20#include "io-wq.h"
21
22#define WORKER_IDLE_TIMEOUT (5 * HZ)
23
24enum {
25 IO_WORKER_F_UP = 1, /* up and active */
26 IO_WORKER_F_RUNNING = 2, /* account as running */
27 IO_WORKER_F_FREE = 4, /* worker on free list */
28 IO_WORKER_F_EXITING = 8, /* worker exiting */
29 IO_WORKER_F_FIXED = 16, /* static idle worker */
30};
31
32enum {
33 IO_WQ_BIT_EXIT = 0, /* wq exiting */
34 IO_WQ_BIT_CANCEL = 1, /* cancel work on list */
35};
36
37enum {
38 IO_WQE_FLAG_STALLED = 1, /* stalled on hash */
39};
40
41/*
42 * One for each thread in a wqe pool
43 */
44struct io_worker {
45 refcount_t ref;
46 unsigned flags;
47 struct hlist_nulls_node nulls_node;
48 struct task_struct *task;
49 wait_queue_head_t wait;
50 struct io_wqe *wqe;
51 struct io_wq_work *cur_work;
52
53 struct rcu_head rcu;
54 struct mm_struct *mm;
Jens Axboefcb323c2019-10-24 12:39:47 -060055 struct files_struct *restore_files;
Jens Axboe771b53d02019-10-22 10:25:58 -060056};
57
58struct io_wq_nulls_list {
59 struct hlist_nulls_head head;
60 unsigned long nulls;
61};
62
63#if BITS_PER_LONG == 64
64#define IO_WQ_HASH_ORDER 6
65#else
66#define IO_WQ_HASH_ORDER 5
67#endif
68
69/*
70 * Per-node worker thread pool
71 */
72struct io_wqe {
73 struct {
74 spinlock_t lock;
75 struct list_head work_list;
76 unsigned long hash_map;
77 unsigned flags;
78 } ____cacheline_aligned_in_smp;
79
80 int node;
81 unsigned nr_workers;
82 unsigned max_workers;
83 atomic_t nr_running;
84
85 struct io_wq_nulls_list free_list;
86 struct io_wq_nulls_list busy_list;
87
88 struct io_wq *wq;
89};
90
91/*
92 * Per io_wq state
93 */
94struct io_wq {
95 struct io_wqe **wqes;
96 unsigned long state;
97 unsigned nr_wqes;
98
99 struct task_struct *manager;
100 struct mm_struct *mm;
101 refcount_t refs;
102 struct completion done;
103};
104
105static void io_wq_free_worker(struct rcu_head *head)
106{
107 struct io_worker *worker = container_of(head, struct io_worker, rcu);
108
109 kfree(worker);
110}
111
112static bool io_worker_get(struct io_worker *worker)
113{
114 return refcount_inc_not_zero(&worker->ref);
115}
116
117static void io_worker_release(struct io_worker *worker)
118{
119 if (refcount_dec_and_test(&worker->ref))
120 wake_up_process(worker->task);
121}
122
123/*
124 * Note: drops the wqe->lock if returning true! The caller must re-acquire
125 * the lock in that case. Some callers need to restart handling if this
126 * happens, so we can't just re-acquire the lock on behalf of the caller.
127 */
128static bool __io_worker_unuse(struct io_wqe *wqe, struct io_worker *worker)
129{
Jens Axboefcb323c2019-10-24 12:39:47 -0600130 bool dropped_lock = false;
131
132 if (current->files != worker->restore_files) {
133 __acquire(&wqe->lock);
134 spin_unlock_irq(&wqe->lock);
135 dropped_lock = true;
136
137 task_lock(current);
138 current->files = worker->restore_files;
139 task_unlock(current);
140 }
141
Jens Axboe771b53d02019-10-22 10:25:58 -0600142 /*
143 * If we have an active mm, we need to drop the wq lock before unusing
144 * it. If we do, return true and let the caller retry the idle loop.
145 */
146 if (worker->mm) {
Jens Axboefcb323c2019-10-24 12:39:47 -0600147 if (!dropped_lock) {
148 __acquire(&wqe->lock);
149 spin_unlock_irq(&wqe->lock);
150 dropped_lock = true;
151 }
Jens Axboe771b53d02019-10-22 10:25:58 -0600152 __set_current_state(TASK_RUNNING);
153 set_fs(KERNEL_DS);
154 unuse_mm(worker->mm);
155 mmput(worker->mm);
156 worker->mm = NULL;
Jens Axboe771b53d02019-10-22 10:25:58 -0600157 }
158
Jens Axboefcb323c2019-10-24 12:39:47 -0600159 return dropped_lock;
Jens Axboe771b53d02019-10-22 10:25:58 -0600160}
161
162static void io_worker_exit(struct io_worker *worker)
163{
164 struct io_wqe *wqe = worker->wqe;
165 bool all_done = false;
166
167 /*
168 * If we're not at zero, someone else is holding a brief reference
169 * to the worker. Wait for that to go away.
170 */
171 set_current_state(TASK_INTERRUPTIBLE);
172 if (!refcount_dec_and_test(&worker->ref))
173 schedule();
174 __set_current_state(TASK_RUNNING);
175
176 preempt_disable();
177 current->flags &= ~PF_IO_WORKER;
178 if (worker->flags & IO_WORKER_F_RUNNING)
179 atomic_dec(&wqe->nr_running);
180 worker->flags = 0;
181 preempt_enable();
182
183 spin_lock_irq(&wqe->lock);
184 hlist_nulls_del_rcu(&worker->nulls_node);
185 if (__io_worker_unuse(wqe, worker)) {
186 __release(&wqe->lock);
187 spin_lock_irq(&wqe->lock);
188 }
189 wqe->nr_workers--;
190 all_done = !wqe->nr_workers;
191 spin_unlock_irq(&wqe->lock);
192
193 /* all workers gone, wq exit can proceed */
194 if (all_done && refcount_dec_and_test(&wqe->wq->refs))
195 complete(&wqe->wq->done);
196
197 call_rcu(&worker->rcu, io_wq_free_worker);
198}
199
200static void io_worker_start(struct io_wqe *wqe, struct io_worker *worker)
201{
202 allow_kernel_signal(SIGINT);
203
204 current->flags |= PF_IO_WORKER;
205
206 worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);
Jens Axboefcb323c2019-10-24 12:39:47 -0600207 worker->restore_files = current->files;
Jens Axboe771b53d02019-10-22 10:25:58 -0600208 atomic_inc(&wqe->nr_running);
209}
210
211/*
212 * Worker will start processing some work. Move it to the busy list, if
213 * it's currently on the freelist
214 */
215static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker,
216 struct io_wq_work *work)
217 __must_hold(wqe->lock)
218{
219 if (worker->flags & IO_WORKER_F_FREE) {
220 worker->flags &= ~IO_WORKER_F_FREE;
221 hlist_nulls_del_init_rcu(&worker->nulls_node);
222 hlist_nulls_add_head_rcu(&worker->nulls_node,
223 &wqe->busy_list.head);
224 }
225 worker->cur_work = work;
226}
227
228/*
229 * No work, worker going to sleep. Move to freelist, and unuse mm if we
230 * have one attached. Dropping the mm may potentially sleep, so we drop
231 * the lock in that case and return success. Since the caller has to
232 * retry the loop in that case (we changed task state), we don't regrab
233 * the lock if we return success.
234 */
235static bool __io_worker_idle(struct io_wqe *wqe, struct io_worker *worker)
236 __must_hold(wqe->lock)
237{
238 if (!(worker->flags & IO_WORKER_F_FREE)) {
239 worker->flags |= IO_WORKER_F_FREE;
240 hlist_nulls_del_init_rcu(&worker->nulls_node);
241 hlist_nulls_add_head_rcu(&worker->nulls_node,
242 &wqe->free_list.head);
243 }
244
245 return __io_worker_unuse(wqe, worker);
246}
247
248static struct io_wq_work *io_get_next_work(struct io_wqe *wqe, unsigned *hash)
249 __must_hold(wqe->lock)
250{
251 struct io_wq_work *work;
252
253 list_for_each_entry(work, &wqe->work_list, list) {
254 /* not hashed, can run anytime */
255 if (!(work->flags & IO_WQ_WORK_HASHED)) {
256 list_del(&work->list);
257 return work;
258 }
259
260 /* hashed, can run if not already running */
261 *hash = work->flags >> IO_WQ_HASH_SHIFT;
262 if (!(wqe->hash_map & BIT_ULL(*hash))) {
263 wqe->hash_map |= BIT_ULL(*hash);
264 list_del(&work->list);
265 return work;
266 }
267 }
268
269 return NULL;
270}
271
272static void io_worker_handle_work(struct io_worker *worker)
273 __releases(wqe->lock)
274{
275 struct io_wq_work *work, *old_work;
276 struct io_wqe *wqe = worker->wqe;
277 struct io_wq *wq = wqe->wq;
278
279 do {
280 unsigned hash = -1U;
281
282 /*
283 * Signals are either sent to cancel specific work, or to just
284 * cancel all work items. For the former, ->cur_work must
285 * match. ->cur_work is NULL at this point, since we haven't
286 * assigned any work, so it's safe to flush signals for that
287 * case. For the latter case of cancelling all work, the caller
288 * wil have set IO_WQ_BIT_CANCEL.
289 */
290 if (signal_pending(current))
291 flush_signals(current);
292
293 /*
294 * If we got some work, mark us as busy. If we didn't, but
295 * the list isn't empty, it means we stalled on hashed work.
296 * Mark us stalled so we don't keep looking for work when we
297 * can't make progress, any work completion or insertion will
298 * clear the stalled flag.
299 */
300 work = io_get_next_work(wqe, &hash);
301 if (work)
302 __io_worker_busy(wqe, worker, work);
303 else if (!list_empty(&wqe->work_list))
304 wqe->flags |= IO_WQE_FLAG_STALLED;
305
306 spin_unlock_irq(&wqe->lock);
307 if (!work)
308 break;
309next:
Jens Axboefcb323c2019-10-24 12:39:47 -0600310 if ((work->flags & IO_WQ_WORK_NEEDS_FILES) &&
311 current->files != work->files) {
312 task_lock(current);
313 current->files = work->files;
314 task_unlock(current);
315 }
Jens Axboe771b53d02019-10-22 10:25:58 -0600316 if ((work->flags & IO_WQ_WORK_NEEDS_USER) && !worker->mm &&
317 wq->mm && mmget_not_zero(wq->mm)) {
318 use_mm(wq->mm);
319 set_fs(USER_DS);
320 worker->mm = wq->mm;
321 }
322 if (test_bit(IO_WQ_BIT_CANCEL, &wq->state))
323 work->flags |= IO_WQ_WORK_CANCEL;
324 if (worker->mm)
325 work->flags |= IO_WQ_WORK_HAS_MM;
326
327 old_work = work;
328 work->func(&work);
329
330 spin_lock_irq(&wqe->lock);
331 worker->cur_work = NULL;
332 if (hash != -1U) {
333 wqe->hash_map &= ~BIT_ULL(hash);
334 wqe->flags &= ~IO_WQE_FLAG_STALLED;
335 }
336 if (work && work != old_work) {
337 spin_unlock_irq(&wqe->lock);
338 /* dependent work not hashed */
339 hash = -1U;
340 goto next;
341 }
342 } while (1);
343}
344
345static inline bool io_wqe_run_queue(struct io_wqe *wqe)
346 __must_hold(wqe->lock)
347{
348 if (!list_empty_careful(&wqe->work_list) &&
349 !(wqe->flags & IO_WQE_FLAG_STALLED))
350 return true;
351 return false;
352}
353
354static int io_wqe_worker(void *data)
355{
356 struct io_worker *worker = data;
357 struct io_wqe *wqe = worker->wqe;
358 struct io_wq *wq = wqe->wq;
359 DEFINE_WAIT(wait);
360
361 io_worker_start(wqe, worker);
362
363 while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
364 prepare_to_wait(&worker->wait, &wait, TASK_INTERRUPTIBLE);
365
366 spin_lock_irq(&wqe->lock);
367 if (io_wqe_run_queue(wqe)) {
368 __set_current_state(TASK_RUNNING);
369 io_worker_handle_work(worker);
370 continue;
371 }
372 /* drops the lock on success, retry */
373 if (__io_worker_idle(wqe, worker)) {
374 __release(&wqe->lock);
375 continue;
376 }
377 spin_unlock_irq(&wqe->lock);
378 if (signal_pending(current))
379 flush_signals(current);
380 if (schedule_timeout(WORKER_IDLE_TIMEOUT))
381 continue;
382 /* timed out, exit unless we're the fixed worker */
383 if (test_bit(IO_WQ_BIT_EXIT, &wq->state) ||
384 !(worker->flags & IO_WORKER_F_FIXED))
385 break;
386 }
387
388 finish_wait(&worker->wait, &wait);
389
390 if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
391 spin_lock_irq(&wqe->lock);
392 if (!list_empty(&wqe->work_list))
393 io_worker_handle_work(worker);
394 else
395 spin_unlock_irq(&wqe->lock);
396 }
397
398 io_worker_exit(worker);
399 return 0;
400}
401
402/*
403 * Check head of free list for an available worker. If one isn't available,
404 * caller must wake up the wq manager to create one.
405 */
406static bool io_wqe_activate_free_worker(struct io_wqe *wqe)
407 __must_hold(RCU)
408{
409 struct hlist_nulls_node *n;
410 struct io_worker *worker;
411
412 n = rcu_dereference(hlist_nulls_first_rcu(&wqe->free_list.head));
413 if (is_a_nulls(n))
414 return false;
415
416 worker = hlist_nulls_entry(n, struct io_worker, nulls_node);
417 if (io_worker_get(worker)) {
418 wake_up(&worker->wait);
419 io_worker_release(worker);
420 return true;
421 }
422
423 return false;
424}
425
426/*
427 * We need a worker. If we find a free one, we're good. If not, and we're
428 * below the max number of workers, wake up the manager to create one.
429 */
430static void io_wqe_wake_worker(struct io_wqe *wqe)
431{
432 bool ret;
433
434 rcu_read_lock();
435 ret = io_wqe_activate_free_worker(wqe);
436 rcu_read_unlock();
437
438 if (!ret && wqe->nr_workers < wqe->max_workers)
439 wake_up_process(wqe->wq->manager);
440}
441
442/*
443 * Called when a worker is scheduled in. Mark us as currently running.
444 */
445void io_wq_worker_running(struct task_struct *tsk)
446{
447 struct io_worker *worker = kthread_data(tsk);
448 struct io_wqe *wqe = worker->wqe;
449
450 if (!(worker->flags & IO_WORKER_F_UP))
451 return;
452 if (worker->flags & IO_WORKER_F_RUNNING)
453 return;
454 worker->flags |= IO_WORKER_F_RUNNING;
455 atomic_inc(&wqe->nr_running);
456}
457
458/*
459 * Called when worker is going to sleep. If there are no workers currently
460 * running and we have work pending, wake up a free one or have the manager
461 * set one up.
462 */
463void io_wq_worker_sleeping(struct task_struct *tsk)
464{
465 struct io_worker *worker = kthread_data(tsk);
466 struct io_wqe *wqe = worker->wqe;
467
468 if (!(worker->flags & IO_WORKER_F_UP))
469 return;
470 if (!(worker->flags & IO_WORKER_F_RUNNING))
471 return;
472
473 worker->flags &= ~IO_WORKER_F_RUNNING;
474
475 spin_lock_irq(&wqe->lock);
476 if (atomic_dec_and_test(&wqe->nr_running) && io_wqe_run_queue(wqe))
477 io_wqe_wake_worker(wqe);
478 spin_unlock_irq(&wqe->lock);
479}
480
481static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe)
482{
483 struct io_worker *worker;
484
485 worker = kcalloc_node(1, sizeof(*worker), GFP_KERNEL, wqe->node);
486 if (!worker)
487 return;
488
489 refcount_set(&worker->ref, 1);
490 worker->nulls_node.pprev = NULL;
491 init_waitqueue_head(&worker->wait);
492 worker->wqe = wqe;
493
494 worker->task = kthread_create_on_node(io_wqe_worker, worker, wqe->node,
495 "io_wqe_worker-%d", wqe->node);
496 if (IS_ERR(worker->task)) {
497 kfree(worker);
498 return;
499 }
500
501 spin_lock_irq(&wqe->lock);
502 hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list.head);
503 worker->flags |= IO_WORKER_F_FREE;
504 if (!wqe->nr_workers)
505 worker->flags |= IO_WORKER_F_FIXED;
506 wqe->nr_workers++;
507 spin_unlock_irq(&wqe->lock);
508
509 wake_up_process(worker->task);
510}
511
512static inline bool io_wqe_need_new_worker(struct io_wqe *wqe)
513 __must_hold(wqe->lock)
514{
515 if (!wqe->nr_workers)
516 return true;
517 if (hlist_nulls_empty(&wqe->free_list.head) &&
518 wqe->nr_workers < wqe->max_workers && io_wqe_run_queue(wqe))
519 return true;
520
521 return false;
522}
523
524/*
525 * Manager thread. Tasked with creating new workers, if we need them.
526 */
527static int io_wq_manager(void *data)
528{
529 struct io_wq *wq = data;
530
531 while (!kthread_should_stop()) {
532 int i;
533
534 for (i = 0; i < wq->nr_wqes; i++) {
535 struct io_wqe *wqe = wq->wqes[i];
536 bool fork_worker = false;
537
538 spin_lock_irq(&wqe->lock);
539 fork_worker = io_wqe_need_new_worker(wqe);
540 spin_unlock_irq(&wqe->lock);
541 if (fork_worker)
542 create_io_worker(wq, wqe);
543 }
544 set_current_state(TASK_INTERRUPTIBLE);
545 schedule_timeout(HZ);
546 }
547
548 return 0;
549}
550
551static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
552{
553 unsigned long flags;
554
555 spin_lock_irqsave(&wqe->lock, flags);
556 list_add_tail(&work->list, &wqe->work_list);
557 wqe->flags &= ~IO_WQE_FLAG_STALLED;
558 spin_unlock_irqrestore(&wqe->lock, flags);
559
560 if (!atomic_read(&wqe->nr_running))
561 io_wqe_wake_worker(wqe);
562}
563
564void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
565{
566 struct io_wqe *wqe = wq->wqes[numa_node_id()];
567
568 io_wqe_enqueue(wqe, work);
569}
570
571/*
572 * Enqueue work, hashed by some key. Work items that hash to the same value
573 * will not be done in parallel. Used to limit concurrent writes, generally
574 * hashed by inode.
575 */
576void io_wq_enqueue_hashed(struct io_wq *wq, struct io_wq_work *work, void *val)
577{
578 struct io_wqe *wqe = wq->wqes[numa_node_id()];
579 unsigned bit;
580
581
582 bit = hash_ptr(val, IO_WQ_HASH_ORDER);
583 work->flags |= (IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT));
584 io_wqe_enqueue(wqe, work);
585}
586
587static bool io_wqe_worker_send_sig(struct io_worker *worker, void *data)
588{
589 send_sig(SIGINT, worker->task, 1);
590 return false;
591}
592
593/*
594 * Iterate the passed in list and call the specific function for each
595 * worker that isn't exiting
596 */
597static bool io_wq_for_each_worker(struct io_wqe *wqe,
598 struct io_wq_nulls_list *list,
599 bool (*func)(struct io_worker *, void *),
600 void *data)
601{
602 struct hlist_nulls_node *n;
603 struct io_worker *worker;
604 bool ret = false;
605
606restart:
607 hlist_nulls_for_each_entry_rcu(worker, n, &list->head, nulls_node) {
608 if (io_worker_get(worker)) {
609 ret = func(worker, data);
610 io_worker_release(worker);
611 if (ret)
612 break;
613 }
614 }
615 if (!ret && get_nulls_value(n) != list->nulls)
616 goto restart;
617 return ret;
618}
619
620void io_wq_cancel_all(struct io_wq *wq)
621{
622 int i;
623
624 set_bit(IO_WQ_BIT_CANCEL, &wq->state);
625
626 /*
627 * Browse both lists, as there's a gap between handing work off
628 * to a worker and the worker putting itself on the busy_list
629 */
630 rcu_read_lock();
631 for (i = 0; i < wq->nr_wqes; i++) {
632 struct io_wqe *wqe = wq->wqes[i];
633
634 io_wq_for_each_worker(wqe, &wqe->busy_list,
635 io_wqe_worker_send_sig, NULL);
636 io_wq_for_each_worker(wqe, &wqe->free_list,
637 io_wqe_worker_send_sig, NULL);
638 }
639 rcu_read_unlock();
640}
641
642static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
643{
644 struct io_wq_work *work = data;
645
646 if (worker->cur_work == work) {
647 send_sig(SIGINT, worker->task, 1);
648 return true;
649 }
650
651 return false;
652}
653
654static enum io_wq_cancel io_wqe_cancel_work(struct io_wqe *wqe,
655 struct io_wq_work *cwork)
656{
657 struct io_wq_work *work;
658 bool found = false;
659
660 cwork->flags |= IO_WQ_WORK_CANCEL;
661
662 /*
663 * First check pending list, if we're lucky we can just remove it
664 * from there. CANCEL_OK means that the work is returned as-new,
665 * no completion will be posted for it.
666 */
667 spin_lock_irq(&wqe->lock);
668 list_for_each_entry(work, &wqe->work_list, list) {
669 if (work == cwork) {
670 list_del(&work->list);
671 found = true;
672 break;
673 }
674 }
675 spin_unlock_irq(&wqe->lock);
676
677 if (found) {
678 work->flags |= IO_WQ_WORK_CANCEL;
679 work->func(&work);
680 return IO_WQ_CANCEL_OK;
681 }
682
683 /*
684 * Now check if a free (going busy) or busy worker has the work
685 * currently running. If we find it there, we'll return CANCEL_RUNNING
686 * as an indication that we attempte to signal cancellation. The
687 * completion will run normally in this case.
688 */
689 rcu_read_lock();
690 found = io_wq_for_each_worker(wqe, &wqe->free_list, io_wq_worker_cancel,
691 cwork);
692 if (found)
693 goto done;
694
695 found = io_wq_for_each_worker(wqe, &wqe->busy_list, io_wq_worker_cancel,
696 cwork);
697done:
698 rcu_read_unlock();
699 return found ? IO_WQ_CANCEL_RUNNING : IO_WQ_CANCEL_NOTFOUND;
700}
701
702enum io_wq_cancel io_wq_cancel_work(struct io_wq *wq, struct io_wq_work *cwork)
703{
704 enum io_wq_cancel ret = IO_WQ_CANCEL_NOTFOUND;
705 int i;
706
707 for (i = 0; i < wq->nr_wqes; i++) {
708 struct io_wqe *wqe = wq->wqes[i];
709
710 ret = io_wqe_cancel_work(wqe, cwork);
711 if (ret != IO_WQ_CANCEL_NOTFOUND)
712 break;
713 }
714
715 return ret;
716}
717
718struct io_wq_flush_data {
719 struct io_wq_work work;
720 struct completion done;
721};
722
723static void io_wq_flush_func(struct io_wq_work **workptr)
724{
725 struct io_wq_work *work = *workptr;
726 struct io_wq_flush_data *data;
727
728 data = container_of(work, struct io_wq_flush_data, work);
729 complete(&data->done);
730}
731
732/*
733 * Doesn't wait for previously queued work to finish. When this completes,
734 * it just means that previously queued work was started.
735 */
736void io_wq_flush(struct io_wq *wq)
737{
738 struct io_wq_flush_data data;
739 int i;
740
741 for (i = 0; i < wq->nr_wqes; i++) {
742 struct io_wqe *wqe = wq->wqes[i];
743
744 init_completion(&data.done);
745 INIT_IO_WORK(&data.work, io_wq_flush_func);
746 io_wqe_enqueue(wqe, &data.work);
747 wait_for_completion(&data.done);
748 }
749}
750
751struct io_wq *io_wq_create(unsigned concurrency, struct mm_struct *mm)
752{
753 int ret = -ENOMEM, i, node;
754 struct io_wq *wq;
755
756 wq = kcalloc(1, sizeof(*wq), GFP_KERNEL);
757 if (!wq)
758 return ERR_PTR(-ENOMEM);
759
760 wq->nr_wqes = num_online_nodes();
761 wq->wqes = kcalloc(wq->nr_wqes, sizeof(struct io_wqe *), GFP_KERNEL);
762 if (!wq->wqes) {
763 kfree(wq);
764 return ERR_PTR(-ENOMEM);
765 }
766
767 i = 0;
768 refcount_set(&wq->refs, wq->nr_wqes);
769 for_each_online_node(node) {
770 struct io_wqe *wqe;
771
772 wqe = kcalloc_node(1, sizeof(struct io_wqe), GFP_KERNEL, node);
773 if (!wqe)
774 break;
775 wq->wqes[i] = wqe;
776 wqe->node = node;
777 wqe->max_workers = concurrency;
778 wqe->node = node;
779 wqe->wq = wq;
780 spin_lock_init(&wqe->lock);
781 INIT_LIST_HEAD(&wqe->work_list);
782 INIT_HLIST_NULLS_HEAD(&wqe->free_list.head, 0);
783 wqe->free_list.nulls = 0;
784 INIT_HLIST_NULLS_HEAD(&wqe->busy_list.head, 1);
785 wqe->busy_list.nulls = 1;
786 atomic_set(&wqe->nr_running, 0);
787
788 i++;
789 }
790
791 init_completion(&wq->done);
792
793 if (i != wq->nr_wqes)
794 goto err;
795
796 /* caller must have already done mmgrab() on this mm */
797 wq->mm = mm;
798
799 wq->manager = kthread_create(io_wq_manager, wq, "io_wq_manager");
800 if (!IS_ERR(wq->manager)) {
801 wake_up_process(wq->manager);
802 return wq;
803 }
804
805 ret = PTR_ERR(wq->manager);
806 wq->manager = NULL;
807err:
808 complete(&wq->done);
809 io_wq_destroy(wq);
810 return ERR_PTR(ret);
811}
812
813static bool io_wq_worker_wake(struct io_worker *worker, void *data)
814{
815 wake_up_process(worker->task);
816 return false;
817}
818
819void io_wq_destroy(struct io_wq *wq)
820{
821 int i;
822
823 if (wq->manager) {
824 set_bit(IO_WQ_BIT_EXIT, &wq->state);
825 kthread_stop(wq->manager);
826 }
827
828 rcu_read_lock();
829 for (i = 0; i < wq->nr_wqes; i++) {
830 struct io_wqe *wqe = wq->wqes[i];
831
832 if (!wqe)
833 continue;
834 io_wq_for_each_worker(wqe, &wqe->free_list, io_wq_worker_wake,
835 NULL);
836 io_wq_for_each_worker(wqe, &wqe->busy_list, io_wq_worker_wake,
837 NULL);
838 }
839 rcu_read_unlock();
840
841 wait_for_completion(&wq->done);
842
843 for (i = 0; i < wq->nr_wqes; i++)
844 kfree(wq->wqes[i]);
845 kfree(wq->wqes);
846 kfree(wq);
847}