1/* Licensed to the Apache Software Foundation (ASF) under one or more 2 * contributor license agreements. See the NOTICE file distributed with 3 * this work for additional information regarding copyright ownership. 4 * The ASF licenses this file to You under the Apache License, Version 2.0 5 * (the "License"); you may not use this file except in compliance with 6 * the License. You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17#include "apu.h" 18#include "apr_queue.h" 19#include "apr_thread_pool.h" 20#include "apr_time.h" 21#include "abts.h" 22#include "testutil.h" 23 24#if APR_HAS_THREADS 25 26#define NUMBER_CONSUMERS 3 27#define CONSUMER_ACTIVITY 4 28#define NUMBER_PRODUCERS 4 29#define PRODUCER_ACTIVITY 5 30#define QUEUE_SIZE 100 31 32static apr_queue_t *queue; 33 34static void * APR_THREAD_FUNC consumer(apr_thread_t *thd, void *data) 35{ 36 long sleeprate; 37 abts_case *tc = data; 38 apr_status_t rv; 39 void *v; 40 41 sleeprate = 1000000/CONSUMER_ACTIVITY; 42 apr_sleep((rand() % 4) * 1000000); /* sleep random seconds */ 43 44 while (1) 45 { 46 rv = apr_queue_pop(queue, &v); 47 48 if (rv == APR_EINTR) 49 continue; 50 51 if (rv == APR_EOF) 52 break; 53 54 ABTS_TRUE(tc, v == NULL); 55 ABTS_TRUE(tc, rv == APR_SUCCESS); 56 57 apr_sleep(sleeprate); /* sleep this long to acheive our rate */ 58 } 59 60 return NULL; 61} 62 63static void * APR_THREAD_FUNC producer(apr_thread_t *thd, void *data) 64{ 65 long sleeprate; 66 abts_case *tc = data; 67 apr_status_t rv; 68 69 sleeprate = 1000000/PRODUCER_ACTIVITY; 70 apr_sleep((rand() % 4) * 1000000); /* sleep random seconds */ 71 72 while (1) 73 { 74 rv = apr_queue_push(queue, NULL); 75 76 if (rv == APR_EINTR) 77 continue; 78 79 if (rv == APR_EOF) 80 break; 81 82 ABTS_TRUE(tc, rv == APR_SUCCESS); 83 84 apr_sleep(sleeprate); /* sleep this long to acheive our rate */ 85 } 86 87 return NULL; 88} 89 90static void test_queue_producer_consumer(abts_case *tc, void *data) 91{ 92 unsigned int i; 93 apr_status_t rv; 94 apr_thread_pool_t *thrp; 95 96 /* XXX: non-portable */ 97 srand((unsigned int)apr_time_now()); 98 99 rv = apr_queue_create(&queue, QUEUE_SIZE, p); 100 ABTS_INT_EQUAL(tc, APR_SUCCESS, rv); 101 102 rv = apr_thread_pool_create(&thrp, 0, NUMBER_CONSUMERS + NUMBER_PRODUCERS, p); 103 ABTS_INT_EQUAL(tc, APR_SUCCESS, rv); 104 105 for (i = 0; i < NUMBER_CONSUMERS; i++) { 106 rv = apr_thread_pool_push(thrp, consumer, tc, 0, NULL); 107 ABTS_INT_EQUAL(tc, APR_SUCCESS, rv); 108 } 109 110 for (i = 0; i < NUMBER_PRODUCERS; i++) { 111 rv = apr_thread_pool_push(thrp, producer, tc, 0, NULL); 112 ABTS_INT_EQUAL(tc, APR_SUCCESS, rv); 113 } 114 115 apr_sleep(5000000); /* sleep 5 seconds */ 116 117 rv = apr_queue_term(queue); 118 ABTS_INT_EQUAL(tc, APR_SUCCESS, rv); 119 120 rv = apr_thread_pool_destroy(thrp); 121 ABTS_INT_EQUAL(tc, APR_SUCCESS, rv); 122} 123 124#endif /* APR_HAS_THREADS */ 125 126abts_suite *testqueue(abts_suite *suite) 127{ 128 suite = ADD_SUITE(suite); 129 130#if APR_HAS_THREADS 131 abts_run_test(suite, test_queue_producer_consumer, NULL); 132#endif /* APR_HAS_THREADS */ 133 134 return suite; 135} 136