1251877Speter/* Copyright 2002-2004 Justin Erenkrantz and Greg Stein 2251877Speter * 3251877Speter * Licensed under the Apache License, Version 2.0 (the "License"); 4251877Speter * you may not use this file except in compliance with the License. 5251877Speter * You may obtain a copy of the License at 6251877Speter * 7251877Speter * http://www.apache.org/licenses/LICENSE-2.0 8251877Speter * 9251877Speter * Unless required by applicable law or agreed to in writing, software 10251877Speter * distributed under the License is distributed on an "AS IS" BASIS, 11251877Speter * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12251877Speter * See the License for the specific language governing permissions and 13251877Speter * limitations under the License. 14251877Speter */ 15251877Speter 16251877Speter#include <apr_pools.h> 17251877Speter#include <apr_strings.h> 18251877Speter#include <apr_lib.h> 19251877Speter#include <apr_date.h> 20251877Speter 21251877Speter#include "serf.h" 22251877Speter#include "serf_bucket_util.h" 23251877Speter#include "serf_bucket_types.h" 24251877Speter 25251877Speter#include <stdlib.h> 26251877Speter 27251877Speter/* This is an implementation of Bidirectional Web Transfer Protocol (BWTP) 28251877Speter * See: 29251877Speter * http://bwtp.wikidot.com/ 30251877Speter */ 31251877Speter 32251877Spetertypedef struct { 33251877Speter int channel; 34251877Speter int open; 35251877Speter int type; /* 0 = header, 1 = message */ /* TODO enum? */ 36251877Speter const char *phrase; 37251877Speter serf_bucket_t *headers; 38251877Speter 39251877Speter char req_line[1000]; 40251877Speter} frame_context_t; 41251877Speter 42251877Spetertypedef struct { 43251877Speter serf_bucket_t *stream; 44251877Speter serf_bucket_t *body; /* Pointer to the stream wrapping the body. */ 45251877Speter serf_bucket_t *headers; /* holds parsed headers */ 46251877Speter 47251877Speter enum { 48251877Speter STATE_STATUS_LINE, /* reading status line */ 49251877Speter STATE_HEADERS, /* reading headers */ 50251877Speter STATE_BODY, /* reading body */ 51251877Speter STATE_DONE /* we've sent EOF */ 52251877Speter } state; 53251877Speter 54251877Speter /* Buffer for accumulating a line from the response. */ 55251877Speter serf_linebuf_t linebuf; 56251877Speter 57251877Speter int type; /* 0 = header, 1 = message */ /* TODO enum? */ 58251877Speter int channel; 59251877Speter char *phrase; 60251877Speter apr_size_t length; 61251877Speter} incoming_context_t; 62251877Speter 63251877Speter 64251877Speterserf_bucket_t *serf_bucket_bwtp_channel_close( 65251877Speter int channel, 66251877Speter serf_bucket_alloc_t *allocator) 67251877Speter{ 68251877Speter frame_context_t *ctx; 69251877Speter 70251877Speter ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx)); 71251877Speter ctx->type = 0; 72251877Speter ctx->open = 0; 73251877Speter ctx->channel = channel; 74251877Speter ctx->phrase = "CLOSED"; 75251877Speter ctx->headers = serf_bucket_headers_create(allocator); 76251877Speter 77251877Speter return serf_bucket_create(&serf_bucket_type_bwtp_frame, allocator, ctx); 78251877Speter} 79251877Speter 80251877Speterserf_bucket_t *serf_bucket_bwtp_channel_open( 81251877Speter int channel, 82251877Speter const char *uri, 83251877Speter serf_bucket_alloc_t *allocator) 84251877Speter{ 85251877Speter frame_context_t *ctx; 86251877Speter 87251877Speter ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx)); 88251877Speter ctx->type = 0; 89251877Speter ctx->open = 1; 90251877Speter ctx->channel = channel; 91251877Speter ctx->phrase = uri; 92251877Speter ctx->headers = serf_bucket_headers_create(allocator); 93251877Speter 94251877Speter return serf_bucket_create(&serf_bucket_type_bwtp_frame, allocator, ctx); 95251877Speter} 96251877Speter 97251877Speterserf_bucket_t *serf_bucket_bwtp_header_create( 98251877Speter int channel, 99251877Speter const char *phrase, 100251877Speter serf_bucket_alloc_t *allocator) 101251877Speter{ 102251877Speter frame_context_t *ctx; 103251877Speter 104251877Speter ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx)); 105251877Speter ctx->type = 0; 106251877Speter ctx->open = 0; 107251877Speter ctx->channel = channel; 108251877Speter ctx->phrase = phrase; 109251877Speter ctx->headers = serf_bucket_headers_create(allocator); 110251877Speter 111251877Speter return serf_bucket_create(&serf_bucket_type_bwtp_frame, allocator, ctx); 112251877Speter} 113251877Speter 114251877Speterserf_bucket_t *serf_bucket_bwtp_message_create( 115251877Speter int channel, 116251877Speter serf_bucket_t *body, 117251877Speter serf_bucket_alloc_t *allocator) 118251877Speter{ 119251877Speter frame_context_t *ctx; 120251877Speter 121251877Speter ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx)); 122251877Speter ctx->type = 1; 123251877Speter ctx->open = 0; 124251877Speter ctx->channel = channel; 125251877Speter ctx->phrase = "MESSAGE"; 126251877Speter ctx->headers = serf_bucket_headers_create(allocator); 127251877Speter 128251877Speter return serf_bucket_create(&serf_bucket_type_bwtp_frame, allocator, ctx); 129251877Speter} 130251877Speter 131251877Speterint serf_bucket_bwtp_frame_get_channel( 132251877Speter serf_bucket_t *bucket) 133251877Speter{ 134251877Speter if (SERF_BUCKET_IS_BWTP_FRAME(bucket)) { 135251877Speter frame_context_t *ctx = bucket->data; 136251877Speter 137251877Speter return ctx->channel; 138251877Speter } 139251877Speter else if (SERF_BUCKET_IS_BWTP_INCOMING_FRAME(bucket)) { 140251877Speter incoming_context_t *ctx = bucket->data; 141251877Speter 142251877Speter return ctx->channel; 143251877Speter } 144251877Speter 145251877Speter return -1; 146251877Speter} 147251877Speter 148251877Speterint serf_bucket_bwtp_frame_get_type( 149251877Speter serf_bucket_t *bucket) 150251877Speter{ 151251877Speter if (SERF_BUCKET_IS_BWTP_FRAME(bucket)) { 152251877Speter frame_context_t *ctx = bucket->data; 153251877Speter 154251877Speter return ctx->type; 155251877Speter } 156251877Speter else if (SERF_BUCKET_IS_BWTP_INCOMING_FRAME(bucket)) { 157251877Speter incoming_context_t *ctx = bucket->data; 158251877Speter 159251877Speter return ctx->type; 160251877Speter } 161251877Speter 162251877Speter return -1; 163251877Speter} 164251877Speter 165251877Speterconst char *serf_bucket_bwtp_frame_get_phrase( 166251877Speter serf_bucket_t *bucket) 167251877Speter{ 168251877Speter if (SERF_BUCKET_IS_BWTP_FRAME(bucket)) { 169251877Speter frame_context_t *ctx = bucket->data; 170251877Speter 171251877Speter return ctx->phrase; 172251877Speter } 173251877Speter else if (SERF_BUCKET_IS_BWTP_INCOMING_FRAME(bucket)) { 174251877Speter incoming_context_t *ctx = bucket->data; 175251877Speter 176251877Speter return ctx->phrase; 177251877Speter } 178251877Speter 179251877Speter return NULL; 180251877Speter} 181251877Speter 182251877Speterserf_bucket_t *serf_bucket_bwtp_frame_get_headers( 183251877Speter serf_bucket_t *bucket) 184251877Speter{ 185251877Speter if (SERF_BUCKET_IS_BWTP_FRAME(bucket)) { 186251877Speter frame_context_t *ctx = bucket->data; 187251877Speter 188251877Speter return ctx->headers; 189251877Speter } 190251877Speter else if (SERF_BUCKET_IS_BWTP_INCOMING_FRAME(bucket)) { 191251877Speter incoming_context_t *ctx = bucket->data; 192251877Speter 193251877Speter return ctx->headers; 194251877Speter } 195251877Speter 196251877Speter return NULL; 197251877Speter} 198251877Speter 199251877Speterstatic int count_size(void *baton, const char *key, const char *value) 200251877Speter{ 201251877Speter apr_size_t *c = baton; 202251877Speter /* TODO Deal with folding. Yikes. */ 203251877Speter 204251877Speter /* Add in ": " and CRLF - so an extra four bytes. */ 205251877Speter *c += strlen(key) + strlen(value) + 4; 206251877Speter 207251877Speter return 0; 208251877Speter} 209251877Speter 210251877Speterstatic apr_size_t calc_header_size(serf_bucket_t *hdrs) 211251877Speter{ 212251877Speter apr_size_t size = 0; 213251877Speter 214251877Speter serf_bucket_headers_do(hdrs, count_size, &size); 215251877Speter 216251877Speter return size; 217251877Speter} 218251877Speter 219251877Speterstatic void serialize_data(serf_bucket_t *bucket) 220251877Speter{ 221251877Speter frame_context_t *ctx = bucket->data; 222251877Speter serf_bucket_t *new_bucket; 223251877Speter apr_size_t req_len; 224251877Speter 225251877Speter /* Serialize the request-line and headers into one mother string, 226251877Speter * and wrap a bucket around it. 227251877Speter */ 228251877Speter req_len = apr_snprintf(ctx->req_line, sizeof(ctx->req_line), 229251877Speter "%s %d " "%" APR_UINT64_T_HEX_FMT " %s%s\r\n", 230251877Speter (ctx->type ? "BWM" : "BWH"), 231251877Speter ctx->channel, calc_header_size(ctx->headers), 232251877Speter (ctx->open ? "OPEN " : ""), 233251877Speter ctx->phrase); 234251877Speter new_bucket = serf_bucket_simple_copy_create(ctx->req_line, req_len, 235251877Speter bucket->allocator); 236251877Speter 237251877Speter /* Build up the new bucket structure. 238251877Speter * 239251877Speter * Note that self needs to become an aggregate bucket so that a 240251877Speter * pointer to self still represents the "right" data. 241251877Speter */ 242251877Speter serf_bucket_aggregate_become(bucket); 243251877Speter 244251877Speter /* Insert the two buckets. */ 245251877Speter serf_bucket_aggregate_append(bucket, new_bucket); 246251877Speter serf_bucket_aggregate_append(bucket, ctx->headers); 247251877Speter 248251877Speter /* Our private context is no longer needed, and is not referred to by 249251877Speter * any existing bucket. Toss it. 250251877Speter */ 251251877Speter serf_bucket_mem_free(bucket->allocator, ctx); 252251877Speter} 253251877Speter 254251877Speterstatic apr_status_t serf_bwtp_frame_read(serf_bucket_t *bucket, 255251877Speter apr_size_t requested, 256251877Speter const char **data, apr_size_t *len) 257251877Speter{ 258251877Speter /* Seralize our private data into a new aggregate bucket. */ 259251877Speter serialize_data(bucket); 260251877Speter 261251877Speter /* Delegate to the "new" aggregate bucket to do the read. */ 262251877Speter return serf_bucket_read(bucket, requested, data, len); 263251877Speter} 264251877Speter 265251877Speterstatic apr_status_t serf_bwtp_frame_readline(serf_bucket_t *bucket, 266251877Speter int acceptable, int *found, 267251877Speter const char **data, apr_size_t *len) 268251877Speter{ 269251877Speter /* Seralize our private data into a new aggregate bucket. */ 270251877Speter serialize_data(bucket); 271251877Speter 272251877Speter /* Delegate to the "new" aggregate bucket to do the readline. */ 273251877Speter return serf_bucket_readline(bucket, acceptable, found, data, len); 274251877Speter} 275251877Speter 276251877Speterstatic apr_status_t serf_bwtp_frame_read_iovec(serf_bucket_t *bucket, 277251877Speter apr_size_t requested, 278251877Speter int vecs_size, 279251877Speter struct iovec *vecs, 280251877Speter int *vecs_used) 281251877Speter{ 282251877Speter /* Seralize our private data into a new aggregate bucket. */ 283251877Speter serialize_data(bucket); 284251877Speter 285251877Speter /* Delegate to the "new" aggregate bucket to do the read. */ 286251877Speter return serf_bucket_read_iovec(bucket, requested, 287251877Speter vecs_size, vecs, vecs_used); 288251877Speter} 289251877Speter 290251877Speterstatic apr_status_t serf_bwtp_frame_peek(serf_bucket_t *bucket, 291251877Speter const char **data, 292251877Speter apr_size_t *len) 293251877Speter{ 294251877Speter /* Seralize our private data into a new aggregate bucket. */ 295251877Speter serialize_data(bucket); 296251877Speter 297251877Speter /* Delegate to the "new" aggregate bucket to do the peek. */ 298251877Speter return serf_bucket_peek(bucket, data, len); 299251877Speter} 300251877Speter 301251877Speterconst serf_bucket_type_t serf_bucket_type_bwtp_frame = { 302251877Speter "BWTP-FRAME", 303251877Speter serf_bwtp_frame_read, 304251877Speter serf_bwtp_frame_readline, 305251877Speter serf_bwtp_frame_read_iovec, 306251877Speter serf_default_read_for_sendfile, 307251877Speter serf_default_read_bucket, 308251877Speter serf_bwtp_frame_peek, 309251877Speter serf_default_destroy_and_data, 310251877Speter}; 311251877Speter 312251877Speter 313251877Speterserf_bucket_t *serf_bucket_bwtp_incoming_frame_create( 314251877Speter serf_bucket_t *stream, 315251877Speter serf_bucket_alloc_t *allocator) 316251877Speter{ 317251877Speter incoming_context_t *ctx; 318251877Speter 319251877Speter ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx)); 320251877Speter ctx->stream = stream; 321251877Speter ctx->body = NULL; 322251877Speter ctx->headers = serf_bucket_headers_create(allocator); 323251877Speter ctx->state = STATE_STATUS_LINE; 324251877Speter ctx->length = 0; 325251877Speter ctx->channel = -1; 326251877Speter ctx->phrase = NULL; 327251877Speter 328251877Speter serf_linebuf_init(&ctx->linebuf); 329251877Speter 330251877Speter return serf_bucket_create(&serf_bucket_type_bwtp_incoming_frame, allocator, ctx); 331251877Speter} 332251877Speter 333251877Speterstatic void bwtp_incoming_destroy_and_data(serf_bucket_t *bucket) 334251877Speter{ 335251877Speter incoming_context_t *ctx = bucket->data; 336251877Speter 337251877Speter if (ctx->state != STATE_STATUS_LINE && ctx->phrase) { 338251877Speter serf_bucket_mem_free(bucket->allocator, (void*)ctx->phrase); 339251877Speter } 340251877Speter 341251877Speter serf_bucket_destroy(ctx->stream); 342251877Speter if (ctx->body != NULL) 343251877Speter serf_bucket_destroy(ctx->body); 344251877Speter serf_bucket_destroy(ctx->headers); 345251877Speter 346251877Speter serf_default_destroy_and_data(bucket); 347251877Speter} 348251877Speter 349251877Speterstatic apr_status_t fetch_line(incoming_context_t *ctx, int acceptable) 350251877Speter{ 351251877Speter return serf_linebuf_fetch(&ctx->linebuf, ctx->stream, acceptable); 352251877Speter} 353251877Speter 354251877Speterstatic apr_status_t parse_status_line(incoming_context_t *ctx, 355251877Speter serf_bucket_alloc_t *allocator) 356251877Speter{ 357251877Speter int res; 358251877Speter char *reason; /* ### stupid APR interface makes this non-const */ 359251877Speter 360251877Speter /* ctx->linebuf.line should be of form: BW* */ 361251877Speter res = apr_date_checkmask(ctx->linebuf.line, "BW*"); 362251877Speter if (!res) { 363251877Speter /* Not an BWTP response? Well, at least we won't understand it. */ 364251877Speter return APR_EGENERAL; 365251877Speter } 366251877Speter 367251877Speter if (ctx->linebuf.line[2] == 'H') { 368251877Speter ctx->type = 0; 369251877Speter } 370251877Speter else if (ctx->linebuf.line[2] == 'M') { 371251877Speter ctx->type = 1; 372251877Speter } 373251877Speter else { 374251877Speter ctx->type = -1; 375251877Speter } 376251877Speter 377251877Speter ctx->channel = apr_strtoi64(ctx->linebuf.line + 3, &reason, 16); 378251877Speter 379251877Speter /* Skip leading spaces for the reason string. */ 380251877Speter if (apr_isspace(*reason)) { 381251877Speter reason++; 382251877Speter } 383251877Speter 384251877Speter ctx->length = apr_strtoi64(reason, &reason, 16); 385251877Speter 386251877Speter /* Skip leading spaces for the reason string. */ 387251877Speter if (reason - ctx->linebuf.line < ctx->linebuf.used) { 388251877Speter if (apr_isspace(*reason)) { 389251877Speter reason++; 390251877Speter } 391251877Speter 392251877Speter ctx->phrase = serf_bstrmemdup(allocator, reason, 393251877Speter ctx->linebuf.used 394251877Speter - (reason - ctx->linebuf.line)); 395251877Speter } else { 396251877Speter ctx->phrase = NULL; 397251877Speter } 398251877Speter 399251877Speter return APR_SUCCESS; 400251877Speter} 401251877Speter 402251877Speter/* This code should be replaced with header buckets. */ 403251877Speterstatic apr_status_t fetch_headers(serf_bucket_t *bkt, incoming_context_t *ctx) 404251877Speter{ 405251877Speter apr_status_t status; 406251877Speter 407251877Speter /* RFC 2616 says that CRLF is the only line ending, but we can easily 408251877Speter * accept any kind of line ending. 409251877Speter */ 410251877Speter status = fetch_line(ctx, SERF_NEWLINE_ANY); 411251877Speter if (SERF_BUCKET_READ_ERROR(status)) { 412251877Speter return status; 413251877Speter } 414251877Speter /* Something was read. Process it. */ 415251877Speter 416251877Speter if (ctx->linebuf.state == SERF_LINEBUF_READY && ctx->linebuf.used) { 417251877Speter const char *end_key; 418251877Speter const char *c; 419251877Speter 420251877Speter end_key = c = memchr(ctx->linebuf.line, ':', ctx->linebuf.used); 421251877Speter if (!c) { 422251877Speter /* Bad headers? */ 423251877Speter return APR_EGENERAL; 424251877Speter } 425251877Speter 426251877Speter /* Skip over initial : and spaces. */ 427251877Speter while (apr_isspace(*++c)) 428251877Speter continue; 429251877Speter 430251877Speter /* Always copy the headers (from the linebuf into new mem). */ 431251877Speter /* ### we should be able to optimize some mem copies */ 432251877Speter serf_bucket_headers_setx( 433251877Speter ctx->headers, 434251877Speter ctx->linebuf.line, end_key - ctx->linebuf.line, 1, 435251877Speter c, ctx->linebuf.line + ctx->linebuf.used - c, 1); 436251877Speter } 437251877Speter 438251877Speter return status; 439251877Speter} 440251877Speter 441251877Speter/* Perform one iteration of the state machine. 442251877Speter * 443251877Speter * Will return when one the following conditions occurred: 444251877Speter * 1) a state change 445251877Speter * 2) an error 446251877Speter * 3) the stream is not ready or at EOF 447251877Speter * 4) APR_SUCCESS, meaning the machine can be run again immediately 448251877Speter */ 449251877Speterstatic apr_status_t run_machine(serf_bucket_t *bkt, incoming_context_t *ctx) 450251877Speter{ 451251877Speter apr_status_t status = APR_SUCCESS; /* initialize to avoid gcc warnings */ 452251877Speter 453251877Speter switch (ctx->state) { 454251877Speter case STATE_STATUS_LINE: 455251877Speter /* RFC 2616 says that CRLF is the only line ending, but we can easily 456251877Speter * accept any kind of line ending. 457251877Speter */ 458251877Speter status = fetch_line(ctx, SERF_NEWLINE_ANY); 459251877Speter if (SERF_BUCKET_READ_ERROR(status)) 460251877Speter return status; 461251877Speter 462251877Speter if (ctx->linebuf.state == SERF_LINEBUF_READY && ctx->linebuf.used) { 463251877Speter /* The Status-Line is in the line buffer. Process it. */ 464251877Speter status = parse_status_line(ctx, bkt->allocator); 465251877Speter if (status) 466251877Speter return status; 467251877Speter 468251877Speter if (ctx->length) { 469251877Speter ctx->body = 470251877Speter serf_bucket_barrier_create(ctx->stream, bkt->allocator); 471251877Speter ctx->body = serf_bucket_limit_create(ctx->body, ctx->length, 472251877Speter bkt->allocator); 473251877Speter if (!ctx->type) { 474251877Speter ctx->state = STATE_HEADERS; 475251877Speter } else { 476251877Speter ctx->state = STATE_BODY; 477251877Speter } 478251877Speter } else { 479251877Speter ctx->state = STATE_DONE; 480251877Speter } 481251877Speter } 482251877Speter else { 483251877Speter /* The connection closed before we could get the next 484251877Speter * response. Treat the request as lost so that our upper 485251877Speter * end knows the server never tried to give us a response. 486251877Speter */ 487251877Speter if (APR_STATUS_IS_EOF(status)) { 488251877Speter return SERF_ERROR_REQUEST_LOST; 489251877Speter } 490251877Speter } 491251877Speter break; 492251877Speter case STATE_HEADERS: 493251877Speter status = fetch_headers(ctx->body, ctx); 494251877Speter if (SERF_BUCKET_READ_ERROR(status)) 495251877Speter return status; 496251877Speter 497251877Speter /* If an empty line was read, then we hit the end of the headers. 498251877Speter * Move on to the body. 499251877Speter */ 500251877Speter if (ctx->linebuf.state == SERF_LINEBUF_READY && !ctx->linebuf.used) { 501251877Speter /* Advance the state. */ 502251877Speter ctx->state = STATE_DONE; 503251877Speter } 504251877Speter break; 505251877Speter case STATE_BODY: 506251877Speter /* Don't do anything. */ 507251877Speter break; 508251877Speter case STATE_DONE: 509251877Speter return APR_EOF; 510251877Speter default: 511251877Speter /* Not reachable */ 512251877Speter return APR_EGENERAL; 513251877Speter } 514251877Speter 515251877Speter return status; 516251877Speter} 517251877Speter 518251877Speterstatic apr_status_t wait_for_body(serf_bucket_t *bkt, incoming_context_t *ctx) 519251877Speter{ 520251877Speter apr_status_t status; 521251877Speter 522251877Speter /* Keep reading and moving through states if we aren't at the BODY */ 523251877Speter while (ctx->state != STATE_BODY) { 524251877Speter status = run_machine(bkt, ctx); 525251877Speter 526251877Speter /* Anything other than APR_SUCCESS means that we cannot immediately 527251877Speter * read again (for now). 528251877Speter */ 529251877Speter if (status) 530251877Speter return status; 531251877Speter } 532251877Speter /* in STATE_BODY */ 533251877Speter 534251877Speter return APR_SUCCESS; 535251877Speter} 536251877Speter 537251877Speterapr_status_t serf_bucket_bwtp_incoming_frame_wait_for_headers( 538251877Speter serf_bucket_t *bucket) 539251877Speter{ 540251877Speter incoming_context_t *ctx = bucket->data; 541251877Speter 542251877Speter return wait_for_body(bucket, ctx); 543251877Speter} 544251877Speter 545251877Speterstatic apr_status_t bwtp_incoming_read(serf_bucket_t *bucket, 546251877Speter apr_size_t requested, 547251877Speter const char **data, apr_size_t *len) 548251877Speter{ 549251877Speter incoming_context_t *ctx = bucket->data; 550251877Speter apr_status_t rv; 551251877Speter 552251877Speter rv = wait_for_body(bucket, ctx); 553251877Speter if (rv) { 554251877Speter /* It's not possible to have read anything yet! */ 555251877Speter if (APR_STATUS_IS_EOF(rv) || APR_STATUS_IS_EAGAIN(rv)) { 556251877Speter *len = 0; 557251877Speter } 558251877Speter return rv; 559251877Speter } 560251877Speter 561251877Speter rv = serf_bucket_read(ctx->body, requested, data, len); 562251877Speter if (APR_STATUS_IS_EOF(rv)) { 563251877Speter ctx->state = STATE_DONE; 564251877Speter } 565251877Speter return rv; 566251877Speter} 567251877Speter 568251877Speterstatic apr_status_t bwtp_incoming_readline(serf_bucket_t *bucket, 569251877Speter int acceptable, int *found, 570251877Speter const char **data, apr_size_t *len) 571251877Speter{ 572251877Speter incoming_context_t *ctx = bucket->data; 573251877Speter apr_status_t rv; 574251877Speter 575251877Speter rv = wait_for_body(bucket, ctx); 576251877Speter if (rv) { 577251877Speter return rv; 578251877Speter } 579251877Speter 580251877Speter /* Delegate to the stream bucket to do the readline. */ 581251877Speter return serf_bucket_readline(ctx->body, acceptable, found, data, len); 582251877Speter} 583251877Speter 584251877Speter/* ### need to implement */ 585251877Speter#define bwtp_incoming_peek NULL 586251877Speter 587251877Speterconst serf_bucket_type_t serf_bucket_type_bwtp_incoming_frame = { 588251877Speter "BWTP-INCOMING", 589251877Speter bwtp_incoming_read, 590251877Speter bwtp_incoming_readline, 591251877Speter serf_default_read_iovec, 592251877Speter serf_default_read_for_sendfile, 593251877Speter serf_default_read_bucket, 594251877Speter bwtp_incoming_peek, 595251877Speter bwtp_incoming_destroy_and_data, 596251877Speter}; 597