1/* Licensed to the Apache Software Foundation (ASF) under one or more 2 * contributor license agreements. See the NOTICE file distributed with 3 * this work for additional information regarding copyright ownership. 4 * The ASF licenses this file to You under the Apache License, Version 2.0 5 * (the "License"); you may not use this file except in compliance with 6 * the License. You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17#include "apr_file_io.h" 18#include "apr_thread_proc.h" 19#include "apr_thread_mutex.h" 20#include "apr_thread_cond.h" 21#include "apr_errno.h" 22#include "apr_general.h" 23#include "apr_atomic.h" 24#include "testutil.h" 25 26#define NTHREADS 10 27 28#define ABTS_SUCCESS(rv) ABTS_INT_EQUAL(tc, APR_SUCCESS, rv) 29 30#if APR_HAS_THREADS 31 32typedef struct toolbox_t toolbox_t; 33 34struct toolbox_t { 35 void *data; 36 abts_case *tc; 37 apr_thread_mutex_t *mutex; 38 apr_thread_cond_t *cond; 39 void (*func)(toolbox_t *box); 40}; 41 42typedef struct toolbox_fnptr_t toolbox_fnptr_t; 43 44struct toolbox_fnptr_t { 45 void (*func)(toolbox_t *box); 46}; 47 48static void lost_signal(abts_case *tc, void *data) 49{ 50 apr_status_t rv; 51 apr_thread_cond_t *cond = NULL; 52 apr_thread_mutex_t *mutex = NULL; 53 54 rv = apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_DEFAULT, p); 55 ABTS_SUCCESS(rv); 56 ABTS_PTR_NOTNULL(tc, mutex); 57 58 rv = apr_thread_cond_create(&cond, p); 59 ABTS_SUCCESS(rv); 60 ABTS_PTR_NOTNULL(tc, cond); 61 62 rv = apr_thread_cond_signal(cond); 63 ABTS_SUCCESS(rv); 64 65 rv = apr_thread_mutex_lock(mutex); 66 ABTS_SUCCESS(rv); 67 68 rv = apr_thread_cond_timedwait(cond, mutex, 10000); 69 ABTS_INT_EQUAL(tc, 1, APR_STATUS_IS_TIMEUP(rv)); 70 71 rv = apr_thread_mutex_unlock(mutex); 72 ABTS_SUCCESS(rv); 73 74 rv = apr_thread_cond_broadcast(cond); 75 ABTS_SUCCESS(rv); 76 77 rv = apr_thread_mutex_lock(mutex); 78 ABTS_SUCCESS(rv); 79 80 rv = apr_thread_cond_timedwait(cond, mutex, 10000); 81 ABTS_INT_EQUAL(tc, 1, APR_STATUS_IS_TIMEUP(rv)); 82 83 rv = apr_thread_mutex_unlock(mutex); 84 ABTS_SUCCESS(rv); 85 86 rv = apr_thread_cond_destroy(cond); 87 ABTS_SUCCESS(rv); 88 89 rv = apr_thread_mutex_destroy(mutex); 90 ABTS_SUCCESS(rv); 91} 92 93static void *APR_THREAD_FUNC thread_routine(apr_thread_t *thd, void *data) 94{ 95 toolbox_t *box = data; 96 97 box->func(box); 98 99 apr_thread_exit(thd, 0); 100 101 return NULL; 102} 103 104static void lock_and_signal(toolbox_t *box) 105{ 106 apr_status_t rv; 107 abts_case *tc = box->tc; 108 109 rv = apr_thread_mutex_lock(box->mutex); 110 ABTS_SUCCESS(rv); 111 112 rv = apr_thread_cond_signal(box->cond); 113 ABTS_SUCCESS(rv); 114 115 rv = apr_thread_mutex_unlock(box->mutex); 116 ABTS_SUCCESS(rv); 117} 118 119static void dynamic_binding(abts_case *tc, void *data) 120{ 121 unsigned int i; 122 apr_status_t rv; 123 toolbox_t box[NTHREADS]; 124 apr_thread_t *thread[NTHREADS]; 125 apr_thread_mutex_t *mutex[NTHREADS]; 126 apr_thread_cond_t *cond = NULL; 127 128 rv = apr_thread_cond_create(&cond, p); 129 ABTS_SUCCESS(rv); 130 ABTS_PTR_NOTNULL(tc, cond); 131 132 for (i = 0; i < NTHREADS; i++) { 133 rv = apr_thread_mutex_create(&mutex[i], APR_THREAD_MUTEX_DEFAULT, p); 134 ABTS_SUCCESS(rv); 135 136 rv = apr_thread_mutex_lock(mutex[i]); 137 ABTS_SUCCESS(rv); 138 139 box[i].tc = tc; 140 box[i].cond = cond; 141 box[i].mutex = mutex[i]; 142 box[i].func = lock_and_signal; 143 144 rv = apr_thread_create(&thread[i], NULL, thread_routine, &box[i], p); 145 ABTS_SUCCESS(rv); 146 } 147 148 /* 149 * The dynamic binding should be preserved because we use only one waiter 150 */ 151 152 for (i = 0; i < NTHREADS; i++) { 153 rv = apr_thread_cond_wait(cond, mutex[i]); 154 ABTS_SUCCESS(rv); 155 } 156 157 for (i = 0; i < NTHREADS; i++) { 158 rv = apr_thread_cond_timedwait(cond, mutex[i], 10000); 159 ABTS_INT_EQUAL(tc, 1, APR_STATUS_IS_TIMEUP(rv)); 160 161 rv = apr_thread_mutex_unlock(mutex[i]); 162 ABTS_SUCCESS(rv); 163 } 164 165 for (i = 0; i < NTHREADS; i++) { 166 apr_status_t retval; 167 rv = apr_thread_join(&retval, thread[i]); 168 ABTS_SUCCESS(rv); 169 } 170 171 rv = apr_thread_cond_destroy(cond); 172 ABTS_SUCCESS(rv); 173 174 for (i = 0; i < NTHREADS; i++) { 175 rv = apr_thread_mutex_destroy(mutex[i]); 176 ABTS_SUCCESS(rv); 177 } 178} 179 180static void lock_and_wait(toolbox_t *box) 181{ 182 apr_status_t rv; 183 abts_case *tc = box->tc; 184 apr_uint32_t *count = box->data; 185 186 rv = apr_thread_mutex_lock(box->mutex); 187 ABTS_SUCCESS(rv); 188 189 apr_atomic_inc32(count); 190 191 rv = apr_thread_cond_wait(box->cond, box->mutex); 192 ABTS_SUCCESS(rv); 193 194 apr_atomic_dec32(count); 195 196 rv = apr_thread_mutex_unlock(box->mutex); 197 ABTS_SUCCESS(rv); 198} 199 200static void broadcast_threads(abts_case *tc, void *data) 201{ 202 toolbox_t box; 203 unsigned int i; 204 apr_status_t rv; 205 apr_uint32_t count = 0; 206 apr_thread_cond_t *cond = NULL; 207 apr_thread_mutex_t *mutex = NULL; 208 apr_thread_t *thread[NTHREADS]; 209 210 rv = apr_thread_cond_create(&cond, p); 211 ABTS_SUCCESS(rv); 212 ABTS_PTR_NOTNULL(tc, cond); 213 214 rv = apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_DEFAULT, p); 215 ABTS_SUCCESS(rv); 216 ABTS_PTR_NOTNULL(tc, mutex); 217 218 rv = apr_thread_mutex_lock(mutex); 219 ABTS_SUCCESS(rv); 220 221 box.tc = tc; 222 box.data = &count; 223 box.mutex = mutex; 224 box.cond = cond; 225 box.func = lock_and_wait; 226 227 for (i = 0; i < NTHREADS; i++) { 228 rv = apr_thread_create(&thread[i], NULL, thread_routine, &box, p); 229 ABTS_SUCCESS(rv); 230 } 231 232 do { 233 rv = apr_thread_mutex_unlock(mutex); 234 ABTS_SUCCESS(rv); 235 apr_sleep(100000); 236 rv = apr_thread_mutex_lock(mutex); 237 ABTS_SUCCESS(rv); 238 } while (apr_atomic_read32(&count) != NTHREADS); 239 240 rv = apr_thread_cond_broadcast(cond); 241 ABTS_SUCCESS(rv); 242 243 rv = apr_thread_mutex_unlock(mutex); 244 ABTS_SUCCESS(rv); 245 246 for (i = 0; i < NTHREADS; i++) { 247 apr_status_t retval; 248 rv = apr_thread_join(&retval, thread[i]); 249 ABTS_SUCCESS(rv); 250 } 251 252 ABTS_INT_EQUAL(tc, 0, count); 253 254 rv = apr_thread_cond_destroy(cond); 255 ABTS_SUCCESS(rv); 256 257 rv = apr_thread_mutex_destroy(mutex); 258 ABTS_SUCCESS(rv); 259} 260 261static void nested_lock_and_wait(toolbox_t *box) 262{ 263 apr_status_t rv; 264 abts_case *tc = box->tc; 265 266 rv = apr_thread_mutex_lock(box->mutex); 267 ABTS_SUCCESS(rv); 268 269 rv = apr_thread_mutex_lock(box->mutex); 270 ABTS_SUCCESS(rv); 271 272 rv = apr_thread_mutex_lock(box->mutex); 273 ABTS_SUCCESS(rv); 274 275 rv = apr_thread_cond_wait(box->cond, box->mutex); 276 ABTS_SUCCESS(rv); 277} 278 279static void nested_lock_and_unlock(toolbox_t *box) 280{ 281 apr_status_t rv; 282 abts_case *tc = box->tc; 283 284 rv = apr_thread_mutex_lock(box->mutex); 285 ABTS_SUCCESS(rv); 286 287 rv = apr_thread_mutex_lock(box->mutex); 288 ABTS_SUCCESS(rv); 289 290 rv = apr_thread_mutex_lock(box->mutex); 291 ABTS_SUCCESS(rv); 292 293 rv = apr_thread_cond_timedwait(box->cond, box->mutex, 2000000); 294 ABTS_SUCCESS(rv); 295 296 rv = apr_thread_mutex_unlock(box->mutex); 297 ABTS_SUCCESS(rv); 298 299 rv = apr_thread_mutex_unlock(box->mutex); 300 ABTS_SUCCESS(rv); 301} 302 303static void nested_wait(abts_case *tc, void *data) 304{ 305 toolbox_fnptr_t *fnptr = data; 306 toolbox_t box; 307 apr_status_t rv, retval; 308 apr_thread_cond_t *cond = NULL; 309 apr_thread_t *thread = NULL; 310 apr_thread_mutex_t *mutex = NULL; 311 312 rv = apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, p); 313 ABTS_SUCCESS(rv); 314 ABTS_PTR_NOTNULL(tc, mutex); 315 316 rv = apr_thread_cond_create(&cond, p); 317 ABTS_SUCCESS(rv); 318 ABTS_PTR_NOTNULL(tc, cond); 319 320 rv = apr_thread_mutex_lock(mutex); 321 ABTS_SUCCESS(rv); 322 323 box.tc = tc; 324 box.cond = cond; 325 box.mutex = mutex; 326 box.func = fnptr->func; 327 328 rv = apr_thread_create(&thread, NULL, thread_routine, &box, p); 329 ABTS_SUCCESS(rv); 330 331 rv = apr_thread_mutex_unlock(mutex); 332 ABTS_SUCCESS(rv); 333 334 /* yield the processor */ 335 apr_sleep(500000); 336 337 rv = apr_thread_cond_signal(cond); 338 ABTS_SUCCESS(rv); 339 340 rv = apr_thread_join(&retval, thread); 341 ABTS_SUCCESS(rv); 342 343 rv = apr_thread_mutex_trylock(mutex); 344 ABTS_INT_EQUAL(tc, 1, APR_STATUS_IS_EBUSY(rv)); 345 346 rv = apr_thread_mutex_trylock(mutex); 347 ABTS_INT_EQUAL(tc, 1, APR_STATUS_IS_EBUSY(rv)); 348} 349 350static volatile apr_uint64_t pipe_count; 351static volatile apr_uint32_t exiting; 352 353static void pipe_consumer(toolbox_t *box) 354{ 355 char ch; 356 apr_status_t rv; 357 apr_size_t nbytes; 358 abts_case *tc = box->tc; 359 apr_file_t *out = box->data; 360 apr_uint32_t consumed = 0; 361 362 do { 363 rv = apr_thread_mutex_lock(box->mutex); 364 ABTS_SUCCESS(rv); 365 366 while (!pipe_count && !exiting) { 367 rv = apr_thread_cond_wait(box->cond, box->mutex); 368 ABTS_SUCCESS(rv); 369 } 370 371 if (!pipe_count && exiting) { 372 rv = apr_thread_mutex_unlock(box->mutex); 373 ABTS_SUCCESS(rv); 374 break; 375 } 376 377 pipe_count--; 378 consumed++; 379 380 rv = apr_thread_mutex_unlock(box->mutex); 381 ABTS_SUCCESS(rv); 382 383 rv = apr_file_read_full(out, &ch, 1, &nbytes); 384 ABTS_SUCCESS(rv); 385 ABTS_SIZE_EQUAL(tc, 1, nbytes); 386 ABTS_TRUE(tc, ch == '.'); 387 } while (1); 388 389 /* naive fairness test - it would be good to introduce or solidify 390 * a solid test to ensure one thread is not starved. 391 * ABTS_INT_EQUAL(tc, 1, !!consumed); 392 */ 393} 394 395static void pipe_write(toolbox_t *box, char ch) 396{ 397 apr_status_t rv; 398 apr_size_t nbytes; 399 abts_case *tc = box->tc; 400 apr_file_t *in = box->data; 401 402 rv = apr_file_write_full(in, &ch, 1, &nbytes); 403 ABTS_SUCCESS(rv); 404 ABTS_SIZE_EQUAL(tc, 1, nbytes); 405 406 rv = apr_thread_mutex_lock(box->mutex); 407 ABTS_SUCCESS(rv); 408 409 if (!pipe_count) { 410 rv = apr_thread_cond_signal(box->cond); 411 ABTS_SUCCESS(rv); 412 } 413 414 pipe_count++; 415 416 rv = apr_thread_mutex_unlock(box->mutex); 417 ABTS_SUCCESS(rv); 418} 419 420static void pipe_producer(toolbox_t *box) 421{ 422 apr_uint32_t loop = 500; 423 424 do { 425 pipe_write(box, '.'); 426 } while (loop--); 427} 428 429static void pipe_producer_consumer(abts_case *tc, void *data) 430{ 431 apr_status_t rv; 432 toolbox_t boxcons, boxprod; 433 apr_thread_t *thread[NTHREADS]; 434 apr_thread_cond_t *cond = NULL; 435 apr_thread_mutex_t *mutex = NULL; 436 apr_file_t *in = NULL, *out = NULL; 437 apr_uint32_t i, ncons = (apr_uint32_t)(NTHREADS * 0.70); 438 439 rv = apr_file_pipe_create(&in, &out, p); 440 ABTS_SUCCESS(rv); 441 442 rv = apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_DEFAULT, p); 443 ABTS_SUCCESS(rv); 444 ABTS_PTR_NOTNULL(tc, mutex); 445 446 rv = apr_thread_cond_create(&cond, p); 447 ABTS_SUCCESS(rv); 448 ABTS_PTR_NOTNULL(tc, cond); 449 450 boxcons.tc = tc; 451 boxcons.data = in; 452 boxcons.mutex = mutex; 453 boxcons.cond = cond; 454 boxcons.func = pipe_consumer; 455 456 for (i = 0; i < ncons; i++) { 457 rv = apr_thread_create(&thread[i], NULL, thread_routine, &boxcons, p); 458 ABTS_SUCCESS(rv); 459 } 460 461 boxprod.tc = tc; 462 boxprod.data = out; 463 boxprod.mutex = mutex; 464 boxprod.cond = cond; 465 boxprod.func = pipe_producer; 466 467 for (; i < NTHREADS; i++) { 468 rv = apr_thread_create(&thread[i], NULL, thread_routine, &boxprod, p); 469 ABTS_SUCCESS(rv); 470 } 471 472 for (i = ncons; i < NTHREADS; i++) { 473 apr_status_t retval; 474 rv = apr_thread_join(&retval, thread[i]); 475 ABTS_SUCCESS(rv); 476 } 477 478 rv = apr_thread_mutex_lock(mutex); 479 ABTS_SUCCESS(rv); 480 481 exiting = 1; 482 483 rv = apr_thread_cond_broadcast(cond); 484 ABTS_SUCCESS(rv); 485 486 rv = apr_thread_mutex_unlock(mutex); 487 ABTS_SUCCESS(rv); 488 489 for (i = 0; i < ncons; i++) { 490 apr_status_t retval; 491 rv = apr_thread_join(&retval, thread[i]); 492 ABTS_SUCCESS(rv); 493 } 494 495 rv = apr_thread_cond_destroy(cond); 496 ABTS_SUCCESS(rv); 497 498 rv = apr_thread_mutex_destroy(mutex); 499 ABTS_SUCCESS(rv); 500 501 rv = apr_file_close(in); 502 ABTS_SUCCESS(rv); 503 504 rv = apr_file_close(out); 505 ABTS_SUCCESS(rv); 506} 507 508volatile enum { 509 TOSS, 510 PING, 511 PONG, 512 OVER 513} state; 514 515static void ping(toolbox_t *box) 516{ 517 apr_status_t rv; 518 abts_case *tc = box->tc; 519 520 rv = apr_thread_mutex_lock(box->mutex); 521 ABTS_SUCCESS(rv); 522 523 if (state == TOSS) 524 state = PING; 525 526 do { 527 rv = apr_thread_cond_signal(box->cond); 528 ABTS_SUCCESS(rv); 529 530 state = PONG; 531 532 rv = apr_thread_cond_wait(box->cond, box->mutex); 533 ABTS_SUCCESS(rv); 534 535 ABTS_TRUE(tc, state == PING || state == OVER); 536 } while (state != OVER); 537 538 rv = apr_thread_mutex_unlock(box->mutex); 539 ABTS_SUCCESS(rv); 540 541 rv = apr_thread_cond_broadcast(box->cond); 542 ABTS_SUCCESS(rv); 543} 544 545static void pong(toolbox_t *box) 546{ 547 apr_status_t rv; 548 abts_case *tc = box->tc; 549 550 rv = apr_thread_mutex_lock(box->mutex); 551 ABTS_SUCCESS(rv); 552 553 if (state == TOSS) 554 state = PONG; 555 556 do { 557 rv = apr_thread_cond_signal(box->cond); 558 ABTS_SUCCESS(rv); 559 560 state = PING; 561 562 rv = apr_thread_cond_wait(box->cond, box->mutex); 563 ABTS_SUCCESS(rv); 564 565 ABTS_TRUE(tc, state == PONG || state == OVER); 566 } while (state != OVER); 567 568 rv = apr_thread_mutex_unlock(box->mutex); 569 ABTS_SUCCESS(rv); 570 571 rv = apr_thread_cond_broadcast(box->cond); 572 ABTS_SUCCESS(rv); 573} 574 575static void ping_pong(abts_case *tc, void *data) 576{ 577 apr_status_t rv, retval; 578 toolbox_t box_ping, box_pong; 579 apr_thread_cond_t *cond = NULL; 580 apr_thread_mutex_t *mutex = NULL; 581 apr_thread_t *thr_ping = NULL, *thr_pong = NULL; 582 583 rv = apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_DEFAULT, p); 584 ABTS_SUCCESS(rv); 585 ABTS_PTR_NOTNULL(tc, mutex); 586 587 rv = apr_thread_cond_create(&cond, p); 588 ABTS_SUCCESS(rv); 589 ABTS_PTR_NOTNULL(tc, cond); 590 591 rv = apr_thread_mutex_lock(mutex); 592 ABTS_SUCCESS(rv); 593 594 box_ping.tc = tc; 595 box_ping.data = NULL; 596 box_ping.mutex = mutex; 597 box_ping.cond = cond; 598 box_ping.func = ping; 599 600 rv = apr_thread_create(&thr_ping, NULL, thread_routine, &box_ping, p); 601 ABTS_SUCCESS(rv); 602 603 box_pong.tc = tc; 604 box_pong.data = NULL; 605 box_pong.mutex = mutex; 606 box_pong.cond = cond; 607 box_pong.func = pong; 608 609 rv = apr_thread_create(&thr_pong, NULL, thread_routine, &box_pong, p); 610 ABTS_SUCCESS(rv); 611 612 state = TOSS; 613 614 rv = apr_thread_mutex_unlock(mutex); 615 ABTS_SUCCESS(rv); 616 617 apr_sleep(3000000); 618 619 rv = apr_thread_mutex_lock(mutex); 620 ABTS_SUCCESS(rv); 621 622 state = OVER; 623 624 rv = apr_thread_mutex_unlock(mutex); 625 ABTS_SUCCESS(rv); 626 627 rv = apr_thread_join(&retval, thr_ping); 628 ABTS_SUCCESS(rv); 629 630 rv = apr_thread_join(&retval, thr_pong); 631 ABTS_SUCCESS(rv); 632 633 rv = apr_thread_cond_destroy(cond); 634 ABTS_SUCCESS(rv); 635 636 rv = apr_thread_mutex_destroy(mutex); 637 ABTS_SUCCESS(rv); 638} 639#endif /* !APR_HAS_THREADS */ 640 641#if !APR_HAS_THREADS 642static void threads_not_impl(abts_case *tc, void *data) 643{ 644 ABTS_NOT_IMPL(tc, "Threads not implemented on this platform"); 645} 646#endif 647 648abts_suite *testcond(abts_suite *suite) 649{ 650#if APR_HAS_THREADS 651 toolbox_fnptr_t fnptr; 652#endif 653 suite = ADD_SUITE(suite) 654 655#if !APR_HAS_THREADS 656 abts_run_test(suite, threads_not_impl, NULL); 657#else 658 abts_run_test(suite, lost_signal, NULL); 659 abts_run_test(suite, dynamic_binding, NULL); 660 abts_run_test(suite, broadcast_threads, NULL); 661 fnptr.func = nested_lock_and_wait; 662 abts_run_test(suite, nested_wait, &fnptr); 663 fnptr.func = nested_lock_and_unlock; 664 abts_run_test(suite, nested_wait, &fnptr); 665 abts_run_test(suite, pipe_producer_consumer, NULL); 666 abts_run_test(suite, ping_pong, NULL); 667#endif 668 669 return suite; 670} 671