1/* 2 * net/tipc/port.c: TIPC port code 3 * 4 * Copyright (c) 1992-2007, Ericsson AB 5 * Copyright (c) 2004-2008, Wind River Systems 6 * All rights reserved. 7 * 8 * Redistribution and use in source and binary forms, with or without 9 * modification, are permitted provided that the following conditions are met: 10 * 11 * 1. Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in the 15 * documentation and/or other materials provided with the distribution. 16 * 3. Neither the names of the copyright holders nor the names of its 17 * contributors may be used to endorse or promote products derived from 18 * this software without specific prior written permission. 19 * 20 * Alternatively, this software may be distributed under the terms of the 21 * GNU General Public License ("GPL") version 2 as published by the Free 22 * Software Foundation. 23 * 24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 25 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 26 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 27 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 28 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 29 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 30 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 31 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 32 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 33 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 34 * POSSIBILITY OF SUCH DAMAGE. 35 */ 36 37#include "core.h" 38#include "config.h" 39#include "dbg.h" 40#include "port.h" 41#include "addr.h" 42#include "link.h" 43#include "node.h" 44#include "name_table.h" 45#include "user_reg.h" 46#include "msg.h" 47#include "bcast.h" 48 49/* Connection management: */ 50#define PROBING_INTERVAL 3600000 /* [ms] => 1 h */ 51#define CONFIRMED 0 52#define PROBING 1 53 54#define MAX_REJECT_SIZE 1024 55 56static struct sk_buff *msg_queue_head = NULL; 57static struct sk_buff *msg_queue_tail = NULL; 58 59DEFINE_SPINLOCK(tipc_port_list_lock); 60static DEFINE_SPINLOCK(queue_lock); 61 62static LIST_HEAD(ports); 63static void port_handle_node_down(unsigned long ref); 64static struct sk_buff* port_build_self_abort_msg(struct port *,u32 err); 65static struct sk_buff* port_build_peer_abort_msg(struct port *,u32 err); 66static void port_timeout(unsigned long ref); 67 68 69static u32 port_peernode(struct port *p_ptr) 70{ 71 return msg_destnode(&p_ptr->publ.phdr); 72} 73 74static u32 port_peerport(struct port *p_ptr) 75{ 76 return msg_destport(&p_ptr->publ.phdr); 77} 78 79static u32 port_out_seqno(struct port *p_ptr) 80{ 81 return msg_transp_seqno(&p_ptr->publ.phdr); 82} 83 84static void port_incr_out_seqno(struct port *p_ptr) 85{ 86 struct tipc_msg *m = &p_ptr->publ.phdr; 87 88 if (likely(!msg_routed(m))) 89 return; 90 msg_set_transp_seqno(m, (msg_transp_seqno(m) + 1)); 91} 92 93/** 94 * tipc_multicast - send a multicast message to local and remote destinations 95 */ 96 97int tipc_multicast(u32 ref, struct tipc_name_seq const *seq, u32 domain, 98 u32 num_sect, struct iovec const *msg_sect) 99{ 100 struct tipc_msg *hdr; 101 struct sk_buff *buf; 102 struct sk_buff *ibuf = NULL; 103 struct port_list dports = {0, NULL, }; 104 struct port *oport = tipc_port_deref(ref); 105 int ext_targets; 106 int res; 107 108 if (unlikely(!oport)) 109 return -EINVAL; 110 111 /* Create multicast message */ 112 113 hdr = &oport->publ.phdr; 114 msg_set_type(hdr, TIPC_MCAST_MSG); 115 msg_set_nametype(hdr, seq->type); 116 msg_set_namelower(hdr, seq->lower); 117 msg_set_nameupper(hdr, seq->upper); 118 msg_set_hdr_sz(hdr, MCAST_H_SIZE); 119 res = tipc_msg_build(hdr, msg_sect, num_sect, MAX_MSG_SIZE, 120 !oport->user_port, &buf); 121 if (unlikely(!buf)) 122 return res; 123 124 /* Figure out where to send multicast message */ 125 126 ext_targets = tipc_nametbl_mc_translate(seq->type, seq->lower, seq->upper, 127 TIPC_NODE_SCOPE, &dports); 128 129 /* Send message to destinations (duplicate it only if necessary) */ 130 131 if (ext_targets) { 132 if (dports.count != 0) { 133 ibuf = skb_copy(buf, GFP_ATOMIC); 134 if (ibuf == NULL) { 135 tipc_port_list_free(&dports); 136 buf_discard(buf); 137 return -ENOMEM; 138 } 139 } 140 res = tipc_bclink_send_msg(buf); 141 if ((res < 0) && (dports.count != 0)) { 142 buf_discard(ibuf); 143 } 144 } else { 145 ibuf = buf; 146 } 147 148 if (res >= 0) { 149 if (ibuf) 150 tipc_port_recv_mcast(ibuf, &dports); 151 } else { 152 tipc_port_list_free(&dports); 153 } 154 return res; 155} 156 157/** 158 * tipc_port_recv_mcast - deliver multicast message to all destination ports 159 * 160 * If there is no port list, perform a lookup to create one 161 */ 162 163void tipc_port_recv_mcast(struct sk_buff *buf, struct port_list *dp) 164{ 165 struct tipc_msg* msg; 166 struct port_list dports = {0, NULL, }; 167 struct port_list *item = dp; 168 int cnt = 0; 169 170 msg = buf_msg(buf); 171 172 /* Create destination port list, if one wasn't supplied */ 173 174 if (dp == NULL) { 175 tipc_nametbl_mc_translate(msg_nametype(msg), 176 msg_namelower(msg), 177 msg_nameupper(msg), 178 TIPC_CLUSTER_SCOPE, 179 &dports); 180 item = dp = &dports; 181 } 182 183 /* Deliver a copy of message to each destination port */ 184 185 if (dp->count != 0) { 186 if (dp->count == 1) { 187 msg_set_destport(msg, dp->ports[0]); 188 tipc_port_recv_msg(buf); 189 tipc_port_list_free(dp); 190 return; 191 } 192 for (; cnt < dp->count; cnt++) { 193 int index = cnt % PLSIZE; 194 struct sk_buff *b = skb_clone(buf, GFP_ATOMIC); 195 196 if (b == NULL) { 197 warn("Unable to deliver multicast message(s)\n"); 198 msg_dbg(msg, "LOST:"); 199 goto exit; 200 } 201 if ((index == 0) && (cnt != 0)) { 202 item = item->next; 203 } 204 msg_set_destport(buf_msg(b),item->ports[index]); 205 tipc_port_recv_msg(b); 206 } 207 } 208exit: 209 buf_discard(buf); 210 tipc_port_list_free(dp); 211} 212 213/** 214 * tipc_createport_raw - create a generic TIPC port 215 * 216 * Returns pointer to (locked) TIPC port, or NULL if unable to create it 217 */ 218 219struct tipc_port *tipc_createport_raw(void *usr_handle, 220 u32 (*dispatcher)(struct tipc_port *, struct sk_buff *), 221 void (*wakeup)(struct tipc_port *), 222 const u32 importance) 223{ 224 struct port *p_ptr; 225 struct tipc_msg *msg; 226 u32 ref; 227 228 p_ptr = kzalloc(sizeof(*p_ptr), GFP_ATOMIC); 229 if (!p_ptr) { 230 warn("Port creation failed, no memory\n"); 231 return NULL; 232 } 233 ref = tipc_ref_acquire(p_ptr, &p_ptr->publ.lock); 234 if (!ref) { 235 warn("Port creation failed, reference table exhausted\n"); 236 kfree(p_ptr); 237 return NULL; 238 } 239 240 p_ptr->publ.usr_handle = usr_handle; 241 p_ptr->publ.max_pkt = MAX_PKT_DEFAULT; 242 p_ptr->publ.ref = ref; 243 msg = &p_ptr->publ.phdr; 244 tipc_msg_init(msg, importance, TIPC_NAMED_MSG, LONG_H_SIZE, 0); 245 msg_set_origport(msg, ref); 246 p_ptr->last_in_seqno = 41; 247 p_ptr->sent = 1; 248 INIT_LIST_HEAD(&p_ptr->wait_list); 249 INIT_LIST_HEAD(&p_ptr->subscription.nodesub_list); 250 p_ptr->dispatcher = dispatcher; 251 p_ptr->wakeup = wakeup; 252 p_ptr->user_port = NULL; 253 k_init_timer(&p_ptr->timer, (Handler)port_timeout, ref); 254 spin_lock_bh(&tipc_port_list_lock); 255 INIT_LIST_HEAD(&p_ptr->publications); 256 INIT_LIST_HEAD(&p_ptr->port_list); 257 list_add_tail(&p_ptr->port_list, &ports); 258 spin_unlock_bh(&tipc_port_list_lock); 259 return &(p_ptr->publ); 260} 261 262int tipc_deleteport(u32 ref) 263{ 264 struct port *p_ptr; 265 struct sk_buff *buf = NULL; 266 267 tipc_withdraw(ref, 0, NULL); 268 p_ptr = tipc_port_lock(ref); 269 if (!p_ptr) 270 return -EINVAL; 271 272 tipc_ref_discard(ref); 273 tipc_port_unlock(p_ptr); 274 275 k_cancel_timer(&p_ptr->timer); 276 if (p_ptr->publ.connected) { 277 buf = port_build_peer_abort_msg(p_ptr, TIPC_ERR_NO_PORT); 278 tipc_nodesub_unsubscribe(&p_ptr->subscription); 279 } 280 if (p_ptr->user_port) { 281 tipc_reg_remove_port(p_ptr->user_port); 282 kfree(p_ptr->user_port); 283 } 284 285 spin_lock_bh(&tipc_port_list_lock); 286 list_del(&p_ptr->port_list); 287 list_del(&p_ptr->wait_list); 288 spin_unlock_bh(&tipc_port_list_lock); 289 k_term_timer(&p_ptr->timer); 290 kfree(p_ptr); 291 dbg("Deleted port %u\n", ref); 292 tipc_net_route_msg(buf); 293 return 0; 294} 295 296/** 297 * tipc_get_port() - return port associated with 'ref' 298 * 299 * Note: Port is not locked. 300 */ 301 302struct tipc_port *tipc_get_port(const u32 ref) 303{ 304 return (struct tipc_port *)tipc_ref_deref(ref); 305} 306 307/** 308 * tipc_get_handle - return user handle associated to port 'ref' 309 */ 310 311void *tipc_get_handle(const u32 ref) 312{ 313 struct port *p_ptr; 314 void * handle; 315 316 p_ptr = tipc_port_lock(ref); 317 if (!p_ptr) 318 return NULL; 319 handle = p_ptr->publ.usr_handle; 320 tipc_port_unlock(p_ptr); 321 return handle; 322} 323 324static int port_unreliable(struct port *p_ptr) 325{ 326 return msg_src_droppable(&p_ptr->publ.phdr); 327} 328 329int tipc_portunreliable(u32 ref, unsigned int *isunreliable) 330{ 331 struct port *p_ptr; 332 333 p_ptr = tipc_port_lock(ref); 334 if (!p_ptr) 335 return -EINVAL; 336 *isunreliable = port_unreliable(p_ptr); 337 tipc_port_unlock(p_ptr); 338 return 0; 339} 340 341int tipc_set_portunreliable(u32 ref, unsigned int isunreliable) 342{ 343 struct port *p_ptr; 344 345 p_ptr = tipc_port_lock(ref); 346 if (!p_ptr) 347 return -EINVAL; 348 msg_set_src_droppable(&p_ptr->publ.phdr, (isunreliable != 0)); 349 tipc_port_unlock(p_ptr); 350 return 0; 351} 352 353static int port_unreturnable(struct port *p_ptr) 354{ 355 return msg_dest_droppable(&p_ptr->publ.phdr); 356} 357 358int tipc_portunreturnable(u32 ref, unsigned int *isunrejectable) 359{ 360 struct port *p_ptr; 361 362 p_ptr = tipc_port_lock(ref); 363 if (!p_ptr) 364 return -EINVAL; 365 *isunrejectable = port_unreturnable(p_ptr); 366 tipc_port_unlock(p_ptr); 367 return 0; 368} 369 370int tipc_set_portunreturnable(u32 ref, unsigned int isunrejectable) 371{ 372 struct port *p_ptr; 373 374 p_ptr = tipc_port_lock(ref); 375 if (!p_ptr) 376 return -EINVAL; 377 msg_set_dest_droppable(&p_ptr->publ.phdr, (isunrejectable != 0)); 378 tipc_port_unlock(p_ptr); 379 return 0; 380} 381 382/* 383 * port_build_proto_msg(): build a port level protocol 384 * or a connection abortion message. Called with 385 * tipc_port lock on. 386 */ 387static struct sk_buff *port_build_proto_msg(u32 destport, u32 destnode, 388 u32 origport, u32 orignode, 389 u32 usr, u32 type, u32 err, 390 u32 seqno, u32 ack) 391{ 392 struct sk_buff *buf; 393 struct tipc_msg *msg; 394 395 buf = buf_acquire(LONG_H_SIZE); 396 if (buf) { 397 msg = buf_msg(buf); 398 tipc_msg_init(msg, usr, type, LONG_H_SIZE, destnode); 399 msg_set_errcode(msg, err); 400 msg_set_destport(msg, destport); 401 msg_set_origport(msg, origport); 402 msg_set_orignode(msg, orignode); 403 msg_set_transp_seqno(msg, seqno); 404 msg_set_msgcnt(msg, ack); 405 msg_dbg(msg, "PORT>SEND>:"); 406 } 407 return buf; 408} 409 410int tipc_reject_msg(struct sk_buff *buf, u32 err) 411{ 412 struct tipc_msg *msg = buf_msg(buf); 413 struct sk_buff *rbuf; 414 struct tipc_msg *rmsg; 415 int hdr_sz; 416 u32 imp = msg_importance(msg); 417 u32 data_sz = msg_data_sz(msg); 418 419 if (data_sz > MAX_REJECT_SIZE) 420 data_sz = MAX_REJECT_SIZE; 421 if (msg_connected(msg) && (imp < TIPC_CRITICAL_IMPORTANCE)) 422 imp++; 423 msg_dbg(msg, "port->rej: "); 424 425 /* discard rejected message if it shouldn't be returned to sender */ 426 if (msg_errcode(msg) || msg_dest_droppable(msg)) { 427 buf_discard(buf); 428 return data_sz; 429 } 430 431 /* construct rejected message */ 432 if (msg_mcast(msg)) 433 hdr_sz = MCAST_H_SIZE; 434 else 435 hdr_sz = LONG_H_SIZE; 436 rbuf = buf_acquire(data_sz + hdr_sz); 437 if (rbuf == NULL) { 438 buf_discard(buf); 439 return data_sz; 440 } 441 rmsg = buf_msg(rbuf); 442 tipc_msg_init(rmsg, imp, msg_type(msg), hdr_sz, msg_orignode(msg)); 443 msg_set_errcode(rmsg, err); 444 msg_set_destport(rmsg, msg_origport(msg)); 445 msg_set_origport(rmsg, msg_destport(msg)); 446 if (msg_short(msg)) { 447 msg_set_orignode(rmsg, tipc_own_addr); 448 /* leave name type & instance as zeroes */ 449 } else { 450 msg_set_orignode(rmsg, msg_destnode(msg)); 451 msg_set_nametype(rmsg, msg_nametype(msg)); 452 msg_set_nameinst(rmsg, msg_nameinst(msg)); 453 } 454 msg_set_size(rmsg, data_sz + hdr_sz); 455 skb_copy_to_linear_data_offset(rbuf, hdr_sz, msg_data(msg), data_sz); 456 457 /* send self-abort message when rejecting on a connected port */ 458 if (msg_connected(msg)) { 459 struct sk_buff *abuf = NULL; 460 struct port *p_ptr = tipc_port_lock(msg_destport(msg)); 461 462 if (p_ptr) { 463 if (p_ptr->publ.connected) 464 abuf = port_build_self_abort_msg(p_ptr, err); 465 tipc_port_unlock(p_ptr); 466 } 467 tipc_net_route_msg(abuf); 468 } 469 470 /* send rejected message */ 471 buf_discard(buf); 472 tipc_net_route_msg(rbuf); 473 return data_sz; 474} 475 476int tipc_port_reject_sections(struct port *p_ptr, struct tipc_msg *hdr, 477 struct iovec const *msg_sect, u32 num_sect, 478 int err) 479{ 480 struct sk_buff *buf; 481 int res; 482 483 res = tipc_msg_build(hdr, msg_sect, num_sect, MAX_MSG_SIZE, 484 !p_ptr->user_port, &buf); 485 if (!buf) 486 return res; 487 488 return tipc_reject_msg(buf, err); 489} 490 491static void port_timeout(unsigned long ref) 492{ 493 struct port *p_ptr = tipc_port_lock(ref); 494 struct sk_buff *buf = NULL; 495 496 if (!p_ptr) 497 return; 498 499 if (!p_ptr->publ.connected) { 500 tipc_port_unlock(p_ptr); 501 return; 502 } 503 504 /* Last probe answered ? */ 505 if (p_ptr->probing_state == PROBING) { 506 buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_PORT); 507 } else { 508 buf = port_build_proto_msg(port_peerport(p_ptr), 509 port_peernode(p_ptr), 510 p_ptr->publ.ref, 511 tipc_own_addr, 512 CONN_MANAGER, 513 CONN_PROBE, 514 TIPC_OK, 515 port_out_seqno(p_ptr), 516 0); 517 port_incr_out_seqno(p_ptr); 518 p_ptr->probing_state = PROBING; 519 k_start_timer(&p_ptr->timer, p_ptr->probing_interval); 520 } 521 tipc_port_unlock(p_ptr); 522 tipc_net_route_msg(buf); 523} 524 525 526static void port_handle_node_down(unsigned long ref) 527{ 528 struct port *p_ptr = tipc_port_lock(ref); 529 struct sk_buff* buf = NULL; 530 531 if (!p_ptr) 532 return; 533 buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_NODE); 534 tipc_port_unlock(p_ptr); 535 tipc_net_route_msg(buf); 536} 537 538 539static struct sk_buff *port_build_self_abort_msg(struct port *p_ptr, u32 err) 540{ 541 u32 imp = msg_importance(&p_ptr->publ.phdr); 542 543 if (!p_ptr->publ.connected) 544 return NULL; 545 if (imp < TIPC_CRITICAL_IMPORTANCE) 546 imp++; 547 return port_build_proto_msg(p_ptr->publ.ref, 548 tipc_own_addr, 549 port_peerport(p_ptr), 550 port_peernode(p_ptr), 551 imp, 552 TIPC_CONN_MSG, 553 err, 554 p_ptr->last_in_seqno + 1, 555 0); 556} 557 558 559static struct sk_buff *port_build_peer_abort_msg(struct port *p_ptr, u32 err) 560{ 561 u32 imp = msg_importance(&p_ptr->publ.phdr); 562 563 if (!p_ptr->publ.connected) 564 return NULL; 565 if (imp < TIPC_CRITICAL_IMPORTANCE) 566 imp++; 567 return port_build_proto_msg(port_peerport(p_ptr), 568 port_peernode(p_ptr), 569 p_ptr->publ.ref, 570 tipc_own_addr, 571 imp, 572 TIPC_CONN_MSG, 573 err, 574 port_out_seqno(p_ptr), 575 0); 576} 577 578void tipc_port_recv_proto_msg(struct sk_buff *buf) 579{ 580 struct tipc_msg *msg = buf_msg(buf); 581 struct port *p_ptr = tipc_port_lock(msg_destport(msg)); 582 u32 err = TIPC_OK; 583 struct sk_buff *r_buf = NULL; 584 struct sk_buff *abort_buf = NULL; 585 586 msg_dbg(msg, "PORT<RECV<:"); 587 588 if (!p_ptr) { 589 err = TIPC_ERR_NO_PORT; 590 } else if (p_ptr->publ.connected) { 591 if (port_peernode(p_ptr) != msg_orignode(msg)) 592 err = TIPC_ERR_NO_PORT; 593 if (port_peerport(p_ptr) != msg_origport(msg)) 594 err = TIPC_ERR_NO_PORT; 595 if (!err && msg_routed(msg)) { 596 u32 seqno = msg_transp_seqno(msg); 597 u32 myno = ++p_ptr->last_in_seqno; 598 if (seqno != myno) { 599 err = TIPC_ERR_NO_PORT; 600 abort_buf = port_build_self_abort_msg(p_ptr, err); 601 } 602 } 603 if (msg_type(msg) == CONN_ACK) { 604 int wakeup = tipc_port_congested(p_ptr) && 605 p_ptr->publ.congested && 606 p_ptr->wakeup; 607 p_ptr->acked += msg_msgcnt(msg); 608 if (tipc_port_congested(p_ptr)) 609 goto exit; 610 p_ptr->publ.congested = 0; 611 if (!wakeup) 612 goto exit; 613 p_ptr->wakeup(&p_ptr->publ); 614 goto exit; 615 } 616 } else if (p_ptr->publ.published) { 617 err = TIPC_ERR_NO_PORT; 618 } 619 if (err) { 620 r_buf = port_build_proto_msg(msg_origport(msg), 621 msg_orignode(msg), 622 msg_destport(msg), 623 tipc_own_addr, 624 TIPC_HIGH_IMPORTANCE, 625 TIPC_CONN_MSG, 626 err, 627 0, 628 0); 629 goto exit; 630 } 631 632 /* All is fine */ 633 if (msg_type(msg) == CONN_PROBE) { 634 r_buf = port_build_proto_msg(msg_origport(msg), 635 msg_orignode(msg), 636 msg_destport(msg), 637 tipc_own_addr, 638 CONN_MANAGER, 639 CONN_PROBE_REPLY, 640 TIPC_OK, 641 port_out_seqno(p_ptr), 642 0); 643 } 644 p_ptr->probing_state = CONFIRMED; 645 port_incr_out_seqno(p_ptr); 646exit: 647 if (p_ptr) 648 tipc_port_unlock(p_ptr); 649 tipc_net_route_msg(r_buf); 650 tipc_net_route_msg(abort_buf); 651 buf_discard(buf); 652} 653 654static void port_print(struct port *p_ptr, struct print_buf *buf, int full_id) 655{ 656 struct publication *publ; 657 658 if (full_id) 659 tipc_printf(buf, "<%u.%u.%u:%u>:", 660 tipc_zone(tipc_own_addr), tipc_cluster(tipc_own_addr), 661 tipc_node(tipc_own_addr), p_ptr->publ.ref); 662 else 663 tipc_printf(buf, "%-10u:", p_ptr->publ.ref); 664 665 if (p_ptr->publ.connected) { 666 u32 dport = port_peerport(p_ptr); 667 u32 destnode = port_peernode(p_ptr); 668 669 tipc_printf(buf, " connected to <%u.%u.%u:%u>", 670 tipc_zone(destnode), tipc_cluster(destnode), 671 tipc_node(destnode), dport); 672 if (p_ptr->publ.conn_type != 0) 673 tipc_printf(buf, " via {%u,%u}", 674 p_ptr->publ.conn_type, 675 p_ptr->publ.conn_instance); 676 } 677 else if (p_ptr->publ.published) { 678 tipc_printf(buf, " bound to"); 679 list_for_each_entry(publ, &p_ptr->publications, pport_list) { 680 if (publ->lower == publ->upper) 681 tipc_printf(buf, " {%u,%u}", publ->type, 682 publ->lower); 683 else 684 tipc_printf(buf, " {%u,%u,%u}", publ->type, 685 publ->lower, publ->upper); 686 } 687 } 688 tipc_printf(buf, "\n"); 689} 690 691#define MAX_PORT_QUERY 32768 692 693struct sk_buff *tipc_port_get_ports(void) 694{ 695 struct sk_buff *buf; 696 struct tlv_desc *rep_tlv; 697 struct print_buf pb; 698 struct port *p_ptr; 699 int str_len; 700 701 buf = tipc_cfg_reply_alloc(TLV_SPACE(MAX_PORT_QUERY)); 702 if (!buf) 703 return NULL; 704 rep_tlv = (struct tlv_desc *)buf->data; 705 706 tipc_printbuf_init(&pb, TLV_DATA(rep_tlv), MAX_PORT_QUERY); 707 spin_lock_bh(&tipc_port_list_lock); 708 list_for_each_entry(p_ptr, &ports, port_list) { 709 spin_lock_bh(p_ptr->publ.lock); 710 port_print(p_ptr, &pb, 0); 711 spin_unlock_bh(p_ptr->publ.lock); 712 } 713 spin_unlock_bh(&tipc_port_list_lock); 714 str_len = tipc_printbuf_validate(&pb); 715 716 skb_put(buf, TLV_SPACE(str_len)); 717 TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len); 718 719 return buf; 720} 721 722 723void tipc_port_reinit(void) 724{ 725 struct port *p_ptr; 726 struct tipc_msg *msg; 727 728 spin_lock_bh(&tipc_port_list_lock); 729 list_for_each_entry(p_ptr, &ports, port_list) { 730 msg = &p_ptr->publ.phdr; 731 if (msg_orignode(msg) == tipc_own_addr) 732 break; 733 msg_set_prevnode(msg, tipc_own_addr); 734 msg_set_orignode(msg, tipc_own_addr); 735 } 736 spin_unlock_bh(&tipc_port_list_lock); 737} 738 739 740/* 741 * port_dispatcher_sigh(): Signal handler for messages destinated 742 * to the tipc_port interface. 743 */ 744 745static void port_dispatcher_sigh(void *dummy) 746{ 747 struct sk_buff *buf; 748 749 spin_lock_bh(&queue_lock); 750 buf = msg_queue_head; 751 msg_queue_head = NULL; 752 spin_unlock_bh(&queue_lock); 753 754 while (buf) { 755 struct port *p_ptr; 756 struct user_port *up_ptr; 757 struct tipc_portid orig; 758 struct tipc_name_seq dseq; 759 void *usr_handle; 760 int connected; 761 int published; 762 u32 message_type; 763 764 struct sk_buff *next = buf->next; 765 struct tipc_msg *msg = buf_msg(buf); 766 u32 dref = msg_destport(msg); 767 768 message_type = msg_type(msg); 769 if (message_type > TIPC_DIRECT_MSG) 770 goto reject; /* Unsupported message type */ 771 772 p_ptr = tipc_port_lock(dref); 773 if (!p_ptr) 774 goto reject; /* Port deleted while msg in queue */ 775 776 orig.ref = msg_origport(msg); 777 orig.node = msg_orignode(msg); 778 up_ptr = p_ptr->user_port; 779 usr_handle = up_ptr->usr_handle; 780 connected = p_ptr->publ.connected; 781 published = p_ptr->publ.published; 782 783 if (unlikely(msg_errcode(msg))) 784 goto err; 785 786 switch (message_type) { 787 788 case TIPC_CONN_MSG:{ 789 tipc_conn_msg_event cb = up_ptr->conn_msg_cb; 790 u32 peer_port = port_peerport(p_ptr); 791 u32 peer_node = port_peernode(p_ptr); 792 793 tipc_port_unlock(p_ptr); 794 if (unlikely(!cb)) 795 goto reject; 796 if (unlikely(!connected)) { 797 if (tipc_connect2port(dref, &orig)) 798 goto reject; 799 } else if ((msg_origport(msg) != peer_port) || 800 (msg_orignode(msg) != peer_node)) 801 goto reject; 802 if (unlikely(++p_ptr->publ.conn_unacked >= 803 TIPC_FLOW_CONTROL_WIN)) 804 tipc_acknowledge(dref, 805 p_ptr->publ.conn_unacked); 806 skb_pull(buf, msg_hdr_sz(msg)); 807 cb(usr_handle, dref, &buf, msg_data(msg), 808 msg_data_sz(msg)); 809 break; 810 } 811 case TIPC_DIRECT_MSG:{ 812 tipc_msg_event cb = up_ptr->msg_cb; 813 814 tipc_port_unlock(p_ptr); 815 if (unlikely(!cb || connected)) 816 goto reject; 817 skb_pull(buf, msg_hdr_sz(msg)); 818 cb(usr_handle, dref, &buf, msg_data(msg), 819 msg_data_sz(msg), msg_importance(msg), 820 &orig); 821 break; 822 } 823 case TIPC_MCAST_MSG: 824 case TIPC_NAMED_MSG:{ 825 tipc_named_msg_event cb = up_ptr->named_msg_cb; 826 827 tipc_port_unlock(p_ptr); 828 if (unlikely(!cb || connected || !published)) 829 goto reject; 830 dseq.type = msg_nametype(msg); 831 dseq.lower = msg_nameinst(msg); 832 dseq.upper = (message_type == TIPC_NAMED_MSG) 833 ? dseq.lower : msg_nameupper(msg); 834 skb_pull(buf, msg_hdr_sz(msg)); 835 cb(usr_handle, dref, &buf, msg_data(msg), 836 msg_data_sz(msg), msg_importance(msg), 837 &orig, &dseq); 838 break; 839 } 840 } 841 if (buf) 842 buf_discard(buf); 843 buf = next; 844 continue; 845err: 846 switch (message_type) { 847 848 case TIPC_CONN_MSG:{ 849 tipc_conn_shutdown_event cb = 850 up_ptr->conn_err_cb; 851 u32 peer_port = port_peerport(p_ptr); 852 u32 peer_node = port_peernode(p_ptr); 853 854 tipc_port_unlock(p_ptr); 855 if (!cb || !connected) 856 break; 857 if ((msg_origport(msg) != peer_port) || 858 (msg_orignode(msg) != peer_node)) 859 break; 860 tipc_disconnect(dref); 861 skb_pull(buf, msg_hdr_sz(msg)); 862 cb(usr_handle, dref, &buf, msg_data(msg), 863 msg_data_sz(msg), msg_errcode(msg)); 864 break; 865 } 866 case TIPC_DIRECT_MSG:{ 867 tipc_msg_err_event cb = up_ptr->err_cb; 868 869 tipc_port_unlock(p_ptr); 870 if (!cb || connected) 871 break; 872 skb_pull(buf, msg_hdr_sz(msg)); 873 cb(usr_handle, dref, &buf, msg_data(msg), 874 msg_data_sz(msg), msg_errcode(msg), &orig); 875 break; 876 } 877 case TIPC_MCAST_MSG: 878 case TIPC_NAMED_MSG:{ 879 tipc_named_msg_err_event cb = 880 up_ptr->named_err_cb; 881 882 tipc_port_unlock(p_ptr); 883 if (!cb || connected) 884 break; 885 dseq.type = msg_nametype(msg); 886 dseq.lower = msg_nameinst(msg); 887 dseq.upper = (message_type == TIPC_NAMED_MSG) 888 ? dseq.lower : msg_nameupper(msg); 889 skb_pull(buf, msg_hdr_sz(msg)); 890 cb(usr_handle, dref, &buf, msg_data(msg), 891 msg_data_sz(msg), msg_errcode(msg), &dseq); 892 break; 893 } 894 } 895 if (buf) 896 buf_discard(buf); 897 buf = next; 898 continue; 899reject: 900 tipc_reject_msg(buf, TIPC_ERR_NO_PORT); 901 buf = next; 902 } 903} 904 905/* 906 * port_dispatcher(): Dispatcher for messages destinated 907 * to the tipc_port interface. Called with port locked. 908 */ 909 910static u32 port_dispatcher(struct tipc_port *dummy, struct sk_buff *buf) 911{ 912 buf->next = NULL; 913 spin_lock_bh(&queue_lock); 914 if (msg_queue_head) { 915 msg_queue_tail->next = buf; 916 msg_queue_tail = buf; 917 } else { 918 msg_queue_tail = msg_queue_head = buf; 919 tipc_k_signal((Handler)port_dispatcher_sigh, 0); 920 } 921 spin_unlock_bh(&queue_lock); 922 return 0; 923} 924 925/* 926 * Wake up port after congestion: Called with port locked, 927 * 928 */ 929 930static void port_wakeup_sh(unsigned long ref) 931{ 932 struct port *p_ptr; 933 struct user_port *up_ptr; 934 tipc_continue_event cb = NULL; 935 void *uh = NULL; 936 937 p_ptr = tipc_port_lock(ref); 938 if (p_ptr) { 939 up_ptr = p_ptr->user_port; 940 if (up_ptr) { 941 cb = up_ptr->continue_event_cb; 942 uh = up_ptr->usr_handle; 943 } 944 tipc_port_unlock(p_ptr); 945 } 946 if (cb) 947 cb(uh, ref); 948} 949 950 951static void port_wakeup(struct tipc_port *p_ptr) 952{ 953 tipc_k_signal((Handler)port_wakeup_sh, p_ptr->ref); 954} 955 956void tipc_acknowledge(u32 ref, u32 ack) 957{ 958 struct port *p_ptr; 959 struct sk_buff *buf = NULL; 960 961 p_ptr = tipc_port_lock(ref); 962 if (!p_ptr) 963 return; 964 if (p_ptr->publ.connected) { 965 p_ptr->publ.conn_unacked -= ack; 966 buf = port_build_proto_msg(port_peerport(p_ptr), 967 port_peernode(p_ptr), 968 ref, 969 tipc_own_addr, 970 CONN_MANAGER, 971 CONN_ACK, 972 TIPC_OK, 973 port_out_seqno(p_ptr), 974 ack); 975 } 976 tipc_port_unlock(p_ptr); 977 tipc_net_route_msg(buf); 978} 979 980/* 981 * tipc_createport(): user level call. Will add port to 982 * registry if non-zero user_ref. 983 */ 984 985int tipc_createport(u32 user_ref, 986 void *usr_handle, 987 unsigned int importance, 988 tipc_msg_err_event error_cb, 989 tipc_named_msg_err_event named_error_cb, 990 tipc_conn_shutdown_event conn_error_cb, 991 tipc_msg_event msg_cb, 992 tipc_named_msg_event named_msg_cb, 993 tipc_conn_msg_event conn_msg_cb, 994 tipc_continue_event continue_event_cb,/* May be zero */ 995 u32 *portref) 996{ 997 struct user_port *up_ptr; 998 struct port *p_ptr; 999 1000 up_ptr = kmalloc(sizeof(*up_ptr), GFP_ATOMIC); 1001 if (!up_ptr) { 1002 warn("Port creation failed, no memory\n"); 1003 return -ENOMEM; 1004 } 1005 p_ptr = (struct port *)tipc_createport_raw(NULL, port_dispatcher, 1006 port_wakeup, importance); 1007 if (!p_ptr) { 1008 kfree(up_ptr); 1009 return -ENOMEM; 1010 } 1011 1012 p_ptr->user_port = up_ptr; 1013 up_ptr->user_ref = user_ref; 1014 up_ptr->usr_handle = usr_handle; 1015 up_ptr->ref = p_ptr->publ.ref; 1016 up_ptr->err_cb = error_cb; 1017 up_ptr->named_err_cb = named_error_cb; 1018 up_ptr->conn_err_cb = conn_error_cb; 1019 up_ptr->msg_cb = msg_cb; 1020 up_ptr->named_msg_cb = named_msg_cb; 1021 up_ptr->conn_msg_cb = conn_msg_cb; 1022 up_ptr->continue_event_cb = continue_event_cb; 1023 INIT_LIST_HEAD(&up_ptr->uport_list); 1024 tipc_reg_add_port(up_ptr); 1025 *portref = p_ptr->publ.ref; 1026 tipc_port_unlock(p_ptr); 1027 return 0; 1028} 1029 1030int tipc_ownidentity(u32 ref, struct tipc_portid *id) 1031{ 1032 id->ref = ref; 1033 id->node = tipc_own_addr; 1034 return 0; 1035} 1036 1037int tipc_portimportance(u32 ref, unsigned int *importance) 1038{ 1039 struct port *p_ptr; 1040 1041 p_ptr = tipc_port_lock(ref); 1042 if (!p_ptr) 1043 return -EINVAL; 1044 *importance = (unsigned int)msg_importance(&p_ptr->publ.phdr); 1045 tipc_port_unlock(p_ptr); 1046 return 0; 1047} 1048 1049int tipc_set_portimportance(u32 ref, unsigned int imp) 1050{ 1051 struct port *p_ptr; 1052 1053 if (imp > TIPC_CRITICAL_IMPORTANCE) 1054 return -EINVAL; 1055 1056 p_ptr = tipc_port_lock(ref); 1057 if (!p_ptr) 1058 return -EINVAL; 1059 msg_set_importance(&p_ptr->publ.phdr, (u32)imp); 1060 tipc_port_unlock(p_ptr); 1061 return 0; 1062} 1063 1064 1065int tipc_publish(u32 ref, unsigned int scope, struct tipc_name_seq const *seq) 1066{ 1067 struct port *p_ptr; 1068 struct publication *publ; 1069 u32 key; 1070 int res = -EINVAL; 1071 1072 p_ptr = tipc_port_lock(ref); 1073 if (!p_ptr) 1074 return -EINVAL; 1075 1076 dbg("tipc_publ %u, p_ptr = %x, conn = %x, scope = %x, " 1077 "lower = %u, upper = %u\n", 1078 ref, p_ptr, p_ptr->publ.connected, scope, seq->lower, seq->upper); 1079 if (p_ptr->publ.connected) 1080 goto exit; 1081 if (seq->lower > seq->upper) 1082 goto exit; 1083 if ((scope < TIPC_ZONE_SCOPE) || (scope > TIPC_NODE_SCOPE)) 1084 goto exit; 1085 key = ref + p_ptr->pub_count + 1; 1086 if (key == ref) { 1087 res = -EADDRINUSE; 1088 goto exit; 1089 } 1090 publ = tipc_nametbl_publish(seq->type, seq->lower, seq->upper, 1091 scope, p_ptr->publ.ref, key); 1092 if (publ) { 1093 list_add(&publ->pport_list, &p_ptr->publications); 1094 p_ptr->pub_count++; 1095 p_ptr->publ.published = 1; 1096 res = 0; 1097 } 1098exit: 1099 tipc_port_unlock(p_ptr); 1100 return res; 1101} 1102 1103int tipc_withdraw(u32 ref, unsigned int scope, struct tipc_name_seq const *seq) 1104{ 1105 struct port *p_ptr; 1106 struct publication *publ; 1107 struct publication *tpubl; 1108 int res = -EINVAL; 1109 1110 p_ptr = tipc_port_lock(ref); 1111 if (!p_ptr) 1112 return -EINVAL; 1113 if (!seq) { 1114 list_for_each_entry_safe(publ, tpubl, 1115 &p_ptr->publications, pport_list) { 1116 tipc_nametbl_withdraw(publ->type, publ->lower, 1117 publ->ref, publ->key); 1118 } 1119 res = 0; 1120 } else { 1121 list_for_each_entry_safe(publ, tpubl, 1122 &p_ptr->publications, pport_list) { 1123 if (publ->scope != scope) 1124 continue; 1125 if (publ->type != seq->type) 1126 continue; 1127 if (publ->lower != seq->lower) 1128 continue; 1129 if (publ->upper != seq->upper) 1130 break; 1131 tipc_nametbl_withdraw(publ->type, publ->lower, 1132 publ->ref, publ->key); 1133 res = 0; 1134 break; 1135 } 1136 } 1137 if (list_empty(&p_ptr->publications)) 1138 p_ptr->publ.published = 0; 1139 tipc_port_unlock(p_ptr); 1140 return res; 1141} 1142 1143int tipc_connect2port(u32 ref, struct tipc_portid const *peer) 1144{ 1145 struct port *p_ptr; 1146 struct tipc_msg *msg; 1147 int res = -EINVAL; 1148 1149 p_ptr = tipc_port_lock(ref); 1150 if (!p_ptr) 1151 return -EINVAL; 1152 if (p_ptr->publ.published || p_ptr->publ.connected) 1153 goto exit; 1154 if (!peer->ref) 1155 goto exit; 1156 1157 msg = &p_ptr->publ.phdr; 1158 msg_set_destnode(msg, peer->node); 1159 msg_set_destport(msg, peer->ref); 1160 msg_set_orignode(msg, tipc_own_addr); 1161 msg_set_origport(msg, p_ptr->publ.ref); 1162 msg_set_transp_seqno(msg, 42); 1163 msg_set_type(msg, TIPC_CONN_MSG); 1164 if (!may_route(peer->node)) 1165 msg_set_hdr_sz(msg, SHORT_H_SIZE); 1166 else 1167 msg_set_hdr_sz(msg, LONG_H_SIZE); 1168 1169 p_ptr->probing_interval = PROBING_INTERVAL; 1170 p_ptr->probing_state = CONFIRMED; 1171 p_ptr->publ.connected = 1; 1172 k_start_timer(&p_ptr->timer, p_ptr->probing_interval); 1173 1174 tipc_nodesub_subscribe(&p_ptr->subscription,peer->node, 1175 (void *)(unsigned long)ref, 1176 (net_ev_handler)port_handle_node_down); 1177 res = 0; 1178exit: 1179 tipc_port_unlock(p_ptr); 1180 p_ptr->publ.max_pkt = tipc_link_get_max_pkt(peer->node, ref); 1181 return res; 1182} 1183 1184/** 1185 * tipc_disconnect_port - disconnect port from peer 1186 * 1187 * Port must be locked. 1188 */ 1189 1190int tipc_disconnect_port(struct tipc_port *tp_ptr) 1191{ 1192 int res; 1193 1194 if (tp_ptr->connected) { 1195 tp_ptr->connected = 0; 1196 /* let timer expire on it's own to avoid deadlock! */ 1197 tipc_nodesub_unsubscribe( 1198 &((struct port *)tp_ptr)->subscription); 1199 res = 0; 1200 } else { 1201 res = -ENOTCONN; 1202 } 1203 return res; 1204} 1205 1206/* 1207 * tipc_disconnect(): Disconnect port form peer. 1208 * This is a node local operation. 1209 */ 1210 1211int tipc_disconnect(u32 ref) 1212{ 1213 struct port *p_ptr; 1214 int res; 1215 1216 p_ptr = tipc_port_lock(ref); 1217 if (!p_ptr) 1218 return -EINVAL; 1219 res = tipc_disconnect_port((struct tipc_port *)p_ptr); 1220 tipc_port_unlock(p_ptr); 1221 return res; 1222} 1223 1224/* 1225 * tipc_shutdown(): Send a SHUTDOWN msg to peer and disconnect 1226 */ 1227int tipc_shutdown(u32 ref) 1228{ 1229 struct port *p_ptr; 1230 struct sk_buff *buf = NULL; 1231 1232 p_ptr = tipc_port_lock(ref); 1233 if (!p_ptr) 1234 return -EINVAL; 1235 1236 if (p_ptr->publ.connected) { 1237 u32 imp = msg_importance(&p_ptr->publ.phdr); 1238 if (imp < TIPC_CRITICAL_IMPORTANCE) 1239 imp++; 1240 buf = port_build_proto_msg(port_peerport(p_ptr), 1241 port_peernode(p_ptr), 1242 ref, 1243 tipc_own_addr, 1244 imp, 1245 TIPC_CONN_MSG, 1246 TIPC_CONN_SHUTDOWN, 1247 port_out_seqno(p_ptr), 1248 0); 1249 } 1250 tipc_port_unlock(p_ptr); 1251 tipc_net_route_msg(buf); 1252 return tipc_disconnect(ref); 1253} 1254 1255int tipc_isconnected(u32 ref, int *isconnected) 1256{ 1257 struct port *p_ptr; 1258 1259 p_ptr = tipc_port_lock(ref); 1260 if (!p_ptr) 1261 return -EINVAL; 1262 *isconnected = p_ptr->publ.connected; 1263 tipc_port_unlock(p_ptr); 1264 return 0; 1265} 1266 1267int tipc_peer(u32 ref, struct tipc_portid *peer) 1268{ 1269 struct port *p_ptr; 1270 int res; 1271 1272 p_ptr = tipc_port_lock(ref); 1273 if (!p_ptr) 1274 return -EINVAL; 1275 if (p_ptr->publ.connected) { 1276 peer->ref = port_peerport(p_ptr); 1277 peer->node = port_peernode(p_ptr); 1278 res = 0; 1279 } else 1280 res = -ENOTCONN; 1281 tipc_port_unlock(p_ptr); 1282 return res; 1283} 1284 1285int tipc_ref_valid(u32 ref) 1286{ 1287 /* Works irrespective of type */ 1288 return !!tipc_ref_deref(ref); 1289} 1290 1291 1292/* 1293 * tipc_port_recv_sections(): Concatenate and deliver sectioned 1294 * message for this node. 1295 */ 1296 1297int tipc_port_recv_sections(struct port *sender, unsigned int num_sect, 1298 struct iovec const *msg_sect) 1299{ 1300 struct sk_buff *buf; 1301 int res; 1302 1303 res = tipc_msg_build(&sender->publ.phdr, msg_sect, num_sect, 1304 MAX_MSG_SIZE, !sender->user_port, &buf); 1305 if (likely(buf)) 1306 tipc_port_recv_msg(buf); 1307 return res; 1308} 1309 1310/** 1311 * tipc_send - send message sections on connection 1312 */ 1313 1314int tipc_send(u32 ref, unsigned int num_sect, struct iovec const *msg_sect) 1315{ 1316 struct port *p_ptr; 1317 u32 destnode; 1318 int res; 1319 1320 p_ptr = tipc_port_deref(ref); 1321 if (!p_ptr || !p_ptr->publ.connected) 1322 return -EINVAL; 1323 1324 p_ptr->publ.congested = 1; 1325 if (!tipc_port_congested(p_ptr)) { 1326 destnode = port_peernode(p_ptr); 1327 if (likely(destnode != tipc_own_addr)) 1328 res = tipc_link_send_sections_fast(p_ptr, msg_sect, num_sect, 1329 destnode); 1330 else 1331 res = tipc_port_recv_sections(p_ptr, num_sect, msg_sect); 1332 1333 if (likely(res != -ELINKCONG)) { 1334 port_incr_out_seqno(p_ptr); 1335 p_ptr->publ.congested = 0; 1336 p_ptr->sent++; 1337 return res; 1338 } 1339 } 1340 if (port_unreliable(p_ptr)) { 1341 p_ptr->publ.congested = 0; 1342 /* Just calculate msg length and return */ 1343 return tipc_msg_calc_data_size(msg_sect, num_sect); 1344 } 1345 return -ELINKCONG; 1346} 1347 1348/** 1349 * tipc_send_buf - send message buffer on connection 1350 */ 1351 1352int tipc_send_buf(u32 ref, struct sk_buff *buf, unsigned int dsz) 1353{ 1354 struct port *p_ptr; 1355 struct tipc_msg *msg; 1356 u32 destnode; 1357 u32 hsz; 1358 u32 sz; 1359 u32 res; 1360 1361 p_ptr = tipc_port_deref(ref); 1362 if (!p_ptr || !p_ptr->publ.connected) 1363 return -EINVAL; 1364 1365 msg = &p_ptr->publ.phdr; 1366 hsz = msg_hdr_sz(msg); 1367 sz = hsz + dsz; 1368 msg_set_size(msg, sz); 1369 if (skb_cow(buf, hsz)) 1370 return -ENOMEM; 1371 1372 skb_push(buf, hsz); 1373 skb_copy_to_linear_data(buf, msg, hsz); 1374 destnode = msg_destnode(msg); 1375 p_ptr->publ.congested = 1; 1376 if (!tipc_port_congested(p_ptr)) { 1377 if (likely(destnode != tipc_own_addr)) 1378 res = tipc_send_buf_fast(buf, destnode); 1379 else { 1380 tipc_port_recv_msg(buf); 1381 res = sz; 1382 } 1383 if (likely(res != -ELINKCONG)) { 1384 port_incr_out_seqno(p_ptr); 1385 p_ptr->sent++; 1386 p_ptr->publ.congested = 0; 1387 return res; 1388 } 1389 } 1390 if (port_unreliable(p_ptr)) { 1391 p_ptr->publ.congested = 0; 1392 return dsz; 1393 } 1394 return -ELINKCONG; 1395} 1396 1397/** 1398 * tipc_forward2name - forward message sections to port name 1399 */ 1400 1401int tipc_forward2name(u32 ref, 1402 struct tipc_name const *name, 1403 u32 domain, 1404 u32 num_sect, 1405 struct iovec const *msg_sect, 1406 struct tipc_portid const *orig, 1407 unsigned int importance) 1408{ 1409 struct port *p_ptr; 1410 struct tipc_msg *msg; 1411 u32 destnode = domain; 1412 u32 destport; 1413 int res; 1414 1415 p_ptr = tipc_port_deref(ref); 1416 if (!p_ptr || p_ptr->publ.connected) 1417 return -EINVAL; 1418 1419 msg = &p_ptr->publ.phdr; 1420 msg_set_type(msg, TIPC_NAMED_MSG); 1421 msg_set_orignode(msg, orig->node); 1422 msg_set_origport(msg, orig->ref); 1423 msg_set_hdr_sz(msg, LONG_H_SIZE); 1424 msg_set_nametype(msg, name->type); 1425 msg_set_nameinst(msg, name->instance); 1426 msg_set_lookup_scope(msg, tipc_addr_scope(domain)); 1427 if (importance <= TIPC_CRITICAL_IMPORTANCE) 1428 msg_set_importance(msg,importance); 1429 destport = tipc_nametbl_translate(name->type, name->instance, &destnode); 1430 msg_set_destnode(msg, destnode); 1431 msg_set_destport(msg, destport); 1432 1433 if (likely(destport || destnode)) { 1434 p_ptr->sent++; 1435 if (likely(destnode == tipc_own_addr)) 1436 return tipc_port_recv_sections(p_ptr, num_sect, msg_sect); 1437 res = tipc_link_send_sections_fast(p_ptr, msg_sect, num_sect, 1438 destnode); 1439 if (likely(res != -ELINKCONG)) 1440 return res; 1441 if (port_unreliable(p_ptr)) { 1442 /* Just calculate msg length and return */ 1443 return tipc_msg_calc_data_size(msg_sect, num_sect); 1444 } 1445 return -ELINKCONG; 1446 } 1447 return tipc_port_reject_sections(p_ptr, msg, msg_sect, num_sect, 1448 TIPC_ERR_NO_NAME); 1449} 1450 1451/** 1452 * tipc_send2name - send message sections to port name 1453 */ 1454 1455int tipc_send2name(u32 ref, 1456 struct tipc_name const *name, 1457 unsigned int domain, 1458 unsigned int num_sect, 1459 struct iovec const *msg_sect) 1460{ 1461 struct tipc_portid orig; 1462 1463 orig.ref = ref; 1464 orig.node = tipc_own_addr; 1465 return tipc_forward2name(ref, name, domain, num_sect, msg_sect, &orig, 1466 TIPC_PORT_IMPORTANCE); 1467} 1468 1469/** 1470 * tipc_forward_buf2name - forward message buffer to port name 1471 */ 1472 1473int tipc_forward_buf2name(u32 ref, 1474 struct tipc_name const *name, 1475 u32 domain, 1476 struct sk_buff *buf, 1477 unsigned int dsz, 1478 struct tipc_portid const *orig, 1479 unsigned int importance) 1480{ 1481 struct port *p_ptr; 1482 struct tipc_msg *msg; 1483 u32 destnode = domain; 1484 u32 destport; 1485 int res; 1486 1487 p_ptr = (struct port *)tipc_ref_deref(ref); 1488 if (!p_ptr || p_ptr->publ.connected) 1489 return -EINVAL; 1490 1491 msg = &p_ptr->publ.phdr; 1492 if (importance <= TIPC_CRITICAL_IMPORTANCE) 1493 msg_set_importance(msg, importance); 1494 msg_set_type(msg, TIPC_NAMED_MSG); 1495 msg_set_orignode(msg, orig->node); 1496 msg_set_origport(msg, orig->ref); 1497 msg_set_nametype(msg, name->type); 1498 msg_set_nameinst(msg, name->instance); 1499 msg_set_lookup_scope(msg, tipc_addr_scope(domain)); 1500 msg_set_hdr_sz(msg, LONG_H_SIZE); 1501 msg_set_size(msg, LONG_H_SIZE + dsz); 1502 destport = tipc_nametbl_translate(name->type, name->instance, &destnode); 1503 msg_set_destnode(msg, destnode); 1504 msg_set_destport(msg, destport); 1505 msg_dbg(msg, "forw2name ==> "); 1506 if (skb_cow(buf, LONG_H_SIZE)) 1507 return -ENOMEM; 1508 skb_push(buf, LONG_H_SIZE); 1509 skb_copy_to_linear_data(buf, msg, LONG_H_SIZE); 1510 msg_dbg(buf_msg(buf),"PREP:"); 1511 if (likely(destport || destnode)) { 1512 p_ptr->sent++; 1513 if (destnode == tipc_own_addr) 1514 return tipc_port_recv_msg(buf); 1515 res = tipc_send_buf_fast(buf, destnode); 1516 if (likely(res != -ELINKCONG)) 1517 return res; 1518 if (port_unreliable(p_ptr)) 1519 return dsz; 1520 return -ELINKCONG; 1521 } 1522 return tipc_reject_msg(buf, TIPC_ERR_NO_NAME); 1523} 1524 1525/** 1526 * tipc_send_buf2name - send message buffer to port name 1527 */ 1528 1529int tipc_send_buf2name(u32 ref, 1530 struct tipc_name const *dest, 1531 u32 domain, 1532 struct sk_buff *buf, 1533 unsigned int dsz) 1534{ 1535 struct tipc_portid orig; 1536 1537 orig.ref = ref; 1538 orig.node = tipc_own_addr; 1539 return tipc_forward_buf2name(ref, dest, domain, buf, dsz, &orig, 1540 TIPC_PORT_IMPORTANCE); 1541} 1542 1543/** 1544 * tipc_forward2port - forward message sections to port identity 1545 */ 1546 1547int tipc_forward2port(u32 ref, 1548 struct tipc_portid const *dest, 1549 unsigned int num_sect, 1550 struct iovec const *msg_sect, 1551 struct tipc_portid const *orig, 1552 unsigned int importance) 1553{ 1554 struct port *p_ptr; 1555 struct tipc_msg *msg; 1556 int res; 1557 1558 p_ptr = tipc_port_deref(ref); 1559 if (!p_ptr || p_ptr->publ.connected) 1560 return -EINVAL; 1561 1562 msg = &p_ptr->publ.phdr; 1563 msg_set_type(msg, TIPC_DIRECT_MSG); 1564 msg_set_orignode(msg, orig->node); 1565 msg_set_origport(msg, orig->ref); 1566 msg_set_destnode(msg, dest->node); 1567 msg_set_destport(msg, dest->ref); 1568 msg_set_hdr_sz(msg, DIR_MSG_H_SIZE); 1569 if (importance <= TIPC_CRITICAL_IMPORTANCE) 1570 msg_set_importance(msg, importance); 1571 p_ptr->sent++; 1572 if (dest->node == tipc_own_addr) 1573 return tipc_port_recv_sections(p_ptr, num_sect, msg_sect); 1574 res = tipc_link_send_sections_fast(p_ptr, msg_sect, num_sect, dest->node); 1575 if (likely(res != -ELINKCONG)) 1576 return res; 1577 if (port_unreliable(p_ptr)) { 1578 /* Just calculate msg length and return */ 1579 return tipc_msg_calc_data_size(msg_sect, num_sect); 1580 } 1581 return -ELINKCONG; 1582} 1583 1584/** 1585 * tipc_send2port - send message sections to port identity 1586 */ 1587 1588int tipc_send2port(u32 ref, 1589 struct tipc_portid const *dest, 1590 unsigned int num_sect, 1591 struct iovec const *msg_sect) 1592{ 1593 struct tipc_portid orig; 1594 1595 orig.ref = ref; 1596 orig.node = tipc_own_addr; 1597 return tipc_forward2port(ref, dest, num_sect, msg_sect, &orig, 1598 TIPC_PORT_IMPORTANCE); 1599} 1600 1601/** 1602 * tipc_forward_buf2port - forward message buffer to port identity 1603 */ 1604int tipc_forward_buf2port(u32 ref, 1605 struct tipc_portid const *dest, 1606 struct sk_buff *buf, 1607 unsigned int dsz, 1608 struct tipc_portid const *orig, 1609 unsigned int importance) 1610{ 1611 struct port *p_ptr; 1612 struct tipc_msg *msg; 1613 int res; 1614 1615 p_ptr = (struct port *)tipc_ref_deref(ref); 1616 if (!p_ptr || p_ptr->publ.connected) 1617 return -EINVAL; 1618 1619 msg = &p_ptr->publ.phdr; 1620 msg_set_type(msg, TIPC_DIRECT_MSG); 1621 msg_set_orignode(msg, orig->node); 1622 msg_set_origport(msg, orig->ref); 1623 msg_set_destnode(msg, dest->node); 1624 msg_set_destport(msg, dest->ref); 1625 msg_set_hdr_sz(msg, DIR_MSG_H_SIZE); 1626 if (importance <= TIPC_CRITICAL_IMPORTANCE) 1627 msg_set_importance(msg, importance); 1628 msg_set_size(msg, DIR_MSG_H_SIZE + dsz); 1629 if (skb_cow(buf, DIR_MSG_H_SIZE)) 1630 return -ENOMEM; 1631 1632 skb_push(buf, DIR_MSG_H_SIZE); 1633 skb_copy_to_linear_data(buf, msg, DIR_MSG_H_SIZE); 1634 msg_dbg(msg, "buf2port: "); 1635 p_ptr->sent++; 1636 if (dest->node == tipc_own_addr) 1637 return tipc_port_recv_msg(buf); 1638 res = tipc_send_buf_fast(buf, dest->node); 1639 if (likely(res != -ELINKCONG)) 1640 return res; 1641 if (port_unreliable(p_ptr)) 1642 return dsz; 1643 return -ELINKCONG; 1644} 1645 1646/** 1647 * tipc_send_buf2port - send message buffer to port identity 1648 */ 1649 1650int tipc_send_buf2port(u32 ref, 1651 struct tipc_portid const *dest, 1652 struct sk_buff *buf, 1653 unsigned int dsz) 1654{ 1655 struct tipc_portid orig; 1656 1657 orig.ref = ref; 1658 orig.node = tipc_own_addr; 1659 return tipc_forward_buf2port(ref, dest, buf, dsz, &orig, 1660 TIPC_PORT_IMPORTANCE); 1661} 1662