stream_encoder_mt.c revision 292588
///////////////////////////////////////////////////////////////////////////////
//
/// \file       stream_encoder_mt.c
/// \brief      Multithreaded .xz Stream encoder
//
//  Author:     Lasse Collin
//
//  This file has been put into the public domain.
//  You can do whatever you want with this file.
//
///////////////////////////////////////////////////////////////////////////////

#include "filter_encoder.h"
#include "easy_preset.h"
#include "block_encoder.h"
#include "block_buffer_encoder.h"
#include "index_encoder.h"
#include "outqueue.h"


/// Maximum supported block size. This makes it simpler to prevent integer
/// overflows if we are given an unusually large block size.
#define BLOCK_SIZE_MAX (UINT64_MAX / LZMA_THREADS_MAX)
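// (Since get_options() also rejects options->threads > LZMA_THREADS_MAX,
// this limit guarantees that products like "options->threads * block_size",
// e.g. in lzma_stream_encoder_mt_memusage(), cannot overflow a uint64_t.)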


typedef enum {
	/// Waiting for work.
	THR_IDLE,

	/// Encoding is in progress.
	THR_RUN,

	/// Encoding is in progress but no more input data will
	/// be read.
	THR_FINISH,

	/// The main thread wants the thread to stop whatever it was doing
	/// but not exit.
	THR_STOP,

	/// The main thread wants the thread to exit. We could use
	/// cancellation, but since a stop mechanism exists anyway, this is lazier.
	THR_EXIT,

} worker_state;
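// NOTE: The relative order of the worker_state values matters:
// worker_start() tests "state <= THR_FINISH" and worker_encode() tests
// "state >= THR_STOP", so the values must remain declared in the order
// given above.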


typedef struct worker_thread_s worker_thread;
struct worker_thread_s {
	worker_state state;

	/// Input buffer of coder->block_size bytes. The main thread will
	/// put new input into this and update in_size accordingly. Once
	/// no more input is coming, state will be set to THR_FINISH.
	uint8_t *in;

	/// Amount of data available in the input buffer. This is modified
	/// only by the main thread.
	size_t in_size;

	/// Output buffer for this thread. This is set by the main
	/// thread every time a new Block is started with this thread
	/// structure.
	lzma_outbuf *outbuf;

	/// Pointer to the main structure is needed when putting this
	/// thread back to the stack of free threads.
	lzma_coder *coder;

	/// The allocator is set by the main thread. Since a copy of the
	/// pointer is kept here, the application must not change the
	/// allocator before calling lzma_end().
	const lzma_allocator *allocator;

	/// Amount of uncompressed data that has already been compressed.
	uint64_t progress_in;

	/// Amount of compressed data that is ready.
	uint64_t progress_out;

	/// Block encoder
	lzma_next_coder block_encoder;

	/// Compression options for this Block
	lzma_block block_options;

	/// Next structure in the stack of free worker threads.
	worker_thread *next;

	mythread_mutex mutex;
	mythread_cond cond;

	/// The ID of this thread is used to join the thread
	/// when it's not needed anymore.
	mythread thread_id;
};


struct lzma_coder_s {
	enum {
		SEQ_STREAM_HEADER,
		SEQ_BLOCK,
		SEQ_INDEX,
		SEQ_STREAM_FOOTER,
	} sequence;

	/// Start a new Block every block_size bytes of input unless
	/// LZMA_FULL_FLUSH or LZMA_FULL_BARRIER is used earlier.
	size_t block_size;

	/// The filter chain currently in use
	lzma_filter filters[LZMA_FILTERS_MAX + 1];


	/// Index to hold sizes of the Blocks
	lzma_index *index;

	/// Index encoder
	lzma_next_coder index_encoder;


	/// Stream Flags for encoding the Stream Header and Stream Footer.
	lzma_stream_flags stream_flags;

	/// Buffer to hold Stream Header and Stream Footer.
	uint8_t header[LZMA_STREAM_HEADER_SIZE];

	/// Read position in header[]
	size_t header_pos;


	/// Output buffer queue for compressed data
	lzma_outq outq;


	/// Maximum wait time if we cannot use all the input and cannot
	/// fill the output buffer. This is in milliseconds.
	uint32_t timeout;


	/// Error code from a worker thread
	lzma_ret thread_error;

	/// Array of allocated thread-specific structures
	worker_thread *threads;

	/// Number of structures in "threads" above. This is also the
	/// number of threads that will be created at maximum.
	uint32_t threads_max;

	/// Number of thread structures that have been initialized, and
	/// thus the number of worker threads actually created so far.
	uint32_t threads_initialized;

	/// Stack of free threads. When a thread finishes, it puts itself
	/// back into this stack. This starts as empty because threads
	/// are created only when actually needed.
	worker_thread *threads_free;

	/// The most recent worker thread to which the main thread writes
	/// the new input from the application.
	worker_thread *thr;


	/// Amount of uncompressed data in Blocks that have already
	/// been finished.
	uint64_t progress_in;

	/// Amount of compressed data in Stream Header + Blocks that
	/// have already been finished.
	uint64_t progress_out;


	mythread_mutex mutex;
	mythread_cond cond;
};


/// Tell the main thread that something has gone wrong.
static void
worker_error(worker_thread *thr, lzma_ret ret)
{
	assert(ret != LZMA_OK);
	assert(ret != LZMA_STREAM_END);

	mythread_sync(thr->coder->mutex) {
		if (thr->coder->thread_error == LZMA_OK)
			thr->coder->thread_error = ret;

		mythread_cond_signal(&thr->coder->cond);
	}

	return;
}


static worker_state
worker_encode(worker_thread *thr, worker_state state)
{
	assert(thr->progress_in == 0);
	assert(thr->progress_out == 0);

	// Set the Block options.
	thr->block_options = (lzma_block){
		.version = 0,
		.check = thr->coder->stream_flags.check,
		.compressed_size = thr->coder->outq.buf_size_max,
		.uncompressed_size = thr->coder->block_size,

		// TODO: To allow changing the filter chain, the filters
		// array must be copied to each worker_thread.
		.filters = thr->coder->filters,
	};

	// Calculate maximum size of the Block Header. This amount is
	// reserved in the beginning of the buffer so that Block Header
	// along with Compressed Size and Uncompressed Size can be
	// written there.
	lzma_ret ret = lzma_block_header_size(&thr->block_options);
	if (ret != LZMA_OK) {
		worker_error(thr, ret);
		return THR_STOP;
	}

	// Initialize the Block encoder.
	ret = lzma_block_encoder_init(&thr->block_encoder,
			thr->allocator, &thr->block_options);
	if (ret != LZMA_OK) {
		worker_error(thr, ret);
		return THR_STOP;
	}

	size_t in_pos = 0;
	size_t in_size = 0;

	thr->outbuf->size = thr->block_options.header_size;
	const size_t out_size = thr->coder->outq.buf_size_max;

	do {
		mythread_sync(thr->mutex) {
			// Store in_pos and out_pos into *thr so that
			// an application may read them via
			// lzma_get_progress() to get progress information.
			//
			// NOTE: These aren't updated when the encoding
			// finishes. Instead, the final values are taken
			// later from thr->outbuf.
			thr->progress_in = in_pos;
			thr->progress_out = thr->outbuf->size;

			while (in_size == thr->in_size
					&& thr->state == THR_RUN)
				mythread_cond_wait(&thr->cond, &thr->mutex);

			state = thr->state;
			in_size = thr->in_size;
		}

		// Return if we were asked to stop or exit.
		if (state >= THR_STOP)
			return state;

		lzma_action action = state == THR_FINISH
				? LZMA_FINISH : LZMA_RUN;

		// Limit the amount of input given to the Block encoder
		// at once. This way this thread can react fairly quickly
		// if the main thread wants us to stop or exit.
		static const size_t in_chunk_max = 16384;
		size_t in_limit = in_size;
		if (in_size - in_pos > in_chunk_max) {
			in_limit = in_pos + in_chunk_max;
			action = LZMA_RUN;
		}

		ret = thr->block_encoder.code(
				thr->block_encoder.coder, thr->allocator,
				thr->in, &in_pos, in_limit, thr->outbuf->buf,
				&thr->outbuf->size, out_size, action);
	} while (ret == LZMA_OK && thr->outbuf->size < out_size);

	switch (ret) {
	case LZMA_STREAM_END:
		assert(state == THR_FINISH);

		// Encode the Block Header. By doing it after
		// the compression, we can store the Compressed Size
		// and Uncompressed Size fields.
		ret = lzma_block_header_encode(&thr->block_options,
				thr->outbuf->buf);
		if (ret != LZMA_OK) {
			worker_error(thr, ret);
			return THR_STOP;
		}

		break;

	case LZMA_OK:
		// The data was incompressible. Encode it using uncompressed
		// LZMA2 chunks.
		//
		// First wait until we have gotten all the input.
		mythread_sync(thr->mutex) {
			while (thr->state == THR_RUN)
				mythread_cond_wait(&thr->cond, &thr->mutex);

			state = thr->state;
			in_size = thr->in_size;
		}

		if (state >= THR_STOP)
			return state;

		// Do the encoding. This takes care of the Block Header too.
		thr->outbuf->size = 0;
		ret = lzma_block_uncomp_encode(&thr->block_options,
				thr->in, in_size, thr->outbuf->buf,
				&thr->outbuf->size, out_size);

		// It shouldn't fail.
		if (ret != LZMA_OK) {
			worker_error(thr, LZMA_PROG_ERROR);
			return THR_STOP;
		}

		break;

	default:
		worker_error(thr, ret);
		return THR_STOP;
	}

	// Set the size information that will be read by the main thread
	// to write the Index field.
	thr->outbuf->unpadded_size
			= lzma_block_unpadded_size(&thr->block_options);
	assert(thr->outbuf->unpadded_size != 0);
	thr->outbuf->uncompressed_size = thr->block_options.uncompressed_size;

	return THR_FINISH;
}


static MYTHREAD_RET_TYPE
worker_start(void *thr_ptr)
{
	worker_thread *thr = thr_ptr;
	worker_state state = THR_IDLE; // Init to silence a warning

	while (true) {
		// Wait for work.
		mythread_sync(thr->mutex) {
			while (true) {
				// The thread is already idle so if we are
				// requested to stop, just set the state.
				if (thr->state == THR_STOP) {
					thr->state = THR_IDLE;
					mythread_cond_signal(&thr->cond);
				}

				state = thr->state;
				if (state != THR_IDLE)
					break;

				mythread_cond_wait(&thr->cond, &thr->mutex);
			}
		}

		assert(state != THR_IDLE);
		assert(state != THR_STOP);

		if (state <= THR_FINISH)
			state = worker_encode(thr, state);

		if (state == THR_EXIT)
			break;

		// Mark the thread as idle unless the main thread has
		// told us to exit. Signal is needed for the case
		// where the main thread is waiting for the threads to stop.
		mythread_sync(thr->mutex) {
			if (thr->state != THR_EXIT) {
				thr->state = THR_IDLE;
				mythread_cond_signal(&thr->cond);
			}
		}

		mythread_sync(thr->coder->mutex) {
			// Mark the output buffer as finished if
			// no errors occurred.
			thr->outbuf->finished = state == THR_FINISH;

			// Update the main progress info.
			thr->coder->progress_in
					+= thr->outbuf->uncompressed_size;
			thr->coder->progress_out += thr->outbuf->size;
			thr->progress_in = 0;
			thr->progress_out = 0;

			// Return this thread to the stack of free threads.
			thr->next = thr->coder->threads_free;
			thr->coder->threads_free = thr;

			mythread_cond_signal(&thr->coder->cond);
		}
	}

	// Exiting, free the resources.
	mythread_mutex_destroy(&thr->mutex);
	mythread_cond_destroy(&thr->cond);

	lzma_next_end(&thr->block_encoder, thr->allocator);
	lzma_free(thr->in, thr->allocator);
	return MYTHREAD_RET_VALUE;
}


/// Make the threads stop but not exit. Optionally wait for them to stop.
static void
threads_stop(lzma_coder *coder, bool wait_for_threads)
{
	// Tell the threads to stop.
	for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
		mythread_sync(coder->threads[i].mutex) {
			coder->threads[i].state = THR_STOP;
			mythread_cond_signal(&coder->threads[i].cond);
		}
	}

	if (!wait_for_threads)
		return;

	// Wait for the threads to settle in the idle state.
	for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
		mythread_sync(coder->threads[i].mutex) {
			while (coder->threads[i].state != THR_IDLE)
				mythread_cond_wait(&coder->threads[i].cond,
						&coder->threads[i].mutex);
		}
	}

	return;
}


/// Stop the threads and free the resources associated with them.
/// Wait until the threads have exited.
static void
threads_end(lzma_coder *coder, const lzma_allocator *allocator)
{
	for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
		mythread_sync(coder->threads[i].mutex) {
			coder->threads[i].state = THR_EXIT;
			mythread_cond_signal(&coder->threads[i].cond);
		}
	}

	for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
		int ret = mythread_join(coder->threads[i].thread_id);
		assert(ret == 0);
		(void)ret;
	}

	lzma_free(coder->threads, allocator);
	return;
}


/// Initialize a new worker_thread structure and create a new thread.
static lzma_ret
initialize_new_thread(lzma_coder *coder, const lzma_allocator *allocator)
{
	worker_thread *thr = &coder->threads[coder->threads_initialized];

	thr->in = lzma_alloc(coder->block_size, allocator);
	if (thr->in == NULL)
		return LZMA_MEM_ERROR;

	if (mythread_mutex_init(&thr->mutex))
		goto error_mutex;

	if (mythread_cond_init(&thr->cond))
		goto error_cond;

	thr->state = THR_IDLE;
	thr->allocator = allocator;
	thr->coder = coder;
	thr->progress_in = 0;
	thr->progress_out = 0;
	thr->block_encoder = LZMA_NEXT_CODER_INIT;

	if (mythread_create(&thr->thread_id, &worker_start, thr))
		goto error_thread;

	++coder->threads_initialized;
	coder->thr = thr;

	return LZMA_OK;

error_thread:
	mythread_cond_destroy(&thr->cond);

error_cond:
	mythread_mutex_destroy(&thr->mutex);

error_mutex:
	lzma_free(thr->in, allocator);
	return LZMA_MEM_ERROR;
}


static lzma_ret
get_thread(lzma_coder *coder, const lzma_allocator *allocator)
{
	// If there are no free output subqueues, there is no
	// point in trying to get a thread.
	if (!lzma_outq_has_buf(&coder->outq))
		return LZMA_OK;

	// If there is a free structure on the stack, use it.
	mythread_sync(coder->mutex) {
		if (coder->threads_free != NULL) {
			coder->thr = coder->threads_free;
			coder->threads_free = coder->threads_free->next;
		}
	}

	if (coder->thr == NULL) {
		// If there are no uninitialized structures left, return.
		if (coder->threads_initialized == coder->threads_max)
			return LZMA_OK;

		// Initialize a new thread.
		return_if_error(initialize_new_thread(coder, allocator));
	}

	// Reset the parts of the thread state that have to be done
	// in the main thread.
	mythread_sync(coder->thr->mutex) {
		coder->thr->state = THR_RUN;
		coder->thr->in_size = 0;
		coder->thr->outbuf = lzma_outq_get_buf(&coder->outq);
		mythread_cond_signal(&coder->thr->cond);
	}

	return LZMA_OK;
}


static lzma_ret
stream_encode_in(lzma_coder *coder, const lzma_allocator *allocator,
		const uint8_t *restrict in, size_t *restrict in_pos,
		size_t in_size, lzma_action action)
{
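	// Loop until all the input has been given to a worker thread. The
	// second half of the loop condition matters when flushing or
	// finishing: even if no input is left, a thread holding a partially
	// filled Block must still be told to finish it.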
	while (*in_pos < in_size
			|| (coder->thr != NULL && action != LZMA_RUN)) {
		if (coder->thr == NULL) {
			// Get a new thread.
			const lzma_ret ret = get_thread(coder, allocator);
			if (coder->thr == NULL)
				return ret;
		}

		// Copy the input data to the thread's buffer.
		size_t thr_in_size = coder->thr->in_size;
		lzma_bufcpy(in, in_pos, in_size, coder->thr->in,
				&thr_in_size, coder->block_size);

		// Tell the Block encoder to finish if
		//  - it has got block_size bytes of input; or
		//  - all input was used and LZMA_FINISH, LZMA_FULL_FLUSH,
		//    or LZMA_FULL_BARRIER was used.
		//
		// TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER.
		const bool finish = thr_in_size == coder->block_size
				|| (*in_pos == in_size && action != LZMA_RUN);

		bool block_error = false;

		mythread_sync(coder->thr->mutex) {
			if (coder->thr->state == THR_IDLE) {
				// Something has gone wrong with the Block
				// encoder. It has set coder->thread_error
				// which we will read a few lines later.
				block_error = true;
			} else {
				// Tell the Block encoder its new amount
				// of input and update the state if needed.
				coder->thr->in_size = thr_in_size;

				if (finish)
					coder->thr->state = THR_FINISH;

				mythread_cond_signal(&coder->thr->cond);
			}
		}

		if (block_error) {
			lzma_ret ret;

			mythread_sync(coder->mutex) {
				ret = coder->thread_error;
			}

			return ret;
		}

		if (finish)
			coder->thr = NULL;
	}

	return LZMA_OK;
}


/// Wait until more input can be consumed, more output can be read, or
/// an optional timeout is reached.
static bool
wait_for_work(lzma_coder *coder, mythread_condtime *wait_abs,
		bool *has_blocked, bool has_input)
{
	if (coder->timeout != 0 && !*has_blocked) {
		// Every time stream_encode_mt() is called via
		// lzma_code(), *has_blocked starts as false. We set it
		// to true here and calculate the absolute time when
		// we must return if there's nothing to do.
		//
		// The idea of *has_blocked is to avoid unneeded calls
		// to mythread_condtime_set(), which may do a syscall
		// depending on the operating system.
		*has_blocked = true;
		mythread_condtime_set(wait_abs, &coder->cond, coder->timeout);
	}

	bool timed_out = false;

	mythread_sync(coder->mutex) {
		// There are four things that we wait for. If one of them
		// becomes possible, we return.
		//  - If there is input left, we need to get a free
		//    worker thread and an output buffer for it.
		//  - Data ready to be read from the output queue.
		//  - A worker thread indicates an error.
		//  - A timeout occurs.
		while ((!has_input || coder->threads_free == NULL
					|| !lzma_outq_has_buf(&coder->outq))
				&& !lzma_outq_is_readable(&coder->outq)
				&& coder->thread_error == LZMA_OK
				&& !timed_out) {
			if (coder->timeout != 0)
				timed_out = mythread_cond_timedwait(
						&coder->cond, &coder->mutex,
						wait_abs) != 0;
			else
				mythread_cond_wait(&coder->cond,
						&coder->mutex);
		}
	}

	return timed_out;
}


static lzma_ret
stream_encode_mt(lzma_coder *coder, const lzma_allocator *allocator,
		const uint8_t *restrict in, size_t *restrict in_pos,
		size_t in_size, uint8_t *restrict out,
		size_t *restrict out_pos, size_t out_size, lzma_action action)
{
	switch (coder->sequence) {
	case SEQ_STREAM_HEADER:
		lzma_bufcpy(coder->header, &coder->header_pos,
				sizeof(coder->header),
				out, out_pos, out_size);
		if (coder->header_pos < sizeof(coder->header))
			return LZMA_OK;

		coder->header_pos = 0;
		coder->sequence = SEQ_BLOCK;

	// Fall through

	case SEQ_BLOCK: {
		// Initialized to silence warnings.
		lzma_vli unpadded_size = 0;
		lzma_vli uncompressed_size = 0;
		lzma_ret ret = LZMA_OK;

		// These are for wait_for_work().
		bool has_blocked = false;
		mythread_condtime wait_abs;

		while (true) {
			mythread_sync(coder->mutex) {
				// Check for Block encoder errors.
				ret = coder->thread_error;
				if (ret != LZMA_OK) {
					assert(ret != LZMA_STREAM_END);
					break;
				}

				// Try to read compressed data to out[].
				ret = lzma_outq_read(&coder->outq,
						out, out_pos, out_size,
						&unpadded_size,
						&uncompressed_size);
			}

			if (ret == LZMA_STREAM_END) {
				// End of Block. Add it to the Index.
				ret = lzma_index_append(coder->index,
						allocator, unpadded_size,
						uncompressed_size);

				// If we didn't fill the output buffer yet,
				// try to read more data. Maybe the next
				// outbuf has been finished already too.
				if (*out_pos < out_size)
					continue;
			}

			if (ret != LZMA_OK) {
				// coder->thread_error was set or
				// lzma_index_append() failed.
				threads_stop(coder, false);
				return ret;
			}

			// Try to give uncompressed data to a worker thread.
			ret = stream_encode_in(coder, allocator,
					in, in_pos, in_size, action);
			if (ret != LZMA_OK) {
				threads_stop(coder, false);
				return ret;
			}

			// See if we should wait or return.
			//
			// TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER.
			if (*in_pos == in_size) {
				// LZMA_RUN: More data is probably coming
				// so return to let the caller fill the
				// input buffer.
				if (action == LZMA_RUN)
					return LZMA_OK;

				// LZMA_FULL_BARRIER: The same as with
				// LZMA_RUN but tell the caller that the
				// barrier was completed.
				if (action == LZMA_FULL_BARRIER)
					return LZMA_STREAM_END;

				// Finishing or flushing isn't completed until
				// all input data has been encoded and copied
				// to the output buffer.
				if (lzma_outq_is_empty(&coder->outq)) {
					// LZMA_FINISH: Continue to encode
					// the Index field.
					if (action == LZMA_FINISH)
						break;

					// LZMA_FULL_FLUSH: Return to tell
					// the caller that flushing was
					// completed.
					if (action == LZMA_FULL_FLUSH)
						return LZMA_STREAM_END;
				}
			}

			// Return if there is no output space left.
			// This check must be done after testing the input
			// buffer, because we might want to use a different
			// return code.
			if (*out_pos == out_size)
				return LZMA_OK;

			// Neither in nor out has been used completely.
			// Wait until there's something we can do.
			if (wait_for_work(coder, &wait_abs, &has_blocked,
					*in_pos < in_size))
				return LZMA_TIMED_OUT;
		}

		// All Blocks have been encoded and the threads have stopped.
		// Prepare to encode the Index field.
		return_if_error(lzma_index_encoder_init(
				&coder->index_encoder, allocator,
				coder->index));
		coder->sequence = SEQ_INDEX;

		// Update the progress info to take the Index and
		// Stream Footer into account. Those are very fast to encode
		// so in terms of progress information they can be thought
		// to be ready to be copied out.
		coder->progress_out += lzma_index_size(coder->index)
				+ LZMA_STREAM_HEADER_SIZE;
	}

	// Fall through

	case SEQ_INDEX: {
		// Call the Index encoder. It doesn't take any input, so
		// those pointers can be NULL.
		const lzma_ret ret = coder->index_encoder.code(
				coder->index_encoder.coder, allocator,
				NULL, NULL, 0,
				out, out_pos, out_size, LZMA_RUN);
		if (ret != LZMA_STREAM_END)
			return ret;

		// Encode the Stream Footer into coder->header.
		coder->stream_flags.backward_size
				= lzma_index_size(coder->index);
		if (lzma_stream_footer_encode(&coder->stream_flags,
				coder->header) != LZMA_OK)
			return LZMA_PROG_ERROR;

		coder->sequence = SEQ_STREAM_FOOTER;
	}

	// Fall through

	case SEQ_STREAM_FOOTER:
		lzma_bufcpy(coder->header, &coder->header_pos,
				sizeof(coder->header),
				out, out_pos, out_size);
		return coder->header_pos < sizeof(coder->header)
				? LZMA_OK : LZMA_STREAM_END;
	}

	assert(0);
	return LZMA_PROG_ERROR;
}


static void
stream_encoder_mt_end(lzma_coder *coder, const lzma_allocator *allocator)
{
	// Threads must be killed before the output queue can be freed.
	threads_end(coder, allocator);
	lzma_outq_end(&coder->outq, allocator);

	for (size_t i = 0; coder->filters[i].id != LZMA_VLI_UNKNOWN; ++i)
		lzma_free(coder->filters[i].options, allocator);

	lzma_next_end(&coder->index_encoder, allocator);
	lzma_index_end(coder->index, allocator);

	mythread_cond_destroy(&coder->cond);
	mythread_mutex_destroy(&coder->mutex);

	lzma_free(coder, allocator);
	return;
}


/// Options handling for lzma_stream_encoder_mt_init() and
/// lzma_stream_encoder_mt_memusage()
static lzma_ret
get_options(const lzma_mt *options, lzma_options_easy *opt_easy,
		const lzma_filter **filters, uint64_t *block_size,
		uint64_t *outbuf_size_max)
{
	// Validate some of the options.
	if (options == NULL)
		return LZMA_PROG_ERROR;

	if (options->flags != 0 || options->threads == 0
			|| options->threads > LZMA_THREADS_MAX)
		return LZMA_OPTIONS_ERROR;

	if (options->filters != NULL) {
		// Filter chain was given, use it as is.
		*filters = options->filters;
	} else {
		// Use a preset.
		if (lzma_easy_preset(opt_easy, options->preset))
			return LZMA_OPTIONS_ERROR;

		*filters = opt_easy->filters;
	}

	// Block size
	if (options->block_size > 0) {
		if (options->block_size > BLOCK_SIZE_MAX)
			return LZMA_OPTIONS_ERROR;

		*block_size = options->block_size;
	} else {
		// Determine the Block size from the filter chain.
		*block_size = lzma_mt_block_size(*filters);
		if (*block_size == 0)
			return LZMA_OPTIONS_ERROR;

		assert(*block_size <= BLOCK_SIZE_MAX);
	}

	// Calculate the maximum amount of output that a single output buffer
	// may need to hold. This is the same as the maximum total size of
	// a Block.
	*outbuf_size_max = lzma_block_buffer_bound64(*block_size);
	if (*outbuf_size_max == 0)
		return LZMA_MEM_ERROR;

	return LZMA_OK;
}


static void
get_progress(lzma_coder *coder, uint64_t *progress_in, uint64_t *progress_out)
{
	// Lock coder->mutex to prevent finishing threads from moving their
	// progress info from the worker_thread structure to lzma_coder.
	mythread_sync(coder->mutex) {
		*progress_in = coder->progress_in;
		*progress_out = coder->progress_out;

		for (size_t i = 0; i < coder->threads_initialized; ++i) {
			mythread_sync(coder->threads[i].mutex) {
				*progress_in += coder->threads[i].progress_in;
				*progress_out += coder->threads[i]
						.progress_out;
			}
		}
	}

	return;
}


static lzma_ret
stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
		const lzma_mt *options)
{
	lzma_next_coder_init(&stream_encoder_mt_init, next, allocator);

	// Get the filter chain.
	lzma_options_easy easy;
	const lzma_filter *filters;
	uint64_t block_size;
	uint64_t outbuf_size_max;
	return_if_error(get_options(options, &easy, &filters,
			&block_size, &outbuf_size_max));

#if SIZE_MAX < UINT64_MAX
	if (block_size > SIZE_MAX)
		return LZMA_MEM_ERROR;
#endif

	// Validate the filter chain so that we can give an error in this
	// function instead of delaying it to the first call to lzma_code().
	// The memory usage calculation verifies the filter chain as
	// a side effect so we take advantage of that.
	if (lzma_raw_encoder_memusage(filters) == UINT64_MAX)
		return LZMA_OPTIONS_ERROR;

	// Validate the Check ID.
	if ((unsigned int)(options->check) > LZMA_CHECK_ID_MAX)
		return LZMA_PROG_ERROR;

	if (!lzma_check_is_supported(options->check))
		return LZMA_UNSUPPORTED_CHECK;

	// Allocate and initialize the base structure if needed.
	if (next->coder == NULL) {
		next->coder = lzma_alloc(sizeof(lzma_coder), allocator);
		if (next->coder == NULL)
			return LZMA_MEM_ERROR;

		// For the mutex and condition variable initializations
		// the error handling has to be done here because
		// stream_encoder_mt_end() doesn't know if they have
		// already been initialized or not.
		if (mythread_mutex_init(&next->coder->mutex)) {
			lzma_free(next->coder, allocator);
			next->coder = NULL;
			return LZMA_MEM_ERROR;
		}

		if (mythread_cond_init(&next->coder->cond)) {
			mythread_mutex_destroy(&next->coder->mutex);
			lzma_free(next->coder, allocator);
			next->coder = NULL;
			return LZMA_MEM_ERROR;
		}

		next->code = &stream_encode_mt;
		next->end = &stream_encoder_mt_end;
		next->get_progress = &get_progress;
// 		next->update = &stream_encoder_mt_update;

		next->coder->filters[0].id = LZMA_VLI_UNKNOWN;
		next->coder->index_encoder = LZMA_NEXT_CODER_INIT;
		next->coder->index = NULL;
		memzero(&next->coder->outq, sizeof(next->coder->outq));
		next->coder->threads = NULL;
		next->coder->threads_max = 0;
		next->coder->threads_initialized = 0;
	}

	// Basic initializations
	next->coder->sequence = SEQ_STREAM_HEADER;
	next->coder->block_size = (size_t)(block_size);
	next->coder->thread_error = LZMA_OK;
	next->coder->thr = NULL;

	// Allocate the thread-specific base structures.
	assert(options->threads > 0);
	if (next->coder->threads_max != options->threads) {
		threads_end(next->coder, allocator);

		next->coder->threads = NULL;
		next->coder->threads_max = 0;

		next->coder->threads_initialized = 0;
		next->coder->threads_free = NULL;

		next->coder->threads = lzma_alloc(
				options->threads * sizeof(worker_thread),
				allocator);
		if (next->coder->threads == NULL)
			return LZMA_MEM_ERROR;

		next->coder->threads_max = options->threads;
	} else {
		// Reuse the old structures and threads. Tell the running
		// threads to stop and wait until they have stopped.
		threads_stop(next->coder, true);
	}

	// Output queue
	return_if_error(lzma_outq_init(&next->coder->outq, allocator,
			outbuf_size_max, options->threads));

	// Timeout
	next->coder->timeout = options->timeout;

	// Free the old filter chain and copy the new one.
	for (size_t i = 0; next->coder->filters[i].id != LZMA_VLI_UNKNOWN; ++i)
		lzma_free(next->coder->filters[i].options, allocator);

	return_if_error(lzma_filters_copy(
			filters, next->coder->filters, allocator));

	// Index
	lzma_index_end(next->coder->index, allocator);
	next->coder->index = lzma_index_init(allocator);
	if (next->coder->index == NULL)
		return LZMA_MEM_ERROR;

	// Stream Header
	next->coder->stream_flags.version = 0;
	next->coder->stream_flags.check = options->check;
	return_if_error(lzma_stream_header_encode(
			&next->coder->stream_flags, next->coder->header));

	next->coder->header_pos = 0;

	// Progress info
	next->coder->progress_in = 0;
	next->coder->progress_out = LZMA_STREAM_HEADER_SIZE;

	return LZMA_OK;
}


extern LZMA_API(lzma_ret)
lzma_stream_encoder_mt(lzma_stream *strm, const lzma_mt *options)
{
	lzma_next_strm_init(stream_encoder_mt_init, strm, options);

	strm->internal->supported_actions[LZMA_RUN] = true;
// 	strm->internal->supported_actions[LZMA_SYNC_FLUSH] = true;
	strm->internal->supported_actions[LZMA_FULL_FLUSH] = true;
	strm->internal->supported_actions[LZMA_FULL_BARRIER] = true;
	strm->internal->supported_actions[LZMA_FINISH] = true;

	return LZMA_OK;
}
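
/*
 * Illustrative usage sketch (not part of this file; the field values below
 * are only example assumptions): an application fills an lzma_mt structure,
 * initializes the encoder with lzma_stream_encoder_mt(), and then runs the
 * normal lzma_code() loop.
 *
 *     lzma_stream strm = LZMA_STREAM_INIT;
 *     lzma_mt mt = {
 *         .flags = 0,                    // No flags are currently defined.
 *         .threads = 4,                  // Number of worker threads.
 *         .block_size = 0,               // 0 = derive from the filter chain.
 *         .timeout = 300,                // ms; 0 = block until progress.
 *         .preset = LZMA_PRESET_DEFAULT, // Used because .filters is NULL.
 *         .filters = NULL,               // NULL = use .preset instead.
 *         .check = LZMA_CHECK_CRC64,
 *     };
 *
 *     if (lzma_stream_encoder_mt(&strm, &mt) != LZMA_OK)
 *         abort();                       // Real code would handle the error.
 *
 *     // ...call lzma_code(&strm, LZMA_RUN) until the input is done, then
 *     // lzma_code(&strm, LZMA_FINISH) until it returns LZMA_STREAM_END,
 *     // and finally lzma_end(&strm).
 */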


// This function name is a monster but it's consistent with the older
// monster names. :-( 31 chars is the max that C99 requires so in that
// sense it's not too long. ;-)
extern LZMA_API(uint64_t)
lzma_stream_encoder_mt_memusage(const lzma_mt *options)
{
	lzma_options_easy easy;
	const lzma_filter *filters;
	uint64_t block_size;
	uint64_t outbuf_size_max;

	if (get_options(options, &easy, &filters, &block_size,
			&outbuf_size_max) != LZMA_OK)
		return UINT64_MAX;

	// Memory usage of the input buffers
	const uint64_t inbuf_memusage = options->threads * block_size;

	// Memory usage of the filter encoders
	uint64_t filters_memusage = lzma_raw_encoder_memusage(filters);
	if (filters_memusage == UINT64_MAX)
		return UINT64_MAX;

	filters_memusage *= options->threads;

	// Memory usage of the output queue
	const uint64_t outq_memusage = lzma_outq_memusage(
			outbuf_size_max, options->threads);
	if (outq_memusage == UINT64_MAX)
		return UINT64_MAX;

	// Sum them with overflow checking.
	uint64_t total_memusage = LZMA_MEMUSAGE_BASE + sizeof(lzma_coder)
			+ options->threads * sizeof(worker_thread);

	if (UINT64_MAX - total_memusage < inbuf_memusage)
		return UINT64_MAX;

	total_memusage += inbuf_memusage;

	if (UINT64_MAX - total_memusage < filters_memusage)
		return UINT64_MAX;

	total_memusage += filters_memusage;

	if (UINT64_MAX - total_memusage < outq_memusage)
		return UINT64_MAX;

	return total_memusage + outq_memusage;
}