1168404Spjd/* 2168404Spjd * CDDL HEADER START 3168404Spjd * 4168404Spjd * The contents of this file are subject to the terms of the 5185029Spjd * Common Development and Distribution License (the "License"). 6185029Spjd * You may not use this file except in compliance with the License. 7168404Spjd * 8168404Spjd * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 9168404Spjd * or http://www.opensolaris.org/os/licensing. 10168404Spjd * See the License for the specific language governing permissions 11168404Spjd * and limitations under the License. 12168404Spjd * 13168404Spjd * When distributing Covered Code, include this CDDL HEADER in each 14168404Spjd * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 15168404Spjd * If applicable, add the following below this CDDL HEADER, with the 16168404Spjd * fields enclosed by brackets "[]" replaced with your own identifying 17168404Spjd * information: Portions Copyright [yyyy] [name of copyright owner] 18168404Spjd * 19168404Spjd * CDDL HEADER END 20168404Spjd */ 21168404Spjd/* 22219089Spjd * Copyright 2010 Sun Microsystems, Inc. All rights reserved. 23168404Spjd * Use is subject to license terms. 24168404Spjd */ 25260742Savg/* 26260742Savg * Copyright 2011 Nexenta Systems, Inc. All rights reserved. 27260742Savg */ 28168404Spjd 29168404Spjd#include <sys/zfs_context.h> 30168404Spjd 31168404Spjdint taskq_now; 32208047Smmtaskq_t *system_taskq; 33168404Spjd 34168404Spjd#define TASKQ_ACTIVE 0x00010000 35168404Spjd 36168404Spjdstruct taskq { 37168404Spjd kmutex_t tq_lock; 38168404Spjd krwlock_t tq_threadlock; 39168404Spjd kcondvar_t tq_dispatch_cv; 40168404Spjd kcondvar_t tq_wait_cv; 41168404Spjd thread_t *tq_threadlist; 42168404Spjd int tq_flags; 43168404Spjd int tq_active; 44168404Spjd int tq_nthreads; 45168404Spjd int tq_nalloc; 46168404Spjd int tq_minalloc; 47168404Spjd int tq_maxalloc; 48219089Spjd kcondvar_t tq_maxalloc_cv; 49219089Spjd int tq_maxalloc_wait; 50260742Savg taskq_ent_t *tq_freelist; 51260742Savg taskq_ent_t tq_task; 52168404Spjd}; 53168404Spjd 54260742Savgstatic taskq_ent_t * 55168404Spjdtask_alloc(taskq_t *tq, int tqflags) 56168404Spjd{ 57260742Savg taskq_ent_t *t; 58219089Spjd int rv; 59168404Spjd 60219089Spjdagain: if ((t = tq->tq_freelist) != NULL && tq->tq_nalloc >= tq->tq_minalloc) { 61260742Savg tq->tq_freelist = t->tqent_next; 62168404Spjd } else { 63168404Spjd if (tq->tq_nalloc >= tq->tq_maxalloc) { 64219089Spjd if (!(tqflags & KM_SLEEP)) 65168404Spjd return (NULL); 66219089Spjd 67168404Spjd /* 68168404Spjd * We don't want to exceed tq_maxalloc, but we can't 69168404Spjd * wait for other tasks to complete (and thus free up 70168404Spjd * task structures) without risking deadlock with 71168404Spjd * the caller. So, we just delay for one second 72219089Spjd * to throttle the allocation rate. If we have tasks 73219089Spjd * complete before one second timeout expires then 74219089Spjd * taskq_ent_free will signal us and we will 75219089Spjd * immediately retry the allocation. 76168404Spjd */ 77219089Spjd tq->tq_maxalloc_wait++; 78219089Spjd rv = cv_timedwait(&tq->tq_maxalloc_cv, 79219089Spjd &tq->tq_lock, ddi_get_lbolt() + hz); 80219089Spjd tq->tq_maxalloc_wait--; 81219089Spjd if (rv > 0) 82219089Spjd goto again; /* signaled */ 83168404Spjd } 84219089Spjd mutex_exit(&tq->tq_lock); 85219089Spjd 86260742Savg t = kmem_alloc(sizeof (taskq_ent_t), tqflags & KM_SLEEP); 87219089Spjd 88168404Spjd mutex_enter(&tq->tq_lock); 89168404Spjd if (t != NULL) 90168404Spjd tq->tq_nalloc++; 91168404Spjd } 92168404Spjd return (t); 93168404Spjd} 94168404Spjd 95168404Spjdstatic void 96260742Savgtask_free(taskq_t *tq, taskq_ent_t *t) 97168404Spjd{ 98168404Spjd if (tq->tq_nalloc <= tq->tq_minalloc) { 99260742Savg t->tqent_next = tq->tq_freelist; 100168404Spjd tq->tq_freelist = t; 101168404Spjd } else { 102168404Spjd tq->tq_nalloc--; 103168404Spjd mutex_exit(&tq->tq_lock); 104260742Savg kmem_free(t, sizeof (taskq_ent_t)); 105168404Spjd mutex_enter(&tq->tq_lock); 106168404Spjd } 107219089Spjd 108219089Spjd if (tq->tq_maxalloc_wait) 109219089Spjd cv_signal(&tq->tq_maxalloc_cv); 110168404Spjd} 111168404Spjd 112168404Spjdtaskqid_t 113168404Spjdtaskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t tqflags) 114168404Spjd{ 115260742Savg taskq_ent_t *t; 116168404Spjd 117168404Spjd if (taskq_now) { 118168404Spjd func(arg); 119168404Spjd return (1); 120168404Spjd } 121168404Spjd 122168404Spjd mutex_enter(&tq->tq_lock); 123168404Spjd ASSERT(tq->tq_flags & TASKQ_ACTIVE); 124168404Spjd if ((t = task_alloc(tq, tqflags)) == NULL) { 125168404Spjd mutex_exit(&tq->tq_lock); 126168404Spjd return (0); 127168404Spjd } 128219089Spjd if (tqflags & TQ_FRONT) { 129260742Savg t->tqent_next = tq->tq_task.tqent_next; 130260742Savg t->tqent_prev = &tq->tq_task; 131219089Spjd } else { 132260742Savg t->tqent_next = &tq->tq_task; 133260742Savg t->tqent_prev = tq->tq_task.tqent_prev; 134219089Spjd } 135260742Savg t->tqent_next->tqent_prev = t; 136260742Savg t->tqent_prev->tqent_next = t; 137260742Savg t->tqent_func = func; 138260742Savg t->tqent_arg = arg; 139168404Spjd cv_signal(&tq->tq_dispatch_cv); 140168404Spjd mutex_exit(&tq->tq_lock); 141168404Spjd return (1); 142168404Spjd} 143168404Spjd 144168404Spjdvoid 145260742Savgtaskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags, 146260742Savg taskq_ent_t *t) 147260742Savg{ 148260742Savg ASSERT(func != NULL); 149260742Savg ASSERT(!(tq->tq_flags & TASKQ_DYNAMIC)); 150260742Savg 151260742Savg /* 152260742Savg * Mark it as a prealloc'd task. This is important 153260742Savg * to ensure that we don't free it later. 154260742Savg */ 155260742Savg t->tqent_flags |= TQENT_FLAG_PREALLOC; 156260742Savg /* 157260742Savg * Enqueue the task to the underlying queue. 158260742Savg */ 159260742Savg mutex_enter(&tq->tq_lock); 160260742Savg 161260742Savg if (flags & TQ_FRONT) { 162260742Savg t->tqent_next = tq->tq_task.tqent_next; 163260742Savg t->tqent_prev = &tq->tq_task; 164260742Savg } else { 165260742Savg t->tqent_next = &tq->tq_task; 166260742Savg t->tqent_prev = tq->tq_task.tqent_prev; 167260742Savg } 168260742Savg t->tqent_next->tqent_prev = t; 169260742Savg t->tqent_prev->tqent_next = t; 170260742Savg t->tqent_func = func; 171260742Savg t->tqent_arg = arg; 172260742Savg cv_signal(&tq->tq_dispatch_cv); 173260742Savg mutex_exit(&tq->tq_lock); 174260742Savg} 175260742Savg 176260742Savgvoid 177168404Spjdtaskq_wait(taskq_t *tq) 178168404Spjd{ 179168404Spjd mutex_enter(&tq->tq_lock); 180260742Savg while (tq->tq_task.tqent_next != &tq->tq_task || tq->tq_active != 0) 181168404Spjd cv_wait(&tq->tq_wait_cv, &tq->tq_lock); 182168404Spjd mutex_exit(&tq->tq_lock); 183168404Spjd} 184168404Spjd 185168404Spjdstatic void * 186168404Spjdtaskq_thread(void *arg) 187168404Spjd{ 188168404Spjd taskq_t *tq = arg; 189260742Savg taskq_ent_t *t; 190260742Savg boolean_t prealloc; 191168404Spjd 192168404Spjd mutex_enter(&tq->tq_lock); 193168404Spjd while (tq->tq_flags & TASKQ_ACTIVE) { 194260742Savg if ((t = tq->tq_task.tqent_next) == &tq->tq_task) { 195168404Spjd if (--tq->tq_active == 0) 196168404Spjd cv_broadcast(&tq->tq_wait_cv); 197168404Spjd cv_wait(&tq->tq_dispatch_cv, &tq->tq_lock); 198168404Spjd tq->tq_active++; 199168404Spjd continue; 200168404Spjd } 201260742Savg t->tqent_prev->tqent_next = t->tqent_next; 202260742Savg t->tqent_next->tqent_prev = t->tqent_prev; 203260742Savg t->tqent_next = NULL; 204260742Savg t->tqent_prev = NULL; 205260742Savg prealloc = t->tqent_flags & TQENT_FLAG_PREALLOC; 206168404Spjd mutex_exit(&tq->tq_lock); 207168404Spjd 208168404Spjd rw_enter(&tq->tq_threadlock, RW_READER); 209260742Savg t->tqent_func(t->tqent_arg); 210168404Spjd rw_exit(&tq->tq_threadlock); 211168404Spjd 212168404Spjd mutex_enter(&tq->tq_lock); 213260742Savg if (!prealloc) 214260742Savg task_free(tq, t); 215168404Spjd } 216168404Spjd tq->tq_nthreads--; 217168404Spjd cv_broadcast(&tq->tq_wait_cv); 218168404Spjd mutex_exit(&tq->tq_lock); 219168404Spjd return (NULL); 220168404Spjd} 221168404Spjd 222168404Spjd/*ARGSUSED*/ 223168404Spjdtaskq_t * 224168404Spjdtaskq_create(const char *name, int nthreads, pri_t pri, 225168404Spjd int minalloc, int maxalloc, uint_t flags) 226168404Spjd{ 227168404Spjd taskq_t *tq = kmem_zalloc(sizeof (taskq_t), KM_SLEEP); 228168404Spjd int t; 229168404Spjd 230209962Smm if (flags & TASKQ_THREADS_CPU_PCT) { 231209962Smm int pct; 232209962Smm ASSERT3S(nthreads, >=, 0); 233209962Smm ASSERT3S(nthreads, <=, 100); 234209962Smm pct = MIN(nthreads, 100); 235209962Smm pct = MAX(pct, 0); 236209962Smm 237209962Smm nthreads = (sysconf(_SC_NPROCESSORS_ONLN) * pct) / 100; 238209962Smm nthreads = MAX(nthreads, 1); /* need at least 1 thread */ 239209962Smm } else { 240209962Smm ASSERT3S(nthreads, >=, 1); 241209962Smm } 242209962Smm 243168404Spjd rw_init(&tq->tq_threadlock, NULL, RW_DEFAULT, NULL); 244185029Spjd mutex_init(&tq->tq_lock, NULL, MUTEX_DEFAULT, NULL); 245185029Spjd cv_init(&tq->tq_dispatch_cv, NULL, CV_DEFAULT, NULL); 246185029Spjd cv_init(&tq->tq_wait_cv, NULL, CV_DEFAULT, NULL); 247219089Spjd cv_init(&tq->tq_maxalloc_cv, NULL, CV_DEFAULT, NULL); 248168404Spjd tq->tq_flags = flags | TASKQ_ACTIVE; 249168404Spjd tq->tq_active = nthreads; 250168404Spjd tq->tq_nthreads = nthreads; 251168404Spjd tq->tq_minalloc = minalloc; 252168404Spjd tq->tq_maxalloc = maxalloc; 253260742Savg tq->tq_task.tqent_next = &tq->tq_task; 254260742Savg tq->tq_task.tqent_prev = &tq->tq_task; 255168404Spjd tq->tq_threadlist = kmem_alloc(nthreads * sizeof (thread_t), KM_SLEEP); 256168404Spjd 257168404Spjd if (flags & TASKQ_PREPOPULATE) { 258168404Spjd mutex_enter(&tq->tq_lock); 259168404Spjd while (minalloc-- > 0) 260168404Spjd task_free(tq, task_alloc(tq, KM_SLEEP)); 261168404Spjd mutex_exit(&tq->tq_lock); 262168404Spjd } 263168404Spjd 264168404Spjd for (t = 0; t < nthreads; t++) 265168404Spjd (void) thr_create(0, 0, taskq_thread, 266168404Spjd tq, THR_BOUND, &tq->tq_threadlist[t]); 267168404Spjd 268168404Spjd return (tq); 269168404Spjd} 270168404Spjd 271168404Spjdvoid 272168404Spjdtaskq_destroy(taskq_t *tq) 273168404Spjd{ 274168404Spjd int t; 275168404Spjd int nthreads = tq->tq_nthreads; 276168404Spjd 277168404Spjd taskq_wait(tq); 278168404Spjd 279168404Spjd mutex_enter(&tq->tq_lock); 280168404Spjd 281168404Spjd tq->tq_flags &= ~TASKQ_ACTIVE; 282168404Spjd cv_broadcast(&tq->tq_dispatch_cv); 283168404Spjd 284168404Spjd while (tq->tq_nthreads != 0) 285168404Spjd cv_wait(&tq->tq_wait_cv, &tq->tq_lock); 286168404Spjd 287168404Spjd tq->tq_minalloc = 0; 288168404Spjd while (tq->tq_nalloc != 0) { 289168404Spjd ASSERT(tq->tq_freelist != NULL); 290168404Spjd task_free(tq, task_alloc(tq, KM_SLEEP)); 291168404Spjd } 292168404Spjd 293168404Spjd mutex_exit(&tq->tq_lock); 294168404Spjd 295168404Spjd for (t = 0; t < nthreads; t++) 296168404Spjd (void) thr_join(tq->tq_threadlist[t], NULL, NULL); 297168404Spjd 298168404Spjd kmem_free(tq->tq_threadlist, nthreads * sizeof (thread_t)); 299168404Spjd 300168404Spjd rw_destroy(&tq->tq_threadlock); 301185029Spjd mutex_destroy(&tq->tq_lock); 302185029Spjd cv_destroy(&tq->tq_dispatch_cv); 303185029Spjd cv_destroy(&tq->tq_wait_cv); 304219089Spjd cv_destroy(&tq->tq_maxalloc_cv); 305168404Spjd 306168404Spjd kmem_free(tq, sizeof (taskq_t)); 307168404Spjd} 308168404Spjd 309168404Spjdint 310168404Spjdtaskq_member(taskq_t *tq, void *t) 311168404Spjd{ 312168404Spjd int i; 313168404Spjd 314168404Spjd if (taskq_now) 315168404Spjd return (1); 316168404Spjd 317168404Spjd for (i = 0; i < tq->tq_nthreads; i++) 318168404Spjd if (tq->tq_threadlist[i] == (thread_t)(uintptr_t)t) 319168404Spjd return (1); 320168404Spjd 321168404Spjd return (0); 322168404Spjd} 323208047Smm 324208047Smmvoid 325208047Smmsystem_taskq_init(void) 326208047Smm{ 327208047Smm system_taskq = taskq_create("system_taskq", 64, minclsyspri, 4, 512, 328208047Smm TASKQ_DYNAMIC | TASKQ_PREPOPULATE); 329208047Smm} 330219089Spjd 331219089Spjdvoid 332219089Spjdsystem_taskq_fini(void) 333219089Spjd{ 334219089Spjd taskq_destroy(system_taskq); 335219089Spjd system_taskq = NULL; /* defensive */ 336219089Spjd} 337