Deleted Added
full compact
sigev_thread.c (156267) sigev_thread.c (156383)
1/*
2 * Copyright (c) 2005 David Xu <davidxu@freebsd.org>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 * 1. Redistributions of source code must retain the above copyright

--- 9 unchanged lines hidden (view full) ---

18 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
19 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
20 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
21 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
22 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
23 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
24 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
25 *
1/*
2 * Copyright (c) 2005 David Xu <davidxu@freebsd.org>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 * 1. Redistributions of source code must retain the above copyright

--- 9 unchanged lines hidden (view full) ---

18 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
19 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
20 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
21 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
22 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
23 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
24 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
25 *
26 * $FreeBSD: head/lib/librt/sigev_thread.c 156267 2006-03-04 00:18:19Z davidxu $
26 * $FreeBSD: head/lib/librt/sigev_thread.c 156383 2006-03-07 08:28:07Z davidxu $
27 *
28 */
29
30#include <sys/types.h>
31#include <machine/atomic.h>
32
33#include "namespace.h"
27 *
28 */
29
30#include <sys/types.h>
31#include <machine/atomic.h>
32
33#include "namespace.h"
34#include <assert.h>
35#include <err.h>
36#include <errno.h>
37#include <ucontext.h>
38#include <sys/thr.h>
34#include <err.h>
35#include <errno.h>
36#include <ucontext.h>
37#include <sys/thr.h>
39#include <sys/time.h>
40#include <stdio.h>
41#include <stdlib.h>
42#include <string.h>
43#include <signal.h>
44#include <pthread.h>
45#include "un-namespace.h"
46
47#include "sigev_thread.h"
48
38#include <stdio.h>
39#include <stdlib.h>
40#include <string.h>
41#include <signal.h>
42#include <pthread.h>
43#include "un-namespace.h"
44
45#include "sigev_thread.h"
46
49/* Lowest number of worker threads should be kept. */
50#define SIGEV_WORKER_LOW 0
51
52/* Highest number of worker threads can be created. */
53#define SIGEV_WORKER_HIGH 20
54
55/* How long an idle worker thread should stay. */
56#define SIGEV_WORKER_IDLE 10
57
58struct sigev_worker {
59 LIST_ENTRY(sigev_worker) sw_link;
60 pthread_cond_t sw_cv;
61 struct sigev_node *sw_sn;
62 int sw_flags;
63 int *sw_readyptr;
64};
65
66#define SWF_READYQ 1
67
68LIST_HEAD(sigev_list_head, sigev_node);
69#define HASH_QUEUES 17
70#define HASH(t, id) ((((id) << 3) + (t)) % HASH_QUEUES)
71
72static struct sigev_list_head sigev_hash[HASH_QUEUES];
73static struct sigev_list_head sigev_all;
47LIST_HEAD(sigev_list_head, sigev_node);
48#define HASH_QUEUES 17
49#define HASH(t, id) ((((id) << 3) + (t)) % HASH_QUEUES)
50
51static struct sigev_list_head sigev_hash[HASH_QUEUES];
52static struct sigev_list_head sigev_all;
74static TAILQ_HEAD(, sigev_node) sigev_actq;
75static TAILQ_HEAD(, sigev_thread_node) sigev_threads;
53static LIST_HEAD(,sigev_thread) sigev_threads;
76static int sigev_generation;
77static pthread_mutex_t *sigev_list_mtx;
78static pthread_once_t sigev_once = PTHREAD_ONCE_INIT;
79static pthread_once_t sigev_once_default = PTHREAD_ONCE_INIT;
54static int sigev_generation;
55static pthread_mutex_t *sigev_list_mtx;
56static pthread_once_t sigev_once = PTHREAD_ONCE_INIT;
57static pthread_once_t sigev_once_default = PTHREAD_ONCE_INIT;
80static pthread_mutex_t *sigev_threads_mtx;
81static pthread_cond_t *sigev_threads_cv;
82static pthread_cond_t *sigev_actq_cv;
83static struct sigev_thread_node *sigev_default_thread;
84static struct sigev_thread_attr sigev_default_sna;
58static struct sigev_thread *sigev_default_thread;
85static pthread_attr_t sigev_default_attr;
86static int atfork_registered;
59static pthread_attr_t sigev_default_attr;
60static int atfork_registered;
87static LIST_HEAD(,sigev_worker) sigev_worker_ready;
88static int sigev_worker_count;
89static int sigev_worker_start;
90static int sigev_worker_high;
91static int sigev_worker_low;
92static pthread_cond_t *sigev_worker_init_cv;
93
61
94static void __sigev_fork_prepare(void);
95static void __sigev_fork_parent(void);
96static void __sigev_fork_child(void);
97static struct sigev_thread_node *sigev_thread_create(pthread_attr_t *,
98 struct sigev_thread_node *, int);
99static void *sigev_service_loop(void *);
100static void *sigev_worker_routine(void *);
101static void sigev_put(struct sigev_node *);
102static void worker_cleanup(void *arg);
62static void __sigev_fork_prepare(void);
63static void __sigev_fork_parent(void);
64static void __sigev_fork_child(void);
65static struct sigev_thread *sigev_thread_create(int);
66static void *sigev_service_loop(void *);
67static void *worker_routine(void *);
68static void worker_cleanup(void *);
103
104#pragma weak pthread_create
69
70#pragma weak pthread_create
105#pragma weak pthread_attr_getschedpolicy
106#pragma weak pthread_attr_getinheritsched
107#pragma weak pthread_attr_getschedparam
108#pragma weak pthread_attr_getscope
109#pragma weak pthread_attr_getstacksize
110#pragma weak pthread_attr_getstackaddr
111#pragma weak pthread_attr_getguardsize
112#pragma weak pthread_attr_init
113#pragma weak pthread_attr_setscope
114#pragma weak pthread_attr_setdetachstate
115#pragma weak pthread_atfork
116#pragma weak _pthread_once
117#pragma weak pthread_cleanup_push
118#pragma weak pthread_cleanup_pop
119#pragma weak pthread_setcancelstate
120
71
121static __inline void
122attr2sna(pthread_attr_t *attr, struct sigev_thread_attr *sna)
72static void
73attrcopy(pthread_attr_t *src, pthread_attr_t *dst)
123{
74{
124 struct sched_param sched_param;
75 struct sched_param sched;
76 void *a;
77 size_t u;
78 int v;
125
79
126 pthread_attr_getschedpolicy(attr, &sna->sna_policy);
127 pthread_attr_getinheritsched(attr, &sna->sna_inherit);
128 pthread_attr_getschedparam(attr, &sched_param);
129 sna->sna_prio = sched_param.sched_priority;
130 pthread_attr_getstacksize(attr, &sna->sna_stacksize);
131 pthread_attr_getstackaddr(attr, &sna->sna_stackaddr);
132 pthread_attr_getguardsize(attr, &sna->sna_guardsize);
133}
80 _pthread_attr_getschedpolicy(src, &v);
81 _pthread_attr_setschedpolicy(dst, v);
134
82
135static __inline int
136sna_eq(const struct sigev_thread_attr *a, const struct sigev_thread_attr *b)
137{
138 return memcmp(a, b, sizeof(*a)) == 0;
83 _pthread_attr_getinheritsched(src, &v);
84 _pthread_attr_setinheritsched(dst, v);
85
86 _pthread_attr_getschedparam(src, &sched);
87 _pthread_attr_setschedparam(dst, &sched);
88
89 _pthread_attr_getscope(src, &v);
90 _pthread_attr_setscope(dst, v);
91
92 _pthread_attr_getstacksize(src, &u);
93 _pthread_attr_setstacksize(dst, u);
94
95 _pthread_attr_getstackaddr(src, &a);
96 _pthread_attr_setstackaddr(src, a);
97
98 _pthread_attr_getguardsize(src, &u);
99 _pthread_attr_setguardsize(dst, u);
139}
140
141static __inline int
142have_threads(void)
143{
144 return (&pthread_create != NULL);
145}
146
147void
148__sigev_thread_init(void)
149{
150 static int inited = 0;
100}
101
102static __inline int
103have_threads(void)
104{
105 return (&pthread_create != NULL);
106}
107
108void
109__sigev_thread_init(void)
110{
111 static int inited = 0;
112 pthread_mutexattr_t mattr;
151 int i;
152
113 int i;
114
115 _pthread_mutexattr_init(&mattr);
116 _pthread_mutexattr_settype(&mattr, PTHREAD_MUTEX_NORMAL);
153 sigev_list_mtx = malloc(sizeof(pthread_mutex_t));
117 sigev_list_mtx = malloc(sizeof(pthread_mutex_t));
154 _pthread_mutex_init(sigev_list_mtx, NULL);
155 sigev_threads_mtx = malloc(sizeof(pthread_mutex_t));
156 _pthread_mutex_init(sigev_threads_mtx, NULL);
157 sigev_threads_cv = malloc(sizeof(pthread_cond_t));
158 _pthread_cond_init(sigev_threads_cv, NULL);
159 sigev_actq_cv = malloc(sizeof(pthread_cond_t));
160 _pthread_cond_init(sigev_actq_cv, NULL);
118 _pthread_mutex_init(sigev_list_mtx, &mattr);
119 _pthread_mutexattr_destroy(&mattr);
120
161 for (i = 0; i < HASH_QUEUES; ++i)
162 LIST_INIT(&sigev_hash[i]);
163 LIST_INIT(&sigev_all);
121 for (i = 0; i < HASH_QUEUES; ++i)
122 LIST_INIT(&sigev_hash[i]);
123 LIST_INIT(&sigev_all);
164 TAILQ_INIT(&sigev_threads);
165 TAILQ_INIT(&sigev_actq);
124 LIST_INIT(&sigev_threads);
166 sigev_default_thread = NULL;
125 sigev_default_thread = NULL;
167 sigev_worker_count = 0;
168 sigev_worker_start = 0;
169 LIST_INIT(&sigev_worker_ready);
170 sigev_worker_high = SIGEV_WORKER_HIGH;
171 sigev_worker_low = SIGEV_WORKER_LOW;
172 sigev_worker_init_cv = malloc(sizeof(pthread_cond_t));
173 _pthread_cond_init(sigev_worker_init_cv, NULL);
174 if (atfork_registered == 0) {
126 if (atfork_registered == 0) {
175 pthread_atfork(
127 _pthread_atfork(
176 __sigev_fork_prepare,
177 __sigev_fork_parent,
178 __sigev_fork_child);
179 atfork_registered = 1;
180 }
181 if (!inited) {
128 __sigev_fork_prepare,
129 __sigev_fork_parent,
130 __sigev_fork_child);
131 atfork_registered = 1;
132 }
133 if (!inited) {
182 pthread_attr_init(&sigev_default_attr);
183 attr2sna(&sigev_default_attr, &sigev_default_sna);
184 pthread_attr_setscope(&sigev_default_attr,
134 _pthread_attr_init(&sigev_default_attr);
135 _pthread_attr_setscope(&sigev_default_attr,
185 PTHREAD_SCOPE_SYSTEM);
136 PTHREAD_SCOPE_SYSTEM);
186 pthread_attr_setdetachstate(&sigev_default_attr,
137 _pthread_attr_setdetachstate(&sigev_default_attr,
187 PTHREAD_CREATE_DETACHED);
188 inited = 1;
189 }
138 PTHREAD_CREATE_DETACHED);
139 inited = 1;
140 }
190 sigev_default_thread = sigev_thread_create(NULL, NULL, 0);
141 sigev_default_thread = sigev_thread_create(0);
191}
192
193int
194__sigev_check_init(void)
195{
196 if (!have_threads())
197 return (-1);
198

--- 19 unchanged lines hidden (view full) ---

218 * check if the handlers were already registered in
219 * pthread_atfork().
220 */
221 atfork_registered = 1;
222 memcpy(&sigev_once, &sigev_once_default, sizeof(sigev_once));
223 __sigev_thread_init();
224}
225
142}
143
144int
145__sigev_check_init(void)
146{
147 if (!have_threads())
148 return (-1);
149

--- 19 unchanged lines hidden (view full) ---

169 * check if the handlers were already registered in
170 * pthread_atfork().
171 */
172 atfork_registered = 1;
173 memcpy(&sigev_once, &sigev_once_default, sizeof(sigev_once));
174 __sigev_thread_init();
175}
176
226int
177void
227__sigev_list_lock(void)
228{
178__sigev_list_lock(void)
179{
229 return _pthread_mutex_lock(sigev_list_mtx);
180 _pthread_mutex_lock(sigev_list_mtx);
230}
231
181}
182
232int
183void
233__sigev_list_unlock(void)
234{
184__sigev_list_unlock(void)
185{
235 return _pthread_mutex_unlock(sigev_list_mtx);
186 _pthread_mutex_unlock(sigev_list_mtx);
236}
237
187}
188
238int
239__sigev_thread_list_lock(void)
240{
241 return _pthread_mutex_lock(sigev_threads_mtx);
242}
243
244int
245__sigev_thread_list_unlock(void)
246{
247 return _pthread_mutex_unlock(sigev_threads_mtx);
248}
249
250struct sigev_node *
251__sigev_alloc(int type, const struct sigevent *evp, struct sigev_node *prev,
189struct sigev_node *
190__sigev_alloc(int type, const struct sigevent *evp, struct sigev_node *prev,
252 int usethreadpool)
191 int usedefault)
253{
254 struct sigev_node *sn;
255
256 sn = calloc(1, sizeof(*sn));
257 if (sn != NULL) {
258 sn->sn_value = evp->sigev_value;
192{
193 struct sigev_node *sn;
194
195 sn = calloc(1, sizeof(*sn));
196 if (sn != NULL) {
197 sn->sn_value = evp->sigev_value;
259 sn->sn_func = evp->sigev_notify_function;
260 sn->sn_gen = atomic_fetchadd_int(&sigev_generation, 1);
261 sn->sn_type = type;
262 if (usethreadpool)
263 sn->sn_flags |= SNF_THREADPOOL;
264 sn->sn_tn = sigev_thread_create(evp->sigev_notify_attributes,
265 prev ? prev->sn_tn : NULL, usethreadpool);
266 if (sn->sn_tn == NULL) {
267 free(sn);
268 sn = NULL;
198 sn->sn_func = evp->sigev_notify_function;
199 sn->sn_gen = atomic_fetchadd_int(&sigev_generation, 1);
200 sn->sn_type = type;
201 _pthread_attr_init(&sn->sn_attr);
202 _pthread_attr_setdetachstate(&sn->sn_attr, PTHREAD_CREATE_DETACHED);
203 if (evp->sigev_notify_attributes)
204 attrcopy(evp->sigev_notify_attributes, &sn->sn_attr);
205 if (prev) {
206 __sigev_list_lock();
207 prev->sn_tn->tn_refcount++;
208 __sigev_list_unlock();
209 sn->sn_tn = prev->sn_tn;
210 } else {
211 sn->sn_tn = sigev_thread_create(usedefault);
212 if (sn->sn_tn == NULL) {
213 _pthread_attr_destroy(&sn->sn_attr);
214 free(sn);
215 sn = NULL;
216 }
269 }
270 }
271 return (sn);
272}
273
274void
275__sigev_get_sigevent(struct sigev_node *sn, struct sigevent *newevp,
276 sigev_id_t id)
277{
278 /*
217 }
218 }
219 return (sn);
220}
221
222void
223__sigev_get_sigevent(struct sigev_node *sn, struct sigevent *newevp,
224 sigev_id_t id)
225{
226 /*
279 * Build a new sigevent, and tell kernel to deliver SIGEV_SIGSERVICE
227 * Build a new sigevent, and tell kernel to deliver SIGSERVICE
280 * signal to the new thread.
281 */
282 newevp->sigev_notify = SIGEV_THREAD_ID;
228 * signal to the new thread.
229 */
230 newevp->sigev_notify = SIGEV_THREAD_ID;
283 newevp->sigev_signo = SIGEV_SIGSERVICE;
231 newevp->sigev_signo = SIGSERVICE;
284 newevp->sigev_notify_thread_id = (lwpid_t)sn->sn_tn->tn_lwpid;
285 newevp->sigev_value.sival_ptr = (void *)id;
286}
287
288void
289__sigev_free(struct sigev_node *sn)
290{
232 newevp->sigev_notify_thread_id = (lwpid_t)sn->sn_tn->tn_lwpid;
233 newevp->sigev_value.sival_ptr = (void *)id;
234}
235
236void
237__sigev_free(struct sigev_node *sn)
238{
239 _pthread_attr_destroy(&sn->sn_attr);
291 free(sn);
292}
293
294struct sigev_node *
295__sigev_find(int type, sigev_id_t id)
296{
297 struct sigev_node *sn;
298 int chain = HASH(type, id);

--- 6 unchanged lines hidden (view full) ---

305}
306
307int
308__sigev_register(struct sigev_node *sn)
309{
310 int chain = HASH(sn->sn_type, sn->sn_id);
311
312 LIST_INSERT_HEAD(&sigev_hash[chain], sn, sn_link);
240 free(sn);
241}
242
243struct sigev_node *
244__sigev_find(int type, sigev_id_t id)
245{
246 struct sigev_node *sn;
247 int chain = HASH(type, id);

--- 6 unchanged lines hidden (view full) ---

254}
255
256int
257__sigev_register(struct sigev_node *sn)
258{
259 int chain = HASH(sn->sn_type, sn->sn_id);
260
261 LIST_INSERT_HEAD(&sigev_hash[chain], sn, sn_link);
313 LIST_INSERT_HEAD(&sigev_all, sn, sn_allist);
314 return (0);
315}
316
317int
318__sigev_delete(int type, sigev_id_t id)
319{
320 struct sigev_node *sn;
321
322 sn = __sigev_find(type, id);
323 if (sn != NULL)
324 return (__sigev_delete_node(sn));
325 return (0);
326}
327
328int
329__sigev_delete_node(struct sigev_node *sn)
330{
331 LIST_REMOVE(sn, sn_link);
262 return (0);
263}
264
265int
266__sigev_delete(int type, sigev_id_t id)
267{
268 struct sigev_node *sn;
269
270 sn = __sigev_find(type, id);
271 if (sn != NULL)
272 return (__sigev_delete_node(sn));
273 return (0);
274}
275
276int
277__sigev_delete_node(struct sigev_node *sn)
278{
279 LIST_REMOVE(sn, sn_link);
332 LIST_REMOVE(sn, sn_allist);
333
280
334 __sigev_thread_list_lock();
335 if (--sn->sn_tn->tn_refcount == 0)
281 if (--sn->sn_tn->tn_refcount == 0)
336 if (!(sn->sn_flags & SNF_THREADPOOL))
337 pthread_kill(sn->sn_tn->tn_thread, SIGEV_SIGSERVICE);
338 __sigev_thread_list_unlock();
282 _pthread_kill(sn->sn_tn->tn_thread, SIGSERVICE);
339 if (sn->sn_flags & SNF_WORKING)
340 sn->sn_flags |= SNF_REMOVED;
283 if (sn->sn_flags & SNF_WORKING)
284 sn->sn_flags |= SNF_REMOVED;
341 else {
342 if (sn->sn_flags & SNF_ACTQ) {
343 TAILQ_REMOVE(&sigev_actq, sn, sn_actq);
344 }
285 else
345 __sigev_free(sn);
286 __sigev_free(sn);
346 }
347 return (0);
348}
349
287 return (0);
288}
289
350static
351sigev_id_t
290static sigev_id_t
352sigev_get_id(siginfo_t *si)
353{
354 switch(si->si_code) {
355 case SI_TIMER:
356 return (si->si_timerid);
357 case SI_MESGQ:
358 return (si->si_mqd);
359 case SI_ASYNCIO:
360 return (sigev_id_t)si->si_value.sival_ptr;
361 }
362 return (-1);
363}
364
291sigev_get_id(siginfo_t *si)
292{
293 switch(si->si_code) {
294 case SI_TIMER:
295 return (si->si_timerid);
296 case SI_MESGQ:
297 return (si->si_mqd);
298 case SI_ASYNCIO:
299 return (sigev_id_t)si->si_value.sival_ptr;
300 }
301 return (-1);
302}
303
365static struct sigev_thread_node *
366sigev_thread_create(pthread_attr_t *pattr, struct sigev_thread_node *prev,
367 int usepool)
304static struct sigev_thread *
305sigev_thread_create(int usedefault)
368{
306{
369 struct sigev_thread_node *tn;
370 struct sigev_thread_attr sna;
307 struct sigev_thread *tn;
371 sigset_t set;
372 int ret;
373
308 sigset_t set;
309 int ret;
310
374 if (pattr == NULL)
375 pattr = &sigev_default_attr;
376 else {
377 pthread_attr_setscope(pattr, PTHREAD_SCOPE_SYSTEM);
378 pthread_attr_setdetachstate(pattr, PTHREAD_CREATE_DETACHED);
379 }
380
381 attr2sna(pattr, &sna);
382
383 __sigev_thread_list_lock();
384
385 if (prev != NULL && sna_eq(&prev->tn_sna, &sna)) {
386 prev->tn_refcount++;
387 __sigev_thread_list_unlock();
388 return (prev);
389 }
390
391 if (sna_eq(&sna, &sigev_default_sna) && usepool &&
392 sigev_default_thread != NULL) {
311 if (usedefault && sigev_default_thread) {
312 __sigev_list_lock();
393 sigev_default_thread->tn_refcount++;
313 sigev_default_thread->tn_refcount++;
394 __sigev_thread_list_unlock();
395 return (sigev_default_thread);
314 __sigev_list_unlock();
315 return (sigev_default_thread);
396 }
397
316 }
317
398 tn = NULL;
399 /* Search a thread matching the required stack address */
400 if (sna.sna_stackaddr != NULL) {
401 TAILQ_FOREACH(tn, &sigev_threads, tn_link) {
402 if (sna.sna_stackaddr == tn->tn_sna.sna_stackaddr)
403 break;
404 }
405 }
406
407 if (tn != NULL) {
408 tn->tn_refcount++;
409 __sigev_thread_list_unlock();
410 return (tn);
411 }
412 tn = malloc(sizeof(*tn));
318 tn = malloc(sizeof(*tn));
413 tn->tn_sna = sna;
414 tn->tn_cur = NULL;
415 tn->tn_lwpid = -1;
416 tn->tn_refcount = 1;
319 tn->tn_cur = NULL;
320 tn->tn_lwpid = -1;
321 tn->tn_refcount = 1;
417 TAILQ_INSERT_TAIL(&sigev_threads, tn, tn_link);
322 _pthread_cond_init(&tn->tn_cv, NULL);
323
324 /* for debug */
325 __sigev_list_lock();
326 LIST_INSERT_HEAD(&sigev_threads, tn, tn_link);
327 __sigev_list_unlock();
328
418 sigemptyset(&set);
329 sigemptyset(&set);
419 sigaddset(&set, SIGEV_SIGSERVICE);
330 sigaddset(&set, SIGSERVICE);
331
420 _sigprocmask(SIG_BLOCK, &set, NULL);
332 _sigprocmask(SIG_BLOCK, &set, NULL);
421 ret = pthread_create(&tn->tn_thread, pattr, sigev_service_loop, tn);
333 ret = pthread_create(&tn->tn_thread, &sigev_default_attr,
334 sigev_service_loop, tn);
422 _sigprocmask(SIG_UNBLOCK, &set, NULL);
335 _sigprocmask(SIG_UNBLOCK, &set, NULL);
336
423 if (ret != 0) {
337 if (ret != 0) {
424 TAILQ_REMOVE(&sigev_threads, tn, tn_link);
425 __sigev_thread_list_unlock();
338 __sigev_list_lock();
339 LIST_REMOVE(tn, tn_link);
340 __sigev_list_unlock();
426 free(tn);
427 tn = NULL;
428 } else {
429 /* wait the thread to get its lwpid */
341 free(tn);
342 tn = NULL;
343 } else {
344 /* wait the thread to get its lwpid */
430 while (tn->tn_lwpid == -1)
431 _pthread_cond_wait(sigev_threads_cv, sigev_threads_mtx);
432 __sigev_thread_list_unlock();
433 }
434 return (tn);
435}
436
345
437static void
438after_dispatch(struct sigev_thread_node *tn)
439{
440 struct sigev_node *sn;
441
442 if ((sn = tn->tn_cur) != NULL) {
443 __sigev_list_lock();
346 __sigev_list_lock();
444 sn->sn_flags &= ~SNF_WORKING;
445 if (sn->sn_flags & SNF_REMOVED)
446 __sigev_free(sn);
447 else if (sn->sn_flags & SNF_ONESHOT)
448 __sigev_delete_node(sn);
449 tn->tn_cur = NULL;
347 while (tn->tn_lwpid == -1)
348 _pthread_cond_wait(&tn->tn_cv, sigev_list_mtx);
450 __sigev_list_unlock();
451 }
349 __sigev_list_unlock();
350 }
452 tn->tn_cur = NULL;
351 return (tn);
453}
454
455/*
352}
353
354/*
456 * This function is called if user callback calls
457 * pthread_exit() or pthread_cancel() for the thread.
355 * The thread receives notification from kernel and creates
356 * a thread to call user callback function.
458 */
357 */
459static void
460thread_cleanup(void *arg)
461{
462 struct sigev_thread_node *tn = arg;
463
464 fprintf(stderr, "Dangerous Robinson, calling pthread_exit() from "
465 "SIGEV_THREAD is undefined.");
466 after_dispatch(tn);
467 /* longjmp(tn->tn_jbuf, 1); */
468 abort();
469}
470
471/*
472 * Main notification dispatch function, the function either
473 * run user callback by itself or hand off the notifications
474 * to worker threads depend on flags.
475 */
476static void *
477sigev_service_loop(void *arg)
478{
358static void *
359sigev_service_loop(void *arg)
360{
361 static int failure;
362
479 siginfo_t si;
480 sigset_t set;
363 siginfo_t si;
364 sigset_t set;
481 struct sigev_thread_node *tn;
365 struct sigev_thread *tn;
482 struct sigev_node *sn;
483 sigev_id_t id;
366 struct sigev_node *sn;
367 sigev_id_t id;
368 pthread_t td;
484 int ret;
485
486 tn = arg;
487 thr_self(&tn->tn_lwpid);
369 int ret;
370
371 tn = arg;
372 thr_self(&tn->tn_lwpid);
488 __sigev_thread_list_lock();
489 _pthread_cond_broadcast(sigev_threads_cv);
490 __sigev_thread_list_unlock();
373 __sigev_list_lock();
374 _pthread_cond_broadcast(&tn->tn_cv);
375 __sigev_list_unlock();
491
492 /*
493 * Service thread should not be killed by callback, if user
494 * attempts to do so, the thread will be restarted.
495 */
376
377 /*
378 * Service thread should not be killed by callback, if user
379 * attempts to do so, the thread will be restarted.
380 */
496 setjmp(tn->tn_jbuf);
497 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
498 sigemptyset(&set);
381 sigemptyset(&set);
499 sigaddset(&set, SIGEV_SIGSERVICE);
500 pthread_cleanup_push(thread_cleanup, tn);
382 sigaddset(&set, SIGSERVICE);
501 for (;;) {
502 ret = sigwaitinfo(&set, &si);
383 for (;;) {
384 ret = sigwaitinfo(&set, &si);
503 __sigev_thread_list_lock();
385
386 __sigev_list_lock();
504 if (tn->tn_refcount == 0) {
387 if (tn->tn_refcount == 0) {
505 TAILQ_REMOVE(&sigev_threads, tn, tn_link);
506 __sigev_thread_list_unlock();
388 LIST_REMOVE(tn, tn_link);
389 __sigev_list_unlock();
507 free(tn);
508 break;
509 }
390 free(tn);
391 break;
392 }
510 __sigev_thread_list_unlock();
511 if (ret == -1)
393
394 if (ret == -1) {
395 __sigev_list_unlock();
512 continue;
396 continue;
397 }
398
513 id = sigev_get_id(&si);
399 id = sigev_get_id(&si);
514 __sigev_list_lock();
515 sn = __sigev_find(si.si_code, id);
400 sn = __sigev_find(si.si_code, id);
516 if (sn != NULL) {
517 sn->sn_info = si;
518 if (!(sn->sn_flags & SNF_THREADPOOL)) {
519 tn->tn_cur = sn;
520 sn->sn_flags |= SNF_WORKING;
521 __sigev_list_unlock();
522 sn->sn_dispatch(sn);
523 after_dispatch(tn);
524 } else {
525 assert(!(sn->sn_flags & SNF_ACTQ));
526 sigev_put(sn);
527 }
528 } else {
529 tn->tn_cur = NULL;
401 if (sn == NULL) {
530 __sigev_list_unlock();
402 __sigev_list_unlock();
403 continue;
531 }
404 }
532 }
533 pthread_cleanup_pop(0);
534 return (0);
535}
405
406 sn->sn_info = si;
407 if (sn->sn_flags & SNF_SYNC)
408 tn->tn_cur = sn;
409 else
410 tn->tn_cur = NULL;
411 sn->sn_flags |= SNF_WORKING;
412 __sigev_list_unlock();
536
413
537/*
538 * Hand off notifications to worker threads.
539 *
540 * prerequist: sigev list locked.
541 */
542static void
543sigev_put(struct sigev_node *sn)
544{
545 struct sigev_worker *worker;
546 pthread_t td;
547 int ret, ready;
414 ret = pthread_create(&td, &sn->sn_attr, worker_routine, sn);
415 if (ret != 0) {
416 if (failure++ < 5)
417 warnc(ret, "%s:%s failed to create thread.\n",
418 __FILE__, __func__);
548
419
549 TAILQ_INSERT_TAIL(&sigev_actq, sn, sn_actq);
550 sn->sn_flags |= SNF_ACTQ;
551 /*
552 * check if we should add more worker threads unless quota is hit.
553 */
554 if (LIST_EMPTY(&sigev_worker_ready) &&
555 sigev_worker_count + sigev_worker_start < sigev_worker_high) {
556 sigev_worker_start++;
557 __sigev_list_unlock();
558 worker = malloc(sizeof(*worker));
559 _pthread_cond_init(&worker->sw_cv, NULL);
560 worker->sw_flags = 0;
561 worker->sw_sn = 0;
562 worker->sw_readyptr = &ready;
563 ready = 0;
564 ret = pthread_create(&td, &sigev_default_attr,
565 sigev_worker_routine, worker);
566 if (ret) {
567 warnc(ret, "%s:%s can not create worker thread",
568 __FILE__, __func__);
569 __sigev_list_lock();
420 __sigev_list_lock();
570 sigev_worker_start--;
421 sn->sn_flags &= ~SNF_WORKING;
422 if (sn->sn_flags & SNF_REMOVED)
423 __sigev_free(sn);
571 __sigev_list_unlock();
424 __sigev_list_unlock();
572 } else {
425 } else if (tn->tn_cur) {
573 __sigev_list_lock();
426 __sigev_list_lock();
574 while (ready == 0) {
575 _pthread_cond_wait(sigev_worker_init_cv,
576 sigev_list_mtx);
577 }
427 while (tn->tn_cur)
428 _pthread_cond_wait(&tn->tn_cv, sigev_list_mtx);
578 __sigev_list_unlock();
579 }
429 __sigev_list_unlock();
430 }
580 } else {
581 worker = LIST_FIRST(&sigev_worker_ready);
582 if (worker) {
583 LIST_REMOVE(worker, sw_link);
584 worker->sw_flags &= ~SWF_READYQ;
585 _pthread_cond_broadcast(&worker->sw_cv);
586 __sigev_list_unlock();
587 }
588 }
431 }
432 return (0);
589}
590
591/*
433}
434
435/*
592 * Background thread to dispatch notification to user code.
593 * These threads are not bound to any realtime objects.
436 * newly created worker thread to call user callback function.
594 */
595static void *
437 */
438static void *
596sigev_worker_routine(void *arg)
439worker_routine(void *arg)
597{
440{
598 struct sigev_worker *worker;
599 struct sigev_node *sn;
600 struct timespec ts;
601 int ret;
441 struct sigev_node *sn = arg;
602
442
603 worker = arg;
604 __sigev_list_lock();
605 sigev_worker_count++;
606 sigev_worker_start--;
607 (*(worker->sw_readyptr))++;
608 LIST_INSERT_HEAD(&sigev_worker_ready, worker, sw_link);
609 worker->sw_flags |= SWF_READYQ;
610 _pthread_cond_broadcast(sigev_worker_init_cv);
443 _pthread_cleanup_push(worker_cleanup, sn);
444 sn->sn_dispatch(sn);
445 _pthread_cleanup_pop(1);
611
446
612 for (;;) {
613 if (worker->sw_flags & SWF_READYQ) {
614 LIST_REMOVE(worker, sw_link);
615 worker->sw_flags &= ~SWF_READYQ;
616 }
617
618 sn = TAILQ_FIRST(&sigev_actq);
619 if (sn != NULL) {
620 TAILQ_REMOVE(&sigev_actq, sn, sn_actq);
621 sn->sn_flags &= ~SNF_ACTQ;
622 sn->sn_flags |= SNF_WORKING;
623 __sigev_list_unlock();
624
625 worker->sw_sn = sn;
626 pthread_cleanup_push(worker_cleanup, worker);
627 sn->sn_dispatch(sn);
628 pthread_cleanup_pop(0);
629 worker->sw_sn = NULL;
630
631 __sigev_list_lock();
632 sn->sn_flags &= ~SNF_WORKING;
633 if (sn->sn_flags & SNF_REMOVED)
634 __sigev_free(sn);
635 else if (sn->sn_flags & SNF_ONESHOT)
636 __sigev_delete_node(sn);
637 } else {
638 LIST_INSERT_HEAD(&sigev_worker_ready, worker, sw_link);
639 worker->sw_flags |= SWF_READYQ;
640 clock_gettime(CLOCK_REALTIME, &ts);
641 ts.tv_sec += SIGEV_WORKER_IDLE;
642 ret = _pthread_cond_timedwait(&worker->sw_cv,
643 sigev_list_mtx, &ts);
644 if (ret == ETIMEDOUT) {
645 /*
646 * If we were timeouted and there is nothing
647 * to do, exit the thread.
648 */
649 if (TAILQ_EMPTY(&sigev_actq) &&
650 (worker->sw_flags & SWF_READYQ) &&
651 sigev_worker_count > sigev_worker_low)
652 goto out;
653 }
654 }
655 }
656out:
657 if (worker->sw_flags & SWF_READYQ) {
658 LIST_REMOVE(worker, sw_link);
659 worker->sw_flags &= ~SWF_READYQ;
660 }
661 sigev_worker_count--;
662 __sigev_list_unlock();
663 _pthread_cond_destroy(&worker->sw_cv);
664 free(worker);
665 return (0);
666}
667
447 return (0);
448}
449
450/* clean up a notification after dispatch. */
668static void
669worker_cleanup(void *arg)
670{
451static void
452worker_cleanup(void *arg)
453{
671 struct sigev_worker *worker = arg;
672 struct sigev_node *sn;
454 struct sigev_node *sn = arg;
673
455
674 sn = worker->sw_sn;
675 __sigev_list_lock();
456 __sigev_list_lock();
676 sn->sn_flags &= ~SNF_WORKING;
457 if (sn->sn_flags & SNF_SYNC) {
458 sn->sn_tn->tn_cur = NULL;
459 _pthread_cond_broadcast(&sn->sn_tn->tn_cv);
460 }
677 if (sn->sn_flags & SNF_REMOVED)
678 __sigev_free(sn);
461 if (sn->sn_flags & SNF_REMOVED)
462 __sigev_free(sn);
679 else if (sn->sn_flags & SNF_ONESHOT)
680 __sigev_delete_node(sn);
681 sigev_worker_count--;
463 else
464 sn->sn_flags &= ~SNF_WORKING;
682 __sigev_list_unlock();
465 __sigev_list_unlock();
683 _pthread_cond_destroy(&worker->sw_cv);
684 free(worker);
685}
466}