1/** 2 * \file 3 * \brief Publish/Subscribe client API implementation 4 * 5 * The individual handler functions are stored in a function table on the 6 * client side. The API provides convenience functions for subscribe/ 7 * unsubscribe and publish. 8 * 9 */ 10 11/* 12 * Copyright (c) 2011, ETH Zurich. 13 * All rights reserved. 14 * 15 * This file is distributed under the terms in the attached LICENSE file. 16 * If you do not find this file, copies can be found by writing to: 17 * ETH Zurich D-INFK, Universitaetstr. 6, CH-8092 Zurich. Attn: Systems Group. 18 */ 19 20#include <assert.h> 21 22#include <barrelfish/barrelfish.h> 23#include <barrelfish/threads.h> 24 25#include <if/octopus_defs.h> 26#include <if/octopus_thc.h> 27 28#include <octopus/init.h> 29#include <octopus/pubsub.h> 30 31#include "common.h" 32#include "handler.h" 33 34void subscription_handler(struct octopus_binding *b, subscription_t id, 35 uint64_t fn, octopus_mode_t mode, const char *record, 36 uint64_t st) 37{ 38 39 // XXX: Probably send some offset around and use 32bit in flounder? 40 // XXX: The casting to uintptr_t is for 32-bit archs 41 subscription_handler_fn handler_fn = (subscription_handler_fn)(uintptr_t)fn; 42 void* state = (void*)(uintptr_t)st; 43 44 if (handler_fn != NULL) { 45 handler_fn(mode, record, state); 46 } 47 else { 48 fprintf(stderr, "Incoming subscription(%"PRIu64") for %s with unset handler function.", 49 id, record); 50 } 51} 52 53/** 54 * \brief Subscribe for a given type of message. 55 * 56 * \param[in] function Handler function in case a matching record is 57 * published. 58 * \param[in] state State passed on to handler function. 59 * \param[out] id Id of the subscription. In case of the value is undefined. 60 * \param query What type of records you want to subscribe. 61 * \param ... Additional arguments to format the record using vsprintf. 62 * 63 * \retval SYS_ERR_OK 64 * \retval OCT_ERR_MAX_SUBSCRIPTIONS 65 * \retval OCT_ERR_PARSER_FAIL 66 * \retval OCT_ERR_ENGINE_FAIL 67 */ 68errval_t oct_subscribe(subscription_handler_fn function, const void *state, 69 subscription_t *id, const char *query, ...) 70{ 71 assert(function != NULL); 72 assert(query != NULL); 73 assert(id != NULL); 74 75 va_list args; 76 errval_t err = SYS_ERR_OK; 77 78 char* buf = NULL; 79 FORMAT_QUERY(query, args, buf); 80 81 // send to skb 82 struct octopus_thc_client_binding_t* cl = oct_get_thc_client(); 83 errval_t error_code; 84 85 uint64_t fl_function = 0; 86 uint64_t fl_state = 0; 87 88 fl_function = (uint64_t)(uintptr_t)function; 89 fl_state = (uint64_t)(uintptr_t)state; 90 91 err = cl->call_seq.subscribe(cl, buf, fl_function, 92 fl_state, id, &error_code); // XXX: Sending Pointer as uint64 93 if (err_is_ok(err)) { 94 err = error_code; 95 } 96 97 free(buf); 98 return err; 99} 100 101/** 102 * \brief Unsubscribes a subscription. 103 * 104 * \param id Id of the subscription (as provided by oct_subscribe). 105 * 106 * \retval SYS_ERR_OK 107 * \retval OCT_ERR_PARSER_FAIL 108 * \retval OCT_ERR_ENGINE_FAIL 109 */ 110errval_t oct_unsubscribe(subscription_t id) 111{ 112 // send to skb 113 struct octopus_thc_client_binding_t* cl = oct_get_thc_client(); 114 errval_t error_code; 115 errval_t err = cl->call_seq.unsubscribe(cl, id, &error_code); 116 if (err_is_ok(err)) { 117 err = error_code; 118 } 119 120 return err; 121} 122 123/** 124 * \brief Publishes a record. 125 * 126 * \param record The record to publish. 127 * \param ... Additional arguments to format the record using vsprintf. 128 * 129 * \retval SYS_ERR_OK 130 * \retval OCT_ERR_PARSER_FAIL 131 * \retval OCT_ERR_ENGINE_FAIL 132 */ 133errval_t oct_publish(const char *record, ...) 134{ 135 assert(record != NULL); 136 137 va_list args; 138 errval_t err = SYS_ERR_OK; 139 140 char *buf = NULL; 141 FORMAT_QUERY(record, args, buf); 142 143 struct octopus_thc_client_binding_t* cl = oct_get_thc_client(); 144 errval_t error_code; 145 err = cl->call_seq.publish(cl, buf, &error_code); 146 if(err_is_ok(err)) { 147 err = error_code; 148 } 149 150 free(buf); 151 return err; 152} 153