// SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
/*
 * Copyright (c) 2015-2018 Oracle. All rights reserved.
 * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
 * Copyright (c) 2005-2007 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * Author: Tom Tucker <tom@opengridcomputing.com>
 */

#include <linux/interrupt.h>
#include <linux/sched.h>
#include <linux/slab.h>
#include <linux/spinlock.h>
#include <linux/workqueue.h>
#include <linux/export.h>

#include <rdma/ib_verbs.h>
#include <rdma/rdma_cm.h>
#include <rdma/rw.h>

#include <linux/sunrpc/addr.h>
#include <linux/sunrpc/debug.h>
#include <linux/sunrpc/svc_xprt.h>
#include <linux/sunrpc/svc_rdma.h>

#include "xprt_rdma.h"
#include <trace/events/rpcrdma.h>

#define RPCDBG_FACILITY	RPCDBG_SVCXPRT

static struct svcxprt_rdma *svc_rdma_create_xprt(struct svc_serv *serv,
						 struct net *net, int node);
static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
					struct net *net,
					struct sockaddr *sa, int salen,
					int flags);
static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt);
static void svc_rdma_detach(struct svc_xprt *xprt);
static void svc_rdma_free(struct svc_xprt *xprt);
static int svc_rdma_has_wspace(struct svc_xprt *xprt);
static void svc_rdma_kill_temp_xprt(struct svc_xprt *xprt);

static const struct svc_xprt_ops svc_rdma_ops = {
	.xpo_create = svc_rdma_create,
	.xpo_recvfrom = svc_rdma_recvfrom,
	.xpo_sendto = svc_rdma_sendto,
	.xpo_result_payload = svc_rdma_result_payload,
	.xpo_release_ctxt = svc_rdma_release_ctxt,
	.xpo_detach = svc_rdma_detach,
	.xpo_free = svc_rdma_free,
	.xpo_has_wspace = svc_rdma_has_wspace,
	.xpo_accept = svc_rdma_accept,
	.xpo_kill_temp_xprt = svc_rdma_kill_temp_xprt,
};

struct svc_xprt_class svc_rdma_class = {
	.xcl_name = "rdma",
	.xcl_owner = THIS_MODULE,
	.xcl_ops = &svc_rdma_ops,
	.xcl_max_payload = RPCSVC_MAXPAYLOAD_RDMA,
	.xcl_ident = XPRT_TRANSPORT_RDMA,
};
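
/* svc_rdma_class is plugged into the generic svc transport switch at
 * module load time. A minimal sketch of that registration, which lives
 * in svc_rdma.c rather than here (shown only for orientation):
 *
 *	svc_reg_xprt_class(&svc_rdma_class);
 *	...
 *	svc_unreg_xprt_class(&svc_rdma_class);
 */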

/* QP event handler */
static void qp_event_handler(struct ib_event *event, void *context)
{
	struct svc_xprt *xprt = context;

	trace_svcrdma_qp_error(event, (struct sockaddr *)&xprt->xpt_remote);
	switch (event->event) {
	/* These are considered benign events */
	case IB_EVENT_PATH_MIG:
	case IB_EVENT_COMM_EST:
	case IB_EVENT_SQ_DRAINED:
	case IB_EVENT_QP_LAST_WQE_REACHED:
		break;

	/* These are considered fatal events */
	case IB_EVENT_PATH_MIG_ERR:
	case IB_EVENT_QP_FATAL:
	case IB_EVENT_QP_REQ_ERR:
	case IB_EVENT_QP_ACCESS_ERR:
	case IB_EVENT_DEVICE_FATAL:
	default:
		svc_xprt_deferred_close(xprt);
		break;
	}
}

static struct svcxprt_rdma *svc_rdma_create_xprt(struct svc_serv *serv,
						 struct net *net, int node)
{
	static struct lock_class_key svcrdma_rwctx_lock;
	static struct lock_class_key svcrdma_sctx_lock;
	static struct lock_class_key svcrdma_dto_lock;
	struct svcxprt_rdma *cma_xprt;

	cma_xprt = kzalloc_node(sizeof(*cma_xprt), GFP_KERNEL, node);
	if (!cma_xprt)
		return NULL;

	svc_xprt_init(net, &svc_rdma_class, &cma_xprt->sc_xprt, serv);
	INIT_LIST_HEAD(&cma_xprt->sc_accept_q);
	INIT_LIST_HEAD(&cma_xprt->sc_rq_dto_q);
	INIT_LIST_HEAD(&cma_xprt->sc_read_complete_q);
	init_llist_head(&cma_xprt->sc_send_ctxts);
	init_llist_head(&cma_xprt->sc_recv_ctxts);
	init_llist_head(&cma_xprt->sc_rw_ctxts);
	init_waitqueue_head(&cma_xprt->sc_send_wait);

	spin_lock_init(&cma_xprt->sc_lock);
	spin_lock_init(&cma_xprt->sc_rq_dto_lock);
	lockdep_set_class(&cma_xprt->sc_rq_dto_lock, &svcrdma_dto_lock);
	spin_lock_init(&cma_xprt->sc_send_lock);
	lockdep_set_class(&cma_xprt->sc_send_lock, &svcrdma_sctx_lock);
	spin_lock_init(&cma_xprt->sc_rw_ctxt_lock);
	lockdep_set_class(&cma_xprt->sc_rw_ctxt_lock, &svcrdma_rwctx_lock);

	/*
	 * Note that this implies that the underlying transport has some
	 * form of congestion control (see RFC 7530 section 3.1
	 * paragraph 2). For now, we assume that all supported RDMA
	 * transports are suitable here.
	 */
	set_bit(XPT_CONG_CTRL, &cma_xprt->sc_xprt.xpt_flags);

	return cma_xprt;
}

static void
svc_rdma_parse_connect_private(struct svcxprt_rdma *newxprt,
			       struct rdma_conn_param *param)
{
	const struct rpcrdma_connect_private *pmsg = param->private_data;

	if (pmsg &&
	    pmsg->cp_magic == rpcrdma_cmp_magic &&
	    pmsg->cp_version == RPCRDMA_CMP_VERSION) {
		newxprt->sc_snd_w_inv = pmsg->cp_flags &
					RPCRDMA_CMP_F_SND_W_INV_OK;

		dprintk("svcrdma: client send_size %u, recv_size %u "
			"remote inv %ssupported\n",
			rpcrdma_decode_buffer_size(pmsg->cp_send_size),
			rpcrdma_decode_buffer_size(pmsg->cp_recv_size),
			newxprt->sc_snd_w_inv ? "" : "un");
	}
}
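
/* For orientation: the private message parsed above is the RDMA-CM
 * Private Data format for RPC-over-RDMA version 1 (see RFC 8797).
 * Its wire layout is roughly:
 *
 *	struct rpcrdma_connect_private {
 *		__be32	cp_magic;	// rpcrdma_cmp_magic
 *		u8	cp_version;	// RPCRDMA_CMP_VERSION
 *		u8	cp_flags;	// e.g. RPCRDMA_CMP_F_SND_W_INV_OK
 *		u8	cp_send_size;	// encoded inline buffer sizes;
 *		u8	cp_recv_size;	// see rpcrdma_decode_buffer_size()
 *	} __packed;
 */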

/*
 * This function handles the CONNECT_REQUEST event on a listening
 * endpoint. It is passed the cma_id for the _new_ connection. The context in
 * this cma_id is inherited from the listening cma_id and points to the
 * svcxprt_rdma for the listening endpoint.
 *
 * This function creates a new xprt for the new connection and enqueues it on
 * the accept queue of the listening xprt. When a server thread is kicked, it
 * calls the accept method on the listening xprt, which completes the new
 * connection.
 */
static void handle_connect_req(struct rdma_cm_id *new_cma_id,
			       struct rdma_conn_param *param)
{
	struct svcxprt_rdma *listen_xprt = new_cma_id->context;
	struct svcxprt_rdma *newxprt;
	struct sockaddr *sa;

	newxprt = svc_rdma_create_xprt(listen_xprt->sc_xprt.xpt_server,
				       listen_xprt->sc_xprt.xpt_net,
				       ibdev_to_node(new_cma_id->device));
	if (!newxprt)
		return;
	newxprt->sc_cm_id = new_cma_id;
	new_cma_id->context = newxprt;
	svc_rdma_parse_connect_private(newxprt, param);

	/* Save the client's advertised inbound read limit for use later
	 * in accept.
	 */
	newxprt->sc_ord = param->initiator_depth;

	sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.dst_addr;
	newxprt->sc_xprt.xpt_remotelen = svc_addr_len(sa);
	memcpy(&newxprt->sc_xprt.xpt_remote, sa,
	       newxprt->sc_xprt.xpt_remotelen);
	snprintf(newxprt->sc_xprt.xpt_remotebuf,
		 sizeof(newxprt->sc_xprt.xpt_remotebuf) - 1, "%pISc", sa);

	/* The remote port is arbitrary and not under the control of the
	 * client ULP. Set it to a fixed value so that the DRC continues
	 * to be effective after a reconnect.
	 */
	rpc_set_port((struct sockaddr *)&newxprt->sc_xprt.xpt_remote, 0);

	sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.src_addr;
	svc_xprt_set_local(&newxprt->sc_xprt, sa, svc_addr_len(sa));

	/*
	 * Enqueue the new transport on the accept queue of the listening
	 * transport.
	 */
	spin_lock(&listen_xprt->sc_lock);
	list_add_tail(&newxprt->sc_accept_q, &listen_xprt->sc_accept_q);
	spin_unlock(&listen_xprt->sc_lock);

	set_bit(XPT_CONN, &listen_xprt->sc_xprt.xpt_flags);
	svc_xprt_enqueue(&listen_xprt->sc_xprt);
}

/**
 * svc_rdma_listen_handler - Handle CM events generated on a listening endpoint
 * @cma_id: the server's listener rdma_cm_id
 * @event: details of the event
 *
 * Return values:
 *     %0: Do not destroy @cma_id
 *     %1: Destroy @cma_id (never returned here)
 *
 * NB: There is never a DEVICE_REMOVAL event for INADDR_ANY listeners.
 */
static int svc_rdma_listen_handler(struct rdma_cm_id *cma_id,
				   struct rdma_cm_event *event)
{
	switch (event->event) {
	case RDMA_CM_EVENT_CONNECT_REQUEST:
		handle_connect_req(cma_id, &event->param.conn);
		break;
	default:
		break;
	}
	return 0;
}

/**
 * svc_rdma_cma_handler - Handle CM events on client connections
 * @cma_id: the connection's rdma_cm_id
 * @event: details of the event
 *
 * Return values:
 *     %0: Do not destroy @cma_id
 *     %1: Destroy @cma_id (never returned here)
 */
static int svc_rdma_cma_handler(struct rdma_cm_id *cma_id,
				struct rdma_cm_event *event)
{
	struct svcxprt_rdma *rdma = cma_id->context;
	struct svc_xprt *xprt = &rdma->sc_xprt;

	switch (event->event) {
	case RDMA_CM_EVENT_ESTABLISHED:
		clear_bit(RDMAXPRT_CONN_PENDING, &rdma->sc_flags);

		/* Handle any requests that were received while
		 * CONN_PENDING was set.
		 */
		svc_xprt_enqueue(xprt);
		break;
	case RDMA_CM_EVENT_DISCONNECTED:
	case RDMA_CM_EVENT_DEVICE_REMOVAL:
		svc_xprt_deferred_close(xprt);
		break;
	default:
		break;
	}
	return 0;
}

/*
 * Create a listening RDMA service endpoint.
 */
static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
					struct net *net,
					struct sockaddr *sa, int salen,
					int flags)
{
	struct rdma_cm_id *listen_id;
	struct svcxprt_rdma *cma_xprt;
	int ret;

	if (sa->sa_family != AF_INET && sa->sa_family != AF_INET6)
		return ERR_PTR(-EAFNOSUPPORT);
	cma_xprt = svc_rdma_create_xprt(serv, net, NUMA_NO_NODE);
	if (!cma_xprt)
		return ERR_PTR(-ENOMEM);
	set_bit(XPT_LISTENER, &cma_xprt->sc_xprt.xpt_flags);
	strcpy(cma_xprt->sc_xprt.xpt_remotebuf, "listener");

	listen_id = rdma_create_id(net, svc_rdma_listen_handler, cma_xprt,
				   RDMA_PS_TCP, IB_QPT_RC);
	if (IS_ERR(listen_id)) {
		ret = PTR_ERR(listen_id);
		goto err0;
	}

	/* Allow both IPv4 and IPv6 sockets to bind a single port
	 * at the same time.
	 */
#if IS_ENABLED(CONFIG_IPV6)
	ret = rdma_set_afonly(listen_id, 1);
	if (ret)
		goto err1;
#endif
	ret = rdma_bind_addr(listen_id, sa);
	if (ret)
		goto err1;
	cma_xprt->sc_cm_id = listen_id;

	ret = rdma_listen(listen_id, RPCRDMA_LISTEN_BACKLOG);
	if (ret)
		goto err1;

	/*
	 * We need to use the address from the cm_id in case the
	 * caller specified 0 for the port number.
	 */
	sa = (struct sockaddr *)&cma_xprt->sc_cm_id->route.addr.src_addr;
	svc_xprt_set_local(&cma_xprt->sc_xprt, sa, salen);

	return &cma_xprt->sc_xprt;

 err1:
	rdma_destroy_id(listen_id);
 err0:
	kfree(cma_xprt);
	return ERR_PTR(ret);
}
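
/* Listeners are normally created through the generic svc_xprt
 * machinery rather than by calling svc_rdma_create() directly. For
 * example, an NFS server administrator can add an RDMA listener on
 * the IANA-assigned nfsrdma port with:
 *
 *	echo "rdma 20049" > /proc/fs/nfsd/portlist
 *
 * which resolves to this transport class via the "rdma" xcl_name
 * above.
 */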

/*
 * This is the xpo_accept function for listening endpoints. Its
 * purpose is to accept incoming connections. The CMA callback handler
 * has already created a new transport and attached it to the new CMA
 * ID.
 *
 * There is a queue of pending connections hung on the listening
 * transport. This queue contains new svc_xprt structures. This
 * function takes svc_xprt structures off the accept_q and completes
 * the connection.
 */
static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
{
	struct svcxprt_rdma *listen_rdma;
	struct svcxprt_rdma *newxprt = NULL;
	struct rdma_conn_param conn_param;
	struct rpcrdma_connect_private pmsg;
	struct ib_qp_init_attr qp_attr;
	unsigned int ctxts, rq_depth;
	struct ib_device *dev;
	int ret = 0;
	RPC_IFDEBUG(struct sockaddr *sap);

	listen_rdma = container_of(xprt, struct svcxprt_rdma, sc_xprt);
	clear_bit(XPT_CONN, &xprt->xpt_flags);
	/* Get the next entry off the accept list */
	spin_lock(&listen_rdma->sc_lock);
	if (!list_empty(&listen_rdma->sc_accept_q)) {
		newxprt = list_entry(listen_rdma->sc_accept_q.next,
				     struct svcxprt_rdma, sc_accept_q);
		list_del_init(&newxprt->sc_accept_q);
	}
	if (!list_empty(&listen_rdma->sc_accept_q))
		set_bit(XPT_CONN, &listen_rdma->sc_xprt.xpt_flags);
	spin_unlock(&listen_rdma->sc_lock);
	if (!newxprt)
		return NULL;

	dev = newxprt->sc_cm_id->device;
	newxprt->sc_port_num = newxprt->sc_cm_id->port_num;

	newxprt->sc_max_req_size = svcrdma_max_req_size;
	newxprt->sc_max_requests = svcrdma_max_requests;
	newxprt->sc_max_bc_requests = svcrdma_max_bc_requests;
	newxprt->sc_recv_batch = RPCRDMA_MAX_RECV_BATCH;
	newxprt->sc_fc_credits = cpu_to_be32(newxprt->sc_max_requests);

	/* Qualify the transport's resource defaults with the
	 * capabilities of this particular device.
	 */

	/* Transport header, head iovec, tail iovec */
	newxprt->sc_max_send_sges = 3;
	/* Add one SGE per page list entry */
	newxprt->sc_max_send_sges += (svcrdma_max_req_size / PAGE_SIZE) + 1;
	if (newxprt->sc_max_send_sges > dev->attrs.max_send_sge)
		newxprt->sc_max_send_sges = dev->attrs.max_send_sge;
	rq_depth = newxprt->sc_max_requests + newxprt->sc_max_bc_requests +
		   newxprt->sc_recv_batch + 1 /* drain */;
	if (rq_depth > dev->attrs.max_qp_wr) {
		rq_depth = dev->attrs.max_qp_wr;
		newxprt->sc_recv_batch = 1;
		newxprt->sc_max_requests = rq_depth - 2;
		newxprt->sc_max_bc_requests = 2;
	}
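
	/* Worked example with illustrative numbers (defaults vary by
	 * kernel and device): with 4KB pages and a 4KB maximum request
	 * size, the Send SGE estimate is 3 + (4096 / 4096) + 1 = 5.
	 * If there were 64 credits, 2 backchannel slots, and a receive
	 * batch of 8, then rq_depth = 64 + 2 + 8 + 1 (drain) = 75,
	 * clamped to the device's max_qp_wr when that is smaller.
	 */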

	/* Arbitrarily estimate the number of rw_ctxs needed for
	 * this transport. This is enough rw_ctxs to make forward
	 * progress even if the client is using one rkey per page
	 * in each Read chunk.
	 */
	ctxts = 3 * RPCSVC_MAXPAGES;
	newxprt->sc_sq_depth = rq_depth + ctxts;
	if (newxprt->sc_sq_depth > dev->attrs.max_qp_wr)
		newxprt->sc_sq_depth = dev->attrs.max_qp_wr;
	atomic_set(&newxprt->sc_sq_avail, newxprt->sc_sq_depth);
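
	/* Likewise illustrative: if RPCSVC_MAXPAGES worked out to 259
	 * (a 1MB maximum payload plus overhead, in 4KB pages), then
	 * ctxts = 3 * 259 = 777, and the Send Queue depth would be
	 * rq_depth + 777 unless the device supports fewer WRs per QP.
	 */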

	newxprt->sc_pd = ib_alloc_pd(dev, 0);
	if (IS_ERR(newxprt->sc_pd)) {
		trace_svcrdma_pd_err(newxprt, PTR_ERR(newxprt->sc_pd));
		goto errout;
	}
	newxprt->sc_sq_cq = ib_alloc_cq_any(dev, newxprt, newxprt->sc_sq_depth,
					    IB_POLL_WORKQUEUE);
	if (IS_ERR(newxprt->sc_sq_cq))
		goto errout;
	newxprt->sc_rq_cq =
		ib_alloc_cq_any(dev, newxprt, rq_depth, IB_POLL_WORKQUEUE);
	if (IS_ERR(newxprt->sc_rq_cq))
		goto errout;

	memset(&qp_attr, 0, sizeof(qp_attr));
	qp_attr.event_handler = qp_event_handler;
	qp_attr.qp_context = &newxprt->sc_xprt;
	qp_attr.port_num = newxprt->sc_port_num;
	qp_attr.cap.max_rdma_ctxs = ctxts;
	qp_attr.cap.max_send_wr = newxprt->sc_sq_depth - ctxts;
	qp_attr.cap.max_recv_wr = rq_depth;
	qp_attr.cap.max_send_sge = newxprt->sc_max_send_sges;
	qp_attr.cap.max_recv_sge = 1;
	qp_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
	qp_attr.qp_type = IB_QPT_RC;
	qp_attr.send_cq = newxprt->sc_sq_cq;
	qp_attr.recv_cq = newxprt->sc_rq_cq;
	dprintk("    cap.max_send_wr = %d, cap.max_recv_wr = %d\n",
		qp_attr.cap.max_send_wr, qp_attr.cap.max_recv_wr);
	dprintk("    cap.max_send_sge = %d, cap.max_recv_sge = %d\n",
		qp_attr.cap.max_send_sge, qp_attr.cap.max_recv_sge);
	dprintk("    send CQ depth = %u, recv CQ depth = %u\n",
		newxprt->sc_sq_depth, rq_depth);
	ret = rdma_create_qp(newxprt->sc_cm_id, newxprt->sc_pd, &qp_attr);
	if (ret) {
		trace_svcrdma_qp_err(newxprt, ret);
		goto errout;
	}
	newxprt->sc_max_send_sges = qp_attr.cap.max_send_sge;
	newxprt->sc_qp = newxprt->sc_cm_id->qp;

	if (!(dev->attrs.device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS))
		newxprt->sc_snd_w_inv = false;
	if (!rdma_protocol_iwarp(dev, newxprt->sc_port_num) &&
	    !rdma_ib_or_roce(dev, newxprt->sc_port_num)) {
		trace_svcrdma_fabric_err(newxprt, -EINVAL);
		goto errout;
	}

	if (!svc_rdma_post_recvs(newxprt))
		goto errout;

	/* Construct RDMA-CM private message */
	pmsg.cp_magic = rpcrdma_cmp_magic;
	pmsg.cp_version = RPCRDMA_CMP_VERSION;
	pmsg.cp_flags = 0;
	pmsg.cp_send_size = pmsg.cp_recv_size =
		rpcrdma_encode_buffer_size(newxprt->sc_max_req_size);

	/* Accept Connection */
	set_bit(RDMAXPRT_CONN_PENDING, &newxprt->sc_flags);
	memset(&conn_param, 0, sizeof(conn_param));
	conn_param.responder_resources = 0;
	conn_param.initiator_depth = min_t(int, newxprt->sc_ord,
					   dev->attrs.max_qp_init_rd_atom);
	if (!conn_param.initiator_depth) {
		ret = -EINVAL;
		trace_svcrdma_initdepth_err(newxprt, ret);
		goto errout;
	}
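
	/* Note: responder_resources stays zero because an RPC-over-RDMA
	 * server is never the target of client-issued RDMA Reads, while
	 * initiator_depth caps the RDMA Reads this server may have in
	 * flight when pulling Read chunks: the smaller of the limit the
	 * client advertised at connect time and what the device can
	 * initiate.
	 */
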
	conn_param.private_data = &pmsg;
	conn_param.private_data_len = sizeof(pmsg);
	rdma_lock_handler(newxprt->sc_cm_id);
	newxprt->sc_cm_id->event_handler = svc_rdma_cma_handler;
	ret = rdma_accept(newxprt->sc_cm_id, &conn_param);
	rdma_unlock_handler(newxprt->sc_cm_id);
	if (ret) {
		trace_svcrdma_accept_err(newxprt, ret);
		goto errout;
	}

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
	dprintk("svcrdma: new connection accepted on device %s:\n", dev->name);
	sap = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.src_addr;
	dprintk("    local address   : %pIS:%u\n", sap, rpc_get_port(sap));
	sap = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.dst_addr;
	dprintk("    remote address  : %pIS:%u\n", sap, rpc_get_port(sap));
	dprintk("    max_sge         : %d\n", newxprt->sc_max_send_sges);
	dprintk("    sq_depth        : %d\n", newxprt->sc_sq_depth);
	dprintk("    rdma_rw_ctxs    : %d\n", ctxts);
	dprintk("    max_requests    : %d\n", newxprt->sc_max_requests);
	dprintk("    ord             : %d\n", conn_param.initiator_depth);
#endif

	return &newxprt->sc_xprt;

 errout:
	/* Take a reference in case the DTO handler runs */
	svc_xprt_get(&newxprt->sc_xprt);
	if (newxprt->sc_qp && !IS_ERR(newxprt->sc_qp))
		ib_destroy_qp(newxprt->sc_qp);
	rdma_destroy_id(newxprt->sc_cm_id);
	/* This call to put will destroy the transport */
	svc_xprt_put(&newxprt->sc_xprt);
	return NULL;
}

static void svc_rdma_detach(struct svc_xprt *xprt)
{
	struct svcxprt_rdma *rdma =
		container_of(xprt, struct svcxprt_rdma, sc_xprt);

	rdma_disconnect(rdma->sc_cm_id);
}
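
/* Issuing the disconnect is sufficient to tear the connection down:
 * the RDMA_CM_EVENT_DISCONNECTED that follows is handled by
 * svc_rdma_cma_handler(), which schedules a deferred close of the
 * transport.
 */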

static void __svc_rdma_free(struct work_struct *work)
{
	struct svcxprt_rdma *rdma =
		container_of(work, struct svcxprt_rdma, sc_work);

	/* This blocks until the Completion Queues are empty */
	if (rdma->sc_qp && !IS_ERR(rdma->sc_qp))
		ib_drain_qp(rdma->sc_qp);
	flush_workqueue(svcrdma_wq);

	svc_rdma_flush_recv_queues(rdma);

	svc_rdma_destroy_rw_ctxts(rdma);
	svc_rdma_send_ctxts_destroy(rdma);
	svc_rdma_recv_ctxts_destroy(rdma);

	/* Destroy the QP if present (not a listener) */
	if (rdma->sc_qp && !IS_ERR(rdma->sc_qp))
		ib_destroy_qp(rdma->sc_qp);

	if (rdma->sc_sq_cq && !IS_ERR(rdma->sc_sq_cq))
		ib_free_cq(rdma->sc_sq_cq);

	if (rdma->sc_rq_cq && !IS_ERR(rdma->sc_rq_cq))
		ib_free_cq(rdma->sc_rq_cq);

	if (rdma->sc_pd && !IS_ERR(rdma->sc_pd))
		ib_dealloc_pd(rdma->sc_pd);

	/* Destroy the CM ID */
	rdma_destroy_id(rdma->sc_cm_id);

	kfree(rdma);
}

static void svc_rdma_free(struct svc_xprt *xprt)
{
	struct svcxprt_rdma *rdma =
		container_of(xprt, struct svcxprt_rdma, sc_xprt);

	INIT_WORK(&rdma->sc_work, __svc_rdma_free);
	schedule_work(&rdma->sc_work);
}

static int svc_rdma_has_wspace(struct svc_xprt *xprt)
{
	struct svcxprt_rdma *rdma =
		container_of(xprt, struct svcxprt_rdma, sc_xprt);

	/*
	 * If there are already waiters on the SQ,
	 * return false.
	 */
	if (waitqueue_active(&rdma->sc_send_wait))
		return 0;

	/* Otherwise return true. */
	return 1;
}

static void svc_rdma_kill_temp_xprt(struct svc_xprt *xprt)
{
}