1/* Licensed to the Apache Software Foundation (ASF) under one or more 2 * contributor license agreements. See the NOTICE file distributed with 3 * this work for additional information regarding copyright ownership. 4 * The ASF licenses this file to You under the Apache License, Version 2.0 5 * (the "License"); you may not use this file except in compliance with 6 * the License. You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17#include "apr.h" 18 19#if APR_HAVE_STDIO_H 20#include <stdio.h> 21#endif 22#if APR_HAVE_STDLIB_H 23#include <stdlib.h> 24#endif 25#if APR_HAVE_UNISTD_H 26#include <unistd.h> 27#endif 28 29#include "apu.h" 30#include "apr_portable.h" 31#include "apr_thread_mutex.h" 32#include "apr_thread_cond.h" 33#include "apr_errno.h" 34#include "apr_queue.h" 35 36#if APR_HAS_THREADS 37/* 38 * define this to get debug messages 39 * 40#define QUEUE_DEBUG 41 */ 42 43struct apr_queue_t { 44 void **data; 45 unsigned int nelts; /**< # elements */ 46 unsigned int in; /**< next empty location */ 47 unsigned int out; /**< next filled location */ 48 unsigned int bounds;/**< max size of queue */ 49 unsigned int full_waiters; 50 unsigned int empty_waiters; 51 apr_thread_mutex_t *one_big_mutex; 52 apr_thread_cond_t *not_empty; 53 apr_thread_cond_t *not_full; 54 int terminated; 55}; 56 57#ifdef QUEUE_DEBUG 58static void Q_DBG(char*msg, apr_queue_t *q) { 59 fprintf(stderr, "%ld\t#%d in %d out %d\t%s\n", 60 apr_os_thread_current(), 61 q->nelts, q->in, q->out, 62 msg 63 ); 64} 65#else 66#define Q_DBG(x,y) 67#endif 68 69/** 70 * Detects when the apr_queue_t is full. This utility function is expected 71 * to be called from within critical sections, and is not threadsafe. 72 */ 73#define apr_queue_full(queue) ((queue)->nelts == (queue)->bounds) 74 75/** 76 * Detects when the apr_queue_t is empty. This utility function is expected 77 * to be called from within critical sections, and is not threadsafe. 78 */ 79#define apr_queue_empty(queue) ((queue)->nelts == 0) 80 81/** 82 * Callback routine that is called to destroy this 83 * apr_queue_t when its pool is destroyed. 84 */ 85static apr_status_t queue_destroy(void *data) 86{ 87 apr_queue_t *queue = data; 88 89 /* Ignore errors here, we can't do anything about them anyway. */ 90 91 apr_thread_cond_destroy(queue->not_empty); 92 apr_thread_cond_destroy(queue->not_full); 93 apr_thread_mutex_destroy(queue->one_big_mutex); 94 95 return APR_SUCCESS; 96} 97 98/** 99 * Initialize the apr_queue_t. 100 */ 101APU_DECLARE(apr_status_t) apr_queue_create(apr_queue_t **q, 102 unsigned int queue_capacity, 103 apr_pool_t *a) 104{ 105 apr_status_t rv; 106 apr_queue_t *queue; 107 queue = apr_palloc(a, sizeof(apr_queue_t)); 108 *q = queue; 109 110 /* nested doesn't work ;( */ 111 rv = apr_thread_mutex_create(&queue->one_big_mutex, 112 APR_THREAD_MUTEX_UNNESTED, 113 a); 114 if (rv != APR_SUCCESS) { 115 return rv; 116 } 117 118 rv = apr_thread_cond_create(&queue->not_empty, a); 119 if (rv != APR_SUCCESS) { 120 return rv; 121 } 122 123 rv = apr_thread_cond_create(&queue->not_full, a); 124 if (rv != APR_SUCCESS) { 125 return rv; 126 } 127 128 /* Set all the data in the queue to NULL */ 129 queue->data = apr_pcalloc(a, queue_capacity * sizeof(void*)); 130 queue->bounds = queue_capacity; 131 queue->nelts = 0; 132 queue->in = 0; 133 queue->out = 0; 134 queue->terminated = 0; 135 queue->full_waiters = 0; 136 queue->empty_waiters = 0; 137 138 apr_pool_cleanup_register(a, queue, queue_destroy, apr_pool_cleanup_null); 139 140 return APR_SUCCESS; 141} 142 143/** 144 * Push new data onto the queue. Blocks if the queue is full. Once 145 * the push operation has completed, it signals other threads waiting 146 * in apr_queue_pop() that they may continue consuming sockets. 147 */ 148APU_DECLARE(apr_status_t) apr_queue_push(apr_queue_t *queue, void *data) 149{ 150 apr_status_t rv; 151 152 if (queue->terminated) { 153 return APR_EOF; /* no more elements ever again */ 154 } 155 156 rv = apr_thread_mutex_lock(queue->one_big_mutex); 157 if (rv != APR_SUCCESS) { 158 return rv; 159 } 160 161 if (apr_queue_full(queue)) { 162 if (!queue->terminated) { 163 queue->full_waiters++; 164 rv = apr_thread_cond_wait(queue->not_full, queue->one_big_mutex); 165 queue->full_waiters--; 166 if (rv != APR_SUCCESS) { 167 apr_thread_mutex_unlock(queue->one_big_mutex); 168 return rv; 169 } 170 } 171 /* If we wake up and it's still empty, then we were interrupted */ 172 if (apr_queue_full(queue)) { 173 Q_DBG("queue full (intr)", queue); 174 rv = apr_thread_mutex_unlock(queue->one_big_mutex); 175 if (rv != APR_SUCCESS) { 176 return rv; 177 } 178 if (queue->terminated) { 179 return APR_EOF; /* no more elements ever again */ 180 } 181 else { 182 return APR_EINTR; 183 } 184 } 185 } 186 187 queue->data[queue->in] = data; 188 queue->in++; 189 if (queue->in >= queue->bounds) 190 queue->in -= queue->bounds; 191 queue->nelts++; 192 193 if (queue->empty_waiters) { 194 Q_DBG("sig !empty", queue); 195 rv = apr_thread_cond_signal(queue->not_empty); 196 if (rv != APR_SUCCESS) { 197 apr_thread_mutex_unlock(queue->one_big_mutex); 198 return rv; 199 } 200 } 201 202 rv = apr_thread_mutex_unlock(queue->one_big_mutex); 203 return rv; 204} 205 206/** 207 * Push new data onto the queue. If the queue is full, return APR_EAGAIN. If 208 * the push operation completes successfully, it signals other threads 209 * waiting in apr_queue_pop() that they may continue consuming sockets. 210 */ 211APU_DECLARE(apr_status_t) apr_queue_trypush(apr_queue_t *queue, void *data) 212{ 213 apr_status_t rv; 214 215 if (queue->terminated) { 216 return APR_EOF; /* no more elements ever again */ 217 } 218 219 rv = apr_thread_mutex_lock(queue->one_big_mutex); 220 if (rv != APR_SUCCESS) { 221 return rv; 222 } 223 224 if (apr_queue_full(queue)) { 225 rv = apr_thread_mutex_unlock(queue->one_big_mutex); 226 return APR_EAGAIN; 227 } 228 229 queue->data[queue->in] = data; 230 queue->in++; 231 if (queue->in >= queue->bounds) 232 queue->in -= queue->bounds; 233 queue->nelts++; 234 235 if (queue->empty_waiters) { 236 Q_DBG("sig !empty", queue); 237 rv = apr_thread_cond_signal(queue->not_empty); 238 if (rv != APR_SUCCESS) { 239 apr_thread_mutex_unlock(queue->one_big_mutex); 240 return rv; 241 } 242 } 243 244 rv = apr_thread_mutex_unlock(queue->one_big_mutex); 245 return rv; 246} 247 248/** 249 * not thread safe 250 */ 251APU_DECLARE(unsigned int) apr_queue_size(apr_queue_t *queue) { 252 return queue->nelts; 253} 254 255/** 256 * Retrieves the next item from the queue. If there are no 257 * items available, it will block until one becomes available. 258 * Once retrieved, the item is placed into the address specified by 259 * 'data'. 260 */ 261APU_DECLARE(apr_status_t) apr_queue_pop(apr_queue_t *queue, void **data) 262{ 263 apr_status_t rv; 264 265 if (queue->terminated) { 266 return APR_EOF; /* no more elements ever again */ 267 } 268 269 rv = apr_thread_mutex_lock(queue->one_big_mutex); 270 if (rv != APR_SUCCESS) { 271 return rv; 272 } 273 274 /* Keep waiting until we wake up and find that the queue is not empty. */ 275 if (apr_queue_empty(queue)) { 276 if (!queue->terminated) { 277 queue->empty_waiters++; 278 rv = apr_thread_cond_wait(queue->not_empty, queue->one_big_mutex); 279 queue->empty_waiters--; 280 if (rv != APR_SUCCESS) { 281 apr_thread_mutex_unlock(queue->one_big_mutex); 282 return rv; 283 } 284 } 285 /* If we wake up and it's still empty, then we were interrupted */ 286 if (apr_queue_empty(queue)) { 287 Q_DBG("queue empty (intr)", queue); 288 rv = apr_thread_mutex_unlock(queue->one_big_mutex); 289 if (rv != APR_SUCCESS) { 290 return rv; 291 } 292 if (queue->terminated) { 293 return APR_EOF; /* no more elements ever again */ 294 } 295 else { 296 return APR_EINTR; 297 } 298 } 299 } 300 301 *data = queue->data[queue->out]; 302 queue->nelts--; 303 304 queue->out++; 305 if (queue->out >= queue->bounds) 306 queue->out -= queue->bounds; 307 if (queue->full_waiters) { 308 Q_DBG("signal !full", queue); 309 rv = apr_thread_cond_signal(queue->not_full); 310 if (rv != APR_SUCCESS) { 311 apr_thread_mutex_unlock(queue->one_big_mutex); 312 return rv; 313 } 314 } 315 316 rv = apr_thread_mutex_unlock(queue->one_big_mutex); 317 return rv; 318} 319 320/** 321 * Retrieves the next item from the queue. If there are no 322 * items available, return APR_EAGAIN. Once retrieved, 323 * the item is placed into the address specified by 'data'. 324 */ 325APU_DECLARE(apr_status_t) apr_queue_trypop(apr_queue_t *queue, void **data) 326{ 327 apr_status_t rv; 328 329 if (queue->terminated) { 330 return APR_EOF; /* no more elements ever again */ 331 } 332 333 rv = apr_thread_mutex_lock(queue->one_big_mutex); 334 if (rv != APR_SUCCESS) { 335 return rv; 336 } 337 338 if (apr_queue_empty(queue)) { 339 rv = apr_thread_mutex_unlock(queue->one_big_mutex); 340 return APR_EAGAIN; 341 } 342 343 *data = queue->data[queue->out]; 344 queue->nelts--; 345 346 queue->out++; 347 if (queue->out >= queue->bounds) 348 queue->out -= queue->bounds; 349 if (queue->full_waiters) { 350 Q_DBG("signal !full", queue); 351 rv = apr_thread_cond_signal(queue->not_full); 352 if (rv != APR_SUCCESS) { 353 apr_thread_mutex_unlock(queue->one_big_mutex); 354 return rv; 355 } 356 } 357 358 rv = apr_thread_mutex_unlock(queue->one_big_mutex); 359 return rv; 360} 361 362APU_DECLARE(apr_status_t) apr_queue_interrupt_all(apr_queue_t *queue) 363{ 364 apr_status_t rv; 365 Q_DBG("intr all", queue); 366 if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) { 367 return rv; 368 } 369 apr_thread_cond_broadcast(queue->not_empty); 370 apr_thread_cond_broadcast(queue->not_full); 371 372 if ((rv = apr_thread_mutex_unlock(queue->one_big_mutex)) != APR_SUCCESS) { 373 return rv; 374 } 375 376 return APR_SUCCESS; 377} 378 379APU_DECLARE(apr_status_t) apr_queue_term(apr_queue_t *queue) 380{ 381 apr_status_t rv; 382 383 if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) { 384 return rv; 385 } 386 387 /* we must hold one_big_mutex when setting this... otherwise, 388 * we could end up setting it and waking everybody up just after a 389 * would-be popper checks it but right before they block 390 */ 391 queue->terminated = 1; 392 if ((rv = apr_thread_mutex_unlock(queue->one_big_mutex)) != APR_SUCCESS) { 393 return rv; 394 } 395 return apr_queue_interrupt_all(queue); 396} 397 398#endif /* APR_HAS_THREADS */ 399