1145510Sdarrenr/*
2145510Sdarrenr * work_thread.c - threads implementation for blocking worker child.
322514Sdarrenr */
453024Sguido#include <config.h>
522514Sdarrenr#include "ntp_workimpl.h"
680486Sdarrenr
722514Sdarrenr#ifdef WORK_THREAD
8145510Sdarrenr
9145510Sdarrenr#include <stdio.h>
10145510Sdarrenr#include <ctype.h>
1192686Sdarrenr#include <signal.h>
12145510Sdarrenr#ifndef SYS_WINNT
1322514Sdarrenr#include <pthread.h>
1422514Sdarrenr#endif
1522514Sdarrenr
1622514Sdarrenr#include "ntp_stdlib.h"
1722514Sdarrenr#include "ntp_malloc.h"
1822514Sdarrenr#include "ntp_syslog.h"
1931183Speter#include "ntpd.h"
2022514Sdarrenr#include "ntp_io.h"
2131183Speter#include "ntp_assert.h"
2231183Speter#include "ntp_unixtime.h"
2331183Speter#include "timespecops.h"
2431183Speter#include "ntp_worker.h"
2531183Speter
2622514Sdarrenr#define CHILD_EXIT_REQ	((blocking_pipe_header *)(intptr_t)-1)
27145510Sdarrenr#define CHILD_GONE_RESP	CHILD_EXIT_REQ
/* Queue size increments:
 * The request queue grows a bit faster than the response queue -- the
 * daemon can push requests and pull results faster on average than the
 * worker can process requests and push results...  Whether this really
 * pays off is debatable.
 */
3422514Sdarrenr#define WORKITEMS_ALLOC_INC	16
3522514Sdarrenr#define RESPONSES_ALLOC_INC	4
3622514Sdarrenr
3722514Sdarrenr/* Fiddle with min/max stack sizes. 64kB minimum seems to work, so we
3822514Sdarrenr * set the maximum to 256kB. If the minimum goes below the
3922514Sdarrenr * system-defined minimum stack size, we have to adjust accordingly.
4022514Sdarrenr */
4122514Sdarrenr#ifndef THREAD_MINSTACKSIZE
4222514Sdarrenr# define THREAD_MINSTACKSIZE	(64U * 1024)
4322514Sdarrenr#endif
4422514Sdarrenr
4522514Sdarrenr#ifndef THREAD_MAXSTACKSIZE
4622514Sdarrenr# define THREAD_MAXSTACKSIZE	(256U * 1024)
4722514Sdarrenr#endif
4822514Sdarrenr
4922514Sdarrenr/* need a good integer to store a pointer... */
5022514Sdarrenr#ifndef UINTPTR_T
5122514Sdarrenr# if defined(UINTPTR_MAX)
5222514Sdarrenr#  define UINTPTR_T uintptr_t
5322514Sdarrenr# elif defined(UINT_PTR)
5422514Sdarrenr#  define UINTPTR_T UINT_PTR
5522514Sdarrenr# else
5622514Sdarrenr#  define UINTPTR_T size_t
5722514Sdarrenr# endif
5822514Sdarrenr#endif
5922514Sdarrenr
6022514Sdarrenr
6122514Sdarrenr#ifdef SYS_WINNT
6222514Sdarrenr
6322514Sdarrenr# define thread_exit(c)	_endthreadex(c)
6422514Sdarrenr# define tickle_sem(sh) ReleaseSemaphore((sh->shnd), 1, NULL)
6522514Sdarrenru_int	WINAPI	blocking_thread(void *);
6622514Sdarrenrstatic BOOL	same_os_sema(const sem_ref obj, void * osobj);
6722514Sdarrenr
6822514Sdarrenr#else
6922514Sdarrenr
7022514Sdarrenr# define thread_exit(c)	pthread_exit((void*)(UINTPTR_T)(c))
71145510Sdarrenr# define tickle_sem	sem_post
7231183Spetervoid *		blocking_thread(void *);
73145510Sdarrenrstatic	void	block_thread_signals(sigset_t *);
7431183Speter
7522514Sdarrenr#endif
7622514Sdarrenr
7722514Sdarrenr#ifdef WORK_PIPE
7822514Sdarrenraddremove_io_fd_func		addremove_io_fd;
7931183Speter#else
8022514Sdarrenraddremove_io_semaphore_func	addremove_io_semaphore;
8122514Sdarrenr#endif
8222514Sdarrenr
8322514Sdarrenrstatic	void	start_blocking_thread(blocking_child *);
8422514Sdarrenrstatic	void	start_blocking_thread_internal(blocking_child *);
8522514Sdarrenrstatic	void	prepare_child_sems(blocking_child *);
8622514Sdarrenrstatic	int	wait_for_sem(sem_ref, struct timespec *);
8722514Sdarrenrstatic	int	ensure_workitems_empty_slot(blocking_child *);
8822514Sdarrenrstatic	int	ensure_workresp_empty_slot(blocking_child *);
8922514Sdarrenrstatic	int	queue_req_pointer(blocking_child *, blocking_pipe_header *);
9022514Sdarrenrstatic	void	cleanup_after_child(blocking_child *);
9122514Sdarrenr
9222514Sdarrenrstatic sema_type worker_mmutex;
9322514Sdarrenrstatic sem_ref   worker_memlock;
94145510Sdarrenr
95145510Sdarrenr/* --------------------------------------------------------------------
9622514Sdarrenr * locking the global worker state table (and other global stuff)
9722514Sdarrenr */
9822514Sdarrenrvoid
9922514Sdarrenrworker_global_lock(
10034739Speter	int inOrOut)
10122514Sdarrenr{
10222514Sdarrenr	if (worker_memlock) {
10372003Sdarrenr		if (inOrOut)
10422514Sdarrenr			wait_for_sem(worker_memlock, NULL);
105145510Sdarrenr		else
10672003Sdarrenr			tickle_sem(worker_memlock);
10772003Sdarrenr	}
10872003Sdarrenr}
10972003Sdarrenr
11072003Sdarrenr/* --------------------------------------------------------------------
11172003Sdarrenr * implementation isolation wrapper
11222514Sdarrenr */
/*
 * exit_worker() - terminate the calling worker thread with the given
 * exit code.  Thin wrapper over the platform-specific thread_exit()
 * macro (_endthreadex() on Windows, pthread_exit() elsewhere); it
 * does not return.
 */
void
exit_worker(
	int	exitcode
	)
{
	thread_exit(exitcode);	/* see #define thread_exit */
}
12022514Sdarrenr
/* --------------------------------------------------------------------
 * sleep for a given time or until the wakeup semaphore is tickled.
 */
/*
 * worker_sleep() - block for up to 'seconds' on the child's
 * wake_scheduled_sleep semaphore.
 *
 * Returns 0 when the full timeout elapsed, -1 when the sleep was
 * interrupted (semaphore tickled by interrupt_worker_sleep()) or on
 * any error.  Note the inverted-looking mapping: a successful
 * semaphore wait (rc == 0) means we were woken early, hence -1.
 */
int
worker_sleep(
	blocking_child *	c,
	time_t			seconds
	)
{
	struct timespec	until;	/* absolute wall-clock deadline */
	int		rc;

# ifdef HAVE_CLOCK_GETTIME
	if (0 != clock_gettime(CLOCK_REALTIME, &until)) {
		msyslog(LOG_ERR, "worker_sleep: clock_gettime() failed: %m");
		return -1;
	}
# else
	/* older systems (e.g. Tru64) provide getclock() instead */
	if (0 != getclock(TIMEOFDAY, &until)) {
		msyslog(LOG_ERR, "worker_sleep: getclock() failed: %m");
		return -1;
	}
# endif
	until.tv_sec += seconds;
	rc = wait_for_sem(c->wake_scheduled_sleep, &until);
	if (0 == rc)
		return -1;	/* tickled awake before the deadline */
	if (-1 == rc && ETIMEDOUT == errno)
		return 0;	/* slept the whole interval */
	msyslog(LOG_ERR, "worker_sleep: sem_timedwait: %m");
	return -1;
}
15334739Speter
15422514Sdarrenr
15522514Sdarrenr/* --------------------------------------------------------------------
15622514Sdarrenr * Wake up a worker that takes a nap.
15722514Sdarrenr */
15822514Sdarrenrvoid
15922514Sdarrenrinterrupt_worker_sleep(void)
16022514Sdarrenr{
16122514Sdarrenr	u_int			idx;
16222514Sdarrenr	blocking_child *	c;
16322514Sdarrenr
16422514Sdarrenr	for (idx = 0; idx < blocking_children_alloc; idx++) {
16534739Speter		c = blocking_children[idx];
16622514Sdarrenr		if (NULL == c || NULL == c->wake_scheduled_sleep)
167145510Sdarrenr			continue;
16822514Sdarrenr		tickle_sem(c->wake_scheduled_sleep);
16922514Sdarrenr	}
17022514Sdarrenr}
17122514Sdarrenr
17222514Sdarrenr/* --------------------------------------------------------------------
17322514Sdarrenr * Make sure there is an empty slot at the head of the request
174145510Sdarrenr * queue. Tell if the queue is currently empty.
17522514Sdarrenr */
static int
ensure_workitems_empty_slot(
	blocking_child *c
	)
{
	/*
	** !!! PRECONDITION: caller holds access lock!
	**
	** This simply tries to increase the size of the buffer if it
	** becomes full. The resize operation does *not* maintain the
	** order of requests, but that should be irrelevant since the
	** processing is considered asynchronous anyway.
	**
	** Return if the buffer is currently empty.
	*/

	static const size_t each =
	    sizeof(blocking_children[0]->workitems[0]);

	size_t	new_alloc;
	size_t  slots_used;
	size_t	sidx;

	/* head/tail are free-running counters; their difference is the
	 * number of occupied ring slots.
	 */
	slots_used = c->head_workitem - c->tail_workitem;
	if (slots_used >= c->workitems_alloc) {
		new_alloc  = c->workitems_alloc + WORKITEMS_ALLOC_INC;
		c->workitems = erealloc(c->workitems, new_alloc * each);
		for (sidx = c->workitems_alloc; sidx < new_alloc; ++sidx)
		    c->workitems[sidx] = NULL;
		/* rebase the ring: old (possibly rotated) contents now
		 * occupy slots [0, old_alloc); new head lands on the
		 * first freshly NULLed slot.
		 */
		c->tail_workitem   = 0;
		c->head_workitem   = c->workitems_alloc;
		c->workitems_alloc = new_alloc;
	}
	INSIST(NULL == c->workitems[c->head_workitem % c->workitems_alloc]);
	return (0 == slots_used);
}
21222514Sdarrenr
21322514Sdarrenr/* --------------------------------------------------------------------
21422514Sdarrenr * Make sure there is an empty slot at the head of the response
21522514Sdarrenr * queue. Tell if the queue is currently empty.
21622514Sdarrenr */
21722514Sdarrenrstatic int
21822514Sdarrenrensure_workresp_empty_slot(
21922514Sdarrenr	blocking_child *c
22022514Sdarrenr	)
22122514Sdarrenr{
22222514Sdarrenr	/*
22324583Sdarrenr	** !!! PRECONDITION: caller holds access lock!
22422514Sdarrenr	**
22522514Sdarrenr	** Works like the companion function above.
22622514Sdarrenr	*/
22722514Sdarrenr
22822514Sdarrenr	static const size_t each =
22922514Sdarrenr	    sizeof(blocking_children[0]->responses[0]);
23022514Sdarrenr
23122514Sdarrenr	size_t	new_alloc;
23222514Sdarrenr	size_t  slots_used;
23322514Sdarrenr	size_t	sidx;
23422514Sdarrenr
235145510Sdarrenr	slots_used = c->head_response - c->tail_response;
23622514Sdarrenr	if (slots_used >= c->responses_alloc) {
23722514Sdarrenr		new_alloc  = c->responses_alloc + RESPONSES_ALLOC_INC;
23822514Sdarrenr		c->responses = erealloc(c->responses, new_alloc * each);
23922514Sdarrenr		for (sidx = c->responses_alloc; sidx < new_alloc; ++sidx)
24022514Sdarrenr		    c->responses[sidx] = NULL;
24122514Sdarrenr		c->tail_response   = 0;
24222514Sdarrenr		c->head_response   = c->responses_alloc;
24322514Sdarrenr		c->responses_alloc = new_alloc;
24422514Sdarrenr	}
24522514Sdarrenr	INSIST(NULL == c->responses[c->head_response % c->responses_alloc]);
24622514Sdarrenr	return (0 == slots_used);
24722514Sdarrenr}
24822514Sdarrenr
24922514Sdarrenr
25022514Sdarrenr/* --------------------------------------------------------------------
25122514Sdarrenr * queue_req_pointer() - append a work item or idle exit request to
25222514Sdarrenr *			 blocking_workitems[]. Employ proper locking.
25322514Sdarrenr */
25422514Sdarrenrstatic int
255145510Sdarrenrqueue_req_pointer(
25634739Speter	blocking_child	*	c,
25734739Speter	blocking_pipe_header *	hdr
258145510Sdarrenr	)
25922514Sdarrenr{
26034739Speter	size_t qhead;
261145510Sdarrenr
26234739Speter	/* >>>> ACCESS LOCKING STARTS >>>> */
263145510Sdarrenr	wait_for_sem(c->accesslock, NULL);
264145510Sdarrenr	ensure_workitems_empty_slot(c);
265145510Sdarrenr	qhead = c->head_workitem;
26634739Speter	c->workitems[qhead % c->workitems_alloc] = hdr;
26734739Speter	c->head_workitem = 1 + qhead;
268145510Sdarrenr	tickle_sem(c->accesslock);
26922514Sdarrenr	/* <<<< ACCESS LOCKING ENDS <<<< */
270145510Sdarrenr
271145510Sdarrenr	/* queue consumer wake-up notification */
272145510Sdarrenr	tickle_sem(c->workitems_pending);
273145510Sdarrenr
27422514Sdarrenr	return 0;
275145510Sdarrenr}
276145510Sdarrenr
/* --------------------------------------------------------------------
 * API function to make sure a worker is running, a proper private copy
 * of the data is made, the data entered into the queue and the worker
 * is signalled.
 */
28234739Speterint
28322514Sdarrenrsend_blocking_req_internal(
28434739Speter	blocking_child *	c,
28534739Speter	blocking_pipe_header *	hdr,
28622514Sdarrenr	void *			data
28722514Sdarrenr	)
288145510Sdarrenr{
289145510Sdarrenr	blocking_pipe_header *	threadcopy;
29034739Speter	size_t			payload_octets;
291145510Sdarrenr
292145510Sdarrenr	REQUIRE(hdr != NULL);
29322514Sdarrenr	REQUIRE(data != NULL);
294145510Sdarrenr	DEBUG_REQUIRE(BLOCKING_REQ_MAGIC == hdr->magic_sig);
29522514Sdarrenr
29622514Sdarrenr	if (hdr->octets <= sizeof(*hdr))
29722514Sdarrenr		return 1;	/* failure */
29822514Sdarrenr	payload_octets = hdr->octets - sizeof(*hdr);
29922514Sdarrenr
30022514Sdarrenr	if (NULL == c->thread_ref)
30122514Sdarrenr		start_blocking_thread(c);
30222514Sdarrenr	threadcopy = emalloc(hdr->octets);
30322514Sdarrenr	memcpy(threadcopy, hdr, sizeof(*hdr));
30422514Sdarrenr	memcpy((char *)threadcopy + sizeof(*hdr), data, payload_octets);
30522514Sdarrenr
30622514Sdarrenr	return queue_req_pointer(c, threadcopy);
30722514Sdarrenr}
30822514Sdarrenr
30922514Sdarrenr/* --------------------------------------------------------------------
31022514Sdarrenr * Wait for the 'incoming queue no longer empty' signal, lock the shared
31122514Sdarrenr * structure and dequeue an item.
31222514Sdarrenr */
/*
 * receive_blocking_req_internal() - called by the worker thread.
 * Blocks until a request is available, dequeues it and returns it.
 * Returns NULL when the dequeued item was the CHILD_EXIT_REQ sentinel,
 * after acknowledging with CHILD_GONE_RESP.
 */
blocking_pipe_header *
receive_blocking_req_internal(
	blocking_child *	c
	)
{
	blocking_pipe_header *	req;
	size_t			qhead, qtail;

	req = NULL;
	do {
		/* wait for tickle from the producer side */
		wait_for_sem(c->workitems_pending, NULL);

		/* >>>> ACCESS LOCKING STARTS >>>> */
		wait_for_sem(c->accesslock, NULL);
		qhead = c->head_workitem;
		/* inner loop skips any NULL slots until a real item or
		 * the empty-queue condition (head == tail) is hit
		 */
		do {
			qtail = c->tail_workitem;
			if (qhead == qtail)
				break;
			c->tail_workitem = qtail + 1;
			qtail %= c->workitems_alloc;
			req = c->workitems[qtail];
			c->workitems[qtail] = NULL;
		} while (NULL == req);
		tickle_sem(c->accesslock);
		/* <<<< ACCESS LOCKING ENDS <<<< */

	} while (NULL == req);

	INSIST(NULL != req);
	if (CHILD_EXIT_REQ == req) {	/* idled out */
		send_blocking_resp_internal(c, CHILD_GONE_RESP);
		req = NULL;
	}

	return req;
}
35122514Sdarrenr
35222514Sdarrenr/* --------------------------------------------------------------------
35322514Sdarrenr * Push a response into the return queue and eventually tickle the
35422514Sdarrenr * receiver.
35522514Sdarrenr */
/*
 * send_blocking_resp_internal() - called by the worker thread.
 * Enqueue 'resp' on the child's response ring and, if the ring was
 * empty before, notify the main thread -- via a one-byte pipe write
 * (WORK_PIPE) or a semaphore (Windows).  Always returns 0.
 */
int
send_blocking_resp_internal(
	blocking_child *	c,
	blocking_pipe_header *	resp
	)
{
	size_t	qhead;
	int	empty;

	/* >>>> ACCESS LOCKING STARTS >>>> */
	wait_for_sem(c->accesslock, NULL);
	empty = ensure_workresp_empty_slot(c);
	qhead = c->head_response;
	c->responses[qhead % c->responses_alloc] = resp;
	c->head_response = 1 + qhead;
	tickle_sem(c->accesslock);
	/* <<<< ACCESS LOCKING ENDS <<<< */

	/* queue consumer wake-up notification -- only needed on the
	 * empty->non-empty transition; the consumer drains the whole
	 * queue per notification
	 */
	if (empty)
	{
#	    ifdef WORK_PIPE
		if (1 != write(c->resp_write_pipe, "", 1))
			msyslog(LOG_WARNING, "async resolver: blocking_get%sinfo"
				" failed to notify main thread!",
				(BLOCKING_GETNAMEINFO == resp->rtype)
				    ? "name"
				    : "addr"
				);
#	    else
		tickle_sem(c->responses_pending);
#	    endif
	}
	return 0;
}
391
392
393#ifndef WORK_PIPE
394
395/* --------------------------------------------------------------------
396 * Check if a (Windows-)handle to a semaphore is actually the same we
397 * are using inside the sema wrapper.
398 */
399static BOOL
400same_os_sema(
401	const sem_ref	obj,
402	void*		osh
403	)
404{
405	return obj && osh && (obj->shnd == (HANDLE)osh);
406}
407
408/* --------------------------------------------------------------------
409 * Find the shared context that associates to an OS handle and make sure
410 * the data is dequeued and processed.
411 */
412void
413handle_blocking_resp_sem(
414	void *	context
415	)
416{
417	blocking_child *	c;
418	u_int			idx;
419
420	c = NULL;
421	for (idx = 0; idx < blocking_children_alloc; idx++) {
422		c = blocking_children[idx];
423		if (c != NULL &&
424			c->thread_ref != NULL &&
425			same_os_sema(c->responses_pending, context))
426			break;
427	}
428	if (idx < blocking_children_alloc)
429		process_blocking_resp(c);
430}
431#endif	/* !WORK_PIPE */
432
433/* --------------------------------------------------------------------
434 * Fetch the next response from the return queue. In case of signalling
435 * via pipe, make sure the pipe is flushed, too.
436 */
/*
 * receive_blocking_resp_internal() - called by the main thread.
 * Drain the notification pipe (if signalling via I/O), then dequeue
 * and return the next response.  Returns NULL when the queue is empty
 * or when the CHILD_GONE_RESP sentinel was found (after tearing down
 * the child's resources).
 */
blocking_pipe_header *
receive_blocking_resp_internal(
	blocking_child *	c
	)
{
	blocking_pipe_header *	removed;
	size_t			qhead, qtail, slot;

#ifdef WORK_PIPE
	int			rc;
	char			scratch[32];

	/* flush the wake-up byte(s); pipe is O_NONBLOCK, retry on
	 * signal interruption only
	 */
	do
		rc = read(c->resp_read_pipe, scratch, sizeof(scratch));
	while (-1 == rc && EINTR == errno);
#endif

	/* >>>> ACCESS LOCKING STARTS >>>> */
	wait_for_sem(c->accesslock, NULL);
	qhead = c->head_response;
	qtail = c->tail_response;
	/* skip NULL slots until an item is found or the ring is empty */
	for (removed = NULL; !removed && (qhead != qtail); ++qtail) {
		slot = qtail % c->responses_alloc;
		removed = c->responses[slot];
		c->responses[slot] = NULL;
	}
	c->tail_response = qtail;
	tickle_sem(c->accesslock);
	/* <<<< ACCESS LOCKING ENDS <<<< */

	if (NULL != removed) {
		DEBUG_ENSURE(CHILD_GONE_RESP == removed ||
			     BLOCKING_RESP_MAGIC == removed->magic_sig);
	}
	if (CHILD_GONE_RESP == removed) {
		cleanup_after_child(c);
		removed = NULL;
	}

	return removed;
}
478
479/* --------------------------------------------------------------------
480 * Light up a new worker.
481 */
482static void
483start_blocking_thread(
484	blocking_child *	c
485	)
486{
487
488	DEBUG_INSIST(!c->reusable);
489
490	prepare_child_sems(c);
491	start_blocking_thread_internal(c);
492}
493
494/* --------------------------------------------------------------------
495 * Create a worker thread. There are several differences between POSIX
496 * and Windows, of course -- most notably the Windows thread is a
497 * detached thread, and we keep the handle around until we want to get
498 * rid of the thread. The notification scheme also differs: Windows
499 * makes use of semaphores in both directions, POSIX uses a pipe for
500 * integration with 'select()' or alike.
501 */
502static void
503start_blocking_thread_internal(
504	blocking_child *	c
505	)
506#ifdef SYS_WINNT
507{
508	BOOL	resumed;
509
510	c->thread_ref = NULL;
511	(*addremove_io_semaphore)(c->responses_pending->shnd, FALSE);
512	c->thr_table[0].thnd =
513		(HANDLE)_beginthreadex(
514			NULL,
515			0,
516			&blocking_thread,
517			c,
518			CREATE_SUSPENDED,
519			NULL);
520
521	if (NULL == c->thr_table[0].thnd) {
522		msyslog(LOG_ERR, "start blocking thread failed: %m");
523		exit(-1);
524	}
525	/* remember the thread priority is only within the process class */
526	if (!SetThreadPriority(c->thr_table[0].thnd,
527			       THREAD_PRIORITY_BELOW_NORMAL)) {
528		msyslog(LOG_ERR, "Error lowering blocking thread priority: %m");
529	}
530	if (NULL != pSetThreadDescription) {
531		(*pSetThreadDescription)(c->thr_table[0].thnd, L"ntp_worker");
532	}
533	resumed = ResumeThread(c->thr_table[0].thnd);
534	DEBUG_INSIST(resumed);
535	c->thread_ref = &c->thr_table[0];
536}
537#else	/* pthreads start_blocking_thread_internal() follows */
538{
539# ifdef NEED_PTHREAD_INIT
540	static int	pthread_init_called;
541# endif
542	pthread_attr_t	thr_attr;
543	int		rc;
544	int		pipe_ends[2];	/* read then write */
545	int		is_pipe;
546	int		flags;
547	size_t		ostacksize;
548	size_t		nstacksize;
549	sigset_t	saved_sig_mask;
550
551	c->thread_ref = NULL;
552
553# ifdef NEED_PTHREAD_INIT
554	/*
555	 * from lib/isc/unix/app.c:
556	 * BSDI 3.1 seg faults in pthread_sigmask() if we don't do this.
557	 */
558	if (!pthread_init_called) {
559		pthread_init();
560		pthread_init_called = TRUE;
561	}
562# endif
563
564	rc = pipe_socketpair(&pipe_ends[0], &is_pipe);
565	if (0 != rc) {
566		msyslog(LOG_ERR, "start_blocking_thread: pipe_socketpair() %m");
567		exit(1);
568	}
569	c->resp_read_pipe = move_fd(pipe_ends[0]);
570	c->resp_write_pipe = move_fd(pipe_ends[1]);
571	c->ispipe = is_pipe;
572	flags = fcntl(c->resp_read_pipe, F_GETFL, 0);
573	if (-1 == flags) {
574		msyslog(LOG_ERR, "start_blocking_thread: fcntl(F_GETFL) %m");
575		exit(1);
576	}
577	rc = fcntl(c->resp_read_pipe, F_SETFL, O_NONBLOCK | flags);
578	if (-1 == rc) {
579		msyslog(LOG_ERR,
580			"start_blocking_thread: fcntl(F_SETFL, O_NONBLOCK) %m");
581		exit(1);
582	}
583	(*addremove_io_fd)(c->resp_read_pipe, c->ispipe, FALSE);
584	pthread_attr_init(&thr_attr);
585	pthread_attr_setdetachstate(&thr_attr, PTHREAD_CREATE_DETACHED);
586#if defined(HAVE_PTHREAD_ATTR_GETSTACKSIZE) && \
587    defined(HAVE_PTHREAD_ATTR_SETSTACKSIZE)
588	rc = pthread_attr_getstacksize(&thr_attr, &ostacksize);
589	if (0 != rc) {
590		msyslog(LOG_ERR,
591			"start_blocking_thread: pthread_attr_getstacksize() -> %s",
592			strerror(rc));
593	} else {
594		nstacksize = ostacksize;
595		/* order is important here: first clamp on upper limit,
596		 * and the PTHREAD min stack size is ultimate override!
597		 */
598		if (nstacksize > THREAD_MAXSTACKSIZE)
599			nstacksize = THREAD_MAXSTACKSIZE;
600#            ifdef PTHREAD_STACK_MAX
601		if (nstacksize > PTHREAD_STACK_MAX)
602			nstacksize = PTHREAD_STACK_MAX;
603#            endif
604
605		/* now clamp on lower stack limit. */
606		if (nstacksize < THREAD_MINSTACKSIZE)
607			nstacksize = THREAD_MINSTACKSIZE;
608#            ifdef PTHREAD_STACK_MIN
609		if (nstacksize < PTHREAD_STACK_MIN)
610			nstacksize = PTHREAD_STACK_MIN;
611#            endif
612
613		if (nstacksize != ostacksize)
614			rc = pthread_attr_setstacksize(&thr_attr, nstacksize);
615		if (0 != rc)
616			msyslog(LOG_ERR,
617				"start_blocking_thread: pthread_attr_setstacksize(0x%lx -> 0x%lx) -> %s",
618				(u_long)ostacksize, (u_long)nstacksize,
619				strerror(rc));
620	}
621#else
622	UNUSED_ARG(nstacksize);
623	UNUSED_ARG(ostacksize);
624#endif
625#if defined(PTHREAD_SCOPE_SYSTEM) && defined(NEED_PTHREAD_SCOPE_SYSTEM)
626	pthread_attr_setscope(&thr_attr, PTHREAD_SCOPE_SYSTEM);
627#endif
628	c->thread_ref = emalloc_zero(sizeof(*c->thread_ref));
629	block_thread_signals(&saved_sig_mask);
630	rc = pthread_create(&c->thr_table[0], &thr_attr,
631			    &blocking_thread, c);
632	pthread_sigmask(SIG_SETMASK, &saved_sig_mask, NULL);
633	pthread_attr_destroy(&thr_attr);
634	if (0 != rc) {
635		msyslog(LOG_ERR, "start_blocking_thread: pthread_create() -> %s",
636			strerror(rc));
637		exit(1);
638	}
639	c->thread_ref = &c->thr_table[0];
640}
641#endif
642
643/* --------------------------------------------------------------------
644 * block_thread_signals()
645 *
646 * Temporarily block signals used by ntpd main thread, so that signal
647 * mask inherited by child threads leaves them blocked.  Returns prior
648 * active signal mask via pmask, to be restored by the main thread
649 * after pthread_create().
650 */
651#ifndef SYS_WINNT
652void
653block_thread_signals(
654	sigset_t *	pmask
655	)
656{
657	sigset_t	block;
658
659	sigemptyset(&block);
660# ifdef HAVE_SIGNALED_IO
661#  ifdef SIGIO
662	sigaddset(&block, SIGIO);
663#  endif
664#  ifdef SIGPOLL
665	sigaddset(&block, SIGPOLL);
666#  endif
667# endif	/* HAVE_SIGNALED_IO */
668	sigaddset(&block, SIGALRM);
669	sigaddset(&block, MOREDEBUGSIG);
670	sigaddset(&block, LESSDEBUGSIG);
671# ifdef SIGDIE1
672	sigaddset(&block, SIGDIE1);
673# endif
674# ifdef SIGDIE2
675	sigaddset(&block, SIGDIE2);
676# endif
677# ifdef SIGDIE3
678	sigaddset(&block, SIGDIE3);
679# endif
680# ifdef SIGDIE4
681	sigaddset(&block, SIGDIE4);
682# endif
683# ifdef SIGBUS
684	sigaddset(&block, SIGBUS);
685# endif
686	sigemptyset(pmask);
687	pthread_sigmask(SIG_BLOCK, &block, pmask);
688}
689#endif	/* !SYS_WINNT */
690
691
692/* --------------------------------------------------------------------
693 * Create & destroy semaphores. This is sufficiently different between
694 * POSIX and Windows to warrant wrapper functions and close enough to
695 * use the concept of synchronization via semaphore for all platforms.
696 */
/*
 * create_sema() - initialize the semaphore storage at 'semptr' with
 * initial value 'inival' and (Windows only) maximum 'maxval'; maxval
 * is ignored for POSIX semaphores.  Returns semptr on success, NULL
 * on failure or when semptr is NULL.
 */
static sem_ref
create_sema(
	sema_type*	semptr,
	u_int		inival,
	u_int		maxval)
{
#ifdef SYS_WINNT

	long svini, svmax;
	if (NULL != semptr) {
		/* clamp into the LONG range CreateSemaphore accepts;
		 * maxval == 0 means "unbounded" -> LONG_MAX
		 */
		svini = (inival < LONG_MAX)
		    ? (long)inival : LONG_MAX;
		svmax = (maxval < LONG_MAX && maxval > 0)
		    ? (long)maxval : LONG_MAX;
		semptr->shnd = CreateSemaphore(NULL, svini, svmax, NULL);
		if (NULL == semptr->shnd)
			semptr = NULL;
	}

#else

	(void)maxval;
	/* second arg FALSE: semaphore is shared between threads of
	 * this process only, not across processes
	 */
	if (semptr && sem_init(semptr, FALSE, inival))
		semptr = NULL;

#endif

	return semptr;
}
726
727/* ------------------------------------------------------------------ */
728static sem_ref
729delete_sema(
730	sem_ref obj)
731{
732
733#   ifdef SYS_WINNT
734
735	if (obj) {
736		if (obj->shnd)
737			CloseHandle(obj->shnd);
738		obj->shnd = NULL;
739	}
740
741#   else
742
743	if (obj)
744		sem_destroy(obj);
745
746#   endif
747
748	return NULL;
749}
750
751/* --------------------------------------------------------------------
752 * prepare_child_sems()
753 *
754 * create sync & access semaphores
755 *
756 * All semaphores are cleared, only the access semaphore has 1 unit.
757 * Childs wait on 'workitems_pending', then grabs 'sema_access'
758 * and dequeues jobs. When done, 'sema_access' is given one unit back.
759 *
760 * The producer grabs 'sema_access', manages the queue, restores
761 * 'sema_access' and puts one unit into 'workitems_pending'.
762 *
763 * The story goes the same for the response queue.
764 */
static void
prepare_child_sems(
	blocking_child *c
	)
{
	/* global worker-state mutex is shared; create it only once */
	if (NULL == worker_memlock)
		worker_memlock = create_sema(&worker_mmutex, 1, 1);

	/* access lock starts with 1 unit (free); the signalling
	 * semaphores start empty
	 */
	c->accesslock           = create_sema(&c->sem_table[0], 1, 1);
	c->workitems_pending    = create_sema(&c->sem_table[1], 0, 0);
	c->wake_scheduled_sleep = create_sema(&c->sem_table[2], 0, 1);
#   ifndef WORK_PIPE
	/* without a pipe, responses are signalled by semaphore, too */
	c->responses_pending    = create_sema(&c->sem_table[3], 0, 0);
#   endif
}
780
781/* --------------------------------------------------------------------
782 * wait for semaphore. Where the wait can be interrupted, it will
783 * internally resume -- When this function returns, there is either no
784 * semaphore at all, a timeout occurred, or the caller could
785 * successfully take a token from the semaphore.
786 *
787 * For untimed wait, not checking the result of this function at all is
788 * definitely an option.
789 */
/*
 * wait_for_sem() - take one unit from 'sem', waiting until the
 * absolute wall-clock time '*timeout' (NULL = wait forever).
 * Returns 0 on success, -1 with errno set (ETIMEDOUT on timeout,
 * EINVAL for a missing semaphore).  EINTR is retried internally on
 * POSIX, so callers never see it.
 */
static int
wait_for_sem(
	sem_ref			sem,
	struct timespec *	timeout		/* wall-clock */
	)
#ifdef SYS_WINNT
{
	struct timespec now;
	struct timespec delta;
	DWORD		msec;
	DWORD		rc;

	if (!(sem && sem->shnd)) {
		errno = EINVAL;
		return -1;
	}

	if (NULL == timeout) {
		msec = INFINITE;
	} else {
		/* convert the absolute deadline to a relative delay
		 * in milliseconds for WaitForSingleObject()
		 */
		getclock(TIMEOFDAY, &now);
		delta = sub_tspec(*timeout, now);
		if (delta.tv_sec < 0) {
			msec = 0;
		} else if ((delta.tv_sec + 1) >= (MAXDWORD / 1000)) {
			msec = INFINITE;
		} else {
			msec = 1000 * (DWORD)delta.tv_sec;
			msec += delta.tv_nsec / (1000 * 1000);
		}
	}
	rc = WaitForSingleObject(sem->shnd, msec);
	if (WAIT_OBJECT_0 == rc)
		return 0;
	if (WAIT_TIMEOUT == rc) {
		errno = ETIMEDOUT;
		return -1;
	}
	msyslog(LOG_ERR, "WaitForSingleObject unexpected 0x%x", rc);
	errno = EFAULT;
	return -1;
}
#else	/* pthreads wait_for_sem() follows */
{
	int rc = -1;

	/* retry on EINTR so signal delivery cannot fake a failure */
	if (sem) do {
			if (NULL == timeout)
				rc = sem_wait(sem);
			else
				rc = sem_timedwait(sem, timeout);
		} while (rc == -1 && errno == EINTR);
	else
		errno = EINVAL;

	return rc;
}
#endif
848
849/* --------------------------------------------------------------------
850 * blocking_thread - thread functions have WINAPI (aka 'stdcall')
851 * calling conventions under Windows and POSIX-defined signature
852 * otherwise.
853 */
#ifdef SYS_WINNT
u_int WINAPI
#else
void *
#endif
blocking_thread(
	void *	ThreadArg
	)
{
	blocking_child *c;

	/* ThreadArg is the blocking_child passed to thread creation */
	c = ThreadArg;
	/* run the request loop; its exit code terminates this thread */
	exit_worker(blocking_child_common(c));

	/* NOTREACHED */
	return 0;
}
871
872/* --------------------------------------------------------------------
873 * req_child_exit() runs in the parent.
874 *
875 * This function is called from from the idle timer, too, and possibly
876 * without a thread being there any longer. Since we have folded up our
877 * tent in that case and all the semaphores are already gone, we simply
878 * ignore this request in this case.
879 *
880 * Since the existence of the semaphores is controlled exclusively by
881 * the parent, there's no risk of data race here.
882 */
883int
884req_child_exit(
885	blocking_child *c
886	)
887{
888	return (c->accesslock)
889	    ? queue_req_pointer(c, CHILD_EXIT_REQ)
890	    : 0;
891}
892
893/* --------------------------------------------------------------------
894 * cleanup_after_child() runs in parent.
895 */
/*
 * cleanup_after_child() - parent side: reclaim all per-child
 * resources (thread handle on Windows, semaphores, notification pipe
 * or semaphore), reset the queue indices and mark the child reusable.
 */
static void
cleanup_after_child(
	blocking_child *	c
	)
{
	DEBUG_INSIST(!c->reusable);

#   ifdef SYS_WINNT
	/* The thread was not created in detached state, so we better
	 * clean up.
	 */
	if (c->thread_ref && c->thread_ref->thnd) {
		WaitForSingleObject(c->thread_ref->thnd, INFINITE);
		INSIST(CloseHandle(c->thread_ref->thnd));
		c->thread_ref->thnd = NULL;
	}
#   endif
	c->thread_ref = NULL;

	/* remove semaphores and (if signalling via I/O) pipes */

	c->accesslock           = delete_sema(c->accesslock);
	c->workitems_pending    = delete_sema(c->workitems_pending);
	c->wake_scheduled_sleep = delete_sema(c->wake_scheduled_sleep);

#   ifdef WORK_PIPE
	DEBUG_INSIST(-1 != c->resp_read_pipe);
	DEBUG_INSIST(-1 != c->resp_write_pipe);
	/* deregister the read end before closing it */
	(*addremove_io_fd)(c->resp_read_pipe, c->ispipe, TRUE);
	close(c->resp_write_pipe);
	close(c->resp_read_pipe);
	c->resp_write_pipe = -1;
	c->resp_read_pipe = -1;
#   else
	DEBUG_INSIST(NULL != c->responses_pending);
	(*addremove_io_semaphore)(c->responses_pending->shnd, TRUE);
	c->responses_pending = delete_sema(c->responses_pending);
#   endif

	/* Is it necessary to check if there are pending requests and
	 * responses? If so, and if there are, what to do with them?
	 */

	/* re-init buffer index sequencers */
	c->head_workitem = 0;
	c->tail_workitem = 0;
	c->head_response = 0;
	c->tail_response = 0;

	c->reusable = TRUE;
}
947
948
949#else	/* !WORK_THREAD follows */
950char work_thread_nonempty_compilation_unit;
951#endif
952