/* Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "fdqueue.h"
#include "apr_atomic.h"

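/* Node in the lock-free (CAS-based) stack of request pools saved for reuse. */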
typedef struct recycled_pool {
    apr_pool_t *pool;
    struct recycled_pool *next;
} recycled_pool;

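/* Listener/worker synchronization state: an atomically maintained count of
 * idle worker threads, a condition variable the listener blocks on when no
 * worker is idle, and a lock-free stack of recycled request pools.
 */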
struct fd_queue_info_t {
    apr_uint32_t idlers;
    apr_thread_mutex_t *idlers_mutex;
    apr_thread_cond_t *wait_for_idler;
    int terminated;
    int max_idlers;
    recycled_pool  *recycled_pools;
};

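/* Pool cleanup callback: destroys the synchronization primitives and any
 * request pools still sitting on the recycled list.
 */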
static apr_status_t queue_info_cleanup(void *data_)
{
    fd_queue_info_t *qi = data_;
    apr_thread_cond_destroy(qi->wait_for_idler);
    apr_thread_mutex_destroy(qi->idlers_mutex);

    /* Clean up any pools in the recycled list */
    for (;;) {
        struct recycled_pool *first_pool = qi->recycled_pools;
        if (first_pool == NULL) {
            break;
        }
        if (apr_atomic_casptr((void*)&(qi->recycled_pools), first_pool->next,
                              first_pool) == first_pool) {
            apr_pool_destroy(first_pool->pool);
        }
    }

    return APR_SUCCESS;
}

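/* Create and initialize an fd_queue_info_t, registering a cleanup on the
 * supplied pool so that queue_info_cleanup() runs when the pool is destroyed.
 */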
apr_status_t ap_queue_info_create(fd_queue_info_t **queue_info,
                                  apr_pool_t *pool, int max_idlers)
{
    apr_status_t rv;
    fd_queue_info_t *qi;

    qi = apr_pcalloc(pool, sizeof(*qi));

    rv = apr_thread_mutex_create(&qi->idlers_mutex, APR_THREAD_MUTEX_DEFAULT,
                                 pool);
    if (rv != APR_SUCCESS) {
        return rv;
    }
    rv = apr_thread_cond_create(&qi->wait_for_idler, pool);
    if (rv != APR_SUCCESS) {
        return rv;
    }
    qi->recycled_pools = NULL;
    qi->max_idlers = max_idlers;
    apr_pool_cleanup_register(pool, qi, queue_info_cleanup,
                              apr_pool_cleanup_null);

    *queue_info = qi;

    return APR_SUCCESS;
}

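/* Called by a worker thread when it becomes idle: optionally donates a
 * finished request pool to the recycled-pool stack, atomically increments
 * the idle-worker count, and wakes the listener if the count was zero.
 */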
apr_status_t ap_queue_info_set_idle(fd_queue_info_t *queue_info,
                                    apr_pool_t *pool_to_recycle)
{
    apr_status_t rv;
    int prev_idlers;

    /* If we have been given a pool to recycle, atomically link
     * it into the queue_info's list of recycled pools
     */
    if (pool_to_recycle) {
        struct recycled_pool *new_recycle;
        new_recycle = (struct recycled_pool *)apr_palloc(pool_to_recycle,
                                                         sizeof(*new_recycle));
        new_recycle->pool = pool_to_recycle;
        for (;;) {
            /* Save queue_info->recycled_pools in the local variable next
             * because new_recycle->next can be changed after the
             * apr_atomic_casptr call. For gory details see PR 44402.
             */
            struct recycled_pool *next = queue_info->recycled_pools;
            new_recycle->next = next;
            if (apr_atomic_casptr((void*)&(queue_info->recycled_pools),
                                  new_recycle, next) == next) {
                break;
            }
        }
    }

    /* Atomically increment the count of idle workers */
    for (;;) {
        prev_idlers = queue_info->idlers;
        if (apr_atomic_cas32(&(queue_info->idlers), prev_idlers + 1,
                             prev_idlers) == prev_idlers) {
            break;
        }
    }

    /* If this thread just made the idle worker count nonzero,
     * wake up the listener. */
    if (prev_idlers == 0) {
        rv = apr_thread_mutex_lock(queue_info->idlers_mutex);
        if (rv != APR_SUCCESS) {
            return rv;
        }
        rv = apr_thread_cond_signal(queue_info->wait_for_idler);
        if (rv != APR_SUCCESS) {
            apr_thread_mutex_unlock(queue_info->idlers_mutex);
            return rv;
        }
        rv = apr_thread_mutex_unlock(queue_info->idlers_mutex);
        if (rv != APR_SUCCESS) {
            return rv;
        }
    }

    return APR_SUCCESS;
}

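/* Called by the listener thread: blocks until at least one worker is idle,
 * reserves that worker by decrementing the idle count, and returns a
 * recycled pool (if one is available) through *recycled_pool.  Returns
 * APR_EOF once the queue_info has been terminated.
 */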
apr_status_t ap_queue_info_wait_for_idler(fd_queue_info_t *queue_info,
                                          apr_pool_t **recycled_pool)
{
    apr_status_t rv;

    *recycled_pool = NULL;

    /* Block if the count of idle workers is zero */
    if (queue_info->idlers == 0) {
        rv = apr_thread_mutex_lock(queue_info->idlers_mutex);
        if (rv != APR_SUCCESS) {
            return rv;
        }
        /* Re-check the idle worker count to guard against a
         * race condition.  Now that we're in the mutex-protected
         * region, one of two things may have happened:
         *   - If the idle worker count is still zero, the
         *     workers are all still busy, so it's safe to
         *     block on the condition variable, BUT
         *     we need to re-check the idle worker count
         *     when we are signaled, because we may be signaled
         *     by a worker thread that went idle but was
         *     context-switched before it could tell us.  By the
         *     time it is back on the CPU and signals us, there
         *     may be no idle worker left.
         *     See
         *     https://issues.apache.org/bugzilla/show_bug.cgi?id=45605#c4
         *   - If the idle worker count is nonzero, then a
         *     worker has become idle since the first check
         *     of queue_info->idlers above.  It's possible
         *     that the worker has also signaled the condition
         *     variable--and if so, the listener missed it
         *     because it wasn't yet blocked on the condition
         *     variable.  But if the idle worker count is
         *     now nonzero, it's safe for this function to
         *     return immediately.
         */
        while (queue_info->idlers == 0) {
            rv = apr_thread_cond_wait(queue_info->wait_for_idler,
                                      queue_info->idlers_mutex);
            if (rv != APR_SUCCESS) {
                apr_status_t rv2;
                rv2 = apr_thread_mutex_unlock(queue_info->idlers_mutex);
                if (rv2 != APR_SUCCESS) {
                    return rv2;
                }
                return rv;
            }
        }
        rv = apr_thread_mutex_unlock(queue_info->idlers_mutex);
        if (rv != APR_SUCCESS) {
            return rv;
        }
    }

    /* Atomically decrement the idle worker count */
    apr_atomic_dec32(&(queue_info->idlers));

    /* Atomically pop a pool from the recycled list */

    /* This pop is safe only as long as it is single threaded, because
     * it reaches into the list and accesses "next", which can change.
     * We are OK today because it is only called from the listener thread.
     * CAS-based pushes do not have the same limitation - any number can
     * happen concurrently with a single CAS-based pop.
     */

    for (;;) {
        struct recycled_pool *first_pool = queue_info->recycled_pools;
        if (first_pool == NULL) {
            break;
        }
        if (apr_atomic_casptr((void*)&(queue_info->recycled_pools),
                              first_pool->next, first_pool) == first_pool) {
            *recycled_pool = first_pool->pool;
            break;
        }
    }

    if (queue_info->terminated) {
        return APR_EOF;
    }
    else {
        return APR_SUCCESS;
    }
}

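/* Mark the queue_info as terminated and wake any listener blocked in
 * ap_queue_info_wait_for_idler().
 */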
apr_status_t ap_queue_info_term(fd_queue_info_t *queue_info)
{
    apr_status_t rv;
    rv = apr_thread_mutex_lock(queue_info->idlers_mutex);
    if (rv != APR_SUCCESS) {
        return rv;
    }
    queue_info->terminated = 1;
    apr_thread_cond_broadcast(queue_info->wait_for_idler);
    return apr_thread_mutex_unlock(queue_info->idlers_mutex);
}

/**
 * Detects when the fd_queue_t is full. This utility function is expected
 * to be called from within critical sections, and is not threadsafe.
 */
#define ap_queue_full(queue) ((queue)->nelts == (queue)->bounds)

/**
 * Detects when the fd_queue_t is empty. This utility function is expected
 * to be called from within critical sections, and is not threadsafe.
 */
#define ap_queue_empty(queue) ((queue)->nelts == 0)

/**
 * Callback routine that is called to destroy this
 * fd_queue_t when its pool is destroyed.
 */
static apr_status_t ap_queue_destroy(void *data)
{
    fd_queue_t *queue = data;

    /* Ignore errors here, we can't do anything about them anyway.
     * XXX: We should at least try to signal an error here, it is
     * indicative of a programmer error. -aaron */
    apr_thread_cond_destroy(queue->not_empty);
    apr_thread_mutex_destroy(queue->one_big_mutex);

    return APR_SUCCESS;
}

/**
 * Initialize the fd_queue_t.
 */
apr_status_t ap_queue_init(fd_queue_t *queue, int queue_capacity, apr_pool_t *a)
{
    int i;
    apr_status_t rv;

    if ((rv = apr_thread_mutex_create(&queue->one_big_mutex,
                                      APR_THREAD_MUTEX_DEFAULT, a)) != APR_SUCCESS) {
        return rv;
    }
    if ((rv = apr_thread_cond_create(&queue->not_empty, a)) != APR_SUCCESS) {
        return rv;
    }

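    /* The queue itself is a fixed-size circular buffer of (socket, pool)
     * elements, indexed by 'in' and 'out'. */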
    queue->data = apr_palloc(a, queue_capacity * sizeof(fd_queue_elem_t));
    queue->bounds = queue_capacity;
    queue->nelts = 0;
    queue->in = 0;
    queue->out = 0;

    /* Set all the sockets in the queue to NULL */
    for (i = 0; i < queue_capacity; ++i)
        queue->data[i].sd = NULL;

    apr_pool_cleanup_register(a, queue, ap_queue_destroy, apr_pool_cleanup_null);

    return APR_SUCCESS;
}

/**
 * Push a new socket onto the queue.
 *
 * precondition: ap_queue_info_wait_for_idler has already been called
 *               to reserve an idle worker thread
 */
apr_status_t ap_queue_push(fd_queue_t *queue, apr_socket_t *sd, apr_pool_t *p)
{
    fd_queue_elem_t *elem;
    apr_status_t rv;

    if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) {
        return rv;
    }

    AP_DEBUG_ASSERT(!queue->terminated);
    AP_DEBUG_ASSERT(!ap_queue_full(queue));

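    /* Store the socket and its pool at the 'in' end of the circular buffer,
     * wrapping the index if necessary, then signal a waiting worker. */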
    elem = &queue->data[queue->in];
    queue->in++;
    if (queue->in >= queue->bounds)
        queue->in -= queue->bounds;
    elem->sd = sd;
    elem->p = p;
    queue->nelts++;

    apr_thread_cond_signal(queue->not_empty);

    if ((rv = apr_thread_mutex_unlock(queue->one_big_mutex)) != APR_SUCCESS) {
        return rv;
    }

    return APR_SUCCESS;
}

/**
 * Retrieves the next available socket from the queue. If there are no
 * sockets available, it will block until one becomes available.
 * Once retrieved, the socket is placed into the address specified by
 * 'sd'.
 */
apr_status_t ap_queue_pop(fd_queue_t *queue, apr_socket_t **sd, apr_pool_t **p)
{
    fd_queue_elem_t *elem;
    apr_status_t rv;

    if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) {
        return rv;
    }

    /* If the queue is empty, wait for a socket to be pushed (or for the
     * queue to be terminated or interrupted). */
    if (ap_queue_empty(queue)) {
        if (!queue->terminated) {
            apr_thread_cond_wait(queue->not_empty, queue->one_big_mutex);
        }
        /* If we wake up and it's still empty, then we were interrupted */
        if (ap_queue_empty(queue)) {
            rv = apr_thread_mutex_unlock(queue->one_big_mutex);
            if (rv != APR_SUCCESS) {
                return rv;
            }
            if (queue->terminated) {
                return APR_EOF; /* no more elements ever again */
            }
            else {
                return APR_EINTR;
            }
        }
    }

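    /* Remove the element at the 'out' end of the circular buffer,
     * wrapping the index if necessary. */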
    elem = &queue->data[queue->out];
    queue->out++;
    if (queue->out >= queue->bounds)
        queue->out -= queue->bounds;
    queue->nelts--;
    *sd = elem->sd;
    *p = elem->p;
#ifdef AP_DEBUG
    elem->sd = NULL;
    elem->p = NULL;
#endif /* AP_DEBUG */

    rv = apr_thread_mutex_unlock(queue->one_big_mutex);
    return rv;
}

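/**
 * Wake up all threads blocked in ap_queue_pop() without pushing anything;
 * a woken thread that still finds the queue empty returns APR_EINTR
 * (or APR_EOF if the queue has been terminated).
 */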
apr_status_t ap_queue_interrupt_all(fd_queue_t *queue)
{
    apr_status_t rv;

    if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) {
        return rv;
    }
    apr_thread_cond_broadcast(queue->not_empty);
    return apr_thread_mutex_unlock(queue->one_big_mutex);
}

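/**
 * Mark the queue as terminated and wake all blocked threads, so that
 * pending and subsequent ap_queue_pop() calls return APR_EOF.
 */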
apr_status_t ap_queue_term(fd_queue_t *queue)
{
    apr_status_t rv;

    if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) {
        return rv;
    }
    /* we must hold one_big_mutex when setting this... otherwise,
     * we could end up setting it and waking everybody up just after a
     * would-be popper checks it but right before they block
     */
    queue->terminated = 1;
    if ((rv = apr_thread_mutex_unlock(queue->one_big_mutex)) != APR_SUCCESS) {
        return rv;
    }
    return ap_queue_interrupt_all(queue);
}