/* Management of Tx window, Tx resend, ACKs and out-of-sequence reception
 *
 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
 * Written by David Howells (dhowells@redhat.com)
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version
 * 2 of the License, or (at your option) any later version.
 */

#include <linux/module.h>
#include <linux/circ_buf.h>
#include <linux/net.h>
#include <linux/skbuff.h>
#include <linux/udp.h>
#include <net/sock.h>
#include <net/af_rxrpc.h>
#include "ar-internal.h"

static unsigned rxrpc_ack_defer = 1;

static const char *rxrpc_acks[] = {
	"---", "REQ", "DUP", "OOS", "WIN", "MEM", "PNG", "PNR", "DLY", "IDL",
	"-?-"
};

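/* ACK reasons ranked by priority; a proposed ACK only supersedes a pending
 * one of lower priority (see __rxrpc_propose_ACK() below)
 */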
static const s8 rxrpc_ack_priority[] = {
	[0]				= 0,
	[RXRPC_ACK_DELAY]		= 1,
	[RXRPC_ACK_REQUESTED]		= 2,
	[RXRPC_ACK_IDLE]		= 3,
	[RXRPC_ACK_PING_RESPONSE]	= 4,
	[RXRPC_ACK_DUPLICATE]		= 5,
	[RXRPC_ACK_OUT_OF_SEQUENCE]	= 6,
	[RXRPC_ACK_EXCEEDS_WINDOW]	= 7,
	[RXRPC_ACK_NOSPACE]		= 8,
};

/*
 * propose an ACK be sent
 */
void __rxrpc_propose_ACK(struct rxrpc_call *call, uint8_t ack_reason,
			 __be32 serial, bool immediate)
{
	unsigned long expiry;
	s8 prior = rxrpc_ack_priority[ack_reason];

	ASSERTCMP(prior, >, 0);

	_enter("{%d},%s,%%%x,%u",
	       call->debug_id, rxrpc_acks[ack_reason], ntohl(serial),
	       immediate);

	if (prior < rxrpc_ack_priority[call->ackr_reason]) {
		if (immediate)
			goto cancel_timer;
		return;
	}

	/* update DELAY, IDLE, REQUESTED and PING_RESPONSE ACK serial
	 * numbers */
	if (prior == rxrpc_ack_priority[call->ackr_reason]) {
		if (prior <= 4)
			call->ackr_serial = serial;
		if (immediate)
			goto cancel_timer;
		return;
	}

	call->ackr_reason = ack_reason;
	call->ackr_serial = serial;

	switch (ack_reason) {
	case RXRPC_ACK_DELAY:
		_debug("run delay timer");
		call->ack_timer.expires = jiffies + rxrpc_ack_timeout * HZ;
		add_timer(&call->ack_timer);
		return;

	case RXRPC_ACK_IDLE:
		if (!immediate) {
			_debug("run defer timer");
			expiry = 1;
			goto run_timer;
		}
		goto cancel_timer;

	case RXRPC_ACK_REQUESTED:
		if (!rxrpc_ack_defer)
			goto cancel_timer;
		if (!immediate || serial == cpu_to_be32(1)) {
			_debug("run defer timer");
			expiry = rxrpc_ack_defer;
			goto run_timer;
		}

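	/* fall through to send an immediate ACK */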
	default:
		_debug("immediate ACK");
		goto cancel_timer;
	}

run_timer:
	expiry += jiffies;
	if (!timer_pending(&call->ack_timer) ||
	    time_after(call->ack_timer.expires, expiry))
		mod_timer(&call->ack_timer, expiry);
	return;

cancel_timer:
	_debug("cancel timer %%%u", ntohl(serial));
	try_to_del_timer_sync(&call->ack_timer);
	read_lock_bh(&call->state_lock);
	if (call->state <= RXRPC_CALL_COMPLETE &&
	    !test_and_set_bit(RXRPC_CALL_ACK, &call->events))
		rxrpc_queue_call(call);
	read_unlock_bh(&call->state_lock);
}

/*
 * propose an ACK be sent, locking the call structure
 */
void rxrpc_propose_ACK(struct rxrpc_call *call, uint8_t ack_reason,
		       __be32 serial, bool immediate)
{
	s8 prior = rxrpc_ack_priority[ack_reason];

	if (prior > rxrpc_ack_priority[call->ackr_reason]) {
		spin_lock_bh(&call->lock);
		__rxrpc_propose_ACK(call, ack_reason, serial, immediate);
		spin_unlock_bh(&call->lock);
	}
}

/*
 * set the resend timer and/or flag packets for retransmission
 * - bit 0 of resend indicates that some packets need retransmitting now
 * - bit 1 of resend indicates that the resend timer should be set to expire
 *   at resend_at
 */
static void rxrpc_set_resend(struct rxrpc_call *call, u8 resend,
			     unsigned long resend_at)
{
	read_lock_bh(&call->state_lock);
	if (call->state >= RXRPC_CALL_COMPLETE)
		resend = 0;

	if (resend & 1) {
		_debug("SET RESEND");
		set_bit(RXRPC_CALL_RESEND, &call->events);
	}

	if (resend & 2) {
		_debug("MODIFY RESEND TIMER");
		set_bit(RXRPC_CALL_RUN_RTIMER, &call->flags);
		mod_timer(&call->resend_timer, resend_at);
	} else {
		_debug("KILL RESEND TIMER");
		del_timer_sync(&call->resend_timer);
		clear_bit(RXRPC_CALL_RESEND_TIMER, &call->events);
		clear_bit(RXRPC_CALL_RUN_RTIMER, &call->flags);
	}
	read_unlock_bh(&call->state_lock);
}

/*
 * resend packets
 */
static void rxrpc_resend(struct rxrpc_call *call)
{
	struct rxrpc_skb_priv *sp;
	struct rxrpc_header *hdr;
	struct sk_buff *txb;
	unsigned long *p_txb, resend_at;
	int loop, stop;
	u8 resend;

	_enter("{%d,%d,%d,%d},",
	       call->acks_hard, call->acks_unacked,
	       atomic_read(&call->sequence),
	       CIRC_CNT(call->acks_head, call->acks_tail, call->acks_winsz));

	stop = 0;
	resend = 0;
	resend_at = 0;

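	/* each window slot holds an skb pointer with bit 0 set once the
	 * packet has been soft-ACK'd; those packets need no retransmission
	 * and can be skipped */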
	for (loop = call->acks_tail;
	     loop != call->acks_head && !stop;
	     loop = (loop + 1) & (call->acks_winsz - 1)
	     ) {
		p_txb = call->acks_window + loop;
		smp_read_barrier_depends();
		if (*p_txb & 1)
			continue;

		txb = (struct sk_buff *) *p_txb;
		sp = rxrpc_skb(txb);

		if (sp->need_resend) {
			sp->need_resend = 0;

			/* each Tx packet has a new serial number */
			sp->hdr.serial =
				htonl(atomic_inc_return(&call->conn->serial));

			hdr = (struct rxrpc_header *) txb->head;
			hdr->serial = sp->hdr.serial;

			_proto("Tx DATA %%%u { #%d }",
			       ntohl(sp->hdr.serial), ntohl(sp->hdr.seq));
			if (rxrpc_send_packet(call->conn->trans, txb) < 0) {
				/* the transmission failed, so stop resending
				 * and retry this packet again shortly */
				stop = 1;
				sp->resend_at = jiffies + 3;
			} else {
				sp->resend_at =
					jiffies + rxrpc_resend_timeout * HZ;
			}
		}

		if (time_after_eq(jiffies + 1, sp->resend_at)) {
			sp->need_resend = 1;
			resend |= 1;
		} else if (resend & 2) {
			if (time_before(sp->resend_at, resend_at))
				resend_at = sp->resend_at;
		} else {
			resend_at = sp->resend_at;
			resend |= 2;
		}
	}

	rxrpc_set_resend(call, resend, resend_at);
	_leave("");
}

/*
 * handle resend timer expiry
 */
static void rxrpc_resend_timer(struct rxrpc_call *call)
{
	struct rxrpc_skb_priv *sp;
	struct sk_buff *txb;
	unsigned long *p_txb, resend_at;
	int loop;
	u8 resend;

	_enter("%d,%d,%d",
	       call->acks_tail, call->acks_unacked, call->acks_head);

	resend = 0;
	resend_at = 0;

	for (loop = call->acks_unacked;
	     loop != call->acks_head;
	     loop = (loop + 1) & (call->acks_winsz - 1)
	     ) {
		p_txb = call->acks_window + loop;
		smp_read_barrier_depends();
		txb = (struct sk_buff *) (*p_txb & ~1);
		sp = rxrpc_skb(txb);

		ASSERT(!(*p_txb & 1));

		if (sp->need_resend) {
			;
		} else if (time_after_eq(jiffies + 1, sp->resend_at)) {
			sp->need_resend = 1;
			resend |= 1;
		} else if (resend & 2) {
			if (time_before(sp->resend_at, resend_at))
				resend_at = sp->resend_at;
		} else {
			resend_at = sp->resend_at;
			resend |= 2;
		}
	}

	rxrpc_set_resend(call, resend, resend_at);
	_leave("");
}

/*
 * process soft ACKs of our transmitted packets
 * - these indicate packets the peer has or has not received, but hasn't yet
 *   given to the consumer, and so can still be discarded and re-requested
 */
static int rxrpc_process_soft_ACKs(struct rxrpc_call *call,
				   struct rxrpc_ackpacket *ack,
				   struct sk_buff *skb)
{
	struct rxrpc_skb_priv *sp;
	struct sk_buff *txb;
	unsigned long *p_txb, resend_at;
	int loop;
	u8 sacks[RXRPC_MAXACKS], resend;

	_enter("{%d,%d},{%d},",
	       call->acks_hard,
	       CIRC_CNT(call->acks_head, call->acks_tail, call->acks_winsz),
	       ack->nAcks);

	if (skb_copy_bits(skb, 0, sacks, ack->nAcks) < 0)
		goto protocol_error;

	resend = 0;
	resend_at = 0;
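
	/* the soft-ACK array parallels the Tx window: entry N refers to the
	 * packet in the slot at acks_tail + N */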
	for (loop = 0; loop < ack->nAcks; loop++) {
		p_txb = call->acks_window;
		p_txb += (call->acks_tail + loop) & (call->acks_winsz - 1);
		smp_read_barrier_depends();
		txb = (struct sk_buff *) (*p_txb & ~1);
		sp = rxrpc_skb(txb);

		switch (sacks[loop]) {
		case RXRPC_ACK_TYPE_ACK:
			sp->need_resend = 0;
			*p_txb |= 1;
			break;
		case RXRPC_ACK_TYPE_NACK:
			sp->need_resend = 1;
			*p_txb &= ~1;
			resend = 1;
			break;
		default:
			_debug("Unsupported ACK type %d", sacks[loop]);
			goto protocol_error;
		}
	}

	smp_mb();
	call->acks_unacked = (call->acks_tail + loop) & (call->acks_winsz - 1);

	/* anything not explicitly ACK'd is implicitly NACK'd, but may just not
	 * have been received or processed yet by the far end */
	for (loop = call->acks_unacked;
	     loop != call->acks_head;
	     loop = (loop + 1) & (call->acks_winsz - 1)
	     ) {
		p_txb = call->acks_window + loop;
		smp_read_barrier_depends();
		txb = (struct sk_buff *) (*p_txb & ~1);
		sp = rxrpc_skb(txb);

		if (*p_txb & 1) {
			/* packet must have been discarded */
			sp->need_resend = 1;
			*p_txb &= ~1;
			resend |= 1;
		} else if (sp->need_resend) {
			;
		} else if (time_after_eq(jiffies + 1, sp->resend_at)) {
			sp->need_resend = 1;
			resend |= 1;
		} else if (resend & 2) {
			if (time_before(sp->resend_at, resend_at))
				resend_at = sp->resend_at;
		} else {
			resend_at = sp->resend_at;
			resend |= 2;
		}
	}

	rxrpc_set_resend(call, resend, resend_at);
	_leave(" = 0");
	return 0;

protocol_error:
	_leave(" = -EPROTO");
	return -EPROTO;
}

/*
 * discard hard-ACK'd packets from the Tx window
 */
static void rxrpc_rotate_tx_window(struct rxrpc_call *call, u32 hard)
{
	struct rxrpc_skb_priv *sp;
	unsigned long _skb;
	int tail = call->acks_tail, old_tail;
	int win = CIRC_CNT(call->acks_head, tail, call->acks_winsz);

	_enter("{%u,%u},%u", call->acks_hard, win, hard);

	ASSERTCMP(hard - call->acks_hard, <=, win);

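	/* everything below the hard-ACK point has been definitively received
	 * by the peer, so the skbs can be freed and the window tail advanced
	 */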
	while (call->acks_hard < hard) {
		smp_read_barrier_depends();
		_skb = call->acks_window[tail] & ~1;
		sp = rxrpc_skb((struct sk_buff *) _skb);
		rxrpc_free_skb((struct sk_buff *) _skb);
		old_tail = tail;
		tail = (tail + 1) & (call->acks_winsz - 1);
		call->acks_tail = tail;
		if (call->acks_unacked == old_tail)
			call->acks_unacked = tail;
		call->acks_hard++;
	}

	wake_up(&call->tx_waitq);
}

/*
 * clear the Tx window in the event of a failure
 */
static void rxrpc_clear_tx_window(struct rxrpc_call *call)
{
	rxrpc_rotate_tx_window(call, atomic_read(&call->sequence));
}

/*
 * drain the out of sequence received packet queue into the packet Rx queue
 */
static int rxrpc_drain_rx_oos_queue(struct rxrpc_call *call)
{
	struct rxrpc_skb_priv *sp;
	struct sk_buff *skb;
	bool terminal;
	int ret;

	_enter("{%d,%d}", call->rx_data_post, call->rx_first_oos);

	spin_lock_bh(&call->lock);

	ret = -ECONNRESET;
	if (test_bit(RXRPC_CALL_RELEASED, &call->flags))
		goto socket_unavailable;

	skb = skb_dequeue(&call->rx_oos_queue);
	if (skb) {
		sp = rxrpc_skb(skb);

		_debug("drain OOS packet %d [%d]",
		       ntohl(sp->hdr.seq), call->rx_first_oos);

		if (ntohl(sp->hdr.seq) != call->rx_first_oos) {
			skb_queue_head(&call->rx_oos_queue, skb);
			call->rx_first_oos = ntohl(rxrpc_skb(skb)->hdr.seq);
			_debug("requeue %p {%u}", skb, call->rx_first_oos);
		} else {
			skb->mark = RXRPC_SKB_MARK_DATA;
			terminal = ((sp->hdr.flags & RXRPC_LAST_PACKET) &&
				!(sp->hdr.flags & RXRPC_CLIENT_INITIATED));
			ret = rxrpc_queue_rcv_skb(call, skb, true, terminal);
			BUG_ON(ret < 0);
			_debug("drain #%u", call->rx_data_post);
			call->rx_data_post++;

			/* find out what the next packet is */
			skb = skb_peek(&call->rx_oos_queue);
			if (skb)
				call->rx_first_oos =
					ntohl(rxrpc_skb(skb)->hdr.seq);
			else
				call->rx_first_oos = 0;
			_debug("peek %p {%u}", skb, call->rx_first_oos);
		}
	}

	ret = 0;
socket_unavailable:
	spin_unlock_bh(&call->lock);
	_leave(" = %d", ret);
	return ret;
}

/*
 * insert an out of sequence packet into the buffer
 */
static void rxrpc_insert_oos_packet(struct rxrpc_call *call,
				    struct sk_buff *skb)
{
	struct rxrpc_skb_priv *sp, *psp;
	struct sk_buff *p;
	u32 seq;

	sp = rxrpc_skb(skb);
	seq = ntohl(sp->hdr.seq);
	_enter(",,{%u}", seq);

	skb->destructor = rxrpc_packet_destructor;
	ASSERTCMP(sp->call, ==, NULL);
	sp->call = call;
	rxrpc_get_call(call);

	/* insert into the buffer in sequence order */
	spin_lock_bh(&call->lock);

	skb_queue_walk(&call->rx_oos_queue, p) {
		psp = rxrpc_skb(p);
		if (ntohl(psp->hdr.seq) > seq) {
			_debug("insert oos #%u before #%u",
			       seq, ntohl(psp->hdr.seq));
			skb_insert(p, skb, &call->rx_oos_queue);
			goto inserted;
		}
	}

	_debug("append oos #%u", seq);
	skb_queue_tail(&call->rx_oos_queue, skb);
inserted:

	/* we might now have a new front to the queue */
	if (call->rx_first_oos == 0 || seq < call->rx_first_oos)
		call->rx_first_oos = seq;

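	/* if the packet the consumer expects next is now at the front of the
	 * queue, the queue can be drained into the Rx queue proper */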
	read_lock(&call->state_lock);
	if (call->state < RXRPC_CALL_COMPLETE &&
	    call->rx_data_post == call->rx_first_oos) {
		_debug("drain rx oos now");
		set_bit(RXRPC_CALL_DRAIN_RX_OOS, &call->events);
	}
	read_unlock(&call->state_lock);

	spin_unlock_bh(&call->lock);
	_leave(" [stored #%u]", call->rx_first_oos);
}

/*
 * clear the Tx window on final ACK reception
 */
static void rxrpc_zap_tx_window(struct rxrpc_call *call)
{
	struct rxrpc_skb_priv *sp;
	struct sk_buff *skb;
	unsigned long _skb, *acks_window;
	uint8_t winsz = call->acks_winsz;
	int tail;

	acks_window = call->acks_window;
	call->acks_window = NULL;

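	/* free any packets still held in the now-detached window */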
	while (CIRC_CNT(call->acks_head, call->acks_tail, winsz) > 0) {
		tail = call->acks_tail;
		smp_read_barrier_depends();
		_skb = acks_window[tail] & ~1;
		smp_mb();
		call->acks_tail = (call->acks_tail + 1) & (winsz - 1);

		skb = (struct sk_buff *) _skb;
		sp = rxrpc_skb(skb);
		_debug("+++ clear Tx %u", ntohl(sp->hdr.seq));
		rxrpc_free_skb(skb);
	}

	kfree(acks_window);
}

/*
 * process the extra information that may be appended to an ACK packet
 */
static void rxrpc_extract_ackinfo(struct rxrpc_call *call, struct sk_buff *skb,
				  unsigned latest, int nAcks)
{
	struct rxrpc_ackinfo ackinfo;
	struct rxrpc_peer *peer;
	unsigned mtu;

	if (skb_copy_bits(skb, nAcks + 3, &ackinfo, sizeof(ackinfo)) < 0) {
		_leave(" [no ackinfo]");
		return;
	}

	_proto("Rx ACK %%%u Info { rx=%u max=%u rwin=%u jm=%u }",
	       latest,
	       ntohl(ackinfo.rxMTU), ntohl(ackinfo.maxMTU),
	       ntohl(ackinfo.rwind), ntohl(ackinfo.jumbo_max));

	mtu = min(ntohl(ackinfo.rxMTU), ntohl(ackinfo.maxMTU));

	peer = call->conn->trans->peer;
	if (mtu < peer->maxdata) {
		spin_lock_bh(&peer->lock);
		peer->maxdata = mtu;
		peer->mtu = mtu + peer->hdrsize;
		spin_unlock_bh(&peer->lock);
		_net("Net MTU %u (maxdata %u)", peer->mtu, peer->maxdata);
	}
}

/*
 * process packets in the reception queue
 */
static int rxrpc_process_rx_queue(struct rxrpc_call *call,
				  u32 *_abort_code)
{
	struct rxrpc_ackpacket ack;
	struct rxrpc_skb_priv *sp;
	struct sk_buff *skb;
	bool post_ACK;
	int latest;
	u32 hard, tx;

	_enter("");

process_further:
	skb = skb_dequeue(&call->rx_queue);
	if (!skb)
		return -EAGAIN;

	_net("deferred skb %p", skb);

	sp = rxrpc_skb(skb);

	_debug("process %s [st %d]", rxrpc_pkts[sp->hdr.type], call->state);

	post_ACK = false;

	switch (sp->hdr.type) {
		/* data packets that wind up here have been received out of
		 * order, need security processing or are jumbo packets */
	case RXRPC_PACKET_TYPE_DATA:
		_proto("OOSQ DATA %%%u { #%u }",
		       ntohl(sp->hdr.serial), ntohl(sp->hdr.seq));

		/* secured packets must be verified and possibly decrypted */
		if (rxrpc_verify_packet(call, skb, _abort_code) < 0)
			goto protocol_error;

		rxrpc_insert_oos_packet(call, skb);
		goto process_further;

		/* partial ACK to process */
	case RXRPC_PACKET_TYPE_ACK:
		if (skb_copy_bits(skb, 0, &ack, sizeof(ack)) < 0) {
			_debug("extraction failure");
			goto protocol_error;
		}
		if (!skb_pull(skb, sizeof(ack)))
			BUG();

		latest = ntohl(sp->hdr.serial);
		hard = ntohl(ack.firstPacket);
		tx = atomic_read(&call->sequence);

		_proto("Rx ACK %%%u { m=%hu f=#%u p=#%u s=%%%u r=%s n=%u }",
		       latest,
		       ntohs(ack.maxSkew),
		       hard,
		       ntohl(ack.previousPacket),
		       ntohl(ack.serial),
		       rxrpc_acks[ack.reason],
		       ack.nAcks);

		rxrpc_extract_ackinfo(call, skb, latest, ack.nAcks);

		if (ack.reason == RXRPC_ACK_PING) {
			_proto("Rx ACK %%%u PING Request", latest);
			rxrpc_propose_ACK(call, RXRPC_ACK_PING_RESPONSE,
					  sp->hdr.serial, true);
		}

		/* discard any out-of-order or duplicate ACKs */
		if (latest - call->acks_latest <= 0) {
			_debug("discard ACK %d <= %d",
			       latest, call->acks_latest);
			goto discard;
		}
		call->acks_latest = latest;

		if (call->state != RXRPC_CALL_CLIENT_SEND_REQUEST &&
		    call->state != RXRPC_CALL_CLIENT_AWAIT_REPLY &&
		    call->state != RXRPC_CALL_SERVER_SEND_REPLY &&
		    call->state != RXRPC_CALL_SERVER_AWAIT_ACK)
			goto discard;

		_debug("Tx=%d H=%u S=%d", tx, call->acks_hard, call->state);

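		/* firstPacket is the first sequence number not yet hard-ACK'd,
		 * so everything below hard has been definitively received and
		 * hard - 1 is the highest sequence number we may discard */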
		if (hard > 0) {
			if (hard - 1 > tx) {
				_debug("hard-ACK'd packet %d not transmitted"
				       " (%d top)",
				       hard - 1, tx);
				goto protocol_error;
			}

			if ((call->state == RXRPC_CALL_CLIENT_AWAIT_REPLY ||
			     call->state == RXRPC_CALL_SERVER_AWAIT_ACK) &&
			    hard > tx)
				goto all_acked;

			smp_rmb();
			rxrpc_rotate_tx_window(call, hard - 1);
		}

		if (ack.nAcks > 0) {
			if (hard - 1 + ack.nAcks > tx) {
				_debug("soft-ACK'd packet %d+%d not"
				       " transmitted (%d top)",
				       hard - 1, ack.nAcks, tx);
				goto protocol_error;
			}

			if (rxrpc_process_soft_ACKs(call, &ack, skb) < 0)
				goto protocol_error;
		}
		goto discard;

		/* complete ACK to process */
	case RXRPC_PACKET_TYPE_ACKALL:
		goto all_acked;

		/* abort and busy are handled elsewhere */
	case RXRPC_PACKET_TYPE_BUSY:
	case RXRPC_PACKET_TYPE_ABORT:
		BUG();

		/* connection level events - also handled elsewhere */
	case RXRPC_PACKET_TYPE_CHALLENGE:
	case RXRPC_PACKET_TYPE_RESPONSE:
	case RXRPC_PACKET_TYPE_DEBUG:
		BUG();
	}

	/* if we've had a hard ACK that covers all the packets we've sent, then
	 * that ends that phase of the operation */
all_acked:
	write_lock_bh(&call->state_lock);
	_debug("ack all %d", call->state);

	switch (call->state) {
	case RXRPC_CALL_CLIENT_AWAIT_REPLY:
		call->state = RXRPC_CALL_CLIENT_RECV_REPLY;
		break;
	case RXRPC_CALL_SERVER_AWAIT_ACK:
		_debug("srv complete");
		call->state = RXRPC_CALL_COMPLETE;
		post_ACK = true;
		break;
	case RXRPC_CALL_CLIENT_SEND_REQUEST:
	case RXRPC_CALL_SERVER_RECV_REQUEST:
		goto protocol_error_unlock; /* can't occur yet */
	default:
		write_unlock_bh(&call->state_lock);
		goto discard; /* assume packet left over from earlier phase */
	}

	write_unlock_bh(&call->state_lock);

	/* if all the packets we sent are hard-ACK'd, then we can discard
	 * whatever we've got left */
	_debug("clear Tx %d",
	       CIRC_CNT(call->acks_head, call->acks_tail, call->acks_winsz));

	del_timer_sync(&call->resend_timer);
	clear_bit(RXRPC_CALL_RUN_RTIMER, &call->flags);
	clear_bit(RXRPC_CALL_RESEND_TIMER, &call->events);

	if (call->acks_window)
		rxrpc_zap_tx_window(call);

	if (post_ACK) {
		/* post the final ACK message for userspace to pick up */
		_debug("post ACK");
		skb->mark = RXRPC_SKB_MARK_FINAL_ACK;
		sp->call = call;
		rxrpc_get_call(call);
		spin_lock_bh(&call->lock);
		if (rxrpc_queue_rcv_skb(call, skb, true, true) < 0)
			BUG();
		spin_unlock_bh(&call->lock);
		goto process_further;
	}

discard:
	rxrpc_free_skb(skb);
	goto process_further;

protocol_error_unlock:
	write_unlock_bh(&call->state_lock);
protocol_error:
	rxrpc_free_skb(skb);
	_leave(" = -EPROTO");
	return -EPROTO;
}

/*
 * post a message to the socket Rx queue for recvmsg() to pick up
 */
static int rxrpc_post_message(struct rxrpc_call *call, u32 mark, u32 error,
			      bool fatal)
{
	struct rxrpc_skb_priv *sp;
	struct sk_buff *skb;
	int ret;

	_enter("{%d,%lx},%u,%u,%d",
	       call->debug_id, call->flags, mark, error, fatal);

	/* remove timers and things for fatal messages */
	if (fatal) {
		del_timer_sync(&call->resend_timer);
		del_timer_sync(&call->ack_timer);
		clear_bit(RXRPC_CALL_RUN_RTIMER, &call->flags);
	}

	if (mark != RXRPC_SKB_MARK_NEW_CALL &&
	    !test_bit(RXRPC_CALL_HAS_USERID, &call->flags)) {
		_leave("[no userid]");
		return 0;
	}

	if (!test_bit(RXRPC_CALL_TERMINAL_MSG, &call->flags)) {
		skb = alloc_skb(0, GFP_NOFS);
		if (!skb)
			return -ENOMEM;

		rxrpc_new_skb(skb);

		skb->mark = mark;

		sp = rxrpc_skb(skb);
		memset(sp, 0, sizeof(*sp));
		sp->error = error;
		sp->call = call;
		rxrpc_get_call(call);

		spin_lock_bh(&call->lock);
		ret = rxrpc_queue_rcv_skb(call, skb, true, fatal);
		spin_unlock_bh(&call->lock);
		if (ret < 0)
			BUG();
	}

	return 0;
}

/*
 * handle background processing of incoming call packets and ACK / abort
 * generation
 */
void rxrpc_process_call(struct work_struct *work)
{
	struct rxrpc_call *call =
		container_of(work, struct rxrpc_call, processor);
	struct rxrpc_ackpacket ack;
	struct rxrpc_ackinfo ackinfo;
	struct rxrpc_header hdr;
	struct msghdr msg;
	struct kvec iov[5];
	unsigned long bits;
	__be32 data, pad;
	size_t len;
	int genbit, loop, nbit, ioc, ret, mtu;
	u32 abort_code = RX_PROTOCOL_ERROR;
	u8 *acks = NULL;

	//printk("\n--------------------\n");
	_enter("{%d,%s,%lx} [%lu]",
	       call->debug_id, rxrpc_call_states[call->state], call->events,
	       (jiffies - call->creation_jif) / (HZ / 10));

	if (test_and_set_bit(RXRPC_CALL_PROC_BUSY, &call->flags)) {
		_debug("XXXXXXXXXXXXX RUNNING ON MULTIPLE CPUS XXXXXXXXXXXXX");
		return;
	}

	/* there's a good chance we're going to have to send a message, so set
	 * one up in advance */
	msg.msg_name	= &call->conn->trans->peer->srx.transport.sin;
	msg.msg_namelen	= sizeof(call->conn->trans->peer->srx.transport.sin);
	msg.msg_control	= NULL;
	msg.msg_controllen = 0;
	msg.msg_flags	= 0;

	hdr.epoch	= call->conn->epoch;
	hdr.cid		= call->cid;
	hdr.callNumber	= call->call_id;
	hdr.seq		= 0;
	hdr.type	= RXRPC_PACKET_TYPE_ACK;
	hdr.flags	= call->conn->out_clientflag;
	hdr.userStatus	= 0;
	hdr.securityIndex = call->conn->security_ix;
	hdr._rsvd	= 0;
	hdr.serviceId	= call->conn->service_id;

	memset(iov, 0, sizeof(iov));
	iov[0].iov_base	= &hdr;
	iov[0].iov_len	= sizeof(hdr);

	/* deal with events of a final nature */
	if (test_bit(RXRPC_CALL_RELEASE, &call->events)) {
		rxrpc_release_call(call);
		clear_bit(RXRPC_CALL_RELEASE, &call->events);
	}

	if (test_bit(RXRPC_CALL_RCVD_ERROR, &call->events)) {
		int error;

		clear_bit(RXRPC_CALL_CONN_ABORT, &call->events);
		clear_bit(RXRPC_CALL_REJECT_BUSY, &call->events);
		clear_bit(RXRPC_CALL_ABORT, &call->events);

		error = call->conn->trans->peer->net_error;
		_debug("post net error %d", error);

		if (rxrpc_post_message(call, RXRPC_SKB_MARK_NET_ERROR,
				       error, true) < 0)
			goto no_mem;
		clear_bit(RXRPC_CALL_RCVD_ERROR, &call->events);
		goto kill_ACKs;
	}

	if (test_bit(RXRPC_CALL_CONN_ABORT, &call->events)) {
		ASSERTCMP(call->state, >, RXRPC_CALL_COMPLETE);

		clear_bit(RXRPC_CALL_REJECT_BUSY, &call->events);
		clear_bit(RXRPC_CALL_ABORT, &call->events);

		_debug("post conn abort");

		if (rxrpc_post_message(call, RXRPC_SKB_MARK_LOCAL_ERROR,
				       call->conn->error, true) < 0)
			goto no_mem;
		clear_bit(RXRPC_CALL_CONN_ABORT, &call->events);
		goto kill_ACKs;
	}

	if (test_bit(RXRPC_CALL_REJECT_BUSY, &call->events)) {
		hdr.type = RXRPC_PACKET_TYPE_BUSY;
		genbit = RXRPC_CALL_REJECT_BUSY;
		goto send_message;
	}

	if (test_bit(RXRPC_CALL_ABORT, &call->events)) {
		ASSERTCMP(call->state, >, RXRPC_CALL_COMPLETE);

		if (rxrpc_post_message(call, RXRPC_SKB_MARK_LOCAL_ERROR,
				       ECONNABORTED, true) < 0)
			goto no_mem;
		hdr.type = RXRPC_PACKET_TYPE_ABORT;
		data = htonl(call->abort_code);
		iov[1].iov_base = &data;
		iov[1].iov_len = sizeof(data);
		genbit = RXRPC_CALL_ABORT;
		goto send_message;
	}

	if (test_bit(RXRPC_CALL_ACK_FINAL, &call->events)) {
		genbit = RXRPC_CALL_ACK_FINAL;

		ack.bufferSpace	= htons(8);
		ack.maxSkew	= 0;
		ack.serial	= 0;
		ack.reason	= RXRPC_ACK_IDLE;
		ack.nAcks	= 0;
		call->ackr_reason = 0;

		spin_lock_bh(&call->lock);
		ack.serial = call->ackr_serial;
		ack.previousPacket = call->ackr_prev_seq;
		ack.firstPacket = htonl(call->rx_data_eaten + 1);
		spin_unlock_bh(&call->lock);

		pad = 0;

		iov[1].iov_base = &ack;
		iov[1].iov_len	= sizeof(ack);
		iov[2].iov_base = &pad;
		iov[2].iov_len	= 3;
		iov[3].iov_base = &ackinfo;
		iov[3].iov_len	= sizeof(ackinfo);
		goto send_ACK;
	}

	if (call->events & ((1 << RXRPC_CALL_RCVD_BUSY) |
			    (1 << RXRPC_CALL_RCVD_ABORT))
	    ) {
		u32 mark;

		if (test_bit(RXRPC_CALL_RCVD_ABORT, &call->events))
			mark = RXRPC_SKB_MARK_REMOTE_ABORT;
		else
			mark = RXRPC_SKB_MARK_BUSY;

		_debug("post abort/busy");
		rxrpc_clear_tx_window(call);
		if (rxrpc_post_message(call, mark, ECONNABORTED, true) < 0)
			goto no_mem;

		clear_bit(RXRPC_CALL_RCVD_BUSY, &call->events);
		clear_bit(RXRPC_CALL_RCVD_ABORT, &call->events);
		goto kill_ACKs;
	}

	if (test_and_clear_bit(RXRPC_CALL_RCVD_ACKALL, &call->events)) {
		_debug("do implicit ackall");
		rxrpc_clear_tx_window(call);
	}

	if (test_bit(RXRPC_CALL_LIFE_TIMER, &call->events)) {
		write_lock_bh(&call->state_lock);
		if (call->state <= RXRPC_CALL_COMPLETE) {
			call->state = RXRPC_CALL_LOCALLY_ABORTED;
			call->abort_code = RX_CALL_TIMEOUT;
			set_bit(RXRPC_CALL_ABORT, &call->events);
		}
		write_unlock_bh(&call->state_lock);

		_debug("post timeout");
		if (rxrpc_post_message(call, RXRPC_SKB_MARK_LOCAL_ERROR,
				       ETIME, true) < 0)
			goto no_mem;

		clear_bit(RXRPC_CALL_LIFE_TIMER, &call->events);
		goto kill_ACKs;
	}

	/* deal with assorted inbound messages */
	if (!skb_queue_empty(&call->rx_queue)) {
		switch (rxrpc_process_rx_queue(call, &abort_code)) {
		case 0:
		case -EAGAIN:
			break;
		case -ENOMEM:
			goto no_mem;
		case -EKEYEXPIRED:
		case -EKEYREJECTED:
		case -EPROTO:
			rxrpc_abort_call(call, abort_code);
			goto kill_ACKs;
		}
	}

	/* handle resending */
	if (test_and_clear_bit(RXRPC_CALL_RESEND_TIMER, &call->events))
		rxrpc_resend_timer(call);
	if (test_and_clear_bit(RXRPC_CALL_RESEND, &call->events))
		rxrpc_resend(call);

	/* consider sending an ordinary ACK */
	if (test_bit(RXRPC_CALL_ACK, &call->events)) {
		_debug("send ACK: window: %d - %d { %lx }",
		       call->rx_data_eaten, call->ackr_win_top,
		       call->ackr_window[0]);

		if (call->state > RXRPC_CALL_SERVER_ACK_REQUEST &&
		    call->ackr_reason != RXRPC_ACK_PING_RESPONSE) {
			/* ACK by sending reply DATA packet in this state */
			clear_bit(RXRPC_CALL_ACK, &call->events);
			goto maybe_reschedule;
		}

		genbit = RXRPC_CALL_ACK;

		acks = kzalloc(call->ackr_win_top - call->rx_data_eaten,
			       GFP_NOFS);
		if (!acks)
			goto no_mem;

		//hdr.flags	= RXRPC_SLOW_START_OK;
		ack.bufferSpace	= htons(8);
		ack.maxSkew	= 0;
		ack.serial	= 0;
		ack.reason	= 0;

		spin_lock_bh(&call->lock);
		ack.reason = call->ackr_reason;
		ack.serial = call->ackr_serial;
		ack.previousPacket = call->ackr_prev_seq;
		ack.firstPacket = htonl(call->rx_data_eaten + 1);

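		/* render the bitmap of received packets as the wire-format
		 * array of per-packet ACK/NACK bytes; nAcks extends up to the
		 * last packet actually acknowledged, and unmarked slots were
		 * left zeroed (not acknowledged) by kzalloc() */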
		ack.nAcks = 0;
		for (loop = 0; loop < RXRPC_ACKR_WINDOW_ASZ; loop++) {
			nbit = loop * BITS_PER_LONG;
			for (bits = call->ackr_window[loop]; bits; bits >>= 1
			     ) {
				_debug("- l=%d n=%d b=%lx", loop, nbit, bits);
				if (bits & 1) {
					acks[nbit] = RXRPC_ACK_TYPE_ACK;
					ack.nAcks = nbit + 1;
				}
				nbit++;
			}
		}
		call->ackr_reason = 0;
		spin_unlock_bh(&call->lock);

		pad = 0;

		iov[1].iov_base = &ack;
		iov[1].iov_len	= sizeof(ack);
		iov[2].iov_base = acks;
		iov[2].iov_len	= ack.nAcks;
		iov[3].iov_base = &pad;
		iov[3].iov_len	= 3;
		iov[4].iov_base = &ackinfo;
		iov[4].iov_len	= sizeof(ackinfo);

		switch (ack.reason) {
		case RXRPC_ACK_REQUESTED:
		case RXRPC_ACK_DUPLICATE:
		case RXRPC_ACK_OUT_OF_SEQUENCE:
		case RXRPC_ACK_EXCEEDS_WINDOW:
		case RXRPC_ACK_NOSPACE:
		case RXRPC_ACK_PING:
		case RXRPC_ACK_PING_RESPONSE:
			goto send_ACK_with_skew;
		case RXRPC_ACK_DELAY:
		case RXRPC_ACK_IDLE:
			goto send_ACK;
		}
	}

	/* handle completion of security negotiations on an incoming
	 * connection */
	if (test_and_clear_bit(RXRPC_CALL_SECURED, &call->events)) {
		_debug("secured");
		spin_lock_bh(&call->lock);

		if (call->state == RXRPC_CALL_SERVER_SECURING) {
			_debug("securing");
			write_lock(&call->conn->lock);
			if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) &&
			    !test_bit(RXRPC_CALL_RELEASE, &call->events)) {
				_debug("not released");
				call->state = RXRPC_CALL_SERVER_ACCEPTING;
				list_move_tail(&call->accept_link,
					       &call->socket->acceptq);
			}
			write_unlock(&call->conn->lock);
			read_lock(&call->state_lock);
			if (call->state < RXRPC_CALL_COMPLETE)
				set_bit(RXRPC_CALL_POST_ACCEPT, &call->events);
			read_unlock(&call->state_lock);
		}

		spin_unlock_bh(&call->lock);
		if (!test_bit(RXRPC_CALL_POST_ACCEPT, &call->events))
			goto maybe_reschedule;
	}

	/* post a notification of an acceptable connection to the app */
	if (test_bit(RXRPC_CALL_POST_ACCEPT, &call->events)) {
		_debug("post accept");
		if (rxrpc_post_message(call, RXRPC_SKB_MARK_NEW_CALL,
				       0, false) < 0)
			goto no_mem;
		clear_bit(RXRPC_CALL_POST_ACCEPT, &call->events);
		goto maybe_reschedule;
	}

	/* handle incoming call acceptance */
	if (test_and_clear_bit(RXRPC_CALL_ACCEPTED, &call->events)) {
		_debug("accepted");
		ASSERTCMP(call->rx_data_post, ==, 0);
		call->rx_data_post = 1;
		read_lock_bh(&call->state_lock);
		if (call->state < RXRPC_CALL_COMPLETE)
			set_bit(RXRPC_CALL_DRAIN_RX_OOS, &call->events);
		read_unlock_bh(&call->state_lock);
	}

	/* drain the out of sequence received packet queue into the packet Rx
	 * queue */
	if (test_and_clear_bit(RXRPC_CALL_DRAIN_RX_OOS, &call->events)) {
		while (call->rx_data_post == call->rx_first_oos)
			if (rxrpc_drain_rx_oos_queue(call) < 0)
				break;
		goto maybe_reschedule;
	}

	/* other events may have been raised since we started checking */
	goto maybe_reschedule;

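/* the skew is the difference between the highest packet serial number seen on
 * this connection and the serial number being responded to */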
send_ACK_with_skew:
	ack.maxSkew = htons(atomic_read(&call->conn->hi_serial) -
			    ntohl(ack.serial));
send_ACK:
	mtu = call->conn->trans->peer->if_mtu;
	mtu -= call->conn->trans->peer->hdrsize;
	ackinfo.maxMTU	= htonl(mtu);
	ackinfo.rwind	= htonl(32);

	/* permit the peer to send us jumbo packets if it wants to */
	ackinfo.rxMTU	= htonl(5692);
	ackinfo.jumbo_max = htonl(4);

	hdr.serial = htonl(atomic_inc_return(&call->conn->serial));
	_proto("Tx ACK %%%u { m=%hu f=#%u p=#%u s=%%%u r=%s n=%u }",
	       ntohl(hdr.serial),
	       ntohs(ack.maxSkew),
	       ntohl(ack.firstPacket),
	       ntohl(ack.previousPacket),
	       ntohl(ack.serial),
	       rxrpc_acks[ack.reason],
	       ack.nAcks);

	del_timer_sync(&call->ack_timer);
	if (ack.nAcks > 0)
		set_bit(RXRPC_CALL_TX_SOFT_ACK, &call->flags);
	goto send_message_2;

send_message:
	_debug("send message");

	hdr.serial = htonl(atomic_inc_return(&call->conn->serial));
	_proto("Tx %s %%%u", rxrpc_pkts[hdr.type], ntohl(hdr.serial));
send_message_2:

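	/* count up the iovecs actually in use; they are filled in from
	 * element 1 upwards, so the highest populated element determines both
	 * the count and the total length */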
	len = iov[0].iov_len;
	ioc = 1;
	if (iov[4].iov_len) {
		ioc = 5;
		len += iov[4].iov_len;
		len += iov[3].iov_len;
		len += iov[2].iov_len;
		len += iov[1].iov_len;
	} else if (iov[3].iov_len) {
		ioc = 4;
		len += iov[3].iov_len;
		len += iov[2].iov_len;
		len += iov[1].iov_len;
	} else if (iov[2].iov_len) {
		ioc = 3;
		len += iov[2].iov_len;
		len += iov[1].iov_len;
	} else if (iov[1].iov_len) {
		ioc = 2;
		len += iov[1].iov_len;
	}

	ret = kernel_sendmsg(call->conn->trans->local->socket,
			     &msg, iov, ioc, len);
	if (ret < 0) {
		_debug("sendmsg failed: %d", ret);
		read_lock_bh(&call->state_lock);
		if (call->state < RXRPC_CALL_DEAD)
			rxrpc_queue_call(call);
		read_unlock_bh(&call->state_lock);
		goto error;
	}

	switch (genbit) {
	case RXRPC_CALL_ABORT:
		clear_bit(genbit, &call->events);
		clear_bit(RXRPC_CALL_RCVD_ABORT, &call->events);
		goto kill_ACKs;

	case RXRPC_CALL_ACK_FINAL:
		write_lock_bh(&call->state_lock);
		if (call->state == RXRPC_CALL_CLIENT_FINAL_ACK)
			call->state = RXRPC_CALL_COMPLETE;
		write_unlock_bh(&call->state_lock);
		goto kill_ACKs;

	default:
		clear_bit(genbit, &call->events);
		switch (call->state) {
		case RXRPC_CALL_CLIENT_AWAIT_REPLY:
		case RXRPC_CALL_CLIENT_RECV_REPLY:
		case RXRPC_CALL_SERVER_RECV_REQUEST:
		case RXRPC_CALL_SERVER_ACK_REQUEST:
			_debug("start ACK timer");
			rxrpc_propose_ACK(call, RXRPC_ACK_DELAY,
					  call->ackr_serial, false);
		default:
			break;
		}
		goto maybe_reschedule;
	}

kill_ACKs:
	del_timer_sync(&call->ack_timer);
	if (test_and_clear_bit(RXRPC_CALL_ACK_FINAL, &call->events))
		rxrpc_put_call(call);
	clear_bit(RXRPC_CALL_ACK, &call->events);

maybe_reschedule:
	if (call->events || !skb_queue_empty(&call->rx_queue)) {
		read_lock_bh(&call->state_lock);
		if (call->state < RXRPC_CALL_DEAD)
			rxrpc_queue_call(call);
		read_unlock_bh(&call->state_lock);
	}

	/* don't leave aborted connections on the accept queue */
	if (call->state >= RXRPC_CALL_COMPLETE &&
	    !list_empty(&call->accept_link)) {
		_debug("X unlinking once-pending call %p { e=%lx f=%lx c=%x }",
		       call, call->events, call->flags,
		       ntohl(call->conn->cid));

		read_lock_bh(&call->state_lock);
		if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) &&
		    !test_and_set_bit(RXRPC_CALL_RELEASE, &call->events))
			rxrpc_queue_call(call);
		read_unlock_bh(&call->state_lock);
	}

error:
	clear_bit(RXRPC_CALL_PROC_BUSY, &call->flags);
	kfree(acks);

	/* because we don't want two CPUs both processing the work item for one
	 * call at the same time, we use a flag to note when it's busy; however
	 * this means there's a race between clearing the flag and setting the
	 * work pending bit and the work item being processed again */
	if (call->events && !work_pending(&call->processor)) {
		_debug("jumpstart %x", ntohl(call->conn->cid));
		rxrpc_queue_call(call);
	}

	_leave("");
	return;

no_mem:
	_debug("out of memory");
	goto maybe_reschedule;
}