aggregate_buckets.c revision 302408
1/* Copyright 2002-2004 Justin Erenkrantz and Greg Stein
2 *
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 *     http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16#include "serf.h"
17#include "serf_bucket_util.h"
18
19
20/* Should be an APR_RING? */
21typedef struct bucket_list {
22    serf_bucket_t *bucket;
23    struct bucket_list *next;
24} bucket_list_t;
25
26typedef struct {
27    bucket_list_t *list; /* active buckets */
28    bucket_list_t *last; /* last bucket of the list */
29    bucket_list_t *done; /* we finished reading this; now pending a destroy */
30
31    serf_bucket_aggregate_eof_t hold_open;
32    void *hold_open_baton;
33
34    /* Does this bucket own its children? !0 if yes, 0 if not. */
35    int bucket_owner;
36} aggregate_context_t;
37
38
39static void cleanup_aggregate(aggregate_context_t *ctx,
40                              serf_bucket_alloc_t *allocator)
41{
42    bucket_list_t *next_list;
43
44    /* If we finished reading a bucket during the previous read, then
45     * we can now toss that bucket.
46     */
47    while (ctx->done != NULL) {
48        next_list = ctx->done->next;
49
50        if (ctx->bucket_owner) {
51            serf_bucket_destroy(ctx->done->bucket);
52        }
53        serf_bucket_mem_free(allocator, ctx->done);
54
55        ctx->done = next_list;
56    }
57}
58
59void serf_bucket_aggregate_cleanup(
60    serf_bucket_t *bucket, serf_bucket_alloc_t *allocator)
61{
62    aggregate_context_t *ctx = bucket->data;
63
64    cleanup_aggregate(ctx, allocator);
65}
66
67static aggregate_context_t *create_aggregate(serf_bucket_alloc_t *allocator)
68{
69    aggregate_context_t *ctx;
70
71    ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
72
73    ctx->list = NULL;
74    ctx->last = NULL;
75    ctx->done = NULL;
76    ctx->hold_open = NULL;
77    ctx->hold_open_baton = NULL;
78    ctx->bucket_owner = 1;
79
80    return ctx;
81}
82
83serf_bucket_t *serf_bucket_aggregate_create(
84    serf_bucket_alloc_t *allocator)
85{
86    aggregate_context_t *ctx;
87
88    ctx = create_aggregate(allocator);
89
90    return serf_bucket_create(&serf_bucket_type_aggregate, allocator, ctx);
91}
92
93serf_bucket_t *serf__bucket_stream_create(
94    serf_bucket_alloc_t *allocator,
95    serf_bucket_aggregate_eof_t fn,
96    void *baton)
97{
98    serf_bucket_t *bucket = serf_bucket_aggregate_create(allocator);
99    aggregate_context_t *ctx = bucket->data;
100
101    serf_bucket_aggregate_hold_open(bucket, fn, baton);
102
103    ctx->bucket_owner = 0;
104
105    return bucket;
106}
107
108
109static void serf_aggregate_destroy_and_data(serf_bucket_t *bucket)
110{
111    aggregate_context_t *ctx = bucket->data;
112    bucket_list_t *next_ctx;
113
114    while (ctx->list) {
115        if (ctx->bucket_owner) {
116            serf_bucket_destroy(ctx->list->bucket);
117        }
118        next_ctx = ctx->list->next;
119        serf_bucket_mem_free(bucket->allocator, ctx->list);
120        ctx->list = next_ctx;
121    }
122    cleanup_aggregate(ctx, bucket->allocator);
123
124    serf_default_destroy_and_data(bucket);
125}
126
127void serf_bucket_aggregate_become(serf_bucket_t *bucket)
128{
129    aggregate_context_t *ctx;
130
131    ctx = create_aggregate(bucket->allocator);
132
133    bucket->type = &serf_bucket_type_aggregate;
134    bucket->data = ctx;
135
136    /* The allocator remains the same. */
137}
138
139
140void serf_bucket_aggregate_prepend(
141    serf_bucket_t *aggregate_bucket,
142    serf_bucket_t *prepend_bucket)
143{
144    aggregate_context_t *ctx = aggregate_bucket->data;
145    bucket_list_t *new_list;
146
147    new_list = serf_bucket_mem_alloc(aggregate_bucket->allocator,
148                                     sizeof(*new_list));
149    new_list->bucket = prepend_bucket;
150    new_list->next = ctx->list;
151
152    ctx->list = new_list;
153}
154
155void serf_bucket_aggregate_append(
156    serf_bucket_t *aggregate_bucket,
157    serf_bucket_t *append_bucket)
158{
159    aggregate_context_t *ctx = aggregate_bucket->data;
160    bucket_list_t *new_list;
161
162    new_list = serf_bucket_mem_alloc(aggregate_bucket->allocator,
163                                     sizeof(*new_list));
164    new_list->bucket = append_bucket;
165    new_list->next = NULL;
166
167    /* If we use APR_RING, this is trivial.  So, wait.
168    new_list->next = ctx->list;
169    ctx->list = new_list;
170    */
171    if (ctx->list == NULL) {
172        ctx->list = new_list;
173        ctx->last = new_list;
174    }
175    else {
176        ctx->last->next = new_list;
177        ctx->last = ctx->last->next;
178    }
179}
180
181void serf_bucket_aggregate_hold_open(serf_bucket_t *aggregate_bucket,
182                                     serf_bucket_aggregate_eof_t fn,
183                                     void *baton)
184{
185    aggregate_context_t *ctx = aggregate_bucket->data;
186    ctx->hold_open = fn;
187    ctx->hold_open_baton = baton;
188}
189
190void serf_bucket_aggregate_prepend_iovec(
191    serf_bucket_t *aggregate_bucket,
192    struct iovec *vecs,
193    int vecs_count)
194{
195    int i;
196
197    /* Add in reverse order. */
198    for (i = vecs_count - 1; i >= 0; i--) {
199        serf_bucket_t *new_bucket;
200
201        new_bucket = serf_bucket_simple_create(vecs[i].iov_base,
202                                               vecs[i].iov_len,
203                                               NULL, NULL,
204                                               aggregate_bucket->allocator);
205
206        serf_bucket_aggregate_prepend(aggregate_bucket, new_bucket);
207
208    }
209}
210
211void serf_bucket_aggregate_append_iovec(
212    serf_bucket_t *aggregate_bucket,
213    struct iovec *vecs,
214    int vecs_count)
215{
216    serf_bucket_t *new_bucket;
217
218    new_bucket = serf_bucket_iovec_create(vecs, vecs_count,
219                                          aggregate_bucket->allocator);
220
221    serf_bucket_aggregate_append(aggregate_bucket, new_bucket);
222}
223
224static apr_status_t read_aggregate(serf_bucket_t *bucket,
225                                   apr_size_t requested,
226                                   int vecs_size, struct iovec *vecs,
227                                   int *vecs_used)
228{
229    aggregate_context_t *ctx = bucket->data;
230    int cur_vecs_used;
231    apr_status_t status;
232
233    *vecs_used = 0;
234
235    if (!ctx->list) {
236        if (ctx->hold_open) {
237            return ctx->hold_open(ctx->hold_open_baton, bucket);
238        }
239        else {
240            return APR_EOF;
241        }
242    }
243
244    status = APR_SUCCESS;
245    while (requested) {
246        serf_bucket_t *head = ctx->list->bucket;
247
248        status = serf_bucket_read_iovec(head, requested, vecs_size, vecs,
249                                        &cur_vecs_used);
250
251        if (SERF_BUCKET_READ_ERROR(status))
252            return status;
253
254        /* Add the number of vecs we read to our running total. */
255        *vecs_used += cur_vecs_used;
256
257        if (cur_vecs_used > 0 || status) {
258            bucket_list_t *next_list;
259
260            /* If we got SUCCESS (w/bytes) or EAGAIN, we want to return now
261             * as it isn't safe to read more without returning to our caller.
262             */
263            if (!status || APR_STATUS_IS_EAGAIN(status) || status == SERF_ERROR_WAIT_CONN) {
264                return status;
265            }
266
267            /* However, if we read EOF, we can stash this bucket in a
268             * to-be-freed list and move on to the next bucket.  This ensures
269             * that the bucket stays alive (so as not to violate our read
270             * semantics).  We'll destroy this list of buckets the next time
271             * we are asked to perform a read operation - thus ensuring the
272             * proper read lifetime.
273             */
274            next_list = ctx->list->next;
275            ctx->list->next = ctx->done;
276            ctx->done = ctx->list;
277            ctx->list = next_list;
278
279            /* If we have no more in our list, return EOF. */
280            if (!ctx->list) {
281                if (ctx->hold_open) {
282                    return ctx->hold_open(ctx->hold_open_baton, bucket);
283                }
284                else {
285                    return APR_EOF;
286                }
287            }
288
289            /* At this point, it safe to read the next bucket - if we can. */
290
291            /* If the caller doesn't want ALL_AVAIL, decrement the size
292             * of the items we just read from the list.
293             */
294            if (requested != SERF_READ_ALL_AVAIL) {
295                int i;
296
297                for (i = 0; i < cur_vecs_used; i++)
298                    requested -= vecs[i].iov_len;
299            }
300
301            /* Adjust our vecs to account for what we just read. */
302            vecs_size -= cur_vecs_used;
303            vecs += cur_vecs_used;
304
305            /* We reached our max.  Oh well. */
306            if (!requested || !vecs_size) {
307                return APR_SUCCESS;
308            }
309        }
310    }
311
312    return status;
313}
314
315static apr_status_t serf_aggregate_read(serf_bucket_t *bucket,
316                                        apr_size_t requested,
317                                        const char **data, apr_size_t *len)
318{
319    aggregate_context_t *ctx = bucket->data;
320    struct iovec vec;
321    int vecs_used;
322    apr_status_t status;
323
324    cleanup_aggregate(ctx, bucket->allocator);
325
326    status = read_aggregate(bucket, requested, 1, &vec, &vecs_used);
327
328    if (!vecs_used) {
329        *len = 0;
330    }
331    else {
332        *data = vec.iov_base;
333        *len = vec.iov_len;
334    }
335
336    return status;
337}
338
339static apr_status_t serf_aggregate_read_iovec(serf_bucket_t *bucket,
340                                              apr_size_t requested,
341                                              int vecs_size,
342                                              struct iovec *vecs,
343                                              int *vecs_used)
344{
345    aggregate_context_t *ctx = bucket->data;
346
347    cleanup_aggregate(ctx, bucket->allocator);
348
349    return read_aggregate(bucket, requested, vecs_size, vecs, vecs_used);
350}
351
352static apr_status_t serf_aggregate_readline(serf_bucket_t *bucket,
353                                            int acceptable, int *found,
354                                            const char **data, apr_size_t *len)
355{
356    aggregate_context_t *ctx = bucket->data;
357    apr_status_t status;
358
359    cleanup_aggregate(ctx, bucket->allocator);
360
361    do {
362        serf_bucket_t *head;
363
364        *len = 0;
365
366        if (!ctx->list) {
367            if (ctx->hold_open) {
368                return ctx->hold_open(ctx->hold_open_baton, bucket);
369            }
370            else {
371                return APR_EOF;
372            }
373        }
374
375        head = ctx->list->bucket;
376
377        status = serf_bucket_readline(head, acceptable, found,
378                                      data, len);
379        if (SERF_BUCKET_READ_ERROR(status))
380            return status;
381
382        if (status == APR_EOF) {
383            bucket_list_t *next_list;
384
385            /* head bucket is empty, move to to-be-cleaned-up list. */
386            next_list = ctx->list->next;
387            ctx->list->next = ctx->done;
388            ctx->done = ctx->list;
389            ctx->list = next_list;
390
391            /* If we have no more in our list, return EOF. */
392            if (!ctx->list) {
393                if (ctx->hold_open) {
394                    return ctx->hold_open(ctx->hold_open_baton, bucket);
395                }
396                else {
397                    return APR_EOF;
398                }
399            }
400
401            /* we read something, so bail out and let the appl. read again. */
402            if (*len)
403                status = APR_SUCCESS;
404        }
405
406        /* continue with APR_SUCCESS or APR_EOF and no data read yet. */
407    } while (!*len && status != APR_EAGAIN);
408
409    return status;
410}
411
412static apr_status_t serf_aggregate_peek(serf_bucket_t *bucket,
413                                        const char **data,
414                                        apr_size_t *len)
415{
416    aggregate_context_t *ctx = bucket->data;
417    serf_bucket_t *head;
418    apr_status_t status;
419
420    cleanup_aggregate(ctx, bucket->allocator);
421
422    /* Peek the first bucket in the list, if any. */
423    if (!ctx->list) {
424        *len = 0;
425        if (ctx->hold_open) {
426            status = ctx->hold_open(ctx->hold_open_baton, bucket);
427            if (status == APR_EAGAIN)
428                status = APR_SUCCESS;
429            return status;
430        }
431        else {
432            return APR_EOF;
433        }
434    }
435
436    head = ctx->list->bucket;
437
438    status = serf_bucket_peek(head, data, len);
439
440    if (status == APR_EOF) {
441        if (ctx->list->next) {
442            status = APR_SUCCESS;
443        } else {
444            if (ctx->hold_open) {
445                status = ctx->hold_open(ctx->hold_open_baton, bucket);
446                if (status == APR_EAGAIN)
447                    status = APR_SUCCESS;
448                return status;
449            }
450        }
451    }
452
453    return status;
454}
455
456static serf_bucket_t * serf_aggregate_read_bucket(
457    serf_bucket_t *bucket,
458    const serf_bucket_type_t *type)
459{
460    aggregate_context_t *ctx = bucket->data;
461    serf_bucket_t *found_bucket;
462
463    if (!ctx->list) {
464        return NULL;
465    }
466
467    if (ctx->list->bucket->type == type) {
468        /* Got the bucket. Consume it from our list. */
469        found_bucket = ctx->list->bucket;
470        ctx->list = ctx->list->next;
471        return found_bucket;
472    }
473
474    /* Call read_bucket on first one in our list. */
475    return serf_bucket_read_bucket(ctx->list->bucket, type);
476}
477
478
479const serf_bucket_type_t serf_bucket_type_aggregate = {
480    "AGGREGATE",
481    serf_aggregate_read,
482    serf_aggregate_readline,
483    serf_aggregate_read_iovec,
484    serf_default_read_for_sendfile,
485    serf_aggregate_read_bucket,
486    serf_aggregate_peek,
487    serf_aggregate_destroy_and_data,
488};
489