rping.c revision 331769
1/*
2 * Copyright (c) 2005 Ammasso, Inc. All rights reserved.
3 * Copyright (c) 2006 Open Grid Computing, Inc. All rights reserved.
4 *
5 * This software is available to you under a choice of one of two
6 * licenses.  You may choose to be licensed under the terms of the GNU
7 * General Public License (GPL) Version 2, available from the file
8 * COPYING in the main directory of this source tree, or the
9 * OpenIB.org BSD license below:
10 *
11 *     Redistribution and use in source and binary forms, with or
12 *     without modification, are permitted provided that the following
13 *     conditions are met:
14 *
15 *      - Redistributions of source code must retain the above
16 *        copyright notice, this list of conditions and the following
17 *        disclaimer.
18 *
19 *      - Redistributions in binary form must reproduce the above
20 *        copyright notice, this list of conditions and the following
21 *        disclaimer in the documentation and/or other materials
22 *        provided with the distribution.
23 *
24 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
25 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
26 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
27 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
28 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
29 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
30 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
31 * SOFTWARE.
32 */
33#define _GNU_SOURCE
34#include <infiniband/endian.h>
35#include <getopt.h>
36#include <stdlib.h>
37#include <string.h>
38#include <stdio.h>
39#include <errno.h>
40#include <sys/types.h>
41#include <sys/socket.h>
42#include <netdb.h>
43#include <semaphore.h>
44#include <pthread.h>
45#include <inttypes.h>
46#include <libgen.h>
47#include <rdma/rdma_cma.h>
48
/* Incremented by the -d command line option; gates DEBUG_LOG() output. */
static int debug = 0;

/*
 * printf()-style trace that prints only when `debug` is non-zero.
 *
 * Wrapped in do { } while (0) so the expansion is exactly one statement:
 * a bare `if (debug) printf` would silently capture the `else` of an
 * enclosing if/else when the macro is used as its unbraced body.
 */
#define DEBUG_LOG(...) do { if (debug) printf(__VA_ARGS__); } while (0)
51
52/*
53 * rping "ping/pong" loop:
54 * 	client sends source rkey/addr/len
 *	server receives source rkey/addr/len
56 *	server rdma reads "ping" data from source
57 * 	server sends "go ahead" on rdma read completion
58 *	client sends sink rkey/addr/len
59 * 	server receives sink rkey/addr/len
60 * 	server rdma writes "pong" data to sink
61 * 	server sends "go ahead" on rdma write completion
62 * 	<repeat loop>
63 */
64
65/*
66 * These states are used to signal events between the completion handler
67 * and the main client or server thread.
68 *
69 * Once CONNECTED, they cycle through RDMA_READ_ADV, RDMA_WRITE_ADV,
70 * and RDMA_WRITE_COMPLETE for each ping.
71 */
/*
 * Value kept in rping_cb.state.  The CM and CQ event handlers store a
 * new value and post cb->sem; the client/server code waits on the
 * semaphore and then checks which state was reached.  The numeric
 * order matters: server_recv() tests "state <= CONNECTED".
 */
enum test_state {
	IDLE                = 1,	/* initial value assigned in main() */
	CONNECT_REQUEST     = 2,	/* RDMA_CM_EVENT_CONNECT_REQUEST seen */
	ADDR_RESOLVED       = 3,	/* RDMA_CM_EVENT_ADDR_RESOLVED seen */
	ROUTE_RESOLVED      = 4,	/* RDMA_CM_EVENT_ROUTE_RESOLVED seen */
	CONNECTED           = 5,	/* RDMA_CM_EVENT_ESTABLISHED (client side only) */
	RDMA_READ_ADV       = 6,	/* source buffer advertised / received */
	RDMA_READ_COMPLETE  = 7,	/* IBV_WC_RDMA_READ completion seen */
	RDMA_WRITE_ADV      = 8,	/* sink buffer advertised / received */
	RDMA_WRITE_COMPLETE = 9,	/* IBV_WC_RDMA_WRITE completion, or client got the final reply */
	DISCONNECTED        = 10,	/* RDMA_CM_EVENT_DISCONNECTED seen */
	ERROR               = 11	/* a handler detected a failure */
};
85
86struct rping_rdma_info {
87	__be64 buf;
88	__be32 rkey;
89	__be32 size;
90};
91
92/*
93 * Default max buffer size for IO...
94 */
/* Parenthesized so the value survives being used inside a larger expression. */
#define RPING_BUFSIZE (64 * 1024)
/* Send queue depth requested for the QP; the CQ is created with twice this. */
#define RPING_SQ_DEPTH 16

/* Default string for print data and
 * minimum buffer size
 */
#define _stringify( _x ) # _x
#define stringify( _x ) _stringify( _x )

#define RPING_MSG_FMT           "rdma-ping-%d: "
/*
 * Smallest accepted -S value: room for the formatted prefix plus the
 * stringified INT_MAX.  Parenthesized because it is a sum of two sizeof
 * expressions; the result has type size_t.
 */
#define RPING_MIN_BUFSIZE       (sizeof(stringify(INT_MAX)) + sizeof(RPING_MSG_FMT))
106
107/*
108 * Control block struct.
109 */
110struct rping_cb {
111	int server;			/* 0 iff client */
112	pthread_t cqthread;
113	pthread_t persistent_server_thread;
114	struct ibv_comp_channel *channel;
115	struct ibv_cq *cq;
116	struct ibv_pd *pd;
117	struct ibv_qp *qp;
118
119	struct ibv_recv_wr rq_wr;	/* recv work request record */
120	struct ibv_sge recv_sgl;	/* recv single SGE */
121	struct rping_rdma_info recv_buf;/* malloc'd buffer */
122	struct ibv_mr *recv_mr;		/* MR associated with this buffer */
123
124	struct ibv_send_wr sq_wr;	/* send work request record */
125	struct ibv_sge send_sgl;
126	struct rping_rdma_info send_buf;/* single send buf */
127	struct ibv_mr *send_mr;
128
129	struct ibv_send_wr rdma_sq_wr;	/* rdma work request record */
130	struct ibv_sge rdma_sgl;	/* rdma single SGE */
131	char *rdma_buf;			/* used as rdma sink */
132	struct ibv_mr *rdma_mr;
133
134	uint32_t remote_rkey;		/* remote guys RKEY */
135	uint64_t remote_addr;		/* remote guys TO */
136	uint32_t remote_len;		/* remote guys LEN */
137
138	char *start_buf;		/* rdma read src */
139	struct ibv_mr *start_mr;
140
141	enum test_state state;		/* used for cond/signalling */
142	sem_t sem;
143
144	struct sockaddr_storage sin;
145	struct sockaddr_storage ssource;
146	__be16 port;			/* dst port in NBO */
147	int verbose;			/* verbose logging */
148	int count;			/* ping count */
149	int size;			/* ping data size */
150	int validate;			/* validate ping data */
151
152	/* CM stuff */
153	pthread_t cmthread;
154	struct rdma_event_channel *cm_channel;
155	struct rdma_cm_id *cm_id;	/* connection on client side,*/
156					/* listener on service side. */
157	struct rdma_cm_id *child_cm_id;	/* connection on server side */
158};
159
160static int rping_cma_event_handler(struct rdma_cm_id *cma_id,
161				    struct rdma_cm_event *event)
162{
163	int ret = 0;
164	struct rping_cb *cb = cma_id->context;
165
166	DEBUG_LOG("cma_event type %s cma_id %p (%s)\n",
167		  rdma_event_str(event->event), cma_id,
168		  (cma_id == cb->cm_id) ? "parent" : "child");
169
170	switch (event->event) {
171	case RDMA_CM_EVENT_ADDR_RESOLVED:
172		cb->state = ADDR_RESOLVED;
173		ret = rdma_resolve_route(cma_id, 2000);
174		if (ret) {
175			cb->state = ERROR;
176			perror("rdma_resolve_route");
177			sem_post(&cb->sem);
178		}
179		break;
180
181	case RDMA_CM_EVENT_ROUTE_RESOLVED:
182		cb->state = ROUTE_RESOLVED;
183		sem_post(&cb->sem);
184		break;
185
186	case RDMA_CM_EVENT_CONNECT_REQUEST:
187		cb->state = CONNECT_REQUEST;
188		cb->child_cm_id = cma_id;
189		DEBUG_LOG("child cma %p\n", cb->child_cm_id);
190		sem_post(&cb->sem);
191		break;
192
193	case RDMA_CM_EVENT_ESTABLISHED:
194		DEBUG_LOG("ESTABLISHED\n");
195
196		/*
197		 * Server will wake up when first RECV completes.
198		 */
199		if (!cb->server) {
200			cb->state = CONNECTED;
201		}
202		sem_post(&cb->sem);
203		break;
204
205	case RDMA_CM_EVENT_ADDR_ERROR:
206	case RDMA_CM_EVENT_ROUTE_ERROR:
207	case RDMA_CM_EVENT_CONNECT_ERROR:
208	case RDMA_CM_EVENT_UNREACHABLE:
209	case RDMA_CM_EVENT_REJECTED:
210		fprintf(stderr, "cma event %s, error %d\n",
211			rdma_event_str(event->event), event->status);
212		sem_post(&cb->sem);
213		ret = -1;
214		break;
215
216	case RDMA_CM_EVENT_DISCONNECTED:
217		fprintf(stderr, "%s DISCONNECT EVENT...\n",
218			cb->server ? "server" : "client");
219		cb->state = DISCONNECTED;
220		sem_post(&cb->sem);
221		break;
222
223	case RDMA_CM_EVENT_DEVICE_REMOVAL:
224		fprintf(stderr, "cma detected device removal!!!!\n");
225		cb->state = ERROR;
226		sem_post(&cb->sem);
227		ret = -1;
228		break;
229
230	default:
231		fprintf(stderr, "unhandled event: %s, ignoring\n",
232			rdma_event_str(event->event));
233		break;
234	}
235
236	return ret;
237}
238
239static int server_recv(struct rping_cb *cb, struct ibv_wc *wc)
240{
241	if (wc->byte_len != sizeof(cb->recv_buf)) {
242		fprintf(stderr, "Received bogus data, size %d\n", wc->byte_len);
243		return -1;
244	}
245
246	cb->remote_rkey = be32toh(cb->recv_buf.rkey);
247	cb->remote_addr = be64toh(cb->recv_buf.buf);
248	cb->remote_len  = be32toh(cb->recv_buf.size);
249	DEBUG_LOG("Received rkey %x addr %" PRIx64 " len %d from peer\n",
250		  cb->remote_rkey, cb->remote_addr, cb->remote_len);
251
252	if (cb->state <= CONNECTED || cb->state == RDMA_WRITE_COMPLETE)
253		cb->state = RDMA_READ_ADV;
254	else
255		cb->state = RDMA_WRITE_ADV;
256
257	return 0;
258}
259
260static int client_recv(struct rping_cb *cb, struct ibv_wc *wc)
261{
262	if (wc->byte_len != sizeof(cb->recv_buf)) {
263		fprintf(stderr, "Received bogus data, size %d\n", wc->byte_len);
264		return -1;
265	}
266
267	if (cb->state == RDMA_READ_ADV)
268		cb->state = RDMA_WRITE_ADV;
269	else
270		cb->state = RDMA_WRITE_COMPLETE;
271
272	return 0;
273}
274
275static int rping_cq_event_handler(struct rping_cb *cb)
276{
277	struct ibv_wc wc;
278	struct ibv_recv_wr *bad_wr;
279	int ret;
280	int flushed = 0;
281
282	while ((ret = ibv_poll_cq(cb->cq, 1, &wc)) == 1) {
283		ret = 0;
284
285		if (wc.status) {
286			if (wc.status == IBV_WC_WR_FLUSH_ERR) {
287				flushed = 1;
288				continue;
289
290			}
291			fprintf(stderr,
292				"cq completion failed status %d\n",
293				wc.status);
294			ret = -1;
295			goto error;
296		}
297
298		switch (wc.opcode) {
299		case IBV_WC_SEND:
300			DEBUG_LOG("send completion\n");
301			break;
302
303		case IBV_WC_RDMA_WRITE:
304			DEBUG_LOG("rdma write completion\n");
305			cb->state = RDMA_WRITE_COMPLETE;
306			sem_post(&cb->sem);
307			break;
308
309		case IBV_WC_RDMA_READ:
310			DEBUG_LOG("rdma read completion\n");
311			cb->state = RDMA_READ_COMPLETE;
312			sem_post(&cb->sem);
313			break;
314
315		case IBV_WC_RECV:
316			DEBUG_LOG("recv completion\n");
317			ret = cb->server ? server_recv(cb, &wc) :
318					   client_recv(cb, &wc);
319			if (ret) {
320				fprintf(stderr, "recv wc error: %d\n", ret);
321				goto error;
322			}
323
324			ret = ibv_post_recv(cb->qp, &cb->rq_wr, &bad_wr);
325			if (ret) {
326				fprintf(stderr, "post recv error: %d\n", ret);
327				goto error;
328			}
329			sem_post(&cb->sem);
330			break;
331
332		default:
333			DEBUG_LOG("unknown!!!!! completion\n");
334			ret = -1;
335			goto error;
336		}
337	}
338	if (ret) {
339		fprintf(stderr, "poll error %d\n", ret);
340		goto error;
341	}
342	return flushed;
343
344error:
345	cb->state = ERROR;
346	sem_post(&cb->sem);
347	return ret;
348}
349
350static int rping_accept(struct rping_cb *cb)
351{
352	int ret;
353
354	DEBUG_LOG("accepting client connection request\n");
355
356	ret = rdma_accept(cb->child_cm_id, NULL);
357	if (ret) {
358		perror("rdma_accept");
359		return ret;
360	}
361
362	sem_wait(&cb->sem);
363	if (cb->state == ERROR) {
364		fprintf(stderr, "wait for CONNECTED state %d\n", cb->state);
365		return -1;
366	}
367	return 0;
368}
369
370static void rping_setup_wr(struct rping_cb *cb)
371{
372	cb->recv_sgl.addr = (uint64_t) (unsigned long) &cb->recv_buf;
373	cb->recv_sgl.length = sizeof cb->recv_buf;
374	cb->recv_sgl.lkey = cb->recv_mr->lkey;
375	cb->rq_wr.sg_list = &cb->recv_sgl;
376	cb->rq_wr.num_sge = 1;
377
378	cb->send_sgl.addr = (uint64_t) (unsigned long) &cb->send_buf;
379	cb->send_sgl.length = sizeof cb->send_buf;
380	cb->send_sgl.lkey = cb->send_mr->lkey;
381
382	cb->sq_wr.opcode = IBV_WR_SEND;
383	cb->sq_wr.send_flags = IBV_SEND_SIGNALED;
384	cb->sq_wr.sg_list = &cb->send_sgl;
385	cb->sq_wr.num_sge = 1;
386
387	cb->rdma_sgl.addr = (uint64_t) (unsigned long) cb->rdma_buf;
388	cb->rdma_sgl.lkey = cb->rdma_mr->lkey;
389	cb->rdma_sq_wr.send_flags = IBV_SEND_SIGNALED;
390	cb->rdma_sq_wr.sg_list = &cb->rdma_sgl;
391	cb->rdma_sq_wr.num_sge = 1;
392}
393
394static int rping_setup_buffers(struct rping_cb *cb)
395{
396	int ret;
397
398	DEBUG_LOG("rping_setup_buffers called on cb %p\n", cb);
399
400	cb->recv_mr = ibv_reg_mr(cb->pd, &cb->recv_buf, sizeof cb->recv_buf,
401				 IBV_ACCESS_LOCAL_WRITE);
402	if (!cb->recv_mr) {
403		fprintf(stderr, "recv_buf reg_mr failed\n");
404		return errno;
405	}
406
407	cb->send_mr = ibv_reg_mr(cb->pd, &cb->send_buf, sizeof cb->send_buf, 0);
408	if (!cb->send_mr) {
409		fprintf(stderr, "send_buf reg_mr failed\n");
410		ret = errno;
411		goto err1;
412	}
413
414	cb->rdma_buf = malloc(cb->size);
415	if (!cb->rdma_buf) {
416		fprintf(stderr, "rdma_buf malloc failed\n");
417		ret = -ENOMEM;
418		goto err2;
419	}
420
421	cb->rdma_mr = ibv_reg_mr(cb->pd, cb->rdma_buf, cb->size,
422				 IBV_ACCESS_LOCAL_WRITE |
423				 IBV_ACCESS_REMOTE_READ |
424				 IBV_ACCESS_REMOTE_WRITE);
425	if (!cb->rdma_mr) {
426		fprintf(stderr, "rdma_buf reg_mr failed\n");
427		ret = errno;
428		goto err3;
429	}
430
431	if (!cb->server) {
432		cb->start_buf = malloc(cb->size);
433		if (!cb->start_buf) {
434			fprintf(stderr, "start_buf malloc failed\n");
435			ret = -ENOMEM;
436			goto err4;
437		}
438
439		cb->start_mr = ibv_reg_mr(cb->pd, cb->start_buf, cb->size,
440					  IBV_ACCESS_LOCAL_WRITE |
441					  IBV_ACCESS_REMOTE_READ |
442					  IBV_ACCESS_REMOTE_WRITE);
443		if (!cb->start_mr) {
444			fprintf(stderr, "start_buf reg_mr failed\n");
445			ret = errno;
446			goto err5;
447		}
448	}
449
450	rping_setup_wr(cb);
451	DEBUG_LOG("allocated & registered buffers...\n");
452	return 0;
453
454err5:
455	free(cb->start_buf);
456err4:
457	ibv_dereg_mr(cb->rdma_mr);
458err3:
459	free(cb->rdma_buf);
460err2:
461	ibv_dereg_mr(cb->send_mr);
462err1:
463	ibv_dereg_mr(cb->recv_mr);
464	return ret;
465}
466
467static void rping_free_buffers(struct rping_cb *cb)
468{
469	DEBUG_LOG("rping_free_buffers called on cb %p\n", cb);
470	ibv_dereg_mr(cb->recv_mr);
471	ibv_dereg_mr(cb->send_mr);
472	ibv_dereg_mr(cb->rdma_mr);
473	free(cb->rdma_buf);
474	if (!cb->server) {
475		ibv_dereg_mr(cb->start_mr);
476		free(cb->start_buf);
477	}
478}
479
480static int rping_create_qp(struct rping_cb *cb)
481{
482	struct ibv_qp_init_attr init_attr;
483	int ret;
484
485	memset(&init_attr, 0, sizeof(init_attr));
486	init_attr.cap.max_send_wr = RPING_SQ_DEPTH;
487	init_attr.cap.max_recv_wr = 2;
488	init_attr.cap.max_recv_sge = 1;
489	init_attr.cap.max_send_sge = 1;
490	init_attr.qp_type = IBV_QPT_RC;
491	init_attr.send_cq = cb->cq;
492	init_attr.recv_cq = cb->cq;
493
494	if (cb->server) {
495		ret = rdma_create_qp(cb->child_cm_id, cb->pd, &init_attr);
496		if (!ret)
497			cb->qp = cb->child_cm_id->qp;
498	} else {
499		ret = rdma_create_qp(cb->cm_id, cb->pd, &init_attr);
500		if (!ret)
501			cb->qp = cb->cm_id->qp;
502	}
503
504	return ret;
505}
506
507static void rping_free_qp(struct rping_cb *cb)
508{
509	ibv_destroy_qp(cb->qp);
510	ibv_destroy_cq(cb->cq);
511	ibv_destroy_comp_channel(cb->channel);
512	ibv_dealloc_pd(cb->pd);
513}
514
515static int rping_setup_qp(struct rping_cb *cb, struct rdma_cm_id *cm_id)
516{
517	int ret;
518
519	cb->pd = ibv_alloc_pd(cm_id->verbs);
520	if (!cb->pd) {
521		fprintf(stderr, "ibv_alloc_pd failed\n");
522		return errno;
523	}
524	DEBUG_LOG("created pd %p\n", cb->pd);
525
526	cb->channel = ibv_create_comp_channel(cm_id->verbs);
527	if (!cb->channel) {
528		fprintf(stderr, "ibv_create_comp_channel failed\n");
529		ret = errno;
530		goto err1;
531	}
532	DEBUG_LOG("created channel %p\n", cb->channel);
533
534	cb->cq = ibv_create_cq(cm_id->verbs, RPING_SQ_DEPTH * 2, cb,
535				cb->channel, 0);
536	if (!cb->cq) {
537		fprintf(stderr, "ibv_create_cq failed\n");
538		ret = errno;
539		goto err2;
540	}
541	DEBUG_LOG("created cq %p\n", cb->cq);
542
543	ret = ibv_req_notify_cq(cb->cq, 0);
544	if (ret) {
545		fprintf(stderr, "ibv_create_cq failed\n");
546		ret = errno;
547		goto err3;
548	}
549
550	ret = rping_create_qp(cb);
551	if (ret) {
552		perror("rdma_create_qp");
553		goto err3;
554	}
555	DEBUG_LOG("created qp %p\n", cb->qp);
556	return 0;
557
558err3:
559	ibv_destroy_cq(cb->cq);
560err2:
561	ibv_destroy_comp_channel(cb->channel);
562err1:
563	ibv_dealloc_pd(cb->pd);
564	return ret;
565}
566
567static void *cm_thread(void *arg)
568{
569	struct rping_cb *cb = arg;
570	struct rdma_cm_event *event;
571	int ret;
572
573	while (1) {
574		ret = rdma_get_cm_event(cb->cm_channel, &event);
575		if (ret) {
576			perror("rdma_get_cm_event");
577			exit(ret);
578		}
579		ret = rping_cma_event_handler(event->id, event);
580		rdma_ack_cm_event(event);
581		if (ret)
582			exit(ret);
583	}
584}
585
586static void *cq_thread(void *arg)
587{
588	struct rping_cb *cb = arg;
589	struct ibv_cq *ev_cq;
590	void *ev_ctx;
591	int ret;
592
593	DEBUG_LOG("cq_thread started.\n");
594
595	while (1) {
596		pthread_testcancel();
597
598		ret = ibv_get_cq_event(cb->channel, &ev_cq, &ev_ctx);
599		if (ret) {
600			fprintf(stderr, "Failed to get cq event!\n");
601			pthread_exit(NULL);
602		}
603		if (ev_cq != cb->cq) {
604			fprintf(stderr, "Unknown CQ!\n");
605			pthread_exit(NULL);
606		}
607		ret = ibv_req_notify_cq(cb->cq, 0);
608		if (ret) {
609			fprintf(stderr, "Failed to set notify!\n");
610			pthread_exit(NULL);
611		}
612		ret = rping_cq_event_handler(cb);
613		ibv_ack_cq_events(cb->cq, 1);
614		if (ret)
615			pthread_exit(NULL);
616	}
617}
618
619static void rping_format_send(struct rping_cb *cb, char *buf, struct ibv_mr *mr)
620{
621	struct rping_rdma_info *info = &cb->send_buf;
622
623	info->buf = htobe64((uint64_t) (unsigned long) buf);
624	info->rkey = htobe32(mr->rkey);
625	info->size = htobe32(cb->size);
626
627	DEBUG_LOG("RDMA addr %" PRIx64" rkey %x len %d\n",
628		  be64toh(info->buf), be32toh(info->rkey), be32toh(info->size));
629}
630
631static int rping_test_server(struct rping_cb *cb)
632{
633	struct ibv_send_wr *bad_wr;
634	int ret;
635
636	while (1) {
637		/* Wait for client's Start STAG/TO/Len */
638		sem_wait(&cb->sem);
639		if (cb->state != RDMA_READ_ADV) {
640			fprintf(stderr, "wait for RDMA_READ_ADV state %d\n",
641				cb->state);
642			ret = -1;
643			break;
644		}
645
646		DEBUG_LOG("server received sink adv\n");
647
648		/* Issue RDMA Read. */
649		cb->rdma_sq_wr.opcode = IBV_WR_RDMA_READ;
650		cb->rdma_sq_wr.wr.rdma.rkey = cb->remote_rkey;
651		cb->rdma_sq_wr.wr.rdma.remote_addr = cb->remote_addr;
652		cb->rdma_sq_wr.sg_list->length = cb->remote_len;
653
654		ret = ibv_post_send(cb->qp, &cb->rdma_sq_wr, &bad_wr);
655		if (ret) {
656			fprintf(stderr, "post send error %d\n", ret);
657			break;
658		}
659		DEBUG_LOG("server posted rdma read req \n");
660
661		/* Wait for read completion */
662		sem_wait(&cb->sem);
663		if (cb->state != RDMA_READ_COMPLETE) {
664			fprintf(stderr, "wait for RDMA_READ_COMPLETE state %d\n",
665				cb->state);
666			ret = -1;
667			break;
668		}
669		DEBUG_LOG("server received read complete\n");
670
671		/* Display data in recv buf */
672		if (cb->verbose)
673			printf("server ping data: %s\n", cb->rdma_buf);
674
675		/* Tell client to continue */
676		ret = ibv_post_send(cb->qp, &cb->sq_wr, &bad_wr);
677		if (ret) {
678			fprintf(stderr, "post send error %d\n", ret);
679			break;
680		}
681		DEBUG_LOG("server posted go ahead\n");
682
683		/* Wait for client's RDMA STAG/TO/Len */
684		sem_wait(&cb->sem);
685		if (cb->state != RDMA_WRITE_ADV) {
686			fprintf(stderr, "wait for RDMA_WRITE_ADV state %d\n",
687				cb->state);
688			ret = -1;
689			break;
690		}
691		DEBUG_LOG("server received sink adv\n");
692
693		/* RDMA Write echo data */
694		cb->rdma_sq_wr.opcode = IBV_WR_RDMA_WRITE;
695		cb->rdma_sq_wr.wr.rdma.rkey = cb->remote_rkey;
696		cb->rdma_sq_wr.wr.rdma.remote_addr = cb->remote_addr;
697		cb->rdma_sq_wr.sg_list->length = strlen(cb->rdma_buf) + 1;
698		DEBUG_LOG("rdma write from lkey %x laddr %" PRIx64 " len %d\n",
699			  cb->rdma_sq_wr.sg_list->lkey,
700			  cb->rdma_sq_wr.sg_list->addr,
701			  cb->rdma_sq_wr.sg_list->length);
702
703		ret = ibv_post_send(cb->qp, &cb->rdma_sq_wr, &bad_wr);
704		if (ret) {
705			fprintf(stderr, "post send error %d\n", ret);
706			break;
707		}
708
709		/* Wait for completion */
710		ret = sem_wait(&cb->sem);
711		if (cb->state != RDMA_WRITE_COMPLETE) {
712			fprintf(stderr, "wait for RDMA_WRITE_COMPLETE state %d\n",
713				cb->state);
714			ret = -1;
715			break;
716		}
717		DEBUG_LOG("server rdma write complete \n");
718
719		/* Tell client to begin again */
720		ret = ibv_post_send(cb->qp, &cb->sq_wr, &bad_wr);
721		if (ret) {
722			fprintf(stderr, "post send error %d\n", ret);
723			break;
724		}
725		DEBUG_LOG("server posted go ahead\n");
726	}
727
728	return (cb->state == DISCONNECTED) ? 0 : ret;
729}
730
731static int rping_bind_server(struct rping_cb *cb)
732{
733	int ret;
734
735	if (cb->sin.ss_family == AF_INET)
736		((struct sockaddr_in *) &cb->sin)->sin_port = cb->port;
737	else
738		((struct sockaddr_in6 *) &cb->sin)->sin6_port = cb->port;
739
740	ret = rdma_bind_addr(cb->cm_id, (struct sockaddr *) &cb->sin);
741	if (ret) {
742		perror("rdma_bind_addr");
743		return ret;
744	}
745	DEBUG_LOG("rdma_bind_addr successful\n");
746
747	DEBUG_LOG("rdma_listen\n");
748	ret = rdma_listen(cb->cm_id, 3);
749	if (ret) {
750		perror("rdma_listen");
751		return ret;
752	}
753
754	return 0;
755}
756
757static struct rping_cb *clone_cb(struct rping_cb *listening_cb)
758{
759	struct rping_cb *cb = malloc(sizeof *cb);
760	if (!cb)
761		return NULL;
762	memset(cb, 0, sizeof *cb);
763	*cb = *listening_cb;
764	cb->child_cm_id->context = cb;
765	return cb;
766}
767
768static void free_cb(struct rping_cb *cb)
769{
770	free(cb);
771}
772
773static void *rping_persistent_server_thread(void *arg)
774{
775	struct rping_cb *cb = arg;
776	struct ibv_recv_wr *bad_wr;
777	int ret;
778
779	ret = rping_setup_qp(cb, cb->child_cm_id);
780	if (ret) {
781		fprintf(stderr, "setup_qp failed: %d\n", ret);
782		goto err0;
783	}
784
785	ret = rping_setup_buffers(cb);
786	if (ret) {
787		fprintf(stderr, "rping_setup_buffers failed: %d\n", ret);
788		goto err1;
789	}
790
791	ret = ibv_post_recv(cb->qp, &cb->rq_wr, &bad_wr);
792	if (ret) {
793		fprintf(stderr, "ibv_post_recv failed: %d\n", ret);
794		goto err2;
795	}
796
797	ret = pthread_create(&cb->cqthread, NULL, cq_thread, cb);
798	if (ret) {
799		perror("pthread_create");
800		goto err2;
801	}
802
803	ret = rping_accept(cb);
804	if (ret) {
805		fprintf(stderr, "connect error %d\n", ret);
806		goto err3;
807	}
808
809	rping_test_server(cb);
810	rdma_disconnect(cb->child_cm_id);
811	pthread_join(cb->cqthread, NULL);
812	rping_free_buffers(cb);
813	rping_free_qp(cb);
814	rdma_destroy_id(cb->child_cm_id);
815	free_cb(cb);
816	return NULL;
817err3:
818	pthread_cancel(cb->cqthread);
819	pthread_join(cb->cqthread, NULL);
820err2:
821	rping_free_buffers(cb);
822err1:
823	rping_free_qp(cb);
824err0:
825	free_cb(cb);
826	return NULL;
827}
828
829static int rping_run_persistent_server(struct rping_cb *listening_cb)
830{
831	int ret;
832	struct rping_cb *cb;
833	pthread_attr_t attr;
834
835	ret = rping_bind_server(listening_cb);
836	if (ret)
837		return ret;
838
839	/*
840	 * Set persistent server threads to DEATCHED state so
841	 * they release all their resources when they exit.
842	 */
843	ret = pthread_attr_init(&attr);
844	if (ret) {
845		perror("pthread_attr_init");
846		return ret;
847	}
848	ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
849	if (ret) {
850		perror("pthread_attr_setdetachstate");
851		return ret;
852	}
853
854	while (1) {
855		sem_wait(&listening_cb->sem);
856		if (listening_cb->state != CONNECT_REQUEST) {
857			fprintf(stderr, "wait for CONNECT_REQUEST state %d\n",
858				listening_cb->state);
859			return -1;
860		}
861
862		cb = clone_cb(listening_cb);
863		if (!cb)
864			return -1;
865
866		ret = pthread_create(&cb->persistent_server_thread, &attr, rping_persistent_server_thread, cb);
867		if (ret) {
868			perror("pthread_create");
869			return ret;
870		}
871	}
872	return 0;
873}
874
875static int rping_run_server(struct rping_cb *cb)
876{
877	struct ibv_recv_wr *bad_wr;
878	int ret;
879
880	ret = rping_bind_server(cb);
881	if (ret)
882		return ret;
883
884	sem_wait(&cb->sem);
885	if (cb->state != CONNECT_REQUEST) {
886		fprintf(stderr, "wait for CONNECT_REQUEST state %d\n",
887			cb->state);
888		return -1;
889	}
890
891	ret = rping_setup_qp(cb, cb->child_cm_id);
892	if (ret) {
893		fprintf(stderr, "setup_qp failed: %d\n", ret);
894		return ret;
895	}
896
897	ret = rping_setup_buffers(cb);
898	if (ret) {
899		fprintf(stderr, "rping_setup_buffers failed: %d\n", ret);
900		goto err1;
901	}
902
903	ret = ibv_post_recv(cb->qp, &cb->rq_wr, &bad_wr);
904	if (ret) {
905		fprintf(stderr, "ibv_post_recv failed: %d\n", ret);
906		goto err2;
907	}
908
909	ret = pthread_create(&cb->cqthread, NULL, cq_thread, cb);
910	if (ret) {
911		perror("pthread_create");
912		goto err2;
913	}
914
915	ret = rping_accept(cb);
916	if (ret) {
917		fprintf(stderr, "connect error %d\n", ret);
918		goto err2;
919	}
920
921	ret = rping_test_server(cb);
922	if (ret) {
923		fprintf(stderr, "rping server failed: %d\n", ret);
924		goto err3;
925	}
926
927	ret = 0;
928err3:
929	rdma_disconnect(cb->child_cm_id);
930	pthread_join(cb->cqthread, NULL);
931	rdma_destroy_id(cb->child_cm_id);
932err2:
933	rping_free_buffers(cb);
934err1:
935	rping_free_qp(cb);
936
937	return ret;
938}
939
940static int rping_test_client(struct rping_cb *cb)
941{
942	int ping, start, cc, i, ret = 0;
943	struct ibv_send_wr *bad_wr;
944	unsigned char c;
945
946	start = 65;
947	for (ping = 0; !cb->count || ping < cb->count; ping++) {
948		cb->state = RDMA_READ_ADV;
949
950		/* Put some ascii text in the buffer. */
951		cc = snprintf(cb->start_buf, cb->size, RPING_MSG_FMT, ping);
952		for (i = cc, c = start; i < cb->size; i++) {
953			cb->start_buf[i] = c;
954			c++;
955			if (c > 122)
956				c = 65;
957		}
958		start++;
959		if (start > 122)
960			start = 65;
961		cb->start_buf[cb->size - 1] = 0;
962
963		rping_format_send(cb, cb->start_buf, cb->start_mr);
964		ret = ibv_post_send(cb->qp, &cb->sq_wr, &bad_wr);
965		if (ret) {
966			fprintf(stderr, "post send error %d\n", ret);
967			break;
968		}
969
970		/* Wait for server to ACK */
971		sem_wait(&cb->sem);
972		if (cb->state != RDMA_WRITE_ADV) {
973			fprintf(stderr, "wait for RDMA_WRITE_ADV state %d\n",
974				cb->state);
975			ret = -1;
976			break;
977		}
978
979		rping_format_send(cb, cb->rdma_buf, cb->rdma_mr);
980		ret = ibv_post_send(cb->qp, &cb->sq_wr, &bad_wr);
981		if (ret) {
982			fprintf(stderr, "post send error %d\n", ret);
983			break;
984		}
985
986		/* Wait for the server to say the RDMA Write is complete. */
987		sem_wait(&cb->sem);
988		if (cb->state != RDMA_WRITE_COMPLETE) {
989			fprintf(stderr, "wait for RDMA_WRITE_COMPLETE state %d\n",
990				cb->state);
991			ret = -1;
992			break;
993		}
994
995		if (cb->validate)
996			if (memcmp(cb->start_buf, cb->rdma_buf, cb->size)) {
997				fprintf(stderr, "data mismatch!\n");
998				ret = -1;
999				break;
1000			}
1001
1002		if (cb->verbose)
1003			printf("ping data: %s\n", cb->rdma_buf);
1004	}
1005
1006	return (cb->state == DISCONNECTED) ? 0 : ret;
1007}
1008
1009static int rping_connect_client(struct rping_cb *cb)
1010{
1011	struct rdma_conn_param conn_param;
1012	int ret;
1013
1014	memset(&conn_param, 0, sizeof conn_param);
1015	conn_param.responder_resources = 1;
1016	conn_param.initiator_depth = 1;
1017	conn_param.retry_count = 7;
1018
1019	ret = rdma_connect(cb->cm_id, &conn_param);
1020	if (ret) {
1021		perror("rdma_connect");
1022		return ret;
1023	}
1024
1025	sem_wait(&cb->sem);
1026	if (cb->state != CONNECTED) {
1027		fprintf(stderr, "wait for CONNECTED state %d\n", cb->state);
1028		return -1;
1029	}
1030
1031	DEBUG_LOG("rmda_connect successful\n");
1032	return 0;
1033}
1034
1035static int rping_bind_client(struct rping_cb *cb)
1036{
1037	int ret;
1038
1039	if (cb->sin.ss_family == AF_INET)
1040		((struct sockaddr_in *) &cb->sin)->sin_port = cb->port;
1041	else
1042		((struct sockaddr_in6 *) &cb->sin)->sin6_port = cb->port;
1043
1044	if (cb->ssource.ss_family)
1045		ret = rdma_resolve_addr(cb->cm_id, (struct sockaddr *) &cb->ssource,
1046					(struct sockaddr *) &cb->sin, 2000);
1047	else
1048		ret = rdma_resolve_addr(cb->cm_id, NULL, (struct sockaddr *) &cb->sin, 2000);
1049
1050	if (ret) {
1051		perror("rdma_resolve_addr");
1052		return ret;
1053	}
1054
1055	sem_wait(&cb->sem);
1056	if (cb->state != ROUTE_RESOLVED) {
1057		fprintf(stderr, "waiting for addr/route resolution state %d\n",
1058			cb->state);
1059		return -1;
1060	}
1061
1062	DEBUG_LOG("rdma_resolve_addr - rdma_resolve_route successful\n");
1063	return 0;
1064}
1065
1066static int rping_run_client(struct rping_cb *cb)
1067{
1068	struct ibv_recv_wr *bad_wr;
1069	int ret;
1070
1071	ret = rping_bind_client(cb);
1072	if (ret)
1073		return ret;
1074
1075	ret = rping_setup_qp(cb, cb->cm_id);
1076	if (ret) {
1077		fprintf(stderr, "setup_qp failed: %d\n", ret);
1078		return ret;
1079	}
1080
1081	ret = rping_setup_buffers(cb);
1082	if (ret) {
1083		fprintf(stderr, "rping_setup_buffers failed: %d\n", ret);
1084		goto err1;
1085	}
1086
1087	ret = ibv_post_recv(cb->qp, &cb->rq_wr, &bad_wr);
1088	if (ret) {
1089		fprintf(stderr, "ibv_post_recv failed: %d\n", ret);
1090		goto err2;
1091	}
1092
1093	ret = pthread_create(&cb->cqthread, NULL, cq_thread, cb);
1094	if (ret) {
1095		perror("pthread_create");
1096		goto err2;
1097	}
1098
1099	ret = rping_connect_client(cb);
1100	if (ret) {
1101		fprintf(stderr, "connect error %d\n", ret);
1102		goto err3;
1103	}
1104
1105	ret = rping_test_client(cb);
1106	if (ret) {
1107		fprintf(stderr, "rping client failed: %d\n", ret);
1108		goto err4;
1109	}
1110
1111	ret = 0;
1112err4:
1113	rdma_disconnect(cb->cm_id);
1114err3:
1115	pthread_join(cb->cqthread, NULL);
1116err2:
1117	rping_free_buffers(cb);
1118err1:
1119	rping_free_qp(cb);
1120
1121	return ret;
1122}
1123
/*
 * Resolve `dst` with getaddrinfo() and copy the first result into
 * `addr` as a sockaddr_in or sockaddr_in6.
 *
 * Returns 0 on success, the getaddrinfo() error code if resolution
 * fails (a message is printed), or -1 if the first result is neither
 * IPv4 nor IPv6.
 */
static int get_addr(char *dst, struct sockaddr *addr)
{
	struct addrinfo *found;
	int rc;

	rc = getaddrinfo(dst, NULL, NULL, &found);
	if (rc != 0) {
		printf("getaddrinfo failed (%s) - invalid hostname or IP address\n", gai_strerror(rc));
		return rc;
	}

	switch (found->ai_family) {
	case PF_INET:
		memcpy(addr, found->ai_addr, sizeof(struct sockaddr_in));
		break;
	case PF_INET6:
		memcpy(addr, found->ai_addr, sizeof(struct sockaddr_in6));
		break;
	default:
		rc = -1;
		break;
	}

	freeaddrinfo(found);
	return rc;
}
1145
/*
 * Print the command line synopsis and option list to stdout, using the
 * last path component of `name` as the program name.
 *
 * basename() from <libgen.h> takes a non-const pointer and is allowed
 * to modify the string it is given, so it is run on a private copy
 * rather than on the caller's (possibly literal) string.  If the copy
 * cannot be allocated, `name` is printed unchanged.
 */
static void usage(const char *name)
{
	size_t len = strlen(name) + 1;
	char *scratch = malloc(len);
	const char *prog = name;

	if (scratch) {
		memcpy(scratch, name, len);
		prog = basename(scratch);
	}

	printf("%s -s [-vVd] [-S size] [-C count] [-a addr] [-p port]\n",
	       prog);
	printf("%s -c [-vVd] [-S size] [-C count] [-I addr] -a addr [-p port]\n",
	       prog);
	printf("\t-c\t\tclient side\n");
	printf("\t-I\t\tSource address to bind to for client.\n");
	printf("\t-s\t\tserver side.  To bind to any address with IPv6 use -a ::0\n");
	printf("\t-v\t\tdisplay ping data to stdout\n");
	printf("\t-V\t\tvalidate ping data\n");
	printf("\t-d\t\tdebug printfs\n");
	printf("\t-S size \tping data size\n");
	printf("\t-C count\tping count times\n");
	printf("\t-a addr\t\taddress\n");
	printf("\t-p port\t\tport\n");
	printf("\t-P\t\tpersistent server mode allowing multiple connections\n");

	/* prog may point into scratch, so release it only after printing. */
	free(scratch);
}
1164
1165int main(int argc, char *argv[])
1166{
1167	struct rping_cb *cb;
1168	int op;
1169	int ret = 0;
1170	int persistent_server = 0;
1171
1172	cb = malloc(sizeof(*cb));
1173	if (!cb)
1174		return -ENOMEM;
1175
1176	memset(cb, 0, sizeof(*cb));
1177	cb->server = -1;
1178	cb->state = IDLE;
1179	cb->size = 64;
1180	cb->sin.ss_family = PF_INET;
1181	cb->port = htobe16(7174);
1182	sem_init(&cb->sem, 0, 0);
1183
1184	opterr = 0;
1185	while ((op=getopt(argc, argv, "a:I:Pp:C:S:t:scvVd")) != -1) {
1186		switch (op) {
1187		case 'a':
1188			ret = get_addr(optarg, (struct sockaddr *) &cb->sin);
1189			break;
1190		case 'I':
1191			ret = get_addr(optarg, (struct sockaddr *) &cb->ssource);
1192			break;
1193		case 'P':
1194			persistent_server = 1;
1195			break;
1196		case 'p':
1197			cb->port = htobe16(atoi(optarg));
1198			DEBUG_LOG("port %d\n", (int) atoi(optarg));
1199			break;
1200		case 's':
1201			cb->server = 1;
1202			DEBUG_LOG("server\n");
1203			break;
1204		case 'c':
1205			cb->server = 0;
1206			DEBUG_LOG("client\n");
1207			break;
1208		case 'S':
1209			cb->size = atoi(optarg);
1210			if ((cb->size < RPING_MIN_BUFSIZE) ||
1211			    (cb->size > (RPING_BUFSIZE - 1))) {
1212				fprintf(stderr, "Invalid size %d "
1213				       "(valid range is %zd to %d)\n",
1214				       cb->size, RPING_MIN_BUFSIZE, RPING_BUFSIZE);
1215				ret = EINVAL;
1216			} else
1217				DEBUG_LOG("size %d\n", (int) atoi(optarg));
1218			break;
1219		case 'C':
1220			cb->count = atoi(optarg);
1221			if (cb->count < 0) {
1222				fprintf(stderr, "Invalid count %d\n",
1223					cb->count);
1224				ret = EINVAL;
1225			} else
1226				DEBUG_LOG("count %d\n", (int) cb->count);
1227			break;
1228		case 'v':
1229			cb->verbose++;
1230			DEBUG_LOG("verbose\n");
1231			break;
1232		case 'V':
1233			cb->validate++;
1234			DEBUG_LOG("validate data\n");
1235			break;
1236		case 'd':
1237			debug++;
1238			break;
1239		default:
1240			usage("rping");
1241			ret = EINVAL;
1242			goto out;
1243		}
1244	}
1245	if (ret)
1246		goto out;
1247
1248	if (cb->server == -1) {
1249		usage("rping");
1250		ret = EINVAL;
1251		goto out;
1252	}
1253
1254	cb->cm_channel = rdma_create_event_channel();
1255	if (!cb->cm_channel) {
1256		perror("rdma_create_event_channel");
1257		ret = errno;
1258		goto out;
1259	}
1260
1261	ret = rdma_create_id(cb->cm_channel, &cb->cm_id, cb, RDMA_PS_TCP);
1262	if (ret) {
1263		perror("rdma_create_id");
1264		goto out2;
1265	}
1266	DEBUG_LOG("created cm_id %p\n", cb->cm_id);
1267
1268	ret = pthread_create(&cb->cmthread, NULL, cm_thread, cb);
1269	if (ret) {
1270		perror("pthread_create");
1271		goto out2;
1272	}
1273
1274	if (cb->server) {
1275		if (persistent_server)
1276			ret = rping_run_persistent_server(cb);
1277		else
1278			ret = rping_run_server(cb);
1279	} else {
1280		ret = rping_run_client(cb);
1281	}
1282
1283	DEBUG_LOG("destroy cm_id %p\n", cb->cm_id);
1284	rdma_destroy_id(cb->cm_id);
1285out2:
1286	rdma_destroy_event_channel(cb->cm_channel);
1287out:
1288	free(cb);
1289	return ret;
1290}
1291