1/* ==================================================================== 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, 13 * software distributed under the License is distributed on an 14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 15 * KIND, either express or implied. See the License for the 16 * specific language governing permissions and limitations 17 * under the License. 18 * ==================================================================== 19 */ 20 21#include "serf.h" 22#include "serf_bucket_util.h" 23 24 25/* Should be an APR_RING? */ 26typedef struct bucket_list { 27 serf_bucket_t *bucket; 28 struct bucket_list *next; 29} bucket_list_t; 30 31typedef struct { 32 bucket_list_t *list; /* active buckets */ 33 bucket_list_t *last; /* last bucket of the list */ 34 bucket_list_t *done; /* we finished reading this; now pending a destroy */ 35 36 serf_bucket_aggregate_eof_t hold_open; 37 void *hold_open_baton; 38 39 /* Does this bucket own its children? !0 if yes, 0 if not. */ 40 int bucket_owner; 41} aggregate_context_t; 42 43 44static void cleanup_aggregate(aggregate_context_t *ctx, 45 serf_bucket_alloc_t *allocator) 46{ 47 bucket_list_t *next_list; 48 49 /* If we finished reading a bucket during the previous read, then 50 * we can now toss that bucket. 51 */ 52 while (ctx->done != NULL) { 53 next_list = ctx->done->next; 54 55 if (ctx->bucket_owner) { 56 serf_bucket_destroy(ctx->done->bucket); 57 } 58 serf_bucket_mem_free(allocator, ctx->done); 59 60 ctx->done = next_list; 61 } 62} 63 64void serf_bucket_aggregate_cleanup( 65 serf_bucket_t *bucket, serf_bucket_alloc_t *allocator) 66{ 67 aggregate_context_t *ctx = bucket->data; 68 69 cleanup_aggregate(ctx, allocator); 70} 71 72static aggregate_context_t *create_aggregate(serf_bucket_alloc_t *allocator) 73{ 74 aggregate_context_t *ctx; 75 76 ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx)); 77 78 ctx->list = NULL; 79 ctx->last = NULL; 80 ctx->done = NULL; 81 ctx->hold_open = NULL; 82 ctx->hold_open_baton = NULL; 83 ctx->bucket_owner = 1; 84 85 return ctx; 86} 87 88serf_bucket_t *serf_bucket_aggregate_create( 89 serf_bucket_alloc_t *allocator) 90{ 91 aggregate_context_t *ctx; 92 93 ctx = create_aggregate(allocator); 94 95 return serf_bucket_create(&serf_bucket_type_aggregate, allocator, ctx); 96} 97 98serf_bucket_t *serf__bucket_stream_create( 99 serf_bucket_alloc_t *allocator, 100 serf_bucket_aggregate_eof_t fn, 101 void *baton) 102{ 103 serf_bucket_t *bucket = serf_bucket_aggregate_create(allocator); 104 aggregate_context_t *ctx = bucket->data; 105 106 serf_bucket_aggregate_hold_open(bucket, fn, baton); 107 108 ctx->bucket_owner = 0; 109 110 return bucket; 111} 112 113 114static void serf_aggregate_destroy_and_data(serf_bucket_t *bucket) 115{ 116 aggregate_context_t *ctx = bucket->data; 117 bucket_list_t *next_ctx; 118 119 while (ctx->list) { 120 if (ctx->bucket_owner) { 121 serf_bucket_destroy(ctx->list->bucket); 122 } 123 next_ctx = ctx->list->next; 124 serf_bucket_mem_free(bucket->allocator, ctx->list); 125 ctx->list = next_ctx; 126 } 127 cleanup_aggregate(ctx, bucket->allocator); 128 129 serf_default_destroy_and_data(bucket); 130} 131 132void serf_bucket_aggregate_become(serf_bucket_t *bucket) 133{ 134 aggregate_context_t *ctx; 135 136 ctx = create_aggregate(bucket->allocator); 137 138 bucket->type = &serf_bucket_type_aggregate; 139 bucket->data = ctx; 140 141 /* The allocator remains the same. */ 142} 143 144 145void serf_bucket_aggregate_prepend( 146 serf_bucket_t *aggregate_bucket, 147 serf_bucket_t *prepend_bucket) 148{ 149 aggregate_context_t *ctx = aggregate_bucket->data; 150 bucket_list_t *new_list; 151 152 new_list = serf_bucket_mem_alloc(aggregate_bucket->allocator, 153 sizeof(*new_list)); 154 new_list->bucket = prepend_bucket; 155 new_list->next = ctx->list; 156 157 ctx->list = new_list; 158} 159 160void serf_bucket_aggregate_append( 161 serf_bucket_t *aggregate_bucket, 162 serf_bucket_t *append_bucket) 163{ 164 aggregate_context_t *ctx = aggregate_bucket->data; 165 bucket_list_t *new_list; 166 167 new_list = serf_bucket_mem_alloc(aggregate_bucket->allocator, 168 sizeof(*new_list)); 169 new_list->bucket = append_bucket; 170 new_list->next = NULL; 171 172 /* If we use APR_RING, this is trivial. So, wait. 173 new_list->next = ctx->list; 174 ctx->list = new_list; 175 */ 176 if (ctx->list == NULL) { 177 ctx->list = new_list; 178 ctx->last = new_list; 179 } 180 else { 181 ctx->last->next = new_list; 182 ctx->last = ctx->last->next; 183 } 184} 185 186void serf_bucket_aggregate_hold_open(serf_bucket_t *aggregate_bucket, 187 serf_bucket_aggregate_eof_t fn, 188 void *baton) 189{ 190 aggregate_context_t *ctx = aggregate_bucket->data; 191 ctx->hold_open = fn; 192 ctx->hold_open_baton = baton; 193} 194 195void serf_bucket_aggregate_prepend_iovec( 196 serf_bucket_t *aggregate_bucket, 197 struct iovec *vecs, 198 int vecs_count) 199{ 200 int i; 201 202 /* Add in reverse order. */ 203 for (i = vecs_count - 1; i >= 0; i--) { 204 serf_bucket_t *new_bucket; 205 206 new_bucket = serf_bucket_simple_create(vecs[i].iov_base, 207 vecs[i].iov_len, 208 NULL, NULL, 209 aggregate_bucket->allocator); 210 211 serf_bucket_aggregate_prepend(aggregate_bucket, new_bucket); 212 213 } 214} 215 216void serf_bucket_aggregate_append_iovec( 217 serf_bucket_t *aggregate_bucket, 218 struct iovec *vecs, 219 int vecs_count) 220{ 221 serf_bucket_t *new_bucket; 222 223 new_bucket = serf_bucket_iovec_create(vecs, vecs_count, 224 aggregate_bucket->allocator); 225 226 serf_bucket_aggregate_append(aggregate_bucket, new_bucket); 227} 228 229static apr_status_t read_aggregate(serf_bucket_t *bucket, 230 apr_size_t requested, 231 int vecs_size, struct iovec *vecs, 232 int *vecs_used) 233{ 234 aggregate_context_t *ctx = bucket->data; 235 int cur_vecs_used; 236 apr_status_t status; 237 238 *vecs_used = 0; 239 240 if (!ctx->list) { 241 if (ctx->hold_open) { 242 return ctx->hold_open(ctx->hold_open_baton, bucket); 243 } 244 else { 245 return APR_EOF; 246 } 247 } 248 249 status = APR_SUCCESS; 250 while (requested) { 251 serf_bucket_t *head = ctx->list->bucket; 252 253 status = serf_bucket_read_iovec(head, requested, vecs_size, vecs, 254 &cur_vecs_used); 255 256 if (SERF_BUCKET_READ_ERROR(status)) 257 return status; 258 259 /* Add the number of vecs we read to our running total. */ 260 *vecs_used += cur_vecs_used; 261 262 if (cur_vecs_used > 0 || status) { 263 bucket_list_t *next_list; 264 265 /* If we got SUCCESS (w/bytes) or EAGAIN, we want to return now 266 * as it isn't safe to read more without returning to our caller. 267 */ 268 if (!status || APR_STATUS_IS_EAGAIN(status) || status == SERF_ERROR_WAIT_CONN) { 269 return status; 270 } 271 272 /* However, if we read EOF, we can stash this bucket in a 273 * to-be-freed list and move on to the next bucket. This ensures 274 * that the bucket stays alive (so as not to violate our read 275 * semantics). We'll destroy this list of buckets the next time 276 * we are asked to perform a read operation - thus ensuring the 277 * proper read lifetime. 278 */ 279 next_list = ctx->list->next; 280 ctx->list->next = ctx->done; 281 ctx->done = ctx->list; 282 ctx->list = next_list; 283 284 /* If we have no more in our list, return EOF. */ 285 if (!ctx->list) { 286 if (ctx->hold_open) { 287 return ctx->hold_open(ctx->hold_open_baton, bucket); 288 } 289 else { 290 return APR_EOF; 291 } 292 } 293 294 /* At this point, it safe to read the next bucket - if we can. */ 295 296 /* If the caller doesn't want ALL_AVAIL, decrement the size 297 * of the items we just read from the list. 298 */ 299 if (requested != SERF_READ_ALL_AVAIL) { 300 int i; 301 302 for (i = 0; i < cur_vecs_used; i++) 303 requested -= vecs[i].iov_len; 304 } 305 306 /* Adjust our vecs to account for what we just read. */ 307 vecs_size -= cur_vecs_used; 308 vecs += cur_vecs_used; 309 310 /* We reached our max. Oh well. */ 311 if (!requested || !vecs_size) { 312 return APR_SUCCESS; 313 } 314 } 315 } 316 317 return status; 318} 319 320static apr_status_t serf_aggregate_read(serf_bucket_t *bucket, 321 apr_size_t requested, 322 const char **data, apr_size_t *len) 323{ 324 aggregate_context_t *ctx = bucket->data; 325 struct iovec vec; 326 int vecs_used; 327 apr_status_t status; 328 329 cleanup_aggregate(ctx, bucket->allocator); 330 331 status = read_aggregate(bucket, requested, 1, &vec, &vecs_used); 332 333 if (!vecs_used) { 334 *len = 0; 335 } 336 else { 337 *data = vec.iov_base; 338 *len = vec.iov_len; 339 } 340 341 return status; 342} 343 344static apr_status_t serf_aggregate_read_iovec(serf_bucket_t *bucket, 345 apr_size_t requested, 346 int vecs_size, 347 struct iovec *vecs, 348 int *vecs_used) 349{ 350 aggregate_context_t *ctx = bucket->data; 351 352 cleanup_aggregate(ctx, bucket->allocator); 353 354 return read_aggregate(bucket, requested, vecs_size, vecs, vecs_used); 355} 356 357static apr_status_t serf_aggregate_readline(serf_bucket_t *bucket, 358 int acceptable, int *found, 359 const char **data, apr_size_t *len) 360{ 361 aggregate_context_t *ctx = bucket->data; 362 apr_status_t status; 363 364 cleanup_aggregate(ctx, bucket->allocator); 365 366 do { 367 serf_bucket_t *head; 368 369 *len = 0; 370 371 if (!ctx->list) { 372 if (ctx->hold_open) { 373 return ctx->hold_open(ctx->hold_open_baton, bucket); 374 } 375 else { 376 return APR_EOF; 377 } 378 } 379 380 head = ctx->list->bucket; 381 382 status = serf_bucket_readline(head, acceptable, found, 383 data, len); 384 if (SERF_BUCKET_READ_ERROR(status)) 385 return status; 386 387 if (status == APR_EOF) { 388 bucket_list_t *next_list; 389 390 /* head bucket is empty, move to to-be-cleaned-up list. */ 391 next_list = ctx->list->next; 392 ctx->list->next = ctx->done; 393 ctx->done = ctx->list; 394 ctx->list = next_list; 395 396 /* If we have no more in our list, return EOF. */ 397 if (!ctx->list) { 398 if (ctx->hold_open) { 399 return ctx->hold_open(ctx->hold_open_baton, bucket); 400 } 401 else { 402 return APR_EOF; 403 } 404 } 405 406 /* we read something, so bail out and let the appl. read again. */ 407 if (*len) 408 status = APR_SUCCESS; 409 } 410 411 /* continue with APR_SUCCESS or APR_EOF and no data read yet. */ 412 } while (!*len && status != APR_EAGAIN); 413 414 return status; 415} 416 417static apr_status_t serf_aggregate_peek(serf_bucket_t *bucket, 418 const char **data, 419 apr_size_t *len) 420{ 421 aggregate_context_t *ctx = bucket->data; 422 serf_bucket_t *head; 423 apr_status_t status; 424 425 cleanup_aggregate(ctx, bucket->allocator); 426 427 /* Peek the first bucket in the list, if any. */ 428 if (!ctx->list) { 429 *len = 0; 430 if (ctx->hold_open) { 431 status = ctx->hold_open(ctx->hold_open_baton, bucket); 432 if (status == APR_EAGAIN) 433 status = APR_SUCCESS; 434 return status; 435 } 436 else { 437 return APR_EOF; 438 } 439 } 440 441 head = ctx->list->bucket; 442 443 status = serf_bucket_peek(head, data, len); 444 445 if (status == APR_EOF) { 446 if (ctx->list->next) { 447 status = APR_SUCCESS; 448 } else { 449 if (ctx->hold_open) { 450 status = ctx->hold_open(ctx->hold_open_baton, bucket); 451 if (status == APR_EAGAIN) 452 status = APR_SUCCESS; 453 return status; 454 } 455 } 456 } 457 458 return status; 459} 460 461static serf_bucket_t * serf_aggregate_read_bucket( 462 serf_bucket_t *bucket, 463 const serf_bucket_type_t *type) 464{ 465 aggregate_context_t *ctx = bucket->data; 466 serf_bucket_t *found_bucket; 467 468 if (!ctx->list) { 469 return NULL; 470 } 471 472 if (ctx->list->bucket->type == type) { 473 /* Got the bucket. Consume it from our list. */ 474 found_bucket = ctx->list->bucket; 475 ctx->list = ctx->list->next; 476 return found_bucket; 477 } 478 479 /* Call read_bucket on first one in our list. */ 480 return serf_bucket_read_bucket(ctx->list->bucket, type); 481} 482 483 484const serf_bucket_type_t serf_bucket_type_aggregate = { 485 "AGGREGATE", 486 serf_aggregate_read, 487 serf_aggregate_readline, 488 serf_aggregate_read_iovec, 489 serf_default_read_for_sendfile, 490 serf_aggregate_read_bucket, 491 serf_aggregate_peek, 492 serf_aggregate_destroy_and_data, 493}; 494