1/*
2 * Copyright (c) 2005 Topspin Communications.  All rights reserved.
3 * Copyright (c) 2011 Intel Corporation, Inc.  All rights reserved.
4 *
5 * This software is available to you under a choice of one of two
6 * licenses.  You may choose to be licensed under the terms of the GNU
7 * General Public License (GPL) Version 2, available from the file
8 * COPYING in the main directory of this source tree, or the
9 * OpenIB.org BSD license below:
10 *
11 *     Redistribution and use in source and binary forms, with or
12 *     without modification, are permitted provided that the following
13 *     conditions are met:
14 *
15 *      - Redistributions of source code must retain the above
16 *        copyright notice, this list of conditions and the following
17 *        disclaimer.
18 *
19 *      - Redistributions in binary form must reproduce the above
20 *        copyright notice, this list of conditions and the following
21 *        disclaimer in the documentation and/or other materials
22 *        provided with the distribution.
23 *
24 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
25 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
26 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
27 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
28 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
29 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
30 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
31 * SOFTWARE.
32 */
33#define _GNU_SOURCE
34#include <config.h>
35
36#include <stdio.h>
37#include <fcntl.h>
38#include <errno.h>
39#include <stdlib.h>
40#include <unistd.h>
41#include <string.h>
42#include <sys/types.h>
43#include <sys/socket.h>
44#include <sys/time.h>
45#include <netdb.h>
46#include <malloc.h>
47#include <getopt.h>
48#include <arpa/inet.h>
49#include <time.h>
50
51#include "pingpong.h"
52
/*
 * Connection-setup message exchanged over TCP: LID, recv QPN, send QPN, PSN
 * and SRQ number in hex, followed by the GID as a 32-character hex field.
 * The formatted text is 65 characters; MSG_SIZE adds the terminating NUL,
 * which is sent on the wire as well.
 */
#define MSG_FORMAT "%04x:%06x:%06x:%06x:%06x:%32s"
#define MSG_SIZE   66
/*
 * Parsing counterpart of MSG_FORMAT.  The GID conversion is bounded to the
 * 32 characters MSG_FORMAT emits, so a malformed or hostile peer cannot
 * make sscanf() store more than 33 bytes (32 + NUL) into the destination.
 */
#define MSG_SSCAN  "%x:%x:%x:%x:%x:%32s"
#define ADDR_FORMAT \
	"%8s: LID %04x, QPN RECV %06x SEND %06x, PSN %06x, SRQN %06x, GID %s\n"
/* Termination handshake payload: "END" plus its NUL terminator (4 bytes). */
#define TERMINATION_FORMAT "%s"
#define TERMINATION_MSG_SIZE 4
#define TERMINATION_MSG "END"
/* System page size; used as the alignment of the registered work buffer. */
static int page_size;
62
63struct pingpong_dest {
64	union ibv_gid gid;
65	int lid;
66	int recv_qpn;
67	int send_qpn;
68	int recv_psn;
69	int send_psn;
70	int srqn;
71	int pp_cnt;
72	int sockfd;
73};
74
75struct pingpong_context {
76	struct ibv_context	*context;
77	struct ibv_comp_channel *channel;
78	struct ibv_pd		*pd;
79	struct ibv_mr		*mr;
80	struct ibv_cq		*send_cq;
81	struct ibv_cq		*recv_cq;
82	struct ibv_srq		*srq;
83	struct ibv_xrcd		*xrcd;
84	struct ibv_qp		**recv_qp;
85	struct ibv_qp		**send_qp;
86	struct pingpong_dest	*rem_dest;
87	void			*buf;
88	int			 lid;
89	int			 sl;
90	enum ibv_mtu		 mtu;
91	int			 ib_port;
92	int			 fd;
93	int			 size;
94	int			 num_clients;
95	int			 num_tests;
96	int			 use_event;
97	int			 gidx;
98};
99
100static struct pingpong_context ctx;
101
102
103static int open_device(char *ib_devname)
104{
105	struct ibv_device **dev_list;
106	int i = 0;
107
108	dev_list = ibv_get_device_list(NULL);
109	if (!dev_list) {
110		fprintf(stderr, "Failed to get IB devices list");
111		return -1;
112	}
113
114	if (ib_devname) {
115		for (; dev_list[i]; ++i) {
116			if (!strcmp(ibv_get_device_name(dev_list[i]), ib_devname))
117				break;
118		}
119	}
120	if (!dev_list[i]) {
121		fprintf(stderr, "IB device %s not found\n",
122			ib_devname ? ib_devname : "");
123		return -1;
124	}
125
126	ctx.context = ibv_open_device(dev_list[i]);
127	if (!ctx.context) {
128		fprintf(stderr, "Couldn't get context for %s\n",
129			ibv_get_device_name(dev_list[i]));
130		return -1;
131	}
132
133	ibv_free_device_list(dev_list);
134	return 0;
135}
136
137static int create_qps(void)
138{
139	struct ibv_qp_init_attr_ex init;
140	struct ibv_qp_attr mod;
141	int i;
142
143	for (i = 0; i < ctx.num_clients; ++i) {
144
145		memset(&init, 0, sizeof init);
146		init.qp_type = IBV_QPT_XRC_RECV;
147		init.comp_mask = IBV_QP_INIT_ATTR_XRCD;
148		init.xrcd = ctx.xrcd;
149
150		ctx.recv_qp[i] = ibv_create_qp_ex(ctx.context, &init);
151		if (!ctx.recv_qp[i])  {
152			fprintf(stderr, "Couldn't create recv QP[%d] errno %d\n",
153				i, errno);
154			return 1;
155		}
156
157		mod.qp_state        = IBV_QPS_INIT;
158		mod.pkey_index      = 0;
159		mod.port_num        = ctx.ib_port;
160		mod.qp_access_flags = IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ;
161
162		if (ibv_modify_qp(ctx.recv_qp[i], &mod,
163				  IBV_QP_STATE | IBV_QP_PKEY_INDEX |
164				  IBV_QP_PORT | IBV_QP_ACCESS_FLAGS)) {
165			fprintf(stderr, "Failed to modify recv QP[%d] to INIT\n", i);
166			return 1;
167		}
168
169		memset(&init, 0, sizeof init);
170		init.qp_type	      = IBV_QPT_XRC_SEND;
171		init.send_cq	      = ctx.send_cq;
172		init.cap.max_send_wr  = ctx.num_clients * ctx.num_tests;
173		init.cap.max_send_sge = 1;
174		init.comp_mask	      = IBV_QP_INIT_ATTR_PD;
175		init.pd		      = ctx.pd;
176
177		ctx.send_qp[i] = ibv_create_qp_ex(ctx.context, &init);
178		if (!ctx.send_qp[i])  {
179			fprintf(stderr, "Couldn't create send QP[%d] errno %d\n",
180				i, errno);
181			return 1;
182		}
183
184		mod.qp_state        = IBV_QPS_INIT;
185		mod.pkey_index      = 0;
186		mod.port_num        = ctx.ib_port;
187		mod.qp_access_flags = 0;
188
189		if (ibv_modify_qp(ctx.send_qp[i], &mod,
190				  IBV_QP_STATE | IBV_QP_PKEY_INDEX |
191				  IBV_QP_PORT | IBV_QP_ACCESS_FLAGS)) {
192			fprintf(stderr, "Failed to modify send QP[%d] to INIT\n", i);
193			return 1;
194		}
195	}
196
197	return 0;
198}
199
200static int pp_init_ctx(char *ib_devname)
201{
202	struct ibv_srq_init_attr_ex attr;
203	struct ibv_xrcd_init_attr xrcd_attr;
204	struct ibv_port_attr port_attr;
205
206	ctx.recv_qp = calloc(ctx.num_clients, sizeof *ctx.recv_qp);
207	ctx.send_qp = calloc(ctx.num_clients, sizeof *ctx.send_qp);
208	ctx.rem_dest = calloc(ctx.num_clients, sizeof *ctx.rem_dest);
209	if (!ctx.recv_qp || !ctx.send_qp || !ctx.rem_dest)
210		return 1;
211
212	if (open_device(ib_devname)) {
213		fprintf(stderr, "Failed to open device\n");
214		return 1;
215	}
216
217	if (pp_get_port_info(ctx.context, ctx.ib_port, &port_attr)) {
218		fprintf(stderr, "Failed to get port info\n");
219		return 1;
220	}
221
222	ctx.lid = port_attr.lid;
223	if (port_attr.link_layer != IBV_LINK_LAYER_ETHERNET && !ctx.lid) {
224		fprintf(stderr, "Couldn't get local LID\n");
225		return 1;
226	}
227
228	ctx.buf = memalign(page_size, ctx.size);
229	if (!ctx.buf) {
230		fprintf(stderr, "Couldn't allocate work buf.\n");
231		return 1;
232	}
233
234	memset(ctx.buf, 0, ctx.size);
235
236	if (ctx.use_event) {
237		ctx.channel = ibv_create_comp_channel(ctx.context);
238		if (!ctx.channel) {
239			fprintf(stderr, "Couldn't create completion channel\n");
240			return 1;
241		}
242	}
243
244	ctx.pd = ibv_alloc_pd(ctx.context);
245	if (!ctx.pd) {
246		fprintf(stderr, "Couldn't allocate PD\n");
247		return 1;
248	}
249
250	ctx.mr = ibv_reg_mr(ctx.pd, ctx.buf, ctx.size, IBV_ACCESS_LOCAL_WRITE);
251	if (!ctx.mr) {
252		fprintf(stderr, "Couldn't register MR\n");
253		return 1;
254	}
255
256	ctx.fd = open("/tmp/xrc_domain", O_RDONLY | O_CREAT, S_IRUSR | S_IRGRP);
257	if (ctx.fd < 0) {
258		fprintf(stderr,
259			"Couldn't create the file for the XRC Domain "
260			"but not stopping %d\n", errno);
261		ctx.fd = -1;
262	}
263
264	memset(&xrcd_attr, 0, sizeof xrcd_attr);
265	xrcd_attr.comp_mask = IBV_XRCD_INIT_ATTR_FD | IBV_XRCD_INIT_ATTR_OFLAGS;
266	xrcd_attr.fd = ctx.fd;
267	xrcd_attr.oflags = O_CREAT;
268	ctx.xrcd = ibv_open_xrcd(ctx.context, &xrcd_attr);
269	if (!ctx.xrcd) {
270		fprintf(stderr, "Couldn't Open the XRC Domain %d\n", errno);
271		return 1;
272	}
273
274	ctx.recv_cq = ibv_create_cq(ctx.context, ctx.num_clients, &ctx.recv_cq,
275				    ctx.channel, 0);
276	if (!ctx.recv_cq) {
277		fprintf(stderr, "Couldn't create recv CQ\n");
278		return 1;
279	}
280
281	if (ctx.use_event) {
282		if (ibv_req_notify_cq(ctx.recv_cq, 0)) {
283			fprintf(stderr, "Couldn't request CQ notification\n");
284			return 1;
285		}
286	}
287
288	ctx.send_cq = ibv_create_cq(ctx.context, ctx.num_clients, NULL, NULL, 0);
289	if (!ctx.send_cq) {
290		fprintf(stderr, "Couldn't create send CQ\n");
291		return 1;
292	}
293
294	memset(&attr, 0, sizeof attr);
295	attr.attr.max_wr = ctx.num_clients;
296	attr.attr.max_sge = 1;
297	attr.comp_mask = IBV_SRQ_INIT_ATTR_TYPE | IBV_SRQ_INIT_ATTR_XRCD |
298			 IBV_SRQ_INIT_ATTR_CQ | IBV_SRQ_INIT_ATTR_PD;
299	attr.srq_type = IBV_SRQT_XRC;
300	attr.xrcd = ctx.xrcd;
301	attr.cq = ctx.recv_cq;
302	attr.pd = ctx.pd;
303
304	ctx.srq = ibv_create_srq_ex(ctx.context, &attr);
305	if (!ctx.srq)  {
306		fprintf(stderr, "Couldn't create SRQ\n");
307		return 1;
308	}
309
310	if (create_qps())
311		return 1;
312
313	return 0;
314}
315
316static int recv_termination_ack(int index)
317{
318	char msg[TERMINATION_MSG_SIZE];
319	int n = 0, r;
320	int sockfd = ctx.rem_dest[index].sockfd;
321
322	while (n < TERMINATION_MSG_SIZE) {
323		r = read(sockfd, msg + n, TERMINATION_MSG_SIZE - n);
324		if (r < 0) {
325			perror("client read");
326			fprintf(stderr,
327				"%d/%d: Couldn't read remote termination ack\n",
328				n, TERMINATION_MSG_SIZE);
329			return 1;
330		}
331		n += r;
332	}
333
334	if (strcmp(msg, TERMINATION_MSG)) {
335		fprintf(stderr, "Invalid termination ack was accepted\n");
336		return 1;
337	}
338
339	return 0;
340}
341
342static int send_termination_ack(int index)
343{
344	char msg[TERMINATION_MSG_SIZE];
345	int sockfd = ctx.rem_dest[index].sockfd;
346
347	sprintf(msg, TERMINATION_FORMAT, TERMINATION_MSG);
348
349	if (write(sockfd, msg, TERMINATION_MSG_SIZE) != TERMINATION_MSG_SIZE) {
350		fprintf(stderr, "Couldn't send termination ack\n");
351		return 1;
352	}
353
354	return 0;
355}
356
/*
 * Client side of the shutdown handshake.  Send our ack to the server, then
 * wait for the server's ack.  Returns 0 on success, 1 if either step fails;
 * the receive is skipped if the send fails.
 */
static int pp_client_termination(void)
{
	return (send_termination_ack(0) || recv_termination_ack(0)) ? 1 : 0;
}
366
367static int pp_server_termination(void)
368{
369	int i;
370
371	for (i = 0; i < ctx.num_clients; i++) {
372		if (recv_termination_ack(i))
373			return 1;
374	}
375
376	for (i = 0; i < ctx.num_clients; i++) {
377		if (send_termination_ack(i))
378			return 1;
379	}
380
381	return 0;
382}
383
384static int send_local_dest(int sockfd, int index)
385{
386	char msg[MSG_SIZE];
387	char gid[33];
388	uint32_t srq_num;
389	union ibv_gid local_gid;
390
391	if (ctx.gidx >= 0) {
392		if (ibv_query_gid(ctx.context, ctx.ib_port, ctx.gidx,
393				  &local_gid)) {
394			fprintf(stderr, "can't read sgid of index %d\n",
395				ctx.gidx);
396			return -1;
397		}
398	} else {
399		memset(&local_gid, 0, sizeof(local_gid));
400	}
401
402	ctx.rem_dest[index].recv_psn = lrand48() & 0xffffff;
403	if (ibv_get_srq_num(ctx.srq, &srq_num)) {
404		fprintf(stderr, "Couldn't get SRQ num\n");
405		return -1;
406	}
407
408	inet_ntop(AF_INET6, &local_gid, gid, sizeof(gid));
409	printf(ADDR_FORMAT, "local", ctx.lid, ctx.recv_qp[index]->qp_num,
410		ctx.send_qp[index]->qp_num, ctx.rem_dest[index].recv_psn,
411		srq_num, gid);
412
413	gid_to_wire_gid(&local_gid, gid);
414	sprintf(msg, MSG_FORMAT, ctx.lid, ctx.recv_qp[index]->qp_num,
415		ctx.send_qp[index]->qp_num, ctx.rem_dest[index].recv_psn,
416		srq_num, gid);
417
418	if (write(sockfd, msg, MSG_SIZE) != MSG_SIZE) {
419		fprintf(stderr, "Couldn't send local address\n");
420		return -1;
421	}
422
423	return 0;
424}
425
426static int recv_remote_dest(int sockfd, int index)
427{
428	struct pingpong_dest *rem_dest;
429	char msg[MSG_SIZE];
430	char gid[33];
431	int n = 0, r;
432
433	while (n < MSG_SIZE) {
434		r = read(sockfd, msg + n, MSG_SIZE - n);
435		if (r < 0) {
436			perror("client read");
437			fprintf(stderr,
438				"%d/%d: Couldn't read remote address [%d]\n",
439				n, MSG_SIZE, index);
440			return -1;
441		}
442		n += r;
443	}
444
445	rem_dest = &ctx.rem_dest[index];
446	sscanf(msg, MSG_SSCAN, &rem_dest->lid, &rem_dest->recv_qpn,
447		&rem_dest->send_qpn, &rem_dest->send_psn, &rem_dest->srqn, gid);
448
449	wire_gid_to_gid(gid, &rem_dest->gid);
450	inet_ntop(AF_INET6, &rem_dest->gid, gid, sizeof(gid));
451	printf(ADDR_FORMAT, "remote", rem_dest->lid, rem_dest->recv_qpn,
452		rem_dest->send_qpn, rem_dest->send_psn, rem_dest->srqn,
453		gid);
454
455	rem_dest->sockfd = sockfd;
456	return 0;
457}
458
459static void set_ah_attr(struct ibv_ah_attr *attr, struct pingpong_context *myctx,
460			int index)
461{
462	attr->is_global = 1;
463	attr->grh.hop_limit = 5;
464	attr->grh.dgid = myctx->rem_dest[index].gid;
465	attr->grh.sgid_index = myctx->gidx;
466}
467
468static int connect_qps(int index)
469{
470	struct ibv_qp_attr attr;
471
472	memset(&attr, 0, sizeof attr);
473	attr.qp_state	      = IBV_QPS_RTR;
474	attr.dest_qp_num      = ctx.rem_dest[index].send_qpn;
475	attr.path_mtu	      = ctx.mtu;
476	attr.rq_psn	      = ctx.rem_dest[index].send_psn;
477	attr.min_rnr_timer    = 12;
478	attr.ah_attr.dlid     = ctx.rem_dest[index].lid;
479	attr.ah_attr.sl	      = ctx.sl;
480	attr.ah_attr.port_num = ctx.ib_port;
481
482	if (ctx.rem_dest[index].gid.global.interface_id)
483		set_ah_attr(&attr.ah_attr, &ctx, index);
484
485	if (ibv_modify_qp(ctx.recv_qp[index], &attr,
486			  IBV_QP_STATE | IBV_QP_AV | IBV_QP_PATH_MTU |
487			  IBV_QP_DEST_QPN | IBV_QP_RQ_PSN |
488			  IBV_QP_MAX_DEST_RD_ATOMIC | IBV_QP_MIN_RNR_TIMER)) {
489		fprintf(stderr, "Failed to modify recv QP[%d] to RTR\n", index);
490		return 1;
491	}
492
493	memset(&attr, 0, sizeof attr);
494	attr.qp_state = IBV_QPS_RTS;
495	attr.timeout = 14;
496	attr.sq_psn = ctx.rem_dest[index].recv_psn;
497
498	if (ibv_modify_qp(ctx.recv_qp[index], &attr,
499			  IBV_QP_STATE | IBV_QP_TIMEOUT | IBV_QP_SQ_PSN)) {
500		fprintf(stderr, "Failed to modify recv QP[%d] to RTS\n", index);
501		return 1;
502	}
503
504	memset(&attr, 0, sizeof attr);
505	attr.qp_state	      = IBV_QPS_RTR;
506	attr.dest_qp_num      = ctx.rem_dest[index].recv_qpn;
507	attr.path_mtu	      = ctx.mtu;
508	attr.rq_psn	      = ctx.rem_dest[index].send_psn;
509	attr.ah_attr.dlid     = ctx.rem_dest[index].lid;
510	attr.ah_attr.sl	      = ctx.sl;
511	attr.ah_attr.port_num = ctx.ib_port;
512
513	if (ctx.rem_dest[index].gid.global.interface_id)
514		set_ah_attr(&attr.ah_attr, &ctx, index);
515
516	if (ibv_modify_qp(ctx.send_qp[index], &attr,
517			  IBV_QP_STATE | IBV_QP_AV | IBV_QP_PATH_MTU |
518			  IBV_QP_DEST_QPN | IBV_QP_RQ_PSN)) {
519		fprintf(stderr, "Failed to modify send QP[%d] to RTR\n", index);
520		return 1;
521	}
522
523	memset(&attr, 0, sizeof attr);
524	attr.qp_state = IBV_QPS_RTS;
525	attr.timeout = 14;
526	attr.retry_cnt = 7;
527	attr.rnr_retry = 7;
528	attr.sq_psn = ctx.rem_dest[index].recv_psn;
529
530	if (ibv_modify_qp(ctx.send_qp[index], &attr,
531			  IBV_QP_STATE | IBV_QP_TIMEOUT | IBV_QP_SQ_PSN |
532			  IBV_QP_RETRY_CNT | IBV_QP_RNR_RETRY | IBV_QP_MAX_QP_RD_ATOMIC)) {
533		fprintf(stderr, "Failed to modify send QP[%d] to RTS\n", index);
534		return 1;
535	}
536
537	return 0;
538}
539
/*
 * Client side of connection setup:
 *  1. Connect over TCP to @servername:@port.
 *  2. Send our address, then receive the server's.
 *  3. Connect QP pair 0.
 *
 * Once recv_remote_dest() succeeds, the socket is owned by
 * ctx.rem_dest[0].sockfd.  Before that point it is closed here on failure.
 *
 * Returns 0 on success, 1 on failure.
 */
static int pp_client_connect(const char *servername, int port)
{
	struct addrinfo *res, *t;
	char *service;
	int ret;
	int sockfd = -1;
	struct addrinfo hints = {
		.ai_family   = AF_UNSPEC,
		.ai_socktype = SOCK_STREAM
	};

	if (asprintf(&service, "%d", port) < 0)
		return 1;

	/* Any non-zero EAI_* code is a failure; their sign is unspecified. */
	ret = getaddrinfo(servername, service, &hints, &res);
	if (ret != 0) {
		fprintf(stderr, "%s for %s:%d\n", gai_strerror(ret), servername, port);
		free(service);
		return 1;
	}

	for (t = res; t; t = t->ai_next) {
		sockfd = socket(t->ai_family, t->ai_socktype, t->ai_protocol);
		if (sockfd >= 0) {
			if (!connect(sockfd, t->ai_addr, t->ai_addrlen))
				break;
			close(sockfd);
			sockfd = -1;
		}
	}

	freeaddrinfo_null(res);
	free(service);

	if (sockfd < 0) {
		fprintf(stderr, "Couldn't connect to %s:%d\n", servername, port);
		return 1;
	}

	if (send_local_dest(sockfd, 0) || recv_remote_dest(sockfd, 0)) {
		close(sockfd);
		return 1;
	}

	if (connect_qps(0))
		return 1;

	return 0;
}
590
/*
 * Server side of connection setup:
 *  1. Listen on @port (IPv4).
 *  2. Accept ctx.num_clients connections.
 *  3. For each client slot, receive the client's address, send ours and
 *     connect the slot's QP pair.
 *
 * The listening socket is closed on every return path.  A connection socket
 * is owned by ctx.rem_dest[i].sockfd once recv_remote_dest() succeeds;
 * before that it is closed here on failure.
 *
 * Returns 0 on success, 1 on failure.
 */
static int pp_server_connect(int port)
{
	struct addrinfo *res, *t;
	char *service;
	int ret, i, n;
	int sockfd = -1, connfd;
	struct addrinfo hints = {
		.ai_flags    = AI_PASSIVE,
		.ai_family   = AF_INET,
		.ai_socktype = SOCK_STREAM
	};

	if (asprintf(&service, "%d", port) < 0)
		return 1;

	/* Any non-zero EAI_* code is a failure; their sign is unspecified. */
	ret = getaddrinfo(NULL, service, &hints, &res);
	if (ret != 0) {
		fprintf(stderr, "%s for port %d\n", gai_strerror(ret), port);
		free(service);
		return 1;
	}

	for (t = res; t; t = t->ai_next) {
		sockfd = socket(t->ai_family, t->ai_socktype, t->ai_protocol);
		if (sockfd >= 0) {
			n = 1;
			setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n);
			if (!bind(sockfd, t->ai_addr, t->ai_addrlen))
				break;
			close(sockfd);
			sockfd = -1;
		}
	}

	freeaddrinfo_null(res);
	free(service);

	if (sockfd < 0) {
		fprintf(stderr, "Couldn't listen to port %d\n", port);
		return 1;
	}

	if (listen(sockfd, ctx.num_clients) < 0) {
		perror("listen() failed");
		goto err_close;
	}

	for (i = 0; i < ctx.num_clients; i++) {
		connfd = accept(sockfd, NULL, NULL);
		if (connfd < 0) {
			fprintf(stderr, "accept() failed for client %d\n", i);
			goto err_close;
		}

		if (recv_remote_dest(connfd, i)) {
			/* Not yet recorded in ctx.rem_dest[i]; release it here. */
			close(connfd);
			goto err_close;
		}

		if (send_local_dest(connfd, i))
			goto err_close;

		if (connect_qps(i))
			goto err_close;
	}

	close(sockfd);
	return 0;

err_close:
	close(sockfd);
	return 1;
}
659
660
661static int pp_close_ctx(void)
662{
663	int i;
664
665	for (i = 0; i < ctx.num_clients; ++i) {
666
667		if (ibv_destroy_qp(ctx.send_qp[i])) {
668			fprintf(stderr, "Couldn't destroy INI QP[%d]\n", i);
669			return 1;
670		}
671
672		if (ibv_destroy_qp(ctx.recv_qp[i])) {
673			fprintf(stderr, "Couldn't destroy TGT QP[%d]\n", i);
674			return 1;
675		}
676
677		if (ctx.rem_dest[i].sockfd)
678			close(ctx.rem_dest[i].sockfd);
679	}
680
681	if (ibv_destroy_srq(ctx.srq)) {
682		fprintf(stderr, "Couldn't destroy SRQ\n");
683		return 1;
684	}
685
686	if (ctx.xrcd && ibv_close_xrcd(ctx.xrcd)) {
687		fprintf(stderr, "Couldn't close the XRC Domain\n");
688		return 1;
689	}
690	if (ctx.fd >= 0 && close(ctx.fd)) {
691		fprintf(stderr, "Couldn't close the file for the XRC Domain\n");
692		return 1;
693	}
694
695	if (ibv_destroy_cq(ctx.send_cq)) {
696		fprintf(stderr, "Couldn't destroy send CQ\n");
697		return 1;
698	}
699
700	if (ibv_destroy_cq(ctx.recv_cq)) {
701		fprintf(stderr, "Couldn't destroy recv CQ\n");
702		return 1;
703	}
704
705	if (ibv_dereg_mr(ctx.mr)) {
706		fprintf(stderr, "Couldn't deregister MR\n");
707		return 1;
708	}
709
710	if (ibv_dealloc_pd(ctx.pd)) {
711		fprintf(stderr, "Couldn't deallocate PD\n");
712		return 1;
713	}
714
715	if (ctx.channel) {
716		if (ibv_destroy_comp_channel(ctx.channel)) {
717			fprintf(stderr,
718				"Couldn't destroy completion channel\n");
719			return 1;
720		}
721	}
722
723	if (ibv_close_device(ctx.context)) {
724		fprintf(stderr, "Couldn't release context\n");
725		return 1;
726	}
727
728	free(ctx.buf);
729	free(ctx.rem_dest);
730	free(ctx.send_qp);
731	free(ctx.recv_qp);
732	return 0;
733}
734
735static int pp_post_recv(int cnt)
736{
737	struct ibv_sge sge;
738	struct ibv_recv_wr wr, *bad_wr;
739
740	sge.addr = (uintptr_t) ctx.buf;
741	sge.length = ctx.size;
742	sge.lkey = ctx.mr->lkey;
743
744	wr.next       = NULL;
745	wr.wr_id      = (uintptr_t) &ctx;
746	wr.sg_list    = &sge;
747	wr.num_sge    = 1;
748
749	while (cnt--) {
750		if (ibv_post_srq_recv(ctx.srq, &wr, &bad_wr)) {
751			fprintf(stderr, "Failed to post receive to SRQ\n");
752			return 1;
753		}
754	}
755	return 0;
756}
757
758/*
759 * Send to each client round robin on each set of xrc send/recv qp.
760 * Generate a completion on the last send.
761 */
762static int pp_post_send(int index)
763{
764	struct ibv_sge sge;
765	struct ibv_send_wr wr, *bad_wr;
766	int qpi;
767
768	sge.addr = (uintptr_t) ctx.buf;
769	sge.length = ctx.size;
770	sge.lkey = ctx.mr->lkey;
771
772	wr.wr_id   = (uintptr_t) index;
773	wr.next    = NULL;
774	wr.sg_list = &sge;
775	wr.num_sge = 1;
776	wr.opcode  = IBV_WR_SEND;
777	wr.qp_type.xrc.remote_srqn = ctx.rem_dest[index].srqn;
778
779	qpi = (index + ctx.rem_dest[index].pp_cnt) % ctx.num_clients;
780	wr.send_flags = (++ctx.rem_dest[index].pp_cnt >= ctx.num_tests) ?
781			IBV_SEND_SIGNALED : 0;
782
783	return ibv_post_send(ctx.send_qp[qpi], &wr, &bad_wr);
784}
785
786static int find_qp(int qpn)
787{
788	int i;
789
790	if (ctx.num_clients == 1)
791		return 0;
792
793	for (i = 0; i < ctx.num_clients; ++i)
794		if (ctx.recv_qp[i]->qp_num == qpn)
795			return i;
796
797	fprintf(stderr, "Unable to find qp %x\n", qpn);
798	return 0;
799}
800
801static int get_cq_event(void)
802{
803	struct ibv_cq *ev_cq;
804	void          *ev_ctx;
805
806	if (ibv_get_cq_event(ctx.channel, &ev_cq, &ev_ctx)) {
807		fprintf(stderr, "Failed to get cq_event\n");
808		return 1;
809	}
810
811	if (ev_cq != ctx.recv_cq) {
812		fprintf(stderr, "CQ event for unknown CQ %p\n", ev_cq);
813		return 1;
814	}
815
816	if (ibv_req_notify_cq(ctx.recv_cq, 0)) {
817		fprintf(stderr, "Couldn't request CQ notification\n");
818		return 1;
819	}
820
821	return 0;
822}
823
824static void init(void)
825{
826	srand48(getpid() * time(NULL));
827
828	ctx.size = 4096;
829	ctx.ib_port = 1;
830	ctx.num_clients  = 1;
831	ctx.num_tests = 5;
832	ctx.mtu = IBV_MTU_2048;
833	ctx.sl = 0;
834	ctx.gidx = -1;
835}
836
/* Print command-line help to stdout, using @argv0 as the program name. */
static void usage(const char *argv0)
{
	static const char options_help[] =
		"Options:\n"
		"  -p, --port=<port>      listen on/connect to port <port> (default 18515)\n"
		"  -d, --ib-dev=<dev>     use IB device <dev> (default first device found)\n"
		"  -i, --ib-port=<port>   use port <port> of IB device (default 1)\n"
		"  -s, --size=<size>      size of message to exchange (default 4096)\n"
		"  -m, --mtu=<size>       path MTU (default 2048)\n"
		"  -c, --clients=<n>      number of clients (on server only, default 1)\n"
		"  -n, --num_tests=<n>    number of tests per client (default 5)\n"
		"  -l, --sl=<sl>          service level value\n"
		"  -e, --events           sleep on CQ events (default poll)\n"
		"  -g, --gid-idx=<gid index> local port gid index\n";

	printf("Usage:\n"
	       "  %s            start a server and wait for connection\n"
	       "  %s <host>     connect to server at <host>\n"
	       "\n",
	       argv0, argv0);
	fputs(options_help, stdout);
}
855
856int main(int argc, char *argv[])
857{
858	char          *ib_devname = NULL;
859	char          *servername = NULL;
860	int           port = 18515;
861	int           i, total, cnt = 0;
862	int           ne, qpi, num_cq_events = 0;
863	struct ibv_wc wc;
864
865	init();
866	while (1) {
867		int c;
868
869		static struct option long_options[] = {
870			{ .name = "port",      .has_arg = 1, .val = 'p' },
871			{ .name = "ib-dev",    .has_arg = 1, .val = 'd' },
872			{ .name = "ib-port",   .has_arg = 1, .val = 'i' },
873			{ .name = "size",      .has_arg = 1, .val = 's' },
874			{ .name = "mtu",       .has_arg = 1, .val = 'm' },
875			{ .name = "clients",   .has_arg = 1, .val = 'c' },
876			{ .name = "num_tests", .has_arg = 1, .val = 'n' },
877			{ .name = "sl",        .has_arg = 1, .val = 'l' },
878			{ .name = "events",    .has_arg = 0, .val = 'e' },
879			{ .name = "gid-idx",   .has_arg = 1, .val = 'g' },
880			{}
881		};
882
883		c = getopt_long(argc, argv, "p:d:i:s:m:c:n:l:eg:", long_options,
884				NULL);
885		if (c == -1)
886			break;
887
888		switch (c) {
889		case 'p':
890			port = strtol(optarg, NULL, 0);
891			if (port < 0 || port > 65535) {
892				usage(argv[0]);
893				return 1;
894			}
895			break;
896		case 'd':
897			ib_devname = strdupa(optarg);
898			break;
899		case 'i':
900			ctx.ib_port = strtol(optarg, NULL, 0);
901			if (ctx.ib_port < 0) {
902				usage(argv[0]);
903				return 1;
904			}
905			break;
906		case 's':
907			ctx.size = strtol(optarg, NULL, 0);
908			break;
909		case 'm':
910			ctx.mtu = pp_mtu_to_enum(strtol(optarg, NULL, 0));
911			if (ctx.mtu == 0) {
912				usage(argv[0]);
913				return 1;
914			}
915			break;
916		case 'c':
917			ctx.num_clients = strtol(optarg, NULL, 0);
918			break;
919		case 'n':
920			ctx.num_tests = strtol(optarg, NULL, 0);
921			break;
922		case 'l':
923			ctx.sl = strtol(optarg, NULL, 0);
924			break;
925		case 'g':
926			ctx.gidx = strtol(optarg, NULL, 0);
927			break;
928		case 'e':
929			ctx.use_event = 1;
930			break;
931		default:
932			usage(argv[0]);
933			return 1;
934		}
935	}
936
937	if (optind == argc - 1) {
938		servername = strdupa(argv[optind]);
939		ctx.num_clients = 1;
940	} else if (optind < argc) {
941		usage(argv[0]);
942		return 1;
943	}
944
945	page_size = sysconf(_SC_PAGESIZE);
946
947	if (pp_init_ctx(ib_devname))
948		return 1;
949
950	if (pp_post_recv(ctx.num_clients)) {
951		fprintf(stderr, "Couldn't post receives\n");
952		return 1;
953	}
954
955	if (servername) {
956		if (pp_client_connect(servername, port))
957			return 1;
958	} else {
959		if (pp_server_connect(port))
960			return 1;
961
962		for (i = 0; i < ctx.num_clients; i++)
963			pp_post_send(i);
964	}
965
966	total = ctx.num_clients * ctx.num_tests;
967	while (cnt < total) {
968		if (ctx.use_event) {
969			if (get_cq_event())
970				return 1;
971
972			++num_cq_events;
973		}
974
975		do {
976			ne = ibv_poll_cq(ctx.recv_cq, 1, &wc);
977			if (ne < 0) {
978				fprintf(stderr, "Error polling cq %d\n", ne);
979				return 1;
980			} else if (ne == 0) {
981				break;
982			}
983
984			if (wc.status) {
985				fprintf(stderr, "Work completion error %d\n", wc.status);
986				return 1;
987			}
988
989			pp_post_recv(ne);
990			qpi = find_qp(wc.qp_num);
991			if (ctx.rem_dest[qpi].pp_cnt < ctx.num_tests)
992				pp_post_send(qpi);
993			cnt += ne;
994		} while (ne > 0);
995	}
996
997	for (cnt = 0; cnt < ctx.num_clients; cnt += ne) {
998		ne = ibv_poll_cq(ctx.send_cq, 1, &wc);
999		if (ne < 0) {
1000			fprintf(stderr, "Error polling cq %d\n", ne);
1001			return 1;
1002		}
1003	}
1004
1005	if (ctx.use_event)
1006		ibv_ack_cq_events(ctx.recv_cq, num_cq_events);
1007
1008	/* Process should get an ack from the daemon to close its resources to
1009	  * make sure latest daemon's response sent via its target QP destined
1010	  * to an XSRQ created by another client won't be lost.
1011	  * Failure to do so may cause the other client to wait for that sent
1012	  * message forever. See comment on pp_post_send.
1013	*/
1014	if (servername) {
1015		if (pp_client_termination())
1016			return 1;
1017	} else if (pp_server_termination()) {
1018		return 1;
1019	}
1020
1021	if (pp_close_ctx())
1022		return 1;
1023
1024	printf("success\n");
1025	return 0;
1026}
1027