sigev_thread.c (156267) | sigev_thread.c (156383) |
---|---|
1/* 2 * Copyright (c) 2005 David Xu <davidxu@freebsd.org> 3 * All rights reserved. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions 7 * are met: 8 * 1. Redistributions of source code must retain the above copyright --- 9 unchanged lines hidden (view full) --- 18 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, 19 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 20 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 21 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 22 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 23 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF 24 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 25 * | 1/* 2 * Copyright (c) 2005 David Xu <davidxu@freebsd.org> 3 * All rights reserved. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions 7 * are met: 8 * 1. Redistributions of source code must retain the above copyright --- 9 unchanged lines hidden (view full) --- 18 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, 19 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 20 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 21 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 22 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 23 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF 24 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 25 * |
26 * $FreeBSD: head/lib/librt/sigev_thread.c 156267 2006-03-04 00:18:19Z davidxu $ | 26 * $FreeBSD: head/lib/librt/sigev_thread.c 156383 2006-03-07 08:28:07Z davidxu $ |
27 * 28 */ 29 30#include <sys/types.h> 31#include <machine/atomic.h> 32 33#include "namespace.h" | 27 * 28 */ 29 30#include <sys/types.h> 31#include <machine/atomic.h> 32 33#include "namespace.h" |
34#include <assert.h> | |
35#include <err.h> 36#include <errno.h> 37#include <ucontext.h> 38#include <sys/thr.h> | 34#include <err.h> 35#include <errno.h> 36#include <ucontext.h> 37#include <sys/thr.h> |
39#include <sys/time.h> | |
40#include <stdio.h> 41#include <stdlib.h> 42#include <string.h> 43#include <signal.h> 44#include <pthread.h> 45#include "un-namespace.h" 46 47#include "sigev_thread.h" 48 | 38#include <stdio.h> 39#include <stdlib.h> 40#include <string.h> 41#include <signal.h> 42#include <pthread.h> 43#include "un-namespace.h" 44 45#include "sigev_thread.h" 46 |
49/* Lowest number of worker threads should be kept. */ 50#define SIGEV_WORKER_LOW 0 51 52/* Highest number of worker threads can be created. */ 53#define SIGEV_WORKER_HIGH 20 54 55/* How long an idle worker thread should stay. */ 56#define SIGEV_WORKER_IDLE 10 57 58struct sigev_worker { 59 LIST_ENTRY(sigev_worker) sw_link; 60 pthread_cond_t sw_cv; 61 struct sigev_node *sw_sn; 62 int sw_flags; 63 int *sw_readyptr; 64}; 65 66#define SWF_READYQ 1 67 | |
68LIST_HEAD(sigev_list_head, sigev_node); 69#define HASH_QUEUES 17 70#define HASH(t, id) ((((id) << 3) + (t)) % HASH_QUEUES) 71 72static struct sigev_list_head sigev_hash[HASH_QUEUES]; 73static struct sigev_list_head sigev_all; | 47LIST_HEAD(sigev_list_head, sigev_node); 48#define HASH_QUEUES 17 49#define HASH(t, id) ((((id) << 3) + (t)) % HASH_QUEUES) 50 51static struct sigev_list_head sigev_hash[HASH_QUEUES]; 52static struct sigev_list_head sigev_all; |
74static TAILQ_HEAD(, sigev_node) sigev_actq; 75static TAILQ_HEAD(, sigev_thread_node) sigev_threads; | 53static LIST_HEAD(,sigev_thread) sigev_threads; |
76static int sigev_generation; 77static pthread_mutex_t *sigev_list_mtx; 78static pthread_once_t sigev_once = PTHREAD_ONCE_INIT; 79static pthread_once_t sigev_once_default = PTHREAD_ONCE_INIT; | 54static int sigev_generation; 55static pthread_mutex_t *sigev_list_mtx; 56static pthread_once_t sigev_once = PTHREAD_ONCE_INIT; 57static pthread_once_t sigev_once_default = PTHREAD_ONCE_INIT; |
80static pthread_mutex_t *sigev_threads_mtx; 81static pthread_cond_t *sigev_threads_cv; 82static pthread_cond_t *sigev_actq_cv; 83static struct sigev_thread_node *sigev_default_thread; 84static struct sigev_thread_attr sigev_default_sna; | 58static struct sigev_thread *sigev_default_thread; |
85static pthread_attr_t sigev_default_attr; 86static int atfork_registered; | 59static pthread_attr_t sigev_default_attr; 60static int atfork_registered; |
87static LIST_HEAD(,sigev_worker) sigev_worker_ready; 88static int sigev_worker_count; 89static int sigev_worker_start; 90static int sigev_worker_high; 91static int sigev_worker_low; 92static pthread_cond_t *sigev_worker_init_cv; | |
93 | 61 |
94static void __sigev_fork_prepare(void); 95static void __sigev_fork_parent(void); 96static void __sigev_fork_child(void); 97static struct sigev_thread_node *sigev_thread_create(pthread_attr_t *, 98 struct sigev_thread_node *, int); 99static void *sigev_service_loop(void *); 100static void *sigev_worker_routine(void *); 101static void sigev_put(struct sigev_node *); 102static void worker_cleanup(void *arg); | 62static void __sigev_fork_prepare(void); 63static void __sigev_fork_parent(void); 64static void __sigev_fork_child(void); 65static struct sigev_thread *sigev_thread_create(int); 66static void *sigev_service_loop(void *); 67static void *worker_routine(void *); 68static void worker_cleanup(void *); |
103 104#pragma weak pthread_create | 69 70#pragma weak pthread_create |
105#pragma weak pthread_attr_getschedpolicy 106#pragma weak pthread_attr_getinheritsched 107#pragma weak pthread_attr_getschedparam 108#pragma weak pthread_attr_getscope 109#pragma weak pthread_attr_getstacksize 110#pragma weak pthread_attr_getstackaddr 111#pragma weak pthread_attr_getguardsize 112#pragma weak pthread_attr_init 113#pragma weak pthread_attr_setscope 114#pragma weak pthread_attr_setdetachstate 115#pragma weak pthread_atfork 116#pragma weak _pthread_once 117#pragma weak pthread_cleanup_push 118#pragma weak pthread_cleanup_pop 119#pragma weak pthread_setcancelstate | |
120 | 71 |
121static __inline void 122attr2sna(pthread_attr_t *attr, struct sigev_thread_attr *sna) | 72static void 73attrcopy(pthread_attr_t *src, pthread_attr_t *dst) |
123{ | 74{ |
124 struct sched_param sched_param; | 75 struct sched_param sched; 76 void *a; 77 size_t u; 78 int v; |
125 | 79 |
126 pthread_attr_getschedpolicy(attr, &sna->sna_policy); 127 pthread_attr_getinheritsched(attr, &sna->sna_inherit); 128 pthread_attr_getschedparam(attr, &sched_param); 129 sna->sna_prio = sched_param.sched_priority; 130 pthread_attr_getstacksize(attr, &sna->sna_stacksize); 131 pthread_attr_getstackaddr(attr, &sna->sna_stackaddr); 132 pthread_attr_getguardsize(attr, &sna->sna_guardsize); 133} | 80 _pthread_attr_getschedpolicy(src, &v); 81 _pthread_attr_setschedpolicy(dst, v); |
134 | 82 |
135static __inline int 136sna_eq(const struct sigev_thread_attr *a, const struct sigev_thread_attr *b) 137{ 138 return memcmp(a, b, sizeof(*a)) == 0; | 83 _pthread_attr_getinheritsched(src, &v); 84 _pthread_attr_setinheritsched(dst, v); 85 86 _pthread_attr_getschedparam(src, &sched); 87 _pthread_attr_setschedparam(dst, &sched); 88 89 _pthread_attr_getscope(src, &v); 90 _pthread_attr_setscope(dst, v); 91 92 _pthread_attr_getstacksize(src, &u); 93 _pthread_attr_setstacksize(dst, u); 94 95 _pthread_attr_getstackaddr(src, &a); 96 _pthread_attr_setstackaddr(src, a); 97 98 _pthread_attr_getguardsize(src, &u); 99 _pthread_attr_setguardsize(dst, u); |
139} 140 141static __inline int 142have_threads(void) 143{ 144 return (&pthread_create != NULL); 145} 146 147void 148__sigev_thread_init(void) 149{ 150 static int inited = 0; | 100} 101 102static __inline int 103have_threads(void) 104{ 105 return (&pthread_create != NULL); 106} 107 108void 109__sigev_thread_init(void) 110{ 111 static int inited = 0; |
112 pthread_mutexattr_t mattr; |
|
151 int i; 152 | 113 int i; 114 |
115 _pthread_mutexattr_init(&mattr); 116 _pthread_mutexattr_settype(&mattr, PTHREAD_MUTEX_NORMAL); |
|
153 sigev_list_mtx = malloc(sizeof(pthread_mutex_t)); | 117 sigev_list_mtx = malloc(sizeof(pthread_mutex_t)); |
154 _pthread_mutex_init(sigev_list_mtx, NULL); 155 sigev_threads_mtx = malloc(sizeof(pthread_mutex_t)); 156 _pthread_mutex_init(sigev_threads_mtx, NULL); 157 sigev_threads_cv = malloc(sizeof(pthread_cond_t)); 158 _pthread_cond_init(sigev_threads_cv, NULL); 159 sigev_actq_cv = malloc(sizeof(pthread_cond_t)); 160 _pthread_cond_init(sigev_actq_cv, NULL); | 118 _pthread_mutex_init(sigev_list_mtx, &mattr); 119 _pthread_mutexattr_destroy(&mattr); 120 |
161 for (i = 0; i < HASH_QUEUES; ++i) 162 LIST_INIT(&sigev_hash[i]); 163 LIST_INIT(&sigev_all); | 121 for (i = 0; i < HASH_QUEUES; ++i) 122 LIST_INIT(&sigev_hash[i]); 123 LIST_INIT(&sigev_all); |
164 TAILQ_INIT(&sigev_threads); 165 TAILQ_INIT(&sigev_actq); | 124 LIST_INIT(&sigev_threads); |
166 sigev_default_thread = NULL; | 125 sigev_default_thread = NULL; |
167 sigev_worker_count = 0; 168 sigev_worker_start = 0; 169 LIST_INIT(&sigev_worker_ready); 170 sigev_worker_high = SIGEV_WORKER_HIGH; 171 sigev_worker_low = SIGEV_WORKER_LOW; 172 sigev_worker_init_cv = malloc(sizeof(pthread_cond_t)); 173 _pthread_cond_init(sigev_worker_init_cv, NULL); | |
174 if (atfork_registered == 0) { | 126 if (atfork_registered == 0) { |
175 pthread_atfork( | 127 _pthread_atfork( |
176 __sigev_fork_prepare, 177 __sigev_fork_parent, 178 __sigev_fork_child); 179 atfork_registered = 1; 180 } 181 if (!inited) { | 128 __sigev_fork_prepare, 129 __sigev_fork_parent, 130 __sigev_fork_child); 131 atfork_registered = 1; 132 } 133 if (!inited) { |
182 pthread_attr_init(&sigev_default_attr); 183 attr2sna(&sigev_default_attr, &sigev_default_sna); 184 pthread_attr_setscope(&sigev_default_attr, | 134 _pthread_attr_init(&sigev_default_attr); 135 _pthread_attr_setscope(&sigev_default_attr, |
185 PTHREAD_SCOPE_SYSTEM); | 136 PTHREAD_SCOPE_SYSTEM); |
186 pthread_attr_setdetachstate(&sigev_default_attr, | 137 _pthread_attr_setdetachstate(&sigev_default_attr, |
187 PTHREAD_CREATE_DETACHED); 188 inited = 1; 189 } | 138 PTHREAD_CREATE_DETACHED); 139 inited = 1; 140 } |
190 sigev_default_thread = sigev_thread_create(NULL, NULL, 0); | 141 sigev_default_thread = sigev_thread_create(0); |
191} 192 193int 194__sigev_check_init(void) 195{ 196 if (!have_threads()) 197 return (-1); 198 --- 19 unchanged lines hidden (view full) --- 218 * check if the handlers were already registered in 219 * pthread_atfork(). 220 */ 221 atfork_registered = 1; 222 memcpy(&sigev_once, &sigev_once_default, sizeof(sigev_once)); 223 __sigev_thread_init(); 224} 225 | 142} 143 144int 145__sigev_check_init(void) 146{ 147 if (!have_threads()) 148 return (-1); 149 --- 19 unchanged lines hidden (view full) --- 169 * check if the handlers were already registered in 170 * pthread_atfork(). 171 */ 172 atfork_registered = 1; 173 memcpy(&sigev_once, &sigev_once_default, sizeof(sigev_once)); 174 __sigev_thread_init(); 175} 176 |
226int | 177void |
227__sigev_list_lock(void) 228{ | 178__sigev_list_lock(void) 179{ |
229 return _pthread_mutex_lock(sigev_list_mtx); | 180 _pthread_mutex_lock(sigev_list_mtx); |
230} 231 | 181} 182 |
232int | 183void |
233__sigev_list_unlock(void) 234{ | 184__sigev_list_unlock(void) 185{ |
235 return _pthread_mutex_unlock(sigev_list_mtx); | 186 _pthread_mutex_unlock(sigev_list_mtx); |
236} 237 | 187} 188 |
238int 239__sigev_thread_list_lock(void) 240{ 241 return _pthread_mutex_lock(sigev_threads_mtx); 242} 243 244int 245__sigev_thread_list_unlock(void) 246{ 247 return _pthread_mutex_unlock(sigev_threads_mtx); 248} 249 | |
250struct sigev_node * 251__sigev_alloc(int type, const struct sigevent *evp, struct sigev_node *prev, | 189struct sigev_node * 190__sigev_alloc(int type, const struct sigevent *evp, struct sigev_node *prev, |
252 int usethreadpool) | 191 int usedefault) |
253{ 254 struct sigev_node *sn; 255 256 sn = calloc(1, sizeof(*sn)); 257 if (sn != NULL) { 258 sn->sn_value = evp->sigev_value; | 192{ 193 struct sigev_node *sn; 194 195 sn = calloc(1, sizeof(*sn)); 196 if (sn != NULL) { 197 sn->sn_value = evp->sigev_value; |
259 sn->sn_func = evp->sigev_notify_function; 260 sn->sn_gen = atomic_fetchadd_int(&sigev_generation, 1); 261 sn->sn_type = type; 262 if (usethreadpool) 263 sn->sn_flags |= SNF_THREADPOOL; 264 sn->sn_tn = sigev_thread_create(evp->sigev_notify_attributes, 265 prev ? prev->sn_tn : NULL, usethreadpool); 266 if (sn->sn_tn == NULL) { 267 free(sn); 268 sn = NULL; | 198 sn->sn_func = evp->sigev_notify_function; 199 sn->sn_gen = atomic_fetchadd_int(&sigev_generation, 1); 200 sn->sn_type = type; 201 _pthread_attr_init(&sn->sn_attr); 202 _pthread_attr_setdetachstate(&sn->sn_attr, PTHREAD_CREATE_DETACHED); 203 if (evp->sigev_notify_attributes) 204 attrcopy(evp->sigev_notify_attributes, &sn->sn_attr); 205 if (prev) { 206 __sigev_list_lock(); 207 prev->sn_tn->tn_refcount++; 208 __sigev_list_unlock(); 209 sn->sn_tn = prev->sn_tn; 210 } else { 211 sn->sn_tn = sigev_thread_create(usedefault); 212 if (sn->sn_tn == NULL) { 213 _pthread_attr_destroy(&sn->sn_attr); 214 free(sn); 215 sn = NULL; 216 } |
269 } 270 } 271 return (sn); 272} 273 274void 275__sigev_get_sigevent(struct sigev_node *sn, struct sigevent *newevp, 276 sigev_id_t id) 277{ 278 /* | 217 } 218 } 219 return (sn); 220} 221 222void 223__sigev_get_sigevent(struct sigev_node *sn, struct sigevent *newevp, 224 sigev_id_t id) 225{ 226 /* |
279 * Build a new sigevent, and tell kernel to deliver SIGEV_SIGSERVICE | 227 * Build a new sigevent, and tell kernel to deliver SIGSERVICE |
280 * signal to the new thread. 281 */ 282 newevp->sigev_notify = SIGEV_THREAD_ID; | 228 * signal to the new thread. 229 */ 230 newevp->sigev_notify = SIGEV_THREAD_ID; |
283 newevp->sigev_signo = SIGEV_SIGSERVICE; | 231 newevp->sigev_signo = SIGSERVICE; |
284 newevp->sigev_notify_thread_id = (lwpid_t)sn->sn_tn->tn_lwpid; 285 newevp->sigev_value.sival_ptr = (void *)id; 286} 287 288void 289__sigev_free(struct sigev_node *sn) 290{ | 232 newevp->sigev_notify_thread_id = (lwpid_t)sn->sn_tn->tn_lwpid; 233 newevp->sigev_value.sival_ptr = (void *)id; 234} 235 236void 237__sigev_free(struct sigev_node *sn) 238{ |
239 _pthread_attr_destroy(&sn->sn_attr); |
|
291 free(sn); 292} 293 294struct sigev_node * 295__sigev_find(int type, sigev_id_t id) 296{ 297 struct sigev_node *sn; 298 int chain = HASH(type, id); --- 6 unchanged lines hidden (view full) --- 305} 306 307int 308__sigev_register(struct sigev_node *sn) 309{ 310 int chain = HASH(sn->sn_type, sn->sn_id); 311 312 LIST_INSERT_HEAD(&sigev_hash[chain], sn, sn_link); | 240 free(sn); 241} 242 243struct sigev_node * 244__sigev_find(int type, sigev_id_t id) 245{ 246 struct sigev_node *sn; 247 int chain = HASH(type, id); --- 6 unchanged lines hidden (view full) --- 254} 255 256int 257__sigev_register(struct sigev_node *sn) 258{ 259 int chain = HASH(sn->sn_type, sn->sn_id); 260 261 LIST_INSERT_HEAD(&sigev_hash[chain], sn, sn_link); |
313 LIST_INSERT_HEAD(&sigev_all, sn, sn_allist); | |
314 return (0); 315} 316 317int 318__sigev_delete(int type, sigev_id_t id) 319{ 320 struct sigev_node *sn; 321 322 sn = __sigev_find(type, id); 323 if (sn != NULL) 324 return (__sigev_delete_node(sn)); 325 return (0); 326} 327 328int 329__sigev_delete_node(struct sigev_node *sn) 330{ 331 LIST_REMOVE(sn, sn_link); | 262 return (0); 263} 264 265int 266__sigev_delete(int type, sigev_id_t id) 267{ 268 struct sigev_node *sn; 269 270 sn = __sigev_find(type, id); 271 if (sn != NULL) 272 return (__sigev_delete_node(sn)); 273 return (0); 274} 275 276int 277__sigev_delete_node(struct sigev_node *sn) 278{ 279 LIST_REMOVE(sn, sn_link); |
332 LIST_REMOVE(sn, sn_allist); | |
333 | 280 |
334 __sigev_thread_list_lock(); | |
335 if (--sn->sn_tn->tn_refcount == 0) | 281 if (--sn->sn_tn->tn_refcount == 0) |
336 if (!(sn->sn_flags & SNF_THREADPOOL)) 337 pthread_kill(sn->sn_tn->tn_thread, SIGEV_SIGSERVICE); 338 __sigev_thread_list_unlock(); | 282 _pthread_kill(sn->sn_tn->tn_thread, SIGSERVICE); |
339 if (sn->sn_flags & SNF_WORKING) 340 sn->sn_flags |= SNF_REMOVED; | 283 if (sn->sn_flags & SNF_WORKING) 284 sn->sn_flags |= SNF_REMOVED; |
341 else { 342 if (sn->sn_flags & SNF_ACTQ) { 343 TAILQ_REMOVE(&sigev_actq, sn, sn_actq); 344 } | 285 else |
345 __sigev_free(sn); | 286 __sigev_free(sn); |
346 } | |
347 return (0); 348} 349 | 287 return (0); 288} 289 |
350static 351sigev_id_t | 290static sigev_id_t |
352sigev_get_id(siginfo_t *si) 353{ 354 switch(si->si_code) { 355 case SI_TIMER: 356 return (si->si_timerid); 357 case SI_MESGQ: 358 return (si->si_mqd); 359 case SI_ASYNCIO: 360 return (sigev_id_t)si->si_value.sival_ptr; 361 } 362 return (-1); 363} 364 | 291sigev_get_id(siginfo_t *si) 292{ 293 switch(si->si_code) { 294 case SI_TIMER: 295 return (si->si_timerid); 296 case SI_MESGQ: 297 return (si->si_mqd); 298 case SI_ASYNCIO: 299 return (sigev_id_t)si->si_value.sival_ptr; 300 } 301 return (-1); 302} 303 |
365static struct sigev_thread_node * 366sigev_thread_create(pthread_attr_t *pattr, struct sigev_thread_node *prev, 367 int usepool) | 304static struct sigev_thread * 305sigev_thread_create(int usedefault) |
368{ | 306{ |
369 struct sigev_thread_node *tn; 370 struct sigev_thread_attr sna; | 307 struct sigev_thread *tn; |
371 sigset_t set; 372 int ret; 373 | 308 sigset_t set; 309 int ret; 310 |
374 if (pattr == NULL) 375 pattr = &sigev_default_attr; 376 else { 377 pthread_attr_setscope(pattr, PTHREAD_SCOPE_SYSTEM); 378 pthread_attr_setdetachstate(pattr, PTHREAD_CREATE_DETACHED); 379 } 380 381 attr2sna(pattr, &sna); 382 383 __sigev_thread_list_lock(); 384 385 if (prev != NULL && sna_eq(&prev->tn_sna, &sna)) { 386 prev->tn_refcount++; 387 __sigev_thread_list_unlock(); 388 return (prev); 389 } 390 391 if (sna_eq(&sna, &sigev_default_sna) && usepool && 392 sigev_default_thread != NULL) { | 311 if (usedefault && sigev_default_thread) { 312 __sigev_list_lock(); |
393 sigev_default_thread->tn_refcount++; | 313 sigev_default_thread->tn_refcount++; |
394 __sigev_thread_list_unlock(); 395 return (sigev_default_thread); | 314 __sigev_list_unlock(); 315 return (sigev_default_thread); |
396 } 397 | 316 } 317 |
398 tn = NULL; 399 /* Search a thread matching the required stack address */ 400 if (sna.sna_stackaddr != NULL) { 401 TAILQ_FOREACH(tn, &sigev_threads, tn_link) { 402 if (sna.sna_stackaddr == tn->tn_sna.sna_stackaddr) 403 break; 404 } 405 } 406 407 if (tn != NULL) { 408 tn->tn_refcount++; 409 __sigev_thread_list_unlock(); 410 return (tn); 411 } | |
412 tn = malloc(sizeof(*tn)); | 318 tn = malloc(sizeof(*tn)); |
413 tn->tn_sna = sna; | |
414 tn->tn_cur = NULL; 415 tn->tn_lwpid = -1; 416 tn->tn_refcount = 1; | 319 tn->tn_cur = NULL; 320 tn->tn_lwpid = -1; 321 tn->tn_refcount = 1; |
417 TAILQ_INSERT_TAIL(&sigev_threads, tn, tn_link); | 322 _pthread_cond_init(&tn->tn_cv, NULL); 323 324 /* for debug */ 325 __sigev_list_lock(); 326 LIST_INSERT_HEAD(&sigev_threads, tn, tn_link); 327 __sigev_list_unlock(); 328 |
418 sigemptyset(&set); | 329 sigemptyset(&set); |
419 sigaddset(&set, SIGEV_SIGSERVICE); | 330 sigaddset(&set, SIGSERVICE); 331 |
420 _sigprocmask(SIG_BLOCK, &set, NULL); | 332 _sigprocmask(SIG_BLOCK, &set, NULL); |
421 ret = pthread_create(&tn->tn_thread, pattr, sigev_service_loop, tn); | 333 ret = pthread_create(&tn->tn_thread, &sigev_default_attr, 334 sigev_service_loop, tn); |
422 _sigprocmask(SIG_UNBLOCK, &set, NULL); | 335 _sigprocmask(SIG_UNBLOCK, &set, NULL); |
336 |
|
423 if (ret != 0) { | 337 if (ret != 0) { |
424 TAILQ_REMOVE(&sigev_threads, tn, tn_link); 425 __sigev_thread_list_unlock(); | 338 __sigev_list_lock(); 339 LIST_REMOVE(tn, tn_link); 340 __sigev_list_unlock(); |
426 free(tn); 427 tn = NULL; 428 } else { 429 /* wait the thread to get its lwpid */ | 341 free(tn); 342 tn = NULL; 343 } else { 344 /* wait the thread to get its lwpid */ |
430 while (tn->tn_lwpid == -1) 431 _pthread_cond_wait(sigev_threads_cv, sigev_threads_mtx); 432 __sigev_thread_list_unlock(); 433 } 434 return (tn); 435} | |
436 | 345 |
437static void 438after_dispatch(struct sigev_thread_node *tn) 439{ 440 struct sigev_node *sn; 441 442 if ((sn = tn->tn_cur) != NULL) { | |
443 __sigev_list_lock(); | 346 __sigev_list_lock(); |
444 sn->sn_flags &= ~SNF_WORKING; 445 if (sn->sn_flags & SNF_REMOVED) 446 __sigev_free(sn); 447 else if (sn->sn_flags & SNF_ONESHOT) 448 __sigev_delete_node(sn); 449 tn->tn_cur = NULL; | 347 while (tn->tn_lwpid == -1) 348 _pthread_cond_wait(&tn->tn_cv, sigev_list_mtx); |
450 __sigev_list_unlock(); 451 } | 349 __sigev_list_unlock(); 350 } |
452 tn->tn_cur = NULL; | 351 return (tn); |
453} 454 455/* | 352} 353 354/* |
456 * This function is called if user callback calls 457 * pthread_exit() or pthread_cancel() for the thread. | 355 * The thread receives notification from kernel and creates 356 * a thread to call user callback function. |
458 */ | 357 */ |
459static void 460thread_cleanup(void *arg) 461{ 462 struct sigev_thread_node *tn = arg; 463 464 fprintf(stderr, "Dangerous Robinson, calling pthread_exit() from " 465 "SIGEV_THREAD is undefined."); 466 after_dispatch(tn); 467 /* longjmp(tn->tn_jbuf, 1); */ 468 abort(); 469} 470 471/* 472 * Main notification dispatch function, the function either 473 * run user callback by itself or hand off the notifications 474 * to worker threads depend on flags. 475 */ | |
476static void * 477sigev_service_loop(void *arg) 478{ | 358static void * 359sigev_service_loop(void *arg) 360{ |
361 static int failure; 362 |
|
479 siginfo_t si; 480 sigset_t set; | 363 siginfo_t si; 364 sigset_t set; |
481 struct sigev_thread_node *tn; | 365 struct sigev_thread *tn; |
482 struct sigev_node *sn; 483 sigev_id_t id; | 366 struct sigev_node *sn; 367 sigev_id_t id; |
368 pthread_t td; |
|
484 int ret; 485 486 tn = arg; 487 thr_self(&tn->tn_lwpid); | 369 int ret; 370 371 tn = arg; 372 thr_self(&tn->tn_lwpid); |
488 __sigev_thread_list_lock(); 489 _pthread_cond_broadcast(sigev_threads_cv); 490 __sigev_thread_list_unlock(); | 373 __sigev_list_lock(); 374 _pthread_cond_broadcast(&tn->tn_cv); 375 __sigev_list_unlock(); |
491 492 /* 493 * Service thread should not be killed by callback, if user 494 * attempts to do so, the thread will be restarted. 495 */ | 376 377 /* 378 * Service thread should not be killed by callback, if user 379 * attempts to do so, the thread will be restarted. 380 */ |
496 setjmp(tn->tn_jbuf); 497 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); | |
498 sigemptyset(&set); | 381 sigemptyset(&set); |
499 sigaddset(&set, SIGEV_SIGSERVICE); 500 pthread_cleanup_push(thread_cleanup, tn); | 382 sigaddset(&set, SIGSERVICE); |
501 for (;;) { 502 ret = sigwaitinfo(&set, &si); | 383 for (;;) { 384 ret = sigwaitinfo(&set, &si); |
503 __sigev_thread_list_lock(); | 385 386 __sigev_list_lock(); |
504 if (tn->tn_refcount == 0) { | 387 if (tn->tn_refcount == 0) { |
505 TAILQ_REMOVE(&sigev_threads, tn, tn_link); 506 __sigev_thread_list_unlock(); | 388 LIST_REMOVE(tn, tn_link); 389 __sigev_list_unlock(); |
507 free(tn); 508 break; 509 } | 390 free(tn); 391 break; 392 } |
510 __sigev_thread_list_unlock(); 511 if (ret == -1) | 393 394 if (ret == -1) { 395 __sigev_list_unlock(); |
512 continue; | 396 continue; |
397 } 398 |
|
513 id = sigev_get_id(&si); | 399 id = sigev_get_id(&si); |
514 __sigev_list_lock(); | |
515 sn = __sigev_find(si.si_code, id); | 400 sn = __sigev_find(si.si_code, id); |
516 if (sn != NULL) { 517 sn->sn_info = si; 518 if (!(sn->sn_flags & SNF_THREADPOOL)) { 519 tn->tn_cur = sn; 520 sn->sn_flags |= SNF_WORKING; 521 __sigev_list_unlock(); 522 sn->sn_dispatch(sn); 523 after_dispatch(tn); 524 } else { 525 assert(!(sn->sn_flags & SNF_ACTQ)); 526 sigev_put(sn); 527 } 528 } else { 529 tn->tn_cur = NULL; | 401 if (sn == NULL) { |
530 __sigev_list_unlock(); | 402 __sigev_list_unlock(); |
403 continue; |
|
531 } | 404 } |
532 } 533 pthread_cleanup_pop(0); 534 return (0); 535} | 405 406 sn->sn_info = si; 407 if (sn->sn_flags & SNF_SYNC) 408 tn->tn_cur = sn; 409 else 410 tn->tn_cur = NULL; 411 sn->sn_flags |= SNF_WORKING; 412 __sigev_list_unlock(); |
536 | 413 |
537/* 538 * Hand off notifications to worker threads. 539 * 540 * prerequist: sigev list locked. 541 */ 542static void 543sigev_put(struct sigev_node *sn) 544{ 545 struct sigev_worker *worker; 546 pthread_t td; 547 int ret, ready; | 414 ret = pthread_create(&td, &sn->sn_attr, worker_routine, sn); 415 if (ret != 0) { 416 if (failure++ < 5) 417 warnc(ret, "%s:%s failed to create thread.\n", 418 __FILE__, __func__); |
548 | 419 |
549 TAILQ_INSERT_TAIL(&sigev_actq, sn, sn_actq); 550 sn->sn_flags |= SNF_ACTQ; 551 /* 552 * check if we should add more worker threads unless quota is hit. 553 */ 554 if (LIST_EMPTY(&sigev_worker_ready) && 555 sigev_worker_count + sigev_worker_start < sigev_worker_high) { 556 sigev_worker_start++; 557 __sigev_list_unlock(); 558 worker = malloc(sizeof(*worker)); 559 _pthread_cond_init(&worker->sw_cv, NULL); 560 worker->sw_flags = 0; 561 worker->sw_sn = 0; 562 worker->sw_readyptr = &ready; 563 ready = 0; 564 ret = pthread_create(&td, &sigev_default_attr, 565 sigev_worker_routine, worker); 566 if (ret) { 567 warnc(ret, "%s:%s can not create worker thread", 568 __FILE__, __func__); | |
569 __sigev_list_lock(); | 420 __sigev_list_lock(); |
570 sigev_worker_start--; | 421 sn->sn_flags &= ~SNF_WORKING; 422 if (sn->sn_flags & SNF_REMOVED) 423 __sigev_free(sn); |
571 __sigev_list_unlock(); | 424 __sigev_list_unlock(); |
572 } else { | 425 } else if (tn->tn_cur) { |
573 __sigev_list_lock(); | 426 __sigev_list_lock(); |
574 while (ready == 0) { 575 _pthread_cond_wait(sigev_worker_init_cv, 576 sigev_list_mtx); 577 } | 427 while (tn->tn_cur) 428 _pthread_cond_wait(&tn->tn_cv, sigev_list_mtx); |
578 __sigev_list_unlock(); 579 } | 429 __sigev_list_unlock(); 430 } |
580 } else { 581 worker = LIST_FIRST(&sigev_worker_ready); 582 if (worker) { 583 LIST_REMOVE(worker, sw_link); 584 worker->sw_flags &= ~SWF_READYQ; 585 _pthread_cond_broadcast(&worker->sw_cv); 586 __sigev_list_unlock(); 587 } | |
588 } | 431 } |
432 return (0); |
|
589} 590 591/* | 433} 434 435/* |
592 * Background thread to dispatch notification to user code. 593 * These threads are not bound to any realtime objects. | 436 * newly created worker thread to call user callback function. |
594 */ 595static void * | 437 */ 438static void * |
596sigev_worker_routine(void *arg) | 439worker_routine(void *arg) |
597{ | 440{ |
598 struct sigev_worker *worker; 599 struct sigev_node *sn; 600 struct timespec ts; 601 int ret; | 441 struct sigev_node *sn = arg; |
602 | 442 |
603 worker = arg; 604 __sigev_list_lock(); 605 sigev_worker_count++; 606 sigev_worker_start--; 607 (*(worker->sw_readyptr))++; 608 LIST_INSERT_HEAD(&sigev_worker_ready, worker, sw_link); 609 worker->sw_flags |= SWF_READYQ; 610 _pthread_cond_broadcast(sigev_worker_init_cv); | 443 _pthread_cleanup_push(worker_cleanup, sn); 444 sn->sn_dispatch(sn); 445 _pthread_cleanup_pop(1); |
611 | 446 |
612 for (;;) { 613 if (worker->sw_flags & SWF_READYQ) { 614 LIST_REMOVE(worker, sw_link); 615 worker->sw_flags &= ~SWF_READYQ; 616 } 617 618 sn = TAILQ_FIRST(&sigev_actq); 619 if (sn != NULL) { 620 TAILQ_REMOVE(&sigev_actq, sn, sn_actq); 621 sn->sn_flags &= ~SNF_ACTQ; 622 sn->sn_flags |= SNF_WORKING; 623 __sigev_list_unlock(); 624 625 worker->sw_sn = sn; 626 pthread_cleanup_push(worker_cleanup, worker); 627 sn->sn_dispatch(sn); 628 pthread_cleanup_pop(0); 629 worker->sw_sn = NULL; 630 631 __sigev_list_lock(); 632 sn->sn_flags &= ~SNF_WORKING; 633 if (sn->sn_flags & SNF_REMOVED) 634 __sigev_free(sn); 635 else if (sn->sn_flags & SNF_ONESHOT) 636 __sigev_delete_node(sn); 637 } else { 638 LIST_INSERT_HEAD(&sigev_worker_ready, worker, sw_link); 639 worker->sw_flags |= SWF_READYQ; 640 clock_gettime(CLOCK_REALTIME, &ts); 641 ts.tv_sec += SIGEV_WORKER_IDLE; 642 ret = _pthread_cond_timedwait(&worker->sw_cv, 643 sigev_list_mtx, &ts); 644 if (ret == ETIMEDOUT) { 645 /* 646 * If we were timeouted and there is nothing 647 * to do, exit the thread. 648 */ 649 if (TAILQ_EMPTY(&sigev_actq) && 650 (worker->sw_flags & SWF_READYQ) && 651 sigev_worker_count > sigev_worker_low) 652 goto out; 653 } 654 } 655 } 656out: 657 if (worker->sw_flags & SWF_READYQ) { 658 LIST_REMOVE(worker, sw_link); 659 worker->sw_flags &= ~SWF_READYQ; 660 } 661 sigev_worker_count--; 662 __sigev_list_unlock(); 663 _pthread_cond_destroy(&worker->sw_cv); 664 free(worker); | |
665 return (0); 666} 667 | 447 return (0); 448} 449 |
450/* clean up a notification after dispatch. */ |
|
668static void 669worker_cleanup(void *arg) 670{ | 451static void 452worker_cleanup(void *arg) 453{ |
671 struct sigev_worker *worker = arg; 672 struct sigev_node *sn; | 454 struct sigev_node *sn = arg; |
673 | 455 |
674 sn = worker->sw_sn; | |
675 __sigev_list_lock(); | 456 __sigev_list_lock(); |
676 sn->sn_flags &= ~SNF_WORKING; | 457 if (sn->sn_flags & SNF_SYNC) { 458 sn->sn_tn->tn_cur = NULL; 459 _pthread_cond_broadcast(&sn->sn_tn->tn_cv); 460 } |
677 if (sn->sn_flags & SNF_REMOVED) 678 __sigev_free(sn); | 461 if (sn->sn_flags & SNF_REMOVED) 462 __sigev_free(sn); |
679 else if (sn->sn_flags & SNF_ONESHOT) 680 __sigev_delete_node(sn); 681 sigev_worker_count--; | 463 else 464 sn->sn_flags &= ~SNF_WORKING; |
682 __sigev_list_unlock(); | 465 __sigev_list_unlock(); |
683 _pthread_cond_destroy(&worker->sw_cv); 684 free(worker); | |
685} | 466} |