1/* Licensed to the Apache Software Foundation (ASF) under one or more 2 * contributor license agreements. See the NOTICE file distributed with 3 * this work for additional information regarding copyright ownership. 4 * The ASF licenses this file to You under the Apache License, Version 2.0 5 * (the "License"); you may not use this file except in compliance with 6 * the License. You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17#include "fdqueue.h" 18#include "apr_atomic.h" 19 20typedef struct recycled_pool { 21 apr_pool_t *pool; 22 struct recycled_pool *next; 23} recycled_pool; 24 25struct fd_queue_info_t { 26 apr_uint32_t idlers; 27 apr_thread_mutex_t *idlers_mutex; 28 apr_thread_cond_t *wait_for_idler; 29 int terminated; 30 int max_idlers; 31 recycled_pool *recycled_pools; 32}; 33 34static apr_status_t queue_info_cleanup(void *data_) 35{ 36 fd_queue_info_t *qi = data_; 37 apr_thread_cond_destroy(qi->wait_for_idler); 38 apr_thread_mutex_destroy(qi->idlers_mutex); 39 40 /* Clean up any pools in the recycled list */ 41 for (;;) { 42 struct recycled_pool *first_pool = qi->recycled_pools; 43 if (first_pool == NULL) { 44 break; 45 } 46 if (apr_atomic_casptr((void*)&(qi->recycled_pools), first_pool->next, 47 first_pool) == first_pool) { 48 apr_pool_destroy(first_pool->pool); 49 } 50 } 51 52 return APR_SUCCESS; 53} 54 55apr_status_t ap_queue_info_create(fd_queue_info_t **queue_info, 56 apr_pool_t *pool, int max_idlers) 57{ 58 apr_status_t rv; 59 fd_queue_info_t *qi; 60 61 qi = apr_pcalloc(pool, sizeof(*qi)); 62 63 rv = apr_thread_mutex_create(&qi->idlers_mutex, APR_THREAD_MUTEX_DEFAULT, 64 pool); 65 if (rv != APR_SUCCESS) { 66 return rv; 67 } 68 rv = apr_thread_cond_create(&qi->wait_for_idler, pool); 69 if (rv != APR_SUCCESS) { 70 return rv; 71 } 72 qi->recycled_pools = NULL; 73 qi->max_idlers = max_idlers; 74 apr_pool_cleanup_register(pool, qi, queue_info_cleanup, 75 apr_pool_cleanup_null); 76 77 *queue_info = qi; 78 79 return APR_SUCCESS; 80} 81 82apr_status_t ap_queue_info_set_idle(fd_queue_info_t *queue_info, 83 apr_pool_t *pool_to_recycle) 84{ 85 apr_status_t rv; 86 int prev_idlers; 87 88 /* If we have been given a pool to recycle, atomically link 89 * it into the queue_info's list of recycled pools 90 */ 91 if (pool_to_recycle) { 92 struct recycled_pool *new_recycle; 93 new_recycle = (struct recycled_pool *)apr_palloc(pool_to_recycle, 94 sizeof(*new_recycle)); 95 new_recycle->pool = pool_to_recycle; 96 for (;;) { 97 /* Save queue_info->recycled_pool in local variable next because 98 * new_recycle->next can be changed after apr_atomic_casptr 99 * function call. For gory details see PR 44402. 100 */ 101 struct recycled_pool *next = queue_info->recycled_pools; 102 new_recycle->next = next; 103 if (apr_atomic_casptr((void*)&(queue_info->recycled_pools), 104 new_recycle, next) == next) { 105 break; 106 } 107 } 108 } 109 110 /* Atomically increment the count of idle workers */ 111 for (;;) { 112 prev_idlers = queue_info->idlers; 113 if (apr_atomic_cas32(&(queue_info->idlers), prev_idlers + 1, 114 prev_idlers) == prev_idlers) { 115 break; 116 } 117 } 118 119 /* If this thread just made the idle worker count nonzero, 120 * wake up the listener. */ 121 if (prev_idlers == 0) { 122 rv = apr_thread_mutex_lock(queue_info->idlers_mutex); 123 if (rv != APR_SUCCESS) { 124 return rv; 125 } 126 rv = apr_thread_cond_signal(queue_info->wait_for_idler); 127 if (rv != APR_SUCCESS) { 128 apr_thread_mutex_unlock(queue_info->idlers_mutex); 129 return rv; 130 } 131 rv = apr_thread_mutex_unlock(queue_info->idlers_mutex); 132 if (rv != APR_SUCCESS) { 133 return rv; 134 } 135 } 136 137 return APR_SUCCESS; 138} 139 140apr_status_t ap_queue_info_wait_for_idler(fd_queue_info_t *queue_info, 141 apr_pool_t **recycled_pool) 142{ 143 apr_status_t rv; 144 145 *recycled_pool = NULL; 146 147 /* Block if the count of idle workers is zero */ 148 if (queue_info->idlers == 0) { 149 rv = apr_thread_mutex_lock(queue_info->idlers_mutex); 150 if (rv != APR_SUCCESS) { 151 return rv; 152 } 153 /* Re-check the idle worker count to guard against a 154 * race condition. Now that we're in the mutex-protected 155 * region, one of two things may have happened: 156 * - If the idle worker count is still zero, the 157 * workers are all still busy, so it's safe to 158 * block on a condition variable, BUT 159 * we need to check for idle worker count again 160 * when we are signaled since it can happen that 161 * we are signaled by a worker thread that went idle 162 * but received a context switch before it could 163 * tell us. If it does signal us later once it is on 164 * CPU again there might be no idle worker left. 165 * See 166 * https://issues.apache.org/bugzilla/show_bug.cgi?id=45605#c4 167 * - If the idle worker count is nonzero, then a 168 * worker has become idle since the first check 169 * of queue_info->idlers above. It's possible 170 * that the worker has also signaled the condition 171 * variable--and if so, the listener missed it 172 * because it wasn't yet blocked on the condition 173 * variable. But if the idle worker count is 174 * now nonzero, it's safe for this function to 175 * return immediately. 176 */ 177 while (queue_info->idlers == 0) { 178 rv = apr_thread_cond_wait(queue_info->wait_for_idler, 179 queue_info->idlers_mutex); 180 if (rv != APR_SUCCESS) { 181 apr_status_t rv2; 182 rv2 = apr_thread_mutex_unlock(queue_info->idlers_mutex); 183 if (rv2 != APR_SUCCESS) { 184 return rv2; 185 } 186 return rv; 187 } 188 } 189 rv = apr_thread_mutex_unlock(queue_info->idlers_mutex); 190 if (rv != APR_SUCCESS) { 191 return rv; 192 } 193 } 194 195 /* Atomically decrement the idle worker count */ 196 apr_atomic_dec32(&(queue_info->idlers)); 197 198 /* Atomically pop a pool from the recycled list */ 199 200 /* This function is safe only as long as it is single threaded because 201 * it reaches into the queue and accesses "next" which can change. 202 * We are OK today because it is only called from the listener thread. 203 * cas-based pushes do not have the same limitation - any number can 204 * happen concurrently with a single cas-based pop. 205 */ 206 207 for (;;) { 208 struct recycled_pool *first_pool = queue_info->recycled_pools; 209 if (first_pool == NULL) { 210 break; 211 } 212 if (apr_atomic_casptr((void*)&(queue_info->recycled_pools), first_pool->next, 213 first_pool) == first_pool) { 214 *recycled_pool = first_pool->pool; 215 break; 216 } 217 } 218 219 if (queue_info->terminated) { 220 return APR_EOF; 221 } 222 else { 223 return APR_SUCCESS; 224 } 225} 226 227apr_status_t ap_queue_info_term(fd_queue_info_t *queue_info) 228{ 229 apr_status_t rv; 230 rv = apr_thread_mutex_lock(queue_info->idlers_mutex); 231 if (rv != APR_SUCCESS) { 232 return rv; 233 } 234 queue_info->terminated = 1; 235 apr_thread_cond_broadcast(queue_info->wait_for_idler); 236 return apr_thread_mutex_unlock(queue_info->idlers_mutex); 237} 238 239/** 240 * Detects when the fd_queue_t is full. This utility function is expected 241 * to be called from within critical sections, and is not threadsafe. 242 */ 243#define ap_queue_full(queue) ((queue)->nelts == (queue)->bounds) 244 245/** 246 * Detects when the fd_queue_t is empty. This utility function is expected 247 * to be called from within critical sections, and is not threadsafe. 248 */ 249#define ap_queue_empty(queue) ((queue)->nelts == 0) 250 251/** 252 * Callback routine that is called to destroy this 253 * fd_queue_t when its pool is destroyed. 254 */ 255static apr_status_t ap_queue_destroy(void *data) 256{ 257 fd_queue_t *queue = data; 258 259 /* Ignore errors here, we can't do anything about them anyway. 260 * XXX: We should at least try to signal an error here, it is 261 * indicative of a programmer error. -aaron */ 262 apr_thread_cond_destroy(queue->not_empty); 263 apr_thread_mutex_destroy(queue->one_big_mutex); 264 265 return APR_SUCCESS; 266} 267 268/** 269 * Initialize the fd_queue_t. 270 */ 271apr_status_t ap_queue_init(fd_queue_t *queue, int queue_capacity, apr_pool_t *a) 272{ 273 int i; 274 apr_status_t rv; 275 276 if ((rv = apr_thread_mutex_create(&queue->one_big_mutex, 277 APR_THREAD_MUTEX_DEFAULT, a)) != APR_SUCCESS) { 278 return rv; 279 } 280 if ((rv = apr_thread_cond_create(&queue->not_empty, a)) != APR_SUCCESS) { 281 return rv; 282 } 283 284 queue->data = apr_palloc(a, queue_capacity * sizeof(fd_queue_elem_t)); 285 queue->bounds = queue_capacity; 286 queue->nelts = 0; 287 queue->in = 0; 288 queue->out = 0; 289 290 /* Set all the sockets in the queue to NULL */ 291 for (i = 0; i < queue_capacity; ++i) 292 queue->data[i].sd = NULL; 293 294 apr_pool_cleanup_register(a, queue, ap_queue_destroy, apr_pool_cleanup_null); 295 296 return APR_SUCCESS; 297} 298 299/** 300 * Push a new socket onto the queue. 301 * 302 * precondition: ap_queue_info_wait_for_idler has already been called 303 * to reserve an idle worker thread 304 */ 305apr_status_t ap_queue_push(fd_queue_t *queue, apr_socket_t *sd, apr_pool_t *p) 306{ 307 fd_queue_elem_t *elem; 308 apr_status_t rv; 309 310 if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) { 311 return rv; 312 } 313 314 AP_DEBUG_ASSERT(!queue->terminated); 315 AP_DEBUG_ASSERT(!ap_queue_full(queue)); 316 317 elem = &queue->data[queue->in]; 318 queue->in++; 319 if (queue->in >= queue->bounds) 320 queue->in -= queue->bounds; 321 elem->sd = sd; 322 elem->p = p; 323 queue->nelts++; 324 325 apr_thread_cond_signal(queue->not_empty); 326 327 if ((rv = apr_thread_mutex_unlock(queue->one_big_mutex)) != APR_SUCCESS) { 328 return rv; 329 } 330 331 return APR_SUCCESS; 332} 333 334/** 335 * Retrieves the next available socket from the queue. If there are no 336 * sockets available, it will block until one becomes available. 337 * Once retrieved, the socket is placed into the address specified by 338 * 'sd'. 339 */ 340apr_status_t ap_queue_pop(fd_queue_t *queue, apr_socket_t **sd, apr_pool_t **p) 341{ 342 fd_queue_elem_t *elem; 343 apr_status_t rv; 344 345 if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) { 346 return rv; 347 } 348 349 /* Keep waiting until we wake up and find that the queue is not empty. */ 350 if (ap_queue_empty(queue)) { 351 if (!queue->terminated) { 352 apr_thread_cond_wait(queue->not_empty, queue->one_big_mutex); 353 } 354 /* If we wake up and it's still empty, then we were interrupted */ 355 if (ap_queue_empty(queue)) { 356 rv = apr_thread_mutex_unlock(queue->one_big_mutex); 357 if (rv != APR_SUCCESS) { 358 return rv; 359 } 360 if (queue->terminated) { 361 return APR_EOF; /* no more elements ever again */ 362 } 363 else { 364 return APR_EINTR; 365 } 366 } 367 } 368 369 elem = &queue->data[queue->out]; 370 queue->out++; 371 if (queue->out >= queue->bounds) 372 queue->out -= queue->bounds; 373 queue->nelts--; 374 *sd = elem->sd; 375 *p = elem->p; 376#ifdef AP_DEBUG 377 elem->sd = NULL; 378 elem->p = NULL; 379#endif /* AP_DEBUG */ 380 381 rv = apr_thread_mutex_unlock(queue->one_big_mutex); 382 return rv; 383} 384 385apr_status_t ap_queue_interrupt_all(fd_queue_t *queue) 386{ 387 apr_status_t rv; 388 389 if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) { 390 return rv; 391 } 392 apr_thread_cond_broadcast(queue->not_empty); 393 return apr_thread_mutex_unlock(queue->one_big_mutex); 394} 395 396apr_status_t ap_queue_term(fd_queue_t *queue) 397{ 398 apr_status_t rv; 399 400 if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) { 401 return rv; 402 } 403 /* we must hold one_big_mutex when setting this... otherwise, 404 * we could end up setting it and waking everybody up just after a 405 * would-be popper checks it but right before they block 406 */ 407 queue->terminated = 1; 408 if ((rv = apr_thread_mutex_unlock(queue->one_big_mutex)) != APR_SUCCESS) { 409 return rv; 410 } 411 return ap_queue_interrupt_all(queue); 412} 413