1275970Scy/* 2275970Scy * ntp_worker.c 3275970Scy */ 4275970Scy#include <config.h> 5275970Scy#include "ntp_workimpl.h" 6275970Scy 7275970Scy#ifdef WORKER 8275970Scy 9275970Scy#include <stdio.h> 10275970Scy#include <ctype.h> 11275970Scy#include <signal.h> 12275970Scy 13275970Scy#include "iosignal.h" 14275970Scy#include "ntp_stdlib.h" 15275970Scy#include "ntp_malloc.h" 16275970Scy#include "ntp_syslog.h" 17275970Scy#include "ntpd.h" 18275970Scy#include "ntp_io.h" 19275970Scy#include "ntp_assert.h" 20275970Scy#include "ntp_unixtime.h" 21275970Scy#include "intreswork.h" 22275970Scy 23275970Scy 24275970Scy#define CHILD_MAX_IDLE (3 * 60) /* seconds, idle worker limit */ 25275970Scy 26275970Scyblocking_child ** blocking_children; 27275970Scysize_t blocking_children_alloc; 28275970Scyint worker_per_query; /* boolean */ 29275970Scyint intres_req_pending; 30294554Sdelphijvolatile u_int blocking_child_ready_seen; 31294554Sdelphijvolatile u_int blocking_child_ready_done; 32275970Scy 33275970Scy 34275970Scy#ifndef HAVE_IO_COMPLETION_PORT 35275970Scy/* 36275970Scy * pipe_socketpair() 37275970Scy * 38275970Scy * Provides an AF_UNIX socketpair on systems which have them, otherwise 39275970Scy * pair of unidirectional pipes. 40275970Scy */ 41275970Scyint 42275970Scypipe_socketpair( 43275970Scy int caller_fds[2], 44275970Scy int * is_pipe 45275970Scy ) 46275970Scy{ 47275970Scy int rc; 48275970Scy int fds[2]; 49275970Scy int called_pipe; 50275970Scy 51275970Scy#ifdef HAVE_SOCKETPAIR 52275970Scy rc = socketpair(AF_UNIX, SOCK_STREAM, 0, &fds[0]); 53275970Scy#else 54275970Scy rc = -1; 55275970Scy#endif 56275970Scy 57275970Scy if (-1 == rc) { 58275970Scy rc = pipe(&fds[0]); 59275970Scy called_pipe = TRUE; 60275970Scy } else { 61275970Scy called_pipe = FALSE; 62275970Scy } 63275970Scy 64275970Scy if (-1 == rc) 65275970Scy return rc; 66275970Scy 67275970Scy caller_fds[0] = fds[0]; 68275970Scy caller_fds[1] = fds[1]; 69275970Scy if (is_pipe != NULL) 70275970Scy *is_pipe = called_pipe; 71275970Scy 72275970Scy return 0; 73275970Scy} 74275970Scy 75275970Scy 76275970Scy/* 77275970Scy * close_all_except() 78275970Scy * 79275970Scy * Close all file descriptors except the given keep_fd. 80275970Scy */ 81275970Scyvoid 82275970Scyclose_all_except( 83275970Scy int keep_fd 84275970Scy ) 85275970Scy{ 86275970Scy int fd; 87275970Scy 88275970Scy for (fd = 0; fd < keep_fd; fd++) 89275970Scy close(fd); 90275970Scy 91275970Scy close_all_beyond(keep_fd); 92275970Scy} 93275970Scy 94275970Scy 95275970Scy/* 96275970Scy * close_all_beyond() 97275970Scy * 98275970Scy * Close all file descriptors after the given keep_fd, which is the 99275970Scy * highest fd to keep open. 100275970Scy */ 101275970Scyvoid 102275970Scyclose_all_beyond( 103275970Scy int keep_fd 104275970Scy ) 105275970Scy{ 106275970Scy# ifdef HAVE_CLOSEFROM 107275970Scy closefrom(keep_fd + 1); 108275970Scy# elif defined(F_CLOSEM) 109275970Scy /* 110275970Scy * From 'Writing Reliable AIX Daemons,' SG24-4946-00, 111275970Scy * by Eric Agar (saves us from doing 32767 system 112275970Scy * calls) 113275970Scy */ 114275970Scy if (fcntl(keep_fd + 1, F_CLOSEM, 0) == -1) 115275970Scy msyslog(LOG_ERR, "F_CLOSEM(%d): %m", keep_fd + 1); 116275970Scy# else /* !HAVE_CLOSEFROM && !F_CLOSEM follows */ 117275970Scy int fd; 118275970Scy int max_fd; 119275970Scy 120275970Scy max_fd = GETDTABLESIZE(); 121275970Scy for (fd = keep_fd + 1; fd < max_fd; fd++) 122275970Scy close(fd); 123275970Scy# endif /* !HAVE_CLOSEFROM && !F_CLOSEM */ 124275970Scy} 125275970Scy#endif /* HAVE_IO_COMPLETION_PORT */ 126275970Scy 127275970Scy 128275970Scyu_int 129275970Scyavailable_blocking_child_slot(void) 130275970Scy{ 131275970Scy const size_t each = sizeof(blocking_children[0]); 132275970Scy u_int slot; 133275970Scy size_t prev_alloc; 134275970Scy size_t new_alloc; 135275970Scy size_t prev_octets; 136275970Scy size_t octets; 137275970Scy 138275970Scy for (slot = 0; slot < blocking_children_alloc; slot++) { 139275970Scy if (NULL == blocking_children[slot]) 140275970Scy return slot; 141275970Scy if (blocking_children[slot]->reusable) { 142275970Scy blocking_children[slot]->reusable = FALSE; 143275970Scy return slot; 144275970Scy } 145275970Scy } 146275970Scy 147275970Scy prev_alloc = blocking_children_alloc; 148275970Scy prev_octets = prev_alloc * each; 149275970Scy new_alloc = blocking_children_alloc + 4; 150275970Scy octets = new_alloc * each; 151275970Scy blocking_children = erealloc_zero(blocking_children, octets, 152275970Scy prev_octets); 153275970Scy blocking_children_alloc = new_alloc; 154275970Scy 155293423Sdelphij /* assume we'll never have enough workers to overflow u_int */ 156293423Sdelphij return (u_int)prev_alloc; 157275970Scy} 158275970Scy 159275970Scy 160275970Scyint 161275970Scyqueue_blocking_request( 162275970Scy blocking_work_req rtype, 163275970Scy void * req, 164275970Scy size_t reqsize, 165275970Scy blocking_work_callback done_func, 166275970Scy void * context 167275970Scy ) 168275970Scy{ 169275970Scy static u_int intres_slot = UINT_MAX; 170275970Scy u_int child_slot; 171275970Scy blocking_child * c; 172275970Scy blocking_pipe_header req_hdr; 173275970Scy 174275970Scy req_hdr.octets = sizeof(req_hdr) + reqsize; 175275970Scy req_hdr.magic_sig = BLOCKING_REQ_MAGIC; 176275970Scy req_hdr.rtype = rtype; 177275970Scy req_hdr.done_func = done_func; 178275970Scy req_hdr.context = context; 179275970Scy 180275970Scy child_slot = UINT_MAX; 181275970Scy if (worker_per_query || UINT_MAX == intres_slot || 182275970Scy blocking_children[intres_slot]->reusable) 183275970Scy child_slot = available_blocking_child_slot(); 184275970Scy if (!worker_per_query) { 185275970Scy if (UINT_MAX == intres_slot) 186275970Scy intres_slot = child_slot; 187275970Scy else 188275970Scy child_slot = intres_slot; 189275970Scy if (0 == intres_req_pending) 190275970Scy intres_timeout_req(0); 191275970Scy } 192275970Scy intres_req_pending++; 193275970Scy INSIST(UINT_MAX != child_slot); 194275970Scy c = blocking_children[child_slot]; 195275970Scy if (NULL == c) { 196275970Scy c = emalloc_zero(sizeof(*c)); 197275970Scy#ifdef WORK_FORK 198275970Scy c->req_read_pipe = -1; 199275970Scy c->req_write_pipe = -1; 200275970Scy#endif 201275970Scy#ifdef WORK_PIPE 202275970Scy c->resp_read_pipe = -1; 203275970Scy c->resp_write_pipe = -1; 204275970Scy#endif 205275970Scy blocking_children[child_slot] = c; 206275970Scy } 207275970Scy req_hdr.child_idx = child_slot; 208275970Scy 209275970Scy return send_blocking_req_internal(c, &req_hdr, req); 210275970Scy} 211275970Scy 212275970Scy 213275970Scyint queue_blocking_response( 214275970Scy blocking_child * c, 215275970Scy blocking_pipe_header * resp, 216275970Scy size_t respsize, 217275970Scy const blocking_pipe_header * req 218275970Scy ) 219275970Scy{ 220275970Scy resp->octets = respsize; 221275970Scy resp->magic_sig = BLOCKING_RESP_MAGIC; 222275970Scy resp->rtype = req->rtype; 223275970Scy resp->context = req->context; 224275970Scy resp->done_func = req->done_func; 225275970Scy 226275970Scy return send_blocking_resp_internal(c, resp); 227275970Scy} 228275970Scy 229275970Scy 230275970Scyvoid 231275970Scyprocess_blocking_resp( 232275970Scy blocking_child * c 233275970Scy ) 234275970Scy{ 235275970Scy blocking_pipe_header * resp; 236275970Scy void * data; 237275970Scy 238275970Scy /* 239275970Scy * On Windows send_blocking_resp_internal() may signal the 240275970Scy * blocking_response_ready event multiple times while we're 241275970Scy * processing a response, so always consume all available 242275970Scy * responses before returning to test the event again. 243275970Scy */ 244275970Scy#ifdef WORK_THREAD 245275970Scy do { 246275970Scy#endif 247275970Scy resp = receive_blocking_resp_internal(c); 248275970Scy if (NULL != resp) { 249275970Scy DEBUG_REQUIRE(BLOCKING_RESP_MAGIC == 250275970Scy resp->magic_sig); 251275970Scy data = (char *)resp + sizeof(*resp); 252275970Scy intres_req_pending--; 253275970Scy (*resp->done_func)(resp->rtype, resp->context, 254275970Scy resp->octets - sizeof(*resp), 255275970Scy data); 256275970Scy free(resp); 257275970Scy } 258275970Scy#ifdef WORK_THREAD 259275970Scy } while (NULL != resp); 260275970Scy#endif 261275970Scy if (!worker_per_query && 0 == intres_req_pending) 262275970Scy intres_timeout_req(CHILD_MAX_IDLE); 263275970Scy else if (worker_per_query) 264275970Scy req_child_exit(c); 265275970Scy} 266275970Scy 267294554Sdelphijvoid 268294554Sdelphijharvest_blocking_responses(void) 269294554Sdelphij{ 270298695Sdelphij size_t idx; 271294554Sdelphij blocking_child* cp; 272294554Sdelphij u_int scseen, scdone; 273275970Scy 274294554Sdelphij scseen = blocking_child_ready_seen; 275294554Sdelphij scdone = blocking_child_ready_done; 276294554Sdelphij if (scdone != scseen) { 277294554Sdelphij blocking_child_ready_done = scseen; 278294554Sdelphij for (idx = 0; idx < blocking_children_alloc; idx++) { 279294554Sdelphij cp = blocking_children[idx]; 280294554Sdelphij if (NULL == cp) 281294554Sdelphij continue; 282294554Sdelphij scseen = cp->resp_ready_seen; 283294554Sdelphij scdone = cp->resp_ready_done; 284294554Sdelphij if (scdone != scseen) { 285294554Sdelphij cp->resp_ready_done = scseen; 286294554Sdelphij process_blocking_resp(cp); 287294554Sdelphij } 288294554Sdelphij } 289294554Sdelphij } 290294554Sdelphij} 291294554Sdelphij 292294554Sdelphij 293275970Scy/* 294275970Scy * blocking_child_common runs as a forked child or a thread 295275970Scy */ 296275970Scyint 297275970Scyblocking_child_common( 298275970Scy blocking_child *c 299275970Scy ) 300275970Scy{ 301275970Scy int say_bye; 302275970Scy blocking_pipe_header *req; 303275970Scy 304275970Scy say_bye = FALSE; 305275970Scy while (!say_bye) { 306275970Scy req = receive_blocking_req_internal(c); 307275970Scy if (NULL == req) { 308275970Scy say_bye = TRUE; 309289764Sglebius continue; 310275970Scy } 311275970Scy 312275970Scy DEBUG_REQUIRE(BLOCKING_REQ_MAGIC == req->magic_sig); 313275970Scy 314275970Scy switch (req->rtype) { 315275970Scy case BLOCKING_GETADDRINFO: 316275970Scy if (blocking_getaddrinfo(c, req)) 317275970Scy say_bye = TRUE; 318275970Scy break; 319275970Scy 320275970Scy case BLOCKING_GETNAMEINFO: 321275970Scy if (blocking_getnameinfo(c, req)) 322275970Scy say_bye = TRUE; 323275970Scy break; 324275970Scy 325275970Scy default: 326275970Scy msyslog(LOG_ERR, "unknown req %d to blocking worker", req->rtype); 327275970Scy say_bye = TRUE; 328275970Scy } 329275970Scy 330275970Scy free(req); 331275970Scy } 332275970Scy 333275970Scy return 0; 334275970Scy} 335275970Scy 336275970Scy 337275970Scy/* 338275970Scy * worker_idle_timer_fired() 339275970Scy * 340275970Scy * The parent starts this timer when the last pending response has been 341275970Scy * received from the child, making it idle, and clears the timer when a 342275970Scy * request is dispatched to the child. Once the timer expires, the 343275970Scy * child is sent packing. 344275970Scy * 345275970Scy * This is called when worker_idle_timer is nonzero and less than or 346275970Scy * equal to current_time. 347275970Scy */ 348275970Scyvoid 349275970Scyworker_idle_timer_fired(void) 350275970Scy{ 351275970Scy u_int idx; 352275970Scy blocking_child * c; 353275970Scy 354275970Scy DEBUG_REQUIRE(0 == intres_req_pending); 355275970Scy 356275970Scy intres_timeout_req(0); 357275970Scy for (idx = 0; idx < blocking_children_alloc; idx++) { 358275970Scy c = blocking_children[idx]; 359275970Scy if (NULL == c) 360275970Scy continue; 361275970Scy req_child_exit(c); 362275970Scy } 363275970Scy} 364275970Scy 365275970Scy 366275970Scy#else /* !WORKER follows */ 367275970Scyint ntp_worker_nonempty_compilation_unit; 368275970Scy#endif 369