Deleted Added
full compact
26c26
< * $FreeBSD: head/lib/librt/sigev_thread.c 156192 2006-03-01 23:38:53Z davidxu $
---
> * $FreeBSD: head/lib/librt/sigev_thread.c 156267 2006-03-04 00:18:19Z davidxu $
33a34
> #include <assert.h>
34a36
> #include <errno.h>
36a39
> #include <sys/time.h>
45a49,67
> /* Lowest number of worker threads should be kept. */
> #define SIGEV_WORKER_LOW 0
>
> /* Highest number of worker threads can be created. */
> #define SIGEV_WORKER_HIGH 20
>
> /* How long an idle worker thread should stay. */
> #define SIGEV_WORKER_IDLE 10
>
> struct sigev_worker {
> LIST_ENTRY(sigev_worker) sw_link;
> pthread_cond_t sw_cv;
> struct sigev_node *sw_sn;
> int sw_flags;
> int *sw_readyptr;
> };
>
> #define SWF_READYQ 1
>
48a71
>
50a74,75
> static TAILQ_HEAD(, sigev_node) sigev_actq;
> static TAILQ_HEAD(, sigev_thread_node) sigev_threads;
53c78,79
< static TAILQ_HEAD(,sigev_thread_node) sigev_threads;
---
> static pthread_once_t sigev_once = PTHREAD_ONCE_INIT;
> static pthread_once_t sigev_once_default = PTHREAD_ONCE_INIT;
54a81,84
> static pthread_cond_t *sigev_threads_cv;
> static pthread_cond_t *sigev_actq_cv;
> static struct sigev_thread_node *sigev_default_thread;
> static struct sigev_thread_attr sigev_default_sna;
56c86,92
< static pthread_once_t sigev_once = PTHREAD_ONCE_INIT;
---
> static int atfork_registered;
> static LIST_HEAD(,sigev_worker) sigev_worker_ready;
> static int sigev_worker_count;
> static int sigev_worker_start;
> static int sigev_worker_high;
> static int sigev_worker_low;
> static pthread_cond_t *sigev_worker_init_cv;
61c97,98
< static struct sigev_thread_node *sigev_thread_create(pthread_attr_t *);
---
> static struct sigev_thread_node *sigev_thread_create(pthread_attr_t *,
> struct sigev_thread_node *, int);
62a100,102
> static void *sigev_worker_routine(void *);
> static void sigev_put(struct sigev_node *);
> static void worker_cleanup(void *arg);
90d129
< pthread_attr_getscope(attr, &sna->sna_scope);
105c144
< return (pthread_create != NULL);
---
> return (&pthread_create != NULL);
111c150
< static int notfirst = 0;
---
> static int inited = 0;
117a157,160
> sigev_threads_cv = malloc(sizeof(pthread_cond_t));
> _pthread_cond_init(sigev_threads_cv, NULL);
> sigev_actq_cv = malloc(sizeof(pthread_cond_t));
> _pthread_cond_init(sigev_actq_cv, NULL);
122c165,181
< if (!notfirst) {
---
> TAILQ_INIT(&sigev_actq);
> sigev_default_thread = NULL;
> sigev_worker_count = 0;
> sigev_worker_start = 0;
> LIST_INIT(&sigev_worker_ready);
> sigev_worker_high = SIGEV_WORKER_HIGH;
> sigev_worker_low = SIGEV_WORKER_LOW;
> sigev_worker_init_cv = malloc(sizeof(pthread_cond_t));
> _pthread_cond_init(sigev_worker_init_cv, NULL);
> if (atfork_registered == 0) {
> pthread_atfork(
> __sigev_fork_prepare,
> __sigev_fork_parent,
> __sigev_fork_child);
> atfork_registered = 1;
> }
> if (!inited) {
124c183,185
< pthread_attr_setscope(&sigev_default_attr, PTHREAD_SCOPE_SYSTEM);
---
> attr2sna(&sigev_default_attr, &sigev_default_sna);
> pthread_attr_setscope(&sigev_default_attr,
> PTHREAD_SCOPE_SYSTEM);
127,129c188
< pthread_atfork(__sigev_fork_prepare, __sigev_fork_parent,
< __sigev_fork_child);
< notfirst = 1;
---
> inited = 1;
130a190
> sigev_default_thread = sigev_thread_create(NULL, NULL, 0);
140c200
< return (0);
---
> return (sigev_default_thread != NULL) ? 0 : -1;
143c203
< void
---
> static void
146d205
< __sigev_thread_list_lock();
149c208
< void
---
> static void
152d210
< __sigev_thread_list_unlock();
155c213
< void
---
> static void
157a216,222
> /*
> * This is a hack, the thread libraries really should
> * check if the handlers were already registered in
> * pthread_atfork().
> */
> atfork_registered = 1;
> memcpy(&sigev_once, &sigev_once_default, sizeof(sigev_once));
186c251,252
< __sigev_alloc(int type, const struct sigevent *evp)
---
> __sigev_alloc(int type, const struct sigevent *evp, struct sigev_node *prev,
> int usethreadpool)
196c262,265
< sn->sn_tn = sigev_thread_create(evp->sigev_notify_attributes);
---
> if (usethreadpool)
> sn->sn_flags |= SNF_THREADPOOL;
> sn->sn_tn = sigev_thread_create(evp->sigev_notify_attributes,
> prev ? prev->sn_tn : NULL, usethreadpool);
264a334,338
> __sigev_thread_list_lock();
> if (--sn->sn_tn->tn_refcount == 0)
> if (!(sn->sn_flags & SNF_THREADPOOL))
> pthread_kill(sn->sn_tn->tn_thread, SIGEV_SIGSERVICE);
> __sigev_thread_list_unlock();
267c341,344
< else
---
> else {
> if (sn->sn_flags & SNF_ACTQ) {
> TAILQ_REMOVE(&sigev_actq, sn, sn_actq);
> }
268a346
> }
283,285d360
< default:
< warnx("%s %s : unknown si_code %d\n", __FILE__, __func__,
< si->si_code);
291c366,367
< sigev_thread_create(pthread_attr_t *pattr)
---
> sigev_thread_create(pthread_attr_t *pattr, struct sigev_thread_node *prev,
> int usepool)
308,317c384,401
< /* Search a thread matching the required pthread_attr. */
< TAILQ_FOREACH(tn, &sigev_threads, tn_link) {
< if (sna.sna_stackaddr == NULL) {
< if (sna_eq(&tn->tn_sna, &sna))
< break;
< } else {
< /*
< * Reuse the thread if it has same stack address,
< * because two threads can not run on same stack.
< */
---
>
> if (prev != NULL && sna_eq(&prev->tn_sna, &sna)) {
> prev->tn_refcount++;
> __sigev_thread_list_unlock();
> return (prev);
> }
>
> if (sna_eq(&sna, &sigev_default_sna) && usepool &&
> sigev_default_thread != NULL) {
> sigev_default_thread->tn_refcount++;
> __sigev_thread_list_unlock();
> return (sigev_default_thread);
> }
>
> tn = NULL;
> /* Search a thread matching the required stack address */
> if (sna.sna_stackaddr != NULL) {
> TAILQ_FOREACH(tn, &sigev_threads, tn_link) {
321a406
>
322a408
> tn->tn_refcount++;
328a415,416
> tn->tn_lwpid = -1;
> tn->tn_refcount = 1;
333d420
< _pthread_cond_init(&tn->tn_cv, NULL);
339d425
< _pthread_cond_destroy(&tn->tn_cv);
344c430,431
< _pthread_cond_wait(&tn->tn_cv, sigev_threads_mtx);
---
> while (tn->tn_lwpid == -1)
> _pthread_cond_wait(sigev_threads_cv, sigev_threads_mtx);
364a452
> tn->tn_cur = NULL;
382a471,475
> /*
> * Main notification dispatch function, the function either
> * run user callback by itself or hand off the notifications
> * to worker threads depend on flags.
> */
390a484
> int ret;
394,396c488,490
< __sigev_list_lock();
< _pthread_cond_broadcast(&tn->tn_cv);
< __sigev_list_unlock();
---
> __sigev_thread_list_lock();
> _pthread_cond_broadcast(sigev_threads_cv);
> __sigev_thread_list_unlock();
408c502,511
< if (__predict_false(sigwaitinfo(&set, &si) == -1))
---
> ret = sigwaitinfo(&set, &si);
> __sigev_thread_list_lock();
> if (tn->tn_refcount == 0) {
> TAILQ_REMOVE(&sigev_threads, tn, tn_link);
> __sigev_thread_list_unlock();
> free(tn);
> break;
> }
> __sigev_thread_list_unlock();
> if (ret == -1)
410d512
<
415,420c517,528
< tn->tn_cur = sn;
< sn->sn_flags |= SNF_WORKING;
< __sigev_list_unlock();
< sn->sn_dispatch(sn, &si);
< after_dispatch(tn);
< } else {
---
> sn->sn_info = si;
> if (!(sn->sn_flags & SNF_THREADPOOL)) {
> tn->tn_cur = sn;
> sn->sn_flags |= SNF_WORKING;
> __sigev_list_unlock();
> sn->sn_dispatch(sn);
> after_dispatch(tn);
> } else {
> assert(!(sn->sn_flags & SNF_ACTQ));
> sigev_put(sn);
> }
> } else {
427a536,685
>
> /*
> * Hand off notifications to worker threads.
> *
> * prerequist: sigev list locked.
> */
> static void
> sigev_put(struct sigev_node *sn)
> {
> struct sigev_worker *worker;
> pthread_t td;
> int ret, ready;
>
> TAILQ_INSERT_TAIL(&sigev_actq, sn, sn_actq);
> sn->sn_flags |= SNF_ACTQ;
> /*
> * check if we should add more worker threads unless quota is hit.
> */
> if (LIST_EMPTY(&sigev_worker_ready) &&
> sigev_worker_count + sigev_worker_start < sigev_worker_high) {
> sigev_worker_start++;
> __sigev_list_unlock();
> worker = malloc(sizeof(*worker));
> _pthread_cond_init(&worker->sw_cv, NULL);
> worker->sw_flags = 0;
> worker->sw_sn = 0;
> worker->sw_readyptr = &ready;
> ready = 0;
> ret = pthread_create(&td, &sigev_default_attr,
> sigev_worker_routine, worker);
> if (ret) {
> warnc(ret, "%s:%s can not create worker thread",
> __FILE__, __func__);
> __sigev_list_lock();
> sigev_worker_start--;
> __sigev_list_unlock();
> } else {
> __sigev_list_lock();
> while (ready == 0) {
> _pthread_cond_wait(sigev_worker_init_cv,
> sigev_list_mtx);
> }
> __sigev_list_unlock();
> }
> } else {
> worker = LIST_FIRST(&sigev_worker_ready);
> if (worker) {
> LIST_REMOVE(worker, sw_link);
> worker->sw_flags &= ~SWF_READYQ;
> _pthread_cond_broadcast(&worker->sw_cv);
> __sigev_list_unlock();
> }
> }
> }
>
> /*
> * Background thread to dispatch notification to user code.
> * These threads are not bound to any realtime objects.
> */
> static void *
> sigev_worker_routine(void *arg)
> {
> struct sigev_worker *worker;
> struct sigev_node *sn;
> struct timespec ts;
> int ret;
>
> worker = arg;
> __sigev_list_lock();
> sigev_worker_count++;
> sigev_worker_start--;
> (*(worker->sw_readyptr))++;
> LIST_INSERT_HEAD(&sigev_worker_ready, worker, sw_link);
> worker->sw_flags |= SWF_READYQ;
> _pthread_cond_broadcast(sigev_worker_init_cv);
>
> for (;;) {
> if (worker->sw_flags & SWF_READYQ) {
> LIST_REMOVE(worker, sw_link);
> worker->sw_flags &= ~SWF_READYQ;
> }
>
> sn = TAILQ_FIRST(&sigev_actq);
> if (sn != NULL) {
> TAILQ_REMOVE(&sigev_actq, sn, sn_actq);
> sn->sn_flags &= ~SNF_ACTQ;
> sn->sn_flags |= SNF_WORKING;
> __sigev_list_unlock();
>
> worker->sw_sn = sn;
> pthread_cleanup_push(worker_cleanup, worker);
> sn->sn_dispatch(sn);
> pthread_cleanup_pop(0);
> worker->sw_sn = NULL;
>
> __sigev_list_lock();
> sn->sn_flags &= ~SNF_WORKING;
> if (sn->sn_flags & SNF_REMOVED)
> __sigev_free(sn);
> else if (sn->sn_flags & SNF_ONESHOT)
> __sigev_delete_node(sn);
> } else {
> LIST_INSERT_HEAD(&sigev_worker_ready, worker, sw_link);
> worker->sw_flags |= SWF_READYQ;
> clock_gettime(CLOCK_REALTIME, &ts);
> ts.tv_sec += SIGEV_WORKER_IDLE;
> ret = _pthread_cond_timedwait(&worker->sw_cv,
> sigev_list_mtx, &ts);
> if (ret == ETIMEDOUT) {
> /*
> * If we were timeouted and there is nothing
> * to do, exit the thread.
> */
> if (TAILQ_EMPTY(&sigev_actq) &&
> (worker->sw_flags & SWF_READYQ) &&
> sigev_worker_count > sigev_worker_low)
> goto out;
> }
> }
> }
> out:
> if (worker->sw_flags & SWF_READYQ) {
> LIST_REMOVE(worker, sw_link);
> worker->sw_flags &= ~SWF_READYQ;
> }
> sigev_worker_count--;
> __sigev_list_unlock();
> _pthread_cond_destroy(&worker->sw_cv);
> free(worker);
> return (0);
> }
>
> static void
> worker_cleanup(void *arg)
> {
> struct sigev_worker *worker = arg;
> struct sigev_node *sn;
>
> sn = worker->sw_sn;
> __sigev_list_lock();
> sn->sn_flags &= ~SNF_WORKING;
> if (sn->sn_flags & SNF_REMOVED)
> __sigev_free(sn);
> else if (sn->sn_flags & SNF_ONESHOT)
> __sigev_delete_node(sn);
> sigev_worker_count--;
> __sigev_list_unlock();
> _pthread_cond_destroy(&worker->sw_cv);
> free(worker);
> }