/*
 * Copyright (c) 2006 Mellanox Technologies Ltd.  All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * OpenIB.org BSD license below:
 *
 *     Redistribution and use in source and binary forms, with or
 *     without modification, are permitted provided that the following
 *     conditions are met:
 *
 *      - Redistributions of source code must retain the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer.
 *
 *      - Redistributions in binary form must reproduce the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer in the documentation and/or other materials
 *        provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */
#include <linux/tcp.h>
#include <asm/ioctls.h>
#include <linux/workqueue.h>
#include <linux/net.h>
#include <linux/socket.h>
#include <net/protocol.h>
#include <net/inet_common.h>
#include <rdma/rdma_cm.h>
#include <rdma/ib_verbs.h>
#include <rdma/ib_fmr_pool.h>
#include <rdma/ib_umem.h>
#include <net/tcp.h> /* for memcpy_toiovec */
#include <asm/io.h>
#include <asm/uaccess.h>
#include <linux/delay.h>
#include "sdp.h"

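/*
 * Post a SrcAvail message advertising the FMR-mapped user buffer in @tx_sa.
 * The first part of the payload (up to the end of the first page, capped by
 * xmit_size_goal) is inlined in the SrcAvail packet itself; the remainder is
 * left for the peer to fetch with an RDMA read.
 */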
static int sdp_post_srcavail(struct socket *sk, struct tx_srcavail_state *tx_sa)
{
	struct sdp_sock *ssk = sdp_sk(sk);
	struct mbuf *mb;
	int payload_len;
	struct page *payload_pg;
	int off, len;
	struct ib_umem_chunk *chunk;

	WARN_ON(ssk->tx_sa);

	BUG_ON(!tx_sa);
	BUG_ON(!tx_sa->fmr || !tx_sa->fmr->fmr->lkey);
	BUG_ON(!tx_sa->umem);
	BUG_ON(!tx_sa->umem->chunk_list.next);

	chunk = list_entry(tx_sa->umem->chunk_list.next, struct ib_umem_chunk, list);
	BUG_ON(!chunk->nmap);

	off = tx_sa->umem->offset;
	len = tx_sa->umem->length;

	tx_sa->bytes_sent = tx_sa->bytes_acked = 0;

	mb = sdp_alloc_mb_srcavail(sk, len, tx_sa->fmr->fmr->lkey, off, 0);
	if (!mb)
		return -ENOMEM;

	sdp_dbg_data(sk, "sending SrcAvail\n");

	TX_SRCAVAIL_STATE(mb) = tx_sa; /* tx_sa is attached to the mb,
					 * but continues to live after the
					 * mb is freed */
	ssk->tx_sa = tx_sa;

	/* must have payload inlined in SrcAvail packet in combined mode */
	payload_len = MIN(tx_sa->umem->page_size - off, len);
	payload_len = MIN(payload_len, ssk->xmit_size_goal - sizeof(struct sdp_srcah));
	payload_pg  = sg_page(&chunk->page_list[0]);
	get_page(payload_pg);

	sdp_dbg_data(sk, "payload: off: 0x%x, pg: %p, len: 0x%x\n",
		off, payload_pg, payload_len);

	mb_fill_page_desc(mb, mb_shinfo(mb)->nr_frags,
			payload_pg, off, payload_len);

	mb->len             += payload_len;
	mb->data_len         = payload_len;
	mb->truesize        += payload_len;
//	sk->sk_wmem_queued   += payload_len;
//	sk->sk_forward_alloc -= payload_len;

	mb_entail(sk, ssk, mb);

	ssk->write_seq += payload_len;
	SDP_SKB_CB(mb)->end_seq += payload_len;

	tx_sa->bytes_sent = tx_sa->umem->length;
	tx_sa->bytes_acked = payload_len;

	/* TODO: pushing the mb into the tx_queue should be enough */

	return 0;
}

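/*
 * Queue a SrcAvailCancel message and arm a timeout; if the peer answers with
 * neither RdmaRdCompl nor SendSM before it expires, the connection is reset
 * by srcavail_cancel_timeout() below.
 */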
static int sdp_post_srcavail_cancel(struct socket *sk)
{
	struct sdp_sock *ssk = sdp_sk(sk);
	struct mbuf *mb;

	sdp_dbg_data(ssk->socket, "Posting srcavail cancel\n");

	mb = sdp_alloc_mb_srcavail_cancel(sk, 0);
	if (unlikely(!mb))
		return -ENOMEM;

	mb_entail(sk, ssk, mb);

	sdp_post_sends(ssk, 0);

	schedule_delayed_work(&ssk->srcavail_cancel_work,
			SDP_SRCAVAIL_CANCEL_TIMEOUT);

	return 0;
}

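/*
 * Delayed-work handler armed by sdp_post_srcavail_cancel(): if the peer did
 * not complete or abort the SrcAvail in time, give up and reset the socket.
 */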
void srcavail_cancel_timeout(struct work_struct *work)
{
	struct sdp_sock *ssk =
		container_of(work, struct sdp_sock, srcavail_cancel_work.work);
	struct socket *sk = ssk->socket;

	lock_sock(sk);

	sdp_dbg_data(sk, "both SrcAvail and SrcAvailCancel timed out."
			" Closing connection\n");
	sdp_set_error(sk, -ECONNRESET);
	wake_up(&ssk->wq);

	release_sock(sk);
}

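/*
 * Sleep until the peer acknowledges the whole SrcAvail with RdmaRdCompl
 * messages, or until the transfer is aborted (SendSM, signal, timeout,
 * crossing SrcAvail or protocol error).  Returns 0 on full completion, a
 * negative errno otherwise; the reason is also recorded in
 * tx_sa->abort_flags.
 */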
static int sdp_wait_rdmardcompl(struct sdp_sock *ssk, long *timeo_p,
		int ignore_signals)
{
	struct socket *sk = ssk->socket;
	int err = 0;
	long vm_wait = 0;
	long current_timeo = *timeo_p;
	struct tx_srcavail_state *tx_sa = ssk->tx_sa;
	DEFINE_WAIT(wait);

	sdp_dbg_data(sk, "sleep till RdmaRdCompl. timeo = %ld.\n", *timeo_p);
	sdp_prf1(sk, NULL, "Going to sleep");
	while (ssk->qp_active) {
		prepare_to_wait(sk->sk_sleep, &wait, TASK_INTERRUPTIBLE);

		if (unlikely(!*timeo_p)) {
			err = -ETIME;
			tx_sa->abort_flags |= TX_SA_TIMEDOUT;
			sdp_prf1(sk, NULL, "timeout");
			SDPSTATS_COUNTER_INC(zcopy_tx_timeout);
			break;
		} else if (tx_sa->bytes_acked > tx_sa->bytes_sent) {
			err = -EINVAL;
			sdp_dbg_data(sk, "acked bytes > sent bytes\n");
			tx_sa->abort_flags |= TX_SA_ERROR;
			break;
		}

		if (tx_sa->abort_flags & TX_SA_SENDSM) {
			sdp_prf1(sk, NULL, "Aborting SrcAvail sending");
			SDPSTATS_COUNTER_INC(zcopy_tx_aborted);
			err = -EAGAIN;
			break;
		}

		if (!ignore_signals) {
			if (signal_pending(current)) {
				err = -EINTR;
				sdp_prf1(sk, NULL, "signalled");
				tx_sa->abort_flags |= TX_SA_INTRRUPTED;
				break;
			}

			if (ssk->rx_sa && (tx_sa->bytes_acked < tx_sa->bytes_sent)) {
				sdp_dbg_data(sk, "Crossing SrcAvail - aborting this\n");
				tx_sa->abort_flags |= TX_SA_CROSS_SEND;
				SDPSTATS_COUNTER_INC(zcopy_cross_send);
				err = -ETIME;
				break;
			}
		}

		posts_handler_put(ssk);

		sk_wait_event(sk, &current_timeo,
				tx_sa->abort_flags &&
				ssk->rx_sa &&
				(tx_sa->bytes_acked < tx_sa->bytes_sent) &&
				vm_wait);
		sdp_dbg_data(ssk->socket, "woke up sleepers\n");

		posts_handler_get(ssk);

		if (tx_sa->bytes_acked == tx_sa->bytes_sent)
			break;

		if (vm_wait) {
			vm_wait -= current_timeo;
			current_timeo = *timeo_p;
			if (current_timeo != MAX_SCHEDULE_TIMEOUT &&
			    (current_timeo -= vm_wait) < 0)
				current_timeo = 0;
			vm_wait = 0;
		}
		*timeo_p = current_timeo;
	}

	finish_wait(sk->sk_sleep, &wait);

	sdp_dbg_data(sk, "Finished waiting - RdmaRdCompl: %d/%d bytes, flags: 0x%x\n",
			tx_sa->bytes_acked, tx_sa->bytes_sent, tx_sa->abort_flags);

	if (!ssk->qp_active) {
		sdp_dbg(sk, "QP destroyed while waiting\n");
		return -EINVAL;
	}
	return err;
}

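/*
 * Block (uninterruptibly) until the locally posted RDMA read completes,
 * the QP goes down, or a 5 second watchdog expires.
 */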
static void sdp_wait_rdma_wr_finished(struct sdp_sock *ssk)
{
	struct socket *sk = ssk->socket;
	long timeo = HZ * 5; /* Timeout for RDMA read */
	DEFINE_WAIT(wait);

	sdp_dbg_data(sk, "Sleep till RDMA wr finished.\n");
	while (1) {
		prepare_to_wait(sk->sk_sleep, &wait, TASK_UNINTERRUPTIBLE);

		if (!ssk->tx_ring.rdma_inflight->busy) {
			sdp_dbg_data(sk, "got rdma cqe\n");
			break;
		}

		if (!ssk->qp_active) {
			sdp_dbg_data(sk, "QP destroyed\n");
			break;
		}

		if (!timeo) {
			sdp_warn(sk, "Panic: Timed out waiting for RDMA read\n");
			WARN_ON(1);
			break;
		}

		posts_handler_put(ssk);

		sdp_prf1(sk, NULL, "Going to sleep");
		sk_wait_event(sk, &timeo,
			!ssk->tx_ring.rdma_inflight->busy);
		sdp_prf1(sk, NULL, "Woke up");
		sdp_dbg_data(ssk->socket, "woke up sleepers\n");

		posts_handler_get(ssk);
	}

	finish_wait(sk->sk_sleep, &wait);

	sdp_dbg_data(sk, "Finished waiting\n");
}

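/*
 * Report RDMA read progress to the peer: send an RdmaRdCompl carrying the
 * number of bytes consumed from the SrcAvail since the last report.
 */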
int sdp_post_rdma_rd_compl(struct sdp_sock *ssk,
		struct rx_srcavail_state *rx_sa)
{
	struct mbuf *mb;
	int copied = rx_sa->used - rx_sa->reported;

	if (rx_sa->used <= rx_sa->reported)
		return 0;

	mb = sdp_alloc_mb_rdmardcompl(ssk->socket, copied, 0);
	if (unlikely(!mb))
		return -ENOMEM;

	rx_sa->reported += copied;

	/* TODO: What if no tx_credits available? */
	sdp_post_send(ssk, mb);

	return 0;
}

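/*
 * Send a SendSM message, telling the peer to abandon its SrcAvail and
 * resend the data inline (bcopy).
 */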
int sdp_post_sendsm(struct socket *sk)
{
	struct mbuf *mb = sdp_alloc_mb_sendsm(sk, 0);

	if (unlikely(!mb))
		return -ENOMEM;

	sdp_post_send(sdp_sk(sk), mb);

	return 0;
}

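/*
 * Advance the iovec by @len consumed bytes, skipping over fully used
 * entries.
 */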
static int sdp_update_iov_used(struct socket *sk, struct iovec *iov, int len)
{
	sdp_dbg_data(sk, "updating consumed 0x%x bytes from iov\n", len);
	while (len > 0) {
		if (iov->iov_len) {
			int copy = min_t(unsigned int, iov->iov_len, len);
			len -= copy;
			iov->iov_len -= copy;
			iov->iov_base += copy;
		}
		iov++;
	}

	return 0;
}

static inline int sge_bytes(struct ib_sge *sge, int sge_cnt)
{
	int bytes = 0;

	while (sge_cnt > 0) {
		bytes += sge->length;
		sge++;
		sge_cnt--;
	}

	return bytes;
}
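
/*
 * Handle an incoming SendSM: the peer refuses our SrcAvail (identified by
 * mseq_ack), so mark it aborted and wake the sender, which will fall back
 * to bcopy.
 */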
void sdp_handle_sendsm(struct sdp_sock *ssk, u32 mseq_ack)
{
	struct socket *sk = ssk->socket;
	unsigned long flags;

	spin_lock_irqsave(&ssk->tx_sa_lock, flags);

	if (!ssk->tx_sa) {
		sdp_prf1(sk, NULL, "SendSM for cancelled/finished SrcAvail");
		goto out;
	}

	if (ssk->tx_sa->mseq > mseq_ack) {
		sdp_dbg_data(sk, "SendSM arrived for old SrcAvail. "
			"SendSM mseq_ack: 0x%x, SrcAvail mseq: 0x%x\n",
			mseq_ack, ssk->tx_sa->mseq);
		goto out;
	}

	sdp_dbg_data(sk, "Got SendSM - aborting SrcAvail\n");

	ssk->tx_sa->abort_flags |= TX_SA_SENDSM;
	cancel_delayed_work(&ssk->srcavail_cancel_work);

	wake_up(sk->sk_sleep);
	sdp_dbg_data(sk, "woke up sleepers\n");

out:
	spin_unlock_irqrestore(&ssk->tx_sa_lock, flags);
}

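/*
 * Handle an incoming RdmaRdCompl: credit the acknowledged bytes to the
 * outstanding SrcAvail and wake up the waiting sender.
 */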
void sdp_handle_rdma_read_compl(struct sdp_sock *ssk, u32 mseq_ack,
		u32 bytes_completed)
{
	struct socket *sk = ssk->socket;
	unsigned long flags;

	sdp_prf1(sk, NULL, "RdmaRdCompl ssk=%p tx_sa=%p", ssk, ssk->tx_sa);
	sdp_dbg_data(sk, "RdmaRdCompl ssk=%p tx_sa=%p\n", ssk, ssk->tx_sa);

	spin_lock_irqsave(&ssk->tx_sa_lock, flags);

	BUG_ON(!ssk);

	if (!ssk->tx_sa) {
		sdp_dbg_data(sk, "Got RdmaRdCompl for aborted SrcAvail\n");
		goto out;
	}

	if (ssk->tx_sa->mseq > mseq_ack) {
		sdp_dbg_data(sk, "RdmaRdCompl arrived for old SrcAvail. "
			"RdmaRdCompl mseq_ack: 0x%x, SrcAvail mseq: 0x%x\n",
			mseq_ack, ssk->tx_sa->mseq);
		goto out;
	}

	ssk->tx_sa->bytes_acked += bytes_completed;

	wake_up(sk->sk_sleep);
	sdp_dbg_data(sk, "woke up sleepers\n");

out:
	spin_unlock_irqrestore(&ssk->tx_sa_lock, flags);
	return;
}

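/*
 * Return how many more bytes this task may pin, based on RLIMIT_MEMLOCK and
 * what is already locked; @offset accounts for the unaligned start of the
 * buffer.  Tasks with CAP_IPC_LOCK are unlimited.
 */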
static unsigned long sdp_get_max_memlockable_bytes(unsigned long offset)
{
	unsigned long avail;
	unsigned long lock_limit;

	if (capable(CAP_IPC_LOCK))
		return ULONG_MAX;

	lock_limit = current->signal->rlim[RLIMIT_MEMLOCK].rlim_cur;
	avail = lock_limit - (current->mm->locked_vm << PAGE_SHIFT);

	return avail - offset;
}

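/*
 * Pin the user buffer at @uaddr and map it into an FMR from the device's
 * FMR pool.  @len may be silently trimmed to SDP_MAX_RDMA_READ_LEN and to
 * the RLIMIT_MEMLOCK budget; callers learn the actual length from
 * (*_umem)->length.
 */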
static int sdp_alloc_fmr(struct socket *sk, void *uaddr, size_t len,
	struct ib_pool_fmr **_fmr, struct ib_umem **_umem)
{
	struct ib_pool_fmr *fmr;
	struct ib_umem *umem;
	struct ib_device *dev;
	u64 *pages;
	struct ib_umem_chunk *chunk;
	int n, j, k;
	int rc = 0;
	unsigned long max_lockable_bytes;

	if (unlikely(len > SDP_MAX_RDMA_READ_LEN)) {
		sdp_dbg_data(sk, "len:0x%lx > FMR_SIZE: 0x%lx\n",
			len, SDP_MAX_RDMA_READ_LEN);
		len = SDP_MAX_RDMA_READ_LEN;
	}

	max_lockable_bytes = sdp_get_max_memlockable_bytes((unsigned long)uaddr & ~PAGE_MASK);
	if (unlikely(len > max_lockable_bytes)) {
		sdp_dbg_data(sk, "len:0x%lx > RLIMIT_MEMLOCK available: 0x%lx\n",
			len, max_lockable_bytes);
		len = max_lockable_bytes;
	}

	sdp_dbg_data(sk, "user buf: %p, len:0x%lx max_lockable_bytes: 0x%lx\n",
			uaddr, len, max_lockable_bytes);

	umem = ib_umem_get(&sdp_sk(sk)->context, (unsigned long)uaddr, len,
		IB_ACCESS_REMOTE_WRITE, 0);

	if (IS_ERR(umem)) {
		rc = PTR_ERR(umem);
		sdp_warn(sk, "Error doing umem_get 0x%lx bytes: %d\n", len, rc);
		sdp_warn(sk, "RLIMIT_MEMLOCK: 0x%lx[cur] 0x%lx[max] CAP_IPC_LOCK: %d\n",
				current->signal->rlim[RLIMIT_MEMLOCK].rlim_cur,
				current->signal->rlim[RLIMIT_MEMLOCK].rlim_max,
				capable(CAP_IPC_LOCK));
		goto err_umem_get;
	}

	sdp_dbg_data(sk, "umem->offset = 0x%x, length = 0x%lx\n",
		umem->offset, umem->length);

	pages = (u64 *) __get_free_page(GFP_KERNEL);
	if (!pages) {
		rc = -ENOMEM;
		goto err_pages_alloc;
	}

	n = 0;

	dev = sdp_sk(sk)->ib_device;
	list_for_each_entry(chunk, &umem->chunk_list, list) {
		for (j = 0; j < chunk->nmap; ++j) {
			len = ib_sg_dma_len(dev,
					&chunk->page_list[j]) >> PAGE_SHIFT;

			for (k = 0; k < len; ++k) {
				pages[n++] = ib_sg_dma_address(dev,
						&chunk->page_list[j]) +
					umem->page_size * k;
			}
		}
	}

	fmr = ib_fmr_pool_map_phys(sdp_sk(sk)->sdp_dev->fmr_pool, pages, n, 0);
	if (IS_ERR(fmr)) {
		rc = PTR_ERR(fmr);
		sdp_warn(sk, "Error allocating fmr: %d\n", rc);
		goto err_fmr_alloc;
	}

	free_page((unsigned long) pages);

	*_umem = umem;
	*_fmr = fmr;

	return 0;

err_fmr_alloc:
	free_page((unsigned long) pages);

err_pages_alloc:
	ib_umem_release(umem);

err_umem_get:

	return rc;
}

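/*
 * Undo sdp_alloc_fmr(): return the FMR to the pool and release the pinned
 * umem.  Note that nothing is freed once the QP is no longer active
 * (original behaviour, kept as-is).
 */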
void sdp_free_fmr(struct socket *sk, struct ib_pool_fmr **_fmr, struct ib_umem **_umem)
{
	if (!sdp_sk(sk)->qp_active)
		return;

	ib_fmr_pool_unmap(*_fmr);
	*_fmr = NULL;

	ib_umem_release(*_umem);
	*_umem = NULL;
}

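/*
 * Post a single signalled RDMA read that pulls the peer's advertised buffer
 * (rx_sa->vaddr/rkey) into the locally FMR-mapped umem.
 */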
static int sdp_post_rdma_read(struct socket *sk, struct rx_srcavail_state *rx_sa)
{
	struct sdp_sock *ssk = sdp_sk(sk);
	struct ib_send_wr *bad_wr;
	struct ib_send_wr wr = { NULL };
	struct ib_sge sge;

	wr.opcode = IB_WR_RDMA_READ;
	wr.next = NULL;
	wr.wr_id = SDP_OP_RDMA;
	wr.wr.rdma.rkey = rx_sa->rkey;
	wr.send_flags = 0;

	ssk->tx_ring.rdma_inflight = rx_sa;

	sge.addr = rx_sa->umem->offset;
	sge.length = rx_sa->umem->length;
	sge.lkey = rx_sa->fmr->fmr->lkey;

	wr.wr.rdma.remote_addr = rx_sa->vaddr + rx_sa->used;
	wr.num_sge = 1;
	wr.sg_list = &sge;
	rx_sa->busy++;

	wr.send_flags = IB_SEND_SIGNALED;

	return ib_post_send(ssk->qp, &wr, &bad_wr);
}

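/*
 * Zero-copy receive path for an incoming SrcAvail: pin and FMR-map the user
 * iovec, RDMA-read the peer's buffer into it, wait for the read to complete
 * and account the copied bytes.  On failure the SrcAvail is marked aborted
 * so the caller can ask the peer to resend inline.
 */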
int sdp_rdma_to_iovec(struct socket *sk, struct iovec *iov, struct mbuf *mb,
		unsigned long *used)
{
	struct sdp_sock *ssk = sdp_sk(sk);
	struct rx_srcavail_state *rx_sa = RX_SRCAVAIL_STATE(mb);
	int got_srcavail_cancel;
	int rc = 0;
	int len = *used;
	int copied;

	sdp_dbg_data(ssk->socket, "preparing RDMA read."
		" len: 0x%x. buffer len: 0x%lx\n", len, iov->iov_len);

	sock_hold(sk, SOCK_REF_RDMA_RD);

	if (len > rx_sa->len) {
		sdp_warn(sk, "len:0x%x > rx_sa->len: 0x%x\n", len, rx_sa->len);
		WARN_ON(1);
		len = rx_sa->len;
	}

	rc = sdp_alloc_fmr(sk, iov->iov_base, len, &rx_sa->fmr, &rx_sa->umem);
	if (rc) {
		sdp_warn(sk, "Error allocating fmr: %d\n", rc);
		goto err_alloc_fmr;
	}

	rc = sdp_post_rdma_read(sk, rx_sa);
	if (unlikely(rc)) {
		sdp_warn(sk, "ib_post_send failed with status %d.\n", rc);
		sdp_set_error(ssk->socket, -ECONNRESET);
		wake_up(&ssk->wq);
		goto err_post_send;
	}

	sdp_prf(sk, mb, "Finished posting(rc=%d), now to wait", rc);

	got_srcavail_cancel = ssk->srcavail_cancel_mseq > rx_sa->mseq;

	sdp_arm_tx_cq(sk);

	sdp_wait_rdma_wr_finished(ssk);

	sdp_prf(sk, mb, "Finished waiting(rc=%d)", rc);
	if (!ssk->qp_active) {
		sdp_dbg_data(sk, "QP destroyed during RDMA read\n");
		rc = -EPIPE;
		goto err_post_send;
	}

	copied = rx_sa->umem->length;

	sdp_update_iov_used(sk, iov, copied);
	rx_sa->used += copied;
	atomic_add(copied, &ssk->rcv_nxt);
	*used = copied;

	ssk->tx_ring.rdma_inflight = NULL;

err_post_send:
	sdp_free_fmr(sk, &rx_sa->fmr, &rx_sa->umem);

err_alloc_fmr:
	if (rc && ssk->qp_active) {
		sdp_warn(sk, "Couldn't do RDMA - post sendsm\n");
		rx_sa->flags |= RX_SA_ABORTED;
	}

	sock_put(sk, SOCK_REF_RDMA_RD);

	return rc;
}

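/*
 * Flush pending posts and wait until at least one tx slot (send credit)
 * is available.
 */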
static inline int wait_for_sndbuf(struct socket *sk, long *timeo_p)
{
	struct sdp_sock *ssk = sdp_sk(sk);
	int ret = 0;
	int credits_needed = 1;

	sdp_dbg_data(sk, "Wait for mem\n");

	set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);

	SDPSTATS_COUNTER_INC(send_wait_for_mem);

	sdp_do_posts(ssk);

	sdp_xmit_poll(ssk, 1);

	ret = sdp_tx_wait_memory(ssk, timeo_p, &credits_needed);

	return ret;
}

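/*
 * Send one iovec entry with zero copy: map it, post a SrcAvail and wait for
 * the peer to RDMA-read it.  If the peer objects (SendSM), errors out or
 * stalls, cancel the SrcAvail and let the caller fall back to bcopy.  The
 * iovec is advanced by the number of bytes the peer acknowledged.
 */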
static int do_sdp_sendmsg_zcopy(struct socket *sk, struct tx_srcavail_state *tx_sa,
		struct iovec *iov, long *timeo)
{
	struct sdp_sock *ssk = sdp_sk(sk);
	int rc = 0;
	unsigned long lock_flags;

	rc = sdp_alloc_fmr(sk, iov->iov_base, iov->iov_len,
			&tx_sa->fmr, &tx_sa->umem);
	if (rc) {
		sdp_warn(sk, "Error allocating fmr: %d\n", rc);
		goto err_alloc_fmr;
	}

	if (tx_slots_free(ssk) == 0) {
		rc = wait_for_sndbuf(sk, timeo);
		if (rc) {
			sdp_warn(sk, "Couldn't get send buffer\n");
			goto err_no_tx_slots;
		}
	}

	rc = sdp_post_srcavail(sk, tx_sa);
	if (rc) {
		sdp_dbg(sk, "Error posting SrcAvail\n");
		goto err_abort_send;
	}

	rc = sdp_wait_rdmardcompl(ssk, timeo, 0);
	if (unlikely(rc)) {
		enum tx_sa_flag f = tx_sa->abort_flags;

		if (f & TX_SA_SENDSM) {
			sdp_dbg_data(sk, "Got SendSM. use SEND verb.\n");
		} else if (f & TX_SA_ERROR) {
			sdp_dbg_data(sk, "SrcAvail error completion\n");
			sdp_reset(sk);
			SDPSTATS_COUNTER_INC(zcopy_tx_error);
		} else if (ssk->qp_active) {
			sdp_post_srcavail_cancel(sk);

			/* Wait for RdmaRdCompl/SendSM to
			 * finish the transaction */
			*timeo = 2 * HZ;
			sdp_dbg_data(sk, "Waiting for SendSM\n");
			sdp_wait_rdmardcompl(ssk, timeo, 1);
			sdp_dbg_data(sk, "finished waiting\n");

			cancel_delayed_work(&ssk->srcavail_cancel_work);
		} else {
			sdp_dbg_data(sk, "QP was destroyed while waiting\n");
		}
	} else {
		sdp_dbg_data(sk, "got RdmaRdCompl\n");
	}

	spin_lock_irqsave(&ssk->tx_sa_lock, lock_flags);
	ssk->tx_sa = NULL;
	spin_unlock_irqrestore(&ssk->tx_sa_lock, lock_flags);

err_abort_send:
	sdp_update_iov_used(sk, iov, tx_sa->bytes_acked);

err_no_tx_slots:
	sdp_free_fmr(sk, &tx_sa->fmr, &tx_sa->umem);

err_alloc_fmr:
	return rc;
}

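/*
 * Entry point for zero-copy sendmsg.  Repeatedly posts SrcAvails for the
 * iovec until it is exhausted, an abort flag is raised, or the remainder
 * drops below sdp_zcopy_thresh (in which case the caller finishes with
 * bcopy).  Returns the number of bytes consumed from the iovec, or a
 * negative errno.
 */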
int sdp_sendmsg_zcopy(struct kiocb *iocb, struct socket *sk, struct iovec *iov)
{
	struct sdp_sock *ssk = sdp_sk(sk);
	int rc = 0;
	long timeo;
	struct tx_srcavail_state *tx_sa;
	int offset;
	size_t bytes_to_copy = 0;
	int copied = 0;

	sdp_dbg_data(sk, "Sending iov: %p, iov_len: 0x%lx\n",
			iov->iov_base, iov->iov_len);
	sdp_prf1(sk, NULL, "sdp_sendmsg_zcopy start");
	if (ssk->rx_sa) {
		sdp_dbg_data(sk, "Deadlock prevent: crossing SrcAvail\n");
		return 0;
	}

	sock_hold(ssk->socket, SOCK_REF_ZCOPY);

	SDPSTATS_COUNTER_INC(sendmsg_zcopy_segment);

	timeo = SDP_SRCAVAIL_ADV_TIMEOUT;

	/* Ok commence sending. */
	offset = (unsigned long)iov->iov_base & (PAGE_SIZE - 1);

	tx_sa = kmalloc(sizeof(struct tx_srcavail_state), GFP_KERNEL);
	if (!tx_sa) {
		sdp_warn(sk, "Error allocating zcopy context\n");
		rc = -EAGAIN; /* Buffer too big - fallback to bcopy */
		goto err_alloc_tx_sa;
	}

	bytes_to_copy = iov->iov_len;
	do {
		tx_sa_reset(tx_sa);

		rc = do_sdp_sendmsg_zcopy(sk, tx_sa, iov, &timeo);

		if (iov->iov_len && iov->iov_len < sdp_zcopy_thresh) {
			sdp_dbg_data(sk, "0x%lx bytes left, switching to bcopy\n",
				iov->iov_len);
			break;
		}
	} while (!rc && iov->iov_len > 0 && !tx_sa->abort_flags);

	kfree(tx_sa);
err_alloc_tx_sa:
	copied = bytes_to_copy - iov->iov_len;

	sdp_prf1(sk, NULL, "sdp_sendmsg_zcopy end rc: %d copied: %d", rc, copied);

	sock_put(ssk->socket, SOCK_REF_ZCOPY);

	if (rc < 0 && rc != -EAGAIN && rc != -ETIME)
		return rc;

	return copied;
}

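/*
 * Tear down an outstanding tx SrcAvail (e.g. on socket shutdown): cancel the
 * pending cancel-timeout work and release the FMR/umem.
 */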
void sdp_abort_srcavail(struct socket *sk)
{
	struct sdp_sock *ssk = sdp_sk(sk);
	struct tx_srcavail_state *tx_sa = ssk->tx_sa;
	unsigned long flags;

	if (!tx_sa)
		return;

	cancel_delayed_work(&ssk->srcavail_cancel_work);
	flush_scheduled_work();

	spin_lock_irqsave(&ssk->tx_sa_lock, flags);

	sdp_free_fmr(sk, &tx_sa->fmr, &tx_sa->umem);

	ssk->tx_sa = NULL;

	spin_unlock_irqrestore(&ssk->tx_sa_lock, flags);
}

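/*
 * Tear down an in-flight rx SrcAvail and release its FMR/umem.
 */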
void sdp_abort_rdma_read(struct socket *sk)
{
	struct sdp_sock *ssk = sdp_sk(sk);
	struct rx_srcavail_state *rx_sa = ssk->rx_sa;

	if (!rx_sa)
		return;

	sdp_free_fmr(sk, &rx_sa->fmr, &rx_sa->umem);

	ssk->rx_sa = NULL;
}