/*-
 * Copyright (c) 2000 Doug Rabson
 * Copyright (c) 2014 Jeff Roberson
 * Copyright (c) 2016 Matthew Macy
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

#include <sys/cdefs.h>
__FBSDID("$FreeBSD: releng/11.0/sys/kern/subr_gtaskqueue.c 305267 2016-09-02 01:41:57Z nwhitehorn $");

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/bus.h>
#include <sys/cpuset.h>
#include <sys/interrupt.h>
#include <sys/kernel.h>
#include <sys/kthread.h>
#include <sys/libkern.h>
#include <sys/limits.h>
#include <sys/lock.h>
#include <sys/malloc.h>
#include <sys/mutex.h>
#include <sys/proc.h>
#include <sys/sched.h>
#include <sys/smp.h>
#include <sys/gtaskqueue.h>
#include <sys/unistd.h>
#include <machine/stdarg.h>

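/*
 * Overview (illustrative sketch, not part of the original sources):
 *
 * This file provides two layers.  The gtaskqueue layer is a stripped-down
 * variant of taskqueue(9) that runs "gtask" items from dedicated kernel
 * threads.  The taskqgroup layer built on top of it spreads a set of such
 * queues across CPUs and binds their threads and associated interrupts to
 * those CPUs; it is consumed by drivers (notably iflib).
 *
 * A consumer would typically do roughly the following, where gtask, sc and
 * irq stand for a hypothetical driver's struct grouptask, softc and
 * interrupt vector:
 *
 *	tqg = taskqgroup_create("if_io");
 *	taskqgroup_adjust(tqg, mp_ncpus, 1);
 *	taskqgroup_attach(tqg, gtask, sc, irq, "if_io task");
 *
 * after which work is queued with grouptaskqueue_enqueue() on
 * gtask->gt_taskqueue, passing the task's embedded struct gtask (drivers
 * normally go through the GROUPTASK_* convenience macros declared in
 * sys/gtaskqueue.h rather than calling these functions directly).
 */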
static MALLOC_DEFINE(M_GTASKQUEUE, "taskqueue", "Task Queues");
static void	gtaskqueue_thread_enqueue(void *);
static void	gtaskqueue_thread_loop(void *arg);

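/*
 * A gtaskqueue_busy record lives on the stack of a thread inside
 * gtaskqueue_run_locked() and is linked onto tq_active while that thread
 * runs a task; tb_running points at the task currently being executed.
 * The sentinel TB_DRAIN_WAITER marks an entry inserted by
 * gtaskqueue_drain_tq_active() rather than by a running thread.
 */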
struct gtaskqueue_busy {
	struct gtask	*tb_running;
	TAILQ_ENTRY(gtaskqueue_busy) tb_link;
};

static struct gtask * const TB_DRAIN_WAITER = (struct gtask *)0x1;

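/*
 * A gtaskqueue is a simplified taskqueue: a FIFO of pending gtasks, the
 * enqueue hook used to kick its service threads, and bookkeeping for the
 * threads, busy records and registered callbacks.
 */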
struct gtaskqueue {
	STAILQ_HEAD(, gtask)	tq_queue;
	gtaskqueue_enqueue_fn	tq_enqueue;
	void			*tq_context;
	char			*tq_name;
	TAILQ_HEAD(, gtaskqueue_busy) tq_active;
	struct mtx		tq_mutex;
	struct thread		**tq_threads;
	int			tq_tcount;
	int			tq_spin;
	int			tq_flags;
	int			tq_callouts;
	taskqueue_callback_fn	tq_callbacks[TASKQUEUE_NUM_CALLBACKS];
	void			*tq_cb_contexts[TASKQUEUE_NUM_CALLBACKS];
};

#define	TQ_FLAGS_ACTIVE		(1 << 0)
#define	TQ_FLAGS_BLOCKED	(1 << 1)
#define	TQ_FLAGS_UNLOCKED_ENQUEUE	(1 << 2)

#define	DT_CALLOUT_ARMED	(1 << 0)

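/*
 * A queue created with MTX_SPIN protects itself with a spin mutex and must
 * use the spin variants of the lock and sleep primitives; these wrappers
 * pick the right flavour based on tq_spin.
 */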
#define	TQ_LOCK(tq)							\
	do {								\
		if ((tq)->tq_spin)					\
			mtx_lock_spin(&(tq)->tq_mutex);			\
		else							\
			mtx_lock(&(tq)->tq_mutex);			\
	} while (0)
#define	TQ_ASSERT_LOCKED(tq)	mtx_assert(&(tq)->tq_mutex, MA_OWNED)

#define	TQ_UNLOCK(tq)							\
	do {								\
		if ((tq)->tq_spin)					\
			mtx_unlock_spin(&(tq)->tq_mutex);		\
		else							\
			mtx_unlock(&(tq)->tq_mutex);			\
	} while (0)
#define	TQ_ASSERT_UNLOCKED(tq)	mtx_assert(&(tq)->tq_mutex, MA_NOTOWNED)

static __inline int
TQ_SLEEP(struct gtaskqueue *tq, void *p, struct mtx *m, int pri, const char *wm,
    int t)
{
	if (tq->tq_spin)
		return (msleep_spin(p, m, wm, t));
	return (msleep(p, m, pri, wm, t));
}

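/*
 * Common constructor: allocate the queue and its name, record the enqueue
 * hook and context, and initialize the queue mutex with the caller-supplied
 * flags (MTX_DEF for ordinary queues, MTX_SPIN for "fast" queues).
 */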
static struct gtaskqueue *
_gtaskqueue_create(const char *name, int mflags,
		 taskqueue_enqueue_fn enqueue, void *context,
		 int mtxflags, const char *mtxname __unused)
{
	struct gtaskqueue *queue;
	char *tq_name;

	tq_name = malloc(TASKQUEUE_NAMELEN, M_GTASKQUEUE, mflags | M_ZERO);
	if (!tq_name)
		return (NULL);

	snprintf(tq_name, TASKQUEUE_NAMELEN, "%s", (name) ? name : "taskqueue");

	queue = malloc(sizeof(struct gtaskqueue), M_GTASKQUEUE, mflags | M_ZERO);
	if (!queue) {
		/* Don't leak the name buffer on allocation failure. */
		free(tq_name, M_GTASKQUEUE);
		return (NULL);
	}

	STAILQ_INIT(&queue->tq_queue);
	TAILQ_INIT(&queue->tq_active);
	queue->tq_enqueue = enqueue;
	queue->tq_context = context;
	queue->tq_name = tq_name;
	queue->tq_spin = (mtxflags & MTX_SPIN) != 0;
	queue->tq_flags |= TQ_FLAGS_ACTIVE;
	if (enqueue == gtaskqueue_thread_enqueue)
		queue->tq_flags |= TQ_FLAGS_UNLOCKED_ENQUEUE;
	mtx_init(&queue->tq_mutex, tq_name, NULL, mtxflags);

	return (queue);
}

/*
 * Signal a taskqueue thread to terminate.
 */
static void
gtaskqueue_terminate(struct thread **pp, struct gtaskqueue *tq)
{

	while (tq->tq_tcount > 0 || tq->tq_callouts > 0) {
		wakeup(tq);
		TQ_SLEEP(tq, pp, &tq->tq_mutex, PWAIT, "taskqueue_destroy", 0);
	}
}
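/*
 * Tear down a queue: mark it inactive, wait for its threads and any armed
 * callouts to drain via gtaskqueue_terminate(), then release the mutex and
 * all allocations.
 */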
static void
gtaskqueue_free(struct gtaskqueue *queue)
{

	TQ_LOCK(queue);
	queue->tq_flags &= ~TQ_FLAGS_ACTIVE;
	gtaskqueue_terminate(queue->tq_threads, queue);
	KASSERT(TAILQ_EMPTY(&queue->tq_active), ("Tasks still running?"));
	KASSERT(queue->tq_callouts == 0, ("Armed timeout tasks"));
	mtx_destroy(&queue->tq_mutex);
	free(queue->tq_threads, M_GTASKQUEUE);
	free(queue->tq_name, M_GTASKQUEUE);
	free(queue, M_GTASKQUEUE);
}

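/*
 * Queue a gtask for execution.  Enqueueing is idempotent: a task that is
 * already on the queue (TASK_ENQUEUED set) is left alone.  Unless the queue
 * is blocked, the enqueue hook is called to wake a service thread.
 */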
int
grouptaskqueue_enqueue(struct gtaskqueue *queue, struct gtask *gtask)
{
	TQ_LOCK(queue);
	if (gtask->ta_flags & TASK_ENQUEUED) {
		TQ_UNLOCK(queue);
		return (0);
	}
	STAILQ_INSERT_TAIL(&queue->tq_queue, gtask, ta_link);
	gtask->ta_flags |= TASK_ENQUEUED;
	TQ_UNLOCK(queue);
	if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0)
		queue->tq_enqueue(queue->tq_context);
	return (0);
}

static void
gtaskqueue_task_nop_fn(void *context)
{
}

/*
 * Block until all currently queued tasks in this taskqueue
 * have begun execution.  Tasks queued during execution of
 * this function are ignored.
 */
static void
gtaskqueue_drain_tq_queue(struct gtaskqueue *queue)
{
	struct gtask t_barrier;

	if (STAILQ_EMPTY(&queue->tq_queue))
		return;

	/*
	 * Enqueue our barrier after all current tasks, but with
	 * the highest priority so that newly queued tasks cannot
	 * pass it.  Because of the high priority, we can not use
	 * taskqueue_enqueue_locked directly (which drops the lock
	 * anyway) so just insert it at tail while we have the
	 * queue lock.
	 */
	GTASK_INIT(&t_barrier, 0, USHRT_MAX, gtaskqueue_task_nop_fn, &t_barrier);
	STAILQ_INSERT_TAIL(&queue->tq_queue, &t_barrier, ta_link);
	t_barrier.ta_flags |= TASK_ENQUEUED;

	/*
	 * Once the barrier has executed, all previously queued tasks
	 * have completed or are currently executing.
	 */
	while (t_barrier.ta_flags & TASK_ENQUEUED)
		TQ_SLEEP(queue, &t_barrier, &queue->tq_mutex, PWAIT, "-", 0);
}

/*
 * Block until all currently executing tasks for this taskqueue
 * complete.  Tasks that begin execution during the execution
 * of this function are ignored.
 */
static void
gtaskqueue_drain_tq_active(struct gtaskqueue *queue)
{
	struct gtaskqueue_busy tb_marker, *tb_first;

	if (TAILQ_EMPTY(&queue->tq_active))
		return;

	/* Block taskq_terminate(). */
	queue->tq_callouts++;

	/*
	 * Wait for all currently executing taskqueue threads
	 * to go idle.
	 */
	tb_marker.tb_running = TB_DRAIN_WAITER;
	TAILQ_INSERT_TAIL(&queue->tq_active, &tb_marker, tb_link);
	while (TAILQ_FIRST(&queue->tq_active) != &tb_marker)
		TQ_SLEEP(queue, &tb_marker, &queue->tq_mutex, PWAIT, "-", 0);
	TAILQ_REMOVE(&queue->tq_active, &tb_marker, tb_link);

	/*
	 * Wakeup any other drain waiter that happened to queue up
	 * without any intervening active thread.
	 */
	tb_first = TAILQ_FIRST(&queue->tq_active);
	if (tb_first != NULL && tb_first->tb_running == TB_DRAIN_WAITER)
		wakeup(tb_first);

	/* Release taskqueue_terminate(). */
	queue->tq_callouts--;
	if ((queue->tq_flags & TQ_FLAGS_ACTIVE) == 0)
		wakeup_one(queue->tq_threads);
}

void
gtaskqueue_block(struct gtaskqueue *queue)
{

	TQ_LOCK(queue);
	queue->tq_flags |= TQ_FLAGS_BLOCKED;
	TQ_UNLOCK(queue);
}

void
gtaskqueue_unblock(struct gtaskqueue *queue)
{

	TQ_LOCK(queue);
	queue->tq_flags &= ~TQ_FLAGS_BLOCKED;
	if (!STAILQ_EMPTY(&queue->tq_queue))
		queue->tq_enqueue(queue->tq_context);
	TQ_UNLOCK(queue);
}

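/*
 * Run pending tasks with the queue lock held.  The queue is unlocked around
 * each callback, so tasks may sleep (on non-spin queues) and may re-enqueue
 * themselves; the busy record placed on tq_active lets drain waiters and
 * gtaskqueue_cancel() see which task is currently executing.
 */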
static void
gtaskqueue_run_locked(struct gtaskqueue *queue)
{
	struct gtaskqueue_busy tb;
	struct gtaskqueue_busy *tb_first;
	struct gtask *gtask;

	KASSERT(queue != NULL, ("tq is NULL"));
	TQ_ASSERT_LOCKED(queue);
	tb.tb_running = NULL;

	while (STAILQ_FIRST(&queue->tq_queue)) {
		TAILQ_INSERT_TAIL(&queue->tq_active, &tb, tb_link);

		/*
		 * Carefully remove the first task from the queue and
		 * clear its TASK_ENQUEUED flag.
		 */
		gtask = STAILQ_FIRST(&queue->tq_queue);
		KASSERT(gtask != NULL, ("task is NULL"));
		STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
		gtask->ta_flags &= ~TASK_ENQUEUED;
		tb.tb_running = gtask;
		TQ_UNLOCK(queue);

		KASSERT(gtask->ta_func != NULL, ("task->ta_func is NULL"));
		gtask->ta_func(gtask->ta_context);

		TQ_LOCK(queue);
		tb.tb_running = NULL;
		wakeup(gtask);

		TAILQ_REMOVE(&queue->tq_active, &tb, tb_link);
		tb_first = TAILQ_FIRST(&queue->tq_active);
		if (tb_first != NULL &&
		    tb_first->tb_running == TB_DRAIN_WAITER)
			wakeup(tb_first);
	}
}

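/*
 * Return non-zero if some taskqueue thread is currently executing gtask.
 * Caller must hold the queue lock.
 */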
static int
task_is_running(struct gtaskqueue *queue, struct gtask *gtask)
{
	struct gtaskqueue_busy *tb;

	TQ_ASSERT_LOCKED(queue);
	TAILQ_FOREACH(tb, &queue->tq_active, tb_link) {
		if (tb->tb_running == gtask)
			return (1);
	}
	return (0);
}

static int
gtaskqueue_cancel_locked(struct gtaskqueue *queue, struct gtask *gtask)
{

	if (gtask->ta_flags & TASK_ENQUEUED)
		STAILQ_REMOVE(&queue->tq_queue, gtask, gtask, ta_link);
	gtask->ta_flags &= ~TASK_ENQUEUED;
	return (task_is_running(queue, gtask) ? EBUSY : 0);
}

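/*
 * Remove a pending task from the queue.  Returns 0 if the task is neither
 * pending nor running anymore, or EBUSY if it is currently being executed
 * (in which case the caller can gtaskqueue_drain() to wait for it).
 */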
int
gtaskqueue_cancel(struct gtaskqueue *queue, struct gtask *gtask)
{
	int error;

	TQ_LOCK(queue);
	error = gtaskqueue_cancel_locked(queue, gtask);
	TQ_UNLOCK(queue);

	return (error);
}

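/*
 * Sleep until the given task is neither pending nor running.  The _all
 * variant below additionally waits for every task that was queued or
 * running at the time of the call.
 */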
void
gtaskqueue_drain(struct gtaskqueue *queue, struct gtask *gtask)
{

	if (!queue->tq_spin)
		WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);

	TQ_LOCK(queue);
	while ((gtask->ta_flags & TASK_ENQUEUED) || task_is_running(queue, gtask))
		TQ_SLEEP(queue, gtask, &queue->tq_mutex, PWAIT, "-", 0);
	TQ_UNLOCK(queue);
}

void
gtaskqueue_drain_all(struct gtaskqueue *queue)
{

	if (!queue->tq_spin)
		WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);

	TQ_LOCK(queue);
	gtaskqueue_drain_tq_queue(queue);
	gtaskqueue_drain_tq_active(queue);
	TQ_UNLOCK(queue);
}

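/*
 * Create "count" kernel threads for the queue, optionally pinned to the
 * CPUs in "mask", at priority "pri".  Threads are created stopped and are
 * only scheduled once their priority and affinity have been set.
 */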
static int
_gtaskqueue_start_threads(struct gtaskqueue **tqp, int count, int pri,
    cpuset_t *mask, const char *name, va_list ap)
{
	char ktname[MAXCOMLEN + 1];
	struct thread *td;
	struct gtaskqueue *tq;
	int i, error;

	if (count <= 0)
		return (EINVAL);

	vsnprintf(ktname, sizeof(ktname), name, ap);
	tq = *tqp;

	tq->tq_threads = malloc(sizeof(struct thread *) * count, M_GTASKQUEUE,
	    M_NOWAIT | M_ZERO);
	if (tq->tq_threads == NULL) {
		printf("%s: no memory for %s threads\n", __func__, ktname);
		return (ENOMEM);
	}

	for (i = 0; i < count; i++) {
		if (count == 1)
			error = kthread_add(gtaskqueue_thread_loop, tqp, NULL,
			    &tq->tq_threads[i], RFSTOPPED, 0, "%s", ktname);
		else
			error = kthread_add(gtaskqueue_thread_loop, tqp, NULL,
			    &tq->tq_threads[i], RFSTOPPED, 0,
			    "%s_%d", ktname, i);
		if (error) {
			/* should be ok to continue, taskqueue_free will dtrt */
			printf("%s: kthread_add(%s): error %d\n", __func__,
			    ktname, error);
			tq->tq_threads[i] = NULL;		/* paranoid */
		} else
			tq->tq_tcount++;
	}
	for (i = 0; i < count; i++) {
		if (tq->tq_threads[i] == NULL)
			continue;
		td = tq->tq_threads[i];
		if (mask) {
			error = cpuset_setthread(td->td_tid, mask);
			/*
			 * Failing to pin is rarely an actual fatal error;
			 * it'll just affect performance.
			 */
			if (error)
				printf("%s: curthread=%llu: can't pin; "
				    "error=%d\n",
				    __func__,
				    (unsigned long long) td->td_tid,
				    error);
		}
		thread_lock(td);
		sched_prio(td, pri);
		sched_add(td, SRQ_BORING);
		thread_unlock(td);
	}

	return (0);
}

static int
gtaskqueue_start_threads(struct gtaskqueue **tqp, int count, int pri,
    const char *name, ...)
{
	va_list ap;
	int error;

	va_start(ap, name);
	error = _gtaskqueue_start_threads(tqp, count, pri, NULL, name, ap);
	va_end(ap);
	return (error);
}

static inline void
gtaskqueue_run_callback(struct gtaskqueue *tq,
    enum taskqueue_callback_type cb_type)
{
	taskqueue_callback_fn tq_callback;

	TQ_ASSERT_UNLOCKED(tq);
	tq_callback = tq->tq_callbacks[cb_type];
	if (tq_callback != NULL)
		tq_callback(tq->tq_cb_contexts[cb_type]);
}

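/*
 * Main loop of a taskqueue service thread: run whatever is queued, then
 * sleep on the queue until the enqueue hook wakes us, repeating until the
 * queue is shut down.
 */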
static void
gtaskqueue_thread_loop(void *arg)
{
	struct gtaskqueue **tqp, *tq;

	tqp = arg;
	tq = *tqp;
	gtaskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_INIT);
	TQ_LOCK(tq);
	while ((tq->tq_flags & TQ_FLAGS_ACTIVE) != 0) {
		/* XXX ? */
		gtaskqueue_run_locked(tq);
		/*
		 * Because gtaskqueue_run_locked() can drop tq_mutex, we need
		 * to check if the TQ_FLAGS_ACTIVE flag wasn't removed in the
		 * meantime, which means we missed a wakeup.
		 */
		if ((tq->tq_flags & TQ_FLAGS_ACTIVE) == 0)
			break;
		TQ_SLEEP(tq, tq, &tq->tq_mutex, 0, "-", 0);
	}
	gtaskqueue_run_locked(tq);
	/*
	 * This thread is on its way out, so just drop the lock temporarily
	 * in order to call the shutdown callback.  This allows the callback
	 * to look at the taskqueue, even just before it dies.
	 */
	TQ_UNLOCK(tq);
	gtaskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_SHUTDOWN);
	TQ_LOCK(tq);

	/* rendezvous with thread that asked us to terminate */
	tq->tq_tcount--;
	wakeup_one(tq->tq_threads);
	TQ_UNLOCK(tq);
	kthread_exit();
}

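/*
 * Enqueue hook for thread-backed queues: wake one of the service threads
 * sleeping on the queue.
 */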
static void
gtaskqueue_thread_enqueue(void *context)
{
	struct gtaskqueue **tqp, *tq;

	tqp = context;
	tq = *tqp;
	wakeup_one(tq);
}

static struct gtaskqueue *
gtaskqueue_create_fast(const char *name, int mflags,
		 taskqueue_enqueue_fn enqueue, void *context)
{
	return _gtaskqueue_create(name, mflags, enqueue, context,
			MTX_SPIN, "fast_taskqueue");
}

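/*
 * A taskqgroup is a set of per-CPU gtaskqueues.  Each taskqgroup_cpu slot
 * holds one queue, the list of grouptasks attached to it and the CPU its
 * service thread is (to be) bound to; tqg_stride controls how the slots
 * are spread over the CPUs.
 */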
struct taskqgroup_cpu {
	LIST_HEAD(, grouptask)	tgc_tasks;
	struct gtaskqueue	*tgc_taskq;
	int	tgc_cnt;
	int	tgc_cpu;
};

struct taskqgroup {
	struct taskqgroup_cpu tqg_queue[MAXCPU];
	struct mtx	tqg_lock;
	char *		tqg_name;
	int		tqg_adjusting;
	int		tqg_stride;
	int		tqg_cnt;
};

struct taskq_bind_task {
	struct gtask bt_task;
	int	bt_cpuid;
};

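/*
 * Create the gtaskqueue and its single service thread for slot "idx" of
 * the group and record which CPU that slot maps to.
 */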
static void
taskqgroup_cpu_create(struct taskqgroup *qgroup, int idx)
{
	struct taskqgroup_cpu *qcpu;

	qcpu = &qgroup->tqg_queue[idx];
	LIST_INIT(&qcpu->tgc_tasks);
	qcpu->tgc_taskq = gtaskqueue_create_fast(NULL, M_WAITOK,
	    gtaskqueue_thread_enqueue, &qcpu->tgc_taskq);
	gtaskqueue_start_threads(&qcpu->tgc_taskq, 1, PI_SOFT,
	    "%s_%d", qgroup->tqg_name, idx);
	qcpu->tgc_cpu = idx * qgroup->tqg_stride;
}

static void
taskqgroup_cpu_remove(struct taskqgroup *qgroup, int idx)
{

	gtaskqueue_free(qgroup->tqg_queue[idx].tgc_taskq);
}

/*
 * Find the taskqueue with the fewest tasks, preferring one that does not
 * already service a task with the same uniq identifier.
 */
static int
taskqgroup_find(struct taskqgroup *qgroup, void *uniq)
{
	struct grouptask *n;
	int i, idx, mincnt;
	int strict;

	mtx_assert(&qgroup->tqg_lock, MA_OWNED);
	if (qgroup->tqg_cnt == 0)
		return (0);
	idx = -1;
	mincnt = INT_MAX;
	/*
	 * Two passes: first scan for the queue with the fewest tasks that
	 * does not already service this uniq id.  If that fails, simply
	 * pick the queue with the fewest total tasks.
	 */
	for (strict = 1; mincnt == INT_MAX; strict = 0) {
		for (i = 0; i < qgroup->tqg_cnt; i++) {
			if (qgroup->tqg_queue[i].tgc_cnt > mincnt)
				continue;
			if (strict) {
				LIST_FOREACH(n,
				    &qgroup->tqg_queue[i].tgc_tasks, gt_list)
					if (n->gt_uniq == uniq)
						break;
				if (n != NULL)
					continue;
			}
			mincnt = qgroup->tqg_queue[i].tgc_cnt;
			idx = i;
		}
	}
	if (idx == -1)
		panic("taskqgroup_find: Failed to pick a qid.");

	return (idx);
}

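/*
 * Attach a grouptask to the group, letting taskqgroup_find() pick the
 * least loaded queue.  If an IRQ is supplied and SMP is up, the interrupt
 * is immediately given the same CPU affinity as the chosen queue.
 */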
void
taskqgroup_attach(struct taskqgroup *qgroup, struct grouptask *gtask,
    void *uniq, int irq, char *name)
{
	cpuset_t mask;
	int qid;

	gtask->gt_uniq = uniq;
	gtask->gt_name = name;
	gtask->gt_irq = irq;
	gtask->gt_cpu = -1;
	mtx_lock(&qgroup->tqg_lock);
	qid = taskqgroup_find(qgroup, uniq);
	qgroup->tqg_queue[qid].tgc_cnt++;
	LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, gt_list);
	gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq;
	if (irq != -1 && smp_started) {
		CPU_ZERO(&mask);
		CPU_SET(qgroup->tqg_queue[qid].tgc_cpu, &mask);
		mtx_unlock(&qgroup->tqg_lock);
		intr_setaffinity(irq, &mask);
	} else
		mtx_unlock(&qgroup->tqg_lock);
}

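/*
 * As taskqgroup_attach(), but the caller requests a specific CPU.  Before
 * SMP is started queue 0 is used unconditionally; afterwards EINVAL is
 * returned if no queue in the group is bound to the requested CPU.
 */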
int
taskqgroup_attach_cpu(struct taskqgroup *qgroup, struct grouptask *gtask,
	void *uniq, int cpu, int irq, char *name)
{
	cpuset_t mask;
	int i, qid;

	qid = -1;
	gtask->gt_uniq = uniq;
	gtask->gt_name = name;
	gtask->gt_irq = irq;
	gtask->gt_cpu = cpu;
	mtx_lock(&qgroup->tqg_lock);
	if (smp_started) {
		for (i = 0; i < qgroup->tqg_cnt; i++)
			if (qgroup->tqg_queue[i].tgc_cpu == cpu) {
				qid = i;
				break;
			}
		if (qid == -1) {
			mtx_unlock(&qgroup->tqg_lock);
			return (EINVAL);
		}
	} else
		qid = 0;
	qgroup->tqg_queue[qid].tgc_cnt++;
	LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, gt_list);
	gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq;
	if (irq != -1 && smp_started) {
		CPU_ZERO(&mask);
		CPU_SET(qgroup->tqg_queue[qid].tgc_cpu, &mask);
		mtx_unlock(&qgroup->tqg_lock);
		intr_setaffinity(irq, &mask);
	} else
		mtx_unlock(&qgroup->tqg_lock);
	return (0);
}

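/*
 * Remove a grouptask from whichever queue it is currently attached to.
 * Panics if the task is not found in the group.
 */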
void
taskqgroup_detach(struct taskqgroup *qgroup, struct grouptask *gtask)
{
	int i;

	mtx_lock(&qgroup->tqg_lock);
	for (i = 0; i < qgroup->tqg_cnt; i++)
		if (qgroup->tqg_queue[i].tgc_taskq == gtask->gt_taskqueue)
			break;
	if (i == qgroup->tqg_cnt)
		panic("taskqgroup_detach: task not in group\n");
	qgroup->tqg_queue[i].tgc_cnt--;
	LIST_REMOVE(gtask, gt_list);
	mtx_unlock(&qgroup->tqg_lock);
	gtask->gt_taskqueue = NULL;
}

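/*
 * taskqgroup_bind() queues one taskqgroup_binder() task on every queue in
 * the group; each instance runs on its queue's service thread, pins that
 * thread to the queue's CPU and then frees its bind request.
 */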
static void
taskqgroup_binder(void *ctx)
{
	struct taskq_bind_task *gtask = (struct taskq_bind_task *)ctx;
	cpuset_t mask;
	int error;

	CPU_ZERO(&mask);
	CPU_SET(gtask->bt_cpuid, &mask);
	error = cpuset_setthread(curthread->td_tid, &mask);
	thread_lock(curthread);
	sched_bind(curthread, gtask->bt_cpuid);
	thread_unlock(curthread);

	if (error)
		printf("taskqgroup_binder: setaffinity failed: %d\n",
		    error);
	free(gtask, M_DEVBUF);
}

static void
taskqgroup_bind(struct taskqgroup *qgroup)
{
	struct taskq_bind_task *gtask;
	int i;

	/*
	 * Bind taskqueue threads to specific CPUs, if they have been assigned
	 * one.
	 */
	for (i = 0; i < qgroup->tqg_cnt; i++) {
		gtask = malloc(sizeof (*gtask), M_DEVBUF, M_NOWAIT);
		if (gtask == NULL) {
			/* Not fatal; the thread simply remains unbound. */
			printf("taskqgroup_bind: no memory for bind task\n");
			continue;
		}
		GTASK_INIT(&gtask->bt_task, 0, 0, taskqgroup_binder, gtask);
		gtask->bt_cpuid = qgroup->tqg_queue[i].tgc_cpu;
		grouptaskqueue_enqueue(qgroup->tqg_queue[i].tgc_taskq,
		    &gtask->bt_task);
	}
}

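/*
 * Resize the group to "cnt" queues spaced "stride" CPUs apart: create or
 * destroy per-CPU queues as needed, redistribute the attached grouptasks,
 * refresh CPU and IRQ affinities and rebind the service threads.  Called
 * with the group lock held; the lock is dropped and retaken internally.
 */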
static int
_taskqgroup_adjust(struct taskqgroup *qgroup, int cnt, int stride)
{
	LIST_HEAD(, grouptask) gtask_head = LIST_HEAD_INITIALIZER(NULL);
	cpuset_t mask;
	struct grouptask *gtask;
	int i, k, old_cnt, qid, cpu;

	mtx_assert(&qgroup->tqg_lock, MA_OWNED);

	if (cnt < 1 || cnt * stride > mp_ncpus || !smp_started) {
		printf("taskqgroup_adjust failed cnt: %d stride: %d mp_ncpus: %d smp_started: %d\n",
			   cnt, stride, mp_ncpus, smp_started);
		return (EINVAL);
	}
	if (qgroup->tqg_adjusting) {
		printf("taskqgroup_adjust failed: adjusting\n");
		return (EBUSY);
	}
	qgroup->tqg_adjusting = 1;
	old_cnt = qgroup->tqg_cnt;
	mtx_unlock(&qgroup->tqg_lock);
	/*
	 * Set up queue for tasks added before boot.
	 */
	if (old_cnt == 0) {
		LIST_SWAP(&gtask_head, &qgroup->tqg_queue[0].tgc_tasks,
		    grouptask, gt_list);
		qgroup->tqg_queue[0].tgc_cnt = 0;
	}

	/*
	 * Create any queues that have been added.
	 */
	for (i = old_cnt; i < cnt; i++)
		taskqgroup_cpu_create(qgroup, i);
	mtx_lock(&qgroup->tqg_lock);
	qgroup->tqg_cnt = cnt;
	qgroup->tqg_stride = stride;

	/*
	 * Adjust drivers to use new taskqs.
	 */
	for (i = 0; i < old_cnt; i++) {
		while ((gtask = LIST_FIRST(&qgroup->tqg_queue[i].tgc_tasks))) {
			LIST_REMOVE(gtask, gt_list);
			qgroup->tqg_queue[i].tgc_cnt--;
			LIST_INSERT_HEAD(&gtask_head, gtask, gt_list);
		}
	}

	while ((gtask = LIST_FIRST(&gtask_head))) {
		LIST_REMOVE(gtask, gt_list);
		if (gtask->gt_cpu == -1)
			qid = taskqgroup_find(qgroup, gtask->gt_uniq);
		else {
			for (i = 0; i < qgroup->tqg_cnt; i++)
				if (qgroup->tqg_queue[i].tgc_cpu == gtask->gt_cpu) {
					qid = i;
					break;
				}
		}
		qgroup->tqg_queue[qid].tgc_cnt++;
		LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask,
		    gt_list);
		gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq;
	}
	/*
	 * Set new CPU and IRQ affinity.
	 */
	cpu = CPU_FIRST();
	for (i = 0; i < cnt; i++) {
		qgroup->tqg_queue[i].tgc_cpu = cpu;
		for (k = 0; k < qgroup->tqg_stride; k++)
			cpu = CPU_NEXT(cpu);
		CPU_ZERO(&mask);
		CPU_SET(qgroup->tqg_queue[i].tgc_cpu, &mask);
		LIST_FOREACH(gtask, &qgroup->tqg_queue[i].tgc_tasks, gt_list) {
			if (gtask->gt_irq == -1)
				continue;
			intr_setaffinity(gtask->gt_irq, &mask);
		}
	}
	mtx_unlock(&qgroup->tqg_lock);

	/*
	 * Remove queues if the count has been reduced.
	 */
	for (i = cnt; i < old_cnt; i++)
		taskqgroup_cpu_remove(qgroup, i);

	mtx_lock(&qgroup->tqg_lock);
	qgroup->tqg_adjusting = 0;

	taskqgroup_bind(qgroup);

	return (0);
}

int
taskqgroup_adjust(struct taskqgroup *qgroup, int cpu, int stride)
{
	int error;

	mtx_lock(&qgroup->tqg_lock);
	error = _taskqgroup_adjust(qgroup, cpu, stride);
	mtx_unlock(&qgroup->tqg_lock);

	return (error);
}

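/*
 * Allocate a new, empty task queue group.  Its queues are only created
 * once taskqgroup_adjust() is called (normally after SMP is up); until
 * then attached tasks accumulate on slot 0.
 */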
struct taskqgroup *
taskqgroup_create(char *name)
{
	struct taskqgroup *qgroup;

	qgroup = malloc(sizeof(*qgroup), M_GTASKQUEUE, M_WAITOK | M_ZERO);
	mtx_init(&qgroup->tqg_lock, "taskqgroup", NULL, MTX_DEF);
	qgroup->tqg_name = name;
	LIST_INIT(&qgroup->tqg_queue[0].tgc_tasks);

	return (qgroup);
}

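/*
 * XXX: not implemented yet; the group's queues, threads and memory are
 * never released.
 */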
void
taskqgroup_destroy(struct taskqgroup *qgroup)
{

}