/* aggregate_buckets.c revision 251877 */
1116743Ssam/* Copyright 2002-2004 Justin Erenkrantz and Greg Stein 2127780Ssam * 3116743Ssam * Licensed under the Apache License, Version 2.0 (the "License"); 4116743Ssam * you may not use this file except in compliance with the License. 5116743Ssam * You may obtain a copy of the License at 6116743Ssam * 7116743Ssam * http://www.apache.org/licenses/LICENSE-2.0 8116743Ssam * 9116743Ssam * Unless required by applicable law or agreed to in writing, software 10116743Ssam * distributed under the License is distributed on an "AS IS" BASIS, 11116743Ssam * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12116743Ssam * See the License for the specific language governing permissions and 13116743Ssam * limitations under the License. 14116743Ssam */ 15116743Ssam 16116743Ssam#include "serf.h" 17116743Ssam#include "serf_bucket_util.h" 18116743Ssam 19116743Ssam 20116743Ssam/* Should be an APR_RING? */ 21116743Ssamtypedef struct bucket_list { 22116743Ssam serf_bucket_t *bucket; 23116743Ssam struct bucket_list *next; 24116743Ssam} bucket_list_t; 25116743Ssam 26116743Ssamtypedef struct { 27116743Ssam bucket_list_t *list; /* active buckets */ 28116743Ssam bucket_list_t *last; /* last bucket of the list */ 29116743Ssam bucket_list_t *done; /* we finished reading this; now pending a destroy */ 30116743Ssam 31116743Ssam serf_bucket_aggregate_eof_t hold_open; 32116743Ssam void *hold_open_baton; 33116743Ssam 34116743Ssam /* Does this bucket own its children? !0 if yes, 0 if not. */ 35116743Ssam int bucket_owner; 36116743Ssam} aggregate_context_t; 37116743Ssam 38116743Ssam 39116743Ssamstatic void cleanup_aggregate(aggregate_context_t *ctx, 40116743Ssam serf_bucket_alloc_t *allocator) 41116743Ssam{ 42116743Ssam bucket_list_t *next_list; 43116743Ssam 44116743Ssam /* If we finished reading a bucket during the previous read, then 45116743Ssam * we can now toss that bucket. 
46116743Ssam */ 47116743Ssam while (ctx->done != NULL) { 48119783Ssam next_list = ctx->done->next; 49116743Ssam 50138570Ssam if (ctx->bucket_owner) { 51116743Ssam serf_bucket_destroy(ctx->done->bucket); 52116743Ssam } 53116743Ssam serf_bucket_mem_free(allocator, ctx->done); 54116743Ssam 55116743Ssam ctx->done = next_list; 56116743Ssam } 57138570Ssam} 58138570Ssam 59116743Ssamvoid serf_bucket_aggregate_cleanup( 60138570Ssam serf_bucket_t *bucket, serf_bucket_alloc_t *allocator) 61116743Ssam{ 62119150Ssam aggregate_context_t *ctx = bucket->data; 63138570Ssam 64138570Ssam cleanup_aggregate(ctx, allocator); 65138570Ssam} 66138570Ssam 67138570Ssamstatic aggregate_context_t *create_aggregate(serf_bucket_alloc_t *allocator) 68116743Ssam{ 69138570Ssam aggregate_context_t *ctx; 70138570Ssam 71116743Ssam ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx)); 72138570Ssam 73138570Ssam ctx->list = NULL; 74138570Ssam ctx->last = NULL; 75138570Ssam ctx->done = NULL; 76138570Ssam ctx->hold_open = NULL; 77138570Ssam ctx->hold_open_baton = NULL; 78138570Ssam ctx->bucket_owner = 1; 79138570Ssam 80138570Ssam return ctx; 81138570Ssam} 82138570Ssam 83116743Ssamserf_bucket_t *serf_bucket_aggregate_create( 84138570Ssam serf_bucket_alloc_t *allocator) 85116743Ssam{ 86116743Ssam aggregate_context_t *ctx; 87116743Ssam 88138570Ssam ctx = create_aggregate(allocator); 89116743Ssam 90116743Ssam return serf_bucket_create(&serf_bucket_type_aggregate, allocator, ctx); 91116743Ssam} 92116743Ssam 93116743Ssamserf_bucket_t *serf__bucket_stream_create( 94116743Ssam serf_bucket_alloc_t *allocator, 95138570Ssam serf_bucket_aggregate_eof_t fn, 96116743Ssam void *baton) 97138570Ssam{ 98138570Ssam serf_bucket_t *bucket = serf_bucket_aggregate_create(allocator); 99138570Ssam aggregate_context_t *ctx = bucket->data; 100138570Ssam 101138570Ssam serf_bucket_aggregate_hold_open(bucket, fn, baton); 102138570Ssam 103138570Ssam ctx->bucket_owner = 0; 104138570Ssam 105138570Ssam return bucket; 106138570Ssam} 
107138570Ssam 108138570Ssam 109138570Ssamstatic void serf_aggregate_destroy_and_data(serf_bucket_t *bucket) 110138570Ssam{ 111138570Ssam aggregate_context_t *ctx = bucket->data; 112138570Ssam bucket_list_t *next_ctx; 113138570Ssam 114138570Ssam while (ctx->list) { 115138570Ssam if (ctx->bucket_owner) { 116138570Ssam serf_bucket_destroy(ctx->list->bucket); 117138570Ssam } 118138570Ssam next_ctx = ctx->list->next; 119138570Ssam serf_bucket_mem_free(bucket->allocator, ctx->list); 120138570Ssam ctx->list = next_ctx; 121138570Ssam } 122138570Ssam cleanup_aggregate(ctx, bucket->allocator); 123138570Ssam 124138570Ssam serf_default_destroy_and_data(bucket); 125138570Ssam} 126138570Ssam 127138570Ssamvoid serf_bucket_aggregate_become(serf_bucket_t *bucket) 128138570Ssam{ 129138570Ssam aggregate_context_t *ctx; 130138570Ssam 131138570Ssam ctx = create_aggregate(bucket->allocator); 132138570Ssam 133138570Ssam bucket->type = &serf_bucket_type_aggregate; 134138570Ssam bucket->data = ctx; 135138570Ssam 136138570Ssam /* The allocator remains the same. 
*/ 137138570Ssam} 138138570Ssam 139138570Ssam 140138570Ssamvoid serf_bucket_aggregate_prepend( 141138570Ssam serf_bucket_t *aggregate_bucket, 142138570Ssam serf_bucket_t *prepend_bucket) 143138570Ssam{ 144138570Ssam aggregate_context_t *ctx = aggregate_bucket->data; 145138570Ssam bucket_list_t *new_list; 146116743Ssam 147138570Ssam new_list = serf_bucket_mem_alloc(aggregate_bucket->allocator, 148138570Ssam sizeof(*new_list)); 149116743Ssam new_list->bucket = prepend_bucket; 150138570Ssam new_list->next = ctx->list; 151138570Ssam 152138570Ssam ctx->list = new_list; 153138570Ssam} 154138570Ssam 155138570Ssamvoid serf_bucket_aggregate_append( 156138570Ssam serf_bucket_t *aggregate_bucket, 157117812Ssam serf_bucket_t *append_bucket) 158117812Ssam{ 159138570Ssam aggregate_context_t *ctx = aggregate_bucket->data; 160116743Ssam bucket_list_t *new_list; 161116743Ssam 162116743Ssam new_list = serf_bucket_mem_alloc(aggregate_bucket->allocator, 163116743Ssam sizeof(*new_list)); 164116743Ssam new_list->bucket = append_bucket; 165116743Ssam new_list->next = NULL; 166138570Ssam 167138570Ssam /* If we use APR_RING, this is trivial. So, wait. 
168116743Ssam new_list->next = ctx->list; 169138570Ssam ctx->list = new_list; 170138570Ssam */ 171138570Ssam if (ctx->list == NULL) { 172138570Ssam ctx->list = new_list; 173138570Ssam ctx->last = new_list; 174138570Ssam } 175138570Ssam else { 176138570Ssam ctx->last->next = new_list; 177116743Ssam ctx->last = ctx->last->next; 178116743Ssam } 179116743Ssam} 180116743Ssam 181138570Ssamvoid serf_bucket_aggregate_hold_open(serf_bucket_t *aggregate_bucket, 182138570Ssam serf_bucket_aggregate_eof_t fn, 183116743Ssam void *baton) 184119144Ssam{ 185139500Ssam aggregate_context_t *ctx = aggregate_bucket->data; 186138570Ssam ctx->hold_open = fn; 187138570Ssam ctx->hold_open_baton = baton; 188116743Ssam} 189138570Ssam 190138570Ssamvoid serf_bucket_aggregate_prepend_iovec( 191116743Ssam serf_bucket_t *aggregate_bucket, 192138570Ssam struct iovec *vecs, 193138570Ssam int vecs_count) 194138570Ssam{ 195138570Ssam int i; 196119783Ssam 197119783Ssam /* Add in reverse order. */ 198119783Ssam for (i = vecs_count - 1; i >= 0; i--) { 199119783Ssam serf_bucket_t *new_bucket; 200119783Ssam 201127698Ssam new_bucket = serf_bucket_simple_create(vecs[i].iov_base, 202119783Ssam vecs[i].iov_len, 203139500Ssam NULL, NULL, 204139500Ssam aggregate_bucket->allocator); 205139500Ssam 206139500Ssam serf_bucket_aggregate_prepend(aggregate_bucket, new_bucket); 207119783Ssam 208119783Ssam } 209139500Ssam} 210119783Ssam 211116743Ssamvoid serf_bucket_aggregate_append_iovec( 212116743Ssam serf_bucket_t *aggregate_bucket, 213138570Ssam struct iovec *vecs, 214138570Ssam int vecs_count) 215116743Ssam{ 216116743Ssam serf_bucket_t *new_bucket; 217138570Ssam 218138570Ssam new_bucket = serf_bucket_iovec_create(vecs, vecs_count, 219138570Ssam aggregate_bucket->allocator); 220116743Ssam 221138570Ssam serf_bucket_aggregate_append(aggregate_bucket, new_bucket); 222138570Ssam} 223138570Ssam 224116743Ssamstatic apr_status_t read_aggregate(serf_bucket_t *bucket, 225138570Ssam apr_size_t requested, 226138570Ssam int 
vecs_size, struct iovec *vecs, 227138570Ssam int *vecs_used) 228138570Ssam{ 229116743Ssam aggregate_context_t *ctx = bucket->data; 230116743Ssam int cur_vecs_used; 231138570Ssam apr_status_t status; 232138570Ssam 233116743Ssam *vecs_used = 0; 234138570Ssam 235138570Ssam if (!ctx->list) { 236138570Ssam if (ctx->hold_open) { 237138570Ssam return ctx->hold_open(ctx->hold_open_baton, bucket); 238116743Ssam } 239138570Ssam else { 240138570Ssam return APR_EOF; 241138570Ssam } 242138570Ssam } 243138570Ssam 244138570Ssam status = APR_SUCCESS; 245116743Ssam while (requested) { 246116743Ssam serf_bucket_t *head = ctx->list->bucket; 247116743Ssam 248116743Ssam status = serf_bucket_read_iovec(head, requested, vecs_size, vecs, 249138570Ssam &cur_vecs_used); 250119783Ssam 251139500Ssam if (SERF_BUCKET_READ_ERROR(status)) 252139500Ssam return status; 253139500Ssam 254116743Ssam /* Add the number of vecs we read to our running total. */ 255121100Ssam *vecs_used += cur_vecs_used; 256121100Ssam 257121100Ssam if (cur_vecs_used > 0 || status) { 258121100Ssam bucket_list_t *next_list; 259121100Ssam 260121100Ssam /* If we got SUCCESS (w/bytes) or EAGAIN, we want to return now 261121100Ssam * as it isn't safe to read more without returning to our caller. 262121100Ssam */ 263138570Ssam if (!status || APR_STATUS_IS_EAGAIN(status) || status == SERF_ERROR_WAIT_CONN) { 264138570Ssam return status; 265121100Ssam } 266121100Ssam 267121100Ssam /* However, if we read EOF, we can stash this bucket in a 268121100Ssam * to-be-freed list and move on to the next bucket. This ensures 269121100Ssam * that the bucket stays alive (so as not to violate our read 270121100Ssam * semantics). We'll destroy this list of buckets the next time 271121100Ssam * we are asked to perform a read operation - thus ensuring the 272121100Ssam * proper read lifetime. 
273121100Ssam */ 274116743Ssam next_list = ctx->list->next; 275116743Ssam ctx->list->next = ctx->done; 276116743Ssam ctx->done = ctx->list; 277116743Ssam ctx->list = next_list; 278116743Ssam 279116743Ssam /* If we have no more in our list, return EOF. */ 280116743Ssam if (!ctx->list) { 281116743Ssam if (ctx->hold_open) { 282116743Ssam return ctx->hold_open(ctx->hold_open_baton, bucket); 283116743Ssam } 284138570Ssam else { 285138570Ssam return APR_EOF; 286116743Ssam } 287116743Ssam } 288116743Ssam 289116743Ssam /* At this point, it safe to read the next bucket - if we can. */ 290116743Ssam 291116743Ssam /* If the caller doesn't want ALL_AVAIL, decrement the size 292138570Ssam * of the items we just read from the list. 293138570Ssam */ 294116743Ssam if (requested != SERF_READ_ALL_AVAIL) { 295116743Ssam int i; 296116743Ssam 297116743Ssam for (i = 0; i < cur_vecs_used; i++) 298116743Ssam requested -= vecs[i].iov_len; 299116743Ssam } 300116743Ssam 301116743Ssam /* Adjust our vecs to account for what we just read. */ 302116743Ssam vecs_size -= cur_vecs_used; 303116743Ssam vecs += cur_vecs_used; 304116743Ssam 305116743Ssam /* We reached our max. Oh well. 
*/ 306138570Ssam if (!requested || !vecs_size) { 307138570Ssam return APR_SUCCESS; 308116743Ssam } 309116743Ssam } 310138570Ssam } 311138570Ssam 312116743Ssam return status; 313116743Ssam} 314116743Ssam 315116743Ssamstatic apr_status_t serf_aggregate_read(serf_bucket_t *bucket, 316116743Ssam apr_size_t requested, 317116743Ssam const char **data, apr_size_t *len) 318116743Ssam{ 319116743Ssam aggregate_context_t *ctx = bucket->data; 320116743Ssam struct iovec vec; 321116743Ssam int vecs_used; 322116743Ssam apr_status_t status; 323116743Ssam 324116743Ssam cleanup_aggregate(ctx, bucket->allocator); 325116743Ssam 326116743Ssam status = read_aggregate(bucket, requested, 1, &vec, &vecs_used); 327116743Ssam 328116743Ssam if (!vecs_used) { 329116743Ssam *len = 0; 330116743Ssam } 331116743Ssam else { 332116743Ssam *data = vec.iov_base; 333116743Ssam *len = vec.iov_len; 334116743Ssam } 335116743Ssam 336116743Ssam return status; 337116743Ssam} 338138570Ssam 339138570Ssamstatic apr_status_t serf_aggregate_read_iovec(serf_bucket_t *bucket, 340116743Ssam apr_size_t requested, 341116743Ssam int vecs_size, 342116743Ssam struct iovec *vecs, 343116743Ssam int *vecs_used) 344116743Ssam{ 345116743Ssam aggregate_context_t *ctx = bucket->data; 346116743Ssam 347116743Ssam cleanup_aggregate(ctx, bucket->allocator); 348116743Ssam 349116743Ssam return read_aggregate(bucket, requested, vecs_size, vecs, vecs_used); 350138570Ssam} 351138570Ssam 352116743Ssamstatic apr_status_t serf_aggregate_readline(serf_bucket_t *bucket, 353116743Ssam int acceptable, int *found, 354138570Ssam const char **data, apr_size_t *len) 355138570Ssam{ 356116743Ssam /* Follow pattern from serf_aggregate_read. */ 357138570Ssam return APR_ENOTIMPL; 358138570Ssam} 359138570Ssam 360138570Ssamstatic apr_status_t serf_aggregate_peek(serf_bucket_t *bucket, 361138570Ssam const char **data, 362116743Ssam apr_size_t *len) 363116743Ssam{ 364116743Ssam /* Follow pattern from serf_aggregate_read. 
*/ 365116743Ssam return APR_ENOTIMPL; 366116743Ssam} 367116743Ssam 368116743Ssamstatic serf_bucket_t * serf_aggregate_read_bucket( 369116743Ssam serf_bucket_t *bucket, 370138570Ssam const serf_bucket_type_t *type) 371138570Ssam{ 372138570Ssam aggregate_context_t *ctx = bucket->data; 373116743Ssam serf_bucket_t *found_bucket; 374116743Ssam 375116743Ssam if (!ctx->list) { 376116743Ssam return NULL; 377116743Ssam } 378116743Ssam 379138570Ssam if (ctx->list->bucket->type == type) { 380138570Ssam /* Got the bucket. Consume it from our list. */ 381138570Ssam found_bucket = ctx->list->bucket; 382138570Ssam ctx->list = ctx->list->next; 383116743Ssam return found_bucket; 384116743Ssam } 385138570Ssam 386138570Ssam /* Call read_bucket on first one in our list. */ 387138570Ssam return serf_bucket_read_bucket(ctx->list->bucket, type); 388138570Ssam} 389138570Ssam 390138570Ssam 391138570Ssamconst serf_bucket_type_t serf_bucket_type_aggregate = { 392138570Ssam "AGGREGATE", 393138570Ssam serf_aggregate_read, 394138570Ssam serf_aggregate_readline, 395138570Ssam serf_aggregate_read_iovec, 396138570Ssam serf_default_read_for_sendfile, 397138570Ssam serf_aggregate_read_bucket, 398138570Ssam serf_aggregate_peek, 399138570Ssam serf_aggregate_destroy_and_data, 400138570Ssam}; 401138570Ssam