1/* RxRPC recvmsg() implementation 2 * 3 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved. 4 * Written by David Howells (dhowells@redhat.com) 5 * 6 * This program is free software; you can redistribute it and/or 7 * modify it under the terms of the GNU General Public License 8 * as published by the Free Software Foundation; either version 9 * 2 of the License, or (at your option) any later version. 10 */ 11 12#include <linux/net.h> 13#include <linux/skbuff.h> 14#include <net/sock.h> 15#include <net/af_rxrpc.h> 16#include "ar-internal.h" 17 18/* 19 * removal a call's user ID from the socket tree to make the user ID available 20 * again and so that it won't be seen again in association with that call 21 */ 22void rxrpc_remove_user_ID(struct rxrpc_sock *rx, struct rxrpc_call *call) 23{ 24 _debug("RELEASE CALL %d", call->debug_id); 25 26 if (test_bit(RXRPC_CALL_HAS_USERID, &call->flags)) { 27 write_lock_bh(&rx->call_lock); 28 rb_erase(&call->sock_node, &call->socket->calls); 29 clear_bit(RXRPC_CALL_HAS_USERID, &call->flags); 30 write_unlock_bh(&rx->call_lock); 31 } 32 33 read_lock_bh(&call->state_lock); 34 if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) && 35 !test_and_set_bit(RXRPC_CALL_RELEASE, &call->events)) 36 rxrpc_queue_call(call); 37 read_unlock_bh(&call->state_lock); 38} 39 40/* 41 * receive a message from an RxRPC socket 42 * - we need to be careful about two or more threads calling recvmsg 43 * simultaneously 44 */ 45int rxrpc_recvmsg(struct kiocb *iocb, struct socket *sock, 46 struct msghdr *msg, size_t len, int flags) 47{ 48 struct rxrpc_skb_priv *sp; 49 struct rxrpc_call *call = NULL, *continue_call = NULL; 50 struct rxrpc_sock *rx = rxrpc_sk(sock->sk); 51 struct sk_buff *skb; 52 long timeo; 53 int copy, ret, ullen, offset, copied = 0; 54 u32 abort_code; 55 56 DEFINE_WAIT(wait); 57 58 _enter(",,,%zu,%d", len, flags); 59 60 if (flags & (MSG_OOB | MSG_TRUNC)) 61 return -EOPNOTSUPP; 62 63 ullen = msg->msg_flags & MSG_CMSG_COMPAT ? 4 : sizeof(unsigned long); 64 65 timeo = sock_rcvtimeo(&rx->sk, flags & MSG_DONTWAIT); 66 msg->msg_flags |= MSG_MORE; 67 68 lock_sock(&rx->sk); 69 70 for (;;) { 71 /* return immediately if a client socket has no outstanding 72 * calls */ 73 if (RB_EMPTY_ROOT(&rx->calls)) { 74 if (copied) 75 goto out; 76 if (rx->sk.sk_state != RXRPC_SERVER_LISTENING) { 77 release_sock(&rx->sk); 78 if (continue_call) 79 rxrpc_put_call(continue_call); 80 return -ENODATA; 81 } 82 } 83 84 /* get the next message on the Rx queue */ 85 skb = skb_peek(&rx->sk.sk_receive_queue); 86 if (!skb) { 87 /* nothing remains on the queue */ 88 if (copied && 89 (msg->msg_flags & MSG_PEEK || timeo == 0)) 90 goto out; 91 92 /* wait for a message to turn up */ 93 release_sock(&rx->sk); 94 prepare_to_wait_exclusive(rx->sk.sk_sleep, &wait, 95 TASK_INTERRUPTIBLE); 96 ret = sock_error(&rx->sk); 97 if (ret) 98 goto wait_error; 99 100 if (skb_queue_empty(&rx->sk.sk_receive_queue)) { 101 if (signal_pending(current)) 102 goto wait_interrupted; 103 timeo = schedule_timeout(timeo); 104 } 105 finish_wait(rx->sk.sk_sleep, &wait); 106 lock_sock(&rx->sk); 107 continue; 108 } 109 110 peek_next_packet: 111 sp = rxrpc_skb(skb); 112 call = sp->call; 113 ASSERT(call != NULL); 114 115 _debug("next pkt %s", rxrpc_pkts[sp->hdr.type]); 116 117 /* make sure we wait for the state to be updated in this call */ 118 spin_lock_bh(&call->lock); 119 spin_unlock_bh(&call->lock); 120 121 if (test_bit(RXRPC_CALL_RELEASED, &call->flags)) { 122 _debug("packet from released call"); 123 if (skb_dequeue(&rx->sk.sk_receive_queue) != skb) 124 BUG(); 125 rxrpc_free_skb(skb); 126 continue; 127 } 128 129 /* determine whether to continue last data receive */ 130 if (continue_call) { 131 _debug("maybe cont"); 132 if (call != continue_call || 133 skb->mark != RXRPC_SKB_MARK_DATA) { 134 release_sock(&rx->sk); 135 rxrpc_put_call(continue_call); 136 _leave(" = %d [noncont]", copied); 137 return copied; 138 } 139 } 140 141 rxrpc_get_call(call); 142 143 /* copy the peer address and timestamp */ 144 if (!continue_call) { 145 if (msg->msg_name && msg->msg_namelen > 0) 146 memcpy(&msg->msg_name, &call->conn->trans->peer->srx, 147 sizeof(call->conn->trans->peer->srx)); 148 sock_recv_timestamp(msg, &rx->sk, skb); 149 } 150 151 /* receive the message */ 152 if (skb->mark != RXRPC_SKB_MARK_DATA) 153 goto receive_non_data_message; 154 155 _debug("recvmsg DATA #%u { %d, %d }", 156 ntohl(sp->hdr.seq), skb->len, sp->offset); 157 158 if (!continue_call) { 159 /* only set the control data once per recvmsg() */ 160 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID, 161 ullen, &call->user_call_ID); 162 if (ret < 0) 163 goto copy_error; 164 ASSERT(test_bit(RXRPC_CALL_HAS_USERID, &call->flags)); 165 } 166 167 ASSERTCMP(ntohl(sp->hdr.seq), >=, call->rx_data_recv); 168 ASSERTCMP(ntohl(sp->hdr.seq), <=, call->rx_data_recv + 1); 169 call->rx_data_recv = ntohl(sp->hdr.seq); 170 171 ASSERTCMP(ntohl(sp->hdr.seq), >, call->rx_data_eaten); 172 173 offset = sp->offset; 174 copy = skb->len - offset; 175 if (copy > len - copied) 176 copy = len - copied; 177 178 if (skb->ip_summed == CHECKSUM_UNNECESSARY) { 179 ret = skb_copy_datagram_iovec(skb, offset, 180 msg->msg_iov, copy); 181 } else { 182 ret = skb_copy_and_csum_datagram_iovec(skb, offset, 183 msg->msg_iov); 184 if (ret == -EINVAL) 185 goto csum_copy_error; 186 } 187 188 if (ret < 0) 189 goto copy_error; 190 191 /* handle piecemeal consumption of data packets */ 192 _debug("copied %d+%d", copy, copied); 193 194 offset += copy; 195 copied += copy; 196 197 if (!(flags & MSG_PEEK)) 198 sp->offset = offset; 199 200 if (sp->offset < skb->len) { 201 _debug("buffer full"); 202 ASSERTCMP(copied, ==, len); 203 break; 204 } 205 206 /* we transferred the whole data packet */ 207 if (sp->hdr.flags & RXRPC_LAST_PACKET) { 208 _debug("last"); 209 if (call->conn->out_clientflag) { 210 /* last byte of reply received */ 211 ret = copied; 212 goto terminal_message; 213 } 214 215 /* last bit of request received */ 216 if (!(flags & MSG_PEEK)) { 217 _debug("eat packet"); 218 if (skb_dequeue(&rx->sk.sk_receive_queue) != 219 skb) 220 BUG(); 221 rxrpc_free_skb(skb); 222 } 223 msg->msg_flags &= ~MSG_MORE; 224 break; 225 } 226 227 /* move on to the next data message */ 228 _debug("next"); 229 if (!continue_call) 230 continue_call = sp->call; 231 else 232 rxrpc_put_call(call); 233 call = NULL; 234 235 if (flags & MSG_PEEK) { 236 _debug("peek next"); 237 skb = skb->next; 238 if (skb == (struct sk_buff *) &rx->sk.sk_receive_queue) 239 break; 240 goto peek_next_packet; 241 } 242 243 _debug("eat packet"); 244 if (skb_dequeue(&rx->sk.sk_receive_queue) != skb) 245 BUG(); 246 rxrpc_free_skb(skb); 247 } 248 249 /* end of non-terminal data packet reception for the moment */ 250 _debug("end rcv data"); 251out: 252 release_sock(&rx->sk); 253 if (call) 254 rxrpc_put_call(call); 255 if (continue_call) 256 rxrpc_put_call(continue_call); 257 _leave(" = %d [data]", copied); 258 return copied; 259 260 /* handle non-DATA messages such as aborts, incoming connections and 261 * final ACKs */ 262receive_non_data_message: 263 _debug("non-data"); 264 265 if (skb->mark == RXRPC_SKB_MARK_NEW_CALL) { 266 _debug("RECV NEW CALL"); 267 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NEW_CALL, 0, &abort_code); 268 if (ret < 0) 269 goto copy_error; 270 if (!(flags & MSG_PEEK)) { 271 if (skb_dequeue(&rx->sk.sk_receive_queue) != skb) 272 BUG(); 273 rxrpc_free_skb(skb); 274 } 275 goto out; 276 } 277 278 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID, 279 ullen, &call->user_call_ID); 280 if (ret < 0) 281 goto copy_error; 282 ASSERT(test_bit(RXRPC_CALL_HAS_USERID, &call->flags)); 283 284 switch (skb->mark) { 285 case RXRPC_SKB_MARK_DATA: 286 BUG(); 287 case RXRPC_SKB_MARK_FINAL_ACK: 288 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ACK, 0, &abort_code); 289 break; 290 case RXRPC_SKB_MARK_BUSY: 291 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_BUSY, 0, &abort_code); 292 break; 293 case RXRPC_SKB_MARK_REMOTE_ABORT: 294 abort_code = call->abort_code; 295 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &abort_code); 296 break; 297 case RXRPC_SKB_MARK_NET_ERROR: 298 _debug("RECV NET ERROR %d", sp->error); 299 abort_code = sp->error; 300 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NET_ERROR, 4, &abort_code); 301 break; 302 case RXRPC_SKB_MARK_LOCAL_ERROR: 303 _debug("RECV LOCAL ERROR %d", sp->error); 304 abort_code = sp->error; 305 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_LOCAL_ERROR, 4, 306 &abort_code); 307 break; 308 default: 309 BUG(); 310 break; 311 } 312 313 if (ret < 0) 314 goto copy_error; 315 316terminal_message: 317 _debug("terminal"); 318 msg->msg_flags &= ~MSG_MORE; 319 msg->msg_flags |= MSG_EOR; 320 321 if (!(flags & MSG_PEEK)) { 322 _net("free terminal skb %p", skb); 323 if (skb_dequeue(&rx->sk.sk_receive_queue) != skb) 324 BUG(); 325 rxrpc_free_skb(skb); 326 rxrpc_remove_user_ID(rx, call); 327 } 328 329 release_sock(&rx->sk); 330 rxrpc_put_call(call); 331 if (continue_call) 332 rxrpc_put_call(continue_call); 333 _leave(" = %d", ret); 334 return ret; 335 336copy_error: 337 _debug("copy error"); 338 release_sock(&rx->sk); 339 rxrpc_put_call(call); 340 if (continue_call) 341 rxrpc_put_call(continue_call); 342 _leave(" = %d", ret); 343 return ret; 344 345csum_copy_error: 346 _debug("csum error"); 347 release_sock(&rx->sk); 348 if (continue_call) 349 rxrpc_put_call(continue_call); 350 rxrpc_kill_skb(skb); 351 skb_kill_datagram(&rx->sk, skb, flags); 352 rxrpc_put_call(call); 353 return -EAGAIN; 354 355wait_interrupted: 356 ret = sock_intr_errno(timeo); 357wait_error: 358 finish_wait(rx->sk.sk_sleep, &wait); 359 if (continue_call) 360 rxrpc_put_call(continue_call); 361 if (copied) 362 copied = ret; 363 _leave(" = %d [waitfail %d]", copied, ret); 364 return copied; 365 366} 367 368/** 369 * rxrpc_kernel_data_delivered - Record delivery of data message 370 * @skb: Message holding data 371 * 372 * Record the delivery of a data message. This permits RxRPC to keep its 373 * tracking correct. The socket buffer will be deleted. 374 */ 375void rxrpc_kernel_data_delivered(struct sk_buff *skb) 376{ 377 struct rxrpc_skb_priv *sp = rxrpc_skb(skb); 378 struct rxrpc_call *call = sp->call; 379 380 ASSERTCMP(ntohl(sp->hdr.seq), >=, call->rx_data_recv); 381 ASSERTCMP(ntohl(sp->hdr.seq), <=, call->rx_data_recv + 1); 382 call->rx_data_recv = ntohl(sp->hdr.seq); 383 384 ASSERTCMP(ntohl(sp->hdr.seq), >, call->rx_data_eaten); 385 rxrpc_free_skb(skb); 386} 387 388EXPORT_SYMBOL(rxrpc_kernel_data_delivered); 389 390/** 391 * rxrpc_kernel_is_data_last - Determine if data message is last one 392 * @skb: Message holding data 393 * 394 * Determine if data message is last one for the parent call. 395 */ 396bool rxrpc_kernel_is_data_last(struct sk_buff *skb) 397{ 398 struct rxrpc_skb_priv *sp = rxrpc_skb(skb); 399 400 ASSERTCMP(skb->mark, ==, RXRPC_SKB_MARK_DATA); 401 402 return sp->hdr.flags & RXRPC_LAST_PACKET; 403} 404 405EXPORT_SYMBOL(rxrpc_kernel_is_data_last); 406 407/** 408 * rxrpc_kernel_get_abort_code - Get the abort code from an RxRPC abort message 409 * @skb: Message indicating an abort 410 * 411 * Get the abort code from an RxRPC abort message. 412 */ 413u32 rxrpc_kernel_get_abort_code(struct sk_buff *skb) 414{ 415 struct rxrpc_skb_priv *sp = rxrpc_skb(skb); 416 417 ASSERTCMP(skb->mark, ==, RXRPC_SKB_MARK_REMOTE_ABORT); 418 419 return sp->call->abort_code; 420} 421 422EXPORT_SYMBOL(rxrpc_kernel_get_abort_code); 423 424/** 425 * rxrpc_kernel_get_error - Get the error number from an RxRPC error message 426 * @skb: Message indicating an error 427 * 428 * Get the error number from an RxRPC error message. 429 */ 430int rxrpc_kernel_get_error_number(struct sk_buff *skb) 431{ 432 struct rxrpc_skb_priv *sp = rxrpc_skb(skb); 433 434 return sp->error; 435} 436 437EXPORT_SYMBOL(rxrpc_kernel_get_error_number); 438