work_thread.c revision 298695
1275970Scy/* 2275970Scy * work_thread.c - threads implementation for blocking worker child. 3275970Scy */ 4275970Scy#include <config.h> 5275970Scy#include "ntp_workimpl.h" 6275970Scy 7275970Scy#ifdef WORK_THREAD 8275970Scy 9275970Scy#include <stdio.h> 10275970Scy#include <ctype.h> 11275970Scy#include <signal.h> 12275970Scy#ifndef SYS_WINNT 13275970Scy#include <pthread.h> 14275970Scy#endif 15275970Scy 16275970Scy#include "ntp_stdlib.h" 17275970Scy#include "ntp_malloc.h" 18275970Scy#include "ntp_syslog.h" 19275970Scy#include "ntpd.h" 20275970Scy#include "ntp_io.h" 21275970Scy#include "ntp_assert.h" 22275970Scy#include "ntp_unixtime.h" 23275970Scy#include "timespecops.h" 24275970Scy#include "ntp_worker.h" 25275970Scy 26275970Scy#define CHILD_EXIT_REQ ((blocking_pipe_header *)(intptr_t)-1) 27275970Scy#define CHILD_GONE_RESP CHILD_EXIT_REQ 28294554Sdelphij/* Queue size increments: 29294554Sdelphij * The request queue grows a bit faster than the response queue -- the 30294554Sdelphij * daemon can push requests and pull results faster on average than the 31294554Sdelphij * worker can process requests and push results... If this really pays 32294554Sdelphij * off is debatable. 33294554Sdelphij */ 34275970Scy#define WORKITEMS_ALLOC_INC 16 35275970Scy#define RESPONSES_ALLOC_INC 4 36275970Scy 37294554Sdelphij/* Fiddle with min/max stack sizes. 64kB minimum seems to work, so we 38294554Sdelphij * set the maximum to 256kB. If the minimum goes below the 39294554Sdelphij * system-defined minimum stack size, we have to adjust accordingly. 
40294554Sdelphij */ 41275970Scy#ifndef THREAD_MINSTACKSIZE 42294554Sdelphij# define THREAD_MINSTACKSIZE (64U * 1024) 43275970Scy#endif 44294554Sdelphij#ifndef __sun 45294554Sdelphij#if defined(PTHREAD_STACK_MIN) && THREAD_MINSTACKSIZE < PTHREAD_STACK_MIN 46294554Sdelphij# undef THREAD_MINSTACKSIZE 47294554Sdelphij# define THREAD_MINSTACKSIZE PTHREAD_STACK_MIN 48294554Sdelphij#endif 49294554Sdelphij#endif 50275970Scy 51294554Sdelphij#ifndef THREAD_MAXSTACKSIZE 52294554Sdelphij# define THREAD_MAXSTACKSIZE (256U * 1024) 53294554Sdelphij#endif 54294554Sdelphij#if THREAD_MAXSTACKSIZE < THREAD_MINSTACKSIZE 55294554Sdelphij# undef THREAD_MAXSTACKSIZE 56294554Sdelphij# define THREAD_MAXSTACKSIZE THREAD_MINSTACKSIZE 57294554Sdelphij#endif 58294554Sdelphij 59294554Sdelphij 60293423Sdelphij#ifdef SYS_WINNT 61275970Scy 62275970Scy# define thread_exit(c) _endthreadex(c) 63293423Sdelphij# define tickle_sem(sh) ReleaseSemaphore((sh->shnd), 1, NULL) 64293423Sdelphiju_int WINAPI blocking_thread(void *); 65293423Sdelphijstatic BOOL same_os_sema(const sem_ref obj, void * osobj); 66293423Sdelphij 67275970Scy#else 68293423Sdelphij 69275970Scy# define thread_exit(c) pthread_exit((void*)(size_t)(c)) 70275970Scy# define tickle_sem sem_post 71293423Sdelphijvoid * blocking_thread(void *); 72293423Sdelphijstatic void block_thread_signals(sigset_t *); 73293423Sdelphij 74275970Scy#endif 75275970Scy 76275970Scy#ifdef WORK_PIPE 77275970Scyaddremove_io_fd_func addremove_io_fd; 78275970Scy#else 79275970Scyaddremove_io_semaphore_func addremove_io_semaphore; 80275970Scy#endif 81275970Scy 82275970Scystatic void start_blocking_thread(blocking_child *); 83275970Scystatic void start_blocking_thread_internal(blocking_child *); 84275970Scystatic void prepare_child_sems(blocking_child *); 85275970Scystatic int wait_for_sem(sem_ref, struct timespec *); 86293423Sdelphijstatic int ensure_workitems_empty_slot(blocking_child *); 87293423Sdelphijstatic int ensure_workresp_empty_slot(blocking_child *); 
88275970Scystatic int queue_req_pointer(blocking_child *, blocking_pipe_header *); 89275970Scystatic void cleanup_after_child(blocking_child *); 90275970Scy 91298695Sdelphijstatic sema_type worker_mmutex; 92298695Sdelphijstatic sem_ref worker_memlock; 93275970Scy 94298695Sdelphij/* -------------------------------------------------------------------- 95298695Sdelphij * locking the global worker state table (and other global stuff) 96298695Sdelphij */ 97275970Scyvoid 98298695Sdelphijworker_global_lock( 99298695Sdelphij int inOrOut) 100298695Sdelphij{ 101298695Sdelphij if (worker_memlock) { 102298695Sdelphij if (inOrOut) 103298695Sdelphij wait_for_sem(worker_memlock, NULL); 104298695Sdelphij else 105298695Sdelphij tickle_sem(worker_memlock); 106298695Sdelphij } 107298695Sdelphij} 108298695Sdelphij 109298695Sdelphij/* -------------------------------------------------------------------- 110298695Sdelphij * implementation isolation wrapper 111298695Sdelphij */ 112298695Sdelphijvoid 113275970Scyexit_worker( 114275970Scy int exitcode 115275970Scy ) 116275970Scy{ 117275970Scy thread_exit(exitcode); /* see #define thread_exit */ 118275970Scy} 119275970Scy 120293423Sdelphij/* -------------------------------------------------------------------- 121293423Sdelphij * sleep for a given time or until the wakup semaphore is tickled. 
122293423Sdelphij */ 123275970Scyint 124275970Scyworker_sleep( 125275970Scy blocking_child * c, 126275970Scy time_t seconds 127275970Scy ) 128275970Scy{ 129275970Scy struct timespec until; 130275970Scy int rc; 131275970Scy 132275970Scy# ifdef HAVE_CLOCK_GETTIME 133275970Scy if (0 != clock_gettime(CLOCK_REALTIME, &until)) { 134275970Scy msyslog(LOG_ERR, "worker_sleep: clock_gettime() failed: %m"); 135275970Scy return -1; 136275970Scy } 137275970Scy# else 138275970Scy if (0 != getclock(TIMEOFDAY, &until)) { 139275970Scy msyslog(LOG_ERR, "worker_sleep: getclock() failed: %m"); 140275970Scy return -1; 141275970Scy } 142275970Scy# endif 143275970Scy until.tv_sec += seconds; 144293423Sdelphij rc = wait_for_sem(c->wake_scheduled_sleep, &until); 145275970Scy if (0 == rc) 146275970Scy return -1; 147275970Scy if (-1 == rc && ETIMEDOUT == errno) 148275970Scy return 0; 149275970Scy msyslog(LOG_ERR, "worker_sleep: sem_timedwait: %m"); 150275970Scy return -1; 151275970Scy} 152275970Scy 153275970Scy 154293423Sdelphij/* -------------------------------------------------------------------- 155293423Sdelphij * Wake up a worker that takes a nap. 156293423Sdelphij */ 157275970Scyvoid 158275970Scyinterrupt_worker_sleep(void) 159275970Scy{ 160275970Scy u_int idx; 161275970Scy blocking_child * c; 162275970Scy 163275970Scy for (idx = 0; idx < blocking_children_alloc; idx++) { 164275970Scy c = blocking_children[idx]; 165275970Scy if (NULL == c || NULL == c->wake_scheduled_sleep) 166275970Scy continue; 167275970Scy tickle_sem(c->wake_scheduled_sleep); 168275970Scy } 169275970Scy} 170275970Scy 171293423Sdelphij/* -------------------------------------------------------------------- 172293423Sdelphij * Make sure there is an empty slot at the head of the request 173293423Sdelphij * queue. Tell if the queue is currently empty. 
174293423Sdelphij */ 175293423Sdelphijstatic int 176275970Scyensure_workitems_empty_slot( 177275970Scy blocking_child *c 178275970Scy ) 179275970Scy{ 180293423Sdelphij /* 181293423Sdelphij ** !!! PRECONDITION: caller holds access lock! 182293423Sdelphij ** 183293423Sdelphij ** This simply tries to increase the size of the buffer if it 184293423Sdelphij ** becomes full. The resize operation does *not* maintain the 185293423Sdelphij ** order of requests, but that should be irrelevant since the 186293423Sdelphij ** processing is considered asynchronous anyway. 187293423Sdelphij ** 188293423Sdelphij ** Return if the buffer is currently empty. 189293423Sdelphij */ 190293423Sdelphij 191293423Sdelphij static const size_t each = 192293423Sdelphij sizeof(blocking_children[0]->workitems[0]); 193275970Scy 194293423Sdelphij size_t new_alloc; 195293423Sdelphij size_t slots_used; 196294554Sdelphij size_t sidx; 197275970Scy 198293423Sdelphij slots_used = c->head_workitem - c->tail_workitem; 199293423Sdelphij if (slots_used >= c->workitems_alloc) { 200293423Sdelphij new_alloc = c->workitems_alloc + WORKITEMS_ALLOC_INC; 201293423Sdelphij c->workitems = erealloc(c->workitems, new_alloc * each); 202294554Sdelphij for (sidx = c->workitems_alloc; sidx < new_alloc; ++sidx) 203294554Sdelphij c->workitems[sidx] = NULL; 204293423Sdelphij c->tail_workitem = 0; 205293423Sdelphij c->head_workitem = c->workitems_alloc; 206293423Sdelphij c->workitems_alloc = new_alloc; 207293423Sdelphij } 208294554Sdelphij INSIST(NULL == c->workitems[c->head_workitem % c->workitems_alloc]); 209293423Sdelphij return (0 == slots_used); 210275970Scy} 211275970Scy 212293423Sdelphij/* -------------------------------------------------------------------- 213293423Sdelphij * Make sure there is an empty slot at the head of the response 214293423Sdelphij * queue. Tell if the queue is currently empty. 
215293423Sdelphij */ 216293423Sdelphijstatic int 217275970Scyensure_workresp_empty_slot( 218275970Scy blocking_child *c 219275970Scy ) 220275970Scy{ 221293423Sdelphij /* 222293423Sdelphij ** !!! PRECONDITION: caller holds access lock! 223293423Sdelphij ** 224293423Sdelphij ** Works like the companion function above. 225293423Sdelphij */ 226293423Sdelphij 227293423Sdelphij static const size_t each = 228293423Sdelphij sizeof(blocking_children[0]->responses[0]); 229275970Scy 230293423Sdelphij size_t new_alloc; 231293423Sdelphij size_t slots_used; 232294554Sdelphij size_t sidx; 233275970Scy 234293423Sdelphij slots_used = c->head_response - c->tail_response; 235293423Sdelphij if (slots_used >= c->responses_alloc) { 236293423Sdelphij new_alloc = c->responses_alloc + RESPONSES_ALLOC_INC; 237293423Sdelphij c->responses = erealloc(c->responses, new_alloc * each); 238294554Sdelphij for (sidx = c->responses_alloc; sidx < new_alloc; ++sidx) 239294554Sdelphij c->responses[sidx] = NULL; 240293423Sdelphij c->tail_response = 0; 241293423Sdelphij c->head_response = c->responses_alloc; 242293423Sdelphij c->responses_alloc = new_alloc; 243293423Sdelphij } 244294554Sdelphij INSIST(NULL == c->responses[c->head_response % c->responses_alloc]); 245293423Sdelphij return (0 == slots_used); 246275970Scy} 247275970Scy 248275970Scy 249293423Sdelphij/* -------------------------------------------------------------------- 250275970Scy * queue_req_pointer() - append a work item or idle exit request to 251293423Sdelphij * blocking_workitems[]. Employ proper locking. 
252275970Scy */ 253275970Scystatic int 254275970Scyqueue_req_pointer( 255275970Scy blocking_child * c, 256275970Scy blocking_pipe_header * hdr 257275970Scy ) 258275970Scy{ 259293423Sdelphij size_t qhead; 260293423Sdelphij 261293423Sdelphij /* >>>> ACCESS LOCKING STARTS >>>> */ 262293423Sdelphij wait_for_sem(c->accesslock, NULL); 263293423Sdelphij ensure_workitems_empty_slot(c); 264293423Sdelphij qhead = c->head_workitem; 265293423Sdelphij c->workitems[qhead % c->workitems_alloc] = hdr; 266293423Sdelphij c->head_workitem = 1 + qhead; 267293423Sdelphij tickle_sem(c->accesslock); 268293423Sdelphij /* <<<< ACCESS LOCKING ENDS <<<< */ 269275970Scy 270293423Sdelphij /* queue consumer wake-up notification */ 271293423Sdelphij tickle_sem(c->workitems_pending); 272275970Scy 273275970Scy return 0; 274275970Scy} 275275970Scy 276293423Sdelphij/* -------------------------------------------------------------------- 277293423Sdelphij * API function to make sure a worker is running, a proper private copy 278293423Sdelphij * of the data is made, the data eneterd into the queue and the worker 279293423Sdelphij * is signalled. 
280293423Sdelphij */ 281275970Scyint 282275970Scysend_blocking_req_internal( 283275970Scy blocking_child * c, 284275970Scy blocking_pipe_header * hdr, 285275970Scy void * data 286275970Scy ) 287275970Scy{ 288275970Scy blocking_pipe_header * threadcopy; 289275970Scy size_t payload_octets; 290275970Scy 291275970Scy REQUIRE(hdr != NULL); 292275970Scy REQUIRE(data != NULL); 293275970Scy DEBUG_REQUIRE(BLOCKING_REQ_MAGIC == hdr->magic_sig); 294275970Scy 295275970Scy if (hdr->octets <= sizeof(*hdr)) 296275970Scy return 1; /* failure */ 297275970Scy payload_octets = hdr->octets - sizeof(*hdr); 298275970Scy 299293423Sdelphij if (NULL == c->thread_ref) 300275970Scy start_blocking_thread(c); 301275970Scy threadcopy = emalloc(hdr->octets); 302275970Scy memcpy(threadcopy, hdr, sizeof(*hdr)); 303275970Scy memcpy((char *)threadcopy + sizeof(*hdr), data, payload_octets); 304275970Scy 305275970Scy return queue_req_pointer(c, threadcopy); 306275970Scy} 307275970Scy 308293423Sdelphij/* -------------------------------------------------------------------- 309293423Sdelphij * Wait for the 'incoming queue no longer empty' signal, lock the shared 310293423Sdelphij * structure and dequeue an item. 
311293423Sdelphij */ 312275970Scyblocking_pipe_header * 313275970Scyreceive_blocking_req_internal( 314275970Scy blocking_child * c 315275970Scy ) 316275970Scy{ 317275970Scy blocking_pipe_header * req; 318293423Sdelphij size_t qhead, qtail; 319275970Scy 320293423Sdelphij req = NULL; 321275970Scy do { 322293423Sdelphij /* wait for tickle from the producer side */ 323293423Sdelphij wait_for_sem(c->workitems_pending, NULL); 324275970Scy 325293423Sdelphij /* >>>> ACCESS LOCKING STARTS >>>> */ 326293423Sdelphij wait_for_sem(c->accesslock, NULL); 327293423Sdelphij qhead = c->head_workitem; 328293423Sdelphij do { 329293423Sdelphij qtail = c->tail_workitem; 330293423Sdelphij if (qhead == qtail) 331293423Sdelphij break; 332293423Sdelphij c->tail_workitem = qtail + 1; 333293423Sdelphij qtail %= c->workitems_alloc; 334293423Sdelphij req = c->workitems[qtail]; 335293423Sdelphij c->workitems[qtail] = NULL; 336293423Sdelphij } while (NULL == req); 337293423Sdelphij tickle_sem(c->accesslock); 338293423Sdelphij /* <<<< ACCESS LOCKING ENDS <<<< */ 339293423Sdelphij 340293423Sdelphij } while (NULL == req); 341293423Sdelphij 342275970Scy INSIST(NULL != req); 343275970Scy if (CHILD_EXIT_REQ == req) { /* idled out */ 344275970Scy send_blocking_resp_internal(c, CHILD_GONE_RESP); 345275970Scy req = NULL; 346275970Scy } 347275970Scy 348275970Scy return req; 349275970Scy} 350275970Scy 351293423Sdelphij/* -------------------------------------------------------------------- 352293423Sdelphij * Push a response into the return queue and eventually tickle the 353293423Sdelphij * receiver. 
354293423Sdelphij */ 355275970Scyint 356275970Scysend_blocking_resp_internal( 357275970Scy blocking_child * c, 358275970Scy blocking_pipe_header * resp 359275970Scy ) 360275970Scy{ 361293423Sdelphij size_t qhead; 362293423Sdelphij int empty; 363293423Sdelphij 364293423Sdelphij /* >>>> ACCESS LOCKING STARTS >>>> */ 365293423Sdelphij wait_for_sem(c->accesslock, NULL); 366293423Sdelphij empty = ensure_workresp_empty_slot(c); 367293423Sdelphij qhead = c->head_response; 368293423Sdelphij c->responses[qhead % c->responses_alloc] = resp; 369293423Sdelphij c->head_response = 1 + qhead; 370293423Sdelphij tickle_sem(c->accesslock); 371293423Sdelphij /* <<<< ACCESS LOCKING ENDS <<<< */ 372275970Scy 373293423Sdelphij /* queue consumer wake-up notification */ 374293423Sdelphij if (empty) 375293423Sdelphij { 376293423Sdelphij# ifdef WORK_PIPE 377293423Sdelphij write(c->resp_write_pipe, "", 1); 378293423Sdelphij# else 379293423Sdelphij tickle_sem(c->responses_pending); 380293423Sdelphij# endif 381293423Sdelphij } 382275970Scy return 0; 383275970Scy} 384275970Scy 385275970Scy 386275970Scy#ifndef WORK_PIPE 387293423Sdelphij 388293423Sdelphij/* -------------------------------------------------------------------- 389293423Sdelphij * Check if a (Windows-)hanndle to a semaphore is actually the same we 390293423Sdelphij * are using inside the sema wrapper. 391293423Sdelphij */ 392293423Sdelphijstatic BOOL 393293423Sdelphijsame_os_sema( 394293423Sdelphij const sem_ref obj, 395293423Sdelphij void* osh 396293423Sdelphij ) 397293423Sdelphij{ 398293423Sdelphij return obj && osh && (obj->shnd == (HANDLE)osh); 399293423Sdelphij} 400293423Sdelphij 401293423Sdelphij/* -------------------------------------------------------------------- 402293423Sdelphij * Find the shared context that associates to an OS handle and make sure 403293423Sdelphij * the data is dequeued and processed. 
404293423Sdelphij */ 405275970Scyvoid 406275970Scyhandle_blocking_resp_sem( 407275970Scy void * context 408275970Scy ) 409275970Scy{ 410275970Scy blocking_child * c; 411275970Scy u_int idx; 412275970Scy 413275970Scy c = NULL; 414275970Scy for (idx = 0; idx < blocking_children_alloc; idx++) { 415275970Scy c = blocking_children[idx]; 416293423Sdelphij if (c != NULL && 417293423Sdelphij c->thread_ref != NULL && 418293423Sdelphij same_os_sema(c->responses_pending, context)) 419275970Scy break; 420275970Scy } 421275970Scy if (idx < blocking_children_alloc) 422275970Scy process_blocking_resp(c); 423275970Scy} 424275970Scy#endif /* !WORK_PIPE */ 425275970Scy 426293423Sdelphij/* -------------------------------------------------------------------- 427293423Sdelphij * Fetch the next response from the return queue. In case of signalling 428293423Sdelphij * via pipe, make sure the pipe is flushed, too. 429293423Sdelphij */ 430275970Scyblocking_pipe_header * 431275970Scyreceive_blocking_resp_internal( 432275970Scy blocking_child * c 433275970Scy ) 434275970Scy{ 435275970Scy blocking_pipe_header * removed; 436293423Sdelphij size_t qhead, qtail, slot; 437293423Sdelphij 438275970Scy#ifdef WORK_PIPE 439275970Scy int rc; 440275970Scy char scratch[32]; 441275970Scy 442293423Sdelphij do 443275970Scy rc = read(c->resp_read_pipe, scratch, sizeof(scratch)); 444293423Sdelphij while (-1 == rc && EINTR == errno); 445275970Scy#endif 446293423Sdelphij 447293423Sdelphij /* >>>> ACCESS LOCKING STARTS >>>> */ 448293423Sdelphij wait_for_sem(c->accesslock, NULL); 449293423Sdelphij qhead = c->head_response; 450293423Sdelphij qtail = c->tail_response; 451293423Sdelphij for (removed = NULL; !removed && (qhead != qtail); ++qtail) { 452293423Sdelphij slot = qtail % c->responses_alloc; 453293423Sdelphij removed = c->responses[slot]; 454293423Sdelphij c->responses[slot] = NULL; 455293423Sdelphij } 456293423Sdelphij c->tail_response = qtail; 457293423Sdelphij tickle_sem(c->accesslock); 458293423Sdelphij 
/* <<<< ACCESS LOCKING ENDS <<<< */ 459293423Sdelphij 460275970Scy if (NULL != removed) { 461275970Scy DEBUG_ENSURE(CHILD_GONE_RESP == removed || 462275970Scy BLOCKING_RESP_MAGIC == removed->magic_sig); 463275970Scy } 464275970Scy if (CHILD_GONE_RESP == removed) { 465275970Scy cleanup_after_child(c); 466275970Scy removed = NULL; 467275970Scy } 468275970Scy 469275970Scy return removed; 470275970Scy} 471275970Scy 472293423Sdelphij/* -------------------------------------------------------------------- 473293423Sdelphij * Light up a new worker. 474293423Sdelphij */ 475275970Scystatic void 476275970Scystart_blocking_thread( 477275970Scy blocking_child * c 478275970Scy ) 479275970Scy{ 480275970Scy 481275970Scy DEBUG_INSIST(!c->reusable); 482275970Scy 483275970Scy prepare_child_sems(c); 484275970Scy start_blocking_thread_internal(c); 485275970Scy} 486275970Scy 487293423Sdelphij/* -------------------------------------------------------------------- 488293423Sdelphij * Create a worker thread. There are several differences between POSIX 489293423Sdelphij * and Windows, of course -- most notably the Windows thread is no 490293423Sdelphij * detached thread, and we keep the handle around until we want to get 491293423Sdelphij * rid of the thread. The notification scheme also differs: Windows 492293423Sdelphij * makes use of semaphores in both directions, POSIX uses a pipe for 493293423Sdelphij * integration with 'select()' or alike. 
494293423Sdelphij */ 495275970Scystatic void 496275970Scystart_blocking_thread_internal( 497275970Scy blocking_child * c 498275970Scy ) 499275970Scy#ifdef SYS_WINNT 500275970Scy{ 501275970Scy BOOL resumed; 502275970Scy 503293423Sdelphij c->thread_ref = NULL; 504293423Sdelphij (*addremove_io_semaphore)(c->responses_pending->shnd, FALSE); 505293423Sdelphij c->thr_table[0].thnd = 506275970Scy (HANDLE)_beginthreadex( 507275970Scy NULL, 508275970Scy 0, 509275970Scy &blocking_thread, 510275970Scy c, 511275970Scy CREATE_SUSPENDED, 512293423Sdelphij NULL); 513275970Scy 514293423Sdelphij if (NULL == c->thr_table[0].thnd) { 515275970Scy msyslog(LOG_ERR, "start blocking thread failed: %m"); 516275970Scy exit(-1); 517275970Scy } 518275970Scy /* remember the thread priority is only within the process class */ 519293423Sdelphij if (!SetThreadPriority(c->thr_table[0].thnd, 520275970Scy THREAD_PRIORITY_BELOW_NORMAL)) 521275970Scy msyslog(LOG_ERR, "Error lowering blocking thread priority: %m"); 522275970Scy 523293423Sdelphij resumed = ResumeThread(c->thr_table[0].thnd); 524275970Scy DEBUG_INSIST(resumed); 525293423Sdelphij c->thread_ref = &c->thr_table[0]; 526275970Scy} 527275970Scy#else /* pthreads start_blocking_thread_internal() follows */ 528275970Scy{ 529275970Scy# ifdef NEED_PTHREAD_INIT 530275970Scy static int pthread_init_called; 531275970Scy# endif 532275970Scy pthread_attr_t thr_attr; 533275970Scy int rc; 534275970Scy int pipe_ends[2]; /* read then write */ 535275970Scy int is_pipe; 536275970Scy int flags; 537294554Sdelphij size_t ostacksize; 538294554Sdelphij size_t nstacksize; 539275970Scy sigset_t saved_sig_mask; 540275970Scy 541293423Sdelphij c->thread_ref = NULL; 542293423Sdelphij 543275970Scy# ifdef NEED_PTHREAD_INIT 544275970Scy /* 545275970Scy * from lib/isc/unix/app.c: 546275970Scy * BSDI 3.1 seg faults in pthread_sigmask() if we don't do this. 
547275970Scy */ 548275970Scy if (!pthread_init_called) { 549275970Scy pthread_init(); 550275970Scy pthread_init_called = TRUE; 551275970Scy } 552275970Scy# endif 553275970Scy 554275970Scy rc = pipe_socketpair(&pipe_ends[0], &is_pipe); 555275970Scy if (0 != rc) { 556275970Scy msyslog(LOG_ERR, "start_blocking_thread: pipe_socketpair() %m"); 557275970Scy exit(1); 558275970Scy } 559275970Scy c->resp_read_pipe = move_fd(pipe_ends[0]); 560275970Scy c->resp_write_pipe = move_fd(pipe_ends[1]); 561275970Scy c->ispipe = is_pipe; 562275970Scy flags = fcntl(c->resp_read_pipe, F_GETFL, 0); 563275970Scy if (-1 == flags) { 564275970Scy msyslog(LOG_ERR, "start_blocking_thread: fcntl(F_GETFL) %m"); 565275970Scy exit(1); 566275970Scy } 567275970Scy rc = fcntl(c->resp_read_pipe, F_SETFL, O_NONBLOCK | flags); 568275970Scy if (-1 == rc) { 569275970Scy msyslog(LOG_ERR, 570275970Scy "start_blocking_thread: fcntl(F_SETFL, O_NONBLOCK) %m"); 571275970Scy exit(1); 572275970Scy } 573275970Scy (*addremove_io_fd)(c->resp_read_pipe, c->ispipe, FALSE); 574275970Scy pthread_attr_init(&thr_attr); 575275970Scy pthread_attr_setdetachstate(&thr_attr, PTHREAD_CREATE_DETACHED); 576275970Scy#if defined(HAVE_PTHREAD_ATTR_GETSTACKSIZE) && \ 577275970Scy defined(HAVE_PTHREAD_ATTR_SETSTACKSIZE) 578294554Sdelphij rc = pthread_attr_getstacksize(&thr_attr, &ostacksize); 579294554Sdelphij if (0 != rc) { 580275970Scy msyslog(LOG_ERR, 581294554Sdelphij "start_blocking_thread: pthread_attr_getstacksize() -> %s", 582294554Sdelphij strerror(rc)); 583294554Sdelphij } else { 584294554Sdelphij if (ostacksize < THREAD_MINSTACKSIZE) 585294554Sdelphij nstacksize = THREAD_MINSTACKSIZE; 586294554Sdelphij else if (ostacksize > THREAD_MAXSTACKSIZE) 587294554Sdelphij nstacksize = THREAD_MAXSTACKSIZE; 588294554Sdelphij else 589294554Sdelphij nstacksize = ostacksize; 590294554Sdelphij if (nstacksize != ostacksize) 591294554Sdelphij rc = pthread_attr_setstacksize(&thr_attr, nstacksize); 592294554Sdelphij if (0 != rc) 593275970Scy 
msyslog(LOG_ERR, 594294554Sdelphij "start_blocking_thread: pthread_attr_setstacksize(0x%lx -> 0x%lx) -> %s", 595294554Sdelphij (u_long)ostacksize, (u_long)nstacksize, 596294554Sdelphij strerror(rc)); 597275970Scy } 598275970Scy#else 599294554Sdelphij UNUSED_ARG(nstacksize); 600294554Sdelphij UNUSED_ARG(ostacksize); 601275970Scy#endif 602275970Scy#if defined(PTHREAD_SCOPE_SYSTEM) && defined(NEED_PTHREAD_SCOPE_SYSTEM) 603275970Scy pthread_attr_setscope(&thr_attr, PTHREAD_SCOPE_SYSTEM); 604275970Scy#endif 605275970Scy c->thread_ref = emalloc_zero(sizeof(*c->thread_ref)); 606275970Scy block_thread_signals(&saved_sig_mask); 607293423Sdelphij rc = pthread_create(&c->thr_table[0], &thr_attr, 608275970Scy &blocking_thread, c); 609275970Scy pthread_sigmask(SIG_SETMASK, &saved_sig_mask, NULL); 610275970Scy pthread_attr_destroy(&thr_attr); 611275970Scy if (0 != rc) { 612294554Sdelphij msyslog(LOG_ERR, "start_blocking_thread: pthread_create() -> %s", 613294554Sdelphij strerror(rc)); 614275970Scy exit(1); 615275970Scy } 616293423Sdelphij c->thread_ref = &c->thr_table[0]; 617275970Scy} 618275970Scy#endif 619275970Scy 620293423Sdelphij/* -------------------------------------------------------------------- 621275970Scy * block_thread_signals() 622275970Scy * 623275970Scy * Temporarily block signals used by ntpd main thread, so that signal 624275970Scy * mask inherited by child threads leaves them blocked. Returns prior 625275970Scy * active signal mask via pmask, to be restored by the main thread 626275970Scy * after pthread_create(). 
627275970Scy */ 628275970Scy#ifndef SYS_WINNT 629275970Scyvoid 630275970Scyblock_thread_signals( 631275970Scy sigset_t * pmask 632275970Scy ) 633275970Scy{ 634275970Scy sigset_t block; 635275970Scy 636275970Scy sigemptyset(&block); 637275970Scy# ifdef HAVE_SIGNALED_IO 638275970Scy# ifdef SIGIO 639275970Scy sigaddset(&block, SIGIO); 640275970Scy# endif 641275970Scy# ifdef SIGPOLL 642275970Scy sigaddset(&block, SIGPOLL); 643275970Scy# endif 644275970Scy# endif /* HAVE_SIGNALED_IO */ 645275970Scy sigaddset(&block, SIGALRM); 646275970Scy sigaddset(&block, MOREDEBUGSIG); 647275970Scy sigaddset(&block, LESSDEBUGSIG); 648275970Scy# ifdef SIGDIE1 649275970Scy sigaddset(&block, SIGDIE1); 650275970Scy# endif 651275970Scy# ifdef SIGDIE2 652275970Scy sigaddset(&block, SIGDIE2); 653275970Scy# endif 654275970Scy# ifdef SIGDIE3 655275970Scy sigaddset(&block, SIGDIE3); 656275970Scy# endif 657275970Scy# ifdef SIGDIE4 658275970Scy sigaddset(&block, SIGDIE4); 659275970Scy# endif 660275970Scy# ifdef SIGBUS 661275970Scy sigaddset(&block, SIGBUS); 662275970Scy# endif 663275970Scy sigemptyset(pmask); 664275970Scy pthread_sigmask(SIG_BLOCK, &block, pmask); 665275970Scy} 666275970Scy#endif /* !SYS_WINNT */ 667275970Scy 668275970Scy 669293423Sdelphij/* -------------------------------------------------------------------- 670293423Sdelphij * Create & destroy semaphores. This is sufficiently different between 671293423Sdelphij * POSIX and Windows to warrant wrapper functions and close enough to 672293423Sdelphij * use the concept of synchronization via semaphore for all platforms. 673293423Sdelphij */ 674293423Sdelphijstatic sem_ref 675293423Sdelphijcreate_sema( 676293423Sdelphij sema_type* semptr, 677293423Sdelphij u_int inival, 678293423Sdelphij u_int maxval) 679293423Sdelphij{ 680293423Sdelphij#ifdef SYS_WINNT 681293423Sdelphij 682293423Sdelphij long svini, svmax; 683293423Sdelphij if (NULL != semptr) { 684293423Sdelphij svini = (inival < LONG_MAX) 685293423Sdelphij ? 
(long)inival : LONG_MAX; 686293423Sdelphij svmax = (maxval < LONG_MAX && maxval > 0) 687293423Sdelphij ? (long)maxval : LONG_MAX; 688293423Sdelphij semptr->shnd = CreateSemaphore(NULL, svini, svmax, NULL); 689293423Sdelphij if (NULL == semptr->shnd) 690293423Sdelphij semptr = NULL; 691293423Sdelphij } 692293423Sdelphij 693293423Sdelphij#else 694293423Sdelphij 695293423Sdelphij (void)maxval; 696293423Sdelphij if (semptr && sem_init(semptr, FALSE, inival)) 697293423Sdelphij semptr = NULL; 698293423Sdelphij 699293423Sdelphij#endif 700293423Sdelphij 701293423Sdelphij return semptr; 702293423Sdelphij} 703293423Sdelphij 704293423Sdelphij/* ------------------------------------------------------------------ */ 705293423Sdelphijstatic sem_ref 706293423Sdelphijdelete_sema( 707293423Sdelphij sem_ref obj) 708293423Sdelphij{ 709293423Sdelphij 710293423Sdelphij# ifdef SYS_WINNT 711293423Sdelphij 712293423Sdelphij if (obj) { 713293423Sdelphij if (obj->shnd) 714293423Sdelphij CloseHandle(obj->shnd); 715293423Sdelphij obj->shnd = NULL; 716293423Sdelphij } 717293423Sdelphij 718293423Sdelphij# else 719293423Sdelphij 720293423Sdelphij if (obj) 721293423Sdelphij sem_destroy(obj); 722293423Sdelphij 723293423Sdelphij# endif 724293423Sdelphij 725293423Sdelphij return NULL; 726293423Sdelphij} 727293423Sdelphij 728293423Sdelphij/* -------------------------------------------------------------------- 729275970Scy * prepare_child_sems() 730275970Scy * 731293423Sdelphij * create sync & access semaphores 732275970Scy * 733293423Sdelphij * All semaphores are cleared, only the access semaphore has 1 unit. 734293423Sdelphij * Childs wait on 'workitems_pending', then grabs 'sema_access' 735293423Sdelphij * and dequeues jobs. When done, 'sema_access' is given one unit back. 736293423Sdelphij * 737293423Sdelphij * The producer grabs 'sema_access', manages the queue, restores 738293423Sdelphij * 'sema_access' and puts one unit into 'workitems_pending'. 
739293423Sdelphij * 740293423Sdelphij * The story goes the same for the response queue. 741275970Scy */ 742275970Scystatic void 743275970Scyprepare_child_sems( 744275970Scy blocking_child *c 745275970Scy ) 746275970Scy{ 747298695Sdelphij if (NULL == worker_memlock) 748298695Sdelphij worker_memlock = create_sema(&worker_mmutex, 1, 1); 749298695Sdelphij 750293423Sdelphij c->accesslock = create_sema(&c->sem_table[0], 1, 1); 751293423Sdelphij c->workitems_pending = create_sema(&c->sem_table[1], 0, 0); 752293423Sdelphij c->wake_scheduled_sleep = create_sema(&c->sem_table[2], 0, 1); 753293423Sdelphij# ifndef WORK_PIPE 754293423Sdelphij c->responses_pending = create_sema(&c->sem_table[3], 0, 0); 755293423Sdelphij# endif 756275970Scy} 757275970Scy 758293423Sdelphij/* -------------------------------------------------------------------- 759293423Sdelphij * wait for semaphore. Where the wait can be interrupted, it will 760293423Sdelphij * internally resume -- When this function returns, there is either no 761293423Sdelphij * semaphore at all, a timeout occurred, or the caller could 762293423Sdelphij * successfully take a token from the semaphore. 763293423Sdelphij * 764293423Sdelphij * For untimed wait, not checking the result of this function at all is 765293423Sdelphij * definitely an option. 
766293423Sdelphij */ 767275970Scystatic int 768275970Scywait_for_sem( 769275970Scy sem_ref sem, 770275970Scy struct timespec * timeout /* wall-clock */ 771275970Scy ) 772275970Scy#ifdef SYS_WINNT 773275970Scy{ 774275970Scy struct timespec now; 775275970Scy struct timespec delta; 776275970Scy DWORD msec; 777275970Scy DWORD rc; 778275970Scy 779293423Sdelphij if (!(sem && sem->shnd)) { 780293423Sdelphij errno = EINVAL; 781293423Sdelphij return -1; 782293423Sdelphij } 783293423Sdelphij 784275970Scy if (NULL == timeout) { 785275970Scy msec = INFINITE; 786275970Scy } else { 787275970Scy getclock(TIMEOFDAY, &now); 788275970Scy delta = sub_tspec(*timeout, now); 789275970Scy if (delta.tv_sec < 0) { 790275970Scy msec = 0; 791275970Scy } else if ((delta.tv_sec + 1) >= (MAXDWORD / 1000)) { 792275970Scy msec = INFINITE; 793275970Scy } else { 794275970Scy msec = 1000 * (DWORD)delta.tv_sec; 795275970Scy msec += delta.tv_nsec / (1000 * 1000); 796275970Scy } 797275970Scy } 798293423Sdelphij rc = WaitForSingleObject(sem->shnd, msec); 799275970Scy if (WAIT_OBJECT_0 == rc) 800275970Scy return 0; 801275970Scy if (WAIT_TIMEOUT == rc) { 802275970Scy errno = ETIMEDOUT; 803275970Scy return -1; 804275970Scy } 805275970Scy msyslog(LOG_ERR, "WaitForSingleObject unexpected 0x%x", rc); 806275970Scy errno = EFAULT; 807275970Scy return -1; 808275970Scy} 809275970Scy#else /* pthreads wait_for_sem() follows */ 810275970Scy{ 811293423Sdelphij int rc = -1; 812275970Scy 813293423Sdelphij if (sem) do { 814293423Sdelphij if (NULL == timeout) 815293423Sdelphij rc = sem_wait(sem); 816293423Sdelphij else 817293423Sdelphij rc = sem_timedwait(sem, timeout); 818293423Sdelphij } while (rc == -1 && errno == EINTR); 819275970Scy else 820293423Sdelphij errno = EINVAL; 821293423Sdelphij 822275970Scy return rc; 823275970Scy} 824275970Scy#endif 825275970Scy 826293423Sdelphij/* -------------------------------------------------------------------- 827293423Sdelphij * blocking_thread - thread functions have WINAPI (aka 
'stdcall') 828293423Sdelphij * calling conventions under Windows and POSIX-defined signature 829293423Sdelphij * otherwise. 830275970Scy */ 831275970Scy#ifdef SYS_WINNT 832293423Sdelphiju_int WINAPI 833275970Scy#else 834275970Scyvoid * 835275970Scy#endif 836275970Scyblocking_thread( 837275970Scy void * ThreadArg 838275970Scy ) 839275970Scy{ 840275970Scy blocking_child *c; 841275970Scy 842275970Scy c = ThreadArg; 843275970Scy exit_worker(blocking_child_common(c)); 844275970Scy 845275970Scy /* NOTREACHED */ 846275970Scy return 0; 847275970Scy} 848275970Scy 849293423Sdelphij/* -------------------------------------------------------------------- 850275970Scy * req_child_exit() runs in the parent. 851293423Sdelphij * 852293423Sdelphij * This function is called from from the idle timer, too, and possibly 853293423Sdelphij * without a thread being there any longer. Since we have folded up our 854293423Sdelphij * tent in that case and all the semaphores are already gone, we simply 855293423Sdelphij * ignore this request in this case. 856293423Sdelphij * 857293423Sdelphij * Since the existence of the semaphores is controlled exclusively by 858293423Sdelphij * the parent, there's no risk of data race here. 859275970Scy */ 860275970Scyint 861275970Scyreq_child_exit( 862275970Scy blocking_child *c 863275970Scy ) 864275970Scy{ 865293423Sdelphij return (c->accesslock) 866293423Sdelphij ? queue_req_pointer(c, CHILD_EXIT_REQ) 867293423Sdelphij : 0; 868275970Scy} 869275970Scy 870293423Sdelphij/* -------------------------------------------------------------------- 871275970Scy * cleanup_after_child() runs in parent. 872275970Scy */ 873275970Scystatic void 874275970Scycleanup_after_child( 875275970Scy blocking_child * c 876275970Scy ) 877275970Scy{ 878275970Scy DEBUG_INSIST(!c->reusable); 879293423Sdelphij 880293423Sdelphij# ifdef SYS_WINNT 881293423Sdelphij /* The thread was not created in detached state, so we better 882293423Sdelphij * clean up. 
883293423Sdelphij */ 884293423Sdelphij if (c->thread_ref && c->thread_ref->thnd) { 885293423Sdelphij WaitForSingleObject(c->thread_ref->thnd, INFINITE); 886293423Sdelphij INSIST(CloseHandle(c->thread_ref->thnd)); 887293423Sdelphij c->thread_ref->thnd = NULL; 888293423Sdelphij } 889293423Sdelphij# endif 890275970Scy c->thread_ref = NULL; 891293423Sdelphij 892293423Sdelphij /* remove semaphores and (if signalling vi IO) pipes */ 893293423Sdelphij 894293423Sdelphij c->accesslock = delete_sema(c->accesslock); 895293423Sdelphij c->workitems_pending = delete_sema(c->workitems_pending); 896293423Sdelphij c->wake_scheduled_sleep = delete_sema(c->wake_scheduled_sleep); 897293423Sdelphij 898293423Sdelphij# ifdef WORK_PIPE 899275970Scy DEBUG_INSIST(-1 != c->resp_read_pipe); 900275970Scy DEBUG_INSIST(-1 != c->resp_write_pipe); 901275970Scy (*addremove_io_fd)(c->resp_read_pipe, c->ispipe, TRUE); 902275970Scy close(c->resp_write_pipe); 903275970Scy close(c->resp_read_pipe); 904275970Scy c->resp_write_pipe = -1; 905275970Scy c->resp_read_pipe = -1; 906293423Sdelphij# else 907293423Sdelphij DEBUG_INSIST(NULL != c->responses_pending); 908293423Sdelphij (*addremove_io_semaphore)(c->responses_pending->shnd, TRUE); 909293423Sdelphij c->responses_pending = delete_sema(c->responses_pending); 910293423Sdelphij# endif 911293423Sdelphij 912293423Sdelphij /* Is it necessary to check if there are pending requests and 913293423Sdelphij * responses? If so, and if there are, what to do with them? 914293423Sdelphij */ 915293423Sdelphij 916293423Sdelphij /* re-init buffer index sequencers */ 917293423Sdelphij c->head_workitem = 0; 918293423Sdelphij c->tail_workitem = 0; 919293423Sdelphij c->head_response = 0; 920293423Sdelphij c->tail_response = 0; 921293423Sdelphij 922275970Scy c->reusable = TRUE; 923275970Scy} 924275970Scy 925275970Scy 926275970Scy#else /* !WORK_THREAD follows */ 927275970Scychar work_thread_nonempty_compilation_unit; 928275970Scy#endif 929