work_thread.c revision 275970
1275970Scy/*
2275970Scy * work_thread.c - threads implementation for blocking worker child.
3275970Scy */
4275970Scy#include <config.h>
5275970Scy#include "ntp_workimpl.h"
6275970Scy
7275970Scy#ifdef WORK_THREAD
8275970Scy
9275970Scy#include <stdio.h>
10275970Scy#include <ctype.h>
11275970Scy#include <signal.h>
12275970Scy#ifndef SYS_WINNT
13275970Scy#include <pthread.h>
14275970Scy#endif
15275970Scy
16275970Scy#include "ntp_stdlib.h"
17275970Scy#include "ntp_malloc.h"
18275970Scy#include "ntp_syslog.h"
19275970Scy#include "ntpd.h"
20275970Scy#include "ntp_io.h"
21275970Scy#include "ntp_assert.h"
22275970Scy#include "ntp_unixtime.h"
23275970Scy#include "timespecops.h"
24275970Scy#include "ntp_worker.h"
25275970Scy
/*
 * Sentinel values passed through the work-item and response queues in
 * place of a real blocking_pipe_header address.  req_child_exit()
 * queues CHILD_EXIT_REQ; receive_blocking_req_internal() answers it
 * with CHILD_GONE_RESP, which receive_blocking_resp_internal() turns
 * into cleanup_after_child().  Both names are the same value.
 */
#define CHILD_EXIT_REQ	((blocking_pipe_header *)(intptr_t)-1)
#define CHILD_GONE_RESP	CHILD_EXIT_REQ
/* number of slots added each time the corresponding queue array grows */
#define WORKITEMS_ALLOC_INC	16
#define RESPONSES_ALLOC_INC	4

/* lower bound applied to the worker thread's stack size (pthreads) */
#ifndef THREAD_MINSTACKSIZE
#define THREAD_MINSTACKSIZE	(64U * 1024)
#endif

/* cast var to type, dropping a volatile qualifier on the way */
#ifndef DEVOLATILE
#define DEVOLATILE(type, var) ((type)(uintptr_t)(volatile void *)(var))
#endif

/*
 * Platform shims: thread_exit(c) ends the calling thread with code c,
 * tickle_sem signals an event (Windows) or posts a semaphore (pthreads).
 */
#ifdef SYS_WINNT
# define thread_exit(c)	_endthreadex(c)
# define tickle_sem	SetEvent
#else
# define thread_exit(c)	pthread_exit((void*)(size_t)(c))
# define tickle_sem	sem_post
#endif
46275970Scy
47275970Scy#ifdef WORK_PIPE
48275970Scyaddremove_io_fd_func		addremove_io_fd;
49275970Scy#else
50275970Scyaddremove_io_semaphore_func	addremove_io_semaphore;
51275970Scy#endif
52275970Scy
53275970Scystatic	void	start_blocking_thread(blocking_child *);
54275970Scystatic	void	start_blocking_thread_internal(blocking_child *);
55275970Scystatic	void	prepare_child_sems(blocking_child *);
56275970Scystatic	int	wait_for_sem(sem_ref, struct timespec *);
57275970Scystatic	void	ensure_workitems_empty_slot(blocking_child *);
58275970Scystatic	void	ensure_workresp_empty_slot(blocking_child *);
59275970Scystatic	int	queue_req_pointer(blocking_child *, blocking_pipe_header *);
60275970Scystatic	void	cleanup_after_child(blocking_child *);
61275970Scy#ifdef SYS_WINNT
62275970Scyu_int	WINAPI	blocking_thread(void *);
63275970Scy#else
64275970Scyvoid *		blocking_thread(void *);
65275970Scy#endif
66275970Scy#ifndef SYS_WINNT
67275970Scystatic	void	block_thread_signals(sigset_t *);
68275970Scy#endif
69275970Scy
70275970Scy
/*
 * exit_worker() - terminate the calling worker thread.
 *
 * exitcode is handed to the platform thread-exit primitive selected by
 * the thread_exit macro (_endthreadex() under SYS_WINNT, pthread_exit()
 * otherwise).
 */
void
exit_worker(
	int	exitcode
	)
{
	thread_exit(exitcode);
}
78275970Scy
79275970Scy
80275970Scyint
81275970Scyworker_sleep(
82275970Scy	blocking_child *	c,
83275970Scy	time_t			seconds
84275970Scy	)
85275970Scy{
86275970Scy	struct timespec	until;
87275970Scy	int		rc;
88275970Scy
89275970Scy# ifdef HAVE_CLOCK_GETTIME
90275970Scy	if (0 != clock_gettime(CLOCK_REALTIME, &until)) {
91275970Scy		msyslog(LOG_ERR, "worker_sleep: clock_gettime() failed: %m");
92275970Scy		return -1;
93275970Scy	}
94275970Scy# else
95275970Scy	if (0 != getclock(TIMEOFDAY, &until)) {
96275970Scy		msyslog(LOG_ERR, "worker_sleep: getclock() failed: %m");
97275970Scy		return -1;
98275970Scy	}
99275970Scy# endif
100275970Scy	until.tv_sec += seconds;
101275970Scy	do {
102275970Scy		rc = wait_for_sem(c->wake_scheduled_sleep, &until);
103275970Scy	} while (-1 == rc && EINTR == errno);
104275970Scy	if (0 == rc)
105275970Scy		return -1;
106275970Scy	if (-1 == rc && ETIMEDOUT == errno)
107275970Scy		return 0;
108275970Scy	msyslog(LOG_ERR, "worker_sleep: sem_timedwait: %m");
109275970Scy	return -1;
110275970Scy}
111275970Scy
112275970Scy
113275970Scyvoid
114275970Scyinterrupt_worker_sleep(void)
115275970Scy{
116275970Scy	u_int			idx;
117275970Scy	blocking_child *	c;
118275970Scy
119275970Scy	for (idx = 0; idx < blocking_children_alloc; idx++) {
120275970Scy		c = blocking_children[idx];
121275970Scy		if (NULL == c || NULL == c->wake_scheduled_sleep)
122275970Scy			continue;
123275970Scy		tickle_sem(c->wake_scheduled_sleep);
124275970Scy	}
125275970Scy}
126275970Scy
127275970Scy
128275970Scystatic void
129275970Scyensure_workitems_empty_slot(
130275970Scy	blocking_child *c
131275970Scy	)
132275970Scy{
133275970Scy	const size_t	each = sizeof(blocking_children[0]->workitems[0]);
134275970Scy	size_t		new_alloc;
135275970Scy	size_t		old_octets;
136275970Scy	size_t		new_octets;
137275970Scy	void *		nonvol_workitems;
138275970Scy
139275970Scy
140275970Scy	if (c->workitems != NULL &&
141275970Scy	    NULL == c->workitems[c->next_workitem])
142275970Scy		return;
143275970Scy
144275970Scy	new_alloc = c->workitems_alloc + WORKITEMS_ALLOC_INC;
145275970Scy	old_octets = c->workitems_alloc * each;
146275970Scy	new_octets = new_alloc * each;
147275970Scy	nonvol_workitems = DEVOLATILE(void *, c->workitems);
148275970Scy	c->workitems = erealloc_zero(nonvol_workitems, new_octets,
149275970Scy				     old_octets);
150275970Scy	if (0 == c->next_workitem)
151275970Scy		c->next_workitem = c->workitems_alloc;
152275970Scy	c->workitems_alloc = new_alloc;
153275970Scy}
154275970Scy
155275970Scy
156275970Scystatic void
157275970Scyensure_workresp_empty_slot(
158275970Scy	blocking_child *c
159275970Scy	)
160275970Scy{
161275970Scy	const size_t	each = sizeof(blocking_children[0]->responses[0]);
162275970Scy	size_t		new_alloc;
163275970Scy	size_t		old_octets;
164275970Scy	size_t		new_octets;
165275970Scy	void *		nonvol_responses;
166275970Scy
167275970Scy	if (c->responses != NULL &&
168275970Scy	    NULL == c->responses[c->next_response])
169275970Scy		return;
170275970Scy
171275970Scy	new_alloc = c->responses_alloc + RESPONSES_ALLOC_INC;
172275970Scy	old_octets = c->responses_alloc * each;
173275970Scy	new_octets = new_alloc * each;
174275970Scy	nonvol_responses = DEVOLATILE(void *, c->responses);
175275970Scy	c->responses = erealloc_zero(nonvol_responses, new_octets,
176275970Scy				     old_octets);
177275970Scy	if (0 == c->next_response)
178275970Scy		c->next_response = c->responses_alloc;
179275970Scy	c->responses_alloc = new_alloc;
180275970Scy}
181275970Scy
182275970Scy
183275970Scy/*
184275970Scy * queue_req_pointer() - append a work item or idle exit request to
185275970Scy *			 blocking_workitems[].
186275970Scy */
187275970Scystatic int
188275970Scyqueue_req_pointer(
189275970Scy	blocking_child	*	c,
190275970Scy	blocking_pipe_header *	hdr
191275970Scy	)
192275970Scy{
193275970Scy	c->workitems[c->next_workitem] = hdr;
194275970Scy	c->next_workitem = (1 + c->next_workitem) % c->workitems_alloc;
195275970Scy
196275970Scy	/*
197275970Scy	 * We only want to signal the wakeup event if the child is
198275970Scy	 * blocking on it, which is indicated by setting the blocking
199275970Scy	 * event.  Wait with zero timeout to test.
200275970Scy	 */
201275970Scy	/* !!!! if (WAIT_OBJECT_0 == WaitForSingleObject(c->child_is_blocking, 0)) */
202275970Scy		tickle_sem(c->blocking_req_ready);
203275970Scy
204275970Scy	return 0;
205275970Scy}
206275970Scy
207275970Scy
208275970Scyint
209275970Scysend_blocking_req_internal(
210275970Scy	blocking_child *	c,
211275970Scy	blocking_pipe_header *	hdr,
212275970Scy	void *			data
213275970Scy	)
214275970Scy{
215275970Scy	blocking_pipe_header *	threadcopy;
216275970Scy	size_t			payload_octets;
217275970Scy
218275970Scy	REQUIRE(hdr != NULL);
219275970Scy	REQUIRE(data != NULL);
220275970Scy	DEBUG_REQUIRE(BLOCKING_REQ_MAGIC == hdr->magic_sig);
221275970Scy
222275970Scy	if (hdr->octets <= sizeof(*hdr))
223275970Scy		return 1;	/* failure */
224275970Scy	payload_octets = hdr->octets - sizeof(*hdr);
225275970Scy
226275970Scy	ensure_workitems_empty_slot(c);
227275970Scy	if (NULL == c->thread_ref) {
228275970Scy		ensure_workresp_empty_slot(c);
229275970Scy		start_blocking_thread(c);
230275970Scy	}
231275970Scy
232275970Scy	threadcopy = emalloc(hdr->octets);
233275970Scy	memcpy(threadcopy, hdr, sizeof(*hdr));
234275970Scy	memcpy((char *)threadcopy + sizeof(*hdr), data, payload_octets);
235275970Scy
236275970Scy	return queue_req_pointer(c, threadcopy);
237275970Scy}
238275970Scy
239275970Scy
240275970Scyblocking_pipe_header *
241275970Scyreceive_blocking_req_internal(
242275970Scy	blocking_child *	c
243275970Scy	)
244275970Scy{
245275970Scy	blocking_pipe_header *	req;
246275970Scy	int			rc;
247275970Scy
248275970Scy	/*
249275970Scy	 * Child blocks here when idle.  SysV semaphores maintain a
250275970Scy	 * count and release from sem_wait() only when it reaches 0.
251275970Scy	 * Windows auto-reset events are simpler, and multiple SetEvent
252275970Scy	 * calls before any thread waits result in a single wakeup.
253275970Scy	 * On Windows, the child drains all workitems each wakeup, while
254275970Scy	 * with SysV semaphores wait_sem() is used before each item.
255275970Scy	 */
256275970Scy#ifdef SYS_WINNT
257275970Scy	while (NULL == c->workitems[c->next_workeritem]) {
258275970Scy		/* !!!! SetEvent(c->child_is_blocking); */
259275970Scy		rc = wait_for_sem(c->blocking_req_ready, NULL);
260275970Scy		INSIST(0 == rc);
261275970Scy		/* !!!! ResetEvent(c->child_is_blocking); */
262275970Scy	}
263275970Scy#else
264275970Scy	do {
265275970Scy		rc = wait_for_sem(c->blocking_req_ready, NULL);
266275970Scy	} while (-1 == rc && EINTR == errno);
267275970Scy	INSIST(0 == rc);
268275970Scy#endif
269275970Scy
270275970Scy	req = c->workitems[c->next_workeritem];
271275970Scy	INSIST(NULL != req);
272275970Scy	c->workitems[c->next_workeritem] = NULL;
273275970Scy	c->next_workeritem = (1 + c->next_workeritem) %
274275970Scy				c->workitems_alloc;
275275970Scy
276275970Scy	if (CHILD_EXIT_REQ == req) {	/* idled out */
277275970Scy		send_blocking_resp_internal(c, CHILD_GONE_RESP);
278275970Scy		req = NULL;
279275970Scy	}
280275970Scy
281275970Scy	return req;
282275970Scy}
283275970Scy
284275970Scy
285275970Scyint
286275970Scysend_blocking_resp_internal(
287275970Scy	blocking_child *	c,
288275970Scy	blocking_pipe_header *	resp
289275970Scy	)
290275970Scy{
291275970Scy	ensure_workresp_empty_slot(c);
292275970Scy
293275970Scy	c->responses[c->next_response] = resp;
294275970Scy	c->next_response = (1 + c->next_response) % c->responses_alloc;
295275970Scy
296275970Scy#ifdef WORK_PIPE
297275970Scy	write(c->resp_write_pipe, "", 1);
298275970Scy#else
299275970Scy	tickle_sem(c->blocking_response_ready);
300275970Scy#endif
301275970Scy
302275970Scy	return 0;
303275970Scy}
304275970Scy
305275970Scy
306275970Scy#ifndef WORK_PIPE
307275970Scyvoid
308275970Scyhandle_blocking_resp_sem(
309275970Scy	void *	context
310275970Scy	)
311275970Scy{
312275970Scy	HANDLE			ready;
313275970Scy	blocking_child *	c;
314275970Scy	u_int			idx;
315275970Scy
316275970Scy	ready = (HANDLE)context;
317275970Scy	c = NULL;
318275970Scy	for (idx = 0; idx < blocking_children_alloc; idx++) {
319275970Scy		c = blocking_children[idx];
320275970Scy		if (c != NULL && c->thread_ref != NULL &&
321275970Scy		    ready == c->blocking_response_ready)
322275970Scy			break;
323275970Scy	}
324275970Scy	if (idx < blocking_children_alloc)
325275970Scy		process_blocking_resp(c);
326275970Scy}
327275970Scy#endif	/* !WORK_PIPE */
328275970Scy
329275970Scy
330275970Scyblocking_pipe_header *
331275970Scyreceive_blocking_resp_internal(
332275970Scy	blocking_child *	c
333275970Scy	)
334275970Scy{
335275970Scy	blocking_pipe_header *	removed;
336275970Scy#ifdef WORK_PIPE
337275970Scy	int			rc;
338275970Scy	char			scratch[32];
339275970Scy
340275970Scy	do {
341275970Scy		rc = read(c->resp_read_pipe, scratch, sizeof(scratch));
342275970Scy	} while (-1 == rc && EINTR == errno);
343275970Scy#endif
344275970Scy	removed = c->responses[c->next_workresp];
345275970Scy	if (NULL != removed) {
346275970Scy		c->responses[c->next_workresp] = NULL;
347275970Scy		c->next_workresp = (1 + c->next_workresp) %
348275970Scy				   c->responses_alloc;
349275970Scy		DEBUG_ENSURE(CHILD_GONE_RESP == removed ||
350275970Scy			     BLOCKING_RESP_MAGIC == removed->magic_sig);
351275970Scy	}
352275970Scy	if (CHILD_GONE_RESP == removed) {
353275970Scy		cleanup_after_child(c);
354275970Scy		removed = NULL;
355275970Scy	}
356275970Scy
357275970Scy	return removed;
358275970Scy}
359275970Scy
360275970Scy
361275970Scystatic void
362275970Scystart_blocking_thread(
363275970Scy	blocking_child *	c
364275970Scy	)
365275970Scy{
366275970Scy
367275970Scy	DEBUG_INSIST(!c->reusable);
368275970Scy
369275970Scy	prepare_child_sems(c);
370275970Scy	start_blocking_thread_internal(c);
371275970Scy}
372275970Scy
373275970Scy
374275970Scystatic void
375275970Scystart_blocking_thread_internal(
376275970Scy	blocking_child *	c
377275970Scy	)
378275970Scy#ifdef SYS_WINNT
379275970Scy{
380275970Scy	thr_ref	blocking_child_thread;
381275970Scy	u_int	blocking_thread_id;
382275970Scy	BOOL	resumed;
383275970Scy
384275970Scy	(*addremove_io_semaphore)(c->blocking_response_ready, FALSE);
385275970Scy	blocking_child_thread =
386275970Scy		(HANDLE)_beginthreadex(
387275970Scy			NULL,
388275970Scy			0,
389275970Scy			&blocking_thread,
390275970Scy			c,
391275970Scy			CREATE_SUSPENDED,
392275970Scy			&blocking_thread_id);
393275970Scy
394275970Scy	if (NULL == blocking_child_thread) {
395275970Scy		msyslog(LOG_ERR, "start blocking thread failed: %m");
396275970Scy		exit(-1);
397275970Scy	}
398275970Scy	c->thread_id = blocking_thread_id;
399275970Scy	c->thread_ref = blocking_child_thread;
400275970Scy	/* remember the thread priority is only within the process class */
401275970Scy	if (!SetThreadPriority(blocking_child_thread,
402275970Scy			       THREAD_PRIORITY_BELOW_NORMAL))
403275970Scy		msyslog(LOG_ERR, "Error lowering blocking thread priority: %m");
404275970Scy
405275970Scy	resumed = ResumeThread(blocking_child_thread);
406275970Scy	DEBUG_INSIST(resumed);
407275970Scy}
408275970Scy#else	/* pthreads start_blocking_thread_internal() follows */
409275970Scy{
410275970Scy# ifdef NEED_PTHREAD_INIT
411275970Scy	static int	pthread_init_called;
412275970Scy# endif
413275970Scy	pthread_attr_t	thr_attr;
414275970Scy	int		rc;
415275970Scy	int		saved_errno;
416275970Scy	int		pipe_ends[2];	/* read then write */
417275970Scy	int		is_pipe;
418275970Scy	int		flags;
419275970Scy	size_t		stacksize;
420275970Scy	sigset_t	saved_sig_mask;
421275970Scy
422275970Scy# ifdef NEED_PTHREAD_INIT
423275970Scy	/*
424275970Scy	 * from lib/isc/unix/app.c:
425275970Scy	 * BSDI 3.1 seg faults in pthread_sigmask() if we don't do this.
426275970Scy	 */
427275970Scy	if (!pthread_init_called) {
428275970Scy		pthread_init();
429275970Scy		pthread_init_called = TRUE;
430275970Scy	}
431275970Scy# endif
432275970Scy
433275970Scy	rc = pipe_socketpair(&pipe_ends[0], &is_pipe);
434275970Scy	if (0 != rc) {
435275970Scy		msyslog(LOG_ERR, "start_blocking_thread: pipe_socketpair() %m");
436275970Scy		exit(1);
437275970Scy	}
438275970Scy	c->resp_read_pipe = move_fd(pipe_ends[0]);
439275970Scy	c->resp_write_pipe = move_fd(pipe_ends[1]);
440275970Scy	c->ispipe = is_pipe;
441275970Scy	flags = fcntl(c->resp_read_pipe, F_GETFL, 0);
442275970Scy	if (-1 == flags) {
443275970Scy		msyslog(LOG_ERR, "start_blocking_thread: fcntl(F_GETFL) %m");
444275970Scy		exit(1);
445275970Scy	}
446275970Scy	rc = fcntl(c->resp_read_pipe, F_SETFL, O_NONBLOCK | flags);
447275970Scy	if (-1 == rc) {
448275970Scy		msyslog(LOG_ERR,
449275970Scy			"start_blocking_thread: fcntl(F_SETFL, O_NONBLOCK) %m");
450275970Scy		exit(1);
451275970Scy	}
452275970Scy	(*addremove_io_fd)(c->resp_read_pipe, c->ispipe, FALSE);
453275970Scy	pthread_attr_init(&thr_attr);
454275970Scy	pthread_attr_setdetachstate(&thr_attr, PTHREAD_CREATE_DETACHED);
455275970Scy#if defined(HAVE_PTHREAD_ATTR_GETSTACKSIZE) && \
456275970Scy    defined(HAVE_PTHREAD_ATTR_SETSTACKSIZE)
457275970Scy	rc = pthread_attr_getstacksize(&thr_attr, &stacksize);
458275970Scy	if (-1 == rc) {
459275970Scy		msyslog(LOG_ERR,
460275970Scy			"start_blocking_thread: pthread_attr_getstacksize %m");
461275970Scy	} else if (stacksize < THREAD_MINSTACKSIZE) {
462275970Scy		rc = pthread_attr_setstacksize(&thr_attr,
463275970Scy					       THREAD_MINSTACKSIZE);
464275970Scy		if (-1 == rc)
465275970Scy			msyslog(LOG_ERR,
466275970Scy				"start_blocking_thread: pthread_attr_setstacksize(0x%lx -> 0x%lx) %m",
467275970Scy				(u_long)stacksize,
468275970Scy				(u_long)THREAD_MINSTACKSIZE);
469275970Scy	}
470275970Scy#else
471275970Scy	UNUSED_ARG(stacksize);
472275970Scy#endif
473275970Scy#if defined(PTHREAD_SCOPE_SYSTEM) && defined(NEED_PTHREAD_SCOPE_SYSTEM)
474275970Scy	pthread_attr_setscope(&thr_attr, PTHREAD_SCOPE_SYSTEM);
475275970Scy#endif
476275970Scy	c->thread_ref = emalloc_zero(sizeof(*c->thread_ref));
477275970Scy	block_thread_signals(&saved_sig_mask);
478275970Scy	rc = pthread_create(c->thread_ref, &thr_attr,
479275970Scy			    &blocking_thread, c);
480275970Scy	saved_errno = errno;
481275970Scy	pthread_sigmask(SIG_SETMASK, &saved_sig_mask, NULL);
482275970Scy	pthread_attr_destroy(&thr_attr);
483275970Scy	if (0 != rc) {
484275970Scy		errno = saved_errno;
485275970Scy		msyslog(LOG_ERR, "pthread_create() blocking child: %m");
486275970Scy		exit(1);
487275970Scy	}
488275970Scy}
489275970Scy#endif
490275970Scy
491275970Scy
492275970Scy/*
493275970Scy * block_thread_signals()
494275970Scy *
495275970Scy * Temporarily block signals used by ntpd main thread, so that signal
496275970Scy * mask inherited by child threads leaves them blocked.  Returns prior
497275970Scy * active signal mask via pmask, to be restored by the main thread
498275970Scy * after pthread_create().
499275970Scy */
500275970Scy#ifndef SYS_WINNT
501275970Scyvoid
502275970Scyblock_thread_signals(
503275970Scy	sigset_t *	pmask
504275970Scy	)
505275970Scy{
506275970Scy	sigset_t	block;
507275970Scy
508275970Scy	sigemptyset(&block);
509275970Scy# ifdef HAVE_SIGNALED_IO
510275970Scy#  ifdef SIGIO
511275970Scy	sigaddset(&block, SIGIO);
512275970Scy#  endif
513275970Scy#  ifdef SIGPOLL
514275970Scy	sigaddset(&block, SIGPOLL);
515275970Scy#  endif
516275970Scy# endif	/* HAVE_SIGNALED_IO */
517275970Scy	sigaddset(&block, SIGALRM);
518275970Scy	sigaddset(&block, MOREDEBUGSIG);
519275970Scy	sigaddset(&block, LESSDEBUGSIG);
520275970Scy# ifdef SIGDIE1
521275970Scy	sigaddset(&block, SIGDIE1);
522275970Scy# endif
523275970Scy# ifdef SIGDIE2
524275970Scy	sigaddset(&block, SIGDIE2);
525275970Scy# endif
526275970Scy# ifdef SIGDIE3
527275970Scy	sigaddset(&block, SIGDIE3);
528275970Scy# endif
529275970Scy# ifdef SIGDIE4
530275970Scy	sigaddset(&block, SIGDIE4);
531275970Scy# endif
532275970Scy# ifdef SIGBUS
533275970Scy	sigaddset(&block, SIGBUS);
534275970Scy# endif
535275970Scy	sigemptyset(pmask);
536275970Scy	pthread_sigmask(SIG_BLOCK, &block, pmask);
537275970Scy}
538275970Scy#endif	/* !SYS_WINNT */
539275970Scy
540275970Scy
541275970Scy/*
542275970Scy * prepare_child_sems()
543275970Scy *
544275970Scy * create sync events (semaphores)
545275970Scy * child_is_blocking initially unset
546275970Scy * blocking_req_ready initially unset
547275970Scy *
548275970Scy * Child waits for blocking_req_ready to be set after
549275970Scy * setting child_is_blocking.  blocking_req_ready and
550275970Scy * blocking_response_ready are auto-reset, so wake one
551275970Scy * waiter and become unset (unsignalled) in one operation.
552275970Scy */
553275970Scystatic void
554275970Scyprepare_child_sems(
555275970Scy	blocking_child *c
556275970Scy	)
557275970Scy#ifdef SYS_WINNT
558275970Scy{
559275970Scy	if (NULL == c->blocking_req_ready) {
560275970Scy		/* manual reset using ResetEvent() */
561275970Scy		/* !!!! c->child_is_blocking = CreateEvent(NULL, TRUE, FALSE, NULL); */
562275970Scy		/* auto reset - one thread released from wait each set */
563275970Scy		c->blocking_req_ready = CreateEvent(NULL, FALSE, FALSE, NULL);
564275970Scy		c->blocking_response_ready = CreateEvent(NULL, FALSE, FALSE, NULL);
565275970Scy		c->wake_scheduled_sleep = CreateEvent(NULL, FALSE, FALSE, NULL);
566275970Scy	} else {
567275970Scy		/* !!!! ResetEvent(c->child_is_blocking); */
568275970Scy		/* ResetEvent(c->blocking_req_ready); */
569275970Scy		/* ResetEvent(c->blocking_response_ready); */
570275970Scy		/* ResetEvent(c->wake_scheduled_sleep); */
571275970Scy	}
572275970Scy}
573275970Scy#else	/* pthreads prepare_child_sems() follows */
574275970Scy{
575275970Scy	size_t	octets;
576275970Scy
577275970Scy	if (NULL == c->blocking_req_ready) {
578275970Scy		octets = sizeof(*c->blocking_req_ready);
579275970Scy		octets += sizeof(*c->wake_scheduled_sleep);
580275970Scy		/* !!!! octets += sizeof(*c->child_is_blocking); */
581275970Scy		c->blocking_req_ready = emalloc_zero(octets);;
582275970Scy		c->wake_scheduled_sleep = 1 + c->blocking_req_ready;
583275970Scy		/* !!!! c->child_is_blocking = 1 + c->wake_scheduled_sleep; */
584275970Scy	} else {
585275970Scy		sem_destroy(c->blocking_req_ready);
586275970Scy		sem_destroy(c->wake_scheduled_sleep);
587275970Scy		/* !!!! sem_destroy(c->child_is_blocking); */
588275970Scy	}
589275970Scy	sem_init(c->blocking_req_ready, FALSE, 0);
590275970Scy	sem_init(c->wake_scheduled_sleep, FALSE, 0);
591275970Scy	/* !!!! sem_init(c->child_is_blocking, FALSE, 0); */
592275970Scy}
593275970Scy#endif
594275970Scy
595275970Scy
596275970Scystatic int
597275970Scywait_for_sem(
598275970Scy	sem_ref			sem,
599275970Scy	struct timespec *	timeout		/* wall-clock */
600275970Scy	)
601275970Scy#ifdef SYS_WINNT
602275970Scy{
603275970Scy	struct timespec now;
604275970Scy	struct timespec delta;
605275970Scy	DWORD		msec;
606275970Scy	DWORD		rc;
607275970Scy
608275970Scy	if (NULL == timeout) {
609275970Scy		msec = INFINITE;
610275970Scy	} else {
611275970Scy		getclock(TIMEOFDAY, &now);
612275970Scy		delta = sub_tspec(*timeout, now);
613275970Scy		if (delta.tv_sec < 0) {
614275970Scy			msec = 0;
615275970Scy		} else if ((delta.tv_sec + 1) >= (MAXDWORD / 1000)) {
616275970Scy			msec = INFINITE;
617275970Scy		} else {
618275970Scy			msec = 1000 * (DWORD)delta.tv_sec;
619275970Scy			msec += delta.tv_nsec / (1000 * 1000);
620275970Scy		}
621275970Scy	}
622275970Scy	rc = WaitForSingleObject(sem, msec);
623275970Scy	if (WAIT_OBJECT_0 == rc)
624275970Scy		return 0;
625275970Scy	if (WAIT_TIMEOUT == rc) {
626275970Scy		errno = ETIMEDOUT;
627275970Scy		return -1;
628275970Scy	}
629275970Scy	msyslog(LOG_ERR, "WaitForSingleObject unexpected 0x%x", rc);
630275970Scy	errno = EFAULT;
631275970Scy	return -1;
632275970Scy}
633275970Scy#else	/* pthreads wait_for_sem() follows */
634275970Scy{
635275970Scy	int rc;
636275970Scy
637275970Scy	if (NULL == timeout)
638275970Scy		rc = sem_wait(sem);
639275970Scy	else
640275970Scy		rc = sem_timedwait(sem, timeout);
641275970Scy
642275970Scy	return rc;
643275970Scy}
644275970Scy#endif
645275970Scy
646275970Scy
647275970Scy/*
648275970Scy * blocking_thread - thread functions have WINAPI calling convention
649275970Scy */
650275970Scy#ifdef SYS_WINNT
651275970Scyu_int
652275970ScyWINAPI
653275970Scy#else
654275970Scyvoid *
655275970Scy#endif
656275970Scyblocking_thread(
657275970Scy	void *	ThreadArg
658275970Scy	)
659275970Scy{
660275970Scy	blocking_child *c;
661275970Scy
662275970Scy	c = ThreadArg;
663275970Scy	exit_worker(blocking_child_common(c));
664275970Scy
665275970Scy	/* NOTREACHED */
666275970Scy	return 0;
667275970Scy}
668275970Scy
669275970Scy
670275970Scy/*
671275970Scy * req_child_exit() runs in the parent.
672275970Scy */
673275970Scyint
674275970Scyreq_child_exit(
675275970Scy	blocking_child *c
676275970Scy	)
677275970Scy{
678275970Scy	return queue_req_pointer(c, CHILD_EXIT_REQ);
679275970Scy}
680275970Scy
681275970Scy
682275970Scy/*
683275970Scy * cleanup_after_child() runs in parent.
684275970Scy */
685275970Scystatic void
686275970Scycleanup_after_child(
687275970Scy	blocking_child *	c
688275970Scy	)
689275970Scy{
690275970Scy	u_int	idx;
691275970Scy
692275970Scy	DEBUG_INSIST(!c->reusable);
693275970Scy#ifdef SYS_WINNT
694275970Scy	INSIST(CloseHandle(c->thread_ref));
695275970Scy#else
696275970Scy	free(c->thread_ref);
697275970Scy#endif
698275970Scy	c->thread_ref = NULL;
699275970Scy	c->thread_id = 0;
700275970Scy#ifdef WORK_PIPE
701275970Scy	DEBUG_INSIST(-1 != c->resp_read_pipe);
702275970Scy	DEBUG_INSIST(-1 != c->resp_write_pipe);
703275970Scy	(*addremove_io_fd)(c->resp_read_pipe, c->ispipe, TRUE);
704275970Scy	close(c->resp_write_pipe);
705275970Scy	close(c->resp_read_pipe);
706275970Scy	c->resp_write_pipe = -1;
707275970Scy	c->resp_read_pipe = -1;
708275970Scy#else
709275970Scy	DEBUG_INSIST(NULL != c->blocking_response_ready);
710275970Scy	(*addremove_io_semaphore)(c->blocking_response_ready, TRUE);
711275970Scy#endif
712275970Scy	for (idx = 0; idx < c->workitems_alloc; idx++)
713275970Scy		c->workitems[idx] = NULL;
714275970Scy	c->next_workitem = 0;
715275970Scy	c->next_workeritem = 0;
716275970Scy	for (idx = 0; idx < c->responses_alloc; idx++)
717275970Scy		c->responses[idx] = NULL;
718275970Scy	c->next_response = 0;
719275970Scy	c->next_workresp = 0;
720275970Scy	c->reusable = TRUE;
721275970Scy}
722275970Scy
723275970Scy
724275970Scy#else	/* !WORK_THREAD follows */
725275970Scychar work_thread_nonempty_compilation_unit;
726275970Scy#endif
727