Deleted Added
full compact
26c26
< * $FreeBSD: head/lib/librt/sigev_thread.c 156267 2006-03-04 00:18:19Z davidxu $
---
> * $FreeBSD: head/lib/librt/sigev_thread.c 156383 2006-03-07 08:28:07Z davidxu $
34d33
< #include <assert.h>
39d37
< #include <sys/time.h>
49,67d46
< /* Lowest number of worker threads should be kept. */
< #define SIGEV_WORKER_LOW 0
<
< /* Highest number of worker threads can be created. */
< #define SIGEV_WORKER_HIGH 20
<
< /* How long an idle worker thread should stay. */
< #define SIGEV_WORKER_IDLE 10
<
< struct sigev_worker {
< LIST_ENTRY(sigev_worker) sw_link;
< pthread_cond_t sw_cv;
< struct sigev_node *sw_sn;
< int sw_flags;
< int *sw_readyptr;
< };
<
< #define SWF_READYQ 1
<
74,75c53
< static TAILQ_HEAD(, sigev_node) sigev_actq;
< static TAILQ_HEAD(, sigev_thread_node) sigev_threads;
---
> static LIST_HEAD(,sigev_thread) sigev_threads;
80,84c58
< static pthread_mutex_t *sigev_threads_mtx;
< static pthread_cond_t *sigev_threads_cv;
< static pthread_cond_t *sigev_actq_cv;
< static struct sigev_thread_node *sigev_default_thread;
< static struct sigev_thread_attr sigev_default_sna;
---
> static struct sigev_thread *sigev_default_thread;
87,92d60
< static LIST_HEAD(,sigev_worker) sigev_worker_ready;
< static int sigev_worker_count;
< static int sigev_worker_start;
< static int sigev_worker_high;
< static int sigev_worker_low;
< static pthread_cond_t *sigev_worker_init_cv;
94,102c62,68
< static void __sigev_fork_prepare(void);
< static void __sigev_fork_parent(void);
< static void __sigev_fork_child(void);
< static struct sigev_thread_node *sigev_thread_create(pthread_attr_t *,
< struct sigev_thread_node *, int);
< static void *sigev_service_loop(void *);
< static void *sigev_worker_routine(void *);
< static void sigev_put(struct sigev_node *);
< static void worker_cleanup(void *arg);
---
> static void __sigev_fork_prepare(void);
> static void __sigev_fork_parent(void);
> static void __sigev_fork_child(void);
> static struct sigev_thread *sigev_thread_create(int);
> static void *sigev_service_loop(void *);
> static void *worker_routine(void *);
> static void worker_cleanup(void *);
105,119d70
< #pragma weak pthread_attr_getschedpolicy
< #pragma weak pthread_attr_getinheritsched
< #pragma weak pthread_attr_getschedparam
< #pragma weak pthread_attr_getscope
< #pragma weak pthread_attr_getstacksize
< #pragma weak pthread_attr_getstackaddr
< #pragma weak pthread_attr_getguardsize
< #pragma weak pthread_attr_init
< #pragma weak pthread_attr_setscope
< #pragma weak pthread_attr_setdetachstate
< #pragma weak pthread_atfork
< #pragma weak _pthread_once
< #pragma weak pthread_cleanup_push
< #pragma weak pthread_cleanup_pop
< #pragma weak pthread_setcancelstate
121,122c72,73
< static __inline void
< attr2sna(pthread_attr_t *attr, struct sigev_thread_attr *sna)
---
> static void
> attrcopy(pthread_attr_t *src, pthread_attr_t *dst)
124c75,78
< struct sched_param sched_param;
---
> struct sched_param sched;
> void *a;
> size_t u;
> int v;
126,133c80,81
< pthread_attr_getschedpolicy(attr, &sna->sna_policy);
< pthread_attr_getinheritsched(attr, &sna->sna_inherit);
< pthread_attr_getschedparam(attr, &sched_param);
< sna->sna_prio = sched_param.sched_priority;
< pthread_attr_getstacksize(attr, &sna->sna_stacksize);
< pthread_attr_getstackaddr(attr, &sna->sna_stackaddr);
< pthread_attr_getguardsize(attr, &sna->sna_guardsize);
< }
---
> _pthread_attr_getschedpolicy(src, &v);
> _pthread_attr_setschedpolicy(dst, v);
135,138c83,99
< static __inline int
< sna_eq(const struct sigev_thread_attr *a, const struct sigev_thread_attr *b)
< {
< return memcmp(a, b, sizeof(*a)) == 0;
---
> _pthread_attr_getinheritsched(src, &v);
> _pthread_attr_setinheritsched(dst, v);
>
> _pthread_attr_getschedparam(src, &sched);
> _pthread_attr_setschedparam(dst, &sched);
>
> _pthread_attr_getscope(src, &v);
> _pthread_attr_setscope(dst, v);
>
> _pthread_attr_getstacksize(src, &u);
> _pthread_attr_setstacksize(dst, u);
>
> _pthread_attr_getstackaddr(src, &a);
> _pthread_attr_setstackaddr(src, a);
>
> _pthread_attr_getguardsize(src, &u);
> _pthread_attr_setguardsize(dst, u);
150a112
> pthread_mutexattr_t mattr;
152a115,116
> _pthread_mutexattr_init(&mattr);
> _pthread_mutexattr_settype(&mattr, PTHREAD_MUTEX_NORMAL);
154,160c118,120
< _pthread_mutex_init(sigev_list_mtx, NULL);
< sigev_threads_mtx = malloc(sizeof(pthread_mutex_t));
< _pthread_mutex_init(sigev_threads_mtx, NULL);
< sigev_threads_cv = malloc(sizeof(pthread_cond_t));
< _pthread_cond_init(sigev_threads_cv, NULL);
< sigev_actq_cv = malloc(sizeof(pthread_cond_t));
< _pthread_cond_init(sigev_actq_cv, NULL);
---
> _pthread_mutex_init(sigev_list_mtx, &mattr);
> _pthread_mutexattr_destroy(&mattr);
>
164,165c124
< TAILQ_INIT(&sigev_threads);
< TAILQ_INIT(&sigev_actq);
---
> LIST_INIT(&sigev_threads);
167,173d125
< sigev_worker_count = 0;
< sigev_worker_start = 0;
< LIST_INIT(&sigev_worker_ready);
< sigev_worker_high = SIGEV_WORKER_HIGH;
< sigev_worker_low = SIGEV_WORKER_LOW;
< sigev_worker_init_cv = malloc(sizeof(pthread_cond_t));
< _pthread_cond_init(sigev_worker_init_cv, NULL);
175c127
< pthread_atfork(
---
> _pthread_atfork(
182,184c134,135
< pthread_attr_init(&sigev_default_attr);
< attr2sna(&sigev_default_attr, &sigev_default_sna);
< pthread_attr_setscope(&sigev_default_attr,
---
> _pthread_attr_init(&sigev_default_attr);
> _pthread_attr_setscope(&sigev_default_attr,
186c137
< pthread_attr_setdetachstate(&sigev_default_attr,
---
> _pthread_attr_setdetachstate(&sigev_default_attr,
190c141
< sigev_default_thread = sigev_thread_create(NULL, NULL, 0);
---
> sigev_default_thread = sigev_thread_create(0);
226c177
< int
---
> void
229c180
< return _pthread_mutex_lock(sigev_list_mtx);
---
> _pthread_mutex_lock(sigev_list_mtx);
232c183
< int
---
> void
235c186
< return _pthread_mutex_unlock(sigev_list_mtx);
---
> _pthread_mutex_unlock(sigev_list_mtx);
238,249d188
< int
< __sigev_thread_list_lock(void)
< {
< return _pthread_mutex_lock(sigev_threads_mtx);
< }
<
< int
< __sigev_thread_list_unlock(void)
< {
< return _pthread_mutex_unlock(sigev_threads_mtx);
< }
<
252c191
< int usethreadpool)
---
> int usedefault)
259,268c198,216
< sn->sn_func = evp->sigev_notify_function;
< sn->sn_gen = atomic_fetchadd_int(&sigev_generation, 1);
< sn->sn_type = type;
< if (usethreadpool)
< sn->sn_flags |= SNF_THREADPOOL;
< sn->sn_tn = sigev_thread_create(evp->sigev_notify_attributes,
< prev ? prev->sn_tn : NULL, usethreadpool);
< if (sn->sn_tn == NULL) {
< free(sn);
< sn = NULL;
---
> sn->sn_func = evp->sigev_notify_function;
> sn->sn_gen = atomic_fetchadd_int(&sigev_generation, 1);
> sn->sn_type = type;
> _pthread_attr_init(&sn->sn_attr);
> _pthread_attr_setdetachstate(&sn->sn_attr, PTHREAD_CREATE_DETACHED);
> if (evp->sigev_notify_attributes)
> attrcopy(evp->sigev_notify_attributes, &sn->sn_attr);
> if (prev) {
> __sigev_list_lock();
> prev->sn_tn->tn_refcount++;
> __sigev_list_unlock();
> sn->sn_tn = prev->sn_tn;
> } else {
> sn->sn_tn = sigev_thread_create(usedefault);
> if (sn->sn_tn == NULL) {
> _pthread_attr_destroy(&sn->sn_attr);
> free(sn);
> sn = NULL;
> }
279c227
< * Build a new sigevent, and tell kernel to deliver SIGEV_SIGSERVICE
---
> * Build a new sigevent, and tell kernel to deliver SIGSERVICE
283c231
< newevp->sigev_signo = SIGEV_SIGSERVICE;
---
> newevp->sigev_signo = SIGSERVICE;
290a239
> _pthread_attr_destroy(&sn->sn_attr);
313d261
< LIST_INSERT_HEAD(&sigev_all, sn, sn_allist);
332d279
< LIST_REMOVE(sn, sn_allist);
334d280
< __sigev_thread_list_lock();
336,338c282
< if (!(sn->sn_flags & SNF_THREADPOOL))
< pthread_kill(sn->sn_tn->tn_thread, SIGEV_SIGSERVICE);
< __sigev_thread_list_unlock();
---
> _pthread_kill(sn->sn_tn->tn_thread, SIGSERVICE);
341,344c285
< else {
< if (sn->sn_flags & SNF_ACTQ) {
< TAILQ_REMOVE(&sigev_actq, sn, sn_actq);
< }
---
> else
346d286
< }
350,351c290
< static
< sigev_id_t
---
> static sigev_id_t
365,367c304,305
< static struct sigev_thread_node *
< sigev_thread_create(pthread_attr_t *pattr, struct sigev_thread_node *prev,
< int usepool)
---
> static struct sigev_thread *
> sigev_thread_create(int usedefault)
369,370c307
< struct sigev_thread_node *tn;
< struct sigev_thread_attr sna;
---
> struct sigev_thread *tn;
374,392c311,312
< if (pattr == NULL)
< pattr = &sigev_default_attr;
< else {
< pthread_attr_setscope(pattr, PTHREAD_SCOPE_SYSTEM);
< pthread_attr_setdetachstate(pattr, PTHREAD_CREATE_DETACHED);
< }
<
< attr2sna(pattr, &sna);
<
< __sigev_thread_list_lock();
<
< if (prev != NULL && sna_eq(&prev->tn_sna, &sna)) {
< prev->tn_refcount++;
< __sigev_thread_list_unlock();
< return (prev);
< }
<
< if (sna_eq(&sna, &sigev_default_sna) && usepool &&
< sigev_default_thread != NULL) {
---
> if (usedefault && sigev_default_thread) {
> __sigev_list_lock();
394,395c314,315
< __sigev_thread_list_unlock();
< return (sigev_default_thread);
---
> __sigev_list_unlock();
> return (sigev_default_thread);
398,411d317
< tn = NULL;
< /* Search a thread matching the required stack address */
< if (sna.sna_stackaddr != NULL) {
< TAILQ_FOREACH(tn, &sigev_threads, tn_link) {
< if (sna.sna_stackaddr == tn->tn_sna.sna_stackaddr)
< break;
< }
< }
<
< if (tn != NULL) {
< tn->tn_refcount++;
< __sigev_thread_list_unlock();
< return (tn);
< }
413d318
< tn->tn_sna = sna;
417c322,328
< TAILQ_INSERT_TAIL(&sigev_threads, tn, tn_link);
---
> _pthread_cond_init(&tn->tn_cv, NULL);
>
> /* for debug */
> __sigev_list_lock();
> LIST_INSERT_HEAD(&sigev_threads, tn, tn_link);
> __sigev_list_unlock();
>
419c330,331
< sigaddset(&set, SIGEV_SIGSERVICE);
---
> sigaddset(&set, SIGSERVICE);
>
421c333,334
< ret = pthread_create(&tn->tn_thread, pattr, sigev_service_loop, tn);
---
> ret = pthread_create(&tn->tn_thread, &sigev_default_attr,
> sigev_service_loop, tn);
422a336
>
424,425c338,340
< TAILQ_REMOVE(&sigev_threads, tn, tn_link);
< __sigev_thread_list_unlock();
---
> __sigev_list_lock();
> LIST_REMOVE(tn, tn_link);
> __sigev_list_unlock();
430,435d344
< while (tn->tn_lwpid == -1)
< _pthread_cond_wait(sigev_threads_cv, sigev_threads_mtx);
< __sigev_thread_list_unlock();
< }
< return (tn);
< }
437,442d345
< static void
< after_dispatch(struct sigev_thread_node *tn)
< {
< struct sigev_node *sn;
<
< if ((sn = tn->tn_cur) != NULL) {
444,449c347,348
< sn->sn_flags &= ~SNF_WORKING;
< if (sn->sn_flags & SNF_REMOVED)
< __sigev_free(sn);
< else if (sn->sn_flags & SNF_ONESHOT)
< __sigev_delete_node(sn);
< tn->tn_cur = NULL;
---
> while (tn->tn_lwpid == -1)
> _pthread_cond_wait(&tn->tn_cv, sigev_list_mtx);
452c351
< tn->tn_cur = NULL;
---
> return (tn);
456,457c355,356
< * This function is called if user callback calls
< * pthread_exit() or pthread_cancel() for the thread.
---
> * The thread receives notification from kernel and creates
> * a thread to call user callback function.
459,475d357
< static void
< thread_cleanup(void *arg)
< {
< struct sigev_thread_node *tn = arg;
<
< fprintf(stderr, "Dangerous Robinson, calling pthread_exit() from "
< "SIGEV_THREAD is undefined.");
< after_dispatch(tn);
< /* longjmp(tn->tn_jbuf, 1); */
< abort();
< }
<
< /*
< * Main notification dispatch function, the function either
< * run user callback by itself or hand off the notifications
< * to worker threads depend on flags.
< */
478a361,362
> static int failure;
>
481c365
< struct sigev_thread_node *tn;
---
> struct sigev_thread *tn;
483a368
> pthread_t td;
488,490c373,375
< __sigev_thread_list_lock();
< _pthread_cond_broadcast(sigev_threads_cv);
< __sigev_thread_list_unlock();
---
> __sigev_list_lock();
> _pthread_cond_broadcast(&tn->tn_cv);
> __sigev_list_unlock();
496,497d380
< setjmp(tn->tn_jbuf);
< pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
499,500c382
< sigaddset(&set, SIGEV_SIGSERVICE);
< pthread_cleanup_push(thread_cleanup, tn);
---
> sigaddset(&set, SIGSERVICE);
503c385,386
< __sigev_thread_list_lock();
---
>
> __sigev_list_lock();
505,506c388,389
< TAILQ_REMOVE(&sigev_threads, tn, tn_link);
< __sigev_thread_list_unlock();
---
> LIST_REMOVE(tn, tn_link);
> __sigev_list_unlock();
510,511c393,395
< __sigev_thread_list_unlock();
< if (ret == -1)
---
>
> if (ret == -1) {
> __sigev_list_unlock();
512a397,398
> }
>
514d399
< __sigev_list_lock();
516,529c401
< if (sn != NULL) {
< sn->sn_info = si;
< if (!(sn->sn_flags & SNF_THREADPOOL)) {
< tn->tn_cur = sn;
< sn->sn_flags |= SNF_WORKING;
< __sigev_list_unlock();
< sn->sn_dispatch(sn);
< after_dispatch(tn);
< } else {
< assert(!(sn->sn_flags & SNF_ACTQ));
< sigev_put(sn);
< }
< } else {
< tn->tn_cur = NULL;
---
> if (sn == NULL) {
530a403
> continue;
532,535c405,412
< }
< pthread_cleanup_pop(0);
< return (0);
< }
---
>
> sn->sn_info = si;
> if (sn->sn_flags & SNF_SYNC)
> tn->tn_cur = sn;
> else
> tn->tn_cur = NULL;
> sn->sn_flags |= SNF_WORKING;
> __sigev_list_unlock();
537,547c414,418
< /*
< * Hand off notifications to worker threads.
< *
< * prerequist: sigev list locked.
< */
< static void
< sigev_put(struct sigev_node *sn)
< {
< struct sigev_worker *worker;
< pthread_t td;
< int ret, ready;
---
> ret = pthread_create(&td, &sn->sn_attr, worker_routine, sn);
> if (ret != 0) {
> if (failure++ < 5)
> warnc(ret, "%s:%s failed to create thread.\n",
> __FILE__, __func__);
549,568d419
< TAILQ_INSERT_TAIL(&sigev_actq, sn, sn_actq);
< sn->sn_flags |= SNF_ACTQ;
< /*
< * check if we should add more worker threads unless quota is hit.
< */
< if (LIST_EMPTY(&sigev_worker_ready) &&
< sigev_worker_count + sigev_worker_start < sigev_worker_high) {
< sigev_worker_start++;
< __sigev_list_unlock();
< worker = malloc(sizeof(*worker));
< _pthread_cond_init(&worker->sw_cv, NULL);
< worker->sw_flags = 0;
< worker->sw_sn = 0;
< worker->sw_readyptr = &ready;
< ready = 0;
< ret = pthread_create(&td, &sigev_default_attr,
< sigev_worker_routine, worker);
< if (ret) {
< warnc(ret, "%s:%s can not create worker thread",
< __FILE__, __func__);
570c421,423
< sigev_worker_start--;
---
> sn->sn_flags &= ~SNF_WORKING;
> if (sn->sn_flags & SNF_REMOVED)
> __sigev_free(sn);
572c425
< } else {
---
> } else if (tn->tn_cur) {
574,577c427,428
< while (ready == 0) {
< _pthread_cond_wait(sigev_worker_init_cv,
< sigev_list_mtx);
< }
---
> while (tn->tn_cur)
> _pthread_cond_wait(&tn->tn_cv, sigev_list_mtx);
580,587d430
< } else {
< worker = LIST_FIRST(&sigev_worker_ready);
< if (worker) {
< LIST_REMOVE(worker, sw_link);
< worker->sw_flags &= ~SWF_READYQ;
< _pthread_cond_broadcast(&worker->sw_cv);
< __sigev_list_unlock();
< }
588a432
> return (0);
592,593c436
< * Background thread to dispatch notification to user code.
< * These threads are not bound to any realtime objects.
---
> * newly created worker thread to call user callback function.
596c439
< sigev_worker_routine(void *arg)
---
> worker_routine(void *arg)
598,601c441
< struct sigev_worker *worker;
< struct sigev_node *sn;
< struct timespec ts;
< int ret;
---
> struct sigev_node *sn = arg;
603,610c443,445
< worker = arg;
< __sigev_list_lock();
< sigev_worker_count++;
< sigev_worker_start--;
< (*(worker->sw_readyptr))++;
< LIST_INSERT_HEAD(&sigev_worker_ready, worker, sw_link);
< worker->sw_flags |= SWF_READYQ;
< _pthread_cond_broadcast(sigev_worker_init_cv);
---
> _pthread_cleanup_push(worker_cleanup, sn);
> sn->sn_dispatch(sn);
> _pthread_cleanup_pop(1);
612,664d446
< for (;;) {
< if (worker->sw_flags & SWF_READYQ) {
< LIST_REMOVE(worker, sw_link);
< worker->sw_flags &= ~SWF_READYQ;
< }
<
< sn = TAILQ_FIRST(&sigev_actq);
< if (sn != NULL) {
< TAILQ_REMOVE(&sigev_actq, sn, sn_actq);
< sn->sn_flags &= ~SNF_ACTQ;
< sn->sn_flags |= SNF_WORKING;
< __sigev_list_unlock();
<
< worker->sw_sn = sn;
< pthread_cleanup_push(worker_cleanup, worker);
< sn->sn_dispatch(sn);
< pthread_cleanup_pop(0);
< worker->sw_sn = NULL;
<
< __sigev_list_lock();
< sn->sn_flags &= ~SNF_WORKING;
< if (sn->sn_flags & SNF_REMOVED)
< __sigev_free(sn);
< else if (sn->sn_flags & SNF_ONESHOT)
< __sigev_delete_node(sn);
< } else {
< LIST_INSERT_HEAD(&sigev_worker_ready, worker, sw_link);
< worker->sw_flags |= SWF_READYQ;
< clock_gettime(CLOCK_REALTIME, &ts);
< ts.tv_sec += SIGEV_WORKER_IDLE;
< ret = _pthread_cond_timedwait(&worker->sw_cv,
< sigev_list_mtx, &ts);
< if (ret == ETIMEDOUT) {
< /*
< * If we were timeouted and there is nothing
< * to do, exit the thread.
< */
< if (TAILQ_EMPTY(&sigev_actq) &&
< (worker->sw_flags & SWF_READYQ) &&
< sigev_worker_count > sigev_worker_low)
< goto out;
< }
< }
< }
< out:
< if (worker->sw_flags & SWF_READYQ) {
< LIST_REMOVE(worker, sw_link);
< worker->sw_flags &= ~SWF_READYQ;
< }
< sigev_worker_count--;
< __sigev_list_unlock();
< _pthread_cond_destroy(&worker->sw_cv);
< free(worker);
667a450
> /* clean up a notification after dispatch. */
671,672c454
< struct sigev_worker *worker = arg;
< struct sigev_node *sn;
---
> struct sigev_node *sn = arg;
674d455
< sn = worker->sw_sn;
676c457,460
< sn->sn_flags &= ~SNF_WORKING;
---
> if (sn->sn_flags & SNF_SYNC) {
> sn->sn_tn->tn_cur = NULL;
> _pthread_cond_broadcast(&sn->sn_tn->tn_cv);
> }
679,681c463,464
< else if (sn->sn_flags & SNF_ONESHOT)
< __sigev_delete_node(sn);
< sigev_worker_count--;
---
> else
> sn->sn_flags &= ~SNF_WORKING;
683,684d465
< _pthread_cond_destroy(&worker->sw_cv);
< free(worker);