Lines Matching refs:queue

107 _timeout_task_init(struct taskqueue *queue, struct timeout_task *timeout_task,
112 callout_init_mtx(&timeout_task->c, &queue->tq_mutex,
114 timeout_task->q = queue;
127 task_get_busy(struct taskqueue *queue, struct task *task)
131 TQ_ASSERT_LOCKED(queue);
132 LIST_FOREACH(tb, &queue->tq_active, tb_link) {
144 struct taskqueue *queue;
151 queue = malloc(sizeof(struct taskqueue), M_TASKQUEUE, mflags | M_ZERO);
152 if (queue == NULL) {
159 STAILQ_INIT(&queue->tq_queue);
160 LIST_INIT(&queue->tq_active);
161 queue->tq_enqueue = enqueue;
162 queue->tq_context = context;
163 queue->tq_name = tq_name;
164 queue->tq_spin = (mtxflags & MTX_SPIN) != 0;
165 queue->tq_flags |= TQ_FLAGS_ACTIVE;
170 queue->tq_flags |= TQ_FLAGS_UNLOCKED_ENQUEUE;
171 mtx_init(&queue->tq_mutex, tq_name, NULL, mtxflags);
173 return (queue);
186 taskqueue_set_callback(struct taskqueue *queue,
195 KASSERT((queue->tq_callbacks[cb_type] == NULL),
198 queue->tq_callbacks[cb_type] = callback;
199 queue->tq_cb_contexts[cb_type] = context;
216 taskqueue_free(struct taskqueue *queue)
219 TQ_LOCK(queue);
220 queue->tq_flags &= ~TQ_FLAGS_ACTIVE;
221 taskqueue_terminate(queue->tq_threads, queue);
222 KASSERT(LIST_EMPTY(&queue->tq_active), ("Tasks still running?"));
223 KASSERT(queue->tq_callouts == 0, ("Armed timeout tasks"));
224 mtx_destroy(&queue->tq_mutex);
225 free(queue->tq_threads, M_TASKQUEUE);
226 free(queue->tq_name, M_TASKQUEUE);
227 free(queue, M_TASKQUEUE);
231 taskqueue_enqueue_locked(struct taskqueue *queue, struct task *task, int flags)
242 tb = task_get_busy(queue, task);
244 TQ_UNLOCK(queue);
254 TQ_UNLOCK(queue);
259 TQ_UNLOCK(queue);
269 prev = STAILQ_LAST(&queue->tq_queue, task, ta_link);
271 STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link);
273 prev = queue->tq_hint;
278 ins = STAILQ_FIRST(&queue->tq_queue);
285 STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link);
286 queue->tq_hint = task;
288 STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link);
292 if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) != 0)
293 TQ_UNLOCK(queue);
294 if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0)
295 queue->tq_enqueue(queue->tq_context);
296 if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) == 0)
297 TQ_UNLOCK(queue);
304 taskqueue_enqueue_flags(struct taskqueue *queue, struct task *task, int flags)
308 TQ_LOCK(queue);
309 res = taskqueue_enqueue_locked(queue, task, flags);
316 taskqueue_enqueue(struct taskqueue *queue, struct task *task)
318 return (taskqueue_enqueue_flags(queue, task, 0));
324 struct taskqueue *queue;
328 queue = timeout_task->q;
331 queue->tq_callouts--;
337 taskqueue_enqueue_timeout_sbt(struct taskqueue *queue,
342 TQ_LOCK(queue);
343 KASSERT(timeout_task->q == NULL || timeout_task->q == queue,
344 ("Migrated queue"));
345 timeout_task->q = queue;
349 TQ_UNLOCK(queue);
352 taskqueue_enqueue_locked(queue, &timeout_task->t, 0);
358 queue->tq_callouts++;
364 if (queue->tq_spin)
366 if (queue->tq_spin && queue->tq_tcount == 1 &&
367 queue->tq_threads[0] == curthread) {
375 TQ_UNLOCK(queue);
381 taskqueue_enqueue_timeout(struct taskqueue *queue,
385 return (taskqueue_enqueue_timeout_sbt(queue, ttask, ticks * tick_sbt,
400 taskqueue_drain_tq_queue(struct taskqueue *queue)
404 if (STAILQ_EMPTY(&queue->tq_queue))
413 * queue lock.
416 STAILQ_INSERT_TAIL(&queue->tq_queue, &t_barrier, ta_link);
417 queue->tq_hint = &t_barrier;
425 TQ_SLEEP(queue, &t_barrier, "tq_qdrain");
435 taskqueue_drain_tq_active(struct taskqueue *queue)
440 if (LIST_EMPTY(&queue->tq_active))
444 queue->tq_callouts++;
447 seq = queue->tq_seq;
449 LIST_FOREACH(tb, &queue->tq_active, tb_link) {
451 TQ_SLEEP(queue, tb->tb_running, "tq_adrain");
457 queue->tq_callouts--;
458 if ((queue->tq_flags & TQ_FLAGS_ACTIVE) == 0)
459 wakeup_one(queue->tq_threads);
464 taskqueue_block(struct taskqueue *queue)
467 TQ_LOCK(queue);
468 queue->tq_flags |= TQ_FLAGS_BLOCKED;
469 TQ_UNLOCK(queue);
473 taskqueue_unblock(struct taskqueue *queue)
476 TQ_LOCK(queue);
477 queue->tq_flags &= ~TQ_FLAGS_BLOCKED;
478 if (!STAILQ_EMPTY(&queue->tq_queue))
479 queue->tq_enqueue(queue->tq_context);
480 TQ_UNLOCK(queue);
484 taskqueue_run_locked(struct taskqueue *queue)
492 KASSERT(queue != NULL, ("tq is NULL"));
493 TQ_ASSERT_LOCKED(queue);
495 LIST_INSERT_HEAD(&queue->tq_active, &tb, tb_link);
498 while ((task = STAILQ_FIRST(&queue->tq_queue)) != NULL) {
499 STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
500 if (queue->tq_hint == task)
501 queue->tq_hint = NULL;
505 tb.tb_seq = ++queue->tq_seq;
507 TQ_UNLOCK(queue);
519 TQ_LOCK(queue);
528 taskqueue_run(struct taskqueue *queue)
531 TQ_LOCK(queue);
532 taskqueue_run_locked(queue);
533 TQ_UNLOCK(queue);
542 taskqueue_poll_is_busy(struct taskqueue *queue, struct task *task)
546 TQ_LOCK(queue);
547 retval = task->ta_pending > 0 || task_get_busy(queue, task) != NULL;
548 TQ_UNLOCK(queue);
554 taskqueue_cancel_locked(struct taskqueue *queue, struct task *task,
561 STAILQ_REMOVE(&queue->tq_queue, task, task, ta_link);
562 if (queue->tq_hint == task)
563 queue->tq_hint = NULL;
568 tb = task_get_busy(queue, task);
578 taskqueue_cancel(struct taskqueue *queue, struct task *task, u_int *pendp)
582 TQ_LOCK(queue);
583 error = taskqueue_cancel_locked(queue, task, pendp);
584 TQ_UNLOCK(queue);
590 taskqueue_cancel_timeout(struct taskqueue *queue,
596 TQ_LOCK(queue);
598 error = taskqueue_cancel_locked(queue, &timeout_task->t, &pending1);
601 queue->tq_callouts--;
603 TQ_UNLOCK(queue);
611 taskqueue_drain(struct taskqueue *queue, struct task *task)
614 if (!queue->tq_spin)
617 TQ_LOCK(queue);
618 while (task->ta_pending != 0 || task_get_busy(queue, task) != NULL)
619 TQ_SLEEP(queue, task, "tq_drain");
620 TQ_UNLOCK(queue);
624 taskqueue_drain_all(struct taskqueue *queue)
627 if (!queue->tq_spin)
630 TQ_LOCK(queue);
631 (void)taskqueue_drain_tq_queue(queue);
632 (void)taskqueue_drain_tq_active(queue);
633 TQ_UNLOCK(queue);
637 taskqueue_drain_timeout(struct taskqueue *queue,
644 TQ_LOCK(queue);
648 TQ_UNLOCK(queue);
651 taskqueue_drain(queue, &timeout_task->t);
656 TQ_LOCK(queue);
658 TQ_UNLOCK(queue);
662 taskqueue_quiesce(struct taskqueue *queue)
666 TQ_LOCK(queue);
668 ret = taskqueue_drain_tq_queue(queue);
670 ret = taskqueue_drain_tq_active(queue);
672 TQ_UNLOCK(queue);
867 swi_add(NULL, "task queue", taskqueue_swi_run, NULL, SWI_TQ,
903 taskqueue_member(struct taskqueue *queue, struct thread *td)
908 if (queue->tq_threads[i] == NULL)
910 if (queue->tq_threads[i] == td) {
914 if (++j >= queue->tq_tcount)