1/* $NetBSD: work_thread.c,v 1.8 2020/05/25 20:47:24 christos Exp $ */ 2 3/* 4 * work_thread.c - threads implementation for blocking worker child. 5 */ 6#include <config.h> 7#include "ntp_workimpl.h" 8 9#ifdef WORK_THREAD 10 11#include <stdio.h> 12#include <ctype.h> 13#include <signal.h> 14#ifndef SYS_WINNT 15#include <pthread.h> 16#endif 17 18#include "ntp_stdlib.h" 19#include "ntp_malloc.h" 20#include "ntp_syslog.h" 21#include "ntpd.h" 22#include "ntp_io.h" 23#include "ntp_assert.h" 24#include "ntp_unixtime.h" 25#include "timespecops.h" 26#include "ntp_worker.h" 27 28#define CHILD_EXIT_REQ ((blocking_pipe_header *)(intptr_t)-1) 29#define CHILD_GONE_RESP CHILD_EXIT_REQ 30/* Queue size increments: 31 * The request queue grows a bit faster than the response queue -- the 32 * daemon can push requests and pull results faster on avarage than the 33 * worker can process requests and push results... If this really pays 34 * off is debatable. 35 */ 36#define WORKITEMS_ALLOC_INC 16 37#define RESPONSES_ALLOC_INC 4 38 39/* Fiddle with min/max stack sizes. 64kB minimum seems to work, so we 40 * set the maximum to 256kB. If the minimum goes below the 41 * system-defined minimum stack size, we have to adjust accordingly. 42 */ 43#ifndef THREAD_MINSTACKSIZE 44# define THREAD_MINSTACKSIZE (64U * 1024) 45#endif 46#ifndef __sun 47#if defined(PTHREAD_STACK_MIN) && THREAD_MINSTACKSIZE < PTHREAD_STACK_MIN 48# undef THREAD_MINSTACKSIZE 49# define THREAD_MINSTACKSIZE PTHREAD_STACK_MIN 50#endif 51#endif 52 53#ifndef THREAD_MAXSTACKSIZE 54# define THREAD_MAXSTACKSIZE (256U * 1024) 55#endif 56#if THREAD_MAXSTACKSIZE < THREAD_MINSTACKSIZE 57# undef THREAD_MAXSTACKSIZE 58# define THREAD_MAXSTACKSIZE THREAD_MINSTACKSIZE 59#endif 60 61/* need a good integer to store a pointer... */ 62#ifndef UINTPTR_T 63# if defined(UINTPTR_MAX) 64# define UINTPTR_T uintptr_t 65# elif defined(UINT_PTR) 66# define UINTPTR_T UINT_PTR 67# else 68# define UINTPTR_T size_t 69# endif 70#endif 71 72 73#ifdef SYS_WINNT 74 75# define thread_exit(c) _endthreadex(c) 76# define tickle_sem(sh) ReleaseSemaphore((sh->shnd), 1, NULL) 77u_int WINAPI blocking_thread(void *); 78static BOOL same_os_sema(const sem_ref obj, void * osobj); 79 80#else 81 82# define thread_exit(c) pthread_exit((void*)(UINTPTR_T)(c)) 83# define tickle_sem sem_post 84void * blocking_thread(void *); 85static void block_thread_signals(sigset_t *); 86 87#endif 88 89#ifdef WORK_PIPE 90addremove_io_fd_func addremove_io_fd; 91#else 92addremove_io_semaphore_func addremove_io_semaphore; 93#endif 94 95static void start_blocking_thread(blocking_child *); 96static void start_blocking_thread_internal(blocking_child *); 97static void prepare_child_sems(blocking_child *); 98static int wait_for_sem(sem_ref, struct timespec *); 99static int ensure_workitems_empty_slot(blocking_child *); 100static int ensure_workresp_empty_slot(blocking_child *); 101static int queue_req_pointer(blocking_child *, blocking_pipe_header *); 102static void cleanup_after_child(blocking_child *); 103 104static sema_type worker_mmutex; 105static sem_ref worker_memlock; 106 107/* -------------------------------------------------------------------- 108 * locking the global worker state table (and other global stuff) 109 */ 110void 111worker_global_lock( 112 int inOrOut) 113{ 114 if (worker_memlock) { 115 if (inOrOut) 116 wait_for_sem(worker_memlock, NULL); 117 else 118 tickle_sem(worker_memlock); 119 } 120} 121 122/* -------------------------------------------------------------------- 123 * implementation isolation wrapper 124 */ 125void 126exit_worker( 127 int exitcode 128 ) 129{ 130 thread_exit(exitcode); /* see #define thread_exit */ 131} 132 133/* -------------------------------------------------------------------- 134 * sleep for a given time or until the wakup semaphore is tickled. 135 */ 136int 137worker_sleep( 138 blocking_child * c, 139 time_t seconds 140 ) 141{ 142 struct timespec until; 143 int rc; 144 145# ifdef HAVE_CLOCK_GETTIME 146 if (0 != clock_gettime(CLOCK_REALTIME, &until)) { 147 msyslog(LOG_ERR, "worker_sleep: clock_gettime() failed: %m"); 148 return -1; 149 } 150# else 151 if (0 != getclock(TIMEOFDAY, &until)) { 152 msyslog(LOG_ERR, "worker_sleep: getclock() failed: %m"); 153 return -1; 154 } 155# endif 156 until.tv_sec += seconds; 157 rc = wait_for_sem(c->wake_scheduled_sleep, &until); 158 if (0 == rc) 159 return -1; 160 if (-1 == rc && ETIMEDOUT == errno) 161 return 0; 162 msyslog(LOG_ERR, "worker_sleep: sem_timedwait: %m"); 163 return -1; 164} 165 166 167/* -------------------------------------------------------------------- 168 * Wake up a worker that takes a nap. 169 */ 170void 171interrupt_worker_sleep(void) 172{ 173 u_int idx; 174 blocking_child * c; 175 176 for (idx = 0; idx < blocking_children_alloc; idx++) { 177 c = blocking_children[idx]; 178 if (NULL == c || NULL == c->wake_scheduled_sleep) 179 continue; 180 tickle_sem(c->wake_scheduled_sleep); 181 } 182} 183 184/* -------------------------------------------------------------------- 185 * Make sure there is an empty slot at the head of the request 186 * queue. Tell if the queue is currently empty. 187 */ 188static int 189ensure_workitems_empty_slot( 190 blocking_child *c 191 ) 192{ 193 /* 194 ** !!! PRECONDITION: caller holds access lock! 195 ** 196 ** This simply tries to increase the size of the buffer if it 197 ** becomes full. The resize operation does *not* maintain the 198 ** order of requests, but that should be irrelevant since the 199 ** processing is considered asynchronous anyway. 200 ** 201 ** Return if the buffer is currently empty. 202 */ 203 204 static const size_t each = 205 sizeof(blocking_children[0]->workitems[0]); 206 207 size_t new_alloc; 208 size_t slots_used; 209 size_t sidx; 210 211 slots_used = c->head_workitem - c->tail_workitem; 212 if (slots_used >= c->workitems_alloc) { 213 new_alloc = c->workitems_alloc + WORKITEMS_ALLOC_INC; 214 c->workitems = erealloc(c->workitems, new_alloc * each); 215 for (sidx = c->workitems_alloc; sidx < new_alloc; ++sidx) 216 c->workitems[sidx] = NULL; 217 c->tail_workitem = 0; 218 c->head_workitem = c->workitems_alloc; 219 c->workitems_alloc = new_alloc; 220 } 221 INSIST(NULL == c->workitems[c->head_workitem % c->workitems_alloc]); 222 return (0 == slots_used); 223} 224 225/* -------------------------------------------------------------------- 226 * Make sure there is an empty slot at the head of the response 227 * queue. Tell if the queue is currently empty. 228 */ 229static int 230ensure_workresp_empty_slot( 231 blocking_child *c 232 ) 233{ 234 /* 235 ** !!! PRECONDITION: caller holds access lock! 236 ** 237 ** Works like the companion function above. 238 */ 239 240 static const size_t each = 241 sizeof(blocking_children[0]->responses[0]); 242 243 size_t new_alloc; 244 size_t slots_used; 245 size_t sidx; 246 247 slots_used = c->head_response - c->tail_response; 248 if (slots_used >= c->responses_alloc) { 249 new_alloc = c->responses_alloc + RESPONSES_ALLOC_INC; 250 c->responses = erealloc(c->responses, new_alloc * each); 251 for (sidx = c->responses_alloc; sidx < new_alloc; ++sidx) 252 c->responses[sidx] = NULL; 253 c->tail_response = 0; 254 c->head_response = c->responses_alloc; 255 c->responses_alloc = new_alloc; 256 } 257 INSIST(NULL == c->responses[c->head_response % c->responses_alloc]); 258 return (0 == slots_used); 259} 260 261 262/* -------------------------------------------------------------------- 263 * queue_req_pointer() - append a work item or idle exit request to 264 * blocking_workitems[]. Employ proper locking. 265 */ 266static int 267queue_req_pointer( 268 blocking_child * c, 269 blocking_pipe_header * hdr 270 ) 271{ 272 size_t qhead; 273 274 /* >>>> ACCESS LOCKING STARTS >>>> */ 275 wait_for_sem(c->accesslock, NULL); 276 ensure_workitems_empty_slot(c); 277 qhead = c->head_workitem; 278 c->workitems[qhead % c->workitems_alloc] = hdr; 279 c->head_workitem = 1 + qhead; 280 tickle_sem(c->accesslock); 281 /* <<<< ACCESS LOCKING ENDS <<<< */ 282 283 /* queue consumer wake-up notification */ 284 tickle_sem(c->workitems_pending); 285 286 return 0; 287} 288 289/* -------------------------------------------------------------------- 290 * API function to make sure a worker is running, a proper private copy 291 * of the data is made, the data eneterd into the queue and the worker 292 * is signalled. 293 */ 294int 295send_blocking_req_internal( 296 blocking_child * c, 297 blocking_pipe_header * hdr, 298 void * data 299 ) 300{ 301 blocking_pipe_header * threadcopy; 302 size_t payload_octets; 303 304 REQUIRE(hdr != NULL); 305 REQUIRE(data != NULL); 306 DEBUG_REQUIRE(BLOCKING_REQ_MAGIC == hdr->magic_sig); 307 308 if (hdr->octets <= sizeof(*hdr)) 309 return 1; /* failure */ 310 payload_octets = hdr->octets - sizeof(*hdr); 311 312 if (NULL == c->thread_ref) 313 start_blocking_thread(c); 314 threadcopy = emalloc(hdr->octets); 315 memcpy(threadcopy, hdr, sizeof(*hdr)); 316 memcpy((char *)threadcopy + sizeof(*hdr), data, payload_octets); 317 318 return queue_req_pointer(c, threadcopy); 319} 320 321/* -------------------------------------------------------------------- 322 * Wait for the 'incoming queue no longer empty' signal, lock the shared 323 * structure and dequeue an item. 324 */ 325blocking_pipe_header * 326receive_blocking_req_internal( 327 blocking_child * c 328 ) 329{ 330 blocking_pipe_header * req; 331 size_t qhead, qtail; 332 333 req = NULL; 334 do { 335 /* wait for tickle from the producer side */ 336 wait_for_sem(c->workitems_pending, NULL); 337 338 /* >>>> ACCESS LOCKING STARTS >>>> */ 339 wait_for_sem(c->accesslock, NULL); 340 qhead = c->head_workitem; 341 do { 342 qtail = c->tail_workitem; 343 if (qhead == qtail) 344 break; 345 c->tail_workitem = qtail + 1; 346 qtail %= c->workitems_alloc; 347 req = c->workitems[qtail]; 348 c->workitems[qtail] = NULL; 349 } while (NULL == req); 350 tickle_sem(c->accesslock); 351 /* <<<< ACCESS LOCKING ENDS <<<< */ 352 353 } while (NULL == req); 354 355 INSIST(NULL != req); 356 if (CHILD_EXIT_REQ == req) { /* idled out */ 357 send_blocking_resp_internal(c, CHILD_GONE_RESP); 358 req = NULL; 359 } 360 361 return req; 362} 363 364/* -------------------------------------------------------------------- 365 * Push a response into the return queue and eventually tickle the 366 * receiver. 367 */ 368int 369send_blocking_resp_internal( 370 blocking_child * c, 371 blocking_pipe_header * resp 372 ) 373{ 374 size_t qhead; 375 int empty; 376 377 /* >>>> ACCESS LOCKING STARTS >>>> */ 378 wait_for_sem(c->accesslock, NULL); 379 empty = ensure_workresp_empty_slot(c); 380 qhead = c->head_response; 381 c->responses[qhead % c->responses_alloc] = resp; 382 c->head_response = 1 + qhead; 383 tickle_sem(c->accesslock); 384 /* <<<< ACCESS LOCKING ENDS <<<< */ 385 386 /* queue consumer wake-up notification */ 387 if (empty) 388 { 389# ifdef WORK_PIPE 390 if (1 != write(c->resp_write_pipe, "", 1)) 391 msyslog(LOG_WARNING, "async resolver: %s", 392 "failed to notify main thread!"); 393# else 394 tickle_sem(c->responses_pending); 395# endif 396 } 397 return 0; 398} 399 400 401#ifndef WORK_PIPE 402 403/* -------------------------------------------------------------------- 404 * Check if a (Windows-)hanndle to a semaphore is actually the same we 405 * are using inside the sema wrapper. 406 */ 407static BOOL 408same_os_sema( 409 const sem_ref obj, 410 void* osh 411 ) 412{ 413 return obj && osh && (obj->shnd == (HANDLE)osh); 414} 415 416/* -------------------------------------------------------------------- 417 * Find the shared context that associates to an OS handle and make sure 418 * the data is dequeued and processed. 419 */ 420void 421handle_blocking_resp_sem( 422 void * context 423 ) 424{ 425 blocking_child * c; 426 u_int idx; 427 428 c = NULL; 429 for (idx = 0; idx < blocking_children_alloc; idx++) { 430 c = blocking_children[idx]; 431 if (c != NULL && 432 c->thread_ref != NULL && 433 same_os_sema(c->responses_pending, context)) 434 break; 435 } 436 if (idx < blocking_children_alloc) 437 process_blocking_resp(c); 438} 439#endif /* !WORK_PIPE */ 440 441/* -------------------------------------------------------------------- 442 * Fetch the next response from the return queue. In case of signalling 443 * via pipe, make sure the pipe is flushed, too. 444 */ 445blocking_pipe_header * 446receive_blocking_resp_internal( 447 blocking_child * c 448 ) 449{ 450 blocking_pipe_header * removed; 451 size_t qhead, qtail, slot; 452 453#ifdef WORK_PIPE 454 int rc; 455 char scratch[32]; 456 457 do 458 rc = read(c->resp_read_pipe, scratch, sizeof(scratch)); 459 while (-1 == rc && EINTR == errno); 460#endif 461 462 /* >>>> ACCESS LOCKING STARTS >>>> */ 463 wait_for_sem(c->accesslock, NULL); 464 qhead = c->head_response; 465 qtail = c->tail_response; 466 for (removed = NULL; !removed && (qhead != qtail); ++qtail) { 467 slot = qtail % c->responses_alloc; 468 removed = c->responses[slot]; 469 c->responses[slot] = NULL; 470 } 471 c->tail_response = qtail; 472 tickle_sem(c->accesslock); 473 /* <<<< ACCESS LOCKING ENDS <<<< */ 474 475 if (NULL != removed) { 476 DEBUG_ENSURE(CHILD_GONE_RESP == removed || 477 BLOCKING_RESP_MAGIC == removed->magic_sig); 478 } 479 if (CHILD_GONE_RESP == removed) { 480 cleanup_after_child(c); 481 removed = NULL; 482 } 483 484 return removed; 485} 486 487/* -------------------------------------------------------------------- 488 * Light up a new worker. 489 */ 490static void 491start_blocking_thread( 492 blocking_child * c 493 ) 494{ 495 496 DEBUG_INSIST(!c->reusable); 497 498 prepare_child_sems(c); 499 start_blocking_thread_internal(c); 500} 501 502/* -------------------------------------------------------------------- 503 * Create a worker thread. There are several differences between POSIX 504 * and Windows, of course -- most notably the Windows thread is no 505 * detached thread, and we keep the handle around until we want to get 506 * rid of the thread. The notification scheme also differs: Windows 507 * makes use of semaphores in both directions, POSIX uses a pipe for 508 * integration with 'select()' or alike. 509 */ 510static void 511start_blocking_thread_internal( 512 blocking_child * c 513 ) 514#ifdef SYS_WINNT 515{ 516 BOOL resumed; 517 518 c->thread_ref = NULL; 519 (*addremove_io_semaphore)(c->responses_pending->shnd, FALSE); 520 c->thr_table[0].thnd = 521 (HANDLE)_beginthreadex( 522 NULL, 523 0, 524 &blocking_thread, 525 c, 526 CREATE_SUSPENDED, 527 NULL); 528 529 if (NULL == c->thr_table[0].thnd) { 530 msyslog(LOG_ERR, "start blocking thread failed: %m"); 531 exit(-1); 532 } 533 /* remember the thread priority is only within the process class */ 534 if (!SetThreadPriority(c->thr_table[0].thnd, 535 THREAD_PRIORITY_BELOW_NORMAL)) 536 msyslog(LOG_ERR, "Error lowering blocking thread priority: %m"); 537 538 resumed = ResumeThread(c->thr_table[0].thnd); 539 DEBUG_INSIST(resumed); 540 c->thread_ref = &c->thr_table[0]; 541} 542#else /* pthreads start_blocking_thread_internal() follows */ 543{ 544# ifdef NEED_PTHREAD_INIT 545 static int pthread_init_called; 546# endif 547 pthread_attr_t thr_attr; 548 int rc; 549 int pipe_ends[2]; /* read then write */ 550 int is_pipe; 551 int flags; 552 size_t ostacksize; 553 size_t nstacksize; 554 sigset_t saved_sig_mask; 555 556 c->thread_ref = NULL; 557 558# ifdef NEED_PTHREAD_INIT 559 /* 560 * from lib/isc/unix/app.c: 561 * BSDI 3.1 seg faults in pthread_sigmask() if we don't do this. 562 */ 563 if (!pthread_init_called) { 564 pthread_init(); 565 pthread_init_called = TRUE; 566 } 567# endif 568 569 rc = pipe_socketpair(&pipe_ends[0], &is_pipe); 570 if (0 != rc) { 571 msyslog(LOG_ERR, "start_blocking_thread: pipe_socketpair() %m"); 572 exit(1); 573 } 574 c->resp_read_pipe = move_fd(pipe_ends[0]); 575 c->resp_write_pipe = move_fd(pipe_ends[1]); 576 c->ispipe = is_pipe; 577 flags = fcntl(c->resp_read_pipe, F_GETFL, 0); 578 if (-1 == flags) { 579 msyslog(LOG_ERR, "start_blocking_thread: fcntl(F_GETFL) %m"); 580 exit(1); 581 } 582 rc = fcntl(c->resp_read_pipe, F_SETFL, O_NONBLOCK | flags); 583 if (-1 == rc) { 584 msyslog(LOG_ERR, 585 "start_blocking_thread: fcntl(F_SETFL, O_NONBLOCK) %m"); 586 exit(1); 587 } 588 (*addremove_io_fd)(c->resp_read_pipe, c->ispipe, FALSE); 589 pthread_attr_init(&thr_attr); 590 pthread_attr_setdetachstate(&thr_attr, PTHREAD_CREATE_DETACHED); 591#if defined(HAVE_PTHREAD_ATTR_GETSTACKSIZE) && \ 592 defined(HAVE_PTHREAD_ATTR_SETSTACKSIZE) 593 rc = pthread_attr_getstacksize(&thr_attr, &ostacksize); 594 if (0 != rc) { 595 msyslog(LOG_ERR, 596 "start_blocking_thread: pthread_attr_getstacksize() -> %s", 597 strerror(rc)); 598 } else { 599 if (ostacksize < THREAD_MINSTACKSIZE) 600 nstacksize = THREAD_MINSTACKSIZE; 601 else if (ostacksize > THREAD_MAXSTACKSIZE) 602 nstacksize = THREAD_MAXSTACKSIZE; 603 else 604 nstacksize = ostacksize; 605 if (nstacksize != ostacksize) 606 rc = pthread_attr_setstacksize(&thr_attr, nstacksize); 607 if (0 != rc) 608 msyslog(LOG_ERR, 609 "start_blocking_thread: pthread_attr_setstacksize(0x%lx -> 0x%lx) -> %s", 610 (u_long)ostacksize, (u_long)nstacksize, 611 strerror(rc)); 612 } 613#else 614 UNUSED_ARG(nstacksize); 615 UNUSED_ARG(ostacksize); 616#endif 617#if defined(PTHREAD_SCOPE_SYSTEM) && defined(NEED_PTHREAD_SCOPE_SYSTEM) 618 pthread_attr_setscope(&thr_attr, PTHREAD_SCOPE_SYSTEM); 619#endif 620 c->thread_ref = emalloc_zero(sizeof(*c->thread_ref)); 621 block_thread_signals(&saved_sig_mask); 622 rc = pthread_create(&c->thr_table[0], &thr_attr, 623 &blocking_thread, c); 624 pthread_sigmask(SIG_SETMASK, &saved_sig_mask, NULL); 625 pthread_attr_destroy(&thr_attr); 626 if (0 != rc) { 627 msyslog(LOG_ERR, "start_blocking_thread: pthread_create() -> %s", 628 strerror(rc)); 629 exit(1); 630 } 631 c->thread_ref = &c->thr_table[0]; 632} 633#endif 634 635/* -------------------------------------------------------------------- 636 * block_thread_signals() 637 * 638 * Temporarily block signals used by ntpd main thread, so that signal 639 * mask inherited by child threads leaves them blocked. Returns prior 640 * active signal mask via pmask, to be restored by the main thread 641 * after pthread_create(). 642 */ 643#ifndef SYS_WINNT 644void 645block_thread_signals( 646 sigset_t * pmask 647 ) 648{ 649 sigset_t block; 650 651 sigemptyset(&block); 652# ifdef HAVE_SIGNALED_IO 653# ifdef SIGIO 654 sigaddset(&block, SIGIO); 655# endif 656# ifdef SIGPOLL 657 sigaddset(&block, SIGPOLL); 658# endif 659# endif /* HAVE_SIGNALED_IO */ 660 sigaddset(&block, SIGALRM); 661 sigaddset(&block, MOREDEBUGSIG); 662 sigaddset(&block, LESSDEBUGSIG); 663# ifdef SIGDIE1 664 sigaddset(&block, SIGDIE1); 665# endif 666# ifdef SIGDIE2 667 sigaddset(&block, SIGDIE2); 668# endif 669# ifdef SIGDIE3 670 sigaddset(&block, SIGDIE3); 671# endif 672# ifdef SIGDIE4 673 sigaddset(&block, SIGDIE4); 674# endif 675# ifdef SIGBUS 676 sigaddset(&block, SIGBUS); 677# endif 678 sigemptyset(pmask); 679 pthread_sigmask(SIG_BLOCK, &block, pmask); 680} 681#endif /* !SYS_WINNT */ 682 683 684/* -------------------------------------------------------------------- 685 * Create & destroy semaphores. This is sufficiently different between 686 * POSIX and Windows to warrant wrapper functions and close enough to 687 * use the concept of synchronization via semaphore for all platforms. 688 */ 689static sem_ref 690create_sema( 691 sema_type* semptr, 692 u_int inival, 693 u_int maxval) 694{ 695#ifdef SYS_WINNT 696 697 long svini, svmax; 698 if (NULL != semptr) { 699 svini = (inival < LONG_MAX) 700 ? (long)inival : LONG_MAX; 701 svmax = (maxval < LONG_MAX && maxval > 0) 702 ? (long)maxval : LONG_MAX; 703 semptr->shnd = CreateSemaphore(NULL, svini, svmax, NULL); 704 if (NULL == semptr->shnd) 705 semptr = NULL; 706 } 707 708#else 709 710 (void)maxval; 711 if (semptr && sem_init(semptr, FALSE, inival)) 712 semptr = NULL; 713 714#endif 715 716 return semptr; 717} 718 719/* ------------------------------------------------------------------ */ 720static sem_ref 721delete_sema( 722 sem_ref obj) 723{ 724 725# ifdef SYS_WINNT 726 727 if (obj) { 728 if (obj->shnd) 729 CloseHandle(obj->shnd); 730 obj->shnd = NULL; 731 } 732 733# else 734 735 if (obj) 736 sem_destroy(obj); 737 738# endif 739 740 return NULL; 741} 742 743/* -------------------------------------------------------------------- 744 * prepare_child_sems() 745 * 746 * create sync & access semaphores 747 * 748 * All semaphores are cleared, only the access semaphore has 1 unit. 749 * Childs wait on 'workitems_pending', then grabs 'sema_access' 750 * and dequeues jobs. When done, 'sema_access' is given one unit back. 751 * 752 * The producer grabs 'sema_access', manages the queue, restores 753 * 'sema_access' and puts one unit into 'workitems_pending'. 754 * 755 * The story goes the same for the response queue. 756 */ 757static void 758prepare_child_sems( 759 blocking_child *c 760 ) 761{ 762 if (NULL == worker_memlock) 763 worker_memlock = create_sema(&worker_mmutex, 1, 1); 764 765 c->accesslock = create_sema(&c->sem_table[0], 1, 1); 766 c->workitems_pending = create_sema(&c->sem_table[1], 0, 0); 767 c->wake_scheduled_sleep = create_sema(&c->sem_table[2], 0, 1); 768# ifndef WORK_PIPE 769 c->responses_pending = create_sema(&c->sem_table[3], 0, 0); 770# endif 771} 772 773/* -------------------------------------------------------------------- 774 * wait for semaphore. Where the wait can be interrupted, it will 775 * internally resume -- When this function returns, there is either no 776 * semaphore at all, a timeout occurred, or the caller could 777 * successfully take a token from the semaphore. 778 * 779 * For untimed wait, not checking the result of this function at all is 780 * definitely an option. 781 */ 782static int 783wait_for_sem( 784 sem_ref sem, 785 struct timespec * timeout /* wall-clock */ 786 ) 787#ifdef SYS_WINNT 788{ 789 struct timespec now; 790 struct timespec delta; 791 DWORD msec; 792 DWORD rc; 793 794 if (!(sem && sem->shnd)) { 795 errno = EINVAL; 796 return -1; 797 } 798 799 if (NULL == timeout) { 800 msec = INFINITE; 801 } else { 802 getclock(TIMEOFDAY, &now); 803 delta = sub_tspec(*timeout, now); 804 if (delta.tv_sec < 0) { 805 msec = 0; 806 } else if ((delta.tv_sec + 1) >= (MAXDWORD / 1000)) { 807 msec = INFINITE; 808 } else { 809 msec = 1000 * (DWORD)delta.tv_sec; 810 msec += delta.tv_nsec / (1000 * 1000); 811 } 812 } 813 rc = WaitForSingleObject(sem->shnd, msec); 814 if (WAIT_OBJECT_0 == rc) 815 return 0; 816 if (WAIT_TIMEOUT == rc) { 817 errno = ETIMEDOUT; 818 return -1; 819 } 820 msyslog(LOG_ERR, "WaitForSingleObject unexpected 0x%x", rc); 821 errno = EFAULT; 822 return -1; 823} 824#else /* pthreads wait_for_sem() follows */ 825{ 826 int rc = -1; 827 828 if (sem) do { 829 if (NULL == timeout) 830 rc = sem_wait(sem); 831 else 832 rc = sem_timedwait(sem, timeout); 833 } while (rc == -1 && errno == EINTR); 834 else 835 errno = EINVAL; 836 837 return rc; 838} 839#endif 840 841/* -------------------------------------------------------------------- 842 * blocking_thread - thread functions have WINAPI (aka 'stdcall') 843 * calling conventions under Windows and POSIX-defined signature 844 * otherwise. 845 */ 846#ifdef SYS_WINNT 847u_int WINAPI 848#else 849void * 850#endif 851blocking_thread( 852 void * ThreadArg 853 ) 854{ 855 blocking_child *c; 856 857 c = ThreadArg; 858 exit_worker(blocking_child_common(c)); 859 860 /* NOTREACHED */ 861 return 0; 862} 863 864/* -------------------------------------------------------------------- 865 * req_child_exit() runs in the parent. 866 * 867 * This function is called from from the idle timer, too, and possibly 868 * without a thread being there any longer. Since we have folded up our 869 * tent in that case and all the semaphores are already gone, we simply 870 * ignore this request in this case. 871 * 872 * Since the existence of the semaphores is controlled exclusively by 873 * the parent, there's no risk of data race here. 874 */ 875int 876req_child_exit( 877 blocking_child *c 878 ) 879{ 880 return (c->accesslock) 881 ? queue_req_pointer(c, CHILD_EXIT_REQ) 882 : 0; 883} 884 885/* -------------------------------------------------------------------- 886 * cleanup_after_child() runs in parent. 887 */ 888static void 889cleanup_after_child( 890 blocking_child * c 891 ) 892{ 893 DEBUG_INSIST(!c->reusable); 894 895# ifdef SYS_WINNT 896 /* The thread was not created in detached state, so we better 897 * clean up. 898 */ 899 if (c->thread_ref && c->thread_ref->thnd) { 900 WaitForSingleObject(c->thread_ref->thnd, INFINITE); 901 INSIST(CloseHandle(c->thread_ref->thnd)); 902 c->thread_ref->thnd = NULL; 903 } 904# endif 905 c->thread_ref = NULL; 906 907 /* remove semaphores and (if signalling vi IO) pipes */ 908 909 c->accesslock = delete_sema(c->accesslock); 910 c->workitems_pending = delete_sema(c->workitems_pending); 911 c->wake_scheduled_sleep = delete_sema(c->wake_scheduled_sleep); 912 913# ifdef WORK_PIPE 914 DEBUG_INSIST(-1 != c->resp_read_pipe); 915 DEBUG_INSIST(-1 != c->resp_write_pipe); 916 (*addremove_io_fd)(c->resp_read_pipe, c->ispipe, TRUE); 917 close(c->resp_write_pipe); 918 close(c->resp_read_pipe); 919 c->resp_write_pipe = -1; 920 c->resp_read_pipe = -1; 921# else 922 DEBUG_INSIST(NULL != c->responses_pending); 923 (*addremove_io_semaphore)(c->responses_pending->shnd, TRUE); 924 c->responses_pending = delete_sema(c->responses_pending); 925# endif 926 927 /* Is it necessary to check if there are pending requests and 928 * responses? If so, and if there are, what to do with them? 929 */ 930 931 /* re-init buffer index sequencers */ 932 c->head_workitem = 0; 933 c->tail_workitem = 0; 934 c->head_response = 0; 935 c->tail_response = 0; 936 937 c->reusable = TRUE; 938} 939 940 941#else /* !WORK_THREAD follows */ 942char work_thread_nonempty_compilation_unit; 943#endif 944