work_thread.c revision 275970
1/*
2 * work_thread.c - threads implementation for blocking worker child.
3 */
4#include <config.h>
5#include "ntp_workimpl.h"
6
7#ifdef WORK_THREAD
8
9#include <stdio.h>
10#include <ctype.h>
11#include <signal.h>
12#ifndef SYS_WINNT
13#include <pthread.h>
14#endif
15
16#include "ntp_stdlib.h"
17#include "ntp_malloc.h"
18#include "ntp_syslog.h"
19#include "ntpd.h"
20#include "ntp_io.h"
21#include "ntp_assert.h"
22#include "ntp_unixtime.h"
23#include "timespecops.h"
24#include "ntp_worker.h"
25
/*
 * Sentinel pointer values that travel through the workitem and
 * response queues in place of a real blocking_pipe_header.
 * req_child_exit() queues CHILD_EXIT_REQ; the worker answers with
 * CHILD_GONE_RESP, on which the parent runs cleanup_after_child().
 * Both are the same (intptr_t)-1 value and are never dereferenced.
 */
#define CHILD_EXIT_REQ	((blocking_pipe_header *)(intptr_t)-1)
#define CHILD_GONE_RESP	CHILD_EXIT_REQ
/* number of slots added each time the respective queue is grown */
#define WORKITEMS_ALLOC_INC	16
#define RESPONSES_ALLOC_INC	4

/* smallest stack size accepted for a pthreads worker thread */
#ifndef THREAD_MINSTACKSIZE
#define THREAD_MINSTACKSIZE	(64U * 1024)
#endif

/* cast away a volatile qualifier by way of uintptr_t */
#ifndef DEVOLATILE
#define DEVOLATILE(type, var) ((type)(uintptr_t)(volatile void *)(var))
#endif

/* platform primitives: end the calling thread / signal a semaphore */
#ifdef SYS_WINNT
# define thread_exit(c)	_endthreadex(c)
# define tickle_sem	SetEvent
#else
# define thread_exit(c)	pthread_exit((void*)(size_t)(c))
# define tickle_sem	sem_post
#endif

/*
 * Callback used to add or remove the response notification source
 * (a pipe descriptor with WORK_PIPE, otherwise a semaphore).  The
 * pointer is defined here; it is not assigned in this file.
 */
#ifdef WORK_PIPE
addremove_io_fd_func		addremove_io_fd;
#else
addremove_io_semaphore_func	addremove_io_semaphore;
#endif

/* file-local helpers */
static	void	start_blocking_thread(blocking_child *);
static	void	start_blocking_thread_internal(blocking_child *);
static	void	prepare_child_sems(blocking_child *);
static	int	wait_for_sem(sem_ref, struct timespec *);
static	void	ensure_workitems_empty_slot(blocking_child *);
static	void	ensure_workresp_empty_slot(blocking_child *);
static	int	queue_req_pointer(blocking_child *, blocking_pipe_header *);
static	void	cleanup_after_child(blocking_child *);
/* thread entry point; signature follows the platform's thread API */
#ifdef SYS_WINNT
u_int	WINAPI	blocking_thread(void *);
#else
void *		blocking_thread(void *);
#endif
#ifndef SYS_WINNT
static	void	block_thread_signals(sigset_t *);
#endif
69
70
/*
 * exit_worker() - end the calling worker thread, passing exitcode to
 * the platform thread-exit call selected by the thread_exit() macro
 * (_endthreadex() on Windows, pthread_exit() otherwise).
 */
void
exit_worker(
	int	exitcode
	)
{
	thread_exit(exitcode);	/* see #define thread_exit */
}
78
79
80int
81worker_sleep(
82	blocking_child *	c,
83	time_t			seconds
84	)
85{
86	struct timespec	until;
87	int		rc;
88
89# ifdef HAVE_CLOCK_GETTIME
90	if (0 != clock_gettime(CLOCK_REALTIME, &until)) {
91		msyslog(LOG_ERR, "worker_sleep: clock_gettime() failed: %m");
92		return -1;
93	}
94# else
95	if (0 != getclock(TIMEOFDAY, &until)) {
96		msyslog(LOG_ERR, "worker_sleep: getclock() failed: %m");
97		return -1;
98	}
99# endif
100	until.tv_sec += seconds;
101	do {
102		rc = wait_for_sem(c->wake_scheduled_sleep, &until);
103	} while (-1 == rc && EINTR == errno);
104	if (0 == rc)
105		return -1;
106	if (-1 == rc && ETIMEDOUT == errno)
107		return 0;
108	msyslog(LOG_ERR, "worker_sleep: sem_timedwait: %m");
109	return -1;
110}
111
112
113void
114interrupt_worker_sleep(void)
115{
116	u_int			idx;
117	blocking_child *	c;
118
119	for (idx = 0; idx < blocking_children_alloc; idx++) {
120		c = blocking_children[idx];
121		if (NULL == c || NULL == c->wake_scheduled_sleep)
122			continue;
123		tickle_sem(c->wake_scheduled_sleep);
124	}
125}
126
127
/*
 * ensure_workitems_empty_slot() - make sure c->workitems[] exists and
 * grow it by WORKITEMS_ALLOC_INC slots when the slot at
 * c->next_workitem is already occupied.
 *
 * NOTE(review): the added slots are appended at the end of the array.
 * next_workitem is only moved onto them when it is 0; if the ring is
 * full while next_workitem is nonzero it still indexes an occupied
 * slot after growing.  Confirm whether callers can reach that state.
 *
 * NOTE(review): the array is reallocated without any visible locking
 * against the worker thread that reads it - confirm how this is
 * synchronized.
 */
static void
ensure_workitems_empty_slot(
	blocking_child *c
	)
{
	const size_t	each = sizeof(blocking_children[0]->workitems[0]);
	size_t		new_alloc;
	size_t		old_octets;
	size_t		new_octets;
	void *		nonvol_workitems;


	/* nothing to do when the next slot to fill is free */
	if (c->workitems != NULL &&
	    NULL == c->workitems[c->next_workitem])
		return;

	new_alloc = c->workitems_alloc + WORKITEMS_ALLOC_INC;
	old_octets = c->workitems_alloc * each;
	new_octets = new_alloc * each;
	/* strip volatile so the pointer can be handed to the allocator */
	nonvol_workitems = DEVOLATILE(void *, c->workitems);
	/* presumably the octets beyond old_octets come back zeroed - TODO confirm */
	c->workitems = erealloc_zero(nonvol_workitems, new_octets,
				     old_octets);
	/* index 0 covers first allocation and a wrapped, full ring */
	if (0 == c->next_workitem)
		c->next_workitem = c->workitems_alloc;
	c->workitems_alloc = new_alloc;
}
154
155
/*
 * ensure_workresp_empty_slot() - make sure c->responses[] exists and
 * grow it by RESPONSES_ALLOC_INC slots when the slot at
 * c->next_response is already occupied.
 *
 * NOTE(review): the added slots are appended at the end of the array.
 * next_response is only moved onto them when it is 0; if the ring is
 * full while next_response is nonzero it still indexes an occupied
 * slot after growing.  Confirm whether callers can reach that state.
 *
 * NOTE(review): the array is reallocated without any visible locking
 * against the thread on the other side of the queue - confirm how
 * this is synchronized.
 */
static void
ensure_workresp_empty_slot(
	blocking_child *c
	)
{
	const size_t	each = sizeof(blocking_children[0]->responses[0]);
	size_t		new_alloc;
	size_t		old_octets;
	size_t		new_octets;
	void *		nonvol_responses;

	/* nothing to do when the next slot to fill is free */
	if (c->responses != NULL &&
	    NULL == c->responses[c->next_response])
		return;

	new_alloc = c->responses_alloc + RESPONSES_ALLOC_INC;
	old_octets = c->responses_alloc * each;
	new_octets = new_alloc * each;
	/* strip volatile so the pointer can be handed to the allocator */
	nonvol_responses = DEVOLATILE(void *, c->responses);
	/* presumably the octets beyond old_octets come back zeroed - TODO confirm */
	c->responses = erealloc_zero(nonvol_responses, new_octets,
				     old_octets);
	/* index 0 covers first allocation and a wrapped, full ring */
	if (0 == c->next_response)
		c->next_response = c->responses_alloc;
	c->responses_alloc = new_alloc;
}
181
182
183/*
184 * queue_req_pointer() - append a work item or idle exit request to
185 *			 blocking_workitems[].
186 */
187static int
188queue_req_pointer(
189	blocking_child	*	c,
190	blocking_pipe_header *	hdr
191	)
192{
193	c->workitems[c->next_workitem] = hdr;
194	c->next_workitem = (1 + c->next_workitem) % c->workitems_alloc;
195
196	/*
197	 * We only want to signal the wakeup event if the child is
198	 * blocking on it, which is indicated by setting the blocking
199	 * event.  Wait with zero timeout to test.
200	 */
201	/* !!!! if (WAIT_OBJECT_0 == WaitForSingleObject(c->child_is_blocking, 0)) */
202		tickle_sem(c->blocking_req_ready);
203
204	return 0;
205}
206
207
208int
209send_blocking_req_internal(
210	blocking_child *	c,
211	blocking_pipe_header *	hdr,
212	void *			data
213	)
214{
215	blocking_pipe_header *	threadcopy;
216	size_t			payload_octets;
217
218	REQUIRE(hdr != NULL);
219	REQUIRE(data != NULL);
220	DEBUG_REQUIRE(BLOCKING_REQ_MAGIC == hdr->magic_sig);
221
222	if (hdr->octets <= sizeof(*hdr))
223		return 1;	/* failure */
224	payload_octets = hdr->octets - sizeof(*hdr);
225
226	ensure_workitems_empty_slot(c);
227	if (NULL == c->thread_ref) {
228		ensure_workresp_empty_slot(c);
229		start_blocking_thread(c);
230	}
231
232	threadcopy = emalloc(hdr->octets);
233	memcpy(threadcopy, hdr, sizeof(*hdr));
234	memcpy((char *)threadcopy + sizeof(*hdr), data, payload_octets);
235
236	return queue_req_pointer(c, threadcopy);
237}
238
239
240blocking_pipe_header *
241receive_blocking_req_internal(
242	blocking_child *	c
243	)
244{
245	blocking_pipe_header *	req;
246	int			rc;
247
248	/*
249	 * Child blocks here when idle.  SysV semaphores maintain a
250	 * count and release from sem_wait() only when it reaches 0.
251	 * Windows auto-reset events are simpler, and multiple SetEvent
252	 * calls before any thread waits result in a single wakeup.
253	 * On Windows, the child drains all workitems each wakeup, while
254	 * with SysV semaphores wait_sem() is used before each item.
255	 */
256#ifdef SYS_WINNT
257	while (NULL == c->workitems[c->next_workeritem]) {
258		/* !!!! SetEvent(c->child_is_blocking); */
259		rc = wait_for_sem(c->blocking_req_ready, NULL);
260		INSIST(0 == rc);
261		/* !!!! ResetEvent(c->child_is_blocking); */
262	}
263#else
264	do {
265		rc = wait_for_sem(c->blocking_req_ready, NULL);
266	} while (-1 == rc && EINTR == errno);
267	INSIST(0 == rc);
268#endif
269
270	req = c->workitems[c->next_workeritem];
271	INSIST(NULL != req);
272	c->workitems[c->next_workeritem] = NULL;
273	c->next_workeritem = (1 + c->next_workeritem) %
274				c->workitems_alloc;
275
276	if (CHILD_EXIT_REQ == req) {	/* idled out */
277		send_blocking_resp_internal(c, CHILD_GONE_RESP);
278		req = NULL;
279	}
280
281	return req;
282}
283
284
285int
286send_blocking_resp_internal(
287	blocking_child *	c,
288	blocking_pipe_header *	resp
289	)
290{
291	ensure_workresp_empty_slot(c);
292
293	c->responses[c->next_response] = resp;
294	c->next_response = (1 + c->next_response) % c->responses_alloc;
295
296#ifdef WORK_PIPE
297	write(c->resp_write_pipe, "", 1);
298#else
299	tickle_sem(c->blocking_response_ready);
300#endif
301
302	return 0;
303}
304
305
306#ifndef WORK_PIPE
307void
308handle_blocking_resp_sem(
309	void *	context
310	)
311{
312	HANDLE			ready;
313	blocking_child *	c;
314	u_int			idx;
315
316	ready = (HANDLE)context;
317	c = NULL;
318	for (idx = 0; idx < blocking_children_alloc; idx++) {
319		c = blocking_children[idx];
320		if (c != NULL && c->thread_ref != NULL &&
321		    ready == c->blocking_response_ready)
322			break;
323	}
324	if (idx < blocking_children_alloc)
325		process_blocking_resp(c);
326}
327#endif	/* !WORK_PIPE */
328
329
330blocking_pipe_header *
331receive_blocking_resp_internal(
332	blocking_child *	c
333	)
334{
335	blocking_pipe_header *	removed;
336#ifdef WORK_PIPE
337	int			rc;
338	char			scratch[32];
339
340	do {
341		rc = read(c->resp_read_pipe, scratch, sizeof(scratch));
342	} while (-1 == rc && EINTR == errno);
343#endif
344	removed = c->responses[c->next_workresp];
345	if (NULL != removed) {
346		c->responses[c->next_workresp] = NULL;
347		c->next_workresp = (1 + c->next_workresp) %
348				   c->responses_alloc;
349		DEBUG_ENSURE(CHILD_GONE_RESP == removed ||
350			     BLOCKING_RESP_MAGIC == removed->magic_sig);
351	}
352	if (CHILD_GONE_RESP == removed) {
353		cleanup_after_child(c);
354		removed = NULL;
355	}
356
357	return removed;
358}
359
360
/*
 * start_blocking_thread() - set up the semaphores for c and then
 * create its worker thread.
 */
static void
start_blocking_thread(
	blocking_child *	c
	)
{

	/* must not be marked reusable at this point */
	DEBUG_INSIST(!c->reusable);

	/* the semaphores have to exist before the thread can wait on them */
	prepare_child_sems(c);
	start_blocking_thread_internal(c);
}
372
373
374static void
375start_blocking_thread_internal(
376	blocking_child *	c
377	)
378#ifdef SYS_WINNT
379{
380	thr_ref	blocking_child_thread;
381	u_int	blocking_thread_id;
382	BOOL	resumed;
383
384	(*addremove_io_semaphore)(c->blocking_response_ready, FALSE);
385	blocking_child_thread =
386		(HANDLE)_beginthreadex(
387			NULL,
388			0,
389			&blocking_thread,
390			c,
391			CREATE_SUSPENDED,
392			&blocking_thread_id);
393
394	if (NULL == blocking_child_thread) {
395		msyslog(LOG_ERR, "start blocking thread failed: %m");
396		exit(-1);
397	}
398	c->thread_id = blocking_thread_id;
399	c->thread_ref = blocking_child_thread;
400	/* remember the thread priority is only within the process class */
401	if (!SetThreadPriority(blocking_child_thread,
402			       THREAD_PRIORITY_BELOW_NORMAL))
403		msyslog(LOG_ERR, "Error lowering blocking thread priority: %m");
404
405	resumed = ResumeThread(blocking_child_thread);
406	DEBUG_INSIST(resumed);
407}
408#else	/* pthreads start_blocking_thread_internal() follows */
409{
410# ifdef NEED_PTHREAD_INIT
411	static int	pthread_init_called;
412# endif
413	pthread_attr_t	thr_attr;
414	int		rc;
415	int		saved_errno;
416	int		pipe_ends[2];	/* read then write */
417	int		is_pipe;
418	int		flags;
419	size_t		stacksize;
420	sigset_t	saved_sig_mask;
421
422# ifdef NEED_PTHREAD_INIT
423	/*
424	 * from lib/isc/unix/app.c:
425	 * BSDI 3.1 seg faults in pthread_sigmask() if we don't do this.
426	 */
427	if (!pthread_init_called) {
428		pthread_init();
429		pthread_init_called = TRUE;
430	}
431# endif
432
433	rc = pipe_socketpair(&pipe_ends[0], &is_pipe);
434	if (0 != rc) {
435		msyslog(LOG_ERR, "start_blocking_thread: pipe_socketpair() %m");
436		exit(1);
437	}
438	c->resp_read_pipe = move_fd(pipe_ends[0]);
439	c->resp_write_pipe = move_fd(pipe_ends[1]);
440	c->ispipe = is_pipe;
441	flags = fcntl(c->resp_read_pipe, F_GETFL, 0);
442	if (-1 == flags) {
443		msyslog(LOG_ERR, "start_blocking_thread: fcntl(F_GETFL) %m");
444		exit(1);
445	}
446	rc = fcntl(c->resp_read_pipe, F_SETFL, O_NONBLOCK | flags);
447	if (-1 == rc) {
448		msyslog(LOG_ERR,
449			"start_blocking_thread: fcntl(F_SETFL, O_NONBLOCK) %m");
450		exit(1);
451	}
452	(*addremove_io_fd)(c->resp_read_pipe, c->ispipe, FALSE);
453	pthread_attr_init(&thr_attr);
454	pthread_attr_setdetachstate(&thr_attr, PTHREAD_CREATE_DETACHED);
455#if defined(HAVE_PTHREAD_ATTR_GETSTACKSIZE) && \
456    defined(HAVE_PTHREAD_ATTR_SETSTACKSIZE)
457	rc = pthread_attr_getstacksize(&thr_attr, &stacksize);
458	if (-1 == rc) {
459		msyslog(LOG_ERR,
460			"start_blocking_thread: pthread_attr_getstacksize %m");
461	} else if (stacksize < THREAD_MINSTACKSIZE) {
462		rc = pthread_attr_setstacksize(&thr_attr,
463					       THREAD_MINSTACKSIZE);
464		if (-1 == rc)
465			msyslog(LOG_ERR,
466				"start_blocking_thread: pthread_attr_setstacksize(0x%lx -> 0x%lx) %m",
467				(u_long)stacksize,
468				(u_long)THREAD_MINSTACKSIZE);
469	}
470#else
471	UNUSED_ARG(stacksize);
472#endif
473#if defined(PTHREAD_SCOPE_SYSTEM) && defined(NEED_PTHREAD_SCOPE_SYSTEM)
474	pthread_attr_setscope(&thr_attr, PTHREAD_SCOPE_SYSTEM);
475#endif
476	c->thread_ref = emalloc_zero(sizeof(*c->thread_ref));
477	block_thread_signals(&saved_sig_mask);
478	rc = pthread_create(c->thread_ref, &thr_attr,
479			    &blocking_thread, c);
480	saved_errno = errno;
481	pthread_sigmask(SIG_SETMASK, &saved_sig_mask, NULL);
482	pthread_attr_destroy(&thr_attr);
483	if (0 != rc) {
484		errno = saved_errno;
485		msyslog(LOG_ERR, "pthread_create() blocking child: %m");
486		exit(1);
487	}
488}
489#endif
490
491
492/*
493 * block_thread_signals()
494 *
495 * Temporarily block signals used by ntpd main thread, so that signal
496 * mask inherited by child threads leaves them blocked.  Returns prior
497 * active signal mask via pmask, to be restored by the main thread
498 * after pthread_create().
499 */
500#ifndef SYS_WINNT
501void
502block_thread_signals(
503	sigset_t *	pmask
504	)
505{
506	sigset_t	block;
507
508	sigemptyset(&block);
509# ifdef HAVE_SIGNALED_IO
510#  ifdef SIGIO
511	sigaddset(&block, SIGIO);
512#  endif
513#  ifdef SIGPOLL
514	sigaddset(&block, SIGPOLL);
515#  endif
516# endif	/* HAVE_SIGNALED_IO */
517	sigaddset(&block, SIGALRM);
518	sigaddset(&block, MOREDEBUGSIG);
519	sigaddset(&block, LESSDEBUGSIG);
520# ifdef SIGDIE1
521	sigaddset(&block, SIGDIE1);
522# endif
523# ifdef SIGDIE2
524	sigaddset(&block, SIGDIE2);
525# endif
526# ifdef SIGDIE3
527	sigaddset(&block, SIGDIE3);
528# endif
529# ifdef SIGDIE4
530	sigaddset(&block, SIGDIE4);
531# endif
532# ifdef SIGBUS
533	sigaddset(&block, SIGBUS);
534# endif
535	sigemptyset(pmask);
536	pthread_sigmask(SIG_BLOCK, &block, pmask);
537}
538#endif	/* !SYS_WINNT */
539
540
541/*
542 * prepare_child_sems()
543 *
544 * create sync events (semaphores)
545 * child_is_blocking initially unset
546 * blocking_req_ready initially unset
547 *
548 * Child waits for blocking_req_ready to be set after
549 * setting child_is_blocking.  blocking_req_ready and
550 * blocking_response_ready are auto-reset, so wake one
551 * waiter and become unset (unsignalled) in one operation.
552 */
553static void
554prepare_child_sems(
555	blocking_child *c
556	)
557#ifdef SYS_WINNT
558{
559	if (NULL == c->blocking_req_ready) {
560		/* manual reset using ResetEvent() */
561		/* !!!! c->child_is_blocking = CreateEvent(NULL, TRUE, FALSE, NULL); */
562		/* auto reset - one thread released from wait each set */
563		c->blocking_req_ready = CreateEvent(NULL, FALSE, FALSE, NULL);
564		c->blocking_response_ready = CreateEvent(NULL, FALSE, FALSE, NULL);
565		c->wake_scheduled_sleep = CreateEvent(NULL, FALSE, FALSE, NULL);
566	} else {
567		/* !!!! ResetEvent(c->child_is_blocking); */
568		/* ResetEvent(c->blocking_req_ready); */
569		/* ResetEvent(c->blocking_response_ready); */
570		/* ResetEvent(c->wake_scheduled_sleep); */
571	}
572}
573#else	/* pthreads prepare_child_sems() follows */
574{
575	size_t	octets;
576
577	if (NULL == c->blocking_req_ready) {
578		octets = sizeof(*c->blocking_req_ready);
579		octets += sizeof(*c->wake_scheduled_sleep);
580		/* !!!! octets += sizeof(*c->child_is_blocking); */
581		c->blocking_req_ready = emalloc_zero(octets);;
582		c->wake_scheduled_sleep = 1 + c->blocking_req_ready;
583		/* !!!! c->child_is_blocking = 1 + c->wake_scheduled_sleep; */
584	} else {
585		sem_destroy(c->blocking_req_ready);
586		sem_destroy(c->wake_scheduled_sleep);
587		/* !!!! sem_destroy(c->child_is_blocking); */
588	}
589	sem_init(c->blocking_req_ready, FALSE, 0);
590	sem_init(c->wake_scheduled_sleep, FALSE, 0);
591	/* !!!! sem_init(c->child_is_blocking, FALSE, 0); */
592}
593#endif
594
595
596static int
597wait_for_sem(
598	sem_ref			sem,
599	struct timespec *	timeout		/* wall-clock */
600	)
601#ifdef SYS_WINNT
602{
603	struct timespec now;
604	struct timespec delta;
605	DWORD		msec;
606	DWORD		rc;
607
608	if (NULL == timeout) {
609		msec = INFINITE;
610	} else {
611		getclock(TIMEOFDAY, &now);
612		delta = sub_tspec(*timeout, now);
613		if (delta.tv_sec < 0) {
614			msec = 0;
615		} else if ((delta.tv_sec + 1) >= (MAXDWORD / 1000)) {
616			msec = INFINITE;
617		} else {
618			msec = 1000 * (DWORD)delta.tv_sec;
619			msec += delta.tv_nsec / (1000 * 1000);
620		}
621	}
622	rc = WaitForSingleObject(sem, msec);
623	if (WAIT_OBJECT_0 == rc)
624		return 0;
625	if (WAIT_TIMEOUT == rc) {
626		errno = ETIMEDOUT;
627		return -1;
628	}
629	msyslog(LOG_ERR, "WaitForSingleObject unexpected 0x%x", rc);
630	errno = EFAULT;
631	return -1;
632}
633#else	/* pthreads wait_for_sem() follows */
634{
635	int rc;
636
637	if (NULL == timeout)
638		rc = sem_wait(sem);
639	else
640		rc = sem_timedwait(sem, timeout);
641
642	return rc;
643}
644#endif
645
646
/*
 * blocking_thread - entry point of the worker thread.  ThreadArg is
 * the blocking_child the thread serves.  The thread runs
 * blocking_child_common() and exits with its result.
 *
 * Thread functions have the WINAPI calling convention on Windows.
 */
#ifdef SYS_WINNT
u_int
WINAPI
#else
void *
#endif
blocking_thread(
	void *	ThreadArg
	)
{
	exit_worker(blocking_child_common(ThreadArg));

	/* NOTREACHED */
	return 0;
}
668
669
/*
 * req_child_exit() runs in the parent.
 *
 * Queues the CHILD_EXIT_REQ sentinel as a work item for c and returns
 * the result of queue_req_pointer().
 */
int
req_child_exit(
	blocking_child *c
	)
{
	return queue_req_pointer(c, CHILD_EXIT_REQ);
}
680
681
682/*
683 * cleanup_after_child() runs in parent.
684 */
685static void
686cleanup_after_child(
687	blocking_child *	c
688	)
689{
690	u_int	idx;
691
692	DEBUG_INSIST(!c->reusable);
693#ifdef SYS_WINNT
694	INSIST(CloseHandle(c->thread_ref));
695#else
696	free(c->thread_ref);
697#endif
698	c->thread_ref = NULL;
699	c->thread_id = 0;
700#ifdef WORK_PIPE
701	DEBUG_INSIST(-1 != c->resp_read_pipe);
702	DEBUG_INSIST(-1 != c->resp_write_pipe);
703	(*addremove_io_fd)(c->resp_read_pipe, c->ispipe, TRUE);
704	close(c->resp_write_pipe);
705	close(c->resp_read_pipe);
706	c->resp_write_pipe = -1;
707	c->resp_read_pipe = -1;
708#else
709	DEBUG_INSIST(NULL != c->blocking_response_ready);
710	(*addremove_io_semaphore)(c->blocking_response_ready, TRUE);
711#endif
712	for (idx = 0; idx < c->workitems_alloc; idx++)
713		c->workitems[idx] = NULL;
714	c->next_workitem = 0;
715	c->next_workeritem = 0;
716	for (idx = 0; idx < c->responses_alloc; idx++)
717		c->responses[idx] = NULL;
718	c->next_response = 0;
719	c->next_workresp = 0;
720	c->reusable = TRUE;
721}
722
723
724#else	/* !WORK_THREAD follows */
725char work_thread_nonempty_compilation_unit;
726#endif
727