1168515Sgshapiro/* 2244833Sgshapiro * Copyright (c) 2003-2004, 2007, 2009-2012 Sendmail, Inc. and its suppliers. 3168515Sgshapiro * All rights reserved. 4168515Sgshapiro * 5168515Sgshapiro * By using this file, you agree to the terms and conditions set 6168515Sgshapiro * forth in the LICENSE file which can be found at the top level of 7168515Sgshapiro * the sendmail distribution. 8168515Sgshapiro * 9168515Sgshapiro * Contributed by Jose Marcio Martins da Cruz - Ecole des Mines de Paris 10168515Sgshapiro * Jose-Marcio.Martins@ensmp.fr 11168515Sgshapiro */ 12168515Sgshapiro 13168515Sgshapiro#include <sm/gen.h> 14244833SgshapiroSM_RCSID("@(#)$Id: worker.c,v 8.24 2012/03/13 15:37:46 ca Exp $") 15168515Sgshapiro 16168515Sgshapiro#include "libmilter.h" 17168515Sgshapiro 18168515Sgshapiro#if _FFR_WORKERS_POOL 19168515Sgshapiro 20168515Sgshapirotypedef struct taskmgr_S taskmgr_T; 21168515Sgshapiro 22168515Sgshapiro#define TM_SIGNATURE 0x23021957 23168515Sgshapiro 24168515Sgshapirostruct taskmgr_S 25168515Sgshapiro{ 26168515Sgshapiro long tm_signature; /* has the controller been initialized */ 27168515Sgshapiro sthread_t tm_tid; /* thread id of controller */ 28168515Sgshapiro smfi_hd_T tm_ctx_head; /* head of the linked list of contexts */ 29168515Sgshapiro 30168515Sgshapiro int tm_nb_workers; /* number of workers in the pool */ 31168515Sgshapiro int tm_nb_idle; /* number of workers waiting */ 32168515Sgshapiro 33168515Sgshapiro int tm_p[2]; /* poll control pipe */ 34168515Sgshapiro 35168515Sgshapiro smutex_t tm_w_mutex; /* linked list access mutex */ 36168515Sgshapiro scond_t tm_w_cond; /* */ 37168515Sgshapiro}; 38168515Sgshapiro 39168515Sgshapirostatic taskmgr_T Tskmgr = {0}; 40168515Sgshapiro 41168515Sgshapiro#define WRK_CTX_HEAD Tskmgr.tm_ctx_head 42168515Sgshapiro 43168515Sgshapiro#define RD_PIPE (Tskmgr.tm_p[0]) 44168515Sgshapiro#define WR_PIPE (Tskmgr.tm_p[1]) 45168515Sgshapiro 46168515Sgshapiro#define PIPE_SEND_SIGNAL() \ 47168515Sgshapiro do \ 48168515Sgshapiro { \ 49168515Sgshapiro char evt = 0x5a; \ 50168515Sgshapiro int fd = WR_PIPE; \ 51168515Sgshapiro if (write(fd, &evt, sizeof(evt)) != sizeof(evt)) \ 52168515Sgshapiro smi_log(SMI_LOG_ERR, \ 53168515Sgshapiro "Error writing to event pipe: %s", \ 54168515Sgshapiro sm_errstring(errno)); \ 55168515Sgshapiro } while (0) 56168515Sgshapiro 57168515Sgshapiro#ifndef USE_PIPE_WAKE_POLL 58168515Sgshapiro# define USE_PIPE_WAKE_POLL 1 59168515Sgshapiro#endif /* USE_PIPE_WAKE_POLL */ 60168515Sgshapiro 61168515Sgshapiro/* poll check periodicity (default 10000 - 10 s) */ 62168515Sgshapiro#define POLL_TIMEOUT 10000 63168515Sgshapiro 64168515Sgshapiro/* worker conditional wait timeout (default 10 s) */ 65168515Sgshapiro#define COND_TIMEOUT 10 66168515Sgshapiro 67168515Sgshapiro/* functions */ 68168515Sgshapirostatic int mi_close_session __P((SMFICTX_PTR)); 69168515Sgshapiro 70168515Sgshapirostatic void *mi_worker __P((void *)); 71168515Sgshapirostatic void *mi_pool_controller __P((void *)); 72168515Sgshapiro 73168515Sgshapirostatic int mi_list_add_ctx __P((SMFICTX_PTR)); 74168515Sgshapirostatic int mi_list_del_ctx __P((SMFICTX_PTR)); 75168515Sgshapiro 76168515Sgshapiro/* 77168515Sgshapiro** periodicity of cleaning up old sessions (timedout) 78168515Sgshapiro** sessions list will be checked to find old inactive 79168515Sgshapiro** sessions each DT_CHECK_OLD_SESSIONS sec 80168515Sgshapiro*/ 81168515Sgshapiro 82168515Sgshapiro#define DT_CHECK_OLD_SESSIONS 600 83168515Sgshapiro 84168515Sgshapiro#ifndef OLD_SESSION_TIMEOUT 85168515Sgshapiro# define OLD_SESSION_TIMEOUT ctx->ctx_timeout 86168515Sgshapiro#endif /* OLD_SESSION_TIMEOUT */ 87168515Sgshapiro 88168515Sgshapiro/* session states - with respect to the pool of workers */ 89168515Sgshapiro#define WKST_INIT 0 /* initial state */ 90168515Sgshapiro#define WKST_READY_TO_RUN 1 /* command ready do be read */ 91168515Sgshapiro#define WKST_RUNNING 2 /* session running on a worker */ 92168515Sgshapiro#define WKST_READY_TO_WAIT 3 /* session just finished by a worker */ 93168515Sgshapiro#define WKST_WAITING 4 /* waiting for new command */ 94168515Sgshapiro#define WKST_CLOSING 5 /* session finished */ 95168515Sgshapiro 96168515Sgshapiro#ifndef MIN_WORKERS 97168515Sgshapiro# define MIN_WORKERS 2 /* minimum number of threads to keep around */ 98168515Sgshapiro#endif 99168515Sgshapiro 100168515Sgshapiro#define MIN_IDLE 1 /* minimum number of idle threads */ 101168515Sgshapiro 102168515Sgshapiro 103168515Sgshapiro/* 104168515Sgshapiro** Macros for threads and mutex management 105168515Sgshapiro*/ 106168515Sgshapiro 107168515Sgshapiro#define TASKMGR_LOCK() \ 108168515Sgshapiro do \ 109168515Sgshapiro { \ 110168515Sgshapiro if (!smutex_lock(&Tskmgr.tm_w_mutex)) \ 111168515Sgshapiro smi_log(SMI_LOG_ERR, "TASKMGR_LOCK error"); \ 112168515Sgshapiro } while (0) 113168515Sgshapiro 114168515Sgshapiro#define TASKMGR_UNLOCK() \ 115168515Sgshapiro do \ 116168515Sgshapiro { \ 117168515Sgshapiro if (!smutex_unlock(&Tskmgr.tm_w_mutex)) \ 118168515Sgshapiro smi_log(SMI_LOG_ERR, "TASKMGR_UNLOCK error"); \ 119168515Sgshapiro } while (0) 120168515Sgshapiro 121168515Sgshapiro#define TASKMGR_COND_WAIT() \ 122168515Sgshapiro scond_timedwait(&Tskmgr.tm_w_cond, &Tskmgr.tm_w_mutex, COND_TIMEOUT) 123168515Sgshapiro 124168515Sgshapiro#define TASKMGR_COND_SIGNAL() \ 125168515Sgshapiro do \ 126168515Sgshapiro { \ 127168515Sgshapiro if (scond_signal(&Tskmgr.tm_w_cond) != 0) \ 128168515Sgshapiro smi_log(SMI_LOG_ERR, "TASKMGR_COND_SIGNAL error"); \ 129168515Sgshapiro } while (0) 130168515Sgshapiro 131168515Sgshapiro#define LAUNCH_WORKER(ctx) \ 132168515Sgshapiro do \ 133168515Sgshapiro { \ 134168515Sgshapiro int r; \ 135168515Sgshapiro sthread_t tid; \ 136168515Sgshapiro \ 137168515Sgshapiro if ((r = thread_create(&tid, mi_worker, ctx)) != 0) \ 138168515Sgshapiro smi_log(SMI_LOG_ERR, "LAUNCH_WORKER error: %s",\ 139168515Sgshapiro sm_errstring(r)); \ 140168515Sgshapiro } while (0) 141168515Sgshapiro 142168515Sgshapiro#if POOL_DEBUG 143168515Sgshapiro# define POOL_LEV_DPRINTF(lev, x) \ 144244833Sgshapiro do \ 145244833Sgshapiro { \ 146168515Sgshapiro if ((lev) < ctx->ctx_dbg) \ 147168515Sgshapiro sm_dprintf x; \ 148168515Sgshapiro } while (0) 149168515Sgshapiro#else /* POOL_DEBUG */ 150168515Sgshapiro# define POOL_LEV_DPRINTF(lev, x) 151168515Sgshapiro#endif /* POOL_DEBUG */ 152168515Sgshapiro 153168515Sgshapiro/* 154168515Sgshapiro** MI_START_SESSION -- Start a session in the pool of workers 155168515Sgshapiro** 156168515Sgshapiro** Parameters: 157168515Sgshapiro** ctx -- context structure 158168515Sgshapiro** 159168515Sgshapiro** Returns: 160168515Sgshapiro** MI_SUCCESS/MI_FAILURE 161168515Sgshapiro*/ 162168515Sgshapiro 163168515Sgshapiroint 164168515Sgshapiromi_start_session(ctx) 165168515Sgshapiro SMFICTX_PTR ctx; 166168515Sgshapiro{ 167168515Sgshapiro static long id = 0; 168168515Sgshapiro 169223067Sgshapiro /* this can happen if the milter is shutting down */ 170223067Sgshapiro if (Tskmgr.tm_signature != TM_SIGNATURE) 171223067Sgshapiro return MI_FAILURE; 172168515Sgshapiro SM_ASSERT(ctx != NULL); 173168515Sgshapiro POOL_LEV_DPRINTF(4, ("PIPE r=[%d] w=[%d]", RD_PIPE, WR_PIPE)); 174168515Sgshapiro TASKMGR_LOCK(); 175168515Sgshapiro 176168515Sgshapiro if (mi_list_add_ctx(ctx) != MI_SUCCESS) 177168515Sgshapiro { 178168515Sgshapiro TASKMGR_UNLOCK(); 179168515Sgshapiro return MI_FAILURE; 180168515Sgshapiro } 181168515Sgshapiro 182168515Sgshapiro ctx->ctx_sid = id++; 183168515Sgshapiro 184168515Sgshapiro /* if there is an idle worker, signal it, otherwise start new worker */ 185168515Sgshapiro if (Tskmgr.tm_nb_idle > 0) 186168515Sgshapiro { 187168515Sgshapiro ctx->ctx_wstate = WKST_READY_TO_RUN; 188168515Sgshapiro TASKMGR_COND_SIGNAL(); 189168515Sgshapiro } 190168515Sgshapiro else 191168515Sgshapiro { 192168515Sgshapiro ctx->ctx_wstate = WKST_RUNNING; 193168515Sgshapiro LAUNCH_WORKER(ctx); 194168515Sgshapiro } 195168515Sgshapiro TASKMGR_UNLOCK(); 196168515Sgshapiro return MI_SUCCESS; 197168515Sgshapiro} 198168515Sgshapiro 199168515Sgshapiro/* 200168515Sgshapiro** MI_CLOSE_SESSION -- Close a session and clean up data structures 201168515Sgshapiro** 202168515Sgshapiro** Parameters: 203168515Sgshapiro** ctx -- context structure 204168515Sgshapiro** 205168515Sgshapiro** Returns: 206168515Sgshapiro** MI_SUCCESS/MI_FAILURE 207168515Sgshapiro*/ 208168515Sgshapiro 209168515Sgshapirostatic int 210168515Sgshapiromi_close_session(ctx) 211168515Sgshapiro SMFICTX_PTR ctx; 212168515Sgshapiro{ 213168515Sgshapiro SM_ASSERT(ctx != NULL); 214168515Sgshapiro 215168515Sgshapiro (void) mi_list_del_ctx(ctx); 216203004Sgshapiro mi_clr_ctx(ctx); 217168515Sgshapiro 218168515Sgshapiro return MI_SUCCESS; 219168515Sgshapiro} 220168515Sgshapiro 221168515Sgshapiro/* 222223067Sgshapiro** NONBLOCKING -- set nonblocking mode for a file descriptor. 223223067Sgshapiro** 224223067Sgshapiro** Parameters: 225223067Sgshapiro** fd -- file descriptor 226223067Sgshapiro** name -- name for (error) logging 227223067Sgshapiro** 228223067Sgshapiro** Returns: 229223067Sgshapiro** MI_SUCCESS/MI_FAILURE 230223067Sgshapiro*/ 231223067Sgshapiro 232223067Sgshapirostatic int 233223067Sgshapirononblocking(int fd, const char *name) 234223067Sgshapiro{ 235223067Sgshapiro int r; 236223067Sgshapiro 237223067Sgshapiro errno = 0; 238223067Sgshapiro r = fcntl(fd, F_GETFL, 0); 239223067Sgshapiro if (r == -1) 240223067Sgshapiro { 241223067Sgshapiro smi_log(SMI_LOG_ERR, "fcntl(%s, F_GETFL)=%s", 242223067Sgshapiro name, sm_errstring(errno)); 243223067Sgshapiro return MI_FAILURE; 244223067Sgshapiro } 245223067Sgshapiro errno = 0; 246223067Sgshapiro r = fcntl(fd, F_SETFL, r | O_NONBLOCK); 247223067Sgshapiro if (r == -1) 248223067Sgshapiro { 249223067Sgshapiro smi_log(SMI_LOG_ERR, "fcntl(%s, F_SETFL, O_NONBLOCK)=%s", 250223067Sgshapiro name, sm_errstring(errno)); 251223067Sgshapiro return MI_FAILURE; 252223067Sgshapiro } 253223067Sgshapiro return MI_SUCCESS; 254223067Sgshapiro} 255223067Sgshapiro 256223067Sgshapiro/* 257168515Sgshapiro** MI_POOL_CONTROLER_INIT -- Launch the worker pool controller 258168515Sgshapiro** Must be called before starting sessions. 259168515Sgshapiro** 260168515Sgshapiro** Parameters: 261168515Sgshapiro** none 262168515Sgshapiro** 263168515Sgshapiro** Returns: 264168515Sgshapiro** MI_SUCCESS/MI_FAILURE 265168515Sgshapiro*/ 266168515Sgshapiro 267168515Sgshapiroint 268168515Sgshapiromi_pool_controller_init() 269168515Sgshapiro{ 270168515Sgshapiro sthread_t tid; 271168515Sgshapiro int r, i; 272168515Sgshapiro 273168515Sgshapiro if (Tskmgr.tm_signature == TM_SIGNATURE) 274168515Sgshapiro return MI_SUCCESS; 275168515Sgshapiro 276168515Sgshapiro SM_TAILQ_INIT(&WRK_CTX_HEAD); 277168515Sgshapiro Tskmgr.tm_tid = (sthread_t) -1; 278168515Sgshapiro Tskmgr.tm_nb_workers = 0; 279168515Sgshapiro Tskmgr.tm_nb_idle = 0; 280168515Sgshapiro 281168515Sgshapiro if (pipe(Tskmgr.tm_p) != 0) 282168515Sgshapiro { 283168515Sgshapiro smi_log(SMI_LOG_ERR, "can't create event pipe: %s", 284203004Sgshapiro sm_errstring(errno)); 285168515Sgshapiro return MI_FAILURE; 286168515Sgshapiro } 287223067Sgshapiro r = nonblocking(WR_PIPE, "WR_PIPE"); 288223067Sgshapiro if (r != MI_SUCCESS) 289223067Sgshapiro return r; 290223067Sgshapiro r = nonblocking(RD_PIPE, "RD_PIPE"); 291223067Sgshapiro if (r != MI_SUCCESS) 292223067Sgshapiro return r; 293168515Sgshapiro 294168515Sgshapiro (void) smutex_init(&Tskmgr.tm_w_mutex); 295168515Sgshapiro (void) scond_init(&Tskmgr.tm_w_cond); 296168515Sgshapiro 297168515Sgshapiro /* Launch the pool controller */ 298168515Sgshapiro if ((r = thread_create(&tid, mi_pool_controller, (void *) NULL)) != 0) 299168515Sgshapiro { 300168515Sgshapiro smi_log(SMI_LOG_ERR, "can't create controller thread: %s", 301168515Sgshapiro sm_errstring(r)); 302168515Sgshapiro return MI_FAILURE; 303168515Sgshapiro } 304168515Sgshapiro Tskmgr.tm_tid = tid; 305168515Sgshapiro Tskmgr.tm_signature = TM_SIGNATURE; 306168515Sgshapiro 307168515Sgshapiro /* Create the pool of workers */ 308168515Sgshapiro for (i = 0; i < MIN_WORKERS; i++) 309168515Sgshapiro { 310168515Sgshapiro if ((r = thread_create(&tid, mi_worker, (void *) NULL)) != 0) 311168515Sgshapiro { 312168515Sgshapiro smi_log(SMI_LOG_ERR, "can't create workers crew: %s", 313168515Sgshapiro sm_errstring(r)); 314168515Sgshapiro return MI_FAILURE; 315168515Sgshapiro } 316168515Sgshapiro } 317168515Sgshapiro 318168515Sgshapiro return MI_SUCCESS; 319168515Sgshapiro} 320168515Sgshapiro 321168515Sgshapiro/* 322168515Sgshapiro** MI_POOL_CONTROLLER -- manage the pool of workers 323168515Sgshapiro** This thread must be running when listener begins 324168515Sgshapiro** starting sessions 325168515Sgshapiro** 326168515Sgshapiro** Parameters: 327168515Sgshapiro** arg -- unused 328168515Sgshapiro** 329168515Sgshapiro** Returns: 330168515Sgshapiro** NULL 331168515Sgshapiro** 332168515Sgshapiro** Control flow: 333168515Sgshapiro** for (;;) 334168515Sgshapiro** Look for timed out sessions 335168515Sgshapiro** Select sessions to wait for sendmail command 336168515Sgshapiro** Poll set of file descriptors 337168515Sgshapiro** if timeout 338168515Sgshapiro** continue 339168515Sgshapiro** For each file descriptor ready 340168515Sgshapiro** launch new thread if no worker available 341168515Sgshapiro** else 342168515Sgshapiro** signal waiting worker 343168515Sgshapiro*/ 344168515Sgshapiro 345168515Sgshapiro/* Poll structure array (pollfd) size step */ 346168515Sgshapiro#define PFD_STEP 256 347168515Sgshapiro 348168515Sgshapiro#define WAIT_FD(i) (pfd[i].fd) 349168515Sgshapiro#define WAITFN "POLL" 350168515Sgshapiro 351168515Sgshapirostatic void * 352168515Sgshapiromi_pool_controller(arg) 353168515Sgshapiro void *arg; 354168515Sgshapiro{ 355168515Sgshapiro struct pollfd *pfd = NULL; 356168515Sgshapiro int dim_pfd = 0; 357168515Sgshapiro bool rebuild_set = true; 358168515Sgshapiro int pcnt = 0; /* error count for poll() failures */ 359203004Sgshapiro time_t lastcheck; 360168515Sgshapiro 361168515Sgshapiro Tskmgr.tm_tid = sthread_get_id(); 362168515Sgshapiro if (pthread_detach(Tskmgr.tm_tid) != 0) 363168515Sgshapiro { 364168515Sgshapiro smi_log(SMI_LOG_ERR, "Failed to detach pool controller thread"); 365168515Sgshapiro return NULL; 366168515Sgshapiro } 367168515Sgshapiro 368168515Sgshapiro pfd = (struct pollfd *) malloc(PFD_STEP * sizeof(struct pollfd)); 369168515Sgshapiro if (pfd == NULL) 370168515Sgshapiro { 371168515Sgshapiro smi_log(SMI_LOG_ERR, "Failed to malloc pollfd array: %s", 372168515Sgshapiro sm_errstring(errno)); 373168515Sgshapiro return NULL; 374168515Sgshapiro } 375168515Sgshapiro dim_pfd = PFD_STEP; 376168515Sgshapiro 377203004Sgshapiro lastcheck = time(NULL); 378168515Sgshapiro for (;;) 379168515Sgshapiro { 380168515Sgshapiro SMFICTX_PTR ctx; 381244833Sgshapiro int nfd, r, i; 382168515Sgshapiro time_t now; 383168515Sgshapiro 384168515Sgshapiro POOL_LEV_DPRINTF(4, ("Let's %s again...", WAITFN)); 385168515Sgshapiro 386168515Sgshapiro if (mi_stop() != MILTER_CONT) 387168515Sgshapiro break; 388168515Sgshapiro 389168515Sgshapiro TASKMGR_LOCK(); 390168515Sgshapiro 391168515Sgshapiro now = time(NULL); 392168515Sgshapiro 393168515Sgshapiro /* check for timed out sessions? */ 394168515Sgshapiro if (lastcheck + DT_CHECK_OLD_SESSIONS < now) 395168515Sgshapiro { 396203004Sgshapiro ctx = SM_TAILQ_FIRST(&WRK_CTX_HEAD); 397203004Sgshapiro while (ctx != SM_TAILQ_END(&WRK_CTX_HEAD)) 398168515Sgshapiro { 399203004Sgshapiro SMFICTX_PTR ctx_nxt; 400203004Sgshapiro 401203004Sgshapiro ctx_nxt = SM_TAILQ_NEXT(ctx, ctx_link); 402168515Sgshapiro if (ctx->ctx_wstate == WKST_WAITING) 403168515Sgshapiro { 404168515Sgshapiro if (ctx->ctx_wait == 0) 405168515Sgshapiro ctx->ctx_wait = now; 406203004Sgshapiro else if (ctx->ctx_wait + OLD_SESSION_TIMEOUT 407203004Sgshapiro < now) 408168515Sgshapiro { 409203004Sgshapiro /* if session timed out, close it */ 410168515Sgshapiro sfsistat (*fi_close) __P((SMFICTX *)); 411168515Sgshapiro 412168515Sgshapiro POOL_LEV_DPRINTF(4, 413168515Sgshapiro ("Closing old connection: sd=%d id=%d", 414168515Sgshapiro ctx->ctx_sd, 415168515Sgshapiro ctx->ctx_sid)); 416168515Sgshapiro 417168515Sgshapiro if ((fi_close = ctx->ctx_smfi->xxfi_close) != NULL) 418168515Sgshapiro (void) (*fi_close)(ctx); 419168515Sgshapiro 420168515Sgshapiro mi_close_session(ctx); 421168515Sgshapiro } 422168515Sgshapiro } 423203004Sgshapiro ctx = ctx_nxt; 424168515Sgshapiro } 425168515Sgshapiro lastcheck = now; 426168515Sgshapiro } 427168515Sgshapiro 428168515Sgshapiro if (rebuild_set) 429168515Sgshapiro { 430168515Sgshapiro /* 431168515Sgshapiro ** Initialize poll set. 432168515Sgshapiro ** Insert into the poll set the file descriptors of 433168515Sgshapiro ** all sessions waiting for a command from sendmail. 434168515Sgshapiro */ 435168515Sgshapiro 436168515Sgshapiro nfd = 0; 437168515Sgshapiro 438168515Sgshapiro /* begin with worker pipe */ 439168515Sgshapiro pfd[nfd].fd = RD_PIPE; 440168515Sgshapiro pfd[nfd].events = MI_POLL_RD_FLAGS; 441168515Sgshapiro pfd[nfd].revents = 0; 442168515Sgshapiro nfd++; 443168515Sgshapiro 444168515Sgshapiro SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link) 445168515Sgshapiro { 446168515Sgshapiro /* 447168515Sgshapiro ** update ctx_wait - start of wait moment - 448168515Sgshapiro ** for timeout 449168515Sgshapiro */ 450168515Sgshapiro 451168515Sgshapiro if (ctx->ctx_wstate == WKST_READY_TO_WAIT) 452168515Sgshapiro ctx->ctx_wait = now; 453168515Sgshapiro 454168515Sgshapiro /* add the session to the pollfd array? */ 455168515Sgshapiro if ((ctx->ctx_wstate == WKST_READY_TO_WAIT) || 456168515Sgshapiro (ctx->ctx_wstate == WKST_WAITING)) 457168515Sgshapiro { 458168515Sgshapiro /* 459168515Sgshapiro ** Resize the pollfd array if it 460168515Sgshapiro ** isn't large enough. 461168515Sgshapiro */ 462168515Sgshapiro 463168515Sgshapiro if (nfd >= dim_pfd) 464168515Sgshapiro { 465168515Sgshapiro struct pollfd *tpfd; 466168515Sgshapiro size_t new; 467168515Sgshapiro 468168515Sgshapiro new = (dim_pfd + PFD_STEP) * 469168515Sgshapiro sizeof(*tpfd); 470168515Sgshapiro tpfd = (struct pollfd *) 471168515Sgshapiro realloc(pfd, new); 472168515Sgshapiro if (tpfd != NULL) 473168515Sgshapiro { 474168515Sgshapiro pfd = tpfd; 475168515Sgshapiro dim_pfd += PFD_STEP; 476168515Sgshapiro } 477168515Sgshapiro else 478168515Sgshapiro { 479168515Sgshapiro smi_log(SMI_LOG_ERR, 480168515Sgshapiro "Failed to realloc pollfd array:%s", 481168515Sgshapiro sm_errstring(errno)); 482168515Sgshapiro } 483168515Sgshapiro } 484168515Sgshapiro 485168515Sgshapiro /* add the session to pollfd array */ 486168515Sgshapiro if (nfd < dim_pfd) 487168515Sgshapiro { 488168515Sgshapiro ctx->ctx_wstate = WKST_WAITING; 489168515Sgshapiro pfd[nfd].fd = ctx->ctx_sd; 490168515Sgshapiro pfd[nfd].events = MI_POLL_RD_FLAGS; 491168515Sgshapiro pfd[nfd].revents = 0; 492168515Sgshapiro nfd++; 493168515Sgshapiro } 494168515Sgshapiro } 495168515Sgshapiro } 496203004Sgshapiro rebuild_set = false; 497168515Sgshapiro } 498168515Sgshapiro 499168515Sgshapiro TASKMGR_UNLOCK(); 500168515Sgshapiro 501168515Sgshapiro /* Everything is ready, let's wait for an event */ 502244833Sgshapiro r = poll(pfd, nfd, POLL_TIMEOUT); 503168515Sgshapiro 504168515Sgshapiro POOL_LEV_DPRINTF(4, ("%s returned: at epoch %d value %d", 505168515Sgshapiro WAITFN, now, nfd)); 506168515Sgshapiro 507168515Sgshapiro /* timeout */ 508244833Sgshapiro if (r == 0) 509168515Sgshapiro continue; 510168515Sgshapiro 511168515Sgshapiro rebuild_set = true; 512168515Sgshapiro 513168515Sgshapiro /* error */ 514244833Sgshapiro if (r < 0) 515168515Sgshapiro { 516168515Sgshapiro if (errno == EINTR) 517168515Sgshapiro continue; 518168515Sgshapiro pcnt++; 519168515Sgshapiro smi_log(SMI_LOG_ERR, 520168515Sgshapiro "%s() failed (%s), %s", 521168515Sgshapiro WAITFN, sm_errstring(errno), 522168515Sgshapiro pcnt >= MAX_FAILS_S ? "abort" : "try again"); 523168515Sgshapiro 524168515Sgshapiro if (pcnt >= MAX_FAILS_S) 525168515Sgshapiro goto err; 526244833Sgshapiro continue; 527168515Sgshapiro } 528168515Sgshapiro pcnt = 0; 529168515Sgshapiro 530168515Sgshapiro /* something happened */ 531168515Sgshapiro for (i = 0; i < nfd; i++) 532168515Sgshapiro { 533168515Sgshapiro if (pfd[i].revents == 0) 534168515Sgshapiro continue; 535168515Sgshapiro 536168515Sgshapiro POOL_LEV_DPRINTF(4, ("%s event on pfd[%d/%d]=%d ", 537168515Sgshapiro WAITFN, i, nfd, 538168515Sgshapiro WAIT_FD(i))); 539168515Sgshapiro 540244833Sgshapiro /* has a worker signaled an end of task? */ 541168515Sgshapiro if (WAIT_FD(i) == RD_PIPE) 542168515Sgshapiro { 543223067Sgshapiro char evts[256]; 544223067Sgshapiro ssize_t r; 545168515Sgshapiro 546168515Sgshapiro POOL_LEV_DPRINTF(4, 547168515Sgshapiro ("PIPE WILL READ evt = %08X %08X", 548168515Sgshapiro pfd[i].events, pfd[i].revents)); 549168515Sgshapiro 550223067Sgshapiro r = 1; 551223067Sgshapiro while ((pfd[i].revents & MI_POLL_RD_FLAGS) != 0 552223067Sgshapiro && r != -1) 553168515Sgshapiro { 554223067Sgshapiro r = read(RD_PIPE, evts, sizeof(evts)); 555168515Sgshapiro } 556168515Sgshapiro 557168515Sgshapiro POOL_LEV_DPRINTF(4, 558168515Sgshapiro ("PIPE DONE READ i=[%d] fd=[%d] r=[%d] evt=[%d]", 559223067Sgshapiro i, RD_PIPE, (int) r, evts[0])); 560168515Sgshapiro 561168515Sgshapiro if ((pfd[i].revents & ~MI_POLL_RD_FLAGS) != 0) 562168515Sgshapiro { 563168515Sgshapiro /* Exception handling */ 564168515Sgshapiro } 565168515Sgshapiro continue; 566168515Sgshapiro } 567168515Sgshapiro 568244833Sgshapiro /* 569244833Sgshapiro ** Not the pipe for workers waking us, 570244833Sgshapiro ** so must be something on an MTA connection. 571244833Sgshapiro */ 572244833Sgshapiro 573244833Sgshapiro TASKMGR_LOCK(); 574168515Sgshapiro SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link) 575168515Sgshapiro { 576168515Sgshapiro if (ctx->ctx_wstate != WKST_WAITING) 577168515Sgshapiro continue; 578168515Sgshapiro 579168515Sgshapiro POOL_LEV_DPRINTF(4, 580168515Sgshapiro ("Checking context sd=%d - fd=%d ", 581168515Sgshapiro ctx->ctx_sd , WAIT_FD(i))); 582168515Sgshapiro 583168515Sgshapiro if (ctx->ctx_sd == pfd[i].fd) 584168515Sgshapiro { 585168515Sgshapiro 586168515Sgshapiro POOL_LEV_DPRINTF(4, 587168515Sgshapiro ("TASK: found %d for fd[%d]=%d", 588168515Sgshapiro ctx->ctx_sid, i, WAIT_FD(i))); 589168515Sgshapiro 590168515Sgshapiro if (Tskmgr.tm_nb_idle > 0) 591168515Sgshapiro { 592168515Sgshapiro ctx->ctx_wstate = WKST_READY_TO_RUN; 593168515Sgshapiro TASKMGR_COND_SIGNAL(); 594168515Sgshapiro } 595168515Sgshapiro else 596168515Sgshapiro { 597168515Sgshapiro ctx->ctx_wstate = WKST_RUNNING; 598168515Sgshapiro LAUNCH_WORKER(ctx); 599168515Sgshapiro } 600168515Sgshapiro break; 601168515Sgshapiro } 602168515Sgshapiro } 603244833Sgshapiro TASKMGR_UNLOCK(); 604168515Sgshapiro 605168515Sgshapiro POOL_LEV_DPRINTF(4, 606168515Sgshapiro ("TASK %s FOUND - Checking PIPE for fd[%d]", 607168515Sgshapiro ctx != NULL ? "" : "NOT", WAIT_FD(i))); 608168515Sgshapiro } 609168515Sgshapiro } 610168515Sgshapiro 611168515Sgshapiro err: 612168515Sgshapiro if (pfd != NULL) 613168515Sgshapiro free(pfd); 614168515Sgshapiro 615168515Sgshapiro Tskmgr.tm_signature = 0; 616244833Sgshapiro#if 0 617244833Sgshapiro /* 618244833Sgshapiro ** Do not clean up ctx -- it can cause double-free()s. 619244833Sgshapiro ** The program is shutting down anyway, so it's not worth the trouble. 620244833Sgshapiro ** There is a more complex solution that prevents race conditions 621244833Sgshapiro ** while accessing ctx, but that's maybe for a later version. 622244833Sgshapiro */ 623244833Sgshapiro 624168515Sgshapiro for (;;) 625168515Sgshapiro { 626168515Sgshapiro SMFICTX_PTR ctx; 627168515Sgshapiro 628168515Sgshapiro ctx = SM_TAILQ_FIRST(&WRK_CTX_HEAD); 629168515Sgshapiro if (ctx == NULL) 630168515Sgshapiro break; 631168515Sgshapiro mi_close_session(ctx); 632168515Sgshapiro } 633244833Sgshapiro#endif 634168515Sgshapiro 635168515Sgshapiro (void) smutex_destroy(&Tskmgr.tm_w_mutex); 636168515Sgshapiro (void) scond_destroy(&Tskmgr.tm_w_cond); 637168515Sgshapiro 638168515Sgshapiro return NULL; 639168515Sgshapiro} 640168515Sgshapiro 641168515Sgshapiro/* 642168515Sgshapiro** Look for a task ready to run. 643168515Sgshapiro** Value of ctx is NULL or a pointer to a task ready to run. 644168515Sgshapiro*/ 645168515Sgshapiro 646168515Sgshapiro#define GET_TASK_READY_TO_RUN() \ 647168515Sgshapiro SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link) \ 648168515Sgshapiro { \ 649168515Sgshapiro if (ctx->ctx_wstate == WKST_READY_TO_RUN) \ 650168515Sgshapiro { \ 651168515Sgshapiro ctx->ctx_wstate = WKST_RUNNING; \ 652168515Sgshapiro break; \ 653168515Sgshapiro } \ 654168515Sgshapiro } 655168515Sgshapiro 656168515Sgshapiro/* 657168515Sgshapiro** MI_WORKER -- worker thread 658168515Sgshapiro** executes tasks distributed by the mi_pool_controller 659168515Sgshapiro** or by mi_start_session 660168515Sgshapiro** 661168515Sgshapiro** Parameters: 662168515Sgshapiro** arg -- pointer to context structure 663168515Sgshapiro** 664168515Sgshapiro** Returns: 665168515Sgshapiro** NULL pointer 666168515Sgshapiro*/ 667168515Sgshapiro 668168515Sgshapirostatic void * 669168515Sgshapiromi_worker(arg) 670168515Sgshapiro void *arg; 671168515Sgshapiro{ 672168515Sgshapiro SMFICTX_PTR ctx; 673168515Sgshapiro bool done; 674168515Sgshapiro sthread_t t_id; 675168515Sgshapiro int r; 676168515Sgshapiro 677168515Sgshapiro ctx = (SMFICTX_PTR) arg; 678168515Sgshapiro done = false; 679168515Sgshapiro if (ctx != NULL) 680168515Sgshapiro ctx->ctx_wstate = WKST_RUNNING; 681168515Sgshapiro 682168515Sgshapiro t_id = sthread_get_id(); 683168515Sgshapiro if (pthread_detach(t_id) != 0) 684168515Sgshapiro { 685168515Sgshapiro smi_log(SMI_LOG_ERR, "Failed to detach worker thread"); 686168515Sgshapiro if (ctx != NULL) 687168515Sgshapiro ctx->ctx_wstate = WKST_READY_TO_RUN; 688168515Sgshapiro return NULL; 689168515Sgshapiro } 690168515Sgshapiro 691168515Sgshapiro TASKMGR_LOCK(); 692168515Sgshapiro Tskmgr.tm_nb_workers++; 693168515Sgshapiro TASKMGR_UNLOCK(); 694168515Sgshapiro 695168515Sgshapiro while (!done) 696168515Sgshapiro { 697168515Sgshapiro if (mi_stop() != MILTER_CONT) 698168515Sgshapiro break; 699168515Sgshapiro 700168515Sgshapiro /* let's handle next task... */ 701168515Sgshapiro if (ctx != NULL) 702168515Sgshapiro { 703168515Sgshapiro int res; 704168515Sgshapiro 705168515Sgshapiro POOL_LEV_DPRINTF(4, 706168515Sgshapiro ("worker %d: new task -> let's handle it", 707168515Sgshapiro t_id)); 708168515Sgshapiro res = mi_engine(ctx); 709168515Sgshapiro POOL_LEV_DPRINTF(4, 710168515Sgshapiro ("worker %d: mi_engine returned %d", t_id, res)); 711168515Sgshapiro 712168515Sgshapiro TASKMGR_LOCK(); 713168515Sgshapiro if (res != MI_CONTINUE) 714168515Sgshapiro { 715168515Sgshapiro ctx->ctx_wstate = WKST_CLOSING; 716168515Sgshapiro 717168515Sgshapiro /* 718168515Sgshapiro ** Delete context from linked list of 719168515Sgshapiro ** sessions and close session. 720168515Sgshapiro */ 721168515Sgshapiro 722168515Sgshapiro mi_close_session(ctx); 723168515Sgshapiro } 724168515Sgshapiro else 725168515Sgshapiro { 726168515Sgshapiro ctx->ctx_wstate = WKST_READY_TO_WAIT; 727168515Sgshapiro 728168515Sgshapiro POOL_LEV_DPRINTF(4, 729168515Sgshapiro ("writing to event pipe...")); 730168515Sgshapiro 731168515Sgshapiro /* 732168515Sgshapiro ** Signal task controller to add new session 733168515Sgshapiro ** to poll set. 734168515Sgshapiro */ 735168515Sgshapiro 736168515Sgshapiro PIPE_SEND_SIGNAL(); 737168515Sgshapiro } 738168515Sgshapiro TASKMGR_UNLOCK(); 739168515Sgshapiro ctx = NULL; 740168515Sgshapiro 741168515Sgshapiro } 742168515Sgshapiro 743168515Sgshapiro /* check if there is any task waiting to be served */ 744168515Sgshapiro TASKMGR_LOCK(); 745168515Sgshapiro 746168515Sgshapiro GET_TASK_READY_TO_RUN(); 747168515Sgshapiro 748168515Sgshapiro /* Got a task? */ 749168515Sgshapiro if (ctx != NULL) 750168515Sgshapiro { 751168515Sgshapiro TASKMGR_UNLOCK(); 752168515Sgshapiro continue; 753168515Sgshapiro } 754168515Sgshapiro 755168515Sgshapiro /* 756168515Sgshapiro ** if not, let's check if there is enough idle workers 757168515Sgshapiro ** if yes: quit 758168515Sgshapiro */ 759168515Sgshapiro 760168515Sgshapiro if (Tskmgr.tm_nb_workers > MIN_WORKERS && 761168515Sgshapiro Tskmgr.tm_nb_idle > MIN_IDLE) 762168515Sgshapiro done = true; 763168515Sgshapiro 764168515Sgshapiro POOL_LEV_DPRINTF(4, ("worker %d: checking ... %d %d", t_id, 765168515Sgshapiro Tskmgr.tm_nb_workers, Tskmgr.tm_nb_idle + 1)); 766168515Sgshapiro 767168515Sgshapiro if (done) 768168515Sgshapiro { 769168515Sgshapiro POOL_LEV_DPRINTF(4, ("worker %d: quitting... ", t_id)); 770168515Sgshapiro Tskmgr.tm_nb_workers--; 771168515Sgshapiro TASKMGR_UNLOCK(); 772168515Sgshapiro continue; 773168515Sgshapiro } 774168515Sgshapiro 775168515Sgshapiro /* 776168515Sgshapiro ** if no task ready to run, wait for another one 777168515Sgshapiro */ 778168515Sgshapiro 779168515Sgshapiro Tskmgr.tm_nb_idle++; 780168515Sgshapiro TASKMGR_COND_WAIT(); 781168515Sgshapiro Tskmgr.tm_nb_idle--; 782168515Sgshapiro 783168515Sgshapiro /* look for a task */ 784168515Sgshapiro GET_TASK_READY_TO_RUN(); 785168515Sgshapiro 786168515Sgshapiro TASKMGR_UNLOCK(); 787168515Sgshapiro } 788168515Sgshapiro return NULL; 789168515Sgshapiro} 790168515Sgshapiro 791168515Sgshapiro/* 792168515Sgshapiro** MI_LIST_ADD_CTX -- add new session to linked list 793168515Sgshapiro** 794168515Sgshapiro** Parameters: 795168515Sgshapiro** ctx -- context structure 796168515Sgshapiro** 797168515Sgshapiro** Returns: 798168515Sgshapiro** MI_FAILURE/MI_SUCCESS 799168515Sgshapiro*/ 800168515Sgshapiro 801168515Sgshapirostatic int 802168515Sgshapiromi_list_add_ctx(ctx) 803168515Sgshapiro SMFICTX_PTR ctx; 804168515Sgshapiro{ 805168515Sgshapiro SM_ASSERT(ctx != NULL); 806168515Sgshapiro SM_TAILQ_INSERT_TAIL(&WRK_CTX_HEAD, ctx, ctx_link); 807168515Sgshapiro return MI_SUCCESS; 808168515Sgshapiro} 809168515Sgshapiro 810168515Sgshapiro/* 811168515Sgshapiro** MI_LIST_DEL_CTX -- remove session from linked list when finished 812168515Sgshapiro** 813168515Sgshapiro** Parameters: 814168515Sgshapiro** ctx -- context structure 815168515Sgshapiro** 816168515Sgshapiro** Returns: 817168515Sgshapiro** MI_FAILURE/MI_SUCCESS 818168515Sgshapiro*/ 819168515Sgshapiro 820168515Sgshapirostatic int 821168515Sgshapiromi_list_del_ctx(ctx) 822168515Sgshapiro SMFICTX_PTR ctx; 823168515Sgshapiro{ 824168515Sgshapiro SM_ASSERT(ctx != NULL); 825168515Sgshapiro if (SM_TAILQ_EMPTY(&WRK_CTX_HEAD)) 826168515Sgshapiro return MI_FAILURE; 827168515Sgshapiro 828168515Sgshapiro SM_TAILQ_REMOVE(&WRK_CTX_HEAD, ctx, ctx_link); 829168515Sgshapiro return MI_SUCCESS; 830168515Sgshapiro} 831168515Sgshapiro#endif /* _FFR_WORKERS_POOL */ 832