sigev_thread.c revision 156383
1/* 2 * Copyright (c) 2005 David Xu <davidxu@freebsd.org> 3 * All rights reserved. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions 7 * are met: 8 * 1. Redistributions of source code must retain the above copyright 9 * notice unmodified, this list of conditions, and the following 10 * disclaimer. 11 * 2. Redistributions in binary form must reproduce the above copyright 12 * notice, this list of conditions and the following disclaimer in the 13 * documentation and/or other materials provided with the distribution. 14 * 15 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR 16 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 17 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 18 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, 19 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 20 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 21 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 22 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 23 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF 24 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 25 * 26 * $FreeBSD: head/lib/librt/sigev_thread.c 156383 2006-03-07 08:28:07Z davidxu $ 27 * 28 */ 29 30#include <sys/types.h> 31#include <machine/atomic.h> 32 33#include "namespace.h" 34#include <err.h> 35#include <errno.h> 36#include <ucontext.h> 37#include <sys/thr.h> 38#include <stdio.h> 39#include <stdlib.h> 40#include <string.h> 41#include <signal.h> 42#include <pthread.h> 43#include "un-namespace.h" 44 45#include "sigev_thread.h" 46 47LIST_HEAD(sigev_list_head, sigev_node); 48#define HASH_QUEUES 17 49#define HASH(t, id) ((((id) << 3) + (t)) % HASH_QUEUES) 50 51static struct sigev_list_head sigev_hash[HASH_QUEUES]; 52static struct sigev_list_head sigev_all; 53static LIST_HEAD(,sigev_thread) sigev_threads; 54static int sigev_generation; 55static pthread_mutex_t *sigev_list_mtx; 56static pthread_once_t sigev_once = PTHREAD_ONCE_INIT; 57static pthread_once_t sigev_once_default = PTHREAD_ONCE_INIT; 58static struct sigev_thread *sigev_default_thread; 59static pthread_attr_t sigev_default_attr; 60static int atfork_registered; 61 62static void __sigev_fork_prepare(void); 63static void __sigev_fork_parent(void); 64static void __sigev_fork_child(void); 65static struct sigev_thread *sigev_thread_create(int); 66static void *sigev_service_loop(void *); 67static void *worker_routine(void *); 68static void worker_cleanup(void *); 69 70#pragma weak pthread_create 71 72static void 73attrcopy(pthread_attr_t *src, pthread_attr_t *dst) 74{ 75 struct sched_param sched; 76 void *a; 77 size_t u; 78 int v; 79 80 _pthread_attr_getschedpolicy(src, &v); 81 _pthread_attr_setschedpolicy(dst, v); 82 83 _pthread_attr_getinheritsched(src, &v); 84 _pthread_attr_setinheritsched(dst, v); 85 86 _pthread_attr_getschedparam(src, &sched); 87 _pthread_attr_setschedparam(dst, &sched); 88 89 _pthread_attr_getscope(src, &v); 90 _pthread_attr_setscope(dst, v); 91 92 _pthread_attr_getstacksize(src, &u); 93 _pthread_attr_setstacksize(dst, u); 94 95 _pthread_attr_getstackaddr(src, &a); 96 _pthread_attr_setstackaddr(src, a); 97 98 _pthread_attr_getguardsize(src, &u); 99 _pthread_attr_setguardsize(dst, u); 100} 101 102static __inline int 103have_threads(void) 104{ 105 return (&pthread_create != NULL); 106} 107 108void 109__sigev_thread_init(void) 110{ 111 static int inited = 0; 112 pthread_mutexattr_t mattr; 113 int i; 114 115 _pthread_mutexattr_init(&mattr); 116 _pthread_mutexattr_settype(&mattr, PTHREAD_MUTEX_NORMAL); 117 sigev_list_mtx = malloc(sizeof(pthread_mutex_t)); 118 _pthread_mutex_init(sigev_list_mtx, &mattr); 119 _pthread_mutexattr_destroy(&mattr); 120 121 for (i = 0; i < HASH_QUEUES; ++i) 122 LIST_INIT(&sigev_hash[i]); 123 LIST_INIT(&sigev_all); 124 LIST_INIT(&sigev_threads); 125 sigev_default_thread = NULL; 126 if (atfork_registered == 0) { 127 _pthread_atfork( 128 __sigev_fork_prepare, 129 __sigev_fork_parent, 130 __sigev_fork_child); 131 atfork_registered = 1; 132 } 133 if (!inited) { 134 _pthread_attr_init(&sigev_default_attr); 135 _pthread_attr_setscope(&sigev_default_attr, 136 PTHREAD_SCOPE_SYSTEM); 137 _pthread_attr_setdetachstate(&sigev_default_attr, 138 PTHREAD_CREATE_DETACHED); 139 inited = 1; 140 } 141 sigev_default_thread = sigev_thread_create(0); 142} 143 144int 145__sigev_check_init(void) 146{ 147 if (!have_threads()) 148 return (-1); 149 150 _pthread_once(&sigev_once, __sigev_thread_init); 151 return (sigev_default_thread != NULL) ? 0 : -1; 152} 153 154static void 155__sigev_fork_prepare(void) 156{ 157} 158 159static void 160__sigev_fork_parent(void) 161{ 162} 163 164static void 165__sigev_fork_child(void) 166{ 167 /* 168 * This is a hack, the thread libraries really should 169 * check if the handlers were already registered in 170 * pthread_atfork(). 171 */ 172 atfork_registered = 1; 173 memcpy(&sigev_once, &sigev_once_default, sizeof(sigev_once)); 174 __sigev_thread_init(); 175} 176 177void 178__sigev_list_lock(void) 179{ 180 _pthread_mutex_lock(sigev_list_mtx); 181} 182 183void 184__sigev_list_unlock(void) 185{ 186 _pthread_mutex_unlock(sigev_list_mtx); 187} 188 189struct sigev_node * 190__sigev_alloc(int type, const struct sigevent *evp, struct sigev_node *prev, 191 int usedefault) 192{ 193 struct sigev_node *sn; 194 195 sn = calloc(1, sizeof(*sn)); 196 if (sn != NULL) { 197 sn->sn_value = evp->sigev_value; 198 sn->sn_func = evp->sigev_notify_function; 199 sn->sn_gen = atomic_fetchadd_int(&sigev_generation, 1); 200 sn->sn_type = type; 201 _pthread_attr_init(&sn->sn_attr); 202 _pthread_attr_setdetachstate(&sn->sn_attr, PTHREAD_CREATE_DETACHED); 203 if (evp->sigev_notify_attributes) 204 attrcopy(evp->sigev_notify_attributes, &sn->sn_attr); 205 if (prev) { 206 __sigev_list_lock(); 207 prev->sn_tn->tn_refcount++; 208 __sigev_list_unlock(); 209 sn->sn_tn = prev->sn_tn; 210 } else { 211 sn->sn_tn = sigev_thread_create(usedefault); 212 if (sn->sn_tn == NULL) { 213 _pthread_attr_destroy(&sn->sn_attr); 214 free(sn); 215 sn = NULL; 216 } 217 } 218 } 219 return (sn); 220} 221 222void 223__sigev_get_sigevent(struct sigev_node *sn, struct sigevent *newevp, 224 sigev_id_t id) 225{ 226 /* 227 * Build a new sigevent, and tell kernel to deliver SIGSERVICE 228 * signal to the new thread. 229 */ 230 newevp->sigev_notify = SIGEV_THREAD_ID; 231 newevp->sigev_signo = SIGSERVICE; 232 newevp->sigev_notify_thread_id = (lwpid_t)sn->sn_tn->tn_lwpid; 233 newevp->sigev_value.sival_ptr = (void *)id; 234} 235 236void 237__sigev_free(struct sigev_node *sn) 238{ 239 _pthread_attr_destroy(&sn->sn_attr); 240 free(sn); 241} 242 243struct sigev_node * 244__sigev_find(int type, sigev_id_t id) 245{ 246 struct sigev_node *sn; 247 int chain = HASH(type, id); 248 249 LIST_FOREACH(sn, &sigev_hash[chain], sn_link) { 250 if (sn->sn_type == type && sn->sn_id == id) 251 break; 252 } 253 return (sn); 254} 255 256int 257__sigev_register(struct sigev_node *sn) 258{ 259 int chain = HASH(sn->sn_type, sn->sn_id); 260 261 LIST_INSERT_HEAD(&sigev_hash[chain], sn, sn_link); 262 return (0); 263} 264 265int 266__sigev_delete(int type, sigev_id_t id) 267{ 268 struct sigev_node *sn; 269 270 sn = __sigev_find(type, id); 271 if (sn != NULL) 272 return (__sigev_delete_node(sn)); 273 return (0); 274} 275 276int 277__sigev_delete_node(struct sigev_node *sn) 278{ 279 LIST_REMOVE(sn, sn_link); 280 281 if (--sn->sn_tn->tn_refcount == 0) 282 _pthread_kill(sn->sn_tn->tn_thread, SIGSERVICE); 283 if (sn->sn_flags & SNF_WORKING) 284 sn->sn_flags |= SNF_REMOVED; 285 else 286 __sigev_free(sn); 287 return (0); 288} 289 290static sigev_id_t 291sigev_get_id(siginfo_t *si) 292{ 293 switch(si->si_code) { 294 case SI_TIMER: 295 return (si->si_timerid); 296 case SI_MESGQ: 297 return (si->si_mqd); 298 case SI_ASYNCIO: 299 return (sigev_id_t)si->si_value.sival_ptr; 300 } 301 return (-1); 302} 303 304static struct sigev_thread * 305sigev_thread_create(int usedefault) 306{ 307 struct sigev_thread *tn; 308 sigset_t set; 309 int ret; 310 311 if (usedefault && sigev_default_thread) { 312 __sigev_list_lock(); 313 sigev_default_thread->tn_refcount++; 314 __sigev_list_unlock(); 315 return (sigev_default_thread); 316 } 317 318 tn = malloc(sizeof(*tn)); 319 tn->tn_cur = NULL; 320 tn->tn_lwpid = -1; 321 tn->tn_refcount = 1; 322 _pthread_cond_init(&tn->tn_cv, NULL); 323 324 /* for debug */ 325 __sigev_list_lock(); 326 LIST_INSERT_HEAD(&sigev_threads, tn, tn_link); 327 __sigev_list_unlock(); 328 329 sigemptyset(&set); 330 sigaddset(&set, SIGSERVICE); 331 332 _sigprocmask(SIG_BLOCK, &set, NULL); 333 ret = pthread_create(&tn->tn_thread, &sigev_default_attr, 334 sigev_service_loop, tn); 335 _sigprocmask(SIG_UNBLOCK, &set, NULL); 336 337 if (ret != 0) { 338 __sigev_list_lock(); 339 LIST_REMOVE(tn, tn_link); 340 __sigev_list_unlock(); 341 free(tn); 342 tn = NULL; 343 } else { 344 /* wait the thread to get its lwpid */ 345 346 __sigev_list_lock(); 347 while (tn->tn_lwpid == -1) 348 _pthread_cond_wait(&tn->tn_cv, sigev_list_mtx); 349 __sigev_list_unlock(); 350 } 351 return (tn); 352} 353 354/* 355 * The thread receives notification from kernel and creates 356 * a thread to call user callback function. 357 */ 358static void * 359sigev_service_loop(void *arg) 360{ 361 static int failure; 362 363 siginfo_t si; 364 sigset_t set; 365 struct sigev_thread *tn; 366 struct sigev_node *sn; 367 sigev_id_t id; 368 pthread_t td; 369 int ret; 370 371 tn = arg; 372 thr_self(&tn->tn_lwpid); 373 __sigev_list_lock(); 374 _pthread_cond_broadcast(&tn->tn_cv); 375 __sigev_list_unlock(); 376 377 /* 378 * Service thread should not be killed by callback, if user 379 * attempts to do so, the thread will be restarted. 380 */ 381 sigemptyset(&set); 382 sigaddset(&set, SIGSERVICE); 383 for (;;) { 384 ret = sigwaitinfo(&set, &si); 385 386 __sigev_list_lock(); 387 if (tn->tn_refcount == 0) { 388 LIST_REMOVE(tn, tn_link); 389 __sigev_list_unlock(); 390 free(tn); 391 break; 392 } 393 394 if (ret == -1) { 395 __sigev_list_unlock(); 396 continue; 397 } 398 399 id = sigev_get_id(&si); 400 sn = __sigev_find(si.si_code, id); 401 if (sn == NULL) { 402 __sigev_list_unlock(); 403 continue; 404 } 405 406 sn->sn_info = si; 407 if (sn->sn_flags & SNF_SYNC) 408 tn->tn_cur = sn; 409 else 410 tn->tn_cur = NULL; 411 sn->sn_flags |= SNF_WORKING; 412 __sigev_list_unlock(); 413 414 ret = pthread_create(&td, &sn->sn_attr, worker_routine, sn); 415 if (ret != 0) { 416 if (failure++ < 5) 417 warnc(ret, "%s:%s failed to create thread.\n", 418 __FILE__, __func__); 419 420 __sigev_list_lock(); 421 sn->sn_flags &= ~SNF_WORKING; 422 if (sn->sn_flags & SNF_REMOVED) 423 __sigev_free(sn); 424 __sigev_list_unlock(); 425 } else if (tn->tn_cur) { 426 __sigev_list_lock(); 427 while (tn->tn_cur) 428 _pthread_cond_wait(&tn->tn_cv, sigev_list_mtx); 429 __sigev_list_unlock(); 430 } 431 } 432 return (0); 433} 434 435/* 436 * newly created worker thread to call user callback function. 437 */ 438static void * 439worker_routine(void *arg) 440{ 441 struct sigev_node *sn = arg; 442 443 _pthread_cleanup_push(worker_cleanup, sn); 444 sn->sn_dispatch(sn); 445 _pthread_cleanup_pop(1); 446 447 return (0); 448} 449 450/* clean up a notification after dispatch. */ 451static void 452worker_cleanup(void *arg) 453{ 454 struct sigev_node *sn = arg; 455 456 __sigev_list_lock(); 457 if (sn->sn_flags & SNF_SYNC) { 458 sn->sn_tn->tn_cur = NULL; 459 _pthread_cond_broadcast(&sn->sn_tn->tn_cv); 460 } 461 if (sn->sn_flags & SNF_REMOVED) 462 __sigev_free(sn); 463 else 464 sn->sn_flags &= ~SNF_WORKING; 465 __sigev_list_unlock(); 466} 467