1145510Sdarrenr/* 2145510Sdarrenr * work_thread.c - threads implementation for blocking worker child. 322514Sdarrenr */ 453024Sguido#include <config.h> 522514Sdarrenr#include "ntp_workimpl.h" 680486Sdarrenr 722514Sdarrenr#ifdef WORK_THREAD 8145510Sdarrenr 9145510Sdarrenr#include <stdio.h> 10145510Sdarrenr#include <ctype.h> 1192686Sdarrenr#include <signal.h> 12145510Sdarrenr#ifndef SYS_WINNT 1322514Sdarrenr#include <pthread.h> 1422514Sdarrenr#endif 1522514Sdarrenr 1622514Sdarrenr#include "ntp_stdlib.h" 1722514Sdarrenr#include "ntp_malloc.h" 1822514Sdarrenr#include "ntp_syslog.h" 1931183Speter#include "ntpd.h" 2022514Sdarrenr#include "ntp_io.h" 2131183Speter#include "ntp_assert.h" 2231183Speter#include "ntp_unixtime.h" 2331183Speter#include "timespecops.h" 2431183Speter#include "ntp_worker.h" 2531183Speter 2622514Sdarrenr#define CHILD_EXIT_REQ ((blocking_pipe_header *)(intptr_t)-1) 27145510Sdarrenr#define CHILD_GONE_RESP CHILD_EXIT_REQ 28145510Sdarrenr/* Queue size increments: 29145510Sdarrenr * The request queue grows a bit faster than the response queue -- the 30145510Sdarrenr * daemon can push requests and pull results faster on avarage than the 31145510Sdarrenr * worker can process requests and push results... If this really pays 3224583Sdarrenr * off is debatable. 3322514Sdarrenr */ 3422514Sdarrenr#define WORKITEMS_ALLOC_INC 16 3522514Sdarrenr#define RESPONSES_ALLOC_INC 4 3622514Sdarrenr 3722514Sdarrenr/* Fiddle with min/max stack sizes. 64kB minimum seems to work, so we 3822514Sdarrenr * set the maximum to 256kB. If the minimum goes below the 3922514Sdarrenr * system-defined minimum stack size, we have to adjust accordingly. 4022514Sdarrenr */ 4122514Sdarrenr#ifndef THREAD_MINSTACKSIZE 4222514Sdarrenr# define THREAD_MINSTACKSIZE (64U * 1024) 4322514Sdarrenr#endif 4422514Sdarrenr 4522514Sdarrenr#ifndef THREAD_MAXSTACKSIZE 4622514Sdarrenr# define THREAD_MAXSTACKSIZE (256U * 1024) 4722514Sdarrenr#endif 4822514Sdarrenr 4922514Sdarrenr/* need a good integer to store a pointer... */ 5022514Sdarrenr#ifndef UINTPTR_T 5122514Sdarrenr# if defined(UINTPTR_MAX) 5222514Sdarrenr# define UINTPTR_T uintptr_t 5322514Sdarrenr# elif defined(UINT_PTR) 5422514Sdarrenr# define UINTPTR_T UINT_PTR 5522514Sdarrenr# else 5622514Sdarrenr# define UINTPTR_T size_t 5722514Sdarrenr# endif 5822514Sdarrenr#endif 5922514Sdarrenr 6022514Sdarrenr 6122514Sdarrenr#ifdef SYS_WINNT 6222514Sdarrenr 6322514Sdarrenr# define thread_exit(c) _endthreadex(c) 6422514Sdarrenr# define tickle_sem(sh) ReleaseSemaphore((sh->shnd), 1, NULL) 6522514Sdarrenru_int WINAPI blocking_thread(void *); 6622514Sdarrenrstatic BOOL same_os_sema(const sem_ref obj, void * osobj); 6722514Sdarrenr 6822514Sdarrenr#else 6922514Sdarrenr 7022514Sdarrenr# define thread_exit(c) pthread_exit((void*)(UINTPTR_T)(c)) 71145510Sdarrenr# define tickle_sem sem_post 7231183Spetervoid * blocking_thread(void *); 73145510Sdarrenrstatic void block_thread_signals(sigset_t *); 7431183Speter 7522514Sdarrenr#endif 7622514Sdarrenr 7722514Sdarrenr#ifdef WORK_PIPE 7822514Sdarrenraddremove_io_fd_func addremove_io_fd; 7931183Speter#else 8022514Sdarrenraddremove_io_semaphore_func addremove_io_semaphore; 8122514Sdarrenr#endif 8222514Sdarrenr 8322514Sdarrenrstatic void start_blocking_thread(blocking_child *); 8422514Sdarrenrstatic void start_blocking_thread_internal(blocking_child *); 8522514Sdarrenrstatic void prepare_child_sems(blocking_child *); 8622514Sdarrenrstatic int wait_for_sem(sem_ref, struct timespec *); 8722514Sdarrenrstatic int ensure_workitems_empty_slot(blocking_child *); 8822514Sdarrenrstatic int ensure_workresp_empty_slot(blocking_child *); 8922514Sdarrenrstatic int queue_req_pointer(blocking_child *, blocking_pipe_header *); 9022514Sdarrenrstatic void cleanup_after_child(blocking_child *); 9122514Sdarrenr 9222514Sdarrenrstatic sema_type worker_mmutex; 9322514Sdarrenrstatic sem_ref worker_memlock; 94145510Sdarrenr 95145510Sdarrenr/* -------------------------------------------------------------------- 9622514Sdarrenr * locking the global worker state table (and other global stuff) 9722514Sdarrenr */ 9822514Sdarrenrvoid 9922514Sdarrenrworker_global_lock( 10034739Speter int inOrOut) 10122514Sdarrenr{ 10222514Sdarrenr if (worker_memlock) { 10372003Sdarrenr if (inOrOut) 10422514Sdarrenr wait_for_sem(worker_memlock, NULL); 105145510Sdarrenr else 10672003Sdarrenr tickle_sem(worker_memlock); 10772003Sdarrenr } 10872003Sdarrenr} 10972003Sdarrenr 11072003Sdarrenr/* -------------------------------------------------------------------- 11172003Sdarrenr * implementation isolation wrapper 11222514Sdarrenr */ 11322514Sdarrenrvoid 11431183Speterexit_worker( 11522514Sdarrenr int exitcode 116145510Sdarrenr ) 11731183Speter{ 118145510Sdarrenr thread_exit(exitcode); /* see #define thread_exit */ 11931183Speter} 12022514Sdarrenr 12122514Sdarrenr/* -------------------------------------------------------------------- 12222514Sdarrenr * sleep for a given time or until the wakup semaphore is tickled. 12322514Sdarrenr */ 12431183Speterint 12531183Speterworker_sleep( 12622514Sdarrenr blocking_child * c, 12722514Sdarrenr time_t seconds 12822514Sdarrenr ) 12934739Speter{ 13034739Speter struct timespec until; 13131183Speter int rc; 132145510Sdarrenr 133145510Sdarrenr# ifdef HAVE_CLOCK_GETTIME 13431183Speter if (0 != clock_gettime(CLOCK_REALTIME, &until)) { 13531183Speter msyslog(LOG_ERR, "worker_sleep: clock_gettime() failed: %m"); 13631183Speter return -1; 13731183Speter } 13831183Speter# else 13922514Sdarrenr if (0 != getclock(TIMEOFDAY, &until)) { 140145510Sdarrenr msyslog(LOG_ERR, "worker_sleep: getclock() failed: %m"); 141145510Sdarrenr return -1; 142145510Sdarrenr } 143145510Sdarrenr# endif 144145510Sdarrenr until.tv_sec += seconds; 145145510Sdarrenr rc = wait_for_sem(c->wake_scheduled_sleep, &until); 146145510Sdarrenr if (0 == rc) 14734739Speter return -1; 14822514Sdarrenr if (-1 == rc && ETIMEDOUT == errno) 14922514Sdarrenr return 0; 150145510Sdarrenr msyslog(LOG_ERR, "worker_sleep: sem_timedwait: %m"); 15122514Sdarrenr return -1; 15234739Speter} 15334739Speter 15422514Sdarrenr 15522514Sdarrenr/* -------------------------------------------------------------------- 15622514Sdarrenr * Wake up a worker that takes a nap. 15722514Sdarrenr */ 15822514Sdarrenrvoid 15922514Sdarrenrinterrupt_worker_sleep(void) 16022514Sdarrenr{ 16122514Sdarrenr u_int idx; 16222514Sdarrenr blocking_child * c; 16322514Sdarrenr 16422514Sdarrenr for (idx = 0; idx < blocking_children_alloc; idx++) { 16534739Speter c = blocking_children[idx]; 16622514Sdarrenr if (NULL == c || NULL == c->wake_scheduled_sleep) 167145510Sdarrenr continue; 16822514Sdarrenr tickle_sem(c->wake_scheduled_sleep); 16922514Sdarrenr } 17022514Sdarrenr} 17122514Sdarrenr 17222514Sdarrenr/* -------------------------------------------------------------------- 17322514Sdarrenr * Make sure there is an empty slot at the head of the request 174145510Sdarrenr * queue. Tell if the queue is currently empty. 17522514Sdarrenr */ 17622514Sdarrenrstatic int 17722514Sdarrenrensure_workitems_empty_slot( 17822514Sdarrenr blocking_child *c 17922514Sdarrenr ) 18022514Sdarrenr{ 18122514Sdarrenr /* 18222514Sdarrenr ** !!! PRECONDITION: caller holds access lock! 18322514Sdarrenr ** 18422514Sdarrenr ** This simply tries to increase the size of the buffer if it 18522514Sdarrenr ** becomes full. The resize operation does *not* maintain the 18622514Sdarrenr ** order of requests, but that should be irrelevant since the 18722514Sdarrenr ** processing is considered asynchronous anyway. 18822514Sdarrenr ** 18922514Sdarrenr ** Return if the buffer is currently empty. 19022514Sdarrenr */ 19122514Sdarrenr 19222514Sdarrenr static const size_t each = 19322514Sdarrenr sizeof(blocking_children[0]->workitems[0]); 19422514Sdarrenr 19522514Sdarrenr size_t new_alloc; 19622514Sdarrenr size_t slots_used; 19722514Sdarrenr size_t sidx; 19822514Sdarrenr 19922514Sdarrenr slots_used = c->head_workitem - c->tail_workitem; 20022514Sdarrenr if (slots_used >= c->workitems_alloc) { 20122514Sdarrenr new_alloc = c->workitems_alloc + WORKITEMS_ALLOC_INC; 20222514Sdarrenr c->workitems = erealloc(c->workitems, new_alloc * each); 20322514Sdarrenr for (sidx = c->workitems_alloc; sidx < new_alloc; ++sidx) 20422514Sdarrenr c->workitems[sidx] = NULL; 20522514Sdarrenr c->tail_workitem = 0; 20622514Sdarrenr c->head_workitem = c->workitems_alloc; 20722514Sdarrenr c->workitems_alloc = new_alloc; 20822514Sdarrenr } 20922514Sdarrenr INSIST(NULL == c->workitems[c->head_workitem % c->workitems_alloc]); 21022514Sdarrenr return (0 == slots_used); 21122514Sdarrenr} 21222514Sdarrenr 21322514Sdarrenr/* -------------------------------------------------------------------- 21422514Sdarrenr * Make sure there is an empty slot at the head of the response 21522514Sdarrenr * queue. Tell if the queue is currently empty. 21622514Sdarrenr */ 21722514Sdarrenrstatic int 21822514Sdarrenrensure_workresp_empty_slot( 21922514Sdarrenr blocking_child *c 22022514Sdarrenr ) 22122514Sdarrenr{ 22222514Sdarrenr /* 22324583Sdarrenr ** !!! PRECONDITION: caller holds access lock! 22422514Sdarrenr ** 22522514Sdarrenr ** Works like the companion function above. 22622514Sdarrenr */ 22722514Sdarrenr 22822514Sdarrenr static const size_t each = 22922514Sdarrenr sizeof(blocking_children[0]->responses[0]); 23022514Sdarrenr 23122514Sdarrenr size_t new_alloc; 23222514Sdarrenr size_t slots_used; 23322514Sdarrenr size_t sidx; 23422514Sdarrenr 235145510Sdarrenr slots_used = c->head_response - c->tail_response; 23622514Sdarrenr if (slots_used >= c->responses_alloc) { 23722514Sdarrenr new_alloc = c->responses_alloc + RESPONSES_ALLOC_INC; 23822514Sdarrenr c->responses = erealloc(c->responses, new_alloc * each); 23922514Sdarrenr for (sidx = c->responses_alloc; sidx < new_alloc; ++sidx) 24022514Sdarrenr c->responses[sidx] = NULL; 24122514Sdarrenr c->tail_response = 0; 24222514Sdarrenr c->head_response = c->responses_alloc; 24322514Sdarrenr c->responses_alloc = new_alloc; 24422514Sdarrenr } 24522514Sdarrenr INSIST(NULL == c->responses[c->head_response % c->responses_alloc]); 24622514Sdarrenr return (0 == slots_used); 24722514Sdarrenr} 24822514Sdarrenr 24922514Sdarrenr 25022514Sdarrenr/* -------------------------------------------------------------------- 25122514Sdarrenr * queue_req_pointer() - append a work item or idle exit request to 25222514Sdarrenr * blocking_workitems[]. Employ proper locking. 25322514Sdarrenr */ 25422514Sdarrenrstatic int 255145510Sdarrenrqueue_req_pointer( 25634739Speter blocking_child * c, 25734739Speter blocking_pipe_header * hdr 258145510Sdarrenr ) 25922514Sdarrenr{ 26034739Speter size_t qhead; 261145510Sdarrenr 26234739Speter /* >>>> ACCESS LOCKING STARTS >>>> */ 263145510Sdarrenr wait_for_sem(c->accesslock, NULL); 264145510Sdarrenr ensure_workitems_empty_slot(c); 265145510Sdarrenr qhead = c->head_workitem; 26634739Speter c->workitems[qhead % c->workitems_alloc] = hdr; 26734739Speter c->head_workitem = 1 + qhead; 268145510Sdarrenr tickle_sem(c->accesslock); 26922514Sdarrenr /* <<<< ACCESS LOCKING ENDS <<<< */ 270145510Sdarrenr 271145510Sdarrenr /* queue consumer wake-up notification */ 272145510Sdarrenr tickle_sem(c->workitems_pending); 273145510Sdarrenr 27422514Sdarrenr return 0; 275145510Sdarrenr} 276145510Sdarrenr 27734739Speter/* -------------------------------------------------------------------- 27822514Sdarrenr * API function to make sure a worker is running, a proper private copy 27934739Speter * of the data is made, the data eneterd into the queue and the worker 28034739Speter * is signalled. 281145510Sdarrenr */ 28234739Speterint 28322514Sdarrenrsend_blocking_req_internal( 28434739Speter blocking_child * c, 28534739Speter blocking_pipe_header * hdr, 28622514Sdarrenr void * data 28722514Sdarrenr ) 288145510Sdarrenr{ 289145510Sdarrenr blocking_pipe_header * threadcopy; 29034739Speter size_t payload_octets; 291145510Sdarrenr 292145510Sdarrenr REQUIRE(hdr != NULL); 29322514Sdarrenr REQUIRE(data != NULL); 294145510Sdarrenr DEBUG_REQUIRE(BLOCKING_REQ_MAGIC == hdr->magic_sig); 29522514Sdarrenr 29622514Sdarrenr if (hdr->octets <= sizeof(*hdr)) 29722514Sdarrenr return 1; /* failure */ 29822514Sdarrenr payload_octets = hdr->octets - sizeof(*hdr); 29922514Sdarrenr 30022514Sdarrenr if (NULL == c->thread_ref) 30122514Sdarrenr start_blocking_thread(c); 30222514Sdarrenr threadcopy = emalloc(hdr->octets); 30322514Sdarrenr memcpy(threadcopy, hdr, sizeof(*hdr)); 30422514Sdarrenr memcpy((char *)threadcopy + sizeof(*hdr), data, payload_octets); 30522514Sdarrenr 30622514Sdarrenr return queue_req_pointer(c, threadcopy); 30722514Sdarrenr} 30822514Sdarrenr 30922514Sdarrenr/* -------------------------------------------------------------------- 31022514Sdarrenr * Wait for the 'incoming queue no longer empty' signal, lock the shared 31122514Sdarrenr * structure and dequeue an item. 31222514Sdarrenr */ 31322514Sdarrenrblocking_pipe_header * 31422514Sdarrenrreceive_blocking_req_internal( 31522514Sdarrenr blocking_child * c 31622514Sdarrenr ) 317145510Sdarrenr{ 31822514Sdarrenr blocking_pipe_header * req; 31922514Sdarrenr size_t qhead, qtail; 32022514Sdarrenr 321145510Sdarrenr req = NULL; 32222514Sdarrenr do { 32324583Sdarrenr /* wait for tickle from the producer side */ 32422514Sdarrenr wait_for_sem(c->workitems_pending, NULL); 32522514Sdarrenr 326145510Sdarrenr /* >>>> ACCESS LOCKING STARTS >>>> */ 32722514Sdarrenr wait_for_sem(c->accesslock, NULL); 32822514Sdarrenr qhead = c->head_workitem; 32922514Sdarrenr do { 33022514Sdarrenr qtail = c->tail_workitem; 33122514Sdarrenr if (qhead == qtail) 33222514Sdarrenr break; 33322514Sdarrenr c->tail_workitem = qtail + 1; 33422514Sdarrenr qtail %= c->workitems_alloc; 33522514Sdarrenr req = c->workitems[qtail]; 33622514Sdarrenr c->workitems[qtail] = NULL; 33722514Sdarrenr } while (NULL == req); 33822514Sdarrenr tickle_sem(c->accesslock); 33922514Sdarrenr /* <<<< ACCESS LOCKING ENDS <<<< */ 34022514Sdarrenr 341145510Sdarrenr } while (NULL == req); 34222514Sdarrenr 34322514Sdarrenr INSIST(NULL != req); 34424583Sdarrenr if (CHILD_EXIT_REQ == req) { /* idled out */ 34522514Sdarrenr send_blocking_resp_internal(c, CHILD_GONE_RESP); 34622514Sdarrenr req = NULL; 34722514Sdarrenr } 34822514Sdarrenr 34922514Sdarrenr return req; 35022514Sdarrenr} 35122514Sdarrenr 35222514Sdarrenr/* -------------------------------------------------------------------- 35322514Sdarrenr * Push a response into the return queue and eventually tickle the 35422514Sdarrenr * receiver. 35522514Sdarrenr */ 35622514Sdarrenrint 35722514Sdarrenrsend_blocking_resp_internal( 35822514Sdarrenr blocking_child * c, 35922514Sdarrenr blocking_pipe_header * resp 36022514Sdarrenr ) 36122514Sdarrenr{ 36222514Sdarrenr size_t qhead; 36322514Sdarrenr int empty; 36422514Sdarrenr 36522514Sdarrenr /* >>>> ACCESS LOCKING STARTS >>>> */ 36622514Sdarrenr wait_for_sem(c->accesslock, NULL); 367 empty = ensure_workresp_empty_slot(c); 368 qhead = c->head_response; 369 c->responses[qhead % c->responses_alloc] = resp; 370 c->head_response = 1 + qhead; 371 tickle_sem(c->accesslock); 372 /* <<<< ACCESS LOCKING ENDS <<<< */ 373 374 /* queue consumer wake-up notification */ 375 if (empty) 376 { 377# ifdef WORK_PIPE 378 if (1 != write(c->resp_write_pipe, "", 1)) 379 msyslog(LOG_WARNING, "async resolver: blocking_get%sinfo" 380 " failed to notify main thread!", 381 (BLOCKING_GETNAMEINFO == resp->rtype) 382 ? "name" 383 : "addr" 384 ); 385# else 386 tickle_sem(c->responses_pending); 387# endif 388 } 389 return 0; 390} 391 392 393#ifndef WORK_PIPE 394 395/* -------------------------------------------------------------------- 396 * Check if a (Windows-)handle to a semaphore is actually the same we 397 * are using inside the sema wrapper. 398 */ 399static BOOL 400same_os_sema( 401 const sem_ref obj, 402 void* osh 403 ) 404{ 405 return obj && osh && (obj->shnd == (HANDLE)osh); 406} 407 408/* -------------------------------------------------------------------- 409 * Find the shared context that associates to an OS handle and make sure 410 * the data is dequeued and processed. 411 */ 412void 413handle_blocking_resp_sem( 414 void * context 415 ) 416{ 417 blocking_child * c; 418 u_int idx; 419 420 c = NULL; 421 for (idx = 0; idx < blocking_children_alloc; idx++) { 422 c = blocking_children[idx]; 423 if (c != NULL && 424 c->thread_ref != NULL && 425 same_os_sema(c->responses_pending, context)) 426 break; 427 } 428 if (idx < blocking_children_alloc) 429 process_blocking_resp(c); 430} 431#endif /* !WORK_PIPE */ 432 433/* -------------------------------------------------------------------- 434 * Fetch the next response from the return queue. In case of signalling 435 * via pipe, make sure the pipe is flushed, too. 436 */ 437blocking_pipe_header * 438receive_blocking_resp_internal( 439 blocking_child * c 440 ) 441{ 442 blocking_pipe_header * removed; 443 size_t qhead, qtail, slot; 444 445#ifdef WORK_PIPE 446 int rc; 447 char scratch[32]; 448 449 do 450 rc = read(c->resp_read_pipe, scratch, sizeof(scratch)); 451 while (-1 == rc && EINTR == errno); 452#endif 453 454 /* >>>> ACCESS LOCKING STARTS >>>> */ 455 wait_for_sem(c->accesslock, NULL); 456 qhead = c->head_response; 457 qtail = c->tail_response; 458 for (removed = NULL; !removed && (qhead != qtail); ++qtail) { 459 slot = qtail % c->responses_alloc; 460 removed = c->responses[slot]; 461 c->responses[slot] = NULL; 462 } 463 c->tail_response = qtail; 464 tickle_sem(c->accesslock); 465 /* <<<< ACCESS LOCKING ENDS <<<< */ 466 467 if (NULL != removed) { 468 DEBUG_ENSURE(CHILD_GONE_RESP == removed || 469 BLOCKING_RESP_MAGIC == removed->magic_sig); 470 } 471 if (CHILD_GONE_RESP == removed) { 472 cleanup_after_child(c); 473 removed = NULL; 474 } 475 476 return removed; 477} 478 479/* -------------------------------------------------------------------- 480 * Light up a new worker. 481 */ 482static void 483start_blocking_thread( 484 blocking_child * c 485 ) 486{ 487 488 DEBUG_INSIST(!c->reusable); 489 490 prepare_child_sems(c); 491 start_blocking_thread_internal(c); 492} 493 494/* -------------------------------------------------------------------- 495 * Create a worker thread. There are several differences between POSIX 496 * and Windows, of course -- most notably the Windows thread is a 497 * detached thread, and we keep the handle around until we want to get 498 * rid of the thread. The notification scheme also differs: Windows 499 * makes use of semaphores in both directions, POSIX uses a pipe for 500 * integration with 'select()' or alike. 501 */ 502static void 503start_blocking_thread_internal( 504 blocking_child * c 505 ) 506#ifdef SYS_WINNT 507{ 508 BOOL resumed; 509 510 c->thread_ref = NULL; 511 (*addremove_io_semaphore)(c->responses_pending->shnd, FALSE); 512 c->thr_table[0].thnd = 513 (HANDLE)_beginthreadex( 514 NULL, 515 0, 516 &blocking_thread, 517 c, 518 CREATE_SUSPENDED, 519 NULL); 520 521 if (NULL == c->thr_table[0].thnd) { 522 msyslog(LOG_ERR, "start blocking thread failed: %m"); 523 exit(-1); 524 } 525 /* remember the thread priority is only within the process class */ 526 if (!SetThreadPriority(c->thr_table[0].thnd, 527 THREAD_PRIORITY_BELOW_NORMAL)) { 528 msyslog(LOG_ERR, "Error lowering blocking thread priority: %m"); 529 } 530 if (NULL != pSetThreadDescription) { 531 (*pSetThreadDescription)(c->thr_table[0].thnd, L"ntp_worker"); 532 } 533 resumed = ResumeThread(c->thr_table[0].thnd); 534 DEBUG_INSIST(resumed); 535 c->thread_ref = &c->thr_table[0]; 536} 537#else /* pthreads start_blocking_thread_internal() follows */ 538{ 539# ifdef NEED_PTHREAD_INIT 540 static int pthread_init_called; 541# endif 542 pthread_attr_t thr_attr; 543 int rc; 544 int pipe_ends[2]; /* read then write */ 545 int is_pipe; 546 int flags; 547 size_t ostacksize; 548 size_t nstacksize; 549 sigset_t saved_sig_mask; 550 551 c->thread_ref = NULL; 552 553# ifdef NEED_PTHREAD_INIT 554 /* 555 * from lib/isc/unix/app.c: 556 * BSDI 3.1 seg faults in pthread_sigmask() if we don't do this. 557 */ 558 if (!pthread_init_called) { 559 pthread_init(); 560 pthread_init_called = TRUE; 561 } 562# endif 563 564 rc = pipe_socketpair(&pipe_ends[0], &is_pipe); 565 if (0 != rc) { 566 msyslog(LOG_ERR, "start_blocking_thread: pipe_socketpair() %m"); 567 exit(1); 568 } 569 c->resp_read_pipe = move_fd(pipe_ends[0]); 570 c->resp_write_pipe = move_fd(pipe_ends[1]); 571 c->ispipe = is_pipe; 572 flags = fcntl(c->resp_read_pipe, F_GETFL, 0); 573 if (-1 == flags) { 574 msyslog(LOG_ERR, "start_blocking_thread: fcntl(F_GETFL) %m"); 575 exit(1); 576 } 577 rc = fcntl(c->resp_read_pipe, F_SETFL, O_NONBLOCK | flags); 578 if (-1 == rc) { 579 msyslog(LOG_ERR, 580 "start_blocking_thread: fcntl(F_SETFL, O_NONBLOCK) %m"); 581 exit(1); 582 } 583 (*addremove_io_fd)(c->resp_read_pipe, c->ispipe, FALSE); 584 pthread_attr_init(&thr_attr); 585 pthread_attr_setdetachstate(&thr_attr, PTHREAD_CREATE_DETACHED); 586#if defined(HAVE_PTHREAD_ATTR_GETSTACKSIZE) && \ 587 defined(HAVE_PTHREAD_ATTR_SETSTACKSIZE) 588 rc = pthread_attr_getstacksize(&thr_attr, &ostacksize); 589 if (0 != rc) { 590 msyslog(LOG_ERR, 591 "start_blocking_thread: pthread_attr_getstacksize() -> %s", 592 strerror(rc)); 593 } else { 594 nstacksize = ostacksize; 595 /* order is important here: first clamp on upper limit, 596 * and the PTHREAD min stack size is ultimate override! 597 */ 598 if (nstacksize > THREAD_MAXSTACKSIZE) 599 nstacksize = THREAD_MAXSTACKSIZE; 600# ifdef PTHREAD_STACK_MAX 601 if (nstacksize > PTHREAD_STACK_MAX) 602 nstacksize = PTHREAD_STACK_MAX; 603# endif 604 605 /* now clamp on lower stack limit. */ 606 if (nstacksize < THREAD_MINSTACKSIZE) 607 nstacksize = THREAD_MINSTACKSIZE; 608# ifdef PTHREAD_STACK_MIN 609 if (nstacksize < PTHREAD_STACK_MIN) 610 nstacksize = PTHREAD_STACK_MIN; 611# endif 612 613 if (nstacksize != ostacksize) 614 rc = pthread_attr_setstacksize(&thr_attr, nstacksize); 615 if (0 != rc) 616 msyslog(LOG_ERR, 617 "start_blocking_thread: pthread_attr_setstacksize(0x%lx -> 0x%lx) -> %s", 618 (u_long)ostacksize, (u_long)nstacksize, 619 strerror(rc)); 620 } 621#else 622 UNUSED_ARG(nstacksize); 623 UNUSED_ARG(ostacksize); 624#endif 625#if defined(PTHREAD_SCOPE_SYSTEM) && defined(NEED_PTHREAD_SCOPE_SYSTEM) 626 pthread_attr_setscope(&thr_attr, PTHREAD_SCOPE_SYSTEM); 627#endif 628 c->thread_ref = emalloc_zero(sizeof(*c->thread_ref)); 629 block_thread_signals(&saved_sig_mask); 630 rc = pthread_create(&c->thr_table[0], &thr_attr, 631 &blocking_thread, c); 632 pthread_sigmask(SIG_SETMASK, &saved_sig_mask, NULL); 633 pthread_attr_destroy(&thr_attr); 634 if (0 != rc) { 635 msyslog(LOG_ERR, "start_blocking_thread: pthread_create() -> %s", 636 strerror(rc)); 637 exit(1); 638 } 639 c->thread_ref = &c->thr_table[0]; 640} 641#endif 642 643/* -------------------------------------------------------------------- 644 * block_thread_signals() 645 * 646 * Temporarily block signals used by ntpd main thread, so that signal 647 * mask inherited by child threads leaves them blocked. Returns prior 648 * active signal mask via pmask, to be restored by the main thread 649 * after pthread_create(). 650 */ 651#ifndef SYS_WINNT 652void 653block_thread_signals( 654 sigset_t * pmask 655 ) 656{ 657 sigset_t block; 658 659 sigemptyset(&block); 660# ifdef HAVE_SIGNALED_IO 661# ifdef SIGIO 662 sigaddset(&block, SIGIO); 663# endif 664# ifdef SIGPOLL 665 sigaddset(&block, SIGPOLL); 666# endif 667# endif /* HAVE_SIGNALED_IO */ 668 sigaddset(&block, SIGALRM); 669 sigaddset(&block, MOREDEBUGSIG); 670 sigaddset(&block, LESSDEBUGSIG); 671# ifdef SIGDIE1 672 sigaddset(&block, SIGDIE1); 673# endif 674# ifdef SIGDIE2 675 sigaddset(&block, SIGDIE2); 676# endif 677# ifdef SIGDIE3 678 sigaddset(&block, SIGDIE3); 679# endif 680# ifdef SIGDIE4 681 sigaddset(&block, SIGDIE4); 682# endif 683# ifdef SIGBUS 684 sigaddset(&block, SIGBUS); 685# endif 686 sigemptyset(pmask); 687 pthread_sigmask(SIG_BLOCK, &block, pmask); 688} 689#endif /* !SYS_WINNT */ 690 691 692/* -------------------------------------------------------------------- 693 * Create & destroy semaphores. This is sufficiently different between 694 * POSIX and Windows to warrant wrapper functions and close enough to 695 * use the concept of synchronization via semaphore for all platforms. 696 */ 697static sem_ref 698create_sema( 699 sema_type* semptr, 700 u_int inival, 701 u_int maxval) 702{ 703#ifdef SYS_WINNT 704 705 long svini, svmax; 706 if (NULL != semptr) { 707 svini = (inival < LONG_MAX) 708 ? (long)inival : LONG_MAX; 709 svmax = (maxval < LONG_MAX && maxval > 0) 710 ? (long)maxval : LONG_MAX; 711 semptr->shnd = CreateSemaphore(NULL, svini, svmax, NULL); 712 if (NULL == semptr->shnd) 713 semptr = NULL; 714 } 715 716#else 717 718 (void)maxval; 719 if (semptr && sem_init(semptr, FALSE, inival)) 720 semptr = NULL; 721 722#endif 723 724 return semptr; 725} 726 727/* ------------------------------------------------------------------ */ 728static sem_ref 729delete_sema( 730 sem_ref obj) 731{ 732 733# ifdef SYS_WINNT 734 735 if (obj) { 736 if (obj->shnd) 737 CloseHandle(obj->shnd); 738 obj->shnd = NULL; 739 } 740 741# else 742 743 if (obj) 744 sem_destroy(obj); 745 746# endif 747 748 return NULL; 749} 750 751/* -------------------------------------------------------------------- 752 * prepare_child_sems() 753 * 754 * create sync & access semaphores 755 * 756 * All semaphores are cleared, only the access semaphore has 1 unit. 757 * Childs wait on 'workitems_pending', then grabs 'sema_access' 758 * and dequeues jobs. When done, 'sema_access' is given one unit back. 759 * 760 * The producer grabs 'sema_access', manages the queue, restores 761 * 'sema_access' and puts one unit into 'workitems_pending'. 762 * 763 * The story goes the same for the response queue. 764 */ 765static void 766prepare_child_sems( 767 blocking_child *c 768 ) 769{ 770 if (NULL == worker_memlock) 771 worker_memlock = create_sema(&worker_mmutex, 1, 1); 772 773 c->accesslock = create_sema(&c->sem_table[0], 1, 1); 774 c->workitems_pending = create_sema(&c->sem_table[1], 0, 0); 775 c->wake_scheduled_sleep = create_sema(&c->sem_table[2], 0, 1); 776# ifndef WORK_PIPE 777 c->responses_pending = create_sema(&c->sem_table[3], 0, 0); 778# endif 779} 780 781/* -------------------------------------------------------------------- 782 * wait for semaphore. Where the wait can be interrupted, it will 783 * internally resume -- When this function returns, there is either no 784 * semaphore at all, a timeout occurred, or the caller could 785 * successfully take a token from the semaphore. 786 * 787 * For untimed wait, not checking the result of this function at all is 788 * definitely an option. 789 */ 790static int 791wait_for_sem( 792 sem_ref sem, 793 struct timespec * timeout /* wall-clock */ 794 ) 795#ifdef SYS_WINNT 796{ 797 struct timespec now; 798 struct timespec delta; 799 DWORD msec; 800 DWORD rc; 801 802 if (!(sem && sem->shnd)) { 803 errno = EINVAL; 804 return -1; 805 } 806 807 if (NULL == timeout) { 808 msec = INFINITE; 809 } else { 810 getclock(TIMEOFDAY, &now); 811 delta = sub_tspec(*timeout, now); 812 if (delta.tv_sec < 0) { 813 msec = 0; 814 } else if ((delta.tv_sec + 1) >= (MAXDWORD / 1000)) { 815 msec = INFINITE; 816 } else { 817 msec = 1000 * (DWORD)delta.tv_sec; 818 msec += delta.tv_nsec / (1000 * 1000); 819 } 820 } 821 rc = WaitForSingleObject(sem->shnd, msec); 822 if (WAIT_OBJECT_0 == rc) 823 return 0; 824 if (WAIT_TIMEOUT == rc) { 825 errno = ETIMEDOUT; 826 return -1; 827 } 828 msyslog(LOG_ERR, "WaitForSingleObject unexpected 0x%x", rc); 829 errno = EFAULT; 830 return -1; 831} 832#else /* pthreads wait_for_sem() follows */ 833{ 834 int rc = -1; 835 836 if (sem) do { 837 if (NULL == timeout) 838 rc = sem_wait(sem); 839 else 840 rc = sem_timedwait(sem, timeout); 841 } while (rc == -1 && errno == EINTR); 842 else 843 errno = EINVAL; 844 845 return rc; 846} 847#endif 848 849/* -------------------------------------------------------------------- 850 * blocking_thread - thread functions have WINAPI (aka 'stdcall') 851 * calling conventions under Windows and POSIX-defined signature 852 * otherwise. 853 */ 854#ifdef SYS_WINNT 855u_int WINAPI 856#else 857void * 858#endif 859blocking_thread( 860 void * ThreadArg 861 ) 862{ 863 blocking_child *c; 864 865 c = ThreadArg; 866 exit_worker(blocking_child_common(c)); 867 868 /* NOTREACHED */ 869 return 0; 870} 871 872/* -------------------------------------------------------------------- 873 * req_child_exit() runs in the parent. 874 * 875 * This function is called from from the idle timer, too, and possibly 876 * without a thread being there any longer. Since we have folded up our 877 * tent in that case and all the semaphores are already gone, we simply 878 * ignore this request in this case. 879 * 880 * Since the existence of the semaphores is controlled exclusively by 881 * the parent, there's no risk of data race here. 882 */ 883int 884req_child_exit( 885 blocking_child *c 886 ) 887{ 888 return (c->accesslock) 889 ? queue_req_pointer(c, CHILD_EXIT_REQ) 890 : 0; 891} 892 893/* -------------------------------------------------------------------- 894 * cleanup_after_child() runs in parent. 895 */ 896static void 897cleanup_after_child( 898 blocking_child * c 899 ) 900{ 901 DEBUG_INSIST(!c->reusable); 902 903# ifdef SYS_WINNT 904 /* The thread was not created in detached state, so we better 905 * clean up. 906 */ 907 if (c->thread_ref && c->thread_ref->thnd) { 908 WaitForSingleObject(c->thread_ref->thnd, INFINITE); 909 INSIST(CloseHandle(c->thread_ref->thnd)); 910 c->thread_ref->thnd = NULL; 911 } 912# endif 913 c->thread_ref = NULL; 914 915 /* remove semaphores and (if signalling vi IO) pipes */ 916 917 c->accesslock = delete_sema(c->accesslock); 918 c->workitems_pending = delete_sema(c->workitems_pending); 919 c->wake_scheduled_sleep = delete_sema(c->wake_scheduled_sleep); 920 921# ifdef WORK_PIPE 922 DEBUG_INSIST(-1 != c->resp_read_pipe); 923 DEBUG_INSIST(-1 != c->resp_write_pipe); 924 (*addremove_io_fd)(c->resp_read_pipe, c->ispipe, TRUE); 925 close(c->resp_write_pipe); 926 close(c->resp_read_pipe); 927 c->resp_write_pipe = -1; 928 c->resp_read_pipe = -1; 929# else 930 DEBUG_INSIST(NULL != c->responses_pending); 931 (*addremove_io_semaphore)(c->responses_pending->shnd, TRUE); 932 c->responses_pending = delete_sema(c->responses_pending); 933# endif 934 935 /* Is it necessary to check if there are pending requests and 936 * responses? If so, and if there are, what to do with them? 937 */ 938 939 /* re-init buffer index sequencers */ 940 c->head_workitem = 0; 941 c->tail_workitem = 0; 942 c->head_response = 0; 943 c->tail_response = 0; 944 945 c->reusable = TRUE; 946} 947 948 949#else /* !WORK_THREAD follows */ 950char work_thread_nonempty_compilation_unit; 951#endif 952