///////////////////////////////////////////////////////////////////////////////
//
/// \file       stream_encoder_mt.c
/// \brief      Multithreaded .xz Stream encoder
//
//  Author:     Lasse Collin
//
//  This file has been put into the public domain.
//  You can do whatever you want with this file.
//
///////////////////////////////////////////////////////////////////////////////

#include "filter_encoder.h"
#include "easy_preset.h"
#include "block_encoder.h"
#include "block_buffer_encoder.h"
#include "index_encoder.h"
#include "outqueue.h"


/// Maximum supported block size. This makes it simpler to prevent integer
/// overflows if we are given unusually large block size: any value up to
/// this limit multiplied by LZMA_THREADS_MAX still fits in uint64_t.
#define BLOCK_SIZE_MAX (UINT64_MAX / LZMA_THREADS_MAX)


/// State of a worker thread. Once the worker thread has been created,
/// the main thread writes the requests (THR_RUN, THR_FINISH, THR_STOP,
/// THR_EXIT) and the worker writes THR_IDLE; both sides hold
/// worker_thread.mutex while doing so.
///
/// The numeric order of the values matters: worker_encode() treats any
/// state >= THR_STOP as a request to abandon the current Block, and
/// worker_start() calls worker_encode() only when state <= THR_FINISH.
typedef enum {
	/// Waiting for work.
	THR_IDLE,

	/// Encoding is in progress.
	THR_RUN,

	/// Encoding is in progress but no more input data will
	/// be read.
	THR_FINISH,

	/// The main thread wants the thread to stop whatever it was doing
	/// but not exit.
	THR_STOP,

	/// The main thread wants the thread to exit. Thread cancellation
	/// could be used instead, but since the THR_STOP mechanism exists
	/// anyway, reusing the same state machine for exiting is simpler.
	THR_EXIT,

} worker_state;

typedef struct lzma_stream_coder_s lzma_stream_coder;

/// Per-thread state of one worker. The main thread owns an array of
/// these (lzma_stream_coder.threads); each initialized entry has its
/// own OS thread running worker_start().
typedef struct worker_thread_s worker_thread;
struct worker_thread_s {
	/// Current state of this worker; see worker_state.
	/// Accessed with mutex held once the thread exists.
	worker_state state;

	/// Input buffer of coder->block_size bytes. The main thread will
	/// put new input into this and update in_size accordingly. Once
	/// no more input is coming, state will be set to THR_FINISH.
	uint8_t *in;

	/// Amount of data available in the input buffer. This is modified
	/// only by the main thread.
	size_t in_size;

	/// Output buffer for this thread. This is set by the main
	/// thread every time a new Block is started with this thread
	/// structure.
	lzma_outbuf *outbuf;

	/// Pointer to the main structure is needed when putting this
	/// thread back to the stack of free threads.
	lzma_stream_coder *coder;

	/// The allocator is set by the main thread. Since a copy of the
	/// pointer is kept here, the application must not change the
	/// allocator before calling lzma_end().
	const lzma_allocator *allocator;

	/// Amount of uncompressed data that has already been compressed.
	uint64_t progress_in;

	/// Amount of compressed data that is ready.
	uint64_t progress_out;

	/// Block encoder
	lzma_next_coder block_encoder;

	/// Compression options for this Block
	lzma_block block_options;

	/// Next structure in the stack of free worker threads.
	worker_thread *next;

	/// mutex protects state and in_size; worker_encode() also stores
	/// the progress_in/progress_out snapshots while holding it.
	/// cond is signaled whenever either side changes state or in_size,
	/// waking the worker or a main thread waiting in threads_stop().
	mythread_mutex mutex;
	mythread_cond cond;

	/// The ID of this thread is used to join the thread
	/// when it's not needed anymore.
	mythread thread_id;
};


/// State of the multithreaded Stream encoder as a whole.
struct lzma_stream_coder_s {
	/// Which part of the .xz Stream is produced next by
	/// stream_encode_mt().
	enum {
		SEQ_STREAM_HEADER,
		SEQ_BLOCK,
		SEQ_INDEX,
		SEQ_STREAM_FOOTER,
	} sequence;

	/// Start a new Block every block_size bytes of input unless
	/// LZMA_FULL_FLUSH or LZMA_FULL_BARRIER is used earlier.
	size_t block_size;

	/// The filter chain currently in use
	lzma_filter filters[LZMA_FILTERS_MAX + 1];


	/// Index to hold sizes of the Blocks
	lzma_index *index;

	/// Index encoder
	lzma_next_coder index_encoder;


	/// Stream Flags for encoding the Stream Header and Stream Footer.
	lzma_stream_flags stream_flags;

	/// Buffer to hold Stream Header and Stream Footer.
	uint8_t header[LZMA_STREAM_HEADER_SIZE];

	/// Read position in header[]
	size_t header_pos;


	/// Output buffer queue for compressed data
	lzma_outq outq;


	/// Maximum wait time if cannot use all the input and cannot
	/// fill the output buffer. This is in milliseconds.
	uint32_t timeout;


	/// Error code from a worker thread
	lzma_ret thread_error;

	/// Array of allocated thread-specific structures
	worker_thread *threads;

	/// Number of structures in "threads" above. This is also the
	/// number of threads that will be created at maximum.
	uint32_t threads_max;

	/// Number of thread structures that have been initialized, and
	/// thus the number of worker threads actually created so far.
	uint32_t threads_initialized;

	/// Stack of free threads. When a thread finishes, it puts itself
	/// back into this stack. This starts as empty because threads
	/// are created only when actually needed.
	worker_thread *threads_free;

	/// The most recent worker thread to which the main thread writes
	/// the new input from the application.
	worker_thread *thr;


	/// Amount of uncompressed data in Blocks that have already
	/// been finished.
	uint64_t progress_in;

	/// Amount of compressed data in Stream Header + Blocks that
	/// have already been finished.
	uint64_t progress_out;


	/// mutex is held when workers record thread_error, push themselves
	/// onto threads_free, and update progress_in/progress_out, and when
	/// the main thread reads those fields or reads from outq.
	/// cond is signaled by the workers so that a main thread blocked in
	/// wait_for_work() wakes up.
	mythread_mutex mutex;
	mythread_cond cond;
};


/// Tell the main thread that something has gone wrong.
///
/// Only the first error is kept: ret is stored in coder->thread_error
/// only if no error has been recorded yet. coder->cond is signaled so
/// that a main thread waiting for work notices the error.
static void
worker_error(worker_thread *thr, lzma_ret ret)
{
	assert(ret != LZMA_OK);
	assert(ret != LZMA_STREAM_END);

	mythread_sync(thr->coder->mutex) {
		if (thr->coder->thread_error == LZMA_OK)
			thr->coder->thread_error = ret;

		mythread_cond_signal(&thr->coder->cond);
	}

	return;
}


/// Encode one Block from the input that the main thread copies into
/// thr->in, writing the result into thr->outbuf.
///
/// \param      thr     Worker whose in, outbuf and block_options are used.
/// \param      state   Not used on entry: it is overwritten with
///                     thr->state before it is first read.
///
/// \return     THR_FINISH if the Block was encoded and its sizes were
///             stored into thr->outbuf. THR_STOP if an error occurred
///             (already reported via worker_error()) or the main thread
///             asked this worker to stop. THR_EXIT if the main thread
///             asked this worker to exit.
static worker_state
worker_encode(worker_thread *thr, worker_state state)
{
	assert(thr->progress_in == 0);
	assert(thr->progress_out == 0);

	// Set the Block options. The sizes are upper bounds (a full Block
	// of input and a full output buffer) so that the Block Header size
	// calculated below is large enough; the header itself is encoded
	// only after compression.
	thr->block_options = (lzma_block){
		.version = 0,
		.check = thr->coder->stream_flags.check,
		.compressed_size = thr->coder->outq.buf_size_max,
		.uncompressed_size = thr->coder->block_size,

		// TODO: To allow changing the filter chain, the filters
		// array must be copied to each worker_thread.
		.filters = thr->coder->filters,
	};

	// Calculate maximum size of the Block Header. This amount is
	// reserved in the beginning of the buffer so that Block Header
	// along with Compressed Size and Uncompressed Size can be
	// written there.
	lzma_ret ret = lzma_block_header_size(&thr->block_options);
	if (ret != LZMA_OK) {
		worker_error(thr, ret);
		return THR_STOP;
	}

	// Initialize the Block encoder.
	ret = lzma_block_encoder_init(&thr->block_encoder,
			thr->allocator, &thr->block_options);
	if (ret != LZMA_OK) {
		worker_error(thr, ret);
		return THR_STOP;
	}

	size_t in_pos = 0;
	size_t in_size = 0;

	// Compressed data is written after the space reserved for
	// the Block Header.
	thr->outbuf->size = thr->block_options.header_size;
	const size_t out_size = thr->coder->outq.buf_size_max;

	do {
		mythread_sync(thr->mutex) {
			// Store in_pos and out_pos into *thr so that
			// an application may read them via
			// lzma_get_progress() to get progress information.
			//
			// NOTE: These aren't updated when the encoding
			// finishes. Instead, the final values are taken
			// later from thr->outbuf.
			thr->progress_in = in_pos;
			thr->progress_out = thr->outbuf->size;

			// Sleep until the main thread gives more input
			// or changes the state.
			while (in_size == thr->in_size
					&& thr->state == THR_RUN)
				mythread_cond_wait(&thr->cond, &thr->mutex);

			state = thr->state;
			in_size = thr->in_size;
		}

		// Return if we were asked to stop or exit.
		if (state >= THR_STOP)
			return state;

		lzma_action action = state == THR_FINISH
				? LZMA_FINISH : LZMA_RUN;

		// Limit the amount of input given to the Block encoder
		// at once. This way this thread can react fairly quickly
		// if the main thread wants us to stop or exit.
		static const size_t in_chunk_max = 16384;
		size_t in_limit = in_size;
		if (in_size - in_pos > in_chunk_max) {
			in_limit = in_pos + in_chunk_max;
			action = LZMA_RUN;
		}

		ret = thr->block_encoder.code(
				thr->block_encoder.coder, thr->allocator,
				thr->in, &in_pos, in_limit, thr->outbuf->buf,
				&thr->outbuf->size, out_size, action);
	} while (ret == LZMA_OK && thr->outbuf->size < out_size);

	// The loop ends with LZMA_STREAM_END (Block finished), with an
	// error, or with LZMA_OK when the output buffer became full.
	switch (ret) {
	case LZMA_STREAM_END:
		assert(state == THR_FINISH);

		// Encode the Block Header. By doing it after
		// the compression, we can store the Compressed Size
		// and Uncompressed Size fields.
		ret = lzma_block_header_encode(&thr->block_options,
				thr->outbuf->buf);
		if (ret != LZMA_OK) {
			worker_error(thr, ret);
			return THR_STOP;
		}

		break;

	case LZMA_OK:
		// The output buffer filled up before the Block was
		// finished, i.e. the data was incompressible. Encode it
		// using uncompressed LZMA2 chunks.
		//
		// First wait that we have gotten all the input.
		mythread_sync(thr->mutex) {
			while (thr->state == THR_RUN)
				mythread_cond_wait(&thr->cond, &thr->mutex);

			state = thr->state;
			in_size = thr->in_size;
		}

		if (state >= THR_STOP)
			return state;

		// Do the encoding. This takes care of the Block Header too.
		thr->outbuf->size = 0;
		ret = lzma_block_uncomp_encode(&thr->block_options,
				thr->in, in_size, thr->outbuf->buf,
				&thr->outbuf->size, out_size);

		// It shouldn't fail.
		if (ret != LZMA_OK) {
			worker_error(thr, LZMA_PROG_ERROR);
			return THR_STOP;
		}

		break;

	default:
		worker_error(thr, ret);
		return THR_STOP;
	}

	// Set the size information that will be read by the main thread
	// to write the Index field.
	thr->outbuf->unpadded_size
			= lzma_block_unpadded_size(&thr->block_options);
	assert(thr->outbuf->unpadded_size != 0);
	thr->outbuf->uncompressed_size = thr->block_options.uncompressed_size;

	return THR_FINISH;
}


/// Entry point of a worker thread.
///
/// Waits until the main thread gives it work and encodes one Block at
/// a time with worker_encode(). After each Block it marks itself idle,
/// publishes the result (outbuf->finished and the coder's progress
/// counters), and pushes itself back onto coder->threads_free. When it
/// is told to exit (THR_EXIT), it frees its own mutex, condition
/// variable, Block encoder, and input buffer before returning.
///
/// \param      thr_ptr     Pointer to this thread's worker_thread
static MYTHREAD_RET_TYPE
worker_start(void *thr_ptr)
{
	worker_thread *thr = thr_ptr;
	worker_state state = THR_IDLE; // Init to silence a warning

	while (true) {
		// Wait for work.
		mythread_sync(thr->mutex) {
			while (true) {
				// The thread is already idle so if we are
				// requested to stop, just set the state.
				if (thr->state == THR_STOP) {
					thr->state = THR_IDLE;
					mythread_cond_signal(&thr->cond);
				}

				state = thr->state;
				if (state != THR_IDLE)
					break;

				mythread_cond_wait(&thr->cond, &thr->mutex);
			}
		}

		assert(state != THR_IDLE);
		assert(state != THR_STOP);

		// THR_RUN or THR_FINISH: encode a Block.
		if (state <= THR_FINISH)
			state = worker_encode(thr, state);

		if (state == THR_EXIT)
			break;

		// Mark the thread as idle unless the main thread has
		// told us to exit. Signal is needed for the case
		// where the main thread is waiting for the threads to stop.
		mythread_sync(thr->mutex) {
			if (thr->state != THR_EXIT) {
				thr->state = THR_IDLE;
				mythread_cond_signal(&thr->cond);
			}
		}

		mythread_sync(thr->coder->mutex) {
			// Mark the output buffer as finished if
			// no errors occurred.
			thr->outbuf->finished = state == THR_FINISH;

			// Update the main progress info.
			// NOTE(review): thr->progress_in/progress_out are
			// reset here while holding coder->mutex, whereas
			// worker_encode() writes them under thr->mutex;
			// confirm which lock readers of these fields take.
			thr->coder->progress_in
					+= thr->outbuf->uncompressed_size;
			thr->coder->progress_out += thr->outbuf->size;
			thr->progress_in = 0;
			thr->progress_out = 0;

			// Return this thread to the stack of free threads.
			thr->next = thr->coder->threads_free;
			thr->coder->threads_free = thr;

			mythread_cond_signal(&thr->coder->cond);
		}
	}

	// Exiting, free the resources.
	mythread_mutex_destroy(&thr->mutex);
	mythread_cond_destroy(&thr->cond);

	lzma_next_end(&thr->block_encoder, thr->allocator);
	lzma_free(thr->in, thr->allocator);
	return MYTHREAD_RET_VALUE;
}


/// Make the threads stop but not exit. Optionally wait for them to stop.
///
/// Every initialized worker gets THR_STOP. When wait_for_threads is true,
/// this blocks until every initialized worker has reported THR_IDLE.
/// An already idle worker resets THR_STOP to THR_IDLE itself in
/// worker_start().
static void
threads_stop(lzma_stream_coder *coder, bool wait_for_threads)
{
	// Tell the threads to stop.
	for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
		mythread_sync(coder->threads[i].mutex) {
			coder->threads[i].state = THR_STOP;
			mythread_cond_signal(&coder->threads[i].cond);
		}
	}

	if (!wait_for_threads)
		return;

	// Wait for the threads to settle in the idle state.
	for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
		mythread_sync(coder->threads[i].mutex) {
			while (coder->threads[i].state != THR_IDLE)
				mythread_cond_wait(&coder->threads[i].cond,
						&coder->threads[i].mutex);
		}
	}

	return;
}


/// Stop the threads and free the resources associated with them.
/// Wait until the threads have exited.
449278307Srpaulostatic void 450312518Sdelphijthreads_end(lzma_stream_coder *coder, const lzma_allocator *allocator) 451278307Srpaulo{ 452278307Srpaulo for (uint32_t i = 0; i < coder->threads_initialized; ++i) { 453278307Srpaulo mythread_sync(coder->threads[i].mutex) { 454278307Srpaulo coder->threads[i].state = THR_EXIT; 455278307Srpaulo mythread_cond_signal(&coder->threads[i].cond); 456278307Srpaulo } 457278307Srpaulo } 458278307Srpaulo 459278307Srpaulo for (uint32_t i = 0; i < coder->threads_initialized; ++i) { 460278307Srpaulo int ret = mythread_join(coder->threads[i].thread_id); 461278307Srpaulo assert(ret == 0); 462278307Srpaulo (void)ret; 463278307Srpaulo } 464278307Srpaulo 465278307Srpaulo lzma_free(coder->threads, allocator); 466278307Srpaulo return; 467278307Srpaulo} 468278307Srpaulo 469278307Srpaulo 470278307Srpaulo/// Initialize a new worker_thread structure and create a new thread. 471278307Srpaulostatic lzma_ret 472312518Sdelphijinitialize_new_thread(lzma_stream_coder *coder, 473312518Sdelphij const lzma_allocator *allocator) 474278307Srpaulo{ 475278307Srpaulo worker_thread *thr = &coder->threads[coder->threads_initialized]; 476278307Srpaulo 477278307Srpaulo thr->in = lzma_alloc(coder->block_size, allocator); 478278307Srpaulo if (thr->in == NULL) 479278307Srpaulo return LZMA_MEM_ERROR; 480278307Srpaulo 481278307Srpaulo if (mythread_mutex_init(&thr->mutex)) 482278307Srpaulo goto error_mutex; 483278307Srpaulo 484278307Srpaulo if (mythread_cond_init(&thr->cond)) 485278307Srpaulo goto error_cond; 486278307Srpaulo 487278307Srpaulo thr->state = THR_IDLE; 488278307Srpaulo thr->allocator = allocator; 489278307Srpaulo thr->coder = coder; 490278307Srpaulo thr->progress_in = 0; 491278307Srpaulo thr->progress_out = 0; 492278307Srpaulo thr->block_encoder = LZMA_NEXT_CODER_INIT; 493278307Srpaulo 494278307Srpaulo if (mythread_create(&thr->thread_id, &worker_start, thr)) 495278307Srpaulo goto error_thread; 496278307Srpaulo 497278307Srpaulo ++coder->threads_initialized; 
498278307Srpaulo coder->thr = thr; 499278307Srpaulo 500278307Srpaulo return LZMA_OK; 501278307Srpaulo 502278307Srpauloerror_thread: 503278307Srpaulo mythread_cond_destroy(&thr->cond); 504278307Srpaulo 505278307Srpauloerror_cond: 506278307Srpaulo mythread_mutex_destroy(&thr->mutex); 507278307Srpaulo 508278307Srpauloerror_mutex: 509278307Srpaulo lzma_free(thr->in, allocator); 510278307Srpaulo return LZMA_MEM_ERROR; 511278307Srpaulo} 512278307Srpaulo 513278307Srpaulo 514278307Srpaulostatic lzma_ret 515312518Sdelphijget_thread(lzma_stream_coder *coder, const lzma_allocator *allocator) 516278307Srpaulo{ 517278307Srpaulo // If there are no free output subqueues, there is no 518278307Srpaulo // point to try getting a thread. 519278307Srpaulo if (!lzma_outq_has_buf(&coder->outq)) 520278307Srpaulo return LZMA_OK; 521278307Srpaulo 522278307Srpaulo // If there is a free structure on the stack, use it. 523278307Srpaulo mythread_sync(coder->mutex) { 524278307Srpaulo if (coder->threads_free != NULL) { 525278307Srpaulo coder->thr = coder->threads_free; 526278307Srpaulo coder->threads_free = coder->threads_free->next; 527278307Srpaulo } 528278307Srpaulo } 529278307Srpaulo 530278307Srpaulo if (coder->thr == NULL) { 531278307Srpaulo // If there are no uninitialized structures left, return. 532278307Srpaulo if (coder->threads_initialized == coder->threads_max) 533278307Srpaulo return LZMA_OK; 534278307Srpaulo 535278307Srpaulo // Initialize a new thread. 536278307Srpaulo return_if_error(initialize_new_thread(coder, allocator)); 537278307Srpaulo } 538278307Srpaulo 539278307Srpaulo // Reset the parts of the thread state that have to be done 540278307Srpaulo // in the main thread. 
541278307Srpaulo mythread_sync(coder->thr->mutex) { 542278307Srpaulo coder->thr->state = THR_RUN; 543278307Srpaulo coder->thr->in_size = 0; 544278307Srpaulo coder->thr->outbuf = lzma_outq_get_buf(&coder->outq); 545278307Srpaulo mythread_cond_signal(&coder->thr->cond); 546278307Srpaulo } 547278307Srpaulo 548278307Srpaulo return LZMA_OK; 549278307Srpaulo} 550278307Srpaulo 551278307Srpaulo 552278307Srpaulostatic lzma_ret 553312518Sdelphijstream_encode_in(lzma_stream_coder *coder, const lzma_allocator *allocator, 554278307Srpaulo const uint8_t *restrict in, size_t *restrict in_pos, 555278307Srpaulo size_t in_size, lzma_action action) 556278307Srpaulo{ 557278307Srpaulo while (*in_pos < in_size 558278307Srpaulo || (coder->thr != NULL && action != LZMA_RUN)) { 559278307Srpaulo if (coder->thr == NULL) { 560278307Srpaulo // Get a new thread. 561278307Srpaulo const lzma_ret ret = get_thread(coder, allocator); 562278307Srpaulo if (coder->thr == NULL) 563278307Srpaulo return ret; 564278307Srpaulo } 565278307Srpaulo 566278307Srpaulo // Copy the input data to thread's buffer. 567278307Srpaulo size_t thr_in_size = coder->thr->in_size; 568278307Srpaulo lzma_bufcpy(in, in_pos, in_size, coder->thr->in, 569278307Srpaulo &thr_in_size, coder->block_size); 570278307Srpaulo 571278307Srpaulo // Tell the Block encoder to finish if 572278307Srpaulo // - it has got block_size bytes of input; or 573278307Srpaulo // - all input was used and LZMA_FINISH, LZMA_FULL_FLUSH, 574278307Srpaulo // or LZMA_FULL_BARRIER was used. 575278307Srpaulo // 576278307Srpaulo // TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER. 
577278307Srpaulo const bool finish = thr_in_size == coder->block_size 578278307Srpaulo || (*in_pos == in_size && action != LZMA_RUN); 579278307Srpaulo 580278307Srpaulo bool block_error = false; 581278307Srpaulo 582278307Srpaulo mythread_sync(coder->thr->mutex) { 583278307Srpaulo if (coder->thr->state == THR_IDLE) { 584278307Srpaulo // Something has gone wrong with the Block 585278307Srpaulo // encoder. It has set coder->thread_error 586278307Srpaulo // which we will read a few lines later. 587278307Srpaulo block_error = true; 588278307Srpaulo } else { 589278307Srpaulo // Tell the Block encoder its new amount 590278307Srpaulo // of input and update the state if needed. 591278307Srpaulo coder->thr->in_size = thr_in_size; 592278307Srpaulo 593278307Srpaulo if (finish) 594278307Srpaulo coder->thr->state = THR_FINISH; 595278307Srpaulo 596278307Srpaulo mythread_cond_signal(&coder->thr->cond); 597278307Srpaulo } 598278307Srpaulo } 599278307Srpaulo 600278307Srpaulo if (block_error) { 601278307Srpaulo lzma_ret ret; 602278307Srpaulo 603278307Srpaulo mythread_sync(coder->mutex) { 604278307Srpaulo ret = coder->thread_error; 605278307Srpaulo } 606278307Srpaulo 607278307Srpaulo return ret; 608278307Srpaulo } 609278307Srpaulo 610278307Srpaulo if (finish) 611278307Srpaulo coder->thr = NULL; 612278307Srpaulo } 613278307Srpaulo 614278307Srpaulo return LZMA_OK; 615278307Srpaulo} 616278307Srpaulo 617278307Srpaulo 618278307Srpaulo/// Wait until more input can be consumed, more output can be read, or 619278307Srpaulo/// an optional timeout is reached. 620278307Srpaulostatic bool 621312518Sdelphijwait_for_work(lzma_stream_coder *coder, mythread_condtime *wait_abs, 622278307Srpaulo bool *has_blocked, bool has_input) 623278307Srpaulo{ 624278307Srpaulo if (coder->timeout != 0 && !*has_blocked) { 625278307Srpaulo // Every time when stream_encode_mt() is called via 626278307Srpaulo // lzma_code(), *has_blocked starts as false. 
We set it 627278307Srpaulo // to true here and calculate the absolute time when 628278307Srpaulo // we must return if there's nothing to do. 629278307Srpaulo // 630278307Srpaulo // The idea of *has_blocked is to avoid unneeded calls 631278307Srpaulo // to mythread_condtime_set(), which may do a syscall 632278307Srpaulo // depending on the operating system. 633278307Srpaulo *has_blocked = true; 634278307Srpaulo mythread_condtime_set(wait_abs, &coder->cond, coder->timeout); 635278307Srpaulo } 636278307Srpaulo 637278307Srpaulo bool timed_out = false; 638278307Srpaulo 639278307Srpaulo mythread_sync(coder->mutex) { 640278307Srpaulo // There are four things that we wait. If one of them 641278307Srpaulo // becomes possible, we return. 642278307Srpaulo // - If there is input left, we need to get a free 643278307Srpaulo // worker thread and an output buffer for it. 644278307Srpaulo // - Data ready to be read from the output queue. 645278307Srpaulo // - A worker thread indicates an error. 646278307Srpaulo // - Time out occurs. 
647278307Srpaulo while ((!has_input || coder->threads_free == NULL 648278307Srpaulo || !lzma_outq_has_buf(&coder->outq)) 649278307Srpaulo && !lzma_outq_is_readable(&coder->outq) 650278307Srpaulo && coder->thread_error == LZMA_OK 651278307Srpaulo && !timed_out) { 652278307Srpaulo if (coder->timeout != 0) 653278307Srpaulo timed_out = mythread_cond_timedwait( 654278307Srpaulo &coder->cond, &coder->mutex, 655278307Srpaulo wait_abs) != 0; 656278307Srpaulo else 657278307Srpaulo mythread_cond_wait(&coder->cond, 658278307Srpaulo &coder->mutex); 659278307Srpaulo } 660278307Srpaulo } 661278307Srpaulo 662278307Srpaulo return timed_out; 663278307Srpaulo} 664278307Srpaulo 665278307Srpaulo 666278307Srpaulostatic lzma_ret 667312518Sdelphijstream_encode_mt(void *coder_ptr, const lzma_allocator *allocator, 668278307Srpaulo const uint8_t *restrict in, size_t *restrict in_pos, 669278307Srpaulo size_t in_size, uint8_t *restrict out, 670278307Srpaulo size_t *restrict out_pos, size_t out_size, lzma_action action) 671278307Srpaulo{ 672312518Sdelphij lzma_stream_coder *coder = coder_ptr; 673312518Sdelphij 674278307Srpaulo switch (coder->sequence) { 675278307Srpaulo case SEQ_STREAM_HEADER: 676278307Srpaulo lzma_bufcpy(coder->header, &coder->header_pos, 677278307Srpaulo sizeof(coder->header), 678278307Srpaulo out, out_pos, out_size); 679278307Srpaulo if (coder->header_pos < sizeof(coder->header)) 680278307Srpaulo return LZMA_OK; 681278307Srpaulo 682278307Srpaulo coder->header_pos = 0; 683278307Srpaulo coder->sequence = SEQ_BLOCK; 684278307Srpaulo 685278307Srpaulo // Fall through 686278307Srpaulo 687278307Srpaulo case SEQ_BLOCK: { 688278307Srpaulo // Initialized to silence warnings. 689278307Srpaulo lzma_vli unpadded_size = 0; 690278307Srpaulo lzma_vli uncompressed_size = 0; 691278307Srpaulo lzma_ret ret = LZMA_OK; 692278307Srpaulo 693278307Srpaulo // These are for wait_for_work(). 
694278307Srpaulo bool has_blocked = false; 695278307Srpaulo mythread_condtime wait_abs; 696278307Srpaulo 697278307Srpaulo while (true) { 698278307Srpaulo mythread_sync(coder->mutex) { 699278307Srpaulo // Check for Block encoder errors. 700278307Srpaulo ret = coder->thread_error; 701278307Srpaulo if (ret != LZMA_OK) { 702278307Srpaulo assert(ret != LZMA_STREAM_END); 703278307Srpaulo break; 704278307Srpaulo } 705278307Srpaulo 706278307Srpaulo // Try to read compressed data to out[]. 707278307Srpaulo ret = lzma_outq_read(&coder->outq, 708278307Srpaulo out, out_pos, out_size, 709278307Srpaulo &unpadded_size, 710278307Srpaulo &uncompressed_size); 711278307Srpaulo } 712278307Srpaulo 713278307Srpaulo if (ret == LZMA_STREAM_END) { 714278307Srpaulo // End of Block. Add it to the Index. 715278307Srpaulo ret = lzma_index_append(coder->index, 716278307Srpaulo allocator, unpadded_size, 717278307Srpaulo uncompressed_size); 718278307Srpaulo 719278307Srpaulo // If we didn't fill the output buffer yet, 720278307Srpaulo // try to read more data. Maybe the next 721278307Srpaulo // outbuf has been finished already too. 722278307Srpaulo if (*out_pos < out_size) 723278307Srpaulo continue; 724278307Srpaulo } 725278307Srpaulo 726278307Srpaulo if (ret != LZMA_OK) { 727278307Srpaulo // coder->thread_error was set or 728278307Srpaulo // lzma_index_append() failed. 729278307Srpaulo threads_stop(coder, false); 730278307Srpaulo return ret; 731278307Srpaulo } 732278307Srpaulo 733278307Srpaulo // Try to give uncompressed data to a worker thread. 734278307Srpaulo ret = stream_encode_in(coder, allocator, 735278307Srpaulo in, in_pos, in_size, action); 736278307Srpaulo if (ret != LZMA_OK) { 737278307Srpaulo threads_stop(coder, false); 738278307Srpaulo return ret; 739278307Srpaulo } 740278307Srpaulo 741278307Srpaulo // See if we should wait or return. 742278307Srpaulo // 743278307Srpaulo // TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER. 
744278307Srpaulo if (*in_pos == in_size) { 745278307Srpaulo // LZMA_RUN: More data is probably coming 746278307Srpaulo // so return to let the caller fill the 747278307Srpaulo // input buffer. 748278307Srpaulo if (action == LZMA_RUN) 749278307Srpaulo return LZMA_OK; 750278307Srpaulo 751278307Srpaulo // LZMA_FULL_BARRIER: The same as with 752278307Srpaulo // LZMA_RUN but tell the caller that the 753278307Srpaulo // barrier was completed. 754278307Srpaulo if (action == LZMA_FULL_BARRIER) 755278307Srpaulo return LZMA_STREAM_END; 756278307Srpaulo 757278307Srpaulo // Finishing or flushing isn't completed until 758278307Srpaulo // all input data has been encoded and copied 759278307Srpaulo // to the output buffer. 760278307Srpaulo if (lzma_outq_is_empty(&coder->outq)) { 761278307Srpaulo // LZMA_FINISH: Continue to encode 762278307Srpaulo // the Index field. 763278307Srpaulo if (action == LZMA_FINISH) 764278307Srpaulo break; 765278307Srpaulo 766278307Srpaulo // LZMA_FULL_FLUSH: Return to tell 767278307Srpaulo // the caller that flushing was 768278307Srpaulo // completed. 769278307Srpaulo if (action == LZMA_FULL_FLUSH) 770278307Srpaulo return LZMA_STREAM_END; 771278307Srpaulo } 772278307Srpaulo } 773278307Srpaulo 774278307Srpaulo // Return if there is no output space left. 775278307Srpaulo // This check must be done after testing the input 776278307Srpaulo // buffer, because we might want to use a different 777278307Srpaulo // return code. 778278307Srpaulo if (*out_pos == out_size) 779278307Srpaulo return LZMA_OK; 780278307Srpaulo 781278307Srpaulo // Neither in nor out has been used completely. 782278307Srpaulo // Wait until there's something we can do. 783278307Srpaulo if (wait_for_work(coder, &wait_abs, &has_blocked, 784278307Srpaulo *in_pos < in_size)) 785278307Srpaulo return LZMA_TIMED_OUT; 786278307Srpaulo } 787278307Srpaulo 788278307Srpaulo // All Blocks have been encoded and the threads have stopped. 789278307Srpaulo // Prepare to encode the Index field. 
790278307Srpaulo return_if_error(lzma_index_encoder_init( 791278307Srpaulo &coder->index_encoder, allocator, 792278307Srpaulo coder->index)); 793278307Srpaulo coder->sequence = SEQ_INDEX; 794278307Srpaulo 795278307Srpaulo // Update the progress info to take the Index and 796278307Srpaulo // Stream Footer into account. Those are very fast to encode 797278307Srpaulo // so in terms of progress information they can be thought 798278307Srpaulo // to be ready to be copied out. 799278307Srpaulo coder->progress_out += lzma_index_size(coder->index) 800278307Srpaulo + LZMA_STREAM_HEADER_SIZE; 801278307Srpaulo } 802278307Srpaulo 803278307Srpaulo // Fall through 804278307Srpaulo 805278307Srpaulo case SEQ_INDEX: { 806278307Srpaulo // Call the Index encoder. It doesn't take any input, so 807278307Srpaulo // those pointers can be NULL. 808278307Srpaulo const lzma_ret ret = coder->index_encoder.code( 809278307Srpaulo coder->index_encoder.coder, allocator, 810278307Srpaulo NULL, NULL, 0, 811278307Srpaulo out, out_pos, out_size, LZMA_RUN); 812278307Srpaulo if (ret != LZMA_STREAM_END) 813278307Srpaulo return ret; 814278307Srpaulo 815278307Srpaulo // Encode the Stream Footer into coder->buffer. 816278307Srpaulo coder->stream_flags.backward_size 817278307Srpaulo = lzma_index_size(coder->index); 818278307Srpaulo if (lzma_stream_footer_encode(&coder->stream_flags, 819278307Srpaulo coder->header) != LZMA_OK) 820278307Srpaulo return LZMA_PROG_ERROR; 821278307Srpaulo 822278307Srpaulo coder->sequence = SEQ_STREAM_FOOTER; 823278307Srpaulo } 824278307Srpaulo 825278307Srpaulo // Fall through 826278307Srpaulo 827278307Srpaulo case SEQ_STREAM_FOOTER: 828278307Srpaulo lzma_bufcpy(coder->header, &coder->header_pos, 829278307Srpaulo sizeof(coder->header), 830278307Srpaulo out, out_pos, out_size); 831278307Srpaulo return coder->header_pos < sizeof(coder->header) 832278307Srpaulo ? 
LZMA_OK : LZMA_STREAM_END; 833278307Srpaulo } 834278307Srpaulo 835278307Srpaulo assert(0); 836278307Srpaulo return LZMA_PROG_ERROR; 837278307Srpaulo} 838278307Srpaulo 839278307Srpaulo 840278307Srpaulostatic void 841312518Sdelphijstream_encoder_mt_end(void *coder_ptr, const lzma_allocator *allocator) 842278307Srpaulo{ 843312518Sdelphij lzma_stream_coder *coder = coder_ptr; 844312518Sdelphij 845278307Srpaulo // Threads must be killed before the output queue can be freed. 846278307Srpaulo threads_end(coder, allocator); 847278307Srpaulo lzma_outq_end(&coder->outq, allocator); 848278307Srpaulo 849278307Srpaulo for (size_t i = 0; coder->filters[i].id != LZMA_VLI_UNKNOWN; ++i) 850278307Srpaulo lzma_free(coder->filters[i].options, allocator); 851278307Srpaulo 852278307Srpaulo lzma_next_end(&coder->index_encoder, allocator); 853278307Srpaulo lzma_index_end(coder->index, allocator); 854278307Srpaulo 855278307Srpaulo mythread_cond_destroy(&coder->cond); 856278307Srpaulo mythread_mutex_destroy(&coder->mutex); 857278307Srpaulo 858278307Srpaulo lzma_free(coder, allocator); 859278307Srpaulo return; 860278307Srpaulo} 861278307Srpaulo 862278307Srpaulo 863278307Srpaulo/// Options handling for lzma_stream_encoder_mt_init() and 864278307Srpaulo/// lzma_stream_encoder_mt_memusage() 865278307Srpaulostatic lzma_ret 866278307Srpauloget_options(const lzma_mt *options, lzma_options_easy *opt_easy, 867278307Srpaulo const lzma_filter **filters, uint64_t *block_size, 868278307Srpaulo uint64_t *outbuf_size_max) 869278307Srpaulo{ 870278307Srpaulo // Validate some of the options. 871278307Srpaulo if (options == NULL) 872278307Srpaulo return LZMA_PROG_ERROR; 873278307Srpaulo 874278307Srpaulo if (options->flags != 0 || options->threads == 0 875278307Srpaulo || options->threads > LZMA_THREADS_MAX) 876278307Srpaulo return LZMA_OPTIONS_ERROR; 877278307Srpaulo 878278307Srpaulo if (options->filters != NULL) { 879278307Srpaulo // Filter chain was given, use it as is. 
880278307Srpaulo *filters = options->filters; 881278307Srpaulo } else { 882278307Srpaulo // Use a preset. 883278307Srpaulo if (lzma_easy_preset(opt_easy, options->preset)) 884278307Srpaulo return LZMA_OPTIONS_ERROR; 885278307Srpaulo 886278307Srpaulo *filters = opt_easy->filters; 887278307Srpaulo } 888278307Srpaulo 889278307Srpaulo // Block size 890278307Srpaulo if (options->block_size > 0) { 891278307Srpaulo if (options->block_size > BLOCK_SIZE_MAX) 892278307Srpaulo return LZMA_OPTIONS_ERROR; 893278307Srpaulo 894278307Srpaulo *block_size = options->block_size; 895278307Srpaulo } else { 896278307Srpaulo // Determine the Block size from the filter chain. 897278307Srpaulo *block_size = lzma_mt_block_size(*filters); 898278307Srpaulo if (*block_size == 0) 899278307Srpaulo return LZMA_OPTIONS_ERROR; 900278307Srpaulo 901278307Srpaulo assert(*block_size <= BLOCK_SIZE_MAX); 902278307Srpaulo } 903278307Srpaulo 904278307Srpaulo // Calculate the maximum amount output that a single output buffer 905278307Srpaulo // may need to hold. This is the same as the maximum total size of 906278307Srpaulo // a Block. 907278307Srpaulo *outbuf_size_max = lzma_block_buffer_bound64(*block_size); 908278307Srpaulo if (*outbuf_size_max == 0) 909278307Srpaulo return LZMA_MEM_ERROR; 910278307Srpaulo 911278307Srpaulo return LZMA_OK; 912278307Srpaulo} 913278307Srpaulo 914278307Srpaulo 915278307Srpaulostatic void 916312518Sdelphijget_progress(void *coder_ptr, uint64_t *progress_in, uint64_t *progress_out) 917278307Srpaulo{ 918312518Sdelphij lzma_stream_coder *coder = coder_ptr; 919312518Sdelphij 920278307Srpaulo // Lock coder->mutex to prevent finishing threads from moving their 921312518Sdelphij // progress info from the worker_thread structure to lzma_stream_coder. 
922278307Srpaulo mythread_sync(coder->mutex) { 923278307Srpaulo *progress_in = coder->progress_in; 924278307Srpaulo *progress_out = coder->progress_out; 925278307Srpaulo 926278307Srpaulo for (size_t i = 0; i < coder->threads_initialized; ++i) { 927278307Srpaulo mythread_sync(coder->threads[i].mutex) { 928278307Srpaulo *progress_in += coder->threads[i].progress_in; 929278307Srpaulo *progress_out += coder->threads[i] 930278307Srpaulo .progress_out; 931278307Srpaulo } 932278307Srpaulo } 933278307Srpaulo } 934278307Srpaulo 935278307Srpaulo return; 936278307Srpaulo} 937278307Srpaulo 938278307Srpaulo 939278307Srpaulostatic lzma_ret 940278307Srpaulostream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator, 941278307Srpaulo const lzma_mt *options) 942278307Srpaulo{ 943278307Srpaulo lzma_next_coder_init(&stream_encoder_mt_init, next, allocator); 944278307Srpaulo 945278307Srpaulo // Get the filter chain. 946278307Srpaulo lzma_options_easy easy; 947278307Srpaulo const lzma_filter *filters; 948278307Srpaulo uint64_t block_size; 949278307Srpaulo uint64_t outbuf_size_max; 950278307Srpaulo return_if_error(get_options(options, &easy, &filters, 951278307Srpaulo &block_size, &outbuf_size_max)); 952278307Srpaulo 953278307Srpaulo#if SIZE_MAX < UINT64_MAX 954278307Srpaulo if (block_size > SIZE_MAX) 955278307Srpaulo return LZMA_MEM_ERROR; 956278307Srpaulo#endif 957278307Srpaulo 958278307Srpaulo // Validate the filter chain so that we can give an error in this 959278307Srpaulo // function instead of delaying it to the first call to lzma_code(). 960278307Srpaulo // The memory usage calculation verifies the filter chain as 961278307Srpaulo // a side effect so we take advatange of that. 962278307Srpaulo if (lzma_raw_encoder_memusage(filters) == UINT64_MAX) 963278307Srpaulo return LZMA_OPTIONS_ERROR; 964278307Srpaulo 965278307Srpaulo // Validate the Check ID. 
966278307Srpaulo if ((unsigned int)(options->check) > LZMA_CHECK_ID_MAX) 967278307Srpaulo return LZMA_PROG_ERROR; 968278307Srpaulo 969278307Srpaulo if (!lzma_check_is_supported(options->check)) 970278307Srpaulo return LZMA_UNSUPPORTED_CHECK; 971278307Srpaulo 972278307Srpaulo // Allocate and initialize the base structure if needed. 973312518Sdelphij lzma_stream_coder *coder = next->coder; 974312518Sdelphij if (coder == NULL) { 975312518Sdelphij coder = lzma_alloc(sizeof(lzma_stream_coder), allocator); 976312518Sdelphij if (coder == NULL) 977278307Srpaulo return LZMA_MEM_ERROR; 978278307Srpaulo 979312518Sdelphij next->coder = coder; 980312518Sdelphij 981278307Srpaulo // For the mutex and condition variable initializations 982278307Srpaulo // the error handling has to be done here because 983278307Srpaulo // stream_encoder_mt_end() doesn't know if they have 984278307Srpaulo // already been initialized or not. 985312518Sdelphij if (mythread_mutex_init(&coder->mutex)) { 986312518Sdelphij lzma_free(coder, allocator); 987278307Srpaulo next->coder = NULL; 988278307Srpaulo return LZMA_MEM_ERROR; 989278307Srpaulo } 990278307Srpaulo 991312518Sdelphij if (mythread_cond_init(&coder->cond)) { 992312518Sdelphij mythread_mutex_destroy(&coder->mutex); 993312518Sdelphij lzma_free(coder, allocator); 994278307Srpaulo next->coder = NULL; 995278307Srpaulo return LZMA_MEM_ERROR; 996278307Srpaulo } 997278307Srpaulo 998278307Srpaulo next->code = &stream_encode_mt; 999278307Srpaulo next->end = &stream_encoder_mt_end; 1000278307Srpaulo next->get_progress = &get_progress; 1001278307Srpaulo// next->update = &stream_encoder_mt_update; 1002278307Srpaulo 1003312518Sdelphij coder->filters[0].id = LZMA_VLI_UNKNOWN; 1004312518Sdelphij coder->index_encoder = LZMA_NEXT_CODER_INIT; 1005312518Sdelphij coder->index = NULL; 1006312518Sdelphij memzero(&coder->outq, sizeof(coder->outq)); 1007312518Sdelphij coder->threads = NULL; 1008312518Sdelphij coder->threads_max = 0; 1009312518Sdelphij 
coder->threads_initialized = 0; 1010278307Srpaulo } 1011278307Srpaulo 1012278307Srpaulo // Basic initializations 1013312518Sdelphij coder->sequence = SEQ_STREAM_HEADER; 1014312518Sdelphij coder->block_size = (size_t)(block_size); 1015312518Sdelphij coder->thread_error = LZMA_OK; 1016312518Sdelphij coder->thr = NULL; 1017278307Srpaulo 1018278307Srpaulo // Allocate the thread-specific base structures. 1019278307Srpaulo assert(options->threads > 0); 1020312518Sdelphij if (coder->threads_max != options->threads) { 1021312518Sdelphij threads_end(coder, allocator); 1022278307Srpaulo 1023312518Sdelphij coder->threads = NULL; 1024312518Sdelphij coder->threads_max = 0; 1025278307Srpaulo 1026312518Sdelphij coder->threads_initialized = 0; 1027312518Sdelphij coder->threads_free = NULL; 1028278307Srpaulo 1029312518Sdelphij coder->threads = lzma_alloc( 1030278307Srpaulo options->threads * sizeof(worker_thread), 1031278307Srpaulo allocator); 1032312518Sdelphij if (coder->threads == NULL) 1033278307Srpaulo return LZMA_MEM_ERROR; 1034278307Srpaulo 1035312518Sdelphij coder->threads_max = options->threads; 1036278307Srpaulo } else { 1037278307Srpaulo // Reuse the old structures and threads. Tell the running 1038278307Srpaulo // threads to stop and wait until they have stopped. 1039312518Sdelphij threads_stop(coder, true); 1040278307Srpaulo } 1041278307Srpaulo 1042278307Srpaulo // Output queue 1043312518Sdelphij return_if_error(lzma_outq_init(&coder->outq, allocator, 1044278307Srpaulo outbuf_size_max, options->threads)); 1045278307Srpaulo 1046278307Srpaulo // Timeout 1047312518Sdelphij coder->timeout = options->timeout; 1048278307Srpaulo 1049278307Srpaulo // Free the old filter chain and copy the new one. 
1050312518Sdelphij for (size_t i = 0; coder->filters[i].id != LZMA_VLI_UNKNOWN; ++i) 1051312518Sdelphij lzma_free(coder->filters[i].options, allocator); 1052278307Srpaulo 1053278307Srpaulo return_if_error(lzma_filters_copy( 1054312518Sdelphij filters, coder->filters, allocator)); 1055278307Srpaulo 1056278307Srpaulo // Index 1057312518Sdelphij lzma_index_end(coder->index, allocator); 1058312518Sdelphij coder->index = lzma_index_init(allocator); 1059312518Sdelphij if (coder->index == NULL) 1060278307Srpaulo return LZMA_MEM_ERROR; 1061278307Srpaulo 1062278307Srpaulo // Stream Header 1063312518Sdelphij coder->stream_flags.version = 0; 1064312518Sdelphij coder->stream_flags.check = options->check; 1065278307Srpaulo return_if_error(lzma_stream_header_encode( 1066312518Sdelphij &coder->stream_flags, coder->header)); 1067278307Srpaulo 1068312518Sdelphij coder->header_pos = 0; 1069278307Srpaulo 1070278307Srpaulo // Progress info 1071312518Sdelphij coder->progress_in = 0; 1072312518Sdelphij coder->progress_out = LZMA_STREAM_HEADER_SIZE; 1073278307Srpaulo 1074278307Srpaulo return LZMA_OK; 1075278307Srpaulo} 1076278307Srpaulo 1077278307Srpaulo 1078278307Srpauloextern LZMA_API(lzma_ret) 1079278307Srpaulolzma_stream_encoder_mt(lzma_stream *strm, const lzma_mt *options) 1080278307Srpaulo{ 1081278307Srpaulo lzma_next_strm_init(stream_encoder_mt_init, strm, options); 1082278307Srpaulo 1083278307Srpaulo strm->internal->supported_actions[LZMA_RUN] = true; 1084278307Srpaulo// strm->internal->supported_actions[LZMA_SYNC_FLUSH] = true; 1085278307Srpaulo strm->internal->supported_actions[LZMA_FULL_FLUSH] = true; 1086278307Srpaulo strm->internal->supported_actions[LZMA_FULL_BARRIER] = true; 1087278307Srpaulo strm->internal->supported_actions[LZMA_FINISH] = true; 1088278307Srpaulo 1089278307Srpaulo return LZMA_OK; 1090278307Srpaulo} 1091278307Srpaulo 1092278307Srpaulo 1093278307Srpaulo// This function name is a monster but it's consistent with the older 1094278307Srpaulo// monster names. 
:-( 31 chars is the max that C99 requires so in that 1095278307Srpaulo// sense it's not too long. ;-) 1096278307Srpauloextern LZMA_API(uint64_t) 1097278307Srpaulolzma_stream_encoder_mt_memusage(const lzma_mt *options) 1098278307Srpaulo{ 1099278307Srpaulo lzma_options_easy easy; 1100278307Srpaulo const lzma_filter *filters; 1101278307Srpaulo uint64_t block_size; 1102278307Srpaulo uint64_t outbuf_size_max; 1103278307Srpaulo 1104278307Srpaulo if (get_options(options, &easy, &filters, &block_size, 1105278307Srpaulo &outbuf_size_max) != LZMA_OK) 1106278307Srpaulo return UINT64_MAX; 1107278307Srpaulo 1108278307Srpaulo // Memory usage of the input buffers 1109278307Srpaulo const uint64_t inbuf_memusage = options->threads * block_size; 1110278307Srpaulo 1111278307Srpaulo // Memory usage of the filter encoders 1112278307Srpaulo uint64_t filters_memusage = lzma_raw_encoder_memusage(filters); 1113278307Srpaulo if (filters_memusage == UINT64_MAX) 1114278307Srpaulo return UINT64_MAX; 1115278307Srpaulo 1116278307Srpaulo filters_memusage *= options->threads; 1117278307Srpaulo 1118278307Srpaulo // Memory usage of the output queue 1119278307Srpaulo const uint64_t outq_memusage = lzma_outq_memusage( 1120278307Srpaulo outbuf_size_max, options->threads); 1121278307Srpaulo if (outq_memusage == UINT64_MAX) 1122278307Srpaulo return UINT64_MAX; 1123278307Srpaulo 1124278307Srpaulo // Sum them with overflow checking. 
1125312518Sdelphij uint64_t total_memusage = LZMA_MEMUSAGE_BASE 1126312518Sdelphij + sizeof(lzma_stream_coder) 1127278307Srpaulo + options->threads * sizeof(worker_thread); 1128278307Srpaulo 1129278307Srpaulo if (UINT64_MAX - total_memusage < inbuf_memusage) 1130278307Srpaulo return UINT64_MAX; 1131278307Srpaulo 1132278307Srpaulo total_memusage += inbuf_memusage; 1133278307Srpaulo 1134278307Srpaulo if (UINT64_MAX - total_memusage < filters_memusage) 1135278307Srpaulo return UINT64_MAX; 1136278307Srpaulo 1137278307Srpaulo total_memusage += filters_memusage; 1138278307Srpaulo 1139278307Srpaulo if (UINT64_MAX - total_memusage < outq_memusage) 1140278307Srpaulo return UINT64_MAX; 1141278307Srpaulo 1142278307Srpaulo return total_memusage + outq_memusage; 1143278307Srpaulo} 1144