work_thread.c revision 294554
/*
 * work_thread.c - threads implementation for blocking worker child.
 */
#include <config.h>
#include "ntp_workimpl.h"

#ifdef WORK_THREAD

#include <stdio.h>
#include <ctype.h>
#include <signal.h>
#ifndef SYS_WINNT
#include <pthread.h>
#endif

#include "ntp_stdlib.h"
#include "ntp_malloc.h"
#include "ntp_syslog.h"
#include "ntpd.h"
#include "ntp_io.h"
#include "ntp_assert.h"
#include "ntp_unixtime.h"
#include "timespecops.h"
#include "ntp_worker.h"

#define CHILD_EXIT_REQ	((blocking_pipe_header *)(intptr_t)-1)
#define CHILD_GONE_RESP	CHILD_EXIT_REQ
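/* Note: CHILD_EXIT_REQ is never dereferenced. It is an in-band
 * sentinel pushed through the request queue to ask the worker to
 * exit, and echoed back through the response queue as
 * CHILD_GONE_RESP.
 */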
/* Queue size increments:
 * The request queue grows a bit faster than the response queue -- the
 * daemon can push requests and pull results faster on average than the
 * worker can process requests and push results...  Whether this really
 * pays off is debatable.
 */
#define WORKITEMS_ALLOC_INC	16
#define RESPONSES_ALLOC_INC	4

/* Fiddle with min/max stack sizes. 64kB minimum seems to work, so we
 * set the maximum to 256kB. If the minimum goes below the
 * system-defined minimum stack size, we have to adjust accordingly.
 */
#ifndef THREAD_MINSTACKSIZE
# define THREAD_MINSTACKSIZE	(64U * 1024)
#endif
#ifndef __sun
#if defined(PTHREAD_STACK_MIN) && THREAD_MINSTACKSIZE < PTHREAD_STACK_MIN
# undef THREAD_MINSTACKSIZE
# define THREAD_MINSTACKSIZE PTHREAD_STACK_MIN
#endif
#endif

#ifndef THREAD_MAXSTACKSIZE
# define THREAD_MAXSTACKSIZE	(256U * 1024)
#endif
#if THREAD_MAXSTACKSIZE < THREAD_MINSTACKSIZE
# undef  THREAD_MAXSTACKSIZE
# define THREAD_MAXSTACKSIZE THREAD_MINSTACKSIZE
#endif


#ifdef SYS_WINNT

# define thread_exit(c)	_endthreadex(c)
# define tickle_sem(sh) ReleaseSemaphore((sh->shnd), 1, NULL)
u_int	WINAPI	blocking_thread(void *);
static BOOL	same_os_sema(const sem_ref obj, void * osobj);

#else

# define thread_exit(c)	pthread_exit((void*)(size_t)(c))
# define tickle_sem	sem_post
void *		blocking_thread(void *);
static	void	block_thread_signals(sigset_t *);

#endif

#ifdef WORK_PIPE
addremove_io_fd_func		addremove_io_fd;
#else
addremove_io_semaphore_func	addremove_io_semaphore;
#endif

static	void	start_blocking_thread(blocking_child *);
static	void	start_blocking_thread_internal(blocking_child *);
static	void	prepare_child_sems(blocking_child *);
static	int	wait_for_sem(sem_ref, struct timespec *);
static	int	ensure_workitems_empty_slot(blocking_child *);
static	int	ensure_workresp_empty_slot(blocking_child *);
static	int	queue_req_pointer(blocking_child *, blocking_pipe_header *);
static	void	cleanup_after_child(blocking_child *);


void
exit_worker(
	int	exitcode
	)
{
	thread_exit(exitcode);	/* see #define thread_exit */
}
99
/* --------------------------------------------------------------------
 * Sleep for a given time or until the wakeup semaphore is tickled.
 */
int
worker_sleep(
	blocking_child *	c,
	time_t			seconds
	)
{
	struct timespec	until;
	int		rc;

# ifdef HAVE_CLOCK_GETTIME
	if (0 != clock_gettime(CLOCK_REALTIME, &until)) {
		msyslog(LOG_ERR, "worker_sleep: clock_gettime() failed: %m");
		return -1;
	}
# else
	if (0 != getclock(TIMEOFDAY, &until)) {
		msyslog(LOG_ERR, "worker_sleep: getclock() failed: %m");
		return -1;
	}
# endif
	until.tv_sec += seconds;
	rc = wait_for_sem(c->wake_scheduled_sleep, &until);
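	/* A successful wait means the sleep was cut short on purpose
	 * (the semaphore was tickled), which we report as -1; a
	 * timeout means the nap ran its full course, which is the
	 * normal outcome and reports 0.
	 */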
	if (0 == rc)
		return -1;
	if (-1 == rc && ETIMEDOUT == errno)
		return 0;
	msyslog(LOG_ERR, "worker_sleep: sem_timedwait: %m");
	return -1;
}


/* --------------------------------------------------------------------
 * Wake up a worker that takes a nap.
 */
void
interrupt_worker_sleep(void)
{
	u_int			idx;
	blocking_child *	c;

	for (idx = 0; idx < blocking_children_alloc; idx++) {
		c = blocking_children[idx];
		if (NULL == c || NULL == c->wake_scheduled_sleep)
			continue;
		tickle_sem(c->wake_scheduled_sleep);
	}
}

/* --------------------------------------------------------------------
 * Make sure there is an empty slot at the head of the request
 * queue. Tell if the queue is currently empty.
 */
static int
ensure_workitems_empty_slot(
	blocking_child *c
	)
{
	/*
	** !!! PRECONDITION: caller holds access lock!
	**
	** This simply tries to increase the size of the buffer if it
	** becomes full. The resize operation does *not* maintain the
	** order of requests, but that should be irrelevant since the
	** processing is considered asynchronous anyway.
	**
	** Return if the buffer is currently empty.
	*/

	static const size_t each =
	    sizeof(blocking_children[0]->workitems[0]);

	size_t	new_alloc;
	size_t  slots_used;
	size_t	sidx;

	slots_used = c->head_workitem - c->tail_workitem;
	if (slots_used >= c->workitems_alloc) {
		new_alloc  = c->workitems_alloc + WORKITEMS_ALLOC_INC;
		c->workitems = erealloc(c->workitems, new_alloc * each);
		for (sidx = c->workitems_alloc; sidx < new_alloc; ++sidx)
		    c->workitems[sidx] = NULL;
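		/* The buffer was completely full, so slots 0 up to the
		 * old allocation all hold items (though possibly in
		 * rotated order). Re-basing the counters to tail = 0
		 * and head = old allocation keeps every item reachable
		 * while the freshly added slots stay NULL.
		 */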
		c->tail_workitem   = 0;
		c->head_workitem   = c->workitems_alloc;
		c->workitems_alloc = new_alloc;
	}
	INSIST(NULL == c->workitems[c->head_workitem % c->workitems_alloc]);
	return (0 == slots_used);
}

/* --------------------------------------------------------------------
 * Make sure there is an empty slot at the head of the response
 * queue. Tell if the queue is currently empty.
 */
static int
ensure_workresp_empty_slot(
	blocking_child *c
	)
{
	/*
	** !!! PRECONDITION: caller holds access lock!
	**
	** Works like the companion function above.
	*/

	static const size_t each =
	    sizeof(blocking_children[0]->responses[0]);

	size_t	new_alloc;
	size_t  slots_used;
	size_t	sidx;

	slots_used = c->head_response - c->tail_response;
	if (slots_used >= c->responses_alloc) {
		new_alloc  = c->responses_alloc + RESPONSES_ALLOC_INC;
		c->responses = erealloc(c->responses, new_alloc * each);
		for (sidx = c->responses_alloc; sidx < new_alloc; ++sidx)
		    c->responses[sidx] = NULL;
		c->tail_response   = 0;
		c->head_response   = c->responses_alloc;
		c->responses_alloc = new_alloc;
	}
	INSIST(NULL == c->responses[c->head_response % c->responses_alloc]);
	return (0 == slots_used);
}


/* --------------------------------------------------------------------
 * queue_req_pointer() - append a work item or idle exit request to
 *			 blocking_workitems[]. Employ proper locking.
 */
static int
queue_req_pointer(
	blocking_child	*	c,
	blocking_pipe_header *	hdr
	)
{
	size_t qhead;

	/* >>>> ACCESS LOCKING STARTS >>>> */
	wait_for_sem(c->accesslock, NULL);
	ensure_workitems_empty_slot(c);
	qhead = c->head_workitem;
	c->workitems[qhead % c->workitems_alloc] = hdr;
	c->head_workitem = 1 + qhead;
	tickle_sem(c->accesslock);
	/* <<<< ACCESS LOCKING ENDS <<<< */

	/* queue consumer wake-up notification */
	tickle_sem(c->workitems_pending);

	return 0;
}
255
/* --------------------------------------------------------------------
 * API function to make sure a worker is running, a proper private copy
 * of the data is made, the data is entered into the queue, and the
 * worker is signalled.
 */
int
send_blocking_req_internal(
	blocking_child *	c,
	blocking_pipe_header *	hdr,
	void *			data
	)
{
	blocking_pipe_header *	threadcopy;
	size_t			payload_octets;

	REQUIRE(hdr != NULL);
	REQUIRE(data != NULL);
	DEBUG_REQUIRE(BLOCKING_REQ_MAGIC == hdr->magic_sig);
274
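	/* A valid request must carry a payload beyond the header. */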
	if (hdr->octets <= sizeof(*hdr))
		return 1;	/* failure */
	payload_octets = hdr->octets - sizeof(*hdr);

	if (NULL == c->thread_ref)
		start_blocking_thread(c);
	threadcopy = emalloc(hdr->octets);
	memcpy(threadcopy, hdr, sizeof(*hdr));
	memcpy((char *)threadcopy + sizeof(*hdr), data, payload_octets);

	return queue_req_pointer(c, threadcopy);
}

/* --------------------------------------------------------------------
 * Wait for the 'incoming queue no longer empty' signal, lock the shared
 * structure and dequeue an item.
 */
blocking_pipe_header *
receive_blocking_req_internal(
	blocking_child *	c
	)
{
	blocking_pipe_header *	req;
	size_t			qhead, qtail;

	req = NULL;
	do {
		/* wait for tickle from the producer side */
		wait_for_sem(c->workitems_pending, NULL);

		/* >>>> ACCESS LOCKING STARTS >>>> */
		wait_for_sem(c->accesslock, NULL);
		qhead = c->head_workitem;
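		/* Pop entries from the tail until a pending request is
		 * found or the queue is drained; the outer loop then
		 * waits for the next tickle from the producer.
		 */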
		do {
			qtail = c->tail_workitem;
			if (qhead == qtail)
				break;
			c->tail_workitem = qtail + 1;
			qtail %= c->workitems_alloc;
			req = c->workitems[qtail];
			c->workitems[qtail] = NULL;
		} while (NULL == req);
		tickle_sem(c->accesslock);
		/* <<<< ACCESS LOCKING ENDS <<<< */

	} while (NULL == req);

	INSIST(NULL != req);
	if (CHILD_EXIT_REQ == req) {	/* idled out */
		send_blocking_resp_internal(c, CHILD_GONE_RESP);
		req = NULL;
	}

	return req;
}
330
/* --------------------------------------------------------------------
 * Push a response into the return queue and, if the queue was empty
 * before, tickle the receiver.
 */
int
send_blocking_resp_internal(
	blocking_child *	c,
	blocking_pipe_header *	resp
	)
{
	size_t	qhead;
	int	empty;

	/* >>>> ACCESS LOCKING STARTS >>>> */
	wait_for_sem(c->accesslock, NULL);
	empty = ensure_workresp_empty_slot(c);
	qhead = c->head_response;
	c->responses[qhead % c->responses_alloc] = resp;
	c->head_response = 1 + qhead;
	tickle_sem(c->accesslock);
	/* <<<< ACCESS LOCKING ENDS <<<< */

	/* queue consumer wake-up notification */
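	/* Only the empty-to-non-empty transition needs a wake-up; the
	 * receiving side is expected to drain the queue (and, in the
	 * pipe case, the wake-up bytes) once it runs.
	 */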
	if (empty)
	{
#	    ifdef WORK_PIPE
		write(c->resp_write_pipe, "", 1);
#	    else
		tickle_sem(c->responses_pending);
#	    endif
	}
	return 0;
}


#ifndef WORK_PIPE
367
/* --------------------------------------------------------------------
 * Check if a (Windows) handle to a semaphore is actually the same one
 * we are using inside the sema wrapper.
 */
static BOOL
same_os_sema(
	const sem_ref	obj,
	void*		osh
	)
{
	return obj && osh && (obj->shnd == (HANDLE)osh);
}
380
/* --------------------------------------------------------------------
 * Find the shared context associated with an OS handle and make sure
 * the data is dequeued and processed.
 */
void
handle_blocking_resp_sem(
	void *	context
	)
{
	blocking_child *	c;
	u_int			idx;

	c = NULL;
	for (idx = 0; idx < blocking_children_alloc; idx++) {
		c = blocking_children[idx];
		if (c != NULL &&
			c->thread_ref != NULL &&
			same_os_sema(c->responses_pending, context))
			break;
	}
	if (idx < blocking_children_alloc)
		process_blocking_resp(c);
}
#endif	/* !WORK_PIPE */

/* --------------------------------------------------------------------
 * Fetch the next response from the return queue. In case of signalling
 * via pipe, make sure the pipe is flushed, too.
 */
blocking_pipe_header *
receive_blocking_resp_internal(
	blocking_child *	c
	)
{
	blocking_pipe_header *	removed;
	size_t			qhead, qtail, slot;

#ifdef WORK_PIPE
	int			rc;
	char			scratch[32];
421
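	/* Drain and discard the wake-up byte(s). The read side of the
	 * pipe is non-blocking, so this cannot stall; EINTR merely
	 * retries the read.
	 */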
	do
		rc = read(c->resp_read_pipe, scratch, sizeof(scratch));
	while (-1 == rc && EINTR == errno);
#endif

	/* >>>> ACCESS LOCKING STARTS >>>> */
	wait_for_sem(c->accesslock, NULL);
	qhead = c->head_response;
	qtail = c->tail_response;
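	/* Walk from the tail, skipping already-consumed (NULL) slots,
	 * and take the first pending response, if any.
	 */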
	for (removed = NULL; !removed && (qhead != qtail); ++qtail) {
		slot = qtail % c->responses_alloc;
		removed = c->responses[slot];
		c->responses[slot] = NULL;
	}
	c->tail_response = qtail;
	tickle_sem(c->accesslock);
	/* <<<< ACCESS LOCKING ENDS <<<< */

	if (NULL != removed) {
		DEBUG_ENSURE(CHILD_GONE_RESP == removed ||
			     BLOCKING_RESP_MAGIC == removed->magic_sig);
	}
	if (CHILD_GONE_RESP == removed) {
		cleanup_after_child(c);
		removed = NULL;
	}

	return removed;
}

/* --------------------------------------------------------------------
 * Light up a new worker.
 */
static void
start_blocking_thread(
	blocking_child *	c
	)
{

	DEBUG_INSIST(!c->reusable);

	prepare_child_sems(c);
	start_blocking_thread_internal(c);
}
466
/* --------------------------------------------------------------------
 * Create a worker thread. There are several differences between POSIX
 * and Windows, of course -- most notably the Windows thread is not a
 * detached thread, and we keep the handle around until we want to get
 * rid of the thread. The notification scheme also differs: Windows
 * makes use of semaphores in both directions, POSIX uses a pipe for
 * integration with 'select()' or the like.
 */
static void
start_blocking_thread_internal(
	blocking_child *	c
	)
#ifdef SYS_WINNT
{
	BOOL	resumed;

	c->thread_ref = NULL;
	(*addremove_io_semaphore)(c->responses_pending->shnd, FALSE);
	c->thr_table[0].thnd =
		(HANDLE)_beginthreadex(
			NULL,
			0,
			&blocking_thread,
			c,
			CREATE_SUSPENDED,
			NULL);

	if (NULL == c->thr_table[0].thnd) {
		msyslog(LOG_ERR, "start blocking thread failed: %m");
		exit(-1);
	}
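	/* The thread was created suspended so that its priority can be
	 * lowered before it executes its first instruction.
	 */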
	/* remember the thread priority is only within the process class */
	if (!SetThreadPriority(c->thr_table[0].thnd,
			       THREAD_PRIORITY_BELOW_NORMAL))
		msyslog(LOG_ERR, "Error lowering blocking thread priority: %m");

	resumed = ResumeThread(c->thr_table[0].thnd);
	DEBUG_INSIST(resumed);
	c->thread_ref = &c->thr_table[0];
}
#else	/* pthreads start_blocking_thread_internal() follows */
{
# ifdef NEED_PTHREAD_INIT
	static int	pthread_init_called;
# endif
	pthread_attr_t	thr_attr;
	int		rc;
	int		pipe_ends[2];	/* read then write */
	int		is_pipe;
	int		flags;
	size_t		ostacksize;
	size_t		nstacksize;
	sigset_t	saved_sig_mask;

	c->thread_ref = NULL;

# ifdef NEED_PTHREAD_INIT
	/*
	 * from lib/isc/unix/app.c:
	 * BSDI 3.1 seg faults in pthread_sigmask() if we don't do this.
	 */
	if (!pthread_init_called) {
		pthread_init();
		pthread_init_called = TRUE;
	}
# endif

	rc = pipe_socketpair(&pipe_ends[0], &is_pipe);
	if (0 != rc) {
		msyslog(LOG_ERR, "start_blocking_thread: pipe_socketpair() %m");
		exit(1);
	}
	c->resp_read_pipe = move_fd(pipe_ends[0]);
	c->resp_write_pipe = move_fd(pipe_ends[1]);
	c->ispipe = is_pipe;
	flags = fcntl(c->resp_read_pipe, F_GETFL, 0);
	if (-1 == flags) {
		msyslog(LOG_ERR, "start_blocking_thread: fcntl(F_GETFL) %m");
		exit(1);
	}
	rc = fcntl(c->resp_read_pipe, F_SETFL, O_NONBLOCK | flags);
	if (-1 == rc) {
		msyslog(LOG_ERR,
			"start_blocking_thread: fcntl(F_SETFL, O_NONBLOCK) %m");
		exit(1);
	}
	(*addremove_io_fd)(c->resp_read_pipe, c->ispipe, FALSE);
	pthread_attr_init(&thr_attr);
	pthread_attr_setdetachstate(&thr_attr, PTHREAD_CREATE_DETACHED);
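	/* Detached: the thread's resources are reclaimed automatically
	 * when it exits, so the parent never joins it.
	 */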
#if defined(HAVE_PTHREAD_ATTR_GETSTACKSIZE) && \
    defined(HAVE_PTHREAD_ATTR_SETSTACKSIZE)
	rc = pthread_attr_getstacksize(&thr_attr, &ostacksize);
	if (0 != rc) {
		msyslog(LOG_ERR,
			"start_blocking_thread: pthread_attr_getstacksize() -> %s",
			strerror(rc));
	} else {
		if (ostacksize < THREAD_MINSTACKSIZE)
			nstacksize = THREAD_MINSTACKSIZE;
		else if (ostacksize > THREAD_MAXSTACKSIZE)
			nstacksize = THREAD_MAXSTACKSIZE;
		else
			nstacksize = ostacksize;
		if (nstacksize != ostacksize)
			rc = pthread_attr_setstacksize(&thr_attr, nstacksize);
		if (0 != rc)
			msyslog(LOG_ERR,
				"start_blocking_thread: pthread_attr_setstacksize(0x%lx -> 0x%lx) -> %s",
				(u_long)ostacksize, (u_long)nstacksize,
				strerror(rc));
	}
#else
	UNUSED_ARG(nstacksize);
	UNUSED_ARG(ostacksize);
#endif
#if defined(PTHREAD_SCOPE_SYSTEM) && defined(NEED_PTHREAD_SCOPE_SYSTEM)
	pthread_attr_setscope(&thr_attr, PTHREAD_SCOPE_SYSTEM);
#endif
	block_thread_signals(&saved_sig_mask);
	rc = pthread_create(&c->thr_table[0], &thr_attr,
			    &blocking_thread, c);
	pthread_sigmask(SIG_SETMASK, &saved_sig_mask, NULL);
	pthread_attr_destroy(&thr_attr);
	if (0 != rc) {
		msyslog(LOG_ERR, "start_blocking_thread: pthread_create() -> %s",
			strerror(rc));
		exit(1);
	}
	c->thread_ref = &c->thr_table[0];
}
#endif
599
/* --------------------------------------------------------------------
 * block_thread_signals()
 *
 * Temporarily block signals used by ntpd main thread, so that signal
 * mask inherited by child threads leaves them blocked.  Returns prior
 * active signal mask via pmask, to be restored by the main thread
 * after pthread_create().
 */
#ifndef SYS_WINNT
void
block_thread_signals(
	sigset_t *	pmask
	)
{
	sigset_t	block;

	sigemptyset(&block);
# ifdef HAVE_SIGNALED_IO
#  ifdef SIGIO
	sigaddset(&block, SIGIO);
#  endif
#  ifdef SIGPOLL
	sigaddset(&block, SIGPOLL);
#  endif
# endif	/* HAVE_SIGNALED_IO */
	sigaddset(&block, SIGALRM);
	sigaddset(&block, MOREDEBUGSIG);
	sigaddset(&block, LESSDEBUGSIG);
# ifdef SIGDIE1
	sigaddset(&block, SIGDIE1);
# endif
# ifdef SIGDIE2
	sigaddset(&block, SIGDIE2);
# endif
# ifdef SIGDIE3
	sigaddset(&block, SIGDIE3);
# endif
# ifdef SIGDIE4
	sigaddset(&block, SIGDIE4);
# endif
# ifdef SIGBUS
	sigaddset(&block, SIGBUS);
# endif
	sigemptyset(pmask);
	pthread_sigmask(SIG_BLOCK, &block, pmask);
}
#endif	/* !SYS_WINNT */


/* --------------------------------------------------------------------
 * Create & destroy semaphores. This is sufficiently different between
 * POSIX and Windows to warrant wrapper functions and close enough to
 * use the concept of synchronization via semaphore for all platforms.
 */
static sem_ref
create_sema(
	sema_type*	semptr,
	u_int		inival,
	u_int		maxval)
{
#ifdef SYS_WINNT

	long svini, svmax;
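	/* CreateSemaphore() takes LONG counts, so clamp the u_int
	 * arguments into range; a maxval of 0 means "no explicit
	 * limit" and becomes LONG_MAX.
	 */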
	if (NULL != semptr) {
		svini = (inival < LONG_MAX)
		    ? (long)inival : LONG_MAX;
		svmax = (maxval < LONG_MAX && maxval > 0)
		    ? (long)maxval : LONG_MAX;
		semptr->shnd = CreateSemaphore(NULL, svini, svmax, NULL);
		if (NULL == semptr->shnd)
			semptr = NULL;
	}

#else

	(void)maxval;
	if (semptr && sem_init(semptr, FALSE, inival))
		semptr = NULL;

#endif

	return semptr;
}

/* ------------------------------------------------------------------ */
static sem_ref
delete_sema(
	sem_ref obj)
{

#   ifdef SYS_WINNT

	if (obj) {
		if (obj->shnd)
			CloseHandle(obj->shnd);
		obj->shnd = NULL;
	}

#   else

	if (obj)
		sem_destroy(obj);

#   endif

	return NULL;
}
707
/* --------------------------------------------------------------------
 * prepare_child_sems()
 *
 * create sync & access semaphores
 *
 * All semaphores are cleared, only the access semaphore has 1 unit.
 * Children wait on 'workitems_pending', then grab 'sema_access'
 * and dequeue jobs. When done, 'sema_access' is given one unit back.
 *
 * The producer grabs 'sema_access', manages the queue, restores
 * 'sema_access' and puts one unit into 'workitems_pending'.
 *
 * The story goes the same for the response queue.
 */
static void
prepare_child_sems(
	blocking_child *c
	)
{
	c->accesslock           = create_sema(&c->sem_table[0], 1, 1);
	c->workitems_pending    = create_sema(&c->sem_table[1], 0, 0);
	c->wake_scheduled_sleep = create_sema(&c->sem_table[2], 0, 1);
#   ifndef WORK_PIPE
	c->responses_pending    = create_sema(&c->sem_table[3], 0, 0);
#   endif
}
734
/* --------------------------------------------------------------------
 * Wait for a semaphore. Where the wait can be interrupted, it will
 * resume internally -- when this function returns, either there was no
 * semaphore at all, a timeout occurred, or the caller successfully
 * took a token from the semaphore.
 *
 * For an untimed wait, not checking the result of this function at all
 * is definitely an option.
 */
static int
wait_for_sem(
	sem_ref			sem,
	struct timespec *	timeout		/* wall-clock */
	)
#ifdef SYS_WINNT
{
	struct timespec now;
	struct timespec delta;
	DWORD		msec;
	DWORD		rc;

	if (!(sem && sem->shnd)) {
		errno = EINVAL;
		return -1;
	}
760
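	/* WaitForSingleObject() expects a relative timeout in
	 * milliseconds, so convert the absolute wall-clock deadline
	 * into a delta from now.
	 */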
	if (NULL == timeout) {
		msec = INFINITE;
	} else {
		getclock(TIMEOFDAY, &now);
		delta = sub_tspec(*timeout, now);
		if (delta.tv_sec < 0) {
			msec = 0;
		} else if ((delta.tv_sec + 1) >= (MAXDWORD / 1000)) {
			msec = INFINITE;
		} else {
			msec = 1000 * (DWORD)delta.tv_sec;
			msec += delta.tv_nsec / (1000 * 1000);
		}
	}
	rc = WaitForSingleObject(sem->shnd, msec);
	if (WAIT_OBJECT_0 == rc)
		return 0;
	if (WAIT_TIMEOUT == rc) {
		errno = ETIMEDOUT;
		return -1;
	}
	msyslog(LOG_ERR, "WaitForSingleObject unexpected 0x%x", rc);
	errno = EFAULT;
	return -1;
}
#else	/* pthreads wait_for_sem() follows */
{
	int rc = -1;
789
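	/* Restart the wait whenever a signal interrupts it (EINTR);
	 * only real errors and timeouts are reported to the caller.
	 */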
	if (sem) do {
			if (NULL == timeout)
				rc = sem_wait(sem);
			else
				rc = sem_timedwait(sem, timeout);
		} while (rc == -1 && errno == EINTR);
	else
		errno = EINVAL;

	return rc;
}
#endif

/* --------------------------------------------------------------------
 * blocking_thread - thread functions have WINAPI (aka 'stdcall')
 * calling conventions under Windows and POSIX-defined signature
 * otherwise.
 */
#ifdef SYS_WINNT
u_int WINAPI
#else
void *
#endif
blocking_thread(
	void *	ThreadArg
	)
{
	blocking_child *c;

	c = ThreadArg;
	exit_worker(blocking_child_common(c));

	/* NOTREACHED */
	return 0;
}
825
/* --------------------------------------------------------------------
 * req_child_exit() runs in the parent.
 *
 * This function is called from the idle timer, too, and possibly
 * without a thread being there any longer. Since we have folded up our
 * tent in that case and all the semaphores are already gone, we simply
 * ignore the request.
 *
 * Since the existence of the semaphores is controlled exclusively by
 * the parent, there's no risk of a data race here.
 */
int
req_child_exit(
	blocking_child *c
	)
{
	return (c->accesslock)
	    ? queue_req_pointer(c, CHILD_EXIT_REQ)
	    : 0;
}

/* --------------------------------------------------------------------
 * cleanup_after_child() runs in parent.
 */
static void
cleanup_after_child(
	blocking_child *	c
	)
{
	DEBUG_INSIST(!c->reusable);

#   ifdef SYS_WINNT
	/* The thread was not created in detached state, so we better
	 * clean up.
	 */
	if (c->thread_ref && c->thread_ref->thnd) {
		WaitForSingleObject(c->thread_ref->thnd, INFINITE);
		INSIST(CloseHandle(c->thread_ref->thnd));
		c->thread_ref->thnd = NULL;
	}
#   endif
	c->thread_ref = NULL;
868
	/* remove semaphores and (if signalling via pipe) the pipe FDs */
870
	c->accesslock           = delete_sema(c->accesslock);
	c->workitems_pending    = delete_sema(c->workitems_pending);
	c->wake_scheduled_sleep = delete_sema(c->wake_scheduled_sleep);

#   ifdef WORK_PIPE
	DEBUG_INSIST(-1 != c->resp_read_pipe);
	DEBUG_INSIST(-1 != c->resp_write_pipe);
	(*addremove_io_fd)(c->resp_read_pipe, c->ispipe, TRUE);
	close(c->resp_write_pipe);
	close(c->resp_read_pipe);
	c->resp_write_pipe = -1;
	c->resp_read_pipe = -1;
#   else
	DEBUG_INSIST(NULL != c->responses_pending);
	(*addremove_io_semaphore)(c->responses_pending->shnd, TRUE);
	c->responses_pending = delete_sema(c->responses_pending);
#   endif

	/* Is it necessary to check if there are pending requests and
	 * responses? If so, and if there are, what to do with them?
	 */

	/* re-init buffer index sequencers */
	c->head_workitem = 0;
	c->tail_workitem = 0;
	c->head_response = 0;
	c->tail_response = 0;

	c->reusable = TRUE;
}


#else	/* !WORK_THREAD follows */
char work_thread_nonempty_compilation_unit;
#endif
906