1/* Copyright 2002-2004 Justin Erenkrantz and Greg Stein 2 * 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16#include <apr_pools.h> 17#include <apr_strings.h> 18#include <apr_lib.h> 19#include <apr_date.h> 20 21#include "serf.h" 22#include "serf_bucket_util.h" 23#include "serf_bucket_types.h" 24 25#include <stdlib.h> 26 27/* This is an implementation of Bidirectional Web Transfer Protocol (BWTP) 28 * See: 29 * http://bwtp.wikidot.com/ 30 */ 31 32typedef struct { 33 int channel; 34 int open; 35 int type; /* 0 = header, 1 = message */ /* TODO enum? */ 36 const char *phrase; 37 serf_bucket_t *headers; 38 39 char req_line[1000]; 40} frame_context_t; 41 42typedef struct { 43 serf_bucket_t *stream; 44 serf_bucket_t *body; /* Pointer to the stream wrapping the body. */ 45 serf_bucket_t *headers; /* holds parsed headers */ 46 47 enum { 48 STATE_STATUS_LINE, /* reading status line */ 49 STATE_HEADERS, /* reading headers */ 50 STATE_BODY, /* reading body */ 51 STATE_DONE /* we've sent EOF */ 52 } state; 53 54 /* Buffer for accumulating a line from the response. */ 55 serf_linebuf_t linebuf; 56 57 int type; /* 0 = header, 1 = message */ /* TODO enum? */ 58 int channel; 59 char *phrase; 60 apr_size_t length; 61} incoming_context_t; 62 63 64serf_bucket_t *serf_bucket_bwtp_channel_close( 65 int channel, 66 serf_bucket_alloc_t *allocator) 67{ 68 frame_context_t *ctx; 69 70 ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx)); 71 ctx->type = 0; 72 ctx->open = 0; 73 ctx->channel = channel; 74 ctx->phrase = "CLOSED"; 75 ctx->headers = serf_bucket_headers_create(allocator); 76 77 return serf_bucket_create(&serf_bucket_type_bwtp_frame, allocator, ctx); 78} 79 80serf_bucket_t *serf_bucket_bwtp_channel_open( 81 int channel, 82 const char *uri, 83 serf_bucket_alloc_t *allocator) 84{ 85 frame_context_t *ctx; 86 87 ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx)); 88 ctx->type = 0; 89 ctx->open = 1; 90 ctx->channel = channel; 91 ctx->phrase = uri; 92 ctx->headers = serf_bucket_headers_create(allocator); 93 94 return serf_bucket_create(&serf_bucket_type_bwtp_frame, allocator, ctx); 95} 96 97serf_bucket_t *serf_bucket_bwtp_header_create( 98 int channel, 99 const char *phrase, 100 serf_bucket_alloc_t *allocator) 101{ 102 frame_context_t *ctx; 103 104 ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx)); 105 ctx->type = 0; 106 ctx->open = 0; 107 ctx->channel = channel; 108 ctx->phrase = phrase; 109 ctx->headers = serf_bucket_headers_create(allocator); 110 111 return serf_bucket_create(&serf_bucket_type_bwtp_frame, allocator, ctx); 112} 113 114serf_bucket_t *serf_bucket_bwtp_message_create( 115 int channel, 116 serf_bucket_t *body, 117 serf_bucket_alloc_t *allocator) 118{ 119 frame_context_t *ctx; 120 121 ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx)); 122 ctx->type = 1; 123 ctx->open = 0; 124 ctx->channel = channel; 125 ctx->phrase = "MESSAGE"; 126 ctx->headers = serf_bucket_headers_create(allocator); 127 128 return serf_bucket_create(&serf_bucket_type_bwtp_frame, allocator, ctx); 129} 130 131int serf_bucket_bwtp_frame_get_channel( 132 serf_bucket_t *bucket) 133{ 134 if (SERF_BUCKET_IS_BWTP_FRAME(bucket)) { 135 frame_context_t *ctx = bucket->data; 136 137 return ctx->channel; 138 } 139 else if (SERF_BUCKET_IS_BWTP_INCOMING_FRAME(bucket)) { 140 incoming_context_t *ctx = bucket->data; 141 142 return ctx->channel; 143 } 144 145 return -1; 146} 147 148int serf_bucket_bwtp_frame_get_type( 149 serf_bucket_t *bucket) 150{ 151 if (SERF_BUCKET_IS_BWTP_FRAME(bucket)) { 152 frame_context_t *ctx = bucket->data; 153 154 return ctx->type; 155 } 156 else if (SERF_BUCKET_IS_BWTP_INCOMING_FRAME(bucket)) { 157 incoming_context_t *ctx = bucket->data; 158 159 return ctx->type; 160 } 161 162 return -1; 163} 164 165const char *serf_bucket_bwtp_frame_get_phrase( 166 serf_bucket_t *bucket) 167{ 168 if (SERF_BUCKET_IS_BWTP_FRAME(bucket)) { 169 frame_context_t *ctx = bucket->data; 170 171 return ctx->phrase; 172 } 173 else if (SERF_BUCKET_IS_BWTP_INCOMING_FRAME(bucket)) { 174 incoming_context_t *ctx = bucket->data; 175 176 return ctx->phrase; 177 } 178 179 return NULL; 180} 181 182serf_bucket_t *serf_bucket_bwtp_frame_get_headers( 183 serf_bucket_t *bucket) 184{ 185 if (SERF_BUCKET_IS_BWTP_FRAME(bucket)) { 186 frame_context_t *ctx = bucket->data; 187 188 return ctx->headers; 189 } 190 else if (SERF_BUCKET_IS_BWTP_INCOMING_FRAME(bucket)) { 191 incoming_context_t *ctx = bucket->data; 192 193 return ctx->headers; 194 } 195 196 return NULL; 197} 198 199static int count_size(void *baton, const char *key, const char *value) 200{ 201 apr_size_t *c = baton; 202 /* TODO Deal with folding. Yikes. */ 203 204 /* Add in ": " and CRLF - so an extra four bytes. */ 205 *c += strlen(key) + strlen(value) + 4; 206 207 return 0; 208} 209 210static apr_size_t calc_header_size(serf_bucket_t *hdrs) 211{ 212 apr_size_t size = 0; 213 214 serf_bucket_headers_do(hdrs, count_size, &size); 215 216 return size; 217} 218 219static void serialize_data(serf_bucket_t *bucket) 220{ 221 frame_context_t *ctx = bucket->data; 222 serf_bucket_t *new_bucket; 223 apr_size_t req_len; 224 225 /* Serialize the request-line and headers into one mother string, 226 * and wrap a bucket around it. 227 */ 228 req_len = apr_snprintf(ctx->req_line, sizeof(ctx->req_line), 229 "%s %d " "%" APR_UINT64_T_HEX_FMT " %s%s\r\n", 230 (ctx->type ? "BWM" : "BWH"), 231 ctx->channel, calc_header_size(ctx->headers), 232 (ctx->open ? "OPEN " : ""), 233 ctx->phrase); 234 new_bucket = serf_bucket_simple_copy_create(ctx->req_line, req_len, 235 bucket->allocator); 236 237 /* Build up the new bucket structure. 238 * 239 * Note that self needs to become an aggregate bucket so that a 240 * pointer to self still represents the "right" data. 241 */ 242 serf_bucket_aggregate_become(bucket); 243 244 /* Insert the two buckets. */ 245 serf_bucket_aggregate_append(bucket, new_bucket); 246 serf_bucket_aggregate_append(bucket, ctx->headers); 247 248 /* Our private context is no longer needed, and is not referred to by 249 * any existing bucket. Toss it. 250 */ 251 serf_bucket_mem_free(bucket->allocator, ctx); 252} 253 254static apr_status_t serf_bwtp_frame_read(serf_bucket_t *bucket, 255 apr_size_t requested, 256 const char **data, apr_size_t *len) 257{ 258 /* Seralize our private data into a new aggregate bucket. */ 259 serialize_data(bucket); 260 261 /* Delegate to the "new" aggregate bucket to do the read. */ 262 return serf_bucket_read(bucket, requested, data, len); 263} 264 265static apr_status_t serf_bwtp_frame_readline(serf_bucket_t *bucket, 266 int acceptable, int *found, 267 const char **data, apr_size_t *len) 268{ 269 /* Seralize our private data into a new aggregate bucket. */ 270 serialize_data(bucket); 271 272 /* Delegate to the "new" aggregate bucket to do the readline. */ 273 return serf_bucket_readline(bucket, acceptable, found, data, len); 274} 275 276static apr_status_t serf_bwtp_frame_read_iovec(serf_bucket_t *bucket, 277 apr_size_t requested, 278 int vecs_size, 279 struct iovec *vecs, 280 int *vecs_used) 281{ 282 /* Seralize our private data into a new aggregate bucket. */ 283 serialize_data(bucket); 284 285 /* Delegate to the "new" aggregate bucket to do the read. */ 286 return serf_bucket_read_iovec(bucket, requested, 287 vecs_size, vecs, vecs_used); 288} 289 290static apr_status_t serf_bwtp_frame_peek(serf_bucket_t *bucket, 291 const char **data, 292 apr_size_t *len) 293{ 294 /* Seralize our private data into a new aggregate bucket. */ 295 serialize_data(bucket); 296 297 /* Delegate to the "new" aggregate bucket to do the peek. */ 298 return serf_bucket_peek(bucket, data, len); 299} 300 301const serf_bucket_type_t serf_bucket_type_bwtp_frame = { 302 "BWTP-FRAME", 303 serf_bwtp_frame_read, 304 serf_bwtp_frame_readline, 305 serf_bwtp_frame_read_iovec, 306 serf_default_read_for_sendfile, 307 serf_default_read_bucket, 308 serf_bwtp_frame_peek, 309 serf_default_destroy_and_data, 310}; 311 312 313serf_bucket_t *serf_bucket_bwtp_incoming_frame_create( 314 serf_bucket_t *stream, 315 serf_bucket_alloc_t *allocator) 316{ 317 incoming_context_t *ctx; 318 319 ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx)); 320 ctx->stream = stream; 321 ctx->body = NULL; 322 ctx->headers = serf_bucket_headers_create(allocator); 323 ctx->state = STATE_STATUS_LINE; 324 ctx->length = 0; 325 ctx->channel = -1; 326 ctx->phrase = NULL; 327 328 serf_linebuf_init(&ctx->linebuf); 329 330 return serf_bucket_create(&serf_bucket_type_bwtp_incoming_frame, allocator, ctx); 331} 332 333static void bwtp_incoming_destroy_and_data(serf_bucket_t *bucket) 334{ 335 incoming_context_t *ctx = bucket->data; 336 337 if (ctx->state != STATE_STATUS_LINE && ctx->phrase) { 338 serf_bucket_mem_free(bucket->allocator, (void*)ctx->phrase); 339 } 340 341 serf_bucket_destroy(ctx->stream); 342 if (ctx->body != NULL) 343 serf_bucket_destroy(ctx->body); 344 serf_bucket_destroy(ctx->headers); 345 346 serf_default_destroy_and_data(bucket); 347} 348 349static apr_status_t fetch_line(incoming_context_t *ctx, int acceptable) 350{ 351 return serf_linebuf_fetch(&ctx->linebuf, ctx->stream, acceptable); 352} 353 354static apr_status_t parse_status_line(incoming_context_t *ctx, 355 serf_bucket_alloc_t *allocator) 356{ 357 int res; 358 char *reason; /* ### stupid APR interface makes this non-const */ 359 360 /* ctx->linebuf.line should be of form: BW* */ 361 res = apr_date_checkmask(ctx->linebuf.line, "BW*"); 362 if (!res) { 363 /* Not an BWTP response? Well, at least we won't understand it. */ 364 return APR_EGENERAL; 365 } 366 367 if (ctx->linebuf.line[2] == 'H') { 368 ctx->type = 0; 369 } 370 else if (ctx->linebuf.line[2] == 'M') { 371 ctx->type = 1; 372 } 373 else { 374 ctx->type = -1; 375 } 376 377 ctx->channel = apr_strtoi64(ctx->linebuf.line + 3, &reason, 16); 378 379 /* Skip leading spaces for the reason string. */ 380 if (apr_isspace(*reason)) { 381 reason++; 382 } 383 384 ctx->length = apr_strtoi64(reason, &reason, 16); 385 386 /* Skip leading spaces for the reason string. */ 387 if (reason - ctx->linebuf.line < ctx->linebuf.used) { 388 if (apr_isspace(*reason)) { 389 reason++; 390 } 391 392 ctx->phrase = serf_bstrmemdup(allocator, reason, 393 ctx->linebuf.used 394 - (reason - ctx->linebuf.line)); 395 } else { 396 ctx->phrase = NULL; 397 } 398 399 return APR_SUCCESS; 400} 401 402/* This code should be replaced with header buckets. */ 403static apr_status_t fetch_headers(serf_bucket_t *bkt, incoming_context_t *ctx) 404{ 405 apr_status_t status; 406 407 /* RFC 2616 says that CRLF is the only line ending, but we can easily 408 * accept any kind of line ending. 409 */ 410 status = fetch_line(ctx, SERF_NEWLINE_ANY); 411 if (SERF_BUCKET_READ_ERROR(status)) { 412 return status; 413 } 414 /* Something was read. Process it. */ 415 416 if (ctx->linebuf.state == SERF_LINEBUF_READY && ctx->linebuf.used) { 417 const char *end_key; 418 const char *c; 419 420 end_key = c = memchr(ctx->linebuf.line, ':', ctx->linebuf.used); 421 if (!c) { 422 /* Bad headers? */ 423 return APR_EGENERAL; 424 } 425 426 /* Skip over initial : and spaces. */ 427 while (apr_isspace(*++c)) 428 continue; 429 430 /* Always copy the headers (from the linebuf into new mem). */ 431 /* ### we should be able to optimize some mem copies */ 432 serf_bucket_headers_setx( 433 ctx->headers, 434 ctx->linebuf.line, end_key - ctx->linebuf.line, 1, 435 c, ctx->linebuf.line + ctx->linebuf.used - c, 1); 436 } 437 438 return status; 439} 440 441/* Perform one iteration of the state machine. 442 * 443 * Will return when one the following conditions occurred: 444 * 1) a state change 445 * 2) an error 446 * 3) the stream is not ready or at EOF 447 * 4) APR_SUCCESS, meaning the machine can be run again immediately 448 */ 449static apr_status_t run_machine(serf_bucket_t *bkt, incoming_context_t *ctx) 450{ 451 apr_status_t status = APR_SUCCESS; /* initialize to avoid gcc warnings */ 452 453 switch (ctx->state) { 454 case STATE_STATUS_LINE: 455 /* RFC 2616 says that CRLF is the only line ending, but we can easily 456 * accept any kind of line ending. 457 */ 458 status = fetch_line(ctx, SERF_NEWLINE_ANY); 459 if (SERF_BUCKET_READ_ERROR(status)) 460 return status; 461 462 if (ctx->linebuf.state == SERF_LINEBUF_READY && ctx->linebuf.used) { 463 /* The Status-Line is in the line buffer. Process it. */ 464 status = parse_status_line(ctx, bkt->allocator); 465 if (status) 466 return status; 467 468 if (ctx->length) { 469 ctx->body = 470 serf_bucket_barrier_create(ctx->stream, bkt->allocator); 471 ctx->body = serf_bucket_limit_create(ctx->body, ctx->length, 472 bkt->allocator); 473 if (!ctx->type) { 474 ctx->state = STATE_HEADERS; 475 } else { 476 ctx->state = STATE_BODY; 477 } 478 } else { 479 ctx->state = STATE_DONE; 480 } 481 } 482 else { 483 /* The connection closed before we could get the next 484 * response. Treat the request as lost so that our upper 485 * end knows the server never tried to give us a response. 486 */ 487 if (APR_STATUS_IS_EOF(status)) { 488 return SERF_ERROR_REQUEST_LOST; 489 } 490 } 491 break; 492 case STATE_HEADERS: 493 status = fetch_headers(ctx->body, ctx); 494 if (SERF_BUCKET_READ_ERROR(status)) 495 return status; 496 497 /* If an empty line was read, then we hit the end of the headers. 498 * Move on to the body. 499 */ 500 if (ctx->linebuf.state == SERF_LINEBUF_READY && !ctx->linebuf.used) { 501 /* Advance the state. */ 502 ctx->state = STATE_DONE; 503 } 504 break; 505 case STATE_BODY: 506 /* Don't do anything. */ 507 break; 508 case STATE_DONE: 509 return APR_EOF; 510 default: 511 /* Not reachable */ 512 return APR_EGENERAL; 513 } 514 515 return status; 516} 517 518static apr_status_t wait_for_body(serf_bucket_t *bkt, incoming_context_t *ctx) 519{ 520 apr_status_t status; 521 522 /* Keep reading and moving through states if we aren't at the BODY */ 523 while (ctx->state != STATE_BODY) { 524 status = run_machine(bkt, ctx); 525 526 /* Anything other than APR_SUCCESS means that we cannot immediately 527 * read again (for now). 528 */ 529 if (status) 530 return status; 531 } 532 /* in STATE_BODY */ 533 534 return APR_SUCCESS; 535} 536 537apr_status_t serf_bucket_bwtp_incoming_frame_wait_for_headers( 538 serf_bucket_t *bucket) 539{ 540 incoming_context_t *ctx = bucket->data; 541 542 return wait_for_body(bucket, ctx); 543} 544 545static apr_status_t bwtp_incoming_read(serf_bucket_t *bucket, 546 apr_size_t requested, 547 const char **data, apr_size_t *len) 548{ 549 incoming_context_t *ctx = bucket->data; 550 apr_status_t rv; 551 552 rv = wait_for_body(bucket, ctx); 553 if (rv) { 554 /* It's not possible to have read anything yet! */ 555 if (APR_STATUS_IS_EOF(rv) || APR_STATUS_IS_EAGAIN(rv)) { 556 *len = 0; 557 } 558 return rv; 559 } 560 561 rv = serf_bucket_read(ctx->body, requested, data, len); 562 if (APR_STATUS_IS_EOF(rv)) { 563 ctx->state = STATE_DONE; 564 } 565 return rv; 566} 567 568static apr_status_t bwtp_incoming_readline(serf_bucket_t *bucket, 569 int acceptable, int *found, 570 const char **data, apr_size_t *len) 571{ 572 incoming_context_t *ctx = bucket->data; 573 apr_status_t rv; 574 575 rv = wait_for_body(bucket, ctx); 576 if (rv) { 577 return rv; 578 } 579 580 /* Delegate to the stream bucket to do the readline. */ 581 return serf_bucket_readline(ctx->body, acceptable, found, data, len); 582} 583 584/* ### need to implement */ 585#define bwtp_incoming_peek NULL 586 587const serf_bucket_type_t serf_bucket_type_bwtp_incoming_frame = { 588 "BWTP-INCOMING", 589 bwtp_incoming_read, 590 bwtp_incoming_readline, 591 serf_default_read_iovec, 592 serf_default_read_for_sendfile, 593 serf_default_read_bucket, 594 bwtp_incoming_peek, 595 bwtp_incoming_destroy_and_data, 596}; 597