1/** 2 * \file 3 * \brief Tests for octopus publish/subscribe API 4 */ 5 6/* 7 * Copyright (c) 2011, ETH Zurich. 8 * All rights reserved. 9 * 10 * This file is distributed under the terms in the attached LICENSE file. 11 * If you do not find this file, copies can be found by writing to: 12 * ETH Zurich D-INFK, Haldeneggsteig 4, CH-8092 Zurich. Attn: Systems Group. 13 */ 14 15#include <stdlib.h> 16#include <string.h> 17#include <stdio.h> 18 19#include <octopus/octopus.h> 20 21#include "common.h" 22 23static const char* barrier_name = "d2pubsub_test"; 24static struct thread_sem ts; 25 26static void message_handler(oct_mode_t mode, char* record, void* st) 27{ 28 size_t* received = (size_t*) st; 29 30 if (mode & OCT_ON_PUBLISH) { 31 static const char* receive_order[] = 32 { "msg_2", "msg_4", "msg_5", "msg_5", "msg_6", "msg_7" }; 33 char* name = NULL; 34 35 debug_printf("Message: %s received: %zx\n", record, *received); 36 37 errval_t err = oct_read(record, "%s", &name); 38 ASSERT_ERR_OK(err); 39 ASSERT_STRING(receive_order[*received], name); 40 41 42 free(name); 43 free(record); 44 } 45 else if (mode & OCT_REMOVED) { 46 debug_printf("OCT_REMOVED set...\n"); 47 } 48 49 (*received)++; 50} 51 52static void subscriber(void) 53{ 54 errval_t err; 55 subscription_t id1 = 0; 56 subscription_t id2 = 0; 57 subscription_t id3 = 0; 58 subscription_t id4 = 0; 59 size_t received = 0; 60 char* barrier_record = NULL; 61 62 thread_sem_init(&ts, 0); 63 64 err = oct_subscribe(message_handler, &received, &id1, "111 [] attr: 10 }"); 65 ASSERT_ERR(err, OCT_ERR_PARSER_FAIL); 66 67 err = oct_subscribe(message_handler, &received, &id1, 68 "_ { fl: 1.01, attr: 10 }"); 69 ASSERT_ERR_OK(err); 70 debug_printf("id is: %"PRIu64"\n", id1); 71 72 char* str = "test.txt"; 73 err = oct_subscribe(message_handler, &received, &id2, "_ { str: r'%s' }", 74 str); 75 ASSERT_ERR_OK(err); 76 debug_printf("id is: %"PRIu64"\n", id2); 77 78 err = oct_subscribe(message_handler, &received, &id3, "_ { age > %d }", 79 9); 80 ASSERT_ERR_OK(err); 81 debug_printf("id is: %"PRIu64"\n", id3); 82 83 err = oct_subscribe(message_handler, &received, &id4, 84 "r'^msg_(6|7)$'"); 85 ASSERT_ERR_OK(err); 86 debug_printf("id is: %"PRIu64"\n", id4); 87 88 // Synchronize with publisher 89 err = oct_barrier_enter(barrier_name, &barrier_record, 2); 90 if (err_is_fail(err)) DEBUG_ERR(err, "barrier enter"); 91 assert(err_is_ok(err)); 92 93 // Wait until all messages received 94 while(received != 6) { 95 messages_wait_and_handle_next(); 96 } 97 98 // Unsubscribe message handlers 99 err = oct_unsubscribe(id1); 100 ASSERT_ERR_OK(err); 101 err = oct_unsubscribe(id2); 102 ASSERT_ERR_OK(err); 103 err = oct_unsubscribe(id3); 104 ASSERT_ERR_OK(err); 105 err = oct_unsubscribe(id4); 106 ASSERT_ERR_OK(err); 107 108 while(received != 10) { 109 messages_wait_and_handle_next(); 110 } 111 112 oct_barrier_leave(barrier_record); 113 free(barrier_record); 114 115 printf("Subscriber all done.\n"); 116} 117 118static void publisher(void) 119{ 120 errval_t err; 121 char* barrier_record = NULL; 122 123 // Synchronize with subscriber 124 err = oct_barrier_enter(barrier_name, &barrier_record, 2); 125 if (err_is_fail(err)) DEBUG_ERR(err, "barrier enter"); 126 assert(err_is_ok(err)); 127 128 err = oct_publish("msg_1 { age: %d }", 9); 129 ASSERT_ERR_OK(err); 130 131 err = oct_publish("msg_2 { age: %d }", 10); 132 ASSERT_ERR_OK(err); 133 134 err = oct_publish("msg_3 { str: %d, age: '%d' }", 123, 8); 135 ASSERT_ERR_OK(err); 136 137 err = oct_publish("msg_4 { str: 'test.txt' }"); 138 ASSERT_ERR_OK(err); 139 140 err = oct_publish("msg_5 { str: 'test.txt', attr: 10, fl: 1.01 }"); 141 ASSERT_ERR_OK(err); 142 143 err = oct_publish("msg_6 { type: 'test', pattern: '123123' }"); 144 ASSERT_ERR_OK(err); 145 146 err = oct_publish("msg_7 { type: 'test' }"); 147 ASSERT_ERR_OK(err); 148 149 oct_barrier_leave(barrier_record); 150 free(barrier_record); 151 152 printf("Publisher all done.\n"); 153} 154 155int main(int argc, char** argv) 156{ 157 oct_init(); 158 assert(argc >= 2); 159 160 if (strcmp(argv[1], "subscriber") == 0) { 161 subscriber(); 162 } else if (strcmp(argv[1], "publisher") == 0) { 163 publisher(); 164 } else { 165 printf("Bad arguments (Valid choices are subscriber/publisher)."); 166 } 167 168 return EXIT_SUCCESS; 169} 170 171