1/* 2 * net/tipc/link.c: TIPC link code 3 * 4 * Copyright (c) 1996-2006, Ericsson AB 5 * Copyright (c) 2004-2006, Wind River Systems 6 * All rights reserved. 7 * 8 * Redistribution and use in source and binary forms, with or without 9 * modification, are permitted provided that the following conditions are met: 10 * 11 * 1. Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in the 15 * documentation and/or other materials provided with the distribution. 16 * 3. Neither the names of the copyright holders nor the names of its 17 * contributors may be used to endorse or promote products derived from 18 * this software without specific prior written permission. 19 * 20 * Alternatively, this software may be distributed under the terms of the 21 * GNU General Public License ("GPL") version 2 as published by the Free 22 * Software Foundation. 23 * 24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 25 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 26 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 27 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 28 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 29 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 30 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 31 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 32 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 33 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 34 * POSSIBILITY OF SUCH DAMAGE. 35 */ 36 37#include "core.h" 38#include "dbg.h" 39#include "link.h" 40#include "net.h" 41#include "node.h" 42#include "port.h" 43#include "addr.h" 44#include "node_subscr.h" 45#include "name_distr.h" 46#include "bearer.h" 47#include "name_table.h" 48#include "discover.h" 49#include "config.h" 50#include "bcast.h" 51 52 53/* 54 * Limit for deferred reception queue: 55 */ 56 57#define DEF_QUEUE_LIMIT 256u 58 59/* 60 * Link state events: 61 */ 62 63#define STARTING_EVT 856384768 /* link processing trigger */ 64#define TRAFFIC_MSG_EVT 560815u /* rx'd ??? */ 65#define TIMEOUT_EVT 560817u /* link timer expired */ 66 67/* 68 * The following two 'message types' is really just implementation 69 * data conveniently stored in the message header. 70 * They must not be considered part of the protocol 71 */ 72#define OPEN_MSG 0 73#define CLOSED_MSG 1 74 75/* 76 * State value stored in 'exp_msg_count' 77 */ 78 79#define START_CHANGEOVER 100000u 80 81/** 82 * struct link_name - deconstructed link name 83 * @addr_local: network address of node at this end 84 * @if_local: name of interface at this end 85 * @addr_peer: network address of node at far end 86 * @if_peer: name of interface at far end 87 */ 88 89struct link_name { 90 u32 addr_local; 91 char if_local[TIPC_MAX_IF_NAME]; 92 u32 addr_peer; 93 char if_peer[TIPC_MAX_IF_NAME]; 94}; 95 96 97static void link_handle_out_of_seq_msg(struct link *l_ptr, 98 struct sk_buff *buf); 99static void link_recv_proto_msg(struct link *l_ptr, struct sk_buff *buf); 100static int link_recv_changeover_msg(struct link **l_ptr, struct sk_buff **buf); 101static void link_set_supervision_props(struct link *l_ptr, u32 tolerance); 102static int link_send_sections_long(struct port *sender, 103 struct iovec const *msg_sect, 104 u32 num_sect, u32 destnode); 105static void link_check_defragm_bufs(struct link *l_ptr); 106static void link_state_event(struct link *l_ptr, u32 event); 107static void link_reset_statistics(struct link *l_ptr); 108static void link_print(struct link *l_ptr, struct print_buf *buf, 109 const char *str); 110 111/* 112 * Debugging code used by link routines only 113 * 114 * When debugging link problems on a system that has multiple links, 115 * the standard TIPC debugging routines may not be useful since they 116 * allow the output from multiple links to be intermixed. For this reason 117 * routines of the form "dbg_link_XXX()" have been created that will capture 118 * debug info into a link's personal print buffer, which can then be dumped 119 * into the TIPC system log (TIPC_LOG) upon request. 120 * 121 * To enable per-link debugging, use LINK_LOG_BUF_SIZE to specify the size 122 * of the print buffer used by each link. If LINK_LOG_BUF_SIZE is set to 0, 123 * the dbg_link_XXX() routines simply send their output to the standard 124 * debug print buffer (DBG_OUTPUT), if it has been defined; this can be useful 125 * when there is only a single link in the system being debugged. 126 * 127 * Notes: 128 * - When enabled, LINK_LOG_BUF_SIZE should be set to at least TIPC_PB_MIN_SIZE 129 * - "l_ptr" must be valid when using dbg_link_XXX() macros 130 */ 131 132#define LINK_LOG_BUF_SIZE 0 133 134#define dbg_link(fmt, arg...) do {if (LINK_LOG_BUF_SIZE) tipc_printf(&l_ptr->print_buf, fmt, ## arg); } while(0) 135#define dbg_link_msg(msg, txt) do {if (LINK_LOG_BUF_SIZE) tipc_msg_print(&l_ptr->print_buf, msg, txt); } while(0) 136#define dbg_link_state(txt) do {if (LINK_LOG_BUF_SIZE) link_print(l_ptr, &l_ptr->print_buf, txt); } while(0) 137#define dbg_link_dump() do { \ 138 if (LINK_LOG_BUF_SIZE) { \ 139 tipc_printf(LOG, "\n\nDumping link <%s>:\n", l_ptr->name); \ 140 tipc_printbuf_move(LOG, &l_ptr->print_buf); \ 141 } \ 142} while (0) 143 144static void dbg_print_link(struct link *l_ptr, const char *str) 145{ 146 if (DBG_OUTPUT != TIPC_NULL) 147 link_print(l_ptr, DBG_OUTPUT, str); 148} 149 150static void dbg_print_buf_chain(struct sk_buff *root_buf) 151{ 152 if (DBG_OUTPUT != TIPC_NULL) { 153 struct sk_buff *buf = root_buf; 154 155 while (buf) { 156 msg_dbg(buf_msg(buf), "In chain: "); 157 buf = buf->next; 158 } 159 } 160} 161 162/* 163 * Simple link routines 164 */ 165 166static unsigned int align(unsigned int i) 167{ 168 return (i + 3) & ~3u; 169} 170 171static int link_working_working(struct link *l_ptr) 172{ 173 return (l_ptr->state == WORKING_WORKING); 174} 175 176static int link_working_unknown(struct link *l_ptr) 177{ 178 return (l_ptr->state == WORKING_UNKNOWN); 179} 180 181static int link_reset_unknown(struct link *l_ptr) 182{ 183 return (l_ptr->state == RESET_UNKNOWN); 184} 185 186static int link_reset_reset(struct link *l_ptr) 187{ 188 return (l_ptr->state == RESET_RESET); 189} 190 191static int link_blocked(struct link *l_ptr) 192{ 193 return (l_ptr->exp_msg_count || l_ptr->blocked); 194} 195 196static int link_congested(struct link *l_ptr) 197{ 198 return (l_ptr->out_queue_size >= l_ptr->queue_limit[0]); 199} 200 201static u32 link_max_pkt(struct link *l_ptr) 202{ 203 return l_ptr->max_pkt; 204} 205 206static void link_init_max_pkt(struct link *l_ptr) 207{ 208 u32 max_pkt; 209 210 max_pkt = (l_ptr->b_ptr->publ.mtu & ~3); 211 if (max_pkt > MAX_MSG_SIZE) 212 max_pkt = MAX_MSG_SIZE; 213 214 l_ptr->max_pkt_target = max_pkt; 215 if (l_ptr->max_pkt_target < MAX_PKT_DEFAULT) 216 l_ptr->max_pkt = l_ptr->max_pkt_target; 217 else 218 l_ptr->max_pkt = MAX_PKT_DEFAULT; 219 220 l_ptr->max_pkt_probes = 0; 221} 222 223static u32 link_next_sent(struct link *l_ptr) 224{ 225 if (l_ptr->next_out) 226 return msg_seqno(buf_msg(l_ptr->next_out)); 227 return mod(l_ptr->next_out_no); 228} 229 230static u32 link_last_sent(struct link *l_ptr) 231{ 232 return mod(link_next_sent(l_ptr) - 1); 233} 234 235/* 236 * Simple non-static link routines (i.e. referenced outside this file) 237 */ 238 239int tipc_link_is_up(struct link *l_ptr) 240{ 241 if (!l_ptr) 242 return 0; 243 return (link_working_working(l_ptr) || link_working_unknown(l_ptr)); 244} 245 246int tipc_link_is_active(struct link *l_ptr) 247{ 248 return ((l_ptr->owner->active_links[0] == l_ptr) || 249 (l_ptr->owner->active_links[1] == l_ptr)); 250} 251 252/** 253 * link_name_validate - validate & (optionally) deconstruct link name 254 * @name - ptr to link name string 255 * @name_parts - ptr to area for link name components (or NULL if not needed) 256 * 257 * Returns 1 if link name is valid, otherwise 0. 258 */ 259 260static int link_name_validate(const char *name, struct link_name *name_parts) 261{ 262 char name_copy[TIPC_MAX_LINK_NAME]; 263 char *addr_local; 264 char *if_local; 265 char *addr_peer; 266 char *if_peer; 267 char dummy; 268 u32 z_local, c_local, n_local; 269 u32 z_peer, c_peer, n_peer; 270 u32 if_local_len; 271 u32 if_peer_len; 272 273 /* copy link name & ensure length is OK */ 274 275 name_copy[TIPC_MAX_LINK_NAME - 1] = 0; 276 /* need above in case non-Posix strncpy() doesn't pad with nulls */ 277 strncpy(name_copy, name, TIPC_MAX_LINK_NAME); 278 if (name_copy[TIPC_MAX_LINK_NAME - 1] != 0) 279 return 0; 280 281 /* ensure all component parts of link name are present */ 282 283 addr_local = name_copy; 284 if ((if_local = strchr(addr_local, ':')) == NULL) 285 return 0; 286 *(if_local++) = 0; 287 if ((addr_peer = strchr(if_local, '-')) == NULL) 288 return 0; 289 *(addr_peer++) = 0; 290 if_local_len = addr_peer - if_local; 291 if ((if_peer = strchr(addr_peer, ':')) == NULL) 292 return 0; 293 *(if_peer++) = 0; 294 if_peer_len = strlen(if_peer) + 1; 295 296 /* validate component parts of link name */ 297 298 if ((sscanf(addr_local, "%u.%u.%u%c", 299 &z_local, &c_local, &n_local, &dummy) != 3) || 300 (sscanf(addr_peer, "%u.%u.%u%c", 301 &z_peer, &c_peer, &n_peer, &dummy) != 3) || 302 (z_local > 255) || (c_local > 4095) || (n_local > 4095) || 303 (z_peer > 255) || (c_peer > 4095) || (n_peer > 4095) || 304 (if_local_len <= 1) || (if_local_len > TIPC_MAX_IF_NAME) || 305 (if_peer_len <= 1) || (if_peer_len > TIPC_MAX_IF_NAME) || 306 (strspn(if_local, tipc_alphabet) != (if_local_len - 1)) || 307 (strspn(if_peer, tipc_alphabet) != (if_peer_len - 1))) 308 return 0; 309 310 /* return link name components, if necessary */ 311 312 if (name_parts) { 313 name_parts->addr_local = tipc_addr(z_local, c_local, n_local); 314 strcpy(name_parts->if_local, if_local); 315 name_parts->addr_peer = tipc_addr(z_peer, c_peer, n_peer); 316 strcpy(name_parts->if_peer, if_peer); 317 } 318 return 1; 319} 320 321/** 322 * link_timeout - handle expiration of link timer 323 * @l_ptr: pointer to link 324 * 325 * This routine must not grab "tipc_net_lock" to avoid a potential deadlock conflict 326 * with tipc_link_delete(). (There is no risk that the node will be deleted by 327 * another thread because tipc_link_delete() always cancels the link timer before 328 * tipc_node_delete() is called.) 329 */ 330 331static void link_timeout(struct link *l_ptr) 332{ 333 tipc_node_lock(l_ptr->owner); 334 335 /* update counters used in statistical profiling of send traffic */ 336 337 l_ptr->stats.accu_queue_sz += l_ptr->out_queue_size; 338 l_ptr->stats.queue_sz_counts++; 339 340 if (l_ptr->out_queue_size > l_ptr->stats.max_queue_sz) 341 l_ptr->stats.max_queue_sz = l_ptr->out_queue_size; 342 343 if (l_ptr->first_out) { 344 struct tipc_msg *msg = buf_msg(l_ptr->first_out); 345 u32 length = msg_size(msg); 346 347 if ((msg_user(msg) == MSG_FRAGMENTER) 348 && (msg_type(msg) == FIRST_FRAGMENT)) { 349 length = msg_size(msg_get_wrapped(msg)); 350 } 351 if (length) { 352 l_ptr->stats.msg_lengths_total += length; 353 l_ptr->stats.msg_length_counts++; 354 if (length <= 64) 355 l_ptr->stats.msg_length_profile[0]++; 356 else if (length <= 256) 357 l_ptr->stats.msg_length_profile[1]++; 358 else if (length <= 1024) 359 l_ptr->stats.msg_length_profile[2]++; 360 else if (length <= 4096) 361 l_ptr->stats.msg_length_profile[3]++; 362 else if (length <= 16384) 363 l_ptr->stats.msg_length_profile[4]++; 364 else if (length <= 32768) 365 l_ptr->stats.msg_length_profile[5]++; 366 else 367 l_ptr->stats.msg_length_profile[6]++; 368 } 369 } 370 371 /* do all other link processing performed on a periodic basis */ 372 373 link_check_defragm_bufs(l_ptr); 374 375 link_state_event(l_ptr, TIMEOUT_EVT); 376 377 if (l_ptr->next_out) 378 tipc_link_push_queue(l_ptr); 379 380 tipc_node_unlock(l_ptr->owner); 381} 382 383static void link_set_timer(struct link *l_ptr, u32 time) 384{ 385 k_start_timer(&l_ptr->timer, time); 386} 387 388/** 389 * tipc_link_create - create a new link 390 * @b_ptr: pointer to associated bearer 391 * @peer: network address of node at other end of link 392 * @media_addr: media address to use when sending messages over link 393 * 394 * Returns pointer to link. 395 */ 396 397struct link *tipc_link_create(struct bearer *b_ptr, const u32 peer, 398 const struct tipc_media_addr *media_addr) 399{ 400 struct link *l_ptr; 401 struct tipc_msg *msg; 402 char *if_name; 403 404 l_ptr = kzalloc(sizeof(*l_ptr), GFP_ATOMIC); 405 if (!l_ptr) { 406 warn("Link creation failed, no memory\n"); 407 return NULL; 408 } 409 410 l_ptr->addr = peer; 411 if_name = strchr(b_ptr->publ.name, ':') + 1; 412 sprintf(l_ptr->name, "%u.%u.%u:%s-%u.%u.%u:", 413 tipc_zone(tipc_own_addr), tipc_cluster(tipc_own_addr), 414 tipc_node(tipc_own_addr), 415 if_name, 416 tipc_zone(peer), tipc_cluster(peer), tipc_node(peer)); 417 /* note: peer i/f is appended to link name by reset/activate */ 418 memcpy(&l_ptr->media_addr, media_addr, sizeof(*media_addr)); 419 k_init_timer(&l_ptr->timer, (Handler)link_timeout, (unsigned long)l_ptr); 420 list_add_tail(&l_ptr->link_list, &b_ptr->links); 421 l_ptr->checkpoint = 1; 422 l_ptr->b_ptr = b_ptr; 423 link_set_supervision_props(l_ptr, b_ptr->media->tolerance); 424 l_ptr->state = RESET_UNKNOWN; 425 426 l_ptr->pmsg = (struct tipc_msg *)&l_ptr->proto_msg; 427 msg = l_ptr->pmsg; 428 msg_init(msg, LINK_PROTOCOL, RESET_MSG, TIPC_OK, INT_H_SIZE, l_ptr->addr); 429 msg_set_size(msg, sizeof(l_ptr->proto_msg)); 430 msg_set_session(msg, tipc_random); 431 msg_set_bearer_id(msg, b_ptr->identity); 432 strcpy((char *)msg_data(msg), if_name); 433 434 l_ptr->priority = b_ptr->priority; 435 tipc_link_set_queue_limits(l_ptr, b_ptr->media->window); 436 437 link_init_max_pkt(l_ptr); 438 439 l_ptr->next_out_no = 1; 440 INIT_LIST_HEAD(&l_ptr->waiting_ports); 441 442 link_reset_statistics(l_ptr); 443 444 l_ptr->owner = tipc_node_attach_link(l_ptr); 445 if (!l_ptr->owner) { 446 kfree(l_ptr); 447 return NULL; 448 } 449 450 if (LINK_LOG_BUF_SIZE) { 451 char *pb = kmalloc(LINK_LOG_BUF_SIZE, GFP_ATOMIC); 452 453 if (!pb) { 454 kfree(l_ptr); 455 warn("Link creation failed, no memory for print buffer\n"); 456 return NULL; 457 } 458 tipc_printbuf_init(&l_ptr->print_buf, pb, LINK_LOG_BUF_SIZE); 459 } 460 461 tipc_k_signal((Handler)tipc_link_start, (unsigned long)l_ptr); 462 463 dbg("tipc_link_create(): tolerance = %u,cont intv = %u, abort_limit = %u\n", 464 l_ptr->tolerance, l_ptr->continuity_interval, l_ptr->abort_limit); 465 466 return l_ptr; 467} 468 469/** 470 * tipc_link_delete - delete a link 471 * @l_ptr: pointer to link 472 * 473 * Note: 'tipc_net_lock' is write_locked, bearer is locked. 474 * This routine must not grab the node lock until after link timer cancellation 475 * to avoid a potential deadlock situation. 476 */ 477 478void tipc_link_delete(struct link *l_ptr) 479{ 480 if (!l_ptr) { 481 err("Attempt to delete non-existent link\n"); 482 return; 483 } 484 485 dbg("tipc_link_delete()\n"); 486 487 k_cancel_timer(&l_ptr->timer); 488 489 tipc_node_lock(l_ptr->owner); 490 tipc_link_reset(l_ptr); 491 tipc_node_detach_link(l_ptr->owner, l_ptr); 492 tipc_link_stop(l_ptr); 493 list_del_init(&l_ptr->link_list); 494 if (LINK_LOG_BUF_SIZE) 495 kfree(l_ptr->print_buf.buf); 496 tipc_node_unlock(l_ptr->owner); 497 k_term_timer(&l_ptr->timer); 498 kfree(l_ptr); 499} 500 501void tipc_link_start(struct link *l_ptr) 502{ 503 dbg("tipc_link_start %x\n", l_ptr); 504 link_state_event(l_ptr, STARTING_EVT); 505} 506 507/** 508 * link_schedule_port - schedule port for deferred sending 509 * @l_ptr: pointer to link 510 * @origport: reference to sending port 511 * @sz: amount of data to be sent 512 * 513 * Schedules port for renewed sending of messages after link congestion 514 * has abated. 515 */ 516 517static int link_schedule_port(struct link *l_ptr, u32 origport, u32 sz) 518{ 519 struct port *p_ptr; 520 521 spin_lock_bh(&tipc_port_list_lock); 522 p_ptr = tipc_port_lock(origport); 523 if (p_ptr) { 524 if (!p_ptr->wakeup) 525 goto exit; 526 if (!list_empty(&p_ptr->wait_list)) 527 goto exit; 528 p_ptr->congested_link = l_ptr; 529 p_ptr->publ.congested = 1; 530 p_ptr->waiting_pkts = 1 + ((sz - 1) / link_max_pkt(l_ptr)); 531 list_add_tail(&p_ptr->wait_list, &l_ptr->waiting_ports); 532 l_ptr->stats.link_congs++; 533exit: 534 tipc_port_unlock(p_ptr); 535 } 536 spin_unlock_bh(&tipc_port_list_lock); 537 return -ELINKCONG; 538} 539 540void tipc_link_wakeup_ports(struct link *l_ptr, int all) 541{ 542 struct port *p_ptr; 543 struct port *temp_p_ptr; 544 int win = l_ptr->queue_limit[0] - l_ptr->out_queue_size; 545 546 if (all) 547 win = 100000; 548 if (win <= 0) 549 return; 550 if (!spin_trylock_bh(&tipc_port_list_lock)) 551 return; 552 if (link_congested(l_ptr)) 553 goto exit; 554 list_for_each_entry_safe(p_ptr, temp_p_ptr, &l_ptr->waiting_ports, 555 wait_list) { 556 if (win <= 0) 557 break; 558 list_del_init(&p_ptr->wait_list); 559 p_ptr->congested_link = NULL; 560 spin_lock_bh(p_ptr->publ.lock); 561 p_ptr->publ.congested = 0; 562 p_ptr->wakeup(&p_ptr->publ); 563 win -= p_ptr->waiting_pkts; 564 spin_unlock_bh(p_ptr->publ.lock); 565 } 566 567exit: 568 spin_unlock_bh(&tipc_port_list_lock); 569} 570 571/** 572 * link_release_outqueue - purge link's outbound message queue 573 * @l_ptr: pointer to link 574 */ 575 576static void link_release_outqueue(struct link *l_ptr) 577{ 578 struct sk_buff *buf = l_ptr->first_out; 579 struct sk_buff *next; 580 581 while (buf) { 582 next = buf->next; 583 buf_discard(buf); 584 buf = next; 585 } 586 l_ptr->first_out = NULL; 587 l_ptr->out_queue_size = 0; 588} 589 590/** 591 * tipc_link_reset_fragments - purge link's inbound message fragments queue 592 * @l_ptr: pointer to link 593 */ 594 595void tipc_link_reset_fragments(struct link *l_ptr) 596{ 597 struct sk_buff *buf = l_ptr->defragm_buf; 598 struct sk_buff *next; 599 600 while (buf) { 601 next = buf->next; 602 buf_discard(buf); 603 buf = next; 604 } 605 l_ptr->defragm_buf = NULL; 606} 607 608/** 609 * tipc_link_stop - purge all inbound and outbound messages associated with link 610 * @l_ptr: pointer to link 611 */ 612 613void tipc_link_stop(struct link *l_ptr) 614{ 615 struct sk_buff *buf; 616 struct sk_buff *next; 617 618 buf = l_ptr->oldest_deferred_in; 619 while (buf) { 620 next = buf->next; 621 buf_discard(buf); 622 buf = next; 623 } 624 625 buf = l_ptr->first_out; 626 while (buf) { 627 next = buf->next; 628 buf_discard(buf); 629 buf = next; 630 } 631 632 tipc_link_reset_fragments(l_ptr); 633 634 buf_discard(l_ptr->proto_msg_queue); 635 l_ptr->proto_msg_queue = NULL; 636} 637 638 639#define link_send_event(fcn, l_ptr, up) do { } while (0) 640 641 642void tipc_link_reset(struct link *l_ptr) 643{ 644 struct sk_buff *buf; 645 u32 prev_state = l_ptr->state; 646 u32 checkpoint = l_ptr->next_in_no; 647 int was_active_link = tipc_link_is_active(l_ptr); 648 649 msg_set_session(l_ptr->pmsg, msg_session(l_ptr->pmsg) + 1); 650 651 /* Link is down, accept any session: */ 652 l_ptr->peer_session = 0; 653 654 /* Prepare for max packet size negotiation */ 655 link_init_max_pkt(l_ptr); 656 657 l_ptr->state = RESET_UNKNOWN; 658 dbg_link_state("Resetting Link\n"); 659 660 if ((prev_state == RESET_UNKNOWN) || (prev_state == RESET_RESET)) 661 return; 662 663 tipc_node_link_down(l_ptr->owner, l_ptr); 664 tipc_bearer_remove_dest(l_ptr->b_ptr, l_ptr->addr); 665 if (was_active_link && tipc_node_has_active_links(l_ptr->owner) && 666 l_ptr->owner->permit_changeover) { 667 l_ptr->reset_checkpoint = checkpoint; 668 l_ptr->exp_msg_count = START_CHANGEOVER; 669 } 670 671 /* Clean up all queues: */ 672 673 link_release_outqueue(l_ptr); 674 buf_discard(l_ptr->proto_msg_queue); 675 l_ptr->proto_msg_queue = NULL; 676 buf = l_ptr->oldest_deferred_in; 677 while (buf) { 678 struct sk_buff *next = buf->next; 679 buf_discard(buf); 680 buf = next; 681 } 682 if (!list_empty(&l_ptr->waiting_ports)) 683 tipc_link_wakeup_ports(l_ptr, 1); 684 685 l_ptr->retransm_queue_head = 0; 686 l_ptr->retransm_queue_size = 0; 687 l_ptr->last_out = NULL; 688 l_ptr->first_out = NULL; 689 l_ptr->next_out = NULL; 690 l_ptr->unacked_window = 0; 691 l_ptr->checkpoint = 1; 692 l_ptr->next_out_no = 1; 693 l_ptr->deferred_inqueue_sz = 0; 694 l_ptr->oldest_deferred_in = NULL; 695 l_ptr->newest_deferred_in = NULL; 696 l_ptr->fsm_msg_cnt = 0; 697 l_ptr->stale_count = 0; 698 link_reset_statistics(l_ptr); 699 700 link_send_event(tipc_cfg_link_event, l_ptr, 0); 701 if (!in_own_cluster(l_ptr->addr)) 702 link_send_event(tipc_disc_link_event, l_ptr, 0); 703} 704 705 706static void link_activate(struct link *l_ptr) 707{ 708 l_ptr->next_in_no = l_ptr->stats.recv_info = 1; 709 tipc_node_link_up(l_ptr->owner, l_ptr); 710 tipc_bearer_add_dest(l_ptr->b_ptr, l_ptr->addr); 711 link_send_event(tipc_cfg_link_event, l_ptr, 1); 712 if (!in_own_cluster(l_ptr->addr)) 713 link_send_event(tipc_disc_link_event, l_ptr, 1); 714} 715 716/** 717 * link_state_event - link finite state machine 718 * @l_ptr: pointer to link 719 * @event: state machine event to process 720 */ 721 722static void link_state_event(struct link *l_ptr, unsigned event) 723{ 724 struct link *other; 725 u32 cont_intv = l_ptr->continuity_interval; 726 727 if (!l_ptr->started && (event != STARTING_EVT)) 728 return; /* Not yet. */ 729 730 if (link_blocked(l_ptr)) { 731 if (event == TIMEOUT_EVT) { 732 link_set_timer(l_ptr, cont_intv); 733 } 734 return; /* Changeover going on */ 735 } 736 dbg_link("STATE_EV: <%s> ", l_ptr->name); 737 738 switch (l_ptr->state) { 739 case WORKING_WORKING: 740 dbg_link("WW/"); 741 switch (event) { 742 case TRAFFIC_MSG_EVT: 743 dbg_link("TRF-"); 744 /* fall through */ 745 case ACTIVATE_MSG: 746 dbg_link("ACT\n"); 747 break; 748 case TIMEOUT_EVT: 749 dbg_link("TIM "); 750 if (l_ptr->next_in_no != l_ptr->checkpoint) { 751 l_ptr->checkpoint = l_ptr->next_in_no; 752 if (tipc_bclink_acks_missing(l_ptr->owner)) { 753 tipc_link_send_proto_msg(l_ptr, STATE_MSG, 754 0, 0, 0, 0, 0); 755 l_ptr->fsm_msg_cnt++; 756 } else if (l_ptr->max_pkt < l_ptr->max_pkt_target) { 757 tipc_link_send_proto_msg(l_ptr, STATE_MSG, 758 1, 0, 0, 0, 0); 759 l_ptr->fsm_msg_cnt++; 760 } 761 link_set_timer(l_ptr, cont_intv); 762 break; 763 } 764 dbg_link(" -> WU\n"); 765 l_ptr->state = WORKING_UNKNOWN; 766 l_ptr->fsm_msg_cnt = 0; 767 tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0); 768 l_ptr->fsm_msg_cnt++; 769 link_set_timer(l_ptr, cont_intv / 4); 770 break; 771 case RESET_MSG: 772 dbg_link("RES -> RR\n"); 773 info("Resetting link <%s>, requested by peer\n", 774 l_ptr->name); 775 tipc_link_reset(l_ptr); 776 l_ptr->state = RESET_RESET; 777 l_ptr->fsm_msg_cnt = 0; 778 tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 0, 0, 0, 0, 0); 779 l_ptr->fsm_msg_cnt++; 780 link_set_timer(l_ptr, cont_intv); 781 break; 782 default: 783 err("Unknown link event %u in WW state\n", event); 784 } 785 break; 786 case WORKING_UNKNOWN: 787 dbg_link("WU/"); 788 switch (event) { 789 case TRAFFIC_MSG_EVT: 790 dbg_link("TRF-"); 791 case ACTIVATE_MSG: 792 dbg_link("ACT -> WW\n"); 793 l_ptr->state = WORKING_WORKING; 794 l_ptr->fsm_msg_cnt = 0; 795 link_set_timer(l_ptr, cont_intv); 796 break; 797 case RESET_MSG: 798 dbg_link("RES -> RR\n"); 799 info("Resetting link <%s>, requested by peer " 800 "while probing\n", l_ptr->name); 801 tipc_link_reset(l_ptr); 802 l_ptr->state = RESET_RESET; 803 l_ptr->fsm_msg_cnt = 0; 804 tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 0, 0, 0, 0, 0); 805 l_ptr->fsm_msg_cnt++; 806 link_set_timer(l_ptr, cont_intv); 807 break; 808 case TIMEOUT_EVT: 809 dbg_link("TIM "); 810 if (l_ptr->next_in_no != l_ptr->checkpoint) { 811 dbg_link("-> WW \n"); 812 l_ptr->state = WORKING_WORKING; 813 l_ptr->fsm_msg_cnt = 0; 814 l_ptr->checkpoint = l_ptr->next_in_no; 815 if (tipc_bclink_acks_missing(l_ptr->owner)) { 816 tipc_link_send_proto_msg(l_ptr, STATE_MSG, 817 0, 0, 0, 0, 0); 818 l_ptr->fsm_msg_cnt++; 819 } 820 link_set_timer(l_ptr, cont_intv); 821 } else if (l_ptr->fsm_msg_cnt < l_ptr->abort_limit) { 822 dbg_link("Probing %u/%u,timer = %u ms)\n", 823 l_ptr->fsm_msg_cnt, l_ptr->abort_limit, 824 cont_intv / 4); 825 tipc_link_send_proto_msg(l_ptr, STATE_MSG, 826 1, 0, 0, 0, 0); 827 l_ptr->fsm_msg_cnt++; 828 link_set_timer(l_ptr, cont_intv / 4); 829 } else { /* Link has failed */ 830 dbg_link("-> RU (%u probes unanswered)\n", 831 l_ptr->fsm_msg_cnt); 832 warn("Resetting link <%s>, peer not responding\n", 833 l_ptr->name); 834 tipc_link_reset(l_ptr); 835 l_ptr->state = RESET_UNKNOWN; 836 l_ptr->fsm_msg_cnt = 0; 837 tipc_link_send_proto_msg(l_ptr, RESET_MSG, 838 0, 0, 0, 0, 0); 839 l_ptr->fsm_msg_cnt++; 840 link_set_timer(l_ptr, cont_intv); 841 } 842 break; 843 default: 844 err("Unknown link event %u in WU state\n", event); 845 } 846 break; 847 case RESET_UNKNOWN: 848 dbg_link("RU/"); 849 switch (event) { 850 case TRAFFIC_MSG_EVT: 851 dbg_link("TRF-\n"); 852 break; 853 case ACTIVATE_MSG: 854 other = l_ptr->owner->active_links[0]; 855 if (other && link_working_unknown(other)) { 856 dbg_link("ACT\n"); 857 break; 858 } 859 dbg_link("ACT -> WW\n"); 860 l_ptr->state = WORKING_WORKING; 861 l_ptr->fsm_msg_cnt = 0; 862 link_activate(l_ptr); 863 tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0); 864 l_ptr->fsm_msg_cnt++; 865 link_set_timer(l_ptr, cont_intv); 866 break; 867 case RESET_MSG: 868 dbg_link("RES \n"); 869 dbg_link(" -> RR\n"); 870 l_ptr->state = RESET_RESET; 871 l_ptr->fsm_msg_cnt = 0; 872 tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 1, 0, 0, 0, 0); 873 l_ptr->fsm_msg_cnt++; 874 link_set_timer(l_ptr, cont_intv); 875 break; 876 case STARTING_EVT: 877 dbg_link("START-"); 878 l_ptr->started = 1; 879 /* fall through */ 880 case TIMEOUT_EVT: 881 dbg_link("TIM \n"); 882 tipc_link_send_proto_msg(l_ptr, RESET_MSG, 0, 0, 0, 0, 0); 883 l_ptr->fsm_msg_cnt++; 884 link_set_timer(l_ptr, cont_intv); 885 break; 886 default: 887 err("Unknown link event %u in RU state\n", event); 888 } 889 break; 890 case RESET_RESET: 891 dbg_link("RR/ "); 892 switch (event) { 893 case TRAFFIC_MSG_EVT: 894 dbg_link("TRF-"); 895 /* fall through */ 896 case ACTIVATE_MSG: 897 other = l_ptr->owner->active_links[0]; 898 if (other && link_working_unknown(other)) { 899 dbg_link("ACT\n"); 900 break; 901 } 902 dbg_link("ACT -> WW\n"); 903 l_ptr->state = WORKING_WORKING; 904 l_ptr->fsm_msg_cnt = 0; 905 link_activate(l_ptr); 906 tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0); 907 l_ptr->fsm_msg_cnt++; 908 link_set_timer(l_ptr, cont_intv); 909 break; 910 case RESET_MSG: 911 dbg_link("RES\n"); 912 break; 913 case TIMEOUT_EVT: 914 dbg_link("TIM\n"); 915 tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 0, 0, 0, 0, 0); 916 l_ptr->fsm_msg_cnt++; 917 link_set_timer(l_ptr, cont_intv); 918 dbg_link("fsm_msg_cnt %u\n", l_ptr->fsm_msg_cnt); 919 break; 920 default: 921 err("Unknown link event %u in RR state\n", event); 922 } 923 break; 924 default: 925 err("Unknown link state %u/%u\n", l_ptr->state, event); 926 } 927} 928 929/* 930 * link_bundle_buf(): Append contents of a buffer to 931 * the tail of an existing one. 932 */ 933 934static int link_bundle_buf(struct link *l_ptr, 935 struct sk_buff *bundler, 936 struct sk_buff *buf) 937{ 938 struct tipc_msg *bundler_msg = buf_msg(bundler); 939 struct tipc_msg *msg = buf_msg(buf); 940 u32 size = msg_size(msg); 941 u32 bundle_size = msg_size(bundler_msg); 942 u32 to_pos = align(bundle_size); 943 u32 pad = to_pos - bundle_size; 944 945 if (msg_user(bundler_msg) != MSG_BUNDLER) 946 return 0; 947 if (msg_type(bundler_msg) != OPEN_MSG) 948 return 0; 949 if (skb_tailroom(bundler) < (pad + size)) 950 return 0; 951 if (link_max_pkt(l_ptr) < (to_pos + size)) 952 return 0; 953 954 skb_put(bundler, pad + size); 955 skb_copy_to_linear_data_offset(bundler, to_pos, buf->data, size); 956 msg_set_size(bundler_msg, to_pos + size); 957 msg_set_msgcnt(bundler_msg, msg_msgcnt(bundler_msg) + 1); 958 dbg("Packed msg # %u(%u octets) into pos %u in buf(#%u)\n", 959 msg_msgcnt(bundler_msg), size, to_pos, msg_seqno(bundler_msg)); 960 msg_dbg(msg, "PACKD:"); 961 buf_discard(buf); 962 l_ptr->stats.sent_bundled++; 963 return 1; 964} 965 966static void link_add_to_outqueue(struct link *l_ptr, 967 struct sk_buff *buf, 968 struct tipc_msg *msg) 969{ 970 u32 ack = mod(l_ptr->next_in_no - 1); 971 u32 seqno = mod(l_ptr->next_out_no++); 972 973 msg_set_word(msg, 2, ((ack << 16) | seqno)); 974 msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in); 975 buf->next = NULL; 976 if (l_ptr->first_out) { 977 l_ptr->last_out->next = buf; 978 l_ptr->last_out = buf; 979 } else 980 l_ptr->first_out = l_ptr->last_out = buf; 981 l_ptr->out_queue_size++; 982} 983 984/* 985 * tipc_link_send_buf() is the 'full path' for messages, called from 986 * inside TIPC when the 'fast path' in tipc_send_buf 987 * has failed, and from link_send() 988 */ 989 990int tipc_link_send_buf(struct link *l_ptr, struct sk_buff *buf) 991{ 992 struct tipc_msg *msg = buf_msg(buf); 993 u32 size = msg_size(msg); 994 u32 dsz = msg_data_sz(msg); 995 u32 queue_size = l_ptr->out_queue_size; 996 u32 imp = msg_tot_importance(msg); 997 u32 queue_limit = l_ptr->queue_limit[imp]; 998 u32 max_packet = link_max_pkt(l_ptr); 999 1000 msg_set_prevnode(msg, tipc_own_addr); /* If routed message */ 1001 1002 /* Match msg importance against queue limits: */ 1003 1004 if (unlikely(queue_size >= queue_limit)) { 1005 if (imp <= TIPC_CRITICAL_IMPORTANCE) { 1006 return link_schedule_port(l_ptr, msg_origport(msg), 1007 size); 1008 } 1009 msg_dbg(msg, "TIPC: Congestion, throwing away\n"); 1010 buf_discard(buf); 1011 if (imp > CONN_MANAGER) { 1012 warn("Resetting link <%s>, send queue full", l_ptr->name); 1013 tipc_link_reset(l_ptr); 1014 } 1015 return dsz; 1016 } 1017 1018 /* Fragmentation needed ? */ 1019 1020 if (size > max_packet) 1021 return tipc_link_send_long_buf(l_ptr, buf); 1022 1023 /* Packet can be queued or sent: */ 1024 1025 if (queue_size > l_ptr->stats.max_queue_sz) 1026 l_ptr->stats.max_queue_sz = queue_size; 1027 1028 if (likely(!tipc_bearer_congested(l_ptr->b_ptr, l_ptr) && 1029 !link_congested(l_ptr))) { 1030 link_add_to_outqueue(l_ptr, buf, msg); 1031 1032 if (likely(tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr))) { 1033 l_ptr->unacked_window = 0; 1034 } else { 1035 tipc_bearer_schedule(l_ptr->b_ptr, l_ptr); 1036 l_ptr->stats.bearer_congs++; 1037 l_ptr->next_out = buf; 1038 } 1039 return dsz; 1040 } 1041 /* Congestion: can message be bundled ?: */ 1042 1043 if ((msg_user(msg) != CHANGEOVER_PROTOCOL) && 1044 (msg_user(msg) != MSG_FRAGMENTER)) { 1045 1046 /* Try adding message to an existing bundle */ 1047 1048 if (l_ptr->next_out && 1049 link_bundle_buf(l_ptr, l_ptr->last_out, buf)) { 1050 tipc_bearer_resolve_congestion(l_ptr->b_ptr, l_ptr); 1051 return dsz; 1052 } 1053 1054 /* Try creating a new bundle */ 1055 1056 if (size <= max_packet * 2 / 3) { 1057 struct sk_buff *bundler = buf_acquire(max_packet); 1058 struct tipc_msg bundler_hdr; 1059 1060 if (bundler) { 1061 msg_init(&bundler_hdr, MSG_BUNDLER, OPEN_MSG, 1062 TIPC_OK, INT_H_SIZE, l_ptr->addr); 1063 skb_copy_to_linear_data(bundler, &bundler_hdr, 1064 INT_H_SIZE); 1065 skb_trim(bundler, INT_H_SIZE); 1066 link_bundle_buf(l_ptr, bundler, buf); 1067 buf = bundler; 1068 msg = buf_msg(buf); 1069 l_ptr->stats.sent_bundles++; 1070 } 1071 } 1072 } 1073 if (!l_ptr->next_out) 1074 l_ptr->next_out = buf; 1075 link_add_to_outqueue(l_ptr, buf, msg); 1076 tipc_bearer_resolve_congestion(l_ptr->b_ptr, l_ptr); 1077 return dsz; 1078} 1079 1080/* 1081 * tipc_link_send(): same as tipc_link_send_buf(), but the link to use has 1082 * not been selected yet, and the the owner node is not locked 1083 * Called by TIPC internal users, e.g. the name distributor 1084 */ 1085 1086int tipc_link_send(struct sk_buff *buf, u32 dest, u32 selector) 1087{ 1088 struct link *l_ptr; 1089 struct node *n_ptr; 1090 int res = -ELINKCONG; 1091 1092 read_lock_bh(&tipc_net_lock); 1093 n_ptr = tipc_node_select(dest, selector); 1094 if (n_ptr) { 1095 tipc_node_lock(n_ptr); 1096 l_ptr = n_ptr->active_links[selector & 1]; 1097 if (l_ptr) { 1098 dbg("tipc_link_send: found link %x for dest %x\n", l_ptr, dest); 1099 res = tipc_link_send_buf(l_ptr, buf); 1100 } else { 1101 dbg("Attempt to send msg to unreachable node:\n"); 1102 msg_dbg(buf_msg(buf),">>>"); 1103 buf_discard(buf); 1104 } 1105 tipc_node_unlock(n_ptr); 1106 } else { 1107 dbg("Attempt to send msg to unknown node:\n"); 1108 msg_dbg(buf_msg(buf),">>>"); 1109 buf_discard(buf); 1110 } 1111 read_unlock_bh(&tipc_net_lock); 1112 return res; 1113} 1114 1115/* 1116 * link_send_buf_fast: Entry for data messages where the 1117 * destination link is known and the header is complete, 1118 * inclusive total message length. Very time critical. 1119 * Link is locked. Returns user data length. 1120 */ 1121 1122static int link_send_buf_fast(struct link *l_ptr, struct sk_buff *buf, 1123 u32 *used_max_pkt) 1124{ 1125 struct tipc_msg *msg = buf_msg(buf); 1126 int res = msg_data_sz(msg); 1127 1128 if (likely(!link_congested(l_ptr))) { 1129 if (likely(msg_size(msg) <= link_max_pkt(l_ptr))) { 1130 if (likely(list_empty(&l_ptr->b_ptr->cong_links))) { 1131 link_add_to_outqueue(l_ptr, buf, msg); 1132 if (likely(tipc_bearer_send(l_ptr->b_ptr, buf, 1133 &l_ptr->media_addr))) { 1134 l_ptr->unacked_window = 0; 1135 msg_dbg(msg,"SENT_FAST:"); 1136 return res; 1137 } 1138 dbg("failed sent fast...\n"); 1139 tipc_bearer_schedule(l_ptr->b_ptr, l_ptr); 1140 l_ptr->stats.bearer_congs++; 1141 l_ptr->next_out = buf; 1142 return res; 1143 } 1144 } 1145 else 1146 *used_max_pkt = link_max_pkt(l_ptr); 1147 } 1148 return tipc_link_send_buf(l_ptr, buf); /* All other cases */ 1149} 1150 1151/* 1152 * tipc_send_buf_fast: Entry for data messages where the 1153 * destination node is known and the header is complete, 1154 * inclusive total message length. 1155 * Returns user data length. 1156 */ 1157int tipc_send_buf_fast(struct sk_buff *buf, u32 destnode) 1158{ 1159 struct link *l_ptr; 1160 struct node *n_ptr; 1161 int res; 1162 u32 selector = msg_origport(buf_msg(buf)) & 1; 1163 u32 dummy; 1164 1165 if (destnode == tipc_own_addr) 1166 return tipc_port_recv_msg(buf); 1167 1168 read_lock_bh(&tipc_net_lock); 1169 n_ptr = tipc_node_select(destnode, selector); 1170 if (likely(n_ptr)) { 1171 tipc_node_lock(n_ptr); 1172 l_ptr = n_ptr->active_links[selector]; 1173 dbg("send_fast: buf %x selected %x, destnode = %x\n", 1174 buf, l_ptr, destnode); 1175 if (likely(l_ptr)) { 1176 res = link_send_buf_fast(l_ptr, buf, &dummy); 1177 tipc_node_unlock(n_ptr); 1178 read_unlock_bh(&tipc_net_lock); 1179 return res; 1180 } 1181 tipc_node_unlock(n_ptr); 1182 } 1183 read_unlock_bh(&tipc_net_lock); 1184 res = msg_data_sz(buf_msg(buf)); 1185 tipc_reject_msg(buf, TIPC_ERR_NO_NODE); 1186 return res; 1187} 1188 1189 1190/* 1191 * tipc_link_send_sections_fast: Entry for messages where the 1192 * destination processor is known and the header is complete, 1193 * except for total message length. 1194 * Returns user data length or errno. 1195 */ 1196int tipc_link_send_sections_fast(struct port *sender, 1197 struct iovec const *msg_sect, 1198 const u32 num_sect, 1199 u32 destaddr) 1200{ 1201 struct tipc_msg *hdr = &sender->publ.phdr; 1202 struct link *l_ptr; 1203 struct sk_buff *buf; 1204 struct node *node; 1205 int res; 1206 u32 selector = msg_origport(hdr) & 1; 1207 1208again: 1209 /* 1210 * Try building message using port's max_pkt hint. 1211 * (Must not hold any locks while building message.) 1212 */ 1213 1214 res = msg_build(hdr, msg_sect, num_sect, sender->max_pkt, 1215 !sender->user_port, &buf); 1216 1217 read_lock_bh(&tipc_net_lock); 1218 node = tipc_node_select(destaddr, selector); 1219 if (likely(node)) { 1220 tipc_node_lock(node); 1221 l_ptr = node->active_links[selector]; 1222 if (likely(l_ptr)) { 1223 if (likely(buf)) { 1224 res = link_send_buf_fast(l_ptr, buf, 1225 &sender->max_pkt); 1226 if (unlikely(res < 0)) 1227 buf_discard(buf); 1228exit: 1229 tipc_node_unlock(node); 1230 read_unlock_bh(&tipc_net_lock); 1231 return res; 1232 } 1233 1234 /* Exit if build request was invalid */ 1235 1236 if (unlikely(res < 0)) 1237 goto exit; 1238 1239 /* Exit if link (or bearer) is congested */ 1240 1241 if (link_congested(l_ptr) || 1242 !list_empty(&l_ptr->b_ptr->cong_links)) { 1243 res = link_schedule_port(l_ptr, 1244 sender->publ.ref, res); 1245 goto exit; 1246 } 1247 1248 /* 1249 * Message size exceeds max_pkt hint; update hint, 1250 * then re-try fast path or fragment the message 1251 */ 1252 1253 sender->max_pkt = link_max_pkt(l_ptr); 1254 tipc_node_unlock(node); 1255 read_unlock_bh(&tipc_net_lock); 1256 1257 1258 if ((msg_hdr_sz(hdr) + res) <= sender->max_pkt) 1259 goto again; 1260 1261 return link_send_sections_long(sender, msg_sect, 1262 num_sect, destaddr); 1263 } 1264 tipc_node_unlock(node); 1265 } 1266 read_unlock_bh(&tipc_net_lock); 1267 1268 /* Couldn't find a link to the destination node */ 1269 1270 if (buf) 1271 return tipc_reject_msg(buf, TIPC_ERR_NO_NODE); 1272 if (res >= 0) 1273 return tipc_port_reject_sections(sender, hdr, msg_sect, num_sect, 1274 TIPC_ERR_NO_NODE); 1275 return res; 1276} 1277 1278/* 1279 * link_send_sections_long(): Entry for long messages where the 1280 * destination node is known and the header is complete, 1281 * inclusive total message length. 1282 * Link and bearer congestion status have been checked to be ok, 1283 * and are ignored if they change. 1284 * 1285 * Note that fragments do not use the full link MTU so that they won't have 1286 * to undergo refragmentation if link changeover causes them to be sent 1287 * over another link with an additional tunnel header added as prefix. 1288 * (Refragmentation will still occur if the other link has a smaller MTU.) 1289 * 1290 * Returns user data length or errno. 1291 */ 1292static int link_send_sections_long(struct port *sender, 1293 struct iovec const *msg_sect, 1294 u32 num_sect, 1295 u32 destaddr) 1296{ 1297 struct link *l_ptr; 1298 struct node *node; 1299 struct tipc_msg *hdr = &sender->publ.phdr; 1300 u32 dsz = msg_data_sz(hdr); 1301 u32 max_pkt,fragm_sz,rest; 1302 struct tipc_msg fragm_hdr; 1303 struct sk_buff *buf,*buf_chain,*prev; 1304 u32 fragm_crs,fragm_rest,hsz,sect_rest; 1305 const unchar *sect_crs; 1306 int curr_sect; 1307 u32 fragm_no; 1308 1309again: 1310 fragm_no = 1; 1311 max_pkt = sender->max_pkt - INT_H_SIZE; 1312 /* leave room for tunnel header in case of link changeover */ 1313 fragm_sz = max_pkt - INT_H_SIZE; 1314 /* leave room for fragmentation header in each fragment */ 1315 rest = dsz; 1316 fragm_crs = 0; 1317 fragm_rest = 0; 1318 sect_rest = 0; 1319 sect_crs = NULL; 1320 curr_sect = -1; 1321 1322 /* Prepare reusable fragment header: */ 1323 1324 msg_dbg(hdr, ">FRAGMENTING>"); 1325 msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT, 1326 TIPC_OK, INT_H_SIZE, msg_destnode(hdr)); 1327 msg_set_link_selector(&fragm_hdr, sender->publ.ref); 1328 msg_set_size(&fragm_hdr, max_pkt); 1329 msg_set_fragm_no(&fragm_hdr, 1); 1330 1331 /* Prepare header of first fragment: */ 1332 1333 buf_chain = buf = buf_acquire(max_pkt); 1334 if (!buf) 1335 return -ENOMEM; 1336 buf->next = NULL; 1337 skb_copy_to_linear_data(buf, &fragm_hdr, INT_H_SIZE); 1338 hsz = msg_hdr_sz(hdr); 1339 skb_copy_to_linear_data_offset(buf, INT_H_SIZE, hdr, hsz); 1340 msg_dbg(buf_msg(buf), ">BUILD>"); 1341 1342 /* Chop up message: */ 1343 1344 fragm_crs = INT_H_SIZE + hsz; 1345 fragm_rest = fragm_sz - hsz; 1346 1347 do { /* For all sections */ 1348 u32 sz; 1349 1350 if (!sect_rest) { 1351 sect_rest = msg_sect[++curr_sect].iov_len; 1352 sect_crs = (const unchar *)msg_sect[curr_sect].iov_base; 1353 } 1354 1355 if (sect_rest < fragm_rest) 1356 sz = sect_rest; 1357 else 1358 sz = fragm_rest; 1359 1360 if (likely(!sender->user_port)) { 1361 if (copy_from_user(buf->data + fragm_crs, sect_crs, sz)) { 1362error: 1363 for (; buf_chain; buf_chain = buf) { 1364 buf = buf_chain->next; 1365 buf_discard(buf_chain); 1366 } 1367 return -EFAULT; 1368 } 1369 } else 1370 skb_copy_to_linear_data_offset(buf, fragm_crs, 1371 sect_crs, sz); 1372 sect_crs += sz; 1373 sect_rest -= sz; 1374 fragm_crs += sz; 1375 fragm_rest -= sz; 1376 rest -= sz; 1377 1378 if (!fragm_rest && rest) { 1379 1380 /* Initiate new fragment: */ 1381 if (rest <= fragm_sz) { 1382 fragm_sz = rest; 1383 msg_set_type(&fragm_hdr,LAST_FRAGMENT); 1384 } else { 1385 msg_set_type(&fragm_hdr, FRAGMENT); 1386 } 1387 msg_set_size(&fragm_hdr, fragm_sz + INT_H_SIZE); 1388 msg_set_fragm_no(&fragm_hdr, ++fragm_no); 1389 prev = buf; 1390 buf = buf_acquire(fragm_sz + INT_H_SIZE); 1391 if (!buf) 1392 goto error; 1393 1394 buf->next = NULL; 1395 prev->next = buf; 1396 skb_copy_to_linear_data(buf, &fragm_hdr, INT_H_SIZE); 1397 fragm_crs = INT_H_SIZE; 1398 fragm_rest = fragm_sz; 1399 msg_dbg(buf_msg(buf)," >BUILD>"); 1400 } 1401 } 1402 while (rest > 0); 1403 1404 /* 1405 * Now we have a buffer chain. Select a link and check 1406 * that packet size is still OK 1407 */ 1408 node = tipc_node_select(destaddr, sender->publ.ref & 1); 1409 if (likely(node)) { 1410 tipc_node_lock(node); 1411 l_ptr = node->active_links[sender->publ.ref & 1]; 1412 if (!l_ptr) { 1413 tipc_node_unlock(node); 1414 goto reject; 1415 } 1416 if (link_max_pkt(l_ptr) < max_pkt) { 1417 sender->max_pkt = link_max_pkt(l_ptr); 1418 tipc_node_unlock(node); 1419 for (; buf_chain; buf_chain = buf) { 1420 buf = buf_chain->next; 1421 buf_discard(buf_chain); 1422 } 1423 goto again; 1424 } 1425 } else { 1426reject: 1427 for (; buf_chain; buf_chain = buf) { 1428 buf = buf_chain->next; 1429 buf_discard(buf_chain); 1430 } 1431 return tipc_port_reject_sections(sender, hdr, msg_sect, num_sect, 1432 TIPC_ERR_NO_NODE); 1433 } 1434 1435 /* Append whole chain to send queue: */ 1436 1437 buf = buf_chain; 1438 l_ptr->long_msg_seq_no = mod(l_ptr->long_msg_seq_no + 1); 1439 if (!l_ptr->next_out) 1440 l_ptr->next_out = buf_chain; 1441 l_ptr->stats.sent_fragmented++; 1442 while (buf) { 1443 struct sk_buff *next = buf->next; 1444 struct tipc_msg *msg = buf_msg(buf); 1445 1446 l_ptr->stats.sent_fragments++; 1447 msg_set_long_msgno(msg, l_ptr->long_msg_seq_no); 1448 link_add_to_outqueue(l_ptr, buf, msg); 1449 msg_dbg(msg, ">ADD>"); 1450 buf = next; 1451 } 1452 1453 /* Send it, if possible: */ 1454 1455 tipc_link_push_queue(l_ptr); 1456 tipc_node_unlock(node); 1457 return dsz; 1458} 1459 1460/* 1461 * tipc_link_push_packet: Push one unsent packet to the media 1462 */ 1463u32 tipc_link_push_packet(struct link *l_ptr) 1464{ 1465 struct sk_buff *buf = l_ptr->first_out; 1466 u32 r_q_size = l_ptr->retransm_queue_size; 1467 u32 r_q_head = l_ptr->retransm_queue_head; 1468 1469 /* Step to position where retransmission failed, if any, */ 1470 /* consider that buffers may have been released in meantime */ 1471 1472 if (r_q_size && buf) { 1473 u32 last = lesser(mod(r_q_head + r_q_size), 1474 link_last_sent(l_ptr)); 1475 u32 first = msg_seqno(buf_msg(buf)); 1476 1477 while (buf && less(first, r_q_head)) { 1478 first = mod(first + 1); 1479 buf = buf->next; 1480 } 1481 l_ptr->retransm_queue_head = r_q_head = first; 1482 l_ptr->retransm_queue_size = r_q_size = mod(last - first); 1483 } 1484 1485 /* Continue retransmission now, if there is anything: */ 1486 1487 if (r_q_size && buf && !skb_cloned(buf)) { 1488 msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1)); 1489 msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in); 1490 if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) { 1491 msg_dbg(buf_msg(buf), ">DEF-RETR>"); 1492 l_ptr->retransm_queue_head = mod(++r_q_head); 1493 l_ptr->retransm_queue_size = --r_q_size; 1494 l_ptr->stats.retransmitted++; 1495 return TIPC_OK; 1496 } else { 1497 l_ptr->stats.bearer_congs++; 1498 msg_dbg(buf_msg(buf), "|>DEF-RETR>"); 1499 return PUSH_FAILED; 1500 } 1501 } 1502 1503 /* Send deferred protocol message, if any: */ 1504 1505 buf = l_ptr->proto_msg_queue; 1506 if (buf) { 1507 msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1)); 1508 msg_set_bcast_ack(buf_msg(buf),l_ptr->owner->bclink.last_in); 1509 if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) { 1510 msg_dbg(buf_msg(buf), ">DEF-PROT>"); 1511 l_ptr->unacked_window = 0; 1512 buf_discard(buf); 1513 l_ptr->proto_msg_queue = NULL; 1514 return TIPC_OK; 1515 } else { 1516 msg_dbg(buf_msg(buf), "|>DEF-PROT>"); 1517 l_ptr->stats.bearer_congs++; 1518 return PUSH_FAILED; 1519 } 1520 } 1521 1522 /* Send one deferred data message, if send window not full: */ 1523 1524 buf = l_ptr->next_out; 1525 if (buf) { 1526 struct tipc_msg *msg = buf_msg(buf); 1527 u32 next = msg_seqno(msg); 1528 u32 first = msg_seqno(buf_msg(l_ptr->first_out)); 1529 1530 if (mod(next - first) < l_ptr->queue_limit[0]) { 1531 msg_set_ack(msg, mod(l_ptr->next_in_no - 1)); 1532 msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in); 1533 if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) { 1534 if (msg_user(msg) == MSG_BUNDLER) 1535 msg_set_type(msg, CLOSED_MSG); 1536 msg_dbg(msg, ">PUSH-DATA>"); 1537 l_ptr->next_out = buf->next; 1538 return TIPC_OK; 1539 } else { 1540 msg_dbg(msg, "|PUSH-DATA|"); 1541 l_ptr->stats.bearer_congs++; 1542 return PUSH_FAILED; 1543 } 1544 } 1545 } 1546 return PUSH_FINISHED; 1547} 1548 1549/* 1550 * push_queue(): push out the unsent messages of a link where 1551 * congestion has abated. Node is locked 1552 */ 1553void tipc_link_push_queue(struct link *l_ptr) 1554{ 1555 u32 res; 1556 1557 if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr)) 1558 return; 1559 1560 do { 1561 res = tipc_link_push_packet(l_ptr); 1562 } 1563 while (res == TIPC_OK); 1564 if (res == PUSH_FAILED) 1565 tipc_bearer_schedule(l_ptr->b_ptr, l_ptr); 1566} 1567 1568static void link_reset_all(unsigned long addr) 1569{ 1570 struct node *n_ptr; 1571 char addr_string[16]; 1572 u32 i; 1573 1574 read_lock_bh(&tipc_net_lock); 1575 n_ptr = tipc_node_find((u32)addr); 1576 if (!n_ptr) { 1577 read_unlock_bh(&tipc_net_lock); 1578 return; /* node no longer exists */ 1579 } 1580 1581 tipc_node_lock(n_ptr); 1582 1583 warn("Resetting all links to %s\n", 1584 addr_string_fill(addr_string, n_ptr->addr)); 1585 1586 for (i = 0; i < MAX_BEARERS; i++) { 1587 if (n_ptr->links[i]) { 1588 link_print(n_ptr->links[i], TIPC_OUTPUT, 1589 "Resetting link\n"); 1590 tipc_link_reset(n_ptr->links[i]); 1591 } 1592 } 1593 1594 tipc_node_unlock(n_ptr); 1595 read_unlock_bh(&tipc_net_lock); 1596} 1597 1598static void link_retransmit_failure(struct link *l_ptr, struct sk_buff *buf) 1599{ 1600 struct tipc_msg *msg = buf_msg(buf); 1601 1602 warn("Retransmission failure on link <%s>\n", l_ptr->name); 1603 tipc_msg_print(TIPC_OUTPUT, msg, ">RETR-FAIL>"); 1604 1605 if (l_ptr->addr) { 1606 1607 /* Handle failure on standard link */ 1608 1609 link_print(l_ptr, TIPC_OUTPUT, "Resetting link\n"); 1610 tipc_link_reset(l_ptr); 1611 1612 } else { 1613 1614 /* Handle failure on broadcast link */ 1615 1616 struct node *n_ptr; 1617 char addr_string[16]; 1618 1619 tipc_printf(TIPC_OUTPUT, "Msg seq number: %u, ", msg_seqno(msg)); 1620 tipc_printf(TIPC_OUTPUT, "Outstanding acks: %lu\n", 1621 (unsigned long) TIPC_SKB_CB(buf)->handle); 1622 1623 n_ptr = l_ptr->owner->next; 1624 tipc_node_lock(n_ptr); 1625 1626 addr_string_fill(addr_string, n_ptr->addr); 1627 tipc_printf(TIPC_OUTPUT, "Multicast link info for %s\n", addr_string); 1628 tipc_printf(TIPC_OUTPUT, "Supported: %d, ", n_ptr->bclink.supported); 1629 tipc_printf(TIPC_OUTPUT, "Acked: %u\n", n_ptr->bclink.acked); 1630 tipc_printf(TIPC_OUTPUT, "Last in: %u, ", n_ptr->bclink.last_in); 1631 tipc_printf(TIPC_OUTPUT, "Gap after: %u, ", n_ptr->bclink.gap_after); 1632 tipc_printf(TIPC_OUTPUT, "Gap to: %u\n", n_ptr->bclink.gap_to); 1633 tipc_printf(TIPC_OUTPUT, "Nack sync: %u\n\n", n_ptr->bclink.nack_sync); 1634 1635 tipc_k_signal((Handler)link_reset_all, (unsigned long)n_ptr->addr); 1636 1637 tipc_node_unlock(n_ptr); 1638 1639 l_ptr->stale_count = 0; 1640 } 1641} 1642 1643void tipc_link_retransmit(struct link *l_ptr, struct sk_buff *buf, 1644 u32 retransmits) 1645{ 1646 struct tipc_msg *msg; 1647 1648 if (!buf) 1649 return; 1650 1651 msg = buf_msg(buf); 1652 1653 dbg("Retransmitting %u in link %x\n", retransmits, l_ptr); 1654 1655 if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr)) { 1656 if (!skb_cloned(buf)) { 1657 msg_dbg(msg, ">NO_RETR->BCONG>"); 1658 dbg_print_link(l_ptr, " "); 1659 l_ptr->retransm_queue_head = msg_seqno(msg); 1660 l_ptr->retransm_queue_size = retransmits; 1661 return; 1662 } else { 1663 /* Don't retransmit if driver already has the buffer */ 1664 } 1665 } else { 1666 /* Detect repeated retransmit failures on uncongested bearer */ 1667 1668 if (l_ptr->last_retransmitted == msg_seqno(msg)) { 1669 if (++l_ptr->stale_count > 100) { 1670 link_retransmit_failure(l_ptr, buf); 1671 return; 1672 } 1673 } else { 1674 l_ptr->last_retransmitted = msg_seqno(msg); 1675 l_ptr->stale_count = 1; 1676 } 1677 } 1678 1679 while (retransmits && (buf != l_ptr->next_out) && buf && !skb_cloned(buf)) { 1680 msg = buf_msg(buf); 1681 msg_set_ack(msg, mod(l_ptr->next_in_no - 1)); 1682 msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in); 1683 if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) { 1684 msg_dbg(buf_msg(buf), ">RETR>"); 1685 buf = buf->next; 1686 retransmits--; 1687 l_ptr->stats.retransmitted++; 1688 } else { 1689 tipc_bearer_schedule(l_ptr->b_ptr, l_ptr); 1690 l_ptr->stats.bearer_congs++; 1691 l_ptr->retransm_queue_head = msg_seqno(buf_msg(buf)); 1692 l_ptr->retransm_queue_size = retransmits; 1693 return; 1694 } 1695 } 1696 1697 l_ptr->retransm_queue_head = l_ptr->retransm_queue_size = 0; 1698} 1699 1700/* 1701 * link_recv_non_seq: Receive packets which are outside 1702 * the link sequence flow 1703 */ 1704 1705static void link_recv_non_seq(struct sk_buff *buf) 1706{ 1707 struct tipc_msg *msg = buf_msg(buf); 1708 1709 if (msg_user(msg) == LINK_CONFIG) 1710 tipc_disc_recv_msg(buf); 1711 else 1712 tipc_bclink_recv_pkt(buf); 1713} 1714 1715/** 1716 * link_insert_deferred_queue - insert deferred messages back into receive chain 1717 */ 1718 1719static struct sk_buff *link_insert_deferred_queue(struct link *l_ptr, 1720 struct sk_buff *buf) 1721{ 1722 u32 seq_no; 1723 1724 if (l_ptr->oldest_deferred_in == NULL) 1725 return buf; 1726 1727 seq_no = msg_seqno(buf_msg(l_ptr->oldest_deferred_in)); 1728 if (seq_no == mod(l_ptr->next_in_no)) { 1729 l_ptr->newest_deferred_in->next = buf; 1730 buf = l_ptr->oldest_deferred_in; 1731 l_ptr->oldest_deferred_in = NULL; 1732 l_ptr->deferred_inqueue_sz = 0; 1733 } 1734 return buf; 1735} 1736 1737void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *tb_ptr) 1738{ 1739 read_lock_bh(&tipc_net_lock); 1740 while (head) { 1741 struct bearer *b_ptr; 1742 struct node *n_ptr; 1743 struct link *l_ptr; 1744 struct sk_buff *crs; 1745 struct sk_buff *buf = head; 1746 struct tipc_msg *msg = buf_msg(buf); 1747 u32 seq_no = msg_seqno(msg); 1748 u32 ackd = msg_ack(msg); 1749 u32 released = 0; 1750 int type; 1751 1752 b_ptr = (struct bearer *)tb_ptr; 1753 TIPC_SKB_CB(buf)->handle = b_ptr; 1754 1755 head = head->next; 1756 if (unlikely(msg_version(msg) != TIPC_VERSION)) 1757 goto cont; 1758 msg_dbg(msg,"<REC<"); 1759 1760 if (unlikely(msg_non_seq(msg))) { 1761 link_recv_non_seq(buf); 1762 continue; 1763 } 1764 1765 if (unlikely(!msg_short(msg) && 1766 (msg_destnode(msg) != tipc_own_addr))) 1767 goto cont; 1768 1769 n_ptr = tipc_node_find(msg_prevnode(msg)); 1770 if (unlikely(!n_ptr)) 1771 goto cont; 1772 1773 tipc_node_lock(n_ptr); 1774 l_ptr = n_ptr->links[b_ptr->identity]; 1775 if (unlikely(!l_ptr)) { 1776 tipc_node_unlock(n_ptr); 1777 goto cont; 1778 } 1779 /* 1780 * Release acked messages 1781 */ 1782 if (less(n_ptr->bclink.acked, msg_bcast_ack(msg))) { 1783 if (tipc_node_is_up(n_ptr) && n_ptr->bclink.supported) 1784 tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg)); 1785 } 1786 1787 crs = l_ptr->first_out; 1788 while ((crs != l_ptr->next_out) && 1789 less_eq(msg_seqno(buf_msg(crs)), ackd)) { 1790 struct sk_buff *next = crs->next; 1791 1792 buf_discard(crs); 1793 crs = next; 1794 released++; 1795 } 1796 if (released) { 1797 l_ptr->first_out = crs; 1798 l_ptr->out_queue_size -= released; 1799 } 1800 if (unlikely(l_ptr->next_out)) 1801 tipc_link_push_queue(l_ptr); 1802 if (unlikely(!list_empty(&l_ptr->waiting_ports))) 1803 tipc_link_wakeup_ports(l_ptr, 0); 1804 if (unlikely(++l_ptr->unacked_window >= TIPC_MIN_LINK_WIN)) { 1805 l_ptr->stats.sent_acks++; 1806 tipc_link_send_proto_msg(l_ptr, STATE_MSG, 0, 0, 0, 0, 0); 1807 } 1808 1809protocol_check: 1810 if (likely(link_working_working(l_ptr))) { 1811 if (likely(seq_no == mod(l_ptr->next_in_no))) { 1812 l_ptr->next_in_no++; 1813 if (unlikely(l_ptr->oldest_deferred_in)) 1814 head = link_insert_deferred_queue(l_ptr, 1815 head); 1816 if (likely(msg_is_dest(msg, tipc_own_addr))) { 1817deliver: 1818 if (likely(msg_isdata(msg))) { 1819 tipc_node_unlock(n_ptr); 1820 tipc_port_recv_msg(buf); 1821 continue; 1822 } 1823 switch (msg_user(msg)) { 1824 case MSG_BUNDLER: 1825 l_ptr->stats.recv_bundles++; 1826 l_ptr->stats.recv_bundled += 1827 msg_msgcnt(msg); 1828 tipc_node_unlock(n_ptr); 1829 tipc_link_recv_bundle(buf); 1830 continue; 1831 case ROUTE_DISTRIBUTOR: 1832 tipc_node_unlock(n_ptr); 1833 tipc_cltr_recv_routing_table(buf); 1834 continue; 1835 case NAME_DISTRIBUTOR: 1836 tipc_node_unlock(n_ptr); 1837 tipc_named_recv(buf); 1838 continue; 1839 case CONN_MANAGER: 1840 tipc_node_unlock(n_ptr); 1841 tipc_port_recv_proto_msg(buf); 1842 continue; 1843 case MSG_FRAGMENTER: 1844 l_ptr->stats.recv_fragments++; 1845 if (tipc_link_recv_fragment(&l_ptr->defragm_buf, 1846 &buf, &msg)) { 1847 l_ptr->stats.recv_fragmented++; 1848 goto deliver; 1849 } 1850 break; 1851 case CHANGEOVER_PROTOCOL: 1852 type = msg_type(msg); 1853 if (link_recv_changeover_msg(&l_ptr, &buf)) { 1854 msg = buf_msg(buf); 1855 seq_no = msg_seqno(msg); 1856 TIPC_SKB_CB(buf)->handle 1857 = b_ptr; 1858 if (type == ORIGINAL_MSG) 1859 goto deliver; 1860 goto protocol_check; 1861 } 1862 break; 1863 } 1864 } 1865 tipc_node_unlock(n_ptr); 1866 tipc_net_route_msg(buf); 1867 continue; 1868 } 1869 link_handle_out_of_seq_msg(l_ptr, buf); 1870 head = link_insert_deferred_queue(l_ptr, head); 1871 tipc_node_unlock(n_ptr); 1872 continue; 1873 } 1874 1875 if (msg_user(msg) == LINK_PROTOCOL) { 1876 link_recv_proto_msg(l_ptr, buf); 1877 head = link_insert_deferred_queue(l_ptr, head); 1878 tipc_node_unlock(n_ptr); 1879 continue; 1880 } 1881 msg_dbg(msg,"NSEQ<REC<"); 1882 link_state_event(l_ptr, TRAFFIC_MSG_EVT); 1883 1884 if (link_working_working(l_ptr)) { 1885 /* Re-insert in front of queue */ 1886 msg_dbg(msg,"RECV-REINS:"); 1887 buf->next = head; 1888 head = buf; 1889 tipc_node_unlock(n_ptr); 1890 continue; 1891 } 1892 tipc_node_unlock(n_ptr); 1893cont: 1894 buf_discard(buf); 1895 } 1896 read_unlock_bh(&tipc_net_lock); 1897} 1898 1899/* 1900 * link_defer_buf(): Sort a received out-of-sequence packet 1901 * into the deferred reception queue. 1902 * Returns the increase of the queue length,i.e. 0 or 1 1903 */ 1904 1905u32 tipc_link_defer_pkt(struct sk_buff **head, 1906 struct sk_buff **tail, 1907 struct sk_buff *buf) 1908{ 1909 struct sk_buff *prev = NULL; 1910 struct sk_buff *crs = *head; 1911 u32 seq_no = msg_seqno(buf_msg(buf)); 1912 1913 buf->next = NULL; 1914 1915 /* Empty queue ? */ 1916 if (*head == NULL) { 1917 *head = *tail = buf; 1918 return 1; 1919 } 1920 1921 /* Last ? */ 1922 if (less(msg_seqno(buf_msg(*tail)), seq_no)) { 1923 (*tail)->next = buf; 1924 *tail = buf; 1925 return 1; 1926 } 1927 1928 /* Scan through queue and sort it in */ 1929 do { 1930 struct tipc_msg *msg = buf_msg(crs); 1931 1932 if (less(seq_no, msg_seqno(msg))) { 1933 buf->next = crs; 1934 if (prev) 1935 prev->next = buf; 1936 else 1937 *head = buf; 1938 return 1; 1939 } 1940 if (seq_no == msg_seqno(msg)) { 1941 break; 1942 } 1943 prev = crs; 1944 crs = crs->next; 1945 } 1946 while (crs); 1947 1948 /* Message is a duplicate of an existing message */ 1949 1950 buf_discard(buf); 1951 return 0; 1952} 1953 1954/** 1955 * link_handle_out_of_seq_msg - handle arrival of out-of-sequence packet 1956 */ 1957 1958static void link_handle_out_of_seq_msg(struct link *l_ptr, 1959 struct sk_buff *buf) 1960{ 1961 u32 seq_no = msg_seqno(buf_msg(buf)); 1962 1963 if (likely(msg_user(buf_msg(buf)) == LINK_PROTOCOL)) { 1964 link_recv_proto_msg(l_ptr, buf); 1965 return; 1966 } 1967 1968 dbg("rx OOS msg: seq_no %u, expecting %u (%u)\n", 1969 seq_no, mod(l_ptr->next_in_no), l_ptr->next_in_no); 1970 1971 /* Record OOS packet arrival (force mismatch on next timeout) */ 1972 1973 l_ptr->checkpoint--; 1974 1975 /* 1976 * Discard packet if a duplicate; otherwise add it to deferred queue 1977 * and notify peer of gap as per protocol specification 1978 */ 1979 1980 if (less(seq_no, mod(l_ptr->next_in_no))) { 1981 l_ptr->stats.duplicates++; 1982 buf_discard(buf); 1983 return; 1984 } 1985 1986 if (tipc_link_defer_pkt(&l_ptr->oldest_deferred_in, 1987 &l_ptr->newest_deferred_in, buf)) { 1988 l_ptr->deferred_inqueue_sz++; 1989 l_ptr->stats.deferred_recv++; 1990 if ((l_ptr->deferred_inqueue_sz % 16) == 1) 1991 tipc_link_send_proto_msg(l_ptr, STATE_MSG, 0, 0, 0, 0, 0); 1992 } else 1993 l_ptr->stats.duplicates++; 1994} 1995 1996/* 1997 * Send protocol message to the other endpoint. 1998 */ 1999void tipc_link_send_proto_msg(struct link *l_ptr, u32 msg_typ, int probe_msg, 2000 u32 gap, u32 tolerance, u32 priority, u32 ack_mtu) 2001{ 2002 struct sk_buff *buf = NULL; 2003 struct tipc_msg *msg = l_ptr->pmsg; 2004 u32 msg_size = sizeof(l_ptr->proto_msg); 2005 2006 if (link_blocked(l_ptr)) 2007 return; 2008 msg_set_type(msg, msg_typ); 2009 msg_set_net_plane(msg, l_ptr->b_ptr->net_plane); 2010 msg_set_bcast_ack(msg, mod(l_ptr->owner->bclink.last_in)); 2011 msg_set_last_bcast(msg, tipc_bclink_get_last_sent()); 2012 2013 if (msg_typ == STATE_MSG) { 2014 u32 next_sent = mod(l_ptr->next_out_no); 2015 2016 if (!tipc_link_is_up(l_ptr)) 2017 return; 2018 if (l_ptr->next_out) 2019 next_sent = msg_seqno(buf_msg(l_ptr->next_out)); 2020 msg_set_next_sent(msg, next_sent); 2021 if (l_ptr->oldest_deferred_in) { 2022 u32 rec = msg_seqno(buf_msg(l_ptr->oldest_deferred_in)); 2023 gap = mod(rec - mod(l_ptr->next_in_no)); 2024 } 2025 msg_set_seq_gap(msg, gap); 2026 if (gap) 2027 l_ptr->stats.sent_nacks++; 2028 msg_set_link_tolerance(msg, tolerance); 2029 msg_set_linkprio(msg, priority); 2030 msg_set_max_pkt(msg, ack_mtu); 2031 msg_set_ack(msg, mod(l_ptr->next_in_no - 1)); 2032 msg_set_probe(msg, probe_msg != 0); 2033 if (probe_msg) { 2034 u32 mtu = l_ptr->max_pkt; 2035 2036 if ((mtu < l_ptr->max_pkt_target) && 2037 link_working_working(l_ptr) && 2038 l_ptr->fsm_msg_cnt) { 2039 msg_size = (mtu + (l_ptr->max_pkt_target - mtu)/2 + 2) & ~3; 2040 if (l_ptr->max_pkt_probes == 10) { 2041 l_ptr->max_pkt_target = (msg_size - 4); 2042 l_ptr->max_pkt_probes = 0; 2043 msg_size = (mtu + (l_ptr->max_pkt_target - mtu)/2 + 2) & ~3; 2044 } 2045 l_ptr->max_pkt_probes++; 2046 } 2047 2048 l_ptr->stats.sent_probes++; 2049 } 2050 l_ptr->stats.sent_states++; 2051 } else { /* RESET_MSG or ACTIVATE_MSG */ 2052 msg_set_ack(msg, mod(l_ptr->reset_checkpoint - 1)); 2053 msg_set_seq_gap(msg, 0); 2054 msg_set_next_sent(msg, 1); 2055 msg_set_link_tolerance(msg, l_ptr->tolerance); 2056 msg_set_linkprio(msg, l_ptr->priority); 2057 msg_set_max_pkt(msg, l_ptr->max_pkt_target); 2058 } 2059 2060 if (tipc_node_has_redundant_links(l_ptr->owner)) { 2061 msg_set_redundant_link(msg); 2062 } else { 2063 msg_clear_redundant_link(msg); 2064 } 2065 msg_set_linkprio(msg, l_ptr->priority); 2066 2067 /* Ensure sequence number will not fit : */ 2068 2069 msg_set_seqno(msg, mod(l_ptr->next_out_no + (0xffff/2))); 2070 2071 /* Congestion? */ 2072 2073 if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr)) { 2074 if (!l_ptr->proto_msg_queue) { 2075 l_ptr->proto_msg_queue = 2076 buf_acquire(sizeof(l_ptr->proto_msg)); 2077 } 2078 buf = l_ptr->proto_msg_queue; 2079 if (!buf) 2080 return; 2081 skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg)); 2082 return; 2083 } 2084 msg_set_timestamp(msg, jiffies_to_msecs(jiffies)); 2085 2086 /* Message can be sent */ 2087 2088 msg_dbg(msg, ">>"); 2089 2090 buf = buf_acquire(msg_size); 2091 if (!buf) 2092 return; 2093 2094 skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg)); 2095 msg_set_size(buf_msg(buf), msg_size); 2096 2097 if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) { 2098 l_ptr->unacked_window = 0; 2099 buf_discard(buf); 2100 return; 2101 } 2102 2103 /* New congestion */ 2104 tipc_bearer_schedule(l_ptr->b_ptr, l_ptr); 2105 l_ptr->proto_msg_queue = buf; 2106 l_ptr->stats.bearer_congs++; 2107} 2108 2109/* 2110 * Receive protocol message : 2111 * Note that network plane id propagates through the network, and may 2112 * change at any time. The node with lowest address rules 2113 */ 2114 2115static void link_recv_proto_msg(struct link *l_ptr, struct sk_buff *buf) 2116{ 2117 u32 rec_gap = 0; 2118 u32 max_pkt_info; 2119 u32 max_pkt_ack; 2120 u32 msg_tol; 2121 struct tipc_msg *msg = buf_msg(buf); 2122 2123 dbg("AT(%u):", jiffies_to_msecs(jiffies)); 2124 msg_dbg(msg, "<<"); 2125 if (link_blocked(l_ptr)) 2126 goto exit; 2127 2128 /* record unnumbered packet arrival (force mismatch on next timeout) */ 2129 2130 l_ptr->checkpoint--; 2131 2132 if (l_ptr->b_ptr->net_plane != msg_net_plane(msg)) 2133 if (tipc_own_addr > msg_prevnode(msg)) 2134 l_ptr->b_ptr->net_plane = msg_net_plane(msg); 2135 2136 l_ptr->owner->permit_changeover = msg_redundant_link(msg); 2137 2138 switch (msg_type(msg)) { 2139 2140 case RESET_MSG: 2141 if (!link_working_unknown(l_ptr) && l_ptr->peer_session) { 2142 if (msg_session(msg) == l_ptr->peer_session) { 2143 dbg("Duplicate RESET: %u<->%u\n", 2144 msg_session(msg), l_ptr->peer_session); 2145 break; /* duplicate: ignore */ 2146 } 2147 } 2148 /* fall thru' */ 2149 case ACTIVATE_MSG: 2150 /* Update link settings according other endpoint's values */ 2151 2152 strcpy((strrchr(l_ptr->name, ':') + 1), (char *)msg_data(msg)); 2153 2154 if ((msg_tol = msg_link_tolerance(msg)) && 2155 (msg_tol > l_ptr->tolerance)) 2156 link_set_supervision_props(l_ptr, msg_tol); 2157 2158 if (msg_linkprio(msg) > l_ptr->priority) 2159 l_ptr->priority = msg_linkprio(msg); 2160 2161 max_pkt_info = msg_max_pkt(msg); 2162 if (max_pkt_info) { 2163 if (max_pkt_info < l_ptr->max_pkt_target) 2164 l_ptr->max_pkt_target = max_pkt_info; 2165 if (l_ptr->max_pkt > l_ptr->max_pkt_target) 2166 l_ptr->max_pkt = l_ptr->max_pkt_target; 2167 } else { 2168 l_ptr->max_pkt = l_ptr->max_pkt_target; 2169 } 2170 l_ptr->owner->bclink.supported = (max_pkt_info != 0); 2171 2172 link_state_event(l_ptr, msg_type(msg)); 2173 2174 l_ptr->peer_session = msg_session(msg); 2175 l_ptr->peer_bearer_id = msg_bearer_id(msg); 2176 2177 /* Synchronize broadcast sequence numbers */ 2178 if (!tipc_node_has_redundant_links(l_ptr->owner)) { 2179 l_ptr->owner->bclink.last_in = mod(msg_last_bcast(msg)); 2180 } 2181 break; 2182 case STATE_MSG: 2183 2184 if ((msg_tol = msg_link_tolerance(msg))) 2185 link_set_supervision_props(l_ptr, msg_tol); 2186 2187 if (msg_linkprio(msg) && 2188 (msg_linkprio(msg) != l_ptr->priority)) { 2189 warn("Resetting link <%s>, priority change %u->%u\n", 2190 l_ptr->name, l_ptr->priority, msg_linkprio(msg)); 2191 l_ptr->priority = msg_linkprio(msg); 2192 tipc_link_reset(l_ptr); /* Enforce change to take effect */ 2193 break; 2194 } 2195 link_state_event(l_ptr, TRAFFIC_MSG_EVT); 2196 l_ptr->stats.recv_states++; 2197 if (link_reset_unknown(l_ptr)) 2198 break; 2199 2200 if (less_eq(mod(l_ptr->next_in_no), msg_next_sent(msg))) { 2201 rec_gap = mod(msg_next_sent(msg) - 2202 mod(l_ptr->next_in_no)); 2203 } 2204 2205 max_pkt_ack = msg_max_pkt(msg); 2206 if (max_pkt_ack > l_ptr->max_pkt) { 2207 dbg("Link <%s> updated MTU %u -> %u\n", 2208 l_ptr->name, l_ptr->max_pkt, max_pkt_ack); 2209 l_ptr->max_pkt = max_pkt_ack; 2210 l_ptr->max_pkt_probes = 0; 2211 } 2212 2213 max_pkt_ack = 0; 2214 if (msg_probe(msg)) { 2215 l_ptr->stats.recv_probes++; 2216 if (msg_size(msg) > sizeof(l_ptr->proto_msg)) { 2217 max_pkt_ack = msg_size(msg); 2218 } 2219 } 2220 2221 /* Protocol message before retransmits, reduce loss risk */ 2222 2223 tipc_bclink_check_gap(l_ptr->owner, msg_last_bcast(msg)); 2224 2225 if (rec_gap || (msg_probe(msg))) { 2226 tipc_link_send_proto_msg(l_ptr, STATE_MSG, 2227 0, rec_gap, 0, 0, max_pkt_ack); 2228 } 2229 if (msg_seq_gap(msg)) { 2230 msg_dbg(msg, "With Gap:"); 2231 l_ptr->stats.recv_nacks++; 2232 tipc_link_retransmit(l_ptr, l_ptr->first_out, 2233 msg_seq_gap(msg)); 2234 } 2235 break; 2236 default: 2237 msg_dbg(buf_msg(buf), "<DISCARDING UNKNOWN<"); 2238 } 2239exit: 2240 buf_discard(buf); 2241} 2242 2243 2244/* 2245 * tipc_link_tunnel(): Send one message via a link belonging to 2246 * another bearer. Owner node is locked. 2247 */ 2248void tipc_link_tunnel(struct link *l_ptr, 2249 struct tipc_msg *tunnel_hdr, 2250 struct tipc_msg *msg, 2251 u32 selector) 2252{ 2253 struct link *tunnel; 2254 struct sk_buff *buf; 2255 u32 length = msg_size(msg); 2256 2257 tunnel = l_ptr->owner->active_links[selector & 1]; 2258 if (!tipc_link_is_up(tunnel)) { 2259 warn("Link changeover error, " 2260 "tunnel link no longer available\n"); 2261 return; 2262 } 2263 msg_set_size(tunnel_hdr, length + INT_H_SIZE); 2264 buf = buf_acquire(length + INT_H_SIZE); 2265 if (!buf) { 2266 warn("Link changeover error, " 2267 "unable to send tunnel msg\n"); 2268 return; 2269 } 2270 skb_copy_to_linear_data(buf, tunnel_hdr, INT_H_SIZE); 2271 skb_copy_to_linear_data_offset(buf, INT_H_SIZE, msg, length); 2272 dbg("%c->%c:", l_ptr->b_ptr->net_plane, tunnel->b_ptr->net_plane); 2273 msg_dbg(buf_msg(buf), ">SEND>"); 2274 tipc_link_send_buf(tunnel, buf); 2275} 2276 2277 2278 2279/* 2280 * changeover(): Send whole message queue via the remaining link 2281 * Owner node is locked. 2282 */ 2283 2284void tipc_link_changeover(struct link *l_ptr) 2285{ 2286 u32 msgcount = l_ptr->out_queue_size; 2287 struct sk_buff *crs = l_ptr->first_out; 2288 struct link *tunnel = l_ptr->owner->active_links[0]; 2289 struct tipc_msg tunnel_hdr; 2290 int split_bundles; 2291 2292 if (!tunnel) 2293 return; 2294 2295 if (!l_ptr->owner->permit_changeover) { 2296 warn("Link changeover error, " 2297 "peer did not permit changeover\n"); 2298 return; 2299 } 2300 2301 msg_init(&tunnel_hdr, CHANGEOVER_PROTOCOL, 2302 ORIGINAL_MSG, TIPC_OK, INT_H_SIZE, l_ptr->addr); 2303 msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id); 2304 msg_set_msgcnt(&tunnel_hdr, msgcount); 2305 dbg("Link changeover requires %u tunnel messages\n", msgcount); 2306 2307 if (!l_ptr->first_out) { 2308 struct sk_buff *buf; 2309 2310 buf = buf_acquire(INT_H_SIZE); 2311 if (buf) { 2312 skb_copy_to_linear_data(buf, &tunnel_hdr, INT_H_SIZE); 2313 msg_set_size(&tunnel_hdr, INT_H_SIZE); 2314 dbg("%c->%c:", l_ptr->b_ptr->net_plane, 2315 tunnel->b_ptr->net_plane); 2316 msg_dbg(&tunnel_hdr, "EMPTY>SEND>"); 2317 tipc_link_send_buf(tunnel, buf); 2318 } else { 2319 warn("Link changeover error, " 2320 "unable to send changeover msg\n"); 2321 } 2322 return; 2323 } 2324 2325 split_bundles = (l_ptr->owner->active_links[0] != 2326 l_ptr->owner->active_links[1]); 2327 2328 while (crs) { 2329 struct tipc_msg *msg = buf_msg(crs); 2330 2331 if ((msg_user(msg) == MSG_BUNDLER) && split_bundles) { 2332 u32 msgcount = msg_msgcnt(msg); 2333 struct tipc_msg *m = msg_get_wrapped(msg); 2334 unchar* pos = (unchar*)m; 2335 2336 while (msgcount--) { 2337 msg_set_seqno(m,msg_seqno(msg)); 2338 tipc_link_tunnel(l_ptr, &tunnel_hdr, m, 2339 msg_link_selector(m)); 2340 pos += align(msg_size(m)); 2341 m = (struct tipc_msg *)pos; 2342 } 2343 } else { 2344 tipc_link_tunnel(l_ptr, &tunnel_hdr, msg, 2345 msg_link_selector(msg)); 2346 } 2347 crs = crs->next; 2348 } 2349} 2350 2351void tipc_link_send_duplicate(struct link *l_ptr, struct link *tunnel) 2352{ 2353 struct sk_buff *iter; 2354 struct tipc_msg tunnel_hdr; 2355 2356 msg_init(&tunnel_hdr, CHANGEOVER_PROTOCOL, 2357 DUPLICATE_MSG, TIPC_OK, INT_H_SIZE, l_ptr->addr); 2358 msg_set_msgcnt(&tunnel_hdr, l_ptr->out_queue_size); 2359 msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id); 2360 iter = l_ptr->first_out; 2361 while (iter) { 2362 struct sk_buff *outbuf; 2363 struct tipc_msg *msg = buf_msg(iter); 2364 u32 length = msg_size(msg); 2365 2366 if (msg_user(msg) == MSG_BUNDLER) 2367 msg_set_type(msg, CLOSED_MSG); 2368 msg_set_ack(msg, mod(l_ptr->next_in_no - 1)); /* Update */ 2369 msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in); 2370 msg_set_size(&tunnel_hdr, length + INT_H_SIZE); 2371 outbuf = buf_acquire(length + INT_H_SIZE); 2372 if (outbuf == NULL) { 2373 warn("Link changeover error, " 2374 "unable to send duplicate msg\n"); 2375 return; 2376 } 2377 skb_copy_to_linear_data(outbuf, &tunnel_hdr, INT_H_SIZE); 2378 skb_copy_to_linear_data_offset(outbuf, INT_H_SIZE, iter->data, 2379 length); 2380 dbg("%c->%c:", l_ptr->b_ptr->net_plane, 2381 tunnel->b_ptr->net_plane); 2382 msg_dbg(buf_msg(outbuf), ">SEND>"); 2383 tipc_link_send_buf(tunnel, outbuf); 2384 if (!tipc_link_is_up(l_ptr)) 2385 return; 2386 iter = iter->next; 2387 } 2388} 2389 2390 2391 2392/** 2393 * buf_extract - extracts embedded TIPC message from another message 2394 * @skb: encapsulating message buffer 2395 * @from_pos: offset to extract from 2396 * 2397 * Returns a new message buffer containing an embedded message. The 2398 * encapsulating message itself is left unchanged. 2399 */ 2400 2401static struct sk_buff *buf_extract(struct sk_buff *skb, u32 from_pos) 2402{ 2403 struct tipc_msg *msg = (struct tipc_msg *)(skb->data + from_pos); 2404 u32 size = msg_size(msg); 2405 struct sk_buff *eb; 2406 2407 eb = buf_acquire(size); 2408 if (eb) 2409 skb_copy_to_linear_data(eb, msg, size); 2410 return eb; 2411} 2412 2413/* 2414 * link_recv_changeover_msg(): Receive tunneled packet sent 2415 * via other link. Node is locked. Return extracted buffer. 2416 */ 2417 2418static int link_recv_changeover_msg(struct link **l_ptr, 2419 struct sk_buff **buf) 2420{ 2421 struct sk_buff *tunnel_buf = *buf; 2422 struct link *dest_link; 2423 struct tipc_msg *msg; 2424 struct tipc_msg *tunnel_msg = buf_msg(tunnel_buf); 2425 u32 msg_typ = msg_type(tunnel_msg); 2426 u32 msg_count = msg_msgcnt(tunnel_msg); 2427 2428 dest_link = (*l_ptr)->owner->links[msg_bearer_id(tunnel_msg)]; 2429 if (!dest_link) { 2430 msg_dbg(tunnel_msg, "NOLINK/<REC<"); 2431 goto exit; 2432 } 2433 if (dest_link == *l_ptr) { 2434 err("Unexpected changeover message on link <%s>\n", 2435 (*l_ptr)->name); 2436 goto exit; 2437 } 2438 dbg("%c<-%c:", dest_link->b_ptr->net_plane, 2439 (*l_ptr)->b_ptr->net_plane); 2440 *l_ptr = dest_link; 2441 msg = msg_get_wrapped(tunnel_msg); 2442 2443 if (msg_typ == DUPLICATE_MSG) { 2444 if (less(msg_seqno(msg), mod(dest_link->next_in_no))) { 2445 msg_dbg(tunnel_msg, "DROP/<REC<"); 2446 goto exit; 2447 } 2448 *buf = buf_extract(tunnel_buf,INT_H_SIZE); 2449 if (*buf == NULL) { 2450 warn("Link changeover error, duplicate msg dropped\n"); 2451 goto exit; 2452 } 2453 msg_dbg(tunnel_msg, "TNL<REC<"); 2454 buf_discard(tunnel_buf); 2455 return 1; 2456 } 2457 2458 /* First original message ?: */ 2459 2460 if (tipc_link_is_up(dest_link)) { 2461 msg_dbg(tunnel_msg, "UP/FIRST/<REC<"); 2462 info("Resetting link <%s>, changeover initiated by peer\n", 2463 dest_link->name); 2464 tipc_link_reset(dest_link); 2465 dest_link->exp_msg_count = msg_count; 2466 dbg("Expecting %u tunnelled messages\n", msg_count); 2467 if (!msg_count) 2468 goto exit; 2469 } else if (dest_link->exp_msg_count == START_CHANGEOVER) { 2470 msg_dbg(tunnel_msg, "BLK/FIRST/<REC<"); 2471 dest_link->exp_msg_count = msg_count; 2472 dbg("Expecting %u tunnelled messages\n", msg_count); 2473 if (!msg_count) 2474 goto exit; 2475 } 2476 2477 /* Receive original message */ 2478 2479 if (dest_link->exp_msg_count == 0) { 2480 warn("Link switchover error, " 2481 "got too many tunnelled messages\n"); 2482 msg_dbg(tunnel_msg, "OVERDUE/DROP/<REC<"); 2483 dbg_print_link(dest_link, "LINK:"); 2484 goto exit; 2485 } 2486 dest_link->exp_msg_count--; 2487 if (less(msg_seqno(msg), dest_link->reset_checkpoint)) { 2488 msg_dbg(tunnel_msg, "DROP/DUPL/<REC<"); 2489 goto exit; 2490 } else { 2491 *buf = buf_extract(tunnel_buf, INT_H_SIZE); 2492 if (*buf != NULL) { 2493 msg_dbg(tunnel_msg, "TNL<REC<"); 2494 buf_discard(tunnel_buf); 2495 return 1; 2496 } else { 2497 warn("Link changeover error, original msg dropped\n"); 2498 } 2499 } 2500exit: 2501 *buf = NULL; 2502 buf_discard(tunnel_buf); 2503 return 0; 2504} 2505 2506/* 2507 * Bundler functionality: 2508 */ 2509void tipc_link_recv_bundle(struct sk_buff *buf) 2510{ 2511 u32 msgcount = msg_msgcnt(buf_msg(buf)); 2512 u32 pos = INT_H_SIZE; 2513 struct sk_buff *obuf; 2514 2515 msg_dbg(buf_msg(buf), "<BNDL<: "); 2516 while (msgcount--) { 2517 obuf = buf_extract(buf, pos); 2518 if (obuf == NULL) { 2519 warn("Link unable to unbundle message(s)\n"); 2520 break; 2521 } 2522 pos += align(msg_size(buf_msg(obuf))); 2523 msg_dbg(buf_msg(obuf), " /"); 2524 tipc_net_route_msg(obuf); 2525 } 2526 buf_discard(buf); 2527} 2528 2529/* 2530 * Fragmentation/defragmentation: 2531 */ 2532 2533 2534/* 2535 * tipc_link_send_long_buf: Entry for buffers needing fragmentation. 2536 * The buffer is complete, inclusive total message length. 2537 * Returns user data length. 2538 */ 2539int tipc_link_send_long_buf(struct link *l_ptr, struct sk_buff *buf) 2540{ 2541 struct tipc_msg *inmsg = buf_msg(buf); 2542 struct tipc_msg fragm_hdr; 2543 u32 insize = msg_size(inmsg); 2544 u32 dsz = msg_data_sz(inmsg); 2545 unchar *crs = buf->data; 2546 u32 rest = insize; 2547 u32 pack_sz = link_max_pkt(l_ptr); 2548 u32 fragm_sz = pack_sz - INT_H_SIZE; 2549 u32 fragm_no = 1; 2550 u32 destaddr = msg_destnode(inmsg); 2551 2552 if (msg_short(inmsg)) 2553 destaddr = l_ptr->addr; 2554 2555 if (msg_routed(inmsg)) 2556 msg_set_prevnode(inmsg, tipc_own_addr); 2557 2558 /* Prepare reusable fragment header: */ 2559 2560 msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT, 2561 TIPC_OK, INT_H_SIZE, destaddr); 2562 msg_set_link_selector(&fragm_hdr, msg_link_selector(inmsg)); 2563 msg_set_long_msgno(&fragm_hdr, mod(l_ptr->long_msg_seq_no++)); 2564 msg_set_fragm_no(&fragm_hdr, fragm_no); 2565 l_ptr->stats.sent_fragmented++; 2566 2567 /* Chop up message: */ 2568 2569 while (rest > 0) { 2570 struct sk_buff *fragm; 2571 2572 if (rest <= fragm_sz) { 2573 fragm_sz = rest; 2574 msg_set_type(&fragm_hdr, LAST_FRAGMENT); 2575 } 2576 fragm = buf_acquire(fragm_sz + INT_H_SIZE); 2577 if (fragm == NULL) { 2578 warn("Link unable to fragment message\n"); 2579 dsz = -ENOMEM; 2580 goto exit; 2581 } 2582 msg_set_size(&fragm_hdr, fragm_sz + INT_H_SIZE); 2583 skb_copy_to_linear_data(fragm, &fragm_hdr, INT_H_SIZE); 2584 skb_copy_to_linear_data_offset(fragm, INT_H_SIZE, crs, 2585 fragm_sz); 2586 /* Send queued messages first, if any: */ 2587 2588 l_ptr->stats.sent_fragments++; 2589 tipc_link_send_buf(l_ptr, fragm); 2590 if (!tipc_link_is_up(l_ptr)) 2591 return dsz; 2592 msg_set_fragm_no(&fragm_hdr, ++fragm_no); 2593 rest -= fragm_sz; 2594 crs += fragm_sz; 2595 msg_set_type(&fragm_hdr, FRAGMENT); 2596 } 2597exit: 2598 buf_discard(buf); 2599 return dsz; 2600} 2601 2602/* 2603 * A pending message being re-assembled must store certain values 2604 * to handle subsequent fragments correctly. The following functions 2605 * help storing these values in unused, available fields in the 2606 * pending message. This makes dynamic memory allocation unecessary. 2607 */ 2608 2609static void set_long_msg_seqno(struct sk_buff *buf, u32 seqno) 2610{ 2611 msg_set_seqno(buf_msg(buf), seqno); 2612} 2613 2614static u32 get_fragm_size(struct sk_buff *buf) 2615{ 2616 return msg_ack(buf_msg(buf)); 2617} 2618 2619static void set_fragm_size(struct sk_buff *buf, u32 sz) 2620{ 2621 msg_set_ack(buf_msg(buf), sz); 2622} 2623 2624static u32 get_expected_frags(struct sk_buff *buf) 2625{ 2626 return msg_bcast_ack(buf_msg(buf)); 2627} 2628 2629static void set_expected_frags(struct sk_buff *buf, u32 exp) 2630{ 2631 msg_set_bcast_ack(buf_msg(buf), exp); 2632} 2633 2634static u32 get_timer_cnt(struct sk_buff *buf) 2635{ 2636 return msg_reroute_cnt(buf_msg(buf)); 2637} 2638 2639static void incr_timer_cnt(struct sk_buff *buf) 2640{ 2641 msg_incr_reroute_cnt(buf_msg(buf)); 2642} 2643 2644/* 2645 * tipc_link_recv_fragment(): Called with node lock on. Returns 2646 * the reassembled buffer if message is complete. 2647 */ 2648int tipc_link_recv_fragment(struct sk_buff **pending, struct sk_buff **fb, 2649 struct tipc_msg **m) 2650{ 2651 struct sk_buff *prev = NULL; 2652 struct sk_buff *fbuf = *fb; 2653 struct tipc_msg *fragm = buf_msg(fbuf); 2654 struct sk_buff *pbuf = *pending; 2655 u32 long_msg_seq_no = msg_long_msgno(fragm); 2656 2657 *fb = NULL; 2658 msg_dbg(fragm,"FRG<REC<"); 2659 2660 /* Is there an incomplete message waiting for this fragment? */ 2661 2662 while (pbuf && ((msg_seqno(buf_msg(pbuf)) != long_msg_seq_no) 2663 || (msg_orignode(fragm) != msg_orignode(buf_msg(pbuf))))) { 2664 prev = pbuf; 2665 pbuf = pbuf->next; 2666 } 2667 2668 if (!pbuf && (msg_type(fragm) == FIRST_FRAGMENT)) { 2669 struct tipc_msg *imsg = (struct tipc_msg *)msg_data(fragm); 2670 u32 msg_sz = msg_size(imsg); 2671 u32 fragm_sz = msg_data_sz(fragm); 2672 u32 exp_fragm_cnt = msg_sz/fragm_sz + !!(msg_sz % fragm_sz); 2673 u32 max = TIPC_MAX_USER_MSG_SIZE + LONG_H_SIZE; 2674 if (msg_type(imsg) == TIPC_MCAST_MSG) 2675 max = TIPC_MAX_USER_MSG_SIZE + MCAST_H_SIZE; 2676 if (msg_size(imsg) > max) { 2677 msg_dbg(fragm,"<REC<Oversized: "); 2678 buf_discard(fbuf); 2679 return 0; 2680 } 2681 pbuf = buf_acquire(msg_size(imsg)); 2682 if (pbuf != NULL) { 2683 pbuf->next = *pending; 2684 *pending = pbuf; 2685 skb_copy_to_linear_data(pbuf, imsg, 2686 msg_data_sz(fragm)); 2687 /* Prepare buffer for subsequent fragments. */ 2688 2689 set_long_msg_seqno(pbuf, long_msg_seq_no); 2690 set_fragm_size(pbuf,fragm_sz); 2691 set_expected_frags(pbuf,exp_fragm_cnt - 1); 2692 } else { 2693 warn("Link unable to reassemble fragmented message\n"); 2694 } 2695 buf_discard(fbuf); 2696 return 0; 2697 } else if (pbuf && (msg_type(fragm) != FIRST_FRAGMENT)) { 2698 u32 dsz = msg_data_sz(fragm); 2699 u32 fsz = get_fragm_size(pbuf); 2700 u32 crs = ((msg_fragm_no(fragm) - 1) * fsz); 2701 u32 exp_frags = get_expected_frags(pbuf) - 1; 2702 skb_copy_to_linear_data_offset(pbuf, crs, 2703 msg_data(fragm), dsz); 2704 buf_discard(fbuf); 2705 2706 /* Is message complete? */ 2707 2708 if (exp_frags == 0) { 2709 if (prev) 2710 prev->next = pbuf->next; 2711 else 2712 *pending = pbuf->next; 2713 msg_reset_reroute_cnt(buf_msg(pbuf)); 2714 *fb = pbuf; 2715 *m = buf_msg(pbuf); 2716 return 1; 2717 } 2718 set_expected_frags(pbuf,exp_frags); 2719 return 0; 2720 } 2721 dbg(" Discarding orphan fragment %x\n",fbuf); 2722 msg_dbg(fragm,"ORPHAN:"); 2723 dbg("Pending long buffers:\n"); 2724 dbg_print_buf_chain(*pending); 2725 buf_discard(fbuf); 2726 return 0; 2727} 2728 2729/** 2730 * link_check_defragm_bufs - flush stale incoming message fragments 2731 * @l_ptr: pointer to link 2732 */ 2733 2734static void link_check_defragm_bufs(struct link *l_ptr) 2735{ 2736 struct sk_buff *prev = NULL; 2737 struct sk_buff *next = NULL; 2738 struct sk_buff *buf = l_ptr->defragm_buf; 2739 2740 if (!buf) 2741 return; 2742 if (!link_working_working(l_ptr)) 2743 return; 2744 while (buf) { 2745 u32 cnt = get_timer_cnt(buf); 2746 2747 next = buf->next; 2748 if (cnt < 4) { 2749 incr_timer_cnt(buf); 2750 prev = buf; 2751 } else { 2752 dbg(" Discarding incomplete long buffer\n"); 2753 msg_dbg(buf_msg(buf), "LONG:"); 2754 dbg_print_link(l_ptr, "curr:"); 2755 dbg("Pending long buffers:\n"); 2756 dbg_print_buf_chain(l_ptr->defragm_buf); 2757 if (prev) 2758 prev->next = buf->next; 2759 else 2760 l_ptr->defragm_buf = buf->next; 2761 buf_discard(buf); 2762 } 2763 buf = next; 2764 } 2765} 2766 2767 2768 2769static void link_set_supervision_props(struct link *l_ptr, u32 tolerance) 2770{ 2771 l_ptr->tolerance = tolerance; 2772 l_ptr->continuity_interval = 2773 ((tolerance / 4) > 500) ? 500 : tolerance / 4; 2774 l_ptr->abort_limit = tolerance / (l_ptr->continuity_interval / 4); 2775} 2776 2777 2778void tipc_link_set_queue_limits(struct link *l_ptr, u32 window) 2779{ 2780 /* Data messages from this node, inclusive FIRST_FRAGM */ 2781 l_ptr->queue_limit[DATA_LOW] = window; 2782 l_ptr->queue_limit[DATA_MEDIUM] = (window / 3) * 4; 2783 l_ptr->queue_limit[DATA_HIGH] = (window / 3) * 5; 2784 l_ptr->queue_limit[DATA_CRITICAL] = (window / 3) * 6; 2785 /* Transiting data messages,inclusive FIRST_FRAGM */ 2786 l_ptr->queue_limit[DATA_LOW + 4] = 300; 2787 l_ptr->queue_limit[DATA_MEDIUM + 4] = 600; 2788 l_ptr->queue_limit[DATA_HIGH + 4] = 900; 2789 l_ptr->queue_limit[DATA_CRITICAL + 4] = 1200; 2790 l_ptr->queue_limit[CONN_MANAGER] = 1200; 2791 l_ptr->queue_limit[ROUTE_DISTRIBUTOR] = 1200; 2792 l_ptr->queue_limit[CHANGEOVER_PROTOCOL] = 2500; 2793 l_ptr->queue_limit[NAME_DISTRIBUTOR] = 3000; 2794 /* FRAGMENT and LAST_FRAGMENT packets */ 2795 l_ptr->queue_limit[MSG_FRAGMENTER] = 4000; 2796} 2797 2798/** 2799 * link_find_link - locate link by name 2800 * @name - ptr to link name string 2801 * @node - ptr to area to be filled with ptr to associated node 2802 * 2803 * Caller must hold 'tipc_net_lock' to ensure node and bearer are not deleted; 2804 * this also prevents link deletion. 2805 * 2806 * Returns pointer to link (or 0 if invalid link name). 2807 */ 2808 2809static struct link *link_find_link(const char *name, struct node **node) 2810{ 2811 struct link_name link_name_parts; 2812 struct bearer *b_ptr; 2813 struct link *l_ptr; 2814 2815 if (!link_name_validate(name, &link_name_parts)) 2816 return NULL; 2817 2818 b_ptr = tipc_bearer_find_interface(link_name_parts.if_local); 2819 if (!b_ptr) 2820 return NULL; 2821 2822 *node = tipc_node_find(link_name_parts.addr_peer); 2823 if (!*node) 2824 return NULL; 2825 2826 l_ptr = (*node)->links[b_ptr->identity]; 2827 if (!l_ptr || strcmp(l_ptr->name, name)) 2828 return NULL; 2829 2830 return l_ptr; 2831} 2832 2833struct sk_buff *tipc_link_cmd_config(const void *req_tlv_area, int req_tlv_space, 2834 u16 cmd) 2835{ 2836 struct tipc_link_config *args; 2837 u32 new_value; 2838 struct link *l_ptr; 2839 struct node *node; 2840 int res; 2841 2842 if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_CONFIG)) 2843 return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR); 2844 2845 args = (struct tipc_link_config *)TLV_DATA(req_tlv_area); 2846 new_value = ntohl(args->value); 2847 2848 if (!strcmp(args->name, tipc_bclink_name)) { 2849 if ((cmd == TIPC_CMD_SET_LINK_WINDOW) && 2850 (tipc_bclink_set_queue_limits(new_value) == 0)) 2851 return tipc_cfg_reply_none(); 2852 return tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED 2853 " (cannot change setting on broadcast link)"); 2854 } 2855 2856 read_lock_bh(&tipc_net_lock); 2857 l_ptr = link_find_link(args->name, &node); 2858 if (!l_ptr) { 2859 read_unlock_bh(&tipc_net_lock); 2860 return tipc_cfg_reply_error_string("link not found"); 2861 } 2862 2863 tipc_node_lock(node); 2864 res = -EINVAL; 2865 switch (cmd) { 2866 case TIPC_CMD_SET_LINK_TOL: 2867 if ((new_value >= TIPC_MIN_LINK_TOL) && 2868 (new_value <= TIPC_MAX_LINK_TOL)) { 2869 link_set_supervision_props(l_ptr, new_value); 2870 tipc_link_send_proto_msg(l_ptr, STATE_MSG, 2871 0, 0, new_value, 0, 0); 2872 res = TIPC_OK; 2873 } 2874 break; 2875 case TIPC_CMD_SET_LINK_PRI: 2876 if ((new_value >= TIPC_MIN_LINK_PRI) && 2877 (new_value <= TIPC_MAX_LINK_PRI)) { 2878 l_ptr->priority = new_value; 2879 tipc_link_send_proto_msg(l_ptr, STATE_MSG, 2880 0, 0, 0, new_value, 0); 2881 res = TIPC_OK; 2882 } 2883 break; 2884 case TIPC_CMD_SET_LINK_WINDOW: 2885 if ((new_value >= TIPC_MIN_LINK_WIN) && 2886 (new_value <= TIPC_MAX_LINK_WIN)) { 2887 tipc_link_set_queue_limits(l_ptr, new_value); 2888 res = TIPC_OK; 2889 } 2890 break; 2891 } 2892 tipc_node_unlock(node); 2893 2894 read_unlock_bh(&tipc_net_lock); 2895 if (res) 2896 return tipc_cfg_reply_error_string("cannot change link setting"); 2897 2898 return tipc_cfg_reply_none(); 2899} 2900 2901/** 2902 * link_reset_statistics - reset link statistics 2903 * @l_ptr: pointer to link 2904 */ 2905 2906static void link_reset_statistics(struct link *l_ptr) 2907{ 2908 memset(&l_ptr->stats, 0, sizeof(l_ptr->stats)); 2909 l_ptr->stats.sent_info = l_ptr->next_out_no; 2910 l_ptr->stats.recv_info = l_ptr->next_in_no; 2911} 2912 2913struct sk_buff *tipc_link_cmd_reset_stats(const void *req_tlv_area, int req_tlv_space) 2914{ 2915 char *link_name; 2916 struct link *l_ptr; 2917 struct node *node; 2918 2919 if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_NAME)) 2920 return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR); 2921 2922 link_name = (char *)TLV_DATA(req_tlv_area); 2923 if (!strcmp(link_name, tipc_bclink_name)) { 2924 if (tipc_bclink_reset_stats()) 2925 return tipc_cfg_reply_error_string("link not found"); 2926 return tipc_cfg_reply_none(); 2927 } 2928 2929 read_lock_bh(&tipc_net_lock); 2930 l_ptr = link_find_link(link_name, &node); 2931 if (!l_ptr) { 2932 read_unlock_bh(&tipc_net_lock); 2933 return tipc_cfg_reply_error_string("link not found"); 2934 } 2935 2936 tipc_node_lock(node); 2937 link_reset_statistics(l_ptr); 2938 tipc_node_unlock(node); 2939 read_unlock_bh(&tipc_net_lock); 2940 return tipc_cfg_reply_none(); 2941} 2942 2943/** 2944 * percent - convert count to a percentage of total (rounding up or down) 2945 */ 2946 2947static u32 percent(u32 count, u32 total) 2948{ 2949 return (count * 100 + (total / 2)) / total; 2950} 2951 2952/** 2953 * tipc_link_stats - print link statistics 2954 * @name: link name 2955 * @buf: print buffer area 2956 * @buf_size: size of print buffer area 2957 * 2958 * Returns length of print buffer data string (or 0 if error) 2959 */ 2960 2961static int tipc_link_stats(const char *name, char *buf, const u32 buf_size) 2962{ 2963 struct print_buf pb; 2964 struct link *l_ptr; 2965 struct node *node; 2966 char *status; 2967 u32 profile_total = 0; 2968 2969 if (!strcmp(name, tipc_bclink_name)) 2970 return tipc_bclink_stats(buf, buf_size); 2971 2972 tipc_printbuf_init(&pb, buf, buf_size); 2973 2974 read_lock_bh(&tipc_net_lock); 2975 l_ptr = link_find_link(name, &node); 2976 if (!l_ptr) { 2977 read_unlock_bh(&tipc_net_lock); 2978 return 0; 2979 } 2980 tipc_node_lock(node); 2981 2982 if (tipc_link_is_active(l_ptr)) 2983 status = "ACTIVE"; 2984 else if (tipc_link_is_up(l_ptr)) 2985 status = "STANDBY"; 2986 else 2987 status = "DEFUNCT"; 2988 tipc_printf(&pb, "Link <%s>\n" 2989 " %s MTU:%u Priority:%u Tolerance:%u ms" 2990 " Window:%u packets\n", 2991 l_ptr->name, status, link_max_pkt(l_ptr), 2992 l_ptr->priority, l_ptr->tolerance, l_ptr->queue_limit[0]); 2993 tipc_printf(&pb, " RX packets:%u fragments:%u/%u bundles:%u/%u\n", 2994 l_ptr->next_in_no - l_ptr->stats.recv_info, 2995 l_ptr->stats.recv_fragments, 2996 l_ptr->stats.recv_fragmented, 2997 l_ptr->stats.recv_bundles, 2998 l_ptr->stats.recv_bundled); 2999 tipc_printf(&pb, " TX packets:%u fragments:%u/%u bundles:%u/%u\n", 3000 l_ptr->next_out_no - l_ptr->stats.sent_info, 3001 l_ptr->stats.sent_fragments, 3002 l_ptr->stats.sent_fragmented, 3003 l_ptr->stats.sent_bundles, 3004 l_ptr->stats.sent_bundled); 3005 profile_total = l_ptr->stats.msg_length_counts; 3006 if (!profile_total) 3007 profile_total = 1; 3008 tipc_printf(&pb, " TX profile sample:%u packets average:%u octets\n" 3009 " 0-64:%u%% -256:%u%% -1024:%u%% -4096:%u%% " 3010 "-16354:%u%% -32768:%u%% -66000:%u%%\n", 3011 l_ptr->stats.msg_length_counts, 3012 l_ptr->stats.msg_lengths_total / profile_total, 3013 percent(l_ptr->stats.msg_length_profile[0], profile_total), 3014 percent(l_ptr->stats.msg_length_profile[1], profile_total), 3015 percent(l_ptr->stats.msg_length_profile[2], profile_total), 3016 percent(l_ptr->stats.msg_length_profile[3], profile_total), 3017 percent(l_ptr->stats.msg_length_profile[4], profile_total), 3018 percent(l_ptr->stats.msg_length_profile[5], profile_total), 3019 percent(l_ptr->stats.msg_length_profile[6], profile_total)); 3020 tipc_printf(&pb, " RX states:%u probes:%u naks:%u defs:%u dups:%u\n", 3021 l_ptr->stats.recv_states, 3022 l_ptr->stats.recv_probes, 3023 l_ptr->stats.recv_nacks, 3024 l_ptr->stats.deferred_recv, 3025 l_ptr->stats.duplicates); 3026 tipc_printf(&pb, " TX states:%u probes:%u naks:%u acks:%u dups:%u\n", 3027 l_ptr->stats.sent_states, 3028 l_ptr->stats.sent_probes, 3029 l_ptr->stats.sent_nacks, 3030 l_ptr->stats.sent_acks, 3031 l_ptr->stats.retransmitted); 3032 tipc_printf(&pb, " Congestion bearer:%u link:%u Send queue max:%u avg:%u\n", 3033 l_ptr->stats.bearer_congs, 3034 l_ptr->stats.link_congs, 3035 l_ptr->stats.max_queue_sz, 3036 l_ptr->stats.queue_sz_counts 3037 ? (l_ptr->stats.accu_queue_sz / l_ptr->stats.queue_sz_counts) 3038 : 0); 3039 3040 tipc_node_unlock(node); 3041 read_unlock_bh(&tipc_net_lock); 3042 return tipc_printbuf_validate(&pb); 3043} 3044 3045#define MAX_LINK_STATS_INFO 2000 3046 3047struct sk_buff *tipc_link_cmd_show_stats(const void *req_tlv_area, int req_tlv_space) 3048{ 3049 struct sk_buff *buf; 3050 struct tlv_desc *rep_tlv; 3051 int str_len; 3052 3053 if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_NAME)) 3054 return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR); 3055 3056 buf = tipc_cfg_reply_alloc(TLV_SPACE(MAX_LINK_STATS_INFO)); 3057 if (!buf) 3058 return NULL; 3059 3060 rep_tlv = (struct tlv_desc *)buf->data; 3061 3062 str_len = tipc_link_stats((char *)TLV_DATA(req_tlv_area), 3063 (char *)TLV_DATA(rep_tlv), MAX_LINK_STATS_INFO); 3064 if (!str_len) { 3065 buf_discard(buf); 3066 return tipc_cfg_reply_error_string("link not found"); 3067 } 3068 3069 skb_put(buf, TLV_SPACE(str_len)); 3070 TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len); 3071 3072 return buf; 3073} 3074 3075 3076/** 3077 * tipc_link_get_max_pkt - get maximum packet size to use when sending to destination 3078 * @dest: network address of destination node 3079 * @selector: used to select from set of active links 3080 * 3081 * If no active link can be found, uses default maximum packet size. 3082 */ 3083 3084u32 tipc_link_get_max_pkt(u32 dest, u32 selector) 3085{ 3086 struct node *n_ptr; 3087 struct link *l_ptr; 3088 u32 res = MAX_PKT_DEFAULT; 3089 3090 if (dest == tipc_own_addr) 3091 return MAX_MSG_SIZE; 3092 3093 read_lock_bh(&tipc_net_lock); 3094 n_ptr = tipc_node_select(dest, selector); 3095 if (n_ptr) { 3096 tipc_node_lock(n_ptr); 3097 l_ptr = n_ptr->active_links[selector & 1]; 3098 if (l_ptr) 3099 res = link_max_pkt(l_ptr); 3100 tipc_node_unlock(n_ptr); 3101 } 3102 read_unlock_bh(&tipc_net_lock); 3103 return res; 3104} 3105 3106 3107static void link_dump_send_queue(struct link *l_ptr) 3108{ 3109 if (l_ptr->next_out) { 3110 info("\nContents of unsent queue:\n"); 3111 dbg_print_buf_chain(l_ptr->next_out); 3112 } 3113 info("\nContents of send queue:\n"); 3114 if (l_ptr->first_out) { 3115 dbg_print_buf_chain(l_ptr->first_out); 3116 } 3117 info("Empty send queue\n"); 3118} 3119 3120static void link_print(struct link *l_ptr, struct print_buf *buf, 3121 const char *str) 3122{ 3123 tipc_printf(buf, str); 3124 if (link_reset_reset(l_ptr) || link_reset_unknown(l_ptr)) 3125 return; 3126 tipc_printf(buf, "Link %x<%s>:", 3127 l_ptr->addr, l_ptr->b_ptr->publ.name); 3128 tipc_printf(buf, ": NXO(%u):", mod(l_ptr->next_out_no)); 3129 tipc_printf(buf, "NXI(%u):", mod(l_ptr->next_in_no)); 3130 tipc_printf(buf, "SQUE"); 3131 if (l_ptr->first_out) { 3132 tipc_printf(buf, "[%u..", msg_seqno(buf_msg(l_ptr->first_out))); 3133 if (l_ptr->next_out) 3134 tipc_printf(buf, "%u..", 3135 msg_seqno(buf_msg(l_ptr->next_out))); 3136 tipc_printf(buf, "%u]", 3137 msg_seqno(buf_msg 3138 (l_ptr->last_out)), l_ptr->out_queue_size); 3139 if ((mod(msg_seqno(buf_msg(l_ptr->last_out)) - 3140 msg_seqno(buf_msg(l_ptr->first_out))) 3141 != (l_ptr->out_queue_size - 1)) 3142 || (l_ptr->last_out->next != 0)) { 3143 tipc_printf(buf, "\nSend queue inconsistency\n"); 3144 tipc_printf(buf, "first_out= %x ", l_ptr->first_out); 3145 tipc_printf(buf, "next_out= %x ", l_ptr->next_out); 3146 tipc_printf(buf, "last_out= %x ", l_ptr->last_out); 3147 link_dump_send_queue(l_ptr); 3148 } 3149 } else 3150 tipc_printf(buf, "[]"); 3151 tipc_printf(buf, "SQSIZ(%u)", l_ptr->out_queue_size); 3152 if (l_ptr->oldest_deferred_in) { 3153 u32 o = msg_seqno(buf_msg(l_ptr->oldest_deferred_in)); 3154 u32 n = msg_seqno(buf_msg(l_ptr->newest_deferred_in)); 3155 tipc_printf(buf, ":RQUE[%u..%u]", o, n); 3156 if (l_ptr->deferred_inqueue_sz != mod((n + 1) - o)) { 3157 tipc_printf(buf, ":RQSIZ(%u)", 3158 l_ptr->deferred_inqueue_sz); 3159 } 3160 } 3161 if (link_working_unknown(l_ptr)) 3162 tipc_printf(buf, ":WU"); 3163 if (link_reset_reset(l_ptr)) 3164 tipc_printf(buf, ":RR"); 3165 if (link_reset_unknown(l_ptr)) 3166 tipc_printf(buf, ":RU"); 3167 if (link_working_working(l_ptr)) 3168 tipc_printf(buf, ":WW"); 3169 tipc_printf(buf, "\n"); 3170} 3171