1266733Speter/* Licensed to the Apache Software Foundation (ASF) under one or more 2266733Speter * contributor license agreements. See the NOTICE file distributed with 3266733Speter * this work for additional information regarding copyright ownership. 4266733Speter * The ASF licenses this file to You under the Apache License, Version 2.0 5266733Speter * (the "License"); you may not use this file except in compliance with 6266733Speter * the License. You may obtain a copy of the License at 7266733Speter * 8266733Speter * http://www.apache.org/licenses/LICENSE-2.0 9266733Speter * 10266733Speter * Unless required by applicable law or agreed to in writing, software 11266733Speter * distributed under the License is distributed on an "AS IS" BASIS, 12266733Speter * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13266733Speter * See the License for the specific language governing permissions and 14266733Speter * limitations under the License. 15266733Speter * 16266733Speter * 17266733Speter ****************************************************************************** 18266733Speter * 19266733Speter * This implementation is based on a design by John Brooks (IBM Pok) which uses 20266733Speter * the z/OS sockets async i/o facility. When a 21266733Speter * socket is added to the pollset, an async poll is issued for that individual 22266733Speter * socket. It specifies that the kernel should send an IPC message when the 23266733Speter * socket becomes ready. The IPC messages are sent to a single message queue 24266733Speter * that is part of the pollset. apr_pollset_poll waits on the arrival of IPC 25266733Speter * messages or the specified timeout. 26266733Speter * 27266733Speter * Since z/OS does not support async i/o for pipes or files at present, this 28266733Speter * implementation falls back to using ordinary poll() when 29266733Speter * APR_POLLSET_THREADSAFE is unset. 30266733Speter * 31266733Speter * Greg Ames 32266733Speter * April 2012 33266733Speter */ 34266733Speter 35266733Speter#include "apr.h" 36266733Speter#include "apr_hash.h" 37266733Speter#include "apr_poll.h" 38266733Speter#include "apr_time.h" 39266733Speter#include "apr_portable.h" 40266733Speter#include "apr_arch_inherit.h" 41266733Speter#include "apr_arch_file_io.h" 42266733Speter#include "apr_arch_networkio.h" 43266733Speter#include "apr_arch_poll_private.h" 44266733Speter 45266733Speter#ifdef HAVE_AIO_MSGQ 46266733Speter 47266733Speter#include <sys/msg.h> /* msgget etc */ 48266733Speter#include <time.h> /* timestruct */ 49266733Speter#include <poll.h> /* pollfd */ 50266733Speter#include <limits.h> /* MAX_INT */ 51266733Speter 52266733Speterstruct apr_pollset_private_t 53266733Speter{ 54266733Speter int msg_q; /* IPC message queue. The z/OS kernel sends messages 55266733Speter * to this queue when our async polls on individual 56266733Speter * file descriptors complete 57266733Speter */ 58266733Speter apr_pollfd_t *result_set; 59266733Speter apr_uint32_t size; 60266733Speter 61266733Speter#if APR_HAS_THREADS 62266733Speter /* A thread mutex to protect operations on the rings and the hash */ 63266733Speter apr_thread_mutex_t *ring_lock; 64266733Speter#endif 65266733Speter 66266733Speter /* A hash of all active elements used for O(1) _remove operations */ 67266733Speter apr_hash_t *elems; 68266733Speter 69266733Speter APR_RING_HEAD(ready_ring_t, asio_elem_t) ready_ring; 70266733Speter APR_RING_HEAD(prior_ready_ring_t, asio_elem_t) prior_ready_ring; 71266733Speter APR_RING_HEAD(free_ring_t, asio_elem_t) free_ring; 72266733Speter 73266733Speter /* for pipes etc with no asio */ 74266733Speter struct pollfd *pollset; 75266733Speter apr_pollfd_t *query_set; 76266733Speter}; 77266733Speter 78266733Spetertypedef enum { 79266733Speter ASIO_INIT = 0, 80266733Speter ASIO_REMOVED, 81266733Speter ASIO_COMPLETE 82266733Speter} asio_state_e; 83266733Speter 84266733Spetertypedef struct asio_elem_t asio_elem_t; 85266733Speter 86266733Speterstruct asio_msgbuf_t { 87266733Speter long msg_type; /* must be > 0 */ 88266733Speter asio_elem_t *msg_elem; 89266733Speter}; 90266733Speter 91266733Speterstruct asio_elem_t 92266733Speter{ 93266733Speter APR_RING_ENTRY(asio_elem_t) link; 94266733Speter apr_pollfd_t pfd; 95266733Speter struct pollfd os_pfd; 96266733Speter struct aiocb a; 97266733Speter asio_state_e state; 98266733Speter struct asio_msgbuf_t msg; 99266733Speter}; 100266733Speter 101266733Speter#define DEBUG 0 102266733Speter 103266733Speter/* DEBUG settings: 0 - no debug messages at all, 104266733Speter * 1 - should not occur messages, 105266733Speter * 2 - apr_pollset_* entry and exit messages, 106266733Speter * 3 - state changes, memory usage, 107266733Speter * 4 - z/OS, APR, and internal calls, 108266733Speter * 5 - everything else except the timer pop path, 109266733Speter * 6 - everything, including the Event 1 sec timer pop path 110266733Speter * 111266733Speter * each DEBUG level includes all messages produced by lower numbered levels 112266733Speter */ 113266733Speter 114266733Speter#if DEBUG 115266733Speter 116266733Speter#include <assert.h> 117266733Speter#include <unistd.h> /* getpid */ 118266733Speter 119266733Speter#define DBG_BUFF char dbg_msg_buff[256]; 120266733Speter 121266733Speter#define DBG_TEST(lvl) if (lvl <= DEBUG) { 122266733Speter 123266733Speter#define DBG_CORE(msg) sprintf(dbg_msg_buff, "% 8d " __FUNCTION__ \ 124266733Speter " " msg, getpid()), \ 125266733Speter fprintf(stderr, "%s", dbg_msg_buff); 126266733Speter#define DBG_CORE1(msg, var1) sprintf(dbg_msg_buff, "% 8d " __FUNCTION__ \ 127266733Speter " " msg, getpid(), var1), \ 128266733Speter fprintf(stderr, "%s", dbg_msg_buff); 129266733Speter#define DBG_CORE2(msg, var1, var2) sprintf(dbg_msg_buff, "% 8d " __FUNCTION__ \ 130266733Speter " " msg, getpid(), var1, var2), \ 131266733Speter fprintf(stderr, "%s", dbg_msg_buff); 132266733Speter#define DBG_CORE3(msg, var1, var2, var3) \ 133266733Speter sprintf(dbg_msg_buff, "% 8d " __FUNCTION__ \ 134266733Speter " " msg, getpid(), var1, var2, var3), \ 135266733Speter fprintf(stderr, "%s", dbg_msg_buff); 136266733Speter#define DBG_CORE4(msg, var1, var2, var3, var4) \ 137266733Speter sprintf(dbg_msg_buff, "% 8d " __FUNCTION__ \ 138266733Speter " " msg, getpid(), var1, var2, var3, var4),\ 139266733Speter fprintf(stderr, "%s", dbg_msg_buff); 140266733Speter 141266733Speter#define DBG_END } 142266733Speter 143266733Speter#define DBG(lvl, msg) DBG_TEST(lvl) \ 144266733Speter DBG_CORE(msg) \ 145266733Speter DBG_END 146266733Speter 147266733Speter#define DBG1(lvl, msg, var1) DBG_TEST(lvl) \ 148266733Speter DBG_CORE1(msg, var1) \ 149266733Speter DBG_END 150266733Speter 151266733Speter#define DBG2(lvl, msg, var1, var2) DBG_TEST(lvl) \ 152266733Speter DBG_CORE2(msg, var1, var2) \ 153266733Speter DBG_END 154266733Speter 155266733Speter#define DBG3(lvl, msg, var1, var2, var3) \ 156266733Speter DBG_TEST(lvl) \ 157266733Speter DBG_CORE3(msg, var1, var2, var3) \ 158266733Speter DBG_END 159266733Speter 160266733Speter#define DBG4(lvl, msg, var1, var2, var3, var4) \ 161266733Speter DBG_TEST(lvl) \ 162266733Speter DBG_CORE4(msg, var1, var2, var3, var4) \ 163266733Speter DBG_END 164266733Speter 165266733Speter#else /* DEBUG is 0 */ 166266733Speter#define DBG_BUFF 167266733Speter#define DBG(lvl, msg) ((void)0) 168266733Speter#define DBG1(lvl, msg, var1) ((void)0) 169266733Speter#define DBG2(lvl, msg, var1, var2) ((void)0) 170266733Speter#define DBG3(lvl, msg, var1, var2, var3) ((void)0) 171266733Speter#define DBG4(lvl, msg, var1, var2, var3, var4) ((void)0) 172266733Speter 173266733Speter#endif /* DEBUG */ 174266733Speter 175266733Speterstatic int asyncio(struct aiocb *a) 176266733Speter{ 177266733Speter DBG_BUFF 178266733Speter int rv; 179266733Speter 180266733Speter#ifdef _LP64 181266733Speter#define AIO BPX4AIO 182266733Speter#else 183266733Speter#define AIO BPX1AIO 184266733Speter#endif 185266733Speter 186266733Speter AIO(sizeof(struct aiocb), a, &rv, &errno, __err2ad()); 187266733Speter DBG2(4, "BPX4AIO aiocb %p rv %d\n", 188266733Speter a, rv); 189266733Speter#ifdef DEBUG 190266733Speter if (rv < 0) { 191266733Speter DBG2(4, "errno %d errnojr %08x\n", 192266733Speter errno, *__err2ad()); 193266733Speter } 194266733Speter#endif 195266733Speter return rv; 196266733Speter} 197266733Speter 198266733Speterstatic apr_int16_t get_event(apr_int16_t event) 199266733Speter{ 200266733Speter DBG_BUFF 201266733Speter apr_int16_t rv = 0; 202266733Speter DBG(4, "entered\n"); 203266733Speter 204266733Speter if (event & APR_POLLIN) 205266733Speter rv |= POLLIN; 206266733Speter if (event & APR_POLLPRI) 207266733Speter rv |= POLLPRI; 208266733Speter if (event & APR_POLLOUT) 209266733Speter rv |= POLLOUT; 210266733Speter if (event & APR_POLLERR) 211266733Speter rv |= POLLERR; 212266733Speter if (event & APR_POLLHUP) 213266733Speter rv |= POLLHUP; 214266733Speter if (event & APR_POLLNVAL) 215266733Speter rv |= POLLNVAL; 216266733Speter 217266733Speter DBG(4, "exiting\n"); 218266733Speter return rv; 219266733Speter} 220266733Speter 221266733Speterstatic apr_int16_t get_revent(apr_int16_t event) 222266733Speter{ 223266733Speter DBG_BUFF 224266733Speter apr_int16_t rv = 0; 225266733Speter DBG(4, "entered\n"); 226266733Speter 227266733Speter if (event & POLLIN) 228266733Speter rv |= APR_POLLIN; 229266733Speter if (event & POLLPRI) 230266733Speter rv |= APR_POLLPRI; 231266733Speter if (event & POLLOUT) 232266733Speter rv |= APR_POLLOUT; 233266733Speter if (event & POLLERR) 234266733Speter rv |= APR_POLLERR; 235266733Speter if (event & POLLHUP) 236266733Speter rv |= APR_POLLHUP; 237266733Speter if (event & POLLNVAL) 238266733Speter rv |= APR_POLLNVAL; 239266733Speter 240266733Speter DBG(4, "exiting\n"); 241266733Speter return rv; 242266733Speter} 243266733Speter 244266733Speterstatic apr_status_t asio_pollset_cleanup(apr_pollset_t *pollset) 245266733Speter{ 246266733Speter DBG_BUFF 247266733Speter int rv; 248266733Speter 249266733Speter DBG(4, "entered\n"); 250362181Sdim if (pollset->flags & APR_POLLSET_THREADSAFE) { 251362181Sdim rv = msgctl(pollset->p->msg_q, IPC_RMID, NULL); 252362181Sdim DBG1(4, "asio_pollset_cleanup: msgctl(IPC_RMID) returned %d\n", rv); 253362181Sdim } 254266733Speter 255266733Speter return rv; 256266733Speter} 257266733Speter 258266733Speterstatic apr_status_t asio_pollset_create(apr_pollset_t *pollset, 259266733Speter apr_uint32_t size, 260266733Speter apr_pool_t *p, 261266733Speter apr_uint32_t flags) 262266733Speter{ 263266733Speter DBG_BUFF 264266733Speter apr_status_t rv; 265266733Speter apr_pollset_private_t *priv; 266266733Speter 267266733Speter DBG1(2, "entered, flags: %x\n", flags); 268266733Speter 269362181Sdim priv = pollset->p = apr_pcalloc(p, sizeof(*priv)); 270266733Speter 271266733Speter if (flags & APR_POLLSET_THREADSAFE) { 272266733Speter#if APR_HAS_THREADS 273362181Sdim if ((rv = apr_thread_mutex_create(&(priv->ring_lock), 274266733Speter APR_THREAD_MUTEX_DEFAULT, 275362181Sdim p)) != APR_SUCCESS) { 276266733Speter DBG1(1, "apr_thread_mutex_create returned %d\n", rv); 277286503Speter pollset->p = NULL; 278266733Speter return rv; 279266733Speter } 280266733Speter rv = msgget(IPC_PRIVATE, S_IWUSR+S_IRUSR); /* user r/w perms */ 281266733Speter if (rv < 0) { 282266733Speter#if DEBUG 283266733Speter perror(__FUNCTION__ " msgget returned < 0 "); 284266733Speter#endif 285286503Speter pollset->p = NULL; 286266733Speter return rv; 287266733Speter } 288266733Speter 289266733Speter DBG2(4, "pollset %p msgget was OK, rv=%d\n", pollset, rv); 290266733Speter priv->msg_q = rv; 291266733Speter priv->elems = apr_hash_make(p); 292266733Speter 293266733Speter APR_RING_INIT(&priv->free_ring, asio_elem_t, link); 294266733Speter APR_RING_INIT(&priv->prior_ready_ring, asio_elem_t, link); 295266733Speter 296266733Speter#else /* APR doesn't have threads but caller wants a threadsafe pollset */ 297286503Speter pollset->p = NULL; 298266733Speter return APR_ENOTIMPL; 299266733Speter#endif 300266733Speter 301266733Speter } else { /* APR_POLLSET_THREADSAFE not set, i.e. no async i/o, 302266733Speter * init fields only needed in old style pollset 303266733Speter */ 304266733Speter 305266733Speter priv->pollset = apr_palloc(p, size * sizeof(struct pollfd)); 306266733Speter priv->query_set = apr_palloc(p, size * sizeof(apr_pollfd_t)); 307266733Speter 308266733Speter if ((!priv->pollset) || (!priv->query_set)) { 309286503Speter pollset->p = NULL; 310266733Speter return APR_ENOMEM; 311266733Speter } 312266733Speter } 313266733Speter 314266733Speter pollset->nelts = 0; 315266733Speter pollset->flags = flags; 316266733Speter pollset->pool = p; 317266733Speter priv->size = size; 318266733Speter priv->result_set = apr_palloc(p, size * sizeof(apr_pollfd_t)); 319266733Speter if (!priv->result_set) { 320286503Speter if (flags & APR_POLLSET_THREADSAFE) { 321286503Speter msgctl(priv->msg_q, IPC_RMID, NULL); 322286503Speter } 323286503Speter pollset->p = NULL; 324266733Speter return APR_ENOMEM; 325266733Speter } 326266733Speter 327266733Speter DBG2(2, "exiting, pollset: %p, type: %s\n", 328266733Speter pollset, 329266733Speter flags & APR_POLLSET_THREADSAFE ? "async" : "POSIX"); 330266733Speter 331266733Speter 332266733Speter return APR_SUCCESS; 333266733Speter 334266733Speter} /* end of asio_pollset_create */ 335266733Speter 336266733Speterstatic apr_status_t posix_add(apr_pollset_t *pollset, 337266733Speter const apr_pollfd_t *descriptor) 338266733Speter{ 339266733Speter DBG_BUFF 340266733Speter int fd; 341266733Speter apr_pool_t *p = pollset->pool; 342266733Speter apr_pollset_private_t *priv = pollset->p; 343266733Speter 344266733Speter DBG(4, "entered\n"); 345266733Speter 346266733Speter if (pollset->nelts == priv->size) { 347266733Speter return APR_ENOMEM; 348266733Speter } 349266733Speter 350266733Speter priv->query_set[pollset->nelts] = *descriptor; 351266733Speter if (descriptor->desc_type == APR_POLL_SOCKET) { 352266733Speter fd = descriptor->desc.s->socketdes; 353266733Speter } 354266733Speter else { 355266733Speter fd = descriptor->desc.f->filedes; 356266733Speter } 357266733Speter 358266733Speter priv->pollset[pollset->nelts].fd = fd; 359266733Speter 360266733Speter priv->pollset[pollset->nelts].events = 361266733Speter get_event(descriptor->reqevents); 362266733Speter 363266733Speter pollset->nelts++; 364266733Speter 365266733Speter DBG2(4, "exiting, fd %d added to pollset %p\n", fd, pollset); 366266733Speter 367266733Speter return APR_SUCCESS; 368266733Speter} /* end of posix_add */ 369266733Speter 370266733Speter 371266733Speterstatic apr_status_t asio_pollset_add(apr_pollset_t *pollset, 372266733Speter const apr_pollfd_t *descriptor) 373266733Speter{ 374266733Speter DBG_BUFF 375266733Speter asio_elem_t *elem; 376266733Speter apr_status_t rv = APR_SUCCESS; 377266733Speter apr_pollset_private_t *priv = pollset->p; 378266733Speter 379266733Speter pollset_lock_rings(); 380266733Speter DBG(2, "entered\n"); 381266733Speter 382266733Speter if (pollset->flags & APR_POLLSET_THREADSAFE) { 383266733Speter 384266733Speter if (!APR_RING_EMPTY(&(priv->free_ring), asio_elem_t, link)) { 385266733Speter elem = APR_RING_FIRST(&(priv->free_ring)); 386266733Speter APR_RING_REMOVE(elem, link); 387266733Speter DBG1(3, "used recycled memory at %08p\n", elem); 388266733Speter elem->state = ASIO_INIT; 389286503Speter elem->a.aio_cflags = 0; 390266733Speter } 391266733Speter else { 392266733Speter elem = (asio_elem_t *) apr_pcalloc(pollset->pool, sizeof(asio_elem_t)); 393266733Speter DBG1(3, "alloced new memory at %08p\n", elem); 394266733Speter 395266733Speter elem->a.aio_notifytype = AIO_MSGQ; 396266733Speter elem->a.aio_msgev_qid = priv->msg_q; 397266733Speter DBG1(5, "aio_msgev_quid = %d \n", elem->a.aio_msgev_qid); 398266733Speter elem->a.aio_msgev_size = sizeof(asio_elem_t *); 399266733Speter elem->a.aio_msgev_flag = 0; /* wait if queue is full */ 400266733Speter elem->a.aio_msgev_addr = &(elem->msg); 401266733Speter elem->a.aio_buf = &(elem->os_pfd); 402266733Speter elem->a.aio_nbytes = 1; /* number of pfds to poll */ 403266733Speter elem->msg.msg_type = 1; 404266733Speter elem->msg.msg_elem = elem; 405266733Speter } 406266733Speter 407266733Speter /* z/OS only supports async I/O for sockets for now */ 408266733Speter elem->os_pfd.fd = descriptor->desc.s->socketdes; 409266733Speter 410266733Speter APR_RING_ELEM_INIT(elem, link); 411266733Speter elem->a.aio_cmd = AIO_SELPOLL; 412266733Speter elem->a.aio_cflags &= ~AIO_OK2COMPIMD; /* not OK to complete inline*/ 413266733Speter elem->pfd = *descriptor; 414266733Speter elem->os_pfd.events = get_event(descriptor->reqevents); 415266733Speter 416266733Speter if (0 != asyncio(&elem->a)) { 417266733Speter rv = errno; 418266733Speter DBG3(4, "pollset %p asio failed fd %d, errno %p\n", 419266733Speter pollset, elem->os_pfd.fd, rv); 420266733Speter#if DEBUG 421266733Speter perror(__FUNCTION__ " asio failure"); 422266733Speter#endif 423266733Speter } 424266733Speter else { 425266733Speter DBG2(4, "good asio call, adding fd %d to pollset %p\n", 426266733Speter elem->os_pfd.fd, pollset); 427266733Speter 428266733Speter pollset->nelts++; 429266733Speter apr_hash_set(priv->elems, &(elem->os_pfd.fd), sizeof(int), elem); 430266733Speter } 431266733Speter } 432266733Speter else { 433266733Speter /* APR_POLLSET_THREADSAFE isn't set. use POSIX poll in case 434266733Speter * pipes or files are used with this pollset 435266733Speter */ 436266733Speter 437266733Speter rv = posix_add(pollset, descriptor); 438266733Speter } 439266733Speter 440266733Speter DBG1(2, "exiting, rv = %d\n", rv); 441266733Speter 442266733Speter pollset_unlock_rings(); 443266733Speter return rv; 444266733Speter} /* end of asio_pollset_add */ 445266733Speter 446266733Speterstatic posix_remove(apr_pollset_t *pollset, const apr_pollfd_t *descriptor) 447266733Speter{ 448266733Speter DBG_BUFF 449266733Speter apr_uint32_t i; 450266733Speter apr_pollset_private_t *priv = pollset->p; 451266733Speter 452266733Speter DBG(4, "entered\n"); 453266733Speter for (i = 0; i < pollset->nelts; i++) { 454266733Speter if (descriptor->desc.s == priv->query_set[i].desc.s) { 455266733Speter /* Found an instance of the fd: remove this and any other copies */ 456266733Speter apr_uint32_t dst = i; 457266733Speter apr_uint32_t old_nelts = pollset->nelts; 458266733Speter pollset->nelts--; 459266733Speter for (i++; i < old_nelts; i++) { 460266733Speter if (descriptor->desc.s == priv->query_set[i].desc.s) { 461266733Speter pollset->nelts--; 462266733Speter } 463266733Speter else { 464266733Speter priv->pollset[dst] = priv->pollset[i]; 465266733Speter priv->query_set[dst] = priv->query_set[i]; 466266733Speter dst++; 467266733Speter } 468266733Speter } 469266733Speter DBG(4, "returning OK\n"); 470266733Speter return APR_SUCCESS; 471266733Speter } 472266733Speter } 473266733Speter 474266733Speter DBG(1, "returning APR_NOTFOUND\n"); 475266733Speter return APR_NOTFOUND; 476266733Speter 477266733Speter} /* end of posix_remove */ 478266733Speter 479266733Speterstatic apr_status_t asio_pollset_remove(apr_pollset_t *pollset, 480266733Speter const apr_pollfd_t *descriptor) 481266733Speter{ 482266733Speter DBG_BUFF 483266733Speter asio_elem_t *elem; 484266733Speter apr_status_t rv = APR_SUCCESS; 485266733Speter apr_pollset_private_t *priv = pollset->p; 486362181Sdim /* AIO_CANCEL is synchronous, so autodata works fine. */ 487362181Sdim struct aiocb cancel_a = {0}; 488266733Speter 489266733Speter int fd; 490266733Speter 491266733Speter DBG(2, "entered\n"); 492266733Speter 493266733Speter if (!(pollset->flags & APR_POLLSET_THREADSAFE)) { 494266733Speter return posix_remove(pollset, descriptor); 495266733Speter } 496266733Speter 497266733Speter pollset_lock_rings(); 498266733Speter 499266733Speter#if DEBUG 500266733Speter assert(descriptor->desc_type == APR_POLL_SOCKET); 501266733Speter#endif 502266733Speter /* zOS 1.12 doesn't support files for async i/o */ 503266733Speter fd = descriptor->desc.s->socketdes; 504266733Speter 505266733Speter elem = apr_hash_get(priv->elems, &(fd), sizeof(int)); 506266733Speter if (elem == NULL) { 507266733Speter DBG1(1, "couldn't find fd %d\n", fd); 508266733Speter rv = APR_NOTFOUND; 509266733Speter } else { 510266733Speter DBG1(5, "hash found fd %d\n", fd); 511266733Speter /* delete this fd from the hash */ 512266733Speter apr_hash_set(priv->elems, &(fd), sizeof(int), NULL); 513266733Speter 514266733Speter if (elem->state == ASIO_INIT) { 515266733Speter /* asyncio call to cancel */ 516266733Speter cancel_a.aio_cmd = AIO_CANCEL; 517266733Speter cancel_a.aio_buf = &elem->a; /* point to original aiocb */ 518266733Speter 519266733Speter cancel_a.aio_cflags = 0; 520266733Speter cancel_a.aio_cflags2 = 0; 521266733Speter 522266733Speter /* we want the original aiocb to show up on the pollset message queue 523266733Speter * before recycling its memory to eliminate race conditions 524266733Speter */ 525266733Speter 526266733Speter rv = asyncio(&cancel_a); 527266733Speter DBG1(4, "asyncio returned %d\n", rv); 528266733Speter 529266733Speter#if DEBUG 530266733Speter assert(rv == 1); 531266733Speter#endif 532266733Speter } 533266733Speter elem->state = ASIO_REMOVED; 534266733Speter rv = APR_SUCCESS; 535266733Speter } 536266733Speter 537266733Speter DBG1(2, "exiting, rv: %d\n", rv); 538266733Speter 539266733Speter pollset_unlock_rings(); 540266733Speter 541266733Speter return rv; 542266733Speter} /* end of asio_pollset_remove */ 543266733Speter 544266733Speterstatic posix_poll(apr_pollset_t *pollset, 545266733Speter apr_interval_time_t timeout, 546266733Speter apr_int32_t *num, 547266733Speter const apr_pollfd_t **descriptors) 548266733Speter{ 549266733Speter DBG_BUFF 550266733Speter int rv; 551266733Speter apr_uint32_t i, j; 552266733Speter apr_pollset_private_t *priv = pollset->p; 553266733Speter 554266733Speter DBG(4, "entered\n"); 555266733Speter 556266733Speter if (timeout > 0) { 557266733Speter timeout /= 1000; 558266733Speter } 559266733Speter rv = poll(priv->pollset, pollset->nelts, timeout); 560266733Speter (*num) = rv; 561266733Speter if (rv < 0) { 562266733Speter return apr_get_netos_error(); 563266733Speter } 564266733Speter if (rv == 0) { 565266733Speter return APR_TIMEUP; 566266733Speter } 567266733Speter j = 0; 568266733Speter for (i = 0; i < pollset->nelts; i++) { 569266733Speter if (priv->pollset[i].revents != 0) { 570266733Speter priv->result_set[j] = priv->query_set[i]; 571266733Speter priv->result_set[j].rtnevents = 572266733Speter get_revent(priv->pollset[i].revents); 573266733Speter j++; 574266733Speter } 575266733Speter } 576266733Speter if (descriptors) 577266733Speter *descriptors = priv->result_set; 578266733Speter 579266733Speter DBG(4, "exiting ok\n"); 580266733Speter return APR_SUCCESS; 581266733Speter 582266733Speter} /* end of posix_poll */ 583266733Speter 584266733Speterstatic process_msg(apr_pollset_t *pollset, struct asio_msgbuf_t *msg) 585266733Speter{ 586266733Speter DBG_BUFF 587266733Speter asio_elem_t *elem = msg->msg_elem; 588266733Speter 589266733Speter switch(elem->state) { 590266733Speter case ASIO_REMOVED: 591266733Speter DBG2(5, "for cancelled elem, recycling memory - elem %08p, fd %d\n", 592266733Speter elem, elem->os_pfd.fd); 593266733Speter APR_RING_INSERT_TAIL(&(pollset->p->free_ring), elem, 594266733Speter asio_elem_t, link); 595266733Speter break; 596266733Speter case ASIO_INIT: 597266733Speter DBG2(4, "adding to ready ring: elem %08p, fd %d\n", 598266733Speter elem, elem->os_pfd.fd); 599266733Speter elem->state = ASIO_COMPLETE; 600266733Speter APR_RING_INSERT_TAIL(&(pollset->p->ready_ring), elem, 601266733Speter asio_elem_t, link); 602266733Speter break; 603266733Speter default: 604266733Speter DBG3(1, "unexpected state: elem %08p, fd %d, state %d\n", 605266733Speter elem, elem->os_pfd.fd, elem->state); 606266733Speter#if DEBUG 607266733Speter assert(0); 608266733Speter#endif 609266733Speter } 610266733Speter} 611266733Speter 612266733Speterstatic apr_status_t asio_pollset_poll(apr_pollset_t *pollset, 613266733Speter apr_interval_time_t timeout, 614266733Speter apr_int32_t *num, 615266733Speter const apr_pollfd_t **descriptors) 616266733Speter{ 617266733Speter DBG_BUFF 618266733Speter int i, ret; 619266733Speter asio_elem_t *elem, *next_elem; 620266733Speter struct asio_msgbuf_t msg_buff; 621266733Speter struct timespec tv; 622266733Speter apr_status_t rv = APR_SUCCESS; 623266733Speter apr_pollset_private_t *priv = pollset->p; 624266733Speter 625266733Speter DBG(6, "entered\n"); /* chatty - traces every second w/Event */ 626266733Speter 627266733Speter if ((pollset->flags & APR_POLLSET_THREADSAFE) == 0 ) { 628266733Speter return posix_poll(pollset, timeout, num, descriptors); 629266733Speter } 630266733Speter 631266733Speter pollset_lock_rings(); 632266733Speter APR_RING_INIT(&(priv->ready_ring), asio_elem_t, link); 633266733Speter 634266733Speter while (!APR_RING_EMPTY(&(priv->prior_ready_ring), asio_elem_t, link)) { 635266733Speter elem = APR_RING_FIRST(&(priv->prior_ready_ring)); 636266733Speter DBG3(5, "pollset %p elem %p fd %d on prior ready ring\n", 637266733Speter pollset, 638266733Speter elem, 639266733Speter elem->os_pfd.fd); 640266733Speter 641266733Speter APR_RING_REMOVE(elem, link); 642266733Speter 643266733Speter /* 644266733Speter * since USS does not remember what's in our pollset, we have 645266733Speter * to re-add fds which have not been apr_pollset_remove'd 646266733Speter * 647266733Speter * there may have been too many ready fd's to return in the 648266733Speter * result set last time. re-poll inline for both cases 649266733Speter */ 650266733Speter 651266733Speter if (elem->state == ASIO_REMOVED) { 652266733Speter 653266733Speter /* 654266733Speter * async i/o is done since it was found on prior_ready 655266733Speter * the state says the caller is done with it too 656266733Speter * so recycle the elem 657266733Speter */ 658266733Speter 659266733Speter APR_RING_INSERT_TAIL(&(priv->free_ring), elem, 660266733Speter asio_elem_t, link); 661266733Speter continue; /* do not re-add if it has been _removed */ 662266733Speter } 663266733Speter 664266733Speter elem->state = ASIO_INIT; 665266733Speter elem->a.aio_cflags = AIO_OK2COMPIMD; 666266733Speter 667266733Speter if (0 != (ret = asyncio(&elem->a))) { 668266733Speter if (ret == 1) { 669266733Speter DBG(4, "asyncio() completed inline\n"); 670266733Speter /* it's ready now */ 671286503Speter elem->state = ASIO_COMPLETE; 672266733Speter APR_RING_INSERT_TAIL(&(priv->ready_ring), elem, asio_elem_t, 673266733Speter link); 674266733Speter } 675266733Speter else { 676266733Speter DBG2(1, "asyncio() failed, ret: %d, errno: %d\n", 677266733Speter ret, errno); 678266733Speter pollset_unlock_rings(); 679266733Speter return errno; 680266733Speter } 681266733Speter } 682266733Speter DBG1(4, "asyncio() completed rc %d\n", ret); 683266733Speter } 684266733Speter 685266733Speter DBG(6, "after prior ready loop\n"); /* chatty w/timeouts, hence 6 */ 686266733Speter 687266733Speter /* Gather async poll completions that have occurred since the last call */ 688266733Speter while (0 < msgrcv(priv->msg_q, &msg_buff, sizeof(asio_elem_t *), 0, 689266733Speter IPC_NOWAIT)) { 690266733Speter process_msg(pollset, &msg_buff); 691266733Speter } 692266733Speter 693266733Speter /* Suspend if nothing is ready yet. */ 694266733Speter if (APR_RING_EMPTY(&(priv->ready_ring), asio_elem_t, link)) { 695266733Speter 696266733Speter if (timeout >= 0) { 697266733Speter tv.tv_sec = apr_time_sec(timeout); 698266733Speter tv.tv_nsec = apr_time_usec(timeout) * 1000; 699266733Speter } else { 700266733Speter tv.tv_sec = INT_MAX; /* block until something is ready */ 701266733Speter } 702266733Speter 703266733Speter DBG2(6, "nothing on the ready ring " 704266733Speter "- blocking for %d seconds %d ns\n", 705266733Speter tv.tv_sec, tv.tv_nsec); 706266733Speter 707266733Speter pollset_unlock_rings(); /* allow other apr_pollset_* calls while blocked */ 708266733Speter 709266733Speter if (0 >= (ret = __msgrcv_timed(priv->msg_q, &msg_buff, 710266733Speter sizeof(asio_elem_t *), 0, NULL, &tv))) { 711266733Speter#if DEBUG 712266733Speter if (errno == EAGAIN) { 713266733Speter DBG(6, "__msgrcv_timed timed out\n"); /* timeout path, so 6 */ 714266733Speter } 715266733Speter else { 716266733Speter DBG(1, "__msgrcv_timed failed!\n"); 717266733Speter } 718266733Speter#endif 719266733Speter return (errno == EAGAIN) ? APR_TIMEUP : errno; 720266733Speter } 721266733Speter 722266733Speter pollset_lock_rings(); 723266733Speter 724266733Speter process_msg(pollset, &msg_buff); 725266733Speter } 726266733Speter 727266733Speter APR_RING_INIT(&priv->prior_ready_ring, asio_elem_t, link); 728266733Speter 729266733Speter (*num) = 0; 730266733Speter elem = APR_RING_FIRST(&(priv->ready_ring)); 731266733Speter 732266733Speter for (i = 0; 733266733Speter 734266733Speter i < priv->size 735266733Speter && elem != APR_RING_SENTINEL(&(priv->ready_ring), asio_elem_t, link); 736266733Speter i++) { 737266733Speter DBG2(5, "ready ring: elem %08p, fd %d\n", elem, elem->os_pfd.fd); 738266733Speter 739266733Speter priv->result_set[i] = elem->pfd; 740266733Speter priv->result_set[i].rtnevents 741266733Speter = get_revent(elem->os_pfd.revents); 742266733Speter (*num)++; 743266733Speter 744266733Speter elem = APR_RING_NEXT(elem, link); 745266733Speter 746266733Speter#if DEBUG 747266733Speter if (elem == APR_RING_SENTINEL(&(priv->ready_ring), asio_elem_t, link)) { 748266733Speter DBG(5, "end of ready ring reached\n"); 749266733Speter } 750266733Speter#endif 751266733Speter } 752266733Speter 753266733Speter if (descriptors) { 754266733Speter *descriptors = priv->result_set; 755266733Speter } 756266733Speter 757266733Speter /* if the result size is too small, remember which descriptors 758266733Speter * haven't had results reported yet. we will look 759266733Speter * at these descriptors on the next apr_pollset_poll call 760266733Speter */ 761266733Speter 762266733Speter APR_RING_CONCAT(&priv->prior_ready_ring, &(priv->ready_ring), asio_elem_t, link); 763266733Speter 764266733Speter DBG1(2, "exiting, rv = %d\n", rv); 765266733Speter 766266733Speter pollset_unlock_rings(); 767266733Speter 768266733Speter return rv; 769266733Speter} /* end of asio_pollset_poll */ 770266733Speter 771362181Sdimstatic const apr_pollset_provider_t impl = { 772266733Speter asio_pollset_create, 773266733Speter asio_pollset_add, 774266733Speter asio_pollset_remove, 775266733Speter asio_pollset_poll, 776266733Speter asio_pollset_cleanup, 777266733Speter "asio" 778266733Speter}; 779266733Speter 780362181Sdimconst apr_pollset_provider_t *apr_pollset_provider_aio_msgq = &impl; 781266733Speter 782266733Speter#endif /* HAVE_AIO_MSGQ */ 783