1168404Spjd/* 2168404Spjd * CDDL HEADER START 3168404Spjd * 4168404Spjd * The contents of this file are subject to the terms of the 5185029Spjd * Common Development and Distribution License (the "License"). 6185029Spjd * You may not use this file except in compliance with the License. 7168404Spjd * 8168404Spjd * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 9168404Spjd * or http://www.opensolaris.org/os/licensing. 10168404Spjd * See the License for the specific language governing permissions 11168404Spjd * and limitations under the License. 12168404Spjd * 13168404Spjd * When distributing Covered Code, include this CDDL HEADER in each 14168404Spjd * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 15168404Spjd * If applicable, add the following below this CDDL HEADER, with the 16168404Spjd * fields enclosed by brackets "[]" replaced with your own identifying 17168404Spjd * information: Portions Copyright [yyyy] [name of copyright owner] 18168404Spjd * 19168404Spjd * CDDL HEADER END 20168404Spjd */ 21168404Spjd/* 22219089Spjd * Copyright 2010 Sun Microsystems, Inc. All rights reserved. 23168404Spjd * Use is subject to license terms. 24168404Spjd */ 25258630Savg/* 26258630Savg * Copyright 2011 Nexenta Systems, Inc. All rights reserved. 27274303Sdelphij * Copyright 2012 Garrett D'Amore <garrett@damore.org>. All rights reserved. 28274304Sdelphij * Copyright (c) 2014 by Delphix. All rights reserved. 29258630Savg */ 30168404Spjd 31168404Spjd#include <sys/zfs_context.h> 32168404Spjd 33168404Spjdint taskq_now; 34208047Smmtaskq_t *system_taskq; 35168404Spjd 36168404Spjd#define TASKQ_ACTIVE 0x00010000 37274304Sdelphij#define TASKQ_NAMELEN 31 38168404Spjd 39168404Spjdstruct taskq { 40274304Sdelphij char tq_name[TASKQ_NAMELEN + 1]; 41168404Spjd kmutex_t tq_lock; 42168404Spjd krwlock_t tq_threadlock; 43168404Spjd kcondvar_t tq_dispatch_cv; 44168404Spjd kcondvar_t tq_wait_cv; 45168404Spjd thread_t *tq_threadlist; 46168404Spjd int tq_flags; 47168404Spjd int tq_active; 48168404Spjd int tq_nthreads; 49168404Spjd int tq_nalloc; 50168404Spjd int tq_minalloc; 51168404Spjd int tq_maxalloc; 52219089Spjd kcondvar_t tq_maxalloc_cv; 53219089Spjd int tq_maxalloc_wait; 54258630Savg taskq_ent_t *tq_freelist; 55258630Savg taskq_ent_t tq_task; 56168404Spjd}; 57168404Spjd 58258630Savgstatic taskq_ent_t * 59168404Spjdtask_alloc(taskq_t *tq, int tqflags) 60168404Spjd{ 61258630Savg taskq_ent_t *t; 62219089Spjd int rv; 63168404Spjd 64219089Spjdagain: if ((t = tq->tq_freelist) != NULL && tq->tq_nalloc >= tq->tq_minalloc) { 65258630Savg tq->tq_freelist = t->tqent_next; 66168404Spjd } else { 67168404Spjd if (tq->tq_nalloc >= tq->tq_maxalloc) { 68219089Spjd if (!(tqflags & KM_SLEEP)) 69168404Spjd return (NULL); 70219089Spjd 71168404Spjd /* 72168404Spjd * We don't want to exceed tq_maxalloc, but we can't 73168404Spjd * wait for other tasks to complete (and thus free up 74168404Spjd * task structures) without risking deadlock with 75168404Spjd * the caller. So, we just delay for one second 76219089Spjd * to throttle the allocation rate. If we have tasks 77219089Spjd * complete before one second timeout expires then 78219089Spjd * taskq_ent_free will signal us and we will 79219089Spjd * immediately retry the allocation. 80168404Spjd */ 81219089Spjd tq->tq_maxalloc_wait++; 82349203Savg#ifdef __FreeBSD__ 83219089Spjd rv = cv_timedwait(&tq->tq_maxalloc_cv, 84349203Savg &tq->tq_lock, hz); 85349203Savg#else 86349203Savg rv = cv_timedwait(&tq->tq_maxalloc_cv, 87219089Spjd &tq->tq_lock, ddi_get_lbolt() + hz); 88349203Savg#endif 89219089Spjd tq->tq_maxalloc_wait--; 90219089Spjd if (rv > 0) 91219089Spjd goto again; /* signaled */ 92168404Spjd } 93219089Spjd mutex_exit(&tq->tq_lock); 94219089Spjd 95258630Savg t = kmem_alloc(sizeof (taskq_ent_t), tqflags & KM_SLEEP); 96219089Spjd 97168404Spjd mutex_enter(&tq->tq_lock); 98168404Spjd if (t != NULL) 99168404Spjd tq->tq_nalloc++; 100168404Spjd } 101168404Spjd return (t); 102168404Spjd} 103168404Spjd 104168404Spjdstatic void 105258630Savgtask_free(taskq_t *tq, taskq_ent_t *t) 106168404Spjd{ 107168404Spjd if (tq->tq_nalloc <= tq->tq_minalloc) { 108258630Savg t->tqent_next = tq->tq_freelist; 109168404Spjd tq->tq_freelist = t; 110168404Spjd } else { 111168404Spjd tq->tq_nalloc--; 112168404Spjd mutex_exit(&tq->tq_lock); 113258630Savg kmem_free(t, sizeof (taskq_ent_t)); 114168404Spjd mutex_enter(&tq->tq_lock); 115168404Spjd } 116219089Spjd 117219089Spjd if (tq->tq_maxalloc_wait) 118219089Spjd cv_signal(&tq->tq_maxalloc_cv); 119168404Spjd} 120168404Spjd 121168404Spjdtaskqid_t 122168404Spjdtaskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t tqflags) 123168404Spjd{ 124258630Savg taskq_ent_t *t; 125168404Spjd 126168404Spjd if (taskq_now) { 127168404Spjd func(arg); 128168404Spjd return (1); 129168404Spjd } 130168404Spjd 131168404Spjd mutex_enter(&tq->tq_lock); 132168404Spjd ASSERT(tq->tq_flags & TASKQ_ACTIVE); 133168404Spjd if ((t = task_alloc(tq, tqflags)) == NULL) { 134168404Spjd mutex_exit(&tq->tq_lock); 135168404Spjd return (0); 136168404Spjd } 137219089Spjd if (tqflags & TQ_FRONT) { 138258630Savg t->tqent_next = tq->tq_task.tqent_next; 139258630Savg t->tqent_prev = &tq->tq_task; 140219089Spjd } else { 141258630Savg t->tqent_next = &tq->tq_task; 142258630Savg t->tqent_prev = tq->tq_task.tqent_prev; 143219089Spjd } 144258630Savg t->tqent_next->tqent_prev = t; 145258630Savg t->tqent_prev->tqent_next = t; 146258630Savg t->tqent_func = func; 147258630Savg t->tqent_arg = arg; 148274303Sdelphij t->tqent_flags = 0; 149168404Spjd cv_signal(&tq->tq_dispatch_cv); 150168404Spjd mutex_exit(&tq->tq_lock); 151168404Spjd return (1); 152168404Spjd} 153168404Spjd 154168404Spjdvoid 155258630Savgtaskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags, 156258630Savg taskq_ent_t *t) 157258630Savg{ 158258630Savg ASSERT(func != NULL); 159258630Savg ASSERT(!(tq->tq_flags & TASKQ_DYNAMIC)); 160258630Savg 161258630Savg /* 162258630Savg * Mark it as a prealloc'd task. This is important 163258630Savg * to ensure that we don't free it later. 164258630Savg */ 165258630Savg t->tqent_flags |= TQENT_FLAG_PREALLOC; 166258630Savg /* 167258630Savg * Enqueue the task to the underlying queue. 168258630Savg */ 169258630Savg mutex_enter(&tq->tq_lock); 170258630Savg 171258630Savg if (flags & TQ_FRONT) { 172258630Savg t->tqent_next = tq->tq_task.tqent_next; 173258630Savg t->tqent_prev = &tq->tq_task; 174258630Savg } else { 175258630Savg t->tqent_next = &tq->tq_task; 176258630Savg t->tqent_prev = tq->tq_task.tqent_prev; 177258630Savg } 178258630Savg t->tqent_next->tqent_prev = t; 179258630Savg t->tqent_prev->tqent_next = t; 180258630Savg t->tqent_func = func; 181258630Savg t->tqent_arg = arg; 182258630Savg cv_signal(&tq->tq_dispatch_cv); 183258630Savg mutex_exit(&tq->tq_lock); 184258630Savg} 185258630Savg 186258630Savgvoid 187168404Spjdtaskq_wait(taskq_t *tq) 188168404Spjd{ 189168404Spjd mutex_enter(&tq->tq_lock); 190258630Savg while (tq->tq_task.tqent_next != &tq->tq_task || tq->tq_active != 0) 191168404Spjd cv_wait(&tq->tq_wait_cv, &tq->tq_lock); 192168404Spjd mutex_exit(&tq->tq_lock); 193168404Spjd} 194168404Spjd 195339034Ssefvoid 196339034Sseftaskq_wait_id(taskq_t *tq, taskqid_t id) 197339034Ssef{ 198339034Ssef taskq_wait(tq); 199339034Ssef} 200339034Ssef 201168404Spjdstatic void * 202168404Spjdtaskq_thread(void *arg) 203168404Spjd{ 204168404Spjd taskq_t *tq = arg; 205258630Savg taskq_ent_t *t; 206258630Savg boolean_t prealloc; 207168404Spjd 208168404Spjd mutex_enter(&tq->tq_lock); 209168404Spjd while (tq->tq_flags & TASKQ_ACTIVE) { 210258630Savg if ((t = tq->tq_task.tqent_next) == &tq->tq_task) { 211168404Spjd if (--tq->tq_active == 0) 212168404Spjd cv_broadcast(&tq->tq_wait_cv); 213168404Spjd cv_wait(&tq->tq_dispatch_cv, &tq->tq_lock); 214168404Spjd tq->tq_active++; 215168404Spjd continue; 216168404Spjd } 217258630Savg t->tqent_prev->tqent_next = t->tqent_next; 218258630Savg t->tqent_next->tqent_prev = t->tqent_prev; 219258630Savg t->tqent_next = NULL; 220258630Savg t->tqent_prev = NULL; 221258630Savg prealloc = t->tqent_flags & TQENT_FLAG_PREALLOC; 222168404Spjd mutex_exit(&tq->tq_lock); 223168404Spjd 224168404Spjd rw_enter(&tq->tq_threadlock, RW_READER); 225258630Savg t->tqent_func(t->tqent_arg); 226168404Spjd rw_exit(&tq->tq_threadlock); 227168404Spjd 228168404Spjd mutex_enter(&tq->tq_lock); 229258630Savg if (!prealloc) 230258630Savg task_free(tq, t); 231168404Spjd } 232168404Spjd tq->tq_nthreads--; 233168404Spjd cv_broadcast(&tq->tq_wait_cv); 234168404Spjd mutex_exit(&tq->tq_lock); 235168404Spjd return (NULL); 236168404Spjd} 237168404Spjd 238168404Spjd/*ARGSUSED*/ 239168404Spjdtaskq_t * 240168404Spjdtaskq_create(const char *name, int nthreads, pri_t pri, 241168404Spjd int minalloc, int maxalloc, uint_t flags) 242168404Spjd{ 243168404Spjd taskq_t *tq = kmem_zalloc(sizeof (taskq_t), KM_SLEEP); 244168404Spjd int t; 245168404Spjd 246209962Smm if (flags & TASKQ_THREADS_CPU_PCT) { 247209962Smm int pct; 248209962Smm ASSERT3S(nthreads, >=, 0); 249209962Smm ASSERT3S(nthreads, <=, 100); 250209962Smm pct = MIN(nthreads, 100); 251209962Smm pct = MAX(pct, 0); 252209962Smm 253209962Smm nthreads = (sysconf(_SC_NPROCESSORS_ONLN) * pct) / 100; 254209962Smm nthreads = MAX(nthreads, 1); /* need at least 1 thread */ 255209962Smm } else { 256209962Smm ASSERT3S(nthreads, >=, 1); 257209962Smm } 258209962Smm 259168404Spjd rw_init(&tq->tq_threadlock, NULL, RW_DEFAULT, NULL); 260185029Spjd mutex_init(&tq->tq_lock, NULL, MUTEX_DEFAULT, NULL); 261185029Spjd cv_init(&tq->tq_dispatch_cv, NULL, CV_DEFAULT, NULL); 262185029Spjd cv_init(&tq->tq_wait_cv, NULL, CV_DEFAULT, NULL); 263219089Spjd cv_init(&tq->tq_maxalloc_cv, NULL, CV_DEFAULT, NULL); 264274304Sdelphij (void) strncpy(tq->tq_name, name, TASKQ_NAMELEN + 1); 265168404Spjd tq->tq_flags = flags | TASKQ_ACTIVE; 266168404Spjd tq->tq_active = nthreads; 267168404Spjd tq->tq_nthreads = nthreads; 268168404Spjd tq->tq_minalloc = minalloc; 269168404Spjd tq->tq_maxalloc = maxalloc; 270258630Savg tq->tq_task.tqent_next = &tq->tq_task; 271258630Savg tq->tq_task.tqent_prev = &tq->tq_task; 272168404Spjd tq->tq_threadlist = kmem_alloc(nthreads * sizeof (thread_t), KM_SLEEP); 273168404Spjd 274168404Spjd if (flags & TASKQ_PREPOPULATE) { 275168404Spjd mutex_enter(&tq->tq_lock); 276168404Spjd while (minalloc-- > 0) 277168404Spjd task_free(tq, task_alloc(tq, KM_SLEEP)); 278168404Spjd mutex_exit(&tq->tq_lock); 279168404Spjd } 280168404Spjd 281168404Spjd for (t = 0; t < nthreads; t++) 282168404Spjd (void) thr_create(0, 0, taskq_thread, 283168404Spjd tq, THR_BOUND, &tq->tq_threadlist[t]); 284168404Spjd 285168404Spjd return (tq); 286168404Spjd} 287168404Spjd 288168404Spjdvoid 289168404Spjdtaskq_destroy(taskq_t *tq) 290168404Spjd{ 291168404Spjd int t; 292168404Spjd int nthreads = tq->tq_nthreads; 293168404Spjd 294168404Spjd taskq_wait(tq); 295168404Spjd 296168404Spjd mutex_enter(&tq->tq_lock); 297168404Spjd 298168404Spjd tq->tq_flags &= ~TASKQ_ACTIVE; 299168404Spjd cv_broadcast(&tq->tq_dispatch_cv); 300168404Spjd 301168404Spjd while (tq->tq_nthreads != 0) 302168404Spjd cv_wait(&tq->tq_wait_cv, &tq->tq_lock); 303168404Spjd 304168404Spjd tq->tq_minalloc = 0; 305168404Spjd while (tq->tq_nalloc != 0) { 306168404Spjd ASSERT(tq->tq_freelist != NULL); 307168404Spjd task_free(tq, task_alloc(tq, KM_SLEEP)); 308168404Spjd } 309168404Spjd 310168404Spjd mutex_exit(&tq->tq_lock); 311168404Spjd 312168404Spjd for (t = 0; t < nthreads; t++) 313168404Spjd (void) thr_join(tq->tq_threadlist[t], NULL, NULL); 314168404Spjd 315168404Spjd kmem_free(tq->tq_threadlist, nthreads * sizeof (thread_t)); 316168404Spjd 317168404Spjd rw_destroy(&tq->tq_threadlock); 318185029Spjd mutex_destroy(&tq->tq_lock); 319185029Spjd cv_destroy(&tq->tq_dispatch_cv); 320185029Spjd cv_destroy(&tq->tq_wait_cv); 321219089Spjd cv_destroy(&tq->tq_maxalloc_cv); 322168404Spjd 323168404Spjd kmem_free(tq, sizeof (taskq_t)); 324168404Spjd} 325168404Spjd 326168404Spjdint 327168404Spjdtaskq_member(taskq_t *tq, void *t) 328168404Spjd{ 329168404Spjd int i; 330168404Spjd 331168404Spjd if (taskq_now) 332168404Spjd return (1); 333168404Spjd 334168404Spjd for (i = 0; i < tq->tq_nthreads; i++) 335168404Spjd if (tq->tq_threadlist[i] == (thread_t)(uintptr_t)t) 336168404Spjd return (1); 337168404Spjd 338168404Spjd return (0); 339168404Spjd} 340208047Smm 341208047Smmvoid 342208047Smmsystem_taskq_init(void) 343208047Smm{ 344208047Smm system_taskq = taskq_create("system_taskq", 64, minclsyspri, 4, 512, 345208047Smm TASKQ_DYNAMIC | TASKQ_PREPOPULATE); 346208047Smm} 347219089Spjd 348219089Spjdvoid 349219089Spjdsystem_taskq_fini(void) 350219089Spjd{ 351219089Spjd taskq_destroy(system_taskq); 352219089Spjd system_taskq = NULL; /* defensive */ 353219089Spjd} 354