response_buckets.c revision 251877
1/* Copyright 2002-2004 Justin Erenkrantz and Greg Stein
2 *
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16#include <apr_lib.h>
17#include <apr_strings.h>
18#include <apr_date.h>
19
20#include "serf.h"
21#include "serf_bucket_util.h"
22
23
24typedef struct {
25    serf_bucket_t *stream;
26    serf_bucket_t *body;        /* Pointer to the stream wrapping the body. */
27    serf_bucket_t *headers;     /* holds parsed headers */
28
29    enum {
30        STATE_STATUS_LINE,      /* reading status line */
31        STATE_HEADERS,          /* reading headers */
32        STATE_BODY,             /* reading body */
33        STATE_TRAILERS,         /* reading trailers */
34        STATE_DONE              /* we've sent EOF */
35    } state;
36
37    /* Buffer for accumulating a line from the response. */
38    serf_linebuf_t linebuf;
39
40    serf_status_line sl;
41
42    int chunked;                /* Do we need to read trailers? */
43    int head_req;               /* Was this a HEAD request? */
44} response_context_t;
45
46
47serf_bucket_t *serf_bucket_response_create(
48    serf_bucket_t *stream,
49    serf_bucket_alloc_t *allocator)
50{
51    response_context_t *ctx;
52
53    ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
54    ctx->stream = stream;
55    ctx->body = NULL;
56    ctx->headers = serf_bucket_headers_create(allocator);
57    ctx->state = STATE_STATUS_LINE;
58    ctx->chunked = 0;
59    ctx->head_req = 0;
60
61    serf_linebuf_init(&ctx->linebuf);
62
63    return serf_bucket_create(&serf_bucket_type_response, allocator, ctx);
64}
65
66void serf_bucket_response_set_head(
67    serf_bucket_t *bucket)
68{
69    response_context_t *ctx = bucket->data;
70
71    ctx->head_req = 1;
72}
73
74serf_bucket_t *serf_bucket_response_get_headers(
75    serf_bucket_t *bucket)
76{
77    return ((response_context_t *)bucket->data)->headers;
78}
79
80
81static void serf_response_destroy_and_data(serf_bucket_t *bucket)
82{
83    response_context_t *ctx = bucket->data;
84
85    if (ctx->state != STATE_STATUS_LINE) {
86        serf_bucket_mem_free(bucket->allocator, (void*)ctx->sl.reason);
87    }
88
89    serf_bucket_destroy(ctx->stream);
90    if (ctx->body != NULL)
91        serf_bucket_destroy(ctx->body);
92    serf_bucket_destroy(ctx->headers);
93
94    serf_default_destroy_and_data(bucket);
95}
96
97static apr_status_t fetch_line(response_context_t *ctx, int acceptable)
98{
99    return serf_linebuf_fetch(&ctx->linebuf, ctx->stream, acceptable);
100}
101
102static apr_status_t parse_status_line(response_context_t *ctx,
103                                      serf_bucket_alloc_t *allocator)
104{
105    int res;
106    char *reason; /* ### stupid APR interface makes this non-const */
107
108    /* ctx->linebuf.line should be of form: HTTP/1.1 200 OK */
109    res = apr_date_checkmask(ctx->linebuf.line, "HTTP/#.# ###*");
110    if (!res) {
111        /* Not an HTTP response?  Well, at least we won't understand it. */
112        return SERF_ERROR_BAD_HTTP_RESPONSE;
113    }
114
115    ctx->sl.version = SERF_HTTP_VERSION(ctx->linebuf.line[5] - '0',
116                                        ctx->linebuf.line[7] - '0');
117    ctx->sl.code = apr_strtoi64(ctx->linebuf.line + 8, &reason, 10);
118
119    /* Skip leading spaces for the reason string. */
120    if (apr_isspace(*reason)) {
121        reason++;
122    }
123
124    /* Copy the reason value out of the line buffer. */
125    ctx->sl.reason = serf_bstrmemdup(allocator, reason,
126                                     ctx->linebuf.used
127                                     - (reason - ctx->linebuf.line));
128
129    return APR_SUCCESS;
130}
131
132/* This code should be replaced with header buckets. */
133static apr_status_t fetch_headers(serf_bucket_t *bkt, response_context_t *ctx)
134{
135    apr_status_t status;
136
137    /* RFC 2616 says that CRLF is the only line ending, but we can easily
138     * accept any kind of line ending.
139     */
140    status = fetch_line(ctx, SERF_NEWLINE_ANY);
141    if (SERF_BUCKET_READ_ERROR(status)) {
142        return status;
143    }
144    /* Something was read. Process it. */
145
146    if (ctx->linebuf.state == SERF_LINEBUF_READY && ctx->linebuf.used) {
147        const char *end_key;
148        const char *c;
149
150        end_key = c = memchr(ctx->linebuf.line, ':', ctx->linebuf.used);
151        if (!c) {
152            /* Bad headers? */
153            return SERF_ERROR_BAD_HTTP_RESPONSE;
154        }
155
156        /* Skip over initial ':' */
157        c++;
158
159        /* And skip all whitespaces. */
160        for(; c < ctx->linebuf.line + ctx->linebuf.used; c++)
161        {
162            if (!apr_isspace(*c))
163            {
164              break;
165            }
166        }
167
168        /* Always copy the headers (from the linebuf into new mem). */
169        /* ### we should be able to optimize some mem copies */
170        serf_bucket_headers_setx(
171            ctx->headers,
172            ctx->linebuf.line, end_key - ctx->linebuf.line, 1,
173            c, ctx->linebuf.line + ctx->linebuf.used - c, 1);
174    }
175
176    return status;
177}
178
179/* Perform one iteration of the state machine.
180 *
181 * Will return when one the following conditions occurred:
182 *  1) a state change
183 *  2) an error
184 *  3) the stream is not ready or at EOF
185 *  4) APR_SUCCESS, meaning the machine can be run again immediately
186 */
187static apr_status_t run_machine(serf_bucket_t *bkt, response_context_t *ctx)
188{
189    apr_status_t status = APR_SUCCESS; /* initialize to avoid gcc warnings */
190
191    switch (ctx->state) {
192    case STATE_STATUS_LINE:
193        /* RFC 2616 says that CRLF is the only line ending, but we can easily
194         * accept any kind of line ending.
195         */
196        status = fetch_line(ctx, SERF_NEWLINE_ANY);
197        if (SERF_BUCKET_READ_ERROR(status))
198            return status;
199
200        if (ctx->linebuf.state == SERF_LINEBUF_READY) {
201            /* The Status-Line is in the line buffer. Process it. */
202            status = parse_status_line(ctx, bkt->allocator);
203            if (status)
204                return status;
205
206            /* Good times ahead: we're switching protocols! */
207            if (ctx->sl.code == 101) {
208                ctx->body =
209                    serf_bucket_barrier_create(ctx->stream, bkt->allocator);
210                ctx->state = STATE_DONE;
211                break;
212            }
213
214            /* Okay... move on to reading the headers. */
215            ctx->state = STATE_HEADERS;
216        }
217        else {
218            /* The connection closed before we could get the next
219             * response.  Treat the request as lost so that our upper
220             * end knows the server never tried to give us a response.
221             */
222            if (APR_STATUS_IS_EOF(status)) {
223                return SERF_ERROR_REQUEST_LOST;
224            }
225        }
226        break;
227    case STATE_HEADERS:
228        status = fetch_headers(bkt, ctx);
229        if (SERF_BUCKET_READ_ERROR(status))
230            return status;
231
232        /* If an empty line was read, then we hit the end of the headers.
233         * Move on to the body.
234         */
235        if (ctx->linebuf.state == SERF_LINEBUF_READY && !ctx->linebuf.used) {
236            const void *v;
237
238            /* Advance the state. */
239            ctx->state = STATE_BODY;
240
241            ctx->body =
242                serf_bucket_barrier_create(ctx->stream, bkt->allocator);
243
244            /* Are we C-L, chunked, or conn close? */
245            v = serf_bucket_headers_get(ctx->headers, "Content-Length");
246            if (v) {
247                apr_uint64_t length;
248                length = apr_strtoi64(v, NULL, 10);
249                if (errno == ERANGE) {
250                    return APR_FROM_OS_ERROR(ERANGE);
251                }
252                ctx->body = serf_bucket_response_body_create(
253                              ctx->body, length, bkt->allocator);
254            }
255            else {
256                v = serf_bucket_headers_get(ctx->headers, "Transfer-Encoding");
257
258                /* Need to handle multiple transfer-encoding. */
259                if (v && strcasecmp("chunked", v) == 0) {
260                    ctx->chunked = 1;
261                    ctx->body = serf_bucket_dechunk_create(ctx->body,
262                                                           bkt->allocator);
263                }
264
265                if (!v && (ctx->sl.code == 204 || ctx->sl.code == 304)) {
266                    ctx->state = STATE_DONE;
267                }
268            }
269            v = serf_bucket_headers_get(ctx->headers, "Content-Encoding");
270            if (v) {
271                /* Need to handle multiple content-encoding. */
272                if (v && strcasecmp("gzip", v) == 0) {
273                    ctx->body =
274                        serf_bucket_deflate_create(ctx->body, bkt->allocator,
275                                                   SERF_DEFLATE_GZIP);
276                }
277                else if (v && strcasecmp("deflate", v) == 0) {
278                    ctx->body =
279                        serf_bucket_deflate_create(ctx->body, bkt->allocator,
280                                                   SERF_DEFLATE_DEFLATE);
281                }
282            }
283            /* If we're a HEAD request, we don't receive a body. */
284            if (ctx->head_req) {
285                ctx->state = STATE_DONE;
286            }
287        }
288        break;
289    case STATE_BODY:
290        /* Don't do anything. */
291        break;
292    case STATE_TRAILERS:
293        status = fetch_headers(bkt, ctx);
294        if (SERF_BUCKET_READ_ERROR(status))
295            return status;
296
297        /* If an empty line was read, then we're done. */
298        if (ctx->linebuf.state == SERF_LINEBUF_READY && !ctx->linebuf.used) {
299            ctx->state = STATE_DONE;
300            return APR_EOF;
301        }
302        break;
303    case STATE_DONE:
304        return APR_EOF;
305    default:
306        /* Not reachable */
307        return APR_EGENERAL;
308    }
309
310    return status;
311}
312
313static apr_status_t wait_for_body(serf_bucket_t *bkt, response_context_t *ctx)
314{
315    apr_status_t status;
316
317    /* Keep reading and moving through states if we aren't at the BODY */
318    while (ctx->state != STATE_BODY) {
319        status = run_machine(bkt, ctx);
320
321        /* Anything other than APR_SUCCESS means that we cannot immediately
322         * read again (for now).
323         */
324        if (status)
325            return status;
326    }
327    /* in STATE_BODY */
328
329    return APR_SUCCESS;
330}
331
332apr_status_t serf_bucket_response_wait_for_headers(
333    serf_bucket_t *bucket)
334{
335    response_context_t *ctx = bucket->data;
336
337    return wait_for_body(bucket, ctx);
338}
339
340apr_status_t serf_bucket_response_status(
341    serf_bucket_t *bkt,
342    serf_status_line *sline)
343{
344    response_context_t *ctx = bkt->data;
345    apr_status_t status;
346
347    if (ctx->state != STATE_STATUS_LINE) {
348        /* We already read it and moved on. Just return it. */
349        *sline = ctx->sl;
350        return APR_SUCCESS;
351    }
352
353    /* Running the state machine once will advance the machine, or state
354     * that the stream isn't ready with enough data. There isn't ever a
355     * need to run the machine more than once to try and satisfy this. We
356     * have to look at the state to tell whether it advanced, though, as
357     * it is quite possible to advance *and* to return APR_EAGAIN.
358     */
359    status = run_machine(bkt, ctx);
360    if (ctx->state == STATE_HEADERS) {
361        *sline = ctx->sl;
362    }
363    else {
364        /* Indicate that we don't have the information yet. */
365        sline->version = 0;
366    }
367
368    return status;
369}
370
371static apr_status_t serf_response_read(serf_bucket_t *bucket,
372                                       apr_size_t requested,
373                                       const char **data, apr_size_t *len)
374{
375    response_context_t *ctx = bucket->data;
376    apr_status_t rv;
377
378    rv = wait_for_body(bucket, ctx);
379    if (rv) {
380        /* It's not possible to have read anything yet! */
381        if (APR_STATUS_IS_EOF(rv) || APR_STATUS_IS_EAGAIN(rv)) {
382            *len = 0;
383        }
384        return rv;
385    }
386
387    rv = serf_bucket_read(ctx->body, requested, data, len);
388    if (SERF_BUCKET_READ_ERROR(rv))
389        return rv;
390
391    if (APR_STATUS_IS_EOF(rv)) {
392        if (ctx->chunked) {
393            ctx->state = STATE_TRAILERS;
394            /* Mask the result. */
395            rv = APR_SUCCESS;
396        } else {
397            ctx->state = STATE_DONE;
398        }
399    }
400    return rv;
401}
402
403static apr_status_t serf_response_readline(serf_bucket_t *bucket,
404                                           int acceptable, int *found,
405                                           const char **data, apr_size_t *len)
406{
407    response_context_t *ctx = bucket->data;
408    apr_status_t rv;
409
410    rv = wait_for_body(bucket, ctx);
411    if (rv) {
412        return rv;
413    }
414
415    /* Delegate to the stream bucket to do the readline. */
416    return serf_bucket_readline(ctx->body, acceptable, found, data, len);
417}
418
419apr_status_t serf_response_full_become_aggregate(serf_bucket_t *bucket)
420{
421    response_context_t *ctx = bucket->data;
422    serf_bucket_t *bkt;
423    char buf[256];
424    int size;
425
426    serf_bucket_aggregate_become(bucket);
427
428    /* Add reconstructed status line. */
429    size = apr_snprintf(buf, 256, "HTTP/%d.%d %d ",
430                        SERF_HTTP_VERSION_MAJOR(ctx->sl.version),
431                        SERF_HTTP_VERSION_MINOR(ctx->sl.version),
432                        ctx->sl.code);
433    bkt = serf_bucket_simple_copy_create(buf, size,
434                                         bucket->allocator);
435    serf_bucket_aggregate_append(bucket, bkt);
436    bkt = serf_bucket_simple_copy_create(ctx->sl.reason, strlen(ctx->sl.reason),
437                                         bucket->allocator);
438    serf_bucket_aggregate_append(bucket, bkt);
439    bkt = SERF_BUCKET_SIMPLE_STRING_LEN("\r\n", 2,
440                                        bucket->allocator);
441    serf_bucket_aggregate_append(bucket, bkt);
442
443    /* Add headers and stream buckets in order. */
444    serf_bucket_aggregate_append(bucket, ctx->headers);
445    serf_bucket_aggregate_append(bucket, ctx->stream);
446
447    serf_bucket_mem_free(bucket->allocator, ctx);
448
449    return APR_SUCCESS;
450}
451
452/* ### need to implement */
453#define serf_response_peek NULL
454
455const serf_bucket_type_t serf_bucket_type_response = {
456    "RESPONSE",
457    serf_response_read,
458    serf_response_readline,
459    serf_default_read_iovec,
460    serf_default_read_for_sendfile,
461    serf_default_read_bucket,
462    serf_response_peek,
463    serf_response_destroy_and_data,
464};
465