apr_thread_pool.c revision 303975
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed
 * with this work for additional information regarding copyright
 * ownership. The ASF licenses this file to you under the Apache
 * License, Version 2.0 (the "License"); you may not use this file
 * except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */

#include <assert.h>
#include "apr_thread_pool.h"
#include "apr_ring.h"
#include "apr_thread_cond.h"
#include "apr_portable.h"

#if APR_HAS_THREADS

#define TASK_PRIORITY_SEGS 4
#define TASK_PRIORITY_SEG(x) (((x)->dispatch.priority & 0xFF) / 64)
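
/*
 * Note on the segmentation above: the 8-bit task priority is split into
 * TASK_PRIORITY_SEGS buckets of 64, so priorities 0-63 fall in segment 0,
 * 64-127 in segment 1, 128-191 in segment 2 and 192-255 in segment 3.
 * task_idx[] in struct apr_thread_pool (below) caches the first task of
 * each segment so that insertion into the priority-ordered task ring does
 * not have to scan the whole queue.
 */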

typedef struct apr_thread_pool_task
{
    APR_RING_ENTRY(apr_thread_pool_task) link;
    apr_thread_start_t func;
    void *param;
    void *owner;
    union
    {
        apr_byte_t priority;
        apr_time_t time;
    } dispatch;
} apr_thread_pool_task_t;

APR_RING_HEAD(apr_thread_pool_tasks, apr_thread_pool_task);

struct apr_thread_list_elt
{
    APR_RING_ENTRY(apr_thread_list_elt) link;
    apr_thread_t *thd;
    volatile void *current_owner;
    volatile enum { TH_RUN, TH_STOP, TH_PROBATION } state;
};

APR_RING_HEAD(apr_thread_list, apr_thread_list_elt);

struct apr_thread_pool
{
    apr_pool_t *pool;
    volatile apr_size_t thd_max;
    volatile apr_size_t idle_max;
    volatile apr_interval_time_t idle_wait;
    volatile apr_size_t thd_cnt;
    volatile apr_size_t idle_cnt;
    volatile apr_size_t task_cnt;
    volatile apr_size_t scheduled_task_cnt;
    volatile apr_size_t threshold;
    volatile apr_size_t tasks_run;
    volatile apr_size_t tasks_high;
    volatile apr_size_t thd_high;
    volatile apr_size_t thd_timed_out;
    struct apr_thread_pool_tasks *tasks;
    struct apr_thread_pool_tasks *scheduled_tasks;
    struct apr_thread_list *busy_thds;
    struct apr_thread_list *idle_thds;
    apr_thread_mutex_t *lock;
    apr_thread_cond_t *cond;
    volatile int terminated;
    struct apr_thread_pool_tasks *recycled_tasks;
    struct apr_thread_list *recycled_thds;
    apr_thread_pool_task_t *task_idx[TASK_PRIORITY_SEGS];
};

static apr_status_t thread_pool_construct(apr_thread_pool_t * me,
                                          apr_size_t init_threads,
                                          apr_size_t max_threads)
{
    apr_status_t rv;
    int i;

    me->thd_max = max_threads;
    me->idle_max = init_threads;
    me->threshold = init_threads / 2;
    rv = apr_thread_mutex_create(&me->lock, APR_THREAD_MUTEX_NESTED,
                                 me->pool);
    if (APR_SUCCESS != rv) {
        return rv;
    }
    rv = apr_thread_cond_create(&me->cond, me->pool);
    if (APR_SUCCESS != rv) {
        apr_thread_mutex_destroy(me->lock);
        return rv;
    }
    me->tasks = apr_palloc(me->pool, sizeof(*me->tasks));
    if (!me->tasks) {
        goto CATCH_ENOMEM;
    }
    APR_RING_INIT(me->tasks, apr_thread_pool_task, link);
    me->scheduled_tasks = apr_palloc(me->pool, sizeof(*me->scheduled_tasks));
    if (!me->scheduled_tasks) {
        goto CATCH_ENOMEM;
    }
    APR_RING_INIT(me->scheduled_tasks, apr_thread_pool_task, link);
    me->recycled_tasks = apr_palloc(me->pool, sizeof(*me->recycled_tasks));
    if (!me->recycled_tasks) {
        goto CATCH_ENOMEM;
    }
    APR_RING_INIT(me->recycled_tasks, apr_thread_pool_task, link);
    me->busy_thds = apr_palloc(me->pool, sizeof(*me->busy_thds));
    if (!me->busy_thds) {
        goto CATCH_ENOMEM;
    }
    APR_RING_INIT(me->busy_thds, apr_thread_list_elt, link);
    me->idle_thds = apr_palloc(me->pool, sizeof(*me->idle_thds));
    if (!me->idle_thds) {
        goto CATCH_ENOMEM;
    }
    APR_RING_INIT(me->idle_thds, apr_thread_list_elt, link);
    me->recycled_thds = apr_palloc(me->pool, sizeof(*me->recycled_thds));
    if (!me->recycled_thds) {
        goto CATCH_ENOMEM;
    }
    APR_RING_INIT(me->recycled_thds, apr_thread_list_elt, link);
    me->thd_cnt = me->idle_cnt = me->task_cnt = me->scheduled_task_cnt = 0;
    me->tasks_run = me->tasks_high = me->thd_high = me->thd_timed_out = 0;
    me->idle_wait = 0;
    me->terminated = 0;
    for (i = 0; i < TASK_PRIORITY_SEGS; i++) {
        me->task_idx[i] = NULL;
    }
    goto FINAL_EXIT;
  CATCH_ENOMEM:
    rv = APR_ENOMEM;
    apr_thread_mutex_destroy(me->lock);
    apr_thread_cond_destroy(me->cond);
  FINAL_EXIT:
    return rv;
}

/*
 * NOTE: This function is not thread safe by itself; the caller must hold the lock.
 */
static apr_thread_pool_task_t *pop_task(apr_thread_pool_t * me)
{
    apr_thread_pool_task_t *task = NULL;
    int seg;

    /* check for scheduled tasks */
    if (me->scheduled_task_cnt > 0) {
        task = APR_RING_FIRST(me->scheduled_tasks);
        assert(task != NULL);
        assert(task !=
               APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task,
                                 link));
        /* if it's time */
        if (task->dispatch.time <= apr_time_now()) {
            --me->scheduled_task_cnt;
            APR_RING_REMOVE(task, link);
            return task;
        }
    }
    /* check for normal tasks if we're not returning a scheduled task */
    if (me->task_cnt == 0) {
        return NULL;
    }

    task = APR_RING_FIRST(me->tasks);
    assert(task != NULL);
    assert(task != APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link));
    --me->task_cnt;
    seg = TASK_PRIORITY_SEG(task);
    if (task == me->task_idx[seg]) {
        me->task_idx[seg] = APR_RING_NEXT(task, link);
        if (me->task_idx[seg] == APR_RING_SENTINEL(me->tasks,
                                                   apr_thread_pool_task, link)
            || TASK_PRIORITY_SEG(me->task_idx[seg]) != seg) {
            me->task_idx[seg] = NULL;
        }
    }
    APR_RING_REMOVE(task, link);
    return task;
}

static apr_interval_time_t waiting_time(apr_thread_pool_t * me)
{
    apr_thread_pool_task_t *task = NULL;

    task = APR_RING_FIRST(me->scheduled_tasks);
    assert(task != NULL);
    assert(task !=
           APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task,
                             link));
    return task->dispatch.time - apr_time_now();
}

/*
 * NOTE: This function is not thread safe by itself; the caller must hold the lock.
 */
static struct apr_thread_list_elt *elt_new(apr_thread_pool_t * me,
                                           apr_thread_t * t)
{
    struct apr_thread_list_elt *elt;

    if (APR_RING_EMPTY(me->recycled_thds, apr_thread_list_elt, link)) {
        elt = apr_pcalloc(me->pool, sizeof(*elt));
        if (NULL == elt) {
            return NULL;
        }
    }
    else {
        elt = APR_RING_FIRST(me->recycled_thds);
        APR_RING_REMOVE(elt, link);
    }

    APR_RING_ELEM_INIT(elt, link);
    elt->thd = t;
    elt->current_owner = NULL;
    elt->state = TH_RUN;
    return elt;
}

/*
 * The worker thread function. It takes a task from the queue and performs
 * it if one is available; otherwise it puts itself on the idle thread list
 * and waits for a signal to wake up.
 * A thread that is asked to stop after finishing a task terminates directly
 * by detaching and exiting. Otherwise the thread stays on the idle thread
 * list and is joined when it is stopped.
 */
static void *APR_THREAD_FUNC thread_pool_func(apr_thread_t * t, void *param)
{
    apr_thread_pool_t *me = param;
    apr_thread_pool_task_t *task = NULL;
    apr_interval_time_t wait;
    struct apr_thread_list_elt *elt;

    apr_thread_mutex_lock(me->lock);
    elt = elt_new(me, t);
    if (!elt) {
        apr_thread_mutex_unlock(me->lock);
        apr_thread_exit(t, APR_ENOMEM);
    }

    while (!me->terminated && elt->state != TH_STOP) {
        /* If this is not a new element, the thread was woken from the idle
         * list and must be removed from it */
        if (APR_RING_NEXT(elt, link) != elt) {
            --me->idle_cnt;
            APR_RING_REMOVE(elt, link);
        }

        APR_RING_INSERT_TAIL(me->busy_thds, elt, apr_thread_list_elt, link);
        task = pop_task(me);
        while (NULL != task && !me->terminated) {
            ++me->tasks_run;
            elt->current_owner = task->owner;
            apr_thread_mutex_unlock(me->lock);
            apr_thread_data_set(task, "apr_thread_pool_task", NULL, t);
            task->func(t, task->param);
            apr_thread_mutex_lock(me->lock);
            APR_RING_INSERT_TAIL(me->recycled_tasks, task,
                                 apr_thread_pool_task, link);
            elt->current_owner = NULL;
            if (TH_STOP == elt->state) {
                break;
            }
            task = pop_task(me);
        }
        assert(NULL == elt->current_owner);
        if (TH_STOP != elt->state)
            APR_RING_REMOVE(elt, link);

        /*
         * A busy thread that has been asked to stop is not joinable, so if
         * this thread should exit (asked to stop, pool terminated, or there
         * are already enough idle threads and no idle_wait is configured),
         * detach and exit here.
         */
        if ((me->idle_cnt >= me->idle_max
             && !(me->scheduled_task_cnt && 0 >= me->idle_max)
             && !me->idle_wait)
            || me->terminated || elt->state != TH_RUN) {
            --me->thd_cnt;
            if ((TH_PROBATION == elt->state) && me->idle_wait)
                ++me->thd_timed_out;
            APR_RING_INSERT_TAIL(me->recycled_thds, elt,
                                 apr_thread_list_elt, link);
            apr_thread_mutex_unlock(me->lock);
            apr_thread_detach(t);
            apr_thread_exit(t, APR_SUCCESS);
            return NULL;        /* should not be here, safety net */
        }

        /* the busy thread becomes idle */
        ++me->idle_cnt;
        APR_RING_INSERT_TAIL(me->idle_thds, elt, apr_thread_list_elt, link);

        /*
         * If there is a scheduled task, wait no longer than its due time:
         * there is no guarantee that the currently idle threads are the
         * ones that will be signalled for the next scheduled task.
         */
        if (me->scheduled_task_cnt)
            wait = waiting_time(me);
        else if (me->idle_cnt > me->idle_max) {
            wait = me->idle_wait;
            elt->state = TH_PROBATION;
        }
        else
            wait = -1;

        if (wait >= 0) {
            apr_thread_cond_timedwait(me->cond, me->lock, wait);
        }
        else {
            apr_thread_cond_wait(me->cond, me->lock);
        }
    }

    /* an idle thread has been asked to stop; it will be joined */
    --me->thd_cnt;
    apr_thread_mutex_unlock(me->lock);
    apr_thread_exit(t, APR_SUCCESS);
    return NULL;                /* should not be here, safety net */
}
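
/*
 * Idle handling above, in short: when the idle list grows past idle_max and
 * an idle_wait is configured, a surplus worker is not stopped right away; it
 * is marked TH_PROBATION and waits at most idle_wait for new work, runs any
 * work that arrives in the meantime, and then exits. Threads that leave
 * through that path are counted in thd_timed_out, reported by
 * apr_thread_pool_threads_idle_timeout_count().
 */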

static apr_status_t thread_pool_cleanup(void *me)
{
    apr_thread_pool_t *_myself = me;

    _myself->terminated = 1;
    apr_thread_pool_idle_max_set(_myself, 0);
    while (_myself->thd_cnt) {
        apr_sleep(20 * 1000);   /* poll every 20 ms until all threads exit */
    }
    apr_thread_mutex_destroy(_myself->lock);
    apr_thread_cond_destroy(_myself->cond);
    return APR_SUCCESS;
}

APU_DECLARE(apr_status_t) apr_thread_pool_create(apr_thread_pool_t ** me,
                                                 apr_size_t init_threads,
                                                 apr_size_t max_threads,
                                                 apr_pool_t * pool)
{
    apr_thread_t *t;
    apr_status_t rv = APR_SUCCESS;
    apr_thread_pool_t *tp;

    *me = NULL;
    tp = apr_pcalloc(pool, sizeof(apr_thread_pool_t));

    /*
     * This pool will be used by different threads. As we cannot ensure that
     * our caller won't use the pool without acquiring the mutex, we must
     * create a new sub pool.
     */
    rv = apr_pool_create(&tp->pool, pool);
    if (APR_SUCCESS != rv)
        return rv;
    rv = thread_pool_construct(tp, init_threads, max_threads);
    if (APR_SUCCESS != rv)
        return rv;
    apr_pool_pre_cleanup_register(tp->pool, tp, thread_pool_cleanup);

    while (init_threads) {
        /* Grab the mutex as apr_thread_create() and thread_pool_func() will
         * allocate from tp->pool. This is dangerous if there are multiple
         * initial threads to create.
         */
        apr_thread_mutex_lock(tp->lock);
        rv = apr_thread_create(&t, NULL, thread_pool_func, tp, tp->pool);
        apr_thread_mutex_unlock(tp->lock);
        if (APR_SUCCESS != rv) {
            break;
        }
        tp->thd_cnt++;
        if (tp->thd_cnt > tp->thd_high) {
            tp->thd_high = tp->thd_cnt;
        }
        --init_threads;
    }

    if (rv == APR_SUCCESS) {
        *me = tp;
    }

    return rv;
}

APU_DECLARE(apr_status_t) apr_thread_pool_destroy(apr_thread_pool_t * me)
{
    apr_pool_destroy(me->pool);
    return APR_SUCCESS;
}
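
/*
 * A minimal usage sketch (illustrative only, not part of this file): the
 * task function my_task and its argument my_data are hypothetical, and
 * error handling is omitted. Given an existing apr_pool_t *p:
 *
 *     #include "apr_thread_pool.h"
 *
 *     apr_thread_pool_t *tp;
 *
 *     if (apr_thread_pool_create(&tp, 2, 8, p) == APR_SUCCESS) {
 *         // start with 2 workers, let the pool grow to at most 8
 *         apr_thread_pool_push(tp, my_task, my_data,
 *                              APR_THREAD_TASK_PRIORITY_NORMAL, NULL);
 *         // ... later: stop the workers and wait for them to exit
 *         apr_thread_pool_destroy(tp);
 *     }
 */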

/*
 * NOTE: This function is not thread safe by itself; the caller must hold the lock.
 */
static apr_thread_pool_task_t *task_new(apr_thread_pool_t * me,
                                        apr_thread_start_t func,
                                        void *param, apr_byte_t priority,
                                        void *owner, apr_time_t time)
{
    apr_thread_pool_task_t *t;

    if (APR_RING_EMPTY(me->recycled_tasks, apr_thread_pool_task, link)) {
        t = apr_pcalloc(me->pool, sizeof(*t));
        if (NULL == t) {
            return NULL;
        }
    }
    else {
        t = APR_RING_FIRST(me->recycled_tasks);
        APR_RING_REMOVE(t, link);
    }

    APR_RING_ELEM_INIT(t, link);
    t->func = func;
    t->param = param;
    t->owner = owner;
    if (time > 0) {
        t->dispatch.time = apr_time_now() + time;
    }
    else {
        t->dispatch.priority = priority;
    }
    return t;
}

/*
 * Test if the task is the only one within its priority segment.
 * If it is not, return the first element with the same or lower priority.
 * Otherwise, add the task into the queue and return NULL.
 *
 * NOTE: This function is not thread safe by itself; the caller must hold the lock.
 */
static apr_thread_pool_task_t *add_if_empty(apr_thread_pool_t * me,
                                            apr_thread_pool_task_t * const t)
{
    int seg;
    int next;
    apr_thread_pool_task_t *t_next;

    seg = TASK_PRIORITY_SEG(t);
    if (me->task_idx[seg]) {
        assert(APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link) !=
               me->task_idx[seg]);
        t_next = me->task_idx[seg];
        while (t_next->dispatch.priority > t->dispatch.priority) {
            t_next = APR_RING_NEXT(t_next, link);
            if (APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link) ==
                t_next) {
                return t_next;
            }
        }
        return t_next;
    }

    for (next = seg - 1; next >= 0; next--) {
        if (me->task_idx[next]) {
            APR_RING_INSERT_BEFORE(me->task_idx[next], t, link);
            break;
        }
    }
    if (0 > next) {
        APR_RING_INSERT_TAIL(me->tasks, t, apr_thread_pool_task, link);
    }
    me->task_idx[seg] = t;
    return NULL;
}
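
/*
 * Queue ordering, for illustration: the task ring is kept in descending
 * priority order. apr_thread_pool_push() enqueues behind existing tasks of
 * equal priority (FIFO), while apr_thread_pool_top() enqueues ahead of them
 * (LIFO). For example, pushing A (priority 100), B (200), C (100) and then
 * "topping" D (100) leaves the queue as B, D, A, C, which is the order in
 * which pop_task() hands the tasks to the workers.
 */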

/*
 * Schedule a task to run in "time" microseconds. Find the spot in the
 * scheduled-task ring where the time fits, keeping the ring sorted by due
 * time; the waiting workers are then signalled so one of them recomputes
 * its wait and wakes up when the task is due.
 */
static apr_status_t schedule_task(apr_thread_pool_t *me,
                                  apr_thread_start_t func, void *param,
                                  void *owner, apr_interval_time_t time)
{
    apr_thread_pool_task_t *t;
    apr_thread_pool_task_t *t_loc;
    apr_thread_t *thd;
    apr_status_t rv = APR_SUCCESS;
    apr_thread_mutex_lock(me->lock);

    t = task_new(me, func, param, 0, owner, time);
    if (NULL == t) {
        apr_thread_mutex_unlock(me->lock);
        return APR_ENOMEM;
    }
    t_loc = APR_RING_FIRST(me->scheduled_tasks);
    while (NULL != t_loc) {
        /* if the time is less than the entry insert ahead of it */
        if (t->dispatch.time < t_loc->dispatch.time) {
            ++me->scheduled_task_cnt;
            APR_RING_INSERT_BEFORE(t_loc, t, link);
            break;
        }
        else {
            t_loc = APR_RING_NEXT(t_loc, link);
            if (t_loc ==
                APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task,
                                  link)) {
                ++me->scheduled_task_cnt;
                APR_RING_INSERT_TAIL(me->scheduled_tasks, t,
                                     apr_thread_pool_task, link);
                break;
            }
        }
    }
    /* there should be at least one thread for scheduled tasks */
    if (0 == me->thd_cnt) {
        rv = apr_thread_create(&thd, NULL, thread_pool_func, me, me->pool);
        if (APR_SUCCESS == rv) {
            ++me->thd_cnt;
            if (me->thd_cnt > me->thd_high)
                me->thd_high = me->thd_cnt;
        }
    }
    apr_thread_cond_signal(me->cond);
    apr_thread_mutex_unlock(me->lock);
    return rv;
}

static apr_status_t add_task(apr_thread_pool_t *me, apr_thread_start_t func,
                             void *param, apr_byte_t priority, int push,
                             void *owner)
{
    apr_thread_pool_task_t *t;
    apr_thread_pool_task_t *t_loc;
    apr_thread_t *thd;
    apr_status_t rv = APR_SUCCESS;

    apr_thread_mutex_lock(me->lock);

    t = task_new(me, func, param, priority, owner, 0);
    if (NULL == t) {
        apr_thread_mutex_unlock(me->lock);
        return APR_ENOMEM;
    }

    t_loc = add_if_empty(me, t);
    if (NULL == t_loc) {
        goto FINAL_EXIT;
    }

    if (push) {
        while (APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link) !=
               t_loc && t_loc->dispatch.priority >= t->dispatch.priority) {
            t_loc = APR_RING_NEXT(t_loc, link);
        }
    }
    APR_RING_INSERT_BEFORE(t_loc, t, link);
    if (!push) {
        if (t_loc == me->task_idx[TASK_PRIORITY_SEG(t)]) {
            me->task_idx[TASK_PRIORITY_SEG(t)] = t;
        }
    }

  FINAL_EXIT:
    me->task_cnt++;
    if (me->task_cnt > me->tasks_high)
        me->tasks_high = me->task_cnt;
    if (0 == me->thd_cnt || (0 == me->idle_cnt && me->thd_cnt < me->thd_max &&
                             me->task_cnt > me->threshold)) {
        rv = apr_thread_create(&thd, NULL, thread_pool_func, me, me->pool);
        if (APR_SUCCESS == rv) {
            ++me->thd_cnt;
            if (me->thd_cnt > me->thd_high)
                me->thd_high = me->thd_cnt;
        }
    }

    apr_thread_cond_signal(me->cond);
    apr_thread_mutex_unlock(me->lock);

    return rv;
}
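
/*
 * Worker creation heuristic used by add_task() above: a new thread is
 * spawned only when the pool has no threads at all, or when no thread is
 * idle, the pool is still below thd_max and the queue length has grown past
 * the threshold (initialized to init_threads / 2 and adjustable with
 * apr_thread_pool_threshold_set()). Otherwise the task simply waits in the
 * queue for the worker woken by the apr_thread_cond_signal() call.
 */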

APU_DECLARE(apr_status_t) apr_thread_pool_push(apr_thread_pool_t *me,
                                               apr_thread_start_t func,
                                               void *param,
                                               apr_byte_t priority,
                                               void *owner)
{
    return add_task(me, func, param, priority, 1, owner);
}

APU_DECLARE(apr_status_t) apr_thread_pool_schedule(apr_thread_pool_t *me,
                                                   apr_thread_start_t func,
                                                   void *param,
                                                   apr_interval_time_t time,
                                                   void *owner)
{
    return schedule_task(me, func, param, owner, time);
}

APU_DECLARE(apr_status_t) apr_thread_pool_top(apr_thread_pool_t *me,
                                              apr_thread_start_t func,
                                              void *param,
                                              apr_byte_t priority,
                                              void *owner)
{
    return add_task(me, func, param, priority, 0, owner);
}

static apr_status_t remove_scheduled_tasks(apr_thread_pool_t *me,
                                           void *owner)
{
    apr_thread_pool_task_t *t_loc;
    apr_thread_pool_task_t *next;

    t_loc = APR_RING_FIRST(me->scheduled_tasks);
    while (t_loc !=
           APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task,
                             link)) {
        next = APR_RING_NEXT(t_loc, link);
        /* if this is the owner remove it */
        if (t_loc->owner == owner) {
            --me->scheduled_task_cnt;
            APR_RING_REMOVE(t_loc, link);
        }
        t_loc = next;
    }
    return APR_SUCCESS;
}

static apr_status_t remove_tasks(apr_thread_pool_t *me, void *owner)
{
    apr_thread_pool_task_t *t_loc;
    apr_thread_pool_task_t *next;
    int seg;

    t_loc = APR_RING_FIRST(me->tasks);
    while (t_loc != APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link)) {
        next = APR_RING_NEXT(t_loc, link);
        if (t_loc->owner == owner) {
            --me->task_cnt;
            seg = TASK_PRIORITY_SEG(t_loc);
            if (t_loc == me->task_idx[seg]) {
                me->task_idx[seg] = APR_RING_NEXT(t_loc, link);
                if (me->task_idx[seg] == APR_RING_SENTINEL(me->tasks,
                                                           apr_thread_pool_task,
                                                           link)
                    || TASK_PRIORITY_SEG(me->task_idx[seg]) != seg) {
                    me->task_idx[seg] = NULL;
                }
            }
            APR_RING_REMOVE(t_loc, link);
        }
        t_loc = next;
    }
    return APR_SUCCESS;
}

static void wait_on_busy_threads(apr_thread_pool_t *me, void *owner)
{
#ifndef NDEBUG
    apr_os_thread_t *os_thread;
#endif
    struct apr_thread_list_elt *elt;
    apr_thread_mutex_lock(me->lock);
    elt = APR_RING_FIRST(me->busy_thds);
    while (elt != APR_RING_SENTINEL(me->busy_thds, apr_thread_list_elt, link)) {
        if (elt->current_owner != owner) {
            elt = APR_RING_NEXT(elt, link);
            continue;
        }
#ifndef NDEBUG
        /* make sure the thread is not the one calling tasks_cancel */
        apr_os_thread_get(&os_thread, elt->thd);
#ifdef WIN32
        /* hack for apr win32 bug */
        assert(!apr_os_thread_equal(apr_os_thread_current(), os_thread));
#else
        assert(!apr_os_thread_equal(apr_os_thread_current(), *os_thread));
#endif
#endif
        while (elt->current_owner == owner) {
            apr_thread_mutex_unlock(me->lock);
            apr_sleep(200 * 1000);
            apr_thread_mutex_lock(me->lock);
        }
        elt = APR_RING_FIRST(me->busy_thds);
    }
    apr_thread_mutex_unlock(me->lock);
    return;
}

APU_DECLARE(apr_status_t) apr_thread_pool_tasks_cancel(apr_thread_pool_t *me,
                                                       void *owner)
{
    apr_status_t rv = APR_SUCCESS;

    apr_thread_mutex_lock(me->lock);
    if (me->task_cnt > 0) {
        rv = remove_tasks(me, owner);
    }
    if (me->scheduled_task_cnt > 0) {
        rv = remove_scheduled_tasks(me, owner);
    }
    apr_thread_mutex_unlock(me->lock);
    wait_on_busy_threads(me, owner);

    return rv;
}
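
/*
 * Owner-based cancellation, as a minimal sketch (the names tp, conn and
 * my_task are hypothetical; error handling omitted). Every queued task
 * carries the opaque owner pointer passed to push/schedule/top, and
 * apr_thread_pool_tasks_cancel() drops all queued and scheduled tasks with
 * that owner, then blocks until no busy thread is still running one:
 *
 *     apr_thread_pool_push(tp, my_task, conn,
 *                          APR_THREAD_TASK_PRIORITY_NORMAL, conn);
 *     apr_thread_pool_schedule(tp, my_task, conn, apr_time_from_sec(5), conn);
 *     // ... when conn is being torn down:
 *     apr_thread_pool_tasks_cancel(tp, conn);
 */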

APU_DECLARE(apr_size_t) apr_thread_pool_tasks_count(apr_thread_pool_t *me)
{
    return me->task_cnt;
}

APU_DECLARE(apr_size_t)
    apr_thread_pool_scheduled_tasks_count(apr_thread_pool_t *me)
{
    return me->scheduled_task_cnt;
}

APU_DECLARE(apr_size_t) apr_thread_pool_threads_count(apr_thread_pool_t *me)
{
    return me->thd_cnt;
}

APU_DECLARE(apr_size_t) apr_thread_pool_busy_count(apr_thread_pool_t *me)
{
    return me->thd_cnt - me->idle_cnt;
}

APU_DECLARE(apr_size_t) apr_thread_pool_idle_count(apr_thread_pool_t *me)
{
    return me->idle_cnt;
}

APU_DECLARE(apr_size_t)
    apr_thread_pool_tasks_run_count(apr_thread_pool_t * me)
{
    return me->tasks_run;
}

APU_DECLARE(apr_size_t)
    apr_thread_pool_tasks_high_count(apr_thread_pool_t * me)
{
    return me->tasks_high;
}

APU_DECLARE(apr_size_t)
    apr_thread_pool_threads_high_count(apr_thread_pool_t * me)
{
    return me->thd_high;
}

APU_DECLARE(apr_size_t)
    apr_thread_pool_threads_idle_timeout_count(apr_thread_pool_t * me)
{
    return me->thd_timed_out;
}


APU_DECLARE(apr_size_t) apr_thread_pool_idle_max_get(apr_thread_pool_t *me)
{
    return me->idle_max;
}

APU_DECLARE(apr_interval_time_t)
    apr_thread_pool_idle_wait_get(apr_thread_pool_t * me)
{
    return me->idle_wait;
}

/*
 * Stop surplus threads, leaving at most *cnt of them running.
 * On return, *cnt holds the number of threads that were asked to stop.
 * @return the first element of the list of stopped threads, or NULL
 * NOTE: busy threads may become idle while this function runs
 */
static struct apr_thread_list_elt *trim_threads(apr_thread_pool_t *me,
                                                apr_size_t *cnt, int idle)
{
    struct apr_thread_list *thds;
    apr_size_t n, n_dbg, i;
    struct apr_thread_list_elt *head, *tail, *elt;

    apr_thread_mutex_lock(me->lock);
    if (idle) {
        thds = me->idle_thds;
        n = me->idle_cnt;
    }
    else {
        thds = me->busy_thds;
        n = me->thd_cnt - me->idle_cnt;
    }
    if (n <= *cnt) {
        apr_thread_mutex_unlock(me->lock);
        *cnt = 0;
        return NULL;
    }
    n -= *cnt;

    head = APR_RING_FIRST(thds);
    for (i = 0; i < *cnt; i++) {
        head = APR_RING_NEXT(head, link);
    }
    tail = APR_RING_LAST(thds);
    if (idle) {
        APR_RING_UNSPLICE(head, tail, link);
        me->idle_cnt = *cnt;
    }

    n_dbg = 0;
    for (elt = head; elt != tail; elt = APR_RING_NEXT(elt, link)) {
        elt->state = TH_STOP;
        n_dbg++;
    }
    elt->state = TH_STOP;
    n_dbg++;
    assert(n == n_dbg);
    *cnt = n;

    apr_thread_mutex_unlock(me->lock);

    APR_RING_PREV(head, link) = NULL;
    APR_RING_NEXT(tail, link) = NULL;
    return head;
}

static apr_size_t trim_idle_threads(apr_thread_pool_t *me, apr_size_t cnt)
{
    apr_size_t n_dbg;
    struct apr_thread_list_elt *elt, *head, *tail;
    apr_status_t rv;

    elt = trim_threads(me, &cnt, 1);

    apr_thread_mutex_lock(me->lock);
    apr_thread_cond_broadcast(me->cond);
    apr_thread_mutex_unlock(me->lock);

    n_dbg = 0;
    if (NULL != (head = elt)) {
        while (elt) {
            tail = elt;
            apr_thread_join(&rv, elt->thd);
            elt = APR_RING_NEXT(elt, link);
            ++n_dbg;
        }
        apr_thread_mutex_lock(me->lock);
        APR_RING_SPLICE_TAIL(me->recycled_thds, head, tail,
                             apr_thread_list_elt, link);
        apr_thread_mutex_unlock(me->lock);
    }
    assert(cnt == n_dbg);

    return cnt;
}

/*
 * Do not join busy threads, for performance reasons: there is no telling
 * how long the running task will take to complete.
 */
static apr_size_t trim_busy_threads(apr_thread_pool_t *me, apr_size_t cnt)
{
    trim_threads(me, &cnt, 0);
    return cnt;
}

APU_DECLARE(apr_size_t) apr_thread_pool_idle_max_set(apr_thread_pool_t *me,
                                                     apr_size_t cnt)
{
    me->idle_max = cnt;
    cnt = trim_idle_threads(me, cnt);
    return cnt;
}

APU_DECLARE(apr_interval_time_t)
    apr_thread_pool_idle_wait_set(apr_thread_pool_t * me,
                                  apr_interval_time_t timeout)
{
    apr_interval_time_t oldtime;

    oldtime = me->idle_wait;
    me->idle_wait = timeout;

    return oldtime;
}

APU_DECLARE(apr_size_t) apr_thread_pool_thread_max_get(apr_thread_pool_t *me)
{
    return me->thd_max;
}

/*
 * Stop surplus working threads, down to the new limit.
 * NOTE: busy threads may become idle while this function runs
 */
APU_DECLARE(apr_size_t) apr_thread_pool_thread_max_set(apr_thread_pool_t *me,
                                                       apr_size_t cnt)
{
    unsigned int n;

    me->thd_max = cnt;
    if (0 == cnt || me->thd_cnt <= cnt) {
        return 0;
    }

    n = me->thd_cnt - cnt;
    if (n >= me->idle_cnt) {
        trim_busy_threads(me, n - me->idle_cnt);
        trim_idle_threads(me, 0);
    }
    else {
        trim_idle_threads(me, me->idle_cnt - n);
    }
    return n;
}

APU_DECLARE(apr_size_t) apr_thread_pool_threshold_get(apr_thread_pool_t *me)
{
    return me->threshold;
}

APU_DECLARE(apr_size_t) apr_thread_pool_threshold_set(apr_thread_pool_t *me,
                                                      apr_size_t val)
{
    apr_size_t ov;

    ov = me->threshold;
    me->threshold = val;
    return ov;
}

APU_DECLARE(apr_status_t) apr_thread_pool_task_owner_get(apr_thread_t *thd,
                                                         void **owner)
{
    apr_status_t rv;
    apr_thread_pool_task_t *task;
    void *data;

    rv = apr_thread_data_get(&data, "apr_thread_pool_task", thd);
    if (rv != APR_SUCCESS) {
        return rv;
    }

    task = data;
    if (!task) {
        *owner = NULL;
        return APR_BADARG;
    }

    *owner = task->owner;
    return APR_SUCCESS;
}

#endif /* APR_HAS_THREADS */

/* vim: set ts=4 sw=4 et cin tw=80: */