/*
 * linux/net/sunrpc/xprtsock.c
 *
 * Client-side transport implementation for sockets.
 *
 * TCP callback races fixes (C) 1998 Red Hat Software <alan@redhat.com>
 * TCP send fixes (C) 1998 Red Hat Software <alan@redhat.com>
 * TCP NFS related read + write fixes
 *  (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
 *
 * Rewrite of large parts of the code in order to stabilize the TCP code.
 * Fix behaviour when socket buffer is full.
 *  (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no>
 *
 * IP socket transport implementation, (C) 2005 Chuck Lever <cel@netapp.com>
 */

#include <linux/types.h>
#include <linux/slab.h>
#include <linux/capability.h>
#include <linux/pagemap.h>
#include <linux/errno.h>
#include <linux/socket.h>
#include <linux/in.h>
#include <linux/net.h>
#include <linux/mm.h>
#include <linux/udp.h>
#include <linux/tcp.h>
#include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/sched.h>
#include <linux/file.h>

#include <net/sock.h>
#include <net/checksum.h>
#include <net/udp.h>
#include <net/tcp.h>

/*
 * xprtsock tunables
 */
unsigned int xprt_udp_slot_table_entries = RPC_DEF_SLOT_TABLE;
unsigned int xprt_tcp_slot_table_entries = RPC_DEF_SLOT_TABLE;

unsigned int xprt_min_resvport = RPC_DEF_MIN_RESVPORT;
unsigned int xprt_max_resvport = RPC_DEF_MAX_RESVPORT;

/*
 * We can register our own files under /proc/sys/sunrpc by
 * calling register_sysctl_table() again.  The files in that
 * directory become the union of all files registered there.
 *
 * We simply need to make sure that we don't collide with
 * someone else's file names!
 */

#ifdef RPC_DEBUG

static unsigned int min_slot_table_size = RPC_MIN_SLOT_TABLE;
static unsigned int max_slot_table_size = RPC_MAX_SLOT_TABLE;
static unsigned int xprt_min_resvport_limit = RPC_MIN_RESVPORT;
static unsigned int xprt_max_resvport_limit = RPC_MAX_RESVPORT;

static struct ctl_table_header *sunrpc_table_header;

static ctl_table xs_tunables_table[] = {
	{
		.ctl_name	= CTL_SLOTTABLE_UDP,
		.procname	= "udp_slot_table_entries",
		.data		= &xprt_udp_slot_table_entries,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= &proc_dointvec_minmax,
		.strategy	= &sysctl_intvec,
		.extra1		= &min_slot_table_size,
		.extra2		= &max_slot_table_size
	},
	{
		.ctl_name	= CTL_SLOTTABLE_TCP,
		.procname	= "tcp_slot_table_entries",
		.data		= &xprt_tcp_slot_table_entries,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= &proc_dointvec_minmax,
		.strategy	= &sysctl_intvec,
		.extra1		= &min_slot_table_size,
		.extra2		= &max_slot_table_size
	},
	{
		.ctl_name	= CTL_MIN_RESVPORT,
		.procname	= "min_resvport",
		.data		= &xprt_min_resvport,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= &proc_dointvec_minmax,
		.strategy	= &sysctl_intvec,
		.extra1		= &xprt_min_resvport_limit,
		.extra2		= &xprt_max_resvport_limit
	},
	{
		.ctl_name	= CTL_MAX_RESVPORT,
		.procname	= "max_resvport",
		.data		= &xprt_max_resvport,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= &proc_dointvec_minmax,
		.strategy	= &sysctl_intvec,
		.extra1		= &xprt_min_resvport_limit,
		.extra2		= &xprt_max_resvport_limit
	},
	{
		.ctl_name = 0,
	},
};

static ctl_table sunrpc_table[] = {
	{
		.ctl_name	= CTL_SUNRPC,
		.procname	= "sunrpc",
		.mode		= 0555,
		.child		= xs_tunables_table
	},
	{
		.ctl_name = 0,
	},
};

#endif

/*
 * How many times to try sending a request on a socket before waiting
 * for the socket buffer to clear.
 */
#define XS_SENDMSG_RETRY	(10U)

/*
 * Time out for an RPC UDP socket connect.  UDP socket connects are
 * synchronous, but we set a timeout anyway in case of resource
 * exhaustion on the local host.
 */
#define XS_UDP_CONN_TO		(5U * HZ)

/*
 * Wait duration for an RPC TCP connection to be established.  Solaris
 * NFS over TCP uses 60 seconds, for example, which is in line with how
 * long a server takes to reboot.
 */
#define XS_TCP_CONN_TO		(60U * HZ)

/*
 * Wait duration for a reply from the RPC portmapper.
 */
#define XS_BIND_TO		(60U * HZ)

/*
 * Delay if a UDP socket connect error occurs.  This is most likely some
 * kind of resource problem on the local host.
 */
#define XS_UDP_REEST_TO		(2U * HZ)

/*
 * The reestablish timeout allows clients to delay for a bit before attempting
 * to reconnect to a server that just dropped our connection.
 *
 * We implement an exponential backoff when trying to reestablish a TCP
 * transport connection with the server.  Some servers like to drop a TCP
 * connection when they are overworked, so we start with a short timeout and
 * increase over time if the server is down or not responding.
 */
#define XS_TCP_INIT_REEST_TO	(3U * HZ)
#define XS_TCP_MAX_REEST_TO	(5U * 60 * HZ)
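
/*
 * A minimal sketch of the backoff described above, for illustration
 * only: xs_connect() below performs this doubling and clamping inline,
 * so successive reconnect attempts wait 3s, 6s, 12s, ... capped at
 * five minutes.  This helper is hypothetical and is not called
 * anywhere in this file.
 */
static inline unsigned long xs_next_reestablish_timeout(unsigned long timeo)
{
	timeo <<= 1;				/* exponential backoff */
	if (timeo > XS_TCP_MAX_REEST_TO)
		timeo = XS_TCP_MAX_REEST_TO;	/* clamp at 5 minutes */
	return timeo;
}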

/*
 * TCP idle timeout; client drops the transport socket if it is idle
 * for this long.  Note that we also timeout UDP sockets to prevent
 * holding port numbers when there is no RPC traffic.
 */
#define XS_IDLE_DISC_TO		(5U * 60 * HZ)

#ifdef RPC_DEBUG
# undef  RPC_DEBUG_DATA
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif

#ifdef RPC_DEBUG_DATA
static void xs_pktdump(char *msg, u32 *packet, unsigned int count)
{
	u8 *buf = (u8 *) packet;
	int j;

	dprintk("RPC:       %s\n", msg);
	for (j = 0; j < count && j < 128; j += 4) {
		if (!(j & 31)) {
			if (j)
				dprintk("\n");
			dprintk("0x%04x ", j);
		}
		dprintk("%02x%02x%02x%02x ",
			buf[j], buf[j+1], buf[j+2], buf[j+3]);
	}
	dprintk("\n");
}
#else
static inline void xs_pktdump(char *msg, u32 *packet, unsigned int count)
{
	/* NOP */
}
#endif

struct sock_xprt {
	struct rpc_xprt		xprt;

	/*
	 * Network layer
	 */
	struct socket *		sock;
	struct sock *		inet;

	/*
	 * State of TCP reply receive
	 */
	__be32			tcp_fraghdr,
				tcp_xid;

	u32			tcp_offset,
				tcp_reclen;

	unsigned long		tcp_copied,
				tcp_flags;

	/*
	 * Connection of transports
	 */
	struct delayed_work	connect_worker;
	unsigned short		port;

	/*
	 * UDP socket buffer size parameters
	 */
	size_t			rcvsize,
				sndsize;

	/*
	 * Saved socket callback addresses
	 */
	void			(*old_data_ready)(struct sock *, int);
	void			(*old_state_change)(struct sock *);
	void			(*old_write_space)(struct sock *);
};

/*
 * TCP receive state flags
 */
#define TCP_RCV_LAST_FRAG	(1UL << 0)
#define TCP_RCV_COPY_FRAGHDR	(1UL << 1)
#define TCP_RCV_COPY_XID	(1UL << 2)
#define TCP_RCV_COPY_DATA	(1UL << 3)
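
/*
 * For illustration: each incoming record walks the flags above in
 * order (COPY_FRAGHDR, then COPY_XID, then COPY_DATA), and LAST_FRAG
 * marks the final fragment of a reply.  This hypothetical helper shows
 * the initial receive state of a fresh connection; nothing in this
 * file calls it -- xs_tcp_state_change() performs the equivalent reset
 * inline when a connection reaches TCP_ESTABLISHED.
 */
static inline void xs_tcp_recv_state_sketch(struct sock_xprt *transport)
{
	transport->tcp_offset = 0;	/* nothing read from this record yet */
	transport->tcp_reclen = 0;	/* length unknown until the fraghdr arrives */
	transport->tcp_copied = 0;	/* nothing copied into the request buffer */
	transport->tcp_flags = TCP_RCV_COPY_FRAGHDR | TCP_RCV_COPY_XID;
}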

static void xs_format_peer_addresses(struct rpc_xprt *xprt)
{
	struct sockaddr_in *addr = (struct sockaddr_in *) &xprt->addr;
	char *buf;

	buf = kzalloc(20, GFP_KERNEL);
	if (buf) {
		snprintf(buf, 20, "%u.%u.%u.%u",
				NIPQUAD(addr->sin_addr.s_addr));
	}
	xprt->address_strings[RPC_DISPLAY_ADDR] = buf;

	buf = kzalloc(8, GFP_KERNEL);
	if (buf) {
		snprintf(buf, 8, "%u",
				ntohs(addr->sin_port));
	}
	xprt->address_strings[RPC_DISPLAY_PORT] = buf;

	if (xprt->prot == IPPROTO_UDP)
		xprt->address_strings[RPC_DISPLAY_PROTO] = "udp";
	else
		xprt->address_strings[RPC_DISPLAY_PROTO] = "tcp";

	buf = kzalloc(48, GFP_KERNEL);
	if (buf) {
		snprintf(buf, 48, "addr=%u.%u.%u.%u port=%u proto=%s",
			NIPQUAD(addr->sin_addr.s_addr),
			ntohs(addr->sin_port),
			xprt->prot == IPPROTO_UDP ? "udp" : "tcp");
	}
	xprt->address_strings[RPC_DISPLAY_ALL] = buf;
}

static void xs_free_peer_addresses(struct rpc_xprt *xprt)
{
	kfree(xprt->address_strings[RPC_DISPLAY_ADDR]);
	kfree(xprt->address_strings[RPC_DISPLAY_PORT]);
	kfree(xprt->address_strings[RPC_DISPLAY_ALL]);
}

#define XS_SENDMSG_FLAGS	(MSG_DONTWAIT | MSG_NOSIGNAL)

static int xs_send_kvec(struct socket *sock, struct sockaddr *addr, int addrlen, struct kvec *vec, unsigned int base, int more)
{
	struct msghdr msg = {
		.msg_name	= addr,
		.msg_namelen	= addrlen,
		.msg_flags	= XS_SENDMSG_FLAGS | (more ? MSG_MORE : 0),
	};
	struct kvec iov = {
		.iov_base	= vec->iov_base + base,
		.iov_len	= vec->iov_len - base,
	};

	if (iov.iov_len != 0)
		return kernel_sendmsg(sock, &msg, &iov, 1, iov.iov_len);
	return kernel_sendmsg(sock, &msg, NULL, 0, 0);
}

static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned int base, int more)
{
	struct page **ppage;
	unsigned int remainder;
	int err, sent = 0;

	remainder = xdr->page_len - base;
	base += xdr->page_base;
	ppage = xdr->pages + (base >> PAGE_SHIFT);
	base &= ~PAGE_MASK;
	for(;;) {
		unsigned int len = min_t(unsigned int, PAGE_SIZE - base, remainder);
		int flags = XS_SENDMSG_FLAGS;

		remainder -= len;
		if (remainder != 0 || more)
			flags |= MSG_MORE;
		err = sock->ops->sendpage(sock, *ppage, base, len, flags);
		if (remainder == 0 || err != len)
			break;
		sent += err;
		ppage++;
		base = 0;
	}
	if (sent == 0)
		return err;
	if (err > 0)
		sent += err;
	return sent;
}

/**
 * xs_sendpages - write pages directly to a socket
 * @sock: socket to send on
 * @addr: UDP only -- address of destination
 * @addrlen: UDP only -- length of destination address
 * @xdr: buffer containing this request
 * @base: starting position in the buffer
 *
 */
static int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen, struct xdr_buf *xdr, unsigned int base)
{
	unsigned int remainder = xdr->len - base;
	int err, sent = 0;

	if (unlikely(!sock))
		return -ENOTCONN;

	clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags);
	if (base != 0) {
		addr = NULL;
		addrlen = 0;
	}

	if (base < xdr->head[0].iov_len || addr != NULL) {
		unsigned int len = xdr->head[0].iov_len - base;
		remainder -= len;
		err = xs_send_kvec(sock, addr, addrlen, &xdr->head[0], base, remainder != 0);
		if (remainder == 0 || err != len)
			goto out;
		sent += err;
		base = 0;
	} else
		base -= xdr->head[0].iov_len;

	if (base < xdr->page_len) {
		unsigned int len = xdr->page_len - base;
		remainder -= len;
		err = xs_send_pagedata(sock, xdr, base, remainder != 0);
		if (remainder == 0 || err != len)
			goto out;
		sent += err;
		base = 0;
	} else
		base -= xdr->page_len;

	if (base >= xdr->tail[0].iov_len)
		return sent;
	err = xs_send_kvec(sock, NULL, 0, &xdr->tail[0], base, 0);
out:
	if (sent == 0)
		return err;
	if (err > 0)
		sent += err;
	return sent;
}

/**
 * xs_nospace - place task on wait queue if transmit was incomplete
 * @task: task to put to sleep
 *
 */
static void xs_nospace(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);

	dprintk("RPC: %5u xmit incomplete (%u left of %u)\n",
			task->tk_pid, req->rq_slen - req->rq_bytes_sent,
			req->rq_slen);

	if (test_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags)) {
		/* Protect against races with write_space */
		spin_lock_bh(&xprt->transport_lock);

		/* Don't race with disconnect */
		if (!xprt_connected(xprt))
			task->tk_status = -ENOTCONN;
		else if (test_bit(SOCK_NOSPACE, &transport->sock->flags))
			xprt_wait_for_buffer_space(task);

		spin_unlock_bh(&xprt->transport_lock);
	} else
		/* Keep holding the socket if it is blocked */
		rpc_delay(task, HZ>>4);
}

/**
 * xs_udp_send_request - write an RPC request to a UDP socket
 * @task: address of RPC task that manages the state of an RPC request
 *
 * Return values:
 *        0:	The request has been sent
 *   EAGAIN:	The socket was blocked, please call again later to
 *		complete the request
 * ENOTCONN:	Caller needs to invoke connect logic then call again
 *    other:	Some other error occurred, the request was not sent
 */
static int xs_udp_send_request(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct xdr_buf *xdr = &req->rq_snd_buf;
	int status;

	xs_pktdump("packet data:",
				req->rq_svec->iov_base,
				req->rq_svec->iov_len);

	req->rq_xtime = jiffies;
	status = xs_sendpages(transport->sock,
			      (struct sockaddr *) &xprt->addr,
			      xprt->addrlen, xdr,
			      req->rq_bytes_sent);

	dprintk("RPC:       xs_udp_send_request(%u) = %d\n",
			xdr->len - req->rq_bytes_sent, status);

	if (likely(status >= (int) req->rq_slen))
		return 0;

	/* Still some bytes left; set up for a retry later. */
	if (status > 0)
		status = -EAGAIN;

	switch (status) {
	case -ENETUNREACH:
	case -EPIPE:
	case -ECONNREFUSED:
		/* When the server has died, an ICMP port unreachable message
		 * prompts ECONNREFUSED. */
		break;
	case -EAGAIN:
		xs_nospace(task);
		break;
	default:
		dprintk("RPC:       sendmsg returned unrecognized error %d\n",
			-status);
		break;
	}

	return status;
}

static inline void xs_encode_tcp_record_marker(struct xdr_buf *buf)
{
	u32 reclen = buf->len - sizeof(rpc_fraghdr);
	rpc_fraghdr *base = buf->head[0].iov_base;
	*base = htonl(RPC_LAST_STREAM_FRAGMENT | reclen);
}
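
/*
 * For illustration only: the record marker written above packs a "last
 * fragment" flag into the top bit and the fragment length into the low
 * 31 bits.  These hypothetical helpers show how a marker decodes; the
 * real receive path does the equivalent work in xs_tcp_read_fraghdr()
 * below, and nothing in this file calls them.
 */
static inline int xs_record_marker_is_last(rpc_fraghdr marker)
{
	return (ntohl(marker) & RPC_LAST_STREAM_FRAGMENT) != 0;
}

static inline u32 xs_record_marker_length(rpc_fraghdr marker)
{
	return ntohl(marker) & RPC_FRAGMENT_SIZE_MASK;
}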

static int xs_tcp_send_request(struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;
	struct rpc_xprt *xprt = req->rq_xprt;
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct xdr_buf *xdr = &req->rq_snd_buf;
	int status, retry = 0;

	xs_encode_tcp_record_marker(&req->rq_snd_buf);

	xs_pktdump("packet data:",
				req->rq_svec->iov_base,
				req->rq_svec->iov_len);

	/* Continue transmitting the packet/record. We must be careful
	 * to cope with writespace callbacks arriving _after_ we have
	 * called sendmsg(). */
	while (1) {
		req->rq_xtime = jiffies;
		status = xs_sendpages(transport->sock,
					NULL, 0, xdr, req->rq_bytes_sent);

		dprintk("RPC:       xs_tcp_send_request(%u) = %d\n",
				xdr->len - req->rq_bytes_sent, status);

		if (unlikely(status < 0))
			break;

		/* If we've sent the entire packet, immediately
		 * reset the count of bytes sent. */
		req->rq_bytes_sent += status;
		task->tk_bytes_sent += status;
		if (likely(req->rq_bytes_sent >= req->rq_slen)) {
			req->rq_bytes_sent = 0;
			return 0;
		}

		status = -EAGAIN;
		if (retry++ > XS_SENDMSG_RETRY)
			break;
	}

	switch (status) {
	case -EAGAIN:
		xs_nospace(task);
		break;
	case -ECONNREFUSED:
	case -ECONNRESET:
	case -ENOTCONN:
	case -EPIPE:
		status = -ENOTCONN;
		break;
	default:
		dprintk("RPC:       sendmsg returned unrecognized error %d\n",
			-status);
		xprt_disconnect(xprt);
		break;
	}

	return status;
}

/**
 * xs_tcp_release_xprt - clean up after a tcp transmission
 * @xprt: transport
 * @task: rpc task
 *
 * This cleans up if an error causes us to abort the transmission of a request.
 * In this case, the socket may need to be reset in order to avoid confusing
 * the server.
 */
static void xs_tcp_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
{
	struct rpc_rqst *req;

	if (task != xprt->snd_task)
		return;
	if (task == NULL)
		goto out_release;
	req = task->tk_rqstp;
	if (req->rq_bytes_sent == 0)
		goto out_release;
	if (req->rq_bytes_sent == req->rq_snd_buf.len)
		goto out_release;
	set_bit(XPRT_CLOSE_WAIT, &task->tk_xprt->state);
out_release:
	xprt_release_xprt(xprt, task);
}

/**
 * xs_close - close a socket
 * @xprt: transport
 *
 * This is used when all requests are complete; i.e., no DRC state remains
 * on the server we want to save.
 */
static void xs_close(struct rpc_xprt *xprt)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct socket *sock = transport->sock;
	struct sock *sk = transport->inet;

	if (!sk)
		goto clear_close_wait;

	dprintk("RPC:       xs_close xprt %p\n", xprt);

	write_lock_bh(&sk->sk_callback_lock);
	transport->inet = NULL;
	transport->sock = NULL;

	sk->sk_user_data = NULL;
	sk->sk_data_ready = transport->old_data_ready;
	sk->sk_state_change = transport->old_state_change;
	sk->sk_write_space = transport->old_write_space;
	write_unlock_bh(&sk->sk_callback_lock);

	sk->sk_no_check = 0;

	sock_release(sock);
clear_close_wait:
	smp_mb__before_clear_bit();
	clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
	smp_mb__after_clear_bit();
}

/**
 * xs_destroy - prepare to shut down a transport
 * @xprt: doomed transport
 *
 */
static void xs_destroy(struct rpc_xprt *xprt)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);

	dprintk("RPC:       xs_destroy xprt %p\n", xprt);

	cancel_delayed_work(&transport->connect_worker);
	flush_scheduled_work();

	xprt_disconnect(xprt);
	xs_close(xprt);
	xs_free_peer_addresses(xprt);
	kfree(xprt->slot);
	kfree(xprt);
}

static inline struct rpc_xprt *xprt_from_sock(struct sock *sk)
{
	return (struct rpc_xprt *) sk->sk_user_data;
}

/**
 * xs_udp_data_ready - "data ready" callback for UDP sockets
 * @sk: socket with data to read
 * @len: how much data to read
 *
 */
static void xs_udp_data_ready(struct sock *sk, int len)
{
	struct rpc_task *task;
	struct rpc_xprt *xprt;
	struct rpc_rqst *rovr;
	struct sk_buff *skb;
	int err, repsize, copied;
	u32 _xid;
	__be32 *xp;

	read_lock(&sk->sk_callback_lock);
	dprintk("RPC:       xs_udp_data_ready...\n");
	if (!(xprt = xprt_from_sock(sk)))
		goto out;

	if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL)
		goto out;

	if (xprt->shutdown)
		goto dropit;

	repsize = skb->len - sizeof(struct udphdr);
	if (repsize < 4) {
		dprintk("RPC:       impossible RPC reply size %d!\n", repsize);
		goto dropit;
	}

	/* Copy the XID from the skb... */
	xp = skb_header_pointer(skb, sizeof(struct udphdr),
				sizeof(_xid), &_xid);
	if (xp == NULL)
		goto dropit;

	/* Look up and lock the request corresponding to the given XID */
	spin_lock(&xprt->transport_lock);
	rovr = xprt_lookup_rqst(xprt, *xp);
	if (!rovr)
		goto out_unlock;
	task = rovr->rq_task;

	if ((copied = rovr->rq_private_buf.buflen) > repsize)
		copied = repsize;

	/* Suck it into the iovec, verify checksum if not done by hw. */
	if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb))
		goto out_unlock;

	/* Something worked... */
	dst_confirm(skb->dst);

	xprt_adjust_cwnd(task, copied);
	xprt_update_rtt(task);
	xprt_complete_rqst(task, copied);

 out_unlock:
	spin_unlock(&xprt->transport_lock);
 dropit:
	skb_free_datagram(sk, skb);
 out:
	read_unlock(&sk->sk_callback_lock);
}

static inline void xs_tcp_read_fraghdr(struct rpc_xprt *xprt, struct xdr_skb_reader *desc)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	size_t len, used;
	char *p;

	p = ((char *) &transport->tcp_fraghdr) + transport->tcp_offset;
	len = sizeof(transport->tcp_fraghdr) - transport->tcp_offset;
	used = xdr_skb_read_bits(desc, p, len);
	transport->tcp_offset += used;
	if (used != len)
		return;

	transport->tcp_reclen = ntohl(transport->tcp_fraghdr);
	if (transport->tcp_reclen & RPC_LAST_STREAM_FRAGMENT)
		transport->tcp_flags |= TCP_RCV_LAST_FRAG;
	else
		transport->tcp_flags &= ~TCP_RCV_LAST_FRAG;
	transport->tcp_reclen &= RPC_FRAGMENT_SIZE_MASK;

	transport->tcp_flags &= ~TCP_RCV_COPY_FRAGHDR;
	transport->tcp_offset = 0;

	/* Sanity check of the record length */
	if (unlikely(transport->tcp_reclen < 4)) {
		dprintk("RPC:       invalid TCP record fragment length\n");
		xprt_disconnect(xprt);
		return;
	}
	dprintk("RPC:       reading TCP record fragment of length %d\n",
			transport->tcp_reclen);
}

static void xs_tcp_check_fraghdr(struct sock_xprt *transport)
{
	if (transport->tcp_offset == transport->tcp_reclen) {
		transport->tcp_flags |= TCP_RCV_COPY_FRAGHDR;
		transport->tcp_offset = 0;
		if (transport->tcp_flags & TCP_RCV_LAST_FRAG) {
			transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
			transport->tcp_flags |= TCP_RCV_COPY_XID;
			transport->tcp_copied = 0;
		}
	}
}

static inline void xs_tcp_read_xid(struct sock_xprt *transport, struct xdr_skb_reader *desc)
{
	size_t len, used;
	char *p;

	len = sizeof(transport->tcp_xid) - transport->tcp_offset;
	dprintk("RPC:       reading XID (%Zu bytes)\n", len);
	p = ((char *) &transport->tcp_xid) + transport->tcp_offset;
	used = xdr_skb_read_bits(desc, p, len);
	transport->tcp_offset += used;
	if (used != len)
		return;
	transport->tcp_flags &= ~TCP_RCV_COPY_XID;
	transport->tcp_flags |= TCP_RCV_COPY_DATA;
	transport->tcp_copied = 4;
	dprintk("RPC:       reading reply for XID %08x\n",
			ntohl(transport->tcp_xid));
	xs_tcp_check_fraghdr(transport);
}

static inline void xs_tcp_read_request(struct rpc_xprt *xprt, struct xdr_skb_reader *desc)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct rpc_rqst *req;
	struct xdr_buf *rcvbuf;
	size_t len;
	ssize_t r;

	/* Find and lock the request corresponding to this xid */
	spin_lock(&xprt->transport_lock);
	req = xprt_lookup_rqst(xprt, transport->tcp_xid);
	if (!req) {
		transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
		dprintk("RPC:       XID %08x request not found!\n",
				ntohl(transport->tcp_xid));
		spin_unlock(&xprt->transport_lock);
		return;
	}

	rcvbuf = &req->rq_private_buf;
	len = desc->count;
	if (len > transport->tcp_reclen - transport->tcp_offset) {
		struct xdr_skb_reader my_desc;

		len = transport->tcp_reclen - transport->tcp_offset;
		memcpy(&my_desc, desc, sizeof(my_desc));
		my_desc.count = len;
		r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied,
					  &my_desc, xdr_skb_read_bits);
		desc->count -= r;
		desc->offset += r;
	} else
		r = xdr_partial_copy_from_skb(rcvbuf, transport->tcp_copied,
					  desc, xdr_skb_read_bits);

	if (r > 0) {
		transport->tcp_copied += r;
		transport->tcp_offset += r;
	}
	if (r != len) {
		/* Error when copying to the receive buffer,
		 * usually because we weren't able to allocate
		 * additional buffer pages. All we can do now
		 * is turn off TCP_RCV_COPY_DATA, so the request
		 * will not receive any additional updates,
		 * and time out.
		 * Any remaining data from this record will
		 * be discarded.
		 */
		transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
		dprintk("RPC:       XID %08x truncated request\n",
				ntohl(transport->tcp_xid));
		dprintk("RPC:       xprt = %p, tcp_copied = %lu, "
				"tcp_offset = %u, tcp_reclen = %u\n",
				xprt, transport->tcp_copied,
				transport->tcp_offset, transport->tcp_reclen);
		goto out;
	}

	dprintk("RPC:       XID %08x read %Zd bytes\n",
			ntohl(transport->tcp_xid), r);
	dprintk("RPC:       xprt = %p, tcp_copied = %lu, tcp_offset = %u, "
			"tcp_reclen = %u\n", xprt, transport->tcp_copied,
			transport->tcp_offset, transport->tcp_reclen);

	if (transport->tcp_copied == req->rq_private_buf.buflen)
		transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
	else if (transport->tcp_offset == transport->tcp_reclen) {
		if (transport->tcp_flags & TCP_RCV_LAST_FRAG)
			transport->tcp_flags &= ~TCP_RCV_COPY_DATA;
	}

out:
	if (!(transport->tcp_flags & TCP_RCV_COPY_DATA))
		xprt_complete_rqst(req->rq_task, transport->tcp_copied);
	spin_unlock(&xprt->transport_lock);
	xs_tcp_check_fraghdr(transport);
}

static inline void xs_tcp_read_discard(struct sock_xprt *transport, struct xdr_skb_reader *desc)
{
	size_t len;

	len = transport->tcp_reclen - transport->tcp_offset;
	if (len > desc->count)
		len = desc->count;
	desc->count -= len;
	desc->offset += len;
	transport->tcp_offset += len;
	dprintk("RPC:       discarded %Zu bytes\n", len);
	xs_tcp_check_fraghdr(transport);
}

static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, unsigned int offset, size_t len)
{
	struct rpc_xprt *xprt = rd_desc->arg.data;
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct xdr_skb_reader desc = {
		.skb	= skb,
		.offset	= offset,
		.count	= len,
	};

	dprintk("RPC:       xs_tcp_data_recv started\n");
	do {
		/* Read in a new fragment marker if necessary */
		/* Can we ever really expect to get completely empty fragments? */
		if (transport->tcp_flags & TCP_RCV_COPY_FRAGHDR) {
			xs_tcp_read_fraghdr(xprt, &desc);
			continue;
		}
		/* Read in the xid if necessary */
		if (transport->tcp_flags & TCP_RCV_COPY_XID) {
			xs_tcp_read_xid(transport, &desc);
			continue;
		}
		/* Read in the request data */
		if (transport->tcp_flags & TCP_RCV_COPY_DATA) {
			xs_tcp_read_request(xprt, &desc);
			continue;
		}
		/* Skip over any trailing bytes on short reads */
		xs_tcp_read_discard(transport, &desc);
	} while (desc.count);
	dprintk("RPC:       xs_tcp_data_recv done\n");
	return len - desc.count;
}

/**
 * xs_tcp_data_ready - "data ready" callback for TCP sockets
 * @sk: socket with data to read
 * @bytes: how much data to read
 *
 */
static void xs_tcp_data_ready(struct sock *sk, int bytes)
{
	struct rpc_xprt *xprt;
	read_descriptor_t rd_desc;

	dprintk("RPC:       xs_tcp_data_ready...\n");

	read_lock(&sk->sk_callback_lock);
	if (!(xprt = xprt_from_sock(sk)))
		goto out;
	if (xprt->shutdown)
		goto out;

	/* We use rd_desc to pass struct xprt to xs_tcp_data_recv */
	rd_desc.arg.data = xprt;
	rd_desc.count = 65536;
	tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv);
out:
	read_unlock(&sk->sk_callback_lock);
}

/**
 * xs_tcp_state_change - callback to handle TCP socket state changes
 * @sk: socket whose state has changed
 *
 */
static void xs_tcp_state_change(struct sock *sk)
{
	struct rpc_xprt *xprt;

	read_lock(&sk->sk_callback_lock);
	if (!(xprt = xprt_from_sock(sk)))
		goto out;
	dprintk("RPC:       xs_tcp_state_change client %p...\n", xprt);
	dprintk("RPC:       state %x conn %d dead %d zapped %d\n",
			sk->sk_state, xprt_connected(xprt),
			sock_flag(sk, SOCK_DEAD),
			sock_flag(sk, SOCK_ZAPPED));

	switch (sk->sk_state) {
	case TCP_ESTABLISHED:
		spin_lock_bh(&xprt->transport_lock);
		if (!xprt_test_and_set_connected(xprt)) {
			struct sock_xprt *transport = container_of(xprt,
					struct sock_xprt, xprt);

			/* Reset TCP record info */
			transport->tcp_offset = 0;
			transport->tcp_reclen = 0;
			transport->tcp_copied = 0;
			transport->tcp_flags =
				TCP_RCV_COPY_FRAGHDR | TCP_RCV_COPY_XID;

			xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
			xprt_wake_pending_tasks(xprt, 0);
		}
		spin_unlock_bh(&xprt->transport_lock);
		break;
	case TCP_SYN_SENT:
	case TCP_SYN_RECV:
		break;
	case TCP_CLOSE_WAIT:
		/* Try to schedule an autoclose RPC call */
		set_bit(XPRT_CLOSE_WAIT, &xprt->state);
		if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
			schedule_work(&xprt->task_cleanup);
		/* fall through */
	default:
		xprt_disconnect(xprt);
	}
 out:
	read_unlock(&sk->sk_callback_lock);
}

/**
 * xs_udp_write_space - callback invoked when socket buffer space
 *                             becomes available
 * @sk: socket whose state has changed
 *
 * Called when more output buffer space is available for this socket.
 * We try not to wake our writers until they can make "significant"
 * progress, otherwise we'll waste resources thrashing kernel_sendmsg
 * with a bunch of small requests.
 */
static void xs_udp_write_space(struct sock *sk)
{
	read_lock(&sk->sk_callback_lock);

	/* from net/core/sock.c:sock_def_write_space */
	if (sock_writeable(sk)) {
		struct socket *sock;
		struct rpc_xprt *xprt;

		if (unlikely(!(sock = sk->sk_socket)))
			goto out;
		if (unlikely(!(xprt = xprt_from_sock(sk))))
			goto out;
		if (unlikely(!test_and_clear_bit(SOCK_NOSPACE, &sock->flags)))
			goto out;

		xprt_write_space(xprt);
	}

 out:
	read_unlock(&sk->sk_callback_lock);
}

/**
 * xs_tcp_write_space - callback invoked when socket buffer space
 *                             becomes available
 * @sk: socket whose state has changed
 *
 * Called when more output buffer space is available for this socket.
 * We try not to wake our writers until they can make "significant"
 * progress, otherwise we'll waste resources thrashing kernel_sendmsg
 * with a bunch of small requests.
 */
static void xs_tcp_write_space(struct sock *sk)
{
	read_lock(&sk->sk_callback_lock);

	/* from net/core/stream.c:sk_stream_write_space */
	if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) {
		struct socket *sock;
		struct rpc_xprt *xprt;

		if (unlikely(!(sock = sk->sk_socket)))
			goto out;
		if (unlikely(!(xprt = xprt_from_sock(sk))))
			goto out;
		if (unlikely(!test_and_clear_bit(SOCK_NOSPACE, &sock->flags)))
			goto out;

		xprt_write_space(xprt);
	}

 out:
	read_unlock(&sk->sk_callback_lock);
}

static void xs_udp_do_set_buffer_size(struct rpc_xprt *xprt)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct sock *sk = transport->inet;

	if (transport->rcvsize) {
		sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
		sk->sk_rcvbuf = transport->rcvsize * xprt->max_reqs * 2;
	}
	if (transport->sndsize) {
		sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
		sk->sk_sndbuf = transport->sndsize * xprt->max_reqs * 2;
		sk->sk_write_space(sk);
	}
}

/**
 * xs_udp_set_buffer_size - set send and receive limits
 * @xprt: generic transport
 * @sndsize: requested size of send buffer, in bytes
 * @rcvsize: requested size of receive buffer, in bytes
 *
 * Set socket send and receive buffer size limits.
 */
static void xs_udp_set_buffer_size(struct rpc_xprt *xprt, size_t sndsize, size_t rcvsize)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);

	transport->sndsize = 0;
	if (sndsize)
		transport->sndsize = sndsize + 1024;
	transport->rcvsize = 0;
	if (rcvsize)
		transport->rcvsize = rcvsize + 1024;

	xs_udp_do_set_buffer_size(xprt);
}

/**
 * xs_udp_timer - called when a retransmit timeout occurs on a UDP transport
 * @task: task that timed out
 *
 * Adjust the congestion window after a retransmit timeout has occurred.
 */
static void xs_udp_timer(struct rpc_task *task)
{
	xprt_adjust_cwnd(task, -ETIMEDOUT);
}

static unsigned short xs_get_random_port(void)
{
	unsigned short range = xprt_max_resvport - xprt_min_resvport;
	unsigned short rand = (unsigned short) net_random() % range;
	return rand + xprt_min_resvport;
}
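
/*
 * Worked example, assuming the default tunables (RPC_DEF_MIN_RESVPORT
 * and RPC_DEF_MAX_RESVPORT, i.e. 665 and 1023): range = 1023 - 665 =
 * 358, so rand lies in [0, 357] and the returned port in [665, 1022].
 * Note that xprt_max_resvport itself is never chosen, because the
 * modulus excludes the upper bound.
 */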

/**
 * xs_set_port - reset the port number in the remote endpoint address
 * @xprt: generic transport
 * @port: new port number
 *
 */
static void xs_set_port(struct rpc_xprt *xprt, unsigned short port)
{
	struct sockaddr_in *sap = (struct sockaddr_in *) &xprt->addr;

	dprintk("RPC:       setting port for xprt %p to %u\n", xprt, port);

	sap->sin_port = htons(port);
}

static int xs_bindresvport(struct sock_xprt *transport, struct socket *sock)
{
	struct sockaddr_in myaddr = {
		.sin_family = AF_INET,
	};
	int err;
	unsigned short port = transport->port;

	do {
		myaddr.sin_port = htons(port);
		err = kernel_bind(sock, (struct sockaddr *) &myaddr,
						sizeof(myaddr));
		if (err == 0) {
			transport->port = port;
			dprintk("RPC:       xs_bindresvport bound to port %u\n",
					port);
			return 0;
		}
		if (port <= xprt_min_resvport)
			port = xprt_max_resvport;
		else
			port--;
	} while (err == -EADDRINUSE && port != transport->port);

	dprintk("RPC:       can't bind to reserved port (%d).\n", -err);
	return err;
}

#ifdef CONFIG_DEBUG_LOCK_ALLOC
static struct lock_class_key xs_key[2];
static struct lock_class_key xs_slock_key[2];

static inline void xs_reclassify_socket(struct socket *sock)
{
	struct sock *sk = sock->sk;
	BUG_ON(sk->sk_lock.owner != NULL);
	switch (sk->sk_family) {
	case AF_INET:
		sock_lock_init_class_and_name(sk, "slock-AF_INET-NFS",
			&xs_slock_key[0], "sk_lock-AF_INET-NFS", &xs_key[0]);
		break;

	case AF_INET6:
		sock_lock_init_class_and_name(sk, "slock-AF_INET6-NFS",
			&xs_slock_key[1], "sk_lock-AF_INET6-NFS", &xs_key[1]);
		break;

	default:
		BUG();
	}
}
#else
static inline void xs_reclassify_socket(struct socket *sock)
{
}
#endif

/**
 * xs_udp_connect_worker - set up a UDP socket
 * @work: RPC transport to connect
 *
 * Invoked from a work queue.
 */
static void xs_udp_connect_worker(struct work_struct *work)
{
	struct sock_xprt *transport =
		container_of(work, struct sock_xprt, connect_worker.work);
	struct rpc_xprt *xprt = &transport->xprt;
	struct socket *sock = transport->sock;
	int err, status = -EIO;

	if (xprt->shutdown || !xprt_bound(xprt))
		goto out;

	/* Start by resetting any existing state */
	xs_close(xprt);

	if ((err = sock_create_kern(PF_INET, SOCK_DGRAM, IPPROTO_UDP, &sock)) < 0) {
		dprintk("RPC:       can't create UDP transport socket (%d).\n", -err);
		goto out;
	}
	xs_reclassify_socket(sock);

	if (xprt->resvport && xs_bindresvport(transport, sock) < 0) {
		sock_release(sock);
		goto out;
	}

	dprintk("RPC:       worker connecting xprt %p to address: %s\n",
			xprt, xprt->address_strings[RPC_DISPLAY_ALL]);

	if (!transport->inet) {
		struct sock *sk = sock->sk;

		write_lock_bh(&sk->sk_callback_lock);

		sk->sk_user_data = xprt;
		transport->old_data_ready = sk->sk_data_ready;
		transport->old_state_change = sk->sk_state_change;
		transport->old_write_space = sk->sk_write_space;
		sk->sk_data_ready = xs_udp_data_ready;
		sk->sk_write_space = xs_udp_write_space;
		sk->sk_no_check = UDP_CSUM_NORCV;
		sk->sk_allocation = GFP_ATOMIC;

		xprt_set_connected(xprt);

		/* Reset to new socket */
		transport->sock = sock;
		transport->inet = sk;

		write_unlock_bh(&sk->sk_callback_lock);
	}
	xs_udp_do_set_buffer_size(xprt);
	status = 0;
out:
	xprt_wake_pending_tasks(xprt, status);
	xprt_clear_connecting(xprt);
}

/*
 * We need to preserve the port number so the reply cache on the server can
 * find our cached RPC replies when we get around to reconnecting.
 */
static void xs_tcp_reuse_connection(struct rpc_xprt *xprt)
{
	int result;
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	struct sockaddr any;

	dprintk("RPC:       disconnecting xprt %p to reuse port\n", xprt);

	/*
	 * Disconnect the transport socket by doing a connect operation
	 * with AF_UNSPEC.  This should return immediately...
	 */
	memset(&any, 0, sizeof(any));
	any.sa_family = AF_UNSPEC;
	result = kernel_connect(transport->sock, &any, sizeof(any), 0);
	if (result)
		dprintk("RPC:       AF_UNSPEC connect return code %d\n",
				result);
}

/**
 * xs_tcp_connect_worker - connect a TCP socket to a remote endpoint
 * @work: RPC transport to connect
 *
 * Invoked from a work queue.
 */
static void xs_tcp_connect_worker(struct work_struct *work)
{
	struct sock_xprt *transport =
		container_of(work, struct sock_xprt, connect_worker.work);
	struct rpc_xprt *xprt = &transport->xprt;
	struct socket *sock = transport->sock;
	int err, status = -EIO;

	if (xprt->shutdown || !xprt_bound(xprt))
		goto out;

	if (!sock) {
		/* start from scratch */
		if ((err = sock_create_kern(PF_INET, SOCK_STREAM, IPPROTO_TCP, &sock)) < 0) {
			dprintk("RPC:       can't create TCP transport "
					"socket (%d).\n", -err);
			goto out;
		}
		xs_reclassify_socket(sock);

		if (xprt->resvport && xs_bindresvport(transport, sock) < 0) {
			sock_release(sock);
			goto out;
		}
	} else
		/* "close" the socket, preserving the local port */
		xs_tcp_reuse_connection(xprt);

	dprintk("RPC:       worker connecting xprt %p to address: %s\n",
			xprt, xprt->address_strings[RPC_DISPLAY_ALL]);

	if (!transport->inet) {
		struct sock *sk = sock->sk;

		write_lock_bh(&sk->sk_callback_lock);

		sk->sk_user_data = xprt;
		transport->old_data_ready = sk->sk_data_ready;
		transport->old_state_change = sk->sk_state_change;
		transport->old_write_space = sk->sk_write_space;
		sk->sk_data_ready = xs_tcp_data_ready;
		sk->sk_state_change = xs_tcp_state_change;
		sk->sk_write_space = xs_tcp_write_space;
		sk->sk_allocation = GFP_ATOMIC;

		/* socket options */
		sk->sk_userlocks |= SOCK_BINDPORT_LOCK;
		sock_reset_flag(sk, SOCK_LINGER);
		tcp_sk(sk)->linger2 = 0;
		tcp_sk(sk)->nonagle |= TCP_NAGLE_OFF;

		xprt_clear_connected(xprt);

		/* Reset to new socket */
		transport->sock = sock;
		transport->inet = sk;

		write_unlock_bh(&sk->sk_callback_lock);
	}

	/* Tell the socket layer to start connecting... */
	xprt->stat.connect_count++;
	xprt->stat.connect_start = jiffies;
	status = kernel_connect(sock, (struct sockaddr *) &xprt->addr,
			xprt->addrlen, O_NONBLOCK);
	dprintk("RPC:       %p connect status %d connected %d sock state %d\n",
			xprt, -status, xprt_connected(xprt),
			sock->sk->sk_state);
	if (status < 0) {
		switch (status) {
			case -EINPROGRESS:
			case -EALREADY:
				goto out_clear;
			case -ECONNREFUSED:
			case -ECONNRESET:
				/* retry with existing socket, after a delay */
				break;
			default:
				/* get rid of existing socket, and retry */
				xs_close(xprt);
				break;
		}
	}
out:
	xprt_wake_pending_tasks(xprt, status);
out_clear:
	xprt_clear_connecting(xprt);
}

/**
 * xs_connect - connect a socket to a remote endpoint
 * @task: address of RPC task that manages state of connect request
 *
 * TCP: If the remote end dropped the connection, delay reconnecting.
 *
 * UDP socket connects are synchronous, but we use a work queue anyway
 * to guarantee that even unprivileged user processes can set up a
 * socket on a privileged port.
 *
 * If a UDP socket connect fails, the delay behavior here prevents
 * retry floods (hard mounts).
 */
static void xs_connect(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_xprt;
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);

	if (xprt_test_and_set_connecting(xprt))
		return;

	if (transport->sock != NULL) {
		dprintk("RPC:       xs_connect delayed xprt %p for %lu "
				"seconds\n",
				xprt, xprt->reestablish_timeout / HZ);
		schedule_delayed_work(&transport->connect_worker,
					xprt->reestablish_timeout);
		xprt->reestablish_timeout <<= 1;
		if (xprt->reestablish_timeout > XS_TCP_MAX_REEST_TO)
			xprt->reestablish_timeout = XS_TCP_MAX_REEST_TO;
	} else {
		dprintk("RPC:       xs_connect scheduled xprt %p\n", xprt);
		schedule_delayed_work(&transport->connect_worker, 0);

		/* flush_scheduled_work can sleep... */
		if (!RPC_IS_ASYNC(task))
			flush_scheduled_work();
	}
}

/**
 * xs_udp_print_stats - display UDP socket-specific stats
 * @xprt: rpc_xprt struct containing statistics
 * @seq: output file
 *
 */
static void xs_udp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);

	seq_printf(seq, "\txprt:\tudp %u %lu %lu %lu %lu %Lu %Lu\n",
			transport->port,
			xprt->stat.bind_count,
			xprt->stat.sends,
			xprt->stat.recvs,
			xprt->stat.bad_xids,
			xprt->stat.req_u,
			xprt->stat.bklog_u);
}

/**
 * xs_tcp_print_stats - display TCP socket-specific stats
 * @xprt: rpc_xprt struct containing statistics
 * @seq: output file
 *
 */
static void xs_tcp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
{
	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
	long idle_time = 0;

	if (xprt_connected(xprt))
		idle_time = (long)(jiffies - xprt->last_used) / HZ;

	seq_printf(seq, "\txprt:\ttcp %u %lu %lu %lu %ld %lu %lu %lu %Lu %Lu\n",
			transport->port,
			xprt->stat.bind_count,
			xprt->stat.connect_count,
			xprt->stat.connect_time,
			idle_time,
			xprt->stat.sends,
			xprt->stat.recvs,
			xprt->stat.bad_xids,
			xprt->stat.req_u,
			xprt->stat.bklog_u);
}
static struct rpc_xprt_ops xs_udp_ops = {
	.set_buffer_size	= xs_udp_set_buffer_size,
	.reserve_xprt		= xprt_reserve_xprt_cong,
	.release_xprt		= xprt_release_xprt_cong,
	.rpcbind		= rpcb_getport,
	.set_port		= xs_set_port,
	.connect		= xs_connect,
	.buf_alloc		= rpc_malloc,
	.buf_free		= rpc_free,
	.send_request		= xs_udp_send_request,
	.set_retrans_timeout	= xprt_set_retrans_timeout_rtt,
	.timer			= xs_udp_timer,
	.release_request	= xprt_release_rqst_cong,
	.close			= xs_close,
	.destroy		= xs_destroy,
	.print_stats		= xs_udp_print_stats,
};

static struct rpc_xprt_ops xs_tcp_ops = {
	.reserve_xprt		= xprt_reserve_xprt,
	.release_xprt		= xs_tcp_release_xprt,
	.rpcbind		= rpcb_getport,
	.set_port		= xs_set_port,
	.connect		= xs_connect,
	.buf_alloc		= rpc_malloc,
	.buf_free		= rpc_free,
	.send_request		= xs_tcp_send_request,
	.set_retrans_timeout	= xprt_set_retrans_timeout_def,
	.close			= xs_close,
	.destroy		= xs_destroy,
	.print_stats		= xs_tcp_print_stats,
};

static struct rpc_xprt *xs_setup_xprt(struct sockaddr *addr, size_t addrlen, unsigned int slot_table_size)
{
	struct rpc_xprt *xprt;
	struct sock_xprt *new;

	if (addrlen > sizeof(xprt->addr)) {
		dprintk("RPC:       xs_setup_xprt: address too large\n");
		return ERR_PTR(-EBADF);
	}

	new = kzalloc(sizeof(*new), GFP_KERNEL);
	if (new == NULL) {
		dprintk("RPC:       xs_setup_xprt: couldn't allocate "
				"rpc_xprt\n");
		return ERR_PTR(-ENOMEM);
	}
	xprt = &new->xprt;

	xprt->max_reqs = slot_table_size;
	xprt->slot = kcalloc(xprt->max_reqs, sizeof(struct rpc_rqst), GFP_KERNEL);
	if (xprt->slot == NULL) {
		kfree(xprt);
		dprintk("RPC:       xs_setup_xprt: couldn't allocate slot "
				"table\n");
		return ERR_PTR(-ENOMEM);
	}

	memcpy(&xprt->addr, addr, addrlen);
	xprt->addrlen = addrlen;
	new->port = xs_get_random_port();

	return xprt;
}

/**
 * xs_setup_udp - Set up transport to use a UDP socket
 * @addr: address of remote server
 * @addrlen: length of address in bytes
 * @to:   timeout parameters
 *
 */
struct rpc_xprt *xs_setup_udp(struct sockaddr *addr, size_t addrlen, struct rpc_timeout *to)
{
	struct rpc_xprt *xprt;
	struct sock_xprt *transport;

	xprt = xs_setup_xprt(addr, addrlen, xprt_udp_slot_table_entries);
	if (IS_ERR(xprt))
		return xprt;
	transport = container_of(xprt, struct sock_xprt, xprt);

	if (ntohs(((struct sockaddr_in *)addr)->sin_port) != 0)
		xprt_set_bound(xprt);

	xprt->prot = IPPROTO_UDP;
	xprt->tsh_size = 0;
	xprt->max_payload = (1U << 16) - (MAX_HEADER << 3);

	INIT_DELAYED_WORK(&transport->connect_worker, xs_udp_connect_worker);
	xprt->bind_timeout = XS_BIND_TO;
	xprt->connect_timeout = XS_UDP_CONN_TO;
	xprt->reestablish_timeout = XS_UDP_REEST_TO;
	xprt->idle_timeout = XS_IDLE_DISC_TO;

	xprt->ops = &xs_udp_ops;

	if (to)
		xprt->timeout = *to;
	else
		xprt_set_timeout(&xprt->timeout, 5, 5 * HZ);

	xs_format_peer_addresses(xprt);
	dprintk("RPC:       set up transport to address %s\n",
			xprt->address_strings[RPC_DISPLAY_ALL]);

	return xprt;
}

/**
 * xs_setup_tcp - Set up transport to use a TCP socket
 * @addr: address of remote server
 * @addrlen: length of address in bytes
 * @to: timeout parameters
 *
 */
struct rpc_xprt *xs_setup_tcp(struct sockaddr *addr, size_t addrlen, struct rpc_timeout *to)
{
	struct rpc_xprt *xprt;
	struct sock_xprt *transport;

	xprt = xs_setup_xprt(addr, addrlen, xprt_tcp_slot_table_entries);
	if (IS_ERR(xprt))
		return xprt;
	transport = container_of(xprt, struct sock_xprt, xprt);

	if (ntohs(((struct sockaddr_in *)addr)->sin_port) != 0)
		xprt_set_bound(xprt);

	xprt->prot = IPPROTO_TCP;
	xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32);
	xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;

	INIT_DELAYED_WORK(&transport->connect_worker, xs_tcp_connect_worker);
	xprt->bind_timeout = XS_BIND_TO;
	xprt->connect_timeout = XS_TCP_CONN_TO;
	xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO;
	xprt->idle_timeout = XS_IDLE_DISC_TO;

	xprt->ops = &xs_tcp_ops;

	if (to)
		xprt->timeout = *to;
	else
		xprt_set_timeout(&xprt->timeout, 2, 60 * HZ);

	xs_format_peer_addresses(xprt);
	dprintk("RPC:       set up transport to address %s\n",
			xprt->address_strings[RPC_DISPLAY_ALL]);

	return xprt;
}

/**
 * init_socket_xprt - set up xprtsock's sysctls
 *
 */
int init_socket_xprt(void)
{
#ifdef RPC_DEBUG
	if (!sunrpc_table_header)
		sunrpc_table_header = register_sysctl_table(sunrpc_table);
#endif

	return 0;
}

/**
 * cleanup_socket_xprt - remove xprtsock's sysctls
 *
 */
void cleanup_socket_xprt(void)
{
#ifdef RPC_DEBUG
	if (sunrpc_table_header) {
		unregister_sysctl_table(sunrpc_table_header);
		sunrpc_table_header = NULL;
	}
#endif
}