work_thread.c revision 1.3
1/* $NetBSD: work_thread.c,v 1.3 2015/07/10 14:20:32 christos Exp $ */ 2 3/* 4 * work_thread.c - threads implementation for blocking worker child. 5 */ 6#include <config.h> 7#include "ntp_workimpl.h" 8 9#ifdef WORK_THREAD 10 11#include <stdio.h> 12#include <ctype.h> 13#include <signal.h> 14#ifndef SYS_WINNT 15#include <pthread.h> 16#endif 17 18#include "ntp_stdlib.h" 19#include "ntp_malloc.h" 20#include "ntp_syslog.h" 21#include "ntpd.h" 22#include "ntp_io.h" 23#include "ntp_assert.h" 24#include "ntp_unixtime.h" 25#include "timespecops.h" 26#include "ntp_worker.h" 27 28#define CHILD_EXIT_REQ ((blocking_pipe_header *)(intptr_t)-1) 29#define CHILD_GONE_RESP CHILD_EXIT_REQ 30#define WORKITEMS_ALLOC_INC 16 31#define RESPONSES_ALLOC_INC 4 32 33#ifndef THREAD_MINSTACKSIZE 34#define THREAD_MINSTACKSIZE (64U * 1024) 35#endif 36 37#ifndef DEVOLATILE 38#define DEVOLATILE(type, var) ((type)(uintptr_t)(volatile void *)(var)) 39#endif 40 41#ifdef SYS_WINNT 42# define thread_exit(c) _endthreadex(c) 43# define tickle_sem SetEvent 44#else 45# define thread_exit(c) pthread_exit((void*)(size_t)(c)) 46# define tickle_sem sem_post 47#endif 48 49#ifdef WORK_PIPE 50addremove_io_fd_func addremove_io_fd; 51#else 52addremove_io_semaphore_func addremove_io_semaphore; 53#endif 54 55static void start_blocking_thread(blocking_child *); 56static void start_blocking_thread_internal(blocking_child *); 57static void prepare_child_sems(blocking_child *); 58static int wait_for_sem(sem_ref, struct timespec *); 59static void ensure_workitems_empty_slot(blocking_child *); 60static void ensure_workresp_empty_slot(blocking_child *); 61static int queue_req_pointer(blocking_child *, blocking_pipe_header *); 62static void cleanup_after_child(blocking_child *); 63#ifdef SYS_WINNT 64u_int WINAPI blocking_thread(void *); 65#else 66void * blocking_thread(void *); 67#endif 68#ifndef SYS_WINNT 69static void block_thread_signals(sigset_t *); 70#endif 71 72 73void 74exit_worker( 75 int exitcode 76 ) 77{ 78 thread_exit(exitcode); /* see #define thread_exit */ 79} 80 81 82int 83worker_sleep( 84 blocking_child * c, 85 time_t seconds 86 ) 87{ 88 struct timespec until; 89 int rc; 90 91# ifdef HAVE_CLOCK_GETTIME 92 if (0 != clock_gettime(CLOCK_REALTIME, &until)) { 93 msyslog(LOG_ERR, "worker_sleep: clock_gettime() failed: %m"); 94 return -1; 95 } 96# else 97 if (0 != getclock(TIMEOFDAY, &until)) { 98 msyslog(LOG_ERR, "worker_sleep: getclock() failed: %m"); 99 return -1; 100 } 101# endif 102 until.tv_sec += seconds; 103 do { 104 rc = wait_for_sem(c->wake_scheduled_sleep, &until); 105 } while (-1 == rc && EINTR == errno); 106 if (0 == rc) 107 return -1; 108 if (-1 == rc && ETIMEDOUT == errno) 109 return 0; 110 msyslog(LOG_ERR, "worker_sleep: sem_timedwait: %m"); 111 return -1; 112} 113 114 115void 116interrupt_worker_sleep(void) 117{ 118 u_int idx; 119 blocking_child * c; 120 121 for (idx = 0; idx < blocking_children_alloc; idx++) { 122 c = blocking_children[idx]; 123 if (NULL == c || NULL == c->wake_scheduled_sleep) 124 continue; 125 tickle_sem(c->wake_scheduled_sleep); 126 } 127} 128 129 130static void 131ensure_workitems_empty_slot( 132 blocking_child *c 133 ) 134{ 135 const size_t each = sizeof(blocking_children[0]->workitems[0]); 136 size_t new_alloc; 137 size_t old_octets; 138 size_t new_octets; 139 void * nonvol_workitems; 140 141 142 if (c->workitems != NULL && 143 NULL == c->workitems[c->next_workitem]) 144 return; 145 146 new_alloc = c->workitems_alloc + WORKITEMS_ALLOC_INC; 147 old_octets = c->workitems_alloc * each; 148 new_octets = new_alloc * each; 149 nonvol_workitems = DEVOLATILE(void *, c->workitems); 150 c->workitems = erealloc_zero(nonvol_workitems, new_octets, 151 old_octets); 152 if (0 == c->next_workitem) 153 c->next_workitem = c->workitems_alloc; 154 c->workitems_alloc = new_alloc; 155} 156 157 158static void 159ensure_workresp_empty_slot( 160 blocking_child *c 161 ) 162{ 163 const size_t each = sizeof(blocking_children[0]->responses[0]); 164 size_t new_alloc; 165 size_t old_octets; 166 size_t new_octets; 167 void * nonvol_responses; 168 169 if (c->responses != NULL && 170 NULL == c->responses[c->next_response]) 171 return; 172 173 new_alloc = c->responses_alloc + RESPONSES_ALLOC_INC; 174 old_octets = c->responses_alloc * each; 175 new_octets = new_alloc * each; 176 nonvol_responses = DEVOLATILE(void *, c->responses); 177 c->responses = erealloc_zero(nonvol_responses, new_octets, 178 old_octets); 179 if (0 == c->next_response) 180 c->next_response = c->responses_alloc; 181 c->responses_alloc = new_alloc; 182} 183 184 185/* 186 * queue_req_pointer() - append a work item or idle exit request to 187 * blocking_workitems[]. 188 */ 189static int 190queue_req_pointer( 191 blocking_child * c, 192 blocking_pipe_header * hdr 193 ) 194{ 195 c->workitems[c->next_workitem] = hdr; 196 c->next_workitem = (1 + c->next_workitem) % c->workitems_alloc; 197 198 /* 199 * We only want to signal the wakeup event if the child is 200 * blocking on it, which is indicated by setting the blocking 201 * event. Wait with zero timeout to test. 202 */ 203 /* !!!! if (WAIT_OBJECT_0 == WaitForSingleObject(c->child_is_blocking, 0)) */ 204 tickle_sem(c->blocking_req_ready); 205 206 return 0; 207} 208 209 210int 211send_blocking_req_internal( 212 blocking_child * c, 213 blocking_pipe_header * hdr, 214 void * data 215 ) 216{ 217 blocking_pipe_header * threadcopy; 218 size_t payload_octets; 219 220 REQUIRE(hdr != NULL); 221 REQUIRE(data != NULL); 222 DEBUG_REQUIRE(BLOCKING_REQ_MAGIC == hdr->magic_sig); 223 224 if (hdr->octets <= sizeof(*hdr)) 225 return 1; /* failure */ 226 payload_octets = hdr->octets - sizeof(*hdr); 227 228 ensure_workitems_empty_slot(c); 229 if (NULL == c->thread_ref) { 230 ensure_workresp_empty_slot(c); 231 start_blocking_thread(c); 232 } 233 234 threadcopy = emalloc(hdr->octets); 235 memcpy(threadcopy, hdr, sizeof(*hdr)); 236 memcpy((char *)threadcopy + sizeof(*hdr), data, payload_octets); 237 238 return queue_req_pointer(c, threadcopy); 239} 240 241 242blocking_pipe_header * 243receive_blocking_req_internal( 244 blocking_child * c 245 ) 246{ 247 blocking_pipe_header * req; 248 int rc; 249 250 /* 251 * Child blocks here when idle. SysV semaphores maintain a 252 * count and release from sem_wait() only when it reaches 0. 253 * Windows auto-reset events are simpler, and multiple SetEvent 254 * calls before any thread waits result in a single wakeup. 255 * On Windows, the child drains all workitems each wakeup, while 256 * with SysV semaphores wait_sem() is used before each item. 257 */ 258#ifdef SYS_WINNT 259 while (NULL == c->workitems[c->next_workeritem]) { 260 /* !!!! SetEvent(c->child_is_blocking); */ 261 rc = wait_for_sem(c->blocking_req_ready, NULL); 262 INSIST(0 == rc); 263 /* !!!! ResetEvent(c->child_is_blocking); */ 264 } 265#else 266 do { 267 rc = wait_for_sem(c->blocking_req_ready, NULL); 268 } while (-1 == rc && EINTR == errno); 269 INSIST(0 == rc); 270#endif 271 272 req = c->workitems[c->next_workeritem]; 273 INSIST(NULL != req); 274 c->workitems[c->next_workeritem] = NULL; 275 c->next_workeritem = (1 + c->next_workeritem) % 276 c->workitems_alloc; 277 278 if (CHILD_EXIT_REQ == req) { /* idled out */ 279 send_blocking_resp_internal(c, CHILD_GONE_RESP); 280 req = NULL; 281 } 282 283 return req; 284} 285 286 287int 288send_blocking_resp_internal( 289 blocking_child * c, 290 blocking_pipe_header * resp 291 ) 292{ 293 ensure_workresp_empty_slot(c); 294 295 c->responses[c->next_response] = resp; 296 c->next_response = (1 + c->next_response) % c->responses_alloc; 297 298#ifdef WORK_PIPE 299 write(c->resp_write_pipe, "", 1); 300#else 301 tickle_sem(c->blocking_response_ready); 302#endif 303 304 return 0; 305} 306 307 308#ifndef WORK_PIPE 309void 310handle_blocking_resp_sem( 311 void * context 312 ) 313{ 314 HANDLE ready; 315 blocking_child * c; 316 u_int idx; 317 318 ready = (HANDLE)context; 319 c = NULL; 320 for (idx = 0; idx < blocking_children_alloc; idx++) { 321 c = blocking_children[idx]; 322 if (c != NULL && c->thread_ref != NULL && 323 ready == c->blocking_response_ready) 324 break; 325 } 326 if (idx < blocking_children_alloc) 327 process_blocking_resp(c); 328} 329#endif /* !WORK_PIPE */ 330 331 332blocking_pipe_header * 333receive_blocking_resp_internal( 334 blocking_child * c 335 ) 336{ 337 blocking_pipe_header * removed; 338#ifdef WORK_PIPE 339 int rc; 340 char scratch[32]; 341 342 do { 343 rc = read(c->resp_read_pipe, scratch, sizeof(scratch)); 344 } while (-1 == rc && EINTR == errno); 345#endif 346 removed = c->responses[c->next_workresp]; 347 if (NULL != removed) { 348 c->responses[c->next_workresp] = NULL; 349 c->next_workresp = (1 + c->next_workresp) % 350 c->responses_alloc; 351 DEBUG_ENSURE(CHILD_GONE_RESP == removed || 352 BLOCKING_RESP_MAGIC == removed->magic_sig); 353 } 354 if (CHILD_GONE_RESP == removed) { 355 cleanup_after_child(c); 356 removed = NULL; 357 } 358 359 return removed; 360} 361 362 363static void 364start_blocking_thread( 365 blocking_child * c 366 ) 367{ 368 369 DEBUG_INSIST(!c->reusable); 370 371 prepare_child_sems(c); 372 start_blocking_thread_internal(c); 373} 374 375 376static void 377start_blocking_thread_internal( 378 blocking_child * c 379 ) 380#ifdef SYS_WINNT 381{ 382 thr_ref blocking_child_thread; 383 u_int blocking_thread_id; 384 BOOL resumed; 385 386 (*addremove_io_semaphore)(c->blocking_response_ready, FALSE); 387 blocking_child_thread = 388 (HANDLE)_beginthreadex( 389 NULL, 390 0, 391 &blocking_thread, 392 c, 393 CREATE_SUSPENDED, 394 &blocking_thread_id); 395 396 if (NULL == blocking_child_thread) { 397 msyslog(LOG_ERR, "start blocking thread failed: %m"); 398 exit(-1); 399 } 400 c->thread_id = blocking_thread_id; 401 c->thread_ref = blocking_child_thread; 402 /* remember the thread priority is only within the process class */ 403 if (!SetThreadPriority(blocking_child_thread, 404 THREAD_PRIORITY_BELOW_NORMAL)) 405 msyslog(LOG_ERR, "Error lowering blocking thread priority: %m"); 406 407 resumed = ResumeThread(blocking_child_thread); 408 DEBUG_INSIST(resumed); 409} 410#else /* pthreads start_blocking_thread_internal() follows */ 411{ 412# ifdef NEED_PTHREAD_INIT 413 static int pthread_init_called; 414# endif 415 pthread_attr_t thr_attr; 416 int rc; 417 int saved_errno; 418 int pipe_ends[2]; /* read then write */ 419 int is_pipe; 420 int flags; 421 size_t stacksize; 422 sigset_t saved_sig_mask; 423 424# ifdef NEED_PTHREAD_INIT 425 /* 426 * from lib/isc/unix/app.c: 427 * BSDI 3.1 seg faults in pthread_sigmask() if we don't do this. 428 */ 429 if (!pthread_init_called) { 430 pthread_init(); 431 pthread_init_called = TRUE; 432 } 433# endif 434 435 rc = pipe_socketpair(&pipe_ends[0], &is_pipe); 436 if (0 != rc) { 437 msyslog(LOG_ERR, "start_blocking_thread: pipe_socketpair() %m"); 438 exit(1); 439 } 440 c->resp_read_pipe = move_fd(pipe_ends[0]); 441 c->resp_write_pipe = move_fd(pipe_ends[1]); 442 c->ispipe = is_pipe; 443 flags = fcntl(c->resp_read_pipe, F_GETFL, 0); 444 if (-1 == flags) { 445 msyslog(LOG_ERR, "start_blocking_thread: fcntl(F_GETFL) %m"); 446 exit(1); 447 } 448 rc = fcntl(c->resp_read_pipe, F_SETFL, O_NONBLOCK | flags); 449 if (-1 == rc) { 450 msyslog(LOG_ERR, 451 "start_blocking_thread: fcntl(F_SETFL, O_NONBLOCK) %m"); 452 exit(1); 453 } 454 (*addremove_io_fd)(c->resp_read_pipe, c->ispipe, FALSE); 455 pthread_attr_init(&thr_attr); 456 pthread_attr_setdetachstate(&thr_attr, PTHREAD_CREATE_DETACHED); 457#if defined(HAVE_PTHREAD_ATTR_GETSTACKSIZE) && \ 458 defined(HAVE_PTHREAD_ATTR_SETSTACKSIZE) 459 rc = pthread_attr_getstacksize(&thr_attr, &stacksize); 460 if (-1 == rc) { 461 msyslog(LOG_ERR, 462 "start_blocking_thread: pthread_attr_getstacksize %m"); 463 } else if (stacksize < THREAD_MINSTACKSIZE) { 464 rc = pthread_attr_setstacksize(&thr_attr, 465 THREAD_MINSTACKSIZE); 466 if (-1 == rc) 467 msyslog(LOG_ERR, 468 "start_blocking_thread: pthread_attr_setstacksize(0x%lx -> 0x%lx) %m", 469 (u_long)stacksize, 470 (u_long)THREAD_MINSTACKSIZE); 471 } 472#else 473 UNUSED_ARG(stacksize); 474#endif 475#if defined(PTHREAD_SCOPE_SYSTEM) && defined(NEED_PTHREAD_SCOPE_SYSTEM) 476 pthread_attr_setscope(&thr_attr, PTHREAD_SCOPE_SYSTEM); 477#endif 478 c->thread_ref = emalloc_zero(sizeof(*c->thread_ref)); 479 block_thread_signals(&saved_sig_mask); 480 rc = pthread_create(c->thread_ref, &thr_attr, 481 &blocking_thread, c); 482 saved_errno = errno; 483 pthread_sigmask(SIG_SETMASK, &saved_sig_mask, NULL); 484 pthread_attr_destroy(&thr_attr); 485 if (0 != rc) { 486 errno = saved_errno; 487 msyslog(LOG_ERR, "pthread_create() blocking child: %m"); 488 exit(1); 489 } 490} 491#endif 492 493 494/* 495 * block_thread_signals() 496 * 497 * Temporarily block signals used by ntpd main thread, so that signal 498 * mask inherited by child threads leaves them blocked. Returns prior 499 * active signal mask via pmask, to be restored by the main thread 500 * after pthread_create(). 501 */ 502#ifndef SYS_WINNT 503void 504block_thread_signals( 505 sigset_t * pmask 506 ) 507{ 508 sigset_t block; 509 510 sigemptyset(&block); 511# ifdef HAVE_SIGNALED_IO 512# ifdef SIGIO 513 sigaddset(&block, SIGIO); 514# endif 515# ifdef SIGPOLL 516 sigaddset(&block, SIGPOLL); 517# endif 518# endif /* HAVE_SIGNALED_IO */ 519 sigaddset(&block, SIGALRM); 520 sigaddset(&block, MOREDEBUGSIG); 521 sigaddset(&block, LESSDEBUGSIG); 522# ifdef SIGDIE1 523 sigaddset(&block, SIGDIE1); 524# endif 525# ifdef SIGDIE2 526 sigaddset(&block, SIGDIE2); 527# endif 528# ifdef SIGDIE3 529 sigaddset(&block, SIGDIE3); 530# endif 531# ifdef SIGDIE4 532 sigaddset(&block, SIGDIE4); 533# endif 534# ifdef SIGBUS 535 sigaddset(&block, SIGBUS); 536# endif 537 sigemptyset(pmask); 538 pthread_sigmask(SIG_BLOCK, &block, pmask); 539} 540#endif /* !SYS_WINNT */ 541 542 543/* 544 * prepare_child_sems() 545 * 546 * create sync events (semaphores) 547 * child_is_blocking initially unset 548 * blocking_req_ready initially unset 549 * 550 * Child waits for blocking_req_ready to be set after 551 * setting child_is_blocking. blocking_req_ready and 552 * blocking_response_ready are auto-reset, so wake one 553 * waiter and become unset (unsignalled) in one operation. 554 */ 555static void 556prepare_child_sems( 557 blocking_child *c 558 ) 559#ifdef SYS_WINNT 560{ 561 if (NULL == c->blocking_req_ready) { 562 /* manual reset using ResetEvent() */ 563 /* !!!! c->child_is_blocking = CreateEvent(NULL, TRUE, FALSE, NULL); */ 564 /* auto reset - one thread released from wait each set */ 565 c->blocking_req_ready = CreateEvent(NULL, FALSE, FALSE, NULL); 566 c->blocking_response_ready = CreateEvent(NULL, FALSE, FALSE, NULL); 567 c->wake_scheduled_sleep = CreateEvent(NULL, FALSE, FALSE, NULL); 568 } else { 569 /* !!!! ResetEvent(c->child_is_blocking); */ 570 /* ResetEvent(c->blocking_req_ready); */ 571 /* ResetEvent(c->blocking_response_ready); */ 572 /* ResetEvent(c->wake_scheduled_sleep); */ 573 } 574} 575#else /* pthreads prepare_child_sems() follows */ 576{ 577 size_t octets; 578 579 if (NULL == c->blocking_req_ready) { 580 octets = sizeof(*c->blocking_req_ready); 581 octets += sizeof(*c->wake_scheduled_sleep); 582 /* !!!! octets += sizeof(*c->child_is_blocking); */ 583 c->blocking_req_ready = emalloc_zero(octets);; 584 c->wake_scheduled_sleep = 1 + c->blocking_req_ready; 585 /* !!!! c->child_is_blocking = 1 + c->wake_scheduled_sleep; */ 586 } else { 587 sem_destroy(c->blocking_req_ready); 588 sem_destroy(c->wake_scheduled_sleep); 589 /* !!!! sem_destroy(c->child_is_blocking); */ 590 } 591 sem_init(c->blocking_req_ready, FALSE, 0); 592 sem_init(c->wake_scheduled_sleep, FALSE, 0); 593 /* !!!! sem_init(c->child_is_blocking, FALSE, 0); */ 594} 595#endif 596 597 598static int 599wait_for_sem( 600 sem_ref sem, 601 struct timespec * timeout /* wall-clock */ 602 ) 603#ifdef SYS_WINNT 604{ 605 struct timespec now; 606 struct timespec delta; 607 DWORD msec; 608 DWORD rc; 609 610 if (NULL == timeout) { 611 msec = INFINITE; 612 } else { 613 getclock(TIMEOFDAY, &now); 614 delta = sub_tspec(*timeout, now); 615 if (delta.tv_sec < 0) { 616 msec = 0; 617 } else if ((delta.tv_sec + 1) >= (MAXDWORD / 1000)) { 618 msec = INFINITE; 619 } else { 620 msec = 1000 * (DWORD)delta.tv_sec; 621 msec += delta.tv_nsec / (1000 * 1000); 622 } 623 } 624 rc = WaitForSingleObject(sem, msec); 625 if (WAIT_OBJECT_0 == rc) 626 return 0; 627 if (WAIT_TIMEOUT == rc) { 628 errno = ETIMEDOUT; 629 return -1; 630 } 631 msyslog(LOG_ERR, "WaitForSingleObject unexpected 0x%x", rc); 632 errno = EFAULT; 633 return -1; 634} 635#else /* pthreads wait_for_sem() follows */ 636{ 637 int rc; 638 639 if (NULL == timeout) 640 rc = sem_wait(sem); 641 else 642 rc = sem_timedwait(sem, timeout); 643 644 return rc; 645} 646#endif 647 648 649/* 650 * blocking_thread - thread functions have WINAPI calling convention 651 */ 652#ifdef SYS_WINNT 653u_int 654WINAPI 655#else 656void * 657#endif 658blocking_thread( 659 void * ThreadArg 660 ) 661{ 662 blocking_child *c; 663 664 c = ThreadArg; 665 exit_worker(blocking_child_common(c)); 666 667 /* NOTREACHED */ 668 return 0; 669} 670 671 672/* 673 * req_child_exit() runs in the parent. 674 */ 675int 676req_child_exit( 677 blocking_child *c 678 ) 679{ 680 return queue_req_pointer(c, CHILD_EXIT_REQ); 681} 682 683 684/* 685 * cleanup_after_child() runs in parent. 686 */ 687static void 688cleanup_after_child( 689 blocking_child * c 690 ) 691{ 692 u_int idx; 693 694 DEBUG_INSIST(!c->reusable); 695#ifdef SYS_WINNT 696 INSIST(CloseHandle(c->thread_ref)); 697#else 698 free(c->thread_ref); 699#endif 700 c->thread_ref = NULL; 701 c->thread_id = 0; 702#ifdef WORK_PIPE 703 DEBUG_INSIST(-1 != c->resp_read_pipe); 704 DEBUG_INSIST(-1 != c->resp_write_pipe); 705 (*addremove_io_fd)(c->resp_read_pipe, c->ispipe, TRUE); 706 close(c->resp_write_pipe); 707 close(c->resp_read_pipe); 708 c->resp_write_pipe = -1; 709 c->resp_read_pipe = -1; 710#else 711 DEBUG_INSIST(NULL != c->blocking_response_ready); 712 (*addremove_io_semaphore)(c->blocking_response_ready, TRUE); 713#endif 714 for (idx = 0; idx < c->workitems_alloc; idx++) 715 c->workitems[idx] = NULL; 716 c->next_workitem = 0; 717 c->next_workeritem = 0; 718 for (idx = 0; idx < c->responses_alloc; idx++) 719 c->responses[idx] = NULL; 720 c->next_response = 0; 721 c->next_workresp = 0; 722 c->reusable = TRUE; 723} 724 725 726#else /* !WORK_THREAD follows */ 727char work_thread_nonempty_compilation_unit; 728#endif 729