/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed
 * with this work for additional information regarding copyright
 * ownership.  The ASF licenses this file to you under the Apache
 * License, Version 2.0 (the "License"); you may not use this file
 * except in compliance with the License.  You may obtain a copy of
 * the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied.  See the License for the specific language governing
 * permissions and limitations under the License.
 */

#include <assert.h>
#include "apr_thread_pool.h"
#include "apr_ring.h"
#include "apr_thread_cond.h"
#include "apr_portable.h"

#if APR_HAS_THREADS

#define TASK_PRIORITY_SEGS 4
#define TASK_PRIORITY_SEG(x) (((x)->dispatch.priority & 0xFF) / 64)

typedef struct apr_thread_pool_task
{
    APR_RING_ENTRY(apr_thread_pool_task) link;
    apr_thread_start_t func;
    void *param;
    void *owner;
    union
    {
        apr_byte_t priority;
        apr_time_t time;
    } dispatch;
} apr_thread_pool_task_t;

APR_RING_HEAD(apr_thread_pool_tasks, apr_thread_pool_task);

struct apr_thread_list_elt
{
    APR_RING_ENTRY(apr_thread_list_elt) link;
    apr_thread_t *thd;
    volatile void *current_owner;
    volatile enum { TH_RUN, TH_STOP, TH_PROBATION } state;
};

APR_RING_HEAD(apr_thread_list, apr_thread_list_elt);

struct apr_thread_pool
{
    apr_pool_t *pool;
    volatile apr_size_t thd_max;
    volatile apr_size_t idle_max;
    volatile apr_interval_time_t idle_wait;
    volatile apr_size_t thd_cnt;
    volatile apr_size_t idle_cnt;
    volatile apr_size_t task_cnt;
    volatile apr_size_t scheduled_task_cnt;
    volatile apr_size_t threshold;
    volatile apr_size_t tasks_run;
    volatile apr_size_t tasks_high;
    volatile apr_size_t thd_high;
    volatile apr_size_t thd_timed_out;
    struct apr_thread_pool_tasks *tasks;
    struct apr_thread_pool_tasks *scheduled_tasks;
    struct apr_thread_list *busy_thds;
    struct apr_thread_list *idle_thds;
    apr_thread_mutex_t *lock;
    apr_thread_cond_t *cond;
    volatile int terminated;
    struct apr_thread_pool_tasks *recycled_tasks;
    struct apr_thread_list *recycled_thds;
    apr_thread_pool_task_t *task_idx[TASK_PRIORITY_SEGS];
};

static apr_status_t thread_pool_construct(apr_thread_pool_t * me,
                                          apr_size_t init_threads,
                                          apr_size_t max_threads)
{
    apr_status_t rv;
    int i;

    me->thd_max = max_threads;
    me->idle_max = init_threads;
    me->threshold = init_threads / 2;
    rv = apr_thread_mutex_create(&me->lock, APR_THREAD_MUTEX_NESTED,
                                 me->pool);
    if (APR_SUCCESS != rv) {
        return rv;
    }
    rv = apr_thread_cond_create(&me->cond, me->pool);
    if (APR_SUCCESS != rv) {
        apr_thread_mutex_destroy(me->lock);
        return rv;
    }
    me->tasks = apr_palloc(me->pool, sizeof(*me->tasks));
    if (!me->tasks) {
        goto CATCH_ENOMEM;
    }
    APR_RING_INIT(me->tasks, apr_thread_pool_task, link);
    me->scheduled_tasks = apr_palloc(me->pool, sizeof(*me->scheduled_tasks));
    if (!me->scheduled_tasks) {
        goto CATCH_ENOMEM;
    }
    APR_RING_INIT(me->scheduled_tasks, apr_thread_pool_task, link);
    me->recycled_tasks = apr_palloc(me->pool, sizeof(*me->recycled_tasks));
    if (!me->recycled_tasks) {
        goto CATCH_ENOMEM;
    }
    APR_RING_INIT(me->recycled_tasks, apr_thread_pool_task, link);
    me->busy_thds = apr_palloc(me->pool, sizeof(*me->busy_thds));
    if (!me->busy_thds) {
        goto CATCH_ENOMEM;
    }
    APR_RING_INIT(me->busy_thds, apr_thread_list_elt, link);
    me->idle_thds = apr_palloc(me->pool, sizeof(*me->idle_thds));
    if (!me->idle_thds) {
        goto CATCH_ENOMEM;
    }
    APR_RING_INIT(me->idle_thds, apr_thread_list_elt, link);
    me->recycled_thds = apr_palloc(me->pool, sizeof(*me->recycled_thds));
    if (!me->recycled_thds) {
        goto CATCH_ENOMEM;
    }
    APR_RING_INIT(me->recycled_thds, apr_thread_list_elt, link);
    me->thd_cnt = me->idle_cnt = me->task_cnt = me->scheduled_task_cnt = 0;
    me->tasks_run = me->tasks_high = me->thd_high = me->thd_timed_out = 0;
    me->idle_wait = 0;
    me->terminated = 0;
    for (i = 0; i < TASK_PRIORITY_SEGS; i++) {
        me->task_idx[i] = NULL;
    }
    goto FINAL_EXIT;
  CATCH_ENOMEM:
    rv = APR_ENOMEM;
    apr_thread_mutex_destroy(me->lock);
    apr_thread_cond_destroy(me->cond);
  FINAL_EXIT:
    return rv;
}

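/*
 * Illustrative sketch (added for clarity, not part of the implementation):
 * how task priorities map onto the task_idx[] segments.  A task's 0-255
 * priority is divided by 64, so each slot covers one quarter of the range,
 * matching the APR_THREAD_TASK_PRIORITY_* constants in apr_thread_pool.h:
 *
 *     priority 192-255 -> task_idx[3]  (APR_THREAD_TASK_PRIORITY_HIGHEST)
 *     priority 128-191 -> task_idx[2]
 *     priority  64-127 -> task_idx[1]  (APR_THREAD_TASK_PRIORITY_NORMAL)
 *     priority   0- 63 -> task_idx[0]  (APR_THREAD_TASK_PRIORITY_LOWEST)
 *
 *     apr_thread_pool_task_t t;
 *     t.dispatch.priority = 200;
 *     assert(TASK_PRIORITY_SEG(&t) == 3);
 *
 * task_idx[seg] caches the first queued task of each segment, so
 * add_if_empty() and add_task() below can start their priority scan
 * mid-ring instead of walking the whole (descending-priority) task list.
 */
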
/*
 * NOTE: This function is not thread safe by itself. The caller must hold
 * the lock.
 */
static apr_thread_pool_task_t *pop_task(apr_thread_pool_t * me)
{
    apr_thread_pool_task_t *task = NULL;
    int seg;

    /* check for scheduled tasks */
    if (me->scheduled_task_cnt > 0) {
        task = APR_RING_FIRST(me->scheduled_tasks);
        assert(task != NULL);
        assert(task !=
               APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task,
                                 link));
        /* if it's time */
        if (task->dispatch.time <= apr_time_now()) {
            --me->scheduled_task_cnt;
            APR_RING_REMOVE(task, link);
            return task;
        }
    }
    /* check for normal tasks if we're not returning a scheduled task */
    if (me->task_cnt == 0) {
        return NULL;
    }

    task = APR_RING_FIRST(me->tasks);
    assert(task != NULL);
    assert(task != APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link));
    --me->task_cnt;
    seg = TASK_PRIORITY_SEG(task);
    if (task == me->task_idx[seg]) {
        me->task_idx[seg] = APR_RING_NEXT(task, link);
        if (me->task_idx[seg] == APR_RING_SENTINEL(me->tasks,
                                                   apr_thread_pool_task, link)
            || TASK_PRIORITY_SEG(me->task_idx[seg]) != seg) {
            me->task_idx[seg] = NULL;
        }
    }
    APR_RING_REMOVE(task, link);
    return task;
}

/*
 * Time until the first scheduled task is due. The caller must hold the
 * lock, and the scheduled task ring must not be empty.
 */
static apr_interval_time_t waiting_time(apr_thread_pool_t * me)
{
    apr_thread_pool_task_t *task = NULL;

    task = APR_RING_FIRST(me->scheduled_tasks);
    assert(task != NULL);
    assert(task !=
           APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task,
                             link));
    return task->dispatch.time - apr_time_now();
}

/*
 * NOTE: This function is not thread safe by itself. The caller must hold
 * the lock.
 */
static struct apr_thread_list_elt *elt_new(apr_thread_pool_t * me,
                                           apr_thread_t * t)
{
    struct apr_thread_list_elt *elt;

    if (APR_RING_EMPTY(me->recycled_thds, apr_thread_list_elt, link)) {
        elt = apr_pcalloc(me->pool, sizeof(*elt));
        if (NULL == elt) {
            return NULL;
        }
    }
    else {
        elt = APR_RING_FIRST(me->recycled_thds);
        APR_RING_REMOVE(elt, link);
    }

    APR_RING_ELEM_INIT(elt, link);
    elt->thd = t;
    elt->current_owner = NULL;
    elt->state = TH_RUN;
    return elt;
}

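/*
 * Note on the recycled_* rings used by elt_new() above and task_new()
 * below: memory obtained with apr_palloc()/apr_pcalloc() cannot be freed
 * individually, so finished tasks and exited thread elements are pushed
 * onto recycled rings and reused rather than leaking one allocation per
 * task.  A minimal sketch of the idiom (names are illustrative only):
 *
 *     if (APR_RING_EMPTY(recycled, elem_t, link)) {
 *         elem = apr_pcalloc(pool, sizeof(*elem));  -- grow the pool once
 *     }
 *     else {
 *         elem = APR_RING_FIRST(recycled);          -- reuse an old element
 *         APR_RING_REMOVE(elem, link);
 *     }
 *     ...
 *     APR_RING_INSERT_TAIL(recycled, elem, elem_t, link);  -- give it back
 */
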
/*
 * The worker thread function. Takes a task from the queue and performs it
 * if one is available; otherwise, puts itself on the idle thread list and
 * waits for a signal to wake up.
 * When a thread is asked to stop after finishing a task, it terminates
 * directly by detaching and exiting. Otherwise, the thread remains on the
 * idle thread list and must be joined.
 */
static void *APR_THREAD_FUNC thread_pool_func(apr_thread_t * t, void *param)
{
    apr_status_t rv = APR_SUCCESS;
    apr_thread_pool_t *me = param;
    apr_thread_pool_task_t *task = NULL;
    apr_interval_time_t wait;
    struct apr_thread_list_elt *elt;

    apr_thread_mutex_lock(me->lock);
    elt = elt_new(me, t);
    if (!elt) {
        apr_thread_mutex_unlock(me->lock);
        apr_thread_exit(t, APR_ENOMEM);
    }

    while (!me->terminated && elt->state != TH_STOP) {
        /* If this is not a new element, it was awakened from the idle list */
        if (APR_RING_NEXT(elt, link) != elt) {
            --me->idle_cnt;
            APR_RING_REMOVE(elt, link);
        }

        APR_RING_INSERT_TAIL(me->busy_thds, elt, apr_thread_list_elt, link);
        task = pop_task(me);
        while (NULL != task && !me->terminated) {
            ++me->tasks_run;
            elt->current_owner = task->owner;
            apr_thread_mutex_unlock(me->lock);
            /* record the running task so apr_thread_pool_task_owner_get()
             * can find it from within the task function */
            apr_thread_data_set(task, "apr_thread_pool_task", NULL, t);
            task->func(t, task->param);
            apr_thread_mutex_lock(me->lock);
            APR_RING_INSERT_TAIL(me->recycled_tasks, task,
                                 apr_thread_pool_task, link);
            elt->current_owner = NULL;
            if (TH_STOP == elt->state) {
                break;
            }
            task = pop_task(me);
        }
        assert(NULL == elt->current_owner);
        if (TH_STOP != elt->state)
            APR_RING_REMOVE(elt, link);

        /* A busy thread that has been asked to stop is not joinable;
         * detach and exit */
        if ((me->idle_cnt >= me->idle_max
             && !(me->scheduled_task_cnt && 0 >= me->idle_max)
             && !me->idle_wait)
            || me->terminated || elt->state != TH_RUN) {
            --me->thd_cnt;
            if ((TH_PROBATION == elt->state) && me->idle_wait)
                ++me->thd_timed_out;
            APR_RING_INSERT_TAIL(me->recycled_thds, elt,
                                 apr_thread_list_elt, link);
            apr_thread_mutex_unlock(me->lock);
            apr_thread_detach(t);
            apr_thread_exit(t, APR_SUCCESS);
            return NULL;        /* should not get here, safety net */
        }

        /* the busy thread becomes idle */
        ++me->idle_cnt;
        APR_RING_INSERT_TAIL(me->idle_thds, elt, apr_thread_list_elt, link);

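        /*
         * Illustration of the wait computed below (assumed numbers, added
         * for clarity): if a scheduled task is due in 2s, wait is ~2s so
         * this thread wakes in time to run it even without a signal. If
         * there is no scheduled task and more than idle_max threads are
         * idle, the thread waits up to idle_wait and is marked
         * TH_PROBATION; a probation thread exits on its next trip through
         * the loop and is counted in thd_timed_out. Otherwise wait is -1
         * and the thread blocks until signaled.
         */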
        /*
         * If there is a scheduled task, always wait with a timeout so that
         * the task can be performed when due, since there is no guarantee
         * that a currently idle thread will be woken for the next
         * scheduled task.
         */
        if (me->scheduled_task_cnt)
            wait = waiting_time(me);
        else if (me->idle_cnt > me->idle_max) {
            wait = me->idle_wait;
            elt->state = TH_PROBATION;
        }
        else
            wait = -1;

        if (wait >= 0) {
            rv = apr_thread_cond_timedwait(me->cond, me->lock, wait);
        }
        else {
            rv = apr_thread_cond_wait(me->cond, me->lock);
        }
    }

    /* an idle thread that has been asked to stop will be joined */
    --me->thd_cnt;
    apr_thread_mutex_unlock(me->lock);
    apr_thread_exit(t, APR_SUCCESS);
    return NULL;                /* should not get here, safety net */
}

static apr_status_t thread_pool_cleanup(void *me)
{
    apr_thread_pool_t *_myself = me;

    _myself->terminated = 1;
    apr_thread_pool_idle_max_set(_myself, 0);
    while (_myself->thd_cnt) {
        apr_sleep(20 * 1000);   /* poll every 20 ms until all threads exit */
    }
    apr_thread_mutex_destroy(_myself->lock);
    apr_thread_cond_destroy(_myself->cond);
    return APR_SUCCESS;
}

APU_DECLARE(apr_status_t) apr_thread_pool_create(apr_thread_pool_t ** me,
                                                 apr_size_t init_threads,
                                                 apr_size_t max_threads,
                                                 apr_pool_t * pool)
{
    apr_thread_t *t;
    apr_status_t rv = APR_SUCCESS;
    apr_thread_pool_t *tp;

    *me = NULL;
    tp = apr_pcalloc(pool, sizeof(apr_thread_pool_t));

    tp->pool = pool;

    rv = thread_pool_construct(tp, init_threads, max_threads);
    if (APR_SUCCESS != rv) {
        return rv;
    }
    apr_pool_cleanup_register(pool, tp, thread_pool_cleanup,
                              apr_pool_cleanup_null);

    while (init_threads) {
        /* Grab the mutex as apr_thread_create() and thread_pool_func() will
         * allocate from (*me)->pool. This is dangerous if there are
         * multiple initial threads to create.
         */
        apr_thread_mutex_lock(tp->lock);
        rv = apr_thread_create(&t, NULL, thread_pool_func, tp, tp->pool);
        apr_thread_mutex_unlock(tp->lock);
        if (APR_SUCCESS != rv) {
            break;
        }
        tp->thd_cnt++;
        if (tp->thd_cnt > tp->thd_high) {
            tp->thd_high = tp->thd_cnt;
        }
        --init_threads;
    }

    if (rv == APR_SUCCESS) {
        *me = tp;
    }

    return rv;
}

APU_DECLARE(apr_status_t) apr_thread_pool_destroy(apr_thread_pool_t * me)
{
    return apr_pool_cleanup_run(me->pool, me, thread_pool_cleanup);
}

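/*
 * Minimal usage sketch (illustrative, not compiled here; assumes an
 * initialized APR, an apr_pool_t *pool, and some my_data to process):
 *
 *     #include "apr_thread_pool.h"
 *
 *     static void *APR_THREAD_FUNC my_task(apr_thread_t *thd, void *data)
 *     {
 *         -- ... do work on data ...
 *         return NULL;
 *     }
 *
 *     apr_thread_pool_t *tp;
 *     apr_status_t rv = apr_thread_pool_create(&tp, 2, 8, pool);
 *     if (rv == APR_SUCCESS) {
 *         apr_thread_pool_push(tp, my_task, my_data,
 *                              APR_THREAD_TASK_PRIORITY_NORMAL, NULL);
 *         -- ...
 *         apr_thread_pool_destroy(tp);
 *     }
 *
 * Destruction runs thread_pool_cleanup(), which waits for all worker
 * threads to exit before tearing down the lock and condition variable.
 */
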
/*
 * NOTE: This function is not thread safe by itself. The caller must hold
 * the lock.
 */
static apr_thread_pool_task_t *task_new(apr_thread_pool_t * me,
                                        apr_thread_start_t func,
                                        void *param, apr_byte_t priority,
                                        void *owner, apr_time_t time)
{
    apr_thread_pool_task_t *t;

    if (APR_RING_EMPTY(me->recycled_tasks, apr_thread_pool_task, link)) {
        t = apr_pcalloc(me->pool, sizeof(*t));
        if (NULL == t) {
            return NULL;
        }
    }
    else {
        t = APR_RING_FIRST(me->recycled_tasks);
        APR_RING_REMOVE(t, link);
    }

    APR_RING_ELEM_INIT(t, link);
    t->func = func;
    t->param = param;
    t->owner = owner;
    if (time > 0) {
        t->dispatch.time = apr_time_now() + time;
    }
    else {
        t->dispatch.priority = priority;
    }
    return t;
}

/*
 * Test whether the task is the only one within its priority segment.
 * If it is not, return the first element with the same or lower priority.
 * Otherwise, add the task to the queue and return NULL.
 *
 * NOTE: This function is not thread safe by itself. The caller must hold
 * the lock.
 */
static apr_thread_pool_task_t *add_if_empty(apr_thread_pool_t * me,
                                            apr_thread_pool_task_t * const t)
{
    int seg;
    int next;
    apr_thread_pool_task_t *t_next;

    seg = TASK_PRIORITY_SEG(t);
    if (me->task_idx[seg]) {
        assert(APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link) !=
               me->task_idx[seg]);
        t_next = me->task_idx[seg];
        while (t_next->dispatch.priority > t->dispatch.priority) {
            t_next = APR_RING_NEXT(t_next, link);
            if (APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link) ==
                t_next) {
                return t_next;
            }
        }
        return t_next;
    }

    for (next = seg - 1; next >= 0; next--) {
        if (me->task_idx[next]) {
            APR_RING_INSERT_BEFORE(me->task_idx[next], t, link);
            break;
        }
    }
    if (0 > next) {
        APR_RING_INSERT_TAIL(me->tasks, t, apr_thread_pool_task, link);
    }
    me->task_idx[seg] = t;
    return NULL;
}

/*
 * Schedule a task to run in "time" microseconds. Find the spot in the
 * scheduled ring where the time fits, keeping the ring sorted by due time,
 * and signal a thread so the task is performed when the time is reached.
 */
static apr_status_t schedule_task(apr_thread_pool_t *me,
                                  apr_thread_start_t func, void *param,
                                  void *owner, apr_interval_time_t time)
{
    apr_thread_pool_task_t *t;
    apr_thread_pool_task_t *t_loc;
    apr_thread_t *thd;
    apr_status_t rv = APR_SUCCESS;
    apr_thread_mutex_lock(me->lock);

    t = task_new(me, func, param, 0, owner, time);
    if (NULL == t) {
        apr_thread_mutex_unlock(me->lock);
        return APR_ENOMEM;
    }
    t_loc = APR_RING_FIRST(me->scheduled_tasks);
    while (NULL != t_loc) {
        /* if the time is less than the entry's, insert ahead of it */
        if (t->dispatch.time < t_loc->dispatch.time) {
            ++me->scheduled_task_cnt;
            APR_RING_INSERT_BEFORE(t_loc, t, link);
            break;
        }
        else {
            t_loc = APR_RING_NEXT(t_loc, link);
            if (t_loc ==
                APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task,
                                  link)) {
                ++me->scheduled_task_cnt;
                APR_RING_INSERT_TAIL(me->scheduled_tasks, t,
                                     apr_thread_pool_task, link);
                break;
            }
        }
    }
    /* there must be at least one thread to run scheduled tasks */
    if (0 == me->thd_cnt) {
        rv = apr_thread_create(&thd, NULL, thread_pool_func, me, me->pool);
        if (APR_SUCCESS == rv) {
            ++me->thd_cnt;
            if (me->thd_cnt > me->thd_high)
                me->thd_high = me->thd_cnt;
        }
    }
    apr_thread_cond_signal(me->cond);
    apr_thread_mutex_unlock(me->lock);
    return rv;
}

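/*
 * Scheduling sketch (illustrative): the "time" argument to
 * apr_thread_pool_schedule() is a relative interval in microseconds;
 * task_new() converts it to an absolute dispatch time.
 *
 *     -- run cleanup_task roughly 3 seconds from now
 *     apr_thread_pool_schedule(tp, cleanup_task, data,
 *                              apr_time_from_sec(3), owner);
 *
 * Scheduled tasks are kept sorted by due time and, once due, take
 * precedence over queued tasks in pop_task().
 */
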
static apr_status_t add_task(apr_thread_pool_t *me, apr_thread_start_t func,
                             void *param, apr_byte_t priority, int push,
                             void *owner)
{
    apr_thread_pool_task_t *t;
    apr_thread_pool_task_t *t_loc;
    apr_thread_t *thd;
    apr_status_t rv = APR_SUCCESS;

    apr_thread_mutex_lock(me->lock);

    t = task_new(me, func, param, priority, owner, 0);
    if (NULL == t) {
        apr_thread_mutex_unlock(me->lock);
        return APR_ENOMEM;
    }

    t_loc = add_if_empty(me, t);
    if (NULL == t_loc) {
        goto FINAL_EXIT;
    }

    if (push) {
        while (APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link) !=
               t_loc && t_loc->dispatch.priority >= t->dispatch.priority) {
            t_loc = APR_RING_NEXT(t_loc, link);
        }
    }
    APR_RING_INSERT_BEFORE(t_loc, t, link);
    if (!push) {
        if (t_loc == me->task_idx[TASK_PRIORITY_SEG(t)]) {
            me->task_idx[TASK_PRIORITY_SEG(t)] = t;
        }
    }

  FINAL_EXIT:
    me->task_cnt++;
    if (me->task_cnt > me->tasks_high)
        me->tasks_high = me->task_cnt;
    if (0 == me->thd_cnt || (0 == me->idle_cnt && me->thd_cnt < me->thd_max &&
                             me->task_cnt > me->threshold)) {
        rv = apr_thread_create(&thd, NULL, thread_pool_func, me, me->pool);
        if (APR_SUCCESS == rv) {
            ++me->thd_cnt;
            if (me->thd_cnt > me->thd_high)
                me->thd_high = me->thd_cnt;
        }
    }

    apr_thread_cond_signal(me->cond);
    apr_thread_mutex_unlock(me->lock);

    return rv;
}

APU_DECLARE(apr_status_t) apr_thread_pool_push(apr_thread_pool_t *me,
                                               apr_thread_start_t func,
                                               void *param,
                                               apr_byte_t priority,
                                               void *owner)
{
    return add_task(me, func, param, priority, 1, owner);
}

APU_DECLARE(apr_status_t) apr_thread_pool_schedule(apr_thread_pool_t *me,
                                                   apr_thread_start_t func,
                                                   void *param,
                                                   apr_interval_time_t time,
                                                   void *owner)
{
    return schedule_task(me, func, param, owner, time);
}

APU_DECLARE(apr_status_t) apr_thread_pool_top(apr_thread_pool_t *me,
                                              apr_thread_start_t func,
                                              void *param,
                                              apr_byte_t priority,
                                              void *owner)
{
    return add_task(me, func, param, priority, 0, owner);
}

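/*
 * Queue-position sketch (illustrative): apr_thread_pool_push() appends a
 * task after all queued tasks of the same priority (FIFO within a
 * priority), while apr_thread_pool_top() inserts it before them (LIFO
 * within a priority). Starting from an empty pool:
 *
 *     apr_thread_pool_push(tp, f1, d1, APR_THREAD_TASK_PRIORITY_NORMAL, o);
 *     apr_thread_pool_push(tp, f2, d2, APR_THREAD_TASK_PRIORITY_NORMAL, o);
 *     apr_thread_pool_top(tp, f3, d3, APR_THREAD_TASK_PRIORITY_NORMAL, o);
 *
 * leaves the queue ordered f3, f1, f2, so pop_task() runs f3 first.
 */
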
static apr_status_t remove_scheduled_tasks(apr_thread_pool_t *me,
                                           void *owner)
{
    apr_thread_pool_task_t *t_loc;
    apr_thread_pool_task_t *next;

    t_loc = APR_RING_FIRST(me->scheduled_tasks);
    while (t_loc !=
           APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task,
                             link)) {
        next = APR_RING_NEXT(t_loc, link);
        /* if the task belongs to the owner, remove it */
        if (t_loc->owner == owner) {
            --me->scheduled_task_cnt;
            APR_RING_REMOVE(t_loc, link);
        }
        t_loc = next;
    }
    return APR_SUCCESS;
}

static apr_status_t remove_tasks(apr_thread_pool_t *me, void *owner)
{
    apr_thread_pool_task_t *t_loc;
    apr_thread_pool_task_t *next;
    int seg;

    t_loc = APR_RING_FIRST(me->tasks);
    while (t_loc != APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link)) {
        next = APR_RING_NEXT(t_loc, link);
        if (t_loc->owner == owner) {
            --me->task_cnt;
            seg = TASK_PRIORITY_SEG(t_loc);
            if (t_loc == me->task_idx[seg]) {
                me->task_idx[seg] = APR_RING_NEXT(t_loc, link);
                if (me->task_idx[seg] == APR_RING_SENTINEL(me->tasks,
                                                           apr_thread_pool_task,
                                                           link)
                    || TASK_PRIORITY_SEG(me->task_idx[seg]) != seg) {
                    me->task_idx[seg] = NULL;
                }
            }
            APR_RING_REMOVE(t_loc, link);
        }
        t_loc = next;
    }
    return APR_SUCCESS;
}

/*
 * Block until no busy thread is working on a task for this owner.
 */
static void wait_on_busy_threads(apr_thread_pool_t *me, void *owner)
{
#ifndef NDEBUG
    apr_os_thread_t *os_thread;
#endif
    struct apr_thread_list_elt *elt;
    apr_thread_mutex_lock(me->lock);
    elt = APR_RING_FIRST(me->busy_thds);
    while (elt != APR_RING_SENTINEL(me->busy_thds, apr_thread_list_elt, link)) {
        if (elt->current_owner != owner) {
            elt = APR_RING_NEXT(elt, link);
            continue;
        }
#ifndef NDEBUG
        /* make sure the thread is not the one calling tasks_cancel */
        apr_os_thread_get(&os_thread, elt->thd);
#ifdef WIN32
        /* hack for apr win32 bug */
        assert(!apr_os_thread_equal(apr_os_thread_current(), os_thread));
#else
        assert(!apr_os_thread_equal(apr_os_thread_current(), *os_thread));
#endif
#endif
        while (elt->current_owner == owner) {
            apr_thread_mutex_unlock(me->lock);
            apr_sleep(200 * 1000);
            apr_thread_mutex_lock(me->lock);
        }
        elt = APR_RING_FIRST(me->busy_thds);
    }
    apr_thread_mutex_unlock(me->lock);
    return;
}

APU_DECLARE(apr_status_t) apr_thread_pool_tasks_cancel(apr_thread_pool_t *me,
                                                       void *owner)
{
    apr_status_t rv = APR_SUCCESS;

    apr_thread_mutex_lock(me->lock);
    if (me->task_cnt > 0) {
        rv = remove_tasks(me, owner);
    }
    if (me->scheduled_task_cnt > 0) {
        rv = remove_scheduled_tasks(me, owner);
    }
    apr_thread_mutex_unlock(me->lock);
    wait_on_busy_threads(me, owner);

    return rv;
}

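/*
 * Owner-cancellation sketch (illustrative; my_task and my_conn are
 * hypothetical names): the "owner" argument passed to push/schedule/top is
 * an opaque cookie that groups tasks. Cancelling by owner drops the
 * group's queued and scheduled tasks, then blocks until any of its tasks
 * already running have finished. It must not be called from one of the
 * owner's own tasks (see the assert in wait_on_busy_threads() above).
 *
 *     apr_thread_pool_push(tp, my_task, data1,
 *                          APR_THREAD_TASK_PRIORITY_NORMAL, my_conn);
 *     apr_thread_pool_schedule(tp, my_task, data2,
 *                              apr_time_from_sec(5), my_conn);
 *     -- ...
 *     apr_thread_pool_tasks_cancel(tp, my_conn);
 *     -- no task of my_conn is queued or running past this point
 */
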
APU_DECLARE(apr_size_t) apr_thread_pool_tasks_count(apr_thread_pool_t *me)
{
    return me->task_cnt;
}

APU_DECLARE(apr_size_t)
    apr_thread_pool_scheduled_tasks_count(apr_thread_pool_t *me)
{
    return me->scheduled_task_cnt;
}

APU_DECLARE(apr_size_t) apr_thread_pool_threads_count(apr_thread_pool_t *me)
{
    return me->thd_cnt;
}

APU_DECLARE(apr_size_t) apr_thread_pool_busy_count(apr_thread_pool_t *me)
{
    return me->thd_cnt - me->idle_cnt;
}

APU_DECLARE(apr_size_t) apr_thread_pool_idle_count(apr_thread_pool_t *me)
{
    return me->idle_cnt;
}

APU_DECLARE(apr_size_t)
    apr_thread_pool_tasks_run_count(apr_thread_pool_t * me)
{
    return me->tasks_run;
}

APU_DECLARE(apr_size_t)
    apr_thread_pool_tasks_high_count(apr_thread_pool_t * me)
{
    return me->tasks_high;
}

APU_DECLARE(apr_size_t)
    apr_thread_pool_threads_high_count(apr_thread_pool_t * me)
{
    return me->thd_high;
}

APU_DECLARE(apr_size_t)
    apr_thread_pool_threads_idle_timeout_count(apr_thread_pool_t * me)
{
    return me->thd_timed_out;
}

APU_DECLARE(apr_size_t) apr_thread_pool_idle_max_get(apr_thread_pool_t *me)
{
    return me->idle_max;
}

APU_DECLARE(apr_interval_time_t)
    apr_thread_pool_idle_wait_get(apr_thread_pool_t * me)
{
    return me->idle_wait;
}

/*
 * Trim the number of idle (or busy) threads down to *cnt. On return, *cnt
 * holds the number of threads that were asked to stop.
 * NOTE: busy threads may become idle while this function runs.
 */
static struct apr_thread_list_elt *trim_threads(apr_thread_pool_t *me,
                                                apr_size_t *cnt, int idle)
{
    struct apr_thread_list *thds;
    apr_size_t n, n_dbg, i;
    struct apr_thread_list_elt *head, *tail, *elt;

    apr_thread_mutex_lock(me->lock);
    if (idle) {
        thds = me->idle_thds;
        n = me->idle_cnt;
    }
    else {
        thds = me->busy_thds;
        n = me->thd_cnt - me->idle_cnt;
    }
    if (n <= *cnt) {
        apr_thread_mutex_unlock(me->lock);
        *cnt = 0;
        return NULL;
    }
    n -= *cnt;

    head = APR_RING_FIRST(thds);
    for (i = 0; i < *cnt; i++) {
        head = APR_RING_NEXT(head, link);
    }
    tail = APR_RING_LAST(thds);
    if (idle) {
        APR_RING_UNSPLICE(head, tail, link);
        me->idle_cnt = *cnt;
    }

    n_dbg = 0;
    for (elt = head; elt != tail; elt = APR_RING_NEXT(elt, link)) {
        elt->state = TH_STOP;
        n_dbg++;
    }
    elt->state = TH_STOP;
    n_dbg++;
    assert(n == n_dbg);
    *cnt = n;

    apr_thread_mutex_unlock(me->lock);

    APR_RING_PREV(head, link) = NULL;
    APR_RING_NEXT(tail, link) = NULL;
    return head;
}

static apr_size_t trim_idle_threads(apr_thread_pool_t *me, apr_size_t cnt)
{
    apr_size_t n_dbg;
    struct apr_thread_list_elt *elt, *head, *tail;
    apr_status_t rv;

    elt = trim_threads(me, &cnt, 1);

    apr_thread_mutex_lock(me->lock);
    apr_thread_cond_broadcast(me->cond);
    apr_thread_mutex_unlock(me->lock);

    n_dbg = 0;
    if (NULL != (head = elt)) {
        while (elt) {
            tail = elt;
            apr_thread_join(&rv, elt->thd);
            elt = APR_RING_NEXT(elt, link);
            ++n_dbg;
        }
        apr_thread_mutex_lock(me->lock);
        APR_RING_SPLICE_TAIL(me->recycled_thds, head, tail,
                             apr_thread_list_elt, link);
        apr_thread_mutex_unlock(me->lock);
    }
    assert(cnt == n_dbg);

    return cnt;
}

/* Don't join busy threads, for performance reasons: there is no telling
 * how long a task will take to complete.
 */
static apr_size_t trim_busy_threads(apr_thread_pool_t *me, apr_size_t cnt)
{
    trim_threads(me, &cnt, 0);
    return cnt;
}

APU_DECLARE(apr_size_t) apr_thread_pool_idle_max_set(apr_thread_pool_t *me,
                                                     apr_size_t cnt)
{
    me->idle_max = cnt;
    cnt = trim_idle_threads(me, cnt);
    return cnt;
}

APU_DECLARE(apr_interval_time_t)
    apr_thread_pool_idle_wait_set(apr_thread_pool_t * me,
                                  apr_interval_time_t timeout)
{
    apr_interval_time_t oldtime;

    oldtime = me->idle_wait;
    me->idle_wait = timeout;

    return oldtime;
}

APU_DECLARE(apr_size_t) apr_thread_pool_thread_max_get(apr_thread_pool_t *me)
{
    return me->thd_max;
}

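/*
 * Dynamic sizing sketch (illustrative): the three knobs interact.
 * idle_max is how many idle threads may linger, idle_wait gives threads
 * above idle_max a grace period before they exit (probation), and thd_max
 * caps the total thread count used when add_task() grows the pool.
 *
 *     apr_thread_pool_thread_max_set(tp, 16);  -- never more than 16
 *     apr_thread_pool_idle_max_set(tp, 4);     -- keep at most 4 idle
 *     apr_thread_pool_idle_wait_set(tp, apr_time_from_sec(10));
 *                                  -- surplus idle threads exit after 10s
 */
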
/*
 * Stop extra working threads down to the new limit.
 * NOTE: busy threads may become idle while this function runs.
 */
APU_DECLARE(apr_size_t) apr_thread_pool_thread_max_set(apr_thread_pool_t *me,
                                                       apr_size_t cnt)
{
    unsigned int n;

    me->thd_max = cnt;
    if (0 == cnt || me->thd_cnt <= cnt) {
        return 0;
    }

    n = me->thd_cnt - cnt;
    if (n >= me->idle_cnt) {
        trim_busy_threads(me, n - me->idle_cnt);
        trim_idle_threads(me, 0);
    }
    else {
        trim_idle_threads(me, me->idle_cnt - n);
    }
    return n;
}

APU_DECLARE(apr_size_t) apr_thread_pool_threshold_get(apr_thread_pool_t *me)
{
    return me->threshold;
}

APU_DECLARE(apr_size_t) apr_thread_pool_threshold_set(apr_thread_pool_t *me,
                                                      apr_size_t val)
{
    apr_size_t ov;

    ov = me->threshold;
    me->threshold = val;
    return ov;
}

APU_DECLARE(apr_status_t) apr_thread_pool_task_owner_get(apr_thread_t *thd,
                                                         void **owner)
{
    apr_status_t rv;
    apr_thread_pool_task_t *task;
    void *data;

    rv = apr_thread_data_get(&data, "apr_thread_pool_task", thd);
    if (rv != APR_SUCCESS) {
        return rv;
    }

    task = data;
    if (!task) {
        *owner = NULL;
        return APR_BADARG;
    }

    *owner = task->owner;
    return APR_SUCCESS;
}

#endif /* APR_HAS_THREADS */

/* vim: set ts=4 sw=4 et cin tw=80: */