1/* Licensed to the Apache Software Foundation (ASF) under one or more
2 * contributor license agreements.  See the NOTICE file distributed with
3 * this work for additional information regarding copyright ownership.
4 * The ASF licenses this file to You under the Apache License, Version 2.0
5 * (the "License"); you may not use this file except in compliance with
6 * the License.  You may obtain a copy of the License at
7 *
8 *     http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#include "apr.h"
18
19#if APR_HAVE_STDIO_H
20#include <stdio.h>
21#endif
22#if APR_HAVE_STDLIB_H
23#include <stdlib.h>
24#endif
25#if APR_HAVE_UNISTD_H
26#include <unistd.h>
27#endif
28
29#include "apu.h"
30#include "apr_portable.h"
31#include "apr_thread_mutex.h"
32#include "apr_thread_cond.h"
33#include "apr_errno.h"
34#include "apr_queue.h"
35
36#if APR_HAS_THREADS
37/*
38 * define this to get debug messages
39 *
40#define QUEUE_DEBUG
41 */
42
43struct apr_queue_t {
44    void              **data;
45    unsigned int        nelts; /**< # elements */
46    unsigned int        in;    /**< next empty location */
47    unsigned int        out;   /**< next filled location */
48    unsigned int        bounds;/**< max size of queue */
49    unsigned int        full_waiters;
50    unsigned int        empty_waiters;
51    apr_thread_mutex_t *one_big_mutex;
52    apr_thread_cond_t  *not_empty;
53    apr_thread_cond_t  *not_full;
54    int                 terminated;
55};
56
#ifdef QUEUE_DEBUG
/**
 * Debug trace helper, compiled only when QUEUE_DEBUG is defined.
 * Writes one line to stderr containing the calling thread's id, the
 * queue's element count and in/out indices, and the caller's message.
 *
 * @param msg short tag describing the event being traced
 * @param q   queue whose counters are printed
 */
static void Q_DBG(const char *msg, apr_queue_t *q)
{
    /* The counters are unsigned int, so they are printed with %u.  The
     * thread id is cast explicitly so the argument matches %lu; this
     * assumes apr_os_thread_t is an integer or pointer type -- TODO
     * confirm for platforms where it is neither.
     */
    fprintf(stderr, "%lu\t#%u in %u out %u\t%s\n",
            (unsigned long)apr_os_thread_current(),
            q->nelts, q->in, q->out,
            msg);
}
#else
/* Tracing disabled: Q_DBG() expands to nothing. */
#define Q_DBG(x,y)
#endif
68
/**
 * True when the queue holds as many elements as its capacity (bounds).
 * Not threadsafe by itself: intended to be evaluated only inside a
 * critical section.
 */
#define apr_queue_full(q) ((q)->bounds == (q)->nelts)
74
/**
 * True when the queue holds no elements.
 * Not threadsafe by itself: intended to be evaluated only inside a
 * critical section.
 */
#define apr_queue_empty(q) (0 == (q)->nelts)
80
81/**
82 * Callback routine that is called to destroy this
83 * apr_queue_t when its pool is destroyed.
84 */
85static apr_status_t queue_destroy(void *data)
86{
87    apr_queue_t *queue = data;
88
89    /* Ignore errors here, we can't do anything about them anyway. */
90
91    apr_thread_cond_destroy(queue->not_empty);
92    apr_thread_cond_destroy(queue->not_full);
93    apr_thread_mutex_destroy(queue->one_big_mutex);
94
95    return APR_SUCCESS;
96}
97
98/**
99 * Initialize the apr_queue_t.
100 */
101APU_DECLARE(apr_status_t) apr_queue_create(apr_queue_t **q,
102                                           unsigned int queue_capacity,
103                                           apr_pool_t *a)
104{
105    apr_status_t rv;
106    apr_queue_t *queue;
107    queue = apr_palloc(a, sizeof(apr_queue_t));
108    *q = queue;
109
110    /* nested doesn't work ;( */
111    rv = apr_thread_mutex_create(&queue->one_big_mutex,
112                                 APR_THREAD_MUTEX_UNNESTED,
113                                 a);
114    if (rv != APR_SUCCESS) {
115        return rv;
116    }
117
118    rv = apr_thread_cond_create(&queue->not_empty, a);
119    if (rv != APR_SUCCESS) {
120        return rv;
121    }
122
123    rv = apr_thread_cond_create(&queue->not_full, a);
124    if (rv != APR_SUCCESS) {
125        return rv;
126    }
127
128    /* Set all the data in the queue to NULL */
129    queue->data = apr_pcalloc(a, queue_capacity * sizeof(void*));
130    queue->bounds = queue_capacity;
131    queue->nelts = 0;
132    queue->in = 0;
133    queue->out = 0;
134    queue->terminated = 0;
135    queue->full_waiters = 0;
136    queue->empty_waiters = 0;
137
138    apr_pool_cleanup_register(a, queue, queue_destroy, apr_pool_cleanup_null);
139
140    return APR_SUCCESS;
141}
142
143/**
144 * Push new data onto the queue. Blocks if the queue is full. Once
145 * the push operation has completed, it signals other threads waiting
146 * in apr_queue_pop() that they may continue consuming sockets.
147 */
148APU_DECLARE(apr_status_t) apr_queue_push(apr_queue_t *queue, void *data)
149{
150    apr_status_t rv;
151
152    if (queue->terminated) {
153        return APR_EOF; /* no more elements ever again */
154    }
155
156    rv = apr_thread_mutex_lock(queue->one_big_mutex);
157    if (rv != APR_SUCCESS) {
158        return rv;
159    }
160
161    if (apr_queue_full(queue)) {
162        if (!queue->terminated) {
163            queue->full_waiters++;
164            rv = apr_thread_cond_wait(queue->not_full, queue->one_big_mutex);
165            queue->full_waiters--;
166            if (rv != APR_SUCCESS) {
167                apr_thread_mutex_unlock(queue->one_big_mutex);
168                return rv;
169            }
170        }
171        /* If we wake up and it's still empty, then we were interrupted */
172        if (apr_queue_full(queue)) {
173            Q_DBG("queue full (intr)", queue);
174            rv = apr_thread_mutex_unlock(queue->one_big_mutex);
175            if (rv != APR_SUCCESS) {
176                return rv;
177            }
178            if (queue->terminated) {
179                return APR_EOF; /* no more elements ever again */
180            }
181            else {
182                return APR_EINTR;
183            }
184        }
185    }
186
187    queue->data[queue->in] = data;
188    queue->in++;
189    if (queue->in >= queue->bounds)
190        queue->in -= queue->bounds;
191    queue->nelts++;
192
193    if (queue->empty_waiters) {
194        Q_DBG("sig !empty", queue);
195        rv = apr_thread_cond_signal(queue->not_empty);
196        if (rv != APR_SUCCESS) {
197            apr_thread_mutex_unlock(queue->one_big_mutex);
198            return rv;
199        }
200    }
201
202    rv = apr_thread_mutex_unlock(queue->one_big_mutex);
203    return rv;
204}
205
206/**
207 * Push new data onto the queue. If the queue is full, return APR_EAGAIN. If
208 * the push operation completes successfully, it signals other threads
209 * waiting in apr_queue_pop() that they may continue consuming sockets.
210 */
211APU_DECLARE(apr_status_t) apr_queue_trypush(apr_queue_t *queue, void *data)
212{
213    apr_status_t rv;
214
215    if (queue->terminated) {
216        return APR_EOF; /* no more elements ever again */
217    }
218
219    rv = apr_thread_mutex_lock(queue->one_big_mutex);
220    if (rv != APR_SUCCESS) {
221        return rv;
222    }
223
224    if (apr_queue_full(queue)) {
225        rv = apr_thread_mutex_unlock(queue->one_big_mutex);
226        return APR_EAGAIN;
227    }
228
229    queue->data[queue->in] = data;
230    queue->in++;
231    if (queue->in >= queue->bounds)
232        queue->in -= queue->bounds;
233    queue->nelts++;
234
235    if (queue->empty_waiters) {
236        Q_DBG("sig !empty", queue);
237        rv  = apr_thread_cond_signal(queue->not_empty);
238        if (rv != APR_SUCCESS) {
239            apr_thread_mutex_unlock(queue->one_big_mutex);
240            return rv;
241        }
242    }
243
244    rv = apr_thread_mutex_unlock(queue->one_big_mutex);
245    return rv;
246}
247
248/**
249 * not thread safe
250 */
251APU_DECLARE(unsigned int) apr_queue_size(apr_queue_t *queue) {
252    return queue->nelts;
253}
254
255/**
256 * Retrieves the next item from the queue. If there are no
257 * items available, it will block until one becomes available.
258 * Once retrieved, the item is placed into the address specified by
259 * 'data'.
260 */
261APU_DECLARE(apr_status_t) apr_queue_pop(apr_queue_t *queue, void **data)
262{
263    apr_status_t rv;
264
265    if (queue->terminated) {
266        return APR_EOF; /* no more elements ever again */
267    }
268
269    rv = apr_thread_mutex_lock(queue->one_big_mutex);
270    if (rv != APR_SUCCESS) {
271        return rv;
272    }
273
274    /* Keep waiting until we wake up and find that the queue is not empty. */
275    if (apr_queue_empty(queue)) {
276        if (!queue->terminated) {
277            queue->empty_waiters++;
278            rv = apr_thread_cond_wait(queue->not_empty, queue->one_big_mutex);
279            queue->empty_waiters--;
280            if (rv != APR_SUCCESS) {
281                apr_thread_mutex_unlock(queue->one_big_mutex);
282                return rv;
283            }
284        }
285        /* If we wake up and it's still empty, then we were interrupted */
286        if (apr_queue_empty(queue)) {
287            Q_DBG("queue empty (intr)", queue);
288            rv = apr_thread_mutex_unlock(queue->one_big_mutex);
289            if (rv != APR_SUCCESS) {
290                return rv;
291            }
292            if (queue->terminated) {
293                return APR_EOF; /* no more elements ever again */
294            }
295            else {
296                return APR_EINTR;
297            }
298        }
299    }
300
301    *data = queue->data[queue->out];
302    queue->nelts--;
303
304    queue->out++;
305    if (queue->out >= queue->bounds)
306        queue->out -= queue->bounds;
307    if (queue->full_waiters) {
308        Q_DBG("signal !full", queue);
309        rv = apr_thread_cond_signal(queue->not_full);
310        if (rv != APR_SUCCESS) {
311            apr_thread_mutex_unlock(queue->one_big_mutex);
312            return rv;
313        }
314    }
315
316    rv = apr_thread_mutex_unlock(queue->one_big_mutex);
317    return rv;
318}
319
320/**
321 * Retrieves the next item from the queue. If there are no
322 * items available, return APR_EAGAIN.  Once retrieved,
323 * the item is placed into the address specified by 'data'.
324 */
325APU_DECLARE(apr_status_t) apr_queue_trypop(apr_queue_t *queue, void **data)
326{
327    apr_status_t rv;
328
329    if (queue->terminated) {
330        return APR_EOF; /* no more elements ever again */
331    }
332
333    rv = apr_thread_mutex_lock(queue->one_big_mutex);
334    if (rv != APR_SUCCESS) {
335        return rv;
336    }
337
338    if (apr_queue_empty(queue)) {
339        rv = apr_thread_mutex_unlock(queue->one_big_mutex);
340        return APR_EAGAIN;
341    }
342
343    *data = queue->data[queue->out];
344    queue->nelts--;
345
346    queue->out++;
347    if (queue->out >= queue->bounds)
348        queue->out -= queue->bounds;
349    if (queue->full_waiters) {
350        Q_DBG("signal !full", queue);
351        rv = apr_thread_cond_signal(queue->not_full);
352        if (rv != APR_SUCCESS) {
353            apr_thread_mutex_unlock(queue->one_big_mutex);
354            return rv;
355        }
356    }
357
358    rv = apr_thread_mutex_unlock(queue->one_big_mutex);
359    return rv;
360}
361
362APU_DECLARE(apr_status_t) apr_queue_interrupt_all(apr_queue_t *queue)
363{
364    apr_status_t rv;
365    Q_DBG("intr all", queue);
366    if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) {
367        return rv;
368    }
369    apr_thread_cond_broadcast(queue->not_empty);
370    apr_thread_cond_broadcast(queue->not_full);
371
372    if ((rv = apr_thread_mutex_unlock(queue->one_big_mutex)) != APR_SUCCESS) {
373        return rv;
374    }
375
376    return APR_SUCCESS;
377}
378
379APU_DECLARE(apr_status_t) apr_queue_term(apr_queue_t *queue)
380{
381    apr_status_t rv;
382
383    if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) {
384        return rv;
385    }
386
387    /* we must hold one_big_mutex when setting this... otherwise,
388     * we could end up setting it and waking everybody up just after a
389     * would-be popper checks it but right before they block
390     */
391    queue->terminated = 1;
392    if ((rv = apr_thread_mutex_unlock(queue->one_big_mutex)) != APR_SUCCESS) {
393        return rv;
394    }
395    return apr_queue_interrupt_all(queue);
396}
397
398#endif /* APR_HAS_THREADS */
399