1/* Copyright (C) 2021-2024 Free Software Foundation, Inc. 2 Contributed by Oracle. 3 4 This file is part of GNU Binutils. 5 6 This program is free software; you can redistribute it and/or modify 7 it under the terms of the GNU General Public License as published by 8 the Free Software Foundation; either version 3, or (at your option) 9 any later version. 10 11 This program is distributed in the hope that it will be useful, 12 but WITHOUT ANY WARRANTY; without even the implied warranty of 13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 14 GNU General Public License for more details. 15 16 You should have received a copy of the GNU General Public License 17 along with this program; if not, write to the Free Software 18 Foundation, 51 Franklin Street - Fifth Floor, Boston, 19 MA 02110-1301, USA. */ 20 21#include "config.h" 22#include <stdio.h> 23#include <stdlib.h> 24#include <string.h> 25#include <sys/types.h> 26#include <unistd.h> 27 28#include "DbeThread.h" 29#include "util.h" 30#include "vec.h" 31 32static void 33cleanup_free_mutex (void* arg) { 34 // pthread_mutex_t *p_mutex = (pthread_mutex_t *) arg; 35 // if (p_mutex) 36 // pthread_mutex_unlock (p_mutex); 37} 38 39static void* 40thread_pool_loop (void* arg) 41{ 42 DbeThreadPool *thrp = (DbeThreadPool*) arg; 43 Dprintf (DEBUG_THREADS, "thread_pool_loop:%d starting thread=%llu\n", 44 __LINE__, (unsigned long long) pthread_self ()); 45 46 /* set my cancel state to 'enabled', and cancel type to 'defered'. */ 47 pthread_setcancelstate (PTHREAD_CANCEL_ENABLE, NULL); 48 pthread_setcanceltype (PTHREAD_CANCEL_DEFERRED, NULL); 49 50 /* set thread cleanup handler */ 51 pthread_cleanup_push (cleanup_free_mutex, (void*) & (thrp->p_mutex)); 52 for (;;) 53 { 54 DbeQueue *q = thrp->get_queue (); 55 if (q) 56 { /* a request is pending */ 57 Dprintf (DEBUG_THREADS, 58 "thread_pool_loop:%d thread=%llu queue=%d start\n", 59 __LINE__, (unsigned long long) pthread_self (), q->id); 60 q->func (q->arg); 61 Dprintf (DEBUG_THREADS, 62 "thread_pool_loop:%d thread=%llu queue=%d done\n", 63 __LINE__, (unsigned long long) pthread_self (), q->id); 64 delete q; 65 continue; 66 } 67 if (thrp->no_new_queues) 68 { 69 Dprintf (DEBUG_THREADS, "thread_pool_loop:%d exit thread=%llu\n", 70 __LINE__, (unsigned long long) pthread_self ()); 71 pthread_exit (NULL); 72 } 73 Dprintf (DEBUG_THREADS, 74 "thread_pool_loop:%d before pthread_cond_wait thread=%llu\n", 75 __LINE__, (unsigned long long) pthread_self ()); 76 pthread_mutex_lock (&thrp->p_mutex); 77 pthread_cond_wait (&thrp->p_cond_var, &thrp->p_mutex); 78 Dprintf (DEBUG_THREADS, 79 "thread_pool_loop:%d after pthread_cond_wait thread=%llu\n", 80 __LINE__, (unsigned long long) pthread_self ()); 81 pthread_mutex_unlock (&thrp->p_mutex); 82 } 83 84 // never reached, but we must use it here. See `man pthread_cleanup_push` 85 pthread_cleanup_pop (0); 86} 87 88DbeThreadPool::DbeThreadPool (int _max_threads) 89{ 90 static const int DBE_NTHREADS_DEFAULT = 4; 91 char *s = getenv ("GPROFNG_DBE_NTHREADS"); 92 if (s) 93 { 94 max_threads = atoi (s); 95 if (max_threads < 0) 96 max_threads = 0; 97 if (_max_threads > 0 && max_threads < _max_threads) 98 max_threads = _max_threads; 99 } 100 else 101 { 102 max_threads = _max_threads; 103 if (max_threads < 0) 104 max_threads = DBE_NTHREADS_DEFAULT; 105 } 106 Dprintf (DEBUG_THREADS, "DbeThreadPool:%d max_threads %d ---> %d\n", 107 __LINE__, _max_threads, max_threads); 108 pthread_mutex_init (&p_mutex, NULL); 109 pthread_cond_init (&p_cond_var, NULL); 110 threads = new Vector <pthread_t>(max_threads); 111 queue = NULL; 112 last_queue = NULL; 113 no_new_queues = false; 114 queues_cnt = 0; 115 total_queues = 0; 116} 117 118DbeThreadPool::~DbeThreadPool () 119{ 120 delete threads; 121} 122 123DbeQueue * 124DbeThreadPool::get_queue () 125{ 126 pthread_mutex_lock (&p_mutex); 127 DbeQueue *q = queue; 128 Dprintf (DEBUG_THREADS, 129 "get_queue:%d thr: %lld id=%d queues_cnt=%d threads_cnt=%d max_threads=%d\n", 130 __LINE__, (unsigned long long) pthread_self (), 131 q ? q->id : -1, queues_cnt, (int) threads->size (), max_threads); 132 if (q) 133 { 134 queue = q->next; 135 queues_cnt--; 136 } 137 pthread_mutex_unlock (&p_mutex); 138 return q; 139} 140 141void 142DbeThreadPool::put_queue (DbeQueue *q) 143{ 144 if (max_threads == 0) 145 { 146 // nothing runs in parallel 147 q->id = ++total_queues; 148 Dprintf (DEBUG_THREADS, NTXT ("put_queue:%d thr=%lld max_threads=%d queue (%d) runs on the worked thread\n"), 149 __LINE__, (unsigned long long) pthread_self (), max_threads, q->id); 150 q->func (q->arg); 151 delete q; 152 return; 153 } 154 155 pthread_mutex_lock (&p_mutex); 156 // nothing runs in parallel 157 q->id = ++total_queues; 158 Dprintf (DEBUG_THREADS, "put_queue:%d thr=%lld max_threads=%d queue (%d)\n", 159 __LINE__, (unsigned long long) pthread_self (), max_threads, q->id); 160 if (queue) 161 { 162 last_queue->next = q; 163 last_queue = q; 164 } 165 else 166 { 167 queue = q; 168 last_queue = q; 169 } 170 queues_cnt++; 171 Dprintf (DEBUG_THREADS, 172 "put_queue:%d id=%d queues_cnt=%d threads_cnt=%d max_threads=%d\n", 173 __LINE__, q->id, queues_cnt, (int) threads->size (), max_threads); 174 if (queues_cnt > threads->size () && threads->size () < max_threads) 175 { 176 pthread_t thr; 177 int r = pthread_create (&thr, NULL, thread_pool_loop, (void *) this); 178 Dprintf (DEBUG_THREADS, 179 "put_queue:%d pthread_create returns %d thr=%llu\n", 180 __LINE__, r, (unsigned long long) thr); 181 if (r) 182 fprintf (stderr, GTXT ("pthread_create failed. errnum=%d (%s)\n"), r, 183 STR (strerror (r))); 184 else 185 threads->append (thr); 186 } 187 pthread_cond_signal (&p_cond_var); 188 pthread_mutex_unlock (&p_mutex); 189} 190 191void 192DbeThreadPool::wait_queues () 193{ 194 pthread_mutex_lock (&p_mutex); 195 no_new_queues = true; 196 pthread_mutex_unlock (&p_mutex); 197 pthread_cond_broadcast (&p_cond_var); 198 for (;;) // Run requests on the worked thread too 199 { 200 DbeQueue *q = get_queue (); 201 if (q == NULL) 202 break; 203 Dprintf (DEBUG_THREADS, "wait_queues:%d thread=%llu queue=%d start\n", 204 __LINE__, (unsigned long long) pthread_self (), q->id); 205 q->func (q->arg); 206 Dprintf (DEBUG_THREADS, "wait_queues:%d thread=%llu queue=%d done\n", 207 __LINE__, (unsigned long long) pthread_self (), q->id); 208 delete q; 209 } 210 for (int i = 0, sz = threads->size (); i < sz; i++) 211 { 212 void *retval; 213 pthread_join (threads->get (i), &retval); 214 } 215} 216 217DbeQueue::DbeQueue (int (*_func) (void *arg), void *_arg) 218{ 219 func = _func; 220 arg = _arg; 221 next = NULL; 222} 223 224DbeQueue::~DbeQueue () { } 225