spillbuf.c revision 362181
1/*
2 * spillbuf.c : an in-memory buffer that can spill to disk
3 *
4 * ====================================================================
5 *    Licensed to the Apache Software Foundation (ASF) under one
6 *    or more contributor license agreements.  See the NOTICE file
7 *    distributed with this work for additional information
8 *    regarding copyright ownership.  The ASF licenses this file
9 *    to you under the Apache License, Version 2.0 (the
10 *    "License"); you may not use this file except in compliance
11 *    with the License.  You may obtain a copy of the License at
12 *
13 *      http://www.apache.org/licenses/LICENSE-2.0
14 *
15 *    Unless required by applicable law or agreed to in writing,
16 *    software distributed under the License is distributed on an
17 *    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
18 *    KIND, either express or implied.  See the License for the
19 *    specific language governing permissions and limitations
20 *    under the License.
21 * ====================================================================
22 */
23
24#include <apr_file_io.h>
25
26#include "svn_io.h"
27#include "svn_pools.h"
28
29#include "private/svn_subr_private.h"
30
31
32struct memblock_t {
33  apr_size_t size;
34  char *data;
35
36  struct memblock_t *next;
37};
38
39
40struct svn_spillbuf_t {
41  /* Pool for allocating blocks and the spill file.  */
42  apr_pool_t *pool;
43
44  /* Size of in-memory blocks.  */
45  apr_size_t blocksize;
46
47  /* Maximum in-memory size; start spilling when we reach this size.  */
48  apr_size_t maxsize;
49
50  /* The amount of content in memory.  */
51  apr_size_t memory_size;
52
53  /* HEAD points to the first block of the linked list of buffers.
54     TAIL points to the last block, for quickly appending more blocks
55     to the overall list.  */
56  struct memblock_t *head;
57  struct memblock_t *tail;
58
59  /* Available blocks for storing pending data. These were allocated
60     previously, then the data consumed and returned to this list.  */
61  struct memblock_t *avail;
62
63  /* When a block is borrowed for reading, it is listed here.  */
64  struct memblock_t *out_for_reading;
65
66  /* Once MEMORY_SIZE exceeds SPILL_SIZE, then arriving content will be
67     appended to the (temporary) file indicated by SPILL.  */
68  apr_file_t *spill;
69
70  /* As we consume content from SPILL, this value indicates where we
71     will begin reading.  */
72  apr_off_t spill_start;
73
74  /* How much content remains in SPILL.  */
75  svn_filesize_t spill_size;
76
77  /* When false, do not delete the spill file when it is closed. */
78  svn_boolean_t delete_on_close;
79
80  /* When true, and the amount of data written to the spillbuf is
81     larger than MAXSIZE, all spillbuf contents will be written to the
82     spill file. */
83  svn_boolean_t spill_all_contents;
84
85  /* The directory in which the spill file is created. */
86  const char *dirpath;
87
88  /* The name of the temporary spill file. */
89  const char *filename;
90};
91
92
93struct svn_spillbuf_reader_t {
94  /* Embed the spill-buffer within the reader.  */
95  struct svn_spillbuf_t *buf;
96
97  /* When we read content from the underlying spillbuf, these fields store
98     the ptr/len pair. The ptr will be incremented as we "read" out of this
99     buffer since we don't have to retain the original pointer (it is
100     managed inside of the spillbuf).  */
101  const char *sb_ptr;
102  apr_size_t sb_len;
103
104  /* If a write comes in, then we may need to save content from our
105     borrowed buffer (since that buffer may be destroyed by our call into
106     the spillbuf code). Note that we retain the original pointer since
107     this buffer is allocated by the reader code and re-used. The SAVE_POS
108     field indicates the current position within this save buffer. The
109     SAVE_LEN field describes how much content is present.  */
110  char *save_ptr;
111  apr_size_t save_len;
112  apr_size_t save_pos;
113};
114
115
116/* Extended spillbuf initialization. */
117static void
118init_spillbuf_extended(svn_spillbuf_t *buf,
119                       apr_size_t blocksize,
120                       apr_size_t maxsize,
121                       svn_boolean_t delete_on_close,
122                       svn_boolean_t spill_all_contents,
123                       const char *dirpath,
124                       apr_pool_t *result_pool)
125{
126  buf->pool = result_pool;
127  buf->blocksize = blocksize;
128  buf->maxsize = maxsize;
129  buf->delete_on_close = delete_on_close;
130  buf->spill_all_contents = spill_all_contents;
131  buf->dirpath = dirpath;
132}
133
134/* Common constructor for initializing spillbufs.
135   Used by svn_spillbuf__create, svn_spilbuff__reader_create. */
136static void
137init_spillbuf(svn_spillbuf_t *buf,
138              apr_size_t blocksize,
139              apr_size_t maxsize,
140              apr_pool_t *result_pool)
141{
142  init_spillbuf_extended(buf, blocksize, maxsize,
143                         TRUE, FALSE, NULL,
144                         result_pool);
145}
146
147svn_spillbuf_t *
148svn_spillbuf__create(apr_size_t blocksize,
149                     apr_size_t maxsize,
150                     apr_pool_t *result_pool)
151{
152  svn_spillbuf_t *buf = apr_pcalloc(result_pool, sizeof(*buf));
153  init_spillbuf(buf, blocksize, maxsize, result_pool);
154  return buf;
155}
156
157
158svn_spillbuf_t *
159svn_spillbuf__create_extended(apr_size_t blocksize,
160                              apr_size_t maxsize,
161                              svn_boolean_t delete_on_close,
162                              svn_boolean_t spill_all_contents,
163                              const char *dirpath,
164                              apr_pool_t *result_pool)
165{
166  svn_spillbuf_t *buf = apr_pcalloc(result_pool, sizeof(*buf));
167  init_spillbuf_extended(buf, blocksize, maxsize,
168                         delete_on_close, spill_all_contents, dirpath,
169                         result_pool);
170  return buf;
171}
172
173svn_filesize_t
174svn_spillbuf__get_size(const svn_spillbuf_t *buf)
175{
176  return buf->memory_size + buf->spill_size;
177}
178
179svn_filesize_t
180svn_spillbuf__get_memory_size(const svn_spillbuf_t *buf)
181{
182  return buf->memory_size;
183}
184
185const char *
186svn_spillbuf__get_filename(const svn_spillbuf_t *buf)
187{
188  return buf->filename;
189}
190
191apr_file_t *
192svn_spillbuf__get_file(const svn_spillbuf_t *buf)
193{
194  return buf->spill;
195}
196
197/* Get a memblock from the spill-buffer. It will be the block that we
198   passed out for reading, come from the free list, or allocated.  */
199static struct memblock_t *
200get_buffer(svn_spillbuf_t *buf)
201{
202  struct memblock_t *mem = buf->out_for_reading;
203
204  if (mem != NULL)
205    {
206      buf->out_for_reading = NULL;
207      return mem;
208    }
209
210  if (buf->avail == NULL)
211    {
212      mem = apr_palloc(buf->pool, sizeof(*mem));
213      mem->data = apr_palloc(buf->pool, buf->blocksize);
214      return mem;
215    }
216
217  mem = buf->avail;
218  buf->avail = mem->next;
219  return mem;
220}
221
222
223/* Return MEM to the list of available buffers in BUF.  */
224static void
225return_buffer(svn_spillbuf_t *buf,
226              struct memblock_t *mem)
227{
228  mem->next = buf->avail;
229  buf->avail = mem;
230}
231
232
233svn_error_t *
234svn_spillbuf__write(svn_spillbuf_t *buf,
235                    const char *data,
236                    apr_size_t len,
237                    apr_pool_t *scratch_pool)
238{
239  struct memblock_t *mem;
240
241  /* We do not (yet) have a spill file, but the amount stored in memory
242     will grow too large. Create the file and place the pending data into
243     the temporary file.  */
244  if (buf->spill == NULL
245      && ((buf->maxsize - buf->memory_size) < len))
246    {
247      SVN_ERR(svn_io_open_unique_file3(&buf->spill,
248                                       &buf->filename,
249                                       buf->dirpath,
250                                       (buf->delete_on_close
251                                        ? svn_io_file_del_on_close
252                                        : svn_io_file_del_none),
253                                       buf->pool, scratch_pool));
254
255      /* Optionally write the memory contents into the file. */
256      if (buf->spill_all_contents)
257        {
258          mem = buf->head;
259          while (mem != NULL)
260            {
261              SVN_ERR(svn_io_file_write_full(buf->spill, mem->data, mem->size,
262                                             NULL, scratch_pool));
263              mem = mem->next;
264            }
265
266          /* Adjust the start offset for reading from the spill file.
267
268             This way, the first `buf->memory_size` bytes of data will
269             be read from the existing in-memory buffers, which makes
270             more sense than discarding the buffers and re-reading
271             data from the file. */
272          buf->spill_start = buf->memory_size;
273        }
274    }
275
276  /* Once a spill file has been constructed, then we need to put all
277     arriving data into the file. We will no longer attempt to hold it
278     in memory.  */
279  if (buf->spill != NULL)
280    {
281      apr_off_t output_unused = 0;  /* ### stupid API  */
282
283      /* Seek to the end of the spill file. We don't know if a read has
284         occurred since our last write, and moved the file position.  */
285      SVN_ERR(svn_io_file_seek(buf->spill,
286                               APR_END, &output_unused,
287                               scratch_pool));
288
289      SVN_ERR(svn_io_file_write_full(buf->spill, data, len,
290                                     NULL, scratch_pool));
291      buf->spill_size += len;
292
293      return SVN_NO_ERROR;
294    }
295
296  while (len > 0)
297    {
298      apr_size_t amt;
299
300      if (buf->tail == NULL || buf->tail->size == buf->blocksize)
301        {
302          /* There is no existing memblock (that may have space), or the
303             tail memblock has no space, so we need a new memblock.  */
304          mem = get_buffer(buf);
305          mem->size = 0;
306          mem->next = NULL;
307        }
308      else
309        {
310          mem = buf->tail;
311        }
312
313      /* Compute how much to write into the memblock.  */
314      amt = buf->blocksize - mem->size;
315      if (amt > len)
316        amt = len;
317
318      /* Copy some data into this memblock.  */
319      memcpy(&mem->data[mem->size], data, amt);
320      mem->size += amt;
321      data += amt;
322      len -= amt;
323
324      /* We need to record how much is buffered in memory. Once we reach
325         buf->maxsize (or thereabouts, it doesn't have to be precise), then
326         we'll switch to putting the content into a file.  */
327      buf->memory_size += amt;
328
329      /* Start a list of buffers, or (if we're not writing into the tail)
330         append to the end of the linked list of buffers.  */
331      if (buf->tail == NULL)
332        {
333          buf->head = mem;
334          buf->tail = mem;
335        }
336      else if (mem != buf->tail)
337        {
338          buf->tail->next = mem;
339          buf->tail = mem;
340        }
341    }
342
343  return SVN_NO_ERROR;
344}
345
346
347/* Return a memblock of content, if any is available. *mem will be NULL if
348   no further content is available. The memblock should eventually be
349   passed to return_buffer() (or stored into buf->out_for_reading which
350   will grab that block at the next get_buffer() call). */
351static svn_error_t *
352read_data(struct memblock_t **mem,
353          svn_spillbuf_t *buf,
354          apr_pool_t *scratch_pool)
355{
356  svn_error_t *err;
357
358  /* If we have some in-memory blocks, then return one.  */
359  if (buf->head != NULL)
360    {
361      *mem = buf->head;
362      if (buf->tail == *mem)
363        buf->head = buf->tail = NULL;
364      else
365        buf->head = (*mem)->next;
366
367      /* We're using less memory now. If we haven't hit the spill file,
368         then we may be able to keep using memory.  */
369      buf->memory_size -= (*mem)->size;
370
371      return SVN_NO_ERROR;
372    }
373
374  /* No file? Done.  */
375  if (buf->spill == NULL)
376    {
377      *mem = NULL;
378      return SVN_NO_ERROR;
379    }
380
381  /* Assume that the caller has seeked the spill file to the correct pos.  */
382
383  /* Get a buffer that we can read content into.  */
384  *mem = get_buffer(buf);
385  /* NOTE: mem's size/next are uninitialized.  */
386
387  if ((apr_uint64_t)buf->spill_size < (apr_uint64_t)buf->blocksize)
388    (*mem)->size = (apr_size_t)buf->spill_size;
389  else
390    (*mem)->size = buf->blocksize;  /* The size of (*mem)->data  */
391  (*mem)->next = NULL;
392
393  /* Read some data from the spill file into the memblock.  */
394  err = svn_io_file_read(buf->spill, (*mem)->data, &(*mem)->size,
395                         scratch_pool);
396  if (err)
397    {
398      return_buffer(buf, *mem);
399      return svn_error_trace(err);
400    }
401
402  /* Mark the data that we consumed from the spill file.  */
403  buf->spill_start += (*mem)->size;
404
405  /* Did we consume all the data from the spill file?  */
406  if ((buf->spill_size -= (*mem)->size) == 0)
407    {
408      /* Close and reset our spill file information.  */
409      SVN_ERR(svn_io_file_close(buf->spill, scratch_pool));
410      buf->spill = NULL;
411      buf->spill_start = 0;
412    }
413
414  /* *mem has been initialized. Done.  */
415  return SVN_NO_ERROR;
416}
417
418
419/* If the next read would consume data from the file, then seek to the
420   correct position.  */
421static svn_error_t *
422maybe_seek(svn_boolean_t *seeked,
423           const svn_spillbuf_t *buf,
424           apr_pool_t *scratch_pool)
425{
426  if (buf->head == NULL && buf->spill != NULL)
427    {
428      apr_off_t output_unused;
429
430      /* Seek to where we left off reading.  */
431      output_unused = buf->spill_start;  /* ### stupid API  */
432      SVN_ERR(svn_io_file_seek(buf->spill,
433                               APR_SET, &output_unused,
434                               scratch_pool));
435      if (seeked != NULL)
436        *seeked = TRUE;
437    }
438  else if (seeked != NULL)
439    {
440      *seeked = FALSE;
441    }
442
443  return SVN_NO_ERROR;
444}
445
446
447svn_error_t *
448svn_spillbuf__read(const char **data,
449                   apr_size_t *len,
450                   svn_spillbuf_t *buf,
451                   apr_pool_t *scratch_pool)
452{
453  struct memblock_t *mem;
454
455  /* Possibly seek... */
456  SVN_ERR(maybe_seek(NULL, buf, scratch_pool));
457
458  SVN_ERR(read_data(&mem, buf, scratch_pool));
459  if (mem == NULL)
460    {
461      *data = NULL;
462      *len = 0;
463    }
464  else
465    {
466      *data = mem->data;
467      *len = mem->size;
468
469      /* If a block was out for reading, then return it.  */
470      if (buf->out_for_reading != NULL)
471        return_buffer(buf, buf->out_for_reading);
472
473      /* Remember that we've passed this block out for reading.  */
474      buf->out_for_reading = mem;
475    }
476
477  return SVN_NO_ERROR;
478}
479
480
481svn_error_t *
482svn_spillbuf__process(svn_boolean_t *exhausted,
483                      svn_spillbuf_t *buf,
484                      svn_spillbuf_read_t read_func,
485                      void *read_baton,
486                      apr_pool_t *scratch_pool)
487{
488  svn_boolean_t has_seeked = FALSE;
489  apr_pool_t *iterpool = svn_pool_create(scratch_pool);
490
491  *exhausted = FALSE;
492
493  while (TRUE)
494    {
495      struct memblock_t *mem;
496      svn_error_t *err;
497      svn_boolean_t stop;
498
499      svn_pool_clear(iterpool);
500
501      /* If this call to read_data() will read from the spill file, and we
502         have not seek'd the file... then do it now.  */
503      if (!has_seeked)
504        SVN_ERR(maybe_seek(&has_seeked, buf, iterpool));
505
506      /* Get some content to pass to the read callback.  */
507      SVN_ERR(read_data(&mem, buf, iterpool));
508      if (mem == NULL)
509        {
510          *exhausted = TRUE;
511          break;
512        }
513
514      err = read_func(&stop, read_baton, mem->data, mem->size, iterpool);
515
516      return_buffer(buf, mem);
517
518      if (err)
519        return svn_error_trace(err);
520
521      /* If the callbacks told us to stop, then we're done for now.  */
522      if (stop)
523        break;
524    }
525
526  svn_pool_destroy(iterpool);
527  return SVN_NO_ERROR;
528}
529
530
531svn_spillbuf_reader_t *
532svn_spillbuf__reader_create(apr_size_t blocksize,
533                            apr_size_t maxsize,
534                            apr_pool_t *result_pool)
535{
536  svn_spillbuf_reader_t *sbr = apr_pcalloc(result_pool, sizeof(*sbr));
537  sbr->buf = svn_spillbuf__create(blocksize, maxsize, result_pool);
538  return sbr;
539}
540
541svn_error_t *
542svn_spillbuf__reader_read(apr_size_t *amt,
543                          svn_spillbuf_reader_t *reader,
544                          char *data,
545                          apr_size_t len,
546                          apr_pool_t *scratch_pool)
547{
548  if (len == 0)
549    return svn_error_create(SVN_ERR_INCORRECT_PARAMS, NULL, NULL);
550
551  *amt = 0;
552
553  while (len > 0)
554    {
555      apr_size_t copy_amt;
556
557      if (reader->save_len > 0)
558        {
559          /* We have some saved content, so use this first.  */
560
561          if (len < reader->save_len)
562            copy_amt = len;
563          else
564            copy_amt = reader->save_len;
565
566          memcpy(data, reader->save_ptr + reader->save_pos, copy_amt);
567          reader->save_pos += copy_amt;
568          reader->save_len -= copy_amt;
569        }
570      else
571        {
572          /* No saved content. We should now copy from spillbuf-provided
573             buffers of content.  */
574
575          /* We may need more content from the spillbuf.  */
576          if (reader->sb_len == 0)
577            {
578              SVN_ERR(svn_spillbuf__read(&reader->sb_ptr, &reader->sb_len,
579                                         reader->buf,
580                                         scratch_pool));
581
582              /* We've run out of content, so return with whatever has
583                 been copied into DATA and stored into AMT.  */
584              if (reader->sb_ptr == NULL)
585                {
586                  /* For safety, read() may not have set SB_LEN. We use it
587                     as an indicator, so it needs to be cleared.  */
588                  reader->sb_len = 0;
589                  return SVN_NO_ERROR;
590                }
591            }
592
593          if (len < reader->sb_len)
594            copy_amt = len;
595          else
596            copy_amt = reader->sb_len;
597
598          memcpy(data, reader->sb_ptr, copy_amt);
599          reader->sb_ptr += copy_amt;
600          reader->sb_len -= copy_amt;
601        }
602
603      data += copy_amt;
604      len -= copy_amt;
605      (*amt) += copy_amt;
606    }
607
608  return SVN_NO_ERROR;
609}
610
611
612svn_error_t *
613svn_spillbuf__reader_getc(char *c,
614                          svn_spillbuf_reader_t *reader,
615                          apr_pool_t *scratch_pool)
616{
617  apr_size_t amt;
618
619  SVN_ERR(svn_spillbuf__reader_read(&amt, reader, c, 1, scratch_pool));
620  if (amt == 0)
621    return svn_error_create(SVN_ERR_STREAM_UNEXPECTED_EOF, NULL, NULL);
622
623  return SVN_NO_ERROR;
624}
625
626
627svn_error_t *
628svn_spillbuf__reader_write(svn_spillbuf_reader_t *reader,
629                           const char *data,
630                           apr_size_t len,
631                           apr_pool_t *scratch_pool)
632{
633  /* If we have a buffer of content from the spillbuf, then we need to
634     move that content to a safe place.  */
635  if (reader->sb_len > 0)
636    {
637      if (reader->save_ptr == NULL)
638        reader->save_ptr = apr_palloc(reader->buf->pool,
639                                      reader->buf->blocksize);
640
641      memcpy(reader->save_ptr, reader->sb_ptr, reader->sb_len);
642      reader->save_len = reader->sb_len;
643      reader->save_pos = 0;
644
645      /* No more content in the spillbuf-borrowed buffer.  */
646      reader->sb_len = 0;
647    }
648
649  return svn_error_trace(svn_spillbuf__write(reader->buf, data, len,
650                                             scratch_pool));
651}
652
653
654struct spillbuf_baton
655{
656  svn_spillbuf_reader_t *reader;
657  apr_pool_t *scratch_pool;
658};
659
660
661static svn_error_t *
662read_handler_spillbuf(void *baton, char *buffer, apr_size_t *len)
663{
664  struct spillbuf_baton *sb = baton;
665
666  SVN_ERR(svn_spillbuf__reader_read(len, sb->reader, buffer, *len,
667                                    sb->scratch_pool));
668
669  svn_pool_clear(sb->scratch_pool);
670  return SVN_NO_ERROR;
671}
672
673
674static svn_error_t *
675write_handler_spillbuf(void *baton, const char *data, apr_size_t *len)
676{
677  struct spillbuf_baton *sb = baton;
678
679  SVN_ERR(svn_spillbuf__reader_write(sb->reader, data, *len,
680                                     sb->scratch_pool));
681
682  svn_pool_clear(sb->scratch_pool);
683  return SVN_NO_ERROR;
684}
685
686
687svn_stream_t *
688svn_stream__from_spillbuf(svn_spillbuf_t *buf,
689                          apr_pool_t *result_pool)
690{
691  svn_stream_t *stream;
692  struct spillbuf_baton *sb = apr_palloc(result_pool, sizeof(*sb));
693
694  sb->reader = apr_pcalloc(result_pool, sizeof(*sb->reader));
695  sb->reader->buf = buf;
696  sb->scratch_pool = svn_pool_create(result_pool);
697
698  stream = svn_stream_create(sb, result_pool);
699
700  svn_stream_set_read2(stream, NULL /* only full read support */,
701                       read_handler_spillbuf);
702  svn_stream_set_write(stream, write_handler_spillbuf);
703
704  return stream;
705}
706