/* aggregate_buckets.c revision 302408 */
1/* Copyright 2002-2004 Justin Erenkrantz and Greg Stein 2 * 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16#include "serf.h" 17#include "serf_bucket_util.h" 18 19 20/* Should be an APR_RING? */ 21typedef struct bucket_list { 22 serf_bucket_t *bucket; 23 struct bucket_list *next; 24} bucket_list_t; 25 26typedef struct { 27 bucket_list_t *list; /* active buckets */ 28 bucket_list_t *last; /* last bucket of the list */ 29 bucket_list_t *done; /* we finished reading this; now pending a destroy */ 30 31 serf_bucket_aggregate_eof_t hold_open; 32 void *hold_open_baton; 33 34 /* Does this bucket own its children? !0 if yes, 0 if not. */ 35 int bucket_owner; 36} aggregate_context_t; 37 38 39static void cleanup_aggregate(aggregate_context_t *ctx, 40 serf_bucket_alloc_t *allocator) 41{ 42 bucket_list_t *next_list; 43 44 /* If we finished reading a bucket during the previous read, then 45 * we can now toss that bucket. 
46 */ 47 while (ctx->done != NULL) { 48 next_list = ctx->done->next; 49 50 if (ctx->bucket_owner) { 51 serf_bucket_destroy(ctx->done->bucket); 52 } 53 serf_bucket_mem_free(allocator, ctx->done); 54 55 ctx->done = next_list; 56 } 57} 58 59void serf_bucket_aggregate_cleanup( 60 serf_bucket_t *bucket, serf_bucket_alloc_t *allocator) 61{ 62 aggregate_context_t *ctx = bucket->data; 63 64 cleanup_aggregate(ctx, allocator); 65} 66 67static aggregate_context_t *create_aggregate(serf_bucket_alloc_t *allocator) 68{ 69 aggregate_context_t *ctx; 70 71 ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx)); 72 73 ctx->list = NULL; 74 ctx->last = NULL; 75 ctx->done = NULL; 76 ctx->hold_open = NULL; 77 ctx->hold_open_baton = NULL; 78 ctx->bucket_owner = 1; 79 80 return ctx; 81} 82 83serf_bucket_t *serf_bucket_aggregate_create( 84 serf_bucket_alloc_t *allocator) 85{ 86 aggregate_context_t *ctx; 87 88 ctx = create_aggregate(allocator); 89 90 return serf_bucket_create(&serf_bucket_type_aggregate, allocator, ctx); 91} 92 93serf_bucket_t *serf__bucket_stream_create( 94 serf_bucket_alloc_t *allocator, 95 serf_bucket_aggregate_eof_t fn, 96 void *baton) 97{ 98 serf_bucket_t *bucket = serf_bucket_aggregate_create(allocator); 99 aggregate_context_t *ctx = bucket->data; 100 101 serf_bucket_aggregate_hold_open(bucket, fn, baton); 102 103 ctx->bucket_owner = 0; 104 105 return bucket; 106} 107 108 109static void serf_aggregate_destroy_and_data(serf_bucket_t *bucket) 110{ 111 aggregate_context_t *ctx = bucket->data; 112 bucket_list_t *next_ctx; 113 114 while (ctx->list) { 115 if (ctx->bucket_owner) { 116 serf_bucket_destroy(ctx->list->bucket); 117 } 118 next_ctx = ctx->list->next; 119 serf_bucket_mem_free(bucket->allocator, ctx->list); 120 ctx->list = next_ctx; 121 } 122 cleanup_aggregate(ctx, bucket->allocator); 123 124 serf_default_destroy_and_data(bucket); 125} 126 127void serf_bucket_aggregate_become(serf_bucket_t *bucket) 128{ 129 aggregate_context_t *ctx; 130 131 ctx = 
create_aggregate(bucket->allocator); 132 133 bucket->type = &serf_bucket_type_aggregate; 134 bucket->data = ctx; 135 136 /* The allocator remains the same. */ 137} 138 139 140void serf_bucket_aggregate_prepend( 141 serf_bucket_t *aggregate_bucket, 142 serf_bucket_t *prepend_bucket) 143{ 144 aggregate_context_t *ctx = aggregate_bucket->data; 145 bucket_list_t *new_list; 146 147 new_list = serf_bucket_mem_alloc(aggregate_bucket->allocator, 148 sizeof(*new_list)); 149 new_list->bucket = prepend_bucket; 150 new_list->next = ctx->list; 151 152 ctx->list = new_list; 153} 154 155void serf_bucket_aggregate_append( 156 serf_bucket_t *aggregate_bucket, 157 serf_bucket_t *append_bucket) 158{ 159 aggregate_context_t *ctx = aggregate_bucket->data; 160 bucket_list_t *new_list; 161 162 new_list = serf_bucket_mem_alloc(aggregate_bucket->allocator, 163 sizeof(*new_list)); 164 new_list->bucket = append_bucket; 165 new_list->next = NULL; 166 167 /* If we use APR_RING, this is trivial. So, wait. 168 new_list->next = ctx->list; 169 ctx->list = new_list; 170 */ 171 if (ctx->list == NULL) { 172 ctx->list = new_list; 173 ctx->last = new_list; 174 } 175 else { 176 ctx->last->next = new_list; 177 ctx->last = ctx->last->next; 178 } 179} 180 181void serf_bucket_aggregate_hold_open(serf_bucket_t *aggregate_bucket, 182 serf_bucket_aggregate_eof_t fn, 183 void *baton) 184{ 185 aggregate_context_t *ctx = aggregate_bucket->data; 186 ctx->hold_open = fn; 187 ctx->hold_open_baton = baton; 188} 189 190void serf_bucket_aggregate_prepend_iovec( 191 serf_bucket_t *aggregate_bucket, 192 struct iovec *vecs, 193 int vecs_count) 194{ 195 int i; 196 197 /* Add in reverse order. 
*/ 198 for (i = vecs_count - 1; i >= 0; i--) { 199 serf_bucket_t *new_bucket; 200 201 new_bucket = serf_bucket_simple_create(vecs[i].iov_base, 202 vecs[i].iov_len, 203 NULL, NULL, 204 aggregate_bucket->allocator); 205 206 serf_bucket_aggregate_prepend(aggregate_bucket, new_bucket); 207 208 } 209} 210 211void serf_bucket_aggregate_append_iovec( 212 serf_bucket_t *aggregate_bucket, 213 struct iovec *vecs, 214 int vecs_count) 215{ 216 serf_bucket_t *new_bucket; 217 218 new_bucket = serf_bucket_iovec_create(vecs, vecs_count, 219 aggregate_bucket->allocator); 220 221 serf_bucket_aggregate_append(aggregate_bucket, new_bucket); 222} 223 224static apr_status_t read_aggregate(serf_bucket_t *bucket, 225 apr_size_t requested, 226 int vecs_size, struct iovec *vecs, 227 int *vecs_used) 228{ 229 aggregate_context_t *ctx = bucket->data; 230 int cur_vecs_used; 231 apr_status_t status; 232 233 *vecs_used = 0; 234 235 if (!ctx->list) { 236 if (ctx->hold_open) { 237 return ctx->hold_open(ctx->hold_open_baton, bucket); 238 } 239 else { 240 return APR_EOF; 241 } 242 } 243 244 status = APR_SUCCESS; 245 while (requested) { 246 serf_bucket_t *head = ctx->list->bucket; 247 248 status = serf_bucket_read_iovec(head, requested, vecs_size, vecs, 249 &cur_vecs_used); 250 251 if (SERF_BUCKET_READ_ERROR(status)) 252 return status; 253 254 /* Add the number of vecs we read to our running total. */ 255 *vecs_used += cur_vecs_used; 256 257 if (cur_vecs_used > 0 || status) { 258 bucket_list_t *next_list; 259 260 /* If we got SUCCESS (w/bytes) or EAGAIN, we want to return now 261 * as it isn't safe to read more without returning to our caller. 262 */ 263 if (!status || APR_STATUS_IS_EAGAIN(status) || status == SERF_ERROR_WAIT_CONN) { 264 return status; 265 } 266 267 /* However, if we read EOF, we can stash this bucket in a 268 * to-be-freed list and move on to the next bucket. This ensures 269 * that the bucket stays alive (so as not to violate our read 270 * semantics). 
We'll destroy this list of buckets the next time 271 * we are asked to perform a read operation - thus ensuring the 272 * proper read lifetime. 273 */ 274 next_list = ctx->list->next; 275 ctx->list->next = ctx->done; 276 ctx->done = ctx->list; 277 ctx->list = next_list; 278 279 /* If we have no more in our list, return EOF. */ 280 if (!ctx->list) { 281 if (ctx->hold_open) { 282 return ctx->hold_open(ctx->hold_open_baton, bucket); 283 } 284 else { 285 return APR_EOF; 286 } 287 } 288 289 /* At this point, it safe to read the next bucket - if we can. */ 290 291 /* If the caller doesn't want ALL_AVAIL, decrement the size 292 * of the items we just read from the list. 293 */ 294 if (requested != SERF_READ_ALL_AVAIL) { 295 int i; 296 297 for (i = 0; i < cur_vecs_used; i++) 298 requested -= vecs[i].iov_len; 299 } 300 301 /* Adjust our vecs to account for what we just read. */ 302 vecs_size -= cur_vecs_used; 303 vecs += cur_vecs_used; 304 305 /* We reached our max. Oh well. */ 306 if (!requested || !vecs_size) { 307 return APR_SUCCESS; 308 } 309 } 310 } 311 312 return status; 313} 314 315static apr_status_t serf_aggregate_read(serf_bucket_t *bucket, 316 apr_size_t requested, 317 const char **data, apr_size_t *len) 318{ 319 aggregate_context_t *ctx = bucket->data; 320 struct iovec vec; 321 int vecs_used; 322 apr_status_t status; 323 324 cleanup_aggregate(ctx, bucket->allocator); 325 326 status = read_aggregate(bucket, requested, 1, &vec, &vecs_used); 327 328 if (!vecs_used) { 329 *len = 0; 330 } 331 else { 332 *data = vec.iov_base; 333 *len = vec.iov_len; 334 } 335 336 return status; 337} 338 339static apr_status_t serf_aggregate_read_iovec(serf_bucket_t *bucket, 340 apr_size_t requested, 341 int vecs_size, 342 struct iovec *vecs, 343 int *vecs_used) 344{ 345 aggregate_context_t *ctx = bucket->data; 346 347 cleanup_aggregate(ctx, bucket->allocator); 348 349 return read_aggregate(bucket, requested, vecs_size, vecs, vecs_used); 350} 351 352static apr_status_t 
serf_aggregate_readline(serf_bucket_t *bucket, 353 int acceptable, int *found, 354 const char **data, apr_size_t *len) 355{ 356 aggregate_context_t *ctx = bucket->data; 357 apr_status_t status; 358 359 cleanup_aggregate(ctx, bucket->allocator); 360 361 do { 362 serf_bucket_t *head; 363 364 *len = 0; 365 366 if (!ctx->list) { 367 if (ctx->hold_open) { 368 return ctx->hold_open(ctx->hold_open_baton, bucket); 369 } 370 else { 371 return APR_EOF; 372 } 373 } 374 375 head = ctx->list->bucket; 376 377 status = serf_bucket_readline(head, acceptable, found, 378 data, len); 379 if (SERF_BUCKET_READ_ERROR(status)) 380 return status; 381 382 if (status == APR_EOF) { 383 bucket_list_t *next_list; 384 385 /* head bucket is empty, move to to-be-cleaned-up list. */ 386 next_list = ctx->list->next; 387 ctx->list->next = ctx->done; 388 ctx->done = ctx->list; 389 ctx->list = next_list; 390 391 /* If we have no more in our list, return EOF. */ 392 if (!ctx->list) { 393 if (ctx->hold_open) { 394 return ctx->hold_open(ctx->hold_open_baton, bucket); 395 } 396 else { 397 return APR_EOF; 398 } 399 } 400 401 /* we read something, so bail out and let the appl. read again. */ 402 if (*len) 403 status = APR_SUCCESS; 404 } 405 406 /* continue with APR_SUCCESS or APR_EOF and no data read yet. */ 407 } while (!*len && status != APR_EAGAIN); 408 409 return status; 410} 411 412static apr_status_t serf_aggregate_peek(serf_bucket_t *bucket, 413 const char **data, 414 apr_size_t *len) 415{ 416 aggregate_context_t *ctx = bucket->data; 417 serf_bucket_t *head; 418 apr_status_t status; 419 420 cleanup_aggregate(ctx, bucket->allocator); 421 422 /* Peek the first bucket in the list, if any. 
*/ 423 if (!ctx->list) { 424 *len = 0; 425 if (ctx->hold_open) { 426 status = ctx->hold_open(ctx->hold_open_baton, bucket); 427 if (status == APR_EAGAIN) 428 status = APR_SUCCESS; 429 return status; 430 } 431 else { 432 return APR_EOF; 433 } 434 } 435 436 head = ctx->list->bucket; 437 438 status = serf_bucket_peek(head, data, len); 439 440 if (status == APR_EOF) { 441 if (ctx->list->next) { 442 status = APR_SUCCESS; 443 } else { 444 if (ctx->hold_open) { 445 status = ctx->hold_open(ctx->hold_open_baton, bucket); 446 if (status == APR_EAGAIN) 447 status = APR_SUCCESS; 448 return status; 449 } 450 } 451 } 452 453 return status; 454} 455 456static serf_bucket_t * serf_aggregate_read_bucket( 457 serf_bucket_t *bucket, 458 const serf_bucket_type_t *type) 459{ 460 aggregate_context_t *ctx = bucket->data; 461 serf_bucket_t *found_bucket; 462 463 if (!ctx->list) { 464 return NULL; 465 } 466 467 if (ctx->list->bucket->type == type) { 468 /* Got the bucket. Consume it from our list. */ 469 found_bucket = ctx->list->bucket; 470 ctx->list = ctx->list->next; 471 return found_bucket; 472 } 473 474 /* Call read_bucket on first one in our list. */ 475 return serf_bucket_read_bucket(ctx->list->bucket, type); 476} 477 478 479const serf_bucket_type_t serf_bucket_type_aggregate = { 480 "AGGREGATE", 481 serf_aggregate_read, 482 serf_aggregate_readline, 483 serf_aggregate_read_iovec, 484 serf_default_read_for_sendfile, 485 serf_aggregate_read_bucket, 486 serf_aggregate_peek, 487 serf_aggregate_destroy_and_data, 488}; 489