1// SPDX-License-Identifier: 0BSD
2
3///////////////////////////////////////////////////////////////////////////////
4//
5/// \file       outqueue.c
6/// \brief      Output queue handling in multithreaded coding
7//
8//  Author:     Lasse Collin
9//
10///////////////////////////////////////////////////////////////////////////////
11
12#include "outqueue.h"
13
14
/// Upper bound on the number of output buffers that may be allocated for
/// a given number of worker threads. At the moment this is two buffers per
/// thread, which is a compromise between RAM usage and keeping the worker
/// threads busy when buffers finish out of order.
#define GET_BUFS_LIMIT(threads) ((threads) * 2)
20
21
22extern uint64_t
23lzma_outq_memusage(uint64_t buf_size_max, uint32_t threads)
24{
25	// This is to ease integer overflow checking: We may allocate up to
26	// GET_BUFS_LIMIT(LZMA_THREADS_MAX) buffers and we need some extra
27	// memory for other data structures too (that's the /2).
28	//
29	// lzma_outq_prealloc_buf() will still accept bigger buffers than this.
30	const uint64_t limit
31			= UINT64_MAX / GET_BUFS_LIMIT(LZMA_THREADS_MAX) / 2;
32
33	if (threads > LZMA_THREADS_MAX || buf_size_max > limit)
34		return UINT64_MAX;
35
36	return GET_BUFS_LIMIT(threads)
37			* lzma_outq_outbuf_memusage(buf_size_max);
38}
39
40
41static void
42move_head_to_cache(lzma_outq *outq, const lzma_allocator *allocator)
43{
44	assert(outq->head != NULL);
45	assert(outq->tail != NULL);
46	assert(outq->bufs_in_use > 0);
47
48	lzma_outbuf *buf = outq->head;
49	outq->head = buf->next;
50	if (outq->head == NULL)
51		outq->tail = NULL;
52
53	if (outq->cache != NULL && outq->cache->allocated != buf->allocated)
54		lzma_outq_clear_cache(outq, allocator);
55
56	buf->next = outq->cache;
57	outq->cache = buf;
58
59	--outq->bufs_in_use;
60	outq->mem_in_use -= lzma_outq_outbuf_memusage(buf->allocated);
61
62	return;
63}
64
65
66static void
67free_one_cached_buffer(lzma_outq *outq, const lzma_allocator *allocator)
68{
69	assert(outq->cache != NULL);
70
71	lzma_outbuf *buf = outq->cache;
72	outq->cache = buf->next;
73
74	--outq->bufs_allocated;
75	outq->mem_allocated -= lzma_outq_outbuf_memusage(buf->allocated);
76
77	lzma_free(buf, allocator);
78	return;
79}
80
81
82extern void
83lzma_outq_clear_cache(lzma_outq *outq, const lzma_allocator *allocator)
84{
85	while (outq->cache != NULL)
86		free_one_cached_buffer(outq, allocator);
87
88	return;
89}
90
91
92extern void
93lzma_outq_clear_cache2(lzma_outq *outq, const lzma_allocator *allocator,
94		size_t keep_size)
95{
96	if (outq->cache == NULL)
97		return;
98
99	// Free all but one.
100	while (outq->cache->next != NULL)
101		free_one_cached_buffer(outq, allocator);
102
103	// Free the last one only if its size doesn't equal to keep_size.
104	if (outq->cache->allocated != keep_size)
105		free_one_cached_buffer(outq, allocator);
106
107	return;
108}
109
110
111extern lzma_ret
112lzma_outq_init(lzma_outq *outq, const lzma_allocator *allocator,
113		uint32_t threads)
114{
115	if (threads > LZMA_THREADS_MAX)
116		return LZMA_OPTIONS_ERROR;
117
118	const uint32_t bufs_limit = GET_BUFS_LIMIT(threads);
119
120	// Clear head/tail.
121	while (outq->head != NULL)
122		move_head_to_cache(outq, allocator);
123
124	// If new buf_limit is lower than the old one, we may need to free
125	// a few cached buffers.
126	while (bufs_limit < outq->bufs_allocated)
127		free_one_cached_buffer(outq, allocator);
128
129	outq->bufs_limit = bufs_limit;
130	outq->read_pos = 0;
131
132	return LZMA_OK;
133}
134
135
136extern void
137lzma_outq_end(lzma_outq *outq, const lzma_allocator *allocator)
138{
139	while (outq->head != NULL)
140		move_head_to_cache(outq, allocator);
141
142	lzma_outq_clear_cache(outq, allocator);
143	return;
144}
145
146
147extern lzma_ret
148lzma_outq_prealloc_buf(lzma_outq *outq, const lzma_allocator *allocator,
149		size_t size)
150{
151	// Caller must have checked it with lzma_outq_has_buf().
152	assert(outq->bufs_in_use < outq->bufs_limit);
153
154	// If there already is appropriately-sized buffer in the cache,
155	// we need to do nothing.
156	if (outq->cache != NULL && outq->cache->allocated == size)
157		return LZMA_OK;
158
159	if (size > SIZE_MAX - sizeof(lzma_outbuf))
160		return LZMA_MEM_ERROR;
161
162	const size_t alloc_size = lzma_outq_outbuf_memusage(size);
163
164	// The cache may have buffers but their size is wrong.
165	lzma_outq_clear_cache(outq, allocator);
166
167	outq->cache = lzma_alloc(alloc_size, allocator);
168	if (outq->cache == NULL)
169		return LZMA_MEM_ERROR;
170
171	outq->cache->next = NULL;
172	outq->cache->allocated = size;
173
174	++outq->bufs_allocated;
175	outq->mem_allocated += alloc_size;
176
177	return LZMA_OK;
178}
179
180
181extern lzma_outbuf *
182lzma_outq_get_buf(lzma_outq *outq, void *worker)
183{
184	// Caller must have used lzma_outq_prealloc_buf() to ensure these.
185	assert(outq->bufs_in_use < outq->bufs_limit);
186	assert(outq->bufs_in_use < outq->bufs_allocated);
187	assert(outq->cache != NULL);
188
189	lzma_outbuf *buf = outq->cache;
190	outq->cache = buf->next;
191	buf->next = NULL;
192
193	if (outq->tail != NULL) {
194		assert(outq->head != NULL);
195		outq->tail->next = buf;
196	} else {
197		assert(outq->head == NULL);
198		outq->head = buf;
199	}
200
201	outq->tail = buf;
202
203	buf->worker = worker;
204	buf->finished = false;
205	buf->finish_ret = LZMA_STREAM_END;
206	buf->pos = 0;
207	buf->decoder_in_pos = 0;
208
209	buf->unpadded_size = 0;
210	buf->uncompressed_size = 0;
211
212	++outq->bufs_in_use;
213	outq->mem_in_use += lzma_outq_outbuf_memusage(buf->allocated);
214
215	return buf;
216}
217
218
219extern bool
220lzma_outq_is_readable(const lzma_outq *outq)
221{
222	if (outq->head == NULL)
223		return false;
224
225	return outq->read_pos < outq->head->pos || outq->head->finished;
226}
227
228
229extern lzma_ret
230lzma_outq_read(lzma_outq *restrict outq,
231		const lzma_allocator *restrict allocator,
232		uint8_t *restrict out, size_t *restrict out_pos,
233		size_t out_size,
234		lzma_vli *restrict unpadded_size,
235		lzma_vli *restrict uncompressed_size)
236{
237	// There must be at least one buffer from which to read.
238	if (outq->bufs_in_use == 0)
239		return LZMA_OK;
240
241	// Get the buffer.
242	lzma_outbuf *buf = outq->head;
243
244	// Copy from the buffer to output.
245	//
246	// FIXME? In threaded decoder it may be bad to do this copy while
247	// the mutex is being held.
248	lzma_bufcpy(buf->buf, &outq->read_pos, buf->pos,
249			out, out_pos, out_size);
250
251	// Return if we didn't get all the data from the buffer.
252	if (!buf->finished || outq->read_pos < buf->pos)
253		return LZMA_OK;
254
255	// The buffer was finished. Tell the caller its size information.
256	if (unpadded_size != NULL)
257		*unpadded_size = buf->unpadded_size;
258
259	if (uncompressed_size != NULL)
260		*uncompressed_size = buf->uncompressed_size;
261
262	// Remember the return value.
263	const lzma_ret finish_ret = buf->finish_ret;
264
265	// Free this buffer for further use.
266	move_head_to_cache(outq, allocator);
267	outq->read_pos = 0;
268
269	return finish_ret;
270}
271
272
273extern void
274lzma_outq_enable_partial_output(lzma_outq *outq,
275		void (*enable_partial_output)(void *worker))
276{
277	if (outq->head != NULL && !outq->head->finished
278			&& outq->head->worker != NULL) {
279		enable_partial_output(outq->head->worker);
280
281		// Set it to NULL since calling it twice is pointless.
282		outq->head->worker = NULL;
283	}
284
285	return;
286}
287