1///////////////////////////////////////////////////////////////////////////////
2//
3/// \file       stream_encoder_mt.c
4/// \brief      Multithreaded .xz Stream encoder
5//
6//  Author:     Lasse Collin
7//
8//  This file has been put into the public domain.
9//  You can do whatever you want with this file.
10//
11///////////////////////////////////////////////////////////////////////////////
12
13#include "filter_encoder.h"
14#include "easy_preset.h"
15#include "block_encoder.h"
16#include "block_buffer_encoder.h"
17#include "index_encoder.h"
18#include "outqueue.h"
19
20
/// Largest Block size accepted from the application. Keeping it at or
/// below UINT64_MAX / LZMA_THREADS_MAX means that multiplying a valid
/// Block size by a valid thread count cannot overflow a uint64_t.
#define BLOCK_SIZE_MAX (UINT64_MAX / LZMA_THREADS_MAX)
24
25
/// State of a worker thread.
///
/// The order of the enumerators matters: the code in this file compares
/// them numerically (state <= THR_FINISH means "encode",
/// state >= THR_STOP means "abandon the current Block").
typedef enum {
	/// The thread has nothing to do and is waiting for work.
	THR_IDLE,

	/// A Block is being encoded and more input may still arrive.
	THR_RUN,

	/// A Block is being encoded and no more input will be
	/// provided for it.
	THR_FINISH,

	/// The main thread wants the thread to abandon whatever it is
	/// doing but to stay alive.
	THR_STOP,

	/// The main thread wants the thread to exit. Thread cancellation
	/// could be used instead, but since the threads are told to stop
	/// anyway, a dedicated state is the lazier solution.
	THR_EXIT,

} worker_state;
46
47typedef struct lzma_stream_coder_s lzma_stream_coder;
48
49typedef struct worker_thread_s worker_thread;
50struct worker_thread_s {
51	worker_state state;
52
53	/// Input buffer of coder->block_size bytes. The main thread will
54	/// put new input into this and update in_size accordingly. Once
55	/// no more input is coming, state will be set to THR_FINISH.
56	uint8_t *in;
57
58	/// Amount of data available in the input buffer. This is modified
59	/// only by the main thread.
60	size_t in_size;
61
62	/// Output buffer for this thread. This is set by the main
63	/// thread every time a new Block is started with this thread
64	/// structure.
65	lzma_outbuf *outbuf;
66
67	/// Pointer to the main structure is needed when putting this
68	/// thread back to the stack of free threads.
69	lzma_stream_coder *coder;
70
71	/// The allocator is set by the main thread. Since a copy of the
72	/// pointer is kept here, the application must not change the
73	/// allocator before calling lzma_end().
74	const lzma_allocator *allocator;
75
76	/// Amount of uncompressed data that has already been compressed.
77	uint64_t progress_in;
78
79	/// Amount of compressed data that is ready.
80	uint64_t progress_out;
81
82	/// Block encoder
83	lzma_next_coder block_encoder;
84
85	/// Compression options for this Block
86	lzma_block block_options;
87
88	/// Next structure in the stack of free worker threads.
89	worker_thread *next;
90
91	mythread_mutex mutex;
92	mythread_cond cond;
93
94	/// The ID of this thread is used to join the thread
95	/// when it's not needed anymore.
96	mythread thread_id;
97};
98
99
100struct lzma_stream_coder_s {
101	enum {
102		SEQ_STREAM_HEADER,
103		SEQ_BLOCK,
104		SEQ_INDEX,
105		SEQ_STREAM_FOOTER,
106	} sequence;
107
108	/// Start a new Block every block_size bytes of input unless
109	/// LZMA_FULL_FLUSH or LZMA_FULL_BARRIER is used earlier.
110	size_t block_size;
111
112	/// The filter chain currently in use
113	lzma_filter filters[LZMA_FILTERS_MAX + 1];
114
115
116	/// Index to hold sizes of the Blocks
117	lzma_index *index;
118
119	/// Index encoder
120	lzma_next_coder index_encoder;
121
122
123	/// Stream Flags for encoding the Stream Header and Stream Footer.
124	lzma_stream_flags stream_flags;
125
126	/// Buffer to hold Stream Header and Stream Footer.
127	uint8_t header[LZMA_STREAM_HEADER_SIZE];
128
129	/// Read position in header[]
130	size_t header_pos;
131
132
133	/// Output buffer queue for compressed data
134	lzma_outq outq;
135
136
137	/// Maximum wait time if cannot use all the input and cannot
138	/// fill the output buffer. This is in milliseconds.
139	uint32_t timeout;
140
141
142	/// Error code from a worker thread
143	lzma_ret thread_error;
144
145	/// Array of allocated thread-specific structures
146	worker_thread *threads;
147
148	/// Number of structures in "threads" above. This is also the
149	/// number of threads that will be created at maximum.
150	uint32_t threads_max;
151
152	/// Number of thread structures that have been initialized, and
153	/// thus the number of worker threads actually created so far.
154	uint32_t threads_initialized;
155
156	/// Stack of free threads. When a thread finishes, it puts itself
157	/// back into this stack. This starts as empty because threads
158	/// are created only when actually needed.
159	worker_thread *threads_free;
160
161	/// The most recent worker thread to which the main thread writes
162	/// the new input from the application.
163	worker_thread *thr;
164
165
166	/// Amount of uncompressed data in Blocks that have already
167	/// been finished.
168	uint64_t progress_in;
169
170	/// Amount of compressed data in Stream Header + Blocks that
171	/// have already been finished.
172	uint64_t progress_out;
173
174
175	mythread_mutex mutex;
176	mythread_cond cond;
177};
178
179
180/// Tell the main thread that something has gone wrong.
181static void
182worker_error(worker_thread *thr, lzma_ret ret)
183{
184	assert(ret != LZMA_OK);
185	assert(ret != LZMA_STREAM_END);
186
187	mythread_sync(thr->coder->mutex) {
188		if (thr->coder->thread_error == LZMA_OK)
189			thr->coder->thread_error = ret;
190
191		mythread_cond_signal(&thr->coder->cond);
192	}
193
194	return;
195}
196
197
198static worker_state
199worker_encode(worker_thread *thr, worker_state state)
200{
201	assert(thr->progress_in == 0);
202	assert(thr->progress_out == 0);
203
204	// Set the Block options.
205	thr->block_options = (lzma_block){
206		.version = 0,
207		.check = thr->coder->stream_flags.check,
208		.compressed_size = thr->coder->outq.buf_size_max,
209		.uncompressed_size = thr->coder->block_size,
210
211		// TODO: To allow changing the filter chain, the filters
212		// array must be copied to each worker_thread.
213		.filters = thr->coder->filters,
214	};
215
216	// Calculate maximum size of the Block Header. This amount is
217	// reserved in the beginning of the buffer so that Block Header
218	// along with Compressed Size and Uncompressed Size can be
219	// written there.
220	lzma_ret ret = lzma_block_header_size(&thr->block_options);
221	if (ret != LZMA_OK) {
222		worker_error(thr, ret);
223		return THR_STOP;
224	}
225
226	// Initialize the Block encoder.
227	ret = lzma_block_encoder_init(&thr->block_encoder,
228			thr->allocator, &thr->block_options);
229	if (ret != LZMA_OK) {
230		worker_error(thr, ret);
231		return THR_STOP;
232	}
233
234	size_t in_pos = 0;
235	size_t in_size = 0;
236
237	thr->outbuf->size = thr->block_options.header_size;
238	const size_t out_size = thr->coder->outq.buf_size_max;
239
240	do {
241		mythread_sync(thr->mutex) {
242			// Store in_pos and out_pos into *thr so that
243			// an application may read them via
244			// lzma_get_progress() to get progress information.
245			//
246			// NOTE: These aren't updated when the encoding
247			// finishes. Instead, the final values are taken
248			// later from thr->outbuf.
249			thr->progress_in = in_pos;
250			thr->progress_out = thr->outbuf->size;
251
252			while (in_size == thr->in_size
253					&& thr->state == THR_RUN)
254				mythread_cond_wait(&thr->cond, &thr->mutex);
255
256			state = thr->state;
257			in_size = thr->in_size;
258		}
259
260		// Return if we were asked to stop or exit.
261		if (state >= THR_STOP)
262			return state;
263
264		lzma_action action = state == THR_FINISH
265				? LZMA_FINISH : LZMA_RUN;
266
267		// Limit the amount of input given to the Block encoder
268		// at once. This way this thread can react fairly quickly
269		// if the main thread wants us to stop or exit.
270		static const size_t in_chunk_max = 16384;
271		size_t in_limit = in_size;
272		if (in_size - in_pos > in_chunk_max) {
273			in_limit = in_pos + in_chunk_max;
274			action = LZMA_RUN;
275		}
276
277		ret = thr->block_encoder.code(
278				thr->block_encoder.coder, thr->allocator,
279				thr->in, &in_pos, in_limit, thr->outbuf->buf,
280				&thr->outbuf->size, out_size, action);
281	} while (ret == LZMA_OK && thr->outbuf->size < out_size);
282
283	switch (ret) {
284	case LZMA_STREAM_END:
285		assert(state == THR_FINISH);
286
287		// Encode the Block Header. By doing it after
288		// the compression, we can store the Compressed Size
289		// and Uncompressed Size fields.
290		ret = lzma_block_header_encode(&thr->block_options,
291				thr->outbuf->buf);
292		if (ret != LZMA_OK) {
293			worker_error(thr, ret);
294			return THR_STOP;
295		}
296
297		break;
298
299	case LZMA_OK:
300		// The data was incompressible. Encode it using uncompressed
301		// LZMA2 chunks.
302		//
303		// First wait that we have gotten all the input.
304		mythread_sync(thr->mutex) {
305			while (thr->state == THR_RUN)
306				mythread_cond_wait(&thr->cond, &thr->mutex);
307
308			state = thr->state;
309			in_size = thr->in_size;
310		}
311
312		if (state >= THR_STOP)
313			return state;
314
315		// Do the encoding. This takes care of the Block Header too.
316		thr->outbuf->size = 0;
317		ret = lzma_block_uncomp_encode(&thr->block_options,
318				thr->in, in_size, thr->outbuf->buf,
319				&thr->outbuf->size, out_size);
320
321		// It shouldn't fail.
322		if (ret != LZMA_OK) {
323			worker_error(thr, LZMA_PROG_ERROR);
324			return THR_STOP;
325		}
326
327		break;
328
329	default:
330		worker_error(thr, ret);
331		return THR_STOP;
332	}
333
334	// Set the size information that will be read by the main thread
335	// to write the Index field.
336	thr->outbuf->unpadded_size
337			= lzma_block_unpadded_size(&thr->block_options);
338	assert(thr->outbuf->unpadded_size != 0);
339	thr->outbuf->uncompressed_size = thr->block_options.uncompressed_size;
340
341	return THR_FINISH;
342}
343
344
345static MYTHREAD_RET_TYPE
346worker_start(void *thr_ptr)
347{
348	worker_thread *thr = thr_ptr;
349	worker_state state = THR_IDLE; // Init to silence a warning
350
351	while (true) {
352		// Wait for work.
353		mythread_sync(thr->mutex) {
354			while (true) {
355				// The thread is already idle so if we are
356				// requested to stop, just set the state.
357				if (thr->state == THR_STOP) {
358					thr->state = THR_IDLE;
359					mythread_cond_signal(&thr->cond);
360				}
361
362				state = thr->state;
363				if (state != THR_IDLE)
364					break;
365
366				mythread_cond_wait(&thr->cond, &thr->mutex);
367			}
368		}
369
370		assert(state != THR_IDLE);
371		assert(state != THR_STOP);
372
373		if (state <= THR_FINISH)
374			state = worker_encode(thr, state);
375
376		if (state == THR_EXIT)
377			break;
378
379		// Mark the thread as idle unless the main thread has
380		// told us to exit. Signal is needed for the case
381		// where the main thread is waiting for the threads to stop.
382		mythread_sync(thr->mutex) {
383			if (thr->state != THR_EXIT) {
384				thr->state = THR_IDLE;
385				mythread_cond_signal(&thr->cond);
386			}
387		}
388
389		mythread_sync(thr->coder->mutex) {
390			// Mark the output buffer as finished if
391			// no errors occurred.
392			thr->outbuf->finished = state == THR_FINISH;
393
394			// Update the main progress info.
395			thr->coder->progress_in
396					+= thr->outbuf->uncompressed_size;
397			thr->coder->progress_out += thr->outbuf->size;
398			thr->progress_in = 0;
399			thr->progress_out = 0;
400
401			// Return this thread to the stack of free threads.
402			thr->next = thr->coder->threads_free;
403			thr->coder->threads_free = thr;
404
405			mythread_cond_signal(&thr->coder->cond);
406		}
407	}
408
409	// Exiting, free the resources.
410	mythread_mutex_destroy(&thr->mutex);
411	mythread_cond_destroy(&thr->cond);
412
413	lzma_next_end(&thr->block_encoder, thr->allocator);
414	lzma_free(thr->in, thr->allocator);
415	return MYTHREAD_RET_VALUE;
416}
417
418
419/// Make the threads stop but not exit. Optionally wait for them to stop.
420static void
421threads_stop(lzma_stream_coder *coder, bool wait_for_threads)
422{
423	// Tell the threads to stop.
424	for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
425		mythread_sync(coder->threads[i].mutex) {
426			coder->threads[i].state = THR_STOP;
427			mythread_cond_signal(&coder->threads[i].cond);
428		}
429	}
430
431	if (!wait_for_threads)
432		return;
433
434	// Wait for the threads to settle in the idle state.
435	for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
436		mythread_sync(coder->threads[i].mutex) {
437			while (coder->threads[i].state != THR_IDLE)
438				mythread_cond_wait(&coder->threads[i].cond,
439						&coder->threads[i].mutex);
440		}
441	}
442
443	return;
444}
445
446
447/// Stop the threads and free the resources associated with them.
448/// Wait until the threads have exited.
449static void
450threads_end(lzma_stream_coder *coder, const lzma_allocator *allocator)
451{
452	for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
453		mythread_sync(coder->threads[i].mutex) {
454			coder->threads[i].state = THR_EXIT;
455			mythread_cond_signal(&coder->threads[i].cond);
456		}
457	}
458
459	for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
460		int ret = mythread_join(coder->threads[i].thread_id);
461		assert(ret == 0);
462		(void)ret;
463	}
464
465	lzma_free(coder->threads, allocator);
466	return;
467}
468
469
470/// Initialize a new worker_thread structure and create a new thread.
471static lzma_ret
472initialize_new_thread(lzma_stream_coder *coder,
473		const lzma_allocator *allocator)
474{
475	worker_thread *thr = &coder->threads[coder->threads_initialized];
476
477	thr->in = lzma_alloc(coder->block_size, allocator);
478	if (thr->in == NULL)
479		return LZMA_MEM_ERROR;
480
481	if (mythread_mutex_init(&thr->mutex))
482		goto error_mutex;
483
484	if (mythread_cond_init(&thr->cond))
485		goto error_cond;
486
487	thr->state = THR_IDLE;
488	thr->allocator = allocator;
489	thr->coder = coder;
490	thr->progress_in = 0;
491	thr->progress_out = 0;
492	thr->block_encoder = LZMA_NEXT_CODER_INIT;
493
494	if (mythread_create(&thr->thread_id, &worker_start, thr))
495		goto error_thread;
496
497	++coder->threads_initialized;
498	coder->thr = thr;
499
500	return LZMA_OK;
501
502error_thread:
503	mythread_cond_destroy(&thr->cond);
504
505error_cond:
506	mythread_mutex_destroy(&thr->mutex);
507
508error_mutex:
509	lzma_free(thr->in, allocator);
510	return LZMA_MEM_ERROR;
511}
512
513
514static lzma_ret
515get_thread(lzma_stream_coder *coder, const lzma_allocator *allocator)
516{
517	// If there are no free output subqueues, there is no
518	// point to try getting a thread.
519	if (!lzma_outq_has_buf(&coder->outq))
520		return LZMA_OK;
521
522	// If there is a free structure on the stack, use it.
523	mythread_sync(coder->mutex) {
524		if (coder->threads_free != NULL) {
525			coder->thr = coder->threads_free;
526			coder->threads_free = coder->threads_free->next;
527		}
528	}
529
530	if (coder->thr == NULL) {
531		// If there are no uninitialized structures left, return.
532		if (coder->threads_initialized == coder->threads_max)
533			return LZMA_OK;
534
535		// Initialize a new thread.
536		return_if_error(initialize_new_thread(coder, allocator));
537	}
538
539	// Reset the parts of the thread state that have to be done
540	// in the main thread.
541	mythread_sync(coder->thr->mutex) {
542		coder->thr->state = THR_RUN;
543		coder->thr->in_size = 0;
544		coder->thr->outbuf = lzma_outq_get_buf(&coder->outq);
545		mythread_cond_signal(&coder->thr->cond);
546	}
547
548	return LZMA_OK;
549}
550
551
552static lzma_ret
553stream_encode_in(lzma_stream_coder *coder, const lzma_allocator *allocator,
554		const uint8_t *restrict in, size_t *restrict in_pos,
555		size_t in_size, lzma_action action)
556{
557	while (*in_pos < in_size
558			|| (coder->thr != NULL && action != LZMA_RUN)) {
559		if (coder->thr == NULL) {
560			// Get a new thread.
561			const lzma_ret ret = get_thread(coder, allocator);
562			if (coder->thr == NULL)
563				return ret;
564		}
565
566		// Copy the input data to thread's buffer.
567		size_t thr_in_size = coder->thr->in_size;
568		lzma_bufcpy(in, in_pos, in_size, coder->thr->in,
569				&thr_in_size, coder->block_size);
570
571		// Tell the Block encoder to finish if
572		//  - it has got block_size bytes of input; or
573		//  - all input was used and LZMA_FINISH, LZMA_FULL_FLUSH,
574		//    or LZMA_FULL_BARRIER was used.
575		//
576		// TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER.
577		const bool finish = thr_in_size == coder->block_size
578				|| (*in_pos == in_size && action != LZMA_RUN);
579
580		bool block_error = false;
581
582		mythread_sync(coder->thr->mutex) {
583			if (coder->thr->state == THR_IDLE) {
584				// Something has gone wrong with the Block
585				// encoder. It has set coder->thread_error
586				// which we will read a few lines later.
587				block_error = true;
588			} else {
589				// Tell the Block encoder its new amount
590				// of input and update the state if needed.
591				coder->thr->in_size = thr_in_size;
592
593				if (finish)
594					coder->thr->state = THR_FINISH;
595
596				mythread_cond_signal(&coder->thr->cond);
597			}
598		}
599
600		if (block_error) {
601			lzma_ret ret;
602
603			mythread_sync(coder->mutex) {
604				ret = coder->thread_error;
605			}
606
607			return ret;
608		}
609
610		if (finish)
611			coder->thr = NULL;
612	}
613
614	return LZMA_OK;
615}
616
617
618/// Wait until more input can be consumed, more output can be read, or
619/// an optional timeout is reached.
620static bool
621wait_for_work(lzma_stream_coder *coder, mythread_condtime *wait_abs,
622		bool *has_blocked, bool has_input)
623{
624	if (coder->timeout != 0 && !*has_blocked) {
625		// Every time when stream_encode_mt() is called via
626		// lzma_code(), *has_blocked starts as false. We set it
627		// to true here and calculate the absolute time when
628		// we must return if there's nothing to do.
629		//
630		// The idea of *has_blocked is to avoid unneeded calls
631		// to mythread_condtime_set(), which may do a syscall
632		// depending on the operating system.
633		*has_blocked = true;
634		mythread_condtime_set(wait_abs, &coder->cond, coder->timeout);
635	}
636
637	bool timed_out = false;
638
639	mythread_sync(coder->mutex) {
640		// There are four things that we wait. If one of them
641		// becomes possible, we return.
642		//  - If there is input left, we need to get a free
643		//    worker thread and an output buffer for it.
644		//  - Data ready to be read from the output queue.
645		//  - A worker thread indicates an error.
646		//  - Time out occurs.
647		while ((!has_input || coder->threads_free == NULL
648					|| !lzma_outq_has_buf(&coder->outq))
649				&& !lzma_outq_is_readable(&coder->outq)
650				&& coder->thread_error == LZMA_OK
651				&& !timed_out) {
652			if (coder->timeout != 0)
653				timed_out = mythread_cond_timedwait(
654						&coder->cond, &coder->mutex,
655						wait_abs) != 0;
656			else
657				mythread_cond_wait(&coder->cond,
658						&coder->mutex);
659		}
660	}
661
662	return timed_out;
663}
664
665
666static lzma_ret
667stream_encode_mt(void *coder_ptr, const lzma_allocator *allocator,
668		const uint8_t *restrict in, size_t *restrict in_pos,
669		size_t in_size, uint8_t *restrict out,
670		size_t *restrict out_pos, size_t out_size, lzma_action action)
671{
672	lzma_stream_coder *coder = coder_ptr;
673
674	switch (coder->sequence) {
675	case SEQ_STREAM_HEADER:
676		lzma_bufcpy(coder->header, &coder->header_pos,
677				sizeof(coder->header),
678				out, out_pos, out_size);
679		if (coder->header_pos < sizeof(coder->header))
680			return LZMA_OK;
681
682		coder->header_pos = 0;
683		coder->sequence = SEQ_BLOCK;
684
685	// Fall through
686
687	case SEQ_BLOCK: {
688		// Initialized to silence warnings.
689		lzma_vli unpadded_size = 0;
690		lzma_vli uncompressed_size = 0;
691		lzma_ret ret = LZMA_OK;
692
693		// These are for wait_for_work().
694		bool has_blocked = false;
695		mythread_condtime wait_abs;
696
697		while (true) {
698			mythread_sync(coder->mutex) {
699				// Check for Block encoder errors.
700				ret = coder->thread_error;
701				if (ret != LZMA_OK) {
702					assert(ret != LZMA_STREAM_END);
703					break;
704				}
705
706				// Try to read compressed data to out[].
707				ret = lzma_outq_read(&coder->outq,
708						out, out_pos, out_size,
709						&unpadded_size,
710						&uncompressed_size);
711			}
712
713			if (ret == LZMA_STREAM_END) {
714				// End of Block. Add it to the Index.
715				ret = lzma_index_append(coder->index,
716						allocator, unpadded_size,
717						uncompressed_size);
718
719				// If we didn't fill the output buffer yet,
720				// try to read more data. Maybe the next
721				// outbuf has been finished already too.
722				if (*out_pos < out_size)
723					continue;
724			}
725
726			if (ret != LZMA_OK) {
727				// coder->thread_error was set or
728				// lzma_index_append() failed.
729				threads_stop(coder, false);
730				return ret;
731			}
732
733			// Try to give uncompressed data to a worker thread.
734			ret = stream_encode_in(coder, allocator,
735					in, in_pos, in_size, action);
736			if (ret != LZMA_OK) {
737				threads_stop(coder, false);
738				return ret;
739			}
740
741			// See if we should wait or return.
742			//
743			// TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER.
744			if (*in_pos == in_size) {
745				// LZMA_RUN: More data is probably coming
746				// so return to let the caller fill the
747				// input buffer.
748				if (action == LZMA_RUN)
749					return LZMA_OK;
750
751				// LZMA_FULL_BARRIER: The same as with
752				// LZMA_RUN but tell the caller that the
753				// barrier was completed.
754				if (action == LZMA_FULL_BARRIER)
755					return LZMA_STREAM_END;
756
757				// Finishing or flushing isn't completed until
758				// all input data has been encoded and copied
759				// to the output buffer.
760				if (lzma_outq_is_empty(&coder->outq)) {
761					// LZMA_FINISH: Continue to encode
762					// the Index field.
763					if (action == LZMA_FINISH)
764						break;
765
766					// LZMA_FULL_FLUSH: Return to tell
767					// the caller that flushing was
768					// completed.
769					if (action == LZMA_FULL_FLUSH)
770						return LZMA_STREAM_END;
771				}
772			}
773
774			// Return if there is no output space left.
775			// This check must be done after testing the input
776			// buffer, because we might want to use a different
777			// return code.
778			if (*out_pos == out_size)
779				return LZMA_OK;
780
781			// Neither in nor out has been used completely.
782			// Wait until there's something we can do.
783			if (wait_for_work(coder, &wait_abs, &has_blocked,
784					*in_pos < in_size))
785				return LZMA_TIMED_OUT;
786		}
787
788		// All Blocks have been encoded and the threads have stopped.
789		// Prepare to encode the Index field.
790		return_if_error(lzma_index_encoder_init(
791				&coder->index_encoder, allocator,
792				coder->index));
793		coder->sequence = SEQ_INDEX;
794
795		// Update the progress info to take the Index and
796		// Stream Footer into account. Those are very fast to encode
797		// so in terms of progress information they can be thought
798		// to be ready to be copied out.
799		coder->progress_out += lzma_index_size(coder->index)
800				+ LZMA_STREAM_HEADER_SIZE;
801	}
802
803	// Fall through
804
805	case SEQ_INDEX: {
806		// Call the Index encoder. It doesn't take any input, so
807		// those pointers can be NULL.
808		const lzma_ret ret = coder->index_encoder.code(
809				coder->index_encoder.coder, allocator,
810				NULL, NULL, 0,
811				out, out_pos, out_size, LZMA_RUN);
812		if (ret != LZMA_STREAM_END)
813			return ret;
814
815		// Encode the Stream Footer into coder->buffer.
816		coder->stream_flags.backward_size
817				= lzma_index_size(coder->index);
818		if (lzma_stream_footer_encode(&coder->stream_flags,
819				coder->header) != LZMA_OK)
820			return LZMA_PROG_ERROR;
821
822		coder->sequence = SEQ_STREAM_FOOTER;
823	}
824
825	// Fall through
826
827	case SEQ_STREAM_FOOTER:
828		lzma_bufcpy(coder->header, &coder->header_pos,
829				sizeof(coder->header),
830				out, out_pos, out_size);
831		return coder->header_pos < sizeof(coder->header)
832				? LZMA_OK : LZMA_STREAM_END;
833	}
834
835	assert(0);
836	return LZMA_PROG_ERROR;
837}
838
839
840static void
841stream_encoder_mt_end(void *coder_ptr, const lzma_allocator *allocator)
842{
843	lzma_stream_coder *coder = coder_ptr;
844
845	// Threads must be killed before the output queue can be freed.
846	threads_end(coder, allocator);
847	lzma_outq_end(&coder->outq, allocator);
848
849	for (size_t i = 0; coder->filters[i].id != LZMA_VLI_UNKNOWN; ++i)
850		lzma_free(coder->filters[i].options, allocator);
851
852	lzma_next_end(&coder->index_encoder, allocator);
853	lzma_index_end(coder->index, allocator);
854
855	mythread_cond_destroy(&coder->cond);
856	mythread_mutex_destroy(&coder->mutex);
857
858	lzma_free(coder, allocator);
859	return;
860}
861
862
863/// Options handling for lzma_stream_encoder_mt_init() and
864/// lzma_stream_encoder_mt_memusage()
865static lzma_ret
866get_options(const lzma_mt *options, lzma_options_easy *opt_easy,
867		const lzma_filter **filters, uint64_t *block_size,
868		uint64_t *outbuf_size_max)
869{
870	// Validate some of the options.
871	if (options == NULL)
872		return LZMA_PROG_ERROR;
873
874	if (options->flags != 0 || options->threads == 0
875			|| options->threads > LZMA_THREADS_MAX)
876		return LZMA_OPTIONS_ERROR;
877
878	if (options->filters != NULL) {
879		// Filter chain was given, use it as is.
880		*filters = options->filters;
881	} else {
882		// Use a preset.
883		if (lzma_easy_preset(opt_easy, options->preset))
884			return LZMA_OPTIONS_ERROR;
885
886		*filters = opt_easy->filters;
887	}
888
889	// Block size
890	if (options->block_size > 0) {
891		if (options->block_size > BLOCK_SIZE_MAX)
892			return LZMA_OPTIONS_ERROR;
893
894		*block_size = options->block_size;
895	} else {
896		// Determine the Block size from the filter chain.
897		*block_size = lzma_mt_block_size(*filters);
898		if (*block_size == 0)
899			return LZMA_OPTIONS_ERROR;
900
901		assert(*block_size <= BLOCK_SIZE_MAX);
902	}
903
904	// Calculate the maximum amount output that a single output buffer
905	// may need to hold. This is the same as the maximum total size of
906	// a Block.
907	*outbuf_size_max = lzma_block_buffer_bound64(*block_size);
908	if (*outbuf_size_max == 0)
909		return LZMA_MEM_ERROR;
910
911	return LZMA_OK;
912}
913
914
915static void
916get_progress(void *coder_ptr, uint64_t *progress_in, uint64_t *progress_out)
917{
918	lzma_stream_coder *coder = coder_ptr;
919
920	// Lock coder->mutex to prevent finishing threads from moving their
921	// progress info from the worker_thread structure to lzma_stream_coder.
922	mythread_sync(coder->mutex) {
923		*progress_in = coder->progress_in;
924		*progress_out = coder->progress_out;
925
926		for (size_t i = 0; i < coder->threads_initialized; ++i) {
927			mythread_sync(coder->threads[i].mutex) {
928				*progress_in += coder->threads[i].progress_in;
929				*progress_out += coder->threads[i]
930						.progress_out;
931			}
932		}
933	}
934
935	return;
936}
937
938
939static lzma_ret
940stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
941		const lzma_mt *options)
942{
943	lzma_next_coder_init(&stream_encoder_mt_init, next, allocator);
944
945	// Get the filter chain.
946	lzma_options_easy easy;
947	const lzma_filter *filters;
948	uint64_t block_size;
949	uint64_t outbuf_size_max;
950	return_if_error(get_options(options, &easy, &filters,
951			&block_size, &outbuf_size_max));
952
953#if SIZE_MAX < UINT64_MAX
954	if (block_size > SIZE_MAX)
955		return LZMA_MEM_ERROR;
956#endif
957
958	// Validate the filter chain so that we can give an error in this
959	// function instead of delaying it to the first call to lzma_code().
960	// The memory usage calculation verifies the filter chain as
961	// a side effect so we take advatange of that.
962	if (lzma_raw_encoder_memusage(filters) == UINT64_MAX)
963		return LZMA_OPTIONS_ERROR;
964
965	// Validate the Check ID.
966	if ((unsigned int)(options->check) > LZMA_CHECK_ID_MAX)
967		return LZMA_PROG_ERROR;
968
969	if (!lzma_check_is_supported(options->check))
970		return LZMA_UNSUPPORTED_CHECK;
971
972	// Allocate and initialize the base structure if needed.
973	lzma_stream_coder *coder = next->coder;
974	if (coder == NULL) {
975		coder = lzma_alloc(sizeof(lzma_stream_coder), allocator);
976		if (coder == NULL)
977			return LZMA_MEM_ERROR;
978
979		next->coder = coder;
980
981		// For the mutex and condition variable initializations
982		// the error handling has to be done here because
983		// stream_encoder_mt_end() doesn't know if they have
984		// already been initialized or not.
985		if (mythread_mutex_init(&coder->mutex)) {
986			lzma_free(coder, allocator);
987			next->coder = NULL;
988			return LZMA_MEM_ERROR;
989		}
990
991		if (mythread_cond_init(&coder->cond)) {
992			mythread_mutex_destroy(&coder->mutex);
993			lzma_free(coder, allocator);
994			next->coder = NULL;
995			return LZMA_MEM_ERROR;
996		}
997
998		next->code = &stream_encode_mt;
999		next->end = &stream_encoder_mt_end;
1000		next->get_progress = &get_progress;
1001// 		next->update = &stream_encoder_mt_update;
1002
1003		coder->filters[0].id = LZMA_VLI_UNKNOWN;
1004		coder->index_encoder = LZMA_NEXT_CODER_INIT;
1005		coder->index = NULL;
1006		memzero(&coder->outq, sizeof(coder->outq));
1007		coder->threads = NULL;
1008		coder->threads_max = 0;
1009		coder->threads_initialized = 0;
1010	}
1011
1012	// Basic initializations
1013	coder->sequence = SEQ_STREAM_HEADER;
1014	coder->block_size = (size_t)(block_size);
1015	coder->thread_error = LZMA_OK;
1016	coder->thr = NULL;
1017
1018	// Allocate the thread-specific base structures.
1019	assert(options->threads > 0);
1020	if (coder->threads_max != options->threads) {
1021		threads_end(coder, allocator);
1022
1023		coder->threads = NULL;
1024		coder->threads_max = 0;
1025
1026		coder->threads_initialized = 0;
1027		coder->threads_free = NULL;
1028
1029		coder->threads = lzma_alloc(
1030				options->threads * sizeof(worker_thread),
1031				allocator);
1032		if (coder->threads == NULL)
1033			return LZMA_MEM_ERROR;
1034
1035		coder->threads_max = options->threads;
1036	} else {
1037		// Reuse the old structures and threads. Tell the running
1038		// threads to stop and wait until they have stopped.
1039		threads_stop(coder, true);
1040	}
1041
1042	// Output queue
1043	return_if_error(lzma_outq_init(&coder->outq, allocator,
1044			outbuf_size_max, options->threads));
1045
1046	// Timeout
1047	coder->timeout = options->timeout;
1048
1049	// Free the old filter chain and copy the new one.
1050	for (size_t i = 0; coder->filters[i].id != LZMA_VLI_UNKNOWN; ++i)
1051		lzma_free(coder->filters[i].options, allocator);
1052
1053	return_if_error(lzma_filters_copy(
1054			filters, coder->filters, allocator));
1055
1056	// Index
1057	lzma_index_end(coder->index, allocator);
1058	coder->index = lzma_index_init(allocator);
1059	if (coder->index == NULL)
1060		return LZMA_MEM_ERROR;
1061
1062	// Stream Header
1063	coder->stream_flags.version = 0;
1064	coder->stream_flags.check = options->check;
1065	return_if_error(lzma_stream_header_encode(
1066			&coder->stream_flags, coder->header));
1067
1068	coder->header_pos = 0;
1069
1070	// Progress info
1071	coder->progress_in = 0;
1072	coder->progress_out = LZMA_STREAM_HEADER_SIZE;
1073
1074	return LZMA_OK;
1075}
1076
1077
1078extern LZMA_API(lzma_ret)
1079lzma_stream_encoder_mt(lzma_stream *strm, const lzma_mt *options)
1080{
1081	lzma_next_strm_init(stream_encoder_mt_init, strm, options);
1082
1083	strm->internal->supported_actions[LZMA_RUN] = true;
1084// 	strm->internal->supported_actions[LZMA_SYNC_FLUSH] = true;
1085	strm->internal->supported_actions[LZMA_FULL_FLUSH] = true;
1086	strm->internal->supported_actions[LZMA_FULL_BARRIER] = true;
1087	strm->internal->supported_actions[LZMA_FINISH] = true;
1088
1089	return LZMA_OK;
1090}
1091
1092
1093// This function name is a monster but it's consistent with the older
1094// monster names. :-( 31 chars is the max that C99 requires so in that
1095// sense it's not too long. ;-)
1096extern LZMA_API(uint64_t)
1097lzma_stream_encoder_mt_memusage(const lzma_mt *options)
1098{
1099	lzma_options_easy easy;
1100	const lzma_filter *filters;
1101	uint64_t block_size;
1102	uint64_t outbuf_size_max;
1103
1104	if (get_options(options, &easy, &filters, &block_size,
1105			&outbuf_size_max) != LZMA_OK)
1106		return UINT64_MAX;
1107
1108	// Memory usage of the input buffers
1109	const uint64_t inbuf_memusage = options->threads * block_size;
1110
1111	// Memory usage of the filter encoders
1112	uint64_t filters_memusage = lzma_raw_encoder_memusage(filters);
1113	if (filters_memusage == UINT64_MAX)
1114		return UINT64_MAX;
1115
1116	filters_memusage *= options->threads;
1117
1118	// Memory usage of the output queue
1119	const uint64_t outq_memusage = lzma_outq_memusage(
1120			outbuf_size_max, options->threads);
1121	if (outq_memusage == UINT64_MAX)
1122		return UINT64_MAX;
1123
1124	// Sum them with overflow checking.
1125	uint64_t total_memusage = LZMA_MEMUSAGE_BASE
1126			+ sizeof(lzma_stream_coder)
1127			+ options->threads * sizeof(worker_thread);
1128
1129	if (UINT64_MAX - total_memusage < inbuf_memusage)
1130		return UINT64_MAX;
1131
1132	total_memusage += inbuf_memusage;
1133
1134	if (UINT64_MAX - total_memusage < filters_memusage)
1135		return UINT64_MAX;
1136
1137	total_memusage += filters_memusage;
1138
1139	if (UINT64_MAX - total_memusage < outq_memusage)
1140		return UINT64_MAX;
1141
1142	return total_memusage + outq_memusage;
1143}
1144