1/* 2 * Copyright (c) 2009, 2010, 2011, 2012, ETH Zurich. 3 * All rights reserved. 4 * 5 * This file is distributed under the terms in the attached LICENSE file. 6 * If you do not find this file, copies can be found by writing to: 7 * ETH Zurich D-INFK, Haldeneggsteig 4, CH-8092 Zurich. Attn: Systems Group. 8 * 9 * A simple sorted linked list for pending flounder RPC messages in the sm backend. 10 * Because there is no obvious correspondence between RPC calls and replies, 11 * but we have continuations to call after we get the reply, 12 * we keep track of them ourselves. 13 * All our flounder RPC calls and replies contain a transaction id, which is used 14 * to look up the correct callback on receiving a RPC reply. 15 * 16 * using a linked list should be more than enough, since we don't expect many concurrent 17 * pending messages. 18 */ 19 20#include "pending_msg.h" 21#include <stdlib.h> 22 23/** 24 * dump pending message TID's for given channel 25 */ 26static void pending_msg_dump(struct bulk_channel *channel) 27{ 28 assert(channel); 29 thread_mutex_lock_nested(&CHANNEL_DATA(channel)->mutex); 30 31 struct bulk_sm_pending_msg *node = CHANNEL_DATA(channel)->root; 32 debug_printf("Dumping pending message TID's for channel %p.\n", channel); 33 while (node) { 34 debug_printf(" %u\n", node->tid); 35 node = node->next; 36 } 37 38 thread_mutex_unlock(&CHANNEL_DATA(channel)->mutex); 39} 40 41 42/** 43 * add the data to the list of pending messages in channel 44 * generates tid automatically 45 * 46 * @param channel: Channel this message belongs to 47 * @param tid: will be filled in with transaction id 48 * @param data: payload for this message 49 */ 50errval_t pending_msg_add(struct bulk_channel* channel, 51 uint32_t *tid, 52 union pending_msg_data data) 53{ 54 assert(channel); 55 struct bulk_sm_pending_msg *p = malloc(sizeof(struct bulk_sm_pending_msg)); 56 assert(p); 57 58 p->data = data; 59 p->next = NULL; 60 p->previous = NULL; 61 62 //seperate variable declared for easier compiler optimization 63 uint32_t thistid = (uint32_t) rand(); 64 p->tid = thistid; 65 66 // debug_printf("PENDING MSG: [new tid=%u]\n", thistid); 67 // debug_printf("before:\n"); 68 // pending_msg_dump(channel); 69 70 thread_mutex_lock(&CHANNEL_DATA(channel)->mutex); 71 struct bulk_sm_pending_msg *node = CHANNEL_DATA(channel)->root; 72 73 if (node == NULL){ //no other entries 74 CHANNEL_DATA(channel)->root = p; 75 *tid = thistid; 76 thread_mutex_unlock(&CHANNEL_DATA(channel)->mutex); 77 // debug_printf("after:\n"); 78 // pending_msg_dump(channel); 79 return SYS_ERR_OK; 80 } else { 81 while(true){ 82 if (node->tid < thistid){ 83 if (node->next){ 84 node = node->next; 85 } else { //end of list reached 86 p->previous = node; 87 node->next = p; 88 *tid = thistid; 89 thread_mutex_unlock(&CHANNEL_DATA(channel)->mutex); 90 // debug_printf("after:\n"); 91 // pending_msg_dump(channel); 92 return SYS_ERR_OK; 93 } 94 } else if (node->tid > thistid) { 95 p->next = node; 96 p->previous = node->previous; 97 98 node->previous = p; 99 100 if (p->previous) { 101 p->previous->next = p; 102 } else { 103 //become new root 104 CHANNEL_DATA(channel)->root = p; 105 } 106 107 *tid = thistid; 108 thread_mutex_unlock(&CHANNEL_DATA(channel)->mutex); 109 // debug_printf("after:\n"); 110 // pending_msg_dump(channel); 111 return SYS_ERR_OK; 112 } else { 113 // //tid already taken -> try again with different tid 114 // thistid = (uint32_t) rand(); 115 // p->tid = thistid; 116 // node = CHANNEL_DATA(channel)->root; // XXX WRONG. root could be NULL -- jb 117 thread_mutex_unlock(&CHANNEL_DATA(channel)->mutex); 118 free(p); 119 pending_msg_add(channel, tid, data); // XXX does copy of data recursively :-( 120 } 121 } 122 } 123 assert(!"should not be reached"); 124} 125 126/** 127 * reads pending message 128 * 129 * @param channel: Channel this message belongs to 130 * @param tid: transaction id to look up 131 * @param data: will be filled in with payload for this message 132 * @param remove: whether item is to be removed from list 133 */ 134errval_t pending_msg_get(struct bulk_channel *channel, 135 uint32_t tid, 136 union pending_msg_data *data, 137 bool do_remove) 138{ 139 assert(channel); 140 141 thread_mutex_lock(&CHANNEL_DATA(channel)->mutex); 142 struct bulk_sm_pending_msg *p = CHANNEL_DATA(channel)->root; 143 144 // debug_printf("PENDING MSG: [remove tid=%u]\n", tid); 145 if (0) pending_msg_dump(channel); // defined but not used :-( 146 147 while(p != NULL){ 148 if (p->tid < tid){ 149 p = p->next; 150 } else if (p->tid > tid) { 151 p = NULL;//abort early (list is sorted) 152 } else { 153 //tid matches -> found 154 *data = p->data; 155 156 if (do_remove) { 157 //remove from tree 158 if (p->next){ 159 p->next->previous = p->previous; 160 } 161 if (p->previous){ 162 p->previous->next = p->next; 163 } else { 164 CHANNEL_DATA(channel)->root = p->next; 165 } 166 167 free(p); 168 } 169 170 thread_mutex_unlock(&CHANNEL_DATA(channel)->mutex); 171 return SYS_ERR_OK; 172 } 173 } 174 thread_mutex_unlock(&CHANNEL_DATA(channel)->mutex); 175 return BULK_TRANSFER_SM_NO_PENDING_MSG; 176} 177