1/* Copyright 2002-2004 Justin Erenkrantz and Greg Stein
2 *
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
#include <apr_pools.h>
#include <apr_strings.h>
#include <apr_lib.h>
#include <apr_date.h>

#include "serf.h"
#include "serf_bucket_util.h"
#include "serf_bucket_types.h"

#include <errno.h>
#include <limits.h>
#include <stdlib.h>
26
/* This is an implementation of the Bidirectional Web Transfer Protocol (BWTP).
 * See:
 *   http://bwtp.wikidot.com/
 */
31
/* Private state of an outgoing BWTP frame bucket
 * (serf_bucket_type_bwtp_frame).
 *
 * The constructors in this file fill it in; serialize_data() formats the
 * frame line from it and then frees it.
 */
typedef struct {
    int channel;            /* channel number written into the frame line */
    int open;               /* non-zero: "OPEN " is emitted before the phrase */
    int type; /* 0 = header, 1 = message */ /* TODO enum? */
    const char *phrase;     /* text ending the frame line; pointer is not copied */
    serf_bucket_t *headers; /* headers bucket sent right after the frame line */

    char req_line[1000];    /* scratch buffer the frame line is formatted into */
} frame_context_t;
41
/* Private state of an incoming BWTP frame bucket
 * (serf_bucket_type_bwtp_incoming_frame).
 */
typedef struct {
    serf_bucket_t *stream;      /* bucket the frame line and headers are read from */
    serf_bucket_t *body;        /* Pointer to the stream wrapping the body. */
    serf_bucket_t *headers;     /* holds parsed headers */

    /* Position of the parser; advanced by run_machine(). */
    enum {
        STATE_STATUS_LINE,      /* reading status line */
        STATE_HEADERS,          /* reading headers */
        STATE_BODY,             /* reading body */
        STATE_DONE              /* we've sent EOF */
    } state;

    /* Buffer for accumulating a line from the response. */
    serf_linebuf_t linebuf;

    /* parse_status_line() stores -1 here for an unrecognized type letter. */
    int type; /* 0 = header, 1 = message */ /* TODO enum? */
    int channel;        /* channel from the frame line; -1 until parsed */
    char *phrase;       /* allocated copy of the text after the length, or NULL */
    apr_size_t length;  /* length field of the frame line; limits the body */
} incoming_context_t;
62
63
64serf_bucket_t *serf_bucket_bwtp_channel_close(
65    int channel,
66    serf_bucket_alloc_t *allocator)
67{
68    frame_context_t *ctx;
69
70    ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
71    ctx->type = 0;
72    ctx->open = 0;
73    ctx->channel = channel;
74    ctx->phrase = "CLOSED";
75    ctx->headers = serf_bucket_headers_create(allocator);
76
77    return serf_bucket_create(&serf_bucket_type_bwtp_frame, allocator, ctx);
78}
79
80serf_bucket_t *serf_bucket_bwtp_channel_open(
81    int channel,
82    const char *uri,
83    serf_bucket_alloc_t *allocator)
84{
85    frame_context_t *ctx;
86
87    ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
88    ctx->type = 0;
89    ctx->open = 1;
90    ctx->channel = channel;
91    ctx->phrase = uri;
92    ctx->headers = serf_bucket_headers_create(allocator);
93
94    return serf_bucket_create(&serf_bucket_type_bwtp_frame, allocator, ctx);
95}
96
97serf_bucket_t *serf_bucket_bwtp_header_create(
98    int channel,
99    const char *phrase,
100    serf_bucket_alloc_t *allocator)
101{
102    frame_context_t *ctx;
103
104    ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
105    ctx->type = 0;
106    ctx->open = 0;
107    ctx->channel = channel;
108    ctx->phrase = phrase;
109    ctx->headers = serf_bucket_headers_create(allocator);
110
111    return serf_bucket_create(&serf_bucket_type_bwtp_frame, allocator, ctx);
112}
113
114serf_bucket_t *serf_bucket_bwtp_message_create(
115    int channel,
116    serf_bucket_t *body,
117    serf_bucket_alloc_t *allocator)
118{
119    frame_context_t *ctx;
120
121    ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
122    ctx->type = 1;
123    ctx->open = 0;
124    ctx->channel = channel;
125    ctx->phrase = "MESSAGE";
126    ctx->headers = serf_bucket_headers_create(allocator);
127
128    return serf_bucket_create(&serf_bucket_type_bwtp_frame, allocator, ctx);
129}
130
131int serf_bucket_bwtp_frame_get_channel(
132    serf_bucket_t *bucket)
133{
134    if (SERF_BUCKET_IS_BWTP_FRAME(bucket)) {
135        frame_context_t *ctx = bucket->data;
136
137        return ctx->channel;
138    }
139    else if (SERF_BUCKET_IS_BWTP_INCOMING_FRAME(bucket)) {
140        incoming_context_t *ctx = bucket->data;
141
142        return ctx->channel;
143    }
144
145    return -1;
146}
147
148int serf_bucket_bwtp_frame_get_type(
149    serf_bucket_t *bucket)
150{
151    if (SERF_BUCKET_IS_BWTP_FRAME(bucket)) {
152        frame_context_t *ctx = bucket->data;
153
154        return ctx->type;
155    }
156    else if (SERF_BUCKET_IS_BWTP_INCOMING_FRAME(bucket)) {
157        incoming_context_t *ctx = bucket->data;
158
159        return ctx->type;
160    }
161
162    return -1;
163}
164
165const char *serf_bucket_bwtp_frame_get_phrase(
166    serf_bucket_t *bucket)
167{
168    if (SERF_BUCKET_IS_BWTP_FRAME(bucket)) {
169        frame_context_t *ctx = bucket->data;
170
171        return ctx->phrase;
172    }
173    else if (SERF_BUCKET_IS_BWTP_INCOMING_FRAME(bucket)) {
174        incoming_context_t *ctx = bucket->data;
175
176        return ctx->phrase;
177    }
178
179    return NULL;
180}
181
182serf_bucket_t *serf_bucket_bwtp_frame_get_headers(
183    serf_bucket_t *bucket)
184{
185    if (SERF_BUCKET_IS_BWTP_FRAME(bucket)) {
186        frame_context_t *ctx = bucket->data;
187
188        return ctx->headers;
189    }
190    else if (SERF_BUCKET_IS_BWTP_INCOMING_FRAME(bucket)) {
191        incoming_context_t *ctx = bucket->data;
192
193        return ctx->headers;
194    }
195
196    return NULL;
197}
198
199static int count_size(void *baton, const char *key, const char *value)
200{
201    apr_size_t *c = baton;
202    /* TODO Deal with folding.  Yikes. */
203
204    /* Add in ": " and CRLF - so an extra four bytes. */
205    *c += strlen(key) + strlen(value) + 4;
206
207    return 0;
208}
209
210static apr_size_t calc_header_size(serf_bucket_t *hdrs)
211{
212    apr_size_t size = 0;
213
214    serf_bucket_headers_do(hdrs, count_size, &size);
215
216    return size;
217}
218
219static void serialize_data(serf_bucket_t *bucket)
220{
221    frame_context_t *ctx = bucket->data;
222    serf_bucket_t *new_bucket;
223    apr_size_t req_len;
224
225    /* Serialize the request-line and headers into one mother string,
226     * and wrap a bucket around it.
227     */
228    req_len = apr_snprintf(ctx->req_line, sizeof(ctx->req_line),
229                           "%s %d " "%" APR_UINT64_T_HEX_FMT " %s%s\r\n",
230                           (ctx->type ? "BWM" : "BWH"),
231                           ctx->channel, calc_header_size(ctx->headers),
232                           (ctx->open ? "OPEN " : ""),
233                           ctx->phrase);
234    new_bucket = serf_bucket_simple_copy_create(ctx->req_line, req_len,
235                                                bucket->allocator);
236
237    /* Build up the new bucket structure.
238     *
239     * Note that self needs to become an aggregate bucket so that a
240     * pointer to self still represents the "right" data.
241     */
242    serf_bucket_aggregate_become(bucket);
243
244    /* Insert the two buckets. */
245    serf_bucket_aggregate_append(bucket, new_bucket);
246    serf_bucket_aggregate_append(bucket, ctx->headers);
247
248    /* Our private context is no longer needed, and is not referred to by
249     * any existing bucket. Toss it.
250     */
251    serf_bucket_mem_free(bucket->allocator, ctx);
252}
253
254static apr_status_t serf_bwtp_frame_read(serf_bucket_t *bucket,
255                                         apr_size_t requested,
256                                         const char **data, apr_size_t *len)
257{
258    /* Seralize our private data into a new aggregate bucket. */
259    serialize_data(bucket);
260
261    /* Delegate to the "new" aggregate bucket to do the read. */
262    return serf_bucket_read(bucket, requested, data, len);
263}
264
265static apr_status_t serf_bwtp_frame_readline(serf_bucket_t *bucket,
266                                             int acceptable, int *found,
267                                             const char **data, apr_size_t *len)
268{
269    /* Seralize our private data into a new aggregate bucket. */
270    serialize_data(bucket);
271
272    /* Delegate to the "new" aggregate bucket to do the readline. */
273    return serf_bucket_readline(bucket, acceptable, found, data, len);
274}
275
276static apr_status_t serf_bwtp_frame_read_iovec(serf_bucket_t *bucket,
277                                               apr_size_t requested,
278                                               int vecs_size,
279                                               struct iovec *vecs,
280                                               int *vecs_used)
281{
282    /* Seralize our private data into a new aggregate bucket. */
283    serialize_data(bucket);
284
285    /* Delegate to the "new" aggregate bucket to do the read. */
286    return serf_bucket_read_iovec(bucket, requested,
287                                  vecs_size, vecs, vecs_used);
288}
289
290static apr_status_t serf_bwtp_frame_peek(serf_bucket_t *bucket,
291                                         const char **data,
292                                         apr_size_t *len)
293{
294    /* Seralize our private data into a new aggregate bucket. */
295    serialize_data(bucket);
296
297    /* Delegate to the "new" aggregate bucket to do the peek. */
298    return serf_bucket_peek(bucket, data, len);
299}
300
301const serf_bucket_type_t serf_bucket_type_bwtp_frame = {
302    "BWTP-FRAME",
303    serf_bwtp_frame_read,
304    serf_bwtp_frame_readline,
305    serf_bwtp_frame_read_iovec,
306    serf_default_read_for_sendfile,
307    serf_default_read_bucket,
308    serf_bwtp_frame_peek,
309    serf_default_destroy_and_data,
310};
311
312
313serf_bucket_t *serf_bucket_bwtp_incoming_frame_create(
314    serf_bucket_t *stream,
315    serf_bucket_alloc_t *allocator)
316{
317    incoming_context_t *ctx;
318
319    ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
320    ctx->stream = stream;
321    ctx->body = NULL;
322    ctx->headers = serf_bucket_headers_create(allocator);
323    ctx->state = STATE_STATUS_LINE;
324    ctx->length = 0;
325    ctx->channel = -1;
326    ctx->phrase = NULL;
327
328    serf_linebuf_init(&ctx->linebuf);
329
330    return serf_bucket_create(&serf_bucket_type_bwtp_incoming_frame, allocator, ctx);
331}
332
333static void bwtp_incoming_destroy_and_data(serf_bucket_t *bucket)
334{
335    incoming_context_t *ctx = bucket->data;
336
337    if (ctx->state != STATE_STATUS_LINE && ctx->phrase) {
338        serf_bucket_mem_free(bucket->allocator, (void*)ctx->phrase);
339    }
340
341    serf_bucket_destroy(ctx->stream);
342    if (ctx->body != NULL)
343        serf_bucket_destroy(ctx->body);
344    serf_bucket_destroy(ctx->headers);
345
346    serf_default_destroy_and_data(bucket);
347}
348
349static apr_status_t fetch_line(incoming_context_t *ctx, int acceptable)
350{
351    return serf_linebuf_fetch(&ctx->linebuf, ctx->stream, acceptable);
352}
353
354static apr_status_t parse_status_line(incoming_context_t *ctx,
355                                      serf_bucket_alloc_t *allocator)
356{
357    int res;
358    char *reason; /* ### stupid APR interface makes this non-const */
359
360    /* ctx->linebuf.line should be of form: BW* */
361    res = apr_date_checkmask(ctx->linebuf.line, "BW*");
362    if (!res) {
363        /* Not an BWTP response?  Well, at least we won't understand it. */
364        return APR_EGENERAL;
365    }
366
367    if (ctx->linebuf.line[2] == 'H') {
368        ctx->type = 0;
369    }
370    else if (ctx->linebuf.line[2] == 'M') {
371        ctx->type = 1;
372    }
373    else {
374        ctx->type = -1;
375    }
376
377    ctx->channel = apr_strtoi64(ctx->linebuf.line + 3, &reason, 16);
378
379    /* Skip leading spaces for the reason string. */
380    if (apr_isspace(*reason)) {
381        reason++;
382    }
383
384    ctx->length = apr_strtoi64(reason, &reason, 16);
385
386    /* Skip leading spaces for the reason string. */
387    if (reason - ctx->linebuf.line < ctx->linebuf.used) {
388        if (apr_isspace(*reason)) {
389            reason++;
390        }
391
392        ctx->phrase = serf_bstrmemdup(allocator, reason,
393                                      ctx->linebuf.used
394                                      - (reason - ctx->linebuf.line));
395    } else {
396        ctx->phrase = NULL;
397    }
398
399    return APR_SUCCESS;
400}
401
402/* This code should be replaced with header buckets. */
403static apr_status_t fetch_headers(serf_bucket_t *bkt, incoming_context_t *ctx)
404{
405    apr_status_t status;
406
407    /* RFC 2616 says that CRLF is the only line ending, but we can easily
408     * accept any kind of line ending.
409     */
410    status = fetch_line(ctx, SERF_NEWLINE_ANY);
411    if (SERF_BUCKET_READ_ERROR(status)) {
412        return status;
413    }
414    /* Something was read. Process it. */
415
416    if (ctx->linebuf.state == SERF_LINEBUF_READY && ctx->linebuf.used) {
417        const char *end_key;
418        const char *c;
419
420        end_key = c = memchr(ctx->linebuf.line, ':', ctx->linebuf.used);
421        if (!c) {
422            /* Bad headers? */
423            return APR_EGENERAL;
424        }
425
426        /* Skip over initial : and spaces. */
427        while (apr_isspace(*++c))
428            continue;
429
430        /* Always copy the headers (from the linebuf into new mem). */
431        /* ### we should be able to optimize some mem copies */
432        serf_bucket_headers_setx(
433            ctx->headers,
434            ctx->linebuf.line, end_key - ctx->linebuf.line, 1,
435            c, ctx->linebuf.line + ctx->linebuf.used - c, 1);
436    }
437
438    return status;
439}
440
441/* Perform one iteration of the state machine.
442 *
443 * Will return when one the following conditions occurred:
444 *  1) a state change
445 *  2) an error
446 *  3) the stream is not ready or at EOF
447 *  4) APR_SUCCESS, meaning the machine can be run again immediately
448 */
449static apr_status_t run_machine(serf_bucket_t *bkt, incoming_context_t *ctx)
450{
451    apr_status_t status = APR_SUCCESS; /* initialize to avoid gcc warnings */
452
453    switch (ctx->state) {
454    case STATE_STATUS_LINE:
455        /* RFC 2616 says that CRLF is the only line ending, but we can easily
456         * accept any kind of line ending.
457         */
458        status = fetch_line(ctx, SERF_NEWLINE_ANY);
459        if (SERF_BUCKET_READ_ERROR(status))
460            return status;
461
462        if (ctx->linebuf.state == SERF_LINEBUF_READY && ctx->linebuf.used) {
463            /* The Status-Line is in the line buffer. Process it. */
464            status = parse_status_line(ctx, bkt->allocator);
465            if (status)
466                return status;
467
468            if (ctx->length) {
469                ctx->body =
470                    serf_bucket_barrier_create(ctx->stream, bkt->allocator);
471                ctx->body = serf_bucket_limit_create(ctx->body, ctx->length,
472                                                     bkt->allocator);
473                if (!ctx->type) {
474                    ctx->state = STATE_HEADERS;
475                } else {
476                    ctx->state = STATE_BODY;
477                }
478            } else {
479                ctx->state = STATE_DONE;
480            }
481        }
482        else {
483            /* The connection closed before we could get the next
484             * response.  Treat the request as lost so that our upper
485             * end knows the server never tried to give us a response.
486             */
487            if (APR_STATUS_IS_EOF(status)) {
488                return SERF_ERROR_REQUEST_LOST;
489            }
490        }
491        break;
492    case STATE_HEADERS:
493        status = fetch_headers(ctx->body, ctx);
494        if (SERF_BUCKET_READ_ERROR(status))
495            return status;
496
497        /* If an empty line was read, then we hit the end of the headers.
498         * Move on to the body.
499         */
500        if (ctx->linebuf.state == SERF_LINEBUF_READY && !ctx->linebuf.used) {
501            /* Advance the state. */
502            ctx->state = STATE_DONE;
503        }
504        break;
505    case STATE_BODY:
506        /* Don't do anything. */
507        break;
508    case STATE_DONE:
509        return APR_EOF;
510    default:
511        /* Not reachable */
512        return APR_EGENERAL;
513    }
514
515    return status;
516}
517
518static apr_status_t wait_for_body(serf_bucket_t *bkt, incoming_context_t *ctx)
519{
520    apr_status_t status;
521
522    /* Keep reading and moving through states if we aren't at the BODY */
523    while (ctx->state != STATE_BODY) {
524        status = run_machine(bkt, ctx);
525
526        /* Anything other than APR_SUCCESS means that we cannot immediately
527         * read again (for now).
528         */
529        if (status)
530            return status;
531    }
532    /* in STATE_BODY */
533
534    return APR_SUCCESS;
535}
536
537apr_status_t serf_bucket_bwtp_incoming_frame_wait_for_headers(
538    serf_bucket_t *bucket)
539{
540    incoming_context_t *ctx = bucket->data;
541
542    return wait_for_body(bucket, ctx);
543}
544
545static apr_status_t bwtp_incoming_read(serf_bucket_t *bucket,
546                                       apr_size_t requested,
547                                       const char **data, apr_size_t *len)
548{
549    incoming_context_t *ctx = bucket->data;
550    apr_status_t rv;
551
552    rv = wait_for_body(bucket, ctx);
553    if (rv) {
554        /* It's not possible to have read anything yet! */
555        if (APR_STATUS_IS_EOF(rv) || APR_STATUS_IS_EAGAIN(rv)) {
556            *len = 0;
557        }
558        return rv;
559    }
560
561    rv = serf_bucket_read(ctx->body, requested, data, len);
562    if (APR_STATUS_IS_EOF(rv)) {
563        ctx->state = STATE_DONE;
564    }
565    return rv;
566}
567
568static apr_status_t bwtp_incoming_readline(serf_bucket_t *bucket,
569                                           int acceptable, int *found,
570                                           const char **data, apr_size_t *len)
571{
572    incoming_context_t *ctx = bucket->data;
573    apr_status_t rv;
574
575    rv = wait_for_body(bucket, ctx);
576    if (rv) {
577        return rv;
578    }
579
580    /* Delegate to the stream bucket to do the readline. */
581    return serf_bucket_readline(ctx->body, acceptable, found, data, len);
582}
583
584/* ### need to implement */
585#define bwtp_incoming_peek NULL
586
587const serf_bucket_type_t serf_bucket_type_bwtp_incoming_frame = {
588    "BWTP-INCOMING",
589    bwtp_incoming_read,
590    bwtp_incoming_readline,
591    serf_default_read_iovec,
592    serf_default_read_for_sendfile,
593    serf_default_read_bucket,
594    bwtp_incoming_peek,
595    bwtp_incoming_destroy_and_data,
596};
597