response_buckets.c revision 362181
1/* ====================================================================
2 *    Licensed to the Apache Software Foundation (ASF) under one
3 *    or more contributor license agreements.  See the NOTICE file
4 *    distributed with this work for additional information
5 *    regarding copyright ownership.  The ASF licenses this file
6 *    to you under the Apache License, Version 2.0 (the
7 *    "License"); you may not use this file except in compliance
8 *    with the License.  You may obtain a copy of the License at
9 *
10 *      http://www.apache.org/licenses/LICENSE-2.0
11 *
12 *    Unless required by applicable law or agreed to in writing,
13 *    software distributed under the License is distributed on an
14 *    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 *    KIND, either express or implied.  See the License for the
16 *    specific language governing permissions and limitations
17 *    under the License.
18 * ====================================================================
19 */
20
21#include <apr_lib.h>
22#include <apr_strings.h>
23#include <apr_date.h>
24
25#include "serf.h"
26#include "serf_bucket_util.h"
27#include "serf_private.h"
28
29typedef struct {
30    serf_bucket_t *stream;
31    serf_bucket_t *body;        /* Pointer to the stream wrapping the body. */
32    serf_bucket_t *headers;     /* holds parsed headers */
33
34    enum {
35        STATE_STATUS_LINE,      /* reading status line */
36        STATE_HEADERS,          /* reading headers */
37        STATE_BODY,             /* reading body */
38        STATE_TRAILERS,         /* reading trailers */
39        STATE_DONE              /* we've sent EOF */
40    } state;
41
42    /* Buffer for accumulating a line from the response. */
43    serf_linebuf_t linebuf;
44
45    serf_status_line sl;
46
47    int chunked;                /* Do we need to read trailers? */
48    int head_req;               /* Was this a HEAD request? */
49} response_context_t;
50
51/* Returns 1 if according to RFC2626 this response can have a body, 0 if it
52   must not have a body. */
53static int expect_body(response_context_t *ctx)
54{
55    if (ctx->head_req)
56        return 0;
57
58    /* 100 Continue and 101 Switching Protocols */
59    if (ctx->sl.code >= 100 && ctx->sl.code < 200)
60        return 0;
61
62    /* 204 No Content */
63    if (ctx->sl.code == 204)
64        return 0;
65
66    /* 205? */
67
68    /* 304 Not Modified */
69    if (ctx->sl.code == 304)
70        return 0;
71
72    return 1;
73}
74
75serf_bucket_t *serf_bucket_response_create(
76    serf_bucket_t *stream,
77    serf_bucket_alloc_t *allocator)
78{
79    response_context_t *ctx;
80
81    ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
82    ctx->stream = stream;
83    ctx->body = NULL;
84    ctx->headers = serf_bucket_headers_create(allocator);
85    ctx->state = STATE_STATUS_LINE;
86    ctx->chunked = 0;
87    ctx->head_req = 0;
88
89    serf_linebuf_init(&ctx->linebuf);
90
91    return serf_bucket_create(&serf_bucket_type_response, allocator, ctx);
92}
93
94void serf_bucket_response_set_head(
95    serf_bucket_t *bucket)
96{
97    response_context_t *ctx = bucket->data;
98
99    ctx->head_req = 1;
100}
101
102serf_bucket_t *serf_bucket_response_get_headers(
103    serf_bucket_t *bucket)
104{
105    return ((response_context_t *)bucket->data)->headers;
106}
107
108
109static void serf_response_destroy_and_data(serf_bucket_t *bucket)
110{
111    response_context_t *ctx = bucket->data;
112
113    if (ctx->state != STATE_STATUS_LINE) {
114        serf_bucket_mem_free(bucket->allocator, (void*)ctx->sl.reason);
115    }
116
117    serf_bucket_destroy(ctx->stream);
118    if (ctx->body != NULL)
119        serf_bucket_destroy(ctx->body);
120    serf_bucket_destroy(ctx->headers);
121
122    serf_default_destroy_and_data(bucket);
123}
124
125static apr_status_t fetch_line(response_context_t *ctx, int acceptable)
126{
127    return serf_linebuf_fetch(&ctx->linebuf, ctx->stream, acceptable);
128}
129
130static apr_status_t parse_status_line(response_context_t *ctx,
131                                      serf_bucket_alloc_t *allocator)
132{
133    int res;
134    char *reason; /* ### stupid APR interface makes this non-const */
135
136    /* Ensure a valid length, to avoid overflow on the final '\0' */
137    if (ctx->linebuf.used >= SERF_LINEBUF_LIMIT) {
138       return SERF_ERROR_BAD_HTTP_RESPONSE;
139    }
140
141    /* apr_date_checkmask assumes its arguments are valid C strings */
142    ctx->linebuf.line[ctx->linebuf.used] = '\0';
143
144    /* ctx->linebuf.line should be of form: 'HTTP/1.1 200 OK',
145       but we also explicitly allow the forms 'HTTP/1.1 200' (no reason)
146       and 'HTTP/1.1 401.1 Logon failed' (iis extended error codes) */
147    res = apr_date_checkmask(ctx->linebuf.line, "HTTP/#.# ###*");
148    if (!res) {
149        /* Not an HTTP response?  Well, at least we won't understand it. */
150        return SERF_ERROR_BAD_HTTP_RESPONSE;
151    }
152
153    ctx->sl.version = SERF_HTTP_VERSION(ctx->linebuf.line[5] - '0',
154                                        ctx->linebuf.line[7] - '0');
155    ctx->sl.code = apr_strtoi64(ctx->linebuf.line + 8, &reason, 10);
156
157    /* Skip leading spaces for the reason string. */
158    if (apr_isspace(*reason)) {
159        reason++;
160    }
161
162    /* Copy the reason value out of the line buffer. */
163    ctx->sl.reason = serf_bstrmemdup(allocator, reason,
164                                     ctx->linebuf.used
165                                     - (reason - ctx->linebuf.line));
166
167    return APR_SUCCESS;
168}
169
170/* This code should be replaced with header buckets. */
171static apr_status_t fetch_headers(serf_bucket_t *bkt, response_context_t *ctx)
172{
173    apr_status_t status;
174
175    /* RFC 2616 says that CRLF is the only line ending, but we can easily
176     * accept any kind of line ending.
177     */
178    status = fetch_line(ctx, SERF_NEWLINE_ANY);
179    if (SERF_BUCKET_READ_ERROR(status)) {
180        return status;
181    }
182    /* Something was read. Process it. */
183
184    if (ctx->linebuf.state == SERF_LINEBUF_READY && ctx->linebuf.used) {
185        const char *end_key;
186        const char *c;
187
188        end_key = c = memchr(ctx->linebuf.line, ':', ctx->linebuf.used);
189        if (!c) {
190            /* Bad headers? */
191            return SERF_ERROR_BAD_HTTP_RESPONSE;
192        }
193
194        /* Skip over initial ':' */
195        c++;
196
197        /* And skip all whitespaces. */
198        for(; c < ctx->linebuf.line + ctx->linebuf.used; c++)
199        {
200            if (!apr_isspace(*c))
201            {
202              break;
203            }
204        }
205
206        /* Always copy the headers (from the linebuf into new mem). */
207        /* ### we should be able to optimize some mem copies */
208        serf_bucket_headers_setx(
209            ctx->headers,
210            ctx->linebuf.line, end_key - ctx->linebuf.line, 1,
211            c, ctx->linebuf.line + ctx->linebuf.used - c, 1);
212    }
213
214    return status;
215}
216
217/* Perform one iteration of the state machine.
218 *
219 * Will return when one the following conditions occurred:
220 *  1) a state change
221 *  2) an error
222 *  3) the stream is not ready or at EOF
223 *  4) APR_SUCCESS, meaning the machine can be run again immediately
224 */
225static apr_status_t run_machine(serf_bucket_t *bkt, response_context_t *ctx)
226{
227    apr_status_t status = APR_SUCCESS; /* initialize to avoid gcc warnings */
228
229    switch (ctx->state) {
230    case STATE_STATUS_LINE:
231        /* RFC 2616 says that CRLF is the only line ending, but we can easily
232         * accept any kind of line ending.
233         */
234        status = fetch_line(ctx, SERF_NEWLINE_ANY);
235        if (SERF_BUCKET_READ_ERROR(status))
236            return status;
237
238        if (ctx->linebuf.state == SERF_LINEBUF_READY) {
239            /* The Status-Line is in the line buffer. Process it. */
240            status = parse_status_line(ctx, bkt->allocator);
241            if (status)
242                return status;
243
244            /* Good times ahead: we're switching protocols! */
245            if (ctx->sl.code == 101) {
246                ctx->body =
247                    serf_bucket_barrier_create(ctx->stream, bkt->allocator);
248                ctx->state = STATE_DONE;
249                break;
250            }
251
252            /* Okay... move on to reading the headers. */
253            ctx->state = STATE_HEADERS;
254        }
255        else {
256            /* The connection closed before we could get the next
257             * response.  Treat the request as lost so that our upper
258             * end knows the server never tried to give us a response.
259             */
260            if (APR_STATUS_IS_EOF(status)) {
261                return SERF_ERROR_REQUEST_LOST;
262            }
263        }
264        break;
265    case STATE_HEADERS:
266        status = fetch_headers(bkt, ctx);
267        if (SERF_BUCKET_READ_ERROR(status))
268            return status;
269
270        /* If an empty line was read, then we hit the end of the headers.
271         * Move on to the body.
272         */
273        if (ctx->linebuf.state == SERF_LINEBUF_READY && !ctx->linebuf.used) {
274            const void *v;
275
276            /* Advance the state. */
277            ctx->state = STATE_BODY;
278
279            /* If this is a response to a HEAD request, or code == 1xx,204 or304
280               then we don't receive a real body. */
281            if (!expect_body(ctx)) {
282                ctx->body = serf_bucket_simple_create(NULL, 0, NULL, NULL,
283                                                      bkt->allocator);
284                ctx->state = STATE_BODY;
285                break;
286            }
287
288            ctx->body =
289                serf_bucket_barrier_create(ctx->stream, bkt->allocator);
290
291            /* Are we C-L, chunked, or conn close? */
292            v = serf_bucket_headers_get(ctx->headers, "Content-Length");
293            if (v) {
294                apr_uint64_t length;
295                length = apr_strtoi64(v, NULL, 10);
296                if (errno == ERANGE) {
297                    return APR_FROM_OS_ERROR(ERANGE);
298                }
299                ctx->body = serf_bucket_response_body_create(
300                              ctx->body, length, bkt->allocator);
301            }
302            else {
303                v = serf_bucket_headers_get(ctx->headers, "Transfer-Encoding");
304
305                /* Need to handle multiple transfer-encoding. */
306                if (v && strcasecmp("chunked", v) == 0) {
307                    ctx->chunked = 1;
308                    ctx->body = serf_bucket_dechunk_create(ctx->body,
309                                                           bkt->allocator);
310                }
311            }
312            v = serf_bucket_headers_get(ctx->headers, "Content-Encoding");
313            if (v) {
314                /* Need to handle multiple content-encoding. */
315                if (v && strcasecmp("gzip", v) == 0) {
316                    ctx->body =
317                        serf_bucket_deflate_create(ctx->body, bkt->allocator,
318                                                   SERF_DEFLATE_GZIP);
319                }
320                else if (v && strcasecmp("deflate", v) == 0) {
321                    ctx->body =
322                        serf_bucket_deflate_create(ctx->body, bkt->allocator,
323                                                   SERF_DEFLATE_DEFLATE);
324                }
325            }
326        }
327        break;
328    case STATE_BODY:
329        /* Don't do anything. */
330        break;
331    case STATE_TRAILERS:
332        status = fetch_headers(bkt, ctx);
333        if (SERF_BUCKET_READ_ERROR(status))
334            return status;
335
336        /* If an empty line was read, then we're done. */
337        if (ctx->linebuf.state == SERF_LINEBUF_READY && !ctx->linebuf.used) {
338            ctx->state = STATE_DONE;
339            return APR_EOF;
340        }
341        break;
342    case STATE_DONE:
343        return APR_EOF;
344    default:
345        /* Not reachable */
346        return APR_EGENERAL;
347    }
348
349    return status;
350}
351
352static apr_status_t wait_for_body(serf_bucket_t *bkt, response_context_t *ctx)
353{
354    apr_status_t status;
355
356    /* Keep reading and moving through states if we aren't at the BODY */
357    while (ctx->state != STATE_BODY) {
358        status = run_machine(bkt, ctx);
359
360        /* Anything other than APR_SUCCESS means that we cannot immediately
361         * read again (for now).
362         */
363        if (status)
364            return status;
365    }
366    /* in STATE_BODY */
367
368    return APR_SUCCESS;
369}
370
371apr_status_t serf_bucket_response_wait_for_headers(
372    serf_bucket_t *bucket)
373{
374    response_context_t *ctx = bucket->data;
375
376    return wait_for_body(bucket, ctx);
377}
378
379apr_status_t serf_bucket_response_status(
380    serf_bucket_t *bkt,
381    serf_status_line *sline)
382{
383    response_context_t *ctx = bkt->data;
384    apr_status_t status;
385
386    if (ctx->state != STATE_STATUS_LINE) {
387        /* We already read it and moved on. Just return it. */
388        *sline = ctx->sl;
389        return APR_SUCCESS;
390    }
391
392    /* Running the state machine once will advance the machine, or state
393     * that the stream isn't ready with enough data. There isn't ever a
394     * need to run the machine more than once to try and satisfy this. We
395     * have to look at the state to tell whether it advanced, though, as
396     * it is quite possible to advance *and* to return APR_EAGAIN.
397     */
398    status = run_machine(bkt, ctx);
399    if (ctx->state == STATE_HEADERS) {
400        *sline = ctx->sl;
401    }
402    else {
403        /* Indicate that we don't have the information yet. */
404        sline->version = 0;
405    }
406
407    return status;
408}
409
410static apr_status_t serf_response_read(serf_bucket_t *bucket,
411                                       apr_size_t requested,
412                                       const char **data, apr_size_t *len)
413{
414    response_context_t *ctx = bucket->data;
415    apr_status_t rv;
416
417    rv = wait_for_body(bucket, ctx);
418    if (rv) {
419        /* It's not possible to have read anything yet! */
420        if (APR_STATUS_IS_EOF(rv) || APR_STATUS_IS_EAGAIN(rv)) {
421            *len = 0;
422        }
423        return rv;
424    }
425
426    rv = serf_bucket_read(ctx->body, requested, data, len);
427    if (SERF_BUCKET_READ_ERROR(rv))
428        return rv;
429
430    if (APR_STATUS_IS_EOF(rv)) {
431        if (ctx->chunked) {
432            ctx->state = STATE_TRAILERS;
433            /* Mask the result. */
434            rv = APR_SUCCESS;
435        } else {
436            ctx->state = STATE_DONE;
437        }
438    }
439    return rv;
440}
441
442static apr_status_t serf_response_readline(serf_bucket_t *bucket,
443                                           int acceptable, int *found,
444                                           const char **data, apr_size_t *len)
445{
446    response_context_t *ctx = bucket->data;
447    apr_status_t rv;
448
449    rv = wait_for_body(bucket, ctx);
450    if (rv) {
451        return rv;
452    }
453
454    /* Delegate to the stream bucket to do the readline. */
455    return serf_bucket_readline(ctx->body, acceptable, found, data, len);
456}
457
458apr_status_t serf_response_full_become_aggregate(serf_bucket_t *bucket)
459{
460    response_context_t *ctx = bucket->data;
461    serf_bucket_t *bkt;
462    char buf[256];
463    int size;
464
465    serf_bucket_aggregate_become(bucket);
466
467    /* Add reconstructed status line. */
468    size = apr_snprintf(buf, 256, "HTTP/%d.%d %d ",
469                        SERF_HTTP_VERSION_MAJOR(ctx->sl.version),
470                        SERF_HTTP_VERSION_MINOR(ctx->sl.version),
471                        ctx->sl.code);
472    bkt = serf_bucket_simple_copy_create(buf, size,
473                                         bucket->allocator);
474    serf_bucket_aggregate_append(bucket, bkt);
475    bkt = serf_bucket_simple_copy_create(ctx->sl.reason, strlen(ctx->sl.reason),
476                                         bucket->allocator);
477    serf_bucket_aggregate_append(bucket, bkt);
478    bkt = SERF_BUCKET_SIMPLE_STRING_LEN("\r\n", 2,
479                                        bucket->allocator);
480    serf_bucket_aggregate_append(bucket, bkt);
481
482    /* Add headers and stream buckets in order. */
483    serf_bucket_aggregate_append(bucket, ctx->headers);
484    serf_bucket_aggregate_append(bucket, ctx->stream);
485
486    serf_bucket_mem_free(bucket->allocator, ctx);
487
488    return APR_SUCCESS;
489}
490
491/* ### need to implement */
492#define serf_response_peek NULL
493
494const serf_bucket_type_t serf_bucket_type_response = {
495    "RESPONSE",
496    serf_response_read,
497    serf_response_readline,
498    serf_default_read_iovec,
499    serf_default_read_for_sendfile,
500    serf_default_read_bucket,
501    serf_response_peek,
502    serf_response_destroy_and_data,
503};
504