/*
 * work_thread.c - threads implementation for blocking worker child.
 */
#include <config.h>
#include "ntp_workimpl.h"

#ifdef WORK_THREAD

#include <stdio.h>
#include <ctype.h>
#include <signal.h>
#ifndef SYS_WINNT
#include <pthread.h>
#endif

#include "ntp_stdlib.h"
#include "ntp_malloc.h"
#include "ntp_syslog.h"
#include "ntpd.h"
#include "ntp_io.h"
#include "ntp_assert.h"
#include "ntp_unixtime.h"
#include "timespecops.h"
#include "ntp_worker.h"

/* In-band sentinel pointers: an exit request queued to the worker, and
 * the matching "child is gone" response. Never dereferenced. */
#define CHILD_EXIT_REQ	((blocking_pipe_header *)(intptr_t)-1)
#define CHILD_GONE_RESP	CHILD_EXIT_REQ
/* Queue size increments:
 * The request queue grows a bit faster than the response queue -- the
 * daemon can push requests and pull results faster on average than the
 * worker can process requests and push results... If this really pays
 * off is debatable.
 */
#define WORKITEMS_ALLOC_INC	16
#define RESPONSES_ALLOC_INC	4

/* Fiddle with min/max stack sizes. 64kB minimum seems to work, so we
 * set the maximum to 256kB. If the minimum goes below the
 * system-defined minimum stack size, we have to adjust accordingly.
 */
#ifndef THREAD_MINSTACKSIZE
# define THREAD_MINSTACKSIZE	(64U * 1024)
#endif
#ifndef __sun
#if defined(PTHREAD_STACK_MIN) && THREAD_MINSTACKSIZE < PTHREAD_STACK_MIN
# undef THREAD_MINSTACKSIZE
# define THREAD_MINSTACKSIZE	PTHREAD_STACK_MIN
#endif
#endif

#ifndef THREAD_MAXSTACKSIZE
# define THREAD_MAXSTACKSIZE	(256U * 1024)
#endif
#if THREAD_MAXSTACKSIZE < THREAD_MINSTACKSIZE
# undef  THREAD_MAXSTACKSIZE
# define THREAD_MAXSTACKSIZE	THREAD_MINSTACKSIZE
#endif


#ifdef SYS_WINNT

# define thread_exit(c)	_endthreadex(c)
# define tickle_sem(sh)	ReleaseSemaphore((sh->shnd), 1, NULL)
u_int	WINAPI	blocking_thread(void *);
static BOOL	same_os_sema(const sem_ref obj, void * osobj);

#else

# define thread_exit(c)	pthread_exit((void*)(size_t)(c))
# define tickle_sem	sem_post
void *		blocking_thread(void *);
static	void	block_thread_signals(sigset_t *);

#endif

/* I/O registration hook supplied by the daemon side: either an fd
 * (pipe signalling) or a semaphore handle (Windows). */
#ifdef WORK_PIPE
addremove_io_fd_func		addremove_io_fd;
#else
addremove_io_semaphore_func	addremove_io_semaphore;
#endif

static	void	start_blocking_thread(blocking_child *);
static	void	start_blocking_thread_internal(blocking_child *);
static	void	prepare_child_sems(blocking_child *);
static	int	wait_for_sem(sem_ref, struct timespec *);
static	int	ensure_workitems_empty_slot(blocking_child *);
static	int	ensure_workresp_empty_slot(blocking_child *);
88275970Scystatic int queue_req_pointer(blocking_child *, blocking_pipe_header *); 89275970Scystatic void cleanup_after_child(blocking_child *); 90275970Scy 91298770Sdelphijstatic sema_type worker_mmutex; 92298770Sdelphijstatic sem_ref worker_memlock; 93275970Scy 94298770Sdelphij/* -------------------------------------------------------------------- 95298770Sdelphij * locking the global worker state table (and other global stuff) 96298770Sdelphij */ 97275970Scyvoid 98298770Sdelphijworker_global_lock( 99298770Sdelphij int inOrOut) 100298770Sdelphij{ 101298770Sdelphij if (worker_memlock) { 102298770Sdelphij if (inOrOut) 103298770Sdelphij wait_for_sem(worker_memlock, NULL); 104298770Sdelphij else 105298770Sdelphij tickle_sem(worker_memlock); 106298770Sdelphij } 107298770Sdelphij} 108298770Sdelphij 109298770Sdelphij/* -------------------------------------------------------------------- 110298770Sdelphij * implementation isolation wrapper 111298770Sdelphij */ 112298770Sdelphijvoid 113275970Scyexit_worker( 114275970Scy int exitcode 115275970Scy ) 116275970Scy{ 117275970Scy thread_exit(exitcode); /* see #define thread_exit */ 118275970Scy} 119275970Scy 120293894Sglebius/* -------------------------------------------------------------------- 121293894Sglebius * sleep for a given time or until the wakup semaphore is tickled. 
122293894Sglebius */ 123275970Scyint 124275970Scyworker_sleep( 125275970Scy blocking_child * c, 126275970Scy time_t seconds 127275970Scy ) 128275970Scy{ 129275970Scy struct timespec until; 130275970Scy int rc; 131275970Scy 132275970Scy# ifdef HAVE_CLOCK_GETTIME 133275970Scy if (0 != clock_gettime(CLOCK_REALTIME, &until)) { 134275970Scy msyslog(LOG_ERR, "worker_sleep: clock_gettime() failed: %m"); 135275970Scy return -1; 136275970Scy } 137275970Scy# else 138275970Scy if (0 != getclock(TIMEOFDAY, &until)) { 139275970Scy msyslog(LOG_ERR, "worker_sleep: getclock() failed: %m"); 140275970Scy return -1; 141275970Scy } 142275970Scy# endif 143275970Scy until.tv_sec += seconds; 144293894Sglebius rc = wait_for_sem(c->wake_scheduled_sleep, &until); 145275970Scy if (0 == rc) 146275970Scy return -1; 147275970Scy if (-1 == rc && ETIMEDOUT == errno) 148275970Scy return 0; 149275970Scy msyslog(LOG_ERR, "worker_sleep: sem_timedwait: %m"); 150275970Scy return -1; 151275970Scy} 152275970Scy 153275970Scy 154293894Sglebius/* -------------------------------------------------------------------- 155293894Sglebius * Wake up a worker that takes a nap. 156293894Sglebius */ 157275970Scyvoid 158275970Scyinterrupt_worker_sleep(void) 159275970Scy{ 160275970Scy u_int idx; 161275970Scy blocking_child * c; 162275970Scy 163275970Scy for (idx = 0; idx < blocking_children_alloc; idx++) { 164275970Scy c = blocking_children[idx]; 165275970Scy if (NULL == c || NULL == c->wake_scheduled_sleep) 166275970Scy continue; 167275970Scy tickle_sem(c->wake_scheduled_sleep); 168275970Scy } 169275970Scy} 170275970Scy 171293894Sglebius/* -------------------------------------------------------------------- 172293894Sglebius * Make sure there is an empty slot at the head of the request 173293894Sglebius * queue. Tell if the queue is currently empty. 
 */
static int
ensure_workitems_empty_slot(
	blocking_child *c
	)
{
	/*
	** !!! PRECONDITION: caller holds access lock!
	**
	** This simply tries to increase the size of the buffer if it
	** becomes full. The resize operation does *not* maintain the
	** order of requests, but that should be irrelevant since the
	** processing is considered asynchronous anyway.
	**
	** Return if the buffer is currently empty.
	*/

	static const size_t each =
	    sizeof(blocking_children[0]->workitems[0]);

	size_t	new_alloc;
	size_t	slots_used;
	size_t	sidx;

	/* head/tail are free-running counters; their difference is the
	 * number of occupied ring-buffer slots. */
	slots_used = c->head_workitem - c->tail_workitem;
	if (slots_used >= c->workitems_alloc) {
		new_alloc  = c->workitems_alloc + WORKITEMS_ALLOC_INC;
		c->workitems = erealloc(c->workitems, new_alloc * each);
		/* NULL-fill the newly appended slots */
		for (sidx = c->workitems_alloc; sidx < new_alloc; ++sidx)
			c->workitems[sidx] = NULL;
		/* Rebase the ring to [0, old_alloc): since growth only
		 * happens when the buffer is completely full, every old
		 * slot holds an entry, so treating them as a contiguous
		 * run loses only the FIFO order -- which the header
		 * comment declares irrelevant. */
		c->tail_workitem   = 0;
		c->head_workitem   = c->workitems_alloc;
		c->workitems_alloc = new_alloc;
	}
	/* the slot about to be written must be free */
	INSIST(NULL == c->workitems[c->head_workitem % c->workitems_alloc]);
	return (0 == slots_used);
}

/* --------------------------------------------------------------------
 * Make sure there is an empty slot at the head of the response
 * queue. Tell if the queue is currently empty.
 */
static int
ensure_workresp_empty_slot(
	blocking_child *c
	)
{
	/*
	** !!! PRECONDITION: caller holds access lock!
	**
	** Works like the companion function above.
	*/

	static const size_t each =
	    sizeof(blocking_children[0]->responses[0]);

	size_t	new_alloc;
	size_t	slots_used;
	size_t	sidx;

	slots_used = c->head_response - c->tail_response;
	if (slots_used >= c->responses_alloc) {
		new_alloc  = c->responses_alloc + RESPONSES_ALLOC_INC;
		c->responses = erealloc(c->responses, new_alloc * each);
		for (sidx = c->responses_alloc; sidx < new_alloc; ++sidx)
			c->responses[sidx] = NULL;
		/* rebase as above; buffer was full, order is scrambled
		 * but no entry is lost */
		c->tail_response   = 0;
		c->head_response   = c->responses_alloc;
		c->responses_alloc = new_alloc;
	}
	INSIST(NULL == c->responses[c->head_response % c->responses_alloc]);
	return (0 == slots_used);
}


/* --------------------------------------------------------------------
 * queue_req_pointer() - append a work item or idle exit request to
 * blocking_workitems[]. Employ proper locking.
252275970Scy */ 253275970Scystatic int 254275970Scyqueue_req_pointer( 255275970Scy blocking_child * c, 256275970Scy blocking_pipe_header * hdr 257275970Scy ) 258275970Scy{ 259293894Sglebius size_t qhead; 260293894Sglebius 261293894Sglebius /* >>>> ACCESS LOCKING STARTS >>>> */ 262293894Sglebius wait_for_sem(c->accesslock, NULL); 263293894Sglebius ensure_workitems_empty_slot(c); 264293894Sglebius qhead = c->head_workitem; 265293894Sglebius c->workitems[qhead % c->workitems_alloc] = hdr; 266293894Sglebius c->head_workitem = 1 + qhead; 267293894Sglebius tickle_sem(c->accesslock); 268293894Sglebius /* <<<< ACCESS LOCKING ENDS <<<< */ 269275970Scy 270293894Sglebius /* queue consumer wake-up notification */ 271293894Sglebius tickle_sem(c->workitems_pending); 272275970Scy 273275970Scy return 0; 274275970Scy} 275275970Scy 276293894Sglebius/* -------------------------------------------------------------------- 277293894Sglebius * API function to make sure a worker is running, a proper private copy 278293894Sglebius * of the data is made, the data eneterd into the queue and the worker 279293894Sglebius * is signalled. 
280293894Sglebius */ 281275970Scyint 282275970Scysend_blocking_req_internal( 283275970Scy blocking_child * c, 284275970Scy blocking_pipe_header * hdr, 285275970Scy void * data 286275970Scy ) 287275970Scy{ 288275970Scy blocking_pipe_header * threadcopy; 289275970Scy size_t payload_octets; 290275970Scy 291275970Scy REQUIRE(hdr != NULL); 292275970Scy REQUIRE(data != NULL); 293275970Scy DEBUG_REQUIRE(BLOCKING_REQ_MAGIC == hdr->magic_sig); 294275970Scy 295275970Scy if (hdr->octets <= sizeof(*hdr)) 296275970Scy return 1; /* failure */ 297275970Scy payload_octets = hdr->octets - sizeof(*hdr); 298275970Scy 299293894Sglebius if (NULL == c->thread_ref) 300275970Scy start_blocking_thread(c); 301275970Scy threadcopy = emalloc(hdr->octets); 302275970Scy memcpy(threadcopy, hdr, sizeof(*hdr)); 303275970Scy memcpy((char *)threadcopy + sizeof(*hdr), data, payload_octets); 304275970Scy 305275970Scy return queue_req_pointer(c, threadcopy); 306275970Scy} 307275970Scy 308293894Sglebius/* -------------------------------------------------------------------- 309293894Sglebius * Wait for the 'incoming queue no longer empty' signal, lock the shared 310293894Sglebius * structure and dequeue an item. 
 */
blocking_pipe_header *
receive_blocking_req_internal(
	blocking_child *	c
	)
{
	blocking_pipe_header *	req;
	size_t			qhead, qtail;

	req = NULL;
	do {
		/* wait for tickle from the producer side */
		wait_for_sem(c->workitems_pending, NULL);

		/* >>>> ACCESS LOCKING STARTS >>>> */
		wait_for_sem(c->accesslock, NULL);
		qhead = c->head_workitem;
		do {
			qtail = c->tail_workitem;
			if (qhead == qtail)
				break;	/* queue drained -- go wait again */
			c->tail_workitem = qtail + 1;
			qtail %= c->workitems_alloc;
			req = c->workitems[qtail];
			c->workitems[qtail] = NULL;	/* mark slot free */
		} while (NULL == req);	/* skip holes left by a resize */
		tickle_sem(c->accesslock);
		/* <<<< ACCESS LOCKING ENDS <<<< */

	} while (NULL == req);

	INSIST(NULL != req);
	if (CHILD_EXIT_REQ == req) {	/* idled out */
		/* acknowledge the exit request so the parent can tear
		 * down its side, and report "no work" to our caller */
		send_blocking_resp_internal(c, CHILD_GONE_RESP);
		req = NULL;
	}

	return req;
}

/* --------------------------------------------------------------------
 * Push a response into the return queue and eventually tickle the
 * receiver.
354293894Sglebius */ 355275970Scyint 356275970Scysend_blocking_resp_internal( 357275970Scy blocking_child * c, 358275970Scy blocking_pipe_header * resp 359275970Scy ) 360275970Scy{ 361293894Sglebius size_t qhead; 362293894Sglebius int empty; 363293894Sglebius 364293894Sglebius /* >>>> ACCESS LOCKING STARTS >>>> */ 365293894Sglebius wait_for_sem(c->accesslock, NULL); 366293894Sglebius empty = ensure_workresp_empty_slot(c); 367293894Sglebius qhead = c->head_response; 368293894Sglebius c->responses[qhead % c->responses_alloc] = resp; 369293894Sglebius c->head_response = 1 + qhead; 370293894Sglebius tickle_sem(c->accesslock); 371293894Sglebius /* <<<< ACCESS LOCKING ENDS <<<< */ 372275970Scy 373293894Sglebius /* queue consumer wake-up notification */ 374293894Sglebius if (empty) 375293894Sglebius { 376293894Sglebius# ifdef WORK_PIPE 377293894Sglebius write(c->resp_write_pipe, "", 1); 378293894Sglebius# else 379293894Sglebius tickle_sem(c->responses_pending); 380293894Sglebius# endif 381293894Sglebius } 382275970Scy return 0; 383275970Scy} 384275970Scy 385275970Scy 386275970Scy#ifndef WORK_PIPE 387293894Sglebius 388293894Sglebius/* -------------------------------------------------------------------- 389293894Sglebius * Check if a (Windows-)hanndle to a semaphore is actually the same we 390293894Sglebius * are using inside the sema wrapper. 391293894Sglebius */ 392293894Sglebiusstatic BOOL 393293894Sglebiussame_os_sema( 394293894Sglebius const sem_ref obj, 395293894Sglebius void* osh 396293894Sglebius ) 397293894Sglebius{ 398293894Sglebius return obj && osh && (obj->shnd == (HANDLE)osh); 399293894Sglebius} 400293894Sglebius 401293894Sglebius/* -------------------------------------------------------------------- 402293894Sglebius * Find the shared context that associates to an OS handle and make sure 403293894Sglebius * the data is dequeued and processed. 
 */
void
handle_blocking_resp_sem(
	void *	context
	)
{
	blocking_child *	c;
	u_int			idx;

	c = NULL;
	/* linear scan for the child whose response semaphore matches
	 * the OS handle that fired */
	for (idx = 0; idx < blocking_children_alloc; idx++) {
		c = blocking_children[idx];
		if (c != NULL &&
		    c->thread_ref != NULL &&
		    same_os_sema(c->responses_pending, context))
			break;
	}
	/* only dispatch when the loop broke out on a match */
	if (idx < blocking_children_alloc)
		process_blocking_resp(c);
}
#endif	/* !WORK_PIPE */

/* --------------------------------------------------------------------
 * Fetch the next response from the return queue. In case of signalling
 * via pipe, make sure the pipe is flushed, too.
 */
blocking_pipe_header *
receive_blocking_resp_internal(
	blocking_child *	c
	)
{
	blocking_pipe_header *	removed;
	size_t			qhead, qtail, slot;

#ifdef WORK_PIPE
	int	rc;
	char	scratch[32];

	/* drain pending wake-up byte(s) from the notification pipe;
	 * the fd is non-blocking, so this cannot stall */
	do
		rc = read(c->resp_read_pipe, scratch, sizeof(scratch));
	while (-1 == rc && EINTR == errno);
#endif

	/* >>>> ACCESS LOCKING STARTS >>>> */
	wait_for_sem(c->accesslock, NULL);
	qhead = c->head_response;
	qtail = c->tail_response;
	/* skip over already-consumed (NULL) slots, take first pending */
	for (removed = NULL; !removed && (qhead != qtail); ++qtail) {
		slot = qtail % c->responses_alloc;
		removed = c->responses[slot];
		c->responses[slot] = NULL;
	}
	c->tail_response = qtail;
	tickle_sem(c->accesslock);
	/* <<<< ACCESS LOCKING ENDS <<<< */

	if (NULL != removed) {
		DEBUG_ENSURE(CHILD_GONE_RESP == removed ||
			     BLOCKING_RESP_MAGIC == removed->magic_sig);
	}
	if (CHILD_GONE_RESP == removed) {
		/* worker has terminated: fold up our side of the tent
		 * and report "nothing to process" */
		cleanup_after_child(c);
		removed = NULL;
	}

	return removed;
}

/* --------------------------------------------------------------------
 * Light up a new worker.
 */
static void
start_blocking_thread(
	blocking_child *	c
	)
{

	DEBUG_INSIST(!c->reusable);

	prepare_child_sems(c);
	start_blocking_thread_internal(c);
}

/* --------------------------------------------------------------------
 * Create a worker thread. There are several differences between POSIX
 * and Windows, of course -- most notably the Windows thread is no
 * detached thread, and we keep the handle around until we want to get
 * rid of the thread. The notification scheme also differs: Windows
 * makes use of semaphores in both directions, POSIX uses a pipe for
 * integration with 'select()' or alike.
494293894Sglebius */ 495275970Scystatic void 496275970Scystart_blocking_thread_internal( 497275970Scy blocking_child * c 498275970Scy ) 499275970Scy#ifdef SYS_WINNT 500275970Scy{ 501275970Scy BOOL resumed; 502275970Scy 503293894Sglebius c->thread_ref = NULL; 504293894Sglebius (*addremove_io_semaphore)(c->responses_pending->shnd, FALSE); 505293894Sglebius c->thr_table[0].thnd = 506275970Scy (HANDLE)_beginthreadex( 507275970Scy NULL, 508275970Scy 0, 509275970Scy &blocking_thread, 510275970Scy c, 511275970Scy CREATE_SUSPENDED, 512293894Sglebius NULL); 513275970Scy 514293894Sglebius if (NULL == c->thr_table[0].thnd) { 515275970Scy msyslog(LOG_ERR, "start blocking thread failed: %m"); 516275970Scy exit(-1); 517275970Scy } 518275970Scy /* remember the thread priority is only within the process class */ 519293894Sglebius if (!SetThreadPriority(c->thr_table[0].thnd, 520275970Scy THREAD_PRIORITY_BELOW_NORMAL)) 521275970Scy msyslog(LOG_ERR, "Error lowering blocking thread priority: %m"); 522275970Scy 523293894Sglebius resumed = ResumeThread(c->thr_table[0].thnd); 524275970Scy DEBUG_INSIST(resumed); 525293894Sglebius c->thread_ref = &c->thr_table[0]; 526275970Scy} 527275970Scy#else /* pthreads start_blocking_thread_internal() follows */ 528275970Scy{ 529275970Scy# ifdef NEED_PTHREAD_INIT 530275970Scy static int pthread_init_called; 531275970Scy# endif 532275970Scy pthread_attr_t thr_attr; 533275970Scy int rc; 534275970Scy int pipe_ends[2]; /* read then write */ 535275970Scy int is_pipe; 536275970Scy int flags; 537294904Sdelphij size_t ostacksize; 538294904Sdelphij size_t nstacksize; 539275970Scy sigset_t saved_sig_mask; 540275970Scy 541293894Sglebius c->thread_ref = NULL; 542293894Sglebius 543275970Scy# ifdef NEED_PTHREAD_INIT 544275970Scy /* 545275970Scy * from lib/isc/unix/app.c: 546275970Scy * BSDI 3.1 seg faults in pthread_sigmask() if we don't do this. 
547275970Scy */ 548275970Scy if (!pthread_init_called) { 549275970Scy pthread_init(); 550275970Scy pthread_init_called = TRUE; 551275970Scy } 552275970Scy# endif 553275970Scy 554275970Scy rc = pipe_socketpair(&pipe_ends[0], &is_pipe); 555275970Scy if (0 != rc) { 556275970Scy msyslog(LOG_ERR, "start_blocking_thread: pipe_socketpair() %m"); 557275970Scy exit(1); 558275970Scy } 559275970Scy c->resp_read_pipe = move_fd(pipe_ends[0]); 560275970Scy c->resp_write_pipe = move_fd(pipe_ends[1]); 561275970Scy c->ispipe = is_pipe; 562275970Scy flags = fcntl(c->resp_read_pipe, F_GETFL, 0); 563275970Scy if (-1 == flags) { 564275970Scy msyslog(LOG_ERR, "start_blocking_thread: fcntl(F_GETFL) %m"); 565275970Scy exit(1); 566275970Scy } 567275970Scy rc = fcntl(c->resp_read_pipe, F_SETFL, O_NONBLOCK | flags); 568275970Scy if (-1 == rc) { 569275970Scy msyslog(LOG_ERR, 570275970Scy "start_blocking_thread: fcntl(F_SETFL, O_NONBLOCK) %m"); 571275970Scy exit(1); 572275970Scy } 573275970Scy (*addremove_io_fd)(c->resp_read_pipe, c->ispipe, FALSE); 574275970Scy pthread_attr_init(&thr_attr); 575275970Scy pthread_attr_setdetachstate(&thr_attr, PTHREAD_CREATE_DETACHED); 576275970Scy#if defined(HAVE_PTHREAD_ATTR_GETSTACKSIZE) && \ 577275970Scy defined(HAVE_PTHREAD_ATTR_SETSTACKSIZE) 578294904Sdelphij rc = pthread_attr_getstacksize(&thr_attr, &ostacksize); 579294904Sdelphij if (0 != rc) { 580275970Scy msyslog(LOG_ERR, 581294904Sdelphij "start_blocking_thread: pthread_attr_getstacksize() -> %s", 582294904Sdelphij strerror(rc)); 583294904Sdelphij } else { 584294904Sdelphij if (ostacksize < THREAD_MINSTACKSIZE) 585294904Sdelphij nstacksize = THREAD_MINSTACKSIZE; 586294904Sdelphij else if (ostacksize > THREAD_MAXSTACKSIZE) 587294904Sdelphij nstacksize = THREAD_MAXSTACKSIZE; 588294904Sdelphij else 589294904Sdelphij nstacksize = ostacksize; 590294904Sdelphij if (nstacksize != ostacksize) 591294904Sdelphij rc = pthread_attr_setstacksize(&thr_attr, nstacksize); 592294904Sdelphij if (0 != rc) 593275970Scy 
msyslog(LOG_ERR, 594294904Sdelphij "start_blocking_thread: pthread_attr_setstacksize(0x%lx -> 0x%lx) -> %s", 595294904Sdelphij (u_long)ostacksize, (u_long)nstacksize, 596294904Sdelphij strerror(rc)); 597275970Scy } 598275970Scy#else 599294904Sdelphij UNUSED_ARG(nstacksize); 600294904Sdelphij UNUSED_ARG(ostacksize); 601275970Scy#endif 602275970Scy#if defined(PTHREAD_SCOPE_SYSTEM) && defined(NEED_PTHREAD_SCOPE_SYSTEM) 603275970Scy pthread_attr_setscope(&thr_attr, PTHREAD_SCOPE_SYSTEM); 604275970Scy#endif 605275970Scy c->thread_ref = emalloc_zero(sizeof(*c->thread_ref)); 606275970Scy block_thread_signals(&saved_sig_mask); 607293894Sglebius rc = pthread_create(&c->thr_table[0], &thr_attr, 608275970Scy &blocking_thread, c); 609275970Scy pthread_sigmask(SIG_SETMASK, &saved_sig_mask, NULL); 610275970Scy pthread_attr_destroy(&thr_attr); 611275970Scy if (0 != rc) { 612294904Sdelphij msyslog(LOG_ERR, "start_blocking_thread: pthread_create() -> %s", 613294904Sdelphij strerror(rc)); 614275970Scy exit(1); 615275970Scy } 616293894Sglebius c->thread_ref = &c->thr_table[0]; 617275970Scy} 618275970Scy#endif 619275970Scy 620293894Sglebius/* -------------------------------------------------------------------- 621275970Scy * block_thread_signals() 622275970Scy * 623275970Scy * Temporarily block signals used by ntpd main thread, so that signal 624275970Scy * mask inherited by child threads leaves them blocked. Returns prior 625275970Scy * active signal mask via pmask, to be restored by the main thread 626275970Scy * after pthread_create(). 
627275970Scy */ 628275970Scy#ifndef SYS_WINNT 629275970Scyvoid 630275970Scyblock_thread_signals( 631275970Scy sigset_t * pmask 632275970Scy ) 633275970Scy{ 634275970Scy sigset_t block; 635275970Scy 636275970Scy sigemptyset(&block); 637275970Scy# ifdef HAVE_SIGNALED_IO 638275970Scy# ifdef SIGIO 639275970Scy sigaddset(&block, SIGIO); 640275970Scy# endif 641275970Scy# ifdef SIGPOLL 642275970Scy sigaddset(&block, SIGPOLL); 643275970Scy# endif 644275970Scy# endif /* HAVE_SIGNALED_IO */ 645275970Scy sigaddset(&block, SIGALRM); 646275970Scy sigaddset(&block, MOREDEBUGSIG); 647275970Scy sigaddset(&block, LESSDEBUGSIG); 648275970Scy# ifdef SIGDIE1 649275970Scy sigaddset(&block, SIGDIE1); 650275970Scy# endif 651275970Scy# ifdef SIGDIE2 652275970Scy sigaddset(&block, SIGDIE2); 653275970Scy# endif 654275970Scy# ifdef SIGDIE3 655275970Scy sigaddset(&block, SIGDIE3); 656275970Scy# endif 657275970Scy# ifdef SIGDIE4 658275970Scy sigaddset(&block, SIGDIE4); 659275970Scy# endif 660275970Scy# ifdef SIGBUS 661275970Scy sigaddset(&block, SIGBUS); 662275970Scy# endif 663275970Scy sigemptyset(pmask); 664275970Scy pthread_sigmask(SIG_BLOCK, &block, pmask); 665275970Scy} 666275970Scy#endif /* !SYS_WINNT */ 667275970Scy 668275970Scy 669293894Sglebius/* -------------------------------------------------------------------- 670293894Sglebius * Create & destroy semaphores. This is sufficiently different between 671293894Sglebius * POSIX and Windows to warrant wrapper functions and close enough to 672293894Sglebius * use the concept of synchronization via semaphore for all platforms. 673293894Sglebius */ 674293894Sglebiusstatic sem_ref 675293894Sglebiuscreate_sema( 676293894Sglebius sema_type* semptr, 677293894Sglebius u_int inival, 678293894Sglebius u_int maxval) 679293894Sglebius{ 680293894Sglebius#ifdef SYS_WINNT 681293894Sglebius 682293894Sglebius long svini, svmax; 683293894Sglebius if (NULL != semptr) { 684293894Sglebius svini = (inival < LONG_MAX) 685293894Sglebius ? 
(long)inival : LONG_MAX; 686293894Sglebius svmax = (maxval < LONG_MAX && maxval > 0) 687293894Sglebius ? (long)maxval : LONG_MAX; 688293894Sglebius semptr->shnd = CreateSemaphore(NULL, svini, svmax, NULL); 689293894Sglebius if (NULL == semptr->shnd) 690293894Sglebius semptr = NULL; 691293894Sglebius } 692293894Sglebius 693293894Sglebius#else 694293894Sglebius 695293894Sglebius (void)maxval; 696293894Sglebius if (semptr && sem_init(semptr, FALSE, inival)) 697293894Sglebius semptr = NULL; 698293894Sglebius 699293894Sglebius#endif 700293894Sglebius 701293894Sglebius return semptr; 702293894Sglebius} 703293894Sglebius 704293894Sglebius/* ------------------------------------------------------------------ */ 705293894Sglebiusstatic sem_ref 706293894Sglebiusdelete_sema( 707293894Sglebius sem_ref obj) 708293894Sglebius{ 709293894Sglebius 710293894Sglebius# ifdef SYS_WINNT 711293894Sglebius 712293894Sglebius if (obj) { 713293894Sglebius if (obj->shnd) 714293894Sglebius CloseHandle(obj->shnd); 715293894Sglebius obj->shnd = NULL; 716293894Sglebius } 717293894Sglebius 718293894Sglebius# else 719293894Sglebius 720293894Sglebius if (obj) 721293894Sglebius sem_destroy(obj); 722293894Sglebius 723293894Sglebius# endif 724293894Sglebius 725293894Sglebius return NULL; 726293894Sglebius} 727293894Sglebius 728293894Sglebius/* -------------------------------------------------------------------- 729275970Scy * prepare_child_sems() 730275970Scy * 731293894Sglebius * create sync & access semaphores 732275970Scy * 733293894Sglebius * All semaphores are cleared, only the access semaphore has 1 unit. 734293894Sglebius * Childs wait on 'workitems_pending', then grabs 'sema_access' 735293894Sglebius * and dequeues jobs. When done, 'sema_access' is given one unit back. 736293894Sglebius * 737293894Sglebius * The producer grabs 'sema_access', manages the queue, restores 738293894Sglebius * 'sema_access' and puts one unit into 'workitems_pending'. 
 *
 * The story goes the same for the response queue.
 */
static void
prepare_child_sems(
	blocking_child *c
	)
{
	/* lazily create the global memory lock the first time any
	 * worker is prepared */
	if (NULL == worker_memlock)
		worker_memlock = create_sema(&worker_mmutex, 1, 1);

	/* binary access lock, starts available */
	c->accesslock           = create_sema(&c->sem_table[0], 1, 1);
	/* counting semaphore: one token per queued request */
	c->workitems_pending    = create_sema(&c->sem_table[1], 0, 0);
	/* binary wake-up for a napping worker */
	c->wake_scheduled_sleep = create_sema(&c->sem_table[2], 0, 1);
#   ifndef WORK_PIPE
	/* Windows-only: response notification semaphore */
	c->responses_pending    = create_sema(&c->sem_table[3], 0, 0);
#   endif
}

/* --------------------------------------------------------------------
 * wait for semaphore. Where the wait can be interrupted, it will
 * internally resume -- When this function returns, there is either no
 * semaphore at all, a timeout occurred, or the caller could
 * successfully take a token from the semaphore.
 *
 * For untimed wait, not checking the result of this function at all is
 * definitely an option.
 */
static int
wait_for_sem(
	sem_ref			sem,
	struct timespec *	timeout		/* wall-clock */
	)
#ifdef SYS_WINNT
{
	struct timespec delta;
	struct timespec now;
	DWORD		msec;
	DWORD		rc;

	if (!(sem && sem->shnd)) {
		errno = EINVAL;
		return -1;
	}

	if (NULL == timeout) {
		msec = INFINITE;
	} else {
		/* convert the absolute wall-clock deadline into a
		 * relative millisecond count for the Win32 API */
		getclock(TIMEOFDAY, &now);
		delta = sub_tspec(*timeout, now);
		if (delta.tv_sec < 0) {
			msec = 0;	/* deadline already passed */
		} else if ((delta.tv_sec + 1) >= (MAXDWORD / 1000)) {
			msec = INFINITE;	/* would overflow DWORD */
		} else {
			msec = 1000 * (DWORD)delta.tv_sec;
			msec += delta.tv_nsec / (1000 * 1000);
		}
	}
	rc = WaitForSingleObject(sem->shnd, msec);
	if (WAIT_OBJECT_0 == rc)
		return 0;	/* token taken */
	if (WAIT_TIMEOUT == rc) {
		errno = ETIMEDOUT;
		return -1;
	}
	msyslog(LOG_ERR, "WaitForSingleObject unexpected 0x%x", rc);
	errno = EFAULT;
	return -1;
}
#else	/* pthreads wait_for_sem() follows */
{
	int rc = -1;

	/* resume the wait when interrupted by a signal */
	if (sem) do {
			if (NULL == timeout)
				rc = sem_wait(sem);
			else
				rc = sem_timedwait(sem, timeout);
		} while (rc == -1 && errno == EINTR);
	else
		errno = EINVAL;

	return rc;
}
#endif

/* --------------------------------------------------------------------
 * blocking_thread - thread functions have WINAPI (aka 'stdcall')
 * calling conventions under Windows and POSIX-defined signature
 * otherwise.
 */
#ifdef SYS_WINNT
u_int WINAPI
#else
void *
#endif
blocking_thread(
	void *	ThreadArg
	)
{
	blocking_child *c;

	c = ThreadArg;
	/* run the worker loop; its return code becomes the thread's
	 * exit status */
	exit_worker(blocking_child_common(c));

	/* NOTREACHED */
	return 0;
}

/* --------------------------------------------------------------------
 * req_child_exit() runs in the parent.
 *
 * This function is called from from the idle timer, too, and possibly
 * without a thread being there any longer. Since we have folded up our
 * tent in that case and all the semaphores are already gone, we simply
 * ignore this request in this case.
 *
 * Since the existence of the semaphores is controlled exclusively by
 * the parent, there's no risk of data race here.
 */
int
req_child_exit(
	blocking_child *c
	)
{
	return (c->accesslock)
	    ? queue_req_pointer(c, CHILD_EXIT_REQ)
	    : 0;
}

/* --------------------------------------------------------------------
 * cleanup_after_child() runs in parent.
 */
static void
cleanup_after_child(
	blocking_child *	c
	)
{
	DEBUG_INSIST(!c->reusable);

# ifdef SYS_WINNT
	/* The thread was not created in detached state, so we better
	 * clean up.
	 */
	if (c->thread_ref && c->thread_ref->thnd) {
		WaitForSingleObject(c->thread_ref->thnd, INFINITE);
		INSIST(CloseHandle(c->thread_ref->thnd));
		c->thread_ref->thnd = NULL;
	}
# endif
	c->thread_ref = NULL;

	/* remove semaphores and (if signalling via IO) pipes */

	c->accesslock           = delete_sema(c->accesslock);
	c->workitems_pending    = delete_sema(c->workitems_pending);
	c->wake_scheduled_sleep = delete_sema(c->wake_scheduled_sleep);

# ifdef WORK_PIPE
	DEBUG_INSIST(-1 != c->resp_read_pipe);
	DEBUG_INSIST(-1 != c->resp_write_pipe);
	/* deregister from the I/O machinery before closing */
	(*addremove_io_fd)(c->resp_read_pipe, c->ispipe, TRUE);
	close(c->resp_write_pipe);
	close(c->resp_read_pipe);
	c->resp_write_pipe = -1;
	c->resp_read_pipe = -1;
# else
	DEBUG_INSIST(NULL != c->responses_pending);
	(*addremove_io_semaphore)(c->responses_pending->shnd, TRUE);
	c->responses_pending = delete_sema(c->responses_pending);
# endif

	/* Is it necessary to check if there are pending requests and
	 * responses? If so, and if there are, what to do with them?
	 */

	/* re-init buffer index sequencers */
	c->head_workitem = 0;
	c->tail_workitem = 0;
	c->head_response = 0;
	c->tail_response = 0;

	c->reusable = TRUE;
}


#else	/* !WORK_THREAD follows */
char work_thread_nonempty_compilation_unit;
#endif