outgoing.c revision 262324
1/* Copyright 2002-2004 Justin Erenkrantz and Greg Stein
2 *
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16#include <apr_pools.h>
17#include <apr_poll.h>
18#include <apr_version.h>
19#include <apr_portable.h>
20
21#include "serf.h"
22#include "serf_bucket_util.h"
23
24#include "serf_private.h"
25
26/* cleanup for sockets */
27static apr_status_t clean_skt(void *data)
28{
29    serf_connection_t *conn = data;
30    apr_status_t status = APR_SUCCESS;
31
32    if (conn->skt) {
33        serf__log_skt(SOCK_VERBOSE, __FILE__, conn->skt, "cleanup - ");
34        status = apr_socket_close(conn->skt);
35        conn->skt = NULL;
36        serf__log_nopref(SOCK_VERBOSE, "closed socket, status %d\n", status);
37    }
38
39    return status;
40}
41
42static apr_status_t clean_resp(void *data)
43{
44    serf_request_t *request = data;
45
46    /* The request's RESPOOL is being cleared.  */
47
48    /* If the response has allocated some buckets, then destroy them (since
49       the bucket may hold resources other than memory in RESPOOL). Also
50       make sure to set their fields to NULL so connection closure does
51       not attempt to free them again.  */
52    if (request->resp_bkt) {
53        serf_bucket_destroy(request->resp_bkt);
54        request->resp_bkt = NULL;
55    }
56    if (request->req_bkt) {
57        serf_bucket_destroy(request->req_bkt);
58        request->req_bkt = NULL;
59    }
60
61    /* ### should we worry about debug stuff, like that performed in
62       ### destroy_request()? should we worry about calling req->handler
63       ### to notify this "cancellation" due to pool clearing?  */
64
65    /* This pool just got cleared/destroyed. Don't try to destroy the pool
66       (again) when the request is canceled.  */
67    request->respool = NULL;
68
69    return APR_SUCCESS;
70}
71
72/* cleanup for conns */
73static apr_status_t clean_conn(void *data)
74{
75    serf_connection_t *conn = data;
76
77    serf__log(CONN_VERBOSE, __FILE__, "cleaning up connection 0x%x\n",
78              conn);
79    serf_connection_close(conn);
80
81    return APR_SUCCESS;
82}
83
84/* Check if there is data waiting to be sent over the socket. This can happen
85   in two situations:
86   - The connection queue has at least one request with unwritten data.
87   - All requests are written and the ssl layer wrote some data while reading
88     the response. This can happen when the server triggers a renegotiation,
89     e.g. after the first and only request on that connection was received.
90   Returns 1 if data is pending on CONN, 0 if not.
91   If NEXT_REQ is not NULL, it will be filled in with the next available request
92   with unwritten data. */
93static int
94request_or_data_pending(serf_request_t **next_req, serf_connection_t *conn)
95{
96    serf_request_t *request = conn->requests;
97
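
    /* Skip requests whose bodies have already been handed off in full to the
       output stream (writing started and the request bucket consumed). */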
98    while (request != NULL && request->req_bkt == NULL &&
99           request->writing_started)
100        request = request->next;
101
102    if (next_req)
103        *next_req = request;
104
105    if (request != NULL) {
106        return 1;
107    } else if (conn->ostream_head) {
108        const char *dummy;
109        apr_size_t len;
110        apr_status_t status;
111
112        status = serf_bucket_peek(conn->ostream_head, &dummy,
113                                  &len);
114        if (!SERF_BUCKET_READ_ERROR(status) && len) {
115            serf__log_skt(CONN_VERBOSE, __FILE__, conn->skt,
116                          "All requests written but still data pending.\n");
117            return 1;
118        }
119    }
120
121    return 0;
122}
123
124/* Update the pollset for this connection. We tweak the pollset based on
125 * whether we want to read and/or write, given conditions within the
126 * connection. If the connection is not (yet) in the pollset, then it
127 * will be added.
128 */
129apr_status_t serf__conn_update_pollset(serf_connection_t *conn)
130{
131    serf_context_t *ctx = conn->ctx;
132    apr_status_t status;
133    apr_pollfd_t desc = { 0 };
134
135    if (!conn->skt) {
136        return APR_SUCCESS;
137    }
138
139    /* Remove the socket from the poll set. */
140    desc.desc_type = APR_POLL_SOCKET;
141    desc.desc.s = conn->skt;
142    desc.reqevents = conn->reqevents;
143
144    status = ctx->pollset_rm(ctx->pollset_baton,
145                             &desc, conn);
146    if (status && !APR_STATUS_IS_NOTFOUND(status))
147        return status;
148
149    /* Now put it back in with the correct read/write values. */
150    desc.reqevents = APR_POLLHUP | APR_POLLERR;
151    if (conn->requests &&
152        conn->state != SERF_CONN_INIT) {
153        /* If there are any outstanding requests, then we want to read. */
154        /* ### not true. we only want to read IF we have sent some data */
155        desc.reqevents |= APR_POLLIN;
156
157        /* Don't write if OpenSSL told us that it needs to read data first. */
158        if (conn->stop_writing != 1) {
159
160            /* If the connection is not closing down and
161             *   has unwritten data or
162             *   there are any requests that still have buckets to write out,
163             *     then we want to write.
164             */
165            if (conn->vec_len &&
166                conn->state != SERF_CONN_CLOSING)
167                desc.reqevents |= APR_POLLOUT;
168            else {
169
170                if ((conn->probable_keepalive_limit &&
171                     conn->completed_requests > conn->probable_keepalive_limit) ||
172                    (conn->max_outstanding_requests &&
173                     conn->completed_requests - conn->completed_responses >=
174                     conn->max_outstanding_requests)) {
175                        /* we wouldn't try to write anyway right now. */
176                }
177                else if (request_or_data_pending(NULL, conn)) {
178                    desc.reqevents |= APR_POLLOUT;
179                }
180            }
181        }
182    }
183
184    /* If we can have async responses, always look for something to read. */
185    if (conn->async_responses) {
186        desc.reqevents |= APR_POLLIN;
187    }
188
189    /* save our reqevents, so we can pass it in to remove later. */
190    conn->reqevents = desc.reqevents;
191
192    /* Note: even if we don't want to read/write this socket, we still
193     * want to poll it for hangups and errors.
194     */
195    return ctx->pollset_add(ctx->pollset_baton,
196                            &desc, &conn->baton);
197}
198
199#ifdef SERF_DEBUG_BUCKET_USE
200
201/* Make sure all response buckets were drained. */
202static void check_buckets_drained(serf_connection_t *conn)
203{
204    serf_request_t *request = conn->requests;
205
206    for ( ; request ; request = request->next ) {
207        if (request->resp_bkt != NULL) {
208            /* ### crap. can't do this. this allocator may have un-drained
209             * ### REQUEST buckets.
210             */
211            /* serf_debug__entered_loop(request->resp_bkt->allocator); */
212            /* ### for now, pretend we closed the conn (resets the tracking) */
213            serf_debug__closed_conn(request->resp_bkt->allocator);
214        }
215    }
216}
217
218#endif
219
220static void destroy_ostream(serf_connection_t *conn)
221{
222    if (conn->ostream_head != NULL) {
223        serf_bucket_destroy(conn->ostream_head);
224        conn->ostream_head = NULL;
225        conn->ostream_tail = NULL;
226    }
227}
228
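/* Hold-open callback for the output-stream tail bucket: when the aggregate
   runs dry, remember that all pending request data was consumed and return
   EAGAIN so the stream stays open for data appended later. */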
229static apr_status_t detect_eof(void *baton, serf_bucket_t *aggregate_bucket)
230{
231    serf_connection_t *conn = baton;
232    conn->hit_eof = 1;
233    return APR_EAGAIN;
234}
235
236static apr_status_t do_conn_setup(serf_connection_t *conn)
237{
238    apr_status_t status;
239    serf_bucket_t *ostream;
240
241    if (conn->ostream_head == NULL) {
242        conn->ostream_head = serf_bucket_aggregate_create(conn->allocator);
243    }
244
245    if (conn->ostream_tail == NULL) {
246        conn->ostream_tail = serf__bucket_stream_create(conn->allocator,
247                                                        detect_eof,
248                                                        conn);
249    }
250
251    ostream = conn->ostream_tail;
252
253    status = (*conn->setup)(conn->skt,
254                            &conn->stream,
255                            &ostream,
256                            conn->setup_baton,
257                            conn->pool);
258    if (status) {
259        /* extra destroy here since it wasn't added to the head bucket yet. */
260        serf_bucket_destroy(conn->ostream_tail);
261        destroy_ostream(conn);
262        return status;
263    }
264
265    serf_bucket_aggregate_append(conn->ostream_head,
266                                 ostream);
267
268    return status;
269}
270
271/* Set up the input and output stream buckets.
272 When a tunnel over an http proxy is needed, create a socket bucket and an
273 empty aggregate bucket for sending and receiving unencrypted requests
274 over the socket.
275
276 After the tunnel is there, or no tunnel was needed, ask the application
277 to create the input and output buckets, which should take care of the
278 [en/de]cryption.
279 */
280
281static apr_status_t prepare_conn_streams(serf_connection_t *conn,
282                                         serf_bucket_t **istream,
283                                         serf_bucket_t **ostreamt,
284                                         serf_bucket_t **ostreamh)
285{
286    apr_status_t status;
287
288    if (conn->stream == NULL) {
289        conn->latency = apr_time_now() - conn->connect_time;
290    }
291
292    /* Do we need a SSL tunnel first? */
293    if (conn->state == SERF_CONN_CONNECTED) {
294        /* If the connection does not have an associated bucket, then
295         * call the setup callback to get one.
296         */
297        if (conn->stream == NULL) {
298            status = do_conn_setup(conn);
299            if (status) {
300                return status;
301            }
302        }
303        *ostreamt = conn->ostream_tail;
304        *ostreamh = conn->ostream_head;
305        *istream = conn->stream;
306    } else {
307        /* SSL tunnel needed and not set up yet, get a direct unencrypted
308         stream for this socket */
309        if (conn->stream == NULL) {
310            *istream = serf_bucket_socket_create(conn->skt,
311                                                 conn->allocator);
312        }
313        /* Don't create the ostream bucket chain including the ssl_encrypt
314         bucket yet. This ensures the CONNECT request is sent unencrypted
315         to the proxy. */
316        *ostreamt = *ostreamh = conn->ssltunnel_ostream;
317    }
318
319    return APR_SUCCESS;
320}
321
322/* Create and connect sockets for any connections which don't have them
323 * yet. This is the core of our lazy-connect behavior.
324 */
325apr_status_t serf__open_connections(serf_context_t *ctx)
326{
327    int i;
328
329    for (i = ctx->conns->nelts; i--; ) {
330        serf_connection_t *conn = GET_CONN(ctx, i);
331        serf__authn_info_t *authn_info;
332        apr_status_t status;
333        apr_socket_t *skt;
334
335        conn->seen_in_pollset = 0;
336
337        if (conn->skt != NULL) {
338#ifdef SERF_DEBUG_BUCKET_USE
339            check_buckets_drained(conn);
340#endif
341            continue;
342        }
343
344        /* Delay opening until we have something to deliver! */
345        if (conn->requests == NULL) {
346            continue;
347        }
348
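        /* (Re)create the socket in the connection's socket subpool, and make
           sure it is closed again when that pool is cleared or destroyed. */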
349        apr_pool_clear(conn->skt_pool);
350        apr_pool_cleanup_register(conn->skt_pool, conn, clean_skt, clean_skt);
351
352        status = apr_socket_create(&skt, conn->address->family,
353                                   SOCK_STREAM,
354#if APR_MAJOR_VERSION > 0
355                                   APR_PROTO_TCP,
356#endif
357                                   conn->skt_pool);
358        serf__log(SOCK_VERBOSE, __FILE__,
359                  "created socket for conn 0x%x, status %d\n", conn, status);
360        if (status != APR_SUCCESS)
361            return status;
362
363        /* Set the socket to be non-blocking */
364        if ((status = apr_socket_timeout_set(skt, 0)) != APR_SUCCESS)
365            return status;
366
367        /* Disable Nagle's algorithm */
368        if ((status = apr_socket_opt_set(skt,
369                                         APR_TCP_NODELAY, 1)) != APR_SUCCESS)
370            return status;
371
372        /* Configured. Store it into the connection now. */
373        conn->skt = skt;
374
375        /* Remember the time when we started connecting to the server, so we
376           can calculate the network latency. */
377        conn->connect_time = apr_time_now();
378
379        /* Now that the socket is set up, let's connect it. This should
380         * return immediately.
381         */
382        status = apr_socket_connect(skt, conn->address);
383        serf__log_skt(SOCK_VERBOSE, __FILE__, skt,
384                      "connected socket for conn 0x%x, status %d\n",
385                      conn, status);
386        if (status != APR_SUCCESS) {
387            if (!APR_STATUS_IS_EINPROGRESS(status))
388                return status;
389        }
390
391        /* Flag our pollset as dirty now that we have a new socket. */
392        conn->dirty_conn = 1;
393        ctx->dirty_pollset = 1;
394
395        /* If the authentication was already started on another connection,
396           prepare this connection (it might be possible to skip some
397           part of the handshaking). */
398        if (ctx->proxy_address) {
399            authn_info = &ctx->proxy_authn_info;
400            if (authn_info->scheme) {
401                authn_info->scheme->init_conn_func(authn_info->scheme, 407,
402                                                   conn, conn->pool);
403            }
404        }
405
406        authn_info = serf__get_authn_info_for_server(conn);
407        if (authn_info->scheme) {
408            authn_info->scheme->init_conn_func(authn_info->scheme, 401,
409                                               conn, conn->pool);
410        }
411
412        /* Does this connection require a SSL tunnel over the proxy? */
413        if (ctx->proxy_address && strcmp(conn->host_info.scheme, "https") == 0)
414            serf__ssltunnel_connect(conn);
415        else {
416            serf_bucket_t *dummy1, *dummy2;
417
418            conn->state = SERF_CONN_CONNECTED;
419
420            status = prepare_conn_streams(conn, &conn->stream,
421                                          &dummy1, &dummy2);
422            if (status) {
423                return status;
424            }
425        }
426    }
427
428    return APR_SUCCESS;
429}
430
431static apr_status_t no_more_writes(serf_connection_t *conn)
432{
433    /* Note that we should hold new requests until we open our new socket. */
434    conn->state = SERF_CONN_CLOSING;
435    serf__log_skt(CONN_VERBOSE, __FILE__, conn->skt,
436                  "stop writing on conn 0x%x\n", conn);
437
438    /* Clear our iovec. */
439    conn->vec_len = 0;
440
441    /* Update the pollset to know we don't want to write on this socket any
442     * more.
443     */
444    conn->dirty_conn = 1;
445    conn->ctx->dirty_pollset = 1;
446    return APR_SUCCESS;
447}
448
449/* Read the 'Connection' header from the response. Return SERF_ERROR_CLOSING if
450 * the header contains value 'close' indicating the server is closing the
451 * connection right after this response.
452 * Otherwise returns APR_SUCCESS.
453 */
454static apr_status_t is_conn_closing(serf_bucket_t *response)
455{
456    serf_bucket_t *hdrs;
457    const char *val;
458
459    hdrs = serf_bucket_response_get_headers(response);
460    val = serf_bucket_headers_get(hdrs, "Connection");
461    if (val && strcasecmp("close", val) == 0)
462        {
463            return SERF_ERROR_CLOSING;
464        }
465
466    return APR_SUCCESS;
467}
468
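/* Append REQUEST to the singly-linked request queue given by LIST/TAIL. */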
469static void link_requests(serf_request_t **list, serf_request_t **tail,
470                          serf_request_t *request)
471{
472    if (*list == NULL) {
473        *list = request;
474        *tail = request;
475    }
476    else {
477        (*tail)->next = request;
478        *tail = request;
479    }
480}
481
482static apr_status_t destroy_request(serf_request_t *request)
483{
484    serf_connection_t *conn = request->conn;
485
486    /* The request and response buckets are no longer needed,
487       nor is the request's pool.  */
488    if (request->resp_bkt) {
489        serf_debug__closed_conn(request->resp_bkt->allocator);
490        serf_bucket_destroy(request->resp_bkt);
491        request->resp_bkt = NULL;
492    }
493    if (request->req_bkt) {
494        serf_debug__closed_conn(request->req_bkt->allocator);
495        serf_bucket_destroy(request->req_bkt);
496        request->req_bkt = NULL;
497    }
498
499    serf_debug__bucket_alloc_check(request->allocator);
500    if (request->respool) {
501        /* ### unregister the pool cleanup for self?  */
502        apr_pool_destroy(request->respool);
503    }
504
505    serf_bucket_mem_free(conn->allocator, request);
506
507    return APR_SUCCESS;
508}
509
510static apr_status_t cancel_request(serf_request_t *request,
511                                   serf_request_t **list,
512                                   int notify_request)
513{
514    /* If we haven't run setup, then we won't have a handler to call. */
515    if (request->handler && notify_request) {
516        /* We actually don't care what the handler returns.
517         * We have bigger matters at hand.
518         */
519        (*request->handler)(request, NULL, request->handler_baton,
520                            request->respool);
521    }
522
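    /* Unlink the request from its queue before destroying it. */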
523    if (*list == request) {
524        *list = request->next;
525    }
526    else {
527        serf_request_t *scan = *list;
528
529        while (scan->next && scan->next != request)
530            scan = scan->next;
531
532        if (scan->next) {
533            scan->next = scan->next->next;
534        }
535    }
536
537    return destroy_request(request);
538}
539
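/* Remove CONN's socket from the context's pollset. */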
540static apr_status_t remove_connection(serf_context_t *ctx,
541                                      serf_connection_t *conn)
542{
543    apr_pollfd_t desc = { 0 };
544
545    desc.desc_type = APR_POLL_SOCKET;
546    desc.desc.s = conn->skt;
547    desc.reqevents = conn->reqevents;
548
549    return ctx->pollset_rm(ctx->pollset_baton,
550                           &desc, conn);
551}
552
553/* A socket was closed, inform the application. */
554static void handle_conn_closed(serf_connection_t *conn, apr_status_t status)
555{
556    (*conn->closed)(conn, conn->closed_baton, status,
557                    conn->pool);
558}
559
560static apr_status_t reset_connection(serf_connection_t *conn,
561                                     int requeue_requests)
562{
563    serf_context_t *ctx = conn->ctx;
564    apr_status_t status;
565    serf_request_t *old_reqs;
566
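    /* Use the number of responses completed on the old socket as an estimate
       of the server's keep-alive limit. */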
567    conn->probable_keepalive_limit = conn->completed_responses;
568    conn->completed_requests = 0;
569    conn->completed_responses = 0;
570
571    old_reqs = conn->requests;
572
573    conn->requests = NULL;
574    conn->requests_tail = NULL;
575
576    /* Handle all outstanding requests. These have either not been written yet,
577       or have been written but the expected reply wasn't received yet. */
578    while (old_reqs) {
579        /* If we haven't started to write the request, bring it over
580         * unchanged to our new socket.
581         * Do not copy a CONNECT request to the new connection, the ssl tunnel
582         * setup code will create a new CONNECT request already.
583         */
584        if (requeue_requests && !old_reqs->writing_started &&
585            !old_reqs->ssltunnel) {
586
587            serf_request_t *req = old_reqs;
588            old_reqs = old_reqs->next;
589            req->next = NULL;
590            link_requests(&conn->requests, &conn->requests_tail, req);
591        }
592        else {
593            /* Request has been consumed, or we don't want to requeue the
594               request. Either way, inform the application that the request
595               is cancelled. */
596            cancel_request(old_reqs, &old_reqs, requeue_requests);
597        }
598    }
599
600    /* The request queue has been prepared for a new socket, close the old one. */
601    if (conn->skt != NULL) {
602        remove_connection(ctx, conn);
603        status = apr_socket_close(conn->skt);
604        serf__log_skt(SOCK_VERBOSE, __FILE__, conn->skt,
605                      "closed socket, status %d\n", status);
606        if (conn->closed != NULL) {
607            handle_conn_closed(conn, status);
608        }
609        conn->skt = NULL;
610    }
611
612    if (conn->stream != NULL) {
613        serf_bucket_destroy(conn->stream);
614        conn->stream = NULL;
615    }
616
617    destroy_ostream(conn);
618
619    /* Don't try to resume any writes */
620    conn->vec_len = 0;
621
622    conn->dirty_conn = 1;
623    conn->ctx->dirty_pollset = 1;
624    conn->state = SERF_CONN_INIT;
625
626    serf__log(CONN_VERBOSE, __FILE__, "reset connection 0x%x\n", conn);
627
628    conn->status = APR_SUCCESS;
629
630    /* Let our context know that we've 'reset' the socket already. */
631    conn->seen_in_pollset |= APR_POLLHUP;
632
633    /* Connection has been reset. All done. */
634    return APR_SUCCESS;
635}
636
637static apr_status_t socket_writev(serf_connection_t *conn)
638{
639    apr_size_t written;
640    apr_status_t status;
641
642    status = apr_socket_sendv(conn->skt, conn->vec,
643                              conn->vec_len, &written);
644    if (status && !APR_STATUS_IS_EAGAIN(status))
645        serf__log_skt(SOCK_VERBOSE, __FILE__, conn->skt,
646                      "socket_sendv error %d\n", status);
647
648    /* did we write everything? */
649    if (written) {
650        apr_size_t len = 0;
651        int i;
652
653        serf__log_skt(SOCK_MSG_VERBOSE, __FILE__, conn->skt,
654                      "--- socket_sendv:\n");
655
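        /* Walk the iovec array: log what was written and, on a partial write,
           shift the unwritten tail to the front of conn->vec so a later call
           can resume where this one stopped. */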
656        for (i = 0; i < conn->vec_len; i++) {
657            len += conn->vec[i].iov_len;
658            if (written < len) {
659                serf__log_nopref(SOCK_MSG_VERBOSE, "%.*s",
660                                   conn->vec[i].iov_len - (len - written),
661                                   conn->vec[i].iov_base);
662                if (i) {
663                    memmove(conn->vec, &conn->vec[i],
664                            sizeof(struct iovec) * (conn->vec_len - i));
665                    conn->vec_len -= i;
666                }
667                conn->vec[0].iov_base = (char *)conn->vec[0].iov_base + (conn->vec[0].iov_len - (len - written));
668                conn->vec[0].iov_len = len - written;
669                break;
670            } else {
671                serf__log_nopref(SOCK_MSG_VERBOSE, "%.*s",
672                                   conn->vec[i].iov_len, conn->vec[i].iov_base);
673            }
674        }
675        if (len == written) {
676            conn->vec_len = 0;
677        }
678        serf__log_nopref(SOCK_MSG_VERBOSE, "-(%d)-\n", written);
679
680        /* Log progress information */
681        serf__context_progress_delta(conn->ctx, 0, written);
682    }
683
684    return status;
685}
686
687static apr_status_t setup_request(serf_request_t *request)
688{
689    serf_connection_t *conn = request->conn;
690    apr_status_t status;
691
692    /* Now that we are about to serve the request, allocate a pool. */
693    apr_pool_create(&request->respool, conn->pool);
694    request->allocator = serf_bucket_allocator_create(request->respool,
695                                                      NULL, NULL);
696    apr_pool_cleanup_register(request->respool, request,
697                              clean_resp, clean_resp);
698
699    /* Fill in the rest of the values for the request. */
700    status = request->setup(request, request->setup_baton,
701                            &request->req_bkt,
702                            &request->acceptor,
703                            &request->acceptor_baton,
704                            &request->handler,
705                            &request->handler_baton,
706                            request->respool);
707    return status;
708}
709
710/* write data out to the connection */
711static apr_status_t write_to_connection(serf_connection_t *conn)
712{
713    if (conn->probable_keepalive_limit &&
714        conn->completed_requests > conn->probable_keepalive_limit) {
715
716        conn->dirty_conn = 1;
717        conn->ctx->dirty_pollset = 1;
718
719        /* backoff for now. */
720        return APR_SUCCESS;
721    }
722
723    /* Keep reading and sending until we run out of stuff to read, or
724     * writing would block.
725     */
726    while (1) {
727        serf_request_t *request;
728        int stop_reading = 0;
729        apr_status_t status;
730        apr_status_t read_status;
731        serf_bucket_t *ostreamt;
732        serf_bucket_t *ostreamh;
733        int max_outstanding_requests = conn->max_outstanding_requests;
734
735        /* If we're setting up an ssl tunnel, we can't send real requests
736           yet: they need to be encrypted, but our encrypt buckets won't be
737           created until we have read the unencrypted response to the
738           CONNECT request. */
739        if (conn->state != SERF_CONN_CONNECTED)
740            max_outstanding_requests = 1;
741
742        if (max_outstanding_requests &&
743            conn->completed_requests -
744                conn->completed_responses >= max_outstanding_requests) {
745            /* backoff for now. */
746            return APR_SUCCESS;
747        }
748
749        /* If we have unwritten data, then write what we can. */
750        while (conn->vec_len) {
751            status = socket_writev(conn);
752
753            /* If the write would have blocked, then we're done. Don't try
754             * to write anything else to the socket.
755             */
756            if (APR_STATUS_IS_EAGAIN(status))
757                return APR_SUCCESS;
758            if (APR_STATUS_IS_EPIPE(status) ||
759                APR_STATUS_IS_ECONNRESET(status) ||
760                APR_STATUS_IS_ECONNABORTED(status))
761                return no_more_writes(conn);
762            if (status)
763                return status;
764        }
765        /* ### can we have a short write, yet no EAGAIN? a short write
766           ### would imply unwritten_len > 0 ... */
767        /* assert: unwritten_len == 0. */
768
769        /* We may need to move forward to a request which has something
770         * to write.
771         */
772        if (!request_or_data_pending(&request, conn)) {
773            /* No more requests (with data) are registered with the
774             * connection, and no data is pending on the outgoing stream.
775             * Let's update the pollset so that we don't try to write to this
776             * socket again.
777             */
778            conn->dirty_conn = 1;
779            conn->ctx->dirty_pollset = 1;
780            return APR_SUCCESS;
781        }
782
783        status = prepare_conn_streams(conn, &conn->stream, &ostreamt, &ostreamh);
784        if (status) {
785            return status;
786        }
787
788        if (request) {
789            if (request->req_bkt == NULL) {
790                read_status = setup_request(request);
791                if (read_status) {
792                    /* Something bad happened. Propagate any errors. */
793                    return read_status;
794                }
795            }
796
797            if (!request->writing_started) {
798                request->writing_started = 1;
799                serf_bucket_aggregate_append(ostreamt, request->req_bkt);
800            }
801        }
802
803        /* ### optimize at some point by using read_for_sendfile */
804        /* TODO: now that read_iovec will effectively try to return as much
805           data as available, we probably don't want to read ALL_AVAIL, but
806           a lower number, like the size of one or a few TCP packets, the
807           available TCP buffer size ... */
808        read_status = serf_bucket_read_iovec(ostreamh,
809                                             SERF_READ_ALL_AVAIL,
810                                             IOV_MAX,
811                                             conn->vec,
812                                             &conn->vec_len);
813
814        if (!conn->hit_eof) {
815            if (APR_STATUS_IS_EAGAIN(read_status)) {
816                /* We read some stuff, but should not try to read again. */
817                stop_reading = 1;
818            }
819            else if (read_status == SERF_ERROR_WAIT_CONN) {
820                /* The bucket told us that it can't provide more data until
821                   more data is read from the socket. This normally happens
822                   during a SSL handshake.
823
824                   We should avoid looking for writability for a while so
825                   that (hopefully) something will appear in the bucket so
826                   we can actually write something. otherwise, we could
827                   end up in a CPU spin: socket wants something, but we
828                   don't have anything (and keep returning EAGAIN)
829                 */
830                conn->stop_writing = 1;
831                conn->dirty_conn = 1;
832                conn->ctx->dirty_pollset = 1;
833            }
834            else if (read_status && !APR_STATUS_IS_EOF(read_status)) {
835                /* Something bad happened. Propagate any errors. */
836                return read_status;
837            }
838        }
839
840        /* If we got some data, then deliver it. */
841        /* ### what to do if we got no data?? is that a problem? */
842        if (conn->vec_len > 0) {
843            status = socket_writev(conn);
844
845            /* If we can't write any more, or an error occurred, then
846             * we're done here.
847             */
848            if (APR_STATUS_IS_EAGAIN(status))
849                return APR_SUCCESS;
850            if (APR_STATUS_IS_EPIPE(status))
851                return no_more_writes(conn);
852            if (APR_STATUS_IS_ECONNRESET(status) ||
853                APR_STATUS_IS_ECONNABORTED(status)) {
854                return no_more_writes(conn);
855            }
856            if (status)
857                return status;
858        }
859
860        if (read_status == SERF_ERROR_WAIT_CONN) {
861            stop_reading = 1;
862            conn->stop_writing = 1;
863            conn->dirty_conn = 1;
864            conn->ctx->dirty_pollset = 1;
865        }
866        else if (request && read_status && conn->hit_eof &&
867                 conn->vec_len == 0) {
868            /* If we hit the end of the request bucket and all of its data has
869             * been written, then clear it out to signify that we're done
870             * sending the request. On the next iteration through this loop:
871             * - if there are remaining bytes they will be written, and as the
872             * request bucket will be completely read it will be destroyed then.
873             * - we'll see if there are other requests that need to be sent
874             * ("pipelining").
875             */
876            conn->hit_eof = 0;
877            serf_bucket_destroy(request->req_bkt);
878            request->req_bkt = NULL;
879
880            /* If our connection has async responses enabled, we're not
881             * going to get a reply back, so kill the request.
882             */
883            if (conn->async_responses) {
884                conn->requests = request->next;
885                destroy_request(request);
886            }
887
888            conn->completed_requests++;
889
890            if (conn->probable_keepalive_limit &&
891                conn->completed_requests > conn->probable_keepalive_limit) {
892                /* backoff for now. */
893                stop_reading = 1;
894            }
895        }
896
897        if (stop_reading) {
898            return APR_SUCCESS;
899        }
900    }
901    /* NOTREACHED */
902}
903
904/* A response message was received from the server, so call
905   the handler as specified on the original request. */
906static apr_status_t handle_response(serf_request_t *request,
907                                    apr_pool_t *pool)
908{
909    apr_status_t status = APR_SUCCESS;
910    int consumed_response = 0;
911
912    /* Only enable the new authentication framework if the program has
913     * registered an authentication credential callback.
914     *
915     * This permits older Serf apps to still handle authentication
916     * themselves by not registering credential callbacks.
917     */
918    if (request->conn->ctx->cred_cb) {
919      status = serf__handle_auth_response(&consumed_response,
920                                          request,
921                                          request->resp_bkt,
922                                          request->handler_baton,
923                                          pool);
924
925      /* If there was an error reading the response (maybe there wasn't
926         enough data available), don't bother passing the response to the
927         application.
928
929         If the authentication was tried, but failed, pass the response
930         to the application, maybe it can do better. */
931      if (status) {
932          return status;
933      }
934    }
935
936    if (!consumed_response) {
937        return (*request->handler)(request,
938                                   request->resp_bkt,
939                                   request->handler_baton,
940                                   pool);
941    }
942
943    return status;
944}
945
946/* An async response message was received from the server. */
947static apr_status_t handle_async_response(serf_connection_t *conn,
948                                          apr_pool_t *pool)
949{
950    apr_status_t status;
951
952    if (conn->current_async_response == NULL) {
953        conn->current_async_response =
954            (*conn->async_acceptor)(NULL, conn->stream,
955                                    conn->async_acceptor_baton, pool);
956    }
957
958    status = (*conn->async_handler)(NULL, conn->current_async_response,
959                                    conn->async_handler_baton, pool);
960
961    if (APR_STATUS_IS_EOF(status)) {
962        serf_bucket_destroy(conn->current_async_response);
963        conn->current_async_response = NULL;
964        status = APR_SUCCESS;
965    }
966
967    return status;
968}
969
970
971apr_status_t
972serf__provide_credentials(serf_context_t *ctx,
973                          char **username,
974                          char **password,
975                          serf_request_t *request, void *baton,
976                          int code, const char *authn_type,
977                          const char *realm,
978                          apr_pool_t *pool)
979{
980    serf_connection_t *conn = request->conn;
981    serf_request_t *authn_req = request;
982    apr_status_t status;
983
984    if (request->ssltunnel == 1 &&
985        conn->state == SERF_CONN_SETUP_SSLTUNNEL) {
986        /* This is a CONNECT request to set up an SSL tunnel over a proxy.
987           This request is created by serf, so if the proxy requires
988           authentication, we can't ask the application for credentials with
989           this request.
990
991           Solution: set up the first request created by the application on
992           this connection, and use that request and its handler_baton to
993           call back to the application. */
994
995        authn_req = request->next;
996        /* assert: app_request != NULL */
997        if (!authn_req)
998            return APR_EGENERAL;
999
1000        if (!authn_req->req_bkt) {
1001            apr_status_t status;
1002
1003            status = setup_request(authn_req);
1004            /* If we can't setup a request, don't bother setting up the
1005               ssl tunnel. */
1006            if (status)
1007                return status;
1008        }
1009    }
1010
1011    /* Ask the application. */
1012    status = (*ctx->cred_cb)(username, password,
1013                             authn_req, authn_req->handler_baton,
1014                             code, authn_type, realm, pool);
1015    if (status)
1016        return status;
1017
1018    return APR_SUCCESS;
1019}
1020
1021/* read data from the connection */
1022static apr_status_t read_from_connection(serf_connection_t *conn)
1023{
1024    apr_status_t status;
1025    apr_pool_t *tmppool;
1026    int close_connection = FALSE;
1027
1028    /* Whatever is coming in on the socket corresponds to the first request
1029     * on our chain.
1030     */
1031    serf_request_t *request = conn->requests;
1032
1033    /* If the stop_writing flag was set on the connection, reset it now because
1034       there is some data to read. */
1035    if (conn->stop_writing) {
1036        conn->stop_writing = 0;
1037        conn->dirty_conn = 1;
1038        conn->ctx->dirty_pollset = 1;
1039    }
1040
1041    /* assert: request != NULL */
1042
1043    if ((status = apr_pool_create(&tmppool, conn->pool)) != APR_SUCCESS)
1044        goto error;
1045
1046    /* Invoke response handlers until we have no more work. */
1047    while (1) {
1048        serf_bucket_t *dummy1, *dummy2;
1049
1050        apr_pool_clear(tmppool);
1051
1052        /* Only interested in the input stream here. */
1053        status = prepare_conn_streams(conn, &conn->stream, &dummy1, &dummy2);
1054        if (status) {
1055            goto error;
1056        }
1057
1058        /* We have a different codepath when we can have async responses. */
1059        if (conn->async_responses) {
1060            /* TODO What about socket errors? */
1061            status = handle_async_response(conn, tmppool);
1062            if (APR_STATUS_IS_EAGAIN(status)) {
1063                status = APR_SUCCESS;
1064                goto error;
1065            }
1066            if (status) {
1067                goto error;
1068            }
1069            continue;
1070        }
1071
1072        /* We are reading a response for a request we haven't
1073         * written yet!
1074         *
1075         * This shouldn't normally happen EXCEPT:
1076         *
1077         * 1) when the other end has closed the socket and we're
1078         *    pending an EOF return.
1079         * 2) Doing the initial SSL handshake - we'll get EAGAIN
1080         *    as the SSL buckets will hide the handshake from us
1081         *    but not return any data.
1082         * 3) When the server sends us an SSL alert.
1083         *
1084         * In these cases, we should not receive any actual user data.
1085         *
1086         * 4) When the server sends an error response, like 408 Request timeout.
1087         *    This response should be passed to the application.
1088         *
1089         * If we see an EOF (due to either an expired timeout or the server
1090         * sending the SSL 'close notify' shutdown alert), we'll reset the
1091         * connection and open a new one.
1092         */
1093        if (request->req_bkt || !request->writing_started) {
1094            const char *data;
1095            apr_size_t len;
1096
1097            status = serf_bucket_peek(conn->stream, &data, &len);
1098
1099            if (APR_STATUS_IS_EOF(status)) {
1100                reset_connection(conn, 1);
1101                status = APR_SUCCESS;
1102                goto error;
1103            }
1104            else if (APR_STATUS_IS_EAGAIN(status) && !len) {
1105                status = APR_SUCCESS;
1106                goto error;
1107            } else if (status && !APR_STATUS_IS_EAGAIN(status)) {
1108                /* Read error */
1109                goto error;
1110            }
1111
1112            /* Unexpected response from the server */
1113
1114        }
1115
1116        /* If the request doesn't have a response bucket, then call the
1117         * acceptor to get one created.
1118         */
1119        if (request->resp_bkt == NULL) {
1120            request->resp_bkt = (*request->acceptor)(request, conn->stream,
1121                                                     request->acceptor_baton,
1122                                                     tmppool);
1123            apr_pool_clear(tmppool);
1124        }
1125
1126        status = handle_response(request, tmppool);
1127
1128        /* Some systems will not generate a HUP poll event so we have to
1129         * handle the ECONNRESET and ECONNABORTED cases here.
1130         */
1131        if (APR_STATUS_IS_ECONNRESET(status) ||
1132            APR_STATUS_IS_ECONNABORTED(status) ||
1133            status == SERF_ERROR_REQUEST_LOST) {
1134            /* If the connection had ever been good, be optimistic & try again.
1135             * If it has never delivered a response (even after a retry), fail.
1136             */
1137            if (conn->completed_responses) {
1138                reset_connection(conn, 1);
1139                status = APR_SUCCESS;
1140            }
1141            else if (status == SERF_ERROR_REQUEST_LOST) {
1142                status = SERF_ERROR_ABORTED_CONNECTION;
1143            }
1144            goto error;
1145        }
1146
1147        /* If our response handler says it can't do anything more, we now
1148         * treat that as a success.
1149         */
1150        if (APR_STATUS_IS_EAGAIN(status)) {
1151            /* It is possible that while reading the response, the ssl layer
1152               has prepared some data to send. If this was the last request,
1153               serf will not check for socket writability, so force this here.
1154             */
1155            if (request_or_data_pending(&request, conn) && !request) {
1156                conn->dirty_conn = 1;
1157                conn->ctx->dirty_pollset = 1;
1158            }
1159            status = APR_SUCCESS;
1160            goto error;
1161        }
1162
1163        /* If we received APR_SUCCESS, run this loop again. */
1164        if (!status) {
1165            continue;
1166        }
1167
1168        close_connection = is_conn_closing(request->resp_bkt);
1169
1170        if (!APR_STATUS_IS_EOF(status) &&
1171            close_connection != SERF_ERROR_CLOSING) {
1172            /* Whether success, or an error, there is no more to do unless
1173             * this request has been completed.
1174             */
1175            goto error;
1176        }
1177
1178        /* The response has been fully-read, so that means the request has
1179         * either been fully-delivered (most likely), or that we don't need to
1180         * write the rest of it anymore, e.g. when a 408 Request timeout was
1181         * received.
1182         * Remove it from our queue and loop to read another response.
1183         */
1184        conn->requests = request->next;
1185
1186        destroy_request(request);
1187
1188        request = conn->requests;
1189
1190        /* If we're truly empty, update our tail. */
1191        if (request == NULL) {
1192            conn->requests_tail = NULL;
1193        }
1194
1195        conn->completed_responses++;
1196
1197        /* We have to rebuild the pollset since completed_responses has changed. */
1198        conn->dirty_conn = 1;
1199        conn->ctx->dirty_pollset = 1;
1200
1201        /* This means that we're being advised that the connection is done. */
1202        if (close_connection == SERF_ERROR_CLOSING) {
1203            reset_connection(conn, 1);
1204            if (APR_STATUS_IS_EOF(status))
1205                status = APR_SUCCESS;
1206            goto error;
1207        }
1208
1209        /* The server is suddenly deciding to serve more responses than we've
1210         * seen before.
1211         *
1212         * Let our requests go.
1213         */
1214        if (conn->probable_keepalive_limit &&
1215            conn->completed_responses > conn->probable_keepalive_limit) {
1216            conn->probable_keepalive_limit = 0;
1217        }
1218
1219        /* If we just ran out of requests or have unwritten requests, then
1220         * update the pollset. We don't want to read from this socket any
1221         * more. We are definitely done with this loop, too.
1222         */
1223        if (request == NULL || !request->writing_started) {
1224            conn->dirty_conn = 1;
1225            conn->ctx->dirty_pollset = 1;
1226            status = APR_SUCCESS;
1227            goto error;
1228        }
1229    }
1230
1231error:
1232    apr_pool_destroy(tmppool);
1233    return status;
1234}
1235
1236/* process all events on the connection */
1237apr_status_t serf__process_connection(serf_connection_t *conn,
1238                                      apr_int16_t events)
1239{
1240    apr_status_t status;
1241
1242    /* POLLHUP/ERR should come after POLLIN so if there's an error message or
1243     * the like sitting on the connection, we give the app a chance to read
1244     * it before we trigger a reset condition.
1245     */
1246    if ((events & APR_POLLIN) != 0) {
1247        if ((status = read_from_connection(conn)) != APR_SUCCESS)
1248            return status;
1249
1250        /* If we decided to reset our connection, return now as we don't
1251         * want to write.
1252         */
1253        if ((conn->seen_in_pollset & APR_POLLHUP) != 0) {
1254            return APR_SUCCESS;
1255        }
1256    }
1257    if ((events & APR_POLLHUP) != 0) {
1258        /* The connection got reset by the server. On Windows this can happen
1259           when all data is read, so just clean up the connection and open
1260           a new one.
1261           If we haven't had any successful responses on this connection,
1262           then error out as it is likely a server issue. */
1263        if (conn->completed_responses) {
1264            return reset_connection(conn, 1);
1265        }
1266        return SERF_ERROR_ABORTED_CONNECTION;
1267    }
1268    if ((events & APR_POLLERR) != 0) {
1269        /* We might be talking to a buggy HTTP server that doesn't
1270         * do lingering-close.  (httpd < 2.1.8 does this.)
1271         *
1272         * See:
1273         *
1274         * http://issues.apache.org/bugzilla/show_bug.cgi?id=35292
1275         */
1276        if (conn->completed_requests && !conn->probable_keepalive_limit) {
1277            return reset_connection(conn, 1);
1278        }
1279#ifdef SO_ERROR
1280        /* If possible, get the error from the platform's socket layer and
1281           convert it to an APR status code. */
1282        {
1283            apr_os_sock_t osskt;
1284            if (!apr_os_sock_get(&osskt, conn->skt)) {
1285                int error;
1286                apr_socklen_t l = sizeof(error);
1287
1288                if (!getsockopt(osskt, SOL_SOCKET, SO_ERROR, (char*)&error,
1289                                &l)) {
1290                    status = APR_FROM_OS_ERROR(error);
1291
1292                    /* Handle fallback for multi-homed servers.
1293
1294                       ### Improve algorithm to find better than just 'next'?
1295
1296                       Current Windows versions already handle re-ordering for
1297                       api users by using statistics on the recently failed
1298                       connections to order the list of addresses. */
1299                    if (conn->completed_requests == 0
1300                        && conn->address->next != NULL
1301                        && (APR_STATUS_IS_ECONNREFUSED(status)
1302                            || APR_STATUS_IS_TIMEUP(status)
1303                            || APR_STATUS_IS_ENETUNREACH(status))) {
1304
1305                        conn->address = conn->address->next;
1306                        return reset_connection(conn, 1);
1307                    }
1308
1309                    return status;
1310                  }
1311            }
1312        }
1313#endif
1314        return APR_EGENERAL;
1315    }
1316    if ((events & APR_POLLOUT) != 0) {
1317        if ((status = write_to_connection(conn)) != APR_SUCCESS)
1318            return status;
1319    }
1320    return APR_SUCCESS;
1321}
1322
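/* A rough usage sketch for the connection API implemented below (the callback
   names my_conn_setup, my_conn_closed and my_request_setup are placeholders,
   not functions defined in this file; see serf.h for the exact prototypes):

       serf_context_t *ctx = serf_context_create(pool);
       serf_connection_t *conn = serf_connection_create(ctx, address,
                                                        my_conn_setup, setup_baton,
                                                        my_conn_closed, closed_baton,
                                                        pool);
       serf_connection_request_create(conn, my_request_setup, request_baton);
       while (!done)
           status = serf_context_run(ctx, SERF_DURATION_FOREVER, pool);
 */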
1323serf_connection_t *serf_connection_create(
1324    serf_context_t *ctx,
1325    apr_sockaddr_t *address,
1326    serf_connection_setup_t setup,
1327    void *setup_baton,
1328    serf_connection_closed_t closed,
1329    void *closed_baton,
1330    apr_pool_t *pool)
1331{
1332    serf_connection_t *conn = apr_pcalloc(pool, sizeof(*conn));
1333
1334    conn->ctx = ctx;
1335    conn->status = APR_SUCCESS;
1336    /* Ignore server address if proxy was specified. */
1337    conn->address = ctx->proxy_address ? ctx->proxy_address : address;
1338    conn->setup = setup;
1339    conn->setup_baton = setup_baton;
1340    conn->closed = closed;
1341    conn->closed_baton = closed_baton;
1342    conn->pool = pool;
1343    conn->allocator = serf_bucket_allocator_create(pool, NULL, NULL);
1344    conn->stream = NULL;
1345    conn->ostream_head = NULL;
1346    conn->ostream_tail = NULL;
1347    conn->baton.type = SERF_IO_CONN;
1348    conn->baton.u.conn = conn;
1349    conn->hit_eof = 0;
1350    conn->state = SERF_CONN_INIT;
1351    conn->latency = -1; /* unknown */
1352
1353    /* Create a subpool for our connection. */
1354    apr_pool_create(&conn->skt_pool, conn->pool);
1355
1356    /* register a cleanup */
1357    apr_pool_cleanup_register(conn->pool, conn, clean_conn,
1358                              apr_pool_cleanup_null);
1359
1360    /* Add the connection to the context. */
1361    *(serf_connection_t **)apr_array_push(ctx->conns) = conn;
1362
1363    serf__log(CONN_VERBOSE, __FILE__, "created connection 0x%x\n",
1364              conn);
1365
1366    return conn;
1367}
1368
1369apr_status_t serf_connection_create2(
1370    serf_connection_t **conn,
1371    serf_context_t *ctx,
1372    apr_uri_t host_info,
1373    serf_connection_setup_t setup,
1374    void *setup_baton,
1375    serf_connection_closed_t closed,
1376    void *closed_baton,
1377    apr_pool_t *pool)
1378{
1379    apr_status_t status = APR_SUCCESS;
1380    serf_connection_t *c;
1381    apr_sockaddr_t *host_address = NULL;
1382
1383    /* Set the port number explicitly, needed to create the socket later. */
1384    if (!host_info.port) {
1385        host_info.port = apr_uri_port_of_scheme(host_info.scheme);
1386    }
1387
1388    /* Only lookup the address of the server if no proxy server was
1389       configured. */
1390    if (!ctx->proxy_address) {
1391        status = apr_sockaddr_info_get(&host_address,
1392                                       host_info.hostname,
1393                                       APR_UNSPEC, host_info.port, 0, pool);
1394        if (status)
1395            return status;
1396    }
1397
1398    c = serf_connection_create(ctx, host_address, setup, setup_baton,
1399                               closed, closed_baton, pool);
1400
1401    /* We're not interested in the path following the hostname. */
1402    c->host_url = apr_uri_unparse(c->pool,
1403                                  &host_info,
1404                                  APR_URI_UNP_OMITPATHINFO |
1405                                  APR_URI_UNP_OMITUSERINFO);
1406
1407    /* Store the host info without the path on the connection. */
1408    (void)apr_uri_parse(c->pool, c->host_url, &(c->host_info));
1409    if (!c->host_info.port) {
1410        c->host_info.port = apr_uri_port_of_scheme(c->host_info.scheme);
1411    }
1412
1413    *conn = c;
1414
1415    return status;
1416}
1417
1418apr_status_t serf_connection_reset(
1419    serf_connection_t *conn)
1420{
1421    return reset_connection(conn, 0);
1422}
1423
1424
1425apr_status_t serf_connection_close(
1426    serf_connection_t *conn)
1427{
1428    int i;
1429    serf_context_t *ctx = conn->ctx;
1430    apr_status_t status;
1431
1432    for (i = ctx->conns->nelts; i--; ) {
1433        serf_connection_t *conn_seq = GET_CONN(ctx, i);
1434
1435        if (conn_seq == conn) {
1436            while (conn->requests) {
1437                serf_request_cancel(conn->requests);
1438            }
1439            if (conn->skt != NULL) {
1440                remove_connection(ctx, conn);
1441                status = apr_socket_close(conn->skt);
1442                serf__log_skt(SOCK_VERBOSE, __FILE__, conn->skt,
1443                              "closed socket, status %d\n",
1444                              status);
1445                if (conn->closed != NULL) {
1446                    handle_conn_closed(conn, status);
1447                }
1448                conn->skt = NULL;
1449            }
1450            if (conn->stream != NULL) {
1451                serf_bucket_destroy(conn->stream);
1452                conn->stream = NULL;
1453            }
1454
1455            destroy_ostream(conn);
1456
1457            /* Remove the connection from the context. We don't want to
1458             * deal with it any more.
1459             */
1460            if (i < ctx->conns->nelts - 1) {
1461                /* move later connections over this one. */
1462                memmove(
1463                    &GET_CONN(ctx, i),
1464                    &GET_CONN(ctx, i + 1),
1465                    (ctx->conns->nelts - i - 1) * sizeof(serf_connection_t *));
1466            }
1467            --ctx->conns->nelts;
1468
1469            serf__log(CONN_VERBOSE, __FILE__, "closed connection 0x%x\n",
1470                      conn);
1471
1472            /* Found the connection. Closed it. All done. */
1473            return APR_SUCCESS;
1474        }
1475    }
1476
1477    /* We didn't find the specified connection. */
1478    /* ### doc talks about this w.r.t poll structures. use something else? */
1479    return APR_NOTFOUND;
1480}
1481
1482
1483void serf_connection_set_max_outstanding_requests(
1484    serf_connection_t *conn,
1485    unsigned int max_requests)
1486{
1487    if (max_requests == 0)
1488        serf__log_skt(CONN_VERBOSE, __FILE__, conn->skt,
1489                      "Set max. nr. of outstanding requests for this "
1490                      "connection to unlimited.\n");
1491    else
1492        serf__log_skt(CONN_VERBOSE, __FILE__, conn->skt,
1493                      "Limit max. nr. of outstanding requests for this "
1494                      "connection to %u.\n", max_requests);
1495
1496    conn->max_outstanding_requests = max_requests;
1497}
1498
1499
1500void serf_connection_set_async_responses(
1501    serf_connection_t *conn,
1502    serf_response_acceptor_t acceptor,
1503    void *acceptor_baton,
1504    serf_response_handler_t handler,
1505    void *handler_baton)
1506{
1507    conn->async_responses = 1;
1508    conn->async_acceptor = acceptor;
1509    conn->async_acceptor_baton = acceptor_baton;
1510    conn->async_handler = handler;
1511    conn->async_handler_baton = handler_baton;
1512}
1513
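/* Allocate and initialize a request object for CONN. The caller is
   responsible for linking it into the connection's request queue. */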
1514static serf_request_t *
1515create_request(serf_connection_t *conn,
1516               serf_request_setup_t setup,
1517               void *setup_baton,
1518               int priority,
1519               int ssltunnel)
1520{
1521    serf_request_t *request;
1522
1523    request = serf_bucket_mem_alloc(conn->allocator, sizeof(*request));
1524    request->conn = conn;
1525    request->setup = setup;
1526    request->setup_baton = setup_baton;
1527    request->handler = NULL;
1528    request->respool = NULL;
1529    request->req_bkt = NULL;
1530    request->resp_bkt = NULL;
1531    request->priority = priority;
1532    request->writing_started = 0;
1533    request->ssltunnel = ssltunnel;
1534    request->next = NULL;
1535    request->auth_baton = NULL;
1536
1537    return request;
1538}
1539
1540serf_request_t *serf_connection_request_create(
1541    serf_connection_t *conn,
1542    serf_request_setup_t setup,
1543    void *setup_baton)
1544{
1545    serf_request_t *request;
1546
1547    request = create_request(conn, setup, setup_baton,
1548                             0, /* priority */
1549                             0  /* ssl tunnel */);
1550
1551    /* Link the request to the end of the request chain. */
1552    link_requests(&conn->requests, &conn->requests_tail, request);
1553
1554    /* Ensure our pollset becomes writable in context run */
1555    conn->ctx->dirty_pollset = 1;
1556    conn->dirty_conn = 1;
1557
1558    return request;
1559}
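/* Illustrative usage sketch (editorial example, not part of the original
   source). The callback matches serf_request_setup_t as declared in serf.h
   and is invoked when the request is about to be delivered; my_acceptor and
   my_handler are hypothetical response callbacks supplied by the caller:

       static apr_status_t setup_cb(serf_request_t *request,
                                    void *setup_baton,
                                    serf_bucket_t **req_bkt,
                                    serf_response_acceptor_t *acceptor,
                                    void **acceptor_baton,
                                    serf_response_handler_t *handler,
                                    void **handler_baton,
                                    apr_pool_t *pool)
       {
           *req_bkt = serf_request_bucket_request_create(
               request, "GET", "/", NULL, serf_request_get_alloc(request));
           *acceptor = my_acceptor;
           *acceptor_baton = NULL;
           *handler = my_handler;
           *handler_baton = NULL;
           return APR_SUCCESS;
       }

       serf_request_t *req =
           serf_connection_request_create(conn, setup_cb, NULL);
*/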
1560
1561static serf_request_t *
1562priority_request_create(serf_connection_t *conn,
1563                        int ssltunnelreq,
1564                        serf_request_setup_t setup,
1565                        void *setup_baton)
1566{
1567    serf_request_t *request;
1568    serf_request_t *iter, *prev;
1569
1570    request = create_request(conn, setup, setup_baton,
1571                             1, /* priority */
1572                             ssltunnelreq);
1573
1574    /* Link the new request after the last written request. */
1575    iter = conn->requests;
1576    prev = NULL;
1577
1578    /* Find a request that has data which needs to be delivered. */
1579    while (iter != NULL && iter->req_bkt == NULL && iter->writing_started) {
1580        prev = iter;
1581        iter = iter->next;
1582    }
1583
1584    /* A CONNECT request to set up an SSL tunnel has absolute priority over all
1585       other requests on the connection, so:
1586       a. add it first to the queue
1587       b. ensure that other priority requests are added after the CONNECT
1588          request */
1589    if (!request->ssltunnel) {
1590        /* Advance to the next non-priority request. */
1591        while (iter != NULL && iter->priority) {
1592            prev = iter;
1593            iter = iter->next;
1594        }
1595    }
1596
1597    if (prev) {
1598        request->next = iter;
1599        prev->next = request;
1600    } else {
1601        request->next = iter;
1602        conn->requests = request;
1603    }
1604
1605    /* Ensure our pollset becomes writable in context run */
1606    conn->ctx->dirty_pollset = 1;
1607    conn->dirty_conn = 1;
1608
1609    return request;
1610}
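/* Illustrative ordering sketch for priority_request_create() (editorial
   example with a hypothetical queue; W = fully written request, P = earlier
   priority request, R1/R2 = regular requests):

       queue before insertion:          W -> P -> R1 -> R2
       new priority request N:          W -> P -> N  -> R1 -> R2
       new ssltunnel CONNECT request C: W -> C -> P  -> R1 -> R2
*/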
1611
1612serf_request_t *serf_connection_priority_request_create(
1613    serf_connection_t *conn,
1614    serf_request_setup_t setup,
1615    void *setup_baton)
1616{
1617    return priority_request_create(conn,
1618                                   0, /* not an ssltunnel CONNECT request */
1619                                   setup, setup_baton);
1620}
1621
1622serf_request_t *serf__ssltunnel_request_create(serf_connection_t *conn,
1623                                               serf_request_setup_t setup,
1624                                               void *setup_baton)
1625{
1626    return priority_request_create(conn,
1627                                   1, /* This is an ssltunnel CONNECT request */
1628                                   setup, setup_baton);
1629}
1630
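/* Public entry point: cancel REQUEST by delegating to cancel_request() on
   the owning connection's request queue. */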
1631apr_status_t serf_request_cancel(serf_request_t *request)
1632{
1633    return cancel_request(request, &request->conn->requests, 0);
1634}
1635
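/* Report whether REQUEST has been fully delivered: APR_SUCCESS once writing
   has started and the request bucket has been consumed and released,
   APR_EBUSY otherwise. (Summary added for clarity.) */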
1636apr_status_t serf_request_is_written(serf_request_t *request)
1637{
1638    if (request->writing_started && !request->req_bkt)
1639        return APR_SUCCESS;
1640
1641    return APR_EBUSY;
1642}
1643
1644apr_pool_t *serf_request_get_pool(const serf_request_t *request)
1645{
1646    return request->respool;
1647}
1648
1649
1650serf_bucket_alloc_t *serf_request_get_alloc(
1651    const serf_request_t *request)
1652{
1653    return request->allocator;
1654}
1655
1656
1657serf_connection_t *serf_request_get_conn(
1658    const serf_request_t *request)
1659{
1660    return request->conn;
1661}
1662
1663
1664void serf_request_set_handler(
1665    serf_request_t *request,
1666    const serf_response_handler_t handler,
1667    const void **handler_baton)
1668{
1669    request->handler = handler;
1670    request->handler_baton = handler_baton;
1671}
1672
1673
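/* Build the outgoing request bucket for REQUEST: create a request bucket
   for METHOD and URI with the optional BODY, switch to an absolute URI when
   the request goes to a proxy (but not through an established SSL tunnel),
   add a Host header, and attach any server and/or proxy authentication
   headers required by the schemes in use. */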
1674serf_bucket_t *serf_request_bucket_request_create(
1675    serf_request_t *request,
1676    const char *method,
1677    const char *uri,
1678    serf_bucket_t *body,
1679    serf_bucket_alloc_t *allocator)
1680{
1681    serf_bucket_t *req_bkt, *hdrs_bkt;
1682    serf_connection_t *conn = request->conn;
1683    serf_context_t *ctx = conn->ctx;
1684    int ssltunnel;
1685
1686    ssltunnel = ctx->proxy_address &&
1687                (strcmp(conn->host_info.scheme, "https") == 0);
1688
1689    req_bkt = serf_bucket_request_create(method, uri, body, allocator);
1690    hdrs_bkt = serf_bucket_request_get_headers(req_bkt);
1691
1692    /* Use absolute URIs in requests to a proxy. Use relative URIs in
1693       requests sent directly to a server or through an SSL tunnel. */
1694    if (ctx->proxy_address && conn->host_url &&
1695        !(ssltunnel && !request->ssltunnel)) {
1696
1697        serf_bucket_request_set_root(req_bkt, conn->host_url);
1698    }
1699
1700    if (conn->host_info.hostinfo)
1701        serf_bucket_headers_setn(hdrs_bkt, "Host",
1702                                 conn->host_info.hostinfo);
1703
1704    /* Set up server authorization headers, unless this is a CONNECT request. */
1705    if (!request->ssltunnel) {
1706        serf__authn_info_t *authn_info;
1707        authn_info = serf__get_authn_info_for_server(conn);
1708        if (authn_info->scheme)
1709            authn_info->scheme->setup_request_func(HOST, 0, conn, request,
1710                                                   method, uri,
1711                                                   hdrs_bkt);
1712    }
1713
1714    /* Set up proxy authorization headers.
1715       Don't set these headers on the requests to the server if we're using
1716       an SSL tunnel, only on the CONNECT request that sets up the tunnel. */
1717    if (ctx->proxy_authn_info.scheme) {
1718        if (strcmp(conn->host_info.scheme, "https") == 0) {
1719            if (request->ssltunnel)
1720                ctx->proxy_authn_info.scheme->setup_request_func(PROXY, 0, conn,
1721                                                                 request,
1722                                                                 method, uri,
1723                                                                 hdrs_bkt);
1724        } else {
1725            ctx->proxy_authn_info.scheme->setup_request_func(PROXY, 0, conn,
1726                                                             request,
1727                                                             method, uri,
1728                                                             hdrs_bkt);
1729        }
1730    }
1731
1732    return req_bkt;
1733}
1734
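/* Return the measured latency of CONN, or -1 for proxied connections, for
   which latency detection is not implemented. */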
1735apr_interval_time_t serf_connection_get_latency(serf_connection_t *conn)
1736{
1737    if (conn->ctx->proxy_address) {
1738        /* Detecting network latency for proxied connections is not implemented
1739           yet. */
1740        return -1;
1741    }
1742
1743    return conn->latency;
1744}
1745