rcopy.c revision 331769
1/*
2 * Copyright (c) 2011 Intel Corporation.  All rights reserved.
3 *
4 * This software is available to you under the OpenIB.org BSD license
5 * below:
6 *
7 *     Redistribution and use in source and binary forms, with or
8 *     without modification, are permitted provided that the following
9 *     conditions are met:
10 *
11 *      - Redistributions of source code must retain the above
12 *        copyright notice, this list of conditions and the following
13 *        disclaimer.
14 *
15 *      - Redistributions in binary form must reproduce the above
16 *        copyright notice, this list of conditions and the following
17 *        disclaimer in the documentation and/or other materials
18 *        provided with the distribution.
19 *
20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
21 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
22 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
23 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
24 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
25 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
26 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
27 * SOFTWARE.
28 */
29
30#include <stdio.h>
31#include <stdlib.h>
32#include <string.h>
33#include <strings.h>
34#include <errno.h>
35#include <getopt.h>
36#include <arpa/inet.h>
37#include <sys/mman.h>
38#include <sys/types.h>
39#include <sys/socket.h>
40#include <sys/time.h>
41#include <sys/stat.h>
42#include <fcntl.h>
43#include <netdb.h>
44#include <unistd.h>
45
46#include <rdma/rsocket.h>
47
/*
 * A socket address viewed through several overlapping types, so that one
 * object can be passed to the socket calls as a generic sockaddr and then
 * read back as IPv4 or IPv6 according to sa.sa_family.
 */
union rsocket_address {
	struct sockaddr		sa;		/* generic view; carries sa_family */
	struct sockaddr_in	sin;		/* IPv4 view */
	struct sockaddr_in6	sin6;		/* IPv6 view */
	struct sockaddr_storage	storage;	/* largest member; fixes the size */
};
54
/* Service name/port used by getaddrinfo(); replaced by the -p option. */
static const char *port = "7427";

/* Client command line: server address, destination path, source path. */
static char *dst_addr;
static char *dst_file;
static char *src_file;

/* Timestamps taken around the client's bulk send, read by show_perf(). */
static struct timeval start;
static struct timeval end;

/* Size in bytes of the file being transferred and of its mapping. */
static uint64_t bytes;

/* Descriptor and mapping of the file currently being transferred. */
static int fd;
static void *file_addr;
64
/* Values carried in the command field of a message header. */
enum {
	CMD_NOOP  = 0,
	CMD_OPEN  = 1,
	CMD_CLOSE = 2,
	CMD_WRITE = 3,
	CMD_RESP  = 0x80,	/* OR'ed with a request's command to mark its response */
};
72
/*
 * Header that starts every message exchanged between client and server.
 *
 * TODO: handle byte swapping (fields are currently sent in host order).
 */
struct msg_hdr {
	uint8_t  version;	/* receivers reject any value other than 0 */
	uint8_t  command;	/* CMD_* value; responses have CMD_RESP set */
	uint16_t len;		/* message length in bytes, header included */
	uint32_t data;		/* per command: file mode on open, status in a response */
	uint64_t id;		/* copied from the request into its response */
};
81
82struct msg_open {
83	struct msg_hdr hdr;
84	char path[0];
85};
86
87struct msg_write {
88	struct msg_hdr hdr;
89	uint64_t size;
90};
91
92static void show_perf(void)
93{
94	float usec;
95
96	usec = (end.tv_sec - start.tv_sec) * 1000000 + (end.tv_usec - start.tv_usec);
97
98	printf("%lld bytes in %.2f seconds = %.2f Gb/sec\n",
99	       (long long) bytes, usec / 1000000., (bytes * 8) / (1000. * usec));
100}
101
102static char *_ntop(union rsocket_address *rsa)
103{
104	static char addr[32];
105
106	switch (rsa->sa.sa_family) {
107	case AF_INET:
108		inet_ntop(AF_INET, &rsa->sin.sin_addr, addr, sizeof addr);
109		break;
110	case AF_INET6:
111		inet_ntop(AF_INET6, &rsa->sin6.sin6_addr, addr, sizeof addr);
112		break;
113	default:
114		addr[0] = '\0';
115		break;
116	}
117
118	return addr;
119}
120
/*
 * Receive exactly len bytes from rsocket rs into msg, calling rrecv() as
 * many times as needed.
 *
 * Returns len once everything has arrived.  If rrecv() returns 0 or a
 * negative value first, that value is returned converted to size_t (so -1
 * reads back as -1 when the caller stores the result in an int); callers
 * detect failure by comparing the result with len.
 */
static size_t _recv(int rs, char *msg, size_t len)
{
	size_t offset;
	ssize_t ret;	/* signed: an unsigned type can never compare below zero */

	for (offset = 0; offset < len; offset += (size_t) ret) {
		ret = rrecv(rs, msg + offset, len - offset, 0);
		if (ret <= 0)
			return (size_t) ret;
	}

	return len;
}
133
134static int msg_recv_hdr(int rs, struct msg_hdr *hdr)
135{
136	int ret;
137
138	ret = _recv(rs, (char *) hdr, sizeof *hdr);
139	if (ret != sizeof *hdr)
140		return -1;
141
142	if (hdr->version || hdr->len < sizeof *hdr) {
143		printf("invalid version %d or length %d\n",
144		       hdr->version, hdr->len);
145		return -1;
146	}
147
148	return sizeof *hdr;
149}
150
151static int msg_get_resp(int rs, struct msg_hdr *msg, uint8_t cmd)
152{
153	int ret;
154
155	ret = msg_recv_hdr(rs, msg);
156	if (ret != sizeof *msg)
157		return ret;
158
159	if ((msg->len != sizeof *msg) || (msg->command != (cmd | CMD_RESP))) {
160		printf("invalid length %d or bad command response %x:%x\n",
161		       msg->len, msg->command, cmd | CMD_RESP);
162		return -1;
163	}
164
165	return msg->data;
166}
167
168static void msg_send_resp(int rs, struct msg_hdr *msg, uint32_t status)
169{
170	struct msg_hdr resp;
171
172	resp.version = 0;
173	resp.command = msg->command | CMD_RESP;
174	resp.len = sizeof resp;
175	resp.data = status;
176	resp.id = msg->id;
177	rsend(rs, (char *) &resp, sizeof resp, 0);
178}
179
/*
 * Create an rsocket bound to the wildcard address on the configured port
 * and put it into the listening state.
 *
 * Returns the listening rsocket on success and a negative value on
 * failure, which is what the caller tests for.
 */
static int server_listen(void)
{
	struct addrinfo hints, *res;
	int ret, rs, optval;

	memset(&hints, 0, sizeof hints);
	/* getaddrinfo() takes AI_PASSIVE; RAI_PASSIVE is for rdma_getaddrinfo(). */
	hints.ai_flags = AI_PASSIVE;
	ret = getaddrinfo(NULL, port, &hints, &res);
	if (ret) {
		printf("getaddrinfo failed: %s\n", gai_strerror(ret));
		/*
		 * EAI_* codes are positive on some systems and would pass the
		 * caller's "< 0" check as if they were a socket.
		 */
		return -1;
	}

	rs = rsocket(res->ai_family, res->ai_socktype, res->ai_protocol);
	if (rs < 0) {
		/* perror() appends ": <reason>\n" itself, so no newline here. */
		perror("rsocket failed");
		ret = rs;
		goto free;
	}

	optval = 1;
	ret = rsetsockopt(rs, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof optval);
	if (ret) {
		perror("rsetsockopt failed");
		goto close;
	}

	ret = rbind(rs, res->ai_addr, res->ai_addrlen);
	if (ret) {
		perror("rbind failed");
		goto close;
	}

	ret = rlisten(rs, 1);
	if (ret) {
		perror("rlisten failed");
		goto close;
	}

	ret = rs;
	goto free;

close:
	rclose(rs);
	ret = -1;
free:
	freeaddrinfo(res);
	return ret;
}
228
229static int server_open(int rs, struct msg_hdr *msg)
230{
231	char *path = NULL;
232	int ret, len;
233
234	printf("opening: ");
235	fflush(NULL);
236	if (file_addr || fd > 0) {
237		printf("cannot open another file\n");
238		ret = EBUSY;
239		goto out;
240	}
241
242	len = msg->len - sizeof *msg;
243	path = malloc(len);
244	if (!path) {
245		printf("cannot allocate path name\n");
246		ret = ENOMEM;
247		goto out;
248	}
249
250	ret = _recv(rs, path, len);
251	if (ret != len) {
252		printf("error receiving path\n");
253		goto out;
254	}
255
256	printf("%s, ", path);
257	fflush(NULL);
258	fd = open(path, O_RDWR | O_CREAT | O_TRUNC, msg->data);
259	if (fd < 0) {
260		printf("unable to open destination file\n");
261		ret = errno;
262	}
263
264	ret = 0;
265out:
266	if (path)
267		free(path);
268
269	msg_send_resp(rs, msg, ret);
270	return ret;
271}
272
273static void server_close(int rs, struct msg_hdr *msg)
274{
275	printf("closing...");
276	fflush(NULL);
277	msg_send_resp(rs, msg, 0);
278
279	if (file_addr) {
280		munmap(file_addr, bytes);
281		file_addr = NULL;
282	}
283
284	if (fd > 0) {
285		close(fd);
286		fd = 0;
287	}
288	printf("done\n");
289}
290
291static int server_write(int rs, struct msg_hdr *msg)
292{
293	size_t len;
294	int ret;
295
296	printf("transferring");
297	fflush(NULL);
298	if (fd <= 0) {
299		printf("...file not opened\n");
300		ret = EINVAL;
301		goto out;
302	}
303
304	if (msg->len != sizeof(struct msg_write)) {
305		printf("...invalid message length %d\n", msg->len);
306		ret = EINVAL;
307		goto out;
308	}
309
310	ret = _recv(rs, (char *) &bytes, sizeof bytes);
311	if (ret != sizeof bytes)
312		goto out;
313
314	ret = ftruncate(fd, bytes);
315	if (ret)
316		goto out;
317
318	file_addr = mmap(NULL, bytes, PROT_WRITE, MAP_SHARED, fd, 0);
319	if (file_addr == (void *) -1) {
320		printf("...error mapping file\n");
321		ret = errno;
322		goto out;
323	}
324
325	printf("...%lld bytes...", (long long) bytes);
326	fflush(NULL);
327	len = _recv(rs, file_addr, bytes);
328	if (len != bytes) {
329		printf("...error receiving data\n");
330		ret = (int) len;
331	}
332out:
333	msg_send_resp(rs, msg, ret);
334	return ret;
335}
336
337static void server_process(int rs)
338{
339	struct msg_hdr msg;
340	int ret;
341
342	do {
343		ret = msg_recv_hdr(rs, &msg);
344		if (ret != sizeof msg)
345			break;
346
347		switch (msg.command) {
348		case CMD_OPEN:
349			ret = server_open(rs, &msg);
350			break;
351		case CMD_CLOSE:
352			server_close(rs, &msg);
353			ret = 0;
354			break;
355		case CMD_WRITE:
356			ret = server_write(rs, &msg);
357			break;
358		default:
359			msg_send_resp(rs, &msg, EINVAL);
360			ret = -1;
361			break;
362		}
363
364	} while (!ret);
365}
366
367static int server_run(void)
368{
369	int lrs, rs;
370	union rsocket_address rsa;
371	socklen_t len;
372
373	lrs = server_listen();
374	if (lrs < 0)
375		return lrs;
376
377	while (1) {
378		len = sizeof rsa;
379		printf("waiting for connection...");
380		fflush(NULL);
381		rs = raccept(lrs, &rsa.sa, &len);
382
383		printf("client: %s\n", _ntop(&rsa));
384		server_process(rs);
385
386		rshutdown(rs, SHUT_RDWR);
387		rclose(rs);
388	}
389	return 0;
390}
391
/*
 * Resolve dst_addr/port and connect an rsocket to the first address
 * returned.
 *
 * Returns the connected rsocket on success and a negative value on
 * failure, which is what the caller tests for.
 */
static int client_connect(void)
{
	struct addrinfo *res;
	int ret, rs;

	ret = getaddrinfo(dst_addr, port, NULL, &res);
	if (ret) {
		printf("getaddrinfo failed: %s\n", gai_strerror(ret));
		/*
		 * EAI_* codes are positive on some systems and would pass the
		 * caller's "< 0" check as if they were a socket.
		 */
		return -1;
	}

	rs = rsocket(res->ai_family, res->ai_socktype, res->ai_protocol);
	if (rs < 0) {
		/* perror() appends ": <reason>\n" itself, so no newline here. */
		perror("rsocket failed");
		goto free;
	}

	ret = rconnect(rs, res->ai_addr, res->ai_addrlen);
	if (ret) {
		perror("rconnect failed");
		rclose(rs);
		rs = -1;
	}

free:
	freeaddrinfo(res);
	return rs;
}
420
421static int client_open(int rs)
422{
423	struct msg_open *msg;
424	struct stat stats;
425	uint32_t len;
426	int ret;
427
428	printf("opening...");
429	fflush(NULL);
430	fd = open(src_file, O_RDONLY);
431	if (fd < 0)
432		return fd;
433
434	ret = fstat(fd, &stats);
435	if (ret < 0)
436		goto err1;
437
438	bytes = (uint64_t) stats.st_size;
439	file_addr = mmap(NULL, bytes, PROT_READ, MAP_SHARED, fd, 0);
440	if (file_addr == (void *) -1) {
441		ret = errno;
442		goto err1;
443	}
444
445	len = (((uint32_t) strlen(dst_file)) + 8) & 0xFFFFFFF8;
446	msg = calloc(1, sizeof(*msg) + len);
447	if (!msg) {
448		ret = -1;
449		goto err2;
450	}
451
452	msg->hdr.command = CMD_OPEN;
453	msg->hdr.len = sizeof(*msg) + len;
454	msg->hdr.data = (uint32_t) stats.st_mode;
455	strcpy(msg->path, dst_file);
456	ret = rsend(rs, msg, msg->hdr.len, 0);
457	if (ret != msg->hdr.len)
458		goto err3;
459
460	ret = msg_get_resp(rs, &msg->hdr, CMD_OPEN);
461	if (ret)
462		goto err3;
463
464	return 0;
465
466err3:
467	free(msg);
468err2:
469	munmap(file_addr, bytes);
470err1:
471	close(fd);
472	return ret;
473}
474
475static int client_start_write(int rs)
476{
477	struct msg_write msg;
478	int ret;
479
480	printf("transferring");
481	fflush(NULL);
482	memset(&msg, 0, sizeof msg);
483	msg.hdr.command = CMD_WRITE;
484	msg.hdr.len = sizeof(msg);
485	msg.size = bytes;
486
487	ret = rsend(rs, &msg, sizeof msg, 0);
488	if (ret != msg.hdr.len)
489		return ret;
490
491	return 0;
492}
493
494static int client_close(int rs)
495{
496	struct msg_hdr msg;
497	int ret;
498
499	printf("closing...");
500	fflush(NULL);
501	memset(&msg, 0, sizeof msg);
502	msg.command = CMD_CLOSE;
503	msg.len = sizeof msg;
504	ret = rsend(rs, (char *) &msg, msg.len, 0);
505	if (ret != msg.len)
506		goto out;
507
508	ret = msg_get_resp(rs, &msg, CMD_CLOSE);
509	if (ret)
510		goto out;
511
512	printf("done\n");
513out:
514	munmap(file_addr, bytes);
515	close(fd);
516	return ret;
517}
518
519static int client_run(void)
520{
521	struct msg_hdr ack;
522	int ret, rs;
523	size_t len;
524
525	rs = client_connect();
526	if (rs < 0)
527		return rs;
528
529	ret = client_open(rs);
530	if (ret)
531		goto shutdown;
532
533	ret = client_start_write(rs);
534	if (ret)
535		goto close;
536
537	printf("...");
538	fflush(NULL);
539	gettimeofday(&start, NULL);
540	len = rsend(rs, file_addr, bytes, 0);
541	if (len == bytes)
542		ret = msg_get_resp(rs, &ack, CMD_WRITE);
543	else
544		ret = (int) len;
545
546	gettimeofday(&end, NULL);
547
548close:
549	client_close(rs);
550shutdown:
551	rshutdown(rs, SHUT_RDWR);
552	rclose(rs);
553	if (!ret)
554		show_perf();
555	return ret;
556}
557
/*
 * Print the server and client command-line synopses, using program as
 * the command name, then terminate the process with exit status 1.
 */
static void show_usage(char *program)
{
	/* Server form. */
	printf("usage 1: %s [options]\n", program);
	fputs("\t     starts the server application\n", stdout);
	fputs("\t[-p  port_number]\n", stdout);

	/* Client form. */
	printf("usage 2: %s source server[:destination] [options]\n", program);
	fputs("\t     source - file name and path\n", stdout);
	fputs("\t     server - name or address\n", stdout);
	fputs("\t     destination - file name and path\n", stdout);
	fputs("\t[-p  port_number]\n", stdout);

	exit(1);
}
570
571static void server_opts(int argc, char **argv)
572{
573	int op;
574
575	while ((op = getopt(argc, argv, "p:")) != -1) {
576		switch (op) {
577		case 'p':
578			port = optarg;
579			break;
580		default:
581			show_usage(argv[0]);
582		}
583	}
584}
585
586static void client_opts(int argc, char **argv)
587{
588	int op;
589
590	if (argc < 3)
591		show_usage(argv[0]);
592
593	src_file = argv[1];
594	dst_addr = argv[2];
595	dst_file = strchr(dst_addr, ':');
596	if (dst_file) {
597		*dst_file = '\0';
598		dst_file++;
599	}
600	if (!dst_file)
601		dst_file = src_file;
602
603	while ((op = getopt(argc, argv, "p:")) != -1) {
604		switch (op) {
605		case 'p':
606			port = optarg;
607			break;
608		default:
609			show_usage(argv[0]);
610		}
611	}
612
613}
614
/*
 * Run as the server when there are no arguments or the first argument
 * starts with '-'; otherwise run as the client.  The exit status is the
 * value returned by the selected mode.
 */
int main(int argc, char **argv)
{
	int server_mode = (argc == 1) || (argv[1][0] == '-');

	if (server_mode) {
		server_opts(argc, argv);
		return server_run();
	}

	client_opts(argc, argv);
	return client_run();
}
629