1/**
2 * \file
3 * \brief Contains handler functions for server-side octopus interface RPC call.
4 */
5
6/*
7 * Copyright (c) 2009, 2010, 2012, ETH Zurich.
8 * All rights reserved.
9 *
10 * This file is distributed under the terms in the attached LICENSE file.
11 * If you do not find this file, copies can be found by writing to:
12 * ETH Zurich D-INFK, Universitaetstr. 6, CH-8092 Zurich. Attn: Systems Group.
13 */
14
15#include <stdio.h>
16#include <string.h>
17
18#include <barrelfish/barrelfish.h>
19#include <barrelfish/nameservice_client.h>
20#include <skb/skb.h> // read list
21#include <if/octopus_defs.h>
22
23#include <octopus_server/service.h>
24#include <octopus_server/query.h>
25#include <octopus_server/debug.h>
26
27#include <octopus/parser/ast.h>
28#include <octopus/definitions.h>
29
30#include <bench/bench.h>
31
32#include "queue.h"
33
34/**
35 * Name prefix used to by the functions set_with_idcap_handler() and
36 * get_with_idcap_handler() to store and retrieve records by idcap.
37 *
38 * This essentially emulates a dedicated namespace for records stored with an
39 * id cap. Octopus and the SKB do not support dedicated namespaces atm.
40 * FIXME: store records set with the function 'set_with_idcap' in a dedicated
41 *        namespace.
42 */
43#define IDCAPID_NAME_PREFIX "idcapid."
44
45static uint64_t current_id = 1;
46
47static inline errval_t check_query_length(const char* query) {
48    if (strlen(query) >= MAX_QUERY_LENGTH) {
49        return OCT_ERR_QUERY_SIZE;
50    }
51
52    return SYS_ERR_OK;
53}
54
55errval_t new_oct_reply_state(struct oct_reply_state** drt,
56        oct_reply_handler_fn reply_handler)
57{
58    assert(*drt == NULL);
59    *drt = malloc(sizeof(struct oct_reply_state));
60    if (*drt == NULL) {
61        return LIB_ERR_MALLOC_FAIL;
62    }
63
64    //memset(*drt, 0, sizeof(struct oct_reply_state));
65    (*drt)->query_state.std_out.buffer[0] = '\0';
66    (*drt)->query_state.std_out.length = 0;
67    (*drt)->query_state.std_err.buffer[0] = '\0';
68    (*drt)->query_state.std_err.length = 0;
69
70    (*drt)->binding = 0;
71    (*drt)->return_record = false;
72    (*drt)->error = 0;
73
74    // For set_watch()
75    (*drt)->mode = 0;
76    (*drt)->client_state = 0;
77    (*drt)->client_handler = 0;
78    (*drt)->server_id = 0;
79
80    (*drt)->reply = reply_handler;
81    (*drt)->next = NULL;
82
83    return SYS_ERR_OK;
84}
85
86static void free_oct_reply_state(void* arg)
87{
88    if (arg != NULL) {
89        struct oct_reply_state* drt = (struct oct_reply_state*) arg;
90        // In case we have to free things in oct_reply_state, free here...
91
92        free(drt);
93    } else {
94        assert(!"free_reply_state with NULL argument?");
95    }
96}
97
98static void trigger_send_handler(struct octopus_binding* b,
99        struct oct_reply_state* drs)
100{
101    char* record = drs->query_state.std_out.buffer[0] != '\0' ?
102            drs->query_state.std_out.buffer : NULL;
103
104    errval_t err;
105    err = b->tx_vtbl.trigger(b, MKCONT(free_oct_reply_state, drs),
106            drs->server_id,
107            drs->client_handler,
108            drs->mode,
109            record,
110            drs->client_state);
111    if (err_is_fail(err)) {
112        if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
113            oct_rpc_enqueue_reply(b, drs);
114            return;
115        }
116        USER_PANIC_ERR(err, "SKB sending %s failed!", __FUNCTION__);
117    }
118}
119
120static inline bool can_install_trigger(octopus_trigger_t trigger, errval_t error)
121{
122    OCT_DEBUG("%s:%s:%d: trigger.m > 0 = %d\n",
123              __FILE__, __FUNCTION__, __LINE__, trigger.m > 0);
124    OCT_DEBUG("%s:%s:%d: trigger.in_case == err_no(error) = %d\n",
125           __FILE__, __FUNCTION__, __LINE__, trigger.in_case == err_no(error));
126
127    return trigger.m > 0 &&
128           (trigger.in_case == err_no(error) ||
129           (trigger.m & OCT_ALWAYS_SET) != 0 );
130}
131
132static inline uint64_t install_trigger(struct octopus_binding* binding,
133        struct ast_object* ast, octopus_trigger_t trigger, errval_t error)
134{
135    errval_t err;
136    uint64_t watch_id = 0;
137
138    if (can_install_trigger(trigger, error)) {
139        struct oct_reply_state* trigger_reply = NULL;
140        err = new_oct_reply_state(&trigger_reply, trigger_send_handler);
141        assert(err_is_ok(err));
142
143        trigger_reply->client_handler = trigger.trigger;
144        trigger_reply->client_state = trigger.st;
145
146        trigger_reply->binding = (trigger.send_to == octopus_BINDING_RPC) ?
147                binding : get_event_binding(binding);
148        if (trigger_reply->binding == NULL) {
149            fprintf(stderr, "No event binding for trigger, send events "
150                            "over regular binding.");
151            trigger_reply->binding = binding;
152        }
153
154        err = set_watch(binding, ast, trigger.m, trigger_reply, &watch_id);
155        assert(err_is_ok(err));
156    }
157
158    return watch_id;
159}
160
161static void remove_trigger_reply(struct octopus_binding* b,
162        struct oct_reply_state* drs)
163{
164    errval_t err;
165    err = b->tx_vtbl.remove_trigger_response(b,
166            MKCONT(free_oct_reply_state, drs),
167            drs->error);
168    if (err_is_fail(err)) {
169        if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
170            oct_rpc_enqueue_reply(b, drs);
171            return;
172        }
173        USER_PANIC_ERR(err, "SKB sending %s failed!", __FUNCTION__);
174    }
175}
176
177void remove_trigger_handler(struct octopus_binding *b, octopus_trigger_id_t tid)
178{
179    struct oct_reply_state* drs = NULL;
180    errval_t err = new_oct_reply_state(&drs, remove_trigger_reply);
181    assert(err_is_ok(err));
182
183    drs->error = del_watch(b, tid, &drs->query_state);
184    drs->reply(b, drs);
185}
186
187/*static inline void arrival_rate(void)
188{
189    static cycles_t measure_time = 10000;
190    static uint64_t arrivals = 0;
191    static cycles_t start = 0;
192    arrivals++;
193    if ( (arrivals % 100) == 0 && bench_tsc_to_ms(bench_tsc() - start) > measure_time) {
194        printf("Get Rate per sec: %lu\n", arrivals / (measure_time / 1000));
195        start = bench_tsc();
196        arrivals = 0;
197    }
198}*/
199
200static void get_reply(struct octopus_binding* b, struct oct_reply_state* drt)
201{
202    errval_t err;
203    char* reply = err_is_ok(drt->error) ?
204            drt->query_state.std_out.buffer : NULL;
205    err = b->tx_vtbl.get_response(b, MKCONT(free_oct_reply_state, drt),
206            reply, drt->server_id, drt->error);
207    if (err_is_fail(err)) {
208        if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
209            oct_rpc_enqueue_reply(b, drt);
210            return;
211        }
212        USER_PANIC_ERR(err, "SKB sending %s failed!", __FUNCTION__);
213    }
214}
215
216void get_handler(struct octopus_binding *b, const char *query,
217                 octopus_trigger_t trigger)
218{
219    errval_t err = SYS_ERR_OK;
220
221    struct oct_reply_state* drs = NULL;
222    struct ast_object* ast = NULL;
223    err = new_oct_reply_state(&drs, get_reply);
224    assert(err_is_ok(err));
225
226    err = check_query_length(query);
227    if (err_is_fail(err)) {
228        goto out;
229    }
230
231    err = generate_ast(query, &ast);
232    if (err_is_ok(err)) {
233        err = get_record(ast, &drs->query_state);
234        drs->server_id = install_trigger(b, ast, trigger, err);
235    }
236
237out:
238    drs->error = err;
239    drs->reply(b, drs);
240
241    free_ast(ast);
242}
243
244static void get_names_reply(struct octopus_binding* b,
245        struct oct_reply_state* drt)
246{
247    errval_t err;
248    char* reply = err_is_ok(drt->error) ?
249            drt->query_state.std_out.buffer : NULL;
250    err = b->tx_vtbl.get_names_response(b, MKCONT(free_oct_reply_state, drt),
251            reply, drt->server_id, drt->error);
252    if (err_is_fail(err)) {
253        if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
254            oct_rpc_enqueue_reply(b, drt);
255            return;
256        }
257        if (err_no(err) == FLOUNDER_ERR_TX_MSG_SIZE) {
258            debug_printf("max msg size: %u, reply size: %zu\n",
259                    octopus__get_names_response_output_MAX_ARGUMENT_SIZE,
260                    drt->query_state.std_out.length);
261        }
262        USER_PANIC_ERR(err, "SKB sending %s failed!", __FUNCTION__);
263    }
264}
265
266void get_names_handler(struct octopus_binding *b, const char *query,
267                       octopus_trigger_t t)
268{
269    OCT_DEBUG(" get_names_handler: %s\n", query);
270
271    errval_t err = SYS_ERR_OK;
272
273    struct oct_reply_state* drs = NULL;
274    struct ast_object* ast = NULL;
275
276    err = new_oct_reply_state(&drs, get_names_reply);
277    assert(err_is_ok(err));
278
279    err = check_query_length(query);
280    if (err_is_fail(err)) {
281        goto out;
282    }
283
284    err = generate_ast(query, &ast);
285    if (err_is_ok(err)) {
286        err = get_record_names(ast, &drs->query_state);
287        drs->server_id = install_trigger(b, ast, t, err);
288    }
289
290out:
291    drs->error = err;
292    drs->reply(b, drs);
293
294    free_ast(ast);
295}
296
297static void set_reply(struct octopus_binding* b, struct oct_reply_state* drs)
298{
299    char* record = err_is_ok(drs->error) && drs->return_record ?
300            drs->query_state.std_out.buffer : NULL;
301
302    errval_t err;
303    err = b->tx_vtbl.set_response(b, MKCONT(free_oct_reply_state, drs), record,
304            drs->server_id, drs->error);
305    if (err_is_fail(err)) {
306        if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
307            oct_rpc_enqueue_reply(b, drs);
308            return;
309        }
310        USER_PANIC_ERR(err, "SKB sending %s failed!", __FUNCTION__);
311    }
312}
313
314void set_handler(struct octopus_binding *b, const char *query, uint64_t mode,
315        octopus_trigger_t trigger, bool get)
316{
317    OCT_DEBUG(" set_handler: %s\n", query);
318    errval_t err = SYS_ERR_OK;
319
320    struct oct_reply_state* drs = NULL;
321    struct ast_object* ast = NULL;
322
323    err = new_oct_reply_state(&drs, set_reply);
324    assert(err_is_ok(err));
325
326    err = check_query_length(query);
327    if (err_is_fail(err)) {
328        goto out;
329    }
330
331    err = generate_ast(query, &ast);
332    if (err_is_ok(err)) {
333        if (ast->u.on.name->type == nodeType_Ident) {
334            err = set_record(ast, mode, &drs->query_state);
335            drs->server_id = install_trigger(b, ast, trigger, err);
336        }
337        else {
338            // Since we don't have any ACLs atm. we do not
339            // allow name to be a regex/variable, because
340            // we it's not guaranteed which records get
341            // modified in this case.
342            err = OCT_ERR_NO_RECORD_NAME;
343        }
344    }
345
346out:
347    drs->error = err;
348    drs->return_record = get;
349    drs->reply(b, drs);
350
351    free_ast(ast);
352}
353
354static errval_t build_query_with_idcap(char **query_p, struct capref idcap,
355                                       const char *attributes)
356{
357    errval_t err;
358    idcap_id_t id = 0;
359    size_t query_size, bytes_written;
360
361    // retrieve id from idcap
362    err = invoke_idcap_identify(idcap, &id);
363    if (err_is_fail(err)) {
364        return err_push(err, OCT_ERR_IDCAP_INVOKE);
365    }
366
367    err = cap_delete(idcap);
368    assert(err_is_ok(err));
369
370    if (attributes == NULL) {
371        attributes = "";
372    }
373
374    // build query using the idcapid and the attributes
375    query_size = snprintf(NULL, 0, IDCAPID_NAME_PREFIX "%" PRIxIDCAPID "%s", id,
376                          attributes);
377    *query_p = (char *) malloc(query_size + 1); // include \0
378    if (*query_p == NULL) {
379        return LIB_ERR_MALLOC_FAIL;
380    }
381    bytes_written = snprintf(*query_p, query_size + 1, IDCAPID_NAME_PREFIX
382                             "%" PRIxIDCAPID "%s", id, attributes);
383
384    return SYS_ERR_OK;
385}
386
387static void get_with_idcap_reply(struct octopus_binding *b,
388                                 struct oct_reply_state *drt)
389{
390    errval_t err;
391    char *reply = err_is_ok(drt->error) ?
392                  drt->query_state.std_out.buffer : NULL;
393    err = b->tx_vtbl.get_with_idcap_response(b,
394                                             MKCONT(free_oct_reply_state, drt),
395                                             reply, drt->server_id, drt->error);
396    if (err_is_fail(err)) {
397        if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
398            oct_rpc_enqueue_reply(b, drt);
399            return;
400        }
401        USER_PANIC_ERR(err, "SKB sending %s failed!", __FUNCTION__);
402    }
403}
404
405void get_with_idcap_handler(struct octopus_binding *b, struct capref idcap,
406                            octopus_trigger_t trigger)
407{
408    errval_t err;
409    char *query = NULL;
410    struct oct_reply_state *drs = NULL;
411    struct ast_object *ast = NULL;
412
413    OCT_DEBUG("get_with_idcap_handler: %s\n", query);
414
415    err = new_oct_reply_state(&drs, get_with_idcap_reply);
416    assert(err_is_ok(err));
417
418    err = build_query_with_idcap(&query, idcap, "");
419    if (err_is_fail(err)) {
420        goto out;
421    }
422
423    err = check_query_length(query);
424    if (err_is_fail(err)) {
425        goto out;
426    }
427
428    err = generate_ast(query, &ast);
429    if (err_is_ok(err)) {
430        err = get_record(ast, &drs->query_state);
431        drs->server_id = install_trigger(b, ast, trigger, err);
432    }
433
434out:
435    drs->error = err;
436    drs->reply(b, drs);
437
438    free_ast(ast);
439    if (query != NULL) {
440        free(query);
441    }
442}
443
444static void set_with_idcap_reply(struct octopus_binding *b,
445                                 struct oct_reply_state *drs)
446{
447    char *record = err_is_ok(drs->error) && drs->return_record ?
448            drs->query_state.std_out.buffer : NULL;
449
450    errval_t err;
451    err = b->tx_vtbl.set_with_idcap_response(b,
452                                             MKCONT(free_oct_reply_state, drs),
453                                             record, drs->server_id,
454                                             drs->error);
455    if (err_is_fail(err)) {
456        if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
457            oct_rpc_enqueue_reply(b, drs);
458            return;
459        }
460        USER_PANIC_ERR(err, "SKB sending %s failed!", __FUNCTION__);
461    }
462}
463
464void set_with_idcap_handler(struct octopus_binding *b, struct capref idcap,
465                            const char *attributes, uint64_t mode,
466                            octopus_trigger_t trigger, bool get)
467{
468    errval_t err;
469    char *query = NULL;
470    struct oct_reply_state *drs = NULL;
471    struct ast_object *ast = NULL;
472
473    err = new_oct_reply_state(&drs, set_with_idcap_reply);
474    assert(err_is_ok(err));
475
476    err = build_query_with_idcap(&query, idcap, attributes);
477    if (err_is_fail(err)) {
478        goto out;
479    }
480    OCT_DEBUG(" set_with_idcap_handler: %s\n", query);
481
482    err = check_query_length(query);
483    if (err_is_fail(err)) {
484        goto out;
485    }
486
487    err = generate_ast(query, &ast);
488    if (err_is_ok(err)) {
489        if (ast->u.on.name->type == nodeType_Ident) {
490            err = set_record(ast, mode, &drs->query_state);
491            drs->server_id = install_trigger(b, ast, trigger, err);
492        } else {
493            err = OCT_ERR_NO_RECORD_NAME;
494        }
495    }
496
497out:
498    drs->error = err;
499    drs->return_record = get;
500    drs->reply(b, drs);
501
502    free_ast(ast);
503    if (query != NULL) {
504        free(query);
505    }
506
507}
508
509static void del_reply(struct octopus_binding* b, struct oct_reply_state* drs)
510{
511    errval_t err;
512    err = b->tx_vtbl.del_response(b, MKCONT(free_oct_reply_state, drs),
513            drs->server_id, drs->error);
514    if (err_is_fail(err)) {
515        if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
516            oct_rpc_enqueue_reply(b, drs);
517            return;
518        }
519        USER_PANIC_ERR(err, "SKB sending %s failed!", __FUNCTION__);
520    }
521}
522
523void del_handler(struct octopus_binding* b, const char* query,
524                 octopus_trigger_t trigger)
525{
526    OCT_DEBUG(" del_handler: %s\n", query);
527    errval_t err = SYS_ERR_OK;
528
529    struct oct_reply_state* drs = NULL;
530    struct ast_object* ast = NULL;
531
532    err = new_oct_reply_state(&drs, del_reply);
533    assert(err_is_ok(err));
534
535    err = check_query_length(query);
536    if (err_is_fail(err)) {
537        goto out;
538    }
539
540    err = generate_ast(query, &ast);
541    if (err_is_ok(err)) {
542        if (ast->u.on.name->type == nodeType_Ident) {
543            err = del_record(ast, &drs->query_state);
544            drs->server_id = install_trigger(b, ast, trigger, err);
545        }
546        else {
547            // Since we don't have any ACLs atm. we do not
548            // allow name to be a regex/variable
549            // (see set_handler).
550            err = OCT_ERR_NO_RECORD_NAME;
551        }
552    }
553
554out:
555    drs->error = err;
556    drs->reply(b, drs);
557
558    free_ast(ast);
559}
560
561static void exists_reply(struct octopus_binding* b, struct oct_reply_state* drs)
562{
563    errval_t err;
564    err = b->tx_vtbl.exists_response(b, MKCONT(free_oct_reply_state, drs),
565            drs->server_id, drs->error);
566
567    if (err_is_fail(err)) {
568        if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
569            oct_rpc_enqueue_reply(b, drs);
570            return;
571        }
572        USER_PANIC_ERR(err, "SKB sending %s failed!", __FUNCTION__);
573    }
574}
575
576void exists_handler(struct octopus_binding* b, const char* query,
577        octopus_trigger_t trigger)
578{
579    errval_t err = SYS_ERR_OK;
580
581    struct oct_reply_state* drs = NULL;
582    struct ast_object* ast = NULL;
583
584    err = new_oct_reply_state(&drs, exists_reply);
585    assert(err_is_ok(err));
586
587    err = check_query_length(query);
588    if (err_is_fail(err)) {
589        goto out;
590    }
591
592    err = generate_ast(query, &ast);
593    if (err_is_ok(err)) {
594        err = get_record(ast, &drs->query_state);
595        drs->server_id = install_trigger(b, ast, trigger, err);
596    }
597
598out:
599    drs->error = err;
600    drs->reply(b, drs);
601
602    free_ast(ast);
603}
604
605static void wait_for_reply(struct octopus_binding* b, struct oct_reply_state* drs)
606{
607    errval_t err;
608    err = b->tx_vtbl.wait_for_response(b, MKCONT(free_oct_reply_state, drs),
609            drs->query_state.std_out.buffer, drs->error);
610
611    if (err_is_fail(err)) {
612        if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
613            oct_rpc_enqueue_reply(b, drs);
614            return;
615        }
616        USER_PANIC_ERR(err, "SKB sending %s failed!", __FUNCTION__);
617    }
618}
619
620// XXX: For compatibility reasons with nameserver API
621void wait_for_handler(struct octopus_binding* b, const char* query) {
622    errval_t err = SYS_ERR_OK;
623    errval_t set_watch_err = SYS_ERR_OK;
624
625    struct oct_reply_state* drs = NULL;
626    struct ast_object* ast = NULL;
627
628    err = new_oct_reply_state(&drs, wait_for_reply);
629    drs->binding = b;
630    assert(err_is_ok(err));
631
632    err = check_query_length(query);
633    if (err_is_fail(err)) {
634        goto out;
635    }
636
637    err = generate_ast(query, &ast);
638    if (err_is_ok(err)) {
639        err = get_record(ast, &drs->query_state);
640        if (err_no(err) == OCT_ERR_NO_RECORD) {
641            uint64_t wid;
642            set_watch_err = set_watch(b, ast, OCT_ON_SET, drs, &wid);
643        }
644    }
645
646out:
647    if (err_no(err) != OCT_ERR_NO_RECORD || err_is_fail(set_watch_err)) {
648        drs->error = err;
649        if (err_is_fail(set_watch_err)) {
650            // implies err = OCT_ERR_NO_RECORD
651            drs->error = set_watch_err;
652        }
653        drs->reply(b, drs);
654    }
655
656    free_ast(ast);
657}
658
659static void subscribe_reply(struct octopus_binding* b,
660        struct oct_reply_state* drs)
661{
662    errval_t err;
663    err = b->tx_vtbl.subscribe_response(b, MKCONT(free_oct_reply_state, drs),
664            drs->server_id, drs->error);
665
666    if (err_is_fail(err)) {
667        if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
668            oct_rpc_enqueue_reply(b, drs);
669            return;
670        }
671        USER_PANIC_ERR(err, "SKB sending %s failed!", __FUNCTION__);
672    }
673}
674
675void subscribe_handler(struct octopus_binding *b, const char* query,
676        uint64_t trigger_fn, uint64_t state)
677{
678    OCT_DEBUG("subscribe: query = %s\n", query);
679    errval_t err = SYS_ERR_OK;
680
681    struct oct_reply_state* drs = NULL;
682    struct ast_object* ast = NULL;
683
684    err = new_oct_reply_state(&drs, subscribe_reply);
685    assert(err_is_ok(err));
686
687    err = check_query_length(query);
688    if (err_is_fail(err)) {
689        goto out;
690    }
691
692    err = generate_ast(query, &ast);
693    if (err_is_ok(err)) {
694        err = add_subscription(b, ast, trigger_fn, state, drs);
695    }
696
697out:
698    drs->error = err;
699    drs->reply(b, drs);
700
701    free_ast(ast);
702}
703
704static void unsubscribe_reply(struct octopus_binding* b,
705        struct oct_reply_state* drs)
706{
707    errval_t err;
708    err = b->tx_vtbl.unsubscribe_response(b, MKCONT(free_oct_reply_state, drs),
709            drs->error);
710    if (err_is_fail(err)) {
711        if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
712            oct_rpc_enqueue_reply(b, drs);
713            return;
714        }
715        USER_PANIC_ERR(err, "SKB sending %s failed!", __FUNCTION__);
716    }
717}
718
719static void send_subscribed_message(struct octopus_binding* b, struct oct_reply_state* drs)
720{
721    errval_t err = SYS_ERR_OK;
722    char* record = drs->query_state.std_out.buffer[0] != '\0' ?
723            drs->query_state.std_out.buffer : NULL;
724
725    err = b->tx_vtbl.subscription(b, MKCONT(free_oct_reply_state, drs),
726            drs->server_id, drs->client_handler,
727            drs->mode, record, drs->client_state);
728    if (err_is_fail(err)) {
729        if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
730            oct_rpc_enqueue_reply(b, drs);
731            return;
732        }
733        USER_PANIC_ERR(err, "SKB sending %s failed!", __FUNCTION__);
734    }
735
736}
737
738void unsubscribe_handler(struct octopus_binding *b, uint64_t id)
739{
740    errval_t err = SYS_ERR_OK;
741
742    OCT_DEBUG("unsubscribe: id = %"PRIu64"\n", id);
743
744    struct oct_reply_state* srs = NULL;
745    err = new_oct_reply_state(&srs, unsubscribe_reply);
746    assert(err_is_ok(err));
747
748    err = del_subscription(b, id, &srs->query_state);
749    if (err_is_ok(err)) {
750        uint64_t binding;
751        uint64_t client_handler;
752        uint64_t client_state;
753        uint64_t server_id;
754
755        skb_read_output_at(srs->query_state.std_out.buffer,
756                "subscriber(%"SCNu64", %"SCNu64", %"SCNu64", %"SCNu64")",
757                &binding, &client_handler, &client_state, &server_id);
758
759        struct oct_reply_state* subscriber = NULL;
760        err = new_oct_reply_state(&subscriber,
761                send_subscribed_message);
762        assert(err_is_ok(err));
763
764#if defined(__i386__) || defined(__arm__)
765        subscriber->binding = (struct octopus_binding*)(uint32_t)binding;
766#else
767        subscriber->binding = (struct octopus_binding*)binding;
768#endif
769        subscriber->client_handler = client_handler;
770        subscriber->client_state = client_state;
771        subscriber->server_id = server_id;
772        subscriber->mode = OCT_REMOVED;
773
774        OCT_DEBUG("publish msg to: recipient:%"PRIu64" id:%"PRIu64"\n", binding, server_id);
775        subscriber->reply(subscriber->binding, subscriber);
776    }
777
778    srs->error = err;
779    srs->reply(b, srs);
780}
781
782static void publish_reply(struct octopus_binding* b, struct oct_reply_state* drs)
783{
784    errval_t err;
785    err = b->tx_vtbl.publish_response(b, MKCONT(free_oct_reply_state, drs),
786            drs->error);
787    if (err_is_fail(err)) {
788        if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
789            oct_rpc_enqueue_reply(b, drs);
790            return;
791        }
792        USER_PANIC_ERR(err, "SKB sending %s failed!", __FUNCTION__);
793    }
794}
795
796void publish_handler(struct octopus_binding *b, const char* record)
797{
798    OCT_DEBUG("publish_handler query: %s\n", record);
799    errval_t err = SYS_ERR_OK;
800
801    struct oct_reply_state* drs = NULL;
802    err = new_oct_reply_state(&drs, publish_reply);
803    assert(err_is_ok(err));
804
805    err = check_query_length(record);
806    if (err_is_fail(err)) {
807        drs->error = err;
808        drs->reply(b, drs);
809        goto out1;
810    }
811
812    struct ast_object* ast = NULL;
813    err = generate_ast(record, &ast);
814    if (err_is_fail(err)) {
815        drs->error = err;
816        drs->reply(b, drs);
817        goto out2;
818    }
819
820
821    if (err_is_ok(err)) {
822        err = find_subscribers(ast, &drs->query_state);
823        if (err_is_ok(err)) {
824            // Reply to publisher
825            drs->error = err;
826            drs->reply(b, drs);
827
828
829            struct list_parser_status status;
830            skb_read_list_init_offset(&status, drs->query_state.std_out.buffer, 0);
831
832            // TODO remove skb list parser dependency
833            // Send to all subscribers
834            uint64_t binding;
835            uint64_t client_handler;
836            uint64_t client_state;
837            uint64_t server_id;
838
839            while (skb_read_list(&status, "subscriber(%"SCNu64", %"SCNu64", %"SCNu64", %"SCNu64")",
840                    &binding, &client_handler, &client_state, &server_id)) {
841
842                struct oct_reply_state* subscriber = NULL;
843                err = new_oct_reply_state(&subscriber,
844                        send_subscribed_message);
845                assert(err_is_ok(err));
846#if defined(__i386__) || defined(__arm__)
847                subscriber->binding = (struct octopus_binding*)(uint32_t)binding;
848#else
849                subscriber->binding = (struct octopus_binding*)binding;
850#endif
851                subscriber->client_handler = client_handler;
852                strcpy(subscriber->query_state.std_out.buffer, record);
853                subscriber->client_state = client_state;
854                subscriber->server_id = server_id;
855                subscriber->mode = OCT_ON_PUBLISH;
856
857                OCT_DEBUG("publish msg to: recipient:%"PRIu64" id:%"PRIu64"\n", binding, server_id);
858                subscriber->reply(subscriber->binding, subscriber);
859            }
860        }
861    }
862
863out2:
864    free_ast(ast);
865out1:
866    return;
867}
868
869void get_identifier(struct octopus_binding* b)
870{
871    errval_t err = b->tx_vtbl.get_identifier_response(b, NOP_CONT,
872            current_id++);
873    assert(err_is_ok(err));
874}
875
876static void identify_binding_reply(struct octopus_binding* b,
877        struct oct_reply_state* drs)
878{
879    errval_t err;
880    // TODO send drs->error back to client!
881    err = b->tx_vtbl.identify_response(b, MKCONT(free_oct_reply_state, drs));
882    if (err_is_fail(err)) {
883        if (err_no(err) == FLOUNDER_ERR_TX_BUSY) {
884            oct_rpc_enqueue_reply(b, drs);
885            return;
886        }
887        USER_PANIC_ERR(err, "SKB sending %s failed!", __FUNCTION__);
888    }
889
890}
891
892void identify_binding(struct octopus_binding* b, uint64_t id,
893        octopus_binding_type_t type)
894{
895    assert(id <= current_id);
896
897    struct oct_reply_state* drs = NULL;
898    errval_t err = new_oct_reply_state(&drs, identify_binding_reply);
899    assert(err_is_ok(err));
900
901    OCT_DEBUG("set binding: id=%"PRIu64" type=%d\n", id, type);
902    drs->error = set_binding(type, id, b);
903    drs->reply(b, drs);
904}
905