/* Copyright 2002-2004 Justin Erenkrantz and Greg Stein
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
15251877Speter
16251877Speter#include <apr_pools.h>
17251877Speter#include <apr_strings.h>
18251877Speter#include <apr_lib.h>
19251877Speter#include <apr_date.h>
20251877Speter
21251877Speter#include "serf.h"
22251877Speter#include "serf_bucket_util.h"
23251877Speter#include "serf_bucket_types.h"
24251877Speter
25251877Speter#include <stdlib.h>
26251877Speter
/* This is an implementation of the Bidirectional Web Transfer Protocol
 * (BWTP). See:
 *   http://bwtp.wikidot.com/
 */
31251877Speter
32251877Spetertypedef struct {
33251877Speter    int channel;
34251877Speter    int open;
35251877Speter    int type; /* 0 = header, 1 = message */ /* TODO enum? */
36251877Speter    const char *phrase;
37251877Speter    serf_bucket_t *headers;
38251877Speter
39251877Speter    char req_line[1000];
40251877Speter} frame_context_t;
41251877Speter
42251877Spetertypedef struct {
43251877Speter    serf_bucket_t *stream;
44251877Speter    serf_bucket_t *body;        /* Pointer to the stream wrapping the body. */
45251877Speter    serf_bucket_t *headers;     /* holds parsed headers */
46251877Speter
47251877Speter    enum {
48251877Speter        STATE_STATUS_LINE,      /* reading status line */
49251877Speter        STATE_HEADERS,          /* reading headers */
50251877Speter        STATE_BODY,             /* reading body */
51251877Speter        STATE_DONE              /* we've sent EOF */
52251877Speter    } state;
53251877Speter
54251877Speter    /* Buffer for accumulating a line from the response. */
55251877Speter    serf_linebuf_t linebuf;
56251877Speter
57251877Speter    int type; /* 0 = header, 1 = message */ /* TODO enum? */
58251877Speter    int channel;
59251877Speter    char *phrase;
60251877Speter    apr_size_t length;
61251877Speter} incoming_context_t;
62251877Speter
63251877Speter
64251877Speterserf_bucket_t *serf_bucket_bwtp_channel_close(
65251877Speter    int channel,
66251877Speter    serf_bucket_alloc_t *allocator)
67251877Speter{
68251877Speter    frame_context_t *ctx;
69251877Speter
70251877Speter    ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
71251877Speter    ctx->type = 0;
72251877Speter    ctx->open = 0;
73251877Speter    ctx->channel = channel;
74251877Speter    ctx->phrase = "CLOSED";
75251877Speter    ctx->headers = serf_bucket_headers_create(allocator);
76251877Speter
77251877Speter    return serf_bucket_create(&serf_bucket_type_bwtp_frame, allocator, ctx);
78251877Speter}
79251877Speter
80251877Speterserf_bucket_t *serf_bucket_bwtp_channel_open(
81251877Speter    int channel,
82251877Speter    const char *uri,
83251877Speter    serf_bucket_alloc_t *allocator)
84251877Speter{
85251877Speter    frame_context_t *ctx;
86251877Speter
87251877Speter    ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
88251877Speter    ctx->type = 0;
89251877Speter    ctx->open = 1;
90251877Speter    ctx->channel = channel;
91251877Speter    ctx->phrase = uri;
92251877Speter    ctx->headers = serf_bucket_headers_create(allocator);
93251877Speter
94251877Speter    return serf_bucket_create(&serf_bucket_type_bwtp_frame, allocator, ctx);
95251877Speter}
96251877Speter
97251877Speterserf_bucket_t *serf_bucket_bwtp_header_create(
98251877Speter    int channel,
99251877Speter    const char *phrase,
100251877Speter    serf_bucket_alloc_t *allocator)
101251877Speter{
102251877Speter    frame_context_t *ctx;
103251877Speter
104251877Speter    ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
105251877Speter    ctx->type = 0;
106251877Speter    ctx->open = 0;
107251877Speter    ctx->channel = channel;
108251877Speter    ctx->phrase = phrase;
109251877Speter    ctx->headers = serf_bucket_headers_create(allocator);
110251877Speter
111251877Speter    return serf_bucket_create(&serf_bucket_type_bwtp_frame, allocator, ctx);
112251877Speter}
113251877Speter
114251877Speterserf_bucket_t *serf_bucket_bwtp_message_create(
115251877Speter    int channel,
116251877Speter    serf_bucket_t *body,
117251877Speter    serf_bucket_alloc_t *allocator)
118251877Speter{
119251877Speter    frame_context_t *ctx;
120251877Speter
121251877Speter    ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
122251877Speter    ctx->type = 1;
123251877Speter    ctx->open = 0;
124251877Speter    ctx->channel = channel;
125251877Speter    ctx->phrase = "MESSAGE";
126251877Speter    ctx->headers = serf_bucket_headers_create(allocator);
127251877Speter
128251877Speter    return serf_bucket_create(&serf_bucket_type_bwtp_frame, allocator, ctx);
129251877Speter}
130251877Speter
131251877Speterint serf_bucket_bwtp_frame_get_channel(
132251877Speter    serf_bucket_t *bucket)
133251877Speter{
134251877Speter    if (SERF_BUCKET_IS_BWTP_FRAME(bucket)) {
135251877Speter        frame_context_t *ctx = bucket->data;
136251877Speter
137251877Speter        return ctx->channel;
138251877Speter    }
139251877Speter    else if (SERF_BUCKET_IS_BWTP_INCOMING_FRAME(bucket)) {
140251877Speter        incoming_context_t *ctx = bucket->data;
141251877Speter
142251877Speter        return ctx->channel;
143251877Speter    }
144251877Speter
145251877Speter    return -1;
146251877Speter}
147251877Speter
148251877Speterint serf_bucket_bwtp_frame_get_type(
149251877Speter    serf_bucket_t *bucket)
150251877Speter{
151251877Speter    if (SERF_BUCKET_IS_BWTP_FRAME(bucket)) {
152251877Speter        frame_context_t *ctx = bucket->data;
153251877Speter
154251877Speter        return ctx->type;
155251877Speter    }
156251877Speter    else if (SERF_BUCKET_IS_BWTP_INCOMING_FRAME(bucket)) {
157251877Speter        incoming_context_t *ctx = bucket->data;
158251877Speter
159251877Speter        return ctx->type;
160251877Speter    }
161251877Speter
162251877Speter    return -1;
163251877Speter}
164251877Speter
165251877Speterconst char *serf_bucket_bwtp_frame_get_phrase(
166251877Speter    serf_bucket_t *bucket)
167251877Speter{
168251877Speter    if (SERF_BUCKET_IS_BWTP_FRAME(bucket)) {
169251877Speter        frame_context_t *ctx = bucket->data;
170251877Speter
171251877Speter        return ctx->phrase;
172251877Speter    }
173251877Speter    else if (SERF_BUCKET_IS_BWTP_INCOMING_FRAME(bucket)) {
174251877Speter        incoming_context_t *ctx = bucket->data;
175251877Speter
176251877Speter        return ctx->phrase;
177251877Speter    }
178251877Speter
179251877Speter    return NULL;
180251877Speter}
181251877Speter
182251877Speterserf_bucket_t *serf_bucket_bwtp_frame_get_headers(
183251877Speter    serf_bucket_t *bucket)
184251877Speter{
185251877Speter    if (SERF_BUCKET_IS_BWTP_FRAME(bucket)) {
186251877Speter        frame_context_t *ctx = bucket->data;
187251877Speter
188251877Speter        return ctx->headers;
189251877Speter    }
190251877Speter    else if (SERF_BUCKET_IS_BWTP_INCOMING_FRAME(bucket)) {
191251877Speter        incoming_context_t *ctx = bucket->data;
192251877Speter
193251877Speter        return ctx->headers;
194251877Speter    }
195251877Speter
196251877Speter    return NULL;
197251877Speter}
198251877Speter
199251877Speterstatic int count_size(void *baton, const char *key, const char *value)
200251877Speter{
201251877Speter    apr_size_t *c = baton;
202251877Speter    /* TODO Deal with folding.  Yikes. */
203251877Speter
204251877Speter    /* Add in ": " and CRLF - so an extra four bytes. */
205251877Speter    *c += strlen(key) + strlen(value) + 4;
206251877Speter
207251877Speter    return 0;
208251877Speter}
209251877Speter
210251877Speterstatic apr_size_t calc_header_size(serf_bucket_t *hdrs)
211251877Speter{
212251877Speter    apr_size_t size = 0;
213251877Speter
214251877Speter    serf_bucket_headers_do(hdrs, count_size, &size);
215251877Speter
216251877Speter    return size;
217251877Speter}
218251877Speter
219251877Speterstatic void serialize_data(serf_bucket_t *bucket)
220251877Speter{
221251877Speter    frame_context_t *ctx = bucket->data;
222251877Speter    serf_bucket_t *new_bucket;
223251877Speter    apr_size_t req_len;
224251877Speter
225251877Speter    /* Serialize the request-line and headers into one mother string,
226251877Speter     * and wrap a bucket around it.
227251877Speter     */
228251877Speter    req_len = apr_snprintf(ctx->req_line, sizeof(ctx->req_line),
229251877Speter                           "%s %d " "%" APR_UINT64_T_HEX_FMT " %s%s\r\n",
230251877Speter                           (ctx->type ? "BWM" : "BWH"),
231251877Speter                           ctx->channel, calc_header_size(ctx->headers),
232251877Speter                           (ctx->open ? "OPEN " : ""),
233251877Speter                           ctx->phrase);
234251877Speter    new_bucket = serf_bucket_simple_copy_create(ctx->req_line, req_len,
235251877Speter                                                bucket->allocator);
236251877Speter
237251877Speter    /* Build up the new bucket structure.
238251877Speter     *
239251877Speter     * Note that self needs to become an aggregate bucket so that a
240251877Speter     * pointer to self still represents the "right" data.
241251877Speter     */
242251877Speter    serf_bucket_aggregate_become(bucket);
243251877Speter
244251877Speter    /* Insert the two buckets. */
245251877Speter    serf_bucket_aggregate_append(bucket, new_bucket);
246251877Speter    serf_bucket_aggregate_append(bucket, ctx->headers);
247251877Speter
248251877Speter    /* Our private context is no longer needed, and is not referred to by
249251877Speter     * any existing bucket. Toss it.
250251877Speter     */
251251877Speter    serf_bucket_mem_free(bucket->allocator, ctx);
252251877Speter}
253251877Speter
254251877Speterstatic apr_status_t serf_bwtp_frame_read(serf_bucket_t *bucket,
255251877Speter                                         apr_size_t requested,
256251877Speter                                         const char **data, apr_size_t *len)
257251877Speter{
258251877Speter    /* Seralize our private data into a new aggregate bucket. */
259251877Speter    serialize_data(bucket);
260251877Speter
261251877Speter    /* Delegate to the "new" aggregate bucket to do the read. */
262251877Speter    return serf_bucket_read(bucket, requested, data, len);
263251877Speter}
264251877Speter
265251877Speterstatic apr_status_t serf_bwtp_frame_readline(serf_bucket_t *bucket,
266251877Speter                                             int acceptable, int *found,
267251877Speter                                             const char **data, apr_size_t *len)
268251877Speter{
269251877Speter    /* Seralize our private data into a new aggregate bucket. */
270251877Speter    serialize_data(bucket);
271251877Speter
272251877Speter    /* Delegate to the "new" aggregate bucket to do the readline. */
273251877Speter    return serf_bucket_readline(bucket, acceptable, found, data, len);
274251877Speter}
275251877Speter
276251877Speterstatic apr_status_t serf_bwtp_frame_read_iovec(serf_bucket_t *bucket,
277251877Speter                                               apr_size_t requested,
278251877Speter                                               int vecs_size,
279251877Speter                                               struct iovec *vecs,
280251877Speter                                               int *vecs_used)
281251877Speter{
282251877Speter    /* Seralize our private data into a new aggregate bucket. */
283251877Speter    serialize_data(bucket);
284251877Speter
285251877Speter    /* Delegate to the "new" aggregate bucket to do the read. */
286251877Speter    return serf_bucket_read_iovec(bucket, requested,
287251877Speter                                  vecs_size, vecs, vecs_used);
288251877Speter}
289251877Speter
290251877Speterstatic apr_status_t serf_bwtp_frame_peek(serf_bucket_t *bucket,
291251877Speter                                         const char **data,
292251877Speter                                         apr_size_t *len)
293251877Speter{
294251877Speter    /* Seralize our private data into a new aggregate bucket. */
295251877Speter    serialize_data(bucket);
296251877Speter
297251877Speter    /* Delegate to the "new" aggregate bucket to do the peek. */
298251877Speter    return serf_bucket_peek(bucket, data, len);
299251877Speter}
300251877Speter
301251877Speterconst serf_bucket_type_t serf_bucket_type_bwtp_frame = {
302251877Speter    "BWTP-FRAME",
303251877Speter    serf_bwtp_frame_read,
304251877Speter    serf_bwtp_frame_readline,
305251877Speter    serf_bwtp_frame_read_iovec,
306251877Speter    serf_default_read_for_sendfile,
307251877Speter    serf_default_read_bucket,
308251877Speter    serf_bwtp_frame_peek,
309251877Speter    serf_default_destroy_and_data,
310251877Speter};
311251877Speter
312251877Speter
313251877Speterserf_bucket_t *serf_bucket_bwtp_incoming_frame_create(
314251877Speter    serf_bucket_t *stream,
315251877Speter    serf_bucket_alloc_t *allocator)
316251877Speter{
317251877Speter    incoming_context_t *ctx;
318251877Speter
319251877Speter    ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
320251877Speter    ctx->stream = stream;
321251877Speter    ctx->body = NULL;
322251877Speter    ctx->headers = serf_bucket_headers_create(allocator);
323251877Speter    ctx->state = STATE_STATUS_LINE;
324251877Speter    ctx->length = 0;
325251877Speter    ctx->channel = -1;
326251877Speter    ctx->phrase = NULL;
327251877Speter
328251877Speter    serf_linebuf_init(&ctx->linebuf);
329251877Speter
330251877Speter    return serf_bucket_create(&serf_bucket_type_bwtp_incoming_frame, allocator, ctx);
331251877Speter}
332251877Speter
333251877Speterstatic void bwtp_incoming_destroy_and_data(serf_bucket_t *bucket)
334251877Speter{
335251877Speter    incoming_context_t *ctx = bucket->data;
336251877Speter
337251877Speter    if (ctx->state != STATE_STATUS_LINE && ctx->phrase) {
338251877Speter        serf_bucket_mem_free(bucket->allocator, (void*)ctx->phrase);
339251877Speter    }
340251877Speter
341251877Speter    serf_bucket_destroy(ctx->stream);
342251877Speter    if (ctx->body != NULL)
343251877Speter        serf_bucket_destroy(ctx->body);
344251877Speter    serf_bucket_destroy(ctx->headers);
345251877Speter
346251877Speter    serf_default_destroy_and_data(bucket);
347251877Speter}
348251877Speter
349251877Speterstatic apr_status_t fetch_line(incoming_context_t *ctx, int acceptable)
350251877Speter{
351251877Speter    return serf_linebuf_fetch(&ctx->linebuf, ctx->stream, acceptable);
352251877Speter}
353251877Speter
354251877Speterstatic apr_status_t parse_status_line(incoming_context_t *ctx,
355251877Speter                                      serf_bucket_alloc_t *allocator)
356251877Speter{
357251877Speter    int res;
358251877Speter    char *reason; /* ### stupid APR interface makes this non-const */
359251877Speter
360251877Speter    /* ctx->linebuf.line should be of form: BW* */
361251877Speter    res = apr_date_checkmask(ctx->linebuf.line, "BW*");
362251877Speter    if (!res) {
363251877Speter        /* Not an BWTP response?  Well, at least we won't understand it. */
364251877Speter        return APR_EGENERAL;
365251877Speter    }
366251877Speter
367251877Speter    if (ctx->linebuf.line[2] == 'H') {
368251877Speter        ctx->type = 0;
369251877Speter    }
370251877Speter    else if (ctx->linebuf.line[2] == 'M') {
371251877Speter        ctx->type = 1;
372251877Speter    }
373251877Speter    else {
374251877Speter        ctx->type = -1;
375251877Speter    }
376251877Speter
377251877Speter    ctx->channel = apr_strtoi64(ctx->linebuf.line + 3, &reason, 16);
378251877Speter
379251877Speter    /* Skip leading spaces for the reason string. */
380251877Speter    if (apr_isspace(*reason)) {
381251877Speter        reason++;
382251877Speter    }
383251877Speter
384251877Speter    ctx->length = apr_strtoi64(reason, &reason, 16);
385251877Speter
386251877Speter    /* Skip leading spaces for the reason string. */
387251877Speter    if (reason - ctx->linebuf.line < ctx->linebuf.used) {
388251877Speter        if (apr_isspace(*reason)) {
389251877Speter            reason++;
390251877Speter        }
391251877Speter
392251877Speter        ctx->phrase = serf_bstrmemdup(allocator, reason,
393251877Speter                                      ctx->linebuf.used
394251877Speter                                      - (reason - ctx->linebuf.line));
395251877Speter    } else {
396251877Speter        ctx->phrase = NULL;
397251877Speter    }
398251877Speter
399251877Speter    return APR_SUCCESS;
400251877Speter}
401251877Speter
402251877Speter/* This code should be replaced with header buckets. */
403251877Speterstatic apr_status_t fetch_headers(serf_bucket_t *bkt, incoming_context_t *ctx)
404251877Speter{
405251877Speter    apr_status_t status;
406251877Speter
407251877Speter    /* RFC 2616 says that CRLF is the only line ending, but we can easily
408251877Speter     * accept any kind of line ending.
409251877Speter     */
410251877Speter    status = fetch_line(ctx, SERF_NEWLINE_ANY);
411251877Speter    if (SERF_BUCKET_READ_ERROR(status)) {
412251877Speter        return status;
413251877Speter    }
414251877Speter    /* Something was read. Process it. */
415251877Speter
416251877Speter    if (ctx->linebuf.state == SERF_LINEBUF_READY && ctx->linebuf.used) {
417251877Speter        const char *end_key;
418251877Speter        const char *c;
419251877Speter
420251877Speter        end_key = c = memchr(ctx->linebuf.line, ':', ctx->linebuf.used);
421251877Speter        if (!c) {
422251877Speter            /* Bad headers? */
423251877Speter            return APR_EGENERAL;
424251877Speter        }
425251877Speter
426251877Speter        /* Skip over initial : and spaces. */
427251877Speter        while (apr_isspace(*++c))
428251877Speter            continue;
429251877Speter
430251877Speter        /* Always copy the headers (from the linebuf into new mem). */
431251877Speter        /* ### we should be able to optimize some mem copies */
432251877Speter        serf_bucket_headers_setx(
433251877Speter            ctx->headers,
434251877Speter            ctx->linebuf.line, end_key - ctx->linebuf.line, 1,
435251877Speter            c, ctx->linebuf.line + ctx->linebuf.used - c, 1);
436251877Speter    }
437251877Speter
438251877Speter    return status;
439251877Speter}
440251877Speter
441251877Speter/* Perform one iteration of the state machine.
442251877Speter *
443251877Speter * Will return when one the following conditions occurred:
444251877Speter *  1) a state change
445251877Speter *  2) an error
446251877Speter *  3) the stream is not ready or at EOF
447251877Speter *  4) APR_SUCCESS, meaning the machine can be run again immediately
448251877Speter */
449251877Speterstatic apr_status_t run_machine(serf_bucket_t *bkt, incoming_context_t *ctx)
450251877Speter{
451251877Speter    apr_status_t status = APR_SUCCESS; /* initialize to avoid gcc warnings */
452251877Speter
453251877Speter    switch (ctx->state) {
454251877Speter    case STATE_STATUS_LINE:
455251877Speter        /* RFC 2616 says that CRLF is the only line ending, but we can easily
456251877Speter         * accept any kind of line ending.
457251877Speter         */
458251877Speter        status = fetch_line(ctx, SERF_NEWLINE_ANY);
459251877Speter        if (SERF_BUCKET_READ_ERROR(status))
460251877Speter            return status;
461251877Speter
462251877Speter        if (ctx->linebuf.state == SERF_LINEBUF_READY && ctx->linebuf.used) {
463251877Speter            /* The Status-Line is in the line buffer. Process it. */
464251877Speter            status = parse_status_line(ctx, bkt->allocator);
465251877Speter            if (status)
466251877Speter                return status;
467251877Speter
468251877Speter            if (ctx->length) {
469251877Speter                ctx->body =
470251877Speter                    serf_bucket_barrier_create(ctx->stream, bkt->allocator);
471251877Speter                ctx->body = serf_bucket_limit_create(ctx->body, ctx->length,
472251877Speter                                                     bkt->allocator);
473251877Speter                if (!ctx->type) {
474251877Speter                    ctx->state = STATE_HEADERS;
475251877Speter                } else {
476251877Speter                    ctx->state = STATE_BODY;
477251877Speter                }
478251877Speter            } else {
479251877Speter                ctx->state = STATE_DONE;
480251877Speter            }
481251877Speter        }
482251877Speter        else {
483251877Speter            /* The connection closed before we could get the next
484251877Speter             * response.  Treat the request as lost so that our upper
485251877Speter             * end knows the server never tried to give us a response.
486251877Speter             */
487251877Speter            if (APR_STATUS_IS_EOF(status)) {
488251877Speter                return SERF_ERROR_REQUEST_LOST;
489251877Speter            }
490251877Speter        }
491251877Speter        break;
492251877Speter    case STATE_HEADERS:
493251877Speter        status = fetch_headers(ctx->body, ctx);
494251877Speter        if (SERF_BUCKET_READ_ERROR(status))
495251877Speter            return status;
496251877Speter
497251877Speter        /* If an empty line was read, then we hit the end of the headers.
498251877Speter         * Move on to the body.
499251877Speter         */
500251877Speter        if (ctx->linebuf.state == SERF_LINEBUF_READY && !ctx->linebuf.used) {
501251877Speter            /* Advance the state. */
502251877Speter            ctx->state = STATE_DONE;
503251877Speter        }
504251877Speter        break;
505251877Speter    case STATE_BODY:
506251877Speter        /* Don't do anything. */
507251877Speter        break;
508251877Speter    case STATE_DONE:
509251877Speter        return APR_EOF;
510251877Speter    default:
511251877Speter        /* Not reachable */
512251877Speter        return APR_EGENERAL;
513251877Speter    }
514251877Speter
515251877Speter    return status;
516251877Speter}
517251877Speter
518251877Speterstatic apr_status_t wait_for_body(serf_bucket_t *bkt, incoming_context_t *ctx)
519251877Speter{
520251877Speter    apr_status_t status;
521251877Speter
522251877Speter    /* Keep reading and moving through states if we aren't at the BODY */
523251877Speter    while (ctx->state != STATE_BODY) {
524251877Speter        status = run_machine(bkt, ctx);
525251877Speter
526251877Speter        /* Anything other than APR_SUCCESS means that we cannot immediately
527251877Speter         * read again (for now).
528251877Speter         */
529251877Speter        if (status)
530251877Speter            return status;
531251877Speter    }
532251877Speter    /* in STATE_BODY */
533251877Speter
534251877Speter    return APR_SUCCESS;
535251877Speter}
536251877Speter
537251877Speterapr_status_t serf_bucket_bwtp_incoming_frame_wait_for_headers(
538251877Speter    serf_bucket_t *bucket)
539251877Speter{
540251877Speter    incoming_context_t *ctx = bucket->data;
541251877Speter
542251877Speter    return wait_for_body(bucket, ctx);
543251877Speter}
544251877Speter
545251877Speterstatic apr_status_t bwtp_incoming_read(serf_bucket_t *bucket,
546251877Speter                                       apr_size_t requested,
547251877Speter                                       const char **data, apr_size_t *len)
548251877Speter{
549251877Speter    incoming_context_t *ctx = bucket->data;
550251877Speter    apr_status_t rv;
551251877Speter
552251877Speter    rv = wait_for_body(bucket, ctx);
553251877Speter    if (rv) {
554251877Speter        /* It's not possible to have read anything yet! */
555251877Speter        if (APR_STATUS_IS_EOF(rv) || APR_STATUS_IS_EAGAIN(rv)) {
556251877Speter            *len = 0;
557251877Speter        }
558251877Speter        return rv;
559251877Speter    }
560251877Speter
561251877Speter    rv = serf_bucket_read(ctx->body, requested, data, len);
562251877Speter    if (APR_STATUS_IS_EOF(rv)) {
563251877Speter        ctx->state = STATE_DONE;
564251877Speter    }
565251877Speter    return rv;
566251877Speter}
567251877Speter
568251877Speterstatic apr_status_t bwtp_incoming_readline(serf_bucket_t *bucket,
569251877Speter                                           int acceptable, int *found,
570251877Speter                                           const char **data, apr_size_t *len)
571251877Speter{
572251877Speter    incoming_context_t *ctx = bucket->data;
573251877Speter    apr_status_t rv;
574251877Speter
575251877Speter    rv = wait_for_body(bucket, ctx);
576251877Speter    if (rv) {
577251877Speter        return rv;
578251877Speter    }
579251877Speter
580251877Speter    /* Delegate to the stream bucket to do the readline. */
581251877Speter    return serf_bucket_readline(ctx->body, acceptable, found, data, len);
582251877Speter}
583251877Speter
584251877Speter/* ### need to implement */
585251877Speter#define bwtp_incoming_peek NULL
586251877Speter
587251877Speterconst serf_bucket_type_t serf_bucket_type_bwtp_incoming_frame = {
588251877Speter    "BWTP-INCOMING",
589251877Speter    bwtp_incoming_read,
590251877Speter    bwtp_incoming_readline,
591251877Speter    serf_default_read_iovec,
592251877Speter    serf_default_read_for_sendfile,
593251877Speter    serf_default_read_bucket,
594251877Speter    bwtp_incoming_peek,
595251877Speter    bwtp_incoming_destroy_and_data,
596251877Speter};
597