work_thread.c revision 1.4
1/*	$NetBSD: work_thread.c,v 1.4 2016/01/08 21:35:39 christos Exp $	*/
2
3/*
4 * work_thread.c - threads implementation for blocking worker child.
5 */
6#include <config.h>
7#include "ntp_workimpl.h"
8
9#ifdef WORK_THREAD
10
11#include <stdio.h>
12#include <ctype.h>
13#include <signal.h>
14#ifndef SYS_WINNT
15#include <pthread.h>
16#endif
17
18#include "ntp_stdlib.h"
19#include "ntp_malloc.h"
20#include "ntp_syslog.h"
21#include "ntpd.h"
22#include "ntp_io.h"
23#include "ntp_assert.h"
24#include "ntp_unixtime.h"
25#include "timespecops.h"
26#include "ntp_worker.h"
27
28#define CHILD_EXIT_REQ	((blocking_pipe_header *)(intptr_t)-1)
29#define CHILD_GONE_RESP	CHILD_EXIT_REQ
30#define WORKITEMS_ALLOC_INC	16
31#define RESPONSES_ALLOC_INC	4
32
33#ifndef THREAD_MINSTACKSIZE
34#define THREAD_MINSTACKSIZE	(64U * 1024)
35#endif
36
37#ifdef SYS_WINNT
38
39# define thread_exit(c)	_endthreadex(c)
40# define tickle_sem(sh) ReleaseSemaphore((sh->shnd), 1, NULL)
41u_int	WINAPI	blocking_thread(void *);
42static BOOL	same_os_sema(const sem_ref obj, void * osobj);
43
44#else
45
46# define thread_exit(c)	pthread_exit((void*)(size_t)(c))
47# define tickle_sem	sem_post
48void *		blocking_thread(void *);
49static	void	block_thread_signals(sigset_t *);
50
51#endif
52
53#ifdef WORK_PIPE
54addremove_io_fd_func		addremove_io_fd;
55#else
56addremove_io_semaphore_func	addremove_io_semaphore;
57#endif
58
59static	void	start_blocking_thread(blocking_child *);
60static	void	start_blocking_thread_internal(blocking_child *);
61static	void	prepare_child_sems(blocking_child *);
62static	int	wait_for_sem(sem_ref, struct timespec *);
63static	int	ensure_workitems_empty_slot(blocking_child *);
64static	int	ensure_workresp_empty_slot(blocking_child *);
65static	int	queue_req_pointer(blocking_child *, blocking_pipe_header *);
66static	void	cleanup_after_child(blocking_child *);
67
68
/*
 * exit_worker() -- terminate the calling worker thread.
 *
 * 'exitcode' becomes the thread exit status.  Expands to
 * _endthreadex() on Windows and pthread_exit() elsewhere via the
 * thread_exit() macro defined above; this call does not return.
 */
void
exit_worker(
	int	exitcode
	)
{
	thread_exit(exitcode);	/* see #define thread_exit */
}
76
77/* --------------------------------------------------------------------
 * sleep for a given time or until the wakeup semaphore is tickled.
79 */
/* Returns 0 when the full timeout elapsed (a normal nap), -1 when the
 * sleep was cut short by a tickle of 'wake_scheduled_sleep' or when the
 * clock/semaphore calls fail.
 */
int
worker_sleep(
	blocking_child *	c,
	time_t			seconds
	)
{
	struct timespec	until;
	int		rc;

# ifdef HAVE_CLOCK_GETTIME
	if (0 != clock_gettime(CLOCK_REALTIME, &until)) {
		msyslog(LOG_ERR, "worker_sleep: clock_gettime() failed: %m");
		return -1;
	}
# else
	if (0 != getclock(TIMEOFDAY, &until)) {
		msyslog(LOG_ERR, "worker_sleep: getclock() failed: %m");
		return -1;
	}
# endif
	/* build absolute wall-clock deadline for the timed wait */
	until.tv_sec += seconds;
	rc = wait_for_sem(c->wake_scheduled_sleep, &until);
	if (0 == rc)
		return -1;	/* semaphore tickled: sleep interrupted */
	if (-1 == rc && ETIMEDOUT == errno)
		return 0;	/* deadline reached: slept the full period */
	msyslog(LOG_ERR, "worker_sleep: sem_timedwait: %m");
	return -1;
}
109
110
111/* --------------------------------------------------------------------
112 * Wake up a worker that takes a nap.
113 */
114void
115interrupt_worker_sleep(void)
116{
117	u_int			idx;
118	blocking_child *	c;
119
120	for (idx = 0; idx < blocking_children_alloc; idx++) {
121		c = blocking_children[idx];
122		if (NULL == c || NULL == c->wake_scheduled_sleep)
123			continue;
124		tickle_sem(c->wake_scheduled_sleep);
125	}
126}
127
128/* --------------------------------------------------------------------
129 * Make sure there is an empty slot at the head of the request
130 * queue. Tell if the queue is currently empty.
131 */
132static int
133ensure_workitems_empty_slot(
134	blocking_child *c
135	)
136{
137	/*
138	** !!! PRECONDITION: caller holds access lock!
139	**
140	** This simply tries to increase the size of the buffer if it
141	** becomes full. The resize operation does *not* maintain the
142	** order of requests, but that should be irrelevant since the
143	** processing is considered asynchronous anyway.
144	**
145	** Return if the buffer is currently empty.
146	*/
147
148	static const size_t each =
149	    sizeof(blocking_children[0]->workitems[0]);
150
151	size_t	new_alloc;
152	size_t  slots_used;
153
154	slots_used = c->head_workitem - c->tail_workitem;
155	if (slots_used >= c->workitems_alloc) {
156		new_alloc  = c->workitems_alloc + WORKITEMS_ALLOC_INC;
157		c->workitems = erealloc(c->workitems, new_alloc * each);
158		c->tail_workitem   = 0;
159		c->head_workitem   = c->workitems_alloc;
160		c->workitems_alloc = new_alloc;
161	}
162	return (0 == slots_used);
163}
164
165/* --------------------------------------------------------------------
166 * Make sure there is an empty slot at the head of the response
167 * queue. Tell if the queue is currently empty.
168 */
169static int
170ensure_workresp_empty_slot(
171	blocking_child *c
172	)
173{
174	/*
175	** !!! PRECONDITION: caller holds access lock!
176	**
177	** Works like the companion function above.
178	*/
179
180	static const size_t each =
181	    sizeof(blocking_children[0]->responses[0]);
182
183	size_t	new_alloc;
184	size_t  slots_used;
185
186	slots_used = c->head_response - c->tail_response;
187	if (slots_used >= c->responses_alloc) {
188		new_alloc  = c->responses_alloc + RESPONSES_ALLOC_INC;
189		c->responses = erealloc(c->responses, new_alloc * each);
190		c->tail_response   = 0;
191		c->head_response   = c->responses_alloc;
192		c->responses_alloc = new_alloc;
193	}
194	return (0 == slots_used);
195}
196
197
198/* --------------------------------------------------------------------
199 * queue_req_pointer() - append a work item or idle exit request to
200 *			 blocking_workitems[]. Employ proper locking.
201 */
202static int
203queue_req_pointer(
204	blocking_child	*	c,
205	blocking_pipe_header *	hdr
206	)
207{
208	size_t qhead;
209
210	/* >>>> ACCESS LOCKING STARTS >>>> */
211	wait_for_sem(c->accesslock, NULL);
212	ensure_workitems_empty_slot(c);
213	qhead = c->head_workitem;
214	c->workitems[qhead % c->workitems_alloc] = hdr;
215	c->head_workitem = 1 + qhead;
216	tickle_sem(c->accesslock);
217	/* <<<< ACCESS LOCKING ENDS <<<< */
218
219	/* queue consumer wake-up notification */
220	tickle_sem(c->workitems_pending);
221
222	return 0;
223}
224
225/* --------------------------------------------------------------------
226 * API function to make sure a worker is running, a proper private copy
 * of the data is made, the data entered into the queue and the worker
228 * is signalled.
229 */
230int
231send_blocking_req_internal(
232	blocking_child *	c,
233	blocking_pipe_header *	hdr,
234	void *			data
235	)
236{
237	blocking_pipe_header *	threadcopy;
238	size_t			payload_octets;
239
240	REQUIRE(hdr != NULL);
241	REQUIRE(data != NULL);
242	DEBUG_REQUIRE(BLOCKING_REQ_MAGIC == hdr->magic_sig);
243
244	if (hdr->octets <= sizeof(*hdr))
245		return 1;	/* failure */
246	payload_octets = hdr->octets - sizeof(*hdr);
247
248	if (NULL == c->thread_ref)
249		start_blocking_thread(c);
250	threadcopy = emalloc(hdr->octets);
251	memcpy(threadcopy, hdr, sizeof(*hdr));
252	memcpy((char *)threadcopy + sizeof(*hdr), data, payload_octets);
253
254	return queue_req_pointer(c, threadcopy);
255}
256
257/* --------------------------------------------------------------------
258 * Wait for the 'incoming queue no longer empty' signal, lock the shared
259 * structure and dequeue an item.
260 */
blocking_pipe_header *
receive_blocking_req_internal(
	blocking_child *	c
	)
{
	blocking_pipe_header *	req;
	size_t			qhead, qtail;

	req = NULL;
	do {
		/* wait for tickle from the producer side */
		wait_for_sem(c->workitems_pending, NULL);

		/* >>>> ACCESS LOCKING STARTS >>>> */
		wait_for_sem(c->accesslock, NULL);
		qhead = c->head_workitem;
		do {
			/* advance the tail over already-consumed (NULL)
			 * slots until a live request is found or the
			 * ring drains empty */
			qtail = c->tail_workitem;
			if (qhead == qtail)
				break;
			c->tail_workitem = qtail + 1;
			qtail %= c->workitems_alloc;
			req = c->workitems[qtail];
			c->workitems[qtail] = NULL;
		} while (NULL == req);
		tickle_sem(c->accesslock);
		/* <<<< ACCESS LOCKING ENDS <<<< */

	} while (NULL == req);

	INSIST(NULL != req);
	if (CHILD_EXIT_REQ == req) {	/* idled out */
		/* acknowledge the exit request to the parent and
		 * report "no work" to our caller */
		send_blocking_resp_internal(c, CHILD_GONE_RESP);
		req = NULL;
	}

	return req;
}
299
300/* --------------------------------------------------------------------
301 * Push a response into the return queue and eventually tickle the
302 * receiver.
303 */
304int
305send_blocking_resp_internal(
306	blocking_child *	c,
307	blocking_pipe_header *	resp
308	)
309{
310	size_t	qhead;
311	int	empty;
312
313	/* >>>> ACCESS LOCKING STARTS >>>> */
314	wait_for_sem(c->accesslock, NULL);
315	empty = ensure_workresp_empty_slot(c);
316	qhead = c->head_response;
317	c->responses[qhead % c->responses_alloc] = resp;
318	c->head_response = 1 + qhead;
319	tickle_sem(c->accesslock);
320	/* <<<< ACCESS LOCKING ENDS <<<< */
321
322	/* queue consumer wake-up notification */
323	if (empty)
324	{
325#	    ifdef WORK_PIPE
326		write(c->resp_write_pipe, "", 1);
327#	    else
328		tickle_sem(c->responses_pending);
329#	    endif
330	}
331	return 0;
332}
333
334
335#ifndef WORK_PIPE
336
337/* --------------------------------------------------------------------
 * Check if a (Windows-)handle to a semaphore is actually the same we
339 * are using inside the sema wrapper.
340 */
341static BOOL
342same_os_sema(
343	const sem_ref	obj,
344	void*		osh
345	)
346{
347	return obj && osh && (obj->shnd == (HANDLE)osh);
348}
349
350/* --------------------------------------------------------------------
351 * Find the shared context that associates to an OS handle and make sure
352 * the data is dequeued and processed.
353 */
354void
355handle_blocking_resp_sem(
356	void *	context
357	)
358{
359	blocking_child *	c;
360	u_int			idx;
361
362	c = NULL;
363	for (idx = 0; idx < blocking_children_alloc; idx++) {
364		c = blocking_children[idx];
365		if (c != NULL &&
366			c->thread_ref != NULL &&
367			same_os_sema(c->responses_pending, context))
368			break;
369	}
370	if (idx < blocking_children_alloc)
371		process_blocking_resp(c);
372}
373#endif	/* !WORK_PIPE */
374
375/* --------------------------------------------------------------------
376 * Fetch the next response from the return queue. In case of signalling
377 * via pipe, make sure the pipe is flushed, too.
378 */
blocking_pipe_header *
receive_blocking_resp_internal(
	blocking_child *	c
	)
{
	blocking_pipe_header *	removed;
	size_t			qhead, qtail, slot;

#ifdef WORK_PIPE
	int			rc;
	char			scratch[32];

	/* drain the wake-up byte(s) from the signalling pipe; the fd
	 * is non-blocking, so a failed read other than EINTR simply
	 * means there is nothing left to flush */
	do
		rc = read(c->resp_read_pipe, scratch, sizeof(scratch));
	while (-1 == rc && EINTR == errno);
#endif

	/* >>>> ACCESS LOCKING STARTS >>>> */
	wait_for_sem(c->accesslock, NULL);
	qhead = c->head_response;
	qtail = c->tail_response;
	/* advance the tail over consumed (NULL) slots until a response
	 * is found or the ring is empty */
	for (removed = NULL; !removed && (qhead != qtail); ++qtail) {
		slot = qtail % c->responses_alloc;
		removed = c->responses[slot];
		c->responses[slot] = NULL;
	}
	c->tail_response = qtail;
	tickle_sem(c->accesslock);
	/* <<<< ACCESS LOCKING ENDS <<<< */

	if (NULL != removed) {
		DEBUG_ENSURE(CHILD_GONE_RESP == removed ||
			     BLOCKING_RESP_MAGIC == removed->magic_sig);
	}
	if (CHILD_GONE_RESP == removed) {
		/* child acknowledged an exit request: tear down its
		 * resources and report no response to the caller */
		cleanup_after_child(c);
		removed = NULL;
	}

	return removed;
}
420
421/* --------------------------------------------------------------------
422 * Light up a new worker.
423 */
static void
start_blocking_thread(
	blocking_child *	c
	)
{

	DEBUG_INSIST(!c->reusable);

	/* create the semaphores first so the new thread finds them
	 * ready when it starts running */
	prepare_child_sems(c);
	start_blocking_thread_internal(c);
}
435
436/* --------------------------------------------------------------------
437 * Create a worker thread. There are several differences between POSIX
438 * and Windows, of course -- most notably the Windows thread is no
439 * detached thread, and we keep the handle around until we want to get
440 * rid of the thread. The notification scheme also differs: Windows
441 * makes use of semaphores in both directions, POSIX uses a pipe for
442 * integration with 'select()' or alike.
443 */
444static void
445start_blocking_thread_internal(
446	blocking_child *	c
447	)
448#ifdef SYS_WINNT
449{
450	BOOL	resumed;
451
452	c->thread_ref = NULL;
453	(*addremove_io_semaphore)(c->responses_pending->shnd, FALSE);
454	c->thr_table[0].thnd =
455		(HANDLE)_beginthreadex(
456			NULL,
457			0,
458			&blocking_thread,
459			c,
460			CREATE_SUSPENDED,
461			NULL);
462
463	if (NULL == c->thr_table[0].thnd) {
464		msyslog(LOG_ERR, "start blocking thread failed: %m");
465		exit(-1);
466	}
467	/* remember the thread priority is only within the process class */
468	if (!SetThreadPriority(c->thr_table[0].thnd,
469			       THREAD_PRIORITY_BELOW_NORMAL))
470		msyslog(LOG_ERR, "Error lowering blocking thread priority: %m");
471
472	resumed = ResumeThread(c->thr_table[0].thnd);
473	DEBUG_INSIST(resumed);
474	c->thread_ref = &c->thr_table[0];
475}
476#else	/* pthreads start_blocking_thread_internal() follows */
477{
478# ifdef NEED_PTHREAD_INIT
479	static int	pthread_init_called;
480# endif
481	pthread_attr_t	thr_attr;
482	int		rc;
483	int		saved_errno;
484	int		pipe_ends[2];	/* read then write */
485	int		is_pipe;
486	int		flags;
487	size_t		stacksize;
488	sigset_t	saved_sig_mask;
489
490	c->thread_ref = NULL;
491
492# ifdef NEED_PTHREAD_INIT
493	/*
494	 * from lib/isc/unix/app.c:
495	 * BSDI 3.1 seg faults in pthread_sigmask() if we don't do this.
496	 */
497	if (!pthread_init_called) {
498		pthread_init();
499		pthread_init_called = TRUE;
500	}
501# endif
502
503	rc = pipe_socketpair(&pipe_ends[0], &is_pipe);
504	if (0 != rc) {
505		msyslog(LOG_ERR, "start_blocking_thread: pipe_socketpair() %m");
506		exit(1);
507	}
508	c->resp_read_pipe = move_fd(pipe_ends[0]);
509	c->resp_write_pipe = move_fd(pipe_ends[1]);
510	c->ispipe = is_pipe;
511	flags = fcntl(c->resp_read_pipe, F_GETFL, 0);
512	if (-1 == flags) {
513		msyslog(LOG_ERR, "start_blocking_thread: fcntl(F_GETFL) %m");
514		exit(1);
515	}
516	rc = fcntl(c->resp_read_pipe, F_SETFL, O_NONBLOCK | flags);
517	if (-1 == rc) {
518		msyslog(LOG_ERR,
519			"start_blocking_thread: fcntl(F_SETFL, O_NONBLOCK) %m");
520		exit(1);
521	}
522	(*addremove_io_fd)(c->resp_read_pipe, c->ispipe, FALSE);
523	pthread_attr_init(&thr_attr);
524	pthread_attr_setdetachstate(&thr_attr, PTHREAD_CREATE_DETACHED);
525#if defined(HAVE_PTHREAD_ATTR_GETSTACKSIZE) && \
526    defined(HAVE_PTHREAD_ATTR_SETSTACKSIZE)
527	rc = pthread_attr_getstacksize(&thr_attr, &stacksize);
528	if (-1 == rc) {
529		msyslog(LOG_ERR,
530			"start_blocking_thread: pthread_attr_getstacksize %m");
531	} else if (stacksize < THREAD_MINSTACKSIZE) {
532		rc = pthread_attr_setstacksize(&thr_attr,
533					       THREAD_MINSTACKSIZE);
534		if (-1 == rc)
535			msyslog(LOG_ERR,
536				"start_blocking_thread: pthread_attr_setstacksize(0x%lx -> 0x%lx) %m",
537				(u_long)stacksize,
538				(u_long)THREAD_MINSTACKSIZE);
539	}
540#else
541	UNUSED_ARG(stacksize);
542#endif
543#if defined(PTHREAD_SCOPE_SYSTEM) && defined(NEED_PTHREAD_SCOPE_SYSTEM)
544	pthread_attr_setscope(&thr_attr, PTHREAD_SCOPE_SYSTEM);
545#endif
546	c->thread_ref = emalloc_zero(sizeof(*c->thread_ref));
547	block_thread_signals(&saved_sig_mask);
548	rc = pthread_create(&c->thr_table[0], &thr_attr,
549			    &blocking_thread, c);
550	saved_errno = errno;
551	pthread_sigmask(SIG_SETMASK, &saved_sig_mask, NULL);
552	pthread_attr_destroy(&thr_attr);
553	if (0 != rc) {
554		errno = saved_errno;
555		msyslog(LOG_ERR, "pthread_create() blocking child: %m");
556		exit(1);
557	}
558	c->thread_ref = &c->thr_table[0];
559}
560#endif
561
562/* --------------------------------------------------------------------
563 * block_thread_signals()
564 *
565 * Temporarily block signals used by ntpd main thread, so that signal
566 * mask inherited by child threads leaves them blocked.  Returns prior
567 * active signal mask via pmask, to be restored by the main thread
568 * after pthread_create().
569 */
570#ifndef SYS_WINNT
571void
572block_thread_signals(
573	sigset_t *	pmask
574	)
575{
576	sigset_t	block;
577
578	sigemptyset(&block);
579# ifdef HAVE_SIGNALED_IO
580#  ifdef SIGIO
581	sigaddset(&block, SIGIO);
582#  endif
583#  ifdef SIGPOLL
584	sigaddset(&block, SIGPOLL);
585#  endif
586# endif	/* HAVE_SIGNALED_IO */
587	sigaddset(&block, SIGALRM);
588	sigaddset(&block, MOREDEBUGSIG);
589	sigaddset(&block, LESSDEBUGSIG);
590# ifdef SIGDIE1
591	sigaddset(&block, SIGDIE1);
592# endif
593# ifdef SIGDIE2
594	sigaddset(&block, SIGDIE2);
595# endif
596# ifdef SIGDIE3
597	sigaddset(&block, SIGDIE3);
598# endif
599# ifdef SIGDIE4
600	sigaddset(&block, SIGDIE4);
601# endif
602# ifdef SIGBUS
603	sigaddset(&block, SIGBUS);
604# endif
605	sigemptyset(pmask);
606	pthread_sigmask(SIG_BLOCK, &block, pmask);
607}
608#endif	/* !SYS_WINNT */
609
610
611/* --------------------------------------------------------------------
612 * Create & destroy semaphores. This is sufficiently different between
613 * POSIX and Windows to warrant wrapper functions and close enough to
614 * use the concept of synchronization via semaphore for all platforms.
615 */
616static sem_ref
617create_sema(
618	sema_type*	semptr,
619	u_int		inival,
620	u_int		maxval)
621{
622#ifdef SYS_WINNT
623
624	long svini, svmax;
625	if (NULL != semptr) {
626		svini = (inival < LONG_MAX)
627		    ? (long)inival : LONG_MAX;
628		svmax = (maxval < LONG_MAX && maxval > 0)
629		    ? (long)maxval : LONG_MAX;
630		semptr->shnd = CreateSemaphore(NULL, svini, svmax, NULL);
631		if (NULL == semptr->shnd)
632			semptr = NULL;
633	}
634
635#else
636
637	(void)maxval;
638	if (semptr && sem_init(semptr, FALSE, inival))
639		semptr = NULL;
640
641#endif
642
643	return semptr;
644}
645
646/* ------------------------------------------------------------------ */
647static sem_ref
648delete_sema(
649	sem_ref obj)
650{
651
652#   ifdef SYS_WINNT
653
654	if (obj) {
655		if (obj->shnd)
656			CloseHandle(obj->shnd);
657		obj->shnd = NULL;
658	}
659
660#   else
661
662	if (obj)
663		sem_destroy(obj);
664
665#   endif
666
667	return NULL;
668}
669
670/* --------------------------------------------------------------------
671 * prepare_child_sems()
672 *
673 * create sync & access semaphores
674 *
675 * All semaphores are cleared, only the access semaphore has 1 unit.
 * Children wait on 'workitems_pending', then grab 'sema_access'
677 * and dequeues jobs. When done, 'sema_access' is given one unit back.
678 *
679 * The producer grabs 'sema_access', manages the queue, restores
680 * 'sema_access' and puts one unit into 'workitems_pending'.
681 *
682 * The story goes the same for the response queue.
683 */
static void
prepare_child_sems(
	blocking_child *c
	)
{
	/* only the access lock starts with a unit, so the first taker
	 * immediately passes; the others start empty */
	c->accesslock           = create_sema(&c->sem_table[0], 1, 1);
	c->workitems_pending    = create_sema(&c->sem_table[1], 0, 0);
	c->wake_scheduled_sleep = create_sema(&c->sem_table[2], 0, 1);
#   ifndef WORK_PIPE
	/* without a pipe, responses are signalled by semaphore, too */
	c->responses_pending    = create_sema(&c->sem_table[3], 0, 0);
#   endif
}
696
697/* --------------------------------------------------------------------
698 * wait for semaphore. Where the wait can be interrupted, it will
699 * internally resume -- When this function returns, there is either no
700 * semaphore at all, a timeout occurred, or the caller could
701 * successfully take a token from the semaphore.
702 *
703 * For untimed wait, not checking the result of this function at all is
704 * definitely an option.
705 */
static int
wait_for_sem(
	sem_ref			sem,
	struct timespec *	timeout		/* wall-clock */
	)
#ifdef SYS_WINNT
{
	struct timespec now;
	struct timespec delta;
	DWORD		msec;
	DWORD		rc;

	if (!(sem && sem->shnd)) {
		errno = EINVAL;
		return -1;
	}

	if (NULL == timeout) {
		msec = INFINITE;
	} else {
		/* convert the absolute wall-clock deadline into the
		 * relative millisecond count WaitForSingleObject wants */
		getclock(TIMEOFDAY, &now);
		delta = sub_tspec(*timeout, now);
		if (delta.tv_sec < 0) {
			msec = 0;	/* deadline already passed */
		} else if ((delta.tv_sec + 1) >= (MAXDWORD / 1000)) {
			msec = INFINITE;	/* would overflow a DWORD */
		} else {
			msec = 1000 * (DWORD)delta.tv_sec;
			msec += delta.tv_nsec / (1000 * 1000);
		}
	}
	rc = WaitForSingleObject(sem->shnd, msec);
	if (WAIT_OBJECT_0 == rc)
		return 0;	/* took a token */
	if (WAIT_TIMEOUT == rc) {
		errno = ETIMEDOUT;
		return -1;
	}
	msyslog(LOG_ERR, "WaitForSingleObject unexpected 0x%x", rc);
	errno = EFAULT;
	return -1;
}
#else	/* pthreads wait_for_sem() follows */
{
	int rc = -1;

	/* resume the wait when a signal interrupts it (EINTR) */
	if (sem) do {
			if (NULL == timeout)
				rc = sem_wait(sem);
			else
				rc = sem_timedwait(sem, timeout);
		} while (rc == -1 && errno == EINTR);
	else
		errno = EINVAL;

	return rc;
}
#endif
764
765/* --------------------------------------------------------------------
766 * blocking_thread - thread functions have WINAPI (aka 'stdcall')
767 * calling conventions under Windows and POSIX-defined signature
768 * otherwise.
769 */
#ifdef SYS_WINNT
u_int WINAPI
#else
void *
#endif
blocking_thread(
	void *	ThreadArg
	)
{
	blocking_child *c;

	/* thread entry point: run the shared worker loop and exit the
	 * thread with its result code */
	c = ThreadArg;
	exit_worker(blocking_child_common(c));

	/* NOTREACHED */
	return 0;
}
787
788/* --------------------------------------------------------------------
789 * req_child_exit() runs in the parent.
790 *
 * This function is called from the idle timer, too, and possibly
792 * without a thread being there any longer. Since we have folded up our
793 * tent in that case and all the semaphores are already gone, we simply
794 * ignore this request in this case.
795 *
796 * Since the existence of the semaphores is controlled exclusively by
797 * the parent, there's no risk of data race here.
798 */
799int
800req_child_exit(
801	blocking_child *c
802	)
803{
804	return (c->accesslock)
805	    ? queue_req_pointer(c, CHILD_EXIT_REQ)
806	    : 0;
807}
808
809/* --------------------------------------------------------------------
810 * cleanup_after_child() runs in parent.
811 */
static void
cleanup_after_child(
	blocking_child *	c
	)
{
	DEBUG_INSIST(!c->reusable);

#   ifdef SYS_WINNT
	/* The thread was not created in detached state, so we better
	 * clean up.
	 */
	if (c->thread_ref && c->thread_ref->thnd) {
		WaitForSingleObject(c->thread_ref->thnd, INFINITE);
		INSIST(CloseHandle(c->thread_ref->thnd));
		c->thread_ref->thnd = NULL;
	}
#   endif
	c->thread_ref = NULL;

	/* remove semaphores and (if signalling via I/O) pipes */

	c->accesslock           = delete_sema(c->accesslock);
	c->workitems_pending    = delete_sema(c->workitems_pending);
	c->wake_scheduled_sleep = delete_sema(c->wake_scheduled_sleep);

#   ifdef WORK_PIPE
	DEBUG_INSIST(-1 != c->resp_read_pipe);
	DEBUG_INSIST(-1 != c->resp_write_pipe);
	/* unregister the read end from the I/O machinery before
	 * closing both pipe fds */
	(*addremove_io_fd)(c->resp_read_pipe, c->ispipe, TRUE);
	close(c->resp_write_pipe);
	close(c->resp_read_pipe);
	c->resp_write_pipe = -1;
	c->resp_read_pipe = -1;
#   else
	DEBUG_INSIST(NULL != c->responses_pending);
	(*addremove_io_semaphore)(c->responses_pending->shnd, TRUE);
	c->responses_pending = delete_sema(c->responses_pending);
#   endif

	/* Is it necessary to check if there are pending requests and
	 * responses? If so, and if there are, what to do with them?
	 */

	/* re-init buffer index sequencers */
	c->head_workitem = 0;
	c->tail_workitem = 0;
	c->head_response = 0;
	c->tail_response = 0;

	/* slot may now be picked up again for a fresh worker */
	c->reusable = TRUE;
}
863
864
865#else	/* !WORK_THREAD follows */
866char work_thread_nonempty_compilation_unit;
867#endif
868