// SPDX-License-Identifier: GPL-2.0
/*
 * Basic worker thread pool for io_uring
 *
 * Copyright (C) 2019 Jens Axboe
 *
 */
#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/errno.h>
#include <linux/sched/signal.h>
#include <linux/mm.h>
#include <linux/mmu_context.h>
#include <linux/sched/mm.h>
#include <linux/percpu.h>
#include <linux/slab.h>
#include <linux/kthread.h>
#include <linux/rculist_nulls.h>

#include "io-wq.h"

#define WORKER_IDLE_TIMEOUT	(5 * HZ)

enum {
	IO_WORKER_F_UP		= 1,	/* up and active */
	IO_WORKER_F_RUNNING	= 2,	/* account as running */
	IO_WORKER_F_FREE	= 4,	/* worker on free list */
	IO_WORKER_F_EXITING	= 8,	/* worker exiting */
	IO_WORKER_F_FIXED	= 16,	/* static idle worker */
	IO_WORKER_F_BOUND	= 32,	/* is doing bounded work */
};

enum {
	IO_WQ_BIT_EXIT		= 0,	/* wq exiting */
	IO_WQ_BIT_CANCEL	= 1,	/* cancel work on list */
	IO_WQ_BIT_ERROR		= 2,	/* error on setup */
};

enum {
	IO_WQE_FLAG_STALLED	= 1,	/* stalled on hash */
};

/*
 * One for each thread in a wqe pool
 */
struct io_worker {
	refcount_t ref;
	unsigned flags;
	struct hlist_nulls_node nulls_node;
	struct list_head all_list;
	struct task_struct *task;
	wait_queue_head_t wait;
	struct io_wqe *wqe;

	struct io_wq_work *cur_work;
	spinlock_t lock;

	struct rcu_head rcu;
	struct mm_struct *mm;
	const struct cred *creds;
	struct files_struct *restore_files;
};

#if BITS_PER_LONG == 64
#define IO_WQ_HASH_ORDER	6
#else
#define IO_WQ_HASH_ORDER	5
#endif

struct io_wqe_acct {
	unsigned nr_workers;
	unsigned max_workers;
	atomic_t nr_running;
};

enum {
	IO_WQ_ACCT_BOUND,
	IO_WQ_ACCT_UNBOUND,
};

/*
 * Per-node worker thread pool
 */
struct io_wqe {
	struct {
		spinlock_t lock;
		struct list_head work_list;
		unsigned long hash_map;
		unsigned flags;
	} ____cacheline_aligned_in_smp;

	int node;
	struct io_wqe_acct acct[2];

	struct hlist_nulls_head free_list;
	struct hlist_nulls_head busy_list;
	struct list_head all_list;

	struct io_wq *wq;
};

/*
 * Per io_wq state
 */
struct io_wq {
	struct io_wqe **wqes;
	unsigned long state;
	unsigned nr_wqes;

	get_work_fn *get_work;
	put_work_fn *put_work;

	struct task_struct *manager;
	struct user_struct *user;
	struct cred *creds;
	struct mm_struct *mm;
	refcount_t refs;
	struct completion done;
};

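/*
 * Pin a worker so it can't be freed while we poke at it. io_worker_release()
 * drops the reference again and wakes the task if we were the last holder,
 * which is what io_worker_exit() waits for before freeing the worker.
 */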
static bool io_worker_get(struct io_worker *worker)
{
	return refcount_inc_not_zero(&worker->ref);
}

static void io_worker_release(struct io_worker *worker)
{
	if (refcount_dec_and_test(&worker->ref))
		wake_up_process(worker->task);
}

/*
 * Note: drops the wqe->lock if returning true! The caller must re-acquire
 * the lock in that case. Some callers need to restart handling if this
 * happens, so we can't just re-acquire the lock on behalf of the caller.
 */
static bool __io_worker_unuse(struct io_wqe *wqe, struct io_worker *worker)
{
	bool dropped_lock = false;

	if (worker->creds) {
		revert_creds(worker->creds);
		worker->creds = NULL;
	}

	if (current->files != worker->restore_files) {
		__acquire(&wqe->lock);
		spin_unlock_irq(&wqe->lock);
		dropped_lock = true;

		task_lock(current);
		current->files = worker->restore_files;
		task_unlock(current);
	}

	/*
	 * If we have an active mm, we need to drop the wq lock before unusing
	 * it. If we do, return true and let the caller retry the idle loop.
	 */
	if (worker->mm) {
		if (!dropped_lock) {
			__acquire(&wqe->lock);
			spin_unlock_irq(&wqe->lock);
			dropped_lock = true;
		}
		__set_current_state(TASK_RUNNING);
		set_fs(KERNEL_DS);
		unuse_mm(worker->mm);
		mmput(worker->mm);
		worker->mm = NULL;
	}

	return dropped_lock;
}

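/*
 * Bounded and unbounded work are accounted separately; pick the right
 * io_wqe_acct based on the work item flags or the worker flags.
 */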
static inline struct io_wqe_acct *io_work_get_acct(struct io_wqe *wqe,
						   struct io_wq_work *work)
{
	if (work->flags & IO_WQ_WORK_UNBOUND)
		return &wqe->acct[IO_WQ_ACCT_UNBOUND];

	return &wqe->acct[IO_WQ_ACCT_BOUND];
}

static inline struct io_wqe_acct *io_wqe_get_acct(struct io_wqe *wqe,
						  struct io_worker *worker)
{
	if (worker->flags & IO_WORKER_F_BOUND)
		return &wqe->acct[IO_WQ_ACCT_BOUND];

	return &wqe->acct[IO_WQ_ACCT_UNBOUND];
}

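/*
 * Worker teardown: wait for any transient references to drop, undo the
 * running/unbound accounting, unhook from the wqe lists and, if we were
 * the last worker, let io_wq exit proceed by completing wq->done.
 */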
static void io_worker_exit(struct io_worker *worker)
{
	struct io_wqe *wqe = worker->wqe;
	struct io_wqe_acct *acct = io_wqe_get_acct(wqe, worker);
	unsigned nr_workers;

	/*
	 * If we're not at zero, someone else is holding a brief reference
	 * to the worker. Wait for that to go away.
	 */
	set_current_state(TASK_INTERRUPTIBLE);
	if (!refcount_dec_and_test(&worker->ref))
		schedule();
	__set_current_state(TASK_RUNNING);

	preempt_disable();
	current->flags &= ~PF_IO_WORKER;
	if (worker->flags & IO_WORKER_F_RUNNING)
		atomic_dec(&acct->nr_running);
	if (!(worker->flags & IO_WORKER_F_BOUND))
		atomic_dec(&wqe->wq->user->processes);
	worker->flags = 0;
	preempt_enable();

	spin_lock_irq(&wqe->lock);
	hlist_nulls_del_rcu(&worker->nulls_node);
	list_del_rcu(&worker->all_list);
	if (__io_worker_unuse(wqe, worker)) {
		__release(&wqe->lock);
		spin_lock_irq(&wqe->lock);
	}
	acct->nr_workers--;
	nr_workers = wqe->acct[IO_WQ_ACCT_BOUND].nr_workers +
			wqe->acct[IO_WQ_ACCT_UNBOUND].nr_workers;
	spin_unlock_irq(&wqe->lock);

	/* all workers gone, wq exit can proceed */
	if (!nr_workers && refcount_dec_and_test(&wqe->wq->refs))
		complete(&wqe->wq->done);

	kfree_rcu(worker, rcu);
}

static inline bool io_wqe_run_queue(struct io_wqe *wqe)
	__must_hold(wqe->lock)
{
	if (!list_empty(&wqe->work_list) && !(wqe->flags & IO_WQE_FLAG_STALLED))
		return true;
	return false;
}

/*
 * Check head of free list for an available worker. If one isn't available,
 * caller must wake up the wq manager to create one.
 */
static bool io_wqe_activate_free_worker(struct io_wqe *wqe)
	__must_hold(RCU)
{
	struct hlist_nulls_node *n;
	struct io_worker *worker;

	n = rcu_dereference(hlist_nulls_first_rcu(&wqe->free_list));
	if (is_a_nulls(n))
		return false;

	worker = hlist_nulls_entry(n, struct io_worker, nulls_node);
	if (io_worker_get(worker)) {
		wake_up(&worker->wait);
		io_worker_release(worker);
		return true;
	}

	return false;
}

/*
 * We need a worker. If we find a free one, we're good. If not, and we're
 * below the max number of workers, wake up the manager to create one.
 */
static void io_wqe_wake_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
{
	bool ret;

	/*
	 * Most likely an attempt to queue unbounded work on an io_wq that
	 * wasn't setup with any unbounded workers.
	 */
	WARN_ON_ONCE(!acct->max_workers);

	rcu_read_lock();
	ret = io_wqe_activate_free_worker(wqe);
	rcu_read_unlock();

	if (!ret && acct->nr_workers < acct->max_workers)
		wake_up_process(wqe->wq->manager);
}

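/*
 * nr_running tracks how many workers of this accounting class are currently
 * runnable (not blocked in the scheduler); it drives the decision of when a
 * free worker needs to be woken or a new one created.
 */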
static void io_wqe_inc_running(struct io_wqe *wqe, struct io_worker *worker)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(wqe, worker);

	atomic_inc(&acct->nr_running);
}

static void io_wqe_dec_running(struct io_wqe *wqe, struct io_worker *worker)
	__must_hold(wqe->lock)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(wqe, worker);

	if (atomic_dec_and_test(&acct->nr_running) && io_wqe_run_queue(wqe))
		io_wqe_wake_worker(wqe, acct);
}

static void io_worker_start(struct io_wqe *wqe, struct io_worker *worker)
{
	allow_kernel_signal(SIGINT);

	current->flags |= PF_IO_WORKER;

	worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);
	worker->restore_files = current->files;
	io_wqe_inc_running(wqe, worker);
}

/*
 * Worker will start processing some work. Move it to the busy list, if
 * it's currently on the freelist
 */
static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker,
			     struct io_wq_work *work)
	__must_hold(wqe->lock)
{
	bool worker_bound, work_bound;

	if (worker->flags & IO_WORKER_F_FREE) {
		worker->flags &= ~IO_WORKER_F_FREE;
		hlist_nulls_del_init_rcu(&worker->nulls_node);
		hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->busy_list);
	}

	/*
	 * If worker is moving from bound to unbound (or vice versa), then
	 * ensure we update the running accounting.
	 */
	worker_bound = (worker->flags & IO_WORKER_F_BOUND) != 0;
	work_bound = (work->flags & IO_WQ_WORK_UNBOUND) == 0;
	if (worker_bound != work_bound) {
		io_wqe_dec_running(wqe, worker);
		if (work_bound) {
			worker->flags |= IO_WORKER_F_BOUND;
			wqe->acct[IO_WQ_ACCT_UNBOUND].nr_workers--;
			wqe->acct[IO_WQ_ACCT_BOUND].nr_workers++;
			atomic_dec(&wqe->wq->user->processes);
		} else {
			worker->flags &= ~IO_WORKER_F_BOUND;
			wqe->acct[IO_WQ_ACCT_UNBOUND].nr_workers++;
			wqe->acct[IO_WQ_ACCT_BOUND].nr_workers--;
			atomic_inc(&wqe->wq->user->processes);
		}
		io_wqe_inc_running(wqe, worker);
	}
}

/*
 * No work, worker going to sleep. Move to freelist, and unuse mm if we
 * have one attached. Dropping the mm may potentially sleep, so we drop
 * the lock in that case and return success. Since the caller has to
 * retry the loop in that case (we changed task state), we don't regrab
 * the lock if we return success.
 */
static bool __io_worker_idle(struct io_wqe *wqe, struct io_worker *worker)
	__must_hold(wqe->lock)
{
	if (!(worker->flags & IO_WORKER_F_FREE)) {
		worker->flags |= IO_WORKER_F_FREE;
		hlist_nulls_del_init_rcu(&worker->nulls_node);
		hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
	}

	return __io_worker_unuse(wqe, worker);
}

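/*
 * Pull the next runnable work item off the list: anything that isn't hashed
 * can run right away, while hashed work only runs if no other work in the
 * same hash bucket is currently in flight on this wqe.
 */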
static struct io_wq_work *io_get_next_work(struct io_wqe *wqe, unsigned *hash)
	__must_hold(wqe->lock)
{
	struct io_wq_work *work;

	list_for_each_entry(work, &wqe->work_list, list) {
		/* not hashed, can run anytime */
		if (!(work->flags & IO_WQ_WORK_HASHED)) {
			list_del(&work->list);
			return work;
		}

		/* hashed, can run if not already running */
		*hash = work->flags >> IO_WQ_HASH_SHIFT;
		if (!(wqe->hash_map & BIT_ULL(*hash))) {
			wqe->hash_map |= BIT_ULL(*hash);
			list_del(&work->list);
			return work;
		}
	}

	return NULL;
}

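/*
 * Main work-processing loop for a worker: grab the next item, set up the
 * execution environment (files, mm, creds), run work->func(), and keep
 * following dependent work handed back through the work pointer until
 * nothing is left.
 */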
static void io_worker_handle_work(struct io_worker *worker)
	__releases(wqe->lock)
{
	struct io_wq_work *work, *old_work = NULL, *put_work = NULL;
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;

	do {
		unsigned hash = -1U;

		/*
		 * If we got some work, mark us as busy. If we didn't, but
		 * the list isn't empty, it means we stalled on hashed work.
		 * Mark us stalled so we don't keep looking for work when we
		 * can't make progress; any work completion or insertion will
		 * clear the stalled flag.
		 */
		work = io_get_next_work(wqe, &hash);
		if (work)
			__io_worker_busy(wqe, worker, work);
		else if (!list_empty(&wqe->work_list))
			wqe->flags |= IO_WQE_FLAG_STALLED;

		spin_unlock_irq(&wqe->lock);
		if (put_work && wq->put_work)
			wq->put_work(old_work);
		if (!work)
			break;
next:
		/* flush any pending signals before assigning new work */
		if (signal_pending(current))
			flush_signals(current);

		spin_lock_irq(&worker->lock);
		worker->cur_work = work;
		spin_unlock_irq(&worker->lock);

		if (work->flags & IO_WQ_WORK_CB)
			work->func(&work);

		if ((work->flags & IO_WQ_WORK_NEEDS_FILES) &&
		    current->files != work->files) {
			task_lock(current);
			current->files = work->files;
			task_unlock(current);
		}
		if ((work->flags & IO_WQ_WORK_NEEDS_USER) && !worker->mm &&
		    wq->mm && mmget_not_zero(wq->mm)) {
			use_mm(wq->mm);
			set_fs(USER_DS);
			worker->mm = wq->mm;
		}
		if (!worker->creds)
			worker->creds = override_creds(wq->creds);
		if (test_bit(IO_WQ_BIT_CANCEL, &wq->state))
			work->flags |= IO_WQ_WORK_CANCEL;
		if (worker->mm)
			work->flags |= IO_WQ_WORK_HAS_MM;

		if (wq->get_work && !(work->flags & IO_WQ_WORK_INTERNAL)) {
			put_work = work;
			wq->get_work(work);
		}

		old_work = work;
		work->func(&work);

		spin_lock_irq(&worker->lock);
		worker->cur_work = NULL;
		spin_unlock_irq(&worker->lock);

		spin_lock_irq(&wqe->lock);

		if (hash != -1U) {
			wqe->hash_map &= ~BIT_ULL(hash);
			wqe->flags &= ~IO_WQE_FLAG_STALLED;
		}
		if (work && work != old_work) {
			spin_unlock_irq(&wqe->lock);

			if (put_work && wq->put_work) {
				wq->put_work(put_work);
				put_work = NULL;
			}

			/* dependent work not hashed */
			hash = -1U;
			goto next;
		}
	} while (1);
}

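/*
 * Worker thread main loop: process work while there is any, otherwise sit
 * on the free list until woken or the idle timeout expires. Only the fixed
 * worker sticks around forever; the rest exit once they've been idle.
 */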
static int io_wqe_worker(void *data)
{
	struct io_worker *worker = data;
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;
	DEFINE_WAIT(wait);

	io_worker_start(wqe, worker);

	while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		prepare_to_wait(&worker->wait, &wait, TASK_INTERRUPTIBLE);

		spin_lock_irq(&wqe->lock);
		if (io_wqe_run_queue(wqe)) {
			__set_current_state(TASK_RUNNING);
			io_worker_handle_work(worker);
			continue;
		}
		/* drops the lock on success, retry */
		if (__io_worker_idle(wqe, worker)) {
			__release(&wqe->lock);
			continue;
		}
		spin_unlock_irq(&wqe->lock);
		if (signal_pending(current))
			flush_signals(current);
		if (schedule_timeout(WORKER_IDLE_TIMEOUT))
			continue;
		/* timed out, exit unless we're the fixed worker */
		if (test_bit(IO_WQ_BIT_EXIT, &wq->state) ||
		    !(worker->flags & IO_WORKER_F_FIXED))
			break;
	}

	finish_wait(&worker->wait, &wait);

	if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		spin_lock_irq(&wqe->lock);
		if (!list_empty(&wqe->work_list))
			io_worker_handle_work(worker);
		else
			spin_unlock_irq(&wqe->lock);
	}

	io_worker_exit(worker);
	return 0;
}

/*
 * Called when a worker is scheduled in. Mark us as currently running.
 */
void io_wq_worker_running(struct task_struct *tsk)
{
	struct io_worker *worker = kthread_data(tsk);
	struct io_wqe *wqe = worker->wqe;

	if (!(worker->flags & IO_WORKER_F_UP))
		return;
	if (worker->flags & IO_WORKER_F_RUNNING)
		return;
	worker->flags |= IO_WORKER_F_RUNNING;
	io_wqe_inc_running(wqe, worker);
}

/*
 * Called when worker is going to sleep. If there are no workers currently
 * running and we have work pending, wake up a free one or have the manager
 * set one up.
 */
void io_wq_worker_sleeping(struct task_struct *tsk)
{
	struct io_worker *worker = kthread_data(tsk);
	struct io_wqe *wqe = worker->wqe;

	if (!(worker->flags & IO_WORKER_F_UP))
		return;
	if (!(worker->flags & IO_WORKER_F_RUNNING))
		return;

	worker->flags &= ~IO_WORKER_F_RUNNING;

	spin_lock_irq(&wqe->lock);
	io_wqe_dec_running(wqe, worker);
	spin_unlock_irq(&wqe->lock);
}

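/*
 * Create a new worker kthread for the given accounting class (bounded or
 * unbounded), park it on the free list, and wake it up. The first bounded
 * worker on a node becomes the fixed worker that never times out.
 */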
static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
{
	struct io_wqe_acct *acct = &wqe->acct[index];
	struct io_worker *worker;

	worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node);
	if (!worker)
		return false;

	refcount_set(&worker->ref, 1);
	worker->nulls_node.pprev = NULL;
	init_waitqueue_head(&worker->wait);
	worker->wqe = wqe;
	spin_lock_init(&worker->lock);

	worker->task = kthread_create_on_node(io_wqe_worker, worker, wqe->node,
				"io_wqe_worker-%d/%d", index, wqe->node);
	if (IS_ERR(worker->task)) {
		kfree(worker);
		return false;
	}

	spin_lock_irq(&wqe->lock);
	hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
	list_add_tail_rcu(&worker->all_list, &wqe->all_list);
	worker->flags |= IO_WORKER_F_FREE;
	if (index == IO_WQ_ACCT_BOUND)
		worker->flags |= IO_WORKER_F_BOUND;
	if (!acct->nr_workers && (worker->flags & IO_WORKER_F_BOUND))
		worker->flags |= IO_WORKER_F_FIXED;
	acct->nr_workers++;
	spin_unlock_irq(&wqe->lock);

	if (index == IO_WQ_ACCT_UNBOUND)
		atomic_inc(&wq->user->processes);

	wake_up_process(worker->task);
	return true;
}

static inline bool io_wqe_need_worker(struct io_wqe *wqe, int index)
	__must_hold(wqe->lock)
{
	struct io_wqe_acct *acct = &wqe->acct[index];

	/* if we have available workers or no work, no need */
	if (!hlist_nulls_empty(&wqe->free_list) || !io_wqe_run_queue(wqe))
		return false;
	return acct->nr_workers < acct->max_workers;
}

/*
 * Manager thread. Tasked with creating new workers, if we need them.
 */
static int io_wq_manager(void *data)
{
	struct io_wq *wq = data;
	int i;

	/* create fixed workers */
	refcount_set(&wq->refs, wq->nr_wqes);
	for (i = 0; i < wq->nr_wqes; i++) {
		if (create_io_worker(wq, wq->wqes[i], IO_WQ_ACCT_BOUND))
			continue;
		goto err;
	}

	complete(&wq->done);

	while (!kthread_should_stop()) {
		for (i = 0; i < wq->nr_wqes; i++) {
			struct io_wqe *wqe = wq->wqes[i];
			bool fork_worker[2] = { false, false };

			spin_lock_irq(&wqe->lock);
			if (io_wqe_need_worker(wqe, IO_WQ_ACCT_BOUND))
				fork_worker[IO_WQ_ACCT_BOUND] = true;
			if (io_wqe_need_worker(wqe, IO_WQ_ACCT_UNBOUND))
				fork_worker[IO_WQ_ACCT_UNBOUND] = true;
			spin_unlock_irq(&wqe->lock);
			if (fork_worker[IO_WQ_ACCT_BOUND])
				create_io_worker(wq, wqe, IO_WQ_ACCT_BOUND);
			if (fork_worker[IO_WQ_ACCT_UNBOUND])
				create_io_worker(wq, wqe, IO_WQ_ACCT_UNBOUND);
		}
		set_current_state(TASK_INTERRUPTIBLE);
		schedule_timeout(HZ);
	}

	return 0;
err:
	set_bit(IO_WQ_BIT_ERROR, &wq->state);
	set_bit(IO_WQ_BIT_EXIT, &wq->state);
	if (refcount_sub_and_test(wq->nr_wqes - i, &wq->refs))
		complete(&wq->done);
	return 0;
}

static bool io_wq_can_queue(struct io_wqe *wqe, struct io_wqe_acct *acct,
			    struct io_wq_work *work)
{
	bool free_worker;

	if (!(work->flags & IO_WQ_WORK_UNBOUND))
		return true;
	if (atomic_read(&acct->nr_running))
		return true;

	rcu_read_lock();
	free_worker = !hlist_nulls_empty(&wqe->free_list);
	rcu_read_unlock();
	if (free_worker)
		return true;

	if (atomic_read(&wqe->wq->user->processes) >= acct->max_workers &&
	    !(capable(CAP_SYS_RESOURCE) || capable(CAP_SYS_ADMIN)))
		return false;

	return true;
}

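/*
 * Queue work on a wqe. For unbounded work we first check whether we're
 * allowed to add another worker/process for this user; if not, the work is
 * run immediately in the caller's context with IO_WQ_WORK_CANCEL set.
 */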
static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
{
	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
	unsigned long flags;

	/*
	 * Do an early check to see if we need a new unbound worker, and if
	 * we do, if we're allowed to do so. This isn't 100% accurate as
	 * there's a gap between this check and incrementing the value, but
	 * that's OK. It's close enough to not be an issue; fork() has the
	 * same delay.
	 */
	if (unlikely(!io_wq_can_queue(wqe, acct, work))) {
		work->flags |= IO_WQ_WORK_CANCEL;
		work->func(&work);
		return;
	}

	spin_lock_irqsave(&wqe->lock, flags);
	list_add_tail(&work->list, &wqe->work_list);
	wqe->flags &= ~IO_WQE_FLAG_STALLED;
	spin_unlock_irqrestore(&wqe->lock, flags);

	if (!atomic_read(&acct->nr_running))
		io_wqe_wake_worker(wqe, acct);
}

void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
{
	struct io_wqe *wqe = wq->wqes[numa_node_id()];

	io_wqe_enqueue(wqe, work);
}

/*
 * Enqueue work, hashed by some key. Work items that hash to the same value
 * will not be done in parallel. Used to limit concurrent writes, generally
 * hashed by inode.
 */
void io_wq_enqueue_hashed(struct io_wq *wq, struct io_wq_work *work, void *val)
{
	struct io_wqe *wqe = wq->wqes[numa_node_id()];
	unsigned bit;

	bit = hash_ptr(val, IO_WQ_HASH_ORDER);
	work->flags |= (IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT));
	io_wqe_enqueue(wqe, work);
}
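
/*
 * Illustrative usage sketch (not part of this file; the request/context and
 * handler names are assumptions): a caller serializing writes to a single
 * inode would queue its work roughly like so:
 *
 *	INIT_IO_WORK(&req->work, my_write_work);
 *	io_wq_enqueue_hashed(ctx->io_wq, &req->work, req->file->f_inode);
 *
 * Items queued with the same key hash to the same bucket and therefore never
 * run in parallel, while work hashed to other inodes proceeds concurrently.
 */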

static bool io_wqe_worker_send_sig(struct io_worker *worker, void *data)
{
	send_sig(SIGINT, worker->task, 1);
	return false;
}

/*
 * Iterate the passed in list and call the specific function for each
 * worker that isn't exiting
 */
static bool io_wq_for_each_worker(struct io_wqe *wqe,
				  bool (*func)(struct io_worker *, void *),
				  void *data)
{
	struct io_worker *worker;
	bool ret = false;

	list_for_each_entry_rcu(worker, &wqe->all_list, all_list) {
		if (io_worker_get(worker)) {
			ret = func(worker, data);
			io_worker_release(worker);
			if (ret)
				break;
		}
	}

	return ret;
}

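/*
 * Cancel everything: mark the wq so newly picked-up work is failed with
 * IO_WQ_WORK_CANCEL, and signal all current workers so in-flight work gets
 * interrupted.
 */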
void io_wq_cancel_all(struct io_wq *wq)
{
	int i;

	set_bit(IO_WQ_BIT_CANCEL, &wq->state);

	/*
	 * Browse both lists, as there's a gap between handing work off
	 * to a worker and the worker putting itself on the busy_list
	 */
	rcu_read_lock();
	for (i = 0; i < wq->nr_wqes; i++) {
		struct io_wqe *wqe = wq->wqes[i];

		io_wq_for_each_worker(wqe, io_wqe_worker_send_sig, NULL);
	}
	rcu_read_unlock();
}

struct io_cb_cancel_data {
	struct io_wqe *wqe;
	work_cancel_fn *cancel;
	void *caller_data;
};

static bool io_work_cancel(struct io_worker *worker, void *cancel_data)
{
	struct io_cb_cancel_data *data = cancel_data;
	unsigned long flags;
	bool ret = false;

	/*
	 * Hold the lock to avoid ->cur_work going out of scope, as the
	 * caller may dereference the passed in work.
	 */
	spin_lock_irqsave(&worker->lock, flags);
	if (worker->cur_work &&
	    data->cancel(worker->cur_work, data->caller_data)) {
		send_sig(SIGINT, worker->task, 1);
		ret = true;
	}
	spin_unlock_irqrestore(&worker->lock, flags);

	return ret;
}

static enum io_wq_cancel io_wqe_cancel_cb_work(struct io_wqe *wqe,
					       work_cancel_fn *cancel,
					       void *cancel_data)
{
	struct io_cb_cancel_data data = {
		.wqe = wqe,
		.cancel = cancel,
		.caller_data = cancel_data,
	};
	struct io_wq_work *work;
	unsigned long flags;
	bool found = false;

	spin_lock_irqsave(&wqe->lock, flags);
	list_for_each_entry(work, &wqe->work_list, list) {
		if (cancel(work, cancel_data)) {
			list_del(&work->list);
			found = true;
			break;
		}
	}
	spin_unlock_irqrestore(&wqe->lock, flags);

	if (found) {
		work->flags |= IO_WQ_WORK_CANCEL;
		work->func(&work);
		return IO_WQ_CANCEL_OK;
	}

	rcu_read_lock();
	found = io_wq_for_each_worker(wqe, io_work_cancel, &data);
	rcu_read_unlock();
	return found ? IO_WQ_CANCEL_RUNNING : IO_WQ_CANCEL_NOTFOUND;
}

enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
				  void *data)
{
	enum io_wq_cancel ret = IO_WQ_CANCEL_NOTFOUND;
	int i;

	for (i = 0; i < wq->nr_wqes; i++) {
		struct io_wqe *wqe = wq->wqes[i];

		ret = io_wqe_cancel_cb_work(wqe, cancel, data);
		if (ret != IO_WQ_CANCEL_NOTFOUND)
			break;
	}

	return ret;
}

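/*
 * Cancellation of one specific work item: check whether a given worker is
 * currently executing it, and if so signal that worker. The check is done
 * locklessly first, then again under worker->lock before sending the signal.
 */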
static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
{
	struct io_wq_work *work = data;
	unsigned long flags;
	bool ret = false;

	if (worker->cur_work != work)
		return false;

	spin_lock_irqsave(&worker->lock, flags);
	if (worker->cur_work == work) {
		send_sig(SIGINT, worker->task, 1);
		ret = true;
	}
	spin_unlock_irqrestore(&worker->lock, flags);

	return ret;
}

static enum io_wq_cancel io_wqe_cancel_work(struct io_wqe *wqe,
					    struct io_wq_work *cwork)
{
	struct io_wq_work *work;
	unsigned long flags;
	bool found = false;

	cwork->flags |= IO_WQ_WORK_CANCEL;

	/*
	 * First check the pending list; if we're lucky we can just remove it
	 * from there. CANCEL_OK means that the work is returned as-new,
	 * no completion will be posted for it.
	 */
	spin_lock_irqsave(&wqe->lock, flags);
	list_for_each_entry(work, &wqe->work_list, list) {
		if (work == cwork) {
			list_del(&work->list);
			found = true;
			break;
		}
	}
	spin_unlock_irqrestore(&wqe->lock, flags);

	if (found) {
		work->flags |= IO_WQ_WORK_CANCEL;
		work->func(&work);
		return IO_WQ_CANCEL_OK;
	}

	/*
	 * Now check if a free (going busy) or busy worker has the work
	 * currently running. If we find it there, we'll return CANCEL_RUNNING
	 * as an indication that we attempted to signal cancellation. The
	 * completion will run normally in this case.
	 */
	rcu_read_lock();
	found = io_wq_for_each_worker(wqe, io_wq_worker_cancel, cwork);
	rcu_read_unlock();
	return found ? IO_WQ_CANCEL_RUNNING : IO_WQ_CANCEL_NOTFOUND;
}

enum io_wq_cancel io_wq_cancel_work(struct io_wq *wq, struct io_wq_work *cwork)
{
	enum io_wq_cancel ret = IO_WQ_CANCEL_NOTFOUND;
	int i;

	for (i = 0; i < wq->nr_wqes; i++) {
		struct io_wqe *wqe = wq->wqes[i];

		ret = io_wqe_cancel_work(wqe, cwork);
		if (ret != IO_WQ_CANCEL_NOTFOUND)
			break;
	}

	return ret;
}

struct io_wq_flush_data {
	struct io_wq_work work;
	struct completion done;
};

static void io_wq_flush_func(struct io_wq_work **workptr)
{
	struct io_wq_work *work = *workptr;
	struct io_wq_flush_data *data;

	data = container_of(work, struct io_wq_flush_data, work);
	complete(&data->done);
}

/*
 * Doesn't wait for previously queued work to finish. When this completes,
 * it just means that previously queued work was started.
 */
void io_wq_flush(struct io_wq *wq)
{
	struct io_wq_flush_data data;
	int i;

	for (i = 0; i < wq->nr_wqes; i++) {
		struct io_wqe *wqe = wq->wqes[i];

		init_completion(&data.done);
		INIT_IO_WORK(&data.work, io_wq_flush_func);
		data.work.flags |= IO_WQ_WORK_INTERNAL;
		io_wqe_enqueue(wqe, &data.work);
		wait_for_completion(&data.done);
	}
}

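/*
 * Create the io_wq: one io_wqe per online NUMA node, plus the manager
 * thread that spawns workers on demand. Waits for the manager to bring up
 * the fixed bounded workers before returning.
 */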
struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
{
	int ret = -ENOMEM, i, node;
	struct io_wq *wq;

	wq = kzalloc(sizeof(*wq), GFP_KERNEL);
	if (!wq)
		return ERR_PTR(-ENOMEM);

	wq->nr_wqes = num_online_nodes();
	wq->wqes = kcalloc(wq->nr_wqes, sizeof(struct io_wqe *), GFP_KERNEL);
	if (!wq->wqes) {
		kfree(wq);
		return ERR_PTR(-ENOMEM);
	}

	wq->get_work = data->get_work;
	wq->put_work = data->put_work;

	/* caller must already hold a reference to this */
	wq->user = data->user;
	wq->creds = data->creds;

	i = 0;
	for_each_online_node(node) {
		struct io_wqe *wqe;

		wqe = kzalloc_node(sizeof(struct io_wqe), GFP_KERNEL, node);
		if (!wqe)
			break;
		wq->wqes[i] = wqe;
		wqe->node = node;
		wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
		atomic_set(&wqe->acct[IO_WQ_ACCT_BOUND].nr_running, 0);
		if (wq->user) {
			wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
					task_rlimit(current, RLIMIT_NPROC);
		}
		atomic_set(&wqe->acct[IO_WQ_ACCT_UNBOUND].nr_running, 0);
		wqe->node = node;
		wqe->wq = wq;
		spin_lock_init(&wqe->lock);
		INIT_LIST_HEAD(&wqe->work_list);
		INIT_HLIST_NULLS_HEAD(&wqe->free_list, 0);
		INIT_HLIST_NULLS_HEAD(&wqe->busy_list, 1);
		INIT_LIST_HEAD(&wqe->all_list);

		i++;
	}

	init_completion(&wq->done);

	if (i != wq->nr_wqes)
		goto err;

	/* caller must have already done mmgrab() on this mm */
	wq->mm = data->mm;

	wq->manager = kthread_create(io_wq_manager, wq, "io_wq_manager");
	if (!IS_ERR(wq->manager)) {
		wake_up_process(wq->manager);
		wait_for_completion(&wq->done);
		if (test_bit(IO_WQ_BIT_ERROR, &wq->state)) {
			ret = -ENOMEM;
			goto err;
		}
		reinit_completion(&wq->done);
		return wq;
	}

	ret = PTR_ERR(wq->manager);
	complete(&wq->done);
err:
	for (i = 0; i < wq->nr_wqes; i++)
		kfree(wq->wqes[i]);
	kfree(wq->wqes);
	kfree(wq);
	return ERR_PTR(ret);
}

static bool io_wq_worker_wake(struct io_worker *worker, void *data)
{
	wake_up_process(worker->task);
	return false;
}

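/*
 * Tear down the io_wq: flag it as exiting, stop the manager, wake every
 * worker so it notices IO_WQ_BIT_EXIT and exits, then wait for the last
 * worker to complete wq->done before freeing everything.
 */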
void io_wq_destroy(struct io_wq *wq)
{
	int i;

	set_bit(IO_WQ_BIT_EXIT, &wq->state);
	if (wq->manager)
		kthread_stop(wq->manager);

	rcu_read_lock();
	for (i = 0; i < wq->nr_wqes; i++) {
		struct io_wqe *wqe = wq->wqes[i];

		if (!wqe)
			continue;
		io_wq_for_each_worker(wqe, io_wq_worker_wake, NULL);
	}
	rcu_read_unlock();

	wait_for_completion(&wq->done);

	for (i = 0; i < wq->nr_wqes; i++)
		kfree(wq->wqes[i]);
	kfree(wq->wqes);
	kfree(wq);
}