ntp_worker.c revision 293896
1/*
2 * ntp_worker.c
3 */
4#include <config.h>
5#include "ntp_workimpl.h"
6
7#ifdef WORKER
8
9#include <stdio.h>
10#include <ctype.h>
11#include <signal.h>
12
13#include "iosignal.h"
14#include "ntp_stdlib.h"
15#include "ntp_malloc.h"
16#include "ntp_syslog.h"
17#include "ntpd.h"
18#include "ntp_io.h"
19#include "ntp_assert.h"
20#include "ntp_unixtime.h"
21#include "intreswork.h"
22
23
24#define CHILD_MAX_IDLE	(3 * 60)	/* seconds, idle worker limit */
25
26blocking_child **	blocking_children;
27size_t			blocking_children_alloc;
28int			worker_per_query;	/* boolean */
29int			intres_req_pending;
30
31
32#ifndef HAVE_IO_COMPLETION_PORT
33/*
34 * pipe_socketpair()
35 *
36 * Provides an AF_UNIX socketpair on systems which have them, otherwise
37 * pair of unidirectional pipes.
38 */
39int
40pipe_socketpair(
41	int	caller_fds[2],
42	int *	is_pipe
43	)
44{
45	int	rc;
46	int	fds[2];
47	int	called_pipe;
48
49#ifdef HAVE_SOCKETPAIR
50	rc = socketpair(AF_UNIX, SOCK_STREAM, 0, &fds[0]);
51#else
52	rc = -1;
53#endif
54
55	if (-1 == rc) {
56		rc = pipe(&fds[0]);
57		called_pipe = TRUE;
58	} else {
59		called_pipe = FALSE;
60	}
61
62	if (-1 == rc)
63		return rc;
64
65	caller_fds[0] = fds[0];
66	caller_fds[1] = fds[1];
67	if (is_pipe != NULL)
68		*is_pipe = called_pipe;
69
70	return 0;
71}
72
73
/*
 * close_all_except()
 *
 * Close every file descriptor other than keep_fd: first those numbered
 * below it, one close() call each, then everything above it by way of
 * close_all_beyond().  Results of the individual close() calls are not
 * examined.
 */
void
close_all_except(
	int keep_fd
	)
{
	int lower_fd;

	lower_fd = 0;
	while (lower_fd < keep_fd) {
		close(lower_fd);
		lower_fd++;
	}

	close_all_beyond(keep_fd);
}
91
92
/*
 * close_all_beyond()
 *
 * Close every file descriptor numbered above keep_fd, leaving keep_fd
 * itself and everything below it open.  One of three strategies is
 * selected at compile time:
 *   - closefrom() when HAVE_CLOSEFROM is defined,
 *   - fcntl(F_CLOSEM) when that command exists,
 *   - otherwise a close() call for each descriptor below the value
 *     reported by GETDTABLESIZE().
 */
void
close_all_beyond(
	int keep_fd
	)
{
# if defined(HAVE_CLOSEFROM)
	closefrom(keep_fd + 1);
# elif defined(F_CLOSEM)
	/*
	 * Technique taken from 'Writing Reliable AIX Daemons,'
	 * SG24-4946-00, by Eric Agar: a single fcntl() replaces up to
	 * 32767 close() system calls.
	 */
	int first_fd = keep_fd + 1;

	if (-1 == fcntl(first_fd, F_CLOSEM, 0))
		msyslog(LOG_ERR, "F_CLOSEM(%d): %m", first_fd);
# else	/* neither closefrom() nor F_CLOSEM is available */
	int limit;
	int cur_fd;

	limit = GETDTABLESIZE();
	cur_fd = keep_fd + 1;
	while (cur_fd < limit) {
		close(cur_fd);
		cur_fd++;
	}
# endif	/* HAVE_CLOSEFROM / F_CLOSEM / fallback */
}
123#endif	/* HAVE_IO_COMPLETION_PORT */
124
125
126u_int
127available_blocking_child_slot(void)
128{
129	const size_t	each = sizeof(blocking_children[0]);
130	u_int		slot;
131	size_t		prev_alloc;
132	size_t		new_alloc;
133	size_t		prev_octets;
134	size_t		octets;
135
136	for (slot = 0; slot < blocking_children_alloc; slot++) {
137		if (NULL == blocking_children[slot])
138			return slot;
139		if (blocking_children[slot]->reusable) {
140			blocking_children[slot]->reusable = FALSE;
141			return slot;
142		}
143	}
144
145	prev_alloc = blocking_children_alloc;
146	prev_octets = prev_alloc * each;
147	new_alloc = blocking_children_alloc + 4;
148	octets = new_alloc * each;
149	blocking_children = erealloc_zero(blocking_children, octets,
150					  prev_octets);
151	blocking_children_alloc = new_alloc;
152
153	/* assume we'll never have enough workers to overflow u_int */
154	return (u_int)prev_alloc;
155}
156
157
158int
159queue_blocking_request(
160	blocking_work_req	rtype,
161	void *			req,
162	size_t			reqsize,
163	blocking_work_callback	done_func,
164	void *			context
165	)
166{
167	static u_int		intres_slot = UINT_MAX;
168	u_int			child_slot;
169	blocking_child *	c;
170	blocking_pipe_header	req_hdr;
171
172	req_hdr.octets = sizeof(req_hdr) + reqsize;
173	req_hdr.magic_sig = BLOCKING_REQ_MAGIC;
174	req_hdr.rtype = rtype;
175	req_hdr.done_func = done_func;
176	req_hdr.context = context;
177
178	child_slot = UINT_MAX;
179	if (worker_per_query || UINT_MAX == intres_slot ||
180	    blocking_children[intres_slot]->reusable)
181		child_slot = available_blocking_child_slot();
182	if (!worker_per_query) {
183		if (UINT_MAX == intres_slot)
184			intres_slot = child_slot;
185		else
186			child_slot = intres_slot;
187		if (0 == intres_req_pending)
188			intres_timeout_req(0);
189	}
190	intres_req_pending++;
191	INSIST(UINT_MAX != child_slot);
192	c = blocking_children[child_slot];
193	if (NULL == c) {
194		c = emalloc_zero(sizeof(*c));
195#ifdef WORK_FORK
196		c->req_read_pipe = -1;
197		c->req_write_pipe = -1;
198#endif
199#ifdef WORK_PIPE
200		c->resp_read_pipe = -1;
201		c->resp_write_pipe = -1;
202#endif
203		blocking_children[child_slot] = c;
204	}
205	req_hdr.child_idx = child_slot;
206
207	return send_blocking_req_internal(c, &req_hdr, req);
208}
209
210
211int queue_blocking_response(
212	blocking_child *		c,
213	blocking_pipe_header *		resp,
214	size_t				respsize,
215	const blocking_pipe_header *	req
216	)
217{
218	resp->octets = respsize;
219	resp->magic_sig = BLOCKING_RESP_MAGIC;
220	resp->rtype = req->rtype;
221	resp->context = req->context;
222	resp->done_func = req->done_func;
223
224	return send_blocking_resp_internal(c, resp);
225}
226
227
228void
229process_blocking_resp(
230	blocking_child *	c
231	)
232{
233	blocking_pipe_header *	resp;
234	void *			data;
235
236	/*
237	 * On Windows send_blocking_resp_internal() may signal the
238	 * blocking_response_ready event multiple times while we're
239	 * processing a response, so always consume all available
240	 * responses before returning to test the event again.
241	 */
242#ifdef WORK_THREAD
243	do {
244#endif
245		resp = receive_blocking_resp_internal(c);
246		if (NULL != resp) {
247			DEBUG_REQUIRE(BLOCKING_RESP_MAGIC ==
248				      resp->magic_sig);
249			data = (char *)resp + sizeof(*resp);
250			intres_req_pending--;
251			(*resp->done_func)(resp->rtype, resp->context,
252					   resp->octets - sizeof(*resp),
253					   data);
254			free(resp);
255		}
256#ifdef WORK_THREAD
257	} while (NULL != resp);
258#endif
259	if (!worker_per_query && 0 == intres_req_pending)
260		intres_timeout_req(CHILD_MAX_IDLE);
261	else if (worker_per_query)
262		req_child_exit(c);
263}
264
265
266/*
267 * blocking_child_common runs as a forked child or a thread
268 */
269int
270blocking_child_common(
271	blocking_child	*c
272	)
273{
274	int say_bye;
275	blocking_pipe_header *req;
276
277	say_bye = FALSE;
278	while (!say_bye) {
279		req = receive_blocking_req_internal(c);
280		if (NULL == req) {
281			say_bye = TRUE;
282			continue;
283		}
284
285		DEBUG_REQUIRE(BLOCKING_REQ_MAGIC == req->magic_sig);
286
287		switch (req->rtype) {
288		case BLOCKING_GETADDRINFO:
289			if (blocking_getaddrinfo(c, req))
290				say_bye = TRUE;
291			break;
292
293		case BLOCKING_GETNAMEINFO:
294			if (blocking_getnameinfo(c, req))
295				say_bye = TRUE;
296			break;
297
298		default:
299			msyslog(LOG_ERR, "unknown req %d to blocking worker", req->rtype);
300			say_bye = TRUE;
301		}
302
303		free(req);
304	}
305
306	return 0;
307}
308
309
310/*
311 * worker_idle_timer_fired()
312 *
313 * The parent starts this timer when the last pending response has been
314 * received from the child, making it idle, and clears the timer when a
315 * request is dispatched to the child.  Once the timer expires, the
316 * child is sent packing.
317 *
318 * This is called when worker_idle_timer is nonzero and less than or
319 * equal to current_time.
320 */
321void
322worker_idle_timer_fired(void)
323{
324	u_int			idx;
325	blocking_child *	c;
326
327	DEBUG_REQUIRE(0 == intres_req_pending);
328
329	intres_timeout_req(0);
330	for (idx = 0; idx < blocking_children_alloc; idx++) {
331		c = blocking_children[idx];
332		if (NULL == c)
333			continue;
334		req_child_exit(c);
335	}
336}
337
338
339#else	/* !WORKER follows */
/* keeps this translation unit from being empty when WORKER is not defined */
int ntp_worker_nonempty_compilation_unit;
341#endif
342