1275970Scy/*
2275970Scy * work_thread.c - threads implementation for blocking worker child.
3275970Scy */
4275970Scy#include <config.h>
5275970Scy#include "ntp_workimpl.h"
6275970Scy
7275970Scy#ifdef WORK_THREAD
8275970Scy
9275970Scy#include <stdio.h>
10275970Scy#include <ctype.h>
11275970Scy#include <signal.h>
12275970Scy#ifndef SYS_WINNT
13275970Scy#include <pthread.h>
14275970Scy#endif
15275970Scy
16275970Scy#include "ntp_stdlib.h"
17275970Scy#include "ntp_malloc.h"
18275970Scy#include "ntp_syslog.h"
19275970Scy#include "ntpd.h"
20275970Scy#include "ntp_io.h"
21275970Scy#include "ntp_assert.h"
22275970Scy#include "ntp_unixtime.h"
23275970Scy#include "timespecops.h"
24275970Scy#include "ntp_worker.h"
25275970Scy
26275970Scy#define CHILD_EXIT_REQ	((blocking_pipe_header *)(intptr_t)-1)
27275970Scy#define CHILD_GONE_RESP	CHILD_EXIT_REQ
/* Queue size increments:
 * The request queue grows a bit faster than the response queue -- the
 * daemon can push requests and pull results faster on average than the
 * worker can process requests and push results...  Whether this really
 * pays off is debatable.
 */
34275970Scy#define WORKITEMS_ALLOC_INC	16
35275970Scy#define RESPONSES_ALLOC_INC	4
36275970Scy
37294554Sdelphij/* Fiddle with min/max stack sizes. 64kB minimum seems to work, so we
38294554Sdelphij * set the maximum to 256kB. If the minimum goes below the
39294554Sdelphij * system-defined minimum stack size, we have to adjust accordingly.
40294554Sdelphij */
41275970Scy#ifndef THREAD_MINSTACKSIZE
42294554Sdelphij# define THREAD_MINSTACKSIZE	(64U * 1024)
43275970Scy#endif
44294554Sdelphij#ifndef __sun
45294554Sdelphij#if defined(PTHREAD_STACK_MIN) && THREAD_MINSTACKSIZE < PTHREAD_STACK_MIN
46294554Sdelphij# undef THREAD_MINSTACKSIZE
47294554Sdelphij# define THREAD_MINSTACKSIZE PTHREAD_STACK_MIN
48294554Sdelphij#endif
49294554Sdelphij#endif
50275970Scy
51294554Sdelphij#ifndef THREAD_MAXSTACKSIZE
52294554Sdelphij# define THREAD_MAXSTACKSIZE	(256U * 1024)
53294554Sdelphij#endif
54294554Sdelphij#if THREAD_MAXSTACKSIZE < THREAD_MINSTACKSIZE
55294554Sdelphij# undef  THREAD_MAXSTACKSIZE
56294554Sdelphij# define THREAD_MAXSTACKSIZE THREAD_MINSTACKSIZE
57294554Sdelphij#endif
58294554Sdelphij
59338530Sdelphij/* need a good integer to store a pointer... */
60338530Sdelphij#ifndef UINTPTR_T
61338530Sdelphij# if defined(UINTPTR_MAX)
62338530Sdelphij#  define UINTPTR_T uintptr_t
63338530Sdelphij# elif defined(UINT_PTR)
64338530Sdelphij#  define UINTPTR_T UINT_PTR
65338530Sdelphij# else
66338530Sdelphij#  define UINTPTR_T size_t
67338530Sdelphij# endif
68338530Sdelphij#endif
69294554Sdelphij
70338530Sdelphij
71293423Sdelphij#ifdef SYS_WINNT
72275970Scy
73275970Scy# define thread_exit(c)	_endthreadex(c)
74293423Sdelphij# define tickle_sem(sh) ReleaseSemaphore((sh->shnd), 1, NULL)
75293423Sdelphiju_int	WINAPI	blocking_thread(void *);
76293423Sdelphijstatic BOOL	same_os_sema(const sem_ref obj, void * osobj);
77293423Sdelphij
78275970Scy#else
79293423Sdelphij
80338530Sdelphij# define thread_exit(c)	pthread_exit((void*)(UINTPTR_T)(c))
81275970Scy# define tickle_sem	sem_post
82293423Sdelphijvoid *		blocking_thread(void *);
83293423Sdelphijstatic	void	block_thread_signals(sigset_t *);
84293423Sdelphij
85275970Scy#endif
86275970Scy
87275970Scy#ifdef WORK_PIPE
88275970Scyaddremove_io_fd_func		addremove_io_fd;
89275970Scy#else
90275970Scyaddremove_io_semaphore_func	addremove_io_semaphore;
91275970Scy#endif
92275970Scy
93275970Scystatic	void	start_blocking_thread(blocking_child *);
94275970Scystatic	void	start_blocking_thread_internal(blocking_child *);
95275970Scystatic	void	prepare_child_sems(blocking_child *);
96275970Scystatic	int	wait_for_sem(sem_ref, struct timespec *);
97293423Sdelphijstatic	int	ensure_workitems_empty_slot(blocking_child *);
98293423Sdelphijstatic	int	ensure_workresp_empty_slot(blocking_child *);
99275970Scystatic	int	queue_req_pointer(blocking_child *, blocking_pipe_header *);
100275970Scystatic	void	cleanup_after_child(blocking_child *);
101275970Scy
102298695Sdelphijstatic sema_type worker_mmutex;
103298695Sdelphijstatic sem_ref   worker_memlock;
104275970Scy
105298695Sdelphij/* --------------------------------------------------------------------
106298695Sdelphij * locking the global worker state table (and other global stuff)
107298695Sdelphij */
108275970Scyvoid
109298695Sdelphijworker_global_lock(
110298695Sdelphij	int inOrOut)
111298695Sdelphij{
112298695Sdelphij	if (worker_memlock) {
113298695Sdelphij		if (inOrOut)
114298695Sdelphij			wait_for_sem(worker_memlock, NULL);
115298695Sdelphij		else
116298695Sdelphij			tickle_sem(worker_memlock);
117298695Sdelphij	}
118298695Sdelphij}
119298695Sdelphij
/* --------------------------------------------------------------------
 * implementation isolation wrapper
 *
 * Terminate the calling worker thread with the given exit code.  Maps
 * to _endthreadex() on Windows and pthread_exit() otherwise -- see the
 * thread_exit() macro definitions near the top of this file.
 */
void
exit_worker(
	int	exitcode
	)
{
	thread_exit(exitcode);	/* see #define thread_exit */
}
130275970Scy
/* --------------------------------------------------------------------
 * sleep for a given time or until the wakeup semaphore is tickled.
 *
 * Returns 0 when the full sleep time elapsed (the semaphore wait timed
 * out) and -1 otherwise -- either the nap was cut short by a post to
 * 'wake_scheduled_sleep' or an error occurred.
 */
int
worker_sleep(
	blocking_child *	c,
	time_t			seconds
	)
{
	struct timespec	until;
	int		rc;

# ifdef HAVE_CLOCK_GETTIME
	if (0 != clock_gettime(CLOCK_REALTIME, &until)) {
		msyslog(LOG_ERR, "worker_sleep: clock_gettime() failed: %m");
		return -1;
	}
# else
	if (0 != getclock(TIMEOFDAY, &until)) {
		msyslog(LOG_ERR, "worker_sleep: getclock() failed: %m");
		return -1;
	}
# endif
	/* build the absolute wall-clock deadline for the wait */
	until.tv_sec += seconds;
	rc = wait_for_sem(c->wake_scheduled_sleep, &until);
	if (0 == rc)
		return -1;	/* semaphore tickled -- sleep interrupted */
	if (-1 == rc && ETIMEDOUT == errno)
		return 0;	/* deadline reached -- sleep completed */
	msyslog(LOG_ERR, "worker_sleep: sem_timedwait: %m");
	return -1;
}
163275970Scy
164275970Scy
165293423Sdelphij/* --------------------------------------------------------------------
166293423Sdelphij * Wake up a worker that takes a nap.
167293423Sdelphij */
168275970Scyvoid
169275970Scyinterrupt_worker_sleep(void)
170275970Scy{
171275970Scy	u_int			idx;
172275970Scy	blocking_child *	c;
173275970Scy
174275970Scy	for (idx = 0; idx < blocking_children_alloc; idx++) {
175275970Scy		c = blocking_children[idx];
176275970Scy		if (NULL == c || NULL == c->wake_scheduled_sleep)
177275970Scy			continue;
178275970Scy		tickle_sem(c->wake_scheduled_sleep);
179275970Scy	}
180275970Scy}
181275970Scy
/* --------------------------------------------------------------------
 * Make sure there is an empty slot at the head of the request
 * queue. Tell if the queue is currently empty.
 */
static int
ensure_workitems_empty_slot(
	blocking_child *c
	)
{
	/*
	** !!! PRECONDITION: caller holds access lock!
	**
	** This simply tries to increase the size of the buffer if it
	** becomes full. The resize operation does *not* maintain the
	** order of requests, but that should be irrelevant since the
	** processing is considered asynchronous anyway.
	**
	** Return if the buffer is currently empty.
	*/

	static const size_t each =
	    sizeof(blocking_children[0]->workitems[0]);

	size_t	new_alloc;
	size_t  slots_used;
	size_t	sidx;

	/* head/tail are free-running counters; their difference is the
	 * number of occupied slots in the ring buffer.
	 */
	slots_used = c->head_workitem - c->tail_workitem;
	if (slots_used >= c->workitems_alloc) {
		new_alloc  = c->workitems_alloc + WORKITEMS_ALLOC_INC;
		c->workitems = erealloc(c->workitems, new_alloc * each);
		/* NULL-fill the fresh slots so empties are detectable */
		for (sidx = c->workitems_alloc; sidx < new_alloc; ++sidx)
		    c->workitems[sidx] = NULL;
		/* re-base the counters onto the grown buffer; this is
		 * where the ordering caveat above comes from.
		 */
		c->tail_workitem   = 0;
		c->head_workitem   = c->workitems_alloc;
		c->workitems_alloc = new_alloc;
	}
	INSIST(NULL == c->workitems[c->head_workitem % c->workitems_alloc]);
	return (0 == slots_used);
}
222275970Scy
/* --------------------------------------------------------------------
 * Make sure there is an empty slot at the head of the response
 * queue. Tell if the queue is currently empty.
 */
static int
ensure_workresp_empty_slot(
	blocking_child *c
	)
{
	/*
	** !!! PRECONDITION: caller holds access lock!
	**
	** Works like the companion function above.
	*/

	static const size_t each =
	    sizeof(blocking_children[0]->responses[0]);

	size_t	new_alloc;
	size_t  slots_used;
	size_t	sidx;

	/* occupied slots = difference of the free-running counters */
	slots_used = c->head_response - c->tail_response;
	if (slots_used >= c->responses_alloc) {
		new_alloc  = c->responses_alloc + RESPONSES_ALLOC_INC;
		c->responses = erealloc(c->responses, new_alloc * each);
		/* NULL-fill the fresh slots so empties are detectable */
		for (sidx = c->responses_alloc; sidx < new_alloc; ++sidx)
		    c->responses[sidx] = NULL;
		/* re-base counters onto the grown buffer */
		c->tail_response   = 0;
		c->head_response   = c->responses_alloc;
		c->responses_alloc = new_alloc;
	}
	INSIST(NULL == c->responses[c->head_response % c->responses_alloc]);
	return (0 == slots_used);
}
258275970Scy
259275970Scy
/* --------------------------------------------------------------------
 * queue_req_pointer() - append a work item or idle exit request to
 *			 blocking_workitems[]. Employ proper locking.
 *
 * 'hdr' may be a real request (heap pointer) or the CHILD_EXIT_REQ
 * sentinel. Always returns 0.
 */
static int
queue_req_pointer(
	blocking_child	*	c,
	blocking_pipe_header *	hdr
	)
{
	size_t qhead;

	/* >>>> ACCESS LOCKING STARTS >>>> */
	wait_for_sem(c->accesslock, NULL);
	ensure_workitems_empty_slot(c);
	qhead = c->head_workitem;
	c->workitems[qhead % c->workitems_alloc] = hdr;
	c->head_workitem = 1 + qhead;
	tickle_sem(c->accesslock);
	/* <<<< ACCESS LOCKING ENDS <<<< */

	/* queue consumer wake-up notification */
	tickle_sem(c->workitems_pending);

	return 0;
}
286275970Scy
/* --------------------------------------------------------------------
 * API function to make sure a worker is running, a proper private copy
 * of the data is made, the data entered into the queue and the worker
 * is signalled.
 *
 * Returns 0 on success, 1 on failure (request with no payload).
 */
int
send_blocking_req_internal(
	blocking_child *	c,
	blocking_pipe_header *	hdr,
	void *			data
	)
{
	blocking_pipe_header *	threadcopy;
	size_t			payload_octets;

	REQUIRE(hdr != NULL);
	REQUIRE(data != NULL);
	DEBUG_REQUIRE(BLOCKING_REQ_MAGIC == hdr->magic_sig);

	if (hdr->octets <= sizeof(*hdr))
		return 1;	/* failure */
	payload_octets = hdr->octets - sizeof(*hdr);

	/* lazily start the worker thread on first use */
	if (NULL == c->thread_ref)
		start_blocking_thread(c);
	/* private heap copy of header + payload; presumably released by
	 * the consumer side after processing -- confirm against the
	 * worker implementation.
	 */
	threadcopy = emalloc(hdr->octets);
	memcpy(threadcopy, hdr, sizeof(*hdr));
	memcpy((char *)threadcopy + sizeof(*hdr), data, payload_octets);

	return queue_req_pointer(c, threadcopy);
}
318275970Scy
/* --------------------------------------------------------------------
 * Wait for the 'incoming queue no longer empty' signal, lock the shared
 * structure and dequeue an item.
 *
 * Runs on the worker side. Returns the dequeued request, or NULL when
 * the CHILD_EXIT_REQ sentinel was found -- in that case the child
 * acknowledges by pushing CHILD_GONE_RESP before returning.
 */
blocking_pipe_header *
receive_blocking_req_internal(
	blocking_child *	c
	)
{
	blocking_pipe_header *	req;
	size_t			qhead, qtail;

	req = NULL;
	do {
		/* wait for tickle from the producer side */
		wait_for_sem(c->workitems_pending, NULL);

		/* >>>> ACCESS LOCKING STARTS >>>> */
		wait_for_sem(c->accesslock, NULL);
		qhead = c->head_workitem;
		do {
			qtail = c->tail_workitem;
			if (qhead == qtail)
				break;	/* queue drained -- wait again */
			c->tail_workitem = qtail + 1;
			qtail %= c->workitems_alloc;
			req = c->workitems[qtail];
			c->workitems[qtail] = NULL;	/* mark slot free */
		} while (NULL == req);
		tickle_sem(c->accesslock);
		/* <<<< ACCESS LOCKING ENDS <<<< */

	} while (NULL == req);

	INSIST(NULL != req);
	if (CHILD_EXIT_REQ == req) {	/* idled out */
		send_blocking_resp_internal(c, CHILD_GONE_RESP);
		req = NULL;
	}

	return req;
}
361275970Scy
/* --------------------------------------------------------------------
 * Push a response into the return queue and eventually tickle the
 * receiver.
 *
 * The receiver is only notified when the queue was empty before this
 * push: one byte down the response pipe (POSIX) or a semaphore post
 * (Windows). Always returns 0.
 */
int
send_blocking_resp_internal(
	blocking_child *	c,
	blocking_pipe_header *	resp
	)
{
	size_t	qhead;
	int	empty;

	/* >>>> ACCESS LOCKING STARTS >>>> */
	wait_for_sem(c->accesslock, NULL);
	empty = ensure_workresp_empty_slot(c);
	qhead = c->head_response;
	c->responses[qhead % c->responses_alloc] = resp;
	c->head_response = 1 + qhead;
	tickle_sem(c->accesslock);
	/* <<<< ACCESS LOCKING ENDS <<<< */

	/* queue consumer wake-up notification */
	if (empty)
	{
#	    ifdef WORK_PIPE
		if (1 != write(c->resp_write_pipe, "", 1))
			msyslog(LOG_WARNING, "async resolver: %s",
				"failed to notify main thread!");
#	    else
		tickle_sem(c->responses_pending);
#	    endif
	}
	return 0;
}
397275970Scy
398275970Scy
399275970Scy#ifndef WORK_PIPE
400293423Sdelphij
/* --------------------------------------------------------------------
 * Check if a (Windows) handle to a semaphore is actually the same we
 * are using inside the sema wrapper.
 */
static BOOL
same_os_sema(
	const sem_ref	obj,
	void*		osh
	)
{
	/* both pointers must be non-NULL and the stored handle match */
	return obj && osh && (obj->shnd == (HANDLE)osh);
}
413293423Sdelphij
/* --------------------------------------------------------------------
 * Find the shared context that associates to an OS handle and make sure
 * the data is dequeued and processed.
 */
void
handle_blocking_resp_sem(
	void *	context
	)
{
	blocking_child *	c;
	u_int			idx;

	/* scan all children for the one whose response semaphore
	 * matches the signalled OS handle
	 */
	c = NULL;
	for (idx = 0; idx < blocking_children_alloc; idx++) {
		c = blocking_children[idx];
		if (c != NULL &&
			c->thread_ref != NULL &&
			same_os_sema(c->responses_pending, context))
			break;
	}
	if (idx < blocking_children_alloc)
		process_blocking_resp(c);
}
437275970Scy#endif	/* !WORK_PIPE */
438275970Scy
/* --------------------------------------------------------------------
 * Fetch the next response from the return queue. In case of signalling
 * via pipe, make sure the pipe is flushed, too.
 *
 * Returns the next pending response, or NULL when the queue is empty
 * or when the CHILD_GONE_RESP sentinel was found -- in the latter case
 * the child's resources are released via cleanup_after_child().
 */
blocking_pipe_header *
receive_blocking_resp_internal(
	blocking_child *	c
	)
{
	blocking_pipe_header *	removed;
	size_t			qhead, qtail, slot;

#ifdef WORK_PIPE
	int			rc;
	char			scratch[32];

	/* drain the notification pipe, restarting on EINTR */
	do
		rc = read(c->resp_read_pipe, scratch, sizeof(scratch));
	while (-1 == rc && EINTR == errno);
#endif

	/* >>>> ACCESS LOCKING STARTS >>>> */
	wait_for_sem(c->accesslock, NULL);
	qhead = c->head_response;
	qtail = c->tail_response;
	/* walk past empty slots until a response is found or the
	 * counters meet
	 */
	for (removed = NULL; !removed && (qhead != qtail); ++qtail) {
		slot = qtail % c->responses_alloc;
		removed = c->responses[slot];
		c->responses[slot] = NULL;
	}
	c->tail_response = qtail;
	tickle_sem(c->accesslock);
	/* <<<< ACCESS LOCKING ENDS <<<< */

	if (NULL != removed) {
		DEBUG_ENSURE(CHILD_GONE_RESP == removed ||
			     BLOCKING_RESP_MAGIC == removed->magic_sig);
	}
	if (CHILD_GONE_RESP == removed) {
		cleanup_after_child(c);
		removed = NULL;
	}

	return removed;
}
484275970Scy
/* --------------------------------------------------------------------
 * Light up a new worker.
 *
 * Creates the semaphores shared with the child, then spins up the
 * platform-specific thread. 'c' must not be marked reusable.
 */
static void
start_blocking_thread(
	blocking_child *	c
	)
{

	DEBUG_INSIST(!c->reusable);

	prepare_child_sems(c);
	start_blocking_thread_internal(c);
}
499275970Scy
500293423Sdelphij/* --------------------------------------------------------------------
501293423Sdelphij * Create a worker thread. There are several differences between POSIX
502293423Sdelphij * and Windows, of course -- most notably the Windows thread is no
503293423Sdelphij * detached thread, and we keep the handle around until we want to get
504293423Sdelphij * rid of the thread. The notification scheme also differs: Windows
505293423Sdelphij * makes use of semaphores in both directions, POSIX uses a pipe for
506293423Sdelphij * integration with 'select()' or alike.
507293423Sdelphij */
508275970Scystatic void
509275970Scystart_blocking_thread_internal(
510275970Scy	blocking_child *	c
511275970Scy	)
512275970Scy#ifdef SYS_WINNT
513275970Scy{
514275970Scy	BOOL	resumed;
515275970Scy
516293423Sdelphij	c->thread_ref = NULL;
517293423Sdelphij	(*addremove_io_semaphore)(c->responses_pending->shnd, FALSE);
518293423Sdelphij	c->thr_table[0].thnd =
519275970Scy		(HANDLE)_beginthreadex(
520275970Scy			NULL,
521275970Scy			0,
522275970Scy			&blocking_thread,
523275970Scy			c,
524275970Scy			CREATE_SUSPENDED,
525293423Sdelphij			NULL);
526275970Scy
527293423Sdelphij	if (NULL == c->thr_table[0].thnd) {
528275970Scy		msyslog(LOG_ERR, "start blocking thread failed: %m");
529275970Scy		exit(-1);
530275970Scy	}
531275970Scy	/* remember the thread priority is only within the process class */
532293423Sdelphij	if (!SetThreadPriority(c->thr_table[0].thnd,
533275970Scy			       THREAD_PRIORITY_BELOW_NORMAL))
534275970Scy		msyslog(LOG_ERR, "Error lowering blocking thread priority: %m");
535275970Scy
536293423Sdelphij	resumed = ResumeThread(c->thr_table[0].thnd);
537275970Scy	DEBUG_INSIST(resumed);
538293423Sdelphij	c->thread_ref = &c->thr_table[0];
539275970Scy}
540275970Scy#else	/* pthreads start_blocking_thread_internal() follows */
541275970Scy{
542275970Scy# ifdef NEED_PTHREAD_INIT
543275970Scy	static int	pthread_init_called;
544275970Scy# endif
545275970Scy	pthread_attr_t	thr_attr;
546275970Scy	int		rc;
547275970Scy	int		pipe_ends[2];	/* read then write */
548275970Scy	int		is_pipe;
549275970Scy	int		flags;
550294554Sdelphij	size_t		ostacksize;
551294554Sdelphij	size_t		nstacksize;
552275970Scy	sigset_t	saved_sig_mask;
553275970Scy
554293423Sdelphij	c->thread_ref = NULL;
555293423Sdelphij
556275970Scy# ifdef NEED_PTHREAD_INIT
557275970Scy	/*
558275970Scy	 * from lib/isc/unix/app.c:
559275970Scy	 * BSDI 3.1 seg faults in pthread_sigmask() if we don't do this.
560275970Scy	 */
561275970Scy	if (!pthread_init_called) {
562275970Scy		pthread_init();
563275970Scy		pthread_init_called = TRUE;
564275970Scy	}
565275970Scy# endif
566275970Scy
567275970Scy	rc = pipe_socketpair(&pipe_ends[0], &is_pipe);
568275970Scy	if (0 != rc) {
569275970Scy		msyslog(LOG_ERR, "start_blocking_thread: pipe_socketpair() %m");
570275970Scy		exit(1);
571275970Scy	}
572275970Scy	c->resp_read_pipe = move_fd(pipe_ends[0]);
573275970Scy	c->resp_write_pipe = move_fd(pipe_ends[1]);
574275970Scy	c->ispipe = is_pipe;
575275970Scy	flags = fcntl(c->resp_read_pipe, F_GETFL, 0);
576275970Scy	if (-1 == flags) {
577275970Scy		msyslog(LOG_ERR, "start_blocking_thread: fcntl(F_GETFL) %m");
578275970Scy		exit(1);
579275970Scy	}
580275970Scy	rc = fcntl(c->resp_read_pipe, F_SETFL, O_NONBLOCK | flags);
581275970Scy	if (-1 == rc) {
582275970Scy		msyslog(LOG_ERR,
583275970Scy			"start_blocking_thread: fcntl(F_SETFL, O_NONBLOCK) %m");
584275970Scy		exit(1);
585275970Scy	}
586275970Scy	(*addremove_io_fd)(c->resp_read_pipe, c->ispipe, FALSE);
587275970Scy	pthread_attr_init(&thr_attr);
588275970Scy	pthread_attr_setdetachstate(&thr_attr, PTHREAD_CREATE_DETACHED);
589275970Scy#if defined(HAVE_PTHREAD_ATTR_GETSTACKSIZE) && \
590275970Scy    defined(HAVE_PTHREAD_ATTR_SETSTACKSIZE)
591294554Sdelphij	rc = pthread_attr_getstacksize(&thr_attr, &ostacksize);
592294554Sdelphij	if (0 != rc) {
593275970Scy		msyslog(LOG_ERR,
594294554Sdelphij			"start_blocking_thread: pthread_attr_getstacksize() -> %s",
595294554Sdelphij			strerror(rc));
596294554Sdelphij	} else {
597294554Sdelphij		if (ostacksize < THREAD_MINSTACKSIZE)
598294554Sdelphij			nstacksize = THREAD_MINSTACKSIZE;
599294554Sdelphij		else if (ostacksize > THREAD_MAXSTACKSIZE)
600294554Sdelphij			nstacksize = THREAD_MAXSTACKSIZE;
601294554Sdelphij		else
602294554Sdelphij			nstacksize = ostacksize;
603294554Sdelphij		if (nstacksize != ostacksize)
604294554Sdelphij			rc = pthread_attr_setstacksize(&thr_attr, nstacksize);
605294554Sdelphij		if (0 != rc)
606275970Scy			msyslog(LOG_ERR,
607294554Sdelphij				"start_blocking_thread: pthread_attr_setstacksize(0x%lx -> 0x%lx) -> %s",
608294554Sdelphij				(u_long)ostacksize, (u_long)nstacksize,
609294554Sdelphij				strerror(rc));
610275970Scy	}
611275970Scy#else
612294554Sdelphij	UNUSED_ARG(nstacksize);
613294554Sdelphij	UNUSED_ARG(ostacksize);
614275970Scy#endif
615275970Scy#if defined(PTHREAD_SCOPE_SYSTEM) && defined(NEED_PTHREAD_SCOPE_SYSTEM)
616275970Scy	pthread_attr_setscope(&thr_attr, PTHREAD_SCOPE_SYSTEM);
617275970Scy#endif
618275970Scy	c->thread_ref = emalloc_zero(sizeof(*c->thread_ref));
619275970Scy	block_thread_signals(&saved_sig_mask);
620293423Sdelphij	rc = pthread_create(&c->thr_table[0], &thr_attr,
621275970Scy			    &blocking_thread, c);
622275970Scy	pthread_sigmask(SIG_SETMASK, &saved_sig_mask, NULL);
623275970Scy	pthread_attr_destroy(&thr_attr);
624275970Scy	if (0 != rc) {
625294554Sdelphij		msyslog(LOG_ERR, "start_blocking_thread: pthread_create() -> %s",
626294554Sdelphij			strerror(rc));
627275970Scy		exit(1);
628275970Scy	}
629293423Sdelphij	c->thread_ref = &c->thr_table[0];
630275970Scy}
631275970Scy#endif
632275970Scy
633293423Sdelphij/* --------------------------------------------------------------------
634275970Scy * block_thread_signals()
635275970Scy *
636275970Scy * Temporarily block signals used by ntpd main thread, so that signal
637275970Scy * mask inherited by child threads leaves them blocked.  Returns prior
638275970Scy * active signal mask via pmask, to be restored by the main thread
639275970Scy * after pthread_create().
640275970Scy */
641275970Scy#ifndef SYS_WINNT
642275970Scyvoid
643275970Scyblock_thread_signals(
644275970Scy	sigset_t *	pmask
645275970Scy	)
646275970Scy{
647275970Scy	sigset_t	block;
648275970Scy
649275970Scy	sigemptyset(&block);
650275970Scy# ifdef HAVE_SIGNALED_IO
651275970Scy#  ifdef SIGIO
652275970Scy	sigaddset(&block, SIGIO);
653275970Scy#  endif
654275970Scy#  ifdef SIGPOLL
655275970Scy	sigaddset(&block, SIGPOLL);
656275970Scy#  endif
657275970Scy# endif	/* HAVE_SIGNALED_IO */
658275970Scy	sigaddset(&block, SIGALRM);
659275970Scy	sigaddset(&block, MOREDEBUGSIG);
660275970Scy	sigaddset(&block, LESSDEBUGSIG);
661275970Scy# ifdef SIGDIE1
662275970Scy	sigaddset(&block, SIGDIE1);
663275970Scy# endif
664275970Scy# ifdef SIGDIE2
665275970Scy	sigaddset(&block, SIGDIE2);
666275970Scy# endif
667275970Scy# ifdef SIGDIE3
668275970Scy	sigaddset(&block, SIGDIE3);
669275970Scy# endif
670275970Scy# ifdef SIGDIE4
671275970Scy	sigaddset(&block, SIGDIE4);
672275970Scy# endif
673275970Scy# ifdef SIGBUS
674275970Scy	sigaddset(&block, SIGBUS);
675275970Scy# endif
676275970Scy	sigemptyset(pmask);
677275970Scy	pthread_sigmask(SIG_BLOCK, &block, pmask);
678275970Scy}
679275970Scy#endif	/* !SYS_WINNT */
680275970Scy
681275970Scy
/* --------------------------------------------------------------------
 * Create & destroy semaphores. This is sufficiently different between
 * POSIX and Windows to warrant wrapper functions and close enough to
 * use the concept of synchronization via semaphore for all platforms.
 *
 * Initialise the caller-provided storage 'semptr' with initial count
 * 'inival'. 'maxval' caps the count on Windows (0 or out-of-range maps
 * to LONG_MAX) and is ignored for POSIX semaphores. Returns 'semptr'
 * on success, NULL on failure.
 */
static sem_ref
create_sema(
	sema_type*	semptr,
	u_int		inival,
	u_int		maxval)
{
#ifdef SYS_WINNT

	long svini, svmax;
	if (NULL != semptr) {
		/* clamp u_int arguments into the 'long' range that
		 * CreateSemaphore() expects
		 */
		svini = (inival < LONG_MAX)
		    ? (long)inival : LONG_MAX;
		svmax = (maxval < LONG_MAX && maxval > 0)
		    ? (long)maxval : LONG_MAX;
		semptr->shnd = CreateSemaphore(NULL, svini, svmax, NULL);
		if (NULL == semptr->shnd)
			semptr = NULL;
	}

#else

	(void)maxval;	/* POSIX semaphores have no maximum count */
	if (semptr && sem_init(semptr, FALSE, inival))
		semptr = NULL;

#endif

	return semptr;
}
716293423Sdelphij
/* ------------------------------------------------------------------
 * Destroy a semaphore created by create_sema(). Always returns NULL so
 * callers can clear their reference in one step:
 * 'ref = delete_sema(ref);'
 */
static sem_ref
delete_sema(
	sem_ref obj)
{

#   ifdef SYS_WINNT

	if (obj) {
		if (obj->shnd)
			CloseHandle(obj->shnd);
		obj->shnd = NULL;
	}

#   else

	if (obj)
		sem_destroy(obj);

#   endif

	return NULL;
}
740293423Sdelphij
/* --------------------------------------------------------------------
 * prepare_child_sems()
 *
 * create sync & access semaphores
 *
 * All semaphores are cleared, only the access semaphore has 1 unit.
 * Childs wait on 'workitems_pending', then grabs 'sema_access'
 * and dequeues jobs. When done, 'sema_access' is given one unit back.
 *
 * The producer grabs 'sema_access', manages the queue, restores
 * 'sema_access' and puts one unit into 'workitems_pending'.
 *
 * The story goes the same for the response queue.
 */
static void
prepare_child_sems(
	blocking_child *c
	)
{
	/* lazily create the single global memory lock on first use */
	if (NULL == worker_memlock)
		worker_memlock = create_sema(&worker_mmutex, 1, 1);

	c->accesslock           = create_sema(&c->sem_table[0], 1, 1);
	c->workitems_pending    = create_sema(&c->sem_table[1], 0, 0);
	c->wake_scheduled_sleep = create_sema(&c->sem_table[2], 0, 1);
#   ifndef WORK_PIPE
	c->responses_pending    = create_sema(&c->sem_table[3], 0, 0);
#   endif
}
770275970Scy
/* --------------------------------------------------------------------
 * wait for semaphore. Where the wait can be interrupted, it will
 * internally resume -- When this function returns, there is either no
 * semaphore at all, a timeout occurred, or the caller could
 * successfully take a token from the semaphore.
 *
 * 'timeout' is an absolute wall-clock deadline, or NULL for an
 * unbounded wait. Returns 0 on success, otherwise -1 with errno set
 * (EINVAL, ETIMEDOUT, or EFAULT on unexpected Windows wait results).
 *
 * For untimed wait, not checking the result of this function at all is
 * definitely an option.
 */
static int
wait_for_sem(
	sem_ref			sem,
	struct timespec *	timeout		/* wall-clock */
	)
#ifdef SYS_WINNT
{
	struct timespec now;
	struct timespec delta;
	DWORD		msec;
	DWORD		rc;

	if (!(sem && sem->shnd)) {
		errno = EINVAL;
		return -1;
	}

	if (NULL == timeout) {
		msec = INFINITE;
	} else {
		/* convert the absolute deadline into relative msec */
		getclock(TIMEOFDAY, &now);
		delta = sub_tspec(*timeout, now);
		if (delta.tv_sec < 0) {
			msec = 0;	/* deadline already passed */
		} else if ((delta.tv_sec + 1) >= (MAXDWORD / 1000)) {
			msec = INFINITE;	/* would overflow DWORD msec */
		} else {
			msec = 1000 * (DWORD)delta.tv_sec;
			msec += delta.tv_nsec / (1000 * 1000);
		}
	}
	rc = WaitForSingleObject(sem->shnd, msec);
	if (WAIT_OBJECT_0 == rc)
		return 0;
	if (WAIT_TIMEOUT == rc) {
		errno = ETIMEDOUT;
		return -1;
	}
	msyslog(LOG_ERR, "WaitForSingleObject unexpected 0x%x", rc);
	errno = EFAULT;
	return -1;
}
#else	/* pthreads wait_for_sem() follows */
{
	int rc = -1;

	/* resume the wait when interrupted by a signal */
	if (sem) do {
			if (NULL == timeout)
				rc = sem_wait(sem);
			else
				rc = sem_timedwait(sem, timeout);
		} while (rc == -1 && errno == EINTR);
	else
		errno = EINVAL;

	return rc;
}
#endif
838275970Scy
839293423Sdelphij/* --------------------------------------------------------------------
840293423Sdelphij * blocking_thread - thread functions have WINAPI (aka 'stdcall')
841293423Sdelphij * calling conventions under Windows and POSIX-defined signature
842293423Sdelphij * otherwise.
843275970Scy */
844275970Scy#ifdef SYS_WINNT
845293423Sdelphiju_int WINAPI
846275970Scy#else
847275970Scyvoid *
848275970Scy#endif
849275970Scyblocking_thread(
850275970Scy	void *	ThreadArg
851275970Scy	)
852275970Scy{
853275970Scy	blocking_child *c;
854275970Scy
855275970Scy	c = ThreadArg;
856275970Scy	exit_worker(blocking_child_common(c));
857275970Scy
858275970Scy	/* NOTREACHED */
859275970Scy	return 0;
860275970Scy}
861275970Scy
862293423Sdelphij/* --------------------------------------------------------------------
863275970Scy * req_child_exit() runs in the parent.
864293423Sdelphij *
865293423Sdelphij * This function is called from from the idle timer, too, and possibly
866293423Sdelphij * without a thread being there any longer. Since we have folded up our
867293423Sdelphij * tent in that case and all the semaphores are already gone, we simply
868293423Sdelphij * ignore this request in this case.
869293423Sdelphij *
870293423Sdelphij * Since the existence of the semaphores is controlled exclusively by
871293423Sdelphij * the parent, there's no risk of data race here.
872275970Scy */
873275970Scyint
874275970Scyreq_child_exit(
875275970Scy	blocking_child *c
876275970Scy	)
877275970Scy{
878293423Sdelphij	return (c->accesslock)
879293423Sdelphij	    ? queue_req_pointer(c, CHILD_EXIT_REQ)
880293423Sdelphij	    : 0;
881275970Scy}
882275970Scy
/* --------------------------------------------------------------------
 * cleanup_after_child() runs in parent.
 *
 * Reclaims all per-child resources (thread handle, semaphores and, if
 * signalling via IO, the response pipe) after the worker thread has
 * terminated, then marks the slot reusable for a future worker.
 */
static void
cleanup_after_child(
	blocking_child *	c
	)
{
	DEBUG_INSIST(!c->reusable);

#   ifdef SYS_WINNT
	/* The thread was not created in detached state, so we better
	 * clean up.
	 */
	if (c->thread_ref && c->thread_ref->thnd) {
		WaitForSingleObject(c->thread_ref->thnd, INFINITE);
		INSIST(CloseHandle(c->thread_ref->thnd));
		c->thread_ref->thnd = NULL;
	}
#   endif
	c->thread_ref = NULL;

	/* remove semaphores and (if signalling via IO) pipes */

	c->accesslock           = delete_sema(c->accesslock);
	c->workitems_pending    = delete_sema(c->workitems_pending);
	c->wake_scheduled_sleep = delete_sema(c->wake_scheduled_sleep);

#   ifdef WORK_PIPE
	/* detach the read end from the I/O engine before closing both
	 * ends of the response pipe
	 */
	DEBUG_INSIST(-1 != c->resp_read_pipe);
	DEBUG_INSIST(-1 != c->resp_write_pipe);
	(*addremove_io_fd)(c->resp_read_pipe, c->ispipe, TRUE);
	close(c->resp_write_pipe);
	close(c->resp_read_pipe);
	c->resp_write_pipe = -1;
	c->resp_read_pipe = -1;
#   else
	/* detach the response semaphore from the I/O engine before
	 * deleting it
	 */
	DEBUG_INSIST(NULL != c->responses_pending);
	(*addremove_io_semaphore)(c->responses_pending->shnd, TRUE);
	c->responses_pending = delete_sema(c->responses_pending);
#   endif

	/* Is it necessary to check if there are pending requests and
	 * responses? If so, and if there are, what to do with them?
	 */

	/* re-init buffer index sequencers */
	c->head_workitem = 0;
	c->tail_workitem = 0;
	c->head_response = 0;
	c->tail_response = 0;

	c->reusable = TRUE;
}
937275970Scy
938275970Scy
939275970Scy#else	/* !WORK_THREAD follows */
940275970Scychar work_thread_nonempty_compilation_unit;
941275970Scy#endif
942