1/* Licensed to the Apache Software Foundation (ASF) under one or more 2 * contributor license agreements. See the NOTICE file distributed with 3 * this work for additional information regarding copyright ownership. 4 * The ASF licenses this file to You under the Apache License, Version 2.0 5 * (the "License"); you may not use this file except in compliance with 6 * the License. You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 * 16 * 17 ****************************************************************************** 18 * 19 * This implementation is based on a design by John Brooks (IBM Pok) which uses 20 * the z/OS sockets async i/o facility. When a 21 * socket is added to the pollset, an async poll is issued for that individual 22 * socket. It specifies that the kernel should send an IPC message when the 23 * socket becomes ready. The IPC messages are sent to a single message queue 24 * that is part of the pollset. apr_pollset_poll waits on the arrival of IPC 25 * messages or the specified timeout. 26 * 27 * Since z/OS does not support async i/o for pipes or files at present, this 28 * implementation falls back to using ordinary poll() when 29 * APR_POLLSET_THREADSAFE is unset. 30 * 31 * Greg Ames 32 * April 2012 33 */ 34 35#include "apr.h" 36#include "apr_hash.h" 37#include "apr_poll.h" 38#include "apr_time.h" 39#include "apr_portable.h" 40#include "apr_arch_inherit.h" 41#include "apr_arch_file_io.h" 42#include "apr_arch_networkio.h" 43#include "apr_arch_poll_private.h" 44 45#ifdef HAVE_AIO_MSGQ 46 47#include <sys/msg.h> /* msgget etc */ 48#include <time.h> /* timestruct */ 49#include <poll.h> /* pollfd */ 50#include <limits.h> /* MAX_INT */ 51 52struct apr_pollset_private_t 53{ 54 int msg_q; /* IPC message queue. The z/OS kernel sends messages 55 * to this queue when our async polls on individual 56 * file descriptors complete 57 */ 58 apr_pollfd_t *result_set; 59 apr_uint32_t size; 60 61#if APR_HAS_THREADS 62 /* A thread mutex to protect operations on the rings and the hash */ 63 apr_thread_mutex_t *ring_lock; 64#endif 65 66 /* A hash of all active elements used for O(1) _remove operations */ 67 apr_hash_t *elems; 68 69 APR_RING_HEAD(ready_ring_t, asio_elem_t) ready_ring; 70 APR_RING_HEAD(prior_ready_ring_t, asio_elem_t) prior_ready_ring; 71 APR_RING_HEAD(free_ring_t, asio_elem_t) free_ring; 72 73 /* for pipes etc with no asio */ 74 struct pollfd *pollset; 75 apr_pollfd_t *query_set; 76}; 77 78typedef enum { 79 ASIO_INIT = 0, 80 ASIO_REMOVED, 81 ASIO_COMPLETE 82} asio_state_e; 83 84typedef struct asio_elem_t asio_elem_t; 85 86struct asio_msgbuf_t { 87 long msg_type; /* must be > 0 */ 88 asio_elem_t *msg_elem; 89}; 90 91struct asio_elem_t 92{ 93 APR_RING_ENTRY(asio_elem_t) link; 94 apr_pollfd_t pfd; 95 struct pollfd os_pfd; 96 struct aiocb a; 97 asio_state_e state; 98 struct asio_msgbuf_t msg; 99}; 100 101#define DEBUG 0 102 103/* DEBUG settings: 0 - no debug messages at all, 104 * 1 - should not occur messages, 105 * 2 - apr_pollset_* entry and exit messages, 106 * 3 - state changes, memory usage, 107 * 4 - z/OS, APR, and internal calls, 108 * 5 - everything else except the timer pop path, 109 * 6 - everything, including the Event 1 sec timer pop path 110 * 111 * each DEBUG level includes all messages produced by lower numbered levels 112 */ 113 114#if DEBUG 115 116#include <assert.h> 117#include <unistd.h> /* getpid */ 118 119#define DBG_BUFF char dbg_msg_buff[256]; 120 121#define DBG_TEST(lvl) if (lvl <= DEBUG) { 122 123#define DBG_CORE(msg) sprintf(dbg_msg_buff, "% 8d " __FUNCTION__ \ 124 " " msg, getpid()), \ 125 fprintf(stderr, "%s", dbg_msg_buff); 126#define DBG_CORE1(msg, var1) sprintf(dbg_msg_buff, "% 8d " __FUNCTION__ \ 127 " " msg, getpid(), var1), \ 128 fprintf(stderr, "%s", dbg_msg_buff); 129#define DBG_CORE2(msg, var1, var2) sprintf(dbg_msg_buff, "% 8d " __FUNCTION__ \ 130 " " msg, getpid(), var1, var2), \ 131 fprintf(stderr, "%s", dbg_msg_buff); 132#define DBG_CORE3(msg, var1, var2, var3) \ 133 sprintf(dbg_msg_buff, "% 8d " __FUNCTION__ \ 134 " " msg, getpid(), var1, var2, var3), \ 135 fprintf(stderr, "%s", dbg_msg_buff); 136#define DBG_CORE4(msg, var1, var2, var3, var4) \ 137 sprintf(dbg_msg_buff, "% 8d " __FUNCTION__ \ 138 " " msg, getpid(), var1, var2, var3, var4),\ 139 fprintf(stderr, "%s", dbg_msg_buff); 140 141#define DBG_END } 142 143#define DBG(lvl, msg) DBG_TEST(lvl) \ 144 DBG_CORE(msg) \ 145 DBG_END 146 147#define DBG1(lvl, msg, var1) DBG_TEST(lvl) \ 148 DBG_CORE1(msg, var1) \ 149 DBG_END 150 151#define DBG2(lvl, msg, var1, var2) DBG_TEST(lvl) \ 152 DBG_CORE2(msg, var1, var2) \ 153 DBG_END 154 155#define DBG3(lvl, msg, var1, var2, var3) \ 156 DBG_TEST(lvl) \ 157 DBG_CORE3(msg, var1, var2, var3) \ 158 DBG_END 159 160#define DBG4(lvl, msg, var1, var2, var3, var4) \ 161 DBG_TEST(lvl) \ 162 DBG_CORE4(msg, var1, var2, var3, var4) \ 163 DBG_END 164 165#else /* DEBUG is 0 */ 166#define DBG_BUFF 167#define DBG(lvl, msg) ((void)0) 168#define DBG1(lvl, msg, var1) ((void)0) 169#define DBG2(lvl, msg, var1, var2) ((void)0) 170#define DBG3(lvl, msg, var1, var2, var3) ((void)0) 171#define DBG4(lvl, msg, var1, var2, var3, var4) ((void)0) 172 173#endif /* DEBUG */ 174 175static int asyncio(struct aiocb *a) 176{ 177 DBG_BUFF 178 int rv; 179 180#ifdef _LP64 181#define AIO BPX4AIO 182#else 183#define AIO BPX1AIO 184#endif 185 186 AIO(sizeof(struct aiocb), a, &rv, &errno, __err2ad()); 187 DBG2(4, "BPX4AIO aiocb %p rv %d\n", 188 a, rv); 189#ifdef DEBUG 190 if (rv < 0) { 191 DBG2(4, "errno %d errnojr %08x\n", 192 errno, *__err2ad()); 193 } 194#endif 195 return rv; 196} 197 198static apr_int16_t get_event(apr_int16_t event) 199{ 200 DBG_BUFF 201 apr_int16_t rv = 0; 202 DBG(4, "entered\n"); 203 204 if (event & APR_POLLIN) 205 rv |= POLLIN; 206 if (event & APR_POLLPRI) 207 rv |= POLLPRI; 208 if (event & APR_POLLOUT) 209 rv |= POLLOUT; 210 if (event & APR_POLLERR) 211 rv |= POLLERR; 212 if (event & APR_POLLHUP) 213 rv |= POLLHUP; 214 if (event & APR_POLLNVAL) 215 rv |= POLLNVAL; 216 217 DBG(4, "exiting\n"); 218 return rv; 219} 220 221static apr_int16_t get_revent(apr_int16_t event) 222{ 223 DBG_BUFF 224 apr_int16_t rv = 0; 225 DBG(4, "entered\n"); 226 227 if (event & POLLIN) 228 rv |= APR_POLLIN; 229 if (event & POLLPRI) 230 rv |= APR_POLLPRI; 231 if (event & POLLOUT) 232 rv |= APR_POLLOUT; 233 if (event & POLLERR) 234 rv |= APR_POLLERR; 235 if (event & POLLHUP) 236 rv |= APR_POLLHUP; 237 if (event & POLLNVAL) 238 rv |= APR_POLLNVAL; 239 240 DBG(4, "exiting\n"); 241 return rv; 242} 243 244static apr_status_t asio_pollset_cleanup(apr_pollset_t *pollset) 245{ 246 DBG_BUFF 247 int rv; 248 249 DBG(4, "entered\n"); 250 if (pollset->flags & APR_POLLSET_THREADSAFE) { 251 rv = msgctl(pollset->p->msg_q, IPC_RMID, NULL); 252 DBG1(4, "asio_pollset_cleanup: msgctl(IPC_RMID) returned %d\n", rv); 253 } 254 255 return rv; 256} 257 258static apr_status_t asio_pollset_create(apr_pollset_t *pollset, 259 apr_uint32_t size, 260 apr_pool_t *p, 261 apr_uint32_t flags) 262{ 263 DBG_BUFF 264 apr_status_t rv; 265 apr_pollset_private_t *priv; 266 267 DBG1(2, "entered, flags: %x\n", flags); 268 269 priv = pollset->p = apr_pcalloc(p, sizeof(*priv)); 270 271 if (flags & APR_POLLSET_THREADSAFE) { 272#if APR_HAS_THREADS 273 if ((rv = apr_thread_mutex_create(&(priv->ring_lock), 274 APR_THREAD_MUTEX_DEFAULT, 275 p)) != APR_SUCCESS) { 276 DBG1(1, "apr_thread_mutex_create returned %d\n", rv); 277 pollset->p = NULL; 278 return rv; 279 } 280 rv = msgget(IPC_PRIVATE, S_IWUSR+S_IRUSR); /* user r/w perms */ 281 if (rv < 0) { 282#if DEBUG 283 perror(__FUNCTION__ " msgget returned < 0 "); 284#endif 285 pollset->p = NULL; 286 return rv; 287 } 288 289 DBG2(4, "pollset %p msgget was OK, rv=%d\n", pollset, rv); 290 priv->msg_q = rv; 291 priv->elems = apr_hash_make(p); 292 293 APR_RING_INIT(&priv->free_ring, asio_elem_t, link); 294 APR_RING_INIT(&priv->prior_ready_ring, asio_elem_t, link); 295 296#else /* APR doesn't have threads but caller wants a threadsafe pollset */ 297 pollset->p = NULL; 298 return APR_ENOTIMPL; 299#endif 300 301 } else { /* APR_POLLSET_THREADSAFE not set, i.e. no async i/o, 302 * init fields only needed in old style pollset 303 */ 304 305 priv->pollset = apr_palloc(p, size * sizeof(struct pollfd)); 306 priv->query_set = apr_palloc(p, size * sizeof(apr_pollfd_t)); 307 308 if ((!priv->pollset) || (!priv->query_set)) { 309 pollset->p = NULL; 310 return APR_ENOMEM; 311 } 312 } 313 314 pollset->nelts = 0; 315 pollset->flags = flags; 316 pollset->pool = p; 317 priv->size = size; 318 priv->result_set = apr_palloc(p, size * sizeof(apr_pollfd_t)); 319 if (!priv->result_set) { 320 if (flags & APR_POLLSET_THREADSAFE) { 321 msgctl(priv->msg_q, IPC_RMID, NULL); 322 } 323 pollset->p = NULL; 324 return APR_ENOMEM; 325 } 326 327 DBG2(2, "exiting, pollset: %p, type: %s\n", 328 pollset, 329 flags & APR_POLLSET_THREADSAFE ? "async" : "POSIX"); 330 331 332 return APR_SUCCESS; 333 334} /* end of asio_pollset_create */ 335 336static apr_status_t posix_add(apr_pollset_t *pollset, 337 const apr_pollfd_t *descriptor) 338{ 339 DBG_BUFF 340 int fd; 341 apr_pool_t *p = pollset->pool; 342 apr_pollset_private_t *priv = pollset->p; 343 344 DBG(4, "entered\n"); 345 346 if (pollset->nelts == priv->size) { 347 return APR_ENOMEM; 348 } 349 350 priv->query_set[pollset->nelts] = *descriptor; 351 if (descriptor->desc_type == APR_POLL_SOCKET) { 352 fd = descriptor->desc.s->socketdes; 353 } 354 else { 355 fd = descriptor->desc.f->filedes; 356 } 357 358 priv->pollset[pollset->nelts].fd = fd; 359 360 priv->pollset[pollset->nelts].events = 361 get_event(descriptor->reqevents); 362 363 pollset->nelts++; 364 365 DBG2(4, "exiting, fd %d added to pollset %p\n", fd, pollset); 366 367 return APR_SUCCESS; 368} /* end of posix_add */ 369 370 371static apr_status_t asio_pollset_add(apr_pollset_t *pollset, 372 const apr_pollfd_t *descriptor) 373{ 374 DBG_BUFF 375 asio_elem_t *elem; 376 apr_status_t rv = APR_SUCCESS; 377 apr_pollset_private_t *priv = pollset->p; 378 379 pollset_lock_rings(); 380 DBG(2, "entered\n"); 381 382 if (pollset->flags & APR_POLLSET_THREADSAFE) { 383 384 if (!APR_RING_EMPTY(&(priv->free_ring), asio_elem_t, link)) { 385 elem = APR_RING_FIRST(&(priv->free_ring)); 386 APR_RING_REMOVE(elem, link); 387 DBG1(3, "used recycled memory at %08p\n", elem); 388 elem->state = ASIO_INIT; 389 elem->a.aio_cflags = 0; 390 } 391 else { 392 elem = (asio_elem_t *) apr_pcalloc(pollset->pool, sizeof(asio_elem_t)); 393 DBG1(3, "alloced new memory at %08p\n", elem); 394 395 elem->a.aio_notifytype = AIO_MSGQ; 396 elem->a.aio_msgev_qid = priv->msg_q; 397 DBG1(5, "aio_msgev_quid = %d \n", elem->a.aio_msgev_qid); 398 elem->a.aio_msgev_size = sizeof(asio_elem_t *); 399 elem->a.aio_msgev_flag = 0; /* wait if queue is full */ 400 elem->a.aio_msgev_addr = &(elem->msg); 401 elem->a.aio_buf = &(elem->os_pfd); 402 elem->a.aio_nbytes = 1; /* number of pfds to poll */ 403 elem->msg.msg_type = 1; 404 elem->msg.msg_elem = elem; 405 } 406 407 /* z/OS only supports async I/O for sockets for now */ 408 elem->os_pfd.fd = descriptor->desc.s->socketdes; 409 410 APR_RING_ELEM_INIT(elem, link); 411 elem->a.aio_cmd = AIO_SELPOLL; 412 elem->a.aio_cflags &= ~AIO_OK2COMPIMD; /* not OK to complete inline*/ 413 elem->pfd = *descriptor; 414 elem->os_pfd.events = get_event(descriptor->reqevents); 415 416 if (0 != asyncio(&elem->a)) { 417 rv = errno; 418 DBG3(4, "pollset %p asio failed fd %d, errno %p\n", 419 pollset, elem->os_pfd.fd, rv); 420#if DEBUG 421 perror(__FUNCTION__ " asio failure"); 422#endif 423 } 424 else { 425 DBG2(4, "good asio call, adding fd %d to pollset %p\n", 426 elem->os_pfd.fd, pollset); 427 428 pollset->nelts++; 429 apr_hash_set(priv->elems, &(elem->os_pfd.fd), sizeof(int), elem); 430 } 431 } 432 else { 433 /* APR_POLLSET_THREADSAFE isn't set. use POSIX poll in case 434 * pipes or files are used with this pollset 435 */ 436 437 rv = posix_add(pollset, descriptor); 438 } 439 440 DBG1(2, "exiting, rv = %d\n", rv); 441 442 pollset_unlock_rings(); 443 return rv; 444} /* end of asio_pollset_add */ 445 446static posix_remove(apr_pollset_t *pollset, const apr_pollfd_t *descriptor) 447{ 448 DBG_BUFF 449 apr_uint32_t i; 450 apr_pollset_private_t *priv = pollset->p; 451 452 DBG(4, "entered\n"); 453 for (i = 0; i < pollset->nelts; i++) { 454 if (descriptor->desc.s == priv->query_set[i].desc.s) { 455 /* Found an instance of the fd: remove this and any other copies */ 456 apr_uint32_t dst = i; 457 apr_uint32_t old_nelts = pollset->nelts; 458 pollset->nelts--; 459 for (i++; i < old_nelts; i++) { 460 if (descriptor->desc.s == priv->query_set[i].desc.s) { 461 pollset->nelts--; 462 } 463 else { 464 priv->pollset[dst] = priv->pollset[i]; 465 priv->query_set[dst] = priv->query_set[i]; 466 dst++; 467 } 468 } 469 DBG(4, "returning OK\n"); 470 return APR_SUCCESS; 471 } 472 } 473 474 DBG(1, "returning APR_NOTFOUND\n"); 475 return APR_NOTFOUND; 476 477} /* end of posix_remove */ 478 479static apr_status_t asio_pollset_remove(apr_pollset_t *pollset, 480 const apr_pollfd_t *descriptor) 481{ 482 DBG_BUFF 483 asio_elem_t *elem; 484 apr_status_t rv = APR_SUCCESS; 485 apr_pollset_private_t *priv = pollset->p; 486 /* AIO_CANCEL is synchronous, so autodata works fine. */ 487 struct aiocb cancel_a = {0}; 488 489 int fd; 490 491 DBG(2, "entered\n"); 492 493 if (!(pollset->flags & APR_POLLSET_THREADSAFE)) { 494 return posix_remove(pollset, descriptor); 495 } 496 497 pollset_lock_rings(); 498 499#if DEBUG 500 assert(descriptor->desc_type == APR_POLL_SOCKET); 501#endif 502 /* zOS 1.12 doesn't support files for async i/o */ 503 fd = descriptor->desc.s->socketdes; 504 505 elem = apr_hash_get(priv->elems, &(fd), sizeof(int)); 506 if (elem == NULL) { 507 DBG1(1, "couldn't find fd %d\n", fd); 508 rv = APR_NOTFOUND; 509 } else { 510 DBG1(5, "hash found fd %d\n", fd); 511 /* delete this fd from the hash */ 512 apr_hash_set(priv->elems, &(fd), sizeof(int), NULL); 513 514 if (elem->state == ASIO_INIT) { 515 /* asyncio call to cancel */ 516 cancel_a.aio_cmd = AIO_CANCEL; 517 cancel_a.aio_buf = &elem->a; /* point to original aiocb */ 518 519 cancel_a.aio_cflags = 0; 520 cancel_a.aio_cflags2 = 0; 521 522 /* we want the original aiocb to show up on the pollset message queue 523 * before recycling its memory to eliminate race conditions 524 */ 525 526 rv = asyncio(&cancel_a); 527 DBG1(4, "asyncio returned %d\n", rv); 528 529#if DEBUG 530 assert(rv == 1); 531#endif 532 } 533 elem->state = ASIO_REMOVED; 534 rv = APR_SUCCESS; 535 } 536 537 DBG1(2, "exiting, rv: %d\n", rv); 538 539 pollset_unlock_rings(); 540 541 return rv; 542} /* end of asio_pollset_remove */ 543 544static posix_poll(apr_pollset_t *pollset, 545 apr_interval_time_t timeout, 546 apr_int32_t *num, 547 const apr_pollfd_t **descriptors) 548{ 549 DBG_BUFF 550 int rv; 551 apr_uint32_t i, j; 552 apr_pollset_private_t *priv = pollset->p; 553 554 DBG(4, "entered\n"); 555 556 if (timeout > 0) { 557 timeout /= 1000; 558 } 559 rv = poll(priv->pollset, pollset->nelts, timeout); 560 (*num) = rv; 561 if (rv < 0) { 562 return apr_get_netos_error(); 563 } 564 if (rv == 0) { 565 return APR_TIMEUP; 566 } 567 j = 0; 568 for (i = 0; i < pollset->nelts; i++) { 569 if (priv->pollset[i].revents != 0) { 570 priv->result_set[j] = priv->query_set[i]; 571 priv->result_set[j].rtnevents = 572 get_revent(priv->pollset[i].revents); 573 j++; 574 } 575 } 576 if (descriptors) 577 *descriptors = priv->result_set; 578 579 DBG(4, "exiting ok\n"); 580 return APR_SUCCESS; 581 582} /* end of posix_poll */ 583 584static process_msg(apr_pollset_t *pollset, struct asio_msgbuf_t *msg) 585{ 586 DBG_BUFF 587 asio_elem_t *elem = msg->msg_elem; 588 589 switch(elem->state) { 590 case ASIO_REMOVED: 591 DBG2(5, "for cancelled elem, recycling memory - elem %08p, fd %d\n", 592 elem, elem->os_pfd.fd); 593 APR_RING_INSERT_TAIL(&(pollset->p->free_ring), elem, 594 asio_elem_t, link); 595 break; 596 case ASIO_INIT: 597 DBG2(4, "adding to ready ring: elem %08p, fd %d\n", 598 elem, elem->os_pfd.fd); 599 elem->state = ASIO_COMPLETE; 600 APR_RING_INSERT_TAIL(&(pollset->p->ready_ring), elem, 601 asio_elem_t, link); 602 break; 603 default: 604 DBG3(1, "unexpected state: elem %08p, fd %d, state %d\n", 605 elem, elem->os_pfd.fd, elem->state); 606#if DEBUG 607 assert(0); 608#endif 609 } 610} 611 612static apr_status_t asio_pollset_poll(apr_pollset_t *pollset, 613 apr_interval_time_t timeout, 614 apr_int32_t *num, 615 const apr_pollfd_t **descriptors) 616{ 617 DBG_BUFF 618 int i, ret; 619 asio_elem_t *elem, *next_elem; 620 struct asio_msgbuf_t msg_buff; 621 struct timespec tv; 622 apr_status_t rv = APR_SUCCESS; 623 apr_pollset_private_t *priv = pollset->p; 624 625 DBG(6, "entered\n"); /* chatty - traces every second w/Event */ 626 627 if ((pollset->flags & APR_POLLSET_THREADSAFE) == 0 ) { 628 return posix_poll(pollset, timeout, num, descriptors); 629 } 630 631 pollset_lock_rings(); 632 APR_RING_INIT(&(priv->ready_ring), asio_elem_t, link); 633 634 while (!APR_RING_EMPTY(&(priv->prior_ready_ring), asio_elem_t, link)) { 635 elem = APR_RING_FIRST(&(priv->prior_ready_ring)); 636 DBG3(5, "pollset %p elem %p fd %d on prior ready ring\n", 637 pollset, 638 elem, 639 elem->os_pfd.fd); 640 641 APR_RING_REMOVE(elem, link); 642 643 /* 644 * since USS does not remember what's in our pollset, we have 645 * to re-add fds which have not been apr_pollset_remove'd 646 * 647 * there may have been too many ready fd's to return in the 648 * result set last time. re-poll inline for both cases 649 */ 650 651 if (elem->state == ASIO_REMOVED) { 652 653 /* 654 * async i/o is done since it was found on prior_ready 655 * the state says the caller is done with it too 656 * so recycle the elem 657 */ 658 659 APR_RING_INSERT_TAIL(&(priv->free_ring), elem, 660 asio_elem_t, link); 661 continue; /* do not re-add if it has been _removed */ 662 } 663 664 elem->state = ASIO_INIT; 665 elem->a.aio_cflags = AIO_OK2COMPIMD; 666 667 if (0 != (ret = asyncio(&elem->a))) { 668 if (ret == 1) { 669 DBG(4, "asyncio() completed inline\n"); 670 /* it's ready now */ 671 elem->state = ASIO_COMPLETE; 672 APR_RING_INSERT_TAIL(&(priv->ready_ring), elem, asio_elem_t, 673 link); 674 } 675 else { 676 DBG2(1, "asyncio() failed, ret: %d, errno: %d\n", 677 ret, errno); 678 pollset_unlock_rings(); 679 return errno; 680 } 681 } 682 DBG1(4, "asyncio() completed rc %d\n", ret); 683 } 684 685 DBG(6, "after prior ready loop\n"); /* chatty w/timeouts, hence 6 */ 686 687 /* Gather async poll completions that have occurred since the last call */ 688 while (0 < msgrcv(priv->msg_q, &msg_buff, sizeof(asio_elem_t *), 0, 689 IPC_NOWAIT)) { 690 process_msg(pollset, &msg_buff); 691 } 692 693 /* Suspend if nothing is ready yet. */ 694 if (APR_RING_EMPTY(&(priv->ready_ring), asio_elem_t, link)) { 695 696 if (timeout >= 0) { 697 tv.tv_sec = apr_time_sec(timeout); 698 tv.tv_nsec = apr_time_usec(timeout) * 1000; 699 } else { 700 tv.tv_sec = INT_MAX; /* block until something is ready */ 701 } 702 703 DBG2(6, "nothing on the ready ring " 704 "- blocking for %d seconds %d ns\n", 705 tv.tv_sec, tv.tv_nsec); 706 707 pollset_unlock_rings(); /* allow other apr_pollset_* calls while blocked */ 708 709 if (0 >= (ret = __msgrcv_timed(priv->msg_q, &msg_buff, 710 sizeof(asio_elem_t *), 0, NULL, &tv))) { 711#if DEBUG 712 if (errno == EAGAIN) { 713 DBG(6, "__msgrcv_timed timed out\n"); /* timeout path, so 6 */ 714 } 715 else { 716 DBG(1, "__msgrcv_timed failed!\n"); 717 } 718#endif 719 return (errno == EAGAIN) ? APR_TIMEUP : errno; 720 } 721 722 pollset_lock_rings(); 723 724 process_msg(pollset, &msg_buff); 725 } 726 727 APR_RING_INIT(&priv->prior_ready_ring, asio_elem_t, link); 728 729 (*num) = 0; 730 elem = APR_RING_FIRST(&(priv->ready_ring)); 731 732 for (i = 0; 733 734 i < priv->size 735 && elem != APR_RING_SENTINEL(&(priv->ready_ring), asio_elem_t, link); 736 i++) { 737 DBG2(5, "ready ring: elem %08p, fd %d\n", elem, elem->os_pfd.fd); 738 739 priv->result_set[i] = elem->pfd; 740 priv->result_set[i].rtnevents 741 = get_revent(elem->os_pfd.revents); 742 (*num)++; 743 744 elem = APR_RING_NEXT(elem, link); 745 746#if DEBUG 747 if (elem == APR_RING_SENTINEL(&(priv->ready_ring), asio_elem_t, link)) { 748 DBG(5, "end of ready ring reached\n"); 749 } 750#endif 751 } 752 753 if (descriptors) { 754 *descriptors = priv->result_set; 755 } 756 757 /* if the result size is too small, remember which descriptors 758 * haven't had results reported yet. we will look 759 * at these descriptors on the next apr_pollset_poll call 760 */ 761 762 APR_RING_CONCAT(&priv->prior_ready_ring, &(priv->ready_ring), asio_elem_t, link); 763 764 DBG1(2, "exiting, rv = %d\n", rv); 765 766 pollset_unlock_rings(); 767 768 return rv; 769} /* end of asio_pollset_poll */ 770 771static const apr_pollset_provider_t impl = { 772 asio_pollset_create, 773 asio_pollset_add, 774 asio_pollset_remove, 775 asio_pollset_poll, 776 asio_pollset_cleanup, 777 "asio" 778}; 779 780const apr_pollset_provider_t *apr_pollset_provider_aio_msgq = &impl; 781 782#endif /* HAVE_AIO_MSGQ */ 783