aggregate_buckets.c revision 251886
1/* Copyright 2002-2004 Justin Erenkrantz and Greg Stein
2 *
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16#include "serf.h"
17#include "serf_bucket_util.h"
18
19
20/* Should be an APR_RING? */
21typedef struct bucket_list {
22    serf_bucket_t *bucket;
23    struct bucket_list *next;
24} bucket_list_t;
25
26typedef struct {
27    bucket_list_t *list; /* active buckets */
28    bucket_list_t *last; /* last bucket of the list */
29    bucket_list_t *done; /* we finished reading this; now pending a destroy */
30
31    serf_bucket_aggregate_eof_t hold_open;
32    void *hold_open_baton;
33
34    /* Does this bucket own its children? !0 if yes, 0 if not. */
35    int bucket_owner;
36} aggregate_context_t;
37
38
39static void cleanup_aggregate(aggregate_context_t *ctx,
40                              serf_bucket_alloc_t *allocator)
41{
42    bucket_list_t *next_list;
43
44    /* If we finished reading a bucket during the previous read, then
45     * we can now toss that bucket.
46     */
47    while (ctx->done != NULL) {
48        next_list = ctx->done->next;
49
50        if (ctx->bucket_owner) {
51            serf_bucket_destroy(ctx->done->bucket);
52        }
53        serf_bucket_mem_free(allocator, ctx->done);
54
55        ctx->done = next_list;
56    }
57}
58
59void serf_bucket_aggregate_cleanup(
60    serf_bucket_t *bucket, serf_bucket_alloc_t *allocator)
61{
62    aggregate_context_t *ctx = bucket->data;
63
64    cleanup_aggregate(ctx, allocator);
65}
66
67static aggregate_context_t *create_aggregate(serf_bucket_alloc_t *allocator)
68{
69    aggregate_context_t *ctx;
70
71    ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
72
73    ctx->list = NULL;
74    ctx->last = NULL;
75    ctx->done = NULL;
76    ctx->hold_open = NULL;
77    ctx->hold_open_baton = NULL;
78    ctx->bucket_owner = 1;
79
80    return ctx;
81}
82
83serf_bucket_t *serf_bucket_aggregate_create(
84    serf_bucket_alloc_t *allocator)
85{
86    aggregate_context_t *ctx;
87
88    ctx = create_aggregate(allocator);
89
90    return serf_bucket_create(&serf_bucket_type_aggregate, allocator, ctx);
91}
92
93serf_bucket_t *serf__bucket_stream_create(
94    serf_bucket_alloc_t *allocator,
95    serf_bucket_aggregate_eof_t fn,
96    void *baton)
97{
98    serf_bucket_t *bucket = serf_bucket_aggregate_create(allocator);
99    aggregate_context_t *ctx = bucket->data;
100
101    serf_bucket_aggregate_hold_open(bucket, fn, baton);
102
103    ctx->bucket_owner = 0;
104
105    return bucket;
106}
107
108
109static void serf_aggregate_destroy_and_data(serf_bucket_t *bucket)
110{
111    aggregate_context_t *ctx = bucket->data;
112    bucket_list_t *next_ctx;
113
114    while (ctx->list) {
115        if (ctx->bucket_owner) {
116            serf_bucket_destroy(ctx->list->bucket);
117        }
118        next_ctx = ctx->list->next;
119        serf_bucket_mem_free(bucket->allocator, ctx->list);
120        ctx->list = next_ctx;
121    }
122    cleanup_aggregate(ctx, bucket->allocator);
123
124    serf_default_destroy_and_data(bucket);
125}
126
127void serf_bucket_aggregate_become(serf_bucket_t *bucket)
128{
129    aggregate_context_t *ctx;
130
131    ctx = create_aggregate(bucket->allocator);
132
133    bucket->type = &serf_bucket_type_aggregate;
134    bucket->data = ctx;
135
136    /* The allocator remains the same. */
137}
138
139
140void serf_bucket_aggregate_prepend(
141    serf_bucket_t *aggregate_bucket,
142    serf_bucket_t *prepend_bucket)
143{
144    aggregate_context_t *ctx = aggregate_bucket->data;
145    bucket_list_t *new_list;
146
147    new_list = serf_bucket_mem_alloc(aggregate_bucket->allocator,
148                                     sizeof(*new_list));
149    new_list->bucket = prepend_bucket;
150    new_list->next = ctx->list;
151
152    ctx->list = new_list;
153}
154
155void serf_bucket_aggregate_append(
156    serf_bucket_t *aggregate_bucket,
157    serf_bucket_t *append_bucket)
158{
159    aggregate_context_t *ctx = aggregate_bucket->data;
160    bucket_list_t *new_list;
161
162    new_list = serf_bucket_mem_alloc(aggregate_bucket->allocator,
163                                     sizeof(*new_list));
164    new_list->bucket = append_bucket;
165    new_list->next = NULL;
166
167    /* If we use APR_RING, this is trivial.  So, wait.
168    new_list->next = ctx->list;
169    ctx->list = new_list;
170    */
171    if (ctx->list == NULL) {
172        ctx->list = new_list;
173        ctx->last = new_list;
174    }
175    else {
176        ctx->last->next = new_list;
177        ctx->last = ctx->last->next;
178    }
179}
180
181void serf_bucket_aggregate_hold_open(serf_bucket_t *aggregate_bucket,
182                                     serf_bucket_aggregate_eof_t fn,
183                                     void *baton)
184{
185    aggregate_context_t *ctx = aggregate_bucket->data;
186    ctx->hold_open = fn;
187    ctx->hold_open_baton = baton;
188}
189
190void serf_bucket_aggregate_prepend_iovec(
191    serf_bucket_t *aggregate_bucket,
192    struct iovec *vecs,
193    int vecs_count)
194{
195    int i;
196
197    /* Add in reverse order. */
198    for (i = vecs_count - 1; i >= 0; i--) {
199        serf_bucket_t *new_bucket;
200
201        new_bucket = serf_bucket_simple_create(vecs[i].iov_base,
202                                               vecs[i].iov_len,
203                                               NULL, NULL,
204                                               aggregate_bucket->allocator);
205
206        serf_bucket_aggregate_prepend(aggregate_bucket, new_bucket);
207
208    }
209}
210
211void serf_bucket_aggregate_append_iovec(
212    serf_bucket_t *aggregate_bucket,
213    struct iovec *vecs,
214    int vecs_count)
215{
216    serf_bucket_t *new_bucket;
217
218    new_bucket = serf_bucket_iovec_create(vecs, vecs_count,
219                                          aggregate_bucket->allocator);
220
221    serf_bucket_aggregate_append(aggregate_bucket, new_bucket);
222}
223
224static apr_status_t read_aggregate(serf_bucket_t *bucket,
225                                   apr_size_t requested,
226                                   int vecs_size, struct iovec *vecs,
227                                   int *vecs_used)
228{
229    aggregate_context_t *ctx = bucket->data;
230    int cur_vecs_used;
231    apr_status_t status;
232
233    *vecs_used = 0;
234
235    if (!ctx->list) {
236        if (ctx->hold_open) {
237            return ctx->hold_open(ctx->hold_open_baton, bucket);
238        }
239        else {
240            return APR_EOF;
241        }
242    }
243
244    status = APR_SUCCESS;
245    while (requested) {
246        serf_bucket_t *head = ctx->list->bucket;
247
248        status = serf_bucket_read_iovec(head, requested, vecs_size, vecs,
249                                        &cur_vecs_used);
250
251        if (SERF_BUCKET_READ_ERROR(status))
252            return status;
253
254        /* Add the number of vecs we read to our running total. */
255        *vecs_used += cur_vecs_used;
256
257        if (cur_vecs_used > 0 || status) {
258            bucket_list_t *next_list;
259
260            /* If we got SUCCESS (w/bytes) or EAGAIN, we want to return now
261             * as it isn't safe to read more without returning to our caller.
262             */
263            if (!status || APR_STATUS_IS_EAGAIN(status) || status == SERF_ERROR_WAIT_CONN) {
264                return status;
265            }
266
267            /* However, if we read EOF, we can stash this bucket in a
268             * to-be-freed list and move on to the next bucket.  This ensures
269             * that the bucket stays alive (so as not to violate our read
270             * semantics).  We'll destroy this list of buckets the next time
271             * we are asked to perform a read operation - thus ensuring the
272             * proper read lifetime.
273             */
274            next_list = ctx->list->next;
275            ctx->list->next = ctx->done;
276            ctx->done = ctx->list;
277            ctx->list = next_list;
278
279            /* If we have no more in our list, return EOF. */
280            if (!ctx->list) {
281                if (ctx->hold_open) {
282                    return ctx->hold_open(ctx->hold_open_baton, bucket);
283                }
284                else {
285                    return APR_EOF;
286                }
287            }
288
289            /* At this point, it safe to read the next bucket - if we can. */
290
291            /* If the caller doesn't want ALL_AVAIL, decrement the size
292             * of the items we just read from the list.
293             */
294            if (requested != SERF_READ_ALL_AVAIL) {
295                int i;
296
297                for (i = 0; i < cur_vecs_used; i++)
298                    requested -= vecs[i].iov_len;
299            }
300
301            /* Adjust our vecs to account for what we just read. */
302            vecs_size -= cur_vecs_used;
303            vecs += cur_vecs_used;
304
305            /* We reached our max.  Oh well. */
306            if (!requested || !vecs_size) {
307                return APR_SUCCESS;
308            }
309        }
310    }
311
312    return status;
313}
314
315static apr_status_t serf_aggregate_read(serf_bucket_t *bucket,
316                                        apr_size_t requested,
317                                        const char **data, apr_size_t *len)
318{
319    aggregate_context_t *ctx = bucket->data;
320    struct iovec vec;
321    int vecs_used;
322    apr_status_t status;
323
324    cleanup_aggregate(ctx, bucket->allocator);
325
326    status = read_aggregate(bucket, requested, 1, &vec, &vecs_used);
327
328    if (!vecs_used) {
329        *len = 0;
330    }
331    else {
332        *data = vec.iov_base;
333        *len = vec.iov_len;
334    }
335
336    return status;
337}
338
339static apr_status_t serf_aggregate_read_iovec(serf_bucket_t *bucket,
340                                              apr_size_t requested,
341                                              int vecs_size,
342                                              struct iovec *vecs,
343                                              int *vecs_used)
344{
345    aggregate_context_t *ctx = bucket->data;
346
347    cleanup_aggregate(ctx, bucket->allocator);
348
349    return read_aggregate(bucket, requested, vecs_size, vecs, vecs_used);
350}
351
352static apr_status_t serf_aggregate_readline(serf_bucket_t *bucket,
353                                            int acceptable, int *found,
354                                            const char **data, apr_size_t *len)
355{
356    /* Follow pattern from serf_aggregate_read. */
357    return APR_ENOTIMPL;
358}
359
360static apr_status_t serf_aggregate_peek(serf_bucket_t *bucket,
361                                        const char **data,
362                                        apr_size_t *len)
363{
364    /* Follow pattern from serf_aggregate_read. */
365    return APR_ENOTIMPL;
366}
367
368static serf_bucket_t * serf_aggregate_read_bucket(
369    serf_bucket_t *bucket,
370    const serf_bucket_type_t *type)
371{
372    aggregate_context_t *ctx = bucket->data;
373    serf_bucket_t *found_bucket;
374
375    if (!ctx->list) {
376        return NULL;
377    }
378
379    if (ctx->list->bucket->type == type) {
380        /* Got the bucket. Consume it from our list. */
381        found_bucket = ctx->list->bucket;
382        ctx->list = ctx->list->next;
383        return found_bucket;
384    }
385
386    /* Call read_bucket on first one in our list. */
387    return serf_bucket_read_bucket(ctx->list->bucket, type);
388}
389
390
391const serf_bucket_type_t serf_bucket_type_aggregate = {
392    "AGGREGATE",
393    serf_aggregate_read,
394    serf_aggregate_readline,
395    serf_aggregate_read_iovec,
396    serf_default_read_for_sendfile,
397    serf_aggregate_read_bucket,
398    serf_aggregate_peek,
399    serf_aggregate_destroy_and_data,
400};
401