/*
 * work_thread.c - threads implementation for blocking worker child.
 */
#include <config.h>
#include "ntp_workimpl.h"

#ifdef WORK_THREAD

#include <stdio.h>
#include <ctype.h>
#include <signal.h>
#ifndef SYS_WINNT
#include <pthread.h>
#endif

#include "ntp_stdlib.h"
#include "ntp_malloc.h"
#include "ntp_syslog.h"
#include "ntpd.h"
#include "ntp_io.h"
#include "ntp_assert.h"
#include "ntp_unixtime.h"
#include "timespecops.h"
#include "ntp_worker.h"

#define CHILD_EXIT_REQ	((blocking_pipe_header *)(intptr_t)-1)
#define CHILD_GONE_RESP	CHILD_EXIT_REQ
/* Queue size increments:
 * The request queue grows a bit faster than the response queue -- the
 * daemon can push requests and pull results faster on average than the
 * worker can process requests and push results...  Whether this really
 * pays off is debatable.
 */
#define WORKITEMS_ALLOC_INC	16
#define RESPONSES_ALLOC_INC	4
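
/* A minimal sketch (illustrative only, kept out of the build) of the
 * ring-counter scheme both queues below rely on: 'head' and 'tail' are
 * free-running counters, only their modulo of the allocation size is
 * used as an index, so 'head - tail' is always the number of occupied
 * slots.
 */
#if 0
static void
ring_counter_demo(void)
{
	size_t	head = 0, tail = 0, alloc = WORKITEMS_ALLOC_INC;
	void *	slots[WORKITEMS_ALLOC_INC] = { NULL };

	slots[head % alloc] = "job";	/* enqueue... */
	head++;				/* ...head - tail == 1 */
	slots[tail % alloc] = NULL;	/* dequeue... */
	tail++;				/* ...head - tail == 0 again */
}
#endif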

/* Fiddle with min/max stack sizes. 64kB minimum seems to work, so we
 * set the maximum to 256kB. If the minimum goes below the
 * system-defined minimum stack size, we have to adjust accordingly.
 */
#ifndef THREAD_MINSTACKSIZE
# define THREAD_MINSTACKSIZE	(64U * 1024)
#endif
#ifndef __sun
#if defined(PTHREAD_STACK_MIN) && THREAD_MINSTACKSIZE < PTHREAD_STACK_MIN
# undef THREAD_MINSTACKSIZE
# define THREAD_MINSTACKSIZE PTHREAD_STACK_MIN
#endif
#endif

#ifndef THREAD_MAXSTACKSIZE
# define THREAD_MAXSTACKSIZE	(256U * 1024)
#endif
#if THREAD_MAXSTACKSIZE < THREAD_MINSTACKSIZE
# undef  THREAD_MAXSTACKSIZE
# define THREAD_MAXSTACKSIZE THREAD_MINSTACKSIZE
#endif

/* need a good integer to store a pointer... */
#ifndef UINTPTR_T
# if defined(UINTPTR_MAX)
#  define UINTPTR_T uintptr_t
# elif defined(UINT_PTR)
#  define UINTPTR_T UINT_PTR
# else
#  define UINTPTR_T size_t
# endif
#endif


#ifdef SYS_WINNT

# define thread_exit(c)	_endthreadex(c)
# define tickle_sem(sh) ReleaseSemaphore((sh->shnd), 1, NULL)
u_int	WINAPI	blocking_thread(void *);
static BOOL	same_os_sema(const sem_ref obj, void * osobj);

#else

# define thread_exit(c)	pthread_exit((void*)(UINTPTR_T)(c))
# define tickle_sem	sem_post
void *		blocking_thread(void *);
static	void	block_thread_signals(sigset_t *);

#endif

#ifdef WORK_PIPE
addremove_io_fd_func		addremove_io_fd;
#else
addremove_io_semaphore_func	addremove_io_semaphore;
#endif

static	void	start_blocking_thread(blocking_child *);
static	void	start_blocking_thread_internal(blocking_child *);
static	void	prepare_child_sems(blocking_child *);
static	int	wait_for_sem(sem_ref, struct timespec *);
static	int	ensure_workitems_empty_slot(blocking_child *);
static	int	ensure_workresp_empty_slot(blocking_child *);
static	int	queue_req_pointer(blocking_child *, blocking_pipe_header *);
static	void	cleanup_after_child(blocking_child *);

static sema_type worker_mmutex;
static sem_ref   worker_memlock;

/* --------------------------------------------------------------------
 * locking the global worker state table (and other global stuff)
 */
void
worker_global_lock(
	int inOrOut)
{
	if (worker_memlock) {
		if (inOrOut)
			wait_for_sem(worker_memlock, NULL);
		else
			tickle_sem(worker_memlock);
	}
}
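
/* Typical usage sketch (illustrative only): a nonzero argument takes
 * the lock (waiting if necessary), zero releases it, so callers
 * bracket access to shared worker state like this:
 */
#if 0
static void
example_global_update(void)
{
	worker_global_lock(TRUE);	/* take the lock */
	/* ...touch the global worker state table here... */
	worker_global_lock(FALSE);	/* give the lock back */
}
#endif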

/* --------------------------------------------------------------------
 * implementation isolation wrapper
 */
void
exit_worker(
	int	exitcode
	)
{
	thread_exit(exitcode);	/* see #define thread_exit */
}

/* --------------------------------------------------------------------
 * sleep for a given time or until the wakeup semaphore is tickled.
 */
int
worker_sleep(
	blocking_child *	c,
	time_t			seconds
	)
{
	struct timespec	until;
	int		rc;

# ifdef HAVE_CLOCK_GETTIME
	if (0 != clock_gettime(CLOCK_REALTIME, &until)) {
		msyslog(LOG_ERR, "worker_sleep: clock_gettime() failed: %m");
		return -1;
	}
# else
	if (0 != getclock(TIMEOFDAY, &until)) {
		msyslog(LOG_ERR, "worker_sleep: getclock() failed: %m");
		return -1;
	}
# endif
	until.tv_sec += seconds;
	rc = wait_for_sem(c->wake_scheduled_sleep, &until);
	if (0 == rc)
		return -1;
	if (-1 == rc && ETIMEDOUT == errno)
		return 0;
	msyslog(LOG_ERR, "worker_sleep: sem_timedwait: %m");
	return -1;
}
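
/* A hedged usage sketch (not compiled): the return convention is 0 for
 * "the full time elapsed" and -1 for "woken early or error", so an
 * idle loop distinguishes a timeout from a wake-up like this. The
 * 30-second nap is an arbitrary example value.
 */
#if 0
static void
example_idle_nap(blocking_child *c)
{
	if (0 == worker_sleep(c, 30)) {
		/* slept the full 30s: idle timeout reached */
	} else {
		/* tickled via interrupt_worker_sleep(), or an error */
	}
}
#endif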


/* --------------------------------------------------------------------
 * Wake up any worker that is taking a nap.
 */
void
interrupt_worker_sleep(void)
{
	u_int			idx;
	blocking_child *	c;

	for (idx = 0; idx < blocking_children_alloc; idx++) {
		c = blocking_children[idx];
		if (NULL == c || NULL == c->wake_scheduled_sleep)
			continue;
		tickle_sem(c->wake_scheduled_sleep);
	}
}
/* --------------------------------------------------------------------
 * Make sure there is an empty slot at the head of the request
 * queue. Tell if the queue is currently empty.
 */
static int
ensure_workitems_empty_slot(
	blocking_child *c
	)
{
	/*
	** !!! PRECONDITION: caller holds access lock!
	**
	** This simply tries to increase the size of the buffer if it
	** becomes full. The resize operation does *not* maintain the
	** order of requests, but that should be irrelevant since the
	** processing is considered asynchronous anyway.
	**
	** Returns whether the buffer is currently empty.
	*/

	static const size_t each =
	    sizeof(blocking_children[0]->workitems[0]);

	size_t	new_alloc;
	size_t  slots_used;
	size_t	sidx;

	slots_used = c->head_workitem - c->tail_workitem;
	if (slots_used >= c->workitems_alloc) {
		new_alloc  = c->workitems_alloc + WORKITEMS_ALLOC_INC;
		c->workitems = erealloc(c->workitems, new_alloc * each);
		for (sidx = c->workitems_alloc; sidx < new_alloc; ++sidx)
		    c->workitems[sidx] = NULL;
		c->tail_workitem   = 0;
		c->head_workitem   = c->workitems_alloc;
		c->workitems_alloc = new_alloc;
	}
	INSIST(NULL == c->workitems[c->head_workitem % c->workitems_alloc]);
	return (0 == slots_used);
}
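
/* Worked example (illustrative only) of the resize above, with
 * WORKITEMS_ALLOC_INC == 16: if alloc == 16, head == 20 and tail == 4
 * (all 16 slots used), the buffer grows to 32 slots and the counters
 * are rebased to tail == 0, head == 16 -- the same number of occupied
 * slots, though their order may change, as the comment above notes.
 */
#if 0
static void
resize_rebase_demo(size_t *head, size_t *tail, size_t *alloc)
{
	size_t used = *head - *tail;	/* 20 - 4 == 16 */

	*tail   = 0;
	*head   = *alloc;		/* old allocation: 16 */
	*alloc += WORKITEMS_ALLOC_INC;	/* 32 */
	/* used == *head - *tail still holds: 16 == 16 - 0 */
}
#endif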

/* --------------------------------------------------------------------
 * Make sure there is an empty slot at the head of the response
 * queue. Tell if the queue is currently empty.
 */
static int
ensure_workresp_empty_slot(
	blocking_child *c
	)
{
	/*
	** !!! PRECONDITION: caller holds access lock!
	**
	** Works like the companion function above.
	*/

	static const size_t each =
	    sizeof(blocking_children[0]->responses[0]);

	size_t	new_alloc;
	size_t  slots_used;
	size_t	sidx;

	slots_used = c->head_response - c->tail_response;
	if (slots_used >= c->responses_alloc) {
		new_alloc  = c->responses_alloc + RESPONSES_ALLOC_INC;
		c->responses = erealloc(c->responses, new_alloc * each);
		for (sidx = c->responses_alloc; sidx < new_alloc; ++sidx)
		    c->responses[sidx] = NULL;
		c->tail_response   = 0;
		c->head_response   = c->responses_alloc;
		c->responses_alloc = new_alloc;
	}
	INSIST(NULL == c->responses[c->head_response % c->responses_alloc]);
	return (0 == slots_used);
}


/* --------------------------------------------------------------------
 * queue_req_pointer() - append a work item or idle exit request to
 *			 blocking_workitems[]. Employ proper locking.
 */
static int
queue_req_pointer(
	blocking_child	*	c,
	blocking_pipe_header *	hdr
	)
{
	size_t qhead;

	/* >>>> ACCESS LOCKING STARTS >>>> */
	wait_for_sem(c->accesslock, NULL);
	ensure_workitems_empty_slot(c);
	qhead = c->head_workitem;
	c->workitems[qhead % c->workitems_alloc] = hdr;
	c->head_workitem = 1 + qhead;
	tickle_sem(c->accesslock);
	/* <<<< ACCESS LOCKING ENDS <<<< */

	/* queue consumer wake-up notification */
	tickle_sem(c->workitems_pending);

	return 0;
}

/* --------------------------------------------------------------------
 * API function to make sure a worker is running, a proper private copy
 * of the data is made, the data is entered into the queue and the
 * worker is signalled.
 */
int
send_blocking_req_internal(
	blocking_child *	c,
	blocking_pipe_header *	hdr,
	void *			data
	)
{
	blocking_pipe_header *	threadcopy;
	size_t			payload_octets;

	REQUIRE(hdr != NULL);
	REQUIRE(data != NULL);
	DEBUG_REQUIRE(BLOCKING_REQ_MAGIC == hdr->magic_sig);

	if (hdr->octets <= sizeof(*hdr))
		return 1;	/* failure */
	payload_octets = hdr->octets - sizeof(*hdr);

	if (NULL == c->thread_ref)
		start_blocking_thread(c);
	threadcopy = emalloc(hdr->octets);
	memcpy(threadcopy, hdr, sizeof(*hdr));
	memcpy((char *)threadcopy + sizeof(*hdr), data, payload_octets);

	return queue_req_pointer(c, threadcopy);
}
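
/* A minimal caller-side sketch (not compiled): only 'octets' and
 * 'magic_sig' are interpreted here -- any further header fields and
 * the payload layout belong to the request type and are assumptions
 * in this example.
 */
#if 0
static int
example_send_request(blocking_child *c, void *payload, size_t paylen)
{
	blocking_pipe_header	hdr;

	memset(&hdr, 0, sizeof(hdr));
	hdr.octets    = sizeof(hdr) + paylen;	/* header + payload */
	hdr.magic_sig = BLOCKING_REQ_MAGIC;
	/* ...request-type specific header fields would be set here... */
	return send_blocking_req_internal(c, &hdr, payload);
}
#endif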

/* --------------------------------------------------------------------
 * Wait for the 'incoming queue no longer empty' signal, lock the shared
 * structure and dequeue an item.
 */
blocking_pipe_header *
receive_blocking_req_internal(
	blocking_child *	c
	)
{
	blocking_pipe_header *	req;
	size_t			qhead, qtail;

	req = NULL;
	do {
		/* wait for tickle from the producer side */
		wait_for_sem(c->workitems_pending, NULL);

		/* >>>> ACCESS LOCKING STARTS >>>> */
		wait_for_sem(c->accesslock, NULL);
		qhead = c->head_workitem;
		do {
			qtail = c->tail_workitem;
			if (qhead == qtail)
				break;
			c->tail_workitem = qtail + 1;
			qtail %= c->workitems_alloc;
			req = c->workitems[qtail];
			c->workitems[qtail] = NULL;
		} while (NULL == req);
		tickle_sem(c->accesslock);
		/* <<<< ACCESS LOCKING ENDS <<<< */

	} while (NULL == req);

	INSIST(NULL != req);
	if (CHILD_EXIT_REQ == req) {	/* idled out */
		send_blocking_resp_internal(c, CHILD_GONE_RESP);
		req = NULL;
	}

	return req;
}

/* --------------------------------------------------------------------
 * Push a response into the return queue and, if the queue was empty
 * before, tickle the receiver.
 */
int
send_blocking_resp_internal(
	blocking_child *	c,
	blocking_pipe_header *	resp
	)
{
	size_t	qhead;
	int	empty;

	/* >>>> ACCESS LOCKING STARTS >>>> */
	wait_for_sem(c->accesslock, NULL);
	empty = ensure_workresp_empty_slot(c);
	qhead = c->head_response;
	c->responses[qhead % c->responses_alloc] = resp;
	c->head_response = 1 + qhead;
	tickle_sem(c->accesslock);
	/* <<<< ACCESS LOCKING ENDS <<<< */

	/* queue consumer wake-up notification */
	if (empty)
	{
#	    ifdef WORK_PIPE
		if (1 != write(c->resp_write_pipe, "", 1))
			msyslog(LOG_WARNING, "async resolver: %s",
				"failed to notify main thread!");
#	    else
		tickle_sem(c->responses_pending);
#	    endif
	}
	return 0;
}


#ifndef WORK_PIPE

/* --------------------------------------------------------------------
 * Check if a (Windows) handle to a semaphore is actually the same one
 * we are using inside the sema wrapper.
 */
static BOOL
same_os_sema(
	const sem_ref	obj,
	void*		osh
	)
{
	return obj && osh && (obj->shnd == (HANDLE)osh);
}

/* --------------------------------------------------------------------
 * Find the shared context that associates to an OS handle and make sure
 * the data is dequeued and processed.
 */
void
handle_blocking_resp_sem(
	void *	context
	)
{
	blocking_child *	c;
	u_int			idx;

	c = NULL;
	for (idx = 0; idx < blocking_children_alloc; idx++) {
		c = blocking_children[idx];
		if (c != NULL &&
			c->thread_ref != NULL &&
			same_os_sema(c->responses_pending, context))
			break;
	}
	if (idx < blocking_children_alloc)
		process_blocking_resp(c);
}
#endif	/* !WORK_PIPE */

/* --------------------------------------------------------------------
 * Fetch the next response from the return queue. In case of signalling
 * via pipe, make sure the pipe is flushed, too.
 */
blocking_pipe_header *
receive_blocking_resp_internal(
	blocking_child *	c
	)
{
	blocking_pipe_header *	removed;
	size_t			qhead, qtail, slot;

#ifdef WORK_PIPE
	int			rc;
	char			scratch[32];

	do
		rc = read(c->resp_read_pipe, scratch, sizeof(scratch));
	while (-1 == rc && EINTR == errno);
#endif

	/* >>>> ACCESS LOCKING STARTS >>>> */
	wait_for_sem(c->accesslock, NULL);
	qhead = c->head_response;
	qtail = c->tail_response;
	for (removed = NULL; !removed && (qhead != qtail); ++qtail) {
		slot = qtail % c->responses_alloc;
		removed = c->responses[slot];
		c->responses[slot] = NULL;
	}
	c->tail_response = qtail;
	tickle_sem(c->accesslock);
	/* <<<< ACCESS LOCKING ENDS <<<< */

	if (NULL != removed) {
		DEBUG_ENSURE(CHILD_GONE_RESP == removed ||
			     BLOCKING_RESP_MAGIC == removed->magic_sig);
	}
	if (CHILD_GONE_RESP == removed) {
		cleanup_after_child(c);
		removed = NULL;
	}

	return removed;
}
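
/* Main-thread consumption sketch (illustrative only): once the pipe or
 * semaphore signals, responses are drained in a loop; NULL means the
 * queue is empty (or the child announced its exit and was cleaned up).
 * The real wiring runs through addremove_io_fd() respectively
 * handle_blocking_resp_sem() and process_blocking_resp().
 */
#if 0
static void
example_drain_responses(blocking_child *c)
{
	blocking_pipe_header *	resp;

	while (NULL != (resp = receive_blocking_resp_internal(c))) {
		/* ...dispatch on the response; ownership of 'resp'
		 * passes to the dispatcher here... */
	}
}
#endif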

/* --------------------------------------------------------------------
 * Light up a new worker.
 */
static void
start_blocking_thread(
	blocking_child *	c
	)
{

	DEBUG_INSIST(!c->reusable);

	prepare_child_sems(c);
	start_blocking_thread_internal(c);
}

/* --------------------------------------------------------------------
 * Create a worker thread. There are several differences between POSIX
 * and Windows, of course -- most notably the Windows thread is not
 * detached, and we keep the handle around until we want to get rid of
 * the thread. The notification scheme also differs: Windows makes use
 * of semaphores in both directions, POSIX uses a pipe for integration
 * with 'select()' or alike.
 */
static void
start_blocking_thread_internal(
	blocking_child *	c
	)
#ifdef SYS_WINNT
{
	BOOL	resumed;

	c->thread_ref = NULL;
	(*addremove_io_semaphore)(c->responses_pending->shnd, FALSE);
	c->thr_table[0].thnd =
		(HANDLE)_beginthreadex(
			NULL,
			0,
			&blocking_thread,
			c,
			CREATE_SUSPENDED,
			NULL);

	if (NULL == c->thr_table[0].thnd) {
		msyslog(LOG_ERR, "start blocking thread failed: %m");
		exit(-1);
	}
	/* remember the thread priority is only within the process class */
	if (!SetThreadPriority(c->thr_table[0].thnd,
			       THREAD_PRIORITY_BELOW_NORMAL))
		msyslog(LOG_ERR, "Error lowering blocking thread priority: %m");

	resumed = ResumeThread(c->thr_table[0].thnd);
	DEBUG_INSIST(resumed);
	c->thread_ref = &c->thr_table[0];
}
#else	/* pthreads start_blocking_thread_internal() follows */
{
# ifdef NEED_PTHREAD_INIT
	static int	pthread_init_called;
# endif
	pthread_attr_t	thr_attr;
	int		rc;
	int		pipe_ends[2];	/* read then write */
	int		is_pipe;
	int		flags;
	size_t		ostacksize;
	size_t		nstacksize;
	sigset_t	saved_sig_mask;

	c->thread_ref = NULL;

# ifdef NEED_PTHREAD_INIT
	/*
	 * from lib/isc/unix/app.c:
	 * BSDI 3.1 seg faults in pthread_sigmask() if we don't do this.
	 */
	if (!pthread_init_called) {
		pthread_init();
		pthread_init_called = TRUE;
	}
# endif

	rc = pipe_socketpair(&pipe_ends[0], &is_pipe);
	if (0 != rc) {
		msyslog(LOG_ERR, "start_blocking_thread: pipe_socketpair() %m");
		exit(1);
	}
	c->resp_read_pipe = move_fd(pipe_ends[0]);
	c->resp_write_pipe = move_fd(pipe_ends[1]);
	c->ispipe = is_pipe;
	flags = fcntl(c->resp_read_pipe, F_GETFL, 0);
	if (-1 == flags) {
		msyslog(LOG_ERR, "start_blocking_thread: fcntl(F_GETFL) %m");
		exit(1);
	}
	rc = fcntl(c->resp_read_pipe, F_SETFL, O_NONBLOCK | flags);
	if (-1 == rc) {
		msyslog(LOG_ERR,
			"start_blocking_thread: fcntl(F_SETFL, O_NONBLOCK) %m");
		exit(1);
	}
	(*addremove_io_fd)(c->resp_read_pipe, c->ispipe, FALSE);
	pthread_attr_init(&thr_attr);
	pthread_attr_setdetachstate(&thr_attr, PTHREAD_CREATE_DETACHED);
#if defined(HAVE_PTHREAD_ATTR_GETSTACKSIZE) && \
    defined(HAVE_PTHREAD_ATTR_SETSTACKSIZE)
	rc = pthread_attr_getstacksize(&thr_attr, &ostacksize);
	if (0 != rc) {
		msyslog(LOG_ERR,
			"start_blocking_thread: pthread_attr_getstacksize() -> %s",
			strerror(rc));
	} else {
		if (ostacksize < THREAD_MINSTACKSIZE)
			nstacksize = THREAD_MINSTACKSIZE;
		else if (ostacksize > THREAD_MAXSTACKSIZE)
			nstacksize = THREAD_MAXSTACKSIZE;
		else
			nstacksize = ostacksize;
		if (nstacksize != ostacksize)
			rc = pthread_attr_setstacksize(&thr_attr, nstacksize);
		if (0 != rc)
			msyslog(LOG_ERR,
				"start_blocking_thread: pthread_attr_setstacksize(0x%lx -> 0x%lx) -> %s",
				(u_long)ostacksize, (u_long)nstacksize,
				strerror(rc));
	}
#else
	UNUSED_ARG(nstacksize);
	UNUSED_ARG(ostacksize);
#endif
#if defined(PTHREAD_SCOPE_SYSTEM) && defined(NEED_PTHREAD_SCOPE_SYSTEM)
	pthread_attr_setscope(&thr_attr, PTHREAD_SCOPE_SYSTEM);
#endif
	block_thread_signals(&saved_sig_mask);
	rc = pthread_create(&c->thr_table[0], &thr_attr,
			    &blocking_thread, c);
	pthread_sigmask(SIG_SETMASK, &saved_sig_mask, NULL);
	pthread_attr_destroy(&thr_attr);
	if (0 != rc) {
		msyslog(LOG_ERR, "start_blocking_thread: pthread_create() -> %s",
			strerror(rc));
		exit(1);
	}
	c->thread_ref = &c->thr_table[0];
}
#endif

/* --------------------------------------------------------------------
 * block_thread_signals()
 *
 * Temporarily block signals used by ntpd main thread, so that signal
 * mask inherited by child threads leaves them blocked.  Returns prior
 * active signal mask via pmask, to be restored by the main thread
 * after pthread_create().
 */
#ifndef SYS_WINNT
void
block_thread_signals(
	sigset_t *	pmask
	)
{
	sigset_t	block;

	sigemptyset(&block);
# ifdef HAVE_SIGNALED_IO
#  ifdef SIGIO
	sigaddset(&block, SIGIO);
#  endif
#  ifdef SIGPOLL
	sigaddset(&block, SIGPOLL);
#  endif
# endif	/* HAVE_SIGNALED_IO */
	sigaddset(&block, SIGALRM);
	sigaddset(&block, MOREDEBUGSIG);
	sigaddset(&block, LESSDEBUGSIG);
# ifdef SIGDIE1
	sigaddset(&block, SIGDIE1);
# endif
# ifdef SIGDIE2
	sigaddset(&block, SIGDIE2);
# endif
# ifdef SIGDIE3
	sigaddset(&block, SIGDIE3);
# endif
# ifdef SIGDIE4
	sigaddset(&block, SIGDIE4);
# endif
# ifdef SIGBUS
	sigaddset(&block, SIGBUS);
# endif
	sigemptyset(pmask);
	pthread_sigmask(SIG_BLOCK, &block, pmask);
}
#endif	/* !SYS_WINNT */


/* --------------------------------------------------------------------
 * Create & destroy semaphores. This is sufficiently different between
 * POSIX and Windows to warrant wrapper functions and close enough to
 * use the concept of synchronization via semaphore for all platforms.
 */
static sem_ref
create_sema(
	sema_type*	semptr,
	u_int		inival,
	u_int		maxval)
{
#ifdef SYS_WINNT

	long svini, svmax;
	if (NULL != semptr) {
		svini = (inival < LONG_MAX)
		    ? (long)inival : LONG_MAX;
		svmax = (maxval < LONG_MAX && maxval > 0)
		    ? (long)maxval : LONG_MAX;
		semptr->shnd = CreateSemaphore(NULL, svini, svmax, NULL);
		if (NULL == semptr->shnd)
			semptr = NULL;
	}

#else

	(void)maxval;
	if (semptr && sem_init(semptr, FALSE, inival))
		semptr = NULL;

#endif

	return semptr;
}

/* ------------------------------------------------------------------ */
static sem_ref
delete_sema(
	sem_ref obj)
{

#   ifdef SYS_WINNT

	if (obj) {
		if (obj->shnd)
			CloseHandle(obj->shnd);
		obj->shnd = NULL;
	}

#   else

	if (obj)
		sem_destroy(obj);

#   endif

	return NULL;
}
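
/* Lifecycle sketch (not compiled): create_sema() returns its argument
 * on success and NULL on failure; delete_sema() always returns NULL so
 * the reference can be cleared in the same statement, as done in
 * cleanup_after_child() below.
 */
#if 0
static void
example_sema_lifecycle(void)
{
	static sema_type	storage;
	sem_ref			ref;

	ref = create_sema(&storage, 0, 1);	/* binary, initially empty */
	if (NULL != ref) {
		tickle_sem(ref);		/* post one unit */
		wait_for_sem(ref, NULL);	/* take it back */
		ref = delete_sema(ref);		/* ref is NULL again */
	}
}
#endif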

/* --------------------------------------------------------------------
 * prepare_child_sems()
 *
 * create sync & access semaphores
 *
 * All semaphores are cleared, only the access semaphore has 1 unit.
 * Children wait on 'workitems_pending', then grab 'sema_access'
 * and dequeue jobs. When done, 'sema_access' is given one unit back.
 *
 * The producer grabs 'sema_access', manages the queue, restores
 * 'sema_access' and puts one unit into 'workitems_pending'.
 *
 * The same story applies to the response queue.
 */
static void
prepare_child_sems(
	blocking_child *c
	)
{
	if (NULL == worker_memlock)
		worker_memlock = create_sema(&worker_mmutex, 1, 1);

	c->accesslock           = create_sema(&c->sem_table[0], 1, 1);
	c->workitems_pending    = create_sema(&c->sem_table[1], 0, 0);
	c->wake_scheduled_sleep = create_sema(&c->sem_table[2], 0, 1);
#   ifndef WORK_PIPE
	c->responses_pending    = create_sema(&c->sem_table[3], 0, 0);
#   endif
}
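
/* Condensed producer-side handshake sketch (illustrative only),
 * mirroring queue_req_pointer() above and the comment before
 * prepare_child_sems(): the queue is touched only while holding
 * 'accesslock', and one unit is posted to 'workitems_pending'
 * afterwards, which is exactly what a waiting consumer in
 * receive_blocking_req_internal() blocks on.
 */
#if 0
static void
example_handshake(blocking_child *c, blocking_pipe_header *job)
{
	wait_for_sem(c->accesslock, NULL);	/* enter critical section */
	/* ...enqueue 'job' here... */
	tickle_sem(c->accesslock);		/* leave critical section */
	tickle_sem(c->workitems_pending);	/* wake one consumer */
}
#endif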

/* --------------------------------------------------------------------
 * Wait for a semaphore. Where the wait can be interrupted, it will
 * internally resume -- when this function returns, either there was no
 * semaphore at all, a timeout occurred, or the caller successfully
 * took a token from the semaphore.
 *
 * For an untimed wait, not checking the result of this function at all
 * is definitely an option.
 */
static int
wait_for_sem(
	sem_ref			sem,
	struct timespec *	timeout		/* wall-clock */
	)
#ifdef SYS_WINNT
{
	struct timespec now;
	struct timespec delta;
	DWORD		msec;
	DWORD		rc;

	if (!(sem && sem->shnd)) {
		errno = EINVAL;
		return -1;
	}

	if (NULL == timeout) {
		msec = INFINITE;
	} else {
		getclock(TIMEOFDAY, &now);
		delta = sub_tspec(*timeout, now);
		if (delta.tv_sec < 0) {
			msec = 0;
		} else if ((delta.tv_sec + 1) >= (MAXDWORD / 1000)) {
			msec = INFINITE;
		} else {
			msec = 1000 * (DWORD)delta.tv_sec;
			msec += delta.tv_nsec / (1000 * 1000);
		}
	}
	rc = WaitForSingleObject(sem->shnd, msec);
	if (WAIT_OBJECT_0 == rc)
		return 0;
	if (WAIT_TIMEOUT == rc) {
		errno = ETIMEDOUT;
		return -1;
	}
	msyslog(LOG_ERR, "WaitForSingleObject unexpected 0x%x", rc);
	errno = EFAULT;
	return -1;
}
#else	/* pthreads wait_for_sem() follows */
{
	int rc = -1;

	if (sem) do {
			if (NULL == timeout)
				rc = sem_wait(sem);
			else
				rc = sem_timedwait(sem, timeout);
		} while (rc == -1 && errno == EINTR);
	else
		errno = EINVAL;

	return rc;
}
#endif
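
/* Deadline sketch (not compiled): the timeout argument is an absolute
 * wall-clock time, matching sem_timedwait() semantics, so a relative
 * wait is built by adding to "now", exactly as worker_sleep() does
 * above. HAVE_CLOCK_GETTIME is assumed here for brevity.
 */
#if 0
static int
example_timed_take(sem_ref sem, time_t seconds)
{
	struct timespec	until;

	if (0 != clock_gettime(CLOCK_REALTIME, &until))
		return -1;
	until.tv_sec += seconds;
	return wait_for_sem(sem, &until);	/* 0, or -1 with errno set */
}
#endif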

/* --------------------------------------------------------------------
 * blocking_thread - thread functions have WINAPI (aka 'stdcall')
 * calling conventions under Windows and POSIX-defined signature
 * otherwise.
 */
#ifdef SYS_WINNT
u_int WINAPI
#else
void *
#endif
blocking_thread(
	void *	ThreadArg
	)
{
	blocking_child *c;

	c = ThreadArg;
	exit_worker(blocking_child_common(c));

	/* NOTREACHED */
	return 0;
}

/* --------------------------------------------------------------------
 * req_child_exit() runs in the parent.
 *
 * This function is called from the idle timer, too, and possibly
 * without a thread being there any longer. Since we have folded up our
 * tent in that case and all the semaphores are already gone, we simply
 * ignore this request in this case.
 *
 * Since the existence of the semaphores is controlled exclusively by
 * the parent, there's no risk of a data race here.
 */
int
req_child_exit(
	blocking_child *c
	)
{
	return (c->accesslock)
	    ? queue_req_pointer(c, CHILD_EXIT_REQ)
	    : 0;
}

/* --------------------------------------------------------------------
 * cleanup_after_child() runs in parent.
 */
static void
cleanup_after_child(
	blocking_child *	c
	)
{
	DEBUG_INSIST(!c->reusable);

#   ifdef SYS_WINNT
	/* The thread was not created in detached state, so we had
	 * better clean up.
	 */
	if (c->thread_ref && c->thread_ref->thnd) {
		WaitForSingleObject(c->thread_ref->thnd, INFINITE);
		INSIST(CloseHandle(c->thread_ref->thnd));
		c->thread_ref->thnd = NULL;
	}
#   endif
	c->thread_ref = NULL;

	/* remove semaphores and (if signalling via I/O) pipes */

	c->accesslock           = delete_sema(c->accesslock);
	c->workitems_pending    = delete_sema(c->workitems_pending);
	c->wake_scheduled_sleep = delete_sema(c->wake_scheduled_sleep);

#   ifdef WORK_PIPE
	DEBUG_INSIST(-1 != c->resp_read_pipe);
	DEBUG_INSIST(-1 != c->resp_write_pipe);
	(*addremove_io_fd)(c->resp_read_pipe, c->ispipe, TRUE);
	close(c->resp_write_pipe);
	close(c->resp_read_pipe);
	c->resp_write_pipe = -1;
	c->resp_read_pipe = -1;
#   else
	DEBUG_INSIST(NULL != c->responses_pending);
	(*addremove_io_semaphore)(c->responses_pending->shnd, TRUE);
	c->responses_pending = delete_sema(c->responses_pending);
#   endif

	/* Is it necessary to check if there are pending requests and
	 * responses? If so, and if there are, what to do with them?
	 */

	/* re-init buffer index sequencers */
	c->head_workitem = 0;
	c->tail_workitem = 0;
	c->head_response = 0;
	c->tail_response = 0;

	c->reusable = TRUE;
}


#else	/* !WORK_THREAD follows */
char work_thread_nonempty_compilation_unit;
#endif