1/* Licensed to the Apache Software Foundation (ASF) under one or more 2 * contributor license agreements. See the NOTICE file distributed with 3 * this work for additional information regarding copyright ownership. 4 * The ASF licenses this file to You under the Apache License, Version 2.0 5 * (the "License"); you may not use this file except in compliance with 6 * the License. You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17#include "apr.h" 18#include "apr_poll.h" 19#include "apr_time.h" 20#include "apr_portable.h" 21#include "apr_atomic.h" 22#include "apr_arch_file_io.h" 23#include "apr_arch_networkio.h" 24#include "apr_arch_poll_private.h" 25#include "apr_arch_inherit.h" 26 27#if defined(HAVE_PORT_CREATE) 28 29static apr_int16_t get_event(apr_int16_t event) 30{ 31 apr_int16_t rv = 0; 32 33 if (event & APR_POLLIN) 34 rv |= POLLIN; 35 if (event & APR_POLLPRI) 36 rv |= POLLPRI; 37 if (event & APR_POLLOUT) 38 rv |= POLLOUT; 39 /* POLLERR, POLLHUP, and POLLNVAL aren't valid as requested events */ 40 41 return rv; 42} 43 44static apr_int16_t get_revent(apr_int16_t event) 45{ 46 apr_int16_t rv = 0; 47 48 if (event & POLLIN) 49 rv |= APR_POLLIN; 50 if (event & POLLPRI) 51 rv |= APR_POLLPRI; 52 if (event & POLLOUT) 53 rv |= APR_POLLOUT; 54 if (event & POLLERR) 55 rv |= APR_POLLERR; 56 if (event & POLLHUP) 57 rv |= APR_POLLHUP; 58 if (event & POLLNVAL) 59 rv |= APR_POLLNVAL; 60 61 return rv; 62} 63 64 65struct apr_pollset_private_t 66{ 67 int port_fd; 68 port_event_t *port_set; 69 apr_pollfd_t *result_set; 70#if APR_HAS_THREADS 71 /* A thread mutex to protect operations on the rings */ 72 apr_thread_mutex_t *ring_lock; 73#endif 74 /* A ring containing all of the pollfd_t that are active */ 75 APR_RING_HEAD(pfd_query_ring_t, pfd_elem_t) query_ring; 76 /* A ring containing the pollfd_t that will be added on the 77 * next call to apr_pollset_poll(). 78 */ 79 APR_RING_HEAD(pfd_add_ring_t, pfd_elem_t) add_ring; 80 /* A ring of pollfd_t that have been used, and then _remove'd */ 81 APR_RING_HEAD(pfd_free_ring_t, pfd_elem_t) free_ring; 82 /* A ring of pollfd_t where rings that have been _remove'd but 83 might still be inside a _poll */ 84 APR_RING_HEAD(pfd_dead_ring_t, pfd_elem_t) dead_ring; 85 /* number of threads in poll */ 86 volatile apr_uint32_t waiting; 87}; 88 89static apr_status_t call_port_getn(int port, port_event_t list[], 90 unsigned int max, unsigned int *nget, 91 apr_interval_time_t timeout) 92{ 93 struct timespec tv, *tvptr; 94 int ret; 95 apr_status_t rv = APR_SUCCESS; 96 97 if (timeout < 0) { 98 tvptr = NULL; 99 } 100 else { 101 tv.tv_sec = (long) apr_time_sec(timeout); 102 tv.tv_nsec = (long) apr_time_usec(timeout) * 1000; 103 tvptr = &tv; 104 } 105 106 list[0].portev_user = (void *)-1; /* so we can double check that an 107 * event was returned 108 */ 109 110 ret = port_getn(port, list, max, nget, tvptr); 111 /* Note: 32-bit port_getn() on Solaris 10 x86 returns large negative 112 * values instead of 0 when returning immediately. 113 */ 114 115 if (ret == -1) { 116 rv = apr_get_netos_error(); 117 118 switch(rv) { 119 case EINTR: 120 case ETIME: 121 if (*nget > 0 && list[0].portev_user != (void *)-1) { 122 /* This confusing API can return an event at the same time 123 * that it reports EINTR or ETIME. If that occurs, just 124 * report the event. With EINTR, nget can be > 0 without 125 * any event, so check that portev_user was filled in. 126 * 127 * (Maybe it will be simplified; see thread 128 * http://mail.opensolaris.org 129 * /pipermail/networking-discuss/2009-August/011979.html 130 * This code will still work afterwards.) 131 */ 132 rv = APR_SUCCESS; 133 break; 134 } 135 if (rv == ETIME) { 136 rv = APR_TIMEUP; 137 } 138 /* fall-through */ 139 default: 140 *nget = 0; 141 } 142 } 143 else if (*nget == 0) { 144 rv = APR_TIMEUP; 145 } 146 147 return rv; 148} 149 150static apr_status_t impl_pollset_cleanup(apr_pollset_t *pollset) 151{ 152 close(pollset->p->port_fd); 153 return APR_SUCCESS; 154} 155 156static apr_status_t impl_pollset_create(apr_pollset_t *pollset, 157 apr_uint32_t size, 158 apr_pool_t *p, 159 apr_uint32_t flags) 160{ 161 apr_status_t rv = APR_SUCCESS; 162 pollset->p = apr_palloc(p, sizeof(apr_pollset_private_t)); 163#if APR_HAS_THREADS 164 if (flags & APR_POLLSET_THREADSAFE && 165 ((rv = apr_thread_mutex_create(&pollset->p->ring_lock, 166 APR_THREAD_MUTEX_DEFAULT, 167 p)) != APR_SUCCESS)) { 168 pollset->p = NULL; 169 return rv; 170 } 171#else 172 if (flags & APR_POLLSET_THREADSAFE) { 173 pollset->p = NULL; 174 return APR_ENOTIMPL; 175 } 176#endif 177 pollset->p->waiting = 0; 178 179 pollset->p->port_set = apr_palloc(p, size * sizeof(port_event_t)); 180 181 pollset->p->port_fd = port_create(); 182 183 if (pollset->p->port_fd < 0) { 184 pollset->p = NULL; 185 return apr_get_netos_error(); 186 } 187 188 { 189 int flags; 190 191 if ((flags = fcntl(pollset->p->port_fd, F_GETFD)) == -1) 192 return errno; 193 194 flags |= FD_CLOEXEC; 195 if (fcntl(pollset->p->port_fd, F_SETFD, flags) == -1) 196 return errno; 197 } 198 199 pollset->p->result_set = apr_palloc(p, size * sizeof(apr_pollfd_t)); 200 201 APR_RING_INIT(&pollset->p->query_ring, pfd_elem_t, link); 202 APR_RING_INIT(&pollset->p->add_ring, pfd_elem_t, link); 203 APR_RING_INIT(&pollset->p->free_ring, pfd_elem_t, link); 204 APR_RING_INIT(&pollset->p->dead_ring, pfd_elem_t, link); 205 206 return rv; 207} 208 209static apr_status_t impl_pollset_add(apr_pollset_t *pollset, 210 const apr_pollfd_t *descriptor) 211{ 212 apr_os_sock_t fd; 213 pfd_elem_t *elem; 214 int res; 215 apr_status_t rv = APR_SUCCESS; 216 217 pollset_lock_rings(); 218 219 if (!APR_RING_EMPTY(&(pollset->p->free_ring), pfd_elem_t, link)) { 220 elem = APR_RING_FIRST(&(pollset->p->free_ring)); 221 APR_RING_REMOVE(elem, link); 222 } 223 else { 224 elem = (pfd_elem_t *) apr_palloc(pollset->pool, sizeof(pfd_elem_t)); 225 APR_RING_ELEM_INIT(elem, link); 226 elem->on_query_ring = 0; 227 } 228 elem->pfd = *descriptor; 229 230 if (descriptor->desc_type == APR_POLL_SOCKET) { 231 fd = descriptor->desc.s->socketdes; 232 } 233 else { 234 fd = descriptor->desc.f->filedes; 235 } 236 237 /* If another thread is polling, notify the kernel immediately; otherwise, 238 * wait until the next call to apr_pollset_poll(). 239 */ 240 if (apr_atomic_read32(&pollset->p->waiting)) { 241 res = port_associate(pollset->p->port_fd, PORT_SOURCE_FD, fd, 242 get_event(descriptor->reqevents), (void *)elem); 243 244 if (res < 0) { 245 rv = apr_get_netos_error(); 246 APR_RING_INSERT_TAIL(&(pollset->p->free_ring), elem, pfd_elem_t, link); 247 } 248 else { 249 elem->on_query_ring = 1; 250 APR_RING_INSERT_TAIL(&(pollset->p->query_ring), elem, pfd_elem_t, link); 251 } 252 } 253 else { 254 APR_RING_INSERT_TAIL(&(pollset->p->add_ring), elem, pfd_elem_t, link); 255 } 256 257 pollset_unlock_rings(); 258 259 return rv; 260} 261 262static apr_status_t impl_pollset_remove(apr_pollset_t *pollset, 263 const apr_pollfd_t *descriptor) 264{ 265 apr_os_sock_t fd; 266 pfd_elem_t *ep; 267 apr_status_t rv = APR_SUCCESS; 268 int res; 269 int err = 0; 270 int found; 271 272 pollset_lock_rings(); 273 274 if (descriptor->desc_type == APR_POLL_SOCKET) { 275 fd = descriptor->desc.s->socketdes; 276 } 277 else { 278 fd = descriptor->desc.f->filedes; 279 } 280 281 /* Search the add ring first. This ring is often shorter, 282 * and it often contains the descriptor being removed. 283 * (For the common scenario where apr_pollset_poll() 284 * returns activity for the descriptor and the descriptor 285 * is then removed from the pollset, it will have just 286 * been moved to the add ring by apr_pollset_poll().) 287 * 288 * If it is on the add ring, it isn't associated with the 289 * event port yet/anymore. 290 */ 291 found = 0; 292 for (ep = APR_RING_FIRST(&(pollset->p->add_ring)); 293 ep != APR_RING_SENTINEL(&(pollset->p->add_ring), 294 pfd_elem_t, link); 295 ep = APR_RING_NEXT(ep, link)) { 296 297 if (descriptor->desc.s == ep->pfd.desc.s) { 298 found = 1; 299 APR_RING_REMOVE(ep, link); 300 APR_RING_INSERT_TAIL(&(pollset->p->free_ring), 301 ep, pfd_elem_t, link); 302 break; 303 } 304 } 305 306 if (!found) { 307 res = port_dissociate(pollset->p->port_fd, PORT_SOURCE_FD, fd); 308 309 if (res < 0) { 310 /* The expected case for this failure is that another 311 * thread's call to port_getn() returned this fd and 312 * disassociated the fd from the event port, and 313 * impl_pollset_poll() is blocked on the ring lock, 314 * which this thread holds. 315 */ 316 err = errno; 317 rv = APR_NOTFOUND; 318 } 319 320 for (ep = APR_RING_FIRST(&(pollset->p->query_ring)); 321 ep != APR_RING_SENTINEL(&(pollset->p->query_ring), 322 pfd_elem_t, link); 323 ep = APR_RING_NEXT(ep, link)) { 324 325 if (descriptor->desc.s == ep->pfd.desc.s) { 326 APR_RING_REMOVE(ep, link); 327 ep->on_query_ring = 0; 328 APR_RING_INSERT_TAIL(&(pollset->p->dead_ring), 329 ep, pfd_elem_t, link); 330 if (ENOENT == err) { 331 rv = APR_SUCCESS; 332 } 333 break; 334 } 335 } 336 } 337 338 pollset_unlock_rings(); 339 340 return rv; 341} 342 343static apr_status_t impl_pollset_poll(apr_pollset_t *pollset, 344 apr_interval_time_t timeout, 345 apr_int32_t *num, 346 const apr_pollfd_t **descriptors) 347{ 348 apr_os_sock_t fd; 349 int ret, i, j; 350 unsigned int nget; 351 pfd_elem_t *ep; 352 apr_status_t rv = APR_SUCCESS; 353 apr_pollfd_t fp; 354 355 nget = 1; 356 357 pollset_lock_rings(); 358 359 apr_atomic_inc32(&pollset->p->waiting); 360 361 while (!APR_RING_EMPTY(&(pollset->p->add_ring), pfd_elem_t, link)) { 362 ep = APR_RING_FIRST(&(pollset->p->add_ring)); 363 APR_RING_REMOVE(ep, link); 364 365 if (ep->pfd.desc_type == APR_POLL_SOCKET) { 366 fd = ep->pfd.desc.s->socketdes; 367 } 368 else { 369 fd = ep->pfd.desc.f->filedes; 370 } 371 372 ret = port_associate(pollset->p->port_fd, PORT_SOURCE_FD, 373 fd, get_event(ep->pfd.reqevents), ep); 374 if (ret < 0) { 375 rv = apr_get_netos_error(); 376 APR_RING_INSERT_TAIL(&(pollset->p->free_ring), ep, pfd_elem_t, link); 377 break; 378 } 379 380 ep->on_query_ring = 1; 381 APR_RING_INSERT_TAIL(&(pollset->p->query_ring), ep, pfd_elem_t, link); 382 } 383 384 pollset_unlock_rings(); 385 386 if (rv != APR_SUCCESS) { 387 apr_atomic_dec32(&pollset->p->waiting); 388 return rv; 389 } 390 391 rv = call_port_getn(pollset->p->port_fd, pollset->p->port_set, 392 pollset->nalloc, &nget, timeout); 393 394 /* decrease the waiting ASAP to reduce the window for calling 395 port_associate within apr_pollset_add() */ 396 apr_atomic_dec32(&pollset->p->waiting); 397 398 (*num) = nget; 399 if (nget) { 400 401 pollset_lock_rings(); 402 403 for (i = 0, j = 0; i < nget; i++) { 404 fp = (((pfd_elem_t*)(pollset->p->port_set[i].portev_user))->pfd); 405 if ((pollset->flags & APR_POLLSET_WAKEABLE) && 406 fp.desc_type == APR_POLL_FILE && 407 fp.desc.f == pollset->wakeup_pipe[0]) { 408 apr_pollset_drain_wakeup_pipe(pollset); 409 rv = APR_EINTR; 410 } 411 else { 412 pollset->p->result_set[j] = fp; 413 pollset->p->result_set[j].rtnevents = 414 get_revent(pollset->p->port_set[i].portev_events); 415 416 /* If the ring element is still on the query ring, move it 417 * to the add ring for re-association with the event port 418 * later. (It may have already been moved to the dead ring 419 * by a call to pollset_remove on another thread.) 420 */ 421 ep = (pfd_elem_t *)pollset->p->port_set[i].portev_user; 422 if (ep->on_query_ring) { 423 APR_RING_REMOVE(ep, link); 424 ep->on_query_ring = 0; 425 APR_RING_INSERT_TAIL(&(pollset->p->add_ring), ep, 426 pfd_elem_t, link); 427 } 428 j++; 429 } 430 } 431 pollset_unlock_rings(); 432 if ((*num = j)) { /* any event besides wakeup pipe? */ 433 rv = APR_SUCCESS; 434 if (descriptors) { 435 *descriptors = pollset->p->result_set; 436 } 437 } 438 } 439 440 pollset_lock_rings(); 441 442 /* Shift all PFDs in the Dead Ring to the Free Ring */ 443 APR_RING_CONCAT(&(pollset->p->free_ring), &(pollset->p->dead_ring), pfd_elem_t, link); 444 445 pollset_unlock_rings(); 446 447 return rv; 448} 449 450static apr_pollset_provider_t impl = { 451 impl_pollset_create, 452 impl_pollset_add, 453 impl_pollset_remove, 454 impl_pollset_poll, 455 impl_pollset_cleanup, 456 "port" 457}; 458 459apr_pollset_provider_t *apr_pollset_provider_port = &impl; 460 461static apr_status_t cb_cleanup(void *p_) 462{ 463 apr_pollcb_t *pollcb = (apr_pollcb_t *) p_; 464 close(pollcb->fd); 465 return APR_SUCCESS; 466} 467 468static apr_status_t impl_pollcb_create(apr_pollcb_t *pollcb, 469 apr_uint32_t size, 470 apr_pool_t *p, 471 apr_uint32_t flags) 472{ 473 pollcb->fd = port_create(); 474 475 if (pollcb->fd < 0) { 476 return apr_get_netos_error(); 477 } 478 479 { 480 int flags; 481 482 if ((flags = fcntl(pollcb->fd, F_GETFD)) == -1) 483 return errno; 484 485 flags |= FD_CLOEXEC; 486 if (fcntl(pollcb->fd, F_SETFD, flags) == -1) 487 return errno; 488 } 489 490 pollcb->pollset.port = apr_palloc(p, size * sizeof(port_event_t)); 491 apr_pool_cleanup_register(p, pollcb, cb_cleanup, apr_pool_cleanup_null); 492 493 return APR_SUCCESS; 494} 495 496static apr_status_t impl_pollcb_add(apr_pollcb_t *pollcb, 497 apr_pollfd_t *descriptor) 498{ 499 int ret, fd; 500 501 if (descriptor->desc_type == APR_POLL_SOCKET) { 502 fd = descriptor->desc.s->socketdes; 503 } 504 else { 505 fd = descriptor->desc.f->filedes; 506 } 507 508 ret = port_associate(pollcb->fd, PORT_SOURCE_FD, fd, 509 get_event(descriptor->reqevents), descriptor); 510 511 if (ret == -1) { 512 return apr_get_netos_error(); 513 } 514 515 return APR_SUCCESS; 516} 517 518static apr_status_t impl_pollcb_remove(apr_pollcb_t *pollcb, 519 apr_pollfd_t *descriptor) 520{ 521 int fd, ret; 522 523 if (descriptor->desc_type == APR_POLL_SOCKET) { 524 fd = descriptor->desc.s->socketdes; 525 } 526 else { 527 fd = descriptor->desc.f->filedes; 528 } 529 530 ret = port_dissociate(pollcb->fd, PORT_SOURCE_FD, fd); 531 532 if (ret < 0) { 533 return APR_NOTFOUND; 534 } 535 536 return APR_SUCCESS; 537} 538 539static apr_status_t impl_pollcb_poll(apr_pollcb_t *pollcb, 540 apr_interval_time_t timeout, 541 apr_pollcb_cb_t func, 542 void *baton) 543{ 544 apr_pollfd_t *pollfd; 545 apr_status_t rv; 546 unsigned int i, nget = 1; 547 548 rv = call_port_getn(pollcb->fd, pollcb->pollset.port, pollcb->nalloc, 549 &nget, timeout); 550 551 if (nget) { 552 for (i = 0; i < nget; i++) { 553 pollfd = (apr_pollfd_t *)(pollcb->pollset.port[i].portev_user); 554 pollfd->rtnevents = get_revent(pollcb->pollset.port[i].portev_events); 555 556 rv = func(baton, pollfd); 557 if (rv) { 558 return rv; 559 } 560 rv = apr_pollcb_add(pollcb, pollfd); 561 } 562 } 563 564 return rv; 565} 566 567static apr_pollcb_provider_t impl_cb = { 568 impl_pollcb_create, 569 impl_pollcb_add, 570 impl_pollcb_remove, 571 impl_pollcb_poll, 572 "port" 573}; 574 575apr_pollcb_provider_t *apr_pollcb_provider_port = &impl_cb; 576 577#endif /* HAVE_PORT_CREATE */ 578