1/*
2 * Copyright (c) 2011, ETH Zurich.
3 * All rights reserved.
4 *
5 * This file is distributed under the terms in the attached LICENSE file.
6 * If you do not find this file, copies can be found by writing to:
7 * ETH Zurich D-INFK, Haldeneggsteig 4, CH-8092 Zurich. Attn: Systems Group.
8 */
9
10#include <stdlib.h>
11#include <assert.h>
12#include <string.h>
13#include <stdio.h>
14
15#include "barrelfish/barrelfish.h"
16#include "thc/thc.h"
17#include "thc/thcstubs.h"
18
19#define DEBUGPRINTF debug_printf
20
21#define DEBUG_STUBS(XX)
22//#define DEBUG_STUBS(XX) do{ XX; } while (0)
23#define DEBUG_STUBS_PREFIX         "         stubs:   "
24
25//......................................................................
26//
27// Initialization
28
29void thc_init_per_binding_state(struct thc_per_binding_state_t *thc) {
30  thc_lock_init(&thc->thc_binding_lock);
31  thc_sem_init(&thc->thc_next_sender, 1);
32  thc->send_possible_event_requested = 0;
33  thc->waiting_sender = NULL;
34  thc->waiting_complete_sender = NULL;
35}
36
37void thc_init_per_recv_state(struct thc_per_recv_t *recv) {
38  recv->r = NULL;
39  thc_condvar_init(&recv->cv_bh);
40  recv->num_bh = 0;
41  recv->num_discard = 0;
42  thc_queue_init(&recv->fifo_rpc_q);
43  thc_lock_init(&recv->fifo_rpc_lock);
44  recv->fifo_rpc_next_recv = 0;
45}
46
47//......................................................................
48//
49// Bottom-half receive functions
50//
51// The bottom-half stubs work as follows:
52//
53//   1. Call thc_start_bh/thc_start_demuxable_bh.  This checks whether
54//      the call has been abandoned (no receive entry present), whether
55//      the call is in progress (receive entry present, not yet waiting),
56//      or whether the top-half is ready.
57//
58//      NULL => abandoned
59//
60//      Otherwise, this returns with the receiver, and with thc_binding_lock
61//      still held.
62//
63//   2. The Flounder-generated code deposits the parameters in the locations
64//      identified by the receiver
65//
66//   3. Call thc_end_bh to release thc_binding_lock.
67
68struct thc_receiver_info *thc_start_demuxable_bh(struct thc_per_binding_state_t *thc,
69                                                 void *common_binding,
70                                                 struct thc_per_recv_t *recv,
71                                                 uint64_t demux) {
72  assert(demux != NO_DEMUX);
73  DEBUG_STUBS(DEBUGPRINTF(DEBUG_STUBS_PREFIX " > start_demux_bh\n"));
74  thc_lock_acquire(&thc->thc_binding_lock);
75
76  bool found;
77  struct thc_receiver_info *rxi;
78  do {
79    found = false;
80    rxi = recv->r;
81    while (rxi != NULL) {
82      assert(rxi->demux != NO_DEMUX);
83      if (rxi->demux == demux) {
84        // Wait until the receiver is ready (the response to an OOO RPC
85        // may come back after they've done the send but before they
86        // block in receive).
87        while (rxi->waiter == NULL) {
88          recv->num_bh++;
89          thc_condvar_wait(&recv->cv_bh, &thc->thc_binding_lock);
90          recv->num_bh--;
91        }
92        assert(rxi->waiter != NULL);
93        found = true;
94        break;
95      }
96      rxi = rxi->next;
97    }
98    // No receiver entry present: they were canceled
99    if (!found) {
100      thc_lock_release(&thc->thc_binding_lock);
101      DEBUG_STUBS(DEBUGPRINTF(DEBUG_STUBS_PREFIX " < start_demux_bh\n"));
102      return NULL;
103    }
104  } while (!found);
105
106  DEBUG_STUBS(DEBUGPRINTF(DEBUG_STUBS_PREFIX " < start_demux_bh\n"));
107  return rxi;
108}
109
110
111//......................................................................
112//
113// Top-half receive functions
114
115// Simple receive: add rxi as the sole thc_receiver_info for this
116// message, and wait until a bottom-half wakes us via rxi->waiter.
117
118static void thc_receive0(void *s) {
119  thc_lock_t *l = (thc_lock_t*)s;
120  thc_lock_release(l);
121}
122
123errval_t thc_receive(struct thc_per_binding_state_t *thc,
124                     struct thc_per_recv_t *recv,
125                     struct thc_receiver_info *rxi)
126{
127  DEBUG_STUBS(DEBUGPRINTF(DEBUG_STUBS_PREFIX " > thc_receive\n"));
128  thc_lock_acquire(&thc->thc_binding_lock);
129
130  assert(recv->r == NULL && "thc_receiver found existing receiver");
131
132  // Install us as the receiver for this message
133  rxi->next = NULL;
134  rxi->demux = NO_DEMUX;
135  recv->r = rxi;
136
137  // Wake any bottom-half functions that are present
138  if (recv->num_bh > 0) {
139    thc_condvar_broadcast(&recv->cv_bh);
140  }
141
142  // Wait until a bottom-half provides a message
143  //
144  // We release the binding lock before blocking.  It is passed back to us
145  // by the bottom-half receive function.
146  THCSuspendThen(&rxi->waiter, thc_receive0, (void*) &thc->thc_binding_lock);
147
148  // Remove us as the receiver
149  assert(recv->r == rxi);
150  recv->r = NULL;
151
152  thc_lock_release(&thc->thc_binding_lock);
153
154  THCIncRecvCount();
155
156  DEBUG_STUBS(DEBUGPRINTF(DEBUG_STUBS_PREFIX " < thc_receive\n"));
157  return 0;
158}
159
160struct thc_receive_cancel_info {
161  struct thc_per_binding_state_t *thc;
162  struct thc_receiver_info       *rxi;
163  int                             was_canceled;
164};
165
166static void thc_receive_x_cancel_fn(void *c) {
167  struct thc_receive_cancel_info *cinf = (struct thc_receive_cancel_info *)c;
168  DEBUG_STUBS(DEBUGPRINTF(DEBUG_STUBS_PREFIX " > thc_receive_x_cancel_fn\n"));
169  thc_lock_acquire(&cinf->thc->thc_binding_lock);
170  assert(!cinf->was_canceled);
171  if (cinf->rxi->waiter == NULL) {
172    // We lost a race with an incoming message
173    thc_lock_release(&cinf->thc->thc_binding_lock);
174  } else {
175    // Cancellation got the thc_binding_lock before an
176    // incoming message arrived.
177    cinf->was_canceled = 1;
178    awe_t *awe = cinf->rxi->waiter;
179    cinf->rxi->waiter = NULL;
180    THCYieldTo(awe); // thc->thc_binding_lock passed to top-half function
181  }
182  DEBUG_STUBS(DEBUGPRINTF(DEBUG_STUBS_PREFIX " < thc_receive_x_cancel_fn\n"));
183}
184
185errval_t thc_receive_x(struct thc_per_binding_state_t *thc,
186                       struct thc_per_recv_t *recv,
187                       struct thc_receiver_info *rxi)
188{
189  struct thc_receive_cancel_info cinf;
190  cancel_item_t ci;
191  int canceled = 0;
192
193  DEBUG_STUBS(DEBUGPRINTF(DEBUG_STUBS_PREFIX " > thc_receive_x\n"));
194
195  thc_lock_acquire(&thc->thc_binding_lock);
196
197  // Return THC_CANCELED if already requested
198  if (THCIsCancelRequested()) {
199    canceled = 1;
200    goto done;
201  }
202
203  assert(recv->r == NULL && "thc_receiver found existing receiver");
204
205  // Install us as the receiver for this message
206  rxi->next = NULL;
207  rxi->demux = NO_DEMUX;
208  recv->r = rxi;
209
210  // Wake any bottom-half functions that are present
211  if (recv->num_bh > 0) {
212    thc_condvar_broadcast(&recv->cv_bh);
213  }
214
215  // Wait until a bottom-half provides a message
216  //
217  // We release the binding lock before blocking.  It is passed back to us
218  // by the bottom-half receive function or the cancelation function.
219  cinf.thc = thc;
220  cinf.rxi = rxi;
221  cinf.was_canceled = 0;
222  THCAddCancelItem(&ci, &thc_receive_x_cancel_fn, (void*)&cinf);
223  THCSuspendThen(&rxi->waiter, thc_receive0, (void*) &thc->thc_binding_lock);
224  canceled = cinf.was_canceled;
225  if (!canceled) {
226    // Remove cancel item if it did not run
227    if (!THCCancelItemRan(&ci)) {
228      THCRemoveCancelItem(&ci);
229    }
230  }
231
232  // Remove us as the receiver
233  assert(recv->r == rxi);
234  recv->r = NULL;
235
236 done:
237  thc_lock_release(&thc->thc_binding_lock);
238
239  if (!canceled) THCIncRecvCount();
240
241  DEBUG_STUBS(DEBUGPRINTF(DEBUG_STUBS_PREFIX " < thc_receive_x\n"));
242  return canceled ? THC_CANCELED : SYS_ERR_OK;
243}
244
245// Cause the next "n" messages to be discarded on receipt
246void thc_discard(struct thc_per_binding_state_t *thc,
247                 struct thc_per_recv_t *recv,
248                 uint64_t n)
249{
250  DEBUG_STUBS(DEBUGPRINTF(DEBUG_STUBS_PREFIX " > thc_discard\n"));
251  thc_lock_acquire(&thc->thc_binding_lock);
252  recv->num_discard += n;
253  if (recv->num_bh > 0) {
254    thc_condvar_broadcast(&recv->cv_bh);
255  }
256  thc_lock_release(&thc->thc_binding_lock);
257    DEBUG_STUBS(DEBUGPRINTF(DEBUG_STUBS_PREFIX " < thc_discard\n"));
258}
259
260// Demux receive: add rxi to a list of receivers, and wait until
261// a bottom-half wakes us.
262
263void thc_start_receive_demux(struct thc_per_binding_state_t *thc,
264                             struct thc_per_recv_t *recv,
265                             struct thc_receiver_info *rxi) {
266  DEBUG_STUBS(DEBUGPRINTF(DEBUG_STUBS_PREFIX " > thc_start_receive_demux\n"));
267  thc_lock_acquire(&thc->thc_binding_lock);
268
269  // Install us on the list of receivers
270  assert(rxi->demux != NO_DEMUX);
271  assert(rxi->waiter == NULL);
272  rxi->next = recv->r;
273  recv->r = rxi;
274
275  thc_lock_release(&thc->thc_binding_lock);
276  DEBUG_STUBS(DEBUGPRINTF(DEBUG_STUBS_PREFIX " < thc_start_recieve_demux\n"));
277}
278
279errval_t thc_receive_demux(struct thc_per_binding_state_t *thc,
280                           struct thc_per_recv_t *recv,
281                           struct thc_receiver_info *rxi) {
282  DEBUG_STUBS(DEBUGPRINTF(DEBUG_STUBS_PREFIX " > thc_receive_demux\n"));
283  thc_lock_acquire(&thc->thc_binding_lock);
284
285  // Wake any bottom-half functions that are present
286  if (recv->num_bh > 0) {
287    thc_condvar_broadcast(&recv->cv_bh);
288  }
289
290  // Wait until a bottom-half provides a message
291  //
292  // We release the binding lock before blocking.  It is passed back to us
293  // by the bottom-half receive function.
294
295  THCSuspendThen(&rxi->waiter, thc_receive0, (void*) &thc->thc_binding_lock);
296
297  // Remove us from the list of receivers
298  struct thc_receiver_info **rxip = (struct thc_receiver_info **) &(recv->r);
299  bool found = false;
300  while (*rxip != NULL) {
301    if (*rxip == rxi) {
302      *rxip = rxi -> next;
303      found = true;
304      break;
305    }
306    rxip = &((*rxip)->next);
307  }
308  assert(found);
309
310  thc_lock_release(&thc->thc_binding_lock);
311
312  DEBUG_STUBS(DEBUGPRINTF(DEBUG_STUBS_PREFIX " < thc_receive_demux\n"));
313  return SYS_ERR_OK;
314}
315
316errval_t thc_receive_demux_x(struct thc_per_binding_state_t *thc,
317                             struct thc_per_recv_t *recv,
318                             struct thc_receiver_info *rxi) {
319  struct thc_receiver_info **rxip;
320  struct thc_receive_cancel_info cinf;
321  cancel_item_t ci;
322  int canceled = 0;
323
324  DEBUG_STUBS(DEBUGPRINTF(DEBUG_STUBS_PREFIX " > thc_receive_demux_x\n"));
325  thc_lock_acquire(&thc->thc_binding_lock);
326
327  // Return THC_CANCELED if already requested
328  if (THCIsCancelRequested()) {
329    canceled = 1;
330    goto done;
331  }
332
333  // Wake any bottom-half functions that are present
334  if (recv->num_bh > 0) {
335    thc_condvar_broadcast(&recv->cv_bh);
336  }
337
338  // Wait until a bottom-half provides a message
339  //
340  // We release the binding lock before blocking.  It is passed back to us
341  // by the bottom-half receive function.
342  cinf.thc = thc;
343  cinf.rxi = rxi;
344  cinf.was_canceled = 0;
345  THCAddCancelItem(&ci, &thc_receive_x_cancel_fn, (void*)&cinf);
346  THCSuspendThen(&rxi->waiter, thc_receive0, (void*) &thc->thc_binding_lock);
347  canceled = cinf.was_canceled;
348  if (!canceled) {
349    // Remove cancel item if it did not run
350    if (!THCCancelItemRan(&ci)) {
351      THCRemoveCancelItem(&ci);
352    }
353  }
354
355 done:
356  // Remove us from the list of receivers
357  rxip = (struct thc_receiver_info **) &(recv->r);
358  bool found = false;
359  while (*rxip != NULL) {
360    if (*rxip == rxi) {
361      *rxip = rxi -> next;
362      found = true;
363      break;
364    }
365    rxip = &((*rxip)->next);
366  }
367  assert(found);
368
369  thc_lock_release(&thc->thc_binding_lock);
370
371  DEBUG_STUBS(DEBUGPRINTF(DEBUG_STUBS_PREFIX " < thc_receive_demux_x\n"));
372  return canceled ? THC_CANCELED : SYS_ERR_OK;
373}
374
375errval_t thc_cancel_receive_demux(struct thc_per_binding_state_t *thc,
376                                  struct thc_per_recv_t *recv,
377                                  struct thc_receiver_info *rxi) {
378  DEBUG_STUBS(DEBUGPRINTF(DEBUG_STUBS_PREFIX " > thc_cancel_receive_demux\n"));
379  thc_lock_acquire(&thc->thc_binding_lock);
380
381  // Remove us from the list of receivers
382  struct thc_receiver_info **rxip = (struct thc_receiver_info **) &(recv->r);
383  bool found = false;
384  while (*rxip != NULL) {
385    if (*rxip == rxi) {
386      *rxip = rxi -> next;
387      found = true;
388      break;
389    }
390    rxip = &((*rxip)->next);
391  }
392  assert(found);
393
394  thc_lock_release(&thc->thc_binding_lock);
395
396  DEBUG_STUBS(DEBUGPRINTF(DEBUG_STUBS_PREFIX " < thc_cancel_receive_demux\n"));
397  return THC_CANCELED;
398}
399
400//......................................................................
401
402// A receive-any works as a series of steps:
403//
404//  1. Acquire the thc_binding_lock
405//
406//  2. Add ourselves as the sole receiver for each message we are
407//     interested in
408//
409//  3. Block on rxi->cv
410//
411//  4. Remove ourselves from the messages we are interested in
412//
413//  5. Release the thc_binding_lock
414//
415// The flounder-generated code constructs this series of calls,
416// doing steps 2 and 4 for different messages.
417
418void thc_start_receive_any(struct thc_per_binding_state_t *thc) {
419  DEBUG_STUBS(DEBUGPRINTF(DEBUG_STUBS_PREFIX " > thc_start_receive_any\n"));
420  thc_lock_acquire(&thc->thc_binding_lock);
421  DEBUG_STUBS(DEBUGPRINTF(DEBUG_STUBS_PREFIX " < thc_start_receive_any\n"));
422}
423
424void thc_start_receiving(struct thc_per_binding_state_t *thc,
425                         struct thc_per_recv_t *recv,
426                         struct thc_receiver_info *rxi) {
427  DEBUG_STUBS(DEBUGPRINTF(DEBUG_STUBS_PREFIX " > thc_start_receiving\n"));
428  // Install ourselves to receive the message
429  assert(recv->r == NULL &&
430         "receive_any attempted for message with pending receive");
431  recv->r = rxi;
432  rxi->next = NULL;
433  rxi->demux = NO_DEMUX;
434
435  // Wake any bottom-half functions present for this message
436  if (recv->num_bh > 0) {
437    thc_condvar_signal(&recv->cv_bh);
438  }
439  DEBUG_STUBS(DEBUGPRINTF(DEBUG_STUBS_PREFIX " < thc_start_receiving\n"));
440}
441
442static void thc_wait_receive_any0(void *s) {
443  thc_lock_t *l = (thc_lock_t*)s;
444  thc_lock_release(l);
445}
446
447void thc_wait_receive_any(struct thc_per_binding_state_t *thc,
448                          struct thc_receiver_info *rxi) {
449  // We release the binding lock before blocking.  It is passed back to us
450  // by the bottom-half receive function.
451  DEBUG_STUBS(DEBUGPRINTF(DEBUG_STUBS_PREFIX " > thc_wait_receive_any\n"));
452  THCSuspendThen(&rxi->waiter,
453                 thc_wait_receive_any0,
454                 (void*)&thc->thc_binding_lock);
455  THCIncRecvCount();
456  DEBUG_STUBS(DEBUGPRINTF(DEBUG_STUBS_PREFIX " < thc_wait_receive_any\n"));
457}
458
459errval_t thc_wait_receive_any_x(struct thc_per_binding_state_t *thc,
460                                struct thc_receiver_info *rxi) {
461  struct thc_receive_cancel_info cinf;
462  cancel_item_t ci;
463  int canceled = 0;
464  DEBUG_STUBS(DEBUGPRINTF(DEBUG_STUBS_PREFIX " > thc_wait_receive_any_x\n"));
465  // We release the binding lock before blocking.  It is passed back to us
466  // by the bottom-half receive function, or by the cancel action.
467  cinf.thc = thc;
468  cinf.rxi = rxi;
469  cinf.was_canceled = 0;
470  THCAddCancelItem(&ci, &thc_receive_x_cancel_fn, (void*)&cinf);
471  THCSuspendThen(&rxi->waiter,
472                 thc_wait_receive_any0,
473                 (void*)&thc->thc_binding_lock);
474  canceled = cinf.was_canceled;
475  if (!canceled) {
476    // Remove cancel item if it did not run
477    if (!THCCancelItemRan(&ci)) {
478      THCRemoveCancelItem(&ci);
479    }
480  }
481  if (!canceled) THCIncRecvCount();
482  DEBUG_STUBS(DEBUGPRINTF(DEBUG_STUBS_PREFIX " < thc_wait_receive_any\n"));
483  return canceled ? THC_CANCELED : SYS_ERR_OK;
484}
485
486void thc_stop_receiving(struct thc_per_binding_state_t *thc,
487
488                        struct thc_per_recv_t *recv,
489                        struct thc_receiver_info *rxi) {
490  DEBUG_STUBS(DEBUGPRINTF(DEBUG_STUBS_PREFIX " > thc_stop_receiving\n"));
491  assert(recv->r == rxi);
492  assert(rxi->next == NULL);
493  recv->r = NULL;
494  DEBUG_STUBS(DEBUGPRINTF(DEBUG_STUBS_PREFIX " < thc_stop_receiving\n"));
495}
496
497void thc_end_receive_any(struct thc_per_binding_state_t *thc) {
498  DEBUG_STUBS(DEBUGPRINTF(DEBUG_STUBS_PREFIX " > thc_end_receive_any\n"));
499  thc_lock_release(&thc->thc_binding_lock);
500  DEBUG_STUBS(DEBUGPRINTF(DEBUG_STUBS_PREFIX " < thc_end_receive_any\n"));
501}
502
503//......................................................................
504//
505// Synchronization between senders
506
507struct thc_await_send_cancel_info {
508  struct thc_per_binding_state_t *thc;
509  int                             was_canceled;
510};
511
512static void thc_send_possible_event(void *arg) {
513  struct common_binding *b = (struct common_binding*) arg;
514  struct thc_per_binding_state_t *thc;
515  DEBUG_STUBS(DEBUGPRINTF(DEBUG_STUBS_PREFIX " > thc_send_possible_event\n"));
516  thc = (struct thc_per_binding_state_t *)(b->st);
517  thc_lock_acquire(&thc->thc_binding_lock);
518  thc->send_possible_event_requested = 0;
519  awe_t *awe = thc->waiting_sender;
520  if (awe != NULL) {
521    // Sender waiting
522    thc->waiting_sender = NULL;
523    THCSchedule(awe); // thc_binding_lock passed to sender
524  } else {
525    // No sender waiting (because they were canceled)
526    thc_lock_release(&thc->thc_binding_lock);
527  }
528  DEBUG_STUBS(DEBUGPRINTF(DEBUG_STUBS_PREFIX " < thc_send_possible_event\n"));
529}
530
531static void thc_await_send_x_cancel_fn(void *c) {
532  struct thc_await_send_cancel_info *cinf = (struct thc_await_send_cancel_info*)c;
533  struct thc_per_binding_state_t *thc = cinf->thc;
534  DEBUG_STUBS(DEBUGPRINTF(DEBUG_STUBS_PREFIX " > thc_await_send_x_cancel_fn\n"));
535  thc_lock_acquire(&thc->thc_binding_lock);
536  if (thc->waiting_sender == NULL) {
537    // We lost a race with an incoming send_possible event
538    thc_lock_release(&thc->thc_binding_lock);
539  } else {
540    // Cancellation got the thc_binding_lock before an
541    // incoming send_possible event
542    assert(!cinf->was_canceled);
543    cinf->was_canceled = 1;
544    awe_t *awe = cinf->thc->waiting_sender;
545    cinf->thc->waiting_sender = NULL;
546    THCYieldTo(awe); // thc->thc_binding_lock passed to await_send_x;
547  }
548  DEBUG_STUBS(DEBUGPRINTF(DEBUG_STUBS_PREFIX " < thc_await_send_x_cancel_fn\n"));
549}
550
551static void thc_await_send0(void *s) {
552  thc_lock_t *l = (thc_lock_t*)s;
553  thc_lock_release(l);
554}
555
556
557void thc_await_send(struct thc_per_binding_state_t *thc,
558                    void *f) {
559  struct common_binding *c = (struct common_binding *)f;
560  DEBUG_STUBS(DEBUGPRINTF(DEBUG_STUBS_PREFIX " > thc_await_send\n"));
561  // Synchronize with thc_send_possible_event callback
562  thc_lock_acquire(&thc->thc_binding_lock);
563
564  // Request an event when sending is possible
565  if (!thc->send_possible_event_requested) {
566    errval_t err = c->register_send(c,
567                                    get_default_waitset(),
568                                    MKCONT(thc_send_possible_event, c));
569    if (err == FLOUNDER_ERR_TX_BUSY) {
570      goto done;
571    }
572
573    assert(err_is_ok(err));
574    thc->send_possible_event_requested = 1;
575  }
576
577  // Wait
578  //
579  // We release the binding lock before blocking.  It is passed back to us
580  // by the notification
581
582  THCSuspendThen(&thc->waiting_sender,
583                 thc_await_send0,
584                 (void*) &thc->thc_binding_lock);
585
586 done:
587  thc_lock_release(&thc->thc_binding_lock);
588  DEBUG_STUBS(DEBUGPRINTF(DEBUG_STUBS_PREFIX " > thc_await_send\n"));
589}
590
591errval_t thc_await_send_x(struct thc_per_binding_state_t *thc,
592                          void *f) {
593  struct thc_await_send_cancel_info cinf;
594  cancel_item_t ci;
595  int canceled = 0;
596  struct common_binding *c = (struct common_binding *)f;
597  DEBUG_STUBS(DEBUGPRINTF(DEBUG_STUBS_PREFIX " > thc_await_send_x\n"));
598
599  // Synchronize with thc_send_possible_event callback
600  thc_lock_acquire(&thc->thc_binding_lock);
601
602  // Return THC_CANCELED if already requested
603  if (THCIsCancelRequested()) {
604    canceled = 1;
605    goto done;
606  }
607
608  // Request an event when sending is possible
609  if (!thc->send_possible_event_requested) {
610    errval_t err = c->register_send(c,
611                                    get_default_waitset(),
612                                    MKCONT(thc_send_possible_event, c));
613    if (err == FLOUNDER_ERR_TX_BUSY) {
614      goto done;
615    }
616
617    assert(err_is_ok(err));
618    thc->send_possible_event_requested = 1;
619  }
620
621  // Wait
622  //
623  // We release the binding lock before blocking.  It is passed back to us
624  // by the notification
625
626  cinf.thc = thc;
627  cinf.was_canceled = 0;
628  THCAddCancelItem(&ci, &thc_await_send_x_cancel_fn, (void*)&cinf);
629  THCSuspendThen(&thc->waiting_sender,
630                 thc_await_send0,
631                 (void*) &thc->thc_binding_lock);
632  canceled = cinf.was_canceled;
633  if (!canceled) {
634    // Remove cancel item if it did not run
635    if (!THCCancelItemRan(&ci)) {
636      THCRemoveCancelItem(&ci);
637    }
638  }
639
640 done:
641  thc_lock_release(&thc->thc_binding_lock);
642
643  DEBUG_STUBS(DEBUGPRINTF(DEBUG_STUBS_PREFIX " < thc_await_send\n"));
644  return canceled ? THC_CANCELED : SYS_ERR_OK;
645}
646
647// Send-completion callback.  This synchronizes with thc_complete_send
648// so that a THC synchronous send does not return until the
649// message has actually been sent.
650
651void thc_complete_send_cb(void *f) {
652  struct common_binding *c = (struct common_binding *)f;
653  struct thc_per_binding_state_t *thc;
654  thc = (struct thc_per_binding_state_t *)(c->st);
655  DEBUG_STUBS(DEBUGPRINTF(DEBUG_STUBS_PREFIX " > thc_complete_send_cb\n"));
656
657  thc_lock_acquire(&thc->thc_binding_lock);
658
659  if (thc->waiting_complete_sender) {
660    awe_t *awe = thc->waiting_complete_sender;
661    thc->waiting_complete_sender = NULL;
662    THCSchedule(awe); // thc->thc_binding_lock passed to thc_complete_send
663  } else {
664    thc->thc_send_complete = 1;
665    thc_lock_release(&thc->thc_binding_lock);
666  }
667  DEBUG_STUBS(DEBUGPRINTF(DEBUG_STUBS_PREFIX " > thc_complete_send_cb\n"));
668}
669
670static void thc_complete_send0(void *s) {
671  thc_lock_t *l = (thc_lock_t*)s;
672  thc_lock_release(l);
673}
674
675void thc_complete_send(struct thc_per_binding_state_t *thc,
676                       void *f) {
677  DEBUG_STUBS(DEBUGPRINTF(DEBUG_STUBS_PREFIX " > thc_complete_send\n"));
678  // There is at most one sender, so if we are waiting to complete then
679  // nobody else should be waiting for a can-send callback
680  assert(thc->waiting_sender == NULL);
681
682  thc_lock_acquire(&thc->thc_binding_lock);
683
684  if (!thc->thc_send_complete) {
685    // Send completion callback has not yet executed: wait
686    THCSuspendThen(&thc->waiting_complete_sender,
687                   thc_complete_send0,
688                   (void*) &thc->thc_binding_lock);
689  }
690
691  thc_lock_release(&thc->thc_binding_lock);
692  DEBUG_STUBS(DEBUGPRINTF(DEBUG_STUBS_PREFIX " < thc_complete_send\n"));
693}
694
695
696
697
698