1/* ==================================================================== 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, 13 * software distributed under the License is distributed on an 14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 15 * KIND, either express or implied. See the License for the 16 * specific language governing permissions and limitations 17 * under the License. 18 * ==================================================================== 19 */ 20 21#include <apr_pools.h> 22#include <apr_strings.h> 23#include <apr_lib.h> 24#include <apr_date.h> 25 26#include "serf.h" 27#include "serf_bucket_util.h" 28#include "serf_bucket_types.h" 29 30#include <stdlib.h> 31 32/* This is an implementation of Bidirectional Web Transfer Protocol (BWTP) 33 * See: 34 * http://bwtp.wikidot.com/ 35 */ 36 37typedef struct { 38 int channel; 39 int open; 40 int type; /* 0 = header, 1 = message */ /* TODO enum? */ 41 const char *phrase; 42 serf_bucket_t *headers; 43 44 char req_line[1000]; 45} frame_context_t; 46 47typedef struct { 48 serf_bucket_t *stream; 49 serf_bucket_t *body; /* Pointer to the stream wrapping the body. */ 50 serf_bucket_t *headers; /* holds parsed headers */ 51 52 enum { 53 STATE_STATUS_LINE, /* reading status line */ 54 STATE_HEADERS, /* reading headers */ 55 STATE_BODY, /* reading body */ 56 STATE_DONE /* we've sent EOF */ 57 } state; 58 59 /* Buffer for accumulating a line from the response. */ 60 serf_linebuf_t linebuf; 61 62 int type; /* 0 = header, 1 = message */ /* TODO enum? */ 63 int channel; 64 char *phrase; 65 apr_size_t length; 66} incoming_context_t; 67 68 69serf_bucket_t *serf_bucket_bwtp_channel_close( 70 int channel, 71 serf_bucket_alloc_t *allocator) 72{ 73 frame_context_t *ctx; 74 75 ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx)); 76 ctx->type = 0; 77 ctx->open = 0; 78 ctx->channel = channel; 79 ctx->phrase = "CLOSED"; 80 ctx->headers = serf_bucket_headers_create(allocator); 81 82 return serf_bucket_create(&serf_bucket_type_bwtp_frame, allocator, ctx); 83} 84 85serf_bucket_t *serf_bucket_bwtp_channel_open( 86 int channel, 87 const char *uri, 88 serf_bucket_alloc_t *allocator) 89{ 90 frame_context_t *ctx; 91 92 ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx)); 93 ctx->type = 0; 94 ctx->open = 1; 95 ctx->channel = channel; 96 ctx->phrase = uri; 97 ctx->headers = serf_bucket_headers_create(allocator); 98 99 return serf_bucket_create(&serf_bucket_type_bwtp_frame, allocator, ctx); 100} 101 102serf_bucket_t *serf_bucket_bwtp_header_create( 103 int channel, 104 const char *phrase, 105 serf_bucket_alloc_t *allocator) 106{ 107 frame_context_t *ctx; 108 109 ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx)); 110 ctx->type = 0; 111 ctx->open = 0; 112 ctx->channel = channel; 113 ctx->phrase = phrase; 114 ctx->headers = serf_bucket_headers_create(allocator); 115 116 return serf_bucket_create(&serf_bucket_type_bwtp_frame, allocator, ctx); 117} 118 119serf_bucket_t *serf_bucket_bwtp_message_create( 120 int channel, 121 serf_bucket_t *body, 122 serf_bucket_alloc_t *allocator) 123{ 124 frame_context_t *ctx; 125 126 ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx)); 127 ctx->type = 1; 128 ctx->open = 0; 129 ctx->channel = channel; 130 ctx->phrase = "MESSAGE"; 131 ctx->headers = serf_bucket_headers_create(allocator); 132 133 return serf_bucket_create(&serf_bucket_type_bwtp_frame, allocator, ctx); 134} 135 136int serf_bucket_bwtp_frame_get_channel( 137 serf_bucket_t *bucket) 138{ 139 if (SERF_BUCKET_IS_BWTP_FRAME(bucket)) { 140 frame_context_t *ctx = bucket->data; 141 142 return ctx->channel; 143 } 144 else if (SERF_BUCKET_IS_BWTP_INCOMING_FRAME(bucket)) { 145 incoming_context_t *ctx = bucket->data; 146 147 return ctx->channel; 148 } 149 150 return -1; 151} 152 153int serf_bucket_bwtp_frame_get_type( 154 serf_bucket_t *bucket) 155{ 156 if (SERF_BUCKET_IS_BWTP_FRAME(bucket)) { 157 frame_context_t *ctx = bucket->data; 158 159 return ctx->type; 160 } 161 else if (SERF_BUCKET_IS_BWTP_INCOMING_FRAME(bucket)) { 162 incoming_context_t *ctx = bucket->data; 163 164 return ctx->type; 165 } 166 167 return -1; 168} 169 170const char *serf_bucket_bwtp_frame_get_phrase( 171 serf_bucket_t *bucket) 172{ 173 if (SERF_BUCKET_IS_BWTP_FRAME(bucket)) { 174 frame_context_t *ctx = bucket->data; 175 176 return ctx->phrase; 177 } 178 else if (SERF_BUCKET_IS_BWTP_INCOMING_FRAME(bucket)) { 179 incoming_context_t *ctx = bucket->data; 180 181 return ctx->phrase; 182 } 183 184 return NULL; 185} 186 187serf_bucket_t *serf_bucket_bwtp_frame_get_headers( 188 serf_bucket_t *bucket) 189{ 190 if (SERF_BUCKET_IS_BWTP_FRAME(bucket)) { 191 frame_context_t *ctx = bucket->data; 192 193 return ctx->headers; 194 } 195 else if (SERF_BUCKET_IS_BWTP_INCOMING_FRAME(bucket)) { 196 incoming_context_t *ctx = bucket->data; 197 198 return ctx->headers; 199 } 200 201 return NULL; 202} 203 204static int count_size(void *baton, const char *key, const char *value) 205{ 206 apr_size_t *c = baton; 207 /* TODO Deal with folding. Yikes. */ 208 209 /* Add in ": " and CRLF - so an extra four bytes. */ 210 *c += strlen(key) + strlen(value) + 4; 211 212 return 0; 213} 214 215static apr_size_t calc_header_size(serf_bucket_t *hdrs) 216{ 217 apr_size_t size = 0; 218 219 serf_bucket_headers_do(hdrs, count_size, &size); 220 221 return size; 222} 223 224static void serialize_data(serf_bucket_t *bucket) 225{ 226 frame_context_t *ctx = bucket->data; 227 serf_bucket_t *new_bucket; 228 apr_size_t req_len; 229 230 /* Serialize the request-line and headers into one mother string, 231 * and wrap a bucket around it. 232 */ 233 req_len = apr_snprintf(ctx->req_line, sizeof(ctx->req_line), 234 "%s %d " "%" APR_UINT64_T_HEX_FMT " %s%s\r\n", 235 (ctx->type ? "BWM" : "BWH"), 236 ctx->channel, calc_header_size(ctx->headers), 237 (ctx->open ? "OPEN " : ""), 238 ctx->phrase); 239 new_bucket = serf_bucket_simple_copy_create(ctx->req_line, req_len, 240 bucket->allocator); 241 242 /* Build up the new bucket structure. 243 * 244 * Note that self needs to become an aggregate bucket so that a 245 * pointer to self still represents the "right" data. 246 */ 247 serf_bucket_aggregate_become(bucket); 248 249 /* Insert the two buckets. */ 250 serf_bucket_aggregate_append(bucket, new_bucket); 251 serf_bucket_aggregate_append(bucket, ctx->headers); 252 253 /* Our private context is no longer needed, and is not referred to by 254 * any existing bucket. Toss it. 255 */ 256 serf_bucket_mem_free(bucket->allocator, ctx); 257} 258 259static apr_status_t serf_bwtp_frame_read(serf_bucket_t *bucket, 260 apr_size_t requested, 261 const char **data, apr_size_t *len) 262{ 263 /* Seralize our private data into a new aggregate bucket. */ 264 serialize_data(bucket); 265 266 /* Delegate to the "new" aggregate bucket to do the read. */ 267 return serf_bucket_read(bucket, requested, data, len); 268} 269 270static apr_status_t serf_bwtp_frame_readline(serf_bucket_t *bucket, 271 int acceptable, int *found, 272 const char **data, apr_size_t *len) 273{ 274 /* Seralize our private data into a new aggregate bucket. */ 275 serialize_data(bucket); 276 277 /* Delegate to the "new" aggregate bucket to do the readline. */ 278 return serf_bucket_readline(bucket, acceptable, found, data, len); 279} 280 281static apr_status_t serf_bwtp_frame_read_iovec(serf_bucket_t *bucket, 282 apr_size_t requested, 283 int vecs_size, 284 struct iovec *vecs, 285 int *vecs_used) 286{ 287 /* Seralize our private data into a new aggregate bucket. */ 288 serialize_data(bucket); 289 290 /* Delegate to the "new" aggregate bucket to do the read. */ 291 return serf_bucket_read_iovec(bucket, requested, 292 vecs_size, vecs, vecs_used); 293} 294 295static apr_status_t serf_bwtp_frame_peek(serf_bucket_t *bucket, 296 const char **data, 297 apr_size_t *len) 298{ 299 /* Seralize our private data into a new aggregate bucket. */ 300 serialize_data(bucket); 301 302 /* Delegate to the "new" aggregate bucket to do the peek. */ 303 return serf_bucket_peek(bucket, data, len); 304} 305 306const serf_bucket_type_t serf_bucket_type_bwtp_frame = { 307 "BWTP-FRAME", 308 serf_bwtp_frame_read, 309 serf_bwtp_frame_readline, 310 serf_bwtp_frame_read_iovec, 311 serf_default_read_for_sendfile, 312 serf_default_read_bucket, 313 serf_bwtp_frame_peek, 314 serf_default_destroy_and_data, 315}; 316 317 318serf_bucket_t *serf_bucket_bwtp_incoming_frame_create( 319 serf_bucket_t *stream, 320 serf_bucket_alloc_t *allocator) 321{ 322 incoming_context_t *ctx; 323 324 ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx)); 325 ctx->stream = stream; 326 ctx->body = NULL; 327 ctx->headers = serf_bucket_headers_create(allocator); 328 ctx->state = STATE_STATUS_LINE; 329 ctx->length = 0; 330 ctx->channel = -1; 331 ctx->phrase = NULL; 332 333 serf_linebuf_init(&ctx->linebuf); 334 335 return serf_bucket_create(&serf_bucket_type_bwtp_incoming_frame, allocator, ctx); 336} 337 338static void bwtp_incoming_destroy_and_data(serf_bucket_t *bucket) 339{ 340 incoming_context_t *ctx = bucket->data; 341 342 if (ctx->state != STATE_STATUS_LINE && ctx->phrase) { 343 serf_bucket_mem_free(bucket->allocator, (void*)ctx->phrase); 344 } 345 346 serf_bucket_destroy(ctx->stream); 347 if (ctx->body != NULL) 348 serf_bucket_destroy(ctx->body); 349 serf_bucket_destroy(ctx->headers); 350 351 serf_default_destroy_and_data(bucket); 352} 353 354static apr_status_t fetch_line(incoming_context_t *ctx, int acceptable) 355{ 356 return serf_linebuf_fetch(&ctx->linebuf, ctx->stream, acceptable); 357} 358 359static apr_status_t parse_status_line(incoming_context_t *ctx, 360 serf_bucket_alloc_t *allocator) 361{ 362 int res; 363 char *reason; /* ### stupid APR interface makes this non-const */ 364 365 /* ctx->linebuf.line should be of form: BW* */ 366 res = apr_date_checkmask(ctx->linebuf.line, "BW*"); 367 if (!res) { 368 /* Not an BWTP response? Well, at least we won't understand it. */ 369 return APR_EGENERAL; 370 } 371 372 if (ctx->linebuf.line[2] == 'H') { 373 ctx->type = 0; 374 } 375 else if (ctx->linebuf.line[2] == 'M') { 376 ctx->type = 1; 377 } 378 else { 379 ctx->type = -1; 380 } 381 382 ctx->channel = apr_strtoi64(ctx->linebuf.line + 3, &reason, 16); 383 384 /* Skip leading spaces for the reason string. */ 385 if (apr_isspace(*reason)) { 386 reason++; 387 } 388 389 ctx->length = apr_strtoi64(reason, &reason, 16); 390 391 /* Skip leading spaces for the reason string. */ 392 if (reason - ctx->linebuf.line < ctx->linebuf.used) { 393 if (apr_isspace(*reason)) { 394 reason++; 395 } 396 397 ctx->phrase = serf_bstrmemdup(allocator, reason, 398 ctx->linebuf.used 399 - (reason - ctx->linebuf.line)); 400 } else { 401 ctx->phrase = NULL; 402 } 403 404 return APR_SUCCESS; 405} 406 407/* This code should be replaced with header buckets. */ 408static apr_status_t fetch_headers(serf_bucket_t *bkt, incoming_context_t *ctx) 409{ 410 apr_status_t status; 411 412 /* RFC 2616 says that CRLF is the only line ending, but we can easily 413 * accept any kind of line ending. 414 */ 415 status = fetch_line(ctx, SERF_NEWLINE_ANY); 416 if (SERF_BUCKET_READ_ERROR(status)) { 417 return status; 418 } 419 /* Something was read. Process it. */ 420 421 if (ctx->linebuf.state == SERF_LINEBUF_READY && ctx->linebuf.used) { 422 const char *end_key; 423 const char *c; 424 425 end_key = c = memchr(ctx->linebuf.line, ':', ctx->linebuf.used); 426 if (!c) { 427 /* Bad headers? */ 428 return APR_EGENERAL; 429 } 430 431 /* Skip over initial : and spaces. */ 432 while (apr_isspace(*++c)) 433 continue; 434 435 /* Always copy the headers (from the linebuf into new mem). */ 436 /* ### we should be able to optimize some mem copies */ 437 serf_bucket_headers_setx( 438 ctx->headers, 439 ctx->linebuf.line, end_key - ctx->linebuf.line, 1, 440 c, ctx->linebuf.line + ctx->linebuf.used - c, 1); 441 } 442 443 return status; 444} 445 446/* Perform one iteration of the state machine. 447 * 448 * Will return when one the following conditions occurred: 449 * 1) a state change 450 * 2) an error 451 * 3) the stream is not ready or at EOF 452 * 4) APR_SUCCESS, meaning the machine can be run again immediately 453 */ 454static apr_status_t run_machine(serf_bucket_t *bkt, incoming_context_t *ctx) 455{ 456 apr_status_t status = APR_SUCCESS; /* initialize to avoid gcc warnings */ 457 458 switch (ctx->state) { 459 case STATE_STATUS_LINE: 460 /* RFC 2616 says that CRLF is the only line ending, but we can easily 461 * accept any kind of line ending. 462 */ 463 status = fetch_line(ctx, SERF_NEWLINE_ANY); 464 if (SERF_BUCKET_READ_ERROR(status)) 465 return status; 466 467 if (ctx->linebuf.state == SERF_LINEBUF_READY && ctx->linebuf.used) { 468 /* The Status-Line is in the line buffer. Process it. */ 469 status = parse_status_line(ctx, bkt->allocator); 470 if (status) 471 return status; 472 473 if (ctx->length) { 474 ctx->body = 475 serf_bucket_barrier_create(ctx->stream, bkt->allocator); 476 ctx->body = serf_bucket_limit_create(ctx->body, ctx->length, 477 bkt->allocator); 478 if (!ctx->type) { 479 ctx->state = STATE_HEADERS; 480 } else { 481 ctx->state = STATE_BODY; 482 } 483 } else { 484 ctx->state = STATE_DONE; 485 } 486 } 487 else { 488 /* The connection closed before we could get the next 489 * response. Treat the request as lost so that our upper 490 * end knows the server never tried to give us a response. 491 */ 492 if (APR_STATUS_IS_EOF(status)) { 493 return SERF_ERROR_REQUEST_LOST; 494 } 495 } 496 break; 497 case STATE_HEADERS: 498 status = fetch_headers(ctx->body, ctx); 499 if (SERF_BUCKET_READ_ERROR(status)) 500 return status; 501 502 /* If an empty line was read, then we hit the end of the headers. 503 * Move on to the body. 504 */ 505 if (ctx->linebuf.state == SERF_LINEBUF_READY && !ctx->linebuf.used) { 506 /* Advance the state. */ 507 ctx->state = STATE_DONE; 508 } 509 break; 510 case STATE_BODY: 511 /* Don't do anything. */ 512 break; 513 case STATE_DONE: 514 return APR_EOF; 515 default: 516 /* Not reachable */ 517 return APR_EGENERAL; 518 } 519 520 return status; 521} 522 523static apr_status_t wait_for_body(serf_bucket_t *bkt, incoming_context_t *ctx) 524{ 525 apr_status_t status; 526 527 /* Keep reading and moving through states if we aren't at the BODY */ 528 while (ctx->state != STATE_BODY) { 529 status = run_machine(bkt, ctx); 530 531 /* Anything other than APR_SUCCESS means that we cannot immediately 532 * read again (for now). 533 */ 534 if (status) 535 return status; 536 } 537 /* in STATE_BODY */ 538 539 return APR_SUCCESS; 540} 541 542apr_status_t serf_bucket_bwtp_incoming_frame_wait_for_headers( 543 serf_bucket_t *bucket) 544{ 545 incoming_context_t *ctx = bucket->data; 546 547 return wait_for_body(bucket, ctx); 548} 549 550static apr_status_t bwtp_incoming_read(serf_bucket_t *bucket, 551 apr_size_t requested, 552 const char **data, apr_size_t *len) 553{ 554 incoming_context_t *ctx = bucket->data; 555 apr_status_t rv; 556 557 rv = wait_for_body(bucket, ctx); 558 if (rv) { 559 /* It's not possible to have read anything yet! */ 560 if (APR_STATUS_IS_EOF(rv) || APR_STATUS_IS_EAGAIN(rv)) { 561 *len = 0; 562 } 563 return rv; 564 } 565 566 rv = serf_bucket_read(ctx->body, requested, data, len); 567 if (APR_STATUS_IS_EOF(rv)) { 568 ctx->state = STATE_DONE; 569 } 570 return rv; 571} 572 573static apr_status_t bwtp_incoming_readline(serf_bucket_t *bucket, 574 int acceptable, int *found, 575 const char **data, apr_size_t *len) 576{ 577 incoming_context_t *ctx = bucket->data; 578 apr_status_t rv; 579 580 rv = wait_for_body(bucket, ctx); 581 if (rv) { 582 return rv; 583 } 584 585 /* Delegate to the stream bucket to do the readline. */ 586 return serf_bucket_readline(ctx->body, acceptable, found, data, len); 587} 588 589/* ### need to implement */ 590#define bwtp_incoming_peek NULL 591 592const serf_bucket_type_t serf_bucket_type_bwtp_incoming_frame = { 593 "BWTP-INCOMING", 594 bwtp_incoming_read, 595 bwtp_incoming_readline, 596 serf_default_read_iovec, 597 serf_default_read_for_sendfile, 598 serf_default_read_bucket, 599 bwtp_incoming_peek, 600 bwtp_incoming_destroy_and_data, 601}; 602