Lines matching refs: wq

21 #include "io-wq.h"
35 IO_WQ_BIT_EXIT = 0, /* wq exiting */
43 * One for each thread in a wq pool
51 struct io_wq *wq;
136 static bool create_io_worker(struct io_wq *wq, int index);
138 static bool io_acct_cancel_pending_work(struct io_wq *wq,
142 static void io_wq_cancel_tw_create(struct io_wq *wq);
155 static inline struct io_wq_acct *io_get_acct(struct io_wq *wq, bool bound)
157 return &wq->acct[bound ? IO_WQ_ACCT_BOUND : IO_WQ_ACCT_UNBOUND];
160 static inline struct io_wq_acct *io_work_get_acct(struct io_wq *wq,
163 return io_get_acct(wq, !(work->flags & IO_WQ_WORK_UNBOUND));
168 return io_get_acct(worker->wq, worker->flags & IO_WORKER_F_BOUND);
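
All three helpers above funnel into the same two-slot lookup: bound work and bound workers select acct[IO_WQ_ACCT_BOUND], everything else acct[IO_WQ_ACCT_UNBOUND]. A minimal sketch of that indexing; the demo_* names are placeholders, not the real io-wq types:

    #include <linux/types.h>

    enum { DEMO_ACCT_BOUND, DEMO_ACCT_UNBOUND, DEMO_ACCT_NR };

    struct demo_acct {
            unsigned int max_workers;
            unsigned int nr_workers;
    };

    struct demo_wq {
            struct demo_acct acct[DEMO_ACCT_NR];
    };

    /* Same shape as io_get_acct(): pick the slot by boundedness only. */
    static inline struct demo_acct *demo_get_acct(struct demo_wq *wq, bool bound)
    {
            return &wq->acct[bound ? DEMO_ACCT_BOUND : DEMO_ACCT_UNBOUND];
    }
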
171 static void io_worker_ref_put(struct io_wq *wq)
173 if (atomic_dec_and_test(&wq->worker_refs))
174 complete(&wq->worker_done);
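
io_worker_ref_put() pairs with the wait_for_completion(&wq->worker_done) seen later in teardown (line 1242): dropping the last worker reference signals the completion. A minimal kernel-style sketch of that refcount-plus-completion pattern; the demo_* names are placeholders, and the initial reference mirrors lines 1184-1185:

    #include <linux/atomic.h>
    #include <linux/completion.h>

    struct demo_refs {
            atomic_t worker_refs;
            struct completion worker_done;
    };

    static void demo_refs_init(struct demo_refs *r)
    {
            atomic_set(&r->worker_refs, 1);         /* initial self-reference */
            init_completion(&r->worker_done);
    }

    static void demo_ref_put(struct demo_refs *r)
    {
            /* the final put wakes whoever is blocked in teardown */
            if (atomic_dec_and_test(&r->worker_refs))
                    complete(&r->worker_done);
    }

    static void demo_wait_for_workers(struct demo_refs *r)
    {
            demo_ref_put(r);                        /* drop the initial reference */
            wait_for_completion(&r->worker_done);   /* sleep until refs hit zero */
    }
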
184 return test_bit(IO_WQ_BIT_EXIT, &worker->wq->state);
190 struct io_wq *wq = worker->wq;
193 raw_spin_lock(&wq->lock);
195 raw_spin_unlock(&wq->lock);
196 io_worker_ref_put(wq);
213 struct io_wq *wq = worker->wq;
216 struct callback_head *cb = task_work_cancel_match(wq->task,
227 raw_spin_lock(&wq->lock);
231 raw_spin_unlock(&wq->lock);
241 io_worker_ref_put(wq);
270 static bool io_wq_activate_free_worker(struct io_wq *wq,
282 hlist_nulls_for_each_entry_rcu(worker, n, &wq->free_list, nulls_node) {
306 static bool io_wq_create_worker(struct io_wq *wq, struct io_wq_acct *acct)
313 pr_warn_once("io-wq is not configured for unbound workers");
315 raw_spin_lock(&wq->lock);
317 raw_spin_unlock(&wq->lock);
321 raw_spin_unlock(&wq->lock);
323 atomic_inc(&wq->worker_refs);
324 return create_io_worker(wq, acct->index);
337 struct io_wq *wq;
343 wq = worker->wq;
344 acct = &wq->acct[worker->create_index];
345 raw_spin_lock(&wq->lock);
351 raw_spin_unlock(&wq->lock);
353 create_io_worker(wq, worker->create_index);
356 io_worker_ref_put(wq);
366 struct io_wq *wq = worker->wq;
369 if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
383 atomic_inc(&wq->worker_refs);
386 if (!task_work_add(wq->task, &worker->create_work, TWA_SIGNAL)) {
390 * now set. wq exit does that too, but we can have added this
393 if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
394 io_wq_cancel_tw_create(wq);
395 io_worker_ref_put(wq);
398 io_worker_ref_put(wq);
404 io_worker_ref_put(wq);
411 struct io_wq *wq = worker->wq;
423 atomic_inc(&wq->worker_refs);
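
Lines 366-423 defer worker creation into the io_wq owner's task context: the caller pins the wq with a worker reference, queues a callback via task_work_add(..., TWA_SIGNAL), and the callback (create_worker_cb, lines 337-356) drops that reference once it has run; io_wq_cancel_tw_create() flushes such items if exit races in. A standalone, much-simplified sketch of that pin-queue-callback shape; the demo_* names are hypothetical and the real code additionally manages worker->create_state and the accounting counters:

    #include <linux/atomic.h>
    #include <linux/kernel.h>
    #include <linux/sched.h>
    #include <linux/task_work.h>

    struct demo_ctx {
            struct task_struct *task;       /* owner task, like wq->task */
            atomic_t refs;                  /* like wq->worker_refs */
            struct callback_head work;
    };

    static void demo_cb(struct callback_head *cb)
    {
            struct demo_ctx *ctx = container_of(cb, struct demo_ctx, work);

            /* ... create the worker in the owner task's context ... */
            atomic_dec(&ctx->refs);         /* drop the reference taken below */
    }

    static bool demo_queue_create(struct demo_ctx *ctx)
    {
            atomic_inc(&ctx->refs);         /* keep ctx alive until demo_cb runs */
            init_task_work(&ctx->work, demo_cb);
            if (task_work_add(ctx->task, &ctx->work, TWA_SIGNAL)) {
                    atomic_dec(&ctx->refs); /* owner is exiting; back out */
                    return false;
            }
            return true;
    }
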
431 static void __io_worker_busy(struct io_wq *wq, struct io_worker *worker)
435 raw_spin_lock(&wq->lock);
437 raw_spin_unlock(&wq->lock);
444 static void __io_worker_idle(struct io_wq *wq, struct io_worker *worker)
445 __must_hold(wq->lock)
449 hlist_nulls_add_head_rcu(&worker->nulls_node, &wq->free_list);
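
Workers that go idle park themselves on wq->free_list (line 449), and io_wq_activate_free_worker() walks that list under RCU (line 282) to wake one before any new thread is created. A standalone sketch of the nulls-list walk, simplified to ignore per-worker flags; the demo_* types are placeholders:

    #include <linux/rculist_nulls.h>

    struct demo_worker {
            struct hlist_nulls_node nulls_node;     /* linkage on the free list */
    };

    /* Return the first idle worker, or NULL; must run under rcu_read_lock(),
     * and the caller is responsible for waking the worker it gets back. */
    static struct demo_worker *demo_find_idle(struct hlist_nulls_head *free_list)
    {
            struct demo_worker *worker;
            struct hlist_nulls_node *n;

            hlist_nulls_for_each_entry_rcu(worker, n, free_list, nulls_node)
                    return worker;
            return NULL;
    }
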
458 static bool io_wait_on_hash(struct io_wq *wq, unsigned int hash)
462 spin_lock_irq(&wq->hash->wait.lock);
463 if (list_empty(&wq->wait.entry)) {
464 __add_wait_queue(&wq->hash->wait, &wq->wait);
465 if (!test_bit(hash, &wq->hash->map)) {
467 list_del_init(&wq->wait.entry);
471 spin_unlock_irq(&wq->hash->wait.lock);
482 struct io_wq *wq = worker->wq;
497 tail = wq->hash_tail[hash];
500 if (!test_and_set_bit(hash, &wq->hash->map)) {
501 wq->hash_tail[hash] = NULL;
520 unstalled = io_wait_on_hash(wq, stall_hash);
524 if (wq_has_sleeper(&wq->hash->wait))
525 wake_up(&wq->hash->wait);
553 struct io_wq *wq = worker->wq;
554 bool do_kill = test_bit(IO_WQ_BIT_EXIT, &wq->state);
569 __io_worker_busy(wq, worker);
596 wq->do_work(work);
599 linked = wq->free_work(work);
607 io_wq_enqueue(wq, linked);
611 spin_lock_irq(&wq->hash->wait.lock);
612 clear_bit(hash, &wq->hash->map);
614 spin_unlock_irq(&wq->hash->wait.lock);
615 if (wq_has_sleeper(&wq->hash->wait))
616 wake_up(&wq->hash->wait);
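
Lines 458-525 and 611-616 give hashed work its serialization: before running work for a given hash, a worker claims the bucket with test_and_set_bit() on wq->hash->map (or parks on wq->hash->wait if it loses the race), and after finishing it clears the bit and wakes the waitqueue so a stalled worker can retry. A standalone sketch of that claim/release pair:

    #include <linux/bitops.h>
    #include <linux/wait.h>

    /* Claim bucket 'hash'; true means this caller now owns it exclusively. */
    static bool demo_claim_bucket(unsigned long *map, unsigned int hash)
    {
            return !test_and_set_bit(hash, map);
    }

    /* Release the bucket and wake any worker stalled on this hash. */
    static void demo_release_bucket(unsigned long *map, unsigned int hash,
                                    struct wait_queue_head *waiters)
    {
            clear_bit(hash, map);
            if (wq_has_sleeper(waiters))
                    wake_up(waiters);
    }
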
630 struct io_wq *wq = worker->wq;
636 snprintf(buf, sizeof(buf), "iou-wrk-%d", wq->task->pid);
639 while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
651 raw_spin_lock(&wq->lock);
658 raw_spin_unlock(&wq->lock);
663 __io_worker_idle(wq, worker);
664 raw_spin_unlock(&wq->lock);
678 wq->cpu_mask);
682 if (test_bit(IO_WQ_BIT_EXIT, &wq->state) && io_acct_run_queue(acct))
725 static void io_init_new_worker(struct io_wq *wq, struct io_worker *worker,
730 set_cpus_allowed_ptr(tsk, wq->cpu_mask);
732 raw_spin_lock(&wq->lock);
733 hlist_nulls_add_head_rcu(&worker->nulls_node, &wq->free_list);
734 list_add_tail_rcu(&worker->all_list, &wq->all_list);
736 raw_spin_unlock(&wq->lock);
769 struct io_wq *wq;
773 wq = worker->wq;
776 io_init_new_worker(wq, worker, tsk);
783 raw_spin_lock(&wq->lock);
791 raw_spin_unlock(&wq->lock);
792 while (io_acct_cancel_pending_work(wq, acct, &match))
795 raw_spin_unlock(&wq->lock);
797 io_worker_ref_put(wq);
816 static bool create_io_worker(struct io_wq *wq, int index)
818 struct io_wq_acct *acct = &wq->acct[index];
828 raw_spin_lock(&wq->lock);
830 raw_spin_unlock(&wq->lock);
831 io_worker_ref_put(wq);
836 worker->wq = wq;
845 io_init_new_worker(wq, worker, tsk);
861 static bool io_wq_for_each_worker(struct io_wq *wq,
868 list_for_each_entry_rcu(worker, &wq->all_list, all_list) {
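
io_wq_for_each_worker() (lines 861-868) walks wq->all_list under RCU and applies a callback per worker; the exit path reuses it to wake everyone (line 1239) and cancellation reuses it to match running work (line 1079). A standalone sketch of the iteration shape, with the per-worker get/release references omitted; the demo_* names are placeholders:

    #include <linux/rculist.h>
    #include <linux/types.h>

    struct demo_worker {
            struct list_head all_list;      /* linkage on the all-workers list */
    };

    typedef bool (demo_worker_fn)(struct demo_worker *worker, void *data);

    /* Apply 'func' to each worker under rcu_read_lock(); stop once a callback
     * reports that it handled a worker, mirroring io_wq_for_each_worker(). */
    static bool demo_for_each_worker(struct list_head *head,
                                     demo_worker_fn *func, void *data)
    {
            struct demo_worker *worker;

            list_for_each_entry_rcu(worker, head, all_list) {
                    if (func(worker, data))
                            return true;
            }
            return false;
    }
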
889 static void io_run_cancel(struct io_wq_work *work, struct io_wq *wq)
893 wq->do_work(work);
894 work = wq->free_work(work);
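
io_run_cancel() (lines 889-894) does not just drop a cancelled request: it still pushes it through wq->do_work() so the request's completion path runs with a cancel flag set, and wq->free_work() hands back the next request in a linked chain, which is drained the same way. A hedged sketch of that loop, assuming the io-wq.c context and the IO_WQ_WORK_CANCEL flag from io-wq.h; the real function's flag handling may differ in detail:

    #include "io-wq.h"

    static void demo_run_cancel(struct io_wq_work *work, struct io_wq *wq)
    {
            do {
                    work->flags |= IO_WQ_WORK_CANCEL;   /* completion sees the cancel */
                    wq->do_work(work);                  /* run it anyway */
                    work = wq->free_work(work);         /* next linked request, if any */
            } while (work);
    }
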
898 static void io_wq_insert_work(struct io_wq *wq, struct io_wq_work *work)
900 struct io_wq_acct *acct = io_work_get_acct(wq, work);
911 tail = wq->hash_tail[hash];
912 wq->hash_tail[hash] = work;
924 void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
926 struct io_wq_acct *acct = io_work_get_acct(wq, work);
932 * If io-wq is exiting for this task, or if the request has explicitly
935 if (test_bit(IO_WQ_BIT_EXIT, &wq->state) ||
937 io_run_cancel(work, wq);
942 io_wq_insert_work(wq, work);
947 do_create = !io_wq_activate_free_worker(wq, acct);
954 did_create = io_wq_create_worker(wq, acct);
958 raw_spin_lock(&wq->lock);
960 raw_spin_unlock(&wq->lock);
963 raw_spin_unlock(&wq->lock);
970 io_acct_cancel_pending_work(wq, acct, &match);
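
io_wq_enqueue() (lines 924-970) applies a simple policy: if the wq is exiting, run the work straight through the cancel path; otherwise queue it on the matching acct, wake an idle worker if one exists, and only create a new worker when none could be activated, rolling back via io_acct_cancel_pending_work() if creation fails. A condensed, hedged outline of that flow, assuming the io-wq.c context, with the acct/wq locking, hashing, and the creation-retry handling omitted:

    #include <linux/rcupdate.h>
    #include "io-wq.h"

    static void demo_enqueue(struct io_wq *wq, struct io_wq_work *work)
    {
            struct io_wq_acct *acct = io_work_get_acct(wq, work);
            bool need_worker;

            if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
                    io_run_cancel(work, wq);        /* exiting: complete as cancelled */
                    return;
            }

            io_wq_insert_work(wq, work);            /* append to the acct's pending list */

            rcu_read_lock();
            need_worker = !io_wq_activate_free_worker(wq, acct);    /* nobody idle? */
            rcu_read_unlock();

            if (need_worker)
                    io_wq_create_worker(wq, acct);  /* may sleep, so outside RCU */
    }
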
1016 static inline void io_wq_remove_pending(struct io_wq *wq,
1020 struct io_wq_acct *acct = io_work_get_acct(wq, work);
1024 if (io_wq_is_hashed(work) && work == wq->hash_tail[hash]) {
1028 wq->hash_tail[hash] = prev_work;
1030 wq->hash_tail[hash] = NULL;
1035 static bool io_acct_cancel_pending_work(struct io_wq *wq,
1047 io_wq_remove_pending(wq, work, prev);
1049 io_run_cancel(work, wq);
1059 static void io_wq_cancel_pending_work(struct io_wq *wq,
1065 struct io_wq_acct *acct = io_get_acct(wq, i == 0);
1067 if (io_acct_cancel_pending_work(wq, acct, match)) {
1075 static void io_wq_cancel_running_work(struct io_wq *wq,
1079 io_wq_for_each_worker(wq, io_wq_worker_cancel, match);
1083 enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
1102 * Do both of these while holding the wq->lock, to ensure that
1105 io_wq_cancel_pending_work(wq, &match);
1109 raw_spin_lock(&wq->lock);
1110 io_wq_cancel_running_work(wq, &match);
1111 raw_spin_unlock(&wq->lock);
1125 struct io_wq *wq = container_of(wait, struct io_wq, wait);
1132 struct io_wq_acct *acct = &wq->acct[i];
1135 io_wq_activate_free_worker(wq, acct);
1144 struct io_wq *wq;
1151 wq = kzalloc(sizeof(struct io_wq), GFP_KERNEL);
1152 if (!wq)
1156 wq->hash = data->hash;
1157 wq->free_work = data->free_work;
1158 wq->do_work = data->do_work;
1162 if (!alloc_cpumask_var(&wq->cpu_mask, GFP_KERNEL))
1164 cpumask_copy(wq->cpu_mask, cpu_possible_mask);
1165 wq->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
1166 wq->acct[IO_WQ_ACCT_UNBOUND].max_workers =
1168 INIT_LIST_HEAD(&wq->wait.entry);
1169 wq->wait.func = io_wq_hash_wake;
1171 struct io_wq_acct *acct = &wq->acct[i];
1179 raw_spin_lock_init(&wq->lock);
1180 INIT_HLIST_NULLS_HEAD(&wq->free_list, 0);
1181 INIT_LIST_HEAD(&wq->all_list);
1183 wq->task = get_task_struct(data->task);
1184 atomic_set(&wq->worker_refs, 1);
1185 init_completion(&wq->worker_done);
1186 ret = cpuhp_state_add_instance_nocalls(io_wq_online, &wq->cpuhp_node);
1190 return wq;
1193 free_cpumask_var(wq->cpu_mask);
1194 kfree(wq);
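
io_wq_create() (lines 1144-1194) packages the setup: allocate the io_wq, copy the hash and the free_work/do_work callbacks from io_wq_data, size the bounded and unbounded accounting slots, hook into CPU hotplug, and take the initial worker reference that teardown drops later. A hedged sketch of a caller in the style of io_uring's own offload setup; the demo_* callbacks are placeholders, not the real io_uring ones:

    #include <linux/sched.h>
    #include "io-wq.h"

    static struct io_wq_work *demo_free_work(struct io_wq_work *work)
    {
            return NULL;                    /* no linked work in this sketch */
    }

    static void demo_do_work(struct io_wq_work *work)
    {
            /* execute and complete the request here */
    }

    static struct io_wq *demo_start_wq(struct io_wq_hash *hash,
                                       struct task_struct *task,
                                       unsigned int bounded)
    {
            struct io_wq_data data = {
                    .hash      = hash,              /* shared hash for serialized work */
                    .task      = task,              /* owner; workers are its threads */
                    .free_work = demo_free_work,
                    .do_work   = demo_do_work,
            };

            return io_wq_create(bounded, &data);
    }
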
1205 return worker->wq == data;
1208 void io_wq_exit_start(struct io_wq *wq)
1210 set_bit(IO_WQ_BIT_EXIT, &wq->state);
1213 static void io_wq_cancel_tw_create(struct io_wq *wq)
1217 while ((cb = task_work_cancel_match(wq->task, io_task_work_match, wq)) != NULL) {
1231 static void io_wq_exit_workers(struct io_wq *wq)
1233 if (!wq->task)
1236 io_wq_cancel_tw_create(wq);
1239 io_wq_for_each_worker(wq, io_wq_worker_wake, NULL);
1241 io_worker_ref_put(wq);
1242 wait_for_completion(&wq->worker_done);
1244 spin_lock_irq(&wq->hash->wait.lock);
1245 list_del_init(&wq->wait.entry);
1246 spin_unlock_irq(&wq->hash->wait.lock);
1248 put_task_struct(wq->task);
1249 wq->task = NULL;
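
io_wq_exit_workers() (lines 1231-1249) fixes the teardown ordering: flush any queued create_work items, wake every worker so it sees IO_WQ_BIT_EXIT, drop the initial worker reference, and only then sleep on worker_done until the last worker is gone, before detaching from the hash waitqueue and releasing the owner task. A condensed outline of that sequence, assuming the io-wq.c context, with the rcu_read_lock() around the worker walk omitted:

    #include "io-wq.h"

    static void demo_exit_workers(struct io_wq *wq)
    {
            if (!wq->task)
                    return;

            io_wq_cancel_tw_create(wq);             /* 1: flush queued creations */
            io_wq_for_each_worker(wq, io_wq_worker_wake, NULL);     /* 2: wake them all */
            io_worker_ref_put(wq);                  /* 3: drop the initial reference */
            wait_for_completion(&wq->worker_done);  /* 4: last worker has exited */

            spin_lock_irq(&wq->hash->wait.lock);    /* 5: detach from the hash waitqueue */
            list_del_init(&wq->wait.entry);
            spin_unlock_irq(&wq->hash->wait.lock);

            put_task_struct(wq->task);              /* 6: release the owner task */
            wq->task = NULL;
    }
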
1252 static void io_wq_destroy(struct io_wq *wq)
1259 cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
1260 io_wq_cancel_pending_work(wq, &match);
1261 free_cpumask_var(wq->cpu_mask);
1262 io_wq_put_hash(wq->hash);
1263 kfree(wq);
1266 void io_wq_put_and_exit(struct io_wq *wq)
1268 WARN_ON_ONCE(!test_bit(IO_WQ_BIT_EXIT, &wq->state));
1270 io_wq_exit_workers(wq);
1271 io_wq_destroy(wq);
1284 cpumask_set_cpu(od->cpu, worker->wq->cpu_mask);
1286 cpumask_clear_cpu(od->cpu, worker->wq->cpu_mask);
1290 static int __io_wq_cpu_online(struct io_wq *wq, unsigned int cpu, bool online)
1298 io_wq_for_each_worker(wq, io_wq_worker_affinity, &od);
1305 struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);
1307 return __io_wq_cpu_online(wq, cpu, true);
1312 struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);
1314 return __io_wq_cpu_online(wq, cpu, false);
1336 int io_wq_max_workers(struct io_wq *wq, int *new_count)
1356 raw_spin_lock(&wq->lock);
1358 acct = &wq->acct[i];
1363 raw_spin_unlock(&wq->lock);
1376 ret = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN, "io-wq/online",
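
The hotplug side closes the loop: each io_wq adds its cpuhp_node to a multi-instance state during io_wq_create() (line 1186), and that state is registered once at init time with cpuhp_setup_state_multi() (line 1376), whose online/offline callbacks walk the workers and update wq->cpu_mask (lines 1284-1298). A hedged sketch of that registration with placeholder callbacks of the standard multi-instance signature:

    #include <linux/cpuhotplug.h>
    #include <linux/init.h>

    static enum cpuhp_state demo_wq_online;

    static int demo_cpu_online(unsigned int cpu, struct hlist_node *node)
    {
            /* re-allow 'cpu' in each worker's affinity mask */
            return 0;
    }

    static int demo_cpu_offline(unsigned int cpu, struct hlist_node *node)
    {
            /* drop 'cpu' from each worker's affinity mask */
            return 0;
    }

    static int __init demo_wq_init(void)
    {
            int ret;

            ret = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN, "io-wq/online",
                                          demo_cpu_online, demo_cpu_offline);
            if (ret < 0)
                    return ret;
            demo_wq_online = ret;   /* dynamic state id; instances attach to it */
            return 0;
    }

Each wq instance would then attach with cpuhp_state_add_instance_nocalls(demo_wq_online, &wq->cpuhp_node), as at line 1186.
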