/* aggregate_buckets.c revision 253895 */
198944Sobrien/* Copyright 2002-2004 Justin Erenkrantz and Greg Stein 298944Sobrien * 398944Sobrien * Licensed under the Apache License, Version 2.0 (the "License"); 498944Sobrien * you may not use this file except in compliance with the License. 598944Sobrien * You may obtain a copy of the License at 698944Sobrien * 798944Sobrien * http://www.apache.org/licenses/LICENSE-2.0 898944Sobrien * 998944Sobrien * Unless required by applicable law or agreed to in writing, software 1098944Sobrien * distributed under the License is distributed on an "AS IS" BASIS, 1198944Sobrien * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 1298944Sobrien * See the License for the specific language governing permissions and 1398944Sobrien * limitations under the License. 1498944Sobrien */ 1598944Sobrien 1698944Sobrien#include "serf.h" 1798944Sobrien#include "serf_bucket_util.h" 1898944Sobrien 1998944Sobrien 2098944Sobrien/* Should be an APR_RING? */ 2198944Sobrientypedef struct bucket_list { 2298944Sobrien serf_bucket_t *bucket; 2398944Sobrien struct bucket_list *next; 2498944Sobrien} bucket_list_t; 2598944Sobrien 2698944Sobrientypedef struct { 2798944Sobrien bucket_list_t *list; /* active buckets */ 2898944Sobrien bucket_list_t *last; /* last bucket of the list */ 2998944Sobrien bucket_list_t *done; /* we finished reading this; now pending a destroy */ 3098944Sobrien 3198944Sobrien serf_bucket_aggregate_eof_t hold_open; 3298944Sobrien void *hold_open_baton; 3398944Sobrien 3498944Sobrien /* Does this bucket own its children? !0 if yes, 0 if not. */ 3598944Sobrien int bucket_owner; 3698944Sobrien} aggregate_context_t; 3798944Sobrien 3898944Sobrien 3998944Sobrienstatic void cleanup_aggregate(aggregate_context_t *ctx, 4098944Sobrien serf_bucket_alloc_t *allocator) 4198944Sobrien{ 4298944Sobrien bucket_list_t *next_list; 4398944Sobrien 4498944Sobrien /* If we finished reading a bucket during the previous read, then 4598944Sobrien * we can now toss that bucket. 
4698944Sobrien */ 4798944Sobrien while (ctx->done != NULL) { 4898944Sobrien next_list = ctx->done->next; 4998944Sobrien 5098944Sobrien if (ctx->bucket_owner) { 5198944Sobrien serf_bucket_destroy(ctx->done->bucket); 5298944Sobrien } 5398944Sobrien serf_bucket_mem_free(allocator, ctx->done); 5498944Sobrien 5598944Sobrien ctx->done = next_list; 5698944Sobrien } 5798944Sobrien} 5898944Sobrien 5998944Sobrienvoid serf_bucket_aggregate_cleanup( 6098944Sobrien serf_bucket_t *bucket, serf_bucket_alloc_t *allocator) 6198944Sobrien{ 6298944Sobrien aggregate_context_t *ctx = bucket->data; 6398944Sobrien 6498944Sobrien cleanup_aggregate(ctx, allocator); 6598944Sobrien} 6698944Sobrien 6798944Sobrienstatic aggregate_context_t *create_aggregate(serf_bucket_alloc_t *allocator) 6898944Sobrien{ 6998944Sobrien aggregate_context_t *ctx; 7098944Sobrien 7198944Sobrien ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx)); 7298944Sobrien 7398944Sobrien ctx->list = NULL; 7498944Sobrien ctx->last = NULL; 7598944Sobrien ctx->done = NULL; 7698944Sobrien ctx->hold_open = NULL; 7798944Sobrien ctx->hold_open_baton = NULL; 7898944Sobrien ctx->bucket_owner = 1; 7998944Sobrien 8098944Sobrien return ctx; 8198944Sobrien} 8298944Sobrien 8398944Sobrienserf_bucket_t *serf_bucket_aggregate_create( 8498944Sobrien serf_bucket_alloc_t *allocator) 8598944Sobrien{ 8698944Sobrien aggregate_context_t *ctx; 8798944Sobrien 8898944Sobrien ctx = create_aggregate(allocator); 8998944Sobrien 9098944Sobrien return serf_bucket_create(&serf_bucket_type_aggregate, allocator, ctx); 9198944Sobrien} 9298944Sobrien 9398944Sobrienserf_bucket_t *serf__bucket_stream_create( 9498944Sobrien serf_bucket_alloc_t *allocator, 9598944Sobrien serf_bucket_aggregate_eof_t fn, 9698944Sobrien void *baton) 9798944Sobrien{ 9898944Sobrien serf_bucket_t *bucket = serf_bucket_aggregate_create(allocator); 9998944Sobrien aggregate_context_t *ctx = bucket->data; 10098944Sobrien 10198944Sobrien serf_bucket_aggregate_hold_open(bucket, fn, baton); 
10298944Sobrien 10398944Sobrien ctx->bucket_owner = 0; 10498944Sobrien 10598944Sobrien return bucket; 10698944Sobrien} 10798944Sobrien 10898944Sobrien 10998944Sobrienstatic void serf_aggregate_destroy_and_data(serf_bucket_t *bucket) 11098944Sobrien{ 11198944Sobrien aggregate_context_t *ctx = bucket->data; 11298944Sobrien bucket_list_t *next_ctx; 11398944Sobrien 11498944Sobrien while (ctx->list) { 11598944Sobrien if (ctx->bucket_owner) { 11698944Sobrien serf_bucket_destroy(ctx->list->bucket); 11798944Sobrien } 11898944Sobrien next_ctx = ctx->list->next; 11998944Sobrien serf_bucket_mem_free(bucket->allocator, ctx->list); 12098944Sobrien ctx->list = next_ctx; 12198944Sobrien } 12298944Sobrien cleanup_aggregate(ctx, bucket->allocator); 12398944Sobrien 12498944Sobrien serf_default_destroy_and_data(bucket); 12598944Sobrien} 12698944Sobrien 12798944Sobrienvoid serf_bucket_aggregate_become(serf_bucket_t *bucket) 12898944Sobrien{ 12998944Sobrien aggregate_context_t *ctx; 13098944Sobrien 13198944Sobrien ctx = create_aggregate(bucket->allocator); 13298944Sobrien 13398944Sobrien bucket->type = &serf_bucket_type_aggregate; 13498944Sobrien bucket->data = ctx; 13598944Sobrien 13698944Sobrien /* The allocator remains the same. 
*/ 13798944Sobrien} 13898944Sobrien 13998944Sobrien 14098944Sobrienvoid serf_bucket_aggregate_prepend( 14198944Sobrien serf_bucket_t *aggregate_bucket, 14298944Sobrien serf_bucket_t *prepend_bucket) 14398944Sobrien{ 14498944Sobrien aggregate_context_t *ctx = aggregate_bucket->data; 14598944Sobrien bucket_list_t *new_list; 14698944Sobrien 14798944Sobrien new_list = serf_bucket_mem_alloc(aggregate_bucket->allocator, 14898944Sobrien sizeof(*new_list)); 14998944Sobrien new_list->bucket = prepend_bucket; 15098944Sobrien new_list->next = ctx->list; 15198944Sobrien 15298944Sobrien ctx->list = new_list; 15398944Sobrien} 15498944Sobrien 15598944Sobrienvoid serf_bucket_aggregate_append( 15698944Sobrien serf_bucket_t *aggregate_bucket, 15798944Sobrien serf_bucket_t *append_bucket) 15898944Sobrien{ 15998944Sobrien aggregate_context_t *ctx = aggregate_bucket->data; 16098944Sobrien bucket_list_t *new_list; 16198944Sobrien 16298944Sobrien new_list = serf_bucket_mem_alloc(aggregate_bucket->allocator, 16398944Sobrien sizeof(*new_list)); 16498944Sobrien new_list->bucket = append_bucket; 16598944Sobrien new_list->next = NULL; 16698944Sobrien 16798944Sobrien /* If we use APR_RING, this is trivial. So, wait. 
16898944Sobrien new_list->next = ctx->list; 16998944Sobrien ctx->list = new_list; 17098944Sobrien */ 17198944Sobrien if (ctx->list == NULL) { 17298944Sobrien ctx->list = new_list; 17398944Sobrien ctx->last = new_list; 17498944Sobrien } 17598944Sobrien else { 17698944Sobrien ctx->last->next = new_list; 17798944Sobrien ctx->last = ctx->last->next; 17898944Sobrien } 17998944Sobrien} 18098944Sobrien 18198944Sobrienvoid serf_bucket_aggregate_hold_open(serf_bucket_t *aggregate_bucket, 18298944Sobrien serf_bucket_aggregate_eof_t fn, 18398944Sobrien void *baton) 18498944Sobrien{ 18598944Sobrien aggregate_context_t *ctx = aggregate_bucket->data; 18698944Sobrien ctx->hold_open = fn; 18798944Sobrien ctx->hold_open_baton = baton; 18898944Sobrien} 18998944Sobrien 19098944Sobrienvoid serf_bucket_aggregate_prepend_iovec( 19198944Sobrien serf_bucket_t *aggregate_bucket, 19298944Sobrien struct iovec *vecs, 19398944Sobrien int vecs_count) 19498944Sobrien{ 19598944Sobrien int i; 19698944Sobrien 19798944Sobrien /* Add in reverse order. 
*/ 19898944Sobrien for (i = vecs_count - 1; i >= 0; i--) { 19998944Sobrien serf_bucket_t *new_bucket; 20098944Sobrien 20198944Sobrien new_bucket = serf_bucket_simple_create(vecs[i].iov_base, 20298944Sobrien vecs[i].iov_len, 20398944Sobrien NULL, NULL, 20498944Sobrien aggregate_bucket->allocator); 20598944Sobrien 20698944Sobrien serf_bucket_aggregate_prepend(aggregate_bucket, new_bucket); 20798944Sobrien 20898944Sobrien } 20998944Sobrien} 21098944Sobrien 21198944Sobrienvoid serf_bucket_aggregate_append_iovec( 21298944Sobrien serf_bucket_t *aggregate_bucket, 21398944Sobrien struct iovec *vecs, 21498944Sobrien int vecs_count) 21598944Sobrien{ 21698944Sobrien serf_bucket_t *new_bucket; 21798944Sobrien 21898944Sobrien new_bucket = serf_bucket_iovec_create(vecs, vecs_count, 21998944Sobrien aggregate_bucket->allocator); 22098944Sobrien 22198944Sobrien serf_bucket_aggregate_append(aggregate_bucket, new_bucket); 22298944Sobrien} 22398944Sobrien 22498944Sobrienstatic apr_status_t read_aggregate(serf_bucket_t *bucket, 22598944Sobrien apr_size_t requested, 22698944Sobrien int vecs_size, struct iovec *vecs, 22798944Sobrien int *vecs_used) 22898944Sobrien{ 22998944Sobrien aggregate_context_t *ctx = bucket->data; 23098944Sobrien int cur_vecs_used; 23198944Sobrien apr_status_t status; 23298944Sobrien 23398944Sobrien *vecs_used = 0; 23498944Sobrien 23598944Sobrien if (!ctx->list) { 23698944Sobrien if (ctx->hold_open) { 23798944Sobrien return ctx->hold_open(ctx->hold_open_baton, bucket); 23898944Sobrien } 23998944Sobrien else { 24098944Sobrien return APR_EOF; 24198944Sobrien } 24298944Sobrien } 24398944Sobrien 24498944Sobrien status = APR_SUCCESS; 24598944Sobrien while (requested) { 24698944Sobrien serf_bucket_t *head = ctx->list->bucket; 24798944Sobrien 24898944Sobrien status = serf_bucket_read_iovec(head, requested, vecs_size, vecs, 24998944Sobrien &cur_vecs_used); 25098944Sobrien 25198944Sobrien if (SERF_BUCKET_READ_ERROR(status)) 25298944Sobrien return status; 25398944Sobrien 
25498944Sobrien /* Add the number of vecs we read to our running total. */ 25598944Sobrien *vecs_used += cur_vecs_used; 25698944Sobrien 25798944Sobrien if (cur_vecs_used > 0 || status) { 25898944Sobrien bucket_list_t *next_list; 25998944Sobrien 26098944Sobrien /* If we got SUCCESS (w/bytes) or EAGAIN, we want to return now 26198944Sobrien * as it isn't safe to read more without returning to our caller. 26298944Sobrien */ 26398944Sobrien if (!status || APR_STATUS_IS_EAGAIN(status) || status == SERF_ERROR_WAIT_CONN) { 26498944Sobrien return status; 26598944Sobrien } 26698944Sobrien 26798944Sobrien /* However, if we read EOF, we can stash this bucket in a 26898944Sobrien * to-be-freed list and move on to the next bucket. This ensures 26998944Sobrien * that the bucket stays alive (so as not to violate our read 27098944Sobrien * semantics). We'll destroy this list of buckets the next time 27198944Sobrien * we are asked to perform a read operation - thus ensuring the 27298944Sobrien * proper read lifetime. 27398944Sobrien */ 27498944Sobrien next_list = ctx->list->next; 27598944Sobrien ctx->list->next = ctx->done; 27698944Sobrien ctx->done = ctx->list; 27798944Sobrien ctx->list = next_list; 27898944Sobrien 27998944Sobrien /* If we have no more in our list, return EOF. */ 28098944Sobrien if (!ctx->list) { 28198944Sobrien if (ctx->hold_open) { 28298944Sobrien return ctx->hold_open(ctx->hold_open_baton, bucket); 28398944Sobrien } 28498944Sobrien else { 28598944Sobrien return APR_EOF; 28698944Sobrien } 28798944Sobrien } 28898944Sobrien 28998944Sobrien /* At this point, it safe to read the next bucket - if we can. */ 29098944Sobrien 29198944Sobrien /* If the caller doesn't want ALL_AVAIL, decrement the size 29298944Sobrien * of the items we just read from the list. 
29398944Sobrien */ 29498944Sobrien if (requested != SERF_READ_ALL_AVAIL) { 29598944Sobrien int i; 29698944Sobrien 29798944Sobrien for (i = 0; i < cur_vecs_used; i++) 29898944Sobrien requested -= vecs[i].iov_len; 29998944Sobrien } 30098944Sobrien 30198944Sobrien /* Adjust our vecs to account for what we just read. */ 30298944Sobrien vecs_size -= cur_vecs_used; 30398944Sobrien vecs += cur_vecs_used; 30498944Sobrien 30598944Sobrien /* We reached our max. Oh well. */ 30698944Sobrien if (!requested || !vecs_size) { 30798944Sobrien return APR_SUCCESS; 30898944Sobrien } 30998944Sobrien } 31098944Sobrien } 31198944Sobrien 31298944Sobrien return status; 31398944Sobrien} 31498944Sobrien 31598944Sobrienstatic apr_status_t serf_aggregate_read(serf_bucket_t *bucket, 31698944Sobrien apr_size_t requested, 31798944Sobrien const char **data, apr_size_t *len) 31898944Sobrien{ 31998944Sobrien aggregate_context_t *ctx = bucket->data; 32098944Sobrien struct iovec vec; 32198944Sobrien int vecs_used; 32298944Sobrien apr_status_t status; 32398944Sobrien 324130803Smarcel cleanup_aggregate(ctx, bucket->allocator); 32598944Sobrien 32698944Sobrien status = read_aggregate(bucket, requested, 1, &vec, &vecs_used); 32798944Sobrien 32898944Sobrien if (!vecs_used) { 32998944Sobrien *len = 0; 33098944Sobrien } 33198944Sobrien else { 33298944Sobrien *data = vec.iov_base; 33398944Sobrien *len = vec.iov_len; 33498944Sobrien } 33598944Sobrien 33698944Sobrien return status; 337130803Smarcel} 33898944Sobrien 33998944Sobrienstatic apr_status_t serf_aggregate_read_iovec(serf_bucket_t *bucket, 340130803Smarcel apr_size_t requested, 34198944Sobrien int vecs_size, 34298944Sobrien struct iovec *vecs, 34398944Sobrien int *vecs_used) 34498944Sobrien{ 34598944Sobrien aggregate_context_t *ctx = bucket->data; 34698944Sobrien 34798944Sobrien cleanup_aggregate(ctx, bucket->allocator); 34898944Sobrien 34998944Sobrien return read_aggregate(bucket, requested, vecs_size, vecs, vecs_used); 35098944Sobrien} 35198944Sobrien 
35298944Sobrienstatic apr_status_t serf_aggregate_readline(serf_bucket_t *bucket, 35398944Sobrien int acceptable, int *found, 35498944Sobrien const char **data, apr_size_t *len) 35598944Sobrien{ 35698944Sobrien aggregate_context_t *ctx = bucket->data; 35798944Sobrien apr_status_t status; 35898944Sobrien 35998944Sobrien cleanup_aggregate(ctx, bucket->allocator); 36098944Sobrien 36198944Sobrien do { 362130803Smarcel serf_bucket_t *head; 363130803Smarcel 364130803Smarcel *len = 0; 365130803Smarcel 366130803Smarcel if (!ctx->list) { 367130803Smarcel if (ctx->hold_open) { 368130803Smarcel return ctx->hold_open(ctx->hold_open_baton, bucket); 369130803Smarcel } 37098944Sobrien else { 37198944Sobrien return APR_EOF; 37298944Sobrien } 37398944Sobrien } 37498944Sobrien 37598944Sobrien head = ctx->list->bucket; 37698944Sobrien 37798944Sobrien status = serf_bucket_readline(head, acceptable, found, 37898944Sobrien data, len); 37998944Sobrien if (SERF_BUCKET_READ_ERROR(status)) 38098944Sobrien return status; 38198944Sobrien 38298944Sobrien if (status == APR_EOF) { 38398944Sobrien bucket_list_t *next_list; 38498944Sobrien 38598944Sobrien /* head bucket is empty, move to to-be-cleaned-up list. */ 38698944Sobrien next_list = ctx->list->next; 38798944Sobrien ctx->list->next = ctx->done; 38898944Sobrien ctx->done = ctx->list; 38998944Sobrien ctx->list = next_list; 39098944Sobrien 39198944Sobrien /* If we have no more in our list, return EOF. */ 39298944Sobrien if (!ctx->list) { 39398944Sobrien if (ctx->hold_open) { 39498944Sobrien return ctx->hold_open(ctx->hold_open_baton, bucket); 39598944Sobrien } 39698944Sobrien else { 39798944Sobrien return APR_EOF; 39898944Sobrien } 39998944Sobrien } 40098944Sobrien 40198944Sobrien /* we read something, so bail out and let the appl. read again. */ 40298944Sobrien if (*len) 40398944Sobrien status = APR_SUCCESS; 40498944Sobrien } 40598944Sobrien 40698944Sobrien /* continue with APR_SUCCESS or APR_EOF and no data read yet. 
*/ 40798944Sobrien } while (!*len && status != APR_EAGAIN); 40898944Sobrien 40998944Sobrien return status; 41098944Sobrien} 41198944Sobrien 41298944Sobrienstatic apr_status_t serf_aggregate_peek(serf_bucket_t *bucket, 41398944Sobrien const char **data, 41498944Sobrien apr_size_t *len) 41598944Sobrien{ 41698944Sobrien aggregate_context_t *ctx = bucket->data; 41798944Sobrien serf_bucket_t *head; 41898944Sobrien apr_status_t status; 41998944Sobrien 42098944Sobrien cleanup_aggregate(ctx, bucket->allocator); 42198944Sobrien 42298944Sobrien /* Peek the first bucket in the list, if any. */ 42398944Sobrien if (!ctx->list) { 42498944Sobrien *len = 0; 42598944Sobrien if (ctx->hold_open) { 42698944Sobrien status = ctx->hold_open(ctx->hold_open_baton, bucket); 42798944Sobrien if (status == APR_EAGAIN) 42898944Sobrien status = APR_SUCCESS; 42998944Sobrien return status; 43098944Sobrien } 43198944Sobrien else { 43298944Sobrien return APR_EOF; 43398944Sobrien } 43498944Sobrien } 43598944Sobrien 43698944Sobrien head = ctx->list->bucket; 43798944Sobrien 43898944Sobrien status = serf_bucket_peek(head, data, len); 43998944Sobrien 44098944Sobrien if (status == APR_EOF) { 44198944Sobrien if (ctx->list->next) { 44298944Sobrien status = APR_SUCCESS; 44398944Sobrien } else { 44498944Sobrien if (ctx->hold_open) { 44598944Sobrien status = ctx->hold_open(ctx->hold_open_baton, bucket); 44698944Sobrien if (status == APR_EAGAIN) 44798944Sobrien status = APR_SUCCESS; 44898944Sobrien return status; 44998944Sobrien } 45098944Sobrien } 45198944Sobrien } 45298944Sobrien 45398944Sobrien return status; 45498944Sobrien} 45598944Sobrien 45698944Sobrienstatic serf_bucket_t * serf_aggregate_read_bucket( 45798944Sobrien serf_bucket_t *bucket, 45898944Sobrien const serf_bucket_type_t *type) 45998944Sobrien{ 46098944Sobrien aggregate_context_t *ctx = bucket->data; 46198944Sobrien serf_bucket_t *found_bucket; 46298944Sobrien 46398944Sobrien if (!ctx->list) { 46498944Sobrien return NULL; 46598944Sobrien } 
46698944Sobrien 46798944Sobrien if (ctx->list->bucket->type == type) { 46898944Sobrien /* Got the bucket. Consume it from our list. */ 46998944Sobrien found_bucket = ctx->list->bucket; 47098944Sobrien ctx->list = ctx->list->next; 47198944Sobrien return found_bucket; 47298944Sobrien } 47398944Sobrien 47498944Sobrien /* Call read_bucket on first one in our list. */ 47598944Sobrien return serf_bucket_read_bucket(ctx->list->bucket, type); 47698944Sobrien} 47798944Sobrien 47898944Sobrien 47998944Sobrienconst serf_bucket_type_t serf_bucket_type_aggregate = { 48098944Sobrien "AGGREGATE", 48198944Sobrien serf_aggregate_read, 48298944Sobrien serf_aggregate_readline, 48398944Sobrien serf_aggregate_read_iovec, 48498944Sobrien serf_default_read_for_sendfile, 48598944Sobrien serf_aggregate_read_bucket, 48698944Sobrien serf_aggregate_peek, 48798944Sobrien serf_aggregate_destroy_and_data, 48898944Sobrien}; 48998944Sobrien