work_thread.c revision 293894
1/*
2 * work_thread.c - threads implementation for blocking worker child.
3 */
4#include <config.h>
5#include "ntp_workimpl.h"
6
7#ifdef WORK_THREAD
8
9#include <stdio.h>
10#include <ctype.h>
11#include <signal.h>
12#ifndef SYS_WINNT
13#include <pthread.h>
14#endif
15
16#include "ntp_stdlib.h"
17#include "ntp_malloc.h"
18#include "ntp_syslog.h"
19#include "ntpd.h"
20#include "ntp_io.h"
21#include "ntp_assert.h"
22#include "ntp_unixtime.h"
23#include "timespecops.h"
24#include "ntp_worker.h"
25
26#define CHILD_EXIT_REQ	((blocking_pipe_header *)(intptr_t)-1)
27#define CHILD_GONE_RESP	CHILD_EXIT_REQ
28#define WORKITEMS_ALLOC_INC	16
29#define RESPONSES_ALLOC_INC	4
30
31#ifndef THREAD_MINSTACKSIZE
32#define THREAD_MINSTACKSIZE	(64U * 1024)
33#endif
34
35#ifdef SYS_WINNT
36
37# define thread_exit(c)	_endthreadex(c)
38# define tickle_sem(sh) ReleaseSemaphore((sh->shnd), 1, NULL)
39u_int	WINAPI	blocking_thread(void *);
40static BOOL	same_os_sema(const sem_ref obj, void * osobj);
41
42#else
43
44# define thread_exit(c)	pthread_exit((void*)(size_t)(c))
45# define tickle_sem	sem_post
46void *		blocking_thread(void *);
47static	void	block_thread_signals(sigset_t *);
48
49#endif
50
51#ifdef WORK_PIPE
52addremove_io_fd_func		addremove_io_fd;
53#else
54addremove_io_semaphore_func	addremove_io_semaphore;
55#endif
56
57static	void	start_blocking_thread(blocking_child *);
58static	void	start_blocking_thread_internal(blocking_child *);
59static	void	prepare_child_sems(blocking_child *);
60static	int	wait_for_sem(sem_ref, struct timespec *);
61static	int	ensure_workitems_empty_slot(blocking_child *);
62static	int	ensure_workresp_empty_slot(blocking_child *);
63static	int	queue_req_pointer(blocking_child *, blocking_pipe_header *);
64static	void	cleanup_after_child(blocking_child *);
65
66
/*
 * exit_worker() - end the calling worker thread, passing 'exitcode'
 * to the platform's thread exit primitive.
 */
void
exit_worker(
	int	exitcode
	)
{
	/* thread_exit is a macro: _endthreadex() or pthread_exit() */
	thread_exit(exitcode);
}
74
75/* --------------------------------------------------------------------
76 * sleep for a given time or until the wakup semaphore is tickled.
77 */
78int
79worker_sleep(
80	blocking_child *	c,
81	time_t			seconds
82	)
83{
84	struct timespec	until;
85	int		rc;
86
87# ifdef HAVE_CLOCK_GETTIME
88	if (0 != clock_gettime(CLOCK_REALTIME, &until)) {
89		msyslog(LOG_ERR, "worker_sleep: clock_gettime() failed: %m");
90		return -1;
91	}
92# else
93	if (0 != getclock(TIMEOFDAY, &until)) {
94		msyslog(LOG_ERR, "worker_sleep: getclock() failed: %m");
95		return -1;
96	}
97# endif
98	until.tv_sec += seconds;
99	rc = wait_for_sem(c->wake_scheduled_sleep, &until);
100	if (0 == rc)
101		return -1;
102	if (-1 == rc && ETIMEDOUT == errno)
103		return 0;
104	msyslog(LOG_ERR, "worker_sleep: sem_timedwait: %m");
105	return -1;
106}
107
108
109/* --------------------------------------------------------------------
110 * Wake up a worker that takes a nap.
111 */
112void
113interrupt_worker_sleep(void)
114{
115	u_int			idx;
116	blocking_child *	c;
117
118	for (idx = 0; idx < blocking_children_alloc; idx++) {
119		c = blocking_children[idx];
120		if (NULL == c || NULL == c->wake_scheduled_sleep)
121			continue;
122		tickle_sem(c->wake_scheduled_sleep);
123	}
124}
125
126/* --------------------------------------------------------------------
127 * Make sure there is an empty slot at the head of the request
128 * queue. Tell if the queue is currently empty.
129 */
130static int
131ensure_workitems_empty_slot(
132	blocking_child *c
133	)
134{
135	/*
136	** !!! PRECONDITION: caller holds access lock!
137	**
138	** This simply tries to increase the size of the buffer if it
139	** becomes full. The resize operation does *not* maintain the
140	** order of requests, but that should be irrelevant since the
141	** processing is considered asynchronous anyway.
142	**
143	** Return if the buffer is currently empty.
144	*/
145
146	static const size_t each =
147	    sizeof(blocking_children[0]->workitems[0]);
148
149	size_t	new_alloc;
150	size_t  slots_used;
151
152	slots_used = c->head_workitem - c->tail_workitem;
153	if (slots_used >= c->workitems_alloc) {
154		new_alloc  = c->workitems_alloc + WORKITEMS_ALLOC_INC;
155		c->workitems = erealloc(c->workitems, new_alloc * each);
156		c->tail_workitem   = 0;
157		c->head_workitem   = c->workitems_alloc;
158		c->workitems_alloc = new_alloc;
159	}
160	return (0 == slots_used);
161}
162
163/* --------------------------------------------------------------------
164 * Make sure there is an empty slot at the head of the response
165 * queue. Tell if the queue is currently empty.
166 */
167static int
168ensure_workresp_empty_slot(
169	blocking_child *c
170	)
171{
172	/*
173	** !!! PRECONDITION: caller holds access lock!
174	**
175	** Works like the companion function above.
176	*/
177
178	static const size_t each =
179	    sizeof(blocking_children[0]->responses[0]);
180
181	size_t	new_alloc;
182	size_t  slots_used;
183
184	slots_used = c->head_response - c->tail_response;
185	if (slots_used >= c->responses_alloc) {
186		new_alloc  = c->responses_alloc + RESPONSES_ALLOC_INC;
187		c->responses = erealloc(c->responses, new_alloc * each);
188		c->tail_response   = 0;
189		c->head_response   = c->responses_alloc;
190		c->responses_alloc = new_alloc;
191	}
192	return (0 == slots_used);
193}
194
195
196/* --------------------------------------------------------------------
197 * queue_req_pointer() - append a work item or idle exit request to
198 *			 blocking_workitems[]. Employ proper locking.
199 */
200static int
201queue_req_pointer(
202	blocking_child	*	c,
203	blocking_pipe_header *	hdr
204	)
205{
206	size_t qhead;
207
208	/* >>>> ACCESS LOCKING STARTS >>>> */
209	wait_for_sem(c->accesslock, NULL);
210	ensure_workitems_empty_slot(c);
211	qhead = c->head_workitem;
212	c->workitems[qhead % c->workitems_alloc] = hdr;
213	c->head_workitem = 1 + qhead;
214	tickle_sem(c->accesslock);
215	/* <<<< ACCESS LOCKING ENDS <<<< */
216
217	/* queue consumer wake-up notification */
218	tickle_sem(c->workitems_pending);
219
220	return 0;
221}
222
223/* --------------------------------------------------------------------
224 * API function to make sure a worker is running, a proper private copy
225 * of the data is made, the data eneterd into the queue and the worker
226 * is signalled.
227 */
228int
229send_blocking_req_internal(
230	blocking_child *	c,
231	blocking_pipe_header *	hdr,
232	void *			data
233	)
234{
235	blocking_pipe_header *	threadcopy;
236	size_t			payload_octets;
237
238	REQUIRE(hdr != NULL);
239	REQUIRE(data != NULL);
240	DEBUG_REQUIRE(BLOCKING_REQ_MAGIC == hdr->magic_sig);
241
242	if (hdr->octets <= sizeof(*hdr))
243		return 1;	/* failure */
244	payload_octets = hdr->octets - sizeof(*hdr);
245
246	if (NULL == c->thread_ref)
247		start_blocking_thread(c);
248	threadcopy = emalloc(hdr->octets);
249	memcpy(threadcopy, hdr, sizeof(*hdr));
250	memcpy((char *)threadcopy + sizeof(*hdr), data, payload_octets);
251
252	return queue_req_pointer(c, threadcopy);
253}
254
255/* --------------------------------------------------------------------
256 * Wait for the 'incoming queue no longer empty' signal, lock the shared
257 * structure and dequeue an item.
258 */
259blocking_pipe_header *
260receive_blocking_req_internal(
261	blocking_child *	c
262	)
263{
264	blocking_pipe_header *	req;
265	size_t			qhead, qtail;
266
267	req = NULL;
268	do {
269		/* wait for tickle from the producer side */
270		wait_for_sem(c->workitems_pending, NULL);
271
272		/* >>>> ACCESS LOCKING STARTS >>>> */
273		wait_for_sem(c->accesslock, NULL);
274		qhead = c->head_workitem;
275		do {
276			qtail = c->tail_workitem;
277			if (qhead == qtail)
278				break;
279			c->tail_workitem = qtail + 1;
280			qtail %= c->workitems_alloc;
281			req = c->workitems[qtail];
282			c->workitems[qtail] = NULL;
283		} while (NULL == req);
284		tickle_sem(c->accesslock);
285		/* <<<< ACCESS LOCKING ENDS <<<< */
286
287	} while (NULL == req);
288
289	INSIST(NULL != req);
290	if (CHILD_EXIT_REQ == req) {	/* idled out */
291		send_blocking_resp_internal(c, CHILD_GONE_RESP);
292		req = NULL;
293	}
294
295	return req;
296}
297
298/* --------------------------------------------------------------------
299 * Push a response into the return queue and eventually tickle the
300 * receiver.
301 */
302int
303send_blocking_resp_internal(
304	blocking_child *	c,
305	blocking_pipe_header *	resp
306	)
307{
308	size_t	qhead;
309	int	empty;
310
311	/* >>>> ACCESS LOCKING STARTS >>>> */
312	wait_for_sem(c->accesslock, NULL);
313	empty = ensure_workresp_empty_slot(c);
314	qhead = c->head_response;
315	c->responses[qhead % c->responses_alloc] = resp;
316	c->head_response = 1 + qhead;
317	tickle_sem(c->accesslock);
318	/* <<<< ACCESS LOCKING ENDS <<<< */
319
320	/* queue consumer wake-up notification */
321	if (empty)
322	{
323#	    ifdef WORK_PIPE
324		write(c->resp_write_pipe, "", 1);
325#	    else
326		tickle_sem(c->responses_pending);
327#	    endif
328	}
329	return 0;
330}
331
332
333#ifndef WORK_PIPE
334
335/* --------------------------------------------------------------------
336 * Check if a (Windows-)hanndle to a semaphore is actually the same we
337 * are using inside the sema wrapper.
338 */
339static BOOL
340same_os_sema(
341	const sem_ref	obj,
342	void*		osh
343	)
344{
345	return obj && osh && (obj->shnd == (HANDLE)osh);
346}
347
348/* --------------------------------------------------------------------
349 * Find the shared context that associates to an OS handle and make sure
350 * the data is dequeued and processed.
351 */
352void
353handle_blocking_resp_sem(
354	void *	context
355	)
356{
357	blocking_child *	c;
358	u_int			idx;
359
360	c = NULL;
361	for (idx = 0; idx < blocking_children_alloc; idx++) {
362		c = blocking_children[idx];
363		if (c != NULL &&
364			c->thread_ref != NULL &&
365			same_os_sema(c->responses_pending, context))
366			break;
367	}
368	if (idx < blocking_children_alloc)
369		process_blocking_resp(c);
370}
371#endif	/* !WORK_PIPE */
372
373/* --------------------------------------------------------------------
374 * Fetch the next response from the return queue. In case of signalling
375 * via pipe, make sure the pipe is flushed, too.
376 */
377blocking_pipe_header *
378receive_blocking_resp_internal(
379	blocking_child *	c
380	)
381{
382	blocking_pipe_header *	removed;
383	size_t			qhead, qtail, slot;
384
385#ifdef WORK_PIPE
386	int			rc;
387	char			scratch[32];
388
389	do
390		rc = read(c->resp_read_pipe, scratch, sizeof(scratch));
391	while (-1 == rc && EINTR == errno);
392#endif
393
394	/* >>>> ACCESS LOCKING STARTS >>>> */
395	wait_for_sem(c->accesslock, NULL);
396	qhead = c->head_response;
397	qtail = c->tail_response;
398	for (removed = NULL; !removed && (qhead != qtail); ++qtail) {
399		slot = qtail % c->responses_alloc;
400		removed = c->responses[slot];
401		c->responses[slot] = NULL;
402	}
403	c->tail_response = qtail;
404	tickle_sem(c->accesslock);
405	/* <<<< ACCESS LOCKING ENDS <<<< */
406
407	if (NULL != removed) {
408		DEBUG_ENSURE(CHILD_GONE_RESP == removed ||
409			     BLOCKING_RESP_MAGIC == removed->magic_sig);
410	}
411	if (CHILD_GONE_RESP == removed) {
412		cleanup_after_child(c);
413		removed = NULL;
414	}
415
416	return removed;
417}
418
419/* --------------------------------------------------------------------
420 * Light up a new worker.
421 */
422static void
423start_blocking_thread(
424	blocking_child *	c
425	)
426{
427
428	DEBUG_INSIST(!c->reusable);
429
430	prepare_child_sems(c);
431	start_blocking_thread_internal(c);
432}
433
434/* --------------------------------------------------------------------
435 * Create a worker thread. There are several differences between POSIX
436 * and Windows, of course -- most notably the Windows thread is no
437 * detached thread, and we keep the handle around until we want to get
438 * rid of the thread. The notification scheme also differs: Windows
439 * makes use of semaphores in both directions, POSIX uses a pipe for
440 * integration with 'select()' or alike.
441 */
442static void
443start_blocking_thread_internal(
444	blocking_child *	c
445	)
446#ifdef SYS_WINNT
447{
448	BOOL	resumed;
449
450	c->thread_ref = NULL;
451	(*addremove_io_semaphore)(c->responses_pending->shnd, FALSE);
452	c->thr_table[0].thnd =
453		(HANDLE)_beginthreadex(
454			NULL,
455			0,
456			&blocking_thread,
457			c,
458			CREATE_SUSPENDED,
459			NULL);
460
461	if (NULL == c->thr_table[0].thnd) {
462		msyslog(LOG_ERR, "start blocking thread failed: %m");
463		exit(-1);
464	}
465	/* remember the thread priority is only within the process class */
466	if (!SetThreadPriority(c->thr_table[0].thnd,
467			       THREAD_PRIORITY_BELOW_NORMAL))
468		msyslog(LOG_ERR, "Error lowering blocking thread priority: %m");
469
470	resumed = ResumeThread(c->thr_table[0].thnd);
471	DEBUG_INSIST(resumed);
472	c->thread_ref = &c->thr_table[0];
473}
474#else	/* pthreads start_blocking_thread_internal() follows */
475{
476# ifdef NEED_PTHREAD_INIT
477	static int	pthread_init_called;
478# endif
479	pthread_attr_t	thr_attr;
480	int		rc;
481	int		saved_errno;
482	int		pipe_ends[2];	/* read then write */
483	int		is_pipe;
484	int		flags;
485	size_t		stacksize;
486	sigset_t	saved_sig_mask;
487
488	c->thread_ref = NULL;
489
490# ifdef NEED_PTHREAD_INIT
491	/*
492	 * from lib/isc/unix/app.c:
493	 * BSDI 3.1 seg faults in pthread_sigmask() if we don't do this.
494	 */
495	if (!pthread_init_called) {
496		pthread_init();
497		pthread_init_called = TRUE;
498	}
499# endif
500
501	rc = pipe_socketpair(&pipe_ends[0], &is_pipe);
502	if (0 != rc) {
503		msyslog(LOG_ERR, "start_blocking_thread: pipe_socketpair() %m");
504		exit(1);
505	}
506	c->resp_read_pipe = move_fd(pipe_ends[0]);
507	c->resp_write_pipe = move_fd(pipe_ends[1]);
508	c->ispipe = is_pipe;
509	flags = fcntl(c->resp_read_pipe, F_GETFL, 0);
510	if (-1 == flags) {
511		msyslog(LOG_ERR, "start_blocking_thread: fcntl(F_GETFL) %m");
512		exit(1);
513	}
514	rc = fcntl(c->resp_read_pipe, F_SETFL, O_NONBLOCK | flags);
515	if (-1 == rc) {
516		msyslog(LOG_ERR,
517			"start_blocking_thread: fcntl(F_SETFL, O_NONBLOCK) %m");
518		exit(1);
519	}
520	(*addremove_io_fd)(c->resp_read_pipe, c->ispipe, FALSE);
521	pthread_attr_init(&thr_attr);
522	pthread_attr_setdetachstate(&thr_attr, PTHREAD_CREATE_DETACHED);
523#if defined(HAVE_PTHREAD_ATTR_GETSTACKSIZE) && \
524    defined(HAVE_PTHREAD_ATTR_SETSTACKSIZE)
525	rc = pthread_attr_getstacksize(&thr_attr, &stacksize);
526	if (-1 == rc) {
527		msyslog(LOG_ERR,
528			"start_blocking_thread: pthread_attr_getstacksize %m");
529	} else if (stacksize < THREAD_MINSTACKSIZE) {
530		rc = pthread_attr_setstacksize(&thr_attr,
531					       THREAD_MINSTACKSIZE);
532		if (-1 == rc)
533			msyslog(LOG_ERR,
534				"start_blocking_thread: pthread_attr_setstacksize(0x%lx -> 0x%lx) %m",
535				(u_long)stacksize,
536				(u_long)THREAD_MINSTACKSIZE);
537	}
538#else
539	UNUSED_ARG(stacksize);
540#endif
541#if defined(PTHREAD_SCOPE_SYSTEM) && defined(NEED_PTHREAD_SCOPE_SYSTEM)
542	pthread_attr_setscope(&thr_attr, PTHREAD_SCOPE_SYSTEM);
543#endif
544	c->thread_ref = emalloc_zero(sizeof(*c->thread_ref));
545	block_thread_signals(&saved_sig_mask);
546	rc = pthread_create(&c->thr_table[0], &thr_attr,
547			    &blocking_thread, c);
548	saved_errno = errno;
549	pthread_sigmask(SIG_SETMASK, &saved_sig_mask, NULL);
550	pthread_attr_destroy(&thr_attr);
551	if (0 != rc) {
552		errno = saved_errno;
553		msyslog(LOG_ERR, "pthread_create() blocking child: %m");
554		exit(1);
555	}
556	c->thread_ref = &c->thr_table[0];
557}
558#endif
559
560/* --------------------------------------------------------------------
561 * block_thread_signals()
562 *
563 * Temporarily block signals used by ntpd main thread, so that signal
564 * mask inherited by child threads leaves them blocked.  Returns prior
565 * active signal mask via pmask, to be restored by the main thread
566 * after pthread_create().
567 */
568#ifndef SYS_WINNT
569void
570block_thread_signals(
571	sigset_t *	pmask
572	)
573{
574	sigset_t	block;
575
576	sigemptyset(&block);
577# ifdef HAVE_SIGNALED_IO
578#  ifdef SIGIO
579	sigaddset(&block, SIGIO);
580#  endif
581#  ifdef SIGPOLL
582	sigaddset(&block, SIGPOLL);
583#  endif
584# endif	/* HAVE_SIGNALED_IO */
585	sigaddset(&block, SIGALRM);
586	sigaddset(&block, MOREDEBUGSIG);
587	sigaddset(&block, LESSDEBUGSIG);
588# ifdef SIGDIE1
589	sigaddset(&block, SIGDIE1);
590# endif
591# ifdef SIGDIE2
592	sigaddset(&block, SIGDIE2);
593# endif
594# ifdef SIGDIE3
595	sigaddset(&block, SIGDIE3);
596# endif
597# ifdef SIGDIE4
598	sigaddset(&block, SIGDIE4);
599# endif
600# ifdef SIGBUS
601	sigaddset(&block, SIGBUS);
602# endif
603	sigemptyset(pmask);
604	pthread_sigmask(SIG_BLOCK, &block, pmask);
605}
606#endif	/* !SYS_WINNT */
607
608
609/* --------------------------------------------------------------------
610 * Create & destroy semaphores. This is sufficiently different between
611 * POSIX and Windows to warrant wrapper functions and close enough to
612 * use the concept of synchronization via semaphore for all platforms.
613 */
614static sem_ref
615create_sema(
616	sema_type*	semptr,
617	u_int		inival,
618	u_int		maxval)
619{
620#ifdef SYS_WINNT
621
622	long svini, svmax;
623	if (NULL != semptr) {
624		svini = (inival < LONG_MAX)
625		    ? (long)inival : LONG_MAX;
626		svmax = (maxval < LONG_MAX && maxval > 0)
627		    ? (long)maxval : LONG_MAX;
628		semptr->shnd = CreateSemaphore(NULL, svini, svmax, NULL);
629		if (NULL == semptr->shnd)
630			semptr = NULL;
631	}
632
633#else
634
635	(void)maxval;
636	if (semptr && sem_init(semptr, FALSE, inival))
637		semptr = NULL;
638
639#endif
640
641	return semptr;
642}
643
644/* ------------------------------------------------------------------ */
645static sem_ref
646delete_sema(
647	sem_ref obj)
648{
649
650#   ifdef SYS_WINNT
651
652	if (obj) {
653		if (obj->shnd)
654			CloseHandle(obj->shnd);
655		obj->shnd = NULL;
656	}
657
658#   else
659
660	if (obj)
661		sem_destroy(obj);
662
663#   endif
664
665	return NULL;
666}
667
668/* --------------------------------------------------------------------
669 * prepare_child_sems()
670 *
671 * create sync & access semaphores
672 *
673 * All semaphores are cleared, only the access semaphore has 1 unit.
674 * Childs wait on 'workitems_pending', then grabs 'sema_access'
675 * and dequeues jobs. When done, 'sema_access' is given one unit back.
676 *
677 * The producer grabs 'sema_access', manages the queue, restores
678 * 'sema_access' and puts one unit into 'workitems_pending'.
679 *
680 * The story goes the same for the response queue.
681 */
682static void
683prepare_child_sems(
684	blocking_child *c
685	)
686{
687	c->accesslock           = create_sema(&c->sem_table[0], 1, 1);
688	c->workitems_pending    = create_sema(&c->sem_table[1], 0, 0);
689	c->wake_scheduled_sleep = create_sema(&c->sem_table[2], 0, 1);
690#   ifndef WORK_PIPE
691	c->responses_pending    = create_sema(&c->sem_table[3], 0, 0);
692#   endif
693}
694
695/* --------------------------------------------------------------------
696 * wait for semaphore. Where the wait can be interrupted, it will
697 * internally resume -- When this function returns, there is either no
698 * semaphore at all, a timeout occurred, or the caller could
699 * successfully take a token from the semaphore.
700 *
701 * For untimed wait, not checking the result of this function at all is
702 * definitely an option.
703 */
704static int
705wait_for_sem(
706	sem_ref			sem,
707	struct timespec *	timeout		/* wall-clock */
708	)
709#ifdef SYS_WINNT
710{
711	struct timespec now;
712	struct timespec delta;
713	DWORD		msec;
714	DWORD		rc;
715
716	if (!(sem && sem->shnd)) {
717		errno = EINVAL;
718		return -1;
719	}
720
721	if (NULL == timeout) {
722		msec = INFINITE;
723	} else {
724		getclock(TIMEOFDAY, &now);
725		delta = sub_tspec(*timeout, now);
726		if (delta.tv_sec < 0) {
727			msec = 0;
728		} else if ((delta.tv_sec + 1) >= (MAXDWORD / 1000)) {
729			msec = INFINITE;
730		} else {
731			msec = 1000 * (DWORD)delta.tv_sec;
732			msec += delta.tv_nsec / (1000 * 1000);
733		}
734	}
735	rc = WaitForSingleObject(sem->shnd, msec);
736	if (WAIT_OBJECT_0 == rc)
737		return 0;
738	if (WAIT_TIMEOUT == rc) {
739		errno = ETIMEDOUT;
740		return -1;
741	}
742	msyslog(LOG_ERR, "WaitForSingleObject unexpected 0x%x", rc);
743	errno = EFAULT;
744	return -1;
745}
746#else	/* pthreads wait_for_sem() follows */
747{
748	int rc = -1;
749
750	if (sem) do {
751			if (NULL == timeout)
752				rc = sem_wait(sem);
753			else
754				rc = sem_timedwait(sem, timeout);
755		} while (rc == -1 && errno == EINTR);
756	else
757		errno = EINVAL;
758
759	return rc;
760}
761#endif
762
763/* --------------------------------------------------------------------
764 * blocking_thread - thread functions have WINAPI (aka 'stdcall')
765 * calling conventions under Windows and POSIX-defined signature
766 * otherwise.
767 */
768#ifdef SYS_WINNT
769u_int WINAPI
770#else
771void *
772#endif
773blocking_thread(
774	void *	ThreadArg
775	)
776{
777	blocking_child *c;
778
779	c = ThreadArg;
780	exit_worker(blocking_child_common(c));
781
782	/* NOTREACHED */
783	return 0;
784}
785
786/* --------------------------------------------------------------------
787 * req_child_exit() runs in the parent.
788 *
789 * This function is called from from the idle timer, too, and possibly
790 * without a thread being there any longer. Since we have folded up our
791 * tent in that case and all the semaphores are already gone, we simply
792 * ignore this request in this case.
793 *
794 * Since the existence of the semaphores is controlled exclusively by
795 * the parent, there's no risk of data race here.
796 */
797int
798req_child_exit(
799	blocking_child *c
800	)
801{
802	return (c->accesslock)
803	    ? queue_req_pointer(c, CHILD_EXIT_REQ)
804	    : 0;
805}
806
807/* --------------------------------------------------------------------
808 * cleanup_after_child() runs in parent.
809 */
810static void
811cleanup_after_child(
812	blocking_child *	c
813	)
814{
815	DEBUG_INSIST(!c->reusable);
816
817#   ifdef SYS_WINNT
818	/* The thread was not created in detached state, so we better
819	 * clean up.
820	 */
821	if (c->thread_ref && c->thread_ref->thnd) {
822		WaitForSingleObject(c->thread_ref->thnd, INFINITE);
823		INSIST(CloseHandle(c->thread_ref->thnd));
824		c->thread_ref->thnd = NULL;
825	}
826#   endif
827	c->thread_ref = NULL;
828
829	/* remove semaphores and (if signalling vi IO) pipes */
830
831	c->accesslock           = delete_sema(c->accesslock);
832	c->workitems_pending    = delete_sema(c->workitems_pending);
833	c->wake_scheduled_sleep = delete_sema(c->wake_scheduled_sleep);
834
835#   ifdef WORK_PIPE
836	DEBUG_INSIST(-1 != c->resp_read_pipe);
837	DEBUG_INSIST(-1 != c->resp_write_pipe);
838	(*addremove_io_fd)(c->resp_read_pipe, c->ispipe, TRUE);
839	close(c->resp_write_pipe);
840	close(c->resp_read_pipe);
841	c->resp_write_pipe = -1;
842	c->resp_read_pipe = -1;
843#   else
844	DEBUG_INSIST(NULL != c->responses_pending);
845	(*addremove_io_semaphore)(c->responses_pending->shnd, TRUE);
846	c->responses_pending = delete_sema(c->responses_pending);
847#   endif
848
849	/* Is it necessary to check if there are pending requests and
850	 * responses? If so, and if there are, what to do with them?
851	 */
852
853	/* re-init buffer index sequencers */
854	c->head_workitem = 0;
855	c->tail_workitem = 0;
856	c->head_response = 0;
857	c->tail_response = 0;
858
859	c->reusable = TRUE;
860}
861
862
863#else	/* !WORK_THREAD follows */
864char work_thread_nonempty_compilation_unit;
865#endif
866