1/* Licensed to the Apache Software Foundation (ASF) under one or more 2 * contributor license agreements. See the NOTICE file distributed with 3 * this work for additional information regarding copyright ownership. 4 * The ASF licenses this file to You under the Apache License, Version 2.0 5 * (the "License"); you may not use this file except in compliance with 6 * the License. You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17#include "fdqueue.h" 18#include "apr_atomic.h" 19 20typedef struct recycled_pool 21{ 22 apr_pool_t *pool; 23 struct recycled_pool *next; 24} recycled_pool; 25 26struct fd_queue_info_t 27{ 28 apr_int32_t idlers; /** 29 * 0 or positive: number of idle worker threads 30 * negative: number of threads blocked waiting 31 * for an idle worker 32 */ 33 apr_thread_mutex_t *idlers_mutex; 34 apr_thread_cond_t *wait_for_idler; 35 int terminated; 36 int max_idlers; 37 recycled_pool *recycled_pools; 38}; 39 40static apr_status_t queue_info_cleanup(void *data_) 41{ 42 fd_queue_info_t *qi = data_; 43 apr_thread_cond_destroy(qi->wait_for_idler); 44 apr_thread_mutex_destroy(qi->idlers_mutex); 45 46 /* Clean up any pools in the recycled list */ 47 for (;;) { 48 struct recycled_pool *first_pool = qi->recycled_pools; 49 if (first_pool == NULL) { 50 break; 51 } 52 if (apr_atomic_casptr 53 ((volatile void **) &(qi->recycled_pools), first_pool->next, 54 first_pool) == first_pool) { 55 apr_pool_destroy(first_pool->pool); 56 } 57 } 58 59 return APR_SUCCESS; 60} 61 62apr_status_t ap_queue_info_create(fd_queue_info_t ** queue_info, 63 apr_pool_t * pool, int max_idlers) 64{ 65 apr_status_t rv; 66 fd_queue_info_t *qi; 67 68 qi = apr_pcalloc(pool, sizeof(*qi)); 69 70 rv = apr_thread_mutex_create(&qi->idlers_mutex, APR_THREAD_MUTEX_DEFAULT, 71 pool); 72 if (rv != APR_SUCCESS) { 73 return rv; 74 } 75 rv = apr_thread_cond_create(&qi->wait_for_idler, pool); 76 if (rv != APR_SUCCESS) { 77 return rv; 78 } 79 qi->recycled_pools = NULL; 80 qi->max_idlers = max_idlers; 81 apr_pool_cleanup_register(pool, qi, queue_info_cleanup, 82 apr_pool_cleanup_null); 83 84 *queue_info = qi; 85 86 return APR_SUCCESS; 87} 88 89apr_status_t ap_queue_info_set_idle(fd_queue_info_t * queue_info, 90 apr_pool_t * pool_to_recycle) 91{ 92 apr_status_t rv; 93 int prev_idlers; 94 95 ap_push_pool(queue_info, pool_to_recycle); 96 97 /* Atomically increment the count of idle workers */ 98 prev_idlers = apr_atomic_inc32(&(queue_info->idlers)); 99 100 /* If other threads are waiting on a worker, wake one up */ 101 if (prev_idlers < 0) { 102 rv = apr_thread_mutex_lock(queue_info->idlers_mutex); 103 if (rv != APR_SUCCESS) { 104 AP_DEBUG_ASSERT(0); 105 return rv; 106 } 107 rv = apr_thread_cond_signal(queue_info->wait_for_idler); 108 if (rv != APR_SUCCESS) { 109 apr_thread_mutex_unlock(queue_info->idlers_mutex); 110 return rv; 111 } 112 rv = apr_thread_mutex_unlock(queue_info->idlers_mutex); 113 if (rv != APR_SUCCESS) { 114 return rv; 115 } 116 } 117 118 return APR_SUCCESS; 119} 120 121apr_status_t ap_queue_info_wait_for_idler(fd_queue_info_t * queue_info) 122{ 123 apr_status_t rv; 124 int prev_idlers; 125 126 /* Atomically decrement the idle worker count, saving the old value */ 127 prev_idlers = apr_atomic_add32(&(queue_info->idlers), -1); 128 129 /* Block if there weren't any idle workers */ 130 if (prev_idlers <= 0) { 131 rv = apr_thread_mutex_lock(queue_info->idlers_mutex); 132 if (rv != APR_SUCCESS) { 133 AP_DEBUG_ASSERT(0); 134 apr_atomic_inc32(&(queue_info->idlers)); /* back out dec */ 135 return rv; 136 } 137 /* Re-check the idle worker count to guard against a 138 * race condition. Now that we're in the mutex-protected 139 * region, one of two things may have happened: 140 * - If the idle worker count is still negative, the 141 * workers are all still busy, so it's safe to 142 * block on a condition variable. 143 * - If the idle worker count is non-negative, then a 144 * worker has become idle since the first check 145 * of queue_info->idlers above. It's possible 146 * that the worker has also signaled the condition 147 * variable--and if so, the listener missed it 148 * because it wasn't yet blocked on the condition 149 * variable. But if the idle worker count is 150 * now non-negative, it's safe for this function to 151 * return immediately. 152 * 153 * A negative value in queue_info->idlers tells how many 154 * threads are waiting on an idle worker. 155 */ 156 if (queue_info->idlers < 0) { 157 rv = apr_thread_cond_wait(queue_info->wait_for_idler, 158 queue_info->idlers_mutex); 159 if (rv != APR_SUCCESS) { 160 apr_status_t rv2; 161 AP_DEBUG_ASSERT(0); 162 rv2 = apr_thread_mutex_unlock(queue_info->idlers_mutex); 163 if (rv2 != APR_SUCCESS) { 164 return rv2; 165 } 166 return rv; 167 } 168 } 169 rv = apr_thread_mutex_unlock(queue_info->idlers_mutex); 170 if (rv != APR_SUCCESS) { 171 return rv; 172 } 173 } 174 175 if (queue_info->terminated) { 176 return APR_EOF; 177 } 178 else { 179 return APR_SUCCESS; 180 } 181} 182 183 184void ap_push_pool(fd_queue_info_t * queue_info, 185 apr_pool_t * pool_to_recycle) 186{ 187 /* If we have been given a pool to recycle, atomically link 188 * it into the queue_info's list of recycled pools 189 */ 190 if (pool_to_recycle) { 191 struct recycled_pool *new_recycle; 192 new_recycle = (struct recycled_pool *) apr_palloc(pool_to_recycle, 193 sizeof 194 (*new_recycle)); 195 new_recycle->pool = pool_to_recycle; 196 for (;;) { 197 /* 198 * Save queue_info->recycled_pool in local variable next because 199 * new_recycle->next can be changed after apr_atomic_casptr 200 * function call. For gory details see PR 44402. 201 */ 202 struct recycled_pool *next = queue_info->recycled_pools; 203 new_recycle->next = next; 204 if (apr_atomic_casptr 205 ((volatile void **) &(queue_info->recycled_pools), 206 new_recycle, next) == next) { 207 break; 208 } 209 } 210 } 211} 212 213void ap_pop_pool(apr_pool_t ** recycled_pool, fd_queue_info_t * queue_info) 214{ 215 /* Atomically pop a pool from the recycled list */ 216 217 /* This function is safe only as long as it is single threaded because 218 * it reaches into the queue and accesses "next" which can change. 219 * We are OK today because it is only called from the listener thread. 220 * cas-based pushes do not have the same limitation - any number can 221 * happen concurrently with a single cas-based pop. 222 */ 223 224 *recycled_pool = NULL; 225 226 227 /* Atomically pop a pool from the recycled list */ 228 for (;;) { 229 struct recycled_pool *first_pool = queue_info->recycled_pools; 230 if (first_pool == NULL) { 231 break; 232 } 233 if (apr_atomic_casptr 234 ((volatile void **) &(queue_info->recycled_pools), 235 first_pool->next, first_pool) == first_pool) { 236 *recycled_pool = first_pool->pool; 237 break; 238 } 239 } 240} 241 242apr_status_t ap_queue_info_term(fd_queue_info_t * queue_info) 243{ 244 apr_status_t rv; 245 rv = apr_thread_mutex_lock(queue_info->idlers_mutex); 246 if (rv != APR_SUCCESS) { 247 return rv; 248 } 249 queue_info->terminated = 1; 250 apr_thread_cond_broadcast(queue_info->wait_for_idler); 251 return apr_thread_mutex_unlock(queue_info->idlers_mutex); 252} 253 254/** 255 * Detects when the fd_queue_t is full. This utility function is expected 256 * to be called from within critical sections, and is not threadsafe. 257 */ 258#define ap_queue_full(queue) ((queue)->nelts == (queue)->bounds) 259 260/** 261 * Detects when the fd_queue_t is empty. This utility function is expected 262 * to be called from within critical sections, and is not threadsafe. 263 */ 264#define ap_queue_empty(queue) ((queue)->nelts == 0) 265 266/** 267 * Callback routine that is called to destroy this 268 * fd_queue_t when its pool is destroyed. 269 */ 270static apr_status_t ap_queue_destroy(void *data) 271{ 272 fd_queue_t *queue = data; 273 274 /* Ignore errors here, we can't do anything about them anyway. 275 * XXX: We should at least try to signal an error here, it is 276 * indicative of a programmer error. -aaron */ 277 apr_thread_cond_destroy(queue->not_empty); 278 apr_thread_mutex_destroy(queue->one_big_mutex); 279 280 return APR_SUCCESS; 281} 282 283/** 284 * Initialize the fd_queue_t. 285 */ 286apr_status_t ap_queue_init(fd_queue_t * queue, int queue_capacity, 287 apr_pool_t * a) 288{ 289 int i; 290 apr_status_t rv; 291 292 if ((rv = apr_thread_mutex_create(&queue->one_big_mutex, 293 APR_THREAD_MUTEX_DEFAULT, 294 a)) != APR_SUCCESS) { 295 return rv; 296 } 297 if ((rv = apr_thread_cond_create(&queue->not_empty, a)) != APR_SUCCESS) { 298 return rv; 299 } 300 301 queue->data = apr_palloc(a, queue_capacity * sizeof(fd_queue_elem_t)); 302 queue->bounds = queue_capacity; 303 queue->nelts = 0; 304 305 /* Set all the sockets in the queue to NULL */ 306 for (i = 0; i < queue_capacity; ++i) 307 queue->data[i].sd = NULL; 308 309 apr_pool_cleanup_register(a, queue, ap_queue_destroy, 310 apr_pool_cleanup_null); 311 312 return APR_SUCCESS; 313} 314 315/** 316 * Push a new socket onto the queue. 317 * 318 * precondition: ap_queue_info_wait_for_idler has already been called 319 * to reserve an idle worker thread 320 */ 321apr_status_t ap_queue_push(fd_queue_t * queue, apr_socket_t * sd, 322 conn_state_t * cs, apr_pool_t * p) 323{ 324 fd_queue_elem_t *elem; 325 apr_status_t rv; 326 327 if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) { 328 return rv; 329 } 330 331 AP_DEBUG_ASSERT(!queue->terminated); 332 AP_DEBUG_ASSERT(!ap_queue_full(queue)); 333 334 elem = &queue->data[queue->nelts]; 335 elem->sd = sd; 336 elem->cs = cs; 337 elem->p = p; 338 queue->nelts++; 339 340 apr_thread_cond_signal(queue->not_empty); 341 342 if ((rv = apr_thread_mutex_unlock(queue->one_big_mutex)) != APR_SUCCESS) { 343 return rv; 344 } 345 346 return APR_SUCCESS; 347} 348 349/** 350 * Retrieves the next available socket from the queue. If there are no 351 * sockets available, it will block until one becomes available. 352 * Once retrieved, the socket is placed into the address specified by 353 * 'sd'. 354 */ 355apr_status_t ap_queue_pop(fd_queue_t * queue, apr_socket_t ** sd, 356 conn_state_t ** cs, apr_pool_t ** p) 357{ 358 fd_queue_elem_t *elem; 359 apr_status_t rv; 360 361 if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) { 362 return rv; 363 } 364 365 /* Keep waiting until we wake up and find that the queue is not empty. */ 366 if (ap_queue_empty(queue)) { 367 if (!queue->terminated) { 368 apr_thread_cond_wait(queue->not_empty, queue->one_big_mutex); 369 } 370 /* If we wake up and it's still empty, then we were interrupted */ 371 if (ap_queue_empty(queue)) { 372 rv = apr_thread_mutex_unlock(queue->one_big_mutex); 373 if (rv != APR_SUCCESS) { 374 return rv; 375 } 376 if (queue->terminated) { 377 return APR_EOF; /* no more elements ever again */ 378 } 379 else { 380 return APR_EINTR; 381 } 382 } 383 } 384 385 elem = &queue->data[--queue->nelts]; 386 *sd = elem->sd; 387 *cs = elem->cs; 388 *p = elem->p; 389#ifdef AP_DEBUG 390 elem->sd = NULL; 391 elem->p = NULL; 392#endif /* AP_DEBUG */ 393 394 rv = apr_thread_mutex_unlock(queue->one_big_mutex); 395 return rv; 396} 397 398apr_status_t ap_queue_interrupt_all(fd_queue_t * queue) 399{ 400 apr_status_t rv; 401 402 if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) { 403 return rv; 404 } 405 apr_thread_cond_broadcast(queue->not_empty); 406 return apr_thread_mutex_unlock(queue->one_big_mutex); 407} 408 409apr_status_t ap_queue_term(fd_queue_t * queue) 410{ 411 apr_status_t rv; 412 413 if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) { 414 return rv; 415 } 416 /* we must hold one_big_mutex when setting this... otherwise, 417 * we could end up setting it and waking everybody up just after a 418 * would-be popper checks it but right before they block 419 */ 420 queue->terminated = 1; 421 if ((rv = apr_thread_mutex_unlock(queue->one_big_mutex)) != APR_SUCCESS) { 422 return rv; 423 } 424 return ap_queue_interrupt_all(queue); 425} 426