1/*
2 * CDDL HEADER START
3 *
4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License (the "License").
6 * You may not use this file except in compliance with the License.
7 *
8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9 * or http://www.opensolaris.org/os/licensing.
10 * See the License for the specific language governing permissions
11 * and limitations under the License.
12 *
13 * When distributing Covered Code, include this CDDL HEADER in each
14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15 * If applicable, add the following below this CDDL HEADER, with the
16 * fields enclosed by brackets "[]" replaced with your own identifying
17 * information: Portions Copyright [yyyy] [name of copyright owner]
18 *
19 * CDDL HEADER END
20 */
21/*
22 * Copyright 2010 Sun Microsystems, Inc.  All rights reserved.
23 * Use is subject to license terms.
24 */
25/*
26 * Copyright 2011 Nexenta Systems, Inc. All rights reserved.
27 * Copyright 2012 Garrett D'Amore <garrett@damore.org>.  All rights reserved.
28 * Copyright (c) 2014 by Delphix. All rights reserved.
29 */
30
31#include <sys/zfs_context.h>
32
/*
 * When nonzero, taskq_dispatch() runs each task synchronously in the
 * calling thread instead of queueing it, and taskq_member() reports every
 * thread as a member of every taskq.
 */
int taskq_now;
/* Shared queues set up by system_taskq_init(), torn down by system_taskq_fini(). */
taskq_t *system_taskq;
taskq_t *system_delay_taskq;

/*
 * Thread-specific-data key: every worker thread stores its owning taskq
 * under this key when it starts (taskq_thread()); taskq_of_curthread()
 * reads it back.
 */
static pthread_key_t taskq_tsd;

/*
 * Internal tq_flags bit: set by taskq_create(), cleared by taskq_destroy()
 * to make the worker threads leave their dispatch loop.
 */
#define	TASKQ_ACTIVE	0x00010000
40
41static taskq_ent_t *
42task_alloc(taskq_t *tq, int tqflags)
43{
44	taskq_ent_t *t;
45	int rv;
46
47again:	if ((t = tq->tq_freelist) != NULL && tq->tq_nalloc >= tq->tq_minalloc) {
48		ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC));
49		tq->tq_freelist = t->tqent_next;
50	} else {
51		if (tq->tq_nalloc >= tq->tq_maxalloc) {
52			if (!(tqflags & KM_SLEEP))
53				return (NULL);
54
55			/*
56			 * We don't want to exceed tq_maxalloc, but we can't
57			 * wait for other tasks to complete (and thus free up
58			 * task structures) without risking deadlock with
59			 * the caller.  So, we just delay for one second
60			 * to throttle the allocation rate. If we have tasks
61			 * complete before one second timeout expires then
62			 * taskq_ent_free will signal us and we will
63			 * immediately retry the allocation.
64			 */
65			tq->tq_maxalloc_wait++;
66			rv = cv_timedwait(&tq->tq_maxalloc_cv,
67			    &tq->tq_lock, ddi_get_lbolt() + hz);
68			tq->tq_maxalloc_wait--;
69			if (rv > 0)
70				goto again;		/* signaled */
71		}
72		mutex_exit(&tq->tq_lock);
73
74		t = kmem_alloc(sizeof (taskq_ent_t), tqflags);
75
76		mutex_enter(&tq->tq_lock);
77		if (t != NULL) {
78			/* Make sure we start without any flags */
79			t->tqent_flags = 0;
80			tq->tq_nalloc++;
81		}
82	}
83	return (t);
84}
85
86static void
87task_free(taskq_t *tq, taskq_ent_t *t)
88{
89	if (tq->tq_nalloc <= tq->tq_minalloc) {
90		t->tqent_next = tq->tq_freelist;
91		tq->tq_freelist = t;
92	} else {
93		tq->tq_nalloc--;
94		mutex_exit(&tq->tq_lock);
95		kmem_free(t, sizeof (taskq_ent_t));
96		mutex_enter(&tq->tq_lock);
97	}
98
99	if (tq->tq_maxalloc_wait)
100		cv_signal(&tq->tq_maxalloc_cv);
101}
102
103taskqid_t
104taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t tqflags)
105{
106	taskq_ent_t *t;
107
108	if (taskq_now) {
109		func(arg);
110		return (1);
111	}
112
113	mutex_enter(&tq->tq_lock);
114	ASSERT(tq->tq_flags & TASKQ_ACTIVE);
115	if ((t = task_alloc(tq, tqflags)) == NULL) {
116		mutex_exit(&tq->tq_lock);
117		return (0);
118	}
119	if (tqflags & TQ_FRONT) {
120		t->tqent_next = tq->tq_task.tqent_next;
121		t->tqent_prev = &tq->tq_task;
122	} else {
123		t->tqent_next = &tq->tq_task;
124		t->tqent_prev = tq->tq_task.tqent_prev;
125	}
126	t->tqent_next->tqent_prev = t;
127	t->tqent_prev->tqent_next = t;
128	t->tqent_func = func;
129	t->tqent_arg = arg;
130	t->tqent_flags = 0;
131	cv_signal(&tq->tq_dispatch_cv);
132	mutex_exit(&tq->tq_lock);
133	return (1);
134}
135
136taskqid_t
137taskq_dispatch_delay(taskq_t *tq,  task_func_t func, void *arg, uint_t tqflags,
138    clock_t expire_time)
139{
140	return (0);
141}
142
143int
144taskq_empty_ent(taskq_ent_t *t)
145{
146	return (t->tqent_next == NULL);
147}
148
149void
150taskq_init_ent(taskq_ent_t *t)
151{
152	t->tqent_next = NULL;
153	t->tqent_prev = NULL;
154	t->tqent_func = NULL;
155	t->tqent_arg = NULL;
156	t->tqent_flags = 0;
157}
158
159void
160taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags,
161    taskq_ent_t *t)
162{
163	ASSERT(func != NULL);
164
165	/*
166	 * Mark it as a prealloc'd task.  This is important
167	 * to ensure that we don't free it later.
168	 */
169	t->tqent_flags |= TQENT_FLAG_PREALLOC;
170	/*
171	 * Enqueue the task to the underlying queue.
172	 */
173	mutex_enter(&tq->tq_lock);
174
175	if (flags & TQ_FRONT) {
176		t->tqent_next = tq->tq_task.tqent_next;
177		t->tqent_prev = &tq->tq_task;
178	} else {
179		t->tqent_next = &tq->tq_task;
180		t->tqent_prev = tq->tq_task.tqent_prev;
181	}
182	t->tqent_next->tqent_prev = t;
183	t->tqent_prev->tqent_next = t;
184	t->tqent_func = func;
185	t->tqent_arg = arg;
186	cv_signal(&tq->tq_dispatch_cv);
187	mutex_exit(&tq->tq_lock);
188}
189
190void
191taskq_wait(taskq_t *tq)
192{
193	mutex_enter(&tq->tq_lock);
194	while (tq->tq_task.tqent_next != &tq->tq_task || tq->tq_active != 0)
195		cv_wait(&tq->tq_wait_cv, &tq->tq_lock);
196	mutex_exit(&tq->tq_lock);
197}
198
199void
200taskq_wait_id(taskq_t *tq, taskqid_t id)
201{
202	taskq_wait(tq);
203}
204
205void
206taskq_wait_outstanding(taskq_t *tq, taskqid_t id)
207{
208	taskq_wait(tq);
209}
210
/*
 * Body of each worker thread started by taskq_create(); arg is the taskq.
 *
 * The thread records its taskq in thread-specific data (read back by
 * taskq_of_curthread()). It then repeatedly unlinks the entry at the head
 * of tq->tq_task and runs it with tq_lock dropped and tq_threadlock held as
 * reader. Entries without TQENT_FLAG_PREALLOC are returned to the pool with
 * task_free(). Preallocated entries (from taskq_dispatch_ent()) are left
 * to their owner.
 *
 * tq_active counts workers that are not idle-waiting. A worker decrements
 * it before sleeping on tq_dispatch_cv and re-increments it on wakeup. The
 * last worker to go idle broadcasts tq_wait_cv for taskq_wait().
 *
 * Once taskq_destroy() clears TASKQ_ACTIVE, the loop ends. The thread then
 * decrements tq_nthreads, broadcasts tq_wait_cv (which taskq_destroy() waits
 * on) and exits.
 */
static void
taskq_thread(void *arg)
{
	taskq_t *tq = arg;
	taskq_ent_t *t;
	boolean_t prealloc;

	VERIFY0(pthread_setspecific(taskq_tsd, tq));

	mutex_enter(&tq->tq_lock);
	while (tq->tq_flags & TASKQ_ACTIVE) {
		if ((t = tq->tq_task.tqent_next) == &tq->tq_task) {
			/* Queue empty: go idle until something is dispatched. */
			if (--tq->tq_active == 0)
				cv_broadcast(&tq->tq_wait_cv);
			cv_wait(&tq->tq_dispatch_cv, &tq->tq_lock);
			tq->tq_active++;
			continue;
		}
		/*
		 * Unlink the head entry and clear its links so that
		 * taskq_empty_ent() reports it as no longer queued.
		 */
		t->tqent_prev->tqent_next = t->tqent_next;
		t->tqent_next->tqent_prev = t->tqent_prev;
		t->tqent_next = NULL;
		t->tqent_prev = NULL;
		/*
		 * Sample the PREALLOC flag before running the task.
		 * NOTE(review): presumably because a caller-owned entry may be
		 * reused or released by the task itself, so t must not be
		 * inspected afterwards in that case -- confirm.
		 */
		prealloc = t->tqent_flags & TQENT_FLAG_PREALLOC;
		mutex_exit(&tq->tq_lock);

		/* Run the task without tq_lock, as a reader of tq_threadlock. */
		rw_enter(&tq->tq_threadlock, RW_READER);
		t->tqent_func(t->tqent_arg);
		rw_exit(&tq->tq_threadlock);

		mutex_enter(&tq->tq_lock);
		if (!prealloc)
			task_free(tq, t);
	}
	/* TASKQ_ACTIVE cleared: report this thread's exit to taskq_destroy(). */
	tq->tq_nthreads--;
	cv_broadcast(&tq->tq_wait_cv);
	mutex_exit(&tq->tq_lock);
	thread_exit();
}
249
250/*ARGSUSED*/
251taskq_t *
252taskq_create(const char *name, int nthreads, pri_t pri,
253    int minalloc, int maxalloc, uint_t flags)
254{
255	taskq_t *tq = kmem_zalloc(sizeof (taskq_t), KM_SLEEP);
256	int t;
257
258	if (flags & TASKQ_THREADS_CPU_PCT) {
259		int pct;
260		ASSERT3S(nthreads, >=, 0);
261		ASSERT3S(nthreads, <=, 100);
262		pct = MIN(nthreads, 100);
263		pct = MAX(pct, 0);
264
265		nthreads = (sysconf(_SC_NPROCESSORS_ONLN) * pct) / 100;
266		nthreads = MAX(nthreads, 1);	/* need at least 1 thread */
267	} else {
268		ASSERT3S(nthreads, >=, 1);
269	}
270
271	rw_init(&tq->tq_threadlock, NULL, RW_DEFAULT, NULL);
272	mutex_init(&tq->tq_lock, NULL, MUTEX_DEFAULT, NULL);
273	cv_init(&tq->tq_dispatch_cv, NULL, CV_DEFAULT, NULL);
274	cv_init(&tq->tq_wait_cv, NULL, CV_DEFAULT, NULL);
275	cv_init(&tq->tq_maxalloc_cv, NULL, CV_DEFAULT, NULL);
276	(void) strncpy(tq->tq_name, name, TASKQ_NAMELEN);
277	tq->tq_flags = flags | TASKQ_ACTIVE;
278	tq->tq_active = nthreads;
279	tq->tq_nthreads = nthreads;
280	tq->tq_minalloc = minalloc;
281	tq->tq_maxalloc = maxalloc;
282	tq->tq_task.tqent_next = &tq->tq_task;
283	tq->tq_task.tqent_prev = &tq->tq_task;
284	tq->tq_threadlist = kmem_alloc(nthreads * sizeof (kthread_t *),
285	    KM_SLEEP);
286
287	if (flags & TASKQ_PREPOPULATE) {
288		mutex_enter(&tq->tq_lock);
289		while (minalloc-- > 0)
290			task_free(tq, task_alloc(tq, KM_SLEEP));
291		mutex_exit(&tq->tq_lock);
292	}
293
294	for (t = 0; t < nthreads; t++)
295		VERIFY((tq->tq_threadlist[t] = thread_create(NULL, 0,
296		    taskq_thread, tq, 0, &p0, TS_RUN, pri)) != NULL);
297
298	return (tq);
299}
300
301void
302taskq_destroy(taskq_t *tq)
303{
304	int nthreads = tq->tq_nthreads;
305
306	taskq_wait(tq);
307
308	mutex_enter(&tq->tq_lock);
309
310	tq->tq_flags &= ~TASKQ_ACTIVE;
311	cv_broadcast(&tq->tq_dispatch_cv);
312
313	while (tq->tq_nthreads != 0)
314		cv_wait(&tq->tq_wait_cv, &tq->tq_lock);
315
316	tq->tq_minalloc = 0;
317	while (tq->tq_nalloc != 0) {
318		ASSERT(tq->tq_freelist != NULL);
319		task_free(tq, task_alloc(tq, KM_SLEEP));
320	}
321
322	mutex_exit(&tq->tq_lock);
323
324	kmem_free(tq->tq_threadlist, nthreads * sizeof (kthread_t *));
325
326	rw_destroy(&tq->tq_threadlock);
327	mutex_destroy(&tq->tq_lock);
328	cv_destroy(&tq->tq_dispatch_cv);
329	cv_destroy(&tq->tq_wait_cv);
330	cv_destroy(&tq->tq_maxalloc_cv);
331
332	kmem_free(tq, sizeof (taskq_t));
333}
334
335int
336taskq_member(taskq_t *tq, kthread_t *t)
337{
338	int i;
339
340	if (taskq_now)
341		return (1);
342
343	for (i = 0; i < tq->tq_nthreads; i++)
344		if (tq->tq_threadlist[i] == t)
345			return (1);
346
347	return (0);
348}
349
350taskq_t *
351taskq_of_curthread(void)
352{
353	return (pthread_getspecific(taskq_tsd));
354}
355
356int
357taskq_cancel_id(taskq_t *tq, taskqid_t id)
358{
359	return (ENOENT);
360}
361
362void
363system_taskq_init(void)
364{
365	VERIFY0(pthread_key_create(&taskq_tsd, NULL));
366	system_taskq = taskq_create("system_taskq", 64, maxclsyspri, 4, 512,
367	    TASKQ_DYNAMIC | TASKQ_PREPOPULATE);
368	system_delay_taskq = taskq_create("delay_taskq", 4, maxclsyspri, 4,
369	    512, TASKQ_DYNAMIC | TASKQ_PREPOPULATE);
370}
371
/*
 * Destroy the shared queues created by system_taskq_init() and delete the
 * thread-specific-data key.
 *
 * Each global pointer is cleared only after taskq_destroy() has drained its
 * queue. The key is deleted last, once taskq_destroy() has waited for all
 * worker threads of both queues to exit.
 */
void
system_taskq_fini(void)
{
	taskq_destroy(system_taskq);
	system_taskq = NULL; /* defensive */
	taskq_destroy(system_delay_taskq);
	system_delay_taskq = NULL;
	VERIFY0(pthread_key_delete(taskq_tsd));
}
381