/* RxRPC individual remote procedure call handling
 *
 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
 * Written by David Howells (dhowells@redhat.com)
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version
 * 2 of the License, or (at your option) any later version.
 */

#include <linux/module.h>
#include <linux/circ_buf.h>
#include <net/sock.h>
#include <net/af_rxrpc.h>
#include "ar-internal.h"

const char *rxrpc_call_states[] = {
	[RXRPC_CALL_CLIENT_SEND_REQUEST]	= "ClSndReq",
	[RXRPC_CALL_CLIENT_AWAIT_REPLY]		= "ClAwtRpl",
	[RXRPC_CALL_CLIENT_RECV_REPLY]		= "ClRcvRpl",
	[RXRPC_CALL_CLIENT_FINAL_ACK]		= "ClFnlACK",
	[RXRPC_CALL_SERVER_SECURING]		= "SvSecure",
	[RXRPC_CALL_SERVER_ACCEPTING]		= "SvAccept",
	[RXRPC_CALL_SERVER_RECV_REQUEST]	= "SvRcvReq",
	[RXRPC_CALL_SERVER_ACK_REQUEST]		= "SvAckReq",
	[RXRPC_CALL_SERVER_SEND_REPLY]		= "SvSndRpl",
	[RXRPC_CALL_SERVER_AWAIT_ACK]		= "SvAwtACK",
	[RXRPC_CALL_COMPLETE]			= "Complete",
	[RXRPC_CALL_SERVER_BUSY]		= "SvBusy  ",
	[RXRPC_CALL_REMOTELY_ABORTED]		= "RmtAbort",
	[RXRPC_CALL_LOCALLY_ABORTED]		= "LocAbort",
	[RXRPC_CALL_NETWORK_ERROR]		= "NetError",
	[RXRPC_CALL_DEAD]			= "Dead    ",
};
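/*
 * Note: this table is indexed directly by the call-state enum (kept in
 * ar-internal.h), so its entries must stay in step with that enum.  Each
 * string is padded to eight characters, presumably so that columns line up
 * in the /proc output that prints them.
 */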

struct kmem_cache *rxrpc_call_jar;
LIST_HEAD(rxrpc_calls);
DEFINE_RWLOCK(rxrpc_call_lock);
static unsigned rxrpc_call_max_lifetime = 60;	/* maximum lifetime of a call (seconds) */
static unsigned rxrpc_dead_call_timeout = 2;	/* time a dead call lingers before reaping (seconds) */

static void rxrpc_destroy_call(struct work_struct *work);
static void rxrpc_call_life_expired(unsigned long _call);
static void rxrpc_dead_call_expired(unsigned long _call);
static void rxrpc_ack_time_expired(unsigned long _call);
static void rxrpc_resend_time_expired(unsigned long _call);

/*
 * allocate a new call
 */
static struct rxrpc_call *rxrpc_alloc_call(gfp_t gfp)
{
	struct rxrpc_call *call;

	call = kmem_cache_zalloc(rxrpc_call_jar, gfp);
	if (!call)
		return NULL;

	call->acks_winsz = 16;
	call->acks_window = kmalloc(call->acks_winsz * sizeof(unsigned long),
				    gfp);
	if (!call->acks_window) {
		kmem_cache_free(rxrpc_call_jar, call);
		return NULL;
	}

	setup_timer(&call->lifetimer, &rxrpc_call_life_expired,
		    (unsigned long) call);
	setup_timer(&call->deadspan, &rxrpc_dead_call_expired,
		    (unsigned long) call);
	setup_timer(&call->ack_timer, &rxrpc_ack_time_expired,
		    (unsigned long) call);
	setup_timer(&call->resend_timer, &rxrpc_resend_time_expired,
		    (unsigned long) call);
	INIT_WORK(&call->destroyer, &rxrpc_destroy_call);
	INIT_WORK(&call->processor, &rxrpc_process_call);
	INIT_LIST_HEAD(&call->accept_link);
	skb_queue_head_init(&call->rx_queue);
	skb_queue_head_init(&call->rx_oos_queue);
	init_waitqueue_head(&call->tx_waitq);
	spin_lock_init(&call->lock);
	rwlock_init(&call->state_lock);
	atomic_set(&call->usage, 1);
	call->debug_id = atomic_inc_return(&rxrpc_debug_id);
	call->state = RXRPC_CALL_CLIENT_SEND_REQUEST;

	memset(&call->sock_node, 0xed, sizeof(call->sock_node));

	call->rx_data_expect = 1;
	call->rx_data_eaten = 0;
	call->rx_first_oos = 0;
	call->ackr_win_top = call->rx_data_eaten + 1 + RXRPC_MAXACKS;
	call->creation_jif = jiffies;
	return call;
}
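/*
 * A note on the setup_timer() calls above: the old unsigned-long timer API
 * used here (superseded by timer_setup() in later kernels) carries the
 * callback argument as an unsigned long, so the call pointer is cast on the
 * way in and cast back in each expiry handler.  The shape, using the life
 * timer defined at the bottom of this file as the example:
 *
 *	setup_timer(&call->lifetimer, &rxrpc_call_life_expired,
 *		    (unsigned long) call);
 *
 *	static void rxrpc_call_life_expired(unsigned long _call)
 *	{
 *		struct rxrpc_call *call = (struct rxrpc_call *) _call;
 *		...
 *	}
 */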

/*
 * allocate a new client call and attempt to get a connection slot for it
 */
static struct rxrpc_call *rxrpc_alloc_client_call(
	struct rxrpc_sock *rx,
	struct rxrpc_transport *trans,
	struct rxrpc_conn_bundle *bundle,
	gfp_t gfp)
{
	struct rxrpc_call *call;
	int ret;

	_enter("");

	ASSERT(rx != NULL);
	ASSERT(trans != NULL);
	ASSERT(bundle != NULL);

	call = rxrpc_alloc_call(gfp);
	if (!call)
		return ERR_PTR(-ENOMEM);

	sock_hold(&rx->sk);
	call->socket = rx;
	call->rx_data_post = 1;

	ret = rxrpc_connect_call(rx, trans, bundle, call, gfp);
	if (ret < 0) {
		/* undo the socket ref taken above and free the ACK window
		 * allocated by rxrpc_alloc_call() before discarding */
		sock_put(&rx->sk);
		kfree(call->acks_window);
		kmem_cache_free(rxrpc_call_jar, call);
		return ERR_PTR(ret);
	}

	spin_lock(&call->conn->trans->peer->lock);
	list_add(&call->error_link, &call->conn->trans->peer->error_targets);
	spin_unlock(&call->conn->trans->peer->lock);

	call->lifetimer.expires = jiffies + rxrpc_call_max_lifetime * HZ;
	add_timer(&call->lifetimer);

	_leave(" = %p", call);
	return call;
}
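/*
 * A note on the lifetimer arming above: rxrpc_call_max_lifetime is in
 * seconds, so the expiry is jiffies + 60 * HZ by default - with HZ == 250,
 * for instance, the timer fires 15000 ticks after the call is created, at
 * which point rxrpc_call_life_expired() (at the bottom of this file) flags
 * the call for the event processor.
 */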

/*
 * set up a call for the given data
 * - called in process context with IRQs enabled
 */
struct rxrpc_call *rxrpc_get_client_call(struct rxrpc_sock *rx,
					 struct rxrpc_transport *trans,
					 struct rxrpc_conn_bundle *bundle,
					 unsigned long user_call_ID,
					 int create,
					 gfp_t gfp)
{
	struct rxrpc_call *call, *candidate;
	struct rb_node *p, *parent, **pp;

	_enter("%p,%d,%d,%lx,%d",
	       rx, trans ? trans->debug_id : -1, bundle ? bundle->debug_id : -1,
	       user_call_ID, create);

	/* search the extant calls first for one that matches the specified
	 * user ID */
	read_lock(&rx->call_lock);

	p = rx->calls.rb_node;
	while (p) {
		call = rb_entry(p, struct rxrpc_call, sock_node);

		if (user_call_ID < call->user_call_ID)
			p = p->rb_left;
		else if (user_call_ID > call->user_call_ID)
			p = p->rb_right;
		else
			goto found_extant_call;
	}

	read_unlock(&rx->call_lock);

	if (!create || !trans)
		return ERR_PTR(-EBADSLT);

	/* not yet present - create a candidate for a new record and then
	 * redo the search */
	candidate = rxrpc_alloc_client_call(rx, trans, bundle, gfp);
	if (IS_ERR(candidate)) {
		_leave(" = %ld", PTR_ERR(candidate));
		return candidate;
	}

	candidate->user_call_ID = user_call_ID;
	__set_bit(RXRPC_CALL_HAS_USERID, &candidate->flags);

	write_lock(&rx->call_lock);

	pp = &rx->calls.rb_node;
	parent = NULL;
	while (*pp) {
		parent = *pp;
		call = rb_entry(parent, struct rxrpc_call, sock_node);

		if (user_call_ID < call->user_call_ID)
			pp = &(*pp)->rb_left;
		else if (user_call_ID > call->user_call_ID)
			pp = &(*pp)->rb_right;
		else
			goto found_extant_second;
	}

	/* second search also failed; add the new call */
	call = candidate;
	candidate = NULL;
	rxrpc_get_call(call);

	rb_link_node(&call->sock_node, parent, pp);
	rb_insert_color(&call->sock_node, &rx->calls);
	write_unlock(&rx->call_lock);

	write_lock_bh(&rxrpc_call_lock);
	list_add_tail(&call->link, &rxrpc_calls);
	write_unlock_bh(&rxrpc_call_lock);

	_net("CALL new %d on CONN %d", call->debug_id, call->conn->debug_id);

	_leave(" = %p [new]", call);
	return call;

	/* we found the call in the list immediately */
found_extant_call:
	rxrpc_get_call(call);
	read_unlock(&rx->call_lock);
	_leave(" = %p [extant %d]", call, atomic_read(&call->usage));
	return call;

	/* we found the call on the second time through the list */
found_extant_second:
	rxrpc_get_call(call);
	write_unlock(&rx->call_lock);
	rxrpc_put_call(candidate);
	_leave(" = %p [second %d]", call, atomic_read(&call->usage));
	return call;
}
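/*
 * The function above is an instance of the optimistic lookup-then-insert
 * pattern: search the rbtree under the read lock; on a miss, allocate a
 * candidate outside any lock (the allocation may sleep), then retake the
 * lock for writing and search again, because another thread may have
 * inserted the same user ID in the window.  A sketch of the shape:
 *
 *	read_lock();  if (found) { get; unlock; return; }
 *	read_unlock();
 *	candidate = alloc();			(may sleep)
 *	write_lock(); if (found) { get; unlock; put(candidate); return; }
 *	insert(candidate); write_unlock();
 */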

/*
 * set up an incoming call
 * - called in process context with IRQs enabled
 */
struct rxrpc_call *rxrpc_incoming_call(struct rxrpc_sock *rx,
				       struct rxrpc_connection *conn,
				       struct rxrpc_header *hdr,
				       gfp_t gfp)
{
	struct rxrpc_call *call, *candidate;
	struct rb_node **p, *parent;
	__be32 call_id;

	_enter(",%d,,%x", conn->debug_id, gfp);

	ASSERT(rx != NULL);

	candidate = rxrpc_alloc_call(gfp);
	if (!candidate)
		return ERR_PTR(-EBUSY);

	candidate->socket = rx;
	candidate->conn = conn;
	candidate->cid = hdr->cid;
	candidate->call_id = hdr->callNumber;
	candidate->channel = ntohl(hdr->cid) & RXRPC_CHANNELMASK;
	candidate->rx_data_post = 0;
	candidate->state = RXRPC_CALL_SERVER_ACCEPTING;
	if (conn->security_ix > 0)
		candidate->state = RXRPC_CALL_SERVER_SECURING;

	write_lock_bh(&conn->lock);

	/* set the channel for this call */
	call = conn->channels[candidate->channel];
	_debug("channel[%u] is %p", candidate->channel, call);
	if (call && call->call_id == hdr->callNumber) {
		/* already set; must've been a duplicate packet */
		_debug("extant call [%d]", call->state);
		ASSERTCMP(call->conn, ==, conn);

		read_lock(&call->state_lock);
		switch (call->state) {
		case RXRPC_CALL_LOCALLY_ABORTED:
			if (!test_and_set_bit(RXRPC_CALL_ABORT, &call->events))
				rxrpc_queue_call(call);
			/* fall through */
		case RXRPC_CALL_REMOTELY_ABORTED:
			read_unlock(&call->state_lock);
			goto aborted_call;
		default:
			rxrpc_get_call(call);
			read_unlock(&call->state_lock);
			goto extant_call;
		}
	}

	if (call) {
		/* it seems the channel is still in use from the previous call
		 * - ditch the old binding if its call is now complete */
		_debug("CALL: %u { %s }",
		       call->debug_id, rxrpc_call_states[call->state]);

		if (call->state >= RXRPC_CALL_COMPLETE) {
			conn->channels[call->channel] = NULL;
		} else {
			write_unlock_bh(&conn->lock);
			kfree(candidate->acks_window);
			kmem_cache_free(rxrpc_call_jar, candidate);
			_leave(" = -EBUSY");
			return ERR_PTR(-EBUSY);
		}
	}

	/* check that the call number isn't a duplicate */
	_debug("check dup");
	call_id = hdr->callNumber;
	p = &conn->calls.rb_node;
	parent = NULL;
	while (*p) {
		parent = *p;
		call = rb_entry(parent, struct rxrpc_call, conn_node);

		if (call_id < call->call_id)
			p = &(*p)->rb_left;
		else if (call_id > call->call_id)
			p = &(*p)->rb_right;
		else
			goto old_call;
	}

	/* make the call available */
	_debug("new call");
	call = candidate;
	candidate = NULL;
	rb_link_node(&call->conn_node, parent, p);
	rb_insert_color(&call->conn_node, &conn->calls);
	conn->channels[call->channel] = call;
	sock_hold(&rx->sk);
	atomic_inc(&conn->usage);
	write_unlock_bh(&conn->lock);

	spin_lock(&conn->trans->peer->lock);
	list_add(&call->error_link, &conn->trans->peer->error_targets);
	spin_unlock(&conn->trans->peer->lock);

	write_lock_bh(&rxrpc_call_lock);
	list_add_tail(&call->link, &rxrpc_calls);
	write_unlock_bh(&rxrpc_call_lock);

	_net("CALL incoming %d on CONN %d", call->debug_id, call->conn->debug_id);

	call->lifetimer.expires = jiffies + rxrpc_call_max_lifetime * HZ;
	add_timer(&call->lifetimer);
	_leave(" = %p {%d} [new]", call, call->debug_id);
	return call;

	/* the unused candidate must be fully freed, including the ACK window
	 * allocated for it by rxrpc_alloc_call() */
extant_call:
	write_unlock_bh(&conn->lock);
	kfree(candidate->acks_window);
	kmem_cache_free(rxrpc_call_jar, candidate);
	_leave(" = %p {%d} [extant]", call, call ? call->debug_id : -1);
	return call;

aborted_call:
	write_unlock_bh(&conn->lock);
	kfree(candidate->acks_window);
	kmem_cache_free(rxrpc_call_jar, candidate);
	_leave(" = -ECONNABORTED");
	return ERR_PTR(-ECONNABORTED);

old_call:
	write_unlock_bh(&conn->lock);
	kfree(candidate->acks_window);
	kmem_cache_free(rxrpc_call_jar, candidate);
	_leave(" = -ECONNRESET [old]");
	return ERR_PTR(-ECONNRESET);
}
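/*
 * A note on channel selection above: an Rx connection multiplexes up to
 * RXRPC_MAXCALLS (four, per the channels[0..3] asserts later in this file)
 * concurrent calls, and the slot a call occupies is carried in the low bits
 * of the wire CID:
 *
 *	channel = ntohl(hdr->cid) & RXRPC_CHANNELMASK;
 *
 * so a duplicate packet for an extant call lands on the same channel and is
 * matched by its callNumber rather than spawning a new call.
 */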

/*
 * find an extant server call
 * - called in process context with IRQs enabled
 */
struct rxrpc_call *rxrpc_find_server_call(struct rxrpc_sock *rx,
					  unsigned long user_call_ID)
{
	struct rxrpc_call *call;
	struct rb_node *p;

	_enter("%p,%lx", rx, user_call_ID);

	/* search the extant calls for one that matches the specified user
	 * ID */
	read_lock(&rx->call_lock);

	p = rx->calls.rb_node;
	while (p) {
		call = rb_entry(p, struct rxrpc_call, sock_node);

		if (user_call_ID < call->user_call_ID)
			p = p->rb_left;
		else if (user_call_ID > call->user_call_ID)
			p = p->rb_right;
		else
			goto found_extant_call;
	}

	read_unlock(&rx->call_lock);
	_leave(" = NULL");
	return NULL;

	/* we found the call in the list immediately */
found_extant_call:
	rxrpc_get_call(call);
	read_unlock(&rx->call_lock);
	_leave(" = %p [%d]", call, atomic_read(&call->usage));
	return call;
}

/*
 * detach a call from a socket and set up for release
 */
void rxrpc_release_call(struct rxrpc_call *call)
{
	struct rxrpc_connection *conn = call->conn;
	struct rxrpc_sock *rx = call->socket;

	_enter("{%d,%d,%d,%d}",
	       call->debug_id, atomic_read(&call->usage),
	       atomic_read(&call->ackr_not_idle),
	       call->rx_first_oos);

	spin_lock_bh(&call->lock);
	if (test_and_set_bit(RXRPC_CALL_RELEASED, &call->flags))
		BUG();
	spin_unlock_bh(&call->lock);

	/* dissociate from the socket
	 * - the socket's ref on the call is passed to the death timer
	 */
	_debug("RELEASE CALL %p (%d CONN %p)", call, call->debug_id, conn);

	write_lock_bh(&rx->call_lock);
	if (!list_empty(&call->accept_link)) {
		_debug("unlinking once-pending call %p { e=%lx f=%lx }",
		       call, call->events, call->flags);
		ASSERT(!test_bit(RXRPC_CALL_HAS_USERID, &call->flags));
		list_del_init(&call->accept_link);
		sk_acceptq_removed(&rx->sk);
	} else if (test_bit(RXRPC_CALL_HAS_USERID, &call->flags)) {
		rb_erase(&call->sock_node, &rx->calls);
		memset(&call->sock_node, 0xdd, sizeof(call->sock_node));
		clear_bit(RXRPC_CALL_HAS_USERID, &call->flags);
	}
	write_unlock_bh(&rx->call_lock);

	/* free up the channel for reuse */
	spin_lock(&conn->trans->client_lock);
	write_lock_bh(&conn->lock);
	write_lock(&call->state_lock);

	if (conn->channels[call->channel] == call)
		conn->channels[call->channel] = NULL;

	if (conn->out_clientflag && conn->bundle) {
		conn->avail_calls++;
		switch (conn->avail_calls) {
		case 1:
			list_move_tail(&conn->bundle_link,
				       &conn->bundle->avail_conns);
			/* fall through */
		case 2 ... RXRPC_MAXCALLS - 1:
			ASSERT(conn->channels[0] == NULL ||
			       conn->channels[1] == NULL ||
			       conn->channels[2] == NULL ||
			       conn->channels[3] == NULL);
			break;
		case RXRPC_MAXCALLS:
			list_move_tail(&conn->bundle_link,
				       &conn->bundle->unused_conns);
			ASSERT(conn->channels[0] == NULL &&
			       conn->channels[1] == NULL &&
			       conn->channels[2] == NULL &&
			       conn->channels[3] == NULL);
			break;
		default:
			printk(KERN_ERR "RxRPC: conn->avail_calls=%d\n",
			       conn->avail_calls);
			BUG();
		}
	}

	spin_unlock(&conn->trans->client_lock);

	if (call->state < RXRPC_CALL_COMPLETE &&
	    call->state != RXRPC_CALL_CLIENT_FINAL_ACK) {
		_debug("+++ ABORTING STATE %d +++\n", call->state);
		call->state = RXRPC_CALL_LOCALLY_ABORTED;
		call->abort_code = RX_CALL_DEAD;
		set_bit(RXRPC_CALL_ABORT, &call->events);
		rxrpc_queue_call(call);
	}
	write_unlock(&call->state_lock);
	write_unlock_bh(&conn->lock);

	/* clean up the Rx queue */
	if (!skb_queue_empty(&call->rx_queue) ||
	    !skb_queue_empty(&call->rx_oos_queue)) {
		struct rxrpc_skb_priv *sp;
		struct sk_buff *skb;

		_debug("purge Rx queues");

		spin_lock_bh(&call->lock);
		while ((skb = skb_dequeue(&call->rx_queue)) ||
		       (skb = skb_dequeue(&call->rx_oos_queue))) {
			sp = rxrpc_skb(skb);
			if (sp->call) {
				ASSERTCMP(sp->call, ==, call);
				rxrpc_put_call(call);
				sp->call = NULL;
			}
			skb->destructor = NULL;
			spin_unlock_bh(&call->lock);

			_debug("- zap %s %%%u #%u",
			       rxrpc_pkts[sp->hdr.type],
			       ntohl(sp->hdr.serial),
			       ntohl(sp->hdr.seq));
			rxrpc_free_skb(skb);
			spin_lock_bh(&call->lock);
		}
		spin_unlock_bh(&call->lock);

		ASSERTCMP(call->state, !=, RXRPC_CALL_COMPLETE);
	}

	del_timer_sync(&call->resend_timer);
	del_timer_sync(&call->ack_timer);
	del_timer_sync(&call->lifetimer);
	call->deadspan.expires = jiffies + rxrpc_dead_call_timeout * HZ;
	add_timer(&call->deadspan);

	_leave("");
}
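/*
 * A note on the handoff above: the socket's reference on the call is not
 * dropped here - it is passed to the deadspan timer, which fires
 * rxrpc_dead_call_timeout seconds later (two by default), marks the call
 * RXRPC_CALL_DEAD in rxrpc_dead_call_expired() below and only then puts the
 * ref.  Presumably the window lets straggling packets for the finished call
 * still match it rather than chase a dangling pointer.
 */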

/*
 * handle a dead call being ready for reaping
 */
static void rxrpc_dead_call_expired(unsigned long _call)
{
	struct rxrpc_call *call = (struct rxrpc_call *) _call;

	_enter("{%d}", call->debug_id);

	write_lock_bh(&call->state_lock);
	call->state = RXRPC_CALL_DEAD;
	write_unlock_bh(&call->state_lock);
	rxrpc_put_call(call);
}

/*
 * mark a call as due to be released, aborting it if it's still in progress
 * - called with softirqs disabled
 */
static void rxrpc_mark_call_released(struct rxrpc_call *call)
{
	bool sched;

	write_lock(&call->state_lock);
	if (call->state < RXRPC_CALL_DEAD) {
		sched = false;
		if (call->state < RXRPC_CALL_COMPLETE) {
			_debug("abort call %p", call);
			call->state = RXRPC_CALL_LOCALLY_ABORTED;
			call->abort_code = RX_CALL_DEAD;
			if (!test_and_set_bit(RXRPC_CALL_ABORT, &call->events))
				sched = true;
		}
		if (!test_and_set_bit(RXRPC_CALL_RELEASE, &call->events))
			sched = true;
		if (sched)
			rxrpc_queue_call(call);
	}
	write_unlock(&call->state_lock);
}
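/*
 * The test_and_set_bit() idiom above recurs throughout this file: it returns
 * the bit's previous value, so only the thread that actually raised a new
 * event queues the work item, and a call is never queued twice for the same
 * pending event.  In sketch form:
 *
 *	if (!test_and_set_bit(RXRPC_CALL_ABORT, &call->events))
 *		rxrpc_queue_call(call);		(first setter queues it)
 */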

/*
 * release all the calls associated with a socket
 */
void rxrpc_release_calls_on_socket(struct rxrpc_sock *rx)
{
	struct rxrpc_call *call;
	struct rb_node *p;

	_enter("%p", rx);

	read_lock_bh(&rx->call_lock);

	/* mark all the calls as no longer wanting incoming packets */
	for (p = rb_first(&rx->calls); p; p = rb_next(p)) {
		call = rb_entry(p, struct rxrpc_call, sock_node);
		rxrpc_mark_call_released(call);
	}

	/* kill the not-yet-accepted incoming calls */
	list_for_each_entry(call, &rx->secureq, accept_link) {
		rxrpc_mark_call_released(call);
	}

	list_for_each_entry(call, &rx->acceptq, accept_link) {
		rxrpc_mark_call_released(call);
	}

	read_unlock_bh(&rx->call_lock);
	_leave("");
}

/*
 * release a call
 */
void __rxrpc_put_call(struct rxrpc_call *call)
{
	ASSERT(call != NULL);

	_enter("%p{u=%d}", call, atomic_read(&call->usage));

	ASSERTCMP(atomic_read(&call->usage), >, 0);

	if (atomic_dec_and_test(&call->usage)) {
		_debug("call %d dead", call->debug_id);
		ASSERTCMP(call->state, ==, RXRPC_CALL_DEAD);
		rxrpc_queue_work(&call->destroyer);
	}
	_leave("");
}
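/*
 * Lifecycle sketch for the refcount managed above: rxrpc_alloc_call() starts
 * usage at 1; rxrpc_get_call() takes extra refs; rxrpc_release_call() hands
 * the socket's ref to the deadspan timer; and the final put must see
 * RXRPC_CALL_DEAD.  Destruction is punted to a workqueue so the actual
 * teardown - which takes locks and syncs timers - never runs in the
 * softirq/timer context a put may occur in.
 */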

/*
 * clean up a call
 */
static void rxrpc_cleanup_call(struct rxrpc_call *call)
{
	_net("DESTROY CALL %d", call->debug_id);

	ASSERT(call->socket);

	memset(&call->sock_node, 0xcd, sizeof(call->sock_node));

	del_timer_sync(&call->lifetimer);
	del_timer_sync(&call->deadspan);
	del_timer_sync(&call->ack_timer);
	del_timer_sync(&call->resend_timer);

	ASSERT(test_bit(RXRPC_CALL_RELEASED, &call->flags));
	ASSERTCMP(call->events, ==, 0);
	if (work_pending(&call->processor)) {
		_debug("defer destroy");
		rxrpc_queue_work(&call->destroyer);
		return;
	}

	if (call->conn) {
		spin_lock(&call->conn->trans->peer->lock);
		list_del(&call->error_link);
		spin_unlock(&call->conn->trans->peer->lock);

		write_lock_bh(&call->conn->lock);
		rb_erase(&call->conn_node, &call->conn->calls);
		write_unlock_bh(&call->conn->lock);
		rxrpc_put_connection(call->conn);
	}

	if (call->acks_window) {
		_debug("kill Tx window %d",
		       CIRC_CNT(call->acks_head, call->acks_tail,
				call->acks_winsz));
		smp_mb();
		while (CIRC_CNT(call->acks_head, call->acks_tail,
				call->acks_winsz) > 0) {
			struct rxrpc_skb_priv *sp;
			unsigned long _skb;

			_skb = call->acks_window[call->acks_tail] & ~1;
			sp = rxrpc_skb((struct sk_buff *) _skb);
			_debug("+++ clear Tx %u", ntohl(sp->hdr.seq));
			rxrpc_free_skb((struct sk_buff *) _skb);
			call->acks_tail =
				(call->acks_tail + 1) & (call->acks_winsz - 1);
		}

		kfree(call->acks_window);
	}

	rxrpc_free_skb(call->tx_pending);

	rxrpc_purge_queue(&call->rx_queue);
	ASSERT(skb_queue_empty(&call->rx_oos_queue));
	sock_put(&call->socket->sk);
	kmem_cache_free(rxrpc_call_jar, call);
}
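/*
 * The Tx window drained above is a power-of-two circular buffer (acks_winsz
 * is 16 at allocation), so CIRC_CNT() from <linux/circ_buf.h> gives the
 * occupancy and advancing an index is a masked increment:
 *
 *	CIRC_CNT(head, tail, size) == (head - tail) & (size - 1)
 *	tail = (tail + 1) & (winsz - 1);
 *
 * Each slot holds an sk_buff pointer with the bottom bit borrowed elsewhere
 * as a flag, hence the "& ~1" before the value is treated as a pointer.
 */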

/*
 * destroy a call
 */
static void rxrpc_destroy_call(struct work_struct *work)
{
	struct rxrpc_call *call =
		container_of(work, struct rxrpc_call, destroyer);

	_enter("%p{%d,%d,%p}",
	       call, atomic_read(&call->usage), call->channel, call->conn);

	ASSERTCMP(call->state, ==, RXRPC_CALL_DEAD);

	write_lock_bh(&rxrpc_call_lock);
	list_del_init(&call->link);
	write_unlock_bh(&rxrpc_call_lock);

	rxrpc_cleanup_call(call);
	_leave("");
}

/*
 * preemptively destroy all outstanding call records at module unload rather
 * than waiting for them to time out
 */
void __exit rxrpc_destroy_all_calls(void)
{
	struct rxrpc_call *call;

	_enter("");
	write_lock_bh(&rxrpc_call_lock);

	while (!list_empty(&rxrpc_calls)) {
		call = list_entry(rxrpc_calls.next, struct rxrpc_call, link);
		_debug("Zapping call %p", call);

		list_del_init(&call->link);

		switch (atomic_read(&call->usage)) {
		case 0:
			ASSERTCMP(call->state, ==, RXRPC_CALL_DEAD);
			break;
		case 1:
			if (del_timer_sync(&call->deadspan) != 0 &&
			    call->state != RXRPC_CALL_DEAD)
				rxrpc_dead_call_expired((unsigned long) call);
			if (call->state != RXRPC_CALL_DEAD)
				break;
			/* fall through */
		default:
			printk(KERN_ERR "RXRPC:"
			       " Call %p still in use (%d,%d,%s,%lx,%lx)!\n",
			       call, atomic_read(&call->usage),
			       atomic_read(&call->ackr_not_idle),
			       rxrpc_call_states[call->state],
			       call->flags, call->events);
			if (!skb_queue_empty(&call->rx_queue))
				printk(KERN_ERR "RXRPC: Rx queue occupied\n");
			if (!skb_queue_empty(&call->rx_oos_queue))
				printk(KERN_ERR "RXRPC: OOS queue occupied\n");
			break;
		}

		write_unlock_bh(&rxrpc_call_lock);
		cond_resched();
		write_lock_bh(&rxrpc_call_lock);
	}

	write_unlock_bh(&rxrpc_call_lock);
	_leave("");
}
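/*
 * A note on the unlock/cond_resched()/relock dance above: rxrpc_call_lock is
 * a spinning rwlock taken with bottom halves disabled, so the loop must drop
 * it before it may reschedule - sleeping with a spinlock held (or with BHs
 * off) would be a bug.  Dropping it each iteration also bounds the lock hold
 * time when many calls are being reaped at module unload.
 */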

/*
 * handle call lifetime being exceeded
 */
static void rxrpc_call_life_expired(unsigned long _call)
{
	struct rxrpc_call *call = (struct rxrpc_call *) _call;

	if (call->state >= RXRPC_CALL_COMPLETE)
		return;

	_enter("{%d}", call->debug_id);
	read_lock_bh(&call->state_lock);
	if (call->state < RXRPC_CALL_COMPLETE) {
		set_bit(RXRPC_CALL_LIFE_TIMER, &call->events);
		rxrpc_queue_call(call);
	}
	read_unlock_bh(&call->state_lock);
}

/*
 * handle resend timer expiry
 */
static void rxrpc_resend_time_expired(unsigned long _call)
{
	struct rxrpc_call *call = (struct rxrpc_call *) _call;

	_enter("{%d}", call->debug_id);

	if (call->state >= RXRPC_CALL_COMPLETE)
		return;

	read_lock_bh(&call->state_lock);
	clear_bit(RXRPC_CALL_RUN_RTIMER, &call->flags);
	if (call->state < RXRPC_CALL_COMPLETE &&
	    !test_and_set_bit(RXRPC_CALL_RESEND_TIMER, &call->events))
		rxrpc_queue_call(call);
	read_unlock_bh(&call->state_lock);
}

/*
 * handle ACK timer expiry
 */
static void rxrpc_ack_time_expired(unsigned long _call)
{
	struct rxrpc_call *call = (struct rxrpc_call *) _call;

	_enter("{%d}", call->debug_id);

	if (call->state >= RXRPC_CALL_COMPLETE)
		return;

	read_lock_bh(&call->state_lock);
	if (call->state < RXRPC_CALL_COMPLETE &&
	    !test_and_set_bit(RXRPC_CALL_ACK, &call->events))
		rxrpc_queue_call(call);
	read_unlock_bh(&call->state_lock);
}
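/*
 * All three expiry handlers above share one shape because they run in timer
 * (softirq) context: recheck the state under state_lock, set an event bit,
 * and kick call->processor via rxrpc_queue_call() - the actual aborting,
 * resending or ACK generation happens later in rxrpc_process_call() in
 * process context.  The unlocked early return is an optimisation only; the
 * locked recheck is what actually guards against racing completion.
 */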