/* Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "fdqueue.h"
#include "apr_atomic.h"

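/* Pools handed back by idle workers are kept on a lock-free, singly
 * linked list: ap_push_pool() links a pool in with a compare-and-swap,
 * and ap_pop_pool() takes one back off for reuse.
 */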
typedef struct recycled_pool
{
    apr_pool_t *pool;
    struct recycled_pool *next;
} recycled_pool;

struct fd_queue_info_t
{
    apr_int32_t idlers;      /**
                              * 0 or positive: number of idle worker threads
                              * negative: number of threads blocked waiting
                              *           for an idle worker
                              */
    apr_thread_mutex_t *idlers_mutex;
    apr_thread_cond_t *wait_for_idler;
    int terminated;
    int max_idlers;
    recycled_pool *recycled_pools;
};

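/**
 * Pool cleanup callback: destroys the condition variable and mutex, then
 * destroys any pools still sitting on the recycled list.
 */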
static apr_status_t queue_info_cleanup(void *data_)
{
    fd_queue_info_t *qi = data_;
    apr_thread_cond_destroy(qi->wait_for_idler);
    apr_thread_mutex_destroy(qi->idlers_mutex);

    /* Clean up any pools in the recycled list */
    for (;;) {
        struct recycled_pool *first_pool = qi->recycled_pools;
        if (first_pool == NULL) {
            break;
        }
        if (apr_atomic_casptr
            ((volatile void **) &(qi->recycled_pools), first_pool->next,
             first_pool) == first_pool) {
            apr_pool_destroy(first_pool->pool);
        }
    }

    return APR_SUCCESS;
}

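/**
 * Create an fd_queue_info_t, allocated from and tied to 'pool'.  The
 * mutex, condition variable, and any recycled pools are cleaned up when
 * 'pool' is destroyed.
 */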
apr_status_t ap_queue_info_create(fd_queue_info_t ** queue_info,
                                  apr_pool_t * pool, int max_idlers)
{
    apr_status_t rv;
    fd_queue_info_t *qi;

    qi = apr_pcalloc(pool, sizeof(*qi));

    rv = apr_thread_mutex_create(&qi->idlers_mutex, APR_THREAD_MUTEX_DEFAULT,
                                 pool);
    if (rv != APR_SUCCESS) {
        return rv;
    }
    rv = apr_thread_cond_create(&qi->wait_for_idler, pool);
    if (rv != APR_SUCCESS) {
        return rv;
    }
    qi->recycled_pools = NULL;
    qi->max_idlers = max_idlers;
    apr_pool_cleanup_register(pool, qi, queue_info_cleanup,
                              apr_pool_cleanup_null);

    *queue_info = qi;

    return APR_SUCCESS;
}

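/**
 * Mark one worker thread as idle: recycle its pool (if any) via
 * ap_push_pool(), increment the idler count, and signal the condition
 * variable if another thread is blocked waiting for an idle worker.
 */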
apr_status_t ap_queue_info_set_idle(fd_queue_info_t * queue_info,
                                    apr_pool_t * pool_to_recycle)
{
    apr_status_t rv;
    int prev_idlers;

    ap_push_pool(queue_info, pool_to_recycle);

    /* Atomically increment the count of idle workers */
    prev_idlers = apr_atomic_inc32(&(queue_info->idlers));

    /* If other threads are waiting on a worker, wake one up */
    if (prev_idlers < 0) {
        rv = apr_thread_mutex_lock(queue_info->idlers_mutex);
        if (rv != APR_SUCCESS) {
            AP_DEBUG_ASSERT(0);
            return rv;
        }
        rv = apr_thread_cond_signal(queue_info->wait_for_idler);
        if (rv != APR_SUCCESS) {
            apr_thread_mutex_unlock(queue_info->idlers_mutex);
            return rv;
        }
        rv = apr_thread_mutex_unlock(queue_info->idlers_mutex);
        if (rv != APR_SUCCESS) {
            return rv;
        }
    }

    return APR_SUCCESS;
}

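/**
 * Block until at least one worker thread is idle, effectively reserving
 * that worker for the caller (the listener).  Returns APR_EOF if the
 * queue_info has been terminated, APR_SUCCESS otherwise.
 */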
apr_status_t ap_queue_info_wait_for_idler(fd_queue_info_t * queue_info)
{
    apr_status_t rv;
    int prev_idlers;

    /* Atomically decrement the idle worker count, saving the old value */
    prev_idlers = apr_atomic_add32(&(queue_info->idlers), -1);

    /* Block if there weren't any idle workers */
    if (prev_idlers <= 0) {
        rv = apr_thread_mutex_lock(queue_info->idlers_mutex);
        if (rv != APR_SUCCESS) {
            AP_DEBUG_ASSERT(0);
            apr_atomic_inc32(&(queue_info->idlers));    /* back out dec */
            return rv;
        }
        /* Re-check the idle worker count to guard against a
         * race condition.  Now that we're in the mutex-protected
         * region, one of two things may have happened:
         *   - If the idle worker count is still negative, the
         *     workers are all still busy, so it's safe to
         *     block on a condition variable.
         *   - If the idle worker count is non-negative, then a
         *     worker has become idle since the first check
         *     of queue_info->idlers above.  It's possible
         *     that the worker has also signaled the condition
         *     variable--and if so, the listener missed it
         *     because it wasn't yet blocked on the condition
         *     variable.  But if the idle worker count is
         *     now non-negative, it's safe for this function to
         *     return immediately.
         *
         *     A negative value in queue_info->idlers tells how many
         *     threads are waiting on an idle worker.
         */
        if (queue_info->idlers < 0) {
            rv = apr_thread_cond_wait(queue_info->wait_for_idler,
                                      queue_info->idlers_mutex);
            if (rv != APR_SUCCESS) {
                apr_status_t rv2;
                AP_DEBUG_ASSERT(0);
                rv2 = apr_thread_mutex_unlock(queue_info->idlers_mutex);
                if (rv2 != APR_SUCCESS) {
                    return rv2;
                }
                return rv;
            }
        }
        rv = apr_thread_mutex_unlock(queue_info->idlers_mutex);
        if (rv != APR_SUCCESS) {
            return rv;
        }
    }

    if (queue_info->terminated) {
        return APR_EOF;
    }
    else {
        return APR_SUCCESS;
    }
}

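/**
 * Atomically push pool_to_recycle onto queue_info's list of recycled
 * pools.  Passing NULL is a no-op.  Any number of threads may push
 * concurrently.
 */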
void ap_push_pool(fd_queue_info_t * queue_info, apr_pool_t * pool_to_recycle)
{
    /* If we have been given a pool to recycle, atomically link
     * it into the queue_info's list of recycled pools
     */
    if (pool_to_recycle) {
        struct recycled_pool *new_recycle;
        new_recycle = (struct recycled_pool *) apr_palloc(pool_to_recycle,
                                                          sizeof(*new_recycle));
        new_recycle->pool = pool_to_recycle;
        for (;;) {
            /*
             * Save queue_info->recycled_pools in a local variable, next,
             * because new_recycle->next can be changed after the
             * apr_atomic_casptr call.  For gory details see PR 44402.
             */
            struct recycled_pool *next = queue_info->recycled_pools;
            new_recycle->next = next;
            if (apr_atomic_casptr
                ((volatile void **) &(queue_info->recycled_pools),
                 new_recycle, next) == next) {
                break;
            }
        }
    }
}

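/**
 * Pop one pool off queue_info's recycled list, or set *recycled_pool to
 * NULL if the list is empty.  Only a single thread may pop at a time;
 * see the comment in the function body.
 */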
void ap_pop_pool(apr_pool_t ** recycled_pool, fd_queue_info_t * queue_info)
{
    /* This function is safe only as long as it is single threaded because
     * it reaches into the queue and accesses "next", which can change.
     * We are OK today because it is only called from the listener thread.
     * cas-based pushes do not have the same limitation - any number can
     * happen concurrently with a single cas-based pop.
     */

    *recycled_pool = NULL;

    /* Atomically pop a pool from the recycled list */
    for (;;) {
        struct recycled_pool *first_pool = queue_info->recycled_pools;
        if (first_pool == NULL) {
            break;
        }
        if (apr_atomic_casptr
            ((volatile void **) &(queue_info->recycled_pools),
             first_pool->next, first_pool) == first_pool) {
            *recycled_pool = first_pool->pool;
            break;
        }
    }
}

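/**
 * Mark the queue_info as terminated and wake every thread blocked in
 * ap_queue_info_wait_for_idler so it can notice the termination.
 */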
apr_status_t ap_queue_info_term(fd_queue_info_t * queue_info)
{
    apr_status_t rv;
    rv = apr_thread_mutex_lock(queue_info->idlers_mutex);
    if (rv != APR_SUCCESS) {
        return rv;
    }
    queue_info->terminated = 1;
    apr_thread_cond_broadcast(queue_info->wait_for_idler);
    return apr_thread_mutex_unlock(queue_info->idlers_mutex);
}

/**
 * Detects when the fd_queue_t is full. This utility function is expected
 * to be called from within critical sections, and is not threadsafe.
 */
#define ap_queue_full(queue) ((queue)->nelts == (queue)->bounds)

/**
 * Detects when the fd_queue_t is empty. This utility function is expected
 * to be called from within critical sections, and is not threadsafe.
 */
#define ap_queue_empty(queue) ((queue)->nelts == 0)

/**
 * Callback routine that is called to destroy this
 * fd_queue_t when its pool is destroyed.
 */
static apr_status_t ap_queue_destroy(void *data)
{
    fd_queue_t *queue = data;

    /* Ignore errors here, we can't do anything about them anyway.
     * XXX: We should at least try to signal an error here, it is
     * indicative of a programmer error. -aaron */
    apr_thread_cond_destroy(queue->not_empty);
    apr_thread_mutex_destroy(queue->one_big_mutex);

    return APR_SUCCESS;
}

/**
 * Initialize the fd_queue_t.
 */
apr_status_t ap_queue_init(fd_queue_t * queue, int queue_capacity,
                           apr_pool_t * a)
{
    int i;
    apr_status_t rv;

    if ((rv = apr_thread_mutex_create(&queue->one_big_mutex,
                                      APR_THREAD_MUTEX_DEFAULT,
                                      a)) != APR_SUCCESS) {
        return rv;
    }
    if ((rv = apr_thread_cond_create(&queue->not_empty, a)) != APR_SUCCESS) {
        return rv;
    }

    queue->data = apr_palloc(a, queue_capacity * sizeof(fd_queue_elem_t));
    queue->bounds = queue_capacity;
    queue->nelts = 0;

    /* Set all the sockets in the queue to NULL */
    for (i = 0; i < queue_capacity; ++i)
        queue->data[i].sd = NULL;

    apr_pool_cleanup_register(a, queue, ap_queue_destroy,
                              apr_pool_cleanup_null);

    return APR_SUCCESS;
}

/**
 * Push a new socket onto the queue.
 *
 * precondition: ap_queue_info_wait_for_idler has already been called
 *               to reserve an idle worker thread
 */
apr_status_t ap_queue_push(fd_queue_t * queue, apr_socket_t * sd,
                           conn_state_t * cs, apr_pool_t * p)
{
    fd_queue_elem_t *elem;
    apr_status_t rv;

    if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) {
        return rv;
    }

    AP_DEBUG_ASSERT(!queue->terminated);
    AP_DEBUG_ASSERT(!ap_queue_full(queue));

    elem = &queue->data[queue->nelts];
    elem->sd = sd;
    elem->cs = cs;
    elem->p = p;
    queue->nelts++;

    apr_thread_cond_signal(queue->not_empty);

    if ((rv = apr_thread_mutex_unlock(queue->one_big_mutex)) != APR_SUCCESS) {
        return rv;
    }

    return APR_SUCCESS;
}

/**
 * Retrieves the next available socket from the queue. If there are no
 * sockets available, it will block until one becomes available.
 * Once retrieved, the socket is placed into the address specified by
 * 'sd'.
 */
apr_status_t ap_queue_pop(fd_queue_t * queue, apr_socket_t ** sd,
                          conn_state_t ** cs, apr_pool_t ** p)
{
    fd_queue_elem_t *elem;
    apr_status_t rv;

    if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) {
        return rv;
    }

    /* If the queue is empty, wait for something to be pushed (unless the
     * queue has already been terminated). */
    if (ap_queue_empty(queue)) {
        if (!queue->terminated) {
            apr_thread_cond_wait(queue->not_empty, queue->one_big_mutex);
        }
        /* If we wake up and it's still empty, then we were interrupted */
        if (ap_queue_empty(queue)) {
            rv = apr_thread_mutex_unlock(queue->one_big_mutex);
            if (rv != APR_SUCCESS) {
                return rv;
            }
            if (queue->terminated) {
                return APR_EOF; /* no more elements ever again */
            }
            else {
                return APR_EINTR;
            }
        }
    }

    elem = &queue->data[--queue->nelts];
    *sd = elem->sd;
    *cs = elem->cs;
    *p = elem->p;
#ifdef AP_DEBUG
    elem->sd = NULL;
    elem->p = NULL;
#endif /* AP_DEBUG */

    rv = apr_thread_mutex_unlock(queue->one_big_mutex);
    return rv;
}

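/**
 * Wake up all threads blocked in ap_queue_pop.  A popper woken with the
 * queue still empty returns APR_EINTR, or APR_EOF if the queue has been
 * terminated.
 */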
apr_status_t ap_queue_interrupt_all(fd_queue_t * queue)
{
    apr_status_t rv;

    if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) {
        return rv;
    }
    apr_thread_cond_broadcast(queue->not_empty);
    return apr_thread_mutex_unlock(queue->one_big_mutex);
}

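/**
 * Mark the queue as terminated and wake all blocked poppers so they
 * return APR_EOF.
 */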
apr_status_t ap_queue_term(fd_queue_t * queue)
{
    apr_status_t rv;

    if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) {
        return rv;
    }
    /* we must hold one_big_mutex when setting this... otherwise,
     * we could end up setting it and waking everybody up just after a
     * would-be popper checks it but right before they block
     */
    queue->terminated = 1;
    if ((rv = apr_thread_mutex_unlock(queue->one_big_mutex)) != APR_SUCCESS) {
        return rv;
    }
    return ap_queue_interrupt_all(queue);
}