1/*
2 * net/tipc/link.c: TIPC link code
3 *
4 * Copyright (c) 1996-2006, Ericsson AB
5 * Copyright (c) 2004-2006, Wind River Systems
6 * All rights reserved.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are met:
10 *
11 * 1. Redistributions of source code must retain the above copyright
12 *    notice, this list of conditions and the following disclaimer.
13 * 2. Redistributions in binary form must reproduce the above copyright
14 *    notice, this list of conditions and the following disclaimer in the
15 *    documentation and/or other materials provided with the distribution.
16 * 3. Neither the names of the copyright holders nor the names of its
17 *    contributors may be used to endorse or promote products derived from
18 *    this software without specific prior written permission.
19 *
20 * Alternatively, this software may be distributed under the terms of the
21 * GNU General Public License ("GPL") version 2 as published by the Free
22 * Software Foundation.
23 *
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
25 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
27 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
28 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
29 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
30 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
34 * POSSIBILITY OF SUCH DAMAGE.
35 */
36
37#include "core.h"
38#include "dbg.h"
39#include "link.h"
40#include "net.h"
41#include "node.h"
42#include "port.h"
43#include "addr.h"
44#include "node_subscr.h"
45#include "name_distr.h"
46#include "bearer.h"
47#include "name_table.h"
48#include "discover.h"
49#include "config.h"
50#include "bcast.h"
51
52
53/*
54 * Limit for deferred reception queue:
55 */
56
57#define DEF_QUEUE_LIMIT 256u
58
59/*
60 * Link state events:
61 */
62
63#define  STARTING_EVT    856384768	/* link processing trigger */
64#define  TRAFFIC_MSG_EVT 560815u	/* rx'd ??? */
65#define  TIMEOUT_EVT     560817u	/* link timer expired */
66
67/*
68 * The following two 'message types' is really just implementation
69 * data conveniently stored in the message header.
70 * They must not be considered part of the protocol
71 */
72#define OPEN_MSG   0
73#define CLOSED_MSG 1
74
75/*
76 * State value stored in 'exp_msg_count'
77 */
78
79#define START_CHANGEOVER 100000u
80
81/**
82 * struct link_name - deconstructed link name
83 * @addr_local: network address of node at this end
84 * @if_local: name of interface at this end
85 * @addr_peer: network address of node at far end
86 * @if_peer: name of interface at far end
87 */
88
struct link_name {
	u32 addr_local;				/* <z.c.n> address of this node */
	char if_local[TIPC_MAX_IF_NAME];	/* bearer interface name at this end */
	u32 addr_peer;				/* <z.c.n> address of peer node */
	char if_peer[TIPC_MAX_IF_NAME];		/* bearer interface name at far end */
};
95
96
97static void link_handle_out_of_seq_msg(struct link *l_ptr,
98				       struct sk_buff *buf);
99static void link_recv_proto_msg(struct link *l_ptr, struct sk_buff *buf);
100static int  link_recv_changeover_msg(struct link **l_ptr, struct sk_buff **buf);
101static void link_set_supervision_props(struct link *l_ptr, u32 tolerance);
102static int  link_send_sections_long(struct port *sender,
103				    struct iovec const *msg_sect,
104				    u32 num_sect, u32 destnode);
105static void link_check_defragm_bufs(struct link *l_ptr);
106static void link_state_event(struct link *l_ptr, u32 event);
107static void link_reset_statistics(struct link *l_ptr);
108static void link_print(struct link *l_ptr, struct print_buf *buf,
109		       const char *str);
110
111/*
112 * Debugging code used by link routines only
113 *
114 * When debugging link problems on a system that has multiple links,
115 * the standard TIPC debugging routines may not be useful since they
116 * allow the output from multiple links to be intermixed.  For this reason
117 * routines of the form "dbg_link_XXX()" have been created that will capture
118 * debug info into a link's personal print buffer, which can then be dumped
119 * into the TIPC system log (TIPC_LOG) upon request.
120 *
121 * To enable per-link debugging, use LINK_LOG_BUF_SIZE to specify the size
122 * of the print buffer used by each link.  If LINK_LOG_BUF_SIZE is set to 0,
123 * the dbg_link_XXX() routines simply send their output to the standard
124 * debug print buffer (DBG_OUTPUT), if it has been defined; this can be useful
125 * when there is only a single link in the system being debugged.
126 *
127 * Notes:
128 * - When enabled, LINK_LOG_BUF_SIZE should be set to at least TIPC_PB_MIN_SIZE
129 * - "l_ptr" must be valid when using dbg_link_XXX() macros
130 */
131
/* Size of each link's private debug print buffer; 0 disables per-link capture */
#define LINK_LOG_BUF_SIZE 0

/* Emit a formatted debug message into the link's private print buffer */
#define dbg_link(fmt, arg...)  do {if (LINK_LOG_BUF_SIZE) tipc_printf(&l_ptr->print_buf, fmt, ## arg); } while(0)
/* Dump a message header (prefixed by 'txt') into the link's print buffer */
#define dbg_link_msg(msg, txt) do {if (LINK_LOG_BUF_SIZE) tipc_msg_print(&l_ptr->print_buf, msg, txt); } while(0)
/* Record the link's current state into its print buffer */
#define dbg_link_state(txt) do {if (LINK_LOG_BUF_SIZE) link_print(l_ptr, &l_ptr->print_buf, txt); } while(0)
/* Flush the link's captured debug output into the TIPC system log (LOG) */
#define dbg_link_dump() do { \
	if (LINK_LOG_BUF_SIZE) { \
		tipc_printf(LOG, "\n\nDumping link <%s>:\n", l_ptr->name); \
		tipc_printbuf_move(LOG, &l_ptr->print_buf); \
	} \
} while (0)
143
144static void dbg_print_link(struct link *l_ptr, const char *str)
145{
146	if (DBG_OUTPUT != TIPC_NULL)
147		link_print(l_ptr, DBG_OUTPUT, str);
148}
149
150static void dbg_print_buf_chain(struct sk_buff *root_buf)
151{
152	if (DBG_OUTPUT != TIPC_NULL) {
153		struct sk_buff *buf = root_buf;
154
155		while (buf) {
156			msg_dbg(buf_msg(buf), "In chain: ");
157			buf = buf->next;
158		}
159	}
160}
161
162/*
163 *  Simple link routines
164 */
165
/* Round 'i' up to the next multiple of 4 (modulo 2^32 on wraparound) */
static unsigned int align(unsigned int i)
{
	unsigned int rem = i % 4u;

	return rem ? i + (4u - rem) : i;
}
170
171static int link_working_working(struct link *l_ptr)
172{
173	return (l_ptr->state == WORKING_WORKING);
174}
175
176static int link_working_unknown(struct link *l_ptr)
177{
178	return (l_ptr->state == WORKING_UNKNOWN);
179}
180
181static int link_reset_unknown(struct link *l_ptr)
182{
183	return (l_ptr->state == RESET_UNKNOWN);
184}
185
186static int link_reset_reset(struct link *l_ptr)
187{
188	return (l_ptr->state == RESET_RESET);
189}
190
191static int link_blocked(struct link *l_ptr)
192{
193	return (l_ptr->exp_msg_count || l_ptr->blocked);
194}
195
196static int link_congested(struct link *l_ptr)
197{
198	return (l_ptr->out_queue_size >= l_ptr->queue_limit[0]);
199}
200
/* Return the link's currently negotiated maximum packet size */
static u32 link_max_pkt(struct link *l_ptr)
{
	return l_ptr->max_pkt;
}
205
206static void link_init_max_pkt(struct link *l_ptr)
207{
208	u32 max_pkt;
209
210	max_pkt = (l_ptr->b_ptr->publ.mtu & ~3);
211	if (max_pkt > MAX_MSG_SIZE)
212		max_pkt = MAX_MSG_SIZE;
213
214	l_ptr->max_pkt_target = max_pkt;
215	if (l_ptr->max_pkt_target < MAX_PKT_DEFAULT)
216		l_ptr->max_pkt = l_ptr->max_pkt_target;
217	else
218		l_ptr->max_pkt = MAX_PKT_DEFAULT;
219
220	l_ptr->max_pkt_probes = 0;
221}
222
223static u32 link_next_sent(struct link *l_ptr)
224{
225	if (l_ptr->next_out)
226		return msg_seqno(buf_msg(l_ptr->next_out));
227	return mod(l_ptr->next_out_no);
228}
229
230static u32 link_last_sent(struct link *l_ptr)
231{
232	return mod(link_next_sent(l_ptr) - 1);
233}
234
235/*
236 *  Simple non-static link routines (i.e. referenced outside this file)
237 */
238
/* Return non-zero if link exists and is in a working state */
int tipc_link_is_up(struct link *l_ptr)
{
	if (!l_ptr)
		return 0;
	if (link_working_working(l_ptr))
		return 1;
	return link_working_unknown(l_ptr);
}
245
246int tipc_link_is_active(struct link *l_ptr)
247{
248	return ((l_ptr->owner->active_links[0] == l_ptr) ||
249		(l_ptr->owner->active_links[1] == l_ptr));
250}
251
252/**
253 * link_name_validate - validate & (optionally) deconstruct link name
254 * @name - ptr to link name string
255 * @name_parts - ptr to area for link name components (or NULL if not needed)
256 *
257 * Returns 1 if link name is valid, otherwise 0.
258 */
259
static int link_name_validate(const char *name, struct link_name *name_parts)
{
	char name_copy[TIPC_MAX_LINK_NAME];
	char *addr_local;
	char *if_local;
	char *addr_peer;
	char *if_peer;
	char dummy;		/* catches trailing junk after <z.c.n> */
	u32 z_local, c_local, n_local;
	u32 z_peer, c_peer, n_peer;
	u32 if_local_len;	/* interface name length, incl. terminator */
	u32 if_peer_len;	/* interface name length, incl. terminator */

	/* copy link name & ensure length is OK */

	name_copy[TIPC_MAX_LINK_NAME - 1] = 0;
	/* need above in case non-Posix strncpy() doesn't pad with nulls */
	strncpy(name_copy, name, TIPC_MAX_LINK_NAME);
	if (name_copy[TIPC_MAX_LINK_NAME - 1] != 0)
		return 0;	/* name too long to be valid */

	/*
	 * Ensure all component parts of link name are present; expected
	 * format is "<z>.<c>.<n>:<if_local>-<z>.<c>.<n>:<if_peer>".
	 * Each separator found is overwritten with a terminator, slicing
	 * name_copy into four independent strings in place.
	 */

	addr_local = name_copy;
	if ((if_local = strchr(addr_local, ':')) == NULL)
		return 0;
	*(if_local++) = 0;
	if ((addr_peer = strchr(if_local, '-')) == NULL)
		return 0;
	*(addr_peer++) = 0;
	if_local_len = addr_peer - if_local;	/* includes the terminator */
	if ((if_peer = strchr(addr_peer, ':')) == NULL)
		return 0;
	*(if_peer++) = 0;
	if_peer_len = strlen(if_peer) + 1;

	/*
	 * Validate component parts of link name:
	 * - both addresses parse as exactly "z.c.n" ('%c' must NOT match,
	 *   hence sscanf() == 3) with fields in TIPC's z/c/n ranges
	 * - both interface names are non-empty, fit the name buffers, and
	 *   contain only characters from tipc_alphabet
	 */

	if ((sscanf(addr_local, "%u.%u.%u%c",
		    &z_local, &c_local, &n_local, &dummy) != 3) ||
	    (sscanf(addr_peer, "%u.%u.%u%c",
		    &z_peer, &c_peer, &n_peer, &dummy) != 3) ||
	    (z_local > 255) || (c_local > 4095) || (n_local > 4095) ||
	    (z_peer  > 255) || (c_peer  > 4095) || (n_peer  > 4095) ||
	    (if_local_len <= 1) || (if_local_len > TIPC_MAX_IF_NAME) ||
	    (if_peer_len  <= 1) || (if_peer_len  > TIPC_MAX_IF_NAME) ||
	    (strspn(if_local, tipc_alphabet) != (if_local_len - 1)) ||
	    (strspn(if_peer, tipc_alphabet) != (if_peer_len - 1)))
		return 0;

	/* return link name components, if necessary */

	if (name_parts) {
		name_parts->addr_local = tipc_addr(z_local, c_local, n_local);
		strcpy(name_parts->if_local, if_local);
		name_parts->addr_peer = tipc_addr(z_peer, c_peer, n_peer);
		strcpy(name_parts->if_peer, if_peer);
	}
	return 1;
}
320
321/**
322 * link_timeout - handle expiration of link timer
323 * @l_ptr: pointer to link
324 *
325 * This routine must not grab "tipc_net_lock" to avoid a potential deadlock conflict
326 * with tipc_link_delete().  (There is no risk that the node will be deleted by
327 * another thread because tipc_link_delete() always cancels the link timer before
328 * tipc_node_delete() is called.)
329 */
330
static void link_timeout(struct link *l_ptr)
{
	/* node lock (not tipc_net_lock) is sufficient here; see header comment */
	tipc_node_lock(l_ptr->owner);

	/* update counters used in statistical profiling of send traffic */

	l_ptr->stats.accu_queue_sz += l_ptr->out_queue_size;
	l_ptr->stats.queue_sz_counts++;

	if (l_ptr->out_queue_size > l_ptr->stats.max_queue_sz)
		l_ptr->stats.max_queue_sz = l_ptr->out_queue_size;

	if (l_ptr->first_out) {
		struct tipc_msg *msg = buf_msg(l_ptr->first_out);
		u32 length = msg_size(msg);

		/* for a fragmented message, profile the size of the
		   original (wrapped) message, not the fragment's */
		if ((msg_user(msg) == MSG_FRAGMENTER)
		    && (msg_type(msg) == FIRST_FRAGMENT)) {
			length = msg_size(msg_get_wrapped(msg));
		}
		/* bucket message length into the power-of-4-ish histogram */
		if (length) {
			l_ptr->stats.msg_lengths_total += length;
			l_ptr->stats.msg_length_counts++;
			if (length <= 64)
				l_ptr->stats.msg_length_profile[0]++;
			else if (length <= 256)
				l_ptr->stats.msg_length_profile[1]++;
			else if (length <= 1024)
				l_ptr->stats.msg_length_profile[2]++;
			else if (length <= 4096)
				l_ptr->stats.msg_length_profile[3]++;
			else if (length <= 16384)
				l_ptr->stats.msg_length_profile[4]++;
			else if (length <= 32768)
				l_ptr->stats.msg_length_profile[5]++;
			else
				l_ptr->stats.msg_length_profile[6]++;
		}
	}

	/* do all other link processing performed on a periodic basis */

	link_check_defragm_bufs(l_ptr);

	link_state_event(l_ptr, TIMEOUT_EVT);

	/* retry transmission of any packets held back by bearer congestion */
	if (l_ptr->next_out)
		tipc_link_push_queue(l_ptr);

	tipc_node_unlock(l_ptr->owner);
}
382
/* (Re)arm the link supervision timer; 'time' is in the units expected by k_start_timer() */
static void link_set_timer(struct link *l_ptr, u32 time)
{
	k_start_timer(&l_ptr->timer, time);
}
387
388/**
389 * tipc_link_create - create a new link
390 * @b_ptr: pointer to associated bearer
391 * @peer: network address of node at other end of link
392 * @media_addr: media address to use when sending messages over link
393 *
394 * Returns pointer to link.
395 */
396
397struct link *tipc_link_create(struct bearer *b_ptr, const u32 peer,
398			      const struct tipc_media_addr *media_addr)
399{
400	struct link *l_ptr;
401	struct tipc_msg *msg;
402	char *if_name;
403
404	l_ptr = kzalloc(sizeof(*l_ptr), GFP_ATOMIC);
405	if (!l_ptr) {
406		warn("Link creation failed, no memory\n");
407		return NULL;
408	}
409
410	l_ptr->addr = peer;
411	if_name = strchr(b_ptr->publ.name, ':') + 1;
412	sprintf(l_ptr->name, "%u.%u.%u:%s-%u.%u.%u:",
413		tipc_zone(tipc_own_addr), tipc_cluster(tipc_own_addr),
414		tipc_node(tipc_own_addr),
415		if_name,
416		tipc_zone(peer), tipc_cluster(peer), tipc_node(peer));
417		/* note: peer i/f is appended to link name by reset/activate */
418	memcpy(&l_ptr->media_addr, media_addr, sizeof(*media_addr));
419	k_init_timer(&l_ptr->timer, (Handler)link_timeout, (unsigned long)l_ptr);
420	list_add_tail(&l_ptr->link_list, &b_ptr->links);
421	l_ptr->checkpoint = 1;
422	l_ptr->b_ptr = b_ptr;
423	link_set_supervision_props(l_ptr, b_ptr->media->tolerance);
424	l_ptr->state = RESET_UNKNOWN;
425
426	l_ptr->pmsg = (struct tipc_msg *)&l_ptr->proto_msg;
427	msg = l_ptr->pmsg;
428	msg_init(msg, LINK_PROTOCOL, RESET_MSG, TIPC_OK, INT_H_SIZE, l_ptr->addr);
429	msg_set_size(msg, sizeof(l_ptr->proto_msg));
430	msg_set_session(msg, tipc_random);
431	msg_set_bearer_id(msg, b_ptr->identity);
432	strcpy((char *)msg_data(msg), if_name);
433
434	l_ptr->priority = b_ptr->priority;
435	tipc_link_set_queue_limits(l_ptr, b_ptr->media->window);
436
437	link_init_max_pkt(l_ptr);
438
439	l_ptr->next_out_no = 1;
440	INIT_LIST_HEAD(&l_ptr->waiting_ports);
441
442	link_reset_statistics(l_ptr);
443
444	l_ptr->owner = tipc_node_attach_link(l_ptr);
445	if (!l_ptr->owner) {
446		kfree(l_ptr);
447		return NULL;
448	}
449
450	if (LINK_LOG_BUF_SIZE) {
451		char *pb = kmalloc(LINK_LOG_BUF_SIZE, GFP_ATOMIC);
452
453		if (!pb) {
454			kfree(l_ptr);
455			warn("Link creation failed, no memory for print buffer\n");
456			return NULL;
457		}
458		tipc_printbuf_init(&l_ptr->print_buf, pb, LINK_LOG_BUF_SIZE);
459	}
460
461	tipc_k_signal((Handler)tipc_link_start, (unsigned long)l_ptr);
462
463	dbg("tipc_link_create(): tolerance = %u,cont intv = %u, abort_limit = %u\n",
464	    l_ptr->tolerance, l_ptr->continuity_interval, l_ptr->abort_limit);
465
466	return l_ptr;
467}
468
469/**
470 * tipc_link_delete - delete a link
471 * @l_ptr: pointer to link
472 *
473 * Note: 'tipc_net_lock' is write_locked, bearer is locked.
474 * This routine must not grab the node lock until after link timer cancellation
475 * to avoid a potential deadlock situation.
476 */
477
void tipc_link_delete(struct link *l_ptr)
{
	if (!l_ptr) {
		err("Attempt to delete non-existent link\n");
		return;
	}

	dbg("tipc_link_delete()\n");

	/* cancel timer BEFORE taking the node lock (see header comment):
	   link_timeout() grabs the node lock, so the reverse order could
	   deadlock */
	k_cancel_timer(&l_ptr->timer);

	tipc_node_lock(l_ptr->owner);
	tipc_link_reset(l_ptr);
	tipc_node_detach_link(l_ptr->owner, l_ptr);
	tipc_link_stop(l_ptr);
	list_del_init(&l_ptr->link_list);
	if (LINK_LOG_BUF_SIZE)
		kfree(l_ptr->print_buf.buf);
	tipc_node_unlock(l_ptr->owner);
	/* release timer resources only after all users of the link are gone */
	k_term_timer(&l_ptr->timer);
	kfree(l_ptr);
}
500
501void tipc_link_start(struct link *l_ptr)
502{
503	dbg("tipc_link_start %x\n", l_ptr);
504	link_state_event(l_ptr, STARTING_EVT);
505}
506
507/**
508 * link_schedule_port - schedule port for deferred sending
509 * @l_ptr: pointer to link
510 * @origport: reference to sending port
511 * @sz: amount of data to be sent
512 *
513 * Schedules port for renewed sending of messages after link congestion
514 * has abated.
515 */
516
517static int link_schedule_port(struct link *l_ptr, u32 origport, u32 sz)
518{
519	struct port *p_ptr;
520
521	spin_lock_bh(&tipc_port_list_lock);
522	p_ptr = tipc_port_lock(origport);
523	if (p_ptr) {
524		if (!p_ptr->wakeup)
525			goto exit;
526		if (!list_empty(&p_ptr->wait_list))
527			goto exit;
528		p_ptr->congested_link = l_ptr;
529		p_ptr->publ.congested = 1;
530		p_ptr->waiting_pkts = 1 + ((sz - 1) / link_max_pkt(l_ptr));
531		list_add_tail(&p_ptr->wait_list, &l_ptr->waiting_ports);
532		l_ptr->stats.link_congs++;
533exit:
534		tipc_port_unlock(p_ptr);
535	}
536	spin_unlock_bh(&tipc_port_list_lock);
537	return -ELINKCONG;
538}
539
/* Wake ports waiting on link congestion; 'all' forces waking every waiter */
void tipc_link_wakeup_ports(struct link *l_ptr, int all)
{
	struct port *p_ptr;
	struct port *temp_p_ptr;
	/* number of packets the send queue can currently accept */
	int win = l_ptr->queue_limit[0] - l_ptr->out_queue_size;

	if (all)
		win = 100000;	/* effectively unlimited */
	if (win <= 0)
		return;
	/* trylock: give up rather than risk deadlock/blocking here;
	   a later wakeup attempt will retry */
	if (!spin_trylock_bh(&tipc_port_list_lock))
		return;
	if (link_congested(l_ptr))
		goto exit;
	list_for_each_entry_safe(p_ptr, temp_p_ptr, &l_ptr->waiting_ports,
				 wait_list) {
		if (win <= 0)
			break;
		list_del_init(&p_ptr->wait_list);
		p_ptr->congested_link = NULL;
		spin_lock_bh(p_ptr->publ.lock);
		p_ptr->publ.congested = 0;
		p_ptr->wakeup(&p_ptr->publ);
		/* charge the port's pending packets against remaining window */
		win -= p_ptr->waiting_pkts;
		spin_unlock_bh(p_ptr->publ.lock);
	}

exit:
	spin_unlock_bh(&tipc_port_list_lock);
}
570
571/**
572 * link_release_outqueue - purge link's outbound message queue
573 * @l_ptr: pointer to link
574 */
575
576static void link_release_outqueue(struct link *l_ptr)
577{
578	struct sk_buff *buf = l_ptr->first_out;
579	struct sk_buff *next;
580
581	while (buf) {
582		next = buf->next;
583		buf_discard(buf);
584		buf = next;
585	}
586	l_ptr->first_out = NULL;
587	l_ptr->out_queue_size = 0;
588}
589
590/**
591 * tipc_link_reset_fragments - purge link's inbound message fragments queue
592 * @l_ptr: pointer to link
593 */
594
595void tipc_link_reset_fragments(struct link *l_ptr)
596{
597	struct sk_buff *buf = l_ptr->defragm_buf;
598	struct sk_buff *next;
599
600	while (buf) {
601		next = buf->next;
602		buf_discard(buf);
603		buf = next;
604	}
605	l_ptr->defragm_buf = NULL;
606}
607
608/**
609 * tipc_link_stop - purge all inbound and outbound messages associated with link
610 * @l_ptr: pointer to link
611 */
612
613void tipc_link_stop(struct link *l_ptr)
614{
615	struct sk_buff *buf;
616	struct sk_buff *next;
617
618	buf = l_ptr->oldest_deferred_in;
619	while (buf) {
620		next = buf->next;
621		buf_discard(buf);
622		buf = next;
623	}
624
625	buf = l_ptr->first_out;
626	while (buf) {
627		next = buf->next;
628		buf_discard(buf);
629		buf = next;
630	}
631
632	tipc_link_reset_fragments(l_ptr);
633
634	buf_discard(l_ptr->proto_msg_queue);
635	l_ptr->proto_msg_queue = NULL;
636}
637
638
639#define link_send_event(fcn, l_ptr, up) do { } while (0)
640
641
/* Reset link to RESET_UNKNOWN state, purging queues and notifying peers.
   Caller holds the owning node's lock. */
void tipc_link_reset(struct link *l_ptr)
{
	struct sk_buff *buf;
	u32 prev_state = l_ptr->state;
	u32 checkpoint = l_ptr->next_in_no;
	int was_active_link = tipc_link_is_active(l_ptr);

	/* bump session so the peer can tell a restarted link from the old one */
	msg_set_session(l_ptr->pmsg, msg_session(l_ptr->pmsg) + 1);

	/* Link is down, accept any session: */
	l_ptr->peer_session = 0;

	/* Prepare for max packet size negotiation */
	link_init_max_pkt(l_ptr);

	l_ptr->state = RESET_UNKNOWN;
	dbg_link_state("Resetting Link\n");

	/* if link was already down, there is nothing more to clean up */
	if ((prev_state == RESET_UNKNOWN) || (prev_state == RESET_RESET))
		return;

	tipc_node_link_down(l_ptr->owner, l_ptr);
	tipc_bearer_remove_dest(l_ptr->b_ptr, l_ptr->addr);
	/* if traffic can fail over to another working link, arm the
	   changeover protocol so tunneled packets are accepted */
	if (was_active_link && tipc_node_has_active_links(l_ptr->owner) &&
	    l_ptr->owner->permit_changeover) {
		l_ptr->reset_checkpoint = checkpoint;
		l_ptr->exp_msg_count = START_CHANGEOVER;
	}

	/* Clean up all queues: */

	link_release_outqueue(l_ptr);
	buf_discard(l_ptr->proto_msg_queue);
	l_ptr->proto_msg_queue = NULL;
	buf = l_ptr->oldest_deferred_in;
	while (buf) {
		struct sk_buff *next = buf->next;
		buf_discard(buf);
		buf = next;
	}
	if (!list_empty(&l_ptr->waiting_ports))
		tipc_link_wakeup_ports(l_ptr, 1);

	/* reinitialize all sequencing/window state for a fresh session */
	l_ptr->retransm_queue_head = 0;
	l_ptr->retransm_queue_size = 0;
	l_ptr->last_out = NULL;
	l_ptr->first_out = NULL;
	l_ptr->next_out = NULL;
	l_ptr->unacked_window = 0;
	l_ptr->checkpoint = 1;
	l_ptr->next_out_no = 1;
	l_ptr->deferred_inqueue_sz = 0;
	l_ptr->oldest_deferred_in = NULL;
	l_ptr->newest_deferred_in = NULL;
	l_ptr->fsm_msg_cnt = 0;
	l_ptr->stale_count = 0;
	link_reset_statistics(l_ptr);

	link_send_event(tipc_cfg_link_event, l_ptr, 0);
	if (!in_own_cluster(l_ptr->addr))
		link_send_event(tipc_disc_link_event, l_ptr, 0);
}
704
705
706static void link_activate(struct link *l_ptr)
707{
708	l_ptr->next_in_no = l_ptr->stats.recv_info = 1;
709	tipc_node_link_up(l_ptr->owner, l_ptr);
710	tipc_bearer_add_dest(l_ptr->b_ptr, l_ptr->addr);
711	link_send_event(tipc_cfg_link_event, l_ptr, 1);
712	if (!in_own_cluster(l_ptr->addr))
713		link_send_event(tipc_disc_link_event, l_ptr, 1);
714}
715
716/**
717 * link_state_event - link finite state machine
718 * @l_ptr: pointer to link
719 * @event: state machine event to process
720 */
721
static void link_state_event(struct link *l_ptr, unsigned event)
{
	struct link *other;
	u32 cont_intv = l_ptr->continuity_interval;

	if (!l_ptr->started && (event != STARTING_EVT))
		return;		/* Not yet. */

	/* while changeover is in progress, only keep the timer running */
	if (link_blocked(l_ptr)) {
		if (event == TIMEOUT_EVT) {
			link_set_timer(l_ptr, cont_intv);
		}
		return;	  /* Changeover going on */
	}
	dbg_link("STATE_EV: <%s> ", l_ptr->name);

	switch (l_ptr->state) {
	case WORKING_WORKING:
		/* link is up and peer traffic was seen in the last interval */
		dbg_link("WW/");
		switch (event) {
		case TRAFFIC_MSG_EVT:
			dbg_link("TRF-");
			/* fall through */
		case ACTIVATE_MSG:
			dbg_link("ACT\n");
			break;
		case TIMEOUT_EVT:
			dbg_link("TIM ");
			if (l_ptr->next_in_no != l_ptr->checkpoint) {
				/* traffic arrived since last check: stay WW */
				l_ptr->checkpoint = l_ptr->next_in_no;
				if (tipc_bclink_acks_missing(l_ptr->owner)) {
					tipc_link_send_proto_msg(l_ptr, STATE_MSG,
								 0, 0, 0, 0, 0);
					l_ptr->fsm_msg_cnt++;
				} else if (l_ptr->max_pkt < l_ptr->max_pkt_target) {
					/* probe=1: continue packet size negotiation */
					tipc_link_send_proto_msg(l_ptr, STATE_MSG,
								 1, 0, 0, 0, 0);
					l_ptr->fsm_msg_cnt++;
				}
				link_set_timer(l_ptr, cont_intv);
				break;
			}
			/* no traffic seen: probe peer at 4x rate */
			dbg_link(" -> WU\n");
			l_ptr->state = WORKING_UNKNOWN;
			l_ptr->fsm_msg_cnt = 0;
			tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
			l_ptr->fsm_msg_cnt++;
			link_set_timer(l_ptr, cont_intv / 4);
			break;
		case RESET_MSG:
			dbg_link("RES -> RR\n");
			info("Resetting link <%s>, requested by peer\n",
			     l_ptr->name);
			tipc_link_reset(l_ptr);
			l_ptr->state = RESET_RESET;
			l_ptr->fsm_msg_cnt = 0;
			tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 0, 0, 0, 0, 0);
			l_ptr->fsm_msg_cnt++;
			link_set_timer(l_ptr, cont_intv);
			break;
		default:
			err("Unknown link event %u in WW state\n", event);
		}
		break;
	case WORKING_UNKNOWN:
		/* link is up but peer has been silent; probing */
		dbg_link("WU/");
		switch (event) {
		case TRAFFIC_MSG_EVT:
			dbg_link("TRF-");
			/* fall through */
		case ACTIVATE_MSG:
			dbg_link("ACT -> WW\n");
			l_ptr->state = WORKING_WORKING;
			l_ptr->fsm_msg_cnt = 0;
			link_set_timer(l_ptr, cont_intv);
			break;
		case RESET_MSG:
			dbg_link("RES -> RR\n");
			info("Resetting link <%s>, requested by peer "
			     "while probing\n", l_ptr->name);
			tipc_link_reset(l_ptr);
			l_ptr->state = RESET_RESET;
			l_ptr->fsm_msg_cnt = 0;
			tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 0, 0, 0, 0, 0);
			l_ptr->fsm_msg_cnt++;
			link_set_timer(l_ptr, cont_intv);
			break;
		case TIMEOUT_EVT:
			dbg_link("TIM ");
			if (l_ptr->next_in_no != l_ptr->checkpoint) {
				/* peer answered a probe: back to WW */
				dbg_link("-> WW \n");
				l_ptr->state = WORKING_WORKING;
				l_ptr->fsm_msg_cnt = 0;
				l_ptr->checkpoint = l_ptr->next_in_no;
				if (tipc_bclink_acks_missing(l_ptr->owner)) {
					tipc_link_send_proto_msg(l_ptr, STATE_MSG,
								 0, 0, 0, 0, 0);
					l_ptr->fsm_msg_cnt++;
				}
				link_set_timer(l_ptr, cont_intv);
			} else if (l_ptr->fsm_msg_cnt < l_ptr->abort_limit) {
				/* still silent but probes remain: keep probing */
				dbg_link("Probing %u/%u,timer = %u ms)\n",
					 l_ptr->fsm_msg_cnt, l_ptr->abort_limit,
					 cont_intv / 4);
				tipc_link_send_proto_msg(l_ptr, STATE_MSG,
							 1, 0, 0, 0, 0);
				l_ptr->fsm_msg_cnt++;
				link_set_timer(l_ptr, cont_intv / 4);
			} else {	/* Link has failed */
				dbg_link("-> RU (%u probes unanswered)\n",
					 l_ptr->fsm_msg_cnt);
				warn("Resetting link <%s>, peer not responding\n",
				     l_ptr->name);
				tipc_link_reset(l_ptr);
				l_ptr->state = RESET_UNKNOWN;
				l_ptr->fsm_msg_cnt = 0;
				tipc_link_send_proto_msg(l_ptr, RESET_MSG,
							 0, 0, 0, 0, 0);
				l_ptr->fsm_msg_cnt++;
				link_set_timer(l_ptr, cont_intv);
			}
			break;
		default:
			err("Unknown link event %u in WU state\n", event);
		}
		break;
	case RESET_UNKNOWN:
		/* link is down; peer state unknown */
		dbg_link("RU/");
		switch (event) {
		case TRAFFIC_MSG_EVT:
			dbg_link("TRF-\n");
			break;
		case ACTIVATE_MSG:
			/* defer if the other active link is still being probed */
			other = l_ptr->owner->active_links[0];
			if (other && link_working_unknown(other)) {
				dbg_link("ACT\n");
				break;
			}
			dbg_link("ACT -> WW\n");
			l_ptr->state = WORKING_WORKING;
			l_ptr->fsm_msg_cnt = 0;
			link_activate(l_ptr);
			tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
			l_ptr->fsm_msg_cnt++;
			link_set_timer(l_ptr, cont_intv);
			break;
		case RESET_MSG:
			dbg_link("RES \n");
			dbg_link(" -> RR\n");
			l_ptr->state = RESET_RESET;
			l_ptr->fsm_msg_cnt = 0;
			tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 1, 0, 0, 0, 0);
			l_ptr->fsm_msg_cnt++;
			link_set_timer(l_ptr, cont_intv);
			break;
		case STARTING_EVT:
			dbg_link("START-");
			l_ptr->started = 1;
			/* fall through */
		case TIMEOUT_EVT:
			dbg_link("TIM \n");
			tipc_link_send_proto_msg(l_ptr, RESET_MSG, 0, 0, 0, 0, 0);
			l_ptr->fsm_msg_cnt++;
			link_set_timer(l_ptr, cont_intv);
			break;
		default:
			err("Unknown link event %u in RU state\n", event);
		}
		break;
	case RESET_RESET:
		/* link is down; peer has acknowledged our reset */
		dbg_link("RR/ ");
		switch (event) {
		case TRAFFIC_MSG_EVT:
			dbg_link("TRF-");
			/* fall through */
		case ACTIVATE_MSG:
			/* defer if the other active link is still being probed */
			other = l_ptr->owner->active_links[0];
			if (other && link_working_unknown(other)) {
				dbg_link("ACT\n");
				break;
			}
			dbg_link("ACT -> WW\n");
			l_ptr->state = WORKING_WORKING;
			l_ptr->fsm_msg_cnt = 0;
			link_activate(l_ptr);
			tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
			l_ptr->fsm_msg_cnt++;
			link_set_timer(l_ptr, cont_intv);
			break;
		case RESET_MSG:
			dbg_link("RES\n");
			break;
		case TIMEOUT_EVT:
			dbg_link("TIM\n");
			tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 0, 0, 0, 0, 0);
			l_ptr->fsm_msg_cnt++;
			link_set_timer(l_ptr, cont_intv);
			dbg_link("fsm_msg_cnt %u\n", l_ptr->fsm_msg_cnt);
			break;
		default:
			err("Unknown link event %u in RR state\n", event);
		}
		break;
	default:
		err("Unknown link state %u/%u\n", l_ptr->state, event);
	}
}
928
929/*
930 * link_bundle_buf(): Append contents of a buffer to
931 * the tail of an existing one.
932 */
933
934static int link_bundle_buf(struct link *l_ptr,
935			   struct sk_buff *bundler,
936			   struct sk_buff *buf)
937{
938	struct tipc_msg *bundler_msg = buf_msg(bundler);
939	struct tipc_msg *msg = buf_msg(buf);
940	u32 size = msg_size(msg);
941	u32 bundle_size = msg_size(bundler_msg);
942	u32 to_pos = align(bundle_size);
943	u32 pad = to_pos - bundle_size;
944
945	if (msg_user(bundler_msg) != MSG_BUNDLER)
946		return 0;
947	if (msg_type(bundler_msg) != OPEN_MSG)
948		return 0;
949	if (skb_tailroom(bundler) < (pad + size))
950		return 0;
951	if (link_max_pkt(l_ptr) < (to_pos + size))
952		return 0;
953
954	skb_put(bundler, pad + size);
955	skb_copy_to_linear_data_offset(bundler, to_pos, buf->data, size);
956	msg_set_size(bundler_msg, to_pos + size);
957	msg_set_msgcnt(bundler_msg, msg_msgcnt(bundler_msg) + 1);
958	dbg("Packed msg # %u(%u octets) into pos %u in buf(#%u)\n",
959	    msg_msgcnt(bundler_msg), size, to_pos, msg_seqno(bundler_msg));
960	msg_dbg(msg, "PACKD:");
961	buf_discard(buf);
962	l_ptr->stats.sent_bundled++;
963	return 1;
964}
965
966static void link_add_to_outqueue(struct link *l_ptr,
967				 struct sk_buff *buf,
968				 struct tipc_msg *msg)
969{
970	u32 ack = mod(l_ptr->next_in_no - 1);
971	u32 seqno = mod(l_ptr->next_out_no++);
972
973	msg_set_word(msg, 2, ((ack << 16) | seqno));
974	msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
975	buf->next = NULL;
976	if (l_ptr->first_out) {
977		l_ptr->last_out->next = buf;
978		l_ptr->last_out = buf;
979	} else
980		l_ptr->first_out = l_ptr->last_out = buf;
981	l_ptr->out_queue_size++;
982}
983
984/*
985 * tipc_link_send_buf() is the 'full path' for messages, called from
986 * inside TIPC when the 'fast path' in tipc_send_buf
987 * has failed, and from link_send()
988 */
989
int tipc_link_send_buf(struct link *l_ptr, struct sk_buff *buf)
{
	struct tipc_msg *msg = buf_msg(buf);
	u32 size = msg_size(msg);
	u32 dsz = msg_data_sz(msg);
	u32 queue_size = l_ptr->out_queue_size;
	u32 imp = msg_tot_importance(msg);
	u32 queue_limit = l_ptr->queue_limit[imp];	/* limit depends on importance */
	u32 max_packet = link_max_pkt(l_ptr);

	msg_set_prevnode(msg, tipc_own_addr);	/* If routed message */

	/* Match msg importance against queue limits: */

	if (unlikely(queue_size >= queue_limit)) {
		if (imp <= TIPC_CRITICAL_IMPORTANCE) {
			/* sender port will be woken when congestion abates */
			return link_schedule_port(l_ptr, msg_origport(msg),
						  size);
		}
		msg_dbg(msg, "TIPC: Congestion, throwing away\n");
		buf_discard(buf);
		if (imp > CONN_MANAGER) {
			warn("Resetting link <%s>, send queue full", l_ptr->name);
			tipc_link_reset(l_ptr);
		}
		/* note: a positive return value here still reports "sent" */
		return dsz;
	}

	/* Fragmentation needed ? */

	if (size > max_packet)
		return tipc_link_send_long_buf(l_ptr, buf);

	/* Packet can be queued or sent: */

	if (queue_size > l_ptr->stats.max_queue_sz)
		l_ptr->stats.max_queue_sz = queue_size;

	if (likely(!tipc_bearer_congested(l_ptr->b_ptr, l_ptr) &&
		   !link_congested(l_ptr))) {
		link_add_to_outqueue(l_ptr, buf, msg);

		if (likely(tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr))) {
			l_ptr->unacked_window = 0;
		} else {
			/* bearer rejected packet: leave it queued and retry later */
			tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
			l_ptr->stats.bearer_congs++;
			l_ptr->next_out = buf;
		}
		return dsz;
	}
	/* Congestion: can message be bundled ?: */

	if ((msg_user(msg) != CHANGEOVER_PROTOCOL) &&
	    (msg_user(msg) != MSG_FRAGMENTER)) {

		/* Try adding message to an existing bundle */

		if (l_ptr->next_out &&
		    link_bundle_buf(l_ptr, l_ptr->last_out, buf)) {
			tipc_bearer_resolve_congestion(l_ptr->b_ptr, l_ptr);
			return dsz;
		}

		/* Try creating a new bundle */

		/* only worthwhile if the message leaves room for at least
		   one more in a max-size packet */
		if (size <= max_packet * 2 / 3) {
			struct sk_buff *bundler = buf_acquire(max_packet);
			struct tipc_msg bundler_hdr;

			if (bundler) {
				msg_init(&bundler_hdr, MSG_BUNDLER, OPEN_MSG,
					 TIPC_OK, INT_H_SIZE, l_ptr->addr);
				skb_copy_to_linear_data(bundler, &bundler_hdr,
							INT_H_SIZE);
				skb_trim(bundler, INT_H_SIZE);
				link_bundle_buf(l_ptr, bundler, buf);
				/* queue the bundle instead of the original */
				buf = bundler;
				msg = buf_msg(buf);
				l_ptr->stats.sent_bundles++;
			}
		}
	}
	if (!l_ptr->next_out)
		l_ptr->next_out = buf;
	link_add_to_outqueue(l_ptr, buf, msg);
	tipc_bearer_resolve_congestion(l_ptr->b_ptr, l_ptr);
	return dsz;
}
1079
1080/*
1081 * tipc_link_send(): same as tipc_link_send_buf(), but the link to use has
1082 * not been selected yet, and the the owner node is not locked
1083 * Called by TIPC internal users, e.g. the name distributor
1084 */
1085
1086int tipc_link_send(struct sk_buff *buf, u32 dest, u32 selector)
1087{
1088	struct link *l_ptr;
1089	struct node *n_ptr;
1090	int res = -ELINKCONG;
1091
1092	read_lock_bh(&tipc_net_lock);
1093	n_ptr = tipc_node_select(dest, selector);
1094	if (n_ptr) {
1095		tipc_node_lock(n_ptr);
1096		l_ptr = n_ptr->active_links[selector & 1];
1097		if (l_ptr) {
1098			dbg("tipc_link_send: found link %x for dest %x\n", l_ptr, dest);
1099			res = tipc_link_send_buf(l_ptr, buf);
1100		} else {
1101			dbg("Attempt to send msg to unreachable node:\n");
1102			msg_dbg(buf_msg(buf),">>>");
1103			buf_discard(buf);
1104		}
1105		tipc_node_unlock(n_ptr);
1106	} else {
1107		dbg("Attempt to send msg to unknown node:\n");
1108		msg_dbg(buf_msg(buf),">>>");
1109		buf_discard(buf);
1110	}
1111	read_unlock_bh(&tipc_net_lock);
1112	return res;
1113}
1114
1115/*
1116 * link_send_buf_fast: Entry for data messages where the
1117 * destination link is known and the header is complete,
1118 * inclusive total message length. Very time critical.
1119 * Link is locked. Returns user data length.
1120 */
1121
static int link_send_buf_fast(struct link *l_ptr, struct sk_buff *buf,
			      u32 *used_max_pkt)
{
	struct tipc_msg *msg = buf_msg(buf);
	int res = msg_data_sz(msg);	/* user data length returned on success */

	if (likely(!link_congested(l_ptr))) {
		if (likely(msg_size(msg) <= link_max_pkt(l_ptr))) {
			if (likely(list_empty(&l_ptr->b_ptr->cong_links))) {
				/* Fast path: append to out queue and push to
				 * the bearer immediately
				 */
				link_add_to_outqueue(l_ptr, buf, msg);
				if (likely(tipc_bearer_send(l_ptr->b_ptr, buf,
							    &l_ptr->media_addr))) {
					l_ptr->unacked_window = 0;
					msg_dbg(msg,"SENT_FAST:");
					return res;
				}
				dbg("failed sent fast...\n");
				/* Bearer refused the packet: record congestion
				 * and leave the buffer as first unsent packet
				 */
				tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
				l_ptr->stats.bearer_congs++;
				l_ptr->next_out = buf;
				return res;
			}
		}
		else
			/* Message exceeds link MTU: report the usable size so
			 * the caller can rebuild or fragment the message
			 */
			*used_max_pkt = link_max_pkt(l_ptr);
	}
	return tipc_link_send_buf(l_ptr, buf);  /* All other cases */
}
1150
1151/*
1152 * tipc_send_buf_fast: Entry for data messages where the
1153 * destination node is known and the header is complete,
1154 * inclusive total message length.
1155 * Returns user data length.
1156 */
1157int tipc_send_buf_fast(struct sk_buff *buf, u32 destnode)
1158{
1159	struct link *l_ptr;
1160	struct node *n_ptr;
1161	int res;
1162	u32 selector = msg_origport(buf_msg(buf)) & 1;
1163	u32 dummy;
1164
1165	if (destnode == tipc_own_addr)
1166		return tipc_port_recv_msg(buf);
1167
1168	read_lock_bh(&tipc_net_lock);
1169	n_ptr = tipc_node_select(destnode, selector);
1170	if (likely(n_ptr)) {
1171		tipc_node_lock(n_ptr);
1172		l_ptr = n_ptr->active_links[selector];
1173		dbg("send_fast: buf %x selected %x, destnode = %x\n",
1174		    buf, l_ptr, destnode);
1175		if (likely(l_ptr)) {
1176			res = link_send_buf_fast(l_ptr, buf, &dummy);
1177			tipc_node_unlock(n_ptr);
1178			read_unlock_bh(&tipc_net_lock);
1179			return res;
1180		}
1181		tipc_node_unlock(n_ptr);
1182	}
1183	read_unlock_bh(&tipc_net_lock);
1184	res = msg_data_sz(buf_msg(buf));
1185	tipc_reject_msg(buf, TIPC_ERR_NO_NODE);
1186	return res;
1187}
1188
1189
1190/*
1191 * tipc_link_send_sections_fast: Entry for messages where the
1192 * destination processor is known and the header is complete,
1193 * except for total message length.
1194 * Returns user data length or errno.
1195 */
int tipc_link_send_sections_fast(struct port *sender,
				 struct iovec const *msg_sect,
				 const u32 num_sect,
				 u32 destaddr)
{
	struct tipc_msg *hdr = &sender->publ.phdr;
	struct link *l_ptr;
	struct sk_buff *buf;
	struct node *node;
	int res;
	u32 selector = msg_origport(hdr) & 1;

again:
	/*
	 * Try building message using port's max_pkt hint.
	 * (Must not hold any locks while building message.)
	 */

	res = msg_build(hdr, msg_sect, num_sect, sender->max_pkt,
			!sender->user_port, &buf);

	read_lock_bh(&tipc_net_lock);
	node = tipc_node_select(destaddr, selector);
	if (likely(node)) {
		tipc_node_lock(node);
		l_ptr = node->active_links[selector];
		if (likely(l_ptr)) {
			if (likely(buf)) {
				/* Build succeeded: try the single-packet
				 * fast path; it also refreshes the port's
				 * max_pkt hint on MTU mismatch
				 */
				res = link_send_buf_fast(l_ptr, buf,
							 &sender->max_pkt);
				if (unlikely(res < 0))
					buf_discard(buf);
exit:
				tipc_node_unlock(node);
				read_unlock_bh(&tipc_net_lock);
				return res;
			}

			/* Exit if build request was invalid */

			if (unlikely(res < 0))
				goto exit;

			/* Exit if link (or bearer) is congested */

			if (link_congested(l_ptr) ||
			    !list_empty(&l_ptr->b_ptr->cong_links)) {
				res = link_schedule_port(l_ptr,
							 sender->publ.ref, res);
				goto exit;
			}

			/*
			 * Message size exceeds max_pkt hint; update hint,
			 * then re-try fast path or fragment the message
			 * (here buf == NULL and res holds the data size
			 * reported by msg_build — presumably >= 0; confirm
			 * against msg_build's contract)
			 */

			sender->max_pkt = link_max_pkt(l_ptr);
			tipc_node_unlock(node);
			read_unlock_bh(&tipc_net_lock);


			if ((msg_hdr_sz(hdr) + res) <= sender->max_pkt)
				goto again;

			return link_send_sections_long(sender, msg_sect,
						       num_sect, destaddr);
		}
		tipc_node_unlock(node);
	}
	read_unlock_bh(&tipc_net_lock);

	/* Couldn't find a link to the destination node */

	if (buf)
		return tipc_reject_msg(buf, TIPC_ERR_NO_NODE);
	if (res >= 0)
		return tipc_port_reject_sections(sender, hdr, msg_sect, num_sect,
						 TIPC_ERR_NO_NODE);
	return res;
}
1277
1278/*
1279 * link_send_sections_long(): Entry for long messages where the
1280 * destination node is known and the header is complete,
1281 * inclusive total message length.
1282 * Link and bearer congestion status have been checked to be ok,
1283 * and are ignored if they change.
1284 *
1285 * Note that fragments do not use the full link MTU so that they won't have
1286 * to undergo refragmentation if link changeover causes them to be sent
1287 * over another link with an additional tunnel header added as prefix.
1288 * (Refragmentation will still occur if the other link has a smaller MTU.)
1289 *
1290 * Returns user data length or errno.
1291 */
static int link_send_sections_long(struct port *sender,
				   struct iovec const *msg_sect,
				   u32 num_sect,
				   u32 destaddr)
{
	struct link *l_ptr;
	struct node *node;
	struct tipc_msg *hdr = &sender->publ.phdr;
	u32 dsz = msg_data_sz(hdr);	/* total user data length */
	u32 max_pkt,fragm_sz,rest;
	struct tipc_msg fragm_hdr;	/* reusable header for all fragments */
	struct sk_buff *buf,*buf_chain,*prev;
	u32 fragm_crs,fragm_rest,hsz,sect_rest;
	const unchar *sect_crs;		/* read cursor within current section */
	int curr_sect;
	u32 fragm_no;

again:
	fragm_no = 1;
	max_pkt = sender->max_pkt - INT_H_SIZE;
		/* leave room for tunnel header in case of link changeover */
	fragm_sz = max_pkt - INT_H_SIZE;
		/* leave room for fragmentation header in each fragment */
	rest = dsz;		/* user data still to be copied */
	fragm_crs = 0;		/* write cursor within current fragment */
	fragm_rest = 0;		/* space left in current fragment */
	sect_rest = 0;		/* bytes left in current iovec section */
	sect_crs = NULL;
	curr_sect = -1;

	/* Prepare reusable fragment header: */

	msg_dbg(hdr, ">FRAGMENTING>");
	msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT,
		 TIPC_OK, INT_H_SIZE, msg_destnode(hdr));
	msg_set_link_selector(&fragm_hdr, sender->publ.ref);
	msg_set_size(&fragm_hdr, max_pkt);
	msg_set_fragm_no(&fragm_hdr, 1);

	/* Prepare header of first fragment: */

	buf_chain = buf = buf_acquire(max_pkt);
	if (!buf)
		return -ENOMEM;
	buf->next = NULL;
	skb_copy_to_linear_data(buf, &fragm_hdr, INT_H_SIZE);
	hsz = msg_hdr_sz(hdr);
	/* First fragment carries the original message header as payload */
	skb_copy_to_linear_data_offset(buf, INT_H_SIZE, hdr, hsz);
	msg_dbg(buf_msg(buf), ">BUILD>");

	/* Chop up message: */

	fragm_crs = INT_H_SIZE + hsz;
	fragm_rest = fragm_sz - hsz;

	do {		/* For all sections */
		u32 sz;

		if (!sect_rest) {
			/* Current section exhausted: advance to the next one */
			sect_rest = msg_sect[++curr_sect].iov_len;
			sect_crs = (const unchar *)msg_sect[curr_sect].iov_base;
		}

		/* Copy no more than what section and fragment both allow */
		if (sect_rest < fragm_rest)
			sz = sect_rest;
		else
			sz = fragm_rest;

		if (likely(!sender->user_port)) {
			/* Socket API path: section data lives in user space */
			if (copy_from_user(buf->data + fragm_crs, sect_crs, sz)) {
error:
				/* Release the entire partially-built chain */
				for (; buf_chain; buf_chain = buf) {
					buf = buf_chain->next;
					buf_discard(buf_chain);
				}
				return -EFAULT;
			}
		} else
			skb_copy_to_linear_data_offset(buf, fragm_crs,
						       sect_crs, sz);
		sect_crs += sz;
		sect_rest -= sz;
		fragm_crs += sz;
		fragm_rest -= sz;
		rest -= sz;

		if (!fragm_rest && rest) {

			/* Initiate new fragment: */
			if (rest <= fragm_sz) {
				fragm_sz = rest;
				msg_set_type(&fragm_hdr,LAST_FRAGMENT);
			} else {
				msg_set_type(&fragm_hdr, FRAGMENT);
			}
			msg_set_size(&fragm_hdr, fragm_sz + INT_H_SIZE);
			msg_set_fragm_no(&fragm_hdr, ++fragm_no);
			prev = buf;
			buf = buf_acquire(fragm_sz + INT_H_SIZE);
			if (!buf)
				goto error;

			buf->next = NULL;
			prev->next = buf;
			skb_copy_to_linear_data(buf, &fragm_hdr, INT_H_SIZE);
			fragm_crs = INT_H_SIZE;
			fragm_rest = fragm_sz;
			msg_dbg(buf_msg(buf),"  >BUILD>");
		}
	}
	while (rest > 0);

	/*
	 * Now we have a buffer chain. Select a link and check
	 * that packet size is still OK
	 */
	node = tipc_node_select(destaddr, sender->publ.ref & 1);
	if (likely(node)) {
		tipc_node_lock(node);
		l_ptr = node->active_links[sender->publ.ref & 1];
		if (!l_ptr) {
			tipc_node_unlock(node);
			goto reject;
		}
		if (link_max_pkt(l_ptr) < max_pkt) {
			/* Link MTU shrank while building: discard the chain,
			 * refresh the hint and refragment from scratch
			 */
			sender->max_pkt = link_max_pkt(l_ptr);
			tipc_node_unlock(node);
			for (; buf_chain; buf_chain = buf) {
				buf = buf_chain->next;
				buf_discard(buf_chain);
			}
			goto again;
		}
	} else {
reject:
		for (; buf_chain; buf_chain = buf) {
			buf = buf_chain->next;
			buf_discard(buf_chain);
		}
		return tipc_port_reject_sections(sender, hdr, msg_sect, num_sect,
						 TIPC_ERR_NO_NODE);
	}

	/* Append whole chain to send queue: */

	buf = buf_chain;
	l_ptr->long_msg_seq_no = mod(l_ptr->long_msg_seq_no + 1);
	if (!l_ptr->next_out)
		l_ptr->next_out = buf_chain;
	l_ptr->stats.sent_fragmented++;
	while (buf) {
		struct sk_buff *next = buf->next;
		struct tipc_msg *msg = buf_msg(buf);

		l_ptr->stats.sent_fragments++;
		/* All fragments of one message share the same long_msgno */
		msg_set_long_msgno(msg, l_ptr->long_msg_seq_no);
		link_add_to_outqueue(l_ptr, buf, msg);
		msg_dbg(msg, ">ADD>");
		buf = next;
	}

	/* Send it, if possible: */

	tipc_link_push_queue(l_ptr);
	tipc_node_unlock(node);
	return dsz;
}
1459
1460/*
1461 * tipc_link_push_packet: Push one unsent packet to the media
1462 */
/*
 * Returns TIPC_OK if a packet was pushed, PUSH_FAILED on bearer
 * congestion, or PUSH_FINISHED when there is nothing (more) to send.
 */
u32 tipc_link_push_packet(struct link *l_ptr)
{
	struct sk_buff *buf = l_ptr->first_out;
	u32 r_q_size = l_ptr->retransm_queue_size;
	u32 r_q_head = l_ptr->retransm_queue_head;

	/* Step to position where retransmission failed, if any,    */
	/* consider that buffers may have been released in meantime */

	if (r_q_size && buf) {
		u32 last = lesser(mod(r_q_head + r_q_size),
				  link_last_sent(l_ptr));
		u32 first = msg_seqno(buf_msg(buf));

		/* Skip packets already acknowledged and released */
		while (buf && less(first, r_q_head)) {
			first = mod(first + 1);
			buf = buf->next;
		}
		l_ptr->retransm_queue_head = r_q_head = first;
		l_ptr->retransm_queue_size = r_q_size = mod(last - first);
	}

	/* Continue retransmission now, if there is anything: */

	if (r_q_size && buf && !skb_cloned(buf)) {
		msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1));
		msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in);
		if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
			msg_dbg(buf_msg(buf), ">DEF-RETR>");
			l_ptr->retransm_queue_head = mod(++r_q_head);
			l_ptr->retransm_queue_size = --r_q_size;
			l_ptr->stats.retransmitted++;
			return TIPC_OK;
		} else {
			l_ptr->stats.bearer_congs++;
			msg_dbg(buf_msg(buf), "|>DEF-RETR>");
			return PUSH_FAILED;
		}
	}

	/* Send deferred protocol message, if any: */

	buf = l_ptr->proto_msg_queue;
	if (buf) {
		msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1));
		msg_set_bcast_ack(buf_msg(buf),l_ptr->owner->bclink.last_in);
		if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
			msg_dbg(buf_msg(buf), ">DEF-PROT>");
			l_ptr->unacked_window = 0;
			/* Protocol messages are not acknowledged, so the
			 * buffer can be released as soon as it is sent
			 */
			buf_discard(buf);
			l_ptr->proto_msg_queue = NULL;
			return TIPC_OK;
		} else {
			msg_dbg(buf_msg(buf), "|>DEF-PROT>");
			l_ptr->stats.bearer_congs++;
			return PUSH_FAILED;
		}
	}

	/* Send one deferred data message, if send window not full: */

	buf = l_ptr->next_out;
	if (buf) {
		struct tipc_msg *msg = buf_msg(buf);
		u32 next = msg_seqno(msg);
		u32 first = msg_seqno(buf_msg(l_ptr->first_out));

		/* queue_limit[0] holds the send window size */
		if (mod(next - first) < l_ptr->queue_limit[0]) {
			msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
			msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
			if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
				/* Once sent, a bundle accepts no more members */
				if (msg_user(msg) == MSG_BUNDLER)
					msg_set_type(msg, CLOSED_MSG);
				msg_dbg(msg, ">PUSH-DATA>");
				l_ptr->next_out = buf->next;
				return TIPC_OK;
			} else {
				msg_dbg(msg, "|PUSH-DATA|");
				l_ptr->stats.bearer_congs++;
				return PUSH_FAILED;
			}
		}
	}
	return PUSH_FINISHED;
}
1548
1549/*
1550 * push_queue(): push out the unsent messages of a link where
1551 *               congestion has abated. Node is locked
1552 */
1553void tipc_link_push_queue(struct link *l_ptr)
1554{
1555	u32 res;
1556
1557	if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr))
1558		return;
1559
1560	do {
1561		res = tipc_link_push_packet(l_ptr);
1562	}
1563	while (res == TIPC_OK);
1564	if (res == PUSH_FAILED)
1565		tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
1566}
1567
1568static void link_reset_all(unsigned long addr)
1569{
1570	struct node *n_ptr;
1571	char addr_string[16];
1572	u32 i;
1573
1574	read_lock_bh(&tipc_net_lock);
1575	n_ptr = tipc_node_find((u32)addr);
1576	if (!n_ptr) {
1577		read_unlock_bh(&tipc_net_lock);
1578		return;	/* node no longer exists */
1579	}
1580
1581	tipc_node_lock(n_ptr);
1582
1583	warn("Resetting all links to %s\n",
1584	     addr_string_fill(addr_string, n_ptr->addr));
1585
1586	for (i = 0; i < MAX_BEARERS; i++) {
1587		if (n_ptr->links[i]) {
1588			link_print(n_ptr->links[i], TIPC_OUTPUT,
1589				   "Resetting link\n");
1590			tipc_link_reset(n_ptr->links[i]);
1591		}
1592	}
1593
1594	tipc_node_unlock(n_ptr);
1595	read_unlock_bh(&tipc_net_lock);
1596}
1597
static void link_retransmit_failure(struct link *l_ptr, struct sk_buff *buf)
{
	struct tipc_msg *msg = buf_msg(buf);

	warn("Retransmission failure on link <%s>\n", l_ptr->name);
	tipc_msg_print(TIPC_OUTPUT, msg, ">RETR-FAIL>");

	if (l_ptr->addr) {

		/* Handle failure on standard link */

		link_print(l_ptr, TIPC_OUTPUT, "Resetting link\n");
		tipc_link_reset(l_ptr);

	} else {

		/* Handle failure on broadcast link */

		struct node *n_ptr;
		char addr_string[16];

		tipc_printf(TIPC_OUTPUT, "Msg seq number: %u,  ", msg_seqno(msg));
		tipc_printf(TIPC_OUTPUT, "Outstanding acks: %lu\n",
				     (unsigned long) TIPC_SKB_CB(buf)->handle);

		/* NOTE(review): uses owner->next rather than owner itself -
		 * presumably the node that has not yet acknowledged 'buf';
		 * confirm against the bclink acknowledgement logic
		 */
		n_ptr = l_ptr->owner->next;
		tipc_node_lock(n_ptr);

		/* Dump that node's broadcast-link state for diagnosis */
		addr_string_fill(addr_string, n_ptr->addr);
		tipc_printf(TIPC_OUTPUT, "Multicast link info for %s\n", addr_string);
		tipc_printf(TIPC_OUTPUT, "Supported: %d,  ", n_ptr->bclink.supported);
		tipc_printf(TIPC_OUTPUT, "Acked: %u\n", n_ptr->bclink.acked);
		tipc_printf(TIPC_OUTPUT, "Last in: %u,  ", n_ptr->bclink.last_in);
		tipc_printf(TIPC_OUTPUT, "Gap after: %u,  ", n_ptr->bclink.gap_after);
		tipc_printf(TIPC_OUTPUT, "Gap to: %u\n", n_ptr->bclink.gap_to);
		tipc_printf(TIPC_OUTPUT, "Nack sync: %u\n\n", n_ptr->bclink.nack_sync);

		/* Defer the reset: it needs locks this context cannot take */
		tipc_k_signal((Handler)link_reset_all, (unsigned long)n_ptr->addr);

		tipc_node_unlock(n_ptr);

		l_ptr->stale_count = 0;
	}
}
1642
/*
 * Retransmit up to 'retransmits' packets starting at 'buf'.
 * Node is locked.  Unsent packets are left for tipc_link_push_packet()
 * to pick up via the retransm_queue_head/size state.
 */
void tipc_link_retransmit(struct link *l_ptr, struct sk_buff *buf,
			  u32 retransmits)
{
	struct tipc_msg *msg;

	if (!buf)
		return;

	msg = buf_msg(buf);

	dbg("Retransmitting %u in link %x\n", retransmits, l_ptr);

	if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr)) {
		if (!skb_cloned(buf)) {
			/* Bearer congested: record where to resume instead
			 * of sending now
			 */
			msg_dbg(msg, ">NO_RETR->BCONG>");
			dbg_print_link(l_ptr, "   ");
			l_ptr->retransm_queue_head = msg_seqno(msg);
			l_ptr->retransm_queue_size = retransmits;
			return;
		} else {
			/* Don't retransmit if driver already has the buffer */
		}
	} else {
		/* Detect repeated retransmit failures on uncongested bearer */

		if (l_ptr->last_retransmitted == msg_seqno(msg)) {
			/* Same packet again: give up after 100 attempts */
			if (++l_ptr->stale_count > 100) {
				link_retransmit_failure(l_ptr, buf);
				return;
			}
		} else {
			l_ptr->last_retransmitted = msg_seqno(msg);
			l_ptr->stale_count = 1;
		}
	}

	/* Resend packets until the quota is used up, the unsent part of the
	 * queue is reached, or the driver still holds a buffer (cloned)
	 */
	while (retransmits && (buf != l_ptr->next_out) && buf && !skb_cloned(buf)) {
		msg = buf_msg(buf);
		msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
		msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
		if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
			msg_dbg(buf_msg(buf), ">RETR>");
			buf = buf->next;
			retransmits--;
			l_ptr->stats.retransmitted++;
		} else {
			/* Bearer became congested mid-run: save resume point */
			tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
			l_ptr->stats.bearer_congs++;
			l_ptr->retransm_queue_head = msg_seqno(buf_msg(buf));
			l_ptr->retransm_queue_size = retransmits;
			return;
		}
	}

	/* All requested packets were retransmitted */
	l_ptr->retransm_queue_head = l_ptr->retransm_queue_size = 0;
}
1699
1700/*
1701 * link_recv_non_seq: Receive packets which are outside
1702 *                    the link sequence flow
1703 */
1704
1705static void link_recv_non_seq(struct sk_buff *buf)
1706{
1707	struct tipc_msg *msg = buf_msg(buf);
1708
1709	if (msg_user(msg) ==  LINK_CONFIG)
1710		tipc_disc_recv_msg(buf);
1711	else
1712		tipc_bclink_recv_pkt(buf);
1713}
1714
1715/**
1716 * link_insert_deferred_queue - insert deferred messages back into receive chain
1717 */
1718
1719static struct sk_buff *link_insert_deferred_queue(struct link *l_ptr,
1720						  struct sk_buff *buf)
1721{
1722	u32 seq_no;
1723
1724	if (l_ptr->oldest_deferred_in == NULL)
1725		return buf;
1726
1727	seq_no = msg_seqno(buf_msg(l_ptr->oldest_deferred_in));
1728	if (seq_no == mod(l_ptr->next_in_no)) {
1729		l_ptr->newest_deferred_in->next = buf;
1730		buf = l_ptr->oldest_deferred_in;
1731		l_ptr->oldest_deferred_in = NULL;
1732		l_ptr->deferred_inqueue_sz = 0;
1733	}
1734	return buf;
1735}
1736
1737void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *tb_ptr)
1738{
1739	read_lock_bh(&tipc_net_lock);
1740	while (head) {
1741		struct bearer *b_ptr;
1742		struct node *n_ptr;
1743		struct link *l_ptr;
1744		struct sk_buff *crs;
1745		struct sk_buff *buf = head;
1746		struct tipc_msg *msg = buf_msg(buf);
1747		u32 seq_no = msg_seqno(msg);
1748		u32 ackd = msg_ack(msg);
1749		u32 released = 0;
1750		int type;
1751
1752		b_ptr = (struct bearer *)tb_ptr;
1753		TIPC_SKB_CB(buf)->handle = b_ptr;
1754
1755		head = head->next;
1756		if (unlikely(msg_version(msg) != TIPC_VERSION))
1757			goto cont;
1758			msg_dbg(msg,"<REC<");
1759
1760		if (unlikely(msg_non_seq(msg))) {
1761			link_recv_non_seq(buf);
1762			continue;
1763		}
1764
1765		if (unlikely(!msg_short(msg) &&
1766			     (msg_destnode(msg) != tipc_own_addr)))
1767			goto cont;
1768
1769		n_ptr = tipc_node_find(msg_prevnode(msg));
1770		if (unlikely(!n_ptr))
1771			goto cont;
1772
1773		tipc_node_lock(n_ptr);
1774		l_ptr = n_ptr->links[b_ptr->identity];
1775		if (unlikely(!l_ptr)) {
1776			tipc_node_unlock(n_ptr);
1777			goto cont;
1778		}
1779		/*
1780		 * Release acked messages
1781		 */
1782		if (less(n_ptr->bclink.acked, msg_bcast_ack(msg))) {
1783			if (tipc_node_is_up(n_ptr) && n_ptr->bclink.supported)
1784				tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg));
1785		}
1786
1787		crs = l_ptr->first_out;
1788		while ((crs != l_ptr->next_out) &&
1789		       less_eq(msg_seqno(buf_msg(crs)), ackd)) {
1790			struct sk_buff *next = crs->next;
1791
1792			buf_discard(crs);
1793			crs = next;
1794			released++;
1795		}
1796		if (released) {
1797			l_ptr->first_out = crs;
1798			l_ptr->out_queue_size -= released;
1799		}
1800		if (unlikely(l_ptr->next_out))
1801			tipc_link_push_queue(l_ptr);
1802		if (unlikely(!list_empty(&l_ptr->waiting_ports)))
1803			tipc_link_wakeup_ports(l_ptr, 0);
1804		if (unlikely(++l_ptr->unacked_window >= TIPC_MIN_LINK_WIN)) {
1805			l_ptr->stats.sent_acks++;
1806			tipc_link_send_proto_msg(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
1807		}
1808
1809protocol_check:
1810		if (likely(link_working_working(l_ptr))) {
1811			if (likely(seq_no == mod(l_ptr->next_in_no))) {
1812				l_ptr->next_in_no++;
1813				if (unlikely(l_ptr->oldest_deferred_in))
1814					head = link_insert_deferred_queue(l_ptr,
1815									  head);
1816				if (likely(msg_is_dest(msg, tipc_own_addr))) {
1817deliver:
1818					if (likely(msg_isdata(msg))) {
1819						tipc_node_unlock(n_ptr);
1820						tipc_port_recv_msg(buf);
1821						continue;
1822					}
1823					switch (msg_user(msg)) {
1824					case MSG_BUNDLER:
1825						l_ptr->stats.recv_bundles++;
1826						l_ptr->stats.recv_bundled +=
1827							msg_msgcnt(msg);
1828						tipc_node_unlock(n_ptr);
1829						tipc_link_recv_bundle(buf);
1830						continue;
1831					case ROUTE_DISTRIBUTOR:
1832						tipc_node_unlock(n_ptr);
1833						tipc_cltr_recv_routing_table(buf);
1834						continue;
1835					case NAME_DISTRIBUTOR:
1836						tipc_node_unlock(n_ptr);
1837						tipc_named_recv(buf);
1838						continue;
1839					case CONN_MANAGER:
1840						tipc_node_unlock(n_ptr);
1841						tipc_port_recv_proto_msg(buf);
1842						continue;
1843					case MSG_FRAGMENTER:
1844						l_ptr->stats.recv_fragments++;
1845						if (tipc_link_recv_fragment(&l_ptr->defragm_buf,
1846									    &buf, &msg)) {
1847							l_ptr->stats.recv_fragmented++;
1848							goto deliver;
1849						}
1850						break;
1851					case CHANGEOVER_PROTOCOL:
1852						type = msg_type(msg);
1853						if (link_recv_changeover_msg(&l_ptr, &buf)) {
1854							msg = buf_msg(buf);
1855							seq_no = msg_seqno(msg);
1856							TIPC_SKB_CB(buf)->handle
1857								= b_ptr;
1858							if (type == ORIGINAL_MSG)
1859								goto deliver;
1860							goto protocol_check;
1861						}
1862						break;
1863					}
1864				}
1865				tipc_node_unlock(n_ptr);
1866				tipc_net_route_msg(buf);
1867				continue;
1868			}
1869			link_handle_out_of_seq_msg(l_ptr, buf);
1870			head = link_insert_deferred_queue(l_ptr, head);
1871			tipc_node_unlock(n_ptr);
1872			continue;
1873		}
1874
1875		if (msg_user(msg) == LINK_PROTOCOL) {
1876			link_recv_proto_msg(l_ptr, buf);
1877			head = link_insert_deferred_queue(l_ptr, head);
1878			tipc_node_unlock(n_ptr);
1879			continue;
1880		}
1881		msg_dbg(msg,"NSEQ<REC<");
1882		link_state_event(l_ptr, TRAFFIC_MSG_EVT);
1883
1884		if (link_working_working(l_ptr)) {
1885			/* Re-insert in front of queue */
1886			msg_dbg(msg,"RECV-REINS:");
1887			buf->next = head;
1888			head = buf;
1889			tipc_node_unlock(n_ptr);
1890			continue;
1891		}
1892		tipc_node_unlock(n_ptr);
1893cont:
1894		buf_discard(buf);
1895	}
1896	read_unlock_bh(&tipc_net_lock);
1897}
1898
1899/*
1900 * link_defer_buf(): Sort a received out-of-sequence packet
1901 *                   into the deferred reception queue.
1902 * Returns the increase of the queue length,i.e. 0 or 1
1903 */
1904
1905u32 tipc_link_defer_pkt(struct sk_buff **head,
1906			struct sk_buff **tail,
1907			struct sk_buff *buf)
1908{
1909	struct sk_buff *prev = NULL;
1910	struct sk_buff *crs = *head;
1911	u32 seq_no = msg_seqno(buf_msg(buf));
1912
1913	buf->next = NULL;
1914
1915	/* Empty queue ? */
1916	if (*head == NULL) {
1917		*head = *tail = buf;
1918		return 1;
1919	}
1920
1921	/* Last ? */
1922	if (less(msg_seqno(buf_msg(*tail)), seq_no)) {
1923		(*tail)->next = buf;
1924		*tail = buf;
1925		return 1;
1926	}
1927
1928	/* Scan through queue and sort it in */
1929	do {
1930		struct tipc_msg *msg = buf_msg(crs);
1931
1932		if (less(seq_no, msg_seqno(msg))) {
1933			buf->next = crs;
1934			if (prev)
1935				prev->next = buf;
1936			else
1937				*head = buf;
1938			return 1;
1939		}
1940		if (seq_no == msg_seqno(msg)) {
1941			break;
1942		}
1943		prev = crs;
1944		crs = crs->next;
1945	}
1946	while (crs);
1947
1948	/* Message is a duplicate of an existing message */
1949
1950	buf_discard(buf);
1951	return 0;
1952}
1953
1954/**
1955 * link_handle_out_of_seq_msg - handle arrival of out-of-sequence packet
1956 */
1957
1958static void link_handle_out_of_seq_msg(struct link *l_ptr,
1959				       struct sk_buff *buf)
1960{
1961	u32 seq_no = msg_seqno(buf_msg(buf));
1962
1963	if (likely(msg_user(buf_msg(buf)) == LINK_PROTOCOL)) {
1964		link_recv_proto_msg(l_ptr, buf);
1965		return;
1966	}
1967
1968	dbg("rx OOS msg: seq_no %u, expecting %u (%u)\n",
1969	    seq_no, mod(l_ptr->next_in_no), l_ptr->next_in_no);
1970
1971	/* Record OOS packet arrival (force mismatch on next timeout) */
1972
1973	l_ptr->checkpoint--;
1974
1975	/*
1976	 * Discard packet if a duplicate; otherwise add it to deferred queue
1977	 * and notify peer of gap as per protocol specification
1978	 */
1979
1980	if (less(seq_no, mod(l_ptr->next_in_no))) {
1981		l_ptr->stats.duplicates++;
1982		buf_discard(buf);
1983		return;
1984	}
1985
1986	if (tipc_link_defer_pkt(&l_ptr->oldest_deferred_in,
1987				&l_ptr->newest_deferred_in, buf)) {
1988		l_ptr->deferred_inqueue_sz++;
1989		l_ptr->stats.deferred_recv++;
1990		if ((l_ptr->deferred_inqueue_sz % 16) == 1)
1991			tipc_link_send_proto_msg(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
1992	} else
1993		l_ptr->stats.duplicates++;
1994}
1995
1996/*
1997 * Send protocol message to the other endpoint.
1998 */
void tipc_link_send_proto_msg(struct link *l_ptr, u32 msg_typ, int probe_msg,
			      u32 gap, u32 tolerance, u32 priority, u32 ack_mtu)
{
	struct sk_buff *buf = NULL;
	struct tipc_msg *msg = l_ptr->pmsg;	/* link's preformatted template */
	u32 msg_size = sizeof(l_ptr->proto_msg);

	if (link_blocked(l_ptr))
		return;
	msg_set_type(msg, msg_typ);
	msg_set_net_plane(msg, l_ptr->b_ptr->net_plane);
	msg_set_bcast_ack(msg, mod(l_ptr->owner->bclink.last_in));
	msg_set_last_bcast(msg, tipc_bclink_get_last_sent());

	if (msg_typ == STATE_MSG) {
		u32 next_sent = mod(l_ptr->next_out_no);

		if (!tipc_link_is_up(l_ptr))
			return;
		if (l_ptr->next_out)
			next_sent = msg_seqno(buf_msg(l_ptr->next_out));
		msg_set_next_sent(msg, next_sent);
		/* Deferred input implies a reception gap to report */
		if (l_ptr->oldest_deferred_in) {
			u32 rec = msg_seqno(buf_msg(l_ptr->oldest_deferred_in));
			gap = mod(rec - mod(l_ptr->next_in_no));
		}
		msg_set_seq_gap(msg, gap);
		if (gap)
			l_ptr->stats.sent_nacks++;
		msg_set_link_tolerance(msg, tolerance);
		msg_set_linkprio(msg, priority);
		msg_set_max_pkt(msg, ack_mtu);
		msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
		msg_set_probe(msg, probe_msg != 0);
		if (probe_msg) {
			u32 mtu = l_ptr->max_pkt;

			/* MTU discovery: pad the probe to a size between the
			 * current and target MTU (rounded down to multiple
			 * of 4); after 10 unanswered probes, lower the target
			 */
			if ((mtu < l_ptr->max_pkt_target) &&
			    link_working_working(l_ptr) &&
			    l_ptr->fsm_msg_cnt) {
				msg_size = (mtu + (l_ptr->max_pkt_target - mtu)/2 + 2) & ~3;
				if (l_ptr->max_pkt_probes == 10) {
					l_ptr->max_pkt_target = (msg_size - 4);
					l_ptr->max_pkt_probes = 0;
					msg_size = (mtu + (l_ptr->max_pkt_target - mtu)/2 + 2) & ~3;
				}
				l_ptr->max_pkt_probes++;
			}

			l_ptr->stats.sent_probes++;
		}
		l_ptr->stats.sent_states++;
	} else {		/* RESET_MSG or ACTIVATE_MSG */
		msg_set_ack(msg, mod(l_ptr->reset_checkpoint - 1));
		msg_set_seq_gap(msg, 0);
		msg_set_next_sent(msg, 1);
		msg_set_link_tolerance(msg, l_ptr->tolerance);
		msg_set_linkprio(msg, l_ptr->priority);
		msg_set_max_pkt(msg, l_ptr->max_pkt_target);
	}

	if (tipc_node_has_redundant_links(l_ptr->owner)) {
		msg_set_redundant_link(msg);
	} else {
		msg_clear_redundant_link(msg);
	}
	/* Repeats the priority already set above in both branches */
	msg_set_linkprio(msg, l_ptr->priority);

	/* Use a pseudo sequence number far outside the receive window so a
	 * protocol message is never mistaken for an in-sequence data packet
	 */

	msg_set_seqno(msg, mod(l_ptr->next_out_no + (0xffff/2)));

	/* Congestion? */

	if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr)) {
		/* Keep (only) the newest protocol message until congestion
		 * abates; tipc_link_push_packet() will send it.
		 * NOTE(review): the deferred buffer is sized and copied with
		 * sizeof(proto_msg) and msg_size is not stored - confirm
		 * probe padding is not required on this path
		 */
		if (!l_ptr->proto_msg_queue) {
			l_ptr->proto_msg_queue =
				buf_acquire(sizeof(l_ptr->proto_msg));
		}
		buf = l_ptr->proto_msg_queue;
		if (!buf)
			return;
		skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg));
		return;
	}
	msg_set_timestamp(msg, jiffies_to_msecs(jiffies));

	/* Message can be sent */

	msg_dbg(msg, ">>");

	buf = buf_acquire(msg_size);
	if (!buf)
		return;

	skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg));
	msg_set_size(buf_msg(buf), msg_size);

	if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
		l_ptr->unacked_window = 0;
		buf_discard(buf);
		return;
	}

	/* New congestion */
	tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
	l_ptr->proto_msg_queue = buf;
	l_ptr->stats.bearer_congs++;
}
2108
2109/*
2110 * Receive protocol message :
2111 * Note that network plane id propagates through the network, and may
2112 * change at any time. The node with lowest address rules
2113 */
2114
static void link_recv_proto_msg(struct link *l_ptr, struct sk_buff *buf)
{
	u32 rec_gap = 0;
	u32 max_pkt_info;
	u32 max_pkt_ack;
	u32 msg_tol;
	struct tipc_msg *msg = buf_msg(buf);

	dbg("AT(%u):", jiffies_to_msecs(jiffies));
	msg_dbg(msg, "<<");
	if (link_blocked(l_ptr))
		goto exit;

	/* record unnumbered packet arrival (force mismatch on next timeout) */

	l_ptr->checkpoint--;

	/* Adopt peer's network plane if the peer has the lower address
	 * (per the rule above: the node with lowest address rules) */
	if (l_ptr->b_ptr->net_plane != msg_net_plane(msg))
		if (tipc_own_addr > msg_prevnode(msg))
			l_ptr->b_ptr->net_plane = msg_net_plane(msg);

	l_ptr->owner->permit_changeover = msg_redundant_link(msg);

	switch (msg_type(msg)) {

	case RESET_MSG:
		/* Ignore a re-sent RESET for a session we already know */
		if (!link_working_unknown(l_ptr) && l_ptr->peer_session) {
			if (msg_session(msg) == l_ptr->peer_session) {
				dbg("Duplicate RESET: %u<->%u\n",
				    msg_session(msg), l_ptr->peer_session);
				break; /* duplicate: ignore */
			}
		}
		/* fall thru' */
	case ACTIVATE_MSG:
		/* Update link settings according other endpoint's values */

		/* NOTE(review): unbounded strcpy of peer-supplied data into
		 * l_ptr->name; assumes the sender bounds msg_data() to fit
		 * the remaining name buffer — TODO confirm at the send side.
		 */
		strcpy((strrchr(l_ptr->name, ':') + 1), (char *)msg_data(msg));

		/* Tolerance may only be raised here, never lowered */
		if ((msg_tol = msg_link_tolerance(msg)) &&
		    (msg_tol > l_ptr->tolerance))
			link_set_supervision_props(l_ptr, msg_tol);

		if (msg_linkprio(msg) > l_ptr->priority)
			l_ptr->priority = msg_linkprio(msg);

		/* Negotiate max packet size downward; 0 means the peer
		 * supplied none (and implies no broadcast support, below) */
		max_pkt_info = msg_max_pkt(msg);
		if (max_pkt_info) {
			if (max_pkt_info < l_ptr->max_pkt_target)
				l_ptr->max_pkt_target = max_pkt_info;
			if (l_ptr->max_pkt > l_ptr->max_pkt_target)
				l_ptr->max_pkt = l_ptr->max_pkt_target;
		} else {
			l_ptr->max_pkt = l_ptr->max_pkt_target;
		}
		l_ptr->owner->bclink.supported = (max_pkt_info != 0);

		link_state_event(l_ptr, msg_type(msg));

		l_ptr->peer_session = msg_session(msg);
		l_ptr->peer_bearer_id = msg_bearer_id(msg);

		/* Synchronize broadcast sequence numbers */
		if (!tipc_node_has_redundant_links(l_ptr->owner)) {
			l_ptr->owner->bclink.last_in = mod(msg_last_bcast(msg));
		}
		break;
	case STATE_MSG:

		if ((msg_tol = msg_link_tolerance(msg)))
			link_set_supervision_props(l_ptr, msg_tol);

		/* A changed peer priority forces a reset so the new
		 * priority takes effect on both endpoints */
		if (msg_linkprio(msg) &&
		    (msg_linkprio(msg) != l_ptr->priority)) {
			warn("Resetting link <%s>, priority change %u->%u\n",
			     l_ptr->name, l_ptr->priority, msg_linkprio(msg));
			l_ptr->priority = msg_linkprio(msg);
			tipc_link_reset(l_ptr); /* Enforce change to take effect */
			break;
		}
		link_state_event(l_ptr, TRAFFIC_MSG_EVT);
		l_ptr->stats.recv_states++;
		if (link_reset_unknown(l_ptr))
			break;

		/* How far ahead of our receive counter is the peer's send? */
		if (less_eq(mod(l_ptr->next_in_no), msg_next_sent(msg))) {
			rec_gap = mod(msg_next_sent(msg) -
				      mod(l_ptr->next_in_no));
		}

		max_pkt_ack = msg_max_pkt(msg);
		if (max_pkt_ack > l_ptr->max_pkt) {
			dbg("Link <%s> updated MTU %u -> %u\n",
			    l_ptr->name, l_ptr->max_pkt, max_pkt_ack);
			l_ptr->max_pkt = max_pkt_ack;
			l_ptr->max_pkt_probes = 0;
		}

		/* For an oversized probe, echo its size back (presumably so
		 * the peer can confirm packets of that size get through) */
		max_pkt_ack = 0;
		if (msg_probe(msg)) {
			l_ptr->stats.recv_probes++;
			if (msg_size(msg) > sizeof(l_ptr->proto_msg)) {
				max_pkt_ack = msg_size(msg);
			}
		}

		/* Protocol message before retransmits, reduce loss risk */

		tipc_bclink_check_gap(l_ptr->owner, msg_last_bcast(msg));

		if (rec_gap || (msg_probe(msg))) {
			tipc_link_send_proto_msg(l_ptr, STATE_MSG,
						 0, rec_gap, 0, 0, max_pkt_ack);
		}
		/* Peer reported a gap: retransmit that many packets */
		if (msg_seq_gap(msg)) {
			msg_dbg(msg, "With Gap:");
			l_ptr->stats.recv_nacks++;
			tipc_link_retransmit(l_ptr, l_ptr->first_out,
					     msg_seq_gap(msg));
		}
		break;
	default:
		msg_dbg(buf_msg(buf), "<DISCARDING UNKNOWN<");
	}
exit:
	buf_discard(buf);
}
2242
2243
2244/*
2245 * tipc_link_tunnel(): Send one message via a link belonging to
2246 * another bearer. Owner node is locked.
2247 */
2248void tipc_link_tunnel(struct link *l_ptr,
2249		      struct tipc_msg *tunnel_hdr,
2250		      struct tipc_msg  *msg,
2251		      u32 selector)
2252{
2253	struct link *tunnel;
2254	struct sk_buff *buf;
2255	u32 length = msg_size(msg);
2256
2257	tunnel = l_ptr->owner->active_links[selector & 1];
2258	if (!tipc_link_is_up(tunnel)) {
2259		warn("Link changeover error, "
2260		     "tunnel link no longer available\n");
2261		return;
2262	}
2263	msg_set_size(tunnel_hdr, length + INT_H_SIZE);
2264	buf = buf_acquire(length + INT_H_SIZE);
2265	if (!buf) {
2266		warn("Link changeover error, "
2267		     "unable to send tunnel msg\n");
2268		return;
2269	}
2270	skb_copy_to_linear_data(buf, tunnel_hdr, INT_H_SIZE);
2271	skb_copy_to_linear_data_offset(buf, INT_H_SIZE, msg, length);
2272	dbg("%c->%c:", l_ptr->b_ptr->net_plane, tunnel->b_ptr->net_plane);
2273	msg_dbg(buf_msg(buf), ">SEND>");
2274	tipc_link_send_buf(tunnel, buf);
2275}
2276
2277
2278
2279/*
2280 * changeover(): Send whole message queue via the remaining link
2281 *               Owner node is locked.
2282 */
2283
2284void tipc_link_changeover(struct link *l_ptr)
2285{
2286	u32 msgcount = l_ptr->out_queue_size;
2287	struct sk_buff *crs = l_ptr->first_out;
2288	struct link *tunnel = l_ptr->owner->active_links[0];
2289	struct tipc_msg tunnel_hdr;
2290	int split_bundles;
2291
2292	if (!tunnel)
2293		return;
2294
2295	if (!l_ptr->owner->permit_changeover) {
2296		warn("Link changeover error, "
2297		     "peer did not permit changeover\n");
2298		return;
2299	}
2300
2301	msg_init(&tunnel_hdr, CHANGEOVER_PROTOCOL,
2302		 ORIGINAL_MSG, TIPC_OK, INT_H_SIZE, l_ptr->addr);
2303	msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
2304	msg_set_msgcnt(&tunnel_hdr, msgcount);
2305	dbg("Link changeover requires %u tunnel messages\n", msgcount);
2306
2307	if (!l_ptr->first_out) {
2308		struct sk_buff *buf;
2309
2310		buf = buf_acquire(INT_H_SIZE);
2311		if (buf) {
2312			skb_copy_to_linear_data(buf, &tunnel_hdr, INT_H_SIZE);
2313			msg_set_size(&tunnel_hdr, INT_H_SIZE);
2314			dbg("%c->%c:", l_ptr->b_ptr->net_plane,
2315			    tunnel->b_ptr->net_plane);
2316			msg_dbg(&tunnel_hdr, "EMPTY>SEND>");
2317			tipc_link_send_buf(tunnel, buf);
2318		} else {
2319			warn("Link changeover error, "
2320			     "unable to send changeover msg\n");
2321		}
2322		return;
2323	}
2324
2325	split_bundles = (l_ptr->owner->active_links[0] !=
2326			 l_ptr->owner->active_links[1]);
2327
2328	while (crs) {
2329		struct tipc_msg *msg = buf_msg(crs);
2330
2331		if ((msg_user(msg) == MSG_BUNDLER) && split_bundles) {
2332			u32 msgcount = msg_msgcnt(msg);
2333			struct tipc_msg *m = msg_get_wrapped(msg);
2334			unchar* pos = (unchar*)m;
2335
2336			while (msgcount--) {
2337				msg_set_seqno(m,msg_seqno(msg));
2338				tipc_link_tunnel(l_ptr, &tunnel_hdr, m,
2339						 msg_link_selector(m));
2340				pos += align(msg_size(m));
2341				m = (struct tipc_msg *)pos;
2342			}
2343		} else {
2344			tipc_link_tunnel(l_ptr, &tunnel_hdr, msg,
2345					 msg_link_selector(msg));
2346		}
2347		crs = crs->next;
2348	}
2349}
2350
2351void tipc_link_send_duplicate(struct link *l_ptr, struct link *tunnel)
2352{
2353	struct sk_buff *iter;
2354	struct tipc_msg tunnel_hdr;
2355
2356	msg_init(&tunnel_hdr, CHANGEOVER_PROTOCOL,
2357		 DUPLICATE_MSG, TIPC_OK, INT_H_SIZE, l_ptr->addr);
2358	msg_set_msgcnt(&tunnel_hdr, l_ptr->out_queue_size);
2359	msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
2360	iter = l_ptr->first_out;
2361	while (iter) {
2362		struct sk_buff *outbuf;
2363		struct tipc_msg *msg = buf_msg(iter);
2364		u32 length = msg_size(msg);
2365
2366		if (msg_user(msg) == MSG_BUNDLER)
2367			msg_set_type(msg, CLOSED_MSG);
2368		msg_set_ack(msg, mod(l_ptr->next_in_no - 1));	/* Update */
2369		msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
2370		msg_set_size(&tunnel_hdr, length + INT_H_SIZE);
2371		outbuf = buf_acquire(length + INT_H_SIZE);
2372		if (outbuf == NULL) {
2373			warn("Link changeover error, "
2374			     "unable to send duplicate msg\n");
2375			return;
2376		}
2377		skb_copy_to_linear_data(outbuf, &tunnel_hdr, INT_H_SIZE);
2378		skb_copy_to_linear_data_offset(outbuf, INT_H_SIZE, iter->data,
2379					       length);
2380		dbg("%c->%c:", l_ptr->b_ptr->net_plane,
2381		    tunnel->b_ptr->net_plane);
2382		msg_dbg(buf_msg(outbuf), ">SEND>");
2383		tipc_link_send_buf(tunnel, outbuf);
2384		if (!tipc_link_is_up(l_ptr))
2385			return;
2386		iter = iter->next;
2387	}
2388}
2389
2390
2391
2392/**
2393 * buf_extract - extracts embedded TIPC message from another message
2394 * @skb: encapsulating message buffer
2395 * @from_pos: offset to extract from
2396 *
2397 * Returns a new message buffer containing an embedded message.  The
2398 * encapsulating message itself is left unchanged.
2399 */
2400
2401static struct sk_buff *buf_extract(struct sk_buff *skb, u32 from_pos)
2402{
2403	struct tipc_msg *msg = (struct tipc_msg *)(skb->data + from_pos);
2404	u32 size = msg_size(msg);
2405	struct sk_buff *eb;
2406
2407	eb = buf_acquire(size);
2408	if (eb)
2409		skb_copy_to_linear_data(eb, msg, size);
2410	return eb;
2411}
2412
2413/*
2414 *  link_recv_changeover_msg(): Receive tunneled packet sent
2415 *  via other link. Node is locked. Return extracted buffer.
2416 */
2417
static int link_recv_changeover_msg(struct link **l_ptr,
				    struct sk_buff **buf)
{
	struct sk_buff *tunnel_buf = *buf;
	struct link *dest_link;
	struct tipc_msg *msg;
	struct tipc_msg *tunnel_msg = buf_msg(tunnel_buf);
	u32 msg_typ = msg_type(tunnel_msg);
	u32 msg_count = msg_msgcnt(tunnel_msg);

	/* The tunnel header identifies which of the peer's bearers the
	 * wrapped message belongs to; redirect to that link's endpoint. */
	dest_link = (*l_ptr)->owner->links[msg_bearer_id(tunnel_msg)];
	if (!dest_link) {
		msg_dbg(tunnel_msg, "NOLINK/<REC<");
		goto exit;
	}
	if (dest_link == *l_ptr) {
		err("Unexpected changeover message on link <%s>\n",
		    (*l_ptr)->name);
		goto exit;
	}
	dbg("%c<-%c:", dest_link->b_ptr->net_plane,
	    (*l_ptr)->b_ptr->net_plane);
	/* Caller continues processing in the context of the dest link */
	*l_ptr = dest_link;
	msg = msg_get_wrapped(tunnel_msg);

	if (msg_typ == DUPLICATE_MSG) {
		/* Drop if the original was already received in sequence */
		if (less(msg_seqno(msg), mod(dest_link->next_in_no))) {
			msg_dbg(tunnel_msg, "DROP/<REC<");
			goto exit;
		}
		*buf = buf_extract(tunnel_buf,INT_H_SIZE);
		if (*buf == NULL) {
			warn("Link changeover error, duplicate msg dropped\n");
			goto exit;
		}
		msg_dbg(tunnel_msg, "TNL<REC<");
		buf_discard(tunnel_buf);
		return 1;
	}

	/* First original message ?: */

	if (tipc_link_is_up(dest_link)) {
		msg_dbg(tunnel_msg, "UP/FIRST/<REC<");
		info("Resetting link <%s>, changeover initiated by peer\n",
		     dest_link->name);
		tipc_link_reset(dest_link);
		dest_link->exp_msg_count = msg_count;
		dbg("Expecting %u tunnelled messages\n", msg_count);
		if (!msg_count)
			goto exit;
	} else if (dest_link->exp_msg_count == START_CHANGEOVER) {
		msg_dbg(tunnel_msg, "BLK/FIRST/<REC<");
		dest_link->exp_msg_count = msg_count;
		dbg("Expecting %u tunnelled messages\n", msg_count);
		if (!msg_count)
			goto exit;
	}

	/* Receive original message */

	if (dest_link->exp_msg_count == 0) {
		warn("Link switchover error, "
		     "got too many tunnelled messages\n");
		msg_dbg(tunnel_msg, "OVERDUE/DROP/<REC<");
		dbg_print_link(dest_link, "LINK:");
		goto exit;
	}
	dest_link->exp_msg_count--;
	/* Drop messages already received before the link was reset
	 * (reset_checkpoint presumably records that point — verify) */
	if (less(msg_seqno(msg), dest_link->reset_checkpoint)) {
		msg_dbg(tunnel_msg, "DROP/DUPL/<REC<");
		goto exit;
	} else {
		*buf = buf_extract(tunnel_buf, INT_H_SIZE);
		if (*buf != NULL) {
			msg_dbg(tunnel_msg, "TNL<REC<");
			buf_discard(tunnel_buf);
			return 1;
		} else {
			warn("Link changeover error, original msg dropped\n");
		}
	}
exit:
	*buf = NULL;
	buf_discard(tunnel_buf);
	return 0;
}
2505
2506/*
2507 *  Bundler functionality:
2508 */
2509void tipc_link_recv_bundle(struct sk_buff *buf)
2510{
2511	u32 msgcount = msg_msgcnt(buf_msg(buf));
2512	u32 pos = INT_H_SIZE;
2513	struct sk_buff *obuf;
2514
2515	msg_dbg(buf_msg(buf), "<BNDL<: ");
2516	while (msgcount--) {
2517		obuf = buf_extract(buf, pos);
2518		if (obuf == NULL) {
2519			warn("Link unable to unbundle message(s)\n");
2520			break;
2521		}
2522		pos += align(msg_size(buf_msg(obuf)));
2523		msg_dbg(buf_msg(obuf), "     /");
2524		tipc_net_route_msg(obuf);
2525	}
2526	buf_discard(buf);
2527}
2528
2529/*
2530 *  Fragmentation/defragmentation:
2531 */
2532
2533
2534/*
2535 * tipc_link_send_long_buf: Entry for buffers needing fragmentation.
2536 * The buffer is complete, inclusive total message length.
2537 * Returns user data length.
2538 */
2539int tipc_link_send_long_buf(struct link *l_ptr, struct sk_buff *buf)
2540{
2541	struct tipc_msg *inmsg = buf_msg(buf);
2542	struct tipc_msg fragm_hdr;
2543	u32 insize = msg_size(inmsg);
2544	u32 dsz = msg_data_sz(inmsg);
2545	unchar *crs = buf->data;
2546	u32 rest = insize;
2547	u32 pack_sz = link_max_pkt(l_ptr);
2548	u32 fragm_sz = pack_sz - INT_H_SIZE;
2549	u32 fragm_no = 1;
2550	u32 destaddr = msg_destnode(inmsg);
2551
2552	if (msg_short(inmsg))
2553		destaddr = l_ptr->addr;
2554
2555	if (msg_routed(inmsg))
2556		msg_set_prevnode(inmsg, tipc_own_addr);
2557
2558	/* Prepare reusable fragment header: */
2559
2560	msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT,
2561		 TIPC_OK, INT_H_SIZE, destaddr);
2562	msg_set_link_selector(&fragm_hdr, msg_link_selector(inmsg));
2563	msg_set_long_msgno(&fragm_hdr, mod(l_ptr->long_msg_seq_no++));
2564	msg_set_fragm_no(&fragm_hdr, fragm_no);
2565	l_ptr->stats.sent_fragmented++;
2566
2567	/* Chop up message: */
2568
2569	while (rest > 0) {
2570		struct sk_buff *fragm;
2571
2572		if (rest <= fragm_sz) {
2573			fragm_sz = rest;
2574			msg_set_type(&fragm_hdr, LAST_FRAGMENT);
2575		}
2576		fragm = buf_acquire(fragm_sz + INT_H_SIZE);
2577		if (fragm == NULL) {
2578			warn("Link unable to fragment message\n");
2579			dsz = -ENOMEM;
2580			goto exit;
2581		}
2582		msg_set_size(&fragm_hdr, fragm_sz + INT_H_SIZE);
2583		skb_copy_to_linear_data(fragm, &fragm_hdr, INT_H_SIZE);
2584		skb_copy_to_linear_data_offset(fragm, INT_H_SIZE, crs,
2585					       fragm_sz);
2586		/*  Send queued messages first, if any: */
2587
2588		l_ptr->stats.sent_fragments++;
2589		tipc_link_send_buf(l_ptr, fragm);
2590		if (!tipc_link_is_up(l_ptr))
2591			return dsz;
2592		msg_set_fragm_no(&fragm_hdr, ++fragm_no);
2593		rest -= fragm_sz;
2594		crs += fragm_sz;
2595		msg_set_type(&fragm_hdr, FRAGMENT);
2596	}
2597exit:
2598	buf_discard(buf);
2599	return dsz;
2600}
2601
2602/*
2603 * A pending message being re-assembled must store certain values
2604 * to handle subsequent fragments correctly. The following functions
2605 * help storing these values in unused, available fields in the
 * pending message. This makes dynamic memory allocation unnecessary.
2607 */
2608
2609static void set_long_msg_seqno(struct sk_buff *buf, u32 seqno)
2610{
2611	msg_set_seqno(buf_msg(buf), seqno);
2612}
2613
2614static u32 get_fragm_size(struct sk_buff *buf)
2615{
2616	return msg_ack(buf_msg(buf));
2617}
2618
2619static void set_fragm_size(struct sk_buff *buf, u32 sz)
2620{
2621	msg_set_ack(buf_msg(buf), sz);
2622}
2623
2624static u32 get_expected_frags(struct sk_buff *buf)
2625{
2626	return msg_bcast_ack(buf_msg(buf));
2627}
2628
2629static void set_expected_frags(struct sk_buff *buf, u32 exp)
2630{
2631	msg_set_bcast_ack(buf_msg(buf), exp);
2632}
2633
2634static u32 get_timer_cnt(struct sk_buff *buf)
2635{
2636	return msg_reroute_cnt(buf_msg(buf));
2637}
2638
/* Bump the pending buffer's tick counter (stored in the reroute field). */
static void incr_timer_cnt(struct sk_buff *buf)
{
	struct tipc_msg *hdr = buf_msg(buf);

	msg_incr_reroute_cnt(hdr);
}
2643
2644/*
2645 * tipc_link_recv_fragment(): Called with node lock on. Returns
2646 * the reassembled buffer if message is complete.
2647 */
2648int tipc_link_recv_fragment(struct sk_buff **pending, struct sk_buff **fb,
2649			    struct tipc_msg **m)
2650{
2651	struct sk_buff *prev = NULL;
2652	struct sk_buff *fbuf = *fb;
2653	struct tipc_msg *fragm = buf_msg(fbuf);
2654	struct sk_buff *pbuf = *pending;
2655	u32 long_msg_seq_no = msg_long_msgno(fragm);
2656
2657	*fb = NULL;
2658	msg_dbg(fragm,"FRG<REC<");
2659
2660	/* Is there an incomplete message waiting for this fragment? */
2661
2662	while (pbuf && ((msg_seqno(buf_msg(pbuf)) != long_msg_seq_no)
2663			|| (msg_orignode(fragm) != msg_orignode(buf_msg(pbuf))))) {
2664		prev = pbuf;
2665		pbuf = pbuf->next;
2666	}
2667
2668	if (!pbuf && (msg_type(fragm) == FIRST_FRAGMENT)) {
2669		struct tipc_msg *imsg = (struct tipc_msg *)msg_data(fragm);
2670		u32 msg_sz = msg_size(imsg);
2671		u32 fragm_sz = msg_data_sz(fragm);
2672		u32 exp_fragm_cnt = msg_sz/fragm_sz + !!(msg_sz % fragm_sz);
2673		u32 max =  TIPC_MAX_USER_MSG_SIZE + LONG_H_SIZE;
2674		if (msg_type(imsg) == TIPC_MCAST_MSG)
2675			max = TIPC_MAX_USER_MSG_SIZE + MCAST_H_SIZE;
2676		if (msg_size(imsg) > max) {
2677			msg_dbg(fragm,"<REC<Oversized: ");
2678			buf_discard(fbuf);
2679			return 0;
2680		}
2681		pbuf = buf_acquire(msg_size(imsg));
2682		if (pbuf != NULL) {
2683			pbuf->next = *pending;
2684			*pending = pbuf;
2685			skb_copy_to_linear_data(pbuf, imsg,
2686						msg_data_sz(fragm));
2687			/*  Prepare buffer for subsequent fragments. */
2688
2689			set_long_msg_seqno(pbuf, long_msg_seq_no);
2690			set_fragm_size(pbuf,fragm_sz);
2691			set_expected_frags(pbuf,exp_fragm_cnt - 1);
2692		} else {
2693			warn("Link unable to reassemble fragmented message\n");
2694		}
2695		buf_discard(fbuf);
2696		return 0;
2697	} else if (pbuf && (msg_type(fragm) != FIRST_FRAGMENT)) {
2698		u32 dsz = msg_data_sz(fragm);
2699		u32 fsz = get_fragm_size(pbuf);
2700		u32 crs = ((msg_fragm_no(fragm) - 1) * fsz);
2701		u32 exp_frags = get_expected_frags(pbuf) - 1;
2702		skb_copy_to_linear_data_offset(pbuf, crs,
2703					       msg_data(fragm), dsz);
2704		buf_discard(fbuf);
2705
2706		/* Is message complete? */
2707
2708		if (exp_frags == 0) {
2709			if (prev)
2710				prev->next = pbuf->next;
2711			else
2712				*pending = pbuf->next;
2713			msg_reset_reroute_cnt(buf_msg(pbuf));
2714			*fb = pbuf;
2715			*m = buf_msg(pbuf);
2716			return 1;
2717		}
2718		set_expected_frags(pbuf,exp_frags);
2719		return 0;
2720	}
2721	dbg(" Discarding orphan fragment %x\n",fbuf);
2722	msg_dbg(fragm,"ORPHAN:");
2723	dbg("Pending long buffers:\n");
2724	dbg_print_buf_chain(*pending);
2725	buf_discard(fbuf);
2726	return 0;
2727}
2728
2729/**
2730 * link_check_defragm_bufs - flush stale incoming message fragments
2731 * @l_ptr: pointer to link
2732 */
2733
2734static void link_check_defragm_bufs(struct link *l_ptr)
2735{
2736	struct sk_buff *prev = NULL;
2737	struct sk_buff *next = NULL;
2738	struct sk_buff *buf = l_ptr->defragm_buf;
2739
2740	if (!buf)
2741		return;
2742	if (!link_working_working(l_ptr))
2743		return;
2744	while (buf) {
2745		u32 cnt = get_timer_cnt(buf);
2746
2747		next = buf->next;
2748		if (cnt < 4) {
2749			incr_timer_cnt(buf);
2750			prev = buf;
2751		} else {
2752			dbg(" Discarding incomplete long buffer\n");
2753			msg_dbg(buf_msg(buf), "LONG:");
2754			dbg_print_link(l_ptr, "curr:");
2755			dbg("Pending long buffers:\n");
2756			dbg_print_buf_chain(l_ptr->defragm_buf);
2757			if (prev)
2758				prev->next = buf->next;
2759			else
2760				l_ptr->defragm_buf = buf->next;
2761			buf_discard(buf);
2762		}
2763		buf = next;
2764	}
2765}
2766
2767
2768
2769static void link_set_supervision_props(struct link *l_ptr, u32 tolerance)
2770{
2771	l_ptr->tolerance = tolerance;
2772	l_ptr->continuity_interval =
2773		((tolerance / 4) > 500) ? 500 : tolerance / 4;
2774	l_ptr->abort_limit = tolerance / (l_ptr->continuity_interval / 4);
2775}
2776
2777
2778void tipc_link_set_queue_limits(struct link *l_ptr, u32 window)
2779{
2780	/* Data messages from this node, inclusive FIRST_FRAGM */
2781	l_ptr->queue_limit[DATA_LOW] = window;
2782	l_ptr->queue_limit[DATA_MEDIUM] = (window / 3) * 4;
2783	l_ptr->queue_limit[DATA_HIGH] = (window / 3) * 5;
2784	l_ptr->queue_limit[DATA_CRITICAL] = (window / 3) * 6;
2785	/* Transiting data messages,inclusive FIRST_FRAGM */
2786	l_ptr->queue_limit[DATA_LOW + 4] = 300;
2787	l_ptr->queue_limit[DATA_MEDIUM + 4] = 600;
2788	l_ptr->queue_limit[DATA_HIGH + 4] = 900;
2789	l_ptr->queue_limit[DATA_CRITICAL + 4] = 1200;
2790	l_ptr->queue_limit[CONN_MANAGER] = 1200;
2791	l_ptr->queue_limit[ROUTE_DISTRIBUTOR] = 1200;
2792	l_ptr->queue_limit[CHANGEOVER_PROTOCOL] = 2500;
2793	l_ptr->queue_limit[NAME_DISTRIBUTOR] = 3000;
2794	/* FRAGMENT and LAST_FRAGMENT packets */
2795	l_ptr->queue_limit[MSG_FRAGMENTER] = 4000;
2796}
2797
2798/**
2799 * link_find_link - locate link by name
 * @name: ptr to link name string
 * @node: ptr to area to be filled with ptr to associated node
 *
 * Caller must hold 'tipc_net_lock' to ensure node and bearer are not deleted;
 * this also prevents link deletion.
 *
 * Returns pointer to link (or NULL if invalid link name).
2807 */
2808
2809static struct link *link_find_link(const char *name, struct node **node)
2810{
2811	struct link_name link_name_parts;
2812	struct bearer *b_ptr;
2813	struct link *l_ptr;
2814
2815	if (!link_name_validate(name, &link_name_parts))
2816		return NULL;
2817
2818	b_ptr = tipc_bearer_find_interface(link_name_parts.if_local);
2819	if (!b_ptr)
2820		return NULL;
2821
2822	*node = tipc_node_find(link_name_parts.addr_peer);
2823	if (!*node)
2824		return NULL;
2825
2826	l_ptr = (*node)->links[b_ptr->identity];
2827	if (!l_ptr || strcmp(l_ptr->name, name))
2828		return NULL;
2829
2830	return l_ptr;
2831}
2832
/**
 * tipc_link_cmd_config - apply a link tolerance/priority/window command
 * @req_tlv_area: incoming request TLV (TIPC_TLV_LINK_CONFIG)
 * @req_tlv_space: size of request TLV area
 * @cmd: one of TIPC_CMD_SET_LINK_{TOL,PRI,WINDOW}
 *
 * Returns a config reply buffer (success, or an error string reply).
 */
struct sk_buff *tipc_link_cmd_config(const void *req_tlv_area, int req_tlv_space,
				     u16 cmd)
{
	struct tipc_link_config *args;
	u32 new_value;
	struct link *l_ptr;
	struct node *node;
	int res;

	if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_CONFIG))
		return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);

	args = (struct tipc_link_config *)TLV_DATA(req_tlv_area);
	new_value = ntohl(args->value);

	/* Broadcast link: only the window setting can be changed */
	if (!strcmp(args->name, tipc_bclink_name)) {
		if ((cmd == TIPC_CMD_SET_LINK_WINDOW) &&
		    (tipc_bclink_set_queue_limits(new_value) == 0))
			return tipc_cfg_reply_none();
		return tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED
						   " (cannot change setting on broadcast link)");
	}

	/* Lock order: tipc_net_lock (read) -> node lock; both dropped on
	 * every return path below */
	read_lock_bh(&tipc_net_lock);
	l_ptr = link_find_link(args->name, &node);
	if (!l_ptr) {
		read_unlock_bh(&tipc_net_lock);
		return tipc_cfg_reply_error_string("link not found");
	}

	tipc_node_lock(node);
	res = -EINVAL;	/* stays set for unknown cmd or out-of-range value */
	switch (cmd) {
	case TIPC_CMD_SET_LINK_TOL:
		if ((new_value >= TIPC_MIN_LINK_TOL) &&
		    (new_value <= TIPC_MAX_LINK_TOL)) {
			link_set_supervision_props(l_ptr, new_value);
			/* Propagate the new setting to the peer */
			tipc_link_send_proto_msg(l_ptr, STATE_MSG,
						 0, 0, new_value, 0, 0);
			res = TIPC_OK;
		}
		break;
	case TIPC_CMD_SET_LINK_PRI:
		if ((new_value >= TIPC_MIN_LINK_PRI) &&
		    (new_value <= TIPC_MAX_LINK_PRI)) {
			l_ptr->priority = new_value;
			/* Propagate the new setting to the peer */
			tipc_link_send_proto_msg(l_ptr, STATE_MSG,
						 0, 0, 0, new_value, 0);
			res = TIPC_OK;
		}
		break;
	case TIPC_CMD_SET_LINK_WINDOW:
		if ((new_value >= TIPC_MIN_LINK_WIN) &&
		    (new_value <= TIPC_MAX_LINK_WIN)) {
			tipc_link_set_queue_limits(l_ptr, new_value);
			res = TIPC_OK;
		}
		break;
	}
	tipc_node_unlock(node);

	read_unlock_bh(&tipc_net_lock);
	if (res)
		return tipc_cfg_reply_error_string("cannot change link setting");

	return tipc_cfg_reply_none();
}
2900
2901/**
2902 * link_reset_statistics - reset link statistics
2903 * @l_ptr: pointer to link
2904 */
2905
2906static void link_reset_statistics(struct link *l_ptr)
2907{
2908	memset(&l_ptr->stats, 0, sizeof(l_ptr->stats));
2909	l_ptr->stats.sent_info = l_ptr->next_out_no;
2910	l_ptr->stats.recv_info = l_ptr->next_in_no;
2911}
2912
2913struct sk_buff *tipc_link_cmd_reset_stats(const void *req_tlv_area, int req_tlv_space)
2914{
2915	char *link_name;
2916	struct link *l_ptr;
2917	struct node *node;
2918
2919	if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_NAME))
2920		return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);
2921
2922	link_name = (char *)TLV_DATA(req_tlv_area);
2923	if (!strcmp(link_name, tipc_bclink_name)) {
2924		if (tipc_bclink_reset_stats())
2925			return tipc_cfg_reply_error_string("link not found");
2926		return tipc_cfg_reply_none();
2927	}
2928
2929	read_lock_bh(&tipc_net_lock);
2930	l_ptr = link_find_link(link_name, &node);
2931	if (!l_ptr) {
2932		read_unlock_bh(&tipc_net_lock);
2933		return tipc_cfg_reply_error_string("link not found");
2934	}
2935
2936	tipc_node_lock(node);
2937	link_reset_statistics(l_ptr);
2938	tipc_node_unlock(node);
2939	read_unlock_bh(&tipc_net_lock);
2940	return tipc_cfg_reply_none();
2941}
2942
2943/**
2944 * percent - convert count to a percentage of total (rounding up or down)
2945 */
2946
static u32 percent(u32 count, u32 total)
{
	/* Guard against division by zero. Current callers substitute 1 for
	 * an empty total themselves, but a zero total now safely yields 0.
	 * NOTE(review): count * 100 wraps for counts above ~42.9M; assumed
	 * acceptable for link statistics counters.
	 */
	if (total == 0)
		return 0;
	return (count * 100 + (total / 2)) / total;
}
2951
2952/**
2953 * tipc_link_stats - print link statistics
2954 * @name: link name
2955 * @buf: print buffer area
2956 * @buf_size: size of print buffer area
2957 *
2958 * Returns length of print buffer data string (or 0 if error)
2959 */
2960
static int tipc_link_stats(const char *name, char *buf, const u32 buf_size)
{
	struct print_buf pb;
	struct link *l_ptr;
	struct node *node;
	char *status;
	u32 profile_total = 0;

	/* Broadcast link statistics are produced elsewhere */
	if (!strcmp(name, tipc_bclink_name))
		return tipc_bclink_stats(buf, buf_size);

	tipc_printbuf_init(&pb, buf, buf_size);

	read_lock_bh(&tipc_net_lock);
	l_ptr = link_find_link(name, &node);
	if (!l_ptr) {
		read_unlock_bh(&tipc_net_lock);
		return 0;
	}
	tipc_node_lock(node);

	if (tipc_link_is_active(l_ptr))
		status = "ACTIVE";
	else if (tipc_link_is_up(l_ptr))
		status = "STANDBY";
	else
		status = "DEFUNCT";
	tipc_printf(&pb, "Link <%s>\n"
			 "  %s  MTU:%u  Priority:%u  Tolerance:%u ms"
			 "  Window:%u packets\n",
		    l_ptr->name, status, link_max_pkt(l_ptr),
		    l_ptr->priority, l_ptr->tolerance, l_ptr->queue_limit[0]);
	/* Packet counts are deltas from the baselines stored at last reset */
	tipc_printf(&pb, "  RX packets:%u fragments:%u/%u bundles:%u/%u\n",
		    l_ptr->next_in_no - l_ptr->stats.recv_info,
		    l_ptr->stats.recv_fragments,
		    l_ptr->stats.recv_fragmented,
		    l_ptr->stats.recv_bundles,
		    l_ptr->stats.recv_bundled);
	tipc_printf(&pb, "  TX packets:%u fragments:%u/%u bundles:%u/%u\n",
		    l_ptr->next_out_no - l_ptr->stats.sent_info,
		    l_ptr->stats.sent_fragments,
		    l_ptr->stats.sent_fragmented,
		    l_ptr->stats.sent_bundles,
		    l_ptr->stats.sent_bundled);
	/* Substitute 1 for an empty sample so percent()/average don't
	 * divide by zero */
	profile_total = l_ptr->stats.msg_length_counts;
	if (!profile_total)
		profile_total = 1;
	/* NOTE(review): "-16354" in the output below looks like a typo for
	 * "-16384", but it is user-visible output — left unchanged here */
	tipc_printf(&pb, "  TX profile sample:%u packets  average:%u octets\n"
			 "  0-64:%u%% -256:%u%% -1024:%u%% -4096:%u%% "
			 "-16354:%u%% -32768:%u%% -66000:%u%%\n",
		    l_ptr->stats.msg_length_counts,
		    l_ptr->stats.msg_lengths_total / profile_total,
		    percent(l_ptr->stats.msg_length_profile[0], profile_total),
		    percent(l_ptr->stats.msg_length_profile[1], profile_total),
		    percent(l_ptr->stats.msg_length_profile[2], profile_total),
		    percent(l_ptr->stats.msg_length_profile[3], profile_total),
		    percent(l_ptr->stats.msg_length_profile[4], profile_total),
		    percent(l_ptr->stats.msg_length_profile[5], profile_total),
		    percent(l_ptr->stats.msg_length_profile[6], profile_total));
	tipc_printf(&pb, "  RX states:%u probes:%u naks:%u defs:%u dups:%u\n",
		    l_ptr->stats.recv_states,
		    l_ptr->stats.recv_probes,
		    l_ptr->stats.recv_nacks,
		    l_ptr->stats.deferred_recv,
		    l_ptr->stats.duplicates);
	tipc_printf(&pb, "  TX states:%u probes:%u naks:%u acks:%u dups:%u\n",
		    l_ptr->stats.sent_states,
		    l_ptr->stats.sent_probes,
		    l_ptr->stats.sent_nacks,
		    l_ptr->stats.sent_acks,
		    l_ptr->stats.retransmitted);
	tipc_printf(&pb, "  Congestion bearer:%u link:%u  Send queue max:%u avg:%u\n",
		    l_ptr->stats.bearer_congs,
		    l_ptr->stats.link_congs,
		    l_ptr->stats.max_queue_sz,
		    l_ptr->stats.queue_sz_counts
		    ? (l_ptr->stats.accu_queue_sz / l_ptr->stats.queue_sz_counts)
		    : 0);

	tipc_node_unlock(node);
	read_unlock_bh(&tipc_net_lock);
	return tipc_printbuf_validate(&pb);
}
3044
3045#define MAX_LINK_STATS_INFO 2000
3046
3047struct sk_buff *tipc_link_cmd_show_stats(const void *req_tlv_area, int req_tlv_space)
3048{
3049	struct sk_buff *buf;
3050	struct tlv_desc *rep_tlv;
3051	int str_len;
3052
3053	if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_NAME))
3054		return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);
3055
3056	buf = tipc_cfg_reply_alloc(TLV_SPACE(MAX_LINK_STATS_INFO));
3057	if (!buf)
3058		return NULL;
3059
3060	rep_tlv = (struct tlv_desc *)buf->data;
3061
3062	str_len = tipc_link_stats((char *)TLV_DATA(req_tlv_area),
3063				  (char *)TLV_DATA(rep_tlv), MAX_LINK_STATS_INFO);
3064	if (!str_len) {
3065		buf_discard(buf);
3066		return tipc_cfg_reply_error_string("link not found");
3067	}
3068
3069	skb_put(buf, TLV_SPACE(str_len));
3070	TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len);
3071
3072	return buf;
3073}
3074
3075
3076/**
3077 * tipc_link_get_max_pkt - get maximum packet size to use when sending to destination
3078 * @dest: network address of destination node
3079 * @selector: used to select from set of active links
3080 *
3081 * If no active link can be found, uses default maximum packet size.
3082 */
3083
3084u32 tipc_link_get_max_pkt(u32 dest, u32 selector)
3085{
3086	struct node *n_ptr;
3087	struct link *l_ptr;
3088	u32 res = MAX_PKT_DEFAULT;
3089
3090	if (dest == tipc_own_addr)
3091		return MAX_MSG_SIZE;
3092
3093	read_lock_bh(&tipc_net_lock);
3094	n_ptr = tipc_node_select(dest, selector);
3095	if (n_ptr) {
3096		tipc_node_lock(n_ptr);
3097		l_ptr = n_ptr->active_links[selector & 1];
3098		if (l_ptr)
3099			res = link_max_pkt(l_ptr);
3100		tipc_node_unlock(n_ptr);
3101	}
3102	read_unlock_bh(&tipc_net_lock);
3103	return res;
3104}
3105
3106
3107static void link_dump_send_queue(struct link *l_ptr)
3108{
3109	if (l_ptr->next_out) {
3110		info("\nContents of unsent queue:\n");
3111		dbg_print_buf_chain(l_ptr->next_out);
3112	}
3113	info("\nContents of send queue:\n");
3114	if (l_ptr->first_out) {
3115		dbg_print_buf_chain(l_ptr->first_out);
3116	}
3117	info("Empty send queue\n");
3118}
3119
static void link_print(struct link *l_ptr, struct print_buf *buf,
		       const char *str)
{
	tipc_printf(buf, str);
	/* Nothing useful to print while the link is in a reset state */
	if (link_reset_reset(l_ptr) || link_reset_unknown(l_ptr))
		return;
	tipc_printf(buf, "Link %x<%s>:",
		    l_ptr->addr, l_ptr->b_ptr->publ.name);
	tipc_printf(buf, ": NXO(%u):", mod(l_ptr->next_out_no));
	tipc_printf(buf, "NXI(%u):", mod(l_ptr->next_in_no));
	tipc_printf(buf, "SQUE");
	if (l_ptr->first_out) {
		tipc_printf(buf, "[%u..", msg_seqno(buf_msg(l_ptr->first_out)));
		if (l_ptr->next_out)
			tipc_printf(buf, "%u..",
				    msg_seqno(buf_msg(l_ptr->next_out)));
		/* NOTE(review): the format has one %u but two arguments are
		 * passed; the trailing out_queue_size is unused (assuming
		 * tipc_printf follows printf varargs semantics — verify). */
		tipc_printf(buf, "%u]",
			    msg_seqno(buf_msg
				      (l_ptr->last_out)), l_ptr->out_queue_size);
		/* Sanity check: queue length must match the seqno span and
		 * the last buffer must terminate the chain */
		if ((mod(msg_seqno(buf_msg(l_ptr->last_out)) -
			 msg_seqno(buf_msg(l_ptr->first_out)))
		     != (l_ptr->out_queue_size - 1))
		    || (l_ptr->last_out->next != 0)) {
			tipc_printf(buf, "\nSend queue inconsistency\n");
			/* NOTE(review): pointers formatted with %x — this is
			 * a type mismatch on 64-bit if tipc_printf follows
			 * printf semantics; %p would be correct — confirm. */
			tipc_printf(buf, "first_out= %x ", l_ptr->first_out);
			tipc_printf(buf, "next_out= %x ", l_ptr->next_out);
			tipc_printf(buf, "last_out= %x ", l_ptr->last_out);
			link_dump_send_queue(l_ptr);
		}
	} else
		tipc_printf(buf, "[]");
	tipc_printf(buf, "SQSIZ(%u)", l_ptr->out_queue_size);
	if (l_ptr->oldest_deferred_in) {
		u32 o = msg_seqno(buf_msg(l_ptr->oldest_deferred_in));
		u32 n = msg_seqno(buf_msg(l_ptr->newest_deferred_in));
		tipc_printf(buf, ":RQUE[%u..%u]", o, n);
		if (l_ptr->deferred_inqueue_sz != mod((n + 1) - o)) {
			tipc_printf(buf, ":RQSIZ(%u)",
				    l_ptr->deferred_inqueue_sz);
		}
	}
	/* Append current link FSM state tag(s) */
	if (link_working_unknown(l_ptr))
		tipc_printf(buf, ":WU");
	if (link_reset_reset(l_ptr))
		tipc_printf(buf, ":RR");
	if (link_reset_unknown(l_ptr))
		tipc_printf(buf, ":RU");
	if (link_working_working(l_ptr))
		tipc_printf(buf, ":WW");
	tipc_printf(buf, "\n");
}
3171