/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed
 * with this work for additional information regarding copyright
 * ownership.  The ASF licenses this file to you under the Apache
 * License, Version 2.0 (the "License"); you may not use this file
 * except in compliance with the License.  You may obtain a copy of
 * the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied.  See the License for the specific language governing
 * permissions and limitations under the License.
 */

#include <assert.h>
#include "apr_thread_pool.h"
#include "apr_ring.h"
#include "apr_thread_cond.h"
#include "apr_portable.h"

#if APR_HAS_THREADS

#define TASK_PRIORITY_SEGS 4
#define TASK_PRIORITY_SEG(x) (((x)->dispatch.priority & 0xFF) / 64)
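
/*
 * Priorities are bytes (0-255), so dividing by 64 buckets them into
 * TASK_PRIORITY_SEGS = 4 segments: for example, priority 30 falls in
 * segment 0 and priority 200 in segment 3.  The task_idx[] array below
 * caches the first queued task of each segment so that insertion need
 * not scan the whole priority-ordered queue.
 */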

typedef struct apr_thread_pool_task
{
    APR_RING_ENTRY(apr_thread_pool_task) link;
    apr_thread_start_t func;
    void *param;
    void *owner;
    union
    {
        apr_byte_t priority;
        apr_time_t time;
    } dispatch;
} apr_thread_pool_task_t;
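
/*
 * Note: "dispatch" can be a union because a task lives on either the
 * priority-ordered queue (dispatch.priority) or the time-ordered
 * scheduled queue (dispatch.time), never both at once.
 */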

APR_RING_HEAD(apr_thread_pool_tasks, apr_thread_pool_task);

struct apr_thread_list_elt
{
    APR_RING_ENTRY(apr_thread_list_elt) link;
    apr_thread_t *thd;
    volatile void *current_owner;
    volatile enum { TH_RUN, TH_STOP, TH_PROBATION } state;
};

APR_RING_HEAD(apr_thread_list, apr_thread_list_elt);

struct apr_thread_pool
{
    apr_pool_t *pool;
    volatile apr_size_t thd_max;
    volatile apr_size_t idle_max;
    volatile apr_interval_time_t idle_wait;
    volatile apr_size_t thd_cnt;
    volatile apr_size_t idle_cnt;
    volatile apr_size_t task_cnt;
    volatile apr_size_t scheduled_task_cnt;
    volatile apr_size_t threshold;
    volatile apr_size_t tasks_run;
    volatile apr_size_t tasks_high;
    volatile apr_size_t thd_high;
    volatile apr_size_t thd_timed_out;
    struct apr_thread_pool_tasks *tasks;
    struct apr_thread_pool_tasks *scheduled_tasks;
    struct apr_thread_list *busy_thds;
    struct apr_thread_list *idle_thds;
    apr_thread_mutex_t *lock;
    apr_thread_cond_t *cond;
    volatile int terminated;
    struct apr_thread_pool_tasks *recycled_tasks;
    struct apr_thread_list *recycled_thds;
    apr_thread_pool_task_t *task_idx[TASK_PRIORITY_SEGS];
};

static apr_status_t thread_pool_construct(apr_thread_pool_t * me,
                                          apr_size_t init_threads,
                                          apr_size_t max_threads)
{
    apr_status_t rv;
    int i;

    me->thd_max = max_threads;
    me->idle_max = init_threads;
    me->threshold = init_threads / 2;
    rv = apr_thread_mutex_create(&me->lock, APR_THREAD_MUTEX_NESTED,
                                 me->pool);
    if (APR_SUCCESS != rv) {
        return rv;
    }
    rv = apr_thread_cond_create(&me->cond, me->pool);
    if (APR_SUCCESS != rv) {
        apr_thread_mutex_destroy(me->lock);
        return rv;
    }
    me->tasks = apr_palloc(me->pool, sizeof(*me->tasks));
    if (!me->tasks) {
        goto CATCH_ENOMEM;
    }
    APR_RING_INIT(me->tasks, apr_thread_pool_task, link);
    me->scheduled_tasks = apr_palloc(me->pool, sizeof(*me->scheduled_tasks));
    if (!me->scheduled_tasks) {
        goto CATCH_ENOMEM;
    }
    APR_RING_INIT(me->scheduled_tasks, apr_thread_pool_task, link);
    me->recycled_tasks = apr_palloc(me->pool, sizeof(*me->recycled_tasks));
    if (!me->recycled_tasks) {
        goto CATCH_ENOMEM;
    }
    APR_RING_INIT(me->recycled_tasks, apr_thread_pool_task, link);
    me->busy_thds = apr_palloc(me->pool, sizeof(*me->busy_thds));
    if (!me->busy_thds) {
        goto CATCH_ENOMEM;
    }
    APR_RING_INIT(me->busy_thds, apr_thread_list_elt, link);
    me->idle_thds = apr_palloc(me->pool, sizeof(*me->idle_thds));
    if (!me->idle_thds) {
        goto CATCH_ENOMEM;
    }
    APR_RING_INIT(me->idle_thds, apr_thread_list_elt, link);
    me->recycled_thds = apr_palloc(me->pool, sizeof(*me->recycled_thds));
    if (!me->recycled_thds) {
        goto CATCH_ENOMEM;
    }
    APR_RING_INIT(me->recycled_thds, apr_thread_list_elt, link);
    me->thd_cnt = me->idle_cnt = me->task_cnt = me->scheduled_task_cnt = 0;
    me->tasks_run = me->tasks_high = me->thd_high = me->thd_timed_out = 0;
    me->idle_wait = 0;
    me->terminated = 0;
    for (i = 0; i < TASK_PRIORITY_SEGS; i++) {
        me->task_idx[i] = NULL;
    }
    goto FINAL_EXIT;
  CATCH_ENOMEM:
    rv = APR_ENOMEM;
    apr_thread_mutex_destroy(me->lock);
    apr_thread_cond_destroy(me->cond);
  FINAL_EXIT:
    return rv;
}

/*
 * NOTE: This function is not thread safe by itself. The caller must hold
 * the lock.
 */
static apr_thread_pool_task_t *pop_task(apr_thread_pool_t * me)
{
    apr_thread_pool_task_t *task = NULL;
    int seg;

    /* check for scheduled tasks */
    if (me->scheduled_task_cnt > 0) {
        task = APR_RING_FIRST(me->scheduled_tasks);
        assert(task != NULL);
        assert(task !=
               APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task,
                                 link));
        /* if it's time */
        if (task->dispatch.time <= apr_time_now()) {
            --me->scheduled_task_cnt;
            APR_RING_REMOVE(task, link);
            return task;
        }
    }
    /* check for normal tasks if we're not returning a scheduled task */
    if (me->task_cnt == 0) {
        return NULL;
    }

    task = APR_RING_FIRST(me->tasks);
    assert(task != NULL);
    assert(task != APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link));
    --me->task_cnt;
    seg = TASK_PRIORITY_SEG(task);
    if (task == me->task_idx[seg]) {
        me->task_idx[seg] = APR_RING_NEXT(task, link);
        if (me->task_idx[seg] == APR_RING_SENTINEL(me->tasks,
                                                   apr_thread_pool_task, link)
            || TASK_PRIORITY_SEG(me->task_idx[seg]) != seg) {
            me->task_idx[seg] = NULL;
        }
    }
    APR_RING_REMOVE(task, link);
    return task;
}

static apr_interval_time_t waiting_time(apr_thread_pool_t * me)
{
    apr_thread_pool_task_t *task = NULL;

    task = APR_RING_FIRST(me->scheduled_tasks);
    assert(task != NULL);
    assert(task !=
           APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task,
                             link));
    return task->dispatch.time - apr_time_now();
}

/*
 * NOTE: This function is not thread safe by itself. The caller must hold
 * the lock.
 */
static struct apr_thread_list_elt *elt_new(apr_thread_pool_t * me,
                                           apr_thread_t * t)
{
    struct apr_thread_list_elt *elt;

    if (APR_RING_EMPTY(me->recycled_thds, apr_thread_list_elt, link)) {
        elt = apr_pcalloc(me->pool, sizeof(*elt));
        if (NULL == elt) {
            return NULL;
        }
    }
    else {
        elt = APR_RING_FIRST(me->recycled_thds);
        APR_RING_REMOVE(elt, link);
    }

    APR_RING_ELEM_INIT(elt, link);
    elt->thd = t;
    elt->current_owner = NULL;
    elt->state = TH_RUN;
    return elt;
}

/*
 * The worker thread function. It takes a task from the queue and performs
 * it if one is available; otherwise it puts itself on the idle thread list
 * and waits for a signal to wake up.
 * A thread asked to stop after finishing a task terminates directly by
 * detaching and exiting. Otherwise the thread stays on the idle thread
 * list and must be joined.
 */
static void *APR_THREAD_FUNC thread_pool_func(apr_thread_t * t, void *param)
{
    apr_thread_pool_t *me = param;
    apr_thread_pool_task_t *task = NULL;
    apr_interval_time_t wait;
    struct apr_thread_list_elt *elt;

    apr_thread_mutex_lock(me->lock);
    elt = elt_new(me, t);
    if (!elt) {
        apr_thread_mutex_unlock(me->lock);
        apr_thread_exit(t, APR_ENOMEM);
    }

    while (!me->terminated && elt->state != TH_STOP) {
        /* If this is not a new element, it was awakened from the idle list */
        if (APR_RING_NEXT(elt, link) != elt) {
            --me->idle_cnt;
            APR_RING_REMOVE(elt, link);
        }

        APR_RING_INSERT_TAIL(me->busy_thds, elt, apr_thread_list_elt, link);
        task = pop_task(me);
        while (NULL != task && !me->terminated) {
            ++me->tasks_run;
            elt->current_owner = task->owner;
            apr_thread_mutex_unlock(me->lock);
            apr_thread_data_set(task, "apr_thread_pool_task", NULL, t);
            task->func(t, task->param);
            apr_thread_mutex_lock(me->lock);
            APR_RING_INSERT_TAIL(me->recycled_tasks, task,
                                 apr_thread_pool_task, link);
            elt->current_owner = NULL;
            if (TH_STOP == elt->state) {
                break;
            }
            task = pop_task(me);
        }
        assert(NULL == elt->current_owner);
        if (TH_STOP != elt->state)
            APR_RING_REMOVE(elt, link);

        /* A thread that is redundant, or that was asked to stop while busy,
         * exits here; such a thread is detached rather than joined */
        if ((me->idle_cnt >= me->idle_max
             && !(me->scheduled_task_cnt && 0 >= me->idle_max)
             && !me->idle_wait)
            || me->terminated || elt->state != TH_RUN) {
            --me->thd_cnt;
            if ((TH_PROBATION == elt->state) && me->idle_wait)
                ++me->thd_timed_out;
            APR_RING_INSERT_TAIL(me->recycled_thds, elt,
                                 apr_thread_list_elt, link);
            apr_thread_mutex_unlock(me->lock);
            apr_thread_detach(t);
            apr_thread_exit(t, APR_SUCCESS);
            return NULL;        /* not reached, safety net */
        }

        /* busy thread becomes idle */
        ++me->idle_cnt;
        APR_RING_INSERT_TAIL(me->idle_thds, elt, apr_thread_list_elt, link);

        /*
         * If there is a scheduled task, wait only until it is due, since
         * there is no guarantee that a currently idle thread will be woken
         * for the next scheduled task.
         */
        if (me->scheduled_task_cnt)
            wait = waiting_time(me);
        else if (me->idle_cnt > me->idle_max) {
            wait = me->idle_wait;
            elt->state = TH_PROBATION;
        }
        else
            wait = -1;

        if (wait >= 0) {
            apr_thread_cond_timedwait(me->cond, me->lock, wait);
        }
        else {
            apr_thread_cond_wait(me->cond, me->lock);
        }
    }

    /* an idle thread asked to stop exits here and will be joined */
    --me->thd_cnt;
    apr_thread_mutex_unlock(me->lock);
    apr_thread_exit(t, APR_SUCCESS);
    return NULL;                /* not reached, safety net */
}

static apr_status_t thread_pool_cleanup(void *me)
{
    apr_thread_pool_t *_myself = me;

    _myself->terminated = 1;
    apr_thread_pool_idle_max_set(_myself, 0);
    while (_myself->thd_cnt) {
        apr_sleep(20 * 1000);   /* poll every 20ms until all threads exit */
    }
    apr_thread_mutex_destroy(_myself->lock);
    apr_thread_cond_destroy(_myself->cond);
    return APR_SUCCESS;
}

APU_DECLARE(apr_status_t) apr_thread_pool_create(apr_thread_pool_t ** me,
                                                 apr_size_t init_threads,
                                                 apr_size_t max_threads,
                                                 apr_pool_t * pool)
{
    apr_thread_t *t;
    apr_status_t rv = APR_SUCCESS;
    apr_thread_pool_t *tp;

    *me = NULL;
    tp = apr_pcalloc(pool, sizeof(apr_thread_pool_t));

    /*
     * This pool will be used by different threads. As we cannot ensure that
     * our caller won't use the pool without acquiring the mutex, we must
     * create a new sub pool.
     */
    rv = apr_pool_create(&tp->pool, pool);
    if (APR_SUCCESS != rv)
        return rv;
    rv = thread_pool_construct(tp, init_threads, max_threads);
    if (APR_SUCCESS != rv)
        return rv;
    apr_pool_pre_cleanup_register(tp->pool, tp, thread_pool_cleanup);

    while (init_threads) {
        /* Grab the mutex as apr_thread_create() and thread_pool_func() will
         * allocate from tp->pool. This is dangerous if there are multiple
         * initial threads to create.
         */
        apr_thread_mutex_lock(tp->lock);
        rv = apr_thread_create(&t, NULL, thread_pool_func, tp, tp->pool);
        apr_thread_mutex_unlock(tp->lock);
        if (APR_SUCCESS != rv) {
            break;
        }
        tp->thd_cnt++;
        if (tp->thd_cnt > tp->thd_high) {
            tp->thd_high = tp->thd_cnt;
        }
        --init_threads;
    }

    if (rv == APR_SUCCESS) {
        *me = tp;
    }

    return rv;
}
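
/*
 * Usage sketch (illustrative only, not part of the library): create a
 * pool with 2 initial and 8 maximum threads out of a parent pool "p",
 * then tear it down when done.
 *
 *     apr_thread_pool_t *tp;
 *     apr_status_t rv = apr_thread_pool_create(&tp, 2, 8, p);
 *     if (rv == APR_SUCCESS) {
 *         ... push or schedule tasks ...
 *         apr_thread_pool_destroy(tp);
 *     }
 */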

APU_DECLARE(apr_status_t) apr_thread_pool_destroy(apr_thread_pool_t * me)
{
    apr_pool_destroy(me->pool);
    return APR_SUCCESS;
}

/*
 * NOTE: This function is not thread safe by itself. The caller must hold
 * the lock.
 */
static apr_thread_pool_task_t *task_new(apr_thread_pool_t * me,
                                        apr_thread_start_t func,
                                        void *param, apr_byte_t priority,
                                        void *owner, apr_time_t time)
{
    apr_thread_pool_task_t *t;

    if (APR_RING_EMPTY(me->recycled_tasks, apr_thread_pool_task, link)) {
        t = apr_pcalloc(me->pool, sizeof(*t));
        if (NULL == t) {
            return NULL;
        }
    }
    else {
        t = APR_RING_FIRST(me->recycled_tasks);
        APR_RING_REMOVE(t, link);
    }

    APR_RING_ELEM_INIT(t, link);
    t->func = func;
    t->param = param;
    t->owner = owner;
    if (time > 0) {
        t->dispatch.time = apr_time_now() + time;
    }
    else {
        t->dispatch.priority = priority;
    }
    return t;
}

/*
 * Test whether the task's priority segment is empty.
 * If it is not, return the first task with the same or lower priority.
 * Otherwise, add the task to the queue and return NULL.
 *
 * NOTE: This function is not thread safe by itself. The caller must hold
 * the lock.
 */
static apr_thread_pool_task_t *add_if_empty(apr_thread_pool_t * me,
                                            apr_thread_pool_task_t * const t)
{
    int seg;
    int next;
    apr_thread_pool_task_t *t_next;

    seg = TASK_PRIORITY_SEG(t);
    if (me->task_idx[seg]) {
        assert(APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link) !=
               me->task_idx[seg]);
        t_next = me->task_idx[seg];
        while (t_next->dispatch.priority > t->dispatch.priority) {
            t_next = APR_RING_NEXT(t_next, link);
            if (APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link) ==
                t_next) {
                return t_next;
            }
        }
        return t_next;
    }

    for (next = seg - 1; next >= 0; next--) {
        if (me->task_idx[next]) {
            APR_RING_INSERT_BEFORE(me->task_idx[next], t, link);
            break;
        }
    }
    if (0 > next) {
        APR_RING_INSERT_TAIL(me->tasks, t, apr_thread_pool_task, link);
    }
    me->task_idx[seg] = t;
    return NULL;
}
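
/*
 * Worked example of the segment index (illustrative): with priorities
 * 200, 130 and 60 queued, the ring is ordered highest first and
 * task_idx[3], task_idx[2] and task_idx[0] point at those tasks.
 * Adding a priority-70 task (segment 1) finds task_idx[1] empty, inserts
 * the task before the segment-0 entry, and records it as task_idx[1].
 */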

/*
 * Schedule a task to run in "time" microseconds. Find the spot in the
 * scheduled ring where the time fits, keeping the ring ordered by due
 * time so a waiting thread wakes up when the earliest task is due.
 */
static apr_status_t schedule_task(apr_thread_pool_t *me,
                                  apr_thread_start_t func, void *param,
                                  void *owner, apr_interval_time_t time)
{
    apr_thread_pool_task_t *t;
    apr_thread_pool_task_t *t_loc;
    apr_thread_t *thd;
    apr_status_t rv = APR_SUCCESS;

    apr_thread_mutex_lock(me->lock);

    t = task_new(me, func, param, 0, owner, time);
    if (NULL == t) {
        apr_thread_mutex_unlock(me->lock);
        return APR_ENOMEM;
    }
    t_loc = APR_RING_FIRST(me->scheduled_tasks);
    while (NULL != t_loc) {
        /* if the time is less than the entry's, insert ahead of it */
        if (t->dispatch.time < t_loc->dispatch.time) {
            ++me->scheduled_task_cnt;
            APR_RING_INSERT_BEFORE(t_loc, t, link);
            break;
        }
        else {
            t_loc = APR_RING_NEXT(t_loc, link);
            if (t_loc ==
                APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task,
                                  link)) {
                ++me->scheduled_task_cnt;
                APR_RING_INSERT_TAIL(me->scheduled_tasks, t,
                                     apr_thread_pool_task, link);
                break;
            }
        }
    }
    /* there should be at least one thread for scheduled tasks */
    if (0 == me->thd_cnt) {
        rv = apr_thread_create(&thd, NULL, thread_pool_func, me, me->pool);
        if (APR_SUCCESS == rv) {
            ++me->thd_cnt;
            if (me->thd_cnt > me->thd_high)
                me->thd_high = me->thd_cnt;
        }
    }
    apr_thread_cond_signal(me->cond);
    apr_thread_mutex_unlock(me->lock);
    return rv;
}

static apr_status_t add_task(apr_thread_pool_t *me, apr_thread_start_t func,
                             void *param, apr_byte_t priority, int push,
                             void *owner)
{
    apr_thread_pool_task_t *t;
    apr_thread_pool_task_t *t_loc;
    apr_thread_t *thd;
    apr_status_t rv = APR_SUCCESS;

    apr_thread_mutex_lock(me->lock);

    t = task_new(me, func, param, priority, owner, 0);
    if (NULL == t) {
        apr_thread_mutex_unlock(me->lock);
        return APR_ENOMEM;
    }

    t_loc = add_if_empty(me, t);
    if (NULL == t_loc) {
        goto FINAL_EXIT;
    }

    if (push) {
        while (APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link) !=
               t_loc && t_loc->dispatch.priority >= t->dispatch.priority) {
            t_loc = APR_RING_NEXT(t_loc, link);
        }
    }
    APR_RING_INSERT_BEFORE(t_loc, t, link);
    if (!push) {
        if (t_loc == me->task_idx[TASK_PRIORITY_SEG(t)]) {
            me->task_idx[TASK_PRIORITY_SEG(t)] = t;
        }
    }

  FINAL_EXIT:
    me->task_cnt++;
    if (me->task_cnt > me->tasks_high)
        me->tasks_high = me->task_cnt;
    if (0 == me->thd_cnt || (0 == me->idle_cnt && me->thd_cnt < me->thd_max &&
                             me->task_cnt > me->threshold)) {
        rv = apr_thread_create(&thd, NULL, thread_pool_func, me, me->pool);
        if (APR_SUCCESS == rv) {
            ++me->thd_cnt;
            if (me->thd_cnt > me->thd_high)
                me->thd_high = me->thd_cnt;
        }
    }

    apr_thread_cond_signal(me->cond);
    apr_thread_mutex_unlock(me->lock);

    return rv;
}

APU_DECLARE(apr_status_t) apr_thread_pool_push(apr_thread_pool_t *me,
                                               apr_thread_start_t func,
                                               void *param,
                                               apr_byte_t priority,
                                               void *owner)
{
    return add_task(me, func, param, priority, 1, owner);
}
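
/*
 * Usage sketch (illustrative; tp, my_task_func, my_data and my_owner are
 * placeholders): queue a task at normal priority, tagging it with an
 * owner pointer so it can later be cancelled as a group with
 * apr_thread_pool_tasks_cancel().
 *
 *     rv = apr_thread_pool_push(tp, my_task_func, my_data,
 *                               APR_THREAD_TASK_PRIORITY_NORMAL, my_owner);
 */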

APU_DECLARE(apr_status_t) apr_thread_pool_schedule(apr_thread_pool_t *me,
                                                   apr_thread_start_t func,
                                                   void *param,
                                                   apr_interval_time_t time,
                                                   void *owner)
{
    return schedule_task(me, func, param, owner, time);
}
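
/*
 * Usage sketch (illustrative, placeholder names): run a task roughly five
 * seconds from now. "time" is a relative interval in microseconds, so
 * apr_time_from_sec() does the conversion.
 *
 *     rv = apr_thread_pool_schedule(tp, my_task_func, my_data,
 *                                   apr_time_from_sec(5), my_owner);
 */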

APU_DECLARE(apr_status_t) apr_thread_pool_top(apr_thread_pool_t *me,
                                              apr_thread_start_t func,
                                              void *param,
                                              apr_byte_t priority,
                                              void *owner)
{
    return add_task(me, func, param, priority, 0, owner);
}

static apr_status_t remove_scheduled_tasks(apr_thread_pool_t *me,
                                           void *owner)
{
    apr_thread_pool_task_t *t_loc;
    apr_thread_pool_task_t *next;

    t_loc = APR_RING_FIRST(me->scheduled_tasks);
    while (t_loc !=
           APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task,
                             link)) {
        next = APR_RING_NEXT(t_loc, link);
        /* if the task belongs to the owner, remove it */
        if (t_loc->owner == owner) {
            --me->scheduled_task_cnt;
            APR_RING_REMOVE(t_loc, link);
        }
        t_loc = next;
    }
    return APR_SUCCESS;
}

static apr_status_t remove_tasks(apr_thread_pool_t *me, void *owner)
{
    apr_thread_pool_task_t *t_loc;
    apr_thread_pool_task_t *next;
    int seg;

    t_loc = APR_RING_FIRST(me->tasks);
    while (t_loc != APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link)) {
        next = APR_RING_NEXT(t_loc, link);
        if (t_loc->owner == owner) {
            --me->task_cnt;
            seg = TASK_PRIORITY_SEG(t_loc);
            if (t_loc == me->task_idx[seg]) {
                me->task_idx[seg] = APR_RING_NEXT(t_loc, link);
                if (me->task_idx[seg] == APR_RING_SENTINEL(me->tasks,
                                                           apr_thread_pool_task,
                                                           link)
                    || TASK_PRIORITY_SEG(me->task_idx[seg]) != seg) {
                    me->task_idx[seg] = NULL;
                }
            }
            APR_RING_REMOVE(t_loc, link);
        }
        t_loc = next;
    }
    return APR_SUCCESS;
}

static void wait_on_busy_threads(apr_thread_pool_t *me, void *owner)
{
#ifndef NDEBUG
    apr_os_thread_t *os_thread;
#endif
    struct apr_thread_list_elt *elt;

    apr_thread_mutex_lock(me->lock);
    elt = APR_RING_FIRST(me->busy_thds);
    while (elt != APR_RING_SENTINEL(me->busy_thds, apr_thread_list_elt, link)) {
        if (elt->current_owner != owner) {
            elt = APR_RING_NEXT(elt, link);
            continue;
        }
#ifndef NDEBUG
        /* make sure the thread is not the one calling tasks_cancel */
        apr_os_thread_get(&os_thread, elt->thd);
#ifdef WIN32
        /* hack for an apr win32 bug */
        assert(!apr_os_thread_equal(apr_os_thread_current(), os_thread));
#else
        assert(!apr_os_thread_equal(apr_os_thread_current(), *os_thread));
#endif
#endif
        while (elt->current_owner == owner) {
            apr_thread_mutex_unlock(me->lock);
            apr_sleep(200 * 1000);
            apr_thread_mutex_lock(me->lock);
        }
        elt = APR_RING_FIRST(me->busy_thds);
    }
    apr_thread_mutex_unlock(me->lock);
    return;
}

APU_DECLARE(apr_status_t) apr_thread_pool_tasks_cancel(apr_thread_pool_t *me,
                                                       void *owner)
{
    apr_status_t rv = APR_SUCCESS;

    apr_thread_mutex_lock(me->lock);
    if (me->task_cnt > 0) {
        rv = remove_tasks(me, owner);
    }
    if (me->scheduled_task_cnt > 0) {
        rv = remove_scheduled_tasks(me, owner);
    }
    apr_thread_mutex_unlock(me->lock);
    wait_on_busy_threads(me, owner);

    return rv;
}
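
/*
 * Semantics note: cancellation first removes the owner's queued and
 * scheduled tasks, then polls until no busy thread is still running one
 * of that owner's tasks.  Calling this from within one of the owner's
 * own tasks would therefore never return, which the assert in
 * wait_on_busy_threads() guards against in debug builds.
 */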

APU_DECLARE(apr_size_t) apr_thread_pool_tasks_count(apr_thread_pool_t *me)
{
    return me->task_cnt;
}

APU_DECLARE(apr_size_t)
    apr_thread_pool_scheduled_tasks_count(apr_thread_pool_t *me)
{
    return me->scheduled_task_cnt;
}

APU_DECLARE(apr_size_t) apr_thread_pool_threads_count(apr_thread_pool_t *me)
{
    return me->thd_cnt;
}

APU_DECLARE(apr_size_t) apr_thread_pool_busy_count(apr_thread_pool_t *me)
{
    return me->thd_cnt - me->idle_cnt;
}

APU_DECLARE(apr_size_t) apr_thread_pool_idle_count(apr_thread_pool_t *me)
{
    return me->idle_cnt;
}

APU_DECLARE(apr_size_t)
    apr_thread_pool_tasks_run_count(apr_thread_pool_t * me)
{
    return me->tasks_run;
}

APU_DECLARE(apr_size_t)
    apr_thread_pool_tasks_high_count(apr_thread_pool_t * me)
{
    return me->tasks_high;
}

APU_DECLARE(apr_size_t)
    apr_thread_pool_threads_high_count(apr_thread_pool_t * me)
{
    return me->thd_high;
}

APU_DECLARE(apr_size_t)
    apr_thread_pool_threads_idle_timeout_count(apr_thread_pool_t * me)
{
    return me->thd_timed_out;
}

APU_DECLARE(apr_size_t) apr_thread_pool_idle_max_get(apr_thread_pool_t *me)
{
    return me->idle_max;
}

APU_DECLARE(apr_interval_time_t)
    apr_thread_pool_idle_wait_get(apr_thread_pool_t * me)
{
    return me->idle_wait;
}

/*
 * Stop threads in excess of *cnt, from either the idle or the busy list.
 * @return the head of the list of stopped threads, with *cnt updated to
 *         the number of threads stopped
 * NOTE: busy threads may become idle while this function runs
 */
static struct apr_thread_list_elt *trim_threads(apr_thread_pool_t *me,
                                                apr_size_t *cnt, int idle)
{
    struct apr_thread_list *thds;
    apr_size_t n, n_dbg, i;
    struct apr_thread_list_elt *head, *tail, *elt;

    apr_thread_mutex_lock(me->lock);
    if (idle) {
        thds = me->idle_thds;
        n = me->idle_cnt;
    }
    else {
        thds = me->busy_thds;
        n = me->thd_cnt - me->idle_cnt;
    }
    if (n <= *cnt) {
        apr_thread_mutex_unlock(me->lock);
        *cnt = 0;
        return NULL;
    }
    n -= *cnt;

    head = APR_RING_FIRST(thds);
    for (i = 0; i < *cnt; i++) {
        head = APR_RING_NEXT(head, link);
    }
    tail = APR_RING_LAST(thds);
    if (idle) {
        APR_RING_UNSPLICE(head, tail, link);
        me->idle_cnt = *cnt;
    }

    n_dbg = 0;
    for (elt = head; elt != tail; elt = APR_RING_NEXT(elt, link)) {
        elt->state = TH_STOP;
        n_dbg++;
    }
    elt->state = TH_STOP;
    n_dbg++;
    assert(n == n_dbg);
    *cnt = n;

    apr_thread_mutex_unlock(me->lock);

    APR_RING_PREV(head, link) = NULL;
    APR_RING_NEXT(tail, link) = NULL;
    return head;
}

static apr_size_t trim_idle_threads(apr_thread_pool_t *me, apr_size_t cnt)
{
    apr_size_t n_dbg;
    struct apr_thread_list_elt *elt, *head, *tail;
    apr_status_t rv;

    elt = trim_threads(me, &cnt, 1);

    apr_thread_mutex_lock(me->lock);
    apr_thread_cond_broadcast(me->cond);
    apr_thread_mutex_unlock(me->lock);

    n_dbg = 0;
    if (NULL != (head = elt)) {
        while (elt) {
            tail = elt;
            apr_thread_join(&rv, elt->thd);
            elt = APR_RING_NEXT(elt, link);
            ++n_dbg;
        }
        apr_thread_mutex_lock(me->lock);
        APR_RING_SPLICE_TAIL(me->recycled_thds, head, tail,
                             apr_thread_list_elt, link);
        apr_thread_mutex_unlock(me->lock);
    }
    assert(cnt == n_dbg);

    return cnt;
}

/* Do not join busy threads; a task may take arbitrarily long to finish,
 * so stopped busy threads detach and exit on their own.
 */
static apr_size_t trim_busy_threads(apr_thread_pool_t *me, apr_size_t cnt)
{
    trim_threads(me, &cnt, 0);
    return cnt;
}

APU_DECLARE(apr_size_t) apr_thread_pool_idle_max_set(apr_thread_pool_t *me,
                                                     apr_size_t cnt)
{
    me->idle_max = cnt;
    cnt = trim_idle_threads(me, cnt);
    return cnt;
}

APU_DECLARE(apr_interval_time_t)
    apr_thread_pool_idle_wait_set(apr_thread_pool_t * me,
                                  apr_interval_time_t timeout)
{
    apr_interval_time_t oldtime;

    oldtime = me->idle_wait;
    me->idle_wait = timeout;

    return oldtime;
}

APU_DECLARE(apr_size_t) apr_thread_pool_thread_max_get(apr_thread_pool_t *me)
{
    return me->thd_max;
}

/*
 * Stop worker threads in excess of the new limit.
 * NOTE: busy threads may become idle while this function runs
 */
APU_DECLARE(apr_size_t) apr_thread_pool_thread_max_set(apr_thread_pool_t *me,
                                                       apr_size_t cnt)
{
    apr_size_t n;    /* apr_size_t, not unsigned int, to match thd_cnt */

    me->thd_max = cnt;
    if (0 == cnt || me->thd_cnt <= cnt) {
        return 0;
    }

    n = me->thd_cnt - cnt;
    if (n >= me->idle_cnt) {
        trim_busy_threads(me, n - me->idle_cnt);
        trim_idle_threads(me, 0);
    }
    else {
        trim_idle_threads(me, me->idle_cnt - n);
    }
    return n;
}

APU_DECLARE(apr_size_t) apr_thread_pool_threshold_get(apr_thread_pool_t *me)
{
    return me->threshold;
}

APU_DECLARE(apr_size_t) apr_thread_pool_threshold_set(apr_thread_pool_t *me,
                                                      apr_size_t val)
{
    apr_size_t ov;

    ov = me->threshold;
    me->threshold = val;
    return ov;
}

APU_DECLARE(apr_status_t) apr_thread_pool_task_owner_get(apr_thread_t *thd,
                                                         void **owner)
{
    apr_status_t rv;
    apr_thread_pool_task_t *task;
    void *data;

    rv = apr_thread_data_get(&data, "apr_thread_pool_task", thd);
    if (rv != APR_SUCCESS) {
        return rv;
    }

    task = data;
    if (!task) {
        *owner = NULL;
        return APR_BADARG;
    }

    *owner = task->owner;
    return APR_SUCCESS;
}

#endif /* APR_HAS_THREADS */

/* vim: set ts=4 sw=4 et cin tw=80: */