1/* RxRPC recvmsg() implementation 2 * 3 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved. 4 * Written by David Howells (dhowells@redhat.com) 5 * 6 * This program is free software; you can redistribute it and/or 7 * modify it under the terms of the GNU General Public License 8 * as published by the Free Software Foundation; either version 9 * 2 of the License, or (at your option) any later version. 10 */ 11 12#include <linux/net.h> 13#include <linux/skbuff.h> 14#include <net/sock.h> 15#include <net/af_rxrpc.h> 16#include "ar-internal.h" 17 18/* 19 * removal a call's user ID from the socket tree to make the user ID available 20 * again and so that it won't be seen again in association with that call 21 */ 22void rxrpc_remove_user_ID(struct rxrpc_sock *rx, struct rxrpc_call *call) 23{ 24 _debug("RELEASE CALL %d", call->debug_id); 25 26 if (test_bit(RXRPC_CALL_HAS_USERID, &call->flags)) { 27 write_lock_bh(&rx->call_lock); 28 rb_erase(&call->sock_node, &call->socket->calls); 29 clear_bit(RXRPC_CALL_HAS_USERID, &call->flags); 30 write_unlock_bh(&rx->call_lock); 31 } 32 33 read_lock_bh(&call->state_lock); 34 if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) && 35 !test_and_set_bit(RXRPC_CALL_RELEASE, &call->events)) 36 rxrpc_queue_call(call); 37 read_unlock_bh(&call->state_lock); 38} 39 40/* 41 * receive a message from an RxRPC socket 42 * - we need to be careful about two or more threads calling recvmsg 43 * simultaneously 44 */ 45int rxrpc_recvmsg(struct kiocb *iocb, struct socket *sock, 46 struct msghdr *msg, size_t len, int flags) 47{ 48 struct rxrpc_skb_priv *sp; 49 struct rxrpc_call *call = NULL, *continue_call = NULL; 50 struct rxrpc_sock *rx = rxrpc_sk(sock->sk); 51 struct sk_buff *skb; 52 long timeo; 53 int copy, ret, ullen, offset, copied = 0; 54 u32 abort_code; 55 56 DEFINE_WAIT(wait); 57 58 _enter(",,,%zu,%d", len, flags); 59 60 if (flags & (MSG_OOB | MSG_TRUNC)) 61 return -EOPNOTSUPP; 62 63 ullen = msg->msg_flags & MSG_CMSG_COMPAT ? 4 : sizeof(unsigned long); 64 65 timeo = sock_rcvtimeo(&rx->sk, flags & MSG_DONTWAIT); 66 msg->msg_flags |= MSG_MORE; 67 68 lock_sock(&rx->sk); 69 70 for (;;) { 71 /* return immediately if a client socket has no outstanding 72 * calls */ 73 if (RB_EMPTY_ROOT(&rx->calls)) { 74 if (copied) 75 goto out; 76 if (rx->sk.sk_state != RXRPC_SERVER_LISTENING) { 77 release_sock(&rx->sk); 78 if (continue_call) 79 rxrpc_put_call(continue_call); 80 return -ENODATA; 81 } 82 } 83 84 /* get the next message on the Rx queue */ 85 skb = skb_peek(&rx->sk.sk_receive_queue); 86 if (!skb) { 87 /* nothing remains on the queue */ 88 if (copied && 89 (msg->msg_flags & MSG_PEEK || timeo == 0)) 90 goto out; 91 92 /* wait for a message to turn up */ 93 release_sock(&rx->sk); 94 prepare_to_wait_exclusive(sk_sleep(&rx->sk), &wait, 95 TASK_INTERRUPTIBLE); 96 ret = sock_error(&rx->sk); 97 if (ret) 98 goto wait_error; 99 100 if (skb_queue_empty(&rx->sk.sk_receive_queue)) { 101 if (signal_pending(current)) 102 goto wait_interrupted; 103 timeo = schedule_timeout(timeo); 104 } 105 finish_wait(sk_sleep(&rx->sk), &wait); 106 lock_sock(&rx->sk); 107 continue; 108 } 109 110 peek_next_packet: 111 sp = rxrpc_skb(skb); 112 call = sp->call; 113 ASSERT(call != NULL); 114 115 _debug("next pkt %s", rxrpc_pkts[sp->hdr.type]); 116 117 /* make sure we wait for the state to be updated in this call */ 118 spin_lock_bh(&call->lock); 119 spin_unlock_bh(&call->lock); 120 121 if (test_bit(RXRPC_CALL_RELEASED, &call->flags)) { 122 _debug("packet from released call"); 123 if (skb_dequeue(&rx->sk.sk_receive_queue) != skb) 124 BUG(); 125 rxrpc_free_skb(skb); 126 continue; 127 } 128 129 /* determine whether to continue last data receive */ 130 if (continue_call) { 131 _debug("maybe cont"); 132 if (call != continue_call || 133 skb->mark != RXRPC_SKB_MARK_DATA) { 134 release_sock(&rx->sk); 135 rxrpc_put_call(continue_call); 136 _leave(" = %d [noncont]", copied); 137 return copied; 138 } 139 } 140 141 rxrpc_get_call(call); 142 143 /* copy the peer address and timestamp */ 144 if (!continue_call) { 145 if (msg->msg_name && msg->msg_namelen > 0) 146 memcpy(msg->msg_name, 147 &call->conn->trans->peer->srx, 148 sizeof(call->conn->trans->peer->srx)); 149 sock_recv_ts_and_drops(msg, &rx->sk, skb); 150 } 151 152 /* receive the message */ 153 if (skb->mark != RXRPC_SKB_MARK_DATA) 154 goto receive_non_data_message; 155 156 _debug("recvmsg DATA #%u { %d, %d }", 157 ntohl(sp->hdr.seq), skb->len, sp->offset); 158 159 if (!continue_call) { 160 /* only set the control data once per recvmsg() */ 161 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID, 162 ullen, &call->user_call_ID); 163 if (ret < 0) 164 goto copy_error; 165 ASSERT(test_bit(RXRPC_CALL_HAS_USERID, &call->flags)); 166 } 167 168 ASSERTCMP(ntohl(sp->hdr.seq), >=, call->rx_data_recv); 169 ASSERTCMP(ntohl(sp->hdr.seq), <=, call->rx_data_recv + 1); 170 call->rx_data_recv = ntohl(sp->hdr.seq); 171 172 ASSERTCMP(ntohl(sp->hdr.seq), >, call->rx_data_eaten); 173 174 offset = sp->offset; 175 copy = skb->len - offset; 176 if (copy > len - copied) 177 copy = len - copied; 178 179 if (skb->ip_summed == CHECKSUM_UNNECESSARY) { 180 ret = skb_copy_datagram_iovec(skb, offset, 181 msg->msg_iov, copy); 182 } else { 183 ret = skb_copy_and_csum_datagram_iovec(skb, offset, 184 msg->msg_iov); 185 if (ret == -EINVAL) 186 goto csum_copy_error; 187 } 188 189 if (ret < 0) 190 goto copy_error; 191 192 /* handle piecemeal consumption of data packets */ 193 _debug("copied %d+%d", copy, copied); 194 195 offset += copy; 196 copied += copy; 197 198 if (!(flags & MSG_PEEK)) 199 sp->offset = offset; 200 201 if (sp->offset < skb->len) { 202 _debug("buffer full"); 203 ASSERTCMP(copied, ==, len); 204 break; 205 } 206 207 /* we transferred the whole data packet */ 208 if (sp->hdr.flags & RXRPC_LAST_PACKET) { 209 _debug("last"); 210 if (call->conn->out_clientflag) { 211 /* last byte of reply received */ 212 ret = copied; 213 goto terminal_message; 214 } 215 216 /* last bit of request received */ 217 if (!(flags & MSG_PEEK)) { 218 _debug("eat packet"); 219 if (skb_dequeue(&rx->sk.sk_receive_queue) != 220 skb) 221 BUG(); 222 rxrpc_free_skb(skb); 223 } 224 msg->msg_flags &= ~MSG_MORE; 225 break; 226 } 227 228 /* move on to the next data message */ 229 _debug("next"); 230 if (!continue_call) 231 continue_call = sp->call; 232 else 233 rxrpc_put_call(call); 234 call = NULL; 235 236 if (flags & MSG_PEEK) { 237 _debug("peek next"); 238 skb = skb->next; 239 if (skb == (struct sk_buff *) &rx->sk.sk_receive_queue) 240 break; 241 goto peek_next_packet; 242 } 243 244 _debug("eat packet"); 245 if (skb_dequeue(&rx->sk.sk_receive_queue) != skb) 246 BUG(); 247 rxrpc_free_skb(skb); 248 } 249 250 /* end of non-terminal data packet reception for the moment */ 251 _debug("end rcv data"); 252out: 253 release_sock(&rx->sk); 254 if (call) 255 rxrpc_put_call(call); 256 if (continue_call) 257 rxrpc_put_call(continue_call); 258 _leave(" = %d [data]", copied); 259 return copied; 260 261 /* handle non-DATA messages such as aborts, incoming connections and 262 * final ACKs */ 263receive_non_data_message: 264 _debug("non-data"); 265 266 if (skb->mark == RXRPC_SKB_MARK_NEW_CALL) { 267 _debug("RECV NEW CALL"); 268 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NEW_CALL, 0, &abort_code); 269 if (ret < 0) 270 goto copy_error; 271 if (!(flags & MSG_PEEK)) { 272 if (skb_dequeue(&rx->sk.sk_receive_queue) != skb) 273 BUG(); 274 rxrpc_free_skb(skb); 275 } 276 goto out; 277 } 278 279 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID, 280 ullen, &call->user_call_ID); 281 if (ret < 0) 282 goto copy_error; 283 ASSERT(test_bit(RXRPC_CALL_HAS_USERID, &call->flags)); 284 285 switch (skb->mark) { 286 case RXRPC_SKB_MARK_DATA: 287 BUG(); 288 case RXRPC_SKB_MARK_FINAL_ACK: 289 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ACK, 0, &abort_code); 290 break; 291 case RXRPC_SKB_MARK_BUSY: 292 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_BUSY, 0, &abort_code); 293 break; 294 case RXRPC_SKB_MARK_REMOTE_ABORT: 295 abort_code = call->abort_code; 296 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &abort_code); 297 break; 298 case RXRPC_SKB_MARK_NET_ERROR: 299 _debug("RECV NET ERROR %d", sp->error); 300 abort_code = sp->error; 301 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NET_ERROR, 4, &abort_code); 302 break; 303 case RXRPC_SKB_MARK_LOCAL_ERROR: 304 _debug("RECV LOCAL ERROR %d", sp->error); 305 abort_code = sp->error; 306 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_LOCAL_ERROR, 4, 307 &abort_code); 308 break; 309 default: 310 BUG(); 311 break; 312 } 313 314 if (ret < 0) 315 goto copy_error; 316 317terminal_message: 318 _debug("terminal"); 319 msg->msg_flags &= ~MSG_MORE; 320 msg->msg_flags |= MSG_EOR; 321 322 if (!(flags & MSG_PEEK)) { 323 _net("free terminal skb %p", skb); 324 if (skb_dequeue(&rx->sk.sk_receive_queue) != skb) 325 BUG(); 326 rxrpc_free_skb(skb); 327 rxrpc_remove_user_ID(rx, call); 328 } 329 330 release_sock(&rx->sk); 331 rxrpc_put_call(call); 332 if (continue_call) 333 rxrpc_put_call(continue_call); 334 _leave(" = %d", ret); 335 return ret; 336 337copy_error: 338 _debug("copy error"); 339 release_sock(&rx->sk); 340 rxrpc_put_call(call); 341 if (continue_call) 342 rxrpc_put_call(continue_call); 343 _leave(" = %d", ret); 344 return ret; 345 346csum_copy_error: 347 _debug("csum error"); 348 release_sock(&rx->sk); 349 if (continue_call) 350 rxrpc_put_call(continue_call); 351 rxrpc_kill_skb(skb); 352 skb_kill_datagram(&rx->sk, skb, flags); 353 rxrpc_put_call(call); 354 return -EAGAIN; 355 356wait_interrupted: 357 ret = sock_intr_errno(timeo); 358wait_error: 359 finish_wait(sk_sleep(&rx->sk), &wait); 360 if (continue_call) 361 rxrpc_put_call(continue_call); 362 if (copied) 363 copied = ret; 364 _leave(" = %d [waitfail %d]", copied, ret); 365 return copied; 366 367} 368 369/** 370 * rxrpc_kernel_data_delivered - Record delivery of data message 371 * @skb: Message holding data 372 * 373 * Record the delivery of a data message. This permits RxRPC to keep its 374 * tracking correct. The socket buffer will be deleted. 375 */ 376void rxrpc_kernel_data_delivered(struct sk_buff *skb) 377{ 378 struct rxrpc_skb_priv *sp = rxrpc_skb(skb); 379 struct rxrpc_call *call = sp->call; 380 381 ASSERTCMP(ntohl(sp->hdr.seq), >=, call->rx_data_recv); 382 ASSERTCMP(ntohl(sp->hdr.seq), <=, call->rx_data_recv + 1); 383 call->rx_data_recv = ntohl(sp->hdr.seq); 384 385 ASSERTCMP(ntohl(sp->hdr.seq), >, call->rx_data_eaten); 386 rxrpc_free_skb(skb); 387} 388 389EXPORT_SYMBOL(rxrpc_kernel_data_delivered); 390 391/** 392 * rxrpc_kernel_is_data_last - Determine if data message is last one 393 * @skb: Message holding data 394 * 395 * Determine if data message is last one for the parent call. 396 */ 397bool rxrpc_kernel_is_data_last(struct sk_buff *skb) 398{ 399 struct rxrpc_skb_priv *sp = rxrpc_skb(skb); 400 401 ASSERTCMP(skb->mark, ==, RXRPC_SKB_MARK_DATA); 402 403 return sp->hdr.flags & RXRPC_LAST_PACKET; 404} 405 406EXPORT_SYMBOL(rxrpc_kernel_is_data_last); 407 408/** 409 * rxrpc_kernel_get_abort_code - Get the abort code from an RxRPC abort message 410 * @skb: Message indicating an abort 411 * 412 * Get the abort code from an RxRPC abort message. 413 */ 414u32 rxrpc_kernel_get_abort_code(struct sk_buff *skb) 415{ 416 struct rxrpc_skb_priv *sp = rxrpc_skb(skb); 417 418 ASSERTCMP(skb->mark, ==, RXRPC_SKB_MARK_REMOTE_ABORT); 419 420 return sp->call->abort_code; 421} 422 423EXPORT_SYMBOL(rxrpc_kernel_get_abort_code); 424 425/** 426 * rxrpc_kernel_get_error - Get the error number from an RxRPC error message 427 * @skb: Message indicating an error 428 * 429 * Get the error number from an RxRPC error message. 430 */ 431int rxrpc_kernel_get_error_number(struct sk_buff *skb) 432{ 433 struct rxrpc_skb_priv *sp = rxrpc_skb(skb); 434 435 return sp->error; 436} 437 438EXPORT_SYMBOL(rxrpc_kernel_get_error_number); 439