///////////////////////////////////////////////////////////////////////////////
//
/// \file       stream_encoder_mt.c
/// \brief      Multithreaded .xz Stream encoder
//
//  Author:     Lasse Collin
//
//  This file has been put into the public domain.
//  You can do whatever you want with this file.
//
///////////////////////////////////////////////////////////////////////////////
12278307Srpaulo
13278307Srpaulo#include "filter_encoder.h"
14278307Srpaulo#include "easy_preset.h"
15278307Srpaulo#include "block_encoder.h"
16278307Srpaulo#include "block_buffer_encoder.h"
17278307Srpaulo#include "index_encoder.h"
18278307Srpaulo#include "outqueue.h"
19278307Srpaulo
20278307Srpaulo
/// Largest block size that is accepted. Having an upper bound makes it
/// simpler to guard against integer overflows when an unusually large
/// block size is requested.
#define BLOCK_SIZE_MAX (UINT64_MAX / LZMA_THREADS_MAX)
24278307Srpaulo
25278307Srpaulo
/// State of a worker thread.
///
/// The numeric order of the values matters: the code in this file tests
/// for "still encoding" with `state <= THR_FINISH` and for "asked to stop
/// or exit" with `state >= THR_STOP`.
typedef enum {
	/// The thread has nothing to do and is waiting for work.
	THR_IDLE = 0,

	/// A Block is being encoded.
	THR_RUN = 1,

	/// A Block is being encoded but no more input data will
	/// be read.
	THR_FINISH = 2,

	/// The main thread wants the thread to abandon whatever it was
	/// doing, but the thread must stay alive.
	THR_STOP = 3,

	/// The main thread wants the thread to exit. Thread cancellation
	/// could be used instead, but since stop requests have to be
	/// handled anyway, a state value is the lazier solution.
	THR_EXIT = 4,

} worker_state;
46278307Srpaulo
47312518Sdelphijtypedef struct lzma_stream_coder_s lzma_stream_coder;
48278307Srpaulo
49278307Srpaulotypedef struct worker_thread_s worker_thread;
50278307Srpaulostruct worker_thread_s {
51278307Srpaulo	worker_state state;
52278307Srpaulo
53278307Srpaulo	/// Input buffer of coder->block_size bytes. The main thread will
54278307Srpaulo	/// put new input into this and update in_size accordingly. Once
55278307Srpaulo	/// no more input is coming, state will be set to THR_FINISH.
56278307Srpaulo	uint8_t *in;
57278307Srpaulo
58278307Srpaulo	/// Amount of data available in the input buffer. This is modified
59278307Srpaulo	/// only by the main thread.
60278307Srpaulo	size_t in_size;
61278307Srpaulo
62278307Srpaulo	/// Output buffer for this thread. This is set by the main
63278307Srpaulo	/// thread every time a new Block is started with this thread
64278307Srpaulo	/// structure.
65278307Srpaulo	lzma_outbuf *outbuf;
66278307Srpaulo
67278307Srpaulo	/// Pointer to the main structure is needed when putting this
68278307Srpaulo	/// thread back to the stack of free threads.
69312518Sdelphij	lzma_stream_coder *coder;
70278307Srpaulo
71278307Srpaulo	/// The allocator is set by the main thread. Since a copy of the
72278307Srpaulo	/// pointer is kept here, the application must not change the
73278307Srpaulo	/// allocator before calling lzma_end().
74278307Srpaulo	const lzma_allocator *allocator;
75278307Srpaulo
76278307Srpaulo	/// Amount of uncompressed data that has already been compressed.
77278307Srpaulo	uint64_t progress_in;
78278307Srpaulo
79278307Srpaulo	/// Amount of compressed data that is ready.
80278307Srpaulo	uint64_t progress_out;
81278307Srpaulo
82278307Srpaulo	/// Block encoder
83278307Srpaulo	lzma_next_coder block_encoder;
84278307Srpaulo
85278307Srpaulo	/// Compression options for this Block
86278307Srpaulo	lzma_block block_options;
87278307Srpaulo
88278307Srpaulo	/// Next structure in the stack of free worker threads.
89278307Srpaulo	worker_thread *next;
90278307Srpaulo
91278307Srpaulo	mythread_mutex mutex;
92278307Srpaulo	mythread_cond cond;
93278307Srpaulo
94278307Srpaulo	/// The ID of this thread is used to join the thread
95278307Srpaulo	/// when it's not needed anymore.
96278307Srpaulo	mythread thread_id;
97278307Srpaulo};
98278307Srpaulo
99278307Srpaulo
100312518Sdelphijstruct lzma_stream_coder_s {
101278307Srpaulo	enum {
102278307Srpaulo		SEQ_STREAM_HEADER,
103278307Srpaulo		SEQ_BLOCK,
104278307Srpaulo		SEQ_INDEX,
105278307Srpaulo		SEQ_STREAM_FOOTER,
106278307Srpaulo	} sequence;
107278307Srpaulo
108278307Srpaulo	/// Start a new Block every block_size bytes of input unless
109278307Srpaulo	/// LZMA_FULL_FLUSH or LZMA_FULL_BARRIER is used earlier.
110278307Srpaulo	size_t block_size;
111278307Srpaulo
112278307Srpaulo	/// The filter chain currently in use
113278307Srpaulo	lzma_filter filters[LZMA_FILTERS_MAX + 1];
114278307Srpaulo
115278307Srpaulo
116278307Srpaulo	/// Index to hold sizes of the Blocks
117278307Srpaulo	lzma_index *index;
118278307Srpaulo
119278307Srpaulo	/// Index encoder
120278307Srpaulo	lzma_next_coder index_encoder;
121278307Srpaulo
122278307Srpaulo
123278307Srpaulo	/// Stream Flags for encoding the Stream Header and Stream Footer.
124278307Srpaulo	lzma_stream_flags stream_flags;
125278307Srpaulo
126278307Srpaulo	/// Buffer to hold Stream Header and Stream Footer.
127278307Srpaulo	uint8_t header[LZMA_STREAM_HEADER_SIZE];
128278307Srpaulo
129278307Srpaulo	/// Read position in header[]
130278307Srpaulo	size_t header_pos;
131278307Srpaulo
132278307Srpaulo
133278307Srpaulo	/// Output buffer queue for compressed data
134278307Srpaulo	lzma_outq outq;
135278307Srpaulo
136278307Srpaulo
137278307Srpaulo	/// Maximum wait time if cannot use all the input and cannot
138278307Srpaulo	/// fill the output buffer. This is in milliseconds.
139278307Srpaulo	uint32_t timeout;
140278307Srpaulo
141278307Srpaulo
142278307Srpaulo	/// Error code from a worker thread
143278307Srpaulo	lzma_ret thread_error;
144278307Srpaulo
145278307Srpaulo	/// Array of allocated thread-specific structures
146278307Srpaulo	worker_thread *threads;
147278307Srpaulo
148278307Srpaulo	/// Number of structures in "threads" above. This is also the
149278307Srpaulo	/// number of threads that will be created at maximum.
150278307Srpaulo	uint32_t threads_max;
151278307Srpaulo
152278307Srpaulo	/// Number of thread structures that have been initialized, and
153278307Srpaulo	/// thus the number of worker threads actually created so far.
154278307Srpaulo	uint32_t threads_initialized;
155278307Srpaulo
156278307Srpaulo	/// Stack of free threads. When a thread finishes, it puts itself
157278307Srpaulo	/// back into this stack. This starts as empty because threads
158278307Srpaulo	/// are created only when actually needed.
159278307Srpaulo	worker_thread *threads_free;
160278307Srpaulo
161278307Srpaulo	/// The most recent worker thread to which the main thread writes
162278307Srpaulo	/// the new input from the application.
163278307Srpaulo	worker_thread *thr;
164278307Srpaulo
165278307Srpaulo
166278307Srpaulo	/// Amount of uncompressed data in Blocks that have already
167278307Srpaulo	/// been finished.
168278307Srpaulo	uint64_t progress_in;
169278307Srpaulo
170278307Srpaulo	/// Amount of compressed data in Stream Header + Blocks that
171278307Srpaulo	/// have already been finished.
172278307Srpaulo	uint64_t progress_out;
173278307Srpaulo
174278307Srpaulo
175278307Srpaulo	mythread_mutex mutex;
176278307Srpaulo	mythread_cond cond;
177278307Srpaulo};
178278307Srpaulo
179278307Srpaulo
180278307Srpaulo/// Tell the main thread that something has gone wrong.
181278307Srpaulostatic void
182278307Srpauloworker_error(worker_thread *thr, lzma_ret ret)
183278307Srpaulo{
184278307Srpaulo	assert(ret != LZMA_OK);
185278307Srpaulo	assert(ret != LZMA_STREAM_END);
186278307Srpaulo
187278307Srpaulo	mythread_sync(thr->coder->mutex) {
188278307Srpaulo		if (thr->coder->thread_error == LZMA_OK)
189278307Srpaulo			thr->coder->thread_error = ret;
190278307Srpaulo
191278307Srpaulo		mythread_cond_signal(&thr->coder->cond);
192278307Srpaulo	}
193278307Srpaulo
194278307Srpaulo	return;
195278307Srpaulo}
196278307Srpaulo
197278307Srpaulo
198278307Srpaulostatic worker_state
199278307Srpauloworker_encode(worker_thread *thr, worker_state state)
200278307Srpaulo{
201278307Srpaulo	assert(thr->progress_in == 0);
202278307Srpaulo	assert(thr->progress_out == 0);
203278307Srpaulo
204278307Srpaulo	// Set the Block options.
205278307Srpaulo	thr->block_options = (lzma_block){
206278307Srpaulo		.version = 0,
207278307Srpaulo		.check = thr->coder->stream_flags.check,
208278307Srpaulo		.compressed_size = thr->coder->outq.buf_size_max,
209278307Srpaulo		.uncompressed_size = thr->coder->block_size,
210278307Srpaulo
211278307Srpaulo		// TODO: To allow changing the filter chain, the filters
212278307Srpaulo		// array must be copied to each worker_thread.
213278307Srpaulo		.filters = thr->coder->filters,
214278307Srpaulo	};
215278307Srpaulo
216278307Srpaulo	// Calculate maximum size of the Block Header. This amount is
217278307Srpaulo	// reserved in the beginning of the buffer so that Block Header
218278307Srpaulo	// along with Compressed Size and Uncompressed Size can be
219278307Srpaulo	// written there.
220278307Srpaulo	lzma_ret ret = lzma_block_header_size(&thr->block_options);
221278307Srpaulo	if (ret != LZMA_OK) {
222278307Srpaulo		worker_error(thr, ret);
223278307Srpaulo		return THR_STOP;
224278307Srpaulo	}
225278307Srpaulo
226278307Srpaulo	// Initialize the Block encoder.
227278307Srpaulo	ret = lzma_block_encoder_init(&thr->block_encoder,
228278307Srpaulo			thr->allocator, &thr->block_options);
229278307Srpaulo	if (ret != LZMA_OK) {
230278307Srpaulo		worker_error(thr, ret);
231278307Srpaulo		return THR_STOP;
232278307Srpaulo	}
233278307Srpaulo
234278307Srpaulo	size_t in_pos = 0;
235278307Srpaulo	size_t in_size = 0;
236278307Srpaulo
237278307Srpaulo	thr->outbuf->size = thr->block_options.header_size;
238278307Srpaulo	const size_t out_size = thr->coder->outq.buf_size_max;
239278307Srpaulo
240278307Srpaulo	do {
241278307Srpaulo		mythread_sync(thr->mutex) {
242278307Srpaulo			// Store in_pos and out_pos into *thr so that
243278307Srpaulo			// an application may read them via
244278307Srpaulo			// lzma_get_progress() to get progress information.
245278307Srpaulo			//
246278307Srpaulo			// NOTE: These aren't updated when the encoding
247278307Srpaulo			// finishes. Instead, the final values are taken
248278307Srpaulo			// later from thr->outbuf.
249278307Srpaulo			thr->progress_in = in_pos;
250278307Srpaulo			thr->progress_out = thr->outbuf->size;
251278307Srpaulo
252278307Srpaulo			while (in_size == thr->in_size
253278307Srpaulo					&& thr->state == THR_RUN)
254278307Srpaulo				mythread_cond_wait(&thr->cond, &thr->mutex);
255278307Srpaulo
256278307Srpaulo			state = thr->state;
257278307Srpaulo			in_size = thr->in_size;
258278307Srpaulo		}
259278307Srpaulo
260278307Srpaulo		// Return if we were asked to stop or exit.
261278307Srpaulo		if (state >= THR_STOP)
262278307Srpaulo			return state;
263278307Srpaulo
264278307Srpaulo		lzma_action action = state == THR_FINISH
265278307Srpaulo				? LZMA_FINISH : LZMA_RUN;
266278307Srpaulo
267278307Srpaulo		// Limit the amount of input given to the Block encoder
268278307Srpaulo		// at once. This way this thread can react fairly quickly
269278307Srpaulo		// if the main thread wants us to stop or exit.
270278307Srpaulo		static const size_t in_chunk_max = 16384;
271278307Srpaulo		size_t in_limit = in_size;
272278307Srpaulo		if (in_size - in_pos > in_chunk_max) {
273278307Srpaulo			in_limit = in_pos + in_chunk_max;
274278307Srpaulo			action = LZMA_RUN;
275278307Srpaulo		}
276278307Srpaulo
277278307Srpaulo		ret = thr->block_encoder.code(
278278307Srpaulo				thr->block_encoder.coder, thr->allocator,
279278307Srpaulo				thr->in, &in_pos, in_limit, thr->outbuf->buf,
280278307Srpaulo				&thr->outbuf->size, out_size, action);
281278307Srpaulo	} while (ret == LZMA_OK && thr->outbuf->size < out_size);
282278307Srpaulo
283278307Srpaulo	switch (ret) {
284278307Srpaulo	case LZMA_STREAM_END:
285278307Srpaulo		assert(state == THR_FINISH);
286278307Srpaulo
287278307Srpaulo		// Encode the Block Header. By doing it after
288278307Srpaulo		// the compression, we can store the Compressed Size
289278307Srpaulo		// and Uncompressed Size fields.
290278307Srpaulo		ret = lzma_block_header_encode(&thr->block_options,
291278307Srpaulo				thr->outbuf->buf);
292278307Srpaulo		if (ret != LZMA_OK) {
293278307Srpaulo			worker_error(thr, ret);
294278307Srpaulo			return THR_STOP;
295278307Srpaulo		}
296278307Srpaulo
297278307Srpaulo		break;
298278307Srpaulo
299278307Srpaulo	case LZMA_OK:
300278307Srpaulo		// The data was incompressible. Encode it using uncompressed
301278307Srpaulo		// LZMA2 chunks.
302278307Srpaulo		//
303278307Srpaulo		// First wait that we have gotten all the input.
304278307Srpaulo		mythread_sync(thr->mutex) {
305278307Srpaulo			while (thr->state == THR_RUN)
306278307Srpaulo				mythread_cond_wait(&thr->cond, &thr->mutex);
307278307Srpaulo
308278307Srpaulo			state = thr->state;
309278307Srpaulo			in_size = thr->in_size;
310278307Srpaulo		}
311278307Srpaulo
312278307Srpaulo		if (state >= THR_STOP)
313278307Srpaulo			return state;
314278307Srpaulo
315278307Srpaulo		// Do the encoding. This takes care of the Block Header too.
316278307Srpaulo		thr->outbuf->size = 0;
317278307Srpaulo		ret = lzma_block_uncomp_encode(&thr->block_options,
318278307Srpaulo				thr->in, in_size, thr->outbuf->buf,
319278307Srpaulo				&thr->outbuf->size, out_size);
320278307Srpaulo
321278307Srpaulo		// It shouldn't fail.
322278307Srpaulo		if (ret != LZMA_OK) {
323278307Srpaulo			worker_error(thr, LZMA_PROG_ERROR);
324278307Srpaulo			return THR_STOP;
325278307Srpaulo		}
326278307Srpaulo
327278307Srpaulo		break;
328278307Srpaulo
329278307Srpaulo	default:
330278307Srpaulo		worker_error(thr, ret);
331278307Srpaulo		return THR_STOP;
332278307Srpaulo	}
333278307Srpaulo
334278307Srpaulo	// Set the size information that will be read by the main thread
335278307Srpaulo	// to write the Index field.
336278307Srpaulo	thr->outbuf->unpadded_size
337278307Srpaulo			= lzma_block_unpadded_size(&thr->block_options);
338278307Srpaulo	assert(thr->outbuf->unpadded_size != 0);
339278307Srpaulo	thr->outbuf->uncompressed_size = thr->block_options.uncompressed_size;
340278307Srpaulo
341278307Srpaulo	return THR_FINISH;
342278307Srpaulo}
343278307Srpaulo
344278307Srpaulo
345278307Srpaulostatic MYTHREAD_RET_TYPE
346278307Srpauloworker_start(void *thr_ptr)
347278307Srpaulo{
348278307Srpaulo	worker_thread *thr = thr_ptr;
349278307Srpaulo	worker_state state = THR_IDLE; // Init to silence a warning
350278307Srpaulo
351278307Srpaulo	while (true) {
352278307Srpaulo		// Wait for work.
353278307Srpaulo		mythread_sync(thr->mutex) {
354278307Srpaulo			while (true) {
355278307Srpaulo				// The thread is already idle so if we are
356278307Srpaulo				// requested to stop, just set the state.
357278307Srpaulo				if (thr->state == THR_STOP) {
358278307Srpaulo					thr->state = THR_IDLE;
359278307Srpaulo					mythread_cond_signal(&thr->cond);
360278307Srpaulo				}
361278307Srpaulo
362278307Srpaulo				state = thr->state;
363278307Srpaulo				if (state != THR_IDLE)
364278307Srpaulo					break;
365278307Srpaulo
366278307Srpaulo				mythread_cond_wait(&thr->cond, &thr->mutex);
367278307Srpaulo			}
368278307Srpaulo		}
369278307Srpaulo
370278307Srpaulo		assert(state != THR_IDLE);
371278307Srpaulo		assert(state != THR_STOP);
372278307Srpaulo
373278307Srpaulo		if (state <= THR_FINISH)
374278307Srpaulo			state = worker_encode(thr, state);
375278307Srpaulo
376278307Srpaulo		if (state == THR_EXIT)
377278307Srpaulo			break;
378278307Srpaulo
379278307Srpaulo		// Mark the thread as idle unless the main thread has
380278307Srpaulo		// told us to exit. Signal is needed for the case
381278307Srpaulo		// where the main thread is waiting for the threads to stop.
382278307Srpaulo		mythread_sync(thr->mutex) {
383278307Srpaulo			if (thr->state != THR_EXIT) {
384278307Srpaulo				thr->state = THR_IDLE;
385278307Srpaulo				mythread_cond_signal(&thr->cond);
386278307Srpaulo			}
387278307Srpaulo		}
388278307Srpaulo
389278307Srpaulo		mythread_sync(thr->coder->mutex) {
390278307Srpaulo			// Mark the output buffer as finished if
391278307Srpaulo			// no errors occurred.
392278307Srpaulo			thr->outbuf->finished = state == THR_FINISH;
393278307Srpaulo
394278307Srpaulo			// Update the main progress info.
395278307Srpaulo			thr->coder->progress_in
396278307Srpaulo					+= thr->outbuf->uncompressed_size;
397278307Srpaulo			thr->coder->progress_out += thr->outbuf->size;
398278307Srpaulo			thr->progress_in = 0;
399278307Srpaulo			thr->progress_out = 0;
400278307Srpaulo
401278307Srpaulo			// Return this thread to the stack of free threads.
402278307Srpaulo			thr->next = thr->coder->threads_free;
403278307Srpaulo			thr->coder->threads_free = thr;
404278307Srpaulo
405278307Srpaulo			mythread_cond_signal(&thr->coder->cond);
406278307Srpaulo		}
407278307Srpaulo	}
408278307Srpaulo
409278307Srpaulo	// Exiting, free the resources.
410278307Srpaulo	mythread_mutex_destroy(&thr->mutex);
411278307Srpaulo	mythread_cond_destroy(&thr->cond);
412278307Srpaulo
413278307Srpaulo	lzma_next_end(&thr->block_encoder, thr->allocator);
414278307Srpaulo	lzma_free(thr->in, thr->allocator);
415278307Srpaulo	return MYTHREAD_RET_VALUE;
416278307Srpaulo}
417278307Srpaulo
418278307Srpaulo
419278307Srpaulo/// Make the threads stop but not exit. Optionally wait for them to stop.
420278307Srpaulostatic void
421312518Sdelphijthreads_stop(lzma_stream_coder *coder, bool wait_for_threads)
422278307Srpaulo{
423278307Srpaulo	// Tell the threads to stop.
424278307Srpaulo	for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
425278307Srpaulo		mythread_sync(coder->threads[i].mutex) {
426278307Srpaulo			coder->threads[i].state = THR_STOP;
427278307Srpaulo			mythread_cond_signal(&coder->threads[i].cond);
428278307Srpaulo		}
429278307Srpaulo	}
430278307Srpaulo
431278307Srpaulo	if (!wait_for_threads)
432278307Srpaulo		return;
433278307Srpaulo
434278307Srpaulo	// Wait for the threads to settle in the idle state.
435278307Srpaulo	for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
436278307Srpaulo		mythread_sync(coder->threads[i].mutex) {
437278307Srpaulo			while (coder->threads[i].state != THR_IDLE)
438278307Srpaulo				mythread_cond_wait(&coder->threads[i].cond,
439278307Srpaulo						&coder->threads[i].mutex);
440278307Srpaulo		}
441278307Srpaulo	}
442278307Srpaulo
443278307Srpaulo	return;
444278307Srpaulo}
445278307Srpaulo
446278307Srpaulo
447278307Srpaulo/// Stop the threads and free the resources associated with them.
448278307Srpaulo/// Wait until the threads have exited.
449278307Srpaulostatic void
450312518Sdelphijthreads_end(lzma_stream_coder *coder, const lzma_allocator *allocator)
451278307Srpaulo{
452278307Srpaulo	for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
453278307Srpaulo		mythread_sync(coder->threads[i].mutex) {
454278307Srpaulo			coder->threads[i].state = THR_EXIT;
455278307Srpaulo			mythread_cond_signal(&coder->threads[i].cond);
456278307Srpaulo		}
457278307Srpaulo	}
458278307Srpaulo
459278307Srpaulo	for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
460278307Srpaulo		int ret = mythread_join(coder->threads[i].thread_id);
461278307Srpaulo		assert(ret == 0);
462278307Srpaulo		(void)ret;
463278307Srpaulo	}
464278307Srpaulo
465278307Srpaulo	lzma_free(coder->threads, allocator);
466278307Srpaulo	return;
467278307Srpaulo}
468278307Srpaulo
469278307Srpaulo
470278307Srpaulo/// Initialize a new worker_thread structure and create a new thread.
471278307Srpaulostatic lzma_ret
472312518Sdelphijinitialize_new_thread(lzma_stream_coder *coder,
473312518Sdelphij		const lzma_allocator *allocator)
474278307Srpaulo{
475278307Srpaulo	worker_thread *thr = &coder->threads[coder->threads_initialized];
476278307Srpaulo
477278307Srpaulo	thr->in = lzma_alloc(coder->block_size, allocator);
478278307Srpaulo	if (thr->in == NULL)
479278307Srpaulo		return LZMA_MEM_ERROR;
480278307Srpaulo
481278307Srpaulo	if (mythread_mutex_init(&thr->mutex))
482278307Srpaulo		goto error_mutex;
483278307Srpaulo
484278307Srpaulo	if (mythread_cond_init(&thr->cond))
485278307Srpaulo		goto error_cond;
486278307Srpaulo
487278307Srpaulo	thr->state = THR_IDLE;
488278307Srpaulo	thr->allocator = allocator;
489278307Srpaulo	thr->coder = coder;
490278307Srpaulo	thr->progress_in = 0;
491278307Srpaulo	thr->progress_out = 0;
492278307Srpaulo	thr->block_encoder = LZMA_NEXT_CODER_INIT;
493278307Srpaulo
494278307Srpaulo	if (mythread_create(&thr->thread_id, &worker_start, thr))
495278307Srpaulo		goto error_thread;
496278307Srpaulo
497278307Srpaulo	++coder->threads_initialized;
498278307Srpaulo	coder->thr = thr;
499278307Srpaulo
500278307Srpaulo	return LZMA_OK;
501278307Srpaulo
502278307Srpauloerror_thread:
503278307Srpaulo	mythread_cond_destroy(&thr->cond);
504278307Srpaulo
505278307Srpauloerror_cond:
506278307Srpaulo	mythread_mutex_destroy(&thr->mutex);
507278307Srpaulo
508278307Srpauloerror_mutex:
509278307Srpaulo	lzma_free(thr->in, allocator);
510278307Srpaulo	return LZMA_MEM_ERROR;
511278307Srpaulo}
512278307Srpaulo
513278307Srpaulo
514278307Srpaulostatic lzma_ret
515312518Sdelphijget_thread(lzma_stream_coder *coder, const lzma_allocator *allocator)
516278307Srpaulo{
517278307Srpaulo	// If there are no free output subqueues, there is no
518278307Srpaulo	// point to try getting a thread.
519278307Srpaulo	if (!lzma_outq_has_buf(&coder->outq))
520278307Srpaulo		return LZMA_OK;
521278307Srpaulo
522278307Srpaulo	// If there is a free structure on the stack, use it.
523278307Srpaulo	mythread_sync(coder->mutex) {
524278307Srpaulo		if (coder->threads_free != NULL) {
525278307Srpaulo			coder->thr = coder->threads_free;
526278307Srpaulo			coder->threads_free = coder->threads_free->next;
527278307Srpaulo		}
528278307Srpaulo	}
529278307Srpaulo
530278307Srpaulo	if (coder->thr == NULL) {
531278307Srpaulo		// If there are no uninitialized structures left, return.
532278307Srpaulo		if (coder->threads_initialized == coder->threads_max)
533278307Srpaulo			return LZMA_OK;
534278307Srpaulo
535278307Srpaulo		// Initialize a new thread.
536278307Srpaulo		return_if_error(initialize_new_thread(coder, allocator));
537278307Srpaulo	}
538278307Srpaulo
539278307Srpaulo	// Reset the parts of the thread state that have to be done
540278307Srpaulo	// in the main thread.
541278307Srpaulo	mythread_sync(coder->thr->mutex) {
542278307Srpaulo		coder->thr->state = THR_RUN;
543278307Srpaulo		coder->thr->in_size = 0;
544278307Srpaulo		coder->thr->outbuf = lzma_outq_get_buf(&coder->outq);
545278307Srpaulo		mythread_cond_signal(&coder->thr->cond);
546278307Srpaulo	}
547278307Srpaulo
548278307Srpaulo	return LZMA_OK;
549278307Srpaulo}
550278307Srpaulo
551278307Srpaulo
552278307Srpaulostatic lzma_ret
553312518Sdelphijstream_encode_in(lzma_stream_coder *coder, const lzma_allocator *allocator,
554278307Srpaulo		const uint8_t *restrict in, size_t *restrict in_pos,
555278307Srpaulo		size_t in_size, lzma_action action)
556278307Srpaulo{
557278307Srpaulo	while (*in_pos < in_size
558278307Srpaulo			|| (coder->thr != NULL && action != LZMA_RUN)) {
559278307Srpaulo		if (coder->thr == NULL) {
560278307Srpaulo			// Get a new thread.
561278307Srpaulo			const lzma_ret ret = get_thread(coder, allocator);
562278307Srpaulo			if (coder->thr == NULL)
563278307Srpaulo				return ret;
564278307Srpaulo		}
565278307Srpaulo
566278307Srpaulo		// Copy the input data to thread's buffer.
567278307Srpaulo		size_t thr_in_size = coder->thr->in_size;
568278307Srpaulo		lzma_bufcpy(in, in_pos, in_size, coder->thr->in,
569278307Srpaulo				&thr_in_size, coder->block_size);
570278307Srpaulo
571278307Srpaulo		// Tell the Block encoder to finish if
572278307Srpaulo		//  - it has got block_size bytes of input; or
573278307Srpaulo		//  - all input was used and LZMA_FINISH, LZMA_FULL_FLUSH,
574278307Srpaulo		//    or LZMA_FULL_BARRIER was used.
575278307Srpaulo		//
576278307Srpaulo		// TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER.
577278307Srpaulo		const bool finish = thr_in_size == coder->block_size
578278307Srpaulo				|| (*in_pos == in_size && action != LZMA_RUN);
579278307Srpaulo
580278307Srpaulo		bool block_error = false;
581278307Srpaulo
582278307Srpaulo		mythread_sync(coder->thr->mutex) {
583278307Srpaulo			if (coder->thr->state == THR_IDLE) {
584278307Srpaulo				// Something has gone wrong with the Block
585278307Srpaulo				// encoder. It has set coder->thread_error
586278307Srpaulo				// which we will read a few lines later.
587278307Srpaulo				block_error = true;
588278307Srpaulo			} else {
589278307Srpaulo				// Tell the Block encoder its new amount
590278307Srpaulo				// of input and update the state if needed.
591278307Srpaulo				coder->thr->in_size = thr_in_size;
592278307Srpaulo
593278307Srpaulo				if (finish)
594278307Srpaulo					coder->thr->state = THR_FINISH;
595278307Srpaulo
596278307Srpaulo				mythread_cond_signal(&coder->thr->cond);
597278307Srpaulo			}
598278307Srpaulo		}
599278307Srpaulo
600278307Srpaulo		if (block_error) {
601278307Srpaulo			lzma_ret ret;
602278307Srpaulo
603278307Srpaulo			mythread_sync(coder->mutex) {
604278307Srpaulo				ret = coder->thread_error;
605278307Srpaulo			}
606278307Srpaulo
607278307Srpaulo			return ret;
608278307Srpaulo		}
609278307Srpaulo
610278307Srpaulo		if (finish)
611278307Srpaulo			coder->thr = NULL;
612278307Srpaulo	}
613278307Srpaulo
614278307Srpaulo	return LZMA_OK;
615278307Srpaulo}
616278307Srpaulo
617278307Srpaulo
618278307Srpaulo/// Wait until more input can be consumed, more output can be read, or
619278307Srpaulo/// an optional timeout is reached.
620278307Srpaulostatic bool
621312518Sdelphijwait_for_work(lzma_stream_coder *coder, mythread_condtime *wait_abs,
622278307Srpaulo		bool *has_blocked, bool has_input)
623278307Srpaulo{
624278307Srpaulo	if (coder->timeout != 0 && !*has_blocked) {
625278307Srpaulo		// Every time when stream_encode_mt() is called via
626278307Srpaulo		// lzma_code(), *has_blocked starts as false. We set it
627278307Srpaulo		// to true here and calculate the absolute time when
628278307Srpaulo		// we must return if there's nothing to do.
629278307Srpaulo		//
630278307Srpaulo		// The idea of *has_blocked is to avoid unneeded calls
631278307Srpaulo		// to mythread_condtime_set(), which may do a syscall
632278307Srpaulo		// depending on the operating system.
633278307Srpaulo		*has_blocked = true;
634278307Srpaulo		mythread_condtime_set(wait_abs, &coder->cond, coder->timeout);
635278307Srpaulo	}
636278307Srpaulo
637278307Srpaulo	bool timed_out = false;
638278307Srpaulo
639278307Srpaulo	mythread_sync(coder->mutex) {
640278307Srpaulo		// There are four things that we wait. If one of them
641278307Srpaulo		// becomes possible, we return.
642278307Srpaulo		//  - If there is input left, we need to get a free
643278307Srpaulo		//    worker thread and an output buffer for it.
644278307Srpaulo		//  - Data ready to be read from the output queue.
645278307Srpaulo		//  - A worker thread indicates an error.
646278307Srpaulo		//  - Time out occurs.
647278307Srpaulo		while ((!has_input || coder->threads_free == NULL
648278307Srpaulo					|| !lzma_outq_has_buf(&coder->outq))
649278307Srpaulo				&& !lzma_outq_is_readable(&coder->outq)
650278307Srpaulo				&& coder->thread_error == LZMA_OK
651278307Srpaulo				&& !timed_out) {
652278307Srpaulo			if (coder->timeout != 0)
653278307Srpaulo				timed_out = mythread_cond_timedwait(
654278307Srpaulo						&coder->cond, &coder->mutex,
655278307Srpaulo						wait_abs) != 0;
656278307Srpaulo			else
657278307Srpaulo				mythread_cond_wait(&coder->cond,
658278307Srpaulo						&coder->mutex);
659278307Srpaulo		}
660278307Srpaulo	}
661278307Srpaulo
662278307Srpaulo	return timed_out;
663278307Srpaulo}
664278307Srpaulo
665278307Srpaulo
666278307Srpaulostatic lzma_ret
667312518Sdelphijstream_encode_mt(void *coder_ptr, const lzma_allocator *allocator,
668278307Srpaulo		const uint8_t *restrict in, size_t *restrict in_pos,
669278307Srpaulo		size_t in_size, uint8_t *restrict out,
670278307Srpaulo		size_t *restrict out_pos, size_t out_size, lzma_action action)
671278307Srpaulo{
672312518Sdelphij	lzma_stream_coder *coder = coder_ptr;
673312518Sdelphij
674278307Srpaulo	switch (coder->sequence) {
675278307Srpaulo	case SEQ_STREAM_HEADER:
676278307Srpaulo		lzma_bufcpy(coder->header, &coder->header_pos,
677278307Srpaulo				sizeof(coder->header),
678278307Srpaulo				out, out_pos, out_size);
679278307Srpaulo		if (coder->header_pos < sizeof(coder->header))
680278307Srpaulo			return LZMA_OK;
681278307Srpaulo
682278307Srpaulo		coder->header_pos = 0;
683278307Srpaulo		coder->sequence = SEQ_BLOCK;
684278307Srpaulo
685278307Srpaulo	// Fall through
686278307Srpaulo
687278307Srpaulo	case SEQ_BLOCK: {
688278307Srpaulo		// Initialized to silence warnings.
689278307Srpaulo		lzma_vli unpadded_size = 0;
690278307Srpaulo		lzma_vli uncompressed_size = 0;
691278307Srpaulo		lzma_ret ret = LZMA_OK;
692278307Srpaulo
693278307Srpaulo		// These are for wait_for_work().
694278307Srpaulo		bool has_blocked = false;
695278307Srpaulo		mythread_condtime wait_abs;
696278307Srpaulo
697278307Srpaulo		while (true) {
698278307Srpaulo			mythread_sync(coder->mutex) {
699278307Srpaulo				// Check for Block encoder errors.
700278307Srpaulo				ret = coder->thread_error;
701278307Srpaulo				if (ret != LZMA_OK) {
702278307Srpaulo					assert(ret != LZMA_STREAM_END);
703278307Srpaulo					break;
704278307Srpaulo				}
705278307Srpaulo
706278307Srpaulo				// Try to read compressed data to out[].
707278307Srpaulo				ret = lzma_outq_read(&coder->outq,
708278307Srpaulo						out, out_pos, out_size,
709278307Srpaulo						&unpadded_size,
710278307Srpaulo						&uncompressed_size);
711278307Srpaulo			}
712278307Srpaulo
713278307Srpaulo			if (ret == LZMA_STREAM_END) {
714278307Srpaulo				// End of Block. Add it to the Index.
715278307Srpaulo				ret = lzma_index_append(coder->index,
716278307Srpaulo						allocator, unpadded_size,
717278307Srpaulo						uncompressed_size);
718278307Srpaulo
719278307Srpaulo				// If we didn't fill the output buffer yet,
720278307Srpaulo				// try to read more data. Maybe the next
721278307Srpaulo				// outbuf has been finished already too.
722278307Srpaulo				if (*out_pos < out_size)
723278307Srpaulo					continue;
724278307Srpaulo			}
725278307Srpaulo
726278307Srpaulo			if (ret != LZMA_OK) {
727278307Srpaulo				// coder->thread_error was set or
728278307Srpaulo				// lzma_index_append() failed.
729278307Srpaulo				threads_stop(coder, false);
730278307Srpaulo				return ret;
731278307Srpaulo			}
732278307Srpaulo
733278307Srpaulo			// Try to give uncompressed data to a worker thread.
734278307Srpaulo			ret = stream_encode_in(coder, allocator,
735278307Srpaulo					in, in_pos, in_size, action);
736278307Srpaulo			if (ret != LZMA_OK) {
737278307Srpaulo				threads_stop(coder, false);
738278307Srpaulo				return ret;
739278307Srpaulo			}
740278307Srpaulo
741278307Srpaulo			// See if we should wait or return.
742278307Srpaulo			//
743278307Srpaulo			// TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER.
744278307Srpaulo			if (*in_pos == in_size) {
745278307Srpaulo				// LZMA_RUN: More data is probably coming
746278307Srpaulo				// so return to let the caller fill the
747278307Srpaulo				// input buffer.
748278307Srpaulo				if (action == LZMA_RUN)
749278307Srpaulo					return LZMA_OK;
750278307Srpaulo
751278307Srpaulo				// LZMA_FULL_BARRIER: The same as with
752278307Srpaulo				// LZMA_RUN but tell the caller that the
753278307Srpaulo				// barrier was completed.
754278307Srpaulo				if (action == LZMA_FULL_BARRIER)
755278307Srpaulo					return LZMA_STREAM_END;
756278307Srpaulo
757278307Srpaulo				// Finishing or flushing isn't completed until
758278307Srpaulo				// all input data has been encoded and copied
759278307Srpaulo				// to the output buffer.
760278307Srpaulo				if (lzma_outq_is_empty(&coder->outq)) {
761278307Srpaulo					// LZMA_FINISH: Continue to encode
762278307Srpaulo					// the Index field.
763278307Srpaulo					if (action == LZMA_FINISH)
764278307Srpaulo						break;
765278307Srpaulo
766278307Srpaulo					// LZMA_FULL_FLUSH: Return to tell
767278307Srpaulo					// the caller that flushing was
768278307Srpaulo					// completed.
769278307Srpaulo					if (action == LZMA_FULL_FLUSH)
770278307Srpaulo						return LZMA_STREAM_END;
771278307Srpaulo				}
772278307Srpaulo			}
773278307Srpaulo
774278307Srpaulo			// Return if there is no output space left.
775278307Srpaulo			// This check must be done after testing the input
776278307Srpaulo			// buffer, because we might want to use a different
777278307Srpaulo			// return code.
778278307Srpaulo			if (*out_pos == out_size)
779278307Srpaulo				return LZMA_OK;
780278307Srpaulo
781278307Srpaulo			// Neither in nor out has been used completely.
782278307Srpaulo			// Wait until there's something we can do.
783278307Srpaulo			if (wait_for_work(coder, &wait_abs, &has_blocked,
784278307Srpaulo					*in_pos < in_size))
785278307Srpaulo				return LZMA_TIMED_OUT;
786278307Srpaulo		}
787278307Srpaulo
788278307Srpaulo		// All Blocks have been encoded and the threads have stopped.
789278307Srpaulo		// Prepare to encode the Index field.
790278307Srpaulo		return_if_error(lzma_index_encoder_init(
791278307Srpaulo				&coder->index_encoder, allocator,
792278307Srpaulo				coder->index));
793278307Srpaulo		coder->sequence = SEQ_INDEX;
794278307Srpaulo
795278307Srpaulo		// Update the progress info to take the Index and
796278307Srpaulo		// Stream Footer into account. Those are very fast to encode
797278307Srpaulo		// so in terms of progress information they can be thought
798278307Srpaulo		// to be ready to be copied out.
799278307Srpaulo		coder->progress_out += lzma_index_size(coder->index)
800278307Srpaulo				+ LZMA_STREAM_HEADER_SIZE;
801278307Srpaulo	}
802278307Srpaulo
803278307Srpaulo	// Fall through
804278307Srpaulo
805278307Srpaulo	case SEQ_INDEX: {
806278307Srpaulo		// Call the Index encoder. It doesn't take any input, so
807278307Srpaulo		// those pointers can be NULL.
808278307Srpaulo		const lzma_ret ret = coder->index_encoder.code(
809278307Srpaulo				coder->index_encoder.coder, allocator,
810278307Srpaulo				NULL, NULL, 0,
811278307Srpaulo				out, out_pos, out_size, LZMA_RUN);
812278307Srpaulo		if (ret != LZMA_STREAM_END)
813278307Srpaulo			return ret;
814278307Srpaulo
815278307Srpaulo		// Encode the Stream Footer into coder->buffer.
816278307Srpaulo		coder->stream_flags.backward_size
817278307Srpaulo				= lzma_index_size(coder->index);
818278307Srpaulo		if (lzma_stream_footer_encode(&coder->stream_flags,
819278307Srpaulo				coder->header) != LZMA_OK)
820278307Srpaulo			return LZMA_PROG_ERROR;
821278307Srpaulo
822278307Srpaulo		coder->sequence = SEQ_STREAM_FOOTER;
823278307Srpaulo	}
824278307Srpaulo
825278307Srpaulo	// Fall through
826278307Srpaulo
827278307Srpaulo	case SEQ_STREAM_FOOTER:
828278307Srpaulo		lzma_bufcpy(coder->header, &coder->header_pos,
829278307Srpaulo				sizeof(coder->header),
830278307Srpaulo				out, out_pos, out_size);
831278307Srpaulo		return coder->header_pos < sizeof(coder->header)
832278307Srpaulo				? LZMA_OK : LZMA_STREAM_END;
833278307Srpaulo	}
834278307Srpaulo
835278307Srpaulo	assert(0);
836278307Srpaulo	return LZMA_PROG_ERROR;
837278307Srpaulo}
838278307Srpaulo
839278307Srpaulo
840278307Srpaulostatic void
841312518Sdelphijstream_encoder_mt_end(void *coder_ptr, const lzma_allocator *allocator)
842278307Srpaulo{
843312518Sdelphij	lzma_stream_coder *coder = coder_ptr;
844312518Sdelphij
845278307Srpaulo	// Threads must be killed before the output queue can be freed.
846278307Srpaulo	threads_end(coder, allocator);
847278307Srpaulo	lzma_outq_end(&coder->outq, allocator);
848278307Srpaulo
849278307Srpaulo	for (size_t i = 0; coder->filters[i].id != LZMA_VLI_UNKNOWN; ++i)
850278307Srpaulo		lzma_free(coder->filters[i].options, allocator);
851278307Srpaulo
852278307Srpaulo	lzma_next_end(&coder->index_encoder, allocator);
853278307Srpaulo	lzma_index_end(coder->index, allocator);
854278307Srpaulo
855278307Srpaulo	mythread_cond_destroy(&coder->cond);
856278307Srpaulo	mythread_mutex_destroy(&coder->mutex);
857278307Srpaulo
858278307Srpaulo	lzma_free(coder, allocator);
859278307Srpaulo	return;
860278307Srpaulo}
861278307Srpaulo
862278307Srpaulo
863278307Srpaulo/// Options handling for lzma_stream_encoder_mt_init() and
864278307Srpaulo/// lzma_stream_encoder_mt_memusage()
865278307Srpaulostatic lzma_ret
866278307Srpauloget_options(const lzma_mt *options, lzma_options_easy *opt_easy,
867278307Srpaulo		const lzma_filter **filters, uint64_t *block_size,
868278307Srpaulo		uint64_t *outbuf_size_max)
869278307Srpaulo{
870278307Srpaulo	// Validate some of the options.
871278307Srpaulo	if (options == NULL)
872278307Srpaulo		return LZMA_PROG_ERROR;
873278307Srpaulo
874278307Srpaulo	if (options->flags != 0 || options->threads == 0
875278307Srpaulo			|| options->threads > LZMA_THREADS_MAX)
876278307Srpaulo		return LZMA_OPTIONS_ERROR;
877278307Srpaulo
878278307Srpaulo	if (options->filters != NULL) {
879278307Srpaulo		// Filter chain was given, use it as is.
880278307Srpaulo		*filters = options->filters;
881278307Srpaulo	} else {
882278307Srpaulo		// Use a preset.
883278307Srpaulo		if (lzma_easy_preset(opt_easy, options->preset))
884278307Srpaulo			return LZMA_OPTIONS_ERROR;
885278307Srpaulo
886278307Srpaulo		*filters = opt_easy->filters;
887278307Srpaulo	}
888278307Srpaulo
889278307Srpaulo	// Block size
890278307Srpaulo	if (options->block_size > 0) {
891278307Srpaulo		if (options->block_size > BLOCK_SIZE_MAX)
892278307Srpaulo			return LZMA_OPTIONS_ERROR;
893278307Srpaulo
894278307Srpaulo		*block_size = options->block_size;
895278307Srpaulo	} else {
896278307Srpaulo		// Determine the Block size from the filter chain.
897278307Srpaulo		*block_size = lzma_mt_block_size(*filters);
898278307Srpaulo		if (*block_size == 0)
899278307Srpaulo			return LZMA_OPTIONS_ERROR;
900278307Srpaulo
901278307Srpaulo		assert(*block_size <= BLOCK_SIZE_MAX);
902278307Srpaulo	}
903278307Srpaulo
904278307Srpaulo	// Calculate the maximum amount output that a single output buffer
905278307Srpaulo	// may need to hold. This is the same as the maximum total size of
906278307Srpaulo	// a Block.
907278307Srpaulo	*outbuf_size_max = lzma_block_buffer_bound64(*block_size);
908278307Srpaulo	if (*outbuf_size_max == 0)
909278307Srpaulo		return LZMA_MEM_ERROR;
910278307Srpaulo
911278307Srpaulo	return LZMA_OK;
912278307Srpaulo}
913278307Srpaulo
914278307Srpaulo
915278307Srpaulostatic void
916312518Sdelphijget_progress(void *coder_ptr, uint64_t *progress_in, uint64_t *progress_out)
917278307Srpaulo{
918312518Sdelphij	lzma_stream_coder *coder = coder_ptr;
919312518Sdelphij
920278307Srpaulo	// Lock coder->mutex to prevent finishing threads from moving their
921312518Sdelphij	// progress info from the worker_thread structure to lzma_stream_coder.
922278307Srpaulo	mythread_sync(coder->mutex) {
923278307Srpaulo		*progress_in = coder->progress_in;
924278307Srpaulo		*progress_out = coder->progress_out;
925278307Srpaulo
926278307Srpaulo		for (size_t i = 0; i < coder->threads_initialized; ++i) {
927278307Srpaulo			mythread_sync(coder->threads[i].mutex) {
928278307Srpaulo				*progress_in += coder->threads[i].progress_in;
929278307Srpaulo				*progress_out += coder->threads[i]
930278307Srpaulo						.progress_out;
931278307Srpaulo			}
932278307Srpaulo		}
933278307Srpaulo	}
934278307Srpaulo
935278307Srpaulo	return;
936278307Srpaulo}
937278307Srpaulo
938278307Srpaulo
939278307Srpaulostatic lzma_ret
940278307Srpaulostream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
941278307Srpaulo		const lzma_mt *options)
942278307Srpaulo{
943278307Srpaulo	lzma_next_coder_init(&stream_encoder_mt_init, next, allocator);
944278307Srpaulo
945278307Srpaulo	// Get the filter chain.
946278307Srpaulo	lzma_options_easy easy;
947278307Srpaulo	const lzma_filter *filters;
948278307Srpaulo	uint64_t block_size;
949278307Srpaulo	uint64_t outbuf_size_max;
950278307Srpaulo	return_if_error(get_options(options, &easy, &filters,
951278307Srpaulo			&block_size, &outbuf_size_max));
952278307Srpaulo
953278307Srpaulo#if SIZE_MAX < UINT64_MAX
954278307Srpaulo	if (block_size > SIZE_MAX)
955278307Srpaulo		return LZMA_MEM_ERROR;
956278307Srpaulo#endif
957278307Srpaulo
958278307Srpaulo	// Validate the filter chain so that we can give an error in this
959278307Srpaulo	// function instead of delaying it to the first call to lzma_code().
960278307Srpaulo	// The memory usage calculation verifies the filter chain as
961278307Srpaulo	// a side effect so we take advatange of that.
962278307Srpaulo	if (lzma_raw_encoder_memusage(filters) == UINT64_MAX)
963278307Srpaulo		return LZMA_OPTIONS_ERROR;
964278307Srpaulo
965278307Srpaulo	// Validate the Check ID.
966278307Srpaulo	if ((unsigned int)(options->check) > LZMA_CHECK_ID_MAX)
967278307Srpaulo		return LZMA_PROG_ERROR;
968278307Srpaulo
969278307Srpaulo	if (!lzma_check_is_supported(options->check))
970278307Srpaulo		return LZMA_UNSUPPORTED_CHECK;
971278307Srpaulo
972278307Srpaulo	// Allocate and initialize the base structure if needed.
973312518Sdelphij	lzma_stream_coder *coder = next->coder;
974312518Sdelphij	if (coder == NULL) {
975312518Sdelphij		coder = lzma_alloc(sizeof(lzma_stream_coder), allocator);
976312518Sdelphij		if (coder == NULL)
977278307Srpaulo			return LZMA_MEM_ERROR;
978278307Srpaulo
979312518Sdelphij		next->coder = coder;
980312518Sdelphij
981278307Srpaulo		// For the mutex and condition variable initializations
982278307Srpaulo		// the error handling has to be done here because
983278307Srpaulo		// stream_encoder_mt_end() doesn't know if they have
984278307Srpaulo		// already been initialized or not.
985312518Sdelphij		if (mythread_mutex_init(&coder->mutex)) {
986312518Sdelphij			lzma_free(coder, allocator);
987278307Srpaulo			next->coder = NULL;
988278307Srpaulo			return LZMA_MEM_ERROR;
989278307Srpaulo		}
990278307Srpaulo
991312518Sdelphij		if (mythread_cond_init(&coder->cond)) {
992312518Sdelphij			mythread_mutex_destroy(&coder->mutex);
993312518Sdelphij			lzma_free(coder, allocator);
994278307Srpaulo			next->coder = NULL;
995278307Srpaulo			return LZMA_MEM_ERROR;
996278307Srpaulo		}
997278307Srpaulo
998278307Srpaulo		next->code = &stream_encode_mt;
999278307Srpaulo		next->end = &stream_encoder_mt_end;
1000278307Srpaulo		next->get_progress = &get_progress;
1001278307Srpaulo// 		next->update = &stream_encoder_mt_update;
1002278307Srpaulo
1003312518Sdelphij		coder->filters[0].id = LZMA_VLI_UNKNOWN;
1004312518Sdelphij		coder->index_encoder = LZMA_NEXT_CODER_INIT;
1005312518Sdelphij		coder->index = NULL;
1006312518Sdelphij		memzero(&coder->outq, sizeof(coder->outq));
1007312518Sdelphij		coder->threads = NULL;
1008312518Sdelphij		coder->threads_max = 0;
1009312518Sdelphij		coder->threads_initialized = 0;
1010278307Srpaulo	}
1011278307Srpaulo
1012278307Srpaulo	// Basic initializations
1013312518Sdelphij	coder->sequence = SEQ_STREAM_HEADER;
1014312518Sdelphij	coder->block_size = (size_t)(block_size);
1015312518Sdelphij	coder->thread_error = LZMA_OK;
1016312518Sdelphij	coder->thr = NULL;
1017278307Srpaulo
1018278307Srpaulo	// Allocate the thread-specific base structures.
1019278307Srpaulo	assert(options->threads > 0);
1020312518Sdelphij	if (coder->threads_max != options->threads) {
1021312518Sdelphij		threads_end(coder, allocator);
1022278307Srpaulo
1023312518Sdelphij		coder->threads = NULL;
1024312518Sdelphij		coder->threads_max = 0;
1025278307Srpaulo
1026312518Sdelphij		coder->threads_initialized = 0;
1027312518Sdelphij		coder->threads_free = NULL;
1028278307Srpaulo
1029312518Sdelphij		coder->threads = lzma_alloc(
1030278307Srpaulo				options->threads * sizeof(worker_thread),
1031278307Srpaulo				allocator);
1032312518Sdelphij		if (coder->threads == NULL)
1033278307Srpaulo			return LZMA_MEM_ERROR;
1034278307Srpaulo
1035312518Sdelphij		coder->threads_max = options->threads;
1036278307Srpaulo	} else {
1037278307Srpaulo		// Reuse the old structures and threads. Tell the running
1038278307Srpaulo		// threads to stop and wait until they have stopped.
1039312518Sdelphij		threads_stop(coder, true);
1040278307Srpaulo	}
1041278307Srpaulo
1042278307Srpaulo	// Output queue
1043312518Sdelphij	return_if_error(lzma_outq_init(&coder->outq, allocator,
1044278307Srpaulo			outbuf_size_max, options->threads));
1045278307Srpaulo
1046278307Srpaulo	// Timeout
1047312518Sdelphij	coder->timeout = options->timeout;
1048278307Srpaulo
1049278307Srpaulo	// Free the old filter chain and copy the new one.
1050312518Sdelphij	for (size_t i = 0; coder->filters[i].id != LZMA_VLI_UNKNOWN; ++i)
1051312518Sdelphij		lzma_free(coder->filters[i].options, allocator);
1052278307Srpaulo
1053278307Srpaulo	return_if_error(lzma_filters_copy(
1054312518Sdelphij			filters, coder->filters, allocator));
1055278307Srpaulo
1056278307Srpaulo	// Index
1057312518Sdelphij	lzma_index_end(coder->index, allocator);
1058312518Sdelphij	coder->index = lzma_index_init(allocator);
1059312518Sdelphij	if (coder->index == NULL)
1060278307Srpaulo		return LZMA_MEM_ERROR;
1061278307Srpaulo
1062278307Srpaulo	// Stream Header
1063312518Sdelphij	coder->stream_flags.version = 0;
1064312518Sdelphij	coder->stream_flags.check = options->check;
1065278307Srpaulo	return_if_error(lzma_stream_header_encode(
1066312518Sdelphij			&coder->stream_flags, coder->header));
1067278307Srpaulo
1068312518Sdelphij	coder->header_pos = 0;
1069278307Srpaulo
1070278307Srpaulo	// Progress info
1071312518Sdelphij	coder->progress_in = 0;
1072312518Sdelphij	coder->progress_out = LZMA_STREAM_HEADER_SIZE;
1073278307Srpaulo
1074278307Srpaulo	return LZMA_OK;
1075278307Srpaulo}
1076278307Srpaulo
1077278307Srpaulo
1078278307Srpauloextern LZMA_API(lzma_ret)
1079278307Srpaulolzma_stream_encoder_mt(lzma_stream *strm, const lzma_mt *options)
1080278307Srpaulo{
1081278307Srpaulo	lzma_next_strm_init(stream_encoder_mt_init, strm, options);
1082278307Srpaulo
1083278307Srpaulo	strm->internal->supported_actions[LZMA_RUN] = true;
1084278307Srpaulo// 	strm->internal->supported_actions[LZMA_SYNC_FLUSH] = true;
1085278307Srpaulo	strm->internal->supported_actions[LZMA_FULL_FLUSH] = true;
1086278307Srpaulo	strm->internal->supported_actions[LZMA_FULL_BARRIER] = true;
1087278307Srpaulo	strm->internal->supported_actions[LZMA_FINISH] = true;
1088278307Srpaulo
1089278307Srpaulo	return LZMA_OK;
1090278307Srpaulo}
1091278307Srpaulo
1092278307Srpaulo
1093278307Srpaulo// This function name is a monster but it's consistent with the older
1094278307Srpaulo// monster names. :-( 31 chars is the max that C99 requires so in that
1095278307Srpaulo// sense it's not too long. ;-)
1096278307Srpauloextern LZMA_API(uint64_t)
1097278307Srpaulolzma_stream_encoder_mt_memusage(const lzma_mt *options)
1098278307Srpaulo{
1099278307Srpaulo	lzma_options_easy easy;
1100278307Srpaulo	const lzma_filter *filters;
1101278307Srpaulo	uint64_t block_size;
1102278307Srpaulo	uint64_t outbuf_size_max;
1103278307Srpaulo
1104278307Srpaulo	if (get_options(options, &easy, &filters, &block_size,
1105278307Srpaulo			&outbuf_size_max) != LZMA_OK)
1106278307Srpaulo		return UINT64_MAX;
1107278307Srpaulo
1108278307Srpaulo	// Memory usage of the input buffers
1109278307Srpaulo	const uint64_t inbuf_memusage = options->threads * block_size;
1110278307Srpaulo
1111278307Srpaulo	// Memory usage of the filter encoders
1112278307Srpaulo	uint64_t filters_memusage = lzma_raw_encoder_memusage(filters);
1113278307Srpaulo	if (filters_memusage == UINT64_MAX)
1114278307Srpaulo		return UINT64_MAX;
1115278307Srpaulo
1116278307Srpaulo	filters_memusage *= options->threads;
1117278307Srpaulo
1118278307Srpaulo	// Memory usage of the output queue
1119278307Srpaulo	const uint64_t outq_memusage = lzma_outq_memusage(
1120278307Srpaulo			outbuf_size_max, options->threads);
1121278307Srpaulo	if (outq_memusage == UINT64_MAX)
1122278307Srpaulo		return UINT64_MAX;
1123278307Srpaulo
1124278307Srpaulo	// Sum them with overflow checking.
1125312518Sdelphij	uint64_t total_memusage = LZMA_MEMUSAGE_BASE
1126312518Sdelphij			+ sizeof(lzma_stream_coder)
1127278307Srpaulo			+ options->threads * sizeof(worker_thread);
1128278307Srpaulo
1129278307Srpaulo	if (UINT64_MAX - total_memusage < inbuf_memusage)
1130278307Srpaulo		return UINT64_MAX;
1131278307Srpaulo
1132278307Srpaulo	total_memusage += inbuf_memusage;
1133278307Srpaulo
1134278307Srpaulo	if (UINT64_MAX - total_memusage < filters_memusage)
1135278307Srpaulo		return UINT64_MAX;
1136278307Srpaulo
1137278307Srpaulo	total_memusage += filters_memusage;
1138278307Srpaulo
1139278307Srpaulo	if (UINT64_MAX - total_memusage < outq_memusage)
1140278307Srpaulo		return UINT64_MAX;
1141278307Srpaulo
1142278307Srpaulo	return total_memusage + outq_memusage;
1143278307Srpaulo}
1144