/* z_asio.c revision 266733 */
1/* Licensed to the Apache Software Foundation (ASF) under one or more 2 * contributor license agreements. See the NOTICE file distributed with 3 * this work for additional information regarding copyright ownership. 4 * The ASF licenses this file to You under the Apache License, Version 2.0 5 * (the "License"); you may not use this file except in compliance with 6 * the License. You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 * 16 * 17 ****************************************************************************** 18 * 19 * This implementation is based on a design by John Brooks (IBM Pok) which uses 20 * the z/OS sockets async i/o facility. When a 21 * socket is added to the pollset, an async poll is issued for that individual 22 * socket. It specifies that the kernel should send an IPC message when the 23 * socket becomes ready. The IPC messages are sent to a single message queue 24 * that is part of the pollset. apr_pollset_poll waits on the arrival of IPC 25 * messages or the specified timeout. 26 * 27 * Since z/OS does not support async i/o for pipes or files at present, this 28 * implementation falls back to using ordinary poll() when 29 * APR_POLLSET_THREADSAFE is unset. 
30 * 31 * Greg Ames 32 * April 2012 33 */ 34 35#include "apr.h" 36#include "apr_hash.h" 37#include "apr_poll.h" 38#include "apr_time.h" 39#include "apr_portable.h" 40#include "apr_arch_inherit.h" 41#include "apr_arch_file_io.h" 42#include "apr_arch_networkio.h" 43#include "apr_arch_poll_private.h" 44 45#ifdef HAVE_AIO_MSGQ 46 47#include <sys/msg.h> /* msgget etc */ 48#include <time.h> /* timestruct */ 49#include <poll.h> /* pollfd */ 50#include <limits.h> /* MAX_INT */ 51 52struct apr_pollset_private_t 53{ 54 int msg_q; /* IPC message queue. The z/OS kernel sends messages 55 * to this queue when our async polls on individual 56 * file descriptors complete 57 */ 58 apr_pollfd_t *result_set; 59 apr_uint32_t size; 60 61#if APR_HAS_THREADS 62 /* A thread mutex to protect operations on the rings and the hash */ 63 apr_thread_mutex_t *ring_lock; 64#endif 65 66 /* A hash of all active elements used for O(1) _remove operations */ 67 apr_hash_t *elems; 68 69 APR_RING_HEAD(ready_ring_t, asio_elem_t) ready_ring; 70 APR_RING_HEAD(prior_ready_ring_t, asio_elem_t) prior_ready_ring; 71 APR_RING_HEAD(free_ring_t, asio_elem_t) free_ring; 72 73 /* for pipes etc with no asio */ 74 struct pollfd *pollset; 75 apr_pollfd_t *query_set; 76}; 77 78typedef enum { 79 ASIO_INIT = 0, 80 ASIO_REMOVED, 81 ASIO_COMPLETE 82} asio_state_e; 83 84typedef struct asio_elem_t asio_elem_t; 85 86struct asio_msgbuf_t { 87 long msg_type; /* must be > 0 */ 88 asio_elem_t *msg_elem; 89}; 90 91struct asio_elem_t 92{ 93 APR_RING_ENTRY(asio_elem_t) link; 94 apr_pollfd_t pfd; 95 struct pollfd os_pfd; 96 struct aiocb a; 97 asio_state_e state; 98 struct asio_msgbuf_t msg; 99}; 100 101#define DEBUG 0 102 103/* DEBUG settings: 0 - no debug messages at all, 104 * 1 - should not occur messages, 105 * 2 - apr_pollset_* entry and exit messages, 106 * 3 - state changes, memory usage, 107 * 4 - z/OS, APR, and internal calls, 108 * 5 - everything else except the timer pop path, 109 * 6 - everything, including the Event 1 
sec timer pop path 110 * 111 * each DEBUG level includes all messages produced by lower numbered levels 112 */ 113 114#if DEBUG 115 116#include <assert.h> 117#include <unistd.h> /* getpid */ 118 119#define DBG_BUFF char dbg_msg_buff[256]; 120 121#define DBG_TEST(lvl) if (lvl <= DEBUG) { 122 123#define DBG_CORE(msg) sprintf(dbg_msg_buff, "% 8d " __FUNCTION__ \ 124 " " msg, getpid()), \ 125 fprintf(stderr, "%s", dbg_msg_buff); 126#define DBG_CORE1(msg, var1) sprintf(dbg_msg_buff, "% 8d " __FUNCTION__ \ 127 " " msg, getpid(), var1), \ 128 fprintf(stderr, "%s", dbg_msg_buff); 129#define DBG_CORE2(msg, var1, var2) sprintf(dbg_msg_buff, "% 8d " __FUNCTION__ \ 130 " " msg, getpid(), var1, var2), \ 131 fprintf(stderr, "%s", dbg_msg_buff); 132#define DBG_CORE3(msg, var1, var2, var3) \ 133 sprintf(dbg_msg_buff, "% 8d " __FUNCTION__ \ 134 " " msg, getpid(), var1, var2, var3), \ 135 fprintf(stderr, "%s", dbg_msg_buff); 136#define DBG_CORE4(msg, var1, var2, var3, var4) \ 137 sprintf(dbg_msg_buff, "% 8d " __FUNCTION__ \ 138 " " msg, getpid(), var1, var2, var3, var4),\ 139 fprintf(stderr, "%s", dbg_msg_buff); 140 141#define DBG_END } 142 143#define DBG(lvl, msg) DBG_TEST(lvl) \ 144 DBG_CORE(msg) \ 145 DBG_END 146 147#define DBG1(lvl, msg, var1) DBG_TEST(lvl) \ 148 DBG_CORE1(msg, var1) \ 149 DBG_END 150 151#define DBG2(lvl, msg, var1, var2) DBG_TEST(lvl) \ 152 DBG_CORE2(msg, var1, var2) \ 153 DBG_END 154 155#define DBG3(lvl, msg, var1, var2, var3) \ 156 DBG_TEST(lvl) \ 157 DBG_CORE3(msg, var1, var2, var3) \ 158 DBG_END 159 160#define DBG4(lvl, msg, var1, var2, var3, var4) \ 161 DBG_TEST(lvl) \ 162 DBG_CORE4(msg, var1, var2, var3, var4) \ 163 DBG_END 164 165#else /* DEBUG is 0 */ 166#define DBG_BUFF 167#define DBG(lvl, msg) ((void)0) 168#define DBG1(lvl, msg, var1) ((void)0) 169#define DBG2(lvl, msg, var1, var2) ((void)0) 170#define DBG3(lvl, msg, var1, var2, var3) ((void)0) 171#define DBG4(lvl, msg, var1, var2, var3, var4) ((void)0) 172 173#endif /* DEBUG */ 174 175static int 
asyncio(struct aiocb *a) 176{ 177 DBG_BUFF 178 int rv; 179 180#ifdef _LP64 181#define AIO BPX4AIO 182#else 183#define AIO BPX1AIO 184#endif 185 186 AIO(sizeof(struct aiocb), a, &rv, &errno, __err2ad()); 187 DBG2(4, "BPX4AIO aiocb %p rv %d\n", 188 a, rv); 189#ifdef DEBUG 190 if (rv < 0) { 191 DBG2(4, "errno %d errnojr %08x\n", 192 errno, *__err2ad()); 193 } 194#endif 195 return rv; 196} 197 198static apr_int16_t get_event(apr_int16_t event) 199{ 200 DBG_BUFF 201 apr_int16_t rv = 0; 202 DBG(4, "entered\n"); 203 204 if (event & APR_POLLIN) 205 rv |= POLLIN; 206 if (event & APR_POLLPRI) 207 rv |= POLLPRI; 208 if (event & APR_POLLOUT) 209 rv |= POLLOUT; 210 if (event & APR_POLLERR) 211 rv |= POLLERR; 212 if (event & APR_POLLHUP) 213 rv |= POLLHUP; 214 if (event & APR_POLLNVAL) 215 rv |= POLLNVAL; 216 217 DBG(4, "exiting\n"); 218 return rv; 219} 220 221static apr_int16_t get_revent(apr_int16_t event) 222{ 223 DBG_BUFF 224 apr_int16_t rv = 0; 225 DBG(4, "entered\n"); 226 227 if (event & POLLIN) 228 rv |= APR_POLLIN; 229 if (event & POLLPRI) 230 rv |= APR_POLLPRI; 231 if (event & POLLOUT) 232 rv |= APR_POLLOUT; 233 if (event & POLLERR) 234 rv |= APR_POLLERR; 235 if (event & POLLHUP) 236 rv |= APR_POLLHUP; 237 if (event & POLLNVAL) 238 rv |= APR_POLLNVAL; 239 240 DBG(4, "exiting\n"); 241 return rv; 242} 243 244static apr_status_t asio_pollset_cleanup(apr_pollset_t *pollset) 245{ 246 DBG_BUFF 247 int rv; 248 249 DBG(4, "entered\n"); 250 rv = msgctl(pollset->p->msg_q, IPC_RMID, NULL); 251 252 DBG1(4, "exiting, msgctl(IPC_RMID) returned %d\n", rv); 253 return rv; 254} 255 256static apr_status_t asio_pollset_create(apr_pollset_t *pollset, 257 apr_uint32_t size, 258 apr_pool_t *p, 259 apr_uint32_t flags) 260{ 261 DBG_BUFF 262 apr_status_t rv; 263 apr_pollset_private_t *priv; 264 265 DBG1(2, "entered, flags: %x\n", flags); 266 267 priv = pollset->p = apr_palloc(p, sizeof(*priv)); 268 269 if (flags & APR_POLLSET_THREADSAFE) { 270#if APR_HAS_THREADS 271 if (rv = 
apr_thread_mutex_create(&(priv->ring_lock), 272 APR_THREAD_MUTEX_DEFAULT, 273 p) != APR_SUCCESS) { 274 DBG1(1, "apr_thread_mutex_create returned %d\n", rv); 275 pollset = NULL; 276 return rv; 277 } 278 rv = msgget(IPC_PRIVATE, S_IWUSR+S_IRUSR); /* user r/w perms */ 279 if (rv < 0) { 280#if DEBUG 281 perror(__FUNCTION__ " msgget returned < 0 "); 282#endif 283 pollset = NULL; 284 return rv; 285 } 286 287 DBG2(4, "pollset %p msgget was OK, rv=%d\n", pollset, rv); 288 priv->msg_q = rv; 289 priv->elems = apr_hash_make(p); 290 291 APR_RING_INIT(&priv->free_ring, asio_elem_t, link); 292 APR_RING_INIT(&priv->prior_ready_ring, asio_elem_t, link); 293 294#else /* APR doesn't have threads but caller wants a threadsafe pollset */ 295 pollset = NULL; 296 return APR_ENOTIMPL; 297#endif 298 299 } else { /* APR_POLLSET_THREADSAFE not set, i.e. no async i/o, 300 * init fields only needed in old style pollset 301 */ 302 303 priv->pollset = apr_palloc(p, size * sizeof(struct pollfd)); 304 priv->query_set = apr_palloc(p, size * sizeof(apr_pollfd_t)); 305 306 if ((!priv->pollset) || (!priv->query_set)) { 307 return APR_ENOMEM; 308 } 309 } 310 311 pollset->nelts = 0; 312 pollset->flags = flags; 313 pollset->pool = p; 314 priv->size = size; 315 priv->result_set = apr_palloc(p, size * sizeof(apr_pollfd_t)); 316 if (!priv->result_set) { 317 return APR_ENOMEM; 318 } 319 320 DBG2(2, "exiting, pollset: %p, type: %s\n", 321 pollset, 322 flags & APR_POLLSET_THREADSAFE ? 
"async" : "POSIX"); 323 324 325 return APR_SUCCESS; 326 327} /* end of asio_pollset_create */ 328 329static apr_status_t posix_add(apr_pollset_t *pollset, 330 const apr_pollfd_t *descriptor) 331{ 332 DBG_BUFF 333 int fd; 334 apr_pool_t *p = pollset->pool; 335 apr_pollset_private_t *priv = pollset->p; 336 337 DBG(4, "entered\n"); 338 339 if (pollset->nelts == priv->size) { 340 return APR_ENOMEM; 341 } 342 343 priv->query_set[pollset->nelts] = *descriptor; 344 if (descriptor->desc_type == APR_POLL_SOCKET) { 345 fd = descriptor->desc.s->socketdes; 346 } 347 else { 348 fd = descriptor->desc.f->filedes; 349 } 350 351 priv->pollset[pollset->nelts].fd = fd; 352 353 priv->pollset[pollset->nelts].events = 354 get_event(descriptor->reqevents); 355 356 pollset->nelts++; 357 358 DBG2(4, "exiting, fd %d added to pollset %p\n", fd, pollset); 359 360 return APR_SUCCESS; 361} /* end of posix_add */ 362 363 364static apr_status_t asio_pollset_add(apr_pollset_t *pollset, 365 const apr_pollfd_t *descriptor) 366{ 367 DBG_BUFF 368 asio_elem_t *elem; 369 apr_status_t rv = APR_SUCCESS; 370 apr_pollset_private_t *priv = pollset->p; 371 372 pollset_lock_rings(); 373 DBG(2, "entered\n"); 374 375 if (pollset->flags & APR_POLLSET_THREADSAFE) { 376 377 if (!APR_RING_EMPTY(&(priv->free_ring), asio_elem_t, link)) { 378 elem = APR_RING_FIRST(&(priv->free_ring)); 379 APR_RING_REMOVE(elem, link); 380 DBG1(3, "used recycled memory at %08p\n", elem); 381 elem->state = ASIO_INIT; 382 } 383 else { 384 elem = (asio_elem_t *) apr_pcalloc(pollset->pool, sizeof(asio_elem_t)); 385 DBG1(3, "alloced new memory at %08p\n", elem); 386 387 elem->a.aio_notifytype = AIO_MSGQ; 388 elem->a.aio_msgev_qid = priv->msg_q; 389 DBG1(5, "aio_msgev_quid = %d \n", elem->a.aio_msgev_qid); 390 elem->a.aio_msgev_size = sizeof(asio_elem_t *); 391 elem->a.aio_msgev_flag = 0; /* wait if queue is full */ 392 elem->a.aio_msgev_addr = &(elem->msg); 393 elem->a.aio_buf = &(elem->os_pfd); 394 elem->a.aio_nbytes = 1; /* number of pfds 
to poll */ 395 elem->msg.msg_type = 1; 396 elem->msg.msg_elem = elem; 397 } 398 399 /* z/OS only supports async I/O for sockets for now */ 400 elem->os_pfd.fd = descriptor->desc.s->socketdes; 401 402 APR_RING_ELEM_INIT(elem, link); 403 elem->a.aio_cmd = AIO_SELPOLL; 404 elem->a.aio_cflags &= ~AIO_OK2COMPIMD; /* not OK to complete inline*/ 405 elem->pfd = *descriptor; 406 elem->os_pfd.events = get_event(descriptor->reqevents); 407 408 if (0 != asyncio(&elem->a)) { 409 rv = errno; 410 DBG3(4, "pollset %p asio failed fd %d, errno %p\n", 411 pollset, elem->os_pfd.fd, rv); 412#if DEBUG 413 perror(__FUNCTION__ " asio failure"); 414#endif 415 } 416 else { 417 DBG2(4, "good asio call, adding fd %d to pollset %p\n", 418 elem->os_pfd.fd, pollset); 419 420 pollset->nelts++; 421 apr_hash_set(priv->elems, &(elem->os_pfd.fd), sizeof(int), elem); 422 } 423 } 424 else { 425 /* APR_POLLSET_THREADSAFE isn't set. use POSIX poll in case 426 * pipes or files are used with this pollset 427 */ 428 429 rv = posix_add(pollset, descriptor); 430 } 431 432 DBG1(2, "exiting, rv = %d\n", rv); 433 434 pollset_unlock_rings(); 435 return rv; 436} /* end of asio_pollset_add */ 437 438static posix_remove(apr_pollset_t *pollset, const apr_pollfd_t *descriptor) 439{ 440 DBG_BUFF 441 apr_uint32_t i; 442 apr_pollset_private_t *priv = pollset->p; 443 444 DBG(4, "entered\n"); 445 for (i = 0; i < pollset->nelts; i++) { 446 if (descriptor->desc.s == priv->query_set[i].desc.s) { 447 /* Found an instance of the fd: remove this and any other copies */ 448 apr_uint32_t dst = i; 449 apr_uint32_t old_nelts = pollset->nelts; 450 pollset->nelts--; 451 for (i++; i < old_nelts; i++) { 452 if (descriptor->desc.s == priv->query_set[i].desc.s) { 453 pollset->nelts--; 454 } 455 else { 456 priv->pollset[dst] = priv->pollset[i]; 457 priv->query_set[dst] = priv->query_set[i]; 458 dst++; 459 } 460 } 461 DBG(4, "returning OK\n"); 462 return APR_SUCCESS; 463 } 464 } 465 466 DBG(1, "returning APR_NOTFOUND\n"); 467 return 
APR_NOTFOUND; 468 469} /* end of posix_remove */ 470 471static apr_status_t asio_pollset_remove(apr_pollset_t *pollset, 472 const apr_pollfd_t *descriptor) 473{ 474 DBG_BUFF 475 asio_elem_t *elem; 476 apr_status_t rv = APR_SUCCESS; 477 apr_pollset_private_t *priv = pollset->p; 478 struct aiocb cancel_a; /* AIO_CANCEL is synchronous, so autodata works fine */ 479 480 int fd; 481 482 DBG(2, "entered\n"); 483 484 if (!(pollset->flags & APR_POLLSET_THREADSAFE)) { 485 return posix_remove(pollset, descriptor); 486 } 487 488 pollset_lock_rings(); 489 490#if DEBUG 491 assert(descriptor->desc_type == APR_POLL_SOCKET); 492#endif 493 /* zOS 1.12 doesn't support files for async i/o */ 494 fd = descriptor->desc.s->socketdes; 495 496 elem = apr_hash_get(priv->elems, &(fd), sizeof(int)); 497 if (elem == NULL) { 498 DBG1(1, "couldn't find fd %d\n", fd); 499 rv = APR_NOTFOUND; 500 } else { 501 DBG1(5, "hash found fd %d\n", fd); 502 /* delete this fd from the hash */ 503 apr_hash_set(priv->elems, &(fd), sizeof(int), NULL); 504 505 if (elem->state == ASIO_INIT) { 506 /* asyncio call to cancel */ 507 cancel_a.aio_cmd = AIO_CANCEL; 508 cancel_a.aio_buf = &elem->a; /* point to original aiocb */ 509 510 cancel_a.aio_cflags = 0; 511 cancel_a.aio_cflags2 = 0; 512 513 /* we want the original aiocb to show up on the pollset message queue 514 * before recycling its memory to eliminate race conditions 515 */ 516 517 rv = asyncio(&cancel_a); 518 DBG1(4, "asyncio returned %d\n", rv); 519 520#if DEBUG 521 assert(rv == 1); 522#endif 523 } 524 elem->state = ASIO_REMOVED; 525 rv = APR_SUCCESS; 526 } 527 528 DBG1(2, "exiting, rv: %d\n", rv); 529 530 pollset_unlock_rings(); 531 532 return rv; 533} /* end of asio_pollset_remove */ 534 535static posix_poll(apr_pollset_t *pollset, 536 apr_interval_time_t timeout, 537 apr_int32_t *num, 538 const apr_pollfd_t **descriptors) 539{ 540 DBG_BUFF 541 int rv; 542 apr_uint32_t i, j; 543 apr_pollset_private_t *priv = pollset->p; 544 545 DBG(4, "entered\n"); 546 
547 if (timeout > 0) { 548 timeout /= 1000; 549 } 550 rv = poll(priv->pollset, pollset->nelts, timeout); 551 (*num) = rv; 552 if (rv < 0) { 553 return apr_get_netos_error(); 554 } 555 if (rv == 0) { 556 return APR_TIMEUP; 557 } 558 j = 0; 559 for (i = 0; i < pollset->nelts; i++) { 560 if (priv->pollset[i].revents != 0) { 561 priv->result_set[j] = priv->query_set[i]; 562 priv->result_set[j].rtnevents = 563 get_revent(priv->pollset[i].revents); 564 j++; 565 } 566 } 567 if (descriptors) 568 *descriptors = priv->result_set; 569 570 DBG(4, "exiting ok\n"); 571 return APR_SUCCESS; 572 573} /* end of posix_poll */ 574 575static process_msg(apr_pollset_t *pollset, struct asio_msgbuf_t *msg) 576{ 577 DBG_BUFF 578 asio_elem_t *elem = msg->msg_elem; 579 580 switch(elem->state) { 581 case ASIO_REMOVED: 582 DBG2(5, "for cancelled elem, recycling memory - elem %08p, fd %d\n", 583 elem, elem->os_pfd.fd); 584 APR_RING_INSERT_TAIL(&(pollset->p->free_ring), elem, 585 asio_elem_t, link); 586 break; 587 case ASIO_INIT: 588 DBG2(4, "adding to ready ring: elem %08p, fd %d\n", 589 elem, elem->os_pfd.fd); 590 elem->state = ASIO_COMPLETE; 591 APR_RING_INSERT_TAIL(&(pollset->p->ready_ring), elem, 592 asio_elem_t, link); 593 break; 594 default: 595 DBG3(1, "unexpected state: elem %08p, fd %d, state %d\n", 596 elem, elem->os_pfd.fd, elem->state); 597#if DEBUG 598 assert(0); 599#endif 600 } 601} 602 603static apr_status_t asio_pollset_poll(apr_pollset_t *pollset, 604 apr_interval_time_t timeout, 605 apr_int32_t *num, 606 const apr_pollfd_t **descriptors) 607{ 608 DBG_BUFF 609 int i, ret; 610 asio_elem_t *elem, *next_elem; 611 struct asio_msgbuf_t msg_buff; 612 struct timespec tv; 613 apr_status_t rv = APR_SUCCESS; 614 apr_pollset_private_t *priv = pollset->p; 615 616 DBG(6, "entered\n"); /* chatty - traces every second w/Event */ 617 618 if ((pollset->flags & APR_POLLSET_THREADSAFE) == 0 ) { 619 return posix_poll(pollset, timeout, num, descriptors); 620 } 621 622 pollset_lock_rings(); 623 
APR_RING_INIT(&(priv->ready_ring), asio_elem_t, link); 624 625 while (!APR_RING_EMPTY(&(priv->prior_ready_ring), asio_elem_t, link)) { 626 elem = APR_RING_FIRST(&(priv->prior_ready_ring)); 627 DBG3(5, "pollset %p elem %p fd %d on prior ready ring\n", 628 pollset, 629 elem, 630 elem->os_pfd.fd); 631 632 APR_RING_REMOVE(elem, link); 633 634 /* 635 * since USS does not remember what's in our pollset, we have 636 * to re-add fds which have not been apr_pollset_remove'd 637 * 638 * there may have been too many ready fd's to return in the 639 * result set last time. re-poll inline for both cases 640 */ 641 642 if (elem->state == ASIO_REMOVED) { 643 644 /* 645 * async i/o is done since it was found on prior_ready 646 * the state says the caller is done with it too 647 * so recycle the elem 648 */ 649 650 APR_RING_INSERT_TAIL(&(priv->free_ring), elem, 651 asio_elem_t, link); 652 continue; /* do not re-add if it has been _removed */ 653 } 654 655 elem->state = ASIO_INIT; 656 elem->a.aio_cflags = AIO_OK2COMPIMD; 657 658 if (0 != (ret = asyncio(&elem->a))) { 659 if (ret == 1) { 660 DBG(4, "asyncio() completed inline\n"); 661 /* it's ready now */ 662 APR_RING_INSERT_TAIL(&(priv->ready_ring), elem, asio_elem_t, 663 link); 664 } 665 else { 666 DBG2(1, "asyncio() failed, ret: %d, errno: %d\n", 667 ret, errno); 668 pollset_unlock_rings(); 669 return errno; 670 } 671 } 672 DBG1(4, "asyncio() completed rc %d\n", ret); 673 } 674 675 DBG(6, "after prior ready loop\n"); /* chatty w/timeouts, hence 6 */ 676 677 /* Gather async poll completions that have occurred since the last call */ 678 while (0 < msgrcv(priv->msg_q, &msg_buff, sizeof(asio_elem_t *), 0, 679 IPC_NOWAIT)) { 680 process_msg(pollset, &msg_buff); 681 } 682 683 /* Suspend if nothing is ready yet. 
*/ 684 if (APR_RING_EMPTY(&(priv->ready_ring), asio_elem_t, link)) { 685 686 if (timeout >= 0) { 687 tv.tv_sec = apr_time_sec(timeout); 688 tv.tv_nsec = apr_time_usec(timeout) * 1000; 689 } else { 690 tv.tv_sec = INT_MAX; /* block until something is ready */ 691 } 692 693 DBG2(6, "nothing on the ready ring " 694 "- blocking for %d seconds %d ns\n", 695 tv.tv_sec, tv.tv_nsec); 696 697 pollset_unlock_rings(); /* allow other apr_pollset_* calls while blocked */ 698 699 if (0 >= (ret = __msgrcv_timed(priv->msg_q, &msg_buff, 700 sizeof(asio_elem_t *), 0, NULL, &tv))) { 701#if DEBUG 702 if (errno == EAGAIN) { 703 DBG(6, "__msgrcv_timed timed out\n"); /* timeout path, so 6 */ 704 } 705 else { 706 DBG(1, "__msgrcv_timed failed!\n"); 707 } 708#endif 709 return (errno == EAGAIN) ? APR_TIMEUP : errno; 710 } 711 712 pollset_lock_rings(); 713 714 process_msg(pollset, &msg_buff); 715 } 716 717 APR_RING_INIT(&priv->prior_ready_ring, asio_elem_t, link); 718 719 (*num) = 0; 720 elem = APR_RING_FIRST(&(priv->ready_ring)); 721 722 for (i = 0; 723 724 i < priv->size 725 && elem != APR_RING_SENTINEL(&(priv->ready_ring), asio_elem_t, link); 726 i++) { 727 DBG2(5, "ready ring: elem %08p, fd %d\n", elem, elem->os_pfd.fd); 728 729 priv->result_set[i] = elem->pfd; 730 priv->result_set[i].rtnevents 731 = get_revent(elem->os_pfd.revents); 732 (*num)++; 733 734 elem = APR_RING_NEXT(elem, link); 735 736#if DEBUG 737 if (elem == APR_RING_SENTINEL(&(priv->ready_ring), asio_elem_t, link)) { 738 DBG(5, "end of ready ring reached\n"); 739 } 740#endif 741 } 742 743 if (descriptors) { 744 *descriptors = priv->result_set; 745 } 746 747 /* if the result size is too small, remember which descriptors 748 * haven't had results reported yet. 
we will look 749 * at these descriptors on the next apr_pollset_poll call 750 */ 751 752 APR_RING_CONCAT(&priv->prior_ready_ring, &(priv->ready_ring), asio_elem_t, link); 753 754 DBG1(2, "exiting, rv = %d\n", rv); 755 756 pollset_unlock_rings(); 757 758 return rv; 759} /* end of asio_pollset_poll */ 760 761static apr_pollset_provider_t impl = { 762 asio_pollset_create, 763 asio_pollset_add, 764 asio_pollset_remove, 765 asio_pollset_poll, 766 asio_pollset_cleanup, 767 "asio" 768}; 769 770apr_pollset_provider_t *apr_pollset_provider_aio_msgq = &impl; 771 772#endif /* HAVE_AIO_MSGQ */ 773