1/** 2 * \file 3 * \brief 4 */ 5 6/* 7 * Copyright (c) 2009, 2010, ETH Zurich. 8 * All rights reserved. 9 * 10 * This file is distributed under the terms in the attached LICENSE file. 11 * If you do not find this file, copies can be found by writing to: 12 * ETH Zurich D-INFK, Haldeneggsteig 4, CH-8092 Zurich. Attn: Systems Group. 13 */ 14 15#include <string.h> 16#include "internal.h" 17 18enum role_type { 19 MSG_WAIT_MSG_WAIT, 20 WAIT_MSG_WAIT_MSG, 21}; 22 23#define COORDINATOR (bsp_id) 24static enum role_type my_role; 25static bool round; 26static coreid_t experiment_max_cpus; 27 28static void barrier_init(void) 29{ 30 round = false; 31 32 /* Determine roles as per my_core_id */ 33 if (my_core_id == COORDINATOR) { 34 my_role = MSG_WAIT_MSG_WAIT; 35 } else { 36 my_role = WAIT_MSG_WAIT_MSG; 37 } 38} 39 40static void ring_reply(struct rcce_binding *st) 41{ 42 /* nop */ 43} 44 45static void ring_request(struct rcce_binding *st) 46{ 47 assert(!round); 48 round = true; 49 50 errval_t err = st->tx_vtbl.ring_reply(st, NOP_CONT); 51 if (err_is_fail(err)){ 52 DEBUG_ERR(err, "send ring reply"); 53 abort(); 54 } 55} 56 57static void ring_request_cont(void *arg) 58{ 59 struct rcce_binding *b = arg; 60 errval_t err = b->tx_vtbl.ring_request(b, NOP_CONT); 61 if (err_is_fail(err)) { 62 if (err_no(err) == FLOUNDER_ERR_TX_BUSY) { 63 err = b->register_send(b, get_default_waitset(), 64 MKCONT(ring_request_cont,b)); 65 assert(err_is_ok(err)); 66 return; 67 } 68 DEBUG_ERR(err, "send ring request"); 69 abort(); 70 } 71} 72 73static void message(void) 74{ 75 for (coreid_t i = bsp_id + ((my_core_id - bsp_id + 1) % experiment_max_cpus); 76 i != my_core_id; 77 i = bsp_id + ((i - bsp_id + 1) % experiment_max_cpus)) { 78 if (barray[i] != NULL) { 79 ring_request_cont(barray[i]); 80 return; 81 } 82 } 83 assert(!"Should not get here"); 84} 85 86struct msg_buf msgbuf[MAX_CPUS]; 87 88static void message_request(struct rcce_binding *st, uint16_t coreid, 89 const uint8_t *msg, size_t size) 90{ 91 assert(!msgbuf[coreid].pending); 92 struct msg_buf *m = &msgbuf[coreid]; 93 94 m->msg = memdup(msg, size); 95 m->length = size; 96 m->pending = true; 97 m->bulk = false; 98 dprintf("%d: msg arrived, (%d, %lu)\n", my_core_id, coreid, size); 99} 100 101#ifdef BULK_TRANSFER_ENABLED 102static void bulk_message_request(struct rcce_binding *b, uint16_t coreid, 103 uint64_t id, uint64_t size, 104 uint8_t last_fragment) 105{ 106 struct rcce_state *st = b->st; 107 assert(!msgbuf[coreid].pending); 108 struct msg_buf *m = &msgbuf[coreid]; 109 assert(m->bulk_ready); 110 assert(m->length == size); 111 void *buf = bulk_slave_buf_get_mem(&st->btr, id, NULL); 112 size_t copysize = last_fragment ? size - m->current : BLOCK_SIZE; 113 114 char *bf = buf; 115 /* printf("current = %lu, msg[0] = %d (%p), buf[0] = %d (%p), size = %llu, copysize = %lu\n", */ 116 /* m->current, m->msg[0], m->msg, bf[0], buf, size, copysize); */ 117 /* for(int i = 0; i < 64; i++) { */ 118 /* printf("%d ", bf[i]); */ 119 /* } */ 120 /* printf("\n"); */ 121 /* static int iter = 0; */ 122 /* if(++iter >= 2) { */ 123 /* while(bf[0] == 0); */ 124 /* } */ 125 memcpy(m->msg + m->current, buf, copysize); 126 /* m->msg = bulk_slave_buf_get_mem(&st->btr, id); */ 127 /* m->length = size; */ 128 m->current += copysize; 129 130 if(last_fragment) { 131 m->pending = true; 132 m->bulk = true; 133 m->id = id; 134 m->bulk_ready = false; 135 } 136 137 errval_t err = barray[coreid]->tx_vtbl. 138 bulk_message_reply(barray[coreid], NOP_CONT, my_core_id, id); 139 assert(err_is_ok(err)); 140} 141#endif 142 143static void message_reply(struct rcce_binding *b, uint16_t coreid) 144{ 145 struct rcce_state *st = b->st; 146 assert(st->waitmsg == true); 147 st->waitmsg = false; 148 /* printf("%d: msg ack'd %p, %d\n", my_core_id, st, st->waitmsg); */ 149} 150 151#ifdef BULK_TRANSFER_ENABLED 152static void bulk_message_reply(struct rcce_binding *b, uint16_t coreid, 153 uint64_t id) 154{ 155 struct rcce_state *st = b->st; 156 errval_t err = bulk_free(&st->bt, id); 157 assert(err_is_ok(err)); 158 assert(st->bulk_waitmsg == true); 159 st->bulk_waitmsg = false; 160} 161#endif 162 163static void message_request_cont(void *arg) 164{ 165 struct rcce_state *st = arg; 166 assert(!st->request_done); 167 st->request_done = true; 168 /* printf("%d: msg delivered, %p\n", my_core_id, st); */ 169} 170 171#ifdef BULK_TRANSFER_ENABLED 172static void bulk_recv_ready(struct rcce_binding *b, uint16_t coreid, 173 uint64_t size) 174{ 175 struct rcce_state *st = b->st; 176 177 assert(!st->recv_ready); 178 st->recv_ready = true; 179 dprintf("bulk_recv_ready\n"); 180} 181#endif 182 183#ifdef RCCE_PERF_MEASURE 184# include <barrelfish/dispatcher_arch.h> 185# include <barrelfish/curdispatcher_arch.h> 186# define PERF(x) d->timestamp[x] = rdtsc() 187# define PERFM(x) x 188#else 189# define PERF(x) 190# define PERFM(x) 191#endif 192 193errval_t send_message(char *msg, size_t size, coreid_t dest) 194{ 195 assert(barray[dest] != NULL); 196 struct rcce_state *st = barray[dest]->st; 197 errval_t err; 198 199#ifdef RCCE_PERF_MEASURE 200 dispatcher_handle_t handle = curdispatcher(); 201 struct dispatcher_shared_generic* d = 202 get_dispatcher_shared_generic(handle); 203#endif 204 205 dprintf("%d: S(%lu,%d,%p,%d)\n", my_core_id, size, dest, st, st->waitmsg); 206 207#ifdef BULK_TRANSFER_ENABLED 208 // XXX: Assert we can always send a big buffer as bulk data for performance 209 // reasons 210 if(size > BLOCK_SIZE) { 211 /* printf("size = %lu, BLOCK_SIZE = %u\n", size, BLOCK_SIZE); */ 212 } 213 // assert(size <= BLOCK_SIZE); 214#endif 215 216 PERF(0); 217 218 // Wait til previous message has been processed by receiver 219#ifdef BULK_TRANSFER_ENABLED 220 while(st->waitmsg || st->bulk_waitmsg || !st->recv_ready) { 221#else 222 while(st->waitmsg) { 223#endif 224 dprintf("waiting\n"); 225 messages_wait_and_handle_next(); 226 } 227 st->recv_ready = false; 228 229 PERF(1); 230 231#ifndef BULK_TRANSFER_ENABLED 232 st->waitmsg = true; 233 // Send via UMP 234 st->request_done = false; 235 PERF(2); 236 err = barray[dest]-> 237 tx_vtbl.message_request(barray[dest], MKCONT(message_request_cont,st), 238 my_core_id, (uint8_t *)msg, size); 239 assert(err_is_ok(err)); 240 PERF(16); 241 while(!st->request_done) { 242 /* printf("%d: handling\n", my_core_id); */ 243 messages_wait_and_handle_next(); 244 } 245 PERF(17); 246#else 247 /* printf("recv ready, sending %d\n", msg[0]); */ 248 // Send via bulk transfer 249 for(size_t i = 0; i < size; i += BLOCK_SIZE) { 250 struct bulk_buf *bb = bulk_alloc(&st->bt); 251 assert(bb != NULL); 252 void *buf = bulk_buf_get_mem(bb); 253 size_t sendsize = i + BLOCK_SIZE < size ? BLOCK_SIZE : size - i; 254 bool last_fragment = i + BLOCK_SIZE < size ? false : true; 255 256 memcpy(buf, msg + i, sendsize); 257 char *bf = buf; 258 /* printf("send to %p (%d), msg = %p, i = %lu, sendsize = %lu\n", buf, bf[0], msg, i, sendsize); */ 259 uintptr_t id = bulk_prepare_send(bb); 260 st->bulk_waitmsg = true; 261 err = barray[dest]->tx_vtbl. 262 bulk_message_request(barray[dest], NOP_CONT, my_core_id, id, 263 size, last_fragment); 264 assert(err_is_ok(err)); 265 while(st->bulk_waitmsg) { 266 dprintf("waiting for bulk reply\n"); 267 messages_wait_and_handle_next(); 268 } 269 } 270#endif 271 272 return SYS_ERR_OK; 273} 274 275static void wait(void) 276{ 277 while (!round) { 278 messages_wait_and_handle_next(); 279 } 280 round = false; 281} 282 283void barrier_wait(void) 284{ 285 switch(my_role) { 286 case MSG_WAIT_MSG_WAIT: 287 message(); 288 wait(); 289 message(); 290 wait(); 291 break; 292 293 case WAIT_MSG_WAIT_MSG: 294 wait(); 295 message(); 296 wait(); 297 message(); 298 break; 299 300 default: 301 assert(!"should not get here"); 302 } 303} 304 305void barrier_binding_init(struct rcce_binding *binding) 306{ 307 binding->rx_vtbl.ring_request = ring_request; 308 binding->rx_vtbl.ring_reply = ring_reply; 309 binding->rx_vtbl.message_request = message_request; 310 binding->rx_vtbl.message_reply = message_reply; 311#ifdef BULK_TRANSFER_ENABLED 312 binding->rx_vtbl.bulk_message_request = bulk_message_request; 313 binding->rx_vtbl.bulk_message_reply = bulk_message_reply; 314 binding->rx_vtbl.bulk_recv_ready = bulk_recv_ready; 315#endif 316} 317 318void barriers_init(coreid_t max_cpus) 319{ 320 experiment_max_cpus = max_cpus; 321 322 barrier_init(); 323} 324