1275970Scy/* 2275970Scy * work_thread.c - threads implementation for blocking worker child. 3275970Scy */ 4275970Scy#include <config.h> 5275970Scy#include "ntp_workimpl.h" 6275970Scy 7275970Scy#ifdef WORK_THREAD 8275970Scy 9275970Scy#include <stdio.h> 10275970Scy#include <ctype.h> 11275970Scy#include <signal.h> 12275970Scy#ifndef SYS_WINNT 13275970Scy#include <pthread.h> 14275970Scy#endif 15275970Scy 16275970Scy#include "ntp_stdlib.h" 17275970Scy#include "ntp_malloc.h" 18275970Scy#include "ntp_syslog.h" 19275970Scy#include "ntpd.h" 20275970Scy#include "ntp_io.h" 21275970Scy#include "ntp_assert.h" 22275970Scy#include "ntp_unixtime.h" 23275970Scy#include "timespecops.h" 24275970Scy#include "ntp_worker.h" 25275970Scy 26275970Scy#define CHILD_EXIT_REQ ((blocking_pipe_header *)(intptr_t)-1) 27275970Scy#define CHILD_GONE_RESP CHILD_EXIT_REQ 28294569Sdelphij/* Queue size increments: 29294569Sdelphij * The request queue grows a bit faster than the response queue -- the 30330141Sdelphij * daemon can push requests and pull results faster on avarage than the 31294569Sdelphij * worker can process requests and push results... If this really pays 32294569Sdelphij * off is debatable. 33294569Sdelphij */ 34275970Scy#define WORKITEMS_ALLOC_INC 16 35275970Scy#define RESPONSES_ALLOC_INC 4 36275970Scy 37294569Sdelphij/* Fiddle with min/max stack sizes. 64kB minimum seems to work, so we 38294569Sdelphij * set the maximum to 256kB. If the minimum goes below the 39294569Sdelphij * system-defined minimum stack size, we have to adjust accordingly. 40294569Sdelphij */ 41275970Scy#ifndef THREAD_MINSTACKSIZE 42294569Sdelphij# define THREAD_MINSTACKSIZE (64U * 1024) 43275970Scy#endif 44294569Sdelphij#ifndef __sun 45294569Sdelphij#if defined(PTHREAD_STACK_MIN) && THREAD_MINSTACKSIZE < PTHREAD_STACK_MIN 46294569Sdelphij# undef THREAD_MINSTACKSIZE 47294569Sdelphij# define THREAD_MINSTACKSIZE PTHREAD_STACK_MIN 48294569Sdelphij#endif 49294569Sdelphij#endif 50275970Scy 51294569Sdelphij#ifndef THREAD_MAXSTACKSIZE 52294569Sdelphij# define THREAD_MAXSTACKSIZE (256U * 1024) 53294569Sdelphij#endif 54294569Sdelphij#if THREAD_MAXSTACKSIZE < THREAD_MINSTACKSIZE 55294569Sdelphij# undef THREAD_MAXSTACKSIZE 56294569Sdelphij# define THREAD_MAXSTACKSIZE THREAD_MINSTACKSIZE 57294569Sdelphij#endif 58294569Sdelphij 59338531Sdelphij/* need a good integer to store a pointer... */ 60338531Sdelphij#ifndef UINTPTR_T 61338531Sdelphij# if defined(UINTPTR_MAX) 62338531Sdelphij# define UINTPTR_T uintptr_t 63338531Sdelphij# elif defined(UINT_PTR) 64338531Sdelphij# define UINTPTR_T UINT_PTR 65338531Sdelphij# else 66338531Sdelphij# define UINTPTR_T size_t 67338531Sdelphij# endif 68338531Sdelphij#endif 69294569Sdelphij 70338531Sdelphij 71293650Sglebius#ifdef SYS_WINNT 72275970Scy 73275970Scy# define thread_exit(c) _endthreadex(c) 74293650Sglebius# define tickle_sem(sh) ReleaseSemaphore((sh->shnd), 1, NULL) 75293650Sglebiusu_int WINAPI blocking_thread(void *); 76293650Sglebiusstatic BOOL same_os_sema(const sem_ref obj, void * osobj); 77293650Sglebius 78275970Scy#else 79293650Sglebius 80338531Sdelphij# define thread_exit(c) pthread_exit((void*)(UINTPTR_T)(c)) 81275970Scy# define tickle_sem sem_post 82293650Sglebiusvoid * blocking_thread(void *); 83293650Sglebiusstatic void block_thread_signals(sigset_t *); 84293650Sglebius 85275970Scy#endif 86275970Scy 87275970Scy#ifdef WORK_PIPE 88275970Scyaddremove_io_fd_func addremove_io_fd; 89275970Scy#else 90275970Scyaddremove_io_semaphore_func addremove_io_semaphore; 91275970Scy#endif 92275970Scy 93275970Scystatic void start_blocking_thread(blocking_child *); 94275970Scystatic void start_blocking_thread_internal(blocking_child *); 95275970Scystatic void prepare_child_sems(blocking_child *); 96275970Scystatic int wait_for_sem(sem_ref, struct timespec *); 97293650Sglebiusstatic int ensure_workitems_empty_slot(blocking_child *); 98293650Sglebiusstatic int ensure_workresp_empty_slot(blocking_child *); 99275970Scystatic int queue_req_pointer(blocking_child *, blocking_pipe_header *); 100275970Scystatic void cleanup_after_child(blocking_child *); 101275970Scy 102298699Sdelphijstatic sema_type worker_mmutex; 103298699Sdelphijstatic sem_ref worker_memlock; 104275970Scy 105298699Sdelphij/* -------------------------------------------------------------------- 106298699Sdelphij * locking the global worker state table (and other global stuff) 107298699Sdelphij */ 108275970Scyvoid 109298699Sdelphijworker_global_lock( 110298699Sdelphij int inOrOut) 111298699Sdelphij{ 112298699Sdelphij if (worker_memlock) { 113298699Sdelphij if (inOrOut) 114298699Sdelphij wait_for_sem(worker_memlock, NULL); 115298699Sdelphij else 116298699Sdelphij tickle_sem(worker_memlock); 117298699Sdelphij } 118298699Sdelphij} 119298699Sdelphij 120298699Sdelphij/* -------------------------------------------------------------------- 121298699Sdelphij * implementation isolation wrapper 122298699Sdelphij */ 123298699Sdelphijvoid 124275970Scyexit_worker( 125275970Scy int exitcode 126275970Scy ) 127275970Scy{ 128275970Scy thread_exit(exitcode); /* see #define thread_exit */ 129275970Scy} 130275970Scy 131293650Sglebius/* -------------------------------------------------------------------- 132293650Sglebius * sleep for a given time or until the wakup semaphore is tickled. 133293650Sglebius */ 134275970Scyint 135275970Scyworker_sleep( 136275970Scy blocking_child * c, 137275970Scy time_t seconds 138275970Scy ) 139275970Scy{ 140275970Scy struct timespec until; 141275970Scy int rc; 142275970Scy 143275970Scy# ifdef HAVE_CLOCK_GETTIME 144275970Scy if (0 != clock_gettime(CLOCK_REALTIME, &until)) { 145275970Scy msyslog(LOG_ERR, "worker_sleep: clock_gettime() failed: %m"); 146275970Scy return -1; 147275970Scy } 148275970Scy# else 149275970Scy if (0 != getclock(TIMEOFDAY, &until)) { 150275970Scy msyslog(LOG_ERR, "worker_sleep: getclock() failed: %m"); 151275970Scy return -1; 152275970Scy } 153275970Scy# endif 154275970Scy until.tv_sec += seconds; 155293650Sglebius rc = wait_for_sem(c->wake_scheduled_sleep, &until); 156275970Scy if (0 == rc) 157275970Scy return -1; 158275970Scy if (-1 == rc && ETIMEDOUT == errno) 159275970Scy return 0; 160275970Scy msyslog(LOG_ERR, "worker_sleep: sem_timedwait: %m"); 161275970Scy return -1; 162275970Scy} 163275970Scy 164275970Scy 165293650Sglebius/* -------------------------------------------------------------------- 166293650Sglebius * Wake up a worker that takes a nap. 167293650Sglebius */ 168275970Scyvoid 169275970Scyinterrupt_worker_sleep(void) 170275970Scy{ 171275970Scy u_int idx; 172275970Scy blocking_child * c; 173275970Scy 174275970Scy for (idx = 0; idx < blocking_children_alloc; idx++) { 175275970Scy c = blocking_children[idx]; 176275970Scy if (NULL == c || NULL == c->wake_scheduled_sleep) 177275970Scy continue; 178275970Scy tickle_sem(c->wake_scheduled_sleep); 179275970Scy } 180275970Scy} 181275970Scy 182293650Sglebius/* -------------------------------------------------------------------- 183293650Sglebius * Make sure there is an empty slot at the head of the request 184293650Sglebius * queue. Tell if the queue is currently empty. 185293650Sglebius */ 186293650Sglebiusstatic int 187275970Scyensure_workitems_empty_slot( 188275970Scy blocking_child *c 189275970Scy ) 190275970Scy{ 191293650Sglebius /* 192293650Sglebius ** !!! PRECONDITION: caller holds access lock! 193293650Sglebius ** 194293650Sglebius ** This simply tries to increase the size of the buffer if it 195293650Sglebius ** becomes full. The resize operation does *not* maintain the 196293650Sglebius ** order of requests, but that should be irrelevant since the 197293650Sglebius ** processing is considered asynchronous anyway. 198293650Sglebius ** 199293650Sglebius ** Return if the buffer is currently empty. 200293650Sglebius */ 201293650Sglebius 202293650Sglebius static const size_t each = 203293650Sglebius sizeof(blocking_children[0]->workitems[0]); 204275970Scy 205293650Sglebius size_t new_alloc; 206293650Sglebius size_t slots_used; 207294569Sdelphij size_t sidx; 208275970Scy 209293650Sglebius slots_used = c->head_workitem - c->tail_workitem; 210293650Sglebius if (slots_used >= c->workitems_alloc) { 211293650Sglebius new_alloc = c->workitems_alloc + WORKITEMS_ALLOC_INC; 212293650Sglebius c->workitems = erealloc(c->workitems, new_alloc * each); 213294569Sdelphij for (sidx = c->workitems_alloc; sidx < new_alloc; ++sidx) 214294569Sdelphij c->workitems[sidx] = NULL; 215293650Sglebius c->tail_workitem = 0; 216293650Sglebius c->head_workitem = c->workitems_alloc; 217293650Sglebius c->workitems_alloc = new_alloc; 218293650Sglebius } 219294569Sdelphij INSIST(NULL == c->workitems[c->head_workitem % c->workitems_alloc]); 220293650Sglebius return (0 == slots_used); 221275970Scy} 222275970Scy 223293650Sglebius/* -------------------------------------------------------------------- 224293650Sglebius * Make sure there is an empty slot at the head of the response 225293650Sglebius * queue. Tell if the queue is currently empty. 226293650Sglebius */ 227293650Sglebiusstatic int 228275970Scyensure_workresp_empty_slot( 229275970Scy blocking_child *c 230275970Scy ) 231275970Scy{ 232293650Sglebius /* 233293650Sglebius ** !!! PRECONDITION: caller holds access lock! 234293650Sglebius ** 235293650Sglebius ** Works like the companion function above. 236293650Sglebius */ 237293650Sglebius 238293650Sglebius static const size_t each = 239293650Sglebius sizeof(blocking_children[0]->responses[0]); 240275970Scy 241293650Sglebius size_t new_alloc; 242293650Sglebius size_t slots_used; 243294569Sdelphij size_t sidx; 244275970Scy 245293650Sglebius slots_used = c->head_response - c->tail_response; 246293650Sglebius if (slots_used >= c->responses_alloc) { 247293650Sglebius new_alloc = c->responses_alloc + RESPONSES_ALLOC_INC; 248293650Sglebius c->responses = erealloc(c->responses, new_alloc * each); 249294569Sdelphij for (sidx = c->responses_alloc; sidx < new_alloc; ++sidx) 250294569Sdelphij c->responses[sidx] = NULL; 251293650Sglebius c->tail_response = 0; 252293650Sglebius c->head_response = c->responses_alloc; 253293650Sglebius c->responses_alloc = new_alloc; 254293650Sglebius } 255294569Sdelphij INSIST(NULL == c->responses[c->head_response % c->responses_alloc]); 256293650Sglebius return (0 == slots_used); 257275970Scy} 258275970Scy 259275970Scy 260293650Sglebius/* -------------------------------------------------------------------- 261275970Scy * queue_req_pointer() - append a work item or idle exit request to 262293650Sglebius * blocking_workitems[]. Employ proper locking. 263275970Scy */ 264275970Scystatic int 265275970Scyqueue_req_pointer( 266275970Scy blocking_child * c, 267275970Scy blocking_pipe_header * hdr 268275970Scy ) 269275970Scy{ 270293650Sglebius size_t qhead; 271293650Sglebius 272293650Sglebius /* >>>> ACCESS LOCKING STARTS >>>> */ 273293650Sglebius wait_for_sem(c->accesslock, NULL); 274293650Sglebius ensure_workitems_empty_slot(c); 275293650Sglebius qhead = c->head_workitem; 276293650Sglebius c->workitems[qhead % c->workitems_alloc] = hdr; 277293650Sglebius c->head_workitem = 1 + qhead; 278293650Sglebius tickle_sem(c->accesslock); 279293650Sglebius /* <<<< ACCESS LOCKING ENDS <<<< */ 280275970Scy 281293650Sglebius /* queue consumer wake-up notification */ 282293650Sglebius tickle_sem(c->workitems_pending); 283275970Scy 284275970Scy return 0; 285275970Scy} 286275970Scy 287293650Sglebius/* -------------------------------------------------------------------- 288293650Sglebius * API function to make sure a worker is running, a proper private copy 289293650Sglebius * of the data is made, the data eneterd into the queue and the worker 290293650Sglebius * is signalled. 291293650Sglebius */ 292275970Scyint 293275970Scysend_blocking_req_internal( 294275970Scy blocking_child * c, 295275970Scy blocking_pipe_header * hdr, 296275970Scy void * data 297275970Scy ) 298275970Scy{ 299275970Scy blocking_pipe_header * threadcopy; 300275970Scy size_t payload_octets; 301275970Scy 302275970Scy REQUIRE(hdr != NULL); 303275970Scy REQUIRE(data != NULL); 304275970Scy DEBUG_REQUIRE(BLOCKING_REQ_MAGIC == hdr->magic_sig); 305275970Scy 306275970Scy if (hdr->octets <= sizeof(*hdr)) 307275970Scy return 1; /* failure */ 308275970Scy payload_octets = hdr->octets - sizeof(*hdr); 309275970Scy 310293650Sglebius if (NULL == c->thread_ref) 311275970Scy start_blocking_thread(c); 312275970Scy threadcopy = emalloc(hdr->octets); 313275970Scy memcpy(threadcopy, hdr, sizeof(*hdr)); 314275970Scy memcpy((char *)threadcopy + sizeof(*hdr), data, payload_octets); 315275970Scy 316275970Scy return queue_req_pointer(c, threadcopy); 317275970Scy} 318275970Scy 319293650Sglebius/* -------------------------------------------------------------------- 320293650Sglebius * Wait for the 'incoming queue no longer empty' signal, lock the shared 321293650Sglebius * structure and dequeue an item. 322293650Sglebius */ 323275970Scyblocking_pipe_header * 324275970Scyreceive_blocking_req_internal( 325275970Scy blocking_child * c 326275970Scy ) 327275970Scy{ 328275970Scy blocking_pipe_header * req; 329293650Sglebius size_t qhead, qtail; 330275970Scy 331293650Sglebius req = NULL; 332275970Scy do { 333293650Sglebius /* wait for tickle from the producer side */ 334293650Sglebius wait_for_sem(c->workitems_pending, NULL); 335275970Scy 336293650Sglebius /* >>>> ACCESS LOCKING STARTS >>>> */ 337293650Sglebius wait_for_sem(c->accesslock, NULL); 338293650Sglebius qhead = c->head_workitem; 339293650Sglebius do { 340293650Sglebius qtail = c->tail_workitem; 341293650Sglebius if (qhead == qtail) 342293650Sglebius break; 343293650Sglebius c->tail_workitem = qtail + 1; 344293650Sglebius qtail %= c->workitems_alloc; 345293650Sglebius req = c->workitems[qtail]; 346293650Sglebius c->workitems[qtail] = NULL; 347293650Sglebius } while (NULL == req); 348293650Sglebius tickle_sem(c->accesslock); 349293650Sglebius /* <<<< ACCESS LOCKING ENDS <<<< */ 350293650Sglebius 351293650Sglebius } while (NULL == req); 352293650Sglebius 353275970Scy INSIST(NULL != req); 354275970Scy if (CHILD_EXIT_REQ == req) { /* idled out */ 355275970Scy send_blocking_resp_internal(c, CHILD_GONE_RESP); 356275970Scy req = NULL; 357275970Scy } 358275970Scy 359275970Scy return req; 360275970Scy} 361275970Scy 362293650Sglebius/* -------------------------------------------------------------------- 363293650Sglebius * Push a response into the return queue and eventually tickle the 364293650Sglebius * receiver. 365293650Sglebius */ 366275970Scyint 367275970Scysend_blocking_resp_internal( 368275970Scy blocking_child * c, 369275970Scy blocking_pipe_header * resp 370275970Scy ) 371275970Scy{ 372293650Sglebius size_t qhead; 373293650Sglebius int empty; 374293650Sglebius 375293650Sglebius /* >>>> ACCESS LOCKING STARTS >>>> */ 376293650Sglebius wait_for_sem(c->accesslock, NULL); 377293650Sglebius empty = ensure_workresp_empty_slot(c); 378293650Sglebius qhead = c->head_response; 379293650Sglebius c->responses[qhead % c->responses_alloc] = resp; 380293650Sglebius c->head_response = 1 + qhead; 381293650Sglebius tickle_sem(c->accesslock); 382293650Sglebius /* <<<< ACCESS LOCKING ENDS <<<< */ 383275970Scy 384293650Sglebius /* queue consumer wake-up notification */ 385293650Sglebius if (empty) 386293650Sglebius { 387293650Sglebius# ifdef WORK_PIPE 388338531Sdelphij if (1 != write(c->resp_write_pipe, "", 1)) 389338531Sdelphij msyslog(LOG_WARNING, "async resolver: %s", 390338531Sdelphij "failed to notify main thread!"); 391293650Sglebius# else 392293650Sglebius tickle_sem(c->responses_pending); 393293650Sglebius# endif 394293650Sglebius } 395275970Scy return 0; 396275970Scy} 397275970Scy 398275970Scy 399275970Scy#ifndef WORK_PIPE 400293650Sglebius 401293650Sglebius/* -------------------------------------------------------------------- 402293650Sglebius * Check if a (Windows-)hanndle to a semaphore is actually the same we 403293650Sglebius * are using inside the sema wrapper. 404293650Sglebius */ 405293650Sglebiusstatic BOOL 406293650Sglebiussame_os_sema( 407293650Sglebius const sem_ref obj, 408293650Sglebius void* osh 409293650Sglebius ) 410293650Sglebius{ 411293650Sglebius return obj && osh && (obj->shnd == (HANDLE)osh); 412293650Sglebius} 413293650Sglebius 414293650Sglebius/* -------------------------------------------------------------------- 415293650Sglebius * Find the shared context that associates to an OS handle and make sure 416293650Sglebius * the data is dequeued and processed. 417293650Sglebius */ 418275970Scyvoid 419275970Scyhandle_blocking_resp_sem( 420275970Scy void * context 421275970Scy ) 422275970Scy{ 423275970Scy blocking_child * c; 424275970Scy u_int idx; 425275970Scy 426275970Scy c = NULL; 427275970Scy for (idx = 0; idx < blocking_children_alloc; idx++) { 428275970Scy c = blocking_children[idx]; 429293650Sglebius if (c != NULL && 430293650Sglebius c->thread_ref != NULL && 431293650Sglebius same_os_sema(c->responses_pending, context)) 432275970Scy break; 433275970Scy } 434275970Scy if (idx < blocking_children_alloc) 435275970Scy process_blocking_resp(c); 436275970Scy} 437275970Scy#endif /* !WORK_PIPE */ 438275970Scy 439293650Sglebius/* -------------------------------------------------------------------- 440293650Sglebius * Fetch the next response from the return queue. In case of signalling 441293650Sglebius * via pipe, make sure the pipe is flushed, too. 442293650Sglebius */ 443275970Scyblocking_pipe_header * 444275970Scyreceive_blocking_resp_internal( 445275970Scy blocking_child * c 446275970Scy ) 447275970Scy{ 448275970Scy blocking_pipe_header * removed; 449293650Sglebius size_t qhead, qtail, slot; 450293650Sglebius 451275970Scy#ifdef WORK_PIPE 452275970Scy int rc; 453275970Scy char scratch[32]; 454275970Scy 455293650Sglebius do 456275970Scy rc = read(c->resp_read_pipe, scratch, sizeof(scratch)); 457293650Sglebius while (-1 == rc && EINTR == errno); 458275970Scy#endif 459293650Sglebius 460293650Sglebius /* >>>> ACCESS LOCKING STARTS >>>> */ 461293650Sglebius wait_for_sem(c->accesslock, NULL); 462293650Sglebius qhead = c->head_response; 463293650Sglebius qtail = c->tail_response; 464293650Sglebius for (removed = NULL; !removed && (qhead != qtail); ++qtail) { 465293650Sglebius slot = qtail % c->responses_alloc; 466293650Sglebius removed = c->responses[slot]; 467293650Sglebius c->responses[slot] = NULL; 468293650Sglebius } 469293650Sglebius c->tail_response = qtail; 470293650Sglebius tickle_sem(c->accesslock); 471293650Sglebius /* <<<< ACCESS LOCKING ENDS <<<< */ 472293650Sglebius 473275970Scy if (NULL != removed) { 474275970Scy DEBUG_ENSURE(CHILD_GONE_RESP == removed || 475275970Scy BLOCKING_RESP_MAGIC == removed->magic_sig); 476275970Scy } 477275970Scy if (CHILD_GONE_RESP == removed) { 478275970Scy cleanup_after_child(c); 479275970Scy removed = NULL; 480275970Scy } 481275970Scy 482275970Scy return removed; 483275970Scy} 484275970Scy 485293650Sglebius/* -------------------------------------------------------------------- 486293650Sglebius * Light up a new worker. 487293650Sglebius */ 488275970Scystatic void 489275970Scystart_blocking_thread( 490275970Scy blocking_child * c 491275970Scy ) 492275970Scy{ 493275970Scy 494275970Scy DEBUG_INSIST(!c->reusable); 495275970Scy 496275970Scy prepare_child_sems(c); 497275970Scy start_blocking_thread_internal(c); 498275970Scy} 499275970Scy 500293650Sglebius/* -------------------------------------------------------------------- 501293650Sglebius * Create a worker thread. There are several differences between POSIX 502293650Sglebius * and Windows, of course -- most notably the Windows thread is no 503293650Sglebius * detached thread, and we keep the handle around until we want to get 504293650Sglebius * rid of the thread. The notification scheme also differs: Windows 505293650Sglebius * makes use of semaphores in both directions, POSIX uses a pipe for 506293650Sglebius * integration with 'select()' or alike. 507293650Sglebius */ 508275970Scystatic void 509275970Scystart_blocking_thread_internal( 510275970Scy blocking_child * c 511275970Scy ) 512275970Scy#ifdef SYS_WINNT 513275970Scy{ 514275970Scy BOOL resumed; 515275970Scy 516293650Sglebius c->thread_ref = NULL; 517293650Sglebius (*addremove_io_semaphore)(c->responses_pending->shnd, FALSE); 518293650Sglebius c->thr_table[0].thnd = 519275970Scy (HANDLE)_beginthreadex( 520275970Scy NULL, 521275970Scy 0, 522275970Scy &blocking_thread, 523275970Scy c, 524275970Scy CREATE_SUSPENDED, 525293650Sglebius NULL); 526275970Scy 527293650Sglebius if (NULL == c->thr_table[0].thnd) { 528275970Scy msyslog(LOG_ERR, "start blocking thread failed: %m"); 529275970Scy exit(-1); 530275970Scy } 531275970Scy /* remember the thread priority is only within the process class */ 532293650Sglebius if (!SetThreadPriority(c->thr_table[0].thnd, 533275970Scy THREAD_PRIORITY_BELOW_NORMAL)) 534275970Scy msyslog(LOG_ERR, "Error lowering blocking thread priority: %m"); 535275970Scy 536293650Sglebius resumed = ResumeThread(c->thr_table[0].thnd); 537275970Scy DEBUG_INSIST(resumed); 538293650Sglebius c->thread_ref = &c->thr_table[0]; 539275970Scy} 540275970Scy#else /* pthreads start_blocking_thread_internal() follows */ 541275970Scy{ 542275970Scy# ifdef NEED_PTHREAD_INIT 543275970Scy static int pthread_init_called; 544275970Scy# endif 545275970Scy pthread_attr_t thr_attr; 546275970Scy int rc; 547275970Scy int pipe_ends[2]; /* read then write */ 548275970Scy int is_pipe; 549275970Scy int flags; 550294569Sdelphij size_t ostacksize; 551294569Sdelphij size_t nstacksize; 552275970Scy sigset_t saved_sig_mask; 553275970Scy 554293650Sglebius c->thread_ref = NULL; 555293650Sglebius 556275970Scy# ifdef NEED_PTHREAD_INIT 557275970Scy /* 558275970Scy * from lib/isc/unix/app.c: 559275970Scy * BSDI 3.1 seg faults in pthread_sigmask() if we don't do this. 560275970Scy */ 561275970Scy if (!pthread_init_called) { 562275970Scy pthread_init(); 563275970Scy pthread_init_called = TRUE; 564275970Scy } 565275970Scy# endif 566275970Scy 567275970Scy rc = pipe_socketpair(&pipe_ends[0], &is_pipe); 568275970Scy if (0 != rc) { 569275970Scy msyslog(LOG_ERR, "start_blocking_thread: pipe_socketpair() %m"); 570275970Scy exit(1); 571275970Scy } 572275970Scy c->resp_read_pipe = move_fd(pipe_ends[0]); 573275970Scy c->resp_write_pipe = move_fd(pipe_ends[1]); 574275970Scy c->ispipe = is_pipe; 575275970Scy flags = fcntl(c->resp_read_pipe, F_GETFL, 0); 576275970Scy if (-1 == flags) { 577275970Scy msyslog(LOG_ERR, "start_blocking_thread: fcntl(F_GETFL) %m"); 578275970Scy exit(1); 579275970Scy } 580275970Scy rc = fcntl(c->resp_read_pipe, F_SETFL, O_NONBLOCK | flags); 581275970Scy if (-1 == rc) { 582275970Scy msyslog(LOG_ERR, 583275970Scy "start_blocking_thread: fcntl(F_SETFL, O_NONBLOCK) %m"); 584275970Scy exit(1); 585275970Scy } 586275970Scy (*addremove_io_fd)(c->resp_read_pipe, c->ispipe, FALSE); 587275970Scy pthread_attr_init(&thr_attr); 588275970Scy pthread_attr_setdetachstate(&thr_attr, PTHREAD_CREATE_DETACHED); 589275970Scy#if defined(HAVE_PTHREAD_ATTR_GETSTACKSIZE) && \ 590275970Scy defined(HAVE_PTHREAD_ATTR_SETSTACKSIZE) 591294569Sdelphij rc = pthread_attr_getstacksize(&thr_attr, &ostacksize); 592294569Sdelphij if (0 != rc) { 593275970Scy msyslog(LOG_ERR, 594294569Sdelphij "start_blocking_thread: pthread_attr_getstacksize() -> %s", 595294569Sdelphij strerror(rc)); 596294569Sdelphij } else { 597294569Sdelphij if (ostacksize < THREAD_MINSTACKSIZE) 598294569Sdelphij nstacksize = THREAD_MINSTACKSIZE; 599294569Sdelphij else if (ostacksize > THREAD_MAXSTACKSIZE) 600294569Sdelphij nstacksize = THREAD_MAXSTACKSIZE; 601294569Sdelphij else 602294569Sdelphij nstacksize = ostacksize; 603294569Sdelphij if (nstacksize != ostacksize) 604294569Sdelphij rc = pthread_attr_setstacksize(&thr_attr, nstacksize); 605294569Sdelphij if (0 != rc) 606275970Scy msyslog(LOG_ERR, 607294569Sdelphij "start_blocking_thread: pthread_attr_setstacksize(0x%lx -> 0x%lx) -> %s", 608294569Sdelphij (u_long)ostacksize, (u_long)nstacksize, 609294569Sdelphij strerror(rc)); 610275970Scy } 611275970Scy#else 612294569Sdelphij UNUSED_ARG(nstacksize); 613294569Sdelphij UNUSED_ARG(ostacksize); 614275970Scy#endif 615275970Scy#if defined(PTHREAD_SCOPE_SYSTEM) && defined(NEED_PTHREAD_SCOPE_SYSTEM) 616275970Scy pthread_attr_setscope(&thr_attr, PTHREAD_SCOPE_SYSTEM); 617275970Scy#endif 618275970Scy c->thread_ref = emalloc_zero(sizeof(*c->thread_ref)); 619275970Scy block_thread_signals(&saved_sig_mask); 620293650Sglebius rc = pthread_create(&c->thr_table[0], &thr_attr, 621275970Scy &blocking_thread, c); 622275970Scy pthread_sigmask(SIG_SETMASK, &saved_sig_mask, NULL); 623275970Scy pthread_attr_destroy(&thr_attr); 624275970Scy if (0 != rc) { 625294569Sdelphij msyslog(LOG_ERR, "start_blocking_thread: pthread_create() -> %s", 626294569Sdelphij strerror(rc)); 627275970Scy exit(1); 628275970Scy } 629293650Sglebius c->thread_ref = &c->thr_table[0]; 630275970Scy} 631275970Scy#endif 632275970Scy 633293650Sglebius/* -------------------------------------------------------------------- 634275970Scy * block_thread_signals() 635275970Scy * 636275970Scy * Temporarily block signals used by ntpd main thread, so that signal 637275970Scy * mask inherited by child threads leaves them blocked. Returns prior 638275970Scy * active signal mask via pmask, to be restored by the main thread 639275970Scy * after pthread_create(). 640275970Scy */ 641275970Scy#ifndef SYS_WINNT 642275970Scyvoid 643275970Scyblock_thread_signals( 644275970Scy sigset_t * pmask 645275970Scy ) 646275970Scy{ 647275970Scy sigset_t block; 648275970Scy 649275970Scy sigemptyset(&block); 650275970Scy# ifdef HAVE_SIGNALED_IO 651275970Scy# ifdef SIGIO 652275970Scy sigaddset(&block, SIGIO); 653275970Scy# endif 654275970Scy# ifdef SIGPOLL 655275970Scy sigaddset(&block, SIGPOLL); 656275970Scy# endif 657275970Scy# endif /* HAVE_SIGNALED_IO */ 658275970Scy sigaddset(&block, SIGALRM); 659275970Scy sigaddset(&block, MOREDEBUGSIG); 660275970Scy sigaddset(&block, LESSDEBUGSIG); 661275970Scy# ifdef SIGDIE1 662275970Scy sigaddset(&block, SIGDIE1); 663275970Scy# endif 664275970Scy# ifdef SIGDIE2 665275970Scy sigaddset(&block, SIGDIE2); 666275970Scy# endif 667275970Scy# ifdef SIGDIE3 668275970Scy sigaddset(&block, SIGDIE3); 669275970Scy# endif 670275970Scy# ifdef SIGDIE4 671275970Scy sigaddset(&block, SIGDIE4); 672275970Scy# endif 673275970Scy# ifdef SIGBUS 674275970Scy sigaddset(&block, SIGBUS); 675275970Scy# endif 676275970Scy sigemptyset(pmask); 677275970Scy pthread_sigmask(SIG_BLOCK, &block, pmask); 678275970Scy} 679275970Scy#endif /* !SYS_WINNT */ 680275970Scy 681275970Scy 682293650Sglebius/* -------------------------------------------------------------------- 683293650Sglebius * Create & destroy semaphores. This is sufficiently different between 684293650Sglebius * POSIX and Windows to warrant wrapper functions and close enough to 685293650Sglebius * use the concept of synchronization via semaphore for all platforms. 686293650Sglebius */ 687293650Sglebiusstatic sem_ref 688293650Sglebiuscreate_sema( 689293650Sglebius sema_type* semptr, 690293650Sglebius u_int inival, 691293650Sglebius u_int maxval) 692293650Sglebius{ 693293650Sglebius#ifdef SYS_WINNT 694293650Sglebius 695293650Sglebius long svini, svmax; 696293650Sglebius if (NULL != semptr) { 697293650Sglebius svini = (inival < LONG_MAX) 698293650Sglebius ? (long)inival : LONG_MAX; 699293650Sglebius svmax = (maxval < LONG_MAX && maxval > 0) 700293650Sglebius ? (long)maxval : LONG_MAX; 701293650Sglebius semptr->shnd = CreateSemaphore(NULL, svini, svmax, NULL); 702293650Sglebius if (NULL == semptr->shnd) 703293650Sglebius semptr = NULL; 704293650Sglebius } 705293650Sglebius 706293650Sglebius#else 707293650Sglebius 708293650Sglebius (void)maxval; 709293650Sglebius if (semptr && sem_init(semptr, FALSE, inival)) 710293650Sglebius semptr = NULL; 711293650Sglebius 712293650Sglebius#endif 713293650Sglebius 714293650Sglebius return semptr; 715293650Sglebius} 716293650Sglebius 717293650Sglebius/* ------------------------------------------------------------------ */ 718293650Sglebiusstatic sem_ref 719293650Sglebiusdelete_sema( 720293650Sglebius sem_ref obj) 721293650Sglebius{ 722293650Sglebius 723293650Sglebius# ifdef SYS_WINNT 724293650Sglebius 725293650Sglebius if (obj) { 726293650Sglebius if (obj->shnd) 727293650Sglebius CloseHandle(obj->shnd); 728293650Sglebius obj->shnd = NULL; 729293650Sglebius } 730293650Sglebius 731293650Sglebius# else 732293650Sglebius 733293650Sglebius if (obj) 734293650Sglebius sem_destroy(obj); 735293650Sglebius 736293650Sglebius# endif 737293650Sglebius 738293650Sglebius return NULL; 739293650Sglebius} 740293650Sglebius 741293650Sglebius/* -------------------------------------------------------------------- 742275970Scy * prepare_child_sems() 743275970Scy * 744293650Sglebius * create sync & access semaphores 745275970Scy * 746293650Sglebius * All semaphores are cleared, only the access semaphore has 1 unit. 747293650Sglebius * Childs wait on 'workitems_pending', then grabs 'sema_access' 748293650Sglebius * and dequeues jobs. When done, 'sema_access' is given one unit back. 749293650Sglebius * 750293650Sglebius * The producer grabs 'sema_access', manages the queue, restores 751293650Sglebius * 'sema_access' and puts one unit into 'workitems_pending'. 752293650Sglebius * 753293650Sglebius * The story goes the same for the response queue. 754275970Scy */ 755275970Scystatic void 756275970Scyprepare_child_sems( 757275970Scy blocking_child *c 758275970Scy ) 759275970Scy{ 760298699Sdelphij if (NULL == worker_memlock) 761298699Sdelphij worker_memlock = create_sema(&worker_mmutex, 1, 1); 762298699Sdelphij 763293650Sglebius c->accesslock = create_sema(&c->sem_table[0], 1, 1); 764293650Sglebius c->workitems_pending = create_sema(&c->sem_table[1], 0, 0); 765293650Sglebius c->wake_scheduled_sleep = create_sema(&c->sem_table[2], 0, 1); 766293650Sglebius# ifndef WORK_PIPE 767293650Sglebius c->responses_pending = create_sema(&c->sem_table[3], 0, 0); 768293650Sglebius# endif 769275970Scy} 770275970Scy 771293650Sglebius/* -------------------------------------------------------------------- 772293650Sglebius * wait for semaphore. Where the wait can be interrupted, it will 773293650Sglebius * internally resume -- When this function returns, there is either no 774293650Sglebius * semaphore at all, a timeout occurred, or the caller could 775293650Sglebius * successfully take a token from the semaphore. 776293650Sglebius * 777293650Sglebius * For untimed wait, not checking the result of this function at all is 778293650Sglebius * definitely an option. 779293650Sglebius */ 780275970Scystatic int 781275970Scywait_for_sem( 782275970Scy sem_ref sem, 783275970Scy struct timespec * timeout /* wall-clock */ 784275970Scy ) 785275970Scy#ifdef SYS_WINNT 786275970Scy{ 787275970Scy struct timespec now; 788275970Scy struct timespec delta; 789275970Scy DWORD msec; 790275970Scy DWORD rc; 791275970Scy 792293650Sglebius if (!(sem && sem->shnd)) { 793293650Sglebius errno = EINVAL; 794293650Sglebius return -1; 795293650Sglebius } 796293650Sglebius 797275970Scy if (NULL == timeout) { 798275970Scy msec = INFINITE; 799275970Scy } else { 800275970Scy getclock(TIMEOFDAY, &now); 801275970Scy delta = sub_tspec(*timeout, now); 802275970Scy if (delta.tv_sec < 0) { 803275970Scy msec = 0; 804275970Scy } else if ((delta.tv_sec + 1) >= (MAXDWORD / 1000)) { 805275970Scy msec = INFINITE; 806275970Scy } else { 807275970Scy msec = 1000 * (DWORD)delta.tv_sec; 808275970Scy msec += delta.tv_nsec / (1000 * 1000); 809275970Scy } 810275970Scy } 811293650Sglebius rc = WaitForSingleObject(sem->shnd, msec); 812275970Scy if (WAIT_OBJECT_0 == rc) 813275970Scy return 0; 814275970Scy if (WAIT_TIMEOUT == rc) { 815275970Scy errno = ETIMEDOUT; 816275970Scy return -1; 817275970Scy } 818275970Scy msyslog(LOG_ERR, "WaitForSingleObject unexpected 0x%x", rc); 819275970Scy errno = EFAULT; 820275970Scy return -1; 821275970Scy} 822275970Scy#else /* pthreads wait_for_sem() follows */ 823275970Scy{ 824293650Sglebius int rc = -1; 825275970Scy 826293650Sglebius if (sem) do { 827293650Sglebius if (NULL == timeout) 828293650Sglebius rc = sem_wait(sem); 829293650Sglebius else 830293650Sglebius rc = sem_timedwait(sem, timeout); 831293650Sglebius } while (rc == -1 && errno == EINTR); 832275970Scy else 833293650Sglebius errno = EINVAL; 834293650Sglebius 835275970Scy return rc; 836275970Scy} 837275970Scy#endif 838275970Scy 839293650Sglebius/* -------------------------------------------------------------------- 840293650Sglebius * blocking_thread - thread functions have WINAPI (aka 'stdcall') 841293650Sglebius * calling conventions under Windows and POSIX-defined signature 842293650Sglebius * otherwise. 843275970Scy */ 844275970Scy#ifdef SYS_WINNT 845293650Sglebiusu_int WINAPI 846275970Scy#else 847275970Scyvoid * 848275970Scy#endif 849275970Scyblocking_thread( 850275970Scy void * ThreadArg 851275970Scy ) 852275970Scy{ 853275970Scy blocking_child *c; 854275970Scy 855275970Scy c = ThreadArg; 856275970Scy exit_worker(blocking_child_common(c)); 857275970Scy 858275970Scy /* NOTREACHED */ 859275970Scy return 0; 860275970Scy} 861275970Scy 862293650Sglebius/* -------------------------------------------------------------------- 863275970Scy * req_child_exit() runs in the parent. 864293650Sglebius * 865293650Sglebius * This function is called from from the idle timer, too, and possibly 866293650Sglebius * without a thread being there any longer. Since we have folded up our 867293650Sglebius * tent in that case and all the semaphores are already gone, we simply 868293650Sglebius * ignore this request in this case. 869293650Sglebius * 870293650Sglebius * Since the existence of the semaphores is controlled exclusively by 871293650Sglebius * the parent, there's no risk of data race here. 872275970Scy */ 873275970Scyint 874275970Scyreq_child_exit( 875275970Scy blocking_child *c 876275970Scy ) 877275970Scy{ 878293650Sglebius return (c->accesslock) 879293650Sglebius ? queue_req_pointer(c, CHILD_EXIT_REQ) 880293650Sglebius : 0; 881275970Scy} 882275970Scy 883293650Sglebius/* -------------------------------------------------------------------- 884275970Scy * cleanup_after_child() runs in parent. 885275970Scy */ 886275970Scystatic void 887275970Scycleanup_after_child( 888275970Scy blocking_child * c 889275970Scy ) 890275970Scy{ 891275970Scy DEBUG_INSIST(!c->reusable); 892293650Sglebius 893293650Sglebius# ifdef SYS_WINNT 894293650Sglebius /* The thread was not created in detached state, so we better 895293650Sglebius * clean up. 896293650Sglebius */ 897293650Sglebius if (c->thread_ref && c->thread_ref->thnd) { 898293650Sglebius WaitForSingleObject(c->thread_ref->thnd, INFINITE); 899293650Sglebius INSIST(CloseHandle(c->thread_ref->thnd)); 900293650Sglebius c->thread_ref->thnd = NULL; 901293650Sglebius } 902293650Sglebius# endif 903275970Scy c->thread_ref = NULL; 904293650Sglebius 905293650Sglebius /* remove semaphores and (if signalling vi IO) pipes */ 906293650Sglebius 907293650Sglebius c->accesslock = delete_sema(c->accesslock); 908293650Sglebius c->workitems_pending = delete_sema(c->workitems_pending); 909293650Sglebius c->wake_scheduled_sleep = delete_sema(c->wake_scheduled_sleep); 910293650Sglebius 911293650Sglebius# ifdef WORK_PIPE 912275970Scy DEBUG_INSIST(-1 != c->resp_read_pipe); 913275970Scy DEBUG_INSIST(-1 != c->resp_write_pipe); 914275970Scy (*addremove_io_fd)(c->resp_read_pipe, c->ispipe, TRUE); 915275970Scy close(c->resp_write_pipe); 916275970Scy close(c->resp_read_pipe); 917275970Scy c->resp_write_pipe = -1; 918275970Scy c->resp_read_pipe = -1; 919293650Sglebius# else 920293650Sglebius DEBUG_INSIST(NULL != c->responses_pending); 921293650Sglebius (*addremove_io_semaphore)(c->responses_pending->shnd, TRUE); 922293650Sglebius c->responses_pending = delete_sema(c->responses_pending); 923293650Sglebius# endif 924293650Sglebius 925293650Sglebius /* Is it necessary to check if there are pending requests and 926293650Sglebius * responses? If so, and if there are, what to do with them? 927293650Sglebius */ 928293650Sglebius 929293650Sglebius /* re-init buffer index sequencers */ 930293650Sglebius c->head_workitem = 0; 931293650Sglebius c->tail_workitem = 0; 932293650Sglebius c->head_response = 0; 933293650Sglebius c->tail_response = 0; 934293650Sglebius 935275970Scy c->reusable = TRUE; 936275970Scy} 937275970Scy 938275970Scy 939275970Scy#else /* !WORK_THREAD follows */ 940275970Scychar work_thread_nonempty_compilation_unit; 941275970Scy#endif 942