work_thread.c revision 275970
1275970Scy/* 2275970Scy * work_thread.c - threads implementation for blocking worker child. 3275970Scy */ 4275970Scy#include <config.h> 5275970Scy#include "ntp_workimpl.h" 6275970Scy 7275970Scy#ifdef WORK_THREAD 8275970Scy 9275970Scy#include <stdio.h> 10275970Scy#include <ctype.h> 11275970Scy#include <signal.h> 12275970Scy#ifndef SYS_WINNT 13275970Scy#include <pthread.h> 14275970Scy#endif 15275970Scy 16275970Scy#include "ntp_stdlib.h" 17275970Scy#include "ntp_malloc.h" 18275970Scy#include "ntp_syslog.h" 19275970Scy#include "ntpd.h" 20275970Scy#include "ntp_io.h" 21275970Scy#include "ntp_assert.h" 22275970Scy#include "ntp_unixtime.h" 23275970Scy#include "timespecops.h" 24275970Scy#include "ntp_worker.h" 25275970Scy 26275970Scy#define CHILD_EXIT_REQ ((blocking_pipe_header *)(intptr_t)-1) 27275970Scy#define CHILD_GONE_RESP CHILD_EXIT_REQ 28275970Scy#define WORKITEMS_ALLOC_INC 16 29275970Scy#define RESPONSES_ALLOC_INC 4 30275970Scy 31275970Scy#ifndef THREAD_MINSTACKSIZE 32275970Scy#define THREAD_MINSTACKSIZE (64U * 1024) 33275970Scy#endif 34275970Scy 35275970Scy#ifndef DEVOLATILE 36275970Scy#define DEVOLATILE(type, var) ((type)(uintptr_t)(volatile void *)(var)) 37275970Scy#endif 38275970Scy 39275970Scy#ifdef SYS_WINNT 40275970Scy# define thread_exit(c) _endthreadex(c) 41275970Scy# define tickle_sem SetEvent 42275970Scy#else 43275970Scy# define thread_exit(c) pthread_exit((void*)(size_t)(c)) 44275970Scy# define tickle_sem sem_post 45275970Scy#endif 46275970Scy 47275970Scy#ifdef WORK_PIPE 48275970Scyaddremove_io_fd_func addremove_io_fd; 49275970Scy#else 50275970Scyaddremove_io_semaphore_func addremove_io_semaphore; 51275970Scy#endif 52275970Scy 53275970Scystatic void start_blocking_thread(blocking_child *); 54275970Scystatic void start_blocking_thread_internal(blocking_child *); 55275970Scystatic void prepare_child_sems(blocking_child *); 56275970Scystatic int wait_for_sem(sem_ref, struct timespec *); 57275970Scystatic void ensure_workitems_empty_slot(blocking_child *); 58275970Scystatic void ensure_workresp_empty_slot(blocking_child *); 59275970Scystatic int queue_req_pointer(blocking_child *, blocking_pipe_header *); 60275970Scystatic void cleanup_after_child(blocking_child *); 61275970Scy#ifdef SYS_WINNT 62275970Scyu_int WINAPI blocking_thread(void *); 63275970Scy#else 64275970Scyvoid * blocking_thread(void *); 65275970Scy#endif 66275970Scy#ifndef SYS_WINNT 67275970Scystatic void block_thread_signals(sigset_t *); 68275970Scy#endif 69275970Scy 70275970Scy 71275970Scyvoid 72275970Scyexit_worker( 73275970Scy int exitcode 74275970Scy ) 75275970Scy{ 76275970Scy thread_exit(exitcode); /* see #define thread_exit */ 77275970Scy} 78275970Scy 79275970Scy 80275970Scyint 81275970Scyworker_sleep( 82275970Scy blocking_child * c, 83275970Scy time_t seconds 84275970Scy ) 85275970Scy{ 86275970Scy struct timespec until; 87275970Scy int rc; 88275970Scy 89275970Scy# ifdef HAVE_CLOCK_GETTIME 90275970Scy if (0 != clock_gettime(CLOCK_REALTIME, &until)) { 91275970Scy msyslog(LOG_ERR, "worker_sleep: clock_gettime() failed: %m"); 92275970Scy return -1; 93275970Scy } 94275970Scy# else 95275970Scy if (0 != getclock(TIMEOFDAY, &until)) { 96275970Scy msyslog(LOG_ERR, "worker_sleep: getclock() failed: %m"); 97275970Scy return -1; 98275970Scy } 99275970Scy# endif 100275970Scy until.tv_sec += seconds; 101275970Scy do { 102275970Scy rc = wait_for_sem(c->wake_scheduled_sleep, &until); 103275970Scy } while (-1 == rc && EINTR == errno); 104275970Scy if (0 == rc) 105275970Scy return -1; 106275970Scy if (-1 == rc && ETIMEDOUT == errno) 107275970Scy return 0; 108275970Scy msyslog(LOG_ERR, "worker_sleep: sem_timedwait: %m"); 109275970Scy return -1; 110275970Scy} 111275970Scy 112275970Scy 113275970Scyvoid 114275970Scyinterrupt_worker_sleep(void) 115275970Scy{ 116275970Scy u_int idx; 117275970Scy blocking_child * c; 118275970Scy 119275970Scy for (idx = 0; idx < blocking_children_alloc; idx++) { 120275970Scy c = blocking_children[idx]; 121275970Scy if (NULL == c || NULL == c->wake_scheduled_sleep) 122275970Scy continue; 123275970Scy tickle_sem(c->wake_scheduled_sleep); 124275970Scy } 125275970Scy} 126275970Scy 127275970Scy 128275970Scystatic void 129275970Scyensure_workitems_empty_slot( 130275970Scy blocking_child *c 131275970Scy ) 132275970Scy{ 133275970Scy const size_t each = sizeof(blocking_children[0]->workitems[0]); 134275970Scy size_t new_alloc; 135275970Scy size_t old_octets; 136275970Scy size_t new_octets; 137275970Scy void * nonvol_workitems; 138275970Scy 139275970Scy 140275970Scy if (c->workitems != NULL && 141275970Scy NULL == c->workitems[c->next_workitem]) 142275970Scy return; 143275970Scy 144275970Scy new_alloc = c->workitems_alloc + WORKITEMS_ALLOC_INC; 145275970Scy old_octets = c->workitems_alloc * each; 146275970Scy new_octets = new_alloc * each; 147275970Scy nonvol_workitems = DEVOLATILE(void *, c->workitems); 148275970Scy c->workitems = erealloc_zero(nonvol_workitems, new_octets, 149275970Scy old_octets); 150275970Scy if (0 == c->next_workitem) 151275970Scy c->next_workitem = c->workitems_alloc; 152275970Scy c->workitems_alloc = new_alloc; 153275970Scy} 154275970Scy 155275970Scy 156275970Scystatic void 157275970Scyensure_workresp_empty_slot( 158275970Scy blocking_child *c 159275970Scy ) 160275970Scy{ 161275970Scy const size_t each = sizeof(blocking_children[0]->responses[0]); 162275970Scy size_t new_alloc; 163275970Scy size_t old_octets; 164275970Scy size_t new_octets; 165275970Scy void * nonvol_responses; 166275970Scy 167275970Scy if (c->responses != NULL && 168275970Scy NULL == c->responses[c->next_response]) 169275970Scy return; 170275970Scy 171275970Scy new_alloc = c->responses_alloc + RESPONSES_ALLOC_INC; 172275970Scy old_octets = c->responses_alloc * each; 173275970Scy new_octets = new_alloc * each; 174275970Scy nonvol_responses = DEVOLATILE(void *, c->responses); 175275970Scy c->responses = erealloc_zero(nonvol_responses, new_octets, 176275970Scy old_octets); 177275970Scy if (0 == c->next_response) 178275970Scy c->next_response = c->responses_alloc; 179275970Scy c->responses_alloc = new_alloc; 180275970Scy} 181275970Scy 182275970Scy 183275970Scy/* 184275970Scy * queue_req_pointer() - append a work item or idle exit request to 185275970Scy * blocking_workitems[]. 186275970Scy */ 187275970Scystatic int 188275970Scyqueue_req_pointer( 189275970Scy blocking_child * c, 190275970Scy blocking_pipe_header * hdr 191275970Scy ) 192275970Scy{ 193275970Scy c->workitems[c->next_workitem] = hdr; 194275970Scy c->next_workitem = (1 + c->next_workitem) % c->workitems_alloc; 195275970Scy 196275970Scy /* 197275970Scy * We only want to signal the wakeup event if the child is 198275970Scy * blocking on it, which is indicated by setting the blocking 199275970Scy * event. Wait with zero timeout to test. 200275970Scy */ 201275970Scy /* !!!! if (WAIT_OBJECT_0 == WaitForSingleObject(c->child_is_blocking, 0)) */ 202275970Scy tickle_sem(c->blocking_req_ready); 203275970Scy 204275970Scy return 0; 205275970Scy} 206275970Scy 207275970Scy 208275970Scyint 209275970Scysend_blocking_req_internal( 210275970Scy blocking_child * c, 211275970Scy blocking_pipe_header * hdr, 212275970Scy void * data 213275970Scy ) 214275970Scy{ 215275970Scy blocking_pipe_header * threadcopy; 216275970Scy size_t payload_octets; 217275970Scy 218275970Scy REQUIRE(hdr != NULL); 219275970Scy REQUIRE(data != NULL); 220275970Scy DEBUG_REQUIRE(BLOCKING_REQ_MAGIC == hdr->magic_sig); 221275970Scy 222275970Scy if (hdr->octets <= sizeof(*hdr)) 223275970Scy return 1; /* failure */ 224275970Scy payload_octets = hdr->octets - sizeof(*hdr); 225275970Scy 226275970Scy ensure_workitems_empty_slot(c); 227275970Scy if (NULL == c->thread_ref) { 228275970Scy ensure_workresp_empty_slot(c); 229275970Scy start_blocking_thread(c); 230275970Scy } 231275970Scy 232275970Scy threadcopy = emalloc(hdr->octets); 233275970Scy memcpy(threadcopy, hdr, sizeof(*hdr)); 234275970Scy memcpy((char *)threadcopy + sizeof(*hdr), data, payload_octets); 235275970Scy 236275970Scy return queue_req_pointer(c, threadcopy); 237275970Scy} 238275970Scy 239275970Scy 240275970Scyblocking_pipe_header * 241275970Scyreceive_blocking_req_internal( 242275970Scy blocking_child * c 243275970Scy ) 244275970Scy{ 245275970Scy blocking_pipe_header * req; 246275970Scy int rc; 247275970Scy 248275970Scy /* 249275970Scy * Child blocks here when idle. SysV semaphores maintain a 250275970Scy * count and release from sem_wait() only when it reaches 0. 251275970Scy * Windows auto-reset events are simpler, and multiple SetEvent 252275970Scy * calls before any thread waits result in a single wakeup. 253275970Scy * On Windows, the child drains all workitems each wakeup, while 254275970Scy * with SysV semaphores wait_sem() is used before each item. 255275970Scy */ 256275970Scy#ifdef SYS_WINNT 257275970Scy while (NULL == c->workitems[c->next_workeritem]) { 258275970Scy /* !!!! SetEvent(c->child_is_blocking); */ 259275970Scy rc = wait_for_sem(c->blocking_req_ready, NULL); 260275970Scy INSIST(0 == rc); 261275970Scy /* !!!! ResetEvent(c->child_is_blocking); */ 262275970Scy } 263275970Scy#else 264275970Scy do { 265275970Scy rc = wait_for_sem(c->blocking_req_ready, NULL); 266275970Scy } while (-1 == rc && EINTR == errno); 267275970Scy INSIST(0 == rc); 268275970Scy#endif 269275970Scy 270275970Scy req = c->workitems[c->next_workeritem]; 271275970Scy INSIST(NULL != req); 272275970Scy c->workitems[c->next_workeritem] = NULL; 273275970Scy c->next_workeritem = (1 + c->next_workeritem) % 274275970Scy c->workitems_alloc; 275275970Scy 276275970Scy if (CHILD_EXIT_REQ == req) { /* idled out */ 277275970Scy send_blocking_resp_internal(c, CHILD_GONE_RESP); 278275970Scy req = NULL; 279275970Scy } 280275970Scy 281275970Scy return req; 282275970Scy} 283275970Scy 284275970Scy 285275970Scyint 286275970Scysend_blocking_resp_internal( 287275970Scy blocking_child * c, 288275970Scy blocking_pipe_header * resp 289275970Scy ) 290275970Scy{ 291275970Scy ensure_workresp_empty_slot(c); 292275970Scy 293275970Scy c->responses[c->next_response] = resp; 294275970Scy c->next_response = (1 + c->next_response) % c->responses_alloc; 295275970Scy 296275970Scy#ifdef WORK_PIPE 297275970Scy write(c->resp_write_pipe, "", 1); 298275970Scy#else 299275970Scy tickle_sem(c->blocking_response_ready); 300275970Scy#endif 301275970Scy 302275970Scy return 0; 303275970Scy} 304275970Scy 305275970Scy 306275970Scy#ifndef WORK_PIPE 307275970Scyvoid 308275970Scyhandle_blocking_resp_sem( 309275970Scy void * context 310275970Scy ) 311275970Scy{ 312275970Scy HANDLE ready; 313275970Scy blocking_child * c; 314275970Scy u_int idx; 315275970Scy 316275970Scy ready = (HANDLE)context; 317275970Scy c = NULL; 318275970Scy for (idx = 0; idx < blocking_children_alloc; idx++) { 319275970Scy c = blocking_children[idx]; 320275970Scy if (c != NULL && c->thread_ref != NULL && 321275970Scy ready == c->blocking_response_ready) 322275970Scy break; 323275970Scy } 324275970Scy if (idx < blocking_children_alloc) 325275970Scy process_blocking_resp(c); 326275970Scy} 327275970Scy#endif /* !WORK_PIPE */ 328275970Scy 329275970Scy 330275970Scyblocking_pipe_header * 331275970Scyreceive_blocking_resp_internal( 332275970Scy blocking_child * c 333275970Scy ) 334275970Scy{ 335275970Scy blocking_pipe_header * removed; 336275970Scy#ifdef WORK_PIPE 337275970Scy int rc; 338275970Scy char scratch[32]; 339275970Scy 340275970Scy do { 341275970Scy rc = read(c->resp_read_pipe, scratch, sizeof(scratch)); 342275970Scy } while (-1 == rc && EINTR == errno); 343275970Scy#endif 344275970Scy removed = c->responses[c->next_workresp]; 345275970Scy if (NULL != removed) { 346275970Scy c->responses[c->next_workresp] = NULL; 347275970Scy c->next_workresp = (1 + c->next_workresp) % 348275970Scy c->responses_alloc; 349275970Scy DEBUG_ENSURE(CHILD_GONE_RESP == removed || 350275970Scy BLOCKING_RESP_MAGIC == removed->magic_sig); 351275970Scy } 352275970Scy if (CHILD_GONE_RESP == removed) { 353275970Scy cleanup_after_child(c); 354275970Scy removed = NULL; 355275970Scy } 356275970Scy 357275970Scy return removed; 358275970Scy} 359275970Scy 360275970Scy 361275970Scystatic void 362275970Scystart_blocking_thread( 363275970Scy blocking_child * c 364275970Scy ) 365275970Scy{ 366275970Scy 367275970Scy DEBUG_INSIST(!c->reusable); 368275970Scy 369275970Scy prepare_child_sems(c); 370275970Scy start_blocking_thread_internal(c); 371275970Scy} 372275970Scy 373275970Scy 374275970Scystatic void 375275970Scystart_blocking_thread_internal( 376275970Scy blocking_child * c 377275970Scy ) 378275970Scy#ifdef SYS_WINNT 379275970Scy{ 380275970Scy thr_ref blocking_child_thread; 381275970Scy u_int blocking_thread_id; 382275970Scy BOOL resumed; 383275970Scy 384275970Scy (*addremove_io_semaphore)(c->blocking_response_ready, FALSE); 385275970Scy blocking_child_thread = 386275970Scy (HANDLE)_beginthreadex( 387275970Scy NULL, 388275970Scy 0, 389275970Scy &blocking_thread, 390275970Scy c, 391275970Scy CREATE_SUSPENDED, 392275970Scy &blocking_thread_id); 393275970Scy 394275970Scy if (NULL == blocking_child_thread) { 395275970Scy msyslog(LOG_ERR, "start blocking thread failed: %m"); 396275970Scy exit(-1); 397275970Scy } 398275970Scy c->thread_id = blocking_thread_id; 399275970Scy c->thread_ref = blocking_child_thread; 400275970Scy /* remember the thread priority is only within the process class */ 401275970Scy if (!SetThreadPriority(blocking_child_thread, 402275970Scy THREAD_PRIORITY_BELOW_NORMAL)) 403275970Scy msyslog(LOG_ERR, "Error lowering blocking thread priority: %m"); 404275970Scy 405275970Scy resumed = ResumeThread(blocking_child_thread); 406275970Scy DEBUG_INSIST(resumed); 407275970Scy} 408275970Scy#else /* pthreads start_blocking_thread_internal() follows */ 409275970Scy{ 410275970Scy# ifdef NEED_PTHREAD_INIT 411275970Scy static int pthread_init_called; 412275970Scy# endif 413275970Scy pthread_attr_t thr_attr; 414275970Scy int rc; 415275970Scy int saved_errno; 416275970Scy int pipe_ends[2]; /* read then write */ 417275970Scy int is_pipe; 418275970Scy int flags; 419275970Scy size_t stacksize; 420275970Scy sigset_t saved_sig_mask; 421275970Scy 422275970Scy# ifdef NEED_PTHREAD_INIT 423275970Scy /* 424275970Scy * from lib/isc/unix/app.c: 425275970Scy * BSDI 3.1 seg faults in pthread_sigmask() if we don't do this. 426275970Scy */ 427275970Scy if (!pthread_init_called) { 428275970Scy pthread_init(); 429275970Scy pthread_init_called = TRUE; 430275970Scy } 431275970Scy# endif 432275970Scy 433275970Scy rc = pipe_socketpair(&pipe_ends[0], &is_pipe); 434275970Scy if (0 != rc) { 435275970Scy msyslog(LOG_ERR, "start_blocking_thread: pipe_socketpair() %m"); 436275970Scy exit(1); 437275970Scy } 438275970Scy c->resp_read_pipe = move_fd(pipe_ends[0]); 439275970Scy c->resp_write_pipe = move_fd(pipe_ends[1]); 440275970Scy c->ispipe = is_pipe; 441275970Scy flags = fcntl(c->resp_read_pipe, F_GETFL, 0); 442275970Scy if (-1 == flags) { 443275970Scy msyslog(LOG_ERR, "start_blocking_thread: fcntl(F_GETFL) %m"); 444275970Scy exit(1); 445275970Scy } 446275970Scy rc = fcntl(c->resp_read_pipe, F_SETFL, O_NONBLOCK | flags); 447275970Scy if (-1 == rc) { 448275970Scy msyslog(LOG_ERR, 449275970Scy "start_blocking_thread: fcntl(F_SETFL, O_NONBLOCK) %m"); 450275970Scy exit(1); 451275970Scy } 452275970Scy (*addremove_io_fd)(c->resp_read_pipe, c->ispipe, FALSE); 453275970Scy pthread_attr_init(&thr_attr); 454275970Scy pthread_attr_setdetachstate(&thr_attr, PTHREAD_CREATE_DETACHED); 455275970Scy#if defined(HAVE_PTHREAD_ATTR_GETSTACKSIZE) && \ 456275970Scy defined(HAVE_PTHREAD_ATTR_SETSTACKSIZE) 457275970Scy rc = pthread_attr_getstacksize(&thr_attr, &stacksize); 458275970Scy if (-1 == rc) { 459275970Scy msyslog(LOG_ERR, 460275970Scy "start_blocking_thread: pthread_attr_getstacksize %m"); 461275970Scy } else if (stacksize < THREAD_MINSTACKSIZE) { 462275970Scy rc = pthread_attr_setstacksize(&thr_attr, 463275970Scy THREAD_MINSTACKSIZE); 464275970Scy if (-1 == rc) 465275970Scy msyslog(LOG_ERR, 466275970Scy "start_blocking_thread: pthread_attr_setstacksize(0x%lx -> 0x%lx) %m", 467275970Scy (u_long)stacksize, 468275970Scy (u_long)THREAD_MINSTACKSIZE); 469275970Scy } 470275970Scy#else 471275970Scy UNUSED_ARG(stacksize); 472275970Scy#endif 473275970Scy#if defined(PTHREAD_SCOPE_SYSTEM) && defined(NEED_PTHREAD_SCOPE_SYSTEM) 474275970Scy pthread_attr_setscope(&thr_attr, PTHREAD_SCOPE_SYSTEM); 475275970Scy#endif 476275970Scy c->thread_ref = emalloc_zero(sizeof(*c->thread_ref)); 477275970Scy block_thread_signals(&saved_sig_mask); 478275970Scy rc = pthread_create(c->thread_ref, &thr_attr, 479275970Scy &blocking_thread, c); 480275970Scy saved_errno = errno; 481275970Scy pthread_sigmask(SIG_SETMASK, &saved_sig_mask, NULL); 482275970Scy pthread_attr_destroy(&thr_attr); 483275970Scy if (0 != rc) { 484275970Scy errno = saved_errno; 485275970Scy msyslog(LOG_ERR, "pthread_create() blocking child: %m"); 486275970Scy exit(1); 487275970Scy } 488275970Scy} 489275970Scy#endif 490275970Scy 491275970Scy 492275970Scy/* 493275970Scy * block_thread_signals() 494275970Scy * 495275970Scy * Temporarily block signals used by ntpd main thread, so that signal 496275970Scy * mask inherited by child threads leaves them blocked. Returns prior 497275970Scy * active signal mask via pmask, to be restored by the main thread 498275970Scy * after pthread_create(). 499275970Scy */ 500275970Scy#ifndef SYS_WINNT 501275970Scyvoid 502275970Scyblock_thread_signals( 503275970Scy sigset_t * pmask 504275970Scy ) 505275970Scy{ 506275970Scy sigset_t block; 507275970Scy 508275970Scy sigemptyset(&block); 509275970Scy# ifdef HAVE_SIGNALED_IO 510275970Scy# ifdef SIGIO 511275970Scy sigaddset(&block, SIGIO); 512275970Scy# endif 513275970Scy# ifdef SIGPOLL 514275970Scy sigaddset(&block, SIGPOLL); 515275970Scy# endif 516275970Scy# endif /* HAVE_SIGNALED_IO */ 517275970Scy sigaddset(&block, SIGALRM); 518275970Scy sigaddset(&block, MOREDEBUGSIG); 519275970Scy sigaddset(&block, LESSDEBUGSIG); 520275970Scy# ifdef SIGDIE1 521275970Scy sigaddset(&block, SIGDIE1); 522275970Scy# endif 523275970Scy# ifdef SIGDIE2 524275970Scy sigaddset(&block, SIGDIE2); 525275970Scy# endif 526275970Scy# ifdef SIGDIE3 527275970Scy sigaddset(&block, SIGDIE3); 528275970Scy# endif 529275970Scy# ifdef SIGDIE4 530275970Scy sigaddset(&block, SIGDIE4); 531275970Scy# endif 532275970Scy# ifdef SIGBUS 533275970Scy sigaddset(&block, SIGBUS); 534275970Scy# endif 535275970Scy sigemptyset(pmask); 536275970Scy pthread_sigmask(SIG_BLOCK, &block, pmask); 537275970Scy} 538275970Scy#endif /* !SYS_WINNT */ 539275970Scy 540275970Scy 541275970Scy/* 542275970Scy * prepare_child_sems() 543275970Scy * 544275970Scy * create sync events (semaphores) 545275970Scy * child_is_blocking initially unset 546275970Scy * blocking_req_ready initially unset 547275970Scy * 548275970Scy * Child waits for blocking_req_ready to be set after 549275970Scy * setting child_is_blocking. blocking_req_ready and 550275970Scy * blocking_response_ready are auto-reset, so wake one 551275970Scy * waiter and become unset (unsignalled) in one operation. 552275970Scy */ 553275970Scystatic void 554275970Scyprepare_child_sems( 555275970Scy blocking_child *c 556275970Scy ) 557275970Scy#ifdef SYS_WINNT 558275970Scy{ 559275970Scy if (NULL == c->blocking_req_ready) { 560275970Scy /* manual reset using ResetEvent() */ 561275970Scy /* !!!! c->child_is_blocking = CreateEvent(NULL, TRUE, FALSE, NULL); */ 562275970Scy /* auto reset - one thread released from wait each set */ 563275970Scy c->blocking_req_ready = CreateEvent(NULL, FALSE, FALSE, NULL); 564275970Scy c->blocking_response_ready = CreateEvent(NULL, FALSE, FALSE, NULL); 565275970Scy c->wake_scheduled_sleep = CreateEvent(NULL, FALSE, FALSE, NULL); 566275970Scy } else { 567275970Scy /* !!!! ResetEvent(c->child_is_blocking); */ 568275970Scy /* ResetEvent(c->blocking_req_ready); */ 569275970Scy /* ResetEvent(c->blocking_response_ready); */ 570275970Scy /* ResetEvent(c->wake_scheduled_sleep); */ 571275970Scy } 572275970Scy} 573275970Scy#else /* pthreads prepare_child_sems() follows */ 574275970Scy{ 575275970Scy size_t octets; 576275970Scy 577275970Scy if (NULL == c->blocking_req_ready) { 578275970Scy octets = sizeof(*c->blocking_req_ready); 579275970Scy octets += sizeof(*c->wake_scheduled_sleep); 580275970Scy /* !!!! octets += sizeof(*c->child_is_blocking); */ 581275970Scy c->blocking_req_ready = emalloc_zero(octets);; 582275970Scy c->wake_scheduled_sleep = 1 + c->blocking_req_ready; 583275970Scy /* !!!! c->child_is_blocking = 1 + c->wake_scheduled_sleep; */ 584275970Scy } else { 585275970Scy sem_destroy(c->blocking_req_ready); 586275970Scy sem_destroy(c->wake_scheduled_sleep); 587275970Scy /* !!!! sem_destroy(c->child_is_blocking); */ 588275970Scy } 589275970Scy sem_init(c->blocking_req_ready, FALSE, 0); 590275970Scy sem_init(c->wake_scheduled_sleep, FALSE, 0); 591275970Scy /* !!!! sem_init(c->child_is_blocking, FALSE, 0); */ 592275970Scy} 593275970Scy#endif 594275970Scy 595275970Scy 596275970Scystatic int 597275970Scywait_for_sem( 598275970Scy sem_ref sem, 599275970Scy struct timespec * timeout /* wall-clock */ 600275970Scy ) 601275970Scy#ifdef SYS_WINNT 602275970Scy{ 603275970Scy struct timespec now; 604275970Scy struct timespec delta; 605275970Scy DWORD msec; 606275970Scy DWORD rc; 607275970Scy 608275970Scy if (NULL == timeout) { 609275970Scy msec = INFINITE; 610275970Scy } else { 611275970Scy getclock(TIMEOFDAY, &now); 612275970Scy delta = sub_tspec(*timeout, now); 613275970Scy if (delta.tv_sec < 0) { 614275970Scy msec = 0; 615275970Scy } else if ((delta.tv_sec + 1) >= (MAXDWORD / 1000)) { 616275970Scy msec = INFINITE; 617275970Scy } else { 618275970Scy msec = 1000 * (DWORD)delta.tv_sec; 619275970Scy msec += delta.tv_nsec / (1000 * 1000); 620275970Scy } 621275970Scy } 622275970Scy rc = WaitForSingleObject(sem, msec); 623275970Scy if (WAIT_OBJECT_0 == rc) 624275970Scy return 0; 625275970Scy if (WAIT_TIMEOUT == rc) { 626275970Scy errno = ETIMEDOUT; 627275970Scy return -1; 628275970Scy } 629275970Scy msyslog(LOG_ERR, "WaitForSingleObject unexpected 0x%x", rc); 630275970Scy errno = EFAULT; 631275970Scy return -1; 632275970Scy} 633275970Scy#else /* pthreads wait_for_sem() follows */ 634275970Scy{ 635275970Scy int rc; 636275970Scy 637275970Scy if (NULL == timeout) 638275970Scy rc = sem_wait(sem); 639275970Scy else 640275970Scy rc = sem_timedwait(sem, timeout); 641275970Scy 642275970Scy return rc; 643275970Scy} 644275970Scy#endif 645275970Scy 646275970Scy 647275970Scy/* 648275970Scy * blocking_thread - thread functions have WINAPI calling convention 649275970Scy */ 650275970Scy#ifdef SYS_WINNT 651275970Scyu_int 652275970ScyWINAPI 653275970Scy#else 654275970Scyvoid * 655275970Scy#endif 656275970Scyblocking_thread( 657275970Scy void * ThreadArg 658275970Scy ) 659275970Scy{ 660275970Scy blocking_child *c; 661275970Scy 662275970Scy c = ThreadArg; 663275970Scy exit_worker(blocking_child_common(c)); 664275970Scy 665275970Scy /* NOTREACHED */ 666275970Scy return 0; 667275970Scy} 668275970Scy 669275970Scy 670275970Scy/* 671275970Scy * req_child_exit() runs in the parent. 672275970Scy */ 673275970Scyint 674275970Scyreq_child_exit( 675275970Scy blocking_child *c 676275970Scy ) 677275970Scy{ 678275970Scy return queue_req_pointer(c, CHILD_EXIT_REQ); 679275970Scy} 680275970Scy 681275970Scy 682275970Scy/* 683275970Scy * cleanup_after_child() runs in parent. 684275970Scy */ 685275970Scystatic void 686275970Scycleanup_after_child( 687275970Scy blocking_child * c 688275970Scy ) 689275970Scy{ 690275970Scy u_int idx; 691275970Scy 692275970Scy DEBUG_INSIST(!c->reusable); 693275970Scy#ifdef SYS_WINNT 694275970Scy INSIST(CloseHandle(c->thread_ref)); 695275970Scy#else 696275970Scy free(c->thread_ref); 697275970Scy#endif 698275970Scy c->thread_ref = NULL; 699275970Scy c->thread_id = 0; 700275970Scy#ifdef WORK_PIPE 701275970Scy DEBUG_INSIST(-1 != c->resp_read_pipe); 702275970Scy DEBUG_INSIST(-1 != c->resp_write_pipe); 703275970Scy (*addremove_io_fd)(c->resp_read_pipe, c->ispipe, TRUE); 704275970Scy close(c->resp_write_pipe); 705275970Scy close(c->resp_read_pipe); 706275970Scy c->resp_write_pipe = -1; 707275970Scy c->resp_read_pipe = -1; 708275970Scy#else 709275970Scy DEBUG_INSIST(NULL != c->blocking_response_ready); 710275970Scy (*addremove_io_semaphore)(c->blocking_response_ready, TRUE); 711275970Scy#endif 712275970Scy for (idx = 0; idx < c->workitems_alloc; idx++) 713275970Scy c->workitems[idx] = NULL; 714275970Scy c->next_workitem = 0; 715275970Scy c->next_workeritem = 0; 716275970Scy for (idx = 0; idx < c->responses_alloc; idx++) 717275970Scy c->responses[idx] = NULL; 718275970Scy c->next_response = 0; 719275970Scy c->next_workresp = 0; 720275970Scy c->reusable = TRUE; 721275970Scy} 722275970Scy 723275970Scy 724275970Scy#else /* !WORK_THREAD follows */ 725275970Scychar work_thread_nonempty_compilation_unit; 726275970Scy#endif 727