1/*-
2 * Copyright (c) 1997 John S. Dyson.  All rights reserved.
3 *
4 * Redistribution and use in source and binary forms, with or without
5 * modification, are permitted provided that the following conditions
6 * are met:
7 * 1. Redistributions of source code must retain the above copyright
8 *    notice, this list of conditions and the following disclaimer.
9 * 2. John S. Dyson's name may not be used to endorse or promote products
10 *    derived from this software without specific prior written permission.
11 *
12 * DISCLAIMER:  This code isn't warranted to do anything useful.  Anything
13 * bad that happens because of using this software isn't the responsibility
14 * of the author.  This software is distributed AS-IS.
15 */
16
17/*
18 * This file contains support for the POSIX 1003.1B AIO/LIO facility.
19 */
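/*
 * A minimal userland usage sketch, for orientation only (not part of this
 * file; assumes fd and buf are already set up, error handling trimmed):
 *
 *	struct aiocb cb;
 *	ssize_t n;
 *
 *	memset(&cb, 0, sizeof(cb));
 *	cb.aio_fildes = fd;
 *	cb.aio_buf = buf;
 *	cb.aio_nbytes = sizeof(buf);
 *	cb.aio_offset = 0;
 *	if (aio_read(&cb) == -1)
 *		err(1, "aio_read");
 *	while (aio_error(&cb) == EINPROGRESS)
 *		;	(or block in aio_suspend() instead of polling)
 *	n = aio_return(&cb);	(reaps the request, freeing kernel state)
 */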
20
21#include <sys/cdefs.h>
22__FBSDID("$FreeBSD: head/sys/kern/vfs_aio.c 143776 2005-03-18 01:11:39Z jmg $");
23
24#include <sys/param.h>
25#include <sys/systm.h>
26#include <sys/malloc.h>
27#include <sys/bio.h>
28#include <sys/buf.h>
29#include <sys/eventhandler.h>
30#include <sys/sysproto.h>
31#include <sys/filedesc.h>
32#include <sys/kernel.h>
33#include <sys/module.h>
34#include <sys/kthread.h>
35#include <sys/fcntl.h>
36#include <sys/file.h>
37#include <sys/limits.h>
38#include <sys/lock.h>
39#include <sys/mutex.h>
40#include <sys/unistd.h>
41#include <sys/proc.h>
42#include <sys/resourcevar.h>
43#include <sys/signalvar.h>
44#include <sys/protosw.h>
45#include <sys/socketvar.h>
46#include <sys/syscall.h>
47#include <sys/sysent.h>
48#include <sys/sysctl.h>
49#include <sys/sx.h>
50#include <sys/vnode.h>
51#include <sys/conf.h>
52#include <sys/event.h>
53
54#include <posix4/posix4.h>
55#include <vm/vm.h>
56#include <vm/vm_extern.h>
57#include <vm/pmap.h>
58#include <vm/vm_map.h>
59#include <vm/uma.h>
60#include <sys/aio.h>
61
62#include "opt_vfs_aio.h"
63
64NET_NEEDS_GIANT("aio");
65
66/*
67 * Counter for allocating reference ids to new jobs.  Wrapped back to 1
68 * when it reaches LONG_MAX.
69 */
70static	long jobrefid;
71
72#define JOBST_NULL		0x0
73#define JOBST_JOBQGLOBAL	0x2
74#define JOBST_JOBRUNNING	0x3
75#define JOBST_JOBFINISHED	0x4
76#define	JOBST_JOBQBUF		0x5
77#define	JOBST_JOBBFINISHED	0x6
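/*
 * Typical job lifecycle: a queued request sits in JOBST_JOBQGLOBAL (on the
 * global aio_jobs list or on a socket's queue) until a daemon picks it up
 * and moves it to JOBST_JOBRUNNING; it then lands on the per-process done
 * queue as JOBST_JOBFINISHED.  Requests taking the physio fast path use
 * JOBST_JOBQBUF while the buffer is in flight and JOBST_JOBBFINISHED once
 * it completes.  JOBST_NULL marks an entry that is free or being torn down.
 */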
78
79#ifndef MAX_AIO_PER_PROC
80#define MAX_AIO_PER_PROC	32
81#endif
82
83#ifndef MAX_AIO_QUEUE_PER_PROC
84#define MAX_AIO_QUEUE_PER_PROC	256 /* Bigger than AIO_LISTIO_MAX */
85#endif
86
87#ifndef MAX_AIO_PROCS
88#define MAX_AIO_PROCS		32
89#endif
90
91#ifndef MAX_AIO_QUEUE
92#define	MAX_AIO_QUEUE		1024 /* Bigger than AIO_LISTIO_MAX */
93#endif
94
95#ifndef TARGET_AIO_PROCS
96#define TARGET_AIO_PROCS	4
97#endif
98
99#ifndef MAX_BUF_AIO
100#define MAX_BUF_AIO		16
101#endif
102
103#ifndef AIOD_TIMEOUT_DEFAULT
104#define	AIOD_TIMEOUT_DEFAULT	(10 * hz)
105#endif
106
107#ifndef AIOD_LIFETIME_DEFAULT
108#define AIOD_LIFETIME_DEFAULT	(30 * hz)
109#endif
110
111static SYSCTL_NODE(_vfs, OID_AUTO, aio, CTLFLAG_RW, 0, "Async IO management");
112
113static int max_aio_procs = MAX_AIO_PROCS;
114SYSCTL_INT(_vfs_aio, OID_AUTO, max_aio_procs,
115	CTLFLAG_RW, &max_aio_procs, 0,
116	"Maximum number of kernel threads to use for handling async IO ");
117
118static int num_aio_procs = 0;
119SYSCTL_INT(_vfs_aio, OID_AUTO, num_aio_procs,
120	CTLFLAG_RD, &num_aio_procs, 0,
121	"Number of presently active kernel threads for async IO");
122
123/*
124 * The code will adjust the actual number of AIO processes towards this
125 * number when it gets a chance.
126 */
127static int target_aio_procs = TARGET_AIO_PROCS;
128SYSCTL_INT(_vfs_aio, OID_AUTO, target_aio_procs, CTLFLAG_RW, &target_aio_procs,
129	0, "Preferred number of ready kernel threads for async IO");
130
131static int max_queue_count = MAX_AIO_QUEUE;
132SYSCTL_INT(_vfs_aio, OID_AUTO, max_aio_queue, CTLFLAG_RW, &max_queue_count, 0,
133    "Maximum number of aio requests to queue, globally");
134
135static int num_queue_count = 0;
136SYSCTL_INT(_vfs_aio, OID_AUTO, num_queue_count, CTLFLAG_RD, &num_queue_count, 0,
137    "Number of queued aio requests");
138
139static int num_buf_aio = 0;
140SYSCTL_INT(_vfs_aio, OID_AUTO, num_buf_aio, CTLFLAG_RD, &num_buf_aio, 0,
141    "Number of aio requests presently handled by the buf subsystem");
142
143/* Number of async I/O threads in the process of being started */
144/* XXX This should be local to _aio_aqueue() */
145static int num_aio_resv_start = 0;
146
147static int aiod_timeout;
148SYSCTL_INT(_vfs_aio, OID_AUTO, aiod_timeout, CTLFLAG_RW, &aiod_timeout, 0,
149    "Timeout value for synchronous aio operations");
150
151static int aiod_lifetime;
152SYSCTL_INT(_vfs_aio, OID_AUTO, aiod_lifetime, CTLFLAG_RW, &aiod_lifetime, 0,
153    "Maximum lifetime for idle aiod");
154
155static int unloadable = 0;
156SYSCTL_INT(_vfs_aio, OID_AUTO, unloadable, CTLFLAG_RW, &unloadable, 0,
157    "Allow unload of aio (not recommended)");
158
159
160static int max_aio_per_proc = MAX_AIO_PER_PROC;
161SYSCTL_INT(_vfs_aio, OID_AUTO, max_aio_per_proc, CTLFLAG_RW, &max_aio_per_proc,
162    0, "Maximum active aio requests per process (stored in the process)");
163
164static int max_aio_queue_per_proc = MAX_AIO_QUEUE_PER_PROC;
165SYSCTL_INT(_vfs_aio, OID_AUTO, max_aio_queue_per_proc, CTLFLAG_RW,
166    &max_aio_queue_per_proc, 0,
167    "Maximum queued aio requests per process (stored in the process)");
168
169static int max_buf_aio = MAX_BUF_AIO;
170SYSCTL_INT(_vfs_aio, OID_AUTO, max_buf_aio, CTLFLAG_RW, &max_buf_aio, 0,
171    "Maximum buf aio requests per process (stored in the process)");
172
173struct aiocblist {
174	TAILQ_ENTRY(aiocblist) list;	/* List of jobs */
175	TAILQ_ENTRY(aiocblist) plist;	/* List of jobs for proc */
176	int	jobflags;
177	int	jobstate;
178	int	inputcharge;
179	int	outputcharge;
180	struct	callout_handle timeouthandle;
181	struct	buf *bp;		/* Buffer pointer */
182	struct	proc *userproc;		/* User process */ /* Not td! */
183	struct  ucred *cred;		/* Active credential when created */
184	struct	file *fd_file;		/* Pointer to file structure */
185	struct	aio_liojob *lio;	/* Optional lio job */
186	struct	aiocb *uuaiocb;		/* Pointer in userspace of aiocb */
187	struct	knlist klist;		/* list of knotes */
188	struct	aiocb uaiocb;		/* Kernel I/O control block */
189};
190
191/* jobflags */
192#define AIOCBLIST_RUNDOWN	0x4
193#define AIOCBLIST_DONE		0x10
194
195/*
196 * AIO process info
197 */
198#define AIOP_FREE	0x1			/* proc on free queue */
199#define AIOP_SCHED	0x2			/* proc explicitly scheduled */
200
201struct aiothreadlist {
202	int aiothreadflags;			/* AIO proc flags */
203	TAILQ_ENTRY(aiothreadlist) list;	/* List of processes */
204	struct thread *aiothread;		/* The AIO thread */
205};
206
207/*
208 * data-structure for lio signal management
209 */
210struct aio_liojob {
211	int	lioj_flags;
212	int	lioj_buffer_count;
213	int	lioj_buffer_finished_count;
214	int	lioj_queue_count;
215	int	lioj_queue_finished_count;
216	struct	sigevent lioj_signal;	/* signal on all I/O done */
217	TAILQ_ENTRY(aio_liojob) lioj_list;
218	struct	kaioinfo *lioj_ki;
219};
220#define	LIOJ_SIGNAL		0x1	/* signal on all done (lio) */
221#define	LIOJ_SIGNAL_POSTED	0x2	/* signal has been posted */
222
223/*
224 * per process aio data structure
225 */
226struct kaioinfo {
227	int	kaio_flags;		/* per process kaio flags */
228	int	kaio_maxactive_count;	/* maximum number of AIOs */
229	int	kaio_active_count;	/* number of currently used AIOs */
230	int	kaio_qallowed_count;	/* maximum size of AIO queue */
231	int	kaio_queue_count;	/* size of AIO queue */
232	int	kaio_ballowed_count;	/* maximum number of buffers */
233	int	kaio_queue_finished_count; /* number of daemon jobs finished */
234	int	kaio_buffer_count;	/* number of physio buffers */
235	int	kaio_buffer_finished_count; /* count of I/O done */
236	struct 	proc *kaio_p;		/* process that uses this kaio block */
237	TAILQ_HEAD(,aio_liojob) kaio_liojoblist; /* list of lio jobs */
238	TAILQ_HEAD(,aiocblist) kaio_jobqueue;	/* job queue for process */
239	TAILQ_HEAD(,aiocblist) kaio_jobdone;	/* done queue for process */
240	TAILQ_HEAD(,aiocblist) kaio_bufqueue;	/* buffer job queue for process */
241	TAILQ_HEAD(,aiocblist) kaio_bufdone;	/* buffer done queue for process */
242	TAILQ_HEAD(,aiocblist) kaio_sockqueue;	/* queue for aios waiting on sockets */
243};
244
245#define KAIO_RUNDOWN	0x1	/* process is being run down */
246#define KAIO_WAKEUP	0x2	/* wakeup process when there is a significant event */
247
248static TAILQ_HEAD(,aiothreadlist) aio_activeproc;	/* Active daemons */
249static TAILQ_HEAD(,aiothreadlist) aio_freeproc;		/* Idle daemons */
250static TAILQ_HEAD(,aiocblist) aio_jobs;			/* Async job list */
251static TAILQ_HEAD(,aiocblist) aio_bufjobs;		/* Phys I/O job list */
252
253static void	aio_init_aioinfo(struct proc *p);
254static void	aio_onceonly(void);
255static int	aio_free_entry(struct aiocblist *aiocbe);
256static void	aio_process(struct aiocblist *aiocbe);
257static int	aio_newproc(void);
258static int	aio_aqueue(struct thread *td, struct aiocb *job, int type);
259static void	aio_physwakeup(struct buf *bp);
260static void	aio_proc_rundown(void *arg, struct proc *p);
261static int	aio_fphysio(struct aiocblist *aiocbe);
262static int	aio_qphysio(struct proc *p, struct aiocblist *iocb);
263static void	aio_daemon(void *uproc);
264static void	aio_swake_cb(struct socket *, struct sockbuf *);
265static int	aio_unload(void);
266static void	process_signal(void *aioj);
267static int	filt_aioattach(struct knote *kn);
268static void	filt_aiodetach(struct knote *kn);
269static int	filt_aio(struct knote *kn, long hint);
270
271/*
272 * Zones for:
273 * 	kaio	Per process async io info
274 *	aiop	async io thread data
275 *	aiocb	async io jobs
276 *	aiol	list io job pointer - internal to aio_suspend XXX
277 *	aiolio	list io jobs
278 */
279static uma_zone_t kaio_zone, aiop_zone, aiocb_zone, aiol_zone, aiolio_zone;
280
281/* kqueue filters for aio */
282static struct filterops aio_filtops =
283	{ 0, filt_aioattach, filt_aiodetach, filt_aio };
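/*
 * Userland may ask for kevent-based completion notification instead of a
 * signal.  A hedged sketch of the sigevent setup (assumes kq was obtained
 * from kqueue(); see _aio_aqueue() for the kernel side):
 *
 *	cb.aio_sigevent.sigev_notify = SIGEV_KEVENT;
 *	cb.aio_sigevent.sigev_notify_kqueue = kq;
 *	cb.aio_sigevent.sigev_value.sigval_ptr = udata;
 *
 * The completion is then delivered via kevent(), with the event's ident
 * set to the userland aiocb pointer.
 */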
284
285static eventhandler_tag exit_tag, exec_tag;
286
287/*
288 * Main operations function for use as a kernel module.
289 */
290static int
291aio_modload(struct module *module, int cmd, void *arg)
292{
293	int error = 0;
294
295	switch (cmd) {
296	case MOD_LOAD:
297		aio_onceonly();
298		break;
299	case MOD_UNLOAD:
300		error = aio_unload();
301		break;
302	case MOD_SHUTDOWN:
303		break;
304	default:
305		error = EINVAL;
306		break;
307	}
308	return (error);
309}
310
311static moduledata_t aio_mod = {
312	"aio",
313	&aio_modload,
314	NULL
315};
316
317SYSCALL_MODULE_HELPER(aio_return);
318SYSCALL_MODULE_HELPER(aio_suspend);
319SYSCALL_MODULE_HELPER(aio_cancel);
320SYSCALL_MODULE_HELPER(aio_error);
321SYSCALL_MODULE_HELPER(aio_read);
322SYSCALL_MODULE_HELPER(aio_write);
323SYSCALL_MODULE_HELPER(aio_waitcomplete);
324SYSCALL_MODULE_HELPER(lio_listio);
325
326DECLARE_MODULE(aio, aio_mod,
327	SI_SUB_VFS, SI_ORDER_ANY);
328MODULE_VERSION(aio, 1);
329
330/*
331 * Startup initialization
332 */
333static void
334aio_onceonly(void)
335{
336
337	/* XXX: should probably just use so->callback */
338	aio_swake = &aio_swake_cb;
339	exit_tag = EVENTHANDLER_REGISTER(process_exit, aio_proc_rundown, NULL,
340	    EVENTHANDLER_PRI_ANY);
341	exec_tag = EVENTHANDLER_REGISTER(process_exec, aio_proc_rundown, NULL,
342	    EVENTHANDLER_PRI_ANY);
343	kqueue_add_filteropts(EVFILT_AIO, &aio_filtops);
344	TAILQ_INIT(&aio_freeproc);
345	TAILQ_INIT(&aio_activeproc);
346	TAILQ_INIT(&aio_jobs);
347	TAILQ_INIT(&aio_bufjobs);
348	kaio_zone = uma_zcreate("AIO", sizeof(struct kaioinfo), NULL, NULL,
349	    NULL, NULL, UMA_ALIGN_PTR, UMA_ZONE_NOFREE);
350	aiop_zone = uma_zcreate("AIOP", sizeof(struct aiothreadlist), NULL,
351	    NULL, NULL, NULL, UMA_ALIGN_PTR, UMA_ZONE_NOFREE);
352	aiocb_zone = uma_zcreate("AIOCB", sizeof(struct aiocblist), NULL, NULL,
353	    NULL, NULL, UMA_ALIGN_PTR, UMA_ZONE_NOFREE);
354	aiol_zone = uma_zcreate("AIOL", AIO_LISTIO_MAX*sizeof(intptr_t) , NULL,
355	    NULL, NULL, NULL, UMA_ALIGN_PTR, UMA_ZONE_NOFREE);
356	aiolio_zone = uma_zcreate("AIOLIO", sizeof(struct aio_liojob), NULL,
357	    NULL, NULL, NULL, UMA_ALIGN_PTR, UMA_ZONE_NOFREE);
358	aiod_timeout = AIOD_TIMEOUT_DEFAULT;
359	aiod_lifetime = AIOD_LIFETIME_DEFAULT;
360	jobrefid = 1;
361	async_io_version = _POSIX_VERSION;
362	p31b_setcfg(CTL_P1003_1B_AIO_LISTIO_MAX, AIO_LISTIO_MAX);
363	p31b_setcfg(CTL_P1003_1B_AIO_MAX, MAX_AIO_QUEUE);
364	p31b_setcfg(CTL_P1003_1B_AIO_PRIO_DELTA_MAX, 0);
365}
366
367/*
368 * Callback for unload of AIO when used as a module.
369 */
370static int
371aio_unload(void)
372{
373	int error;
374
375	/*
376	 * XXX: no unloads by default, it's too dangerous.
377	 * Perhaps we could do it if we locked out callers and then
378	 * did an aio_proc_rundown() on each process.
379	 */
380	if (!unloadable)
381		return (EOPNOTSUPP);
382
383	error = kqueue_del_filteropts(EVFILT_AIO);
384	if (error)
385		return error;
386
387	async_io_version = 0;
388	aio_swake = NULL;
389	EVENTHANDLER_DEREGISTER(process_exit, exit_tag);
390	EVENTHANDLER_DEREGISTER(process_exec, exec_tag);
391	p31b_setcfg(CTL_P1003_1B_AIO_LISTIO_MAX, -1);
392	p31b_setcfg(CTL_P1003_1B_AIO_MAX, -1);
393	p31b_setcfg(CTL_P1003_1B_AIO_PRIO_DELTA_MAX, -1);
394	return (0);
395}
396
397/*
398 * Init the per-process aioinfo structure.  The aioinfo limits are set
399 * per-process for user limit (resource) management.
400 */
401static void
402aio_init_aioinfo(struct proc *p)
403{
404	struct kaioinfo *ki;
405
406	if (p->p_aioinfo == NULL) {
407		ki = uma_zalloc(kaio_zone, M_WAITOK);
408		p->p_aioinfo = ki;
409		ki->kaio_flags = 0;
410		ki->kaio_maxactive_count = max_aio_per_proc;
411		ki->kaio_active_count = 0;
412		ki->kaio_qallowed_count = max_aio_queue_per_proc;
413		ki->kaio_queue_count = 0;
414		ki->kaio_ballowed_count = max_buf_aio;
415		ki->kaio_buffer_count = 0;
416		ki->kaio_buffer_finished_count = 0;
417		ki->kaio_p = p;
418		TAILQ_INIT(&ki->kaio_jobdone);
419		TAILQ_INIT(&ki->kaio_jobqueue);
420		TAILQ_INIT(&ki->kaio_bufdone);
421		TAILQ_INIT(&ki->kaio_bufqueue);
422		TAILQ_INIT(&ki->kaio_liojoblist);
423		TAILQ_INIT(&ki->kaio_sockqueue);
424	}
425
426	while (num_aio_procs < target_aio_procs)
427		aio_newproc();
428}
429
430/*
431 * Free a job entry.  Wait for completion if it is currently active, but don't
432 * delay forever.  If we delay, we return a flag that says that we have to
433 * restart the queue scan.
434 */
435static int
436aio_free_entry(struct aiocblist *aiocbe)
437{
438	struct kaioinfo *ki;
439	struct aio_liojob *lj;
440	struct proc *p;
441	int error;
442	int s;
443
444	if (aiocbe->jobstate == JOBST_NULL)
445		panic("aio_free_entry: freeing already free job");
446
447	p = aiocbe->userproc;
448	ki = p->p_aioinfo;
449	lj = aiocbe->lio;
450	if (ki == NULL)
451		panic("aio_free_entry: missing p->p_aioinfo");
452
453	while (aiocbe->jobstate == JOBST_JOBRUNNING) {
454		aiocbe->jobflags |= AIOCBLIST_RUNDOWN;
455		tsleep(aiocbe, PRIBIO, "jobwai", 0);
456	}
457	if (aiocbe->bp == NULL) {
458		if (ki->kaio_queue_count <= 0)
459			panic("aio_free_entry: process queue size <= 0");
460		if (num_queue_count <= 0)
461			panic("aio_free_entry: system wide queue size <= 0");
462
463		if (lj) {
464			lj->lioj_queue_count--;
465			if (aiocbe->jobflags & AIOCBLIST_DONE)
466				lj->lioj_queue_finished_count--;
467		}
468		ki->kaio_queue_count--;
469		if (aiocbe->jobflags & AIOCBLIST_DONE)
470			ki->kaio_queue_finished_count--;
471		num_queue_count--;
472	} else {
473		if (lj) {
474			lj->lioj_buffer_count--;
475			if (aiocbe->jobflags & AIOCBLIST_DONE)
476				lj->lioj_buffer_finished_count--;
477		}
478		if (aiocbe->jobflags & AIOCBLIST_DONE)
479			ki->kaio_buffer_finished_count--;
480		ki->kaio_buffer_count--;
481		num_buf_aio--;
482	}
483
484	/* aiocbe is going away; we need to destroy any knotes. */
485	/* XXXKSE Note the thread here is used to eventually find the
486	 * owning process again, but it is also used to do a fo_close,
487	 * and that requires a thread.  Does it have to be the OWNING
488	 * thread, or maybe the running thread?
489	 * There is a semantic problem here...
490	 */
491	knlist_delete(&aiocbe->klist, FIRST_THREAD_IN_PROC(p), 0); /* XXXKSE */
492
493	if ((ki->kaio_flags & KAIO_WAKEUP) || ((ki->kaio_flags & KAIO_RUNDOWN)
494	    && ((ki->kaio_buffer_count == 0) && (ki->kaio_queue_count == 0)))) {
495		ki->kaio_flags &= ~KAIO_WAKEUP;
496		wakeup(p);
497	}
498
499	if (aiocbe->jobstate == JOBST_JOBQBUF) {
500		if ((error = aio_fphysio(aiocbe)) != 0)
501			return (error);
502		if (aiocbe->jobstate != JOBST_JOBBFINISHED)
503			panic("aio_free_entry: invalid physio finish-up state");
504		s = splbio();
505		TAILQ_REMOVE(&ki->kaio_bufdone, aiocbe, plist);
506		splx(s);
507	} else if (aiocbe->jobstate == JOBST_JOBQGLOBAL) {
508		s = splnet();
509		TAILQ_REMOVE(&aio_jobs, aiocbe, list);
510		TAILQ_REMOVE(&ki->kaio_jobqueue, aiocbe, plist);
511		splx(s);
512	} else if (aiocbe->jobstate == JOBST_JOBFINISHED)
513		TAILQ_REMOVE(&ki->kaio_jobdone, aiocbe, plist);
514	else if (aiocbe->jobstate == JOBST_JOBBFINISHED) {
515		s = splbio();
516		TAILQ_REMOVE(&ki->kaio_bufdone, aiocbe, plist);
517		splx(s);
518		if (aiocbe->bp) {
519			vunmapbuf(aiocbe->bp);
520			relpbuf(aiocbe->bp, NULL);
521			aiocbe->bp = NULL;
522		}
523	}
524	if (lj && (lj->lioj_buffer_count == 0) && (lj->lioj_queue_count == 0)) {
525		TAILQ_REMOVE(&ki->kaio_liojoblist, lj, lioj_list);
526		uma_zfree(aiolio_zone, lj);
527	}
528	aiocbe->jobstate = JOBST_NULL;
529	untimeout(process_signal, aiocbe, aiocbe->timeouthandle);
530	fdrop(aiocbe->fd_file, curthread);
531	crfree(aiocbe->cred);
532	uma_zfree(aiocb_zone, aiocbe);
533	return (0);
534}
535
536/*
537 * Run down the jobs for a given process.
538 */
539static void
540aio_proc_rundown(void *arg, struct proc *p)
541{
542	int s;
543	struct kaioinfo *ki;
544	struct aio_liojob *lj, *ljn;
545	struct aiocblist *aiocbe, *aiocbn;
546	struct file *fp;
547	struct socket *so;
548
549	ki = p->p_aioinfo;
550	if (ki == NULL)
551		return;
552
553	mtx_lock(&Giant);
554	ki->kaio_flags |= LIOJ_SIGNAL_POSTED;
555	while ((ki->kaio_active_count > 0) || (ki->kaio_buffer_count >
556	    ki->kaio_buffer_finished_count)) {
557		ki->kaio_flags |= KAIO_RUNDOWN;
558		if (tsleep(p, PRIBIO, "kaiowt", aiod_timeout))
559			break;
560	}
561
562	/*
563	 * Move any aio ops that are waiting on socket I/O to the normal job
564	 * queues so they are cleaned up with any others.
565	 */
566	s = splnet();
567	for (aiocbe = TAILQ_FIRST(&ki->kaio_sockqueue); aiocbe; aiocbe =
568	    aiocbn) {
569		aiocbn = TAILQ_NEXT(aiocbe, plist);
570		fp = aiocbe->fd_file;
571		if (fp != NULL) {
572			so = fp->f_data;
573			TAILQ_REMOVE(&so->so_aiojobq, aiocbe, list);
574			if (TAILQ_EMPTY(&so->so_aiojobq)) {
575				SOCKBUF_LOCK(&so->so_snd);
576				so->so_snd.sb_flags &= ~SB_AIO;
577				SOCKBUF_UNLOCK(&so->so_snd);
578				SOCKBUF_LOCK(&so->so_rcv);
579				so->so_rcv.sb_flags &= ~SB_AIO;
580				SOCKBUF_UNLOCK(&so->so_rcv);
581			}
582		}
583		TAILQ_REMOVE(&ki->kaio_sockqueue, aiocbe, plist);
584		TAILQ_INSERT_HEAD(&aio_jobs, aiocbe, list);
585		TAILQ_INSERT_HEAD(&ki->kaio_jobqueue, aiocbe, plist);
586	}
587	splx(s);
588
589restart1:
590	for (aiocbe = TAILQ_FIRST(&ki->kaio_jobdone); aiocbe; aiocbe = aiocbn) {
591		aiocbn = TAILQ_NEXT(aiocbe, plist);
592		if (aio_free_entry(aiocbe))
593			goto restart1;
594	}
595
596restart2:
597	for (aiocbe = TAILQ_FIRST(&ki->kaio_jobqueue); aiocbe; aiocbe =
598	    aiocbn) {
599		aiocbn = TAILQ_NEXT(aiocbe, plist);
600		if (aio_free_entry(aiocbe))
601			goto restart2;
602	}
603
604/*
605 * Note the repeated use of splbio() here, trying to avoid holding splbio
606 * across long chains of I/O.  Probably unnecessary.
607 */
608restart3:
609	s = splbio();
610	while (TAILQ_FIRST(&ki->kaio_bufqueue)) {
611		ki->kaio_flags |= KAIO_WAKEUP;
612		tsleep(p, PRIBIO, "aioprn", 0);
613		splx(s);
614		goto restart3;
615	}
616	splx(s);
617
618restart4:
619	s = splbio();
620	for (aiocbe = TAILQ_FIRST(&ki->kaio_bufdone); aiocbe; aiocbe = aiocbn) {
621		aiocbn = TAILQ_NEXT(aiocbe, plist);
622		if (aio_free_entry(aiocbe)) {
623			splx(s);
624			goto restart4;
625		}
626	}
627	splx(s);
628
629	/*
630	 * If we've slept, jobs might have moved from one queue to another.
631	 * Retry rundown if we didn't manage to empty the queues.
632	 */
633	if (TAILQ_FIRST(&ki->kaio_jobdone) != NULL ||
634	    TAILQ_FIRST(&ki->kaio_jobqueue) != NULL ||
635	    TAILQ_FIRST(&ki->kaio_bufqueue) != NULL ||
636	    TAILQ_FIRST(&ki->kaio_bufdone) != NULL)
637		goto restart1;
638
639	for (lj = TAILQ_FIRST(&ki->kaio_liojoblist); lj; lj = ljn) {
640		ljn = TAILQ_NEXT(lj, lioj_list);
641		if ((lj->lioj_buffer_count == 0) && (lj->lioj_queue_count ==
642		    0)) {
643			TAILQ_REMOVE(&ki->kaio_liojoblist, lj, lioj_list);
644			uma_zfree(aiolio_zone, lj);
645		} else {
646#ifdef DIAGNOSTIC
647			printf("LIO job not cleaned up: B:%d, BF:%d, Q:%d, "
648			    "QF:%d\n", lj->lioj_buffer_count,
649			    lj->lioj_buffer_finished_count,
650			    lj->lioj_queue_count,
651			    lj->lioj_queue_finished_count);
652#endif
653		}
654	}
655
656	uma_zfree(kaio_zone, ki);
657	p->p_aioinfo = NULL;
658	mtx_unlock(&Giant);
659}
660
661/*
662 * Select a job to run (called by an AIO daemon).
663 */
664static struct aiocblist *
665aio_selectjob(struct aiothreadlist *aiop)
666{
667	int s;
668	struct aiocblist *aiocbe;
669	struct kaioinfo *ki;
670	struct proc *userp;
671
672	s = splnet();
673	for (aiocbe = TAILQ_FIRST(&aio_jobs); aiocbe; aiocbe =
674	    TAILQ_NEXT(aiocbe, list)) {
675		userp = aiocbe->userproc;
676		ki = userp->p_aioinfo;
677
678		if (ki->kaio_active_count < ki->kaio_maxactive_count) {
679			TAILQ_REMOVE(&aio_jobs, aiocbe, list);
680			splx(s);
681			return (aiocbe);
682		}
683	}
684	splx(s);
685
686	return (NULL);
687}
688
689/*
690 * The AIO processing activity.  This is the code that does the I/O request for
691 * the non-physio version of the operations.  The normal vn operations are used,
692 * and this code should work in all instances for every type of file, including
693 * pipes, sockets, fifos, and regular files.
694 */
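/*
 * Note: the ru_inblock/ru_oublock deltas recorded below are credited back
 * to the submitting process in aio_return() and lio_listio(), so the
 * physical I/O is charged to the user process rather than to the aiod.
 */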
695static void
696aio_process(struct aiocblist *aiocbe)
697{
698	struct ucred *td_savedcred;
699	struct thread *td;
700	struct proc *mycp;
701	struct aiocb *cb;
702	struct file *fp;
703	struct uio auio;
704	struct iovec aiov;
705	int cnt;
706	int error;
707	int oublock_st, oublock_end;
708	int inblock_st, inblock_end;
709
710	td = curthread;
711	td_savedcred = td->td_ucred;
712	td->td_ucred = aiocbe->cred;
713	mycp = td->td_proc;
714	cb = &aiocbe->uaiocb;
715	fp = aiocbe->fd_file;
716
717	aiov.iov_base = (void *)(uintptr_t)cb->aio_buf;
718	aiov.iov_len = cb->aio_nbytes;
719
720	auio.uio_iov = &aiov;
721	auio.uio_iovcnt = 1;
722	auio.uio_offset = cb->aio_offset;
723	auio.uio_resid = cb->aio_nbytes;
724	cnt = cb->aio_nbytes;
725	auio.uio_segflg = UIO_USERSPACE;
726	auio.uio_td = td;
727
728	inblock_st = mycp->p_stats->p_ru.ru_inblock;
729	oublock_st = mycp->p_stats->p_ru.ru_oublock;
730	/*
731	 * _aio_aqueue() acquires a reference to the file that is
732	 * released in aio_free_entry().
733	 */
734	if (cb->aio_lio_opcode == LIO_READ) {
735		auio.uio_rw = UIO_READ;
736		error = fo_read(fp, &auio, fp->f_cred, FOF_OFFSET, td);
737	} else {
738		auio.uio_rw = UIO_WRITE;
739		error = fo_write(fp, &auio, fp->f_cred, FOF_OFFSET, td);
740	}
741	inblock_end = mycp->p_stats->p_ru.ru_inblock;
742	oublock_end = mycp->p_stats->p_ru.ru_oublock;
743
744	aiocbe->inputcharge = inblock_end - inblock_st;
745	aiocbe->outputcharge = oublock_end - oublock_st;
746
747	if ((error) && (auio.uio_resid != cnt)) {
748		if (error == ERESTART || error == EINTR || error == EWOULDBLOCK)
749			error = 0;
750		if ((error == EPIPE) && (cb->aio_lio_opcode == LIO_WRITE)) {
751			PROC_LOCK(aiocbe->userproc);
752			psignal(aiocbe->userproc, SIGPIPE);
753			PROC_UNLOCK(aiocbe->userproc);
754		}
755	}
756
757	cnt -= auio.uio_resid;
758	cb->_aiocb_private.error = error;
759	cb->_aiocb_private.status = cnt;
760	td->td_ucred = td_savedcred;
761}
762
763/*
764 * The AIO daemon.  Most of the actual work is done in aio_process(),
765 * but the setup (and address space management) is done in this routine.
766 */
767static void
768aio_daemon(void *uproc)
769{
770	int s;
771	struct aio_liojob *lj;
772	struct aiocb *cb;
773	struct aiocblist *aiocbe;
774	struct aiothreadlist *aiop;
775	struct kaioinfo *ki;
776	struct proc *curcp, *mycp, *userp;
777	struct vmspace *myvm, *tmpvm;
778	struct thread *td = curthread;
779	struct pgrp *newpgrp;
780	struct session *newsess;
781
782	mtx_lock(&Giant);
783	/*
784	 * Local copies of curproc (mycp) and vmspace (myvm).
785	 */
786	mycp = td->td_proc;
787	myvm = mycp->p_vmspace;
788
789	KASSERT(mycp->p_textvp == NULL, ("kthread has a textvp"));
790
791	/*
792	 * Allocate and ready the aio control info.  There is one aiop structure
793	 * per daemon.
794	 */
795	aiop = uma_zalloc(aiop_zone, M_WAITOK);
796	aiop->aiothread = td;
797	aiop->aiothreadflags |= AIOP_FREE;
798
799	s = splnet();
800
801	/*
802	 * Place thread (lightweight process) onto the AIO free thread list.
803	 */
804	if (TAILQ_EMPTY(&aio_freeproc))
805		wakeup(&aio_freeproc);
806	TAILQ_INSERT_HEAD(&aio_freeproc, aiop, list);
807
808	splx(s);
809
810	/*
811	 * Get rid of our current file descriptors.  AIODs don't need any
812	 * file descriptors, except as temporarily inherited from the client.
813	 */
814	fdfree(td);
815
816	mtx_unlock(&Giant);
817	/* The daemon resides in its own pgrp. */
818	MALLOC(newpgrp, struct pgrp *, sizeof(struct pgrp), M_PGRP,
819		M_WAITOK | M_ZERO);
820	MALLOC(newsess, struct session *, sizeof(struct session), M_SESSION,
821		M_WAITOK | M_ZERO);
822
823	sx_xlock(&proctree_lock);
824	enterpgrp(mycp, mycp->p_pid, newpgrp, newsess);
825	sx_xunlock(&proctree_lock);
826	mtx_lock(&Giant);
827
828	/*
829	 * Wakeup parent process.  (Parent sleeps to keep from blasting away
830	 * and creating too many daemons.)
831	 */
832	wakeup(mycp);
833
834	for (;;) {
835		/*
836		 * curcp is the current daemon process context.
837		 * userp is the current user process context.
838		 */
839		curcp = mycp;
840
841		/*
842		 * Take daemon off of free queue
843		 */
844		if (aiop->aiothreadflags & AIOP_FREE) {
845			s = splnet();
846			TAILQ_REMOVE(&aio_freeproc, aiop, list);
847			TAILQ_INSERT_TAIL(&aio_activeproc, aiop, list);
848			aiop->aiothreadflags &= ~AIOP_FREE;
849			splx(s);
850		}
851		aiop->aiothreadflags &= ~AIOP_SCHED;
852
853		/*
854		 * Check for jobs.
855		 */
856		while ((aiocbe = aio_selectjob(aiop)) != NULL) {
857			cb = &aiocbe->uaiocb;
858			userp = aiocbe->userproc;
859
860			aiocbe->jobstate = JOBST_JOBRUNNING;
861
862			/*
863			 * Connect to process address space for user program.
864			 */
865			if (userp != curcp) {
866				/*
867				 * Save the current address space that we are
868				 * connected to.
869				 */
870				tmpvm = mycp->p_vmspace;
871
872				/*
873				 * Point to the new user address space, and
874				 * refer to it.
875				 */
876				mycp->p_vmspace = userp->p_vmspace;
877				atomic_add_int(&mycp->p_vmspace->vm_refcnt, 1);
878
879				/* Activate the new mapping. */
880				pmap_activate(FIRST_THREAD_IN_PROC(mycp));
881
882				/*
883				 * If the old address space wasn't the daemon's
884				 * own address space, then we need to remove the
885				 * daemon's reference from the other process
886				 * that it was acting on behalf of.
887				 */
888				if (tmpvm != myvm) {
889					vmspace_free(tmpvm);
890				}
891				curcp = userp;
892			}
893
894			ki = userp->p_aioinfo;
895			lj = aiocbe->lio;
896
897			/* Account for currently active jobs. */
898			ki->kaio_active_count++;
899
900			/* Do the I/O function. */
901			aio_process(aiocbe);
902
903			/* Decrement the active job count. */
904			ki->kaio_active_count--;
905
906			/*
907			 * Increment the completion count for wakeup/signal
908			 * comparisons.
909			 */
910			aiocbe->jobflags |= AIOCBLIST_DONE;
911			ki->kaio_queue_finished_count++;
912			if (lj)
913				lj->lioj_queue_finished_count++;
914			if ((ki->kaio_flags & KAIO_WAKEUP) || ((ki->kaio_flags
915			    & KAIO_RUNDOWN) && (ki->kaio_active_count == 0))) {
916				ki->kaio_flags &= ~KAIO_WAKEUP;
917				wakeup(userp);
918			}
919
920			s = splbio();
921			if (lj && (lj->lioj_flags &
922			    (LIOJ_SIGNAL|LIOJ_SIGNAL_POSTED)) == LIOJ_SIGNAL) {
923				if ((lj->lioj_queue_finished_count ==
924				    lj->lioj_queue_count) &&
925				    (lj->lioj_buffer_finished_count ==
926				    lj->lioj_buffer_count)) {
927					PROC_LOCK(userp);
928					psignal(userp,
929					    lj->lioj_signal.sigev_signo);
930					PROC_UNLOCK(userp);
931					lj->lioj_flags |= LIOJ_SIGNAL_POSTED;
932				}
933			}
934			splx(s);
935
936			aiocbe->jobstate = JOBST_JOBFINISHED;
937
938			s = splnet();
939			TAILQ_REMOVE(&ki->kaio_jobqueue, aiocbe, plist);
940			TAILQ_INSERT_TAIL(&ki->kaio_jobdone, aiocbe, plist);
941			splx(s);
942			KNOTE_UNLOCKED(&aiocbe->klist, 0);
943
944			if (aiocbe->jobflags & AIOCBLIST_RUNDOWN) {
945				wakeup(aiocbe);
946				aiocbe->jobflags &= ~AIOCBLIST_RUNDOWN;
947			}
948
949			if (cb->aio_sigevent.sigev_notify == SIGEV_SIGNAL) {
950				PROC_LOCK(userp);
951				psignal(userp, cb->aio_sigevent.sigev_signo);
952				PROC_UNLOCK(userp);
953			}
954		}
955
956		/*
957		 * Disconnect from user address space.
958		 */
959		if (curcp != mycp) {
960			/* Get the user address space to disconnect from. */
961			tmpvm = mycp->p_vmspace;
962
963			/* Get original address space for daemon. */
964			mycp->p_vmspace = myvm;
965
966			/* Activate the daemon's address space. */
967			pmap_activate(FIRST_THREAD_IN_PROC(mycp));
968#ifdef DIAGNOSTIC
969			if (tmpvm == myvm) {
970				printf("AIOD: vmspace problem -- %d\n",
971				    mycp->p_pid);
972			}
973#endif
974			/* Remove our vmspace reference. */
975			vmspace_free(tmpvm);
976
977			curcp = mycp;
978		}
979
980		/*
981		 * If we are the first to be put onto the free queue, wakeup
982		 * anyone waiting for a daemon.
983		 */
984		s = splnet();
985		TAILQ_REMOVE(&aio_activeproc, aiop, list);
986		if (TAILQ_EMPTY(&aio_freeproc))
987			wakeup(&aio_freeproc);
988		TAILQ_INSERT_HEAD(&aio_freeproc, aiop, list);
989		aiop->aiothreadflags |= AIOP_FREE;
990		splx(s);
991
992		/*
993		 * If daemon is inactive for a long time, allow it to exit,
994		 * thereby freeing resources.
995		 */
996		if ((aiop->aiothreadflags & AIOP_SCHED) == 0 &&
997		    tsleep(aiop->aiothread, PRIBIO, "aiordy", aiod_lifetime)) {
998			s = splnet();
999			if (TAILQ_EMPTY(&aio_jobs)) {
1000				if ((aiop->aiothreadflags & AIOP_FREE) &&
1001				    (num_aio_procs > target_aio_procs)) {
1002					TAILQ_REMOVE(&aio_freeproc, aiop, list);
1003					splx(s);
1004					uma_zfree(aiop_zone, aiop);
1005					num_aio_procs--;
1006#ifdef DIAGNOSTIC
1007					if (mycp->p_vmspace->vm_refcnt <= 1) {
1008						printf("AIOD: bad vm refcnt for"
1009						    " exiting daemon: %d\n",
1010						    mycp->p_vmspace->vm_refcnt);
1011					}
1012#endif
1013					kthread_exit(0);
1014				}
1015			}
1016			splx(s);
1017		}
1018	}
1019}
1020
1021/*
1022 * Create a new AIO daemon.  This is mostly a kernel-thread fork routine.  The
1023 * AIO daemon modifies its environment itself.
1024 */
1025static int
1026aio_newproc(void)
1027{
1028	int error;
1029	struct proc *p;
1030
1031	error = kthread_create(aio_daemon, curproc, &p, RFNOWAIT, 0, "aiod%d",
1032	    num_aio_procs);
1033	if (error)
1034		return (error);
1035
1036	/*
1037	 * Wait until daemon is started, but continue on just in case to
1038	 * handle error conditions.
1039	 */
1040	error = tsleep(p, PZERO, "aiosta", aiod_timeout);
1041
1042	num_aio_procs++;
1043
1044	return (error);
1045}
1046
1047/*
1048 * Try the high-performance, low-overhead physio method for eligible
1049 * VCHR devices.  This method doesn't use an aio helper thread, and
1050 * thus has very low overhead.
1051 *
1052 * Assumes that the caller, _aio_aqueue(), has incremented the file
1053 * structure's reference count, preventing its deallocation for the
1054 * duration of this call.
1055 */
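/*
 * Eligibility summary (any failure below returns -1 so the caller falls
 * back to the threaded path): the descriptor must be a vnode backed by a
 * disk device, the transfer must be a multiple of the device block size
 * and fit within MAXPHYS, and the process must be under its physio buffer
 * quota (kaio_ballowed_count).
 */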
1056static int
1057aio_qphysio(struct proc *p, struct aiocblist *aiocbe)
1058{
1059	int error;
1060	struct aiocb *cb;
1061	struct file *fp;
1062	struct buf *bp;
1063	struct vnode *vp;
1064	struct kaioinfo *ki;
1065	struct aio_liojob *lj;
1066	int s;
1067	int notify;
1068
1069	cb = &aiocbe->uaiocb;
1070	fp = aiocbe->fd_file;
1071
1072	if (fp->f_type != DTYPE_VNODE)
1073		return (-1);
1074
1075	vp = fp->f_vnode;
1076
1077	/*
1078	 * If it's not a disk, we don't want to return a positive error.
1079	 * That would keep the aio code from falling through to try the
1080	 * threaded path when you're talking to a regular file.
1081	 */
1082	if (!vn_isdisk(vp, &error)) {
1083		if (error == ENOTBLK)
1084			return (-1);
1085		else
1086			return (error);
1087	}
1088
1089 	if (cb->aio_nbytes % vp->v_bufobj.bo_bsize)
1090		return (-1);
1091
1092	if (cb->aio_nbytes >
1093	    MAXPHYS - (((vm_offset_t) cb->aio_buf) & PAGE_MASK))
1094		return (-1);
1095
1096	ki = p->p_aioinfo;
1097	if (ki->kaio_buffer_count >= ki->kaio_ballowed_count)
1098		return (-1);
1099
1100	ki->kaio_buffer_count++;
1101
1102	lj = aiocbe->lio;
1103	if (lj)
1104		lj->lioj_buffer_count++;
1105
1106	/* Create and build a buffer header for a transfer. */
1107	bp = (struct buf *)getpbuf(NULL);
1108	BUF_KERNPROC(bp);
1109
1110	/*
1111	 * Get a copy of the kva from the physical buffer.
1112	 */
1113	error = 0;
1114
1115	bp->b_bcount = cb->aio_nbytes;
1116	bp->b_bufsize = cb->aio_nbytes;
1117	bp->b_iodone = aio_physwakeup;
1118	bp->b_saveaddr = bp->b_data;
1119	bp->b_data = (void *)(uintptr_t)cb->aio_buf;
1120	bp->b_offset = cb->aio_offset;
1121	bp->b_iooffset = cb->aio_offset;
1122	bp->b_blkno = btodb(cb->aio_offset);
1123	bp->b_iocmd = cb->aio_lio_opcode == LIO_WRITE ? BIO_WRITE : BIO_READ;
1124
1125	/*
1126	 * Bring buffer into kernel space.
1127	 */
1128	if (vmapbuf(bp) < 0) {
1129		error = EFAULT;
1130		goto doerror;
1131	}
1132
1133	s = splbio();
1134	aiocbe->bp = bp;
1135	bp->b_caller1 = (void *)aiocbe;
1136	TAILQ_INSERT_TAIL(&aio_bufjobs, aiocbe, list);
1137	TAILQ_INSERT_TAIL(&ki->kaio_bufqueue, aiocbe, plist);
1138	aiocbe->jobstate = JOBST_JOBQBUF;
1139	cb->_aiocb_private.status = cb->aio_nbytes;
1140	num_buf_aio++;
1141	bp->b_error = 0;
1142
1143	splx(s);
1144
1145	/* Perform transfer. */
1146	dev_strategy(vp->v_rdev, bp);
1147
1148	notify = 0;
1149	s = splbio();
1150
1151	/*
1152	 * If we had an error invoking the request, or an error in processing
1153	 * the request before we have returned, we process it as an error in
1154	 * transfer.  Note that such an I/O error is not indicated immediately,
1155	 * but is returned using the aio_error mechanism.  In this case,
1156	 * aio_suspend will return immediately.
1157	 */
1158	if (bp->b_error || (bp->b_ioflags & BIO_ERROR)) {
1159		struct aiocb *job = aiocbe->uuaiocb;
1160
1161		aiocbe->uaiocb._aiocb_private.status = 0;
1162		suword(&job->_aiocb_private.status, 0);
1163		aiocbe->uaiocb._aiocb_private.error = bp->b_error;
1164		suword(&job->_aiocb_private.error, bp->b_error);
1165
1166		ki->kaio_buffer_finished_count++;
1167
1168		if (aiocbe->jobstate != JOBST_JOBBFINISHED) {
1169			aiocbe->jobstate = JOBST_JOBBFINISHED;
1170			aiocbe->jobflags |= AIOCBLIST_DONE;
1171			TAILQ_REMOVE(&aio_bufjobs, aiocbe, list);
1172			TAILQ_REMOVE(&ki->kaio_bufqueue, aiocbe, plist);
1173			TAILQ_INSERT_TAIL(&ki->kaio_bufdone, aiocbe, plist);
1174			notify = 1;
1175		}
1176	}
1177	splx(s);
1178	if (notify)
1179		KNOTE_UNLOCKED(&aiocbe->klist, 0);
1180	return (0);
1181
1182doerror:
1183	ki->kaio_buffer_count--;
1184	if (lj)
1185		lj->lioj_buffer_count--;
1186	aiocbe->bp = NULL;
1187	relpbuf(bp, NULL);
1188	return (error);
1189}
1190
1191/*
1192 * This waits/tests physio completion.
1193 */
1194static int
1195aio_fphysio(struct aiocblist *iocb)
1196{
1197	int s;
1198	struct buf *bp;
1199	int error;
1200
1201	bp = iocb->bp;
1202
1203	s = splbio();
1204	while ((bp->b_flags & B_DONE) == 0) {
1205		if (tsleep(bp, PRIBIO, "physstr", aiod_timeout)) {
1206			if ((bp->b_flags & B_DONE) == 0) {
1207				splx(s);
1208				return (EINPROGRESS);
1209			} else
1210				break;
1211		}
1212	}
1213	splx(s);
1214
1215	/* Release mapping into kernel space. */
1216	vunmapbuf(bp);
1217	iocb->bp = 0;
1218
1219	error = 0;
1220
1221	/* Check for an error. */
1222	if (bp->b_ioflags & BIO_ERROR)
1223		error = bp->b_error;
1224
1225	relpbuf(bp, NULL);
1226	return (error);
1227}
1228
1229/*
1230 * Wake up aio requests that may be serviceable now.
1231 */
1232static void
1233aio_swake_cb(struct socket *so, struct sockbuf *sb)
1234{
1235	struct aiocblist *cb,*cbn;
1236	struct proc *p;
1237	struct kaioinfo *ki = NULL;
1238	int opcode, wakecount = 0;
1239	struct aiothreadlist *aiop;
1240
1241	if (sb == &so->so_snd) {
1242		opcode = LIO_WRITE;
1243		SOCKBUF_LOCK(&so->so_snd);
1244		so->so_snd.sb_flags &= ~SB_AIO;
1245		SOCKBUF_UNLOCK(&so->so_snd);
1246	} else {
1247		opcode = LIO_READ;
1248		SOCKBUF_LOCK(&so->so_rcv);
1249		so->so_rcv.sb_flags &= ~SB_AIO;
1250		SOCKBUF_UNLOCK(&so->so_rcv);
1251	}
1252
1253	for (cb = TAILQ_FIRST(&so->so_aiojobq); cb; cb = cbn) {
1254		cbn = TAILQ_NEXT(cb, list);
1255		if (opcode == cb->uaiocb.aio_lio_opcode) {
1256			p = cb->userproc;
1257			ki = p->p_aioinfo;
1258			TAILQ_REMOVE(&so->so_aiojobq, cb, list);
1259			TAILQ_REMOVE(&ki->kaio_sockqueue, cb, plist);
1260			TAILQ_INSERT_TAIL(&aio_jobs, cb, list);
1261			TAILQ_INSERT_TAIL(&ki->kaio_jobqueue, cb, plist);
1262			wakecount++;
1263			if (cb->jobstate != JOBST_JOBQGLOBAL)
1264				panic("invalid queue value");
1265		}
1266	}
1267
1268	while (wakecount--) {
1269		if ((aiop = TAILQ_FIRST(&aio_freeproc)) != 0) {
1270			TAILQ_REMOVE(&aio_freeproc, aiop, list);
1271			TAILQ_INSERT_TAIL(&aio_activeproc, aiop, list);
1272			aiop->aiothreadflags &= ~AIOP_FREE;
1273			wakeup(aiop->aiothread);
1274		}
1275	}
1276}
1277
1278/*
1279 * Queue a new AIO request.  The choice between the threaded and the
1280 * direct physio (VCHR) technique is made in this code.
1281 */
1282static int
1283_aio_aqueue(struct thread *td, struct aiocb *job, struct aio_liojob *lj, int type)
1284{
1285	struct proc *p = td->td_proc;
1286	struct filedesc *fdp;
1287	struct file *fp;
1288	unsigned int fd;
1289	struct socket *so;
1290	int s;
1291	int error;
1292	int opcode, user_opcode;
1293	struct aiocblist *aiocbe;
1294	struct aiothreadlist *aiop;
1295	struct kaioinfo *ki;
1296	struct kevent kev;
1297	struct kqueue *kq;
1298	struct file *kq_fp;
1299	struct sockbuf *sb;
1300
1301	aiocbe = uma_zalloc(aiocb_zone, M_WAITOK);
1302	aiocbe->inputcharge = 0;
1303	aiocbe->outputcharge = 0;
1304	callout_handle_init(&aiocbe->timeouthandle);
1305	/* XXX - need a lock */
1306	knlist_init(&aiocbe->klist, NULL);
1307
1308	suword(&job->_aiocb_private.status, -1);
1309	suword(&job->_aiocb_private.error, 0);
1310	suword(&job->_aiocb_private.kernelinfo, -1);
1311
1312	error = copyin(job, &aiocbe->uaiocb, sizeof(aiocbe->uaiocb));
1313	if (error) {
1314		suword(&job->_aiocb_private.error, error);
1315		uma_zfree(aiocb_zone, aiocbe);
1316		return (error);
1317	}
1318	if (aiocbe->uaiocb.aio_sigevent.sigev_notify == SIGEV_SIGNAL &&
1319		!_SIG_VALID(aiocbe->uaiocb.aio_sigevent.sigev_signo)) {
1320		uma_zfree(aiocb_zone, aiocbe);
1321		return (EINVAL);
1322	}
1323
1324	/* Save userspace address of the job info. */
1325	aiocbe->uuaiocb = job;
1326
1327	/* Get the opcode. */
1328	user_opcode = aiocbe->uaiocb.aio_lio_opcode;
1329	if (type != LIO_NOP)
1330		aiocbe->uaiocb.aio_lio_opcode = type;
1331	opcode = aiocbe->uaiocb.aio_lio_opcode;
1332
1333	/* Get the fd info for process. */
1334	fdp = p->p_fd;
1335
1336	/*
1337	 * Range check file descriptor.
1338	 */
1339	FILEDESC_LOCK(fdp);
1340	fd = aiocbe->uaiocb.aio_fildes;
1341	if (fd >= fdp->fd_nfiles) {
1342		FILEDESC_UNLOCK(fdp);
1343		uma_zfree(aiocb_zone, aiocbe);
1344		if (type == 0)
1345			suword(&job->_aiocb_private.error, EBADF);
1346		return (EBADF);
1347	}
1348
1349	fp = aiocbe->fd_file = fdp->fd_ofiles[fd];
1350	if ((fp == NULL) ||
1351	    ((opcode == LIO_WRITE) && ((fp->f_flag & FWRITE) == 0)) ||
1352	    ((opcode == LIO_READ) && ((fp->f_flag & FREAD) == 0))) {
1353		FILEDESC_UNLOCK(fdp);
1354		uma_zfree(aiocb_zone, aiocbe);
1355		if (type == 0)
1356			suword(&job->_aiocb_private.error, EBADF);
1357		return (EBADF);
1358	}
1359	fhold(fp);
1360	FILEDESC_UNLOCK(fdp);
1361
1362	if (aiocbe->uaiocb.aio_offset == -1LL) {
1363		error = EINVAL;
1364		goto aqueue_fail;
1365	}
1366	error = suword(&job->_aiocb_private.kernelinfo, jobrefid);
1367	if (error) {
1368		error = EINVAL;
1369		goto aqueue_fail;
1370	}
1371	aiocbe->uaiocb._aiocb_private.kernelinfo = (void *)(intptr_t)jobrefid;
1372	if (jobrefid == LONG_MAX)
1373		jobrefid = 1;
1374	else
1375		jobrefid++;
1376
1377	if (opcode == LIO_NOP) {
1378		fdrop(fp, td);
1379		uma_zfree(aiocb_zone, aiocbe);
1380		if (type == 0) {
1381			suword(&job->_aiocb_private.error, 0);
1382			suword(&job->_aiocb_private.status, 0);
1383			suword(&job->_aiocb_private.kernelinfo, 0);
1384		}
1385		return (0);
1386	}
1387	if ((opcode != LIO_READ) && (opcode != LIO_WRITE)) {
1388		if (type == 0)
1389			suword(&job->_aiocb_private.status, 0);
1390		error = EINVAL;
1391		goto aqueue_fail;
1392	}
1393
1394	if (aiocbe->uaiocb.aio_sigevent.sigev_notify == SIGEV_KEVENT) {
1395		kev.ident = aiocbe->uaiocb.aio_sigevent.sigev_notify_kqueue;
1396		kev.udata = aiocbe->uaiocb.aio_sigevent.sigev_value.sigval_ptr;
1397	}
1398	else {
1399		/*
1400		 * This method for requesting kevent-based notification won't
1401		 * work on the alpha, since we're passing in a pointer
1402		 * via aio_lio_opcode, which is an int.  Use the SIGEV_KEVENT-
1403		 * based method instead.
1404		 */
1405		if (user_opcode == LIO_NOP || user_opcode == LIO_READ ||
1406		    user_opcode == LIO_WRITE)
1407			goto no_kqueue;
1408
1409		error = copyin((struct kevent *)(uintptr_t)user_opcode,
1410		    &kev, sizeof(kev));
1411		if (error)
1412			goto aqueue_fail;
1413	}
1414	if ((u_int)kev.ident >= fdp->fd_nfiles ||
1415	    (kq_fp = fdp->fd_ofiles[kev.ident]) == NULL ||
1416	    (kq_fp->f_type != DTYPE_KQUEUE)) {
1417		error = EBADF;
1418		goto aqueue_fail;
1419	}
1420	kq = kq_fp->f_data;
1421	kev.ident = (uintptr_t)aiocbe->uuaiocb;
1422	kev.filter = EVFILT_AIO;
1423	kev.flags = EV_ADD | EV_ENABLE | EV_FLAG1;
1424	kev.data = (intptr_t)aiocbe;
1425	error = kqueue_register(kq, &kev, td, 1);
1426aqueue_fail:
1427	if (error) {
1428		fdrop(fp, td);
1429		uma_zfree(aiocb_zone, aiocbe);
1430		if (type == 0)
1431			suword(&job->_aiocb_private.error, error);
1432		goto done;
1433	}
1434no_kqueue:
1435
1436	suword(&job->_aiocb_private.error, EINPROGRESS);
1437	aiocbe->uaiocb._aiocb_private.error = EINPROGRESS;
1438	aiocbe->userproc = p;
1439	aiocbe->cred = crhold(td->td_ucred);
1440	aiocbe->jobflags = 0;
1441	aiocbe->lio = lj;
1442	ki = p->p_aioinfo;
1443
1444	if (fp->f_type == DTYPE_SOCKET) {
1445		/*
1446		 * Alternate queueing for socket ops: Reach down into the
1447		 * descriptor to get the socket data.  Then check to see if the
1448		 * socket is ready to be read or written (based on the requested
1449		 * operation).
1450		 *
1451		 * If it is not ready for I/O, then queue the aiocbe on the
1452		 * socket, and set the flags so we get a call when sbnotify()
1453		 * happens.
1454		 *
1455		 * Note if opcode is neither LIO_WRITE nor LIO_READ we lock
1456		 * and unlock the snd sockbuf for no reason.
1457		 */
1458		so = fp->f_data;
1459		sb = (opcode == LIO_READ) ? &so->so_rcv : &so->so_snd;
1460		SOCKBUF_LOCK(sb);
1461		s = splnet();
1462		if (((opcode == LIO_READ) && (!soreadable(so))) || ((opcode ==
1463		    LIO_WRITE) && (!sowriteable(so)))) {
1464			TAILQ_INSERT_TAIL(&so->so_aiojobq, aiocbe, list);
1465			TAILQ_INSERT_TAIL(&ki->kaio_sockqueue, aiocbe, plist);
1466			sb->sb_flags |= SB_AIO;
1467			aiocbe->jobstate = JOBST_JOBQGLOBAL; /* XXX */
1468			ki->kaio_queue_count++;
1469			num_queue_count++;
1470			SOCKBUF_UNLOCK(sb);
1471			splx(s);
1472			error = 0;
1473			goto done;
1474		}
1475		SOCKBUF_UNLOCK(sb);
1476		splx(s);
1477	}
1478
1479	if ((error = aio_qphysio(p, aiocbe)) == 0)
1480		goto done;
1481	if (error > 0) {
1482		suword(&job->_aiocb_private.status, 0);
1483		aiocbe->uaiocb._aiocb_private.error = error;
1484		suword(&job->_aiocb_private.error, error);
1485		goto done;
1486	}
1487
1488	/* No buffer for daemon I/O. */
1489	aiocbe->bp = NULL;
1490
1491	ki->kaio_queue_count++;
1492	if (lj)
1493		lj->lioj_queue_count++;
1494	s = splnet();
1495	TAILQ_INSERT_TAIL(&ki->kaio_jobqueue, aiocbe, plist);
1496	TAILQ_INSERT_TAIL(&aio_jobs, aiocbe, list);
1497	splx(s);
1498	aiocbe->jobstate = JOBST_JOBQGLOBAL;
1499
1500	num_queue_count++;
1501	error = 0;
1502
1503	/*
1504	 * If we don't have a free AIO process, and we are below our quota, then
1505	 * start one.  Otherwise, depend on the subsequent I/O completions to
1506	 * pick up this job.  If we don't successfully create the new process
1507	 * (thread) due to resource issues, we return an error for now (EAGAIN),
1508	 * which is likely not the correct thing to do.
1509	 */
1510	s = splnet();
1511retryproc:
1512	if ((aiop = TAILQ_FIRST(&aio_freeproc)) != NULL) {
1513		TAILQ_REMOVE(&aio_freeproc, aiop, list);
1514		TAILQ_INSERT_TAIL(&aio_activeproc, aiop, list);
1515		aiop->aiothreadflags &= ~AIOP_FREE;
1516		wakeup(aiop->aiothread);
1517	} else if (((num_aio_resv_start + num_aio_procs) < max_aio_procs) &&
1518	    ((ki->kaio_active_count + num_aio_resv_start) <
1519	    ki->kaio_maxactive_count)) {
1520		num_aio_resv_start++;
1521		if ((error = aio_newproc()) == 0) {
1522			num_aio_resv_start--;
1523			goto retryproc;
1524		}
1525		num_aio_resv_start--;
1526	}
1527	splx(s);
1528done:
1529	return (error);
1530}
1531
1532/*
1533 * This routine queues an AIO request, checking for quotas.
1534 */
1535static int
1536aio_aqueue(struct thread *td, struct aiocb *job, int type)
1537{
1538	struct proc *p = td->td_proc;
1539	struct kaioinfo *ki;
1540
1541	if (p->p_aioinfo == NULL)
1542		aio_init_aioinfo(p);
1543
1544	if (num_queue_count >= max_queue_count)
1545		return (EAGAIN);
1546
1547	ki = p->p_aioinfo;
1548	if (ki->kaio_queue_count >= ki->kaio_qallowed_count)
1549		return (EAGAIN);
1550
1551	return _aio_aqueue(td, job, NULL, type);
1552}
1553
1554/*
1555 * Support the aio_return system call; as a side effect, kernel resources are
1556 * released.
1557 */
1558int
1559aio_return(struct thread *td, struct aio_return_args *uap)
1560{
1561	struct proc *p = td->td_proc;
1562	int s;
1563	long jobref;
1564	struct aiocblist *cb, *ncb;
1565	struct aiocb *ujob;
1566	struct kaioinfo *ki;
1567
1568	ujob = uap->aiocbp;
1569	jobref = fuword(&ujob->_aiocb_private.kernelinfo);
1570	if (jobref == -1 || jobref == 0)
1571		return (EINVAL);
1572
1573	ki = p->p_aioinfo;
1574	if (ki == NULL)
1575		return (EINVAL);
1576	TAILQ_FOREACH(cb, &ki->kaio_jobdone, plist) {
1577		if (((intptr_t) cb->uaiocb._aiocb_private.kernelinfo) ==
1578		    jobref) {
1579			if (cb->uaiocb.aio_lio_opcode == LIO_WRITE) {
1580				p->p_stats->p_ru.ru_oublock +=
1581				    cb->outputcharge;
1582				cb->outputcharge = 0;
1583			} else if (cb->uaiocb.aio_lio_opcode == LIO_READ) {
1584				p->p_stats->p_ru.ru_inblock += cb->inputcharge;
1585				cb->inputcharge = 0;
1586			}
1587			goto done;
1588		}
1589	}
1590	s = splbio();
1591	for (cb = TAILQ_FIRST(&ki->kaio_bufdone); cb; cb = ncb) {
1592		ncb = TAILQ_NEXT(cb, plist);
1593		if (((intptr_t) cb->uaiocb._aiocb_private.kernelinfo)
1594		    == jobref) {
1595			break;
1596		}
1597	}
1598	splx(s);
1599 done:
1600	if (cb != NULL) {
1601		if (ujob == cb->uuaiocb) {
1602			td->td_retval[0] =
1603			    cb->uaiocb._aiocb_private.status;
1604		} else
1605			td->td_retval[0] = EFAULT;
1606		aio_free_entry(cb);
1607		return (0);
1608	}
1609	return (EINVAL);
1610}
1611
1612/*
1613 * Allow a process to wake up when any of the I/O requests are completed.
1614 */
1615int
1616aio_suspend(struct thread *td, struct aio_suspend_args *uap)
1617{
1618	struct proc *p = td->td_proc;
1619	struct timeval atv;
1620	struct timespec ts;
1621	struct aiocb *const *cbptr, *cbp;
1622	struct kaioinfo *ki;
1623	struct aiocblist *cb;
1624	int i;
1625	int njoblist;
1626	int error, s, timo;
1627	long *ijoblist;
1628	struct aiocb **ujoblist;
1629
1630	if (uap->nent < 0 || uap->nent > AIO_LISTIO_MAX)
1631		return (EINVAL);
1632
1633	timo = 0;
1634	if (uap->timeout) {
1635		/* Get timespec struct. */
1636		if ((error = copyin(uap->timeout, &ts, sizeof(ts))) != 0)
1637			return (error);
1638
1639		if (ts.tv_nsec < 0 || ts.tv_nsec >= 1000000000)
1640			return (EINVAL);
1641
1642		TIMESPEC_TO_TIMEVAL(&atv, &ts);
1643		if (itimerfix(&atv))
1644			return (EINVAL);
1645		timo = tvtohz(&atv);
1646	}
1647
1648	ki = p->p_aioinfo;
1649	if (ki == NULL)
1650		return (EAGAIN);
1651
1652	njoblist = 0;
1653	ijoblist = uma_zalloc(aiol_zone, M_WAITOK);
1654	ujoblist = uma_zalloc(aiol_zone, M_WAITOK);
1655	cbptr = uap->aiocbp;
1656
1657	for (i = 0; i < uap->nent; i++) {
1658		cbp = (struct aiocb *)(intptr_t)fuword(&cbptr[i]);
1659		if (cbp == 0)
1660			continue;
1661		ujoblist[njoblist] = cbp;
1662		ijoblist[njoblist] = fuword(&cbp->_aiocb_private.kernelinfo);
1663		njoblist++;
1664	}
1665
1666	if (njoblist == 0) {
1667		uma_zfree(aiol_zone, ijoblist);
1668		uma_zfree(aiol_zone, ujoblist);
1669		return (0);
1670	}
1671
1672	error = 0;
1673	for (;;) {
1674		TAILQ_FOREACH(cb, &ki->kaio_jobdone, plist) {
1675			for (i = 0; i < njoblist; i++) {
1676				if (((intptr_t)
1677				    cb->uaiocb._aiocb_private.kernelinfo) ==
1678				    ijoblist[i]) {
1679					if (ujoblist[i] != cb->uuaiocb)
1680						error = EINVAL;
1681					uma_zfree(aiol_zone, ijoblist);
1682					uma_zfree(aiol_zone, ujoblist);
1683					return (error);
1684				}
1685			}
1686		}
1687
1688		s = splbio();
1689		for (cb = TAILQ_FIRST(&ki->kaio_bufdone); cb; cb =
1690		    TAILQ_NEXT(cb, plist)) {
1691			for (i = 0; i < njoblist; i++) {
1692				if (((intptr_t)
1693				    cb->uaiocb._aiocb_private.kernelinfo) ==
1694				    ijoblist[i]) {
1695					splx(s);
1696					if (ujoblist[i] != cb->uuaiocb)
1697						error = EINVAL;
1698					uma_zfree(aiol_zone, ijoblist);
1699					uma_zfree(aiol_zone, ujoblist);
1700					return (error);
1701				}
1702			}
1703		}
1704
1705		ki->kaio_flags |= KAIO_WAKEUP;
1706		error = tsleep(p, PRIBIO | PCATCH, "aiospn", timo);
1707		splx(s);
1708
1709		if (error == ERESTART || error == EINTR) {
1710			uma_zfree(aiol_zone, ijoblist);
1711			uma_zfree(aiol_zone, ujoblist);
1712			return (EINTR);
1713		} else if (error == EWOULDBLOCK) {
1714			uma_zfree(aiol_zone, ijoblist);
1715			uma_zfree(aiol_zone, ujoblist);
1716			return (EAGAIN);
1717		}
1718	}
1719
1720/* NOTREACHED */
1721	return (EINVAL);
1722}
1723
1724/*
1725 * aio_cancel cancels any non-physio aio operations not currently in
1726 * progress.
1727 */
1728int
1729aio_cancel(struct thread *td, struct aio_cancel_args *uap)
1730{
1731	struct proc *p = td->td_proc;
1732	struct kaioinfo *ki;
1733	struct aiocblist *cbe, *cbn;
1734	struct file *fp;
1735	struct filedesc *fdp;
1736	struct socket *so;
1737	struct proc *po;
1738	int s,error;
1739	int cancelled=0;
1740	int notcancelled=0;
1741	struct vnode *vp;
1742
1743	fdp = p->p_fd;
1744	if ((u_int)uap->fd >= fdp->fd_nfiles ||
1745	    (fp = fdp->fd_ofiles[uap->fd]) == NULL)
1746		return (EBADF);
1747
1748	if (fp->f_type == DTYPE_VNODE) {
1749		vp = fp->f_vnode;
1750
1751		if (vn_isdisk(vp,&error)) {
1752			td->td_retval[0] = AIO_NOTCANCELED;
1753			return (0);
1754		}
1755	} else if (fp->f_type == DTYPE_SOCKET) {
1756		so = fp->f_data;
1757
1758		s = splnet();
1759
1760		for (cbe = TAILQ_FIRST(&so->so_aiojobq); cbe; cbe = cbn) {
1761			cbn = TAILQ_NEXT(cbe, list);
1762			if ((uap->aiocbp == NULL) ||
1763				(uap->aiocbp == cbe->uuaiocb) ) {
1764				po = cbe->userproc;
1765				ki = po->p_aioinfo;
1766				TAILQ_REMOVE(&so->so_aiojobq, cbe, list);
1767				TAILQ_REMOVE(&ki->kaio_sockqueue, cbe, plist);
1768				TAILQ_INSERT_TAIL(&ki->kaio_jobdone, cbe, plist);
1769				if (ki->kaio_flags & KAIO_WAKEUP) {
1770					wakeup(po);
1771				}
1772				cbe->jobstate = JOBST_JOBFINISHED;
1773				cbe->uaiocb._aiocb_private.status=-1;
1774				cbe->uaiocb._aiocb_private.error=ECANCELED;
1775				cancelled++;
1776/* XXX cancelled, knote? */
1777				if (cbe->uaiocb.aio_sigevent.sigev_notify ==
1778				    SIGEV_SIGNAL) {
1779					PROC_LOCK(cbe->userproc);
1780					psignal(cbe->userproc, cbe->uaiocb.aio_sigevent.sigev_signo);
1781					PROC_UNLOCK(cbe->userproc);
1782				}
1783				if (uap->aiocbp)
1784					break;
1785			}
1786		}
1787		splx(s);
1788
1789		if ((cancelled) && (uap->aiocbp)) {
1790			td->td_retval[0] = AIO_CANCELED;
1791			return (0);
1792		}
1793	}
1794	ki=p->p_aioinfo;
1795	if (ki == NULL)
1796		goto done;
1797	s = splnet();
1798
1799	for (cbe = TAILQ_FIRST(&ki->kaio_jobqueue); cbe; cbe = cbn) {
1800		cbn = TAILQ_NEXT(cbe, plist);
1801
1802		if ((uap->fd == cbe->uaiocb.aio_fildes) &&
1803		    ((uap->aiocbp == NULL ) ||
1804		     (uap->aiocbp == cbe->uuaiocb))) {
1805
1806			if (cbe->jobstate == JOBST_JOBQGLOBAL) {
1807				TAILQ_REMOVE(&aio_jobs, cbe, list);
1808				TAILQ_REMOVE(&ki->kaio_jobqueue, cbe, plist);
1809				TAILQ_INSERT_TAIL(&ki->kaio_jobdone, cbe,
1810				    plist);
1811				cancelled++;
1812				ki->kaio_queue_finished_count++;
1813				cbe->jobstate = JOBST_JOBFINISHED;
1814				cbe->uaiocb._aiocb_private.status = -1;
1815				cbe->uaiocb._aiocb_private.error = ECANCELED;
1816/* XXX cancelled, knote? */
1817				if (cbe->uaiocb.aio_sigevent.sigev_notify ==
1818				    SIGEV_SIGNAL) {
1819					PROC_LOCK(cbe->userproc);
1820					psignal(cbe->userproc, cbe->uaiocb.aio_sigevent.sigev_signo);
1821					PROC_UNLOCK(cbe->userproc);
1822				}
1823			} else {
1824				notcancelled++;
1825			}
1826		}
1827	}
1828	splx(s);
1829done:
1830	if (notcancelled) {
1831		td->td_retval[0] = AIO_NOTCANCELED;
1832		return (0);
1833	}
1834	if (cancelled) {
1835		td->td_retval[0] = AIO_CANCELED;
1836		return (0);
1837	}
1838	td->td_retval[0] = AIO_ALLDONE;
1839
1840	return (0);
1841}
1842
1843/*
1844 * aio_error is implemented at the kernel level for compatibility purposes only.
1845 * For a user mode async implementation, it would be best to do it in a userland
1846 * subroutine.
1847 */
1848int
1849aio_error(struct thread *td, struct aio_error_args *uap)
1850{
1851	struct proc *p = td->td_proc;
1852	int s;
1853	struct aiocblist *cb;
1854	struct kaioinfo *ki;
1855	long jobref;
1856
1857	ki = p->p_aioinfo;
1858	if (ki == NULL)
1859		return (EINVAL);
1860
1861	jobref = fuword(&uap->aiocbp->_aiocb_private.kernelinfo);
1862	if ((jobref == -1) || (jobref == 0))
1863		return (EINVAL);
1864
1865	TAILQ_FOREACH(cb, &ki->kaio_jobdone, plist) {
1866		if (((intptr_t)cb->uaiocb._aiocb_private.kernelinfo) ==
1867		    jobref) {
1868			td->td_retval[0] = cb->uaiocb._aiocb_private.error;
1869			return (0);
1870		}
1871	}
1872
1873	s = splnet();
1874
1875	for (cb = TAILQ_FIRST(&ki->kaio_jobqueue); cb; cb = TAILQ_NEXT(cb,
1876	    plist)) {
1877		if (((intptr_t)cb->uaiocb._aiocb_private.kernelinfo) ==
1878		    jobref) {
1879			td->td_retval[0] = EINPROGRESS;
1880			splx(s);
1881			return (0);
1882		}
1883	}
1884
1885	for (cb = TAILQ_FIRST(&ki->kaio_sockqueue); cb; cb = TAILQ_NEXT(cb,
1886	    plist)) {
1887		if (((intptr_t)cb->uaiocb._aiocb_private.kernelinfo) ==
1888		    jobref) {
1889			td->td_retval[0] = EINPROGRESS;
1890			splx(s);
1891			return (0);
1892		}
1893	}
1894	splx(s);
1895
1896	s = splbio();
1897	for (cb = TAILQ_FIRST(&ki->kaio_bufdone); cb; cb = TAILQ_NEXT(cb,
1898	    plist)) {
1899		if (((intptr_t)cb->uaiocb._aiocb_private.kernelinfo) ==
1900		    jobref) {
1901			td->td_retval[0] = cb->uaiocb._aiocb_private.error;
1902			splx(s);
1903			return (0);
1904		}
1905	}
1906
1907	for (cb = TAILQ_FIRST(&ki->kaio_bufqueue); cb; cb = TAILQ_NEXT(cb,
1908	    plist)) {
1909		if (((intptr_t)cb->uaiocb._aiocb_private.kernelinfo) ==
1910		    jobref) {
1911			td->td_retval[0] = EINPROGRESS;
1912			splx(s);
1913			return (0);
1914		}
1915	}
1916	splx(s);
1917
1918#if (0)
1919	/*
1920	 * Hack for lio.
1921	 */
1922	status = fuword(&uap->aiocbp->_aiocb_private.status);
1923	if (status == -1)
1924		return fuword(&uap->aiocbp->_aiocb_private.error);
1925#endif
1926	return (EINVAL);
1927}
1928
1929/* syscall - asynchronous read from a file (REALTIME) */
1930int
1931aio_read(struct thread *td, struct aio_read_args *uap)
1932{
1933
1934	return aio_aqueue(td, uap->aiocbp, LIO_READ);
1935}
1936
1937/* syscall - asynchronous write to a file (REALTIME) */
1938int
1939aio_write(struct thread *td, struct aio_write_args *uap)
1940{
1941
1942	return aio_aqueue(td, uap->aiocbp, LIO_WRITE);
1943}
1944
1945/* syscall - list directed I/O (REALTIME) */
1946int
1947lio_listio(struct thread *td, struct lio_listio_args *uap)
1948{
1949	struct proc *p = td->td_proc;
1950	int nent, nentqueued;
1951	struct aiocb *iocb, * const *cbptr;
1952	struct aiocblist *cb;
1953	struct kaioinfo *ki;
1954	struct aio_liojob *lj;
1955	int error, runningcode;
1956	int nerror;
1957	int i;
1958	int s;
1959
1960	if ((uap->mode != LIO_NOWAIT) && (uap->mode != LIO_WAIT))
1961		return (EINVAL);
1962
1963	nent = uap->nent;
1964	if (nent < 0 || nent > AIO_LISTIO_MAX)
1965		return (EINVAL);
1966
1967	if (p->p_aioinfo == NULL)
1968		aio_init_aioinfo(p);
1969
1970	if ((nent + num_queue_count) > max_queue_count)
1971		return (EAGAIN);
1972
1973	ki = p->p_aioinfo;
1974	if ((nent + ki->kaio_queue_count) > ki->kaio_qallowed_count)
1975		return (EAGAIN);
1976
1977	lj = uma_zalloc(aiolio_zone, M_WAITOK);
1978	if (!lj)
1979		return (EAGAIN);
1980
1981	lj->lioj_flags = 0;
1982	lj->lioj_buffer_count = 0;
1983	lj->lioj_buffer_finished_count = 0;
1984	lj->lioj_queue_count = 0;
1985	lj->lioj_queue_finished_count = 0;
1986	lj->lioj_ki = ki;
1987
1988	/*
1989	 * Set up the signal.
1990	 */
1991	if (uap->sig && (uap->mode == LIO_NOWAIT)) {
1992		error = copyin(uap->sig, &lj->lioj_signal,
1993		    sizeof(lj->lioj_signal));
1994		if (error) {
1995			uma_zfree(aiolio_zone, lj);
1996			return (error);
1997		}
1998		if (!_SIG_VALID(lj->lioj_signal.sigev_signo)) {
1999			uma_zfree(aiolio_zone, lj);
2000			return (EINVAL);
2001		}
2002		lj->lioj_flags |= LIOJ_SIGNAL;
2003	}
2004	TAILQ_INSERT_TAIL(&ki->kaio_liojoblist, lj, lioj_list);
2005	/*
2006	 * Get pointers to the list of I/O requests.
2007	 */
2008	nerror = 0;
2009	nentqueued = 0;
2010	cbptr = uap->acb_list;
2011	for (i = 0; i < uap->nent; i++) {
2012		iocb = (struct aiocb *)(intptr_t)fuword(&cbptr[i]);
2013		if (((intptr_t)iocb != -1) && ((intptr_t)iocb != 0)) {
2014			error = _aio_aqueue(td, iocb, lj, 0);
2015			if (error == 0)
2016				nentqueued++;
2017			else
2018				nerror++;
2019		}
2020	}
2021
2022	/*
2023	 * If we haven't queued any, report EIO when every request failed.
2024	 */
2025	if (nentqueued == 0)
2026		return (nerror ? EIO : 0);
2027
2028	/*
2029	 * Calculate the appropriate error return.
2030	 */
2031	runningcode = 0;
2032	if (nerror)
2033		runningcode = EIO;
2034
2035	if (uap->mode == LIO_WAIT) {
2036		int command, found;
		long jobref;
2037
2038		for (;;) {
2039			found = 0;
2040			for (i = 0; i < uap->nent; i++) {
2041				/*
2042				 * Fetch address of the control buf pointer in
2043				 * user space.
2044				 */
2045				iocb = (struct aiocb *)
2046				    (intptr_t)fuword(&cbptr[i]);
2047				if (((intptr_t)iocb == -1) || ((intptr_t)iocb
2048				    == 0))
2049					continue;
2050
2051				/*
2052				 * Fetch the associated command from user space.
2053				 */
2054				command = fuword(&iocb->aio_lio_opcode);
2055				if (command == LIO_NOP) {
2056					found++;
2057					continue;
2058				}
2059
2060				jobref =
2061				    fuword(&iocb->_aiocb_private.kernelinfo);
2062
2063				TAILQ_FOREACH(cb, &ki->kaio_jobdone, plist) {
2064					if (((intptr_t)cb->uaiocb._aiocb_private.kernelinfo)
2065					    == jobref) {
2066						if (cb->uaiocb.aio_lio_opcode
2067						    == LIO_WRITE) {
2068							p->p_stats->p_ru.ru_oublock
2069							    +=
2070							    cb->outputcharge;
2071							cb->outputcharge = 0;
2072						} else if (cb->uaiocb.aio_lio_opcode
2073						    == LIO_READ) {
2074							p->p_stats->p_ru.ru_inblock
2075							    += cb->inputcharge;
2076							cb->inputcharge = 0;
2077						}
2078						found++;
2079						break;
2080					}
2081				}
2082
2083				s = splbio();
2084				TAILQ_FOREACH(cb, &ki->kaio_bufdone, plist) {
2085					if (((intptr_t)cb->uaiocb._aiocb_private.kernelinfo)
2086					    == jobref) {
2087						found++;
2088						break;
2089					}
2090				}
2091				splx(s);
2092			}
2093
2094			/*
2095			 * If all I/Os have been disposed of, then we can
2096			 * return.
2097			 */
2098			if (found == nentqueued)
2099				return (runningcode);
2100
2101			ki->kaio_flags |= KAIO_WAKEUP;
2102			error = tsleep(p, PRIBIO | PCATCH, "aiospn", 0);
2103
2104			if (error == EINTR)
2105				return (EINTR);
2106			else if (error == EWOULDBLOCK)
2107				return (EAGAIN);
2108		}
2109	}
2110
2111	return (runningcode);
2112}
2113
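/*
 * Illustrative userland sketch, not part of this revision: a synchronous
 * batch submitted through lio_listio(LIO_WAIT, ...), matching the wait loop
 * above.  The names batch_read, "fd", "b1" and "b2" are hypothetical.
 *
 *	#include <aio.h>
 *	#include <string.h>
 *
 *	static int
 *	batch_read(int fd, void *b1, void *b2, size_t len)
 *	{
 *		struct aiocb cb[2];
 *		struct aiocb *list[2] = { &cb[0], &cb[1] };
 *		int i;
 *
 *		memset(cb, 0, sizeof(cb));
 *		for (i = 0; i < 2; i++) {
 *			cb[i].aio_fildes = fd;
 *			cb[i].aio_nbytes = len;
 *			cb[i].aio_lio_opcode = LIO_READ;
 *		}
 *		cb[0].aio_buf = b1;
 *		cb[1].aio_buf = b2;
 *		cb[0].aio_offset = 0;
 *		cb[1].aio_offset = len;
 *
 *		// 0 when all requests completed, -1 (EAGAIN/EINTR/EIO) otherwise;
 *		// per-request status still comes from aio_error()/aio_return().
 *		return (lio_listio(LIO_WAIT, list, 2, NULL));
 *	}
 */
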
2114/*
2115 * This is a weird hack so that we can post a signal.  It is safe to do so from
2116 * a timeout routine, but *not* from an interrupt routine.
2117 */
2118static void
2119process_signal(void *aioj)
2120{
2121	struct aiocblist *aiocbe = aioj;
2122	struct aio_liojob *lj = aiocbe->lio;
2123	struct aiocb *cb = &aiocbe->uaiocb;
2124
2125	if ((lj) && (lj->lioj_signal.sigev_notify == SIGEV_SIGNAL) &&
2126		(lj->lioj_queue_count == lj->lioj_queue_finished_count)) {
2127		PROC_LOCK(lj->lioj_ki->kaio_p);
2128		psignal(lj->lioj_ki->kaio_p, lj->lioj_signal.sigev_signo);
2129		PROC_UNLOCK(lj->lioj_ki->kaio_p);
2130		lj->lioj_flags |= LIOJ_SIGNAL_POSTED;
2131	}
2132
2133	if (cb->aio_sigevent.sigev_notify == SIGEV_SIGNAL) {
2134		PROC_LOCK(aiocbe->userproc);
2135		psignal(aiocbe->userproc, cb->aio_sigevent.sigev_signo);
2136		PROC_UNLOCK(aiocbe->userproc);
2137	}
2138}
2139
2140/*
2141 * Interrupt handler for physio; performs the necessary process wakeups and
2142 * posts signals.
2143 */
2144static void
2145aio_physwakeup(struct buf *bp)
2146{
2147	struct aiocblist *aiocbe;
2148	struct proc *p;
2149	struct kaioinfo *ki;
2150	struct aio_liojob *lj;
2151
2152	wakeup(bp);
2153
2154	aiocbe = (struct aiocblist *)bp->b_caller1;
2155	if (aiocbe) {
2156		p = aiocbe->userproc;
2157
2158		aiocbe->jobstate = JOBST_JOBBFINISHED;
2159		aiocbe->uaiocb._aiocb_private.status -= bp->b_resid;
2160		aiocbe->uaiocb._aiocb_private.error = 0;
2161		aiocbe->jobflags |= AIOCBLIST_DONE;
2162
2163		if (bp->b_ioflags & BIO_ERROR)
2164			aiocbe->uaiocb._aiocb_private.error = bp->b_error;
2165
2166		lj = aiocbe->lio;
2167		if (lj) {
2168			lj->lioj_buffer_finished_count++;
2169
2170			/*
2171			 * wakeup/signal if all of the interrupt jobs are done.
2172			 */
2173			if (lj->lioj_buffer_finished_count ==
2174			    lj->lioj_buffer_count) {
2175				/*
2176				 * Post a signal if it is called for.
2177				 */
2178				if ((lj->lioj_flags &
2179				    (LIOJ_SIGNAL|LIOJ_SIGNAL_POSTED)) ==
2180				    LIOJ_SIGNAL) {
2181					lj->lioj_flags |= LIOJ_SIGNAL_POSTED;
2182					aiocbe->timeouthandle =
2183						timeout(process_signal,
2184							aiocbe, 0);
2185				}
2186			}
2187		}
2188
2189		ki = p->p_aioinfo;
2190		if (ki) {
2191			ki->kaio_buffer_finished_count++;
2192			TAILQ_REMOVE(&aio_bufjobs, aiocbe, list);
2193			TAILQ_REMOVE(&ki->kaio_bufqueue, aiocbe, plist);
2194			TAILQ_INSERT_TAIL(&ki->kaio_bufdone, aiocbe, plist);
2195
2196			KNOTE_UNLOCKED(&aiocbe->klist, 0);
2197			/* Do the wakeup. */
2198			if (ki->kaio_flags & (KAIO_RUNDOWN|KAIO_WAKEUP)) {
2199				ki->kaio_flags &= ~KAIO_WAKEUP;
2200				wakeup(p);
2201			}
2202		}
2203
2204		if (aiocbe->uaiocb.aio_sigevent.sigev_notify == SIGEV_SIGNAL)
2205			aiocbe->timeouthandle =
2206				timeout(process_signal, aiocbe, 0);
2207	}
2208}
2209
2210/* syscall - wait for the next completion of an aio request */
2211int
2212aio_waitcomplete(struct thread *td, struct aio_waitcomplete_args *uap)
2213{
2214	struct proc *p = td->td_proc;
2215	struct timeval atv;
2216	struct timespec ts;
2217	struct kaioinfo *ki;
2218	struct aiocblist *cb = NULL;
2219	int error, s, timo;
2220
2221	suword(uap->aiocbp, (int)NULL);
2222
2223	timo = 0;
2224	if (uap->timeout) {
2225		/* Get timespec struct. */
2226		error = copyin(uap->timeout, &ts, sizeof(ts));
2227		if (error)
2228			return (error);
2229
2230		if ((ts.tv_nsec < 0) || (ts.tv_nsec >= 1000000000))
2231			return (EINVAL);
2232
2233		TIMESPEC_TO_TIMEVAL(&atv, &ts);
2234		if (itimerfix(&atv))
2235			return (EINVAL);
2236		timo = tvtohz(&atv);
2237	}
2238
2239	ki = p->p_aioinfo;
2240	if (ki == NULL)
2241		return (EAGAIN);
2242
2243	for (;;) {
2244		if ((cb = TAILQ_FIRST(&ki->kaio_jobdone)) != 0) {
2245			suword(uap->aiocbp, (uintptr_t)cb->uuaiocb);
2246			td->td_retval[0] = cb->uaiocb._aiocb_private.status;
2247			if (cb->uaiocb.aio_lio_opcode == LIO_WRITE) {
2248				p->p_stats->p_ru.ru_oublock +=
2249				    cb->outputcharge;
2250				cb->outputcharge = 0;
2251			} else if (cb->uaiocb.aio_lio_opcode == LIO_READ) {
2252				p->p_stats->p_ru.ru_inblock += cb->inputcharge;
2253				cb->inputcharge = 0;
2254			}
2255			error = cb->uaiocb._aiocb_private.error;
2256			aio_free_entry(cb);
			return (error);
2257		}
2258
2259		s = splbio();
2260		if ((cb = TAILQ_FIRST(&ki->kaio_bufdone)) != 0) {
2261			splx(s);
2262			suword(uap->aiocbp, (uintptr_t)cb->uuaiocb);
2263			td->td_retval[0] = cb->uaiocb._aiocb_private.status;
2264			error = cb->uaiocb._aiocb_private.error;
2265			aio_free_entry(cb);
			return (error);
2266		}
2267
2268		ki->kaio_flags |= KAIO_WAKEUP;
2269		error = tsleep(p, PRIBIO | PCATCH, "aiowc", timo);
2270		splx(s);
2271
2272		if (error == ERESTART)
2273			return (EINTR);
2274		else if (error < 0)
2275			return (error);
2276		else if (error == EINTR)
2277			return (EINTR);
2278		else if (error == EWOULDBLOCK)
2279			return (EAGAIN);
2280	}
2281}
2282
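/*
 * Illustrative userland sketch, not part of this revision: a completion loop
 * over the FreeBSD-specific aio_waitcomplete(2) interface implemented above.
 * It assumes the documented form aio_waitcomplete(&iocbp, timeout), which
 * blocks until a previously submitted request finishes, stores a pointer to
 * its aiocb, and returns that request's byte count.  The names
 * drain_completions and "pending" are hypothetical.
 *
 *	#include <aio.h>
 *	#include <err.h>
 *
 *	static void
 *	drain_completions(int pending)
 *	{
 *		struct aiocb *done;
 *		ssize_t n;
 *
 *		while (pending > 0) {
 *			n = aio_waitcomplete(&done, NULL);	// wait forever
 *			if (n == -1)
 *				err(1, "aio_waitcomplete");
 *			// "done" now points at the caller's completed aiocb
 *			pending--;
 *		}
 *	}
 */
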
2283/* kqueue attach function */
2284static int
2285filt_aioattach(struct knote *kn)
2286{
2287	struct aiocblist *aiocbe = (struct aiocblist *)kn->kn_sdata;
2288
2289	/*
2290	 * The aiocbe pointer must be validated before using it, so
2291	 * registration is restricted to the kernel; the user cannot
2292	 * set EV_FLAG1.
2293	 */
2294	if ((kn->kn_flags & EV_FLAG1) == 0)
2295		return (EPERM);
2296	kn->kn_flags &= ~EV_FLAG1;
2297
2298	knlist_add(&aiocbe->klist, kn, 0);
2299
2300	return (0);
2301}
2302
2303/* kqueue detach function */
2304static void
2305filt_aiodetach(struct knote *kn)
2306{
2307	struct aiocblist *aiocbe = (struct aiocblist *)kn->kn_sdata;
2308
2309	knlist_remove(&aiocbe->klist, kn, 0);
2310}
2311
2312/* kqueue filter function */
2313/*ARGSUSED*/
2314static int
2315filt_aio(struct knote *kn, long hint)
2316{
2317	struct aiocblist *aiocbe = (struct aiocblist *)kn->kn_sdata;
2318
2319	kn->kn_data = aiocbe->uaiocb._aiocb_private.error;
2320	if (aiocbe->jobstate != JOBST_JOBFINISHED &&
2321	    aiocbe->jobstate != JOBST_JOBBFINISHED)
2322		return (0);
2323	kn->kn_flags |= EV_EOF;
2324	return (1);
2325}
2326
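/*
 * Illustrative userland sketch, not part of this revision: how an application
 * would arm the EVFILT_AIO filter handled above.  It assumes FreeBSD's
 * SIGEV_KEVENT notification, where sigev_notify_kqueue names the kqueue
 * descriptor; the kernel then registers the knote itself (with EV_FLAG1 set),
 * which is why filt_aioattach() rejects direct user registration.  The names
 * submit_kq, "kq", "fd" and "buf" are hypothetical.
 *
 *	#include <sys/types.h>
 *	#include <sys/event.h>
 *	#include <aio.h>
 *	#include <string.h>
 *
 *	static ssize_t
 *	submit_kq(int kq, int fd, void *buf, size_t len)
 *	{
 *		struct aiocb cb;
 *		struct kevent ev;
 *
 *		memset(&cb, 0, sizeof(cb));
 *		cb.aio_fildes = fd;
 *		cb.aio_buf = buf;
 *		cb.aio_nbytes = len;
 *		cb.aio_sigevent.sigev_notify = SIGEV_KEVENT;
 *		cb.aio_sigevent.sigev_notify_kqueue = kq;
 *		if (aio_read(&cb) == -1)
 *			return (-1);
 *		// completion arrives as an EVFILT_AIO event for this aiocb
 *		if (kevent(kq, NULL, 0, &ev, 1, NULL) != 1)
 *			return (-1);
 *		return (aio_return(&cb));	// reap the finished request
 *	}
 */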