work_thread.c revision 1.2
1/*	$NetBSD: work_thread.c,v 1.2 2014/12/19 20:43:17 christos Exp $	*/
2
3/*
4 * work_thread.c - threads implementation for blocking worker child.
5 */
6#include <config.h>
7#include "ntp_workimpl.h"
8
9#ifdef WORK_THREAD
10
11#include <stdio.h>
12#include <ctype.h>
13#include <signal.h>
14#ifndef SYS_WINNT
15#include <pthread.h>
16#endif
17
18#include "ntp_stdlib.h"
19#include "ntp_malloc.h"
20#include "ntp_syslog.h"
21#include "ntpd.h"
22#include "ntp_io.h"
23#include "ntp_assert.h"
24#include "ntp_unixtime.h"
25#include "timespecops.h"
26#include "ntp_worker.h"
27
/*
 * Sentinel pointer (-1 cast to a header pointer) queued in place of a
 * real work item by req_child_exit(); receive_blocking_req_internal()
 * recognizes it and answers with CHILD_GONE_RESP instead of returning
 * a request.
 */
#define CHILD_EXIT_REQ	((blocking_pipe_header *)(intptr_t)-1)
/*
 * Sentinel response (same value as CHILD_EXIT_REQ) that makes
 * receive_blocking_resp_internal() call cleanup_after_child().
 */
#define CHILD_GONE_RESP	CHILD_EXIT_REQ
/* number of slots added each time workitems[] has to grow */
#define WORKITEMS_ALLOC_INC	16
/* number of slots added each time responses[] has to grow */
#define RESPONSES_ALLOC_INC	4

/*
 * Smallest stack size accepted for the worker thread; the pthreads
 * start_blocking_thread_internal() raises the attribute to this value
 * when the default is smaller.
 */
#ifndef THREAD_MINSTACKSIZE
#define THREAD_MINSTACKSIZE	(64U * 1024)
#endif

/*
 * Cast away the volatile qualifier of var by way of uintptr_t,
 * yielding a value of the given type.
 */
#ifndef DEVOLATILE
#define DEVOLATILE(type, var) ((type)(uintptr_t)(volatile void *)(var))
#endif

/*
 * Platform mapping for "end the calling thread with code c" and
 * "signal a semaphore/event".
 */
#ifdef SYS_WINNT
# define thread_exit(c)	_endthreadex(c)
# define tickle_sem	SetEvent
#else
# define thread_exit(c)	pthread_exit((void*)(size_t)(c))
# define tickle_sem	sem_post
#endif
48
/*
 * Hook invoked through a pointer by this file to register (last
 * argument FALSE, when the worker is started) and unregister (last
 * argument TRUE, in cleanup_after_child()) the response channel: a
 * pipe descriptor with WORK_PIPE, otherwise the blocking_response_ready
 * semaphore.  This file never assigns the hook.
 * NOTE(review): presumably set by the I/O layer before the first
 * worker is started -- confirm.
 */
#ifdef WORK_PIPE
addremove_io_fd_func		addremove_io_fd;
#else
addremove_io_semaphore_func	addremove_io_semaphore;
#endif

/* forward declarations of the helpers defined later in this file */
static	void	start_blocking_thread(blocking_child *);
static	void	start_blocking_thread_internal(blocking_child *);
static	void	prepare_child_sems(blocking_child *);
static	int	wait_for_sem(sem_ref, struct timespec *);
static	void	ensure_workitems_empty_slot(blocking_child *);
static	void	ensure_workresp_empty_slot(blocking_child *);
static	int	queue_req_pointer(blocking_child *, blocking_pipe_header *);
static	void	cleanup_after_child(blocking_child *);
/* thread entry point; its signature follows the platform thread API */
#ifdef SYS_WINNT
u_int	WINAPI	blocking_thread(void *);
#else
void *		blocking_thread(void *);
#endif
#ifndef SYS_WINNT
static	void	block_thread_signals(sigset_t *);
#endif
71
72
/*
 * exit_worker() - end the calling worker thread.
 *
 * exitcode is handed to the platform thread-exit primitive selected
 * by the thread_exit() macro (_endthreadex() on Windows, pthread_exit()
 * elsewhere).
 */
void
exit_worker(
	int	exitcode
	)
{
	/* expands to _endthreadex() or pthread_exit() */
	thread_exit(exitcode);
}
80
81
82int
83worker_sleep(
84	blocking_child *	c,
85	time_t			seconds
86	)
87{
88	struct timespec	until;
89	int		rc;
90
91# ifdef HAVE_CLOCK_GETTIME
92	if (0 != clock_gettime(CLOCK_REALTIME, &until)) {
93		msyslog(LOG_ERR, "worker_sleep: clock_gettime() failed: %m");
94		return -1;
95	}
96# else
97	if (0 != getclock(TIMEOFDAY, &until)) {
98		msyslog(LOG_ERR, "worker_sleep: getclock() failed: %m");
99		return -1;
100	}
101# endif
102	until.tv_sec += seconds;
103	do {
104		rc = wait_for_sem(c->wake_scheduled_sleep, &until);
105	} while (-1 == rc && EINTR == errno);
106	if (0 == rc)
107		return -1;
108	if (-1 == rc && ETIMEDOUT == errno)
109		return 0;
110	msyslog(LOG_ERR, "worker_sleep: sem_timedwait: %m");
111	return -1;
112}
113
114
115void
116interrupt_worker_sleep(void)
117{
118	u_int			idx;
119	blocking_child *	c;
120
121	for (idx = 0; idx < blocking_children_alloc; idx++) {
122		c = blocking_children[idx];
123		if (NULL == c || NULL == c->wake_scheduled_sleep)
124			continue;
125		tickle_sem(c->wake_scheduled_sleep);
126	}
127}
128
129
130static void
131ensure_workitems_empty_slot(
132	blocking_child *c
133	)
134{
135	const size_t	each = sizeof(blocking_children[0]->workitems[0]);
136	size_t		new_alloc;
137	size_t		old_octets;
138	size_t		new_octets;
139	void *		nonvol_workitems;
140
141
142	if (c->workitems != NULL &&
143	    NULL == c->workitems[c->next_workitem])
144		return;
145
146	new_alloc = c->workitems_alloc + WORKITEMS_ALLOC_INC;
147	old_octets = c->workitems_alloc * each;
148	new_octets = new_alloc * each;
149	nonvol_workitems = DEVOLATILE(void *, c->workitems);
150	c->workitems = erealloc_zero(nonvol_workitems, new_octets,
151				     old_octets);
152	if (0 == c->next_workitem)
153		c->next_workitem = c->workitems_alloc;
154	c->workitems_alloc = new_alloc;
155}
156
157
158static void
159ensure_workresp_empty_slot(
160	blocking_child *c
161	)
162{
163	const size_t	each = sizeof(blocking_children[0]->responses[0]);
164	size_t		new_alloc;
165	size_t		old_octets;
166	size_t		new_octets;
167	void *		nonvol_responses;
168
169	if (c->responses != NULL &&
170	    NULL == c->responses[c->next_response])
171		return;
172
173	new_alloc = c->responses_alloc + RESPONSES_ALLOC_INC;
174	old_octets = c->responses_alloc * each;
175	new_octets = new_alloc * each;
176	nonvol_responses = DEVOLATILE(void *, c->responses);
177	c->responses = erealloc_zero(nonvol_responses, new_octets,
178				     old_octets);
179	if (0 == c->next_response)
180		c->next_response = c->responses_alloc;
181	c->responses_alloc = new_alloc;
182}
183
184
185/*
186 * queue_req_pointer() - append a work item or idle exit request to
187 *			 blocking_workitems[].
188 */
189static int
190queue_req_pointer(
191	blocking_child	*	c,
192	blocking_pipe_header *	hdr
193	)
194{
195	c->workitems[c->next_workitem] = hdr;
196	c->next_workitem = (1 + c->next_workitem) % c->workitems_alloc;
197
198	/*
199	 * We only want to signal the wakeup event if the child is
200	 * blocking on it, which is indicated by setting the blocking
201	 * event.  Wait with zero timeout to test.
202	 */
203	/* !!!! if (WAIT_OBJECT_0 == WaitForSingleObject(c->child_is_blocking, 0)) */
204		tickle_sem(c->blocking_req_ready);
205
206	return 0;
207}
208
209
210int
211send_blocking_req_internal(
212	blocking_child *	c,
213	blocking_pipe_header *	hdr,
214	void *			data
215	)
216{
217	blocking_pipe_header *	threadcopy;
218	size_t			payload_octets;
219
220	REQUIRE(hdr != NULL);
221	REQUIRE(data != NULL);
222	DEBUG_REQUIRE(BLOCKING_REQ_MAGIC == hdr->magic_sig);
223
224	if (hdr->octets <= sizeof(*hdr))
225		return 1;	/* failure */
226	payload_octets = hdr->octets - sizeof(*hdr);
227
228	ensure_workitems_empty_slot(c);
229	if (NULL == c->thread_ref) {
230		ensure_workresp_empty_slot(c);
231		start_blocking_thread(c);
232	}
233
234	threadcopy = emalloc(hdr->octets);
235	memcpy(threadcopy, hdr, sizeof(*hdr));
236	memcpy((char *)threadcopy + sizeof(*hdr), data, payload_octets);
237
238	return queue_req_pointer(c, threadcopy);
239}
240
241
242blocking_pipe_header *
243receive_blocking_req_internal(
244	blocking_child *	c
245	)
246{
247	blocking_pipe_header *	req;
248	int			rc;
249
250	/*
251	 * Child blocks here when idle.  SysV semaphores maintain a
252	 * count and release from sem_wait() only when it reaches 0.
253	 * Windows auto-reset events are simpler, and multiple SetEvent
254	 * calls before any thread waits result in a single wakeup.
255	 * On Windows, the child drains all workitems each wakeup, while
256	 * with SysV semaphores wait_sem() is used before each item.
257	 */
258#ifdef SYS_WINNT
259	while (NULL == c->workitems[c->next_workeritem]) {
260		/* !!!! SetEvent(c->child_is_blocking); */
261		rc = wait_for_sem(c->blocking_req_ready, NULL);
262		INSIST(0 == rc);
263		/* !!!! ResetEvent(c->child_is_blocking); */
264	}
265#else
266	do {
267		rc = wait_for_sem(c->blocking_req_ready, NULL);
268	} while (-1 == rc && EINTR == errno);
269	INSIST(0 == rc);
270#endif
271
272	req = c->workitems[c->next_workeritem];
273	INSIST(NULL != req);
274	c->workitems[c->next_workeritem] = NULL;
275	c->next_workeritem = (1 + c->next_workeritem) %
276				c->workitems_alloc;
277
278	if (CHILD_EXIT_REQ == req) {	/* idled out */
279		send_blocking_resp_internal(c, CHILD_GONE_RESP);
280		req = NULL;
281	}
282
283	return req;
284}
285
286
287int
288send_blocking_resp_internal(
289	blocking_child *	c,
290	blocking_pipe_header *	resp
291	)
292{
293	ensure_workresp_empty_slot(c);
294
295	c->responses[c->next_response] = resp;
296	c->next_response = (1 + c->next_response) % c->responses_alloc;
297
298#ifdef WORK_PIPE
299	write(c->resp_write_pipe, "", 1);
300#else
301	tickle_sem(c->blocking_response_ready);
302#endif
303
304	return 0;
305}
306
307
308#ifndef WORK_PIPE
309void
310handle_blocking_resp_sem(
311	void *	context
312	)
313{
314	HANDLE			ready;
315	blocking_child *	c;
316	u_int			idx;
317
318	ready = (HANDLE)context;
319	c = NULL;
320	for (idx = 0; idx < blocking_children_alloc; idx++) {
321		c = blocking_children[idx];
322		if (c != NULL && c->thread_ref != NULL &&
323		    ready == c->blocking_response_ready)
324			break;
325	}
326	if (idx < blocking_children_alloc)
327		process_blocking_resp(c);
328}
329#endif	/* !WORK_PIPE */
330
331
332blocking_pipe_header *
333receive_blocking_resp_internal(
334	blocking_child *	c
335	)
336{
337	blocking_pipe_header *	removed;
338#ifdef WORK_PIPE
339	int			rc;
340	char			scratch[32];
341
342	do {
343		rc = read(c->resp_read_pipe, scratch, sizeof(scratch));
344	} while (-1 == rc && EINTR == errno);
345#endif
346	removed = c->responses[c->next_workresp];
347	if (NULL != removed) {
348		c->responses[c->next_workresp] = NULL;
349		c->next_workresp = (1 + c->next_workresp) %
350				   c->responses_alloc;
351		DEBUG_ENSURE(CHILD_GONE_RESP == removed ||
352			     BLOCKING_RESP_MAGIC == removed->magic_sig);
353	}
354	if (CHILD_GONE_RESP == removed) {
355		cleanup_after_child(c);
356		removed = NULL;
357	}
358
359	return removed;
360}
361
362
363static void
364start_blocking_thread(
365	blocking_child *	c
366	)
367{
368
369	DEBUG_INSIST(!c->reusable);
370
371	prepare_child_sems(c);
372	start_blocking_thread_internal(c);
373}
374
375
376static void
377start_blocking_thread_internal(
378	blocking_child *	c
379	)
380#ifdef SYS_WINNT
381{
382	thr_ref	blocking_child_thread;
383	u_int	blocking_thread_id;
384	BOOL	resumed;
385
386	(*addremove_io_semaphore)(c->blocking_response_ready, FALSE);
387	blocking_child_thread =
388		(HANDLE)_beginthreadex(
389			NULL,
390			0,
391			&blocking_thread,
392			c,
393			CREATE_SUSPENDED,
394			&blocking_thread_id);
395
396	if (NULL == blocking_child_thread) {
397		msyslog(LOG_ERR, "start blocking thread failed: %m");
398		exit(-1);
399	}
400	c->thread_id = blocking_thread_id;
401	c->thread_ref = blocking_child_thread;
402	/* remember the thread priority is only within the process class */
403	if (!SetThreadPriority(blocking_child_thread,
404			       THREAD_PRIORITY_BELOW_NORMAL))
405		msyslog(LOG_ERR, "Error lowering blocking thread priority: %m");
406
407	resumed = ResumeThread(blocking_child_thread);
408	DEBUG_INSIST(resumed);
409}
410#else	/* pthreads start_blocking_thread_internal() follows */
411{
412# ifdef NEED_PTHREAD_INIT
413	static int	pthread_init_called;
414# endif
415	pthread_attr_t	thr_attr;
416	int		rc;
417	int		saved_errno;
418	int		pipe_ends[2];	/* read then write */
419	int		is_pipe;
420	int		flags;
421	size_t		stacksize;
422	sigset_t	saved_sig_mask;
423
424# ifdef NEED_PTHREAD_INIT
425	/*
426	 * from lib/isc/unix/app.c:
427	 * BSDI 3.1 seg faults in pthread_sigmask() if we don't do this.
428	 */
429	if (!pthread_init_called) {
430		pthread_init();
431		pthread_init_called = TRUE;
432	}
433# endif
434
435	rc = pipe_socketpair(&pipe_ends[0], &is_pipe);
436	if (0 != rc) {
437		msyslog(LOG_ERR, "start_blocking_thread: pipe_socketpair() %m");
438		exit(1);
439	}
440	c->resp_read_pipe = move_fd(pipe_ends[0]);
441	c->resp_write_pipe = move_fd(pipe_ends[1]);
442	c->ispipe = is_pipe;
443	flags = fcntl(c->resp_read_pipe, F_GETFL, 0);
444	if (-1 == flags) {
445		msyslog(LOG_ERR, "start_blocking_thread: fcntl(F_GETFL) %m");
446		exit(1);
447	}
448	rc = fcntl(c->resp_read_pipe, F_SETFL, O_NONBLOCK | flags);
449	if (-1 == rc) {
450		msyslog(LOG_ERR,
451			"start_blocking_thread: fcntl(F_SETFL, O_NONBLOCK) %m");
452		exit(1);
453	}
454	(*addremove_io_fd)(c->resp_read_pipe, c->ispipe, FALSE);
455	pthread_attr_init(&thr_attr);
456	pthread_attr_setdetachstate(&thr_attr, PTHREAD_CREATE_DETACHED);
457#if defined(HAVE_PTHREAD_ATTR_GETSTACKSIZE) && \
458    defined(HAVE_PTHREAD_ATTR_SETSTACKSIZE)
459	rc = pthread_attr_getstacksize(&thr_attr, &stacksize);
460	if (-1 == rc) {
461		msyslog(LOG_ERR,
462			"start_blocking_thread: pthread_attr_getstacksize %m");
463	} else if (stacksize < THREAD_MINSTACKSIZE) {
464		rc = pthread_attr_setstacksize(&thr_attr,
465					       THREAD_MINSTACKSIZE);
466		if (-1 == rc)
467			msyslog(LOG_ERR,
468				"start_blocking_thread: pthread_attr_setstacksize(0x%lx -> 0x%lx) %m",
469				(u_long)stacksize,
470				(u_long)THREAD_MINSTACKSIZE);
471	}
472#else
473	UNUSED_ARG(stacksize);
474#endif
475#if defined(PTHREAD_SCOPE_SYSTEM) && defined(NEED_PTHREAD_SCOPE_SYSTEM)
476	pthread_attr_setscope(&thr_attr, PTHREAD_SCOPE_SYSTEM);
477#endif
478	c->thread_ref = emalloc_zero(sizeof(*c->thread_ref));
479	block_thread_signals(&saved_sig_mask);
480	rc = pthread_create(c->thread_ref, &thr_attr,
481			    &blocking_thread, c);
482	saved_errno = errno;
483	pthread_sigmask(SIG_SETMASK, &saved_sig_mask, NULL);
484	pthread_attr_destroy(&thr_attr);
485	if (0 != rc) {
486		errno = saved_errno;
487		msyslog(LOG_ERR, "pthread_create() blocking child: %m");
488		exit(1);
489	}
490}
491#endif
492
493
494/*
495 * block_thread_signals()
496 *
497 * Temporarily block signals used by ntpd main thread, so that signal
498 * mask inherited by child threads leaves them blocked.  Returns prior
499 * active signal mask via pmask, to be restored by the main thread
500 * after pthread_create().
501 */
502#ifndef SYS_WINNT
503void
504block_thread_signals(
505	sigset_t *	pmask
506	)
507{
508	sigset_t	block;
509
510	sigemptyset(&block);
511# ifdef HAVE_SIGNALED_IO
512#  ifdef SIGIO
513	sigaddset(&block, SIGIO);
514#  endif
515#  ifdef SIGPOLL
516	sigaddset(&block, SIGPOLL);
517#  endif
518# endif	/* HAVE_SIGNALED_IO */
519	sigaddset(&block, SIGALRM);
520	sigaddset(&block, MOREDEBUGSIG);
521	sigaddset(&block, LESSDEBUGSIG);
522# ifdef SIGDIE1
523	sigaddset(&block, SIGDIE1);
524# endif
525# ifdef SIGDIE2
526	sigaddset(&block, SIGDIE2);
527# endif
528# ifdef SIGDIE3
529	sigaddset(&block, SIGDIE3);
530# endif
531# ifdef SIGDIE4
532	sigaddset(&block, SIGDIE4);
533# endif
534# ifdef SIGBUS
535	sigaddset(&block, SIGBUS);
536# endif
537	sigemptyset(pmask);
538	pthread_sigmask(SIG_BLOCK, &block, pmask);
539}
540#endif	/* !SYS_WINNT */
541
542
543/*
544 * prepare_child_sems()
545 *
546 * create sync events (semaphores)
547 * child_is_blocking initially unset
548 * blocking_req_ready initially unset
549 *
550 * Child waits for blocking_req_ready to be set after
551 * setting child_is_blocking.  blocking_req_ready and
552 * blocking_response_ready are auto-reset, so wake one
553 * waiter and become unset (unsignalled) in one operation.
554 */
555static void
556prepare_child_sems(
557	blocking_child *c
558	)
559#ifdef SYS_WINNT
560{
561	if (NULL == c->blocking_req_ready) {
562		/* manual reset using ResetEvent() */
563		/* !!!! c->child_is_blocking = CreateEvent(NULL, TRUE, FALSE, NULL); */
564		/* auto reset - one thread released from wait each set */
565		c->blocking_req_ready = CreateEvent(NULL, FALSE, FALSE, NULL);
566		c->blocking_response_ready = CreateEvent(NULL, FALSE, FALSE, NULL);
567		c->wake_scheduled_sleep = CreateEvent(NULL, FALSE, FALSE, NULL);
568	} else {
569		/* !!!! ResetEvent(c->child_is_blocking); */
570		/* ResetEvent(c->blocking_req_ready); */
571		/* ResetEvent(c->blocking_response_ready); */
572		/* ResetEvent(c->wake_scheduled_sleep); */
573	}
574}
575#else	/* pthreads prepare_child_sems() follows */
576{
577	size_t	octets;
578
579	if (NULL == c->blocking_req_ready) {
580		octets = sizeof(*c->blocking_req_ready);
581		octets += sizeof(*c->wake_scheduled_sleep);
582		/* !!!! octets += sizeof(*c->child_is_blocking); */
583		c->blocking_req_ready = emalloc_zero(octets);;
584		c->wake_scheduled_sleep = 1 + c->blocking_req_ready;
585		/* !!!! c->child_is_blocking = 1 + c->wake_scheduled_sleep; */
586	} else {
587		sem_destroy(c->blocking_req_ready);
588		sem_destroy(c->wake_scheduled_sleep);
589		/* !!!! sem_destroy(c->child_is_blocking); */
590	}
591	sem_init(c->blocking_req_ready, FALSE, 0);
592	sem_init(c->wake_scheduled_sleep, FALSE, 0);
593	/* !!!! sem_init(c->child_is_blocking, FALSE, 0); */
594}
595#endif
596
597
598static int
599wait_for_sem(
600	sem_ref			sem,
601	struct timespec *	timeout		/* wall-clock */
602	)
603#ifdef SYS_WINNT
604{
605	struct timespec now;
606	struct timespec delta;
607	DWORD		msec;
608	DWORD		rc;
609
610	if (NULL == timeout) {
611		msec = INFINITE;
612	} else {
613		getclock(TIMEOFDAY, &now);
614		delta = sub_tspec(*timeout, now);
615		if (delta.tv_sec < 0) {
616			msec = 0;
617		} else if ((delta.tv_sec + 1) >= (MAXDWORD / 1000)) {
618			msec = INFINITE;
619		} else {
620			msec = 1000 * (DWORD)delta.tv_sec;
621			msec += delta.tv_nsec / (1000 * 1000);
622		}
623	}
624	rc = WaitForSingleObject(sem, msec);
625	if (WAIT_OBJECT_0 == rc)
626		return 0;
627	if (WAIT_TIMEOUT == rc) {
628		errno = ETIMEDOUT;
629		return -1;
630	}
631	msyslog(LOG_ERR, "WaitForSingleObject unexpected 0x%x", rc);
632	errno = EFAULT;
633	return -1;
634}
635#else	/* pthreads wait_for_sem() follows */
636{
637	int rc;
638
639	if (NULL == timeout)
640		rc = sem_wait(sem);
641	else
642		rc = sem_timedwait(sem, timeout);
643
644	return rc;
645}
646#endif
647
648
649/*
650 * blocking_thread - thread functions have WINAPI calling convention
651 */
652#ifdef SYS_WINNT
653u_int
654WINAPI
655#else
656void *
657#endif
658blocking_thread(
659	void *	ThreadArg
660	)
661{
662	blocking_child *c;
663
664	c = ThreadArg;
665	exit_worker(blocking_child_common(c));
666
667	/* NOTREACHED */
668	return 0;
669}
670
671
672/*
673 * req_child_exit() runs in the parent.
674 */
675int
676req_child_exit(
677	blocking_child *c
678	)
679{
680	return queue_req_pointer(c, CHILD_EXIT_REQ);
681}
682
683
684/*
685 * cleanup_after_child() runs in parent.
686 */
687static void
688cleanup_after_child(
689	blocking_child *	c
690	)
691{
692	u_int	idx;
693
694	DEBUG_INSIST(!c->reusable);
695#ifdef SYS_WINNT
696	INSIST(CloseHandle(c->thread_ref));
697#else
698	free(c->thread_ref);
699#endif
700	c->thread_ref = NULL;
701	c->thread_id = 0;
702#ifdef WORK_PIPE
703	DEBUG_INSIST(-1 != c->resp_read_pipe);
704	DEBUG_INSIST(-1 != c->resp_write_pipe);
705	(*addremove_io_fd)(c->resp_read_pipe, c->ispipe, TRUE);
706	close(c->resp_write_pipe);
707	close(c->resp_read_pipe);
708	c->resp_write_pipe = -1;
709	c->resp_read_pipe = -1;
710#else
711	DEBUG_INSIST(NULL != c->blocking_response_ready);
712	(*addremove_io_semaphore)(c->blocking_response_ready, TRUE);
713#endif
714	for (idx = 0; idx < c->workitems_alloc; idx++)
715		c->workitems[idx] = NULL;
716	c->next_workitem = 0;
717	c->next_workeritem = 0;
718	for (idx = 0; idx < c->responses_alloc; idx++)
719		c->responses[idx] = NULL;
720	c->next_response = 0;
721	c->next_workresp = 0;
722	c->reusable = TRUE;
723}
724
725
726#else	/* !WORK_THREAD follows */
727char work_thread_nonempty_compilation_unit;
728#endif
729