sigev_thread.c (156192) | sigev_thread.c (156267) |
---|---|
1/* 2 * Copyright (c) 2005 David Xu <davidxu@freebsd.org> 3 * All rights reserved. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions 7 * are met: 8 * 1. Redistributions of source code must retain the above copyright --- 9 unchanged lines hidden (view full) --- 18 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, 19 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 20 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 21 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 22 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 23 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF 24 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 25 * | 1/* 2 * Copyright (c) 2005 David Xu <davidxu@freebsd.org> 3 * All rights reserved. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions 7 * are met: 8 * 1. Redistributions of source code must retain the above copyright --- 9 unchanged lines hidden (view full) --- 18 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, 19 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 20 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 21 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 22 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 23 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF 24 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 25 * |
26 * $FreeBSD: head/lib/librt/sigev_thread.c 156192 2006-03-01 23:38:53Z davidxu $ | 26 * $FreeBSD: head/lib/librt/sigev_thread.c 156267 2006-03-04 00:18:19Z davidxu $ |
27 * 28 */ 29 30#include <sys/types.h> 31#include <machine/atomic.h> 32 33#include "namespace.h" | 27 * 28 */ 29 30#include <sys/types.h> 31#include <machine/atomic.h> 32 33#include "namespace.h" |
34#include <assert.h> |
|
34#include <err.h> | 35#include <err.h> |
36#include <errno.h> |
|
35#include <ucontext.h> 36#include <sys/thr.h> | 37#include <ucontext.h> 38#include <sys/thr.h> |
39#include <sys/time.h> |
|
37#include <stdio.h> 38#include <stdlib.h> 39#include <string.h> 40#include <signal.h> 41#include <pthread.h> 42#include "un-namespace.h" 43 44#include "sigev_thread.h" 45 | 40#include <stdio.h> 41#include <stdlib.h> 42#include <string.h> 43#include <signal.h> 44#include <pthread.h> 45#include "un-namespace.h" 46 47#include "sigev_thread.h" 48 |
49/* Lowest number of worker threads should be kept. */ 50#define SIGEV_WORKER_LOW 0 51 52/* Highest number of worker threads can be created. */ 53#define SIGEV_WORKER_HIGH 20 54 55/* How long an idle worker thread should stay. */ 56#define SIGEV_WORKER_IDLE 10 57 58struct sigev_worker { 59 LIST_ENTRY(sigev_worker) sw_link; 60 pthread_cond_t sw_cv; 61 struct sigev_node *sw_sn; 62 int sw_flags; 63 int *sw_readyptr; 64}; 65 66#define SWF_READYQ 1 67 |
|
46LIST_HEAD(sigev_list_head, sigev_node); 47#define HASH_QUEUES 17 48#define HASH(t, id) ((((id) << 3) + (t)) % HASH_QUEUES) | 68LIST_HEAD(sigev_list_head, sigev_node); 69#define HASH_QUEUES 17 70#define HASH(t, id) ((((id) << 3) + (t)) % HASH_QUEUES) |
71 |
|
49static struct sigev_list_head sigev_hash[HASH_QUEUES]; 50static struct sigev_list_head sigev_all; | 72static struct sigev_list_head sigev_hash[HASH_QUEUES]; 73static struct sigev_list_head sigev_all; |
74static TAILQ_HEAD(, sigev_node) sigev_actq; 75static TAILQ_HEAD(, sigev_thread_node) sigev_threads; |
|
51static int sigev_generation; 52static pthread_mutex_t *sigev_list_mtx; | 76static int sigev_generation; 77static pthread_mutex_t *sigev_list_mtx; |
53static TAILQ_HEAD(,sigev_thread_node) sigev_threads; | 78static pthread_once_t sigev_once = PTHREAD_ONCE_INIT; 79static pthread_once_t sigev_once_default = PTHREAD_ONCE_INIT; |
54static pthread_mutex_t *sigev_threads_mtx; | 80static pthread_mutex_t *sigev_threads_mtx; |
81static pthread_cond_t *sigev_threads_cv; 82static pthread_cond_t *sigev_actq_cv; 83static struct sigev_thread_node *sigev_default_thread; 84static struct sigev_thread_attr sigev_default_sna; |
|
55static pthread_attr_t sigev_default_attr; | 85static pthread_attr_t sigev_default_attr; |
56static pthread_once_t sigev_once = PTHREAD_ONCE_INIT; | 86static int atfork_registered; 87static LIST_HEAD(,sigev_worker) sigev_worker_ready; 88static int sigev_worker_count; 89static int sigev_worker_start; 90static int sigev_worker_high; 91static int sigev_worker_low; 92static pthread_cond_t *sigev_worker_init_cv; |
57 58static void __sigev_fork_prepare(void); 59static void __sigev_fork_parent(void); 60static void __sigev_fork_child(void); | 93 94static void __sigev_fork_prepare(void); 95static void __sigev_fork_parent(void); 96static void __sigev_fork_child(void); |
61static struct sigev_thread_node *sigev_thread_create(pthread_attr_t *); | 97static struct sigev_thread_node *sigev_thread_create(pthread_attr_t *, 98 struct sigev_thread_node *, int); |
62static void *sigev_service_loop(void *); | 99static void *sigev_service_loop(void *); |
100static void *sigev_worker_routine(void *); 101static void sigev_put(struct sigev_node *); 102static void worker_cleanup(void *arg); |
|
63 64#pragma weak pthread_create 65#pragma weak pthread_attr_getschedpolicy 66#pragma weak pthread_attr_getinheritsched 67#pragma weak pthread_attr_getschedparam 68#pragma weak pthread_attr_getscope 69#pragma weak pthread_attr_getstacksize 70#pragma weak pthread_attr_getstackaddr --- 11 unchanged lines hidden (view full) --- 82attr2sna(pthread_attr_t *attr, struct sigev_thread_attr *sna) 83{ 84 struct sched_param sched_param; 85 86 pthread_attr_getschedpolicy(attr, &sna->sna_policy); 87 pthread_attr_getinheritsched(attr, &sna->sna_inherit); 88 pthread_attr_getschedparam(attr, &sched_param); 89 sna->sna_prio = sched_param.sched_priority; | 103 104#pragma weak pthread_create 105#pragma weak pthread_attr_getschedpolicy 106#pragma weak pthread_attr_getinheritsched 107#pragma weak pthread_attr_getschedparam 108#pragma weak pthread_attr_getscope 109#pragma weak pthread_attr_getstacksize 110#pragma weak pthread_attr_getstackaddr --- 11 unchanged lines hidden (view full) --- 122attr2sna(pthread_attr_t *attr, struct sigev_thread_attr *sna) 123{ 124 struct sched_param sched_param; 125 126 pthread_attr_getschedpolicy(attr, &sna->sna_policy); 127 pthread_attr_getinheritsched(attr, &sna->sna_inherit); 128 pthread_attr_getschedparam(attr, &sched_param); 129 sna->sna_prio = sched_param.sched_priority; |
90 pthread_attr_getscope(attr, &sna->sna_scope); | |
91 pthread_attr_getstacksize(attr, &sna->sna_stacksize); 92 pthread_attr_getstackaddr(attr, &sna->sna_stackaddr); 93 pthread_attr_getguardsize(attr, &sna->sna_guardsize); 94} 95 96static __inline int 97sna_eq(const struct sigev_thread_attr *a, const struct sigev_thread_attr *b) 98{ 99 return memcmp(a, b, sizeof(*a)) == 0; 100} 101 102static __inline int 103have_threads(void) 104{ | 130 pthread_attr_getstacksize(attr, &sna->sna_stacksize); 131 pthread_attr_getstackaddr(attr, &sna->sna_stackaddr); 132 pthread_attr_getguardsize(attr, &sna->sna_guardsize); 133} 134 135static __inline int 136sna_eq(const struct sigev_thread_attr *a, const struct sigev_thread_attr *b) 137{ 138 return memcmp(a, b, sizeof(*a)) == 0; 139} 140 141static __inline int 142have_threads(void) 143{ |
105 return (pthread_create != NULL); | 144 return (&pthread_create != NULL); |
106} 107 108void 109__sigev_thread_init(void) 110{ | 145} 146 147void 148__sigev_thread_init(void) 149{ |
111 static int notfirst = 0; | 150 static int inited = 0; |
112 int i; 113 114 sigev_list_mtx = malloc(sizeof(pthread_mutex_t)); 115 _pthread_mutex_init(sigev_list_mtx, NULL); 116 sigev_threads_mtx = malloc(sizeof(pthread_mutex_t)); 117 _pthread_mutex_init(sigev_threads_mtx, NULL); | 151 int i; 152 153 sigev_list_mtx = malloc(sizeof(pthread_mutex_t)); 154 _pthread_mutex_init(sigev_list_mtx, NULL); 155 sigev_threads_mtx = malloc(sizeof(pthread_mutex_t)); 156 _pthread_mutex_init(sigev_threads_mtx, NULL); |
157 sigev_threads_cv = malloc(sizeof(pthread_cond_t)); 158 _pthread_cond_init(sigev_threads_cv, NULL); 159 sigev_actq_cv = malloc(sizeof(pthread_cond_t)); 160 _pthread_cond_init(sigev_actq_cv, NULL); |
|
118 for (i = 0; i < HASH_QUEUES; ++i) 119 LIST_INIT(&sigev_hash[i]); 120 LIST_INIT(&sigev_all); 121 TAILQ_INIT(&sigev_threads); | 161 for (i = 0; i < HASH_QUEUES; ++i) 162 LIST_INIT(&sigev_hash[i]); 163 LIST_INIT(&sigev_all); 164 TAILQ_INIT(&sigev_threads); |
122 if (!notfirst) { | 165 TAILQ_INIT(&sigev_actq); 166 sigev_default_thread = NULL; 167 sigev_worker_count = 0; 168 sigev_worker_start = 0; 169 LIST_INIT(&sigev_worker_ready); 170 sigev_worker_high = SIGEV_WORKER_HIGH; 171 sigev_worker_low = SIGEV_WORKER_LOW; 172 sigev_worker_init_cv = malloc(sizeof(pthread_cond_t)); 173 _pthread_cond_init(sigev_worker_init_cv, NULL); 174 if (atfork_registered == 0) { 175 pthread_atfork( 176 __sigev_fork_prepare, 177 __sigev_fork_parent, 178 __sigev_fork_child); 179 atfork_registered = 1; 180 } 181 if (!inited) { |
123 pthread_attr_init(&sigev_default_attr); | 182 pthread_attr_init(&sigev_default_attr); |
124 pthread_attr_setscope(&sigev_default_attr, PTHREAD_SCOPE_SYSTEM); | 183 attr2sna(&sigev_default_attr, &sigev_default_sna); 184 pthread_attr_setscope(&sigev_default_attr, 185 PTHREAD_SCOPE_SYSTEM); |
125 pthread_attr_setdetachstate(&sigev_default_attr, 126 PTHREAD_CREATE_DETACHED); | 186 pthread_attr_setdetachstate(&sigev_default_attr, 187 PTHREAD_CREATE_DETACHED); |
127 pthread_atfork(__sigev_fork_prepare, __sigev_fork_parent, 128 __sigev_fork_child); 129 notfirst = 1; | 188 inited = 1; |
130 } | 189 } |
190 sigev_default_thread = sigev_thread_create(NULL, NULL, 0); |
|
131} 132 133int 134__sigev_check_init(void) 135{ 136 if (!have_threads()) 137 return (-1); 138 139 _pthread_once(&sigev_once, __sigev_thread_init); | 191} 192 193int 194__sigev_check_init(void) 195{ 196 if (!have_threads()) 197 return (-1); 198 199 _pthread_once(&sigev_once, __sigev_thread_init); |
140 return (0); | 200 return (sigev_default_thread != NULL) ? 0 : -1; |
141} 142 | 201} 202 |
143void | 203static void |
144__sigev_fork_prepare(void) 145{ | 204__sigev_fork_prepare(void) 205{ |
146 __sigev_thread_list_lock(); | |
147} 148 | 206} 207 |
149void | 208static void |
150__sigev_fork_parent(void) 151{ | 209__sigev_fork_parent(void) 210{ |
152 __sigev_thread_list_unlock(); | |
153} 154 | 211} 212 |
155void | 213static void |
156__sigev_fork_child(void) 157{ | 214__sigev_fork_child(void) 215{ |
216 /* 217 * This is a hack, the thread libraries really should 218 * check if the handlers were already registered in 219 * pthread_atfork(). 220 */ 221 atfork_registered = 1; 222 memcpy(&sigev_once, &sigev_once_default, sizeof(sigev_once)); |
|
158 __sigev_thread_init(); 159} 160 161int 162__sigev_list_lock(void) 163{ 164 return _pthread_mutex_lock(sigev_list_mtx); 165} --- 12 unchanged lines hidden (view full) --- 178 179int 180__sigev_thread_list_unlock(void) 181{ 182 return _pthread_mutex_unlock(sigev_threads_mtx); 183} 184 185struct sigev_node * | 223 __sigev_thread_init(); 224} 225 226int 227__sigev_list_lock(void) 228{ 229 return _pthread_mutex_lock(sigev_list_mtx); 230} --- 12 unchanged lines hidden (view full) --- 243 244int 245__sigev_thread_list_unlock(void) 246{ 247 return _pthread_mutex_unlock(sigev_threads_mtx); 248} 249 250struct sigev_node * |
186__sigev_alloc(int type, const struct sigevent *evp) | 251__sigev_alloc(int type, const struct sigevent *evp, struct sigev_node *prev, 252 int usethreadpool) |
187{ 188 struct sigev_node *sn; 189 190 sn = calloc(1, sizeof(*sn)); 191 if (sn != NULL) { 192 sn->sn_value = evp->sigev_value; 193 sn->sn_func = evp->sigev_notify_function; 194 sn->sn_gen = atomic_fetchadd_int(&sigev_generation, 1); 195 sn->sn_type = type; | 253{ 254 struct sigev_node *sn; 255 256 sn = calloc(1, sizeof(*sn)); 257 if (sn != NULL) { 258 sn->sn_value = evp->sigev_value; 259 sn->sn_func = evp->sigev_notify_function; 260 sn->sn_gen = atomic_fetchadd_int(&sigev_generation, 1); 261 sn->sn_type = type; |
196 sn->sn_tn = sigev_thread_create(evp->sigev_notify_attributes); | 262 if (usethreadpool) 263 sn->sn_flags |= SNF_THREADPOOL; 264 sn->sn_tn = sigev_thread_create(evp->sigev_notify_attributes, 265 prev ? prev->sn_tn : NULL, usethreadpool); |
197 if (sn->sn_tn == NULL) { 198 free(sn); 199 sn = NULL; 200 } 201 } 202 return (sn); 203} 204 --- 52 unchanged lines hidden (view full) --- 257} 258 259int 260__sigev_delete_node(struct sigev_node *sn) 261{ 262 LIST_REMOVE(sn, sn_link); 263 LIST_REMOVE(sn, sn_allist); 264 | 266 if (sn->sn_tn == NULL) { 267 free(sn); 268 sn = NULL; 269 } 270 } 271 return (sn); 272} 273 --- 52 unchanged lines hidden (view full) --- 326} 327 328int 329__sigev_delete_node(struct sigev_node *sn) 330{ 331 LIST_REMOVE(sn, sn_link); 332 LIST_REMOVE(sn, sn_allist); 333 |
334 __sigev_thread_list_lock(); 335 if (--sn->sn_tn->tn_refcount == 0) 336 if (!(sn->sn_flags & SNF_THREADPOOL)) 337 pthread_kill(sn->sn_tn->tn_thread, SIGEV_SIGSERVICE); 338 __sigev_thread_list_unlock(); |
|
265 if (sn->sn_flags & SNF_WORKING) 266 sn->sn_flags |= SNF_REMOVED; | 339 if (sn->sn_flags & SNF_WORKING) 340 sn->sn_flags |= SNF_REMOVED; |
267 else | 341 else { 342 if (sn->sn_flags & SNF_ACTQ) { 343 TAILQ_REMOVE(&sigev_actq, sn, sn_actq); 344 } |
268 __sigev_free(sn); | 345 __sigev_free(sn); |
346 } |
|
269 return (0); 270} 271 272static 273sigev_id_t 274sigev_get_id(siginfo_t *si) 275{ 276 switch(si->si_code) { 277 case SI_TIMER: 278 return (si->si_timerid); 279 case SI_MESGQ: 280 return (si->si_mqd); 281 case SI_ASYNCIO: 282 return (sigev_id_t)si->si_value.sival_ptr; | 347 return (0); 348} 349 350static 351sigev_id_t 352sigev_get_id(siginfo_t *si) 353{ 354 switch(si->si_code) { 355 case SI_TIMER: 356 return (si->si_timerid); 357 case SI_MESGQ: 358 return (si->si_mqd); 359 case SI_ASYNCIO: 360 return (sigev_id_t)si->si_value.sival_ptr; |
283 default: 284 warnx("%s %s : unknown si_code %d\n", __FILE__, __func__, 285 si->si_code); | |
286 } 287 return (-1); 288} 289 290static struct sigev_thread_node * | 361 } 362 return (-1); 363} 364 365static struct sigev_thread_node * |
291sigev_thread_create(pthread_attr_t *pattr) | 366sigev_thread_create(pthread_attr_t *pattr, struct sigev_thread_node *prev, 367 int usepool) |
292{ 293 struct sigev_thread_node *tn; 294 struct sigev_thread_attr sna; 295 sigset_t set; 296 int ret; 297 298 if (pattr == NULL) 299 pattr = &sigev_default_attr; 300 else { 301 pthread_attr_setscope(pattr, PTHREAD_SCOPE_SYSTEM); 302 pthread_attr_setdetachstate(pattr, PTHREAD_CREATE_DETACHED); 303 } 304 305 attr2sna(pattr, &sna); 306 307 __sigev_thread_list_lock(); | 368{ 369 struct sigev_thread_node *tn; 370 struct sigev_thread_attr sna; 371 sigset_t set; 372 int ret; 373 374 if (pattr == NULL) 375 pattr = &sigev_default_attr; 376 else { 377 pthread_attr_setscope(pattr, PTHREAD_SCOPE_SYSTEM); 378 pthread_attr_setdetachstate(pattr, PTHREAD_CREATE_DETACHED); 379 } 380 381 attr2sna(pattr, &sna); 382 383 __sigev_thread_list_lock(); |
308 /* Search a thread matching the required pthread_attr. */ 309 TAILQ_FOREACH(tn, &sigev_threads, tn_link) { 310 if (sna.sna_stackaddr == NULL) { 311 if (sna_eq(&tn->tn_sna, &sna)) 312 break; 313 } else { 314 /* 315 * Reuse the thread if it has same stack address, 316 * because two threads can not run on same stack. 317 */ | 384 385 if (prev != NULL && sna_eq(&prev->tn_sna, &sna)) { 386 prev->tn_refcount++; 387 __sigev_thread_list_unlock(); 388 return (prev); 389 } 390 391 if (sna_eq(&sna, &sigev_default_sna) && usepool && 392 sigev_default_thread != NULL) { 393 sigev_default_thread->tn_refcount++; 394 __sigev_thread_list_unlock(); 395 return (sigev_default_thread); 396 } 397 398 tn = NULL; 399 /* Search a thread matching the required stack address */ 400 if (sna.sna_stackaddr != NULL) { 401 TAILQ_FOREACH(tn, &sigev_threads, tn_link) { |
318 if (sna.sna_stackaddr == tn->tn_sna.sna_stackaddr) 319 break; 320 } 321 } | 402 if (sna.sna_stackaddr == tn->tn_sna.sna_stackaddr) 403 break; 404 } 405 } |
406 |
|
322 if (tn != NULL) { | 407 if (tn != NULL) { |
408 tn->tn_refcount++; |
|
323 __sigev_thread_list_unlock(); 324 return (tn); 325 } 326 tn = malloc(sizeof(*tn)); 327 tn->tn_sna = sna; 328 tn->tn_cur = NULL; | 409 __sigev_thread_list_unlock(); 410 return (tn); 411 } 412 tn = malloc(sizeof(*tn)); 413 tn->tn_sna = sna; 414 tn->tn_cur = NULL; |
415 tn->tn_lwpid = -1; 416 tn->tn_refcount = 1; |
|
329 TAILQ_INSERT_TAIL(&sigev_threads, tn, tn_link); 330 sigemptyset(&set); 331 sigaddset(&set, SIGEV_SIGSERVICE); 332 _sigprocmask(SIG_BLOCK, &set, NULL); | 417 TAILQ_INSERT_TAIL(&sigev_threads, tn, tn_link); 418 sigemptyset(&set); 419 sigaddset(&set, SIGEV_SIGSERVICE); 420 _sigprocmask(SIG_BLOCK, &set, NULL); |
333 _pthread_cond_init(&tn->tn_cv, NULL); | |
334 ret = pthread_create(&tn->tn_thread, pattr, sigev_service_loop, tn); 335 _sigprocmask(SIG_UNBLOCK, &set, NULL); 336 if (ret != 0) { 337 TAILQ_REMOVE(&sigev_threads, tn, tn_link); 338 __sigev_thread_list_unlock(); | 421 ret = pthread_create(&tn->tn_thread, pattr, sigev_service_loop, tn); 422 _sigprocmask(SIG_UNBLOCK, &set, NULL); 423 if (ret != 0) { 424 TAILQ_REMOVE(&sigev_threads, tn, tn_link); 425 __sigev_thread_list_unlock(); |
339 _pthread_cond_destroy(&tn->tn_cv); | |
340 free(tn); 341 tn = NULL; 342 } else { 343 /* wait the thread to get its lwpid */ | 426 free(tn); 427 tn = NULL; 428 } else { 429 /* wait the thread to get its lwpid */ |
344 _pthread_cond_wait(&tn->tn_cv, sigev_threads_mtx); | 430 while (tn->tn_lwpid == -1) 431 _pthread_cond_wait(sigev_threads_cv, sigev_threads_mtx); |
345 __sigev_thread_list_unlock(); 346 } 347 return (tn); 348} 349 350static void 351after_dispatch(struct sigev_thread_node *tn) 352{ --- 4 unchanged lines hidden (view full) --- 357 sn->sn_flags &= ~SNF_WORKING; 358 if (sn->sn_flags & SNF_REMOVED) 359 __sigev_free(sn); 360 else if (sn->sn_flags & SNF_ONESHOT) 361 __sigev_delete_node(sn); 362 tn->tn_cur = NULL; 363 __sigev_list_unlock(); 364 } | 432 __sigev_thread_list_unlock(); 433 } 434 return (tn); 435} 436 437static void 438after_dispatch(struct sigev_thread_node *tn) 439{ --- 4 unchanged lines hidden (view full) --- 444 sn->sn_flags &= ~SNF_WORKING; 445 if (sn->sn_flags & SNF_REMOVED) 446 __sigev_free(sn); 447 else if (sn->sn_flags & SNF_ONESHOT) 448 __sigev_delete_node(sn); 449 tn->tn_cur = NULL; 450 __sigev_list_unlock(); 451 } |
452 tn->tn_cur = NULL; |
|
365} 366 367/* 368 * This function is called if user callback calls 369 * pthread_exit() or pthread_cancel() for the thread. 370 */ 371static void 372thread_cleanup(void *arg) 373{ 374 struct sigev_thread_node *tn = arg; 375 376 fprintf(stderr, "Dangerous Robinson, calling pthread_exit() from " 377 "SIGEV_THREAD is undefined."); 378 after_dispatch(tn); 379 /* longjmp(tn->tn_jbuf, 1); */ 380 abort(); 381} 382 | 453} 454 455/* 456 * This function is called if user callback calls 457 * pthread_exit() or pthread_cancel() for the thread. 458 */ 459static void 460thread_cleanup(void *arg) 461{ 462 struct sigev_thread_node *tn = arg; 463 464 fprintf(stderr, "Dangerous Robinson, calling pthread_exit() from " 465 "SIGEV_THREAD is undefined."); 466 after_dispatch(tn); 467 /* longjmp(tn->tn_jbuf, 1); */ 468 abort(); 469} 470 |
471/* 472 * Main notification dispatch function, the function either 473 * run user callback by itself or hand off the notifications 474 * to worker threads depend on flags. 475 */ |
|
383static void * 384sigev_service_loop(void *arg) 385{ 386 siginfo_t si; 387 sigset_t set; 388 struct sigev_thread_node *tn; 389 struct sigev_node *sn; 390 sigev_id_t id; | 476static void * 477sigev_service_loop(void *arg) 478{ 479 siginfo_t si; 480 sigset_t set; 481 struct sigev_thread_node *tn; 482 struct sigev_node *sn; 483 sigev_id_t id; |
484 int ret; |
|
391 392 tn = arg; 393 thr_self(&tn->tn_lwpid); | 485 486 tn = arg; 487 thr_self(&tn->tn_lwpid); |
394 __sigev_list_lock(); 395 _pthread_cond_broadcast(&tn->tn_cv); 396 __sigev_list_unlock(); | 488 __sigev_thread_list_lock(); 489 _pthread_cond_broadcast(sigev_threads_cv); 490 __sigev_thread_list_unlock(); |
397 398 /* 399 * Service thread should not be killed by callback, if user 400 * attempts to do so, the thread will be restarted. 401 */ 402 setjmp(tn->tn_jbuf); 403 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); 404 sigemptyset(&set); 405 sigaddset(&set, SIGEV_SIGSERVICE); 406 pthread_cleanup_push(thread_cleanup, tn); 407 for (;;) { | 491 492 /* 493 * Service thread should not be killed by callback, if user 494 * attempts to do so, the thread will be restarted. 495 */ 496 setjmp(tn->tn_jbuf); 497 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); 498 sigemptyset(&set); 499 sigaddset(&set, SIGEV_SIGSERVICE); 500 pthread_cleanup_push(thread_cleanup, tn); 501 for (;;) { |
408 if (__predict_false(sigwaitinfo(&set, &si) == -1)) | 502 ret = sigwaitinfo(&set, &si); 503 __sigev_thread_list_lock(); 504 if (tn->tn_refcount == 0) { 505 TAILQ_REMOVE(&sigev_threads, tn, tn_link); 506 __sigev_thread_list_unlock(); 507 free(tn); 508 break; 509 } 510 __sigev_thread_list_unlock(); 511 if (ret == -1) |
409 continue; | 512 continue; |
410 | |
411 id = sigev_get_id(&si); 412 __sigev_list_lock(); 413 sn = __sigev_find(si.si_code, id); 414 if (sn != NULL) { | 513 id = sigev_get_id(&si); 514 __sigev_list_lock(); 515 sn = __sigev_find(si.si_code, id); 516 if (sn != NULL) { |
415 tn->tn_cur = sn; 416 sn->sn_flags |= SNF_WORKING; 417 __sigev_list_unlock(); 418 sn->sn_dispatch(sn, &si); 419 after_dispatch(tn); 420 } else { | 517 sn->sn_info = si; 518 if (!(sn->sn_flags & SNF_THREADPOOL)) { 519 tn->tn_cur = sn; 520 sn->sn_flags |= SNF_WORKING; 521 __sigev_list_unlock(); 522 sn->sn_dispatch(sn); 523 after_dispatch(tn); 524 } else { 525 assert(!(sn->sn_flags & SNF_ACTQ)); 526 sigev_put(sn); 527 } 528 } else { |
421 tn->tn_cur = NULL; 422 __sigev_list_unlock(); 423 } 424 } 425 pthread_cleanup_pop(0); 426 return (0); 427} | 529 tn->tn_cur = NULL; 530 __sigev_list_unlock(); 531 } 532 } 533 pthread_cleanup_pop(0); 534 return (0); 535} |
536 537/* 538 * Hand off notifications to worker threads. 539 * 540 * prerequist: sigev list locked. 541 */ 542static void 543sigev_put(struct sigev_node *sn) 544{ 545 struct sigev_worker *worker; 546 pthread_t td; 547 int ret, ready; 548 549 TAILQ_INSERT_TAIL(&sigev_actq, sn, sn_actq); 550 sn->sn_flags |= SNF_ACTQ; 551 /* 552 * check if we should add more worker threads unless quota is hit. 553 */ 554 if (LIST_EMPTY(&sigev_worker_ready) && 555 sigev_worker_count + sigev_worker_start < sigev_worker_high) { 556 sigev_worker_start++; 557 __sigev_list_unlock(); 558 worker = malloc(sizeof(*worker)); 559 _pthread_cond_init(&worker->sw_cv, NULL); 560 worker->sw_flags = 0; 561 worker->sw_sn = 0; 562 worker->sw_readyptr = &ready; 563 ready = 0; 564 ret = pthread_create(&td, &sigev_default_attr, 565 sigev_worker_routine, worker); 566 if (ret) { 567 warnc(ret, "%s:%s can not create worker thread", 568 __FILE__, __func__); 569 __sigev_list_lock(); 570 sigev_worker_start--; 571 __sigev_list_unlock(); 572 } else { 573 __sigev_list_lock(); 574 while (ready == 0) { 575 _pthread_cond_wait(sigev_worker_init_cv, 576 sigev_list_mtx); 577 } 578 __sigev_list_unlock(); 579 } 580 } else { 581 worker = LIST_FIRST(&sigev_worker_ready); 582 if (worker) { 583 LIST_REMOVE(worker, sw_link); 584 worker->sw_flags &= ~SWF_READYQ; 585 _pthread_cond_broadcast(&worker->sw_cv); 586 __sigev_list_unlock(); 587 } 588 } 589} 590 591/* 592 * Background thread to dispatch notification to user code. 593 * These threads are not bound to any realtime objects. 594 */ 595static void * 596sigev_worker_routine(void *arg) 597{ 598 struct sigev_worker *worker; 599 struct sigev_node *sn; 600 struct timespec ts; 601 int ret; 602 603 worker = arg; 604 __sigev_list_lock(); 605 sigev_worker_count++; 606 sigev_worker_start--; 607 (*(worker->sw_readyptr))++; 608 LIST_INSERT_HEAD(&sigev_worker_ready, worker, sw_link); 609 worker->sw_flags |= SWF_READYQ; 610 _pthread_cond_broadcast(sigev_worker_init_cv); 611 612 for (;;) { 613 if (worker->sw_flags & SWF_READYQ) { 614 LIST_REMOVE(worker, sw_link); 615 worker->sw_flags &= ~SWF_READYQ; 616 } 617 618 sn = TAILQ_FIRST(&sigev_actq); 619 if (sn != NULL) { 620 TAILQ_REMOVE(&sigev_actq, sn, sn_actq); 621 sn->sn_flags &= ~SNF_ACTQ; 622 sn->sn_flags |= SNF_WORKING; 623 __sigev_list_unlock(); 624 625 worker->sw_sn = sn; 626 pthread_cleanup_push(worker_cleanup, worker); 627 sn->sn_dispatch(sn); 628 pthread_cleanup_pop(0); 629 worker->sw_sn = NULL; 630 631 __sigev_list_lock(); 632 sn->sn_flags &= ~SNF_WORKING; 633 if (sn->sn_flags & SNF_REMOVED) 634 __sigev_free(sn); 635 else if (sn->sn_flags & SNF_ONESHOT) 636 __sigev_delete_node(sn); 637 } else { 638 LIST_INSERT_HEAD(&sigev_worker_ready, worker, sw_link); 639 worker->sw_flags |= SWF_READYQ; 640 clock_gettime(CLOCK_REALTIME, &ts); 641 ts.tv_sec += SIGEV_WORKER_IDLE; 642 ret = _pthread_cond_timedwait(&worker->sw_cv, 643 sigev_list_mtx, &ts); 644 if (ret == ETIMEDOUT) { 645 /* 646 * If we were timeouted and there is nothing 647 * to do, exit the thread. 648 */ 649 if (TAILQ_EMPTY(&sigev_actq) && 650 (worker->sw_flags & SWF_READYQ) && 651 sigev_worker_count > sigev_worker_low) 652 goto out; 653 } 654 } 655 } 656out: 657 if (worker->sw_flags & SWF_READYQ) { 658 LIST_REMOVE(worker, sw_link); 659 worker->sw_flags &= ~SWF_READYQ; 660 } 661 sigev_worker_count--; 662 __sigev_list_unlock(); 663 _pthread_cond_destroy(&worker->sw_cv); 664 free(worker); 665 return (0); 666} 667 668static void 669worker_cleanup(void *arg) 670{ 671 struct sigev_worker *worker = arg; 672 struct sigev_node *sn; 673 674 sn = worker->sw_sn; 675 __sigev_list_lock(); 676 sn->sn_flags &= ~SNF_WORKING; 677 if (sn->sn_flags & SNF_REMOVED) 678 __sigev_free(sn); 679 else if (sn->sn_flags & SNF_ONESHOT) 680 __sigev_delete_node(sn); 681 sigev_worker_count--; 682 __sigev_list_unlock(); 683 _pthread_cond_destroy(&worker->sw_cv); 684 free(worker); 685} |
|