1// SPDX-License-Identifier: 0BSD
2
3///////////////////////////////////////////////////////////////////////////////
4//
5/// \file       stream_encoder_mt.c
6/// \brief      Multithreaded .xz Stream encoder
7//
8//  Author:     Lasse Collin
9//
10///////////////////////////////////////////////////////////////////////////////
11
12#include "filter_encoder.h"
13#include "easy_preset.h"
14#include "block_encoder.h"
15#include "block_buffer_encoder.h"
16#include "index_encoder.h"
17#include "outqueue.h"
18
19
/// Maximum supported block size. This makes it simpler to prevent integer
/// overflows if we are given unusually large block size: because the limit
/// is UINT64_MAX divided by LZMA_THREADS_MAX, multiplying an accepted block
/// size by a thread count of at most LZMA_THREADS_MAX still fits in uint64_t.
#define BLOCK_SIZE_MAX (UINT64_MAX / LZMA_THREADS_MAX)
23
24
/// State of a worker thread.
///
/// The numeric order of the values matters: the code in this file compares
/// states with "state <= THR_FINISH" (there is encoding work to do) and
/// "state >= THR_STOP" (the main thread wants the worker to stop or exit).
typedef enum {
	/// Waiting for work.
	THR_IDLE,

	/// Encoding is in progress.
	THR_RUN,

	/// Encoding is in progress but no more input data will
	/// be read.
	THR_FINISH,

	/// The main thread wants the thread to stop whatever it was doing
	/// but not exit.
	THR_STOP,

	/// The main thread wants the thread to exit. We could use
	/// cancellation but since THR_STOP exists anyway, reusing the
	/// same state mechanism is the lazier option.
	THR_EXIT,

} worker_state;
45
// Forward declaration: worker_thread_s needs a pointer back to the main
// coder structure, which is defined after it.
typedef struct lzma_stream_coder_s lzma_stream_coder;

/// Per-worker-thread state. One of these exists for every slot in
/// lzma_stream_coder.threads.
typedef struct worker_thread_s worker_thread;
struct worker_thread_s {
	/// Current state of this worker. It is read and written while
	/// holding "mutex" below; both the main thread and the worker
	/// itself change it.
	worker_state state;

	/// Input buffer of coder->block_size bytes. The main thread will
	/// put new input into this and update in_size accordingly. Once
	/// no more input is coming, state will be set to THR_FINISH.
	uint8_t *in;

	/// Amount of data available in the input buffer. This is modified
	/// only by the main thread.
	size_t in_size;

	/// Output buffer for this thread. This is set by the main
	/// thread every time a new Block is started with this thread
	/// structure.
	lzma_outbuf *outbuf;

	/// Pointer to the main structure is needed when putting this
	/// thread back to the stack of free threads.
	lzma_stream_coder *coder;

	/// The allocator is set by the main thread. Since a copy of the
	/// pointer is kept here, the application must not change the
	/// allocator before calling lzma_end().
	const lzma_allocator *allocator;

	/// Amount of uncompressed data that has already been compressed.
	/// Updated by the worker while encoding and reset to zero when
	/// the Block is done; read by get_progress().
	uint64_t progress_in;

	/// Amount of compressed data that is ready.
	/// Updated and reset the same way as progress_in.
	uint64_t progress_out;

	/// Block encoder
	lzma_next_coder block_encoder;

	/// Compression options for this Block
	lzma_block block_options;

	/// Filter chain for this thread. By copying the filters array
	/// to each thread it is possible to change the filter chain
	/// between Blocks using lzma_filters_update().
	lzma_filter filters[LZMA_FILTERS_MAX + 1];

	/// Next structure in the stack of free worker threads.
	worker_thread *next;

	/// Mutex that is held when state, in_size, progress_in, and
	/// progress_out are accessed by the thread that doesn't own them.
	mythread_mutex mutex;

	/// Condition variable paired with "mutex". It is signaled when
	/// the state changes or when new input has been made available.
	mythread_cond cond;

	/// The ID of this thread is used to join the thread
	/// when it's not needed anymore.
	mythread thread_id;
};
102
103
/// State of the multithreaded Stream encoder. This is owned by the main
/// thread; worker threads reach it through worker_thread.coder.
struct lzma_stream_coder_s {
	/// Which part of the .xz Stream stream_encode_mt() is currently
	/// producing. The values are in the order in which the parts
	/// are written.
	enum {
		SEQ_STREAM_HEADER,
		SEQ_BLOCK,
		SEQ_INDEX,
		SEQ_STREAM_FOOTER,
	} sequence;

	/// Start a new Block every block_size bytes of input unless
	/// LZMA_FULL_FLUSH or LZMA_FULL_BARRIER is used earlier.
	size_t block_size;

	/// The filter chain to use for the next Block.
	/// This can be updated using lzma_filters_update()
	/// after LZMA_FULL_BARRIER or LZMA_FULL_FLUSH.
	lzma_filter filters[LZMA_FILTERS_MAX + 1];

	/// A copy of filters[] will be put here when attempting to get
	/// a new worker thread. This will be copied to a worker thread
	/// when a thread becomes free and then this cache is marked as
	/// empty by setting [0].id = LZMA_VLI_UNKNOWN. Without this cache
	/// the filter options from filters[] would get uselessly copied
	/// multiple times (allocated and freed) when waiting for a new free
	/// worker thread.
	///
	/// This is freed if filters[] is updated via lzma_filters_update().
	lzma_filter filters_cache[LZMA_FILTERS_MAX + 1];


	/// Index to hold sizes of the Blocks
	lzma_index *index;

	/// Index encoder
	lzma_next_coder index_encoder;


	/// Stream Flags for encoding the Stream Header and Stream Footer.
	lzma_stream_flags stream_flags;

	/// Buffer to hold Stream Header and Stream Footer.
	uint8_t header[LZMA_STREAM_HEADER_SIZE];

	/// Read position in header[]
	size_t header_pos;


	/// Output buffer queue for compressed data
	lzma_outq outq;

	/// How much memory to allocate for each lzma_outbuf.buf
	size_t outbuf_alloc_size;


	/// Maximum wait time if cannot use all the input and cannot
	/// fill the output buffer. This is in milliseconds.
	/// Zero means that there is no timeout.
	uint32_t timeout;


	/// Error code from a worker thread. LZMA_OK means that no worker
	/// has reported an error. Only the first reported error is kept.
	lzma_ret thread_error;

	/// Array of allocated thread-specific structures
	worker_thread *threads;

	/// Number of structures in "threads" above. This is also the
	/// number of threads that will be created at maximum.
	uint32_t threads_max;

	/// Number of thread structures that have been initialized, and
	/// thus the number of worker threads actually created so far.
	uint32_t threads_initialized;

	/// Stack of free threads. When a thread finishes, it puts itself
	/// back into this stack. This starts as empty because threads
	/// are created only when actually needed.
	worker_thread *threads_free;

	/// The most recent worker thread to which the main thread writes
	/// the new input from the application.
	/// This is NULL when no Block is currently being filled.
	worker_thread *thr;


	/// Amount of uncompressed data in Blocks that have already
	/// been finished.
	uint64_t progress_in;

	/// Amount of compressed data in Stream Header + Blocks that
	/// have already been finished.
	uint64_t progress_out;


	/// Mutex that worker threads lock when they report an error,
	/// publish a finished output buffer, update progress_in and
	/// progress_out, or push themselves to threads_free. The main
	/// thread locks it when reading those.
	mythread_mutex mutex;

	/// Condition variable paired with "mutex". Workers signal it
	/// after the updates listed above; the main thread waits on it
	/// in wait_for_work().
	mythread_cond cond;
};
198
199
200/// Tell the main thread that something has gone wrong.
201static void
202worker_error(worker_thread *thr, lzma_ret ret)
203{
204	assert(ret != LZMA_OK);
205	assert(ret != LZMA_STREAM_END);
206
207	mythread_sync(thr->coder->mutex) {
208		if (thr->coder->thread_error == LZMA_OK)
209			thr->coder->thread_error = ret;
210
211		mythread_cond_signal(&thr->coder->cond);
212	}
213
214	return;
215}
216
217
/// Encode one Block in a worker thread.
///
/// The input is consumed from thr->in as the main thread makes it available
/// and the output is written to thr->outbuf->buf. If the output buffer
/// becomes full before the Block encoder finishes, the Block is re-encoded
/// using lzma_block_uncomp_encode().
///
/// \param      thr       Worker thread structure
/// \param      out_pos   Set to the number of bytes written to
///                       thr->outbuf->buf
/// \param      state     State seen by the caller; it is overwritten with
///                       thr->state before it is used for anything
///
/// \return     THR_FINISH if the Block was encoded successfully. THR_STOP
///             if an error occurred (it has been reported with
///             worker_error()). Otherwise the THR_STOP or THR_EXIT that
///             the main thread had stored in thr->state.
static worker_state
worker_encode(worker_thread *thr, size_t *out_pos, worker_state state)
{
	assert(thr->progress_in == 0);
	assert(thr->progress_out == 0);

	// Set the Block options.
	thr->block_options = (lzma_block){
		.version = 0,
		.check = thr->coder->stream_flags.check,
		.compressed_size = thr->outbuf->allocated,
		.uncompressed_size = thr->coder->block_size,
		.filters = thr->filters,
	};

	// Calculate maximum size of the Block Header. This amount is
	// reserved in the beginning of the buffer so that Block Header
	// along with Compressed Size and Uncompressed Size can be
	// written there.
	lzma_ret ret = lzma_block_header_size(&thr->block_options);
	if (ret != LZMA_OK) {
		worker_error(thr, ret);
		return THR_STOP;
	}

	// Initialize the Block encoder.
	ret = lzma_block_encoder_init(&thr->block_encoder,
			thr->allocator, &thr->block_options);
	if (ret != LZMA_OK) {
		worker_error(thr, ret);
		return THR_STOP;
	}

	// in_pos is how much of thr->in has been given to the Block encoder.
	// in_size is this thread's private copy of thr->in_size; it is
	// refreshed under thr->mutex on every iteration of the loop below.
	size_t in_pos = 0;
	size_t in_size = 0;

	// Leave room for the Block Header in the beginning of the buffer.
	*out_pos = thr->block_options.header_size;
	const size_t out_size = thr->outbuf->allocated;

	do {
		mythread_sync(thr->mutex) {
			// Store in_pos and *out_pos into *thr so that
			// an application may read them via
			// lzma_get_progress() to get progress information.
			//
			// NOTE: These aren't updated when the encoding
			// finishes. Instead, the final values are taken
			// later from thr->outbuf.
			thr->progress_in = in_pos;
			thr->progress_out = *out_pos;

			// Sleep until the main thread has provided more
			// input or changed the state.
			while (in_size == thr->in_size
					&& thr->state == THR_RUN)
				mythread_cond_wait(&thr->cond, &thr->mutex);

			state = thr->state;
			in_size = thr->in_size;
		}

		// Return if we were asked to stop or exit.
		if (state >= THR_STOP)
			return state;

		lzma_action action = state == THR_FINISH
				? LZMA_FINISH : LZMA_RUN;

		// Limit the amount of input given to the Block encoder
		// at once. This way this thread can react fairly quickly
		// if the main thread wants us to stop or exit.
		//
		// If the input is truncated here, LZMA_FINISH must not be
		// used yet because there is still input left after in_limit.
		static const size_t in_chunk_max = 16384;
		size_t in_limit = in_size;
		if (in_size - in_pos > in_chunk_max) {
			in_limit = in_pos + in_chunk_max;
			action = LZMA_RUN;
		}

		ret = thr->block_encoder.code(
				thr->block_encoder.coder, thr->allocator,
				thr->in, &in_pos, in_limit, thr->outbuf->buf,
				out_pos, out_size, action);
	} while (ret == LZMA_OK && *out_pos < out_size);

	switch (ret) {
	case LZMA_STREAM_END:
		assert(state == THR_FINISH);

		// Encode the Block Header. By doing it after
		// the compression, we can store the Compressed Size
		// and Uncompressed Size fields.
		ret = lzma_block_header_encode(&thr->block_options,
				thr->outbuf->buf);
		if (ret != LZMA_OK) {
			worker_error(thr, ret);
			return THR_STOP;
		}

		break;

	case LZMA_OK:
		// The loop ended because the output buffer became full
		// before the Block encoder finished.
		//
		// The data was incompressible. Encode it using uncompressed
		// LZMA2 chunks.
		//
		// First wait that we have gotten all the input.
		mythread_sync(thr->mutex) {
			while (thr->state == THR_RUN)
				mythread_cond_wait(&thr->cond, &thr->mutex);

			state = thr->state;
			in_size = thr->in_size;
		}

		if (state >= THR_STOP)
			return state;

		// Do the encoding. This takes care of the Block Header too.
		// The output is written from the beginning of the buffer,
		// overwriting what the Block encoder produced.
		*out_pos = 0;
		ret = lzma_block_uncomp_encode(&thr->block_options,
				thr->in, in_size, thr->outbuf->buf,
				out_pos, out_size);

		// It shouldn't fail.
		if (ret != LZMA_OK) {
			worker_error(thr, LZMA_PROG_ERROR);
			return THR_STOP;
		}

		break;

	default:
		// The Block encoder returned an error.
		worker_error(thr, ret);
		return THR_STOP;
	}

	// Set the size information that will be read by the main thread
	// to write the Index field.
	thr->outbuf->unpadded_size
			= lzma_block_unpadded_size(&thr->block_options);
	assert(thr->outbuf->unpadded_size != 0);
	thr->outbuf->uncompressed_size = thr->block_options.uncompressed_size;

	return THR_FINISH;
}
360
361
/// Entry point of a worker thread.
///
/// The thread sleeps until the main thread moves it out of THR_IDLE,
/// encodes one Block with worker_encode(), publishes the result, and puts
/// itself back to the stack of free threads. This repeats until THR_EXIT
/// is seen, at which point the thread frees its own resources and returns.
///
/// \param      thr_ptr   Pointer to the worker_thread structure of
///                       this thread
static MYTHREAD_RET_TYPE
worker_start(void *thr_ptr)
{
	worker_thread *thr = thr_ptr;
	worker_state state = THR_IDLE; // Init to silence a warning

	while (true) {
		// Wait for work.
		mythread_sync(thr->mutex) {
			while (true) {
				// The thread is already idle so if we are
				// requested to stop, just set the state.
				if (thr->state == THR_STOP) {
					thr->state = THR_IDLE;
					mythread_cond_signal(&thr->cond);
				}

				state = thr->state;
				if (state != THR_IDLE)
					break;

				mythread_cond_wait(&thr->cond, &thr->mutex);
			}
		}

		size_t out_pos = 0;

		assert(state != THR_IDLE);
		assert(state != THR_STOP);

		// THR_RUN or THR_FINISH: there is a Block to encode.
		// The only other possibility here is THR_EXIT.
		if (state <= THR_FINISH)
			state = worker_encode(thr, &out_pos, state);

		if (state == THR_EXIT)
			break;

		// Mark the thread as idle unless the main thread has
		// told us to exit. Signal is needed for the case
		// where the main thread is waiting for the threads to stop.
		mythread_sync(thr->mutex) {
			if (thr->state != THR_EXIT) {
				thr->state = THR_IDLE;
				mythread_cond_signal(&thr->cond);
			}
		}

		mythread_sync(thr->coder->mutex) {
			// If no errors occurred, make the encoded data
			// available to be copied out.
			if (state == THR_FINISH) {
				thr->outbuf->pos = out_pos;
				thr->outbuf->finished = true;
			}

			// Update the main progress info.
			thr->coder->progress_in
					+= thr->outbuf->uncompressed_size;
			thr->coder->progress_out += out_pos;
			thr->progress_in = 0;
			thr->progress_out = 0;

			// Return this thread to the stack of free threads.
			thr->next = thr->coder->threads_free;
			thr->coder->threads_free = thr;

			// Wake up the main thread in case it is waiting
			// for output or for a free worker thread.
			mythread_cond_signal(&thr->coder->cond);
		}
	}

	// Exiting, free the resources.
	lzma_filters_free(thr->filters, thr->allocator);

	mythread_mutex_destroy(&thr->mutex);
	mythread_cond_destroy(&thr->cond);

	lzma_next_end(&thr->block_encoder, thr->allocator);
	lzma_free(thr->in, thr->allocator);
	return MYTHREAD_RET_VALUE;
}
441
442
443/// Make the threads stop but not exit. Optionally wait for them to stop.
444static void
445threads_stop(lzma_stream_coder *coder, bool wait_for_threads)
446{
447	// Tell the threads to stop.
448	for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
449		mythread_sync(coder->threads[i].mutex) {
450			coder->threads[i].state = THR_STOP;
451			mythread_cond_signal(&coder->threads[i].cond);
452		}
453	}
454
455	if (!wait_for_threads)
456		return;
457
458	// Wait for the threads to settle in the idle state.
459	for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
460		mythread_sync(coder->threads[i].mutex) {
461			while (coder->threads[i].state != THR_IDLE)
462				mythread_cond_wait(&coder->threads[i].cond,
463						&coder->threads[i].mutex);
464		}
465	}
466
467	return;
468}
469
470
471/// Stop the threads and free the resources associated with them.
472/// Wait until the threads have exited.
473static void
474threads_end(lzma_stream_coder *coder, const lzma_allocator *allocator)
475{
476	for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
477		mythread_sync(coder->threads[i].mutex) {
478			coder->threads[i].state = THR_EXIT;
479			mythread_cond_signal(&coder->threads[i].cond);
480		}
481	}
482
483	for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
484		int ret = mythread_join(coder->threads[i].thread_id);
485		assert(ret == 0);
486		(void)ret;
487	}
488
489	lzma_free(coder->threads, allocator);
490	return;
491}
492
493
494/// Initialize a new worker_thread structure and create a new thread.
495static lzma_ret
496initialize_new_thread(lzma_stream_coder *coder,
497		const lzma_allocator *allocator)
498{
499	worker_thread *thr = &coder->threads[coder->threads_initialized];
500
501	thr->in = lzma_alloc(coder->block_size, allocator);
502	if (thr->in == NULL)
503		return LZMA_MEM_ERROR;
504
505	if (mythread_mutex_init(&thr->mutex))
506		goto error_mutex;
507
508	if (mythread_cond_init(&thr->cond))
509		goto error_cond;
510
511	thr->state = THR_IDLE;
512	thr->allocator = allocator;
513	thr->coder = coder;
514	thr->progress_in = 0;
515	thr->progress_out = 0;
516	thr->block_encoder = LZMA_NEXT_CODER_INIT;
517	thr->filters[0].id = LZMA_VLI_UNKNOWN;
518
519	if (mythread_create(&thr->thread_id, &worker_start, thr))
520		goto error_thread;
521
522	++coder->threads_initialized;
523	coder->thr = thr;
524
525	return LZMA_OK;
526
527error_thread:
528	mythread_cond_destroy(&thr->cond);
529
530error_cond:
531	mythread_mutex_destroy(&thr->mutex);
532
533error_mutex:
534	lzma_free(thr->in, allocator);
535	return LZMA_MEM_ERROR;
536}
537
538
/// Try to get a worker thread for a new Block and store it in coder->thr.
///
/// A free thread is taken from coder->threads_free if there is one.
/// Otherwise a new thread is created if fewer than coder->threads_max
/// threads exist.
///
/// \param      coder       Stream encoder state
/// \param      allocator   Allocator
///
/// \return     LZMA_OK both when a thread was obtained and when none is
///             available right now; the caller has to check if coder->thr
///             is non-NULL to tell the difference. Other values are errors
///             passed on from lzma_outq_prealloc_buf(),
///             lzma_filters_copy(), or initialize_new_thread().
static lzma_ret
get_thread(lzma_stream_coder *coder, const lzma_allocator *allocator)
{
	// If there are no free output subqueues, there is no
	// point to try getting a thread.
	if (!lzma_outq_has_buf(&coder->outq))
		return LZMA_OK;

	// That's also true if we cannot allocate memory for the output
	// buffer in the output queue.
	return_if_error(lzma_outq_prealloc_buf(&coder->outq, allocator,
			coder->outbuf_alloc_size));

	// Make a thread-specific copy of the filter chain. Put it in
	// the cache array first so that if we cannot get a new thread yet,
	// the allocation is ready when we try again.
	if (coder->filters_cache[0].id == LZMA_VLI_UNKNOWN)
		return_if_error(lzma_filters_copy(
			coder->filters, coder->filters_cache, allocator));

	// If there is a free structure on the stack, use it.
	mythread_sync(coder->mutex) {
		if (coder->threads_free != NULL) {
			coder->thr = coder->threads_free;
			coder->threads_free = coder->threads_free->next;
		}
	}

	if (coder->thr == NULL) {
		// If there are no uninitialized structures left, return.
		if (coder->threads_initialized == coder->threads_max)
			return LZMA_OK;

		// Initialize a new thread.
		// On success this sets coder->thr.
		return_if_error(initialize_new_thread(coder, allocator));
	}

	// Reset the parts of the thread state that have to be done
	// in the main thread.
	mythread_sync(coder->thr->mutex) {
		coder->thr->state = THR_RUN;
		coder->thr->in_size = 0;
		coder->thr->outbuf = lzma_outq_get_buf(&coder->outq, NULL);

		// Free the old thread-specific filter options and replace
		// them with the already-allocated new options from
		// coder->filters_cache[]. Then mark the cache as empty.
		lzma_filters_free(coder->thr->filters, allocator);
		memcpy(coder->thr->filters, coder->filters_cache,
				sizeof(coder->filters_cache));
		coder->filters_cache[0].id = LZMA_VLI_UNKNOWN;

		// Wake up the worker so that it notices THR_RUN.
		mythread_cond_signal(&coder->thr->cond);
	}

	return LZMA_OK;
}
596
597
/// Pass input from the application to worker threads.
///
/// The input is copied to the input buffer of coder->thr. A new worker is
/// requested with get_thread() whenever there is no current one. The loop
/// runs until all input has been consumed (and, with an action other than
/// LZMA_RUN, the current Block has been told to finish), or until no
/// worker thread is available.
///
/// \param      coder       Stream encoder state
/// \param      allocator   Allocator
/// \param      in          Input buffer
/// \param      in_pos      Read position in in[]; advanced by the amount
///                         that was copied
/// \param      in_size     Size of in[]
/// \param      action      Action given by the application
///
/// \return     LZMA_OK if there was no error, also when some input was
///             left unused because no thread was available. Otherwise
///             an error from get_thread() or coder->thread_error.
static lzma_ret
stream_encode_in(lzma_stream_coder *coder, const lzma_allocator *allocator,
		const uint8_t *restrict in, size_t *restrict in_pos,
		size_t in_size, lzma_action action)
{
	// The second condition makes the loop run once more when there is
	// no input left but a partially filled Block has to be finished.
	while (*in_pos < in_size
			|| (coder->thr != NULL && action != LZMA_RUN)) {
		if (coder->thr == NULL) {
			// Get a new thread.
			// If none was available, ret is LZMA_OK and
			// we return to let the caller wait.
			const lzma_ret ret = get_thread(coder, allocator);
			if (coder->thr == NULL)
				return ret;
		}

		// Copy the input data to thread's buffer.
		// The worker doesn't see the new data until in_size
		// is updated under the mutex below.
		size_t thr_in_size = coder->thr->in_size;
		lzma_bufcpy(in, in_pos, in_size, coder->thr->in,
				&thr_in_size, coder->block_size);

		// Tell the Block encoder to finish if
		//  - it has got block_size bytes of input; or
		//  - all input was used and LZMA_FINISH, LZMA_FULL_FLUSH,
		//    or LZMA_FULL_BARRIER was used.
		//
		// TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER.
		const bool finish = thr_in_size == coder->block_size
				|| (*in_pos == in_size && action != LZMA_RUN);

		bool block_error = false;

		mythread_sync(coder->thr->mutex) {
			if (coder->thr->state == THR_IDLE) {
				// Something has gone wrong with the Block
				// encoder. It has set coder->thread_error
				// which we will read a few lines later.
				block_error = true;
			} else {
				// Tell the Block encoder its new amount
				// of input and update the state if needed.
				coder->thr->in_size = thr_in_size;

				if (finish)
					coder->thr->state = THR_FINISH;

				mythread_cond_signal(&coder->thr->cond);
			}
		}

		if (block_error) {
			lzma_ret ret = LZMA_OK; // Init to silence a warning.

			mythread_sync(coder->mutex) {
				ret = coder->thread_error;
			}

			return ret;
		}

		// This Block won't get more input. The next input will
		// need a new thread.
		if (finish)
			coder->thr = NULL;
	}

	return LZMA_OK;
}
662
663
/// Wait until more input can be consumed, more output can be read, or
/// an optional timeout is reached.
///
/// \param      coder         Stream encoder state
/// \param      wait_abs      Absolute time of the timeout. It is set here
///                           when this function is called with
///                           *has_blocked == false and a non-zero
///                           coder->timeout.
/// \param      has_blocked   Tells if wait_abs has already been set
///                           during the current stream_encode_mt() call.
///                           It is set to true when wait_abs is set.
/// \param      has_input     True if the caller has input that hasn't
///                           been consumed yet
///
/// \return     True if the timeout was reached, false otherwise.
static bool
wait_for_work(lzma_stream_coder *coder, mythread_condtime *wait_abs,
		bool *has_blocked, bool has_input)
{
	if (coder->timeout != 0 && !*has_blocked) {
		// Every time when stream_encode_mt() is called via
		// lzma_code(), *has_blocked starts as false. We set it
		// to true here and calculate the absolute time when
		// we must return if there's nothing to do.
		//
		// This way if we block multiple times for short moments
		// less than "timeout" milliseconds, we will return once
		// "timeout" amount of time has passed since the *first*
		// blocking occurred. If the absolute time was calculated
		// again every time we block, "timeout" would effectively
		// be meaningless if we never consecutively block longer
		// than "timeout" ms.
		*has_blocked = true;
		mythread_condtime_set(wait_abs, &coder->cond, coder->timeout);
	}

	bool timed_out = false;

	mythread_sync(coder->mutex) {
		// There are four things that we wait. If one of them
		// becomes possible, we return.
		//  - If there is input left, we need to get a free
		//    worker thread and an output buffer for it.
		//  - Data ready to be read from the output queue.
		//  - A worker thread indicates an error.
		//  - Time out occurs.
		while ((!has_input || coder->threads_free == NULL
					|| !lzma_outq_has_buf(&coder->outq))
				&& !lzma_outq_is_readable(&coder->outq)
				&& coder->thread_error == LZMA_OK
				&& !timed_out) {
			// A zero timeout means waiting without a time limit.
			if (coder->timeout != 0)
				timed_out = mythread_cond_timedwait(
						&coder->cond, &coder->mutex,
						wait_abs) != 0;
			else
				mythread_cond_wait(&coder->cond,
						&coder->mutex);
		}
	}

	return timed_out;
}
714
715
/// The main encoding function of the multithreaded Stream encoder.
///
/// This runs in the thread that calls it (the "main thread" in the other
/// comments of this file). It writes the Stream Header, passes the input
/// to the worker threads and copies their finished output to out[], and
/// finally writes the Index and the Stream Footer. coder->sequence keeps
/// track of the current phase between calls.
///
/// \param      coder_ptr   Pointer to lzma_stream_coder
/// \param      allocator   Allocator
/// \param      in          Input buffer
/// \param      in_pos      Read position in in[]
/// \param      in_size     Size of in[]
/// \param      out         Output buffer
/// \param      out_pos     Write position in out[]
/// \param      out_size    Size of out[]
/// \param      action      LZMA_RUN, LZMA_FULL_FLUSH, LZMA_FULL_BARRIER,
///                         or LZMA_FINISH
///
/// \return     LZMA_OK when more input or output space is needed.
///             LZMA_STREAM_END when LZMA_FULL_FLUSH, LZMA_FULL_BARRIER,
///             or LZMA_FINISH has been completed. LZMA_TIMED_OUT if
///             the timeout was reached in wait_for_work(). Other values
///             are errors; the worker threads have been told to stop
///             in the error cases that occur while encoding the Blocks.
static lzma_ret
stream_encode_mt(void *coder_ptr, const lzma_allocator *allocator,
		const uint8_t *restrict in, size_t *restrict in_pos,
		size_t in_size, uint8_t *restrict out,
		size_t *restrict out_pos, size_t out_size, lzma_action action)
{
	lzma_stream_coder *coder = coder_ptr;

	switch (coder->sequence) {
	case SEQ_STREAM_HEADER:
		// Copy the Stream Header from coder->header to out[].
		lzma_bufcpy(coder->header, &coder->header_pos,
				sizeof(coder->header),
				out, out_pos, out_size);
		if (coder->header_pos < sizeof(coder->header))
			return LZMA_OK;

		// Reset the position because coder->header is reused for
		// the Stream Footer.
		coder->header_pos = 0;
		coder->sequence = SEQ_BLOCK;

	// Fall through

	case SEQ_BLOCK: {
		// Initialized to silence warnings.
		lzma_vli unpadded_size = 0;
		lzma_vli uncompressed_size = 0;
		lzma_ret ret = LZMA_OK;

		// These are for wait_for_work().
		bool has_blocked = false;
		mythread_condtime wait_abs = { 0 };

		while (true) {
			mythread_sync(coder->mutex) {
				// Check for Block encoder errors.
				ret = coder->thread_error;
				if (ret != LZMA_OK) {
					assert(ret != LZMA_STREAM_END);
					break; // Break out of mythread_sync.
				}

				// Try to read compressed data to out[].
				ret = lzma_outq_read(&coder->outq, allocator,
						out, out_pos, out_size,
						&unpadded_size,
						&uncompressed_size);
			}

			if (ret == LZMA_STREAM_END) {
				// End of Block. Add it to the Index.
				ret = lzma_index_append(coder->index,
						allocator, unpadded_size,
						uncompressed_size);
				if (ret != LZMA_OK) {
					threads_stop(coder, false);
					return ret;
				}

				// If we didn't fill the output buffer yet,
				// try to read more data. Maybe the next
				// outbuf has been finished already too.
				if (*out_pos < out_size)
					continue;
			}

			if (ret != LZMA_OK) {
				// coder->thread_error was set.
				threads_stop(coder, false);
				return ret;
			}

			// Try to give uncompressed data to a worker thread.
			ret = stream_encode_in(coder, allocator,
					in, in_pos, in_size, action);
			if (ret != LZMA_OK) {
				threads_stop(coder, false);
				return ret;
			}

			// See if we should wait or return.
			//
			// TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER.
			if (*in_pos == in_size) {
				// LZMA_RUN: More data is probably coming
				// so return to let the caller fill the
				// input buffer.
				if (action == LZMA_RUN)
					return LZMA_OK;

				// LZMA_FULL_BARRIER: The same as with
				// LZMA_RUN but tell the caller that the
				// barrier was completed.
				if (action == LZMA_FULL_BARRIER)
					return LZMA_STREAM_END;

				// Finishing or flushing isn't completed until
				// all input data has been encoded and copied
				// to the output buffer.
				if (lzma_outq_is_empty(&coder->outq)) {
					// LZMA_FINISH: Continue to encode
					// the Index field.
					if (action == LZMA_FINISH)
						break;

					// LZMA_FULL_FLUSH: Return to tell
					// the caller that flushing was
					// completed.
					if (action == LZMA_FULL_FLUSH)
						return LZMA_STREAM_END;
				}
			}

			// Return if there is no output space left.
			// This check must be done after testing the input
			// buffer, because we might want to use a different
			// return code.
			if (*out_pos == out_size)
				return LZMA_OK;

			// Neither in nor out has been used completely.
			// Wait until there's something we can do.
			if (wait_for_work(coder, &wait_abs, &has_blocked,
					*in_pos < in_size))
				return LZMA_TIMED_OUT;
		}

		// All Blocks have been encoded and the threads have stopped.
		// Prepare to encode the Index field.
		return_if_error(lzma_index_encoder_init(
				&coder->index_encoder, allocator,
				coder->index));
		coder->sequence = SEQ_INDEX;

		// Update the progress info to take the Index and
		// Stream Footer into account. Those are very fast to encode
		// so in terms of progress information they can be thought
		// to be ready to be copied out.
		coder->progress_out += lzma_index_size(coder->index)
				+ LZMA_STREAM_HEADER_SIZE;
	}

	// Fall through

	case SEQ_INDEX: {
		// Call the Index encoder. It doesn't take any input, so
		// those pointers can be NULL.
		const lzma_ret ret = coder->index_encoder.code(
				coder->index_encoder.coder, allocator,
				NULL, NULL, 0,
				out, out_pos, out_size, LZMA_RUN);
		if (ret != LZMA_STREAM_END)
			return ret;

		// Encode the Stream Footer into coder->header.
		coder->stream_flags.backward_size
				= lzma_index_size(coder->index);
		if (lzma_stream_footer_encode(&coder->stream_flags,
				coder->header) != LZMA_OK)
			return LZMA_PROG_ERROR;

		coder->sequence = SEQ_STREAM_FOOTER;
	}

	// Fall through

	case SEQ_STREAM_FOOTER:
		// Copy the Stream Footer from coder->header to out[].
		// The Stream is finished once all of it has been copied.
		lzma_bufcpy(coder->header, &coder->header_pos,
				sizeof(coder->header),
				out, out_pos, out_size);
		return coder->header_pos < sizeof(coder->header)
				? LZMA_OK : LZMA_STREAM_END;
	}

	// coder->sequence had a value that isn't handled above.
	assert(0);
	return LZMA_PROG_ERROR;
}
891
892
893static void
894stream_encoder_mt_end(void *coder_ptr, const lzma_allocator *allocator)
895{
896	lzma_stream_coder *coder = coder_ptr;
897
898	// Threads must be killed before the output queue can be freed.
899	threads_end(coder, allocator);
900	lzma_outq_end(&coder->outq, allocator);
901
902	lzma_filters_free(coder->filters, allocator);
903	lzma_filters_free(coder->filters_cache, allocator);
904
905	lzma_next_end(&coder->index_encoder, allocator);
906	lzma_index_end(coder->index, allocator);
907
908	mythread_cond_destroy(&coder->cond);
909	mythread_mutex_destroy(&coder->mutex);
910
911	lzma_free(coder, allocator);
912	return;
913}
914
915
916static lzma_ret
917stream_encoder_mt_update(void *coder_ptr, const lzma_allocator *allocator,
918		const lzma_filter *filters,
919		const lzma_filter *reversed_filters
920			lzma_attribute((__unused__)))
921{
922	lzma_stream_coder *coder = coder_ptr;
923
924	// Applications shouldn't attempt to change the options when
925	// we are already encoding the Index or Stream Footer.
926	if (coder->sequence > SEQ_BLOCK)
927		return LZMA_PROG_ERROR;
928
929	// For now the threaded encoder doesn't support changing
930	// the options in the middle of a Block.
931	if (coder->thr != NULL)
932		return LZMA_PROG_ERROR;
933
934	// Check if the filter chain seems mostly valid. See the comment
935	// in stream_encoder_mt_init().
936	if (lzma_raw_encoder_memusage(filters) == UINT64_MAX)
937		return LZMA_OPTIONS_ERROR;
938
939	// Make a copy to a temporary buffer first. This way the encoder
940	// state stays unchanged if an error occurs in lzma_filters_copy().
941	lzma_filter temp[LZMA_FILTERS_MAX + 1];
942	return_if_error(lzma_filters_copy(filters, temp, allocator));
943
944	// Free the options of the old chain as well as the cache.
945	lzma_filters_free(coder->filters, allocator);
946	lzma_filters_free(coder->filters_cache, allocator);
947
948	// Copy the new filter chain in place.
949	memcpy(coder->filters, temp, sizeof(temp));
950
951	return LZMA_OK;
952}
953
954
955/// Options handling for lzma_stream_encoder_mt_init() and
956/// lzma_stream_encoder_mt_memusage()
957static lzma_ret
958get_options(const lzma_mt *options, lzma_options_easy *opt_easy,
959		const lzma_filter **filters, uint64_t *block_size,
960		uint64_t *outbuf_size_max)
961{
962	// Validate some of the options.
963	if (options == NULL)
964		return LZMA_PROG_ERROR;
965
966	if (options->flags != 0 || options->threads == 0
967			|| options->threads > LZMA_THREADS_MAX)
968		return LZMA_OPTIONS_ERROR;
969
970	if (options->filters != NULL) {
971		// Filter chain was given, use it as is.
972		*filters = options->filters;
973	} else {
974		// Use a preset.
975		if (lzma_easy_preset(opt_easy, options->preset))
976			return LZMA_OPTIONS_ERROR;
977
978		*filters = opt_easy->filters;
979	}
980
981	// If the Block size is not set, determine it from the filter chain.
982	if (options->block_size > 0)
983		*block_size = options->block_size;
984	else
985		*block_size = lzma_mt_block_size(*filters);
986
987	// UINT64_MAX > BLOCK_SIZE_MAX, so the second condition
988	// should be optimized out by any reasonable compiler.
989	// The second condition should be there in the unlikely event that
990	// the macros change and UINT64_MAX < BLOCK_SIZE_MAX.
991	if (*block_size > BLOCK_SIZE_MAX || *block_size == UINT64_MAX)
992		return LZMA_OPTIONS_ERROR;
993
994	// Calculate the maximum amount output that a single output buffer
995	// may need to hold. This is the same as the maximum total size of
996	// a Block.
997	*outbuf_size_max = lzma_block_buffer_bound64(*block_size);
998	if (*outbuf_size_max == 0)
999		return LZMA_MEM_ERROR;
1000
1001	return LZMA_OK;
1002}
1003
1004
1005static void
1006get_progress(void *coder_ptr, uint64_t *progress_in, uint64_t *progress_out)
1007{
1008	lzma_stream_coder *coder = coder_ptr;
1009
1010	// Lock coder->mutex to prevent finishing threads from moving their
1011	// progress info from the worker_thread structure to lzma_stream_coder.
1012	mythread_sync(coder->mutex) {
1013		*progress_in = coder->progress_in;
1014		*progress_out = coder->progress_out;
1015
1016		for (size_t i = 0; i < coder->threads_initialized; ++i) {
1017			mythread_sync(coder->threads[i].mutex) {
1018				*progress_in += coder->threads[i].progress_in;
1019				*progress_out += coder->threads[i]
1020						.progress_out;
1021			}
1022		}
1023	}
1024
1025	return;
1026}
1027
1028
1029static lzma_ret
1030stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
1031		const lzma_mt *options)
1032{
1033	lzma_next_coder_init(&stream_encoder_mt_init, next, allocator);
1034
1035	// Get the filter chain.
1036	lzma_options_easy easy;
1037	const lzma_filter *filters;
1038	uint64_t block_size;
1039	uint64_t outbuf_size_max;
1040	return_if_error(get_options(options, &easy, &filters,
1041			&block_size, &outbuf_size_max));
1042
1043#if SIZE_MAX < UINT64_MAX
1044	if (block_size > SIZE_MAX || outbuf_size_max > SIZE_MAX)
1045		return LZMA_MEM_ERROR;
1046#endif
1047
1048	// Validate the filter chain so that we can give an error in this
1049	// function instead of delaying it to the first call to lzma_code().
1050	// The memory usage calculation verifies the filter chain as
1051	// a side effect so we take advantage of that. It's not a perfect
1052	// check though as raw encoder allows LZMA1 too but such problems
1053	// will be caught eventually with Block Header encoder.
1054	if (lzma_raw_encoder_memusage(filters) == UINT64_MAX)
1055		return LZMA_OPTIONS_ERROR;
1056
1057	// Validate the Check ID.
1058	if ((unsigned int)(options->check) > LZMA_CHECK_ID_MAX)
1059		return LZMA_PROG_ERROR;
1060
1061	if (!lzma_check_is_supported(options->check))
1062		return LZMA_UNSUPPORTED_CHECK;
1063
1064	// Allocate and initialize the base structure if needed.
1065	lzma_stream_coder *coder = next->coder;
1066	if (coder == NULL) {
1067		coder = lzma_alloc(sizeof(lzma_stream_coder), allocator);
1068		if (coder == NULL)
1069			return LZMA_MEM_ERROR;
1070
1071		next->coder = coder;
1072
1073		// For the mutex and condition variable initializations
1074		// the error handling has to be done here because
1075		// stream_encoder_mt_end() doesn't know if they have
1076		// already been initialized or not.
1077		if (mythread_mutex_init(&coder->mutex)) {
1078			lzma_free(coder, allocator);
1079			next->coder = NULL;
1080			return LZMA_MEM_ERROR;
1081		}
1082
1083		if (mythread_cond_init(&coder->cond)) {
1084			mythread_mutex_destroy(&coder->mutex);
1085			lzma_free(coder, allocator);
1086			next->coder = NULL;
1087			return LZMA_MEM_ERROR;
1088		}
1089
1090		next->code = &stream_encode_mt;
1091		next->end = &stream_encoder_mt_end;
1092		next->get_progress = &get_progress;
1093		next->update = &stream_encoder_mt_update;
1094
1095		coder->filters[0].id = LZMA_VLI_UNKNOWN;
1096		coder->filters_cache[0].id = LZMA_VLI_UNKNOWN;
1097		coder->index_encoder = LZMA_NEXT_CODER_INIT;
1098		coder->index = NULL;
1099		memzero(&coder->outq, sizeof(coder->outq));
1100		coder->threads = NULL;
1101		coder->threads_max = 0;
1102		coder->threads_initialized = 0;
1103	}
1104
1105	// Basic initializations
1106	coder->sequence = SEQ_STREAM_HEADER;
1107	coder->block_size = (size_t)(block_size);
1108	coder->outbuf_alloc_size = (size_t)(outbuf_size_max);
1109	coder->thread_error = LZMA_OK;
1110	coder->thr = NULL;
1111
1112	// Allocate the thread-specific base structures.
1113	assert(options->threads > 0);
1114	if (coder->threads_max != options->threads) {
1115		threads_end(coder, allocator);
1116
1117		coder->threads = NULL;
1118		coder->threads_max = 0;
1119
1120		coder->threads_initialized = 0;
1121		coder->threads_free = NULL;
1122
1123		coder->threads = lzma_alloc(
1124				options->threads * sizeof(worker_thread),
1125				allocator);
1126		if (coder->threads == NULL)
1127			return LZMA_MEM_ERROR;
1128
1129		coder->threads_max = options->threads;
1130	} else {
1131		// Reuse the old structures and threads. Tell the running
1132		// threads to stop and wait until they have stopped.
1133		threads_stop(coder, true);
1134	}
1135
1136	// Output queue
1137	return_if_error(lzma_outq_init(&coder->outq, allocator,
1138			options->threads));
1139
1140	// Timeout
1141	coder->timeout = options->timeout;
1142
1143	// Free the old filter chain and the cache.
1144	lzma_filters_free(coder->filters, allocator);
1145	lzma_filters_free(coder->filters_cache, allocator);
1146
1147	// Copy the new filter chain.
1148	return_if_error(lzma_filters_copy(
1149			filters, coder->filters, allocator));
1150
1151	// Index
1152	lzma_index_end(coder->index, allocator);
1153	coder->index = lzma_index_init(allocator);
1154	if (coder->index == NULL)
1155		return LZMA_MEM_ERROR;
1156
1157	// Stream Header
1158	coder->stream_flags.version = 0;
1159	coder->stream_flags.check = options->check;
1160	return_if_error(lzma_stream_header_encode(
1161			&coder->stream_flags, coder->header));
1162
1163	coder->header_pos = 0;
1164
1165	// Progress info
1166	coder->progress_in = 0;
1167	coder->progress_out = LZMA_STREAM_HEADER_SIZE;
1168
1169	return LZMA_OK;
1170}
1171
1172
1173#ifdef HAVE_SYMBOL_VERSIONS_LINUX
1174// These are for compatibility with binaries linked against liblzma that
1175// has been patched with xz-5.2.2-compat-libs.patch from RHEL/CentOS 7.
1176// Actually that patch didn't create lzma_stream_encoder_mt@XZ_5.2.2
1177// but it has been added here anyway since someone might misread the
1178// RHEL patch and think both @XZ_5.1.2alpha and @XZ_5.2.2 exist.
1179LZMA_SYMVER_API("lzma_stream_encoder_mt@XZ_5.1.2alpha",
1180	lzma_ret, lzma_stream_encoder_mt_512a)(
1181		lzma_stream *strm, const lzma_mt *options)
1182		lzma_nothrow lzma_attr_warn_unused_result
1183		__attribute__((__alias__("lzma_stream_encoder_mt_52")));
1184
1185LZMA_SYMVER_API("lzma_stream_encoder_mt@XZ_5.2.2",
1186	lzma_ret, lzma_stream_encoder_mt_522)(
1187		lzma_stream *strm, const lzma_mt *options)
1188		lzma_nothrow lzma_attr_warn_unused_result
1189		__attribute__((__alias__("lzma_stream_encoder_mt_52")));
1190
1191LZMA_SYMVER_API("lzma_stream_encoder_mt@@XZ_5.2",
1192	lzma_ret, lzma_stream_encoder_mt_52)(
1193		lzma_stream *strm, const lzma_mt *options)
1194		lzma_nothrow lzma_attr_warn_unused_result;
1195
1196#define lzma_stream_encoder_mt lzma_stream_encoder_mt_52
1197#endif
1198extern LZMA_API(lzma_ret)
1199lzma_stream_encoder_mt(lzma_stream *strm, const lzma_mt *options)
1200{
1201	lzma_next_strm_init(stream_encoder_mt_init, strm, options);
1202
1203	strm->internal->supported_actions[LZMA_RUN] = true;
1204// 	strm->internal->supported_actions[LZMA_SYNC_FLUSH] = true;
1205	strm->internal->supported_actions[LZMA_FULL_FLUSH] = true;
1206	strm->internal->supported_actions[LZMA_FULL_BARRIER] = true;
1207	strm->internal->supported_actions[LZMA_FINISH] = true;
1208
1209	return LZMA_OK;
1210}
1211
1212
1213#ifdef HAVE_SYMBOL_VERSIONS_LINUX
1214LZMA_SYMVER_API("lzma_stream_encoder_mt_memusage@XZ_5.1.2alpha",
1215	uint64_t, lzma_stream_encoder_mt_memusage_512a)(
1216	const lzma_mt *options) lzma_nothrow lzma_attr_pure
1217	__attribute__((__alias__("lzma_stream_encoder_mt_memusage_52")));
1218
1219LZMA_SYMVER_API("lzma_stream_encoder_mt_memusage@XZ_5.2.2",
1220	uint64_t, lzma_stream_encoder_mt_memusage_522)(
1221	const lzma_mt *options) lzma_nothrow lzma_attr_pure
1222	__attribute__((__alias__("lzma_stream_encoder_mt_memusage_52")));
1223
1224LZMA_SYMVER_API("lzma_stream_encoder_mt_memusage@@XZ_5.2",
1225	uint64_t, lzma_stream_encoder_mt_memusage_52)(
1226	const lzma_mt *options) lzma_nothrow lzma_attr_pure;
1227
1228#define lzma_stream_encoder_mt_memusage lzma_stream_encoder_mt_memusage_52
1229#endif
1230// This function name is a monster but it's consistent with the older
1231// monster names. :-( 31 chars is the max that C99 requires so in that
1232// sense it's not too long. ;-)
1233extern LZMA_API(uint64_t)
1234lzma_stream_encoder_mt_memusage(const lzma_mt *options)
1235{
1236	lzma_options_easy easy;
1237	const lzma_filter *filters;
1238	uint64_t block_size;
1239	uint64_t outbuf_size_max;
1240
1241	if (get_options(options, &easy, &filters, &block_size,
1242			&outbuf_size_max) != LZMA_OK)
1243		return UINT64_MAX;
1244
1245	// Memory usage of the input buffers
1246	const uint64_t inbuf_memusage = options->threads * block_size;
1247
1248	// Memory usage of the filter encoders
1249	uint64_t filters_memusage = lzma_raw_encoder_memusage(filters);
1250	if (filters_memusage == UINT64_MAX)
1251		return UINT64_MAX;
1252
1253	filters_memusage *= options->threads;
1254
1255	// Memory usage of the output queue
1256	const uint64_t outq_memusage = lzma_outq_memusage(
1257			outbuf_size_max, options->threads);
1258	if (outq_memusage == UINT64_MAX)
1259		return UINT64_MAX;
1260
1261	// Sum them with overflow checking.
1262	uint64_t total_memusage = LZMA_MEMUSAGE_BASE
1263			+ sizeof(lzma_stream_coder)
1264			+ options->threads * sizeof(worker_thread);
1265
1266	if (UINT64_MAX - total_memusage < inbuf_memusage)
1267		return UINT64_MAX;
1268
1269	total_memusage += inbuf_memusage;
1270
1271	if (UINT64_MAX - total_memusage < filters_memusage)
1272		return UINT64_MAX;
1273
1274	total_memusage += filters_memusage;
1275
1276	if (UINT64_MAX - total_memusage < outq_memusage)
1277		return UINT64_MAX;
1278
1279	return total_memusage + outq_memusage;
1280}
1281