1/* Copyright (C) 2021-2024 Free Software Foundation, Inc.
2   Contributed by Oracle.
3
4   This file is part of GNU Binutils.
5
6   This program is free software; you can redistribute it and/or modify
7   it under the terms of the GNU General Public License as published by
8   the Free Software Foundation; either version 3, or (at your option)
9   any later version.
10
11   This program is distributed in the hope that it will be useful,
12   but WITHOUT ANY WARRANTY; without even the implied warranty of
13   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14   GNU General Public License for more details.
15
16   You should have received a copy of the GNU General Public License
17   along with this program; if not, write to the Free Software
18   Foundation, 51 Franklin Street - Fifth Floor, Boston,
19   MA 02110-1301, USA.  */
20
21#include "config.h"
22#include <stdio.h>
23#include <stdlib.h>
24#include <string.h>
25#include <sys/types.h>
26#include <unistd.h>
27
28#include "DbeThread.h"
29#include "util.h"
30#include "vec.h"
31
32static void
33cleanup_free_mutex (void* arg) {
34  //  pthread_mutex_t *p_mutex = (pthread_mutex_t *) arg;
35  //  if (p_mutex)
36  //    pthread_mutex_unlock (p_mutex);
37}
38
39static void*
40thread_pool_loop (void* arg)
41{
42  DbeThreadPool *thrp = (DbeThreadPool*) arg;
43  Dprintf (DEBUG_THREADS, "thread_pool_loop:%d starting thread=%llu\n",
44	   __LINE__, (unsigned long long) pthread_self ());
45
46  /* set my cancel state to 'enabled', and cancel type to 'defered'. */
47  pthread_setcancelstate (PTHREAD_CANCEL_ENABLE, NULL);
48  pthread_setcanceltype (PTHREAD_CANCEL_DEFERRED, NULL);
49
50  /* set thread cleanup handler */
51  pthread_cleanup_push (cleanup_free_mutex, (void*) & (thrp->p_mutex));
52  for (;;)
53    {
54      DbeQueue *q = thrp->get_queue ();
55      if (q)
56	{ /* a request is pending */
57	  Dprintf (DEBUG_THREADS,
58		   "thread_pool_loop:%d thread=%llu queue=%d start\n",
59		   __LINE__, (unsigned long long) pthread_self (), q->id);
60	  q->func (q->arg);
61	  Dprintf (DEBUG_THREADS,
62		   "thread_pool_loop:%d thread=%llu queue=%d done\n",
63		   __LINE__, (unsigned long long) pthread_self (), q->id);
64	  delete q;
65	  continue;
66	}
67      if (thrp->no_new_queues)
68	{
69	  Dprintf (DEBUG_THREADS, "thread_pool_loop:%d exit thread=%llu\n",
70		   __LINE__, (unsigned long long) pthread_self ());
71	  pthread_exit (NULL);
72	}
73      Dprintf (DEBUG_THREADS,
74	       "thread_pool_loop:%d before pthread_cond_wait thread=%llu\n",
75	       __LINE__, (unsigned long long) pthread_self ());
76      pthread_mutex_lock (&thrp->p_mutex);
77      pthread_cond_wait (&thrp->p_cond_var, &thrp->p_mutex);
78      Dprintf (DEBUG_THREADS,
79	       "thread_pool_loop:%d after pthread_cond_wait thread=%llu\n",
80	       __LINE__, (unsigned long long) pthread_self ());
81      pthread_mutex_unlock (&thrp->p_mutex);
82    }
83
84  // never reached, but we must use it here. See `man pthread_cleanup_push`
85  pthread_cleanup_pop (0);
86}
87
88DbeThreadPool::DbeThreadPool (int _max_threads)
89{
90  static const int DBE_NTHREADS_DEFAULT = 4;
91  char *s = getenv ("GPROFNG_DBE_NTHREADS");
92  if (s)
93    {
94      max_threads = atoi (s);
95      if (max_threads < 0)
96	max_threads = 0;
97      if (_max_threads > 0 && max_threads < _max_threads)
98	max_threads = _max_threads;
99    }
100  else
101    {
102      max_threads = _max_threads;
103      if (max_threads < 0)
104	max_threads = DBE_NTHREADS_DEFAULT;
105    }
106  Dprintf (DEBUG_THREADS, "DbeThreadPool:%d  max_threads %d ---> %d\n",
107	   __LINE__, _max_threads, max_threads);
108  pthread_mutex_init (&p_mutex, NULL);
109  pthread_cond_init (&p_cond_var, NULL);
110  threads = new Vector <pthread_t>(max_threads);
111  queue = NULL;
112  last_queue = NULL;
113  no_new_queues = false;
114  queues_cnt = 0;
115  total_queues = 0;
116}
117
118DbeThreadPool::~DbeThreadPool ()
119{
120  delete threads;
121}
122
123DbeQueue *
124DbeThreadPool::get_queue ()
125{
126  pthread_mutex_lock (&p_mutex);
127  DbeQueue *q = queue;
128  Dprintf (DEBUG_THREADS,
129   "get_queue:%d thr: %lld id=%d queues_cnt=%d threads_cnt=%d max_threads=%d\n",
130	   __LINE__, (unsigned long long) pthread_self (),
131	   q ? q->id : -1, queues_cnt, (int) threads->size (), max_threads);
132  if (q)
133    {
134      queue = q->next;
135      queues_cnt--;
136    }
137  pthread_mutex_unlock (&p_mutex);
138  return q;
139}
140
141void
142DbeThreadPool::put_queue (DbeQueue *q)
143{
144  if (max_threads == 0)
145    {
146      // nothing runs in parallel
147      q->id = ++total_queues;
148      Dprintf (DEBUG_THREADS, NTXT ("put_queue:%d thr=%lld max_threads=%d queue (%d) runs on the worked thread\n"),
149	       __LINE__, (unsigned long long) pthread_self (), max_threads, q->id);
150      q->func (q->arg);
151      delete q;
152      return;
153    }
154
155  pthread_mutex_lock (&p_mutex);
156  // nothing runs in parallel
157  q->id = ++total_queues;
158  Dprintf (DEBUG_THREADS, "put_queue:%d thr=%lld max_threads=%d queue (%d)\n",
159	   __LINE__, (unsigned long long) pthread_self (), max_threads, q->id);
160  if (queue)
161    {
162      last_queue->next = q;
163      last_queue = q;
164    }
165  else
166    {
167      queue = q;
168      last_queue = q;
169    }
170  queues_cnt++;
171  Dprintf (DEBUG_THREADS,
172	   "put_queue:%d id=%d queues_cnt=%d threads_cnt=%d max_threads=%d\n",
173	   __LINE__, q->id, queues_cnt, (int) threads->size (), max_threads);
174  if (queues_cnt > threads->size () && threads->size () < max_threads)
175    {
176      pthread_t thr;
177      int r = pthread_create (&thr, NULL, thread_pool_loop, (void *) this);
178      Dprintf (DEBUG_THREADS,
179	       "put_queue:%d pthread_create returns %d thr=%llu\n",
180	       __LINE__, r, (unsigned long long) thr);
181      if (r)
182	fprintf (stderr, GTXT ("pthread_create failed. errnum=%d (%s)\n"), r,
183		 STR (strerror (r)));
184      else
185	threads->append (thr);
186    }
187  pthread_cond_signal (&p_cond_var);
188  pthread_mutex_unlock (&p_mutex);
189}
190
191void
192DbeThreadPool::wait_queues ()
193{
194  pthread_mutex_lock (&p_mutex);
195  no_new_queues = true;
196  pthread_mutex_unlock (&p_mutex);
197  pthread_cond_broadcast (&p_cond_var);
198  for (;;) // Run requests on the worked thread too
199    {
200      DbeQueue *q = get_queue ();
201      if (q == NULL)
202	break;
203      Dprintf (DEBUG_THREADS, "wait_queues:%d thread=%llu queue=%d start\n",
204	       __LINE__, (unsigned long long) pthread_self (), q->id);
205      q->func (q->arg);
206      Dprintf (DEBUG_THREADS, "wait_queues:%d thread=%llu queue=%d done\n",
207	       __LINE__, (unsigned long long) pthread_self (), q->id);
208      delete q;
209    }
210  for (int i = 0, sz = threads->size (); i < sz; i++)
211    {
212      void *retval;
213      pthread_join (threads->get (i), &retval);
214    }
215}
216
217DbeQueue::DbeQueue (int (*_func) (void *arg), void *_arg)
218{
219  func = _func;
220  arg = _arg;
221  next = NULL;
222}
223
224DbeQueue::~DbeQueue () { }
225