1/* RxRPC recvmsg() implementation
2 *
3 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
4 * Written by David Howells (dhowells@redhat.com)
5 *
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License
8 * as published by the Free Software Foundation; either version
9 * 2 of the License, or (at your option) any later version.
10 */
11
12#include <linux/net.h>
13#include <linux/skbuff.h>
14#include <net/sock.h>
15#include <net/af_rxrpc.h>
16#include "ar-internal.h"
17
18/*
19 * removal a call's user ID from the socket tree to make the user ID available
20 * again and so that it won't be seen again in association with that call
21 */
22void rxrpc_remove_user_ID(struct rxrpc_sock *rx, struct rxrpc_call *call)
23{
24	_debug("RELEASE CALL %d", call->debug_id);
25
26	if (test_bit(RXRPC_CALL_HAS_USERID, &call->flags)) {
27		write_lock_bh(&rx->call_lock);
28		rb_erase(&call->sock_node, &call->socket->calls);
29		clear_bit(RXRPC_CALL_HAS_USERID, &call->flags);
30		write_unlock_bh(&rx->call_lock);
31	}
32
33	read_lock_bh(&call->state_lock);
34	if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) &&
35	    !test_and_set_bit(RXRPC_CALL_RELEASE, &call->events))
36		rxrpc_queue_call(call);
37	read_unlock_bh(&call->state_lock);
38}
39
40/*
41 * receive a message from an RxRPC socket
42 * - we need to be careful about two or more threads calling recvmsg
43 *   simultaneously
44 */
45int rxrpc_recvmsg(struct kiocb *iocb, struct socket *sock,
46		  struct msghdr *msg, size_t len, int flags)
47{
48	struct rxrpc_skb_priv *sp;
49	struct rxrpc_call *call = NULL, *continue_call = NULL;
50	struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
51	struct sk_buff *skb;
52	long timeo;
53	int copy, ret, ullen, offset, copied = 0;
54	u32 abort_code;
55
56	DEFINE_WAIT(wait);
57
58	_enter(",,,%zu,%d", len, flags);
59
60	if (flags & (MSG_OOB | MSG_TRUNC))
61		return -EOPNOTSUPP;
62
63	ullen = msg->msg_flags & MSG_CMSG_COMPAT ? 4 : sizeof(unsigned long);
64
65	timeo = sock_rcvtimeo(&rx->sk, flags & MSG_DONTWAIT);
66	msg->msg_flags |= MSG_MORE;
67
68	lock_sock(&rx->sk);
69
70	for (;;) {
71		/* return immediately if a client socket has no outstanding
72		 * calls */
73		if (RB_EMPTY_ROOT(&rx->calls)) {
74			if (copied)
75				goto out;
76			if (rx->sk.sk_state != RXRPC_SERVER_LISTENING) {
77				release_sock(&rx->sk);
78				if (continue_call)
79					rxrpc_put_call(continue_call);
80				return -ENODATA;
81			}
82		}
83
84		/* get the next message on the Rx queue */
85		skb = skb_peek(&rx->sk.sk_receive_queue);
86		if (!skb) {
87			/* nothing remains on the queue */
88			if (copied &&
89			    (msg->msg_flags & MSG_PEEK || timeo == 0))
90				goto out;
91
92			/* wait for a message to turn up */
93			release_sock(&rx->sk);
94			prepare_to_wait_exclusive(rx->sk.sk_sleep, &wait,
95						  TASK_INTERRUPTIBLE);
96			ret = sock_error(&rx->sk);
97			if (ret)
98				goto wait_error;
99
100			if (skb_queue_empty(&rx->sk.sk_receive_queue)) {
101				if (signal_pending(current))
102					goto wait_interrupted;
103				timeo = schedule_timeout(timeo);
104			}
105			finish_wait(rx->sk.sk_sleep, &wait);
106			lock_sock(&rx->sk);
107			continue;
108		}
109
110	peek_next_packet:
111		sp = rxrpc_skb(skb);
112		call = sp->call;
113		ASSERT(call != NULL);
114
115		_debug("next pkt %s", rxrpc_pkts[sp->hdr.type]);
116
117		/* make sure we wait for the state to be updated in this call */
118		spin_lock_bh(&call->lock);
119		spin_unlock_bh(&call->lock);
120
121		if (test_bit(RXRPC_CALL_RELEASED, &call->flags)) {
122			_debug("packet from released call");
123			if (skb_dequeue(&rx->sk.sk_receive_queue) != skb)
124				BUG();
125			rxrpc_free_skb(skb);
126			continue;
127		}
128
129		/* determine whether to continue last data receive */
130		if (continue_call) {
131			_debug("maybe cont");
132			if (call != continue_call ||
133			    skb->mark != RXRPC_SKB_MARK_DATA) {
134				release_sock(&rx->sk);
135				rxrpc_put_call(continue_call);
136				_leave(" = %d [noncont]", copied);
137				return copied;
138			}
139		}
140
141		rxrpc_get_call(call);
142
143		/* copy the peer address and timestamp */
144		if (!continue_call) {
145			if (msg->msg_name && msg->msg_namelen > 0)
146				memcpy(&msg->msg_name, &call->conn->trans->peer->srx,
147				       sizeof(call->conn->trans->peer->srx));
148			sock_recv_timestamp(msg, &rx->sk, skb);
149		}
150
151		/* receive the message */
152		if (skb->mark != RXRPC_SKB_MARK_DATA)
153			goto receive_non_data_message;
154
155		_debug("recvmsg DATA #%u { %d, %d }",
156		       ntohl(sp->hdr.seq), skb->len, sp->offset);
157
158		if (!continue_call) {
159			/* only set the control data once per recvmsg() */
160			ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
161				       ullen, &call->user_call_ID);
162			if (ret < 0)
163				goto copy_error;
164			ASSERT(test_bit(RXRPC_CALL_HAS_USERID, &call->flags));
165		}
166
167		ASSERTCMP(ntohl(sp->hdr.seq), >=, call->rx_data_recv);
168		ASSERTCMP(ntohl(sp->hdr.seq), <=, call->rx_data_recv + 1);
169		call->rx_data_recv = ntohl(sp->hdr.seq);
170
171		ASSERTCMP(ntohl(sp->hdr.seq), >, call->rx_data_eaten);
172
173		offset = sp->offset;
174		copy = skb->len - offset;
175		if (copy > len - copied)
176			copy = len - copied;
177
178		if (skb->ip_summed == CHECKSUM_UNNECESSARY) {
179			ret = skb_copy_datagram_iovec(skb, offset,
180						      msg->msg_iov, copy);
181		} else {
182			ret = skb_copy_and_csum_datagram_iovec(skb, offset,
183							       msg->msg_iov);
184			if (ret == -EINVAL)
185				goto csum_copy_error;
186		}
187
188		if (ret < 0)
189			goto copy_error;
190
191		/* handle piecemeal consumption of data packets */
192		_debug("copied %d+%d", copy, copied);
193
194		offset += copy;
195		copied += copy;
196
197		if (!(flags & MSG_PEEK))
198			sp->offset = offset;
199
200		if (sp->offset < skb->len) {
201			_debug("buffer full");
202			ASSERTCMP(copied, ==, len);
203			break;
204		}
205
206		/* we transferred the whole data packet */
207		if (sp->hdr.flags & RXRPC_LAST_PACKET) {
208			_debug("last");
209			if (call->conn->out_clientflag) {
210				 /* last byte of reply received */
211				ret = copied;
212				goto terminal_message;
213			}
214
215			/* last bit of request received */
216			if (!(flags & MSG_PEEK)) {
217				_debug("eat packet");
218				if (skb_dequeue(&rx->sk.sk_receive_queue) !=
219				    skb)
220					BUG();
221				rxrpc_free_skb(skb);
222			}
223			msg->msg_flags &= ~MSG_MORE;
224			break;
225		}
226
227		/* move on to the next data message */
228		_debug("next");
229		if (!continue_call)
230			continue_call = sp->call;
231		else
232			rxrpc_put_call(call);
233		call = NULL;
234
235		if (flags & MSG_PEEK) {
236			_debug("peek next");
237			skb = skb->next;
238			if (skb == (struct sk_buff *) &rx->sk.sk_receive_queue)
239				break;
240			goto peek_next_packet;
241		}
242
243		_debug("eat packet");
244		if (skb_dequeue(&rx->sk.sk_receive_queue) != skb)
245			BUG();
246		rxrpc_free_skb(skb);
247	}
248
249	/* end of non-terminal data packet reception for the moment */
250	_debug("end rcv data");
251out:
252	release_sock(&rx->sk);
253	if (call)
254		rxrpc_put_call(call);
255	if (continue_call)
256		rxrpc_put_call(continue_call);
257	_leave(" = %d [data]", copied);
258	return copied;
259
260	/* handle non-DATA messages such as aborts, incoming connections and
261	 * final ACKs */
262receive_non_data_message:
263	_debug("non-data");
264
265	if (skb->mark == RXRPC_SKB_MARK_NEW_CALL) {
266		_debug("RECV NEW CALL");
267		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NEW_CALL, 0, &abort_code);
268		if (ret < 0)
269			goto copy_error;
270		if (!(flags & MSG_PEEK)) {
271			if (skb_dequeue(&rx->sk.sk_receive_queue) != skb)
272				BUG();
273			rxrpc_free_skb(skb);
274		}
275		goto out;
276	}
277
278	ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
279		       ullen, &call->user_call_ID);
280	if (ret < 0)
281		goto copy_error;
282	ASSERT(test_bit(RXRPC_CALL_HAS_USERID, &call->flags));
283
284	switch (skb->mark) {
285	case RXRPC_SKB_MARK_DATA:
286		BUG();
287	case RXRPC_SKB_MARK_FINAL_ACK:
288		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ACK, 0, &abort_code);
289		break;
290	case RXRPC_SKB_MARK_BUSY:
291		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_BUSY, 0, &abort_code);
292		break;
293	case RXRPC_SKB_MARK_REMOTE_ABORT:
294		abort_code = call->abort_code;
295		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &abort_code);
296		break;
297	case RXRPC_SKB_MARK_NET_ERROR:
298		_debug("RECV NET ERROR %d", sp->error);
299		abort_code = sp->error;
300		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NET_ERROR, 4, &abort_code);
301		break;
302	case RXRPC_SKB_MARK_LOCAL_ERROR:
303		_debug("RECV LOCAL ERROR %d", sp->error);
304		abort_code = sp->error;
305		ret = put_cmsg(msg, SOL_RXRPC, RXRPC_LOCAL_ERROR, 4,
306			       &abort_code);
307		break;
308	default:
309		BUG();
310		break;
311	}
312
313	if (ret < 0)
314		goto copy_error;
315
316terminal_message:
317	_debug("terminal");
318	msg->msg_flags &= ~MSG_MORE;
319	msg->msg_flags |= MSG_EOR;
320
321	if (!(flags & MSG_PEEK)) {
322		_net("free terminal skb %p", skb);
323		if (skb_dequeue(&rx->sk.sk_receive_queue) != skb)
324			BUG();
325		rxrpc_free_skb(skb);
326		rxrpc_remove_user_ID(rx, call);
327	}
328
329	release_sock(&rx->sk);
330	rxrpc_put_call(call);
331	if (continue_call)
332		rxrpc_put_call(continue_call);
333	_leave(" = %d", ret);
334	return ret;
335
336copy_error:
337	_debug("copy error");
338	release_sock(&rx->sk);
339	rxrpc_put_call(call);
340	if (continue_call)
341		rxrpc_put_call(continue_call);
342	_leave(" = %d", ret);
343	return ret;
344
345csum_copy_error:
346	_debug("csum error");
347	release_sock(&rx->sk);
348	if (continue_call)
349		rxrpc_put_call(continue_call);
350	rxrpc_kill_skb(skb);
351	skb_kill_datagram(&rx->sk, skb, flags);
352	rxrpc_put_call(call);
353	return -EAGAIN;
354
355wait_interrupted:
356	ret = sock_intr_errno(timeo);
357wait_error:
358	finish_wait(rx->sk.sk_sleep, &wait);
359	if (continue_call)
360		rxrpc_put_call(continue_call);
361	if (copied)
362		copied = ret;
363	_leave(" = %d [waitfail %d]", copied, ret);
364	return copied;
365
366}
367
368/**
369 * rxrpc_kernel_data_delivered - Record delivery of data message
370 * @skb: Message holding data
371 *
372 * Record the delivery of a data message.  This permits RxRPC to keep its
373 * tracking correct.  The socket buffer will be deleted.
374 */
375void rxrpc_kernel_data_delivered(struct sk_buff *skb)
376{
377	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
378	struct rxrpc_call *call = sp->call;
379
380	ASSERTCMP(ntohl(sp->hdr.seq), >=, call->rx_data_recv);
381	ASSERTCMP(ntohl(sp->hdr.seq), <=, call->rx_data_recv + 1);
382	call->rx_data_recv = ntohl(sp->hdr.seq);
383
384	ASSERTCMP(ntohl(sp->hdr.seq), >, call->rx_data_eaten);
385	rxrpc_free_skb(skb);
386}
387
388EXPORT_SYMBOL(rxrpc_kernel_data_delivered);
389
390/**
391 * rxrpc_kernel_is_data_last - Determine if data message is last one
392 * @skb: Message holding data
393 *
394 * Determine if data message is last one for the parent call.
395 */
396bool rxrpc_kernel_is_data_last(struct sk_buff *skb)
397{
398	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
399
400	ASSERTCMP(skb->mark, ==, RXRPC_SKB_MARK_DATA);
401
402	return sp->hdr.flags & RXRPC_LAST_PACKET;
403}
404
405EXPORT_SYMBOL(rxrpc_kernel_is_data_last);
406
407/**
408 * rxrpc_kernel_get_abort_code - Get the abort code from an RxRPC abort message
409 * @skb: Message indicating an abort
410 *
411 * Get the abort code from an RxRPC abort message.
412 */
413u32 rxrpc_kernel_get_abort_code(struct sk_buff *skb)
414{
415	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
416
417	ASSERTCMP(skb->mark, ==, RXRPC_SKB_MARK_REMOTE_ABORT);
418
419	return sp->call->abort_code;
420}
421
422EXPORT_SYMBOL(rxrpc_kernel_get_abort_code);
423
424/**
425 * rxrpc_kernel_get_error - Get the error number from an RxRPC error message
426 * @skb: Message indicating an error
427 *
428 * Get the error number from an RxRPC error message.
429 */
430int rxrpc_kernel_get_error_number(struct sk_buff *skb)
431{
432	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
433
434	return sp->error;
435}
436
437EXPORT_SYMBOL(rxrpc_kernel_get_error_number);
438