Lines Matching defs:tq

151 * taskqid_t taskq_dispatch(tq, func, arg, flags):
187 * void taskq_wait(tq):
194 * void taskq_suspend(tq)
200 * int taskq_suspended(tq)
205 * void taskq_resume(tq)
209 * int taskq_member(tq, thread)
211 * Returns 1 if 'thread' belongs to taskq 'tq' and 0 otherwise. The
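The interface comments above (source lines 151-211) cover dispatch, wait, suspend/resume, and membership. A minimal lifecycle sketch in kernel-module context, assuming the standard taskq_create()/taskq_destroy() pair (taskq_create itself is not among the matched lines); my_work, my_softc, and the "my_tq" name are hypothetical:

    #include <sys/taskq.h>
    #include <sys/disp.h>                   /* minclsyspri */

    struct my_softc;                        /* hypothetical driver state */

    static void
    my_work(void *arg)                      /* runs later, in a taskq thread */
    {
            struct my_softc *sc = arg;
            /* ... deferred work on sc ... */
    }

    static void
    my_lifecycle_sketch(struct my_softc *sc)
    {
            /* One thread at minclsyspri, prealloc between 1 and 8 entries. */
            taskq_t *tq = taskq_create("my_tq", 1, minclsyspri, 1, 8, 0);

            /* TQ_SLEEP: wait for an entry instead of failing. */
            (void) taskq_dispatch(tq, my_work, sc, TQ_SLEEP);

            taskq_wait(tq);                 /* block until queued and active tasks finish */
            taskq_destroy(tq);
    }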
676 #define TASKQ_D_RANDOM_DISPATCH_FAILURE(tq, flag) \
683 #define TASKQ_S_RANDOM_DISPATCH_FAILURE(tq, flag) \
686 (!(tq->tq_flags & TASKQ_PREPOPULATE) || \
687 (tq->tq_nalloc > tq->tq_minalloc)) && \
689 mutex_exit(&tq->tq_lock); \
693 #define TASKQ_S_RANDOM_DISPATCH_FAILURE(tq, flag)
694 #define TASKQ_D_RANDOM_DISPATCH_FAILURE(tq, flag)
722 #define TQ_DO_ENQUEUE(tq, tqe, func, arg, front) { \
723 ASSERT(MUTEX_HELD(&tq->tq_lock)); \
726 TQ_PREPEND(tq->tq_task, tqe); \
728 TQ_APPEND(tq->tq_task, tqe); \
732 tq->tq_tasks++; \
733 if (tq->tq_tasks - tq->tq_executed > tq->tq_maxtasks) \
734 tq->tq_maxtasks = tq->tq_tasks - tq->tq_executed; \
735 cv_signal(&tq->tq_dispatch_cv); \
736 DTRACE_PROBE2(taskq__enqueue, taskq_t *, tq, taskq_ent_t *, tqe); \
739 #define TQ_ENQUEUE(tq, tqe, func, arg) \
740 TQ_DO_ENQUEUE(tq, tqe, func, arg, 0)
742 #define TQ_ENQUEUE_FRONT(tq, tqe, func, arg) \
743 TQ_DO_ENQUEUE(tq, tqe, func, arg, 1)
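TQ_DO_ENQUEUE adds an entry to tq->tq_task with TQ_PREPEND or TQ_APPEND, and the cache constructor further down (source lines 769-770) points tq_task's next/prev at itself, so tq_task is the sentinel of a circular doubly linked list; taskq_wait's emptiness test (source line 1286) compares tqent_next against &tq->tq_task. The TQ_APPEND/TQ_PREPEND bodies are not among the matched lines, so the following is a stand-alone userland sketch of the conventional sentinel-list operations they are assumed to perform:

    /* Sentinel-based circular doubly linked list, analogous to tq_task. */
    typedef struct node {
            struct node *next, *prev;
    } node_t;

    static void
    list_init(node_t *sentinel)                 /* cf. tq_task pointing at itself */
    {
            sentinel->next = sentinel->prev = sentinel;
    }

    static void
    list_append(node_t *sentinel, node_t *n)    /* cf. TQ_APPEND: enqueue at the tail */
    {
            n->prev = sentinel->prev;
            n->next = sentinel;
            sentinel->prev->next = n;
            sentinel->prev = n;
    }

    static void
    list_prepend(node_t *sentinel, node_t *n)   /* cf. TQ_PREPEND: enqueue at the head */
    {
            n->next = sentinel->next;
            n->prev = sentinel;
            sentinel->next->prev = n;
            sentinel->next = n;
    }

    static int
    list_empty(const node_t *sentinel)          /* cf. tqent_next == &tq->tq_task */
    {
            return (sentinel->next == sentinel);
    }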
758 taskq_t *tq = buf;
760 bzero(tq, sizeof (taskq_t));
762 mutex_init(&tq->tq_lock, NULL, MUTEX_DEFAULT, NULL);
763 rw_init(&tq->tq_threadlock, NULL, RW_DEFAULT, NULL);
764 cv_init(&tq->tq_dispatch_cv, NULL, CV_DEFAULT, NULL);
765 cv_init(&tq->tq_exit_cv, NULL, CV_DEFAULT, NULL);
766 cv_init(&tq->tq_wait_cv, NULL, CV_DEFAULT, NULL);
767 cv_init(&tq->tq_maxalloc_cv, NULL, CV_DEFAULT, NULL);
769 tq->tq_task.tqent_next = &tq->tq_task;
770 tq->tq_task.tqent_prev = &tq->tq_task;
779 taskq_t *tq = buf;
781 ASSERT(tq->tq_nthreads == 0);
782 ASSERT(tq->tq_buckets == NULL);
783 ASSERT(tq->tq_tcreates == 0);
784 ASSERT(tq->tq_tdeaths == 0);
786 mutex_destroy(&tq->tq_lock);
787 rw_destroy(&tq->tq_threadlock);
788 cv_destroy(&tq->tq_dispatch_cv);
789 cv_destroy(&tq->tq_exit_cv);
790 cv_destroy(&tq->tq_wait_cv);
791 cv_destroy(&tq->tq_maxalloc_cv);
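The constructor and destructor bodies above zero a taskq_t and initialize or tear down its mutex, rwlock, and condition variables; taskq_create_common() later allocates from taskq_cache (source line 1796) and taskq_destroy() frees back to it (source line 2069). The kmem_cache_create() call that wires these up is not among the matched lines, so the sketch below is an assumption about how such a constructor/destructor pair is typically registered; the names taskq_cachector and taskq_cachedtor are placeholders, and the alignment/reclaim/arena arguments are guesses:

    #include <sys/kmem.h>
    #include <sys/taskq_impl.h>             /* full taskq_t definition */

    static kmem_cache_t *taskq_cache;

    /* Placeholder prototypes matching the constructor/destructor bodies above. */
    extern int taskq_cachector(void *buf, void *cdrarg, int kmflags);
    extern void taskq_cachedtor(void *buf, void *cdrarg);

    static void
    taskq_cache_init_sketch(void)
    {
            taskq_cache = kmem_cache_create("taskq_cache", sizeof (taskq_t),
                0, taskq_cachector, taskq_cachedtor, NULL, NULL, NULL, 0);
    }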
833 taskq_update_nthreads(taskq_t *tq, uint_t ncpus)
835 uint_t newtarget = TASKQ_THREADS_PCT(ncpus, tq->tq_threads_ncpus_pct);
838 ASSERT(MUTEX_HELD(&tq->tq_lock));
841 ASSERT3U(tq->tq_nthreads_target, !=, 0);
844 ASSERT3U(newtarget, <=, tq->tq_nthreads_max);
845 if (newtarget != tq->tq_nthreads_target) {
846 tq->tq_flags |= TASKQ_CHANGING;
847 tq->tq_nthreads_target = newtarget;
848 cv_broadcast(&tq->tq_dispatch_cv);
849 cv_broadcast(&tq->tq_exit_cv);
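taskq_update_nthreads() recomputes the thread target from the CPU count and tq_threads_ncpus_pct, then flags the queue TASKQ_CHANGING and wakes both condition variables so threads can be created or retired. The TASKQ_THREADS_PCT() body is not among the matched lines; assuming the conventional truncating percentage with a floor of one thread, the computation amounts to:

    #include <sys/types.h>

    /* Assumed rounding: truncating percentage of ncpus, never below one thread. */
    static uint_t
    threads_pct_sketch(uint_t ncpus, uint_t pct)
    {
            uint_t n = (ncpus * pct) / 100;

            return (n >= 1 ? n : 1);        /* e.g. 8 CPUs at 50% -> 4; 1 CPU at 20% -> 1 */
    }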
855 taskq_cpupct_install(taskq_t *tq, cpupart_t *cpup)
857 ASSERT(tq->tq_flags & TASKQ_THREADS_CPU_PCT);
860 mutex_enter(&tq->tq_lock);
861 tq->tq_cpupart = cpup->cp_id;
862 taskq_update_nthreads(tq, cpup->cp_ncpus);
863 mutex_exit(&tq->tq_lock);
865 list_insert_tail(&taskq_cpupct_list, tq);
870 taskq_cpupct_remove(taskq_t *tq)
872 ASSERT(tq->tq_flags & TASKQ_THREADS_CPU_PCT);
875 list_remove(&taskq_cpupct_list, tq);
883 taskq_t *tq;
906 for (tq = list_head(&taskq_cpupct_list); tq != NULL;
907 tq = list_next(&taskq_cpupct_list, tq)) {
909 mutex_enter(&tq->tq_lock);
914 if (tq->tq_cpupart == cp->cp_id) {
915 taskq_update_nthreads(tq, ncpus);
917 mutex_exit(&tq->tq_lock);
952 * Assumes: tq->tq_lock is held.
955 taskq_ent_alloc(taskq_t *tq, int flags)
962 ASSERT(MUTEX_HELD(&tq->tq_lock));
968 again: if ((tqe = tq->tq_freelist) != NULL &&
969 ((flags & TQ_NOALLOC) || tq->tq_nalloc >= tq->tq_minalloc)) {
970 tq->tq_freelist = tqe->tqent_next;
975 if (tq->tq_nalloc >= tq->tq_maxalloc) {
990 while (tq->tq_freelist == NULL) {
991 tq->tq_maxalloc_wait++;
992 wait_rv = cv_timedwait(&tq->tq_maxalloc_cv,
993 &tq->tq_lock, wait_time);
994 tq->tq_maxalloc_wait--;
998 if (tq->tq_freelist)
1002 mutex_exit(&tq->tq_lock);
1006 mutex_enter(&tq->tq_lock);
1008 tq->tq_nalloc++;
1019 * Assumes: tq->tq_lock is held.
1022 taskq_ent_free(taskq_t *tq, taskq_ent_t *tqe)
1024 ASSERT(MUTEX_HELD(&tq->tq_lock));
1026 if (tq->tq_nalloc <= tq->tq_minalloc) {
1027 tqe->tqent_next = tq->tq_freelist;
1028 tq->tq_freelist = tqe;
1030 tq->tq_nalloc--;
1031 mutex_exit(&tq->tq_lock);
1033 mutex_enter(&tq->tq_lock);
1036 if (tq->tq_maxalloc_wait)
1037 cv_signal(&tq->tq_maxalloc_cv);
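taskq_ent_alloc() and taskq_ent_free() implement a small entry cache: up to tq_minalloc entries are kept on tq_freelist, a dispatcher that would exceed tq_maxalloc sleeps on tq_maxalloc_cv (with a timed wait, and tq_lock dropped and retaken around the real allocation), and each free signals one waiter. A simplified userland analogue of that bounded-cache-with-backpressure pattern, using pthreads and an untimed wait; it is not a transcription of the original logic:

    #include <pthread.h>
    #include <stdlib.h>

    typedef struct ent {
            struct ent *next;
    } ent_t;

    static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t  maxalloc_cv = PTHREAD_COND_INITIALIZER;
    static ent_t *freelist;
    static int nalloc, minalloc = 8, maxalloc = 128, maxalloc_wait;

    /* Called with 'lock' held, as taskq_ent_alloc() assumes tq_lock. */
    static ent_t *
    ent_alloc(void)
    {
            ent_t *e;

            while ((e = freelist) == NULL) {
                    if (nalloc < maxalloc) {        /* under the cap: allocate fresh */
                            if ((e = malloc(sizeof (*e))) != NULL)
                                    nalloc++;
                            return (e);
                    }
                    maxalloc_wait++;                /* at the cap: wait for a free */
                    pthread_cond_wait(&maxalloc_cv, &lock);
                    maxalloc_wait--;
            }
            freelist = e->next;
            return (e);
    }

    /* Called with 'lock' held, as taskq_ent_free() assumes tq_lock. */
    static void
    ent_free(ent_t *e)
    {
            if (nalloc <= minalloc) {               /* keep a small cache */
                    e->next = freelist;
                    freelist = e;
            } else {
                    nalloc--;
                    free(e);
            }
            if (maxalloc_wait)
                    pthread_cond_signal(&maxalloc_cv);
    }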
1045 * Assumes: tq->tq_lock is held.
1048 taskq_ent_exists(taskq_t *tq, task_func_t func, void *arg)
1052 ASSERT(MUTEX_HELD(&tq->tq_lock));
1054 for (tqe = tq->tq_task.tqent_next; tqe != &tq->tq_task;
1123 taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags)
1130 ASSERT(tq != NULL);
1133 if (!(tq->tq_flags & TASKQ_DYNAMIC)) {
1141 mutex_enter(&tq->tq_lock);
1143 TASKQ_S_RANDOM_DISPATCH_FAILURE(tq, flags);
1145 if ((tqe = taskq_ent_alloc(tq, flags)) == NULL) {
1146 mutex_exit(&tq->tq_lock);
1150 TQ_ENQUEUE_FRONT(tq, tqe, func, arg);
1152 TQ_ENQUEUE(tq, tqe, func, arg);
1154 mutex_exit(&tq->tq_lock);
1162 TASKQ_D_RANDOM_DISPATCH_FAILURE(tq, flags);
1164 bsize = tq->tq_nbuckets;
1171 if ((tqe = taskq_bucket_dispatch(tq->tq_buckets, func, arg))
1174 bucket = tq->tq_buckets;
1187 b = &tq->tq_buckets[h & (bsize - 1)];
1188 ASSERT(b->tqbucket_taskq == tq); /* Sanity check */
1209 b = &tq->tq_buckets[++h & (bsize - 1)];
1210 ASSERT(b->tqbucket_taskq == tq); /* Sanity check */
1251 mutex_enter(&tq->tq_lock);
1252 if (!taskq_ent_exists(tq, taskq_bucket_extend, bucket)) {
1253 if ((tqe1 = taskq_ent_alloc(tq, TQ_NOSLEEP)) != NULL) {
1254 TQ_ENQUEUE_FRONT(tq, tqe1, taskq_bucket_extend, bucket);
1265 if ((tqe = taskq_ent_alloc(tq, flags)) != NULL) {
1266 TQ_ENQUEUE(tq, tqe, func, arg);
1271 mutex_exit(&tq->tq_lock);
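In the non-dynamic path above, taskq_dispatch() returns failure when taskq_ent_alloc() comes back NULL (source line 1145), which in practice only happens for TQ_NOSLEEP/TQ_NOALLOC callers. A caller-side sketch, assuming the usual convention that a failed dispatch returns a zero taskqid; my_work is the hypothetical worker from the first sketch and my_defer() is a hypothetical fallback:

    #include <sys/taskq.h>

    extern void my_work(void *);            /* hypothetical worker */
    extern void my_defer(void *);           /* hypothetical fallback path */

    static void
    my_intr_dispatch_sketch(taskq_t *tq, void *arg)
    {
            /* Interrupt context: must not block, so TQ_NOSLEEP and handle failure. */
            if (taskq_dispatch(tq, my_work, arg, TQ_NOSLEEP) == 0)
                    my_defer(arg);          /* e.g. stash arg and retry from a timeout */
    }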
1281 taskq_wait(taskq_t *tq)
1283 ASSERT(tq != curthread->t_taskq);
1285 mutex_enter(&tq->tq_lock);
1286 while (tq->tq_task.tqent_next != &tq->tq_task || tq->tq_active != 0)
1287 cv_wait(&tq->tq_wait_cv, &tq->tq_lock);
1288 mutex_exit(&tq->tq_lock);
1290 if (tq->tq_flags & TASKQ_DYNAMIC) {
1291 taskq_bucket_t *b = tq->tq_buckets;
1293 for (; (b != NULL) && (bid < tq->tq_nbuckets); b++, bid++) {
1310 taskq_suspend(taskq_t *tq)
1312 rw_enter(&tq->tq_threadlock, RW_WRITER);
1314 if (tq->tq_flags & TASKQ_DYNAMIC) {
1315 taskq_bucket_t *b = tq->tq_buckets;
1317 for (; (b != NULL) && (bid < tq->tq_nbuckets); b++, bid++) {
1326 mutex_enter(&tq->tq_lock);
1327 ASSERT(!(tq->tq_flags & TASKQ_SUSPENDED));
1328 tq->tq_flags |= TASKQ_SUSPENDED;
1329 mutex_exit(&tq->tq_lock);
1333 * returns: 1 if tq is suspended, 0 otherwise.
1336 taskq_suspended(taskq_t *tq)
1338 return ((tq->tq_flags & TASKQ_SUSPENDED) != 0);
1345 taskq_resume(taskq_t *tq)
1347 ASSERT(RW_WRITE_HELD(&tq->tq_threadlock));
1349 if (tq->tq_flags & TASKQ_DYNAMIC) {
1350 taskq_bucket_t *b = tq->tq_buckets;
1352 for (; (b != NULL) && (bid < tq->tq_nbuckets); b++, bid++) {
1358 mutex_enter(&tq->tq_lock);
1359 ASSERT(tq->tq_flags & TASKQ_SUSPENDED);
1360 tq->tq_flags &= ~TASKQ_SUSPENDED;
1361 mutex_exit(&tq->tq_lock);
1363 rw_exit(&tq->tq_threadlock);
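taskq_suspend() takes tq_threadlock as writer and sets TASKQ_SUSPENDED; taskq_resume() asserts the writer lock is still held before clearing the flag and dropping it, so the two calls must be paired on the same thread, and dispatches made in between only queue up. A minimal sketch; quiesce_hardware() is hypothetical:

    #include <sys/taskq.h>
    #include <sys/debug.h>

    extern void quiesce_hardware(void);     /* hypothetical */

    static void
    my_suspend_sketch(taskq_t *tq)
    {
            taskq_suspend(tq);              /* task execution stops; dispatches still queue */
            ASSERT(taskq_suspended(tq));

            quiesce_hardware();             /* work done while the queue is held */

            taskq_resume(tq);               /* releases tq_threadlock; execution continues */
    }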
1367 taskq_member(taskq_t *tq, kthread_t *thread)
1369 return (thread->t_taskq == tq);
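taskq_member() is just a comparison against the thread's t_taskq back-pointer, which taskq_thread() sets at source line 1464 (and taskq_bucket_extend() at 2148), so it is cheap enough for asserting that a function only runs from a given queue's threads. A sketch; my_tq is a hypothetical module-global queue:

    #include <sys/taskq.h>
    #include <sys/thread.h>
    #include <sys/debug.h>

    extern taskq_t *my_tq;                  /* hypothetical */

    static void
    my_taskq_only_work(void *arg)
    {
            ASSERT(taskq_member(my_tq, curthread));
            /* ... work that relies on taskq-thread context ... */
    }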
1381 taskq_thread_create(taskq_t *tq)
1384 const boolean_t first = (tq->tq_nthreads == 0);
1386 ASSERT(MUTEX_HELD(&tq->tq_lock));
1387 ASSERT(tq->tq_flags & TASKQ_CHANGING);
1388 ASSERT(tq->tq_nthreads < tq->tq_nthreads_target);
1389 ASSERT(!(tq->tq_flags & TASKQ_THREAD_CREATED));
1392 tq->tq_flags |= TASKQ_THREAD_CREATED;
1393 tq->tq_active++;
1394 mutex_exit(&tq->tq_lock);
1396 if (tq->tq_proc != &p0) {
1397 t = lwp_kernel_create(tq->tq_proc, taskq_thread, tq, TS_RUN,
1398 tq->tq_pri);
1400 t = thread_create(NULL, 0, taskq_thread, tq, 0, &p0, TS_RUN,
1401 tq->tq_pri);
1405 mutex_enter(&tq->tq_lock);
1410 * We know the thread cannot go away, since tq cannot be
1414 if (tq->tq_flags & TASKQ_THREADS_CPU_PCT) {
1415 taskq_cpupct_install(tq, t->t_cpupart);
1417 mutex_enter(&tq->tq_lock);
1420 while (tq->tq_nthreads != tq->tq_nthreads_target &&
1421 tq->tq_nthreads < TASKQ_CREATE_ACTIVE_THREADS) {
1422 cv_wait(&tq->tq_wait_cv, &tq->tq_lock);
1431 taskq_thread_wait(taskq_t *tq, kmutex_t *mx, kcondvar_t *cv,
1436 if (!(tq->tq_flags & TASKQ_CPR_SAFE)) {
1444 if (!(tq->tq_flags & TASKQ_CPR_SAFE)) {
1459 taskq_t *tq = arg;
1464 curthread->t_taskq = tq; /* mark ourselves for taskq_member() */
1466 if (curproc != &p0 && (tq->tq_flags & TASKQ_DUTY_CYCLE)) {
1467 sysdc_thread_enter(curthread, tq->tq_DC,
1468 (tq->tq_flags & TASKQ_DC_BATCH) ? SYSDC_THREAD_BATCH : 0);
1471 if (tq->tq_flags & TASKQ_CPR_SAFE) {
1472 CALLB_CPR_INIT_SAFE(curthread, tq->tq_name);
1474 CALLB_CPR_INIT(&cprinfo, &tq->tq_lock, callb_generic_cpr,
1475 tq->tq_name);
1477 mutex_enter(&tq->tq_lock);
1478 thread_id = ++tq->tq_nthreads;
1479 ASSERT(tq->tq_flags & TASKQ_THREAD_CREATED);
1480 ASSERT(tq->tq_flags & TASKQ_CHANGING);
1481 tq->tq_flags &= ~TASKQ_THREAD_CREATED;
1483 VERIFY3S(thread_id, <=, tq->tq_nthreads_max);
1485 if (tq->tq_nthreads_max == 1)
1486 tq->tq_thread = curthread;
1488 tq->tq_threadlist[thread_id - 1] = curthread;
1491 if (tq->tq_nthreads == TASKQ_CREATE_ACTIVE_THREADS)
1492 cv_broadcast(&tq->tq_wait_cv);
1495 if (tq->tq_flags & TASKQ_CHANGING) {
1497 if (thread_id > tq->tq_nthreads_target) {
1509 if (thread_id == tq->tq_nthreads ||
1510 tq->tq_nthreads_target == 0)
1514 (void) taskq_thread_wait(tq, &tq->tq_lock,
1515 &tq->tq_exit_cv, &cprinfo, -1);
1523 if (!(tq->tq_flags & TASKQ_THREAD_CREATED)) {
1525 if (tq->tq_nthreads == tq->tq_nthreads_target) {
1526 tq->tq_flags &= ~TASKQ_CHANGING;
1527 cv_broadcast(&tq->tq_wait_cv);
1530 if (tq->tq_nthreads < tq->tq_nthreads_target) {
1531 taskq_thread_create(tq);
1536 if ((tqe = tq->tq_task.tqent_next) == &tq->tq_task) {
1537 if (--tq->tq_active == 0)
1538 cv_broadcast(&tq->tq_wait_cv);
1539 (void) taskq_thread_wait(tq, &tq->tq_lock,
1540 &tq->tq_dispatch_cv, &cprinfo, -1);
1541 tq->tq_active++;
1547 mutex_exit(&tq->tq_lock);
1549 rw_enter(&tq->tq_threadlock, RW_READER);
1551 DTRACE_PROBE2(taskq__exec__start, taskq_t *, tq,
1554 DTRACE_PROBE2(taskq__exec__end, taskq_t *, tq,
1557 rw_exit(&tq->tq_threadlock);
1559 mutex_enter(&tq->tq_lock);
1560 tq->tq_totaltime += end - start;
1561 tq->tq_executed++;
1563 taskq_ent_free(tq, tqe);
1566 if (tq->tq_nthreads_max == 1)
1567 tq->tq_thread = NULL;
1569 tq->tq_threadlist[thread_id - 1] = NULL;
1572 ASSERT(tq->tq_active > 0);
1573 tq->tq_active--;
1575 ASSERT(tq->tq_nthreads > 0);
1576 tq->tq_nthreads--;
1579 cv_broadcast(&tq->tq_exit_cv);
1580 if (tq->tq_nthreads == tq->tq_nthreads_target) {
1581 if (!(tq->tq_flags & TASKQ_THREAD_CREATED))
1582 tq->tq_flags &= ~TASKQ_CHANGING;
1584 cv_broadcast(&tq->tq_wait_cv);
1587 ASSERT(!(tq->tq_flags & TASKQ_CPR_SAFE));
1588 CALLB_CPR_EXIT(&cprinfo); /* drops tq->tq_lock */
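The taskq_thread() fragments above show the core consumer loop: sleep on tq_dispatch_cv while tq_task is empty (waking taskq_wait() waiters when tq_active drops to zero), take the head entry, drop tq_lock, run the function under tq_threadlock as reader, then retake tq_lock to account for the completed task and free the entry. A stand-alone userland analogue of that loop, with pthreads standing in for the kernel primitives and the suspend/CPR/thread-count machinery omitted:

    #include <pthread.h>
    #include <stdlib.h>

    /* Userland analogue of the tq_task queue consumed by taskq_thread(). */
    typedef struct task {
            struct task *next, *prev;
            void (*func)(void *);
            void *arg;
    } task_t;

    static pthread_mutex_t qlock = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t  dispatch_cv = PTHREAD_COND_INITIALIZER;
    static task_t sentinel = { &sentinel, &sentinel, NULL, NULL };
    static int shutting_down;

    static int
    dispatch(void (*func)(void *), void *arg)   /* cf. TQ_ENQUEUE + cv_signal */
    {
            task_t *t = malloc(sizeof (*t));

            if (t == NULL)
                    return (-1);
            t->func = func;
            t->arg = arg;
            pthread_mutex_lock(&qlock);
            t->prev = sentinel.prev;            /* append at the tail */
            t->next = &sentinel;
            sentinel.prev->next = t;
            sentinel.prev = t;
            pthread_cond_signal(&dispatch_cv);
            pthread_mutex_unlock(&qlock);
            return (0);
    }

    static void *
    worker(void *unused)                        /* cf. the taskq_thread() main loop */
    {
            pthread_mutex_lock(&qlock);
            for (;;) {
                    task_t *t;

                    /* Queue empty: sleep as on tq_dispatch_cv. */
                    while ((t = sentinel.next) == &sentinel) {
                            if (shutting_down) {
                                    pthread_mutex_unlock(&qlock);
                                    return (NULL);
                            }
                            pthread_cond_wait(&dispatch_cv, &qlock);
                    }
                    t->prev->next = t->next;    /* unlink the head entry */
                    t->next->prev = t->prev;

                    pthread_mutex_unlock(&qlock);
                    t->func(t->arg);            /* run the task without the lock held */
                    free(t);
                    pthread_mutex_lock(&qlock);
            }
    }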
1604 taskq_t *tq = bucket->tqbucket_taskq;
1610 CALLB_CPR_INIT(&cprinfo, lock, callb_generic_cpr, tq->tq_name);
1638 DTRACE_PROBE3(taskq__d__exec__start, taskq_t *, tq,
1641 DTRACE_PROBE3(taskq__d__exec__end, taskq_t *, tq,
1673 w = taskq_thread_wait(tq, lock, cv,
1719 mutex_enter(&tq->tq_lock);
1720 tq->tq_tdeaths++;
1721 mutex_exit(&tq->tq_lock);
1796 taskq_t *tq = kmem_cache_alloc(taskq_cache, KM_SLEEP);
1823 tq->tq_maxsize = nthreads;
1844 tq->tq_threads_ncpus_pct = pct;
1860 (void) strncpy(tq->tq_name, name, TASKQ_NAMELEN + 1);
1861 strident_canon(tq->tq_name, TASKQ_NAMELEN + 1);
1863 tq->tq_flags = flags | TASKQ_CHANGING;
1864 tq->tq_active = 0;
1865 tq->tq_instance = instance;
1866 tq->tq_nthreads_target = nthreads;
1867 tq->tq_nthreads_max = max_nthreads;
1868 tq->tq_minalloc = minalloc;
1869 tq->tq_maxalloc = maxalloc;
1870 tq->tq_nbuckets = bsize;
1871 tq->tq_proc = proc;
1872 tq->tq_pri = pri;
1873 tq->tq_DC = dc;
1874 list_link_init(&tq->tq_cpupct_link);
1877 tq->tq_threadlist = kmem_alloc(
1880 mutex_enter(&tq->tq_lock);
1883 taskq_ent_free(tq, taskq_ent_alloc(tq, TQ_SLEEP));
1891 taskq_thread_create(tq);
1892 mutex_exit(&tq->tq_lock);
1899 tq->tq_buckets = bucket;
1906 bucket->tqbucket_taskq = tq;
1924 instance = tq->tq_instance =
1929 if ((tq->tq_kstat = kstat_create("unix", instance,
1930 tq->tq_name, "taskq_d", KSTAT_TYPE_NAMED,
1933 tq->tq_kstat->ks_lock = &taskq_d_kstat_lock;
1934 tq->tq_kstat->ks_data = &taskq_d_kstat;
1935 tq->tq_kstat->ks_update = taskq_d_kstat_update;
1936 tq->tq_kstat->ks_private = tq;
1937 kstat_install(tq->tq_kstat);
1940 if ((tq->tq_kstat = kstat_create("unix", instance, tq->tq_name,
1944 tq->tq_kstat->ks_lock = &taskq_kstat_lock;
1945 tq->tq_kstat->ks_data = &taskq_kstat;
1946 tq->tq_kstat->ks_update = taskq_kstat_update;
1947 tq->tq_kstat->ks_private = tq;
1948 kstat_install(tq->tq_kstat);
1952 return (tq);
1962 taskq_destroy(taskq_t *tq)
1964 taskq_bucket_t *b = tq->tq_buckets;
1967 ASSERT(! (tq->tq_flags & TASKQ_CPR_SAFE));
1972 if (tq->tq_kstat != NULL) {
1973 kstat_delete(tq->tq_kstat);
1974 tq->tq_kstat = NULL;
1980 if (tq->tq_flags & TASKQ_NOINSTANCE) {
1981 vmem_free(taskq_id_arena, (void *)(uintptr_t)(tq->tq_instance),
1983 tq->tq_instance = 0;
1989 if (tq->tq_flags & TASKQ_THREADS_CPU_PCT) {
1990 taskq_cpupct_remove(tq);
1996 taskq_wait(tq);
1998 mutex_enter(&tq->tq_lock);
1999 ASSERT((tq->tq_task.tqent_next == &tq->tq_task) &&
2000 (tq->tq_active == 0));
2003 tq->tq_nthreads_target = 0;
2005 tq->tq_flags |= TASKQ_CHANGING;
2006 cv_broadcast(&tq->tq_dispatch_cv);
2007 cv_broadcast(&tq->tq_exit_cv);
2009 while (tq->tq_nthreads != 0)
2010 cv_wait(&tq->tq_wait_cv, &tq->tq_lock);
2012 if (tq->tq_nthreads_max != 1)
2013 kmem_free(tq->tq_threadlist, sizeof (kthread_t *) *
2014 tq->tq_nthreads_max);
2016 tq->tq_minalloc = 0;
2017 while (tq->tq_nalloc != 0)
2018 taskq_ent_free(tq, taskq_ent_alloc(tq, TQ_SLEEP));
2020 mutex_exit(&tq->tq_lock);
2025 for (; (b != NULL) && (bid < tq->tq_nbuckets); b++, bid++) {
2051 if (tq->tq_buckets != NULL) {
2052 ASSERT(tq->tq_flags & TASKQ_DYNAMIC);
2053 kmem_free(tq->tq_buckets,
2054 sizeof (taskq_bucket_t) * tq->tq_nbuckets);
2056 /* Cleanup fields before returning tq to the cache */
2057 tq->tq_buckets = NULL;
2058 tq->tq_tcreates = 0;
2059 tq->tq_tdeaths = 0;
2061 ASSERT(!(tq->tq_flags & TASKQ_DYNAMIC));
2064 tq->tq_threads_ncpus_pct = 0;
2065 tq->tq_totaltime = 0;
2066 tq->tq_tasks = 0;
2067 tq->tq_maxtasks = 0;
2068 tq->tq_executed = 0;
2069 kmem_cache_free(taskq_cache, tq);
2086 taskq_t *tq = b->tqbucket_taskq;
2094 mutex_enter(&tq->tq_lock);
2099 if (tq->tq_tcreates++ - tq->tq_tdeaths > tq->tq_maxsize) {
2100 tq->tq_tcreates--;
2101 mutex_exit(&tq->tq_lock);
2104 mutex_exit(&tq->tq_lock);
2109 mutex_enter(&tq->tq_lock);
2111 tq->tq_tcreates--;
2112 mutex_exit(&tq->tq_lock);
2125 0, &p0, TS_STOPPED, tq->tq_pri);
2148 tqe->tqent_thread->t_taskq = tq;
2158 taskq_t *tq = ksp->ks_private;
2163 tqsp->tq_pid.value.ui64 = tq->tq_proc->p_pid;
2164 tqsp->tq_tasks.value.ui64 = tq->tq_tasks;
2165 tqsp->tq_executed.value.ui64 = tq->tq_executed;
2166 tqsp->tq_maxtasks.value.ui64 = tq->tq_maxtasks;
2167 tqsp->tq_totaltime.value.ui64 = tq->tq_totaltime;
2168 tqsp->tq_nactive.value.ui64 = tq->tq_active;
2169 tqsp->tq_nalloc.value.ui64 = tq->tq_nalloc;
2170 tqsp->tq_pri.value.ui64 = tq->tq_pri;
2171 tqsp->tq_nthreads.value.ui64 = tq->tq_nthreads;
2179 taskq_t *tq = ksp->ks_private;
2180 taskq_bucket_t *b = tq->tq_buckets;
2186 ASSERT(tq->tq_flags & TASKQ_DYNAMIC);
2188 tqsp->tqd_btasks.value.ui64 = tq->tq_tasks;
2189 tqsp->tqd_bexecuted.value.ui64 = tq->tq_executed;
2190 tqsp->tqd_bmaxtasks.value.ui64 = tq->tq_maxtasks;
2191 tqsp->tqd_bnalloc.value.ui64 = tq->tq_nalloc;
2192 tqsp->tqd_bnactive.value.ui64 = tq->tq_active;
2193 tqsp->tqd_btotaltime.value.ui64 = tq->tq_totaltime;
2194 tqsp->tqd_pri.value.ui64 = tq->tq_pri;
2208 for (; (b != NULL) && (bid < tq->tq_nbuckets); b++, bid++) {