1/**
2 * \file
3 * \brief Bidirectional LMP channel implementation
4 */
5
6/*
7 * Copyright (c) 2009, 2010, 2011, ETH Zurich.
8 * All rights reserved.
9 *
10 * This file is distributed under the terms in the attached LICENSE file.
11 * If you do not find this file, copies can be found by writing to:
12 * ETH Zurich D-INFK, Universitaetstrasse 6, CH-8092 Zurich. Attn: Systems Group.
13 */
14
15#include <barrelfish/barrelfish.h>
16#include <barrelfish/lmp_chan.h>
17#include <barrelfish/dispatcher_arch.h>
18#include <if/monitor_defs.h>
19#include <barrelfish/caddr.h>
20#include <barrelfish/idc_export.h>
21#include <barrelfish/waitset_chan.h>
22#include "waitset_chan_priv.h"
23
24/**
25 * \brief Initialise a new LMP channel
26 *
27 * \param lc  Storage for channel state
28 */
29void lmp_chan_init(struct lmp_chan *lc)
30{
31    assert(lc != NULL);
32    lc->connstate = LMP_DISCONNECTED;
33    waitset_chanstate_init(&lc->send_waitset, CHANTYPE_LMP_OUT);
34    lc->endpoint = NULL;
35#ifndef NDEBUG
36    lc->prev = lc->next = NULL;
37#endif
38}
39
40/// Handler for LMP bind reply messages from the Monitor
41static void bind_lmp_reply_handler(struct monitor_binding *b,
42                                   errval_t success, uintptr_t mon_id,
43                                   uintptr_t conn_id,
44                                   struct capref endpoint)
45{
46    struct lmp_chan *lc = (void *)conn_id;
47
48    assert(lc->connstate == LMP_BIND_WAIT);
49
50    if (err_is_ok(success)) { /* bind succeeded */
51        lc->connstate = LMP_CONNECTED;
52        lc->remote_cap = endpoint;
53    }
54
55    /* either way, tell the user what happened */
56    assert(lc->bind_continuation.handler != NULL);
57    lc->bind_continuation.handler(lc->bind_continuation.st, success, lc);
58}
59
60static void send_bind_cont(void *arg)
61{
62    struct lmp_chan *lc = arg;
63    struct monitor_binding *b = lc->monitor_binding;
64    errval_t err;
65
66    /* Send bind request to the monitor */
67    err = b->tx_vtbl.bind_lmp_client_request(b, NOP_CONT, lc->iref,
68                                            (uintptr_t)lc, lc->buflen_words,
69                                            lc->local_cap);
70    if (err_is_ok(err)) { // request sent ok
71        event_mutex_unlock(&b->mutex);
72    } else if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
73        // register to retry
74        err = b->register_send(b, b->waitset, MKCONT(send_bind_cont,lc));
75        assert(err_is_ok(err)); // we hold the monitor binding mutex
76    } else { // permanent failure sending message
77        event_mutex_unlock(&b->mutex);
78        lc->bind_continuation.handler(lc->bind_continuation.st,
79                                      err_push(err, LIB_ERR_BIND_LMP_REQ), NULL);
80    }
81}
82
83/**
84 * \brief Initialise a new LMP channel and initiate a binding
85 *
86 * \param lc  Storage for channel state
87 * \param cont Continuation for bind completion/failure
88 * \param qnode Storage for an event queue node (used for queuing bind request)
89 * \param iref IREF to which to bind
90 * \param buflen_words Size of incoming buffer, in number of words
91 */
92errval_t lmp_chan_bind(struct lmp_chan *lc, struct lmp_bind_continuation cont,
93                       struct event_queue_node *qnode, iref_t iref,
94                       size_t buflen_words)
95{
96    errval_t err;
97
98    lmp_chan_init(lc);
99
100    /* store bind arguments */
101    lc->iref = iref;
102    lc->buflen_words = buflen_words;
103    lc->bind_continuation = cont;
104
105    /* allocate a cap slot for the new endpoint cap */
106    err = slot_alloc(&lc->local_cap);
107    if (err_is_fail(err)) {
108        waitset_chanstate_destroy(&lc->send_waitset);
109        return err_push(err, LIB_ERR_SLOT_ALLOC);
110    }
111
112    /* allocate a local endpoint */
113    err = lmp_endpoint_create_in_slot(buflen_words, lc->local_cap,
114                                      &lc->endpoint);
115    if (err_is_fail(err)) {
116        slot_free(lc->local_cap);
117        waitset_chanstate_destroy(&lc->send_waitset);
118        return err_push(err, LIB_ERR_ENDPOINT_CREATE);
119    }
120
121    // wait for the ability to use the monitor binding
122    lc->connstate = LMP_BIND_WAIT;
123    struct monitor_binding *mb = lc->monitor_binding = get_monitor_binding();
124    event_mutex_enqueue_lock(&mb->mutex, qnode,
125                             MKCLOSURE(send_bind_cont, lc));
126
127    return SYS_ERR_OK;
128}
129
130/**
131 * \brief Initialise a new LMP channel and initiate a binding
132 *
133 * \param lc  Storage for channel state
134 * \param cont Continuation for bind completion/failure
135 * \param qnode Storage for an event queue node (used for queuing bind request)
136 * \param iref IREF to which to bind
137 * \param buflen_words Size of incoming buffer, in number of words
138 */
139errval_t lmp_chan_bind_to_endpoint(struct lmp_chan *lc, struct capref remoteep,
140                                   size_t buflen_words)
141{
142    errval_t err;
143
144    lmp_chan_init(lc);
145
146    lc->remote_cap = remoteep;
147
148    /* store bind arguments */
149    lc->buflen_words = buflen_words;
150
151    /* allocate a cap slot for the new endpoint cap */
152    err = slot_alloc(&lc->local_cap);
153    if (err_is_fail(err)) {
154        waitset_chanstate_destroy(&lc->send_waitset);
155        return err_push(err, LIB_ERR_SLOT_ALLOC);
156    }
157
158    struct endpoint_identity epid;
159    err = invoke_endpoint_identify(remoteep, &epid);
160    if (err_is_fail(err)) {
161        waitset_chanstate_destroy(&lc->send_waitset);
162        return err;
163    }
164
165    /* allocate a local endpoint */
166    err = lmp_endpoint_create_in_slot_with_iftype(buflen_words, lc->local_cap,
167                                                  &lc->endpoint, epid.iftype);
168    if (err_is_fail(err)) {
169        slot_free(lc->local_cap);
170        waitset_chanstate_destroy(&lc->send_waitset);
171        return err_push(err, LIB_ERR_ENDPOINT_CREATE);
172    }
173
174    // wait for the ability to use the monitor binding
175    lc->connstate = LMP_EP_WAIT_ACK;
176
177    return SYS_ERR_OK;
178}
179
180/// Destroy the local state associated with a given channel
181void lmp_chan_destroy(struct lmp_chan *lc)
182{
183    lc->connstate = LMP_DISCONNECTED;
184    cap_destroy(lc->local_cap);
185
186    if (lc->endpoint != NULL) {
187        lmp_endpoint_free(lc->endpoint);
188    }
189
190    // remove from send retry queue on dispatcher
191    if (waitset_chan_is_registered(&lc->send_waitset)) {
192        assert(lc->prev != NULL && lc->next != NULL);
193        dispatcher_handle_t handle = disp_disable();
194        struct dispatcher_generic *disp = get_dispatcher_generic(handle);
195        if (lc->next == lc->prev) {
196            assert_disabled(lc->next == lc);
197            assert_disabled(disp->lmp_send_events_list == lc);
198            disp->lmp_send_events_list = NULL;
199        } else {
200            lc->prev->next = lc->next;
201            lc->next->prev = lc->prev;
202        }
203        disp_enable(handle);
204
205#ifndef NDEBUG
206        lc->next = lc->prev = NULL;
207#endif
208    }
209
210    waitset_chanstate_destroy(&lc->send_waitset);
211}
212
213struct bind_lmp_reply_state {
214    struct monitor_binding *b;
215    struct lmp_chan *lc;
216    struct monitor_bind_lmp_reply_monitor__tx_args args;
217    struct event_queue_node qnode;
218};
219
220static void send_bind_reply(void *arg)
221{
222    struct bind_lmp_reply_state *st = arg;
223    struct monitor_binding *b = st->b;
224    errval_t err;
225
226    err = b->tx_vtbl.bind_lmp_reply_monitor(b, NOP_CONT, st->args.err,
227                                                st->args.mon_id, st->args.conn_id,
228                                                st->args.ep);
229    if (err_is_ok(err)) {
230        event_mutex_unlock(&b->mutex);
231        free(st);
232    } else if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
233        err = b->register_send(b, b->waitset,
234                                   MKCONT(send_bind_reply,st));
235        assert(err_is_ok(err)); // shouldn't fail, as we have the mutex
236    } else {
237        event_mutex_unlock(&b->mutex);
238        USER_PANIC_ERR(err, "failed sending back reply to LMP bind request;"
239                       " request dropped!");
240        if (st->lc != NULL) {
241            lmp_chan_destroy(st->lc);
242            // FIXME: how do we tell the binding about this!?
243        }
244        free(st);
245    }
246}
247
248/// Handler for LMP bind request messages from the Monitor
249static void bind_lmp_service_request_handler(struct monitor_binding *b,
250                                             uintptr_t service_id,
251                                             uintptr_t mon_id,
252                                             size_t buflen_words,
253                                             struct capref endpoint)
254{
255    struct idc_export *e = (void *)service_id;
256    struct lmp_chan *lc = NULL;
257    errval_t err;
258
259    // call the binding's connect handler
260    if (e->lmp_connect_callback != NULL) {
261        err = e->lmp_connect_callback(e->connect_cb_st, buflen_words, endpoint, &lc);
262    } else {
263        err = LIB_ERR_NO_LMP_BIND_HANDLER;
264    }
265
266    if (err_is_ok(err)) {
267        assert(lc != NULL);
268    }
269
270    // wait for the ability to use the monitor binding
271    struct bind_lmp_reply_state *st = malloc(sizeof(struct bind_lmp_reply_state));
272    assert(st != NULL);
273
274    st->b = b;
275    st->lc = lc;
276    st->args.err = err;
277    st->args.mon_id = mon_id;
278    if (err_is_ok(err)) {
279        st->args.conn_id = (uintptr_t)lc;
280        st->args.ep = lc->local_cap;
281    } else {
282        st->args.conn_id = 0;
283        st->args.ep = NULL_CAP;
284    }
285
286    event_mutex_enqueue_lock(&b->mutex, &st->qnode,
287                             MKCLOSURE(send_bind_reply, st));
288}
289
290/**
291 * \brief Initialise a new LMP channel to accept an incoming binding request
292 *
293 * \param lc  Storage for channel state
294 * \param buflen_words Size of incoming buffer, in words
295 * \param endpoint Capability to remote LMP endpoint
296 */
297errval_t lmp_chan_accept(struct lmp_chan *lc,
298                         size_t buflen_words, struct capref endpoint)
299{
300    errval_t err;
301
302    lmp_chan_init(lc);
303    lc->remote_cap = endpoint;
304
305    /* allocate a cap slot for the new endpoint cap */
306    err = slot_alloc(&lc->local_cap);
307    if (err_is_fail(err)) {
308        return err_push(err, LIB_ERR_SLOT_ALLOC);
309    }
310
311    /* allocate a local endpoint */
312    err = lmp_endpoint_create_in_slot(buflen_words, lc->local_cap,
313                                      &lc->endpoint);
314    if (err_is_fail(err)) {
315        slot_free(lc->local_cap);
316        return err_push(err, LIB_ERR_ENDPOINT_CREATE);
317    }
318
319    /* mark connected */
320    lc->connstate = LMP_CONNECTED;
321    return SYS_ERR_OK;
322}
323
324/**
325 * \brief Initialise a new LMP CHannel endpoint to accept incoming messages
326 *
327 * \param lc  Storage for channel state
328 * \param buflen_words Size of incoming buffer, in words
329 * \param endpoint Slot to store the local endpoint in
330 * \param iftype    the type of interface used with this ep
331 */
332errval_t lmp_chan_endpoint_create_with_iftype(struct lmp_chan *lc, size_t buflen_words,
333                                              struct capref endpoint, uint16_t iftype)
334{
335    errval_t err;
336
337    lmp_chan_init(lc);
338    lc->local_cap = endpoint;
339
340    /* allocate a local endpoint */
341    err = lmp_endpoint_create_in_slot_with_iftype(buflen_words, lc->local_cap,
342                                                  &lc->endpoint, iftype);
343    if (err_is_fail(err)) {
344        return err_push(err, LIB_ERR_ENDPOINT_CREATE);
345    }
346
347    /* mark connected */
348    lc->connstate = LMP_EP_WAIT_CAP;
349    return SYS_ERR_OK;
350}
351/**
352 * \brief Initialise a new LMP CHannel endpoint to accept incoming messages
353 *
354 * \param lc  Storage for channel state
355 * \param buflen_words Size of incoming buffer, in words
356 * \param endpoint Slot to store the local endpoint in
357 */
358errval_t lmp_chan_endpoint_create(struct lmp_chan *lc, size_t buflen_words,
359                                  struct capref endpoint)
360{
361    return lmp_chan_endpoint_create_with_iftype(lc, buflen_words, endpoint, 0);
362}
363
364
365/**
366 * \brief Register an event handler to be notified when messages can be sent
367 *
368 * In the future, call the closure on the given waitset when it is likely that
369 * a message can be sent on the channel. A channel may only be registered
370 * with a single send event handler on a single waitset at any one time.
371 *
372 * \param lc LMP channel
373 * \param ws Waitset
374 * \param closure Event handler
375 */
376errval_t lmp_chan_register_send(struct lmp_chan *lc, struct waitset *ws,
377                                 struct event_closure closure)
378{
379    assert(lc != NULL);
380    assert(ws != NULL);
381
382    errval_t err = waitset_chan_register(ws, &lc->send_waitset, closure);
383    if (err_is_fail(err)) {
384        return err;
385    }
386
387    // enqueue in list of channels with a registered event to retry sending
388    assert(lc->next == NULL && lc->prev == NULL);
389    dispatcher_handle_t handle = disp_disable();
390    struct dispatcher_generic *dp = get_dispatcher_generic(handle);
391    if (dp->lmp_send_events_list == NULL) {
392        dp->lmp_send_events_list = lc;
393        lc->next = lc->prev = lc;
394    } else {
395        lc->prev = dp->lmp_send_events_list->prev;
396        lc->next = dp->lmp_send_events_list;
397        lc->prev->next = lc;
398        lc->next->prev = lc;
399    }
400    disp_enable(handle);
401
402    return err;
403}
404
405/**
406 * \brief Cancel an event registration made with lmp_chan_register_send()
407 *
408 * \param lc LMP channel
409 */
410errval_t lmp_chan_deregister_send(struct lmp_chan *lc)
411{
412    assert(lc != NULL);
413    errval_t err = waitset_chan_deregister(&lc->send_waitset);
414    if (err_is_fail(err)) {
415        return err;
416    }
417
418    // dequeue from list of channels with send events
419    assert(lc->next != NULL && lc->prev != NULL);
420    dispatcher_handle_t handle = disp_disable();
421    struct dispatcher_generic *dp = get_dispatcher_generic(handle);
422    if (lc->next == lc->prev) {
423        assert_disabled(dp->lmp_send_events_list == lc);
424        dp->lmp_send_events_list = NULL;
425    } else {
426        lc->prev->next = lc->next;
427        lc->next->prev = lc->prev;
428        if (dp->lmp_send_events_list == lc) {
429            dp->lmp_send_events_list = lc->next;
430        }
431    }
432#ifndef NDEBUG
433    lc->prev = lc->next = NULL;
434#endif
435
436    disp_enable(handle);
437    return err;
438}
439
440/**
441 * \brief Migrate an event registration to a new waitset.
442 *
443 * \param lc LMP channel
444 * \param ws New waitset to migrate to
445 */
446void lmp_chan_migrate_send(struct lmp_chan *lc, struct waitset *ws)
447{
448    assert(lc != NULL);
449    waitset_chan_migrate(&lc->send_waitset, ws);
450}
451
452/**
453 * \brief Allocate a new receive capability slot for an LMP channel
454 *
455 * This utility function allocates a new receive slot (using #slot_alloc)
456 * and sets it on the channel (using #lmp_chan_set_recv_slot).
457 *
458 * \param lc LMP channel
459 */
460errval_t lmp_chan_alloc_recv_slot(struct lmp_chan *lc)
461{
462    struct capref slot;
463
464    errval_t err = slot_alloc(&slot);
465    if (err_is_fail(err)) {
466        return err_push(err, LIB_ERR_SLOT_ALLOC);
467    }
468
469    lmp_chan_set_recv_slot(lc, slot);
470    return SYS_ERR_OK;
471}
472
473
474/**
475 * \brief Trigger send events for all LMP channels that are registered
476 *
477 * We don't have a good way to determine when we are likely to be able
478 * to send on an LMP channel, so this function just trigger all such
479 * pending events every time the dispatcher is rescheduled.
480 *
481 * Must be called while disabled and from dispatcher logic.
482 */
483void lmp_channels_retry_send_disabled(dispatcher_handle_t handle)
484{
485    struct dispatcher_generic *dp = get_dispatcher_generic(handle);
486    struct lmp_chan *lc, *first = dp->lmp_send_events_list, *next;
487    errval_t err;
488
489    for (lc = first; lc != NULL; lc = next) {
490        next = lc->next;
491        assert(next != NULL);
492        err = waitset_chan_trigger_disabled(&lc->send_waitset, handle);
493        assert_disabled(err_is_ok(err)); // shouldn't fail
494#ifndef NDEBUG
495        lc->next = lc->prev = NULL;
496#endif
497        if (next == first) {
498            break; // wrapped
499        }
500    }
501
502    dp->lmp_send_events_list = NULL;
503}
504
505/// Initialise the LMP channel driver
506void lmp_init(void)
507{
508    struct monitor_binding *mcb = get_monitor_binding();
509    mcb->rx_vtbl.bind_lmp_reply_client = bind_lmp_reply_handler;
510    mcb->rx_vtbl.bind_lmp_service_request = bind_lmp_service_request_handler;
511}
512