1238106Sdes/* Licensed to the Apache Software Foundation (ASF) under one or more
2238106Sdes * contributor license agreements.  See the NOTICE file distributed with
3238106Sdes * this work for additional information regarding copyright ownership.
4238106Sdes * The ASF licenses this file to You under the Apache License, Version 2.0
5238106Sdes * (the "License"); you may not use this file except in compliance with
6238106Sdes * the License.  You may obtain a copy of the License at
7238106Sdes *
8238106Sdes *     http://www.apache.org/licenses/LICENSE-2.0
9238106Sdes *
10238106Sdes * Unless required by applicable law or agreed to in writing, software
11238106Sdes * distributed under the License is distributed on an "AS IS" BASIS,
12238106Sdes * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13238106Sdes * See the License for the specific language governing permissions and
14238106Sdes * limitations under the License.
15238106Sdes */
16238106Sdes
17238106Sdes#include "apr.h"
18238106Sdes
19238106Sdes#if APR_HAVE_STDIO_H
20238106Sdes#include <stdio.h>
21238106Sdes#endif
22238106Sdes#if APR_HAVE_STDLIB_H
23238106Sdes#include <stdlib.h>
24269257Sdes#endif
25269257Sdes#if APR_HAVE_UNISTD_H
26269257Sdes#include <unistd.h>
27269257Sdes#endif
28269257Sdes
29269257Sdes#include "apu.h"
30269257Sdes#include "apr_portable.h"
31269257Sdes#include "apr_thread_mutex.h"
32269257Sdes#include "apr_thread_cond.h"
33269257Sdes#include "apr_errno.h"
34238106Sdes#include "apr_queue.h"
35238106Sdes
36238106Sdes#if APR_HAS_THREADS
37238106Sdes/*
38238106Sdes * define this to get debug messages
39238106Sdes *
40238106Sdes#define QUEUE_DEBUG
41238106Sdes */
42238106Sdes
43238106Sdesstruct apr_queue_t {
44238106Sdes    void              **data;
45238106Sdes    unsigned int        nelts; /**< # elements */
46269257Sdes    unsigned int        in;    /**< next empty location */
47238106Sdes    unsigned int        out;   /**< next filled location */
48238106Sdes    unsigned int        bounds;/**< max size of queue */
49238106Sdes    unsigned int        full_waiters;
50238106Sdes    unsigned int        empty_waiters;
51238106Sdes    apr_thread_mutex_t *one_big_mutex;
52238106Sdes    apr_thread_cond_t  *not_empty;
53238106Sdes    apr_thread_cond_t  *not_full;
54238106Sdes    int                 terminated;
55238106Sdes};
56238106Sdes
#ifdef QUEUE_DEBUG
/**
 * Debug trace helper, compiled in only when QUEUE_DEBUG is defined.
 * Writes one line to stderr containing the calling thread's id, the
 * element count, the in/out indices and the supplied message.
 * @param msg short label identifying the call site
 * @param q   queue whose counters are printed
 */
static void Q_DBG(const char *msg, apr_queue_t *q)
{
    /* The counters are unsigned int, so they are printed with %u.
     * NOTE(review): the cast assumes apr_os_thread_t is an integer or
     * pointer type on the target platform -- confirm for exotic ports.
     */
    fprintf(stderr, "%lu\t#%u in %u out %u\t%s\n",
            (unsigned long)apr_os_thread_current(),
            q->nelts, q->in, q->out,
            msg);
}
#else
/* Debugging disabled: a Q_DBG() call expands to nothing. */
#define Q_DBG(x,y)
#endif
68238106Sdes
/**
 * True when the queue already holds as many elements as its capacity.
 * The counters are read without any locking, so this is only meaningful
 * while the caller is inside the queue's critical section.
 */
#define apr_queue_full(queue) ((queue)->bounds == (queue)->nelts)
74238106Sdes
/**
 * True when the queue holds no elements. The counter is read without any
 * locking, so this is only meaningful while the caller is inside the
 * queue's critical section.
 */
#define apr_queue_empty(queue) (!(queue)->nelts)
80238106Sdes
81238106Sdes/**
82238106Sdes * Callback routine that is called to destroy this
83238106Sdes * apr_queue_t when its pool is destroyed.
84238106Sdes */
85238106Sdesstatic apr_status_t queue_destroy(void *data)
86238106Sdes{
87238106Sdes    apr_queue_t *queue = data;
88238106Sdes
89238106Sdes    /* Ignore errors here, we can't do anything about them anyway. */
90238106Sdes
91238106Sdes    apr_thread_cond_destroy(queue->not_empty);
92238106Sdes    apr_thread_cond_destroy(queue->not_full);
93238106Sdes    apr_thread_mutex_destroy(queue->one_big_mutex);
94238106Sdes
95238106Sdes    return APR_SUCCESS;
96238106Sdes}
97238106Sdes
98238106Sdes/**
99238106Sdes * Initialize the apr_queue_t.
100238106Sdes */
101238106SdesAPU_DECLARE(apr_status_t) apr_queue_create(apr_queue_t **q,
102238106Sdes                                           unsigned int queue_capacity,
103238106Sdes                                           apr_pool_t *a)
104238106Sdes{
105238106Sdes    apr_status_t rv;
106238106Sdes    apr_queue_t *queue;
107238106Sdes    queue = apr_palloc(a, sizeof(apr_queue_t));
108238106Sdes    *q = queue;
109238106Sdes
110238106Sdes    /* nested doesn't work ;( */
111238106Sdes    rv = apr_thread_mutex_create(&queue->one_big_mutex,
112238106Sdes                                 APR_THREAD_MUTEX_UNNESTED,
113238106Sdes                                 a);
114238106Sdes    if (rv != APR_SUCCESS) {
115238106Sdes        return rv;
116238106Sdes    }
117238106Sdes
118238106Sdes    rv = apr_thread_cond_create(&queue->not_empty, a);
119238106Sdes    if (rv != APR_SUCCESS) {
120238106Sdes        return rv;
121238106Sdes    }
122238106Sdes
123238106Sdes    rv = apr_thread_cond_create(&queue->not_full, a);
124238106Sdes    if (rv != APR_SUCCESS) {
125238106Sdes        return rv;
126238106Sdes    }
127238106Sdes
128238106Sdes    /* Set all the data in the queue to NULL */
129238106Sdes    queue->data = apr_pcalloc(a, queue_capacity * sizeof(void*));
130238106Sdes    queue->bounds = queue_capacity;
131238106Sdes    queue->nelts = 0;
132238106Sdes    queue->in = 0;
133238106Sdes    queue->out = 0;
134238106Sdes    queue->terminated = 0;
135238106Sdes    queue->full_waiters = 0;
136238106Sdes    queue->empty_waiters = 0;
137238106Sdes
138238106Sdes    apr_pool_cleanup_register(a, queue, queue_destroy, apr_pool_cleanup_null);
139238106Sdes
140238106Sdes    return APR_SUCCESS;
141238106Sdes}
142238106Sdes
143238106Sdes/**
144238106Sdes * Push new data onto the queue. Blocks if the queue is full. Once
145238106Sdes * the push operation has completed, it signals other threads waiting
146238106Sdes * in apr_queue_pop() that they may continue consuming sockets.
147238106Sdes */
148238106SdesAPU_DECLARE(apr_status_t) apr_queue_push(apr_queue_t *queue, void *data)
149238106Sdes{
150238106Sdes    apr_status_t rv;
151238106Sdes
152238106Sdes    if (queue->terminated) {
153238106Sdes        return APR_EOF; /* no more elements ever again */
154238106Sdes    }
155238106Sdes
156238106Sdes    rv = apr_thread_mutex_lock(queue->one_big_mutex);
157238106Sdes    if (rv != APR_SUCCESS) {
158238106Sdes        return rv;
159238106Sdes    }
160238106Sdes
161238106Sdes    if (apr_queue_full(queue)) {
162238106Sdes        if (!queue->terminated) {
163238106Sdes            queue->full_waiters++;
164238106Sdes            rv = apr_thread_cond_wait(queue->not_full, queue->one_big_mutex);
165238106Sdes            queue->full_waiters--;
166238106Sdes            if (rv != APR_SUCCESS) {
167238106Sdes                apr_thread_mutex_unlock(queue->one_big_mutex);
168238106Sdes                return rv;
169238106Sdes            }
170238106Sdes        }
171238106Sdes        /* If we wake up and it's still empty, then we were interrupted */
172238106Sdes        if (apr_queue_full(queue)) {
173238106Sdes            Q_DBG("queue full (intr)", queue);
174238106Sdes            rv = apr_thread_mutex_unlock(queue->one_big_mutex);
175238106Sdes            if (rv != APR_SUCCESS) {
176238106Sdes                return rv;
177238106Sdes            }
178238106Sdes            if (queue->terminated) {
179238106Sdes                return APR_EOF; /* no more elements ever again */
180238106Sdes            }
181238106Sdes            else {
182238106Sdes                return APR_EINTR;
183238106Sdes            }
184238106Sdes        }
185238106Sdes    }
186238106Sdes
187238106Sdes    queue->data[queue->in] = data;
188238106Sdes    queue->in++;
189238106Sdes    if (queue->in >= queue->bounds)
190238106Sdes        queue->in -= queue->bounds;
191238106Sdes    queue->nelts++;
192238106Sdes
193238106Sdes    if (queue->empty_waiters) {
194238106Sdes        Q_DBG("sig !empty", queue);
195238106Sdes        rv = apr_thread_cond_signal(queue->not_empty);
196238106Sdes        if (rv != APR_SUCCESS) {
197238106Sdes            apr_thread_mutex_unlock(queue->one_big_mutex);
198238106Sdes            return rv;
199238106Sdes        }
200238106Sdes    }
201238106Sdes
202238106Sdes    rv = apr_thread_mutex_unlock(queue->one_big_mutex);
203238106Sdes    return rv;
204238106Sdes}
205238106Sdes
206238106Sdes/**
207238106Sdes * Push new data onto the queue. If the queue is full, return APR_EAGAIN. If
208238106Sdes * the push operation completes successfully, it signals other threads
209238106Sdes * waiting in apr_queue_pop() that they may continue consuming sockets.
210238106Sdes */
211238106SdesAPU_DECLARE(apr_status_t) apr_queue_trypush(apr_queue_t *queue, void *data)
212238106Sdes{
213238106Sdes    apr_status_t rv;
214238106Sdes
215238106Sdes    if (queue->terminated) {
216238106Sdes        return APR_EOF; /* no more elements ever again */
217238106Sdes    }
218238106Sdes
219238106Sdes    rv = apr_thread_mutex_lock(queue->one_big_mutex);
220238106Sdes    if (rv != APR_SUCCESS) {
221238106Sdes        return rv;
222238106Sdes    }
223238106Sdes
224238106Sdes    if (apr_queue_full(queue)) {
225238106Sdes        rv = apr_thread_mutex_unlock(queue->one_big_mutex);
226238106Sdes        return APR_EAGAIN;
227238106Sdes    }
228238106Sdes
229238106Sdes    queue->data[queue->in] = data;
230238106Sdes    queue->in++;
231238106Sdes    if (queue->in >= queue->bounds)
232238106Sdes        queue->in -= queue->bounds;
233238106Sdes    queue->nelts++;
234238106Sdes
235238106Sdes    if (queue->empty_waiters) {
236238106Sdes        Q_DBG("sig !empty", queue);
237238106Sdes        rv  = apr_thread_cond_signal(queue->not_empty);
238238106Sdes        if (rv != APR_SUCCESS) {
239238106Sdes            apr_thread_mutex_unlock(queue->one_big_mutex);
240238106Sdes            return rv;
241238106Sdes        }
242238106Sdes    }
243238106Sdes
244238106Sdes    rv = apr_thread_mutex_unlock(queue->one_big_mutex);
245238106Sdes    return rv;
246238106Sdes}
247238106Sdes
248/**
249 * not thread safe
250 */
251APU_DECLARE(unsigned int) apr_queue_size(apr_queue_t *queue) {
252    return queue->nelts;
253}
254
255/**
256 * Retrieves the next item from the queue. If there are no
257 * items available, it will block until one becomes available.
258 * Once retrieved, the item is placed into the address specified by
259 * 'data'.
260 */
261APU_DECLARE(apr_status_t) apr_queue_pop(apr_queue_t *queue, void **data)
262{
263    apr_status_t rv;
264
265    if (queue->terminated) {
266        return APR_EOF; /* no more elements ever again */
267    }
268
269    rv = apr_thread_mutex_lock(queue->one_big_mutex);
270    if (rv != APR_SUCCESS) {
271        return rv;
272    }
273
274    /* Keep waiting until we wake up and find that the queue is not empty. */
275    if (apr_queue_empty(queue)) {
276        if (!queue->terminated) {
277            queue->empty_waiters++;
278            rv = apr_thread_cond_wait(queue->not_empty, queue->one_big_mutex);
279            queue->empty_waiters--;
280            if (rv != APR_SUCCESS) {
281                apr_thread_mutex_unlock(queue->one_big_mutex);
282                return rv;
283            }
284        }
285        /* If we wake up and it's still empty, then we were interrupted */
286        if (apr_queue_empty(queue)) {
287            Q_DBG("queue empty (intr)", queue);
288            rv = apr_thread_mutex_unlock(queue->one_big_mutex);
289            if (rv != APR_SUCCESS) {
290                return rv;
291            }
292            if (queue->terminated) {
293                return APR_EOF; /* no more elements ever again */
294            }
295            else {
296                return APR_EINTR;
297            }
298        }
299    }
300
301    *data = queue->data[queue->out];
302    queue->nelts--;
303
304    queue->out++;
305    if (queue->out >= queue->bounds)
306        queue->out -= queue->bounds;
307    if (queue->full_waiters) {
308        Q_DBG("signal !full", queue);
309        rv = apr_thread_cond_signal(queue->not_full);
310        if (rv != APR_SUCCESS) {
311            apr_thread_mutex_unlock(queue->one_big_mutex);
312            return rv;
313        }
314    }
315
316    rv = apr_thread_mutex_unlock(queue->one_big_mutex);
317    return rv;
318}
319
320/**
321 * Retrieves the next item from the queue. If there are no
322 * items available, return APR_EAGAIN.  Once retrieved,
323 * the item is placed into the address specified by 'data'.
324 */
325APU_DECLARE(apr_status_t) apr_queue_trypop(apr_queue_t *queue, void **data)
326{
327    apr_status_t rv;
328
329    if (queue->terminated) {
330        return APR_EOF; /* no more elements ever again */
331    }
332
333    rv = apr_thread_mutex_lock(queue->one_big_mutex);
334    if (rv != APR_SUCCESS) {
335        return rv;
336    }
337
338    if (apr_queue_empty(queue)) {
339        rv = apr_thread_mutex_unlock(queue->one_big_mutex);
340        return APR_EAGAIN;
341    }
342
343    *data = queue->data[queue->out];
344    queue->nelts--;
345
346    queue->out++;
347    if (queue->out >= queue->bounds)
348        queue->out -= queue->bounds;
349    if (queue->full_waiters) {
350        Q_DBG("signal !full", queue);
351        rv = apr_thread_cond_signal(queue->not_full);
352        if (rv != APR_SUCCESS) {
353            apr_thread_mutex_unlock(queue->one_big_mutex);
354            return rv;
355        }
356    }
357
358    rv = apr_thread_mutex_unlock(queue->one_big_mutex);
359    return rv;
360}
361
362APU_DECLARE(apr_status_t) apr_queue_interrupt_all(apr_queue_t *queue)
363{
364    apr_status_t rv;
365    Q_DBG("intr all", queue);
366    if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) {
367        return rv;
368    }
369    apr_thread_cond_broadcast(queue->not_empty);
370    apr_thread_cond_broadcast(queue->not_full);
371
372    if ((rv = apr_thread_mutex_unlock(queue->one_big_mutex)) != APR_SUCCESS) {
373        return rv;
374    }
375
376    return APR_SUCCESS;
377}
378
379APU_DECLARE(apr_status_t) apr_queue_term(apr_queue_t *queue)
380{
381    apr_status_t rv;
382
383    if ((rv = apr_thread_mutex_lock(queue->one_big_mutex)) != APR_SUCCESS) {
384        return rv;
385    }
386
387    /* we must hold one_big_mutex when setting this... otherwise,
388     * we could end up setting it and waking everybody up just after a
389     * would-be popper checks it but right before they block
390     */
391    queue->terminated = 1;
392    if ((rv = apr_thread_mutex_unlock(queue->one_big_mutex)) != APR_SUCCESS) {
393        return rv;
394    }
395    return apr_queue_interrupt_all(queue);
396}
397
398#endif /* APR_HAS_THREADS */
399