1/* 2 * net/tipc/port.c: TIPC port code 3 * 4 * Copyright (c) 1992-2006, Ericsson AB 5 * Copyright (c) 2004-2005, Wind River Systems 6 * All rights reserved. 7 * 8 * Redistribution and use in source and binary forms, with or without 9 * modification, are permitted provided that the following conditions are met: 10 * 11 * 1. Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in the 15 * documentation and/or other materials provided with the distribution. 16 * 3. Neither the names of the copyright holders nor the names of its 17 * contributors may be used to endorse or promote products derived from 18 * this software without specific prior written permission. 19 * 20 * Alternatively, this software may be distributed under the terms of the 21 * GNU General Public License ("GPL") version 2 as published by the Free 22 * Software Foundation. 23 * 24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 25 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 26 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 27 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 28 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 29 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 30 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 31 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 32 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 33 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 34 * POSSIBILITY OF SUCH DAMAGE. 35 */ 36 37#include "core.h" 38#include "config.h" 39#include "dbg.h" 40#include "port.h" 41#include "addr.h" 42#include "link.h" 43#include "node.h" 44#include "port.h" 45#include "name_table.h" 46#include "user_reg.h" 47#include "msg.h" 48#include "bcast.h" 49 50/* Connection management: */ 51#define PROBING_INTERVAL 3600000 /* [ms] => 1 h */ 52#define CONFIRMED 0 53#define PROBING 1 54 55#define MAX_REJECT_SIZE 1024 56 57static struct sk_buff *msg_queue_head = NULL; 58static struct sk_buff *msg_queue_tail = NULL; 59 60DEFINE_SPINLOCK(tipc_port_list_lock); 61static DEFINE_SPINLOCK(queue_lock); 62 63static LIST_HEAD(ports); 64static void port_handle_node_down(unsigned long ref); 65static struct sk_buff* port_build_self_abort_msg(struct port *,u32 err); 66static struct sk_buff* port_build_peer_abort_msg(struct port *,u32 err); 67static void port_timeout(unsigned long ref); 68 69 70static u32 port_peernode(struct port *p_ptr) 71{ 72 return msg_destnode(&p_ptr->publ.phdr); 73} 74 75static u32 port_peerport(struct port *p_ptr) 76{ 77 return msg_destport(&p_ptr->publ.phdr); 78} 79 80static u32 port_out_seqno(struct port *p_ptr) 81{ 82 return msg_transp_seqno(&p_ptr->publ.phdr); 83} 84 85static void port_incr_out_seqno(struct port *p_ptr) 86{ 87 struct tipc_msg *m = &p_ptr->publ.phdr; 88 89 if (likely(!msg_routed(m))) 90 return; 91 msg_set_transp_seqno(m, (msg_transp_seqno(m) + 1)); 92} 93 94/** 95 * tipc_multicast - send a multicast message to local and remote destinations 96 */ 97 98int tipc_multicast(u32 ref, struct tipc_name_seq const *seq, u32 domain, 99 u32 num_sect, struct iovec const *msg_sect) 100{ 101 struct tipc_msg *hdr; 102 struct sk_buff *buf; 103 struct sk_buff *ibuf = NULL; 104 struct port_list dports = {0, NULL, }; 105 struct port *oport = tipc_port_deref(ref); 106 int ext_targets; 107 int res; 108 109 if (unlikely(!oport)) 110 return -EINVAL; 111 112 /* Create multicast message */ 113 114 hdr = &oport->publ.phdr; 115 msg_set_type(hdr, TIPC_MCAST_MSG); 116 msg_set_nametype(hdr, seq->type); 117 msg_set_namelower(hdr, seq->lower); 118 msg_set_nameupper(hdr, seq->upper); 119 msg_set_hdr_sz(hdr, MCAST_H_SIZE); 120 res = msg_build(hdr, msg_sect, num_sect, MAX_MSG_SIZE, 121 !oport->user_port, &buf); 122 if (unlikely(!buf)) 123 return res; 124 125 /* Figure out where to send multicast message */ 126 127 ext_targets = tipc_nametbl_mc_translate(seq->type, seq->lower, seq->upper, 128 TIPC_NODE_SCOPE, &dports); 129 130 /* Send message to destinations (duplicate it only if necessary) */ 131 132 if (ext_targets) { 133 if (dports.count != 0) { 134 ibuf = skb_copy(buf, GFP_ATOMIC); 135 if (ibuf == NULL) { 136 tipc_port_list_free(&dports); 137 buf_discard(buf); 138 return -ENOMEM; 139 } 140 } 141 res = tipc_bclink_send_msg(buf); 142 if ((res < 0) && (dports.count != 0)) { 143 buf_discard(ibuf); 144 } 145 } else { 146 ibuf = buf; 147 } 148 149 if (res >= 0) { 150 if (ibuf) 151 tipc_port_recv_mcast(ibuf, &dports); 152 } else { 153 tipc_port_list_free(&dports); 154 } 155 return res; 156} 157 158/** 159 * tipc_port_recv_mcast - deliver multicast message to all destination ports 160 * 161 * If there is no port list, perform a lookup to create one 162 */ 163 164void tipc_port_recv_mcast(struct sk_buff *buf, struct port_list *dp) 165{ 166 struct tipc_msg* msg; 167 struct port_list dports = {0, NULL, }; 168 struct port_list *item = dp; 169 int cnt = 0; 170 171 msg = buf_msg(buf); 172 173 /* Create destination port list, if one wasn't supplied */ 174 175 if (dp == NULL) { 176 tipc_nametbl_mc_translate(msg_nametype(msg), 177 msg_namelower(msg), 178 msg_nameupper(msg), 179 TIPC_CLUSTER_SCOPE, 180 &dports); 181 item = dp = &dports; 182 } 183 184 /* Deliver a copy of message to each destination port */ 185 186 if (dp->count != 0) { 187 if (dp->count == 1) { 188 msg_set_destport(msg, dp->ports[0]); 189 tipc_port_recv_msg(buf); 190 tipc_port_list_free(dp); 191 return; 192 } 193 for (; cnt < dp->count; cnt++) { 194 int index = cnt % PLSIZE; 195 struct sk_buff *b = skb_clone(buf, GFP_ATOMIC); 196 197 if (b == NULL) { 198 warn("Unable to deliver multicast message(s)\n"); 199 msg_dbg(msg, "LOST:"); 200 goto exit; 201 } 202 if ((index == 0) && (cnt != 0)) { 203 item = item->next; 204 } 205 msg_set_destport(buf_msg(b),item->ports[index]); 206 tipc_port_recv_msg(b); 207 } 208 } 209exit: 210 buf_discard(buf); 211 tipc_port_list_free(dp); 212} 213 214/** 215 * tipc_createport_raw - create a native TIPC port 216 * 217 * Returns local port reference 218 */ 219 220u32 tipc_createport_raw(void *usr_handle, 221 u32 (*dispatcher)(struct tipc_port *, struct sk_buff *), 222 void (*wakeup)(struct tipc_port *), 223 const u32 importance) 224{ 225 struct port *p_ptr; 226 struct tipc_msg *msg; 227 u32 ref; 228 229 p_ptr = kzalloc(sizeof(*p_ptr), GFP_ATOMIC); 230 if (!p_ptr) { 231 warn("Port creation failed, no memory\n"); 232 return 0; 233 } 234 ref = tipc_ref_acquire(p_ptr, &p_ptr->publ.lock); 235 if (!ref) { 236 warn("Port creation failed, reference table exhausted\n"); 237 kfree(p_ptr); 238 return 0; 239 } 240 241 tipc_port_lock(ref); 242 p_ptr->publ.ref = ref; 243 msg = &p_ptr->publ.phdr; 244 msg_init(msg, DATA_LOW, TIPC_NAMED_MSG, TIPC_OK, LONG_H_SIZE, 0); 245 msg_set_orignode(msg, tipc_own_addr); 246 msg_set_prevnode(msg, tipc_own_addr); 247 msg_set_origport(msg, ref); 248 msg_set_importance(msg,importance); 249 p_ptr->last_in_seqno = 41; 250 p_ptr->sent = 1; 251 p_ptr->publ.usr_handle = usr_handle; 252 INIT_LIST_HEAD(&p_ptr->wait_list); 253 INIT_LIST_HEAD(&p_ptr->subscription.nodesub_list); 254 p_ptr->congested_link = NULL; 255 p_ptr->max_pkt = MAX_PKT_DEFAULT; 256 p_ptr->dispatcher = dispatcher; 257 p_ptr->wakeup = wakeup; 258 p_ptr->user_port = NULL; 259 k_init_timer(&p_ptr->timer, (Handler)port_timeout, ref); 260 spin_lock_bh(&tipc_port_list_lock); 261 INIT_LIST_HEAD(&p_ptr->publications); 262 INIT_LIST_HEAD(&p_ptr->port_list); 263 list_add_tail(&p_ptr->port_list, &ports); 264 spin_unlock_bh(&tipc_port_list_lock); 265 tipc_port_unlock(p_ptr); 266 return ref; 267} 268 269int tipc_deleteport(u32 ref) 270{ 271 struct port *p_ptr; 272 struct sk_buff *buf = NULL; 273 274 tipc_withdraw(ref, 0, NULL); 275 p_ptr = tipc_port_lock(ref); 276 if (!p_ptr) 277 return -EINVAL; 278 279 tipc_ref_discard(ref); 280 tipc_port_unlock(p_ptr); 281 282 k_cancel_timer(&p_ptr->timer); 283 if (p_ptr->publ.connected) { 284 buf = port_build_peer_abort_msg(p_ptr, TIPC_ERR_NO_PORT); 285 tipc_nodesub_unsubscribe(&p_ptr->subscription); 286 } 287 if (p_ptr->user_port) { 288 tipc_reg_remove_port(p_ptr->user_port); 289 kfree(p_ptr->user_port); 290 } 291 292 spin_lock_bh(&tipc_port_list_lock); 293 list_del(&p_ptr->port_list); 294 list_del(&p_ptr->wait_list); 295 spin_unlock_bh(&tipc_port_list_lock); 296 k_term_timer(&p_ptr->timer); 297 kfree(p_ptr); 298 dbg("Deleted port %u\n", ref); 299 tipc_net_route_msg(buf); 300 return TIPC_OK; 301} 302 303/** 304 * tipc_get_port() - return port associated with 'ref' 305 * 306 * Note: Port is not locked. 307 */ 308 309struct tipc_port *tipc_get_port(const u32 ref) 310{ 311 return (struct tipc_port *)tipc_ref_deref(ref); 312} 313 314/** 315 * tipc_get_handle - return user handle associated to port 'ref' 316 */ 317 318void *tipc_get_handle(const u32 ref) 319{ 320 struct port *p_ptr; 321 void * handle; 322 323 p_ptr = tipc_port_lock(ref); 324 if (!p_ptr) 325 return NULL; 326 handle = p_ptr->publ.usr_handle; 327 tipc_port_unlock(p_ptr); 328 return handle; 329} 330 331static int port_unreliable(struct port *p_ptr) 332{ 333 return msg_src_droppable(&p_ptr->publ.phdr); 334} 335 336int tipc_portunreliable(u32 ref, unsigned int *isunreliable) 337{ 338 struct port *p_ptr; 339 340 p_ptr = tipc_port_lock(ref); 341 if (!p_ptr) 342 return -EINVAL; 343 *isunreliable = port_unreliable(p_ptr); 344 spin_unlock_bh(p_ptr->publ.lock); 345 return TIPC_OK; 346} 347 348int tipc_set_portunreliable(u32 ref, unsigned int isunreliable) 349{ 350 struct port *p_ptr; 351 352 p_ptr = tipc_port_lock(ref); 353 if (!p_ptr) 354 return -EINVAL; 355 msg_set_src_droppable(&p_ptr->publ.phdr, (isunreliable != 0)); 356 tipc_port_unlock(p_ptr); 357 return TIPC_OK; 358} 359 360static int port_unreturnable(struct port *p_ptr) 361{ 362 return msg_dest_droppable(&p_ptr->publ.phdr); 363} 364 365int tipc_portunreturnable(u32 ref, unsigned int *isunrejectable) 366{ 367 struct port *p_ptr; 368 369 p_ptr = tipc_port_lock(ref); 370 if (!p_ptr) 371 return -EINVAL; 372 *isunrejectable = port_unreturnable(p_ptr); 373 spin_unlock_bh(p_ptr->publ.lock); 374 return TIPC_OK; 375} 376 377int tipc_set_portunreturnable(u32 ref, unsigned int isunrejectable) 378{ 379 struct port *p_ptr; 380 381 p_ptr = tipc_port_lock(ref); 382 if (!p_ptr) 383 return -EINVAL; 384 msg_set_dest_droppable(&p_ptr->publ.phdr, (isunrejectable != 0)); 385 tipc_port_unlock(p_ptr); 386 return TIPC_OK; 387} 388 389/* 390 * port_build_proto_msg(): build a port level protocol 391 * or a connection abortion message. Called with 392 * tipc_port lock on. 393 */ 394static struct sk_buff *port_build_proto_msg(u32 destport, u32 destnode, 395 u32 origport, u32 orignode, 396 u32 usr, u32 type, u32 err, 397 u32 seqno, u32 ack) 398{ 399 struct sk_buff *buf; 400 struct tipc_msg *msg; 401 402 buf = buf_acquire(LONG_H_SIZE); 403 if (buf) { 404 msg = buf_msg(buf); 405 msg_init(msg, usr, type, err, LONG_H_SIZE, destnode); 406 msg_set_destport(msg, destport); 407 msg_set_origport(msg, origport); 408 msg_set_destnode(msg, destnode); 409 msg_set_orignode(msg, orignode); 410 msg_set_transp_seqno(msg, seqno); 411 msg_set_msgcnt(msg, ack); 412 msg_dbg(msg, "PORT>SEND>:"); 413 } 414 return buf; 415} 416 417int tipc_set_msg_option(struct tipc_port *tp_ptr, const char *opt, const u32 sz) 418{ 419 msg_expand(&tp_ptr->phdr, msg_destnode(&tp_ptr->phdr)); 420 msg_set_options(&tp_ptr->phdr, opt, sz); 421 return TIPC_OK; 422} 423 424int tipc_reject_msg(struct sk_buff *buf, u32 err) 425{ 426 struct tipc_msg *msg = buf_msg(buf); 427 struct sk_buff *rbuf; 428 struct tipc_msg *rmsg; 429 int hdr_sz; 430 u32 imp = msg_importance(msg); 431 u32 data_sz = msg_data_sz(msg); 432 433 if (data_sz > MAX_REJECT_SIZE) 434 data_sz = MAX_REJECT_SIZE; 435 if (msg_connected(msg) && (imp < TIPC_CRITICAL_IMPORTANCE)) 436 imp++; 437 msg_dbg(msg, "port->rej: "); 438 439 /* discard rejected message if it shouldn't be returned to sender */ 440 if (msg_errcode(msg) || msg_dest_droppable(msg)) { 441 buf_discard(buf); 442 return data_sz; 443 } 444 445 /* construct rejected message */ 446 if (msg_mcast(msg)) 447 hdr_sz = MCAST_H_SIZE; 448 else 449 hdr_sz = LONG_H_SIZE; 450 rbuf = buf_acquire(data_sz + hdr_sz); 451 if (rbuf == NULL) { 452 buf_discard(buf); 453 return data_sz; 454 } 455 rmsg = buf_msg(rbuf); 456 msg_init(rmsg, imp, msg_type(msg), err, hdr_sz, msg_orignode(msg)); 457 msg_set_destport(rmsg, msg_origport(msg)); 458 msg_set_prevnode(rmsg, tipc_own_addr); 459 msg_set_origport(rmsg, msg_destport(msg)); 460 if (msg_short(msg)) 461 msg_set_orignode(rmsg, tipc_own_addr); 462 else 463 msg_set_orignode(rmsg, msg_destnode(msg)); 464 msg_set_size(rmsg, data_sz + hdr_sz); 465 msg_set_nametype(rmsg, msg_nametype(msg)); 466 msg_set_nameinst(rmsg, msg_nameinst(msg)); 467 skb_copy_to_linear_data_offset(rbuf, hdr_sz, msg_data(msg), data_sz); 468 469 /* send self-abort message when rejecting on a connected port */ 470 if (msg_connected(msg)) { 471 struct sk_buff *abuf = NULL; 472 struct port *p_ptr = tipc_port_lock(msg_destport(msg)); 473 474 if (p_ptr) { 475 if (p_ptr->publ.connected) 476 abuf = port_build_self_abort_msg(p_ptr, err); 477 tipc_port_unlock(p_ptr); 478 } 479 tipc_net_route_msg(abuf); 480 } 481 482 /* send rejected message */ 483 buf_discard(buf); 484 tipc_net_route_msg(rbuf); 485 return data_sz; 486} 487 488int tipc_port_reject_sections(struct port *p_ptr, struct tipc_msg *hdr, 489 struct iovec const *msg_sect, u32 num_sect, 490 int err) 491{ 492 struct sk_buff *buf; 493 int res; 494 495 res = msg_build(hdr, msg_sect, num_sect, MAX_MSG_SIZE, 496 !p_ptr->user_port, &buf); 497 if (!buf) 498 return res; 499 500 return tipc_reject_msg(buf, err); 501} 502 503static void port_timeout(unsigned long ref) 504{ 505 struct port *p_ptr = tipc_port_lock(ref); 506 struct sk_buff *buf = NULL; 507 508 if (!p_ptr) 509 return; 510 511 if (!p_ptr->publ.connected) { 512 tipc_port_unlock(p_ptr); 513 return; 514 } 515 516 /* Last probe answered ? */ 517 if (p_ptr->probing_state == PROBING) { 518 buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_PORT); 519 } else { 520 buf = port_build_proto_msg(port_peerport(p_ptr), 521 port_peernode(p_ptr), 522 p_ptr->publ.ref, 523 tipc_own_addr, 524 CONN_MANAGER, 525 CONN_PROBE, 526 TIPC_OK, 527 port_out_seqno(p_ptr), 528 0); 529 port_incr_out_seqno(p_ptr); 530 p_ptr->probing_state = PROBING; 531 k_start_timer(&p_ptr->timer, p_ptr->probing_interval); 532 } 533 tipc_port_unlock(p_ptr); 534 tipc_net_route_msg(buf); 535} 536 537 538static void port_handle_node_down(unsigned long ref) 539{ 540 struct port *p_ptr = tipc_port_lock(ref); 541 struct sk_buff* buf = NULL; 542 543 if (!p_ptr) 544 return; 545 buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_NODE); 546 tipc_port_unlock(p_ptr); 547 tipc_net_route_msg(buf); 548} 549 550 551static struct sk_buff *port_build_self_abort_msg(struct port *p_ptr, u32 err) 552{ 553 u32 imp = msg_importance(&p_ptr->publ.phdr); 554 555 if (!p_ptr->publ.connected) 556 return NULL; 557 if (imp < TIPC_CRITICAL_IMPORTANCE) 558 imp++; 559 return port_build_proto_msg(p_ptr->publ.ref, 560 tipc_own_addr, 561 port_peerport(p_ptr), 562 port_peernode(p_ptr), 563 imp, 564 TIPC_CONN_MSG, 565 err, 566 p_ptr->last_in_seqno + 1, 567 0); 568} 569 570 571static struct sk_buff *port_build_peer_abort_msg(struct port *p_ptr, u32 err) 572{ 573 u32 imp = msg_importance(&p_ptr->publ.phdr); 574 575 if (!p_ptr->publ.connected) 576 return NULL; 577 if (imp < TIPC_CRITICAL_IMPORTANCE) 578 imp++; 579 return port_build_proto_msg(port_peerport(p_ptr), 580 port_peernode(p_ptr), 581 p_ptr->publ.ref, 582 tipc_own_addr, 583 imp, 584 TIPC_CONN_MSG, 585 err, 586 port_out_seqno(p_ptr), 587 0); 588} 589 590void tipc_port_recv_proto_msg(struct sk_buff *buf) 591{ 592 struct tipc_msg *msg = buf_msg(buf); 593 struct port *p_ptr = tipc_port_lock(msg_destport(msg)); 594 u32 err = TIPC_OK; 595 struct sk_buff *r_buf = NULL; 596 struct sk_buff *abort_buf = NULL; 597 598 msg_dbg(msg, "PORT<RECV<:"); 599 600 if (!p_ptr) { 601 err = TIPC_ERR_NO_PORT; 602 } else if (p_ptr->publ.connected) { 603 if (port_peernode(p_ptr) != msg_orignode(msg)) 604 err = TIPC_ERR_NO_PORT; 605 if (port_peerport(p_ptr) != msg_origport(msg)) 606 err = TIPC_ERR_NO_PORT; 607 if (!err && msg_routed(msg)) { 608 u32 seqno = msg_transp_seqno(msg); 609 u32 myno = ++p_ptr->last_in_seqno; 610 if (seqno != myno) { 611 err = TIPC_ERR_NO_PORT; 612 abort_buf = port_build_self_abort_msg(p_ptr, err); 613 } 614 } 615 if (msg_type(msg) == CONN_ACK) { 616 int wakeup = tipc_port_congested(p_ptr) && 617 p_ptr->publ.congested && 618 p_ptr->wakeup; 619 p_ptr->acked += msg_msgcnt(msg); 620 if (tipc_port_congested(p_ptr)) 621 goto exit; 622 p_ptr->publ.congested = 0; 623 if (!wakeup) 624 goto exit; 625 p_ptr->wakeup(&p_ptr->publ); 626 goto exit; 627 } 628 } else if (p_ptr->publ.published) { 629 err = TIPC_ERR_NO_PORT; 630 } 631 if (err) { 632 r_buf = port_build_proto_msg(msg_origport(msg), 633 msg_orignode(msg), 634 msg_destport(msg), 635 tipc_own_addr, 636 DATA_HIGH, 637 TIPC_CONN_MSG, 638 err, 639 0, 640 0); 641 goto exit; 642 } 643 644 /* All is fine */ 645 if (msg_type(msg) == CONN_PROBE) { 646 r_buf = port_build_proto_msg(msg_origport(msg), 647 msg_orignode(msg), 648 msg_destport(msg), 649 tipc_own_addr, 650 CONN_MANAGER, 651 CONN_PROBE_REPLY, 652 TIPC_OK, 653 port_out_seqno(p_ptr), 654 0); 655 } 656 p_ptr->probing_state = CONFIRMED; 657 port_incr_out_seqno(p_ptr); 658exit: 659 if (p_ptr) 660 tipc_port_unlock(p_ptr); 661 tipc_net_route_msg(r_buf); 662 tipc_net_route_msg(abort_buf); 663 buf_discard(buf); 664} 665 666static void port_print(struct port *p_ptr, struct print_buf *buf, int full_id) 667{ 668 struct publication *publ; 669 670 if (full_id) 671 tipc_printf(buf, "<%u.%u.%u:%u>:", 672 tipc_zone(tipc_own_addr), tipc_cluster(tipc_own_addr), 673 tipc_node(tipc_own_addr), p_ptr->publ.ref); 674 else 675 tipc_printf(buf, "%-10u:", p_ptr->publ.ref); 676 677 if (p_ptr->publ.connected) { 678 u32 dport = port_peerport(p_ptr); 679 u32 destnode = port_peernode(p_ptr); 680 681 tipc_printf(buf, " connected to <%u.%u.%u:%u>", 682 tipc_zone(destnode), tipc_cluster(destnode), 683 tipc_node(destnode), dport); 684 if (p_ptr->publ.conn_type != 0) 685 tipc_printf(buf, " via {%u,%u}", 686 p_ptr->publ.conn_type, 687 p_ptr->publ.conn_instance); 688 } 689 else if (p_ptr->publ.published) { 690 tipc_printf(buf, " bound to"); 691 list_for_each_entry(publ, &p_ptr->publications, pport_list) { 692 if (publ->lower == publ->upper) 693 tipc_printf(buf, " {%u,%u}", publ->type, 694 publ->lower); 695 else 696 tipc_printf(buf, " {%u,%u,%u}", publ->type, 697 publ->lower, publ->upper); 698 } 699 } 700 tipc_printf(buf, "\n"); 701} 702 703#define MAX_PORT_QUERY 32768 704 705struct sk_buff *tipc_port_get_ports(void) 706{ 707 struct sk_buff *buf; 708 struct tlv_desc *rep_tlv; 709 struct print_buf pb; 710 struct port *p_ptr; 711 int str_len; 712 713 buf = tipc_cfg_reply_alloc(TLV_SPACE(MAX_PORT_QUERY)); 714 if (!buf) 715 return NULL; 716 rep_tlv = (struct tlv_desc *)buf->data; 717 718 tipc_printbuf_init(&pb, TLV_DATA(rep_tlv), MAX_PORT_QUERY); 719 spin_lock_bh(&tipc_port_list_lock); 720 list_for_each_entry(p_ptr, &ports, port_list) { 721 spin_lock_bh(p_ptr->publ.lock); 722 port_print(p_ptr, &pb, 0); 723 spin_unlock_bh(p_ptr->publ.lock); 724 } 725 spin_unlock_bh(&tipc_port_list_lock); 726 str_len = tipc_printbuf_validate(&pb); 727 728 skb_put(buf, TLV_SPACE(str_len)); 729 TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len); 730 731 return buf; 732} 733 734 735void tipc_port_reinit(void) 736{ 737 struct port *p_ptr; 738 struct tipc_msg *msg; 739 740 spin_lock_bh(&tipc_port_list_lock); 741 list_for_each_entry(p_ptr, &ports, port_list) { 742 msg = &p_ptr->publ.phdr; 743 if (msg_orignode(msg) == tipc_own_addr) 744 break; 745 msg_set_orignode(msg, tipc_own_addr); 746 } 747 spin_unlock_bh(&tipc_port_list_lock); 748} 749 750 751/* 752 * port_dispatcher_sigh(): Signal handler for messages destinated 753 * to the tipc_port interface. 754 */ 755 756static void port_dispatcher_sigh(void *dummy) 757{ 758 struct sk_buff *buf; 759 760 spin_lock_bh(&queue_lock); 761 buf = msg_queue_head; 762 msg_queue_head = NULL; 763 spin_unlock_bh(&queue_lock); 764 765 while (buf) { 766 struct port *p_ptr; 767 struct user_port *up_ptr; 768 struct tipc_portid orig; 769 struct tipc_name_seq dseq; 770 void *usr_handle; 771 int connected; 772 int published; 773 u32 message_type; 774 775 struct sk_buff *next = buf->next; 776 struct tipc_msg *msg = buf_msg(buf); 777 u32 dref = msg_destport(msg); 778 779 message_type = msg_type(msg); 780 if (message_type > TIPC_DIRECT_MSG) 781 goto reject; /* Unsupported message type */ 782 783 p_ptr = tipc_port_lock(dref); 784 if (!p_ptr) 785 goto reject; /* Port deleted while msg in queue */ 786 787 orig.ref = msg_origport(msg); 788 orig.node = msg_orignode(msg); 789 up_ptr = p_ptr->user_port; 790 usr_handle = up_ptr->usr_handle; 791 connected = p_ptr->publ.connected; 792 published = p_ptr->publ.published; 793 794 if (unlikely(msg_errcode(msg))) 795 goto err; 796 797 switch (message_type) { 798 799 case TIPC_CONN_MSG:{ 800 tipc_conn_msg_event cb = up_ptr->conn_msg_cb; 801 u32 peer_port = port_peerport(p_ptr); 802 u32 peer_node = port_peernode(p_ptr); 803 804 spin_unlock_bh(p_ptr->publ.lock); 805 if (unlikely(!connected)) { 806 if (unlikely(published)) 807 goto reject; 808 tipc_connect2port(dref,&orig); 809 } 810 if (unlikely(msg_origport(msg) != peer_port)) 811 goto reject; 812 if (unlikely(msg_orignode(msg) != peer_node)) 813 goto reject; 814 if (unlikely(!cb)) 815 goto reject; 816 if (unlikely(++p_ptr->publ.conn_unacked >= 817 TIPC_FLOW_CONTROL_WIN)) 818 tipc_acknowledge(dref, 819 p_ptr->publ.conn_unacked); 820 skb_pull(buf, msg_hdr_sz(msg)); 821 cb(usr_handle, dref, &buf, msg_data(msg), 822 msg_data_sz(msg)); 823 break; 824 } 825 case TIPC_DIRECT_MSG:{ 826 tipc_msg_event cb = up_ptr->msg_cb; 827 828 spin_unlock_bh(p_ptr->publ.lock); 829 if (unlikely(connected)) 830 goto reject; 831 if (unlikely(!cb)) 832 goto reject; 833 skb_pull(buf, msg_hdr_sz(msg)); 834 cb(usr_handle, dref, &buf, msg_data(msg), 835 msg_data_sz(msg), msg_importance(msg), 836 &orig); 837 break; 838 } 839 case TIPC_MCAST_MSG: 840 case TIPC_NAMED_MSG:{ 841 tipc_named_msg_event cb = up_ptr->named_msg_cb; 842 843 spin_unlock_bh(p_ptr->publ.lock); 844 if (unlikely(connected)) 845 goto reject; 846 if (unlikely(!cb)) 847 goto reject; 848 if (unlikely(!published)) 849 goto reject; 850 dseq.type = msg_nametype(msg); 851 dseq.lower = msg_nameinst(msg); 852 dseq.upper = (message_type == TIPC_NAMED_MSG) 853 ? dseq.lower : msg_nameupper(msg); 854 skb_pull(buf, msg_hdr_sz(msg)); 855 cb(usr_handle, dref, &buf, msg_data(msg), 856 msg_data_sz(msg), msg_importance(msg), 857 &orig, &dseq); 858 break; 859 } 860 } 861 if (buf) 862 buf_discard(buf); 863 buf = next; 864 continue; 865err: 866 switch (message_type) { 867 868 case TIPC_CONN_MSG:{ 869 tipc_conn_shutdown_event cb = 870 up_ptr->conn_err_cb; 871 u32 peer_port = port_peerport(p_ptr); 872 u32 peer_node = port_peernode(p_ptr); 873 874 spin_unlock_bh(p_ptr->publ.lock); 875 if (!connected || !cb) 876 break; 877 if (msg_origport(msg) != peer_port) 878 break; 879 if (msg_orignode(msg) != peer_node) 880 break; 881 tipc_disconnect(dref); 882 skb_pull(buf, msg_hdr_sz(msg)); 883 cb(usr_handle, dref, &buf, msg_data(msg), 884 msg_data_sz(msg), msg_errcode(msg)); 885 break; 886 } 887 case TIPC_DIRECT_MSG:{ 888 tipc_msg_err_event cb = up_ptr->err_cb; 889 890 spin_unlock_bh(p_ptr->publ.lock); 891 if (connected || !cb) 892 break; 893 skb_pull(buf, msg_hdr_sz(msg)); 894 cb(usr_handle, dref, &buf, msg_data(msg), 895 msg_data_sz(msg), msg_errcode(msg), &orig); 896 break; 897 } 898 case TIPC_MCAST_MSG: 899 case TIPC_NAMED_MSG:{ 900 tipc_named_msg_err_event cb = 901 up_ptr->named_err_cb; 902 903 spin_unlock_bh(p_ptr->publ.lock); 904 if (connected || !cb) 905 break; 906 dseq.type = msg_nametype(msg); 907 dseq.lower = msg_nameinst(msg); 908 dseq.upper = (message_type == TIPC_NAMED_MSG) 909 ? dseq.lower : msg_nameupper(msg); 910 skb_pull(buf, msg_hdr_sz(msg)); 911 cb(usr_handle, dref, &buf, msg_data(msg), 912 msg_data_sz(msg), msg_errcode(msg), &dseq); 913 break; 914 } 915 } 916 if (buf) 917 buf_discard(buf); 918 buf = next; 919 continue; 920reject: 921 tipc_reject_msg(buf, TIPC_ERR_NO_PORT); 922 buf = next; 923 } 924} 925 926/* 927 * port_dispatcher(): Dispatcher for messages destinated 928 * to the tipc_port interface. Called with port locked. 929 */ 930 931static u32 port_dispatcher(struct tipc_port *dummy, struct sk_buff *buf) 932{ 933 buf->next = NULL; 934 spin_lock_bh(&queue_lock); 935 if (msg_queue_head) { 936 msg_queue_tail->next = buf; 937 msg_queue_tail = buf; 938 } else { 939 msg_queue_tail = msg_queue_head = buf; 940 tipc_k_signal((Handler)port_dispatcher_sigh, 0); 941 } 942 spin_unlock_bh(&queue_lock); 943 return TIPC_OK; 944} 945 946/* 947 * Wake up port after congestion: Called with port locked, 948 * 949 */ 950 951static void port_wakeup_sh(unsigned long ref) 952{ 953 struct port *p_ptr; 954 struct user_port *up_ptr; 955 tipc_continue_event cb = NULL; 956 void *uh = NULL; 957 958 p_ptr = tipc_port_lock(ref); 959 if (p_ptr) { 960 up_ptr = p_ptr->user_port; 961 if (up_ptr) { 962 cb = up_ptr->continue_event_cb; 963 uh = up_ptr->usr_handle; 964 } 965 tipc_port_unlock(p_ptr); 966 } 967 if (cb) 968 cb(uh, ref); 969} 970 971 972static void port_wakeup(struct tipc_port *p_ptr) 973{ 974 tipc_k_signal((Handler)port_wakeup_sh, p_ptr->ref); 975} 976 977void tipc_acknowledge(u32 ref, u32 ack) 978{ 979 struct port *p_ptr; 980 struct sk_buff *buf = NULL; 981 982 p_ptr = tipc_port_lock(ref); 983 if (!p_ptr) 984 return; 985 if (p_ptr->publ.connected) { 986 p_ptr->publ.conn_unacked -= ack; 987 buf = port_build_proto_msg(port_peerport(p_ptr), 988 port_peernode(p_ptr), 989 ref, 990 tipc_own_addr, 991 CONN_MANAGER, 992 CONN_ACK, 993 TIPC_OK, 994 port_out_seqno(p_ptr), 995 ack); 996 } 997 tipc_port_unlock(p_ptr); 998 tipc_net_route_msg(buf); 999} 1000 1001/* 1002 * tipc_createport(): user level call. Will add port to 1003 * registry if non-zero user_ref. 1004 */ 1005 1006int tipc_createport(u32 user_ref, 1007 void *usr_handle, 1008 unsigned int importance, 1009 tipc_msg_err_event error_cb, 1010 tipc_named_msg_err_event named_error_cb, 1011 tipc_conn_shutdown_event conn_error_cb, 1012 tipc_msg_event msg_cb, 1013 tipc_named_msg_event named_msg_cb, 1014 tipc_conn_msg_event conn_msg_cb, 1015 tipc_continue_event continue_event_cb,/* May be zero */ 1016 u32 *portref) 1017{ 1018 struct user_port *up_ptr; 1019 struct port *p_ptr; 1020 u32 ref; 1021 1022 up_ptr = kmalloc(sizeof(*up_ptr), GFP_ATOMIC); 1023 if (!up_ptr) { 1024 warn("Port creation failed, no memory\n"); 1025 return -ENOMEM; 1026 } 1027 ref = tipc_createport_raw(NULL, port_dispatcher, port_wakeup, importance); 1028 p_ptr = tipc_port_lock(ref); 1029 if (!p_ptr) { 1030 kfree(up_ptr); 1031 return -ENOMEM; 1032 } 1033 1034 p_ptr->user_port = up_ptr; 1035 up_ptr->user_ref = user_ref; 1036 up_ptr->usr_handle = usr_handle; 1037 up_ptr->ref = p_ptr->publ.ref; 1038 up_ptr->err_cb = error_cb; 1039 up_ptr->named_err_cb = named_error_cb; 1040 up_ptr->conn_err_cb = conn_error_cb; 1041 up_ptr->msg_cb = msg_cb; 1042 up_ptr->named_msg_cb = named_msg_cb; 1043 up_ptr->conn_msg_cb = conn_msg_cb; 1044 up_ptr->continue_event_cb = continue_event_cb; 1045 INIT_LIST_HEAD(&up_ptr->uport_list); 1046 tipc_reg_add_port(up_ptr); 1047 *portref = p_ptr->publ.ref; 1048 dbg(" tipc_createport: %x with ref %u\n", p_ptr, p_ptr->publ.ref); 1049 tipc_port_unlock(p_ptr); 1050 return TIPC_OK; 1051} 1052 1053int tipc_ownidentity(u32 ref, struct tipc_portid *id) 1054{ 1055 id->ref = ref; 1056 id->node = tipc_own_addr; 1057 return TIPC_OK; 1058} 1059 1060int tipc_portimportance(u32 ref, unsigned int *importance) 1061{ 1062 struct port *p_ptr; 1063 1064 p_ptr = tipc_port_lock(ref); 1065 if (!p_ptr) 1066 return -EINVAL; 1067 *importance = (unsigned int)msg_importance(&p_ptr->publ.phdr); 1068 spin_unlock_bh(p_ptr->publ.lock); 1069 return TIPC_OK; 1070} 1071 1072int tipc_set_portimportance(u32 ref, unsigned int imp) 1073{ 1074 struct port *p_ptr; 1075 1076 if (imp > TIPC_CRITICAL_IMPORTANCE) 1077 return -EINVAL; 1078 1079 p_ptr = tipc_port_lock(ref); 1080 if (!p_ptr) 1081 return -EINVAL; 1082 msg_set_importance(&p_ptr->publ.phdr, (u32)imp); 1083 spin_unlock_bh(p_ptr->publ.lock); 1084 return TIPC_OK; 1085} 1086 1087 1088int tipc_publish(u32 ref, unsigned int scope, struct tipc_name_seq const *seq) 1089{ 1090 struct port *p_ptr; 1091 struct publication *publ; 1092 u32 key; 1093 int res = -EINVAL; 1094 1095 p_ptr = tipc_port_lock(ref); 1096 if (!p_ptr) 1097 return -EINVAL; 1098 1099 dbg("tipc_publ %u, p_ptr = %x, conn = %x, scope = %x, " 1100 "lower = %u, upper = %u\n", 1101 ref, p_ptr, p_ptr->publ.connected, scope, seq->lower, seq->upper); 1102 if (p_ptr->publ.connected) 1103 goto exit; 1104 if (seq->lower > seq->upper) 1105 goto exit; 1106 if ((scope < TIPC_ZONE_SCOPE) || (scope > TIPC_NODE_SCOPE)) 1107 goto exit; 1108 key = ref + p_ptr->pub_count + 1; 1109 if (key == ref) { 1110 res = -EADDRINUSE; 1111 goto exit; 1112 } 1113 publ = tipc_nametbl_publish(seq->type, seq->lower, seq->upper, 1114 scope, p_ptr->publ.ref, key); 1115 if (publ) { 1116 list_add(&publ->pport_list, &p_ptr->publications); 1117 p_ptr->pub_count++; 1118 p_ptr->publ.published = 1; 1119 res = TIPC_OK; 1120 } 1121exit: 1122 tipc_port_unlock(p_ptr); 1123 return res; 1124} 1125 1126int tipc_withdraw(u32 ref, unsigned int scope, struct tipc_name_seq const *seq) 1127{ 1128 struct port *p_ptr; 1129 struct publication *publ; 1130 struct publication *tpubl; 1131 int res = -EINVAL; 1132 1133 p_ptr = tipc_port_lock(ref); 1134 if (!p_ptr) 1135 return -EINVAL; 1136 if (!seq) { 1137 list_for_each_entry_safe(publ, tpubl, 1138 &p_ptr->publications, pport_list) { 1139 tipc_nametbl_withdraw(publ->type, publ->lower, 1140 publ->ref, publ->key); 1141 } 1142 res = TIPC_OK; 1143 } else { 1144 list_for_each_entry_safe(publ, tpubl, 1145 &p_ptr->publications, pport_list) { 1146 if (publ->scope != scope) 1147 continue; 1148 if (publ->type != seq->type) 1149 continue; 1150 if (publ->lower != seq->lower) 1151 continue; 1152 if (publ->upper != seq->upper) 1153 break; 1154 tipc_nametbl_withdraw(publ->type, publ->lower, 1155 publ->ref, publ->key); 1156 res = TIPC_OK; 1157 break; 1158 } 1159 } 1160 if (list_empty(&p_ptr->publications)) 1161 p_ptr->publ.published = 0; 1162 tipc_port_unlock(p_ptr); 1163 return res; 1164} 1165 1166int tipc_connect2port(u32 ref, struct tipc_portid const *peer) 1167{ 1168 struct port *p_ptr; 1169 struct tipc_msg *msg; 1170 int res = -EINVAL; 1171 1172 p_ptr = tipc_port_lock(ref); 1173 if (!p_ptr) 1174 return -EINVAL; 1175 if (p_ptr->publ.published || p_ptr->publ.connected) 1176 goto exit; 1177 if (!peer->ref) 1178 goto exit; 1179 1180 msg = &p_ptr->publ.phdr; 1181 msg_set_destnode(msg, peer->node); 1182 msg_set_destport(msg, peer->ref); 1183 msg_set_orignode(msg, tipc_own_addr); 1184 msg_set_origport(msg, p_ptr->publ.ref); 1185 msg_set_transp_seqno(msg, 42); 1186 msg_set_type(msg, TIPC_CONN_MSG); 1187 if (!may_route(peer->node)) 1188 msg_set_hdr_sz(msg, SHORT_H_SIZE); 1189 else 1190 msg_set_hdr_sz(msg, LONG_H_SIZE); 1191 1192 p_ptr->probing_interval = PROBING_INTERVAL; 1193 p_ptr->probing_state = CONFIRMED; 1194 p_ptr->publ.connected = 1; 1195 k_start_timer(&p_ptr->timer, p_ptr->probing_interval); 1196 1197 tipc_nodesub_subscribe(&p_ptr->subscription,peer->node, 1198 (void *)(unsigned long)ref, 1199 (net_ev_handler)port_handle_node_down); 1200 res = TIPC_OK; 1201exit: 1202 tipc_port_unlock(p_ptr); 1203 p_ptr->max_pkt = tipc_link_get_max_pkt(peer->node, ref); 1204 return res; 1205} 1206 1207/* 1208 * tipc_disconnect(): Disconnect port form peer. 1209 * This is a node local operation. 1210 */ 1211 1212int tipc_disconnect(u32 ref) 1213{ 1214 struct port *p_ptr; 1215 int res = -ENOTCONN; 1216 1217 p_ptr = tipc_port_lock(ref); 1218 if (!p_ptr) 1219 return -EINVAL; 1220 if (p_ptr->publ.connected) { 1221 p_ptr->publ.connected = 0; 1222 /* let timer expire on it's own to avoid deadlock! */ 1223 tipc_nodesub_unsubscribe(&p_ptr->subscription); 1224 res = TIPC_OK; 1225 } 1226 tipc_port_unlock(p_ptr); 1227 return res; 1228} 1229 1230/* 1231 * tipc_shutdown(): Send a SHUTDOWN msg to peer and disconnect 1232 */ 1233int tipc_shutdown(u32 ref) 1234{ 1235 struct port *p_ptr; 1236 struct sk_buff *buf = NULL; 1237 1238 p_ptr = tipc_port_lock(ref); 1239 if (!p_ptr) 1240 return -EINVAL; 1241 1242 if (p_ptr->publ.connected) { 1243 u32 imp = msg_importance(&p_ptr->publ.phdr); 1244 if (imp < TIPC_CRITICAL_IMPORTANCE) 1245 imp++; 1246 buf = port_build_proto_msg(port_peerport(p_ptr), 1247 port_peernode(p_ptr), 1248 ref, 1249 tipc_own_addr, 1250 imp, 1251 TIPC_CONN_MSG, 1252 TIPC_CONN_SHUTDOWN, 1253 port_out_seqno(p_ptr), 1254 0); 1255 } 1256 tipc_port_unlock(p_ptr); 1257 tipc_net_route_msg(buf); 1258 return tipc_disconnect(ref); 1259} 1260 1261int tipc_isconnected(u32 ref, int *isconnected) 1262{ 1263 struct port *p_ptr; 1264 1265 p_ptr = tipc_port_lock(ref); 1266 if (!p_ptr) 1267 return -EINVAL; 1268 *isconnected = p_ptr->publ.connected; 1269 tipc_port_unlock(p_ptr); 1270 return TIPC_OK; 1271} 1272 1273int tipc_peer(u32 ref, struct tipc_portid *peer) 1274{ 1275 struct port *p_ptr; 1276 int res; 1277 1278 p_ptr = tipc_port_lock(ref); 1279 if (!p_ptr) 1280 return -EINVAL; 1281 if (p_ptr->publ.connected) { 1282 peer->ref = port_peerport(p_ptr); 1283 peer->node = port_peernode(p_ptr); 1284 res = TIPC_OK; 1285 } else 1286 res = -ENOTCONN; 1287 tipc_port_unlock(p_ptr); 1288 return res; 1289} 1290 1291int tipc_ref_valid(u32 ref) 1292{ 1293 /* Works irrespective of type */ 1294 return !!tipc_ref_deref(ref); 1295} 1296 1297 1298/* 1299 * tipc_port_recv_sections(): Concatenate and deliver sectioned 1300 * message for this node. 1301 */ 1302 1303int tipc_port_recv_sections(struct port *sender, unsigned int num_sect, 1304 struct iovec const *msg_sect) 1305{ 1306 struct sk_buff *buf; 1307 int res; 1308 1309 res = msg_build(&sender->publ.phdr, msg_sect, num_sect, 1310 MAX_MSG_SIZE, !sender->user_port, &buf); 1311 if (likely(buf)) 1312 tipc_port_recv_msg(buf); 1313 return res; 1314} 1315 1316/** 1317 * tipc_send - send message sections on connection 1318 */ 1319 1320int tipc_send(u32 ref, unsigned int num_sect, struct iovec const *msg_sect) 1321{ 1322 struct port *p_ptr; 1323 u32 destnode; 1324 int res; 1325 1326 p_ptr = tipc_port_deref(ref); 1327 if (!p_ptr || !p_ptr->publ.connected) 1328 return -EINVAL; 1329 1330 p_ptr->publ.congested = 1; 1331 if (!tipc_port_congested(p_ptr)) { 1332 destnode = port_peernode(p_ptr); 1333 if (likely(destnode != tipc_own_addr)) 1334 res = tipc_link_send_sections_fast(p_ptr, msg_sect, num_sect, 1335 destnode); 1336 else 1337 res = tipc_port_recv_sections(p_ptr, num_sect, msg_sect); 1338 1339 if (likely(res != -ELINKCONG)) { 1340 port_incr_out_seqno(p_ptr); 1341 p_ptr->publ.congested = 0; 1342 p_ptr->sent++; 1343 return res; 1344 } 1345 } 1346 if (port_unreliable(p_ptr)) { 1347 p_ptr->publ.congested = 0; 1348 /* Just calculate msg length and return */ 1349 return msg_calc_data_size(msg_sect, num_sect); 1350 } 1351 return -ELINKCONG; 1352} 1353 1354/** 1355 * tipc_send_buf - send message buffer on connection 1356 */ 1357 1358int tipc_send_buf(u32 ref, struct sk_buff *buf, unsigned int dsz) 1359{ 1360 struct port *p_ptr; 1361 struct tipc_msg *msg; 1362 u32 destnode; 1363 u32 hsz; 1364 u32 sz; 1365 u32 res; 1366 1367 p_ptr = tipc_port_deref(ref); 1368 if (!p_ptr || !p_ptr->publ.connected) 1369 return -EINVAL; 1370 1371 msg = &p_ptr->publ.phdr; 1372 hsz = msg_hdr_sz(msg); 1373 sz = hsz + dsz; 1374 msg_set_size(msg, sz); 1375 if (skb_cow(buf, hsz)) 1376 return -ENOMEM; 1377 1378 skb_push(buf, hsz); 1379 skb_copy_to_linear_data(buf, msg, hsz); 1380 destnode = msg_destnode(msg); 1381 p_ptr->publ.congested = 1; 1382 if (!tipc_port_congested(p_ptr)) { 1383 if (likely(destnode != tipc_own_addr)) 1384 res = tipc_send_buf_fast(buf, destnode); 1385 else { 1386 tipc_port_recv_msg(buf); 1387 res = sz; 1388 } 1389 if (likely(res != -ELINKCONG)) { 1390 port_incr_out_seqno(p_ptr); 1391 p_ptr->sent++; 1392 p_ptr->publ.congested = 0; 1393 return res; 1394 } 1395 } 1396 if (port_unreliable(p_ptr)) { 1397 p_ptr->publ.congested = 0; 1398 return dsz; 1399 } 1400 return -ELINKCONG; 1401} 1402 1403/** 1404 * tipc_forward2name - forward message sections to port name 1405 */ 1406 1407int tipc_forward2name(u32 ref, 1408 struct tipc_name const *name, 1409 u32 domain, 1410 u32 num_sect, 1411 struct iovec const *msg_sect, 1412 struct tipc_portid const *orig, 1413 unsigned int importance) 1414{ 1415 struct port *p_ptr; 1416 struct tipc_msg *msg; 1417 u32 destnode = domain; 1418 u32 destport = 0; 1419 int res; 1420 1421 p_ptr = tipc_port_deref(ref); 1422 if (!p_ptr || p_ptr->publ.connected) 1423 return -EINVAL; 1424 1425 msg = &p_ptr->publ.phdr; 1426 msg_set_type(msg, TIPC_NAMED_MSG); 1427 msg_set_orignode(msg, orig->node); 1428 msg_set_origport(msg, orig->ref); 1429 msg_set_hdr_sz(msg, LONG_H_SIZE); 1430 msg_set_nametype(msg, name->type); 1431 msg_set_nameinst(msg, name->instance); 1432 msg_set_lookup_scope(msg, addr_scope(domain)); 1433 if (importance <= TIPC_CRITICAL_IMPORTANCE) 1434 msg_set_importance(msg,importance); 1435 destport = tipc_nametbl_translate(name->type, name->instance, &destnode); 1436 msg_set_destnode(msg, destnode); 1437 msg_set_destport(msg, destport); 1438 1439 if (likely(destport || destnode)) { 1440 p_ptr->sent++; 1441 if (likely(destnode == tipc_own_addr)) 1442 return tipc_port_recv_sections(p_ptr, num_sect, msg_sect); 1443 res = tipc_link_send_sections_fast(p_ptr, msg_sect, num_sect, 1444 destnode); 1445 if (likely(res != -ELINKCONG)) 1446 return res; 1447 if (port_unreliable(p_ptr)) { 1448 /* Just calculate msg length and return */ 1449 return msg_calc_data_size(msg_sect, num_sect); 1450 } 1451 return -ELINKCONG; 1452 } 1453 return tipc_port_reject_sections(p_ptr, msg, msg_sect, num_sect, 1454 TIPC_ERR_NO_NAME); 1455} 1456 1457/** 1458 * tipc_send2name - send message sections to port name 1459 */ 1460 1461int tipc_send2name(u32 ref, 1462 struct tipc_name const *name, 1463 unsigned int domain, 1464 unsigned int num_sect, 1465 struct iovec const *msg_sect) 1466{ 1467 struct tipc_portid orig; 1468 1469 orig.ref = ref; 1470 orig.node = tipc_own_addr; 1471 return tipc_forward2name(ref, name, domain, num_sect, msg_sect, &orig, 1472 TIPC_PORT_IMPORTANCE); 1473} 1474 1475/** 1476 * tipc_forward_buf2name - forward message buffer to port name 1477 */ 1478 1479int tipc_forward_buf2name(u32 ref, 1480 struct tipc_name const *name, 1481 u32 domain, 1482 struct sk_buff *buf, 1483 unsigned int dsz, 1484 struct tipc_portid const *orig, 1485 unsigned int importance) 1486{ 1487 struct port *p_ptr; 1488 struct tipc_msg *msg; 1489 u32 destnode = domain; 1490 u32 destport = 0; 1491 int res; 1492 1493 p_ptr = (struct port *)tipc_ref_deref(ref); 1494 if (!p_ptr || p_ptr->publ.connected) 1495 return -EINVAL; 1496 1497 msg = &p_ptr->publ.phdr; 1498 if (importance <= TIPC_CRITICAL_IMPORTANCE) 1499 msg_set_importance(msg, importance); 1500 msg_set_type(msg, TIPC_NAMED_MSG); 1501 msg_set_orignode(msg, orig->node); 1502 msg_set_origport(msg, orig->ref); 1503 msg_set_nametype(msg, name->type); 1504 msg_set_nameinst(msg, name->instance); 1505 msg_set_lookup_scope(msg, addr_scope(domain)); 1506 msg_set_hdr_sz(msg, LONG_H_SIZE); 1507 msg_set_size(msg, LONG_H_SIZE + dsz); 1508 destport = tipc_nametbl_translate(name->type, name->instance, &destnode); 1509 msg_set_destnode(msg, destnode); 1510 msg_set_destport(msg, destport); 1511 msg_dbg(msg, "forw2name ==> "); 1512 if (skb_cow(buf, LONG_H_SIZE)) 1513 return -ENOMEM; 1514 skb_push(buf, LONG_H_SIZE); 1515 skb_copy_to_linear_data(buf, msg, LONG_H_SIZE); 1516 msg_dbg(buf_msg(buf),"PREP:"); 1517 if (likely(destport || destnode)) { 1518 p_ptr->sent++; 1519 if (destnode == tipc_own_addr) 1520 return tipc_port_recv_msg(buf); 1521 res = tipc_send_buf_fast(buf, destnode); 1522 if (likely(res != -ELINKCONG)) 1523 return res; 1524 if (port_unreliable(p_ptr)) 1525 return dsz; 1526 return -ELINKCONG; 1527 } 1528 return tipc_reject_msg(buf, TIPC_ERR_NO_NAME); 1529} 1530 1531/** 1532 * tipc_send_buf2name - send message buffer to port name 1533 */ 1534 1535int tipc_send_buf2name(u32 ref, 1536 struct tipc_name const *dest, 1537 u32 domain, 1538 struct sk_buff *buf, 1539 unsigned int dsz) 1540{ 1541 struct tipc_portid orig; 1542 1543 orig.ref = ref; 1544 orig.node = tipc_own_addr; 1545 return tipc_forward_buf2name(ref, dest, domain, buf, dsz, &orig, 1546 TIPC_PORT_IMPORTANCE); 1547} 1548 1549/** 1550 * tipc_forward2port - forward message sections to port identity 1551 */ 1552 1553int tipc_forward2port(u32 ref, 1554 struct tipc_portid const *dest, 1555 unsigned int num_sect, 1556 struct iovec const *msg_sect, 1557 struct tipc_portid const *orig, 1558 unsigned int importance) 1559{ 1560 struct port *p_ptr; 1561 struct tipc_msg *msg; 1562 int res; 1563 1564 p_ptr = tipc_port_deref(ref); 1565 if (!p_ptr || p_ptr->publ.connected) 1566 return -EINVAL; 1567 1568 msg = &p_ptr->publ.phdr; 1569 msg_set_type(msg, TIPC_DIRECT_MSG); 1570 msg_set_orignode(msg, orig->node); 1571 msg_set_origport(msg, orig->ref); 1572 msg_set_destnode(msg, dest->node); 1573 msg_set_destport(msg, dest->ref); 1574 msg_set_hdr_sz(msg, DIR_MSG_H_SIZE); 1575 if (importance <= TIPC_CRITICAL_IMPORTANCE) 1576 msg_set_importance(msg, importance); 1577 p_ptr->sent++; 1578 if (dest->node == tipc_own_addr) 1579 return tipc_port_recv_sections(p_ptr, num_sect, msg_sect); 1580 res = tipc_link_send_sections_fast(p_ptr, msg_sect, num_sect, dest->node); 1581 if (likely(res != -ELINKCONG)) 1582 return res; 1583 if (port_unreliable(p_ptr)) { 1584 /* Just calculate msg length and return */ 1585 return msg_calc_data_size(msg_sect, num_sect); 1586 } 1587 return -ELINKCONG; 1588} 1589 1590/** 1591 * tipc_send2port - send message sections to port identity 1592 */ 1593 1594int tipc_send2port(u32 ref, 1595 struct tipc_portid const *dest, 1596 unsigned int num_sect, 1597 struct iovec const *msg_sect) 1598{ 1599 struct tipc_portid orig; 1600 1601 orig.ref = ref; 1602 orig.node = tipc_own_addr; 1603 return tipc_forward2port(ref, dest, num_sect, msg_sect, &orig, 1604 TIPC_PORT_IMPORTANCE); 1605} 1606 1607/** 1608 * tipc_forward_buf2port - forward message buffer to port identity 1609 */ 1610int tipc_forward_buf2port(u32 ref, 1611 struct tipc_portid const *dest, 1612 struct sk_buff *buf, 1613 unsigned int dsz, 1614 struct tipc_portid const *orig, 1615 unsigned int importance) 1616{ 1617 struct port *p_ptr; 1618 struct tipc_msg *msg; 1619 int res; 1620 1621 p_ptr = (struct port *)tipc_ref_deref(ref); 1622 if (!p_ptr || p_ptr->publ.connected) 1623 return -EINVAL; 1624 1625 msg = &p_ptr->publ.phdr; 1626 msg_set_type(msg, TIPC_DIRECT_MSG); 1627 msg_set_orignode(msg, orig->node); 1628 msg_set_origport(msg, orig->ref); 1629 msg_set_destnode(msg, dest->node); 1630 msg_set_destport(msg, dest->ref); 1631 msg_set_hdr_sz(msg, DIR_MSG_H_SIZE); 1632 if (importance <= TIPC_CRITICAL_IMPORTANCE) 1633 msg_set_importance(msg, importance); 1634 msg_set_size(msg, DIR_MSG_H_SIZE + dsz); 1635 if (skb_cow(buf, DIR_MSG_H_SIZE)) 1636 return -ENOMEM; 1637 1638 skb_push(buf, DIR_MSG_H_SIZE); 1639 skb_copy_to_linear_data(buf, msg, DIR_MSG_H_SIZE); 1640 msg_dbg(msg, "buf2port: "); 1641 p_ptr->sent++; 1642 if (dest->node == tipc_own_addr) 1643 return tipc_port_recv_msg(buf); 1644 res = tipc_send_buf_fast(buf, dest->node); 1645 if (likely(res != -ELINKCONG)) 1646 return res; 1647 if (port_unreliable(p_ptr)) 1648 return dsz; 1649 return -ELINKCONG; 1650} 1651 1652/** 1653 * tipc_send_buf2port - send message buffer to port identity 1654 */ 1655 1656int tipc_send_buf2port(u32 ref, 1657 struct tipc_portid const *dest, 1658 struct sk_buff *buf, 1659 unsigned int dsz) 1660{ 1661 struct tipc_portid orig; 1662 1663 orig.ref = ref; 1664 orig.node = tipc_own_addr; 1665 return tipc_forward_buf2port(ref, dest, buf, dsz, &orig, 1666 TIPC_PORT_IMPORTANCE); 1667} 1668