work_thread.c revision 330106
1275970Scy/*
2275970Scy * work_thread.c - threads implementation for blocking worker child.
3275970Scy */
4275970Scy#include <config.h>
5275970Scy#include "ntp_workimpl.h"
6275970Scy
7275970Scy#ifdef WORK_THREAD
8275970Scy
9275970Scy#include <stdio.h>
10275970Scy#include <ctype.h>
11275970Scy#include <signal.h>
12275970Scy#ifndef SYS_WINNT
13275970Scy#include <pthread.h>
14275970Scy#endif
15275970Scy
16275970Scy#include "ntp_stdlib.h"
17275970Scy#include "ntp_malloc.h"
18275970Scy#include "ntp_syslog.h"
19275970Scy#include "ntpd.h"
20275970Scy#include "ntp_io.h"
21275970Scy#include "ntp_assert.h"
22275970Scy#include "ntp_unixtime.h"
23275970Scy#include "timespecops.h"
24275970Scy#include "ntp_worker.h"
25275970Scy
26275970Scy#define CHILD_EXIT_REQ	((blocking_pipe_header *)(intptr_t)-1)
27275970Scy#define CHILD_GONE_RESP	CHILD_EXIT_REQ
28294554Sdelphij/* Queue size increments:
29294554Sdelphij * The request queue grows a bit faster than the response queue -- the
30330106Sdelphij * daemon can push requests and pull results faster on average than the
31294554Sdelphij * worker can process requests and push results...  If this really pays
32294554Sdelphij * off is debatable.
33294554Sdelphij */
34275970Scy#define WORKITEMS_ALLOC_INC	16
35275970Scy#define RESPONSES_ALLOC_INC	4
36275970Scy
37294554Sdelphij/* Fiddle with min/max stack sizes. 64kB minimum seems to work, so we
38294554Sdelphij * set the maximum to 256kB. If the minimum goes below the
39294554Sdelphij * system-defined minimum stack size, we have to adjust accordingly.
40294554Sdelphij */
41275970Scy#ifndef THREAD_MINSTACKSIZE
42294554Sdelphij# define THREAD_MINSTACKSIZE	(64U * 1024)
43275970Scy#endif
44294554Sdelphij#ifndef __sun
45294554Sdelphij#if defined(PTHREAD_STACK_MIN) && THREAD_MINSTACKSIZE < PTHREAD_STACK_MIN
46294554Sdelphij# undef THREAD_MINSTACKSIZE
47294554Sdelphij# define THREAD_MINSTACKSIZE PTHREAD_STACK_MIN
48294554Sdelphij#endif
49294554Sdelphij#endif
50275970Scy
51294554Sdelphij#ifndef THREAD_MAXSTACKSIZE
52294554Sdelphij# define THREAD_MAXSTACKSIZE	(256U * 1024)
53294554Sdelphij#endif
54294554Sdelphij#if THREAD_MAXSTACKSIZE < THREAD_MINSTACKSIZE
55294554Sdelphij# undef  THREAD_MAXSTACKSIZE
56294554Sdelphij# define THREAD_MAXSTACKSIZE THREAD_MINSTACKSIZE
57294554Sdelphij#endif
58294554Sdelphij
59294554Sdelphij
60293423Sdelphij#ifdef SYS_WINNT
61275970Scy
62275970Scy# define thread_exit(c)	_endthreadex(c)
63293423Sdelphij# define tickle_sem(sh) ReleaseSemaphore((sh->shnd), 1, NULL)
64293423Sdelphiju_int	WINAPI	blocking_thread(void *);
65293423Sdelphijstatic BOOL	same_os_sema(const sem_ref obj, void * osobj);
66293423Sdelphij
67275970Scy#else
68293423Sdelphij
69275970Scy# define thread_exit(c)	pthread_exit((void*)(size_t)(c))
70275970Scy# define tickle_sem	sem_post
71293423Sdelphijvoid *		blocking_thread(void *);
72293423Sdelphijstatic	void	block_thread_signals(sigset_t *);
73293423Sdelphij
74275970Scy#endif
75275970Scy
76275970Scy#ifdef WORK_PIPE
77275970Scyaddremove_io_fd_func		addremove_io_fd;
78275970Scy#else
79275970Scyaddremove_io_semaphore_func	addremove_io_semaphore;
80275970Scy#endif
81275970Scy
82275970Scystatic	void	start_blocking_thread(blocking_child *);
83275970Scystatic	void	start_blocking_thread_internal(blocking_child *);
84275970Scystatic	void	prepare_child_sems(blocking_child *);
85275970Scystatic	int	wait_for_sem(sem_ref, struct timespec *);
86293423Sdelphijstatic	int	ensure_workitems_empty_slot(blocking_child *);
87293423Sdelphijstatic	int	ensure_workresp_empty_slot(blocking_child *);
88275970Scystatic	int	queue_req_pointer(blocking_child *, blocking_pipe_header *);
89275970Scystatic	void	cleanup_after_child(blocking_child *);
90275970Scy
91298695Sdelphijstatic sema_type worker_mmutex;
92298695Sdelphijstatic sem_ref   worker_memlock;
93275970Scy
94298695Sdelphij/* --------------------------------------------------------------------
95298695Sdelphij * locking the global worker state table (and other global stuff)
96298695Sdelphij */
97275970Scyvoid
98298695Sdelphijworker_global_lock(
99298695Sdelphij	int inOrOut)
100298695Sdelphij{
101298695Sdelphij	if (worker_memlock) {
102298695Sdelphij		if (inOrOut)
103298695Sdelphij			wait_for_sem(worker_memlock, NULL);
104298695Sdelphij		else
105298695Sdelphij			tickle_sem(worker_memlock);
106298695Sdelphij	}
107298695Sdelphij}
108298695Sdelphij
109298695Sdelphij/* --------------------------------------------------------------------
110298695Sdelphij * implementation isolation wrapper
111298695Sdelphij */
112298695Sdelphijvoid
113275970Scyexit_worker(
114275970Scy	int	exitcode
115275970Scy	)
116275970Scy{
117275970Scy	thread_exit(exitcode);	/* see #define thread_exit */
118275970Scy}
119275970Scy
120293423Sdelphij/* --------------------------------------------------------------------
121293423Sdelphij * sleep for a given time or until the wake-up semaphore is tickled.
122293423Sdelphij */
int
worker_sleep(
	blocking_child *	c,
	time_t			seconds
	)
{
	struct timespec	until;
	int		rc;

	/* Build an absolute wall-clock deadline 'seconds' from now;
	 * wait_for_sem()/sem_timedwait() expect absolute times.
	 */
# ifdef HAVE_CLOCK_GETTIME
	if (0 != clock_gettime(CLOCK_REALTIME, &until)) {
		msyslog(LOG_ERR, "worker_sleep: clock_gettime() failed: %m");
		return -1;
	}
# else
	if (0 != getclock(TIMEOFDAY, &until)) {
		msyslog(LOG_ERR, "worker_sleep: getclock() failed: %m");
		return -1;
	}
# endif
	until.tv_sec += seconds;
	rc = wait_for_sem(c->wake_scheduled_sleep, &until);
	/* rc == 0: semaphore was tickled, i.e. the nap was cut short
	 * by interrupt_worker_sleep() -> report -1.
	 * timeout: the full sleep was taken -> report 0 (success).
	 */
	if (0 == rc)
		return -1;
	if (-1 == rc && ETIMEDOUT == errno)
		return 0;
	msyslog(LOG_ERR, "worker_sleep: sem_timedwait: %m");
	return -1;
}
152275970Scy
153275970Scy
154293423Sdelphij/* --------------------------------------------------------------------
155293423Sdelphij * Wake up a worker that takes a nap.
156293423Sdelphij */
157275970Scyvoid
158275970Scyinterrupt_worker_sleep(void)
159275970Scy{
160275970Scy	u_int			idx;
161275970Scy	blocking_child *	c;
162275970Scy
163275970Scy	for (idx = 0; idx < blocking_children_alloc; idx++) {
164275970Scy		c = blocking_children[idx];
165275970Scy		if (NULL == c || NULL == c->wake_scheduled_sleep)
166275970Scy			continue;
167275970Scy		tickle_sem(c->wake_scheduled_sleep);
168275970Scy	}
169275970Scy}
170275970Scy
171293423Sdelphij/* --------------------------------------------------------------------
172293423Sdelphij * Make sure there is an empty slot at the head of the request
173293423Sdelphij * queue. Tell if the queue is currently empty.
174293423Sdelphij */
static int
ensure_workitems_empty_slot(
	blocking_child *c
	)
{
	/*
	** !!! PRECONDITION: caller holds access lock!
	**
	** This simply tries to increase the size of the buffer if it
	** becomes full. The resize operation does *not* maintain the
	** order of requests, but that should be irrelevant since the
	** processing is considered asynchronous anyway.
	**
	** Return if the buffer is currently empty.
	*/

	static const size_t each =
	    sizeof(blocking_children[0]->workitems[0]);

	size_t	new_alloc;
	size_t  slots_used;
	size_t	sidx;

	/* head/tail are free-running counters; their difference is the
	 * number of occupied slots.
	 */
	slots_used = c->head_workitem - c->tail_workitem;
	if (slots_used >= c->workitems_alloc) {
		new_alloc  = c->workitems_alloc + WORKITEMS_ALLOC_INC;
		c->workitems = erealloc(c->workitems, new_alloc * each);
		for (sidx = c->workitems_alloc; sidx < new_alloc; ++sidx)
		    c->workitems[sidx] = NULL;
		/* the grow path only triggers when the ring is fully
		 * occupied, so slots 0..old_alloc-1 all hold items
		 * (in scrambled order) and can be re-declared as the
		 * whole queue.
		 */
		c->tail_workitem   = 0;
		c->head_workitem   = c->workitems_alloc;
		c->workitems_alloc = new_alloc;
	}
	/* the slot about to be written must be vacant now */
	INSIST(NULL == c->workitems[c->head_workitem % c->workitems_alloc]);
	return (0 == slots_used);
}
211275970Scy
212293423Sdelphij/* --------------------------------------------------------------------
213293423Sdelphij * Make sure there is an empty slot at the head of the response
214293423Sdelphij * queue. Tell if the queue is currently empty.
215293423Sdelphij */
static int
ensure_workresp_empty_slot(
	blocking_child *c
	)
{
	/*
	** !!! PRECONDITION: caller holds access lock!
	**
	** Works like the companion function above.
	*/

	static const size_t each =
	    sizeof(blocking_children[0]->responses[0]);

	size_t	new_alloc;
	size_t  slots_used;
	size_t	sidx;

	/* free-running head/tail counters; difference == occupancy */
	slots_used = c->head_response - c->tail_response;
	if (slots_used >= c->responses_alloc) {
		new_alloc  = c->responses_alloc + RESPONSES_ALLOC_INC;
		c->responses = erealloc(c->responses, new_alloc * each);
		for (sidx = c->responses_alloc; sidx < new_alloc; ++sidx)
		    c->responses[sidx] = NULL;
		/* grow only happens when completely full, so the old
		 * slots all hold items and become the new queue body.
		 */
		c->tail_response   = 0;
		c->head_response   = c->responses_alloc;
		c->responses_alloc = new_alloc;
	}
	/* the slot about to be written must be vacant now */
	INSIST(NULL == c->responses[c->head_response % c->responses_alloc]);
	return (0 == slots_used);
}
247275970Scy
248275970Scy
249293423Sdelphij/* --------------------------------------------------------------------
250275970Scy * queue_req_pointer() - append a work item or idle exit request to
251293423Sdelphij *			 blocking_workitems[]. Employ proper locking.
252275970Scy */
static int
queue_req_pointer(
	blocking_child	*	c,
	blocking_pipe_header *	hdr	/* heap copy or CHILD_EXIT_REQ cookie */
	)
{
	size_t qhead;

	/* >>>> ACCESS LOCKING STARTS >>>> */
	wait_for_sem(c->accesslock, NULL);
	ensure_workitems_empty_slot(c);
	qhead = c->head_workitem;
	c->workitems[qhead % c->workitems_alloc] = hdr;
	c->head_workitem = 1 + qhead;
	tickle_sem(c->accesslock);
	/* <<<< ACCESS LOCKING ENDS <<<< */

	/* queue consumer wake-up notification */
	tickle_sem(c->workitems_pending);

	/* unconditionally reports success */
	return 0;
}
275275970Scy
276293423Sdelphij/* --------------------------------------------------------------------
277293423Sdelphij * API function to make sure a worker is running, a proper private copy
278293423Sdelphij * of the data is made, the data entered into the queue and the worker
279293423Sdelphij * is signalled.
280293423Sdelphij */
int
send_blocking_req_internal(
	blocking_child *	c,
	blocking_pipe_header *	hdr,
	void *			data
	)
{
	blocking_pipe_header *	threadcopy;
	size_t			payload_octets;

	REQUIRE(hdr != NULL);
	REQUIRE(data != NULL);
	DEBUG_REQUIRE(BLOCKING_REQ_MAGIC == hdr->magic_sig);

	/* a request without payload is malformed; note the inverted
	 * convention -- nonzero means failure here.
	 */
	if (hdr->octets <= sizeof(*hdr))
		return 1;	/* failure */
	payload_octets = hdr->octets - sizeof(*hdr);

	/* lazily start the worker on first use */
	if (NULL == c->thread_ref)
		start_blocking_thread(c);
	/* make one contiguous private copy of header + payload;
	 * ownership of the copy passes to the worker via the queue.
	 */
	threadcopy = emalloc(hdr->octets);
	memcpy(threadcopy, hdr, sizeof(*hdr));
	memcpy((char *)threadcopy + sizeof(*hdr), data, payload_octets);

	return queue_req_pointer(c, threadcopy);
}
307275970Scy
308293423Sdelphij/* --------------------------------------------------------------------
309293423Sdelphij * Wait for the 'incoming queue no longer empty' signal, lock the shared
310293423Sdelphij * structure and dequeue an item.
311293423Sdelphij */
blocking_pipe_header *
receive_blocking_req_internal(
	blocking_child *	c
	)
{
	blocking_pipe_header *	req;
	size_t			qhead, qtail;

	req = NULL;
	do {
		/* wait for tickle from the producer side */
		wait_for_sem(c->workitems_pending, NULL);

		/* >>>> ACCESS LOCKING STARTS >>>> */
		wait_for_sem(c->accesslock, NULL);
		qhead = c->head_workitem;
		/* skip over any already-consumed (NULL) slots; tickles
		 * can outnumber queued items, hence the outer retry.
		 */
		do {
			qtail = c->tail_workitem;
			if (qhead == qtail)
				break;
			c->tail_workitem = qtail + 1;
			qtail %= c->workitems_alloc;
			req = c->workitems[qtail];
			c->workitems[qtail] = NULL;
		} while (NULL == req);
		tickle_sem(c->accesslock);
		/* <<<< ACCESS LOCKING ENDS <<<< */

	} while (NULL == req);

	INSIST(NULL != req);
	/* the exit cookie is not a real request: acknowledge with the
	 * 'child gone' response and report no work (NULL).
	 */
	if (CHILD_EXIT_REQ == req) {	/* idled out */
		send_blocking_resp_internal(c, CHILD_GONE_RESP);
		req = NULL;
	}

	return req;
}
350275970Scy
351293423Sdelphij/* --------------------------------------------------------------------
352293423Sdelphij * Push a response into the return queue and eventually tickle the
353293423Sdelphij * receiver.
354293423Sdelphij */
355275970Scyint
356275970Scysend_blocking_resp_internal(
357275970Scy	blocking_child *	c,
358275970Scy	blocking_pipe_header *	resp
359275970Scy	)
360275970Scy{
361293423Sdelphij	size_t	qhead;
362293423Sdelphij	int	empty;
363293423Sdelphij
364293423Sdelphij	/* >>>> ACCESS LOCKING STARTS >>>> */
365293423Sdelphij	wait_for_sem(c->accesslock, NULL);
366293423Sdelphij	empty = ensure_workresp_empty_slot(c);
367293423Sdelphij	qhead = c->head_response;
368293423Sdelphij	c->responses[qhead % c->responses_alloc] = resp;
369293423Sdelphij	c->head_response = 1 + qhead;
370293423Sdelphij	tickle_sem(c->accesslock);
371293423Sdelphij	/* <<<< ACCESS LOCKING ENDS <<<< */
372275970Scy
373293423Sdelphij	/* queue consumer wake-up notification */
374293423Sdelphij	if (empty)
375293423Sdelphij	{
376293423Sdelphij#	    ifdef WORK_PIPE
377293423Sdelphij		write(c->resp_write_pipe, "", 1);
378293423Sdelphij#	    else
379293423Sdelphij		tickle_sem(c->responses_pending);
380293423Sdelphij#	    endif
381293423Sdelphij	}
382275970Scy	return 0;
383275970Scy}
384275970Scy
385275970Scy
386275970Scy#ifndef WORK_PIPE
387293423Sdelphij
388293423Sdelphij/* --------------------------------------------------------------------
389293423Sdelphij * Check if a (Windows-)handle to a semaphore is actually the same we
390293423Sdelphij * are using inside the sema wrapper.
391293423Sdelphij */
392293423Sdelphijstatic BOOL
393293423Sdelphijsame_os_sema(
394293423Sdelphij	const sem_ref	obj,
395293423Sdelphij	void*		osh
396293423Sdelphij	)
397293423Sdelphij{
398293423Sdelphij	return obj && osh && (obj->shnd == (HANDLE)osh);
399293423Sdelphij}
400293423Sdelphij
401293423Sdelphij/* --------------------------------------------------------------------
402293423Sdelphij * Find the shared context that associates to an OS handle and make sure
403293423Sdelphij * the data is dequeued and processed.
404293423Sdelphij */
void
handle_blocking_resp_sem(
	void *	context		/* OS semaphore handle from the I/O loop */
	)
{
	blocking_child *	c;
	u_int			idx;

	/* scan for the live child whose response semaphore matches the
	 * signalled handle; after the loop 'idx' tells whether a match
	 * was found ('c' alone is not sufficient -- it holds the last
	 * slot examined even on a miss).
	 */
	c = NULL;
	for (idx = 0; idx < blocking_children_alloc; idx++) {
		c = blocking_children[idx];
		if (c != NULL &&
			c->thread_ref != NULL &&
			same_os_sema(c->responses_pending, context))
			break;
	}
	if (idx < blocking_children_alloc)
		process_blocking_resp(c);
}
424275970Scy#endif	/* !WORK_PIPE */
425275970Scy
426293423Sdelphij/* --------------------------------------------------------------------
427293423Sdelphij * Fetch the next response from the return queue. In case of signalling
428293423Sdelphij * via pipe, make sure the pipe is flushed, too.
429293423Sdelphij */
blocking_pipe_header *
receive_blocking_resp_internal(
	blocking_child *	c
	)
{
	blocking_pipe_header *	removed;
	size_t			qhead, qtail, slot;

#ifdef WORK_PIPE
	int			rc;
	char			scratch[32];

	/* drain accumulated wake-up bytes from the pipe, restarting
	 * only on EINTR; the pipe is non-blocking, so an empty pipe
	 * ends the drain immediately.
	 */
	do
		rc = read(c->resp_read_pipe, scratch, sizeof(scratch));
	while (-1 == rc && EINTR == errno);
#endif

	/* >>>> ACCESS LOCKING STARTS >>>> */
	wait_for_sem(c->accesslock, NULL);
	qhead = c->head_response;
	qtail = c->tail_response;
	/* advance the tail over consumed (NULL) slots until a response
	 * is found or the queue is exhausted
	 */
	for (removed = NULL; !removed && (qhead != qtail); ++qtail) {
		slot = qtail % c->responses_alloc;
		removed = c->responses[slot];
		c->responses[slot] = NULL;
	}
	c->tail_response = qtail;
	tickle_sem(c->accesslock);
	/* <<<< ACCESS LOCKING ENDS <<<< */

	if (NULL != removed) {
		DEBUG_ENSURE(CHILD_GONE_RESP == removed ||
			     BLOCKING_RESP_MAGIC == removed->magic_sig);
	}
	/* the 'child gone' cookie is not a real response: fold up the
	 * worker's resources and report nothing available.
	 */
	if (CHILD_GONE_RESP == removed) {
		cleanup_after_child(c);
		removed = NULL;
	}

	return removed;
}
471275970Scy
472293423Sdelphij/* --------------------------------------------------------------------
473293423Sdelphij * Light up a new worker.
474293423Sdelphij */
static void
start_blocking_thread(
	blocking_child *	c
	)
{

	DEBUG_INSIST(!c->reusable);

	/* semaphores must exist before the thread starts, since the
	 * new thread immediately waits on them
	 */
	prepare_child_sems(c);
	start_blocking_thread_internal(c);
}
486275970Scy
487293423Sdelphij/* --------------------------------------------------------------------
488293423Sdelphij * Create a worker thread. There are several differences between POSIX
489293423Sdelphij * and Windows, of course -- most notably the Windows thread is no
490293423Sdelphij * detached thread, and we keep the handle around until we want to get
491293423Sdelphij * rid of the thread. The notification scheme also differs: Windows
492293423Sdelphij * makes use of semaphores in both directions, POSIX uses a pipe for
493293423Sdelphij * integration with 'select()' or alike.
494293423Sdelphij */
495275970Scystatic void
496275970Scystart_blocking_thread_internal(
497275970Scy	blocking_child *	c
498275970Scy	)
499275970Scy#ifdef SYS_WINNT
500275970Scy{
501275970Scy	BOOL	resumed;
502275970Scy
503293423Sdelphij	c->thread_ref = NULL;
504293423Sdelphij	(*addremove_io_semaphore)(c->responses_pending->shnd, FALSE);
505293423Sdelphij	c->thr_table[0].thnd =
506275970Scy		(HANDLE)_beginthreadex(
507275970Scy			NULL,
508275970Scy			0,
509275970Scy			&blocking_thread,
510275970Scy			c,
511275970Scy			CREATE_SUSPENDED,
512293423Sdelphij			NULL);
513275970Scy
514293423Sdelphij	if (NULL == c->thr_table[0].thnd) {
515275970Scy		msyslog(LOG_ERR, "start blocking thread failed: %m");
516275970Scy		exit(-1);
517275970Scy	}
518275970Scy	/* remember the thread priority is only within the process class */
519293423Sdelphij	if (!SetThreadPriority(c->thr_table[0].thnd,
520275970Scy			       THREAD_PRIORITY_BELOW_NORMAL))
521275970Scy		msyslog(LOG_ERR, "Error lowering blocking thread priority: %m");
522275970Scy
523293423Sdelphij	resumed = ResumeThread(c->thr_table[0].thnd);
524275970Scy	DEBUG_INSIST(resumed);
525293423Sdelphij	c->thread_ref = &c->thr_table[0];
526275970Scy}
527275970Scy#else	/* pthreads start_blocking_thread_internal() follows */
528275970Scy{
529275970Scy# ifdef NEED_PTHREAD_INIT
530275970Scy	static int	pthread_init_called;
531275970Scy# endif
532275970Scy	pthread_attr_t	thr_attr;
533275970Scy	int		rc;
534275970Scy	int		pipe_ends[2];	/* read then write */
535275970Scy	int		is_pipe;
536275970Scy	int		flags;
537294554Sdelphij	size_t		ostacksize;
538294554Sdelphij	size_t		nstacksize;
539275970Scy	sigset_t	saved_sig_mask;
540275970Scy
541293423Sdelphij	c->thread_ref = NULL;
542293423Sdelphij
543275970Scy# ifdef NEED_PTHREAD_INIT
544275970Scy	/*
545275970Scy	 * from lib/isc/unix/app.c:
546275970Scy	 * BSDI 3.1 seg faults in pthread_sigmask() if we don't do this.
547275970Scy	 */
548275970Scy	if (!pthread_init_called) {
549275970Scy		pthread_init();
550275970Scy		pthread_init_called = TRUE;
551275970Scy	}
552275970Scy# endif
553275970Scy
554275970Scy	rc = pipe_socketpair(&pipe_ends[0], &is_pipe);
555275970Scy	if (0 != rc) {
556275970Scy		msyslog(LOG_ERR, "start_blocking_thread: pipe_socketpair() %m");
557275970Scy		exit(1);
558275970Scy	}
559275970Scy	c->resp_read_pipe = move_fd(pipe_ends[0]);
560275970Scy	c->resp_write_pipe = move_fd(pipe_ends[1]);
561275970Scy	c->ispipe = is_pipe;
562275970Scy	flags = fcntl(c->resp_read_pipe, F_GETFL, 0);
563275970Scy	if (-1 == flags) {
564275970Scy		msyslog(LOG_ERR, "start_blocking_thread: fcntl(F_GETFL) %m");
565275970Scy		exit(1);
566275970Scy	}
567275970Scy	rc = fcntl(c->resp_read_pipe, F_SETFL, O_NONBLOCK | flags);
568275970Scy	if (-1 == rc) {
569275970Scy		msyslog(LOG_ERR,
570275970Scy			"start_blocking_thread: fcntl(F_SETFL, O_NONBLOCK) %m");
571275970Scy		exit(1);
572275970Scy	}
573275970Scy	(*addremove_io_fd)(c->resp_read_pipe, c->ispipe, FALSE);
574275970Scy	pthread_attr_init(&thr_attr);
575275970Scy	pthread_attr_setdetachstate(&thr_attr, PTHREAD_CREATE_DETACHED);
576275970Scy#if defined(HAVE_PTHREAD_ATTR_GETSTACKSIZE) && \
577275970Scy    defined(HAVE_PTHREAD_ATTR_SETSTACKSIZE)
578294554Sdelphij	rc = pthread_attr_getstacksize(&thr_attr, &ostacksize);
579294554Sdelphij	if (0 != rc) {
580275970Scy		msyslog(LOG_ERR,
581294554Sdelphij			"start_blocking_thread: pthread_attr_getstacksize() -> %s",
582294554Sdelphij			strerror(rc));
583294554Sdelphij	} else {
584294554Sdelphij		if (ostacksize < THREAD_MINSTACKSIZE)
585294554Sdelphij			nstacksize = THREAD_MINSTACKSIZE;
586294554Sdelphij		else if (ostacksize > THREAD_MAXSTACKSIZE)
587294554Sdelphij			nstacksize = THREAD_MAXSTACKSIZE;
588294554Sdelphij		else
589294554Sdelphij			nstacksize = ostacksize;
590294554Sdelphij		if (nstacksize != ostacksize)
591294554Sdelphij			rc = pthread_attr_setstacksize(&thr_attr, nstacksize);
592294554Sdelphij		if (0 != rc)
593275970Scy			msyslog(LOG_ERR,
594294554Sdelphij				"start_blocking_thread: pthread_attr_setstacksize(0x%lx -> 0x%lx) -> %s",
595294554Sdelphij				(u_long)ostacksize, (u_long)nstacksize,
596294554Sdelphij				strerror(rc));
597275970Scy	}
598275970Scy#else
599294554Sdelphij	UNUSED_ARG(nstacksize);
600294554Sdelphij	UNUSED_ARG(ostacksize);
601275970Scy#endif
602275970Scy#if defined(PTHREAD_SCOPE_SYSTEM) && defined(NEED_PTHREAD_SCOPE_SYSTEM)
603275970Scy	pthread_attr_setscope(&thr_attr, PTHREAD_SCOPE_SYSTEM);
604275970Scy#endif
605275970Scy	c->thread_ref = emalloc_zero(sizeof(*c->thread_ref));
606275970Scy	block_thread_signals(&saved_sig_mask);
607293423Sdelphij	rc = pthread_create(&c->thr_table[0], &thr_attr,
608275970Scy			    &blocking_thread, c);
609275970Scy	pthread_sigmask(SIG_SETMASK, &saved_sig_mask, NULL);
610275970Scy	pthread_attr_destroy(&thr_attr);
611275970Scy	if (0 != rc) {
612294554Sdelphij		msyslog(LOG_ERR, "start_blocking_thread: pthread_create() -> %s",
613294554Sdelphij			strerror(rc));
614275970Scy		exit(1);
615275970Scy	}
616293423Sdelphij	c->thread_ref = &c->thr_table[0];
617275970Scy}
618275970Scy#endif
619275970Scy
620293423Sdelphij/* --------------------------------------------------------------------
621275970Scy * block_thread_signals()
622275970Scy *
623275970Scy * Temporarily block signals used by ntpd main thread, so that signal
624275970Scy * mask inherited by child threads leaves them blocked.  Returns prior
625275970Scy * active signal mask via pmask, to be restored by the main thread
626275970Scy * after pthread_create().
627275970Scy */
628275970Scy#ifndef SYS_WINNT
void
block_thread_signals(
	sigset_t *	pmask		/* receives the prior signal mask */
	)
{
	sigset_t	block;

	/* collect every signal the main thread handles, so worker
	 * threads created while this mask is active inherit them
	 * blocked and delivery stays with the main thread
	 */
	sigemptyset(&block);
# ifdef HAVE_SIGNALED_IO
#  ifdef SIGIO
	sigaddset(&block, SIGIO);
#  endif
#  ifdef SIGPOLL
	sigaddset(&block, SIGPOLL);
#  endif
# endif	/* HAVE_SIGNALED_IO */
	sigaddset(&block, SIGALRM);
	sigaddset(&block, MOREDEBUGSIG);
	sigaddset(&block, LESSDEBUGSIG);
# ifdef SIGDIE1
	sigaddset(&block, SIGDIE1);
# endif
# ifdef SIGDIE2
	sigaddset(&block, SIGDIE2);
# endif
# ifdef SIGDIE3
	sigaddset(&block, SIGDIE3);
# endif
# ifdef SIGDIE4
	sigaddset(&block, SIGDIE4);
# endif
# ifdef SIGBUS
	sigaddset(&block, SIGBUS);
# endif
	sigemptyset(pmask);
	pthread_sigmask(SIG_BLOCK, &block, pmask);
}
666275970Scy#endif	/* !SYS_WINNT */
667275970Scy
668275970Scy
669293423Sdelphij/* --------------------------------------------------------------------
670293423Sdelphij * Create & destroy semaphores. This is sufficiently different between
671293423Sdelphij * POSIX and Windows to warrant wrapper functions and close enough to
672293423Sdelphij * use the concept of synchronization via semaphore for all platforms.
673293423Sdelphij */
static sem_ref
create_sema(
	sema_type*	semptr,		/* caller-provided storage */
	u_int		inival,		/* initial token count */
	u_int		maxval)		/* max tokens; Windows only */
{
#ifdef SYS_WINNT

	long svini, svmax;
	/* clamp the u_int values into the LONG range the Win32 API
	 * takes; maxval of zero means 'no limit' -> LONG_MAX
	 */
	if (NULL != semptr) {
		svini = (inival < LONG_MAX)
		    ? (long)inival : LONG_MAX;
		svmax = (maxval < LONG_MAX && maxval > 0)
		    ? (long)maxval : LONG_MAX;
		semptr->shnd = CreateSemaphore(NULL, svini, svmax, NULL);
		if (NULL == semptr->shnd)
			semptr = NULL;
	}

#else

	(void)maxval;	/* POSIX semaphores have no upper bound */
	if (semptr && sem_init(semptr, FALSE, inival))
		semptr = NULL;

#endif

	/* NULL on failure, else the initialized semaphore */
	return semptr;
}
703293423Sdelphij
704293423Sdelphij/* ------------------------------------------------------------------ */
static sem_ref
delete_sema(
	sem_ref obj)
{

#   ifdef SYS_WINNT

	/* close the OS handle and clear it; safe on NULL input */
	if (obj) {
		if (obj->shnd)
			CloseHandle(obj->shnd);
		obj->shnd = NULL;
	}

#   else

	if (obj)
		sem_destroy(obj);

#   endif

	/* always NULL, so callers can write 'ref = delete_sema(ref);' */
	return NULL;
}
727293423Sdelphij
728293423Sdelphij/* --------------------------------------------------------------------
729275970Scy * prepare_child_sems()
730275970Scy *
731293423Sdelphij * create sync & access semaphores
732275970Scy *
733293423Sdelphij * All semaphores are cleared, only the access semaphore has 1 unit.
734293423Sdelphij * Children wait on 'workitems_pending', then grab 'sema_access'
735293423Sdelphij * and dequeues jobs. When done, 'sema_access' is given one unit back.
736293423Sdelphij *
737293423Sdelphij * The producer grabs 'sema_access', manages the queue, restores
738293423Sdelphij * 'sema_access' and puts one unit into 'workitems_pending'.
739293423Sdelphij *
740293423Sdelphij * The story goes the same for the response queue.
741275970Scy */
static void
prepare_child_sems(
	blocking_child *c
	)
{
	/* lazily create the process-wide memory lock on first use */
	if (NULL == worker_memlock)
		worker_memlock = create_sema(&worker_mmutex, 1, 1);

	/* the access lock starts with one token (unlocked); the
	 * signalling semaphores start empty
	 */
	c->accesslock           = create_sema(&c->sem_table[0], 1, 1);
	c->workitems_pending    = create_sema(&c->sem_table[1], 0, 0);
	c->wake_scheduled_sleep = create_sema(&c->sem_table[2], 0, 1);
#   ifndef WORK_PIPE
	c->responses_pending    = create_sema(&c->sem_table[3], 0, 0);
#   endif
}
757275970Scy
758293423Sdelphij/* --------------------------------------------------------------------
759293423Sdelphij * wait for semaphore. Where the wait can be interrupted, it will
760293423Sdelphij * internally resume -- When this function returns, there is either no
761293423Sdelphij * semaphore at all, a timeout occurred, or the caller could
762293423Sdelphij * successfully take a token from the semaphore.
763293423Sdelphij *
764293423Sdelphij * For untimed wait, not checking the result of this function at all is
765293423Sdelphij * definitely an option.
766293423Sdelphij */
static int
wait_for_sem(
	sem_ref			sem,
	struct timespec *	timeout		/* wall-clock */
	)
#ifdef SYS_WINNT
{
	struct timespec now;
	struct timespec delta;
	DWORD		msec;
	DWORD		rc;

	if (!(sem && sem->shnd)) {
		errno = EINVAL;
		return -1;
	}

	/* convert the absolute wall-clock deadline into the relative
	 * millisecond count WaitForSingleObject takes; deadlines in
	 * the past wait 0 ms, overlong ones degrade to INFINITE
	 */
	if (NULL == timeout) {
		msec = INFINITE;
	} else {
		getclock(TIMEOFDAY, &now);
		delta = sub_tspec(*timeout, now);
		if (delta.tv_sec < 0) {
			msec = 0;
		} else if ((delta.tv_sec + 1) >= (MAXDWORD / 1000)) {
			msec = INFINITE;
		} else {
			msec = 1000 * (DWORD)delta.tv_sec;
			msec += delta.tv_nsec / (1000 * 1000);
		}
	}
	rc = WaitForSingleObject(sem->shnd, msec);
	if (WAIT_OBJECT_0 == rc)
		return 0;
	if (WAIT_TIMEOUT == rc) {
		errno = ETIMEDOUT;
		return -1;
	}
	msyslog(LOG_ERR, "WaitForSingleObject unexpected 0x%x", rc);
	errno = EFAULT;
	return -1;
}
#else	/* pthreads wait_for_sem() follows */
{
	int rc = -1;

	/* resume automatically when a signal interrupts the wait */
	if (sem) do {
			if (NULL == timeout)
				rc = sem_wait(sem);
			else
				rc = sem_timedwait(sem, timeout);
		} while (rc == -1 && errno == EINTR);
	else
		errno = EINVAL;

	return rc;
}
#endif
825275970Scy
826293423Sdelphij/* --------------------------------------------------------------------
827293423Sdelphij * blocking_thread - thread functions have WINAPI (aka 'stdcall')
828293423Sdelphij * calling conventions under Windows and POSIX-defined signature
829293423Sdelphij * otherwise.
830275970Scy */
831275970Scy#ifdef SYS_WINNT
832293423Sdelphiju_int WINAPI
833275970Scy#else
834275970Scyvoid *
835275970Scy#endif
836275970Scyblocking_thread(
837275970Scy	void *	ThreadArg
838275970Scy	)
839275970Scy{
840275970Scy	blocking_child *c;
841275970Scy
842275970Scy	c = ThreadArg;
843275970Scy	exit_worker(blocking_child_common(c));
844275970Scy
845275970Scy	/* NOTREACHED */
846275970Scy	return 0;
847275970Scy}
848275970Scy
849293423Sdelphij/* --------------------------------------------------------------------
850275970Scy * req_child_exit() runs in the parent.
851293423Sdelphij *
852293423Sdelphij * This function is called from from the idle timer, too, and possibly
853293423Sdelphij * without a thread being there any longer. Since we have folded up our
854293423Sdelphij * tent in that case and all the semaphores are already gone, we simply
855293423Sdelphij * ignore this request in this case.
856293423Sdelphij *
857293423Sdelphij * Since the existence of the semaphores is controlled exclusively by
858293423Sdelphij * the parent, there's no risk of data race here.
859275970Scy */
860275970Scyint
861275970Scyreq_child_exit(
862275970Scy	blocking_child *c
863275970Scy	)
864275970Scy{
865293423Sdelphij	return (c->accesslock)
866293423Sdelphij	    ? queue_req_pointer(c, CHILD_EXIT_REQ)
867293423Sdelphij	    : 0;
868275970Scy}
869275970Scy
/* --------------------------------------------------------------------
 * cleanup_after_child() runs in parent.
 *
 * Tears down all per-child IPC state (thread handle, semaphores and --
 * when signalling via IO -- the response pipe) and resets the queue
 * indices so the blocking_child slot can be reused for a new worker.
 */
static void
cleanup_after_child(
	blocking_child *	c
	)
{
	DEBUG_INSIST(!c->reusable);

#   ifdef SYS_WINNT
	/* The thread was not created in detached state, so we better
	 * clean up.
	 */
	if (c->thread_ref && c->thread_ref->thnd) {
		WaitForSingleObject(c->thread_ref->thnd, INFINITE);
		INSIST(CloseHandle(c->thread_ref->thnd));
		c->thread_ref->thnd = NULL;
	}
#   endif
	c->thread_ref = NULL;

	/* remove semaphores and (if signalling via IO) pipes */

	c->accesslock           = delete_sema(c->accesslock);
	c->workitems_pending    = delete_sema(c->workitems_pending);
	c->wake_scheduled_sleep = delete_sema(c->wake_scheduled_sleep);

#   ifdef WORK_PIPE
	DEBUG_INSIST(-1 != c->resp_read_pipe);
	DEBUG_INSIST(-1 != c->resp_write_pipe);
	/* unregister the read end from the I/O machinery before
	 * closing either descriptor
	 */
	(*addremove_io_fd)(c->resp_read_pipe, c->ispipe, TRUE);
	close(c->resp_write_pipe);
	close(c->resp_read_pipe);
	c->resp_write_pipe = -1;
	c->resp_read_pipe = -1;
#   else
	DEBUG_INSIST(NULL != c->responses_pending);
	(*addremove_io_semaphore)(c->responses_pending->shnd, TRUE);
	c->responses_pending = delete_sema(c->responses_pending);
#   endif

	/* Is it necessary to check if there are pending requests and
	 * responses? If so, and if there are, what to do with them?
	 */

	/* re-init buffer index sequencers */
	c->head_workitem = 0;
	c->tail_workitem = 0;
	c->head_response = 0;
	c->tail_response = 0;

	c->reusable = TRUE;
}
924275970Scy
925275970Scy
926275970Scy#else	/* !WORK_THREAD follows */
927275970Scychar work_thread_nonempty_compilation_unit;
928275970Scy#endif
929