1/* ====================================================================
2 *    Licensed to the Apache Software Foundation (ASF) under one
3 *    or more contributor license agreements.  See the NOTICE file
4 *    distributed with this work for additional information
5 *    regarding copyright ownership.  The ASF licenses this file
6 *    to you under the Apache License, Version 2.0 (the
7 *    "License"); you may not use this file except in compliance
8 *    with the License.  You may obtain a copy of the License at
9 *
10 *      http://www.apache.org/licenses/LICENSE-2.0
11 *
12 *    Unless required by applicable law or agreed to in writing,
13 *    software distributed under the License is distributed on an
14 *    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 *    KIND, either express or implied.  See the License for the
16 *    specific language governing permissions and limitations
17 *    under the License.
18 * ====================================================================
19 */
20
21#include "serf.h"
22#include "serf_bucket_util.h"
23
24
25/* Should be an APR_RING? */
26typedef struct bucket_list {
27    serf_bucket_t *bucket;
28    struct bucket_list *next;
29} bucket_list_t;
30
31typedef struct {
32    bucket_list_t *list; /* active buckets */
33    bucket_list_t *last; /* last bucket of the list */
34    bucket_list_t *done; /* we finished reading this; now pending a destroy */
35
36    serf_bucket_aggregate_eof_t hold_open;
37    void *hold_open_baton;
38
39    /* Does this bucket own its children? !0 if yes, 0 if not. */
40    int bucket_owner;
41} aggregate_context_t;
42
43
44static void cleanup_aggregate(aggregate_context_t *ctx,
45                              serf_bucket_alloc_t *allocator)
46{
47    bucket_list_t *next_list;
48
49    /* If we finished reading a bucket during the previous read, then
50     * we can now toss that bucket.
51     */
52    while (ctx->done != NULL) {
53        next_list = ctx->done->next;
54
55        if (ctx->bucket_owner) {
56            serf_bucket_destroy(ctx->done->bucket);
57        }
58        serf_bucket_mem_free(allocator, ctx->done);
59
60        ctx->done = next_list;
61    }
62}
63
64void serf_bucket_aggregate_cleanup(
65    serf_bucket_t *bucket, serf_bucket_alloc_t *allocator)
66{
67    aggregate_context_t *ctx = bucket->data;
68
69    cleanup_aggregate(ctx, allocator);
70}
71
72static aggregate_context_t *create_aggregate(serf_bucket_alloc_t *allocator)
73{
74    aggregate_context_t *ctx;
75
76    ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
77
78    ctx->list = NULL;
79    ctx->last = NULL;
80    ctx->done = NULL;
81    ctx->hold_open = NULL;
82    ctx->hold_open_baton = NULL;
83    ctx->bucket_owner = 1;
84
85    return ctx;
86}
87
88serf_bucket_t *serf_bucket_aggregate_create(
89    serf_bucket_alloc_t *allocator)
90{
91    aggregate_context_t *ctx;
92
93    ctx = create_aggregate(allocator);
94
95    return serf_bucket_create(&serf_bucket_type_aggregate, allocator, ctx);
96}
97
98serf_bucket_t *serf__bucket_stream_create(
99    serf_bucket_alloc_t *allocator,
100    serf_bucket_aggregate_eof_t fn,
101    void *baton)
102{
103    serf_bucket_t *bucket = serf_bucket_aggregate_create(allocator);
104    aggregate_context_t *ctx = bucket->data;
105
106    serf_bucket_aggregate_hold_open(bucket, fn, baton);
107
108    ctx->bucket_owner = 0;
109
110    return bucket;
111}
112
113
114static void serf_aggregate_destroy_and_data(serf_bucket_t *bucket)
115{
116    aggregate_context_t *ctx = bucket->data;
117    bucket_list_t *next_ctx;
118
119    while (ctx->list) {
120        if (ctx->bucket_owner) {
121            serf_bucket_destroy(ctx->list->bucket);
122        }
123        next_ctx = ctx->list->next;
124        serf_bucket_mem_free(bucket->allocator, ctx->list);
125        ctx->list = next_ctx;
126    }
127    cleanup_aggregate(ctx, bucket->allocator);
128
129    serf_default_destroy_and_data(bucket);
130}
131
132void serf_bucket_aggregate_become(serf_bucket_t *bucket)
133{
134    aggregate_context_t *ctx;
135
136    ctx = create_aggregate(bucket->allocator);
137
138    bucket->type = &serf_bucket_type_aggregate;
139    bucket->data = ctx;
140
141    /* The allocator remains the same. */
142}
143
144
145void serf_bucket_aggregate_prepend(
146    serf_bucket_t *aggregate_bucket,
147    serf_bucket_t *prepend_bucket)
148{
149    aggregate_context_t *ctx = aggregate_bucket->data;
150    bucket_list_t *new_list;
151
152    new_list = serf_bucket_mem_alloc(aggregate_bucket->allocator,
153                                     sizeof(*new_list));
154    new_list->bucket = prepend_bucket;
155    new_list->next = ctx->list;
156
157    ctx->list = new_list;
158}
159
160void serf_bucket_aggregate_append(
161    serf_bucket_t *aggregate_bucket,
162    serf_bucket_t *append_bucket)
163{
164    aggregate_context_t *ctx = aggregate_bucket->data;
165    bucket_list_t *new_list;
166
167    new_list = serf_bucket_mem_alloc(aggregate_bucket->allocator,
168                                     sizeof(*new_list));
169    new_list->bucket = append_bucket;
170    new_list->next = NULL;
171
172    /* If we use APR_RING, this is trivial.  So, wait.
173    new_list->next = ctx->list;
174    ctx->list = new_list;
175    */
176    if (ctx->list == NULL) {
177        ctx->list = new_list;
178        ctx->last = new_list;
179    }
180    else {
181        ctx->last->next = new_list;
182        ctx->last = ctx->last->next;
183    }
184}
185
186void serf_bucket_aggregate_hold_open(serf_bucket_t *aggregate_bucket,
187                                     serf_bucket_aggregate_eof_t fn,
188                                     void *baton)
189{
190    aggregate_context_t *ctx = aggregate_bucket->data;
191    ctx->hold_open = fn;
192    ctx->hold_open_baton = baton;
193}
194
195void serf_bucket_aggregate_prepend_iovec(
196    serf_bucket_t *aggregate_bucket,
197    struct iovec *vecs,
198    int vecs_count)
199{
200    int i;
201
202    /* Add in reverse order. */
203    for (i = vecs_count - 1; i >= 0; i--) {
204        serf_bucket_t *new_bucket;
205
206        new_bucket = serf_bucket_simple_create(vecs[i].iov_base,
207                                               vecs[i].iov_len,
208                                               NULL, NULL,
209                                               aggregate_bucket->allocator);
210
211        serf_bucket_aggregate_prepend(aggregate_bucket, new_bucket);
212
213    }
214}
215
216void serf_bucket_aggregate_append_iovec(
217    serf_bucket_t *aggregate_bucket,
218    struct iovec *vecs,
219    int vecs_count)
220{
221    serf_bucket_t *new_bucket;
222
223    new_bucket = serf_bucket_iovec_create(vecs, vecs_count,
224                                          aggregate_bucket->allocator);
225
226    serf_bucket_aggregate_append(aggregate_bucket, new_bucket);
227}
228
229static apr_status_t read_aggregate(serf_bucket_t *bucket,
230                                   apr_size_t requested,
231                                   int vecs_size, struct iovec *vecs,
232                                   int *vecs_used)
233{
234    aggregate_context_t *ctx = bucket->data;
235    int cur_vecs_used;
236    apr_status_t status;
237
238    *vecs_used = 0;
239
240    if (!ctx->list) {
241        if (ctx->hold_open) {
242            return ctx->hold_open(ctx->hold_open_baton, bucket);
243        }
244        else {
245            return APR_EOF;
246        }
247    }
248
249    status = APR_SUCCESS;
250    while (requested) {
251        serf_bucket_t *head = ctx->list->bucket;
252
253        status = serf_bucket_read_iovec(head, requested, vecs_size, vecs,
254                                        &cur_vecs_used);
255
256        if (SERF_BUCKET_READ_ERROR(status))
257            return status;
258
259        /* Add the number of vecs we read to our running total. */
260        *vecs_used += cur_vecs_used;
261
262        if (cur_vecs_used > 0 || status) {
263            bucket_list_t *next_list;
264
265            /* If we got SUCCESS (w/bytes) or EAGAIN, we want to return now
266             * as it isn't safe to read more without returning to our caller.
267             */
268            if (!status || APR_STATUS_IS_EAGAIN(status) || status == SERF_ERROR_WAIT_CONN) {
269                return status;
270            }
271
272            /* However, if we read EOF, we can stash this bucket in a
273             * to-be-freed list and move on to the next bucket.  This ensures
274             * that the bucket stays alive (so as not to violate our read
275             * semantics).  We'll destroy this list of buckets the next time
276             * we are asked to perform a read operation - thus ensuring the
277             * proper read lifetime.
278             */
279            next_list = ctx->list->next;
280            ctx->list->next = ctx->done;
281            ctx->done = ctx->list;
282            ctx->list = next_list;
283
284            /* If we have no more in our list, return EOF. */
285            if (!ctx->list) {
286                if (ctx->hold_open) {
287                    return ctx->hold_open(ctx->hold_open_baton, bucket);
288                }
289                else {
290                    return APR_EOF;
291                }
292            }
293
294            /* At this point, it safe to read the next bucket - if we can. */
295
296            /* If the caller doesn't want ALL_AVAIL, decrement the size
297             * of the items we just read from the list.
298             */
299            if (requested != SERF_READ_ALL_AVAIL) {
300                int i;
301
302                for (i = 0; i < cur_vecs_used; i++)
303                    requested -= vecs[i].iov_len;
304            }
305
306            /* Adjust our vecs to account for what we just read. */
307            vecs_size -= cur_vecs_used;
308            vecs += cur_vecs_used;
309
310            /* We reached our max.  Oh well. */
311            if (!requested || !vecs_size) {
312                return APR_SUCCESS;
313            }
314        }
315    }
316
317    return status;
318}
319
320static apr_status_t serf_aggregate_read(serf_bucket_t *bucket,
321                                        apr_size_t requested,
322                                        const char **data, apr_size_t *len)
323{
324    aggregate_context_t *ctx = bucket->data;
325    struct iovec vec;
326    int vecs_used;
327    apr_status_t status;
328
329    cleanup_aggregate(ctx, bucket->allocator);
330
331    status = read_aggregate(bucket, requested, 1, &vec, &vecs_used);
332
333    if (!vecs_used) {
334        *len = 0;
335    }
336    else {
337        *data = vec.iov_base;
338        *len = vec.iov_len;
339    }
340
341    return status;
342}
343
344static apr_status_t serf_aggregate_read_iovec(serf_bucket_t *bucket,
345                                              apr_size_t requested,
346                                              int vecs_size,
347                                              struct iovec *vecs,
348                                              int *vecs_used)
349{
350    aggregate_context_t *ctx = bucket->data;
351
352    cleanup_aggregate(ctx, bucket->allocator);
353
354    return read_aggregate(bucket, requested, vecs_size, vecs, vecs_used);
355}
356
357static apr_status_t serf_aggregate_readline(serf_bucket_t *bucket,
358                                            int acceptable, int *found,
359                                            const char **data, apr_size_t *len)
360{
361    aggregate_context_t *ctx = bucket->data;
362    apr_status_t status;
363
364    cleanup_aggregate(ctx, bucket->allocator);
365
366    do {
367        serf_bucket_t *head;
368
369        *len = 0;
370
371        if (!ctx->list) {
372            if (ctx->hold_open) {
373                return ctx->hold_open(ctx->hold_open_baton, bucket);
374            }
375            else {
376                return APR_EOF;
377            }
378        }
379
380        head = ctx->list->bucket;
381
382        status = serf_bucket_readline(head, acceptable, found,
383                                      data, len);
384        if (SERF_BUCKET_READ_ERROR(status))
385            return status;
386
387        if (status == APR_EOF) {
388            bucket_list_t *next_list;
389
390            /* head bucket is empty, move to to-be-cleaned-up list. */
391            next_list = ctx->list->next;
392            ctx->list->next = ctx->done;
393            ctx->done = ctx->list;
394            ctx->list = next_list;
395
396            /* If we have no more in our list, return EOF. */
397            if (!ctx->list) {
398                if (ctx->hold_open) {
399                    return ctx->hold_open(ctx->hold_open_baton, bucket);
400                }
401                else {
402                    return APR_EOF;
403                }
404            }
405
406            /* we read something, so bail out and let the appl. read again. */
407            if (*len)
408                status = APR_SUCCESS;
409        }
410
411        /* continue with APR_SUCCESS or APR_EOF and no data read yet. */
412    } while (!*len && status != APR_EAGAIN);
413
414    return status;
415}
416
417static apr_status_t serf_aggregate_peek(serf_bucket_t *bucket,
418                                        const char **data,
419                                        apr_size_t *len)
420{
421    aggregate_context_t *ctx = bucket->data;
422    serf_bucket_t *head;
423    apr_status_t status;
424
425    cleanup_aggregate(ctx, bucket->allocator);
426
427    /* Peek the first bucket in the list, if any. */
428    if (!ctx->list) {
429        *len = 0;
430        if (ctx->hold_open) {
431            status = ctx->hold_open(ctx->hold_open_baton, bucket);
432            if (status == APR_EAGAIN)
433                status = APR_SUCCESS;
434            return status;
435        }
436        else {
437            return APR_EOF;
438        }
439    }
440
441    head = ctx->list->bucket;
442
443    status = serf_bucket_peek(head, data, len);
444
445    if (status == APR_EOF) {
446        if (ctx->list->next) {
447            status = APR_SUCCESS;
448        } else {
449            if (ctx->hold_open) {
450                status = ctx->hold_open(ctx->hold_open_baton, bucket);
451                if (status == APR_EAGAIN)
452                    status = APR_SUCCESS;
453                return status;
454            }
455        }
456    }
457
458    return status;
459}
460
461static serf_bucket_t * serf_aggregate_read_bucket(
462    serf_bucket_t *bucket,
463    const serf_bucket_type_t *type)
464{
465    aggregate_context_t *ctx = bucket->data;
466    serf_bucket_t *found_bucket;
467
468    if (!ctx->list) {
469        return NULL;
470    }
471
472    if (ctx->list->bucket->type == type) {
473        /* Got the bucket. Consume it from our list. */
474        found_bucket = ctx->list->bucket;
475        ctx->list = ctx->list->next;
476        return found_bucket;
477    }
478
479    /* Call read_bucket on first one in our list. */
480    return serf_bucket_read_bucket(ctx->list->bucket, type);
481}
482
483
484const serf_bucket_type_t serf_bucket_type_aggregate = {
485    "AGGREGATE",
486    serf_aggregate_read,
487    serf_aggregate_readline,
488    serf_aggregate_read_iovec,
489    serf_default_read_for_sendfile,
490    serf_aggregate_read_bucket,
491    serf_aggregate_peek,
492    serf_aggregate_destroy_and_data,
493};
494