1/*
2 * Copyright (c) 2009, 2010, 2011, 2012, ETH Zurich.
3 * All rights reserved.
4 *
5 * This file is distributed under the terms in the attached LICENSE file.
6 * If you do not find this file, copies can be found by writing to:
7 * ETH Zurich D-INFK, Haldeneggsteig 4, CH-8092 Zurich. Attn: Systems Group.
8 *
9 * A simple sorted linked list for pending flounder RPC messages in the sm backend.
10 * Because there is no obvious correspondence between RPC calls and replies,
11 * but we have continuations to call after we get the reply,
12 * we keep track of them ourselves.
 * All our flounder RPC calls and replies contain a transaction id, which is used
 * to look up the correct callback on receiving an RPC reply.
15 *
16 * using a linked list should be more than enough, since we don't expect many concurrent
17 * pending messages.
18 */
19
20#include "pending_msg.h"
21#include <stdlib.h>
22
23/**
24 * dump pending message TID's for given channel
25 */
26static void pending_msg_dump(struct bulk_channel *channel)
27{
28    assert(channel);
29    thread_mutex_lock_nested(&CHANNEL_DATA(channel)->mutex);
30
31    struct bulk_sm_pending_msg *node = CHANNEL_DATA(channel)->root;
32    debug_printf("Dumping pending message TID's for channel %p.\n", channel);
33    while (node) {
34        debug_printf("  %u\n", node->tid);
35        node = node->next;
36    }
37
38    thread_mutex_unlock(&CHANNEL_DATA(channel)->mutex);
39}
40
41
42/**
43 * add the data to the list of pending messages in channel
44 * generates tid automatically
45 *
46 * @param channel:  Channel this message belongs to
47 * @param tid:      will be filled in with transaction id
48 * @param data:     payload for this message
49 */
50errval_t pending_msg_add(struct bulk_channel* channel,
51                         uint32_t *tid,
52                         union pending_msg_data data)
53{
54    assert(channel);
55    struct bulk_sm_pending_msg *p = malloc(sizeof(struct bulk_sm_pending_msg));
56    assert(p);
57
58    p->data = data;
59    p->next = NULL;
60    p->previous = NULL;
61
62    //seperate variable declared for easier compiler optimization
63    uint32_t thistid = (uint32_t) rand();
64    p->tid = thistid;
65
66    // debug_printf("PENDING MSG: [new tid=%u]\n", thistid);
67    // debug_printf("before:\n");
68    // pending_msg_dump(channel);
69
70    thread_mutex_lock(&CHANNEL_DATA(channel)->mutex);
71    struct bulk_sm_pending_msg *node = CHANNEL_DATA(channel)->root;
72
73    if (node == NULL){    //no other entries
74        CHANNEL_DATA(channel)->root = p;
75        *tid = thistid;
76        thread_mutex_unlock(&CHANNEL_DATA(channel)->mutex);
77        // debug_printf("after:\n");
78        // pending_msg_dump(channel);
79        return SYS_ERR_OK;
80    }  else {
81        while(true){
82            if (node->tid < thistid){
83                if (node->next){
84                    node = node->next;
85                } else {    //end of list reached
86                    p->previous = node;
87                    node->next = p;
88                    *tid = thistid;
89                    thread_mutex_unlock(&CHANNEL_DATA(channel)->mutex);
90                    // debug_printf("after:\n");
91                    // pending_msg_dump(channel);
92                    return SYS_ERR_OK;
93                }
94            } else if (node->tid > thistid) {
95                p->next = node;
96                p->previous = node->previous;
97
98                node->previous = p;
99
100                if (p->previous) {
101                    p->previous->next = p;
102                } else {
103                    //become new root
104                    CHANNEL_DATA(channel)->root = p;
105                }
106
107                *tid = thistid;
108                thread_mutex_unlock(&CHANNEL_DATA(channel)->mutex);
109                // debug_printf("after:\n");
110                // pending_msg_dump(channel);
111                return SYS_ERR_OK;
112            } else {
113                // //tid already taken -> try again with different tid
114                // thistid = (uint32_t) rand();
115                // p->tid = thistid;
116                // node = CHANNEL_DATA(channel)->root; // XXX WRONG. root could be NULL -- jb
117                thread_mutex_unlock(&CHANNEL_DATA(channel)->mutex);
118                free(p);
119                pending_msg_add(channel, tid, data); // XXX does copy of data recursively :-(
120            }
121        }
122    }
123    assert(!"should not be reached");
124}
125
126/**
127 * reads pending message
128 *
129 * @param channel:  Channel this message belongs to
130 * @param tid:      transaction id to look up
131 * @param data:     will be filled in with payload for this message
132 * @param remove:   whether item is to be removed from list
133 */
134errval_t pending_msg_get(struct bulk_channel     *channel,
135                         uint32_t                tid,
136                         union pending_msg_data  *data,
137                         bool                     do_remove)
138{
139    assert(channel);
140
141    thread_mutex_lock(&CHANNEL_DATA(channel)->mutex);
142    struct bulk_sm_pending_msg *p = CHANNEL_DATA(channel)->root;
143
144    // debug_printf("PENDING MSG: [remove tid=%u]\n", tid);
145    if (0) pending_msg_dump(channel); // defined but not used :-(
146
147    while(p != NULL){
148        if (p->tid < tid){
149            p = p->next;
150        } else if (p->tid > tid) {
151            p = NULL;//abort early (list is sorted)
152        } else {
153            //tid matches -> found
154            *data = p->data;
155
156            if (do_remove) {
157                //remove from tree
158                if (p->next){
159                    p->next->previous = p->previous;
160                }
161                if (p->previous){
162                    p->previous->next = p->next;
163                } else {
164                    CHANNEL_DATA(channel)->root = p->next;
165                }
166
167                free(p);
168            }
169
170            thread_mutex_unlock(&CHANNEL_DATA(channel)->mutex);
171            return SYS_ERR_OK;
172        }
173    }
174    thread_mutex_unlock(&CHANNEL_DATA(channel)->mutex);
175    return BULK_TRANSFER_SM_NO_PENDING_MSG;
176}
177