1275970Scy/*
2275970Scy * ntp_worker.c
3275970Scy */
4275970Scy#include <config.h>
5275970Scy#include "ntp_workimpl.h"
6275970Scy
7275970Scy#ifdef WORKER
8275970Scy
9275970Scy#include <stdio.h>
10275970Scy#include <ctype.h>
11275970Scy#include <signal.h>
12275970Scy
13275970Scy#include "iosignal.h"
14275970Scy#include "ntp_stdlib.h"
15275970Scy#include "ntp_malloc.h"
16275970Scy#include "ntp_syslog.h"
17275970Scy#include "ntpd.h"
18275970Scy#include "ntp_io.h"
19275970Scy#include "ntp_assert.h"
20275970Scy#include "ntp_unixtime.h"
21275970Scy#include "intreswork.h"
22275970Scy
23275970Scy
24275970Scy#define CHILD_MAX_IDLE	(3 * 60)	/* seconds, idle worker limit */
25275970Scy
26275970Scyblocking_child **	blocking_children;
27275970Scysize_t			blocking_children_alloc;
28275970Scyint			worker_per_query;	/* boolean */
29275970Scyint			intres_req_pending;
30294554Sdelphijvolatile u_int		blocking_child_ready_seen;
31294554Sdelphijvolatile u_int		blocking_child_ready_done;
32275970Scy
33275970Scy
34275970Scy#ifndef HAVE_IO_COMPLETION_PORT
35275970Scy/*
36275970Scy * pipe_socketpair()
37275970Scy *
38275970Scy * Provides an AF_UNIX socketpair on systems which have them, otherwise
39275970Scy * pair of unidirectional pipes.
40275970Scy */
41275970Scyint
42275970Scypipe_socketpair(
43275970Scy	int	caller_fds[2],
44275970Scy	int *	is_pipe
45275970Scy	)
46275970Scy{
47275970Scy	int	rc;
48275970Scy	int	fds[2];
49275970Scy	int	called_pipe;
50275970Scy
51275970Scy#ifdef HAVE_SOCKETPAIR
52275970Scy	rc = socketpair(AF_UNIX, SOCK_STREAM, 0, &fds[0]);
53275970Scy#else
54275970Scy	rc = -1;
55275970Scy#endif
56275970Scy
57275970Scy	if (-1 == rc) {
58275970Scy		rc = pipe(&fds[0]);
59275970Scy		called_pipe = TRUE;
60275970Scy	} else {
61275970Scy		called_pipe = FALSE;
62275970Scy	}
63275970Scy
64275970Scy	if (-1 == rc)
65275970Scy		return rc;
66275970Scy
67275970Scy	caller_fds[0] = fds[0];
68275970Scy	caller_fds[1] = fds[1];
69275970Scy	if (is_pipe != NULL)
70275970Scy		*is_pipe = called_pipe;
71275970Scy
72275970Scy	return 0;
73275970Scy}
74275970Scy
75275970Scy
/*
 * close_all_except()
 *
 * Close every file descriptor other than keep_fd.  Descriptors below
 * keep_fd are closed one at a time here; those above it are handed to
 * close_all_beyond().
 */
void
close_all_except(
	int keep_fd
	)
{
	int victim = 0;

	while (victim < keep_fd) {
		close(victim);
		victim++;
	}

	close_all_beyond(keep_fd);
}
93275970Scy
94275970Scy
/*
 * close_all_beyond()
 *
 * Close every file descriptor numbered above keep_fd, which is the
 * highest descriptor the caller wants to keep open.  Uses closefrom()
 * when available, otherwise fcntl(F_CLOSEM) when that is defined, and
 * otherwise close() on each descriptor below GETDTABLESIZE().
 */
void
close_all_beyond(
	int keep_fd
	)
{
	const int	first_fd = keep_fd + 1;

# if defined(HAVE_CLOSEFROM)
	closefrom(first_fd);
# elif defined(F_CLOSEM)
	/*
	 * A single fcntl() instead of one close() per descriptor.
	 * From 'Writing Reliable AIX Daemons,' SG24-4946-00,
	 * by Eric Agar.
	 */
	if (-1 == fcntl(first_fd, F_CLOSEM, 0))
		msyslog(LOG_ERR, "F_CLOSEM(%d): %m", first_fd);
# else	/* neither closefrom() nor F_CLOSEM: close them one by one */
	const int	table_size = GETDTABLESIZE();
	int		fd = first_fd;

	while (fd < table_size) {
		close(fd);
		fd++;
	}
# endif	/* !HAVE_CLOSEFROM && !F_CLOSEM */
}
125275970Scy#endif	/* HAVE_IO_COMPLETION_PORT */
126275970Scy
127275970Scy
128275970Scyu_int
129275970Scyavailable_blocking_child_slot(void)
130275970Scy{
131275970Scy	const size_t	each = sizeof(blocking_children[0]);
132275970Scy	u_int		slot;
133275970Scy	size_t		prev_alloc;
134275970Scy	size_t		new_alloc;
135275970Scy	size_t		prev_octets;
136275970Scy	size_t		octets;
137275970Scy
138275970Scy	for (slot = 0; slot < blocking_children_alloc; slot++) {
139275970Scy		if (NULL == blocking_children[slot])
140275970Scy			return slot;
141275970Scy		if (blocking_children[slot]->reusable) {
142275970Scy			blocking_children[slot]->reusable = FALSE;
143275970Scy			return slot;
144275970Scy		}
145275970Scy	}
146275970Scy
147275970Scy	prev_alloc = blocking_children_alloc;
148275970Scy	prev_octets = prev_alloc * each;
149275970Scy	new_alloc = blocking_children_alloc + 4;
150275970Scy	octets = new_alloc * each;
151275970Scy	blocking_children = erealloc_zero(blocking_children, octets,
152275970Scy					  prev_octets);
153275970Scy	blocking_children_alloc = new_alloc;
154275970Scy
155293423Sdelphij	/* assume we'll never have enough workers to overflow u_int */
156293423Sdelphij	return (u_int)prev_alloc;
157275970Scy}
158275970Scy
159275970Scy
160275970Scyint
161275970Scyqueue_blocking_request(
162275970Scy	blocking_work_req	rtype,
163275970Scy	void *			req,
164275970Scy	size_t			reqsize,
165275970Scy	blocking_work_callback	done_func,
166275970Scy	void *			context
167275970Scy	)
168275970Scy{
169275970Scy	static u_int		intres_slot = UINT_MAX;
170275970Scy	u_int			child_slot;
171275970Scy	blocking_child *	c;
172275970Scy	blocking_pipe_header	req_hdr;
173275970Scy
174275970Scy	req_hdr.octets = sizeof(req_hdr) + reqsize;
175275970Scy	req_hdr.magic_sig = BLOCKING_REQ_MAGIC;
176275970Scy	req_hdr.rtype = rtype;
177275970Scy	req_hdr.done_func = done_func;
178275970Scy	req_hdr.context = context;
179275970Scy
180275970Scy	child_slot = UINT_MAX;
181275970Scy	if (worker_per_query || UINT_MAX == intres_slot ||
182275970Scy	    blocking_children[intres_slot]->reusable)
183275970Scy		child_slot = available_blocking_child_slot();
184275970Scy	if (!worker_per_query) {
185275970Scy		if (UINT_MAX == intres_slot)
186275970Scy			intres_slot = child_slot;
187275970Scy		else
188275970Scy			child_slot = intres_slot;
189275970Scy		if (0 == intres_req_pending)
190275970Scy			intres_timeout_req(0);
191275970Scy	}
192275970Scy	intres_req_pending++;
193275970Scy	INSIST(UINT_MAX != child_slot);
194275970Scy	c = blocking_children[child_slot];
195275970Scy	if (NULL == c) {
196275970Scy		c = emalloc_zero(sizeof(*c));
197275970Scy#ifdef WORK_FORK
198275970Scy		c->req_read_pipe = -1;
199275970Scy		c->req_write_pipe = -1;
200275970Scy#endif
201275970Scy#ifdef WORK_PIPE
202275970Scy		c->resp_read_pipe = -1;
203275970Scy		c->resp_write_pipe = -1;
204275970Scy#endif
205275970Scy		blocking_children[child_slot] = c;
206275970Scy	}
207275970Scy	req_hdr.child_idx = child_slot;
208275970Scy
209275970Scy	return send_blocking_req_internal(c, &req_hdr, req);
210275970Scy}
211275970Scy
212275970Scy
213275970Scyint queue_blocking_response(
214275970Scy	blocking_child *		c,
215275970Scy	blocking_pipe_header *		resp,
216275970Scy	size_t				respsize,
217275970Scy	const blocking_pipe_header *	req
218275970Scy	)
219275970Scy{
220275970Scy	resp->octets = respsize;
221275970Scy	resp->magic_sig = BLOCKING_RESP_MAGIC;
222275970Scy	resp->rtype = req->rtype;
223275970Scy	resp->context = req->context;
224275970Scy	resp->done_func = req->done_func;
225275970Scy
226275970Scy	return send_blocking_resp_internal(c, resp);
227275970Scy}
228275970Scy
229275970Scy
230275970Scyvoid
231275970Scyprocess_blocking_resp(
232275970Scy	blocking_child *	c
233275970Scy	)
234275970Scy{
235275970Scy	blocking_pipe_header *	resp;
236275970Scy	void *			data;
237275970Scy
238275970Scy	/*
239275970Scy	 * On Windows send_blocking_resp_internal() may signal the
240275970Scy	 * blocking_response_ready event multiple times while we're
241275970Scy	 * processing a response, so always consume all available
242275970Scy	 * responses before returning to test the event again.
243275970Scy	 */
244275970Scy#ifdef WORK_THREAD
245275970Scy	do {
246275970Scy#endif
247275970Scy		resp = receive_blocking_resp_internal(c);
248275970Scy		if (NULL != resp) {
249275970Scy			DEBUG_REQUIRE(BLOCKING_RESP_MAGIC ==
250275970Scy				      resp->magic_sig);
251275970Scy			data = (char *)resp + sizeof(*resp);
252275970Scy			intres_req_pending--;
253275970Scy			(*resp->done_func)(resp->rtype, resp->context,
254275970Scy					   resp->octets - sizeof(*resp),
255275970Scy					   data);
256275970Scy			free(resp);
257275970Scy		}
258275970Scy#ifdef WORK_THREAD
259275970Scy	} while (NULL != resp);
260275970Scy#endif
261275970Scy	if (!worker_per_query && 0 == intres_req_pending)
262275970Scy		intres_timeout_req(CHILD_MAX_IDLE);
263275970Scy	else if (worker_per_query)
264275970Scy		req_child_exit(c);
265275970Scy}
266275970Scy
267294554Sdelphijvoid
268294554Sdelphijharvest_blocking_responses(void)
269294554Sdelphij{
270298695Sdelphij	size_t		idx;
271294554Sdelphij	blocking_child*	cp;
272294554Sdelphij	u_int		scseen, scdone;
273275970Scy
274294554Sdelphij	scseen = blocking_child_ready_seen;
275294554Sdelphij	scdone = blocking_child_ready_done;
276294554Sdelphij	if (scdone != scseen) {
277294554Sdelphij		blocking_child_ready_done = scseen;
278294554Sdelphij		for (idx = 0; idx < blocking_children_alloc; idx++) {
279294554Sdelphij			cp = blocking_children[idx];
280294554Sdelphij			if (NULL == cp)
281294554Sdelphij				continue;
282294554Sdelphij			scseen = cp->resp_ready_seen;
283294554Sdelphij			scdone = cp->resp_ready_done;
284294554Sdelphij			if (scdone != scseen) {
285294554Sdelphij				cp->resp_ready_done = scseen;
286294554Sdelphij				process_blocking_resp(cp);
287294554Sdelphij			}
288294554Sdelphij		}
289294554Sdelphij	}
290294554Sdelphij}
291294554Sdelphij
292294554Sdelphij
293275970Scy/*
294275970Scy * blocking_child_common runs as a forked child or a thread
295275970Scy */
296275970Scyint
297275970Scyblocking_child_common(
298275970Scy	blocking_child	*c
299275970Scy	)
300275970Scy{
301275970Scy	int say_bye;
302275970Scy	blocking_pipe_header *req;
303275970Scy
304275970Scy	say_bye = FALSE;
305275970Scy	while (!say_bye) {
306275970Scy		req = receive_blocking_req_internal(c);
307275970Scy		if (NULL == req) {
308275970Scy			say_bye = TRUE;
309289764Sglebius			continue;
310275970Scy		}
311275970Scy
312275970Scy		DEBUG_REQUIRE(BLOCKING_REQ_MAGIC == req->magic_sig);
313275970Scy
314275970Scy		switch (req->rtype) {
315275970Scy		case BLOCKING_GETADDRINFO:
316275970Scy			if (blocking_getaddrinfo(c, req))
317275970Scy				say_bye = TRUE;
318275970Scy			break;
319275970Scy
320275970Scy		case BLOCKING_GETNAMEINFO:
321275970Scy			if (blocking_getnameinfo(c, req))
322275970Scy				say_bye = TRUE;
323275970Scy			break;
324275970Scy
325275970Scy		default:
326275970Scy			msyslog(LOG_ERR, "unknown req %d to blocking worker", req->rtype);
327275970Scy			say_bye = TRUE;
328275970Scy		}
329275970Scy
330275970Scy		free(req);
331275970Scy	}
332275970Scy
333275970Scy	return 0;
334275970Scy}
335275970Scy
336275970Scy
337275970Scy/*
338275970Scy * worker_idle_timer_fired()
339275970Scy *
340275970Scy * The parent starts this timer when the last pending response has been
341275970Scy * received from the child, making it idle, and clears the timer when a
342275970Scy * request is dispatched to the child.  Once the timer expires, the
343275970Scy * child is sent packing.
344275970Scy *
345275970Scy * This is called when worker_idle_timer is nonzero and less than or
346275970Scy * equal to current_time.
347275970Scy */
348275970Scyvoid
349275970Scyworker_idle_timer_fired(void)
350275970Scy{
351275970Scy	u_int			idx;
352275970Scy	blocking_child *	c;
353275970Scy
354275970Scy	DEBUG_REQUIRE(0 == intres_req_pending);
355275970Scy
356275970Scy	intres_timeout_req(0);
357275970Scy	for (idx = 0; idx < blocking_children_alloc; idx++) {
358275970Scy		c = blocking_children[idx];
359275970Scy		if (NULL == c)
360275970Scy			continue;
361275970Scy		req_child_exit(c);
362275970Scy	}
363275970Scy}
364275970Scy
365275970Scy
366275970Scy#else	/* !WORKER follows */
/*
 * Without WORKER this file would otherwise hold no external
 * declaration, and ISO C does not permit an empty translation unit.
 */
int ntp_worker_nonempty_compilation_unit;
368275970Scy#endif
369