1/** 2 * \file 3 * \brief Contains handler functions for server-side octopus interface RPC call. 4 */ 5 6/* 7 * Copyright (c) 2009, 2010, 2012, ETH Zurich. 8 * All rights reserved. 9 * 10 * This file is distributed under the terms in the attached LICENSE file. 11 * If you do not find this file, copies can be found by writing to: 12 * ETH Zurich D-INFK, Universitaetstr. 6, CH-8092 Zurich. Attn: Systems Group. 13 */ 14 15#include <stdio.h> 16#include <string.h> 17 18#include <barrelfish/barrelfish.h> 19#include <barrelfish/nameservice_client.h> 20#include <skb/skb.h> // read list 21#include <if/octopus_defs.h> 22 23#include <octopus_server/service.h> 24#include <octopus_server/query.h> 25#include <octopus_server/debug.h> 26 27#include <octopus/parser/ast.h> 28#include <octopus/definitions.h> 29 30#include <bench/bench.h> 31 32#include "queue.h" 33 34/** 35 * Name prefix used to by the functions set_with_idcap_handler() and 36 * get_with_idcap_handler() to store and retrieve records by idcap. 37 * 38 * This essentially emulates a dedicated namespace for records stored with an 39 * id cap. Octopus and the SKB do not support dedicated namespaces atm. 40 * FIXME: store records set with the function 'set_with_idcap' in a dedicated 41 * namespace. 42 */ 43#define IDCAPID_NAME_PREFIX "idcapid." 44 45static uint64_t current_id = 1; 46 47static inline errval_t check_query_length(const char* query) { 48 if (strlen(query) >= MAX_QUERY_LENGTH) { 49 return OCT_ERR_QUERY_SIZE; 50 } 51 52 return SYS_ERR_OK; 53} 54 55errval_t new_oct_reply_state(struct oct_reply_state** drt, 56 oct_reply_handler_fn reply_handler) 57{ 58 assert(*drt == NULL); 59 *drt = malloc(sizeof(struct oct_reply_state)); 60 if (*drt == NULL) { 61 return LIB_ERR_MALLOC_FAIL; 62 } 63 64 //memset(*drt, 0, sizeof(struct oct_reply_state)); 65 (*drt)->query_state.std_out.buffer[0] = '\0'; 66 (*drt)->query_state.std_out.length = 0; 67 (*drt)->query_state.std_err.buffer[0] = '\0'; 68 (*drt)->query_state.std_err.length = 0; 69 70 (*drt)->binding = 0; 71 (*drt)->return_record = false; 72 (*drt)->error = 0; 73 74 // For set_watch() 75 (*drt)->mode = 0; 76 (*drt)->client_state = 0; 77 (*drt)->client_handler = 0; 78 (*drt)->server_id = 0; 79 80 (*drt)->reply = reply_handler; 81 (*drt)->next = NULL; 82 83 return SYS_ERR_OK; 84} 85 86static void free_oct_reply_state(void* arg) 87{ 88 if (arg != NULL) { 89 struct oct_reply_state* drt = (struct oct_reply_state*) arg; 90 // In case we have to free things in oct_reply_state, free here... 91 92 free(drt); 93 } else { 94 assert(!"free_reply_state with NULL argument?"); 95 } 96} 97 98static void trigger_send_handler(struct octopus_binding* b, 99 struct oct_reply_state* drs) 100{ 101 char* record = drs->query_state.std_out.buffer[0] != '\0' ? 102 drs->query_state.std_out.buffer : NULL; 103 104 errval_t err; 105 err = b->tx_vtbl.trigger(b, MKCONT(free_oct_reply_state, drs), 106 drs->server_id, 107 drs->client_handler, 108 drs->mode, 109 record, 110 drs->client_state); 111 if (err_is_fail(err)) { 112 if (err_no(err) == FLOUNDER_ERR_TX_BUSY) { 113 oct_rpc_enqueue_reply(b, drs); 114 return; 115 } 116 USER_PANIC_ERR(err, "SKB sending %s failed!", __FUNCTION__); 117 } 118} 119 120static inline bool can_install_trigger(octopus_trigger_t trigger, errval_t error) 121{ 122 OCT_DEBUG("%s:%s:%d: trigger.m > 0 = %d\n", 123 __FILE__, __FUNCTION__, __LINE__, trigger.m > 0); 124 OCT_DEBUG("%s:%s:%d: trigger.in_case == err_no(error) = %d\n", 125 __FILE__, __FUNCTION__, __LINE__, trigger.in_case == err_no(error)); 126 127 return trigger.m > 0 && 128 (trigger.in_case == err_no(error) || 129 (trigger.m & OCT_ALWAYS_SET) != 0 ); 130} 131 132static inline uint64_t install_trigger(struct octopus_binding* binding, 133 struct ast_object* ast, octopus_trigger_t trigger, errval_t error) 134{ 135 errval_t err; 136 uint64_t watch_id = 0; 137 138 if (can_install_trigger(trigger, error)) { 139 struct oct_reply_state* trigger_reply = NULL; 140 err = new_oct_reply_state(&trigger_reply, trigger_send_handler); 141 assert(err_is_ok(err)); 142 143 trigger_reply->client_handler = trigger.trigger; 144 trigger_reply->client_state = trigger.st; 145 146 trigger_reply->binding = (trigger.send_to == octopus_BINDING_RPC) ? 147 binding : get_event_binding(binding); 148 if (trigger_reply->binding == NULL) { 149 fprintf(stderr, "No event binding for trigger, send events " 150 "over regular binding."); 151 trigger_reply->binding = binding; 152 } 153 154 err = set_watch(binding, ast, trigger.m, trigger_reply, &watch_id); 155 assert(err_is_ok(err)); 156 } 157 158 return watch_id; 159} 160 161static void remove_trigger_reply(struct octopus_binding* b, 162 struct oct_reply_state* drs) 163{ 164 errval_t err; 165 err = b->tx_vtbl.remove_trigger_response(b, 166 MKCONT(free_oct_reply_state, drs), 167 drs->error); 168 if (err_is_fail(err)) { 169 if (err_no(err) == FLOUNDER_ERR_TX_BUSY) { 170 oct_rpc_enqueue_reply(b, drs); 171 return; 172 } 173 USER_PANIC_ERR(err, "SKB sending %s failed!", __FUNCTION__); 174 } 175} 176 177void remove_trigger_handler(struct octopus_binding *b, octopus_trigger_id_t tid) 178{ 179 struct oct_reply_state* drs = NULL; 180 errval_t err = new_oct_reply_state(&drs, remove_trigger_reply); 181 assert(err_is_ok(err)); 182 183 drs->error = del_watch(b, tid, &drs->query_state); 184 drs->reply(b, drs); 185} 186 187/*static inline void arrival_rate(void) 188{ 189 static cycles_t measure_time = 10000; 190 static uint64_t arrivals = 0; 191 static cycles_t start = 0; 192 arrivals++; 193 if ( (arrivals % 100) == 0 && bench_tsc_to_ms(bench_tsc() - start) > measure_time) { 194 printf("Get Rate per sec: %lu\n", arrivals / (measure_time / 1000)); 195 start = bench_tsc(); 196 arrivals = 0; 197 } 198}*/ 199 200static void get_reply(struct octopus_binding* b, struct oct_reply_state* drt) 201{ 202 errval_t err; 203 char* reply = err_is_ok(drt->error) ? 204 drt->query_state.std_out.buffer : NULL; 205 err = b->tx_vtbl.get_response(b, MKCONT(free_oct_reply_state, drt), 206 reply, drt->server_id, drt->error); 207 if (err_is_fail(err)) { 208 if (err_no(err) == FLOUNDER_ERR_TX_BUSY) { 209 oct_rpc_enqueue_reply(b, drt); 210 return; 211 } 212 USER_PANIC_ERR(err, "SKB sending %s failed!", __FUNCTION__); 213 } 214} 215 216void get_handler(struct octopus_binding *b, const char *query, 217 octopus_trigger_t trigger) 218{ 219 errval_t err = SYS_ERR_OK; 220 221 struct oct_reply_state* drs = NULL; 222 struct ast_object* ast = NULL; 223 err = new_oct_reply_state(&drs, get_reply); 224 assert(err_is_ok(err)); 225 226 err = check_query_length(query); 227 if (err_is_fail(err)) { 228 goto out; 229 } 230 231 err = generate_ast(query, &ast); 232 if (err_is_ok(err)) { 233 err = get_record(ast, &drs->query_state); 234 drs->server_id = install_trigger(b, ast, trigger, err); 235 } 236 237out: 238 drs->error = err; 239 drs->reply(b, drs); 240 241 free_ast(ast); 242} 243 244static void get_names_reply(struct octopus_binding* b, 245 struct oct_reply_state* drt) 246{ 247 errval_t err; 248 char* reply = err_is_ok(drt->error) ? 249 drt->query_state.std_out.buffer : NULL; 250 err = b->tx_vtbl.get_names_response(b, MKCONT(free_oct_reply_state, drt), 251 reply, drt->server_id, drt->error); 252 if (err_is_fail(err)) { 253 if (err_no(err) == FLOUNDER_ERR_TX_BUSY) { 254 oct_rpc_enqueue_reply(b, drt); 255 return; 256 } 257 if (err_no(err) == FLOUNDER_ERR_TX_MSG_SIZE) { 258 debug_printf("max msg size: %u, reply size: %zu\n", 259 octopus__get_names_response_output_MAX_ARGUMENT_SIZE, 260 drt->query_state.std_out.length); 261 } 262 USER_PANIC_ERR(err, "SKB sending %s failed!", __FUNCTION__); 263 } 264} 265 266void get_names_handler(struct octopus_binding *b, const char *query, 267 octopus_trigger_t t) 268{ 269 OCT_DEBUG(" get_names_handler: %s\n", query); 270 271 errval_t err = SYS_ERR_OK; 272 273 struct oct_reply_state* drs = NULL; 274 struct ast_object* ast = NULL; 275 276 err = new_oct_reply_state(&drs, get_names_reply); 277 assert(err_is_ok(err)); 278 279 err = check_query_length(query); 280 if (err_is_fail(err)) { 281 goto out; 282 } 283 284 err = generate_ast(query, &ast); 285 if (err_is_ok(err)) { 286 err = get_record_names(ast, &drs->query_state); 287 drs->server_id = install_trigger(b, ast, t, err); 288 } 289 290out: 291 drs->error = err; 292 drs->reply(b, drs); 293 294 free_ast(ast); 295} 296 297static void set_reply(struct octopus_binding* b, struct oct_reply_state* drs) 298{ 299 char* record = err_is_ok(drs->error) && drs->return_record ? 300 drs->query_state.std_out.buffer : NULL; 301 302 errval_t err; 303 err = b->tx_vtbl.set_response(b, MKCONT(free_oct_reply_state, drs), record, 304 drs->server_id, drs->error); 305 if (err_is_fail(err)) { 306 if (err_no(err) == FLOUNDER_ERR_TX_BUSY) { 307 oct_rpc_enqueue_reply(b, drs); 308 return; 309 } 310 USER_PANIC_ERR(err, "SKB sending %s failed!", __FUNCTION__); 311 } 312} 313 314void set_handler(struct octopus_binding *b, const char *query, uint64_t mode, 315 octopus_trigger_t trigger, bool get) 316{ 317 OCT_DEBUG(" set_handler: %s\n", query); 318 errval_t err = SYS_ERR_OK; 319 320 struct oct_reply_state* drs = NULL; 321 struct ast_object* ast = NULL; 322 323 err = new_oct_reply_state(&drs, set_reply); 324 assert(err_is_ok(err)); 325 326 err = check_query_length(query); 327 if (err_is_fail(err)) { 328 goto out; 329 } 330 331 err = generate_ast(query, &ast); 332 if (err_is_ok(err)) { 333 if (ast->u.on.name->type == nodeType_Ident) { 334 err = set_record(ast, mode, &drs->query_state); 335 drs->server_id = install_trigger(b, ast, trigger, err); 336 } 337 else { 338 // Since we don't have any ACLs atm. we do not 339 // allow name to be a regex/variable, because 340 // we it's not guaranteed which records get 341 // modified in this case. 342 err = OCT_ERR_NO_RECORD_NAME; 343 } 344 } 345 346out: 347 drs->error = err; 348 drs->return_record = get; 349 drs->reply(b, drs); 350 351 free_ast(ast); 352} 353 354static errval_t build_query_with_idcap(char **query_p, struct capref idcap, 355 const char *attributes) 356{ 357 errval_t err; 358 idcap_id_t id = 0; 359 size_t query_size, bytes_written; 360 361 // retrieve id from idcap 362 err = invoke_idcap_identify(idcap, &id); 363 if (err_is_fail(err)) { 364 return err_push(err, OCT_ERR_IDCAP_INVOKE); 365 } 366 367 err = cap_delete(idcap); 368 assert(err_is_ok(err)); 369 370 if (attributes == NULL) { 371 attributes = ""; 372 } 373 374 // build query using the idcapid and the attributes 375 query_size = snprintf(NULL, 0, IDCAPID_NAME_PREFIX "%" PRIxIDCAPID "%s", id, 376 attributes); 377 *query_p = (char *) malloc(query_size + 1); // include \0 378 if (*query_p == NULL) { 379 return LIB_ERR_MALLOC_FAIL; 380 } 381 bytes_written = snprintf(*query_p, query_size + 1, IDCAPID_NAME_PREFIX 382 "%" PRIxIDCAPID "%s", id, attributes); 383 384 return SYS_ERR_OK; 385} 386 387static void get_with_idcap_reply(struct octopus_binding *b, 388 struct oct_reply_state *drt) 389{ 390 errval_t err; 391 char *reply = err_is_ok(drt->error) ? 392 drt->query_state.std_out.buffer : NULL; 393 err = b->tx_vtbl.get_with_idcap_response(b, 394 MKCONT(free_oct_reply_state, drt), 395 reply, drt->server_id, drt->error); 396 if (err_is_fail(err)) { 397 if (err_no(err) == FLOUNDER_ERR_TX_BUSY) { 398 oct_rpc_enqueue_reply(b, drt); 399 return; 400 } 401 USER_PANIC_ERR(err, "SKB sending %s failed!", __FUNCTION__); 402 } 403} 404 405void get_with_idcap_handler(struct octopus_binding *b, struct capref idcap, 406 octopus_trigger_t trigger) 407{ 408 errval_t err; 409 char *query = NULL; 410 struct oct_reply_state *drs = NULL; 411 struct ast_object *ast = NULL; 412 413 OCT_DEBUG("get_with_idcap_handler: %s\n", query); 414 415 err = new_oct_reply_state(&drs, get_with_idcap_reply); 416 assert(err_is_ok(err)); 417 418 err = build_query_with_idcap(&query, idcap, ""); 419 if (err_is_fail(err)) { 420 goto out; 421 } 422 423 err = check_query_length(query); 424 if (err_is_fail(err)) { 425 goto out; 426 } 427 428 err = generate_ast(query, &ast); 429 if (err_is_ok(err)) { 430 err = get_record(ast, &drs->query_state); 431 drs->server_id = install_trigger(b, ast, trigger, err); 432 } 433 434out: 435 drs->error = err; 436 drs->reply(b, drs); 437 438 free_ast(ast); 439 if (query != NULL) { 440 free(query); 441 } 442} 443 444static void set_with_idcap_reply(struct octopus_binding *b, 445 struct oct_reply_state *drs) 446{ 447 char *record = err_is_ok(drs->error) && drs->return_record ? 448 drs->query_state.std_out.buffer : NULL; 449 450 errval_t err; 451 err = b->tx_vtbl.set_with_idcap_response(b, 452 MKCONT(free_oct_reply_state, drs), 453 record, drs->server_id, 454 drs->error); 455 if (err_is_fail(err)) { 456 if (err_no(err) == FLOUNDER_ERR_TX_BUSY) { 457 oct_rpc_enqueue_reply(b, drs); 458 return; 459 } 460 USER_PANIC_ERR(err, "SKB sending %s failed!", __FUNCTION__); 461 } 462} 463 464void set_with_idcap_handler(struct octopus_binding *b, struct capref idcap, 465 const char *attributes, uint64_t mode, 466 octopus_trigger_t trigger, bool get) 467{ 468 errval_t err; 469 char *query = NULL; 470 struct oct_reply_state *drs = NULL; 471 struct ast_object *ast = NULL; 472 473 err = new_oct_reply_state(&drs, set_with_idcap_reply); 474 assert(err_is_ok(err)); 475 476 err = build_query_with_idcap(&query, idcap, attributes); 477 if (err_is_fail(err)) { 478 goto out; 479 } 480 OCT_DEBUG(" set_with_idcap_handler: %s\n", query); 481 482 err = check_query_length(query); 483 if (err_is_fail(err)) { 484 goto out; 485 } 486 487 err = generate_ast(query, &ast); 488 if (err_is_ok(err)) { 489 if (ast->u.on.name->type == nodeType_Ident) { 490 err = set_record(ast, mode, &drs->query_state); 491 drs->server_id = install_trigger(b, ast, trigger, err); 492 } else { 493 err = OCT_ERR_NO_RECORD_NAME; 494 } 495 } 496 497out: 498 drs->error = err; 499 drs->return_record = get; 500 drs->reply(b, drs); 501 502 free_ast(ast); 503 if (query != NULL) { 504 free(query); 505 } 506 507} 508 509static void del_reply(struct octopus_binding* b, struct oct_reply_state* drs) 510{ 511 errval_t err; 512 err = b->tx_vtbl.del_response(b, MKCONT(free_oct_reply_state, drs), 513 drs->server_id, drs->error); 514 if (err_is_fail(err)) { 515 if (err_no(err) == FLOUNDER_ERR_TX_BUSY) { 516 oct_rpc_enqueue_reply(b, drs); 517 return; 518 } 519 USER_PANIC_ERR(err, "SKB sending %s failed!", __FUNCTION__); 520 } 521} 522 523void del_handler(struct octopus_binding* b, const char* query, 524 octopus_trigger_t trigger) 525{ 526 OCT_DEBUG(" del_handler: %s\n", query); 527 errval_t err = SYS_ERR_OK; 528 529 struct oct_reply_state* drs = NULL; 530 struct ast_object* ast = NULL; 531 532 err = new_oct_reply_state(&drs, del_reply); 533 assert(err_is_ok(err)); 534 535 err = check_query_length(query); 536 if (err_is_fail(err)) { 537 goto out; 538 } 539 540 err = generate_ast(query, &ast); 541 if (err_is_ok(err)) { 542 if (ast->u.on.name->type == nodeType_Ident) { 543 err = del_record(ast, &drs->query_state); 544 drs->server_id = install_trigger(b, ast, trigger, err); 545 } 546 else { 547 // Since we don't have any ACLs atm. we do not 548 // allow name to be a regex/variable 549 // (see set_handler). 550 err = OCT_ERR_NO_RECORD_NAME; 551 } 552 } 553 554out: 555 drs->error = err; 556 drs->reply(b, drs); 557 558 free_ast(ast); 559} 560 561static void exists_reply(struct octopus_binding* b, struct oct_reply_state* drs) 562{ 563 errval_t err; 564 err = b->tx_vtbl.exists_response(b, MKCONT(free_oct_reply_state, drs), 565 drs->server_id, drs->error); 566 567 if (err_is_fail(err)) { 568 if (err_no(err) == FLOUNDER_ERR_TX_BUSY) { 569 oct_rpc_enqueue_reply(b, drs); 570 return; 571 } 572 USER_PANIC_ERR(err, "SKB sending %s failed!", __FUNCTION__); 573 } 574} 575 576void exists_handler(struct octopus_binding* b, const char* query, 577 octopus_trigger_t trigger) 578{ 579 errval_t err = SYS_ERR_OK; 580 581 struct oct_reply_state* drs = NULL; 582 struct ast_object* ast = NULL; 583 584 err = new_oct_reply_state(&drs, exists_reply); 585 assert(err_is_ok(err)); 586 587 err = check_query_length(query); 588 if (err_is_fail(err)) { 589 goto out; 590 } 591 592 err = generate_ast(query, &ast); 593 if (err_is_ok(err)) { 594 err = get_record(ast, &drs->query_state); 595 drs->server_id = install_trigger(b, ast, trigger, err); 596 } 597 598out: 599 drs->error = err; 600 drs->reply(b, drs); 601 602 free_ast(ast); 603} 604 605static void wait_for_reply(struct octopus_binding* b, struct oct_reply_state* drs) 606{ 607 errval_t err; 608 err = b->tx_vtbl.wait_for_response(b, MKCONT(free_oct_reply_state, drs), 609 drs->query_state.std_out.buffer, drs->error); 610 611 if (err_is_fail(err)) { 612 if (err_no(err) == FLOUNDER_ERR_TX_BUSY) { 613 oct_rpc_enqueue_reply(b, drs); 614 return; 615 } 616 USER_PANIC_ERR(err, "SKB sending %s failed!", __FUNCTION__); 617 } 618} 619 620// XXX: For compatibility reasons with nameserver API 621void wait_for_handler(struct octopus_binding* b, const char* query) { 622 errval_t err = SYS_ERR_OK; 623 errval_t set_watch_err = SYS_ERR_OK; 624 625 struct oct_reply_state* drs = NULL; 626 struct ast_object* ast = NULL; 627 628 err = new_oct_reply_state(&drs, wait_for_reply); 629 drs->binding = b; 630 assert(err_is_ok(err)); 631 632 err = check_query_length(query); 633 if (err_is_fail(err)) { 634 goto out; 635 } 636 637 err = generate_ast(query, &ast); 638 if (err_is_ok(err)) { 639 err = get_record(ast, &drs->query_state); 640 if (err_no(err) == OCT_ERR_NO_RECORD) { 641 uint64_t wid; 642 set_watch_err = set_watch(b, ast, OCT_ON_SET, drs, &wid); 643 } 644 } 645 646out: 647 if (err_no(err) != OCT_ERR_NO_RECORD || err_is_fail(set_watch_err)) { 648 drs->error = err; 649 if (err_is_fail(set_watch_err)) { 650 // implies err = OCT_ERR_NO_RECORD 651 drs->error = set_watch_err; 652 } 653 drs->reply(b, drs); 654 } 655 656 free_ast(ast); 657} 658 659static void subscribe_reply(struct octopus_binding* b, 660 struct oct_reply_state* drs) 661{ 662 errval_t err; 663 err = b->tx_vtbl.subscribe_response(b, MKCONT(free_oct_reply_state, drs), 664 drs->server_id, drs->error); 665 666 if (err_is_fail(err)) { 667 if (err_no(err) == FLOUNDER_ERR_TX_BUSY) { 668 oct_rpc_enqueue_reply(b, drs); 669 return; 670 } 671 USER_PANIC_ERR(err, "SKB sending %s failed!", __FUNCTION__); 672 } 673} 674 675void subscribe_handler(struct octopus_binding *b, const char* query, 676 uint64_t trigger_fn, uint64_t state) 677{ 678 OCT_DEBUG("subscribe: query = %s\n", query); 679 errval_t err = SYS_ERR_OK; 680 681 struct oct_reply_state* drs = NULL; 682 struct ast_object* ast = NULL; 683 684 err = new_oct_reply_state(&drs, subscribe_reply); 685 assert(err_is_ok(err)); 686 687 err = check_query_length(query); 688 if (err_is_fail(err)) { 689 goto out; 690 } 691 692 err = generate_ast(query, &ast); 693 if (err_is_ok(err)) { 694 err = add_subscription(b, ast, trigger_fn, state, drs); 695 } 696 697out: 698 drs->error = err; 699 drs->reply(b, drs); 700 701 free_ast(ast); 702} 703 704static void unsubscribe_reply(struct octopus_binding* b, 705 struct oct_reply_state* drs) 706{ 707 errval_t err; 708 err = b->tx_vtbl.unsubscribe_response(b, MKCONT(free_oct_reply_state, drs), 709 drs->error); 710 if (err_is_fail(err)) { 711 if (err_no(err) == FLOUNDER_ERR_TX_BUSY) { 712 oct_rpc_enqueue_reply(b, drs); 713 return; 714 } 715 USER_PANIC_ERR(err, "SKB sending %s failed!", __FUNCTION__); 716 } 717} 718 719static void send_subscribed_message(struct octopus_binding* b, struct oct_reply_state* drs) 720{ 721 errval_t err = SYS_ERR_OK; 722 char* record = drs->query_state.std_out.buffer[0] != '\0' ? 723 drs->query_state.std_out.buffer : NULL; 724 725 err = b->tx_vtbl.subscription(b, MKCONT(free_oct_reply_state, drs), 726 drs->server_id, drs->client_handler, 727 drs->mode, record, drs->client_state); 728 if (err_is_fail(err)) { 729 if (err_no(err) == FLOUNDER_ERR_TX_BUSY) { 730 oct_rpc_enqueue_reply(b, drs); 731 return; 732 } 733 USER_PANIC_ERR(err, "SKB sending %s failed!", __FUNCTION__); 734 } 735 736} 737 738void unsubscribe_handler(struct octopus_binding *b, uint64_t id) 739{ 740 errval_t err = SYS_ERR_OK; 741 742 OCT_DEBUG("unsubscribe: id = %"PRIu64"\n", id); 743 744 struct oct_reply_state* srs = NULL; 745 err = new_oct_reply_state(&srs, unsubscribe_reply); 746 assert(err_is_ok(err)); 747 748 err = del_subscription(b, id, &srs->query_state); 749 if (err_is_ok(err)) { 750 uint64_t binding; 751 uint64_t client_handler; 752 uint64_t client_state; 753 uint64_t server_id; 754 755 skb_read_output_at(srs->query_state.std_out.buffer, 756 "subscriber(%"SCNu64", %"SCNu64", %"SCNu64", %"SCNu64")", 757 &binding, &client_handler, &client_state, &server_id); 758 759 struct oct_reply_state* subscriber = NULL; 760 err = new_oct_reply_state(&subscriber, 761 send_subscribed_message); 762 assert(err_is_ok(err)); 763 764#if defined(__i386__) || defined(__arm__) 765 subscriber->binding = (struct octopus_binding*)(uint32_t)binding; 766#else 767 subscriber->binding = (struct octopus_binding*)binding; 768#endif 769 subscriber->client_handler = client_handler; 770 subscriber->client_state = client_state; 771 subscriber->server_id = server_id; 772 subscriber->mode = OCT_REMOVED; 773 774 OCT_DEBUG("publish msg to: recipient:%"PRIu64" id:%"PRIu64"\n", binding, server_id); 775 subscriber->reply(subscriber->binding, subscriber); 776 } 777 778 srs->error = err; 779 srs->reply(b, srs); 780} 781 782static void publish_reply(struct octopus_binding* b, struct oct_reply_state* drs) 783{ 784 errval_t err; 785 err = b->tx_vtbl.publish_response(b, MKCONT(free_oct_reply_state, drs), 786 drs->error); 787 if (err_is_fail(err)) { 788 if (err_no(err) == FLOUNDER_ERR_TX_BUSY) { 789 oct_rpc_enqueue_reply(b, drs); 790 return; 791 } 792 USER_PANIC_ERR(err, "SKB sending %s failed!", __FUNCTION__); 793 } 794} 795 796void publish_handler(struct octopus_binding *b, const char* record) 797{ 798 OCT_DEBUG("publish_handler query: %s\n", record); 799 errval_t err = SYS_ERR_OK; 800 801 struct oct_reply_state* drs = NULL; 802 err = new_oct_reply_state(&drs, publish_reply); 803 assert(err_is_ok(err)); 804 805 err = check_query_length(record); 806 if (err_is_fail(err)) { 807 drs->error = err; 808 drs->reply(b, drs); 809 goto out1; 810 } 811 812 struct ast_object* ast = NULL; 813 err = generate_ast(record, &ast); 814 if (err_is_fail(err)) { 815 drs->error = err; 816 drs->reply(b, drs); 817 goto out2; 818 } 819 820 821 if (err_is_ok(err)) { 822 err = find_subscribers(ast, &drs->query_state); 823 if (err_is_ok(err)) { 824 // Reply to publisher 825 drs->error = err; 826 drs->reply(b, drs); 827 828 829 struct list_parser_status status; 830 skb_read_list_init_offset(&status, drs->query_state.std_out.buffer, 0); 831 832 // TODO remove skb list parser dependency 833 // Send to all subscribers 834 uint64_t binding; 835 uint64_t client_handler; 836 uint64_t client_state; 837 uint64_t server_id; 838 839 while (skb_read_list(&status, "subscriber(%"SCNu64", %"SCNu64", %"SCNu64", %"SCNu64")", 840 &binding, &client_handler, &client_state, &server_id)) { 841 842 struct oct_reply_state* subscriber = NULL; 843 err = new_oct_reply_state(&subscriber, 844 send_subscribed_message); 845 assert(err_is_ok(err)); 846#if defined(__i386__) || defined(__arm__) 847 subscriber->binding = (struct octopus_binding*)(uint32_t)binding; 848#else 849 subscriber->binding = (struct octopus_binding*)binding; 850#endif 851 subscriber->client_handler = client_handler; 852 strcpy(subscriber->query_state.std_out.buffer, record); 853 subscriber->client_state = client_state; 854 subscriber->server_id = server_id; 855 subscriber->mode = OCT_ON_PUBLISH; 856 857 OCT_DEBUG("publish msg to: recipient:%"PRIu64" id:%"PRIu64"\n", binding, server_id); 858 subscriber->reply(subscriber->binding, subscriber); 859 } 860 } 861 } 862 863out2: 864 free_ast(ast); 865out1: 866 return; 867} 868 869void get_identifier(struct octopus_binding* b) 870{ 871 errval_t err = b->tx_vtbl.get_identifier_response(b, NOP_CONT, 872 current_id++); 873 assert(err_is_ok(err)); 874} 875 876static void identify_binding_reply(struct octopus_binding* b, 877 struct oct_reply_state* drs) 878{ 879 errval_t err; 880 // TODO send drs->error back to client! 881 err = b->tx_vtbl.identify_response(b, MKCONT(free_oct_reply_state, drs)); 882 if (err_is_fail(err)) { 883 if (err_no(err) == FLOUNDER_ERR_TX_BUSY) { 884 oct_rpc_enqueue_reply(b, drs); 885 return; 886 } 887 USER_PANIC_ERR(err, "SKB sending %s failed!", __FUNCTION__); 888 } 889 890} 891 892void identify_binding(struct octopus_binding* b, uint64_t id, 893 octopus_binding_type_t type) 894{ 895 assert(id <= current_id); 896 897 struct oct_reply_state* drs = NULL; 898 errval_t err = new_oct_reply_state(&drs, identify_binding_reply); 899 assert(err_is_ok(err)); 900 901 OCT_DEBUG("set binding: id=%"PRIu64" type=%d\n", id, type); 902 drs->error = set_binding(type, id, b); 903 drs->reply(b, drs); 904} 905