1/* ====================================================================
2 *    Licensed to the Apache Software Foundation (ASF) under one
3 *    or more contributor license agreements.  See the NOTICE file
4 *    distributed with this work for additional information
5 *    regarding copyright ownership.  The ASF licenses this file
6 *    to you under the Apache License, Version 2.0 (the
7 *    "License"); you may not use this file except in compliance
8 *    with the License.  You may obtain a copy of the License at
9 *
10 *      http://www.apache.org/licenses/LICENSE-2.0
11 *
12 *    Unless required by applicable law or agreed to in writing,
13 *    software distributed under the License is distributed on an
14 *    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 *    KIND, either express or implied.  See the License for the
16 *    specific language governing permissions and limitations
17 *    under the License.
18 * ====================================================================
19 */
20
21#include <apr_pools.h>
22#include <apr_strings.h>
23#include <apr_lib.h>
24#include <apr_date.h>
25
26#include "serf.h"
27#include "serf_bucket_util.h"
28#include "serf_bucket_types.h"
29
30#include <stdlib.h>
31
32/* This is an implementation of Bidirectional Web Transfer Protocol (BWTP)
33 * See:
34 *   http://bwtp.wikidot.com/
35 */
36
37typedef struct {
38    int channel;
39    int open;
40    int type; /* 0 = header, 1 = message */ /* TODO enum? */
41    const char *phrase;
42    serf_bucket_t *headers;
43
44    char req_line[1000];
45} frame_context_t;
46
47typedef struct {
48    serf_bucket_t *stream;
49    serf_bucket_t *body;        /* Pointer to the stream wrapping the body. */
50    serf_bucket_t *headers;     /* holds parsed headers */
51
52    enum {
53        STATE_STATUS_LINE,      /* reading status line */
54        STATE_HEADERS,          /* reading headers */
55        STATE_BODY,             /* reading body */
56        STATE_DONE              /* we've sent EOF */
57    } state;
58
59    /* Buffer for accumulating a line from the response. */
60    serf_linebuf_t linebuf;
61
62    int type; /* 0 = header, 1 = message */ /* TODO enum? */
63    int channel;
64    char *phrase;
65    apr_size_t length;
66} incoming_context_t;
67
68
69serf_bucket_t *serf_bucket_bwtp_channel_close(
70    int channel,
71    serf_bucket_alloc_t *allocator)
72{
73    frame_context_t *ctx;
74
75    ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
76    ctx->type = 0;
77    ctx->open = 0;
78    ctx->channel = channel;
79    ctx->phrase = "CLOSED";
80    ctx->headers = serf_bucket_headers_create(allocator);
81
82    return serf_bucket_create(&serf_bucket_type_bwtp_frame, allocator, ctx);
83}
84
85serf_bucket_t *serf_bucket_bwtp_channel_open(
86    int channel,
87    const char *uri,
88    serf_bucket_alloc_t *allocator)
89{
90    frame_context_t *ctx;
91
92    ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
93    ctx->type = 0;
94    ctx->open = 1;
95    ctx->channel = channel;
96    ctx->phrase = uri;
97    ctx->headers = serf_bucket_headers_create(allocator);
98
99    return serf_bucket_create(&serf_bucket_type_bwtp_frame, allocator, ctx);
100}
101
102serf_bucket_t *serf_bucket_bwtp_header_create(
103    int channel,
104    const char *phrase,
105    serf_bucket_alloc_t *allocator)
106{
107    frame_context_t *ctx;
108
109    ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
110    ctx->type = 0;
111    ctx->open = 0;
112    ctx->channel = channel;
113    ctx->phrase = phrase;
114    ctx->headers = serf_bucket_headers_create(allocator);
115
116    return serf_bucket_create(&serf_bucket_type_bwtp_frame, allocator, ctx);
117}
118
119serf_bucket_t *serf_bucket_bwtp_message_create(
120    int channel,
121    serf_bucket_t *body,
122    serf_bucket_alloc_t *allocator)
123{
124    frame_context_t *ctx;
125
126    ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
127    ctx->type = 1;
128    ctx->open = 0;
129    ctx->channel = channel;
130    ctx->phrase = "MESSAGE";
131    ctx->headers = serf_bucket_headers_create(allocator);
132
133    return serf_bucket_create(&serf_bucket_type_bwtp_frame, allocator, ctx);
134}
135
136int serf_bucket_bwtp_frame_get_channel(
137    serf_bucket_t *bucket)
138{
139    if (SERF_BUCKET_IS_BWTP_FRAME(bucket)) {
140        frame_context_t *ctx = bucket->data;
141
142        return ctx->channel;
143    }
144    else if (SERF_BUCKET_IS_BWTP_INCOMING_FRAME(bucket)) {
145        incoming_context_t *ctx = bucket->data;
146
147        return ctx->channel;
148    }
149
150    return -1;
151}
152
153int serf_bucket_bwtp_frame_get_type(
154    serf_bucket_t *bucket)
155{
156    if (SERF_BUCKET_IS_BWTP_FRAME(bucket)) {
157        frame_context_t *ctx = bucket->data;
158
159        return ctx->type;
160    }
161    else if (SERF_BUCKET_IS_BWTP_INCOMING_FRAME(bucket)) {
162        incoming_context_t *ctx = bucket->data;
163
164        return ctx->type;
165    }
166
167    return -1;
168}
169
170const char *serf_bucket_bwtp_frame_get_phrase(
171    serf_bucket_t *bucket)
172{
173    if (SERF_BUCKET_IS_BWTP_FRAME(bucket)) {
174        frame_context_t *ctx = bucket->data;
175
176        return ctx->phrase;
177    }
178    else if (SERF_BUCKET_IS_BWTP_INCOMING_FRAME(bucket)) {
179        incoming_context_t *ctx = bucket->data;
180
181        return ctx->phrase;
182    }
183
184    return NULL;
185}
186
187serf_bucket_t *serf_bucket_bwtp_frame_get_headers(
188    serf_bucket_t *bucket)
189{
190    if (SERF_BUCKET_IS_BWTP_FRAME(bucket)) {
191        frame_context_t *ctx = bucket->data;
192
193        return ctx->headers;
194    }
195    else if (SERF_BUCKET_IS_BWTP_INCOMING_FRAME(bucket)) {
196        incoming_context_t *ctx = bucket->data;
197
198        return ctx->headers;
199    }
200
201    return NULL;
202}
203
204static int count_size(void *baton, const char *key, const char *value)
205{
206    apr_size_t *c = baton;
207    /* TODO Deal with folding.  Yikes. */
208
209    /* Add in ": " and CRLF - so an extra four bytes. */
210    *c += strlen(key) + strlen(value) + 4;
211
212    return 0;
213}
214
215static apr_size_t calc_header_size(serf_bucket_t *hdrs)
216{
217    apr_size_t size = 0;
218
219    serf_bucket_headers_do(hdrs, count_size, &size);
220
221    return size;
222}
223
224static void serialize_data(serf_bucket_t *bucket)
225{
226    frame_context_t *ctx = bucket->data;
227    serf_bucket_t *new_bucket;
228    apr_size_t req_len;
229
230    /* Serialize the request-line and headers into one mother string,
231     * and wrap a bucket around it.
232     */
233    req_len = apr_snprintf(ctx->req_line, sizeof(ctx->req_line),
234                           "%s %d " "%" APR_UINT64_T_HEX_FMT " %s%s\r\n",
235                           (ctx->type ? "BWM" : "BWH"),
236                           ctx->channel, calc_header_size(ctx->headers),
237                           (ctx->open ? "OPEN " : ""),
238                           ctx->phrase);
239    new_bucket = serf_bucket_simple_copy_create(ctx->req_line, req_len,
240                                                bucket->allocator);
241
242    /* Build up the new bucket structure.
243     *
244     * Note that self needs to become an aggregate bucket so that a
245     * pointer to self still represents the "right" data.
246     */
247    serf_bucket_aggregate_become(bucket);
248
249    /* Insert the two buckets. */
250    serf_bucket_aggregate_append(bucket, new_bucket);
251    serf_bucket_aggregate_append(bucket, ctx->headers);
252
253    /* Our private context is no longer needed, and is not referred to by
254     * any existing bucket. Toss it.
255     */
256    serf_bucket_mem_free(bucket->allocator, ctx);
257}
258
259static apr_status_t serf_bwtp_frame_read(serf_bucket_t *bucket,
260                                         apr_size_t requested,
261                                         const char **data, apr_size_t *len)
262{
263    /* Seralize our private data into a new aggregate bucket. */
264    serialize_data(bucket);
265
266    /* Delegate to the "new" aggregate bucket to do the read. */
267    return serf_bucket_read(bucket, requested, data, len);
268}
269
270static apr_status_t serf_bwtp_frame_readline(serf_bucket_t *bucket,
271                                             int acceptable, int *found,
272                                             const char **data, apr_size_t *len)
273{
274    /* Seralize our private data into a new aggregate bucket. */
275    serialize_data(bucket);
276
277    /* Delegate to the "new" aggregate bucket to do the readline. */
278    return serf_bucket_readline(bucket, acceptable, found, data, len);
279}
280
281static apr_status_t serf_bwtp_frame_read_iovec(serf_bucket_t *bucket,
282                                               apr_size_t requested,
283                                               int vecs_size,
284                                               struct iovec *vecs,
285                                               int *vecs_used)
286{
287    /* Seralize our private data into a new aggregate bucket. */
288    serialize_data(bucket);
289
290    /* Delegate to the "new" aggregate bucket to do the read. */
291    return serf_bucket_read_iovec(bucket, requested,
292                                  vecs_size, vecs, vecs_used);
293}
294
295static apr_status_t serf_bwtp_frame_peek(serf_bucket_t *bucket,
296                                         const char **data,
297                                         apr_size_t *len)
298{
299    /* Seralize our private data into a new aggregate bucket. */
300    serialize_data(bucket);
301
302    /* Delegate to the "new" aggregate bucket to do the peek. */
303    return serf_bucket_peek(bucket, data, len);
304}
305
306const serf_bucket_type_t serf_bucket_type_bwtp_frame = {
307    "BWTP-FRAME",
308    serf_bwtp_frame_read,
309    serf_bwtp_frame_readline,
310    serf_bwtp_frame_read_iovec,
311    serf_default_read_for_sendfile,
312    serf_default_read_bucket,
313    serf_bwtp_frame_peek,
314    serf_default_destroy_and_data,
315};
316
317
318serf_bucket_t *serf_bucket_bwtp_incoming_frame_create(
319    serf_bucket_t *stream,
320    serf_bucket_alloc_t *allocator)
321{
322    incoming_context_t *ctx;
323
324    ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
325    ctx->stream = stream;
326    ctx->body = NULL;
327    ctx->headers = serf_bucket_headers_create(allocator);
328    ctx->state = STATE_STATUS_LINE;
329    ctx->length = 0;
330    ctx->channel = -1;
331    ctx->phrase = NULL;
332
333    serf_linebuf_init(&ctx->linebuf);
334
335    return serf_bucket_create(&serf_bucket_type_bwtp_incoming_frame, allocator, ctx);
336}
337
338static void bwtp_incoming_destroy_and_data(serf_bucket_t *bucket)
339{
340    incoming_context_t *ctx = bucket->data;
341
342    if (ctx->state != STATE_STATUS_LINE && ctx->phrase) {
343        serf_bucket_mem_free(bucket->allocator, (void*)ctx->phrase);
344    }
345
346    serf_bucket_destroy(ctx->stream);
347    if (ctx->body != NULL)
348        serf_bucket_destroy(ctx->body);
349    serf_bucket_destroy(ctx->headers);
350
351    serf_default_destroy_and_data(bucket);
352}
353
354static apr_status_t fetch_line(incoming_context_t *ctx, int acceptable)
355{
356    return serf_linebuf_fetch(&ctx->linebuf, ctx->stream, acceptable);
357}
358
359static apr_status_t parse_status_line(incoming_context_t *ctx,
360                                      serf_bucket_alloc_t *allocator)
361{
362    int res;
363    char *reason; /* ### stupid APR interface makes this non-const */
364
365    /* ctx->linebuf.line should be of form: BW* */
366    res = apr_date_checkmask(ctx->linebuf.line, "BW*");
367    if (!res) {
368        /* Not an BWTP response?  Well, at least we won't understand it. */
369        return APR_EGENERAL;
370    }
371
372    if (ctx->linebuf.line[2] == 'H') {
373        ctx->type = 0;
374    }
375    else if (ctx->linebuf.line[2] == 'M') {
376        ctx->type = 1;
377    }
378    else {
379        ctx->type = -1;
380    }
381
382    ctx->channel = apr_strtoi64(ctx->linebuf.line + 3, &reason, 16);
383
384    /* Skip leading spaces for the reason string. */
385    if (apr_isspace(*reason)) {
386        reason++;
387    }
388
389    ctx->length = apr_strtoi64(reason, &reason, 16);
390
391    /* Skip leading spaces for the reason string. */
392    if (reason - ctx->linebuf.line < ctx->linebuf.used) {
393        if (apr_isspace(*reason)) {
394            reason++;
395        }
396
397        ctx->phrase = serf_bstrmemdup(allocator, reason,
398                                      ctx->linebuf.used
399                                      - (reason - ctx->linebuf.line));
400    } else {
401        ctx->phrase = NULL;
402    }
403
404    return APR_SUCCESS;
405}
406
407/* This code should be replaced with header buckets. */
408static apr_status_t fetch_headers(serf_bucket_t *bkt, incoming_context_t *ctx)
409{
410    apr_status_t status;
411
412    /* RFC 2616 says that CRLF is the only line ending, but we can easily
413     * accept any kind of line ending.
414     */
415    status = fetch_line(ctx, SERF_NEWLINE_ANY);
416    if (SERF_BUCKET_READ_ERROR(status)) {
417        return status;
418    }
419    /* Something was read. Process it. */
420
421    if (ctx->linebuf.state == SERF_LINEBUF_READY && ctx->linebuf.used) {
422        const char *end_key;
423        const char *c;
424
425        end_key = c = memchr(ctx->linebuf.line, ':', ctx->linebuf.used);
426        if (!c) {
427            /* Bad headers? */
428            return APR_EGENERAL;
429        }
430
431        /* Skip over initial : and spaces. */
432        while (apr_isspace(*++c))
433            continue;
434
435        /* Always copy the headers (from the linebuf into new mem). */
436        /* ### we should be able to optimize some mem copies */
437        serf_bucket_headers_setx(
438            ctx->headers,
439            ctx->linebuf.line, end_key - ctx->linebuf.line, 1,
440            c, ctx->linebuf.line + ctx->linebuf.used - c, 1);
441    }
442
443    return status;
444}
445
446/* Perform one iteration of the state machine.
447 *
448 * Will return when one the following conditions occurred:
449 *  1) a state change
450 *  2) an error
451 *  3) the stream is not ready or at EOF
452 *  4) APR_SUCCESS, meaning the machine can be run again immediately
453 */
454static apr_status_t run_machine(serf_bucket_t *bkt, incoming_context_t *ctx)
455{
456    apr_status_t status = APR_SUCCESS; /* initialize to avoid gcc warnings */
457
458    switch (ctx->state) {
459    case STATE_STATUS_LINE:
460        /* RFC 2616 says that CRLF is the only line ending, but we can easily
461         * accept any kind of line ending.
462         */
463        status = fetch_line(ctx, SERF_NEWLINE_ANY);
464        if (SERF_BUCKET_READ_ERROR(status))
465            return status;
466
467        if (ctx->linebuf.state == SERF_LINEBUF_READY && ctx->linebuf.used) {
468            /* The Status-Line is in the line buffer. Process it. */
469            status = parse_status_line(ctx, bkt->allocator);
470            if (status)
471                return status;
472
473            if (ctx->length) {
474                ctx->body =
475                    serf_bucket_barrier_create(ctx->stream, bkt->allocator);
476                ctx->body = serf_bucket_limit_create(ctx->body, ctx->length,
477                                                     bkt->allocator);
478                if (!ctx->type) {
479                    ctx->state = STATE_HEADERS;
480                } else {
481                    ctx->state = STATE_BODY;
482                }
483            } else {
484                ctx->state = STATE_DONE;
485            }
486        }
487        else {
488            /* The connection closed before we could get the next
489             * response.  Treat the request as lost so that our upper
490             * end knows the server never tried to give us a response.
491             */
492            if (APR_STATUS_IS_EOF(status)) {
493                return SERF_ERROR_REQUEST_LOST;
494            }
495        }
496        break;
497    case STATE_HEADERS:
498        status = fetch_headers(ctx->body, ctx);
499        if (SERF_BUCKET_READ_ERROR(status))
500            return status;
501
502        /* If an empty line was read, then we hit the end of the headers.
503         * Move on to the body.
504         */
505        if (ctx->linebuf.state == SERF_LINEBUF_READY && !ctx->linebuf.used) {
506            /* Advance the state. */
507            ctx->state = STATE_DONE;
508        }
509        break;
510    case STATE_BODY:
511        /* Don't do anything. */
512        break;
513    case STATE_DONE:
514        return APR_EOF;
515    default:
516        /* Not reachable */
517        return APR_EGENERAL;
518    }
519
520    return status;
521}
522
523static apr_status_t wait_for_body(serf_bucket_t *bkt, incoming_context_t *ctx)
524{
525    apr_status_t status;
526
527    /* Keep reading and moving through states if we aren't at the BODY */
528    while (ctx->state != STATE_BODY) {
529        status = run_machine(bkt, ctx);
530
531        /* Anything other than APR_SUCCESS means that we cannot immediately
532         * read again (for now).
533         */
534        if (status)
535            return status;
536    }
537    /* in STATE_BODY */
538
539    return APR_SUCCESS;
540}
541
542apr_status_t serf_bucket_bwtp_incoming_frame_wait_for_headers(
543    serf_bucket_t *bucket)
544{
545    incoming_context_t *ctx = bucket->data;
546
547    return wait_for_body(bucket, ctx);
548}
549
550static apr_status_t bwtp_incoming_read(serf_bucket_t *bucket,
551                                       apr_size_t requested,
552                                       const char **data, apr_size_t *len)
553{
554    incoming_context_t *ctx = bucket->data;
555    apr_status_t rv;
556
557    rv = wait_for_body(bucket, ctx);
558    if (rv) {
559        /* It's not possible to have read anything yet! */
560        if (APR_STATUS_IS_EOF(rv) || APR_STATUS_IS_EAGAIN(rv)) {
561            *len = 0;
562        }
563        return rv;
564    }
565
566    rv = serf_bucket_read(ctx->body, requested, data, len);
567    if (APR_STATUS_IS_EOF(rv)) {
568        ctx->state = STATE_DONE;
569    }
570    return rv;
571}
572
573static apr_status_t bwtp_incoming_readline(serf_bucket_t *bucket,
574                                           int acceptable, int *found,
575                                           const char **data, apr_size_t *len)
576{
577    incoming_context_t *ctx = bucket->data;
578    apr_status_t rv;
579
580    rv = wait_for_body(bucket, ctx);
581    if (rv) {
582        return rv;
583    }
584
585    /* Delegate to the stream bucket to do the readline. */
586    return serf_bucket_readline(ctx->body, acceptable, found, data, len);
587}
588
589/* ### need to implement */
590#define bwtp_incoming_peek NULL
591
592const serf_bucket_type_t serf_bucket_type_bwtp_incoming_frame = {
593    "BWTP-INCOMING",
594    bwtp_incoming_read,
595    bwtp_incoming_readline,
596    serf_default_read_iovec,
597    serf_default_read_for_sendfile,
598    serf_default_read_bucket,
599    bwtp_incoming_peek,
600    bwtp_incoming_destroy_and_data,
601};
602