/* response_buckets.c revision 251877 */
1/* Copyright 2002-2004 Justin Erenkrantz and Greg Stein 2 * 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16#include <apr_lib.h> 17#include <apr_strings.h> 18#include <apr_date.h> 19 20#include "serf.h" 21#include "serf_bucket_util.h" 22 23 24typedef struct { 25 serf_bucket_t *stream; 26 serf_bucket_t *body; /* Pointer to the stream wrapping the body. */ 27 serf_bucket_t *headers; /* holds parsed headers */ 28 29 enum { 30 STATE_STATUS_LINE, /* reading status line */ 31 STATE_HEADERS, /* reading headers */ 32 STATE_BODY, /* reading body */ 33 STATE_TRAILERS, /* reading trailers */ 34 STATE_DONE /* we've sent EOF */ 35 } state; 36 37 /* Buffer for accumulating a line from the response. */ 38 serf_linebuf_t linebuf; 39 40 serf_status_line sl; 41 42 int chunked; /* Do we need to read trailers? */ 43 int head_req; /* Was this a HEAD request? 
*/ 44} response_context_t; 45 46 47serf_bucket_t *serf_bucket_response_create( 48 serf_bucket_t *stream, 49 serf_bucket_alloc_t *allocator) 50{ 51 response_context_t *ctx; 52 53 ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx)); 54 ctx->stream = stream; 55 ctx->body = NULL; 56 ctx->headers = serf_bucket_headers_create(allocator); 57 ctx->state = STATE_STATUS_LINE; 58 ctx->chunked = 0; 59 ctx->head_req = 0; 60 61 serf_linebuf_init(&ctx->linebuf); 62 63 return serf_bucket_create(&serf_bucket_type_response, allocator, ctx); 64} 65 66void serf_bucket_response_set_head( 67 serf_bucket_t *bucket) 68{ 69 response_context_t *ctx = bucket->data; 70 71 ctx->head_req = 1; 72} 73 74serf_bucket_t *serf_bucket_response_get_headers( 75 serf_bucket_t *bucket) 76{ 77 return ((response_context_t *)bucket->data)->headers; 78} 79 80 81static void serf_response_destroy_and_data(serf_bucket_t *bucket) 82{ 83 response_context_t *ctx = bucket->data; 84 85 if (ctx->state != STATE_STATUS_LINE) { 86 serf_bucket_mem_free(bucket->allocator, (void*)ctx->sl.reason); 87 } 88 89 serf_bucket_destroy(ctx->stream); 90 if (ctx->body != NULL) 91 serf_bucket_destroy(ctx->body); 92 serf_bucket_destroy(ctx->headers); 93 94 serf_default_destroy_and_data(bucket); 95} 96 97static apr_status_t fetch_line(response_context_t *ctx, int acceptable) 98{ 99 return serf_linebuf_fetch(&ctx->linebuf, ctx->stream, acceptable); 100} 101 102static apr_status_t parse_status_line(response_context_t *ctx, 103 serf_bucket_alloc_t *allocator) 104{ 105 int res; 106 char *reason; /* ### stupid APR interface makes this non-const */ 107 108 /* ctx->linebuf.line should be of form: HTTP/1.1 200 OK */ 109 res = apr_date_checkmask(ctx->linebuf.line, "HTTP/#.# ###*"); 110 if (!res) { 111 /* Not an HTTP response? Well, at least we won't understand it. 
*/ 112 return SERF_ERROR_BAD_HTTP_RESPONSE; 113 } 114 115 ctx->sl.version = SERF_HTTP_VERSION(ctx->linebuf.line[5] - '0', 116 ctx->linebuf.line[7] - '0'); 117 ctx->sl.code = apr_strtoi64(ctx->linebuf.line + 8, &reason, 10); 118 119 /* Skip leading spaces for the reason string. */ 120 if (apr_isspace(*reason)) { 121 reason++; 122 } 123 124 /* Copy the reason value out of the line buffer. */ 125 ctx->sl.reason = serf_bstrmemdup(allocator, reason, 126 ctx->linebuf.used 127 - (reason - ctx->linebuf.line)); 128 129 return APR_SUCCESS; 130} 131 132/* This code should be replaced with header buckets. */ 133static apr_status_t fetch_headers(serf_bucket_t *bkt, response_context_t *ctx) 134{ 135 apr_status_t status; 136 137 /* RFC 2616 says that CRLF is the only line ending, but we can easily 138 * accept any kind of line ending. 139 */ 140 status = fetch_line(ctx, SERF_NEWLINE_ANY); 141 if (SERF_BUCKET_READ_ERROR(status)) { 142 return status; 143 } 144 /* Something was read. Process it. */ 145 146 if (ctx->linebuf.state == SERF_LINEBUF_READY && ctx->linebuf.used) { 147 const char *end_key; 148 const char *c; 149 150 end_key = c = memchr(ctx->linebuf.line, ':', ctx->linebuf.used); 151 if (!c) { 152 /* Bad headers? */ 153 return SERF_ERROR_BAD_HTTP_RESPONSE; 154 } 155 156 /* Skip over initial ':' */ 157 c++; 158 159 /* And skip all whitespaces. */ 160 for(; c < ctx->linebuf.line + ctx->linebuf.used; c++) 161 { 162 if (!apr_isspace(*c)) 163 { 164 break; 165 } 166 } 167 168 /* Always copy the headers (from the linebuf into new mem). */ 169 /* ### we should be able to optimize some mem copies */ 170 serf_bucket_headers_setx( 171 ctx->headers, 172 ctx->linebuf.line, end_key - ctx->linebuf.line, 1, 173 c, ctx->linebuf.line + ctx->linebuf.used - c, 1); 174 } 175 176 return status; 177} 178 179/* Perform one iteration of the state machine. 
180 * 181 * Will return when one the following conditions occurred: 182 * 1) a state change 183 * 2) an error 184 * 3) the stream is not ready or at EOF 185 * 4) APR_SUCCESS, meaning the machine can be run again immediately 186 */ 187static apr_status_t run_machine(serf_bucket_t *bkt, response_context_t *ctx) 188{ 189 apr_status_t status = APR_SUCCESS; /* initialize to avoid gcc warnings */ 190 191 switch (ctx->state) { 192 case STATE_STATUS_LINE: 193 /* RFC 2616 says that CRLF is the only line ending, but we can easily 194 * accept any kind of line ending. 195 */ 196 status = fetch_line(ctx, SERF_NEWLINE_ANY); 197 if (SERF_BUCKET_READ_ERROR(status)) 198 return status; 199 200 if (ctx->linebuf.state == SERF_LINEBUF_READY) { 201 /* The Status-Line is in the line buffer. Process it. */ 202 status = parse_status_line(ctx, bkt->allocator); 203 if (status) 204 return status; 205 206 /* Good times ahead: we're switching protocols! */ 207 if (ctx->sl.code == 101) { 208 ctx->body = 209 serf_bucket_barrier_create(ctx->stream, bkt->allocator); 210 ctx->state = STATE_DONE; 211 break; 212 } 213 214 /* Okay... move on to reading the headers. */ 215 ctx->state = STATE_HEADERS; 216 } 217 else { 218 /* The connection closed before we could get the next 219 * response. Treat the request as lost so that our upper 220 * end knows the server never tried to give us a response. 221 */ 222 if (APR_STATUS_IS_EOF(status)) { 223 return SERF_ERROR_REQUEST_LOST; 224 } 225 } 226 break; 227 case STATE_HEADERS: 228 status = fetch_headers(bkt, ctx); 229 if (SERF_BUCKET_READ_ERROR(status)) 230 return status; 231 232 /* If an empty line was read, then we hit the end of the headers. 233 * Move on to the body. 234 */ 235 if (ctx->linebuf.state == SERF_LINEBUF_READY && !ctx->linebuf.used) { 236 const void *v; 237 238 /* Advance the state. */ 239 ctx->state = STATE_BODY; 240 241 ctx->body = 242 serf_bucket_barrier_create(ctx->stream, bkt->allocator); 243 244 /* Are we C-L, chunked, or conn close? 
*/ 245 v = serf_bucket_headers_get(ctx->headers, "Content-Length"); 246 if (v) { 247 apr_uint64_t length; 248 length = apr_strtoi64(v, NULL, 10); 249 if (errno == ERANGE) { 250 return APR_FROM_OS_ERROR(ERANGE); 251 } 252 ctx->body = serf_bucket_response_body_create( 253 ctx->body, length, bkt->allocator); 254 } 255 else { 256 v = serf_bucket_headers_get(ctx->headers, "Transfer-Encoding"); 257 258 /* Need to handle multiple transfer-encoding. */ 259 if (v && strcasecmp("chunked", v) == 0) { 260 ctx->chunked = 1; 261 ctx->body = serf_bucket_dechunk_create(ctx->body, 262 bkt->allocator); 263 } 264 265 if (!v && (ctx->sl.code == 204 || ctx->sl.code == 304)) { 266 ctx->state = STATE_DONE; 267 } 268 } 269 v = serf_bucket_headers_get(ctx->headers, "Content-Encoding"); 270 if (v) { 271 /* Need to handle multiple content-encoding. */ 272 if (v && strcasecmp("gzip", v) == 0) { 273 ctx->body = 274 serf_bucket_deflate_create(ctx->body, bkt->allocator, 275 SERF_DEFLATE_GZIP); 276 } 277 else if (v && strcasecmp("deflate", v) == 0) { 278 ctx->body = 279 serf_bucket_deflate_create(ctx->body, bkt->allocator, 280 SERF_DEFLATE_DEFLATE); 281 } 282 } 283 /* If we're a HEAD request, we don't receive a body. */ 284 if (ctx->head_req) { 285 ctx->state = STATE_DONE; 286 } 287 } 288 break; 289 case STATE_BODY: 290 /* Don't do anything. */ 291 break; 292 case STATE_TRAILERS: 293 status = fetch_headers(bkt, ctx); 294 if (SERF_BUCKET_READ_ERROR(status)) 295 return status; 296 297 /* If an empty line was read, then we're done. 
*/ 298 if (ctx->linebuf.state == SERF_LINEBUF_READY && !ctx->linebuf.used) { 299 ctx->state = STATE_DONE; 300 return APR_EOF; 301 } 302 break; 303 case STATE_DONE: 304 return APR_EOF; 305 default: 306 /* Not reachable */ 307 return APR_EGENERAL; 308 } 309 310 return status; 311} 312 313static apr_status_t wait_for_body(serf_bucket_t *bkt, response_context_t *ctx) 314{ 315 apr_status_t status; 316 317 /* Keep reading and moving through states if we aren't at the BODY */ 318 while (ctx->state != STATE_BODY) { 319 status = run_machine(bkt, ctx); 320 321 /* Anything other than APR_SUCCESS means that we cannot immediately 322 * read again (for now). 323 */ 324 if (status) 325 return status; 326 } 327 /* in STATE_BODY */ 328 329 return APR_SUCCESS; 330} 331 332apr_status_t serf_bucket_response_wait_for_headers( 333 serf_bucket_t *bucket) 334{ 335 response_context_t *ctx = bucket->data; 336 337 return wait_for_body(bucket, ctx); 338} 339 340apr_status_t serf_bucket_response_status( 341 serf_bucket_t *bkt, 342 serf_status_line *sline) 343{ 344 response_context_t *ctx = bkt->data; 345 apr_status_t status; 346 347 if (ctx->state != STATE_STATUS_LINE) { 348 /* We already read it and moved on. Just return it. */ 349 *sline = ctx->sl; 350 return APR_SUCCESS; 351 } 352 353 /* Running the state machine once will advance the machine, or state 354 * that the stream isn't ready with enough data. There isn't ever a 355 * need to run the machine more than once to try and satisfy this. We 356 * have to look at the state to tell whether it advanced, though, as 357 * it is quite possible to advance *and* to return APR_EAGAIN. 358 */ 359 status = run_machine(bkt, ctx); 360 if (ctx->state == STATE_HEADERS) { 361 *sline = ctx->sl; 362 } 363 else { 364 /* Indicate that we don't have the information yet. 
*/ 365 sline->version = 0; 366 } 367 368 return status; 369} 370 371static apr_status_t serf_response_read(serf_bucket_t *bucket, 372 apr_size_t requested, 373 const char **data, apr_size_t *len) 374{ 375 response_context_t *ctx = bucket->data; 376 apr_status_t rv; 377 378 rv = wait_for_body(bucket, ctx); 379 if (rv) { 380 /* It's not possible to have read anything yet! */ 381 if (APR_STATUS_IS_EOF(rv) || APR_STATUS_IS_EAGAIN(rv)) { 382 *len = 0; 383 } 384 return rv; 385 } 386 387 rv = serf_bucket_read(ctx->body, requested, data, len); 388 if (SERF_BUCKET_READ_ERROR(rv)) 389 return rv; 390 391 if (APR_STATUS_IS_EOF(rv)) { 392 if (ctx->chunked) { 393 ctx->state = STATE_TRAILERS; 394 /* Mask the result. */ 395 rv = APR_SUCCESS; 396 } else { 397 ctx->state = STATE_DONE; 398 } 399 } 400 return rv; 401} 402 403static apr_status_t serf_response_readline(serf_bucket_t *bucket, 404 int acceptable, int *found, 405 const char **data, apr_size_t *len) 406{ 407 response_context_t *ctx = bucket->data; 408 apr_status_t rv; 409 410 rv = wait_for_body(bucket, ctx); 411 if (rv) { 412 return rv; 413 } 414 415 /* Delegate to the stream bucket to do the readline. */ 416 return serf_bucket_readline(ctx->body, acceptable, found, data, len); 417} 418 419apr_status_t serf_response_full_become_aggregate(serf_bucket_t *bucket) 420{ 421 response_context_t *ctx = bucket->data; 422 serf_bucket_t *bkt; 423 char buf[256]; 424 int size; 425 426 serf_bucket_aggregate_become(bucket); 427 428 /* Add reconstructed status line. 
*/ 429 size = apr_snprintf(buf, 256, "HTTP/%d.%d %d ", 430 SERF_HTTP_VERSION_MAJOR(ctx->sl.version), 431 SERF_HTTP_VERSION_MINOR(ctx->sl.version), 432 ctx->sl.code); 433 bkt = serf_bucket_simple_copy_create(buf, size, 434 bucket->allocator); 435 serf_bucket_aggregate_append(bucket, bkt); 436 bkt = serf_bucket_simple_copy_create(ctx->sl.reason, strlen(ctx->sl.reason), 437 bucket->allocator); 438 serf_bucket_aggregate_append(bucket, bkt); 439 bkt = SERF_BUCKET_SIMPLE_STRING_LEN("\r\n", 2, 440 bucket->allocator); 441 serf_bucket_aggregate_append(bucket, bkt); 442 443 /* Add headers and stream buckets in order. */ 444 serf_bucket_aggregate_append(bucket, ctx->headers); 445 serf_bucket_aggregate_append(bucket, ctx->stream); 446 447 serf_bucket_mem_free(bucket->allocator, ctx); 448 449 return APR_SUCCESS; 450} 451 452/* ### need to implement */ 453#define serf_response_peek NULL 454 455const serf_bucket_type_t serf_bucket_type_response = { 456 "RESPONSE", 457 serf_response_read, 458 serf_response_readline, 459 serf_default_read_iovec, 460 serf_default_read_for_sendfile, 461 serf_default_read_bucket, 462 serf_response_peek, 463 serf_response_destroy_and_data, 464}; 465