// SPDX-License-Identifier: GPL-2.0
/*
 * Basic worker thread pool for io_uring
 *
 * Copyright (C) 2019 Jens Axboe
 *
 */
#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/errno.h>
#include <linux/sched/signal.h>
#include <linux/mm.h>
#include <linux/mmu_context.h>
#include <linux/sched/mm.h>
#include <linux/percpu.h>
#include <linux/slab.h>
#include <linux/kthread.h>
#include <linux/rculist_nulls.h>

#include "io-wq.h"

#define WORKER_IDLE_TIMEOUT	(5 * HZ)

enum {
	IO_WORKER_F_UP		= 1,	/* up and active */
	IO_WORKER_F_RUNNING	= 2,	/* account as running */
	IO_WORKER_F_FREE	= 4,	/* worker on free list */
	IO_WORKER_F_EXITING	= 8,	/* worker exiting */
	IO_WORKER_F_FIXED	= 16,	/* static idle worker */
	IO_WORKER_F_BOUND	= 32,	/* is doing bounded work */
};

enum {
	IO_WQ_BIT_EXIT		= 0,	/* wq exiting */
	IO_WQ_BIT_CANCEL	= 1,	/* cancel work on list */
	IO_WQ_BIT_ERROR		= 2,	/* error on setup */
};

enum {
	IO_WQE_FLAG_STALLED	= 1,	/* stalled on hash */
};

/*
 * One for each thread in a wqe pool
 */
struct io_worker {
	refcount_t ref;
	unsigned flags;
	struct hlist_nulls_node nulls_node;
	struct list_head all_list;
	struct task_struct *task;
	struct io_wqe *wqe;

	struct io_wq_work *cur_work;
	spinlock_t lock;

	struct rcu_head rcu;
	struct mm_struct *mm;
	const struct cred *creds;
	struct files_struct *restore_files;
};

#if BITS_PER_LONG == 64
#define IO_WQ_HASH_ORDER	6
#else
#define IO_WQ_HASH_ORDER	5
#endif

struct io_wqe_acct {
	unsigned nr_workers;
	unsigned max_workers;
	atomic_t nr_running;
};

enum {
	IO_WQ_ACCT_BOUND,
	IO_WQ_ACCT_UNBOUND,
};

/*
 * Per-node worker thread pool
 */
struct io_wqe {
	struct {
		spinlock_t lock;
		struct io_wq_work_list work_list;
		unsigned long hash_map;
		unsigned flags;
	} ____cacheline_aligned_in_smp;

	int node;
	struct io_wqe_acct acct[2];

	struct hlist_nulls_head free_list;
	struct list_head all_list;

	struct io_wq *wq;
};

/*
 * Per io_wq state
 */
struct io_wq {
	struct io_wqe **wqes;
	unsigned long state;

	get_work_fn *get_work;
	put_work_fn *put_work;

	struct task_struct *manager;
	struct user_struct *user;
	const struct cred *creds;
	struct mm_struct *mm;
	refcount_t refs;
	struct completion done;

	refcount_t use_refs;
};

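/*
 * How the structures above relate: a struct io_wq owns one struct io_wqe
 * per NUMA node (the "per-node worker thread pool"), and each io_wqe runs
 * a set of struct io_worker kthreads. Workers sit on two per-node lists,
 * free_list for idle workers and all_list for every worker, and accounting
 * is split between bound work (acct[IO_WQ_ACCT_BOUND], capped by the
 * 'bounded' argument to io_wq_create()) and unbound work
 * (acct[IO_WQ_ACCT_UNBOUND], capped from RLIMIT_NPROC in io_wq_create()).
 */
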
static bool io_worker_get(struct io_worker *worker)
{
	return refcount_inc_not_zero(&worker->ref);
}

static void io_worker_release(struct io_worker *worker)
{
	if (refcount_dec_and_test(&worker->ref))
		wake_up_process(worker->task);
}

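/*
 * The worker ref pairs with io_worker_exit(): an exiting worker drops its
 * own reference and sleeps until any short-term holders (taken via
 * io_worker_get()) have called io_worker_release(), which wakes the
 * exiting task once the count hits zero.
 */
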
/*
 * Note: drops the wqe->lock if returning true! The caller must re-acquire
 * the lock in that case. Some callers need to restart handling if this
 * happens, so we can't just re-acquire the lock on behalf of the caller.
 */
static bool __io_worker_unuse(struct io_wqe *wqe, struct io_worker *worker)
{
	bool dropped_lock = false;

	if (worker->creds) {
		revert_creds(worker->creds);
		worker->creds = NULL;
	}

	if (current->files != worker->restore_files) {
		__acquire(&wqe->lock);
		spin_unlock_irq(&wqe->lock);
		dropped_lock = true;

		task_lock(current);
		current->files = worker->restore_files;
		task_unlock(current);
	}

	/*
	 * If we have an active mm, we need to drop the wq lock before unusing
	 * it. If we do, return true and let the caller retry the idle loop.
	 */
	if (worker->mm) {
		if (!dropped_lock) {
			__acquire(&wqe->lock);
			spin_unlock_irq(&wqe->lock);
			dropped_lock = true;
		}
		__set_current_state(TASK_RUNNING);
		set_fs(KERNEL_DS);
		unuse_mm(worker->mm);
		mmput(worker->mm);
		worker->mm = NULL;
	}

	return dropped_lock;
}

static inline struct io_wqe_acct *io_work_get_acct(struct io_wqe *wqe,
						   struct io_wq_work *work)
{
	if (work->flags & IO_WQ_WORK_UNBOUND)
		return &wqe->acct[IO_WQ_ACCT_UNBOUND];

	return &wqe->acct[IO_WQ_ACCT_BOUND];
}

static inline struct io_wqe_acct *io_wqe_get_acct(struct io_wqe *wqe,
						  struct io_worker *worker)
{
	if (worker->flags & IO_WORKER_F_BOUND)
		return &wqe->acct[IO_WQ_ACCT_BOUND];

	return &wqe->acct[IO_WQ_ACCT_UNBOUND];
}

static void io_worker_exit(struct io_worker *worker)
{
	struct io_wqe *wqe = worker->wqe;
	struct io_wqe_acct *acct = io_wqe_get_acct(wqe, worker);
	unsigned nr_workers;

	/*
	 * If we're not at zero, someone else is holding a brief reference
	 * to the worker. Wait for that to go away.
	 */
	set_current_state(TASK_INTERRUPTIBLE);
	if (!refcount_dec_and_test(&worker->ref))
		schedule();
	__set_current_state(TASK_RUNNING);

	preempt_disable();
	current->flags &= ~PF_IO_WORKER;
	if (worker->flags & IO_WORKER_F_RUNNING)
		atomic_dec(&acct->nr_running);
	if (!(worker->flags & IO_WORKER_F_BOUND))
		atomic_dec(&wqe->wq->user->processes);
	worker->flags = 0;
	preempt_enable();

	spin_lock_irq(&wqe->lock);
	hlist_nulls_del_rcu(&worker->nulls_node);
	list_del_rcu(&worker->all_list);
	if (__io_worker_unuse(wqe, worker)) {
		__release(&wqe->lock);
		spin_lock_irq(&wqe->lock);
	}
	acct->nr_workers--;
	nr_workers = wqe->acct[IO_WQ_ACCT_BOUND].nr_workers +
			wqe->acct[IO_WQ_ACCT_UNBOUND].nr_workers;
	spin_unlock_irq(&wqe->lock);

	/* all workers gone, wq exit can proceed */
	if (!nr_workers && refcount_dec_and_test(&wqe->wq->refs))
		complete(&wqe->wq->done);

	kfree_rcu(worker, rcu);
}

static inline bool io_wqe_run_queue(struct io_wqe *wqe)
	__must_hold(wqe->lock)
{
	if (!wq_list_empty(&wqe->work_list) &&
	    !(wqe->flags & IO_WQE_FLAG_STALLED))
		return true;
	return false;
}

/*
 * Check head of free list for an available worker. If one isn't available,
 * caller must wake up the wq manager to create one.
 */
static bool io_wqe_activate_free_worker(struct io_wqe *wqe)
	__must_hold(RCU)
{
	struct hlist_nulls_node *n;
	struct io_worker *worker;

	n = rcu_dereference(hlist_nulls_first_rcu(&wqe->free_list));
	if (is_a_nulls(n))
		return false;

	worker = hlist_nulls_entry(n, struct io_worker, nulls_node);
	if (io_worker_get(worker)) {
		wake_up_process(worker->task);
		io_worker_release(worker);
		return true;
	}

	return false;
}

/*
 * We need a worker. If we find a free one, we're good. If not, and we're
 * below the max number of workers, wake up the manager to create one.
 */
static void io_wqe_wake_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
{
	bool ret;

	/*
	 * Most likely an attempt to queue unbounded work on an io_wq that
	 * wasn't setup with any unbounded workers.
	 */
	WARN_ON_ONCE(!acct->max_workers);

	rcu_read_lock();
	ret = io_wqe_activate_free_worker(wqe);
	rcu_read_unlock();

	if (!ret && acct->nr_workers < acct->max_workers)
		wake_up_process(wqe->wq->manager);
}

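/*
 * nr_running tracks how many workers in an accounting class are currently
 * runnable rather than blocked. It is adjusted when a worker is scheduled
 * in or goes to sleep (io_wq_worker_running()/io_wq_worker_sleeping()
 * below), and io_wqe_dec_running() uses it to kick another worker when the
 * last runnable one is about to block while work is still pending.
 */
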
static void io_wqe_inc_running(struct io_wqe *wqe, struct io_worker *worker)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(wqe, worker);

	atomic_inc(&acct->nr_running);
}

static void io_wqe_dec_running(struct io_wqe *wqe, struct io_worker *worker)
	__must_hold(wqe->lock)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(wqe, worker);

	if (atomic_dec_and_test(&acct->nr_running) && io_wqe_run_queue(wqe))
		io_wqe_wake_worker(wqe, acct);
}

static void io_worker_start(struct io_wqe *wqe, struct io_worker *worker)
{
	allow_kernel_signal(SIGINT);

	current->flags |= PF_IO_WORKER;

	worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);
	worker->restore_files = current->files;
	io_wqe_inc_running(wqe, worker);
}

/*
 * Worker will start processing some work. Remove it from the free list,
 * if it's currently there.
 */
static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker,
			     struct io_wq_work *work)
	__must_hold(wqe->lock)
{
	bool worker_bound, work_bound;

	if (worker->flags & IO_WORKER_F_FREE) {
		worker->flags &= ~IO_WORKER_F_FREE;
		hlist_nulls_del_init_rcu(&worker->nulls_node);
	}

	/*
	 * If worker is moving from bound to unbound (or vice versa), then
	 * ensure we update the running accounting.
	 */
	worker_bound = (worker->flags & IO_WORKER_F_BOUND) != 0;
	work_bound = (work->flags & IO_WQ_WORK_UNBOUND) == 0;
	if (worker_bound != work_bound) {
		io_wqe_dec_running(wqe, worker);
		if (work_bound) {
			worker->flags |= IO_WORKER_F_BOUND;
			wqe->acct[IO_WQ_ACCT_UNBOUND].nr_workers--;
			wqe->acct[IO_WQ_ACCT_BOUND].nr_workers++;
			atomic_dec(&wqe->wq->user->processes);
		} else {
			worker->flags &= ~IO_WORKER_F_BOUND;
			wqe->acct[IO_WQ_ACCT_UNBOUND].nr_workers++;
			wqe->acct[IO_WQ_ACCT_BOUND].nr_workers--;
			atomic_inc(&wqe->wq->user->processes);
		}
		io_wqe_inc_running(wqe, worker);
	}
}

/*
 * No work, worker going to sleep. Move to freelist, and unuse mm if we
 * have one attached. Dropping the mm may potentially sleep, so we drop
 * the lock in that case and return success. Since the caller has to
 * retry the loop in that case (we changed task state), we don't regrab
 * the lock if we return success.
 */
static bool __io_worker_idle(struct io_wqe *wqe, struct io_worker *worker)
	__must_hold(wqe->lock)
{
	if (!(worker->flags & IO_WORKER_F_FREE)) {
		worker->flags |= IO_WORKER_F_FREE;
		hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
	}

	return __io_worker_unuse(wqe, worker);
}

static struct io_wq_work *io_get_next_work(struct io_wqe *wqe, unsigned *hash)
	__must_hold(wqe->lock)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work;

	wq_list_for_each(node, prev, &wqe->work_list) {
		work = container_of(node, struct io_wq_work, list);

		/* not hashed, can run anytime */
		if (!(work->flags & IO_WQ_WORK_HASHED)) {
			wq_node_del(&wqe->work_list, node, prev);
			return work;
		}

		/* hashed, can run if not already running */
		*hash = work->flags >> IO_WQ_HASH_SHIFT;
		if (!(wqe->hash_map & BIT_ULL(*hash))) {
			wqe->hash_map |= BIT_ULL(*hash);
			wq_node_del(&wqe->work_list, node, prev);
			return work;
		}
	}

	return NULL;
}

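/*
 * Hashed work serialization in io_get_next_work(): a hashed work item
 * carries its bucket number in the upper bits of work->flags (shifted by
 * IO_WQ_HASH_SHIFT), and wqe->hash_map has one bit per bucket. A hashed
 * item is only dequeued if its bucket bit is clear; the bit is set while
 * the item runs and cleared again in io_worker_handle_work(). Two items
 * that hash to the same value (e.g. writes against the same inode, see
 * io_wq_enqueue_hashed()) therefore never run in parallel on this node.
 */
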
static void io_worker_handle_work(struct io_worker *worker)
	__releases(wqe->lock)
{
	struct io_wq_work *work, *old_work = NULL, *put_work = NULL;
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;

	do {
		unsigned hash = -1U;

		/*
		 * If we got some work, mark us as busy. If we didn't, but
		 * the list isn't empty, it means we stalled on hashed work.
		 * Mark us stalled so we don't keep looking for work when we
		 * can't make progress, any work completion or insertion will
		 * clear the stalled flag.
		 */
		work = io_get_next_work(wqe, &hash);
		if (work)
			__io_worker_busy(wqe, worker, work);
		else if (!wq_list_empty(&wqe->work_list))
			wqe->flags |= IO_WQE_FLAG_STALLED;

		spin_unlock_irq(&wqe->lock);
		if (put_work && wq->put_work)
			wq->put_work(old_work);
		if (!work)
			break;
next:
		/* flush any pending signals before assigning new work */
		if (signal_pending(current))
			flush_signals(current);

		cond_resched();

		spin_lock_irq(&worker->lock);
		worker->cur_work = work;
		spin_unlock_irq(&worker->lock);

		if (work->flags & IO_WQ_WORK_CB)
			work->func(&work);

		if ((work->flags & IO_WQ_WORK_NEEDS_FILES) &&
		    current->files != work->files) {
			task_lock(current);
			current->files = work->files;
			task_unlock(current);
		}
		if ((work->flags & IO_WQ_WORK_NEEDS_USER) && !worker->mm &&
		    wq->mm) {
			if (mmget_not_zero(wq->mm)) {
				use_mm(wq->mm);
				set_fs(USER_DS);
				worker->mm = wq->mm;
			} else {
				work->flags |= IO_WQ_WORK_CANCEL;
			}
		}
		if (!worker->creds)
			worker->creds = override_creds(wq->creds);
		/*
		 * OK to set IO_WQ_WORK_CANCEL even for uncancellable work,
		 * the worker function will do the right thing.
		 */
		if (test_bit(IO_WQ_BIT_CANCEL, &wq->state))
			work->flags |= IO_WQ_WORK_CANCEL;
		if (worker->mm)
			work->flags |= IO_WQ_WORK_HAS_MM;

		if (wq->get_work && !(work->flags & IO_WQ_WORK_INTERNAL)) {
			put_work = work;
			wq->get_work(work);
		}

		old_work = work;
		work->func(&work);

		spin_lock_irq(&worker->lock);
		worker->cur_work = NULL;
		spin_unlock_irq(&worker->lock);

		spin_lock_irq(&wqe->lock);

		if (hash != -1U) {
			wqe->hash_map &= ~BIT_ULL(hash);
			wqe->flags &= ~IO_WQE_FLAG_STALLED;
		}
		if (work && work != old_work) {
			spin_unlock_irq(&wqe->lock);

			if (put_work && wq->put_work) {
				wq->put_work(put_work);
				put_work = NULL;
			}

			/* dependent work not hashed */
			hash = -1U;
			goto next;
		}
	} while (1);
}

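/*
 * Note on the handle_work loop above: work->func() is called with a
 * pointer to the work pointer (io_wq_flush_func() further down shows the
 * signature), so a handler may replace *workptr with a dependent follow-up
 * item. When that happens, "work != old_work" and the worker loops back to
 * the "next:" label to run the dependent item directly instead of
 * requeueing it, and dependent work is never treated as hashed.
 */
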
static inline void io_worker_spin_for_work(struct io_wqe *wqe)
{
	int i = 0;

	while (++i < 1000) {
		if (io_wqe_run_queue(wqe))
			break;
		if (need_resched())
			break;
		cpu_relax();
	}
}

static int io_wqe_worker(void *data)
{
	struct io_worker *worker = data;
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;
	bool did_work;

	io_worker_start(wqe, worker);

	did_work = false;
	while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		set_current_state(TASK_INTERRUPTIBLE);
loop:
		if (did_work)
			io_worker_spin_for_work(wqe);
		spin_lock_irq(&wqe->lock);
		if (io_wqe_run_queue(wqe)) {
			__set_current_state(TASK_RUNNING);
			io_worker_handle_work(worker);
			did_work = true;
			goto loop;
		}
		did_work = false;
		/* drops the lock on success, retry */
		if (__io_worker_idle(wqe, worker)) {
			__release(&wqe->lock);
			goto loop;
		}
		spin_unlock_irq(&wqe->lock);
		if (signal_pending(current))
			flush_signals(current);
		if (schedule_timeout(WORKER_IDLE_TIMEOUT))
			continue;
		/* timed out, exit unless we're the fixed worker */
		if (test_bit(IO_WQ_BIT_EXIT, &wq->state) ||
		    !(worker->flags & IO_WORKER_F_FIXED))
			break;
	}

	if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		spin_lock_irq(&wqe->lock);
		if (!wq_list_empty(&wqe->work_list))
			io_worker_handle_work(worker);
		else
			spin_unlock_irq(&wqe->lock);
	}

	io_worker_exit(worker);
	return 0;
}

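/*
 * Worker lifetime, as implemented in io_wqe_worker() above: a worker
 * idles on the free_list in TASK_INTERRUPTIBLE for up to
 * WORKER_IDLE_TIMEOUT (5 seconds) when there is no work. On timeout it
 * exits, unless it carries IO_WORKER_F_FIXED (the "static idle worker"
 * created first on each node), which only exits once IO_WQ_BIT_EXIT is
 * set for the whole wq.
 */
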
/*
 * Called when a worker is scheduled in. Mark us as currently running.
 */
void io_wq_worker_running(struct task_struct *tsk)
{
	struct io_worker *worker = kthread_data(tsk);
	struct io_wqe *wqe = worker->wqe;

	if (!(worker->flags & IO_WORKER_F_UP))
		return;
	if (worker->flags & IO_WORKER_F_RUNNING)
		return;
	worker->flags |= IO_WORKER_F_RUNNING;
	io_wqe_inc_running(wqe, worker);
}

/*
 * Called when worker is going to sleep. If there are no workers currently
 * running and we have work pending, wake up a free one or have the manager
 * set one up.
 */
void io_wq_worker_sleeping(struct task_struct *tsk)
{
	struct io_worker *worker = kthread_data(tsk);
	struct io_wqe *wqe = worker->wqe;

	if (!(worker->flags & IO_WORKER_F_UP))
		return;
	if (!(worker->flags & IO_WORKER_F_RUNNING))
		return;

	worker->flags &= ~IO_WORKER_F_RUNNING;

	spin_lock_irq(&wqe->lock);
	io_wqe_dec_running(wqe, worker);
	spin_unlock_irq(&wqe->lock);
}

static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
{
	struct io_wqe_acct *acct = &wqe->acct[index];
	struct io_worker *worker;

	worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node);
	if (!worker)
		return false;

	refcount_set(&worker->ref, 1);
	worker->nulls_node.pprev = NULL;
	worker->wqe = wqe;
	spin_lock_init(&worker->lock);

	worker->task = kthread_create_on_node(io_wqe_worker, worker, wqe->node,
				"io_wqe_worker-%d/%d", index, wqe->node);
	if (IS_ERR(worker->task)) {
		kfree(worker);
		return false;
	}

	spin_lock_irq(&wqe->lock);
	hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
	list_add_tail_rcu(&worker->all_list, &wqe->all_list);
	worker->flags |= IO_WORKER_F_FREE;
	if (index == IO_WQ_ACCT_BOUND)
		worker->flags |= IO_WORKER_F_BOUND;
	if (!acct->nr_workers && (worker->flags & IO_WORKER_F_BOUND))
		worker->flags |= IO_WORKER_F_FIXED;
	acct->nr_workers++;
	spin_unlock_irq(&wqe->lock);

	if (index == IO_WQ_ACCT_UNBOUND)
		atomic_inc(&wq->user->processes);

	wake_up_process(worker->task);
	return true;
}

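/*
 * Note on create_io_worker() above: the first bound worker created on a
 * node is flagged IO_WORKER_F_FIXED so at least one worker sticks around
 * there, and unbound workers are charged against wq->user->processes,
 * which io_wq_can_queue() later compares against the unbound max_workers
 * cap derived from RLIMIT_NPROC in io_wq_create().
 */
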
static inline bool io_wqe_need_worker(struct io_wqe *wqe, int index)
	__must_hold(wqe->lock)
{
	struct io_wqe_acct *acct = &wqe->acct[index];

	/* if we have available workers or no work, no need */
	if (!hlist_nulls_empty(&wqe->free_list) || !io_wqe_run_queue(wqe))
		return false;
	return acct->nr_workers < acct->max_workers;
}

/*
 * Manager thread. Tasked with creating new workers, if we need them.
 */
static int io_wq_manager(void *data)
{
	struct io_wq *wq = data;
	int workers_to_create = num_possible_nodes();
	int node;

	/* create fixed workers */
	refcount_set(&wq->refs, workers_to_create);
	for_each_node(node) {
		if (!create_io_worker(wq, wq->wqes[node], IO_WQ_ACCT_BOUND))
			goto err;
		workers_to_create--;
	}

	complete(&wq->done);

	while (!kthread_should_stop()) {
		for_each_node(node) {
			struct io_wqe *wqe = wq->wqes[node];
			bool fork_worker[2] = { false, false };

			spin_lock_irq(&wqe->lock);
			if (io_wqe_need_worker(wqe, IO_WQ_ACCT_BOUND))
				fork_worker[IO_WQ_ACCT_BOUND] = true;
			if (io_wqe_need_worker(wqe, IO_WQ_ACCT_UNBOUND))
				fork_worker[IO_WQ_ACCT_UNBOUND] = true;
			spin_unlock_irq(&wqe->lock);
			if (fork_worker[IO_WQ_ACCT_BOUND])
				create_io_worker(wq, wqe, IO_WQ_ACCT_BOUND);
			if (fork_worker[IO_WQ_ACCT_UNBOUND])
				create_io_worker(wq, wqe, IO_WQ_ACCT_UNBOUND);
		}
		set_current_state(TASK_INTERRUPTIBLE);
		schedule_timeout(HZ);
	}

	return 0;
err:
	set_bit(IO_WQ_BIT_ERROR, &wq->state);
	set_bit(IO_WQ_BIT_EXIT, &wq->state);
	if (refcount_sub_and_test(workers_to_create, &wq->refs))
		complete(&wq->done);
	return 0;
}

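/*
 * The manager above is woken by io_wqe_wake_worker() when work is queued
 * and no free worker could be activated; otherwise it wakes on its own
 * every second (schedule_timeout(HZ)) and forks bound/unbound workers for
 * any node whose queue is runnable but whose free_list is empty, within
 * the per-class max_workers limits checked in io_wqe_need_worker().
 */
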
static bool io_wq_can_queue(struct io_wqe *wqe, struct io_wqe_acct *acct,
			    struct io_wq_work *work)
{
	bool free_worker;

	if (!(work->flags & IO_WQ_WORK_UNBOUND))
		return true;
	if (atomic_read(&acct->nr_running))
		return true;

	rcu_read_lock();
	free_worker = !hlist_nulls_empty(&wqe->free_list);
	rcu_read_unlock();
	if (free_worker)
		return true;

	if (atomic_read(&wqe->wq->user->processes) >= acct->max_workers &&
	    !(capable(CAP_SYS_RESOURCE) || capable(CAP_SYS_ADMIN)))
		return false;

	return true;
}

static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
{
	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
	int work_flags;
	unsigned long flags;

	/*
	 * Do early check to see if we need a new unbound worker, and if we do,
	 * if we're allowed to do so. This isn't 100% accurate as there's a
	 * gap between this check and incrementing the value, but that's OK.
	 * It's close enough to not be an issue, fork() has the same delay.
	 */
	if (unlikely(!io_wq_can_queue(wqe, acct, work))) {
		work->flags |= IO_WQ_WORK_CANCEL;
		work->func(&work);
		return;
	}

	work_flags = work->flags;
	spin_lock_irqsave(&wqe->lock, flags);
	wq_list_add_tail(&work->list, &wqe->work_list);
	wqe->flags &= ~IO_WQE_FLAG_STALLED;
	spin_unlock_irqrestore(&wqe->lock, flags);

	if ((work_flags & IO_WQ_WORK_CONCURRENT) ||
	    !atomic_read(&acct->nr_running))
		io_wqe_wake_worker(wqe, acct);
}

void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
{
	struct io_wqe *wqe = wq->wqes[numa_node_id()];

	io_wqe_enqueue(wqe, work);
}

/*
 * Enqueue work, hashed by some key. Work items that hash to the same value
 * will not be done in parallel. Used to limit concurrent writes, generally
 * hashed by inode.
 */
void io_wq_enqueue_hashed(struct io_wq *wq, struct io_wq_work *work, void *val)
{
	struct io_wqe *wqe = wq->wqes[numa_node_id()];
	unsigned bit;

	bit = hash_ptr(val, IO_WQ_HASH_ORDER);
	work->flags |= (IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT));
	io_wqe_enqueue(wqe, work);
}

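/*
 * Illustrative only: a hypothetical caller that wants writes against the
 * same file serialized might do something along these lines (the callback
 * name and request layout are made up for the example; INIT_IO_WORK and
 * the enqueue helpers are the ones used in this file):
 *
 *	static void my_write_work(struct io_wq_work **workptr)
 *	{
 *		struct my_req *req = container_of(*workptr, struct my_req, work);
 *
 *		... perform the write described by req ...
 *	}
 *
 *	INIT_IO_WORK(&req->work, my_write_work);
 *	io_wq_enqueue_hashed(wq, &req->work, file_inode(req->file));
 *
 * Unrelated work that may run concurrently would use io_wq_enqueue()
 * instead.
 */
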
static bool io_wqe_worker_send_sig(struct io_worker *worker, void *data)
{
	send_sig(SIGINT, worker->task, 1);
	return false;
}

/*
 * Iterate the passed in list and call the specific function for each
 * worker that isn't exiting
 */
static bool io_wq_for_each_worker(struct io_wqe *wqe,
				  bool (*func)(struct io_worker *, void *),
				  void *data)
{
	struct io_worker *worker;
	bool ret = false;

	list_for_each_entry_rcu(worker, &wqe->all_list, all_list) {
		if (io_worker_get(worker)) {
			ret = func(worker, data);
			io_worker_release(worker);
			if (ret)
				break;
		}
	}

	return ret;
}

void io_wq_cancel_all(struct io_wq *wq)
{
	int node;

	set_bit(IO_WQ_BIT_CANCEL, &wq->state);

	rcu_read_lock();
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		io_wq_for_each_worker(wqe, io_wqe_worker_send_sig, NULL);
	}
	rcu_read_unlock();
}

struct io_cb_cancel_data {
	struct io_wqe *wqe;
	work_cancel_fn *cancel;
	void *caller_data;
};

static bool io_work_cancel(struct io_worker *worker, void *cancel_data)
{
	struct io_cb_cancel_data *data = cancel_data;
	unsigned long flags;
	bool ret = false;

	/*
	 * Hold the lock to avoid ->cur_work going out of scope, caller
	 * may dereference the passed in work.
	 */
	spin_lock_irqsave(&worker->lock, flags);
	if (worker->cur_work &&
	    !(worker->cur_work->flags & IO_WQ_WORK_NO_CANCEL) &&
	    data->cancel(worker->cur_work, data->caller_data)) {
		send_sig(SIGINT, worker->task, 1);
		ret = true;
	}
	spin_unlock_irqrestore(&worker->lock, flags);

	return ret;
}

static enum io_wq_cancel io_wqe_cancel_cb_work(struct io_wqe *wqe,
					       work_cancel_fn *cancel,
					       void *cancel_data)
{
	struct io_cb_cancel_data data = {
		.wqe = wqe,
		.cancel = cancel,
		.caller_data = cancel_data,
	};
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work;
	unsigned long flags;
	bool found = false;

	spin_lock_irqsave(&wqe->lock, flags);
	wq_list_for_each(node, prev, &wqe->work_list) {
		work = container_of(node, struct io_wq_work, list);

		if (cancel(work, cancel_data)) {
			wq_node_del(&wqe->work_list, node, prev);
			found = true;
			break;
		}
	}
	spin_unlock_irqrestore(&wqe->lock, flags);

	if (found) {
		work->flags |= IO_WQ_WORK_CANCEL;
		work->func(&work);
		return IO_WQ_CANCEL_OK;
	}

	rcu_read_lock();
	found = io_wq_for_each_worker(wqe, io_work_cancel, &data);
	rcu_read_unlock();
	return found ? IO_WQ_CANCEL_RUNNING : IO_WQ_CANCEL_NOTFOUND;
}

enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
				  void *data)
{
	enum io_wq_cancel ret = IO_WQ_CANCEL_NOTFOUND;
	int node;

	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		ret = io_wqe_cancel_cb_work(wqe, cancel, data);
		if (ret != IO_WQ_CANCEL_NOTFOUND)
			break;
	}

	return ret;
}

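/*
 * Cancellation return values, for both the callback-based variant above
 * and io_wq_cancel_work() below: IO_WQ_CANCEL_OK means the work was still
 * pending and has been taken off the list (it is invoked once with
 * IO_WQ_WORK_CANCEL set and no completion is posted); IO_WQ_CANCEL_RUNNING
 * means a worker was found executing it and was signalled with SIGINT;
 * IO_WQ_CANCEL_NOTFOUND means it was in neither place.
 */
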
static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
{
	struct io_wq_work *work = data;
	unsigned long flags;
	bool ret = false;

	if (worker->cur_work != work)
		return false;

	spin_lock_irqsave(&worker->lock, flags);
	if (worker->cur_work == work &&
	    !(worker->cur_work->flags & IO_WQ_WORK_NO_CANCEL)) {
		send_sig(SIGINT, worker->task, 1);
		ret = true;
	}
	spin_unlock_irqrestore(&worker->lock, flags);

	return ret;
}

static enum io_wq_cancel io_wqe_cancel_work(struct io_wqe *wqe,
					    struct io_wq_work *cwork)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work;
	unsigned long flags;
	bool found = false;

	cwork->flags |= IO_WQ_WORK_CANCEL;

	/*
	 * First check pending list, if we're lucky we can just remove it
	 * from there. CANCEL_OK means that the work is returned as-new,
	 * no completion will be posted for it.
	 */
	spin_lock_irqsave(&wqe->lock, flags);
	wq_list_for_each(node, prev, &wqe->work_list) {
		work = container_of(node, struct io_wq_work, list);

		if (work == cwork) {
			wq_node_del(&wqe->work_list, node, prev);
			found = true;
			break;
		}
	}
	spin_unlock_irqrestore(&wqe->lock, flags);

	if (found) {
		work->flags |= IO_WQ_WORK_CANCEL;
		work->func(&work);
		return IO_WQ_CANCEL_OK;
	}

	/*
	 * Now check if a free (going busy) or busy worker has the work
	 * currently running. If we find it there, we'll return CANCEL_RUNNING
	 * as an indication that we attempted to signal cancellation. The
	 * completion will run normally in this case.
	 */
	rcu_read_lock();
	found = io_wq_for_each_worker(wqe, io_wq_worker_cancel, cwork);
	rcu_read_unlock();
	return found ? IO_WQ_CANCEL_RUNNING : IO_WQ_CANCEL_NOTFOUND;
}

enum io_wq_cancel io_wq_cancel_work(struct io_wq *wq, struct io_wq_work *cwork)
{
	enum io_wq_cancel ret = IO_WQ_CANCEL_NOTFOUND;
	int node;

	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		ret = io_wqe_cancel_work(wqe, cwork);
		if (ret != IO_WQ_CANCEL_NOTFOUND)
			break;
	}

	return ret;
}

struct io_wq_flush_data {
	struct io_wq_work work;
	struct completion done;
};

static void io_wq_flush_func(struct io_wq_work **workptr)
{
	struct io_wq_work *work = *workptr;
	struct io_wq_flush_data *data;

	data = container_of(work, struct io_wq_flush_data, work);
	complete(&data->done);
}

/*
 * Doesn't wait for previously queued work to finish. When this completes,
 * it just means that previously queued work was started.
 */
void io_wq_flush(struct io_wq *wq)
{
	struct io_wq_flush_data data;
	int node;

	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		init_completion(&data.done);
		INIT_IO_WORK(&data.work, io_wq_flush_func);
		data.work.flags |= IO_WQ_WORK_INTERNAL;
		io_wqe_enqueue(wqe, &data.work);
		wait_for_completion(&data.done);
	}
}

struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
{
	int ret = -ENOMEM, node;
	struct io_wq *wq;

	wq = kzalloc(sizeof(*wq), GFP_KERNEL);
	if (!wq)
		return ERR_PTR(-ENOMEM);

	wq->wqes = kcalloc(nr_node_ids, sizeof(struct io_wqe *), GFP_KERNEL);
	if (!wq->wqes) {
		kfree(wq);
		return ERR_PTR(-ENOMEM);
	}

	wq->get_work = data->get_work;
	wq->put_work = data->put_work;

	/* caller must already hold a reference to this */
	wq->user = data->user;
	wq->creds = data->creds;

	for_each_node(node) {
		struct io_wqe *wqe;

		wqe = kzalloc_node(sizeof(struct io_wqe), GFP_KERNEL, node);
		if (!wqe)
			goto err;
		wq->wqes[node] = wqe;
		wqe->node = node;
		wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
		atomic_set(&wqe->acct[IO_WQ_ACCT_BOUND].nr_running, 0);
		if (wq->user) {
			wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
					task_rlimit(current, RLIMIT_NPROC);
		}
		atomic_set(&wqe->acct[IO_WQ_ACCT_UNBOUND].nr_running, 0);
		wqe->wq = wq;
		spin_lock_init(&wqe->lock);
		INIT_WQ_LIST(&wqe->work_list);
		INIT_HLIST_NULLS_HEAD(&wqe->free_list, 0);
		INIT_LIST_HEAD(&wqe->all_list);
	}

	init_completion(&wq->done);

	/* caller must have already done mmgrab() on this mm */
	wq->mm = data->mm;

	wq->manager = kthread_create(io_wq_manager, wq, "io_wq_manager");
	if (!IS_ERR(wq->manager)) {
		wake_up_process(wq->manager);
		wait_for_completion(&wq->done);
		if (test_bit(IO_WQ_BIT_ERROR, &wq->state)) {
			ret = -ENOMEM;
			goto err;
		}
		refcount_set(&wq->use_refs, 1);
		reinit_completion(&wq->done);
		return wq;
	}

	ret = PTR_ERR(wq->manager);
	complete(&wq->done);
err:
	for_each_node(node)
		kfree(wq->wqes[node]);
	kfree(wq->wqes);
	kfree(wq);
	return ERR_PTR(ret);
}

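/*
 * Illustrative only: a minimal creation sketch, assuming the caller has
 * already done mmgrab() on the mm and holds references to the user and
 * creds it passes in. Field and function names are the ones used in this
 * file; "my_get_work"/"my_put_work", "user" and "concurrency_limit" are
 * made up for the example:
 *
 *	struct io_wq_data data = {
 *		.mm		= current->mm,
 *		.user		= user,
 *		.creds		= current_cred(),
 *		.get_work	= my_get_work,
 *		.put_work	= my_put_work,
 *	};
 *	struct io_wq *wq = io_wq_create(concurrency_limit, &data);
 *
 *	if (IS_ERR(wq))
 *		return PTR_ERR(wq);
 *
 * The "bounded" argument caps the number of bound workers per node; the
 * wq is torn down again with io_wq_destroy() below.
 */
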
static bool io_wq_worker_wake(struct io_worker *worker, void *data)
{
	wake_up_process(worker->task);
	return false;
}

static void __io_wq_destroy(struct io_wq *wq)
{
	int node;

	set_bit(IO_WQ_BIT_EXIT, &wq->state);
	if (wq->manager)
		kthread_stop(wq->manager);

	rcu_read_lock();
	for_each_node(node)
		io_wq_for_each_worker(wq->wqes[node], io_wq_worker_wake, NULL);
	rcu_read_unlock();

	wait_for_completion(&wq->done);

	for_each_node(node)
		kfree(wq->wqes[node]);
	kfree(wq->wqes);
	kfree(wq);
}

void io_wq_destroy(struct io_wq *wq)
{
	if (refcount_dec_and_test(&wq->use_refs))
		__io_wq_destroy(wq);
}