xsrq_pingpong.c revision 331769
1/*
2 * Copyright (c) 2005 Topspin Communications.  All rights reserved.
3 * Copyright (c) 2011 Intel Corporation, Inc.  All rights reserved.
4 *
5 * This software is available to you under a choice of one of two
6 * licenses.  You may choose to be licensed under the terms of the GNU
7 * General Public License (GPL) Version 2, available from the file
8 * COPYING in the main directory of this source tree, or the
9 * OpenIB.org BSD license below:
10 *
11 *     Redistribution and use in source and binary forms, with or
12 *     without modification, are permitted provided that the following
13 *     conditions are met:
14 *
15 *      - Redistributions of source code must retain the above
16 *        copyright notice, this list of conditions and the following
17 *        disclaimer.
18 *
19 *      - Redistributions in binary form must reproduce the above
20 *        copyright notice, this list of conditions and the following
21 *        disclaimer in the documentation and/or other materials
22 *        provided with the distribution.
23 *
24 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
25 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
26 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
27 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
28 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
29 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
30 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
31 * SOFTWARE.
32 */
33#define _GNU_SOURCE
34#include <config.h>
35
36#include <stdio.h>
37#include <fcntl.h>
38#include <errno.h>
39#include <stdlib.h>
40#include <unistd.h>
41#include <string.h>
42#include <sys/types.h>
43#include <sys/socket.h>
44#include <sys/time.h>
45#include <netdb.h>
46#include <malloc.h>
47#include <getopt.h>
48#include <arpa/inet.h>
49#include <time.h>
50
51#include "pingpong.h"
52
/*
 * Out-of-band handshake message exchanged over the TCP socket. All numeric
 * fields are hexadecimal and colon separated:
 *
 *   LID(4) : recv QPN(6) : send QPN(6) : PSN(6) : SRQN(6) : wire GID(32)
 *
 * MSG_SIZE is the 65 formatted characters plus the terminating NUL.
 */
#define MSG_FORMAT "%04x:%06x:%06x:%06x:%06x:%32s"
#define MSG_SIZE   66
/*
 * The GID conversion is width-limited to 32 characters so that a malformed
 * message from the peer cannot overrun a 33-byte GID buffer.
 */
#define MSG_SSCAN  "%x:%x:%x:%x:%x:%32s"
#define ADDR_FORMAT \
	"%8s: LID %04x, QPN RECV %06x SEND %06x, PSN %06x, SRQN %06x, GID %s\n"
/* Termination handshake payload: the text "END" plus its NUL (4 bytes). */
#define TERMINATION_FORMAT "%s"
#define TERMINATION_MSG_SIZE 4
#define TERMINATION_MSG "END"
/* System page size, used as the alignment of the work buffer. */
static int page_size;
62
63struct pingpong_dest {
64	union ibv_gid gid;
65	int lid;
66	int recv_qpn;
67	int send_qpn;
68	int recv_psn;
69	int send_psn;
70	int srqn;
71	int pp_cnt;
72	int sockfd;
73};
74
75struct pingpong_context {
76	struct ibv_context	*context;
77	struct ibv_comp_channel *channel;
78	struct ibv_pd		*pd;
79	struct ibv_mr		*mr;
80	struct ibv_cq		*send_cq;
81	struct ibv_cq		*recv_cq;
82	struct ibv_srq		*srq;
83	struct ibv_xrcd		*xrcd;
84	struct ibv_qp		**recv_qp;
85	struct ibv_qp		**send_qp;
86	struct pingpong_dest	*rem_dest;
87	void			*buf;
88	int			 lid;
89	int			 sl;
90	enum ibv_mtu		 mtu;
91	int			 ib_port;
92	int			 fd;
93	int			 size;
94	int			 num_clients;
95	int			 num_tests;
96	int			 use_event;
97	int			 gidx;
98};
99
100static struct pingpong_context ctx;
101
102
103static int open_device(char *ib_devname)
104{
105	struct ibv_device **dev_list;
106	int i = 0;
107
108	dev_list = ibv_get_device_list(NULL);
109	if (!dev_list) {
110		fprintf(stderr, "Failed to get IB devices list");
111		return -1;
112	}
113
114	if (ib_devname) {
115		for (; dev_list[i]; ++i) {
116			if (!strcmp(ibv_get_device_name(dev_list[i]), ib_devname))
117				break;
118		}
119	}
120	if (!dev_list[i]) {
121		fprintf(stderr, "IB device %s not found\n",
122			ib_devname ? ib_devname : "");
123		return -1;
124	}
125
126	ctx.context = ibv_open_device(dev_list[i]);
127	if (!ctx.context) {
128		fprintf(stderr, "Couldn't get context for %s\n",
129			ibv_get_device_name(dev_list[i]));
130		return -1;
131	}
132
133	ibv_free_device_list(dev_list);
134	return 0;
135}
136
137static int create_qps(void)
138{
139	struct ibv_qp_init_attr_ex init;
140	struct ibv_qp_attr mod;
141	int i;
142
143	for (i = 0; i < ctx.num_clients; ++i) {
144
145		memset(&init, 0, sizeof init);
146		init.qp_type = IBV_QPT_XRC_RECV;
147		init.comp_mask = IBV_QP_INIT_ATTR_XRCD;
148		init.xrcd = ctx.xrcd;
149
150		ctx.recv_qp[i] = ibv_create_qp_ex(ctx.context, &init);
151		if (!ctx.recv_qp[i])  {
152			fprintf(stderr, "Couldn't create recv QP[%d] errno %d\n",
153				i, errno);
154			return 1;
155		}
156
157		mod.qp_state        = IBV_QPS_INIT;
158		mod.pkey_index      = 0;
159		mod.port_num        = ctx.ib_port;
160		mod.qp_access_flags = IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ;
161
162		if (ibv_modify_qp(ctx.recv_qp[i], &mod,
163				  IBV_QP_STATE | IBV_QP_PKEY_INDEX |
164				  IBV_QP_PORT | IBV_QP_ACCESS_FLAGS)) {
165			fprintf(stderr, "Failed to modify recv QP[%d] to INIT\n", i);
166			return 1;
167		}
168
169		memset(&init, 0, sizeof init);
170		init.qp_type	      = IBV_QPT_XRC_SEND;
171		init.send_cq	      = ctx.send_cq;
172		init.cap.max_send_wr  = ctx.num_clients * ctx.num_tests;
173		init.cap.max_send_sge = 1;
174		init.comp_mask	      = IBV_QP_INIT_ATTR_PD;
175		init.pd		      = ctx.pd;
176
177		ctx.send_qp[i] = ibv_create_qp_ex(ctx.context, &init);
178		if (!ctx.send_qp[i])  {
179			fprintf(stderr, "Couldn't create send QP[%d] errno %d\n",
180				i, errno);
181			return 1;
182		}
183
184		mod.qp_state        = IBV_QPS_INIT;
185		mod.pkey_index      = 0;
186		mod.port_num        = ctx.ib_port;
187		mod.qp_access_flags = 0;
188
189		if (ibv_modify_qp(ctx.send_qp[i], &mod,
190				  IBV_QP_STATE | IBV_QP_PKEY_INDEX |
191				  IBV_QP_PORT | IBV_QP_ACCESS_FLAGS)) {
192			fprintf(stderr, "Failed to modify send QP[%d] to INIT\n", i);
193			return 1;
194		}
195	}
196
197	return 0;
198}
199
200static int pp_init_ctx(char *ib_devname)
201{
202	struct ibv_srq_init_attr_ex attr;
203	struct ibv_xrcd_init_attr xrcd_attr;
204	struct ibv_port_attr port_attr;
205
206	ctx.recv_qp = calloc(ctx.num_clients, sizeof *ctx.recv_qp);
207	ctx.send_qp = calloc(ctx.num_clients, sizeof *ctx.send_qp);
208	ctx.rem_dest = calloc(ctx.num_clients, sizeof *ctx.rem_dest);
209	if (!ctx.recv_qp || !ctx.send_qp || !ctx.rem_dest)
210		return 1;
211
212	if (open_device(ib_devname)) {
213		fprintf(stderr, "Failed to open device\n");
214		return 1;
215	}
216
217	if (pp_get_port_info(ctx.context, ctx.ib_port, &port_attr)) {
218		fprintf(stderr, "Failed to get port info\n");
219		return 1;
220	}
221
222	ctx.lid = port_attr.lid;
223	if (port_attr.link_layer != IBV_LINK_LAYER_ETHERNET && !ctx.lid) {
224		fprintf(stderr, "Couldn't get local LID\n");
225		return 1;
226	}
227
228	ctx.buf = memalign(page_size, ctx.size);
229	if (!ctx.buf) {
230		fprintf(stderr, "Couldn't allocate work buf.\n");
231		return 1;
232	}
233
234	memset(ctx.buf, 0, ctx.size);
235
236	if (ctx.use_event) {
237		ctx.channel = ibv_create_comp_channel(ctx.context);
238		if (!ctx.channel) {
239			fprintf(stderr, "Couldn't create completion channel\n");
240			return 1;
241		}
242	}
243
244	ctx.pd = ibv_alloc_pd(ctx.context);
245	if (!ctx.pd) {
246		fprintf(stderr, "Couldn't allocate PD\n");
247		return 1;
248	}
249
250	ctx.mr = ibv_reg_mr(ctx.pd, ctx.buf, ctx.size, IBV_ACCESS_LOCAL_WRITE);
251	if (!ctx.mr) {
252		fprintf(stderr, "Couldn't register MR\n");
253		return 1;
254	}
255
256	ctx.fd = open("/tmp/xrc_domain", O_RDONLY | O_CREAT, S_IRUSR | S_IRGRP);
257	if (ctx.fd < 0) {
258		fprintf(stderr,
259			"Couldn't create the file for the XRC Domain "
260			"but not stopping %d\n", errno);
261		ctx.fd = -1;
262	}
263
264	memset(&xrcd_attr, 0, sizeof xrcd_attr);
265	xrcd_attr.comp_mask = IBV_XRCD_INIT_ATTR_FD | IBV_XRCD_INIT_ATTR_OFLAGS;
266	xrcd_attr.fd = ctx.fd;
267	xrcd_attr.oflags = O_CREAT;
268	ctx.xrcd = ibv_open_xrcd(ctx.context, &xrcd_attr);
269	if (!ctx.xrcd) {
270		fprintf(stderr, "Couldn't Open the XRC Domain %d\n", errno);
271		return 1;
272	}
273
274	ctx.recv_cq = ibv_create_cq(ctx.context, ctx.num_clients, &ctx.recv_cq,
275				    ctx.channel, 0);
276	if (!ctx.recv_cq) {
277		fprintf(stderr, "Couldn't create recv CQ\n");
278		return 1;
279	}
280
281	if (ctx.use_event) {
282		if (ibv_req_notify_cq(ctx.recv_cq, 0)) {
283			fprintf(stderr, "Couldn't request CQ notification\n");
284			return 1;
285		}
286	}
287
288	ctx.send_cq = ibv_create_cq(ctx.context, ctx.num_clients, NULL, NULL, 0);
289	if (!ctx.send_cq) {
290		fprintf(stderr, "Couldn't create send CQ\n");
291		return 1;
292	}
293
294	memset(&attr, 0, sizeof attr);
295	attr.attr.max_wr = ctx.num_clients;
296	attr.attr.max_sge = 1;
297	attr.comp_mask = IBV_SRQ_INIT_ATTR_TYPE | IBV_SRQ_INIT_ATTR_XRCD |
298			 IBV_SRQ_INIT_ATTR_CQ | IBV_SRQ_INIT_ATTR_PD;
299	attr.srq_type = IBV_SRQT_XRC;
300	attr.xrcd = ctx.xrcd;
301	attr.cq = ctx.recv_cq;
302	attr.pd = ctx.pd;
303
304	ctx.srq = ibv_create_srq_ex(ctx.context, &attr);
305	if (!ctx.srq)  {
306		fprintf(stderr, "Couldn't create SRQ\n");
307		return 1;
308	}
309
310	if (create_qps())
311		return 1;
312
313	return 0;
314}
315
316static int recv_termination_ack(int index)
317{
318	char msg[TERMINATION_MSG_SIZE];
319	int n = 0, r;
320	int sockfd = ctx.rem_dest[index].sockfd;
321
322	while (n < TERMINATION_MSG_SIZE) {
323		r = read(sockfd, msg + n, TERMINATION_MSG_SIZE - n);
324		if (r < 0) {
325			perror("client read");
326			fprintf(stderr,
327				"%d/%d: Couldn't read remote termination ack\n",
328				n, TERMINATION_MSG_SIZE);
329			return 1;
330		}
331		n += r;
332	}
333
334	if (strcmp(msg, TERMINATION_MSG)) {
335		fprintf(stderr, "Invalid termination ack was accepted\n");
336		return 1;
337	}
338
339	return 0;
340}
341
342static int send_termination_ack(int index)
343{
344	char msg[TERMINATION_MSG_SIZE];
345	int sockfd = ctx.rem_dest[index].sockfd;
346
347	sprintf(msg, TERMINATION_FORMAT, TERMINATION_MSG);
348
349	if (write(sockfd, msg, TERMINATION_MSG_SIZE) != TERMINATION_MSG_SIZE) {
350		fprintf(stderr, "Couldn't send termination ack\n");
351		return 1;
352	}
353
354	return 0;
355}
356
/*
 * Client side of the termination handshake: send our acknowledgement to
 * the server (peer 0), then wait for the server's acknowledgement.
 *
 * Returns 0 on success, 1 if either step fails.
 */
static int pp_client_termination(void)
{
	/* The receive is only attempted when the send succeeded. */
	return send_termination_ack(0) || recv_termination_ack(0);
}
366
367static int pp_server_termination(void)
368{
369	int i;
370
371	for (i = 0; i < ctx.num_clients; i++) {
372		if (recv_termination_ack(i))
373			return 1;
374	}
375
376	for (i = 0; i < ctx.num_clients; i++) {
377		if (send_termination_ack(i))
378			return 1;
379	}
380
381	return 0;
382}
383
384static int send_local_dest(int sockfd, int index)
385{
386	char msg[MSG_SIZE];
387	char gid[33];
388	uint32_t srq_num;
389	union ibv_gid local_gid;
390
391	if (ctx.gidx >= 0) {
392		if (ibv_query_gid(ctx.context, ctx.ib_port, ctx.gidx,
393				  &local_gid)) {
394			fprintf(stderr, "can't read sgid of index %d\n",
395				ctx.gidx);
396			return -1;
397		}
398	} else {
399		memset(&local_gid, 0, sizeof(local_gid));
400	}
401
402	ctx.rem_dest[index].recv_psn = lrand48() & 0xffffff;
403	if (ibv_get_srq_num(ctx.srq, &srq_num)) {
404		fprintf(stderr, "Couldn't get SRQ num\n");
405		return -1;
406	}
407
408	inet_ntop(AF_INET6, &local_gid, gid, sizeof(gid));
409	printf(ADDR_FORMAT, "local", ctx.lid, ctx.recv_qp[index]->qp_num,
410		ctx.send_qp[index]->qp_num, ctx.rem_dest[index].recv_psn,
411		srq_num, gid);
412
413	gid_to_wire_gid(&local_gid, gid);
414	sprintf(msg, MSG_FORMAT, ctx.lid, ctx.recv_qp[index]->qp_num,
415		ctx.send_qp[index]->qp_num, ctx.rem_dest[index].recv_psn,
416		srq_num, gid);
417
418	if (write(sockfd, msg, MSG_SIZE) != MSG_SIZE) {
419		fprintf(stderr, "Couldn't send local address\n");
420		return -1;
421	}
422
423	return 0;
424}
425
426static int recv_remote_dest(int sockfd, int index)
427{
428	struct pingpong_dest *rem_dest;
429	char msg[MSG_SIZE];
430	char gid[33];
431	int n = 0, r;
432
433	while (n < MSG_SIZE) {
434		r = read(sockfd, msg + n, MSG_SIZE - n);
435		if (r < 0) {
436			perror("client read");
437			fprintf(stderr,
438				"%d/%d: Couldn't read remote address [%d]\n",
439				n, MSG_SIZE, index);
440			return -1;
441		}
442		n += r;
443	}
444
445	rem_dest = &ctx.rem_dest[index];
446	sscanf(msg, MSG_SSCAN, &rem_dest->lid, &rem_dest->recv_qpn,
447		&rem_dest->send_qpn, &rem_dest->send_psn, &rem_dest->srqn, gid);
448
449	wire_gid_to_gid(gid, &rem_dest->gid);
450	inet_ntop(AF_INET6, &rem_dest->gid, gid, sizeof(gid));
451	printf(ADDR_FORMAT, "remote", rem_dest->lid, rem_dest->recv_qpn,
452		rem_dest->send_qpn, rem_dest->send_psn, rem_dest->srqn,
453		gid);
454
455	rem_dest->sockfd = sockfd;
456	return 0;
457}
458
459static void set_ah_attr(struct ibv_ah_attr *attr, struct pingpong_context *myctx,
460			int index)
461{
462	attr->is_global = 1;
463	attr->grh.hop_limit = 5;
464	attr->grh.dgid = myctx->rem_dest[index].gid;
465	attr->grh.sgid_index = myctx->gidx;
466}
467
468static int connect_qps(int index)
469{
470	struct ibv_qp_attr attr;
471
472	memset(&attr, 0, sizeof attr);
473	attr.qp_state	      = IBV_QPS_RTR;
474	attr.dest_qp_num      = ctx.rem_dest[index].send_qpn;
475	attr.path_mtu	      = ctx.mtu;
476	attr.rq_psn	      = ctx.rem_dest[index].send_psn;
477	attr.min_rnr_timer    = 12;
478	attr.ah_attr.dlid     = ctx.rem_dest[index].lid;
479	attr.ah_attr.sl	      = ctx.sl;
480	attr.ah_attr.port_num = ctx.ib_port;
481
482	if (ctx.rem_dest[index].gid.global.interface_id)
483		set_ah_attr(&attr.ah_attr, &ctx, index);
484
485	if (ibv_modify_qp(ctx.recv_qp[index], &attr,
486			  IBV_QP_STATE | IBV_QP_AV | IBV_QP_PATH_MTU |
487			  IBV_QP_DEST_QPN | IBV_QP_RQ_PSN |
488			  IBV_QP_MAX_DEST_RD_ATOMIC | IBV_QP_MIN_RNR_TIMER)) {
489		fprintf(stderr, "Failed to modify recv QP[%d] to RTR\n", index);
490		return 1;
491	}
492
493	memset(&attr, 0, sizeof attr);
494	attr.qp_state = IBV_QPS_RTS;
495	attr.timeout = 14;
496	attr.sq_psn = ctx.rem_dest[index].recv_psn;
497
498	if (ibv_modify_qp(ctx.recv_qp[index], &attr,
499			  IBV_QP_STATE | IBV_QP_TIMEOUT | IBV_QP_SQ_PSN)) {
500		fprintf(stderr, "Failed to modify recv QP[%d] to RTS\n", index);
501		return 1;
502	}
503
504	memset(&attr, 0, sizeof attr);
505	attr.qp_state	      = IBV_QPS_RTR;
506	attr.dest_qp_num      = ctx.rem_dest[index].recv_qpn;
507	attr.path_mtu	      = ctx.mtu;
508	attr.rq_psn	      = ctx.rem_dest[index].send_psn;
509	attr.ah_attr.dlid     = ctx.rem_dest[index].lid;
510	attr.ah_attr.sl	      = ctx.sl;
511	attr.ah_attr.port_num = ctx.ib_port;
512
513	if (ctx.rem_dest[index].gid.global.interface_id)
514		set_ah_attr(&attr.ah_attr, &ctx, index);
515
516	if (ibv_modify_qp(ctx.send_qp[index], &attr,
517			  IBV_QP_STATE | IBV_QP_AV | IBV_QP_PATH_MTU |
518			  IBV_QP_DEST_QPN | IBV_QP_RQ_PSN)) {
519		fprintf(stderr, "Failed to modify send QP[%d] to RTR\n", index);
520		return 1;
521	}
522
523	memset(&attr, 0, sizeof attr);
524	attr.qp_state = IBV_QPS_RTS;
525	attr.timeout = 14;
526	attr.retry_cnt = 7;
527	attr.rnr_retry = 7;
528	attr.sq_psn = ctx.rem_dest[index].recv_psn;
529
530	if (ibv_modify_qp(ctx.send_qp[index], &attr,
531			  IBV_QP_STATE | IBV_QP_TIMEOUT | IBV_QP_SQ_PSN |
532			  IBV_QP_RETRY_CNT | IBV_QP_RNR_RETRY | IBV_QP_MAX_QP_RD_ATOMIC)) {
533		fprintf(stderr, "Failed to modify send QP[%d] to RTS\n", index);
534		return 1;
535	}
536
537	return 0;
538}
539
/*
 * Client side of connection setup: open a TCP connection to
 * servername:port, exchange addressing information with the server as
 * peer 0, and connect the local QPs to it.
 *
 * Returns 0 on success, 1 on failure.
 */
static int pp_client_connect(const char *servername, int port)
{
	struct addrinfo *res = NULL, *t;
	char *service;
	int ret;
	int sockfd = -1;
	struct addrinfo hints = {
		.ai_family   = AF_INET,
		.ai_socktype = SOCK_STREAM
	};

	if (asprintf(&service, "%d", port) < 0)
		return 1;

	/*
	 * getaddrinfo() reports failure with any non-zero value; the EAI_*
	 * codes are not negative on every platform, so test for != 0.
	 */
	ret = getaddrinfo(servername, service, &hints, &res);
	if (ret != 0) {
		fprintf(stderr, "%s for %s:%d\n", gai_strerror(ret), servername, port);
		free(service);
		return 1;
	}

	for (t = res; t; t = t->ai_next) {
		sockfd = socket(t->ai_family, t->ai_socktype, t->ai_protocol);
		if (sockfd >= 0) {
			if (!connect(sockfd, t->ai_addr, t->ai_addrlen))
				break;
			close(sockfd);
			sockfd = -1;
		}
	}

	freeaddrinfo_null(res);
	free(service);

	if (sockfd < 0) {
		fprintf(stderr, "Couldn't connect to %s:%d\n", servername, port);
		return 1;
	}

	/* Until the remote entry records the socket, it is ours to close. */
	if (send_local_dest(sockfd, 0) || recv_remote_dest(sockfd, 0)) {
		close(sockfd);
		return 1;
	}

	if (connect_qps(0))
		return 1;

	return 0;
}
590
/*
 * Server side of connection setup: listen on `port`, accept
 * ctx.num_clients connections and, for each one, exchange addressing
 * information and connect the matching pair of QPs.
 *
 * The listening socket is closed before returning, on success and on
 * failure alike.
 *
 * Returns 0 on success, 1 on failure.
 */
static int pp_server_connect(int port)
{
	struct addrinfo *res = NULL, *t;
	char *service;
	int ret, i, n;
	int sockfd = -1, connfd;
	struct addrinfo hints = {
		.ai_flags    = AI_PASSIVE,
		.ai_family   = AF_INET,
		.ai_socktype = SOCK_STREAM
	};

	if (asprintf(&service, "%d", port) < 0)
		return 1;

	/*
	 * getaddrinfo() reports failure with any non-zero value; the EAI_*
	 * codes are not negative on every platform, so test for != 0.
	 */
	ret = getaddrinfo(NULL, service, &hints, &res);
	if (ret != 0) {
		fprintf(stderr, "%s for port %d\n", gai_strerror(ret), port);
		free(service);
		return 1;
	}

	for (t = res; t; t = t->ai_next) {
		sockfd = socket(t->ai_family, t->ai_socktype, t->ai_protocol);
		if (sockfd >= 0) {
			n = 1;
			setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n);
			if (!bind(sockfd, t->ai_addr, t->ai_addrlen))
				break;
			close(sockfd);
			sockfd = -1;
		}
	}

	freeaddrinfo_null(res);
	free(service);

	if (sockfd < 0) {
		fprintf(stderr, "Couldn't listen to port %d\n", port);
		return 1;
	}

	if (listen(sockfd, ctx.num_clients)) {
		fprintf(stderr, "listen() failed on port %d\n", port);
		goto err;
	}

	for (i = 0; i < ctx.num_clients; i++) {
		connfd = accept(sockfd, NULL, NULL);
		if (connfd < 0) {
			fprintf(stderr, "accept() failed for client %d\n", i);
			goto err;
		}

		/* Until the remote entry records connfd, it is ours to close. */
		if (recv_remote_dest(connfd, i)) {
			close(connfd);
			goto err;
		}

		if (send_local_dest(connfd, i))
			goto err;

		if (connect_qps(i))
			goto err;
	}

	close(sockfd);
	return 0;

err:
	close(sockfd);
	return 1;
}
655
656
657static int pp_close_ctx(void)
658{
659	int i;
660
661	for (i = 0; i < ctx.num_clients; ++i) {
662
663		if (ibv_destroy_qp(ctx.send_qp[i])) {
664			fprintf(stderr, "Couldn't destroy INI QP[%d]\n", i);
665			return 1;
666		}
667
668		if (ibv_destroy_qp(ctx.recv_qp[i])) {
669			fprintf(stderr, "Couldn't destroy TGT QP[%d]\n", i);
670			return 1;
671		}
672
673		if (ctx.rem_dest[i].sockfd)
674			close(ctx.rem_dest[i].sockfd);
675	}
676
677	if (ibv_destroy_srq(ctx.srq)) {
678		fprintf(stderr, "Couldn't destroy SRQ\n");
679		return 1;
680	}
681
682	if (ctx.xrcd && ibv_close_xrcd(ctx.xrcd)) {
683		fprintf(stderr, "Couldn't close the XRC Domain\n");
684		return 1;
685	}
686	if (ctx.fd >= 0 && close(ctx.fd)) {
687		fprintf(stderr, "Couldn't close the file for the XRC Domain\n");
688		return 1;
689	}
690
691	if (ibv_destroy_cq(ctx.send_cq)) {
692		fprintf(stderr, "Couldn't destroy send CQ\n");
693		return 1;
694	}
695
696	if (ibv_destroy_cq(ctx.recv_cq)) {
697		fprintf(stderr, "Couldn't destroy recv CQ\n");
698		return 1;
699	}
700
701	if (ibv_dereg_mr(ctx.mr)) {
702		fprintf(stderr, "Couldn't deregister MR\n");
703		return 1;
704	}
705
706	if (ibv_dealloc_pd(ctx.pd)) {
707		fprintf(stderr, "Couldn't deallocate PD\n");
708		return 1;
709	}
710
711	if (ctx.channel) {
712		if (ibv_destroy_comp_channel(ctx.channel)) {
713			fprintf(stderr,
714				"Couldn't destroy completion channel\n");
715			return 1;
716		}
717	}
718
719	if (ibv_close_device(ctx.context)) {
720		fprintf(stderr, "Couldn't release context\n");
721		return 1;
722	}
723
724	free(ctx.buf);
725	free(ctx.rem_dest);
726	free(ctx.send_qp);
727	free(ctx.recv_qp);
728	return 0;
729}
730
731static int pp_post_recv(int cnt)
732{
733	struct ibv_sge sge;
734	struct ibv_recv_wr wr, *bad_wr;
735
736	sge.addr = (uintptr_t) ctx.buf;
737	sge.length = ctx.size;
738	sge.lkey = ctx.mr->lkey;
739
740	wr.next       = NULL;
741	wr.wr_id      = (uintptr_t) &ctx;
742	wr.sg_list    = &sge;
743	wr.num_sge    = 1;
744
745	while (cnt--) {
746		if (ibv_post_srq_recv(ctx.srq, &wr, &bad_wr)) {
747			fprintf(stderr, "Failed to post receive to SRQ\n");
748			return 1;
749		}
750	}
751	return 0;
752}
753
754/*
755 * Send to each client round robin on each set of xrc send/recv qp.
756 * Generate a completion on the last send.
757 */
758static int pp_post_send(int index)
759{
760	struct ibv_sge sge;
761	struct ibv_send_wr wr, *bad_wr;
762	int qpi;
763
764	sge.addr = (uintptr_t) ctx.buf;
765	sge.length = ctx.size;
766	sge.lkey = ctx.mr->lkey;
767
768	wr.wr_id   = (uintptr_t) index;
769	wr.next    = NULL;
770	wr.sg_list = &sge;
771	wr.num_sge = 1;
772	wr.opcode  = IBV_WR_SEND;
773	wr.qp_type.xrc.remote_srqn = ctx.rem_dest[index].srqn;
774
775	qpi = (index + ctx.rem_dest[index].pp_cnt) % ctx.num_clients;
776	wr.send_flags = (++ctx.rem_dest[index].pp_cnt >= ctx.num_tests) ?
777			IBV_SEND_SIGNALED : 0;
778
779	return ibv_post_send(ctx.send_qp[qpi], &wr, &bad_wr);
780}
781
782static int find_qp(int qpn)
783{
784	int i;
785
786	if (ctx.num_clients == 1)
787		return 0;
788
789	for (i = 0; i < ctx.num_clients; ++i)
790		if (ctx.recv_qp[i]->qp_num == qpn)
791			return i;
792
793	fprintf(stderr, "Unable to find qp %x\n", qpn);
794	return 0;
795}
796
797static int get_cq_event(void)
798{
799	struct ibv_cq *ev_cq;
800	void          *ev_ctx;
801
802	if (ibv_get_cq_event(ctx.channel, &ev_cq, &ev_ctx)) {
803		fprintf(stderr, "Failed to get cq_event\n");
804		return 1;
805	}
806
807	if (ev_cq != ctx.recv_cq) {
808		fprintf(stderr, "CQ event for unknown CQ %p\n", ev_cq);
809		return 1;
810	}
811
812	if (ibv_req_notify_cq(ctx.recv_cq, 0)) {
813		fprintf(stderr, "Couldn't request CQ notification\n");
814		return 1;
815	}
816
817	return 0;
818}
819
820static void init(void)
821{
822	srand48(getpid() * time(NULL));
823
824	ctx.size = 4096;
825	ctx.ib_port = 1;
826	ctx.num_clients  = 1;
827	ctx.num_tests = 5;
828	ctx.mtu = IBV_MTU_2048;
829	ctx.sl = 0;
830	ctx.gidx = -1;
831}
832
/* Print the command-line synopsis and the option summary to stdout. */
static void usage(const char *argv0)
{
	/* Option lines contain no conversions, so they are emitted verbatim. */
	static const char *const option_help[] = {
		"  -p, --port=<port>      listen on/connect to port <port> (default 18515)\n",
		"  -d, --ib-dev=<dev>     use IB device <dev> (default first device found)\n",
		"  -i, --ib-port=<port>   use port <port> of IB device (default 1)\n",
		"  -s, --size=<size>      size of message to exchange (default 4096)\n",
		"  -m, --mtu=<size>       path MTU (default 2048)\n",
		"  -c, --clients=<n>      number of clients (on server only, default 1)\n",
		"  -n, --num_tests=<n>    number of tests per client (default 5)\n",
		"  -l, --sl=<sl>          service level value\n",
		"  -e, --events           sleep on CQ events (default poll)\n",
		"  -g, --gid-idx=<gid index> local port gid index\n",
	};
	size_t line;

	printf("Usage:\n");
	printf("  %s            start a server and wait for connection\n", argv0);
	printf("  %s <host>     connect to server at <host>\n", argv0);
	printf("\n");
	printf("Options:\n");

	for (line = 0; line < sizeof(option_help) / sizeof(option_help[0]); line++)
		fputs(option_help[line], stdout);
}
851
852int main(int argc, char *argv[])
853{
854	char          *ib_devname = NULL;
855	char          *servername = NULL;
856	int           port = 18515;
857	int           i, total, cnt = 0;
858	int           ne, qpi, num_cq_events = 0;
859	struct ibv_wc wc;
860
861	init();
862	while (1) {
863		int c;
864
865		static struct option long_options[] = {
866			{ .name = "port",      .has_arg = 1, .val = 'p' },
867			{ .name = "ib-dev",    .has_arg = 1, .val = 'd' },
868			{ .name = "ib-port",   .has_arg = 1, .val = 'i' },
869			{ .name = "size",      .has_arg = 1, .val = 's' },
870			{ .name = "mtu",       .has_arg = 1, .val = 'm' },
871			{ .name = "clients",   .has_arg = 1, .val = 'c' },
872			{ .name = "num_tests", .has_arg = 1, .val = 'n' },
873			{ .name = "sl",        .has_arg = 1, .val = 'l' },
874			{ .name = "events",    .has_arg = 0, .val = 'e' },
875			{ .name = "gid-idx",   .has_arg = 1, .val = 'g' },
876			{}
877		};
878
879		c = getopt_long(argc, argv, "p:d:i:s:m:c:n:l:eg:", long_options,
880				NULL);
881		if (c == -1)
882			break;
883
884		switch (c) {
885		case 'p':
886			port = strtol(optarg, NULL, 0);
887			if (port < 0 || port > 65535) {
888				usage(argv[0]);
889				return 1;
890			}
891			break;
892		case 'd':
893			ib_devname = strdupa(optarg);
894			break;
895		case 'i':
896			ctx.ib_port = strtol(optarg, NULL, 0);
897			if (ctx.ib_port < 0) {
898				usage(argv[0]);
899				return 1;
900			}
901			break;
902		case 's':
903			ctx.size = strtol(optarg, NULL, 0);
904			break;
905		case 'm':
906			ctx.mtu = pp_mtu_to_enum(strtol(optarg, NULL, 0));
907			if (ctx.mtu == 0) {
908				usage(argv[0]);
909				return 1;
910			}
911			break;
912		case 'c':
913			ctx.num_clients = strtol(optarg, NULL, 0);
914			break;
915		case 'n':
916			ctx.num_tests = strtol(optarg, NULL, 0);
917			break;
918		case 'l':
919			ctx.sl = strtol(optarg, NULL, 0);
920			break;
921		case 'g':
922			ctx.gidx = strtol(optarg, NULL, 0);
923			break;
924		case 'e':
925			ctx.use_event = 1;
926			break;
927		default:
928			usage(argv[0]);
929			return 1;
930		}
931	}
932
933	if (optind == argc - 1) {
934		servername = strdupa(argv[optind]);
935		ctx.num_clients = 1;
936	} else if (optind < argc) {
937		usage(argv[0]);
938		return 1;
939	}
940
941	page_size = sysconf(_SC_PAGESIZE);
942
943	if (pp_init_ctx(ib_devname))
944		return 1;
945
946	if (pp_post_recv(ctx.num_clients)) {
947		fprintf(stderr, "Couldn't post receives\n");
948		return 1;
949	}
950
951	if (servername) {
952		if (pp_client_connect(servername, port))
953			return 1;
954	} else {
955		if (pp_server_connect(port))
956			return 1;
957
958		for (i = 0; i < ctx.num_clients; i++)
959			pp_post_send(i);
960	}
961
962	total = ctx.num_clients * ctx.num_tests;
963	while (cnt < total) {
964		if (ctx.use_event) {
965			if (get_cq_event())
966				return 1;
967
968			++num_cq_events;
969		}
970
971		do {
972			ne = ibv_poll_cq(ctx.recv_cq, 1, &wc);
973			if (ne < 0) {
974				fprintf(stderr, "Error polling cq %d\n", ne);
975				return 1;
976			} else if (ne == 0) {
977				break;
978			}
979
980			if (wc.status) {
981				fprintf(stderr, "Work completion error %d\n", wc.status);
982				return 1;
983			}
984
985			pp_post_recv(ne);
986			qpi = find_qp(wc.qp_num);
987			if (ctx.rem_dest[qpi].pp_cnt < ctx.num_tests)
988				pp_post_send(qpi);
989			cnt += ne;
990		} while (ne > 0);
991	}
992
993	for (cnt = 0; cnt < ctx.num_clients; cnt += ne) {
994		ne = ibv_poll_cq(ctx.send_cq, 1, &wc);
995		if (ne < 0) {
996			fprintf(stderr, "Error polling cq %d\n", ne);
997			return 1;
998		}
999	}
1000
1001	if (ctx.use_event)
1002		ibv_ack_cq_events(ctx.recv_cq, num_cq_events);
1003
1004	/* Process should get an ack from the daemon to close its resources to
1005	  * make sure latest daemon's response sent via its target QP destined
1006	  * to an XSRQ created by another client won't be lost.
1007	  * Failure to do so may cause the other client to wait for that sent
1008	  * message forever. See comment on pp_post_send.
1009	*/
1010	if (servername) {
1011		if (pp_client_termination())
1012			return 1;
1013	} else if (pp_server_termination()) {
1014		return 1;
1015	}
1016
1017	if (pp_close_ctx())
1018		return 1;
1019
1020	printf("success\n");
1021	return 0;
1022}
1023