1/**
2 * \file
3 * \brief Tests for octopus publish/subscribe API
4 */
5
6/*
7 * Copyright (c) 2011, ETH Zurich.
8 * All rights reserved.
9 *
10 * This file is distributed under the terms in the attached LICENSE file.
11 * If you do not find this file, copies can be found by writing to:
12 * ETH Zurich D-INFK, Haldeneggsteig 4, CH-8092 Zurich. Attn: Systems Group.
13 */
14
15#include <stdlib.h>
16#include <string.h>
17#include <stdio.h>
18
19#include <octopus/octopus.h>
20
21#include "common.h"
22
23static const char* barrier_name = "d2pubsub_test";
24static struct thread_sem ts;
25
26static void message_handler(oct_mode_t mode, char* record, void* st)
27{
28    size_t* received = (size_t*) st;
29
30    if (mode & OCT_ON_PUBLISH) {
31        static const char* receive_order[] =
32        { "msg_2", "msg_4", "msg_5", "msg_5", "msg_6", "msg_7" };
33        char* name = NULL;
34
35        debug_printf("Message: %s received: %zx\n", record, *received);
36
37        errval_t err = oct_read(record, "%s", &name);
38        ASSERT_ERR_OK(err);
39        ASSERT_STRING(receive_order[*received], name);
40
41
42        free(name);
43        free(record);
44    }
45    else if (mode & OCT_REMOVED) {
46        debug_printf("OCT_REMOVED set...\n");
47    }
48
49    (*received)++;
50}
51
52static void subscriber(void)
53{
54    errval_t err;
55    subscription_t id1 = 0;
56    subscription_t id2 = 0;
57    subscription_t id3 = 0;
58    subscription_t id4 = 0;
59    size_t received = 0;
60    char* barrier_record = NULL;
61
62    thread_sem_init(&ts, 0);
63
64    err = oct_subscribe(message_handler, &received, &id1, "111 [] attr: 10 }");
65    ASSERT_ERR(err, OCT_ERR_PARSER_FAIL);
66
67    err = oct_subscribe(message_handler, &received, &id1,
68            "_ { fl: 1.01, attr: 10 }");
69    ASSERT_ERR_OK(err);
70    debug_printf("id is: %"PRIu64"\n", id1);
71
72    char* str = "test.txt";
73    err = oct_subscribe(message_handler, &received, &id2, "_ { str: r'%s' }",
74            str);
75    ASSERT_ERR_OK(err);
76    debug_printf("id is: %"PRIu64"\n", id2);
77
78    err = oct_subscribe(message_handler, &received, &id3, "_ { age > %d }",
79            9);
80    ASSERT_ERR_OK(err);
81    debug_printf("id is: %"PRIu64"\n", id3);
82
83    err = oct_subscribe(message_handler, &received, &id4,
84            "r'^msg_(6|7)$'");
85    ASSERT_ERR_OK(err);
86    debug_printf("id is: %"PRIu64"\n", id4);
87
88    // Synchronize with publisher
89    err = oct_barrier_enter(barrier_name, &barrier_record, 2);
90    if (err_is_fail(err)) DEBUG_ERR(err, "barrier enter");
91    assert(err_is_ok(err));
92
93    // Wait until all messages received
94    while(received != 6) {
95        messages_wait_and_handle_next();
96    }
97
98    // Unsubscribe message handlers
99    err = oct_unsubscribe(id1);
100    ASSERT_ERR_OK(err);
101    err = oct_unsubscribe(id2);
102    ASSERT_ERR_OK(err);
103    err = oct_unsubscribe(id3);
104    ASSERT_ERR_OK(err);
105    err = oct_unsubscribe(id4);
106    ASSERT_ERR_OK(err);
107
108    while(received != 10) {
109        messages_wait_and_handle_next();
110    }
111
112    oct_barrier_leave(barrier_record);
113    free(barrier_record);
114
115    printf("Subscriber all done.\n");
116}
117
118static void publisher(void)
119{
120    errval_t err;
121    char* barrier_record = NULL;
122
123    // Synchronize with subscriber
124    err = oct_barrier_enter(barrier_name, &barrier_record, 2);
125    if (err_is_fail(err)) DEBUG_ERR(err, "barrier enter");
126    assert(err_is_ok(err));
127
128    err = oct_publish("msg_1 { age: %d }", 9);
129    ASSERT_ERR_OK(err);
130
131    err = oct_publish("msg_2 { age: %d }", 10);
132    ASSERT_ERR_OK(err);
133
134    err = oct_publish("msg_3 { str: %d, age: '%d' }", 123, 8);
135    ASSERT_ERR_OK(err);
136
137    err = oct_publish("msg_4 { str: 'test.txt' }");
138    ASSERT_ERR_OK(err);
139
140    err = oct_publish("msg_5 { str: 'test.txt', attr: 10, fl: 1.01 }");
141    ASSERT_ERR_OK(err);
142
143    err = oct_publish("msg_6 { type: 'test', pattern: '123123' }");
144    ASSERT_ERR_OK(err);
145
146    err = oct_publish("msg_7 { type: 'test' }");
147    ASSERT_ERR_OK(err);
148
149    oct_barrier_leave(barrier_record);
150    free(barrier_record);
151
152    printf("Publisher all done.\n");
153}
154
155int main(int argc, char** argv)
156{
157    oct_init();
158    assert(argc >= 2);
159
160    if (strcmp(argv[1], "subscriber") == 0) {
161        subscriber();
162    } else if (strcmp(argv[1], "publisher") == 0) {
163        publisher();
164    } else {
165        printf("Bad arguments (Valid choices are subscriber/publisher).");
166    }
167
168    return EXIT_SUCCESS;
169}
170
171