/*
 * Copyright (c) 2006 Oracle. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * OpenIB.org BSD license below:
 *
 *     Redistribution and use in source and binary forms, with or
 *     without modification, are permitted provided that the following
 *     conditions are met:
 *
 *      - Redistributions of source code must retain the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer.
 *
 *      - Redistributions in binary form must reproduce the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer in the documentation and/or other materials
 *        provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 *
 */
#include <linux/kernel.h>
#include <linux/in.h>
#include <linux/device.h>
#include <linux/dmapool.h>

#include "rds.h"
#include "rdma.h"
#include "iw.h"

static void rds_iw_send_rdma_complete(struct rds_message *rm,
                                      int wc_status)
{
        int notify_status;

        switch (wc_status) {
        case IB_WC_WR_FLUSH_ERR:
                return;

        case IB_WC_SUCCESS:
                notify_status = RDS_RDMA_SUCCESS;
                break;

        case IB_WC_REM_ACCESS_ERR:
                notify_status = RDS_RDMA_REMOTE_ERROR;
                break;

        default:
                notify_status = RDS_RDMA_OTHER_ERROR;
                break;
        }
        rds_rdma_send_complete(rm, notify_status);
}

static void rds_iw_send_unmap_rdma(struct rds_iw_connection *ic,
                                   struct rds_rdma_op *op)
{
        if (op->r_mapped) {
                ib_dma_unmap_sg(ic->i_cm_id->device,
                                op->r_sg, op->r_nents,
                                op->r_write ? DMA_TO_DEVICE : DMA_FROM_DEVICE);
                op->r_mapped = 0;
        }
}

static void rds_iw_send_unmap_rm(struct rds_iw_connection *ic,
                                 struct rds_iw_send_work *send,
                                 int wc_status)
{
        struct rds_message *rm = send->s_rm;

        rdsdebug("ic %p send %p rm %p\n", ic, send, rm);

        ib_dma_unmap_sg(ic->i_cm_id->device,
                        rm->m_sg, rm->m_nents,
                        DMA_TO_DEVICE);

        if (rm->m_rdma_op != NULL) {
                rds_iw_send_unmap_rdma(ic, rm->m_rdma_op);

                /* If the user asked for a completion notification on this
                 * message, we can implement three different semantics:
                 *  1. Notify when we received the ACK on the RDS message
                 *     that was queued with the RDMA. This provides reliable
                 *     notification of RDMA status at the expense of a one-way
                 *     packet delay.
                 *  2. Notify when the IB stack gives us the completion event for
                 *     the RDMA operation.
                 *  3. Notify when the IB stack gives us the completion event for
                 *     the accompanying RDS messages.
                 * Here, we implement approach #3. To implement approach #2,
                 * call rds_rdma_send_complete from the cq_handler. To implement #1,
                 * don't call rds_rdma_send_complete at all, and fall back to the notify
                 * handling in the ACK processing code.
                 *
                 * Note: There's no need to explicitly sync any RDMA buffers using
                 * ib_dma_sync_sg_for_cpu - the completion for the RDMA
                 * operation itself unmapped the RDMA buffers, which takes care
                 * of synching.
                 */
                rds_iw_send_rdma_complete(rm, wc_status);

                if (rm->m_rdma_op->r_write)
                        rds_stats_add(s_send_rdma_bytes, rm->m_rdma_op->r_bytes);
                else
                        rds_stats_add(s_recv_rdma_bytes, rm->m_rdma_op->r_bytes);
        }

        /* If anyone waited for this message to get flushed out, wake
         * them up now */
        rds_message_unmapped(rm);

        rds_message_put(rm);
        send->s_rm = NULL;
}

void rds_iw_send_init_ring(struct rds_iw_connection *ic)
{
        struct rds_iw_send_work *send;
        u32 i;

        for (i = 0, send = ic->i_sends; i < ic->i_send_ring.w_nr; i++, send++) {
                struct ib_sge *sge;

                send->s_rm = NULL;
                send->s_op = NULL;
                send->s_mapping = NULL;

                send->s_wr.next = NULL;
                send->s_wr.wr_id = i;
                send->s_wr.sg_list = send->s_sge;
                send->s_wr.num_sge = 1;
                send->s_wr.opcode = IB_WR_SEND;
                send->s_wr.send_flags = 0;
                send->s_wr.ex.imm_data = 0;

                sge = rds_iw_data_sge(ic, send->s_sge);
                sge->lkey = 0;

                sge = rds_iw_header_sge(ic, send->s_sge);
                sge->addr = ic->i_send_hdrs_dma + (i * sizeof(struct rds_header));
                sge->length = sizeof(struct rds_header);
                sge->lkey = 0;

                send->s_mr = ib_alloc_fast_reg_mr(ic->i_pd, fastreg_message_size);
                if (IS_ERR(send->s_mr)) {
                        printk(KERN_WARNING "RDS/IW: ib_alloc_fast_reg_mr failed\n");
                        break;
                }

                send->s_page_list = ib_alloc_fast_reg_page_list(
                        ic->i_cm_id->device, fastreg_message_size);
                if (IS_ERR(send->s_page_list)) {
                        printk(KERN_WARNING "RDS/IW: ib_alloc_fast_reg_page_list failed\n");
                        break;
                }
        }
}

void rds_iw_send_clear_ring(struct rds_iw_connection *ic)
{
        struct rds_iw_send_work *send;
        u32 i;

        for (i = 0, send = ic->i_sends; i < ic->i_send_ring.w_nr; i++, send++) {
                BUG_ON(!send->s_mr);
                ib_dereg_mr(send->s_mr);
                BUG_ON(!send->s_page_list);
                ib_free_fast_reg_page_list(send->s_page_list);
                if (send->s_wr.opcode == 0xdead)
                        continue;
                if (send->s_rm)
                        rds_iw_send_unmap_rm(ic, send, IB_WC_WR_FLUSH_ERR);
                if (send->s_op)
                        rds_iw_send_unmap_rdma(ic, send->s_op);
        }
}

/*
 * The _oldest/_free ring operations here race cleanly with the alloc/unalloc
 * operations performed in the send path. As the sender allocs and potentially
 * unallocs the next free entry in the ring it doesn't alter which is
 * the next to be freed, which is what this is concerned with.
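 *
 * Each send ring entry's wr_id is its index in the ring (set up in
 * rds_iw_send_init_ring above), which is how the completion handler below
 * maps a completion back to the entries that can be freed.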
 */
void rds_iw_send_cq_comp_handler(struct ib_cq *cq, void *context)
{
        struct rds_connection *conn = context;
        struct rds_iw_connection *ic = conn->c_transport_data;
        struct ib_wc wc;
        struct rds_iw_send_work *send;
        u32 completed;
        u32 oldest;
        u32 i;
        int ret;

        rdsdebug("cq %p conn %p\n", cq, conn);
        rds_iw_stats_inc(s_iw_tx_cq_call);
        ret = ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
        if (ret)
                rdsdebug("ib_req_notify_cq send failed: %d\n", ret);

        while (ib_poll_cq(cq, 1, &wc) > 0) {
                rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n",
                         (unsigned long long)wc.wr_id, wc.status, wc.byte_len,
                         be32_to_cpu(wc.ex.imm_data));
                rds_iw_stats_inc(s_iw_tx_cq_event);

                if (wc.status != IB_WC_SUCCESS) {
                        printk(KERN_ERR "WC Error: status = %d opcode = %d\n", wc.status, wc.opcode);
                        break;
                }

                if (wc.opcode == IB_WC_LOCAL_INV && wc.wr_id == RDS_IW_LOCAL_INV_WR_ID) {
                        ic->i_fastreg_posted = 0;
                        continue;
                }

                if (wc.opcode == IB_WC_FAST_REG_MR && wc.wr_id == RDS_IW_FAST_REG_WR_ID) {
                        ic->i_fastreg_posted = 1;
                        continue;
                }

                if (wc.wr_id == RDS_IW_ACK_WR_ID) {
                        if (ic->i_ack_queued + HZ/2 < jiffies)
                                rds_iw_stats_inc(s_iw_tx_stalled);
                        rds_iw_ack_send_complete(ic);
                        continue;
                }

                oldest = rds_iw_ring_oldest(&ic->i_send_ring);

                completed = rds_iw_ring_completed(&ic->i_send_ring, wc.wr_id, oldest);

                for (i = 0; i < completed; i++) {
                        send = &ic->i_sends[oldest];

                        /* In the error case, wc.opcode sometimes contains garbage */
                        switch (send->s_wr.opcode) {
                        case IB_WR_SEND:
                                if (send->s_rm)
                                        rds_iw_send_unmap_rm(ic, send, wc.status);
                                break;
                        case IB_WR_FAST_REG_MR:
                        case IB_WR_RDMA_WRITE:
                        case IB_WR_RDMA_READ:
                        case IB_WR_RDMA_READ_WITH_INV:
                                /* Nothing to be done - the SG list will be unmapped
                                 * when the SEND completes. */
                                break;
                        default:
                                if (printk_ratelimit())
                                        printk(KERN_NOTICE
                                               "RDS/IW: %s: unexpected opcode 0x%x in WR!\n",
                                               __func__, send->s_wr.opcode);
                                break;
                        }

                        send->s_wr.opcode = 0xdead;
                        send->s_wr.num_sge = 1;
                        if (send->s_queued + HZ/2 < jiffies)
                                rds_iw_stats_inc(s_iw_tx_stalled);

                        /* If an RDMA operation produced an error, signal this right
                         * away. If we don't, the subsequent SEND that goes with this
                         * RDMA will be canceled with IB_WC_WR_FLUSH_ERR, and the
                         * application will never learn that the RDMA failed. */
                        if (unlikely(wc.status == IB_WC_REM_ACCESS_ERR && send->s_op)) {
                                struct rds_message *rm;

                                rm = rds_send_get_message(conn, send->s_op);
                                if (rm)
                                        rds_iw_send_rdma_complete(rm, wc.status);
                        }

                        oldest = (oldest + 1) % ic->i_send_ring.w_nr;
                }

                rds_iw_ring_free(&ic->i_send_ring, completed);

                if (test_and_clear_bit(RDS_LL_SEND_FULL, &conn->c_flags) ||
                    test_bit(0, &conn->c_map_queued))
                        queue_delayed_work(rds_wq, &conn->c_send_w, 0);

                /* We expect errors as the qp is drained during shutdown */
                if (wc.status != IB_WC_SUCCESS && rds_conn_up(conn)) {
                        rds_iw_conn_error(conn,
                                "send completion on %pI4 "
                                "had status %u, disconnecting and reconnecting\n",
                                &conn->c_faddr, wc.status);
                }
        }
}

/*
 * This is the main function for allocating credits when sending
 * messages.
 *
 * Conceptually, we have two counters:
 *  - send credits: this tells us how many WRs we're allowed
 *    to submit without overrunning the receiver's queue. For
 *    each SEND WR we post, we decrement this by one.
 *
 *  - posted credits: this tells us how many WRs we recently
 *    posted to the receive queue. This value is transferred
 *    to the peer as a "credit update" in an RDS header field.
 *    Every time we transmit credits to the peer, we subtract
 *    the amount of transferred credits from this counter.
 *
 * It is essential that we avoid situations where both sides have
 * exhausted their send credits, and are unable to send new credits
 * to the peer. We achieve this by requiring that we send at least
 * one credit update to the peer before exhausting our credits.
 * When new credits arrive, we subtract one credit that is withheld
 * until we've posted new buffers and are ready to transmit these
 * credits (see rds_iw_send_add_credits below).
 *
 * The RDS send code is essentially single-threaded; rds_send_xmit
 * grabs c_send_lock to ensure exclusive access to the send ring.
 * However, the ACK sending code is independent and can race with
 * message SENDs.
 *
 * In the send path, we need to update the counters for send credits
 * and the counter of posted buffers atomically - when we use the
 * last available credit, we cannot allow another thread to race us
 * and grab the posted credits counter. Hence, we have to use a
 * spinlock to protect the credit counter, or use atomics.
 *
 * Spinlocks shared between the send and the receive path are bad,
 * because they create unnecessary delays. An early implementation
 * using a spinlock showed a 5% degradation in throughput at some
 * loads.
 *
 * This implementation avoids spinlocks completely, putting both
 * counters into a single atomic, and updating that atomic using
 * atomic_add (in the receive path, when receiving fresh credits),
 * and using atomic_cmpxchg when updating the two counters.
 */
int rds_iw_send_grab_credits(struct rds_iw_connection *ic,
                             u32 wanted, u32 *adv_credits, int need_posted, int max_posted)
{
        unsigned int avail, posted, got = 0, advertise;
        long oldval, newval;

        *adv_credits = 0;
        if (!ic->i_flowctl)
                return wanted;

try_again:
        advertise = 0;
        oldval = newval = atomic_read(&ic->i_credits);
        posted = IB_GET_POST_CREDITS(oldval);
        avail = IB_GET_SEND_CREDITS(oldval);

        rdsdebug("rds_iw_send_grab_credits(%u): credits=%u posted=%u\n",
                 wanted, avail, posted);

        /* The last credit must be used to send a credit update. */
        if (avail && !posted)
                avail--;

        if (avail < wanted) {
                struct rds_connection *conn = ic->i_cm_id->context;

                /* Oops, there aren't that many credits left! */
                set_bit(RDS_LL_SEND_FULL, &conn->c_flags);
                got = avail;
        } else {
                /* Sometimes you get what you want, lalala. */
                got = wanted;
        }
        newval -= IB_SET_SEND_CREDITS(got);

        /*
         * If need_posted is non-zero, then the caller wants the posted
         * credits advertised regardless of whether any send credits are
         * available.
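         *
         * (Both counters live in the single atomic ic->i_credits and are
         * accessed through the IB_*_CREDITS helpers, so the cmpxchg below
         * updates them together.)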
         */
        if (posted && (got || need_posted)) {
                advertise = min_t(unsigned int, posted, max_posted);
                newval -= IB_SET_POST_CREDITS(advertise);
        }

        /* Finally bill everything */
        if (atomic_cmpxchg(&ic->i_credits, oldval, newval) != oldval)
                goto try_again;

        *adv_credits = advertise;
        return got;
}

void rds_iw_send_add_credits(struct rds_connection *conn, unsigned int credits)
{
        struct rds_iw_connection *ic = conn->c_transport_data;

        if (credits == 0)
                return;

        rdsdebug("rds_iw_send_add_credits(%u): current=%u%s\n",
                 credits,
                 IB_GET_SEND_CREDITS(atomic_read(&ic->i_credits)),
                 test_bit(RDS_LL_SEND_FULL, &conn->c_flags) ? ", ll_send_full" : "");

        atomic_add(IB_SET_SEND_CREDITS(credits), &ic->i_credits);
        if (test_and_clear_bit(RDS_LL_SEND_FULL, &conn->c_flags))
                queue_delayed_work(rds_wq, &conn->c_send_w, 0);

        WARN_ON(IB_GET_SEND_CREDITS(credits) >= 16384);

        rds_iw_stats_inc(s_iw_rx_credit_updates);
}

void rds_iw_advertise_credits(struct rds_connection *conn, unsigned int posted)
{
        struct rds_iw_connection *ic = conn->c_transport_data;

        if (posted == 0)
                return;

        atomic_add(IB_SET_POST_CREDITS(posted), &ic->i_credits);

        /* Decide whether to send an update to the peer now.
         * If we would send a credit update for every single buffer we
         * post, we would end up with an ACK storm (ACK arrives,
         * consumes buffer, we refill the ring, send ACK to remote
         * advertising the newly posted buffer... ad inf)
         *
         * Performance pretty much depends on how often we send
         * credit updates - too frequent updates mean lots of ACKs.
         * Too infrequent updates, and the peer will run out of
         * credits and have to throttle.
         * For the time being, 16 seems to be a good compromise.
         */
        if (IB_GET_POST_CREDITS(atomic_read(&ic->i_credits)) >= 16)
                set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
}

static inline void
rds_iw_xmit_populate_wr(struct rds_iw_connection *ic,
                        struct rds_iw_send_work *send, unsigned int pos,
                        unsigned long buffer, unsigned int length,
                        int send_flags)
{
        struct ib_sge *sge;

        WARN_ON(pos != send - ic->i_sends);

        send->s_wr.send_flags = send_flags;
        send->s_wr.opcode = IB_WR_SEND;
        send->s_wr.num_sge = 2;
        send->s_wr.next = NULL;
        send->s_queued = jiffies;
        send->s_op = NULL;

        if (length != 0) {
                sge = rds_iw_data_sge(ic, send->s_sge);
                sge->addr = buffer;
                sge->length = length;
                sge->lkey = rds_iw_local_dma_lkey(ic);

                sge = rds_iw_header_sge(ic, send->s_sge);
        } else {
                /* We're sending a packet with no payload. There is only
                 * one SGE */
                send->s_wr.num_sge = 1;
                sge = &send->s_sge[0];
        }

        sge->addr = ic->i_send_hdrs_dma + (pos * sizeof(struct rds_header));
        sge->length = sizeof(struct rds_header);
        sge->lkey = rds_iw_local_dma_lkey(ic);
}

/*
 * This can be called multiple times for a given message. The first time
 * we see a message we map its scatterlist into the IB device so that
 * we can provide that mapped address to the IB scatter gather entries
 * in the IB work requests. We translate the scatterlist into a series
 * of work requests that fragment the message. These work requests complete
 * in order so we pass ownership of the message to the completion handler
 * once we send the final fragment.
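 *
 * The hdr_off, sg and off arguments describe how much of the message has
 * already been sent, so a partially transmitted message can be resumed on
 * a later call.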
 *
 * The RDS core uses the c_send_lock to only enter this function once
 * per connection. This makes sure that the tx ring alloc/unalloc pairs
 * don't get out of sync and confuse the ring.
 */
int rds_iw_xmit(struct rds_connection *conn, struct rds_message *rm,
                unsigned int hdr_off, unsigned int sg, unsigned int off)
{
        struct rds_iw_connection *ic = conn->c_transport_data;
        struct ib_device *dev = ic->i_cm_id->device;
        struct rds_iw_send_work *send = NULL;
        struct rds_iw_send_work *first;
        struct rds_iw_send_work *prev;
        struct ib_send_wr *failed_wr;
        struct scatterlist *scat;
        u32 pos;
        u32 i;
        u32 work_alloc;
        u32 credit_alloc;
        u32 posted;
        u32 adv_credits = 0;
        int send_flags = 0;
        int sent;
        int ret;
        int flow_controlled = 0;

        BUG_ON(off % RDS_FRAG_SIZE);
        BUG_ON(hdr_off != 0 && hdr_off != sizeof(struct rds_header));

        /* Fastreg support */
        if (rds_rdma_cookie_key(rm->m_rdma_cookie) && !ic->i_fastreg_posted) {
                ret = -EAGAIN;
                goto out;
        }

        if (be32_to_cpu(rm->m_inc.i_hdr.h_len) == 0)
                i = 1;
        else
                i = ceil(be32_to_cpu(rm->m_inc.i_hdr.h_len), RDS_FRAG_SIZE);

        work_alloc = rds_iw_ring_alloc(&ic->i_send_ring, i, &pos);
        if (work_alloc == 0) {
                set_bit(RDS_LL_SEND_FULL, &conn->c_flags);
                rds_iw_stats_inc(s_iw_tx_ring_full);
                ret = -ENOMEM;
                goto out;
        }

        credit_alloc = work_alloc;
        if (ic->i_flowctl) {
                credit_alloc = rds_iw_send_grab_credits(ic, work_alloc, &posted, 0, RDS_MAX_ADV_CREDIT);
                adv_credits += posted;
                if (credit_alloc < work_alloc) {
                        rds_iw_ring_unalloc(&ic->i_send_ring, work_alloc - credit_alloc);
                        work_alloc = credit_alloc;
                        flow_controlled++;
                }
                if (work_alloc == 0) {
                        set_bit(RDS_LL_SEND_FULL, &conn->c_flags);
                        rds_iw_stats_inc(s_iw_tx_throttle);
                        ret = -ENOMEM;
                        goto out;
                }
        }

        /* map the message the first time we see it */
        if (ic->i_rm == NULL) {
                /*
                printk(KERN_NOTICE "rds_iw_xmit prep msg dport=%u flags=0x%x len=%d\n",
                       be16_to_cpu(rm->m_inc.i_hdr.h_dport),
                       rm->m_inc.i_hdr.h_flags,
                       be32_to_cpu(rm->m_inc.i_hdr.h_len));
                */
                if (rm->m_nents) {
                        rm->m_count = ib_dma_map_sg(dev,
                                        rm->m_sg, rm->m_nents, DMA_TO_DEVICE);
                        rdsdebug("ic %p mapping rm %p: %d\n", ic, rm, rm->m_count);
                        if (rm->m_count == 0) {
                                rds_iw_stats_inc(s_iw_tx_sg_mapping_failure);
                                rds_iw_ring_unalloc(&ic->i_send_ring, work_alloc);
                                ret = -ENOMEM;
                                goto out;
                        }
                } else {
                        rm->m_count = 0;
                }

                ic->i_unsignaled_wrs = rds_iw_sysctl_max_unsig_wrs;
                ic->i_unsignaled_bytes = rds_iw_sysctl_max_unsig_bytes;
                rds_message_addref(rm);
                ic->i_rm = rm;

                /* Finalize the header */
                if (test_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags))
                        rm->m_inc.i_hdr.h_flags |= RDS_FLAG_ACK_REQUIRED;
                if (test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags))
                        rm->m_inc.i_hdr.h_flags |= RDS_FLAG_RETRANSMITTED;

                /* If it has an RDMA op, tell the peer we did it. This is
                 * used by the peer to release use-once RDMA MRs.
                 */
                if (rm->m_rdma_op) {
                        struct rds_ext_header_rdma ext_hdr;

                        ext_hdr.h_rdma_rkey = cpu_to_be32(rm->m_rdma_op->r_key);
                        rds_message_add_extension(&rm->m_inc.i_hdr,
                                        RDS_EXTHDR_RDMA, &ext_hdr, sizeof(ext_hdr));
                }
                if (rm->m_rdma_cookie) {
                        rds_message_add_rdma_dest_extension(&rm->m_inc.i_hdr,
                                        rds_rdma_cookie_key(rm->m_rdma_cookie),
                                        rds_rdma_cookie_offset(rm->m_rdma_cookie));
                }

                /* Note - rds_iw_piggyb_ack clears the ACK_REQUIRED bit, so
                 * we should not do this unless we have a chance of at least
                 * sticking the header into the send ring. Which is why we
                 * should call rds_iw_ring_alloc first. */
                rm->m_inc.i_hdr.h_ack = cpu_to_be64(rds_iw_piggyb_ack(ic));
                rds_message_make_checksum(&rm->m_inc.i_hdr);

                /*
                 * Update adv_credits since we reset the ACK_REQUIRED bit.
                 */
                rds_iw_send_grab_credits(ic, 0, &posted, 1, RDS_MAX_ADV_CREDIT - adv_credits);
                adv_credits += posted;
                BUG_ON(adv_credits > 255);
        }

        send = &ic->i_sends[pos];
        first = send;
        prev = NULL;
        scat = &rm->m_sg[sg];
        sent = 0;
        i = 0;

        /* Sometimes you want to put a fence between an RDMA
         * READ and the following SEND.
         * We could either do this all the time
         * or when requested by the user. Right now, we let
         * the application choose.
         */
        if (rm->m_rdma_op && rm->m_rdma_op->r_fence)
                send_flags = IB_SEND_FENCE;

        /*
         * We could be copying the header into the unused tail of the page.
         * That would need to be changed in the future when those pages might
         * be mapped userspace pages or page cache pages. So instead we always
         * use a second sge and our long-lived ring of mapped headers. We send
         * the header after the data so that the data payload can be aligned on
         * the receiver.
         */

        /* handle a 0-len message */
        if (be32_to_cpu(rm->m_inc.i_hdr.h_len) == 0) {
                rds_iw_xmit_populate_wr(ic, send, pos, 0, 0, send_flags);
                goto add_header;
        }

        /* if there's data reference it with a chain of work reqs */
        for (; i < work_alloc && scat != &rm->m_sg[rm->m_count]; i++) {
                unsigned int len;

                send = &ic->i_sends[pos];

                len = min(RDS_FRAG_SIZE, ib_sg_dma_len(dev, scat) - off);
                rds_iw_xmit_populate_wr(ic, send, pos,
                        ib_sg_dma_address(dev, scat) + off, len,
                        send_flags);

                /*
                 * We want to delay signaling completions just enough to get
                 * the batching benefits but not so much that we create dead time
                 * on the wire.
                 */
                if (ic->i_unsignaled_wrs-- == 0) {
                        ic->i_unsignaled_wrs = rds_iw_sysctl_max_unsig_wrs;
                        send->s_wr.send_flags |= IB_SEND_SIGNALED | IB_SEND_SOLICITED;
                }

                ic->i_unsignaled_bytes -= len;
                if (ic->i_unsignaled_bytes <= 0) {
                        ic->i_unsignaled_bytes = rds_iw_sysctl_max_unsig_bytes;
                        send->s_wr.send_flags |= IB_SEND_SIGNALED | IB_SEND_SOLICITED;
                }

                /*
                 * Always signal the last one if we're stopping due to flow control.
                 */
                if (flow_controlled && i == (work_alloc-1))
                        send->s_wr.send_flags |= IB_SEND_SIGNALED | IB_SEND_SOLICITED;

                rdsdebug("send %p wr %p num_sge %u next %p\n", send,
                         &send->s_wr, send->s_wr.num_sge, send->s_wr.next);

                sent += len;
                off += len;
                if (off == ib_sg_dma_len(dev, scat)) {
                        scat++;
                        off = 0;
                }

add_header:
                /* Tack on the header after the data. The header SGE should already
                 * have been set up to point to the right header buffer.
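                 * (rds_iw_xmit_populate_wr pointed it at the pos'th slot of the
                 * mapped header ring, which is the buffer the memcpy below fills.)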
                 */
                memcpy(&ic->i_send_hdrs[pos], &rm->m_inc.i_hdr, sizeof(struct rds_header));

                if (0) {
                        struct rds_header *hdr = &ic->i_send_hdrs[pos];

                        printk(KERN_NOTICE "send WR dport=%u flags=0x%x len=%d\n",
                               be16_to_cpu(hdr->h_dport),
                               hdr->h_flags,
                               be32_to_cpu(hdr->h_len));
                }
                if (adv_credits) {
                        struct rds_header *hdr = &ic->i_send_hdrs[pos];

                        /* add credit and redo the header checksum */
                        hdr->h_credit = adv_credits;
                        rds_message_make_checksum(hdr);
                        adv_credits = 0;
                        rds_iw_stats_inc(s_iw_tx_credit_updates);
                }

                if (prev)
                        prev->s_wr.next = &send->s_wr;
                prev = send;

                pos = (pos + 1) % ic->i_send_ring.w_nr;
        }

        /* Account the RDS header in the number of bytes we sent, but just once.
         * The caller has no concept of fragmentation. */
        if (hdr_off == 0)
                sent += sizeof(struct rds_header);

        /* if we finished the message then send completion owns it */
        if (scat == &rm->m_sg[rm->m_count]) {
                prev->s_rm = ic->i_rm;
                prev->s_wr.send_flags |= IB_SEND_SIGNALED | IB_SEND_SOLICITED;
                ic->i_rm = NULL;
        }

        if (i < work_alloc) {
                rds_iw_ring_unalloc(&ic->i_send_ring, work_alloc - i);
                work_alloc = i;
        }
        if (ic->i_flowctl && i < credit_alloc)
                rds_iw_send_add_credits(conn, credit_alloc - i);

        failed_wr = &first->s_wr;
        ret = ib_post_send(ic->i_cm_id->qp, &first->s_wr, &failed_wr);
        rdsdebug("ic %p first %p (wr %p) ret %d wr %p\n", ic,
                 first, &first->s_wr, ret, failed_wr);
        BUG_ON(failed_wr != &first->s_wr);
        if (ret) {
                printk(KERN_WARNING "RDS/IW: ib_post_send to %pI4 "
                       "returned %d\n", &conn->c_faddr, ret);
                rds_iw_ring_unalloc(&ic->i_send_ring, work_alloc);
                if (prev->s_rm) {
                        ic->i_rm = prev->s_rm;
                        prev->s_rm = NULL;
                }
                goto out;
        }

        ret = sent;
out:
        BUG_ON(adv_credits);
        return ret;
}

static void rds_iw_build_send_fastreg(struct rds_iw_device *rds_iwdev,
                struct rds_iw_connection *ic, struct rds_iw_send_work *send,
                int nent, int len, u64 sg_addr)
{
        BUG_ON(nent > send->s_page_list->max_page_list_len);
        /*
         * Perform a WR for the fast_reg_mr. Each individual page
         * in the sg list is added to the fast reg page list and placed
         * inside the fast_reg_mr WR.
         */
        send->s_wr.opcode = IB_WR_FAST_REG_MR;
        send->s_wr.wr.fast_reg.length = len;
        send->s_wr.wr.fast_reg.rkey = send->s_mr->rkey;
        send->s_wr.wr.fast_reg.page_list = send->s_page_list;
        send->s_wr.wr.fast_reg.page_list_len = nent;
        send->s_wr.wr.fast_reg.page_shift = PAGE_SHIFT;
        send->s_wr.wr.fast_reg.access_flags = IB_ACCESS_REMOTE_WRITE;
        send->s_wr.wr.fast_reg.iova_start = sg_addr;

        ib_update_fast_reg_key(send->s_mr, send->s_remap_count++);
}

int rds_iw_xmit_rdma(struct rds_connection *conn, struct rds_rdma_op *op)
{
        struct rds_iw_connection *ic = conn->c_transport_data;
        struct rds_iw_send_work *send = NULL;
        struct rds_iw_send_work *first;
        struct rds_iw_send_work *prev;
        struct ib_send_wr *failed_wr;
        struct rds_iw_device *rds_iwdev;
        struct scatterlist *scat;
        unsigned long len;
        u64 remote_addr = op->r_remote_addr;
        u32 pos, fr_pos;
        u32 work_alloc;
        u32 i;
        u32 j;
        int sent;
        int ret;
        int num_sge;

        rds_iwdev = ib_get_client_data(ic->i_cm_id->device, &rds_iw_client);

        /* map the message the first time we see it */
        if (!op->r_mapped) {
                op->r_count = ib_dma_map_sg(ic->i_cm_id->device,
                                op->r_sg, op->r_nents, (op->r_write) ?
                                DMA_TO_DEVICE : DMA_FROM_DEVICE);
                rdsdebug("ic %p mapping op %p: %d\n", ic, op, op->r_count);
                if (op->r_count == 0) {
                        rds_iw_stats_inc(s_iw_tx_sg_mapping_failure);
                        ret = -ENOMEM;
                        goto out;
                }

                op->r_mapped = 1;
        }

        if (!op->r_write) {
                /* Alloc space on the send queue for the fastreg */
                work_alloc = rds_iw_ring_alloc(&ic->i_send_ring, 1, &fr_pos);
                if (work_alloc != 1) {
                        rds_iw_ring_unalloc(&ic->i_send_ring, work_alloc);
                        rds_iw_stats_inc(s_iw_tx_ring_full);
                        ret = -ENOMEM;
                        goto out;
                }
        }

        /*
         * Instead of knowing how to return a partial rdma read/write we insist that there
         * be enough work requests to send the entire message.
         */
        i = ceil(op->r_count, rds_iwdev->max_sge);

        work_alloc = rds_iw_ring_alloc(&ic->i_send_ring, i, &pos);
        if (work_alloc != i) {
                rds_iw_ring_unalloc(&ic->i_send_ring, work_alloc);
                rds_iw_stats_inc(s_iw_tx_ring_full);
                ret = -ENOMEM;
                goto out;
        }

        send = &ic->i_sends[pos];
        if (!op->r_write) {
                first = prev = &ic->i_sends[fr_pos];
        } else {
                first = send;
                prev = NULL;
        }
        scat = &op->r_sg[0];
        sent = 0;
        num_sge = op->r_count;

        for (i = 0; i < work_alloc && scat != &op->r_sg[op->r_count]; i++) {
                send->s_wr.send_flags = 0;
                send->s_queued = jiffies;

                /*
                 * We want to delay signaling completions just enough to get
                 * the batching benefits but not so much that we create dead time on the wire.
                 */
                if (ic->i_unsignaled_wrs-- == 0) {
                        ic->i_unsignaled_wrs = rds_iw_sysctl_max_unsig_wrs;
                        send->s_wr.send_flags = IB_SEND_SIGNALED;
                }

                /* Rather than adding plumbing to invalidate the fastreg MR used
                 * for local access once RDS is finished with it, we use
                 * IB_WR_RDMA_READ_WITH_INV, which invalidates it after the read has completed.
                 */
                if (op->r_write)
                        send->s_wr.opcode = IB_WR_RDMA_WRITE;
                else
                        send->s_wr.opcode = IB_WR_RDMA_READ_WITH_INV;

                send->s_wr.wr.rdma.remote_addr = remote_addr;
                send->s_wr.wr.rdma.rkey = op->r_key;
                send->s_op = op;

                if (num_sge > rds_iwdev->max_sge) {
                        send->s_wr.num_sge = rds_iwdev->max_sge;
                        num_sge -= rds_iwdev->max_sge;
                } else
                        send->s_wr.num_sge = num_sge;

                send->s_wr.next = NULL;

                if (prev)
                        prev->s_wr.next = &send->s_wr;

                for (j = 0; j < send->s_wr.num_sge && scat != &op->r_sg[op->r_count]; j++) {
                        len = ib_sg_dma_len(ic->i_cm_id->device, scat);

                        if (send->s_wr.opcode == IB_WR_RDMA_READ_WITH_INV)
                                send->s_page_list->page_list[j] = ib_sg_dma_address(ic->i_cm_id->device, scat);
                        else {
                                send->s_sge[j].addr = ib_sg_dma_address(ic->i_cm_id->device, scat);
                                send->s_sge[j].length = len;
                                send->s_sge[j].lkey = rds_iw_local_dma_lkey(ic);
                        }

                        sent += len;
                        rdsdebug("ic %p sent %d remote_addr %llu\n", ic, sent, remote_addr);
                        remote_addr += len;

                        scat++;
                }

                if (send->s_wr.opcode == IB_WR_RDMA_READ_WITH_INV) {
                        send->s_wr.num_sge = 1;
                        send->s_sge[0].addr = conn->c_xmit_rm->m_rs->rs_user_addr;
                        send->s_sge[0].length = conn->c_xmit_rm->m_rs->rs_user_bytes;
                        send->s_sge[0].lkey = ic->i_sends[fr_pos].s_mr->lkey;
                }

                rdsdebug("send %p wr %p num_sge %u next %p\n", send,
                         &send->s_wr, send->s_wr.num_sge, send->s_wr.next);

                prev = send;
                if (++send == &ic->i_sends[ic->i_send_ring.w_nr])
                        send = ic->i_sends;
        }

        /* if we finished the message then send completion owns it */
        if (scat == &op->r_sg[op->r_count])
                first->s_wr.send_flags = IB_SEND_SIGNALED;

        if (i < work_alloc) {
                rds_iw_ring_unalloc(&ic->i_send_ring, work_alloc - i);
                work_alloc = i;
        }

        /* On iWARP, local memory access by a remote system (i.e., RDMA Read) is
         * not recommended. Putting the lkey on the wire is a security hole, as
         * it can allow access to all memory on the remote system. Some adapters
         * do not allow using the lkey for this at all. To bypass this use a
         * fastreg_mr (or possibly a dma_mr).
         */
        if (!op->r_write) {
                rds_iw_build_send_fastreg(rds_iwdev, ic, &ic->i_sends[fr_pos],
                                op->r_count, sent, conn->c_xmit_rm->m_rs->rs_user_addr);
                work_alloc++;
        }

        failed_wr = &first->s_wr;
        ret = ib_post_send(ic->i_cm_id->qp, &first->s_wr, &failed_wr);
        rdsdebug("ic %p first %p (wr %p) ret %d wr %p\n", ic,
                 first, &first->s_wr, ret, failed_wr);
        BUG_ON(failed_wr != &first->s_wr);
        if (ret) {
                printk(KERN_WARNING "RDS/IW: rdma ib_post_send to %pI4 "
                       "returned %d\n", &conn->c_faddr, ret);
                rds_iw_ring_unalloc(&ic->i_send_ring, work_alloc);
                goto out;
        }

out:
        return ret;
}

void rds_iw_xmit_complete(struct rds_connection *conn)
{
        struct rds_iw_connection *ic = conn->c_transport_data;

        /* We may have a pending ACK or window update we were unable
         * to send previously (due to flow control). Try again. */
        rds_iw_attempt_ack(ic);
}