/*
 * work_thread.c - threads implementation for blocking worker child.
 */
#include <config.h>
#include "ntp_workimpl.h"

#ifdef WORK_THREAD

#include <stdio.h>
#include <ctype.h>
#include <signal.h>
#ifndef SYS_WINNT
#include <pthread.h>
#endif

#include "ntp_stdlib.h"
#include "ntp_malloc.h"
#include "ntp_syslog.h"
#include "ntpd.h"
#include "ntp_io.h"
#include "ntp_assert.h"
#include "ntp_unixtime.h"
#include "timespecops.h"
#include "ntp_worker.h"

#define CHILD_EXIT_REQ	((blocking_pipe_header *)(intptr_t)-1)
#define CHILD_GONE_RESP	CHILD_EXIT_REQ
/* Queue size increments:
 * The request queue grows a bit faster than the response queue -- the
 * daemon can push requests and pull results faster on average than the
 * worker can process requests and push results...  Whether this really
 * pays off is debatable.
 */
#define WORKITEMS_ALLOC_INC	16
#define RESPONSES_ALLOC_INC	4

/* Fiddle with min/max stack sizes. 64kB minimum seems to work, so we
 * set the maximum to 256kB. If the minimum goes below the
 * system-defined minimum stack size, we have to adjust accordingly.
 */
#ifndef THREAD_MINSTACKSIZE
# define THREAD_MINSTACKSIZE	(64U * 1024)
#endif
#ifndef __sun
#if defined(PTHREAD_STACK_MIN) && THREAD_MINSTACKSIZE < PTHREAD_STACK_MIN
# undef THREAD_MINSTACKSIZE
# define THREAD_MINSTACKSIZE PTHREAD_STACK_MIN
#endif
#endif

#ifndef THREAD_MAXSTACKSIZE
# define THREAD_MAXSTACKSIZE	(256U * 1024)
#endif
#if THREAD_MAXSTACKSIZE < THREAD_MINSTACKSIZE
# undef  THREAD_MAXSTACKSIZE
# define THREAD_MAXSTACKSIZE THREAD_MINSTACKSIZE
#endif


#ifdef SYS_WINNT

# define thread_exit(c)	_endthreadex(c)
# define tickle_sem(sh) ReleaseSemaphore((sh)->shnd, 1, NULL)
u_int	WINAPI	blocking_thread(void *);
static BOOL	same_os_sema(const sem_ref obj, void * osobj);

#else

# define thread_exit(c)	pthread_exit((void*)(size_t)(c))
# define tickle_sem	sem_post
void *		blocking_thread(void *);
static	void	block_thread_signals(sigset_t *);

#endif

#ifdef WORK_PIPE
addremove_io_fd_func		addremove_io_fd;
#else
addremove_io_semaphore_func	addremove_io_semaphore;
#endif

static	void	start_blocking_thread(blocking_child *);
static	void	start_blocking_thread_internal(blocking_child *);
static	void	prepare_child_sems(blocking_child *);
static	int	wait_for_sem(sem_ref, struct timespec *);
static	int	ensure_workitems_empty_slot(blocking_child *);
static	int	ensure_workresp_empty_slot(blocking_child *);
static	int	queue_req_pointer(blocking_child *, blocking_pipe_header *);
static	void	cleanup_after_child(blocking_child *);

static sema_type worker_mmutex;
static sem_ref   worker_memlock;

/* --------------------------------------------------------------------
 * locking the global worker state table (and other global stuff)
 */
void
worker_global_lock(
	int inOrOut)
{
	if (worker_memlock) {
		if (inOrOut)
			wait_for_sem(worker_memlock, NULL);
		else
			tickle_sem(worker_memlock);
	}
}
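
/* Usage sketch (illustrative only, not part of the implementation):
 * callers bracket access to the shared worker state with a lock/unlock
 * pair, passing a nonzero value to lock and zero to unlock:
 *
 *	worker_global_lock(TRUE);
 *	... read or modify global worker state ...
 *	worker_global_lock(FALSE);
 */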

/* --------------------------------------------------------------------
 * implementation isolation wrapper
 */
void
exit_worker(
	int	exitcode
	)
{
	thread_exit(exitcode);	/* see #define thread_exit */
}

/* --------------------------------------------------------------------
 * Sleep for a given time or until the wakeup semaphore is tickled.
 */
int
worker_sleep(
	blocking_child *	c,
	time_t			seconds
	)
{
	struct timespec	until;
	int		rc;

# ifdef HAVE_CLOCK_GETTIME
	if (0 != clock_gettime(CLOCK_REALTIME, &until)) {
		msyslog(LOG_ERR, "worker_sleep: clock_gettime() failed: %m");
		return -1;
	}
# else
	if (0 != getclock(TIMEOFDAY, &until)) {
		msyslog(LOG_ERR, "worker_sleep: getclock() failed: %m");
		return -1;
	}
# endif
	until.tv_sec += seconds;
	rc = wait_for_sem(c->wake_scheduled_sleep, &until);
	if (0 == rc)
		return -1;
	if (-1 == rc && ETIMEDOUT == errno)
		return 0;
	msyslog(LOG_ERR, "worker_sleep: sem_timedwait: %m");
	return -1;
}
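
/* Return convention note (added for clarity): worker_sleep() returns 0
 * only when the full interval elapsed (sem_timedwait() timed out) and
 * -1 when the nap was cut short by interrupt_worker_sleep() or by an
 * error, so callers can distinguish a completed sleep from an early
 * wake-up.
 */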


/* --------------------------------------------------------------------
 * Wake up all workers that are taking a nap.
 */
void
interrupt_worker_sleep(void)
{
	u_int			idx;
	blocking_child *	c;

	for (idx = 0; idx < blocking_children_alloc; idx++) {
		c = blocking_children[idx];
		if (NULL == c || NULL == c->wake_scheduled_sleep)
			continue;
		tickle_sem(c->wake_scheduled_sleep);
	}
}

/* --------------------------------------------------------------------
 * Make sure there is an empty slot at the head of the request
 * queue. Return nonzero if the queue is currently empty.
 */
static int
ensure_workitems_empty_slot(
	blocking_child *c
	)
{
	/*
	** !!! PRECONDITION: caller holds access lock!
	**
	** This simply tries to increase the size of the buffer if it
	** becomes full. The resize operation does *not* maintain the
	** order of requests, but that should be irrelevant since the
	** processing is considered asynchronous anyway.
	**
	** Returns nonzero if the buffer is currently empty.
	*/

	static const size_t each =
	    sizeof(blocking_children[0]->workitems[0]);

	size_t	new_alloc;
	size_t  slots_used;
	size_t	sidx;

	slots_used = c->head_workitem - c->tail_workitem;
	if (slots_used >= c->workitems_alloc) {
		new_alloc  = c->workitems_alloc + WORKITEMS_ALLOC_INC;
		c->workitems = erealloc(c->workitems, new_alloc * each);
		for (sidx = c->workitems_alloc; sidx < new_alloc; ++sidx)
		    c->workitems[sidx] = NULL;
		c->tail_workitem   = 0;
		c->head_workitem   = c->workitems_alloc;
		c->workitems_alloc = new_alloc;
	}
	INSIST(NULL == c->workitems[c->head_workitem % c->workitems_alloc]);
	return (0 == slots_used);
}
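
/* Indexing sketch (illustrative): head_workitem and tail_workitem are
 * free-running counters; only the physical slot index is reduced
 * modulo the allocation size:
 *
 *	slots_used = head - tail;		never exceeds alloc
 *	slot       = counter % alloc;		physical ring index
 *
 * The resize above therefore re-bases the counters (tail = 0, head =
 * old alloc) rather than preserving the rotation of the old ring.
 */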

/* --------------------------------------------------------------------
 * Make sure there is an empty slot at the head of the response
 * queue. Return nonzero if the queue is currently empty.
 */
static int
ensure_workresp_empty_slot(
	blocking_child *c
	)
{
	/*
	** !!! PRECONDITION: caller holds access lock!
	**
	** Works like the companion function above.
	*/

	static const size_t each =
	    sizeof(blocking_children[0]->responses[0]);

	size_t	new_alloc;
	size_t  slots_used;
	size_t	sidx;

	slots_used = c->head_response - c->tail_response;
	if (slots_used >= c->responses_alloc) {
		new_alloc  = c->responses_alloc + RESPONSES_ALLOC_INC;
		c->responses = erealloc(c->responses, new_alloc * each);
		for (sidx = c->responses_alloc; sidx < new_alloc; ++sidx)
		    c->responses[sidx] = NULL;
		c->tail_response   = 0;
		c->head_response   = c->responses_alloc;
		c->responses_alloc = new_alloc;
	}
	INSIST(NULL == c->responses[c->head_response % c->responses_alloc]);
	return (0 == slots_used);
}


/* --------------------------------------------------------------------
 * queue_req_pointer() - append a work item or idle exit request to
 *			 blocking_workitems[]. Employ proper locking.
 */
static int
queue_req_pointer(
	blocking_child	*	c,
	blocking_pipe_header *	hdr
	)
{
	size_t qhead;

	/* >>>> ACCESS LOCKING STARTS >>>> */
	wait_for_sem(c->accesslock, NULL);
	ensure_workitems_empty_slot(c);
	qhead = c->head_workitem;
	c->workitems[qhead % c->workitems_alloc] = hdr;
	c->head_workitem = 1 + qhead;
	tickle_sem(c->accesslock);
	/* <<<< ACCESS LOCKING ENDS <<<< */

	/* queue consumer wake-up notification */
	tickle_sem(c->workitems_pending);

	return 0;
}

/* --------------------------------------------------------------------
 * API function that makes sure a worker is running, makes a proper
 * private copy of the data, enters the data into the queue, and
 * signals the worker.
 */
int
send_blocking_req_internal(
	blocking_child *	c,
	blocking_pipe_header *	hdr,
	void *			data
	)
{
	blocking_pipe_header *	threadcopy;
	size_t			payload_octets;

	REQUIRE(hdr != NULL);
	REQUIRE(data != NULL);
	DEBUG_REQUIRE(BLOCKING_REQ_MAGIC == hdr->magic_sig);

	if (hdr->octets <= sizeof(*hdr))
		return 1;	/* failure */
	payload_octets = hdr->octets - sizeof(*hdr);

	if (NULL == c->thread_ref)
		start_blocking_thread(c);
	threadcopy = emalloc(hdr->octets);
	memcpy(threadcopy, hdr, sizeof(*hdr));
	memcpy((char *)threadcopy + sizeof(*hdr), data, payload_octets);

	return queue_req_pointer(c, threadcopy);
}
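
/* Layout sketch (illustrative): the private copy handed to the worker
 * is a single allocation with the payload packed behind the header,
 *
 *	[ blocking_pipe_header | payload (hdr->octets - sizeof(*hdr)) ]
 *
 * so the receiving side can release the whole request with one free().
 */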

/* --------------------------------------------------------------------
 * Wait for the 'incoming queue no longer empty' signal, lock the shared
 * structure and dequeue an item.
 */
blocking_pipe_header *
receive_blocking_req_internal(
	blocking_child *	c
	)
{
	blocking_pipe_header *	req;
	size_t			qhead, qtail;

	req = NULL;
	do {
		/* wait for tickle from the producer side */
		wait_for_sem(c->workitems_pending, NULL);

		/* >>>> ACCESS LOCKING STARTS >>>> */
		wait_for_sem(c->accesslock, NULL);
		qhead = c->head_workitem;
		do {
			qtail = c->tail_workitem;
			if (qhead == qtail)
				break;
			c->tail_workitem = qtail + 1;
			qtail %= c->workitems_alloc;
			req = c->workitems[qtail];
			c->workitems[qtail] = NULL;
		} while (NULL == req);
		tickle_sem(c->accesslock);
		/* <<<< ACCESS LOCKING ENDS <<<< */

	} while (NULL == req);

	INSIST(NULL != req);
	if (CHILD_EXIT_REQ == req) {	/* idled out */
		send_blocking_resp_internal(c, CHILD_GONE_RESP);
		req = NULL;
	}

	return req;
}
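
/* Note on the sentinel handshake above: CHILD_EXIT_REQ is not a real
 * pointer (it is (blocking_pipe_header *)-1), so it is only ever
 * compared, never dereferenced. The worker acknowledges it with the
 * equally fake CHILD_GONE_RESP, and the parent then performs the
 * actual teardown in cleanup_after_child().
 */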

/* --------------------------------------------------------------------
 * Push a response into the return queue and, if the queue was empty,
 * tickle the receiver.
 */
int
send_blocking_resp_internal(
	blocking_child *	c,
	blocking_pipe_header *	resp
	)
{
	size_t	qhead;
	int	empty;

	/* >>>> ACCESS LOCKING STARTS >>>> */
	wait_for_sem(c->accesslock, NULL);
	empty = ensure_workresp_empty_slot(c);
	qhead = c->head_response;
	c->responses[qhead % c->responses_alloc] = resp;
	c->head_response = 1 + qhead;
	tickle_sem(c->accesslock);
	/* <<<< ACCESS LOCKING ENDS <<<< */

	/* queue consumer wake-up notification */
	if (empty)
	{
#	    ifdef WORK_PIPE
		write(c->resp_write_pipe, "", 1);
#	    else
		tickle_sem(c->responses_pending);
#	    endif
	}
	return 0;
}
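
/* Note on the one-byte write above: the byte carries no data; it only
 * makes the read end of the pipe selectable so the main I/O loop
 * notices pending responses. Writing only on the empty-to-non-empty
 * transition keeps the pipe from filling up when the consumer drains
 * the queue slowly.
 */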


#ifndef WORK_PIPE

/* --------------------------------------------------------------------
 * Check if a (Windows) handle to a semaphore is actually the same one
 * we are using inside the sema wrapper.
 */
static BOOL
same_os_sema(
	const sem_ref	obj,
	void*		osh
	)
{
	return obj && osh && (obj->shnd == (HANDLE)osh);
}

/* --------------------------------------------------------------------
 * Find the shared context associated with an OS handle and make sure
 * the data is dequeued and processed.
 */
void
handle_blocking_resp_sem(
	void *	context
	)
{
	blocking_child *	c;
	u_int			idx;

	c = NULL;
	for (idx = 0; idx < blocking_children_alloc; idx++) {
		c = blocking_children[idx];
		if (c != NULL &&
			c->thread_ref != NULL &&
			same_os_sema(c->responses_pending, context))
			break;
	}
	if (idx < blocking_children_alloc)
		process_blocking_resp(c);
}
#endif	/* !WORK_PIPE */

/* --------------------------------------------------------------------
 * Fetch the next response from the return queue. In case of signalling
 * via pipe, make sure the pipe is flushed, too.
 */
blocking_pipe_header *
receive_blocking_resp_internal(
	blocking_child *	c
	)
{
	blocking_pipe_header *	removed;
	size_t			qhead, qtail, slot;

#ifdef WORK_PIPE
	int			rc;
	char			scratch[32];

	do
		rc = read(c->resp_read_pipe, scratch, sizeof(scratch));
	while (-1 == rc && EINTR == errno);
#endif

	/* >>>> ACCESS LOCKING STARTS >>>> */
	wait_for_sem(c->accesslock, NULL);
	qhead = c->head_response;
	qtail = c->tail_response;
	for (removed = NULL; !removed && (qhead != qtail); ++qtail) {
		slot = qtail % c->responses_alloc;
		removed = c->responses[slot];
		c->responses[slot] = NULL;
	}
	c->tail_response = qtail;
	tickle_sem(c->accesslock);
	/* <<<< ACCESS LOCKING ENDS <<<< */

	if (NULL != removed) {
		DEBUG_ENSURE(CHILD_GONE_RESP == removed ||
			     BLOCKING_RESP_MAGIC == removed->magic_sig);
	}
	if (CHILD_GONE_RESP == removed) {
		cleanup_after_child(c);
		removed = NULL;
	}

	return removed;
}
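
/* Note on the pipe flush above: a single read() may drain several
 * wake-up bytes while only one response is dequeued per call, and one
 * byte may announce several queued responses. The caller is therefore
 * expected to keep polling for responses until this function returns
 * NULL.
 */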

/* --------------------------------------------------------------------
 * Light up a new worker.
 */
static void
start_blocking_thread(
	blocking_child *	c
	)
{

	DEBUG_INSIST(!c->reusable);

	prepare_child_sems(c);
	start_blocking_thread_internal(c);
}

/* --------------------------------------------------------------------
 * Create a worker thread. There are several differences between POSIX
 * and Windows, of course -- most notably the Windows thread is not a
 * detached thread, and we keep the handle around until we want to get
 * rid of the thread. The notification scheme also differs: Windows
 * makes use of semaphores in both directions, POSIX uses a pipe for
 * integration with 'select()' or alike.
 */
static void
start_blocking_thread_internal(
	blocking_child *	c
	)
#ifdef SYS_WINNT
{
	BOOL	resumed;

	c->thread_ref = NULL;
	(*addremove_io_semaphore)(c->responses_pending->shnd, FALSE);
	c->thr_table[0].thnd =
		(HANDLE)_beginthreadex(
			NULL,
			0,
			&blocking_thread,
			c,
			CREATE_SUSPENDED,
			NULL);

	if (NULL == c->thr_table[0].thnd) {
		msyslog(LOG_ERR, "start blocking thread failed: %m");
		exit(-1);
	}
	/* remember the thread priority is only within the process class */
	if (!SetThreadPriority(c->thr_table[0].thnd,
			       THREAD_PRIORITY_BELOW_NORMAL))
		msyslog(LOG_ERR, "Error lowering blocking thread priority: %m");

	resumed = ResumeThread(c->thr_table[0].thnd);
	DEBUG_INSIST(resumed);
	c->thread_ref = &c->thr_table[0];
}
#else	/* pthreads start_blocking_thread_internal() follows */
{
# ifdef NEED_PTHREAD_INIT
	static int	pthread_init_called;
# endif
	pthread_attr_t	thr_attr;
	int		rc;
	int		pipe_ends[2];	/* read then write */
	int		is_pipe;
	int		flags;
	size_t		ostacksize;
	size_t		nstacksize;
	sigset_t	saved_sig_mask;

	c->thread_ref = NULL;

# ifdef NEED_PTHREAD_INIT
	/*
	 * from lib/isc/unix/app.c:
	 * BSDI 3.1 seg faults in pthread_sigmask() if we don't do this.
	 */
	if (!pthread_init_called) {
		pthread_init();
		pthread_init_called = TRUE;
	}
# endif

	rc = pipe_socketpair(&pipe_ends[0], &is_pipe);
	if (0 != rc) {
		msyslog(LOG_ERR, "start_blocking_thread: pipe_socketpair() %m");
		exit(1);
	}
	c->resp_read_pipe = move_fd(pipe_ends[0]);
	c->resp_write_pipe = move_fd(pipe_ends[1]);
	c->ispipe = is_pipe;
	flags = fcntl(c->resp_read_pipe, F_GETFL, 0);
	if (-1 == flags) {
		msyslog(LOG_ERR, "start_blocking_thread: fcntl(F_GETFL) %m");
		exit(1);
	}
	rc = fcntl(c->resp_read_pipe, F_SETFL, O_NONBLOCK | flags);
	if (-1 == rc) {
		msyslog(LOG_ERR,
			"start_blocking_thread: fcntl(F_SETFL, O_NONBLOCK) %m");
		exit(1);
	}
	(*addremove_io_fd)(c->resp_read_pipe, c->ispipe, FALSE);
	pthread_attr_init(&thr_attr);
	pthread_attr_setdetachstate(&thr_attr, PTHREAD_CREATE_DETACHED);
#if defined(HAVE_PTHREAD_ATTR_GETSTACKSIZE) && \
    defined(HAVE_PTHREAD_ATTR_SETSTACKSIZE)
	rc = pthread_attr_getstacksize(&thr_attr, &ostacksize);
	if (0 != rc) {
		msyslog(LOG_ERR,
			"start_blocking_thread: pthread_attr_getstacksize() -> %s",
			strerror(rc));
	} else {
		if (ostacksize < THREAD_MINSTACKSIZE)
			nstacksize = THREAD_MINSTACKSIZE;
		else if (ostacksize > THREAD_MAXSTACKSIZE)
			nstacksize = THREAD_MAXSTACKSIZE;
		else
			nstacksize = ostacksize;
		if (nstacksize != ostacksize)
			rc = pthread_attr_setstacksize(&thr_attr, nstacksize);
		if (0 != rc)
			msyslog(LOG_ERR,
				"start_blocking_thread: pthread_attr_setstacksize(0x%lx -> 0x%lx) -> %s",
				(u_long)ostacksize, (u_long)nstacksize,
				strerror(rc));
	}
#else
	UNUSED_ARG(nstacksize);
	UNUSED_ARG(ostacksize);
#endif
#if defined(PTHREAD_SCOPE_SYSTEM) && defined(NEED_PTHREAD_SCOPE_SYSTEM)
	pthread_attr_setscope(&thr_attr, PTHREAD_SCOPE_SYSTEM);
#endif
	block_thread_signals(&saved_sig_mask);
	rc = pthread_create(&c->thr_table[0], &thr_attr,
			    &blocking_thread, c);
	pthread_sigmask(SIG_SETMASK, &saved_sig_mask, NULL);
	pthread_attr_destroy(&thr_attr);
	if (0 != rc) {
		msyslog(LOG_ERR, "start_blocking_thread: pthread_create() -> %s",
			strerror(rc));
		exit(1);
	}
	c->thread_ref = &c->thr_table[0];
}
#endif

/* --------------------------------------------------------------------
 * block_thread_signals()
 *
 * Temporarily block signals used by ntpd main thread, so that signal
 * mask inherited by child threads leaves them blocked.  Returns prior
 * active signal mask via pmask, to be restored by the main thread
 * after pthread_create().
 */
#ifndef SYS_WINNT
void
block_thread_signals(
	sigset_t *	pmask
	)
{
	sigset_t	block;

	sigemptyset(&block);
# ifdef HAVE_SIGNALED_IO
#  ifdef SIGIO
	sigaddset(&block, SIGIO);
#  endif
#  ifdef SIGPOLL
	sigaddset(&block, SIGPOLL);
#  endif
# endif	/* HAVE_SIGNALED_IO */
	sigaddset(&block, SIGALRM);
	sigaddset(&block, MOREDEBUGSIG);
	sigaddset(&block, LESSDEBUGSIG);
# ifdef SIGDIE1
	sigaddset(&block, SIGDIE1);
# endif
# ifdef SIGDIE2
	sigaddset(&block, SIGDIE2);
# endif
# ifdef SIGDIE3
	sigaddset(&block, SIGDIE3);
# endif
# ifdef SIGDIE4
	sigaddset(&block, SIGDIE4);
# endif
# ifdef SIGBUS
	sigaddset(&block, SIGBUS);
# endif
	sigemptyset(pmask);
	pthread_sigmask(SIG_BLOCK, &block, pmask);
}
#endif	/* !SYS_WINNT */


/* --------------------------------------------------------------------
 * Create & destroy semaphores. This is sufficiently different between
 * POSIX and Windows to warrant wrapper functions and close enough to
 * use the concept of synchronization via semaphore for all platforms.
 */
static sem_ref
create_sema(
	sema_type*	semptr,
	u_int		inival,
	u_int		maxval)
{
#ifdef SYS_WINNT

	long svini, svmax;
	if (NULL != semptr) {
		svini = (inival < LONG_MAX)
		    ? (long)inival : LONG_MAX;
		svmax = (maxval < LONG_MAX && maxval > 0)
		    ? (long)maxval : LONG_MAX;
		semptr->shnd = CreateSemaphore(NULL, svini, svmax, NULL);
		if (NULL == semptr->shnd)
			semptr = NULL;
	}

#else

	(void)maxval;
	if (semptr && sem_init(semptr, FALSE, inival))
		semptr = NULL;

#endif

	return semptr;
}

/* ------------------------------------------------------------------ */
static sem_ref
delete_sema(
	sem_ref obj)
{

#   ifdef SYS_WINNT

	if (obj) {
		if (obj->shnd)
			CloseHandle(obj->shnd);
		obj->shnd = NULL;
	}

#   else

	if (obj)
		sem_destroy(obj);

#   endif

	return NULL;
}
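
/* Usage sketch (illustrative; 'some_sema_storage' is a hypothetical
 * variable): the semaphore object lives in caller-provided storage and
 * the returned reference doubles as a success flag, so creation and
 * teardown pair up naturally:
 *
 *	sem_ref s = create_sema(&some_sema_storage, 1, 1);
 *	...
 *	s = delete_sema(s);	(returns NULL, clearing the reference)
 */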

/* --------------------------------------------------------------------
 * prepare_child_sems()
 *
 * create sync & access semaphores
 *
 * All semaphores are cleared, only the access semaphore has 1 unit.
 * Children wait on 'workitems_pending', then grab 'sema_access'
 * and dequeue jobs. When done, 'sema_access' is given one unit back.
 *
 * The producer grabs 'sema_access', manages the queue, restores
 * 'sema_access' and puts one unit into 'workitems_pending'.
 *
 * The story goes the same for the response queue.
 */
static void
prepare_child_sems(
	blocking_child *c
	)
{
	if (NULL == worker_memlock)
		worker_memlock = create_sema(&worker_mmutex, 1, 1);

	c->accesslock           = create_sema(&c->sem_table[0], 1, 1);
	c->workitems_pending    = create_sema(&c->sem_table[1], 0, 0);
	c->wake_scheduled_sleep = create_sema(&c->sem_table[2], 0, 1);
#   ifndef WORK_PIPE
	c->responses_pending    = create_sema(&c->sem_table[3], 0, 0);
#   endif
}

/* --------------------------------------------------------------------
 * Wait for a semaphore. Where the wait can be interrupted, it resumes
 * internally. When this function returns, either there was no
 * semaphore at all, a timeout occurred, or the caller successfully
 * took a token from the semaphore.
 *
 * For an untimed wait, not checking the result of this function at all
 * is definitely an option.
 */
static int
wait_for_sem(
	sem_ref			sem,
	struct timespec *	timeout		/* wall-clock */
	)
#ifdef SYS_WINNT
{
	struct timespec now;
	struct timespec delta;
	DWORD		msec;
	DWORD		rc;

	if (!(sem && sem->shnd)) {
		errno = EINVAL;
		return -1;
	}

	if (NULL == timeout) {
		msec = INFINITE;
	} else {
		getclock(TIMEOFDAY, &now);
		delta = sub_tspec(*timeout, now);
		if (delta.tv_sec < 0) {
			msec = 0;
		} else if ((delta.tv_sec + 1) >= (MAXDWORD / 1000)) {
			msec = INFINITE;
		} else {
			msec = 1000 * (DWORD)delta.tv_sec;
			msec += delta.tv_nsec / (1000 * 1000);
		}
	}
	rc = WaitForSingleObject(sem->shnd, msec);
	if (WAIT_OBJECT_0 == rc)
		return 0;
	if (WAIT_TIMEOUT == rc) {
		errno = ETIMEDOUT;
		return -1;
	}
	msyslog(LOG_ERR, "WaitForSingleObject unexpected 0x%x", rc);
	errno = EFAULT;
	return -1;
}
#else	/* pthreads wait_for_sem() follows */
{
	int rc = -1;

	if (sem) do {
			if (NULL == timeout)
				rc = sem_wait(sem);
			else
				rc = sem_timedwait(sem, timeout);
		} while (rc == -1 && errno == EINTR);
	else
		errno = EINVAL;

	return rc;
}
#endif
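
/* Usage sketch (illustrative): timed waits take an absolute wall-clock
 * deadline rather than a relative interval, as in worker_sleep():
 *
 *	struct timespec until;
 *	clock_gettime(CLOCK_REALTIME, &until);
 *	until.tv_sec += 5;
 *	if (-1 == wait_for_sem(sem, &until) && ETIMEDOUT == errno)
 *		... five seconds passed without obtaining a token ...
 */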

/* --------------------------------------------------------------------
 * blocking_thread - thread functions have WINAPI (aka 'stdcall')
 * calling conventions under Windows and POSIX-defined signature
 * otherwise.
 */
#ifdef SYS_WINNT
u_int WINAPI
#else
void *
#endif
blocking_thread(
	void *	ThreadArg
	)
{
	blocking_child *c;

	c = ThreadArg;
	exit_worker(blocking_child_common(c));

	/* NOTREACHED */
	return 0;
}

/* --------------------------------------------------------------------
 * req_child_exit() runs in the parent.
 *
 * This function is called from the idle timer, too, and possibly
 * without a thread being there any longer. Since we have folded up our
 * tent in that case and all the semaphores are already gone, we simply
 * ignore this request in this case.
 *
 * Since the existence of the semaphores is controlled exclusively by
 * the parent, there's no risk of data race here.
 */
int
req_child_exit(
	blocking_child *c
	)
{
	return (c->accesslock)
	    ? queue_req_pointer(c, CHILD_EXIT_REQ)
	    : 0;
}

/* --------------------------------------------------------------------
 * cleanup_after_child() runs in parent.
 */
static void
cleanup_after_child(
	blocking_child *	c
	)
{
	DEBUG_INSIST(!c->reusable);

#   ifdef SYS_WINNT
	/* The thread was not created in detached state, so we had
	 * better clean up.
	 */
	if (c->thread_ref && c->thread_ref->thnd) {
		WaitForSingleObject(c->thread_ref->thnd, INFINITE);
		INSIST(CloseHandle(c->thread_ref->thnd));
		c->thread_ref->thnd = NULL;
	}
#   endif
	c->thread_ref = NULL;

	/* remove semaphores and (if signalling via I/O) pipes */

	c->accesslock           = delete_sema(c->accesslock);
	c->workitems_pending    = delete_sema(c->workitems_pending);
	c->wake_scheduled_sleep = delete_sema(c->wake_scheduled_sleep);

#   ifdef WORK_PIPE
	DEBUG_INSIST(-1 != c->resp_read_pipe);
	DEBUG_INSIST(-1 != c->resp_write_pipe);
	(*addremove_io_fd)(c->resp_read_pipe, c->ispipe, TRUE);
	close(c->resp_write_pipe);
	close(c->resp_read_pipe);
	c->resp_write_pipe = -1;
	c->resp_read_pipe = -1;
#   else
	DEBUG_INSIST(NULL != c->responses_pending);
	(*addremove_io_semaphore)(c->responses_pending->shnd, TRUE);
	c->responses_pending = delete_sema(c->responses_pending);
#   endif

	/* Is it necessary to check if there are pending requests and
	 * responses? If so, and if there are, what to do with them?
	 */

	/* re-init buffer index sequencers */
	c->head_workitem = 0;
	c->tail_workitem = 0;
	c->head_response = 0;
	c->tail_response = 0;

	c->reusable = TRUE;
}


#else	/* !WORK_THREAD follows */
char work_thread_nonempty_compilation_unit;
#endif