1238106Sdes/* Licensed to the Apache Software Foundation (ASF) under one or more 2238106Sdes * contributor license agreements. See the NOTICE file distributed with 3238106Sdes * this work for additional information regarding copyright ownership. 4238106Sdes * The ASF licenses this file to You under the Apache License, Version 2.0 5238106Sdes * (the "License"); you may not use this file except in compliance with 6238106Sdes * the License. You may obtain a copy of the License at 7238106Sdes * 8238106Sdes * http://www.apache.org/licenses/LICENSE-2.0 9238106Sdes * 10238106Sdes * Unless required by applicable law or agreed to in writing, software 11238106Sdes * distributed under the License is distributed on an "AS IS" BASIS, 12238106Sdes * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13238106Sdes * See the License for the specific language governing permissions and 14238106Sdes * limitations under the License. 15238106Sdes */ 16238106Sdes 17238106Sdes#include "apr.h" 18238106Sdes 19238106Sdes#if APR_HAVE_STDIO_H 20238106Sdes#include <stdio.h> 21238106Sdes#endif 22238106Sdes#if APR_HAVE_STDLIB_H 23238106Sdes#include <stdlib.h> 24269257Sdes#endif 25269257Sdes#if APR_HAVE_UNISTD_H 26269257Sdes#include <unistd.h> 27269257Sdes#endif 28269257Sdes 29269257Sdes#include "apu.h" 30269257Sdes#include "apr_portable.h" 31269257Sdes#include "apr_thread_mutex.h" 32269257Sdes#include "apr_thread_cond.h" 33269257Sdes#include "apr_errno.h" 34238106Sdes#include "apr_queue.h" 35238106Sdes 36238106Sdes#if APR_HAS_THREADS 37238106Sdes/* 38238106Sdes * define this to get debug messages 39238106Sdes * 40238106Sdes#define QUEUE_DEBUG 41238106Sdes */ 42238106Sdes 43238106Sdesstruct apr_queue_t { 44238106Sdes void **data; 45238106Sdes unsigned int nelts; /**< # elements */ 46269257Sdes unsigned int in; /**< next empty location */ 47238106Sdes unsigned int out; /**< next filled location */ 48238106Sdes unsigned int bounds;/**< max size of queue */ 49238106Sdes unsigned int full_waiters; 50238106Sdes unsigned int empty_waiters; 51238106Sdes apr_thread_mutex_t *one_big_mutex; 52238106Sdes apr_thread_cond_t *not_empty; 53238106Sdes apr_thread_cond_t *not_full; 54238106Sdes int terminated; 55238106Sdes}; 56238106Sdes 57238106Sdes#ifdef QUEUE_DEBUG 58238106Sdesstatic void Q_DBG(char*msg, apr_queue_t *q) { 59238106Sdes fprintf(stderr, "%ld\t#%d in %d out %d\t%s\n", 60238106Sdes apr_os_thread_current(), 61238106Sdes q->nelts, q->in, q->out, 62238106Sdes msg 63238106Sdes ); 64238106Sdes} 65238106Sdes#else 66238106Sdes#define Q_DBG(x,y) 67238106Sdes#endif 68238106Sdes 69238106Sdes/** 70238106Sdes * Detects when the apr_queue_t is full. This utility function is expected 71238106Sdes * to be called from within critical sections, and is not threadsafe. 72238106Sdes */ 73238106Sdes#define apr_queue_full(queue) ((queue)->nelts == (queue)->bounds) 74238106Sdes 75238106Sdes/** 76238106Sdes * Detects when the apr_queue_t is empty. This utility function is expected 77238106Sdes * to be called from within critical sections, and is not threadsafe. 78238106Sdes */ 79238106Sdes#define apr_queue_empty(queue) ((queue)->nelts == 0) 80238106Sdes 81238106Sdes/** 82238106Sdes * Callback routine that is called to destroy this 83238106Sdes * apr_queue_t when its pool is destroyed. 84238106Sdes */ 85238106Sdesstatic apr_status_t queue_destroy(void *data) 86238106Sdes{ 87238106Sdes apr_queue_t *queue = data; 88238106Sdes 89238106Sdes /* Ignore errors here, we can't do anything about them anyway. */ 90238106Sdes 91238106Sdes apr_thread_cond_destroy(queue->not_empty); 92238106Sdes apr_thread_cond_destroy(queue->not_full); 93238106Sdes apr_thread_mutex_destroy(queue->one_big_mutex); 94238106Sdes 95238106Sdes return APR_SUCCESS; 96238106Sdes} 97238106Sdes 98238106Sdes/** 99238106Sdes * Initialize the apr_queue_t. 100238106Sdes */ 101238106SdesAPU_DECLARE(apr_status_t) apr_queue_create(apr_queue_t **q, 102238106Sdes unsigned int queue_capacity, 103238106Sdes apr_pool_t *a) 104238106Sdes{ 105238106Sdes apr_status_t rv; 106238106Sdes apr_queue_t *queue; 107238106Sdes queue = apr_palloc(a, sizeof(apr_queue_t)); 108238106Sdes *q = queue; 109238106Sdes 110238106Sdes /* nested doesn't work ;( */ 111238106Sdes rv = apr_thread_mutex_create(&queue->one_big_mutex, 112238106Sdes APR_THREAD_MUTEX_UNNESTED, 113238106Sdes a); 114238106Sdes if (rv != APR_SUCCESS) { 115238106Sdes return rv; 116238106Sdes } 117238106Sdes 118238106Sdes rv = apr_thread_cond_create(&queue->not_empty, a); 119238106Sdes if (rv != APR_SUCCESS) { 120238106Sdes return rv; 121238106Sdes } 122238106Sdes 123238106Sdes rv = apr_thread_cond_create(&queue->not_full, a); 124238106Sdes if (rv != APR_SUCCESS) { 125238106Sdes return rv; 126238106Sdes } 127238106Sdes 128238106Sdes /* Set all the data in the queue to NULL */ 129238106Sdes queue->data = apr_pcalloc(a, queue_capacity * sizeof(void*)); 130238106Sdes queue->bounds = queue_capacity; 131238106Sdes queue->nelts = 0; 132238106Sdes queue->in = 0; 133238106Sdes queue->out = 0; 134238106Sdes queue->terminated = 0; 135238106Sdes queue->full_waiters = 0; 136238106Sdes queue->empty_waiters = 0; 137238106Sdes 138238106Sdes apr_pool_cleanup_register(a, queue, queue_destroy, apr_pool_cleanup_null); 139238106Sdes 140238106Sdes return APR_SUCCESS; 141238106Sdes} 142238106Sdes 143238106Sdes/** 144238106Sdes * Push new data onto the queue. Blocks if the queue is full. Once 145238106Sdes * the push operation has completed, it signals other threads waiting 146238106Sdes * in apr_queue_pop() that they may continue consuming sockets. 147238106Sdes */ 148238106SdesAPU_DECLARE(apr_status_t) apr_queue_push(apr_queue_t *queue, void *data) 149238106Sdes{ 150238106Sdes apr_status_t rv; 151238106Sdes 152238106Sdes if (queue->terminated) { 153238106Sdes return APR_EOF; /* no more elements ever again */ 154238106Sdes } 155238106Sdes 156238106Sdes rv = apr_thread_mutex_lock(queue->one_big_mutex); 157238106Sdes if (rv != APR_SUCCESS) { 158238106Sdes return rv; 159238106Sdes } 160238106Sdes 161238106Sdes if (apr_queue_full(queue)) { 162238106Sdes if (!queue->terminated) { 163238106Sdes queue->full_waiters++; 164238106Sdes rv = apr_thread_cond_wait(queue->not_full, queue->one_big_mutex); 165238106Sdes queue->full_waiters--; 166238106Sdes if (rv != APR_SUCCESS) { 167238106Sdes apr_thread_mutex_unlock(queue->one_big_mutex); 168238106Sdes return rv; 169238106Sdes } 170238106Sdes } 171238106Sdes /* If we wake up and it's still empty, then we were interrupted */ 172238106Sdes if (apr_queue_full(queue)) { 173238106Sdes Q_DBG("queue full (intr)", queue); 174238106Sdes rv = apr_thread_mutex_unlock(queue->one_big_mutex); 175238106Sdes if (rv != APR_SUCCESS) { 176238106Sdes return rv; 177238106Sdes } 178238106Sdes if (queue->terminated) { 179238106Sdes return APR_EOF; /* no more elements ever again */ 180238106Sdes } 181238106Sdes else { 182238106Sdes return APR_EINTR; 183238106Sdes } 184238106Sdes } 185238106Sdes } 186238106Sdes 187238106Sdes queue->data[queue->in] = data; 188238106Sdes queue->in++; 189238106Sdes if (queue->in >= queue->bounds) 190238106Sdes queue->in -= queue->bounds; 191238106Sdes queue->nelts++; 192238106Sdes 193238106Sdes if (queue->empty_waiters) { 194238106Sdes Q_DBG("sig !empty", queue); 195238106Sdes rv = apr_thread_cond_signal(queue->not_empty); 196238106Sdes if (rv != APR_SUCCESS) { 197238106Sdes apr_thread_mutex_unlock(queue->one_big_mutex); 198238106Sdes return rv; 199238106Sdes } 200238106Sdes } 201238106Sdes 202238106Sdes rv = apr_thread_mutex_unlock(queue->one_big_mutex); 203238106Sdes return rv; 204238106Sdes} 205238106Sdes 206238106Sdes/** 207238106Sdes * Push new data onto the queue. If the queue is full, return APR_EAGAIN. If 208238106Sdes * the push operation completes successfully, it signals other threads 209238106Sdes * waiting in apr_queue_pop() that they may continue consuming sockets. 210238106Sdes */ 211238106SdesAPU_DECLARE(apr_status_t) apr_queue_trypush(apr_queue_t *queue, void *data) 212238106Sdes{ 213238106Sdes apr_status_t rv; 214238106Sdes 215238106Sdes if (queue->terminated) { 216238106Sdes return APR_EOF; /* no more elements ever again */ 217238106Sdes } 218238106Sdes 219238106Sdes rv = apr_thread_mutex_lock(queue->one_big_mutex); 220238106Sdes if (rv != APR_SUCCESS) { 221238106Sdes return rv; 222238106Sdes } 223238106Sdes 224238106Sdes if (apr_queue_full(queue)) { 225238106Sdes rv = apr_thread_mutex_unlock(queue->one_big_mutex); 226238106Sdes return APR_EAGAIN; 227238106Sdes } 228238106Sdes 229238106Sdes queue->data[queue->in] = data; 230238106Sdes queue->in++; 231238106Sdes if (queue->in >= queue->bounds) 232238106Sdes queue->in -= queue->bounds; 233238106Sdes queue->nelts++; 234238106Sdes 235238106Sdes if (queue->empty_waiters) { 236238106Sdes Q_DBG("sig !empty", queue); 237238106Sdes rv = apr_thread_cond_signal(queue->not_empty); 238238106Sdes if (rv != APR_SUCCESS) { 239238106Sdes apr_thread_mutex_unlock(queue->one_big_mutex); 240238106Sdes return rv; 241238106Sdes } 242238106Sdes } 243238106Sdes 244238106Sdes rv = apr_thread_mutex_unlock(queue->one_big_mutex); 245238106Sdes return rv; 246238106Sdes} 247238106Sdes 248/** 249 * not thread safe 250 */ 251APU_DECLARE(unsigned int) apr_queue_size(apr_queue_t *queue) { 252 return queue->nelts; 253} 254 255/** 256 * Retrieves the next item from the queue. If there are no 257 * items available, it will block until one becomes available. 258 * Once retrieved, the item is placed into the address specified by 259 * 'data'. 260 */ 261APU_DECLARE(apr_status_t) apr_queue_pop(apr_queue_t *queue, void **data) 262{ 263 apr_status_t rv; 264 265 if (queue->terminated) { 266 return APR_EOF; /* no more elements ever again */ 267 } 268 269 rv = apr_thread_mutex_lock(queue->one_big_mutex); 270 if (rv != APR_SUCCESS) { 271 return rv; 272 } 273 274 /* Keep waiting until we wake up and find that the queue is not empty. */ 275 if (apr_queue_empty(queue)) { 276 if (!queue->terminated) { 277 queue->empty_waiters++; 278 rv = apr_thread_cond_wait(queue->not_empty, queue->one_big_mutex); 279 queue->empty_waiters--; 280 if (rv != APR_SUCCESS) { 281 apr_thread_mutex_unlock(queue->one_big_mutex); 282 return rv; 283 } 284 } 285 /* If we wake up and it's still empty, then we were interrupted */ 286 if (apr_queue_empty(queue)) { 287 Q_DBG("queue empty (intr)", queue); 288 rv = apr_thread_mutex_unlock(queue->one_big_mutex); 289 if (rv != APR_SUCCESS) { 290 return rv; 291 } 292 if (queue->terminated) { 293 return APR_EOF; /* no more elements ever again */ 294 } 295 else { 296 return APR_EINTR; 297 } 298 } 299 } 300 301 *data = queue->data[queue->out]; 302 queue->nelts--; 303 304 queue->out++; 305 if (queue->out >= queue->bounds) 306 queue->out -= queue->bounds; 307 if (queue->full_waiters) { 308 Q_DBG("signal !full", queue); 309 rv = apr_thread_cond_signal(queue->not_full); 310 if (rv != APR_SUCCESS) { 311 apr_thread_mutex_unlock(queue->one_big_mutex); 312 return rv; 313 } 314 } 315 316 rv = apr_thread_mutex_unlock(queue->one_big_mutex); 317 return rv; 318} 319 320/** 321 * Retrieves the next item from the queue. If there are no 322 * items available, return APR_EAGAIN. Once retrieved, 323 * the item is placed into the address specified by 'data'. 324 */ 325APU_DECLARE(apr_status_t) apr_queue_trypop(apr_queue_t *queue, void **data) 326{ 327 apr_status_t rv; 328 329 if (queue->terminated) { 330 return APR_EOF; /* no more elements ever again */ 331 } 332 333 rv = apr_thread_mutex_lock(queue->one_big_mutex); 334 if (rv != APR_SUCCESS) { 335 return rv; 336 } 337 338 if (apr_queue_empty(queue)) { 339 rv = apr_thread_mutex_unlock(queue->one_big_mutex); 340 return APR_EAGAIN; 341 } 342 343 *data = queue->data[queue->out]; 344 queue->nelts--; 345 346 queue->out++; 347 if (queue->out >= queue->bounds) 348 queue->out -= queue->bounds; 349 if (queue->full_waiters) { 350 Q_DBG("signal !full", queue); 351 rv = apr_thread_cond_signal(queue->not_full); 352 if (rv != APR_SUCCESS) { 353 apr_thread_mutex_unlock(queue->one_big_mutex); 354 return rv; 355 } 356 } 357 358 rv = apr_thread_mutex_unlock(queue->one_big_mutex); 359 return rv; 360} 361 362APU_DECLARE(apr_status_t) apr_queue_interrupt_all(apr_queue_t *queue) 363{ 364 apr_status_t rv; 365 Q_DBG("intr all", queue); 366 if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) { 367 return rv; 368 } 369 apr_thread_cond_broadcast(queue->not_empty); 370 apr_thread_cond_broadcast(queue->not_full); 371 372 if ((rv = apr_thread_mutex_unlock(queue->one_big_mutex)) != APR_SUCCESS) { 373 return rv; 374 } 375 376 return APR_SUCCESS; 377} 378 379APU_DECLARE(apr_status_t) apr_queue_term(apr_queue_t *queue) 380{ 381 apr_status_t rv; 382 383 if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) { 384 return rv; 385 } 386 387 /* we must hold one_big_mutex when setting this... otherwise, 388 * we could end up setting it and waking everybody up just after a 389 * would-be popper checks it but right before they block 390 */ 391 queue->terminated = 1; 392 if ((rv = apr_thread_mutex_unlock(queue->one_big_mutex)) != APR_SUCCESS) { 393 return rv; 394 } 395 return apr_queue_interrupt_all(queue); 396} 397 398#endif /* APR_HAS_THREADS */ 399