/*
 * Copyright (c) 2005 David Xu <davidxu@freebsd.org>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice unmodified, this list of conditions, and the following
 *    disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * $FreeBSD: head/lib/librt/sigev_thread.c 156267 2006-03-04 00:18:19Z davidxu $
 *
 */

#include <sys/types.h>
#include <machine/atomic.h>

#include "namespace.h"
#include <assert.h>
#include <err.h>
#include <errno.h>
#include <ucontext.h>
#include <sys/thr.h>
#include <sys/time.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>
#include <pthread.h>
#include "un-namespace.h"

#include "sigev_thread.h"

/* Lowest number of worker threads that should be kept around. */
#define SIGEV_WORKER_LOW		0

/* Highest number of worker threads that may be created. */
#define	SIGEV_WORKER_HIGH		20

/* How long (in seconds) an idle worker thread lingers before exiting. */
#define SIGEV_WORKER_IDLE		10

struct sigev_worker {
	LIST_ENTRY(sigev_worker)	sw_link;
	pthread_cond_t		sw_cv;
	struct sigev_node	*sw_sn;
	int			sw_flags;
	int			*sw_readyptr;
};

#define	SWF_READYQ		1

LIST_HEAD(sigev_list_head, sigev_node);
#define HASH_QUEUES		17
#define	HASH(t, id)		((((id) << 3) + (t)) % HASH_QUEUES)

static struct sigev_list_head	sigev_hash[HASH_QUEUES];
static struct sigev_list_head	sigev_all;
static TAILQ_HEAD(, sigev_node)	sigev_actq;
static TAILQ_HEAD(, sigev_thread_node)	sigev_threads;
static int			sigev_generation;
static pthread_mutex_t		*sigev_list_mtx;
static pthread_once_t		sigev_once = PTHREAD_ONCE_INIT;
static pthread_once_t		sigev_once_default = PTHREAD_ONCE_INIT;
static pthread_mutex_t		*sigev_threads_mtx;
static pthread_cond_t		*sigev_threads_cv;
static pthread_cond_t		*sigev_actq_cv;
static struct sigev_thread_node	*sigev_default_thread;
static struct sigev_thread_attr	sigev_default_sna;
static pthread_attr_t		sigev_default_attr;
static int			atfork_registered;
static LIST_HEAD(,sigev_worker)	sigev_worker_ready;
static int			sigev_worker_count;
static int			sigev_worker_start;
static int			sigev_worker_high;
static int			sigev_worker_low;
static pthread_cond_t		*sigev_worker_init_cv;

static void __sigev_fork_prepare(void);
static void __sigev_fork_parent(void);
static void __sigev_fork_child(void);
static struct sigev_thread_node *sigev_thread_create(pthread_attr_t *,
	struct sigev_thread_node *, int);
static void *sigev_service_loop(void *);
static void *sigev_worker_routine(void *);
static void sigev_put(struct sigev_node *);
static void worker_cleanup(void *arg);

#pragma weak pthread_create
#pragma weak pthread_attr_getschedpolicy
#pragma weak pthread_attr_getinheritsched
#pragma weak pthread_attr_getschedparam
#pragma weak pthread_attr_getscope
#pragma weak pthread_attr_getstacksize
#pragma weak pthread_attr_getstackaddr
#pragma weak pthread_attr_getguardsize
#pragma weak pthread_attr_init
#pragma weak pthread_attr_setscope
#pragma weak pthread_attr_setdetachstate
#pragma weak pthread_atfork
#pragma weak _pthread_once
#pragma weak pthread_cleanup_push
#pragma weak pthread_cleanup_pop
#pragma weak pthread_setcancelstate

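/*
 * Snapshot the scheduling, stack, and guard attributes of a pthread
 * attribute object into a sigev_thread_attr, so that service threads
 * created with equivalent attributes can be found with sna_eq().
 */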
static __inline void
attr2sna(pthread_attr_t *attr, struct sigev_thread_attr *sna)
{
	struct sched_param sched_param;

	pthread_attr_getschedpolicy(attr, &sna->sna_policy);
	pthread_attr_getinheritsched(attr, &sna->sna_inherit);
	pthread_attr_getschedparam(attr, &sched_param);
	sna->sna_prio = sched_param.sched_priority;
	pthread_attr_getstacksize(attr, &sna->sna_stacksize);
	pthread_attr_getstackaddr(attr, &sna->sna_stackaddr);
	pthread_attr_getguardsize(attr, &sna->sna_guardsize);
}

static __inline int
sna_eq(const struct sigev_thread_attr *a, const struct sigev_thread_attr *b)
{
	return memcmp(a, b, sizeof(*a)) == 0;
}

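/*
 * pthread_create is only a weak reference here, so its address is
 * non-NULL only when the program is actually linked with a thread
 * library.
 */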
static __inline int
have_threads(void)
{
	return (&pthread_create != NULL);
}

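/*
 * One-time initialization: allocate the global locks and condition
 * variables, reset the node, thread, and worker lists, register the
 * fork handlers, and start the default service thread.
 */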
void
__sigev_thread_init(void)
{
	static int inited = 0;
	int i;

	sigev_list_mtx = malloc(sizeof(pthread_mutex_t));
	_pthread_mutex_init(sigev_list_mtx, NULL);
	sigev_threads_mtx = malloc(sizeof(pthread_mutex_t));
	_pthread_mutex_init(sigev_threads_mtx, NULL);
	sigev_threads_cv = malloc(sizeof(pthread_cond_t));
	_pthread_cond_init(sigev_threads_cv, NULL);
	sigev_actq_cv = malloc(sizeof(pthread_cond_t));
	_pthread_cond_init(sigev_actq_cv, NULL);
	for (i = 0; i < HASH_QUEUES; ++i)
		LIST_INIT(&sigev_hash[i]);
	LIST_INIT(&sigev_all);
	TAILQ_INIT(&sigev_threads);
	TAILQ_INIT(&sigev_actq);
	sigev_default_thread = NULL;
	sigev_worker_count = 0;
	sigev_worker_start = 0;
	LIST_INIT(&sigev_worker_ready);
	sigev_worker_high = SIGEV_WORKER_HIGH;
	sigev_worker_low = SIGEV_WORKER_LOW;
	sigev_worker_init_cv = malloc(sizeof(pthread_cond_t));
	_pthread_cond_init(sigev_worker_init_cv, NULL);
	if (atfork_registered == 0) {
		pthread_atfork(
			__sigev_fork_prepare,
			__sigev_fork_parent,
			__sigev_fork_child);
		atfork_registered = 1;
	}
	if (!inited) {
		pthread_attr_init(&sigev_default_attr);
		attr2sna(&sigev_default_attr, &sigev_default_sna);
		pthread_attr_setscope(&sigev_default_attr,
			PTHREAD_SCOPE_SYSTEM);
		pthread_attr_setdetachstate(&sigev_default_attr,
			PTHREAD_CREATE_DETACHED);
		inited = 1;
	}
	sigev_default_thread = sigev_thread_create(NULL, NULL, 0);
}

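/*
 * Make sure the library has been initialized.  Returns 0 on success, or
 * -1 when no thread library is present or the default service thread
 * could not be created.
 */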
int
__sigev_check_init(void)
{
	if (!have_threads())
		return (-1);

	_pthread_once(&sigev_once, __sigev_thread_init);
	return (sigev_default_thread != NULL) ? 0 : -1;
}

static void
__sigev_fork_prepare(void)
{
}

static void
__sigev_fork_parent(void)
{
}

static void
__sigev_fork_child(void)
{
	/*
	 * This is a hack; the thread libraries really should
	 * check whether the handlers were already registered
	 * in pthread_atfork().
	 */
	atfork_registered = 1;
	memcpy(&sigev_once, &sigev_once_default, sizeof(sigev_once));
	__sigev_thread_init();
}

int
__sigev_list_lock(void)
{
	return _pthread_mutex_lock(sigev_list_mtx);
}

int
__sigev_list_unlock(void)
{
	return _pthread_mutex_unlock(sigev_list_mtx);
}

int
__sigev_thread_list_lock(void)
{
	return _pthread_mutex_lock(sigev_threads_mtx);
}

int
__sigev_thread_list_unlock(void)
{
	return _pthread_mutex_unlock(sigev_threads_mtx);
}

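/*
 * Allocate and initialize a notification node for the given type
 * (timer, message queue, or asynchronous I/O) and sigevent, and bind it
 * to a service thread with matching attributes.
 */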
struct sigev_node *
__sigev_alloc(int type, const struct sigevent *evp, struct sigev_node *prev,
	int usethreadpool)
{
	struct sigev_node *sn;

	sn = calloc(1, sizeof(*sn));
	if (sn != NULL) {
		sn->sn_value = evp->sigev_value;
		sn->sn_func = evp->sigev_notify_function;
		sn->sn_gen = atomic_fetchadd_int(&sigev_generation, 1);
		sn->sn_type = type;
		if (usethreadpool)
			sn->sn_flags |= SNF_THREADPOOL;
		sn->sn_tn = sigev_thread_create(evp->sigev_notify_attributes,
			prev ? prev->sn_tn : NULL, usethreadpool);
		if (sn->sn_tn == NULL) {
			free(sn);
			sn = NULL;
		}
	}
	return (sn);
}

void
__sigev_get_sigevent(struct sigev_node *sn, struct sigevent *newevp,
	sigev_id_t id)
{
	/*
	 * Build a new sigevent and tell the kernel to deliver the
	 * SIGEV_SIGSERVICE signal to the node's service thread.
	 */
	newevp->sigev_notify = SIGEV_THREAD_ID;
	newevp->sigev_signo  = SIGEV_SIGSERVICE;
	newevp->sigev_notify_thread_id = (lwpid_t)sn->sn_tn->tn_lwpid;
	newevp->sigev_value.sival_ptr = (void *)id;
}

void
__sigev_free(struct sigev_node *sn)
{
	free(sn);
}

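/*
 * Look up a registered notification node by type and identifier;
 * callers in this file do the lookup with the sigev list lock held.
 */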
struct sigev_node *
__sigev_find(int type, sigev_id_t id)
{
	struct sigev_node *sn;
	int chain = HASH(type, id);

	LIST_FOREACH(sn, &sigev_hash[chain], sn_link) {
		if (sn->sn_type == type && sn->sn_id == id)
			break;
	}
	return (sn);
}

int
__sigev_register(struct sigev_node *sn)
{
	int chain = HASH(sn->sn_type, sn->sn_id);

	LIST_INSERT_HEAD(&sigev_hash[chain], sn, sn_link);
	LIST_INSERT_HEAD(&sigev_all, sn, sn_allist);
	return (0);
}

int
__sigev_delete(int type, sigev_id_t id)
{
	struct sigev_node *sn;

	sn = __sigev_find(type, id);
	if (sn != NULL)
		return (__sigev_delete_node(sn));
	return (0);
}

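/*
 * Unlink a notification node.  The node is freed immediately unless its
 * callback is currently running, in which case it is only marked
 * SNF_REMOVED and freed after the dispatch completes.
 */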
int
__sigev_delete_node(struct sigev_node *sn)
{
	LIST_REMOVE(sn, sn_link);
	LIST_REMOVE(sn, sn_allist);

	__sigev_thread_list_lock();
	if (--sn->sn_tn->tn_refcount == 0)
		if (!(sn->sn_flags & SNF_THREADPOOL))
			pthread_kill(sn->sn_tn->tn_thread, SIGEV_SIGSERVICE);
	__sigev_thread_list_unlock();
	if (sn->sn_flags & SNF_WORKING)
		sn->sn_flags |= SNF_REMOVED;
	else {
		if (sn->sn_flags & SNF_ACTQ) {
			TAILQ_REMOVE(&sigev_actq, sn, sn_actq);
		}
		__sigev_free(sn);
	}
	return (0);
}

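/*
 * Extract the notification identifier from a siginfo_t, based on which
 * realtime facility (timer, message queue, or asynchronous I/O)
 * generated the signal.
 */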
static sigev_id_t
sigev_get_id(siginfo_t *si)
{
	switch(si->si_code) {
	case SI_TIMER:
		return (si->si_timerid);
	case SI_MESGQ:
		return (si->si_mqd);
	case SI_ASYNCIO:
		return (sigev_id_t)si->si_value.sival_ptr;
	}
	return (-1);
}

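/*
 * Find a service thread whose attributes match the requested ones, or
 * create a new one.  A reference is taken on the thread node that is
 * returned; NULL is returned if the thread cannot be created.
 */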
static struct sigev_thread_node *
sigev_thread_create(pthread_attr_t *pattr, struct sigev_thread_node *prev,
	int usepool)
{
	struct sigev_thread_node *tn;
	struct sigev_thread_attr sna;
	sigset_t set;
	int ret;

	if (pattr == NULL)
		pattr = &sigev_default_attr;
	else {
		pthread_attr_setscope(pattr, PTHREAD_SCOPE_SYSTEM);
		pthread_attr_setdetachstate(pattr, PTHREAD_CREATE_DETACHED);
	}

	attr2sna(pattr, &sna);

	__sigev_thread_list_lock();

	if (prev != NULL && sna_eq(&prev->tn_sna, &sna)) {
		prev->tn_refcount++;
		__sigev_thread_list_unlock();
		return (prev);
	}

	if (sna_eq(&sna, &sigev_default_sna) && usepool &&
	    sigev_default_thread != NULL) {
		sigev_default_thread->tn_refcount++;
		__sigev_thread_list_unlock();
		return (sigev_default_thread);
	}

	tn = NULL;
	/* Search for a thread matching the required stack address. */
	if (sna.sna_stackaddr != NULL) {
		TAILQ_FOREACH(tn, &sigev_threads, tn_link) {
			if (sna.sna_stackaddr == tn->tn_sna.sna_stackaddr)
				break;
		}
	}

	if (tn != NULL) {
		tn->tn_refcount++;
		__sigev_thread_list_unlock();
		return (tn);
	}
	tn = malloc(sizeof(*tn));
	tn->tn_sna = sna;
	tn->tn_cur = NULL;
	tn->tn_lwpid = -1;
	tn->tn_refcount = 1;
	TAILQ_INSERT_TAIL(&sigev_threads, tn, tn_link);
	sigemptyset(&set);
	sigaddset(&set, SIGEV_SIGSERVICE);
	_sigprocmask(SIG_BLOCK, &set, NULL);
	ret = pthread_create(&tn->tn_thread, pattr, sigev_service_loop, tn);
	_sigprocmask(SIG_UNBLOCK, &set, NULL);
	if (ret != 0) {
		TAILQ_REMOVE(&sigev_threads, tn, tn_link);
		__sigev_thread_list_unlock();
		free(tn);
		tn = NULL;
	} else {
		/* Wait for the new thread to report its lwpid. */
		while (tn->tn_lwpid == -1)
			_pthread_cond_wait(sigev_threads_cv, sigev_threads_mtx);
		__sigev_thread_list_unlock();
	}
	return (tn);
}

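/*
 * Post-dispatch bookkeeping for a service thread: clear the WORKING
 * flag on the node just dispatched, and free or unlink it if it was
 * removed (or is one-shot) while the callback was running.
 */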
static void
after_dispatch(struct sigev_thread_node *tn)
{
	struct sigev_node *sn;

	if ((sn = tn->tn_cur) != NULL) {
		__sigev_list_lock();
		sn->sn_flags &= ~SNF_WORKING;
		if (sn->sn_flags & SNF_REMOVED)
			__sigev_free(sn);
		else if (sn->sn_flags & SNF_ONESHOT)
			__sigev_delete_node(sn);
		tn->tn_cur = NULL;
		__sigev_list_unlock();
	}
	tn->tn_cur = NULL;
}

/*
 * This function is called if the user callback calls pthread_exit()
 * or pthread_cancel() for the service thread.
 */
static void
thread_cleanup(void *arg)
{
	struct sigev_thread_node *tn = arg;

	fprintf(stderr, "Dangerous Robinson, calling pthread_exit() from "
			"SIGEV_THREAD is undefined.");
	after_dispatch(tn);
	/* longjmp(tn->tn_jbuf, 1); */
	abort();
}

/*
 * Main notification dispatch function; it either runs the user
 * callback itself or hands the notification off to a worker thread,
 * depending on the node's flags.
 */
static void *
sigev_service_loop(void *arg)
{
	siginfo_t si;
	sigset_t set;
	struct sigev_thread_node *tn;
	struct sigev_node *sn;
	sigev_id_t id;
	int ret;

	tn = arg;
	thr_self(&tn->tn_lwpid);
	__sigev_thread_list_lock();
	_pthread_cond_broadcast(sigev_threads_cv);
	__sigev_thread_list_unlock();

	/*
	 * The service thread should not be killed by the callback; if the
	 * user attempts to do so, the thread will be restarted.
	 */
	setjmp(tn->tn_jbuf);
	pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
	sigemptyset(&set);
	sigaddset(&set, SIGEV_SIGSERVICE);
	pthread_cleanup_push(thread_cleanup, tn);
	for (;;) {
		ret = sigwaitinfo(&set, &si);
		__sigev_thread_list_lock();
		if (tn->tn_refcount == 0) {
			TAILQ_REMOVE(&sigev_threads, tn, tn_link);
			__sigev_thread_list_unlock();
			free(tn);
			break;
		}
		__sigev_thread_list_unlock();
		if (ret == -1)
			continue;
		id = sigev_get_id(&si);
		__sigev_list_lock();
		sn = __sigev_find(si.si_code, id);
		if (sn != NULL) {
			sn->sn_info = si;
			if (!(sn->sn_flags & SNF_THREADPOOL)) {
				tn->tn_cur = sn;
				sn->sn_flags |= SNF_WORKING;
				__sigev_list_unlock();
				sn->sn_dispatch(sn);
				after_dispatch(tn);
			} else {
				assert(!(sn->sn_flags & SNF_ACTQ));
				sigev_put(sn);
			}
		} else {
			tn->tn_cur = NULL;
			__sigev_list_unlock();
		}
	}
	pthread_cleanup_pop(0);
	return (0);
}

/*
 * Hand a notification off to the worker threads.
 *
 * Prerequisite: the sigev list lock is held.
 */
static void
sigev_put(struct sigev_node *sn)
{
	struct sigev_worker *worker;
	pthread_t td;
	int ret, ready;

	TAILQ_INSERT_TAIL(&sigev_actq, sn, sn_actq);
	sn->sn_flags |= SNF_ACTQ;
	/*
	 * Check whether we should add another worker thread, unless the
	 * quota has already been hit.
	 */
	if (LIST_EMPTY(&sigev_worker_ready) &&
	    sigev_worker_count + sigev_worker_start < sigev_worker_high) {
		sigev_worker_start++;
		__sigev_list_unlock();
		worker = malloc(sizeof(*worker));
		_pthread_cond_init(&worker->sw_cv, NULL);
		worker->sw_flags = 0;
		worker->sw_sn = 0;
		worker->sw_readyptr = &ready;
		ready = 0;
		ret = pthread_create(&td, &sigev_default_attr,
			sigev_worker_routine, worker);
		if (ret) {
			warnc(ret, "%s:%s cannot create worker thread",
				__FILE__, __func__);
			__sigev_list_lock();
			sigev_worker_start--;
			__sigev_list_unlock();
		} else {
			__sigev_list_lock();
			while (ready == 0) {
				_pthread_cond_wait(sigev_worker_init_cv,
					sigev_list_mtx);
			}
			__sigev_list_unlock();
		}
	} else {
		worker = LIST_FIRST(&sigev_worker_ready);
		if (worker) {
			LIST_REMOVE(worker, sw_link);
			worker->sw_flags &= ~SWF_READYQ;
			_pthread_cond_broadcast(&worker->sw_cv);
		}
		/*
		 * Drop the list lock in either case; if no worker was
		 * ready, a busy worker will pick the node up from the
		 * active queue once it finishes its current callback.
		 */
		__sigev_list_unlock();
	}
}

/*
 * Background worker thread that dispatches notifications to user code.
 * These threads are not bound to any realtime objects.
 */
static void *
sigev_worker_routine(void *arg)
{
	struct sigev_worker *worker;
	struct sigev_node *sn;
	struct timespec ts;
	int ret;

	worker = arg;
	__sigev_list_lock();
	sigev_worker_count++;
	sigev_worker_start--;
	(*(worker->sw_readyptr))++;
	LIST_INSERT_HEAD(&sigev_worker_ready, worker, sw_link);
	worker->sw_flags |= SWF_READYQ;
	_pthread_cond_broadcast(sigev_worker_init_cv);

	for (;;) {
		if (worker->sw_flags & SWF_READYQ) {
			LIST_REMOVE(worker, sw_link);
			worker->sw_flags &= ~SWF_READYQ;
		}

		sn = TAILQ_FIRST(&sigev_actq);
		if (sn != NULL) {
			TAILQ_REMOVE(&sigev_actq, sn, sn_actq);
			sn->sn_flags &= ~SNF_ACTQ;
			sn->sn_flags |= SNF_WORKING;
			__sigev_list_unlock();

			worker->sw_sn = sn;
			pthread_cleanup_push(worker_cleanup, worker);
			sn->sn_dispatch(sn);
			pthread_cleanup_pop(0);
			worker->sw_sn = NULL;

			__sigev_list_lock();
			sn->sn_flags &= ~SNF_WORKING;
			if (sn->sn_flags & SNF_REMOVED)
				__sigev_free(sn);
			else if (sn->sn_flags & SNF_ONESHOT)
				__sigev_delete_node(sn);
		} else {
			LIST_INSERT_HEAD(&sigev_worker_ready, worker, sw_link);
			worker->sw_flags |= SWF_READYQ;
			clock_gettime(CLOCK_REALTIME, &ts);
			ts.tv_sec += SIGEV_WORKER_IDLE;
			ret = _pthread_cond_timedwait(&worker->sw_cv,
				sigev_list_mtx, &ts);
			if (ret == ETIMEDOUT) {
				/*
				 * If we timed out and there is nothing
				 * to do, exit the thread.
				 */
				if (TAILQ_EMPTY(&sigev_actq) &&
				    (worker->sw_flags & SWF_READYQ) &&
				    sigev_worker_count > sigev_worker_low)
					goto out;
			}
		}
	}
out:
	if (worker->sw_flags & SWF_READYQ) {
		LIST_REMOVE(worker, sw_link);
		worker->sw_flags &= ~SWF_READYQ;
	}
	sigev_worker_count--;
	__sigev_list_unlock();
	_pthread_cond_destroy(&worker->sw_cv);
	free(worker);
	return (0);
}

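/*
 * Cleanup handler run when the user callback terminates the worker
 * thread (for example via pthread_exit()); it finishes the node
 * bookkeeping and retires the worker.
 */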
static void
worker_cleanup(void *arg)
{
	struct sigev_worker *worker = arg;
	struct sigev_node *sn;

	sn = worker->sw_sn;
	__sigev_list_lock();
	sn->sn_flags &= ~SNF_WORKING;
	if (sn->sn_flags & SNF_REMOVED)
		__sigev_free(sn);
	else if (sn->sn_flags & SNF_ONESHOT)
		__sigev_delete_node(sn);
	sigev_worker_count--;
	__sigev_list_unlock();
	_pthread_cond_destroy(&worker->sw_cv);
	free(worker);
}