/*
 * ntp_worker.c
 */
#include <config.h>
#include "ntp_workimpl.h"

#ifdef WORKER

#include <stdio.h>
#include <ctype.h>
#include <signal.h>

#include "iosignal.h"
#include "ntp_stdlib.h"
#include "ntp_malloc.h"
#include "ntp_syslog.h"
#include "ntpd.h"
#include "ntp_io.h"
#include "ntp_assert.h"
#include "ntp_unixtime.h"
#include "intreswork.h"


#define CHILD_MAX_IDLE	(3 * 60)	/* seconds, idle worker limit */

blocking_child **	blocking_children;
size_t			blocking_children_alloc;
int			worker_per_query;	/* boolean */
int			intres_req_pending;


#ifndef HAVE_IO_COMPLETION_PORT
/*
 * pipe_socketpair()
 *
 * Provides an AF_UNIX socketpair on systems which have them, otherwise
 * a unidirectional pipe.  *is_pipe is set nonzero when the pipe
 * fallback is used, so the caller knows the channel is one-way only.
 */
int
pipe_socketpair(
	int	caller_fds[2],
	int *	is_pipe
	)
{
	int	rc;
	int	fds[2];
	int	called_pipe;

#ifdef HAVE_SOCKETPAIR
	rc = socketpair(AF_UNIX, SOCK_STREAM, 0, &fds[0]);
#else
	rc = -1;
#endif

	if (-1 == rc) {
		rc = pipe(&fds[0]);
		called_pipe = TRUE;
	} else {
		called_pipe = FALSE;
	}

	if (-1 == rc)
		return rc;

	caller_fds[0] = fds[0];
	caller_fds[1] = fds[1];
	if (is_pipe != NULL)
		*is_pipe = called_pipe;

	return 0;
}
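
/*
 * Example use (an illustrative sketch, not a caller in this file):
 *
 *	int	fds[2];
 *	int	is_pipe;
 *
 *	if (pipe_socketpair(fds, &is_pipe))
 *		msyslog(LOG_ERR, "pipe_socketpair: %m");
 *	else if (is_pipe)
 *		... only fds[0] is readable, only fds[1] writable ...
 */
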
/*
 * close_all_except()
 *
 * Close all file descriptors except the given keep_fd.
 */
void
close_all_except(
	int keep_fd
	)
{
	int fd;

	for (fd = 0; fd < keep_fd; fd++)
		close(fd);

	close_all_beyond(keep_fd);
}
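
/*
 * Illustrative sketch (hypothetical caller, not from this file): a
 * freshly forked worker that should keep only its request pipe open
 * could use
 *
 *	close_all_except(c->req_read_pipe);
 */
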
/*
 * close_all_beyond()
 *
 * Close all file descriptors after the given keep_fd, which is the
 * highest fd to keep open.
 */
void
close_all_beyond(
	int keep_fd
	)
{
# ifdef HAVE_CLOSEFROM
	closefrom(keep_fd + 1);
# elif defined(F_CLOSEM)
	/*
	 * From 'Writing Reliable AIX Daemons,' SG24-4946-00,
	 * by Eric Agar (saves us from doing 32767 system
	 * calls)
	 */
	if (fcntl(keep_fd + 1, F_CLOSEM, 0) == -1)
		msyslog(LOG_ERR, "F_CLOSEM(%d): %m", keep_fd + 1);
# else	/* !HAVE_CLOSEFROM && !F_CLOSEM follows */
	int fd;
	int max_fd;

	max_fd = GETDTABLESIZE();
	for (fd = keep_fd + 1; fd < max_fd; fd++)
		close(fd);
# endif	/* !HAVE_CLOSEFROM && !F_CLOSEM */
}
#endif	/* HAVE_IO_COMPLETION_PORT */
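

/*
 * available_blocking_child_slot()
 *
 * Returns the index of the first unused or reusable entry in
 * blocking_children, growing the array by four slots when all
 * existing entries are in use.
 */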
u_int
available_blocking_child_slot(void)
{
	const size_t	each = sizeof(blocking_children[0]);
	u_int		slot;
	size_t		prev_alloc;
	size_t		new_alloc;
	size_t		prev_octets;
	size_t		octets;

	for (slot = 0; slot < blocking_children_alloc; slot++) {
		if (NULL == blocking_children[slot])
			return slot;
		if (blocking_children[slot]->reusable) {
			blocking_children[slot]->reusable = FALSE;
			return slot;
		}
	}

	prev_alloc = blocking_children_alloc;
	prev_octets = prev_alloc * each;
	new_alloc = blocking_children_alloc + 4;
	octets = new_alloc * each;
	blocking_children = erealloc_zero(blocking_children, octets,
					  prev_octets);
	blocking_children_alloc = new_alloc;

	return prev_alloc;
}
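

/*
 * queue_blocking_request()
 *
 * Builds a request header and hands the request to a worker child,
 * allocating the child's slot on first use.  With worker_per_query
 * set, each request gets its own worker; otherwise all requests share
 * a single resolver child whose slot is remembered in intres_slot.
 *
 * Illustrative call (the request buffer, its size, and the callback
 * are hypothetical):
 *
 *	rc = queue_blocking_request(BLOCKING_GETADDRINFO, gai_req,
 *				    gai_req_size, &gai_done, context);
 */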
int
queue_blocking_request(
	blocking_work_req	rtype,
	void *			req,
	size_t			reqsize,
	blocking_work_callback	done_func,
	void *			context
	)
{
	static u_int		intres_slot = UINT_MAX;
	u_int			child_slot;
	blocking_child *	c;
	blocking_pipe_header	req_hdr;

	req_hdr.octets = sizeof(req_hdr) + reqsize;
	req_hdr.magic_sig = BLOCKING_REQ_MAGIC;
	req_hdr.rtype = rtype;
	req_hdr.done_func = done_func;
	req_hdr.context = context;

	child_slot = UINT_MAX;
	if (worker_per_query || UINT_MAX == intres_slot ||
	    blocking_children[intres_slot]->reusable)
		child_slot = available_blocking_child_slot();
	if (!worker_per_query) {
		if (UINT_MAX == intres_slot)
			intres_slot = child_slot;
		else
			child_slot = intres_slot;
		if (0 == intres_req_pending)
			intres_timeout_req(0);
	}
	intres_req_pending++;
	INSIST(UINT_MAX != child_slot);
	c = blocking_children[child_slot];
	if (NULL == c) {
		c = emalloc_zero(sizeof(*c));
#ifdef WORK_FORK
		c->req_read_pipe = -1;
		c->req_write_pipe = -1;
#endif
#ifdef WORK_PIPE
		c->resp_read_pipe = -1;
		c->resp_write_pipe = -1;
#endif
		blocking_children[child_slot] = c;
	}
	req_hdr.child_idx = child_slot;

	return send_blocking_req_internal(c, &req_hdr, req);
}
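

/*
 * queue_blocking_response()
 *
 * Used on the worker side to fill in a response header from the
 * matching request header and send the response back to the parent.
 */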
int
queue_blocking_response(
	blocking_child *		c,
	blocking_pipe_header *		resp,
	size_t				respsize,
	const blocking_pipe_header *	req
	)
{
	resp->octets = respsize;
	resp->magic_sig = BLOCKING_RESP_MAGIC;
	resp->rtype = req->rtype;
	resp->context = req->context;
	resp->done_func = req->done_func;

	return send_blocking_resp_internal(c, resp);
}
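

/*
 * process_blocking_resp()
 *
 * Parent-side handler which consumes available responses from a
 * worker child and invokes each response's completion callback.
 */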
void
process_blocking_resp(
	blocking_child *	c
	)
{
	blocking_pipe_header *	resp;
	void *			data;

	/*
	 * On Windows send_blocking_resp_internal() may signal the
	 * blocking_response_ready event multiple times while we're
	 * processing a response, so always consume all available
	 * responses before returning to test the event again.
	 */
#ifdef WORK_THREAD
	do {
#endif
		resp = receive_blocking_resp_internal(c);
		if (NULL != resp) {
			DEBUG_REQUIRE(BLOCKING_RESP_MAGIC ==
				      resp->magic_sig);
			data = (char *)resp + sizeof(*resp);
			intres_req_pending--;
			(*resp->done_func)(resp->rtype, resp->context,
					   resp->octets - sizeof(*resp),
					   data);
			free(resp);
		}
#ifdef WORK_THREAD
	} while (NULL != resp);
#endif
	if (!worker_per_query && 0 == intres_req_pending)
		intres_timeout_req(CHILD_MAX_IDLE);
	else if (worker_per_query)
		req_child_exit(c);
}


/*
 * blocking_child_common()
 *
 * Request loop for a worker, which runs as a forked child or a thread
 * depending on the implementation.  The loop ends when the request
 * channel is closed, a handler asks to terminate, or an unknown
 * request type arrives.
 */
int
blocking_child_common(
	blocking_child	*c
	)
{
	int say_bye;
	blocking_pipe_header *req;

	say_bye = FALSE;
	while (!say_bye) {
		req = receive_blocking_req_internal(c);
		if (NULL == req) {
			say_bye = TRUE;
			continue;
		}

		DEBUG_REQUIRE(BLOCKING_REQ_MAGIC == req->magic_sig);

		switch (req->rtype) {
		case BLOCKING_GETADDRINFO:
			if (blocking_getaddrinfo(c, req))
				say_bye = TRUE;
			break;

		case BLOCKING_GETNAMEINFO:
			if (blocking_getnameinfo(c, req))
				say_bye = TRUE;
			break;

		default:
			msyslog(LOG_ERR,
				"unknown req %d to blocking worker",
				req->rtype);
			say_bye = TRUE;
		}

		free(req);
	}

	return 0;
}


/*
 * worker_idle_timer_fired()
 *
 * The parent starts this timer when the last pending response has been
 * received from the child, making it idle, and clears the timer when a
 * request is dispatched to the child.  Once the timer expires, the
 * child is sent packing.
 *
 * This is called when worker_idle_timer is nonzero and less than or
 * equal to current_time.
 */
void
worker_idle_timer_fired(void)
{
	u_int			idx;
	blocking_child *	c;

	DEBUG_REQUIRE(0 == intres_req_pending);

	intres_timeout_req(0);
	for (idx = 0; idx < blocking_children_alloc; idx++) {
		c = blocking_children[idx];
		if (NULL == c)
			continue;
		req_child_exit(c);
	}
}


#else	/* !WORKER follows */
int ntp_worker_nonempty_compilation_unit;
#endif