/*	$NetBSD: work_thread.c,v 1.8 2020/05/25 20:47:24 christos Exp $	*/

/*
 * work_thread.c - threads implementation for blocking worker child.
 */
#include <config.h>
#include "ntp_workimpl.h"

#ifdef WORK_THREAD

#include <stdio.h>
#include <ctype.h>
#include <signal.h>
#ifndef SYS_WINNT
#include <pthread.h>
#endif

#include "ntp_stdlib.h"
#include "ntp_malloc.h"
#include "ntp_syslog.h"
#include "ntpd.h"
#include "ntp_io.h"
#include "ntp_assert.h"
#include "ntp_unixtime.h"
#include "timespecops.h"
#include "ntp_worker.h"

#define CHILD_EXIT_REQ	((blocking_pipe_header *)(intptr_t)-1)
#define CHILD_GONE_RESP	CHILD_EXIT_REQ
/* Queue size increments:
 * The request queue grows a bit faster than the response queue -- the
 * daemon can push requests and pull results faster on average than the
 * worker can process requests and push results...  Whether this really
 * pays off is debatable.
 */
#define WORKITEMS_ALLOC_INC	16
#define RESPONSES_ALLOC_INC	4

/* Fiddle with min/max stack sizes. 64kB minimum seems to work, so we
 * set the maximum to 256kB. If the minimum goes below the
 * system-defined minimum stack size, we have to adjust accordingly.
 */
#ifndef THREAD_MINSTACKSIZE
# define THREAD_MINSTACKSIZE	(64U * 1024)
#endif
#ifndef __sun
#if defined(PTHREAD_STACK_MIN) && THREAD_MINSTACKSIZE < PTHREAD_STACK_MIN
# undef THREAD_MINSTACKSIZE
# define THREAD_MINSTACKSIZE PTHREAD_STACK_MIN
#endif
#endif

#ifndef THREAD_MAXSTACKSIZE
# define THREAD_MAXSTACKSIZE	(256U * 1024)
#endif
#if THREAD_MAXSTACKSIZE < THREAD_MINSTACKSIZE
# undef  THREAD_MAXSTACKSIZE
# define THREAD_MAXSTACKSIZE THREAD_MINSTACKSIZE
#endif

/* need a good integer to store a pointer... */
#ifndef UINTPTR_T
# if defined(UINTPTR_MAX)
#  define UINTPTR_T uintptr_t
# elif defined(UINT_PTR)
#  define UINTPTR_T UINT_PTR
# else
#  define UINTPTR_T size_t
# endif
#endif


#ifdef SYS_WINNT

# define thread_exit(c)	_endthreadex(c)
# define tickle_sem(sh) ReleaseSemaphore((sh->shnd), 1, NULL)
u_int	WINAPI	blocking_thread(void *);
static BOOL	same_os_sema(const sem_ref obj, void * osobj);

#else

# define thread_exit(c)	pthread_exit((void*)(UINTPTR_T)(c))
# define tickle_sem	sem_post
void *		blocking_thread(void *);
static	void	block_thread_signals(sigset_t *);

#endif

#ifdef WORK_PIPE
addremove_io_fd_func		addremove_io_fd;
#else
addremove_io_semaphore_func	addremove_io_semaphore;
#endif

static	void	start_blocking_thread(blocking_child *);
static	void	start_blocking_thread_internal(blocking_child *);
static	void	prepare_child_sems(blocking_child *);
static	int	wait_for_sem(sem_ref, struct timespec *);
static	int	ensure_workitems_empty_slot(blocking_child *);
static	int	ensure_workresp_empty_slot(blocking_child *);
static	int	queue_req_pointer(blocking_child *, blocking_pipe_header *);
static	void	cleanup_after_child(blocking_child *);

static sema_type worker_mmutex;
static sem_ref   worker_memlock;

/* --------------------------------------------------------------------
 * locking the global worker state table (and other global stuff)
 */
void
worker_global_lock(
	int inOrOut)
{
	if (worker_memlock) {
		if (inOrOut)
			wait_for_sem(worker_memlock, NULL);
		else
			tickle_sem(worker_memlock);
	}
}

/* --------------------------------------------------------------------
 * implementation isolation wrapper
 */
void
exit_worker(
	int	exitcode
	)
{
	thread_exit(exitcode);	/* see #define thread_exit */
}

/* --------------------------------------------------------------------
 * sleep for a given time or until the wakeup semaphore is tickled.
 */
int
worker_sleep(
	blocking_child *	c,
	time_t			seconds
	)
{
	struct timespec	until;
	int		rc;

# ifdef HAVE_CLOCK_GETTIME
	if (0 != clock_gettime(CLOCK_REALTIME, &until)) {
		msyslog(LOG_ERR, "worker_sleep: clock_gettime() failed: %m");
		return -1;
	}
# else
	if (0 != getclock(TIMEOFDAY, &until)) {
		msyslog(LOG_ERR, "worker_sleep: getclock() failed: %m");
		return -1;
	}
# endif
	until.tv_sec += seconds;
	rc = wait_for_sem(c->wake_scheduled_sleep, &until);
	if (0 == rc)
		return -1;
	if (-1 == rc && ETIMEDOUT == errno)
		return 0;
	msyslog(LOG_ERR, "worker_sleep: sem_timedwait: %m");
	return -1;
}
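
/* Usage sketch (hypothetical caller, not taken from this file): a zero
 * return means the full nap elapsed; a negative return means the worker
 * was woken early via interrupt_worker_sleep() below, or the clock
 * lookup failed:
 *
 *	if (0 == worker_sleep(c, 60))
 *		do_scheduled_work(c);	// hypothetical helper
 *	else
 *		recheck_state(c);	// hypothetical helper
 */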


/* --------------------------------------------------------------------
 * Wake up a worker that is taking a nap.
 */
void
interrupt_worker_sleep(void)
{
	u_int			idx;
	blocking_child *	c;

	for (idx = 0; idx < blocking_children_alloc; idx++) {
		c = blocking_children[idx];
		if (NULL == c || NULL == c->wake_scheduled_sleep)
			continue;
		tickle_sem(c->wake_scheduled_sleep);
	}
}

/* --------------------------------------------------------------------
 * Make sure there is an empty slot at the head of the request
 * queue. Tell if the queue is currently empty.
 */
static int
ensure_workitems_empty_slot(
	blocking_child *c
	)
{
	/*
	** !!! PRECONDITION: caller holds access lock!
	**
	** This simply tries to increase the size of the buffer if it
	** becomes full. The resize operation does *not* maintain the
	** order of requests, but that should be irrelevant since the
	** processing is considered asynchronous anyway.
	**
	** Returns whether the buffer was empty on entry.
	*/

	static const size_t each =
	    sizeof(blocking_children[0]->workitems[0]);

	size_t	new_alloc;
	size_t  slots_used;
	size_t	sidx;

	slots_used = c->head_workitem - c->tail_workitem;
	if (slots_used >= c->workitems_alloc) {
		new_alloc  = c->workitems_alloc + WORKITEMS_ALLOC_INC;
		c->workitems = erealloc(c->workitems, new_alloc * each);
		for (sidx = c->workitems_alloc; sidx < new_alloc; ++sidx)
		    c->workitems[sidx] = NULL;
		c->tail_workitem   = 0;
		c->head_workitem   = c->workitems_alloc;
		c->workitems_alloc = new_alloc;
	}
	INSIST(NULL == c->workitems[c->head_workitem % c->workitems_alloc]);
	return (0 == slots_used);
}
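
/* Index arithmetic sketch: head_workitem and tail_workitem are
 * free-running counters; only their difference (fill level) and their
 * values modulo workitems_alloc (slot positions) matter. E.g. with
 * workitems_alloc == 16, head == 20 and tail == 7 mean 13 slots are in
 * use and the next request lands in slot 20 % 16 == 4. After a resize
 * the counters are rebased to tail == 0, head == old allocation, which
 * is why the order of in-flight items is not necessarily preserved.
 */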

/* --------------------------------------------------------------------
 * Make sure there is an empty slot at the head of the response
 * queue. Tell if the queue is currently empty.
 */
static int
ensure_workresp_empty_slot(
	blocking_child *c
	)
{
	/*
	** !!! PRECONDITION: caller holds access lock!
	**
	** Works like the companion function above.
	*/

	static const size_t each =
	    sizeof(blocking_children[0]->responses[0]);

	size_t	new_alloc;
	size_t  slots_used;
	size_t	sidx;

	slots_used = c->head_response - c->tail_response;
	if (slots_used >= c->responses_alloc) {
		new_alloc  = c->responses_alloc + RESPONSES_ALLOC_INC;
		c->responses = erealloc(c->responses, new_alloc * each);
		for (sidx = c->responses_alloc; sidx < new_alloc; ++sidx)
		    c->responses[sidx] = NULL;
		c->tail_response   = 0;
		c->head_response   = c->responses_alloc;
		c->responses_alloc = new_alloc;
	}
	INSIST(NULL == c->responses[c->head_response % c->responses_alloc]);
	return (0 == slots_used);
}


/* --------------------------------------------------------------------
 * queue_req_pointer() - append a work item or idle exit request to
 *			 blocking_workitems[]. Employ proper locking.
 */
static int
queue_req_pointer(
	blocking_child	*	c,
	blocking_pipe_header *	hdr
	)
{
	size_t qhead;

	/* >>>> ACCESS LOCKING STARTS >>>> */
	wait_for_sem(c->accesslock, NULL);
	ensure_workitems_empty_slot(c);
	qhead = c->head_workitem;
	c->workitems[qhead % c->workitems_alloc] = hdr;
	c->head_workitem = 1 + qhead;
	tickle_sem(c->accesslock);
	/* <<<< ACCESS LOCKING ENDS <<<< */

	/* queue consumer wake-up notification */
	tickle_sem(c->workitems_pending);

	return 0;
}
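
/* The access-locking idiom above recurs throughout this file: the
 * 'accesslock' semaphore is created with a single unit (see
 * prepare_child_sems() below) and so doubles as a mutex:
 *
 *	wait_for_sem(c->accesslock, NULL);	// lock: take the one unit
 *	// ... manipulate the shared queue ...
 *	tickle_sem(c->accesslock);		// unlock: put the unit back
 *
 * 'workitems_pending', by contrast, is a counting semaphore tickled
 * once per queued item, so every unit wakes the consumer exactly once.
 */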

/* --------------------------------------------------------------------
 * API function to make sure a worker is running, a proper private copy
 * of the data is made, the data is entered into the queue, and the
 * worker is signalled.
 */
int
send_blocking_req_internal(
	blocking_child *	c,
	blocking_pipe_header *	hdr,
	void *			data
	)
{
	blocking_pipe_header *	threadcopy;
	size_t			payload_octets;

	REQUIRE(hdr != NULL);
	REQUIRE(data != NULL);
	DEBUG_REQUIRE(BLOCKING_REQ_MAGIC == hdr->magic_sig);

	if (hdr->octets <= sizeof(*hdr))
		return 1;	/* failure */
	payload_octets = hdr->octets - sizeof(*hdr);

	if (NULL == c->thread_ref)
		start_blocking_thread(c);
	threadcopy = emalloc(hdr->octets);
	memcpy(threadcopy, hdr, sizeof(*hdr));
	memcpy((char *)threadcopy + sizeof(*hdr), data, payload_octets);

	return queue_req_pointer(c, threadcopy);
}
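
/* Marshalling sketch (hypothetical request; the real layouts live in
 * ntp_worker.h): 'octets' counts the header plus the payload that
 * follows it, so a caller with an 8-octet payload would do roughly:
 *
 *	blocking_pipe_header	hdr;
 *	char			payload[8];
 *
 *	memset(&hdr, 0, sizeof(hdr));
 *	hdr.magic_sig = BLOCKING_REQ_MAGIC;
 *	hdr.octets    = sizeof(hdr) + sizeof(payload);
 *	send_blocking_req_internal(c, &hdr, payload);
 *
 * The function then builds one contiguous private copy -- header
 * immediately followed by payload -- before queueing it for the worker.
 */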

/* --------------------------------------------------------------------
 * Wait for the 'incoming queue no longer empty' signal, lock the shared
 * structure and dequeue an item.
 */
blocking_pipe_header *
receive_blocking_req_internal(
	blocking_child *	c
	)
{
	blocking_pipe_header *	req;
	size_t			qhead, qtail;

	req = NULL;
	do {
		/* wait for tickle from the producer side */
		wait_for_sem(c->workitems_pending, NULL);

		/* >>>> ACCESS LOCKING STARTS >>>> */
		wait_for_sem(c->accesslock, NULL);
		qhead = c->head_workitem;
		do {
			qtail = c->tail_workitem;
			if (qhead == qtail)
				break;
			c->tail_workitem = qtail + 1;
			qtail %= c->workitems_alloc;
			req = c->workitems[qtail];
			c->workitems[qtail] = NULL;
		} while (NULL == req);
		tickle_sem(c->accesslock);
		/* <<<< ACCESS LOCKING ENDS <<<< */

	} while (NULL == req);

	INSIST(NULL != req);
	if (CHILD_EXIT_REQ == req) {	/* idled out */
		send_blocking_resp_internal(c, CHILD_GONE_RESP);
		req = NULL;
	}

	return req;
}

/* --------------------------------------------------------------------
 * Push a response into the return queue and eventually tickle the
 * receiver.
 */
int
send_blocking_resp_internal(
	blocking_child *	c,
	blocking_pipe_header *	resp
	)
{
	size_t	qhead;
	int	empty;

	/* >>>> ACCESS LOCKING STARTS >>>> */
	wait_for_sem(c->accesslock, NULL);
	empty = ensure_workresp_empty_slot(c);
	qhead = c->head_response;
	c->responses[qhead % c->responses_alloc] = resp;
	c->head_response = 1 + qhead;
	tickle_sem(c->accesslock);
	/* <<<< ACCESS LOCKING ENDS <<<< */

	/* queue consumer wake-up notification */
	if (empty)
	{
#	    ifdef WORK_PIPE
		if (1 != write(c->resp_write_pipe, "", 1))
			msyslog(LOG_WARNING, "async resolver: %s",
				"failed to notify main thread!");
#	    else
		tickle_sem(c->responses_pending);
#	    endif
	}
	return 0;
}


#ifndef WORK_PIPE

/* --------------------------------------------------------------------
 * Check if a (Windows) handle to a semaphore is actually the same one
 * we are using inside the sema wrapper.
 */
static BOOL
same_os_sema(
	const sem_ref	obj,
	void*		osh
	)
{
	return obj && osh && (obj->shnd == (HANDLE)osh);
}

/* --------------------------------------------------------------------
 * Find the shared context associated with an OS handle and make sure
 * the data is dequeued and processed.
 */
void
handle_blocking_resp_sem(
	void *	context
	)
{
	blocking_child *	c;
	u_int			idx;

	c = NULL;
	for (idx = 0; idx < blocking_children_alloc; idx++) {
		c = blocking_children[idx];
		if (c != NULL &&
			c->thread_ref != NULL &&
			same_os_sema(c->responses_pending, context))
			break;
	}
	if (idx < blocking_children_alloc)
		process_blocking_resp(c);
}
#endif	/* !WORK_PIPE */

/* --------------------------------------------------------------------
 * Fetch the next response from the return queue. In case of signalling
 * via pipe, make sure the pipe is flushed, too.
 */
blocking_pipe_header *
receive_blocking_resp_internal(
	blocking_child *	c
	)
{
	blocking_pipe_header *	removed;
	size_t			qhead, qtail, slot;

#ifdef WORK_PIPE
	int			rc;
	char			scratch[32];

	do
		rc = read(c->resp_read_pipe, scratch, sizeof(scratch));
	while (-1 == rc && EINTR == errno);
#endif

	/* >>>> ACCESS LOCKING STARTS >>>> */
	wait_for_sem(c->accesslock, NULL);
	qhead = c->head_response;
	qtail = c->tail_response;
	for (removed = NULL; !removed && (qhead != qtail); ++qtail) {
		slot = qtail % c->responses_alloc;
		removed = c->responses[slot];
		c->responses[slot] = NULL;
	}
	c->tail_response = qtail;
	tickle_sem(c->accesslock);
	/* <<<< ACCESS LOCKING ENDS <<<< */

	if (NULL != removed) {
		DEBUG_ENSURE(CHILD_GONE_RESP == removed ||
			     BLOCKING_RESP_MAGIC == removed->magic_sig);
	}
	if (CHILD_GONE_RESP == removed) {
		cleanup_after_child(c);
		removed = NULL;
	}

	return removed;
}

/* --------------------------------------------------------------------
 * Light up a new worker.
 */
static void
start_blocking_thread(
	blocking_child *	c
	)
{

	DEBUG_INSIST(!c->reusable);

	prepare_child_sems(c);
	start_blocking_thread_internal(c);
}

/* --------------------------------------------------------------------
 * Create a worker thread. There are several differences between POSIX
 * and Windows, of course -- most notably the Windows thread is not a
 * detached thread, and we keep the handle around until we want to get
 * rid of the thread. The notification scheme also differs: Windows
 * makes use of semaphores in both directions, POSIX uses a pipe for
 * integration with 'select()' or the like.
 */
static void
start_blocking_thread_internal(
	blocking_child *	c
	)
#ifdef SYS_WINNT
{
	BOOL	resumed;

	c->thread_ref = NULL;
	(*addremove_io_semaphore)(c->responses_pending->shnd, FALSE);
	c->thr_table[0].thnd =
		(HANDLE)_beginthreadex(
			NULL,
			0,
			&blocking_thread,
			c,
			CREATE_SUSPENDED,
			NULL);

	if (NULL == c->thr_table[0].thnd) {
		msyslog(LOG_ERR, "start blocking thread failed: %m");
		exit(-1);
	}
	/* note that thread priority is only relative to the process class */
	if (!SetThreadPriority(c->thr_table[0].thnd,
			       THREAD_PRIORITY_BELOW_NORMAL))
		msyslog(LOG_ERR, "Error lowering blocking thread priority: %m");

	resumed = ResumeThread(c->thr_table[0].thnd);
	DEBUG_INSIST(resumed);
	c->thread_ref = &c->thr_table[0];
}
#else	/* pthreads start_blocking_thread_internal() follows */
{
# ifdef NEED_PTHREAD_INIT
	static int	pthread_init_called;
# endif
	pthread_attr_t	thr_attr;
	int		rc;
	int		pipe_ends[2];	/* read then write */
	int		is_pipe;
	int		flags;
	size_t		ostacksize;
	size_t		nstacksize;
	sigset_t	saved_sig_mask;

	c->thread_ref = NULL;

# ifdef NEED_PTHREAD_INIT
	/*
	 * from lib/isc/unix/app.c:
	 * BSDI 3.1 seg faults in pthread_sigmask() if we don't do this.
	 */
	if (!pthread_init_called) {
		pthread_init();
		pthread_init_called = TRUE;
	}
# endif

	rc = pipe_socketpair(&pipe_ends[0], &is_pipe);
	if (0 != rc) {
		msyslog(LOG_ERR, "start_blocking_thread: pipe_socketpair() %m");
		exit(1);
	}
	c->resp_read_pipe = move_fd(pipe_ends[0]);
	c->resp_write_pipe = move_fd(pipe_ends[1]);
	c->ispipe = is_pipe;
	flags = fcntl(c->resp_read_pipe, F_GETFL, 0);
	if (-1 == flags) {
		msyslog(LOG_ERR, "start_blocking_thread: fcntl(F_GETFL) %m");
		exit(1);
	}
	rc = fcntl(c->resp_read_pipe, F_SETFL, O_NONBLOCK | flags);
	if (-1 == rc) {
		msyslog(LOG_ERR,
			"start_blocking_thread: fcntl(F_SETFL, O_NONBLOCK) %m");
		exit(1);
	}
	(*addremove_io_fd)(c->resp_read_pipe, c->ispipe, FALSE);
	pthread_attr_init(&thr_attr);
	pthread_attr_setdetachstate(&thr_attr, PTHREAD_CREATE_DETACHED);
#if defined(HAVE_PTHREAD_ATTR_GETSTACKSIZE) && \
    defined(HAVE_PTHREAD_ATTR_SETSTACKSIZE)
	rc = pthread_attr_getstacksize(&thr_attr, &ostacksize);
	if (0 != rc) {
		msyslog(LOG_ERR,
			"start_blocking_thread: pthread_attr_getstacksize() -> %s",
			strerror(rc));
	} else {
		if (ostacksize < THREAD_MINSTACKSIZE)
			nstacksize = THREAD_MINSTACKSIZE;
		else if (ostacksize > THREAD_MAXSTACKSIZE)
			nstacksize = THREAD_MAXSTACKSIZE;
		else
			nstacksize = ostacksize;
		if (nstacksize != ostacksize)
			rc = pthread_attr_setstacksize(&thr_attr, nstacksize);
		if (0 != rc)
			msyslog(LOG_ERR,
				"start_blocking_thread: pthread_attr_setstacksize(0x%lx -> 0x%lx) -> %s",
				(u_long)ostacksize, (u_long)nstacksize,
				strerror(rc));
	}
#else
	UNUSED_ARG(nstacksize);
	UNUSED_ARG(ostacksize);
#endif
#if defined(PTHREAD_SCOPE_SYSTEM) && defined(NEED_PTHREAD_SCOPE_SYSTEM)
	pthread_attr_setscope(&thr_attr, PTHREAD_SCOPE_SYSTEM);
#endif
	block_thread_signals(&saved_sig_mask);
	rc = pthread_create(&c->thr_table[0], &thr_attr,
			    &blocking_thread, c);
	pthread_sigmask(SIG_SETMASK, &saved_sig_mask, NULL);
	pthread_attr_destroy(&thr_attr);
	if (0 != rc) {
		msyslog(LOG_ERR, "start_blocking_thread: pthread_create() -> %s",
			strerror(rc));
		exit(1);
	}
	c->thread_ref = &c->thr_table[0];
}
#endif

/* --------------------------------------------------------------------
 * block_thread_signals()
 *
 * Temporarily block signals used by the ntpd main thread, so that the
 * signal mask inherited by child threads leaves them blocked.  Returns
 * the prior active signal mask via pmask, to be restored by the main
 * thread after pthread_create().
 */
#ifndef SYS_WINNT
void
block_thread_signals(
	sigset_t *	pmask
	)
{
	sigset_t	block;

	sigemptyset(&block);
# ifdef HAVE_SIGNALED_IO
#  ifdef SIGIO
	sigaddset(&block, SIGIO);
#  endif
#  ifdef SIGPOLL
	sigaddset(&block, SIGPOLL);
#  endif
# endif	/* HAVE_SIGNALED_IO */
	sigaddset(&block, SIGALRM);
	sigaddset(&block, MOREDEBUGSIG);
	sigaddset(&block, LESSDEBUGSIG);
# ifdef SIGDIE1
	sigaddset(&block, SIGDIE1);
# endif
# ifdef SIGDIE2
	sigaddset(&block, SIGDIE2);
# endif
# ifdef SIGDIE3
	sigaddset(&block, SIGDIE3);
# endif
# ifdef SIGDIE4
	sigaddset(&block, SIGDIE4);
# endif
# ifdef SIGBUS
	sigaddset(&block, SIGBUS);
# endif
	sigemptyset(pmask);
	pthread_sigmask(SIG_BLOCK, &block, pmask);
}
#endif	/* !SYS_WINNT */


/* --------------------------------------------------------------------
 * Create & destroy semaphores. This is sufficiently different between
 * POSIX and Windows to warrant wrapper functions, and close enough to
 * use the concept of synchronization via semaphore for all platforms.
 */
static sem_ref
create_sema(
	sema_type*	semptr,
	u_int		inival,
	u_int		maxval)
{
#ifdef SYS_WINNT

	long svini, svmax;
	if (NULL != semptr) {
		svini = (inival < LONG_MAX)
		    ? (long)inival : LONG_MAX;
		svmax = (maxval < LONG_MAX && maxval > 0)
		    ? (long)maxval : LONG_MAX;
		semptr->shnd = CreateSemaphore(NULL, svini, svmax, NULL);
		if (NULL == semptr->shnd)
			semptr = NULL;
	}

#else

	(void)maxval;
	if (semptr && sem_init(semptr, FALSE, inival))
		semptr = NULL;

#endif

	return semptr;
}

/* ------------------------------------------------------------------ */
static sem_ref
delete_sema(
	sem_ref obj)
{

#   ifdef SYS_WINNT

	if (obj) {
		if (obj->shnd)
			CloseHandle(obj->shnd);
		obj->shnd = NULL;
	}

#   else

	if (obj)
		sem_destroy(obj);

#   endif

	return NULL;
}

/* --------------------------------------------------------------------
 * prepare_child_sems()
 *
 * create sync & access semaphores
 *
 * All semaphores are cleared; only the access semaphore starts with one
 * unit. Child threads wait on 'workitems_pending', then grab
 * 'sema_access' and dequeue jobs. When done, 'sema_access' is given one
 * unit back.
 *
 * The producer grabs 'sema_access', manages the queue, restores
 * 'sema_access' and puts one unit into 'workitems_pending'.
 *
 * The story goes the same for the response queue.
 */
static void
prepare_child_sems(
	blocking_child *c
	)
{
	if (NULL == worker_memlock)
		worker_memlock = create_sema(&worker_mmutex, 1, 1);

	c->accesslock           = create_sema(&c->sem_table[0], 1, 1);
	c->workitems_pending    = create_sema(&c->sem_table[1], 0, 0);
	c->wake_scheduled_sleep = create_sema(&c->sem_table[2], 0, 1);
#   ifndef WORK_PIPE
	c->responses_pending    = create_sema(&c->sem_table[3], 0, 0);
#   endif
}
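
/* Protocol sketch, pairing the two sides described above (the real
 * loops are queue_req_pointer() and receive_blocking_req_internal()):
 *
 *	producer (parent)		consumer (child)
 *	wait_for_sem(accesslock)	wait_for_sem(workitems_pending)
 *	enqueue item			wait_for_sem(accesslock)
 *	tickle_sem(accesslock)		dequeue item
 *	tickle_sem(workitems_pending)	tickle_sem(accesslock)
 */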

/* --------------------------------------------------------------------
 * Wait for a semaphore. Where the wait can be interrupted, it resumes
 * internally -- when this function returns, there is either no
 * semaphore at all, a timeout occurred, or the caller successfully took
 * a token from the semaphore.
 *
 * For an untimed wait, not checking the result of this function at all
 * is definitely an option.
 */
static int
wait_for_sem(
	sem_ref			sem,
	struct timespec *	timeout		/* wall-clock */
	)
#ifdef SYS_WINNT
{
	struct timespec now;
	struct timespec delta;
	DWORD		msec;
	DWORD		rc;

	if (!(sem && sem->shnd)) {
		errno = EINVAL;
		return -1;
	}

	if (NULL == timeout) {
		msec = INFINITE;
	} else {
		getclock(TIMEOFDAY, &now);
		delta = sub_tspec(*timeout, now);
		if (delta.tv_sec < 0) {
			msec = 0;
		} else if ((delta.tv_sec + 1) >= (MAXDWORD / 1000)) {
			msec = INFINITE;
		} else {
			msec = 1000 * (DWORD)delta.tv_sec;
			msec += delta.tv_nsec / (1000 * 1000);
		}
	}
	rc = WaitForSingleObject(sem->shnd, msec);
	if (WAIT_OBJECT_0 == rc)
		return 0;
	if (WAIT_TIMEOUT == rc) {
		errno = ETIMEDOUT;
		return -1;
	}
	msyslog(LOG_ERR, "WaitForSingleObject unexpected 0x%x", rc);
	errno = EFAULT;
	return -1;
}
#else	/* pthreads wait_for_sem() follows */
{
	int rc = -1;

	if (sem) do {
			if (NULL == timeout)
				rc = sem_wait(sem);
			else
				rc = sem_timedwait(sem, timeout);
		} while (rc == -1 && errno == EINTR);
	else
		errno = EINVAL;

	return rc;
}
#endif

/* --------------------------------------------------------------------
 * blocking_thread - thread functions have WINAPI (aka 'stdcall')
 * calling conventions under Windows and a POSIX-defined signature
 * otherwise.
 */
#ifdef SYS_WINNT
u_int WINAPI
#else
void *
#endif
blocking_thread(
	void *	ThreadArg
	)
{
	blocking_child *c;

	c = ThreadArg;
	exit_worker(blocking_child_common(c));

	/* NOTREACHED */
	return 0;
}

/* --------------------------------------------------------------------
 * req_child_exit() runs in the parent.
 *
 * This function is called from the idle timer, too, and possibly
 * without a thread being there any longer. Since we have folded up our
 * tent in that case and all the semaphores are already gone, we simply
 * ignore the request in that case.
 *
 * Since the existence of the semaphores is controlled exclusively by
 * the parent, there's no risk of a data race here.
 */
int
req_child_exit(
	blocking_child *c
	)
{
	return (c->accesslock)
	    ? queue_req_pointer(c, CHILD_EXIT_REQ)
	    : 0;
}
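
/* Shutdown handshake sketch: CHILD_EXIT_REQ and CHILD_GONE_RESP are
 * sentinel pointer values, never dereferenced. The exchange runs
 * roughly as follows:
 *
 *	parent: queue_req_pointer(c, CHILD_EXIT_REQ);
 *	child:  receive_blocking_req_internal() recognizes the sentinel,
 *	        sends CHILD_GONE_RESP and returns NULL, ending its loop;
 *	parent: receive_blocking_resp_internal() sees CHILD_GONE_RESP
 *	        and calls cleanup_after_child() below.
 */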

/* --------------------------------------------------------------------
 * cleanup_after_child() runs in the parent.
 */
static void
cleanup_after_child(
	blocking_child *	c
	)
{
	DEBUG_INSIST(!c->reusable);

#   ifdef SYS_WINNT
	/* The thread was not created in detached state, so we had
	 * better clean up.
	 */
	if (c->thread_ref && c->thread_ref->thnd) {
		WaitForSingleObject(c->thread_ref->thnd, INFINITE);
		INSIST(CloseHandle(c->thread_ref->thnd));
		c->thread_ref->thnd = NULL;
	}
#   endif
	c->thread_ref = NULL;

	/* remove semaphores and (if signalling via I/O) pipes */

	c->accesslock           = delete_sema(c->accesslock);
	c->workitems_pending    = delete_sema(c->workitems_pending);
	c->wake_scheduled_sleep = delete_sema(c->wake_scheduled_sleep);

#   ifdef WORK_PIPE
	DEBUG_INSIST(-1 != c->resp_read_pipe);
	DEBUG_INSIST(-1 != c->resp_write_pipe);
	(*addremove_io_fd)(c->resp_read_pipe, c->ispipe, TRUE);
	close(c->resp_write_pipe);
	close(c->resp_read_pipe);
	c->resp_write_pipe = -1;
	c->resp_read_pipe = -1;
#   else
	DEBUG_INSIST(NULL != c->responses_pending);
	(*addremove_io_semaphore)(c->responses_pending->shnd, TRUE);
	c->responses_pending = delete_sema(c->responses_pending);
#   endif

	/* Is it necessary to check whether there are pending requests
	 * and responses? If so, and if there are, what should be done
	 * with them?
	 */

	/* re-init buffer index sequencers */
	c->head_workitem = 0;
	c->tail_workitem = 0;
	c->head_response = 0;
	c->tail_response = 0;

	c->reusable = TRUE;
}


#else	/* !WORK_THREAD follows */
char work_thread_nonempty_compilation_unit;
#endif