1/* Copyright (c) 2007-2009, Stanford University
2* All rights reserved.
3*
4* Redistribution and use in source and binary forms, with or without
5* modification, are permitted provided that the following conditions are met:
6*     * Redistributions of source code must retain the above copyright
7*       notice, this list of conditions and the following disclaimer.
8*     * Redistributions in binary form must reproduce the above copyright
9*       notice, this list of conditions and the following disclaimer in the
10*       documentation and/or other materials provided with the distribution.
11*     * Neither the name of Stanford University nor the names of its
12*       contributors may be used to endorse or promote products derived from
13*       this software without specific prior written permission.
14*
15* THIS SOFTWARE IS PROVIDED BY STANFORD UNIVERSITY ``AS IS'' AND ANY
16* EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
17* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
18* DISCLAIMED. IN NO EVENT SHALL STANFORD UNIVERSITY BE LIABLE FOR ANY
19* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
20* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
21* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
22* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
23* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
24* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
25*/
26
27#include <barrelfish/barrelfish.h>
28#include <assert.h>
29
30#include "atomic.h"
31#include "memory.h"
32#include "tpool.h"
33#include "stddefines.h"
34
35typedef struct {
36    struct thread_sem           sem_run;
37    unsigned int    *num_workers_done;
38    struct thread_sem           *sem_all_workers_done;
39    thread_func     *thread_func;
40    void            **thread_func_arg;
41    void            **ret;
42    int             *num_workers;
43    int             *die;
44} thread_arg_t;
45
46struct tpool_t {
47    int             num_threads;
48    int             num_workers;
49    int             die;
50    thread_func     thread_func;
51    struct thread_sem           sem_all_workers_done;
52    unsigned int    num_workers_done;
53    void            **args;
54    struct thread       **threads;
55    thread_arg_t    *thread_args;
56};
57
/* Worker entry point; the definition is at the end of this file. */
static int thread_loop (void *arg);
59
60tpool_t* tpool_create (int num_threads)
61{
62    int             i, ret;
63    tpool_t         *tpool;
64    /* pthread_attr_t  attr; */
65
66    tpool = phoenix_mem_calloc (1, sizeof (tpool_t));
67    if (tpool == NULL)
68        return NULL;
69
70    tpool->num_threads = num_threads;
71    tpool->num_workers = num_threads;
72
73    tpool->args = (void **)phoenix_mem_malloc (sizeof (void *) * num_threads);
74    if (tpool->args == NULL)
75        goto fail_args;
76
77    // Barrelfish: tpool->threads still allocated but not actually
78    // used except for first idle thread, which I believe is created
79    // by mistake on core 0, as it is never actually used.
80    tpool->threads = (struct thread **)phoenix_mem_malloc (sizeof (struct thread *) * num_threads);
81    if (tpool->threads == NULL)
82        goto fail_threads;
83
84    tpool->thread_args = (thread_arg_t *)phoenix_mem_malloc (
85        sizeof (thread_arg_t) * num_threads);
86    if (tpool->thread_args == NULL)
87        goto fail_thread_args;
88
89    thread_sem_init(&tpool->sem_all_workers_done, 0);
90
91    /* CHECK_ERROR (pthread_attr_init (&attr)); */
92    /* CHECK_ERROR (pthread_attr_setscope (&attr, PTHREAD_SCOPE_SYSTEM)); */
93    /* CHECK_ERROR (pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED)); */
94
95    /* Span domain to all cores */
96    int my_core_id = disp_get_core_id();
97    errval_t err;
98    for (i = my_core_id + 1; i < num_threads + my_core_id; i++) {
99        err = domain_new_dispatcher(i, NULL, NULL);
100        if (err_is_fail(err)) {
101            DEBUG_ERR(err, "failed to span domain");
102            printf("Failed to span domain to %d\n", i);
103            assert(err_is_ok(err));
104        }
105    }
106
107    tpool->die = 0;
108    for (i = 0; i < num_threads; ++i) {
109        /* Initialize thread argument. */
110        thread_sem_init(&(tpool->thread_args[i].sem_run), 0);
111        tpool->thread_args[i].sem_all_workers_done =
112            &tpool->sem_all_workers_done;
113        tpool->thread_args[i].num_workers_done =
114            &tpool->num_workers_done;
115        tpool->thread_args[i].die = &tpool->die;
116        tpool->thread_args[i].thread_func = &tpool->thread_func;
117        tpool->thread_args[i].thread_func_arg = &tpool->args[i];
118        tpool->thread_args[i].ret = (void **)phoenix_mem_malloc (sizeof (void *));
119        CHECK_ERROR (tpool->thread_args[i].ret == NULL);
120        tpool->thread_args[i].num_workers = &tpool->num_workers;
121
122        if(i < num_threads - 1) {
123            do {
124                err = domain_thread_create_on(i + my_core_id + 1, thread_loop,
125                                              &tpool->thread_args[i], NULL);
126                if (err_no(err) == LIB_ERR_NO_SPANNED_DISP) {
127                    thread_yield();
128                }
129            } while(err_no(err) == LIB_ERR_NO_SPANNED_DISP);
130            if (err_is_fail(err)) {
131                DEBUG_ERR(err, "domain_thread_create_on failed");
132                printf("domain_thread_create_on failed on %d\n", i);
133                assert(err_is_ok(err));
134            }
135        } else {
136            tpool->threads[i] = thread_create(thread_loop, &tpool->thread_args[i]);
137            err = thread_detach(tpool->threads[i]);
138            assert(err_is_ok(err));
139        }
140    }
141
142    return tpool;
143
144fail_all_workers_done:
145    phoenix_mem_free (tpool->thread_args);
146fail_thread_args:
147    phoenix_mem_free (tpool->threads);
148fail_threads:
149    phoenix_mem_free (tpool->args);
150fail_args:
151
152    return NULL;
153}
154
155int tpool_set (
156    tpool_t *tpool, thread_func thread_func, void **args, int num_workers)
157{
158    int             i;
159
160    assert (tpool != NULL);
161
162    tpool->thread_func = thread_func;
163
164    assert (num_workers <= tpool->num_threads);
165    tpool->num_workers = num_workers;
166
167    for (i = 0; i < num_workers; ++i)
168    {
169        tpool->args[i] = args[i];
170    }
171
172
173    return 0;
174}
175
176int tpool_begin (tpool_t *tpool)
177{
178    int             i, ret;
179
180    assert (tpool != NULL);
181
182    if (tpool->num_workers == 0)
183        return 0;
184
185    tpool->num_workers_done = 0;
186
187    for (i = 0; i < tpool->num_workers; ++i) {
188        thread_sem_post(&(tpool->thread_args[i].sem_run));
189    }
190
191    return 0;
192}
193
194int tpool_wait (tpool_t *tpool)
195{
196    int             ret;
197
198    assert (tpool != NULL);
199
200    if (tpool->num_workers == 0)
201        return 0;
202
203    thread_sem_wait(&tpool->sem_all_workers_done);
204
205    return 0;
206}
207
208void** tpool_get_results (tpool_t *tpool)
209{
210    int             i;
211    void            **rets;
212
213    assert (tpool != NULL);
214
215    rets = (void **)phoenix_mem_malloc (sizeof (void *) * tpool->num_threads);
216    CHECK_ERROR (rets == NULL);
217
218    for (i = 0; i < tpool->num_threads; ++i) {
219        rets[i] = *(tpool->thread_args[i].ret);
220    }
221
222    return rets;
223}
224
225int tpool_destroy (tpool_t *tpool)
226{
227    int             i;
228    int             result;
229
230    assert (tpool != NULL);
231    assert (tpool->die == 0);
232
233    result = 0;
234    tpool->num_workers = tpool->num_threads;
235    tpool->num_workers_done = 0;
236
237    for (i = 0; i < tpool->num_threads; ++i) {
238        phoenix_mem_free (tpool->thread_args[i].ret);
239
240        tpool->die = 1;
241        thread_sem_post(&tpool->thread_args[i].sem_run);
242    }
243
244    thread_sem_wait(&tpool->sem_all_workers_done);
245
246    phoenix_mem_free (tpool->args);
247    phoenix_mem_free (tpool->threads);
248    phoenix_mem_free (tpool->thread_args);
249
250    phoenix_mem_free (tpool);
251
252    return result;
253}
254
255static int thread_loop (void *arg)
256{
257    thread_arg_t    *thread_arg = arg;
258    thread_func     thread_func;
259    void            *thread_func_arg;
260    void            **ret;
261    int             num_workers_done;
262
263    assert (thread_arg);
264
265    while (1)
266    {
267        thread_sem_wait(&thread_arg->sem_run);
268        if (*thread_arg->die) {
269            break;
270        }
271
272        thread_func = *(thread_arg->thread_func);
273        thread_func_arg = *(thread_arg->thread_func_arg);
274        ret = thread_arg->ret;
275
276        /* Run thread function. */
277        *ret = (*thread_func)(thread_func_arg);
278
279        num_workers_done = fetch_and_inc(thread_arg->num_workers_done) + 1;
280        if (num_workers_done == *thread_arg->num_workers)
281        {
282            /* Everybody's done. */
283            thread_sem_post(thread_arg->sem_all_workers_done);
284        }
285    }
286
287    num_workers_done = fetch_and_inc(thread_arg->num_workers_done) + 1;
288    if (num_workers_done == *thread_arg->num_workers)
289    {
290        /* Everybody's done. */
291        thread_sem_post(thread_arg->sem_all_workers_done);
292    }
293
294    return 0;
295}
296