1112158Sdas/* 2112158Sdas * CDDL HEADER START 3112158Sdas * 4112158Sdas * The contents of this file are subject to the terms of the 5112158Sdas * Common Development and Distribution License (the "License"). 6112158Sdas * You may not use this file except in compliance with the License. 7112158Sdas * 8112158Sdas * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 9112158Sdas * or http://www.opensolaris.org/os/licensing. 10112158Sdas * See the License for the specific language governing permissions 11112158Sdas * and limitations under the License. 12112158Sdas * 13112158Sdas * When distributing Covered Code, include this CDDL HEADER in each 14112158Sdas * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 15112158Sdas * If applicable, add the following below this CDDL HEADER, with the 16112158Sdas * fields enclosed by brackets "[]" replaced with your own identifying 17112158Sdas * information: Portions Copyright [yyyy] [name of copyright owner] 18112158Sdas * 19112158Sdas * CDDL HEADER END 20112158Sdas */ 21112158Sdas/* 22112158Sdas * Copyright 2010 Sun Microsystems, Inc. All rights reserved. 23112158Sdas * Use is subject to license terms. 24112158Sdas */ 25112158Sdas/* 26112158Sdas * Copyright 2011 Nexenta Systems, Inc. All rights reserved. 27112158Sdas * Copyright 2012 Garrett D'Amore <garrett@damore.org>. All rights reserved. 28112158Sdas * Copyright (c) 2014 by Delphix. All rights reserved. 29165743Sdas */ 30165743Sdas 31112158Sdas#include <sys/zfs_context.h> 32112158Sdas 33112158Sdasint taskq_now; 34112158Sdastaskq_t *system_taskq; 35112158Sdas 36112158Sdas#define TASKQ_ACTIVE 0x00010000 37112158Sdas#define TASKQ_NAMELEN 31 38112158Sdas 39112158Sdasstruct taskq { 40112158Sdas char tq_name[TASKQ_NAMELEN + 1]; 41112158Sdas kmutex_t tq_lock; 42112158Sdas krwlock_t tq_threadlock; 43112158Sdas kcondvar_t tq_dispatch_cv; 44112158Sdas kcondvar_t tq_wait_cv; 45112158Sdas thread_t *tq_threadlist; 46112158Sdas int tq_flags; 47112158Sdas int tq_active; 48112158Sdas int tq_nthreads; 49112158Sdas int tq_nalloc; 50112158Sdas int tq_minalloc; 51112158Sdas int tq_maxalloc; 52112158Sdas kcondvar_t tq_maxalloc_cv; 53112158Sdas int tq_maxalloc_wait; 54112158Sdas taskq_ent_t *tq_freelist; 55112158Sdas taskq_ent_t tq_task; 56112158Sdas}; 57112158Sdas 58112158Sdasstatic taskq_ent_t * 59112158Sdastask_alloc(taskq_t *tq, int tqflags) 60112158Sdas{ 61112158Sdas taskq_ent_t *t; 62112158Sdas int rv; 63112158Sdas 64112158Sdasagain: if ((t = tq->tq_freelist) != NULL && tq->tq_nalloc >= tq->tq_minalloc) { 65112158Sdas tq->tq_freelist = t->tqent_next; 66112158Sdas } else { 67112158Sdas if (tq->tq_nalloc >= tq->tq_maxalloc) { 68112158Sdas if (!(tqflags & KM_SLEEP)) 69112158Sdas return (NULL); 70112158Sdas 71112158Sdas /* 72112158Sdas * We don't want to exceed tq_maxalloc, but we can't 73112158Sdas * wait for other tasks to complete (and thus free up 74112158Sdas * task structures) without risking deadlock with 75112158Sdas * the caller. So, we just delay for one second 76112158Sdas * to throttle the allocation rate. If we have tasks 77112158Sdas * complete before one second timeout expires then 78112158Sdas * taskq_ent_free will signal us and we will 79112158Sdas * immediately retry the allocation. 80112158Sdas */ 81112158Sdas tq->tq_maxalloc_wait++; 82112158Sdas rv = cv_timedwait(&tq->tq_maxalloc_cv, 83112158Sdas &tq->tq_lock, ddi_get_lbolt() + hz); 84112158Sdas tq->tq_maxalloc_wait--; 85112158Sdas if (rv > 0) 86112158Sdas goto again; /* signaled */ 87112158Sdas } 88112158Sdas mutex_exit(&tq->tq_lock); 89112158Sdas 90112158Sdas t = kmem_alloc(sizeof (taskq_ent_t), tqflags & KM_SLEEP); 91112158Sdas 92112158Sdas mutex_enter(&tq->tq_lock); 93112158Sdas if (t != NULL) 94112158Sdas tq->tq_nalloc++; 95112158Sdas } 96112158Sdas return (t); 97112158Sdas} 98112158Sdas 99112158Sdasstatic void 100112158Sdastask_free(taskq_t *tq, taskq_ent_t *t) 101112158Sdas{ 102112158Sdas if (tq->tq_nalloc <= tq->tq_minalloc) { 103112158Sdas t->tqent_next = tq->tq_freelist; 104112158Sdas tq->tq_freelist = t; 105112158Sdas } else { 106112158Sdas tq->tq_nalloc--; 107112158Sdas mutex_exit(&tq->tq_lock); 108112158Sdas kmem_free(t, sizeof (taskq_ent_t)); 109112158Sdas mutex_enter(&tq->tq_lock); 110112158Sdas } 111112158Sdas 112112158Sdas if (tq->tq_maxalloc_wait) 113112158Sdas cv_signal(&tq->tq_maxalloc_cv); 114112158Sdas} 115112158Sdas 116112158Sdastaskqid_t 117112158Sdastaskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t tqflags) 118112158Sdas{ 119112158Sdas taskq_ent_t *t; 120112158Sdas 121112158Sdas if (taskq_now) { 122112158Sdas func(arg); 123112158Sdas return (1); 124112158Sdas } 125112158Sdas 126112158Sdas mutex_enter(&tq->tq_lock); 127112158Sdas ASSERT(tq->tq_flags & TASKQ_ACTIVE); 128112158Sdas if ((t = task_alloc(tq, tqflags)) == NULL) { 129112158Sdas mutex_exit(&tq->tq_lock); 130112158Sdas return (0); 131112158Sdas } 132112158Sdas if (tqflags & TQ_FRONT) { 133112158Sdas t->tqent_next = tq->tq_task.tqent_next; 134112158Sdas t->tqent_prev = &tq->tq_task; 135112158Sdas } else { 136112158Sdas t->tqent_next = &tq->tq_task; 137112158Sdas t->tqent_prev = tq->tq_task.tqent_prev; 138112158Sdas } 139112158Sdas t->tqent_next->tqent_prev = t; 140165743Sdas t->tqent_prev->tqent_next = t; 141112158Sdas t->tqent_func = func; 142112158Sdas t->tqent_arg = arg; 143112158Sdas t->tqent_flags = 0; 144112158Sdas cv_signal(&tq->tq_dispatch_cv); 145112158Sdas mutex_exit(&tq->tq_lock); 146112158Sdas return (1); 147112158Sdas} 148112158Sdas 149112158Sdasvoid 150112158Sdastaskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags, 151112158Sdas taskq_ent_t *t) 152112158Sdas{ 153112158Sdas ASSERT(func != NULL); 154112158Sdas ASSERT(!(tq->tq_flags & TASKQ_DYNAMIC)); 155112158Sdas 156112158Sdas /* 157112158Sdas * Mark it as a prealloc'd task. This is important 158112158Sdas * to ensure that we don't free it later. 159112158Sdas */ 160112158Sdas t->tqent_flags |= TQENT_FLAG_PREALLOC; 161112158Sdas /* 162112158Sdas * Enqueue the task to the underlying queue. 163112158Sdas */ 164112158Sdas mutex_enter(&tq->tq_lock); 165112158Sdas 166112158Sdas if (flags & TQ_FRONT) { 167112158Sdas t->tqent_next = tq->tq_task.tqent_next; 168112158Sdas t->tqent_prev = &tq->tq_task; 169112158Sdas } else { 170112158Sdas t->tqent_next = &tq->tq_task; 171112158Sdas t->tqent_prev = tq->tq_task.tqent_prev; 172112158Sdas } 173112158Sdas t->tqent_next->tqent_prev = t; 174112158Sdas t->tqent_prev->tqent_next = t; 175112158Sdas t->tqent_func = func; 176112158Sdas t->tqent_arg = arg; 177112158Sdas cv_signal(&tq->tq_dispatch_cv); 178112158Sdas mutex_exit(&tq->tq_lock); 179112158Sdas} 180112158Sdas 181112158Sdasvoid 182112158Sdastaskq_wait(taskq_t *tq) 183112158Sdas{ 184112158Sdas mutex_enter(&tq->tq_lock); 185112158Sdas while (tq->tq_task.tqent_next != &tq->tq_task || tq->tq_active != 0) 186112158Sdas cv_wait(&tq->tq_wait_cv, &tq->tq_lock); 187112158Sdas mutex_exit(&tq->tq_lock); 188112158Sdas} 189112158Sdas 190112158Sdasstatic void * 191112158Sdastaskq_thread(void *arg) 192112158Sdas{ 193112158Sdas taskq_t *tq = arg; 194112158Sdas taskq_ent_t *t; 195112158Sdas boolean_t prealloc; 196112158Sdas 197112158Sdas mutex_enter(&tq->tq_lock); 198112158Sdas while (tq->tq_flags & TASKQ_ACTIVE) { 199112158Sdas if ((t = tq->tq_task.tqent_next) == &tq->tq_task) { 200112158Sdas if (--tq->tq_active == 0) 201112158Sdas cv_broadcast(&tq->tq_wait_cv); 202112158Sdas cv_wait(&tq->tq_dispatch_cv, &tq->tq_lock); 203112158Sdas tq->tq_active++; 204112158Sdas continue; 205165743Sdas } 206112158Sdas t->tqent_prev->tqent_next = t->tqent_next; 207112158Sdas t->tqent_next->tqent_prev = t->tqent_prev; 208112158Sdas t->tqent_next = NULL; 209112158Sdas t->tqent_prev = NULL; 210112158Sdas prealloc = t->tqent_flags & TQENT_FLAG_PREALLOC; 211112158Sdas mutex_exit(&tq->tq_lock); 212112158Sdas 213112158Sdas rw_enter(&tq->tq_threadlock, RW_READER); 214112158Sdas t->tqent_func(t->tqent_arg); 215112158Sdas rw_exit(&tq->tq_threadlock); 216112158Sdas 217112158Sdas mutex_enter(&tq->tq_lock); 218112158Sdas if (!prealloc) 219112158Sdas task_free(tq, t); 220112158Sdas } 221112158Sdas tq->tq_nthreads--; 222112158Sdas cv_broadcast(&tq->tq_wait_cv); 223112158Sdas mutex_exit(&tq->tq_lock); 224112158Sdas return (NULL); 225112158Sdas} 226112158Sdas 227112158Sdas/*ARGSUSED*/ 228112158Sdastaskq_t * 229112158Sdastaskq_create(const char *name, int nthreads, pri_t pri, 230112158Sdas int minalloc, int maxalloc, uint_t flags) 231112158Sdas{ 232112158Sdas taskq_t *tq = kmem_zalloc(sizeof (taskq_t), KM_SLEEP); 233112158Sdas int t; 234112158Sdas 235112158Sdas if (flags & TASKQ_THREADS_CPU_PCT) { 236112158Sdas int pct; 237112158Sdas ASSERT3S(nthreads, >=, 0); 238112158Sdas ASSERT3S(nthreads, <=, 100); 239112158Sdas pct = MIN(nthreads, 100); 240112158Sdas pct = MAX(pct, 0); 241112158Sdas 242112158Sdas nthreads = (sysconf(_SC_NPROCESSORS_ONLN) * pct) / 100; 243112158Sdas nthreads = MAX(nthreads, 1); /* need at least 1 thread */ 244112158Sdas } else { 245112158Sdas ASSERT3S(nthreads, >=, 1); 246112158Sdas } 247112158Sdas 248112158Sdas rw_init(&tq->tq_threadlock, NULL, RW_DEFAULT, NULL); 249112158Sdas mutex_init(&tq->tq_lock, NULL, MUTEX_DEFAULT, NULL); 250112158Sdas cv_init(&tq->tq_dispatch_cv, NULL, CV_DEFAULT, NULL); 251112158Sdas cv_init(&tq->tq_wait_cv, NULL, CV_DEFAULT, NULL); 252112158Sdas cv_init(&tq->tq_maxalloc_cv, NULL, CV_DEFAULT, NULL); 253112158Sdas (void) strncpy(tq->tq_name, name, TASKQ_NAMELEN + 1); 254112158Sdas tq->tq_flags = flags | TASKQ_ACTIVE; 255112158Sdas tq->tq_active = nthreads; 256112158Sdas tq->tq_nthreads = nthreads; 257112158Sdas tq->tq_minalloc = minalloc; 258112158Sdas tq->tq_maxalloc = maxalloc; 259112158Sdas tq->tq_task.tqent_next = &tq->tq_task; 260112158Sdas tq->tq_task.tqent_prev = &tq->tq_task; 261112158Sdas tq->tq_threadlist = kmem_alloc(nthreads * sizeof (thread_t), KM_SLEEP); 262112158Sdas 263112158Sdas if (flags & TASKQ_PREPOPULATE) { 264112158Sdas mutex_enter(&tq->tq_lock); 265112158Sdas while (minalloc-- > 0) 266112158Sdas task_free(tq, task_alloc(tq, KM_SLEEP)); 267112158Sdas mutex_exit(&tq->tq_lock); 268112158Sdas } 269112158Sdas 270112158Sdas for (t = 0; t < nthreads; t++) 271112158Sdas (void) thr_create(0, 0, taskq_thread, 272112158Sdas tq, THR_BOUND, &tq->tq_threadlist[t]); 273112158Sdas 274112158Sdas return (tq); 275112158Sdas} 276112158Sdas 277112158Sdasvoid 278112158Sdastaskq_destroy(taskq_t *tq) 279112158Sdas{ 280112158Sdas int t; 281112158Sdas int nthreads = tq->tq_nthreads; 282112158Sdas 283112158Sdas taskq_wait(tq); 284112158Sdas 285112158Sdas mutex_enter(&tq->tq_lock); 286112158Sdas 287112158Sdas tq->tq_flags &= ~TASKQ_ACTIVE; 288112158Sdas cv_broadcast(&tq->tq_dispatch_cv); 289112158Sdas 290112158Sdas while (tq->tq_nthreads != 0) 291112158Sdas cv_wait(&tq->tq_wait_cv, &tq->tq_lock); 292112158Sdas 293112158Sdas tq->tq_minalloc = 0; 294112158Sdas while (tq->tq_nalloc != 0) { 295112158Sdas ASSERT(tq->tq_freelist != NULL); 296112158Sdas task_free(tq, task_alloc(tq, KM_SLEEP)); 297112158Sdas } 298112158Sdas 299112158Sdas mutex_exit(&tq->tq_lock); 300112158Sdas 301112158Sdas for (t = 0; t < nthreads; t++) 302112158Sdas (void) thr_join(tq->tq_threadlist[t], NULL, NULL); 303112158Sdas 304112158Sdas kmem_free(tq->tq_threadlist, nthreads * sizeof (thread_t)); 305112158Sdas 306112158Sdas rw_destroy(&tq->tq_threadlock); 307112158Sdas mutex_destroy(&tq->tq_lock); 308112158Sdas cv_destroy(&tq->tq_dispatch_cv); 309112158Sdas cv_destroy(&tq->tq_wait_cv); 310112158Sdas cv_destroy(&tq->tq_maxalloc_cv); 311112158Sdas 312112158Sdas kmem_free(tq, sizeof (taskq_t)); 313112158Sdas} 314112158Sdas 315112158Sdasint 316112158Sdastaskq_member(taskq_t *tq, void *t) 317112158Sdas{ 318112158Sdas int i; 319112158Sdas 320112158Sdas if (taskq_now) 321112158Sdas return (1); 322112158Sdas 323112158Sdas for (i = 0; i < tq->tq_nthreads; i++) 324112158Sdas if (tq->tq_threadlist[i] == (thread_t)(uintptr_t)t) 325112158Sdas return (1); 326112158Sdas 327112158Sdas return (0); 328112158Sdas} 329112158Sdas 330112158Sdasvoid 331112158Sdassystem_taskq_init(void) 332112158Sdas{ 333112158Sdas system_taskq = taskq_create("system_taskq", 64, minclsyspri, 4, 512, 334112158Sdas TASKQ_DYNAMIC | TASKQ_PREPOPULATE); 335112158Sdas} 336112158Sdas 337112158Sdasvoid 338112158Sdassystem_taskq_fini(void) 339112158Sdas{ 340112158Sdas taskq_destroy(system_taskq); 341112158Sdas system_taskq = NULL; /* defensive */ 342112158Sdas} 343112158Sdas