1/* Copyright (c) 2007-2009, Stanford University 2* All rights reserved. 3* 4* Redistribution and use in source and binary forms, with or without 5* modification, are permitted provided that the following conditions are met: 6* * Redistributions of source code must retain the above copyright 7* notice, this list of conditions and the following disclaimer. 8* * Redistributions in binary form must reproduce the above copyright 9* notice, this list of conditions and the following disclaimer in the 10* documentation and/or other materials provided with the distribution. 11* * Neither the name of Stanford University nor the names of its 12* contributors may be used to endorse or promote products derived from 13* this software without specific prior written permission. 14* 15* THIS SOFTWARE IS PROVIDED BY STANFORD UNIVERSITY ``AS IS'' AND ANY 16* EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED 17* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 18* DISCLAIMED. IN NO EVENT SHALL STANFORD UNIVERSITY BE LIABLE FOR ANY 19* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES 20* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 21* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND 22* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 23* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 24* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 25*/ 26 27#include <barrelfish/barrelfish.h> 28#include <assert.h> 29 30#include "atomic.h" 31#include "memory.h" 32#include "tpool.h" 33#include "stddefines.h" 34 35typedef struct { 36 struct thread_sem sem_run; 37 unsigned int *num_workers_done; 38 struct thread_sem *sem_all_workers_done; 39 thread_func *thread_func; 40 void **thread_func_arg; 41 void **ret; 42 int *num_workers; 43 int *die; 44} thread_arg_t; 45 46struct tpool_t { 47 int num_threads; 48 int num_workers; 49 int die; 50 thread_func thread_func; 51 struct thread_sem sem_all_workers_done; 52 unsigned int num_workers_done; 53 void **args; 54 struct thread **threads; 55 thread_arg_t *thread_args; 56}; 57 58static int thread_loop (void *); 59 60tpool_t* tpool_create (int num_threads) 61{ 62 int i, ret; 63 tpool_t *tpool; 64 /* pthread_attr_t attr; */ 65 66 tpool = phoenix_mem_calloc (1, sizeof (tpool_t)); 67 if (tpool == NULL) 68 return NULL; 69 70 tpool->num_threads = num_threads; 71 tpool->num_workers = num_threads; 72 73 tpool->args = (void **)phoenix_mem_malloc (sizeof (void *) * num_threads); 74 if (tpool->args == NULL) 75 goto fail_args; 76 77 // Barrelfish: tpool->threads still allocated but not actually 78 // used except for first idle thread, which I believe is created 79 // by mistake on core 0, as it is never actually used. 80 tpool->threads = (struct thread **)phoenix_mem_malloc (sizeof (struct thread *) * num_threads); 81 if (tpool->threads == NULL) 82 goto fail_threads; 83 84 tpool->thread_args = (thread_arg_t *)phoenix_mem_malloc ( 85 sizeof (thread_arg_t) * num_threads); 86 if (tpool->thread_args == NULL) 87 goto fail_thread_args; 88 89 thread_sem_init(&tpool->sem_all_workers_done, 0); 90 91 /* CHECK_ERROR (pthread_attr_init (&attr)); */ 92 /* CHECK_ERROR (pthread_attr_setscope (&attr, PTHREAD_SCOPE_SYSTEM)); */ 93 /* CHECK_ERROR (pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED)); */ 94 95 /* Span domain to all cores */ 96 int my_core_id = disp_get_core_id(); 97 errval_t err; 98 for (i = my_core_id + 1; i < num_threads + my_core_id; i++) { 99 err = domain_new_dispatcher(i, NULL, NULL); 100 if (err_is_fail(err)) { 101 DEBUG_ERR(err, "failed to span domain"); 102 printf("Failed to span domain to %d\n", i); 103 assert(err_is_ok(err)); 104 } 105 } 106 107 tpool->die = 0; 108 for (i = 0; i < num_threads; ++i) { 109 /* Initialize thread argument. */ 110 thread_sem_init(&(tpool->thread_args[i].sem_run), 0); 111 tpool->thread_args[i].sem_all_workers_done = 112 &tpool->sem_all_workers_done; 113 tpool->thread_args[i].num_workers_done = 114 &tpool->num_workers_done; 115 tpool->thread_args[i].die = &tpool->die; 116 tpool->thread_args[i].thread_func = &tpool->thread_func; 117 tpool->thread_args[i].thread_func_arg = &tpool->args[i]; 118 tpool->thread_args[i].ret = (void **)phoenix_mem_malloc (sizeof (void *)); 119 CHECK_ERROR (tpool->thread_args[i].ret == NULL); 120 tpool->thread_args[i].num_workers = &tpool->num_workers; 121 122 if(i < num_threads - 1) { 123 do { 124 err = domain_thread_create_on(i + my_core_id + 1, thread_loop, 125 &tpool->thread_args[i], NULL); 126 if (err_no(err) == LIB_ERR_NO_SPANNED_DISP) { 127 thread_yield(); 128 } 129 } while(err_no(err) == LIB_ERR_NO_SPANNED_DISP); 130 if (err_is_fail(err)) { 131 DEBUG_ERR(err, "domain_thread_create_on failed"); 132 printf("domain_thread_create_on failed on %d\n", i); 133 assert(err_is_ok(err)); 134 } 135 } else { 136 tpool->threads[i] = thread_create(thread_loop, &tpool->thread_args[i]); 137 err = thread_detach(tpool->threads[i]); 138 assert(err_is_ok(err)); 139 } 140 } 141 142 return tpool; 143 144fail_all_workers_done: 145 phoenix_mem_free (tpool->thread_args); 146fail_thread_args: 147 phoenix_mem_free (tpool->threads); 148fail_threads: 149 phoenix_mem_free (tpool->args); 150fail_args: 151 152 return NULL; 153} 154 155int tpool_set ( 156 tpool_t *tpool, thread_func thread_func, void **args, int num_workers) 157{ 158 int i; 159 160 assert (tpool != NULL); 161 162 tpool->thread_func = thread_func; 163 164 assert (num_workers <= tpool->num_threads); 165 tpool->num_workers = num_workers; 166 167 for (i = 0; i < num_workers; ++i) 168 { 169 tpool->args[i] = args[i]; 170 } 171 172 173 return 0; 174} 175 176int tpool_begin (tpool_t *tpool) 177{ 178 int i, ret; 179 180 assert (tpool != NULL); 181 182 if (tpool->num_workers == 0) 183 return 0; 184 185 tpool->num_workers_done = 0; 186 187 for (i = 0; i < tpool->num_workers; ++i) { 188 thread_sem_post(&(tpool->thread_args[i].sem_run)); 189 } 190 191 return 0; 192} 193 194int tpool_wait (tpool_t *tpool) 195{ 196 int ret; 197 198 assert (tpool != NULL); 199 200 if (tpool->num_workers == 0) 201 return 0; 202 203 thread_sem_wait(&tpool->sem_all_workers_done); 204 205 return 0; 206} 207 208void** tpool_get_results (tpool_t *tpool) 209{ 210 int i; 211 void **rets; 212 213 assert (tpool != NULL); 214 215 rets = (void **)phoenix_mem_malloc (sizeof (void *) * tpool->num_threads); 216 CHECK_ERROR (rets == NULL); 217 218 for (i = 0; i < tpool->num_threads; ++i) { 219 rets[i] = *(tpool->thread_args[i].ret); 220 } 221 222 return rets; 223} 224 225int tpool_destroy (tpool_t *tpool) 226{ 227 int i; 228 int result; 229 230 assert (tpool != NULL); 231 assert (tpool->die == 0); 232 233 result = 0; 234 tpool->num_workers = tpool->num_threads; 235 tpool->num_workers_done = 0; 236 237 for (i = 0; i < tpool->num_threads; ++i) { 238 phoenix_mem_free (tpool->thread_args[i].ret); 239 240 tpool->die = 1; 241 thread_sem_post(&tpool->thread_args[i].sem_run); 242 } 243 244 thread_sem_wait(&tpool->sem_all_workers_done); 245 246 phoenix_mem_free (tpool->args); 247 phoenix_mem_free (tpool->threads); 248 phoenix_mem_free (tpool->thread_args); 249 250 phoenix_mem_free (tpool); 251 252 return result; 253} 254 255static int thread_loop (void *arg) 256{ 257 thread_arg_t *thread_arg = arg; 258 thread_func thread_func; 259 void *thread_func_arg; 260 void **ret; 261 int num_workers_done; 262 263 assert (thread_arg); 264 265 while (1) 266 { 267 thread_sem_wait(&thread_arg->sem_run); 268 if (*thread_arg->die) { 269 break; 270 } 271 272 thread_func = *(thread_arg->thread_func); 273 thread_func_arg = *(thread_arg->thread_func_arg); 274 ret = thread_arg->ret; 275 276 /* Run thread function. */ 277 *ret = (*thread_func)(thread_func_arg); 278 279 num_workers_done = fetch_and_inc(thread_arg->num_workers_done) + 1; 280 if (num_workers_done == *thread_arg->num_workers) 281 { 282 /* Everybody's done. */ 283 thread_sem_post(thread_arg->sem_all_workers_done); 284 } 285 } 286 287 num_workers_done = fetch_and_inc(thread_arg->num_workers_done) + 1; 288 if (num_workers_done == *thread_arg->num_workers) 289 { 290 /* Everybody's done. */ 291 thread_sem_post(thread_arg->sem_all_workers_done); 292 } 293 294 return 0; 295} 296