1275970Scy/* 2275970Scy * work_thread.c - threads implementation for blocking worker child. 3275970Scy */ 4275970Scy#include <config.h> 5275970Scy#include "ntp_workimpl.h" 6275970Scy 7275970Scy#ifdef WORK_THREAD 8275970Scy 9275970Scy#include <stdio.h> 10275970Scy#include <ctype.h> 11275970Scy#include <signal.h> 12275970Scy#ifndef SYS_WINNT 13275970Scy#include <pthread.h> 14275970Scy#endif 15275970Scy 16275970Scy#include "ntp_stdlib.h" 17275970Scy#include "ntp_malloc.h" 18275970Scy#include "ntp_syslog.h" 19275970Scy#include "ntpd.h" 20275970Scy#include "ntp_io.h" 21275970Scy#include "ntp_assert.h" 22275970Scy#include "ntp_unixtime.h" 23275970Scy#include "timespecops.h" 24275970Scy#include "ntp_worker.h" 25275970Scy 26275970Scy#define CHILD_EXIT_REQ ((blocking_pipe_header *)(intptr_t)-1) 27275970Scy#define CHILD_GONE_RESP CHILD_EXIT_REQ 28294554Sdelphij/* Queue size increments: 29294554Sdelphij * The request queue grows a bit faster than the response queue -- the 30330106Sdelphij * daemon can push requests and pull results faster on avarage than the 31294554Sdelphij * worker can process requests and push results... If this really pays 32294554Sdelphij * off is debatable. 33294554Sdelphij */ 34275970Scy#define WORKITEMS_ALLOC_INC 16 35275970Scy#define RESPONSES_ALLOC_INC 4 36275970Scy 37294554Sdelphij/* Fiddle with min/max stack sizes. 64kB minimum seems to work, so we 38294554Sdelphij * set the maximum to 256kB. If the minimum goes below the 39294554Sdelphij * system-defined minimum stack size, we have to adjust accordingly. 40294554Sdelphij */ 41275970Scy#ifndef THREAD_MINSTACKSIZE 42294554Sdelphij# define THREAD_MINSTACKSIZE (64U * 1024) 43275970Scy#endif 44294554Sdelphij#ifndef __sun 45294554Sdelphij#if defined(PTHREAD_STACK_MIN) && THREAD_MINSTACKSIZE < PTHREAD_STACK_MIN 46294554Sdelphij# undef THREAD_MINSTACKSIZE 47294554Sdelphij# define THREAD_MINSTACKSIZE PTHREAD_STACK_MIN 48294554Sdelphij#endif 49294554Sdelphij#endif 50275970Scy 51294554Sdelphij#ifndef THREAD_MAXSTACKSIZE 52294554Sdelphij# define THREAD_MAXSTACKSIZE (256U * 1024) 53294554Sdelphij#endif 54294554Sdelphij#if THREAD_MAXSTACKSIZE < THREAD_MINSTACKSIZE 55294554Sdelphij# undef THREAD_MAXSTACKSIZE 56294554Sdelphij# define THREAD_MAXSTACKSIZE THREAD_MINSTACKSIZE 57294554Sdelphij#endif 58294554Sdelphij 59338530Sdelphij/* need a good integer to store a pointer... */ 60338530Sdelphij#ifndef UINTPTR_T 61338530Sdelphij# if defined(UINTPTR_MAX) 62338530Sdelphij# define UINTPTR_T uintptr_t 63338530Sdelphij# elif defined(UINT_PTR) 64338530Sdelphij# define UINTPTR_T UINT_PTR 65338530Sdelphij# else 66338530Sdelphij# define UINTPTR_T size_t 67338530Sdelphij# endif 68338530Sdelphij#endif 69294554Sdelphij 70338530Sdelphij 71293423Sdelphij#ifdef SYS_WINNT 72275970Scy 73275970Scy# define thread_exit(c) _endthreadex(c) 74293423Sdelphij# define tickle_sem(sh) ReleaseSemaphore((sh->shnd), 1, NULL) 75293423Sdelphiju_int WINAPI blocking_thread(void *); 76293423Sdelphijstatic BOOL same_os_sema(const sem_ref obj, void * osobj); 77293423Sdelphij 78275970Scy#else 79293423Sdelphij 80338530Sdelphij# define thread_exit(c) pthread_exit((void*)(UINTPTR_T)(c)) 81275970Scy# define tickle_sem sem_post 82293423Sdelphijvoid * blocking_thread(void *); 83293423Sdelphijstatic void block_thread_signals(sigset_t *); 84293423Sdelphij 85275970Scy#endif 86275970Scy 87275970Scy#ifdef WORK_PIPE 88275970Scyaddremove_io_fd_func addremove_io_fd; 89275970Scy#else 90275970Scyaddremove_io_semaphore_func addremove_io_semaphore; 91275970Scy#endif 92275970Scy 93275970Scystatic void start_blocking_thread(blocking_child *); 94275970Scystatic void start_blocking_thread_internal(blocking_child *); 95275970Scystatic void prepare_child_sems(blocking_child *); 96275970Scystatic int wait_for_sem(sem_ref, struct timespec *); 97293423Sdelphijstatic int ensure_workitems_empty_slot(blocking_child *); 98293423Sdelphijstatic int ensure_workresp_empty_slot(blocking_child *); 99275970Scystatic int queue_req_pointer(blocking_child *, blocking_pipe_header *); 100275970Scystatic void cleanup_after_child(blocking_child *); 101275970Scy 102298695Sdelphijstatic sema_type worker_mmutex; 103298695Sdelphijstatic sem_ref worker_memlock; 104275970Scy 105298695Sdelphij/* -------------------------------------------------------------------- 106298695Sdelphij * locking the global worker state table (and other global stuff) 107298695Sdelphij */ 108275970Scyvoid 109298695Sdelphijworker_global_lock( 110298695Sdelphij int inOrOut) 111298695Sdelphij{ 112298695Sdelphij if (worker_memlock) { 113298695Sdelphij if (inOrOut) 114298695Sdelphij wait_for_sem(worker_memlock, NULL); 115298695Sdelphij else 116298695Sdelphij tickle_sem(worker_memlock); 117298695Sdelphij } 118298695Sdelphij} 119298695Sdelphij 120298695Sdelphij/* -------------------------------------------------------------------- 121298695Sdelphij * implementation isolation wrapper 122298695Sdelphij */ 123298695Sdelphijvoid 124275970Scyexit_worker( 125275970Scy int exitcode 126275970Scy ) 127275970Scy{ 128275970Scy thread_exit(exitcode); /* see #define thread_exit */ 129275970Scy} 130275970Scy 131293423Sdelphij/* -------------------------------------------------------------------- 132293423Sdelphij * sleep for a given time or until the wakup semaphore is tickled. 133293423Sdelphij */ 134275970Scyint 135275970Scyworker_sleep( 136275970Scy blocking_child * c, 137275970Scy time_t seconds 138275970Scy ) 139275970Scy{ 140275970Scy struct timespec until; 141275970Scy int rc; 142275970Scy 143275970Scy# ifdef HAVE_CLOCK_GETTIME 144275970Scy if (0 != clock_gettime(CLOCK_REALTIME, &until)) { 145275970Scy msyslog(LOG_ERR, "worker_sleep: clock_gettime() failed: %m"); 146275970Scy return -1; 147275970Scy } 148275970Scy# else 149275970Scy if (0 != getclock(TIMEOFDAY, &until)) { 150275970Scy msyslog(LOG_ERR, "worker_sleep: getclock() failed: %m"); 151275970Scy return -1; 152275970Scy } 153275970Scy# endif 154275970Scy until.tv_sec += seconds; 155293423Sdelphij rc = wait_for_sem(c->wake_scheduled_sleep, &until); 156275970Scy if (0 == rc) 157275970Scy return -1; 158275970Scy if (-1 == rc && ETIMEDOUT == errno) 159275970Scy return 0; 160275970Scy msyslog(LOG_ERR, "worker_sleep: sem_timedwait: %m"); 161275970Scy return -1; 162275970Scy} 163275970Scy 164275970Scy 165293423Sdelphij/* -------------------------------------------------------------------- 166293423Sdelphij * Wake up a worker that takes a nap. 167293423Sdelphij */ 168275970Scyvoid 169275970Scyinterrupt_worker_sleep(void) 170275970Scy{ 171275970Scy u_int idx; 172275970Scy blocking_child * c; 173275970Scy 174275970Scy for (idx = 0; idx < blocking_children_alloc; idx++) { 175275970Scy c = blocking_children[idx]; 176275970Scy if (NULL == c || NULL == c->wake_scheduled_sleep) 177275970Scy continue; 178275970Scy tickle_sem(c->wake_scheduled_sleep); 179275970Scy } 180275970Scy} 181275970Scy 182293423Sdelphij/* -------------------------------------------------------------------- 183293423Sdelphij * Make sure there is an empty slot at the head of the request 184293423Sdelphij * queue. Tell if the queue is currently empty. 185293423Sdelphij */ 186293423Sdelphijstatic int 187275970Scyensure_workitems_empty_slot( 188275970Scy blocking_child *c 189275970Scy ) 190275970Scy{ 191293423Sdelphij /* 192293423Sdelphij ** !!! PRECONDITION: caller holds access lock! 193293423Sdelphij ** 194293423Sdelphij ** This simply tries to increase the size of the buffer if it 195293423Sdelphij ** becomes full. The resize operation does *not* maintain the 196293423Sdelphij ** order of requests, but that should be irrelevant since the 197293423Sdelphij ** processing is considered asynchronous anyway. 198293423Sdelphij ** 199293423Sdelphij ** Return if the buffer is currently empty. 200293423Sdelphij */ 201293423Sdelphij 202293423Sdelphij static const size_t each = 203293423Sdelphij sizeof(blocking_children[0]->workitems[0]); 204275970Scy 205293423Sdelphij size_t new_alloc; 206293423Sdelphij size_t slots_used; 207294554Sdelphij size_t sidx; 208275970Scy 209293423Sdelphij slots_used = c->head_workitem - c->tail_workitem; 210293423Sdelphij if (slots_used >= c->workitems_alloc) { 211293423Sdelphij new_alloc = c->workitems_alloc + WORKITEMS_ALLOC_INC; 212293423Sdelphij c->workitems = erealloc(c->workitems, new_alloc * each); 213294554Sdelphij for (sidx = c->workitems_alloc; sidx < new_alloc; ++sidx) 214294554Sdelphij c->workitems[sidx] = NULL; 215293423Sdelphij c->tail_workitem = 0; 216293423Sdelphij c->head_workitem = c->workitems_alloc; 217293423Sdelphij c->workitems_alloc = new_alloc; 218293423Sdelphij } 219294554Sdelphij INSIST(NULL == c->workitems[c->head_workitem % c->workitems_alloc]); 220293423Sdelphij return (0 == slots_used); 221275970Scy} 222275970Scy 223293423Sdelphij/* -------------------------------------------------------------------- 224293423Sdelphij * Make sure there is an empty slot at the head of the response 225293423Sdelphij * queue. Tell if the queue is currently empty. 226293423Sdelphij */ 227293423Sdelphijstatic int 228275970Scyensure_workresp_empty_slot( 229275970Scy blocking_child *c 230275970Scy ) 231275970Scy{ 232293423Sdelphij /* 233293423Sdelphij ** !!! PRECONDITION: caller holds access lock! 234293423Sdelphij ** 235293423Sdelphij ** Works like the companion function above. 236293423Sdelphij */ 237293423Sdelphij 238293423Sdelphij static const size_t each = 239293423Sdelphij sizeof(blocking_children[0]->responses[0]); 240275970Scy 241293423Sdelphij size_t new_alloc; 242293423Sdelphij size_t slots_used; 243294554Sdelphij size_t sidx; 244275970Scy 245293423Sdelphij slots_used = c->head_response - c->tail_response; 246293423Sdelphij if (slots_used >= c->responses_alloc) { 247293423Sdelphij new_alloc = c->responses_alloc + RESPONSES_ALLOC_INC; 248293423Sdelphij c->responses = erealloc(c->responses, new_alloc * each); 249294554Sdelphij for (sidx = c->responses_alloc; sidx < new_alloc; ++sidx) 250294554Sdelphij c->responses[sidx] = NULL; 251293423Sdelphij c->tail_response = 0; 252293423Sdelphij c->head_response = c->responses_alloc; 253293423Sdelphij c->responses_alloc = new_alloc; 254293423Sdelphij } 255294554Sdelphij INSIST(NULL == c->responses[c->head_response % c->responses_alloc]); 256293423Sdelphij return (0 == slots_used); 257275970Scy} 258275970Scy 259275970Scy 260293423Sdelphij/* -------------------------------------------------------------------- 261275970Scy * queue_req_pointer() - append a work item or idle exit request to 262293423Sdelphij * blocking_workitems[]. Employ proper locking. 263275970Scy */ 264275970Scystatic int 265275970Scyqueue_req_pointer( 266275970Scy blocking_child * c, 267275970Scy blocking_pipe_header * hdr 268275970Scy ) 269275970Scy{ 270293423Sdelphij size_t qhead; 271293423Sdelphij 272293423Sdelphij /* >>>> ACCESS LOCKING STARTS >>>> */ 273293423Sdelphij wait_for_sem(c->accesslock, NULL); 274293423Sdelphij ensure_workitems_empty_slot(c); 275293423Sdelphij qhead = c->head_workitem; 276293423Sdelphij c->workitems[qhead % c->workitems_alloc] = hdr; 277293423Sdelphij c->head_workitem = 1 + qhead; 278293423Sdelphij tickle_sem(c->accesslock); 279293423Sdelphij /* <<<< ACCESS LOCKING ENDS <<<< */ 280275970Scy 281293423Sdelphij /* queue consumer wake-up notification */ 282293423Sdelphij tickle_sem(c->workitems_pending); 283275970Scy 284275970Scy return 0; 285275970Scy} 286275970Scy 287293423Sdelphij/* -------------------------------------------------------------------- 288293423Sdelphij * API function to make sure a worker is running, a proper private copy 289293423Sdelphij * of the data is made, the data eneterd into the queue and the worker 290293423Sdelphij * is signalled. 291293423Sdelphij */ 292275970Scyint 293275970Scysend_blocking_req_internal( 294275970Scy blocking_child * c, 295275970Scy blocking_pipe_header * hdr, 296275970Scy void * data 297275970Scy ) 298275970Scy{ 299275970Scy blocking_pipe_header * threadcopy; 300275970Scy size_t payload_octets; 301275970Scy 302275970Scy REQUIRE(hdr != NULL); 303275970Scy REQUIRE(data != NULL); 304275970Scy DEBUG_REQUIRE(BLOCKING_REQ_MAGIC == hdr->magic_sig); 305275970Scy 306275970Scy if (hdr->octets <= sizeof(*hdr)) 307275970Scy return 1; /* failure */ 308275970Scy payload_octets = hdr->octets - sizeof(*hdr); 309275970Scy 310293423Sdelphij if (NULL == c->thread_ref) 311275970Scy start_blocking_thread(c); 312275970Scy threadcopy = emalloc(hdr->octets); 313275970Scy memcpy(threadcopy, hdr, sizeof(*hdr)); 314275970Scy memcpy((char *)threadcopy + sizeof(*hdr), data, payload_octets); 315275970Scy 316275970Scy return queue_req_pointer(c, threadcopy); 317275970Scy} 318275970Scy 319293423Sdelphij/* -------------------------------------------------------------------- 320293423Sdelphij * Wait for the 'incoming queue no longer empty' signal, lock the shared 321293423Sdelphij * structure and dequeue an item. 322293423Sdelphij */ 323275970Scyblocking_pipe_header * 324275970Scyreceive_blocking_req_internal( 325275970Scy blocking_child * c 326275970Scy ) 327275970Scy{ 328275970Scy blocking_pipe_header * req; 329293423Sdelphij size_t qhead, qtail; 330275970Scy 331293423Sdelphij req = NULL; 332275970Scy do { 333293423Sdelphij /* wait for tickle from the producer side */ 334293423Sdelphij wait_for_sem(c->workitems_pending, NULL); 335275970Scy 336293423Sdelphij /* >>>> ACCESS LOCKING STARTS >>>> */ 337293423Sdelphij wait_for_sem(c->accesslock, NULL); 338293423Sdelphij qhead = c->head_workitem; 339293423Sdelphij do { 340293423Sdelphij qtail = c->tail_workitem; 341293423Sdelphij if (qhead == qtail) 342293423Sdelphij break; 343293423Sdelphij c->tail_workitem = qtail + 1; 344293423Sdelphij qtail %= c->workitems_alloc; 345293423Sdelphij req = c->workitems[qtail]; 346293423Sdelphij c->workitems[qtail] = NULL; 347293423Sdelphij } while (NULL == req); 348293423Sdelphij tickle_sem(c->accesslock); 349293423Sdelphij /* <<<< ACCESS LOCKING ENDS <<<< */ 350293423Sdelphij 351293423Sdelphij } while (NULL == req); 352293423Sdelphij 353275970Scy INSIST(NULL != req); 354275970Scy if (CHILD_EXIT_REQ == req) { /* idled out */ 355275970Scy send_blocking_resp_internal(c, CHILD_GONE_RESP); 356275970Scy req = NULL; 357275970Scy } 358275970Scy 359275970Scy return req; 360275970Scy} 361275970Scy 362293423Sdelphij/* -------------------------------------------------------------------- 363293423Sdelphij * Push a response into the return queue and eventually tickle the 364293423Sdelphij * receiver. 365293423Sdelphij */ 366275970Scyint 367275970Scysend_blocking_resp_internal( 368275970Scy blocking_child * c, 369275970Scy blocking_pipe_header * resp 370275970Scy ) 371275970Scy{ 372293423Sdelphij size_t qhead; 373293423Sdelphij int empty; 374293423Sdelphij 375293423Sdelphij /* >>>> ACCESS LOCKING STARTS >>>> */ 376293423Sdelphij wait_for_sem(c->accesslock, NULL); 377293423Sdelphij empty = ensure_workresp_empty_slot(c); 378293423Sdelphij qhead = c->head_response; 379293423Sdelphij c->responses[qhead % c->responses_alloc] = resp; 380293423Sdelphij c->head_response = 1 + qhead; 381293423Sdelphij tickle_sem(c->accesslock); 382293423Sdelphij /* <<<< ACCESS LOCKING ENDS <<<< */ 383275970Scy 384293423Sdelphij /* queue consumer wake-up notification */ 385293423Sdelphij if (empty) 386293423Sdelphij { 387293423Sdelphij# ifdef WORK_PIPE 388338530Sdelphij if (1 != write(c->resp_write_pipe, "", 1)) 389338530Sdelphij msyslog(LOG_WARNING, "async resolver: %s", 390338530Sdelphij "failed to notify main thread!"); 391293423Sdelphij# else 392293423Sdelphij tickle_sem(c->responses_pending); 393293423Sdelphij# endif 394293423Sdelphij } 395275970Scy return 0; 396275970Scy} 397275970Scy 398275970Scy 399275970Scy#ifndef WORK_PIPE 400293423Sdelphij 401293423Sdelphij/* -------------------------------------------------------------------- 402293423Sdelphij * Check if a (Windows-)hanndle to a semaphore is actually the same we 403293423Sdelphij * are using inside the sema wrapper. 404293423Sdelphij */ 405293423Sdelphijstatic BOOL 406293423Sdelphijsame_os_sema( 407293423Sdelphij const sem_ref obj, 408293423Sdelphij void* osh 409293423Sdelphij ) 410293423Sdelphij{ 411293423Sdelphij return obj && osh && (obj->shnd == (HANDLE)osh); 412293423Sdelphij} 413293423Sdelphij 414293423Sdelphij/* -------------------------------------------------------------------- 415293423Sdelphij * Find the shared context that associates to an OS handle and make sure 416293423Sdelphij * the data is dequeued and processed. 417293423Sdelphij */ 418275970Scyvoid 419275970Scyhandle_blocking_resp_sem( 420275970Scy void * context 421275970Scy ) 422275970Scy{ 423275970Scy blocking_child * c; 424275970Scy u_int idx; 425275970Scy 426275970Scy c = NULL; 427275970Scy for (idx = 0; idx < blocking_children_alloc; idx++) { 428275970Scy c = blocking_children[idx]; 429293423Sdelphij if (c != NULL && 430293423Sdelphij c->thread_ref != NULL && 431293423Sdelphij same_os_sema(c->responses_pending, context)) 432275970Scy break; 433275970Scy } 434275970Scy if (idx < blocking_children_alloc) 435275970Scy process_blocking_resp(c); 436275970Scy} 437275970Scy#endif /* !WORK_PIPE */ 438275970Scy 439293423Sdelphij/* -------------------------------------------------------------------- 440293423Sdelphij * Fetch the next response from the return queue. In case of signalling 441293423Sdelphij * via pipe, make sure the pipe is flushed, too. 442293423Sdelphij */ 443275970Scyblocking_pipe_header * 444275970Scyreceive_blocking_resp_internal( 445275970Scy blocking_child * c 446275970Scy ) 447275970Scy{ 448275970Scy blocking_pipe_header * removed; 449293423Sdelphij size_t qhead, qtail, slot; 450293423Sdelphij 451275970Scy#ifdef WORK_PIPE 452275970Scy int rc; 453275970Scy char scratch[32]; 454275970Scy 455293423Sdelphij do 456275970Scy rc = read(c->resp_read_pipe, scratch, sizeof(scratch)); 457293423Sdelphij while (-1 == rc && EINTR == errno); 458275970Scy#endif 459293423Sdelphij 460293423Sdelphij /* >>>> ACCESS LOCKING STARTS >>>> */ 461293423Sdelphij wait_for_sem(c->accesslock, NULL); 462293423Sdelphij qhead = c->head_response; 463293423Sdelphij qtail = c->tail_response; 464293423Sdelphij for (removed = NULL; !removed && (qhead != qtail); ++qtail) { 465293423Sdelphij slot = qtail % c->responses_alloc; 466293423Sdelphij removed = c->responses[slot]; 467293423Sdelphij c->responses[slot] = NULL; 468293423Sdelphij } 469293423Sdelphij c->tail_response = qtail; 470293423Sdelphij tickle_sem(c->accesslock); 471293423Sdelphij /* <<<< ACCESS LOCKING ENDS <<<< */ 472293423Sdelphij 473275970Scy if (NULL != removed) { 474275970Scy DEBUG_ENSURE(CHILD_GONE_RESP == removed || 475275970Scy BLOCKING_RESP_MAGIC == removed->magic_sig); 476275970Scy } 477275970Scy if (CHILD_GONE_RESP == removed) { 478275970Scy cleanup_after_child(c); 479275970Scy removed = NULL; 480275970Scy } 481275970Scy 482275970Scy return removed; 483275970Scy} 484275970Scy 485293423Sdelphij/* -------------------------------------------------------------------- 486293423Sdelphij * Light up a new worker. 487293423Sdelphij */ 488275970Scystatic void 489275970Scystart_blocking_thread( 490275970Scy blocking_child * c 491275970Scy ) 492275970Scy{ 493275970Scy 494275970Scy DEBUG_INSIST(!c->reusable); 495275970Scy 496275970Scy prepare_child_sems(c); 497275970Scy start_blocking_thread_internal(c); 498275970Scy} 499275970Scy 500293423Sdelphij/* -------------------------------------------------------------------- 501293423Sdelphij * Create a worker thread. There are several differences between POSIX 502293423Sdelphij * and Windows, of course -- most notably the Windows thread is no 503293423Sdelphij * detached thread, and we keep the handle around until we want to get 504293423Sdelphij * rid of the thread. The notification scheme also differs: Windows 505293423Sdelphij * makes use of semaphores in both directions, POSIX uses a pipe for 506293423Sdelphij * integration with 'select()' or alike. 507293423Sdelphij */ 508275970Scystatic void 509275970Scystart_blocking_thread_internal( 510275970Scy blocking_child * c 511275970Scy ) 512275970Scy#ifdef SYS_WINNT 513275970Scy{ 514275970Scy BOOL resumed; 515275970Scy 516293423Sdelphij c->thread_ref = NULL; 517293423Sdelphij (*addremove_io_semaphore)(c->responses_pending->shnd, FALSE); 518293423Sdelphij c->thr_table[0].thnd = 519275970Scy (HANDLE)_beginthreadex( 520275970Scy NULL, 521275970Scy 0, 522275970Scy &blocking_thread, 523275970Scy c, 524275970Scy CREATE_SUSPENDED, 525293423Sdelphij NULL); 526275970Scy 527293423Sdelphij if (NULL == c->thr_table[0].thnd) { 528275970Scy msyslog(LOG_ERR, "start blocking thread failed: %m"); 529275970Scy exit(-1); 530275970Scy } 531275970Scy /* remember the thread priority is only within the process class */ 532293423Sdelphij if (!SetThreadPriority(c->thr_table[0].thnd, 533275970Scy THREAD_PRIORITY_BELOW_NORMAL)) 534275970Scy msyslog(LOG_ERR, "Error lowering blocking thread priority: %m"); 535275970Scy 536293423Sdelphij resumed = ResumeThread(c->thr_table[0].thnd); 537275970Scy DEBUG_INSIST(resumed); 538293423Sdelphij c->thread_ref = &c->thr_table[0]; 539275970Scy} 540275970Scy#else /* pthreads start_blocking_thread_internal() follows */ 541275970Scy{ 542275970Scy# ifdef NEED_PTHREAD_INIT 543275970Scy static int pthread_init_called; 544275970Scy# endif 545275970Scy pthread_attr_t thr_attr; 546275970Scy int rc; 547275970Scy int pipe_ends[2]; /* read then write */ 548275970Scy int is_pipe; 549275970Scy int flags; 550294554Sdelphij size_t ostacksize; 551294554Sdelphij size_t nstacksize; 552275970Scy sigset_t saved_sig_mask; 553275970Scy 554293423Sdelphij c->thread_ref = NULL; 555293423Sdelphij 556275970Scy# ifdef NEED_PTHREAD_INIT 557275970Scy /* 558275970Scy * from lib/isc/unix/app.c: 559275970Scy * BSDI 3.1 seg faults in pthread_sigmask() if we don't do this. 560275970Scy */ 561275970Scy if (!pthread_init_called) { 562275970Scy pthread_init(); 563275970Scy pthread_init_called = TRUE; 564275970Scy } 565275970Scy# endif 566275970Scy 567275970Scy rc = pipe_socketpair(&pipe_ends[0], &is_pipe); 568275970Scy if (0 != rc) { 569275970Scy msyslog(LOG_ERR, "start_blocking_thread: pipe_socketpair() %m"); 570275970Scy exit(1); 571275970Scy } 572275970Scy c->resp_read_pipe = move_fd(pipe_ends[0]); 573275970Scy c->resp_write_pipe = move_fd(pipe_ends[1]); 574275970Scy c->ispipe = is_pipe; 575275970Scy flags = fcntl(c->resp_read_pipe, F_GETFL, 0); 576275970Scy if (-1 == flags) { 577275970Scy msyslog(LOG_ERR, "start_blocking_thread: fcntl(F_GETFL) %m"); 578275970Scy exit(1); 579275970Scy } 580275970Scy rc = fcntl(c->resp_read_pipe, F_SETFL, O_NONBLOCK | flags); 581275970Scy if (-1 == rc) { 582275970Scy msyslog(LOG_ERR, 583275970Scy "start_blocking_thread: fcntl(F_SETFL, O_NONBLOCK) %m"); 584275970Scy exit(1); 585275970Scy } 586275970Scy (*addremove_io_fd)(c->resp_read_pipe, c->ispipe, FALSE); 587275970Scy pthread_attr_init(&thr_attr); 588275970Scy pthread_attr_setdetachstate(&thr_attr, PTHREAD_CREATE_DETACHED); 589275970Scy#if defined(HAVE_PTHREAD_ATTR_GETSTACKSIZE) && \ 590275970Scy defined(HAVE_PTHREAD_ATTR_SETSTACKSIZE) 591294554Sdelphij rc = pthread_attr_getstacksize(&thr_attr, &ostacksize); 592294554Sdelphij if (0 != rc) { 593275970Scy msyslog(LOG_ERR, 594294554Sdelphij "start_blocking_thread: pthread_attr_getstacksize() -> %s", 595294554Sdelphij strerror(rc)); 596294554Sdelphij } else { 597294554Sdelphij if (ostacksize < THREAD_MINSTACKSIZE) 598294554Sdelphij nstacksize = THREAD_MINSTACKSIZE; 599294554Sdelphij else if (ostacksize > THREAD_MAXSTACKSIZE) 600294554Sdelphij nstacksize = THREAD_MAXSTACKSIZE; 601294554Sdelphij else 602294554Sdelphij nstacksize = ostacksize; 603294554Sdelphij if (nstacksize != ostacksize) 604294554Sdelphij rc = pthread_attr_setstacksize(&thr_attr, nstacksize); 605294554Sdelphij if (0 != rc) 606275970Scy msyslog(LOG_ERR, 607294554Sdelphij "start_blocking_thread: pthread_attr_setstacksize(0x%lx -> 0x%lx) -> %s", 608294554Sdelphij (u_long)ostacksize, (u_long)nstacksize, 609294554Sdelphij strerror(rc)); 610275970Scy } 611275970Scy#else 612294554Sdelphij UNUSED_ARG(nstacksize); 613294554Sdelphij UNUSED_ARG(ostacksize); 614275970Scy#endif 615275970Scy#if defined(PTHREAD_SCOPE_SYSTEM) && defined(NEED_PTHREAD_SCOPE_SYSTEM) 616275970Scy pthread_attr_setscope(&thr_attr, PTHREAD_SCOPE_SYSTEM); 617275970Scy#endif 618275970Scy c->thread_ref = emalloc_zero(sizeof(*c->thread_ref)); 619275970Scy block_thread_signals(&saved_sig_mask); 620293423Sdelphij rc = pthread_create(&c->thr_table[0], &thr_attr, 621275970Scy &blocking_thread, c); 622275970Scy pthread_sigmask(SIG_SETMASK, &saved_sig_mask, NULL); 623275970Scy pthread_attr_destroy(&thr_attr); 624275970Scy if (0 != rc) { 625294554Sdelphij msyslog(LOG_ERR, "start_blocking_thread: pthread_create() -> %s", 626294554Sdelphij strerror(rc)); 627275970Scy exit(1); 628275970Scy } 629293423Sdelphij c->thread_ref = &c->thr_table[0]; 630275970Scy} 631275970Scy#endif 632275970Scy 633293423Sdelphij/* -------------------------------------------------------------------- 634275970Scy * block_thread_signals() 635275970Scy * 636275970Scy * Temporarily block signals used by ntpd main thread, so that signal 637275970Scy * mask inherited by child threads leaves them blocked. Returns prior 638275970Scy * active signal mask via pmask, to be restored by the main thread 639275970Scy * after pthread_create(). 640275970Scy */ 641275970Scy#ifndef SYS_WINNT 642275970Scyvoid 643275970Scyblock_thread_signals( 644275970Scy sigset_t * pmask 645275970Scy ) 646275970Scy{ 647275970Scy sigset_t block; 648275970Scy 649275970Scy sigemptyset(&block); 650275970Scy# ifdef HAVE_SIGNALED_IO 651275970Scy# ifdef SIGIO 652275970Scy sigaddset(&block, SIGIO); 653275970Scy# endif 654275970Scy# ifdef SIGPOLL 655275970Scy sigaddset(&block, SIGPOLL); 656275970Scy# endif 657275970Scy# endif /* HAVE_SIGNALED_IO */ 658275970Scy sigaddset(&block, SIGALRM); 659275970Scy sigaddset(&block, MOREDEBUGSIG); 660275970Scy sigaddset(&block, LESSDEBUGSIG); 661275970Scy# ifdef SIGDIE1 662275970Scy sigaddset(&block, SIGDIE1); 663275970Scy# endif 664275970Scy# ifdef SIGDIE2 665275970Scy sigaddset(&block, SIGDIE2); 666275970Scy# endif 667275970Scy# ifdef SIGDIE3 668275970Scy sigaddset(&block, SIGDIE3); 669275970Scy# endif 670275970Scy# ifdef SIGDIE4 671275970Scy sigaddset(&block, SIGDIE4); 672275970Scy# endif 673275970Scy# ifdef SIGBUS 674275970Scy sigaddset(&block, SIGBUS); 675275970Scy# endif 676275970Scy sigemptyset(pmask); 677275970Scy pthread_sigmask(SIG_BLOCK, &block, pmask); 678275970Scy} 679275970Scy#endif /* !SYS_WINNT */ 680275970Scy 681275970Scy 682293423Sdelphij/* -------------------------------------------------------------------- 683293423Sdelphij * Create & destroy semaphores. This is sufficiently different between 684293423Sdelphij * POSIX and Windows to warrant wrapper functions and close enough to 685293423Sdelphij * use the concept of synchronization via semaphore for all platforms. 686293423Sdelphij */ 687293423Sdelphijstatic sem_ref 688293423Sdelphijcreate_sema( 689293423Sdelphij sema_type* semptr, 690293423Sdelphij u_int inival, 691293423Sdelphij u_int maxval) 692293423Sdelphij{ 693293423Sdelphij#ifdef SYS_WINNT 694293423Sdelphij 695293423Sdelphij long svini, svmax; 696293423Sdelphij if (NULL != semptr) { 697293423Sdelphij svini = (inival < LONG_MAX) 698293423Sdelphij ? (long)inival : LONG_MAX; 699293423Sdelphij svmax = (maxval < LONG_MAX && maxval > 0) 700293423Sdelphij ? (long)maxval : LONG_MAX; 701293423Sdelphij semptr->shnd = CreateSemaphore(NULL, svini, svmax, NULL); 702293423Sdelphij if (NULL == semptr->shnd) 703293423Sdelphij semptr = NULL; 704293423Sdelphij } 705293423Sdelphij 706293423Sdelphij#else 707293423Sdelphij 708293423Sdelphij (void)maxval; 709293423Sdelphij if (semptr && sem_init(semptr, FALSE, inival)) 710293423Sdelphij semptr = NULL; 711293423Sdelphij 712293423Sdelphij#endif 713293423Sdelphij 714293423Sdelphij return semptr; 715293423Sdelphij} 716293423Sdelphij 717293423Sdelphij/* ------------------------------------------------------------------ */ 718293423Sdelphijstatic sem_ref 719293423Sdelphijdelete_sema( 720293423Sdelphij sem_ref obj) 721293423Sdelphij{ 722293423Sdelphij 723293423Sdelphij# ifdef SYS_WINNT 724293423Sdelphij 725293423Sdelphij if (obj) { 726293423Sdelphij if (obj->shnd) 727293423Sdelphij CloseHandle(obj->shnd); 728293423Sdelphij obj->shnd = NULL; 729293423Sdelphij } 730293423Sdelphij 731293423Sdelphij# else 732293423Sdelphij 733293423Sdelphij if (obj) 734293423Sdelphij sem_destroy(obj); 735293423Sdelphij 736293423Sdelphij# endif 737293423Sdelphij 738293423Sdelphij return NULL; 739293423Sdelphij} 740293423Sdelphij 741293423Sdelphij/* -------------------------------------------------------------------- 742275970Scy * prepare_child_sems() 743275970Scy * 744293423Sdelphij * create sync & access semaphores 745275970Scy * 746293423Sdelphij * All semaphores are cleared, only the access semaphore has 1 unit. 747293423Sdelphij * Childs wait on 'workitems_pending', then grabs 'sema_access' 748293423Sdelphij * and dequeues jobs. When done, 'sema_access' is given one unit back. 749293423Sdelphij * 750293423Sdelphij * The producer grabs 'sema_access', manages the queue, restores 751293423Sdelphij * 'sema_access' and puts one unit into 'workitems_pending'. 752293423Sdelphij * 753293423Sdelphij * The story goes the same for the response queue. 754275970Scy */ 755275970Scystatic void 756275970Scyprepare_child_sems( 757275970Scy blocking_child *c 758275970Scy ) 759275970Scy{ 760298695Sdelphij if (NULL == worker_memlock) 761298695Sdelphij worker_memlock = create_sema(&worker_mmutex, 1, 1); 762298695Sdelphij 763293423Sdelphij c->accesslock = create_sema(&c->sem_table[0], 1, 1); 764293423Sdelphij c->workitems_pending = create_sema(&c->sem_table[1], 0, 0); 765293423Sdelphij c->wake_scheduled_sleep = create_sema(&c->sem_table[2], 0, 1); 766293423Sdelphij# ifndef WORK_PIPE 767293423Sdelphij c->responses_pending = create_sema(&c->sem_table[3], 0, 0); 768293423Sdelphij# endif 769275970Scy} 770275970Scy 771293423Sdelphij/* -------------------------------------------------------------------- 772293423Sdelphij * wait for semaphore. Where the wait can be interrupted, it will 773293423Sdelphij * internally resume -- When this function returns, there is either no 774293423Sdelphij * semaphore at all, a timeout occurred, or the caller could 775293423Sdelphij * successfully take a token from the semaphore. 776293423Sdelphij * 777293423Sdelphij * For untimed wait, not checking the result of this function at all is 778293423Sdelphij * definitely an option. 779293423Sdelphij */ 780275970Scystatic int 781275970Scywait_for_sem( 782275970Scy sem_ref sem, 783275970Scy struct timespec * timeout /* wall-clock */ 784275970Scy ) 785275970Scy#ifdef SYS_WINNT 786275970Scy{ 787275970Scy struct timespec now; 788275970Scy struct timespec delta; 789275970Scy DWORD msec; 790275970Scy DWORD rc; 791275970Scy 792293423Sdelphij if (!(sem && sem->shnd)) { 793293423Sdelphij errno = EINVAL; 794293423Sdelphij return -1; 795293423Sdelphij } 796293423Sdelphij 797275970Scy if (NULL == timeout) { 798275970Scy msec = INFINITE; 799275970Scy } else { 800275970Scy getclock(TIMEOFDAY, &now); 801275970Scy delta = sub_tspec(*timeout, now); 802275970Scy if (delta.tv_sec < 0) { 803275970Scy msec = 0; 804275970Scy } else if ((delta.tv_sec + 1) >= (MAXDWORD / 1000)) { 805275970Scy msec = INFINITE; 806275970Scy } else { 807275970Scy msec = 1000 * (DWORD)delta.tv_sec; 808275970Scy msec += delta.tv_nsec / (1000 * 1000); 809275970Scy } 810275970Scy } 811293423Sdelphij rc = WaitForSingleObject(sem->shnd, msec); 812275970Scy if (WAIT_OBJECT_0 == rc) 813275970Scy return 0; 814275970Scy if (WAIT_TIMEOUT == rc) { 815275970Scy errno = ETIMEDOUT; 816275970Scy return -1; 817275970Scy } 818275970Scy msyslog(LOG_ERR, "WaitForSingleObject unexpected 0x%x", rc); 819275970Scy errno = EFAULT; 820275970Scy return -1; 821275970Scy} 822275970Scy#else /* pthreads wait_for_sem() follows */ 823275970Scy{ 824293423Sdelphij int rc = -1; 825275970Scy 826293423Sdelphij if (sem) do { 827293423Sdelphij if (NULL == timeout) 828293423Sdelphij rc = sem_wait(sem); 829293423Sdelphij else 830293423Sdelphij rc = sem_timedwait(sem, timeout); 831293423Sdelphij } while (rc == -1 && errno == EINTR); 832275970Scy else 833293423Sdelphij errno = EINVAL; 834293423Sdelphij 835275970Scy return rc; 836275970Scy} 837275970Scy#endif 838275970Scy 839293423Sdelphij/* -------------------------------------------------------------------- 840293423Sdelphij * blocking_thread - thread functions have WINAPI (aka 'stdcall') 841293423Sdelphij * calling conventions under Windows and POSIX-defined signature 842293423Sdelphij * otherwise. 843275970Scy */ 844275970Scy#ifdef SYS_WINNT 845293423Sdelphiju_int WINAPI 846275970Scy#else 847275970Scyvoid * 848275970Scy#endif 849275970Scyblocking_thread( 850275970Scy void * ThreadArg 851275970Scy ) 852275970Scy{ 853275970Scy blocking_child *c; 854275970Scy 855275970Scy c = ThreadArg; 856275970Scy exit_worker(blocking_child_common(c)); 857275970Scy 858275970Scy /* NOTREACHED */ 859275970Scy return 0; 860275970Scy} 861275970Scy 862293423Sdelphij/* -------------------------------------------------------------------- 863275970Scy * req_child_exit() runs in the parent. 864293423Sdelphij * 865293423Sdelphij * This function is called from from the idle timer, too, and possibly 866293423Sdelphij * without a thread being there any longer. Since we have folded up our 867293423Sdelphij * tent in that case and all the semaphores are already gone, we simply 868293423Sdelphij * ignore this request in this case. 869293423Sdelphij * 870293423Sdelphij * Since the existence of the semaphores is controlled exclusively by 871293423Sdelphij * the parent, there's no risk of data race here. 872275970Scy */ 873275970Scyint 874275970Scyreq_child_exit( 875275970Scy blocking_child *c 876275970Scy ) 877275970Scy{ 878293423Sdelphij return (c->accesslock) 879293423Sdelphij ? queue_req_pointer(c, CHILD_EXIT_REQ) 880293423Sdelphij : 0; 881275970Scy} 882275970Scy 883293423Sdelphij/* -------------------------------------------------------------------- 884275970Scy * cleanup_after_child() runs in parent. 885275970Scy */ 886275970Scystatic void 887275970Scycleanup_after_child( 888275970Scy blocking_child * c 889275970Scy ) 890275970Scy{ 891275970Scy DEBUG_INSIST(!c->reusable); 892293423Sdelphij 893293423Sdelphij# ifdef SYS_WINNT 894293423Sdelphij /* The thread was not created in detached state, so we better 895293423Sdelphij * clean up. 896293423Sdelphij */ 897293423Sdelphij if (c->thread_ref && c->thread_ref->thnd) { 898293423Sdelphij WaitForSingleObject(c->thread_ref->thnd, INFINITE); 899293423Sdelphij INSIST(CloseHandle(c->thread_ref->thnd)); 900293423Sdelphij c->thread_ref->thnd = NULL; 901293423Sdelphij } 902293423Sdelphij# endif 903275970Scy c->thread_ref = NULL; 904293423Sdelphij 905293423Sdelphij /* remove semaphores and (if signalling vi IO) pipes */ 906293423Sdelphij 907293423Sdelphij c->accesslock = delete_sema(c->accesslock); 908293423Sdelphij c->workitems_pending = delete_sema(c->workitems_pending); 909293423Sdelphij c->wake_scheduled_sleep = delete_sema(c->wake_scheduled_sleep); 910293423Sdelphij 911293423Sdelphij# ifdef WORK_PIPE 912275970Scy DEBUG_INSIST(-1 != c->resp_read_pipe); 913275970Scy DEBUG_INSIST(-1 != c->resp_write_pipe); 914275970Scy (*addremove_io_fd)(c->resp_read_pipe, c->ispipe, TRUE); 915275970Scy close(c->resp_write_pipe); 916275970Scy close(c->resp_read_pipe); 917275970Scy c->resp_write_pipe = -1; 918275970Scy c->resp_read_pipe = -1; 919293423Sdelphij# else 920293423Sdelphij DEBUG_INSIST(NULL != c->responses_pending); 921293423Sdelphij (*addremove_io_semaphore)(c->responses_pending->shnd, TRUE); 922293423Sdelphij c->responses_pending = delete_sema(c->responses_pending); 923293423Sdelphij# endif 924293423Sdelphij 925293423Sdelphij /* Is it necessary to check if there are pending requests and 926293423Sdelphij * responses? If so, and if there are, what to do with them? 927293423Sdelphij */ 928293423Sdelphij 929293423Sdelphij /* re-init buffer index sequencers */ 930293423Sdelphij c->head_workitem = 0; 931293423Sdelphij c->tail_workitem = 0; 932293423Sdelphij c->head_response = 0; 933293423Sdelphij c->tail_response = 0; 934293423Sdelphij 935275970Scy c->reusable = TRUE; 936275970Scy} 937275970Scy 938275970Scy 939275970Scy#else /* !WORK_THREAD follows */ 940275970Scychar work_thread_nonempty_compilation_unit; 941275970Scy#endif 942