1/**
2 * \file
3 * \brief Publish/Subscribe client API implementation
4 *
5 * The individual handler functions are stored in a function table on the
6 * client side. The API provides convenience functions for subscribe/
7 * unsubscribe and publish.
8 *
9 */
10
11/*
12 * Copyright (c) 2011, ETH Zurich.
13 * All rights reserved.
14 *
15 * This file is distributed under the terms in the attached LICENSE file.
16 * If you do not find this file, copies can be found by writing to:
17 * ETH Zurich D-INFK, Universitaetstrasse 6, CH-8092 Zurich. Attn: Systems Group.
18 */
19
20#include <assert.h>
21
22#include <barrelfish/barrelfish.h>
23#include <barrelfish/threads.h>
24
25#include <if/octopus_defs.h>
26#include <if/octopus_thc.h>
27
28#include <octopus/init.h>
29#include <octopus/pubsub.h>
30
31#include "common.h"
32#include "handler.h"
33
34void subscription_handler(struct octopus_binding *b, subscription_t id,
35        uint64_t fn, octopus_mode_t mode, const char *record,
36        uint64_t st)
37{
38
39    // XXX: Probably send some offset around and use 32bit in flounder?
40    // XXX: The casting to uintptr_t is for 32-bit archs
41    subscription_handler_fn handler_fn = (subscription_handler_fn)(uintptr_t)fn;
42    void* state = (void*)(uintptr_t)st;
43
44    if (handler_fn != NULL) {
45        handler_fn(mode, record, state);
46    }
47    else {
48        fprintf(stderr, "Incoming subscription(%"PRIu64") for %s with unset handler function.",
49                id, record);
50    }
51}
52
53/**
54 * \brief Subscribe for a given type of message.
55 *
56 * \param[in] function Handler function in case a matching record is
57 * published.
58 * \param[in] state State passed on to handler function.
59 * \param[out] id Id of the subscription. In case of the value is undefined.
60 * \param query What type of records you want to subscribe.
61 * \param ... Additional arguments to format the record using vsprintf.
62 *
63 * \retval SYS_ERR_OK
64 * \retval OCT_ERR_MAX_SUBSCRIPTIONS
65 * \retval OCT_ERR_PARSER_FAIL
66 * \retval OCT_ERR_ENGINE_FAIL
67 */
68errval_t oct_subscribe(subscription_handler_fn function, const void *state,
69        subscription_t *id, const char *query, ...)
70{
71    assert(function != NULL);
72    assert(query != NULL);
73    assert(id != NULL);
74
75    va_list args;
76    errval_t err = SYS_ERR_OK;
77
78    char* buf = NULL;
79    FORMAT_QUERY(query, args, buf);
80
81    // send to skb
82    struct octopus_thc_client_binding_t* cl = oct_get_thc_client();
83    errval_t error_code;
84
85    uint64_t fl_function = 0;
86    uint64_t fl_state = 0;
87
88    fl_function = (uint64_t)(uintptr_t)function;
89    fl_state = (uint64_t)(uintptr_t)state;
90
91    err = cl->call_seq.subscribe(cl, buf, fl_function,
92            fl_state, id, &error_code); // XXX: Sending Pointer as uint64
93    if (err_is_ok(err)) {
94        err = error_code;
95    }
96
97    free(buf);
98    return err;
99}
100
101/**
102 * \brief Unsubscribes a subscription.
103 *
104 * \param id Id of the subscription (as provided by oct_subscribe).
105 *
106 * \retval SYS_ERR_OK
107 * \retval OCT_ERR_PARSER_FAIL
108 * \retval OCT_ERR_ENGINE_FAIL
109 */
110errval_t oct_unsubscribe(subscription_t id)
111{
112    // send to skb
113    struct octopus_thc_client_binding_t* cl = oct_get_thc_client();
114    errval_t error_code;
115    errval_t err = cl->call_seq.unsubscribe(cl, id, &error_code);
116    if (err_is_ok(err)) {
117        err = error_code;
118    }
119
120    return err;
121}
122
123/**
124 * \brief Publishes a record.
125 *
126 * \param record The record to publish.
127 * \param ... Additional arguments to format the record using vsprintf.
128 *
129 * \retval SYS_ERR_OK
130 * \retval OCT_ERR_PARSER_FAIL
131 * \retval OCT_ERR_ENGINE_FAIL
132 */
133errval_t oct_publish(const char *record, ...)
134{
135    assert(record != NULL);
136
137    va_list args;
138    errval_t err = SYS_ERR_OK;
139
140    char *buf = NULL;
141    FORMAT_QUERY(record, args, buf);
142
143    struct octopus_thc_client_binding_t* cl = oct_get_thc_client();
144    errval_t error_code;
145    err = cl->call_seq.publish(cl, buf, &error_code);
146    if(err_is_ok(err)) {
147        err = error_code;
148    }
149
150    free(buf);
151    return err;
152}
153