work_thread.c revision 1.4
1/* $NetBSD: work_thread.c,v 1.4 2016/01/08 21:35:39 christos Exp $ */ 2 3/* 4 * work_thread.c - threads implementation for blocking worker child. 5 */ 6#include <config.h> 7#include "ntp_workimpl.h" 8 9#ifdef WORK_THREAD 10 11#include <stdio.h> 12#include <ctype.h> 13#include <signal.h> 14#ifndef SYS_WINNT 15#include <pthread.h> 16#endif 17 18#include "ntp_stdlib.h" 19#include "ntp_malloc.h" 20#include "ntp_syslog.h" 21#include "ntpd.h" 22#include "ntp_io.h" 23#include "ntp_assert.h" 24#include "ntp_unixtime.h" 25#include "timespecops.h" 26#include "ntp_worker.h" 27 28#define CHILD_EXIT_REQ ((blocking_pipe_header *)(intptr_t)-1) 29#define CHILD_GONE_RESP CHILD_EXIT_REQ 30#define WORKITEMS_ALLOC_INC 16 31#define RESPONSES_ALLOC_INC 4 32 33#ifndef THREAD_MINSTACKSIZE 34#define THREAD_MINSTACKSIZE (64U * 1024) 35#endif 36 37#ifdef SYS_WINNT 38 39# define thread_exit(c) _endthreadex(c) 40# define tickle_sem(sh) ReleaseSemaphore((sh->shnd), 1, NULL) 41u_int WINAPI blocking_thread(void *); 42static BOOL same_os_sema(const sem_ref obj, void * osobj); 43 44#else 45 46# define thread_exit(c) pthread_exit((void*)(size_t)(c)) 47# define tickle_sem sem_post 48void * blocking_thread(void *); 49static void block_thread_signals(sigset_t *); 50 51#endif 52 53#ifdef WORK_PIPE 54addremove_io_fd_func addremove_io_fd; 55#else 56addremove_io_semaphore_func addremove_io_semaphore; 57#endif 58 59static void start_blocking_thread(blocking_child *); 60static void start_blocking_thread_internal(blocking_child *); 61static void prepare_child_sems(blocking_child *); 62static int wait_for_sem(sem_ref, struct timespec *); 63static int ensure_workitems_empty_slot(blocking_child *); 64static int ensure_workresp_empty_slot(blocking_child *); 65static int queue_req_pointer(blocking_child *, blocking_pipe_header *); 66static void cleanup_after_child(blocking_child *); 67 68 69void 70exit_worker( 71 int exitcode 72 ) 73{ 74 thread_exit(exitcode); /* see #define thread_exit */ 75} 76 77/* -------------------------------------------------------------------- 78 * sleep for a given time or until the wakup semaphore is tickled. 79 */ 80int 81worker_sleep( 82 blocking_child * c, 83 time_t seconds 84 ) 85{ 86 struct timespec until; 87 int rc; 88 89# ifdef HAVE_CLOCK_GETTIME 90 if (0 != clock_gettime(CLOCK_REALTIME, &until)) { 91 msyslog(LOG_ERR, "worker_sleep: clock_gettime() failed: %m"); 92 return -1; 93 } 94# else 95 if (0 != getclock(TIMEOFDAY, &until)) { 96 msyslog(LOG_ERR, "worker_sleep: getclock() failed: %m"); 97 return -1; 98 } 99# endif 100 until.tv_sec += seconds; 101 rc = wait_for_sem(c->wake_scheduled_sleep, &until); 102 if (0 == rc) 103 return -1; 104 if (-1 == rc && ETIMEDOUT == errno) 105 return 0; 106 msyslog(LOG_ERR, "worker_sleep: sem_timedwait: %m"); 107 return -1; 108} 109 110 111/* -------------------------------------------------------------------- 112 * Wake up a worker that takes a nap. 113 */ 114void 115interrupt_worker_sleep(void) 116{ 117 u_int idx; 118 blocking_child * c; 119 120 for (idx = 0; idx < blocking_children_alloc; idx++) { 121 c = blocking_children[idx]; 122 if (NULL == c || NULL == c->wake_scheduled_sleep) 123 continue; 124 tickle_sem(c->wake_scheduled_sleep); 125 } 126} 127 128/* -------------------------------------------------------------------- 129 * Make sure there is an empty slot at the head of the request 130 * queue. Tell if the queue is currently empty. 131 */ 132static int 133ensure_workitems_empty_slot( 134 blocking_child *c 135 ) 136{ 137 /* 138 ** !!! PRECONDITION: caller holds access lock! 139 ** 140 ** This simply tries to increase the size of the buffer if it 141 ** becomes full. The resize operation does *not* maintain the 142 ** order of requests, but that should be irrelevant since the 143 ** processing is considered asynchronous anyway. 144 ** 145 ** Return if the buffer is currently empty. 146 */ 147 148 static const size_t each = 149 sizeof(blocking_children[0]->workitems[0]); 150 151 size_t new_alloc; 152 size_t slots_used; 153 154 slots_used = c->head_workitem - c->tail_workitem; 155 if (slots_used >= c->workitems_alloc) { 156 new_alloc = c->workitems_alloc + WORKITEMS_ALLOC_INC; 157 c->workitems = erealloc(c->workitems, new_alloc * each); 158 c->tail_workitem = 0; 159 c->head_workitem = c->workitems_alloc; 160 c->workitems_alloc = new_alloc; 161 } 162 return (0 == slots_used); 163} 164 165/* -------------------------------------------------------------------- 166 * Make sure there is an empty slot at the head of the response 167 * queue. Tell if the queue is currently empty. 168 */ 169static int 170ensure_workresp_empty_slot( 171 blocking_child *c 172 ) 173{ 174 /* 175 ** !!! PRECONDITION: caller holds access lock! 176 ** 177 ** Works like the companion function above. 178 */ 179 180 static const size_t each = 181 sizeof(blocking_children[0]->responses[0]); 182 183 size_t new_alloc; 184 size_t slots_used; 185 186 slots_used = c->head_response - c->tail_response; 187 if (slots_used >= c->responses_alloc) { 188 new_alloc = c->responses_alloc + RESPONSES_ALLOC_INC; 189 c->responses = erealloc(c->responses, new_alloc * each); 190 c->tail_response = 0; 191 c->head_response = c->responses_alloc; 192 c->responses_alloc = new_alloc; 193 } 194 return (0 == slots_used); 195} 196 197 198/* -------------------------------------------------------------------- 199 * queue_req_pointer() - append a work item or idle exit request to 200 * blocking_workitems[]. Employ proper locking. 201 */ 202static int 203queue_req_pointer( 204 blocking_child * c, 205 blocking_pipe_header * hdr 206 ) 207{ 208 size_t qhead; 209 210 /* >>>> ACCESS LOCKING STARTS >>>> */ 211 wait_for_sem(c->accesslock, NULL); 212 ensure_workitems_empty_slot(c); 213 qhead = c->head_workitem; 214 c->workitems[qhead % c->workitems_alloc] = hdr; 215 c->head_workitem = 1 + qhead; 216 tickle_sem(c->accesslock); 217 /* <<<< ACCESS LOCKING ENDS <<<< */ 218 219 /* queue consumer wake-up notification */ 220 tickle_sem(c->workitems_pending); 221 222 return 0; 223} 224 225/* -------------------------------------------------------------------- 226 * API function to make sure a worker is running, a proper private copy 227 * of the data is made, the data eneterd into the queue and the worker 228 * is signalled. 229 */ 230int 231send_blocking_req_internal( 232 blocking_child * c, 233 blocking_pipe_header * hdr, 234 void * data 235 ) 236{ 237 blocking_pipe_header * threadcopy; 238 size_t payload_octets; 239 240 REQUIRE(hdr != NULL); 241 REQUIRE(data != NULL); 242 DEBUG_REQUIRE(BLOCKING_REQ_MAGIC == hdr->magic_sig); 243 244 if (hdr->octets <= sizeof(*hdr)) 245 return 1; /* failure */ 246 payload_octets = hdr->octets - sizeof(*hdr); 247 248 if (NULL == c->thread_ref) 249 start_blocking_thread(c); 250 threadcopy = emalloc(hdr->octets); 251 memcpy(threadcopy, hdr, sizeof(*hdr)); 252 memcpy((char *)threadcopy + sizeof(*hdr), data, payload_octets); 253 254 return queue_req_pointer(c, threadcopy); 255} 256 257/* -------------------------------------------------------------------- 258 * Wait for the 'incoming queue no longer empty' signal, lock the shared 259 * structure and dequeue an item. 260 */ 261blocking_pipe_header * 262receive_blocking_req_internal( 263 blocking_child * c 264 ) 265{ 266 blocking_pipe_header * req; 267 size_t qhead, qtail; 268 269 req = NULL; 270 do { 271 /* wait for tickle from the producer side */ 272 wait_for_sem(c->workitems_pending, NULL); 273 274 /* >>>> ACCESS LOCKING STARTS >>>> */ 275 wait_for_sem(c->accesslock, NULL); 276 qhead = c->head_workitem; 277 do { 278 qtail = c->tail_workitem; 279 if (qhead == qtail) 280 break; 281 c->tail_workitem = qtail + 1; 282 qtail %= c->workitems_alloc; 283 req = c->workitems[qtail]; 284 c->workitems[qtail] = NULL; 285 } while (NULL == req); 286 tickle_sem(c->accesslock); 287 /* <<<< ACCESS LOCKING ENDS <<<< */ 288 289 } while (NULL == req); 290 291 INSIST(NULL != req); 292 if (CHILD_EXIT_REQ == req) { /* idled out */ 293 send_blocking_resp_internal(c, CHILD_GONE_RESP); 294 req = NULL; 295 } 296 297 return req; 298} 299 300/* -------------------------------------------------------------------- 301 * Push a response into the return queue and eventually tickle the 302 * receiver. 303 */ 304int 305send_blocking_resp_internal( 306 blocking_child * c, 307 blocking_pipe_header * resp 308 ) 309{ 310 size_t qhead; 311 int empty; 312 313 /* >>>> ACCESS LOCKING STARTS >>>> */ 314 wait_for_sem(c->accesslock, NULL); 315 empty = ensure_workresp_empty_slot(c); 316 qhead = c->head_response; 317 c->responses[qhead % c->responses_alloc] = resp; 318 c->head_response = 1 + qhead; 319 tickle_sem(c->accesslock); 320 /* <<<< ACCESS LOCKING ENDS <<<< */ 321 322 /* queue consumer wake-up notification */ 323 if (empty) 324 { 325# ifdef WORK_PIPE 326 write(c->resp_write_pipe, "", 1); 327# else 328 tickle_sem(c->responses_pending); 329# endif 330 } 331 return 0; 332} 333 334 335#ifndef WORK_PIPE 336 337/* -------------------------------------------------------------------- 338 * Check if a (Windows-)hanndle to a semaphore is actually the same we 339 * are using inside the sema wrapper. 340 */ 341static BOOL 342same_os_sema( 343 const sem_ref obj, 344 void* osh 345 ) 346{ 347 return obj && osh && (obj->shnd == (HANDLE)osh); 348} 349 350/* -------------------------------------------------------------------- 351 * Find the shared context that associates to an OS handle and make sure 352 * the data is dequeued and processed. 353 */ 354void 355handle_blocking_resp_sem( 356 void * context 357 ) 358{ 359 blocking_child * c; 360 u_int idx; 361 362 c = NULL; 363 for (idx = 0; idx < blocking_children_alloc; idx++) { 364 c = blocking_children[idx]; 365 if (c != NULL && 366 c->thread_ref != NULL && 367 same_os_sema(c->responses_pending, context)) 368 break; 369 } 370 if (idx < blocking_children_alloc) 371 process_blocking_resp(c); 372} 373#endif /* !WORK_PIPE */ 374 375/* -------------------------------------------------------------------- 376 * Fetch the next response from the return queue. In case of signalling 377 * via pipe, make sure the pipe is flushed, too. 378 */ 379blocking_pipe_header * 380receive_blocking_resp_internal( 381 blocking_child * c 382 ) 383{ 384 blocking_pipe_header * removed; 385 size_t qhead, qtail, slot; 386 387#ifdef WORK_PIPE 388 int rc; 389 char scratch[32]; 390 391 do 392 rc = read(c->resp_read_pipe, scratch, sizeof(scratch)); 393 while (-1 == rc && EINTR == errno); 394#endif 395 396 /* >>>> ACCESS LOCKING STARTS >>>> */ 397 wait_for_sem(c->accesslock, NULL); 398 qhead = c->head_response; 399 qtail = c->tail_response; 400 for (removed = NULL; !removed && (qhead != qtail); ++qtail) { 401 slot = qtail % c->responses_alloc; 402 removed = c->responses[slot]; 403 c->responses[slot] = NULL; 404 } 405 c->tail_response = qtail; 406 tickle_sem(c->accesslock); 407 /* <<<< ACCESS LOCKING ENDS <<<< */ 408 409 if (NULL != removed) { 410 DEBUG_ENSURE(CHILD_GONE_RESP == removed || 411 BLOCKING_RESP_MAGIC == removed->magic_sig); 412 } 413 if (CHILD_GONE_RESP == removed) { 414 cleanup_after_child(c); 415 removed = NULL; 416 } 417 418 return removed; 419} 420 421/* -------------------------------------------------------------------- 422 * Light up a new worker. 423 */ 424static void 425start_blocking_thread( 426 blocking_child * c 427 ) 428{ 429 430 DEBUG_INSIST(!c->reusable); 431 432 prepare_child_sems(c); 433 start_blocking_thread_internal(c); 434} 435 436/* -------------------------------------------------------------------- 437 * Create a worker thread. There are several differences between POSIX 438 * and Windows, of course -- most notably the Windows thread is no 439 * detached thread, and we keep the handle around until we want to get 440 * rid of the thread. The notification scheme also differs: Windows 441 * makes use of semaphores in both directions, POSIX uses a pipe for 442 * integration with 'select()' or alike. 443 */ 444static void 445start_blocking_thread_internal( 446 blocking_child * c 447 ) 448#ifdef SYS_WINNT 449{ 450 BOOL resumed; 451 452 c->thread_ref = NULL; 453 (*addremove_io_semaphore)(c->responses_pending->shnd, FALSE); 454 c->thr_table[0].thnd = 455 (HANDLE)_beginthreadex( 456 NULL, 457 0, 458 &blocking_thread, 459 c, 460 CREATE_SUSPENDED, 461 NULL); 462 463 if (NULL == c->thr_table[0].thnd) { 464 msyslog(LOG_ERR, "start blocking thread failed: %m"); 465 exit(-1); 466 } 467 /* remember the thread priority is only within the process class */ 468 if (!SetThreadPriority(c->thr_table[0].thnd, 469 THREAD_PRIORITY_BELOW_NORMAL)) 470 msyslog(LOG_ERR, "Error lowering blocking thread priority: %m"); 471 472 resumed = ResumeThread(c->thr_table[0].thnd); 473 DEBUG_INSIST(resumed); 474 c->thread_ref = &c->thr_table[0]; 475} 476#else /* pthreads start_blocking_thread_internal() follows */ 477{ 478# ifdef NEED_PTHREAD_INIT 479 static int pthread_init_called; 480# endif 481 pthread_attr_t thr_attr; 482 int rc; 483 int saved_errno; 484 int pipe_ends[2]; /* read then write */ 485 int is_pipe; 486 int flags; 487 size_t stacksize; 488 sigset_t saved_sig_mask; 489 490 c->thread_ref = NULL; 491 492# ifdef NEED_PTHREAD_INIT 493 /* 494 * from lib/isc/unix/app.c: 495 * BSDI 3.1 seg faults in pthread_sigmask() if we don't do this. 496 */ 497 if (!pthread_init_called) { 498 pthread_init(); 499 pthread_init_called = TRUE; 500 } 501# endif 502 503 rc = pipe_socketpair(&pipe_ends[0], &is_pipe); 504 if (0 != rc) { 505 msyslog(LOG_ERR, "start_blocking_thread: pipe_socketpair() %m"); 506 exit(1); 507 } 508 c->resp_read_pipe = move_fd(pipe_ends[0]); 509 c->resp_write_pipe = move_fd(pipe_ends[1]); 510 c->ispipe = is_pipe; 511 flags = fcntl(c->resp_read_pipe, F_GETFL, 0); 512 if (-1 == flags) { 513 msyslog(LOG_ERR, "start_blocking_thread: fcntl(F_GETFL) %m"); 514 exit(1); 515 } 516 rc = fcntl(c->resp_read_pipe, F_SETFL, O_NONBLOCK | flags); 517 if (-1 == rc) { 518 msyslog(LOG_ERR, 519 "start_blocking_thread: fcntl(F_SETFL, O_NONBLOCK) %m"); 520 exit(1); 521 } 522 (*addremove_io_fd)(c->resp_read_pipe, c->ispipe, FALSE); 523 pthread_attr_init(&thr_attr); 524 pthread_attr_setdetachstate(&thr_attr, PTHREAD_CREATE_DETACHED); 525#if defined(HAVE_PTHREAD_ATTR_GETSTACKSIZE) && \ 526 defined(HAVE_PTHREAD_ATTR_SETSTACKSIZE) 527 rc = pthread_attr_getstacksize(&thr_attr, &stacksize); 528 if (-1 == rc) { 529 msyslog(LOG_ERR, 530 "start_blocking_thread: pthread_attr_getstacksize %m"); 531 } else if (stacksize < THREAD_MINSTACKSIZE) { 532 rc = pthread_attr_setstacksize(&thr_attr, 533 THREAD_MINSTACKSIZE); 534 if (-1 == rc) 535 msyslog(LOG_ERR, 536 "start_blocking_thread: pthread_attr_setstacksize(0x%lx -> 0x%lx) %m", 537 (u_long)stacksize, 538 (u_long)THREAD_MINSTACKSIZE); 539 } 540#else 541 UNUSED_ARG(stacksize); 542#endif 543#if defined(PTHREAD_SCOPE_SYSTEM) && defined(NEED_PTHREAD_SCOPE_SYSTEM) 544 pthread_attr_setscope(&thr_attr, PTHREAD_SCOPE_SYSTEM); 545#endif 546 c->thread_ref = emalloc_zero(sizeof(*c->thread_ref)); 547 block_thread_signals(&saved_sig_mask); 548 rc = pthread_create(&c->thr_table[0], &thr_attr, 549 &blocking_thread, c); 550 saved_errno = errno; 551 pthread_sigmask(SIG_SETMASK, &saved_sig_mask, NULL); 552 pthread_attr_destroy(&thr_attr); 553 if (0 != rc) { 554 errno = saved_errno; 555 msyslog(LOG_ERR, "pthread_create() blocking child: %m"); 556 exit(1); 557 } 558 c->thread_ref = &c->thr_table[0]; 559} 560#endif 561 562/* -------------------------------------------------------------------- 563 * block_thread_signals() 564 * 565 * Temporarily block signals used by ntpd main thread, so that signal 566 * mask inherited by child threads leaves them blocked. Returns prior 567 * active signal mask via pmask, to be restored by the main thread 568 * after pthread_create(). 569 */ 570#ifndef SYS_WINNT 571void 572block_thread_signals( 573 sigset_t * pmask 574 ) 575{ 576 sigset_t block; 577 578 sigemptyset(&block); 579# ifdef HAVE_SIGNALED_IO 580# ifdef SIGIO 581 sigaddset(&block, SIGIO); 582# endif 583# ifdef SIGPOLL 584 sigaddset(&block, SIGPOLL); 585# endif 586# endif /* HAVE_SIGNALED_IO */ 587 sigaddset(&block, SIGALRM); 588 sigaddset(&block, MOREDEBUGSIG); 589 sigaddset(&block, LESSDEBUGSIG); 590# ifdef SIGDIE1 591 sigaddset(&block, SIGDIE1); 592# endif 593# ifdef SIGDIE2 594 sigaddset(&block, SIGDIE2); 595# endif 596# ifdef SIGDIE3 597 sigaddset(&block, SIGDIE3); 598# endif 599# ifdef SIGDIE4 600 sigaddset(&block, SIGDIE4); 601# endif 602# ifdef SIGBUS 603 sigaddset(&block, SIGBUS); 604# endif 605 sigemptyset(pmask); 606 pthread_sigmask(SIG_BLOCK, &block, pmask); 607} 608#endif /* !SYS_WINNT */ 609 610 611/* -------------------------------------------------------------------- 612 * Create & destroy semaphores. This is sufficiently different between 613 * POSIX and Windows to warrant wrapper functions and close enough to 614 * use the concept of synchronization via semaphore for all platforms. 615 */ 616static sem_ref 617create_sema( 618 sema_type* semptr, 619 u_int inival, 620 u_int maxval) 621{ 622#ifdef SYS_WINNT 623 624 long svini, svmax; 625 if (NULL != semptr) { 626 svini = (inival < LONG_MAX) 627 ? (long)inival : LONG_MAX; 628 svmax = (maxval < LONG_MAX && maxval > 0) 629 ? (long)maxval : LONG_MAX; 630 semptr->shnd = CreateSemaphore(NULL, svini, svmax, NULL); 631 if (NULL == semptr->shnd) 632 semptr = NULL; 633 } 634 635#else 636 637 (void)maxval; 638 if (semptr && sem_init(semptr, FALSE, inival)) 639 semptr = NULL; 640 641#endif 642 643 return semptr; 644} 645 646/* ------------------------------------------------------------------ */ 647static sem_ref 648delete_sema( 649 sem_ref obj) 650{ 651 652# ifdef SYS_WINNT 653 654 if (obj) { 655 if (obj->shnd) 656 CloseHandle(obj->shnd); 657 obj->shnd = NULL; 658 } 659 660# else 661 662 if (obj) 663 sem_destroy(obj); 664 665# endif 666 667 return NULL; 668} 669 670/* -------------------------------------------------------------------- 671 * prepare_child_sems() 672 * 673 * create sync & access semaphores 674 * 675 * All semaphores are cleared, only the access semaphore has 1 unit. 676 * Childs wait on 'workitems_pending', then grabs 'sema_access' 677 * and dequeues jobs. When done, 'sema_access' is given one unit back. 678 * 679 * The producer grabs 'sema_access', manages the queue, restores 680 * 'sema_access' and puts one unit into 'workitems_pending'. 681 * 682 * The story goes the same for the response queue. 683 */ 684static void 685prepare_child_sems( 686 blocking_child *c 687 ) 688{ 689 c->accesslock = create_sema(&c->sem_table[0], 1, 1); 690 c->workitems_pending = create_sema(&c->sem_table[1], 0, 0); 691 c->wake_scheduled_sleep = create_sema(&c->sem_table[2], 0, 1); 692# ifndef WORK_PIPE 693 c->responses_pending = create_sema(&c->sem_table[3], 0, 0); 694# endif 695} 696 697/* -------------------------------------------------------------------- 698 * wait for semaphore. Where the wait can be interrupted, it will 699 * internally resume -- When this function returns, there is either no 700 * semaphore at all, a timeout occurred, or the caller could 701 * successfully take a token from the semaphore. 702 * 703 * For untimed wait, not checking the result of this function at all is 704 * definitely an option. 705 */ 706static int 707wait_for_sem( 708 sem_ref sem, 709 struct timespec * timeout /* wall-clock */ 710 ) 711#ifdef SYS_WINNT 712{ 713 struct timespec now; 714 struct timespec delta; 715 DWORD msec; 716 DWORD rc; 717 718 if (!(sem && sem->shnd)) { 719 errno = EINVAL; 720 return -1; 721 } 722 723 if (NULL == timeout) { 724 msec = INFINITE; 725 } else { 726 getclock(TIMEOFDAY, &now); 727 delta = sub_tspec(*timeout, now); 728 if (delta.tv_sec < 0) { 729 msec = 0; 730 } else if ((delta.tv_sec + 1) >= (MAXDWORD / 1000)) { 731 msec = INFINITE; 732 } else { 733 msec = 1000 * (DWORD)delta.tv_sec; 734 msec += delta.tv_nsec / (1000 * 1000); 735 } 736 } 737 rc = WaitForSingleObject(sem->shnd, msec); 738 if (WAIT_OBJECT_0 == rc) 739 return 0; 740 if (WAIT_TIMEOUT == rc) { 741 errno = ETIMEDOUT; 742 return -1; 743 } 744 msyslog(LOG_ERR, "WaitForSingleObject unexpected 0x%x", rc); 745 errno = EFAULT; 746 return -1; 747} 748#else /* pthreads wait_for_sem() follows */ 749{ 750 int rc = -1; 751 752 if (sem) do { 753 if (NULL == timeout) 754 rc = sem_wait(sem); 755 else 756 rc = sem_timedwait(sem, timeout); 757 } while (rc == -1 && errno == EINTR); 758 else 759 errno = EINVAL; 760 761 return rc; 762} 763#endif 764 765/* -------------------------------------------------------------------- 766 * blocking_thread - thread functions have WINAPI (aka 'stdcall') 767 * calling conventions under Windows and POSIX-defined signature 768 * otherwise. 769 */ 770#ifdef SYS_WINNT 771u_int WINAPI 772#else 773void * 774#endif 775blocking_thread( 776 void * ThreadArg 777 ) 778{ 779 blocking_child *c; 780 781 c = ThreadArg; 782 exit_worker(blocking_child_common(c)); 783 784 /* NOTREACHED */ 785 return 0; 786} 787 788/* -------------------------------------------------------------------- 789 * req_child_exit() runs in the parent. 790 * 791 * This function is called from from the idle timer, too, and possibly 792 * without a thread being there any longer. Since we have folded up our 793 * tent in that case and all the semaphores are already gone, we simply 794 * ignore this request in this case. 795 * 796 * Since the existence of the semaphores is controlled exclusively by 797 * the parent, there's no risk of data race here. 798 */ 799int 800req_child_exit( 801 blocking_child *c 802 ) 803{ 804 return (c->accesslock) 805 ? queue_req_pointer(c, CHILD_EXIT_REQ) 806 : 0; 807} 808 809/* -------------------------------------------------------------------- 810 * cleanup_after_child() runs in parent. 811 */ 812static void 813cleanup_after_child( 814 blocking_child * c 815 ) 816{ 817 DEBUG_INSIST(!c->reusable); 818 819# ifdef SYS_WINNT 820 /* The thread was not created in detached state, so we better 821 * clean up. 822 */ 823 if (c->thread_ref && c->thread_ref->thnd) { 824 WaitForSingleObject(c->thread_ref->thnd, INFINITE); 825 INSIST(CloseHandle(c->thread_ref->thnd)); 826 c->thread_ref->thnd = NULL; 827 } 828# endif 829 c->thread_ref = NULL; 830 831 /* remove semaphores and (if signalling vi IO) pipes */ 832 833 c->accesslock = delete_sema(c->accesslock); 834 c->workitems_pending = delete_sema(c->workitems_pending); 835 c->wake_scheduled_sleep = delete_sema(c->wake_scheduled_sleep); 836 837# ifdef WORK_PIPE 838 DEBUG_INSIST(-1 != c->resp_read_pipe); 839 DEBUG_INSIST(-1 != c->resp_write_pipe); 840 (*addremove_io_fd)(c->resp_read_pipe, c->ispipe, TRUE); 841 close(c->resp_write_pipe); 842 close(c->resp_read_pipe); 843 c->resp_write_pipe = -1; 844 c->resp_read_pipe = -1; 845# else 846 DEBUG_INSIST(NULL != c->responses_pending); 847 (*addremove_io_semaphore)(c->responses_pending->shnd, TRUE); 848 c->responses_pending = delete_sema(c->responses_pending); 849# endif 850 851 /* Is it necessary to check if there are pending requests and 852 * responses? If so, and if there are, what to do with them? 853 */ 854 855 /* re-init buffer index sequencers */ 856 c->head_workitem = 0; 857 c->tail_workitem = 0; 858 c->head_response = 0; 859 c->tail_response = 0; 860 861 c->reusable = TRUE; 862} 863 864 865#else /* !WORK_THREAD follows */ 866char work_thread_nonempty_compilation_unit; 867#endif 868