sdp_rx.c revision 219820
/*
 * Copyright (c) 2009 Mellanox Technologies Ltd.  All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * OpenIB.org BSD license below:
 *
 *     Redistribution and use in source and binary forms, with or
 *     without modification, are permitted provided that the following
 *     conditions are met:
 *
 *      - Redistributions of source code must retain the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer.
 *
 *      - Redistributions in binary form must reproduce the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer in the documentation and/or other materials
 *        provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */
#include "sdp.h"

SDP_MODPARAM_INT(rcvbuf_initial_size, 32 * 1024,
		"Receive buffer initial size in bytes.");
SDP_MODPARAM_SINT(rcvbuf_scale, 0x8,
		"Receive buffer size scale factor.");

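/*
 * rcvbuf_scale multiplies the cap computed in sdp_post_recvs_needed() below
 * when deciding how many receive buffers to keep posted; rcvbuf_initial_size
 * is not referenced in this file and appears to be consumed by the connection
 * setup path elsewhere in the SDP module.
 */
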
/* Like tcp_fin - called when SDP_MID_DISCONN is received */
static void
sdp_handle_disconn(struct sdp_sock *ssk)
{

	sdp_dbg(ssk->socket, "%s\n", __func__);

	SDP_WLOCK_ASSERT(ssk);
	if (TCPS_HAVERCVDFIN(ssk->state) == 0)
		socantrcvmore(ssk->socket);

	switch (ssk->state) {
	case TCPS_SYN_RECEIVED:
	case TCPS_ESTABLISHED:
		ssk->state = TCPS_CLOSE_WAIT;
		break;

	case TCPS_FIN_WAIT_1:
		/* Received a reply FIN - start InfiniBand teardown */
		sdp_dbg(ssk->socket,
		    "%s: Starting Infiniband tear down sending DREQ\n",
		    __func__);

		sdp_cancel_dreq_wait_timeout(ssk);
		ssk->qp_active = 0;
		if (ssk->id) {
			struct rdma_cm_id *id;

			id = ssk->id;
			SDP_WUNLOCK(ssk);
			rdma_disconnect(id);
			SDP_WLOCK(ssk);
		} else {
			sdp_warn(ssk->socket,
			    "%s: ssk->id is NULL\n", __func__);
			return;
		}
		break;
	case TCPS_TIME_WAIT:
		/*
		 * This is a mutual close situation and we've got the DREQ
		 * from the peer before the SDP_MID_DISCONN.
		 */
		break;
	case TCPS_CLOSED:
		/* FIN arrived after IB teardown started - do nothing */
		sdp_dbg(ssk->socket, "%s: fin in state %s\n",
		    __func__, sdp_state_str(ssk->state));
		return;
	default:
		sdp_warn(ssk->socket,
		    "%s: FIN in unexpected state. state=%d\n",
		    __func__, ssk->state);
		break;
	}
}

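/*
 * Allocate one receive buffer (an mbuf chain of ssk->recv_bytes bytes), map
 * each mbuf for DMA and post it to the QP receive queue.  The ring slot is
 * selected by the low bits of the ring head, and the head is only advanced
 * once ib_post_recv() succeeds.  Returns 0 on success, -1 if the allocation
 * or the post fails.
 */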
static int
sdp_post_recv(struct sdp_sock *ssk)
{
	struct sdp_buf *rx_req;
	int i, rc;
	u64 addr;
	struct ib_device *dev;
	struct ib_recv_wr rx_wr = { NULL };
	struct ib_sge ibsge[SDP_MAX_RECV_SGES];
	struct ib_sge *sge = ibsge;
	struct ib_recv_wr *bad_wr;
	struct mbuf *mb, *m;
	struct sdp_bsdh *h;
	int id = ring_head(ssk->rx_ring);

	/* Now, allocate and repost recv */
	mb = m_getm2(NULL, ssk->recv_bytes, M_NOWAIT, MT_DATA, M_PKTHDR);
	if (mb == NULL) {
		/* Schedule a retry so we don't stall with nothing posted. */
		if (!rx_ring_posted(ssk))
			queue_work(rx_comp_wq, &ssk->rx_comp_work);
		return -1;
	}
	sdp_prf(ssk->socket, mb, "Posting mb");
	/* Expand every mbuf in the chain to its full size. */
	for (m = mb; m != NULL; m = m->m_next) {
		m->m_len = (m->m_flags & M_EXT) ? m->m_ext.ext_size :
		    ((m->m_flags & M_PKTHDR) ? MHLEN : MLEN);
		mb->m_pkthdr.len += m->m_len;
	}
	h = mtod(mb, struct sdp_bsdh *);
	rx_req = ssk->rx_ring.buffer + (id & (SDP_RX_SIZE - 1));
	rx_req->mb = mb;
	dev = ssk->ib_device;
	/* Map each mbuf in the chain as one scatter/gather entry. */
	for (i = 0; mb != NULL; i++, mb = mb->m_next, sge++) {
		/*
		 * The HCA writes into these buffers, so map them from the
		 * device to match the DMA_FROM_DEVICE unmap done by
		 * sdp_cleanup_sdp_buf().
		 */
		addr = ib_dma_map_single(dev, mb->m_data, mb->m_len,
		    DMA_FROM_DEVICE);
		/* TODO: proper error handling */
		BUG_ON(ib_dma_mapping_error(dev, addr));
		BUG_ON(i >= SDP_MAX_RECV_SGES);
		rx_req->mapping[i] = addr;
		sge->addr = addr;
		sge->length = mb->m_len;
		sge->lkey = ssk->sdp_dev->mr->lkey;
	}

	rx_wr.next = NULL;
	rx_wr.wr_id = id | SDP_OP_RECV;
	rx_wr.sg_list = ibsge;
	rx_wr.num_sge = i;
	rc = ib_post_recv(ssk->qp, &rx_wr, &bad_wr);
	if (unlikely(rc)) {
		sdp_warn(ssk->socket, "ib_post_recv failed. status %d\n", rc);

		sdp_cleanup_sdp_buf(ssk, rx_req, DMA_FROM_DEVICE);
		/* 'mb' was advanced to NULL above; free the saved chain. */
		m_freem(rx_req->mb);

		sdp_notify(ssk, ECONNRESET);

		return -1;
	}

	atomic_inc(&ssk->rx_ring.head);
	SDPSTATS_COUNTER_INC(post_recv);

	return 0;
}

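/*
 * Decide whether another receive buffer should be posted.  A floor of
 * SDP_MIN_TX_CREDITS buffers is always kept posted; beyond that, posting
 * stops once the bytes we may still have to absorb (the extra posted buffers
 * times the buffer size, plus whatever already sits in so_rcv) reach
 * max(so_snd.sb_hiwat, (1 + SDP_MIN_TX_CREDITS) * recv_bytes) * rcvbuf_scale.
 * Purely for illustration (assumed numbers, not values from this file): if
 * the larger term were a 64KB high-water mark and rcvbuf_scale were 8, the
 * cap would be 64KB * 8 = 512KB of outstanding receive space.
 */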
static inline int
sdp_post_recvs_needed(struct sdp_sock *ssk)
{
	unsigned long bytes_in_process;
	unsigned long max_bytes;
	int buffer_size;
	int posted;

	if (!ssk->qp_active || !ssk->socket)
		return 0;

	posted = rx_ring_posted(ssk);
	if (posted >= SDP_RX_SIZE)
		return 0;
	if (posted < SDP_MIN_TX_CREDITS)
		return 1;

	buffer_size = ssk->recv_bytes;
	max_bytes = max(ssk->socket->so_snd.sb_hiwat,
	    (1 + SDP_MIN_TX_CREDITS) * buffer_size);
	max_bytes *= rcvbuf_scale;
	/*
	 * Compute bytes in the receive queue and socket buffer.
	 */
	bytes_in_process = (posted - SDP_MIN_TX_CREDITS) * buffer_size;
	bytes_in_process += ssk->socket->so_rcv.sb_cc;

	return bytes_in_process < max_bytes;
}

static inline void
sdp_post_recvs(struct sdp_sock *ssk)
{

	while (sdp_post_recvs_needed(ssk))
		if (sdp_post_recv(ssk))
			return;
}

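/*
 * Hand a data-bearing mbuf to the socket: account for it in rcv_nxt, strip
 * the SDP_HEAD_SIZE BSDH header, flag urgent data if SDP_OOB_PRES is set,
 * then append it to so_rcv and wake any readers.  The SDP_ZCOPY branch,
 * which records SrcAvail state for zero-copy receives, is only compiled in
 * when SDP_ZCOPY is defined.
 */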
static inline struct mbuf *
sdp_sock_queue_rcv_mb(struct socket *sk, struct mbuf *mb)
{
	struct sdp_sock *ssk = sdp_sk(sk);
	struct sdp_bsdh *h;

	h = mtod(mb, struct sdp_bsdh *);

#ifdef SDP_ZCOPY
	SDP_SKB_CB(mb)->seq = rcv_nxt(ssk);
	if (h->mid == SDP_MID_SRCAVAIL) {
		struct sdp_srcah *srcah = (struct sdp_srcah *)(h+1);
		struct rx_srcavail_state *rx_sa;

		ssk->srcavail_cancel_mseq = 0;

		ssk->rx_sa = rx_sa = RX_SRCAVAIL_STATE(mb) = kzalloc(
				sizeof(struct rx_srcavail_state), M_NOWAIT);

		rx_sa->mseq = ntohl(h->mseq);
		rx_sa->used = 0;
		rx_sa->len = mb_len = ntohl(srcah->len);
		rx_sa->rkey = ntohl(srcah->rkey);
		rx_sa->vaddr = be64_to_cpu(srcah->vaddr);
		rx_sa->flags = 0;

		if (ssk->tx_sa) {
			sdp_dbg_data(ssk->socket, "got RX SrcAvail while waiting "
					"for TX SrcAvail. waking up TX SrcAvail "
					"to be aborted\n");
			wake_up(sk->sk_sleep);
		}

		atomic_add(mb->len, &ssk->rcv_nxt);
		sdp_dbg_data(sk, "queueing SrcAvail. mb_len = %d vaddr = %lld\n",
			mb_len, rx_sa->vaddr);
	} else
#endif
	{
		atomic_add(mb->m_pkthdr.len, &ssk->rcv_nxt);
	}

	m_adj(mb, SDP_HEAD_SIZE);
	SOCKBUF_LOCK(&sk->so_rcv);
	if (unlikely(h->flags & SDP_OOB_PRES))
		sdp_urg(ssk, mb);
	sbappend_locked(&sk->so_rcv, mb);
	sorwakeup_locked(sk);
	return mb;
}

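/*
 * Receive buffer resizing: a peer's ChRcvBuf request is run through
 * sdp_resize_buffers(), which only ever grows recv_bytes and never past
 * SDP_MAX_PACKET.  recv_request/recv_request_head record where in the rx
 * ring the new size takes effect, presumably so the send path can generate
 * the acknowledgement; a ChRcvBuf ack from the peer simply raises our
 * xmit_size_goal.
 */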
static int
sdp_get_recv_bytes(struct sdp_sock *ssk, u32 new_size)
{

	return MIN(new_size, SDP_MAX_PACKET);
}

int
sdp_init_buffers(struct sdp_sock *ssk, u32 new_size)
{

	ssk->recv_bytes = sdp_get_recv_bytes(ssk, new_size);
	sdp_post_recvs(ssk);

	return 0;
}

int
sdp_resize_buffers(struct sdp_sock *ssk, u32 new_size)
{
	u32 curr_size = ssk->recv_bytes;
	u32 max_size = SDP_MAX_PACKET;

	if (new_size > curr_size && new_size <= max_size) {
		ssk->recv_bytes = sdp_get_recv_bytes(ssk, new_size);
		return 0;
	}
	return -1;
}

static void
sdp_handle_resize_request(struct sdp_sock *ssk, struct sdp_chrecvbuf *buf)
{
	if (sdp_resize_buffers(ssk, ntohl(buf->size)) == 0)
		ssk->recv_request_head = ring_head(ssk->rx_ring) + 1;
	else
		ssk->recv_request_head = ring_tail(ssk->rx_ring);
	ssk->recv_request = 1;
}

static void
sdp_handle_resize_ack(struct sdp_sock *ssk, struct sdp_chrecvbuf *buf)
{
	u32 new_size = ntohl(buf->size);

	if (new_size > ssk->xmit_size_goal)
		ssk->xmit_size_goal = new_size;
}

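/*
 * Reclaim the mbuf for the completed receive identified by 'id'.
 * Completions are expected strictly in ring order, so anything other than
 * the current tail is rejected as bogus.  The DMA mappings are torn down
 * here and the tail/remote_credits accounting is updated; the caller owns
 * the returned mbuf.
 */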
static struct mbuf *
sdp_recv_completion(struct sdp_sock *ssk, int id)
{
	struct sdp_buf *rx_req;
	struct ib_device *dev;
	struct mbuf *mb;

	if (unlikely(id != ring_tail(ssk->rx_ring))) {
		printk(KERN_WARNING "Bogus recv completion id %d tail %d\n",
			id, ring_tail(ssk->rx_ring));
		return NULL;
	}

	dev = ssk->ib_device;
	rx_req = &ssk->rx_ring.buffer[id & (SDP_RX_SIZE - 1)];
	mb = rx_req->mb;
	sdp_cleanup_sdp_buf(ssk, rx_req, DMA_FROM_DEVICE);

	atomic_inc(&ssk->rx_ring.tail);
	atomic_dec(&ssk->remote_credits);
	return mb;
}

/* socket lock should be taken before calling this */
static int
sdp_process_rx_ctl_mb(struct sdp_sock *ssk, struct mbuf *mb)
{
	struct sdp_bsdh *h;
	struct socket *sk;

	SDP_WLOCK_ASSERT(ssk);
	sk = ssk->socket;
	h = mtod(mb, struct sdp_bsdh *);
	switch (h->mid) {
	case SDP_MID_DATA:
	case SDP_MID_SRCAVAIL:
		sdp_dbg(sk, "DATA after socket rcv was shutdown\n");

		/* got data in RCV_SHUTDOWN */
		if (ssk->state == TCPS_FIN_WAIT_1) {
			sdp_dbg(sk, "RX data when state = FIN_WAIT1\n");
			sdp_notify(ssk, ECONNRESET);
		}
		m_freem(mb);

		break;
#ifdef SDP_ZCOPY
	case SDP_MID_RDMARDCOMPL:
		m_freem(mb);
		break;
	case SDP_MID_SENDSM:
		sdp_handle_sendsm(ssk, ntohl(h->mseq_ack));
		m_freem(mb);
		break;
	case SDP_MID_SRCAVAIL_CANCEL:
		sdp_dbg_data(sk, "Handling SrcAvailCancel\n");
		sdp_prf(sk, NULL, "Handling SrcAvailCancel");
		if (ssk->rx_sa) {
			ssk->srcavail_cancel_mseq = ntohl(h->mseq);
			ssk->rx_sa->flags |= RX_SA_ABORTED;
			ssk->rx_sa = NULL; /* TODO: change it into SDP_MID_DATA and get
			                      the dirty logic from recvmsg */
		} else {
			sdp_dbg(sk, "Got SrcAvailCancel - "
					"but no SrcAvail in process\n");
		}
		m_freem(mb);
		break;
	case SDP_MID_SINKAVAIL:
		sdp_dbg_data(sk, "Got SinkAvail - not supported: ignored\n");
		sdp_prf(sk, NULL, "Got SinkAvail - not supported: ignored");
		/* FALLTHROUGH */
#endif
	case SDP_MID_ABORT:
		sdp_dbg_data(sk, "Handling ABORT\n");
		sdp_prf(sk, NULL, "Handling ABORT");
		sdp_notify(ssk, ECONNRESET);
		m_freem(mb);
		break;
	case SDP_MID_DISCONN:
		sdp_dbg_data(sk, "Handling DISCONN\n");
		sdp_prf(sk, NULL, "Handling DISCONN");
		sdp_handle_disconn(ssk);
		/* The disconnect header is not queued anywhere; free it. */
		m_freem(mb);
		break;
	case SDP_MID_CHRCVBUF:
		sdp_dbg_data(sk, "Handling RX CHRCVBUF\n");
		sdp_handle_resize_request(ssk, (struct sdp_chrecvbuf *)(h+1));
		m_freem(mb);
		break;
	case SDP_MID_CHRCVBUF_ACK:
		sdp_dbg_data(sk, "Handling RX CHRCVBUF_ACK\n");
		sdp_handle_resize_ack(ssk, (struct sdp_chrecvbuf *)(h+1));
		m_freem(mb);
		break;
	default:
		/* TODO: Handle other messages */
		sdp_warn(sk, "SDP: FIXME MID %d\n", h->mid);
		m_freem(mb);
	}

	return 0;
}

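/*
 * Every arriving BSDH carries the peer's view of our traffic: mseq_ack is
 * the last of our sends it has consumed and bufs is the number of receive
 * buffers it currently has posted.  Treating ring_head(tx_ring) as the
 * sequence number our next send will use, the sends still unaccounted for
 * number ring_head - 1 - mseq_ack, so the remaining credits are
 * bufs - (ring_head - 1 - mseq_ack) = mseq_ack - ring_head + 1 + bufs,
 * which is exactly what is written into tx_ring.credits below.  For
 * example (numbers invented for illustration): ring_head = 110,
 * mseq_ack = 104, bufs = 16 leaves 16 - 5 = 11 credits.
 */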
static int
sdp_process_rx_mb(struct sdp_sock *ssk, struct mbuf *mb)
{
	struct socket *sk;
	struct sdp_bsdh *h;
	unsigned long mseq_ack;
	int credits_before;

	h = mtod(mb, struct sdp_bsdh *);
	sk = ssk->socket;
	/*
	 * If another thread is in so_pcbfree this may be partially torn
	 * down but no further synchronization is required as the destroying
	 * thread will wait for receive to shutdown before discarding the
	 * socket.
	 */
	if (sk == NULL) {
		m_freem(mb);
		return 0;
	}

	SDPSTATS_HIST_LINEAR(credits_before_update, tx_credits(ssk));

	mseq_ack = ntohl(h->mseq_ack);
	credits_before = tx_credits(ssk);
	atomic_set(&ssk->tx_ring.credits, mseq_ack - ring_head(ssk->tx_ring) +
			1 + ntohs(h->bufs));
	if (mseq_ack >= ssk->nagle_last_unacked)
		ssk->nagle_last_unacked = 0;

	sdp_prf1(ssk->socket, mb, "RX %s +%d c:%d->%d mseq:%d ack:%d\n",
		mid2str(h->mid), ntohs(h->bufs), credits_before,
		tx_credits(ssk), ntohl(h->mseq), ntohl(h->mseq_ack));

	if (unlikely(h->mid == SDP_MID_DATA &&
	    mb->m_pkthdr.len == SDP_HEAD_SIZE)) {
		/* Credit update is valid even after RCV_SHUTDOWN */
		m_freem(mb);
		return 0;
	}

	if ((h->mid != SDP_MID_DATA && h->mid != SDP_MID_SRCAVAIL) ||
	    TCPS_HAVERCVDFIN(ssk->state)) {
		sdp_prf(sk, NULL, "Control mb - queueing to control queue");
#ifdef SDP_ZCOPY
		if (h->mid == SDP_MID_SRCAVAIL_CANCEL) {
			sdp_dbg_data(sk, "Got SrcAvailCancel. "
					"seq: 0x%x seq_ack: 0x%x\n",
					ntohl(h->mseq), ntohl(h->mseq_ack));
			ssk->srcavail_cancel_mseq = ntohl(h->mseq);
		}

		if (h->mid == SDP_MID_RDMARDCOMPL) {
			struct sdp_rrch *rrch = (struct sdp_rrch *)(h+1);

			sdp_dbg_data(sk, "RdmaRdCompl message arrived\n");
			sdp_handle_rdma_read_compl(ssk, ntohl(h->mseq_ack),
					ntohl(rrch->len));
		}
#endif
		mb->m_nextpkt = NULL;
		if (ssk->rx_ctl_tail)
			ssk->rx_ctl_tail->m_nextpkt = mb;
		else
			ssk->rx_ctl_q = mb;
		ssk->rx_ctl_tail = mb;

		return 0;
	}

	sdp_prf1(sk, NULL, "queueing %s mb\n", mid2str(h->mid));
	mb = sdp_sock_queue_rcv_mb(sk, mb);

	return 0;
}

/* called only from irq */
static struct mbuf *
sdp_process_rx_wc(struct sdp_sock *ssk, struct ib_wc *wc)
{
	struct mbuf *mb;
	struct sdp_bsdh *h;
	struct socket *sk = ssk->socket;
	int mseq;

	mb = sdp_recv_completion(ssk, wc->wr_id);
	if (unlikely(!mb))
		return NULL;

	if (unlikely(wc->status)) {
		if (ssk->qp_active && sk) {
			sdp_dbg(sk, "Recv completion with error. "
					"Status %d, vendor: %d\n",
				wc->status, wc->vendor_err);
			sdp_abort(sk);
			ssk->qp_active = 0;
		}
		m_freem(mb);
		return NULL;
	}

	sdp_dbg_data(sk, "Recv completion. ID %d Length %d\n",
			(int)wc->wr_id, wc->byte_len);
	if (unlikely(wc->byte_len < sizeof(struct sdp_bsdh))) {
		sdp_warn(sk, "SDP BUG! byte_len %d < %zd\n",
				wc->byte_len, sizeof(struct sdp_bsdh));
		m_freem(mb);
		return NULL;
	}
	/* Use m_adj to trim the tail of data we didn't use. */
	m_adj(mb, -(mb->m_pkthdr.len - wc->byte_len));
	h = mtod(mb, struct sdp_bsdh *);

	SDP_DUMP_PACKET(ssk->socket, "RX", mb, h);

	ssk->rx_packets++;
	ssk->rx_bytes += mb->m_pkthdr.len;

	mseq = ntohl(h->mseq);
	atomic_set(&ssk->mseq_ack, mseq);
	if (mseq != (int)wc->wr_id)
		sdp_warn(sk, "SDP BUG! mseq %d != wrid %d\n",
				mseq, (int)wc->wr_id);

	return mb;
}

/* Wakeup writers if we now have credits. */
static void
sdp_bzcopy_write_space(struct sdp_sock *ssk)
{
	struct socket *sk = ssk->socket;

	if (tx_credits(ssk) >= ssk->min_bufs && sk)
		sowwakeup(sk);
}

/* only from interrupt. */
static int
sdp_poll_rx_cq(struct sdp_sock *ssk)
{
	struct ib_cq *cq = ssk->rx_ring.cq;
	struct ib_wc ibwc[SDP_NUM_WC];
	int n, i;
	int wc_processed = 0;
	struct mbuf *mb;

	do {
		n = ib_poll_cq(cq, SDP_NUM_WC, ibwc);
		for (i = 0; i < n; ++i) {
			struct ib_wc *wc = &ibwc[i];

			BUG_ON(!(wc->wr_id & SDP_OP_RECV));
			mb = sdp_process_rx_wc(ssk, wc);
			if (!mb)
				continue;

			sdp_process_rx_mb(ssk, mb);
			wc_processed++;
		}
	} while (n == SDP_NUM_WC);

	if (wc_processed)
		sdp_bzcopy_write_space(ssk);

	return wc_processed;
}

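/*
 * Deferred completion work, run from the rx_comp_wq workqueue with the pcb
 * write lock held.  It bails out if the QP or RX CQ is already gone; if the
 * socket is not yet polling its CQ (poll_cq == 0) it only nudges the CM with
 * RDMA_CM_EVENT_ESTABLISHED, otherwise it runs sdp_do_posts() to drain the
 * control queue and repost buffers.
 */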
static void
sdp_rx_comp_work(struct work_struct *work)
{
	struct sdp_sock *ssk = container_of(work, struct sdp_sock,
			rx_comp_work);

	sdp_prf(ssk->socket, NULL, "%s", __func__);

	SDP_WLOCK(ssk);
	if (unlikely(!ssk->qp)) {
		sdp_prf(ssk->socket, NULL, "qp was destroyed");
		goto out;
	}
	if (unlikely(!ssk->rx_ring.cq)) {
		sdp_prf(ssk->socket, NULL, "rx_ring.cq is NULL");
		goto out;
	}

	if (unlikely(!ssk->poll_cq)) {
		struct rdma_cm_id *id = ssk->id;

		if (id && id->qp)
			rdma_notify(id, RDMA_CM_EVENT_ESTABLISHED);
		goto out;
	}

	sdp_do_posts(ssk);
out:
	SDP_WUNLOCK(ssk);
}

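/*
 * Post-processing run with the pcb lock held: drain the deferred control
 * mbuf queue, top up the receive ring, reap finished transmits and push any
 * pending sends.  A forced TX poll and an extra sdp_post_sends() round are
 * done when credits have run low or a credit update is owed to the peer.
 */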
void
sdp_do_posts(struct sdp_sock *ssk)
{
	struct socket *sk = ssk->socket;
	int xmit_poll_force;
	struct mbuf *mb;

	SDP_WLOCK_ASSERT(ssk);
	if (!ssk->qp_active) {
		sdp_dbg(sk, "QP is deactivated\n");
		return;
	}

	while ((mb = ssk->rx_ctl_q)) {
		ssk->rx_ctl_q = mb->m_nextpkt;
		mb->m_nextpkt = NULL;
		sdp_process_rx_ctl_mb(ssk, mb);
	}

	if (ssk->state == TCPS_TIME_WAIT)
		return;

	if (!ssk->rx_ring.cq || !ssk->tx_ring.cq)
		return;

	sdp_post_recvs(ssk);

	if (tx_ring_posted(ssk))
		sdp_xmit_poll(ssk, 1);

	sdp_post_sends(ssk, M_NOWAIT);

	xmit_poll_force = tx_credits(ssk) < SDP_MIN_TX_CREDITS;

	if (credit_update_needed(ssk) || xmit_poll_force) {
		/* If tx is pending because we ran out of credits, push it. */
		sdp_prf(sk, NULL, "Processing to free pending sends");
		sdp_xmit_poll(ssk, xmit_poll_force);
		sdp_prf(sk, NULL, "Sending credit update");
		sdp_post_sends(ssk, M_NOWAIT);
	}
}

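/*
 * Poll the RX CQ once, under the rx_ring trylock so a concurrent
 * sdp_rx_ring_destroy() cannot free the ring mid-poll.  Completions are
 * counted, heavier follow-up work is handed to rx_comp_wq, and the CQ is
 * re-armed before the lock is dropped so further interrupts can arrive.
 */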
int
sdp_process_rx(struct sdp_sock *ssk)
{
	int wc_processed = 0;
	int credits_before;

	if (!rx_ring_trylock(&ssk->rx_ring)) {
		sdp_dbg(ssk->socket, "ring destroyed. not polling it\n");
		return 0;
	}

	credits_before = tx_credits(ssk);

	wc_processed = sdp_poll_rx_cq(ssk);
	sdp_prf(ssk->socket, NULL, "processed %d", wc_processed);

	if (wc_processed) {
		sdp_prf(ssk->socket, NULL, "credits:  %d -> %d",
				credits_before, tx_credits(ssk));
		queue_work(rx_comp_wq, &ssk->rx_comp_work);
	}
	sdp_arm_rx_cq(ssk);

	rx_ring_unlock(&ssk->rx_ring);

	return (wc_processed);
}

static void
sdp_rx_irq(struct ib_cq *cq, void *cq_context)
{
	struct socket *sk = cq_context;
	struct sdp_sock *ssk = sdp_sk(sk);

	if (cq != ssk->rx_ring.cq) {
		sdp_dbg(sk, "cq = %p, ssk->cq = %p\n", cq, ssk->rx_ring.cq);
		return;
	}

	SDPSTATS_COUNTER_INC(rx_int_count);

	sdp_prf(sk, NULL, "rx irq");

	sdp_process_rx(ssk);
}

static void
sdp_rx_ring_purge(struct sdp_sock *ssk)
{
	while (rx_ring_posted(ssk) > 0) {
		struct mbuf *mb;

		mb = sdp_recv_completion(ssk, ring_tail(ssk->rx_ring));
		if (!mb)
			break;
		m_freem(mb);
	}
}

void
sdp_rx_ring_init(struct sdp_sock *ssk)
{
	ssk->rx_ring.buffer = NULL;
	ssk->rx_ring.destroyed = 0;
	rw_init(&ssk->rx_ring.destroyed_lock, "sdp rx lock");
}

static void
sdp_rx_cq_event_handler(struct ib_event *event, void *data)
{
}

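/*
 * Bring up the receive side for a connection: initialize the ring indices,
 * allocate the SDP_RX_SIZE slot array, and create the RX CQ with
 * sdp_rx_irq() as its completion handler before arming it.  On CQ creation
 * failure the slot array is freed again and the ib_create_cq() error is
 * returned.
 */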
int
sdp_rx_ring_create(struct sdp_sock *ssk, struct ib_device *device)
{
	struct ib_cq *rx_cq;
	int rc = 0;

	sdp_dbg(ssk->socket, "rx ring created");
	INIT_WORK(&ssk->rx_comp_work, sdp_rx_comp_work);
	atomic_set(&ssk->rx_ring.head, 1);
	atomic_set(&ssk->rx_ring.tail, 1);

	ssk->rx_ring.buffer = kmalloc(
			sizeof *ssk->rx_ring.buffer * SDP_RX_SIZE, GFP_KERNEL);
	if (!ssk->rx_ring.buffer) {
		sdp_warn(ssk->socket,
			"Unable to allocate RX Ring size %zd.\n",
			 sizeof(*ssk->rx_ring.buffer) * SDP_RX_SIZE);

		return -ENOMEM;
	}

	rx_cq = ib_create_cq(device, sdp_rx_irq, sdp_rx_cq_event_handler,
			  ssk->socket, SDP_RX_SIZE, IB_CQ_VECTOR_LEAST_ATTACHED);

	if (IS_ERR(rx_cq)) {
		rc = PTR_ERR(rx_cq);
		sdp_warn(ssk->socket, "Unable to allocate RX CQ: %d.\n", rc);
		goto err_cq;
	}

	sdp_sk(ssk->socket)->rx_ring.cq = rx_cq;
	sdp_arm_rx_cq(ssk);

	return 0;

err_cq:
	kfree(ssk->rx_ring.buffer);
	ssk->rx_ring.buffer = NULL;
	return rc;
}

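/*
 * Tear down the receive side: stop the completion work, take the ring's
 * destroy lock so pollers stay out, reclaim and free any still-posted
 * receive buffers together with the slot array, and finally destroy the CQ.
 * The closing assertion checks that every posted receive was accounted for
 * (head == tail).
 */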
void
sdp_rx_ring_destroy(struct sdp_sock *ssk)
{

	cancel_work_sync(&ssk->rx_comp_work);
	rx_ring_destroy_lock(&ssk->rx_ring);

	if (ssk->rx_ring.buffer) {
		sdp_rx_ring_purge(ssk);

		kfree(ssk->rx_ring.buffer);
		ssk->rx_ring.buffer = NULL;
	}

	if (ssk->rx_ring.cq) {
		if (ib_destroy_cq(ssk->rx_ring.cq)) {
			sdp_warn(ssk->socket, "destroy cq(%p) failed\n",
				ssk->rx_ring.cq);
		} else {
			ssk->rx_ring.cq = NULL;
		}
	}

	WARN_ON(ring_head(ssk->rx_ring) != ring_tail(ssk->rx_ring));
}
784