/*
 * Copyright (c) 2010, Oracle and/or its affiliates. All rights reserved.
 */

/*
 * This file contains code imported from the OFED rds source file ib_send.c
 * Oracle elects to have and use the contents of ib_send.c under and governed
 * by the OpenIB.org BSD license (see below for full license text). However,
 * the following notice accompanied the original version of this file:
 */

/*
 * Copyright (c) 2006 Oracle.  All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * OpenIB.org BSD license below:
 *
 *     Redistribution and use in source and binary forms, with or
 *     without modification, are permitted provided that the following
 *     conditions are met:
 *
 *      - Redistributions of source code must retain the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer.
 *
 *      - Redistributions in binary form must reproduce the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer in the documentation and/or other materials
 *        provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 *
 */
#include <sys/rds.h>

#include <sys/ib/clients/rdsv3/rdsv3.h>
#include <sys/ib/clients/rdsv3/rdma.h>
#include <sys/ib/clients/rdsv3/ib.h>
#include <sys/ib/clients/rdsv3/rdsv3_debug.h>

static void
rdsv3_ib_send_rdma_complete(struct rdsv3_message *rm,
    int wc_status)
{
	int notify_status;

	RDSV3_DPRINTF4("rdsv3_ib_send_rdma_complete", "rm: %p, wc_status: %d",
	    rm, wc_status);

	switch (wc_status) {
	case IBT_WC_WR_FLUSHED_ERR:
		return;

	case IBT_WC_SUCCESS:
		notify_status = RDS_RDMA_SUCCESS;
		break;

	case IBT_WC_REMOTE_ACCESS_ERR:
		notify_status = RDS_RDMA_REMOTE_ERROR;
		break;

	default:
		notify_status = RDS_RDMA_OTHER_ERROR;
		break;
	}
	rdsv3_rdma_send_complete(rm, notify_status);

	RDSV3_DPRINTF4("rdsv3_ib_send_rdma_complete", "rm: %p, wc_status: %d",
	    rm, wc_status);
}

static void rdsv3_ib_dma_unmap_sg_rdma(struct ib_device *dev,
    uint_t num, struct rdsv3_rdma_sg scat[]);

void
rdsv3_ib_send_unmap_rdma(struct rdsv3_ib_connection *ic,
    struct rdsv3_rdma_op *op)
{
	RDSV3_DPRINTF4("rdsv3_ib_send_unmap_rdma", "ic: %p, op: %p", ic, op);
	if (op->r_mapped) {
		op->r_mapped = 0;
		if (ic->i_cm_id) {
			rdsv3_ib_dma_unmap_sg_rdma(ic->i_cm_id->device,
			    op->r_nents, op->r_rdma_sg);
		} else {
			rdsv3_ib_dma_unmap_sg_rdma((struct ib_device *)NULL,
			    op->r_nents, op->r_rdma_sg);
		}
	}
}

static void
rdsv3_ib_send_unmap_rm(struct rdsv3_ib_connection *ic,
    struct rdsv3_ib_send_work *send,
    int wc_status)
{
	struct rdsv3_message *rm = send->s_rm;

	RDSV3_DPRINTF4("rdsv3_ib_send_unmap_rm", "ic %p send %p rm %p\n",
	    ic, send, rm);

	mutex_enter(&rm->m_rs_lock);
	if (rm->m_count) {
		rdsv3_ib_dma_unmap_sg(ic->i_cm_id->device,
		    rm->m_sg, rm->m_count);
		rm->m_count = 0;
	}
	mutex_exit(&rm->m_rs_lock);

	if (rm->m_rdma_op != NULL) {
		rdsv3_ib_send_unmap_rdma(ic, rm->m_rdma_op);

		/*
		 * If the user asked for a completion notification on this
		 * message, we can implement three different semantics:
		 *  1.	Notify when we received the ACK on the RDS message
		 *	that was queued with the RDMA. This provides reliable
		 *	notification of RDMA status at the expense of a one-way
		 *	packet delay.
		 *  2.	Notify when the IB stack gives us the completion
		 *	event for the RDMA operation.
		 *  3.	Notify when the IB stack gives us the completion
		 *	event for the accompanying RDS message.
		 * Here, we implement approach #3. To implement approach #2,
		 * call rdsv3_rdma_send_complete from the cq_handler. To
		 * implement #1, don't call rdsv3_rdma_send_complete at all,
		 * and fall back to the notify handling in the ACK processing
		 * code.
		 *
		 * Note: There's no need to explicitly sync any RDMA buffers
		 * using ib_dma_sync_sg_for_cpu - the completion for the RDMA
		 * operation itself unmapped the RDMA buffers, which takes
		 * care of syncing.
		 */
		rdsv3_ib_send_rdma_complete(rm, wc_status);

		if (rm->m_rdma_op->r_write)
			rdsv3_stats_add(s_send_rdma_bytes,
			    rm->m_rdma_op->r_bytes);
		else
			rdsv3_stats_add(s_recv_rdma_bytes,
			    rm->m_rdma_op->r_bytes);
	}

	/*
	 * If anyone waited for this message to get flushed out, wake
	 * them up now
	 */
	rdsv3_message_unmapped(rm);

	rdsv3_message_put(rm);
	send->s_rm = NULL;
}

void
rdsv3_ib_send_init_ring(struct rdsv3_ib_connection *ic)
{
	struct rdsv3_ib_send_work *send;
	uint32_t i;

	RDSV3_DPRINTF4("rdsv3_ib_send_init_ring", "ic: %p", ic);

	for (i = 0, send = ic->i_sends; i < ic->i_send_ring.w_nr; i++, send++) {
		send->s_rm = NULL;
		send->s_op = NULL;
	}
}

void
rdsv3_ib_send_clear_ring(struct rdsv3_ib_connection *ic)
{
	struct rdsv3_ib_send_work *send;
	uint32_t i;

	RDSV3_DPRINTF4("rdsv3_ib_send_clear_ring", "ic: %p", ic);

	for (i = 0, send = ic->i_sends; i < ic->i_send_ring.w_nr; i++, send++) {
		if (send->s_opcode == 0xdd)
			continue;
		if (send->s_rm)
			rdsv3_ib_send_unmap_rm(ic, send, IBT_WC_WR_FLUSHED_ERR);
		if (send->s_op)
			rdsv3_ib_send_unmap_rdma(ic, send->s_op);
	}

	RDSV3_DPRINTF4("rdsv3_ib_send_clear_ring", "Return: ic: %p", ic);
}

/*
 * The _oldest/_free ring operations here race cleanly with the alloc/unalloc
 * operations performed in the send path.  As the sender allocates and
 * potentially unallocates the next free entry in the ring, it doesn't alter
 * which entry is the next to be freed, which is all this code is concerned
 * with.
 */
void
rdsv3_ib_send_cqe_handler(struct rdsv3_ib_connection *ic, ibt_wc_t *wc)
{
	struct rdsv3_connection *conn = ic->conn;
	struct rdsv3_ib_send_work *send;
	uint32_t completed, polled;
	uint32_t oldest;
	uint32_t i = 0;
	int ret;

	RDSV3_DPRINTF4("rdsv3_ib_send_cqe_handler",
	    "wc wc_id 0x%llx status %u byte_len %u imm_data %u\n",
	    (unsigned long long)wc->wc_id, wc->wc_status,
	    wc->wc_bytes_xfer, ntohl(wc->wc_immed_data));

	rdsv3_ib_stats_inc(s_ib_tx_cq_event);

	if (wc->wc_id == RDSV3_IB_ACK_WR_ID) {
		if (ic->i_ack_queued + HZ/2 < jiffies)
			rdsv3_ib_stats_inc(s_ib_tx_stalled);
		rdsv3_ib_ack_send_complete(ic);
		return;
	}

	oldest = rdsv3_ib_ring_oldest(&ic->i_send_ring);

	completed = rdsv3_ib_ring_completed(&ic->i_send_ring,
	    (wc->wc_id & ~RDSV3_IB_SEND_OP), oldest);

	for (i = 0; i < completed; i++) {
		send = &ic->i_sends[oldest];

		/*
		 * In the error case, wc->opcode sometimes contains garbage.
		 */
		switch (send->s_opcode) {
		case IBT_WRC_SEND:
			if (send->s_rm)
				rdsv3_ib_send_unmap_rm(ic, send,
				    wc->wc_status);
			break;
		case IBT_WRC_RDMAW:
		case IBT_WRC_RDMAR:
			/*
			 * Nothing to be done - the SG list will be
			 * unmapped when the SEND completes.
			 */
			break;
		default:
#ifndef __lock_lint
			RDSV3_DPRINTF2("rdsv3_ib_send_cqe_handler",
			    "RDS/IB: %s: unexpected opcode "
			    "0x%x in WR!",
			    __func__, send->s_opcode);
#endif
			break;
		}

		send->s_opcode = 0xdd;
		if (send->s_queued + HZ/2 < jiffies)
			rdsv3_ib_stats_inc(s_ib_tx_stalled);

		/*
		 * If an RDMA operation produced an error, signal this right
		 * away. If we don't, the subsequent SEND that goes with this
		 * RDMA will be canceled with ERR_WFLUSH, and the application
		 * will never learn that the RDMA failed.
		 */
		if (wc->wc_status ==
		    IBT_WC_REMOTE_ACCESS_ERR && send->s_op) {
			struct rdsv3_message *rm;

			rm = rdsv3_send_get_message(conn, send->s_op);
			if (rm) {
				if (rm->m_rdma_op != NULL)
					rdsv3_ib_send_unmap_rdma(ic,
					    rm->m_rdma_op);
				rdsv3_ib_send_rdma_complete(rm,
				    wc->wc_status);
				rdsv3_message_put(rm);
			}
		}

		oldest = (oldest + 1) % ic->i_send_ring.w_nr;
	}

	rdsv3_ib_ring_free(&ic->i_send_ring, completed);

	clear_bit(RDSV3_LL_SEND_FULL, &conn->c_flags);

	/* We expect errors as the qp is drained during shutdown */
	if (wc->wc_status != IBT_WC_SUCCESS && rdsv3_conn_up(conn)) {
		RDSV3_DPRINTF2("rdsv3_ib_send_cqe_handler",
		    "send completion on %u.%u.%u.%u "
		    "had status %u, disconnecting and reconnecting\n",
		    NIPQUAD(conn->c_faddr), wc->wc_status);
		rdsv3_conn_drop(conn);
	}

	RDSV3_DPRINTF4("rdsv3_ib_send_cqe_handler", "Return: conn: %p", ic);
}

/*
 * This is the main function for allocating credits when sending
 * messages.
 *
 * Conceptually, we have two counters:
 *  -	send credits: this tells us how many WRs we're allowed
 *	to submit without overrunning the receiver's queue. For
 *	each SEND WR we post, we decrement this by one.
 *
 *  -	posted credits: this tells us how many WRs we recently
 *	posted to the receive queue. This value is transferred
 *	to the peer as a "credit update" in an RDS header field.
 *	Every time we transmit credits to the peer, we subtract
 *	the amount of transferred credits from this counter.
 *
 * It is essential that we avoid situations where both sides have
 * exhausted their send credits, and are unable to send new credits
 * to the peer. We achieve this by requiring that we send at least
 * one credit update to the peer before exhausting our credits.
 * When new credits arrive, we subtract one credit that is withheld
 * until we've posted new buffers and are ready to transmit these
 * credits (see rdsv3_ib_send_add_credits below).
 *
 * The RDS send code is essentially single-threaded; rdsv3_send_xmit
 * grabs c_send_lock to ensure exclusive access to the send ring.
 * However, the ACK sending code is independent and can race with
 * message SENDs.
 *
 * In the send path, we need to update the counters for send credits
 * and the counter of posted buffers atomically - when we use the
 * last available credit, we cannot allow another thread to race us
 * and grab the posted credits counter.  Hence, we have to use a
 * spinlock to protect the credit counter, or use atomics.
 *
 * Spinlocks shared between the send and the receive path are bad,
 * because they create unnecessary delays. An early implementation
 * using a spinlock showed a 5% degradation in throughput at some
 * loads.
 *
 * This implementation avoids spinlocks completely, putting both
 * counters into a single atomic, and updating that atomic using
 * atomic_add (in the receive path, when receiving fresh credits),
 * and using atomic_cmpxchg in the send path, when both counters
 * need to be updated together.
 */
int
rdsv3_ib_send_grab_credits(struct rdsv3_ib_connection *ic,
    uint32_t wanted, uint32_t *adv_credits, int need_posted)
{
	unsigned int avail, posted, got = 0, advertise;
	long oldval, newval;

	RDSV3_DPRINTF4("rdsv3_ib_send_grab_credits", "ic: %p, %d %d %d",
	    ic, wanted, *adv_credits, need_posted);

	*adv_credits = 0;
	if (!ic->i_flowctl)
		return (wanted);

try_again:
	advertise = 0;
	oldval = newval = atomic_get(&ic->i_credits);
	posted = IB_GET_POST_CREDITS(oldval);
	avail = IB_GET_SEND_CREDITS(oldval);

	RDSV3_DPRINTF5("rdsv3_ib_send_grab_credits",
	    "wanted (%u): credits=%u posted=%u\n", wanted, avail, posted);

	/* The last credit must be used to send a credit update. */
	if (avail && !posted)
		avail--;

	if (avail < wanted) {
		struct rdsv3_connection *conn = ic->i_cm_id->context;

		/* Oops, there aren't that many credits left! */
		set_bit(RDSV3_LL_SEND_FULL, &conn->c_flags);
		got = avail;
	} else {
		/* Sometimes you get what you want, lalala. */
		got = wanted;
	}
	newval -= IB_SET_SEND_CREDITS(got);

	/*
	 * If need_posted is non-zero, then the caller wants the posted
	 * credit count advertised regardless of whether any send credits
	 * are available.
	 */
	if (posted && (got || need_posted)) {
		advertise = min(posted, RDSV3_MAX_ADV_CREDIT);
		newval -= IB_SET_POST_CREDITS(advertise);
	}

	/* Finally bill everything */
	if (atomic_cmpxchg(&ic->i_credits, oldval, newval) != oldval)
		goto try_again;

	*adv_credits = advertise;

	RDSV3_DPRINTF4("rdsv3_ib_send_grab_credits", "ic: %p, %d %d %d",
	    ic, got, *adv_credits, need_posted);

	return (got);
}
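
/*
 * The block below is an illustrative sketch, not part of the driver: it
 * condenses the lock-free scheme described above into its essence.  It
 * assumes the IB_*_CREDITS macros from ib.h pack the send credits and the
 * posted credits into disjoint halves of the single word ic->i_credits,
 * so that both counters can be updated with one atomic_cmpxchg; the helper
 * name and the exact packing are assumptions, kept disabled with #if 0.
 */
#if 0
static void
rdsv3_ib_credits_sketch(struct rdsv3_ib_connection *ic, uint32_t wanted)
{
	long oldval, newval;

retry:
	/* read the packed word once, compute both updates locally */
	oldval = newval = atomic_get(&ic->i_credits);
	newval -= IB_SET_SEND_CREDITS(wanted);	/* consume send credits */
	newval -= IB_SET_POST_CREDITS(1);	/* advertise one posted credit */

	/* if the receive/ACK path raced with us, re-read and retry */
	if (atomic_cmpxchg(&ic->i_credits, oldval, newval) != oldval)
		goto retry;
}
#endif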

void
rdsv3_ib_send_add_credits(struct rdsv3_connection *conn, unsigned int credits)
{
	struct rdsv3_ib_connection *ic = conn->c_transport_data;

	if (credits == 0)
		return;

	RDSV3_DPRINTF5("rdsv3_ib_send_add_credits",
	    "credits (%u): current=%u%s\n",
	    credits,
	    IB_GET_SEND_CREDITS(atomic_get(&ic->i_credits)),
	    test_bit(RDSV3_LL_SEND_FULL, &conn->c_flags) ?
	    ", ll_send_full" : "");

	atomic_add_32(&ic->i_credits, IB_SET_SEND_CREDITS(credits));
	if (test_and_clear_bit(RDSV3_LL_SEND_FULL, &conn->c_flags))
		rdsv3_queue_delayed_work(rdsv3_wq, &conn->c_send_w, 0);

	ASSERT(!(IB_GET_SEND_CREDITS(credits) >= 16384));

	rdsv3_ib_stats_inc(s_ib_rx_credit_updates);

	RDSV3_DPRINTF4("rdsv3_ib_send_add_credits",
	    "Return: conn: %p, credits: %d",
	    conn, credits);
}

void
rdsv3_ib_advertise_credits(struct rdsv3_connection *conn, unsigned int posted)
{
	struct rdsv3_ib_connection *ic = conn->c_transport_data;

	RDSV3_DPRINTF4("rdsv3_ib_advertise_credits", "conn: %p, posted: %d",
	    conn, posted);

	if (posted == 0)
		return;

	atomic_add_32(&ic->i_credits, IB_SET_POST_CREDITS(posted));

	/*
	 * Decide whether to send an update to the peer now.
	 * If we would send a credit update for every single buffer we
	 * post, we would end up with an ACK storm (ACK arrives,
	 * consumes buffer, we refill the ring, send ACK to remote
	 * advertising the newly posted buffer... ad inf)
	 *
	 * Performance pretty much depends on how often we send
	 * credit updates - too frequent updates mean lots of ACKs.
	 * Too infrequent updates, and the peer will run out of
	 * credits and have to throttle.
	 * For the time being, 16 seems to be a good compromise.
	 */
	if (IB_GET_POST_CREDITS(atomic_get(&ic->i_credits)) >= 16)
		set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
}

static inline void
rdsv3_ib_xmit_populate_wr(struct rdsv3_ib_connection *ic,
    ibt_send_wr_t *wr, unsigned int pos,
    struct rdsv3_scatterlist *scat, unsigned int off, unsigned int length,
    int send_flags)
{
	ibt_wr_ds_t *sge;

	RDSV3_DPRINTF4("rdsv3_ib_xmit_populate_wr",
	    "ic: %p, wr: %p scat: %p %d %d %d %d",
	    ic, wr, scat, pos, off, length, send_flags);

	wr->wr_id = pos | RDSV3_IB_SEND_OP;
	wr->wr_trans = IBT_RC_SRV;
	wr->wr_flags = send_flags;
	wr->wr_opcode = IBT_WRC_SEND;

	if (length != 0) {
		int	ix, len, assigned;
		ibt_wr_ds_t *sgl;

		ASSERT(length <= scat->length - off);

		sgl = scat->sgl;
		if (off != 0) {
			/* find the right sgl to begin with */
			while (sgl->ds_len <= off) {
				off -= sgl->ds_len;
				sgl++;
			}
		}

		ix = 1; /* first data sgl is at 1 */
		assigned = 0;
		len = length;
		do {
			sge = &wr->wr_sgl[ix++];
			sge->ds_va = sgl->ds_va + off;
			assigned = min(len, sgl->ds_len - off);
			sge->ds_len = assigned;
			sge->ds_key = sgl->ds_key;
			len -= assigned;
			if (len != 0) {
				sgl++;
				off = 0;
			}
		} while (len > 0);

		wr->wr_nds = ix;
	} else {
		/*
		 * We're sending a packet with no payload. There is only
		 * one SGE, for the header.
		 */
		wr->wr_nds = 1;
	}

	sge = &wr->wr_sgl[0];
	sge->ds_va = ic->i_send_hdrs_dma + (pos * sizeof (struct rdsv3_header));
	sge->ds_len = sizeof (struct rdsv3_header);
	sge->ds_key = ic->i_mr->lkey;

	RDSV3_DPRINTF4("rdsv3_ib_xmit_populate_wr",
	    "Return: ic: %p, wr: %p scat: %p", ic, wr, scat);
}

/*
 * This can be called multiple times for a given message.  The first time
 * we see a message we map its scatterlist into the IB device so that
 * we can provide that mapped address to the IB scatter gather entries
 * in the IB work requests.  We translate the scatterlist into a series
 * of work requests that fragment the message.  These work requests complete
 * in order so we pass ownership of the message to the completion handler
 * once we send the final fragment.
 *
 * The RDS core uses the c_send_lock to only enter this function once
 * per connection.  This makes sure that the tx ring alloc/unalloc pairs
 * don't get out of sync and confuse the ring.
 */
int
rdsv3_ib_xmit(struct rdsv3_connection *conn, struct rdsv3_message *rm,
    unsigned int hdr_off, unsigned int sg, unsigned int off)
{
	struct rdsv3_ib_connection *ic = conn->c_transport_data;
	struct ib_device *dev = ic->i_cm_id->device;
	struct rdsv3_ib_send_work *send = NULL;
	struct rdsv3_ib_send_work *first;
	struct rdsv3_ib_send_work *prev;
	ibt_send_wr_t *wr;
	struct rdsv3_scatterlist *scat;
	uint32_t pos;
	uint32_t i;
	uint32_t work_alloc;
	uint32_t credit_alloc;
	uint32_t posted;
	uint32_t adv_credits = 0;
	int send_flags = 0;
	int sent;
	int ret;
	int flow_controlled = 0;

	RDSV3_DPRINTF4("rdsv3_ib_xmit", "conn: %p, rm: %p", conn, rm);

	ASSERT(!(off % RDSV3_FRAG_SIZE));
	ASSERT(!(hdr_off != 0 && hdr_off != sizeof (struct rdsv3_header)));

	/* Do not send cong updates to IB loopback */
	if (conn->c_loopback &&
	    rm->m_inc.i_hdr.h_flags & RDSV3_FLAG_CONG_BITMAP) {
		rdsv3_cong_map_updated(conn->c_fcong, ~(uint64_t)0);
		return (sizeof (struct rdsv3_header) + RDSV3_CONG_MAP_BYTES);
	}

#ifndef __lock_lint
	/* FIXME we may overallocate here */
	if (ntohl(rm->m_inc.i_hdr.h_len) == 0)
		i = 1;
	else
		i = ceil(ntohl(rm->m_inc.i_hdr.h_len), RDSV3_FRAG_SIZE);
#endif

	work_alloc = rdsv3_ib_ring_alloc(&ic->i_send_ring, i, &pos);
	if (work_alloc != i) {
		rdsv3_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
		set_bit(RDSV3_LL_SEND_FULL, &conn->c_flags);
		rdsv3_ib_stats_inc(s_ib_tx_ring_full);
		ret = -ENOMEM;
		goto out;
	}

	credit_alloc = work_alloc;
	if (ic->i_flowctl) {
		credit_alloc = rdsv3_ib_send_grab_credits(ic, work_alloc,
		    &posted, 0);
		adv_credits += posted;
		if (credit_alloc < work_alloc) {
			rdsv3_ib_ring_unalloc(&ic->i_send_ring,
			    work_alloc - credit_alloc);
			work_alloc = credit_alloc;
			flow_controlled++;
		}
		if (work_alloc == 0) {
			rdsv3_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
			rdsv3_ib_stats_inc(s_ib_tx_throttle);
			ret = -ENOMEM;
			goto out;
		}
	}

	/* map the message the first time we see it */
	if (ic->i_rm == NULL) {
		/*
		 * printk(KERN_NOTICE
		 * "rdsv3_ib_xmit prep msg dport=%u flags=0x%x len=%d\n",
		 * be16_to_cpu(rm->m_inc.i_hdr.h_dport),
		 * rm->m_inc.i_hdr.h_flags,
		 * be32_to_cpu(rm->m_inc.i_hdr.h_len));
		 */
		if (rm->m_nents) {
			rm->m_count = rdsv3_ib_dma_map_sg(dev,
			    rm->m_sg, rm->m_nents);
			RDSV3_DPRINTF5("rdsv3_ib_xmit",
			    "ic %p mapping rm %p: %d\n", ic, rm, rm->m_count);
			if (rm->m_count == 0) {
				rdsv3_ib_stats_inc(s_ib_tx_sg_mapping_failure);
				rdsv3_ib_ring_unalloc(&ic->i_send_ring,
				    work_alloc);
				ret = -ENOMEM; /* XXX ? */
				RDSV3_DPRINTF2("rdsv3_ib_xmit",
				    "fail: ic %p mapping rm %p: %d\n",
				    ic, rm, rm->m_count);
				goto out;
			}
		} else {
			rm->m_count = 0;
		}

		ic->i_unsignaled_wrs = rdsv3_ib_sysctl_max_unsig_wrs;
		ic->i_unsignaled_bytes = rdsv3_ib_sysctl_max_unsig_bytes;
		rdsv3_message_addref(rm);
		ic->i_rm = rm;

		/* Finalize the header */
		if (test_bit(RDSV3_MSG_ACK_REQUIRED, &rm->m_flags))
			rm->m_inc.i_hdr.h_flags |= RDSV3_FLAG_ACK_REQUIRED;
		if (test_bit(RDSV3_MSG_RETRANSMITTED, &rm->m_flags))
			rm->m_inc.i_hdr.h_flags |= RDSV3_FLAG_RETRANSMITTED;

		/*
		 * If it has an RDMA op, tell the peer we did it. This is
		 * used by the peer to release use-once RDMA MRs.
		 */
		if (rm->m_rdma_op) {
			struct rdsv3_ext_header_rdma ext_hdr;

			ext_hdr.h_rdma_rkey = htonl(rm->m_rdma_op->r_key);
			(void) rdsv3_message_add_extension(&rm->m_inc.i_hdr,
			    RDSV3_EXTHDR_RDMA, &ext_hdr,
			    sizeof (ext_hdr));
		}
		if (rm->m_rdma_cookie) {
			(void) rdsv3_message_add_rdma_dest_extension(
			    &rm->m_inc.i_hdr,
			    rdsv3_rdma_cookie_key(rm->m_rdma_cookie),
			    rdsv3_rdma_cookie_offset(rm->m_rdma_cookie));
		}

		/*
		 * Note - rdsv3_ib_piggyb_ack clears the ACK_REQUIRED bit, so
		 * we should not do this unless we have a chance of at least
		 * sticking the header into the send ring, which is why
		 * rdsv3_ib_ring_alloc is called first.
		 */
		rm->m_inc.i_hdr.h_ack = htonll(rdsv3_ib_piggyb_ack(ic));
		rdsv3_message_make_checksum(&rm->m_inc.i_hdr);

		/*
		 * Update adv_credits since we reset the ACK_REQUIRED bit.
		 */
		(void) rdsv3_ib_send_grab_credits(ic, 0, &posted, 1);
		adv_credits += posted;
		ASSERT(adv_credits <= 255);
	}

	send = &ic->i_sends[pos];
	first = send;
	prev = NULL;
	scat = &rm->m_sg[sg];
	sent = 0;
	i = 0;

	/*
	 * Sometimes you want to put a fence between an RDMA READ and the
	 * following SEND.  We could either do this all the time or only
	 * when requested by the user.  Right now, we let the application
	 * choose.
	 */
	if (rm->m_rdma_op && rm->m_rdma_op->r_fence)
		send_flags = IBT_WR_SEND_FENCE;

	/*
	 * We could be copying the header into the unused tail of the page.
	 * That would need to be changed in the future when those pages might
	 * be mapped userspace pages or page cache pages.  So instead we always
	 * use a second sge and our long-lived ring of mapped headers.  We send
	 * the header after the data so that the data payload can be aligned on
	 * the receiver.
	 */

	/* handle a 0-len message */
	if (ntohl(rm->m_inc.i_hdr.h_len) == 0) {
		wr = &ic->i_send_wrs[0];
		rdsv3_ib_xmit_populate_wr(ic, wr, pos, NULL, 0, 0, send_flags);
		send->s_queued = jiffies;
		send->s_op = NULL;
		send->s_opcode = wr->wr_opcode;
		goto add_header;
	}

	/* if there's data reference it with a chain of work reqs */
	for (; i < work_alloc && scat != &rm->m_sg[rm->m_count]; i++) {
		unsigned int len;

		send = &ic->i_sends[pos];

		wr = &ic->i_send_wrs[i];
		len = min(RDSV3_FRAG_SIZE,
		    rdsv3_ib_sg_dma_len(dev, scat) - off);
		rdsv3_ib_xmit_populate_wr(ic, wr, pos, scat, off, len,
		    send_flags);
		send->s_queued = jiffies;
		send->s_op = NULL;
		send->s_opcode = wr->wr_opcode;

		/*
		 * We want to delay signaling completions just enough to get
		 * the batching benefits but not so much that we create dead
		 * time on the wire.
		 */
		if (ic->i_unsignaled_wrs-- == 0) {
			ic->i_unsignaled_wrs = rdsv3_ib_sysctl_max_unsig_wrs;
			wr->wr_flags |=
			    IBT_WR_SEND_SIGNAL | IBT_WR_SEND_SOLICIT;
		}

		ic->i_unsignaled_bytes -= len;
		if (ic->i_unsignaled_bytes <= 0) {
			ic->i_unsignaled_bytes =
			    rdsv3_ib_sysctl_max_unsig_bytes;
			wr->wr_flags |=
			    IBT_WR_SEND_SIGNAL | IBT_WR_SEND_SOLICIT;
		}

		/*
		 * Always signal the last one if we're stopping due to flow
		 * control.
		 */
		if (flow_controlled && i == (work_alloc-1)) {
			wr->wr_flags |=
			    IBT_WR_SEND_SIGNAL | IBT_WR_SEND_SOLICIT;
		}

		RDSV3_DPRINTF5("rdsv3_ib_xmit", "send %p wr %p num_sge %u \n",
		    send, wr, wr->wr_nds);

		sent += len;
		off += len;
		if (off == rdsv3_ib_sg_dma_len(dev, scat)) {
			scat++;
			off = 0;
		}

add_header:
		/*
		 * Tack on the header after the data. The header SGE should
		 * already have been set up to point to the right header
		 * buffer.
		 */
		(void) memcpy(&ic->i_send_hdrs[pos], &rm->m_inc.i_hdr,
		    sizeof (struct rdsv3_header));

		if (0) {
			struct rdsv3_header *hdr = &ic->i_send_hdrs[pos];

			RDSV3_DPRINTF2("rdsv3_ib_xmit",
			    "send WR dport=%u flags=0x%x len=%d",
			    ntohs(hdr->h_dport),
			    hdr->h_flags,
			    ntohl(hdr->h_len));
		}
		if (adv_credits) {
			struct rdsv3_header *hdr = &ic->i_send_hdrs[pos];

			/* add credit and redo the header checksum */
			hdr->h_credit = adv_credits;
			rdsv3_message_make_checksum(hdr);
			adv_credits = 0;
			rdsv3_ib_stats_inc(s_ib_tx_credit_updates);
		}

		prev = send;

		pos = (pos + 1) % ic->i_send_ring.w_nr;
	}

	/*
	 * Account the RDS header in the number of bytes we sent, but just
	 * once.  The caller has no concept of fragmentation.
	 */
	if (hdr_off == 0)
		sent += sizeof (struct rdsv3_header);

	/* if we finished the message then send completion owns it */
	if (scat == &rm->m_sg[rm->m_count]) {
		prev->s_rm = ic->i_rm;
		wr->wr_flags |= IBT_WR_SEND_SIGNAL | IBT_WR_SEND_SOLICIT;
		ic->i_rm = NULL;
	}

	if (i < work_alloc) {
		rdsv3_ib_ring_unalloc(&ic->i_send_ring, work_alloc - i);
		work_alloc = i;
	}
	if (ic->i_flowctl && i < credit_alloc)
		rdsv3_ib_send_add_credits(conn, credit_alloc - i);

	/* XXX need to worry about failed_wr and partial sends. */
	ret = ibt_post_send(ib_get_ibt_channel_hdl(ic->i_cm_id),
	    ic->i_send_wrs, i, &posted);
	if (posted != i) {
		RDSV3_DPRINTF2("rdsv3_ib_xmit",
		    "ic %p first %p nwr: %d ret %d:%d",
		    ic, first, i, ret, posted);
	}
	if (ret) {
		RDSV3_DPRINTF2("rdsv3_ib_xmit",
		    "RDS/IB: ib_post_send to %u.%u.%u.%u "
		    "returned %d\n", NIPQUAD(conn->c_faddr), ret);
		rdsv3_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
		if (prev->s_rm) {
			ic->i_rm = prev->s_rm;
			prev->s_rm = NULL;
		}
		RDSV3_DPRINTF2("rdsv3_ib_xmit", "ibt_post_send failed\n");
		rdsv3_conn_drop(ic->conn);
		ret = -EAGAIN;
		goto out;
	}

	ret = sent;

	RDSV3_DPRINTF4("rdsv3_ib_xmit", "Return: conn: %p, rm: %p", conn, rm);
out:
	ASSERT(!adv_credits);
	return (ret);
}
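
/*
 * Worked example for the fragmentation above (illustrative only - the real
 * RDSV3_FRAG_SIZE value comes from rdsv3.h, and a 4 KB fragment size is just
 * an assumption): a message with h_len == 10000 bytes would need
 * ceil(10000, 4096) == 3 send work requests.  Each WR carries the data SGEs
 * for one fragment plus the header SGE, and only the WR covering the final
 * fragment hands ownership of the message (prev->s_rm) to the send
 * completion handler.
 */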

static void
rdsv3_ib_dma_unmap_sg_rdma(struct ib_device *dev, uint_t num,
	struct rdsv3_rdma_sg scat[])
{
	ibt_hca_hdl_t hca_hdl;
	int i;
	int num_sgl;

	RDSV3_DPRINTF4("rdsv3_ib_dma_unmap_sg_rdma", "rdma_sg: %p", scat);

	if (dev) {
		hca_hdl = ib_get_ibt_hca_hdl(dev);
	} else {
		hca_hdl = scat[0].hca_hdl;
		RDSV3_DPRINTF2("rdsv3_ib_dma_unmap_sg_rdma",
		    "NULL dev use cached hca_hdl %p", hca_hdl);
	}

	if (hca_hdl == NULL)
		return;
	scat[0].hca_hdl = NULL;

	for (i = 0; i < num; i++) {
		if (scat[i].mihdl != NULL) {
			num_sgl = (scat[i].iovec.bytes / PAGESIZE) + 2;
			kmem_free(scat[i].swr.wr_sgl,
			    (num_sgl * sizeof (ibt_wr_ds_t)));
			scat[i].swr.wr_sgl = NULL;
			(void) ibt_unmap_mem_iov(hca_hdl, scat[i].mihdl);
			scat[i].mihdl = NULL;
		} else
			break;
	}
}

/* ARGSUSED */
uint_t
rdsv3_ib_dma_map_sg_rdma(struct ib_device *dev, struct rdsv3_rdma_sg scat[],
    uint_t num, struct rdsv3_scatterlist **scatl)
{
	ibt_hca_hdl_t hca_hdl;
	ibt_iov_attr_t iov_attr;
	struct buf *bp;
	uint_t i, j, k;
	uint_t count;
	struct rdsv3_scatterlist *sg;
	int ret;

	RDSV3_DPRINTF4("rdsv3_ib_dma_map_sg_rdma", "scat: %p, num: %d",
	    scat, num);

	hca_hdl = ib_get_ibt_hca_hdl(dev);
	scat[0].hca_hdl = hca_hdl;
	bzero(&iov_attr, sizeof (ibt_iov_attr_t));
	iov_attr.iov_flags = IBT_IOV_BUF;
	iov_attr.iov_lso_hdr_sz = 0;

	for (i = 0, count = 0; i < num; i++) {
		/* transpose umem_cookie to buf structure */
		bp = ddi_umem_iosetup(scat[i].umem_cookie,
		    scat[i].iovec.addr & PAGEOFFSET, scat[i].iovec.bytes,
		    B_WRITE, 0, 0, NULL, DDI_UMEM_SLEEP);
		if (bp == NULL) {
			/* free resources and return error */
			goto out;
		}
		/* setup ibt_map_mem_iov() attributes */
		iov_attr.iov_buf = bp;
		iov_attr.iov_wr_nds = (scat[i].iovec.bytes / PAGESIZE) + 2;
		scat[i].swr.wr_sgl =
		    kmem_zalloc(iov_attr.iov_wr_nds * sizeof (ibt_wr_ds_t),
		    KM_SLEEP);

		ret = ibt_map_mem_iov(hca_hdl, &iov_attr,
		    (ibt_all_wr_t *)&scat[i].swr, &scat[i].mihdl);
		freerbuf(bp);
		if (ret != IBT_SUCCESS) {
			RDSV3_DPRINTF2("rdsv3_ib_dma_map_sg_rdma",
			    "ibt_map_mem_iov returned: %d", ret);
			/* free resources and return error */
			kmem_free(scat[i].swr.wr_sgl,
			    iov_attr.iov_wr_nds * sizeof (ibt_wr_ds_t));
			goto out;
		}
		count += scat[i].swr.wr_nds;

#ifdef	DEBUG
		for (j = 0; j < scat[i].swr.wr_nds; j++) {
			RDSV3_DPRINTF5("rdsv3_ib_dma_map_sg_rdma",
			    "sgl[%d] va %llx len %x", j,
			    scat[i].swr.wr_sgl[j].ds_va,
			    scat[i].swr.wr_sgl[j].ds_len);
		}
#endif
		RDSV3_DPRINTF4("rdsv3_ib_dma_map_sg_rdma",
		    "iovec.bytes: 0x%x scat[%d]swr.wr_nds: %d",
		    scat[i].iovec.bytes, i, scat[i].swr.wr_nds);
	}

	count = ((count - 1) / RDSV3_IB_MAX_SGE) + 1;
	RDSV3_DPRINTF4("rdsv3_ib_dma_map_sg_rdma", "Ret: num: %d", count);
	return (count);

out:
	rdsv3_ib_dma_unmap_sg_rdma(dev, num, scat);
	return (0);
}

int
rdsv3_ib_xmit_rdma(struct rdsv3_connection *conn, struct rdsv3_rdma_op *op)
{
	struct rdsv3_ib_connection *ic = conn->c_transport_data;
	struct rdsv3_ib_send_work *send = NULL;
	struct rdsv3_rdma_sg *scat;
	uint64_t remote_addr;
	uint32_t pos;
	uint32_t work_alloc;
	uint32_t i, j, k, idx;
	uint32_t left, count;
	uint32_t posted;
	int sent;
	ibt_status_t status;
	ibt_send_wr_t *wr;
	ibt_wr_ds_t *sge;

	RDSV3_DPRINTF4("rdsv3_ib_xmit_rdma", "rdsv3_ib_conn: %p", ic);

	/* map the message the first time we see it */
	if (!op->r_mapped) {
		op->r_count = rdsv3_ib_dma_map_sg_rdma(ic->i_cm_id->device,
		    op->r_rdma_sg, op->r_nents, &op->r_sg);
		RDSV3_DPRINTF5("rdsv3_ib_xmit_rdma", "ic %p mapping op %p: %d",
		    ic, op, op->r_count);
		if (op->r_count == 0) {
			rdsv3_ib_stats_inc(s_ib_tx_sg_mapping_failure);
			RDSV3_DPRINTF2("rdsv3_ib_xmit_rdma",
			    "fail: ic %p mapping op %p: %d",
			    ic, op, op->r_count);
			return (-ENOMEM); /* XXX ? */
		}
		op->r_mapped = 1;
	}

	/*
	 * Instead of knowing how to return a partial rdma read/write, we
	 * insist that there be enough work requests to send the entire
	 * message.
	 */
	work_alloc = rdsv3_ib_ring_alloc(&ic->i_send_ring, op->r_count, &pos);
	if (work_alloc != op->r_count) {
		rdsv3_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
		rdsv3_ib_stats_inc(s_ib_tx_ring_full);
		return (-ENOMEM);
	}

	RDSV3_DPRINTF4("rdsv3_ib_xmit_rdma", "pos %u cnt %u", pos, op->r_count);
	/*
	 * Take the scatter list and transpose it into a list of send WRs,
	 * each with a scatter list of at most RDSV3_IB_MAX_SGE entries.
	 */
	scat = &op->r_rdma_sg[0];
	sent = 0;
	remote_addr = op->r_remote_addr;

	for (i = 0, k = 0; i < op->r_nents; i++) {
		left = scat[i].swr.wr_nds;
		for (idx = 0; left > 0; k++) {
			send = &ic->i_sends[pos];
			send->s_queued = jiffies;
			send->s_opcode = op->r_write ? IBT_WRC_RDMAW :
			    IBT_WRC_RDMAR;
			send->s_op = op;

			wr = &ic->i_send_wrs[k];
			wr->wr_flags = 0;
			wr->wr_id = pos | RDSV3_IB_SEND_OP;
			wr->wr_trans = IBT_RC_SRV;
			wr->wr_opcode = op->r_write ? IBT_WRC_RDMAW :
			    IBT_WRC_RDMAR;
			wr->wr.rc.rcwr.rdma.rdma_raddr = remote_addr;
			wr->wr.rc.rcwr.rdma.rdma_rkey = op->r_key;

			if (left > RDSV3_IB_MAX_SGE) {
				count = RDSV3_IB_MAX_SGE;
				left -= RDSV3_IB_MAX_SGE;
			} else {
				count = left;
				left = 0;
			}
			wr->wr_nds = count;

			for (j = 0; j < count; j++) {
				sge = &wr->wr_sgl[j];
				*sge = scat[i].swr.wr_sgl[idx];
				remote_addr += scat[i].swr.wr_sgl[idx].ds_len;
				sent += scat[i].swr.wr_sgl[idx].ds_len;
				idx++;
				RDSV3_DPRINTF5("xmit_rdma",
				    "send_wrs[%d]sgl[%d] va %llx len %x",
				    k, j, sge->ds_va, sge->ds_len);
			}
			RDSV3_DPRINTF5("rdsv3_ib_xmit_rdma",
			    "wr[%d] %p key: %x code: %d tlen: %d",
			    k, wr, wr->wr.rc.rcwr.rdma.rdma_rkey,
			    wr->wr_opcode, sent);

			/*
			 * We want to delay signaling completions just enough
			 * to get the batching benefits but not so much that
			 * we create dead time on the wire.
			 */
			if (ic->i_unsignaled_wrs-- == 0) {
				ic->i_unsignaled_wrs =
				    rdsv3_ib_sysctl_max_unsig_wrs;
				wr->wr_flags = IBT_WR_SEND_SIGNAL;
			}

			pos = (pos + 1) % ic->i_send_ring.w_nr;
		}
	}

	status = ibt_post_send(ib_get_ibt_channel_hdl(ic->i_cm_id),
	    ic->i_send_wrs, k, &posted);
	if (status != IBT_SUCCESS) {
		RDSV3_DPRINTF2("rdsv3_ib_xmit_rdma",
		    "RDS/IB: rdma ib_post_send to %u.%u.%u.%u "
		    "returned %d", NIPQUAD(conn->c_faddr), status);
		rdsv3_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
	}
	RDSV3_DPRINTF4("rdsv3_ib_xmit_rdma", "Ret: %p", ic);
	return (status);
}
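
/*
 * Worked example for the WR chunking above (illustrative only - the real
 * RDSV3_IB_MAX_SGE value is defined in ib.h, and 8 is just an assumption):
 * if one scatter element mapped to 10 ibt_wr_ds_t entries, the inner loop
 * would emit two RDMA work requests carrying 8 and 2 SGEs respectively,
 * with remote_addr advanced by the length of every SGE copied so that each
 * WR targets the right offset in the remote buffer.
 */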

void
rdsv3_ib_xmit_complete(struct rdsv3_connection *conn)
{
	struct rdsv3_ib_connection *ic = conn->c_transport_data;

	RDSV3_DPRINTF4("rdsv3_ib_xmit_complete", "conn: %p", conn);

	/*
	 * We may have a pending ACK or window update we were unable
	 * to send previously (due to flow control). Try again.
	 */
	rdsv3_ib_attempt_ack(ic);
}