/*-
 * SPDX-License-Identifier: BSD-2-Clause OR GPL-2.0
 *
 * Copyright (c) 2009 Mellanox Technologies Ltd.  All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * OpenIB.org BSD license below:
 *
 *     Redistribution and use in source and binary forms, with or
 *     without modification, are permitted provided that the following
 *     conditions are met:
 *
 *      - Redistributions of source code must retain the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer.
 *
 *      - Redistributions in binary form must reproduce the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer in the documentation and/or other materials
 *        provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */
#include "sdp.h"

SDP_MODPARAM_INT(rcvbuf_initial_size, 32 * 1024,
		"Receive buffer initial size in bytes.");
SDP_MODPARAM_SINT(rcvbuf_scale, 0x8,
		"Receive buffer size scale factor.");

/* Like tcp_fin - called when SDP_MID_DISCONNECT is received */
static void
sdp_handle_disconn(struct sdp_sock *ssk)
{

	sdp_dbg(ssk->socket, "%s\n", __func__);

	SDP_WLOCK_ASSERT(ssk);
	if (TCPS_HAVERCVDFIN(ssk->state) == 0)
		socantrcvmore(ssk->socket);

	switch (ssk->state) {
	case TCPS_SYN_RECEIVED:
	case TCPS_ESTABLISHED:
		ssk->state = TCPS_CLOSE_WAIT;
		break;

	case TCPS_FIN_WAIT_1:
		/* Received a reply FIN - start InfiniBand teardown */
		sdp_dbg(ssk->socket,
		    "%s: Starting InfiniBand teardown, sending DREQ\n",
		    __func__);

		sdp_cancel_dreq_wait_timeout(ssk);
		ssk->qp_active = 0;
		if (ssk->id) {
			struct rdma_cm_id *id;

			id = ssk->id;
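			/*
			 * Drop the socket lock across rdma_disconnect();
			 * the CM event callbacks it can trigger take this
			 * same lock.
			 */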
			SDP_WUNLOCK(ssk);
			rdma_disconnect(id);
			SDP_WLOCK(ssk);
		} else {
			sdp_warn(ssk->socket,
			    "%s: ssk->id is NULL\n", __func__);
			return;
		}
		break;
	case TCPS_TIME_WAIT:
		/*
		 * This is a mutual close situation and we've got the DREQ
		 * from the peer before the SDP_MID_DISCONNECT.
		 */
		break;
	case TCPS_CLOSED:
		/* FIN arrived after IB teardown started - do nothing */
		sdp_dbg(ssk->socket, "%s: fin in state %s\n",
		    __func__, sdp_state_str(ssk->state));
		return;
	default:
		sdp_warn(ssk->socket,
		    "%s: FIN in unexpected state. state=%d\n",
		    __func__, ssk->state);
		break;
	}
}

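/*
 * Allocate an mbuf chain large enough for one SDP message (BSDH header
 * plus payload), DMA-map each mbuf into its own scatter/gather entry and
 * post the chain on the receive queue.  The ring slot is selected by
 * masking the head index, which assumes SDP_RX_SIZE is a power of two.
 */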
static int
sdp_post_recv(struct sdp_sock *ssk)
{
	struct sdp_buf *rx_req;
	int i, rc;
	u64 addr;
	struct ib_device *dev;
	struct ib_recv_wr rx_wr = { NULL };
	struct ib_sge ibsge[SDP_MAX_RECV_SGES];
	struct ib_sge *sge = ibsge;
	struct ib_recv_wr *bad_wr;
	struct mbuf *mb, *m;
	struct sdp_bsdh *h;
	int id = ring_head(ssk->rx_ring);

	/* Now, allocate and repost recv */
	mb = m_getm2(NULL, ssk->recv_bytes, M_NOWAIT, MT_DATA, M_PKTHDR);
	if (mb == NULL) {
		/* Schedule a retry so we don't stall out with no memory. */
		if (!rx_ring_posted(ssk))
			queue_work(rx_comp_wq, &ssk->rx_comp_work);
		return -1;
	}
	sdp_prf(ssk->socket, mb, "Posting mb");
	for (m = mb; m != NULL; m = m->m_next) {
		m->m_len = M_SIZE(m);
		mb->m_pkthdr.len += m->m_len;
	}
	h = mtod(mb, struct sdp_bsdh *);
	rx_req = ssk->rx_ring.buffer + (id & (SDP_RX_SIZE - 1));
	rx_req->mb = mb;
	dev = ssk->ib_device;
	for (i = 0; mb != NULL; i++, mb = mb->m_next, sge++) {
		/* Receive buffers are mapped for device-to-host DMA. */
		addr = ib_dma_map_single(dev, mb->m_data, mb->m_len,
		    DMA_FROM_DEVICE);
		/* TODO: proper error handling */
		BUG_ON(ib_dma_mapping_error(dev, addr));
		BUG_ON(i >= SDP_MAX_RECV_SGES);
		rx_req->mapping[i] = addr;
		sge->addr = addr;
		sge->length = mb->m_len;
		sge->lkey = ssk->sdp_dev->pd->local_dma_lkey;
	}

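	/*
	 * Tag the work request id with SDP_OP_RECV so completion handlers
	 * can tell receive completions apart from send completions.
	 */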
	rx_wr.next = NULL;
	rx_wr.wr_id = id | SDP_OP_RECV;
	rx_wr.sg_list = ibsge;
	rx_wr.num_sge = i;
	rc = ib_post_recv(ssk->qp, &rx_wr, &bad_wr);
	if (unlikely(rc)) {
		sdp_warn(ssk->socket, "ib_post_recv failed. status %d\n", rc);

		sdp_cleanup_sdp_buf(ssk, rx_req, DMA_FROM_DEVICE);
		/* The mapping loop advanced mb to NULL; free via rx_req. */
		m_freem(rx_req->mb);
		rx_req->mb = NULL;

		sdp_notify(ssk, ECONNRESET);

		return -1;
	}

	atomic_inc(&ssk->rx_ring.head);
	SDPSTATS_COUNTER_INC(post_recv);

	return 0;
}

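/*
 * Decide whether another receive buffer should be posted.  Keep at least
 * SDP_MIN_TX_CREDITS buffers posted so the peer does not run out of
 * credits, but stop once the bytes tied up in posted buffers and in the
 * socket receive buffer exceed rcvbuf_scale times the larger of the
 * socket high-water mark and the minimum posting.
 */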
static inline int
sdp_post_recvs_needed(struct sdp_sock *ssk)
{
	unsigned long bytes_in_process;
	unsigned long max_bytes;
	int buffer_size;
	int posted;

	if (!ssk->qp_active || !ssk->socket)
		return 0;

	posted = rx_ring_posted(ssk);
	if (posted >= SDP_RX_SIZE)
		return 0;
	if (posted < SDP_MIN_TX_CREDITS)
		return 1;

	buffer_size = ssk->recv_bytes;
	max_bytes = max(ssk->socket->so_rcv.sb_hiwat,
	    (1 + SDP_MIN_TX_CREDITS) * buffer_size);
	max_bytes *= rcvbuf_scale;
	/*
	 * Compute bytes in the receive queue and socket buffer.
	 */
	bytes_in_process = (posted - SDP_MIN_TX_CREDITS) * buffer_size;
	bytes_in_process += sbused(&ssk->socket->so_rcv);

	return bytes_in_process < max_bytes;
}

static inline void
sdp_post_recvs(struct sdp_sock *ssk)
{

	while (sdp_post_recvs_needed(ssk))
		if (sdp_post_recv(ssk))
			return;
}

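/*
 * Queue a received data mbuf on the socket.  The SDP header is trimmed
 * from the front of the chain and the payload is appended to the socket
 * receive buffer, waking any sleeping readers.
 */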
static inline struct mbuf *
sdp_sock_queue_rcv_mb(struct socket *sk, struct mbuf *mb)
{
	struct sdp_sock *ssk = sdp_sk(sk);
	struct sdp_bsdh *h;

	h = mtod(mb, struct sdp_bsdh *);

#ifdef SDP_ZCOPY
	SDP_SKB_CB(mb)->seq = rcv_nxt(ssk);
	if (h->mid == SDP_MID_SRCAVAIL) {
		struct sdp_srcah *srcah = (struct sdp_srcah *)(h+1);
		struct rx_srcavail_state *rx_sa;
		u32 mb_len;

		ssk->srcavail_cancel_mseq = 0;

		ssk->rx_sa = rx_sa = RX_SRCAVAIL_STATE(mb) = kzalloc(
				sizeof(struct rx_srcavail_state), M_NOWAIT);
		if (rx_sa == NULL) {
			/* TODO: handle allocation failure gracefully. */
			sdp_warn(sk, "SrcAvail state allocation failed\n");
			m_freem(mb);
			return NULL;
		}

		rx_sa->mseq = ntohl(h->mseq);
		rx_sa->used = 0;
		rx_sa->len = mb_len = ntohl(srcah->len);
		rx_sa->rkey = ntohl(srcah->rkey);
		rx_sa->vaddr = be64_to_cpu(srcah->vaddr);
		rx_sa->flags = 0;

		if (ssk->tx_sa) {
			sdp_dbg_data(ssk->socket, "got RX SrcAvail while waiting "
					"for TX SrcAvail. waking up TX SrcAvail "
					"to be aborted\n");
			wake_up(sk->sk_sleep);
		}

		atomic_add(mb->m_pkthdr.len, &ssk->rcv_nxt);
		sdp_dbg_data(sk, "queueing SrcAvail. mb_len = %d vaddr = %lld\n",
			mb_len, rx_sa->vaddr);
	} else
#endif
	{
		atomic_add(mb->m_pkthdr.len, &ssk->rcv_nxt);
	}

	m_adj(mb, SDP_HEAD_SIZE);
	SOCKBUF_LOCK(&sk->so_rcv);
	if (unlikely(h->flags & SDP_OOB_PRES))
		sdp_urg(ssk, mb);
	sbappend_locked(&sk->so_rcv, mb, 0);
	sorwakeup_locked(sk);
	return mb;
}

static int
sdp_get_recv_bytes(struct sdp_sock *ssk, u32 new_size)
{

	return MIN(new_size, SDP_MAX_PACKET);
}

int
sdp_init_buffers(struct sdp_sock *ssk, u32 new_size)
{

	ssk->recv_bytes = sdp_get_recv_bytes(ssk, new_size);
	sdp_post_recvs(ssk);

	return 0;
}

int
sdp_resize_buffers(struct sdp_sock *ssk, u32 new_size)
{
	u32 curr_size = ssk->recv_bytes;
	u32 max_size = SDP_MAX_PACKET;

	if (new_size > curr_size && new_size <= max_size) {
		ssk->recv_bytes = sdp_get_recv_bytes(ssk, new_size);
		return 0;
	}
	return -1;
}

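/*
 * Handle a peer request to switch to a larger receive buffer (CHRCVBUF).
 * On success the new size applies to buffers posted past the current ring
 * head; ssk->recv_request flags the send path to reply with CHRCVBUF_ACK.
 */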
static void
sdp_handle_resize_request(struct sdp_sock *ssk, struct sdp_chrecvbuf *buf)
{
	if (sdp_resize_buffers(ssk, ntohl(buf->size)) == 0)
		ssk->recv_request_head = ring_head(ssk->rx_ring) + 1;
	else
		ssk->recv_request_head = ring_tail(ssk->rx_ring);
	ssk->recv_request = 1;
}

static void
sdp_handle_resize_ack(struct sdp_sock *ssk, struct sdp_chrecvbuf *buf)
{
	u32 new_size = ntohl(buf->size);

	if (new_size > ssk->xmit_size_goal)
		ssk->xmit_size_goal = new_size;
}

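/*
 * Reclaim the mbuf chain posted for the completed receive at ring slot
 * `id'.  Completions must arrive in ring order; each one consumes one of
 * the credits previously advertised to the peer.
 */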
static struct mbuf *
sdp_recv_completion(struct sdp_sock *ssk, int id)
{
	struct sdp_buf *rx_req;
	struct ib_device *dev;
	struct mbuf *mb;

	if (unlikely(id != ring_tail(ssk->rx_ring))) {
		printk(KERN_WARNING "Bogus recv completion id %d tail %d\n",
			id, ring_tail(ssk->rx_ring));
		return NULL;
	}

	dev = ssk->ib_device;
	rx_req = &ssk->rx_ring.buffer[id & (SDP_RX_SIZE - 1)];
	mb = rx_req->mb;
	sdp_cleanup_sdp_buf(ssk, rx_req, DMA_FROM_DEVICE);

	atomic_inc(&ssk->rx_ring.tail);
	atomic_dec(&ssk->remote_credits);
	return mb;
}

static void
sdp_process_rx_ctl_mb(struct sdp_sock *ssk, struct mbuf *mb)
{
	struct sdp_bsdh *h;
	struct socket *sk;

	SDP_WLOCK_ASSERT(ssk);

	sk = ssk->socket;
	h = mtod(mb, struct sdp_bsdh *);
	switch (h->mid) {
	case SDP_MID_DATA:
	case SDP_MID_SRCAVAIL:
		sdp_dbg(sk, "DATA after socket rcv was shutdown\n");

		/* got data in RCV_SHUTDOWN */
		if (ssk->state == TCPS_FIN_WAIT_1) {
			sdp_dbg(sk, "RX data when state = FIN_WAIT1\n");
			sdp_notify(ssk, ECONNRESET);
		}

		break;
#ifdef SDP_ZCOPY
	case SDP_MID_RDMARDCOMPL:
		break;
	case SDP_MID_SENDSM:
		sdp_handle_sendsm(ssk, ntohl(h->mseq_ack));
		break;
	case SDP_MID_SRCAVAIL_CANCEL:
		sdp_dbg_data(sk, "Handling SrcAvailCancel\n");
		sdp_prf(sk, NULL, "Handling SrcAvailCancel");
		if (ssk->rx_sa) {
			ssk->srcavail_cancel_mseq = ntohl(h->mseq);
			ssk->rx_sa->flags |= RX_SA_ABORTED;
			/*
			 * TODO: change it into SDP_MID_DATA and get the
			 * dirty logic from recvmsg.
			 */
			ssk->rx_sa = NULL;
		} else {
			sdp_dbg(sk, "Got SrcAvailCancel - "
					"but no SrcAvail in process\n");
		}
		break;
	case SDP_MID_SINKAVAIL:
		sdp_dbg_data(sk, "Got SinkAvail - not supported: ignored\n");
		sdp_prf(sk, NULL, "Got SinkAvail - not supported: ignored");
		/* FALLTHROUGH */
#endif
	case SDP_MID_ABORT:
		sdp_dbg_data(sk, "Handling ABORT\n");
		sdp_prf(sk, NULL, "Handling ABORT");
		sdp_notify(ssk, ECONNRESET);
		break;
	case SDP_MID_DISCONN:
		sdp_dbg_data(sk, "Handling DISCONN\n");
		sdp_prf(sk, NULL, "Handling DISCONN");
		sdp_handle_disconn(ssk);
		break;
	case SDP_MID_CHRCVBUF:
		sdp_dbg_data(sk, "Handling RX CHRCVBUF\n");
		sdp_handle_resize_request(ssk, (struct sdp_chrecvbuf *)(h+1));
		break;
	case SDP_MID_CHRCVBUF_ACK:
		sdp_dbg_data(sk, "Handling RX CHRCVBUF_ACK\n");
		sdp_handle_resize_ack(ssk, (struct sdp_chrecvbuf *)(h+1));
		break;
	default:
		/* TODO: Handle other messages */
		sdp_warn(sk, "SDP: FIXME MID %d\n", h->mid);
		break;
	}
	m_freem(mb);
}

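/*
 * First-pass processing of a received message, called from the rx
 * interrupt path.  Every header carries a credit update, which is applied
 * here; control messages (and anything arriving after a FIN) are diverted
 * to the rxctlq for handling under sdp_do_posts(), while in-band data is
 * queued directly on the socket.
 */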
static int
sdp_process_rx_mb(struct sdp_sock *ssk, struct mbuf *mb)
{
	struct socket *sk;
	struct sdp_bsdh *h;
	unsigned long mseq_ack;
	int credits_before;

	h = mtod(mb, struct sdp_bsdh *);
	sk = ssk->socket;
	/*
	 * If another thread is in so_pcbfree this may be partially torn
	 * down but no further synchronization is required as the destroying
	 * thread will wait for receive to shutdown before discarding the
	 * socket.
	 */
	if (sk == NULL) {
		m_freem(mb);
		return 0;
	}

	SDPSTATS_HIST_LINEAR(credits_before_update, tx_credits(ssk));

	mseq_ack = ntohl(h->mseq_ack);
	credits_before = tx_credits(ssk);
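	/*
	 * Credits granted by the peer: the buffers it reports posted
	 * (h->bufs) minus our messages still in flight past the one it
	 * just acknowledged (ring head - mseq_ack - 1).
	 */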
	atomic_set(&ssk->tx_ring.credits, mseq_ack - ring_head(ssk->tx_ring) +
			1 + ntohs(h->bufs));
	if (mseq_ack >= ssk->nagle_last_unacked)
		ssk->nagle_last_unacked = 0;

	sdp_prf1(ssk->socket, mb, "RX %s +%d c:%d->%d mseq:%d ack:%d\n",
		mid2str(h->mid), ntohs(h->bufs), credits_before,
		tx_credits(ssk), ntohl(h->mseq), ntohl(h->mseq_ack));

	if (unlikely(h->mid == SDP_MID_DATA &&
	    mb->m_pkthdr.len == SDP_HEAD_SIZE)) {
		/* Credit update is valid even after RCV_SHUTDOWN */
		m_freem(mb);
		return 0;
	}

	if ((h->mid != SDP_MID_DATA && h->mid != SDP_MID_SRCAVAIL) ||
	    TCPS_HAVERCVDFIN(ssk->state)) {
		sdp_prf(sk, NULL, "Control mb - queueing to control queue");
#ifdef SDP_ZCOPY
		if (h->mid == SDP_MID_SRCAVAIL_CANCEL) {
			sdp_dbg_data(sk, "Got SrcAvailCancel. "
					"seq: 0x%x seq_ack: 0x%x\n",
					ntohl(h->mseq), ntohl(h->mseq_ack));
			ssk->srcavail_cancel_mseq = ntohl(h->mseq);
		}

		if (h->mid == SDP_MID_RDMARDCOMPL) {
			struct sdp_rrch *rrch = (struct sdp_rrch *)(h+1);

			sdp_dbg_data(sk, "RdmaRdCompl message arrived\n");
			sdp_handle_rdma_read_compl(ssk, ntohl(h->mseq_ack),
					ntohl(rrch->len));
		}
#endif
		if (mbufq_enqueue(&ssk->rxctlq, mb) != 0)
			m_freem(mb);
		return (0);
	}

	sdp_prf1(sk, NULL, "queueing %s mb\n", mid2str(h->mid));
	mb = sdp_sock_queue_rcv_mb(sk, mb);

	return 0;
}

/* called only from irq */
static struct mbuf *
sdp_process_rx_wc(struct sdp_sock *ssk, struct ib_wc *wc)
{
	struct mbuf *mb;
	struct sdp_bsdh *h;
	struct socket *sk = ssk->socket;
	int mseq;

	mb = sdp_recv_completion(ssk, wc->wr_id);
	if (unlikely(!mb))
		return NULL;

	if (unlikely(wc->status)) {
		if (ssk->qp_active && sk) {
			sdp_dbg(sk, "Recv completion with error. "
					"Status %d, vendor: %d\n",
				wc->status, wc->vendor_err);
			sdp_abort(sk);
			ssk->qp_active = 0;
		}
		m_freem(mb);
		return NULL;
	}

	sdp_dbg_data(sk, "Recv completion. ID %d Length %d\n",
			(int)wc->wr_id, wc->byte_len);
	if (unlikely(wc->byte_len < sizeof(struct sdp_bsdh))) {
		sdp_warn(sk, "SDP BUG! byte_len %d < %zd\n",
				wc->byte_len, sizeof(struct sdp_bsdh));
		m_freem(mb);
		return NULL;
	}
	/* Use m_adj to trim the tail of data we didn't use. */
	m_adj(mb, -(mb->m_pkthdr.len - wc->byte_len));
	h = mtod(mb, struct sdp_bsdh *);

	SDP_DUMP_PACKET(ssk->socket, "RX", mb, h);

	ssk->rx_packets++;
	ssk->rx_bytes += mb->m_pkthdr.len;

	mseq = ntohl(h->mseq);
	atomic_set(&ssk->mseq_ack, mseq);
	if (mseq != (int)wc->wr_id)
		sdp_warn(sk, "SDP BUG! mseq %d != wrid %d\n",
				mseq, (int)wc->wr_id);

	return mb;
}

/* Wakeup writers if we now have credits. */
static void
sdp_bzcopy_write_space(struct sdp_sock *ssk)
{
	struct socket *sk = ssk->socket;

	if (tx_credits(ssk) >= ssk->min_bufs && sk)
		sowwakeup(sk);
}

/*
 * Drain the receive CQ in batches of SDP_NUM_WC until a partial batch
 * indicates it is empty.  Called only from interrupt context.
 */
static int
sdp_poll_rx_cq(struct sdp_sock *ssk)
{
	struct ib_cq *cq = ssk->rx_ring.cq;
	struct ib_wc ibwc[SDP_NUM_WC];
	int n, i;
	int wc_processed = 0;
	struct mbuf *mb;

	do {
		n = ib_poll_cq(cq, SDP_NUM_WC, ibwc);
		for (i = 0; i < n; ++i) {
			struct ib_wc *wc = &ibwc[i];

			BUG_ON(!(wc->wr_id & SDP_OP_RECV));
			mb = sdp_process_rx_wc(ssk, wc);
			if (!mb)
				continue;

			sdp_process_rx_mb(ssk, mb);
			wc_processed++;
		}
	} while (n == SDP_NUM_WC);

	if (wc_processed)
		sdp_bzcopy_write_space(ssk);

	return wc_processed;
}

static void
sdp_rx_comp_work(struct work_struct *work)
{
	struct sdp_sock *ssk = container_of(work, struct sdp_sock,
			rx_comp_work);

	sdp_prf(ssk->socket, NULL, "%s", __func__);

	SDP_WLOCK(ssk);
	if (unlikely(!ssk->qp)) {
		sdp_prf(ssk->socket, NULL, "qp was destroyed");
		goto out;
	}
	if (unlikely(!ssk->rx_ring.cq)) {
		sdp_prf(ssk->socket, NULL, "rx_ring.cq is NULL");
		goto out;
	}

	if (unlikely(!ssk->poll_cq)) {
		struct rdma_cm_id *id = ssk->id;

		if (id && id->qp)
			rdma_notify(id, IB_EVENT_COMM_EST);
		goto out;
	}

	sdp_do_posts(ssk);
out:
	SDP_WUNLOCK(ssk);
}

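/*
 * Deferred rx work, run under the socket write lock: process queued
 * control messages, replenish the receive ring and flush any pending
 * sends or credit updates.
 */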
void
sdp_do_posts(struct sdp_sock *ssk)
{
	struct socket *sk = ssk->socket;
	int xmit_poll_force;
	struct mbuf *mb;

	SDP_WLOCK_ASSERT(ssk);
	if (!ssk->qp_active) {
		sdp_dbg(sk, "QP is deactivated\n");
		return;
	}

	while ((mb = mbufq_dequeue(&ssk->rxctlq)) != NULL)
		sdp_process_rx_ctl_mb(ssk, mb);

	if (ssk->state == TCPS_TIME_WAIT)
		return;

	if (!ssk->rx_ring.cq || !ssk->tx_ring.cq)
		return;

	sdp_post_recvs(ssk);

	if (tx_ring_posted(ssk))
		sdp_xmit_poll(ssk, 1);

	sdp_post_sends(ssk, M_NOWAIT);

	xmit_poll_force = tx_credits(ssk) < SDP_MIN_TX_CREDITS;

	if (credit_update_needed(ssk) || xmit_poll_force) {
		/* If sends are pending because tx_credits ran out, xmit them. */
		sdp_prf(sk, NULL, "Processing to free pending sends");
		sdp_xmit_poll(ssk, xmit_poll_force);
		sdp_prf(sk, NULL, "Sending credit update");
		sdp_post_sends(ssk, M_NOWAIT);
	}
}

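/*
 * Poll the receive CQ from the interrupt path.  Socket-level work is
 * deferred to the rx_comp_wq workqueue (which takes the socket lock);
 * the CQ is re-armed before returning so new completions raise another
 * interrupt.
 */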
int
sdp_process_rx(struct sdp_sock *ssk)
{
	int wc_processed = 0;
	int credits_before;

	if (!rx_ring_trylock(&ssk->rx_ring)) {
		sdp_dbg(ssk->socket, "ring destroyed. not polling it\n");
		return 0;
	}

	credits_before = tx_credits(ssk);

	wc_processed = sdp_poll_rx_cq(ssk);
	sdp_prf(ssk->socket, NULL, "processed %d", wc_processed);

	if (wc_processed) {
		sdp_prf(ssk->socket, NULL, "credits: %d -> %d",
				credits_before, tx_credits(ssk));
		queue_work(rx_comp_wq, &ssk->rx_comp_work);
	}
	sdp_arm_rx_cq(ssk);

	rx_ring_unlock(&ssk->rx_ring);

	return (wc_processed);
}

static void
sdp_rx_irq(struct ib_cq *cq, void *cq_context)
{
	struct sdp_sock *ssk;

	ssk = cq_context;
	KASSERT(cq == ssk->rx_ring.cq,
	    ("%s: mismatched cq on %p", __func__, ssk));

	SDPSTATS_COUNTER_INC(rx_int_count);

	sdp_prf(ssk->socket, NULL, "rx irq");

	sdp_process_rx(ssk);
}

static void
sdp_rx_ring_purge(struct sdp_sock *ssk)
{
	while (rx_ring_posted(ssk) > 0) {
		struct mbuf *mb;

		mb = sdp_recv_completion(ssk, ring_tail(ssk->rx_ring));
		if (!mb)
			break;
		m_freem(mb);
	}
}

void
sdp_rx_ring_init(struct sdp_sock *ssk)
{
	ssk->rx_ring.buffer = NULL;
	ssk->rx_ring.destroyed = 0;
	rw_init(&ssk->rx_ring.destroyed_lock, "sdp rx lock");
}

static void
sdp_rx_cq_event_handler(struct ib_event *event, void *data)
{
}

int
sdp_rx_ring_create(struct sdp_sock *ssk, struct ib_device *device)
{
	struct ib_cq_init_attr rx_cq_attr = {
		.cqe = SDP_RX_SIZE,
		.comp_vector = 0,
		.flags = 0,
	};
	struct ib_cq *rx_cq;
	int rc = 0;

	sdp_dbg(ssk->socket, "creating rx ring\n");
	INIT_WORK(&ssk->rx_comp_work, sdp_rx_comp_work);
	atomic_set(&ssk->rx_ring.head, 1);
	atomic_set(&ssk->rx_ring.tail, 1);

	ssk->rx_ring.buffer = malloc(sizeof(*ssk->rx_ring.buffer) * SDP_RX_SIZE,
	    M_SDP, M_WAITOK);

	rx_cq = ib_create_cq(device, sdp_rx_irq, sdp_rx_cq_event_handler,
	    ssk, &rx_cq_attr);
	if (IS_ERR(rx_cq)) {
		rc = PTR_ERR(rx_cq);
		sdp_warn(ssk->socket, "Unable to allocate RX CQ: %d.\n", rc);
		goto err_cq;
	}

	ssk->rx_ring.cq = rx_cq;
	sdp_arm_rx_cq(ssk);

	return 0;

err_cq:
	free(ssk->rx_ring.buffer, M_SDP);
	ssk->rx_ring.buffer = NULL;
	return rc;
}

void
sdp_rx_ring_destroy(struct sdp_sock *ssk)
{

	cancel_work_sync(&ssk->rx_comp_work);
	rx_ring_destroy_lock(&ssk->rx_ring);

	if (ssk->rx_ring.buffer) {
		sdp_rx_ring_purge(ssk);
		free(ssk->rx_ring.buffer, M_SDP);
		ssk->rx_ring.buffer = NULL;
	}

	if (ssk->rx_ring.cq) {
		if (ib_destroy_cq(ssk->rx_ring.cq)) {
			sdp_warn(ssk->socket, "destroy cq(%p) failed\n",
				ssk->rx_ring.cq);
		} else {
			ssk->rx_ring.cq = NULL;
		}
	}

	WARN_ON(ring_head(ssk->rx_ring) != ring_tail(ssk->rx_ring));
}