1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed
4 * with this work for additional information regarding copyright
5 * ownership.  The ASF licenses this file to you under the Apache
6 * License, Version 2.0 (the "License"); you may not use this file
7 * except in compliance with the License.  You may obtain a copy of
8 * the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
15 * implied.  See the License for the specific language governing
16 * permissions and limitations under the License.
17 */
18
19#include <assert.h>
20#include "apr_thread_pool.h"
21#include "apr_ring.h"
22#include "apr_thread_cond.h"
23#include "apr_portable.h"
24
25#if APR_HAS_THREADS
26
27#define TASK_PRIORITY_SEGS 4
28#define TASK_PRIORITY_SEG(x) (((x)->dispatch.priority & 0xFF) / 64)
29
30typedef struct apr_thread_pool_task
31{
32    APR_RING_ENTRY(apr_thread_pool_task) link;
33    apr_thread_start_t func;
34    void *param;
35    void *owner;
36    union
37    {
38        apr_byte_t priority;
39        apr_time_t time;
40    } dispatch;
41} apr_thread_pool_task_t;
42
43APR_RING_HEAD(apr_thread_pool_tasks, apr_thread_pool_task);
44
45struct apr_thread_list_elt
46{
47    APR_RING_ENTRY(apr_thread_list_elt) link;
48    apr_thread_t *thd;
49    volatile void *current_owner;
50    volatile enum { TH_RUN, TH_STOP, TH_PROBATION } state;
51};
52
53APR_RING_HEAD(apr_thread_list, apr_thread_list_elt);
54
55struct apr_thread_pool
56{
57    apr_pool_t *pool;
58    volatile apr_size_t thd_max;
59    volatile apr_size_t idle_max;
60    volatile apr_interval_time_t idle_wait;
61    volatile apr_size_t thd_cnt;
62    volatile apr_size_t idle_cnt;
63    volatile apr_size_t task_cnt;
64    volatile apr_size_t scheduled_task_cnt;
65    volatile apr_size_t threshold;
66    volatile apr_size_t tasks_run;
67    volatile apr_size_t tasks_high;
68    volatile apr_size_t thd_high;
69    volatile apr_size_t thd_timed_out;
70    struct apr_thread_pool_tasks *tasks;
71    struct apr_thread_pool_tasks *scheduled_tasks;
72    struct apr_thread_list *busy_thds;
73    struct apr_thread_list *idle_thds;
74    apr_thread_mutex_t *lock;
75    apr_thread_cond_t *cond;
76    volatile int terminated;
77    struct apr_thread_pool_tasks *recycled_tasks;
78    struct apr_thread_list *recycled_thds;
79    apr_thread_pool_task_t *task_idx[TASK_PRIORITY_SEGS];
80};
81
82static apr_status_t thread_pool_construct(apr_thread_pool_t * me,
83                                          apr_size_t init_threads,
84                                          apr_size_t max_threads)
85{
86    apr_status_t rv;
87    int i;
88
89    me->thd_max = max_threads;
90    me->idle_max = init_threads;
91    me->threshold = init_threads / 2;
92    rv = apr_thread_mutex_create(&me->lock, APR_THREAD_MUTEX_NESTED,
93                                 me->pool);
94    if (APR_SUCCESS != rv) {
95        return rv;
96    }
97    rv = apr_thread_cond_create(&me->cond, me->pool);
98    if (APR_SUCCESS != rv) {
99        apr_thread_mutex_destroy(me->lock);
100        return rv;
101    }
102    me->tasks = apr_palloc(me->pool, sizeof(*me->tasks));
103    if (!me->tasks) {
104        goto CATCH_ENOMEM;
105    }
106    APR_RING_INIT(me->tasks, apr_thread_pool_task, link);
107    me->scheduled_tasks = apr_palloc(me->pool, sizeof(*me->scheduled_tasks));
108    if (!me->scheduled_tasks) {
109        goto CATCH_ENOMEM;
110    }
111    APR_RING_INIT(me->scheduled_tasks, apr_thread_pool_task, link);
112    me->recycled_tasks = apr_palloc(me->pool, sizeof(*me->recycled_tasks));
113    if (!me->recycled_tasks) {
114        goto CATCH_ENOMEM;
115    }
116    APR_RING_INIT(me->recycled_tasks, apr_thread_pool_task, link);
117    me->busy_thds = apr_palloc(me->pool, sizeof(*me->busy_thds));
118    if (!me->busy_thds) {
119        goto CATCH_ENOMEM;
120    }
121    APR_RING_INIT(me->busy_thds, apr_thread_list_elt, link);
122    me->idle_thds = apr_palloc(me->pool, sizeof(*me->idle_thds));
123    if (!me->idle_thds) {
124        goto CATCH_ENOMEM;
125    }
126    APR_RING_INIT(me->idle_thds, apr_thread_list_elt, link);
127    me->recycled_thds = apr_palloc(me->pool, sizeof(*me->recycled_thds));
128    if (!me->recycled_thds) {
129        goto CATCH_ENOMEM;
130    }
131    APR_RING_INIT(me->recycled_thds, apr_thread_list_elt, link);
132    me->thd_cnt = me->idle_cnt = me->task_cnt = me->scheduled_task_cnt = 0;
133    me->tasks_run = me->tasks_high = me->thd_high = me->thd_timed_out = 0;
134    me->idle_wait = 0;
135    me->terminated = 0;
136    for (i = 0; i < TASK_PRIORITY_SEGS; i++) {
137        me->task_idx[i] = NULL;
138    }
139    goto FINAL_EXIT;
140  CATCH_ENOMEM:
141    rv = APR_ENOMEM;
142    apr_thread_mutex_destroy(me->lock);
143    apr_thread_cond_destroy(me->cond);
144  FINAL_EXIT:
145    return rv;
146}
147
148/*
149 * NOTE: This function is not thread safe by itself. Caller should hold the lock
150 */
151static apr_thread_pool_task_t *pop_task(apr_thread_pool_t * me)
152{
153    apr_thread_pool_task_t *task = NULL;
154    int seg;
155
156    /* check for scheduled tasks */
157    if (me->scheduled_task_cnt > 0) {
158        task = APR_RING_FIRST(me->scheduled_tasks);
159        assert(task != NULL);
160        assert(task !=
161               APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task,
162                                 link));
163        /* if it's time */
164        if (task->dispatch.time <= apr_time_now()) {
165            --me->scheduled_task_cnt;
166            APR_RING_REMOVE(task, link);
167            return task;
168        }
169    }
170    /* check for normal tasks if we're not returning a scheduled task */
171    if (me->task_cnt == 0) {
172        return NULL;
173    }
174
175    task = APR_RING_FIRST(me->tasks);
176    assert(task != NULL);
177    assert(task != APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link));
178    --me->task_cnt;
179    seg = TASK_PRIORITY_SEG(task);
180    if (task == me->task_idx[seg]) {
181        me->task_idx[seg] = APR_RING_NEXT(task, link);
182        if (me->task_idx[seg] == APR_RING_SENTINEL(me->tasks,
183                                                   apr_thread_pool_task, link)
184            || TASK_PRIORITY_SEG(me->task_idx[seg]) != seg) {
185            me->task_idx[seg] = NULL;
186        }
187    }
188    APR_RING_REMOVE(task, link);
189    return task;
190}
191
192static apr_interval_time_t waiting_time(apr_thread_pool_t * me)
193{
194    apr_thread_pool_task_t *task = NULL;
195
196    task = APR_RING_FIRST(me->scheduled_tasks);
197    assert(task != NULL);
198    assert(task !=
199           APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task,
200                             link));
201    return task->dispatch.time - apr_time_now();
202}
203
204/*
205 * NOTE: This function is not thread safe by itself. Caller should hold the lock
206 */
207static struct apr_thread_list_elt *elt_new(apr_thread_pool_t * me,
208                                           apr_thread_t * t)
209{
210    struct apr_thread_list_elt *elt;
211
212    if (APR_RING_EMPTY(me->recycled_thds, apr_thread_list_elt, link)) {
213        elt = apr_pcalloc(me->pool, sizeof(*elt));
214        if (NULL == elt) {
215            return NULL;
216        }
217    }
218    else {
219        elt = APR_RING_FIRST(me->recycled_thds);
220        APR_RING_REMOVE(elt, link);
221    }
222
223    APR_RING_ELEM_INIT(elt, link);
224    elt->thd = t;
225    elt->current_owner = NULL;
226    elt->state = TH_RUN;
227    return elt;
228}
229
230/*
231 * The worker thread function. Take a task from the queue and perform it if
232 * there is any. Otherwise, put itself into the idle thread list and waiting
233 * for signal to wake up.
234 * The thread terminate directly by detach and exit when it is asked to stop
235 * after finishing a task. Otherwise, the thread should be in idle thread list
236 * and should be joined.
237 */
238static void *APR_THREAD_FUNC thread_pool_func(apr_thread_t * t, void *param)
239{
240    apr_status_t rv = APR_SUCCESS;
241    apr_thread_pool_t *me = param;
242    apr_thread_pool_task_t *task = NULL;
243    apr_interval_time_t wait;
244    struct apr_thread_list_elt *elt;
245
246    apr_thread_mutex_lock(me->lock);
247    elt = elt_new(me, t);
248    if (!elt) {
249        apr_thread_mutex_unlock(me->lock);
250        apr_thread_exit(t, APR_ENOMEM);
251    }
252
253    while (!me->terminated && elt->state != TH_STOP) {
254        /* Test if not new element, it is awakened from idle */
255        if (APR_RING_NEXT(elt, link) != elt) {
256            --me->idle_cnt;
257            APR_RING_REMOVE(elt, link);
258        }
259
260        APR_RING_INSERT_TAIL(me->busy_thds, elt, apr_thread_list_elt, link);
261        task = pop_task(me);
262        while (NULL != task && !me->terminated) {
263            ++me->tasks_run;
264            elt->current_owner = task->owner;
265            apr_thread_mutex_unlock(me->lock);
266            apr_thread_data_set(task, "apr_thread_pool_task", NULL, t);
267            task->func(t, task->param);
268            apr_thread_mutex_lock(me->lock);
269            APR_RING_INSERT_TAIL(me->recycled_tasks, task,
270                                 apr_thread_pool_task, link);
271            elt->current_owner = NULL;
272            if (TH_STOP == elt->state) {
273                break;
274            }
275            task = pop_task(me);
276        }
277        assert(NULL == elt->current_owner);
278        if (TH_STOP != elt->state)
279            APR_RING_REMOVE(elt, link);
280
281        /* Test if a busy thread been asked to stop, which is not joinable */
282        if ((me->idle_cnt >= me->idle_max
283             && !(me->scheduled_task_cnt && 0 >= me->idle_max)
284             && !me->idle_wait)
285            || me->terminated || elt->state != TH_RUN) {
286            --me->thd_cnt;
287            if ((TH_PROBATION == elt->state) && me->idle_wait)
288                ++me->thd_timed_out;
289            APR_RING_INSERT_TAIL(me->recycled_thds, elt,
290                                 apr_thread_list_elt, link);
291            apr_thread_mutex_unlock(me->lock);
292            apr_thread_detach(t);
293            apr_thread_exit(t, APR_SUCCESS);
294            return NULL;        /* should not be here, safe net */
295        }
296
297        /* busy thread become idle */
298        ++me->idle_cnt;
299        APR_RING_INSERT_TAIL(me->idle_thds, elt, apr_thread_list_elt, link);
300
301        /*
302         * If there is a scheduled task, always scheduled to perform that task.
303         * Since there is no guarantee that current idle threads are scheduled
304         * for next scheduled task.
305         */
306        if (me->scheduled_task_cnt)
307            wait = waiting_time(me);
308        else if (me->idle_cnt > me->idle_max) {
309            wait = me->idle_wait;
310            elt->state = TH_PROBATION;
311        }
312        else
313            wait = -1;
314
315        if (wait >= 0) {
316            rv = apr_thread_cond_timedwait(me->cond, me->lock, wait);
317        }
318        else {
319            rv = apr_thread_cond_wait(me->cond, me->lock);
320        }
321    }
322
323    /* idle thread been asked to stop, will be joined */
324    --me->thd_cnt;
325    apr_thread_mutex_unlock(me->lock);
326    apr_thread_exit(t, APR_SUCCESS);
327    return NULL;                /* should not be here, safe net */
328}
329
330static apr_status_t thread_pool_cleanup(void *me)
331{
332    apr_thread_pool_t *_myself = me;
333
334    _myself->terminated = 1;
335    apr_thread_pool_idle_max_set(_myself, 0);
336    while (_myself->thd_cnt) {
337        apr_sleep(20 * 1000);   /* spin lock with 20 ms */
338    }
339    apr_thread_mutex_destroy(_myself->lock);
340    apr_thread_cond_destroy(_myself->cond);
341    return APR_SUCCESS;
342}
343
344APU_DECLARE(apr_status_t) apr_thread_pool_create(apr_thread_pool_t ** me,
345                                                 apr_size_t init_threads,
346                                                 apr_size_t max_threads,
347                                                 apr_pool_t * pool)
348{
349    apr_thread_t *t;
350    apr_status_t rv = APR_SUCCESS;
351    apr_thread_pool_t *tp;
352
353    *me = NULL;
354    tp = apr_pcalloc(pool, sizeof(apr_thread_pool_t));
355
356    tp->pool = pool;
357
358    rv = thread_pool_construct(tp, init_threads, max_threads);
359    if (APR_SUCCESS != rv) {
360        return rv;
361    }
362    apr_pool_cleanup_register(pool, tp, thread_pool_cleanup,
363                              apr_pool_cleanup_null);
364
365    while (init_threads) {
366        /* Grab the mutex as apr_thread_create() and thread_pool_func() will
367         * allocate from (*me)->pool. This is dangerous if there are multiple
368         * initial threads to create.
369         */
370        apr_thread_mutex_lock(tp->lock);
371        rv = apr_thread_create(&t, NULL, thread_pool_func, tp, tp->pool);
372        apr_thread_mutex_unlock(tp->lock);
373        if (APR_SUCCESS != rv) {
374            break;
375        }
376        tp->thd_cnt++;
377        if (tp->thd_cnt > tp->thd_high) {
378            tp->thd_high = tp->thd_cnt;
379        }
380        --init_threads;
381    }
382
383    if (rv == APR_SUCCESS) {
384        *me = tp;
385    }
386
387    return rv;
388}
389
390APU_DECLARE(apr_status_t) apr_thread_pool_destroy(apr_thread_pool_t * me)
391{
392    return apr_pool_cleanup_run(me->pool, me, thread_pool_cleanup);
393}
394
395/*
396 * NOTE: This function is not thread safe by itself. Caller should hold the lock
397 */
398static apr_thread_pool_task_t *task_new(apr_thread_pool_t * me,
399                                        apr_thread_start_t func,
400                                        void *param, apr_byte_t priority,
401                                        void *owner, apr_time_t time)
402{
403    apr_thread_pool_task_t *t;
404
405    if (APR_RING_EMPTY(me->recycled_tasks, apr_thread_pool_task, link)) {
406        t = apr_pcalloc(me->pool, sizeof(*t));
407        if (NULL == t) {
408            return NULL;
409        }
410    }
411    else {
412        t = APR_RING_FIRST(me->recycled_tasks);
413        APR_RING_REMOVE(t, link);
414    }
415
416    APR_RING_ELEM_INIT(t, link);
417    t->func = func;
418    t->param = param;
419    t->owner = owner;
420    if (time > 0) {
421        t->dispatch.time = apr_time_now() + time;
422    }
423    else {
424        t->dispatch.priority = priority;
425    }
426    return t;
427}
428
429/*
430 * Test it the task is the only one within the priority segment.
431 * If it is not, return the first element with same or lower priority.
432 * Otherwise, add the task into the queue and return NULL.
433 *
434 * NOTE: This function is not thread safe by itself. Caller should hold the lock
435 */
436static apr_thread_pool_task_t *add_if_empty(apr_thread_pool_t * me,
437                                            apr_thread_pool_task_t * const t)
438{
439    int seg;
440    int next;
441    apr_thread_pool_task_t *t_next;
442
443    seg = TASK_PRIORITY_SEG(t);
444    if (me->task_idx[seg]) {
445        assert(APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link) !=
446               me->task_idx[seg]);
447        t_next = me->task_idx[seg];
448        while (t_next->dispatch.priority > t->dispatch.priority) {
449            t_next = APR_RING_NEXT(t_next, link);
450            if (APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link) ==
451                t_next) {
452                return t_next;
453            }
454        }
455        return t_next;
456    }
457
458    for (next = seg - 1; next >= 0; next--) {
459        if (me->task_idx[next]) {
460            APR_RING_INSERT_BEFORE(me->task_idx[next], t, link);
461            break;
462        }
463    }
464    if (0 > next) {
465        APR_RING_INSERT_TAIL(me->tasks, t, apr_thread_pool_task, link);
466    }
467    me->task_idx[seg] = t;
468    return NULL;
469}
470
471/*
472*   schedule a task to run in "time" microseconds. Find the spot in the ring where
473*   the time fits. Adjust the short_time so the thread wakes up when the time is reached.
474*/
475static apr_status_t schedule_task(apr_thread_pool_t *me,
476                                  apr_thread_start_t func, void *param,
477                                  void *owner, apr_interval_time_t time)
478{
479    apr_thread_pool_task_t *t;
480    apr_thread_pool_task_t *t_loc;
481    apr_thread_t *thd;
482    apr_status_t rv = APR_SUCCESS;
483    apr_thread_mutex_lock(me->lock);
484
485    t = task_new(me, func, param, 0, owner, time);
486    if (NULL == t) {
487        apr_thread_mutex_unlock(me->lock);
488        return APR_ENOMEM;
489    }
490    t_loc = APR_RING_FIRST(me->scheduled_tasks);
491    while (NULL != t_loc) {
492        /* if the time is less than the entry insert ahead of it */
493        if (t->dispatch.time < t_loc->dispatch.time) {
494            ++me->scheduled_task_cnt;
495            APR_RING_INSERT_BEFORE(t_loc, t, link);
496            break;
497        }
498        else {
499            t_loc = APR_RING_NEXT(t_loc, link);
500            if (t_loc ==
501                APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task,
502                                  link)) {
503                ++me->scheduled_task_cnt;
504                APR_RING_INSERT_TAIL(me->scheduled_tasks, t,
505                                     apr_thread_pool_task, link);
506                break;
507            }
508        }
509    }
510    /* there should be at least one thread for scheduled tasks */
511    if (0 == me->thd_cnt) {
512        rv = apr_thread_create(&thd, NULL, thread_pool_func, me, me->pool);
513        if (APR_SUCCESS == rv) {
514            ++me->thd_cnt;
515            if (me->thd_cnt > me->thd_high)
516                me->thd_high = me->thd_cnt;
517        }
518    }
519    apr_thread_cond_signal(me->cond);
520    apr_thread_mutex_unlock(me->lock);
521    return rv;
522}
523
524static apr_status_t add_task(apr_thread_pool_t *me, apr_thread_start_t func,
525                             void *param, apr_byte_t priority, int push,
526                             void *owner)
527{
528    apr_thread_pool_task_t *t;
529    apr_thread_pool_task_t *t_loc;
530    apr_thread_t *thd;
531    apr_status_t rv = APR_SUCCESS;
532
533    apr_thread_mutex_lock(me->lock);
534
535    t = task_new(me, func, param, priority, owner, 0);
536    if (NULL == t) {
537        apr_thread_mutex_unlock(me->lock);
538        return APR_ENOMEM;
539    }
540
541    t_loc = add_if_empty(me, t);
542    if (NULL == t_loc) {
543        goto FINAL_EXIT;
544    }
545
546    if (push) {
547        while (APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link) !=
548               t_loc && t_loc->dispatch.priority >= t->dispatch.priority) {
549            t_loc = APR_RING_NEXT(t_loc, link);
550        }
551    }
552    APR_RING_INSERT_BEFORE(t_loc, t, link);
553    if (!push) {
554        if (t_loc == me->task_idx[TASK_PRIORITY_SEG(t)]) {
555            me->task_idx[TASK_PRIORITY_SEG(t)] = t;
556        }
557    }
558
559  FINAL_EXIT:
560    me->task_cnt++;
561    if (me->task_cnt > me->tasks_high)
562        me->tasks_high = me->task_cnt;
563    if (0 == me->thd_cnt || (0 == me->idle_cnt && me->thd_cnt < me->thd_max &&
564                             me->task_cnt > me->threshold)) {
565        rv = apr_thread_create(&thd, NULL, thread_pool_func, me, me->pool);
566        if (APR_SUCCESS == rv) {
567            ++me->thd_cnt;
568            if (me->thd_cnt > me->thd_high)
569                me->thd_high = me->thd_cnt;
570        }
571    }
572
573    apr_thread_cond_signal(me->cond);
574    apr_thread_mutex_unlock(me->lock);
575
576    return rv;
577}
578
579APU_DECLARE(apr_status_t) apr_thread_pool_push(apr_thread_pool_t *me,
580                                               apr_thread_start_t func,
581                                               void *param,
582                                               apr_byte_t priority,
583                                               void *owner)
584{
585    return add_task(me, func, param, priority, 1, owner);
586}
587
588APU_DECLARE(apr_status_t) apr_thread_pool_schedule(apr_thread_pool_t *me,
589                                                   apr_thread_start_t func,
590                                                   void *param,
591                                                   apr_interval_time_t time,
592                                                   void *owner)
593{
594    return schedule_task(me, func, param, owner, time);
595}
596
597APU_DECLARE(apr_status_t) apr_thread_pool_top(apr_thread_pool_t *me,
598                                              apr_thread_start_t func,
599                                              void *param,
600                                              apr_byte_t priority,
601                                              void *owner)
602{
603    return add_task(me, func, param, priority, 0, owner);
604}
605
606static apr_status_t remove_scheduled_tasks(apr_thread_pool_t *me,
607                                           void *owner)
608{
609    apr_thread_pool_task_t *t_loc;
610    apr_thread_pool_task_t *next;
611
612    t_loc = APR_RING_FIRST(me->scheduled_tasks);
613    while (t_loc !=
614           APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task,
615                             link)) {
616        next = APR_RING_NEXT(t_loc, link);
617        /* if this is the owner remove it */
618        if (t_loc->owner == owner) {
619            --me->scheduled_task_cnt;
620            APR_RING_REMOVE(t_loc, link);
621        }
622        t_loc = next;
623    }
624    return APR_SUCCESS;
625}
626
627static apr_status_t remove_tasks(apr_thread_pool_t *me, void *owner)
628{
629    apr_thread_pool_task_t *t_loc;
630    apr_thread_pool_task_t *next;
631    int seg;
632
633    t_loc = APR_RING_FIRST(me->tasks);
634    while (t_loc != APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link)) {
635        next = APR_RING_NEXT(t_loc, link);
636        if (t_loc->owner == owner) {
637            --me->task_cnt;
638            seg = TASK_PRIORITY_SEG(t_loc);
639            if (t_loc == me->task_idx[seg]) {
640                me->task_idx[seg] = APR_RING_NEXT(t_loc, link);
641                if (me->task_idx[seg] == APR_RING_SENTINEL(me->tasks,
642                                                           apr_thread_pool_task,
643                                                           link)
644                    || TASK_PRIORITY_SEG(me->task_idx[seg]) != seg) {
645                    me->task_idx[seg] = NULL;
646                }
647            }
648            APR_RING_REMOVE(t_loc, link);
649        }
650        t_loc = next;
651    }
652    return APR_SUCCESS;
653}
654
655static void wait_on_busy_threads(apr_thread_pool_t *me, void *owner)
656{
657#ifndef NDEBUG
658    apr_os_thread_t *os_thread;
659#endif
660    struct apr_thread_list_elt *elt;
661    apr_thread_mutex_lock(me->lock);
662    elt = APR_RING_FIRST(me->busy_thds);
663    while (elt != APR_RING_SENTINEL(me->busy_thds, apr_thread_list_elt, link)) {
664        if (elt->current_owner != owner) {
665            elt = APR_RING_NEXT(elt, link);
666            continue;
667        }
668#ifndef NDEBUG
669        /* make sure the thread is not the one calling tasks_cancel */
670        apr_os_thread_get(&os_thread, elt->thd);
671#ifdef WIN32
672        /* hack for apr win32 bug */
673        assert(!apr_os_thread_equal(apr_os_thread_current(), os_thread));
674#else
675        assert(!apr_os_thread_equal(apr_os_thread_current(), *os_thread));
676#endif
677#endif
678        while (elt->current_owner == owner) {
679            apr_thread_mutex_unlock(me->lock);
680            apr_sleep(200 * 1000);
681            apr_thread_mutex_lock(me->lock);
682        }
683        elt = APR_RING_FIRST(me->busy_thds);
684    }
685    apr_thread_mutex_unlock(me->lock);
686    return;
687}
688
689APU_DECLARE(apr_status_t) apr_thread_pool_tasks_cancel(apr_thread_pool_t *me,
690                                                       void *owner)
691{
692    apr_status_t rv = APR_SUCCESS;
693
694    apr_thread_mutex_lock(me->lock);
695    if (me->task_cnt > 0) {
696        rv = remove_tasks(me, owner);
697    }
698    if (me->scheduled_task_cnt > 0) {
699        rv = remove_scheduled_tasks(me, owner);
700    }
701    apr_thread_mutex_unlock(me->lock);
702    wait_on_busy_threads(me, owner);
703
704    return rv;
705}
706
707APU_DECLARE(apr_size_t) apr_thread_pool_tasks_count(apr_thread_pool_t *me)
708{
709    return me->task_cnt;
710}
711
712APU_DECLARE(apr_size_t)
713    apr_thread_pool_scheduled_tasks_count(apr_thread_pool_t *me)
714{
715    return me->scheduled_task_cnt;
716}
717
718APU_DECLARE(apr_size_t) apr_thread_pool_threads_count(apr_thread_pool_t *me)
719{
720    return me->thd_cnt;
721}
722
723APU_DECLARE(apr_size_t) apr_thread_pool_busy_count(apr_thread_pool_t *me)
724{
725    return me->thd_cnt - me->idle_cnt;
726}
727
728APU_DECLARE(apr_size_t) apr_thread_pool_idle_count(apr_thread_pool_t *me)
729{
730    return me->idle_cnt;
731}
732
733APU_DECLARE(apr_size_t)
734    apr_thread_pool_tasks_run_count(apr_thread_pool_t * me)
735{
736    return me->tasks_run;
737}
738
739APU_DECLARE(apr_size_t)
740    apr_thread_pool_tasks_high_count(apr_thread_pool_t * me)
741{
742    return me->tasks_high;
743}
744
745APU_DECLARE(apr_size_t)
746    apr_thread_pool_threads_high_count(apr_thread_pool_t * me)
747{
748    return me->thd_high;
749}
750
751APU_DECLARE(apr_size_t)
752    apr_thread_pool_threads_idle_timeout_count(apr_thread_pool_t * me)
753{
754    return me->thd_timed_out;
755}
756
757
758APU_DECLARE(apr_size_t) apr_thread_pool_idle_max_get(apr_thread_pool_t *me)
759{
760    return me->idle_max;
761}
762
763APU_DECLARE(apr_interval_time_t)
764    apr_thread_pool_idle_wait_get(apr_thread_pool_t * me)
765{
766    return me->idle_wait;
767}
768
769/*
770 * This function stop extra idle threads to the cnt.
771 * @return the number of threads stopped
772 * NOTE: There could be busy threads become idle during this function
773 */
774static struct apr_thread_list_elt *trim_threads(apr_thread_pool_t *me,
775                                                apr_size_t *cnt, int idle)
776{
777    struct apr_thread_list *thds;
778    apr_size_t n, n_dbg, i;
779    struct apr_thread_list_elt *head, *tail, *elt;
780
781    apr_thread_mutex_lock(me->lock);
782    if (idle) {
783        thds = me->idle_thds;
784        n = me->idle_cnt;
785    }
786    else {
787        thds = me->busy_thds;
788        n = me->thd_cnt - me->idle_cnt;
789    }
790    if (n <= *cnt) {
791        apr_thread_mutex_unlock(me->lock);
792        *cnt = 0;
793        return NULL;
794    }
795    n -= *cnt;
796
797    head = APR_RING_FIRST(thds);
798    for (i = 0; i < *cnt; i++) {
799        head = APR_RING_NEXT(head, link);
800    }
801    tail = APR_RING_LAST(thds);
802    if (idle) {
803        APR_RING_UNSPLICE(head, tail, link);
804        me->idle_cnt = *cnt;
805    }
806
807    n_dbg = 0;
808    for (elt = head; elt != tail; elt = APR_RING_NEXT(elt, link)) {
809        elt->state = TH_STOP;
810        n_dbg++;
811    }
812    elt->state = TH_STOP;
813    n_dbg++;
814    assert(n == n_dbg);
815    *cnt = n;
816
817    apr_thread_mutex_unlock(me->lock);
818
819    APR_RING_PREV(head, link) = NULL;
820    APR_RING_NEXT(tail, link) = NULL;
821    return head;
822}
823
824static apr_size_t trim_idle_threads(apr_thread_pool_t *me, apr_size_t cnt)
825{
826    apr_size_t n_dbg;
827    struct apr_thread_list_elt *elt, *head, *tail;
828    apr_status_t rv;
829
830    elt = trim_threads(me, &cnt, 1);
831
832    apr_thread_mutex_lock(me->lock);
833    apr_thread_cond_broadcast(me->cond);
834    apr_thread_mutex_unlock(me->lock);
835
836    n_dbg = 0;
837    if (NULL != (head = elt)) {
838        while (elt) {
839            tail = elt;
840            apr_thread_join(&rv, elt->thd);
841            elt = APR_RING_NEXT(elt, link);
842            ++n_dbg;
843        }
844        apr_thread_mutex_lock(me->lock);
845        APR_RING_SPLICE_TAIL(me->recycled_thds, head, tail,
846                             apr_thread_list_elt, link);
847        apr_thread_mutex_unlock(me->lock);
848    }
849    assert(cnt == n_dbg);
850
851    return cnt;
852}
853
854/* don't join on busy threads for performance reasons, who knows how long will
855 * the task takes to perform
856 */
857static apr_size_t trim_busy_threads(apr_thread_pool_t *me, apr_size_t cnt)
858{
859    trim_threads(me, &cnt, 0);
860    return cnt;
861}
862
863APU_DECLARE(apr_size_t) apr_thread_pool_idle_max_set(apr_thread_pool_t *me,
864                                                     apr_size_t cnt)
865{
866    me->idle_max = cnt;
867    cnt = trim_idle_threads(me, cnt);
868    return cnt;
869}
870
871APU_DECLARE(apr_interval_time_t)
872    apr_thread_pool_idle_wait_set(apr_thread_pool_t * me,
873                                  apr_interval_time_t timeout)
874{
875    apr_interval_time_t oldtime;
876
877    oldtime = me->idle_wait;
878    me->idle_wait = timeout;
879
880    return oldtime;
881}
882
883APU_DECLARE(apr_size_t) apr_thread_pool_thread_max_get(apr_thread_pool_t *me)
884{
885    return me->thd_max;
886}
887
888/*
889 * This function stop extra working threads to the new limit.
890 * NOTE: There could be busy threads become idle during this function
891 */
892APU_DECLARE(apr_size_t) apr_thread_pool_thread_max_set(apr_thread_pool_t *me,
893                                                       apr_size_t cnt)
894{
895    unsigned int n;
896
897    me->thd_max = cnt;
898    if (0 == cnt || me->thd_cnt <= cnt) {
899        return 0;
900    }
901
902    n = me->thd_cnt - cnt;
903    if (n >= me->idle_cnt) {
904        trim_busy_threads(me, n - me->idle_cnt);
905        trim_idle_threads(me, 0);
906    }
907    else {
908        trim_idle_threads(me, me->idle_cnt - n);
909    }
910    return n;
911}
912
913APU_DECLARE(apr_size_t) apr_thread_pool_threshold_get(apr_thread_pool_t *me)
914{
915    return me->threshold;
916}
917
918APU_DECLARE(apr_size_t) apr_thread_pool_threshold_set(apr_thread_pool_t *me,
919                                                      apr_size_t val)
920{
921    apr_size_t ov;
922
923    ov = me->threshold;
924    me->threshold = val;
925    return ov;
926}
927
928APU_DECLARE(apr_status_t) apr_thread_pool_task_owner_get(apr_thread_t *thd,
929                                                         void **owner)
930{
931    apr_status_t rv;
932    apr_thread_pool_task_t *task;
933    void *data;
934
935    rv = apr_thread_data_get(&data, "apr_thread_pool_task", thd);
936    if (rv != APR_SUCCESS) {
937        return rv;
938    }
939
940    task = data;
941    if (!task) {
942        *owner = NULL;
943        return APR_BADARG;
944    }
945
946    *owner = task->owner;
947    return APR_SUCCESS;
948}
949
950#endif /* APR_HAS_THREADS */
951
952/* vim: set ts=4 sw=4 et cin tw=80: */
953