1/* 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed 4 * with this work for additional information regarding copyright 5 * ownership. The ASF licenses this file to you under the Apache 6 * License, Version 2.0 (the "License"); you may not use this file 7 * except in compliance with the License. You may obtain a copy of 8 * the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 15 * implied. See the License for the specific language governing 16 * permissions and limitations under the License. 17 */ 18 19#include <assert.h> 20#include "apr_thread_pool.h" 21#include "apr_ring.h" 22#include "apr_thread_cond.h" 23#include "apr_portable.h" 24 25#if APR_HAS_THREADS 26 27#define TASK_PRIORITY_SEGS 4 28#define TASK_PRIORITY_SEG(x) (((x)->dispatch.priority & 0xFF) / 64) 29 30typedef struct apr_thread_pool_task 31{ 32 APR_RING_ENTRY(apr_thread_pool_task) link; 33 apr_thread_start_t func; 34 void *param; 35 void *owner; 36 union 37 { 38 apr_byte_t priority; 39 apr_time_t time; 40 } dispatch; 41} apr_thread_pool_task_t; 42 43APR_RING_HEAD(apr_thread_pool_tasks, apr_thread_pool_task); 44 45struct apr_thread_list_elt 46{ 47 APR_RING_ENTRY(apr_thread_list_elt) link; 48 apr_thread_t *thd; 49 volatile void *current_owner; 50 volatile enum { TH_RUN, TH_STOP, TH_PROBATION } state; 51}; 52 53APR_RING_HEAD(apr_thread_list, apr_thread_list_elt); 54 55struct apr_thread_pool 56{ 57 apr_pool_t *pool; 58 volatile apr_size_t thd_max; 59 volatile apr_size_t idle_max; 60 volatile apr_interval_time_t idle_wait; 61 volatile apr_size_t thd_cnt; 62 volatile apr_size_t idle_cnt; 63 volatile apr_size_t task_cnt; 64 volatile apr_size_t scheduled_task_cnt; 65 volatile apr_size_t threshold; 66 volatile apr_size_t tasks_run; 67 volatile apr_size_t tasks_high; 68 volatile apr_size_t thd_high; 69 volatile apr_size_t thd_timed_out; 70 struct apr_thread_pool_tasks *tasks; 71 struct apr_thread_pool_tasks *scheduled_tasks; 72 struct apr_thread_list *busy_thds; 73 struct apr_thread_list *idle_thds; 74 apr_thread_mutex_t *lock; 75 apr_thread_cond_t *cond; 76 volatile int terminated; 77 struct apr_thread_pool_tasks *recycled_tasks; 78 struct apr_thread_list *recycled_thds; 79 apr_thread_pool_task_t *task_idx[TASK_PRIORITY_SEGS]; 80}; 81 82static apr_status_t thread_pool_construct(apr_thread_pool_t * me, 83 apr_size_t init_threads, 84 apr_size_t max_threads) 85{ 86 apr_status_t rv; 87 int i; 88 89 me->thd_max = max_threads; 90 me->idle_max = init_threads; 91 me->threshold = init_threads / 2; 92 rv = apr_thread_mutex_create(&me->lock, APR_THREAD_MUTEX_NESTED, 93 me->pool); 94 if (APR_SUCCESS != rv) { 95 return rv; 96 } 97 rv = apr_thread_cond_create(&me->cond, me->pool); 98 if (APR_SUCCESS != rv) { 99 apr_thread_mutex_destroy(me->lock); 100 return rv; 101 } 102 me->tasks = apr_palloc(me->pool, sizeof(*me->tasks)); 103 if (!me->tasks) { 104 goto CATCH_ENOMEM; 105 } 106 APR_RING_INIT(me->tasks, apr_thread_pool_task, link); 107 me->scheduled_tasks = apr_palloc(me->pool, sizeof(*me->scheduled_tasks)); 108 if (!me->scheduled_tasks) { 109 goto CATCH_ENOMEM; 110 } 111 APR_RING_INIT(me->scheduled_tasks, apr_thread_pool_task, link); 112 me->recycled_tasks = apr_palloc(me->pool, sizeof(*me->recycled_tasks)); 113 if (!me->recycled_tasks) { 114 goto CATCH_ENOMEM; 115 } 116 APR_RING_INIT(me->recycled_tasks, apr_thread_pool_task, link); 117 me->busy_thds = apr_palloc(me->pool, sizeof(*me->busy_thds)); 118 if (!me->busy_thds) { 119 goto CATCH_ENOMEM; 120 } 121 APR_RING_INIT(me->busy_thds, apr_thread_list_elt, link); 122 me->idle_thds = apr_palloc(me->pool, sizeof(*me->idle_thds)); 123 if (!me->idle_thds) { 124 goto CATCH_ENOMEM; 125 } 126 APR_RING_INIT(me->idle_thds, apr_thread_list_elt, link); 127 me->recycled_thds = apr_palloc(me->pool, sizeof(*me->recycled_thds)); 128 if (!me->recycled_thds) { 129 goto CATCH_ENOMEM; 130 } 131 APR_RING_INIT(me->recycled_thds, apr_thread_list_elt, link); 132 me->thd_cnt = me->idle_cnt = me->task_cnt = me->scheduled_task_cnt = 0; 133 me->tasks_run = me->tasks_high = me->thd_high = me->thd_timed_out = 0; 134 me->idle_wait = 0; 135 me->terminated = 0; 136 for (i = 0; i < TASK_PRIORITY_SEGS; i++) { 137 me->task_idx[i] = NULL; 138 } 139 goto FINAL_EXIT; 140 CATCH_ENOMEM: 141 rv = APR_ENOMEM; 142 apr_thread_mutex_destroy(me->lock); 143 apr_thread_cond_destroy(me->cond); 144 FINAL_EXIT: 145 return rv; 146} 147 148/* 149 * NOTE: This function is not thread safe by itself. Caller should hold the lock 150 */ 151static apr_thread_pool_task_t *pop_task(apr_thread_pool_t * me) 152{ 153 apr_thread_pool_task_t *task = NULL; 154 int seg; 155 156 /* check for scheduled tasks */ 157 if (me->scheduled_task_cnt > 0) { 158 task = APR_RING_FIRST(me->scheduled_tasks); 159 assert(task != NULL); 160 assert(task != 161 APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task, 162 link)); 163 /* if it's time */ 164 if (task->dispatch.time <= apr_time_now()) { 165 --me->scheduled_task_cnt; 166 APR_RING_REMOVE(task, link); 167 return task; 168 } 169 } 170 /* check for normal tasks if we're not returning a scheduled task */ 171 if (me->task_cnt == 0) { 172 return NULL; 173 } 174 175 task = APR_RING_FIRST(me->tasks); 176 assert(task != NULL); 177 assert(task != APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link)); 178 --me->task_cnt; 179 seg = TASK_PRIORITY_SEG(task); 180 if (task == me->task_idx[seg]) { 181 me->task_idx[seg] = APR_RING_NEXT(task, link); 182 if (me->task_idx[seg] == APR_RING_SENTINEL(me->tasks, 183 apr_thread_pool_task, link) 184 || TASK_PRIORITY_SEG(me->task_idx[seg]) != seg) { 185 me->task_idx[seg] = NULL; 186 } 187 } 188 APR_RING_REMOVE(task, link); 189 return task; 190} 191 192static apr_interval_time_t waiting_time(apr_thread_pool_t * me) 193{ 194 apr_thread_pool_task_t *task = NULL; 195 196 task = APR_RING_FIRST(me->scheduled_tasks); 197 assert(task != NULL); 198 assert(task != 199 APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task, 200 link)); 201 return task->dispatch.time - apr_time_now(); 202} 203 204/* 205 * NOTE: This function is not thread safe by itself. Caller should hold the lock 206 */ 207static struct apr_thread_list_elt *elt_new(apr_thread_pool_t * me, 208 apr_thread_t * t) 209{ 210 struct apr_thread_list_elt *elt; 211 212 if (APR_RING_EMPTY(me->recycled_thds, apr_thread_list_elt, link)) { 213 elt = apr_pcalloc(me->pool, sizeof(*elt)); 214 if (NULL == elt) { 215 return NULL; 216 } 217 } 218 else { 219 elt = APR_RING_FIRST(me->recycled_thds); 220 APR_RING_REMOVE(elt, link); 221 } 222 223 APR_RING_ELEM_INIT(elt, link); 224 elt->thd = t; 225 elt->current_owner = NULL; 226 elt->state = TH_RUN; 227 return elt; 228} 229 230/* 231 * The worker thread function. Take a task from the queue and perform it if 232 * there is any. Otherwise, put itself into the idle thread list and waiting 233 * for signal to wake up. 234 * The thread terminate directly by detach and exit when it is asked to stop 235 * after finishing a task. Otherwise, the thread should be in idle thread list 236 * and should be joined. 237 */ 238static void *APR_THREAD_FUNC thread_pool_func(apr_thread_t * t, void *param) 239{ 240 apr_thread_pool_t *me = param; 241 apr_thread_pool_task_t *task = NULL; 242 apr_interval_time_t wait; 243 struct apr_thread_list_elt *elt; 244 245 apr_thread_mutex_lock(me->lock); 246 elt = elt_new(me, t); 247 if (!elt) { 248 apr_thread_mutex_unlock(me->lock); 249 apr_thread_exit(t, APR_ENOMEM); 250 } 251 252 while (!me->terminated && elt->state != TH_STOP) { 253 /* Test if not new element, it is awakened from idle */ 254 if (APR_RING_NEXT(elt, link) != elt) { 255 --me->idle_cnt; 256 APR_RING_REMOVE(elt, link); 257 } 258 259 APR_RING_INSERT_TAIL(me->busy_thds, elt, apr_thread_list_elt, link); 260 task = pop_task(me); 261 while (NULL != task && !me->terminated) { 262 ++me->tasks_run; 263 elt->current_owner = task->owner; 264 apr_thread_mutex_unlock(me->lock); 265 apr_thread_data_set(task, "apr_thread_pool_task", NULL, t); 266 task->func(t, task->param); 267 apr_thread_mutex_lock(me->lock); 268 APR_RING_INSERT_TAIL(me->recycled_tasks, task, 269 apr_thread_pool_task, link); 270 elt->current_owner = NULL; 271 if (TH_STOP == elt->state) { 272 break; 273 } 274 task = pop_task(me); 275 } 276 assert(NULL == elt->current_owner); 277 if (TH_STOP != elt->state) 278 APR_RING_REMOVE(elt, link); 279 280 /* Test if a busy thread been asked to stop, which is not joinable */ 281 if ((me->idle_cnt >= me->idle_max 282 && !(me->scheduled_task_cnt && 0 >= me->idle_max) 283 && !me->idle_wait) 284 || me->terminated || elt->state != TH_RUN) { 285 --me->thd_cnt; 286 if ((TH_PROBATION == elt->state) && me->idle_wait) 287 ++me->thd_timed_out; 288 APR_RING_INSERT_TAIL(me->recycled_thds, elt, 289 apr_thread_list_elt, link); 290 apr_thread_mutex_unlock(me->lock); 291 apr_thread_detach(t); 292 apr_thread_exit(t, APR_SUCCESS); 293 return NULL; /* should not be here, safe net */ 294 } 295 296 /* busy thread become idle */ 297 ++me->idle_cnt; 298 APR_RING_INSERT_TAIL(me->idle_thds, elt, apr_thread_list_elt, link); 299 300 /* 301 * If there is a scheduled task, always scheduled to perform that task. 302 * Since there is no guarantee that current idle threads are scheduled 303 * for next scheduled task. 304 */ 305 if (me->scheduled_task_cnt) 306 wait = waiting_time(me); 307 else if (me->idle_cnt > me->idle_max) { 308 wait = me->idle_wait; 309 elt->state = TH_PROBATION; 310 } 311 else 312 wait = -1; 313 314 if (wait >= 0) { 315 apr_thread_cond_timedwait(me->cond, me->lock, wait); 316 } 317 else { 318 apr_thread_cond_wait(me->cond, me->lock); 319 } 320 } 321 322 /* idle thread been asked to stop, will be joined */ 323 --me->thd_cnt; 324 apr_thread_mutex_unlock(me->lock); 325 apr_thread_exit(t, APR_SUCCESS); 326 return NULL; /* should not be here, safe net */ 327} 328 329static apr_status_t thread_pool_cleanup(void *me) 330{ 331 apr_thread_pool_t *_myself = me; 332 333 _myself->terminated = 1; 334 apr_thread_pool_idle_max_set(_myself, 0); 335 while (_myself->thd_cnt) { 336 apr_sleep(20 * 1000); /* spin lock with 20 ms */ 337 } 338 apr_thread_mutex_destroy(_myself->lock); 339 apr_thread_cond_destroy(_myself->cond); 340 return APR_SUCCESS; 341} 342 343APU_DECLARE(apr_status_t) apr_thread_pool_create(apr_thread_pool_t ** me, 344 apr_size_t init_threads, 345 apr_size_t max_threads, 346 apr_pool_t * pool) 347{ 348 apr_thread_t *t; 349 apr_status_t rv = APR_SUCCESS; 350 apr_thread_pool_t *tp; 351 352 *me = NULL; 353 tp = apr_pcalloc(pool, sizeof(apr_thread_pool_t)); 354 355 /* 356 * This pool will be used by different threads. As we cannot ensure that 357 * our caller won't use the pool without acquiring the mutex, we must 358 * create a new sub pool. 359 */ 360 rv = apr_pool_create(&tp->pool, pool); 361 if (APR_SUCCESS != rv) 362 return rv; 363 rv = thread_pool_construct(tp, init_threads, max_threads); 364 if (APR_SUCCESS != rv) 365 return rv; 366 apr_pool_pre_cleanup_register(tp->pool, tp, thread_pool_cleanup); 367 368 while (init_threads) { 369 /* Grab the mutex as apr_thread_create() and thread_pool_func() will 370 * allocate from (*me)->pool. This is dangerous if there are multiple 371 * initial threads to create. 372 */ 373 apr_thread_mutex_lock(tp->lock); 374 rv = apr_thread_create(&t, NULL, thread_pool_func, tp, tp->pool); 375 apr_thread_mutex_unlock(tp->lock); 376 if (APR_SUCCESS != rv) { 377 break; 378 } 379 tp->thd_cnt++; 380 if (tp->thd_cnt > tp->thd_high) { 381 tp->thd_high = tp->thd_cnt; 382 } 383 --init_threads; 384 } 385 386 if (rv == APR_SUCCESS) { 387 *me = tp; 388 } 389 390 return rv; 391} 392 393APU_DECLARE(apr_status_t) apr_thread_pool_destroy(apr_thread_pool_t * me) 394{ 395 apr_pool_destroy(me->pool); 396 return APR_SUCCESS; 397} 398 399/* 400 * NOTE: This function is not thread safe by itself. Caller should hold the lock 401 */ 402static apr_thread_pool_task_t *task_new(apr_thread_pool_t * me, 403 apr_thread_start_t func, 404 void *param, apr_byte_t priority, 405 void *owner, apr_time_t time) 406{ 407 apr_thread_pool_task_t *t; 408 409 if (APR_RING_EMPTY(me->recycled_tasks, apr_thread_pool_task, link)) { 410 t = apr_pcalloc(me->pool, sizeof(*t)); 411 if (NULL == t) { 412 return NULL; 413 } 414 } 415 else { 416 t = APR_RING_FIRST(me->recycled_tasks); 417 APR_RING_REMOVE(t, link); 418 } 419 420 APR_RING_ELEM_INIT(t, link); 421 t->func = func; 422 t->param = param; 423 t->owner = owner; 424 if (time > 0) { 425 t->dispatch.time = apr_time_now() + time; 426 } 427 else { 428 t->dispatch.priority = priority; 429 } 430 return t; 431} 432 433/* 434 * Test it the task is the only one within the priority segment. 435 * If it is not, return the first element with same or lower priority. 436 * Otherwise, add the task into the queue and return NULL. 437 * 438 * NOTE: This function is not thread safe by itself. Caller should hold the lock 439 */ 440static apr_thread_pool_task_t *add_if_empty(apr_thread_pool_t * me, 441 apr_thread_pool_task_t * const t) 442{ 443 int seg; 444 int next; 445 apr_thread_pool_task_t *t_next; 446 447 seg = TASK_PRIORITY_SEG(t); 448 if (me->task_idx[seg]) { 449 assert(APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link) != 450 me->task_idx[seg]); 451 t_next = me->task_idx[seg]; 452 while (t_next->dispatch.priority > t->dispatch.priority) { 453 t_next = APR_RING_NEXT(t_next, link); 454 if (APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link) == 455 t_next) { 456 return t_next; 457 } 458 } 459 return t_next; 460 } 461 462 for (next = seg - 1; next >= 0; next--) { 463 if (me->task_idx[next]) { 464 APR_RING_INSERT_BEFORE(me->task_idx[next], t, link); 465 break; 466 } 467 } 468 if (0 > next) { 469 APR_RING_INSERT_TAIL(me->tasks, t, apr_thread_pool_task, link); 470 } 471 me->task_idx[seg] = t; 472 return NULL; 473} 474 475/* 476* schedule a task to run in "time" microseconds. Find the spot in the ring where 477* the time fits. Adjust the short_time so the thread wakes up when the time is reached. 478*/ 479static apr_status_t schedule_task(apr_thread_pool_t *me, 480 apr_thread_start_t func, void *param, 481 void *owner, apr_interval_time_t time) 482{ 483 apr_thread_pool_task_t *t; 484 apr_thread_pool_task_t *t_loc; 485 apr_thread_t *thd; 486 apr_status_t rv = APR_SUCCESS; 487 apr_thread_mutex_lock(me->lock); 488 489 t = task_new(me, func, param, 0, owner, time); 490 if (NULL == t) { 491 apr_thread_mutex_unlock(me->lock); 492 return APR_ENOMEM; 493 } 494 t_loc = APR_RING_FIRST(me->scheduled_tasks); 495 while (NULL != t_loc) { 496 /* if the time is less than the entry insert ahead of it */ 497 if (t->dispatch.time < t_loc->dispatch.time) { 498 ++me->scheduled_task_cnt; 499 APR_RING_INSERT_BEFORE(t_loc, t, link); 500 break; 501 } 502 else { 503 t_loc = APR_RING_NEXT(t_loc, link); 504 if (t_loc == 505 APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task, 506 link)) { 507 ++me->scheduled_task_cnt; 508 APR_RING_INSERT_TAIL(me->scheduled_tasks, t, 509 apr_thread_pool_task, link); 510 break; 511 } 512 } 513 } 514 /* there should be at least one thread for scheduled tasks */ 515 if (0 == me->thd_cnt) { 516 rv = apr_thread_create(&thd, NULL, thread_pool_func, me, me->pool); 517 if (APR_SUCCESS == rv) { 518 ++me->thd_cnt; 519 if (me->thd_cnt > me->thd_high) 520 me->thd_high = me->thd_cnt; 521 } 522 } 523 apr_thread_cond_signal(me->cond); 524 apr_thread_mutex_unlock(me->lock); 525 return rv; 526} 527 528static apr_status_t add_task(apr_thread_pool_t *me, apr_thread_start_t func, 529 void *param, apr_byte_t priority, int push, 530 void *owner) 531{ 532 apr_thread_pool_task_t *t; 533 apr_thread_pool_task_t *t_loc; 534 apr_thread_t *thd; 535 apr_status_t rv = APR_SUCCESS; 536 537 apr_thread_mutex_lock(me->lock); 538 539 t = task_new(me, func, param, priority, owner, 0); 540 if (NULL == t) { 541 apr_thread_mutex_unlock(me->lock); 542 return APR_ENOMEM; 543 } 544 545 t_loc = add_if_empty(me, t); 546 if (NULL == t_loc) { 547 goto FINAL_EXIT; 548 } 549 550 if (push) { 551 while (APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link) != 552 t_loc && t_loc->dispatch.priority >= t->dispatch.priority) { 553 t_loc = APR_RING_NEXT(t_loc, link); 554 } 555 } 556 APR_RING_INSERT_BEFORE(t_loc, t, link); 557 if (!push) { 558 if (t_loc == me->task_idx[TASK_PRIORITY_SEG(t)]) { 559 me->task_idx[TASK_PRIORITY_SEG(t)] = t; 560 } 561 } 562 563 FINAL_EXIT: 564 me->task_cnt++; 565 if (me->task_cnt > me->tasks_high) 566 me->tasks_high = me->task_cnt; 567 if (0 == me->thd_cnt || (0 == me->idle_cnt && me->thd_cnt < me->thd_max && 568 me->task_cnt > me->threshold)) { 569 rv = apr_thread_create(&thd, NULL, thread_pool_func, me, me->pool); 570 if (APR_SUCCESS == rv) { 571 ++me->thd_cnt; 572 if (me->thd_cnt > me->thd_high) 573 me->thd_high = me->thd_cnt; 574 } 575 } 576 577 apr_thread_cond_signal(me->cond); 578 apr_thread_mutex_unlock(me->lock); 579 580 return rv; 581} 582 583APU_DECLARE(apr_status_t) apr_thread_pool_push(apr_thread_pool_t *me, 584 apr_thread_start_t func, 585 void *param, 586 apr_byte_t priority, 587 void *owner) 588{ 589 return add_task(me, func, param, priority, 1, owner); 590} 591 592APU_DECLARE(apr_status_t) apr_thread_pool_schedule(apr_thread_pool_t *me, 593 apr_thread_start_t func, 594 void *param, 595 apr_interval_time_t time, 596 void *owner) 597{ 598 return schedule_task(me, func, param, owner, time); 599} 600 601APU_DECLARE(apr_status_t) apr_thread_pool_top(apr_thread_pool_t *me, 602 apr_thread_start_t func, 603 void *param, 604 apr_byte_t priority, 605 void *owner) 606{ 607 return add_task(me, func, param, priority, 0, owner); 608} 609 610static apr_status_t remove_scheduled_tasks(apr_thread_pool_t *me, 611 void *owner) 612{ 613 apr_thread_pool_task_t *t_loc; 614 apr_thread_pool_task_t *next; 615 616 t_loc = APR_RING_FIRST(me->scheduled_tasks); 617 while (t_loc != 618 APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task, 619 link)) { 620 next = APR_RING_NEXT(t_loc, link); 621 /* if this is the owner remove it */ 622 if (t_loc->owner == owner) { 623 --me->scheduled_task_cnt; 624 APR_RING_REMOVE(t_loc, link); 625 } 626 t_loc = next; 627 } 628 return APR_SUCCESS; 629} 630 631static apr_status_t remove_tasks(apr_thread_pool_t *me, void *owner) 632{ 633 apr_thread_pool_task_t *t_loc; 634 apr_thread_pool_task_t *next; 635 int seg; 636 637 t_loc = APR_RING_FIRST(me->tasks); 638 while (t_loc != APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link)) { 639 next = APR_RING_NEXT(t_loc, link); 640 if (t_loc->owner == owner) { 641 --me->task_cnt; 642 seg = TASK_PRIORITY_SEG(t_loc); 643 if (t_loc == me->task_idx[seg]) { 644 me->task_idx[seg] = APR_RING_NEXT(t_loc, link); 645 if (me->task_idx[seg] == APR_RING_SENTINEL(me->tasks, 646 apr_thread_pool_task, 647 link) 648 || TASK_PRIORITY_SEG(me->task_idx[seg]) != seg) { 649 me->task_idx[seg] = NULL; 650 } 651 } 652 APR_RING_REMOVE(t_loc, link); 653 } 654 t_loc = next; 655 } 656 return APR_SUCCESS; 657} 658 659static void wait_on_busy_threads(apr_thread_pool_t *me, void *owner) 660{ 661#ifndef NDEBUG 662 apr_os_thread_t *os_thread; 663#endif 664 struct apr_thread_list_elt *elt; 665 apr_thread_mutex_lock(me->lock); 666 elt = APR_RING_FIRST(me->busy_thds); 667 while (elt != APR_RING_SENTINEL(me->busy_thds, apr_thread_list_elt, link)) { 668 if (elt->current_owner != owner) { 669 elt = APR_RING_NEXT(elt, link); 670 continue; 671 } 672#ifndef NDEBUG 673 /* make sure the thread is not the one calling tasks_cancel */ 674 apr_os_thread_get(&os_thread, elt->thd); 675#ifdef WIN32 676 /* hack for apr win32 bug */ 677 assert(!apr_os_thread_equal(apr_os_thread_current(), os_thread)); 678#else 679 assert(!apr_os_thread_equal(apr_os_thread_current(), *os_thread)); 680#endif 681#endif 682 while (elt->current_owner == owner) { 683 apr_thread_mutex_unlock(me->lock); 684 apr_sleep(200 * 1000); 685 apr_thread_mutex_lock(me->lock); 686 } 687 elt = APR_RING_FIRST(me->busy_thds); 688 } 689 apr_thread_mutex_unlock(me->lock); 690 return; 691} 692 693APU_DECLARE(apr_status_t) apr_thread_pool_tasks_cancel(apr_thread_pool_t *me, 694 void *owner) 695{ 696 apr_status_t rv = APR_SUCCESS; 697 698 apr_thread_mutex_lock(me->lock); 699 if (me->task_cnt > 0) { 700 rv = remove_tasks(me, owner); 701 } 702 if (me->scheduled_task_cnt > 0) { 703 rv = remove_scheduled_tasks(me, owner); 704 } 705 apr_thread_mutex_unlock(me->lock); 706 wait_on_busy_threads(me, owner); 707 708 return rv; 709} 710 711APU_DECLARE(apr_size_t) apr_thread_pool_tasks_count(apr_thread_pool_t *me) 712{ 713 return me->task_cnt; 714} 715 716APU_DECLARE(apr_size_t) 717 apr_thread_pool_scheduled_tasks_count(apr_thread_pool_t *me) 718{ 719 return me->scheduled_task_cnt; 720} 721 722APU_DECLARE(apr_size_t) apr_thread_pool_threads_count(apr_thread_pool_t *me) 723{ 724 return me->thd_cnt; 725} 726 727APU_DECLARE(apr_size_t) apr_thread_pool_busy_count(apr_thread_pool_t *me) 728{ 729 return me->thd_cnt - me->idle_cnt; 730} 731 732APU_DECLARE(apr_size_t) apr_thread_pool_idle_count(apr_thread_pool_t *me) 733{ 734 return me->idle_cnt; 735} 736 737APU_DECLARE(apr_size_t) 738 apr_thread_pool_tasks_run_count(apr_thread_pool_t * me) 739{ 740 return me->tasks_run; 741} 742 743APU_DECLARE(apr_size_t) 744 apr_thread_pool_tasks_high_count(apr_thread_pool_t * me) 745{ 746 return me->tasks_high; 747} 748 749APU_DECLARE(apr_size_t) 750 apr_thread_pool_threads_high_count(apr_thread_pool_t * me) 751{ 752 return me->thd_high; 753} 754 755APU_DECLARE(apr_size_t) 756 apr_thread_pool_threads_idle_timeout_count(apr_thread_pool_t * me) 757{ 758 return me->thd_timed_out; 759} 760 761 762APU_DECLARE(apr_size_t) apr_thread_pool_idle_max_get(apr_thread_pool_t *me) 763{ 764 return me->idle_max; 765} 766 767APU_DECLARE(apr_interval_time_t) 768 apr_thread_pool_idle_wait_get(apr_thread_pool_t * me) 769{ 770 return me->idle_wait; 771} 772 773/* 774 * This function stop extra idle threads to the cnt. 775 * @return the number of threads stopped 776 * NOTE: There could be busy threads become idle during this function 777 */ 778static struct apr_thread_list_elt *trim_threads(apr_thread_pool_t *me, 779 apr_size_t *cnt, int idle) 780{ 781 struct apr_thread_list *thds; 782 apr_size_t n, n_dbg, i; 783 struct apr_thread_list_elt *head, *tail, *elt; 784 785 apr_thread_mutex_lock(me->lock); 786 if (idle) { 787 thds = me->idle_thds; 788 n = me->idle_cnt; 789 } 790 else { 791 thds = me->busy_thds; 792 n = me->thd_cnt - me->idle_cnt; 793 } 794 if (n <= *cnt) { 795 apr_thread_mutex_unlock(me->lock); 796 *cnt = 0; 797 return NULL; 798 } 799 n -= *cnt; 800 801 head = APR_RING_FIRST(thds); 802 for (i = 0; i < *cnt; i++) { 803 head = APR_RING_NEXT(head, link); 804 } 805 tail = APR_RING_LAST(thds); 806 if (idle) { 807 APR_RING_UNSPLICE(head, tail, link); 808 me->idle_cnt = *cnt; 809 } 810 811 n_dbg = 0; 812 for (elt = head; elt != tail; elt = APR_RING_NEXT(elt, link)) { 813 elt->state = TH_STOP; 814 n_dbg++; 815 } 816 elt->state = TH_STOP; 817 n_dbg++; 818 assert(n == n_dbg); 819 *cnt = n; 820 821 apr_thread_mutex_unlock(me->lock); 822 823 APR_RING_PREV(head, link) = NULL; 824 APR_RING_NEXT(tail, link) = NULL; 825 return head; 826} 827 828static apr_size_t trim_idle_threads(apr_thread_pool_t *me, apr_size_t cnt) 829{ 830 apr_size_t n_dbg; 831 struct apr_thread_list_elt *elt, *head, *tail; 832 apr_status_t rv; 833 834 elt = trim_threads(me, &cnt, 1); 835 836 apr_thread_mutex_lock(me->lock); 837 apr_thread_cond_broadcast(me->cond); 838 apr_thread_mutex_unlock(me->lock); 839 840 n_dbg = 0; 841 if (NULL != (head = elt)) { 842 while (elt) { 843 tail = elt; 844 apr_thread_join(&rv, elt->thd); 845 elt = APR_RING_NEXT(elt, link); 846 ++n_dbg; 847 } 848 apr_thread_mutex_lock(me->lock); 849 APR_RING_SPLICE_TAIL(me->recycled_thds, head, tail, 850 apr_thread_list_elt, link); 851 apr_thread_mutex_unlock(me->lock); 852 } 853 assert(cnt == n_dbg); 854 855 return cnt; 856} 857 858/* don't join on busy threads for performance reasons, who knows how long will 859 * the task takes to perform 860 */ 861static apr_size_t trim_busy_threads(apr_thread_pool_t *me, apr_size_t cnt) 862{ 863 trim_threads(me, &cnt, 0); 864 return cnt; 865} 866 867APU_DECLARE(apr_size_t) apr_thread_pool_idle_max_set(apr_thread_pool_t *me, 868 apr_size_t cnt) 869{ 870 me->idle_max = cnt; 871 cnt = trim_idle_threads(me, cnt); 872 return cnt; 873} 874 875APU_DECLARE(apr_interval_time_t) 876 apr_thread_pool_idle_wait_set(apr_thread_pool_t * me, 877 apr_interval_time_t timeout) 878{ 879 apr_interval_time_t oldtime; 880 881 oldtime = me->idle_wait; 882 me->idle_wait = timeout; 883 884 return oldtime; 885} 886 887APU_DECLARE(apr_size_t) apr_thread_pool_thread_max_get(apr_thread_pool_t *me) 888{ 889 return me->thd_max; 890} 891 892/* 893 * This function stop extra working threads to the new limit. 894 * NOTE: There could be busy threads become idle during this function 895 */ 896APU_DECLARE(apr_size_t) apr_thread_pool_thread_max_set(apr_thread_pool_t *me, 897 apr_size_t cnt) 898{ 899 unsigned int n; 900 901 me->thd_max = cnt; 902 if (0 == cnt || me->thd_cnt <= cnt) { 903 return 0; 904 } 905 906 n = me->thd_cnt - cnt; 907 if (n >= me->idle_cnt) { 908 trim_busy_threads(me, n - me->idle_cnt); 909 trim_idle_threads(me, 0); 910 } 911 else { 912 trim_idle_threads(me, me->idle_cnt - n); 913 } 914 return n; 915} 916 917APU_DECLARE(apr_size_t) apr_thread_pool_threshold_get(apr_thread_pool_t *me) 918{ 919 return me->threshold; 920} 921 922APU_DECLARE(apr_size_t) apr_thread_pool_threshold_set(apr_thread_pool_t *me, 923 apr_size_t val) 924{ 925 apr_size_t ov; 926 927 ov = me->threshold; 928 me->threshold = val; 929 return ov; 930} 931 932APU_DECLARE(apr_status_t) apr_thread_pool_task_owner_get(apr_thread_t *thd, 933 void **owner) 934{ 935 apr_status_t rv; 936 apr_thread_pool_task_t *task; 937 void *data; 938 939 rv = apr_thread_data_get(&data, "apr_thread_pool_task", thd); 940 if (rv != APR_SUCCESS) { 941 return rv; 942 } 943 944 task = data; 945 if (!task) { 946 *owner = NULL; 947 return APR_BADARG; 948 } 949 950 *owner = task->owner; 951 return APR_SUCCESS; 952} 953 954#endif /* APR_HAS_THREADS */ 955 956/* vim: set ts=4 sw=4 et cin tw=80: */ 957