vfs_aio.c revision 154706
1/*-
2 * Copyright (c) 1997 John S. Dyson.  All rights reserved.
3 *
4 * Redistribution and use in source and binary forms, with or without
5 * modification, are permitted provided that the following conditions
6 * are met:
7 * 1. Redistributions of source code must retain the above copyright
8 *    notice, this list of conditions and the following disclaimer.
9 * 2. John S. Dyson's name may not be used to endorse or promote products
10 *    derived from this software without specific prior written permission.
11 *
12 * DISCLAIMER:  This code isn't warranted to do anything useful.  Anything
13 * bad that happens because of using this software isn't the responsibility
14 * of the author.  This software is distributed AS-IS.
15 */
16
17/*
18 * This file contains support for the POSIX 1003.1B AIO/LIO facility.
19 */
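/*
 * Userland view (an illustrative sketch only, not part of this file):
 * the syscalls implemented below back the POSIX API roughly as follows.
 *
 *	struct aiocb cb;
 *	char buf[512];
 *
 *	memset(&cb, 0, sizeof(cb));
 *	cb.aio_fildes = fd;		// an already-open descriptor
 *	cb.aio_buf = buf;
 *	cb.aio_nbytes = sizeof(buf);
 *	cb.aio_offset = 0;
 *	if (aio_read(&cb) == -1)	// queues the request (aio_aqueue())
 *		err(1, "aio_read");
 *	while (aio_error(&cb) == EINPROGRESS)
 *		;			// or block in aio_suspend()/aio_waitcomplete()
 *	ssize_t n = aio_return(&cb);	// reaps status, frees kernel state
 */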
20
21#include <sys/cdefs.h>
22__FBSDID("$FreeBSD: head/sys/kern/vfs_aio.c 154706 2006-01-23 10:27:15Z davidxu $");
23
24#include <sys/param.h>
25#include <sys/systm.h>
26#include <sys/malloc.h>
27#include <sys/bio.h>
28#include <sys/buf.h>
29#include <sys/eventhandler.h>
30#include <sys/sysproto.h>
31#include <sys/filedesc.h>
32#include <sys/kernel.h>
33#include <sys/module.h>
34#include <sys/kthread.h>
35#include <sys/fcntl.h>
36#include <sys/file.h>
37#include <sys/limits.h>
38#include <sys/lock.h>
39#include <sys/mutex.h>
40#include <sys/unistd.h>
41#include <sys/proc.h>
42#include <sys/resourcevar.h>
43#include <sys/signalvar.h>
44#include <sys/protosw.h>
45#include <sys/sema.h>
46#include <sys/socket.h>
47#include <sys/socketvar.h>
48#include <sys/syscall.h>
49#include <sys/sysent.h>
50#include <sys/sysctl.h>
51#include <sys/sx.h>
52#include <sys/taskqueue.h>
53#include <sys/vnode.h>
54#include <sys/conf.h>
55#include <sys/event.h>
56
57#include <machine/atomic.h>
58
59#include <posix4/posix4.h>
60#include <vm/vm.h>
61#include <vm/vm_extern.h>
62#include <vm/pmap.h>
63#include <vm/vm_map.h>
64#include <vm/uma.h>
65#include <sys/aio.h>
66
67#include "opt_vfs_aio.h"
68
69/*
70 * Counter for allocating reference ids to new jobs.  Wrapped to 1 on
71 * overflow.
72 */
73static	long jobrefid;
74
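/* Job states, kept in aiocblist.jobstate. */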
75#define JOBST_NULL		0x0
76#define JOBST_JOBQSOCK		0x1
77#define JOBST_JOBQGLOBAL	0x2
78#define JOBST_JOBRUNNING	0x3
79#define JOBST_JOBFINISHED	0x4
80#define	JOBST_JOBQBUF		0x5
81
82#ifndef MAX_AIO_PER_PROC
83#define MAX_AIO_PER_PROC	32
84#endif
85
86#ifndef MAX_AIO_QUEUE_PER_PROC
87#define MAX_AIO_QUEUE_PER_PROC	256 /* Bigger than AIO_LISTIO_MAX */
88#endif
89
90#ifndef MAX_AIO_PROCS
91#define MAX_AIO_PROCS		32
92#endif
93
94#ifndef MAX_AIO_QUEUE
95#define	MAX_AIO_QUEUE		1024 /* Bigger than AIO_LISTIO_MAX */
96#endif
97
98#ifndef TARGET_AIO_PROCS
99#define TARGET_AIO_PROCS	4
100#endif
101
102#ifndef MAX_BUF_AIO
103#define MAX_BUF_AIO		16
104#endif
105
106#ifndef AIOD_TIMEOUT_DEFAULT
107#define	AIOD_TIMEOUT_DEFAULT	(10 * hz)
108#endif
109
110#ifndef AIOD_LIFETIME_DEFAULT
111#define AIOD_LIFETIME_DEFAULT	(30 * hz)
112#endif
113
114static SYSCTL_NODE(_vfs, OID_AUTO, aio, CTLFLAG_RW, 0, "Async IO management");
115
116static int max_aio_procs = MAX_AIO_PROCS;
117SYSCTL_INT(_vfs_aio, OID_AUTO, max_aio_procs,
118	CTLFLAG_RW, &max_aio_procs, 0,
119	"Maximum number of kernel threads to use for handling async IO");
120
121static int num_aio_procs = 0;
122SYSCTL_INT(_vfs_aio, OID_AUTO, num_aio_procs,
123	CTLFLAG_RD, &num_aio_procs, 0,
124	"Number of presently active kernel threads for async IO");
125
126/*
127 * The code will adjust the actual number of AIO processes towards this
128 * number when it gets a chance.
129 */
130static int target_aio_procs = TARGET_AIO_PROCS;
131SYSCTL_INT(_vfs_aio, OID_AUTO, target_aio_procs, CTLFLAG_RW, &target_aio_procs,
132	0, "Preferred number of ready kernel threads for async IO");
133
134static int max_queue_count = MAX_AIO_QUEUE;
135SYSCTL_INT(_vfs_aio, OID_AUTO, max_aio_queue, CTLFLAG_RW, &max_queue_count, 0,
136    "Maximum number of aio requests to queue, globally");
137
138static int num_queue_count = 0;
139SYSCTL_INT(_vfs_aio, OID_AUTO, num_queue_count, CTLFLAG_RD, &num_queue_count, 0,
140    "Number of queued aio requests");
141
142static int num_buf_aio = 0;
143SYSCTL_INT(_vfs_aio, OID_AUTO, num_buf_aio, CTLFLAG_RD, &num_buf_aio, 0,
144    "Number of aio requests presently handled by the buf subsystem");
145
146/* Number of async I/O threads in the process of being started */
147/* XXX This should be local to aio_aqueue() */
148static int num_aio_resv_start = 0;
149
150static int aiod_timeout;
151SYSCTL_INT(_vfs_aio, OID_AUTO, aiod_timeout, CTLFLAG_RW, &aiod_timeout, 0,
152    "Timeout value for synchronous aio operations");
153
154static int aiod_lifetime;
155SYSCTL_INT(_vfs_aio, OID_AUTO, aiod_lifetime, CTLFLAG_RW, &aiod_lifetime, 0,
156    "Maximum lifetime for idle aiod");
157
158static int unloadable = 0;
159SYSCTL_INT(_vfs_aio, OID_AUTO, unloadable, CTLFLAG_RW, &unloadable, 0,
160    "Allow unload of aio (not recommended)");
161
162
163static int max_aio_per_proc = MAX_AIO_PER_PROC;
164SYSCTL_INT(_vfs_aio, OID_AUTO, max_aio_per_proc, CTLFLAG_RW, &max_aio_per_proc,
165    0, "Maximum active aio requests per process (stored in the process)");
166
167static int max_aio_queue_per_proc = MAX_AIO_QUEUE_PER_PROC;
168SYSCTL_INT(_vfs_aio, OID_AUTO, max_aio_queue_per_proc, CTLFLAG_RW,
169    &max_aio_queue_per_proc, 0,
170    "Maximum queued aio requests per process (stored in the process)");
171
172static int max_buf_aio = MAX_BUF_AIO;
173SYSCTL_INT(_vfs_aio, OID_AUTO, max_buf_aio, CTLFLAG_RW, &max_buf_aio, 0,
174    "Maximum buf aio requests per process (stored in the process)");
175
176typedef struct oaiocb {
177	int	aio_fildes;		/* File descriptor */
178	off_t	aio_offset;		/* File offset for I/O */
179	volatile void *aio_buf;         /* I/O buffer in process space */
180	size_t	aio_nbytes;		/* Number of bytes for I/O */
181	struct	osigevent aio_sigevent;	/* Signal to deliver */
182	int	aio_lio_opcode;		/* LIO opcode */
183	int	aio_reqprio;		/* Request priority -- ignored */
184	struct	__aiocb_private	_aiocb_private;
185} oaiocb_t;
186
187struct aiocblist {
188	TAILQ_ENTRY(aiocblist) list;	/* List of jobs */
189	TAILQ_ENTRY(aiocblist) plist;	/* List of jobs for proc */
190	TAILQ_ENTRY(aiocblist) allist;	/* List of all jobs in proc */
191	int	jobflags;		/* AIOCBLIST_* flags */
192	int	jobstate;		/* JOBST_* job state */
193	int	inputcharge;		/* Input blocks charged to ru_inblock */
194	int	outputcharge;		/* Output blocks charged to ru_oublock */
195	struct	buf *bp;		/* Buffer pointer */
196	struct	proc *userproc;		/* User process */
197	struct  ucred *cred;		/* Active credential when created */
198	struct	file *fd_file;		/* Pointer to file structure */
199	struct	aioliojob *lio;		/* Optional lio job */
200	struct	aiocb *uuaiocb;		/* Pointer in userspace of aiocb */
201	struct	knlist klist;		/* list of knotes */
202	struct	aiocb uaiocb;		/* Kernel I/O control block */
203	ksiginfo_t ksi;			/* Realtime signal info */
204	struct task	biotask;
205};
206
207/* jobflags */
208#define AIOCBLIST_RUNDOWN	0x04
209#define AIOCBLIST_DONE		0x10
210#define AIOCBLIST_BUFDONE	0x20
211
212/*
213 * AIO process info
214 */
215#define AIOP_FREE	0x1			/* proc on free queue */
216
217struct aiothreadlist {
218	int aiothreadflags;			/* AIO proc flags */
219	TAILQ_ENTRY(aiothreadlist) list;	/* List of processes */
220	struct thread *aiothread;		/* The AIO thread */
221};
222
223/*
224 * data-structure for lio signal management
225 */
226struct aioliojob {
227	int	lioj_flags;		/* LIOJ_* flags */
228	int	lioj_count;		/* outstanding reference count */
229	int	lioj_finished_count;	/* number of finished jobs */
230	struct	sigevent lioj_signal;	/* signal on all I/O done */
231	TAILQ_ENTRY(aioliojob) lioj_list;
232	struct  knlist klist;		/* list of knotes */
233	ksiginfo_t lioj_ksi;	/* Realtime signal info */
234};
235
236#define	LIOJ_SIGNAL		0x1	/* signal on all done (lio) */
237#define	LIOJ_SIGNAL_POSTED	0x2	/* signal has been posted */
238#define LIOJ_KEVENT_POSTED	0x4	/* kevent triggered */
239
240/*
241 * per process aio data structure
242 */
243struct kaioinfo {
244	int	kaio_flags;		/* per process kaio flags */
245	int	kaio_maxactive_count;	/* maximum number of AIOs */
246	int	kaio_active_count;	/* number of currently used AIOs */
247	int	kaio_qallowed_count;	/* maximum size of AIO queue */
248	int	kaio_count;		/* size of AIO queue */
249	int	kaio_ballowed_count;	/* maximum number of buffers */
250	int	kaio_buffer_count;	/* number of physio buffers */
251	TAILQ_HEAD(,aiocblist) kaio_all;	/* all AIOs in the process */
252	TAILQ_HEAD(,aiocblist) kaio_done;	/* done queue for process */
253	TAILQ_HEAD(,aioliojob) kaio_liojoblist; /* list of lio jobs */
254	TAILQ_HEAD(,aiocblist) kaio_jobqueue;	/* job queue for process */
255	TAILQ_HEAD(,aiocblist) kaio_bufqueue;	/* buffer job queue for process */
256	TAILQ_HEAD(,aiocblist) kaio_sockqueue;	/* queue for aios waiting on sockets */
257};
258
259#define KAIO_RUNDOWN	0x1	/* process is being run down */
260#define KAIO_WAKEUP	0x2	/* wakeup process when there is a significant event */
261
262static TAILQ_HEAD(,aiothreadlist) aio_freeproc;		/* Idle daemons */
263static struct sema aio_newproc_sem;	/* Signals completion of aiod startup */
264static struct mtx aio_job_mtx;		/* Protects aio_jobs and the aiod free list */
265static struct mtx aio_sock_mtx;		/* Protects the per-socket aio job queues */
266static TAILQ_HEAD(,aiocblist) aio_jobs;			/* Async job list */
267static struct unrhdr *aiod_unr;		/* Unit numbers for aiod process names */
268
269static void	aio_init_aioinfo(struct proc *p);
270static void	aio_onceonly(void);
271static int	aio_free_entry(struct aiocblist *aiocbe);
272static void	aio_process(struct aiocblist *aiocbe);
273static int	aio_newproc(int *);
274static int	aio_aqueue(struct thread *td, struct aiocb *job,
275			struct aioliojob *lio, int type, int osigev);
276static void	aio_physwakeup(struct buf *bp);
277static void	aio_proc_rundown(void *arg, struct proc *p);
278static int	aio_qphysio(struct proc *p, struct aiocblist *iocb);
279static void	biohelper(void *, int);
280static void	aio_daemon(void *param);
281static void	aio_swake_cb(struct socket *, struct sockbuf *);
282static int	aio_unload(void);
283static int	filt_aioattach(struct knote *kn);
284static void	filt_aiodetach(struct knote *kn);
285static int	filt_aio(struct knote *kn, long hint);
286static int	filt_lioattach(struct knote *kn);
287static void	filt_liodetach(struct knote *kn);
288static int	filt_lio(struct knote *kn, long hint);
289#define DONE_BUF 1
290#define DONE_QUEUE 2
291static void	aio_bio_done_notify(struct proc *userp, struct aiocblist *aiocbe, int type);
292static int	do_lio_listio(struct thread *td, struct lio_listio_args *uap,
293			int oldsigev);
294
295/*
296 * Zones for:
297 * 	kaio	Per process async io info
298 *	aiop	async io thread data
299 *	aiocb	async io jobs
300 *	aiol	list io job pointer - internal to aio_suspend XXX
301 *	aiolio	list io jobs
302 */
303static uma_zone_t kaio_zone, aiop_zone, aiocb_zone, aiol_zone, aiolio_zone;
304
305/* kqueue filters for aio */
306static struct filterops aio_filtops =
307	{ 0, filt_aioattach, filt_aiodetach, filt_aio };
308static struct filterops lio_filtops =
309	{ 0, filt_lioattach, filt_liodetach, filt_lio };
310
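/*
 * Completion can also be delivered through kqueue instead of a signal
 * (an illustrative sketch only, not part of this file):
 *
 *	int kq = kqueue();
 *	cb.aio_sigevent.sigev_notify = SIGEV_KEVENT;
 *	cb.aio_sigevent.sigev_notify_kqueue = kq;
 *	cb.aio_sigevent.sigev_value.sival_ptr = &cb;
 *	aio_read(&cb);
 *	struct kevent ev;
 *	kevent(kq, NULL, 0, &ev, 1, NULL);	// ev.udata == &cb on completion
 */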
311static eventhandler_tag exit_tag, exec_tag;
312
313TASKQUEUE_DEFINE_THREAD(aiod_bio);
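/* Taskqueue (with a dedicated thread) used to finish up physio requests. */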
314
315/*
316 * Main operations function for use as a kernel module.
317 */
318static int
319aio_modload(struct module *module, int cmd, void *arg)
320{
321	int error = 0;
322
323	switch (cmd) {
324	case MOD_LOAD:
325		aio_onceonly();
326		break;
327	case MOD_UNLOAD:
328		error = aio_unload();
329		break;
330	case MOD_SHUTDOWN:
331		break;
332	default:
333		error = EINVAL;
334		break;
335	}
336	return (error);
337}
338
339static moduledata_t aio_mod = {
340	"aio",
341	&aio_modload,
342	NULL
343};
344
345SYSCALL_MODULE_HELPER(aio_return);
346SYSCALL_MODULE_HELPER(aio_suspend);
347SYSCALL_MODULE_HELPER(aio_cancel);
348SYSCALL_MODULE_HELPER(aio_error);
349SYSCALL_MODULE_HELPER(aio_read);
350SYSCALL_MODULE_HELPER(aio_write);
351SYSCALL_MODULE_HELPER(aio_waitcomplete);
352SYSCALL_MODULE_HELPER(lio_listio);
353SYSCALL_MODULE_HELPER(oaio_read);
354SYSCALL_MODULE_HELPER(oaio_write);
355SYSCALL_MODULE_HELPER(olio_listio);
356
357DECLARE_MODULE(aio, aio_mod,
358	SI_SUB_VFS, SI_ORDER_ANY);
359MODULE_VERSION(aio, 1);
360
361/*
362 * Startup initialization
363 */
364static void
365aio_onceonly(void)
366{
367
368	/* XXX: should probably just use so->callback */
369	aio_swake = &aio_swake_cb;
370	exit_tag = EVENTHANDLER_REGISTER(process_exit, aio_proc_rundown, NULL,
371	    EVENTHANDLER_PRI_ANY);
372	exec_tag = EVENTHANDLER_REGISTER(process_exec, aio_proc_rundown, NULL,
373	    EVENTHANDLER_PRI_ANY);
374	kqueue_add_filteropts(EVFILT_AIO, &aio_filtops);
375	kqueue_add_filteropts(EVFILT_LIO, &lio_filtops);
376	TAILQ_INIT(&aio_freeproc);
377	sema_init(&aio_newproc_sem, 0, "aio_new_proc");
378	mtx_init(&aio_job_mtx, "aio_job", NULL, MTX_DEF);
379	mtx_init(&aio_sock_mtx, "aio_sock", NULL, MTX_DEF);
380	TAILQ_INIT(&aio_jobs);
381	aiod_unr = new_unrhdr(1, INT_MAX, NULL);
382	kaio_zone = uma_zcreate("AIO", sizeof(struct kaioinfo), NULL, NULL,
383	    NULL, NULL, UMA_ALIGN_PTR, UMA_ZONE_NOFREE);
384	aiop_zone = uma_zcreate("AIOP", sizeof(struct aiothreadlist), NULL,
385	    NULL, NULL, NULL, UMA_ALIGN_PTR, UMA_ZONE_NOFREE);
386	aiocb_zone = uma_zcreate("AIOCB", sizeof(struct aiocblist), NULL, NULL,
387	    NULL, NULL, UMA_ALIGN_PTR, UMA_ZONE_NOFREE);
388	aiol_zone = uma_zcreate("AIOL", AIO_LISTIO_MAX*sizeof(intptr_t) , NULL,
389	    NULL, NULL, NULL, UMA_ALIGN_PTR, UMA_ZONE_NOFREE);
390	aiolio_zone = uma_zcreate("AIOLIO", sizeof(struct aioliojob), NULL,
391	    NULL, NULL, NULL, UMA_ALIGN_PTR, UMA_ZONE_NOFREE);
392	aiod_timeout = AIOD_TIMEOUT_DEFAULT;
393	aiod_lifetime = AIOD_LIFETIME_DEFAULT;
394	jobrefid = 1;
395	async_io_version = _POSIX_VERSION;
396	p31b_setcfg(CTL_P1003_1B_AIO_LISTIO_MAX, AIO_LISTIO_MAX);
397	p31b_setcfg(CTL_P1003_1B_AIO_MAX, MAX_AIO_QUEUE);
398	p31b_setcfg(CTL_P1003_1B_AIO_PRIO_DELTA_MAX, 0);
399}
400
401/*
402 * Callback for unload of AIO when used as a module.
403 */
404static int
405aio_unload(void)
406{
407	int error;
408
409	/*
410	 * XXX: no unloads by default, it's too dangerous.
411	 * perhaps we could do it if we locked out callers and then
412	 * did an aio_proc_rundown() on each process.
413	 *
414	 * jhb: aio_proc_rundown() needs to run on curproc though,
415	 * so I don't think that would fly.
416	 */
417	if (!unloadable)
418		return (EOPNOTSUPP);
419
420	error = kqueue_del_filteropts(EVFILT_AIO);
421	if (error)
422		return error;
423	async_io_version = 0;
424	aio_swake = NULL;
425	taskqueue_free(taskqueue_aiod_bio);
426	delete_unrhdr(aiod_unr);
427	EVENTHANDLER_DEREGISTER(process_exit, exit_tag);
428	EVENTHANDLER_DEREGISTER(process_exec, exec_tag);
429	mtx_destroy(&aio_job_mtx);
430	mtx_destroy(&aio_sock_mtx);
431	sema_destroy(&aio_newproc_sem);
432	p31b_setcfg(CTL_P1003_1B_AIO_LISTIO_MAX, -1);
433	p31b_setcfg(CTL_P1003_1B_AIO_MAX, -1);
434	p31b_setcfg(CTL_P1003_1B_AIO_PRIO_DELTA_MAX, -1);
435	return (0);
436}
437
438/*
439 * Init the per-process aioinfo structure.  The aioinfo limits are set
440 * per-process for user limit (resource) management.
441 */
442static void
443aio_init_aioinfo(struct proc *p)
444{
445	struct kaioinfo *ki;
446
447	ki = uma_zalloc(kaio_zone, M_WAITOK);
448	ki->kaio_flags = 0;
449	ki->kaio_maxactive_count = max_aio_per_proc;
450	ki->kaio_active_count = 0;
451	ki->kaio_qallowed_count = max_aio_queue_per_proc;
452	ki->kaio_count = 0;
453	ki->kaio_ballowed_count = max_buf_aio;
454	ki->kaio_buffer_count = 0;
455	TAILQ_INIT(&ki->kaio_all);
456	TAILQ_INIT(&ki->kaio_done);
457	TAILQ_INIT(&ki->kaio_jobqueue);
458	TAILQ_INIT(&ki->kaio_bufqueue);
459	TAILQ_INIT(&ki->kaio_liojoblist);
460	TAILQ_INIT(&ki->kaio_sockqueue);
461	PROC_LOCK(p);
462	if (p->p_aioinfo == NULL) {
463		p->p_aioinfo = ki;
464		PROC_UNLOCK(p);
465	} else {
466		PROC_UNLOCK(p);
467		uma_zfree(kaio_zone, ki);
468	}
469
470	while (num_aio_procs < target_aio_procs)
471		aio_newproc(NULL);
472}
473
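/*
 * Queue the completion signal described by "sigev" to process "p",
 * unless this job's ksiginfo is already on a signal queue.  Called
 * with the process lock held.
 */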
474static int
475aio_sendsig(struct proc *p, struct sigevent *sigev, ksiginfo_t *ksi)
476{
477	PROC_LOCK_ASSERT(p, MA_OWNED);
478	if (!KSI_ONQ(ksi)) {
479		ksi->ksi_code = SI_ASYNCIO;
480		ksi->ksi_flags |= KSI_EXT | KSI_INS;
481		return (psignal_event(p, sigev, ksi));
482	}
483	return (0);
484}
485
486/*
487 * Free a completed job entry.  The job must be in JOBST_JOBFINISHED state
488 * and the caller must hold the owning process lock (and be that process);
489 * kernel resources held by the job are released here.
490 */
491static int
492aio_free_entry(struct aiocblist *aiocbe)
493{
494	struct kaioinfo *ki;
495	struct aioliojob *lj;
496	struct proc *p;
497
498	p = aiocbe->userproc;
499
500	PROC_LOCK_ASSERT(p, MA_OWNED);
501	MPASS(curproc == p);
502	MPASS(aiocbe->jobstate == JOBST_JOBFINISHED);
503
504	ki = p->p_aioinfo;
505	MPASS(ki != NULL);
506
507	atomic_subtract_int(&num_queue_count, 1);
508
509	ki->kaio_count--;
510	MPASS(ki->kaio_count >= 0);
511
512	lj = aiocbe->lio;
513	if (lj) {
514		lj->lioj_count--;
515		lj->lioj_finished_count--;
516
517		if (lj->lioj_count == 0) {
518			TAILQ_REMOVE(&ki->kaio_liojoblist, lj, lioj_list);
519			/* lio is going away, we need to destroy any knotes */
520			knlist_delete(&lj->klist, curthread, 1);
521			sigqueue_take(&lj->lioj_ksi);
522			uma_zfree(aiolio_zone, lj);
523		}
524	}
525
526	TAILQ_REMOVE(&ki->kaio_done, aiocbe, plist);
527	TAILQ_REMOVE(&ki->kaio_all, aiocbe, allist);
528
529	/* aiocbe is going away, we need to destroy any knotes */
530	knlist_delete(&aiocbe->klist, curthread, 1);
531	sigqueue_take(&aiocbe->ksi);
532
533 	MPASS(aiocbe->bp == NULL);
534	aiocbe->jobstate = JOBST_NULL;
535
536	/* Wake up anyone interested in doing cleanup work. */
537	if (ki->kaio_flags & (KAIO_WAKEUP | KAIO_RUNDOWN)) {
538		ki->kaio_flags &= ~KAIO_WAKEUP;
539		wakeup(&p->p_aioinfo);
540	}
541	PROC_UNLOCK(p);
542
543	/*
544	 * The thread argument here is used to find the owning process
545	 * and is also passed to fo_close() which may pass it to various
546	 * places such as devsw close() routines.  Because of that, we
547	 * need a thread pointer from the process owning the job that is
548	 * persistent and won't disappear out from under us or move to
549	 * another process.
550	 *
551	 * Currently, all the callers of this function call it to remove
552	 * an aiocblist from the current process' job list either via a
553	 * syscall or due to the current process calling exit() or
554	 * execve().  Thus, we know that p == curproc.  We also know that
555	 * curthread can't exit since we are curthread.
556	 *
557	 * Therefore, we use curthread as the thread to pass to
558	 * knlist_delete().  This does mean that it is possible for the
559	 * thread pointer at close time to differ from the thread pointer
560	 * at open time, but this is already true of file descriptors in
561	 * a multithreaded process.
562	 */
563	fdrop(aiocbe->fd_file, curthread);
564	crfree(aiocbe->cred);
565	uma_zfree(aiocb_zone, aiocbe);
566	PROC_LOCK(p);
567
568	return (0);
569}
570
571/*
572 * Rundown the jobs for a given process.
573 */
574static void
575aio_proc_rundown(void *arg, struct proc *p)
576{
577	struct kaioinfo *ki;
578	struct aioliojob *lj;
579	struct aiocblist *cbe, *cbn;
580	struct file *fp;
581	struct socket *so;
582
583	KASSERT(curthread->td_proc == p,
584	    ("%s: called on non-curproc", __func__));
585	ki = p->p_aioinfo;
586	if (ki == NULL)
587		return;
588
589	PROC_LOCK(p);
590
591restart:
592	ki->kaio_flags |= KAIO_RUNDOWN;
593
594	/*
595	 * Try to cancel all pending requests. This code simulates
596	 * aio_cancel on all pending I/O requests.
597	 */
598	while ((cbe = TAILQ_FIRST(&ki->kaio_sockqueue))) {
599		fp = cbe->fd_file;
600		so = fp->f_data;
601		mtx_lock(&aio_sock_mtx);
602		TAILQ_REMOVE(&so->so_aiojobq, cbe, list);
603		mtx_unlock(&aio_sock_mtx);
604		TAILQ_REMOVE(&ki->kaio_sockqueue, cbe, plist);
605		TAILQ_INSERT_HEAD(&ki->kaio_jobqueue, cbe, plist);
606		cbe->jobstate = JOBST_JOBQGLOBAL;
607	}
608
609	TAILQ_FOREACH_SAFE(cbe, &ki->kaio_jobqueue, plist, cbn) {
610		mtx_lock(&aio_job_mtx);
611		if (cbe->jobstate == JOBST_JOBQGLOBAL) {
612			TAILQ_REMOVE(&aio_jobs, cbe, list);
613			mtx_unlock(&aio_job_mtx);
614			cbe->jobstate = JOBST_JOBFINISHED;
615			cbe->uaiocb._aiocb_private.status = -1;
616			cbe->uaiocb._aiocb_private.error = ECANCELED;
617			TAILQ_REMOVE(&ki->kaio_jobqueue, cbe, plist);
618			aio_bio_done_notify(p, cbe, DONE_QUEUE);
619		} else {
620			mtx_unlock(&aio_job_mtx);
621		}
622	}
623
624	if (TAILQ_FIRST(&ki->kaio_sockqueue))
625		goto restart;
626
627	/* Wait for all running I/O to be finished */
628	if (TAILQ_FIRST(&ki->kaio_bufqueue) ||
629	    TAILQ_FIRST(&ki->kaio_jobqueue)) {
630		ki->kaio_flags |= KAIO_WAKEUP;
631		msleep(&p->p_aioinfo, &p->p_mtx, PRIBIO, "aioprn", hz);
632		goto restart;
633	}
634
635	/* Free all completed I/O requests. */
636	while ((cbe = TAILQ_FIRST(&ki->kaio_done)) != NULL)
637		aio_free_entry(cbe);
638
639	while ((lj = TAILQ_FIRST(&ki->kaio_liojoblist)) != NULL) {
640		if (lj->lioj_count == 0) {
641			TAILQ_REMOVE(&ki->kaio_liojoblist, lj, lioj_list);
642			knlist_delete(&lj->klist, curthread, 1);
643			sigqueue_take(&lj->lioj_ksi);
644			uma_zfree(aiolio_zone, lj);
645		} else {
646			panic("LIO job not cleaned up: C:%d, FC:%d\n",
647			    lj->lioj_count, lj->lioj_finished_count);
648		}
649	}
650
651	uma_zfree(kaio_zone, ki);
652	p->p_aioinfo = NULL;
653	PROC_UNLOCK(p);
654}
655
656/*
657 * Select a job to run (called by an AIO daemon).
658 */
659static struct aiocblist *
660aio_selectjob(struct aiothreadlist *aiop)
661{
662	struct aiocblist *aiocbe;
663	struct kaioinfo *ki;
664	struct proc *userp;
665
666	mtx_assert(&aio_job_mtx, MA_OWNED);
667	TAILQ_FOREACH(aiocbe, &aio_jobs, list) {
668		userp = aiocbe->userproc;
669		ki = userp->p_aioinfo;
670
671		if (ki->kaio_active_count < ki->kaio_maxactive_count) {
672			TAILQ_REMOVE(&aio_jobs, aiocbe, list);
673			/* Account for currently active jobs. */
674			ki->kaio_active_count++;
675			aiocbe->jobstate = JOBST_JOBRUNNING;
676			break;
677		}
678	}
679	return (aiocbe);
680}
681
682/*
683 * The AIO processing activity.  This is the code that does the I/O request for
684 * the non-physio version of the operations.  The normal vn operations are used,
685 * and this code should work in all instances for every type of file, including
686 * pipes, sockets, fifos, and regular files.
687 *
688 * XXX I don't think this code works well with pipes, sockets and fifos; the
689 * problem is that the aiod threads can block if there is no data or no
690 * buffer space and the file was not opened with O_NONBLOCK, so all aiod
691 * threads may end up blocked when there are a couple of such processes.
692 * We need a FOF_OFFSET-like flag to override f_flag and tell the lower
693 * layers to do non-blocking I/O; we cannot simply set O_NONBLOCK because
694 * that races with userland.  There is a trigger mechanism for sockets,
695 * but it does not work well if userland misbehaves.
696 */
697static void
698aio_process(struct aiocblist *aiocbe)
699{
700	struct ucred *td_savedcred;
701	struct thread *td;
702	struct proc *mycp;
703	struct aiocb *cb;
704	struct file *fp;
705	struct socket *so;
706	struct uio auio;
707	struct iovec aiov;
708	int cnt;
709	int error;
710	int oublock_st, oublock_end;
711	int inblock_st, inblock_end;
712
713	td = curthread;
714	td_savedcred = td->td_ucred;
715	td->td_ucred = aiocbe->cred;
716	mycp = td->td_proc;
717	cb = &aiocbe->uaiocb;
718	fp = aiocbe->fd_file;
719
720	aiov.iov_base = (void *)(uintptr_t)cb->aio_buf;
721	aiov.iov_len = cb->aio_nbytes;
722
723	auio.uio_iov = &aiov;
724	auio.uio_iovcnt = 1;
725	auio.uio_offset = cb->aio_offset;
726	auio.uio_resid = cb->aio_nbytes;
727	cnt = cb->aio_nbytes;
728	auio.uio_segflg = UIO_USERSPACE;
729	auio.uio_td = td;
730
731	inblock_st = mycp->p_stats->p_ru.ru_inblock;
732	oublock_st = mycp->p_stats->p_ru.ru_oublock;
733	/*
734	 * aio_aqueue() acquires a reference to the file that is
735	 * released in aio_free_entry().
736	 */
737	if (cb->aio_lio_opcode == LIO_READ) {
738		auio.uio_rw = UIO_READ;
739		error = fo_read(fp, &auio, fp->f_cred, FOF_OFFSET, td);
740	} else {
741		auio.uio_rw = UIO_WRITE;
742		error = fo_write(fp, &auio, fp->f_cred, FOF_OFFSET, td);
743	}
744	inblock_end = mycp->p_stats->p_ru.ru_inblock;
745	oublock_end = mycp->p_stats->p_ru.ru_oublock;
746
747	aiocbe->inputcharge = inblock_end - inblock_st;
748	aiocbe->outputcharge = oublock_end - oublock_st;
749
750	if ((error) && (auio.uio_resid != cnt)) {
751		if (error == ERESTART || error == EINTR || error == EWOULDBLOCK)
752			error = 0;
753		if ((error == EPIPE) && (cb->aio_lio_opcode == LIO_WRITE)) {
754			int sigpipe = 1;
755			if (fp->f_type == DTYPE_SOCKET) {
756				so = fp->f_data;
757				if (so->so_options & SO_NOSIGPIPE)
758					sigpipe = 0;
759			}
760			if (sigpipe) {
761				PROC_LOCK(aiocbe->userproc);
762				psignal(aiocbe->userproc, SIGPIPE);
763				PROC_UNLOCK(aiocbe->userproc);
764			}
765		}
766	}
767
768	cnt -= auio.uio_resid;
769	cb->_aiocb_private.error = error;
770	cb->_aiocb_private.status = cnt;
771	td->td_ucred = td_savedcred;
772}
773
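/*
 * Move a finished job to the per-process done queue and deliver any
 * requested notifications (signal, kevent, lio completion).  Called
 * with the owning process lock held; "type" is DONE_QUEUE for jobs
 * completed by an aiod and DONE_BUF for physio buffer jobs.
 */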
774static void
775aio_bio_done_notify(struct proc *userp, struct aiocblist *aiocbe, int type)
776{
777	struct aioliojob *lj;
778	struct kaioinfo *ki;
779	int lj_done;
780
781	PROC_LOCK_ASSERT(userp, MA_OWNED);
782	ki = userp->p_aioinfo;
783	lj = aiocbe->lio;
784	lj_done = 0;
785	if (lj) {
786		lj->lioj_finished_count++;
787		if (lj->lioj_count == lj->lioj_finished_count)
788			lj_done = 1;
789	}
790	if (type == DONE_QUEUE) {
791		aiocbe->jobflags |= AIOCBLIST_DONE;
792	} else {
793		aiocbe->jobflags |= AIOCBLIST_BUFDONE;
794		ki->kaio_buffer_count--;
795	}
796	TAILQ_INSERT_TAIL(&ki->kaio_done, aiocbe, plist);
797	aiocbe->jobstate = JOBST_JOBFINISHED;
798	if (aiocbe->uaiocb.aio_sigevent.sigev_notify == SIGEV_SIGNAL ||
799	    aiocbe->uaiocb.aio_sigevent.sigev_notify == SIGEV_THREAD_ID)
800		aio_sendsig(userp, &aiocbe->uaiocb.aio_sigevent, &aiocbe->ksi);
801
802	KNOTE_LOCKED(&aiocbe->klist, 1);
803
804	if (lj_done) {
805		if (lj->lioj_signal.sigev_notify == SIGEV_KEVENT) {
806			lj->lioj_flags |= LIOJ_KEVENT_POSTED;
807			KNOTE_LOCKED(&lj->klist, 1);
808		}
809		if ((lj->lioj_flags & (LIOJ_SIGNAL|LIOJ_SIGNAL_POSTED))
810		    == LIOJ_SIGNAL
811		    && (lj->lioj_signal.sigev_notify == SIGEV_SIGNAL ||
812		        lj->lioj_signal.sigev_notify == SIGEV_THREAD_ID)) {
813			aio_sendsig(userp, &lj->lioj_signal, &lj->lioj_ksi);
814			lj->lioj_flags |= LIOJ_SIGNAL_POSTED;
815		}
816	}
817	if (ki->kaio_flags & (KAIO_RUNDOWN|KAIO_WAKEUP)) {
818		ki->kaio_flags &= ~KAIO_WAKEUP;
819		wakeup(&userp->p_aioinfo);
820	}
821}
822
823/*
824 * The AIO daemon: most of the actual work is done in aio_process(),
825 * but the setup (and address space management) is done in this routine.
826 */
827static void
828aio_daemon(void *_id)
829{
830	struct aiocblist *aiocbe;
831	struct aiothreadlist *aiop;
832	struct kaioinfo *ki;
833	struct proc *curcp, *mycp, *userp;
834	struct vmspace *myvm, *tmpvm;
835	struct thread *td = curthread;
836	struct pgrp *newpgrp;
837	struct session *newsess;
838	int id = (intptr_t)_id;
839
840	/*
841	 * Local copies of curproc (cp) and vmspace (myvm)
842	 */
843	mycp = td->td_proc;
844	myvm = mycp->p_vmspace;
845
846	KASSERT(mycp->p_textvp == NULL, ("kthread has a textvp"));
847
848	/*
849	 * Allocate and ready the aio control info.  There is one aiop structure
850	 * per daemon.
851	 */
852	aiop = uma_zalloc(aiop_zone, M_WAITOK);
853	aiop->aiothread = td;
854	aiop->aiothreadflags |= AIOP_FREE;
855
856	/*
857	 * Place thread (lightweight process) onto the AIO free thread list.
858	 */
859	mtx_lock(&aio_job_mtx);
860	TAILQ_INSERT_HEAD(&aio_freeproc, aiop, list);
861	mtx_unlock(&aio_job_mtx);
862
863	/*
864	 * Get rid of our current file descriptors.  AIODs don't need any
865	 * file descriptors, except as temporarily inherited from the client.
866	 */
867	fdfree(td);
868
869	/* The daemon resides in its own pgrp. */
870	MALLOC(newpgrp, struct pgrp *, sizeof(struct pgrp), M_PGRP,
871		M_WAITOK | M_ZERO);
872	MALLOC(newsess, struct session *, sizeof(struct session), M_SESSION,
873		M_WAITOK | M_ZERO);
874
875	sx_xlock(&proctree_lock);
876	enterpgrp(mycp, mycp->p_pid, newpgrp, newsess);
877	sx_xunlock(&proctree_lock);
878
879	/*
880	 * Wakeup parent process.  (Parent sleeps to keep from blasting away
881	 * and creating too many daemons.)
882	 */
883	sema_post(&aio_newproc_sem);
884
885	mtx_lock(&aio_job_mtx);
886	for (;;) {
887		/*
888		 * curcp is the current daemon process context.
889		 * userp is the current user process context.
890		 */
891		curcp = mycp;
892
893		/*
894		 * Take daemon off of free queue
895		 */
896		if (aiop->aiothreadflags & AIOP_FREE) {
897			TAILQ_REMOVE(&aio_freeproc, aiop, list);
898			aiop->aiothreadflags &= ~AIOP_FREE;
899		}
900
901		/*
902		 * Check for jobs.
903		 */
904		while ((aiocbe = aio_selectjob(aiop)) != NULL) {
905			mtx_unlock(&aio_job_mtx);
906			userp = aiocbe->userproc;
907
908			/*
909			 * Connect to process address space for user program.
910			 */
911			if (userp != curcp) {
912				/*
913				 * Save the current address space that we are
914				 * connected to.
915				 */
916				tmpvm = mycp->p_vmspace;
917
918				/*
919				 * Point to the new user address space, and
920				 * refer to it.
921				 */
922				mycp->p_vmspace = userp->p_vmspace;
923				atomic_add_int(&mycp->p_vmspace->vm_refcnt, 1);
924
925				/* Activate the new mapping. */
926				pmap_activate(FIRST_THREAD_IN_PROC(mycp));
927
928				/*
929				 * If the old address space wasn't the daemon's
930				 * own address space, then we need to remove the
931				 * daemon's reference from the other process
932				 * that it was acting on behalf of.
933				 */
934				if (tmpvm != myvm) {
935					vmspace_free(tmpvm);
936				}
937				curcp = userp;
938			}
939
940			ki = userp->p_aioinfo;
941
942			/* Do the I/O function. */
943			aio_process(aiocbe);
944
945			mtx_lock(&aio_job_mtx);
946			/* Decrement the active job count. */
947			ki->kaio_active_count--;
948			mtx_unlock(&aio_job_mtx);
949
950			PROC_LOCK(userp);
951			TAILQ_REMOVE(&ki->kaio_jobqueue, aiocbe, plist);
952			aio_bio_done_notify(userp, aiocbe, DONE_QUEUE);
953			if (aiocbe->jobflags & AIOCBLIST_RUNDOWN) {
954				wakeup(aiocbe);
955				aiocbe->jobflags &= ~AIOCBLIST_RUNDOWN;
956			}
957			PROC_UNLOCK(userp);
958
959			mtx_lock(&aio_job_mtx);
960		}
961
962		/*
963		 * Disconnect from user address space.
964		 */
965		if (curcp != mycp) {
966
967			mtx_unlock(&aio_job_mtx);
968
969			/* Get the user address space to disconnect from. */
970			tmpvm = mycp->p_vmspace;
971
972			/* Get original address space for daemon. */
973			mycp->p_vmspace = myvm;
974
975			/* Activate the daemon's address space. */
976			pmap_activate(FIRST_THREAD_IN_PROC(mycp));
977#ifdef DIAGNOSTIC
978			if (tmpvm == myvm) {
979				printf("AIOD: vmspace problem -- %d\n",
980				    mycp->p_pid);
981			}
982#endif
983			/* Remove our vmspace reference. */
984			vmspace_free(tmpvm);
985
986			curcp = mycp;
987
988			mtx_lock(&aio_job_mtx);
989			/*
990			 * We have to restart to avoid a race; we only sleep
991			 * if no job can be selected, which should only be the
992			 * case when curcp == mycp.
993			 */
994			continue;
995		}
996
997		mtx_assert(&aio_job_mtx, MA_OWNED);
998
999		TAILQ_INSERT_HEAD(&aio_freeproc, aiop, list);
1000		aiop->aiothreadflags |= AIOP_FREE;
1001
1002		/*
1003		 * If daemon is inactive for a long time, allow it to exit,
1004		 * thereby freeing resources.
1005		 */
1006		if (msleep(aiop->aiothread, &aio_job_mtx, PRIBIO, "aiordy",
1007		    aiod_lifetime)) {
1008			if (TAILQ_EMPTY(&aio_jobs)) {
1009				if ((aiop->aiothreadflags & AIOP_FREE) &&
1010				    (num_aio_procs > target_aio_procs)) {
1011					TAILQ_REMOVE(&aio_freeproc, aiop, list);
1012					num_aio_procs--;
1013					mtx_unlock(&aio_job_mtx);
1014					uma_zfree(aiop_zone, aiop);
1015					free_unr(aiod_unr, id);
1016#ifdef DIAGNOSTIC
1017					if (mycp->p_vmspace->vm_refcnt <= 1) {
1018						printf("AIOD: bad vm refcnt for"
1019						    " exiting daemon: %d\n",
1020						    mycp->p_vmspace->vm_refcnt);
1021					}
1022#endif
1023					kthread_exit(0);
1024				}
1025			}
1026		}
1027	}
1028	mtx_unlock(&aio_job_mtx);
1029	panic("shouldn't be here\n");
1030}
1031
1032/*
1033 * Create a new AIO daemon. This is mostly a kernel-thread fork routine. The
1034 * AIO daemon modifies its environment itself.
1035 */
1036static int
1037aio_newproc(int *start)
1038{
1039	int error;
1040	struct proc *p;
1041	int id;
1042
1043	id = alloc_unr(aiod_unr);
1044	error = kthread_create(aio_daemon, (void *)(intptr_t)id, &p,
1045		RFNOWAIT, 0, "aiod%d", id);
1046	if (error == 0) {
1047		/*
1048		 * Wait until daemon is started.
1049		 */
1050		sema_wait(&aio_newproc_sem);
1051		mtx_lock(&aio_job_mtx);
1052		num_aio_procs++;
1053		if (start != NULL)
1054			(*start)--;	/* release the caller's reservation */
1055		mtx_unlock(&aio_job_mtx);
1056	} else {
1057		free_unr(aiod_unr, id);
1058	}
1059	return (error);
1060}
1061
1062/*
1063 * Try the high-performance, low-overhead physio method for eligible
1064 * VCHR devices.  This method doesn't use an aio helper thread, and
1065 * thus has very low overhead.
1066 *
1067 * Assumes that the caller, aio_aqueue(), has incremented the file
1068 * structure's reference count, preventing its deallocation for the
1069 * duration of this call.
1070 */
1071static int
1072aio_qphysio(struct proc *p, struct aiocblist *aiocbe)
1073{
1074	struct aiocb *cb;
1075	struct file *fp;
1076	struct buf *bp;
1077	struct vnode *vp;
1078	struct kaioinfo *ki;
1079	struct aioliojob *lj;
1080	int error;
1081
1082	cb = &aiocbe->uaiocb;
1083	fp = aiocbe->fd_file;
1084
1085	if (fp->f_type != DTYPE_VNODE)
1086		return (-1);
1087
1088	vp = fp->f_vnode;
1089
1090	/*
1091	 * If it's not a disk, we don't want to return a positive error.
1092	 * It causes the aio code to not fall through to try the thread
1093	 * way when you're talking to a regular file.
1094	 */
1095	if (!vn_isdisk(vp, &error)) {
1096		if (error == ENOTBLK)
1097			return (-1);
1098		else
1099			return (error);
1100	}
1101
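	/*
	 * Remaining eligibility checks: the transfer must be a multiple
	 * of the block size, must fit within the device's maximum I/O
	 * size and MAXPHYS, and the process must be under its physio
	 * buffer quota; otherwise return -1 to fall back to the aiod path.
	 */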
1102 	if (cb->aio_nbytes % vp->v_bufobj.bo_bsize)
1103		return (-1);
1104
1105	if (cb->aio_nbytes > vp->v_rdev->si_iosize_max)
1106		return (-1);
1107
1108	if (cb->aio_nbytes >
1109	    MAXPHYS - (((vm_offset_t) cb->aio_buf) & PAGE_MASK))
1110		return (-1);
1111
1112	ki = p->p_aioinfo;
1113	if (ki->kaio_buffer_count >= ki->kaio_ballowed_count)
1114		return (-1);
1115
1116	/* Create and build a buffer header for a transfer. */
1117	bp = (struct buf *)getpbuf(NULL);
1118	BUF_KERNPROC(bp);
1119
1120	PROC_LOCK(p);
1121	ki->kaio_count++;
1122	ki->kaio_buffer_count++;
1123	lj = aiocbe->lio;
1124	if (lj)
1125		lj->lioj_count++;
1126	PROC_UNLOCK(p);
1127
1128	/*
1129	 * Get a copy of the kva from the physical buffer.
1130	 */
1131	error = 0;
1132
1133	bp->b_bcount = cb->aio_nbytes;
1134	bp->b_bufsize = cb->aio_nbytes;
1135	bp->b_iodone = aio_physwakeup;
1136	bp->b_saveaddr = bp->b_data;
1137	bp->b_data = (void *)(uintptr_t)cb->aio_buf;
1138	bp->b_offset = cb->aio_offset;
1139	bp->b_iooffset = cb->aio_offset;
1140	bp->b_blkno = btodb(cb->aio_offset);
1141	bp->b_iocmd = cb->aio_lio_opcode == LIO_WRITE ? BIO_WRITE : BIO_READ;
1142
1143	/*
1144	 * Bring buffer into kernel space.
1145	 */
1146	if (vmapbuf(bp) < 0) {
1147		error = EFAULT;
1148		goto doerror;
1149	}
1150
1151	PROC_LOCK(p);
1152	aiocbe->bp = bp;
1153	bp->b_caller1 = (void *)aiocbe;
1154	TAILQ_INSERT_TAIL(&ki->kaio_bufqueue, aiocbe, plist);
1155	TAILQ_INSERT_TAIL(&ki->kaio_all, aiocbe, allist);
1156	aiocbe->jobstate = JOBST_JOBQBUF;
1157	cb->_aiocb_private.status = cb->aio_nbytes;
1158	PROC_UNLOCK(p);
1159
1160	atomic_add_int(&num_queue_count, 1);
1161	atomic_add_int(&num_buf_aio, 1);
1162
1163	bp->b_error = 0;
1164
1165	TASK_INIT(&aiocbe->biotask, 0, biohelper, aiocbe);
1166
1167	/* Perform transfer. */
1168	dev_strategy(vp->v_rdev, bp);
1169	return (0);
1170
1171doerror:
1172	PROC_LOCK(p);
1173	ki->kaio_count--;
1174	ki->kaio_buffer_count--;
1175	if (lj)
1176		lj->lioj_count--;
1177	aiocbe->bp = NULL;
1178	PROC_UNLOCK(p);
1179	relpbuf(bp, NULL);
1180	return (error);
1181}
1182
1183/*
1184 * Wake up aio requests that may be serviceable now.
1185 */
1186static void
1187aio_swake_cb(struct socket *so, struct sockbuf *sb)
1188{
1189	struct aiocblist *cb, *cbn;
1190	struct proc *p;
1191	struct kaioinfo *ki = NULL;
1192	int opcode, wakecount = 0;
1193	struct aiothreadlist *aiop;
1194
1195	if (sb == &so->so_snd) {
1196		opcode = LIO_WRITE;
1197		SOCKBUF_LOCK(&so->so_snd);
1198		so->so_snd.sb_flags &= ~SB_AIO;
1199		SOCKBUF_UNLOCK(&so->so_snd);
1200	} else {
1201		opcode = LIO_READ;
1202		SOCKBUF_LOCK(&so->so_rcv);
1203		so->so_rcv.sb_flags &= ~SB_AIO;
1204		SOCKBUF_UNLOCK(&so->so_rcv);
1205	}
1206
1207	mtx_lock(&aio_sock_mtx);
1208	TAILQ_FOREACH_SAFE(cb, &so->so_aiojobq, list, cbn) {
1209		if (opcode == cb->uaiocb.aio_lio_opcode) {
1210			if (cb->jobstate != JOBST_JOBQSOCK)
1211				panic("invalid queue value");
1212			p = cb->userproc;
1213			ki = p->p_aioinfo;
1214			TAILQ_REMOVE(&so->so_aiojobq, cb, list);
1215			PROC_LOCK(p);
1216			TAILQ_REMOVE(&ki->kaio_sockqueue, cb, plist);
1217			/*
1218			 * XXX check KAIO_RUNDOWN, and don't put on
1219			 * jobqueue if it was set.
1220			 */
1221			TAILQ_INSERT_TAIL(&ki->kaio_jobqueue, cb, plist);
1222			cb->jobstate = JOBST_JOBQGLOBAL;
1223			mtx_lock(&aio_job_mtx);
1224			TAILQ_INSERT_TAIL(&aio_jobs, cb, list);
1225			mtx_unlock(&aio_job_mtx);
1226			PROC_UNLOCK(p);
1227			wakecount++;
1228		}
1229	}
1230	mtx_unlock(&aio_sock_mtx);
1231
1232	while (wakecount--) {
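	/* Wake one idle aiod for each job moved back onto the run queue. */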
1233		mtx_lock(&aio_job_mtx);
1234		if ((aiop = TAILQ_FIRST(&aio_freeproc)) != NULL) {
1235			TAILQ_REMOVE(&aio_freeproc, aiop, list);
1236			aiop->aiothreadflags &= ~AIOP_FREE;
1237			wakeup(aiop->aiothread);
1238		}
1239		mtx_unlock(&aio_job_mtx);
1240	}
1241}
1242
1243/*
1244 * Queue a new AIO request.  Choosing either the threaded or direct physio VCHR
1245 * technique is done in this code.
1246 */
1247static int
1248aio_aqueue(struct thread *td, struct aiocb *job, struct aioliojob *lj,
1249	int type, int oldsigev)
1250{
1251	struct proc *p = td->td_proc;
1252	struct file *fp;
1253	struct socket *so;
1254	struct aiocblist *aiocbe;
1255	struct aiothreadlist *aiop;
1256	struct kaioinfo *ki;
1257	struct kevent kev;
1258	struct kqueue *kq;
1259	struct file *kq_fp;
1260	struct sockbuf *sb;
1261	int opcode;
1262	int error;
1263	int fd;
1264	int jid;
1265
1266	if (p->p_aioinfo == NULL)
1267		aio_init_aioinfo(p);
1268
1269	ki = p->p_aioinfo;
1270
1271	suword(&job->_aiocb_private.status, -1);
1272	suword(&job->_aiocb_private.error, 0);
1273	suword(&job->_aiocb_private.kernelinfo, -1);
1274
1275	if (num_queue_count >= max_queue_count ||
1276	    ki->kaio_count >= ki->kaio_qallowed_count) {
1277		suword(&job->_aiocb_private.error, EAGAIN);
1278		return (EAGAIN);
1279	}
1280
1281	aiocbe = uma_zalloc(aiocb_zone, M_WAITOK | M_ZERO);
1282	aiocbe->inputcharge = 0;
1283	aiocbe->outputcharge = 0;
1284	knlist_init(&aiocbe->klist, &p->p_mtx, NULL, NULL, NULL);
1285
1286	if (oldsigev) {
1287		bzero(&aiocbe->uaiocb, sizeof(struct aiocb));
1288		error = copyin(job, &aiocbe->uaiocb, sizeof(struct oaiocb));
1289		bcopy(&aiocbe->uaiocb.__spare__, &aiocbe->uaiocb.aio_sigevent,
1290			sizeof(struct osigevent));
1291	} else {
1292		error = copyin(job, &aiocbe->uaiocb, sizeof(struct aiocb));
1293	}
1294	if (error) {
1295		suword(&job->_aiocb_private.error, error);
1296		uma_zfree(aiocb_zone, aiocbe);
1297		return (error);
1298	}
1299
1300	if (aiocbe->uaiocb.aio_sigevent.sigev_notify != SIGEV_KEVENT &&
1301	    aiocbe->uaiocb.aio_sigevent.sigev_notify != SIGEV_SIGNAL &&
1302	    aiocbe->uaiocb.aio_sigevent.sigev_notify != SIGEV_THREAD_ID &&
1303	    aiocbe->uaiocb.aio_sigevent.sigev_notify != SIGEV_NONE) {
1304		suword(&job->_aiocb_private.error, EINVAL);
1305		uma_zfree(aiocb_zone, aiocbe);
1306		return (EINVAL);
1307	}
1308
1309	if ((aiocbe->uaiocb.aio_sigevent.sigev_notify == SIGEV_SIGNAL ||
1310	     aiocbe->uaiocb.aio_sigevent.sigev_notify == SIGEV_THREAD_ID) &&
1311		!_SIG_VALID(aiocbe->uaiocb.aio_sigevent.sigev_signo)) {
1312		uma_zfree(aiocb_zone, aiocbe);
1313		return (EINVAL);
1314	}
1315
1316	ksiginfo_init(&aiocbe->ksi);
1317
1318	/* Save userspace address of the job info. */
1319	aiocbe->uuaiocb = job;
1320
1321	/* Get the opcode. */
1322	if (type != LIO_NOP)
1323		aiocbe->uaiocb.aio_lio_opcode = type;
1324	opcode = aiocbe->uaiocb.aio_lio_opcode;
1325
1326	/* Fetch the file object for the specified file descriptor. */
1327	fd = aiocbe->uaiocb.aio_fildes;
1328	switch (opcode) {
1329	case LIO_WRITE:
1330		error = fget_write(td, fd, &fp);
1331		break;
1332	case LIO_READ:
1333		error = fget_read(td, fd, &fp);
1334		break;
1335	default:
1336		error = fget(td, fd, &fp);
1337	}
1338	if (error) {
1339		uma_zfree(aiocb_zone, aiocbe);
1340		suword(&job->_aiocb_private.error, EBADF);
1341		return (error);
1342	}
1343	aiocbe->fd_file = fp;
1344
1345	if (aiocbe->uaiocb.aio_offset == -1LL) {
1346		error = EINVAL;
1347		goto aqueue_fail;
1348	}
1349
1350	mtx_lock(&aio_job_mtx);
1351	jid = jobrefid;
1352	if (jobrefid == LONG_MAX)
1353		jobrefid = 1;
1354	else
1355		jobrefid++;
1356	mtx_unlock(&aio_job_mtx);
1357
1358	error = suword(&job->_aiocb_private.kernelinfo, jid);
1359	if (error) {
1360		error = EINVAL;
1361		goto aqueue_fail;
1362	}
1363	aiocbe->uaiocb._aiocb_private.kernelinfo = (void *)(intptr_t)jid;
1364
1365	if (opcode == LIO_NOP) {
1366		fdrop(fp, td);
1367		uma_zfree(aiocb_zone, aiocbe);
1368		return (0);
1369	}
1370	if ((opcode != LIO_READ) && (opcode != LIO_WRITE)) {
1371		error = EINVAL;
1372		goto aqueue_fail;
1373	}
1374
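	/*
	 * If SIGEV_KEVENT notification was requested, register an
	 * EVFILT_AIO knote on the caller's kqueue before the job is
	 * queued.
	 */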
1375	if (aiocbe->uaiocb.aio_sigevent.sigev_notify == SIGEV_KEVENT) {
1376		kev.ident = aiocbe->uaiocb.aio_sigevent.sigev_notify_kqueue;
1377	} else
1378		goto no_kqueue;
1379	error = fget(td, (u_int)kev.ident, &kq_fp);
1380	if (error)
1381		goto aqueue_fail;
1382	if (kq_fp->f_type != DTYPE_KQUEUE) {
1383		fdrop(kq_fp, td);
1384		error = EBADF;
1385		goto aqueue_fail;
1386	}
1387	kq = kq_fp->f_data;
1388	kev.ident = (uintptr_t)aiocbe->uuaiocb;
1389	kev.filter = EVFILT_AIO;
1390	kev.flags = EV_ADD | EV_ENABLE | EV_FLAG1;
1391	kev.data = (intptr_t)aiocbe;
1392	kev.udata = aiocbe->uaiocb.aio_sigevent.sigev_value.sival_ptr;
1393	error = kqueue_register(kq, &kev, td, 1);
1394	fdrop(kq_fp, td);
1395aqueue_fail:
1396	if (error) {
1397		fdrop(fp, td);
1398		uma_zfree(aiocb_zone, aiocbe);
1399		suword(&job->_aiocb_private.error, error);
1400		goto done;
1401	}
1402no_kqueue:
1403
1404	suword(&job->_aiocb_private.error, EINPROGRESS);
1405	aiocbe->uaiocb._aiocb_private.error = EINPROGRESS;
1406	aiocbe->userproc = p;
1407	aiocbe->cred = crhold(td->td_ucred);
1408	aiocbe->jobflags = 0;
1409	aiocbe->lio = lj;
1410
1411	if (fp->f_type == DTYPE_SOCKET) {
1412		/*
1413		 * Alternate queueing for socket ops: Reach down into the
1414		 * descriptor to get the socket data.  Then check to see if the
1415		 * socket is ready to be read or written (based on the requested
1416		 * operation).
1417		 *
1418		 * If it is not ready for io, then queue the aiocbe on the
1419		 * socket, and set the flags so we get a call when sbnotify()
1420		 * happens.
1421		 *
1422		 * Note if opcode is neither LIO_WRITE nor LIO_READ we lock
1423		 * and unlock the snd sockbuf for no reason.
1424		 */
1425		so = fp->f_data;
1426		sb = (opcode == LIO_READ) ? &so->so_rcv : &so->so_snd;
1427		SOCKBUF_LOCK(sb);
1428		if (((opcode == LIO_READ) && (!soreadable(so))) || ((opcode ==
1429		    LIO_WRITE) && (!sowriteable(so)))) {
1430			mtx_lock(&aio_sock_mtx);
1431			TAILQ_INSERT_TAIL(&so->so_aiojobq, aiocbe, list);
1432			mtx_unlock(&aio_sock_mtx);
1433
1434			sb->sb_flags |= SB_AIO;
1435			PROC_LOCK(p);
1436			TAILQ_INSERT_TAIL(&ki->kaio_sockqueue, aiocbe, plist);
1437			TAILQ_INSERT_TAIL(&ki->kaio_all, aiocbe, allist);
1438			aiocbe->jobstate = JOBST_JOBQSOCK;
1439			ki->kaio_count++;
1440			if (lj)
1441				lj->lioj_count++;
1442			PROC_UNLOCK(p);
1443			SOCKBUF_UNLOCK(sb);
1444			atomic_add_int(&num_queue_count, 1);
1445			error = 0;
1446			goto done;
1447		}
1448		SOCKBUF_UNLOCK(sb);
1449	}
1450
1451	if ((error = aio_qphysio(p, aiocbe)) == 0)
1452		goto done;
1453#if 0
1454	if (error > 0) {
1455		aiocbe->uaiocb._aiocb_private.error = error;
1456		suword(&job->_aiocb_private.error, error);
1457		goto done;
1458	}
1459#endif
1460	/* No buffer for daemon I/O. */
1461	aiocbe->bp = NULL;
1462
1463	PROC_LOCK(p);
1464	ki->kaio_count++;
1465	if (lj)
1466		lj->lioj_count++;
1467	TAILQ_INSERT_TAIL(&ki->kaio_jobqueue, aiocbe, plist);
1468	TAILQ_INSERT_TAIL(&ki->kaio_all, aiocbe, allist);
1469
1470	mtx_lock(&aio_job_mtx);
1471	TAILQ_INSERT_TAIL(&aio_jobs, aiocbe, list);
1472	aiocbe->jobstate = JOBST_JOBQGLOBAL;
1473	PROC_UNLOCK(p);
1474
1475	atomic_add_int(&num_queue_count, 1);
1476
1477	/*
1478	 * If we don't have a free AIO process, and we are below our quota, then
1479	 * start one.  Otherwise, depend on the subsequent I/O completions to
1480	 * pick up this job.  If we don't successfully create the new process
1481	 * (thread) due to resource issues, we return an error for now (EAGAIN),
1482	 * which is likely not the correct thing to do.
1483	 */
1484retryproc:
1485	error = 0;
1486	if ((aiop = TAILQ_FIRST(&aio_freeproc)) != NULL) {
1487		TAILQ_REMOVE(&aio_freeproc, aiop, list);
1488		aiop->aiothreadflags &= ~AIOP_FREE;
1489		wakeup(aiop->aiothread);
1490	} else if (((num_aio_resv_start + num_aio_procs) < max_aio_procs) &&
1491	    ((ki->kaio_active_count + num_aio_resv_start) <
1492	    ki->kaio_maxactive_count)) {
1493		num_aio_resv_start++;
1494		mtx_unlock(&aio_job_mtx);
1495		error = aio_newproc(&num_aio_resv_start);
1496		mtx_lock(&aio_job_mtx);
1497		if (error) {
1498			num_aio_resv_start--;
1499			goto retryproc;
1500		}
1501	}
1502	mtx_unlock(&aio_job_mtx);
1503
1504done:
1505	return (error);
1506}
1507
1508/*
1509 * Support the aio_return system call; as a side effect, kernel resources
1510 * are released.
1511 */
1512int
1513aio_return(struct thread *td, struct aio_return_args *uap)
1514{
1515	struct proc *p = td->td_proc;
1516	struct aiocblist *cb;
1517	struct aiocb *uaiocb;
1518	struct kaioinfo *ki;
1519	int status, error;
1520
1521	ki = p->p_aioinfo;
1522	if (ki == NULL)
1523		return (EINVAL);
1524	uaiocb = uap->aiocbp;
1525	PROC_LOCK(p);
1526	TAILQ_FOREACH(cb, &ki->kaio_done, plist) {
1527		if (cb->uuaiocb == uaiocb)
1528			break;
1529	}
1530	if (cb != NULL) {
1531		MPASS(cb->jobstate == JOBST_JOBFINISHED);
1532		status = cb->uaiocb._aiocb_private.status;
1533		error = cb->uaiocb._aiocb_private.error;
1534		td->td_retval[0] = status;
1535		if (cb->uaiocb.aio_lio_opcode == LIO_WRITE) {
1536			p->p_stats->p_ru.ru_oublock +=
1537			    cb->outputcharge;
1538			cb->outputcharge = 0;
1539		} else if (cb->uaiocb.aio_lio_opcode == LIO_READ) {
1540			p->p_stats->p_ru.ru_inblock += cb->inputcharge;
1541			cb->inputcharge = 0;
1542		}
1543		aio_free_entry(cb);
1544		suword(&uaiocb->_aiocb_private.error, error);
1545		suword(&uaiocb->_aiocb_private.status, status);
1546		error = 0;
1547	} else
1548		error = EINVAL;
1549	PROC_UNLOCK(p);
1550	return (error);
1551}
1552
1553/*
1554 * Allow a process to wakeup when any of the I/O requests are completed.
1555 */
1556int
1557aio_suspend(struct thread *td, struct aio_suspend_args *uap)
1558{
1559	struct proc *p = td->td_proc;
1560	struct timeval atv;
1561	struct timespec ts;
1562	struct aiocb *const *cbptr, *cbp;
1563	struct kaioinfo *ki;
1564	struct aiocblist *cb, *cbfirst;
1565	struct aiocb **ujoblist;
1566	int njoblist;
1567	int error;
1568	int timo;
1569	int i;
1570
1571	if (uap->nent < 0 || uap->nent > AIO_LISTIO_MAX)
1572		return (EINVAL);
1573
1574	timo = 0;
1575	if (uap->timeout) {
1576		/* Get timespec struct. */
1577		if ((error = copyin(uap->timeout, &ts, sizeof(ts))) != 0)
1578			return (error);
1579
1580		if (ts.tv_nsec < 0 || ts.tv_nsec >= 1000000000)
1581			return (EINVAL);
1582
1583		TIMESPEC_TO_TIMEVAL(&atv, &ts);
1584		if (itimerfix(&atv))
1585			return (EINVAL);
1586		timo = tvtohz(&atv);
1587	}
1588
1589	ki = p->p_aioinfo;
1590	if (ki == NULL)
1591		return (EAGAIN);
1592
1593	njoblist = 0;
1594	ujoblist = uma_zalloc(aiol_zone, M_WAITOK);
1595	cbptr = uap->aiocbp;
1596
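	/* Collect the non-NULL user aiocb pointers to wait on. */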
1597	for (i = 0; i < uap->nent; i++) {
1598		cbp = (struct aiocb *)(intptr_t)fuword(&cbptr[i]);
1599		if (cbp == 0)
1600			continue;
1601		ujoblist[njoblist] = cbp;
1602		njoblist++;
1603	}
1604
1605	if (njoblist == 0) {
1606		uma_zfree(aiol_zone, ujoblist);
1607		return (0);
1608	}
1609
1610	PROC_LOCK(p);
1611	for (;;) {
1612		cbfirst = NULL;
1613		error = 0;
1614		TAILQ_FOREACH(cb, &ki->kaio_all, allist) {
1615			for (i = 0; i < njoblist; i++) {
1616				if (cb->uuaiocb == ujoblist[i]) {
1617					if (cbfirst == NULL)
1618						cbfirst = cb;
1619					if (cb->jobstate == JOBST_JOBFINISHED)
1620						goto RETURN;
1621				}
1622			}
1623		}
1624		/* All tasks were finished. */
1625		if (cbfirst == NULL)
1626			break;
1627
1628		ki->kaio_flags |= KAIO_WAKEUP;
1629		error = msleep(&p->p_aioinfo, &p->p_mtx, PRIBIO | PCATCH,
1630		    "aiospn", timo);
1631		if (error == ERESTART)
1632			error = EINTR;
1633		if (error)
1634			break;
1635	}
1636RETURN:
1637	PROC_UNLOCK(p);
1638	uma_zfree(aiol_zone, ujoblist);
1639	return (error);
1640}
1641
1642/*
1643 * aio_cancel cancels any non-physio aio operations not currently in
1644 * progress.
1645 */
1646int
1647aio_cancel(struct thread *td, struct aio_cancel_args *uap)
1648{
1649	struct proc *p = td->td_proc;
1650	struct kaioinfo *ki;
1651	struct aiocblist *cbe, *cbn;
1652	struct file *fp;
1653	struct socket *so;
1654	int error;
1655	int cancelled = 0;
1656	int notcancelled = 0;
1657	struct vnode *vp;
1658
1659	/* Lookup file object. */
1660	error = fget(td, uap->fd, &fp);
1661	if (error)
1662		return (error);
1663
1664	ki = p->p_aioinfo;
1665	if (ki == NULL)
1666		goto done;
1667
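	/*
	 * Physio requests on disk devices are not cancellable once
	 * queued, so report AIO_NOTCANCELED without scanning.
	 */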
1668	if (fp->f_type == DTYPE_VNODE) {
1669		vp = fp->f_vnode;
1670		if (vn_isdisk(vp, &error)) {
1671			fdrop(fp, td);
1672			td->td_retval[0] = AIO_NOTCANCELED;
1673			return (0);
1674		}
1675	} else if (fp->f_type == DTYPE_SOCKET) {
1676		so = fp->f_data;
1677		mtx_lock(&aio_sock_mtx);
1678		TAILQ_FOREACH_SAFE(cbe, &so->so_aiojobq, list, cbn) {
1679			if (cbe->userproc == p &&
1680			    (uap->aiocbp == NULL ||
1681			     uap->aiocbp == cbe->uuaiocb)) {
1682				TAILQ_REMOVE(&so->so_aiojobq, cbe, list);
1683				PROC_LOCK(p);
1684				TAILQ_REMOVE(&ki->kaio_sockqueue, cbe, plist);
1685				cbe->jobstate = JOBST_JOBRUNNING;
1686				cbe->uaiocb._aiocb_private.status = -1;
1687				cbe->uaiocb._aiocb_private.error = ECANCELED;
1688				aio_bio_done_notify(p, cbe, DONE_QUEUE);
1689				PROC_UNLOCK(p);
1690				cancelled++;
1691				if (uap->aiocbp != NULL)
1692					break;
1693			}
1694		}
1695		mtx_unlock(&aio_sock_mtx);
1696		if (cancelled && uap->aiocbp != NULL) {
1697			fdrop(fp, td);
1698			td->td_retval[0] = AIO_CANCELED;
1699			return (0);
1700		}
1701	}
1702
1703	PROC_LOCK(p);
1704	TAILQ_FOREACH_SAFE(cbe, &ki->kaio_jobqueue, plist, cbn) {
1705		if ((uap->fd == cbe->uaiocb.aio_fildes) &&
1706		    ((uap->aiocbp == NULL) ||
1707		     (uap->aiocbp == cbe->uuaiocb))) {
1708			mtx_lock(&aio_job_mtx);
1709			if (cbe->jobstate == JOBST_JOBQGLOBAL) {
1710				TAILQ_REMOVE(&aio_jobs, cbe, list);
1711				mtx_unlock(&aio_job_mtx);
1712				TAILQ_REMOVE(&ki->kaio_jobqueue, cbe, plist);
1713				cbe->uaiocb._aiocb_private.status = -1;
1714				cbe->uaiocb._aiocb_private.error = ECANCELED;
1715				aio_bio_done_notify(p, cbe, DONE_QUEUE);
1716				cancelled++;
1717			} else {
1718				mtx_unlock(&aio_job_mtx);
1719				notcancelled++;
1720			}
1721		}
1722	}
1723	PROC_UNLOCK(p);
1724
1725done:
1726	fdrop(fp, td);
1727	if (notcancelled) {
1728		td->td_retval[0] = AIO_NOTCANCELED;
1729		return (0);
1730	}
1731	if (cancelled) {
1732		td->td_retval[0] = AIO_CANCELED;
1733		return (0);
1734	}
1735	td->td_retval[0] = AIO_ALLDONE;
1736
1737	return (0);
1738}
1739
1740/*
1741 * aio_error is implemented at the kernel level for compatibility purposes only.
1742 * For a user mode async implementation, it would be best to do it in a userland
1743 * subroutine.
1744 */
1745int
1746aio_error(struct thread *td, struct aio_error_args *uap)
1747{
1748	struct proc *p = td->td_proc;
1749	struct aiocblist *cb;
1750	struct kaioinfo *ki;
1751	int status;
1752
1753	ki = p->p_aioinfo;
1754	if (ki == NULL) {
1755		td->td_retval[0] = EINVAL;
1756		return (0);
1757	}
1758
1759	PROC_LOCK(p);
1760	TAILQ_FOREACH(cb, &ki->kaio_all, allist) {
1761		if (cb->uuaiocb == uap->aiocbp) {
1762			if (cb->jobstate == JOBST_JOBFINISHED)
1763				td->td_retval[0] =
1764					cb->uaiocb._aiocb_private.error;
1765			else
1766				td->td_retval[0] = EINPROGRESS;
1767			PROC_UNLOCK(p);
1768			return (0);
1769		}
1770	}
1771	PROC_UNLOCK(p);
1772
1773	/*
1774	 * Hack for failure of aio_aqueue.
1775	 */
1776	status = fuword(&uap->aiocbp->_aiocb_private.status);
1777	if (status == -1) {
1778		td->td_retval[0] = fuword(&uap->aiocbp->_aiocb_private.error);
1779		return (0);
1780	}
1781
1782	td->td_retval[0] = EINVAL;
1783	return (0);
1784}
1785
1786/* syscall - asynchronous read from a file (REALTIME) */
1787int
1788oaio_read(struct thread *td, struct oaio_read_args *uap)
1789{
1790
1791	return aio_aqueue(td, (struct aiocb *)uap->aiocbp, NULL, LIO_READ, 1);
1792}
1793
1794int
1795aio_read(struct thread *td, struct aio_read_args *uap)
1796{
1797
1798	return aio_aqueue(td, uap->aiocbp, NULL, LIO_READ, 0);
1799}
1800
1801/* syscall - asynchronous write to a file (REALTIME) */
1802int
1803oaio_write(struct thread *td, struct oaio_write_args *uap)
1804{
1805
1806	return aio_aqueue(td, (struct aiocb *)uap->aiocbp, NULL, LIO_WRITE, 1);
1807}
1808
1809int
1810aio_write(struct thread *td, struct aio_write_args *uap)
1811{
1812
1813	return aio_aqueue(td, uap->aiocbp, NULL, LIO_WRITE, 0);
1814}
1815
1816/* syscall - list directed I/O (REALTIME) */
1817int
1818olio_listio(struct thread *td, struct olio_listio_args *uap)
1819{
1820	return do_lio_listio(td, (struct lio_listio_args *)uap, 1);
1821}
1822
1823/* syscall - list directed I/O (REALTIME) */
1824int
1825lio_listio(struct thread *td, struct lio_listio_args *uap)
1826{
1827	return do_lio_listio(td, uap, 0);
1828}
1829
1830static int
1831do_lio_listio(struct thread *td, struct lio_listio_args *uap, int oldsigev)
1832{
1833	struct proc *p = td->td_proc;
1834	struct aiocb *iocb, * const *cbptr;
1835	struct kaioinfo *ki;
1836	struct aioliojob *lj;
1837	struct kevent kev;
1838	struct kqueue * kq;
1839	struct file *kq_fp;
1840	int nent;
1841	int error;
1842	int nerror;
1843	int i;
1844
1845	if ((uap->mode != LIO_NOWAIT) && (uap->mode != LIO_WAIT))
1846		return (EINVAL);
1847
1848	nent = uap->nent;
1849	if (nent < 0 || nent > AIO_LISTIO_MAX)
1850		return (EINVAL);
1851
1852	if (p->p_aioinfo == NULL)
1853		aio_init_aioinfo(p);
1854
1855	ki = p->p_aioinfo;
1856
1857	lj = uma_zalloc(aiolio_zone, M_WAITOK);
1858	lj->lioj_flags = 0;
1859	lj->lioj_count = 0;
1860	lj->lioj_finished_count = 0;
1861	knlist_init(&lj->klist, &p->p_mtx, NULL, NULL, NULL);
1862	ksiginfo_init(&lj->lioj_ksi);
1863
1864	/*
1865	 * Setup signal.
1866	 */
1867	if (uap->sig && (uap->mode == LIO_NOWAIT)) {
1868		bzero(&lj->lioj_signal, sizeof(lj->lioj_signal));
1869		error = copyin(uap->sig, &lj->lioj_signal,
1870				oldsigev ? sizeof(struct osigevent) :
1871					   sizeof(struct sigevent));
1872		if (error) {
1873			uma_zfree(aiolio_zone, lj);
1874			return (error);
1875		}
1876
1877		if (lj->lioj_signal.sigev_notify == SIGEV_KEVENT) {
1878			/* Assume only new style KEVENT */
1879			error = fget(td, lj->lioj_signal.sigev_notify_kqueue,
1880				&kq_fp);
1881			if (error) {
1882				uma_zfree(aiolio_zone, lj);
1883				return (error);
1884			}
1885			if (kq_fp->f_type != DTYPE_KQUEUE) {
1886				fdrop(kq_fp, td);
1887				uma_zfree(aiolio_zone, lj);
1888				return (EBADF);
1889			}
1890			kq = (struct kqueue *)kq_fp->f_data;
1891			kev.filter = EVFILT_LIO;
1892			kev.flags = EV_ADD | EV_ENABLE | EV_FLAG1;
1893			kev.ident = (uintptr_t)lj; /* something unique */
1894			kev.data = (intptr_t)lj;
1895			/* pass user defined sigval data */
1896			kev.udata = lj->lioj_signal.sigev_value.sival_ptr;
1897			error = kqueue_register(kq, &kev, td, 1);
1898			fdrop(kq_fp, td);
1899			if (error) {
1900				uma_zfree(aiolio_zone, lj);
1901				return (error);
1902			}
1903		} else if (lj->lioj_signal.sigev_notify == SIGEV_NONE) {
1904			;
1905		} else if (lj->lioj_signal.sigev_notify == SIGEV_SIGNAL ||
1906			   lj->lioj_signal.sigev_notify == SIGEV_THREAD_ID) {
1907				if (!_SIG_VALID(lj->lioj_signal.sigev_signo)) {
1908					uma_zfree(aiolio_zone, lj);
1909					return EINVAL;
1910				}
1911				lj->lioj_flags |= LIOJ_SIGNAL;
1912		} else {
1913			uma_zfree(aiolio_zone, lj);
1914			return EINVAL;
1915		}
1916	}
1917
1918	PROC_LOCK(p);
1919	TAILQ_INSERT_TAIL(&ki->kaio_liojoblist, lj, lioj_list);
1920	/*
1921	 * Take an extra reference so that the lio cannot be freed by
1922	 * other threads doing aio_waitcomplete() or aio_return(), and
1923	 * so that no completion event is sent until we have queued all
1924	 * of the requests.
1925	 */
1926	lj->lioj_count = 1;
1927	PROC_UNLOCK(p);
1928
1929	/*
1930	 * Get pointers to the list of I/O requests.
1931	 */
1932	nerror = 0;
1933	cbptr = uap->acb_list;
1934	for (i = 0; i < uap->nent; i++) {
1935		iocb = (struct aiocb *)(intptr_t)fuword(&cbptr[i]);
1936		if (((intptr_t)iocb != -1) && ((intptr_t)iocb != 0)) {
1937			error = aio_aqueue(td, iocb, lj, 0, oldsigev);
1938			if (error != 0)
1939				nerror++;
1940		}
1941	}

	error = 0;
	PROC_LOCK(p);
	if (uap->mode == LIO_WAIT) {
		while (lj->lioj_count - 1 != lj->lioj_finished_count) {
			ki->kaio_flags |= KAIO_WAKEUP;
			error = msleep(&p->p_aioinfo, &p->p_mtx,
			    PRIBIO | PCATCH, "aiospn", 0);
			if (error == ERESTART)
				error = EINTR;
			if (error)
				break;
		}
	} else {
		if (lj->lioj_count - 1 == lj->lioj_finished_count) {
			if (lj->lioj_signal.sigev_notify == SIGEV_KEVENT) {
				lj->lioj_flags |= LIOJ_KEVENT_POSTED;
				KNOTE_LOCKED(&lj->klist, 1);
			}
			if ((lj->lioj_flags & (LIOJ_SIGNAL|LIOJ_SIGNAL_POSTED))
			    == LIOJ_SIGNAL
			    && (lj->lioj_signal.sigev_notify == SIGEV_SIGNAL ||
			    lj->lioj_signal.sigev_notify == SIGEV_THREAD_ID)) {
				aio_sendsig(p, &lj->lioj_signal,
					    &lj->lioj_ksi);
				lj->lioj_flags |= LIOJ_SIGNAL_POSTED;
			}
		}
	}
	lj->lioj_count--;
	if (lj->lioj_count == 0) {
		TAILQ_REMOVE(&ki->kaio_liojoblist, lj, lioj_list);
		knlist_delete(&lj->klist, curthread, 1);
		sigqueue_take(&lj->lioj_ksi);
		PROC_UNLOCK(p);
		uma_zfree(aiolio_zone, lj);
	} else
		PROC_UNLOCK(p);

	if (nerror)
		return (EIO);
	return (error);
}

/*
 * Called from an interrupt thread for physio; we must return as quickly as
 * possible, so the heavier completion work is deferred to the biohelper task.
 */
static void
aio_physwakeup(struct buf *bp)
{
	struct aiocblist *aiocbe;

	aiocbe = (struct aiocblist *)bp->b_caller1;
	taskqueue_enqueue(taskqueue_aiod_bio, &aiocbe->biotask);
}

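/*
 * Sketch of the wiring this path relies on (set up by the submission side
 * before the I/O is started; shown here for illustration only, the exact
 * code may differ):
 *
 *	TASK_INIT(&aiocbe->biotask, 0, biohelper, aiocbe);
 *	bp->b_caller1 = (void *)aiocbe;
 *	bp->b_iodone = aio_physwakeup;
 *
 * aio_physwakeup() only enqueues the task, keeping the interrupt thread's
 * completion path short; biohelper() below then does the real work from
 * the taskqueue_aiod_bio thread.
 */
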
/*
 * Task routine that performs the heavier completion work: resource
 * accounting, process wakeup, and signal/kevent delivery.
 */
static void
biohelper(void *context, int pending)
{
	struct aiocblist *aiocbe = context;
	struct buf *bp;
	struct proc *userp;
	int nblks;

	bp = aiocbe->bp;
	userp = aiocbe->userproc;
	PROC_LOCK(userp);
	/* Adjust the byte count for any short transfer. */
	aiocbe->uaiocb._aiocb_private.status -= bp->b_resid;
	aiocbe->uaiocb._aiocb_private.error = 0;
	if (bp->b_ioflags & BIO_ERROR)
		aiocbe->uaiocb._aiocb_private.error = bp->b_error;
	/* Record block charges; they are billed to rusage when reaped. */
	nblks = btodb(aiocbe->uaiocb.aio_nbytes);
	if (aiocbe->uaiocb.aio_lio_opcode == LIO_WRITE)
		aiocbe->outputcharge += nblks;
	else
		aiocbe->inputcharge += nblks;
	aiocbe->bp = NULL;
	TAILQ_REMOVE(&userp->p_aioinfo->kaio_bufqueue, aiocbe, plist);
	aio_bio_done_notify(userp, aiocbe, DONE_BUF);
	PROC_UNLOCK(userp);

	/* Release mapping into kernel space. */
	vunmapbuf(bp);
	relpbuf(bp, NULL);
	atomic_subtract_int(&num_buf_aio, 1);
}

/* syscall - wait for the next completion of an aio request */
int
aio_waitcomplete(struct thread *td, struct aio_waitcomplete_args *uap)
{
	struct proc *p = td->td_proc;
	struct timeval atv;
	struct timespec ts;
	struct kaioinfo *ki;
	struct aiocblist *cb;
	struct aiocb *uuaiocb;
	int error, status, timo;

	suword(uap->aiocbp, (long)NULL);

	timo = 0;
	if (uap->timeout) {
		/* Get timespec struct. */
		error = copyin(uap->timeout, &ts, sizeof(ts));
		if (error)
			return (error);

		if ((ts.tv_nsec < 0) || (ts.tv_nsec >= 1000000000))
			return (EINVAL);

		TIMESPEC_TO_TIMEVAL(&atv, &ts);
		if (itimerfix(&atv))
			return (EINVAL);
		timo = tvtohz(&atv);
	}

	if (p->p_aioinfo == NULL)
		aio_init_aioinfo(p);
	ki = p->p_aioinfo;

	error = 0;
	cb = NULL;
	PROC_LOCK(p);
	while ((cb = TAILQ_FIRST(&ki->kaio_done)) == NULL) {
		ki->kaio_flags |= KAIO_WAKEUP;
		error = msleep(&p->p_aioinfo, &p->p_mtx, PRIBIO | PCATCH,
		    "aiowc", timo);
		if (error == ERESTART)
			error = EINTR;
		if (error)
			break;
	}

	if (cb != NULL) {
		MPASS(cb->jobstate == JOBST_JOBFINISHED);
		uuaiocb = cb->uuaiocb;
		status = cb->uaiocb._aiocb_private.status;
		error = cb->uaiocb._aiocb_private.error;
		td->td_retval[0] = status;
		if (cb->uaiocb.aio_lio_opcode == LIO_WRITE) {
			p->p_stats->p_ru.ru_oublock += cb->outputcharge;
			cb->outputcharge = 0;
		} else if (cb->uaiocb.aio_lio_opcode == LIO_READ) {
			p->p_stats->p_ru.ru_inblock += cb->inputcharge;
			cb->inputcharge = 0;
		}
		aio_free_entry(cb);
		PROC_UNLOCK(p);
		suword(uap->aiocbp, (long)uuaiocb);
		suword(&uuaiocb->_aiocb_private.error, error);
		suword(&uuaiocb->_aiocb_private.status, status);
	} else
		PROC_UNLOCK(p);

	return (error);
}
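
/*
 * Userland usage sketch for aio_waitcomplete(2) (illustrative only): it
 * dequeues the process's next finished request, stores a pointer to the
 * user's aiocb, and returns what aio_return(2) would have returned for it.
 * A NULL timeout blocks indefinitely; a zeroed timespec polls.
 *
 *	struct aiocb *done;
 *	int n;
 *
 *	n = aio_waitcomplete(&done, NULL);	// wait for any completion
 *	if (n == -1)
 *		err(1, "aio_waitcomplete");
 *	// 'done' now points at the aiocb that finished; n is its status
 */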

/* kqueue attach function */
static int
filt_aioattach(struct knote *kn)
{
	struct aiocblist *aiocbe = (struct aiocblist *)kn->kn_sdata;

	/*
	 * The aiocbe pointer must be validated before using it, so
	 * registration is restricted to the kernel; the user cannot
	 * set EV_FLAG1.
	 */
	if ((kn->kn_flags & EV_FLAG1) == 0)
		return (EPERM);
	kn->kn_flags &= ~EV_FLAG1;

	knlist_add(&aiocbe->klist, kn, 0);

	return (0);
}

/* kqueue detach function */
static void
filt_aiodetach(struct knote *kn)
{
	struct aiocblist *aiocbe = (struct aiocblist *)kn->kn_sdata;

	if (!knlist_empty(&aiocbe->klist))
		knlist_remove(&aiocbe->klist, kn, 0);
}

/* kqueue filter function */
/*ARGSUSED*/
static int
filt_aio(struct knote *kn, long hint)
{
	struct aiocblist *aiocbe = (struct aiocblist *)kn->kn_sdata;

	kn->kn_data = aiocbe->uaiocb._aiocb_private.error;
	if (aiocbe->jobstate != JOBST_JOBFINISHED)
		return (0);
	kn->kn_flags |= EV_EOF;
	return (1);
}
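
/*
 * Userland sketch of the EVFILT_AIO path these filters implement
 * (illustrative only): the request asks for kqueue notification through its
 * sigevent, and the completed aiocb is recovered from the returned event.
 *
 *	struct aiocb cb;
 *	struct kevent ev;
 *	int kq = kqueue();
 *
 *	memset(&cb, 0, sizeof(cb));
 *	// ... fill in aio_fildes, aio_buf, aio_nbytes, aio_offset ...
 *	cb.aio_sigevent.sigev_notify = SIGEV_KEVENT;
 *	cb.aio_sigevent.sigev_notify_kqueue = kq;
 *	cb.aio_sigevent.sigev_value.sival_ptr = &cb;
 *	if (aio_read(&cb) == -1)
 *		err(1, "aio_read");
 *	if (kevent(kq, NULL, 0, &ev, 1, NULL) == 1) {
 *		// completed: ev.udata carries sival_ptr, i.e. &cb
 *		(void)aio_return((struct aiocb *)ev.udata);
 *	}
 */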

/* kqueue attach function */
static int
filt_lioattach(struct knote *kn)
{
	struct aioliojob *lj = (struct aioliojob *)kn->kn_sdata;

	/*
	 * The aioliojob pointer must be validated before using it, so
	 * registration is restricted to the kernel; the user cannot
	 * set EV_FLAG1.
	 */
	if ((kn->kn_flags & EV_FLAG1) == 0)
		return (EPERM);
	kn->kn_flags &= ~EV_FLAG1;

	knlist_add(&lj->klist, kn, 0);

	return (0);
}

/* kqueue detach function */
static void
filt_liodetach(struct knote *kn)
{
	struct aioliojob *lj = (struct aioliojob *)kn->kn_sdata;

	if (!knlist_empty(&lj->klist))
		knlist_remove(&lj->klist, kn, 0);
}

/* kqueue filter function */
/*ARGSUSED*/
static int
filt_lio(struct knote *kn, long hint)
{
	struct aioliojob *lj = (struct aioliojob *)kn->kn_sdata;

	return (lj->lioj_flags & LIOJ_KEVENT_POSTED);
}
