1168515Sgshapiro/*
2244833Sgshapiro *  Copyright (c) 2003-2004, 2007, 2009-2012 Sendmail, Inc. and its suppliers.
3168515Sgshapiro *	All rights reserved.
4168515Sgshapiro *
5168515Sgshapiro * By using this file, you agree to the terms and conditions set
6168515Sgshapiro * forth in the LICENSE file which can be found at the top level of
7168515Sgshapiro * the sendmail distribution.
8168515Sgshapiro *
9168515Sgshapiro * Contributed by Jose Marcio Martins da Cruz - Ecole des Mines de Paris
10168515Sgshapiro *   Jose-Marcio.Martins@ensmp.fr
11168515Sgshapiro */
12168515Sgshapiro
13168515Sgshapiro#include <sm/gen.h>
14244833SgshapiroSM_RCSID("@(#)$Id: worker.c,v 8.24 2012/03/13 15:37:46 ca Exp $")
15168515Sgshapiro
16168515Sgshapiro#include "libmilter.h"
17168515Sgshapiro
18168515Sgshapiro#if _FFR_WORKERS_POOL
19168515Sgshapiro
20168515Sgshapirotypedef struct taskmgr_S taskmgr_T;	/* state shared by controller and workers */
21168515Sgshapiro
/*
**  Value stored in tm_signature by mi_pool_controller_init() once the
**  controller thread has been created; the controller resets the field
**  to 0 when it terminates.
*/
22168515Sgshapiro#define TM_SIGNATURE		0x23021957
23168515Sgshapiro
24168515Sgshapirostruct taskmgr_S
25168515Sgshapiro{
26168515Sgshapiro	long		tm_signature; /* has the controller been initialized */
27168515Sgshapiro	sthread_t	tm_tid;	/* thread id of controller */
28168515Sgshapiro	smfi_hd_T	tm_ctx_head; /* head of the linked list of contexts */
29168515Sgshapiro
	/* both counters are modified by mi_worker() with tm_w_mutex held */
30168515Sgshapiro	int		tm_nb_workers;	/* number of workers in the pool */
31168515Sgshapiro	int		tm_nb_idle;	/* number of workers waiting */
32168515Sgshapiro
	/* tm_p[0] is read by the controller, tm_p[1] is written by workers */
33168515Sgshapiro	int		tm_p[2];	/* poll control pipe */
34168515Sgshapiro
35168515Sgshapiro	smutex_t	tm_w_mutex;	/* linked list access mutex */
36168515Sgshapiro	scond_t		tm_w_cond;	/* idle workers wait here for a session that is ready to run */
37168515Sgshapiro};
38168515Sgshapiro
/* the single task manager instance; zero-initialized, so not yet "signed" */
39168515Sgshapirostatic taskmgr_T     Tskmgr = {0};
40168515Sgshapiro
41168515Sgshapiro#define WRK_CTX_HEAD	Tskmgr.tm_ctx_head	/* list of all sessions known to the pool */
42168515Sgshapiro
/* read end (polled by the controller) and write end (used by workers) */
43168515Sgshapiro#define RD_PIPE	(Tskmgr.tm_p[0])
44168515Sgshapiro#define WR_PIPE	(Tskmgr.tm_p[1])
45168515Sgshapiro
/*
**  PIPE_SEND_SIGNAL -- write one byte (0x5a) to WR_PIPE.
**	mi_worker() uses this after putting a session into
**	WKST_READY_TO_WAIT so that the controller's poll() returns.
**	A failed or short write is only logged.
*/
46168515Sgshapiro#define PIPE_SEND_SIGNAL()						\
47168515Sgshapiro	do								\
48168515Sgshapiro	{								\
49168515Sgshapiro		char evt = 0x5a;					\
50168515Sgshapiro		int fd = WR_PIPE;					\
51168515Sgshapiro		if (write(fd, &evt, sizeof(evt)) != sizeof(evt))	\
52168515Sgshapiro			smi_log(SMI_LOG_ERR,				\
53168515Sgshapiro				"Error writing to event pipe: %s",	\
54168515Sgshapiro				sm_errstring(errno));			\
55168515Sgshapiro	} while (0)
56168515Sgshapiro
/* defaults to 1; NOTE(review): not referenced elsewhere in this part of the file */
57168515Sgshapiro#ifndef USE_PIPE_WAKE_POLL
58168515Sgshapiro# define USE_PIPE_WAKE_POLL 1
59168515Sgshapiro#endif /* USE_PIPE_WAKE_POLL */
60168515Sgshapiro
/* passed as the timeout argument of poll() in mi_pool_controller() */
61168515Sgshapiro/* poll check periodicity (default 10000 - 10 s) */
62168515Sgshapiro#define POLL_TIMEOUT   10000
63168515Sgshapiro
/* passed to scond_timedwait() by TASKMGR_COND_WAIT() */
64168515Sgshapiro/* worker conditional wait timeout (default 10 s) */
65168515Sgshapiro#define COND_TIMEOUT     10
66168515Sgshapiro
67168515Sgshapiro/* functions */
/* forward declarations of the file-local functions defined further down */
68168515Sgshapirostatic int mi_close_session __P((SMFICTX_PTR));
69168515Sgshapiro
/* thread entry points (passed to thread_create()) */
70168515Sgshapirostatic void *mi_worker __P((void *));
71168515Sgshapirostatic void *mi_pool_controller __P((void *));
72168515Sgshapiro
/* session list maintenance on WRK_CTX_HEAD */
73168515Sgshapirostatic int mi_list_add_ctx __P((SMFICTX_PTR));
74168515Sgshapirostatic int mi_list_del_ctx __P((SMFICTX_PTR));
75168515Sgshapiro
76168515Sgshapiro/*
**  Periodicity of cleaning up old (timed out) sessions:
**	the session list is checked for old inactive sessions
**	every DT_CHECK_OLD_SESSIONS seconds by mi_pool_controller().
*/
81168515Sgshapiro
82168515Sgshapiro#define DT_CHECK_OLD_SESSIONS   600
83168515Sgshapiro
/*
**  Idle time after which a waiting session is closed.  The default
**  expands to ctx->ctx_timeout, so a variable named ctx must be in
**  scope wherever OLD_SESSION_TIMEOUT is used.
*/
84168515Sgshapiro#ifndef OLD_SESSION_TIMEOUT
85168515Sgshapiro# define OLD_SESSION_TIMEOUT      ctx->ctx_timeout
86168515Sgshapiro#endif /* OLD_SESSION_TIMEOUT */
87168515Sgshapiro
88168515Sgshapiro/* session states - with respect to the pool of workers */
89168515Sgshapiro#define WKST_INIT		0	/* initial state */
90168515Sgshapiro#define WKST_READY_TO_RUN	1	/* command ready to be read */
91168515Sgshapiro#define WKST_RUNNING		2	/* session running on a worker */
92168515Sgshapiro#define WKST_READY_TO_WAIT	3	/* session just finished by a worker */
93168515Sgshapiro#define WKST_WAITING		4	/* waiting for new command */
94168515Sgshapiro#define WKST_CLOSING		5	/* session finished */
95168515Sgshapiro
/* mi_pool_controller_init() starts this many workers up front */
96168515Sgshapiro#ifndef MIN_WORKERS
97168515Sgshapiro# define MIN_WORKERS	2  /* minimum number of threads to keep around */
98168515Sgshapiro#endif
99168515Sgshapiro
/* a worker only quits if more than MIN_IDLE workers are already idle */
100168515Sgshapiro#define MIN_IDLE	1  /* minimum number of idle threads */
101168515Sgshapiro
102168515Sgshapiro
103168515Sgshapiro/*
**  Macros for threads and mutex management
**
**	TASKMGR_LOCK/TASKMGR_UNLOCK -- acquire/release Tskmgr.tm_w_mutex;
**		a failure is only logged, execution continues.
**	TASKMGR_COND_WAIT -- wait on Tskmgr.tm_w_cond together with
**		Tskmgr.tm_w_mutex via scond_timedwait(), passing
**		COND_TIMEOUT.
**	TASKMGR_COND_SIGNAL -- signal Tskmgr.tm_w_cond; a failure is
**		only logged.
**	LAUNCH_WORKER -- start a new mi_worker thread for ctx; a
**		thread_create() failure is logged but is not reported
**		to the code using the macro.
**	POOL_LEV_DPRINTF -- if POOL_DEBUG is set, call sm_dprintf with
**		the parenthesized argument list x when lev < ctx->ctx_dbg;
**		a variable named ctx must be in scope at the point of use.
**		Expands to nothing if POOL_DEBUG is not set.
*/
106168515Sgshapiro
107168515Sgshapiro#define TASKMGR_LOCK()							\
108168515Sgshapiro	do								\
109168515Sgshapiro	{								\
110168515Sgshapiro		if (!smutex_lock(&Tskmgr.tm_w_mutex))			\
111168515Sgshapiro			smi_log(SMI_LOG_ERR, "TASKMGR_LOCK error");	\
112168515Sgshapiro	} while (0)
113168515Sgshapiro
114168515Sgshapiro#define TASKMGR_UNLOCK()						\
115168515Sgshapiro	do								\
116168515Sgshapiro	{								\
117168515Sgshapiro		if (!smutex_unlock(&Tskmgr.tm_w_mutex))			\
118168515Sgshapiro			smi_log(SMI_LOG_ERR, "TASKMGR_UNLOCK error");	\
119168515Sgshapiro	} while (0)
120168515Sgshapiro
121168515Sgshapiro#define	TASKMGR_COND_WAIT()						\
122168515Sgshapiro	scond_timedwait(&Tskmgr.tm_w_cond, &Tskmgr.tm_w_mutex, COND_TIMEOUT)
123168515Sgshapiro
124168515Sgshapiro#define	TASKMGR_COND_SIGNAL()						\
125168515Sgshapiro	do								\
126168515Sgshapiro	{								\
127168515Sgshapiro		if (scond_signal(&Tskmgr.tm_w_cond) != 0)		\
128168515Sgshapiro			smi_log(SMI_LOG_ERR, "TASKMGR_COND_SIGNAL error"); \
129168515Sgshapiro	} while (0)
130168515Sgshapiro
131168515Sgshapiro#define LAUNCH_WORKER(ctx)						\
132168515Sgshapiro	do								\
133168515Sgshapiro	{								\
134168515Sgshapiro		int r;							\
135168515Sgshapiro		sthread_t tid;						\
136168515Sgshapiro									\
137168515Sgshapiro		if ((r = thread_create(&tid, mi_worker, ctx)) != 0)	\
138168515Sgshapiro			smi_log(SMI_LOG_ERR, "LAUNCH_WORKER error: %s",\
139168515Sgshapiro				sm_errstring(r));			\
140168515Sgshapiro	} while (0)
141168515Sgshapiro
142168515Sgshapiro#if POOL_DEBUG
143168515Sgshapiro# define POOL_LEV_DPRINTF(lev, x)					\
144244833Sgshapiro	do								\
145244833Sgshapiro	{								\
146168515Sgshapiro		if ((lev) < ctx->ctx_dbg)				\
147168515Sgshapiro			sm_dprintf x;					\
148168515Sgshapiro	} while (0)
149168515Sgshapiro#else /* POOL_DEBUG */
150168515Sgshapiro# define POOL_LEV_DPRINTF(lev, x)
151168515Sgshapiro#endif /* POOL_DEBUG */
152168515Sgshapiro
153168515Sgshapiro/*
154168515Sgshapiro**  MI_START_SESSION -- Start a session in the pool of workers
155168515Sgshapiro**
156168515Sgshapiro**	Parameters:
157168515Sgshapiro**		ctx -- context structure
158168515Sgshapiro**
159168515Sgshapiro**	Returns:
160168515Sgshapiro**		MI_SUCCESS/MI_FAILURE
161168515Sgshapiro*/
162168515Sgshapiro
163168515Sgshapiroint
164168515Sgshapiromi_start_session(ctx)
165168515Sgshapiro	SMFICTX_PTR ctx;
166168515Sgshapiro{
167168515Sgshapiro	static long id = 0;
168168515Sgshapiro
169223067Sgshapiro	/* this can happen if the milter is shutting down */
170223067Sgshapiro	if (Tskmgr.tm_signature != TM_SIGNATURE)
171223067Sgshapiro		return MI_FAILURE;
172168515Sgshapiro	SM_ASSERT(ctx != NULL);
173168515Sgshapiro	POOL_LEV_DPRINTF(4, ("PIPE r=[%d] w=[%d]", RD_PIPE, WR_PIPE));
174168515Sgshapiro	TASKMGR_LOCK();
175168515Sgshapiro
176168515Sgshapiro	if (mi_list_add_ctx(ctx) != MI_SUCCESS)
177168515Sgshapiro	{
178168515Sgshapiro		TASKMGR_UNLOCK();
179168515Sgshapiro		return MI_FAILURE;
180168515Sgshapiro	}
181168515Sgshapiro
182168515Sgshapiro	ctx->ctx_sid = id++;
183168515Sgshapiro
184168515Sgshapiro	/* if there is an idle worker, signal it, otherwise start new worker */
185168515Sgshapiro	if (Tskmgr.tm_nb_idle > 0)
186168515Sgshapiro	{
187168515Sgshapiro		ctx->ctx_wstate = WKST_READY_TO_RUN;
188168515Sgshapiro		TASKMGR_COND_SIGNAL();
189168515Sgshapiro	}
190168515Sgshapiro	else
191168515Sgshapiro	{
192168515Sgshapiro		ctx->ctx_wstate = WKST_RUNNING;
193168515Sgshapiro		LAUNCH_WORKER(ctx);
194168515Sgshapiro	}
195168515Sgshapiro	TASKMGR_UNLOCK();
196168515Sgshapiro	return MI_SUCCESS;
197168515Sgshapiro}
198168515Sgshapiro
199168515Sgshapiro/*
200168515Sgshapiro**  MI_CLOSE_SESSION -- Close a session and clean up data structures
201168515Sgshapiro**
202168515Sgshapiro**	Parameters:
203168515Sgshapiro**		ctx -- context structure
204168515Sgshapiro**
205168515Sgshapiro**	Returns:
206168515Sgshapiro**		MI_SUCCESS/MI_FAILURE
207168515Sgshapiro*/
208168515Sgshapiro
209168515Sgshapirostatic int
210168515Sgshapiromi_close_session(ctx)
211168515Sgshapiro	SMFICTX_PTR ctx;
212168515Sgshapiro{
213168515Sgshapiro	SM_ASSERT(ctx != NULL);
214168515Sgshapiro
215168515Sgshapiro	(void) mi_list_del_ctx(ctx);
216203004Sgshapiro	mi_clr_ctx(ctx);
217168515Sgshapiro
218168515Sgshapiro	return MI_SUCCESS;
219168515Sgshapiro}
220168515Sgshapiro
221168515Sgshapiro/*
222223067Sgshapiro**  NONBLOCKING -- set nonblocking mode for a file descriptor.
223223067Sgshapiro**
224223067Sgshapiro**	Parameters:
225223067Sgshapiro**		fd -- file descriptor
226223067Sgshapiro**		name -- name for (error) logging
227223067Sgshapiro**
228223067Sgshapiro**	Returns:
229223067Sgshapiro**		MI_SUCCESS/MI_FAILURE
230223067Sgshapiro*/
231223067Sgshapiro
232223067Sgshapirostatic int
233223067Sgshapirononblocking(int fd, const char *name)
234223067Sgshapiro{
235223067Sgshapiro	int r;
236223067Sgshapiro
237223067Sgshapiro	errno = 0;
238223067Sgshapiro	r = fcntl(fd, F_GETFL, 0);
239223067Sgshapiro	if (r == -1)
240223067Sgshapiro	{
241223067Sgshapiro		smi_log(SMI_LOG_ERR, "fcntl(%s, F_GETFL)=%s",
242223067Sgshapiro			name, sm_errstring(errno));
243223067Sgshapiro		return MI_FAILURE;
244223067Sgshapiro	}
245223067Sgshapiro	errno = 0;
246223067Sgshapiro	r = fcntl(fd, F_SETFL, r | O_NONBLOCK);
247223067Sgshapiro	if (r == -1)
248223067Sgshapiro	{
249223067Sgshapiro		smi_log(SMI_LOG_ERR, "fcntl(%s, F_SETFL, O_NONBLOCK)=%s",
250223067Sgshapiro			name, sm_errstring(errno));
251223067Sgshapiro		return MI_FAILURE;
252223067Sgshapiro	}
253223067Sgshapiro	return MI_SUCCESS;
254223067Sgshapiro}
255223067Sgshapiro
256223067Sgshapiro/*
257168515Sgshapiro**  MI_POOL_CONTROLER_INIT -- Launch the worker pool controller
258168515Sgshapiro**		Must be called before starting sessions.
259168515Sgshapiro**
260168515Sgshapiro**	Parameters:
261168515Sgshapiro**		none
262168515Sgshapiro**
263168515Sgshapiro**	Returns:
264168515Sgshapiro**		MI_SUCCESS/MI_FAILURE
265168515Sgshapiro*/
266168515Sgshapiro
267168515Sgshapiroint
268168515Sgshapiromi_pool_controller_init()
269168515Sgshapiro{
270168515Sgshapiro	sthread_t tid;
271168515Sgshapiro	int r, i;
272168515Sgshapiro
273168515Sgshapiro	if (Tskmgr.tm_signature == TM_SIGNATURE)
274168515Sgshapiro		return MI_SUCCESS;
275168515Sgshapiro
276168515Sgshapiro	SM_TAILQ_INIT(&WRK_CTX_HEAD);
277168515Sgshapiro	Tskmgr.tm_tid = (sthread_t) -1;
278168515Sgshapiro	Tskmgr.tm_nb_workers = 0;
279168515Sgshapiro	Tskmgr.tm_nb_idle = 0;
280168515Sgshapiro
281168515Sgshapiro	if (pipe(Tskmgr.tm_p) != 0)
282168515Sgshapiro	{
283168515Sgshapiro		smi_log(SMI_LOG_ERR, "can't create event pipe: %s",
284203004Sgshapiro			sm_errstring(errno));
285168515Sgshapiro		return MI_FAILURE;
286168515Sgshapiro	}
287223067Sgshapiro	r = nonblocking(WR_PIPE, "WR_PIPE");
288223067Sgshapiro	if (r != MI_SUCCESS)
289223067Sgshapiro		return r;
290223067Sgshapiro	r = nonblocking(RD_PIPE, "RD_PIPE");
291223067Sgshapiro	if (r != MI_SUCCESS)
292223067Sgshapiro		return r;
293168515Sgshapiro
294168515Sgshapiro	(void) smutex_init(&Tskmgr.tm_w_mutex);
295168515Sgshapiro	(void) scond_init(&Tskmgr.tm_w_cond);
296168515Sgshapiro
297168515Sgshapiro	/* Launch the pool controller */
298168515Sgshapiro	if ((r = thread_create(&tid, mi_pool_controller, (void *) NULL)) != 0)
299168515Sgshapiro	{
300168515Sgshapiro		smi_log(SMI_LOG_ERR, "can't create controller thread: %s",
301168515Sgshapiro			sm_errstring(r));
302168515Sgshapiro		return MI_FAILURE;
303168515Sgshapiro	}
304168515Sgshapiro	Tskmgr.tm_tid = tid;
305168515Sgshapiro	Tskmgr.tm_signature = TM_SIGNATURE;
306168515Sgshapiro
307168515Sgshapiro	/* Create the pool of workers */
308168515Sgshapiro	for (i = 0; i < MIN_WORKERS; i++)
309168515Sgshapiro	{
310168515Sgshapiro		if ((r = thread_create(&tid, mi_worker, (void *) NULL)) != 0)
311168515Sgshapiro		{
312168515Sgshapiro			smi_log(SMI_LOG_ERR, "can't create workers crew: %s",
313168515Sgshapiro				sm_errstring(r));
314168515Sgshapiro			return MI_FAILURE;
315168515Sgshapiro		}
316168515Sgshapiro	}
317168515Sgshapiro
318168515Sgshapiro	return MI_SUCCESS;
319168515Sgshapiro}
320168515Sgshapiro
321168515Sgshapiro/*
322168515Sgshapiro**  MI_POOL_CONTROLLER -- manage the pool of workers
323168515Sgshapiro**	This thread must be running when listener begins
324168515Sgshapiro**	starting sessions
325168515Sgshapiro**
326168515Sgshapiro**	Parameters:
327168515Sgshapiro**		arg -- unused
328168515Sgshapiro**
329168515Sgshapiro**	Returns:
330168515Sgshapiro**		NULL
331168515Sgshapiro**
332168515Sgshapiro**	Control flow:
333168515Sgshapiro**		for (;;)
334168515Sgshapiro**			Look for timed out sessions
335168515Sgshapiro**			Select sessions to wait for sendmail command
336168515Sgshapiro**			Poll set of file descriptors
337168515Sgshapiro**			if timeout
338168515Sgshapiro**				continue
339168515Sgshapiro**			For each file descriptor ready
340168515Sgshapiro**				launch new thread if no worker available
341168515Sgshapiro**				else
342168515Sgshapiro**				signal waiting worker
343168515Sgshapiro*/
344168515Sgshapiro
345168515Sgshapiro/* Poll structure array (pollfd) size step */
346168515Sgshapiro#define PFD_STEP	256
347168515Sgshapiro
348168515Sgshapiro#define WAIT_FD(i)	(pfd[i].fd)
349168515Sgshapiro#define WAITFN		"POLL"
350168515Sgshapiro
351168515Sgshapirostatic void *
352168515Sgshapiromi_pool_controller(arg)
353168515Sgshapiro	void *arg;
354168515Sgshapiro{
355168515Sgshapiro	struct pollfd *pfd = NULL;
356168515Sgshapiro	int dim_pfd = 0;
357168515Sgshapiro	bool rebuild_set = true;
358168515Sgshapiro	int pcnt = 0; /* error count for poll() failures */
359203004Sgshapiro	time_t lastcheck;
360168515Sgshapiro
361168515Sgshapiro	Tskmgr.tm_tid = sthread_get_id();
362168515Sgshapiro	if (pthread_detach(Tskmgr.tm_tid) != 0)
363168515Sgshapiro	{
364168515Sgshapiro		smi_log(SMI_LOG_ERR, "Failed to detach pool controller thread");
365168515Sgshapiro		return NULL;
366168515Sgshapiro	}
367168515Sgshapiro
368168515Sgshapiro	pfd = (struct pollfd *) malloc(PFD_STEP * sizeof(struct pollfd));
369168515Sgshapiro	if (pfd == NULL)
370168515Sgshapiro	{
371168515Sgshapiro		smi_log(SMI_LOG_ERR, "Failed to malloc pollfd array: %s",
372168515Sgshapiro			sm_errstring(errno));
373168515Sgshapiro		return NULL;
374168515Sgshapiro	}
375168515Sgshapiro	dim_pfd = PFD_STEP;
376168515Sgshapiro
377203004Sgshapiro	lastcheck = time(NULL);
378168515Sgshapiro	for (;;)
379168515Sgshapiro	{
380168515Sgshapiro		SMFICTX_PTR ctx;
381244833Sgshapiro		int nfd, r, i;
382168515Sgshapiro		time_t now;
383168515Sgshapiro
384168515Sgshapiro		POOL_LEV_DPRINTF(4, ("Let's %s again...", WAITFN));
385168515Sgshapiro
386168515Sgshapiro		if (mi_stop() != MILTER_CONT)
387168515Sgshapiro			break;
388168515Sgshapiro
389168515Sgshapiro		TASKMGR_LOCK();
390168515Sgshapiro
391168515Sgshapiro		now = time(NULL);
392168515Sgshapiro
393168515Sgshapiro		/* check for timed out sessions? */
394168515Sgshapiro		if (lastcheck + DT_CHECK_OLD_SESSIONS < now)
395168515Sgshapiro		{
396203004Sgshapiro			ctx = SM_TAILQ_FIRST(&WRK_CTX_HEAD);
397203004Sgshapiro			while (ctx != SM_TAILQ_END(&WRK_CTX_HEAD))
398168515Sgshapiro			{
399203004Sgshapiro				SMFICTX_PTR ctx_nxt;
400203004Sgshapiro
401203004Sgshapiro				ctx_nxt = SM_TAILQ_NEXT(ctx, ctx_link);
402168515Sgshapiro				if (ctx->ctx_wstate == WKST_WAITING)
403168515Sgshapiro				{
404168515Sgshapiro					if (ctx->ctx_wait == 0)
405168515Sgshapiro						ctx->ctx_wait = now;
406203004Sgshapiro					else if (ctx->ctx_wait + OLD_SESSION_TIMEOUT
407203004Sgshapiro						 < now)
408168515Sgshapiro					{
409203004Sgshapiro						/* if session timed out, close it */
410168515Sgshapiro						sfsistat (*fi_close) __P((SMFICTX *));
411168515Sgshapiro
412168515Sgshapiro						POOL_LEV_DPRINTF(4,
413168515Sgshapiro							("Closing old connection: sd=%d id=%d",
414168515Sgshapiro							ctx->ctx_sd,
415168515Sgshapiro							ctx->ctx_sid));
416168515Sgshapiro
417168515Sgshapiro						if ((fi_close = ctx->ctx_smfi->xxfi_close) != NULL)
418168515Sgshapiro							(void) (*fi_close)(ctx);
419168515Sgshapiro
420168515Sgshapiro						mi_close_session(ctx);
421168515Sgshapiro					}
422168515Sgshapiro				}
423203004Sgshapiro				ctx = ctx_nxt;
424168515Sgshapiro			}
425168515Sgshapiro			lastcheck = now;
426168515Sgshapiro		}
427168515Sgshapiro
428168515Sgshapiro		if (rebuild_set)
429168515Sgshapiro		{
430168515Sgshapiro			/*
431168515Sgshapiro			**  Initialize poll set.
432168515Sgshapiro			**  Insert into the poll set the file descriptors of
433168515Sgshapiro			**  all sessions waiting for a command from sendmail.
434168515Sgshapiro			*/
435168515Sgshapiro
436168515Sgshapiro			nfd = 0;
437168515Sgshapiro
438168515Sgshapiro			/* begin with worker pipe */
439168515Sgshapiro			pfd[nfd].fd = RD_PIPE;
440168515Sgshapiro			pfd[nfd].events = MI_POLL_RD_FLAGS;
441168515Sgshapiro			pfd[nfd].revents = 0;
442168515Sgshapiro			nfd++;
443168515Sgshapiro
444168515Sgshapiro			SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link)
445168515Sgshapiro			{
446168515Sgshapiro				/*
447168515Sgshapiro				**  update ctx_wait - start of wait moment -
448168515Sgshapiro				**  for timeout
449168515Sgshapiro				*/
450168515Sgshapiro
451168515Sgshapiro				if (ctx->ctx_wstate == WKST_READY_TO_WAIT)
452168515Sgshapiro					ctx->ctx_wait = now;
453168515Sgshapiro
454168515Sgshapiro				/* add the session to the pollfd array? */
455168515Sgshapiro				if ((ctx->ctx_wstate == WKST_READY_TO_WAIT) ||
456168515Sgshapiro				    (ctx->ctx_wstate == WKST_WAITING))
457168515Sgshapiro				{
458168515Sgshapiro					/*
459168515Sgshapiro					**  Resize the pollfd array if it
460168515Sgshapiro					**  isn't large enough.
461168515Sgshapiro					*/
462168515Sgshapiro
463168515Sgshapiro					if (nfd >= dim_pfd)
464168515Sgshapiro					{
465168515Sgshapiro						struct pollfd *tpfd;
466168515Sgshapiro						size_t new;
467168515Sgshapiro
468168515Sgshapiro						new = (dim_pfd + PFD_STEP) *
469168515Sgshapiro							sizeof(*tpfd);
470168515Sgshapiro						tpfd = (struct pollfd *)
471168515Sgshapiro							realloc(pfd, new);
472168515Sgshapiro						if (tpfd != NULL)
473168515Sgshapiro						{
474168515Sgshapiro							pfd = tpfd;
475168515Sgshapiro							dim_pfd += PFD_STEP;
476168515Sgshapiro						}
477168515Sgshapiro						else
478168515Sgshapiro						{
479168515Sgshapiro							smi_log(SMI_LOG_ERR,
480168515Sgshapiro								"Failed to realloc pollfd array:%s",
481168515Sgshapiro								sm_errstring(errno));
482168515Sgshapiro						}
483168515Sgshapiro					}
484168515Sgshapiro
485168515Sgshapiro					/* add the session to pollfd array */
486168515Sgshapiro					if (nfd < dim_pfd)
487168515Sgshapiro					{
488168515Sgshapiro						ctx->ctx_wstate = WKST_WAITING;
489168515Sgshapiro						pfd[nfd].fd = ctx->ctx_sd;
490168515Sgshapiro						pfd[nfd].events = MI_POLL_RD_FLAGS;
491168515Sgshapiro						pfd[nfd].revents = 0;
492168515Sgshapiro						nfd++;
493168515Sgshapiro					}
494168515Sgshapiro				}
495168515Sgshapiro			}
496203004Sgshapiro			rebuild_set = false;
497168515Sgshapiro		}
498168515Sgshapiro
499168515Sgshapiro		TASKMGR_UNLOCK();
500168515Sgshapiro
501168515Sgshapiro		/* Everything is ready, let's wait for an event */
502244833Sgshapiro		r = poll(pfd, nfd, POLL_TIMEOUT);
503168515Sgshapiro
504168515Sgshapiro		POOL_LEV_DPRINTF(4, ("%s returned: at epoch %d value %d",
505168515Sgshapiro			WAITFN, now, nfd));
506168515Sgshapiro
507168515Sgshapiro		/* timeout */
508244833Sgshapiro		if (r == 0)
509168515Sgshapiro			continue;
510168515Sgshapiro
511168515Sgshapiro		rebuild_set = true;
512168515Sgshapiro
513168515Sgshapiro		/* error */
514244833Sgshapiro		if (r < 0)
515168515Sgshapiro		{
516168515Sgshapiro			if (errno == EINTR)
517168515Sgshapiro				continue;
518168515Sgshapiro			pcnt++;
519168515Sgshapiro			smi_log(SMI_LOG_ERR,
520168515Sgshapiro				"%s() failed (%s), %s",
521168515Sgshapiro				WAITFN, sm_errstring(errno),
522168515Sgshapiro				pcnt >= MAX_FAILS_S ? "abort" : "try again");
523168515Sgshapiro
524168515Sgshapiro			if (pcnt >= MAX_FAILS_S)
525168515Sgshapiro				goto err;
526244833Sgshapiro			continue;
527168515Sgshapiro		}
528168515Sgshapiro		pcnt = 0;
529168515Sgshapiro
530168515Sgshapiro		/* something happened */
531168515Sgshapiro		for (i = 0; i < nfd; i++)
532168515Sgshapiro		{
533168515Sgshapiro			if (pfd[i].revents == 0)
534168515Sgshapiro				continue;
535168515Sgshapiro
536168515Sgshapiro			POOL_LEV_DPRINTF(4, ("%s event on pfd[%d/%d]=%d ",
537168515Sgshapiro				WAITFN, i, nfd,
538168515Sgshapiro			WAIT_FD(i)));
539168515Sgshapiro
540244833Sgshapiro			/* has a worker signaled an end of task? */
541168515Sgshapiro			if (WAIT_FD(i) == RD_PIPE)
542168515Sgshapiro			{
543223067Sgshapiro				char evts[256];
544223067Sgshapiro				ssize_t r;
545168515Sgshapiro
546168515Sgshapiro				POOL_LEV_DPRINTF(4,
547168515Sgshapiro					("PIPE WILL READ evt = %08X %08X",
548168515Sgshapiro					pfd[i].events, pfd[i].revents));
549168515Sgshapiro
550223067Sgshapiro				r = 1;
551223067Sgshapiro				while ((pfd[i].revents & MI_POLL_RD_FLAGS) != 0
552223067Sgshapiro					&& r != -1)
553168515Sgshapiro				{
554223067Sgshapiro					r = read(RD_PIPE, evts, sizeof(evts));
555168515Sgshapiro				}
556168515Sgshapiro
557168515Sgshapiro				POOL_LEV_DPRINTF(4,
558168515Sgshapiro					("PIPE DONE READ i=[%d] fd=[%d] r=[%d] evt=[%d]",
559223067Sgshapiro					i, RD_PIPE, (int) r, evts[0]));
560168515Sgshapiro
561168515Sgshapiro				if ((pfd[i].revents & ~MI_POLL_RD_FLAGS) != 0)
562168515Sgshapiro				{
563168515Sgshapiro					/* Exception handling */
564168515Sgshapiro				}
565168515Sgshapiro				continue;
566168515Sgshapiro			}
567168515Sgshapiro
568244833Sgshapiro			/*
569244833Sgshapiro			**  Not the pipe for workers waking us,
570244833Sgshapiro			**  so must be something on an MTA connection.
571244833Sgshapiro			*/
572244833Sgshapiro
573244833Sgshapiro			TASKMGR_LOCK();
574168515Sgshapiro			SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link)
575168515Sgshapiro			{
576168515Sgshapiro				if (ctx->ctx_wstate != WKST_WAITING)
577168515Sgshapiro					continue;
578168515Sgshapiro
579168515Sgshapiro				POOL_LEV_DPRINTF(4,
580168515Sgshapiro					("Checking context sd=%d - fd=%d ",
581168515Sgshapiro					ctx->ctx_sd , WAIT_FD(i)));
582168515Sgshapiro
583168515Sgshapiro				if (ctx->ctx_sd == pfd[i].fd)
584168515Sgshapiro				{
585168515Sgshapiro
586168515Sgshapiro					POOL_LEV_DPRINTF(4,
587168515Sgshapiro						("TASK: found %d for fd[%d]=%d",
588168515Sgshapiro						ctx->ctx_sid, i, WAIT_FD(i)));
589168515Sgshapiro
590168515Sgshapiro					if (Tskmgr.tm_nb_idle > 0)
591168515Sgshapiro					{
592168515Sgshapiro						ctx->ctx_wstate = WKST_READY_TO_RUN;
593168515Sgshapiro						TASKMGR_COND_SIGNAL();
594168515Sgshapiro					}
595168515Sgshapiro					else
596168515Sgshapiro					{
597168515Sgshapiro						ctx->ctx_wstate = WKST_RUNNING;
598168515Sgshapiro						LAUNCH_WORKER(ctx);
599168515Sgshapiro					}
600168515Sgshapiro					break;
601168515Sgshapiro				}
602168515Sgshapiro			}
603244833Sgshapiro			TASKMGR_UNLOCK();
604168515Sgshapiro
605168515Sgshapiro			POOL_LEV_DPRINTF(4,
606168515Sgshapiro				("TASK %s FOUND - Checking PIPE for fd[%d]",
607168515Sgshapiro				ctx != NULL ? "" : "NOT", WAIT_FD(i)));
608168515Sgshapiro		}
609168515Sgshapiro	}
610168515Sgshapiro
611168515Sgshapiro  err:
612168515Sgshapiro	if (pfd != NULL)
613168515Sgshapiro		free(pfd);
614168515Sgshapiro
615168515Sgshapiro	Tskmgr.tm_signature = 0;
616244833Sgshapiro#if 0
617244833Sgshapiro	/*
618244833Sgshapiro	**  Do not clean up ctx -- it can cause double-free()s.
619244833Sgshapiro	**  The program is shutting down anyway, so it's not worth the trouble.
620244833Sgshapiro	**  There is a more complex solution that prevents race conditions
621244833Sgshapiro	**  while accessing ctx, but that's maybe for a later version.
622244833Sgshapiro	*/
623244833Sgshapiro
624168515Sgshapiro	for (;;)
625168515Sgshapiro	{
626168515Sgshapiro		SMFICTX_PTR ctx;
627168515Sgshapiro
628168515Sgshapiro		ctx = SM_TAILQ_FIRST(&WRK_CTX_HEAD);
629168515Sgshapiro		if (ctx == NULL)
630168515Sgshapiro			break;
631168515Sgshapiro		mi_close_session(ctx);
632168515Sgshapiro	}
633244833Sgshapiro#endif
634168515Sgshapiro
635168515Sgshapiro	(void) smutex_destroy(&Tskmgr.tm_w_mutex);
636168515Sgshapiro	(void) scond_destroy(&Tskmgr.tm_w_cond);
637168515Sgshapiro
638168515Sgshapiro	return NULL;
639168515Sgshapiro}
640168515Sgshapiro
641168515Sgshapiro/*
**  Look for a task ready to run.
**  Value of ctx is NULL or a pointer to a task ready to run.
**
**  Walks the session list and stops at the first session in state
**  WKST_READY_TO_RUN, which is switched to WKST_RUNNING.
**  Requires a variable named ctx in scope; mi_worker() invokes this
**  with the task manager lock held.
*/
645168515Sgshapiro
646168515Sgshapiro#define GET_TASK_READY_TO_RUN()					\
647168515Sgshapiro	SM_TAILQ_FOREACH(ctx, &WRK_CTX_HEAD, ctx_link)		\
648168515Sgshapiro	{							\
649168515Sgshapiro		if (ctx->ctx_wstate == WKST_READY_TO_RUN)	\
650168515Sgshapiro		{						\
651168515Sgshapiro			ctx->ctx_wstate = WKST_RUNNING;		\
652168515Sgshapiro			break;					\
653168515Sgshapiro		}						\
654168515Sgshapiro	}
655168515Sgshapiro
656168515Sgshapiro/*
657168515Sgshapiro**  MI_WORKER -- worker thread
658168515Sgshapiro**	executes tasks distributed by the mi_pool_controller
659168515Sgshapiro**	or by mi_start_session
660168515Sgshapiro**
661168515Sgshapiro**	Parameters:
662168515Sgshapiro**		arg -- pointer to context structure
663168515Sgshapiro**
664168515Sgshapiro**	Returns:
665168515Sgshapiro**		NULL pointer
666168515Sgshapiro*/
667168515Sgshapiro
668168515Sgshapirostatic void *
669168515Sgshapiromi_worker(arg)
670168515Sgshapiro	void *arg;
671168515Sgshapiro{
672168515Sgshapiro	SMFICTX_PTR ctx;
673168515Sgshapiro	bool done;
674168515Sgshapiro	sthread_t t_id;
675168515Sgshapiro	int r;
676168515Sgshapiro
677168515Sgshapiro	ctx = (SMFICTX_PTR) arg;
678168515Sgshapiro	done = false;
679168515Sgshapiro	if (ctx != NULL)
680168515Sgshapiro		ctx->ctx_wstate = WKST_RUNNING;
681168515Sgshapiro
682168515Sgshapiro	t_id = sthread_get_id();
683168515Sgshapiro	if (pthread_detach(t_id) != 0)
684168515Sgshapiro	{
685168515Sgshapiro		smi_log(SMI_LOG_ERR, "Failed to detach worker thread");
686168515Sgshapiro		if (ctx != NULL)
687168515Sgshapiro			ctx->ctx_wstate = WKST_READY_TO_RUN;
688168515Sgshapiro		return NULL;
689168515Sgshapiro	}
690168515Sgshapiro
691168515Sgshapiro	TASKMGR_LOCK();
692168515Sgshapiro	Tskmgr.tm_nb_workers++;
693168515Sgshapiro	TASKMGR_UNLOCK();
694168515Sgshapiro
695168515Sgshapiro	while (!done)
696168515Sgshapiro	{
697168515Sgshapiro		if (mi_stop() != MILTER_CONT)
698168515Sgshapiro			break;
699168515Sgshapiro
700168515Sgshapiro		/* let's handle next task... */
701168515Sgshapiro		if (ctx != NULL)
702168515Sgshapiro		{
703168515Sgshapiro			int res;
704168515Sgshapiro
705168515Sgshapiro			POOL_LEV_DPRINTF(4,
706168515Sgshapiro				("worker %d: new task -> let's handle it",
707168515Sgshapiro				t_id));
708168515Sgshapiro			res = mi_engine(ctx);
709168515Sgshapiro			POOL_LEV_DPRINTF(4,
710168515Sgshapiro				("worker %d: mi_engine returned %d", t_id, res));
711168515Sgshapiro
712168515Sgshapiro			TASKMGR_LOCK();
713168515Sgshapiro			if (res != MI_CONTINUE)
714168515Sgshapiro			{
715168515Sgshapiro				ctx->ctx_wstate = WKST_CLOSING;
716168515Sgshapiro
717168515Sgshapiro				/*
718168515Sgshapiro				**  Delete context from linked list of
719168515Sgshapiro				**  sessions and close session.
720168515Sgshapiro				*/
721168515Sgshapiro
722168515Sgshapiro				mi_close_session(ctx);
723168515Sgshapiro			}
724168515Sgshapiro			else
725168515Sgshapiro			{
726168515Sgshapiro				ctx->ctx_wstate = WKST_READY_TO_WAIT;
727168515Sgshapiro
728168515Sgshapiro				POOL_LEV_DPRINTF(4,
729168515Sgshapiro					("writing to event pipe..."));
730168515Sgshapiro
731168515Sgshapiro				/*
732168515Sgshapiro				**  Signal task controller to add new session
733168515Sgshapiro				**  to poll set.
734168515Sgshapiro				*/
735168515Sgshapiro
736168515Sgshapiro				PIPE_SEND_SIGNAL();
737168515Sgshapiro			}
738168515Sgshapiro			TASKMGR_UNLOCK();
739168515Sgshapiro			ctx = NULL;
740168515Sgshapiro
741168515Sgshapiro		}
742168515Sgshapiro
743168515Sgshapiro		/* check if there is any task waiting to be served */
744168515Sgshapiro		TASKMGR_LOCK();
745168515Sgshapiro
746168515Sgshapiro		GET_TASK_READY_TO_RUN();
747168515Sgshapiro
748168515Sgshapiro		/* Got a task? */
749168515Sgshapiro		if (ctx != NULL)
750168515Sgshapiro		{
751168515Sgshapiro			TASKMGR_UNLOCK();
752168515Sgshapiro			continue;
753168515Sgshapiro		}
754168515Sgshapiro
755168515Sgshapiro		/*
756168515Sgshapiro		**  if not, let's check if there is enough idle workers
757168515Sgshapiro		**	if yes: quit
758168515Sgshapiro		*/
759168515Sgshapiro
760168515Sgshapiro		if (Tskmgr.tm_nb_workers > MIN_WORKERS &&
761168515Sgshapiro		    Tskmgr.tm_nb_idle > MIN_IDLE)
762168515Sgshapiro			done = true;
763168515Sgshapiro
764168515Sgshapiro		POOL_LEV_DPRINTF(4, ("worker %d: checking ... %d %d", t_id,
765168515Sgshapiro			Tskmgr.tm_nb_workers, Tskmgr.tm_nb_idle + 1));
766168515Sgshapiro
767168515Sgshapiro		if (done)
768168515Sgshapiro		{
769168515Sgshapiro			POOL_LEV_DPRINTF(4, ("worker %d: quitting... ", t_id));
770168515Sgshapiro			Tskmgr.tm_nb_workers--;
771168515Sgshapiro			TASKMGR_UNLOCK();
772168515Sgshapiro			continue;
773168515Sgshapiro		}
774168515Sgshapiro
775168515Sgshapiro		/*
776168515Sgshapiro		**  if no task ready to run, wait for another one
777168515Sgshapiro		*/
778168515Sgshapiro
779168515Sgshapiro		Tskmgr.tm_nb_idle++;
780168515Sgshapiro		TASKMGR_COND_WAIT();
781168515Sgshapiro		Tskmgr.tm_nb_idle--;
782168515Sgshapiro
783168515Sgshapiro		/* look for a task */
784168515Sgshapiro		GET_TASK_READY_TO_RUN();
785168515Sgshapiro
786168515Sgshapiro		TASKMGR_UNLOCK();
787168515Sgshapiro	}
788168515Sgshapiro	return NULL;
789168515Sgshapiro}
790168515Sgshapiro
791168515Sgshapiro/*
792168515Sgshapiro**  MI_LIST_ADD_CTX -- add new session to linked list
793168515Sgshapiro**
794168515Sgshapiro**	Parameters:
795168515Sgshapiro**		ctx -- context structure
796168515Sgshapiro**
797168515Sgshapiro**	Returns:
798168515Sgshapiro**		MI_FAILURE/MI_SUCCESS
799168515Sgshapiro*/
800168515Sgshapiro
801168515Sgshapirostatic int
802168515Sgshapiromi_list_add_ctx(ctx)
803168515Sgshapiro	SMFICTX_PTR ctx;
804168515Sgshapiro{
805168515Sgshapiro	SM_ASSERT(ctx != NULL);
806168515Sgshapiro	SM_TAILQ_INSERT_TAIL(&WRK_CTX_HEAD, ctx, ctx_link);
807168515Sgshapiro	return MI_SUCCESS;
808168515Sgshapiro}
809168515Sgshapiro
810168515Sgshapiro/*
811168515Sgshapiro**  MI_LIST_DEL_CTX -- remove session from linked list when finished
812168515Sgshapiro**
813168515Sgshapiro**	Parameters:
814168515Sgshapiro**		ctx -- context structure
815168515Sgshapiro**
816168515Sgshapiro**	Returns:
817168515Sgshapiro**		MI_FAILURE/MI_SUCCESS
818168515Sgshapiro*/
819168515Sgshapiro
820168515Sgshapirostatic int
821168515Sgshapiromi_list_del_ctx(ctx)
822168515Sgshapiro	SMFICTX_PTR ctx;
823168515Sgshapiro{
824168515Sgshapiro	SM_ASSERT(ctx != NULL);
825168515Sgshapiro	if (SM_TAILQ_EMPTY(&WRK_CTX_HEAD))
826168515Sgshapiro		return MI_FAILURE;
827168515Sgshapiro
828168515Sgshapiro	SM_TAILQ_REMOVE(&WRK_CTX_HEAD, ctx, ctx_link);
829168515Sgshapiro	return MI_SUCCESS;
830168515Sgshapiro}
831168515Sgshapiro#endif /* _FFR_WORKERS_POOL */
832