apr_thread_pool.c revision 303975
133965Sjdp/*
2104834Sobrien * Licensed to the Apache Software Foundation (ASF) under one or more
338889Sjdp * contributor license agreements.  See the NOTICE file distributed
433965Sjdp * with this work for additional information regarding copyright
533965Sjdp * ownership.  The ASF licenses this file to you under the Apache
633965Sjdp * License, Version 2.0 (the "License"); you may not use this file
733965Sjdp * except in compliance with the License.  You may obtain a copy of
833965Sjdp * the License at
933965Sjdp *
1033965Sjdp *     http://www.apache.org/licenses/LICENSE-2.0
1133965Sjdp *
1233965Sjdp * Unless required by applicable law or agreed to in writing, software
1333965Sjdp * distributed under the License is distributed on an "AS IS" BASIS,
1433965Sjdp * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
1533965Sjdp * implied.  See the License for the specific language governing
1633965Sjdp * permissions and limitations under the License.
1733965Sjdp */
1833965Sjdp
1933965Sjdp#include <assert.h>
2033965Sjdp#include "apr_thread_pool.h"
2133965Sjdp#include "apr_ring.h"
2233965Sjdp#include "apr_thread_cond.h"
2333965Sjdp#include "apr_portable.h"
2433965Sjdp
2533965Sjdp#if APR_HAS_THREADS
2633965Sjdp
2733965Sjdp#define TASK_PRIORITY_SEGS 4
2833965Sjdp#define TASK_PRIORITY_SEG(x) (((x)->dispatch.priority & 0xFF) / 64)
2933965Sjdp
3033965Sjdptypedef struct apr_thread_pool_task
3133965Sjdp{
3233965Sjdp    APR_RING_ENTRY(apr_thread_pool_task) link;
3333965Sjdp    apr_thread_start_t func;
3433965Sjdp    void *param;
3533965Sjdp    void *owner;
3677298Sobrien    union
3733965Sjdp    {
3833965Sjdp        apr_byte_t priority;
3933965Sjdp        apr_time_t time;
4033965Sjdp    } dispatch;
4133965Sjdp} apr_thread_pool_task_t;
4233965Sjdp
4333965SjdpAPR_RING_HEAD(apr_thread_pool_tasks, apr_thread_pool_task);
4433965Sjdp
4533965Sjdpstruct apr_thread_list_elt
4633965Sjdp{
4733965Sjdp    APR_RING_ENTRY(apr_thread_list_elt) link;
4833965Sjdp    apr_thread_t *thd;
4933965Sjdp    volatile void *current_owner;
5033965Sjdp    volatile enum { TH_RUN, TH_STOP, TH_PROBATION } state;
5133965Sjdp};
5238889Sjdp
5333965SjdpAPR_RING_HEAD(apr_thread_list, apr_thread_list_elt);
5433965Sjdp
5533965Sjdpstruct apr_thread_pool
5633965Sjdp{
5733965Sjdp    apr_pool_t *pool;
5833965Sjdp    volatile apr_size_t thd_max;
5960484Sobrien    volatile apr_size_t idle_max;
6033965Sjdp    volatile apr_interval_time_t idle_wait;
6133965Sjdp    volatile apr_size_t thd_cnt;
6233965Sjdp    volatile apr_size_t idle_cnt;
6333965Sjdp    volatile apr_size_t task_cnt;
6433965Sjdp    volatile apr_size_t scheduled_task_cnt;
6533965Sjdp    volatile apr_size_t threshold;
6633965Sjdp    volatile apr_size_t tasks_run;
6733965Sjdp    volatile apr_size_t tasks_high;
6833965Sjdp    volatile apr_size_t thd_high;
6933965Sjdp    volatile apr_size_t thd_timed_out;
7033965Sjdp    struct apr_thread_pool_tasks *tasks;
7133965Sjdp    struct apr_thread_pool_tasks *scheduled_tasks;
7238889Sjdp    struct apr_thread_list *busy_thds;
7338889Sjdp    struct apr_thread_list *idle_thds;
7438889Sjdp    apr_thread_mutex_t *lock;
7538889Sjdp    apr_thread_cond_t *cond;
7638889Sjdp    volatile int terminated;
7733965Sjdp    struct apr_thread_pool_tasks *recycled_tasks;
7833965Sjdp    struct apr_thread_list *recycled_thds;
7933965Sjdp    apr_thread_pool_task_t *task_idx[TASK_PRIORITY_SEGS];
8033965Sjdp};
8138889Sjdp
8238889Sjdpstatic apr_status_t thread_pool_construct(apr_thread_pool_t * me,
8338889Sjdp                                          apr_size_t init_threads,
8438889Sjdp                                          apr_size_t max_threads)
8538889Sjdp{
8638889Sjdp    apr_status_t rv;
8738889Sjdp    int i;
8833965Sjdp
8933965Sjdp    me->thd_max = max_threads;
9038889Sjdp    me->idle_max = init_threads;
9138889Sjdp    me->threshold = init_threads / 2;
9238889Sjdp    rv = apr_thread_mutex_create(&me->lock, APR_THREAD_MUTEX_NESTED,
9338889Sjdp                                 me->pool);
9438889Sjdp    if (APR_SUCCESS != rv) {
9538889Sjdp        return rv;
9638889Sjdp    }
9738889Sjdp    rv = apr_thread_cond_create(&me->cond, me->pool);
9838889Sjdp    if (APR_SUCCESS != rv) {
9938889Sjdp        apr_thread_mutex_destroy(me->lock);
10038889Sjdp        return rv;
10138889Sjdp    }
10238889Sjdp    me->tasks = apr_palloc(me->pool, sizeof(*me->tasks));
10338889Sjdp    if (!me->tasks) {
10438889Sjdp        goto CATCH_ENOMEM;
10533965Sjdp    }
10633965Sjdp    APR_RING_INIT(me->tasks, apr_thread_pool_task, link);
10733965Sjdp    me->scheduled_tasks = apr_palloc(me->pool, sizeof(*me->scheduled_tasks));
10833965Sjdp    if (!me->scheduled_tasks) {
10933965Sjdp        goto CATCH_ENOMEM;
11033965Sjdp    }
11133965Sjdp    APR_RING_INIT(me->scheduled_tasks, apr_thread_pool_task, link);
11233965Sjdp    me->recycled_tasks = apr_palloc(me->pool, sizeof(*me->recycled_tasks));
11333965Sjdp    if (!me->recycled_tasks) {
11438889Sjdp        goto CATCH_ENOMEM;
11538889Sjdp    }
11638889Sjdp    APR_RING_INIT(me->recycled_tasks, apr_thread_pool_task, link);
11738889Sjdp    me->busy_thds = apr_palloc(me->pool, sizeof(*me->busy_thds));
11838889Sjdp    if (!me->busy_thds) {
11938889Sjdp        goto CATCH_ENOMEM;
12038889Sjdp    }
12138889Sjdp    APR_RING_INIT(me->busy_thds, apr_thread_list_elt, link);
12233965Sjdp    me->idle_thds = apr_palloc(me->pool, sizeof(*me->idle_thds));
12333965Sjdp    if (!me->idle_thds) {
12433965Sjdp        goto CATCH_ENOMEM;
12533965Sjdp    }
12633965Sjdp    APR_RING_INIT(me->idle_thds, apr_thread_list_elt, link);
12733965Sjdp    me->recycled_thds = apr_palloc(me->pool, sizeof(*me->recycled_thds));
12833965Sjdp    if (!me->recycled_thds) {
12933965Sjdp        goto CATCH_ENOMEM;
13033965Sjdp    }
13133965Sjdp    APR_RING_INIT(me->recycled_thds, apr_thread_list_elt, link);
13233965Sjdp    me->thd_cnt = me->idle_cnt = me->task_cnt = me->scheduled_task_cnt = 0;
13333965Sjdp    me->tasks_run = me->tasks_high = me->thd_high = me->thd_timed_out = 0;
13433965Sjdp    me->idle_wait = 0;
13533965Sjdp    me->terminated = 0;
13633965Sjdp    for (i = 0; i < TASK_PRIORITY_SEGS; i++) {
13733965Sjdp        me->task_idx[i] = NULL;
13833965Sjdp    }
13933965Sjdp    goto FINAL_EXIT;
14033965Sjdp  CATCH_ENOMEM:
14133965Sjdp    rv = APR_ENOMEM;
14238889Sjdp    apr_thread_mutex_destroy(me->lock);
14338889Sjdp    apr_thread_cond_destroy(me->cond);
14438889Sjdp  FINAL_EXIT:
14538889Sjdp    return rv;
14638889Sjdp}
14738889Sjdp
14838889Sjdp/*
14938889Sjdp * NOTE: This function is not thread safe by itself. Caller should hold the lock
15038889Sjdp */
15138889Sjdpstatic apr_thread_pool_task_t *pop_task(apr_thread_pool_t * me)
15238889Sjdp{
15338889Sjdp    apr_thread_pool_task_t *task = NULL;
15438889Sjdp    int seg;
15538889Sjdp
15638889Sjdp    /* check for scheduled tasks */
15738889Sjdp    if (me->scheduled_task_cnt > 0) {
15838889Sjdp        task = APR_RING_FIRST(me->scheduled_tasks);
15938889Sjdp        assert(task != NULL);
16038889Sjdp        assert(task !=
16138889Sjdp               APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task,
16233965Sjdp                                 link));
16333965Sjdp        /* if it's time */
16433965Sjdp        if (task->dispatch.time <= apr_time_now()) {
16533965Sjdp            --me->scheduled_task_cnt;
16633965Sjdp            APR_RING_REMOVE(task, link);
16733965Sjdp            return task;
16833965Sjdp        }
16933965Sjdp    }
17033965Sjdp    /* check for normal tasks if we're not returning a scheduled task */
17133965Sjdp    if (me->task_cnt == 0) {
17233965Sjdp        return NULL;
17333965Sjdp    }
17433965Sjdp
17533965Sjdp    task = APR_RING_FIRST(me->tasks);
17633965Sjdp    assert(task != NULL);
17733965Sjdp    assert(task != APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link));
17833965Sjdp    --me->task_cnt;
17933965Sjdp    seg = TASK_PRIORITY_SEG(task);
18033965Sjdp    if (task == me->task_idx[seg]) {
18133965Sjdp        me->task_idx[seg] = APR_RING_NEXT(task, link);
18233965Sjdp        if (me->task_idx[seg] == APR_RING_SENTINEL(me->tasks,
18333965Sjdp                                                   apr_thread_pool_task, link)
18433965Sjdp            || TASK_PRIORITY_SEG(me->task_idx[seg]) != seg) {
18533965Sjdp            me->task_idx[seg] = NULL;
18633965Sjdp        }
18733965Sjdp    }
18833965Sjdp    APR_RING_REMOVE(task, link);
18933965Sjdp    return task;
19033965Sjdp}
19133965Sjdp
19233965Sjdpstatic apr_interval_time_t waiting_time(apr_thread_pool_t * me)
19333965Sjdp{
19433965Sjdp    apr_thread_pool_task_t *task = NULL;
19560484Sobrien
19633965Sjdp    task = APR_RING_FIRST(me->scheduled_tasks);
19733965Sjdp    assert(task != NULL);
19833965Sjdp    assert(task !=
19933965Sjdp           APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task,
20033965Sjdp                             link));
20133965Sjdp    return task->dispatch.time - apr_time_now();
20233965Sjdp}
20333965Sjdp
20433965Sjdp/*
20533965Sjdp * NOTE: This function is not thread safe by itself. Caller should hold the lock
20633965Sjdp */
20733965Sjdpstatic struct apr_thread_list_elt *elt_new(apr_thread_pool_t * me,
20833965Sjdp                                           apr_thread_t * t)
20933965Sjdp{
21033965Sjdp    struct apr_thread_list_elt *elt;
21133965Sjdp
21233965Sjdp    if (APR_RING_EMPTY(me->recycled_thds, apr_thread_list_elt, link)) {
21333965Sjdp        elt = apr_pcalloc(me->pool, sizeof(*elt));
21433965Sjdp        if (NULL == elt) {
21533965Sjdp            return NULL;
21633965Sjdp        }
21733965Sjdp    }
21833965Sjdp    else {
21933965Sjdp        elt = APR_RING_FIRST(me->recycled_thds);
22033965Sjdp        APR_RING_REMOVE(elt, link);
22133965Sjdp    }
22233965Sjdp
22333965Sjdp    APR_RING_ELEM_INIT(elt, link);
22433965Sjdp    elt->thd = t;
22533965Sjdp    elt->current_owner = NULL;
22633965Sjdp    elt->state = TH_RUN;
22733965Sjdp    return elt;
22833965Sjdp}
22933965Sjdp
23033965Sjdp/*
23133965Sjdp * The worker thread function. Take a task from the queue and perform it if
23233965Sjdp * there is any. Otherwise, put itself into the idle thread list and waiting
23333965Sjdp * for signal to wake up.
23433965Sjdp * The thread terminate directly by detach and exit when it is asked to stop
23533965Sjdp * after finishing a task. Otherwise, the thread should be in idle thread list
236104834Sobrien * and should be joined.
237104834Sobrien */
238104834Sobrienstatic void *APR_THREAD_FUNC thread_pool_func(apr_thread_t * t, void *param)
23933965Sjdp{
24033965Sjdp    apr_thread_pool_t *me = param;
24133965Sjdp    apr_thread_pool_task_t *task = NULL;
24233965Sjdp    apr_interval_time_t wait;
24333965Sjdp    struct apr_thread_list_elt *elt;
24433965Sjdp
24533965Sjdp    apr_thread_mutex_lock(me->lock);
24633965Sjdp    elt = elt_new(me, t);
24733965Sjdp    if (!elt) {
248104834Sobrien        apr_thread_mutex_unlock(me->lock);
249104834Sobrien        apr_thread_exit(t, APR_ENOMEM);
250104834Sobrien    }
251104834Sobrien
25233965Sjdp    while (!me->terminated && elt->state != TH_STOP) {
25333965Sjdp        /* Test if not new element, it is awakened from idle */
25433965Sjdp        if (APR_RING_NEXT(elt, link) != elt) {
25533965Sjdp            --me->idle_cnt;
25633965Sjdp            APR_RING_REMOVE(elt, link);
25733965Sjdp        }
25833965Sjdp
25933965Sjdp        APR_RING_INSERT_TAIL(me->busy_thds, elt, apr_thread_list_elt, link);
26033965Sjdp        task = pop_task(me);
26133965Sjdp        while (NULL != task && !me->terminated) {
26233965Sjdp            ++me->tasks_run;
26333965Sjdp            elt->current_owner = task->owner;
26433965Sjdp            apr_thread_mutex_unlock(me->lock);
26533965Sjdp            apr_thread_data_set(task, "apr_thread_pool_task", NULL, t);
26633965Sjdp            task->func(t, task->param);
26733965Sjdp            apr_thread_mutex_lock(me->lock);
26833965Sjdp            APR_RING_INSERT_TAIL(me->recycled_tasks, task,
26933965Sjdp                                 apr_thread_pool_task, link);
27033965Sjdp            elt->current_owner = NULL;
27133965Sjdp            if (TH_STOP == elt->state) {
27233965Sjdp                break;
27333965Sjdp            }
27433965Sjdp            task = pop_task(me);
27533965Sjdp        }
27633965Sjdp        assert(NULL == elt->current_owner);
27733965Sjdp        if (TH_STOP != elt->state)
27833965Sjdp            APR_RING_REMOVE(elt, link);
27933965Sjdp
280104834Sobrien        /* Test if a busy thread been asked to stop, which is not joinable */
28133965Sjdp        if ((me->idle_cnt >= me->idle_max
28233965Sjdp             && !(me->scheduled_task_cnt && 0 >= me->idle_max)
283104834Sobrien             && !me->idle_wait)
284104834Sobrien            || me->terminated || elt->state != TH_RUN) {
28533965Sjdp            --me->thd_cnt;
28633965Sjdp            if ((TH_PROBATION == elt->state) && me->idle_wait)
28733965Sjdp                ++me->thd_timed_out;
28833965Sjdp            APR_RING_INSERT_TAIL(me->recycled_thds, elt,
28933965Sjdp                                 apr_thread_list_elt, link);
29033965Sjdp            apr_thread_mutex_unlock(me->lock);
29133965Sjdp            apr_thread_detach(t);
29233965Sjdp            apr_thread_exit(t, APR_SUCCESS);
29333965Sjdp            return NULL;        /* should not be here, safe net */
29433965Sjdp        }
29533965Sjdp
29633965Sjdp        /* busy thread become idle */
29733965Sjdp        ++me->idle_cnt;
29833965Sjdp        APR_RING_INSERT_TAIL(me->idle_thds, elt, apr_thread_list_elt, link);
29933965Sjdp
30033965Sjdp        /*
30133965Sjdp         * If there is a scheduled task, always scheduled to perform that task.
30277298Sobrien         * Since there is no guarantee that current idle threads are scheduled
30333965Sjdp         * for next scheduled task.
30433965Sjdp         */
30577298Sobrien        if (me->scheduled_task_cnt)
30633965Sjdp            wait = waiting_time(me);
30733965Sjdp        else if (me->idle_cnt > me->idle_max) {
30877298Sobrien            wait = me->idle_wait;
30933965Sjdp            elt->state = TH_PROBATION;
31033965Sjdp        }
31177298Sobrien        else
31277298Sobrien            wait = -1;
31333965Sjdp
31433965Sjdp        if (wait >= 0) {
31577298Sobrien            apr_thread_cond_timedwait(me->cond, me->lock, wait);
31677298Sobrien        }
31777298Sobrien        else {
31877298Sobrien            apr_thread_cond_wait(me->cond, me->lock);
31977298Sobrien        }
32077298Sobrien    }
32177298Sobrien
32277298Sobrien    /* idle thread been asked to stop, will be joined */
32377298Sobrien    --me->thd_cnt;
32433965Sjdp    apr_thread_mutex_unlock(me->lock);
32577298Sobrien    apr_thread_exit(t, APR_SUCCESS);
32677298Sobrien    return NULL;                /* should not be here, safe net */
32777298Sobrien}
32833965Sjdp
32933965Sjdpstatic apr_status_t thread_pool_cleanup(void *me)
33077298Sobrien{
33133965Sjdp    apr_thread_pool_t *_myself = me;
33233965Sjdp
33377298Sobrien    _myself->terminated = 1;
33433965Sjdp    apr_thread_pool_idle_max_set(_myself, 0);
33533965Sjdp    while (_myself->thd_cnt) {
33633965Sjdp        apr_sleep(20 * 1000);   /* spin lock with 20 ms */
33733965Sjdp    }
33833965Sjdp    apr_thread_mutex_destroy(_myself->lock);
33933965Sjdp    apr_thread_cond_destroy(_myself->cond);
34033965Sjdp    return APR_SUCCESS;
34133965Sjdp}
34233965Sjdp
34333965SjdpAPU_DECLARE(apr_status_t) apr_thread_pool_create(apr_thread_pool_t ** me,
34433965Sjdp                                                 apr_size_t init_threads,
34533965Sjdp                                                 apr_size_t max_threads,
34633965Sjdp                                                 apr_pool_t * pool)
34733965Sjdp{
34833965Sjdp    apr_thread_t *t;
34933965Sjdp    apr_status_t rv = APR_SUCCESS;
35077298Sobrien    apr_thread_pool_t *tp;
35133965Sjdp
35233965Sjdp    *me = NULL;
35333965Sjdp    tp = apr_pcalloc(pool, sizeof(apr_thread_pool_t));
35433965Sjdp
35533965Sjdp    /*
35633965Sjdp     * This pool will be used by different threads. As we cannot ensure that
35733965Sjdp     * our caller won't use the pool without acquiring the mutex, we must
35833965Sjdp     * create a new sub pool.
35933965Sjdp     */
36033965Sjdp    rv = apr_pool_create(&tp->pool, pool);
36133965Sjdp    if (APR_SUCCESS != rv)
36233965Sjdp        return rv;
36333965Sjdp    rv = thread_pool_construct(tp, init_threads, max_threads);
36460484Sobrien    if (APR_SUCCESS != rv)
36533965Sjdp        return rv;
36633965Sjdp    apr_pool_pre_cleanup_register(tp->pool, tp, thread_pool_cleanup);
36733965Sjdp
36833965Sjdp    while (init_threads) {
36933965Sjdp        /* Grab the mutex as apr_thread_create() and thread_pool_func() will
37077298Sobrien         * allocate from (*me)->pool. This is dangerous if there are multiple
37133965Sjdp         * initial threads to create.
37233965Sjdp         */
37333965Sjdp        apr_thread_mutex_lock(tp->lock);
37433965Sjdp        rv = apr_thread_create(&t, NULL, thread_pool_func, tp, tp->pool);
37533965Sjdp        apr_thread_mutex_unlock(tp->lock);
37633965Sjdp        if (APR_SUCCESS != rv) {
37733965Sjdp            break;
37833965Sjdp        }
37933965Sjdp        tp->thd_cnt++;
38033965Sjdp        if (tp->thd_cnt > tp->thd_high) {
38177298Sobrien            tp->thd_high = tp->thd_cnt;
38233965Sjdp        }
38333965Sjdp        --init_threads;
38433965Sjdp    }
38533965Sjdp
38633965Sjdp    if (rv == APR_SUCCESS) {
38733965Sjdp        *me = tp;
38833965Sjdp    }
38933965Sjdp
39033965Sjdp    return rv;
39133965Sjdp}
39233965Sjdp
39333965SjdpAPU_DECLARE(apr_status_t) apr_thread_pool_destroy(apr_thread_pool_t * me)
39433965Sjdp{
39533965Sjdp    apr_pool_destroy(me->pool);
39633965Sjdp    return APR_SUCCESS;
39733965Sjdp}
39833965Sjdp
39933965Sjdp/*
40033965Sjdp * NOTE: This function is not thread safe by itself. Caller should hold the lock
40133965Sjdp */
40233965Sjdpstatic apr_thread_pool_task_t *task_new(apr_thread_pool_t * me,
40333965Sjdp                                        apr_thread_start_t func,
40433965Sjdp                                        void *param, apr_byte_t priority,
40577298Sobrien                                        void *owner, apr_time_t time)
40633965Sjdp{
40733965Sjdp    apr_thread_pool_task_t *t;
40833965Sjdp
40933965Sjdp    if (APR_RING_EMPTY(me->recycled_tasks, apr_thread_pool_task, link)) {
41033965Sjdp        t = apr_pcalloc(me->pool, sizeof(*t));
41133965Sjdp        if (NULL == t) {
41233965Sjdp            return NULL;
41377298Sobrien        }
41433965Sjdp    }
41533965Sjdp    else {
41633965Sjdp        t = APR_RING_FIRST(me->recycled_tasks);
41733965Sjdp        APR_RING_REMOVE(t, link);
41833965Sjdp    }
41977298Sobrien
42077298Sobrien    APR_RING_ELEM_INIT(t, link);
42177298Sobrien    t->func = func;
42277298Sobrien    t->param = param;
42377298Sobrien    t->owner = owner;
42433965Sjdp    if (time > 0) {
42533965Sjdp        t->dispatch.time = apr_time_now() + time;
42633965Sjdp    }
42777298Sobrien    else {
42877298Sobrien        t->dispatch.priority = priority;
42977298Sobrien    }
43033965Sjdp    return t;
43177298Sobrien}
43277298Sobrien
43377298Sobrien/*
43433965Sjdp * Test it the task is the only one within the priority segment.
43533965Sjdp * If it is not, return the first element with same or lower priority.
43633965Sjdp * Otherwise, add the task into the queue and return NULL.
43733965Sjdp *
43833965Sjdp * NOTE: This function is not thread safe by itself. Caller should hold the lock
43933965Sjdp */
44033965Sjdpstatic apr_thread_pool_task_t *add_if_empty(apr_thread_pool_t * me,
44133965Sjdp                                            apr_thread_pool_task_t * const t)
44233965Sjdp{
44333965Sjdp    int seg;
44433965Sjdp    int next;
44533965Sjdp    apr_thread_pool_task_t *t_next;
44633965Sjdp
44777298Sobrien    seg = TASK_PRIORITY_SEG(t);
44877298Sobrien    if (me->task_idx[seg]) {
44977298Sobrien        assert(APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link) !=
45077298Sobrien               me->task_idx[seg]);
45177298Sobrien        t_next = me->task_idx[seg];
45233965Sjdp        while (t_next->dispatch.priority > t->dispatch.priority) {
45333965Sjdp            t_next = APR_RING_NEXT(t_next, link);
45433965Sjdp            if (APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link) ==
45533965Sjdp                t_next) {
45633965Sjdp                return t_next;
45733965Sjdp            }
45877298Sobrien        }
45977298Sobrien        return t_next;
46077298Sobrien    }
46177298Sobrien
46233965Sjdp    for (next = seg - 1; next >= 0; next--) {
46377298Sobrien        if (me->task_idx[next]) {
46477298Sobrien            APR_RING_INSERT_BEFORE(me->task_idx[next], t, link);
46577298Sobrien            break;
46677298Sobrien        }
46733965Sjdp    }
46877298Sobrien    if (0 > next) {
46933965Sjdp        APR_RING_INSERT_TAIL(me->tasks, t, apr_thread_pool_task, link);
47077298Sobrien    }
47177298Sobrien    me->task_idx[seg] = t;
47277298Sobrien    return NULL;
47377298Sobrien}
47477298Sobrien
47577298Sobrien/*
47677298Sobrien*   schedule a task to run in "time" microseconds. Find the spot in the ring where
47777298Sobrien*   the time fits. Adjust the short_time so the thread wakes up when the time is reached.
47877298Sobrien*/
47977298Sobrienstatic apr_status_t schedule_task(apr_thread_pool_t *me,
48077298Sobrien                                  apr_thread_start_t func, void *param,
48133965Sjdp                                  void *owner, apr_interval_time_t time)
48233965Sjdp{
48377298Sobrien    apr_thread_pool_task_t *t;
48477298Sobrien    apr_thread_pool_task_t *t_loc;
48577298Sobrien    apr_thread_t *thd;
48677298Sobrien    apr_status_t rv = APR_SUCCESS;
48777298Sobrien    apr_thread_mutex_lock(me->lock);
48877298Sobrien
48977298Sobrien    t = task_new(me, func, param, 0, owner, time);
49077298Sobrien    if (NULL == t) {
49133965Sjdp        apr_thread_mutex_unlock(me->lock);
49277298Sobrien        return APR_ENOMEM;
49333965Sjdp    }
49433965Sjdp    t_loc = APR_RING_FIRST(me->scheduled_tasks);
49577298Sobrien    while (NULL != t_loc) {
49677298Sobrien        /* if the time is less than the entry insert ahead of it */
49777298Sobrien        if (t->dispatch.time < t_loc->dispatch.time) {
49833965Sjdp            ++me->scheduled_task_cnt;
49977298Sobrien            APR_RING_INSERT_BEFORE(t_loc, t, link);
50077298Sobrien            break;
50133965Sjdp        }
50277298Sobrien        else {
50377298Sobrien            t_loc = APR_RING_NEXT(t_loc, link);
50433965Sjdp            if (t_loc ==
50577298Sobrien                APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task,
50633965Sjdp                                  link)) {
50777298Sobrien                ++me->scheduled_task_cnt;
50877298Sobrien                APR_RING_INSERT_TAIL(me->scheduled_tasks, t,
50933965Sjdp                                     apr_thread_pool_task, link);
51033965Sjdp                break;
51133965Sjdp            }
51233965Sjdp        }
51377298Sobrien    }
51433965Sjdp    /* there should be at least one thread for scheduled tasks */
51533965Sjdp    if (0 == me->thd_cnt) {
51633965Sjdp        rv = apr_thread_create(&thd, NULL, thread_pool_func, me, me->pool);
51777298Sobrien        if (APR_SUCCESS == rv) {
51833965Sjdp            ++me->thd_cnt;
51933965Sjdp            if (me->thd_cnt > me->thd_high)
52077298Sobrien                me->thd_high = me->thd_cnt;
52133965Sjdp        }
52233965Sjdp    }
52333965Sjdp    apr_thread_cond_signal(me->cond);
52433965Sjdp    apr_thread_mutex_unlock(me->lock);
52533965Sjdp    return rv;
52677298Sobrien}
52733965Sjdp
52877298Sobrienstatic apr_status_t add_task(apr_thread_pool_t *me, apr_thread_start_t func,
52933965Sjdp                             void *param, apr_byte_t priority, int push,
53033965Sjdp                             void *owner)
53133965Sjdp{
53233965Sjdp    apr_thread_pool_task_t *t;
53377298Sobrien    apr_thread_pool_task_t *t_loc;
53433965Sjdp    apr_thread_t *thd;
53533965Sjdp    apr_status_t rv = APR_SUCCESS;
53633965Sjdp
53777298Sobrien    apr_thread_mutex_lock(me->lock);
53877298Sobrien
53933965Sjdp    t = task_new(me, func, param, priority, owner, 0);
54033965Sjdp    if (NULL == t) {
54133965Sjdp        apr_thread_mutex_unlock(me->lock);
54233965Sjdp        return APR_ENOMEM;
54333965Sjdp    }
54433965Sjdp
54533965Sjdp    t_loc = add_if_empty(me, t);
54633965Sjdp    if (NULL == t_loc) {
54777298Sobrien        goto FINAL_EXIT;
54833965Sjdp    }
54977298Sobrien
55033965Sjdp    if (push) {
55133965Sjdp        while (APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link) !=
552               t_loc && t_loc->dispatch.priority >= t->dispatch.priority) {
553            t_loc = APR_RING_NEXT(t_loc, link);
554        }
555    }
556    APR_RING_INSERT_BEFORE(t_loc, t, link);
557    if (!push) {
558        if (t_loc == me->task_idx[TASK_PRIORITY_SEG(t)]) {
559            me->task_idx[TASK_PRIORITY_SEG(t)] = t;
560        }
561    }
562
563  FINAL_EXIT:
564    me->task_cnt++;
565    if (me->task_cnt > me->tasks_high)
566        me->tasks_high = me->task_cnt;
567    if (0 == me->thd_cnt || (0 == me->idle_cnt && me->thd_cnt < me->thd_max &&
568                             me->task_cnt > me->threshold)) {
569        rv = apr_thread_create(&thd, NULL, thread_pool_func, me, me->pool);
570        if (APR_SUCCESS == rv) {
571            ++me->thd_cnt;
572            if (me->thd_cnt > me->thd_high)
573                me->thd_high = me->thd_cnt;
574        }
575    }
576
577    apr_thread_cond_signal(me->cond);
578    apr_thread_mutex_unlock(me->lock);
579
580    return rv;
581}
582
583APU_DECLARE(apr_status_t) apr_thread_pool_push(apr_thread_pool_t *me,
584                                               apr_thread_start_t func,
585                                               void *param,
586                                               apr_byte_t priority,
587                                               void *owner)
588{
589    return add_task(me, func, param, priority, 1, owner);
590}
591
592APU_DECLARE(apr_status_t) apr_thread_pool_schedule(apr_thread_pool_t *me,
593                                                   apr_thread_start_t func,
594                                                   void *param,
595                                                   apr_interval_time_t time,
596                                                   void *owner)
597{
598    return schedule_task(me, func, param, owner, time);
599}
600
601APU_DECLARE(apr_status_t) apr_thread_pool_top(apr_thread_pool_t *me,
602                                              apr_thread_start_t func,
603                                              void *param,
604                                              apr_byte_t priority,
605                                              void *owner)
606{
607    return add_task(me, func, param, priority, 0, owner);
608}
609
610static apr_status_t remove_scheduled_tasks(apr_thread_pool_t *me,
611                                           void *owner)
612{
613    apr_thread_pool_task_t *t_loc;
614    apr_thread_pool_task_t *next;
615
616    t_loc = APR_RING_FIRST(me->scheduled_tasks);
617    while (t_loc !=
618           APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task,
619                             link)) {
620        next = APR_RING_NEXT(t_loc, link);
621        /* if this is the owner remove it */
622        if (t_loc->owner == owner) {
623            --me->scheduled_task_cnt;
624            APR_RING_REMOVE(t_loc, link);
625        }
626        t_loc = next;
627    }
628    return APR_SUCCESS;
629}
630
631static apr_status_t remove_tasks(apr_thread_pool_t *me, void *owner)
632{
633    apr_thread_pool_task_t *t_loc;
634    apr_thread_pool_task_t *next;
635    int seg;
636
637    t_loc = APR_RING_FIRST(me->tasks);
638    while (t_loc != APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link)) {
639        next = APR_RING_NEXT(t_loc, link);
640        if (t_loc->owner == owner) {
641            --me->task_cnt;
642            seg = TASK_PRIORITY_SEG(t_loc);
643            if (t_loc == me->task_idx[seg]) {
644                me->task_idx[seg] = APR_RING_NEXT(t_loc, link);
645                if (me->task_idx[seg] == APR_RING_SENTINEL(me->tasks,
646                                                           apr_thread_pool_task,
647                                                           link)
648                    || TASK_PRIORITY_SEG(me->task_idx[seg]) != seg) {
649                    me->task_idx[seg] = NULL;
650                }
651            }
652            APR_RING_REMOVE(t_loc, link);
653        }
654        t_loc = next;
655    }
656    return APR_SUCCESS;
657}
658
659static void wait_on_busy_threads(apr_thread_pool_t *me, void *owner)
660{
661#ifndef NDEBUG
662    apr_os_thread_t *os_thread;
663#endif
664    struct apr_thread_list_elt *elt;
665    apr_thread_mutex_lock(me->lock);
666    elt = APR_RING_FIRST(me->busy_thds);
667    while (elt != APR_RING_SENTINEL(me->busy_thds, apr_thread_list_elt, link)) {
668        if (elt->current_owner != owner) {
669            elt = APR_RING_NEXT(elt, link);
670            continue;
671        }
672#ifndef NDEBUG
673        /* make sure the thread is not the one calling tasks_cancel */
674        apr_os_thread_get(&os_thread, elt->thd);
675#ifdef WIN32
676        /* hack for apr win32 bug */
677        assert(!apr_os_thread_equal(apr_os_thread_current(), os_thread));
678#else
679        assert(!apr_os_thread_equal(apr_os_thread_current(), *os_thread));
680#endif
681#endif
682        while (elt->current_owner == owner) {
683            apr_thread_mutex_unlock(me->lock);
684            apr_sleep(200 * 1000);
685            apr_thread_mutex_lock(me->lock);
686        }
687        elt = APR_RING_FIRST(me->busy_thds);
688    }
689    apr_thread_mutex_unlock(me->lock);
690    return;
691}
692
693APU_DECLARE(apr_status_t) apr_thread_pool_tasks_cancel(apr_thread_pool_t *me,
694                                                       void *owner)
695{
696    apr_status_t rv = APR_SUCCESS;
697
698    apr_thread_mutex_lock(me->lock);
699    if (me->task_cnt > 0) {
700        rv = remove_tasks(me, owner);
701    }
702    if (me->scheduled_task_cnt > 0) {
703        rv = remove_scheduled_tasks(me, owner);
704    }
705    apr_thread_mutex_unlock(me->lock);
706    wait_on_busy_threads(me, owner);
707
708    return rv;
709}
710
711APU_DECLARE(apr_size_t) apr_thread_pool_tasks_count(apr_thread_pool_t *me)
712{
713    return me->task_cnt;
714}
715
716APU_DECLARE(apr_size_t)
717    apr_thread_pool_scheduled_tasks_count(apr_thread_pool_t *me)
718{
719    return me->scheduled_task_cnt;
720}
721
722APU_DECLARE(apr_size_t) apr_thread_pool_threads_count(apr_thread_pool_t *me)
723{
724    return me->thd_cnt;
725}
726
727APU_DECLARE(apr_size_t) apr_thread_pool_busy_count(apr_thread_pool_t *me)
728{
729    return me->thd_cnt - me->idle_cnt;
730}
731
732APU_DECLARE(apr_size_t) apr_thread_pool_idle_count(apr_thread_pool_t *me)
733{
734    return me->idle_cnt;
735}
736
737APU_DECLARE(apr_size_t)
738    apr_thread_pool_tasks_run_count(apr_thread_pool_t * me)
739{
740    return me->tasks_run;
741}
742
743APU_DECLARE(apr_size_t)
744    apr_thread_pool_tasks_high_count(apr_thread_pool_t * me)
745{
746    return me->tasks_high;
747}
748
749APU_DECLARE(apr_size_t)
750    apr_thread_pool_threads_high_count(apr_thread_pool_t * me)
751{
752    return me->thd_high;
753}
754
755APU_DECLARE(apr_size_t)
756    apr_thread_pool_threads_idle_timeout_count(apr_thread_pool_t * me)
757{
758    return me->thd_timed_out;
759}
760
761
762APU_DECLARE(apr_size_t) apr_thread_pool_idle_max_get(apr_thread_pool_t *me)
763{
764    return me->idle_max;
765}
766
767APU_DECLARE(apr_interval_time_t)
768    apr_thread_pool_idle_wait_get(apr_thread_pool_t * me)
769{
770    return me->idle_wait;
771}
772
773/*
774 * This function stop extra idle threads to the cnt.
775 * @return the number of threads stopped
776 * NOTE: There could be busy threads become idle during this function
777 */
778static struct apr_thread_list_elt *trim_threads(apr_thread_pool_t *me,
779                                                apr_size_t *cnt, int idle)
780{
781    struct apr_thread_list *thds;
782    apr_size_t n, n_dbg, i;
783    struct apr_thread_list_elt *head, *tail, *elt;
784
785    apr_thread_mutex_lock(me->lock);
786    if (idle) {
787        thds = me->idle_thds;
788        n = me->idle_cnt;
789    }
790    else {
791        thds = me->busy_thds;
792        n = me->thd_cnt - me->idle_cnt;
793    }
794    if (n <= *cnt) {
795        apr_thread_mutex_unlock(me->lock);
796        *cnt = 0;
797        return NULL;
798    }
799    n -= *cnt;
800
801    head = APR_RING_FIRST(thds);
802    for (i = 0; i < *cnt; i++) {
803        head = APR_RING_NEXT(head, link);
804    }
805    tail = APR_RING_LAST(thds);
806    if (idle) {
807        APR_RING_UNSPLICE(head, tail, link);
808        me->idle_cnt = *cnt;
809    }
810
811    n_dbg = 0;
812    for (elt = head; elt != tail; elt = APR_RING_NEXT(elt, link)) {
813        elt->state = TH_STOP;
814        n_dbg++;
815    }
816    elt->state = TH_STOP;
817    n_dbg++;
818    assert(n == n_dbg);
819    *cnt = n;
820
821    apr_thread_mutex_unlock(me->lock);
822
823    APR_RING_PREV(head, link) = NULL;
824    APR_RING_NEXT(tail, link) = NULL;
825    return head;
826}
827
828static apr_size_t trim_idle_threads(apr_thread_pool_t *me, apr_size_t cnt)
829{
830    apr_size_t n_dbg;
831    struct apr_thread_list_elt *elt, *head, *tail;
832    apr_status_t rv;
833
834    elt = trim_threads(me, &cnt, 1);
835
836    apr_thread_mutex_lock(me->lock);
837    apr_thread_cond_broadcast(me->cond);
838    apr_thread_mutex_unlock(me->lock);
839
840    n_dbg = 0;
841    if (NULL != (head = elt)) {
842        while (elt) {
843            tail = elt;
844            apr_thread_join(&rv, elt->thd);
845            elt = APR_RING_NEXT(elt, link);
846            ++n_dbg;
847        }
848        apr_thread_mutex_lock(me->lock);
849        APR_RING_SPLICE_TAIL(me->recycled_thds, head, tail,
850                             apr_thread_list_elt, link);
851        apr_thread_mutex_unlock(me->lock);
852    }
853    assert(cnt == n_dbg);
854
855    return cnt;
856}
857
858/* don't join on busy threads for performance reasons, who knows how long will
859 * the task takes to perform
860 */
861static apr_size_t trim_busy_threads(apr_thread_pool_t *me, apr_size_t cnt)
862{
863    trim_threads(me, &cnt, 0);
864    return cnt;
865}
866
867APU_DECLARE(apr_size_t) apr_thread_pool_idle_max_set(apr_thread_pool_t *me,
868                                                     apr_size_t cnt)
869{
870    me->idle_max = cnt;
871    cnt = trim_idle_threads(me, cnt);
872    return cnt;
873}
874
875APU_DECLARE(apr_interval_time_t)
876    apr_thread_pool_idle_wait_set(apr_thread_pool_t * me,
877                                  apr_interval_time_t timeout)
878{
879    apr_interval_time_t oldtime;
880
881    oldtime = me->idle_wait;
882    me->idle_wait = timeout;
883
884    return oldtime;
885}
886
887APU_DECLARE(apr_size_t) apr_thread_pool_thread_max_get(apr_thread_pool_t *me)
888{
889    return me->thd_max;
890}
891
892/*
893 * This function stop extra working threads to the new limit.
894 * NOTE: There could be busy threads become idle during this function
895 */
896APU_DECLARE(apr_size_t) apr_thread_pool_thread_max_set(apr_thread_pool_t *me,
897                                                       apr_size_t cnt)
898{
899    unsigned int n;
900
901    me->thd_max = cnt;
902    if (0 == cnt || me->thd_cnt <= cnt) {
903        return 0;
904    }
905
906    n = me->thd_cnt - cnt;
907    if (n >= me->idle_cnt) {
908        trim_busy_threads(me, n - me->idle_cnt);
909        trim_idle_threads(me, 0);
910    }
911    else {
912        trim_idle_threads(me, me->idle_cnt - n);
913    }
914    return n;
915}
916
917APU_DECLARE(apr_size_t) apr_thread_pool_threshold_get(apr_thread_pool_t *me)
918{
919    return me->threshold;
920}
921
922APU_DECLARE(apr_size_t) apr_thread_pool_threshold_set(apr_thread_pool_t *me,
923                                                      apr_size_t val)
924{
925    apr_size_t ov;
926
927    ov = me->threshold;
928    me->threshold = val;
929    return ov;
930}
931
932APU_DECLARE(apr_status_t) apr_thread_pool_task_owner_get(apr_thread_t *thd,
933                                                         void **owner)
934{
935    apr_status_t rv;
936    apr_thread_pool_task_t *task;
937    void *data;
938
939    rv = apr_thread_data_get(&data, "apr_thread_pool_task", thd);
940    if (rv != APR_SUCCESS) {
941        return rv;
942    }
943
944    task = data;
945    if (!task) {
946        *owner = NULL;
947        return APR_BADARG;
948    }
949
950    *owner = task->owner;
951    return APR_SUCCESS;
952}
953
954#endif /* APR_HAS_THREADS */
955
956/* vim: set ts=4 sw=4 et cin tw=80: */
957