sdp_tx.c revision 331769
/*
 * Copyright (c) 2009 Mellanox Technologies Ltd.  All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * OpenIB.org BSD license below:
 *
 *     Redistribution and use in source and binary forms, with or
 *     without modification, are permitted provided that the following
 *     conditions are met:
 *
 *      - Redistributions of source code must retain the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer.
 *
 *      - Redistributions in binary form must reproduce the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer in the documentation and/or other materials
 *        provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */
#include "sdp.h"

#define sdp_cnt(var) do { (var)++; } while (0)

SDP_MODPARAM_SINT(sdp_keepalive_probes_sent, 0,
		"Total number of keepalive probes sent.");

static int sdp_process_tx_cq(struct sdp_sock *ssk);
static void sdp_poll_tx_timeout(void *data);

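/*
 * Poll the TX completion queue for this socket.  A one-shot timer is armed
 * to catch completions of recent posts if the interface goes idle; the CQ
 * itself is only polled every SDP_TX_POLL_MODER calls unless 'force' is set.
 */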
int
sdp_xmit_poll(struct sdp_sock *ssk, int force)
{
	int wc_processed = 0;

	SDP_WLOCK_ASSERT(ssk);
	sdp_prf(ssk->socket, NULL, "%s", __func__);

	/*
	 * If we don't have a pending timer, set one up to catch our recent
	 * post in case the interface becomes idle.
	 */
	if (!callout_pending(&ssk->tx_ring.timer))
		callout_reset(&ssk->tx_ring.timer, SDP_TX_POLL_TIMEOUT,
		    sdp_poll_tx_timeout, ssk);

	/* Poll the CQ every SDP_TX_POLL_MODER packets */
	if (force || (++ssk->tx_ring.poll_cnt & (SDP_TX_POLL_MODER - 1)) == 0)
		wc_processed = sdp_process_tx_cq(ssk);

	return wc_processed;
}

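/*
 * Fill in the BSDH header of an mbuf, DMA-map each mbuf in the chain into a
 * scatter/gather list and post it on the send queue.  On failure the
 * connection is reset and the mbuf is freed.
 */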
void
sdp_post_send(struct sdp_sock *ssk, struct mbuf *mb)
{
	struct sdp_buf *tx_req;
	struct sdp_bsdh *h;
	unsigned long mseq;
	struct ib_device *dev;
	struct ib_send_wr *bad_wr;
	struct ib_sge ibsge[SDP_MAX_SEND_SGES];
	struct ib_sge *sge;
	struct ib_send_wr tx_wr = { NULL };
	int i, rc;
	u64 addr;

	if (!ssk->qp_active) {
		m_freem(mb);
		return;
	}

	mseq = ring_head(ssk->tx_ring);
	h = mtod(mb, struct sdp_bsdh *);
	ssk->tx_packets++;
	ssk->tx_bytes += mb->m_pkthdr.len;

	SDPSTATS_COUNTER_MID_INC(post_send, h->mid);
	SDPSTATS_HIST(send_size, mb->m_pkthdr.len);

#ifdef SDP_ZCOPY
	if (unlikely(h->mid == SDP_MID_SRCAVAIL)) {
		struct tx_srcavail_state *tx_sa = TX_SRCAVAIL_STATE(mb);
		if (ssk->tx_sa != tx_sa) {
			sdp_dbg_data(ssk->socket, "SrcAvail cancelled "
					"before being sent!\n");
			WARN_ON(1);
			m_freem(mb);
			return;
		}
		TX_SRCAVAIL_STATE(mb)->mseq = mseq;
	}
#endif

	if (unlikely(mb->m_flags & M_URG))
		h->flags = SDP_OOB_PRES | SDP_OOB_PEND;
	else
		h->flags = 0;

	mb->m_flags |= M_RDONLY; /* Don't allow compression once sent. */
	h->bufs = htons(rx_ring_posted(ssk));
	h->len = htonl(mb->m_pkthdr.len);
	h->mseq = htonl(mseq);
	h->mseq_ack = htonl(mseq_ack(ssk));

	sdp_prf1(ssk->socket, mb, "TX: %s bufs: %d mseq:%ld ack:%d",
			mid2str(h->mid), rx_ring_posted(ssk), mseq,
			ntohl(h->mseq_ack));

	SDP_DUMP_PACKET(ssk->socket, "TX", mb, h);

	tx_req = &ssk->tx_ring.buffer[mseq & (SDP_TX_SIZE - 1)];
	tx_req->mb = mb;
	dev = ssk->ib_device;
	sge = &ibsge[0];
	for (i = 0; mb != NULL; i++, mb = mb->m_next, sge++) {
		addr = ib_dma_map_single(dev, mb->m_data, mb->m_len,
		    DMA_TO_DEVICE);
		/* TODO: proper error handling */
		BUG_ON(ib_dma_mapping_error(dev, addr));
		BUG_ON(i >= SDP_MAX_SEND_SGES);
		tx_req->mapping[i] = addr;
		sge->addr = addr;
		sge->length = mb->m_len;
		sge->lkey = ssk->sdp_dev->pd->local_dma_lkey;
	}
	tx_wr.next = NULL;
	tx_wr.wr_id = mseq | SDP_OP_SEND;
	tx_wr.sg_list = ibsge;
	tx_wr.num_sge = i;
	tx_wr.opcode = IB_WR_SEND;
	tx_wr.send_flags = IB_SEND_SIGNALED;
	if (unlikely(tx_req->mb->m_flags & M_URG))
		tx_wr.send_flags |= IB_SEND_SOLICITED;

	rc = ib_post_send(ssk->qp, &tx_wr, &bad_wr);
	if (unlikely(rc)) {
		sdp_dbg(ssk->socket,
				"ib_post_send failed with status %d.\n", rc);

		sdp_cleanup_sdp_buf(ssk, tx_req, DMA_TO_DEVICE);

		sdp_notify(ssk, ECONNRESET);
		m_freem(tx_req->mb);
		return;
	}

	atomic_inc(&ssk->tx_ring.head);
	atomic_dec(&ssk->tx_ring.credits);
	atomic_set(&ssk->remote_credits, rx_ring_posted(ssk));

	return;
}

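/*
 * Reclaim the mbuf for a completed send.  The completion id must match the
 * current ring tail; the DMA mappings recorded in sdp_post_send() are torn
 * down here and the ring tail is advanced.
 */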
static struct mbuf *
sdp_send_completion(struct sdp_sock *ssk, int mseq)
{
	struct ib_device *dev;
	struct sdp_buf *tx_req;
	struct mbuf *mb = NULL;
	struct sdp_tx_ring *tx_ring = &ssk->tx_ring;

	if (unlikely(mseq != ring_tail(*tx_ring))) {
		printk(KERN_WARNING "Bogus send completion id %d tail %d\n",
			mseq, ring_tail(*tx_ring));
		goto out;
	}

	dev = ssk->ib_device;
	tx_req = &tx_ring->buffer[mseq & (SDP_TX_SIZE - 1)];
	mb = tx_req->mb;
	sdp_cleanup_sdp_buf(ssk, tx_req, DMA_TO_DEVICE);

#ifdef SDP_ZCOPY
	/* TODO: AIO and real zcopy code; add their context support here */
	if (BZCOPY_STATE(mb))
		BZCOPY_STATE(mb)->busy--;
#endif

	atomic_inc(&tx_ring->tail);

out:
	return mb;
}

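/*
 * Handle a single send completion: report errors (anything other than a
 * flush during teardown resets the connection), then release the mbuf.
 */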
static int
sdp_handle_send_comp(struct sdp_sock *ssk, struct ib_wc *wc)
{
	struct mbuf *mb = NULL;
	struct sdp_bsdh *h;

	if (unlikely(wc->status)) {
		if (wc->status != IB_WC_WR_FLUSH_ERR) {
			sdp_prf(ssk->socket, mb, "Send completion with error. "
				"Status %d", wc->status);
			sdp_dbg_data(ssk->socket, "Send completion with error. "
				"Status %d\n", wc->status);
			sdp_notify(ssk, ECONNRESET);
		}
	}

	mb = sdp_send_completion(ssk, wc->wr_id);
	if (unlikely(!mb))
		return -1;

	h = mtod(mb, struct sdp_bsdh *);
	sdp_prf1(ssk->socket, mb, "tx completion. mseq:%d", ntohl(h->mseq));
	sdp_dbg(ssk->socket, "tx completion. %p %d mseq:%d",
	    mb, mb->m_pkthdr.len, ntohl(h->mseq));
	m_freem(mb);

	return 0;
}

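/*
 * Dispatch one TX work completion: data sends, zero-copy RDMA reads
 * (SDP_ZCOPY) and keepalive probes are told apart by the wr_id flag bits.
 */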
static inline void
sdp_process_tx_wc(struct sdp_sock *ssk, struct ib_wc *wc)
{

	if (likely(wc->wr_id & SDP_OP_SEND)) {
		sdp_handle_send_comp(ssk, wc);
		return;
	}

#ifdef SDP_ZCOPY
	if (wc->wr_id & SDP_OP_RDMA) {
		/* TODO: handle failed RDMA read cqe */

		sdp_dbg_data(ssk->socket,
		    "TX comp: RDMA read. status: %d\n", wc->status);
		sdp_prf1(ssk->socket, NULL, "TX comp: RDMA read");

		if (!ssk->tx_ring.rdma_inflight) {
			sdp_warn(ssk->socket, "ERROR: unexpected RDMA read\n");
			return;
		}

		if (!ssk->tx_ring.rdma_inflight->busy) {
			sdp_warn(ssk->socket,
			    "ERROR: too many RDMA read completions\n");
			return;
		}

		/*
		 * Only the last RDMA read WR is signalled.  Ordering is
		 * guaranteed, so once the last RDMA read WR has completed,
		 * all of the others have completed too.
		 */
		ssk->tx_ring.rdma_inflight->busy = 0;
		sowwakeup(ssk->socket);
		sdp_dbg_data(ssk->socket, "woke up sleepers\n");
		return;
	}
#endif

	/* Keepalive probe sent cleanup */
	sdp_cnt(sdp_keepalive_probes_sent);

	if (likely(!wc->status))
		return;

	sdp_dbg(ssk->socket, " %s consumes KEEPALIVE status %d\n",
			__func__, wc->status);

	if (wc->status == IB_WC_WR_FLUSH_ERR)
		return;

	sdp_notify(ssk, ECONNRESET);
}

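/*
 * Drain the TX completion queue under the socket lock, then try to post any
 * sends that were waiting for credits and wake up blocked writers.
 */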
static int
sdp_process_tx_cq(struct sdp_sock *ssk)
{
	struct ib_wc ibwc[SDP_NUM_WC];
	int n, i;
	int wc_processed = 0;

	SDP_WLOCK_ASSERT(ssk);

	if (!ssk->tx_ring.cq) {
		sdp_dbg(ssk->socket, "tx irq on destroyed tx_cq\n");
		return 0;
	}

	do {
		n = ib_poll_cq(ssk->tx_ring.cq, SDP_NUM_WC, ibwc);
		for (i = 0; i < n; ++i) {
			sdp_process_tx_wc(ssk, ibwc + i);
			wc_processed++;
		}
	} while (n == SDP_NUM_WC);

	if (wc_processed) {
		sdp_post_sends(ssk, M_NOWAIT);
		sdp_prf1(ssk->socket, NULL, "Waking sendmsg. inflight=%d",
				(u32) tx_ring_posted(ssk));
		sowwakeup(ssk->socket);
	}

	return wc_processed;
}

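/*
 * Timer-driven TX poll: harvest completions and, if work requests are still
 * in flight, re-arm the timer so their completions are not lost.
 */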
static void
sdp_poll_tx(struct sdp_sock *ssk)
{
	struct socket *sk = ssk->socket;
	u32 inflight, wc_processed;

	sdp_prf1(ssk->socket, NULL, "TX timeout: inflight=%d, head=%d tail=%d",
		(u32) tx_ring_posted(ssk),
		ring_head(ssk->tx_ring), ring_tail(ssk->tx_ring));

	if (unlikely(ssk->state == TCPS_CLOSED)) {
		sdp_warn(sk, "Socket is closed\n");
		goto out;
	}

	wc_processed = sdp_process_tx_cq(ssk);
	if (!wc_processed)
		SDPSTATS_COUNTER_INC(tx_poll_miss);
	else
		SDPSTATS_COUNTER_INC(tx_poll_hit);

	inflight = (u32) tx_ring_posted(ssk);
	sdp_prf1(ssk->socket, NULL, "finished tx processing. inflight = %d",
	    inflight);

	/*
	 * If there are still packets in flight and the timer has not already
	 * been scheduled by the Tx routine then schedule it here to guarantee
	 * completion processing of these packets.
	 */
	if (inflight)
		callout_reset(&ssk->tx_ring.timer, SDP_TX_POLL_TIMEOUT,
		    sdp_poll_tx_timeout, ssk);
out:
#ifdef SDP_ZCOPY
	if (ssk->tx_ring.rdma_inflight && ssk->tx_ring.rdma_inflight->busy) {
		sdp_prf1(sk, NULL, "RDMA is inflight - arming irq");
		sdp_arm_tx_cq(ssk);
	}
#endif
	return;
}

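/*
 * Callout handler for the TX poll timer; the socket lock is already held
 * because the callout was initialized with callout_init_rw() on ssk->lock.
 */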
static void
sdp_poll_tx_timeout(void *data)
{
	struct sdp_sock *ssk = (struct sdp_sock *)data;

	if (!callout_active(&ssk->tx_ring.timer))
		return;
	callout_deactivate(&ssk->tx_ring.timer);
	sdp_poll_tx(ssk);
}

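/*
 * TX completion interrupt handler: take the socket lock and poll the CQ.
 */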
static void
sdp_tx_irq(struct ib_cq *cq, void *cq_context)
{
	struct sdp_sock *ssk;

	ssk = cq_context;
	sdp_prf1(ssk->socket, NULL, "tx irq");
	sdp_dbg_data(ssk->socket, "Got tx comp interrupt\n");
	SDPSTATS_COUNTER_INC(tx_int_count);
	SDP_WLOCK(ssk);
	sdp_poll_tx(ssk);
	SDP_WUNLOCK(ssk);
}

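/*
 * Free every mbuf still posted on the TX ring; used when tearing the ring
 * down.
 */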
static void
sdp_tx_ring_purge(struct sdp_sock *ssk)
{
	while (tx_ring_posted(ssk)) {
		struct mbuf *mb;

		mb = sdp_send_completion(ssk, ring_tail(ssk->tx_ring));
		if (!mb)
			break;
		m_freem(mb);
	}
}

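/*
 * Post a keepalive probe as a zero-length RDMA write work request; it
 * carries no payload and should not consume a receive buffer on the peer,
 * but still exercises the connection.
 */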
void
sdp_post_keepalive(struct sdp_sock *ssk)
{
	int rc;
	struct ib_send_wr wr, *bad_wr;

	sdp_dbg(ssk->socket, "%s\n", __func__);

	memset(&wr, 0, sizeof(wr));

	wr.next    = NULL;
	wr.wr_id   = 0;
	wr.sg_list = NULL;
	wr.num_sge = 0;
	wr.opcode  = IB_WR_RDMA_WRITE;

	rc = ib_post_send(ssk->qp, &wr, &bad_wr);
	if (rc) {
		sdp_dbg(ssk->socket,
			"ib_post_keepalive failed with status %d.\n", rc);
		sdp_notify(ssk, ECONNRESET);
	}

	sdp_cnt(sdp_keepalive_probes_sent);
}

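/* Nothing to do on asynchronous TX CQ events. */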
static void
sdp_tx_cq_event_handler(struct ib_event *event, void *data)
{
}

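/*
 * Allocate the TX ring buffer, set up the poll and nagle timers on the
 * socket lock, create the TX completion queue and arm it.
 */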
int
sdp_tx_ring_create(struct sdp_sock *ssk, struct ib_device *device)
{
	struct ib_cq_init_attr tx_cq_attr = {
		.cqe = SDP_TX_SIZE,
		.comp_vector = 0,
		.flags = 0,
	};
	struct ib_cq *tx_cq;
	int rc = 0;

	sdp_dbg(ssk->socket, "tx ring create\n");
	callout_init_rw(&ssk->tx_ring.timer, &ssk->lock, 0);
	callout_init_rw(&ssk->nagle_timer, &ssk->lock, 0);
	atomic_set(&ssk->tx_ring.head, 1);
	atomic_set(&ssk->tx_ring.tail, 1);

	ssk->tx_ring.buffer = malloc(sizeof(*ssk->tx_ring.buffer) * SDP_TX_SIZE,
	    M_SDP, M_WAITOK);

	tx_cq = ib_create_cq(device, sdp_tx_irq, sdp_tx_cq_event_handler,
			  ssk, &tx_cq_attr);
	if (IS_ERR(tx_cq)) {
		rc = PTR_ERR(tx_cq);
		sdp_warn(ssk->socket, "Unable to allocate TX CQ: %d.\n", rc);
		goto err_cq;
	}
	ssk->tx_ring.cq = tx_cq;
	ssk->tx_ring.poll_cnt = 0;
	sdp_arm_tx_cq(ssk);

	return 0;

err_cq:
	free(ssk->tx_ring.buffer, M_SDP);
	ssk->tx_ring.buffer = NULL;
	return rc;
}

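/*
 * Stop and drain the timers, purge any mbufs still on the ring, free the
 * ring buffer and destroy the completion queue.
 */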
void
sdp_tx_ring_destroy(struct sdp_sock *ssk)
{

	sdp_dbg(ssk->socket, "tx ring destroy\n");
	SDP_WLOCK(ssk);
	callout_stop(&ssk->tx_ring.timer);
	callout_stop(&ssk->nagle_timer);
	SDP_WUNLOCK(ssk);
	callout_drain(&ssk->tx_ring.timer);
	callout_drain(&ssk->nagle_timer);

	if (ssk->tx_ring.buffer) {
		sdp_tx_ring_purge(ssk);
		free(ssk->tx_ring.buffer, M_SDP);
		ssk->tx_ring.buffer = NULL;
	}

	if (ssk->tx_ring.cq) {
		if (ib_destroy_cq(ssk->tx_ring.cq)) {
			sdp_warn(ssk->socket, "destroy cq(%p) failed\n",
					ssk->tx_ring.cq);
		} else {
			ssk->tx_ring.cq = NULL;
		}
	}

	WARN_ON(ring_head(ssk->tx_ring) != ring_tail(ssk->tx_ring));
}