/*-
 * SPDX-License-Identifier: BSD-2-Clause OR GPL-2.0
 *
 * Copyright (c) 2006 Mellanox Technologies Ltd.  All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * OpenIB.org BSD license below:
 *
 *     Redistribution and use in source and binary forms, with or
 *     without modification, are permitted provided that the following
 *     conditions are met:
 *
 *      - Redistributions of source code must retain the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer.
 *
 *      - Redistributions in binary form must reproduce the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer in the documentation and/or other materials
 *        provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */
#include <linux/tcp.h>
#include <asm/ioctls.h>
#include <linux/workqueue.h>
#include <linux/net.h>
#include <linux/socket.h>
#include <net/protocol.h>
#include <net/inet_common.h>
#include <rdma/rdma_cm.h>
#include <rdma/ib_verbs.h>
#include <rdma/ib_fmr_pool.h>
#include <rdma/ib_umem.h>
#include <net/tcp.h> /* for memcpy_toiovec */
#include <asm/io.h>
#include <asm/uaccess.h>
#include <linux/delay.h>
#include "sdp.h"

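/*
 * Post a SrcAvail advertising the FMR-mapped user buffer described by
 * tx_sa. The first piece of the payload (at most one page, further capped
 * by the transmit size goal) is carried inline in the SrcAvail itself, as
 * combined mode requires; the peer RDMA-reads the rest.
 */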
static int sdp_post_srcavail(struct socket *sk, struct tx_srcavail_state *tx_sa)
{
	struct sdp_sock *ssk = sdp_sk(sk);
	struct mbuf *mb;
	int payload_len;
	struct page *payload_pg;
	int off, len;
	struct ib_umem_chunk *chunk;

	WARN_ON(ssk->tx_sa);

	BUG_ON(!tx_sa);
	BUG_ON(!tx_sa->fmr || !tx_sa->fmr->fmr->lkey);
	BUG_ON(!tx_sa->umem);
	BUG_ON(!tx_sa->umem->chunk_list.next);

	chunk = list_entry(tx_sa->umem->chunk_list.next, struct ib_umem_chunk, list);
	BUG_ON(!chunk->nmap);

	off = tx_sa->umem->offset;
	len = tx_sa->umem->length;

	tx_sa->bytes_sent = tx_sa->bytes_acked = 0;

	mb = sdp_alloc_mb_srcavail(sk, len, tx_sa->fmr->fmr->lkey, off, 0);
	if (!mb) {
		return -ENOMEM;
	}
	sdp_dbg_data(sk, "sending SrcAvail\n");

	TX_SRCAVAIL_STATE(mb) = tx_sa; /* tx_sa is attached to the mb but
					 * continues to live after the mb is freed */
	ssk->tx_sa = tx_sa;

	/* must have payload inlined in SrcAvail packet in combined mode */
	payload_len = MIN(tx_sa->umem->page_size - off, len);
	payload_len = MIN(payload_len, ssk->xmit_size_goal - sizeof(struct sdp_srcah));
	payload_pg  = sg_page(&chunk->page_list[0]);
	get_page(payload_pg);

	sdp_dbg_data(sk, "payload: off: 0x%x, pg: %p, len: 0x%x\n",
		off, payload_pg, payload_len);

	mb_fill_page_desc(mb, mb_shinfo(mb)->nr_frags,
			payload_pg, off, payload_len);

	mb->len             += payload_len;
	mb->data_len         = payload_len;
	mb->truesize        += payload_len;
//	sk->sk_wmem_queued   += payload_len;
//	sk->sk_forward_alloc -= payload_len;

	mb_entail(sk, ssk, mb);

	ssk->write_seq += payload_len;
	SDP_SKB_CB(mb)->end_seq += payload_len;

	tx_sa->bytes_sent = tx_sa->umem->length;
	tx_sa->bytes_acked = payload_len;

	/* TODO: pushing the mb into the tx_queue should be enough */

	return 0;
}

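/*
 * Abort an outstanding SrcAvail: queue a SrcAvailCancel to the peer and arm
 * srcavail_cancel_work, which resets the connection if neither a
 * RdmaRdCompl nor a SendSM shows up before the cancel timeout expires.
 */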
static int sdp_post_srcavail_cancel(struct socket *sk)
{
	struct sdp_sock *ssk = sdp_sk(sk);
	struct mbuf *mb;

	sdp_dbg_data(ssk->socket, "Posting srcavail cancel\n");

	mb = sdp_alloc_mb_srcavail_cancel(sk, 0);
	if (unlikely(!mb))
		return -ENOMEM;
	mb_entail(sk, ssk, mb);

	sdp_post_sends(ssk, 0);

	schedule_delayed_work(&ssk->srcavail_cancel_work,
			SDP_SRCAVAIL_CANCEL_TIMEOUT);

	return 0;
}

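/*
 * Delayed work armed by sdp_post_srcavail_cancel(). Getting here means both
 * the SrcAvail and its SrcAvailCancel went unanswered, so give up on the
 * connection and wake any sleepers.
 */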
void srcavail_cancel_timeout(struct work_struct *work)
{
	struct sdp_sock *ssk =
		container_of(work, struct sdp_sock, srcavail_cancel_work.work);
	struct socket *sk = ssk->socket;

	lock_sock(sk);

	sdp_dbg_data(sk, "both SrcAvail and SrcAvailCancel timed out -"
			" closing connection\n");
	sdp_set_error(sk, -ECONNRESET);
	wake_up(&ssk->wq);

	release_sock(sk);
}

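/*
 * Sleep until the peer has acknowledged the whole SrcAvail through
 * RdmaRdCompl messages (bytes_acked catches up with bytes_sent), or until
 * the wait is cut short by a timeout, a SendSM, a pending signal, a
 * crossing SrcAvail from the peer or QP teardown. Returns 0 on success or a
 * negative errno, with the abort reason also noted in tx_sa->abort_flags.
 */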
static int sdp_wait_rdmardcompl(struct sdp_sock *ssk, long *timeo_p,
		int ignore_signals)
{
	struct socket *sk = ssk->socket;
	int err = 0;
	long vm_wait = 0;
	long current_timeo = *timeo_p;
	struct tx_srcavail_state *tx_sa = ssk->tx_sa;
	DEFINE_WAIT(wait);

	sdp_dbg_data(sk, "sleep till RdmaRdCompl. timeo = %ld.\n", *timeo_p);
	sdp_prf1(sk, NULL, "Going to sleep");
	while (ssk->qp_active) {
		prepare_to_wait(sk->sk_sleep, &wait, TASK_INTERRUPTIBLE);

		if (unlikely(!*timeo_p)) {
			err = -ETIME;
			tx_sa->abort_flags |= TX_SA_TIMEDOUT;
			sdp_prf1(sk, NULL, "timeout");
			SDPSTATS_COUNTER_INC(zcopy_tx_timeout);
			break;
		}

		else if (tx_sa->bytes_acked > tx_sa->bytes_sent) {
			err = -EINVAL;
			sdp_dbg_data(sk, "acked bytes > sent bytes\n");
			tx_sa->abort_flags |= TX_SA_ERROR;
			break;
		}

		if (tx_sa->abort_flags & TX_SA_SENDSM) {
			sdp_prf1(sk, NULL, "Aborting SrcAvail sending");
			SDPSTATS_COUNTER_INC(zcopy_tx_aborted);
			err = -EAGAIN;
			break;
		}

		if (!ignore_signals) {
			if (signal_pending(current)) {
				err = -EINTR;
				sdp_prf1(sk, NULL, "signalled");
				tx_sa->abort_flags |= TX_SA_INTRRUPTED;
				break;
			}

			if (ssk->rx_sa && (tx_sa->bytes_acked < tx_sa->bytes_sent)) {
				sdp_dbg_data(sk, "Crossing SrcAvail - aborting this\n");
				tx_sa->abort_flags |= TX_SA_CROSS_SEND;
				SDPSTATS_COUNTER_INC(zcopy_cross_send);
				err = -ETIME;
				break;
			}
		}

		posts_handler_put(ssk);

		sk_wait_event(sk, &current_timeo,
				tx_sa->abort_flags &&
				ssk->rx_sa &&
				(tx_sa->bytes_acked < tx_sa->bytes_sent) &&
				vm_wait);
		sdp_dbg_data(ssk->socket, "woke up sleepers\n");

		posts_handler_get(ssk);

		if (tx_sa->bytes_acked == tx_sa->bytes_sent)
			break;

		if (vm_wait) {
			vm_wait -= current_timeo;
			current_timeo = *timeo_p;
			if (current_timeo != MAX_SCHEDULE_TIMEOUT &&
			    (current_timeo -= vm_wait) < 0)
				current_timeo = 0;
			vm_wait = 0;
		}
		*timeo_p = current_timeo;
	}

	finish_wait(sk->sk_sleep, &wait);

	sdp_dbg_data(sk, "Finished waiting - RdmaRdCompl: %d/%d bytes, flags: 0x%x\n",
			tx_sa->bytes_acked, tx_sa->bytes_sent, tx_sa->abort_flags);

	if (!ssk->qp_active) {
		sdp_dbg(sk, "QP destroyed while waiting\n");
		return -EINVAL;
	}
	return err;
}

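/*
 * Wait (uninterruptibly) for the RDMA read posted for the current SrcAvail
 * to complete, i.e. until the tx ring's rdma_inflight entry is no longer
 * busy, the QP goes away, or the 5 second watchdog fires.
 */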
static void sdp_wait_rdma_wr_finished(struct sdp_sock *ssk)
{
	struct socket *sk = ssk->socket;
	long timeo = HZ * 5; /* Timeout for RDMA read */
	DEFINE_WAIT(wait);

	sdp_dbg_data(sk, "Sleep till RDMA wr finished.\n");
	while (1) {
		prepare_to_wait(sk->sk_sleep, &wait, TASK_UNINTERRUPTIBLE);

		if (!ssk->tx_ring.rdma_inflight->busy) {
			sdp_dbg_data(sk, "got rdma cqe\n");
			break;
		}

		if (!ssk->qp_active) {
			sdp_dbg_data(sk, "QP destroyed\n");
			break;
		}

		if (!timeo) {
			sdp_warn(sk, "Panic: Timed out waiting for RDMA read\n");
			WARN_ON(1);
			break;
		}

		posts_handler_put(ssk);

		sdp_prf1(sk, NULL, "Going to sleep");
		sk_wait_event(sk, &timeo,
			!ssk->tx_ring.rdma_inflight->busy);
		sdp_prf1(sk, NULL, "Woke up");
		sdp_dbg_data(ssk->socket, "woke up sleepers\n");

		posts_handler_get(ssk);
	}

	finish_wait(sk->sk_sleep, &wait);

	sdp_dbg_data(sk, "Finished waiting\n");
}

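/*
 * Report RDMA read progress to the peer: send an RdmaRdCompl covering the
 * bytes consumed from its SrcAvail since the last report.
 */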
int sdp_post_rdma_rd_compl(struct sdp_sock *ssk,
		struct rx_srcavail_state *rx_sa)
{
	struct mbuf *mb;
	int copied = rx_sa->used - rx_sa->reported;

	if (rx_sa->used <= rx_sa->reported)
		return 0;

	mb = sdp_alloc_mb_rdmardcompl(ssk->socket, copied, 0);
	if (unlikely(!mb))
		return -ENOMEM;

	rx_sa->reported += copied;

	/* TODO: What if no tx_credits available? */
	sdp_post_send(ssk, mb);

	return 0;
}

int sdp_post_sendsm(struct socket *sk)
{
	struct mbuf *mb = sdp_alloc_mb_sendsm(sk, 0);

	if (unlikely(!mb))
		return -ENOMEM;

	sdp_post_send(sdp_sk(sk), mb);

	return 0;
}

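/*
 * Advance the iovec by 'len' consumed bytes, skipping exhausted entries, so
 * the caller's view of the user buffer matches what was actually
 * transferred.
 */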
static int sdp_update_iov_used(struct socket *sk, struct iovec *iov, int len)
{
	sdp_dbg_data(sk, "updating consumed 0x%x bytes from iov\n", len);
	while (len > 0) {
		if (iov->iov_len) {
			int copy = min_t(unsigned int, iov->iov_len, len);
			len -= copy;
			iov->iov_len -= copy;
			iov->iov_base += copy;
		}
		iov++;
	}

	return 0;
}

static inline int sge_bytes(struct ib_sge *sge, int sge_cnt)
{
	int bytes = 0;

	while (sge_cnt > 0) {
		bytes += sge->length;
		sge++;
		sge_cnt--;
	}

	return bytes;
}
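
/*
 * RX path: the peer sent a SendSM asking us to abort the SrcAvail it refers
 * to. Stale acks for an already finished or older SrcAvail are ignored;
 * otherwise flag the abort, cancel the pending SrcAvailCancel work and wake
 * the sender sleeping in sdp_wait_rdmardcompl().
 */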
void sdp_handle_sendsm(struct sdp_sock *ssk, u32 mseq_ack)
{
	struct socket *sk = ssk->socket;
	unsigned long flags;

	spin_lock_irqsave(&ssk->tx_sa_lock, flags);

	if (!ssk->tx_sa) {
		sdp_prf1(sk, NULL, "SendSM for cancelled/finished SrcAvail");
		goto out;
	}

	if (ssk->tx_sa->mseq > mseq_ack) {
		sdp_dbg_data(sk, "SendSM arrived for old SrcAvail. "
			"SendSM mseq_ack: 0x%x, SrcAvail mseq: 0x%x\n",
			mseq_ack, ssk->tx_sa->mseq);
		goto out;
	}

	sdp_dbg_data(sk, "Got SendSM - aborting SrcAvail\n");

	ssk->tx_sa->abort_flags |= TX_SA_SENDSM;
	cancel_delayed_work(&ssk->srcavail_cancel_work);

	wake_up(sk->sk_sleep);
	sdp_dbg_data(sk, "woke up sleepers\n");

out:
	spin_unlock_irqrestore(&ssk->tx_sa_lock, flags);
}

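/*
 * RX path: the peer reports through RdmaRdCompl how many bytes it has read
 * from our advertised buffer. Credit them to the outstanding SrcAvail and
 * wake the sender.
 */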
void sdp_handle_rdma_read_compl(struct sdp_sock *ssk, u32 mseq_ack,
		u32 bytes_completed)
{
	struct socket *sk = ssk->socket;
	unsigned long flags;

	sdp_prf1(sk, NULL, "RdmaRdCompl ssk=%p tx_sa=%p", ssk, ssk->tx_sa);
	sdp_dbg_data(sk, "RdmaRdCompl ssk=%p tx_sa=%p\n", ssk, ssk->tx_sa);

	spin_lock_irqsave(&ssk->tx_sa_lock, flags);

	BUG_ON(!ssk);

	if (!ssk->tx_sa) {
		sdp_dbg_data(sk, "Got RdmaRdCompl for aborted SrcAvail\n");
		goto out;
	}

	if (ssk->tx_sa->mseq > mseq_ack) {
		sdp_dbg_data(sk, "RdmaRdCompl arrived for old SrcAvail. "
			"SendSM mseq_ack: 0x%x, SrcAvail mseq: 0x%x\n",
			mseq_ack, ssk->tx_sa->mseq);
		goto out;
	}

	ssk->tx_sa->bytes_acked += bytes_completed;

	wake_up(sk->sk_sleep);
	sdp_dbg_data(sk, "woke up sleepers\n");

out:
	spin_unlock_irqrestore(&ssk->tx_sa_lock, flags);
	return;
}

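/*
 * Upper bound on how much of the user buffer may be pinned: what remains of
 * RLIMIT_MEMLOCK after the pages this mm has already locked, minus the page
 * offset of the buffer. Holders of CAP_IPC_LOCK are unrestricted.
 */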
static unsigned long sdp_get_max_memlockable_bytes(unsigned long offset)
{
	unsigned long avail;
	unsigned long lock_limit;

	if (capable(CAP_IPC_LOCK))
		return ULONG_MAX;

	lock_limit = current->signal->rlim[RLIMIT_MEMLOCK].rlim_cur;
	avail = lock_limit - (current->mm->locked_vm << PAGE_SHIFT);

	return avail - offset;
}

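/*
 * Pin the user buffer with ib_umem_get() and map its pages through the
 * device's FMR pool so it can be the source or target of RDMA. The length
 * is clamped to SDP_MAX_RDMA_READ_LEN and to the memlock budget, so callers
 * must cope with a partial mapping.
 */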
static int sdp_alloc_fmr(struct socket *sk, void *uaddr, size_t len,
	struct ib_pool_fmr **_fmr, struct ib_umem **_umem)
{
	struct ib_pool_fmr *fmr;
	struct ib_umem *umem;
	struct ib_device *dev;
	u64 *pages;
	struct ib_umem_chunk *chunk;
	int n, j, k;
	int rc = 0;
	unsigned long max_lockable_bytes;

	if (unlikely(len > SDP_MAX_RDMA_READ_LEN)) {
		sdp_dbg_data(sk, "len:0x%lx > FMR_SIZE: 0x%lx\n",
			len, SDP_MAX_RDMA_READ_LEN);
		len = SDP_MAX_RDMA_READ_LEN;
	}

	max_lockable_bytes = sdp_get_max_memlockable_bytes((unsigned long)uaddr & ~PAGE_MASK);
	if (unlikely(len > max_lockable_bytes)) {
		sdp_dbg_data(sk, "len:0x%lx > RLIMIT_MEMLOCK available: 0x%lx\n",
			len, max_lockable_bytes);
		len = max_lockable_bytes;
	}

	sdp_dbg_data(sk, "user buf: %p, len:0x%lx max_lockable_bytes: 0x%lx\n",
			uaddr, len, max_lockable_bytes);

	umem = ib_umem_get(&sdp_sk(sk)->context, (unsigned long)uaddr, len,
		IB_ACCESS_REMOTE_WRITE, 0);

	if (IS_ERR(umem)) {
		rc = PTR_ERR(umem);
		sdp_warn(sk, "Error doing umem_get 0x%lx bytes: %d\n", len, rc);
		sdp_warn(sk, "RLIMIT_MEMLOCK: 0x%lx[cur] 0x%lx[max] CAP_IPC_LOCK: %d\n",
				current->signal->rlim[RLIMIT_MEMLOCK].rlim_cur,
				current->signal->rlim[RLIMIT_MEMLOCK].rlim_max,
				capable(CAP_IPC_LOCK));
		goto err_umem_get;
	}

	sdp_dbg_data(sk, "umem->offset = 0x%x, length = 0x%lx\n",
		umem->offset, umem->length);

	pages = (u64 *) __get_free_page(GFP_KERNEL);
	if (!pages) {
		rc = -ENOMEM;
		goto err_pages_alloc;
	}

	n = 0;

	dev = sdp_sk(sk)->ib_device;
	list_for_each_entry(chunk, &umem->chunk_list, list) {
		for (j = 0; j < chunk->nmap; ++j) {
			len = ib_sg_dma_len(dev,
					&chunk->page_list[j]) >> PAGE_SHIFT;

			for (k = 0; k < len; ++k) {
				pages[n++] = ib_sg_dma_address(dev,
						&chunk->page_list[j]) +
					umem->page_size * k;

			}
		}
	}

	fmr = ib_fmr_pool_map_phys(sdp_sk(sk)->sdp_dev->fmr_pool, pages, n, 0);
	if (IS_ERR(fmr)) {
		rc = PTR_ERR(fmr);
		sdp_warn(sk, "Error allocating fmr: %d\n", rc);
		goto err_fmr_alloc;
	}

	free_page((unsigned long) pages);

	*_umem = umem;
	*_fmr = fmr;

	return 0;

err_fmr_alloc:
	free_page((unsigned long) pages);

err_pages_alloc:
	ib_umem_release(umem);

err_umem_get:

	return rc;
}

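/*
 * Undo sdp_alloc_fmr(): return the FMR to its pool and release the pinned
 * umem. Note that nothing is released here once the QP is no longer active.
 */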
void sdp_free_fmr(struct socket *sk, struct ib_pool_fmr **_fmr, struct ib_umem **_umem)
{
	if (!sdp_sk(sk)->qp_active)
		return;

	ib_fmr_pool_unmap(*_fmr);
	*_fmr = NULL;

	ib_umem_release(*_umem);
	*_umem = NULL;
}

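/*
 * Post a single signalled RDMA read pulling the remote SrcAvail buffer
 * (rkey/vaddr taken from rx_sa) into the locally FMR-mapped user buffer.
 * The completion is picked up on the tx CQ and waited for in
 * sdp_wait_rdma_wr_finished().
 */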
static int sdp_post_rdma_read(struct socket *sk, struct rx_srcavail_state *rx_sa)
{
	struct sdp_sock *ssk = sdp_sk(sk);
	struct ib_send_wr *bad_wr;
	struct ib_send_wr wr = { NULL };
	struct ib_sge sge;

	wr.opcode = IB_WR_RDMA_READ;
	wr.next = NULL;
	wr.wr_id = SDP_OP_RDMA;
	wr.wr.rdma.rkey = rx_sa->rkey;
	wr.send_flags = 0;

	ssk->tx_ring.rdma_inflight = rx_sa;

	sge.addr = rx_sa->umem->offset;
	sge.length = rx_sa->umem->length;
	sge.lkey = rx_sa->fmr->fmr->lkey;

	wr.wr.rdma.remote_addr = rx_sa->vaddr + rx_sa->used;
	wr.num_sge = 1;
	wr.sg_list = &sge;
	rx_sa->busy++;

	wr.send_flags = IB_SEND_SIGNALED;

	return ib_post_send(ssk->qp, &wr, &bad_wr);
}

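/*
 * ZCopy receive of a SrcAvail: FMR-map the destination iovec, RDMA-read the
 * advertised data straight into it, wait for the read to finish and account
 * for the copied bytes. On failure with a live QP the rx_sa is marked
 * aborted so the receive path can notify the peer and fall back.
 */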
int sdp_rdma_to_iovec(struct socket *sk, struct iovec *iov, struct mbuf *mb,
		unsigned long *used)
{
	struct sdp_sock *ssk = sdp_sk(sk);
	struct rx_srcavail_state *rx_sa = RX_SRCAVAIL_STATE(mb);
	int got_srcavail_cancel;
	int rc = 0;
	int len = *used;
	int copied;

	sdp_dbg_data(ssk->socket, "preparing RDMA read."
		" len: 0x%x. buffer len: 0x%lx\n", len, iov->iov_len);

	sock_hold(sk, SOCK_REF_RDMA_RD);

	if (len > rx_sa->len) {
		sdp_warn(sk, "len:0x%x > rx_sa->len: 0x%x\n", len, rx_sa->len);
		WARN_ON(1);
		len = rx_sa->len;
	}

	rc = sdp_alloc_fmr(sk, iov->iov_base, len, &rx_sa->fmr, &rx_sa->umem);
	if (rc) {
		sdp_warn(sk, "Error allocating fmr: %d\n", rc);
		goto err_alloc_fmr;
	}

	rc = sdp_post_rdma_read(sk, rx_sa);
	if (unlikely(rc)) {
		sdp_warn(sk, "ib_post_send failed with status %d.\n", rc);
		sdp_set_error(ssk->socket, -ECONNRESET);
		wake_up(&ssk->wq);
		goto err_post_send;
	}

	sdp_prf(sk, mb, "Finished posting(rc=%d), now to wait", rc);

	got_srcavail_cancel = ssk->srcavail_cancel_mseq > rx_sa->mseq;

	sdp_arm_tx_cq(sk);

	sdp_wait_rdma_wr_finished(ssk);

	sdp_prf(sk, mb, "Finished waiting(rc=%d)", rc);
	if (!ssk->qp_active) {
		sdp_dbg_data(sk, "QP destroyed during RDMA read\n");
		rc = -EPIPE;
		goto err_post_send;
	}

	copied = rx_sa->umem->length;

	sdp_update_iov_used(sk, iov, copied);
	rx_sa->used += copied;
	atomic_add(copied, &ssk->rcv_nxt);
	*used = copied;

	ssk->tx_ring.rdma_inflight = NULL;

err_post_send:
	sdp_free_fmr(sk, &rx_sa->fmr, &rx_sa->umem);

err_alloc_fmr:
	if (rc && ssk->qp_active) {
		sdp_warn(sk, "Couldn't do RDMA - post sendsm\n");
		rx_sa->flags |= RX_SA_ABORTED;
	}

	sock_put(sk, SOCK_REF_RDMA_RD);

	return rc;
}

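/*
 * No tx slot is available for the SrcAvail: flush whatever we can, then
 * wait for send buffer space/credits to free up, bounded by the caller's
 * timeout.
 */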
static inline int wait_for_sndbuf(struct socket *sk, long *timeo_p)
{
	struct sdp_sock *ssk = sdp_sk(sk);
	int ret = 0;
	int credits_needed = 1;

	sdp_dbg_data(sk, "Wait for mem\n");

	set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);

	SDPSTATS_COUNTER_INC(send_wait_for_mem);

	sdp_do_posts(ssk);

	sdp_xmit_poll(ssk, 1);

	ret = sdp_tx_wait_memory(ssk, timeo_p, &credits_needed);

	return ret;
}

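/*
 * Send the current chunk of the user buffer using the ZCopy protocol: pin
 * and FMR-map it, post a SrcAvail, then sleep until the peer finishes
 * reading. If the wait fails, the SrcAvail is cancelled (or the connection
 * reset on a protocol error) and only the acknowledged bytes are consumed
 * from the iov.
 */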
static int do_sdp_sendmsg_zcopy(struct socket *sk, struct tx_srcavail_state *tx_sa,
		struct iovec *iov, long *timeo)
{
	struct sdp_sock *ssk = sdp_sk(sk);
	int rc = 0;
	unsigned long lock_flags;

	rc = sdp_alloc_fmr(sk, iov->iov_base, iov->iov_len,
			&tx_sa->fmr, &tx_sa->umem);
	if (rc) {
		sdp_warn(sk, "Error allocating fmr: %d\n", rc);
		goto err_alloc_fmr;
	}

	if (tx_slots_free(ssk) == 0) {
		rc = wait_for_sndbuf(sk, timeo);
		if (rc) {
			sdp_warn(sk, "Couldn't get send buffer\n");
			goto err_no_tx_slots;
		}
	}

	rc = sdp_post_srcavail(sk, tx_sa);
	if (rc) {
		sdp_dbg(sk, "Error posting SrcAvail\n");
		goto err_abort_send;
	}

	rc = sdp_wait_rdmardcompl(ssk, timeo, 0);
	if (unlikely(rc)) {
		enum tx_sa_flag f = tx_sa->abort_flags;

		if (f & TX_SA_SENDSM) {
			sdp_dbg_data(sk, "Got SendSM. use SEND verb.\n");
		} else if (f & TX_SA_ERROR) {
			sdp_dbg_data(sk, "SrcAvail error completion\n");
			sdp_reset(sk);
			SDPSTATS_COUNTER_INC(zcopy_tx_error);
		} else if (ssk->qp_active) {
			sdp_post_srcavail_cancel(sk);

			/* Wait for RdmaRdCompl/SendSM to
			 * finish the transaction */
			*timeo = 2 * HZ;
			sdp_dbg_data(sk, "Waiting for SendSM\n");
			sdp_wait_rdmardcompl(ssk, timeo, 1);
			sdp_dbg_data(sk, "finished waiting\n");

			cancel_delayed_work(&ssk->srcavail_cancel_work);
		} else {
			sdp_dbg_data(sk, "QP was destroyed while waiting\n");
		}
	} else {
		sdp_dbg_data(sk, "got RdmaRdCompl\n");
	}

	spin_lock_irqsave(&ssk->tx_sa_lock, lock_flags);
	ssk->tx_sa = NULL;
	spin_unlock_irqrestore(&ssk->tx_sa_lock, lock_flags);

err_abort_send:
	sdp_update_iov_used(sk, iov, tx_sa->bytes_acked);

err_no_tx_slots:
	sdp_free_fmr(sk, &tx_sa->fmr, &tx_sa->umem);

err_alloc_fmr:
	return rc;
}

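/*
 * Entry point for zero-copy sendmsg(): repeatedly call
 * do_sdp_sendmsg_zcopy() until the buffer is consumed, an abort condition
 * is hit, or the remainder drops below sdp_zcopy_thresh and is left to the
 * bcopy path. Returns the number of bytes consumed, or a negative errno on
 * hard failures.
 */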
int sdp_sendmsg_zcopy(struct kiocb *iocb, struct socket *sk, struct iovec *iov)
{
	struct sdp_sock *ssk = sdp_sk(sk);
	int rc = 0;
	long timeo;
	struct tx_srcavail_state *tx_sa;
	int offset;
	size_t bytes_to_copy = 0;
	int copied = 0;

	sdp_dbg_data(sk, "Sending iov: %p, iov_len: 0x%lx\n",
			iov->iov_base, iov->iov_len);
	sdp_prf1(sk, NULL, "sdp_sendmsg_zcopy start");
	if (ssk->rx_sa) {
		sdp_dbg_data(sk, "Deadlock prevention: crossing SrcAvail\n");
		return 0;
	}

	sock_hold(ssk->socket, SOCK_REF_ZCOPY);

	SDPSTATS_COUNTER_INC(sendmsg_zcopy_segment);

	timeo = SDP_SRCAVAIL_ADV_TIMEOUT;

	/* Ok commence sending. */
	offset = (unsigned long)iov->iov_base & (PAGE_SIZE - 1);

	tx_sa = kmalloc(sizeof(struct tx_srcavail_state), GFP_KERNEL);
	if (!tx_sa) {
		sdp_warn(sk, "Error allocating zcopy context\n");
		rc = -EAGAIN; /* Buffer too big - fallback to bcopy */
		goto err_alloc_tx_sa;
	}

	bytes_to_copy = iov->iov_len;
	do {
		tx_sa_reset(tx_sa);

		rc = do_sdp_sendmsg_zcopy(sk, tx_sa, iov, &timeo);

		if (iov->iov_len && iov->iov_len < sdp_zcopy_thresh) {
			sdp_dbg_data(sk, "0x%lx bytes left, switching to bcopy\n",
				iov->iov_len);
			break;
		}
	} while (!rc && iov->iov_len > 0 && !tx_sa->abort_flags);

	kfree(tx_sa);
err_alloc_tx_sa:
	copied = bytes_to_copy - iov->iov_len;

	sdp_prf1(sk, NULL, "sdp_sendmsg_zcopy end rc: %d copied: %d", rc, copied);

	sock_put(ssk->socket, SOCK_REF_ZCOPY);

	if (rc < 0 && rc != -EAGAIN && rc != -ETIME)
		return rc;

	return copied;
}

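/*
 * Tear down transmit-side SrcAvail state (e.g. when the socket is being
 * destroyed): stop the cancel timer, drop the FMR mapping and clear
 * ssk->tx_sa under tx_sa_lock.
 */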
void sdp_abort_srcavail(struct socket *sk)
{
	struct sdp_sock *ssk = sdp_sk(sk);
	struct tx_srcavail_state *tx_sa = ssk->tx_sa;
	unsigned long flags;

	if (!tx_sa)
		return;

	cancel_delayed_work(&ssk->srcavail_cancel_work);
	flush_scheduled_work();

	spin_lock_irqsave(&ssk->tx_sa_lock, flags);

	sdp_free_fmr(sk, &tx_sa->fmr, &tx_sa->umem);

	ssk->tx_sa = NULL;

	spin_unlock_irqrestore(&ssk->tx_sa_lock, flags);
}

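/*
 * Release receive-side SrcAvail state if an RDMA read was in progress:
 * drop the FMR mapping and clear ssk->rx_sa.
 */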
void sdp_abort_rdma_read(struct socket *sk)
{
	struct sdp_sock *ssk = sdp_sk(sk);
	struct rx_srcavail_state *rx_sa = ssk->rx_sa;

	if (!rx_sa)
		return;

	sdp_free_fmr(sk, &rx_sa->fmr, &rx_sa->umem);

	ssk->rx_sa = NULL;
}