deflate_buckets.c revision 362181
1/* ====================================================================
2 *    Licensed to the Apache Software Foundation (ASF) under one
3 *    or more contributor license agreements.  See the NOTICE file
4 *    distributed with this work for additional information
5 *    regarding copyright ownership.  The ASF licenses this file
6 *    to you under the Apache License, Version 2.0 (the
7 *    "License"); you may not use this file except in compliance
8 *    with the License.  You may obtain a copy of the License at
9 *
10 *      http://www.apache.org/licenses/LICENSE-2.0
11 *
12 *    Unless required by applicable law or agreed to in writing,
13 *    software distributed under the License is distributed on an
14 *    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 *    KIND, either express or implied.  See the License for the
16 *    specific language governing permissions and limitations
17 *    under the License.
18 * ====================================================================
19 */
20
21#include <apr_strings.h>
22
23#include <zlib.h>
24
25/* This conditional isn't defined anywhere yet. */
26#ifdef HAVE_ZUTIL_H
27#include <zutil.h>
28#endif
29
30#include "serf.h"
31#include "serf_bucket_util.h"
32
/* The two identification bytes that must open the gzip header
 * (0x1f 0x8b, see RFC 1952).  The table is only ever compared against,
 * never written, so it is const.
 */
static const char deflate_magic[2] = { '\037', '\213' };

/* Number of gzip header bytes that are read before inflating starts. */
#define DEFLATE_MAGIC_SIZE 10
/* Number of gzip trailer bytes: a 4-byte CRC followed by a 4-byte length. */
#define DEFLATE_VERIFY_SIZE 8
/* Capacity, in bytes, of the buffer that receives inflated output. */
#define DEFLATE_BUFFER_SIZE 8096

/* Handed to inflateInit2() as windowBits.  Per the zlib manual a negative
 * value selects raw deflate data (no zlib header or trailer).
 */
static const int DEFLATE_WINDOW_SIZE = -15;
/* Stored in the bucket context as memLevel when a bucket is created. */
static const int DEFLATE_MEMLEVEL = 9;
41
42typedef struct {
43    serf_bucket_t *stream;
44    serf_bucket_t *inflate_stream;
45
46    int format;                 /* Are we 'deflate' or 'gzip'? */
47
48    enum {
49        STATE_READING_HEADER,   /* reading the gzip header */
50        STATE_HEADER,           /* read the gzip header */
51        STATE_INIT,             /* init'ing zlib functions */
52        STATE_INFLATE,          /* inflating the content now */
53        STATE_READING_VERIFY,   /* reading the final gzip CRC */
54        STATE_VERIFY,           /* verifying the final gzip CRC */
55        STATE_FINISH,           /* clean up after reading body */
56        STATE_DONE,             /* body is done; we'll return EOF here */
57    } state;
58
59    z_stream zstream;
60    char hdr_buffer[DEFLATE_MAGIC_SIZE];
61    unsigned char buffer[DEFLATE_BUFFER_SIZE];
62    unsigned long crc;
63    int windowSize;
64    int memLevel;
65    int bufferSize;
66
67    /* How much of the chunk, or the terminator, do we have left to read? */
68    apr_size_t stream_left;
69
70    /* How much are we supposed to read? */
71    apr_size_t stream_size;
72
73    int stream_status; /* What was the last status we read? */
74
75} deflate_context_t;
76
/* Decode the four bytes at STRING as a little-endian 32-bit quantity
 * (string[0] is the least significant byte) and return the value.
 */
static unsigned long getLong(unsigned char *string)
{
    unsigned long value = 0;
    int idx;

    /* Fold in the bytes from most significant to least significant. */
    for (idx = 3; idx >= 0; idx--) {
        value = (value << 8) | (unsigned long)string[idx];
    }

    return value;
}
85
86serf_bucket_t *serf_bucket_deflate_create(
87    serf_bucket_t *stream,
88    serf_bucket_alloc_t *allocator,
89    int format)
90{
91    deflate_context_t *ctx;
92
93    ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
94    ctx->stream = stream;
95    ctx->stream_status = APR_SUCCESS;
96    ctx->inflate_stream = serf_bucket_aggregate_create(allocator);
97    ctx->format = format;
98    ctx->crc = 0;
99    /* zstream must be NULL'd out. */
100    memset(&ctx->zstream, 0, sizeof(ctx->zstream));
101
102    switch (ctx->format) {
103        case SERF_DEFLATE_GZIP:
104            ctx->state = STATE_READING_HEADER;
105            break;
106        case SERF_DEFLATE_DEFLATE:
107            /* deflate doesn't have a header. */
108            ctx->state = STATE_INIT;
109            break;
110        default:
111            /* Not reachable */
112            return NULL;
113    }
114
115    /* Initial size of gzip header. */
116    ctx->stream_left = ctx->stream_size = DEFLATE_MAGIC_SIZE;
117
118    ctx->windowSize = DEFLATE_WINDOW_SIZE;
119    ctx->memLevel = DEFLATE_MEMLEVEL;
120    ctx->bufferSize = DEFLATE_BUFFER_SIZE;
121
122    return serf_bucket_create(&serf_bucket_type_deflate, allocator, ctx);
123}
124
125static void serf_deflate_destroy_and_data(serf_bucket_t *bucket)
126{
127    deflate_context_t *ctx = bucket->data;
128
129    if (ctx->state > STATE_INIT &&
130        ctx->state <= STATE_FINISH)
131        inflateEnd(&ctx->zstream);
132
133    /* We may have appended inflate_stream into the stream bucket.
134     * If so, avoid free'ing it twice.
135     */
136    if (ctx->inflate_stream) {
137        serf_bucket_destroy(ctx->inflate_stream);
138    }
139    serf_bucket_destroy(ctx->stream);
140
141    serf_default_destroy_and_data(bucket);
142}
143
144static apr_status_t serf_deflate_read(serf_bucket_t *bucket,
145                                      apr_size_t requested,
146                                      const char **data, apr_size_t *len)
147{
148    deflate_context_t *ctx = bucket->data;
149    apr_status_t status;
150    const char *private_data;
151    apr_size_t private_len;
152    int zRC;
153
154    while (1) {
155        switch (ctx->state) {
156        case STATE_READING_HEADER:
157        case STATE_READING_VERIFY:
158            status = serf_bucket_read(ctx->stream, ctx->stream_left,
159                                      &private_data, &private_len);
160
161            if (SERF_BUCKET_READ_ERROR(status)) {
162                return status;
163            }
164
165            memcpy(ctx->hdr_buffer + (ctx->stream_size - ctx->stream_left),
166                   private_data, private_len);
167
168            ctx->stream_left -= private_len;
169
170            if (ctx->stream_left == 0) {
171                ctx->state++;
172                if (APR_STATUS_IS_EAGAIN(status)) {
173                    *len = 0;
174                    return status;
175                }
176            }
177            else if (status) {
178                *len = 0;
179                return status;
180            }
181            break;
182        case STATE_HEADER:
183            if (ctx->hdr_buffer[0] != deflate_magic[0] ||
184                ctx->hdr_buffer[1] != deflate_magic[1]) {
185                return SERF_ERROR_DECOMPRESSION_FAILED;
186            }
187            if (ctx->hdr_buffer[3] != 0) {
188                return SERF_ERROR_DECOMPRESSION_FAILED;
189            }
190            ctx->state++;
191            break;
192        case STATE_VERIFY:
193        {
194            unsigned long compCRC, compLen, actualLen;
195
196            /* Do the checksum computation. */
197            compCRC = getLong((unsigned char*)ctx->hdr_buffer);
198            if (ctx->crc != compCRC) {
199                return SERF_ERROR_DECOMPRESSION_FAILED;
200            }
201            compLen = getLong((unsigned char*)ctx->hdr_buffer + 4);
202            /* The length in the trailer is module 2^32, so do the same for
203               the actual length. */
204            actualLen = ctx->zstream.total_out;
205            actualLen &= 0xFFFFFFFF;
206            if (actualLen != compLen) {
207                return SERF_ERROR_DECOMPRESSION_FAILED;
208            }
209            ctx->state++;
210            break;
211        }
212        case STATE_INIT:
213            zRC = inflateInit2(&ctx->zstream, ctx->windowSize);
214            if (zRC != Z_OK) {
215                return SERF_ERROR_DECOMPRESSION_FAILED;
216            }
217            ctx->zstream.next_out = ctx->buffer;
218            ctx->zstream.avail_out = ctx->bufferSize;
219            ctx->state++;
220            break;
221        case STATE_FINISH:
222            inflateEnd(&ctx->zstream);
223            serf_bucket_aggregate_prepend(ctx->stream, ctx->inflate_stream);
224            ctx->inflate_stream = 0;
225            ctx->state++;
226            break;
227        case STATE_INFLATE:
228            /* Do we have anything already uncompressed to read? */
229            status = serf_bucket_read(ctx->inflate_stream, requested, data,
230                                      len);
231            if (SERF_BUCKET_READ_ERROR(status)) {
232                return status;
233            }
234            /* Hide EOF. */
235            if (APR_STATUS_IS_EOF(status)) {
236                status = ctx->stream_status;
237                if (APR_STATUS_IS_EOF(status)) {
238                    /* We've read all of the data from our stream, but we
239                     * need to continue to iterate until we flush
240                     * out the zlib buffer.
241                     */
242                    status = APR_SUCCESS;
243                }
244            }
245            if (*len != 0) {
246                return status;
247            }
248
249            /* We tried; but we have nothing buffered. Fetch more. */
250
251            /* It is possible that we maxed out avail_out before
252             * exhausting avail_in; therefore, continue using the
253             * previous buffer.  Otherwise, fetch more data from
254             * our stream bucket.
255             */
256            if (ctx->zstream.avail_in == 0) {
257                /* When we empty our inflated stream, we'll return this
258                 * status - this allow us to eventually pass up EAGAINs.
259                 */
260                ctx->stream_status = serf_bucket_read(ctx->stream,
261                                                      ctx->bufferSize,
262                                                      &private_data,
263                                                      &private_len);
264
265                if (SERF_BUCKET_READ_ERROR(ctx->stream_status)) {
266                    return ctx->stream_status;
267                }
268
269                if (!private_len && APR_STATUS_IS_EAGAIN(ctx->stream_status)) {
270                    *len = 0;
271                    status = ctx->stream_status;
272                    ctx->stream_status = APR_SUCCESS;
273                    return status;
274                }
275
276                ctx->zstream.next_in = (unsigned char*)private_data;
277                ctx->zstream.avail_in = private_len;
278            }
279
280            while (1) {
281
282                zRC = inflate(&ctx->zstream, Z_NO_FLUSH);
283
284                /* We're full or zlib requires more space. Either case, clear
285                   out our buffer, reset, and return. */
286                if (zRC == Z_BUF_ERROR || ctx->zstream.avail_out == 0) {
287                    serf_bucket_t *tmp;
288                    ctx->zstream.next_out = ctx->buffer;
289                    private_len = ctx->bufferSize - ctx->zstream.avail_out;
290
291                    ctx->crc = crc32(ctx->crc, (const Bytef *)ctx->buffer,
292                                     private_len);
293
294                    /* FIXME: There probably needs to be a free func. */
295                    tmp = SERF_BUCKET_SIMPLE_STRING_LEN((char *)ctx->buffer,
296                                                        private_len,
297                                                        bucket->allocator);
298                    serf_bucket_aggregate_append(ctx->inflate_stream, tmp);
299                    ctx->zstream.avail_out = ctx->bufferSize;
300                    break;
301                }
302
303                if (zRC == Z_STREAM_END) {
304                    serf_bucket_t *tmp;
305
306                    private_len = ctx->bufferSize - ctx->zstream.avail_out;
307                    ctx->crc = crc32(ctx->crc, (const Bytef *)ctx->buffer,
308                                     private_len);
309                    /* FIXME: There probably needs to be a free func. */
310                    tmp = SERF_BUCKET_SIMPLE_STRING_LEN((char *)ctx->buffer,
311                                                        private_len,
312                                                        bucket->allocator);
313                    serf_bucket_aggregate_append(ctx->inflate_stream, tmp);
314
315                    ctx->zstream.avail_out = ctx->bufferSize;
316
317                    /* Push back the remaining data to be read. */
318                    tmp = serf_bucket_aggregate_create(bucket->allocator);
319                    serf_bucket_aggregate_prepend(tmp, ctx->stream);
320                    ctx->stream = tmp;
321
322                    /* We now need to take the remaining avail_in and
323                     * throw it in ctx->stream so our next read picks it up.
324                     */
325                    tmp = SERF_BUCKET_SIMPLE_STRING_LEN(
326                                        (const char*)ctx->zstream.next_in,
327                                                     ctx->zstream.avail_in,
328                                                     bucket->allocator);
329                    serf_bucket_aggregate_prepend(ctx->stream, tmp);
330
331                    switch (ctx->format) {
332                    case SERF_DEFLATE_GZIP:
333                        ctx->stream_left = ctx->stream_size =
334                            DEFLATE_VERIFY_SIZE;
335                        ctx->state++;
336                        break;
337                    case SERF_DEFLATE_DEFLATE:
338                        /* Deflate does not have a verify footer. */
339                        ctx->state = STATE_FINISH;
340                        break;
341                    default:
342                        /* Not reachable */
343                        return APR_EGENERAL;
344                    }
345
346                    break;
347                }
348
349                /* Any other error? */
350                if (zRC != Z_OK) {
351                    return SERF_ERROR_DECOMPRESSION_FAILED;
352                }
353
354                /* As long as zRC == Z_OK, just keep looping. */
355            }
356            /* Okay, we've inflated.  Try to read. */
357            status = serf_bucket_read(ctx->inflate_stream, requested, data,
358                                      len);
359            /* Hide EOF. */
360            if (APR_STATUS_IS_EOF(status)) {
361                status = ctx->stream_status;
362
363                /* If the inflation wasn't finished, return APR_SUCCESS. */
364                if (zRC != Z_STREAM_END)
365                    return APR_SUCCESS;
366
367                /* If our stream is finished too and all data was inflated,
368                 * return SUCCESS so we'll iterate one more time.
369                 */
370                if (APR_STATUS_IS_EOF(status)) {
371                    /* No more data to read from the stream, and everything
372                       inflated. If all data was received correctly, state
373                       should have been advanced to STATE_READING_VERIFY or
374                       STATE_FINISH. If not, then the data was incomplete
375                       and we have an error. */
376                    if (ctx->state != STATE_INFLATE)
377                        return APR_SUCCESS;
378                    else
379                        return SERF_ERROR_DECOMPRESSION_FAILED;
380                }
381            }
382            return status;
383        case STATE_DONE:
384            /* We're done inflating.  Use our finished buffer. */
385            return serf_bucket_read(ctx->stream, requested, data, len);
386        default:
387            /* Not reachable */
388            return APR_EGENERAL;
389        }
390    }
391
392    /* NOTREACHED */
393}
394
395/* ### need to implement */
/* Until they are implemented, the readline and peek entries of the type
 * table below expand to NULL, i.e. there is no function behind them.
 * NOTE(review): a caller invoking readline or peek on a deflate bucket would
 * presumably go through a NULL function pointer - confirm how callers
 * dispatch on this table.
 */
396#define serf_deflate_readline NULL
397#define serf_deflate_peek NULL
398
/* Bucket type descriptor for the deflate bucket; its address is passed to
 * serf_bucket_create() when a deflate bucket is created.  The entries are
 * positional, so their order has to match the declaration of
 * serf_bucket_type_t (declared outside this file).  Only the read and
 * destroy entries are implemented here; the serf_default_* entries are
 * shared helpers declared elsewhere.
 */
399const serf_bucket_type_t serf_bucket_type_deflate = {
400    "DEFLATE",
401    serf_deflate_read,
402    serf_deflate_readline,
403    serf_default_read_iovec,
404    serf_default_read_for_sendfile,
405    serf_default_read_bucket,
406    serf_deflate_peek,
407    serf_deflate_destroy_and_data,
408};
409