Deleted Added
full compact
stream_encoder_mt.c (292588) stream_encoder_mt.c (312518)
1///////////////////////////////////////////////////////////////////////////////
2//
3/// \file stream_encoder_mt.c
4/// \brief Multithreaded .xz Stream encoder
5//
6// Author: Lasse Collin
7//
8// This file has been put into the public domain.

--- 30 unchanged lines hidden (view full) ---

39 THR_STOP,
40
41 /// The main thread wants the thread to exit. We could use
42 /// cancellation but since there's stopped anyway, this is lazier.
43 THR_EXIT,
44
45} worker_state;
46
1///////////////////////////////////////////////////////////////////////////////
2//
3/// \file stream_encoder_mt.c
4/// \brief Multithreaded .xz Stream encoder
5//
6// Author: Lasse Collin
7//
8// This file has been put into the public domain.

--- 30 unchanged lines hidden (view full) ---

39 THR_STOP,
40
41 /// The main thread wants the thread to exit. We could use
42 /// cancellation but since there's stopped anyway, this is lazier.
43 THR_EXIT,
44
45} worker_state;
46
47typedef struct lzma_stream_coder_s lzma_stream_coder;
47
48typedef struct worker_thread_s worker_thread;
49struct worker_thread_s {
50 worker_state state;
51
52 /// Input buffer of coder->block_size bytes. The main thread will
53 /// put new input into this and update in_size accordingly. Once
54 /// no more input is coming, state will be set to THR_FINISH.

--- 5 unchanged lines hidden (view full) ---

60
61 /// Output buffer for this thread. This is set by the main
62 /// thread every time a new Block is started with this thread
63 /// structure.
64 lzma_outbuf *outbuf;
65
66 /// Pointer to the main structure is needed when putting this
67 /// thread back to the stack of free threads.
48
49typedef struct worker_thread_s worker_thread;
50struct worker_thread_s {
51 worker_state state;
52
53 /// Input buffer of coder->block_size bytes. The main thread will
54 /// put new input into this and update in_size accordingly. Once
55 /// no more input is coming, state will be set to THR_FINISH.

--- 5 unchanged lines hidden (view full) ---

61
62 /// Output buffer for this thread. This is set by the main
63 /// thread every time a new Block is started with this thread
64 /// structure.
65 lzma_outbuf *outbuf;
66
67 /// Pointer to the main structure is needed when putting this
68 /// thread back to the stack of free threads.
68 lzma_coder *coder;
69 lzma_stream_coder *coder;
69
70 /// The allocator is set by the main thread. Since a copy of the
71 /// pointer is kept here, the application must not change the
72 /// allocator before calling lzma_end().
73 const lzma_allocator *allocator;
74
75 /// Amount of uncompressed data that has already been compressed.
76 uint64_t progress_in;

--- 14 unchanged lines hidden (view full) ---

91 mythread_cond cond;
92
93 /// The ID of this thread is used to join the thread
94 /// when it's not needed anymore.
95 mythread thread_id;
96};
97
98
70
71 /// The allocator is set by the main thread. Since a copy of the
72 /// pointer is kept here, the application must not change the
73 /// allocator before calling lzma_end().
74 const lzma_allocator *allocator;
75
76 /// Amount of uncompressed data that has already been compressed.
77 uint64_t progress_in;

--- 14 unchanged lines hidden (view full) ---

92 mythread_cond cond;
93
94 /// The ID of this thread is used to join the thread
95 /// when it's not needed anymore.
96 mythread thread_id;
97};
98
99
99struct lzma_coder_s {
100struct lzma_stream_coder_s {
100 enum {
101 SEQ_STREAM_HEADER,
102 SEQ_BLOCK,
103 SEQ_INDEX,
104 SEQ_STREAM_FOOTER,
105 } sequence;
106
107 /// Start a new Block every block_size bytes of input unless

--- 304 unchanged lines hidden (view full) ---

412 lzma_next_end(&thr->block_encoder, thr->allocator);
413 lzma_free(thr->in, thr->allocator);
414 return MYTHREAD_RET_VALUE;
415}
416
417
418/// Make the threads stop but not exit. Optionally wait for them to stop.
419static void
101 enum {
102 SEQ_STREAM_HEADER,
103 SEQ_BLOCK,
104 SEQ_INDEX,
105 SEQ_STREAM_FOOTER,
106 } sequence;
107
108 /// Start a new Block every block_size bytes of input unless

--- 304 unchanged lines hidden (view full) ---

413 lzma_next_end(&thr->block_encoder, thr->allocator);
414 lzma_free(thr->in, thr->allocator);
415 return MYTHREAD_RET_VALUE;
416}
417
418
419/// Make the threads stop but not exit. Optionally wait for them to stop.
420static void
420threads_stop(lzma_coder *coder, bool wait_for_threads)
421threads_stop(lzma_stream_coder *coder, bool wait_for_threads)
421{
422 // Tell the threads to stop.
423 for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
424 mythread_sync(coder->threads[i].mutex) {
425 coder->threads[i].state = THR_STOP;
426 mythread_cond_signal(&coder->threads[i].cond);
427 }
428 }

--- 12 unchanged lines hidden (view full) ---

441
442 return;
443}
444
445
446/// Stop the threads and free the resources associated with them.
447/// Wait until the threads have exited.
448static void
422{
423 // Tell the threads to stop.
424 for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
425 mythread_sync(coder->threads[i].mutex) {
426 coder->threads[i].state = THR_STOP;
427 mythread_cond_signal(&coder->threads[i].cond);
428 }
429 }

--- 12 unchanged lines hidden (view full) ---

442
443 return;
444}
445
446
447/// Stop the threads and free the resources associated with them.
448/// Wait until the threads have exited.
449static void
449threads_end(lzma_coder *coder, const lzma_allocator *allocator)
450threads_end(lzma_stream_coder *coder, const lzma_allocator *allocator)
450{
451 for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
452 mythread_sync(coder->threads[i].mutex) {
453 coder->threads[i].state = THR_EXIT;
454 mythread_cond_signal(&coder->threads[i].cond);
455 }
456 }
457

--- 5 unchanged lines hidden (view full) ---

463
464 lzma_free(coder->threads, allocator);
465 return;
466}
467
468
469/// Initialize a new worker_thread structure and create a new thread.
470static lzma_ret
451{
452 for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
453 mythread_sync(coder->threads[i].mutex) {
454 coder->threads[i].state = THR_EXIT;
455 mythread_cond_signal(&coder->threads[i].cond);
456 }
457 }
458

--- 5 unchanged lines hidden (view full) ---

464
465 lzma_free(coder->threads, allocator);
466 return;
467}
468
469
470/// Initialize a new worker_thread structure and create a new thread.
471static lzma_ret
471initialize_new_thread(lzma_coder *coder, const lzma_allocator *allocator)
472initialize_new_thread(lzma_stream_coder *coder,
473 const lzma_allocator *allocator)
472{
473 worker_thread *thr = &coder->threads[coder->threads_initialized];
474
475 thr->in = lzma_alloc(coder->block_size, allocator);
476 if (thr->in == NULL)
477 return LZMA_MEM_ERROR;
478
479 if (mythread_mutex_init(&thr->mutex))

--- 25 unchanged lines hidden (view full) ---

505
506error_mutex:
507 lzma_free(thr->in, allocator);
508 return LZMA_MEM_ERROR;
509}
510
511
512static lzma_ret
474{
475 worker_thread *thr = &coder->threads[coder->threads_initialized];
476
477 thr->in = lzma_alloc(coder->block_size, allocator);
478 if (thr->in == NULL)
479 return LZMA_MEM_ERROR;
480
481 if (mythread_mutex_init(&thr->mutex))

--- 25 unchanged lines hidden (view full) ---

507
508error_mutex:
509 lzma_free(thr->in, allocator);
510 return LZMA_MEM_ERROR;
511}
512
513
514static lzma_ret
513get_thread(lzma_coder *coder, const lzma_allocator *allocator)
515get_thread(lzma_stream_coder *coder, const lzma_allocator *allocator)
514{
515 // If there are no free output subqueues, there is no
516 // point to try getting a thread.
517 if (!lzma_outq_has_buf(&coder->outq))
518 return LZMA_OK;
519
520 // If there is a free structure on the stack, use it.
521 mythread_sync(coder->mutex) {

--- 21 unchanged lines hidden (view full) ---

543 mythread_cond_signal(&coder->thr->cond);
544 }
545
546 return LZMA_OK;
547}
548
549
550static lzma_ret
516{
517 // If there are no free output subqueues, there is no
518 // point to try getting a thread.
519 if (!lzma_outq_has_buf(&coder->outq))
520 return LZMA_OK;
521
522 // If there is a free structure on the stack, use it.
523 mythread_sync(coder->mutex) {

--- 21 unchanged lines hidden (view full) ---

545 mythread_cond_signal(&coder->thr->cond);
546 }
547
548 return LZMA_OK;
549}
550
551
552static lzma_ret
551stream_encode_in(lzma_coder *coder, const lzma_allocator *allocator,
553stream_encode_in(lzma_stream_coder *coder, const lzma_allocator *allocator,
552 const uint8_t *restrict in, size_t *restrict in_pos,
553 size_t in_size, lzma_action action)
554{
555 while (*in_pos < in_size
556 || (coder->thr != NULL && action != LZMA_RUN)) {
557 if (coder->thr == NULL) {
558 // Get a new thread.
559 const lzma_ret ret = get_thread(coder, allocator);

--- 51 unchanged lines hidden (view full) ---

611
612 return LZMA_OK;
613}
614
615
616/// Wait until more input can be consumed, more output can be read, or
617/// an optional timeout is reached.
618static bool
554 const uint8_t *restrict in, size_t *restrict in_pos,
555 size_t in_size, lzma_action action)
556{
557 while (*in_pos < in_size
558 || (coder->thr != NULL && action != LZMA_RUN)) {
559 if (coder->thr == NULL) {
560 // Get a new thread.
561 const lzma_ret ret = get_thread(coder, allocator);

--- 51 unchanged lines hidden (view full) ---

613
614 return LZMA_OK;
615}
616
617
618/// Wait until more input can be consumed, more output can be read, or
619/// an optional timeout is reached.
620static bool
619wait_for_work(lzma_coder *coder, mythread_condtime *wait_abs,
621wait_for_work(lzma_stream_coder *coder, mythread_condtime *wait_abs,
620 bool *has_blocked, bool has_input)
621{
622 if (coder->timeout != 0 && !*has_blocked) {
623 // Every time when stream_encode_mt() is called via
624 // lzma_code(), *has_blocked starts as false. We set it
625 // to true here and calculate the absolute time when
626 // we must return if there's nothing to do.
627 //

--- 29 unchanged lines hidden (view full) ---

657 }
658 }
659
660 return timed_out;
661}
662
663
664static lzma_ret
622 bool *has_blocked, bool has_input)
623{
624 if (coder->timeout != 0 && !*has_blocked) {
625 // Every time when stream_encode_mt() is called via
626 // lzma_code(), *has_blocked starts as false. We set it
627 // to true here and calculate the absolute time when
628 // we must return if there's nothing to do.
629 //

--- 29 unchanged lines hidden (view full) ---

659 }
660 }
661
662 return timed_out;
663}
664
665
666static lzma_ret
665stream_encode_mt(lzma_coder *coder, const lzma_allocator *allocator,
667stream_encode_mt(void *coder_ptr, const lzma_allocator *allocator,
666 const uint8_t *restrict in, size_t *restrict in_pos,
667 size_t in_size, uint8_t *restrict out,
668 size_t *restrict out_pos, size_t out_size, lzma_action action)
669{
668 const uint8_t *restrict in, size_t *restrict in_pos,
669 size_t in_size, uint8_t *restrict out,
670 size_t *restrict out_pos, size_t out_size, lzma_action action)
671{
672 lzma_stream_coder *coder = coder_ptr;
673
670 switch (coder->sequence) {
671 case SEQ_STREAM_HEADER:
672 lzma_bufcpy(coder->header, &coder->header_pos,
673 sizeof(coder->header),
674 out, out_pos, out_size);
675 if (coder->header_pos < sizeof(coder->header))
676 return LZMA_OK;
677

--- 151 unchanged lines hidden (view full) ---

829 }
830
831 assert(0);
832 return LZMA_PROG_ERROR;
833}
834
835
836static void
674 switch (coder->sequence) {
675 case SEQ_STREAM_HEADER:
676 lzma_bufcpy(coder->header, &coder->header_pos,
677 sizeof(coder->header),
678 out, out_pos, out_size);
679 if (coder->header_pos < sizeof(coder->header))
680 return LZMA_OK;
681

--- 151 unchanged lines hidden (view full) ---

833 }
834
835 assert(0);
836 return LZMA_PROG_ERROR;
837}
838
839
840static void
837stream_encoder_mt_end(lzma_coder *coder, const lzma_allocator *allocator)
841stream_encoder_mt_end(void *coder_ptr, const lzma_allocator *allocator)
838{
842{
843 lzma_stream_coder *coder = coder_ptr;
844
839 // Threads must be killed before the output queue can be freed.
840 threads_end(coder, allocator);
841 lzma_outq_end(&coder->outq, allocator);
842
843 for (size_t i = 0; coder->filters[i].id != LZMA_VLI_UNKNOWN; ++i)
844 lzma_free(coder->filters[i].options, allocator);
845
846 lzma_next_end(&coder->index_encoder, allocator);

--- 55 unchanged lines hidden (view full) ---

902 if (*outbuf_size_max == 0)
903 return LZMA_MEM_ERROR;
904
905 return LZMA_OK;
906}
907
908
909static void
845 // Threads must be killed before the output queue can be freed.
846 threads_end(coder, allocator);
847 lzma_outq_end(&coder->outq, allocator);
848
849 for (size_t i = 0; coder->filters[i].id != LZMA_VLI_UNKNOWN; ++i)
850 lzma_free(coder->filters[i].options, allocator);
851
852 lzma_next_end(&coder->index_encoder, allocator);

--- 55 unchanged lines hidden (view full) ---

908 if (*outbuf_size_max == 0)
909 return LZMA_MEM_ERROR;
910
911 return LZMA_OK;
912}
913
914
915static void
910get_progress(lzma_coder *coder, uint64_t *progress_in, uint64_t *progress_out)
916get_progress(void *coder_ptr, uint64_t *progress_in, uint64_t *progress_out)
911{
917{
918 lzma_stream_coder *coder = coder_ptr;
919
912 // Lock coder->mutex to prevent finishing threads from moving their
920 // Lock coder->mutex to prevent finishing threads from moving their
913 // progress info from the worker_thread structure to lzma_coder.
921 // progress info from the worker_thread structure to lzma_stream_coder.
914 mythread_sync(coder->mutex) {
915 *progress_in = coder->progress_in;
916 *progress_out = coder->progress_out;
917
918 for (size_t i = 0; i < coder->threads_initialized; ++i) {
919 mythread_sync(coder->threads[i].mutex) {
920 *progress_in += coder->threads[i].progress_in;
921 *progress_out += coder->threads[i]

--- 35 unchanged lines hidden (view full) ---

957 // Validate the Check ID.
958 if ((unsigned int)(options->check) > LZMA_CHECK_ID_MAX)
959 return LZMA_PROG_ERROR;
960
961 if (!lzma_check_is_supported(options->check))
962 return LZMA_UNSUPPORTED_CHECK;
963
964 // Allocate and initialize the base structure if needed.
922 mythread_sync(coder->mutex) {
923 *progress_in = coder->progress_in;
924 *progress_out = coder->progress_out;
925
926 for (size_t i = 0; i < coder->threads_initialized; ++i) {
927 mythread_sync(coder->threads[i].mutex) {
928 *progress_in += coder->threads[i].progress_in;
929 *progress_out += coder->threads[i]

--- 35 unchanged lines hidden (view full) ---

965 // Validate the Check ID.
966 if ((unsigned int)(options->check) > LZMA_CHECK_ID_MAX)
967 return LZMA_PROG_ERROR;
968
969 if (!lzma_check_is_supported(options->check))
970 return LZMA_UNSUPPORTED_CHECK;
971
972 // Allocate and initialize the base structure if needed.
965 if (next->coder == NULL) {
966 next->coder = lzma_alloc(sizeof(lzma_coder), allocator);
967 if (next->coder == NULL)
973 lzma_stream_coder *coder = next->coder;
974 if (coder == NULL) {
975 coder = lzma_alloc(sizeof(lzma_stream_coder), allocator);
976 if (coder == NULL)
968 return LZMA_MEM_ERROR;
969
977 return LZMA_MEM_ERROR;
978
979 next->coder = coder;
980
970 // For the mutex and condition variable initializations
971 // the error handling has to be done here because
972 // stream_encoder_mt_end() doesn't know if they have
973 // already been initialized or not.
981 // For the mutex and condition variable initializations
982 // the error handling has to be done here because
983 // stream_encoder_mt_end() doesn't know if they have
984 // already been initialized or not.
974 if (mythread_mutex_init(&next->coder->mutex)) {
975 lzma_free(next->coder, allocator);
985 if (mythread_mutex_init(&coder->mutex)) {
986 lzma_free(coder, allocator);
976 next->coder = NULL;
977 return LZMA_MEM_ERROR;
978 }
979
987 next->coder = NULL;
988 return LZMA_MEM_ERROR;
989 }
990
980 if (mythread_cond_init(&next->coder->cond)) {
981 mythread_mutex_destroy(&next->coder->mutex);
982 lzma_free(next->coder, allocator);
991 if (mythread_cond_init(&coder->cond)) {
992 mythread_mutex_destroy(&coder->mutex);
993 lzma_free(coder, allocator);
983 next->coder = NULL;
984 return LZMA_MEM_ERROR;
985 }
986
987 next->code = &stream_encode_mt;
988 next->end = &stream_encoder_mt_end;
989 next->get_progress = &get_progress;
990// next->update = &stream_encoder_mt_update;
991
994 next->coder = NULL;
995 return LZMA_MEM_ERROR;
996 }
997
998 next->code = &stream_encode_mt;
999 next->end = &stream_encoder_mt_end;
1000 next->get_progress = &get_progress;
1001// next->update = &stream_encoder_mt_update;
1002
992 next->coder->filters[0].id = LZMA_VLI_UNKNOWN;
993 next->coder->index_encoder = LZMA_NEXT_CODER_INIT;
994 next->coder->index = NULL;
995 memzero(&next->coder->outq, sizeof(next->coder->outq));
996 next->coder->threads = NULL;
997 next->coder->threads_max = 0;
998 next->coder->threads_initialized = 0;
1003 coder->filters[0].id = LZMA_VLI_UNKNOWN;
1004 coder->index_encoder = LZMA_NEXT_CODER_INIT;
1005 coder->index = NULL;
1006 memzero(&coder->outq, sizeof(coder->outq));
1007 coder->threads = NULL;
1008 coder->threads_max = 0;
1009 coder->threads_initialized = 0;
999 }
1000
1001 // Basic initializations
1010 }
1011
1012 // Basic initializations
1002 next->coder->sequence = SEQ_STREAM_HEADER;
1003 next->coder->block_size = (size_t)(block_size);
1004 next->coder->thread_error = LZMA_OK;
1005 next->coder->thr = NULL;
1013 coder->sequence = SEQ_STREAM_HEADER;
1014 coder->block_size = (size_t)(block_size);
1015 coder->thread_error = LZMA_OK;
1016 coder->thr = NULL;
1006
1007 // Allocate the thread-specific base structures.
1008 assert(options->threads > 0);
1017
1018 // Allocate the thread-specific base structures.
1019 assert(options->threads > 0);
1009 if (next->coder->threads_max != options->threads) {
1010 threads_end(next->coder, allocator);
1020 if (coder->threads_max != options->threads) {
1021 threads_end(coder, allocator);
1011
1022
1012 next->coder->threads = NULL;
1013 next->coder->threads_max = 0;
1023 coder->threads = NULL;
1024 coder->threads_max = 0;
1014
1025
1015 next->coder->threads_initialized = 0;
1016 next->coder->threads_free = NULL;
1026 coder->threads_initialized = 0;
1027 coder->threads_free = NULL;
1017
1028
1018 next->coder->threads = lzma_alloc(
1029 coder->threads = lzma_alloc(
1019 options->threads * sizeof(worker_thread),
1020 allocator);
1030 options->threads * sizeof(worker_thread),
1031 allocator);
1021 if (next->coder->threads == NULL)
1032 if (coder->threads == NULL)
1022 return LZMA_MEM_ERROR;
1023
1033 return LZMA_MEM_ERROR;
1034
1024 next->coder->threads_max = options->threads;
1035 coder->threads_max = options->threads;
1025 } else {
1026 // Reuse the old structures and threads. Tell the running
1027 // threads to stop and wait until they have stopped.
1036 } else {
1037 // Reuse the old structures and threads. Tell the running
1038 // threads to stop and wait until they have stopped.
1028 threads_stop(next->coder, true);
1039 threads_stop(coder, true);
1029 }
1030
1031 // Output queue
1040 }
1041
1042 // Output queue
1032 return_if_error(lzma_outq_init(&next->coder->outq, allocator,
1043 return_if_error(lzma_outq_init(&coder->outq, allocator,
1033 outbuf_size_max, options->threads));
1034
1035 // Timeout
1044 outbuf_size_max, options->threads));
1045
1046 // Timeout
1036 next->coder->timeout = options->timeout;
1047 coder->timeout = options->timeout;
1037
1038 // Free the old filter chain and copy the new one.
1048
1049 // Free the old filter chain and copy the new one.
1039 for (size_t i = 0; next->coder->filters[i].id != LZMA_VLI_UNKNOWN; ++i)
1040 lzma_free(next->coder->filters[i].options, allocator);
1050 for (size_t i = 0; coder->filters[i].id != LZMA_VLI_UNKNOWN; ++i)
1051 lzma_free(coder->filters[i].options, allocator);
1041
1042 return_if_error(lzma_filters_copy(
1052
1053 return_if_error(lzma_filters_copy(
1043 filters, next->coder->filters, allocator));
1054 filters, coder->filters, allocator));
1044
1045 // Index
1055
1056 // Index
1046 lzma_index_end(next->coder->index, allocator);
1047 next->coder->index = lzma_index_init(allocator);
1048 if (next->coder->index == NULL)
1057 lzma_index_end(coder->index, allocator);
1058 coder->index = lzma_index_init(allocator);
1059 if (coder->index == NULL)
1049 return LZMA_MEM_ERROR;
1050
1051 // Stream Header
1060 return LZMA_MEM_ERROR;
1061
1062 // Stream Header
1052 next->coder->stream_flags.version = 0;
1053 next->coder->stream_flags.check = options->check;
1063 coder->stream_flags.version = 0;
1064 coder->stream_flags.check = options->check;
1054 return_if_error(lzma_stream_header_encode(
1065 return_if_error(lzma_stream_header_encode(
1055 &next->coder->stream_flags, next->coder->header));
1066 &coder->stream_flags, coder->header));
1056
1067
1057 next->coder->header_pos = 0;
1068 coder->header_pos = 0;
1058
1059 // Progress info
1069
1070 // Progress info
1060 next->coder->progress_in = 0;
1061 next->coder->progress_out = LZMA_STREAM_HEADER_SIZE;
1071 coder->progress_in = 0;
1072 coder->progress_out = LZMA_STREAM_HEADER_SIZE;
1062
1063 return LZMA_OK;
1064}
1065
1066
1067extern LZMA_API(lzma_ret)
1068lzma_stream_encoder_mt(lzma_stream *strm, const lzma_mt *options)
1069{

--- 36 unchanged lines hidden (view full) ---

1106
1107 // Memory usage of the output queue
1108 const uint64_t outq_memusage = lzma_outq_memusage(
1109 outbuf_size_max, options->threads);
1110 if (outq_memusage == UINT64_MAX)
1111 return UINT64_MAX;
1112
1113 // Sum them with overflow checking.
1073
1074 return LZMA_OK;
1075}
1076
1077
1078extern LZMA_API(lzma_ret)
1079lzma_stream_encoder_mt(lzma_stream *strm, const lzma_mt *options)
1080{

--- 36 unchanged lines hidden (view full) ---

1117
1118 // Memory usage of the output queue
1119 const uint64_t outq_memusage = lzma_outq_memusage(
1120 outbuf_size_max, options->threads);
1121 if (outq_memusage == UINT64_MAX)
1122 return UINT64_MAX;
1123
1124 // Sum them with overflow checking.
1114 uint64_t total_memusage = LZMA_MEMUSAGE_BASE + sizeof(lzma_coder)
1125 uint64_t total_memusage = LZMA_MEMUSAGE_BASE
1126 + sizeof(lzma_stream_coder)
1115 + options->threads * sizeof(worker_thread);
1116
1117 if (UINT64_MAX - total_memusage < inbuf_memusage)
1118 return UINT64_MAX;
1119
1120 total_memusage += inbuf_memusage;
1121
1122 if (UINT64_MAX - total_memusage < filters_memusage)
1123 return UINT64_MAX;
1124
1125 total_memusage += filters_memusage;
1126
1127 if (UINT64_MAX - total_memusage < outq_memusage)
1128 return UINT64_MAX;
1129
1130 return total_memusage + outq_memusage;
1131}
1127 + options->threads * sizeof(worker_thread);
1128
1129 if (UINT64_MAX - total_memusage < inbuf_memusage)
1130 return UINT64_MAX;
1131
1132 total_memusage += inbuf_memusage;
1133
1134 if (UINT64_MAX - total_memusage < filters_memusage)
1135 return UINT64_MAX;
1136
1137 total_memusage += filters_memusage;
1138
1139 if (UINT64_MAX - total_memusage < outq_memusage)
1140 return UINT64_MAX;
1141
1142 return total_memusage + outq_memusage;
1143}