1/*	$NetBSD: tpool.c,v 1.2 2021/08/14 16:14:56 christos Exp $	*/
2
3/* $OpenLDAP$ */
4/* This work is part of OpenLDAP Software <http://www.openldap.org/>.
5 *
6 * Copyright 1998-2021 The OpenLDAP Foundation.
7 * All rights reserved.
8 *
9 * Redistribution and use in source and binary forms, with or without
10 * modification, are permitted only as authorized by the OpenLDAP
11 * Public License.
12 *
13 * A copy of this license is available in file LICENSE in the
14 * top-level directory of the distribution or, alternatively, at
15 * <http://www.OpenLDAP.org/license.html>.
16 */
17
18#include <sys/cdefs.h>
19__RCSID("$NetBSD: tpool.c,v 1.2 2021/08/14 16:14:56 christos Exp $");
20
21#include "portable.h"
22
23#include <stdio.h>
24
25#include <ac/signal.h>
26#include <ac/stdarg.h>
27#include <ac/stdlib.h>
28#include <ac/string.h>
29#include <ac/time.h>
30#include <ac/errno.h>
31
32#include "ldap-int.h"
33
34#ifdef LDAP_R_COMPILE
35
36#include "ldap_pvt_thread.h" /* Get the thread interface */
37#include "ldap_queue.h"
38#define LDAP_THREAD_POOL_IMPLEMENTATION
39#include "ldap_thr_debug.h"  /* May rename symbols defined below */
40
41#ifndef LDAP_THREAD_HAVE_TPOOL
42
/* Alignment, in bytes, applied to each per-queue structure unless the
 * build already supplies its own value.
 */
#if !defined(CACHELINE)
#define CACHELINE	64
#endif
46
47/* Thread-specific key with data and optional free function */
48typedef struct ldap_int_tpool_key_s {
49	void *ltk_key;
50	void *ltk_data;
51	ldap_pvt_thread_pool_keyfree_t *ltk_free;
52} ldap_int_tpool_key_t;
53
/* Upper bound on the thread-specific keys stored per thread.
 * Only a few are expected to be in use.
 */
#define MAXKEYS		32

/* Upper bound on the number of threads; must be a power of 2 */
#define LDAP_MAXTHR	1024

/* (Theoretical) upper bound on pending requests:
 * INT_MAX less some room to avoid overflow.
 */
#define MAX_PENDING	(INT_MAX/2)

/* Values taken by pool->ltp_pause */
enum {
	NOT_PAUSED = 0,
	WANT_PAUSE = 1,
	PAUSED = 2
};
67
68/* Context: thread ID and thread-specific key/data pairs */
69typedef struct ldap_int_thread_userctx_s {
70	struct ldap_int_thread_poolq_s *ltu_pq;
71	ldap_pvt_thread_t ltu_id;
72	ldap_int_tpool_key_t ltu_key[MAXKEYS];
73} ldap_int_thread_userctx_t;
74
75
76/* Simple {thread ID -> context} hash table; key=ctx->ltu_id.
77 * Protected by ldap_pvt_thread_pool_mutex.
78 */
79static struct {
80	ldap_int_thread_userctx_t *ctx;
81	/* ctx is valid when not NULL or DELETED_THREAD_CTX */
82#	define DELETED_THREAD_CTX (&ldap_int_main_thrctx + 1) /* dummy addr */
83} thread_keys[LDAP_MAXTHR];
84
/* Hash the raw bytes of the object `tid` into `hash`: start from the
 * first byte, then for every further byte add ((hash << 5) ^ byte).
 */
#define	TID_HASH(tid, hash) do { \
	const unsigned char *bytes_ = (const unsigned char *)&(tid); \
	unsigned k_ = 0; \
	(hash) = bytes_[0]; \
	while (++k_ < sizeof(tid)) \
		(hash) += ((hash) << 5) ^ bytes_[k_]; \
} while(0)
91
92
93/* Task for a thread to perform */
94typedef struct ldap_int_thread_task_s {
95	union {
96		LDAP_STAILQ_ENTRY(ldap_int_thread_task_s) q;
97		LDAP_SLIST_ENTRY(ldap_int_thread_task_s) l;
98	} ltt_next;
99	ldap_pvt_thread_start_t *ltt_start_routine;
100	void *ltt_arg;
101	struct ldap_int_thread_poolq_s *ltt_queue;
102} ldap_int_thread_task_t;
103
104typedef LDAP_STAILQ_HEAD(tcq, ldap_int_thread_task_s) ldap_int_tpool_plist_t;
105
106struct ldap_int_thread_poolq_s {
107	void *ltp_free;
108
109	struct ldap_int_thread_pool_s *ltp_pool;
110
111	/* protect members below */
112	ldap_pvt_thread_mutex_t ltp_mutex;
113
114	/* not paused and something to do for pool_<wrapper/pause/destroy>()
115	 * Used for normal pool operation, to synch between submitter and
116	 * worker threads. Not used for pauses. In normal operation multiple
117	 * queues can rendezvous without acquiring the main pool lock.
118	 */
119	ldap_pvt_thread_cond_t ltp_cond;
120
121	/* ltp_pause == 0 ? &ltp_pending_list : &empty_pending_list,
122	 * maintained to reduce work for pool_wrapper()
123	 */
124	ldap_int_tpool_plist_t *ltp_work_list;
125
126	/* pending tasks, and unused task objects */
127	ldap_int_tpool_plist_t ltp_pending_list;
128	LDAP_SLIST_HEAD(tcl, ldap_int_thread_task_s) ltp_free_list;
129
130	/* Max number of threads in this queue */
131	int ltp_max_count;
132
133	/* Max pending + paused + idle tasks, negated when ltp_finishing */
134	int ltp_max_pending;
135
136	int ltp_pending_count;		/* Pending + paused + idle tasks */
137	int ltp_active_count;		/* Active, not paused/idle tasks */
138	int ltp_open_count;			/* Number of threads */
139	int ltp_starting;			/* Currently starting threads */
140};
141
142struct ldap_int_thread_pool_s {
143	LDAP_STAILQ_ENTRY(ldap_int_thread_pool_s) ltp_next;
144
145	struct ldap_int_thread_poolq_s **ltp_wqs;
146
147	/* number of poolqs */
148	int ltp_numqs;
149
150	/* protect members below */
151	ldap_pvt_thread_mutex_t ltp_mutex;
152
153	/* paused and waiting for resume
154	 * When a pause is in effect all workers switch to waiting on
155	 * this cond instead of their per-queue cond.
156	 */
157	ldap_pvt_thread_cond_t ltp_cond;
158
159	/* ltp_active_queues < 1 && ltp_pause */
160	ldap_pvt_thread_cond_t ltp_pcond;
161
162	/* number of active queues */
163	int ltp_active_queues;
164
165	/* The pool is finishing, waiting for its threads to close.
166	 * They close when ltp_pending_list is done.  pool_submit()
167	 * rejects new tasks.  ltp_max_pending = -(its old value).
168	 */
169	int ltp_finishing;
170
171	/* Some active task needs to be the sole active task.
172	 * Atomic variable so ldap_pvt_thread_pool_pausing() can read it.
173	 */
174	volatile sig_atomic_t ltp_pause;
175
176	/* Max number of threads in pool */
177	int ltp_max_count;
178
179	/* Configured max number of threads in pool, 0 for default (LDAP_MAXTHR) */
180	int ltp_conf_max_count;
181
182	/* Max pending + paused + idle tasks, negated when ltp_finishing */
183	int ltp_max_pending;
184};
185
186static ldap_int_tpool_plist_t empty_pending_list =
187	LDAP_STAILQ_HEAD_INITIALIZER(empty_pending_list);
188
189static int ldap_int_has_thread_pool = 0;
190static LDAP_STAILQ_HEAD(tpq, ldap_int_thread_pool_s)
191	ldap_int_thread_pool_list =
192	LDAP_STAILQ_HEAD_INITIALIZER(ldap_int_thread_pool_list);
193
194static ldap_pvt_thread_mutex_t ldap_pvt_thread_pool_mutex;
195
196static void *ldap_int_thread_pool_wrapper( void *pool );
197
198static ldap_pvt_thread_key_t	ldap_tpool_key;
199
200/* Context of the main thread */
201static ldap_int_thread_userctx_t ldap_int_main_thrctx;
202
203int
204ldap_int_thread_pool_startup ( void )
205{
206	ldap_int_main_thrctx.ltu_id = ldap_pvt_thread_self();
207	ldap_pvt_thread_key_create( &ldap_tpool_key );
208	return ldap_pvt_thread_mutex_init(&ldap_pvt_thread_pool_mutex);
209}
210
211int
212ldap_int_thread_pool_shutdown ( void )
213{
214	struct ldap_int_thread_pool_s *pool;
215
216	while ((pool = LDAP_STAILQ_FIRST(&ldap_int_thread_pool_list)) != NULL) {
217		(ldap_pvt_thread_pool_destroy)(&pool, 0); /* ignore thr_debug macro */
218	}
219	ldap_pvt_thread_mutex_destroy(&ldap_pvt_thread_pool_mutex);
220	ldap_pvt_thread_key_destroy( ldap_tpool_key );
221	return(0);
222}
223
224
225/* Create a thread pool */
226int
227ldap_pvt_thread_pool_init_q (
228	ldap_pvt_thread_pool_t *tpool,
229	int max_threads,
230	int max_pending,
231	int numqs )
232{
233	ldap_pvt_thread_pool_t pool;
234	struct ldap_int_thread_poolq_s *pq;
235	int i, rc, rem_thr, rem_pend;
236
237	/* multiple pools are currently not supported (ITS#4943) */
238	assert(!ldap_int_has_thread_pool);
239
240	if (! (0 <= max_threads && max_threads <= LDAP_MAXTHR))
241		max_threads = 0;
242	if (! (1 <= max_pending && max_pending <= MAX_PENDING))
243		max_pending = MAX_PENDING;
244
245	*tpool = NULL;
246	pool = (ldap_pvt_thread_pool_t) LDAP_CALLOC(1,
247		sizeof(struct ldap_int_thread_pool_s));
248
249	if (pool == NULL) return(-1);
250
251	pool->ltp_wqs = LDAP_MALLOC(numqs * sizeof(struct ldap_int_thread_poolq_s *));
252	if (pool->ltp_wqs == NULL) {
253		LDAP_FREE(pool);
254		return(-1);
255	}
256
257	for (i=0; i<numqs; i++) {
258		char *ptr = LDAP_CALLOC(1, sizeof(struct ldap_int_thread_poolq_s) + CACHELINE-1);
259		if (ptr == NULL) {
260			for (--i; i>=0; i--)
261				LDAP_FREE(pool->ltp_wqs[i]->ltp_free);
262			LDAP_FREE(pool->ltp_wqs);
263			LDAP_FREE(pool);
264			return(-1);
265		}
266		pool->ltp_wqs[i] = (struct ldap_int_thread_poolq_s *)(((size_t)ptr + CACHELINE-1) & ~(CACHELINE-1));
267		pool->ltp_wqs[i]->ltp_free = ptr;
268	}
269
270	pool->ltp_numqs = numqs;
271	pool->ltp_conf_max_count = max_threads;
272	if ( !max_threads )
273		max_threads = LDAP_MAXTHR;
274
275	rc = ldap_pvt_thread_mutex_init(&pool->ltp_mutex);
276	if (rc != 0) {
277fail:
278		for (i=0; i<numqs; i++)
279			LDAP_FREE(pool->ltp_wqs[i]->ltp_free);
280		LDAP_FREE(pool->ltp_wqs);
281		LDAP_FREE(pool);
282		return(rc);
283	}
284
285	rc = ldap_pvt_thread_cond_init(&pool->ltp_cond);
286	if (rc != 0)
287		goto fail;
288
289	rc = ldap_pvt_thread_cond_init(&pool->ltp_pcond);
290	if (rc != 0)
291		goto fail;
292
293	rem_thr = max_threads % numqs;
294	rem_pend = max_pending % numqs;
295	for ( i=0; i<numqs; i++ ) {
296		pq = pool->ltp_wqs[i];
297		pq->ltp_pool = pool;
298		rc = ldap_pvt_thread_mutex_init(&pq->ltp_mutex);
299		if (rc != 0)
300			return(rc);
301		rc = ldap_pvt_thread_cond_init(&pq->ltp_cond);
302		if (rc != 0)
303			return(rc);
304		LDAP_STAILQ_INIT(&pq->ltp_pending_list);
305		pq->ltp_work_list = &pq->ltp_pending_list;
306		LDAP_SLIST_INIT(&pq->ltp_free_list);
307
308		pq->ltp_max_count = max_threads / numqs;
309		if ( rem_thr ) {
310			pq->ltp_max_count++;
311			rem_thr--;
312		}
313		pq->ltp_max_pending = max_pending / numqs;
314		if ( rem_pend ) {
315			pq->ltp_max_pending++;
316			rem_pend--;
317		}
318	}
319
320	ldap_int_has_thread_pool = 1;
321
322	pool->ltp_max_count = max_threads;
323	pool->ltp_max_pending = max_pending;
324
325	ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
326	LDAP_STAILQ_INSERT_TAIL(&ldap_int_thread_pool_list, pool, ltp_next);
327	ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
328
329	/* Start no threads just yet.  That can break if the process forks
330	 * later, as slapd does in order to daemonize.  On at least POSIX,
331	 * only the forking thread would survive in the child.  Yet fork()
332	 * can't unlock/clean up other threads' locks and data structures,
333	 * unless pthread_atfork() handlers have been set up to do so.
334	 */
335
336	*tpool = pool;
337	return(0);
338}
339
340int
341ldap_pvt_thread_pool_init (
342	ldap_pvt_thread_pool_t *tpool,
343	int max_threads,
344	int max_pending )
345{
346	return ldap_pvt_thread_pool_init_q( tpool, max_threads, max_pending, 1 );
347}
348
349/* Submit a task to be performed by the thread pool */
350int
351ldap_pvt_thread_pool_submit (
352	ldap_pvt_thread_pool_t *tpool,
353	ldap_pvt_thread_start_t *start_routine, void *arg )
354{
355	return ldap_pvt_thread_pool_submit2( tpool, start_routine, arg, NULL );
356}
357
358/* Submit a task to be performed by the thread pool */
359int
360ldap_pvt_thread_pool_submit2 (
361	ldap_pvt_thread_pool_t *tpool,
362	ldap_pvt_thread_start_t *start_routine, void *arg,
363	void **cookie )
364{
365	struct ldap_int_thread_pool_s *pool;
366	struct ldap_int_thread_poolq_s *pq;
367	ldap_int_thread_task_t *task;
368	ldap_pvt_thread_t thr;
369	int i, j;
370
371	if (tpool == NULL)
372		return(-1);
373
374	pool = *tpool;
375
376	if (pool == NULL)
377		return(-1);
378
379	if ( pool->ltp_numqs > 1 ) {
380		int min = pool->ltp_wqs[0]->ltp_max_pending + pool->ltp_wqs[0]->ltp_max_count;
381		int min_x = 0, cnt;
382		for ( i = 0; i < pool->ltp_numqs; i++ ) {
383			/* take first queue that has nothing active */
384			if ( !pool->ltp_wqs[i]->ltp_active_count ) {
385				min_x = i;
386				break;
387			}
388			cnt = pool->ltp_wqs[i]->ltp_active_count + pool->ltp_wqs[i]->ltp_pending_count;
389			if ( cnt < min ) {
390				min = cnt;
391				min_x = i;
392			}
393		}
394		i = min_x;
395	} else
396		i = 0;
397
398	j = i;
399	while(1) {
400		ldap_pvt_thread_mutex_lock(&pool->ltp_wqs[i]->ltp_mutex);
401		if (pool->ltp_wqs[i]->ltp_pending_count < pool->ltp_wqs[i]->ltp_max_pending) {
402			break;
403		}
404		ldap_pvt_thread_mutex_unlock(&pool->ltp_wqs[i]->ltp_mutex);
405		i++;
406		i %= pool->ltp_numqs;
407		if ( i == j )
408			return -1;
409	}
410
411	pq = pool->ltp_wqs[i];
412	task = LDAP_SLIST_FIRST(&pq->ltp_free_list);
413	if (task) {
414		LDAP_SLIST_REMOVE_HEAD(&pq->ltp_free_list, ltt_next.l);
415	} else {
416		task = (ldap_int_thread_task_t *) LDAP_MALLOC(sizeof(*task));
417		if (task == NULL)
418			goto failed;
419	}
420
421	task->ltt_start_routine = start_routine;
422	task->ltt_arg = arg;
423	task->ltt_queue = pq;
424	if ( cookie )
425		*cookie = task;
426
427	pq->ltp_pending_count++;
428	LDAP_STAILQ_INSERT_TAIL(&pq->ltp_pending_list, task, ltt_next.q);
429
430	if (pool->ltp_pause)
431		goto done;
432
433	/* should we open (create) a thread? */
434	if (pq->ltp_open_count < pq->ltp_active_count+pq->ltp_pending_count &&
435		pq->ltp_open_count < pq->ltp_max_count)
436	{
437		pq->ltp_starting++;
438		pq->ltp_open_count++;
439
440		if (0 != ldap_pvt_thread_create(
441			&thr, 1, ldap_int_thread_pool_wrapper, pq))
442		{
443			/* couldn't create thread.  back out of
444			 * ltp_open_count and check for even worse things.
445			 */
446			pq->ltp_starting--;
447			pq->ltp_open_count--;
448
449			if (pq->ltp_open_count == 0) {
450				/* no open threads at all?!?
451				 */
452				ldap_int_thread_task_t *ptr;
453
454				/* let pool_close know there are no more threads */
455				ldap_pvt_thread_cond_signal(&pq->ltp_cond);
456
457				LDAP_STAILQ_FOREACH(ptr, &pq->ltp_pending_list, ltt_next.q)
458					if (ptr == task) break;
459				if (ptr == task) {
460					/* no open threads, task not handled, so
461					 * back out of ltp_pending_count, free the task,
462					 * report the error.
463					 */
464					pq->ltp_pending_count--;
465					LDAP_STAILQ_REMOVE(&pq->ltp_pending_list, task,
466						ldap_int_thread_task_s, ltt_next.q);
467					LDAP_SLIST_INSERT_HEAD(&pq->ltp_free_list, task,
468						ltt_next.l);
469					goto failed;
470				}
471			}
472			/* there is another open thread, so this
473			 * task will be handled eventually.
474			 */
475		}
476	}
477	ldap_pvt_thread_cond_signal(&pq->ltp_cond);
478
479 done:
480	ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
481	return(0);
482
483 failed:
484	ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
485	return(-1);
486}
487
/* Task routine that does nothing.  It is installed in place of a task's
 * real routine when the task is retracted.  Both arguments are ignored.
 */
static void *
no_task( void *ctx, void *arg )
{
	(void)ctx;
	(void)arg;
	return NULL;
}
493
494/* Cancel a pending task that was previously submitted.
495 * Return 1 if the task was successfully cancelled, 0 if
496 * not found, -1 for invalid parameters
497 */
498int
499ldap_pvt_thread_pool_retract (
500	void *cookie )
501{
502	ldap_int_thread_task_t *task, *ttmp;
503	struct ldap_int_thread_poolq_s *pq;
504
505	if (cookie == NULL)
506		return(-1);
507
508	ttmp = cookie;
509	pq = ttmp->ltt_queue;
510	if (pq == NULL)
511		return(-1);
512
513	ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
514	LDAP_STAILQ_FOREACH(task, &pq->ltp_pending_list, ltt_next.q)
515		if (task == ttmp) {
516			/* Could LDAP_STAILQ_REMOVE the task, but that
517			 * walks ltp_pending_list again to find it.
518			 */
519			task->ltt_start_routine = no_task;
520			task->ltt_arg = NULL;
521			break;
522		}
523	ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
524	return task != NULL;
525}
526
527/* Walk the pool and allow tasks to be retracted, only to be called while the
528 * pool is paused */
529int
530ldap_pvt_thread_pool_walk(
531	ldap_pvt_thread_pool_t *tpool,
532	ldap_pvt_thread_start_t *start,
533	ldap_pvt_thread_walk_t *cb, void *arg )
534{
535	struct ldap_int_thread_pool_s *pool;
536	struct ldap_int_thread_poolq_s *pq;
537	ldap_int_thread_task_t *task;
538	int i;
539
540	if (tpool == NULL)
541		return(-1);
542
543	pool = *tpool;
544
545	if (pool == NULL)
546		return(-1);
547
548	ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
549	assert(pool->ltp_pause == PAUSED);
550	ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
551
552	for (i=0; i<pool->ltp_numqs; i++) {
553		pq = pool->ltp_wqs[i];
554		LDAP_STAILQ_FOREACH(task, &pq->ltp_pending_list, ltt_next.q) {
555			if ( task->ltt_start_routine == start ) {
556				if ( cb( task->ltt_start_routine, task->ltt_arg, arg ) ) {
557					/* retract */
558					task->ltt_start_routine = no_task;
559					task->ltt_arg = NULL;
560				}
561			}
562		}
563	}
564	return 0;
565}
566
567/* Set number of work queues in this pool. Should not be
568 * more than the number of CPUs. */
569int
570ldap_pvt_thread_pool_queues(
571	ldap_pvt_thread_pool_t *tpool,
572	int numqs )
573{
574	struct ldap_int_thread_pool_s *pool;
575	struct ldap_int_thread_poolq_s *pq;
576	int i, rc, rem_thr, rem_pend;
577
578	if (numqs < 1 || tpool == NULL)
579		return(-1);
580
581	pool = *tpool;
582
583	if (pool == NULL)
584		return(-1);
585
586	if (numqs < pool->ltp_numqs) {
587		for (i=numqs; i<pool->ltp_numqs; i++)
588			pool->ltp_wqs[i]->ltp_max_count = 0;
589	} else if (numqs > pool->ltp_numqs) {
590		struct ldap_int_thread_poolq_s **wqs;
591		wqs = LDAP_REALLOC(pool->ltp_wqs, numqs * sizeof(struct ldap_int_thread_poolq_s *));
592		if (wqs == NULL)
593			return(-1);
594		pool->ltp_wqs = wqs;
595		for (i=pool->ltp_numqs; i<numqs; i++) {
596			char *ptr = LDAP_CALLOC(1, sizeof(struct ldap_int_thread_poolq_s) + CACHELINE-1);
597			if (ptr == NULL) {
598				for (; i<numqs; i++)
599					pool->ltp_wqs[i] = NULL;
600				return(-1);
601			}
602			pq = (struct ldap_int_thread_poolq_s *)(((size_t)ptr + CACHELINE-1) & ~(CACHELINE-1));
603			pq->ltp_free = ptr;
604			pool->ltp_wqs[i] = pq;
605			pq->ltp_pool = pool;
606			rc = ldap_pvt_thread_mutex_init(&pq->ltp_mutex);
607			if (rc != 0)
608				return(rc);
609			rc = ldap_pvt_thread_cond_init(&pq->ltp_cond);
610			if (rc != 0)
611				return(rc);
612			LDAP_STAILQ_INIT(&pq->ltp_pending_list);
613			pq->ltp_work_list = &pq->ltp_pending_list;
614			LDAP_SLIST_INIT(&pq->ltp_free_list);
615		}
616	}
617	rem_thr = pool->ltp_max_count % numqs;
618	rem_pend = pool->ltp_max_pending % numqs;
619	for ( i=0; i<numqs; i++ ) {
620		pq = pool->ltp_wqs[i];
621		pq->ltp_max_count = pool->ltp_max_count / numqs;
622		if ( rem_thr ) {
623			pq->ltp_max_count++;
624			rem_thr--;
625		}
626		pq->ltp_max_pending = pool->ltp_max_pending / numqs;
627		if ( rem_pend ) {
628			pq->ltp_max_pending++;
629			rem_pend--;
630		}
631	}
632	pool->ltp_numqs = numqs;
633	return 0;
634}
635
636/* Set max #threads.  value <= 0 means max supported #threads (LDAP_MAXTHR) */
637int
638ldap_pvt_thread_pool_maxthreads(
639	ldap_pvt_thread_pool_t *tpool,
640	int max_threads )
641{
642	struct ldap_int_thread_pool_s *pool;
643	struct ldap_int_thread_poolq_s *pq;
644	int remthr, i;
645
646	if (! (0 <= max_threads && max_threads <= LDAP_MAXTHR))
647		max_threads = 0;
648
649	if (tpool == NULL)
650		return(-1);
651
652	pool = *tpool;
653
654	if (pool == NULL)
655		return(-1);
656
657	pool->ltp_conf_max_count = max_threads;
658	if ( !max_threads )
659		max_threads = LDAP_MAXTHR;
660	pool->ltp_max_count = max_threads;
661
662	remthr = max_threads % pool->ltp_numqs;
663	max_threads /= pool->ltp_numqs;
664
665	for (i=0; i<pool->ltp_numqs; i++) {
666		pq = pool->ltp_wqs[i];
667		pq->ltp_max_count = max_threads;
668		if (remthr) {
669			pq->ltp_max_count++;
670			remthr--;
671		}
672	}
673	return(0);
674}
675
676/* Inspect the pool */
677int
678ldap_pvt_thread_pool_query(
679	ldap_pvt_thread_pool_t *tpool,
680	ldap_pvt_thread_pool_param_t param,
681	void *value )
682{
683	struct ldap_int_thread_pool_s	*pool;
684	int				count = -1;
685
686	if ( tpool == NULL || value == NULL ) {
687		return -1;
688	}
689
690	pool = *tpool;
691
692	if ( pool == NULL ) {
693		return 0;
694	}
695
696	switch ( param ) {
697	case LDAP_PVT_THREAD_POOL_PARAM_MAX:
698		count = pool->ltp_conf_max_count;
699		break;
700
701	case LDAP_PVT_THREAD_POOL_PARAM_MAX_PENDING:
702		count = pool->ltp_max_pending;
703		if (count < 0)
704			count = -count;
705		if (count == MAX_PENDING)
706			count = 0;
707		break;
708
709	case LDAP_PVT_THREAD_POOL_PARAM_PAUSING:
710		ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
711		count = (pool->ltp_pause != 0);
712		ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
713		break;
714
715	case LDAP_PVT_THREAD_POOL_PARAM_OPEN:
716	case LDAP_PVT_THREAD_POOL_PARAM_STARTING:
717	case LDAP_PVT_THREAD_POOL_PARAM_ACTIVE:
718	case LDAP_PVT_THREAD_POOL_PARAM_PENDING:
719	case LDAP_PVT_THREAD_POOL_PARAM_BACKLOAD:
720		{
721			int i;
722			count = 0;
723			for (i=0; i<pool->ltp_numqs; i++) {
724				struct ldap_int_thread_poolq_s *pq = pool->ltp_wqs[i];
725				ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
726				switch(param) {
727					case LDAP_PVT_THREAD_POOL_PARAM_OPEN:
728						count += pq->ltp_open_count;
729						break;
730					case LDAP_PVT_THREAD_POOL_PARAM_STARTING:
731						count += pq->ltp_starting;
732						break;
733					case LDAP_PVT_THREAD_POOL_PARAM_ACTIVE:
734						count += pq->ltp_active_count;
735						break;
736					case LDAP_PVT_THREAD_POOL_PARAM_PENDING:
737						count += pq->ltp_pending_count;
738						break;
739					case LDAP_PVT_THREAD_POOL_PARAM_BACKLOAD:
740						count += pq->ltp_pending_count + pq->ltp_active_count;
741						break;
742					default:
743						break;
744				}
745				ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
746			}
747			if (count < 0)
748				count = -count;
749		}
750		break;
751
752	case LDAP_PVT_THREAD_POOL_PARAM_ACTIVE_MAX:
753		break;
754
755	case LDAP_PVT_THREAD_POOL_PARAM_PENDING_MAX:
756		break;
757
758	case LDAP_PVT_THREAD_POOL_PARAM_BACKLOAD_MAX:
759		break;
760
761	case LDAP_PVT_THREAD_POOL_PARAM_STATE:
762		if (pool->ltp_pause)
763			*((char **)value) = "pausing";
764		else if (!pool->ltp_finishing)
765			*((char **)value) = "running";
766		else {
767			int i;
768			for (i=0; i<pool->ltp_numqs; i++)
769				if (pool->ltp_wqs[i]->ltp_pending_count) break;
770			if (i<pool->ltp_numqs)
771				*((char **)value) = "finishing";
772			else
773				*((char **)value) = "stopping";
774		}
775		break;
776
777	case LDAP_PVT_THREAD_POOL_PARAM_UNKNOWN:
778		break;
779	}
780
781	if ( count > -1 ) {
782		*((int *)value) = count;
783	}
784
785	return ( count == -1 ? -1 : 0 );
786}
787
788/*
789 * true if pool is pausing; does not lock any mutex to check.
790 * 0 if not pause, 1 if pause, -1 if error or no pool.
791 */
792int
793ldap_pvt_thread_pool_pausing( ldap_pvt_thread_pool_t *tpool )
794{
795	int rc = -1;
796	struct ldap_int_thread_pool_s *pool;
797
798	if ( tpool != NULL && (pool = *tpool) != NULL ) {
799		rc = (pool->ltp_pause != 0);
800	}
801
802	return rc;
803}
804
805/*
806 * wrapper for ldap_pvt_thread_pool_query(), left around
807 * for backwards compatibility
808 */
809int
810ldap_pvt_thread_pool_backload ( ldap_pvt_thread_pool_t *tpool )
811{
812	int	rc, count;
813
814	rc = ldap_pvt_thread_pool_query( tpool,
815		LDAP_PVT_THREAD_POOL_PARAM_BACKLOAD, (void *)&count );
816
817	if ( rc == 0 ) {
818		return count;
819	}
820
821	return rc;
822}
823
824
825/*
826 * wrapper for ldap_pvt_thread_pool_close+free(), left around
827 * for backwards compatibility
828 */
829int
830ldap_pvt_thread_pool_destroy ( ldap_pvt_thread_pool_t *tpool, int run_pending )
831{
832	int rc;
833
834	if ( (rc = ldap_pvt_thread_pool_close( tpool, run_pending )) ) {
835		return rc;
836	}
837
838	return ldap_pvt_thread_pool_free( tpool );
839}
840
841/* Shut down the pool making its threads finish */
842int
843ldap_pvt_thread_pool_close ( ldap_pvt_thread_pool_t *tpool, int run_pending )
844{
845	struct ldap_int_thread_pool_s *pool, *pptr;
846	struct ldap_int_thread_poolq_s *pq;
847	ldap_int_thread_task_t *task;
848	int i;
849
850	if (tpool == NULL)
851		return(-1);
852
853	pool = *tpool;
854
855	if (pool == NULL) return(-1);
856
857	ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
858	LDAP_STAILQ_FOREACH(pptr, &ldap_int_thread_pool_list, ltp_next)
859		if (pptr == pool) break;
860	ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
861
862	if (pool != pptr) return(-1);
863
864	ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
865
866	pool->ltp_finishing = 1;
867	if (pool->ltp_max_pending > 0)
868		pool->ltp_max_pending = -pool->ltp_max_pending;
869
870	ldap_pvt_thread_cond_broadcast(&pool->ltp_cond);
871	ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
872
873	for (i=0; i<pool->ltp_numqs; i++) {
874		pq = pool->ltp_wqs[i];
875		ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
876		if (pq->ltp_max_pending > 0)
877			pq->ltp_max_pending = -pq->ltp_max_pending;
878		if (!run_pending) {
879			while ((task = LDAP_STAILQ_FIRST(&pq->ltp_pending_list)) != NULL) {
880				LDAP_STAILQ_REMOVE_HEAD(&pq->ltp_pending_list, ltt_next.q);
881				LDAP_FREE(task);
882			}
883			pq->ltp_pending_count = 0;
884		}
885
886		while (pq->ltp_open_count) {
887			ldap_pvt_thread_cond_broadcast(&pq->ltp_cond);
888			ldap_pvt_thread_cond_wait(&pq->ltp_cond, &pq->ltp_mutex);
889		}
890
891		while ((task = LDAP_SLIST_FIRST(&pq->ltp_free_list)) != NULL)
892		{
893			LDAP_SLIST_REMOVE_HEAD(&pq->ltp_free_list, ltt_next.l);
894			LDAP_FREE(task);
895		}
896		ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
897	}
898
899	return(0);
900}
901
902/* Destroy the pool, everything must have already shut down */
903int
904ldap_pvt_thread_pool_free ( ldap_pvt_thread_pool_t *tpool )
905{
906	struct ldap_int_thread_pool_s *pool, *pptr;
907	struct ldap_int_thread_poolq_s *pq;
908	int i;
909
910	if (tpool == NULL)
911		return(-1);
912
913	pool = *tpool;
914
915	if (pool == NULL) return(-1);
916
917	ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
918	LDAP_STAILQ_FOREACH(pptr, &ldap_int_thread_pool_list, ltp_next)
919		if (pptr == pool) break;
920	if (pptr == pool)
921		LDAP_STAILQ_REMOVE(&ldap_int_thread_pool_list, pool,
922			ldap_int_thread_pool_s, ltp_next);
923	ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
924
925	if (pool != pptr) return(-1);
926
927	ldap_pvt_thread_cond_destroy(&pool->ltp_pcond);
928	ldap_pvt_thread_cond_destroy(&pool->ltp_cond);
929	ldap_pvt_thread_mutex_destroy(&pool->ltp_mutex);
930	for (i=0; i<pool->ltp_numqs; i++) {
931		pq = pool->ltp_wqs[i];
932
933		assert( !pq->ltp_open_count );
934		assert( LDAP_SLIST_EMPTY(&pq->ltp_free_list) );
935		ldap_pvt_thread_cond_destroy(&pq->ltp_cond);
936		ldap_pvt_thread_mutex_destroy(&pq->ltp_mutex);
937		if (pq->ltp_free) {
938			LDAP_FREE(pq->ltp_free);
939		}
940	}
941	LDAP_FREE(pool->ltp_wqs);
942	LDAP_FREE(pool);
943	*tpool = NULL;
944	ldap_int_has_thread_pool = 0;
945	return(0);
946}
947
948/* Thread loop.  Accept and handle submitted tasks. */
949static void *
950ldap_int_thread_pool_wrapper (
951	void *xpool )
952{
953	struct ldap_int_thread_poolq_s *pq = xpool;
954	struct ldap_int_thread_pool_s *pool = pq->ltp_pool;
955	ldap_int_thread_task_t *task;
956	ldap_int_tpool_plist_t *work_list;
957	ldap_int_thread_userctx_t ctx, *kctx;
958	unsigned i, keyslot, hash;
959	int pool_lock = 0, freeme = 0;
960
961	assert(pool != NULL);
962
963	for ( i=0; i<MAXKEYS; i++ ) {
964		ctx.ltu_key[i].ltk_key = NULL;
965	}
966
967	ctx.ltu_pq = pq;
968	ctx.ltu_id = ldap_pvt_thread_self();
969	TID_HASH(ctx.ltu_id, hash);
970
971	ldap_pvt_thread_key_setdata( ldap_tpool_key, &ctx );
972
973	if (pool->ltp_pause) {
974		ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
975		/* thread_keys[] is read-only when paused */
976		while (pool->ltp_pause)
977			ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);
978		ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
979	}
980
981	/* find a key slot to give this thread ID and store a
982	 * pointer to our keys there; start at the thread ID
983	 * itself (mod LDAP_MAXTHR) and look for an empty slot.
984	 */
985	ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
986	for (keyslot = hash & (LDAP_MAXTHR-1);
987		(kctx = thread_keys[keyslot].ctx) && kctx != DELETED_THREAD_CTX;
988		keyslot = (keyslot+1) & (LDAP_MAXTHR-1));
989	thread_keys[keyslot].ctx = &ctx;
990	ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
991
992	ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
993	pq->ltp_starting--;
994	pq->ltp_active_count++;
995
996	for (;;) {
997		work_list = pq->ltp_work_list; /* help the compiler a bit */
998		task = LDAP_STAILQ_FIRST(work_list);
999		if (task == NULL) {	/* paused or no pending tasks */
1000			if (--(pq->ltp_active_count) < 1) {
1001				if (pool->ltp_pause) {
1002					ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
1003					ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
1004					pool_lock = 1;
1005					if (--(pool->ltp_active_queues) < 1) {
1006						/* Notify pool_pause it is the sole active thread. */
1007						ldap_pvt_thread_cond_signal(&pool->ltp_pcond);
1008					}
1009				}
1010			}
1011
1012			do {
1013				if (pool->ltp_finishing || pq->ltp_open_count > pq->ltp_max_count) {
1014					/* Not paused, and either finishing or too many
1015					 * threads running (can happen if ltp_max_count
1016					 * was reduced).  Let this thread die.
1017					 */
1018					goto done;
1019				}
1020
1021				/* We could check an idle timer here, and let the
1022				 * thread die if it has been inactive for a while.
1023				 * Only die if there are other open threads (i.e.,
1024				 * always have at least one thread open).
1025				 * The check should be like this:
1026				 *   if (pool->ltp_open_count>1 && pool->ltp_starting==0)
1027				 *       check timer, wait if ltp_pause, leave thread;
1028				 *
1029				 * Just use pthread_cond_timedwait() if we want to
1030				 * check idle time.
1031				 */
1032				if (pool_lock) {
1033					ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);
1034					if (!pool->ltp_pause) {
1035						ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
1036						ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
1037						pool_lock = 0;
1038					}
1039				} else
1040					ldap_pvt_thread_cond_wait(&pq->ltp_cond, &pq->ltp_mutex);
1041
1042				work_list = pq->ltp_work_list;
1043				task = LDAP_STAILQ_FIRST(work_list);
1044			} while (task == NULL);
1045
1046			if (pool_lock) {
1047				ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
1048				ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
1049				pool_lock = 0;
1050			}
1051			pq->ltp_active_count++;
1052		}
1053
1054		LDAP_STAILQ_REMOVE_HEAD(work_list, ltt_next.q);
1055		pq->ltp_pending_count--;
1056		ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
1057
1058		task->ltt_start_routine(&ctx, task->ltt_arg);
1059
1060		ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
1061		LDAP_SLIST_INSERT_HEAD(&pq->ltp_free_list, task, ltt_next.l);
1062	}
1063 done:
1064
1065	ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
1066
1067	/* The pool_mutex lock protects ctx->ltu_key from pool_purgekey()
1068	 * during this call, since it prevents new pauses. */
1069	ldap_pvt_thread_pool_context_reset(&ctx);
1070
1071	thread_keys[keyslot].ctx = DELETED_THREAD_CTX;
1072	ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
1073
1074	pq->ltp_open_count--;
1075	if (pq->ltp_open_count == 0) {
1076		if (pool->ltp_finishing)
1077			/* let pool_destroy know we're all done */
1078			ldap_pvt_thread_cond_signal(&pq->ltp_cond);
1079		else
1080			freeme = 1;
1081	}
1082
1083	if (pool_lock)
1084		ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
1085	else
1086		ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
1087
1088	if (freeme) {
1089		ldap_pvt_thread_cond_destroy(&pq->ltp_cond);
1090		ldap_pvt_thread_mutex_destroy(&pq->ltp_mutex);
1091		LDAP_FREE(pq->ltp_free);
1092		pq->ltp_free = NULL;
1093	}
1094	ldap_pvt_thread_exit(NULL);
1095	return(NULL);
1096}
1097
/* Arguments > ltp_pause to handle_pause(,PAUSE_ARG()).  arg=PAUSE_ARG
 * ensures (arg-ltp_pause) sets GO_* at need and keeps DO_PAUSE/GO_*.
 *
 * PAUSE_ARG(a) yields a with its low three bits set when a contains
 * GO_IDLE or GO_UNIDLE, and a with CHECK_PAUSE set otherwise.
 */
#define GO_IDLE		8
#define GO_UNIDLE	16
#define CHECK_PAUSE	32	/* if ltp_pause: GO_IDLE; wait; GO_UNIDLE */
#define DO_PAUSE	64	/* CHECK_PAUSE; pause the pool */
#define PAUSE_ARG(a) \
		((a) | ((((a) & (GO_IDLE|GO_UNIDLE)) != 0) ? (GO_IDLE-1) : CHECK_PAUSE))
1107
/*
 * Shared implementation behind pool_idle(), pool_unidle(),
 * pool_pausecheck() and pool_pause().
 *
 * tpool:      handle of the pool to operate on.
 * pause_type: PAUSE_ARG() of GO_IDLE, GO_UNIDLE, CHECK_PAUSE or DO_PAUSE.
 *             After the current ltp_pause is subtracted from it, its
 *             GO_IDLE, GO_UNIDLE and DO_PAUSE bits select which of the
 *             three steps below are executed, in that order.
 *
 * Returns -1 if tpool is NULL or the calling thread's context has no
 * queue (ltu_pq), 0 if *tpool is NULL, if nothing had to be waited for,
 * or after a DO_PAUSE request, and 1 if the GO_UNIDLE step waited for a
 * pause to end (and no DO_PAUSE step followed).
 */
static int
handle_pause( ldap_pvt_thread_pool_t *tpool, int pause_type )
{
	struct ldap_int_thread_pool_s *pool;
	struct ldap_int_thread_poolq_s *pq;
	int ret = 0, pause, max_ltp_pause;

	if (tpool == NULL)
		return(-1);

	pool = *tpool;

	if (pool == NULL)
		return(0);

	/* Fast path for pool_pausecheck(): ltp_pause is read here without
	 * holding ltp_mutex, and nothing is done if no pause is in progress */
	if (pause_type == CHECK_PAUSE && !pool->ltp_pause)
		return(0);

	{
		/* Find the work queue recorded in the calling thread's context */
		ldap_int_thread_userctx_t *ctx = ldap_pvt_thread_pool_context();
		pq = ctx->ltu_pq;
		if ( !pq )
			return(-1);
	}

	/* Let pool_unidle() ignore requests for new pauses */
	/* (it then only waits while ltp_pause is PAUSED; every other request
	 * waits while ltp_pause is anything but NOT_PAUSED) */
	max_ltp_pause = pause_type==PAUSE_ARG(GO_UNIDLE) ? WANT_PAUSE : NOT_PAUSED;

	ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);

	pause = pool->ltp_pause;	/* NOT_PAUSED, WANT_PAUSE or PAUSED */

	/* If ltp_pause and not GO_IDLE|GO_UNIDLE: Set GO_IDLE,GO_UNIDLE */
	pause_type -= pause;

	if (pause_type & GO_IDLE) {
		/* Step 1: move this task from the queue's active count to its
		 * pending count */
		int do_pool = 0;
		ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
		pq->ltp_pending_count++;
		pq->ltp_active_count--;
		/* During a pause, note when this queue has no active task left */
		if (pause && pq->ltp_active_count < 1) {
			do_pool = 1;
		}
		ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
		if (do_pool) {
			/* One queue fewer is keeping the pauser waiting */
			pool->ltp_active_queues--;
			if (pool->ltp_active_queues < 1)
			/* Tell the task waiting to DO_PAUSE it can proceed */
				ldap_pvt_thread_cond_signal(&pool->ltp_pcond);
		}
	}

	if (pause_type & GO_UNIDLE) {
		/* Wait out pause if any, then cancel GO_IDLE */
		if (pause > max_ltp_pause) {
			ret = 1;
			/* ltp_mutex is released while waiting on ltp_cond, which
			 * pool_resume() broadcasts; recheck ltp_pause on each wakeup */
			do {
				ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);
			} while (pool->ltp_pause > max_ltp_pause);
		}
		/* Step 2: undo the counter changes made by the GO_IDLE step */
		ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
		pq->ltp_pending_count--;
		pq->ltp_active_count++;
		ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
	}

	if (pause_type & DO_PAUSE) {
		/* Step 3: become the pausing task */
		int i, j;
		/* Tell everyone else to pause or finish, then await that */
		ret = 0;
		/* Any pause seen on entry was waited out by the GO_UNIDLE step */
		assert(!pool->ltp_pause);
		pool->ltp_pause = WANT_PAUSE;
		pool->ltp_active_queues = 0;

		/* Locate the index of our own queue in the pool.
		 * NOTE(review): assumes pq is one of pool->ltp_wqs[]; if it is not,
		 * i ends up == ltp_numqs and is used as an index below - confirm. */
		for (i=0; i<pool->ltp_numqs; i++)
			if (pool->ltp_wqs[i] == pq) break;

		ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
		/* temporarily remove ourself from active count */
		pq->ltp_active_count--;

		/* Visit every queue once, starting with our own (whose mutex is
		 * already held) and wrapping around modulo ltp_numqs */
		j=i;
		do {
			pq = pool->ltp_wqs[j];
			if (j != i)
				ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);

			/* Hide pending tasks from ldap_pvt_thread_pool_wrapper() */
			pq->ltp_work_list = &empty_pending_list;

			/* Count the queues that still have active tasks */
			if (pq->ltp_active_count > 0)
				pool->ltp_active_queues++;

			ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
			/* With a single queue j stays equal to i and the loop ends */
			if (pool->ltp_numqs > 1) {
				j++;
				j %= pool->ltp_numqs;
			}
		} while (j != i);

		/* Wait for this task to become the sole active task */
		while (pool->ltp_active_queues > 0)
			ldap_pvt_thread_cond_wait(&pool->ltp_pcond, &pool->ltp_mutex);

		/* restore us to active count */
		/* NOTE(review): done while holding only pool->ltp_mutex, whereas the
		 * matching decrement above was made under the queue's ltp_mutex -
		 * confirm this is intended. */
		pool->ltp_wqs[i]->ltp_active_count++;

		assert(pool->ltp_pause == WANT_PAUSE);
		pool->ltp_pause = PAUSED;
	}
	ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);

	return(ret);
}
1222
1223/* Consider this task idle: It will not block pool_pause() in other tasks. */
1224void
1225ldap_pvt_thread_pool_idle( ldap_pvt_thread_pool_t *tpool )
1226{
1227	handle_pause(tpool, PAUSE_ARG(GO_IDLE));
1228}
1229
1230/* Cancel pool_idle(). If the pool is paused, wait it out first. */
1231void
1232ldap_pvt_thread_pool_unidle( ldap_pvt_thread_pool_t *tpool )
1233{
1234	handle_pause(tpool, PAUSE_ARG(GO_UNIDLE));
1235}
1236
1237/*
1238 * If a pause was requested, wait for it.  If several threads
1239 * are waiting to pause, let through one or more pauses.
1240 * The calling task must be active, not idle.
1241 * Return 1 if we waited, 0 if not, -1 at parameter error.
1242 */
1243int
1244ldap_pvt_thread_pool_pausecheck( ldap_pvt_thread_pool_t *tpool )
1245{
1246	return handle_pause(tpool, PAUSE_ARG(CHECK_PAUSE));
1247}
1248
1249/*
1250 * Wait for a pause, from a non-pooled thread.
1251 */
1252int
1253ldap_pvt_thread_pool_pausecheck_native( ldap_pvt_thread_pool_t *tpool )
1254{
1255	struct ldap_int_thread_pool_s *pool;
1256
1257	if (tpool == NULL)
1258		return(-1);
1259
1260	pool = *tpool;
1261
1262	if (pool == NULL)
1263		return(0);
1264
1265	if (!pool->ltp_pause)
1266		return(0);
1267
1268	ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
1269	while (pool->ltp_pause)
1270			ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);
1271	ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
1272	return 1;
1273}
1274
1275/*
1276 * Pause the pool.  The calling task must be active, not idle.
1277 * Return when all other tasks are paused or idle.
1278 */
1279int
1280ldap_pvt_thread_pool_pause( ldap_pvt_thread_pool_t *tpool )
1281{
1282	return handle_pause(tpool, PAUSE_ARG(DO_PAUSE));
1283}
1284
1285/* End a pause */
1286int
1287ldap_pvt_thread_pool_resume (
1288	ldap_pvt_thread_pool_t *tpool )
1289{
1290	struct ldap_int_thread_pool_s *pool;
1291	struct ldap_int_thread_poolq_s *pq;
1292	int i;
1293
1294	if (tpool == NULL)
1295		return(-1);
1296
1297	pool = *tpool;
1298
1299	if (pool == NULL)
1300		return(0);
1301
1302	ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
1303	assert(pool->ltp_pause == PAUSED);
1304	pool->ltp_pause = 0;
1305	for (i=0; i<pool->ltp_numqs; i++) {
1306		pq = pool->ltp_wqs[i];
1307		pq->ltp_work_list = &pq->ltp_pending_list;
1308		ldap_pvt_thread_cond_broadcast(&pq->ltp_cond);
1309	}
1310	ldap_pvt_thread_cond_broadcast(&pool->ltp_cond);
1311	ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
1312	return(0);
1313}
1314
1315/*
1316 * Get the key's data and optionally free function in the given context.
1317 */
1318int ldap_pvt_thread_pool_getkey(
1319	void *xctx,
1320	void *key,
1321	void **data,
1322	ldap_pvt_thread_pool_keyfree_t **kfree )
1323{
1324	ldap_int_thread_userctx_t *ctx = xctx;
1325	int i;
1326
1327	if ( !ctx || !key || !data ) return EINVAL;
1328
1329	for ( i=0; i<MAXKEYS && ctx->ltu_key[i].ltk_key; i++ ) {
1330		if ( ctx->ltu_key[i].ltk_key == key ) {
1331			*data = ctx->ltu_key[i].ltk_data;
1332			if ( kfree ) *kfree = ctx->ltu_key[i].ltk_free;
1333			return 0;
1334		}
1335	}
1336	return ENOENT;
1337}
1338
1339static void
1340clear_key_idx( ldap_int_thread_userctx_t *ctx, int i )
1341{
1342	for ( ; i < MAXKEYS-1 && ctx->ltu_key[i+1].ltk_key; i++ )
1343		ctx->ltu_key[i] = ctx->ltu_key[i+1];
1344	ctx->ltu_key[i].ltk_key = NULL;
1345}
1346
1347/*
1348 * Set or remove data for the key in the given context.
1349 * key can be any unique pointer.
1350 * kfree() is an optional function to free the data (but not the key):
1351 *   pool_context_reset() and pool_purgekey() call kfree(key, data),
1352 *   but pool_setkey() does not.  For pool_setkey() it is the caller's
1353 *   responsibility to free any existing data with the same key.
1354 *   kfree() must not call functions taking a tpool argument.
1355 */
1356int ldap_pvt_thread_pool_setkey(
1357	void *xctx,
1358	void *key,
1359	void *data,
1360	ldap_pvt_thread_pool_keyfree_t *kfree,
1361	void **olddatap,
1362	ldap_pvt_thread_pool_keyfree_t **oldkfreep )
1363{
1364	ldap_int_thread_userctx_t *ctx = xctx;
1365	int i, found;
1366
1367	if ( !ctx || !key ) return EINVAL;
1368
1369	for ( i=found=0; i<MAXKEYS; i++ ) {
1370		if ( ctx->ltu_key[i].ltk_key == key ) {
1371			found = 1;
1372			break;
1373		} else if ( !ctx->ltu_key[i].ltk_key ) {
1374			break;
1375		}
1376	}
1377
1378	if ( olddatap ) {
1379		if ( found ) {
1380			*olddatap = ctx->ltu_key[i].ltk_data;
1381		} else {
1382			*olddatap = NULL;
1383		}
1384	}
1385
1386	if ( oldkfreep ) {
1387		if ( found ) {
1388			*oldkfreep = ctx->ltu_key[i].ltk_free;
1389		} else {
1390			*oldkfreep = 0;
1391		}
1392	}
1393
1394	if ( data || kfree ) {
1395		if ( i>=MAXKEYS )
1396			return ENOMEM;
1397		ctx->ltu_key[i].ltk_key = key;
1398		ctx->ltu_key[i].ltk_data = data;
1399		ctx->ltu_key[i].ltk_free = kfree;
1400	} else if ( found ) {
1401		clear_key_idx( ctx, i );
1402	}
1403
1404	return 0;
1405}
1406
1407/* Free all elements with this key, no matter which thread they're in.
1408 * May only be called while the pool is paused.
1409 */
1410void ldap_pvt_thread_pool_purgekey( void *key )
1411{
1412	int i, j;
1413	ldap_int_thread_userctx_t *ctx;
1414
1415	assert ( key != NULL );
1416
1417	ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
1418	for ( i=0; i<LDAP_MAXTHR; i++ ) {
1419		ctx = thread_keys[i].ctx;
1420		if ( ctx && ctx != DELETED_THREAD_CTX ) {
1421			for ( j=0; j<MAXKEYS && ctx->ltu_key[j].ltk_key; j++ ) {
1422				if ( ctx->ltu_key[j].ltk_key == key ) {
1423					if (ctx->ltu_key[j].ltk_free)
1424						ctx->ltu_key[j].ltk_free( ctx->ltu_key[j].ltk_key,
1425						ctx->ltu_key[j].ltk_data );
1426					clear_key_idx( ctx, j );
1427					break;
1428				}
1429			}
1430		}
1431	}
1432	ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
1433}
1434
1435/*
1436 * Find the context of the current thread.
1437 * This is necessary if the caller does not have access to the
1438 * thread context handle (for example, a slapd plugin calling
1439 * slapi_search_internal()). No doubt it is more efficient
1440 * for the application to keep track of the thread context
1441 * handles itself.
1442 */
1443void *ldap_pvt_thread_pool_context( )
1444{
1445	void *ctx = NULL;
1446
1447	ldap_pvt_thread_key_getdata( ldap_tpool_key, &ctx );
1448	return ctx ? ctx : (void *) &ldap_int_main_thrctx;
1449}
1450
1451/*
1452 * Free the context's keys.
1453 * Must not call functions taking a tpool argument (because this
1454 * thread already holds ltp_mutex when called from pool_wrapper()).
1455 */
1456void ldap_pvt_thread_pool_context_reset( void *vctx )
1457{
1458	ldap_int_thread_userctx_t *ctx = vctx;
1459	int i;
1460
1461	for ( i=MAXKEYS-1; i>=0; i--) {
1462		if ( !ctx->ltu_key[i].ltk_key )
1463			continue;
1464		if ( ctx->ltu_key[i].ltk_free )
1465			ctx->ltu_key[i].ltk_free( ctx->ltu_key[i].ltk_key,
1466			ctx->ltu_key[i].ltk_data );
1467		ctx->ltu_key[i].ltk_key = NULL;
1468	}
1469}
1470
1471ldap_pvt_thread_t ldap_pvt_thread_pool_tid( void *vctx )
1472{
1473	ldap_int_thread_userctx_t *ctx = vctx;
1474
1475	return ctx->ltu_id;
1476}
1477#endif /* LDAP_THREAD_HAVE_TPOOL */
1478
1479#endif /* LDAP_R_COMPILE */
1480