aggregate_buckets.c revision 251877
1116743Ssam/* Copyright 2002-2004 Justin Erenkrantz and Greg Stein
2127780Ssam *
3116743Ssam * Licensed under the Apache License, Version 2.0 (the "License");
4116743Ssam * you may not use this file except in compliance with the License.
5116743Ssam * You may obtain a copy of the License at
6116743Ssam *
7116743Ssam *     http://www.apache.org/licenses/LICENSE-2.0
8116743Ssam *
9116743Ssam * Unless required by applicable law or agreed to in writing, software
10116743Ssam * distributed under the License is distributed on an "AS IS" BASIS,
11116743Ssam * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12116743Ssam * See the License for the specific language governing permissions and
13116743Ssam * limitations under the License.
14116743Ssam */
15116743Ssam
16116743Ssam#include "serf.h"
17116743Ssam#include "serf_bucket_util.h"
18116743Ssam
19116743Ssam
20116743Ssam/* Should be an APR_RING? */
21116743Ssamtypedef struct bucket_list {
22116743Ssam    serf_bucket_t *bucket;
23116743Ssam    struct bucket_list *next;
24116743Ssam} bucket_list_t;
25116743Ssam
26116743Ssamtypedef struct {
27116743Ssam    bucket_list_t *list; /* active buckets */
28116743Ssam    bucket_list_t *last; /* last bucket of the list */
29116743Ssam    bucket_list_t *done; /* we finished reading this; now pending a destroy */
30116743Ssam
31116743Ssam    serf_bucket_aggregate_eof_t hold_open;
32116743Ssam    void *hold_open_baton;
33116743Ssam
34116743Ssam    /* Does this bucket own its children? !0 if yes, 0 if not. */
35116743Ssam    int bucket_owner;
36116743Ssam} aggregate_context_t;
37116743Ssam
38116743Ssam
39116743Ssamstatic void cleanup_aggregate(aggregate_context_t *ctx,
40116743Ssam                              serf_bucket_alloc_t *allocator)
41116743Ssam{
42116743Ssam    bucket_list_t *next_list;
43116743Ssam
44116743Ssam    /* If we finished reading a bucket during the previous read, then
45116743Ssam     * we can now toss that bucket.
46116743Ssam     */
47116743Ssam    while (ctx->done != NULL) {
48119783Ssam        next_list = ctx->done->next;
49116743Ssam
50138570Ssam        if (ctx->bucket_owner) {
51116743Ssam            serf_bucket_destroy(ctx->done->bucket);
52116743Ssam        }
53116743Ssam        serf_bucket_mem_free(allocator, ctx->done);
54116743Ssam
55116743Ssam        ctx->done = next_list;
56116743Ssam    }
57138570Ssam}
58138570Ssam
59116743Ssamvoid serf_bucket_aggregate_cleanup(
60138570Ssam    serf_bucket_t *bucket, serf_bucket_alloc_t *allocator)
61116743Ssam{
62119150Ssam    aggregate_context_t *ctx = bucket->data;
63138570Ssam
64138570Ssam    cleanup_aggregate(ctx, allocator);
65138570Ssam}
66138570Ssam
67138570Ssamstatic aggregate_context_t *create_aggregate(serf_bucket_alloc_t *allocator)
68116743Ssam{
69138570Ssam    aggregate_context_t *ctx;
70138570Ssam
71116743Ssam    ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
72138570Ssam
73138570Ssam    ctx->list = NULL;
74138570Ssam    ctx->last = NULL;
75138570Ssam    ctx->done = NULL;
76138570Ssam    ctx->hold_open = NULL;
77138570Ssam    ctx->hold_open_baton = NULL;
78138570Ssam    ctx->bucket_owner = 1;
79138570Ssam
80138570Ssam    return ctx;
81138570Ssam}
82138570Ssam
83116743Ssamserf_bucket_t *serf_bucket_aggregate_create(
84138570Ssam    serf_bucket_alloc_t *allocator)
85116743Ssam{
86116743Ssam    aggregate_context_t *ctx;
87116743Ssam
88138570Ssam    ctx = create_aggregate(allocator);
89116743Ssam
90116743Ssam    return serf_bucket_create(&serf_bucket_type_aggregate, allocator, ctx);
91116743Ssam}
92116743Ssam
93116743Ssamserf_bucket_t *serf__bucket_stream_create(
94116743Ssam    serf_bucket_alloc_t *allocator,
95138570Ssam    serf_bucket_aggregate_eof_t fn,
96116743Ssam    void *baton)
97138570Ssam{
98138570Ssam    serf_bucket_t *bucket = serf_bucket_aggregate_create(allocator);
99138570Ssam    aggregate_context_t *ctx = bucket->data;
100138570Ssam
101138570Ssam    serf_bucket_aggregate_hold_open(bucket, fn, baton);
102138570Ssam
103138570Ssam    ctx->bucket_owner = 0;
104138570Ssam
105138570Ssam    return bucket;
106138570Ssam}
107138570Ssam
108138570Ssam
109138570Ssamstatic void serf_aggregate_destroy_and_data(serf_bucket_t *bucket)
110138570Ssam{
111138570Ssam    aggregate_context_t *ctx = bucket->data;
112138570Ssam    bucket_list_t *next_ctx;
113138570Ssam
114138570Ssam    while (ctx->list) {
115138570Ssam        if (ctx->bucket_owner) {
116138570Ssam            serf_bucket_destroy(ctx->list->bucket);
117138570Ssam        }
118138570Ssam        next_ctx = ctx->list->next;
119138570Ssam        serf_bucket_mem_free(bucket->allocator, ctx->list);
120138570Ssam        ctx->list = next_ctx;
121138570Ssam    }
122138570Ssam    cleanup_aggregate(ctx, bucket->allocator);
123138570Ssam
124138570Ssam    serf_default_destroy_and_data(bucket);
125138570Ssam}
126138570Ssam
127138570Ssamvoid serf_bucket_aggregate_become(serf_bucket_t *bucket)
128138570Ssam{
129138570Ssam    aggregate_context_t *ctx;
130138570Ssam
131138570Ssam    ctx = create_aggregate(bucket->allocator);
132138570Ssam
133138570Ssam    bucket->type = &serf_bucket_type_aggregate;
134138570Ssam    bucket->data = ctx;
135138570Ssam
136138570Ssam    /* The allocator remains the same. */
137138570Ssam}
138138570Ssam
139138570Ssam
140138570Ssamvoid serf_bucket_aggregate_prepend(
141138570Ssam    serf_bucket_t *aggregate_bucket,
142138570Ssam    serf_bucket_t *prepend_bucket)
143138570Ssam{
144138570Ssam    aggregate_context_t *ctx = aggregate_bucket->data;
145138570Ssam    bucket_list_t *new_list;
146116743Ssam
147138570Ssam    new_list = serf_bucket_mem_alloc(aggregate_bucket->allocator,
148138570Ssam                                     sizeof(*new_list));
149116743Ssam    new_list->bucket = prepend_bucket;
150138570Ssam    new_list->next = ctx->list;
151138570Ssam
152138570Ssam    ctx->list = new_list;
153138570Ssam}
154138570Ssam
155138570Ssamvoid serf_bucket_aggregate_append(
156138570Ssam    serf_bucket_t *aggregate_bucket,
157117812Ssam    serf_bucket_t *append_bucket)
158117812Ssam{
159138570Ssam    aggregate_context_t *ctx = aggregate_bucket->data;
160116743Ssam    bucket_list_t *new_list;
161116743Ssam
162116743Ssam    new_list = serf_bucket_mem_alloc(aggregate_bucket->allocator,
163116743Ssam                                     sizeof(*new_list));
164116743Ssam    new_list->bucket = append_bucket;
165116743Ssam    new_list->next = NULL;
166138570Ssam
167138570Ssam    /* If we use APR_RING, this is trivial.  So, wait.
168116743Ssam    new_list->next = ctx->list;
169138570Ssam    ctx->list = new_list;
170138570Ssam    */
171138570Ssam    if (ctx->list == NULL) {
172138570Ssam        ctx->list = new_list;
173138570Ssam        ctx->last = new_list;
174138570Ssam    }
175138570Ssam    else {
176138570Ssam        ctx->last->next = new_list;
177116743Ssam        ctx->last = ctx->last->next;
178116743Ssam    }
179116743Ssam}
180116743Ssam
181138570Ssamvoid serf_bucket_aggregate_hold_open(serf_bucket_t *aggregate_bucket,
182138570Ssam                                     serf_bucket_aggregate_eof_t fn,
183116743Ssam                                     void *baton)
184119144Ssam{
185139500Ssam    aggregate_context_t *ctx = aggregate_bucket->data;
186138570Ssam    ctx->hold_open = fn;
187138570Ssam    ctx->hold_open_baton = baton;
188116743Ssam}
189138570Ssam
190138570Ssamvoid serf_bucket_aggregate_prepend_iovec(
191116743Ssam    serf_bucket_t *aggregate_bucket,
192138570Ssam    struct iovec *vecs,
193138570Ssam    int vecs_count)
194138570Ssam{
195138570Ssam    int i;
196119783Ssam
197119783Ssam    /* Add in reverse order. */
198119783Ssam    for (i = vecs_count - 1; i >= 0; i--) {
199119783Ssam        serf_bucket_t *new_bucket;
200119783Ssam
201127698Ssam        new_bucket = serf_bucket_simple_create(vecs[i].iov_base,
202119783Ssam                                               vecs[i].iov_len,
203139500Ssam                                               NULL, NULL,
204139500Ssam                                               aggregate_bucket->allocator);
205139500Ssam
206139500Ssam        serf_bucket_aggregate_prepend(aggregate_bucket, new_bucket);
207119783Ssam
208119783Ssam    }
209139500Ssam}
210119783Ssam
211116743Ssamvoid serf_bucket_aggregate_append_iovec(
212116743Ssam    serf_bucket_t *aggregate_bucket,
213138570Ssam    struct iovec *vecs,
214138570Ssam    int vecs_count)
215116743Ssam{
216116743Ssam    serf_bucket_t *new_bucket;
217138570Ssam
218138570Ssam    new_bucket = serf_bucket_iovec_create(vecs, vecs_count,
219138570Ssam                                          aggregate_bucket->allocator);
220116743Ssam
221138570Ssam    serf_bucket_aggregate_append(aggregate_bucket, new_bucket);
222138570Ssam}
223138570Ssam
224116743Ssamstatic apr_status_t read_aggregate(serf_bucket_t *bucket,
225138570Ssam                                   apr_size_t requested,
226138570Ssam                                   int vecs_size, struct iovec *vecs,
227138570Ssam                                   int *vecs_used)
228138570Ssam{
229116743Ssam    aggregate_context_t *ctx = bucket->data;
230116743Ssam    int cur_vecs_used;
231138570Ssam    apr_status_t status;
232138570Ssam
233116743Ssam    *vecs_used = 0;
234138570Ssam
235138570Ssam    if (!ctx->list) {
236138570Ssam        if (ctx->hold_open) {
237138570Ssam            return ctx->hold_open(ctx->hold_open_baton, bucket);
238116743Ssam        }
239138570Ssam        else {
240138570Ssam            return APR_EOF;
241138570Ssam        }
242138570Ssam    }
243138570Ssam
244138570Ssam    status = APR_SUCCESS;
245116743Ssam    while (requested) {
246116743Ssam        serf_bucket_t *head = ctx->list->bucket;
247116743Ssam
248116743Ssam        status = serf_bucket_read_iovec(head, requested, vecs_size, vecs,
249138570Ssam                                        &cur_vecs_used);
250119783Ssam
251139500Ssam        if (SERF_BUCKET_READ_ERROR(status))
252139500Ssam            return status;
253139500Ssam
254116743Ssam        /* Add the number of vecs we read to our running total. */
255121100Ssam        *vecs_used += cur_vecs_used;
256121100Ssam
257121100Ssam        if (cur_vecs_used > 0 || status) {
258121100Ssam            bucket_list_t *next_list;
259121100Ssam
260121100Ssam            /* If we got SUCCESS (w/bytes) or EAGAIN, we want to return now
261121100Ssam             * as it isn't safe to read more without returning to our caller.
262121100Ssam             */
263138570Ssam            if (!status || APR_STATUS_IS_EAGAIN(status) || status == SERF_ERROR_WAIT_CONN) {
264138570Ssam                return status;
265121100Ssam            }
266121100Ssam
267121100Ssam            /* However, if we read EOF, we can stash this bucket in a
268121100Ssam             * to-be-freed list and move on to the next bucket.  This ensures
269121100Ssam             * that the bucket stays alive (so as not to violate our read
270121100Ssam             * semantics).  We'll destroy this list of buckets the next time
271121100Ssam             * we are asked to perform a read operation - thus ensuring the
272121100Ssam             * proper read lifetime.
273121100Ssam             */
274116743Ssam            next_list = ctx->list->next;
275116743Ssam            ctx->list->next = ctx->done;
276116743Ssam            ctx->done = ctx->list;
277116743Ssam            ctx->list = next_list;
278116743Ssam
279116743Ssam            /* If we have no more in our list, return EOF. */
280116743Ssam            if (!ctx->list) {
281116743Ssam                if (ctx->hold_open) {
282116743Ssam                    return ctx->hold_open(ctx->hold_open_baton, bucket);
283116743Ssam                }
284138570Ssam                else {
285138570Ssam                    return APR_EOF;
286116743Ssam                }
287116743Ssam            }
288116743Ssam
289116743Ssam            /* At this point, it safe to read the next bucket - if we can. */
290116743Ssam
291116743Ssam            /* If the caller doesn't want ALL_AVAIL, decrement the size
292138570Ssam             * of the items we just read from the list.
293138570Ssam             */
294116743Ssam            if (requested != SERF_READ_ALL_AVAIL) {
295116743Ssam                int i;
296116743Ssam
297116743Ssam                for (i = 0; i < cur_vecs_used; i++)
298116743Ssam                    requested -= vecs[i].iov_len;
299116743Ssam            }
300116743Ssam
301116743Ssam            /* Adjust our vecs to account for what we just read. */
302116743Ssam            vecs_size -= cur_vecs_used;
303116743Ssam            vecs += cur_vecs_used;
304116743Ssam
305116743Ssam            /* We reached our max.  Oh well. */
306138570Ssam            if (!requested || !vecs_size) {
307138570Ssam                return APR_SUCCESS;
308116743Ssam            }
309116743Ssam        }
310138570Ssam    }
311138570Ssam
312116743Ssam    return status;
313116743Ssam}
314116743Ssam
315116743Ssamstatic apr_status_t serf_aggregate_read(serf_bucket_t *bucket,
316116743Ssam                                        apr_size_t requested,
317116743Ssam                                        const char **data, apr_size_t *len)
318116743Ssam{
319116743Ssam    aggregate_context_t *ctx = bucket->data;
320116743Ssam    struct iovec vec;
321116743Ssam    int vecs_used;
322116743Ssam    apr_status_t status;
323116743Ssam
324116743Ssam    cleanup_aggregate(ctx, bucket->allocator);
325116743Ssam
326116743Ssam    status = read_aggregate(bucket, requested, 1, &vec, &vecs_used);
327116743Ssam
328116743Ssam    if (!vecs_used) {
329116743Ssam        *len = 0;
330116743Ssam    }
331116743Ssam    else {
332116743Ssam        *data = vec.iov_base;
333116743Ssam        *len = vec.iov_len;
334116743Ssam    }
335116743Ssam
336116743Ssam    return status;
337116743Ssam}
338138570Ssam
339138570Ssamstatic apr_status_t serf_aggregate_read_iovec(serf_bucket_t *bucket,
340116743Ssam                                              apr_size_t requested,
341116743Ssam                                              int vecs_size,
342116743Ssam                                              struct iovec *vecs,
343116743Ssam                                              int *vecs_used)
344116743Ssam{
345116743Ssam    aggregate_context_t *ctx = bucket->data;
346116743Ssam
347116743Ssam    cleanup_aggregate(ctx, bucket->allocator);
348116743Ssam
349116743Ssam    return read_aggregate(bucket, requested, vecs_size, vecs, vecs_used);
350138570Ssam}
351138570Ssam
352116743Ssamstatic apr_status_t serf_aggregate_readline(serf_bucket_t *bucket,
353116743Ssam                                            int acceptable, int *found,
354138570Ssam                                            const char **data, apr_size_t *len)
355138570Ssam{
356116743Ssam    /* Follow pattern from serf_aggregate_read. */
357138570Ssam    return APR_ENOTIMPL;
358138570Ssam}
359138570Ssam
360138570Ssamstatic apr_status_t serf_aggregate_peek(serf_bucket_t *bucket,
361138570Ssam                                        const char **data,
362116743Ssam                                        apr_size_t *len)
363116743Ssam{
364116743Ssam    /* Follow pattern from serf_aggregate_read. */
365116743Ssam    return APR_ENOTIMPL;
366116743Ssam}
367116743Ssam
368116743Ssamstatic serf_bucket_t * serf_aggregate_read_bucket(
369116743Ssam    serf_bucket_t *bucket,
370138570Ssam    const serf_bucket_type_t *type)
371138570Ssam{
372138570Ssam    aggregate_context_t *ctx = bucket->data;
373116743Ssam    serf_bucket_t *found_bucket;
374116743Ssam
375116743Ssam    if (!ctx->list) {
376116743Ssam        return NULL;
377116743Ssam    }
378116743Ssam
379138570Ssam    if (ctx->list->bucket->type == type) {
380138570Ssam        /* Got the bucket. Consume it from our list. */
381138570Ssam        found_bucket = ctx->list->bucket;
382138570Ssam        ctx->list = ctx->list->next;
383116743Ssam        return found_bucket;
384116743Ssam    }
385138570Ssam
386138570Ssam    /* Call read_bucket on first one in our list. */
387138570Ssam    return serf_bucket_read_bucket(ctx->list->bucket, type);
388138570Ssam}
389138570Ssam
390138570Ssam
391138570Ssamconst serf_bucket_type_t serf_bucket_type_aggregate = {
392138570Ssam    "AGGREGATE",
393138570Ssam    serf_aggregate_read,
394138570Ssam    serf_aggregate_readline,
395138570Ssam    serf_aggregate_read_iovec,
396138570Ssam    serf_default_read_for_sendfile,
397138570Ssam    serf_aggregate_read_bucket,
398138570Ssam    serf_aggregate_peek,
399138570Ssam    serf_aggregate_destroy_and_data,
400138570Ssam};
401138570Ssam