aggregate_buckets.c revision 253895
198944Sobrien/* Copyright 2002-2004 Justin Erenkrantz and Greg Stein
298944Sobrien *
398944Sobrien * Licensed under the Apache License, Version 2.0 (the "License");
498944Sobrien * you may not use this file except in compliance with the License.
598944Sobrien * You may obtain a copy of the License at
698944Sobrien *
798944Sobrien *     http://www.apache.org/licenses/LICENSE-2.0
898944Sobrien *
998944Sobrien * Unless required by applicable law or agreed to in writing, software
1098944Sobrien * distributed under the License is distributed on an "AS IS" BASIS,
1198944Sobrien * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1298944Sobrien * See the License for the specific language governing permissions and
1398944Sobrien * limitations under the License.
1498944Sobrien */
1598944Sobrien
1698944Sobrien#include "serf.h"
1798944Sobrien#include "serf_bucket_util.h"
1898944Sobrien
1998944Sobrien
2098944Sobrien/* Should be an APR_RING? */
2198944Sobrientypedef struct bucket_list {
2298944Sobrien    serf_bucket_t *bucket;
2398944Sobrien    struct bucket_list *next;
2498944Sobrien} bucket_list_t;
2598944Sobrien
2698944Sobrientypedef struct {
2798944Sobrien    bucket_list_t *list; /* active buckets */
2898944Sobrien    bucket_list_t *last; /* last bucket of the list */
2998944Sobrien    bucket_list_t *done; /* we finished reading this; now pending a destroy */
3098944Sobrien
3198944Sobrien    serf_bucket_aggregate_eof_t hold_open;
3298944Sobrien    void *hold_open_baton;
3398944Sobrien
3498944Sobrien    /* Does this bucket own its children? !0 if yes, 0 if not. */
3598944Sobrien    int bucket_owner;
3698944Sobrien} aggregate_context_t;
3798944Sobrien
3898944Sobrien
3998944Sobrienstatic void cleanup_aggregate(aggregate_context_t *ctx,
4098944Sobrien                              serf_bucket_alloc_t *allocator)
4198944Sobrien{
4298944Sobrien    bucket_list_t *next_list;
4398944Sobrien
4498944Sobrien    /* If we finished reading a bucket during the previous read, then
4598944Sobrien     * we can now toss that bucket.
4698944Sobrien     */
4798944Sobrien    while (ctx->done != NULL) {
4898944Sobrien        next_list = ctx->done->next;
4998944Sobrien
5098944Sobrien        if (ctx->bucket_owner) {
5198944Sobrien            serf_bucket_destroy(ctx->done->bucket);
5298944Sobrien        }
5398944Sobrien        serf_bucket_mem_free(allocator, ctx->done);
5498944Sobrien
5598944Sobrien        ctx->done = next_list;
5698944Sobrien    }
5798944Sobrien}
5898944Sobrien
5998944Sobrienvoid serf_bucket_aggregate_cleanup(
6098944Sobrien    serf_bucket_t *bucket, serf_bucket_alloc_t *allocator)
6198944Sobrien{
6298944Sobrien    aggregate_context_t *ctx = bucket->data;
6398944Sobrien
6498944Sobrien    cleanup_aggregate(ctx, allocator);
6598944Sobrien}
6698944Sobrien
6798944Sobrienstatic aggregate_context_t *create_aggregate(serf_bucket_alloc_t *allocator)
6898944Sobrien{
6998944Sobrien    aggregate_context_t *ctx;
7098944Sobrien
7198944Sobrien    ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
7298944Sobrien
7398944Sobrien    ctx->list = NULL;
7498944Sobrien    ctx->last = NULL;
7598944Sobrien    ctx->done = NULL;
7698944Sobrien    ctx->hold_open = NULL;
7798944Sobrien    ctx->hold_open_baton = NULL;
7898944Sobrien    ctx->bucket_owner = 1;
7998944Sobrien
8098944Sobrien    return ctx;
8198944Sobrien}
8298944Sobrien
8398944Sobrienserf_bucket_t *serf_bucket_aggregate_create(
8498944Sobrien    serf_bucket_alloc_t *allocator)
8598944Sobrien{
8698944Sobrien    aggregate_context_t *ctx;
8798944Sobrien
8898944Sobrien    ctx = create_aggregate(allocator);
8998944Sobrien
9098944Sobrien    return serf_bucket_create(&serf_bucket_type_aggregate, allocator, ctx);
9198944Sobrien}
9298944Sobrien
9398944Sobrienserf_bucket_t *serf__bucket_stream_create(
9498944Sobrien    serf_bucket_alloc_t *allocator,
9598944Sobrien    serf_bucket_aggregate_eof_t fn,
9698944Sobrien    void *baton)
9798944Sobrien{
9898944Sobrien    serf_bucket_t *bucket = serf_bucket_aggregate_create(allocator);
9998944Sobrien    aggregate_context_t *ctx = bucket->data;
10098944Sobrien
10198944Sobrien    serf_bucket_aggregate_hold_open(bucket, fn, baton);
10298944Sobrien
10398944Sobrien    ctx->bucket_owner = 0;
10498944Sobrien
10598944Sobrien    return bucket;
10698944Sobrien}
10798944Sobrien
10898944Sobrien
10998944Sobrienstatic void serf_aggregate_destroy_and_data(serf_bucket_t *bucket)
11098944Sobrien{
11198944Sobrien    aggregate_context_t *ctx = bucket->data;
11298944Sobrien    bucket_list_t *next_ctx;
11398944Sobrien
11498944Sobrien    while (ctx->list) {
11598944Sobrien        if (ctx->bucket_owner) {
11698944Sobrien            serf_bucket_destroy(ctx->list->bucket);
11798944Sobrien        }
11898944Sobrien        next_ctx = ctx->list->next;
11998944Sobrien        serf_bucket_mem_free(bucket->allocator, ctx->list);
12098944Sobrien        ctx->list = next_ctx;
12198944Sobrien    }
12298944Sobrien    cleanup_aggregate(ctx, bucket->allocator);
12398944Sobrien
12498944Sobrien    serf_default_destroy_and_data(bucket);
12598944Sobrien}
12698944Sobrien
12798944Sobrienvoid serf_bucket_aggregate_become(serf_bucket_t *bucket)
12898944Sobrien{
12998944Sobrien    aggregate_context_t *ctx;
13098944Sobrien
13198944Sobrien    ctx = create_aggregate(bucket->allocator);
13298944Sobrien
13398944Sobrien    bucket->type = &serf_bucket_type_aggregate;
13498944Sobrien    bucket->data = ctx;
13598944Sobrien
13698944Sobrien    /* The allocator remains the same. */
13798944Sobrien}
13898944Sobrien
13998944Sobrien
14098944Sobrienvoid serf_bucket_aggregate_prepend(
14198944Sobrien    serf_bucket_t *aggregate_bucket,
14298944Sobrien    serf_bucket_t *prepend_bucket)
14398944Sobrien{
14498944Sobrien    aggregate_context_t *ctx = aggregate_bucket->data;
14598944Sobrien    bucket_list_t *new_list;
14698944Sobrien
14798944Sobrien    new_list = serf_bucket_mem_alloc(aggregate_bucket->allocator,
14898944Sobrien                                     sizeof(*new_list));
14998944Sobrien    new_list->bucket = prepend_bucket;
15098944Sobrien    new_list->next = ctx->list;
15198944Sobrien
15298944Sobrien    ctx->list = new_list;
15398944Sobrien}
15498944Sobrien
15598944Sobrienvoid serf_bucket_aggregate_append(
15698944Sobrien    serf_bucket_t *aggregate_bucket,
15798944Sobrien    serf_bucket_t *append_bucket)
15898944Sobrien{
15998944Sobrien    aggregate_context_t *ctx = aggregate_bucket->data;
16098944Sobrien    bucket_list_t *new_list;
16198944Sobrien
16298944Sobrien    new_list = serf_bucket_mem_alloc(aggregate_bucket->allocator,
16398944Sobrien                                     sizeof(*new_list));
16498944Sobrien    new_list->bucket = append_bucket;
16598944Sobrien    new_list->next = NULL;
16698944Sobrien
16798944Sobrien    /* If we use APR_RING, this is trivial.  So, wait.
16898944Sobrien    new_list->next = ctx->list;
16998944Sobrien    ctx->list = new_list;
17098944Sobrien    */
17198944Sobrien    if (ctx->list == NULL) {
17298944Sobrien        ctx->list = new_list;
17398944Sobrien        ctx->last = new_list;
17498944Sobrien    }
17598944Sobrien    else {
17698944Sobrien        ctx->last->next = new_list;
17798944Sobrien        ctx->last = ctx->last->next;
17898944Sobrien    }
17998944Sobrien}
18098944Sobrien
18198944Sobrienvoid serf_bucket_aggregate_hold_open(serf_bucket_t *aggregate_bucket,
18298944Sobrien                                     serf_bucket_aggregate_eof_t fn,
18398944Sobrien                                     void *baton)
18498944Sobrien{
18598944Sobrien    aggregate_context_t *ctx = aggregate_bucket->data;
18698944Sobrien    ctx->hold_open = fn;
18798944Sobrien    ctx->hold_open_baton = baton;
18898944Sobrien}
18998944Sobrien
19098944Sobrienvoid serf_bucket_aggregate_prepend_iovec(
19198944Sobrien    serf_bucket_t *aggregate_bucket,
19298944Sobrien    struct iovec *vecs,
19398944Sobrien    int vecs_count)
19498944Sobrien{
19598944Sobrien    int i;
19698944Sobrien
19798944Sobrien    /* Add in reverse order. */
19898944Sobrien    for (i = vecs_count - 1; i >= 0; i--) {
19998944Sobrien        serf_bucket_t *new_bucket;
20098944Sobrien
20198944Sobrien        new_bucket = serf_bucket_simple_create(vecs[i].iov_base,
20298944Sobrien                                               vecs[i].iov_len,
20398944Sobrien                                               NULL, NULL,
20498944Sobrien                                               aggregate_bucket->allocator);
20598944Sobrien
20698944Sobrien        serf_bucket_aggregate_prepend(aggregate_bucket, new_bucket);
20798944Sobrien
20898944Sobrien    }
20998944Sobrien}
21098944Sobrien
21198944Sobrienvoid serf_bucket_aggregate_append_iovec(
21298944Sobrien    serf_bucket_t *aggregate_bucket,
21398944Sobrien    struct iovec *vecs,
21498944Sobrien    int vecs_count)
21598944Sobrien{
21698944Sobrien    serf_bucket_t *new_bucket;
21798944Sobrien
21898944Sobrien    new_bucket = serf_bucket_iovec_create(vecs, vecs_count,
21998944Sobrien                                          aggregate_bucket->allocator);
22098944Sobrien
22198944Sobrien    serf_bucket_aggregate_append(aggregate_bucket, new_bucket);
22298944Sobrien}
22398944Sobrien
22498944Sobrienstatic apr_status_t read_aggregate(serf_bucket_t *bucket,
22598944Sobrien                                   apr_size_t requested,
22698944Sobrien                                   int vecs_size, struct iovec *vecs,
22798944Sobrien                                   int *vecs_used)
22898944Sobrien{
22998944Sobrien    aggregate_context_t *ctx = bucket->data;
23098944Sobrien    int cur_vecs_used;
23198944Sobrien    apr_status_t status;
23298944Sobrien
23398944Sobrien    *vecs_used = 0;
23498944Sobrien
23598944Sobrien    if (!ctx->list) {
23698944Sobrien        if (ctx->hold_open) {
23798944Sobrien            return ctx->hold_open(ctx->hold_open_baton, bucket);
23898944Sobrien        }
23998944Sobrien        else {
24098944Sobrien            return APR_EOF;
24198944Sobrien        }
24298944Sobrien    }
24398944Sobrien
24498944Sobrien    status = APR_SUCCESS;
24598944Sobrien    while (requested) {
24698944Sobrien        serf_bucket_t *head = ctx->list->bucket;
24798944Sobrien
24898944Sobrien        status = serf_bucket_read_iovec(head, requested, vecs_size, vecs,
24998944Sobrien                                        &cur_vecs_used);
25098944Sobrien
25198944Sobrien        if (SERF_BUCKET_READ_ERROR(status))
25298944Sobrien            return status;
25398944Sobrien
25498944Sobrien        /* Add the number of vecs we read to our running total. */
25598944Sobrien        *vecs_used += cur_vecs_used;
25698944Sobrien
25798944Sobrien        if (cur_vecs_used > 0 || status) {
25898944Sobrien            bucket_list_t *next_list;
25998944Sobrien
26098944Sobrien            /* If we got SUCCESS (w/bytes) or EAGAIN, we want to return now
26198944Sobrien             * as it isn't safe to read more without returning to our caller.
26298944Sobrien             */
26398944Sobrien            if (!status || APR_STATUS_IS_EAGAIN(status) || status == SERF_ERROR_WAIT_CONN) {
26498944Sobrien                return status;
26598944Sobrien            }
26698944Sobrien
26798944Sobrien            /* However, if we read EOF, we can stash this bucket in a
26898944Sobrien             * to-be-freed list and move on to the next bucket.  This ensures
26998944Sobrien             * that the bucket stays alive (so as not to violate our read
27098944Sobrien             * semantics).  We'll destroy this list of buckets the next time
27198944Sobrien             * we are asked to perform a read operation - thus ensuring the
27298944Sobrien             * proper read lifetime.
27398944Sobrien             */
27498944Sobrien            next_list = ctx->list->next;
27598944Sobrien            ctx->list->next = ctx->done;
27698944Sobrien            ctx->done = ctx->list;
27798944Sobrien            ctx->list = next_list;
27898944Sobrien
27998944Sobrien            /* If we have no more in our list, return EOF. */
28098944Sobrien            if (!ctx->list) {
28198944Sobrien                if (ctx->hold_open) {
28298944Sobrien                    return ctx->hold_open(ctx->hold_open_baton, bucket);
28398944Sobrien                }
28498944Sobrien                else {
28598944Sobrien                    return APR_EOF;
28698944Sobrien                }
28798944Sobrien            }
28898944Sobrien
28998944Sobrien            /* At this point, it safe to read the next bucket - if we can. */
29098944Sobrien
29198944Sobrien            /* If the caller doesn't want ALL_AVAIL, decrement the size
29298944Sobrien             * of the items we just read from the list.
29398944Sobrien             */
29498944Sobrien            if (requested != SERF_READ_ALL_AVAIL) {
29598944Sobrien                int i;
29698944Sobrien
29798944Sobrien                for (i = 0; i < cur_vecs_used; i++)
29898944Sobrien                    requested -= vecs[i].iov_len;
29998944Sobrien            }
30098944Sobrien
30198944Sobrien            /* Adjust our vecs to account for what we just read. */
30298944Sobrien            vecs_size -= cur_vecs_used;
30398944Sobrien            vecs += cur_vecs_used;
30498944Sobrien
30598944Sobrien            /* We reached our max.  Oh well. */
30698944Sobrien            if (!requested || !vecs_size) {
30798944Sobrien                return APR_SUCCESS;
30898944Sobrien            }
30998944Sobrien        }
31098944Sobrien    }
31198944Sobrien
31298944Sobrien    return status;
31398944Sobrien}
31498944Sobrien
31598944Sobrienstatic apr_status_t serf_aggregate_read(serf_bucket_t *bucket,
31698944Sobrien                                        apr_size_t requested,
31798944Sobrien                                        const char **data, apr_size_t *len)
31898944Sobrien{
31998944Sobrien    aggregate_context_t *ctx = bucket->data;
32098944Sobrien    struct iovec vec;
32198944Sobrien    int vecs_used;
32298944Sobrien    apr_status_t status;
32398944Sobrien
324130803Smarcel    cleanup_aggregate(ctx, bucket->allocator);
32598944Sobrien
32698944Sobrien    status = read_aggregate(bucket, requested, 1, &vec, &vecs_used);
32798944Sobrien
32898944Sobrien    if (!vecs_used) {
32998944Sobrien        *len = 0;
33098944Sobrien    }
33198944Sobrien    else {
33298944Sobrien        *data = vec.iov_base;
33398944Sobrien        *len = vec.iov_len;
33498944Sobrien    }
33598944Sobrien
33698944Sobrien    return status;
337130803Smarcel}
33898944Sobrien
33998944Sobrienstatic apr_status_t serf_aggregate_read_iovec(serf_bucket_t *bucket,
340130803Smarcel                                              apr_size_t requested,
34198944Sobrien                                              int vecs_size,
34298944Sobrien                                              struct iovec *vecs,
34398944Sobrien                                              int *vecs_used)
34498944Sobrien{
34598944Sobrien    aggregate_context_t *ctx = bucket->data;
34698944Sobrien
34798944Sobrien    cleanup_aggregate(ctx, bucket->allocator);
34898944Sobrien
34998944Sobrien    return read_aggregate(bucket, requested, vecs_size, vecs, vecs_used);
35098944Sobrien}
35198944Sobrien
35298944Sobrienstatic apr_status_t serf_aggregate_readline(serf_bucket_t *bucket,
35398944Sobrien                                            int acceptable, int *found,
35498944Sobrien                                            const char **data, apr_size_t *len)
35598944Sobrien{
35698944Sobrien    aggregate_context_t *ctx = bucket->data;
35798944Sobrien    apr_status_t status;
35898944Sobrien
35998944Sobrien    cleanup_aggregate(ctx, bucket->allocator);
36098944Sobrien
36198944Sobrien    do {
362130803Smarcel        serf_bucket_t *head;
363130803Smarcel
364130803Smarcel        *len = 0;
365130803Smarcel
366130803Smarcel        if (!ctx->list) {
367130803Smarcel            if (ctx->hold_open) {
368130803Smarcel                return ctx->hold_open(ctx->hold_open_baton, bucket);
369130803Smarcel            }
37098944Sobrien            else {
37198944Sobrien                return APR_EOF;
37298944Sobrien            }
37398944Sobrien        }
37498944Sobrien
37598944Sobrien        head = ctx->list->bucket;
37698944Sobrien
37798944Sobrien        status = serf_bucket_readline(head, acceptable, found,
37898944Sobrien                                      data, len);
37998944Sobrien        if (SERF_BUCKET_READ_ERROR(status))
38098944Sobrien            return status;
38198944Sobrien
38298944Sobrien        if (status == APR_EOF) {
38398944Sobrien            bucket_list_t *next_list;
38498944Sobrien
38598944Sobrien            /* head bucket is empty, move to to-be-cleaned-up list. */
38698944Sobrien            next_list = ctx->list->next;
38798944Sobrien            ctx->list->next = ctx->done;
38898944Sobrien            ctx->done = ctx->list;
38998944Sobrien            ctx->list = next_list;
39098944Sobrien
39198944Sobrien            /* If we have no more in our list, return EOF. */
39298944Sobrien            if (!ctx->list) {
39398944Sobrien                if (ctx->hold_open) {
39498944Sobrien                    return ctx->hold_open(ctx->hold_open_baton, bucket);
39598944Sobrien                }
39698944Sobrien                else {
39798944Sobrien                    return APR_EOF;
39898944Sobrien                }
39998944Sobrien            }
40098944Sobrien
40198944Sobrien            /* we read something, so bail out and let the appl. read again. */
40298944Sobrien            if (*len)
40398944Sobrien                status = APR_SUCCESS;
40498944Sobrien        }
40598944Sobrien
40698944Sobrien        /* continue with APR_SUCCESS or APR_EOF and no data read yet. */
40798944Sobrien    } while (!*len && status != APR_EAGAIN);
40898944Sobrien
40998944Sobrien    return status;
41098944Sobrien}
41198944Sobrien
41298944Sobrienstatic apr_status_t serf_aggregate_peek(serf_bucket_t *bucket,
41398944Sobrien                                        const char **data,
41498944Sobrien                                        apr_size_t *len)
41598944Sobrien{
41698944Sobrien    aggregate_context_t *ctx = bucket->data;
41798944Sobrien    serf_bucket_t *head;
41898944Sobrien    apr_status_t status;
41998944Sobrien
42098944Sobrien    cleanup_aggregate(ctx, bucket->allocator);
42198944Sobrien
42298944Sobrien    /* Peek the first bucket in the list, if any. */
42398944Sobrien    if (!ctx->list) {
42498944Sobrien        *len = 0;
42598944Sobrien        if (ctx->hold_open) {
42698944Sobrien            status = ctx->hold_open(ctx->hold_open_baton, bucket);
42798944Sobrien            if (status == APR_EAGAIN)
42898944Sobrien                status = APR_SUCCESS;
42998944Sobrien            return status;
43098944Sobrien        }
43198944Sobrien        else {
43298944Sobrien            return APR_EOF;
43398944Sobrien        }
43498944Sobrien    }
43598944Sobrien
43698944Sobrien    head = ctx->list->bucket;
43798944Sobrien
43898944Sobrien    status = serf_bucket_peek(head, data, len);
43998944Sobrien
44098944Sobrien    if (status == APR_EOF) {
44198944Sobrien        if (ctx->list->next) {
44298944Sobrien            status = APR_SUCCESS;
44398944Sobrien        } else {
44498944Sobrien            if (ctx->hold_open) {
44598944Sobrien                status = ctx->hold_open(ctx->hold_open_baton, bucket);
44698944Sobrien                if (status == APR_EAGAIN)
44798944Sobrien                    status = APR_SUCCESS;
44898944Sobrien                return status;
44998944Sobrien            }
45098944Sobrien        }
45198944Sobrien    }
45298944Sobrien
45398944Sobrien    return status;
45498944Sobrien}
45598944Sobrien
45698944Sobrienstatic serf_bucket_t * serf_aggregate_read_bucket(
45798944Sobrien    serf_bucket_t *bucket,
45898944Sobrien    const serf_bucket_type_t *type)
45998944Sobrien{
46098944Sobrien    aggregate_context_t *ctx = bucket->data;
46198944Sobrien    serf_bucket_t *found_bucket;
46298944Sobrien
46398944Sobrien    if (!ctx->list) {
46498944Sobrien        return NULL;
46598944Sobrien    }
46698944Sobrien
46798944Sobrien    if (ctx->list->bucket->type == type) {
46898944Sobrien        /* Got the bucket. Consume it from our list. */
46998944Sobrien        found_bucket = ctx->list->bucket;
47098944Sobrien        ctx->list = ctx->list->next;
47198944Sobrien        return found_bucket;
47298944Sobrien    }
47398944Sobrien
47498944Sobrien    /* Call read_bucket on first one in our list. */
47598944Sobrien    return serf_bucket_read_bucket(ctx->list->bucket, type);
47698944Sobrien}
47798944Sobrien
47898944Sobrien
47998944Sobrienconst serf_bucket_type_t serf_bucket_type_aggregate = {
48098944Sobrien    "AGGREGATE",
48198944Sobrien    serf_aggregate_read,
48298944Sobrien    serf_aggregate_readline,
48398944Sobrien    serf_aggregate_read_iovec,
48498944Sobrien    serf_default_read_for_sendfile,
48598944Sobrien    serf_aggregate_read_bucket,
48698944Sobrien    serf_aggregate_peek,
48798944Sobrien    serf_aggregate_destroy_and_data,
48898944Sobrien};
48998944Sobrien