1/* Licensed to the Apache Software Foundation (ASF) under one or more
2 * contributor license agreements.  See the NOTICE file distributed with
3 * this work for additional information regarding copyright ownership.
4 * The ASF licenses this file to You under the Apache License, Version 2.0
5 * (the "License"); you may not use this file except in compliance with
6 * the License.  You may obtain a copy of the License at
7 *
8 *     http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#include "apr_file_io.h"
18#include "apr_thread_proc.h"
19#include "apr_thread_mutex.h"
20#include "apr_thread_cond.h"
21#include "apr_errno.h"
22#include "apr_general.h"
23#include "apr_atomic.h"
24#include "testutil.h"
25
/* Number of worker threads spawned by the multi-threaded test cases. */
#define NTHREADS 10

/*
 * Assert that an APR status equals APR_SUCCESS.  The expansion refers to
 * a variable named `tc`, which must be in scope at the point of use.
 */
#define ABTS_SUCCESS(rv)    ABTS_INT_EQUAL(tc, APR_SUCCESS, (rv))
29
30#if APR_HAS_THREADS
31
32typedef struct toolbox_t toolbox_t;
33
34struct toolbox_t {
35    void *data;
36    abts_case *tc;
37    apr_thread_mutex_t *mutex;
38    apr_thread_cond_t *cond;
39    void (*func)(toolbox_t *box);
40};
41
42typedef struct toolbox_fnptr_t toolbox_fnptr_t;
43
44struct toolbox_fnptr_t {
45    void (*func)(toolbox_t *box);
46};
47
48static void lost_signal(abts_case *tc, void *data)
49{
50    apr_status_t rv;
51    apr_thread_cond_t *cond = NULL;
52    apr_thread_mutex_t *mutex = NULL;
53
54    rv = apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_DEFAULT, p);
55    ABTS_SUCCESS(rv);
56    ABTS_PTR_NOTNULL(tc, mutex);
57
58    rv = apr_thread_cond_create(&cond, p);
59    ABTS_SUCCESS(rv);
60    ABTS_PTR_NOTNULL(tc, cond);
61
62    rv = apr_thread_cond_signal(cond);
63    ABTS_SUCCESS(rv);
64
65    rv = apr_thread_mutex_lock(mutex);
66    ABTS_SUCCESS(rv);
67
68    rv = apr_thread_cond_timedwait(cond, mutex, 10000);
69    ABTS_INT_EQUAL(tc, 1, APR_STATUS_IS_TIMEUP(rv));
70
71    rv = apr_thread_mutex_unlock(mutex);
72    ABTS_SUCCESS(rv);
73
74    rv = apr_thread_cond_broadcast(cond);
75    ABTS_SUCCESS(rv);
76
77    rv = apr_thread_mutex_lock(mutex);
78    ABTS_SUCCESS(rv);
79
80    rv = apr_thread_cond_timedwait(cond, mutex, 10000);
81    ABTS_INT_EQUAL(tc, 1, APR_STATUS_IS_TIMEUP(rv));
82
83    rv = apr_thread_mutex_unlock(mutex);
84    ABTS_SUCCESS(rv);
85
86    rv = apr_thread_cond_destroy(cond);
87    ABTS_SUCCESS(rv);
88
89    rv = apr_thread_mutex_destroy(mutex);
90    ABTS_SUCCESS(rv);
91}
92
93static void *APR_THREAD_FUNC thread_routine(apr_thread_t *thd, void *data)
94{
95    toolbox_t *box = data;
96
97    box->func(box);
98
99    apr_thread_exit(thd, 0);
100
101    return NULL;
102}
103
104static void lock_and_signal(toolbox_t *box)
105{
106    apr_status_t rv;
107    abts_case *tc = box->tc;
108
109    rv = apr_thread_mutex_lock(box->mutex);
110    ABTS_SUCCESS(rv);
111
112    rv = apr_thread_cond_signal(box->cond);
113    ABTS_SUCCESS(rv);
114
115    rv = apr_thread_mutex_unlock(box->mutex);
116    ABTS_SUCCESS(rv);
117}
118
119static void dynamic_binding(abts_case *tc, void *data)
120{
121    unsigned int i;
122    apr_status_t rv;
123    toolbox_t box[NTHREADS];
124    apr_thread_t *thread[NTHREADS];
125    apr_thread_mutex_t *mutex[NTHREADS];
126    apr_thread_cond_t *cond = NULL;
127
128    rv = apr_thread_cond_create(&cond, p);
129    ABTS_SUCCESS(rv);
130    ABTS_PTR_NOTNULL(tc, cond);
131
132    for (i = 0; i < NTHREADS; i++) {
133        rv = apr_thread_mutex_create(&mutex[i], APR_THREAD_MUTEX_DEFAULT, p);
134        ABTS_SUCCESS(rv);
135
136        rv = apr_thread_mutex_lock(mutex[i]);
137        ABTS_SUCCESS(rv);
138
139        box[i].tc = tc;
140        box[i].cond = cond;
141        box[i].mutex = mutex[i];
142        box[i].func = lock_and_signal;
143
144        rv = apr_thread_create(&thread[i], NULL, thread_routine, &box[i], p);
145        ABTS_SUCCESS(rv);
146    }
147
148    /*
149     * The dynamic binding should be preserved because we use only one waiter
150     */
151
152    for (i = 0; i < NTHREADS; i++) {
153        rv = apr_thread_cond_wait(cond, mutex[i]);
154        ABTS_SUCCESS(rv);
155    }
156
157    for (i = 0; i < NTHREADS; i++) {
158        rv = apr_thread_cond_timedwait(cond, mutex[i], 10000);
159        ABTS_INT_EQUAL(tc, 1, APR_STATUS_IS_TIMEUP(rv));
160
161        rv = apr_thread_mutex_unlock(mutex[i]);
162        ABTS_SUCCESS(rv);
163    }
164
165    for (i = 0; i < NTHREADS; i++) {
166        apr_status_t retval;
167        rv = apr_thread_join(&retval, thread[i]);
168        ABTS_SUCCESS(rv);
169    }
170
171    rv = apr_thread_cond_destroy(cond);
172    ABTS_SUCCESS(rv);
173
174    for (i = 0; i < NTHREADS; i++) {
175        rv = apr_thread_mutex_destroy(mutex[i]);
176        ABTS_SUCCESS(rv);
177    }
178}
179
180static void lock_and_wait(toolbox_t *box)
181{
182    apr_status_t rv;
183    abts_case *tc = box->tc;
184    apr_uint32_t *count = box->data;
185
186    rv = apr_thread_mutex_lock(box->mutex);
187    ABTS_SUCCESS(rv);
188
189    apr_atomic_inc32(count);
190
191    rv = apr_thread_cond_wait(box->cond, box->mutex);
192    ABTS_SUCCESS(rv);
193
194    apr_atomic_dec32(count);
195
196    rv = apr_thread_mutex_unlock(box->mutex);
197    ABTS_SUCCESS(rv);
198}
199
200static void broadcast_threads(abts_case *tc, void *data)
201{
202    toolbox_t box;
203    unsigned int i;
204    apr_status_t rv;
205    apr_uint32_t count = 0;
206    apr_thread_cond_t *cond = NULL;
207    apr_thread_mutex_t *mutex = NULL;
208    apr_thread_t *thread[NTHREADS];
209
210    rv = apr_thread_cond_create(&cond, p);
211    ABTS_SUCCESS(rv);
212    ABTS_PTR_NOTNULL(tc, cond);
213
214    rv = apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_DEFAULT, p);
215    ABTS_SUCCESS(rv);
216    ABTS_PTR_NOTNULL(tc, mutex);
217
218    rv = apr_thread_mutex_lock(mutex);
219    ABTS_SUCCESS(rv);
220
221    box.tc = tc;
222    box.data = &count;
223    box.mutex = mutex;
224    box.cond = cond;
225    box.func = lock_and_wait;
226
227    for (i = 0; i < NTHREADS; i++) {
228        rv = apr_thread_create(&thread[i], NULL, thread_routine, &box, p);
229        ABTS_SUCCESS(rv);
230    }
231
232    do {
233        rv = apr_thread_mutex_unlock(mutex);
234        ABTS_SUCCESS(rv);
235        apr_sleep(100000);
236        rv = apr_thread_mutex_lock(mutex);
237        ABTS_SUCCESS(rv);
238    } while (apr_atomic_read32(&count) != NTHREADS);
239
240    rv = apr_thread_cond_broadcast(cond);
241    ABTS_SUCCESS(rv);
242
243    rv = apr_thread_mutex_unlock(mutex);
244    ABTS_SUCCESS(rv);
245
246    for (i = 0; i < NTHREADS; i++) {
247        apr_status_t retval;
248        rv = apr_thread_join(&retval, thread[i]);
249        ABTS_SUCCESS(rv);
250    }
251
252    ABTS_INT_EQUAL(tc, 0, count);
253
254    rv = apr_thread_cond_destroy(cond);
255    ABTS_SUCCESS(rv);
256
257    rv = apr_thread_mutex_destroy(mutex);
258    ABTS_SUCCESS(rv);
259}
260
261static void nested_lock_and_wait(toolbox_t *box)
262{
263    apr_status_t rv;
264    abts_case *tc = box->tc;
265
266    rv = apr_thread_mutex_lock(box->mutex);
267    ABTS_SUCCESS(rv);
268
269    rv = apr_thread_mutex_lock(box->mutex);
270    ABTS_SUCCESS(rv);
271
272    rv = apr_thread_mutex_lock(box->mutex);
273    ABTS_SUCCESS(rv);
274
275    rv = apr_thread_cond_wait(box->cond, box->mutex);
276    ABTS_SUCCESS(rv);
277}
278
279static void nested_lock_and_unlock(toolbox_t *box)
280{
281    apr_status_t rv;
282    abts_case *tc = box->tc;
283
284    rv = apr_thread_mutex_lock(box->mutex);
285    ABTS_SUCCESS(rv);
286
287    rv = apr_thread_mutex_lock(box->mutex);
288    ABTS_SUCCESS(rv);
289
290    rv = apr_thread_mutex_lock(box->mutex);
291    ABTS_SUCCESS(rv);
292
293    rv = apr_thread_cond_timedwait(box->cond, box->mutex, 2000000);
294    ABTS_SUCCESS(rv);
295
296    rv = apr_thread_mutex_unlock(box->mutex);
297    ABTS_SUCCESS(rv);
298
299    rv = apr_thread_mutex_unlock(box->mutex);
300    ABTS_SUCCESS(rv);
301}
302
303static void nested_wait(abts_case *tc, void *data)
304{
305    toolbox_fnptr_t *fnptr = data;
306    toolbox_t box;
307    apr_status_t rv, retval;
308    apr_thread_cond_t *cond = NULL;
309    apr_thread_t *thread = NULL;
310    apr_thread_mutex_t *mutex = NULL;
311
312    rv = apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, p);
313    ABTS_SUCCESS(rv);
314    ABTS_PTR_NOTNULL(tc, mutex);
315
316    rv = apr_thread_cond_create(&cond, p);
317    ABTS_SUCCESS(rv);
318    ABTS_PTR_NOTNULL(tc, cond);
319
320    rv = apr_thread_mutex_lock(mutex);
321    ABTS_SUCCESS(rv);
322
323    box.tc = tc;
324    box.cond = cond;
325    box.mutex = mutex;
326    box.func = fnptr->func;
327
328    rv = apr_thread_create(&thread, NULL, thread_routine, &box, p);
329    ABTS_SUCCESS(rv);
330
331    rv = apr_thread_mutex_unlock(mutex);
332    ABTS_SUCCESS(rv);
333
334    /* yield the processor */
335    apr_sleep(500000);
336
337    rv = apr_thread_cond_signal(cond);
338    ABTS_SUCCESS(rv);
339
340    rv = apr_thread_join(&retval, thread);
341    ABTS_SUCCESS(rv);
342
343    rv = apr_thread_mutex_trylock(mutex);
344    ABTS_INT_EQUAL(tc, 1, APR_STATUS_IS_EBUSY(rv));
345
346    rv = apr_thread_mutex_trylock(mutex);
347    ABTS_INT_EQUAL(tc, 1, APR_STATUS_IS_EBUSY(rv));
348}
349
350static volatile apr_uint64_t pipe_count;
351static volatile apr_uint32_t exiting;
352
353static void pipe_consumer(toolbox_t *box)
354{
355    char ch;
356    apr_status_t rv;
357    apr_size_t nbytes;
358    abts_case *tc = box->tc;
359    apr_file_t *out = box->data;
360    apr_uint32_t consumed = 0;
361
362    do {
363        rv = apr_thread_mutex_lock(box->mutex);
364        ABTS_SUCCESS(rv);
365
366        while (!pipe_count && !exiting) {
367            rv = apr_thread_cond_wait(box->cond, box->mutex);
368            ABTS_SUCCESS(rv);
369        }
370
371        if (!pipe_count && exiting) {
372            rv = apr_thread_mutex_unlock(box->mutex);
373            ABTS_SUCCESS(rv);
374            break;
375        }
376
377        pipe_count--;
378        consumed++;
379
380        rv = apr_thread_mutex_unlock(box->mutex);
381        ABTS_SUCCESS(rv);
382
383        rv = apr_file_read_full(out, &ch, 1, &nbytes);
384        ABTS_SUCCESS(rv);
385        ABTS_SIZE_EQUAL(tc, 1, nbytes);
386        ABTS_TRUE(tc, ch == '.');
387    } while (1);
388
389    /* naive fairness test - it would be good to introduce or solidify
390     * a solid test to ensure one thread is not starved.
391     * ABTS_INT_EQUAL(tc, 1, !!consumed);
392     */
393}
394
395static void pipe_write(toolbox_t *box, char ch)
396{
397    apr_status_t rv;
398    apr_size_t nbytes;
399    abts_case *tc = box->tc;
400    apr_file_t *in = box->data;
401
402    rv = apr_file_write_full(in, &ch, 1, &nbytes);
403    ABTS_SUCCESS(rv);
404    ABTS_SIZE_EQUAL(tc, 1, nbytes);
405
406    rv = apr_thread_mutex_lock(box->mutex);
407    ABTS_SUCCESS(rv);
408
409    if (!pipe_count) {
410        rv = apr_thread_cond_signal(box->cond);
411        ABTS_SUCCESS(rv);
412    }
413
414    pipe_count++;
415
416    rv = apr_thread_mutex_unlock(box->mutex);
417    ABTS_SUCCESS(rv);
418}
419
420static void pipe_producer(toolbox_t *box)
421{
422    apr_uint32_t loop = 500;
423
424    do {
425        pipe_write(box, '.');
426    } while (loop--);
427}
428
429static void pipe_producer_consumer(abts_case *tc, void *data)
430{
431    apr_status_t rv;
432    toolbox_t boxcons, boxprod;
433    apr_thread_t *thread[NTHREADS];
434    apr_thread_cond_t *cond = NULL;
435    apr_thread_mutex_t *mutex = NULL;
436    apr_file_t *in = NULL, *out = NULL;
437    apr_uint32_t i, ncons = (apr_uint32_t)(NTHREADS * 0.70);
438
439    rv = apr_file_pipe_create(&in, &out, p);
440    ABTS_SUCCESS(rv);
441
442    rv = apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_DEFAULT, p);
443    ABTS_SUCCESS(rv);
444    ABTS_PTR_NOTNULL(tc, mutex);
445
446    rv = apr_thread_cond_create(&cond, p);
447    ABTS_SUCCESS(rv);
448    ABTS_PTR_NOTNULL(tc, cond);
449
450    boxcons.tc = tc;
451    boxcons.data = in;
452    boxcons.mutex = mutex;
453    boxcons.cond = cond;
454    boxcons.func = pipe_consumer;
455
456    for (i = 0; i < ncons; i++) {
457        rv = apr_thread_create(&thread[i], NULL, thread_routine, &boxcons, p);
458        ABTS_SUCCESS(rv);
459    }
460
461    boxprod.tc = tc;
462    boxprod.data = out;
463    boxprod.mutex = mutex;
464    boxprod.cond = cond;
465    boxprod.func = pipe_producer;
466
467    for (; i < NTHREADS; i++) {
468        rv = apr_thread_create(&thread[i], NULL, thread_routine, &boxprod, p);
469        ABTS_SUCCESS(rv);
470    }
471
472    for (i = ncons; i < NTHREADS; i++) {
473        apr_status_t retval;
474        rv = apr_thread_join(&retval, thread[i]);
475        ABTS_SUCCESS(rv);
476    }
477
478    rv = apr_thread_mutex_lock(mutex);
479    ABTS_SUCCESS(rv);
480
481    exiting = 1;
482
483    rv = apr_thread_cond_broadcast(cond);
484    ABTS_SUCCESS(rv);
485
486    rv = apr_thread_mutex_unlock(mutex);
487    ABTS_SUCCESS(rv);
488
489    for (i = 0; i < ncons; i++) {
490        apr_status_t retval;
491        rv = apr_thread_join(&retval, thread[i]);
492        ABTS_SUCCESS(rv);
493    }
494
495    rv = apr_thread_cond_destroy(cond);
496    ABTS_SUCCESS(rv);
497
498    rv = apr_thread_mutex_destroy(mutex);
499    ABTS_SUCCESS(rv);
500
501    rv = apr_file_close(in);
502    ABTS_SUCCESS(rv);
503
504    rv = apr_file_close(out);
505    ABTS_SUCCESS(rv);
506}
507
/*
 * Turn indicator shared by ping(), pong() and ping_pong().  It is read and
 * written with the test's mutex held.
 *
 *   TOSS - initial value set by ping_pong(); whichever worker runs first
 *          claims the first turn
 *   PING - value ping() expects to find after waking up
 *   PONG - value pong() expects to find after waking up
 *   OVER - set by ping_pong() to end the game; both workers then return
 *
 * The object has internal linkage: it is only used inside this file, and
 * a global with a name as generic as `state` could otherwise clash with
 * other objects linked into the same test program.
 */
static volatile enum {
    TOSS,
    PING,
    PONG,
    OVER
} state;
514
515static void ping(toolbox_t *box)
516{
517    apr_status_t rv;
518    abts_case *tc = box->tc;
519
520    rv = apr_thread_mutex_lock(box->mutex);
521    ABTS_SUCCESS(rv);
522
523    if (state == TOSS)
524        state = PING;
525
526    do {
527        rv = apr_thread_cond_signal(box->cond);
528        ABTS_SUCCESS(rv);
529
530        state = PONG;
531
532        rv = apr_thread_cond_wait(box->cond, box->mutex);
533        ABTS_SUCCESS(rv);
534
535        ABTS_TRUE(tc, state == PING || state == OVER);
536    } while (state != OVER);
537
538    rv = apr_thread_mutex_unlock(box->mutex);
539    ABTS_SUCCESS(rv);
540
541    rv = apr_thread_cond_broadcast(box->cond);
542    ABTS_SUCCESS(rv);
543}
544
545static void pong(toolbox_t *box)
546{
547    apr_status_t rv;
548    abts_case *tc = box->tc;
549
550    rv = apr_thread_mutex_lock(box->mutex);
551    ABTS_SUCCESS(rv);
552
553    if (state == TOSS)
554        state = PONG;
555
556    do {
557        rv = apr_thread_cond_signal(box->cond);
558        ABTS_SUCCESS(rv);
559
560        state = PING;
561
562        rv = apr_thread_cond_wait(box->cond, box->mutex);
563        ABTS_SUCCESS(rv);
564
565        ABTS_TRUE(tc, state == PONG || state == OVER);
566    } while (state != OVER);
567
568    rv = apr_thread_mutex_unlock(box->mutex);
569    ABTS_SUCCESS(rv);
570
571    rv = apr_thread_cond_broadcast(box->cond);
572    ABTS_SUCCESS(rv);
573}
574
575static void ping_pong(abts_case *tc, void *data)
576{
577    apr_status_t rv, retval;
578    toolbox_t box_ping, box_pong;
579    apr_thread_cond_t *cond = NULL;
580    apr_thread_mutex_t *mutex = NULL;
581    apr_thread_t *thr_ping = NULL, *thr_pong = NULL;
582
583    rv = apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_DEFAULT, p);
584    ABTS_SUCCESS(rv);
585    ABTS_PTR_NOTNULL(tc, mutex);
586
587    rv = apr_thread_cond_create(&cond, p);
588    ABTS_SUCCESS(rv);
589    ABTS_PTR_NOTNULL(tc, cond);
590
591    rv = apr_thread_mutex_lock(mutex);
592    ABTS_SUCCESS(rv);
593
594    box_ping.tc = tc;
595    box_ping.data = NULL;
596    box_ping.mutex = mutex;
597    box_ping.cond = cond;
598    box_ping.func = ping;
599
600    rv = apr_thread_create(&thr_ping, NULL, thread_routine, &box_ping, p);
601    ABTS_SUCCESS(rv);
602
603    box_pong.tc = tc;
604    box_pong.data = NULL;
605    box_pong.mutex = mutex;
606    box_pong.cond = cond;
607    box_pong.func = pong;
608
609    rv = apr_thread_create(&thr_pong, NULL, thread_routine, &box_pong, p);
610    ABTS_SUCCESS(rv);
611
612    state = TOSS;
613
614    rv = apr_thread_mutex_unlock(mutex);
615    ABTS_SUCCESS(rv);
616
617    apr_sleep(3000000);
618
619    rv = apr_thread_mutex_lock(mutex);
620    ABTS_SUCCESS(rv);
621
622    state = OVER;
623
624    rv = apr_thread_mutex_unlock(mutex);
625    ABTS_SUCCESS(rv);
626
627    rv = apr_thread_join(&retval, thr_ping);
628    ABTS_SUCCESS(rv);
629
630    rv = apr_thread_join(&retval, thr_pong);
631    ABTS_SUCCESS(rv);
632
633    rv = apr_thread_cond_destroy(cond);
634    ABTS_SUCCESS(rv);
635
636    rv = apr_thread_mutex_destroy(mutex);
637    ABTS_SUCCESS(rv);
638}
#endif /* APR_HAS_THREADS */
640
641#if !APR_HAS_THREADS
642static void threads_not_impl(abts_case *tc, void *data)
643{
644    ABTS_NOT_IMPL(tc, "Threads not implemented on this platform");
645}
646#endif
647
648abts_suite *testcond(abts_suite *suite)
649{
650#if APR_HAS_THREADS
651    toolbox_fnptr_t fnptr;
652#endif
653    suite = ADD_SUITE(suite)
654
655#if !APR_HAS_THREADS
656    abts_run_test(suite, threads_not_impl, NULL);
657#else
658    abts_run_test(suite, lost_signal, NULL);
659    abts_run_test(suite, dynamic_binding, NULL);
660    abts_run_test(suite, broadcast_threads, NULL);
661    fnptr.func = nested_lock_and_wait;
662    abts_run_test(suite, nested_wait, &fnptr);
663    fnptr.func = nested_lock_and_unlock;
664    abts_run_test(suite, nested_wait, &fnptr);
665    abts_run_test(suite, pipe_producer_consumer, NULL);
666    abts_run_test(suite, ping_pong, NULL);
667#endif
668
669    return suite;
670}
671