1219820Sjeff/*
2219820Sjeff * Copyright (c) 2005 Ammasso, Inc. All rights reserved.
3219820Sjeff * Copyright (c) 2006 Open Grid Computing, Inc. All rights reserved.
4219820Sjeff *
5219820Sjeff * This software is available to you under a choice of one of two
6219820Sjeff * licenses.  You may choose to be licensed under the terms of the GNU
7219820Sjeff * General Public License (GPL) Version 2, available from the file
8219820Sjeff * COPYING in the main directory of this source tree, or the
9219820Sjeff * OpenIB.org BSD license below:
10219820Sjeff *
11219820Sjeff *     Redistribution and use in source and binary forms, with or
12219820Sjeff *     without modification, are permitted provided that the following
13219820Sjeff *     conditions are met:
14219820Sjeff *
15219820Sjeff *      - Redistributions of source code must retain the above
16219820Sjeff *        copyright notice, this list of conditions and the following
17219820Sjeff *        disclaimer.
18219820Sjeff *
19219820Sjeff *      - Redistributions in binary form must reproduce the above
20219820Sjeff *        copyright notice, this list of conditions and the following
21219820Sjeff *        disclaimer in the documentation and/or other materials
22219820Sjeff *        provided with the distribution.
23219820Sjeff *
24219820Sjeff * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
25219820Sjeff * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
26219820Sjeff * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
27219820Sjeff * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
28219820Sjeff * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
29219820Sjeff * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
30219820Sjeff * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
31219820Sjeff * SOFTWARE.
32219820Sjeff */
33219820Sjeff
34219820Sjeff#include <getopt.h>
35219820Sjeff#include <stdlib.h>
36219820Sjeff#include <string.h>
37219820Sjeff#include <stdio.h>
38219820Sjeff#include <errno.h>
39219820Sjeff#include <sys/types.h>
40219820Sjeff#include <netinet/in.h>
41219820Sjeff#include <sys/socket.h>
42219820Sjeff#include <netdb.h>
43219820Sjeff#include <byteswap.h>
44219820Sjeff#include <semaphore.h>
45219820Sjeff#include <arpa/inet.h>
46219820Sjeff#include <pthread.h>
47219820Sjeff#include <inttypes.h>
48219820Sjeff
49219820Sjeff#include <rdma/rdma_cma.h>
50219820Sjeff#include <infiniband/arch.h>
51219820Sjeff
/* Non-zero enables DEBUG_LOG() tracing on stdout. */
static int debug = 0;

/*
 * printf-style trace that is emitted only while 'debug' is non-zero.
 * Wrapped in do/while(0) so the macro is a single statement: it can be
 * used in an unbraced if/else without capturing the caller's 'else'.
 */
#define DEBUG_LOG(...) do { if (debug) printf(__VA_ARGS__); } while (0)
54219820Sjeff
/*
 * rping "ping/pong" loop:
 * 	client sends source rkey/addr/len
 *	server receives source rkey/addr/len
 *	server rdma reads "ping" data from source
 * 	server sends "go ahead" on rdma read completion
 *	client sends sink rkey/addr/len
 * 	server receives sink rkey/addr/len
 * 	server rdma writes "pong" data to sink
 * 	server sends "go ahead" on rdma write completion
 * 	<repeat loop>
 */
67219820Sjeff
/*
 * Test states shared between the event/completion handlers and the main
 * client or server thread.  A handler stores the new state in the control
 * block and posts its semaphore; the waiter then inspects the state.
 *
 * Once CONNECTED, they cycle through RDMA_READ_ADV, RDMA_WRITE_ADV,
 * and RDMA_WRITE_COMPLETE for each ping.
 *
 * The numeric order matters: server_recv() compares against CONNECTED
 * with "<=".
 */
enum test_state {
	IDLE			= 1,
	CONNECT_REQUEST		= 2,
	ADDR_RESOLVED		= 3,
	ROUTE_RESOLVED		= 4,
	CONNECTED		= 5,
	RDMA_READ_ADV		= 6,
	RDMA_READ_COMPLETE	= 7,
	RDMA_WRITE_ADV		= 8,
	RDMA_WRITE_COMPLETE	= 9,
	ERROR			= 10
};
87219820Sjeff
/*
 * Buffer advertisement exchanged between the peers in SEND messages.
 * All three fields are carried in network byte order (see
 * rping_format_send() and server_recv()).
 */
struct rping_rdma_info {
	uint64_t buf;	/* address of the advertised buffer */
	uint32_t rkey;	/* remote key of the MR covering that buffer */
	uint32_t size;	/* length of the advertised buffer, in bytes */
};
93219820Sjeff
/*
 * Default max buffer size for IO...
 *
 * The expansions are fully parenthesized so the macros evaluate as a
 * single value wherever they are used (e.g. "x % RPING_BUFSIZE" or
 * "2 * RPING_MIN_BUFSIZE").
 */
#define RPING_BUFSIZE (64*1024)
#define RPING_SQ_DEPTH 16

/* Default string for print data and
 * minimum buffer size
 */
#define _stringify( _x ) # _x
#define stringify( _x ) _stringify( _x )

#define RPING_MSG_FMT           "rdma-ping-%d: "
#define RPING_MIN_BUFSIZE       (sizeof(stringify(INT_MAX)) + sizeof(RPING_MSG_FMT))
108219820Sjeff
109219820Sjeff/*
110219820Sjeff * Control block struct.
111219820Sjeff */
112219820Sjeffstruct rping_cb {
113219820Sjeff	int server;			/* 0 iff client */
114219820Sjeff	pthread_t cqthread;
115219820Sjeff	pthread_t persistent_server_thread;
116219820Sjeff	struct ibv_comp_channel *channel;
117219820Sjeff	struct ibv_cq *cq;
118219820Sjeff	struct ibv_pd *pd;
119219820Sjeff	struct ibv_qp *qp;
120219820Sjeff
121219820Sjeff	struct ibv_recv_wr rq_wr;	/* recv work request record */
122219820Sjeff	struct ibv_sge recv_sgl;	/* recv single SGE */
123219820Sjeff	struct rping_rdma_info recv_buf;/* malloc'd buffer */
124219820Sjeff	struct ibv_mr *recv_mr;		/* MR associated with this buffer */
125219820Sjeff
126219820Sjeff	struct ibv_send_wr sq_wr;	/* send work request record */
127219820Sjeff	struct ibv_sge send_sgl;
128219820Sjeff	struct rping_rdma_info send_buf;/* single send buf */
129219820Sjeff	struct ibv_mr *send_mr;
130219820Sjeff
131219820Sjeff	struct ibv_send_wr rdma_sq_wr;	/* rdma work request record */
132219820Sjeff	struct ibv_sge rdma_sgl;	/* rdma single SGE */
133219820Sjeff	char *rdma_buf;			/* used as rdma sink */
134219820Sjeff	struct ibv_mr *rdma_mr;
135219820Sjeff
136219820Sjeff	uint32_t remote_rkey;		/* remote guys RKEY */
137219820Sjeff	uint64_t remote_addr;		/* remote guys TO */
138219820Sjeff	uint32_t remote_len;		/* remote guys LEN */
139219820Sjeff
140219820Sjeff	char *start_buf;		/* rdma read src */
141219820Sjeff	struct ibv_mr *start_mr;
142219820Sjeff
143219820Sjeff	enum test_state state;		/* used for cond/signalling */
144219820Sjeff	sem_t sem;
145219820Sjeff
146219820Sjeff	struct sockaddr_storage sin;
147219820Sjeff	uint16_t port;			/* dst port in NBO */
148219820Sjeff	int verbose;			/* verbose logging */
149219820Sjeff	int count;			/* ping count */
150219820Sjeff	int size;			/* ping data size */
151219820Sjeff	int validate;			/* validate ping data */
152219820Sjeff
153219820Sjeff	/* CM stuff */
154219820Sjeff	pthread_t cmthread;
155219820Sjeff	struct rdma_event_channel *cm_channel;
156219820Sjeff	struct rdma_cm_id *cm_id;	/* connection on client side,*/
157219820Sjeff					/* listener on service side. */
158219820Sjeff	struct rdma_cm_id *child_cm_id;	/* connection on server side */
159219820Sjeff};
160219820Sjeff
161219820Sjeffstatic int rping_cma_event_handler(struct rdma_cm_id *cma_id,
162219820Sjeff				    struct rdma_cm_event *event)
163219820Sjeff{
164219820Sjeff	int ret = 0;
165219820Sjeff	struct rping_cb *cb = cma_id->context;
166219820Sjeff
167219820Sjeff	DEBUG_LOG("cma_event type %s cma_id %p (%s)\n",
168219820Sjeff		  rdma_event_str(event->event), cma_id,
169219820Sjeff		  (cma_id == cb->cm_id) ? "parent" : "child");
170219820Sjeff
171219820Sjeff	switch (event->event) {
172219820Sjeff	case RDMA_CM_EVENT_ADDR_RESOLVED:
173219820Sjeff		cb->state = ADDR_RESOLVED;
174219820Sjeff		ret = rdma_resolve_route(cma_id, 2000);
175219820Sjeff		if (ret) {
176219820Sjeff			cb->state = ERROR;
177219820Sjeff			perror("rdma_resolve_route");
178219820Sjeff			sem_post(&cb->sem);
179219820Sjeff		}
180219820Sjeff		break;
181219820Sjeff
182219820Sjeff	case RDMA_CM_EVENT_ROUTE_RESOLVED:
183219820Sjeff		cb->state = ROUTE_RESOLVED;
184219820Sjeff		sem_post(&cb->sem);
185219820Sjeff		break;
186219820Sjeff
187219820Sjeff	case RDMA_CM_EVENT_CONNECT_REQUEST:
188219820Sjeff		cb->state = CONNECT_REQUEST;
189219820Sjeff		cb->child_cm_id = cma_id;
190219820Sjeff		DEBUG_LOG("child cma %p\n", cb->child_cm_id);
191219820Sjeff		sem_post(&cb->sem);
192219820Sjeff		break;
193219820Sjeff
194219820Sjeff	case RDMA_CM_EVENT_ESTABLISHED:
195219820Sjeff		DEBUG_LOG("ESTABLISHED\n");
196219820Sjeff
197219820Sjeff		/*
198219820Sjeff		 * Server will wake up when first RECV completes.
199219820Sjeff		 */
200219820Sjeff		if (!cb->server) {
201219820Sjeff			cb->state = CONNECTED;
202219820Sjeff		}
203219820Sjeff		sem_post(&cb->sem);
204219820Sjeff		break;
205219820Sjeff
206219820Sjeff	case RDMA_CM_EVENT_ADDR_ERROR:
207219820Sjeff	case RDMA_CM_EVENT_ROUTE_ERROR:
208219820Sjeff	case RDMA_CM_EVENT_CONNECT_ERROR:
209219820Sjeff	case RDMA_CM_EVENT_UNREACHABLE:
210219820Sjeff	case RDMA_CM_EVENT_REJECTED:
211219820Sjeff		fprintf(stderr, "cma event %s, error %d\n",
212219820Sjeff			rdma_event_str(event->event), event->status);
213219820Sjeff		sem_post(&cb->sem);
214219820Sjeff		ret = -1;
215219820Sjeff		break;
216219820Sjeff
217219820Sjeff	case RDMA_CM_EVENT_DISCONNECTED:
218219820Sjeff		fprintf(stderr, "%s DISCONNECT EVENT...\n",
219219820Sjeff			cb->server ? "server" : "client");
220219820Sjeff		sem_post(&cb->sem);
221219820Sjeff		break;
222219820Sjeff
223219820Sjeff	case RDMA_CM_EVENT_DEVICE_REMOVAL:
224219820Sjeff		fprintf(stderr, "cma detected device removal!!!!\n");
225219820Sjeff		ret = -1;
226219820Sjeff		break;
227219820Sjeff
228219820Sjeff	default:
229219820Sjeff		fprintf(stderr, "unhandled event: %s, ignoring\n",
230219820Sjeff			rdma_event_str(event->event));
231219820Sjeff		break;
232219820Sjeff	}
233219820Sjeff
234219820Sjeff	return ret;
235219820Sjeff}
236219820Sjeff
237219820Sjeffstatic int server_recv(struct rping_cb *cb, struct ibv_wc *wc)
238219820Sjeff{
239219820Sjeff	if (wc->byte_len != sizeof(cb->recv_buf)) {
240219820Sjeff		fprintf(stderr, "Received bogus data, size %d\n", wc->byte_len);
241219820Sjeff		return -1;
242219820Sjeff	}
243219820Sjeff
244219820Sjeff	cb->remote_rkey = ntohl(cb->recv_buf.rkey);
245219820Sjeff	cb->remote_addr = ntohll(cb->recv_buf.buf);
246219820Sjeff	cb->remote_len  = ntohl(cb->recv_buf.size);
247219820Sjeff	DEBUG_LOG("Received rkey %x addr %" PRIx64 " len %d from peer\n",
248219820Sjeff		  cb->remote_rkey, cb->remote_addr, cb->remote_len);
249219820Sjeff
250219820Sjeff	if (cb->state <= CONNECTED || cb->state == RDMA_WRITE_COMPLETE)
251219820Sjeff		cb->state = RDMA_READ_ADV;
252219820Sjeff	else
253219820Sjeff		cb->state = RDMA_WRITE_ADV;
254219820Sjeff
255219820Sjeff	return 0;
256219820Sjeff}
257219820Sjeff
258219820Sjeffstatic int client_recv(struct rping_cb *cb, struct ibv_wc *wc)
259219820Sjeff{
260219820Sjeff	if (wc->byte_len != sizeof(cb->recv_buf)) {
261219820Sjeff		fprintf(stderr, "Received bogus data, size %d\n", wc->byte_len);
262219820Sjeff		return -1;
263219820Sjeff	}
264219820Sjeff
265219820Sjeff	if (cb->state == RDMA_READ_ADV)
266219820Sjeff		cb->state = RDMA_WRITE_ADV;
267219820Sjeff	else
268219820Sjeff		cb->state = RDMA_WRITE_COMPLETE;
269219820Sjeff
270219820Sjeff	return 0;
271219820Sjeff}
272219820Sjeff
273219820Sjeffstatic int rping_cq_event_handler(struct rping_cb *cb)
274219820Sjeff{
275219820Sjeff	struct ibv_wc wc;
276219820Sjeff	struct ibv_recv_wr *bad_wr;
277219820Sjeff	int ret;
278219820Sjeff
279219820Sjeff	while ((ret = ibv_poll_cq(cb->cq, 1, &wc)) == 1) {
280219820Sjeff		ret = 0;
281219820Sjeff
282219820Sjeff		if (wc.status) {
283219820Sjeff			fprintf(stderr, "cq completion failed status %d\n",
284219820Sjeff				wc.status);
285219820Sjeff			if (wc.status != IBV_WC_WR_FLUSH_ERR)
286219820Sjeff				ret = -1;
287219820Sjeff			goto error;
288219820Sjeff		}
289219820Sjeff
290219820Sjeff		switch (wc.opcode) {
291219820Sjeff		case IBV_WC_SEND:
292219820Sjeff			DEBUG_LOG("send completion\n");
293219820Sjeff			break;
294219820Sjeff
295219820Sjeff		case IBV_WC_RDMA_WRITE:
296219820Sjeff			DEBUG_LOG("rdma write completion\n");
297219820Sjeff			cb->state = RDMA_WRITE_COMPLETE;
298219820Sjeff			sem_post(&cb->sem);
299219820Sjeff			break;
300219820Sjeff
301219820Sjeff		case IBV_WC_RDMA_READ:
302219820Sjeff			DEBUG_LOG("rdma read completion\n");
303219820Sjeff			cb->state = RDMA_READ_COMPLETE;
304219820Sjeff			sem_post(&cb->sem);
305219820Sjeff			break;
306219820Sjeff
307219820Sjeff		case IBV_WC_RECV:
308219820Sjeff			DEBUG_LOG("recv completion\n");
309219820Sjeff			ret = cb->server ? server_recv(cb, &wc) :
310219820Sjeff					   client_recv(cb, &wc);
311219820Sjeff			if (ret) {
312219820Sjeff				fprintf(stderr, "recv wc error: %d\n", ret);
313219820Sjeff				goto error;
314219820Sjeff			}
315219820Sjeff
316219820Sjeff			ret = ibv_post_recv(cb->qp, &cb->rq_wr, &bad_wr);
317219820Sjeff			if (ret) {
318219820Sjeff				fprintf(stderr, "post recv error: %d\n", ret);
319219820Sjeff				goto error;
320219820Sjeff			}
321219820Sjeff			sem_post(&cb->sem);
322219820Sjeff			break;
323219820Sjeff
324219820Sjeff		default:
325219820Sjeff			DEBUG_LOG("unknown!!!!! completion\n");
326219820Sjeff			ret = -1;
327219820Sjeff			goto error;
328219820Sjeff		}
329219820Sjeff	}
330219820Sjeff	if (ret) {
331219820Sjeff		fprintf(stderr, "poll error %d\n", ret);
332219820Sjeff		goto error;
333219820Sjeff	}
334219820Sjeff	return 0;
335219820Sjeff
336219820Sjefferror:
337219820Sjeff	cb->state = ERROR;
338219820Sjeff	sem_post(&cb->sem);
339219820Sjeff	return ret;
340219820Sjeff}
341219820Sjeff
342219820Sjeffstatic int rping_accept(struct rping_cb *cb)
343219820Sjeff{
344219820Sjeff	struct rdma_conn_param conn_param;
345219820Sjeff	int ret;
346219820Sjeff
347219820Sjeff	DEBUG_LOG("accepting client connection request\n");
348219820Sjeff
349219820Sjeff	memset(&conn_param, 0, sizeof conn_param);
350219820Sjeff	conn_param.responder_resources = 1;
351219820Sjeff	conn_param.initiator_depth = 1;
352219820Sjeff
353219820Sjeff	ret = rdma_accept(cb->child_cm_id, &conn_param);
354219820Sjeff	if (ret) {
355219820Sjeff		perror("rdma_accept");
356219820Sjeff		return ret;
357219820Sjeff	}
358219820Sjeff
359219820Sjeff	sem_wait(&cb->sem);
360219820Sjeff	if (cb->state == ERROR) {
361219820Sjeff		fprintf(stderr, "wait for CONNECTED state %d\n", cb->state);
362219820Sjeff		return -1;
363219820Sjeff	}
364219820Sjeff	return 0;
365219820Sjeff}
366219820Sjeff
367219820Sjeffstatic void rping_setup_wr(struct rping_cb *cb)
368219820Sjeff{
369219820Sjeff	cb->recv_sgl.addr = (uint64_t) (unsigned long) &cb->recv_buf;
370219820Sjeff	cb->recv_sgl.length = sizeof cb->recv_buf;
371219820Sjeff	cb->recv_sgl.lkey = cb->recv_mr->lkey;
372219820Sjeff	cb->rq_wr.sg_list = &cb->recv_sgl;
373219820Sjeff	cb->rq_wr.num_sge = 1;
374219820Sjeff
375219820Sjeff	cb->send_sgl.addr = (uint64_t) (unsigned long) &cb->send_buf;
376219820Sjeff	cb->send_sgl.length = sizeof cb->send_buf;
377219820Sjeff	cb->send_sgl.lkey = cb->send_mr->lkey;
378219820Sjeff
379219820Sjeff	cb->sq_wr.opcode = IBV_WR_SEND;
380219820Sjeff	cb->sq_wr.send_flags = IBV_SEND_SIGNALED;
381219820Sjeff	cb->sq_wr.sg_list = &cb->send_sgl;
382219820Sjeff	cb->sq_wr.num_sge = 1;
383219820Sjeff
384219820Sjeff	cb->rdma_sgl.addr = (uint64_t) (unsigned long) cb->rdma_buf;
385219820Sjeff	cb->rdma_sgl.lkey = cb->rdma_mr->lkey;
386219820Sjeff	cb->rdma_sq_wr.send_flags = IBV_SEND_SIGNALED;
387219820Sjeff	cb->rdma_sq_wr.sg_list = &cb->rdma_sgl;
388219820Sjeff	cb->rdma_sq_wr.num_sge = 1;
389219820Sjeff}
390219820Sjeff
391219820Sjeffstatic int rping_setup_buffers(struct rping_cb *cb)
392219820Sjeff{
393219820Sjeff	int ret;
394219820Sjeff
395219820Sjeff	DEBUG_LOG("rping_setup_buffers called on cb %p\n", cb);
396219820Sjeff
397219820Sjeff	cb->recv_mr = ibv_reg_mr(cb->pd, &cb->recv_buf, sizeof cb->recv_buf,
398219820Sjeff				 IBV_ACCESS_LOCAL_WRITE);
399219820Sjeff	if (!cb->recv_mr) {
400219820Sjeff		fprintf(stderr, "recv_buf reg_mr failed\n");
401219820Sjeff		return errno;
402219820Sjeff	}
403219820Sjeff
404219820Sjeff	cb->send_mr = ibv_reg_mr(cb->pd, &cb->send_buf, sizeof cb->send_buf, 0);
405219820Sjeff	if (!cb->send_mr) {
406219820Sjeff		fprintf(stderr, "send_buf reg_mr failed\n");
407219820Sjeff		ret = errno;
408219820Sjeff		goto err1;
409219820Sjeff	}
410219820Sjeff
411219820Sjeff	cb->rdma_buf = malloc(cb->size);
412219820Sjeff	if (!cb->rdma_buf) {
413219820Sjeff		fprintf(stderr, "rdma_buf malloc failed\n");
414219820Sjeff		ret = -ENOMEM;
415219820Sjeff		goto err2;
416219820Sjeff	}
417219820Sjeff
418219820Sjeff	cb->rdma_mr = ibv_reg_mr(cb->pd, cb->rdma_buf, cb->size,
419219820Sjeff				 IBV_ACCESS_LOCAL_WRITE |
420219820Sjeff				 IBV_ACCESS_REMOTE_READ |
421219820Sjeff				 IBV_ACCESS_REMOTE_WRITE);
422219820Sjeff	if (!cb->rdma_mr) {
423219820Sjeff		fprintf(stderr, "rdma_buf reg_mr failed\n");
424219820Sjeff		ret = errno;
425219820Sjeff		goto err3;
426219820Sjeff	}
427219820Sjeff
428219820Sjeff	if (!cb->server) {
429219820Sjeff		cb->start_buf = malloc(cb->size);
430219820Sjeff		if (!cb->start_buf) {
431219820Sjeff			fprintf(stderr, "start_buf malloc failed\n");
432219820Sjeff			ret = -ENOMEM;
433219820Sjeff			goto err4;
434219820Sjeff		}
435219820Sjeff
436219820Sjeff		cb->start_mr = ibv_reg_mr(cb->pd, cb->start_buf, cb->size,
437219820Sjeff					  IBV_ACCESS_LOCAL_WRITE |
438219820Sjeff					  IBV_ACCESS_REMOTE_READ |
439219820Sjeff					  IBV_ACCESS_REMOTE_WRITE);
440219820Sjeff		if (!cb->start_mr) {
441219820Sjeff			fprintf(stderr, "start_buf reg_mr failed\n");
442219820Sjeff			ret = errno;
443219820Sjeff			goto err5;
444219820Sjeff		}
445219820Sjeff	}
446219820Sjeff
447219820Sjeff	rping_setup_wr(cb);
448219820Sjeff	DEBUG_LOG("allocated & registered buffers...\n");
449219820Sjeff	return 0;
450219820Sjeff
451219820Sjefferr5:
452219820Sjeff	free(cb->start_buf);
453219820Sjefferr4:
454219820Sjeff	ibv_dereg_mr(cb->rdma_mr);
455219820Sjefferr3:
456219820Sjeff	free(cb->rdma_buf);
457219820Sjefferr2:
458219820Sjeff	ibv_dereg_mr(cb->send_mr);
459219820Sjefferr1:
460219820Sjeff	ibv_dereg_mr(cb->recv_mr);
461219820Sjeff	return ret;
462219820Sjeff}
463219820Sjeff
464219820Sjeffstatic void rping_free_buffers(struct rping_cb *cb)
465219820Sjeff{
466219820Sjeff	DEBUG_LOG("rping_free_buffers called on cb %p\n", cb);
467219820Sjeff	ibv_dereg_mr(cb->recv_mr);
468219820Sjeff	ibv_dereg_mr(cb->send_mr);
469219820Sjeff	ibv_dereg_mr(cb->rdma_mr);
470219820Sjeff	free(cb->rdma_buf);
471219820Sjeff	if (!cb->server) {
472219820Sjeff		ibv_dereg_mr(cb->start_mr);
473219820Sjeff		free(cb->start_buf);
474219820Sjeff	}
475219820Sjeff}
476219820Sjeff
477219820Sjeffstatic int rping_create_qp(struct rping_cb *cb)
478219820Sjeff{
479219820Sjeff	struct ibv_qp_init_attr init_attr;
480219820Sjeff	int ret;
481219820Sjeff
482219820Sjeff	memset(&init_attr, 0, sizeof(init_attr));
483219820Sjeff	init_attr.cap.max_send_wr = RPING_SQ_DEPTH;
484219820Sjeff	init_attr.cap.max_recv_wr = 2;
485219820Sjeff	init_attr.cap.max_recv_sge = 1;
486219820Sjeff	init_attr.cap.max_send_sge = 1;
487219820Sjeff	init_attr.qp_type = IBV_QPT_RC;
488219820Sjeff	init_attr.send_cq = cb->cq;
489219820Sjeff	init_attr.recv_cq = cb->cq;
490219820Sjeff
491219820Sjeff	if (cb->server) {
492219820Sjeff		ret = rdma_create_qp(cb->child_cm_id, cb->pd, &init_attr);
493219820Sjeff		if (!ret)
494219820Sjeff			cb->qp = cb->child_cm_id->qp;
495219820Sjeff	} else {
496219820Sjeff		ret = rdma_create_qp(cb->cm_id, cb->pd, &init_attr);
497219820Sjeff		if (!ret)
498219820Sjeff			cb->qp = cb->cm_id->qp;
499219820Sjeff	}
500219820Sjeff
501219820Sjeff	return ret;
502219820Sjeff}
503219820Sjeff
504219820Sjeffstatic void rping_free_qp(struct rping_cb *cb)
505219820Sjeff{
506219820Sjeff	ibv_destroy_qp(cb->qp);
507219820Sjeff	ibv_destroy_cq(cb->cq);
508219820Sjeff	ibv_destroy_comp_channel(cb->channel);
509219820Sjeff	ibv_dealloc_pd(cb->pd);
510219820Sjeff}
511219820Sjeff
512219820Sjeffstatic int rping_setup_qp(struct rping_cb *cb, struct rdma_cm_id *cm_id)
513219820Sjeff{
514219820Sjeff	int ret;
515219820Sjeff
516219820Sjeff	cb->pd = ibv_alloc_pd(cm_id->verbs);
517219820Sjeff	if (!cb->pd) {
518219820Sjeff		fprintf(stderr, "ibv_alloc_pd failed\n");
519219820Sjeff		return errno;
520219820Sjeff	}
521219820Sjeff	DEBUG_LOG("created pd %p\n", cb->pd);
522219820Sjeff
523219820Sjeff	cb->channel = ibv_create_comp_channel(cm_id->verbs);
524219820Sjeff	if (!cb->channel) {
525219820Sjeff		fprintf(stderr, "ibv_create_comp_channel failed\n");
526219820Sjeff		ret = errno;
527219820Sjeff		goto err1;
528219820Sjeff	}
529219820Sjeff	DEBUG_LOG("created channel %p\n", cb->channel);
530219820Sjeff
531219820Sjeff	cb->cq = ibv_create_cq(cm_id->verbs, RPING_SQ_DEPTH * 2, cb,
532219820Sjeff				cb->channel, 0);
533219820Sjeff	if (!cb->cq) {
534219820Sjeff		fprintf(stderr, "ibv_create_cq failed\n");
535219820Sjeff		ret = errno;
536219820Sjeff		goto err2;
537219820Sjeff	}
538219820Sjeff	DEBUG_LOG("created cq %p\n", cb->cq);
539219820Sjeff
540219820Sjeff	ret = ibv_req_notify_cq(cb->cq, 0);
541219820Sjeff	if (ret) {
542219820Sjeff		fprintf(stderr, "ibv_create_cq failed\n");
543219820Sjeff		ret = errno;
544219820Sjeff		goto err3;
545219820Sjeff	}
546219820Sjeff
547219820Sjeff	ret = rping_create_qp(cb);
548219820Sjeff	if (ret) {
549219820Sjeff		perror("rdma_create_qp");
550219820Sjeff		goto err3;
551219820Sjeff	}
552219820Sjeff	DEBUG_LOG("created qp %p\n", cb->qp);
553219820Sjeff	return 0;
554219820Sjeff
555219820Sjefferr3:
556219820Sjeff	ibv_destroy_cq(cb->cq);
557219820Sjefferr2:
558219820Sjeff	ibv_destroy_comp_channel(cb->channel);
559219820Sjefferr1:
560219820Sjeff	ibv_dealloc_pd(cb->pd);
561219820Sjeff	return ret;
562219820Sjeff}
563219820Sjeff
564219820Sjeffstatic void *cm_thread(void *arg)
565219820Sjeff{
566219820Sjeff	struct rping_cb *cb = arg;
567219820Sjeff	struct rdma_cm_event *event;
568219820Sjeff	int ret;
569219820Sjeff
570219820Sjeff	while (1) {
571219820Sjeff		ret = rdma_get_cm_event(cb->cm_channel, &event);
572219820Sjeff		if (ret) {
573219820Sjeff			perror("rdma_get_cm_event");
574219820Sjeff			exit(ret);
575219820Sjeff		}
576219820Sjeff		ret = rping_cma_event_handler(event->id, event);
577219820Sjeff		rdma_ack_cm_event(event);
578219820Sjeff		if (ret)
579219820Sjeff			exit(ret);
580219820Sjeff	}
581219820Sjeff}
582219820Sjeff
583219820Sjeffstatic void *cq_thread(void *arg)
584219820Sjeff{
585219820Sjeff	struct rping_cb *cb = arg;
586219820Sjeff	struct ibv_cq *ev_cq;
587219820Sjeff	void *ev_ctx;
588219820Sjeff	int ret;
589219820Sjeff
590219820Sjeff	DEBUG_LOG("cq_thread started.\n");
591219820Sjeff
592219820Sjeff	while (1) {
593219820Sjeff		pthread_testcancel();
594219820Sjeff
595219820Sjeff		ret = ibv_get_cq_event(cb->channel, &ev_cq, &ev_ctx);
596219820Sjeff		if (ret) {
597219820Sjeff			fprintf(stderr, "Failed to get cq event!\n");
598219820Sjeff			pthread_exit(NULL);
599219820Sjeff		}
600219820Sjeff		if (ev_cq != cb->cq) {
601219820Sjeff			fprintf(stderr, "Unknown CQ!\n");
602219820Sjeff			pthread_exit(NULL);
603219820Sjeff		}
604219820Sjeff		ret = ibv_req_notify_cq(cb->cq, 0);
605219820Sjeff		if (ret) {
606219820Sjeff			fprintf(stderr, "Failed to set notify!\n");
607219820Sjeff			pthread_exit(NULL);
608219820Sjeff		}
609219820Sjeff		ret = rping_cq_event_handler(cb);
610219820Sjeff		ibv_ack_cq_events(cb->cq, 1);
611219820Sjeff		if (ret)
612219820Sjeff			pthread_exit(NULL);
613219820Sjeff	}
614219820Sjeff}
615219820Sjeff
616219820Sjeffstatic void rping_format_send(struct rping_cb *cb, char *buf, struct ibv_mr *mr)
617219820Sjeff{
618219820Sjeff	struct rping_rdma_info *info = &cb->send_buf;
619219820Sjeff
620219820Sjeff	info->buf = htonll((uint64_t) (unsigned long) buf);
621219820Sjeff	info->rkey = htonl(mr->rkey);
622219820Sjeff	info->size = htonl(cb->size);
623219820Sjeff
624219820Sjeff	DEBUG_LOG("RDMA addr %" PRIx64" rkey %x len %d\n",
625219820Sjeff		  ntohll(info->buf), ntohl(info->rkey), ntohl(info->size));
626219820Sjeff}
627219820Sjeff
628219820Sjeffstatic int rping_test_server(struct rping_cb *cb)
629219820Sjeff{
630219820Sjeff	struct ibv_send_wr *bad_wr;
631219820Sjeff	int ret;
632219820Sjeff
633219820Sjeff	while (1) {
634219820Sjeff		/* Wait for client's Start STAG/TO/Len */
635219820Sjeff		sem_wait(&cb->sem);
636219820Sjeff		if (cb->state != RDMA_READ_ADV) {
637219820Sjeff			fprintf(stderr, "wait for RDMA_READ_ADV state %d\n",
638219820Sjeff				cb->state);
639219820Sjeff			ret = -1;
640219820Sjeff			break;
641219820Sjeff		}
642219820Sjeff
643219820Sjeff		DEBUG_LOG("server received sink adv\n");
644219820Sjeff
645219820Sjeff		/* Issue RDMA Read. */
646219820Sjeff		cb->rdma_sq_wr.opcode = IBV_WR_RDMA_READ;
647219820Sjeff		cb->rdma_sq_wr.wr.rdma.rkey = cb->remote_rkey;
648219820Sjeff		cb->rdma_sq_wr.wr.rdma.remote_addr = cb->remote_addr;
649219820Sjeff		cb->rdma_sq_wr.sg_list->length = cb->remote_len;
650219820Sjeff
651219820Sjeff		ret = ibv_post_send(cb->qp, &cb->rdma_sq_wr, &bad_wr);
652219820Sjeff		if (ret) {
653219820Sjeff			fprintf(stderr, "post send error %d\n", ret);
654219820Sjeff			break;
655219820Sjeff		}
656219820Sjeff		DEBUG_LOG("server posted rdma read req \n");
657219820Sjeff
658219820Sjeff		/* Wait for read completion */
659219820Sjeff		sem_wait(&cb->sem);
660219820Sjeff		if (cb->state != RDMA_READ_COMPLETE) {
661219820Sjeff			fprintf(stderr, "wait for RDMA_READ_COMPLETE state %d\n",
662219820Sjeff				cb->state);
663219820Sjeff			ret = -1;
664219820Sjeff			break;
665219820Sjeff		}
666219820Sjeff		DEBUG_LOG("server received read complete\n");
667219820Sjeff
668219820Sjeff		/* Display data in recv buf */
669219820Sjeff		if (cb->verbose)
670219820Sjeff			printf("server ping data: %s\n", cb->rdma_buf);
671219820Sjeff
672219820Sjeff		/* Tell client to continue */
673219820Sjeff		ret = ibv_post_send(cb->qp, &cb->sq_wr, &bad_wr);
674219820Sjeff		if (ret) {
675219820Sjeff			fprintf(stderr, "post send error %d\n", ret);
676219820Sjeff			break;
677219820Sjeff		}
678219820Sjeff		DEBUG_LOG("server posted go ahead\n");
679219820Sjeff
680219820Sjeff		/* Wait for client's RDMA STAG/TO/Len */
681219820Sjeff		sem_wait(&cb->sem);
682219820Sjeff		if (cb->state != RDMA_WRITE_ADV) {
683219820Sjeff			fprintf(stderr, "wait for RDMA_WRITE_ADV state %d\n",
684219820Sjeff				cb->state);
685219820Sjeff			ret = -1;
686219820Sjeff			break;
687219820Sjeff		}
688219820Sjeff		DEBUG_LOG("server received sink adv\n");
689219820Sjeff
690219820Sjeff		/* RDMA Write echo data */
691219820Sjeff		cb->rdma_sq_wr.opcode = IBV_WR_RDMA_WRITE;
692219820Sjeff		cb->rdma_sq_wr.wr.rdma.rkey = cb->remote_rkey;
693219820Sjeff		cb->rdma_sq_wr.wr.rdma.remote_addr = cb->remote_addr;
694219820Sjeff		cb->rdma_sq_wr.sg_list->length = strlen(cb->rdma_buf) + 1;
695219820Sjeff		DEBUG_LOG("rdma write from lkey %x laddr %" PRIx64 " len %d\n",
696219820Sjeff			  cb->rdma_sq_wr.sg_list->lkey,
697219820Sjeff			  cb->rdma_sq_wr.sg_list->addr,
698219820Sjeff			  cb->rdma_sq_wr.sg_list->length);
699219820Sjeff
700219820Sjeff		ret = ibv_post_send(cb->qp, &cb->rdma_sq_wr, &bad_wr);
701219820Sjeff		if (ret) {
702219820Sjeff			fprintf(stderr, "post send error %d\n", ret);
703219820Sjeff			break;
704219820Sjeff		}
705219820Sjeff
706219820Sjeff		/* Wait for completion */
707219820Sjeff		ret = sem_wait(&cb->sem);
708219820Sjeff		if (cb->state != RDMA_WRITE_COMPLETE) {
709219820Sjeff			fprintf(stderr, "wait for RDMA_WRITE_COMPLETE state %d\n",
710219820Sjeff				cb->state);
711219820Sjeff			ret = -1;
712219820Sjeff			break;
713219820Sjeff		}
714219820Sjeff		DEBUG_LOG("server rdma write complete \n");
715219820Sjeff
716219820Sjeff		/* Tell client to begin again */
717219820Sjeff		ret = ibv_post_send(cb->qp, &cb->sq_wr, &bad_wr);
718219820Sjeff		if (ret) {
719219820Sjeff			fprintf(stderr, "post send error %d\n", ret);
720219820Sjeff			break;
721219820Sjeff		}
722219820Sjeff		DEBUG_LOG("server posted go ahead\n");
723219820Sjeff	}
724219820Sjeff
725219820Sjeff	return ret;
726219820Sjeff}
727219820Sjeff
728219820Sjeffstatic int rping_bind_server(struct rping_cb *cb)
729219820Sjeff{
730219820Sjeff	int ret;
731219820Sjeff
732219820Sjeff	if (cb->sin.ss_family == AF_INET)
733219820Sjeff		((struct sockaddr_in *) &cb->sin)->sin_port = cb->port;
734219820Sjeff	else
735219820Sjeff		((struct sockaddr_in6 *) &cb->sin)->sin6_port = cb->port;
736219820Sjeff
737219820Sjeff	ret = rdma_bind_addr(cb->cm_id, (struct sockaddr *) &cb->sin);
738219820Sjeff	if (ret) {
739219820Sjeff		perror("rdma_bind_addr");
740219820Sjeff		return ret;
741219820Sjeff	}
742219820Sjeff	DEBUG_LOG("rdma_bind_addr successful\n");
743219820Sjeff
744219820Sjeff	DEBUG_LOG("rdma_listen\n");
745219820Sjeff	ret = rdma_listen(cb->cm_id, 3);
746219820Sjeff	if (ret) {
747219820Sjeff		perror("rdma_listen");
748219820Sjeff		return ret;
749219820Sjeff	}
750219820Sjeff
751219820Sjeff	return 0;
752219820Sjeff}
753219820Sjeff
754219820Sjeffstatic struct rping_cb *clone_cb(struct rping_cb *listening_cb)
755219820Sjeff{
756219820Sjeff	struct rping_cb *cb = malloc(sizeof *cb);
757219820Sjeff	if (!cb)
758219820Sjeff		return NULL;
759219820Sjeff	*cb = *listening_cb;
760219820Sjeff	cb->child_cm_id->context = cb;
761219820Sjeff	return cb;
762219820Sjeff}
763219820Sjeff
764219820Sjeffstatic void free_cb(struct rping_cb *cb)
765219820Sjeff{
766219820Sjeff	free(cb);
767219820Sjeff}
768219820Sjeff
769219820Sjeffstatic void *rping_persistent_server_thread(void *arg)
770219820Sjeff{
771219820Sjeff	struct rping_cb *cb = arg;
772219820Sjeff	struct ibv_recv_wr *bad_wr;
773219820Sjeff	int ret;
774219820Sjeff
775219820Sjeff	ret = rping_setup_qp(cb, cb->child_cm_id);
776219820Sjeff	if (ret) {
777219820Sjeff		fprintf(stderr, "setup_qp failed: %d\n", ret);
778219820Sjeff		goto err0;
779219820Sjeff	}
780219820Sjeff
781219820Sjeff	ret = rping_setup_buffers(cb);
782219820Sjeff	if (ret) {
783219820Sjeff		fprintf(stderr, "rping_setup_buffers failed: %d\n", ret);
784219820Sjeff		goto err1;
785219820Sjeff	}
786219820Sjeff
787219820Sjeff	ret = ibv_post_recv(cb->qp, &cb->rq_wr, &bad_wr);
788219820Sjeff	if (ret) {
789219820Sjeff		fprintf(stderr, "ibv_post_recv failed: %d\n", ret);
790219820Sjeff		goto err2;
791219820Sjeff	}
792219820Sjeff
793219820Sjeff	pthread_create(&cb->cqthread, NULL, cq_thread, cb);
794219820Sjeff
795219820Sjeff	ret = rping_accept(cb);
796219820Sjeff	if (ret) {
797219820Sjeff		fprintf(stderr, "connect error %d\n", ret);
798219820Sjeff		goto err3;
799219820Sjeff	}
800219820Sjeff
801219820Sjeff	rping_test_server(cb);
802219820Sjeff	rdma_disconnect(cb->child_cm_id);
803219820Sjeff	rping_free_buffers(cb);
804219820Sjeff	rping_free_qp(cb);
805219820Sjeff	pthread_cancel(cb->cqthread);
806219820Sjeff	pthread_join(cb->cqthread, NULL);
807219820Sjeff	rdma_destroy_id(cb->child_cm_id);
808219820Sjeff	free_cb(cb);
809219820Sjeff	return NULL;
810219820Sjefferr3:
811219820Sjeff	pthread_cancel(cb->cqthread);
812219820Sjeff	pthread_join(cb->cqthread, NULL);
813219820Sjefferr2:
814219820Sjeff	rping_free_buffers(cb);
815219820Sjefferr1:
816219820Sjeff	rping_free_qp(cb);
817219820Sjefferr0:
818219820Sjeff	free_cb(cb);
819219820Sjeff	return NULL;
820219820Sjeff}
821219820Sjeff
822219820Sjeffstatic int rping_run_persistent_server(struct rping_cb *listening_cb)
823219820Sjeff{
824219820Sjeff	int ret;
825219820Sjeff	struct rping_cb *cb;
826219820Sjeff
827219820Sjeff	ret = rping_bind_server(listening_cb);
828219820Sjeff	if (ret)
829219820Sjeff		return ret;
830219820Sjeff
831219820Sjeff	while (1) {
832219820Sjeff		sem_wait(&listening_cb->sem);
833219820Sjeff		if (listening_cb->state != CONNECT_REQUEST) {
834219820Sjeff			fprintf(stderr, "wait for CONNECT_REQUEST state %d\n",
835219820Sjeff				listening_cb->state);
836219820Sjeff			return -1;
837219820Sjeff		}
838219820Sjeff
839219820Sjeff		cb = clone_cb(listening_cb);
840219820Sjeff		if (!cb)
841219820Sjeff			return -1;
842219820Sjeff		pthread_create(&cb->persistent_server_thread, NULL, rping_persistent_server_thread, cb);
843219820Sjeff	}
844219820Sjeff	return 0;
845219820Sjeff}
846219820Sjeff
847219820Sjeffstatic int rping_run_server(struct rping_cb *cb)
848219820Sjeff{
849219820Sjeff	struct ibv_recv_wr *bad_wr;
850219820Sjeff	int ret;
851219820Sjeff
852219820Sjeff	ret = rping_bind_server(cb);
853219820Sjeff	if (ret)
854219820Sjeff		return ret;
855219820Sjeff
856219820Sjeff	sem_wait(&cb->sem);
857219820Sjeff	if (cb->state != CONNECT_REQUEST) {
858219820Sjeff		fprintf(stderr, "wait for CONNECT_REQUEST state %d\n",
859219820Sjeff			cb->state);
860219820Sjeff		return -1;
861219820Sjeff	}
862219820Sjeff
863219820Sjeff	ret = rping_setup_qp(cb, cb->child_cm_id);
864219820Sjeff	if (ret) {
865219820Sjeff		fprintf(stderr, "setup_qp failed: %d\n", ret);
866219820Sjeff		return ret;
867219820Sjeff	}
868219820Sjeff
869219820Sjeff	ret = rping_setup_buffers(cb);
870219820Sjeff	if (ret) {
871219820Sjeff		fprintf(stderr, "rping_setup_buffers failed: %d\n", ret);
872219820Sjeff		goto err1;
873219820Sjeff	}
874219820Sjeff
875219820Sjeff	ret = ibv_post_recv(cb->qp, &cb->rq_wr, &bad_wr);
876219820Sjeff	if (ret) {
877219820Sjeff		fprintf(stderr, "ibv_post_recv failed: %d\n", ret);
878219820Sjeff		goto err2;
879219820Sjeff	}
880219820Sjeff
881219820Sjeff	pthread_create(&cb->cqthread, NULL, cq_thread, cb);
882219820Sjeff
883219820Sjeff	ret = rping_accept(cb);
884219820Sjeff	if (ret) {
885219820Sjeff		fprintf(stderr, "connect error %d\n", ret);
886219820Sjeff		goto err2;
887219820Sjeff	}
888219820Sjeff
889219820Sjeff	rping_test_server(cb);
890219820Sjeff	rdma_disconnect(cb->child_cm_id);
891219820Sjeff	rdma_destroy_id(cb->child_cm_id);
892219820Sjefferr2:
893219820Sjeff	rping_free_buffers(cb);
894219820Sjefferr1:
895219820Sjeff	rping_free_qp(cb);
896219820Sjeff
897219820Sjeff	return ret;
898219820Sjeff}
899219820Sjeff
900219820Sjeffstatic int rping_test_client(struct rping_cb *cb)
901219820Sjeff{
902219820Sjeff	int ping, start, cc, i, ret = 0;
903219820Sjeff	struct ibv_send_wr *bad_wr;
904219820Sjeff	unsigned char c;
905219820Sjeff
906219820Sjeff	start = 65;
907219820Sjeff	for (ping = 0; !cb->count || ping < cb->count; ping++) {
908219820Sjeff		cb->state = RDMA_READ_ADV;
909219820Sjeff
910219820Sjeff		/* Put some ascii text in the buffer. */
911219820Sjeff		cc = sprintf(cb->start_buf, RPING_MSG_FMT, ping);
912219820Sjeff		for (i = cc, c = start; i < cb->size; i++) {
913219820Sjeff			cb->start_buf[i] = c;
914219820Sjeff			c++;
915219820Sjeff			if (c > 122)
916219820Sjeff				c = 65;
917219820Sjeff		}
918219820Sjeff		start++;
919219820Sjeff		if (start > 122)
920219820Sjeff			start = 65;
921219820Sjeff		cb->start_buf[cb->size - 1] = 0;
922219820Sjeff
923219820Sjeff		rping_format_send(cb, cb->start_buf, cb->start_mr);
924219820Sjeff		ret = ibv_post_send(cb->qp, &cb->sq_wr, &bad_wr);
925219820Sjeff		if (ret) {
926219820Sjeff			fprintf(stderr, "post send error %d\n", ret);
927219820Sjeff			break;
928219820Sjeff		}
929219820Sjeff
930219820Sjeff		/* Wait for server to ACK */
931219820Sjeff		sem_wait(&cb->sem);
932219820Sjeff		if (cb->state != RDMA_WRITE_ADV) {
933219820Sjeff			fprintf(stderr, "wait for RDMA_WRITE_ADV state %d\n",
934219820Sjeff				cb->state);
935219820Sjeff			ret = -1;
936219820Sjeff			break;
937219820Sjeff		}
938219820Sjeff
939219820Sjeff		rping_format_send(cb, cb->rdma_buf, cb->rdma_mr);
940219820Sjeff		ret = ibv_post_send(cb->qp, &cb->sq_wr, &bad_wr);
941219820Sjeff		if (ret) {
942219820Sjeff			fprintf(stderr, "post send error %d\n", ret);
943219820Sjeff			break;
944219820Sjeff		}
945219820Sjeff
946219820Sjeff		/* Wait for the server to say the RDMA Write is complete. */
947219820Sjeff		sem_wait(&cb->sem);
948219820Sjeff		if (cb->state != RDMA_WRITE_COMPLETE) {
949219820Sjeff			fprintf(stderr, "wait for RDMA_WRITE_COMPLETE state %d\n",
950219820Sjeff				cb->state);
951219820Sjeff			ret = -1;
952219820Sjeff			break;
953219820Sjeff		}
954219820Sjeff
955219820Sjeff		if (cb->validate)
956219820Sjeff			if (memcmp(cb->start_buf, cb->rdma_buf, cb->size)) {
957219820Sjeff				fprintf(stderr, "data mismatch!\n");
958219820Sjeff				ret = -1;
959219820Sjeff				break;
960219820Sjeff			}
961219820Sjeff
962219820Sjeff		if (cb->verbose)
963219820Sjeff			printf("ping data: %s\n", cb->rdma_buf);
964219820Sjeff	}
965219820Sjeff
966219820Sjeff	return ret;
967219820Sjeff}
968219820Sjeff
969219820Sjeffstatic int rping_connect_client(struct rping_cb *cb)
970219820Sjeff{
971219820Sjeff	struct rdma_conn_param conn_param;
972219820Sjeff	int ret;
973219820Sjeff
974219820Sjeff	memset(&conn_param, 0, sizeof conn_param);
975219820Sjeff	conn_param.responder_resources = 1;
976219820Sjeff	conn_param.initiator_depth = 1;
977219820Sjeff	conn_param.retry_count = 10;
978219820Sjeff
979219820Sjeff	ret = rdma_connect(cb->cm_id, &conn_param);
980219820Sjeff	if (ret) {
981219820Sjeff		perror("rdma_connect");
982219820Sjeff		return ret;
983219820Sjeff	}
984219820Sjeff
985219820Sjeff	sem_wait(&cb->sem);
986219820Sjeff	if (cb->state != CONNECTED) {
987219820Sjeff		fprintf(stderr, "wait for CONNECTED state %d\n", cb->state);
988219820Sjeff		return -1;
989219820Sjeff	}
990219820Sjeff
991219820Sjeff	DEBUG_LOG("rmda_connect successful\n");
992219820Sjeff	return 0;
993219820Sjeff}
994219820Sjeff
995219820Sjeffstatic int rping_bind_client(struct rping_cb *cb)
996219820Sjeff{
997219820Sjeff	int ret;
998219820Sjeff
999219820Sjeff	if (cb->sin.ss_family == AF_INET)
1000219820Sjeff		((struct sockaddr_in *) &cb->sin)->sin_port = cb->port;
1001219820Sjeff	else
1002219820Sjeff		((struct sockaddr_in6 *) &cb->sin)->sin6_port = cb->port;
1003219820Sjeff
1004219820Sjeff	ret = rdma_resolve_addr(cb->cm_id, NULL, (struct sockaddr *) &cb->sin, 2000);
1005219820Sjeff	if (ret) {
1006219820Sjeff		perror("rdma_resolve_addr");
1007219820Sjeff		return ret;
1008219820Sjeff	}
1009219820Sjeff
1010219820Sjeff	sem_wait(&cb->sem);
1011219820Sjeff	if (cb->state != ROUTE_RESOLVED) {
1012219820Sjeff		fprintf(stderr, "waiting for addr/route resolution state %d\n",
1013219820Sjeff			cb->state);
1014219820Sjeff		return -1;
1015219820Sjeff	}
1016219820Sjeff
1017219820Sjeff	DEBUG_LOG("rdma_resolve_addr - rdma_resolve_route successful\n");
1018219820Sjeff	return 0;
1019219820Sjeff}
1020219820Sjeff
1021219820Sjeffstatic int rping_run_client(struct rping_cb *cb)
1022219820Sjeff{
1023219820Sjeff	struct ibv_recv_wr *bad_wr;
1024219820Sjeff	int ret;
1025219820Sjeff
1026219820Sjeff	ret = rping_bind_client(cb);
1027219820Sjeff	if (ret)
1028219820Sjeff		return ret;
1029219820Sjeff
1030219820Sjeff	ret = rping_setup_qp(cb, cb->cm_id);
1031219820Sjeff	if (ret) {
1032219820Sjeff		fprintf(stderr, "setup_qp failed: %d\n", ret);
1033219820Sjeff		return ret;
1034219820Sjeff	}
1035219820Sjeff
1036219820Sjeff	ret = rping_setup_buffers(cb);
1037219820Sjeff	if (ret) {
1038219820Sjeff		fprintf(stderr, "rping_setup_buffers failed: %d\n", ret);
1039219820Sjeff		goto err1;
1040219820Sjeff	}
1041219820Sjeff
1042219820Sjeff	ret = ibv_post_recv(cb->qp, &cb->rq_wr, &bad_wr);
1043219820Sjeff	if (ret) {
1044219820Sjeff		fprintf(stderr, "ibv_post_recv failed: %d\n", ret);
1045219820Sjeff		goto err2;
1046219820Sjeff	}
1047219820Sjeff
1048219820Sjeff	pthread_create(&cb->cqthread, NULL, cq_thread, cb);
1049219820Sjeff
1050219820Sjeff	ret = rping_connect_client(cb);
1051219820Sjeff	if (ret) {
1052219820Sjeff		fprintf(stderr, "connect error %d\n", ret);
1053219820Sjeff		goto err2;
1054219820Sjeff	}
1055219820Sjeff
1056219820Sjeff	rping_test_client(cb);
1057219820Sjeff	rdma_disconnect(cb->cm_id);
1058219820Sjefferr2:
1059219820Sjeff	rping_free_buffers(cb);
1060219820Sjefferr1:
1061219820Sjeff	rping_free_qp(cb);
1062219820Sjeff
1063219820Sjeff	return ret;
1064219820Sjeff}
1065219820Sjeff
/*
 * Resolve a host name or numeric address and copy the first result to addr.
 *
 * dst:  host name or IPv4/IPv6 address string.
 * addr: destination; must have room for a struct sockaddr_in6, because that
 *       many bytes are copied for an IPv6 result.
 *
 * Only the first entry returned by getaddrinfo() is used.
 *
 * Returns 0 on success, the getaddrinfo() error code if resolution failed,
 * or -1 if the first result is neither IPv4 nor IPv6.  Failures are reported
 * on stderr together with their reason.
 */
static int get_addr(char *dst, struct sockaddr *addr)
{
	struct addrinfo *res;
	int ret;

	ret = getaddrinfo(dst, NULL, NULL, &res);
	if (ret) {
		fprintf(stderr,
			"getaddrinfo failed (%s) - invalid hostname or IP address\n",
			gai_strerror(ret));
		return ret;
	}

	switch (res->ai_family) {
	case PF_INET:
		memcpy(addr, res->ai_addr, sizeof(struct sockaddr_in));
		break;
	case PF_INET6:
		memcpy(addr, res->ai_addr, sizeof(struct sockaddr_in6));
		break;
	default:
		fprintf(stderr, "unsupported address family %d for %s\n",
			res->ai_family, dst);
		ret = -1;
		break;
	}

	freeaddrinfo(res);
	return ret;
}
1087219820Sjeff
/*
 * Print the command line synopsis (server form, then client form) followed
 * by one line per option to stdout.  name is used as the program name in
 * both synopsis lines.
 */
static void usage(char *name)
{
	static const char *const option_help[] = {
		"\t-c\t\tclient side\n",
		"\t-s\t\tserver side.  To bind to any address with IPv6 use -a ::0\n",
		"\t-v\t\tdisplay ping data to stdout\n",
		"\t-V\t\tvalidate ping data\n",
		"\t-d\t\tdebug printfs\n",
		"\t-S size \tping data size\n",
		"\t-C count\tping count times\n",
		"\t-a addr\t\taddress\n",
		"\t-p port\t\tport\n",
		"\t-P\t\tpersistent server mode allowing multiple connections\n",
	};
	size_t line;

	printf("%s -s [-vVd] [-S size] [-C count] [-a addr] [-p port]\n",
	       name);
	printf("%s -c [-vVd] [-S size] [-C count] -a addr [-p port]\n",
	       name);

	for (line = 0; line < sizeof(option_help) / sizeof(option_help[0]); line++)
		fputs(option_help[line], stdout);
}
1105219820Sjeff
1106219820Sjeffint main(int argc, char *argv[])
1107219820Sjeff{
1108219820Sjeff	struct rping_cb *cb;
1109219820Sjeff	int op;
1110219820Sjeff	int ret = 0;
1111219820Sjeff	int persistent_server = 0;
1112219820Sjeff
1113219820Sjeff	cb = malloc(sizeof(*cb));
1114219820Sjeff	if (!cb)
1115219820Sjeff		return -ENOMEM;
1116219820Sjeff
1117219820Sjeff	memset(cb, 0, sizeof(*cb));
1118219820Sjeff	cb->server = -1;
1119219820Sjeff	cb->state = IDLE;
1120219820Sjeff	cb->size = 64;
1121219820Sjeff	cb->sin.ss_family = PF_INET;
1122219820Sjeff	cb->port = htons(7174);
1123219820Sjeff	sem_init(&cb->sem, 0, 0);
1124219820Sjeff
1125219820Sjeff	opterr = 0;
1126219820Sjeff	while ((op=getopt(argc, argv, "a:Pp:C:S:t:scvVd")) != -1) {
1127219820Sjeff		switch (op) {
1128219820Sjeff		case 'a':
1129219820Sjeff			ret = get_addr(optarg, (struct sockaddr *) &cb->sin);
1130219820Sjeff			break;
1131219820Sjeff		case 'P':
1132219820Sjeff			persistent_server = 1;
1133219820Sjeff			break;
1134219820Sjeff		case 'p':
1135219820Sjeff			cb->port = htons(atoi(optarg));
1136219820Sjeff			DEBUG_LOG("port %d\n", (int) atoi(optarg));
1137219820Sjeff			break;
1138219820Sjeff		case 's':
1139219820Sjeff			cb->server = 1;
1140219820Sjeff			DEBUG_LOG("server\n");
1141219820Sjeff			break;
1142219820Sjeff		case 'c':
1143219820Sjeff			cb->server = 0;
1144219820Sjeff			DEBUG_LOG("client\n");
1145219820Sjeff			break;
1146219820Sjeff		case 'S':
1147219820Sjeff			cb->size = atoi(optarg);
1148219820Sjeff			if ((cb->size < RPING_MIN_BUFSIZE) ||
1149219820Sjeff			    (cb->size > (RPING_BUFSIZE - 1))) {
1150219820Sjeff				fprintf(stderr, "Invalid size %d "
1151275107Shselasky				       "(valid range is %d to %d)\n",
1152275107Shselasky				       (int)cb->size, (int)(RPING_MIN_BUFSIZE),
1153275107Shselasky				       (int)(RPING_BUFSIZE));
1154219820Sjeff				ret = EINVAL;
1155219820Sjeff			} else
1156219820Sjeff				DEBUG_LOG("size %d\n", (int) atoi(optarg));
1157219820Sjeff			break;
1158219820Sjeff		case 'C':
1159219820Sjeff			cb->count = atoi(optarg);
1160219820Sjeff			if (cb->count < 0) {
1161219820Sjeff				fprintf(stderr, "Invalid count %d\n",
1162219820Sjeff					cb->count);
1163219820Sjeff				ret = EINVAL;
1164219820Sjeff			} else
1165219820Sjeff				DEBUG_LOG("count %d\n", (int) cb->count);
1166219820Sjeff			break;
1167219820Sjeff		case 'v':
1168219820Sjeff			cb->verbose++;
1169219820Sjeff			DEBUG_LOG("verbose\n");
1170219820Sjeff			break;
1171219820Sjeff		case 'V':
1172219820Sjeff			cb->validate++;
1173219820Sjeff			DEBUG_LOG("validate data\n");
1174219820Sjeff			break;
1175219820Sjeff		case 'd':
1176219820Sjeff			debug++;
1177219820Sjeff			break;
1178219820Sjeff		default:
1179219820Sjeff			usage("rping");
1180219820Sjeff			ret = EINVAL;
1181219820Sjeff			goto out;
1182219820Sjeff		}
1183219820Sjeff	}
1184219820Sjeff	if (ret)
1185219820Sjeff		goto out;
1186219820Sjeff
1187219820Sjeff	if (cb->server == -1) {
1188219820Sjeff		usage("rping");
1189219820Sjeff		ret = EINVAL;
1190219820Sjeff		goto out;
1191219820Sjeff	}
1192219820Sjeff
1193219820Sjeff	cb->cm_channel = rdma_create_event_channel();
1194219820Sjeff	if (!cb->cm_channel) {
1195219820Sjeff		perror("rdma_create_event_channel");
1196219820Sjeff		goto out;
1197219820Sjeff	}
1198219820Sjeff
1199219820Sjeff	ret = rdma_create_id(cb->cm_channel, &cb->cm_id, cb, RDMA_PS_TCP);
1200219820Sjeff	if (ret) {
1201219820Sjeff		perror("rdma_create_id");
1202219820Sjeff		goto out2;
1203219820Sjeff	}
1204219820Sjeff	DEBUG_LOG("created cm_id %p\n", cb->cm_id);
1205219820Sjeff
1206219820Sjeff	pthread_create(&cb->cmthread, NULL, cm_thread, cb);
1207219820Sjeff
1208219820Sjeff	if (cb->server) {
1209219820Sjeff		if (persistent_server)
1210219820Sjeff			ret = rping_run_persistent_server(cb);
1211219820Sjeff		else
1212219820Sjeff			ret = rping_run_server(cb);
1213219820Sjeff	} else
1214219820Sjeff		ret = rping_run_client(cb);
1215219820Sjeff
1216219820Sjeff	DEBUG_LOG("destroy cm_id %p\n", cb->cm_id);
1217219820Sjeff	rdma_destroy_id(cb->cm_id);
1218219820Sjeffout2:
1219219820Sjeff	rdma_destroy_event_channel(cb->cm_channel);
1220219820Sjeffout:
1221219820Sjeff	free(cb);
1222219820Sjeff	return ret;
1223219820Sjeff}
1224