chunk_buckets.c revision 362181
1/* ==================================================================== 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, 13 * software distributed under the License is distributed on an 14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 15 * KIND, either express or implied. See the License for the 16 * specific language governing permissions and limitations 17 * under the License. 18 * ==================================================================== 19 */ 20 21#include <apr_pools.h> 22#include <apr_strings.h> 23 24#include "serf.h" 25#include "serf_bucket_util.h" 26 27 28typedef struct { 29 enum { 30 STATE_FETCH, 31 STATE_CHUNK, 32 STATE_EOF 33 } state; 34 35 apr_status_t last_status; 36 37 serf_bucket_t *chunk; 38 serf_bucket_t *stream; 39 40 char chunk_hdr[20]; 41} chunk_context_t; 42 43 44serf_bucket_t *serf_bucket_chunk_create( 45 serf_bucket_t *stream, serf_bucket_alloc_t *allocator) 46{ 47 chunk_context_t *ctx; 48 49 ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx)); 50 ctx->state = STATE_FETCH; 51 ctx->chunk = serf_bucket_aggregate_create(allocator); 52 ctx->stream = stream; 53 54 return serf_bucket_create(&serf_bucket_type_chunk, allocator, ctx); 55} 56 57#define CRLF "\r\n" 58 59static apr_status_t create_chunk(serf_bucket_t *bucket) 60{ 61 chunk_context_t *ctx = bucket->data; 62 serf_bucket_t *simple_bkt; 63 apr_size_t chunk_len; 64 apr_size_t stream_len; 65 struct iovec vecs[66]; /* 64 + chunk trailer + EOF trailer = 66 */ 66 int vecs_read; 67 int i; 68 69 if (ctx->state != STATE_FETCH) { 70 return APR_SUCCESS; 71 } 72 73 ctx->last_status = 74 serf_bucket_read_iovec(ctx->stream, SERF_READ_ALL_AVAIL, 75 64, vecs, &vecs_read); 76 77 if (SERF_BUCKET_READ_ERROR(ctx->last_status)) { 78 /* Uh-oh. */ 79 return ctx->last_status; 80 } 81 82 /* Count the length of the data we read. */ 83 stream_len = 0; 84 for (i = 0; i < vecs_read; i++) { 85 stream_len += vecs[i].iov_len; 86 } 87 88 /* assert: stream_len in hex < sizeof(ctx->chunk_hdr) */ 89 90 /* Inserting a 0 byte chunk indicates a terminator, which already happens 91 * during the EOF handler below. Adding another one here will cause the 92 * EOF chunk to be interpreted by the server as a new request. So, 93 * we'll only do this if we have something to write. 94 */ 95 if (stream_len) { 96 /* Build the chunk header. */ 97 chunk_len = apr_snprintf(ctx->chunk_hdr, sizeof(ctx->chunk_hdr), 98 "%" APR_UINT64_T_HEX_FMT CRLF, 99 (apr_uint64_t)stream_len); 100 101 /* Create a copy of the chunk header so we can have multiple chunks 102 * in the pipeline at the same time. 103 */ 104 simple_bkt = serf_bucket_simple_copy_create(ctx->chunk_hdr, chunk_len, 105 bucket->allocator); 106 serf_bucket_aggregate_append(ctx->chunk, simple_bkt); 107 108 /* Insert the chunk footer. */ 109 vecs[vecs_read].iov_base = CRLF; 110 vecs[vecs_read++].iov_len = sizeof(CRLF) - 1; 111 } 112 113 /* We've reached the end of the line for the stream. */ 114 if (APR_STATUS_IS_EOF(ctx->last_status)) { 115 /* Insert the chunk footer. */ 116 vecs[vecs_read].iov_base = "0" CRLF CRLF; 117 vecs[vecs_read++].iov_len = sizeof("0" CRLF CRLF) - 1; 118 119 ctx->state = STATE_EOF; 120 } 121 else { 122 /* Okay, we can return data. */ 123 ctx->state = STATE_CHUNK; 124 } 125 126 serf_bucket_aggregate_append_iovec(ctx->chunk, vecs, vecs_read); 127 128 return APR_SUCCESS; 129} 130 131static apr_status_t serf_chunk_read(serf_bucket_t *bucket, 132 apr_size_t requested, 133 const char **data, apr_size_t *len) 134{ 135 chunk_context_t *ctx = bucket->data; 136 apr_status_t status; 137 138 /* Before proceeding, we need to fetch some data from the stream. */ 139 if (ctx->state == STATE_FETCH) { 140 status = create_chunk(bucket); 141 if (status) { 142 return status; 143 } 144 } 145 146 status = serf_bucket_read(ctx->chunk, requested, data, len); 147 148 /* Mask EOF from aggregate bucket. */ 149 if (APR_STATUS_IS_EOF(status) && ctx->state == STATE_CHUNK) { 150 status = ctx->last_status; 151 ctx->state = STATE_FETCH; 152 } 153 154 return status; 155} 156 157static apr_status_t serf_chunk_readline(serf_bucket_t *bucket, 158 int acceptable, int *found, 159 const char **data, apr_size_t *len) 160{ 161 chunk_context_t *ctx = bucket->data; 162 apr_status_t status; 163 164 status = serf_bucket_readline(ctx->chunk, acceptable, found, data, len); 165 166 /* Mask EOF from aggregate bucket. */ 167 if (APR_STATUS_IS_EOF(status) && ctx->state == STATE_CHUNK) { 168 status = APR_EAGAIN; 169 ctx->state = STATE_FETCH; 170 } 171 172 return status; 173} 174 175static apr_status_t serf_chunk_read_iovec(serf_bucket_t *bucket, 176 apr_size_t requested, 177 int vecs_size, 178 struct iovec *vecs, 179 int *vecs_used) 180{ 181 chunk_context_t *ctx = bucket->data; 182 apr_status_t status; 183 184 /* Before proceeding, we need to fetch some data from the stream. */ 185 if (ctx->state == STATE_FETCH) { 186 status = create_chunk(bucket); 187 if (status) { 188 return status; 189 } 190 } 191 192 status = serf_bucket_read_iovec(ctx->chunk, requested, vecs_size, vecs, 193 vecs_used); 194 195 /* Mask EOF from aggregate bucket. */ 196 if (APR_STATUS_IS_EOF(status) && ctx->state == STATE_CHUNK) { 197 status = ctx->last_status; 198 ctx->state = STATE_FETCH; 199 } 200 201 return status; 202} 203 204static apr_status_t serf_chunk_peek(serf_bucket_t *bucket, 205 const char **data, 206 apr_size_t *len) 207{ 208 chunk_context_t *ctx = bucket->data; 209 apr_status_t status; 210 211 status = serf_bucket_peek(ctx->chunk, data, len); 212 213 /* Mask EOF from aggregate bucket. */ 214 if (APR_STATUS_IS_EOF(status) && ctx->state == STATE_CHUNK) { 215 status = APR_EAGAIN; 216 } 217 218 return status; 219} 220 221static void serf_chunk_destroy(serf_bucket_t *bucket) 222{ 223 chunk_context_t *ctx = bucket->data; 224 225 serf_bucket_destroy(ctx->stream); 226 serf_bucket_destroy(ctx->chunk); 227 228 serf_default_destroy_and_data(bucket); 229} 230 231const serf_bucket_type_t serf_bucket_type_chunk = { 232 "CHUNK", 233 serf_chunk_read, 234 serf_chunk_readline, 235 serf_chunk_read_iovec, 236 serf_default_read_for_sendfile, 237 serf_default_read_bucket, 238 serf_chunk_peek, 239 serf_chunk_destroy, 240}; 241