// SPDX-License-Identifier: GPL-2.0
/*
 * Basic worker thread pool for io_uring
 *
 * Copyright (C) 2019 Jens Axboe
 *
 */
#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/errno.h>
#include <linux/sched/signal.h>
#include <linux/mm.h>
#include <linux/mmu_context.h>
#include <linux/sched/mm.h>
#include <linux/percpu.h>
#include <linux/slab.h>
#include <linux/kthread.h>
#include <linux/rculist_nulls.h>

#include "io-wq.h"

#define WORKER_IDLE_TIMEOUT	(5 * HZ)

enum {
	IO_WORKER_F_UP		= 1,	/* up and active */
	IO_WORKER_F_RUNNING	= 2,	/* account as running */
	IO_WORKER_F_FREE	= 4,	/* worker on free list */
	IO_WORKER_F_EXITING	= 8,	/* worker exiting */
	IO_WORKER_F_FIXED	= 16,	/* static idle worker */
	IO_WORKER_F_BOUND	= 32,	/* is doing bounded work */
};

enum {
	IO_WQ_BIT_EXIT		= 0,	/* wq exiting */
	IO_WQ_BIT_CANCEL	= 1,	/* cancel work on list */
	IO_WQ_BIT_ERROR		= 2,	/* error on setup */
};

enum {
	IO_WQE_FLAG_STALLED	= 1,	/* stalled on hash */
};

/*
 * One for each thread in a wqe pool
 */
struct io_worker {
	refcount_t ref;
	unsigned flags;
	struct hlist_nulls_node nulls_node;
	struct list_head all_list;
	struct task_struct *task;
	wait_queue_head_t wait;
	struct io_wqe *wqe;

	struct io_wq_work *cur_work;
	spinlock_t lock;

	struct rcu_head rcu;
	struct mm_struct *mm;
	struct files_struct *restore_files;
};

#if BITS_PER_LONG == 64
#define IO_WQ_HASH_ORDER	6
#else
#define IO_WQ_HASH_ORDER	5
#endif

struct io_wqe_acct {
	unsigned nr_workers;
	unsigned max_workers;
	atomic_t nr_running;
};

enum {
	IO_WQ_ACCT_BOUND,
	IO_WQ_ACCT_UNBOUND,
};

/*
 * Per-node worker thread pool
 */
struct io_wqe {
	struct {
		spinlock_t lock;
		struct list_head work_list;
		unsigned long hash_map;
		unsigned flags;
	} ____cacheline_aligned_in_smp;

	int node;
	struct io_wqe_acct acct[2];

	struct hlist_nulls_head free_list;
	struct hlist_nulls_head busy_list;
	struct list_head all_list;

	struct io_wq *wq;
};

/*
 * Per io_wq state
 */
struct io_wq {
	struct io_wqe **wqes;
	unsigned long state;
	unsigned nr_wqes;

	get_work_fn *get_work;
	put_work_fn *put_work;

	struct task_struct *manager;
	struct user_struct *user;
	struct mm_struct *mm;
	refcount_t refs;
	struct completion done;
};

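/*
 * Worker reference helpers: io_worker_get() takes a reference unless the
 * worker is already going away, io_worker_release() drops it and wakes the
 * worker task on the final put so a pending exit can complete.
 */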
static bool io_worker_get(struct io_worker *worker)
{
	return refcount_inc_not_zero(&worker->ref);
}

static void io_worker_release(struct io_worker *worker)
{
	if (refcount_dec_and_test(&worker->ref))
		wake_up_process(worker->task);
}

/*
 * Note: drops the wqe->lock if returning true! The caller must re-acquire
 * the lock in that case. Some callers need to restart handling if this
 * happens, so we can't just re-acquire the lock on behalf of the caller.
 */
static bool __io_worker_unuse(struct io_wqe *wqe, struct io_worker *worker)
{
	bool dropped_lock = false;

	if (current->files != worker->restore_files) {
		__acquire(&wqe->lock);
		spin_unlock_irq(&wqe->lock);
		dropped_lock = true;

		task_lock(current);
		current->files = worker->restore_files;
		task_unlock(current);
	}

	/*
	 * If we have an active mm, we need to drop the wq lock before unusing
	 * it. If we do, return true and let the caller retry the idle loop.
	 */
	if (worker->mm) {
		if (!dropped_lock) {
			__acquire(&wqe->lock);
			spin_unlock_irq(&wqe->lock);
			dropped_lock = true;
		}
		__set_current_state(TASK_RUNNING);
		set_fs(KERNEL_DS);
		unuse_mm(worker->mm);
		mmput(worker->mm);
		worker->mm = NULL;
	}

	return dropped_lock;
}

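/*
 * Pick the bounded or unbounded accounting bucket: by work item flags for
 * io_work_get_acct(), by worker flags for io_wqe_get_acct().
 */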
static inline struct io_wqe_acct *io_work_get_acct(struct io_wqe *wqe,
						   struct io_wq_work *work)
{
	if (work->flags & IO_WQ_WORK_UNBOUND)
		return &wqe->acct[IO_WQ_ACCT_UNBOUND];

	return &wqe->acct[IO_WQ_ACCT_BOUND];
}

static inline struct io_wqe_acct *io_wqe_get_acct(struct io_wqe *wqe,
						  struct io_worker *worker)
{
	if (worker->flags & IO_WORKER_F_BOUND)
		return &wqe->acct[IO_WQ_ACCT_BOUND];

	return &wqe->acct[IO_WQ_ACCT_UNBOUND];
}

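/*
 * Final teardown for a worker: wait for transient references to drop, fix
 * up the running/worker accounting, unlink the worker from the wqe lists
 * and, if it was the last worker, let a pending io_wq exit complete.
 */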
static void io_worker_exit(struct io_worker *worker)
{
	struct io_wqe *wqe = worker->wqe;
	struct io_wqe_acct *acct = io_wqe_get_acct(wqe, worker);
	unsigned nr_workers;

	/*
	 * If we're not at zero, someone else is holding a brief reference
	 * to the worker. Wait for that to go away.
	 */
	set_current_state(TASK_INTERRUPTIBLE);
	if (!refcount_dec_and_test(&worker->ref))
		schedule();
	__set_current_state(TASK_RUNNING);

	preempt_disable();
	current->flags &= ~PF_IO_WORKER;
	if (worker->flags & IO_WORKER_F_RUNNING)
		atomic_dec(&acct->nr_running);
	if (!(worker->flags & IO_WORKER_F_BOUND))
		atomic_dec(&wqe->wq->user->processes);
	worker->flags = 0;
	preempt_enable();

	spin_lock_irq(&wqe->lock);
	hlist_nulls_del_rcu(&worker->nulls_node);
	list_del_rcu(&worker->all_list);
	if (__io_worker_unuse(wqe, worker)) {
		__release(&wqe->lock);
		spin_lock_irq(&wqe->lock);
	}
	acct->nr_workers--;
	nr_workers = wqe->acct[IO_WQ_ACCT_BOUND].nr_workers +
			wqe->acct[IO_WQ_ACCT_UNBOUND].nr_workers;
	spin_unlock_irq(&wqe->lock);

	/* all workers gone, wq exit can proceed */
	if (!nr_workers && refcount_dec_and_test(&wqe->wq->refs))
		complete(&wqe->wq->done);

	kfree_rcu(worker, rcu);
}

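/*
 * Returns true if the wqe has work that a worker could run right now, i.e.
 * the work list is non-empty and we're not currently stalled on hashed work.
 */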
static inline bool io_wqe_run_queue(struct io_wqe *wqe)
	__must_hold(wqe->lock)
{
	if (!list_empty(&wqe->work_list) && !(wqe->flags & IO_WQE_FLAG_STALLED))
		return true;
	return false;
}

/*
 * Check head of free list for an available worker. If one isn't available,
 * caller must wake up the wq manager to create one.
 */
static bool io_wqe_activate_free_worker(struct io_wqe *wqe)
	__must_hold(RCU)
{
	struct hlist_nulls_node *n;
	struct io_worker *worker;

	n = rcu_dereference(hlist_nulls_first_rcu(&wqe->free_list));
	if (is_a_nulls(n))
		return false;

	worker = hlist_nulls_entry(n, struct io_worker, nulls_node);
	if (io_worker_get(worker)) {
		wake_up(&worker->wait);
		io_worker_release(worker);
		return true;
	}

	return false;
}

/*
 * We need a worker. If we find a free one, we're good. If not, and we're
 * below the max number of workers, wake up the manager to create one.
 */
static void io_wqe_wake_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
{
	bool ret;

	/*
	 * Most likely an attempt to queue unbounded work on an io_wq that
	 * wasn't setup with any unbounded workers.
	 */
	WARN_ON_ONCE(!acct->max_workers);

	rcu_read_lock();
	ret = io_wqe_activate_free_worker(wqe);
	rcu_read_unlock();

	if (!ret && acct->nr_workers < acct->max_workers)
		wake_up_process(wqe->wq->manager);
}

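/*
 * Track how many workers of this accounting class are currently runnable.
 * When the count drops to zero with work still queued, wake another worker
 * so the queue keeps making progress.
 */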
static void io_wqe_inc_running(struct io_wqe *wqe, struct io_worker *worker)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(wqe, worker);

	atomic_inc(&acct->nr_running);
}

static void io_wqe_dec_running(struct io_wqe *wqe, struct io_worker *worker)
	__must_hold(wqe->lock)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(wqe, worker);

	if (atomic_dec_and_test(&acct->nr_running) && io_wqe_run_queue(wqe))
		io_wqe_wake_worker(wqe, acct);
}

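/*
 * One-time setup when a worker thread starts: allow SIGINT for cancellation,
 * mark the task as an io worker, and account it as up and running.
 */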
static void io_worker_start(struct io_wqe *wqe, struct io_worker *worker)
{
	allow_kernel_signal(SIGINT);

	current->flags |= PF_IO_WORKER;

	worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);
	worker->restore_files = current->files;
	io_wqe_inc_running(wqe, worker);
}

/*
 * Worker will start processing some work. Move it to the busy list, if
 * it's currently on the freelist
 */
static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker,
			     struct io_wq_work *work)
	__must_hold(wqe->lock)
{
	bool worker_bound, work_bound;

	if (worker->flags & IO_WORKER_F_FREE) {
		worker->flags &= ~IO_WORKER_F_FREE;
		hlist_nulls_del_init_rcu(&worker->nulls_node);
		hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->busy_list);
	}

	/*
	 * If worker is moving from bound to unbound (or vice versa), then
	 * ensure we update the running accounting.
	 */
	worker_bound = (worker->flags & IO_WORKER_F_BOUND) != 0;
	work_bound = (work->flags & IO_WQ_WORK_UNBOUND) == 0;
	if (worker_bound != work_bound) {
		io_wqe_dec_running(wqe, worker);
		if (work_bound) {
			worker->flags |= IO_WORKER_F_BOUND;
			wqe->acct[IO_WQ_ACCT_UNBOUND].nr_workers--;
			wqe->acct[IO_WQ_ACCT_BOUND].nr_workers++;
			atomic_dec(&wqe->wq->user->processes);
		} else {
			worker->flags &= ~IO_WORKER_F_BOUND;
			wqe->acct[IO_WQ_ACCT_UNBOUND].nr_workers++;
			wqe->acct[IO_WQ_ACCT_BOUND].nr_workers--;
			atomic_inc(&wqe->wq->user->processes);
		}
		io_wqe_inc_running(wqe, worker);
	}
}

/*
 * No work, worker going to sleep. Move to freelist, and unuse mm if we
 * have one attached. Dropping the mm may potentially sleep, so we drop
 * the lock in that case and return success. Since the caller has to
 * retry the loop in that case (we changed task state), we don't regrab
 * the lock if we return success.
 */
static bool __io_worker_idle(struct io_wqe *wqe, struct io_worker *worker)
	__must_hold(wqe->lock)
{
	if (!(worker->flags & IO_WORKER_F_FREE)) {
		worker->flags |= IO_WORKER_F_FREE;
		hlist_nulls_del_init_rcu(&worker->nulls_node);
		hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
	}

	return __io_worker_unuse(wqe, worker);
}

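/*
 * Grab the next runnable item off the wqe work list. Unhashed work can run
 * immediately; hashed work runs only if no other work with the same hash is
 * in flight, in which case the hash bit is claimed here.
 */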
static struct io_wq_work *io_get_next_work(struct io_wqe *wqe, unsigned *hash)
	__must_hold(wqe->lock)
{
	struct io_wq_work *work;

	list_for_each_entry(work, &wqe->work_list, list) {
		/* not hashed, can run anytime */
		if (!(work->flags & IO_WQ_WORK_HASHED)) {
			list_del(&work->list);
			return work;
		}

		/* hashed, can run if not already running */
		*hash = work->flags >> IO_WQ_HASH_SHIFT;
		if (!(wqe->hash_map & BIT_ULL(*hash))) {
			wqe->hash_map |= BIT_ULL(*hash);
			list_del(&work->list);
			return work;
		}
	}

	return NULL;
}

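/*
 * Main work loop for a worker: pull work off the wqe list, set up the
 * execution context (files, mm, cancel state), run the work and then any
 * dependent work it hands back, releasing the hash bit when done.
 */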
static void io_worker_handle_work(struct io_worker *worker)
	__releases(wqe->lock)
{
	struct io_wq_work *work, *old_work = NULL, *put_work = NULL;
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;

	do {
		unsigned hash = -1U;

		/*
		 * If we got some work, mark us as busy. If we didn't, but
		 * the list isn't empty, it means we stalled on hashed work.
		 * Mark us stalled so we don't keep looking for work when we
		 * can't make progress; any work completion or insertion will
		 * clear the stalled flag.
		 */
		work = io_get_next_work(wqe, &hash);
		if (work)
			__io_worker_busy(wqe, worker, work);
		else if (!list_empty(&wqe->work_list))
			wqe->flags |= IO_WQE_FLAG_STALLED;

		spin_unlock_irq(&wqe->lock);
		if (put_work && wq->put_work)
			wq->put_work(old_work);
		if (!work)
			break;
next:
		/* flush any pending signals before assigning new work */
		if (signal_pending(current))
			flush_signals(current);

		spin_lock_irq(&worker->lock);
		worker->cur_work = work;
		spin_unlock_irq(&worker->lock);

		if (work->flags & IO_WQ_WORK_CB)
			work->func(&work);

		if ((work->flags & IO_WQ_WORK_NEEDS_FILES) &&
		    current->files != work->files) {
			task_lock(current);
			current->files = work->files;
			task_unlock(current);
		}
		if ((work->flags & IO_WQ_WORK_NEEDS_USER) && !worker->mm &&
		    wq->mm && mmget_not_zero(wq->mm)) {
			use_mm(wq->mm);
			set_fs(USER_DS);
			worker->mm = wq->mm;
		}
		if (test_bit(IO_WQ_BIT_CANCEL, &wq->state))
			work->flags |= IO_WQ_WORK_CANCEL;
		if (worker->mm)
			work->flags |= IO_WQ_WORK_HAS_MM;

		if (wq->get_work && !(work->flags & IO_WQ_WORK_INTERNAL)) {
			put_work = work;
			wq->get_work(work);
		}

		old_work = work;
		work->func(&work);

		spin_lock_irq(&worker->lock);
		worker->cur_work = NULL;
		spin_unlock_irq(&worker->lock);

		spin_lock_irq(&wqe->lock);

		if (hash != -1U) {
			wqe->hash_map &= ~BIT_ULL(hash);
			wqe->flags &= ~IO_WQE_FLAG_STALLED;
		}
		if (work && work != old_work) {
			spin_unlock_irq(&wqe->lock);

			if (put_work && wq->put_work) {
				wq->put_work(put_work);
				put_work = NULL;
			}

			/* dependent work not hashed */
			hash = -1U;
			goto next;
		}
	} while (1);
}

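/*
 * Worker thread main loop: run queued work when there is any, otherwise
 * idle on the free list until woken or the idle timeout expires. Only the
 * fixed worker stays alive past the timeout while the wq keeps running.
 */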
static int io_wqe_worker(void *data)
{
	struct io_worker *worker = data;
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;
	DEFINE_WAIT(wait);

	io_worker_start(wqe, worker);

	while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		prepare_to_wait(&worker->wait, &wait, TASK_INTERRUPTIBLE);

		spin_lock_irq(&wqe->lock);
		if (io_wqe_run_queue(wqe)) {
			__set_current_state(TASK_RUNNING);
			io_worker_handle_work(worker);
			continue;
		}
		/* drops the lock on success, retry */
		if (__io_worker_idle(wqe, worker)) {
			__release(&wqe->lock);
			continue;
		}
		spin_unlock_irq(&wqe->lock);
		if (signal_pending(current))
			flush_signals(current);
		if (schedule_timeout(WORKER_IDLE_TIMEOUT))
			continue;
		/* timed out, exit unless we're the fixed worker */
		if (test_bit(IO_WQ_BIT_EXIT, &wq->state) ||
		    !(worker->flags & IO_WORKER_F_FIXED))
			break;
	}

	finish_wait(&worker->wait, &wait);

	if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		spin_lock_irq(&wqe->lock);
		if (!list_empty(&wqe->work_list))
			io_worker_handle_work(worker);
		else
			spin_unlock_irq(&wqe->lock);
	}

	io_worker_exit(worker);
	return 0;
}

/*
 * Called when a worker is scheduled in. Mark us as currently running.
 */
void io_wq_worker_running(struct task_struct *tsk)
{
	struct io_worker *worker = kthread_data(tsk);
	struct io_wqe *wqe = worker->wqe;

	if (!(worker->flags & IO_WORKER_F_UP))
		return;
	if (worker->flags & IO_WORKER_F_RUNNING)
		return;
	worker->flags |= IO_WORKER_F_RUNNING;
	io_wqe_inc_running(wqe, worker);
}

/*
 * Called when worker is going to sleep. If there are no workers currently
 * running and we have work pending, wake up a free one or have the manager
 * set one up.
 */
void io_wq_worker_sleeping(struct task_struct *tsk)
{
	struct io_worker *worker = kthread_data(tsk);
	struct io_wqe *wqe = worker->wqe;

	if (!(worker->flags & IO_WORKER_F_UP))
		return;
	if (!(worker->flags & IO_WORKER_F_RUNNING))
		return;

	worker->flags &= ~IO_WORKER_F_RUNNING;

	spin_lock_irq(&wqe->lock);
	io_wqe_dec_running(wqe, worker);
	spin_unlock_irq(&wqe->lock);
}

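/*
 * Create a new worker of the given accounting class (bound or unbound) for
 * this wqe, spawn its kthread and put it on the free list. Returns false if
 * allocation or thread creation fails.
 */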
static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
{
	struct io_wqe_acct *acct = &wqe->acct[index];
	struct io_worker *worker;

	worker = kcalloc_node(1, sizeof(*worker), GFP_KERNEL, wqe->node);
	if (!worker)
		return false;

	refcount_set(&worker->ref, 1);
	worker->nulls_node.pprev = NULL;
	init_waitqueue_head(&worker->wait);
	worker->wqe = wqe;
	spin_lock_init(&worker->lock);

	worker->task = kthread_create_on_node(io_wqe_worker, worker, wqe->node,
				"io_wqe_worker-%d/%d", index, wqe->node);
	if (IS_ERR(worker->task)) {
		kfree(worker);
		return false;
	}

	spin_lock_irq(&wqe->lock);
	hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
	list_add_tail_rcu(&worker->all_list, &wqe->all_list);
	worker->flags |= IO_WORKER_F_FREE;
	if (index == IO_WQ_ACCT_BOUND)
		worker->flags |= IO_WORKER_F_BOUND;
	if (!acct->nr_workers && (worker->flags & IO_WORKER_F_BOUND))
		worker->flags |= IO_WORKER_F_FIXED;
	acct->nr_workers++;
	spin_unlock_irq(&wqe->lock);

	if (index == IO_WQ_ACCT_UNBOUND)
		atomic_inc(&wq->user->processes);

	wake_up_process(worker->task);
	return true;
}

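/*
 * Returns true if the manager should fork another worker for this
 * accounting class: no free workers, runnable work queued, and still under
 * the class limit.
 */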
static inline bool io_wqe_need_worker(struct io_wqe *wqe, int index)
	__must_hold(wqe->lock)
{
	struct io_wqe_acct *acct = &wqe->acct[index];

	/* if we have available workers or no work, no need */
	if (!hlist_nulls_empty(&wqe->free_list) || !io_wqe_run_queue(wqe))
		return false;
	return acct->nr_workers < acct->max_workers;
}

/*
 * Manager thread. Tasked with creating new workers, if we need them.
 */
static int io_wq_manager(void *data)
{
	struct io_wq *wq = data;
	int i;

	/* create fixed workers */
	refcount_set(&wq->refs, wq->nr_wqes);
	for (i = 0; i < wq->nr_wqes; i++) {
		if (create_io_worker(wq, wq->wqes[i], IO_WQ_ACCT_BOUND))
			continue;
		goto err;
	}

	complete(&wq->done);

	while (!kthread_should_stop()) {
		for (i = 0; i < wq->nr_wqes; i++) {
			struct io_wqe *wqe = wq->wqes[i];
			bool fork_worker[2] = { false, false };

			spin_lock_irq(&wqe->lock);
			if (io_wqe_need_worker(wqe, IO_WQ_ACCT_BOUND))
				fork_worker[IO_WQ_ACCT_BOUND] = true;
			if (io_wqe_need_worker(wqe, IO_WQ_ACCT_UNBOUND))
				fork_worker[IO_WQ_ACCT_UNBOUND] = true;
			spin_unlock_irq(&wqe->lock);
			if (fork_worker[IO_WQ_ACCT_BOUND])
				create_io_worker(wq, wqe, IO_WQ_ACCT_BOUND);
			if (fork_worker[IO_WQ_ACCT_UNBOUND])
				create_io_worker(wq, wqe, IO_WQ_ACCT_UNBOUND);
		}
		set_current_state(TASK_INTERRUPTIBLE);
		schedule_timeout(HZ);
	}

	return 0;
err:
	set_bit(IO_WQ_BIT_ERROR, &wq->state);
	set_bit(IO_WQ_BIT_EXIT, &wq->state);
	if (refcount_sub_and_test(wq->nr_wqes - i, &wq->refs))
		complete(&wq->done);
	return 0;
}

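/*
 * Gate new unbounded work: bounded work is always accepted, unbounded work
 * is accepted if a worker is already running or free, or if the user is
 * still below its process limit (or has CAP_SYS_RESOURCE/CAP_SYS_ADMIN).
 */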
static bool io_wq_can_queue(struct io_wqe *wqe, struct io_wqe_acct *acct,
			    struct io_wq_work *work)
{
	bool free_worker;

	if (!(work->flags & IO_WQ_WORK_UNBOUND))
		return true;
	if (atomic_read(&acct->nr_running))
		return true;

	rcu_read_lock();
	free_worker = !hlist_nulls_empty(&wqe->free_list);
	rcu_read_unlock();
	if (free_worker)
		return true;

	if (atomic_read(&wqe->wq->user->processes) >= acct->max_workers &&
	    !(capable(CAP_SYS_RESOURCE) || capable(CAP_SYS_ADMIN)))
		return false;

	return true;
}

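/*
 * Queue work on a wqe: reject unbounded work that would exceed the user's
 * limits (it is completed with IO_WQ_WORK_CANCEL set instead), otherwise
 * append it to the work list and wake a worker if none are running.
 */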
static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
{
	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
	unsigned long flags;

	/*
	 * Do early check to see if we need a new unbound worker, and if we do,
	 * if we're allowed to do so. This isn't 100% accurate as there's a
	 * gap between this check and incrementing the value, but that's OK.
	 * It's close enough to not be an issue, fork() has the same delay.
	 */
	if (unlikely(!io_wq_can_queue(wqe, acct, work))) {
		work->flags |= IO_WQ_WORK_CANCEL;
		work->func(&work);
		return;
	}

	spin_lock_irqsave(&wqe->lock, flags);
	list_add_tail(&work->list, &wqe->work_list);
	wqe->flags &= ~IO_WQE_FLAG_STALLED;
	spin_unlock_irqrestore(&wqe->lock, flags);

	if (!atomic_read(&acct->nr_running))
		io_wqe_wake_worker(wqe, acct);
}

void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
{
	struct io_wqe *wqe = wq->wqes[numa_node_id()];

	io_wqe_enqueue(wqe, work);
}

/*
 * Enqueue work, hashed by some key. Work items that hash to the same value
 * will not be done in parallel. Used to limit concurrent writes, generally
 * hashed by inode.
 */
void io_wq_enqueue_hashed(struct io_wq *wq, struct io_wq_work *work, void *val)
{
	struct io_wqe *wqe = wq->wqes[numa_node_id()];
	unsigned bit;

	bit = hash_ptr(val, IO_WQ_HASH_ORDER);
	work->flags |= (IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT));
	io_wqe_enqueue(wqe, work);
}

static bool io_wqe_worker_send_sig(struct io_worker *worker, void *data)
{
	send_sig(SIGINT, worker->task, 1);
	return false;
}

/*
 * Iterate the passed in list and call the specific function for each
 * worker that isn't exiting
 */
static bool io_wq_for_each_worker(struct io_wqe *wqe,
				  bool (*func)(struct io_worker *, void *),
				  void *data)
{
	struct io_worker *worker;
	bool ret = false;

	list_for_each_entry_rcu(worker, &wqe->all_list, all_list) {
		if (io_worker_get(worker)) {
			ret = func(worker, data);
			io_worker_release(worker);
			if (ret)
				break;
		}
	}

	return ret;
}

void io_wq_cancel_all(struct io_wq *wq)
{
	int i;

	set_bit(IO_WQ_BIT_CANCEL, &wq->state);

	/*
	 * Browse both lists, as there's a gap between handing work off
	 * to a worker and the worker putting itself on the busy_list
	 */
	rcu_read_lock();
	for (i = 0; i < wq->nr_wqes; i++) {
		struct io_wqe *wqe = wq->wqes[i];

		io_wq_for_each_worker(wqe, io_wqe_worker_send_sig, NULL);
	}
	rcu_read_unlock();
}

struct io_cb_cancel_data {
	struct io_wqe *wqe;
	work_cancel_fn *cancel;
	void *caller_data;
};

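/*
 * Cancel by callback: data->cancel() decides whether a given work item is a
 * match. io_work_cancel() checks the work currently running on one worker
 * and signals that worker if it matches.
 */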
static bool io_work_cancel(struct io_worker *worker, void *cancel_data)
{
	struct io_cb_cancel_data *data = cancel_data;
	unsigned long flags;
	bool ret = false;

	/*
	 * Hold the lock to avoid ->cur_work going out of scope, caller
	 * may dereference the passed in work.
	 */
	spin_lock_irqsave(&worker->lock, flags);
	if (worker->cur_work &&
	    data->cancel(worker->cur_work, data->caller_data)) {
		send_sig(SIGINT, worker->task, 1);
		ret = true;
	}
	spin_unlock_irqrestore(&worker->lock, flags);

	return ret;
}

static enum io_wq_cancel io_wqe_cancel_cb_work(struct io_wqe *wqe,
					       work_cancel_fn *cancel,
					       void *cancel_data)
{
	struct io_cb_cancel_data data = {
		.wqe = wqe,
		.cancel = cancel,
		.caller_data = cancel_data,
	};
	struct io_wq_work *work;
	unsigned long flags;
	bool found = false;

	spin_lock_irqsave(&wqe->lock, flags);
	list_for_each_entry(work, &wqe->work_list, list) {
		if (cancel(work, cancel_data)) {
			list_del(&work->list);
			found = true;
			break;
		}
	}
	spin_unlock_irqrestore(&wqe->lock, flags);

	if (found) {
		work->flags |= IO_WQ_WORK_CANCEL;
		work->func(&work);
		return IO_WQ_CANCEL_OK;
	}

	rcu_read_lock();
	found = io_wq_for_each_worker(wqe, io_work_cancel, &data);
	rcu_read_unlock();
	return found ? IO_WQ_CANCEL_RUNNING : IO_WQ_CANCEL_NOTFOUND;
}

enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
				  void *data)
{
	enum io_wq_cancel ret = IO_WQ_CANCEL_NOTFOUND;
	int i;

	for (i = 0; i < wq->nr_wqes; i++) {
		struct io_wqe *wqe = wq->wqes[i];

		ret = io_wqe_cancel_cb_work(wqe, cancel, data);
		if (ret != IO_WQ_CANCEL_NOTFOUND)
			break;
	}

	return ret;
}

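/*
 * Cancel a specific work item: io_wq_worker_cancel() signals the worker
 * currently executing it, io_wqe_cancel_work() first tries to pull it off
 * the pending list before falling back to the running workers.
 */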
static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
{
	struct io_wq_work *work = data;
	unsigned long flags;
	bool ret = false;

	if (worker->cur_work != work)
		return false;

	spin_lock_irqsave(&worker->lock, flags);
	if (worker->cur_work == work) {
		send_sig(SIGINT, worker->task, 1);
		ret = true;
	}
	spin_unlock_irqrestore(&worker->lock, flags);

	return ret;
}

static enum io_wq_cancel io_wqe_cancel_work(struct io_wqe *wqe,
					    struct io_wq_work *cwork)
{
	struct io_wq_work *work;
	unsigned long flags;
	bool found = false;

	cwork->flags |= IO_WQ_WORK_CANCEL;

	/*
	 * First check the pending list; if we're lucky we can just remove it
	 * from there. CANCEL_OK means that the work is returned as-new,
	 * no completion will be posted for it.
	 */
	spin_lock_irqsave(&wqe->lock, flags);
	list_for_each_entry(work, &wqe->work_list, list) {
		if (work == cwork) {
			list_del(&work->list);
			found = true;
			break;
		}
	}
	spin_unlock_irqrestore(&wqe->lock, flags);

	if (found) {
		work->flags |= IO_WQ_WORK_CANCEL;
		work->func(&work);
		return IO_WQ_CANCEL_OK;
	}

	/*
	 * Now check if a free (going busy) or busy worker has the work
	 * currently running. If we find it there, we'll return CANCEL_RUNNING
	 * as an indication that we attempted to signal cancellation. The
	 * completion will run normally in this case.
	 */
	rcu_read_lock();
	found = io_wq_for_each_worker(wqe, io_wq_worker_cancel, cwork);
	rcu_read_unlock();
	return found ? IO_WQ_CANCEL_RUNNING : IO_WQ_CANCEL_NOTFOUND;
}

enum io_wq_cancel io_wq_cancel_work(struct io_wq *wq, struct io_wq_work *cwork)
{
	enum io_wq_cancel ret = IO_WQ_CANCEL_NOTFOUND;
	int i;

	for (i = 0; i < wq->nr_wqes; i++) {
		struct io_wqe *wqe = wq->wqes[i];

		ret = io_wqe_cancel_work(wqe, cwork);
		if (ret != IO_WQ_CANCEL_NOTFOUND)
			break;
	}

	return ret;
}

struct io_wq_flush_data {
	struct io_wq_work work;
	struct completion done;
};

static void io_wq_flush_func(struct io_wq_work **workptr)
{
	struct io_wq_work *work = *workptr;
	struct io_wq_flush_data *data;

	data = container_of(work, struct io_wq_flush_data, work);
	complete(&data->done);
}

/*
 * Doesn't wait for previously queued work to finish. When this completes,
 * it just means that previously queued work was started.
 */
void io_wq_flush(struct io_wq *wq)
{
	struct io_wq_flush_data data;
	int i;

	for (i = 0; i < wq->nr_wqes; i++) {
		struct io_wqe *wqe = wq->wqes[i];

		init_completion(&data.done);
		INIT_IO_WORK(&data.work, io_wq_flush_func);
		data.work.flags |= IO_WQ_WORK_INTERNAL;
		io_wqe_enqueue(wqe, &data.work);
		wait_for_completion(&data.done);
	}
}

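/*
 * Create an io_wq with one io_wqe per online NUMA node. "bounded" caps the
 * number of bound workers per node; unbounded workers are capped by the
 * user's RLIMIT_NPROC. The manager thread is started here and creates the
 * fixed workers before this returns.
 */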
struct io_wq *io_wq_create(unsigned bounded, struct mm_struct *mm,
			   struct user_struct *user, get_work_fn *get_work,
			   put_work_fn *put_work)
{
	int ret = -ENOMEM, i, node;
	struct io_wq *wq;

	wq = kcalloc(1, sizeof(*wq), GFP_KERNEL);
	if (!wq)
		return ERR_PTR(-ENOMEM);

	wq->nr_wqes = num_online_nodes();
	wq->wqes = kcalloc(wq->nr_wqes, sizeof(struct io_wqe *), GFP_KERNEL);
	if (!wq->wqes) {
		kfree(wq);
		return ERR_PTR(-ENOMEM);
	}

	wq->get_work = get_work;
	wq->put_work = put_work;

	/* caller must already hold a reference to this */
	wq->user = user;

	i = 0;
	for_each_online_node(node) {
		struct io_wqe *wqe;

		wqe = kcalloc_node(1, sizeof(struct io_wqe), GFP_KERNEL, node);
		if (!wqe)
			break;
		wq->wqes[i] = wqe;
		wqe->node = node;
		wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
		atomic_set(&wqe->acct[IO_WQ_ACCT_BOUND].nr_running, 0);
		if (user) {
			wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
					task_rlimit(current, RLIMIT_NPROC);
		}
		atomic_set(&wqe->acct[IO_WQ_ACCT_UNBOUND].nr_running, 0);
		wqe->node = node;
		wqe->wq = wq;
		spin_lock_init(&wqe->lock);
		INIT_LIST_HEAD(&wqe->work_list);
		INIT_HLIST_NULLS_HEAD(&wqe->free_list, 0);
		INIT_HLIST_NULLS_HEAD(&wqe->busy_list, 1);
		INIT_LIST_HEAD(&wqe->all_list);

		i++;
	}

	init_completion(&wq->done);

	if (i != wq->nr_wqes)
		goto err;

	/* caller must have already done mmgrab() on this mm */
	wq->mm = mm;

	wq->manager = kthread_create(io_wq_manager, wq, "io_wq_manager");
	if (!IS_ERR(wq->manager)) {
		wake_up_process(wq->manager);
		wait_for_completion(&wq->done);
		if (test_bit(IO_WQ_BIT_ERROR, &wq->state)) {
			ret = -ENOMEM;
			goto err;
		}
		reinit_completion(&wq->done);
		return wq;
	}

	ret = PTR_ERR(wq->manager);
	complete(&wq->done);
err:
	for (i = 0; i < wq->nr_wqes; i++)
		kfree(wq->wqes[i]);
	kfree(wq->wqes);
	kfree(wq);
	return ERR_PTR(ret);
}

static bool io_wq_worker_wake(struct io_worker *worker, void *data)
{
	wake_up_process(worker->task);
	return false;
}

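/*
 * Tear down an io_wq: stop the manager, ask every worker to exit and wait
 * for the last one to drop the wq reference before freeing the per-node
 * structures.
 */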
void io_wq_destroy(struct io_wq *wq)
{
	int i;

	set_bit(IO_WQ_BIT_EXIT, &wq->state);
	if (wq->manager)
		kthread_stop(wq->manager);

	rcu_read_lock();
	for (i = 0; i < wq->nr_wqes; i++) {
		struct io_wqe *wqe = wq->wqes[i];

		if (!wqe)
			continue;
		io_wq_for_each_worker(wqe, io_wq_worker_wake, NULL);
	}
	rcu_read_unlock();

	wait_for_completion(&wq->done);

	for (i = 0; i < wq->nr_wqes; i++)
		kfree(wq->wqes[i]);
	kfree(wq->wqes);
	kfree(wq);
}