1278307Srpaulo/////////////////////////////////////////////////////////////////////////////// 2278307Srpaulo// 3278307Srpaulo/// \file outqueue.c 4278307Srpaulo/// \brief Output queue handling in multithreaded coding 5278307Srpaulo// 6278307Srpaulo// Author: Lasse Collin 7278307Srpaulo// 8278307Srpaulo// This file has been put into the public domain. 9278307Srpaulo// You can do whatever you want with this file. 10278307Srpaulo// 11278307Srpaulo/////////////////////////////////////////////////////////////////////////////// 12278307Srpaulo 13278307Srpaulo#include "outqueue.h" 14278307Srpaulo 15278307Srpaulo 16278307Srpaulo/// This is to ease integer overflow checking: We may allocate up to 17278307Srpaulo/// 2 * LZMA_THREADS_MAX buffers and we need some extra memory for other 18278307Srpaulo/// data structures (that's the second /2). 19278307Srpaulo#define BUF_SIZE_MAX (UINT64_MAX / LZMA_THREADS_MAX / 2 / 2) 20278307Srpaulo 21278307Srpaulo 22278307Srpaulostatic lzma_ret 23278307Srpauloget_options(uint64_t *bufs_alloc_size, uint32_t *bufs_count, 24278307Srpaulo uint64_t buf_size_max, uint32_t threads) 25278307Srpaulo{ 26278307Srpaulo if (threads > LZMA_THREADS_MAX || buf_size_max > BUF_SIZE_MAX) 27278307Srpaulo return LZMA_OPTIONS_ERROR; 28278307Srpaulo 29278307Srpaulo // The number of buffers is twice the number of threads. 30278307Srpaulo // This wastes RAM but keeps the threads busy when buffers 31278307Srpaulo // finish out of order. 32278307Srpaulo // 33278307Srpaulo // NOTE: If this is changed, update BUF_SIZE_MAX too. 34278307Srpaulo *bufs_count = threads * 2; 35278307Srpaulo *bufs_alloc_size = *bufs_count * buf_size_max; 36278307Srpaulo 37278307Srpaulo return LZMA_OK; 38278307Srpaulo} 39278307Srpaulo 40278307Srpaulo 41278307Srpauloextern uint64_t 42278307Srpaulolzma_outq_memusage(uint64_t buf_size_max, uint32_t threads) 43278307Srpaulo{ 44278307Srpaulo uint64_t bufs_alloc_size; 45278307Srpaulo uint32_t bufs_count; 46278307Srpaulo 47278307Srpaulo if (get_options(&bufs_alloc_size, &bufs_count, buf_size_max, threads) 48278307Srpaulo != LZMA_OK) 49278307Srpaulo return UINT64_MAX; 50278307Srpaulo 51278307Srpaulo return sizeof(lzma_outq) + bufs_count * sizeof(lzma_outbuf) 52278307Srpaulo + bufs_alloc_size; 53278307Srpaulo} 54278307Srpaulo 55278307Srpaulo 56278307Srpauloextern lzma_ret 57278307Srpaulolzma_outq_init(lzma_outq *outq, const lzma_allocator *allocator, 58278307Srpaulo uint64_t buf_size_max, uint32_t threads) 59278307Srpaulo{ 60278307Srpaulo uint64_t bufs_alloc_size; 61278307Srpaulo uint32_t bufs_count; 62278307Srpaulo 63278307Srpaulo // Set bufs_count and bufs_alloc_size. 64278307Srpaulo return_if_error(get_options(&bufs_alloc_size, &bufs_count, 65278307Srpaulo buf_size_max, threads)); 66278307Srpaulo 67278307Srpaulo // Allocate memory if needed. 68278307Srpaulo if (outq->buf_size_max != buf_size_max 69278307Srpaulo || outq->bufs_allocated != bufs_count) { 70278307Srpaulo lzma_outq_end(outq, allocator); 71278307Srpaulo 72278307Srpaulo#if SIZE_MAX < UINT64_MAX 73278307Srpaulo if (bufs_alloc_size > SIZE_MAX) 74278307Srpaulo return LZMA_MEM_ERROR; 75278307Srpaulo#endif 76278307Srpaulo 77278307Srpaulo outq->bufs = lzma_alloc(bufs_count * sizeof(lzma_outbuf), 78278307Srpaulo allocator); 79278307Srpaulo outq->bufs_mem = lzma_alloc((size_t)(bufs_alloc_size), 80278307Srpaulo allocator); 81278307Srpaulo 82278307Srpaulo if (outq->bufs == NULL || outq->bufs_mem == NULL) { 83278307Srpaulo lzma_outq_end(outq, allocator); 84278307Srpaulo return LZMA_MEM_ERROR; 85278307Srpaulo } 86278307Srpaulo } 87278307Srpaulo 88278307Srpaulo // Initialize the rest of the main structure. Initialization of 89278307Srpaulo // outq->bufs[] is done when they are actually needed. 90278307Srpaulo outq->buf_size_max = (size_t)(buf_size_max); 91278307Srpaulo outq->bufs_allocated = bufs_count; 92278307Srpaulo outq->bufs_pos = 0; 93278307Srpaulo outq->bufs_used = 0; 94278307Srpaulo outq->read_pos = 0; 95278307Srpaulo 96278307Srpaulo return LZMA_OK; 97278307Srpaulo} 98278307Srpaulo 99278307Srpaulo 100278307Srpauloextern void 101278307Srpaulolzma_outq_end(lzma_outq *outq, const lzma_allocator *allocator) 102278307Srpaulo{ 103278307Srpaulo lzma_free(outq->bufs, allocator); 104278307Srpaulo outq->bufs = NULL; 105278307Srpaulo 106278307Srpaulo lzma_free(outq->bufs_mem, allocator); 107278307Srpaulo outq->bufs_mem = NULL; 108278307Srpaulo 109278307Srpaulo return; 110278307Srpaulo} 111278307Srpaulo 112278307Srpaulo 113278307Srpauloextern lzma_outbuf * 114278307Srpaulolzma_outq_get_buf(lzma_outq *outq) 115278307Srpaulo{ 116278307Srpaulo // Caller must have checked it with lzma_outq_has_buf(). 117278307Srpaulo assert(outq->bufs_used < outq->bufs_allocated); 118278307Srpaulo 119278307Srpaulo // Initialize the new buffer. 120278307Srpaulo lzma_outbuf *buf = &outq->bufs[outq->bufs_pos]; 121278307Srpaulo buf->buf = outq->bufs_mem + outq->bufs_pos * outq->buf_size_max; 122278307Srpaulo buf->size = 0; 123278307Srpaulo buf->finished = false; 124278307Srpaulo 125278307Srpaulo // Update the queue state. 126278307Srpaulo if (++outq->bufs_pos == outq->bufs_allocated) 127278307Srpaulo outq->bufs_pos = 0; 128278307Srpaulo 129278307Srpaulo ++outq->bufs_used; 130278307Srpaulo 131278307Srpaulo return buf; 132278307Srpaulo} 133278307Srpaulo 134278307Srpaulo 135278307Srpauloextern bool 136278307Srpaulolzma_outq_is_readable(const lzma_outq *outq) 137278307Srpaulo{ 138278307Srpaulo uint32_t i = outq->bufs_pos - outq->bufs_used; 139278307Srpaulo if (outq->bufs_pos < outq->bufs_used) 140278307Srpaulo i += outq->bufs_allocated; 141278307Srpaulo 142278307Srpaulo return outq->bufs[i].finished; 143278307Srpaulo} 144278307Srpaulo 145278307Srpaulo 146278307Srpauloextern lzma_ret 147278307Srpaulolzma_outq_read(lzma_outq *restrict outq, uint8_t *restrict out, 148278307Srpaulo size_t *restrict out_pos, size_t out_size, 149278307Srpaulo lzma_vli *restrict unpadded_size, 150278307Srpaulo lzma_vli *restrict uncompressed_size) 151278307Srpaulo{ 152278307Srpaulo // There must be at least one buffer from which to read. 153278307Srpaulo if (outq->bufs_used == 0) 154278307Srpaulo return LZMA_OK; 155278307Srpaulo 156278307Srpaulo // Get the buffer. 157278307Srpaulo uint32_t i = outq->bufs_pos - outq->bufs_used; 158278307Srpaulo if (outq->bufs_pos < outq->bufs_used) 159278307Srpaulo i += outq->bufs_allocated; 160278307Srpaulo 161278307Srpaulo lzma_outbuf *buf = &outq->bufs[i]; 162278307Srpaulo 163278307Srpaulo // If it isn't finished yet, we cannot read from it. 164278307Srpaulo if (!buf->finished) 165278307Srpaulo return LZMA_OK; 166278307Srpaulo 167278307Srpaulo // Copy from the buffer to output. 168278307Srpaulo lzma_bufcpy(buf->buf, &outq->read_pos, buf->size, 169278307Srpaulo out, out_pos, out_size); 170278307Srpaulo 171278307Srpaulo // Return if we didn't get all the data from the buffer. 172278307Srpaulo if (outq->read_pos < buf->size) 173278307Srpaulo return LZMA_OK; 174278307Srpaulo 175278307Srpaulo // The buffer was finished. Tell the caller its size information. 176278307Srpaulo *unpadded_size = buf->unpadded_size; 177278307Srpaulo *uncompressed_size = buf->uncompressed_size; 178278307Srpaulo 179278307Srpaulo // Free this buffer for further use. 180278307Srpaulo --outq->bufs_used; 181278307Srpaulo outq->read_pos = 0; 182278307Srpaulo 183278307Srpaulo return LZMA_STREAM_END; 184278307Srpaulo} 185