/*	$NetBSD: work_thread.c,v 1.5 2016/05/01 23:32:00 christos Exp $	*/

/*
 * work_thread.c - threads implementation for blocking worker child.
 */
#include <config.h>
#include "ntp_workimpl.h"

#ifdef WORK_THREAD

#include <stdio.h>
#include <ctype.h>
#include <signal.h>
#ifndef SYS_WINNT
#include <pthread.h>
#endif

#include "ntp_stdlib.h"
#include "ntp_malloc.h"
#include "ntp_syslog.h"
#include "ntpd.h"
#include "ntp_io.h"
#include "ntp_assert.h"
#include "ntp_unixtime.h"
#include "timespecops.h"
#include "ntp_worker.h"

#define CHILD_EXIT_REQ	((blocking_pipe_header *)(intptr_t)-1)
#define CHILD_GONE_RESP	CHILD_EXIT_REQ
/* Queue size increments:
 * The request queue grows a bit faster than the response queue -- the
 * daemon can push requests and pull results faster on average than the
 * worker can process requests and push results...  Whether this really
 * pays off is debatable.
 */
#define WORKITEMS_ALLOC_INC	16
#define RESPONSES_ALLOC_INC	4

/* Fiddle with min/max stack sizes. 64kB minimum seems to work, so we
 * set the maximum to 256kB. If the minimum goes below the
 * system-defined minimum stack size, we have to adjust accordingly.
 */
#ifndef THREAD_MINSTACKSIZE
# define THREAD_MINSTACKSIZE	(64U * 1024)
#endif
#ifndef __sun
#if defined(PTHREAD_STACK_MIN) && THREAD_MINSTACKSIZE < PTHREAD_STACK_MIN
# undef THREAD_MINSTACKSIZE
# define THREAD_MINSTACKSIZE PTHREAD_STACK_MIN
#endif
#endif

#ifndef THREAD_MAXSTACKSIZE
# define THREAD_MAXSTACKSIZE	(256U * 1024)
#endif
#if THREAD_MAXSTACKSIZE < THREAD_MINSTACKSIZE
# undef  THREAD_MAXSTACKSIZE
# define THREAD_MAXSTACKSIZE THREAD_MINSTACKSIZE
#endif


#ifdef SYS_WINNT

# define thread_exit(c)	_endthreadex(c)
# define tickle_sem(sh) ReleaseSemaphore((sh->shnd), 1, NULL)
u_int	WINAPI	blocking_thread(void *);
static BOOL	same_os_sema(const sem_ref obj, void * osobj);

#else

# define thread_exit(c)	pthread_exit((void*)(size_t)(c))
# define tickle_sem	sem_post
void *		blocking_thread(void *);
static	void	block_thread_signals(sigset_t *);

#endif

#ifdef WORK_PIPE
addremove_io_fd_func		addremove_io_fd;
#else
addremove_io_semaphore_func	addremove_io_semaphore;
#endif

static	void	start_blocking_thread(blocking_child *);
static	void	start_blocking_thread_internal(blocking_child *);
static	void	prepare_child_sems(blocking_child *);
static	int	wait_for_sem(sem_ref, struct timespec *);
static	int	ensure_workitems_empty_slot(blocking_child *);
static	int	ensure_workresp_empty_slot(blocking_child *);
static	int	queue_req_pointer(blocking_child *, blocking_pipe_header *);
static	void	cleanup_after_child(blocking_child *);

static sema_type worker_mmutex;
static sem_ref   worker_memlock;

/* --------------------------------------------------------------------
 * locking the global worker state table (and other global stuff)
 */
void
worker_global_lock(
	int inOrOut)
{
	if (worker_memlock) {
		if (inOrOut)
			wait_for_sem(worker_memlock, NULL);
		else
			tickle_sem(worker_memlock);
	}
}
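
/* A minimal usage sketch (illustrative only, not part of the original
 * file): callers bracket access to shared worker state with a
 * lock/unlock pair -- nonzero takes the token, zero returns it.  The
 * 'example_*' helper name is a placeholder.
 */
#if 0	/* example fragment */
static void
example_touch_global_state(void)
{
	worker_global_lock(TRUE);	/* take the token (may block) */
	/* ... read or modify blocking_children[] and friends ... */
	worker_global_lock(FALSE);	/* give the token back */
}
#endif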

/* --------------------------------------------------------------------
 * implementation isolation wrapper
 */
void
exit_worker(
	int	exitcode
	)
{
	thread_exit(exitcode);	/* see #define thread_exit */
}

/* --------------------------------------------------------------------
 * sleep for a given time or until the wakeup semaphore is tickled.
 */
int
worker_sleep(
	blocking_child *	c,
	time_t			seconds
	)
{
	struct timespec	until;
	int		rc;

# ifdef HAVE_CLOCK_GETTIME
	if (0 != clock_gettime(CLOCK_REALTIME, &until)) {
		msyslog(LOG_ERR, "worker_sleep: clock_gettime() failed: %m");
		return -1;
	}
# else
	if (0 != getclock(TIMEOFDAY, &until)) {
		msyslog(LOG_ERR, "worker_sleep: getclock() failed: %m");
		return -1;
	}
# endif
	until.tv_sec += seconds;
	rc = wait_for_sem(c->wake_scheduled_sleep, &until);
	if (0 == rc)
		return -1;
	if (-1 == rc && ETIMEDOUT == errno)
		return 0;
	msyslog(LOG_ERR, "worker_sleep: sem_timedwait: %m");
	return -1;
}
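
/* Illustrative sketch (not part of the original file) of the return
 * convention above: 0 means the full nap was taken, -1 means the nap
 * was cut short -- typically by interrupt_worker_sleep() tickling the
 * wakeup semaphore -- or that the clock could not be read.
 */
#if 0	/* example fragment; 'c' is some blocking_child */
	if (0 == worker_sleep(c, 10)) {
		/* slept the full 10 seconds, idle timeout reached */
	} else {
		/* woken early -- go back and check the work queue */
	}
#endif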


/* --------------------------------------------------------------------
 * Wake up any worker that is taking a nap.
 */
void
interrupt_worker_sleep(void)
{
	u_int			idx;
	blocking_child *	c;

	for (idx = 0; idx < blocking_children_alloc; idx++) {
		c = blocking_children[idx];
		if (NULL == c || NULL == c->wake_scheduled_sleep)
			continue;
		tickle_sem(c->wake_scheduled_sleep);
	}
}

/* --------------------------------------------------------------------
 * Make sure there is an empty slot at the head of the request
 * queue. Report whether the queue was empty on entry.
 */
static int
ensure_workitems_empty_slot(
	blocking_child *c
	)
{
	/*
	** !!! PRECONDITION: caller holds access lock!
	**
	** This simply tries to increase the size of the buffer if it
	** becomes full. The resize operation does *not* maintain the
	** order of requests, but that should be irrelevant since the
	** processing is considered asynchronous anyway.
	**
	** Return whether the buffer was empty on entry.
	*/

	static const size_t each =
	    sizeof(blocking_children[0]->workitems[0]);

	size_t	new_alloc;
	size_t  slots_used;
	size_t	sidx;

	slots_used = c->head_workitem - c->tail_workitem;
	if (slots_used >= c->workitems_alloc) {
		new_alloc  = c->workitems_alloc + WORKITEMS_ALLOC_INC;
		c->workitems = erealloc(c->workitems, new_alloc * each);
		for (sidx = c->workitems_alloc; sidx < new_alloc; ++sidx)
		    c->workitems[sidx] = NULL;
		c->tail_workitem   = 0;
		c->head_workitem   = c->workitems_alloc;
		c->workitems_alloc = new_alloc;
	}
	INSIST(NULL == c->workitems[c->head_workitem % c->workitems_alloc]);
	return (0 == slots_used);
}
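
/* Worked example (illustrative only, not part of the original file):
 * head and tail are free-running counters and the slot index is always
 * 'counter % alloc'.  With alloc == 4, tail == 2 and head == 6 the
 * queue is full (6 - 2 == 4).  The resize above then restarts the
 * counters at tail == 0 and head == 4 (the old alloc): all four
 * surviving items sit in slots 0..3, merely in rotated order -- which
 * is why request order is not preserved across a resize.
 */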

/* --------------------------------------------------------------------
 * Make sure there is an empty slot at the head of the response
 * queue. Report whether the queue was empty on entry.
 */
static int
ensure_workresp_empty_slot(
	blocking_child *c
	)
{
	/*
	** !!! PRECONDITION: caller holds access lock!
	**
	** Works like the companion function above.
	*/

	static const size_t each =
	    sizeof(blocking_children[0]->responses[0]);

	size_t	new_alloc;
	size_t  slots_used;
	size_t	sidx;

	slots_used = c->head_response - c->tail_response;
	if (slots_used >= c->responses_alloc) {
		new_alloc  = c->responses_alloc + RESPONSES_ALLOC_INC;
		c->responses = erealloc(c->responses, new_alloc * each);
		for (sidx = c->responses_alloc; sidx < new_alloc; ++sidx)
		    c->responses[sidx] = NULL;
		c->tail_response   = 0;
		c->head_response   = c->responses_alloc;
		c->responses_alloc = new_alloc;
	}
	INSIST(NULL == c->responses[c->head_response % c->responses_alloc]);
	return (0 == slots_used);
}


/* --------------------------------------------------------------------
 * queue_req_pointer() - append a work item or idle exit request to
 *			 blocking_workitems[]. Employ proper locking.
 */
static int
queue_req_pointer(
	blocking_child	*	c,
	blocking_pipe_header *	hdr
	)
{
	size_t qhead;

	/* >>>> ACCESS LOCKING STARTS >>>> */
	wait_for_sem(c->accesslock, NULL);
	ensure_workitems_empty_slot(c);
	qhead = c->head_workitem;
	c->workitems[qhead % c->workitems_alloc] = hdr;
	c->head_workitem = 1 + qhead;
	tickle_sem(c->accesslock);
	/* <<<< ACCESS LOCKING ENDS <<<< */

	/* queue consumer wake-up notification */
	tickle_sem(c->workitems_pending);

	return 0;
}

/* --------------------------------------------------------------------
 * API function to make sure a worker is running, a proper private copy
 * of the data is made, the data is entered into the queue and the
 * worker is signalled.
 */
int
send_blocking_req_internal(
	blocking_child *	c,
	blocking_pipe_header *	hdr,
	void *			data
	)
{
	blocking_pipe_header *	threadcopy;
	size_t			payload_octets;

	REQUIRE(hdr != NULL);
	REQUIRE(data != NULL);
	DEBUG_REQUIRE(BLOCKING_REQ_MAGIC == hdr->magic_sig);

	if (hdr->octets <= sizeof(*hdr))
		return 1;	/* failure */
	payload_octets = hdr->octets - sizeof(*hdr);

	if (NULL == c->thread_ref)
		start_blocking_thread(c);
	threadcopy = emalloc(hdr->octets);
	memcpy(threadcopy, hdr, sizeof(*hdr));
	memcpy((char *)threadcopy + sizeof(*hdr), data, payload_octets);

	return queue_req_pointer(c, threadcopy);
}
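
/* Illustrative sketch (not part of the original file) of composing a
 * request: 'octets' counts the header plus the payload that follows it
 * contiguously.  Other header fields (request type etc., declared in
 * ntp_worker.h) are elided here; 'c' and the payload are placeholders.
 */
#if 0	/* example fragment */
	blocking_pipe_header	hdr;
	char			name[] = "pool.example.org";

	memset(&hdr, 0, sizeof(hdr));
	hdr.magic_sig = BLOCKING_REQ_MAGIC;
	hdr.octets    = sizeof(hdr) + sizeof(name);
	/* copies header + payload into one private heap block, starts
	 * the worker if needed, and queues the pointer:
	 */
	send_blocking_req_internal(c, &hdr, name);
#endif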

/* --------------------------------------------------------------------
 * Wait for the 'incoming queue no longer empty' signal, lock the shared
 * structure and dequeue an item.
 */
blocking_pipe_header *
receive_blocking_req_internal(
	blocking_child *	c
	)
{
	blocking_pipe_header *	req;
	size_t			qhead, qtail;

	req = NULL;
	do {
		/* wait for tickle from the producer side */
		wait_for_sem(c->workitems_pending, NULL);

		/* >>>> ACCESS LOCKING STARTS >>>> */
		wait_for_sem(c->accesslock, NULL);
		qhead = c->head_workitem;
		do {
			qtail = c->tail_workitem;
			if (qhead == qtail)
				break;
			c->tail_workitem = qtail + 1;
			qtail %= c->workitems_alloc;
			req = c->workitems[qtail];
			c->workitems[qtail] = NULL;
		} while (NULL == req);
		tickle_sem(c->accesslock);
		/* <<<< ACCESS LOCKING ENDS <<<< */

	} while (NULL == req);

	INSIST(NULL != req);
	if (CHILD_EXIT_REQ == req) {	/* idled out */
		send_blocking_resp_internal(c, CHILD_GONE_RESP);
		req = NULL;
	}

	return req;
}
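
/* Minimal worker-loop sketch (illustrative only, not part of the
 * original file): a NULL return means the idle-exit request was
 * consumed and the good-bye response is already queued, so the worker
 * just falls out of its service loop.
 */
#if 0	/* example fragment; 'c' is this worker's blocking_child */
	blocking_pipe_header *	req;

	while (NULL != (req = receive_blocking_req_internal(c))) {
		/* ... service the request, queue a response ... */
		free(req);	/* the pointer was handed over to us */
	}
	/* the child then exits via exit_worker() */
#endif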

/* --------------------------------------------------------------------
 * Push a response into the return queue and tickle the receiver if
 * the queue was empty before: one wakeup per transition to non-empty
 * suffices, since the receiver drains the queue completely.
 */
int
send_blocking_resp_internal(
	blocking_child *	c,
	blocking_pipe_header *	resp
	)
{
	size_t	qhead;
	int	empty;

	/* >>>> ACCESS LOCKING STARTS >>>> */
	wait_for_sem(c->accesslock, NULL);
	empty = ensure_workresp_empty_slot(c);
	qhead = c->head_response;
	c->responses[qhead % c->responses_alloc] = resp;
	c->head_response = 1 + qhead;
	tickle_sem(c->accesslock);
	/* <<<< ACCESS LOCKING ENDS <<<< */

	/* queue consumer wake-up notification */
	if (empty)
	{
#	    ifdef WORK_PIPE
		write(c->resp_write_pipe, "", 1);
#	    else
		tickle_sem(c->responses_pending);
#	    endif
	}
	return 0;
}


#ifndef WORK_PIPE

/* --------------------------------------------------------------------
 * Check if a (Windows) handle to a semaphore is actually the same one
 * we are using inside the sema wrapper.
 */
static BOOL
same_os_sema(
	const sem_ref	obj,
	void*		osh
	)
{
	return obj && osh && (obj->shnd == (HANDLE)osh);
}

/* --------------------------------------------------------------------
 * Find the shared context associated with an OS handle and make sure
 * the data is dequeued and processed.
 */
void
handle_blocking_resp_sem(
	void *	context
	)
{
	blocking_child *	c;
	u_int			idx;

	c = NULL;
	for (idx = 0; idx < blocking_children_alloc; idx++) {
		c = blocking_children[idx];
		if (c != NULL &&
			c->thread_ref != NULL &&
			same_os_sema(c->responses_pending, context))
			break;
	}
	if (idx < blocking_children_alloc)
		process_blocking_resp(c);
}
#endif	/* !WORK_PIPE */

/* --------------------------------------------------------------------
 * Fetch the next response from the return queue. In case of signalling
 * via pipe, make sure the pipe is flushed, too.
 */
blocking_pipe_header *
receive_blocking_resp_internal(
	blocking_child *	c
	)
{
	blocking_pipe_header *	removed;
	size_t			qhead, qtail, slot;

#ifdef WORK_PIPE
	int			rc;
	char			scratch[32];

	do
		rc = read(c->resp_read_pipe, scratch, sizeof(scratch));
	while (-1 == rc && EINTR == errno);
#endif

	/* >>>> ACCESS LOCKING STARTS >>>> */
	wait_for_sem(c->accesslock, NULL);
	qhead = c->head_response;
	qtail = c->tail_response;
	for (removed = NULL; !removed && (qhead != qtail); ++qtail) {
		slot = qtail % c->responses_alloc;
		removed = c->responses[slot];
		c->responses[slot] = NULL;
	}
	c->tail_response = qtail;
	tickle_sem(c->accesslock);
	/* <<<< ACCESS LOCKING ENDS <<<< */

	if (NULL != removed) {
		DEBUG_ENSURE(CHILD_GONE_RESP == removed ||
			     BLOCKING_RESP_MAGIC == removed->magic_sig);
	}
	if (CHILD_GONE_RESP == removed) {
		cleanup_after_child(c);
		removed = NULL;
	}

	return removed;
}
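
/* Parent-side sketch (illustrative only, not part of the original
 * file): keep fetching until NULL, which covers both "queue drained"
 * and the child-gone marker (the latter already triggered
 * cleanup_after_child() above).
 */
#if 0	/* example fragment; 'c' is some blocking_child */
	blocking_pipe_header *	resp;

	while (NULL != (resp = receive_blocking_resp_internal(c))) {
		/* ... match the response to its request, handle it ... */
		free(resp);
	}
#endif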

/* --------------------------------------------------------------------
 * Light up a new worker.
 */
static void
start_blocking_thread(
	blocking_child *	c
	)
{

	DEBUG_INSIST(!c->reusable);

	prepare_child_sems(c);
	start_blocking_thread_internal(c);
}

/* --------------------------------------------------------------------
 * Create a worker thread. There are several differences between POSIX
 * and Windows, of course -- most notably the Windows thread is not a
 * detached thread, and we keep the handle around until we want to get
 * rid of the thread. The notification scheme also differs: Windows
 * makes use of semaphores in both directions, POSIX uses a pipe for
 * integration with 'select()' or the like.
 */
static void
start_blocking_thread_internal(
	blocking_child *	c
	)
#ifdef SYS_WINNT
{
	BOOL	resumed;

	c->thread_ref = NULL;
	(*addremove_io_semaphore)(c->responses_pending->shnd, FALSE);
	c->thr_table[0].thnd =
		(HANDLE)_beginthreadex(
			NULL,
			0,
			&blocking_thread,
			c,
			CREATE_SUSPENDED,
			NULL);

	if (NULL == c->thr_table[0].thnd) {
		msyslog(LOG_ERR, "start blocking thread failed: %m");
		exit(-1);
	}
	/* remember the thread priority is only within the process class */
	if (!SetThreadPriority(c->thr_table[0].thnd,
			       THREAD_PRIORITY_BELOW_NORMAL))
		msyslog(LOG_ERR, "Error lowering blocking thread priority: %m");

	resumed = ResumeThread(c->thr_table[0].thnd);
	DEBUG_INSIST(resumed);
	c->thread_ref = &c->thr_table[0];
}
#else	/* pthreads start_blocking_thread_internal() follows */
{
# ifdef NEED_PTHREAD_INIT
	static int	pthread_init_called;
# endif
	pthread_attr_t	thr_attr;
	int		rc;
	int		pipe_ends[2];	/* read then write */
	int		is_pipe;
	int		flags;
	size_t		ostacksize;
	size_t		nstacksize;
	sigset_t	saved_sig_mask;

	c->thread_ref = NULL;

# ifdef NEED_PTHREAD_INIT
	/*
	 * from lib/isc/unix/app.c:
	 * BSDI 3.1 seg faults in pthread_sigmask() if we don't do this.
	 */
	if (!pthread_init_called) {
		pthread_init();
		pthread_init_called = TRUE;
	}
# endif

	rc = pipe_socketpair(&pipe_ends[0], &is_pipe);
	if (0 != rc) {
		msyslog(LOG_ERR, "start_blocking_thread: pipe_socketpair() %m");
		exit(1);
	}
	c->resp_read_pipe = move_fd(pipe_ends[0]);
	c->resp_write_pipe = move_fd(pipe_ends[1]);
	c->ispipe = is_pipe;
	flags = fcntl(c->resp_read_pipe, F_GETFL, 0);
	if (-1 == flags) {
		msyslog(LOG_ERR, "start_blocking_thread: fcntl(F_GETFL) %m");
		exit(1);
	}
	rc = fcntl(c->resp_read_pipe, F_SETFL, O_NONBLOCK | flags);
	if (-1 == rc) {
		msyslog(LOG_ERR,
			"start_blocking_thread: fcntl(F_SETFL, O_NONBLOCK) %m");
		exit(1);
	}
	(*addremove_io_fd)(c->resp_read_pipe, c->ispipe, FALSE);
	pthread_attr_init(&thr_attr);
	pthread_attr_setdetachstate(&thr_attr, PTHREAD_CREATE_DETACHED);
#if defined(HAVE_PTHREAD_ATTR_GETSTACKSIZE) && \
    defined(HAVE_PTHREAD_ATTR_SETSTACKSIZE)
	rc = pthread_attr_getstacksize(&thr_attr, &ostacksize);
	if (0 != rc) {
		msyslog(LOG_ERR,
			"start_blocking_thread: pthread_attr_getstacksize() -> %s",
			strerror(rc));
	} else {
		if (ostacksize < THREAD_MINSTACKSIZE)
			nstacksize = THREAD_MINSTACKSIZE;
		else if (ostacksize > THREAD_MAXSTACKSIZE)
			nstacksize = THREAD_MAXSTACKSIZE;
		else
			nstacksize = ostacksize;
		if (nstacksize != ostacksize)
			rc = pthread_attr_setstacksize(&thr_attr, nstacksize);
		if (0 != rc)
			msyslog(LOG_ERR,
				"start_blocking_thread: pthread_attr_setstacksize(0x%lx -> 0x%lx) -> %s",
				(u_long)ostacksize, (u_long)nstacksize,
				strerror(rc));
	}
#else
	UNUSED_ARG(nstacksize);
	UNUSED_ARG(ostacksize);
#endif
#if defined(PTHREAD_SCOPE_SYSTEM) && defined(NEED_PTHREAD_SCOPE_SYSTEM)
	pthread_attr_setscope(&thr_attr, PTHREAD_SCOPE_SYSTEM);
#endif
	c->thread_ref = emalloc_zero(sizeof(*c->thread_ref));
	block_thread_signals(&saved_sig_mask);
	rc = pthread_create(&c->thr_table[0], &thr_attr,
			    &blocking_thread, c);
	pthread_sigmask(SIG_SETMASK, &saved_sig_mask, NULL);
	pthread_attr_destroy(&thr_attr);
	if (0 != rc) {
		msyslog(LOG_ERR, "start_blocking_thread: pthread_create() -> %s",
			strerror(rc));
		exit(1);
	}
	c->thread_ref = &c->thr_table[0];
}
#endif

/* --------------------------------------------------------------------
 * block_thread_signals()
 *
 * Temporarily block the signals used by the ntpd main thread, so that
 * the signal mask inherited by child threads leaves them blocked.
 * Returns the prior active signal mask via pmask, to be restored by
 * the main thread after pthread_create().
 */
#ifndef SYS_WINNT
void
block_thread_signals(
	sigset_t *	pmask
	)
{
	sigset_t	block;

	sigemptyset(&block);
# ifdef HAVE_SIGNALED_IO
#  ifdef SIGIO
	sigaddset(&block, SIGIO);
#  endif
#  ifdef SIGPOLL
	sigaddset(&block, SIGPOLL);
#  endif
# endif	/* HAVE_SIGNALED_IO */
	sigaddset(&block, SIGALRM);
	sigaddset(&block, MOREDEBUGSIG);
	sigaddset(&block, LESSDEBUGSIG);
# ifdef SIGDIE1
	sigaddset(&block, SIGDIE1);
# endif
# ifdef SIGDIE2
	sigaddset(&block, SIGDIE2);
# endif
# ifdef SIGDIE3
	sigaddset(&block, SIGDIE3);
# endif
# ifdef SIGDIE4
	sigaddset(&block, SIGDIE4);
# endif
# ifdef SIGBUS
	sigaddset(&block, SIGBUS);
# endif
	sigemptyset(pmask);
	pthread_sigmask(SIG_BLOCK, &block, pmask);
}
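
/* The calling pattern, condensed (illustrative only, not part of the
 * original file; 'thr', 'attr', 'func' and 'arg' are placeholders):
 * block the signals in the creating thread, create the child -- which
 * inherits the blocked mask -- then restore the previous mask.
 */
#if 0	/* example fragment */
	sigset_t	saved_sig_mask;

	block_thread_signals(&saved_sig_mask);
	pthread_create(&thr, &attr, &func, arg);
	pthread_sigmask(SIG_SETMASK, &saved_sig_mask, NULL);
#endif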
#endif	/* !SYS_WINNT */


/* --------------------------------------------------------------------
 * Create & destroy semaphores. This is sufficiently different between
 * POSIX and Windows to warrant wrapper functions and close enough to
 * use the concept of synchronization via semaphore for all platforms.
 */
static sem_ref
create_sema(
	sema_type*	semptr,
	u_int		inival,
	u_int		maxval)
{
#ifdef SYS_WINNT

	long svini, svmax;
	if (NULL != semptr) {
		svini = (inival < LONG_MAX)
		    ? (long)inival : LONG_MAX;
		svmax = (maxval < LONG_MAX && maxval > 0)
		    ? (long)maxval : LONG_MAX;
		semptr->shnd = CreateSemaphore(NULL, svini, svmax, NULL);
		if (NULL == semptr->shnd)
			semptr = NULL;
	}

#else

	(void)maxval;
	if (semptr && sem_init(semptr, FALSE, inival))
		semptr = NULL;

#endif

	return semptr;
}

/* ------------------------------------------------------------------ */
static sem_ref
delete_sema(
	sem_ref obj)
{

#   ifdef SYS_WINNT

	if (obj) {
		if (obj->shnd)
			CloseHandle(obj->shnd);
		obj->shnd = NULL;
	}

#   else

	if (obj)
		sem_destroy(obj);

#   endif

	return NULL;
}
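
/* Usage sketch (illustrative only, not part of the original file):
 * create_sema() returns its first argument on success and NULL on
 * failure; delete_sema() always returns NULL.  Both therefore pair
 * naturally with assignment, as prepare_child_sems() and
 * cleanup_after_child() below demonstrate.
 */
#if 0	/* example fragment */
	static sema_type	storage;
	sem_ref			sem;

	sem = create_sema(&storage, 0, 1);	/* cleared, one unit max */
	if (NULL != sem) {
		tickle_sem(sem);		/* post one unit */
		wait_for_sem(sem, NULL);	/* take it back */
	}
	sem = delete_sema(sem);			/* sem is NULL afterwards */
#endif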

/* --------------------------------------------------------------------
 * prepare_child_sems()
 *
 * create sync & access semaphores
 *
 * All semaphores are cleared, only the access semaphore has 1 unit.
 * Children wait on 'workitems_pending', then grab 'sema_access' and
 * dequeue jobs. When done, 'sema_access' is given one unit back.
 *
 * The producer grabs 'sema_access', manages the queue, restores
 * 'sema_access' and puts one unit into 'workitems_pending'.
 *
 * The same story applies to the response queue.
 */
static void
prepare_child_sems(
	blocking_child *c
	)
{
	if (NULL == worker_memlock)
		worker_memlock = create_sema(&worker_mmutex, 1, 1);

	c->accesslock           = create_sema(&c->sem_table[0], 1, 1);
	c->workitems_pending    = create_sema(&c->sem_table[1], 0, 0);
	c->wake_scheduled_sleep = create_sema(&c->sem_table[2], 0, 1);
#   ifndef WORK_PIPE
	c->responses_pending    = create_sema(&c->sem_table[3], 0, 0);
#   endif
}
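
/* The hand-off protocol from the comment above, condensed
 * (illustrative only, not part of the original file; see
 * queue_req_pointer() and receive_blocking_req_internal() for the
 * real thing):
 *
 *	producer			consumer (child)
 *	--------			----------------
 *	wait_for_sem(accesslock)	wait_for_sem(workitems_pending)
 *	...enqueue one item...		wait_for_sem(accesslock)
 *	tickle_sem(accesslock)		...dequeue one item...
 *	tickle_sem(workitems_pending)	tickle_sem(accesslock)
 */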

/* --------------------------------------------------------------------
 * Wait for a semaphore. Where the wait can be interrupted, it will
 * internally resume -- when this function returns, there is either no
 * semaphore at all, a timeout occurred, or the caller successfully
 * took a token from the semaphore.
 *
 * For an untimed wait, not checking the result of this function at all
 * is definitely an option.
 */
static int
wait_for_sem(
	sem_ref			sem,
	struct timespec *	timeout		/* wall-clock */
	)
#ifdef SYS_WINNT
{
	struct timespec now;
	struct timespec delta;
	DWORD		msec;
	DWORD		rc;

	if (!(sem && sem->shnd)) {
		errno = EINVAL;
		return -1;
	}

	if (NULL == timeout) {
		msec = INFINITE;
	} else {
		getclock(TIMEOFDAY, &now);
		delta = sub_tspec(*timeout, now);
		if (delta.tv_sec < 0) {
			msec = 0;
		} else if ((delta.tv_sec + 1) >= (MAXDWORD / 1000)) {
			msec = INFINITE;
		} else {
			msec = 1000 * (DWORD)delta.tv_sec;
			msec += delta.tv_nsec / (1000 * 1000);
		}
	}
	rc = WaitForSingleObject(sem->shnd, msec);
	if (WAIT_OBJECT_0 == rc)
		return 0;
	if (WAIT_TIMEOUT == rc) {
		errno = ETIMEDOUT;
		return -1;
	}
	msyslog(LOG_ERR, "WaitForSingleObject unexpected 0x%x", rc);
	errno = EFAULT;
	return -1;
}
#else	/* pthreads wait_for_sem() follows */
{
	int rc = -1;

	if (sem) do {
			if (NULL == timeout)
				rc = sem_wait(sem);
			else
				rc = sem_timedwait(sem, timeout);
		} while (rc == -1 && errno == EINTR);
	else
		errno = EINVAL;

	return rc;
}
#endif

/* --------------------------------------------------------------------
 * blocking_thread - thread functions have WINAPI (aka 'stdcall')
 * calling conventions under Windows and POSIX-defined signature
 * otherwise.
 */
#ifdef SYS_WINNT
u_int WINAPI
#else
void *
#endif
blocking_thread(
	void *	ThreadArg
	)
{
	blocking_child *c;

	c = ThreadArg;
	exit_worker(blocking_child_common(c));

	/* NOTREACHED */
	return 0;
}

/* --------------------------------------------------------------------
 * req_child_exit() runs in the parent.
 *
 * This function is also called from the idle timer, possibly without a
 * thread being there any longer. Since we have folded up our tent in
 * that case and all the semaphores are already gone, we simply ignore
 * the request.
 *
 * Since the existence of the semaphores is controlled exclusively by
 * the parent, there's no risk of a data race here.
 */
int
req_child_exit(
	blocking_child *c
	)
{
	return (c->accesslock)
	    ? queue_req_pointer(c, CHILD_EXIT_REQ)
	    : 0;
}

/* --------------------------------------------------------------------
 * cleanup_after_child() runs in the parent.
 */
static void
cleanup_after_child(
	blocking_child *	c
	)
{
	DEBUG_INSIST(!c->reusable);

#   ifdef SYS_WINNT
	/* The thread was not created in detached state, so we had
	 * better clean up.
	 */
	if (c->thread_ref && c->thread_ref->thnd) {
		WaitForSingleObject(c->thread_ref->thnd, INFINITE);
		INSIST(CloseHandle(c->thread_ref->thnd));
		c->thread_ref->thnd = NULL;
	}
#   endif
	c->thread_ref = NULL;

	/* remove semaphores and (if signalling via I/O) pipes */

	c->accesslock           = delete_sema(c->accesslock);
	c->workitems_pending    = delete_sema(c->workitems_pending);
	c->wake_scheduled_sleep = delete_sema(c->wake_scheduled_sleep);

#   ifdef WORK_PIPE
	DEBUG_INSIST(-1 != c->resp_read_pipe);
	DEBUG_INSIST(-1 != c->resp_write_pipe);
	(*addremove_io_fd)(c->resp_read_pipe, c->ispipe, TRUE);
	close(c->resp_write_pipe);
	close(c->resp_read_pipe);
	c->resp_write_pipe = -1;
	c->resp_read_pipe = -1;
#   else
	DEBUG_INSIST(NULL != c->responses_pending);
	(*addremove_io_semaphore)(c->responses_pending->shnd, TRUE);
	c->responses_pending = delete_sema(c->responses_pending);
#   endif

	/* Is it necessary to check if there are pending requests and
	 * responses? If so, and if there are, what to do with them?
	 */

	/* re-init buffer index sequencers */
	c->head_workitem = 0;
	c->tail_workitem = 0;
	c->head_response = 0;
	c->tail_response = 0;

	c->reusable = TRUE;
}


#else	/* !WORK_THREAD follows */
char work_thread_nonempty_compilation_unit;
#endif
