sys_socket.c revision 331722
/*-
 * Copyright (c) 1982, 1986, 1990, 1993
 *	The Regents of the University of California.  All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 4. Neither the name of the University nor the names of its contributors
 *    may be used to endorse or promote products derived from this software
 *    without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 *	@(#)sys_socket.c	8.1 (Berkeley) 6/10/93
 */

#include <sys/cdefs.h>
__FBSDID("$FreeBSD: stable/11/sys/kern/sys_socket.c 331722 2018-03-29 02:50:57Z eadler $");

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/aio.h>
#include <sys/domain.h>
#include <sys/file.h>
#include <sys/filedesc.h>
#include <sys/kernel.h>
#include <sys/kthread.h>
#include <sys/malloc.h>
#include <sys/proc.h>
#include <sys/protosw.h>
#include <sys/sigio.h>
#include <sys/signal.h>
#include <sys/signalvar.h>
#include <sys/socket.h>
#include <sys/socketvar.h>
#include <sys/filio.h>			/* XXX */
#include <sys/sockio.h>
#include <sys/stat.h>
#include <sys/sysctl.h>
#include <sys/sysproto.h>
#include <sys/taskqueue.h>
#include <sys/uio.h>
#include <sys/ucred.h>
#include <sys/un.h>
#include <sys/unpcb.h>
#include <sys/user.h>

#include <net/if.h>
#include <net/if_var.h>
#include <net/route.h>
#include <net/vnet.h>

#include <netinet/in.h>
#include <netinet/in_pcb.h>

#include <security/mac/mac_framework.h>

#include <vm/vm.h>
#include <vm/pmap.h>
#include <vm/vm_extern.h>
#include <vm/vm_map.h>

static SYSCTL_NODE(_kern_ipc, OID_AUTO, aio, CTLFLAG_RD, NULL,
    "socket AIO stats");

static int empty_results;
SYSCTL_INT(_kern_ipc_aio, OID_AUTO, empty_results, CTLFLAG_RD, &empty_results,
    0, "socket operation returned EAGAIN");

static int empty_retries;
SYSCTL_INT(_kern_ipc_aio, OID_AUTO, empty_retries, CTLFLAG_RD, &empty_retries,
    0, "socket operation retries");

static fo_rdwr_t soo_read;
static fo_rdwr_t soo_write;
static fo_ioctl_t soo_ioctl;
static fo_poll_t soo_poll;
extern fo_kqfilter_t soo_kqfilter;
static fo_stat_t soo_stat;
static fo_close_t soo_close;
static fo_fill_kinfo_t soo_fill_kinfo;
static fo_aio_queue_t soo_aio_queue;

static void	soo_aio_cancel(struct kaiocb *job);

struct fileops	socketops = {
	.fo_read = soo_read,
	.fo_write = soo_write,
	.fo_truncate = invfo_truncate,
	.fo_ioctl = soo_ioctl,
	.fo_poll = soo_poll,
	.fo_kqfilter = soo_kqfilter,
	.fo_stat = soo_stat,
	.fo_close = soo_close,
	.fo_chmod = invfo_chmod,
	.fo_chown = invfo_chown,
	.fo_sendfile = invfo_sendfile,
	.fo_fill_kinfo = soo_fill_kinfo,
	.fo_aio_queue = soo_aio_queue,
	.fo_flags = DFLAG_PASSABLE
};
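
/*
 * socketops is installed on the file when a socket descriptor is
 * created (socket(2), socketpair(2), accept(2)), so generic descriptor
 * system calls dispatch to the soo_*() handlers below.  Illustrative
 * userland sketch (not part of this file; "s" is assumed to be a
 * connected socket):
 *
 *	char buf[512];
 *	ssize_t n;
 *
 *	n = read(s, buf, sizeof(buf));		(dispatches to soo_read)
 *	n = write(s, buf, (size_t)n);		(dispatches to soo_write)
 */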

static int
soo_read(struct file *fp, struct uio *uio, struct ucred *active_cred,
    int flags, struct thread *td)
{
	struct socket *so = fp->f_data;
	int error;

#ifdef MAC
	error = mac_socket_check_receive(active_cred, so);
	if (error)
		return (error);
#endif
	error = soreceive(so, 0, uio, 0, 0, 0);
	return (error);
}

static int
soo_write(struct file *fp, struct uio *uio, struct ucred *active_cred,
    int flags, struct thread *td)
{
	struct socket *so = fp->f_data;
	int error;

#ifdef MAC
	error = mac_socket_check_send(active_cred, so);
	if (error)
		return (error);
#endif
	error = sosend(so, 0, uio, 0, 0, 0, uio->uio_td);
	if (error == EPIPE && (so->so_options & SO_NOSIGPIPE) == 0) {
		PROC_LOCK(uio->uio_td->td_proc);
		tdsignal(uio->uio_td, SIGPIPE);
		PROC_UNLOCK(uio->uio_td->td_proc);
	}
	return (error);
}
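
/*
 * The SIGPIPE delivery above matches historic write(2) semantics for a
 * send on a peer-closed socket.  A minimal userland sketch of opting
 * out per-socket (illustrative only):
 *
 *	int on = 1;
 *
 *	setsockopt(s, SOL_SOCKET, SO_NOSIGPIPE, &on, sizeof(on));
 *	(subsequent writes then fail with EPIPE without raising SIGPIPE)
 */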

static int
soo_ioctl(struct file *fp, u_long cmd, void *data, struct ucred *active_cred,
    struct thread *td)
{
	struct socket *so = fp->f_data;
	int error = 0;

	switch (cmd) {
	case FIONBIO:
		SOCK_LOCK(so);
		if (*(int *)data)
			so->so_state |= SS_NBIO;
		else
			so->so_state &= ~SS_NBIO;
		SOCK_UNLOCK(so);
		break;

	case FIOASYNC:
		/*
		 * XXXRW: This code separately acquires SOCK_LOCK(so) and
		 * SOCKBUF_LOCK(&so->so_rcv) even though they are the same
		 * mutex to avoid introducing the assumption that they are
		 * the same.
		 */
		if (*(int *)data) {
			SOCK_LOCK(so);
			so->so_state |= SS_ASYNC;
			SOCK_UNLOCK(so);
			SOCKBUF_LOCK(&so->so_rcv);
			so->so_rcv.sb_flags |= SB_ASYNC;
			SOCKBUF_UNLOCK(&so->so_rcv);
			SOCKBUF_LOCK(&so->so_snd);
			so->so_snd.sb_flags |= SB_ASYNC;
			SOCKBUF_UNLOCK(&so->so_snd);
		} else {
			SOCK_LOCK(so);
			so->so_state &= ~SS_ASYNC;
			SOCK_UNLOCK(so);
			SOCKBUF_LOCK(&so->so_rcv);
			so->so_rcv.sb_flags &= ~SB_ASYNC;
			SOCKBUF_UNLOCK(&so->so_rcv);
			SOCKBUF_LOCK(&so->so_snd);
			so->so_snd.sb_flags &= ~SB_ASYNC;
			SOCKBUF_UNLOCK(&so->so_snd);
		}
		break;

	case FIONREAD:
		/* Unlocked read. */
		*(int *)data = sbavail(&so->so_rcv);
		break;

	case FIONWRITE:
		/* Unlocked read. */
		*(int *)data = sbavail(&so->so_snd);
		break;

	case FIONSPACE:
		/* Unlocked read. */
		if ((so->so_snd.sb_hiwat < sbused(&so->so_snd)) ||
		    (so->so_snd.sb_mbmax < so->so_snd.sb_mbcnt))
			*(int *)data = 0;
		else
			*(int *)data = sbspace(&so->so_snd);
		break;

	case FIOSETOWN:
		error = fsetown(*(int *)data, &so->so_sigio);
		break;

	case FIOGETOWN:
		*(int *)data = fgetown(&so->so_sigio);
		break;

	case SIOCSPGRP:
		error = fsetown(-(*(int *)data), &so->so_sigio);
		break;

	case SIOCGPGRP:
		*(int *)data = -fgetown(&so->so_sigio);
		break;

	case SIOCATMARK:
		/* Unlocked read. */
		*(int *)data = (so->so_rcv.sb_state & SBS_RCVATMARK) != 0;
		break;
	default:
		/*
		 * Interface/routing/protocol specific ioctls: interface and
		 * routing ioctls should have a different entry since a
		 * socket is unnecessary.
		 */
		if (IOCGROUP(cmd) == 'i')
			error = ifioctl(so, cmd, data, td);
		else if (IOCGROUP(cmd) == 'r') {
			CURVNET_SET(so->so_vnet);
			error = rtioctl_fib(cmd, data, so->so_fibnum);
			CURVNET_RESTORE();
		} else {
			CURVNET_SET(so->so_vnet);
			error = ((*so->so_proto->pr_usrreqs->pru_control)
			    (so, cmd, data, 0, td));
			CURVNET_RESTORE();
		}
		break;
	}
	return (error);
}
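
/*
 * A minimal userland sketch of the generic socket ioctls handled above
 * (illustrative only):
 *
 *	int on = 1, nread;
 *
 *	ioctl(s, FIONBIO, &on);		(sets SS_NBIO: non-blocking I/O)
 *	ioctl(s, FIONREAD, &nread);	(bytes available in so_rcv)
 */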

static int
soo_poll(struct file *fp, int events, struct ucred *active_cred,
    struct thread *td)
{
	struct socket *so = fp->f_data;
#ifdef MAC
	int error;

	error = mac_socket_check_poll(active_cred, so);
	if (error)
		return (error);
#endif
	return (sopoll(so, events, fp->f_cred, td));
}
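
/*
 * soo_poll() implements poll(2) and select(2) on sockets via sopoll().
 * Illustrative userland sketch (not part of this file):
 *
 *	struct pollfd pfd = { .fd = s, .events = POLLIN };
 *
 *	if (poll(&pfd, 1, 1000) > 0 && (pfd.revents & POLLIN))
 *		(the socket is readable)
 */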

static int
soo_stat(struct file *fp, struct stat *ub, struct ucred *active_cred,
    struct thread *td)
{
	struct socket *so = fp->f_data;
	struct sockbuf *sb;
#ifdef MAC
	int error;
#endif

	bzero((caddr_t)ub, sizeof (*ub));
	ub->st_mode = S_IFSOCK;
#ifdef MAC
	error = mac_socket_check_stat(active_cred, so);
	if (error)
		return (error);
#endif
	/*
	 * If SBS_CANTRCVMORE is set, but there's still data left in the
	 * receive buffer, the socket is still readable.
	 */
	sb = &so->so_rcv;
	SOCKBUF_LOCK(sb);
	if ((sb->sb_state & SBS_CANTRCVMORE) == 0 || sbavail(sb))
		ub->st_mode |= S_IRUSR | S_IRGRP | S_IROTH;
	ub->st_size = sbavail(sb) - sb->sb_ctl;
	SOCKBUF_UNLOCK(sb);

	sb = &so->so_snd;
	SOCKBUF_LOCK(sb);
	if ((sb->sb_state & SBS_CANTSENDMORE) == 0)
		ub->st_mode |= S_IWUSR | S_IWGRP | S_IWOTH;
	SOCKBUF_UNLOCK(sb);
	ub->st_uid = so->so_cred->cr_uid;
	ub->st_gid = so->so_cred->cr_gid;
	return (*so->so_proto->pr_usrreqs->pru_sense)(so, ub);
}
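
/*
 * Illustrative userland sketch (not part of this file): fstat(2) on a
 * socket reports S_IFSOCK, and st_size reflects the data (not control)
 * bytes currently queued for reading:
 *
 *	struct stat st;
 *
 *	if (fstat(s, &st) == 0 && S_ISSOCK(st.st_mode))
 *		printf("%jd bytes readable\n", (intmax_t)st.st_size);
 */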

/*
 * API socket close on file pointer.  We call soclose() to close the socket
 * (including initiating closing protocols).  soclose() will sorele() the
 * file reference but the actual socket will not go away until the socket's
 * ref count hits 0.
 */
static int
soo_close(struct file *fp, struct thread *td)
{
	int error = 0;
	struct socket *so;

	so = fp->f_data;
	fp->f_ops = &badfileops;
	fp->f_data = NULL;

	if (so)
		error = soclose(so);
	return (error);
}

static int
soo_fill_kinfo(struct file *fp, struct kinfo_file *kif, struct filedesc *fdp)
{
	struct sockaddr *sa;
	struct inpcb *inpcb;
	struct unpcb *unpcb;
	struct socket *so;
	int error;

	kif->kf_type = KF_TYPE_SOCKET;
	so = fp->f_data;
	kif->kf_sock_domain = so->so_proto->pr_domain->dom_family;
	kif->kf_sock_type = so->so_type;
	kif->kf_sock_protocol = so->so_proto->pr_protocol;
	kif->kf_un.kf_sock.kf_sock_pcb = (uintptr_t)so->so_pcb;
	switch (kif->kf_sock_domain) {
	case AF_INET:
	case AF_INET6:
		if (kif->kf_sock_protocol == IPPROTO_TCP) {
			if (so->so_pcb != NULL) {
				inpcb = (struct inpcb *)(so->so_pcb);
				kif->kf_un.kf_sock.kf_sock_inpcb =
				    (uintptr_t)inpcb->inp_ppcb;
			}
		}
		break;
	case AF_UNIX:
		if (so->so_pcb != NULL) {
			unpcb = (struct unpcb *)(so->so_pcb);
			if (unpcb->unp_conn) {
				kif->kf_un.kf_sock.kf_sock_unpconn =
				    (uintptr_t)unpcb->unp_conn;
				kif->kf_un.kf_sock.kf_sock_rcv_sb_state =
				    so->so_rcv.sb_state;
				kif->kf_un.kf_sock.kf_sock_snd_sb_state =
				    so->so_snd.sb_state;
			}
		}
		break;
	}
	error = so->so_proto->pr_usrreqs->pru_sockaddr(so, &sa);
	if (error == 0 && sa->sa_len <= sizeof(kif->kf_sa_local)) {
		bcopy(sa, &kif->kf_sa_local, sa->sa_len);
		free(sa, M_SONAME);
	}
	error = so->so_proto->pr_usrreqs->pru_peeraddr(so, &sa);
	if (error == 0 && sa->sa_len <= sizeof(kif->kf_sa_peer)) {
		bcopy(sa, &kif->kf_sa_peer, sa->sa_len);
		free(sa, M_SONAME);
	}
	strncpy(kif->kf_path, so->so_proto->pr_domain->dom_name,
	    sizeof(kif->kf_path));
	return (0);
}
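
/*
 * The kinfo_file records filled in above are what procstat(1) and
 * similar consumers read via sysctl (e.g. kern.proc.filedesc), so
 * "procstat -f <pid>" shows a socket's domain, type, protocol, and
 * addresses from this function's output.
 */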

/*
 * Use the 'backend3' field in AIO jobs to store the amount of data
 * completed by the AIO job so far.
 */
#define	aio_done	backend3

static STAILQ_HEAD(, task) soaio_jobs;
static struct mtx soaio_jobs_lock;
static struct task soaio_kproc_task;
static int soaio_starting, soaio_idle, soaio_queued;
static struct unrhdr *soaio_kproc_unr;

static int soaio_max_procs = MAX_AIO_PROCS;
SYSCTL_INT(_kern_ipc_aio, OID_AUTO, max_procs, CTLFLAG_RW, &soaio_max_procs, 0,
    "Maximum number of kernel processes to use for async socket IO");

static int soaio_num_procs;
SYSCTL_INT(_kern_ipc_aio, OID_AUTO, num_procs, CTLFLAG_RD, &soaio_num_procs, 0,
    "Number of active kernel processes for async socket IO");

static int soaio_target_procs = TARGET_AIO_PROCS;
SYSCTL_INT(_kern_ipc_aio, OID_AUTO, target_procs, CTLFLAG_RD,
    &soaio_target_procs, 0,
    "Preferred number of ready kernel processes for async socket IO");

static int soaio_lifetime;
SYSCTL_INT(_kern_ipc_aio, OID_AUTO, lifetime, CTLFLAG_RW, &soaio_lifetime, 0,
    "Maximum lifetime for idle aiod");

static void
soaio_kproc_loop(void *arg)
{
	struct proc *p;
	struct vmspace *myvm;
	struct task *task;
	int error, id, pending;

	id = (intptr_t)arg;

	/*
	 * Grab an extra reference on the daemon's vmspace so that it
	 * doesn't get freed by jobs that switch to a different
	 * vmspace.
	 */
	p = curproc;
	myvm = vmspace_acquire_ref(p);

	mtx_lock(&soaio_jobs_lock);
	MPASS(soaio_starting > 0);
	soaio_starting--;
	for (;;) {
		while (!STAILQ_EMPTY(&soaio_jobs)) {
			task = STAILQ_FIRST(&soaio_jobs);
			STAILQ_REMOVE_HEAD(&soaio_jobs, ta_link);
			soaio_queued--;
			pending = task->ta_pending;
			task->ta_pending = 0;
			mtx_unlock(&soaio_jobs_lock);

			task->ta_func(task->ta_context, pending);

			mtx_lock(&soaio_jobs_lock);
		}
		MPASS(soaio_queued == 0);

		if (p->p_vmspace != myvm) {
			mtx_unlock(&soaio_jobs_lock);
			vmspace_switch_aio(myvm);
			mtx_lock(&soaio_jobs_lock);
			continue;
		}

		soaio_idle++;
		error = mtx_sleep(&soaio_idle, &soaio_jobs_lock, 0, "-",
		    soaio_lifetime);
		soaio_idle--;
		if (error == EWOULDBLOCK && STAILQ_EMPTY(&soaio_jobs) &&
		    soaio_num_procs > soaio_target_procs)
			break;
	}
	soaio_num_procs--;
	mtx_unlock(&soaio_jobs_lock);
	free_unr(soaio_kproc_unr, id);
	kproc_exit(0);
}

static void
soaio_kproc_create(void *context, int pending)
{
	struct proc *p;
	int error, id;

	mtx_lock(&soaio_jobs_lock);
	for (;;) {
		if (soaio_num_procs < soaio_target_procs) {
			/* Must create */
		} else if (soaio_num_procs >= soaio_max_procs) {
			/*
			 * Hit the limit on kernel processes, don't
			 * create another one.
			 */
			break;
		} else if (soaio_queued <= soaio_idle + soaio_starting) {
			/*
			 * No more AIO jobs waiting for a process to be
			 * created, so stop.
			 */
			break;
		}
		soaio_starting++;
		mtx_unlock(&soaio_jobs_lock);

		id = alloc_unr(soaio_kproc_unr);
		error = kproc_create(soaio_kproc_loop, (void *)(intptr_t)id,
		    &p, 0, 0, "soaiod%d", id);
		if (error != 0) {
			free_unr(soaio_kproc_unr, id);
			mtx_lock(&soaio_jobs_lock);
			soaio_starting--;
			break;
		}

		mtx_lock(&soaio_jobs_lock);
		soaio_num_procs++;
	}
	mtx_unlock(&soaio_jobs_lock);
}

void
soaio_enqueue(struct task *task)
{

	mtx_lock(&soaio_jobs_lock);
	MPASS(task->ta_pending == 0);
	task->ta_pending++;
	STAILQ_INSERT_TAIL(&soaio_jobs, task, ta_link);
	soaio_queued++;
	if (soaio_queued <= soaio_idle)
		wakeup_one(&soaio_idle);
	else if (soaio_num_procs < soaio_max_procs)
		taskqueue_enqueue(taskqueue_thread, &soaio_kproc_task);
	mtx_unlock(&soaio_jobs_lock);
}

static void
soaio_init(void)
{

	soaio_lifetime = AIOD_LIFETIME_DEFAULT;
	STAILQ_INIT(&soaio_jobs);
	mtx_init(&soaio_jobs_lock, "soaio jobs", NULL, MTX_DEF);
	soaio_kproc_unr = new_unrhdr(1, INT_MAX, NULL);
	TASK_INIT(&soaio_kproc_task, 0, soaio_kproc_create, NULL);
	if (soaio_target_procs > 0)
		taskqueue_enqueue(taskqueue_thread, &soaio_kproc_task);
}
SYSINIT(soaio, SI_SUB_VFS, SI_ORDER_ANY, soaio_init, NULL);
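
/*
 * soaio_init() runs once at boot; if soaio_target_procs is non-zero it
 * pre-spawns that many daemons, which appear in ps(1) output as
 * "soaiod1", "soaiod2", and so on.  Additional daemons are created on
 * demand by soaio_enqueue() up to kern.ipc.aio.max_procs, and idle
 * daemons beyond the target exit after soaio_lifetime ticks.
 */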

static __inline int
soaio_ready(struct socket *so, struct sockbuf *sb)
{
	return (sb == &so->so_rcv ? soreadable(so) : sowriteable(so));
}

static void
soaio_process_job(struct socket *so, struct sockbuf *sb, struct kaiocb *job)
{
	struct ucred *td_savedcred;
	struct thread *td;
	struct file *fp;
	struct uio uio;
	struct iovec iov;
	size_t cnt, done;
	long ru_before;
	int error, flags;

	SOCKBUF_UNLOCK(sb);
	aio_switch_vmspace(job);
	td = curthread;
	fp = job->fd_file;
retry:
	td_savedcred = td->td_ucred;
	td->td_ucred = job->cred;

	done = job->aio_done;
	cnt = job->uaiocb.aio_nbytes - done;
	iov.iov_base = (void *)((uintptr_t)job->uaiocb.aio_buf + done);
	iov.iov_len = cnt;
	uio.uio_iov = &iov;
	uio.uio_iovcnt = 1;
	uio.uio_offset = 0;
	uio.uio_resid = cnt;
	uio.uio_segflg = UIO_USERSPACE;
	uio.uio_td = td;
	flags = MSG_NBIO;

	/*
	 * For resource usage accounting, only count a completed request
	 * as a single message to avoid counting multiple calls to
	 * sosend/soreceive on a blocking socket.
	 */

	if (sb == &so->so_rcv) {
		uio.uio_rw = UIO_READ;
		ru_before = td->td_ru.ru_msgrcv;
#ifdef MAC
		error = mac_socket_check_receive(fp->f_cred, so);
		if (error == 0)
#endif
			error = soreceive(so, NULL, &uio, NULL, NULL, &flags);
		if (td->td_ru.ru_msgrcv != ru_before)
			job->msgrcv = 1;
	} else {
		if (!TAILQ_EMPTY(&sb->sb_aiojobq))
			flags |= MSG_MORETOCOME;
		uio.uio_rw = UIO_WRITE;
		ru_before = td->td_ru.ru_msgsnd;
#ifdef MAC
		error = mac_socket_check_send(fp->f_cred, so);
		if (error == 0)
#endif
			error = sosend(so, NULL, &uio, NULL, NULL, flags, td);
		if (td->td_ru.ru_msgsnd != ru_before)
			job->msgsnd = 1;
		if (error == EPIPE && (so->so_options & SO_NOSIGPIPE) == 0) {
			PROC_LOCK(job->userproc);
			kern_psignal(job->userproc, SIGPIPE);
			PROC_UNLOCK(job->userproc);
		}
	}

	done += cnt - uio.uio_resid;
	job->aio_done = done;
	td->td_ucred = td_savedcred;

	if (error == EWOULDBLOCK) {
		/*
		 * The request was either partially completed or not
		 * completed at all due to racing with a read() or
		 * write() on the socket.  If the socket is
		 * non-blocking, return with any partial completion.
		 * If the socket is blocking or if no progress has
		 * been made, requeue this request at the head of the
		 * queue to try again when the socket is ready.
		 */
		MPASS(done != job->uaiocb.aio_nbytes);
		SOCKBUF_LOCK(sb);
		if (done == 0 || !(so->so_state & SS_NBIO)) {
			empty_results++;
			if (soaio_ready(so, sb)) {
				empty_retries++;
				SOCKBUF_UNLOCK(sb);
				goto retry;
			}

			if (!aio_set_cancel_function(job, soo_aio_cancel)) {
				SOCKBUF_UNLOCK(sb);
				if (done != 0)
					aio_complete(job, done, 0);
				else
					aio_cancel(job);
				SOCKBUF_LOCK(sb);
			} else {
				TAILQ_INSERT_HEAD(&sb->sb_aiojobq, job, list);
			}
			return;
		}
		SOCKBUF_UNLOCK(sb);
	}
	if (done != 0 && (error == ERESTART || error == EINTR ||
	    error == EWOULDBLOCK))
		error = 0;
	if (error)
		aio_complete(job, -1, error);
	else
		aio_complete(job, done, 0);
	SOCKBUF_LOCK(sb);
}

static void
soaio_process_sb(struct socket *so, struct sockbuf *sb)
{
	struct kaiocb *job;

	CURVNET_SET(so->so_vnet);
	SOCKBUF_LOCK(sb);
	while (!TAILQ_EMPTY(&sb->sb_aiojobq) && soaio_ready(so, sb)) {
		job = TAILQ_FIRST(&sb->sb_aiojobq);
		TAILQ_REMOVE(&sb->sb_aiojobq, job, list);
		if (!aio_clear_cancel_function(job))
			continue;

		soaio_process_job(so, sb, job);
	}

	/*
	 * If there are still pending requests, the socket must not be
	 * ready so set SB_AIO to request a wakeup when the socket
	 * becomes ready.
	 */
	if (!TAILQ_EMPTY(&sb->sb_aiojobq))
		sb->sb_flags |= SB_AIO;
	sb->sb_flags &= ~SB_AIO_RUNNING;
	SOCKBUF_UNLOCK(sb);

	ACCEPT_LOCK();
	SOCK_LOCK(so);
	sorele(so);
	CURVNET_RESTORE();
}

void
soaio_rcv(void *context, int pending)
{
	struct socket *so;

	so = context;
	soaio_process_sb(so, &so->so_rcv);
}

void
soaio_snd(void *context, int pending)
{
	struct socket *so;

	so = context;
	soaio_process_sb(so, &so->so_snd);
}

void
sowakeup_aio(struct socket *so, struct sockbuf *sb)
{

	SOCKBUF_LOCK_ASSERT(sb);
	sb->sb_flags &= ~SB_AIO;
	if (sb->sb_flags & SB_AIO_RUNNING)
		return;
	sb->sb_flags |= SB_AIO_RUNNING;
	if (sb == &so->so_snd)
		SOCK_LOCK(so);
	soref(so);
	if (sb == &so->so_snd)
		SOCK_UNLOCK(so);
	soaio_enqueue(&sb->sb_aiotask);
}
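
/*
 * sowakeup_aio() is invoked from sowakeup() when a socket buffer with
 * SB_AIO set becomes ready, handing the buffer's sb_aiotask (soaio_rcv
 * or soaio_snd) to a soaiod daemon via soaio_enqueue().  The soref()
 * taken here is released by the sorele() at the end of
 * soaio_process_sb().
 */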

static void
soo_aio_cancel(struct kaiocb *job)
{
	struct socket *so;
	struct sockbuf *sb;
	long done;
	int opcode;

	so = job->fd_file->f_data;
	opcode = job->uaiocb.aio_lio_opcode;
	if (opcode == LIO_READ)
		sb = &so->so_rcv;
	else {
		MPASS(opcode == LIO_WRITE);
		sb = &so->so_snd;
	}

	SOCKBUF_LOCK(sb);
	if (!aio_cancel_cleared(job))
		TAILQ_REMOVE(&sb->sb_aiojobq, job, list);
	if (TAILQ_EMPTY(&sb->sb_aiojobq))
		sb->sb_flags &= ~SB_AIO;
	SOCKBUF_UNLOCK(sb);

	done = job->aio_done;
	if (done != 0)
		aio_complete(job, done, 0);
	else
		aio_cancel(job);
}

static int
soo_aio_queue(struct file *fp, struct kaiocb *job)
{
	struct socket *so;
	struct sockbuf *sb;
	int error;

	so = fp->f_data;
	error = (*so->so_proto->pr_usrreqs->pru_aio_queue)(so, job);
	if (error == 0)
		return (0);

	switch (job->uaiocb.aio_lio_opcode) {
	case LIO_READ:
		sb = &so->so_rcv;
		break;
	case LIO_WRITE:
		sb = &so->so_snd;
		break;
	default:
		return (EINVAL);
	}

	SOCKBUF_LOCK(sb);
	if (!aio_set_cancel_function(job, soo_aio_cancel))
		panic("new job was cancelled");
	TAILQ_INSERT_TAIL(&sb->sb_aiojobq, job, list);
	if (!(sb->sb_flags & SB_AIO_RUNNING)) {
		if (soaio_ready(so, sb))
			sowakeup_aio(so, sb);
		else
			sb->sb_flags |= SB_AIO;
	}
	SOCKBUF_UNLOCK(sb);
	return (0);
}
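
/*
 * Illustrative userland sketch of queueing async socket I/O through
 * soo_aio_queue() above (not part of this file):
 *
 *	struct aiocb cb;
 *	ssize_t n;
 *
 *	memset(&cb, 0, sizeof(cb));
 *	cb.aio_fildes = s;
 *	cb.aio_buf = buf;
 *	cb.aio_nbytes = sizeof(buf);
 *	if (aio_read(&cb) == 0) {
 *		while (aio_error(&cb) == EINPROGRESS)
 *			usleep(1000);	(or block in aio_suspend())
 *		n = aio_return(&cb);
 *	}
 */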
807