1/* Licensed to the Apache Software Foundation (ASF) under one or more
2 * contributor license agreements.  See the NOTICE file distributed with
3 * this work for additional information regarding copyright ownership.
4 * The ASF licenses this file to You under the Apache License, Version 2.0
5 * (the "License"); you may not use this file except in compliance with
6 * the License.  You may obtain a copy of the License at
7 *
8 *     http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16 *
17 ******************************************************************************
18 *
19 * This implementation is based on a design by John Brooks (IBM Pok) which uses
20 * the z/OS sockets async i/o facility.  When a
21 * socket is added to the pollset, an async poll is issued for that individual
22 * socket.  It specifies that the kernel should send an IPC message when the
23 * socket becomes ready.  The IPC messages are sent to a single message queue
24 * that is part of the pollset.  apr_pollset_poll waits on the arrival of IPC
25 * messages or the specified timeout.
26 *
27 * Since z/OS does not support async i/o for pipes or files at present, this
28 * implementation falls back to using ordinary poll() when
29 * APR_POLLSET_THREADSAFE is unset.
30 *
31 * Greg Ames
32 * April 2012
33 */
34
35#include "apr.h"
36#include "apr_hash.h"
37#include "apr_poll.h"
38#include "apr_time.h"
39#include "apr_portable.h"
40#include "apr_arch_inherit.h"
41#include "apr_arch_file_io.h"
42#include "apr_arch_networkio.h"
43#include "apr_arch_poll_private.h"
44
45#ifdef HAVE_AIO_MSGQ
46
47#include <sys/msg.h>  	/* msgget etc   */
48#include <time.h>     	/* timestruct   */
49#include <poll.h>     	/* pollfd       */
50#include <limits.h>     /* MAX_INT      */
51
52struct apr_pollset_private_t
53{
54    int             msg_q;              /* IPC message queue. The z/OS kernel sends messages
55                                         * to this queue when our async polls on individual
56                                         * file descriptors complete
57                                         */
58    apr_pollfd_t    *result_set;
59    apr_uint32_t    size;
60
61#if APR_HAS_THREADS
62    /* A thread mutex to protect operations on the rings and the hash */
63    apr_thread_mutex_t *ring_lock;
64#endif
65
66    /* A hash of all active elements used for O(1) _remove operations */
67    apr_hash_t      *elems;
68
69    APR_RING_HEAD(ready_ring_t,       asio_elem_t)      ready_ring;
70    APR_RING_HEAD(prior_ready_ring_t, asio_elem_t)      prior_ready_ring;
71    APR_RING_HEAD(free_ring_t,        asio_elem_t)      free_ring;
72
73    /* for pipes etc with no asio */
74    struct pollfd   *pollset;
75    apr_pollfd_t    *query_set;
76};
77
78typedef enum {
79    ASIO_INIT = 0,
80    ASIO_REMOVED,
81    ASIO_COMPLETE
82} asio_state_e;
83
84typedef struct asio_elem_t asio_elem_t;
85
86struct asio_msgbuf_t {
87    long         msg_type;       /* must be > 0 */
88    asio_elem_t *msg_elem;
89};
90
91struct asio_elem_t
92{
93    APR_RING_ENTRY(asio_elem_t) link;
94    apr_pollfd_t                pfd;
95    struct pollfd               os_pfd;
96    struct aiocb                a;
97    asio_state_e                state;
98    struct asio_msgbuf_t        msg;
99};
100
101#define DEBUG 0
102
103/* DEBUG settings: 0 - no debug messages at all,
104 *                 1 - should not occur messages,
105 *                 2 - apr_pollset_* entry and exit messages,
106 *                 3 - state changes, memory usage,
107 *                 4 - z/OS, APR, and internal calls,
108 *                 5 - everything else except the timer pop path,
109 *                 6 - everything, including the Event 1 sec timer pop path
110 *
111 *  each DEBUG level includes all messages produced by lower numbered levels
112 */
113
114#if DEBUG
115
116#include <assert.h>
117#include <unistd.h>	/* getpid       */
118
119#define DBG_BUFF char dbg_msg_buff[256];
120
121#define DBG_TEST(lvl) if (lvl <= DEBUG) {
122
123#define DBG_CORE(msg)               sprintf(dbg_msg_buff, "% 8d " __FUNCTION__ \
124                                        " "  msg, getpid()),                   \
125                                    fprintf(stderr, "%s", dbg_msg_buff);
126#define DBG_CORE1(msg, var1)        sprintf(dbg_msg_buff, "% 8d " __FUNCTION__ \
127                                        " " msg, getpid(), var1),              \
128                                    fprintf(stderr, "%s", dbg_msg_buff);
129#define DBG_CORE2(msg, var1, var2)  sprintf(dbg_msg_buff, "% 8d " __FUNCTION__ \
130                                        " " msg, getpid(), var1, var2),        \
131                                    fprintf(stderr, "%s", dbg_msg_buff);
132#define DBG_CORE3(msg, var1, var2, var3)                                       \
133                                    sprintf(dbg_msg_buff, "% 8d " __FUNCTION__ \
134                                        " " msg, getpid(), var1, var2, var3),  \
135                                    fprintf(stderr, "%s", dbg_msg_buff);
136#define DBG_CORE4(msg, var1, var2, var3, var4)                                 \
137                                    sprintf(dbg_msg_buff, "% 8d " __FUNCTION__ \
138                                        " " msg, getpid(), var1, var2, var3, var4),\
139                                    fprintf(stderr, "%s", dbg_msg_buff);
140
141#define DBG_END }
142
143#define DBG(lvl, msg)   DBG_TEST(lvl)   \
144                        DBG_CORE(msg)   \
145                        DBG_END
146
147#define DBG1(lvl, msg, var1)    DBG_TEST(lvl)           \
148                                DBG_CORE1(msg, var1)    \
149                                DBG_END
150
151#define DBG2(lvl, msg, var1, var2)      DBG_TEST(lvl)               \
152                                        DBG_CORE2(msg, var1, var2)  \
153                                        DBG_END
154
155#define DBG3(lvl, msg, var1, var2, var3)                        \
156                        DBG_TEST(lvl)                           \
157                        DBG_CORE3(msg, var1, var2, var3)        \
158                        DBG_END
159
160#define DBG4(lvl, msg, var1, var2, var3, var4)                  \
161                        DBG_TEST(lvl)                           \
162                        DBG_CORE4(msg, var1, var2, var3, var4)  \
163                        DBG_END
164
165#else  /* DEBUG is 0 */
166#define DBG_BUFF
167#define DBG(lvl, msg)                            ((void)0)
168#define DBG1(lvl, msg, var1)                     ((void)0)
169#define DBG2(lvl, msg, var1, var2)               ((void)0)
170#define DBG3(lvl, msg, var1, var2, var3)         ((void)0)
171#define DBG4(lvl, msg, var1, var2, var3, var4)   ((void)0)
172
173#endif /* DEBUG */
174
175static int asyncio(struct aiocb *a)
176{
177    DBG_BUFF
178    int rv;
179
180#ifdef _LP64
181#define AIO BPX4AIO
182#else
183#define AIO BPX1AIO
184#endif
185
186    AIO(sizeof(struct aiocb), a, &rv, &errno, __err2ad());
187    DBG2(4, "BPX4AIO aiocb %p rv %d\n",
188             a, rv);
189#ifdef DEBUG
190    if (rv < 0) {
191        DBG2(4, "errno %d errnojr %08x\n",
192                 errno, *__err2ad());
193    }
194#endif
195    return rv;
196}
197
198static apr_int16_t get_event(apr_int16_t event)
199{
200    DBG_BUFF
201    apr_int16_t rv = 0;
202    DBG(4, "entered\n");
203
204    if (event & APR_POLLIN)
205        rv |= POLLIN;
206    if (event & APR_POLLPRI)
207        rv |= POLLPRI;
208    if (event & APR_POLLOUT)
209        rv |= POLLOUT;
210    if (event & APR_POLLERR)
211        rv |= POLLERR;
212    if (event & APR_POLLHUP)
213        rv |= POLLHUP;
214    if (event & APR_POLLNVAL)
215        rv |= POLLNVAL;
216
217    DBG(4, "exiting\n");
218    return rv;
219}
220
221static apr_int16_t get_revent(apr_int16_t event)
222{
223    DBG_BUFF
224    apr_int16_t rv = 0;
225    DBG(4, "entered\n");
226
227    if (event & POLLIN)
228        rv |= APR_POLLIN;
229    if (event & POLLPRI)
230        rv |= APR_POLLPRI;
231    if (event & POLLOUT)
232        rv |= APR_POLLOUT;
233    if (event & POLLERR)
234        rv |= APR_POLLERR;
235    if (event & POLLHUP)
236        rv |= APR_POLLHUP;
237    if (event & POLLNVAL)
238        rv |= APR_POLLNVAL;
239
240    DBG(4, "exiting\n");
241    return rv;
242}
243
244static apr_status_t asio_pollset_cleanup(apr_pollset_t *pollset)
245{
246    DBG_BUFF
247    int rv;
248
249    DBG(4, "entered\n");
250    if (pollset->flags & APR_POLLSET_THREADSAFE) {
251        rv = msgctl(pollset->p->msg_q, IPC_RMID, NULL);
252        DBG1(4, "asio_pollset_cleanup: msgctl(IPC_RMID) returned %d\n", rv);
253    }
254
255    return rv;
256}
257
258static apr_status_t asio_pollset_create(apr_pollset_t *pollset,
259                                        apr_uint32_t size,
260                                        apr_pool_t *p,
261                                        apr_uint32_t flags)
262{
263    DBG_BUFF
264    apr_status_t rv;
265    apr_pollset_private_t *priv;
266
267    DBG1(2, "entered, flags: %x\n", flags);
268
269    priv = pollset->p = apr_pcalloc(p, sizeof(*priv));
270
271    if (flags & APR_POLLSET_THREADSAFE) {
272#if APR_HAS_THREADS
273        if ((rv = apr_thread_mutex_create(&(priv->ring_lock),
274                                           APR_THREAD_MUTEX_DEFAULT,
275                                           p)) != APR_SUCCESS) {
276            DBG1(1, "apr_thread_mutex_create returned %d\n", rv);
277            pollset->p = NULL;
278            return rv;
279        }
280        rv = msgget(IPC_PRIVATE, S_IWUSR+S_IRUSR); /* user r/w perms */
281        if (rv < 0) {
282#if DEBUG
283            perror(__FUNCTION__ " msgget returned < 0 ");
284#endif
285            pollset->p = NULL;
286            return rv;
287        }
288
289        DBG2(4, "pollset %p msgget was OK, rv=%d\n", pollset, rv);
290        priv->msg_q = rv;
291        priv->elems   = apr_hash_make(p);
292
293        APR_RING_INIT(&priv->free_ring, asio_elem_t, link);
294        APR_RING_INIT(&priv->prior_ready_ring, asio_elem_t, link);
295
296#else  /* APR doesn't have threads but caller wants a threadsafe pollset */
297        pollset->p = NULL;
298        return APR_ENOTIMPL;
299#endif
300
301    } else {  /* APR_POLLSET_THREADSAFE not set, i.e. no async i/o,
302               * init fields only needed in old style pollset
303               */
304
305        priv->pollset = apr_palloc(p, size * sizeof(struct pollfd));
306        priv->query_set = apr_palloc(p, size * sizeof(apr_pollfd_t));
307
308        if ((!priv->pollset) || (!priv->query_set)) {
309            pollset->p = NULL;
310            return APR_ENOMEM;
311        }
312    }
313
314    pollset->nelts   = 0;
315    pollset->flags   = flags;
316    pollset->pool    = p;
317    priv->size    = size;
318    priv->result_set = apr_palloc(p, size * sizeof(apr_pollfd_t));
319    if (!priv->result_set) {
320        if (flags & APR_POLLSET_THREADSAFE) {
321            msgctl(priv->msg_q, IPC_RMID, NULL);
322        }
323        pollset->p = NULL;
324        return APR_ENOMEM;
325    }
326
327    DBG2(2, "exiting, pollset: %p, type: %s\n",
328             pollset,
329             flags & APR_POLLSET_THREADSAFE ? "async" : "POSIX");
330
331
332    return APR_SUCCESS;
333
334} /* end of asio_pollset_create */
335
336static apr_status_t posix_add(apr_pollset_t      *pollset,
337                              const apr_pollfd_t *descriptor)
338{
339    DBG_BUFF
340    int fd;
341    apr_pool_t  *p = pollset->pool;
342    apr_pollset_private_t *priv = pollset->p;
343
344    DBG(4, "entered\n");
345
346    if (pollset->nelts == priv->size) {
347        return APR_ENOMEM;
348    }
349
350    priv->query_set[pollset->nelts] = *descriptor;
351    if (descriptor->desc_type == APR_POLL_SOCKET) {
352        fd = descriptor->desc.s->socketdes;
353    }
354    else {
355        fd = descriptor->desc.f->filedes;
356    }
357
358    priv->pollset[pollset->nelts].fd = fd;
359
360    priv->pollset[pollset->nelts].events =
361        get_event(descriptor->reqevents);
362
363    pollset->nelts++;
364
365    DBG2(4, "exiting, fd %d added to pollset %p\n", fd, pollset);
366
367    return APR_SUCCESS;
368}   /* end of posix_add */
369
370
371static apr_status_t asio_pollset_add(apr_pollset_t *pollset,
372                                     const apr_pollfd_t *descriptor)
373{
374    DBG_BUFF
375    asio_elem_t *elem;
376    apr_status_t rv = APR_SUCCESS;
377    apr_pollset_private_t *priv = pollset->p;
378
379    pollset_lock_rings();
380    DBG(2, "entered\n");
381
382    if (pollset->flags & APR_POLLSET_THREADSAFE) {
383
384        if (!APR_RING_EMPTY(&(priv->free_ring), asio_elem_t, link)) {
385            elem = APR_RING_FIRST(&(priv->free_ring));
386            APR_RING_REMOVE(elem, link);
387            DBG1(3, "used recycled memory at %08p\n", elem);
388            elem->state = ASIO_INIT;
389            elem->a.aio_cflags = 0;
390        }
391        else {
392            elem = (asio_elem_t *) apr_pcalloc(pollset->pool, sizeof(asio_elem_t));
393            DBG1(3, "alloced new memory at %08p\n", elem);
394
395            elem->a.aio_notifytype = AIO_MSGQ;
396            elem->a.aio_msgev_qid  = priv->msg_q;
397            DBG1(5, "aio_msgev_quid = %d \n", elem->a.aio_msgev_qid);
398            elem->a.aio_msgev_size = sizeof(asio_elem_t *);
399            elem->a.aio_msgev_flag = 0;     /* wait if queue is full */
400            elem->a.aio_msgev_addr = &(elem->msg);
401            elem->a.aio_buf        = &(elem->os_pfd);
402            elem->a.aio_nbytes     = 1;     /* number of pfds to poll */
403            elem->msg.msg_type     = 1;
404            elem->msg.msg_elem     = elem;
405        }
406
407        /* z/OS only supports async I/O for sockets for now */
408        elem->os_pfd.fd = descriptor->desc.s->socketdes;
409
410        APR_RING_ELEM_INIT(elem, link);
411        elem->a.aio_cmd       = AIO_SELPOLL;
412        elem->a.aio_cflags    &= ~AIO_OK2COMPIMD; /* not OK to complete inline*/
413        elem->pfd             = *descriptor;
414        elem->os_pfd.events   = get_event(descriptor->reqevents);
415
416        if (0 != asyncio(&elem->a)) {
417            rv = errno;
418            DBG3(4, "pollset %p asio failed fd %d, errno %p\n",
419                     pollset, elem->os_pfd.fd, rv);
420#if DEBUG
421            perror(__FUNCTION__ " asio failure");
422#endif
423        }
424        else {
425            DBG2(4, "good asio call, adding fd %d to pollset %p\n",
426                     elem->os_pfd.fd, pollset);
427
428            pollset->nelts++;
429            apr_hash_set(priv->elems, &(elem->os_pfd.fd), sizeof(int), elem);
430        }
431    }
432    else {
433        /* APR_POLLSET_THREADSAFE isn't set.  use POSIX poll in case
434         * pipes or files are used with this pollset
435         */
436
437        rv = posix_add(pollset, descriptor);
438    }
439
440    DBG1(2, "exiting, rv = %d\n", rv);
441
442    pollset_unlock_rings();
443    return rv;
444} /* end of asio_pollset_add */
445
446static posix_remove(apr_pollset_t *pollset, const apr_pollfd_t *descriptor)
447{
448    DBG_BUFF
449    apr_uint32_t i;
450    apr_pollset_private_t *priv = pollset->p;
451
452    DBG(4, "entered\n");
453    for (i = 0; i < pollset->nelts; i++) {
454        if (descriptor->desc.s == priv->query_set[i].desc.s) {
455            /* Found an instance of the fd: remove this and any other copies */
456            apr_uint32_t dst = i;
457            apr_uint32_t old_nelts = pollset->nelts;
458            pollset->nelts--;
459            for (i++; i < old_nelts; i++) {
460                if (descriptor->desc.s == priv->query_set[i].desc.s) {
461                    pollset->nelts--;
462                }
463                else {
464                    priv->pollset[dst] = priv->pollset[i];
465                    priv->query_set[dst] = priv->query_set[i];
466                    dst++;
467                }
468            }
469            DBG(4, "returning OK\n");
470            return APR_SUCCESS;
471        }
472    }
473
474    DBG(1, "returning APR_NOTFOUND\n");
475    return APR_NOTFOUND;
476
477}   /* end of posix_remove */
478
479static apr_status_t asio_pollset_remove(apr_pollset_t *pollset,
480                                        const apr_pollfd_t *descriptor)
481{
482    DBG_BUFF
483    asio_elem_t *elem;
484    apr_status_t rv = APR_SUCCESS;
485    apr_pollset_private_t *priv = pollset->p;
486    /* AIO_CANCEL is synchronous, so autodata works fine.  */
487    struct aiocb cancel_a = {0};
488
489    int fd;
490
491    DBG(2, "entered\n");
492
493    if (!(pollset->flags & APR_POLLSET_THREADSAFE)) {
494        return posix_remove(pollset, descriptor);
495    }
496
497    pollset_lock_rings();
498
499#if DEBUG
500    assert(descriptor->desc_type == APR_POLL_SOCKET);
501#endif
502    /* zOS 1.12 doesn't support files for async i/o */
503    fd = descriptor->desc.s->socketdes;
504
505    elem = apr_hash_get(priv->elems, &(fd), sizeof(int));
506    if (elem == NULL) {
507        DBG1(1, "couldn't find fd %d\n", fd);
508        rv = APR_NOTFOUND;
509    } else {
510        DBG1(5, "hash found fd %d\n", fd);
511        /* delete this fd from the hash */
512        apr_hash_set(priv->elems, &(fd), sizeof(int), NULL);
513
514        if (elem->state == ASIO_INIT) {
515            /* asyncio call to cancel */
516            cancel_a.aio_cmd = AIO_CANCEL;
517            cancel_a.aio_buf = &elem->a;   /* point to original aiocb */
518
519            cancel_a.aio_cflags  = 0;
520            cancel_a.aio_cflags2 = 0;
521
522            /* we want the original aiocb to show up on the pollset message queue
523             * before recycling its memory to eliminate race conditions
524             */
525
526            rv = asyncio(&cancel_a);
527            DBG1(4, "asyncio returned %d\n", rv);
528
529#if DEBUG
530            assert(rv == 1);
531#endif
532        }
533        elem->state = ASIO_REMOVED;
534        rv = APR_SUCCESS;
535    }
536
537    DBG1(2, "exiting, rv: %d\n", rv);
538
539    pollset_unlock_rings();
540
541    return rv;
542}   /* end of asio_pollset_remove */
543
544static posix_poll(apr_pollset_t *pollset,
545                  apr_interval_time_t timeout,
546                  apr_int32_t *num,
547                  const apr_pollfd_t **descriptors)
548{
549    DBG_BUFF
550    int rv;
551    apr_uint32_t i, j;
552    apr_pollset_private_t *priv = pollset->p;
553
554    DBG(4, "entered\n");
555
556    if (timeout > 0) {
557        timeout /= 1000;
558    }
559    rv = poll(priv->pollset, pollset->nelts, timeout);
560    (*num) = rv;
561    if (rv < 0) {
562        return apr_get_netos_error();
563    }
564    if (rv == 0) {
565        return APR_TIMEUP;
566    }
567    j = 0;
568    for (i = 0; i < pollset->nelts; i++) {
569        if (priv->pollset[i].revents != 0) {
570            priv->result_set[j] = priv->query_set[i];
571            priv->result_set[j].rtnevents =
572                get_revent(priv->pollset[i].revents);
573            j++;
574        }
575    }
576    if (descriptors)
577        *descriptors = priv->result_set;
578
579    DBG(4, "exiting ok\n");
580    return APR_SUCCESS;
581
582}   /* end of posix_poll */
583
584static process_msg(apr_pollset_t *pollset, struct asio_msgbuf_t *msg)
585{
586    DBG_BUFF
587    asio_elem_t *elem = msg->msg_elem;
588
589    switch(elem->state) {
590    case ASIO_REMOVED:
591        DBG2(5, "for cancelled elem, recycling memory - elem %08p, fd %d\n",
592                elem, elem->os_pfd.fd);
593        APR_RING_INSERT_TAIL(&(pollset->p->free_ring), elem,
594                             asio_elem_t, link);
595        break;
596    case ASIO_INIT:
597        DBG2(4, "adding to ready ring: elem %08p, fd %d\n",
598                elem, elem->os_pfd.fd);
599        elem->state = ASIO_COMPLETE;
600        APR_RING_INSERT_TAIL(&(pollset->p->ready_ring), elem,
601                             asio_elem_t, link);
602        break;
603    default:
604        DBG3(1, "unexpected state: elem %08p, fd %d, state %d\n",
605            elem, elem->os_pfd.fd, elem->state);
606#if DEBUG
607        assert(0);
608#endif
609    }
610}
611
612static apr_status_t asio_pollset_poll(apr_pollset_t *pollset,
613                                      apr_interval_time_t timeout,
614                                      apr_int32_t *num,
615                                      const apr_pollfd_t **descriptors)
616{
617    DBG_BUFF
618    int i, ret;
619    asio_elem_t *elem, *next_elem;
620    struct asio_msgbuf_t msg_buff;
621    struct timespec tv;
622    apr_status_t rv = APR_SUCCESS;
623    apr_pollset_private_t *priv = pollset->p;
624
625    DBG(6, "entered\n"); /* chatty - traces every second w/Event */
626
627    if ((pollset->flags & APR_POLLSET_THREADSAFE) == 0 ) {
628        return posix_poll(pollset, timeout, num, descriptors);
629    }
630
631    pollset_lock_rings();
632    APR_RING_INIT(&(priv->ready_ring), asio_elem_t, link);
633
634    while (!APR_RING_EMPTY(&(priv->prior_ready_ring), asio_elem_t, link)) {
635        elem = APR_RING_FIRST(&(priv->prior_ready_ring));
636        DBG3(5, "pollset %p elem %p fd %d on prior ready ring\n",
637                pollset,
638                elem,
639                elem->os_pfd.fd);
640
641        APR_RING_REMOVE(elem, link);
642
643        /*
644         * since USS does not remember what's in our pollset, we have
645         * to re-add fds which have not been apr_pollset_remove'd
646         *
647         * there may have been too many ready fd's to return in the
648         * result set last time. re-poll inline for both cases
649         */
650
651        if (elem->state == ASIO_REMOVED) {
652
653            /*
654             * async i/o is done since it was found on prior_ready
655             * the state says the caller is done with it too
656             * so recycle the elem
657             */
658
659            APR_RING_INSERT_TAIL(&(priv->free_ring), elem,
660                                 asio_elem_t, link);
661            continue;  /* do not re-add if it has been _removed */
662        }
663
664        elem->state = ASIO_INIT;
665        elem->a.aio_cflags     = AIO_OK2COMPIMD;
666
667        if (0 != (ret = asyncio(&elem->a))) {
668            if (ret == 1) {
669                DBG(4, "asyncio() completed inline\n");
670                /* it's ready now */
671                elem->state = ASIO_COMPLETE;
672                APR_RING_INSERT_TAIL(&(priv->ready_ring), elem, asio_elem_t,
673                                     link);
674            }
675            else {
676                DBG2(1, "asyncio() failed, ret: %d, errno: %d\n",
677                        ret, errno);
678                pollset_unlock_rings();
679                return errno;
680            }
681        }
682        DBG1(4, "asyncio() completed rc %d\n", ret);
683    }
684
685    DBG(6, "after prior ready loop\n"); /* chatty w/timeouts, hence 6 */
686
687    /* Gather async poll completions that have occurred since the last call */
688    while (0 < msgrcv(priv->msg_q, &msg_buff, sizeof(asio_elem_t *), 0,
689                      IPC_NOWAIT)) {
690        process_msg(pollset, &msg_buff);
691    }
692
693    /* Suspend if nothing is ready yet. */
694    if (APR_RING_EMPTY(&(priv->ready_ring), asio_elem_t, link)) {
695
696        if (timeout >= 0) {
697            tv.tv_sec  = apr_time_sec(timeout);
698            tv.tv_nsec = apr_time_usec(timeout) * 1000;
699        } else {
700            tv.tv_sec = INT_MAX;  /* block until something is ready */
701        }
702
703        DBG2(6, "nothing on the ready ring "
704                "- blocking for %d seconds %d ns\n",
705                tv.tv_sec, tv.tv_nsec);
706
707        pollset_unlock_rings();   /* allow other apr_pollset_* calls while blocked */
708
709        if (0 >= (ret = __msgrcv_timed(priv->msg_q, &msg_buff,
710                                       sizeof(asio_elem_t *), 0, NULL, &tv))) {
711#if DEBUG
712            if (errno == EAGAIN) {
713                DBG(6, "__msgrcv_timed timed out\n"); /* timeout path, so 6 */
714            }
715            else {
716                DBG(1, "__msgrcv_timed failed!\n");
717            }
718#endif
719            return (errno == EAGAIN) ? APR_TIMEUP : errno;
720        }
721
722        pollset_lock_rings();
723
724        process_msg(pollset, &msg_buff);
725    }
726
727    APR_RING_INIT(&priv->prior_ready_ring, asio_elem_t, link);
728
729    (*num) = 0;
730    elem = APR_RING_FIRST(&(priv->ready_ring));
731
732    for (i = 0;
733
734        i < priv->size
735                && elem != APR_RING_SENTINEL(&(priv->ready_ring), asio_elem_t, link);
736        i++) {
737             DBG2(5, "ready ring: elem %08p, fd %d\n", elem, elem->os_pfd.fd);
738
739             priv->result_set[i] = elem->pfd;
740             priv->result_set[i].rtnevents
741                                    = get_revent(elem->os_pfd.revents);
742             (*num)++;
743
744             elem = APR_RING_NEXT(elem, link);
745
746#if DEBUG
747             if (elem == APR_RING_SENTINEL(&(priv->ready_ring), asio_elem_t, link)) {
748                 DBG(5, "end of ready ring reached\n");
749             }
750#endif
751    }
752
753    if (descriptors) {
754        *descriptors = priv->result_set;
755    }
756
757    /* if the result size is too small, remember which descriptors
758     * haven't had results reported yet.  we will look
759     * at these descriptors on the next apr_pollset_poll call
760     */
761
762    APR_RING_CONCAT(&priv->prior_ready_ring, &(priv->ready_ring), asio_elem_t, link);
763
764    DBG1(2, "exiting, rv = %d\n", rv);
765
766    pollset_unlock_rings();
767
768    return rv;
769}  /* end of asio_pollset_poll */
770
771static const apr_pollset_provider_t impl = {
772    asio_pollset_create,
773    asio_pollset_add,
774    asio_pollset_remove,
775    asio_pollset_poll,
776    asio_pollset_cleanup,
777    "asio"
778};
779
780const apr_pollset_provider_t *apr_pollset_provider_aio_msgq = &impl;
781
782#endif /* HAVE_AIO_MSGQ */
783