outgoing.c revision 251886
1/* Copyright 2002-2004 Justin Erenkrantz and Greg Stein
2 *
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16#include <apr_pools.h>
17#include <apr_poll.h>
18#include <apr_version.h>
19
20#include "serf.h"
21#include "serf_bucket_util.h"
22
23#include "serf_private.h"
24
25/* cleanup for sockets */
26static apr_status_t clean_skt(void *data)
27{
28    serf_connection_t *conn = data;
29    apr_status_t status = APR_SUCCESS;
30
31    if (conn->skt) {
32        serf__log_skt(SOCK_VERBOSE, __FILE__, conn->skt, "cleanup - ");
33        status = apr_socket_close(conn->skt);
34        conn->skt = NULL;
35        serf__log_nopref(SOCK_VERBOSE, "closed socket, status %d\n", status);
36    }
37
38    return status;
39}
40
41static apr_status_t clean_resp(void *data)
42{
43    serf_request_t *request = data;
44
45    /* The request's RESPOOL is being cleared.  */
46
47    /* If the response has allocated some buckets, then destroy them (since
48       the bucket may hold resources other than memory in RESPOOL). Also
49       make sure to set their fields to NULL so connection closure does
50       not attempt to free them again.  */
51    if (request->resp_bkt) {
52        serf_bucket_destroy(request->resp_bkt);
53        request->resp_bkt = NULL;
54    }
55    if (request->req_bkt) {
56        serf_bucket_destroy(request->req_bkt);
57        request->req_bkt = NULL;
58    }
59
60    /* ### should we worry about debug stuff, like that performed in
61       ### destroy_request()? should we worry about calling req->handler
62       ### to notify this "cancellation" due to pool clearing?  */
63
64    /* This pool just got cleared/destroyed. Don't try to destroy the pool
65       (again) when the request is canceled.  */
66    request->respool = NULL;
67
68    return APR_SUCCESS;
69}
70
71/* cleanup for conns */
72static apr_status_t clean_conn(void *data)
73{
74    serf_connection_t *conn = data;
75
76    serf__log(CONN_VERBOSE, __FILE__, "cleaning up connection 0x%x\n",
77              conn);
78    serf_connection_close(conn);
79
80    return APR_SUCCESS;
81}
82
83/* Update the pollset for this connection. We tweak the pollset based on
84 * whether we want to read and/or write, given conditions within the
85 * connection. If the connection is not (yet) in the pollset, then it
86 * will be added.
87 */
88apr_status_t serf__conn_update_pollset(serf_connection_t *conn)
89{
90    serf_context_t *ctx = conn->ctx;
91    apr_status_t status;
92    apr_pollfd_t desc = { 0 };
93
94    if (!conn->skt) {
95        return APR_SUCCESS;
96    }
97
98    /* Remove the socket from the poll set. */
99    desc.desc_type = APR_POLL_SOCKET;
100    desc.desc.s = conn->skt;
101    desc.reqevents = conn->reqevents;
102
103    status = ctx->pollset_rm(ctx->pollset_baton,
104                             &desc, conn);
105    if (status && !APR_STATUS_IS_NOTFOUND(status))
106        return status;
107
108    /* Now put it back in with the correct read/write values. */
109    desc.reqevents = APR_POLLHUP | APR_POLLERR;
110    if (conn->requests &&
111        conn->state != SERF_CONN_INIT) {
112        /* If there are any outstanding events, then we want to read. */
113        /* ### not true. we only want to read IF we have sent some data */
114        desc.reqevents |= APR_POLLIN;
115
116        /* If the connection is not closing down and
117         *   has unwritten data or
118         *   there are any requests that still have buckets to write out,
119         *     then we want to write.
120         */
121        if (conn->vec_len &&
122            conn->state != SERF_CONN_CLOSING)
123            desc.reqevents |= APR_POLLOUT;
124        else {
125            serf_request_t *request = conn->requests;
126
127            if ((conn->probable_keepalive_limit &&
128                 conn->completed_requests > conn->probable_keepalive_limit) ||
129                (conn->max_outstanding_requests &&
130                 conn->completed_requests - conn->completed_responses >=
131                     conn->max_outstanding_requests)) {
132                /* we wouldn't try to write any way right now. */
133            }
134            else {
135                while (request != NULL && request->req_bkt == NULL &&
136                       request->written)
137                    request = request->next;
138                if (request != NULL)
139                    desc.reqevents |= APR_POLLOUT;
140            }
141        }
142    }
143
144    /* If we can have async responses, always look for something to read. */
145    if (conn->async_responses) {
146        desc.reqevents |= APR_POLLIN;
147    }
148
149    /* save our reqevents, so we can pass it in to remove later. */
150    conn->reqevents = desc.reqevents;
151
152    /* Note: even if we don't want to read/write this socket, we still
153     * want to poll it for hangups and errors.
154     */
155    return ctx->pollset_add(ctx->pollset_baton,
156                            &desc, &conn->baton);
157}
158
159#ifdef SERF_DEBUG_BUCKET_USE
160
161/* Make sure all response buckets were drained. */
162static void check_buckets_drained(serf_connection_t *conn)
163{
164    serf_request_t *request = conn->requests;
165
166    for ( ; request ; request = request->next ) {
167        if (request->resp_bkt != NULL) {
168            /* ### crap. can't do this. this allocator may have un-drained
169             * ### REQUEST buckets.
170             */
171            /* serf_debug__entered_loop(request->resp_bkt->allocator); */
172            /* ### for now, pretend we closed the conn (resets the tracking) */
173            serf_debug__closed_conn(request->resp_bkt->allocator);
174        }
175    }
176}
177
178#endif
179
180/* Create and connect sockets for any connections which don't have them
181 * yet. This is the core of our lazy-connect behavior.
182 */
183apr_status_t serf__open_connections(serf_context_t *ctx)
184{
185    int i;
186
187    for (i = ctx->conns->nelts; i--; ) {
188        serf_connection_t *conn = GET_CONN(ctx, i);
189        apr_status_t status;
190        apr_socket_t *skt;
191
192        conn->seen_in_pollset = 0;
193
194        if (conn->skt != NULL) {
195#ifdef SERF_DEBUG_BUCKET_USE
196            check_buckets_drained(conn);
197#endif
198            continue;
199        }
200
201        /* Delay opening until we have something to deliver! */
202        if (conn->requests == NULL) {
203            continue;
204        }
205
206        apr_pool_clear(conn->skt_pool);
207        apr_pool_cleanup_register(conn->skt_pool, conn, clean_skt, clean_skt);
208
209        status = apr_socket_create(&skt, conn->address->family,
210                                   SOCK_STREAM,
211#if APR_MAJOR_VERSION > 0
212                                   APR_PROTO_TCP,
213#endif
214                                   conn->skt_pool);
215        serf__log(SOCK_VERBOSE, __FILE__,
216                  "created socket for conn 0x%x, status %d\n", conn, status);
217        if (status != APR_SUCCESS)
218            return status;
219
220        /* Set the socket to be non-blocking */
221        if ((status = apr_socket_timeout_set(skt, 0)) != APR_SUCCESS)
222            return status;
223
224        /* Disable Nagle's algorithm */
225        if ((status = apr_socket_opt_set(skt,
226                                         APR_TCP_NODELAY, 1)) != APR_SUCCESS)
227            return status;
228
229        /* Configured. Store it into the connection now. */
230        conn->skt = skt;
231
232        /* Remember time when we started connecting to server to calculate
233           network latency. */
234        conn->connect_time = apr_time_now();
235
236        /* Now that the socket is set up, let's connect it. This should
237         * return immediately.
238         */
239        status = apr_socket_connect(skt, conn->address);
240        serf__log_skt(SOCK_VERBOSE, __FILE__, skt,
241                      "connected socket for conn 0x%x, status %d\n",
242                      conn, status);
243		if (status != APR_SUCCESS) {
244            if (!APR_STATUS_IS_EINPROGRESS(status))
245                return status;
246        }
247
248        /* Flag our pollset as dirty now that we have a new socket. */
249        conn->dirty_conn = 1;
250        ctx->dirty_pollset = 1;
251
252        /* If the authentication was already started on another connection,
253           prepare this connection (it might be possible to skip some
254           part of the handshaking). */
255        if (ctx->proxy_address) {
256            if (conn->ctx->proxy_authn_info.scheme)
257                conn->ctx->proxy_authn_info.scheme->init_conn_func(407, conn,
258                                                                   conn->pool);
259        }
260
261        if (conn->ctx->authn_info.scheme)
262            conn->ctx->authn_info.scheme->init_conn_func(401, conn,
263                                                         conn->pool);
264
265        /* Does this connection require a SSL tunnel over the proxy? */
266        if (ctx->proxy_address && strcmp(conn->host_info.scheme, "https") == 0)
267            serf__ssltunnel_connect(conn);
268        else
269            conn->state = SERF_CONN_CONNECTED;
270
271    }
272
273    return APR_SUCCESS;
274}
275
276static apr_status_t no_more_writes(serf_connection_t *conn,
277                                   serf_request_t *request)
278{
279    /* Note that we should hold new requests until we open our new socket. */
280    conn->state = SERF_CONN_CLOSING;
281    serf__log(CONN_VERBOSE, __FILE__, "stop writing on conn 0x%x\n",
282              conn);
283
284    /* Clear our iovec. */
285    conn->vec_len = 0;
286
287    /* Update the pollset to know we don't want to write on this socket any
288     * more.
289     */
290    conn->dirty_conn = 1;
291    conn->ctx->dirty_pollset = 1;
292    return APR_SUCCESS;
293}
294
295/* Read the 'Connection' header from the response. Return SERF_ERROR_CLOSING if
296 * the header contains value 'close' indicating the server is closing the
297 * connection right after this response.
298 * Otherwise returns APR_SUCCESS.
299 */
300static apr_status_t is_conn_closing(serf_bucket_t *response)
301{
302    serf_bucket_t *hdrs;
303    const char *val;
304
305    hdrs = serf_bucket_response_get_headers(response);
306    val = serf_bucket_headers_get(hdrs, "Connection");
307    if (val && strcasecmp("close", val) == 0)
308        {
309            return SERF_ERROR_CLOSING;
310        }
311
312    return APR_SUCCESS;
313}
314
315static void link_requests(serf_request_t **list, serf_request_t **tail,
316                          serf_request_t *request)
317{
318    if (*list == NULL) {
319        *list = request;
320        *tail = request;
321    }
322    else {
323        (*tail)->next = request;
324        *tail = request;
325    }
326}
327
328static apr_status_t destroy_request(serf_request_t *request)
329{
330    serf_connection_t *conn = request->conn;
331
332    /* The request and response buckets are no longer needed,
333       nor is the request's pool.  */
334    if (request->resp_bkt) {
335        serf_debug__closed_conn(request->resp_bkt->allocator);
336        serf_bucket_destroy(request->resp_bkt);
337        request->resp_bkt = NULL;
338    }
339    if (request->req_bkt) {
340        serf_debug__closed_conn(request->req_bkt->allocator);
341        serf_bucket_destroy(request->req_bkt);
342        request->req_bkt = NULL;
343    }
344
345    serf_debug__bucket_alloc_check(request->allocator);
346    if (request->respool) {
347        /* ### unregister the pool cleanup for self?  */
348        apr_pool_destroy(request->respool);
349    }
350
351    serf_bucket_mem_free(conn->allocator, request);
352
353    return APR_SUCCESS;
354}
355
356static apr_status_t cancel_request(serf_request_t *request,
357                                   serf_request_t **list,
358                                   int notify_request)
359{
360    /* If we haven't run setup, then we won't have a handler to call. */
361    if (request->handler && notify_request) {
362        /* We actually don't care what the handler returns.
363         * We have bigger matters at hand.
364         */
365        (*request->handler)(request, NULL, request->handler_baton,
366                            request->respool);
367    }
368
369    if (*list == request) {
370        *list = request->next;
371    }
372    else {
373        serf_request_t *scan = *list;
374
375        while (scan->next && scan->next != request)
376            scan = scan->next;
377
378        if (scan->next) {
379            scan->next = scan->next->next;
380        }
381    }
382
383    return destroy_request(request);
384}
385
386static apr_status_t remove_connection(serf_context_t *ctx,
387                                      serf_connection_t *conn)
388{
389    apr_pollfd_t desc = { 0 };
390
391    desc.desc_type = APR_POLL_SOCKET;
392    desc.desc.s = conn->skt;
393    desc.reqevents = conn->reqevents;
394
395    return ctx->pollset_rm(ctx->pollset_baton,
396                           &desc, conn);
397}
398
399static void destroy_ostream(serf_connection_t *conn)
400{
401    if (conn->ostream_head != NULL) {
402        serf_bucket_destroy(conn->ostream_head);
403        conn->ostream_head = NULL;
404        conn->ostream_tail = NULL;
405    }
406}
407
408/* A socket was closed, inform the application. */
409static void handle_conn_closed(serf_connection_t *conn, apr_status_t status)
410{
411    (*conn->closed)(conn, conn->closed_baton, status,
412                    conn->pool);
413}
414
415static apr_status_t reset_connection(serf_connection_t *conn,
416                                     int requeue_requests)
417{
418    serf_context_t *ctx = conn->ctx;
419    apr_status_t status;
420    serf_request_t *old_reqs;
421
422    conn->probable_keepalive_limit = conn->completed_responses;
423    conn->completed_requests = 0;
424    conn->completed_responses = 0;
425
426    old_reqs = conn->requests;
427
428    conn->requests = NULL;
429    conn->requests_tail = NULL;
430
431    /* Handle all outstanding requests. These have either not been written yet,
432       or have been written but the expected reply wasn't received yet. */
433    while (old_reqs) {
434        /* If we haven't started to write the connection, bring it over
435         * unchanged to our new socket.
436         */
437        if (requeue_requests && !old_reqs->written) {
438            serf_request_t *req = old_reqs;
439            old_reqs = old_reqs->next;
440            req->next = NULL;
441            link_requests(&conn->requests, &conn->requests_tail, req);
442        }
443        else {
444            /* Request has been consumed, or we don't want to requeue the
445               request. Either way, inform the application that the request
446               is cancelled. */
447            cancel_request(old_reqs, &old_reqs, requeue_requests);
448        }
449    }
450
451    /* Requests queue has been prepared for a new socket, close the old one. */
452    if (conn->skt != NULL) {
453        remove_connection(ctx, conn);
454        status = apr_socket_close(conn->skt);
455        serf__log_skt(SOCK_VERBOSE, __FILE__, conn->skt,
456                      "closed socket, status %d\n", status);
457        if (conn->closed != NULL) {
458            handle_conn_closed(conn, status);
459        }
460        conn->skt = NULL;
461    }
462
463    if (conn->stream != NULL) {
464        serf_bucket_destroy(conn->stream);
465        conn->stream = NULL;
466    }
467
468    destroy_ostream(conn);
469
470    /* Don't try to resume any writes */
471    conn->vec_len = 0;
472
473    conn->dirty_conn = 1;
474    conn->ctx->dirty_pollset = 1;
475    conn->state = SERF_CONN_INIT;
476
477    serf__log(CONN_VERBOSE, __FILE__, "reset connection 0x%x\n", conn);
478
479    conn->status = APR_SUCCESS;
480
481    /* Let our context know that we've 'reset' the socket already. */
482    conn->seen_in_pollset |= APR_POLLHUP;
483
484    /* Found the connection. Closed it. All done. */
485    return APR_SUCCESS;
486}
487
488static apr_status_t socket_writev(serf_connection_t *conn)
489{
490    apr_size_t written;
491    apr_status_t status;
492
493    status = apr_socket_sendv(conn->skt, conn->vec,
494                              conn->vec_len, &written);
495	if (status && !APR_STATUS_IS_EAGAIN(status))
496        serf__log_skt(SOCK_VERBOSE, __FILE__, conn->skt,
497                      "socket_sendv error %d\n", status);
498
499    /* did we write everything? */
500    if (written) {
501        apr_size_t len = 0;
502        int i;
503
504        serf__log_skt(SOCK_MSG_VERBOSE, __FILE__, conn->skt,
505                      "--- socket_sendv:\n");
506
507        for (i = 0; i < conn->vec_len; i++) {
508            len += conn->vec[i].iov_len;
509            if (written < len) {
510                serf__log_nopref(SOCK_MSG_VERBOSE, "%.*s",
511                                   conn->vec[i].iov_len - (len - written),
512                                   conn->vec[i].iov_base);
513                if (i) {
514                    memmove(conn->vec, &conn->vec[i],
515                            sizeof(struct iovec) * (conn->vec_len - i));
516                    conn->vec_len -= i;
517                }
518                conn->vec[0].iov_base = (char *)conn->vec[0].iov_base + (conn->vec[0].iov_len - (len - written));
519                conn->vec[0].iov_len = len - written;
520                break;
521            } else {
522                serf__log_nopref(SOCK_MSG_VERBOSE, "%.*s",
523                                   conn->vec[i].iov_len, conn->vec[i].iov_base);
524            }
525        }
526        if (len == written) {
527            conn->vec_len = 0;
528        }
529        serf__log_nopref(SOCK_MSG_VERBOSE, "-(%d)-\n", written);
530
531        /* Log progress information */
532        serf__context_progress_delta(conn->ctx, 0, written);
533    }
534
535    return status;
536}
537
538static apr_status_t detect_eof(void *baton, serf_bucket_t *aggregate_bucket)
539{
540    serf_connection_t *conn = baton;
541    conn->hit_eof = 1;
542    return APR_EAGAIN;
543}
544
545static apr_status_t do_conn_setup(serf_connection_t *conn)
546{
547    apr_status_t status;
548    serf_bucket_t *ostream;
549
550    if (conn->ostream_head == NULL) {
551        conn->ostream_head = serf_bucket_aggregate_create(conn->allocator);
552    }
553
554    if (conn->ostream_tail == NULL) {
555        conn->ostream_tail = serf__bucket_stream_create(conn->allocator,
556                                                        detect_eof,
557                                                        conn);
558    }
559
560    ostream = conn->ostream_tail;
561
562    status = (*conn->setup)(conn->skt,
563                            &conn->stream,
564                            &ostream,
565                            conn->setup_baton,
566                            conn->pool);
567    if (status) {
568        /* extra destroy here since it wasn't added to the head bucket yet. */
569        serf_bucket_destroy(conn->ostream_tail);
570        destroy_ostream(conn);
571        return status;
572    }
573
574    serf_bucket_aggregate_append(conn->ostream_head,
575                                 ostream);
576
577    return status;
578}
579
580/* Set up the input and output stream buckets.
581   When a tunnel over an http proxy is needed, create a socket bucket and
582   empty aggregate bucket for sending and receiving unencrypted requests
583   over the socket.
584
585   After the tunnel is there, or no tunnel was needed, ask the application
586   to create the input and output buckets, which should take care of the
587   [en/de]cryption.
588*/
589
590static apr_status_t prepare_conn_streams(serf_connection_t *conn,
591                                         serf_bucket_t **istream,
592                                         serf_bucket_t **ostreamt,
593                                         serf_bucket_t **ostreamh)
594{
595    apr_status_t status;
596
597    if (conn->stream == NULL) {
598        conn->latency = apr_time_now() - conn->connect_time;
599    }
600
601    /* Do we need a SSL tunnel first? */
602    if (conn->state == SERF_CONN_CONNECTED) {
603        /* If the connection does not have an associated bucket, then
604         * call the setup callback to get one.
605         */
606        if (conn->stream == NULL) {
607            status = do_conn_setup(conn);
608            if (status) {
609                return status;
610            }
611        }
612        *ostreamt = conn->ostream_tail;
613        *ostreamh = conn->ostream_head;
614        *istream = conn->stream;
615    } else {
616        /* SSL tunnel needed and not set up yet, get a direct unencrypted
617           stream for this socket */
618        if (conn->stream == NULL) {
619            *istream = serf_bucket_socket_create(conn->skt,
620                                                 conn->allocator);
621        }
622        /* Don't create the ostream bucket chain including the ssl_encrypt
623           bucket yet. This ensure the CONNECT request is sent unencrypted
624           to the proxy. */
625        *ostreamt = *ostreamh = conn->ssltunnel_ostream;
626    }
627
628    return APR_SUCCESS;
629}
630
631/* write data out to the connection */
632static apr_status_t write_to_connection(serf_connection_t *conn)
633{
634    serf_request_t *request = conn->requests;
635
636    if (conn->probable_keepalive_limit &&
637        conn->completed_requests > conn->probable_keepalive_limit) {
638
639        conn->dirty_conn = 1;
640        conn->ctx->dirty_pollset = 1;
641
642        /* backoff for now. */
643        return APR_SUCCESS;
644    }
645
646    /* Find a request that has data which needs to be delivered. */
647    while (request != NULL &&
648           request->req_bkt == NULL && request->written)
649        request = request->next;
650
651    /* assert: request != NULL || conn->vec_len */
652
653    /* Keep reading and sending until we run out of stuff to read, or
654     * writing would block.
655     */
656    while (1) {
657        int stop_reading = 0;
658        apr_status_t status;
659        apr_status_t read_status;
660        serf_bucket_t *ostreamt, *ostreamh;
661        int max_outstanding_requests = conn->max_outstanding_requests;
662
663        /* If we're setting up an ssl tunnel, we can't send real requests
664           at yet, as they need to be encrypted and our encrypt buckets
665           aren't created yet as we still need to read the unencrypted
666           response of the CONNECT request. */
667        if (conn->state != SERF_CONN_CONNECTED)
668            max_outstanding_requests = 1;
669
670        if (max_outstanding_requests &&
671            conn->completed_requests -
672                conn->completed_responses >= max_outstanding_requests) {
673            /* backoff for now. */
674            return APR_SUCCESS;
675        }
676
677        /* If we have unwritten data, then write what we can. */
678        while (conn->vec_len) {
679            status = socket_writev(conn);
680
681            /* If the write would have blocked, then we're done. Don't try
682             * to write anything else to the socket.
683             */
684            if (APR_STATUS_IS_EAGAIN(status))
685                return APR_SUCCESS;
686            if (APR_STATUS_IS_EPIPE(status) ||
687                APR_STATUS_IS_ECONNRESET(status) ||
688                APR_STATUS_IS_ECONNABORTED(status))
689                return no_more_writes(conn, request);
690            if (status)
691                return status;
692        }
693        /* ### can we have a short write, yet no EAGAIN? a short write
694           ### would imply unwritten_len > 0 ... */
695        /* assert: unwritten_len == 0. */
696
697        /* We may need to move forward to a request which has something
698         * to write.
699         */
700        while (request != NULL &&
701               request->req_bkt == NULL && request->written)
702            request = request->next;
703
704        if (request == NULL) {
705            /* No more requests (with data) are registered with the
706             * connection. Let's update the pollset so that we don't
707             * try to write to this socket again.
708             */
709            conn->dirty_conn = 1;
710            conn->ctx->dirty_pollset = 1;
711            return APR_SUCCESS;
712        }
713
714        status = prepare_conn_streams(conn, &conn->stream, &ostreamt, &ostreamh);
715        if (status) {
716            return status;
717        }
718
719        if (request->req_bkt == NULL) {
720            /* Now that we are about to serve the request, allocate a pool. */
721            apr_pool_create(&request->respool, conn->pool);
722            request->allocator = serf_bucket_allocator_create(request->respool,
723                                                              NULL, NULL);
724            apr_pool_cleanup_register(request->respool, request,
725                                      clean_resp, clean_resp);
726
727            /* Fill in the rest of the values for the request. */
728            read_status = request->setup(request, request->setup_baton,
729                                         &request->req_bkt,
730                                         &request->acceptor,
731                                         &request->acceptor_baton,
732                                         &request->handler,
733                                         &request->handler_baton,
734                                         request->respool);
735
736            if (read_status) {
737                /* Something bad happened. Propagate any errors. */
738                return read_status;
739            }
740
741            request->written = 1;
742            serf_bucket_aggregate_append(ostreamt, request->req_bkt);
743        }
744
745        /* ### optimize at some point by using read_for_sendfile */
746        read_status = serf_bucket_read_iovec(ostreamh,
747                                             SERF_READ_ALL_AVAIL,
748                                             IOV_MAX,
749                                             conn->vec,
750                                             &conn->vec_len);
751
752        if (!conn->hit_eof) {
753            if (APR_STATUS_IS_EAGAIN(read_status) ||
754                read_status == SERF_ERROR_WAIT_CONN) {
755                /* We read some stuff, but should not try to read again. */
756                stop_reading = 1;
757
758                /* ### we should avoid looking for writability for a while so
759                   ### that (hopefully) something will appear in the bucket so
760                   ### we can actually write something. otherwise, we could
761                   ### end up in a CPU spin: socket wants something, but we
762                   ### don't have anything (and keep returning EAGAIN)
763                */
764            }
765            else if (read_status && !APR_STATUS_IS_EOF(read_status)) {
766                /* Something bad happened. Propagate any errors. */
767                return read_status;
768            }
769        }
770
771        /* If we got some data, then deliver it. */
772        /* ### what to do if we got no data?? is that a problem? */
773        if (conn->vec_len > 0) {
774            status = socket_writev(conn);
775
776            /* If we can't write any more, or an error occurred, then
777             * we're done here.
778             */
779            if (APR_STATUS_IS_EAGAIN(status))
780                return APR_SUCCESS;
781            if (APR_STATUS_IS_EPIPE(status))
782                return no_more_writes(conn, request);
783            if (APR_STATUS_IS_ECONNRESET(status) ||
784                APR_STATUS_IS_ECONNABORTED(status)) {
785                return no_more_writes(conn, request);
786            }
787            if (status)
788                return status;
789        }
790
791        if (read_status == SERF_ERROR_WAIT_CONN) {
792            stop_reading = 1;
793        }
794        else if (read_status && conn->hit_eof && conn->vec_len == 0) {
795            /* If we hit the end of the request bucket and all of its data has
796             * been written, then clear it out to signify that we're done
797             * sending the request. On the next iteration through this loop:
798             * - if there are remaining bytes they will be written, and as the
799             * request bucket will be completely read it will be destroyed then.
800             * - we'll see if there are other requests that need to be sent
801             * ("pipelining").
802             */
803            conn->hit_eof = 0;
804            serf_bucket_destroy(request->req_bkt);
805            request->req_bkt = NULL;
806
807            /* If our connection has async responses enabled, we're not
808             * going to get a reply back, so kill the request.
809             */
810            if (conn->async_responses) {
811                conn->requests = request->next;
812                destroy_request(request);
813            }
814
815            conn->completed_requests++;
816
817            if (conn->probable_keepalive_limit &&
818                conn->completed_requests > conn->probable_keepalive_limit) {
819                /* backoff for now. */
820                stop_reading = 1;
821            }
822        }
823
824        if (stop_reading) {
825            return APR_SUCCESS;
826        }
827    }
828    /* NOTREACHED */
829}
830
831/* A response message was received from the server, so call
832   the handler as specified on the original request. */
833static apr_status_t handle_response(serf_request_t *request,
834                                    apr_pool_t *pool)
835{
836    apr_status_t status = APR_SUCCESS;
837    int consumed_response = 0;
838
839    /* Only enable the new authentication framework if the program has
840     * registered an authentication credential callback.
841     *
842     * This permits older Serf apps to still handle authentication
843     * themselves by not registering credential callbacks.
844     */
845    if (request->conn->ctx->cred_cb) {
846      status = serf__handle_auth_response(&consumed_response,
847                                          request,
848                                          request->resp_bkt,
849                                          request->handler_baton,
850                                          pool);
851
852      /* If there was an error reading the response (maybe there wasn't
853         enough data available), don't bother passing the response to the
854         application.
855
856         If the authentication was tried, but failed, pass the response
857         to the application, maybe it can do better. */
858      if (APR_STATUS_IS_EOF(status) ||
859          APR_STATUS_IS_EAGAIN(status)) {
860          return status;
861      }
862    }
863
864    if (!consumed_response) {
865        return (*request->handler)(request,
866                                   request->resp_bkt,
867                                   request->handler_baton,
868                                   pool);
869    }
870
871    return status;
872}
873
874/* An async response message was received from the server. */
875static apr_status_t handle_async_response(serf_connection_t *conn,
876                                          apr_pool_t *pool)
877{
878    apr_status_t status;
879
880    if (conn->current_async_response == NULL) {
881        conn->current_async_response =
882            (*conn->async_acceptor)(NULL, conn->stream,
883                                    conn->async_acceptor_baton, pool);
884    }
885
886    status = (*conn->async_handler)(NULL, conn->current_async_response,
887                                    conn->async_handler_baton, pool);
888
889    if (APR_STATUS_IS_EOF(status)) {
890        serf_bucket_destroy(conn->current_async_response);
891        conn->current_async_response = NULL;
892        status = APR_SUCCESS;
893    }
894
895    return status;
896}
897
898/* read data from the connection */
899static apr_status_t read_from_connection(serf_connection_t *conn)
900{
901    apr_status_t status;
902    apr_pool_t *tmppool;
903    int close_connection = FALSE;
904
905    /* Whatever is coming in on the socket corresponds to the first request
906     * on our chain.
907     */
908    serf_request_t *request = conn->requests;
909
910    /* assert: request != NULL */
911
912    if ((status = apr_pool_create(&tmppool, conn->pool)) != APR_SUCCESS)
913        goto error;
914
915    /* Invoke response handlers until we have no more work. */
916    while (1) {
917        serf_bucket_t *dummy1, *dummy2;
918
919        apr_pool_clear(tmppool);
920
921        /* Only interested in the input stream here. */
922        status = prepare_conn_streams(conn, &conn->stream, &dummy1, &dummy2);
923        if (status) {
924            goto error;
925        }
926
927        /* We have a different codepath when we can have async responses. */
928        if (conn->async_responses) {
929            /* TODO What about socket errors? */
930            status = handle_async_response(conn, tmppool);
931            if (APR_STATUS_IS_EAGAIN(status)) {
932                status = APR_SUCCESS;
933                goto error;
934            }
935            if (status) {
936                goto error;
937            }
938            continue;
939        }
940
941        /* We are reading a response for a request we haven't
942         * written yet!
943         *
944         * This shouldn't normally happen EXCEPT:
945         *
946         * 1) when the other end has closed the socket and we're
947         *    pending an EOF return.
948         * 2) Doing the initial SSL handshake - we'll get EAGAIN
949         *    as the SSL buckets will hide the handshake from us
950         *    but not return any data.
951         * 3) When the server sends us an SSL alert.
952         *
953         * In these cases, we should not receive any actual user data.
954         *
955         * 4) When the server sends a error response, like 408 Request timeout.
956         *    This response should be passed to the application.
957         *
958         * If we see an EOF (due to either an expired timeout or the server
959         * sending the SSL 'close notify' shutdown alert), we'll reset the
960         * connection and open a new one.
961         */
962        if (request->req_bkt || !request->written) {
963            const char *data;
964            apr_size_t len;
965
966            status = serf_bucket_peek(conn->stream, &data, &len);
967
968            if (APR_STATUS_IS_EOF(status)) {
969                reset_connection(conn, 1);
970                status = APR_SUCCESS;
971                goto error;
972            }
973            else if (APR_STATUS_IS_EAGAIN(status) && !len) {
974                status = APR_SUCCESS;
975                goto error;
976            } else if (status && !APR_STATUS_IS_EAGAIN(status)) {
977                /* Read error */
978                goto error;
979            }
980
981            /* Unexpected response from the server */
982
983        }
984
985        /* If the request doesn't have a response bucket, then call the
986         * acceptor to get one created.
987         */
988        if (request->resp_bkt == NULL) {
989            request->resp_bkt = (*request->acceptor)(request, conn->stream,
990                                                     request->acceptor_baton,
991                                                     tmppool);
992            apr_pool_clear(tmppool);
993        }
994
995        status = handle_response(request, tmppool);
996
997        /* Some systems will not generate a HUP poll event so we have to
998         * handle the ECONNRESET issue and ECONNABORT here.
999         */
1000        if (APR_STATUS_IS_ECONNRESET(status) ||
1001            APR_STATUS_IS_ECONNABORTED(status) ||
1002            status == SERF_ERROR_REQUEST_LOST) {
1003            /* If the connection had ever been good, be optimistic & try again.
1004             * If it has never tried again (incl. a retry), fail.
1005             */
1006            if (conn->completed_responses) {
1007                reset_connection(conn, 1);
1008                status = APR_SUCCESS;
1009            }
1010            else if (status == SERF_ERROR_REQUEST_LOST) {
1011                status = SERF_ERROR_ABORTED_CONNECTION;
1012            }
1013            goto error;
1014        }
1015
1016        /* If our response handler says it can't do anything more, we now
1017         * treat that as a success.
1018         */
1019        if (APR_STATUS_IS_EAGAIN(status)) {
1020            status = APR_SUCCESS;
1021            goto error;
1022        }
1023
1024        /* If we received APR_SUCCESS, run this loop again. */
1025        if (!status) {
1026            continue;
1027        }
1028
1029        close_connection = is_conn_closing(request->resp_bkt);
1030
1031        if (!APR_STATUS_IS_EOF(status) &&
1032            close_connection != SERF_ERROR_CLOSING) {
1033            /* Whether success, or an error, there is no more to do unless
1034             * this request has been completed.
1035             */
1036            goto error;
1037        }
1038
1039        /* The response has been fully-read, so that means the request has
1040         * either been fully-delivered (most likely), or that we don't need to
1041         * write the rest of it anymore, e.g. when a 408 Request timeout was
1042         $ received.
1043         * Remove it from our queue and loop to read another response.
1044         */
1045        conn->requests = request->next;
1046
1047        destroy_request(request);
1048
1049        request = conn->requests;
1050
1051        /* If we're truly empty, update our tail. */
1052        if (request == NULL) {
1053            conn->requests_tail = NULL;
1054        }
1055
1056        conn->completed_responses++;
1057
1058        /* We've to rebuild pollset since completed_responses is changed. */
1059        conn->dirty_conn = 1;
1060        conn->ctx->dirty_pollset = 1;
1061
1062        /* This means that we're being advised that the connection is done. */
1063        if (close_connection == SERF_ERROR_CLOSING) {
1064            reset_connection(conn, 1);
1065            if (APR_STATUS_IS_EOF(status))
1066                status = APR_SUCCESS;
1067            goto error;
1068        }
1069
1070        /* The server is suddenly deciding to serve more responses than we've
1071         * seen before.
1072         *
1073         * Let our requests go.
1074         */
1075        if (conn->probable_keepalive_limit &&
1076            conn->completed_responses > conn->probable_keepalive_limit) {
1077            conn->probable_keepalive_limit = 0;
1078        }
1079
1080        /* If we just ran out of requests or have unwritten requests, then
1081         * update the pollset. We don't want to read from this socket any
1082         * more. We are definitely done with this loop, too.
1083         */
1084        if (request == NULL || !request->written) {
1085            conn->dirty_conn = 1;
1086            conn->ctx->dirty_pollset = 1;
1087            status = APR_SUCCESS;
1088            goto error;
1089        }
1090    }
1091
1092error:
1093    apr_pool_destroy(tmppool);
1094    return status;
1095}
1096
1097/* process all events on the connection */
1098apr_status_t serf__process_connection(serf_connection_t *conn,
1099                                      apr_int16_t events)
1100{
1101    apr_status_t status;
1102
1103    /* POLLHUP/ERR should come after POLLIN so if there's an error message or
1104     * the like sitting on the connection, we give the app a chance to read
1105     * it before we trigger a reset condition.
1106     */
1107    if ((events & APR_POLLIN) != 0) {
1108        if ((status = read_from_connection(conn)) != APR_SUCCESS)
1109            return status;
1110
1111        /* If we decided to reset our connection, return now as we don't
1112         * want to write.
1113         */
1114        if ((conn->seen_in_pollset & APR_POLLHUP) != 0) {
1115            return APR_SUCCESS;
1116        }
1117    }
1118    if ((events & APR_POLLHUP) != 0) {
1119        /* The connection got reset by the server. On Windows this can happen
1120           when all data is read, so just cleanup the connection and open
1121           a new one.
1122           If we haven't had any successful responses on this connection,
1123           then error out as it is likely a server issue. */
1124        if (conn->completed_responses) {
1125            return reset_connection(conn, 1);
1126        }
1127        return SERF_ERROR_ABORTED_CONNECTION;
1128    }
1129    if ((events & APR_POLLERR) != 0) {
1130        /* We might be talking to a buggy HTTP server that doesn't
1131         * do lingering-close.  (httpd < 2.1.8 does this.)
1132         *
1133         * See:
1134         *
1135         * http://issues.apache.org/bugzilla/show_bug.cgi?id=35292
1136         */
1137        if (conn->completed_requests && !conn->probable_keepalive_limit) {
1138            return reset_connection(conn, 1);
1139        }
1140        return APR_EGENERAL;
1141    }
1142    if ((events & APR_POLLOUT) != 0) {
1143        if ((status = write_to_connection(conn)) != APR_SUCCESS)
1144            return status;
1145    }
1146    return APR_SUCCESS;
1147}
1148
1149serf_connection_t *serf_connection_create(
1150    serf_context_t *ctx,
1151    apr_sockaddr_t *address,
1152    serf_connection_setup_t setup,
1153    void *setup_baton,
1154    serf_connection_closed_t closed,
1155    void *closed_baton,
1156    apr_pool_t *pool)
1157{
1158    serf_connection_t *conn = apr_pcalloc(pool, sizeof(*conn));
1159
1160    conn->ctx = ctx;
1161    conn->status = APR_SUCCESS;
1162    /* Ignore server address if proxy was specified. */
1163    conn->address = ctx->proxy_address ? ctx->proxy_address : address;
1164    conn->setup = setup;
1165    conn->setup_baton = setup_baton;
1166    conn->closed = closed;
1167    conn->closed_baton = closed_baton;
1168    conn->pool = pool;
1169    conn->allocator = serf_bucket_allocator_create(pool, NULL, NULL);
1170    conn->stream = NULL;
1171    conn->ostream_head = NULL;
1172    conn->ostream_tail = NULL;
1173    conn->baton.type = SERF_IO_CONN;
1174    conn->baton.u.conn = conn;
1175    conn->hit_eof = 0;
1176    conn->state = SERF_CONN_INIT;
1177    conn->latency = -1; /* unknown */
1178
1179    /* Create a subpool for our connection. */
1180    apr_pool_create(&conn->skt_pool, conn->pool);
1181
1182    /* register a cleanup */
1183    apr_pool_cleanup_register(conn->pool, conn, clean_conn, apr_pool_cleanup_null);
1184
1185    /* Add the connection to the context. */
1186    *(serf_connection_t **)apr_array_push(ctx->conns) = conn;
1187
1188    serf__log(CONN_VERBOSE, __FILE__, "created connection 0x%x\n",
1189              conn);
1190
1191    return conn;
1192}
1193
1194apr_status_t serf_connection_create2(
1195    serf_connection_t **conn,
1196    serf_context_t *ctx,
1197    apr_uri_t host_info,
1198    serf_connection_setup_t setup,
1199    void *setup_baton,
1200    serf_connection_closed_t closed,
1201    void *closed_baton,
1202    apr_pool_t *pool)
1203{
1204    apr_status_t status = APR_SUCCESS;
1205    serf_connection_t *c;
1206    apr_sockaddr_t *host_address = NULL;
1207
1208    /* Set the port number explicitly, needed to create the socket later. */
1209    if (!host_info.port) {
1210        host_info.port = apr_uri_port_of_scheme(host_info.scheme);
1211    }
1212
1213    /* Only lookup the address of the server if no proxy server was
1214       configured. */
1215    if (!ctx->proxy_address) {
1216        status = apr_sockaddr_info_get(&host_address,
1217                                       host_info.hostname,
1218                                       APR_UNSPEC, host_info.port, 0, pool);
1219        if (status)
1220            return status;
1221    }
1222
1223    c = serf_connection_create(ctx, host_address, setup, setup_baton,
1224                               closed, closed_baton, pool);
1225
1226    /* We're not interested in the path following the hostname. */
1227    c->host_url = apr_uri_unparse(c->pool,
1228                                  &host_info,
1229                                  APR_URI_UNP_OMITPATHINFO);
1230    c->host_info = host_info;
1231
1232    *conn = c;
1233
1234    return status;
1235}
1236
1237apr_status_t serf_connection_reset(
1238    serf_connection_t *conn)
1239{
1240    return reset_connection(conn, 0);
1241}
1242
1243
1244apr_status_t serf_connection_close(
1245    serf_connection_t *conn)
1246{
1247    int i;
1248    serf_context_t *ctx = conn->ctx;
1249    apr_status_t status;
1250
1251    for (i = ctx->conns->nelts; i--; ) {
1252        serf_connection_t *conn_seq = GET_CONN(ctx, i);
1253
1254        if (conn_seq == conn) {
1255            while (conn->requests) {
1256                serf_request_cancel(conn->requests);
1257            }
1258            if (conn->skt != NULL) {
1259                remove_connection(ctx, conn);
1260                status = apr_socket_close(conn->skt);
1261                serf__log_skt(SOCK_VERBOSE, __FILE__, conn->skt,
1262                              "closed socket, status %d\n",
1263                              status);
1264                if (conn->closed != NULL) {
1265                    handle_conn_closed(conn, status);
1266                }
1267                conn->skt = NULL;
1268            }
1269            if (conn->stream != NULL) {
1270                serf_bucket_destroy(conn->stream);
1271                conn->stream = NULL;
1272            }
1273
1274            destroy_ostream(conn);
1275
1276            /* Remove the connection from the context. We don't want to
1277             * deal with it any more.
1278             */
1279            if (i < ctx->conns->nelts - 1) {
1280                /* move later connections over this one. */
1281                memmove(
1282                    &GET_CONN(ctx, i),
1283                    &GET_CONN(ctx, i + 1),
1284                    (ctx->conns->nelts - i - 1) * sizeof(serf_connection_t *));
1285            }
1286            --ctx->conns->nelts;
1287
1288            serf__log(CONN_VERBOSE, __FILE__, "closed connection 0x%x\n",
1289                      conn);
1290
1291            /* Found the connection. Closed it. All done. */
1292            return APR_SUCCESS;
1293        }
1294    }
1295
1296    /* We didn't find the specified connection. */
1297    /* ### doc talks about this w.r.t poll structures. use something else? */
1298    return APR_NOTFOUND;
1299}
1300
1301
1302void serf_connection_set_max_outstanding_requests(
1303    serf_connection_t *conn,
1304    unsigned int max_requests)
1305{
1306    if (max_requests == 0)
1307        serf__log_skt(CONN_VERBOSE, __FILE__, conn->skt,
1308                      "Set max. nr. of outstanding requests for this "
1309                      "connection to unlimited.\n");
1310    else
1311        serf__log_skt(CONN_VERBOSE, __FILE__, conn->skt,
1312                      "Limit max. nr. of outstanding requests for this "
1313                      "connection to %u.\n", max_requests);
1314
1315    conn->max_outstanding_requests = max_requests;
1316}
1317
1318
1319void serf_connection_set_async_responses(
1320    serf_connection_t *conn,
1321    serf_response_acceptor_t acceptor,
1322    void *acceptor_baton,
1323    serf_response_handler_t handler,
1324    void *handler_baton)
1325{
1326    conn->async_responses = 1;
1327    conn->async_acceptor = acceptor;
1328    conn->async_acceptor_baton = acceptor_baton;
1329    conn->async_handler = handler;
1330    conn->async_handler_baton = handler_baton;
1331}
1332
1333
1334serf_request_t *serf_connection_request_create(
1335    serf_connection_t *conn,
1336    serf_request_setup_t setup,
1337    void *setup_baton)
1338{
1339    serf_request_t *request;
1340
1341    request = serf_bucket_mem_alloc(conn->allocator, sizeof(*request));
1342    request->conn = conn;
1343    request->setup = setup;
1344    request->setup_baton = setup_baton;
1345    request->handler = NULL;
1346    request->respool = NULL;
1347    request->req_bkt = NULL;
1348    request->resp_bkt = NULL;
1349    request->priority = 0;
1350    request->written = 0;
1351    request->next = NULL;
1352
1353    /* Link the request to the end of the request chain. */
1354    link_requests(&conn->requests, &conn->requests_tail, request);
1355
1356    /* Ensure our pollset becomes writable in context run */
1357    conn->ctx->dirty_pollset = 1;
1358    conn->dirty_conn = 1;
1359
1360    return request;
1361}
1362
1363
1364serf_request_t *serf_connection_priority_request_create(
1365    serf_connection_t *conn,
1366    serf_request_setup_t setup,
1367    void *setup_baton)
1368{
1369    serf_request_t *request;
1370    serf_request_t *iter, *prev;
1371
1372    request = serf_bucket_mem_alloc(conn->allocator, sizeof(*request));
1373    request->conn = conn;
1374    request->setup = setup;
1375    request->setup_baton = setup_baton;
1376    request->handler = NULL;
1377    request->respool = NULL;
1378    request->req_bkt = NULL;
1379    request->resp_bkt = NULL;
1380    request->priority = 1;
1381    request->written = 0;
1382    request->next = NULL;
1383
1384    /* Link the new request after the last written request. */
1385    iter = conn->requests;
1386    prev = NULL;
1387
1388    /* Find a request that has data which needs to be delivered. */
1389    while (iter != NULL && iter->req_bkt == NULL && iter->written) {
1390        prev = iter;
1391        iter = iter->next;
1392    }
1393
1394    /* Advance to next non priority request */
1395    while (iter != NULL && iter->priority) {
1396        prev = iter;
1397        iter = iter->next;
1398    }
1399
1400    if (prev) {
1401        request->next = iter;
1402        prev->next = request;
1403    } else {
1404        request->next = iter;
1405        conn->requests = request;
1406    }
1407
1408    /* Ensure our pollset becomes writable in context run */
1409    conn->ctx->dirty_pollset = 1;
1410    conn->dirty_conn = 1;
1411
1412    return request;
1413}
1414
1415
1416apr_status_t serf_request_cancel(serf_request_t *request)
1417{
1418    return cancel_request(request, &request->conn->requests, 0);
1419}
1420
1421apr_status_t serf_request_is_written(serf_request_t *request)
1422{
1423    if (request->written && !request->req_bkt)
1424        return APR_SUCCESS;
1425
1426    return APR_EBUSY;
1427}
1428
1429apr_pool_t *serf_request_get_pool(const serf_request_t *request)
1430{
1431    return request->respool;
1432}
1433
1434
1435serf_bucket_alloc_t *serf_request_get_alloc(
1436    const serf_request_t *request)
1437{
1438    return request->allocator;
1439}
1440
1441
1442serf_connection_t *serf_request_get_conn(
1443    const serf_request_t *request)
1444{
1445    return request->conn;
1446}
1447
1448
1449void serf_request_set_handler(
1450    serf_request_t *request,
1451    const serf_response_handler_t handler,
1452    const void **handler_baton)
1453{
1454    request->handler = handler;
1455    request->handler_baton = handler_baton;
1456}
1457
1458
1459serf_bucket_t *serf_request_bucket_request_create(
1460    serf_request_t *request,
1461    const char *method,
1462    const char *uri,
1463    serf_bucket_t *body,
1464    serf_bucket_alloc_t *allocator)
1465{
1466    serf_bucket_t *req_bkt, *hdrs_bkt;
1467    serf_connection_t *conn = request->conn;
1468    serf_context_t *ctx = conn->ctx;
1469
1470    req_bkt = serf_bucket_request_create(method, uri, body, allocator);
1471    hdrs_bkt = serf_bucket_request_get_headers(req_bkt);
1472
1473    /* Proxy? */
1474    if (ctx->proxy_address && conn->host_url)
1475        serf_bucket_request_set_root(req_bkt, conn->host_url);
1476
1477    if (conn->host_info.hostinfo)
1478        serf_bucket_headers_setn(hdrs_bkt, "Host",
1479                                 conn->host_info.hostinfo);
1480
1481    /* Setup server authorization headers */
1482    if (ctx->authn_info.scheme)
1483        ctx->authn_info.scheme->setup_request_func(HOST, 0, conn, request,
1484                                                   method, uri,
1485                                                   hdrs_bkt);
1486
1487    /* Setup proxy authorization headers */
1488    if (ctx->proxy_authn_info.scheme)
1489        ctx->proxy_authn_info.scheme->setup_request_func(PROXY, 0, conn,
1490                                                         request,
1491                                                         method, uri, hdrs_bkt);
1492
1493    return req_bkt;
1494}
1495
1496apr_interval_time_t serf_connection_get_latency(serf_connection_t *conn)
1497{
1498    if (conn->ctx->proxy_address) {
1499        /* Detecting network latency for proxied connection is not implemented
1500           yet. */
1501        return -1;
1502    }
1503
1504    return conn->latency;
1505}
1506