chunk_buckets.c revision 362181
1/* ====================================================================
2 *    Licensed to the Apache Software Foundation (ASF) under one
3 *    or more contributor license agreements.  See the NOTICE file
4 *    distributed with this work for additional information
5 *    regarding copyright ownership.  The ASF licenses this file
6 *    to you under the Apache License, Version 2.0 (the
7 *    "License"); you may not use this file except in compliance
8 *    with the License.  You may obtain a copy of the License at
9 *
10 *      http://www.apache.org/licenses/LICENSE-2.0
11 *
12 *    Unless required by applicable law or agreed to in writing,
13 *    software distributed under the License is distributed on an
14 *    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 *    KIND, either express or implied.  See the License for the
16 *    specific language governing permissions and limitations
17 *    under the License.
18 * ====================================================================
19 */
20
21#include <apr_pools.h>
22#include <apr_strings.h>
23
24#include "serf.h"
25#include "serf_bucket_util.h"
26
27
28typedef struct {
29    enum {
30        STATE_FETCH,
31        STATE_CHUNK,
32        STATE_EOF
33    } state;
34
35    apr_status_t last_status;
36
37    serf_bucket_t *chunk;
38    serf_bucket_t *stream;
39
40    char chunk_hdr[20];
41} chunk_context_t;
42
43
44serf_bucket_t *serf_bucket_chunk_create(
45    serf_bucket_t *stream, serf_bucket_alloc_t *allocator)
46{
47    chunk_context_t *ctx;
48
49    ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
50    ctx->state = STATE_FETCH;
51    ctx->chunk = serf_bucket_aggregate_create(allocator);
52    ctx->stream = stream;
53
54    return serf_bucket_create(&serf_bucket_type_chunk, allocator, ctx);
55}
56
57#define CRLF "\r\n"
58
59static apr_status_t create_chunk(serf_bucket_t *bucket)
60{
61    chunk_context_t *ctx = bucket->data;
62    serf_bucket_t *simple_bkt;
63    apr_size_t chunk_len;
64    apr_size_t stream_len;
65    struct iovec vecs[66]; /* 64 + chunk trailer + EOF trailer = 66 */
66    int vecs_read;
67    int i;
68
69    if (ctx->state != STATE_FETCH) {
70        return APR_SUCCESS;
71    }
72
73    ctx->last_status =
74        serf_bucket_read_iovec(ctx->stream, SERF_READ_ALL_AVAIL,
75                               64, vecs, &vecs_read);
76
77    if (SERF_BUCKET_READ_ERROR(ctx->last_status)) {
78        /* Uh-oh. */
79        return ctx->last_status;
80    }
81
82    /* Count the length of the data we read. */
83    stream_len = 0;
84    for (i = 0; i < vecs_read; i++) {
85        stream_len += vecs[i].iov_len;
86    }
87
88    /* assert: stream_len in hex < sizeof(ctx->chunk_hdr) */
89
90    /* Inserting a 0 byte chunk indicates a terminator, which already happens
91     * during the EOF handler below.  Adding another one here will cause the
92     * EOF chunk to be interpreted by the server as a new request.  So,
93     * we'll only do this if we have something to write.
94     */
95    if (stream_len) {
96        /* Build the chunk header. */
97        chunk_len = apr_snprintf(ctx->chunk_hdr, sizeof(ctx->chunk_hdr),
98                                 "%" APR_UINT64_T_HEX_FMT CRLF,
99                                 (apr_uint64_t)stream_len);
100
101        /* Create a copy of the chunk header so we can have multiple chunks
102         * in the pipeline at the same time.
103         */
104        simple_bkt = serf_bucket_simple_copy_create(ctx->chunk_hdr, chunk_len,
105                                                    bucket->allocator);
106        serf_bucket_aggregate_append(ctx->chunk, simple_bkt);
107
108        /* Insert the chunk footer. */
109        vecs[vecs_read].iov_base = CRLF;
110        vecs[vecs_read++].iov_len = sizeof(CRLF) - 1;
111    }
112
113    /* We've reached the end of the line for the stream. */
114    if (APR_STATUS_IS_EOF(ctx->last_status)) {
115        /* Insert the chunk footer. */
116        vecs[vecs_read].iov_base = "0" CRLF CRLF;
117        vecs[vecs_read++].iov_len = sizeof("0" CRLF CRLF) - 1;
118
119        ctx->state = STATE_EOF;
120    }
121    else {
122        /* Okay, we can return data.  */
123        ctx->state = STATE_CHUNK;
124    }
125
126    serf_bucket_aggregate_append_iovec(ctx->chunk, vecs, vecs_read);
127
128    return APR_SUCCESS;
129}
130
131static apr_status_t serf_chunk_read(serf_bucket_t *bucket,
132                                    apr_size_t requested,
133                                    const char **data, apr_size_t *len)
134{
135    chunk_context_t *ctx = bucket->data;
136    apr_status_t status;
137
138    /* Before proceeding, we need to fetch some data from the stream. */
139    if (ctx->state == STATE_FETCH) {
140        status = create_chunk(bucket);
141        if (status) {
142            return status;
143        }
144    }
145
146    status = serf_bucket_read(ctx->chunk, requested, data, len);
147
148    /* Mask EOF from aggregate bucket. */
149    if (APR_STATUS_IS_EOF(status) && ctx->state == STATE_CHUNK) {
150        status = ctx->last_status;
151        ctx->state = STATE_FETCH;
152    }
153
154    return status;
155}
156
157static apr_status_t serf_chunk_readline(serf_bucket_t *bucket,
158                                         int acceptable, int *found,
159                                         const char **data, apr_size_t *len)
160{
161    chunk_context_t *ctx = bucket->data;
162    apr_status_t status;
163
164    status = serf_bucket_readline(ctx->chunk, acceptable, found, data, len);
165
166    /* Mask EOF from aggregate bucket. */
167    if (APR_STATUS_IS_EOF(status) && ctx->state == STATE_CHUNK) {
168        status = APR_EAGAIN;
169        ctx->state = STATE_FETCH;
170    }
171
172    return status;
173}
174
175static apr_status_t serf_chunk_read_iovec(serf_bucket_t *bucket,
176                                          apr_size_t requested,
177                                          int vecs_size,
178                                          struct iovec *vecs,
179                                          int *vecs_used)
180{
181    chunk_context_t *ctx = bucket->data;
182    apr_status_t status;
183
184    /* Before proceeding, we need to fetch some data from the stream. */
185    if (ctx->state == STATE_FETCH) {
186        status = create_chunk(bucket);
187        if (status) {
188            return status;
189        }
190    }
191
192    status = serf_bucket_read_iovec(ctx->chunk, requested, vecs_size, vecs,
193                                    vecs_used);
194
195    /* Mask EOF from aggregate bucket. */
196    if (APR_STATUS_IS_EOF(status) && ctx->state == STATE_CHUNK) {
197        status = ctx->last_status;
198        ctx->state = STATE_FETCH;
199    }
200
201    return status;
202}
203
204static apr_status_t serf_chunk_peek(serf_bucket_t *bucket,
205                                     const char **data,
206                                     apr_size_t *len)
207{
208    chunk_context_t *ctx = bucket->data;
209    apr_status_t status;
210
211    status = serf_bucket_peek(ctx->chunk, data, len);
212
213    /* Mask EOF from aggregate bucket. */
214    if (APR_STATUS_IS_EOF(status) && ctx->state == STATE_CHUNK) {
215        status = APR_EAGAIN;
216    }
217
218    return status;
219}
220
221static void serf_chunk_destroy(serf_bucket_t *bucket)
222{
223    chunk_context_t *ctx = bucket->data;
224
225    serf_bucket_destroy(ctx->stream);
226    serf_bucket_destroy(ctx->chunk);
227
228    serf_default_destroy_and_data(bucket);
229}
230
231const serf_bucket_type_t serf_bucket_type_chunk = {
232    "CHUNK",
233    serf_chunk_read,
234    serf_chunk_readline,
235    serf_chunk_read_iovec,
236    serf_default_read_for_sendfile,
237    serf_default_read_bucket,
238    serf_chunk_peek,
239    serf_chunk_destroy,
240};
241