/* outgoing.c revision 251886 */
1/* Copyright 2002-2004 Justin Erenkrantz and Greg Stein 2 * 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16#include <apr_pools.h> 17#include <apr_poll.h> 18#include <apr_version.h> 19 20#include "serf.h" 21#include "serf_bucket_util.h" 22 23#include "serf_private.h" 24 25/* cleanup for sockets */ 26static apr_status_t clean_skt(void *data) 27{ 28 serf_connection_t *conn = data; 29 apr_status_t status = APR_SUCCESS; 30 31 if (conn->skt) { 32 serf__log_skt(SOCK_VERBOSE, __FILE__, conn->skt, "cleanup - "); 33 status = apr_socket_close(conn->skt); 34 conn->skt = NULL; 35 serf__log_nopref(SOCK_VERBOSE, "closed socket, status %d\n", status); 36 } 37 38 return status; 39} 40 41static apr_status_t clean_resp(void *data) 42{ 43 serf_request_t *request = data; 44 45 /* The request's RESPOOL is being cleared. */ 46 47 /* If the response has allocated some buckets, then destroy them (since 48 the bucket may hold resources other than memory in RESPOOL). Also 49 make sure to set their fields to NULL so connection closure does 50 not attempt to free them again. */ 51 if (request->resp_bkt) { 52 serf_bucket_destroy(request->resp_bkt); 53 request->resp_bkt = NULL; 54 } 55 if (request->req_bkt) { 56 serf_bucket_destroy(request->req_bkt); 57 request->req_bkt = NULL; 58 } 59 60 /* ### should we worry about debug stuff, like that performed in 61 ### destroy_request()? 
should we worry about calling req->handler 62 ### to notify this "cancellation" due to pool clearing? */ 63 64 /* This pool just got cleared/destroyed. Don't try to destroy the pool 65 (again) when the request is canceled. */ 66 request->respool = NULL; 67 68 return APR_SUCCESS; 69} 70 71/* cleanup for conns */ 72static apr_status_t clean_conn(void *data) 73{ 74 serf_connection_t *conn = data; 75 76 serf__log(CONN_VERBOSE, __FILE__, "cleaning up connection 0x%x\n", 77 conn); 78 serf_connection_close(conn); 79 80 return APR_SUCCESS; 81} 82 83/* Update the pollset for this connection. We tweak the pollset based on 84 * whether we want to read and/or write, given conditions within the 85 * connection. If the connection is not (yet) in the pollset, then it 86 * will be added. 87 */ 88apr_status_t serf__conn_update_pollset(serf_connection_t *conn) 89{ 90 serf_context_t *ctx = conn->ctx; 91 apr_status_t status; 92 apr_pollfd_t desc = { 0 }; 93 94 if (!conn->skt) { 95 return APR_SUCCESS; 96 } 97 98 /* Remove the socket from the poll set. */ 99 desc.desc_type = APR_POLL_SOCKET; 100 desc.desc.s = conn->skt; 101 desc.reqevents = conn->reqevents; 102 103 status = ctx->pollset_rm(ctx->pollset_baton, 104 &desc, conn); 105 if (status && !APR_STATUS_IS_NOTFOUND(status)) 106 return status; 107 108 /* Now put it back in with the correct read/write values. */ 109 desc.reqevents = APR_POLLHUP | APR_POLLERR; 110 if (conn->requests && 111 conn->state != SERF_CONN_INIT) { 112 /* If there are any outstanding events, then we want to read. */ 113 /* ### not true. we only want to read IF we have sent some data */ 114 desc.reqevents |= APR_POLLIN; 115 116 /* If the connection is not closing down and 117 * has unwritten data or 118 * there are any requests that still have buckets to write out, 119 * then we want to write. 
120 */ 121 if (conn->vec_len && 122 conn->state != SERF_CONN_CLOSING) 123 desc.reqevents |= APR_POLLOUT; 124 else { 125 serf_request_t *request = conn->requests; 126 127 if ((conn->probable_keepalive_limit && 128 conn->completed_requests > conn->probable_keepalive_limit) || 129 (conn->max_outstanding_requests && 130 conn->completed_requests - conn->completed_responses >= 131 conn->max_outstanding_requests)) { 132 /* we wouldn't try to write any way right now. */ 133 } 134 else { 135 while (request != NULL && request->req_bkt == NULL && 136 request->written) 137 request = request->next; 138 if (request != NULL) 139 desc.reqevents |= APR_POLLOUT; 140 } 141 } 142 } 143 144 /* If we can have async responses, always look for something to read. */ 145 if (conn->async_responses) { 146 desc.reqevents |= APR_POLLIN; 147 } 148 149 /* save our reqevents, so we can pass it in to remove later. */ 150 conn->reqevents = desc.reqevents; 151 152 /* Note: even if we don't want to read/write this socket, we still 153 * want to poll it for hangups and errors. 154 */ 155 return ctx->pollset_add(ctx->pollset_baton, 156 &desc, &conn->baton); 157} 158 159#ifdef SERF_DEBUG_BUCKET_USE 160 161/* Make sure all response buckets were drained. */ 162static void check_buckets_drained(serf_connection_t *conn) 163{ 164 serf_request_t *request = conn->requests; 165 166 for ( ; request ; request = request->next ) { 167 if (request->resp_bkt != NULL) { 168 /* ### crap. can't do this. this allocator may have un-drained 169 * ### REQUEST buckets. 170 */ 171 /* serf_debug__entered_loop(request->resp_bkt->allocator); */ 172 /* ### for now, pretend we closed the conn (resets the tracking) */ 173 serf_debug__closed_conn(request->resp_bkt->allocator); 174 } 175 } 176} 177 178#endif 179 180/* Create and connect sockets for any connections which don't have them 181 * yet. This is the core of our lazy-connect behavior. 
182 */ 183apr_status_t serf__open_connections(serf_context_t *ctx) 184{ 185 int i; 186 187 for (i = ctx->conns->nelts; i--; ) { 188 serf_connection_t *conn = GET_CONN(ctx, i); 189 apr_status_t status; 190 apr_socket_t *skt; 191 192 conn->seen_in_pollset = 0; 193 194 if (conn->skt != NULL) { 195#ifdef SERF_DEBUG_BUCKET_USE 196 check_buckets_drained(conn); 197#endif 198 continue; 199 } 200 201 /* Delay opening until we have something to deliver! */ 202 if (conn->requests == NULL) { 203 continue; 204 } 205 206 apr_pool_clear(conn->skt_pool); 207 apr_pool_cleanup_register(conn->skt_pool, conn, clean_skt, clean_skt); 208 209 status = apr_socket_create(&skt, conn->address->family, 210 SOCK_STREAM, 211#if APR_MAJOR_VERSION > 0 212 APR_PROTO_TCP, 213#endif 214 conn->skt_pool); 215 serf__log(SOCK_VERBOSE, __FILE__, 216 "created socket for conn 0x%x, status %d\n", conn, status); 217 if (status != APR_SUCCESS) 218 return status; 219 220 /* Set the socket to be non-blocking */ 221 if ((status = apr_socket_timeout_set(skt, 0)) != APR_SUCCESS) 222 return status; 223 224 /* Disable Nagle's algorithm */ 225 if ((status = apr_socket_opt_set(skt, 226 APR_TCP_NODELAY, 1)) != APR_SUCCESS) 227 return status; 228 229 /* Configured. Store it into the connection now. */ 230 conn->skt = skt; 231 232 /* Remember time when we started connecting to server to calculate 233 network latency. */ 234 conn->connect_time = apr_time_now(); 235 236 /* Now that the socket is set up, let's connect it. This should 237 * return immediately. 238 */ 239 status = apr_socket_connect(skt, conn->address); 240 serf__log_skt(SOCK_VERBOSE, __FILE__, skt, 241 "connected socket for conn 0x%x, status %d\n", 242 conn, status); 243 if (status != APR_SUCCESS) { 244 if (!APR_STATUS_IS_EINPROGRESS(status)) 245 return status; 246 } 247 248 /* Flag our pollset as dirty now that we have a new socket. 
*/ 249 conn->dirty_conn = 1; 250 ctx->dirty_pollset = 1; 251 252 /* If the authentication was already started on another connection, 253 prepare this connection (it might be possible to skip some 254 part of the handshaking). */ 255 if (ctx->proxy_address) { 256 if (conn->ctx->proxy_authn_info.scheme) 257 conn->ctx->proxy_authn_info.scheme->init_conn_func(407, conn, 258 conn->pool); 259 } 260 261 if (conn->ctx->authn_info.scheme) 262 conn->ctx->authn_info.scheme->init_conn_func(401, conn, 263 conn->pool); 264 265 /* Does this connection require a SSL tunnel over the proxy? */ 266 if (ctx->proxy_address && strcmp(conn->host_info.scheme, "https") == 0) 267 serf__ssltunnel_connect(conn); 268 else 269 conn->state = SERF_CONN_CONNECTED; 270 271 } 272 273 return APR_SUCCESS; 274} 275 276static apr_status_t no_more_writes(serf_connection_t *conn, 277 serf_request_t *request) 278{ 279 /* Note that we should hold new requests until we open our new socket. */ 280 conn->state = SERF_CONN_CLOSING; 281 serf__log(CONN_VERBOSE, __FILE__, "stop writing on conn 0x%x\n", 282 conn); 283 284 /* Clear our iovec. */ 285 conn->vec_len = 0; 286 287 /* Update the pollset to know we don't want to write on this socket any 288 * more. 289 */ 290 conn->dirty_conn = 1; 291 conn->ctx->dirty_pollset = 1; 292 return APR_SUCCESS; 293} 294 295/* Read the 'Connection' header from the response. Return SERF_ERROR_CLOSING if 296 * the header contains value 'close' indicating the server is closing the 297 * connection right after this response. 298 * Otherwise returns APR_SUCCESS. 
299 */ 300static apr_status_t is_conn_closing(serf_bucket_t *response) 301{ 302 serf_bucket_t *hdrs; 303 const char *val; 304 305 hdrs = serf_bucket_response_get_headers(response); 306 val = serf_bucket_headers_get(hdrs, "Connection"); 307 if (val && strcasecmp("close", val) == 0) 308 { 309 return SERF_ERROR_CLOSING; 310 } 311 312 return APR_SUCCESS; 313} 314 315static void link_requests(serf_request_t **list, serf_request_t **tail, 316 serf_request_t *request) 317{ 318 if (*list == NULL) { 319 *list = request; 320 *tail = request; 321 } 322 else { 323 (*tail)->next = request; 324 *tail = request; 325 } 326} 327 328static apr_status_t destroy_request(serf_request_t *request) 329{ 330 serf_connection_t *conn = request->conn; 331 332 /* The request and response buckets are no longer needed, 333 nor is the request's pool. */ 334 if (request->resp_bkt) { 335 serf_debug__closed_conn(request->resp_bkt->allocator); 336 serf_bucket_destroy(request->resp_bkt); 337 request->resp_bkt = NULL; 338 } 339 if (request->req_bkt) { 340 serf_debug__closed_conn(request->req_bkt->allocator); 341 serf_bucket_destroy(request->req_bkt); 342 request->req_bkt = NULL; 343 } 344 345 serf_debug__bucket_alloc_check(request->allocator); 346 if (request->respool) { 347 /* ### unregister the pool cleanup for self? */ 348 apr_pool_destroy(request->respool); 349 } 350 351 serf_bucket_mem_free(conn->allocator, request); 352 353 return APR_SUCCESS; 354} 355 356static apr_status_t cancel_request(serf_request_t *request, 357 serf_request_t **list, 358 int notify_request) 359{ 360 /* If we haven't run setup, then we won't have a handler to call. */ 361 if (request->handler && notify_request) { 362 /* We actually don't care what the handler returns. 363 * We have bigger matters at hand. 
364 */ 365 (*request->handler)(request, NULL, request->handler_baton, 366 request->respool); 367 } 368 369 if (*list == request) { 370 *list = request->next; 371 } 372 else { 373 serf_request_t *scan = *list; 374 375 while (scan->next && scan->next != request) 376 scan = scan->next; 377 378 if (scan->next) { 379 scan->next = scan->next->next; 380 } 381 } 382 383 return destroy_request(request); 384} 385 386static apr_status_t remove_connection(serf_context_t *ctx, 387 serf_connection_t *conn) 388{ 389 apr_pollfd_t desc = { 0 }; 390 391 desc.desc_type = APR_POLL_SOCKET; 392 desc.desc.s = conn->skt; 393 desc.reqevents = conn->reqevents; 394 395 return ctx->pollset_rm(ctx->pollset_baton, 396 &desc, conn); 397} 398 399static void destroy_ostream(serf_connection_t *conn) 400{ 401 if (conn->ostream_head != NULL) { 402 serf_bucket_destroy(conn->ostream_head); 403 conn->ostream_head = NULL; 404 conn->ostream_tail = NULL; 405 } 406} 407 408/* A socket was closed, inform the application. */ 409static void handle_conn_closed(serf_connection_t *conn, apr_status_t status) 410{ 411 (*conn->closed)(conn, conn->closed_baton, status, 412 conn->pool); 413} 414 415static apr_status_t reset_connection(serf_connection_t *conn, 416 int requeue_requests) 417{ 418 serf_context_t *ctx = conn->ctx; 419 apr_status_t status; 420 serf_request_t *old_reqs; 421 422 conn->probable_keepalive_limit = conn->completed_responses; 423 conn->completed_requests = 0; 424 conn->completed_responses = 0; 425 426 old_reqs = conn->requests; 427 428 conn->requests = NULL; 429 conn->requests_tail = NULL; 430 431 /* Handle all outstanding requests. These have either not been written yet, 432 or have been written but the expected reply wasn't received yet. */ 433 while (old_reqs) { 434 /* If we haven't started to write the connection, bring it over 435 * unchanged to our new socket. 
436 */ 437 if (requeue_requests && !old_reqs->written) { 438 serf_request_t *req = old_reqs; 439 old_reqs = old_reqs->next; 440 req->next = NULL; 441 link_requests(&conn->requests, &conn->requests_tail, req); 442 } 443 else { 444 /* Request has been consumed, or we don't want to requeue the 445 request. Either way, inform the application that the request 446 is cancelled. */ 447 cancel_request(old_reqs, &old_reqs, requeue_requests); 448 } 449 } 450 451 /* Requests queue has been prepared for a new socket, close the old one. */ 452 if (conn->skt != NULL) { 453 remove_connection(ctx, conn); 454 status = apr_socket_close(conn->skt); 455 serf__log_skt(SOCK_VERBOSE, __FILE__, conn->skt, 456 "closed socket, status %d\n", status); 457 if (conn->closed != NULL) { 458 handle_conn_closed(conn, status); 459 } 460 conn->skt = NULL; 461 } 462 463 if (conn->stream != NULL) { 464 serf_bucket_destroy(conn->stream); 465 conn->stream = NULL; 466 } 467 468 destroy_ostream(conn); 469 470 /* Don't try to resume any writes */ 471 conn->vec_len = 0; 472 473 conn->dirty_conn = 1; 474 conn->ctx->dirty_pollset = 1; 475 conn->state = SERF_CONN_INIT; 476 477 serf__log(CONN_VERBOSE, __FILE__, "reset connection 0x%x\n", conn); 478 479 conn->status = APR_SUCCESS; 480 481 /* Let our context know that we've 'reset' the socket already. */ 482 conn->seen_in_pollset |= APR_POLLHUP; 483 484 /* Found the connection. Closed it. All done. */ 485 return APR_SUCCESS; 486} 487 488static apr_status_t socket_writev(serf_connection_t *conn) 489{ 490 apr_size_t written; 491 apr_status_t status; 492 493 status = apr_socket_sendv(conn->skt, conn->vec, 494 conn->vec_len, &written); 495 if (status && !APR_STATUS_IS_EAGAIN(status)) 496 serf__log_skt(SOCK_VERBOSE, __FILE__, conn->skt, 497 "socket_sendv error %d\n", status); 498 499 /* did we write everything? 
*/ 500 if (written) { 501 apr_size_t len = 0; 502 int i; 503 504 serf__log_skt(SOCK_MSG_VERBOSE, __FILE__, conn->skt, 505 "--- socket_sendv:\n"); 506 507 for (i = 0; i < conn->vec_len; i++) { 508 len += conn->vec[i].iov_len; 509 if (written < len) { 510 serf__log_nopref(SOCK_MSG_VERBOSE, "%.*s", 511 conn->vec[i].iov_len - (len - written), 512 conn->vec[i].iov_base); 513 if (i) { 514 memmove(conn->vec, &conn->vec[i], 515 sizeof(struct iovec) * (conn->vec_len - i)); 516 conn->vec_len -= i; 517 } 518 conn->vec[0].iov_base = (char *)conn->vec[0].iov_base + (conn->vec[0].iov_len - (len - written)); 519 conn->vec[0].iov_len = len - written; 520 break; 521 } else { 522 serf__log_nopref(SOCK_MSG_VERBOSE, "%.*s", 523 conn->vec[i].iov_len, conn->vec[i].iov_base); 524 } 525 } 526 if (len == written) { 527 conn->vec_len = 0; 528 } 529 serf__log_nopref(SOCK_MSG_VERBOSE, "-(%d)-\n", written); 530 531 /* Log progress information */ 532 serf__context_progress_delta(conn->ctx, 0, written); 533 } 534 535 return status; 536} 537 538static apr_status_t detect_eof(void *baton, serf_bucket_t *aggregate_bucket) 539{ 540 serf_connection_t *conn = baton; 541 conn->hit_eof = 1; 542 return APR_EAGAIN; 543} 544 545static apr_status_t do_conn_setup(serf_connection_t *conn) 546{ 547 apr_status_t status; 548 serf_bucket_t *ostream; 549 550 if (conn->ostream_head == NULL) { 551 conn->ostream_head = serf_bucket_aggregate_create(conn->allocator); 552 } 553 554 if (conn->ostream_tail == NULL) { 555 conn->ostream_tail = serf__bucket_stream_create(conn->allocator, 556 detect_eof, 557 conn); 558 } 559 560 ostream = conn->ostream_tail; 561 562 status = (*conn->setup)(conn->skt, 563 &conn->stream, 564 &ostream, 565 conn->setup_baton, 566 conn->pool); 567 if (status) { 568 /* extra destroy here since it wasn't added to the head bucket yet. 
*/ 569 serf_bucket_destroy(conn->ostream_tail); 570 destroy_ostream(conn); 571 return status; 572 } 573 574 serf_bucket_aggregate_append(conn->ostream_head, 575 ostream); 576 577 return status; 578} 579 580/* Set up the input and output stream buckets. 581 When a tunnel over an http proxy is needed, create a socket bucket and 582 empty aggregate bucket for sending and receiving unencrypted requests 583 over the socket. 584 585 After the tunnel is there, or no tunnel was needed, ask the application 586 to create the input and output buckets, which should take care of the 587 [en/de]cryption. 588*/ 589 590static apr_status_t prepare_conn_streams(serf_connection_t *conn, 591 serf_bucket_t **istream, 592 serf_bucket_t **ostreamt, 593 serf_bucket_t **ostreamh) 594{ 595 apr_status_t status; 596 597 if (conn->stream == NULL) { 598 conn->latency = apr_time_now() - conn->connect_time; 599 } 600 601 /* Do we need a SSL tunnel first? */ 602 if (conn->state == SERF_CONN_CONNECTED) { 603 /* If the connection does not have an associated bucket, then 604 * call the setup callback to get one. 605 */ 606 if (conn->stream == NULL) { 607 status = do_conn_setup(conn); 608 if (status) { 609 return status; 610 } 611 } 612 *ostreamt = conn->ostream_tail; 613 *ostreamh = conn->ostream_head; 614 *istream = conn->stream; 615 } else { 616 /* SSL tunnel needed and not set up yet, get a direct unencrypted 617 stream for this socket */ 618 if (conn->stream == NULL) { 619 *istream = serf_bucket_socket_create(conn->skt, 620 conn->allocator); 621 } 622 /* Don't create the ostream bucket chain including the ssl_encrypt 623 bucket yet. This ensure the CONNECT request is sent unencrypted 624 to the proxy. 
*/ 625 *ostreamt = *ostreamh = conn->ssltunnel_ostream; 626 } 627 628 return APR_SUCCESS; 629} 630 631/* write data out to the connection */ 632static apr_status_t write_to_connection(serf_connection_t *conn) 633{ 634 serf_request_t *request = conn->requests; 635 636 if (conn->probable_keepalive_limit && 637 conn->completed_requests > conn->probable_keepalive_limit) { 638 639 conn->dirty_conn = 1; 640 conn->ctx->dirty_pollset = 1; 641 642 /* backoff for now. */ 643 return APR_SUCCESS; 644 } 645 646 /* Find a request that has data which needs to be delivered. */ 647 while (request != NULL && 648 request->req_bkt == NULL && request->written) 649 request = request->next; 650 651 /* assert: request != NULL || conn->vec_len */ 652 653 /* Keep reading and sending until we run out of stuff to read, or 654 * writing would block. 655 */ 656 while (1) { 657 int stop_reading = 0; 658 apr_status_t status; 659 apr_status_t read_status; 660 serf_bucket_t *ostreamt, *ostreamh; 661 int max_outstanding_requests = conn->max_outstanding_requests; 662 663 /* If we're setting up an ssl tunnel, we can't send real requests 664 at yet, as they need to be encrypted and our encrypt buckets 665 aren't created yet as we still need to read the unencrypted 666 response of the CONNECT request. */ 667 if (conn->state != SERF_CONN_CONNECTED) 668 max_outstanding_requests = 1; 669 670 if (max_outstanding_requests && 671 conn->completed_requests - 672 conn->completed_responses >= max_outstanding_requests) { 673 /* backoff for now. */ 674 return APR_SUCCESS; 675 } 676 677 /* If we have unwritten data, then write what we can. */ 678 while (conn->vec_len) { 679 status = socket_writev(conn); 680 681 /* If the write would have blocked, then we're done. Don't try 682 * to write anything else to the socket. 
683 */ 684 if (APR_STATUS_IS_EAGAIN(status)) 685 return APR_SUCCESS; 686 if (APR_STATUS_IS_EPIPE(status) || 687 APR_STATUS_IS_ECONNRESET(status) || 688 APR_STATUS_IS_ECONNABORTED(status)) 689 return no_more_writes(conn, request); 690 if (status) 691 return status; 692 } 693 /* ### can we have a short write, yet no EAGAIN? a short write 694 ### would imply unwritten_len > 0 ... */ 695 /* assert: unwritten_len == 0. */ 696 697 /* We may need to move forward to a request which has something 698 * to write. 699 */ 700 while (request != NULL && 701 request->req_bkt == NULL && request->written) 702 request = request->next; 703 704 if (request == NULL) { 705 /* No more requests (with data) are registered with the 706 * connection. Let's update the pollset so that we don't 707 * try to write to this socket again. 708 */ 709 conn->dirty_conn = 1; 710 conn->ctx->dirty_pollset = 1; 711 return APR_SUCCESS; 712 } 713 714 status = prepare_conn_streams(conn, &conn->stream, &ostreamt, &ostreamh); 715 if (status) { 716 return status; 717 } 718 719 if (request->req_bkt == NULL) { 720 /* Now that we are about to serve the request, allocate a pool. */ 721 apr_pool_create(&request->respool, conn->pool); 722 request->allocator = serf_bucket_allocator_create(request->respool, 723 NULL, NULL); 724 apr_pool_cleanup_register(request->respool, request, 725 clean_resp, clean_resp); 726 727 /* Fill in the rest of the values for the request. */ 728 read_status = request->setup(request, request->setup_baton, 729 &request->req_bkt, 730 &request->acceptor, 731 &request->acceptor_baton, 732 &request->handler, 733 &request->handler_baton, 734 request->respool); 735 736 if (read_status) { 737 /* Something bad happened. Propagate any errors. 
*/ 738 return read_status; 739 } 740 741 request->written = 1; 742 serf_bucket_aggregate_append(ostreamt, request->req_bkt); 743 } 744 745 /* ### optimize at some point by using read_for_sendfile */ 746 read_status = serf_bucket_read_iovec(ostreamh, 747 SERF_READ_ALL_AVAIL, 748 IOV_MAX, 749 conn->vec, 750 &conn->vec_len); 751 752 if (!conn->hit_eof) { 753 if (APR_STATUS_IS_EAGAIN(read_status) || 754 read_status == SERF_ERROR_WAIT_CONN) { 755 /* We read some stuff, but should not try to read again. */ 756 stop_reading = 1; 757 758 /* ### we should avoid looking for writability for a while so 759 ### that (hopefully) something will appear in the bucket so 760 ### we can actually write something. otherwise, we could 761 ### end up in a CPU spin: socket wants something, but we 762 ### don't have anything (and keep returning EAGAIN) 763 */ 764 } 765 else if (read_status && !APR_STATUS_IS_EOF(read_status)) { 766 /* Something bad happened. Propagate any errors. */ 767 return read_status; 768 } 769 } 770 771 /* If we got some data, then deliver it. */ 772 /* ### what to do if we got no data?? is that a problem? */ 773 if (conn->vec_len > 0) { 774 status = socket_writev(conn); 775 776 /* If we can't write any more, or an error occurred, then 777 * we're done here. 778 */ 779 if (APR_STATUS_IS_EAGAIN(status)) 780 return APR_SUCCESS; 781 if (APR_STATUS_IS_EPIPE(status)) 782 return no_more_writes(conn, request); 783 if (APR_STATUS_IS_ECONNRESET(status) || 784 APR_STATUS_IS_ECONNABORTED(status)) { 785 return no_more_writes(conn, request); 786 } 787 if (status) 788 return status; 789 } 790 791 if (read_status == SERF_ERROR_WAIT_CONN) { 792 stop_reading = 1; 793 } 794 else if (read_status && conn->hit_eof && conn->vec_len == 0) { 795 /* If we hit the end of the request bucket and all of its data has 796 * been written, then clear it out to signify that we're done 797 * sending the request. 
On the next iteration through this loop: 798 * - if there are remaining bytes they will be written, and as the 799 * request bucket will be completely read it will be destroyed then. 800 * - we'll see if there are other requests that need to be sent 801 * ("pipelining"). 802 */ 803 conn->hit_eof = 0; 804 serf_bucket_destroy(request->req_bkt); 805 request->req_bkt = NULL; 806 807 /* If our connection has async responses enabled, we're not 808 * going to get a reply back, so kill the request. 809 */ 810 if (conn->async_responses) { 811 conn->requests = request->next; 812 destroy_request(request); 813 } 814 815 conn->completed_requests++; 816 817 if (conn->probable_keepalive_limit && 818 conn->completed_requests > conn->probable_keepalive_limit) { 819 /* backoff for now. */ 820 stop_reading = 1; 821 } 822 } 823 824 if (stop_reading) { 825 return APR_SUCCESS; 826 } 827 } 828 /* NOTREACHED */ 829} 830 831/* A response message was received from the server, so call 832 the handler as specified on the original request. */ 833static apr_status_t handle_response(serf_request_t *request, 834 apr_pool_t *pool) 835{ 836 apr_status_t status = APR_SUCCESS; 837 int consumed_response = 0; 838 839 /* Only enable the new authentication framework if the program has 840 * registered an authentication credential callback. 841 * 842 * This permits older Serf apps to still handle authentication 843 * themselves by not registering credential callbacks. 844 */ 845 if (request->conn->ctx->cred_cb) { 846 status = serf__handle_auth_response(&consumed_response, 847 request, 848 request->resp_bkt, 849 request->handler_baton, 850 pool); 851 852 /* If there was an error reading the response (maybe there wasn't 853 enough data available), don't bother passing the response to the 854 application. 855 856 If the authentication was tried, but failed, pass the response 857 to the application, maybe it can do better. 
*/ 858 if (APR_STATUS_IS_EOF(status) || 859 APR_STATUS_IS_EAGAIN(status)) { 860 return status; 861 } 862 } 863 864 if (!consumed_response) { 865 return (*request->handler)(request, 866 request->resp_bkt, 867 request->handler_baton, 868 pool); 869 } 870 871 return status; 872} 873 874/* An async response message was received from the server. */ 875static apr_status_t handle_async_response(serf_connection_t *conn, 876 apr_pool_t *pool) 877{ 878 apr_status_t status; 879 880 if (conn->current_async_response == NULL) { 881 conn->current_async_response = 882 (*conn->async_acceptor)(NULL, conn->stream, 883 conn->async_acceptor_baton, pool); 884 } 885 886 status = (*conn->async_handler)(NULL, conn->current_async_response, 887 conn->async_handler_baton, pool); 888 889 if (APR_STATUS_IS_EOF(status)) { 890 serf_bucket_destroy(conn->current_async_response); 891 conn->current_async_response = NULL; 892 status = APR_SUCCESS; 893 } 894 895 return status; 896} 897 898/* read data from the connection */ 899static apr_status_t read_from_connection(serf_connection_t *conn) 900{ 901 apr_status_t status; 902 apr_pool_t *tmppool; 903 int close_connection = FALSE; 904 905 /* Whatever is coming in on the socket corresponds to the first request 906 * on our chain. 907 */ 908 serf_request_t *request = conn->requests; 909 910 /* assert: request != NULL */ 911 912 if ((status = apr_pool_create(&tmppool, conn->pool)) != APR_SUCCESS) 913 goto error; 914 915 /* Invoke response handlers until we have no more work. */ 916 while (1) { 917 serf_bucket_t *dummy1, *dummy2; 918 919 apr_pool_clear(tmppool); 920 921 /* Only interested in the input stream here. */ 922 status = prepare_conn_streams(conn, &conn->stream, &dummy1, &dummy2); 923 if (status) { 924 goto error; 925 } 926 927 /* We have a different codepath when we can have async responses. */ 928 if (conn->async_responses) { 929 /* TODO What about socket errors? 
*/ 930 status = handle_async_response(conn, tmppool); 931 if (APR_STATUS_IS_EAGAIN(status)) { 932 status = APR_SUCCESS; 933 goto error; 934 } 935 if (status) { 936 goto error; 937 } 938 continue; 939 } 940 941 /* We are reading a response for a request we haven't 942 * written yet! 943 * 944 * This shouldn't normally happen EXCEPT: 945 * 946 * 1) when the other end has closed the socket and we're 947 * pending an EOF return. 948 * 2) Doing the initial SSL handshake - we'll get EAGAIN 949 * as the SSL buckets will hide the handshake from us 950 * but not return any data. 951 * 3) When the server sends us an SSL alert. 952 * 953 * In these cases, we should not receive any actual user data. 954 * 955 * 4) When the server sends a error response, like 408 Request timeout. 956 * This response should be passed to the application. 957 * 958 * If we see an EOF (due to either an expired timeout or the server 959 * sending the SSL 'close notify' shutdown alert), we'll reset the 960 * connection and open a new one. 961 */ 962 if (request->req_bkt || !request->written) { 963 const char *data; 964 apr_size_t len; 965 966 status = serf_bucket_peek(conn->stream, &data, &len); 967 968 if (APR_STATUS_IS_EOF(status)) { 969 reset_connection(conn, 1); 970 status = APR_SUCCESS; 971 goto error; 972 } 973 else if (APR_STATUS_IS_EAGAIN(status) && !len) { 974 status = APR_SUCCESS; 975 goto error; 976 } else if (status && !APR_STATUS_IS_EAGAIN(status)) { 977 /* Read error */ 978 goto error; 979 } 980 981 /* Unexpected response from the server */ 982 983 } 984 985 /* If the request doesn't have a response bucket, then call the 986 * acceptor to get one created. 
987 */ 988 if (request->resp_bkt == NULL) { 989 request->resp_bkt = (*request->acceptor)(request, conn->stream, 990 request->acceptor_baton, 991 tmppool); 992 apr_pool_clear(tmppool); 993 } 994 995 status = handle_response(request, tmppool); 996 997 /* Some systems will not generate a HUP poll event so we have to 998 * handle the ECONNRESET issue and ECONNABORTED here. 999 */ 1000 if (APR_STATUS_IS_ECONNRESET(status) || 1001 APR_STATUS_IS_ECONNABORTED(status) || 1002 status == SERF_ERROR_REQUEST_LOST) { 1003 /* If the connection had ever been good, be optimistic & try again. 1004 * If it has never tried again (incl. a retry), fail. 1005 */ 1006 if (conn->completed_responses) { 1007 reset_connection(conn, 1); 1008 status = APR_SUCCESS; 1009 } 1010 else if (status == SERF_ERROR_REQUEST_LOST) { 1011 status = SERF_ERROR_ABORTED_CONNECTION; 1012 } 1013 goto error; 1014 } 1015 1016 /* If our response handler says it can't do anything more, we now 1017 * treat that as a success. 1018 */ 1019 if (APR_STATUS_IS_EAGAIN(status)) { 1020 status = APR_SUCCESS; 1021 goto error; 1022 } 1023 1024 /* If we received APR_SUCCESS, run this loop again. */ 1025 if (!status) { 1026 continue; 1027 } 1028 1029 close_connection = is_conn_closing(request->resp_bkt); 1030 1031 if (!APR_STATUS_IS_EOF(status) && 1032 close_connection != SERF_ERROR_CLOSING) { 1033 /* Whether success, or an error, there is no more to do unless 1034 * this request has been completed. 1035 */ 1036 goto error; 1037 } 1038 1039 /* The response has been fully-read, so that means the request has 1040 * either been fully-delivered (most likely), or that we don't need to 1041 * write the rest of it anymore, e.g. when a 408 Request timeout was 1042 * received. 1043 * Remove it from our queue and loop to read another response. 1044 */ 1045 conn->requests = request->next; 1046 1047 destroy_request(request); 1048 1049 request = conn->requests; 1050 1051 /* If we're truly empty, update our tail. 
*/ 1052 if (request == NULL) { 1053 conn->requests_tail = NULL; 1054 } 1055 1056 conn->completed_responses++; 1057 1058 /* We've to rebuild pollset since completed_responses is changed. */ 1059 conn->dirty_conn = 1; 1060 conn->ctx->dirty_pollset = 1; 1061 1062 /* This means that we're being advised that the connection is done. */ 1063 if (close_connection == SERF_ERROR_CLOSING) { 1064 reset_connection(conn, 1); 1065 if (APR_STATUS_IS_EOF(status)) 1066 status = APR_SUCCESS; 1067 goto error; 1068 } 1069 1070 /* The server is suddenly deciding to serve more responses than we've 1071 * seen before. 1072 * 1073 * Let our requests go. 1074 */ 1075 if (conn->probable_keepalive_limit && 1076 conn->completed_responses > conn->probable_keepalive_limit) { 1077 conn->probable_keepalive_limit = 0; 1078 } 1079 1080 /* If we just ran out of requests or have unwritten requests, then 1081 * update the pollset. We don't want to read from this socket any 1082 * more. We are definitely done with this loop, too. 1083 */ 1084 if (request == NULL || !request->written) { 1085 conn->dirty_conn = 1; 1086 conn->ctx->dirty_pollset = 1; 1087 status = APR_SUCCESS; 1088 goto error; 1089 } 1090 } 1091 1092error: 1093 apr_pool_destroy(tmppool); 1094 return status; 1095} 1096 1097/* Process the poll EVENTS reported for CONN. APR_POLLIN is handled first by reading from the connection; APR_POLLHUP and APR_POLLERR then either reset the connection or return an error (both branches always return); APR_POLLOUT is handled last by writing to the connection. Returns the first non-success status encountered, otherwise APR_SUCCESS. */ 1098apr_status_t serf__process_connection(serf_connection_t *conn, 1099 apr_int16_t events) 1100{ 1101 apr_status_t status; 1102 1103 /* POLLHUP/ERR should come after POLLIN so if there's an error message or 1104 * the like sitting on the connection, we give the app a chance to read 1105 * it before we trigger a reset condition. 
1106 */ 1107 if ((events & APR_POLLIN) != 0) { 1108 if ((status = read_from_connection(conn)) != APR_SUCCESS) 1109 return status; 1110 1111 /* If we decided to reset our connection, return now as we don't 1112 * want to write. 
1113 */ 1114 if ((conn->seen_in_pollset & APR_POLLHUP) != 0) { 1115 return APR_SUCCESS; 1116 } 1117 } 1118 if ((events & APR_POLLHUP) != 0) { 1119 /* The connection got reset by the server. On Windows this can happen 1120 when all data is read, so just cleanup the connection and open 1121 a new one. 1122 If we haven't had any successful responses on this connection, 1123 then error out as it is likely a server issue. */ 1124 if (conn->completed_responses) { 1125 return reset_connection(conn, 1); 1126 } 1127 return SERF_ERROR_ABORTED_CONNECTION; 1128 } 1129 if ((events & APR_POLLERR) != 0) { 1130 /* We might be talking to a buggy HTTP server that doesn't 1131 * do lingering-close. (httpd < 2.1.8 does this.) 1132 * 1133 * See: 1134 * 1135 * http://issues.apache.org/bugzilla/show_bug.cgi?id=35292 1136 */ /* NOTE(review): this branch tests conn->completed_requests while the POLLHUP branch above tests conn->completed_responses; confirm the difference is intended. */ 1137 if (conn->completed_requests && !conn->probable_keepalive_limit) { 1138 return reset_connection(conn, 1); 1139 } 1140 return APR_EGENERAL; 1141 } 1142 if ((events & APR_POLLOUT) != 0) { 1143 if ((status = write_to_connection(conn)) != APR_SUCCESS) 1144 return status; 1145 } 1146 return APR_SUCCESS; 1147} 1148 /* Allocate a zero-filled connection from POOL and initialise it: store CTX, the SETUP/CLOSED callbacks and their batons, create a bucket allocator and the skt_pool subpool, register clean_conn as a cleanup on POOL, and append the connection to CTX->conns. When CTX has a proxy address it is stored instead of ADDRESS. The state starts as SERF_CONN_INIT and the latency as -1; no socket is created here. Returns the new connection. 
*/ 1149serf_connection_t *serf_connection_create( 1150 serf_context_t *ctx, 1151 apr_sockaddr_t *address, 1152 serf_connection_setup_t setup, 1153 void *setup_baton, 1154 serf_connection_closed_t closed, 1155 void *closed_baton, 1156 apr_pool_t *pool) 1157{ 1158 serf_connection_t *conn = apr_pcalloc(pool, sizeof(*conn)); 1159 1160 conn->ctx = ctx; 1161 conn->status = APR_SUCCESS; 1162 /* Ignore server address if proxy was specified. */ 1163 conn->address = ctx->proxy_address ? 
ctx->proxy_address : address; 1164 conn->setup = setup; 1165 conn->setup_baton = setup_baton; 1166 conn->closed = closed; 1167 conn->closed_baton = closed_baton; 1168 conn->pool = pool; 1169 conn->allocator = serf_bucket_allocator_create(pool, NULL, NULL); 1170 conn->stream = NULL; 1171 conn->ostream_head = NULL; 1172 conn->ostream_tail = NULL; 1173 conn->baton.type = SERF_IO_CONN; 1174 conn->baton.u.conn = conn; 1175 conn->hit_eof = 0; 1176 conn->state = SERF_CONN_INIT; 1177 conn->latency = -1; /* unknown */ 1178 1179 /* Create a subpool for our connection. */ 1180 apr_pool_create(&conn->skt_pool, conn->pool); 1181 1182 /* register a cleanup */ 1183 apr_pool_cleanup_register(conn->pool, conn, clean_conn, apr_pool_cleanup_null); 1184 1185 /* Add the connection to the context. */ 1186 *(serf_connection_t **)apr_array_push(ctx->conns) = conn; 1187 1188 serf__log(CONN_VERBOSE, __FILE__, "created connection 0x%x\n", 1189 conn); 1190 1191 return conn; 1192} 1193 /* Create a connection from the URI in HOST_INFO. A zero port is replaced with the result of apr_uri_port_of_scheme() for the URI's scheme, and the host address is resolved only when CTX has no proxy address. On success *CONN receives the new connection, with host_url set to HOST_INFO unparsed without its path and host_info set to a copy of HOST_INFO. Returns the apr_sockaddr_info_get() error if resolution fails (*CONN is then left unset), otherwise APR_SUCCESS. */ 1194apr_status_t serf_connection_create2( 1195 serf_connection_t **conn, 1196 serf_context_t *ctx, 1197 apr_uri_t host_info, 1198 serf_connection_setup_t setup, 1199 void *setup_baton, 1200 serf_connection_closed_t closed, 1201 void *closed_baton, 1202 apr_pool_t *pool) 1203{ 1204 apr_status_t status = APR_SUCCESS; 1205 serf_connection_t *c; 1206 apr_sockaddr_t *host_address = NULL; 1207 1208 /* Set the port number explicitly, needed to create the socket later. 
*/ 1209 if (!host_info.port) { 1210 host_info.port = apr_uri_port_of_scheme(host_info.scheme); 1211 } 1212 1213 /* Only lookup the address of the server if no proxy server was 1214 configured. 
*/ 1215 if (!ctx->proxy_address) { 1216 status = apr_sockaddr_info_get(&host_address, 1217 host_info.hostname, 1218 APR_UNSPEC, host_info.port, 0, pool); 1219 if (status) 1220 return status; 1221 } 1222 1223 c = serf_connection_create(ctx, host_address, setup, setup_baton, 1224 closed, closed_baton, pool); 1225 1226 /* We're not interested in the path following the hostname. */ 1227 c->host_url = apr_uri_unparse(c->pool, 1228 &host_info, 1229 APR_URI_UNP_OMITPATHINFO); 1230 c->host_info = host_info; 1231 1232 *conn = c; 1233 1234 return status; 1235} 1236 /* Public entry point that resets CONN by calling reset_connection() with 0 as its second argument; returns that call's status. */ 1237apr_status_t serf_connection_reset( 1238 serf_connection_t *conn) 1239{ 1240 return reset_connection(conn, 0); 1241} 1242 1243 /* Close CONN and remove it from its context. Searches CTX->conns from the last entry backwards; when CONN is found, cancels every queued request, closes the socket if one is open (invoking the closed callback when set), destroys the stream and the output stream, and deletes the entry from the array. Returns APR_SUCCESS when CONN was found, APR_NOTFOUND otherwise. */ 1244apr_status_t serf_connection_close( 1245 serf_connection_t *conn) 1246{ 1247 int i; 1248 serf_context_t *ctx = conn->ctx; 1249 apr_status_t status; 1250 1251 for (i = ctx->conns->nelts; i--; ) { 1252 serf_connection_t *conn_seq = GET_CONN(ctx, i); 1253 1254 if (conn_seq == conn) { 1255 while (conn->requests) { 1256 serf_request_cancel(conn->requests); 1257 } 1258 if (conn->skt != NULL) { 1259 remove_connection(ctx, conn); 1260 status = apr_socket_close(conn->skt); /* NOTE(review): conn->skt is handed to serf__log_skt() after apr_socket_close() was called on it; confirm the logger does not need an open socket. 
*/ 1261 serf__log_skt(SOCK_VERBOSE, __FILE__, conn->skt, 1262 "closed socket, status %d\n", 1263 status); 1264 if (conn->closed != NULL) { 1265 handle_conn_closed(conn, status); 1266 } 1267 conn->skt = NULL; 1268 } 1269 if (conn->stream != NULL) { 1270 serf_bucket_destroy(conn->stream); 1271 conn->stream = NULL; 1272 } 1273 1274 destroy_ostream(conn); 1275 1276 /* Remove the connection from the context. We don't want to 1277 * deal with it any more. 1278 */ 1279 if (i < ctx->conns->nelts - 1) { 1280 /* move later connections over this one. */ 1281 memmove( 1282 &GET_CONN(ctx, i), 1283 &GET_CONN(ctx, i + 1), 1284 (ctx->conns->nelts - i - 1) * sizeof(serf_connection_t *)); 1285 } 1286 --ctx->conns->nelts; 1287 1288 serf__log(CONN_VERBOSE, __FILE__, "closed connection 0x%x\n", 1289 conn); 1290 1291 /* Found the connection. 
Closed it. All done. */ 1292 return APR_SUCCESS; 1293 } 1294 } 1295 1296 /* We didn't find the specified connection. */ 1297 /* ### doc talks about this w.r.t poll structures. use something else? */ 1298 return APR_NOTFOUND; 1299} 1300 1301 /* Store MAX_REQUESTS as CONN's max_outstanding_requests and log the change; a value of 0 is logged as "unlimited". */ 1302void serf_connection_set_max_outstanding_requests( 1303 serf_connection_t *conn, 1304 unsigned int max_requests) 1305{ 1306 if (max_requests == 0) 1307 serf__log_skt(CONN_VERBOSE, __FILE__, conn->skt, 1308 "Set max. nr. of outstanding requests for this " 1309 "connection to unlimited.\n"); 1310 else 1311 serf__log_skt(CONN_VERBOSE, __FILE__, conn->skt, 1312 "Limit max. nr. of outstanding requests for this " 1313 "connection to %u.\n", max_requests); 1314 1315 conn->max_outstanding_requests = max_requests; 1316} 1317 1318 /* Set CONN's async_responses flag and record the ACCEPTOR and HANDLER callbacks together with their batons. */ 1319void serf_connection_set_async_responses( 1320 serf_connection_t *conn, 1321 serf_response_acceptor_t acceptor, 1322 void *acceptor_baton, 1323 serf_response_handler_t handler, 1324 void *handler_baton) 1325{ 1326 conn->async_responses = 1; 1327 conn->async_acceptor = acceptor; 1328 conn->async_acceptor_baton = acceptor_baton; 1329 conn->async_handler = handler; 1330 conn->async_handler_baton = handler_baton; 1331} 1332 1333 /* Allocate a request from CONN's bucket allocator, initialise it as unwritten with priority 0 and no handler, pool or buckets, link it at the end of CONN's request chain, and mark the connection and the context pollset dirty. Returns the new request. 
*/ 1334serf_request_t *serf_connection_request_create( 1335 serf_connection_t *conn, 1336 serf_request_setup_t setup, 1337 void *setup_baton) 1338{ 1339 serf_request_t *request; 1340 1341 request = serf_bucket_mem_alloc(conn->allocator, sizeof(*request)); 1342 request->conn = conn; 1343 request->setup = setup; 1344 request->setup_baton = setup_baton; 1345 request->handler = NULL; 1346 request->respool = NULL; 1347 request->req_bkt = NULL; 1348 request->resp_bkt = NULL; 1349 request->priority = 0; 1350 request->written = 0; 1351 request->next = NULL; 1352 1353 /* Link the request to the end of the request chain. 
*/ 1354 link_requests(&conn->requests, &conn->requests_tail, request); 1355 1356 /* Ensure our pollset becomes writable in context run */ 1357 conn->ctx->dirty_pollset = 1; 1358 conn->dirty_conn = 1; 1359 1360 return request; 1361} 1362 1363 /* Same initialisation as serf_connection_request_create(), but with priority 1 and a different queue position: the new request is inserted after the leading requests that are flagged written with no request bucket left, and after any priority requests that directly follow them. Marks the connection and the context pollset dirty and returns the new request. NOTE(review): conn->requests_tail is not updated on this path, even when the new request ends up last in the chain (iter == NULL); confirm against link_requests() users. */ 1364serf_request_t *serf_connection_priority_request_create( 1365 serf_connection_t *conn, 1366 serf_request_setup_t setup, 1367 void *setup_baton) 1368{ 1369 serf_request_t *request; 1370 serf_request_t *iter, *prev; 1371 1372 request = serf_bucket_mem_alloc(conn->allocator, sizeof(*request)); 1373 request->conn = conn; 1374 request->setup = setup; 1375 request->setup_baton = setup_baton; 1376 request->handler = NULL; 1377 request->respool = NULL; 1378 request->req_bkt = NULL; 1379 request->resp_bkt = NULL; 1380 request->priority = 1; 1381 request->written = 0; 1382 request->next = NULL; 1383 1384 /* Link the new request after the last written request. */ 1385 iter = conn->requests; 1386 prev = NULL; 1387 1388 /* Find a request that has data which needs to be delivered. 
*/ 1389 while (iter != NULL && iter->req_bkt == NULL && iter->written) { 1390 prev = iter; 1391 iter = iter->next; 1392 } 1393 1394 /* Advance to next non priority request */ 1395 while (iter != NULL && iter->priority) { 1396 prev = iter; 1397 iter = iter->next; 1398 } 1399 1400 if (prev) { 1401 request->next = iter; 1402 prev->next = request; 1403 } else { 1404 request->next = iter; 1405 conn->requests = request; 1406 } 1407 1408 /* Ensure our pollset becomes writable in context run */ 1409 conn->ctx->dirty_pollset = 1; 1410 conn->dirty_conn = 1; 1411 1412 return request; 1413} 1414 1415 /* Cancel REQUEST by passing it and its connection's request list to cancel_request(); returns that call's status. */ 1416apr_status_t serf_request_cancel(serf_request_t *request) 1417{ 1418 return cancel_request(request, &request->conn->requests, 0); 1419} 1420 /* Returns APR_SUCCESS when REQUEST is flagged written and has no request bucket left, APR_EBUSY otherwise. */ 1421apr_status_t serf_request_is_written(serf_request_t *request) 1422{ 1423 if (request->written && !request->req_bkt) 1424 return APR_SUCCESS; 1425 1426 return APR_EBUSY; 1427} 1428 /* Returns REQUEST's respool; this is NULL on a freshly created request. */ 1429apr_pool_t *serf_request_get_pool(const serf_request_t *request) 1430{ 1431 return request->respool; 1432} 1433 1434 /* Returns the bucket allocator stored on REQUEST. */ 1435serf_bucket_alloc_t *serf_request_get_alloc( 1436 const serf_request_t *request) 1437{ 1438 return request->allocator; 1439} 1440 1441 /* Returns the connection REQUEST was created on. */ 1442serf_connection_t *serf_request_get_conn( 1443 const serf_request_t *request) 1444{ 1445 return request->conn; 1446} 1447 1448 /* Store HANDLER and HANDLER_BATON on REQUEST. 
*/ 1449void serf_request_set_handler( 1450 serf_request_t *request, 1451 const serf_response_handler_t handler, 1452 const void **handler_baton) 1453{ 1454 request->handler = handler; 1455 request->handler_baton = handler_baton; 1456} 1457 1458 /* Build a request bucket for METHOD, URI and BODY using ALLOCATOR, then apply connection-level details: the connection's host_url as request root when the context has a proxy address, a "Host" header when the connection has hostinfo, and the server and proxy authentication schemes' setup_request_func when those schemes are configured. Returns the request bucket. */ 1459serf_bucket_t *serf_request_bucket_request_create( 1460 serf_request_t *request, 1461 const char *method, 1462 const char *uri, 1463 serf_bucket_t *body, 1464 serf_bucket_alloc_t *allocator) 1465{ 1466 serf_bucket_t *req_bkt, *hdrs_bkt; 1467 serf_connection_t *conn = request->conn; 1468 serf_context_t *ctx = conn->ctx; 1469 1470 req_bkt = serf_bucket_request_create(method, uri, body, allocator); 1471 hdrs_bkt = 
serf_bucket_request_get_headers(req_bkt); 1472 1473 /* Proxy? */ 1474 if (ctx->proxy_address && conn->host_url) 1475 serf_bucket_request_set_root(req_bkt, conn->host_url); 1476 1477 if (conn->host_info.hostinfo) 1478 serf_bucket_headers_setn(hdrs_bkt, "Host", 1479 conn->host_info.hostinfo); 1480 1481 /* Setup server authorization headers */ 1482 if (ctx->authn_info.scheme) 1483 ctx->authn_info.scheme->setup_request_func(HOST, 0, conn, request, 1484 method, uri, 1485 hdrs_bkt); 1486 1487 /* Setup proxy authorization headers */ 1488 if (ctx->proxy_authn_info.scheme) 1489 ctx->proxy_authn_info.scheme->setup_request_func(PROXY, 0, conn, 1490 request, 1491 method, uri, hdrs_bkt); 1492 1493 return req_bkt; 1494} 1495 /* Returns the latency value stored on CONN, or -1 when the context has a proxy address (latency detection is not implemented for proxied connections). */ 1496apr_interval_time_t serf_connection_get_latency(serf_connection_t *conn) 1497{ 1498 if (conn->ctx->proxy_address) { 1499 /* Detecting network latency for proxied connection is not implemented 1500 yet. */ 1501 return -1; 1502 } 1503 1504 return conn->latency; 1505} 1506