vfs_aio.c revision 154706
/*-
 * Copyright (c) 1997 John S. Dyson.  All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. John S. Dyson's name may not be used to endorse or promote products
 *    derived from this software without specific prior written permission.
 *
 * DISCLAIMER:  This code isn't warranted to do anything useful.  Anything
 * bad that happens because of using this software isn't the responsibility
 * of the author.  This software is distributed AS-IS.
 */

/*
 * This file contains support for the POSIX 1003.1B AIO/LIO facility.
 */

#include <sys/cdefs.h>
__FBSDID("$FreeBSD: head/sys/kern/vfs_aio.c 154706 2006-01-23 10:27:15Z davidxu $");

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/malloc.h>
#include <sys/bio.h>
#include <sys/buf.h>
#include <sys/eventhandler.h>
#include <sys/sysproto.h>
#include <sys/filedesc.h>
#include <sys/kernel.h>
#include <sys/module.h>
#include <sys/kthread.h>
#include <sys/fcntl.h>
#include <sys/file.h>
#include <sys/limits.h>
#include <sys/lock.h>
#include <sys/mutex.h>
#include <sys/unistd.h>
#include <sys/proc.h>
#include <sys/resourcevar.h>
#include <sys/signalvar.h>
#include <sys/protosw.h>
#include <sys/sema.h>
#include <sys/socket.h>
#include <sys/socketvar.h>
#include <sys/syscall.h>
#include <sys/sysent.h>
#include <sys/sysctl.h>
#include <sys/sx.h>
#include <sys/taskqueue.h>
#include <sys/vnode.h>
#include <sys/conf.h>
#include <sys/event.h>

#include <machine/atomic.h>

#include <posix4/posix4.h>
#include <vm/vm.h>
#include <vm/vm_extern.h>
#include <vm/pmap.h>
#include <vm/vm_map.h>
#include <vm/uma.h>
#include <sys/aio.h>

#include "opt_vfs_aio.h"

/*
 * Counter for allocating reference ids to new jobs.  Wrapped to 1 on
 * overflow.
 */
static long jobrefid;
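/*
 * Job state machine (derived from the code below): a request is JOBST_NULL
 * when allocated; JOBST_JOBQSOCK, JOBST_JOBQGLOBAL, or JOBST_JOBQBUF once
 * queued (socket wait queue, global aio_jobs queue, or physio buffer,
 * respectively); JOBST_JOBRUNNING while an aiod services it;
 * JOBST_JOBFINISHED when the I/O completes; and back to JOBST_NULL in
 * aio_free_entry().
 */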
#define JOBST_NULL		0x0
#define JOBST_JOBQSOCK		0x1
#define JOBST_JOBQGLOBAL	0x2
#define JOBST_JOBRUNNING	0x3
#define JOBST_JOBFINISHED	0x4
#define JOBST_JOBQBUF		0x5

#ifndef MAX_AIO_PER_PROC
#define MAX_AIO_PER_PROC	32
#endif

#ifndef MAX_AIO_QUEUE_PER_PROC
#define MAX_AIO_QUEUE_PER_PROC	256	/* Bigger than AIO_LISTIO_MAX */
#endif

#ifndef MAX_AIO_PROCS
#define MAX_AIO_PROCS		32
#endif

#ifndef MAX_AIO_QUEUE
#define MAX_AIO_QUEUE		1024	/* Bigger than AIO_LISTIO_MAX */
#endif

#ifndef TARGET_AIO_PROCS
#define TARGET_AIO_PROCS	4
#endif

#ifndef MAX_BUF_AIO
#define MAX_BUF_AIO		16
#endif

#ifndef AIOD_TIMEOUT_DEFAULT
#define AIOD_TIMEOUT_DEFAULT	(10 * hz)
#endif

#ifndef AIOD_LIFETIME_DEFAULT
#define AIOD_LIFETIME_DEFAULT	(30 * hz)
#endif

static SYSCTL_NODE(_vfs, OID_AUTO, aio, CTLFLAG_RW, 0, "Async IO management");

static int max_aio_procs = MAX_AIO_PROCS;
SYSCTL_INT(_vfs_aio, OID_AUTO, max_aio_procs,
	CTLFLAG_RW, &max_aio_procs, 0,
	"Maximum number of kernel threads to use for handling async IO");

static int num_aio_procs = 0;
SYSCTL_INT(_vfs_aio, OID_AUTO, num_aio_procs,
	CTLFLAG_RD, &num_aio_procs, 0,
	"Number of presently active kernel threads for async IO");

/*
 * The code will adjust the actual number of AIO processes towards this
 * number when it gets a chance.
 */
static int target_aio_procs = TARGET_AIO_PROCS;
SYSCTL_INT(_vfs_aio, OID_AUTO, target_aio_procs, CTLFLAG_RW, &target_aio_procs,
	0, "Preferred number of ready kernel threads for async IO");

static int max_queue_count = MAX_AIO_QUEUE;
SYSCTL_INT(_vfs_aio, OID_AUTO, max_aio_queue, CTLFLAG_RW, &max_queue_count, 0,
	"Maximum number of aio requests to queue, globally");

static int num_queue_count = 0;
SYSCTL_INT(_vfs_aio, OID_AUTO, num_queue_count, CTLFLAG_RD, &num_queue_count, 0,
	"Number of queued aio requests");

static int num_buf_aio = 0;
SYSCTL_INT(_vfs_aio, OID_AUTO, num_buf_aio, CTLFLAG_RD, &num_buf_aio, 0,
	"Number of aio requests presently handled by the buf subsystem");

/* Number of async I/O threads in the process of being started */
/* XXX This should be local to aio_aqueue() */
static int num_aio_resv_start = 0;

static int aiod_timeout;
SYSCTL_INT(_vfs_aio, OID_AUTO, aiod_timeout, CTLFLAG_RW, &aiod_timeout, 0,
	"Timeout value for synchronous aio operations");

static int aiod_lifetime;
SYSCTL_INT(_vfs_aio, OID_AUTO, aiod_lifetime, CTLFLAG_RW, &aiod_lifetime, 0,
	"Maximum lifetime for idle aiod");

static int unloadable = 0;
SYSCTL_INT(_vfs_aio, OID_AUTO, unloadable, CTLFLAG_RW, &unloadable, 0,
	"Allow unload of aio (not recommended)");

static int max_aio_per_proc = MAX_AIO_PER_PROC;
SYSCTL_INT(_vfs_aio, OID_AUTO, max_aio_per_proc, CTLFLAG_RW, &max_aio_per_proc,
	0, "Maximum active aio requests per process (stored in the process)");

static int max_aio_queue_per_proc = MAX_AIO_QUEUE_PER_PROC;
SYSCTL_INT(_vfs_aio, OID_AUTO, max_aio_queue_per_proc, CTLFLAG_RW,
	&max_aio_queue_per_proc, 0,
	"Maximum queued aio requests per process (stored in the process)");

static int max_buf_aio = MAX_BUF_AIO;
SYSCTL_INT(_vfs_aio, OID_AUTO, max_buf_aio, CTLFLAG_RW, &max_buf_aio, 0,
	"Maximum buf aio requests per process (stored in the process)");
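/*
 * The old, osigevent-based aiocb layout.  The compat entry points
 * (oaio_read(), oaio_write(), olio_listio()) copy requests in using this
 * layout before converting them to the current struct aiocb.
 */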
typedef struct oaiocb {
	int	aio_fildes;		/* File descriptor */
	off_t	aio_offset;		/* File offset for I/O */
	volatile void *aio_buf;		/* I/O buffer in process space */
	size_t	aio_nbytes;		/* Number of bytes for I/O */
	struct	osigevent aio_sigevent;	/* Signal to deliver */
	int	aio_lio_opcode;		/* LIO opcode */
	int	aio_reqprio;		/* Request priority -- ignored */
	struct	__aiocb_private	_aiocb_private;
} oaiocb_t;

struct aiocblist {
	TAILQ_ENTRY(aiocblist) list;	/* List of jobs */
	TAILQ_ENTRY(aiocblist) plist;	/* List of jobs for proc */
	TAILQ_ENTRY(aiocblist) allist;
	int	jobflags;
	int	jobstate;
	int	inputcharge;
	int	outputcharge;
	struct	buf *bp;		/* Buffer pointer */
	struct	proc *userproc;		/* User process */
	struct	ucred *cred;		/* Active credential when created */
	struct	file *fd_file;		/* Pointer to file structure */
	struct	aioliojob *lio;		/* Optional lio job */
	struct	aiocb *uuaiocb;		/* Pointer in userspace of aiocb */
	struct	knlist klist;		/* list of knotes */
	struct	aiocb uaiocb;		/* Kernel I/O control block */
	ksiginfo_t ksi;			/* Realtime signal info */
	struct	task biotask;
};

/* jobflags */
#define AIOCBLIST_RUNDOWN	0x04
#define AIOCBLIST_DONE		0x10
#define AIOCBLIST_BUFDONE	0x20

/*
 * AIO process info
 */
#define AIOP_FREE	0x1			/* proc on free queue */

struct aiothreadlist {
	int	aiothreadflags;			/* AIO proc flags */
	TAILQ_ENTRY(aiothreadlist) list;	/* List of processes */
	struct	thread *aiothread;		/* The AIO thread */
};

/*
 * data-structure for lio signal management
 */
struct aioliojob {
	int	lioj_flags;
	int	lioj_count;
	int	lioj_finished_count;
	struct	sigevent lioj_signal;	/* signal on all I/O done */
	TAILQ_ENTRY(aioliojob) lioj_list;
	struct	knlist klist;		/* list of knotes */
	ksiginfo_t lioj_ksi;		/* Realtime signal info */
};

#define LIOJ_SIGNAL		0x1	/* signal on all done (lio) */
#define LIOJ_SIGNAL_POSTED	0x2	/* signal has been posted */
#define LIOJ_KEVENT_POSTED	0x4	/* kevent triggered */

/*
 * per process aio data structure
 */
struct kaioinfo {
	int	kaio_flags;		/* per process kaio flags */
	int	kaio_maxactive_count;	/* maximum number of AIOs */
	int	kaio_active_count;	/* number of currently used AIOs */
	int	kaio_qallowed_count;	/* maximum size of AIO queue */
	int	kaio_count;		/* size of AIO queue */
	int	kaio_ballowed_count;	/* maximum number of buffers */
	int	kaio_buffer_count;	/* number of physio buffers */
	TAILQ_HEAD(,aiocblist) kaio_all;	/* all AIOs in the process */
	TAILQ_HEAD(,aiocblist) kaio_done;	/* done queue for process */
	TAILQ_HEAD(,aioliojob) kaio_liojoblist;	/* list of lio jobs */
	TAILQ_HEAD(,aiocblist) kaio_jobqueue;	/* job queue for process */
	TAILQ_HEAD(,aiocblist) kaio_bufqueue;	/* buffer job queue for process */
	TAILQ_HEAD(,aiocblist) kaio_sockqueue;	/* queue for aios waiting on sockets */
};

#define KAIO_RUNDOWN	0x1	/* process is being run down */
#define KAIO_WAKEUP	0x2	/* wakeup process when there is a significant event */

static TAILQ_HEAD(,aiothreadlist) aio_freeproc;		/* Idle daemons */
static struct sema aio_newproc_sem;
static struct mtx aio_job_mtx;
static struct mtx aio_sock_mtx;
static TAILQ_HEAD(,aiocblist) aio_jobs;			/* Async job list */
static struct unrhdr *aiod_unr;

static void	aio_init_aioinfo(struct proc *p);
static void	aio_onceonly(void);
static int	aio_free_entry(struct aiocblist *aiocbe);
static void	aio_process(struct aiocblist *aiocbe);
static int	aio_newproc(int *);
static int	aio_aqueue(struct thread *td, struct aiocb *job,
		    struct aioliojob *lio, int type, int osigev);
static void	aio_physwakeup(struct buf *bp);
static void	aio_proc_rundown(void *arg, struct proc *p);
static int	aio_qphysio(struct proc *p, struct aiocblist *iocb);
static void	biohelper(void *, int);
static void	aio_daemon(void *param);
static void	aio_swake_cb(struct socket *, struct sockbuf *);
static int	aio_unload(void);
static int	filt_aioattach(struct knote *kn);
static void	filt_aiodetach(struct knote *kn);
static int	filt_aio(struct knote *kn, long hint);
static int	filt_lioattach(struct knote *kn);
static void	filt_liodetach(struct knote *kn);
static int	filt_lio(struct knote *kn, long hint);
#define DONE_BUF	1
#define DONE_QUEUE	2
static void	aio_bio_done_notify(struct proc *userp,
		    struct aiocblist *aiocbe, int type);
static int	do_lio_listio(struct thread *td, struct lio_listio_args *uap,
		    int oldsigev);

/*
 * Zones for:
 *	kaio	Per process async io info
 *	aiop	async io thread data
 *	aiocb	async io jobs
 *	aiol	list io job pointer - internal to aio_suspend XXX
 *	aiolio	list io jobs
 */
static uma_zone_t kaio_zone, aiop_zone, aiocb_zone, aiol_zone, aiolio_zone;

/* kqueue filters for aio */
static struct filterops aio_filtops =
	{ 0, filt_aioattach, filt_aiodetach, filt_aio };
static struct filterops lio_filtops =
	{ 0, filt_lioattach, filt_liodetach, filt_lio };

static eventhandler_tag exit_tag, exec_tag;

TASKQUEUE_DEFINE_THREAD(aiod_bio);

/*
 * Main operations function for use as a kernel module.
 */
static int
aio_modload(struct module *module, int cmd, void *arg)
{
	int error = 0;

	switch (cmd) {
	case MOD_LOAD:
		aio_onceonly();
		break;
	case MOD_UNLOAD:
		error = aio_unload();
		break;
	case MOD_SHUTDOWN:
		break;
	default:
		error = EINVAL;
		break;
	}
	return (error);
}

static moduledata_t aio_mod = {
	"aio",
	&aio_modload,
	NULL
};

SYSCALL_MODULE_HELPER(aio_return);
SYSCALL_MODULE_HELPER(aio_suspend);
SYSCALL_MODULE_HELPER(aio_cancel);
SYSCALL_MODULE_HELPER(aio_error);
SYSCALL_MODULE_HELPER(aio_read);
SYSCALL_MODULE_HELPER(aio_write);
SYSCALL_MODULE_HELPER(aio_waitcomplete);
SYSCALL_MODULE_HELPER(lio_listio);
SYSCALL_MODULE_HELPER(oaio_read);
SYSCALL_MODULE_HELPER(oaio_write);
SYSCALL_MODULE_HELPER(olio_listio);

DECLARE_MODULE(aio, aio_mod,
	SI_SUB_VFS, SI_ORDER_ANY);
MODULE_VERSION(aio, 1);

/*
 * Startup initialization
 */
static void
aio_onceonly(void)
{

	/* XXX: should probably just use so->callback */
	aio_swake = &aio_swake_cb;
	exit_tag = EVENTHANDLER_REGISTER(process_exit, aio_proc_rundown, NULL,
	    EVENTHANDLER_PRI_ANY);
	exec_tag = EVENTHANDLER_REGISTER(process_exec, aio_proc_rundown, NULL,
	    EVENTHANDLER_PRI_ANY);
	kqueue_add_filteropts(EVFILT_AIO, &aio_filtops);
	kqueue_add_filteropts(EVFILT_LIO, &lio_filtops);
	TAILQ_INIT(&aio_freeproc);
	sema_init(&aio_newproc_sem, 0, "aio_new_proc");
	mtx_init(&aio_job_mtx, "aio_job", NULL, MTX_DEF);
	mtx_init(&aio_sock_mtx, "aio_sock", NULL, MTX_DEF);
	TAILQ_INIT(&aio_jobs);
	aiod_unr = new_unrhdr(1, INT_MAX, NULL);
	kaio_zone = uma_zcreate("AIO", sizeof(struct kaioinfo), NULL, NULL,
	    NULL, NULL,
	    UMA_ALIGN_PTR, UMA_ZONE_NOFREE);
	aiop_zone = uma_zcreate("AIOP", sizeof(struct aiothreadlist), NULL,
	    NULL, NULL, NULL, UMA_ALIGN_PTR, UMA_ZONE_NOFREE);
	aiocb_zone = uma_zcreate("AIOCB", sizeof(struct aiocblist), NULL, NULL,
	    NULL, NULL, UMA_ALIGN_PTR, UMA_ZONE_NOFREE);
	aiol_zone = uma_zcreate("AIOL", AIO_LISTIO_MAX * sizeof(intptr_t), NULL,
	    NULL, NULL, NULL, UMA_ALIGN_PTR, UMA_ZONE_NOFREE);
	aiolio_zone = uma_zcreate("AIOLIO", sizeof(struct aioliojob), NULL,
	    NULL, NULL, NULL, UMA_ALIGN_PTR, UMA_ZONE_NOFREE);
	aiod_timeout = AIOD_TIMEOUT_DEFAULT;
	aiod_lifetime = AIOD_LIFETIME_DEFAULT;
	jobrefid = 1;
	async_io_version = _POSIX_VERSION;
	p31b_setcfg(CTL_P1003_1B_AIO_LISTIO_MAX, AIO_LISTIO_MAX);
	p31b_setcfg(CTL_P1003_1B_AIO_MAX, MAX_AIO_QUEUE);
	p31b_setcfg(CTL_P1003_1B_AIO_PRIO_DELTA_MAX, 0);
}

/*
 * Callback for unload of AIO when used as a module.
 */
static int
aio_unload(void)
{
	int error;

	/*
	 * XXX: no unloads by default, it's too dangerous.
	 * perhaps we could do it if locked out callers and then
	 * did an aio_proc_rundown() on each process.
	 *
	 * jhb: aio_proc_rundown() needs to run on curproc though,
	 * so I don't think that would fly.
	 */
	if (!unloadable)
		return (EOPNOTSUPP);

	error = kqueue_del_filteropts(EVFILT_AIO);
	if (error)
		return (error);
	async_io_version = 0;
	aio_swake = NULL;
	taskqueue_free(taskqueue_aiod_bio);
	delete_unrhdr(aiod_unr);
	EVENTHANDLER_DEREGISTER(process_exit, exit_tag);
	EVENTHANDLER_DEREGISTER(process_exec, exec_tag);
	mtx_destroy(&aio_job_mtx);
	mtx_destroy(&aio_sock_mtx);
	sema_destroy(&aio_newproc_sem);
	p31b_setcfg(CTL_P1003_1B_AIO_LISTIO_MAX, -1);
	p31b_setcfg(CTL_P1003_1B_AIO_MAX, -1);
	p31b_setcfg(CTL_P1003_1B_AIO_PRIO_DELTA_MAX, -1);
	return (0);
}

/*
 * Init the per-process aioinfo structure.  The aioinfo limits are set
 * per-process for user limit (resource) management.
 */
static void
aio_init_aioinfo(struct proc *p)
{
	struct kaioinfo *ki;

	ki = uma_zalloc(kaio_zone, M_WAITOK);
	ki->kaio_flags = 0;
	ki->kaio_maxactive_count = max_aio_per_proc;
	ki->kaio_active_count = 0;
	ki->kaio_qallowed_count = max_aio_queue_per_proc;
	ki->kaio_count = 0;
	ki->kaio_ballowed_count = max_buf_aio;
	ki->kaio_buffer_count = 0;
	TAILQ_INIT(&ki->kaio_all);
	TAILQ_INIT(&ki->kaio_done);
	TAILQ_INIT(&ki->kaio_jobqueue);
	TAILQ_INIT(&ki->kaio_bufqueue);
	TAILQ_INIT(&ki->kaio_liojoblist);
	TAILQ_INIT(&ki->kaio_sockqueue);
	PROC_LOCK(p);
	if (p->p_aioinfo == NULL) {
		p->p_aioinfo = ki;
		PROC_UNLOCK(p);
	} else {
		PROC_UNLOCK(p);
		uma_zfree(kaio_zone, ki);
	}

	while (num_aio_procs < target_aio_procs)
		aio_newproc(NULL);
}

static int
aio_sendsig(struct proc *p, struct sigevent *sigev, ksiginfo_t *ksi)
{
	PROC_LOCK_ASSERT(p, MA_OWNED);
	if (!KSI_ONQ(ksi)) {
		ksi->ksi_code = SI_ASYNCIO;
		ksi->ksi_flags |= KSI_EXT | KSI_INS;
		return (psignal_event(p, sigev, ksi));
	}
	return (0);
}

/*
 * Free a job entry.  Wait for completion if it is currently active, but don't
 * delay forever.  If we delay, we return a flag that says that we have to
 * restart the queue scan.
 */
static int
aio_free_entry(struct aiocblist *aiocbe)
{
	struct kaioinfo *ki;
	struct aioliojob *lj;
	struct proc *p;

	p = aiocbe->userproc;

	PROC_LOCK_ASSERT(p, MA_OWNED);
	MPASS(curproc == p);
	MPASS(aiocbe->jobstate == JOBST_JOBFINISHED);

	ki = p->p_aioinfo;
	MPASS(ki != NULL);

	atomic_subtract_int(&num_queue_count, 1);

	ki->kaio_count--;
	MPASS(ki->kaio_count >= 0);

	lj = aiocbe->lio;
	if (lj) {
		lj->lioj_count--;
		lj->lioj_finished_count--;

		if (lj->lioj_count == 0) {
			TAILQ_REMOVE(&ki->kaio_liojoblist, lj, lioj_list);
			/* lio is going away, we need to destroy any knotes */
			knlist_delete(&lj->klist, curthread, 1);
			sigqueue_take(&lj->lioj_ksi);
			uma_zfree(aiolio_zone, lj);
		}
	}

	TAILQ_REMOVE(&ki->kaio_done, aiocbe, plist);
	TAILQ_REMOVE(&ki->kaio_all, aiocbe, allist);

	/* aiocbe is going away, we need to destroy any knotes */
	knlist_delete(&aiocbe->klist, curthread, 1);
	sigqueue_take(&aiocbe->ksi);

	MPASS(aiocbe->bp == NULL);
	aiocbe->jobstate = JOBST_NULL;

	/* Wake up anyone who has interest to do cleanup work. */
	if (ki->kaio_flags & (KAIO_WAKEUP | KAIO_RUNDOWN)) {
		ki->kaio_flags &= ~KAIO_WAKEUP;
		wakeup(&p->p_aioinfo);
	}
	PROC_UNLOCK(p);

	/*
	 * The thread argument here is used to find the owning process
	 * and is also passed to fo_close() which may pass it to various
	 * places such as devsw close() routines.  Because of that, we
	 * need a thread pointer from the process owning the job that is
	 * persistent and won't disappear out from under us or move to
	 * another process.
	 *
	 * Currently, all the callers of this function call it to remove
	 * an aiocblist from the current process' job list either via a
	 * syscall or due to the current process calling exit() or
	 * execve().  Thus, we know that p == curproc.  We also know that
	 * curthread can't exit since we are curthread.
	 *
	 * Therefore, we use curthread as the thread to pass to
	 * knlist_delete().  This does mean that it is possible for the
	 * thread pointer at close time to differ from the thread pointer
	 * at open time, but this is already true of file descriptors in
	 * a multithreaded process.
	 */
	fdrop(aiocbe->fd_file, curthread);
	crfree(aiocbe->cred);
	uma_zfree(aiocb_zone, aiocbe);
	PROC_LOCK(p);

	return (0);
}

/*
 * Rundown the jobs for a given process.
 */
static void
aio_proc_rundown(void *arg, struct proc *p)
{
	struct kaioinfo *ki;
	struct aioliojob *lj;
	struct aiocblist *cbe, *cbn;
	struct file *fp;
	struct socket *so;

	KASSERT(curthread->td_proc == p,
	    ("%s: called on non-curproc", __func__));
	ki = p->p_aioinfo;
	if (ki == NULL)
		return;

	PROC_LOCK(p);

restart:
	ki->kaio_flags |= KAIO_RUNDOWN;

	/*
	 * Try to cancel all pending requests.  This code simulates
	 * aio_cancel on all pending I/O requests.
	 */
	while ((cbe = TAILQ_FIRST(&ki->kaio_sockqueue))) {
		fp = cbe->fd_file;
		so = fp->f_data;
		mtx_lock(&aio_sock_mtx);
		TAILQ_REMOVE(&so->so_aiojobq, cbe, list);
		mtx_unlock(&aio_sock_mtx);
		TAILQ_REMOVE(&ki->kaio_sockqueue, cbe, plist);
		TAILQ_INSERT_HEAD(&ki->kaio_jobqueue, cbe, plist);
		cbe->jobstate = JOBST_JOBQGLOBAL;
	}

	TAILQ_FOREACH_SAFE(cbe, &ki->kaio_jobqueue, plist, cbn) {
		mtx_lock(&aio_job_mtx);
		if (cbe->jobstate == JOBST_JOBQGLOBAL) {
			TAILQ_REMOVE(&aio_jobs, cbe, list);
			mtx_unlock(&aio_job_mtx);
			cbe->jobstate = JOBST_JOBFINISHED;
			cbe->uaiocb._aiocb_private.status = -1;
			cbe->uaiocb._aiocb_private.error = ECANCELED;
			TAILQ_REMOVE(&ki->kaio_jobqueue, cbe, plist);
			aio_bio_done_notify(p, cbe, DONE_QUEUE);
		} else {
			mtx_unlock(&aio_job_mtx);
		}
	}

	if (TAILQ_FIRST(&ki->kaio_sockqueue))
		goto restart;

	/* Wait for all running I/O to be finished */
	if (TAILQ_FIRST(&ki->kaio_bufqueue) ||
	    TAILQ_FIRST(&ki->kaio_jobqueue)) {
		ki->kaio_flags |= KAIO_WAKEUP;
		msleep(&p->p_aioinfo, &p->p_mtx, PRIBIO, "aioprn", hz);
		goto restart;
	}

	/* Free all completed I/O requests. */
	while ((cbe = TAILQ_FIRST(&ki->kaio_done)) != NULL)
		aio_free_entry(cbe);

	while ((lj = TAILQ_FIRST(&ki->kaio_liojoblist)) != NULL) {
		if (lj->lioj_count == 0) {
			TAILQ_REMOVE(&ki->kaio_liojoblist, lj, lioj_list);
			knlist_delete(&lj->klist, curthread, 1);
			sigqueue_take(&lj->lioj_ksi);
			uma_zfree(aiolio_zone, lj);
		} else {
			panic("LIO job not cleaned up: C:%d, FC:%d\n",
			    lj->lioj_count, lj->lioj_finished_count);
		}
	}

	uma_zfree(kaio_zone, ki);
	p->p_aioinfo = NULL;
	PROC_UNLOCK(p);
}

/*
 * Select a job to run (called by an AIO daemon).
 */
static struct aiocblist *
aio_selectjob(struct aiothreadlist *aiop)
{
	struct aiocblist *aiocbe;
	struct kaioinfo *ki;
	struct proc *userp;

	mtx_assert(&aio_job_mtx, MA_OWNED);
	TAILQ_FOREACH(aiocbe, &aio_jobs, list) {
		userp = aiocbe->userproc;
		ki = userp->p_aioinfo;

		if (ki->kaio_active_count < ki->kaio_maxactive_count) {
			TAILQ_REMOVE(&aio_jobs, aiocbe, list);
			/* Account for currently active jobs. */
			ki->kaio_active_count++;
			aiocbe->jobstate = JOBST_JOBRUNNING;
			break;
		}
	}
	return (aiocbe);
}

/*
 * The AIO processing activity.  This is the code that does the I/O request for
 * the non-physio version of the operations.  The normal vn operations are
 * used, and this code should work in all instances for every type of file,
 * including pipes, sockets, fifos, and regular files.
 *
 * XXX This code doesn't work well with pipes, sockets, and fifos: if the file
 * was not opened with O_NONBLOCK, an aiod thread can block when there is no
 * data or no buffer space, and a couple of such processes can block all aiod
 * threads.  We need a FOF_OFFSET-like flag to override f_flag and tell the
 * lower levels to do non-blocking I/O; we cannot simply set O_NONBLOCK
 * because of races between userland and the aiod threads.  There is a wakeup
 * trigger for sockets, but even that does not work well if userland
 * misbehaves.
 */
static void
aio_process(struct aiocblist *aiocbe)
{
	struct ucred *td_savedcred;
	struct thread *td;
	struct proc *mycp;
	struct aiocb *cb;
	struct file *fp;
	struct socket *so;
	struct uio auio;
	struct iovec aiov;
	int cnt;
	int error;
	int oublock_st, oublock_end;
	int inblock_st, inblock_end;

	td = curthread;
	td_savedcred = td->td_ucred;
	td->td_ucred = aiocbe->cred;
	mycp = td->td_proc;
	cb = &aiocbe->uaiocb;
	fp = aiocbe->fd_file;

	aiov.iov_base = (void *)(uintptr_t)cb->aio_buf;
	aiov.iov_len = cb->aio_nbytes;

	auio.uio_iov = &aiov;
	auio.uio_iovcnt = 1;
	auio.uio_offset = cb->aio_offset;
	auio.uio_resid = cb->aio_nbytes;
	cnt = cb->aio_nbytes;
	auio.uio_segflg = UIO_USERSPACE;
	auio.uio_td = td;

	inblock_st = mycp->p_stats->p_ru.ru_inblock;
	oublock_st = mycp->p_stats->p_ru.ru_oublock;
	/*
	 * aio_aqueue() acquires a reference to the file that is
	 * released in aio_free_entry().
	 */
	if (cb->aio_lio_opcode == LIO_READ) {
		auio.uio_rw = UIO_READ;
		error = fo_read(fp, &auio, fp->f_cred, FOF_OFFSET, td);
	} else {
		auio.uio_rw = UIO_WRITE;
		error = fo_write(fp, &auio, fp->f_cred, FOF_OFFSET, td);
	}
	inblock_end = mycp->p_stats->p_ru.ru_inblock;
	oublock_end = mycp->p_stats->p_ru.ru_oublock;

	aiocbe->inputcharge = inblock_end - inblock_st;
	aiocbe->outputcharge = oublock_end - oublock_st;

	if ((error) && (auio.uio_resid != cnt)) {
		if (error == ERESTART || error == EINTR || error == EWOULDBLOCK)
			error = 0;
		if ((error == EPIPE) && (cb->aio_lio_opcode == LIO_WRITE)) {
			int sigpipe = 1;

			if (fp->f_type == DTYPE_SOCKET) {
				so = fp->f_data;
				if (so->so_options & SO_NOSIGPIPE)
					sigpipe = 0;
			}
			if (sigpipe) {
				PROC_LOCK(aiocbe->userproc);
				psignal(aiocbe->userproc, SIGPIPE);
				PROC_UNLOCK(aiocbe->userproc);
			}
		}
	}

	cnt -= auio.uio_resid;
	cb->_aiocb_private.error = error;
	cb->_aiocb_private.status = cnt;
	td->td_ucred = td_savedcred;
}

static void
aio_bio_done_notify(struct proc *userp, struct aiocblist *aiocbe, int type)
{
	struct aioliojob *lj;
	struct kaioinfo *ki;
	int lj_done;

	PROC_LOCK_ASSERT(userp, MA_OWNED);
	ki = userp->p_aioinfo;
	lj = aiocbe->lio;
	lj_done = 0;
	if (lj) {
		lj->lioj_finished_count++;
		if (lj->lioj_count == lj->lioj_finished_count)
			lj_done = 1;
	}
	if (type == DONE_QUEUE) {
		aiocbe->jobflags |= AIOCBLIST_DONE;
	} else {
		aiocbe->jobflags |= AIOCBLIST_BUFDONE;
		ki->kaio_buffer_count--;
	}
	TAILQ_INSERT_TAIL(&ki->kaio_done, aiocbe, plist);
	aiocbe->jobstate = JOBST_JOBFINISHED;
	if (aiocbe->uaiocb.aio_sigevent.sigev_notify == SIGEV_SIGNAL ||
	    aiocbe->uaiocb.aio_sigevent.sigev_notify == SIGEV_THREAD_ID)
		aio_sendsig(userp, &aiocbe->uaiocb.aio_sigevent, &aiocbe->ksi);

	KNOTE_LOCKED(&aiocbe->klist, 1);

	if (lj_done) {
		if (lj->lioj_signal.sigev_notify == SIGEV_KEVENT) {
			lj->lioj_flags |= LIOJ_KEVENT_POSTED;
			KNOTE_LOCKED(&lj->klist, 1);
		}
		if ((lj->lioj_flags & (LIOJ_SIGNAL|LIOJ_SIGNAL_POSTED))
		    == LIOJ_SIGNAL
		    && (lj->lioj_signal.sigev_notify == SIGEV_SIGNAL ||
		    lj->lioj_signal.sigev_notify == SIGEV_THREAD_ID)) {
			aio_sendsig(userp, &lj->lioj_signal, &lj->lioj_ksi);
			lj->lioj_flags |= LIOJ_SIGNAL_POSTED;
		}
	}
	if (ki->kaio_flags & (KAIO_RUNDOWN|KAIO_WAKEUP)) {
		ki->kaio_flags &= ~KAIO_WAKEUP;
		wakeup(&userp->p_aioinfo);
	}
}

/*
 * The AIO daemon, most of the actual work is done in aio_process,
 * but the setup (and address space mgmt) is done in this routine.
 */
static void
aio_daemon(void *_id)
{
	struct aiocblist *aiocbe;
	struct aiothreadlist *aiop;
	struct kaioinfo *ki;
	struct proc *curcp, *mycp, *userp;
	struct vmspace *myvm, *tmpvm;
	struct thread *td = curthread;
	struct pgrp *newpgrp;
	struct session *newsess;
	int id = (intptr_t)_id;

	/*
	 * Local copies of curproc (cp) and vmspace (myvm)
	 */
	mycp = td->td_proc;
	myvm = mycp->p_vmspace;

	KASSERT(mycp->p_textvp == NULL, ("kthread has a textvp"));

	/*
	 * Allocate and ready the aio control info.  There is one aiop structure
	 * per daemon.
	 */
	aiop = uma_zalloc(aiop_zone, M_WAITOK);
	aiop->aiothread = td;
	aiop->aiothreadflags |= AIOP_FREE;

	/*
	 * Place thread (lightweight process) onto the AIO free thread list.
	 */
	mtx_lock(&aio_job_mtx);
	TAILQ_INSERT_HEAD(&aio_freeproc, aiop, list);
	mtx_unlock(&aio_job_mtx);

	/*
	 * Get rid of our current filedescriptors.  AIOD's don't need any
	 * filedescriptors, except as temporarily inherited from the client.
	 */
	fdfree(td);

	/* The daemon resides in its own pgrp. */
	MALLOC(newpgrp, struct pgrp *, sizeof(struct pgrp), M_PGRP,
	    M_WAITOK | M_ZERO);
	MALLOC(newsess, struct session *, sizeof(struct session), M_SESSION,
	    M_WAITOK | M_ZERO);

	sx_xlock(&proctree_lock);
	enterpgrp(mycp, mycp->p_pid, newpgrp, newsess);
	sx_xunlock(&proctree_lock);

	/*
	 * Wakeup parent process.  (Parent sleeps to keep from blasting away
	 * and creating too many daemons.)
	 */
	sema_post(&aio_newproc_sem);

	mtx_lock(&aio_job_mtx);
	for (;;) {
		/*
		 * curcp is the current daemon process context.
		 * userp is the current user process context.
		 */
		curcp = mycp;

		/*
		 * Take daemon off of free queue
		 */
		if (aiop->aiothreadflags & AIOP_FREE) {
			TAILQ_REMOVE(&aio_freeproc, aiop, list);
			aiop->aiothreadflags &= ~AIOP_FREE;
		}

		/*
		 * Check for jobs.
		 */
		while ((aiocbe = aio_selectjob(aiop)) != NULL) {
			mtx_unlock(&aio_job_mtx);
			userp = aiocbe->userproc;

			/*
			 * Connect to process address space for user program.
			 */
			if (userp != curcp) {
				/*
				 * Save the current address space that we are
				 * connected to.
				 */
				tmpvm = mycp->p_vmspace;

				/*
				 * Point to the new user address space, and
				 * refer to it.
				 */
				mycp->p_vmspace = userp->p_vmspace;
				atomic_add_int(&mycp->p_vmspace->vm_refcnt, 1);

				/* Activate the new mapping. */
				pmap_activate(FIRST_THREAD_IN_PROC(mycp));

				/*
				 * If the old address space wasn't the daemons
				 * own address space, then we need to remove the
				 * daemon's reference from the other process
				 * that it was acting on behalf of.
				 */
				if (tmpvm != myvm) {
					vmspace_free(tmpvm);
				}
				curcp = userp;
			}

			ki = userp->p_aioinfo;

			/* Do the I/O function. */
			aio_process(aiocbe);

			mtx_lock(&aio_job_mtx);
			/* Decrement the active job count. */
			ki->kaio_active_count--;
			mtx_unlock(&aio_job_mtx);

			PROC_LOCK(userp);
			TAILQ_REMOVE(&ki->kaio_jobqueue, aiocbe, plist);
			aio_bio_done_notify(userp, aiocbe, DONE_QUEUE);
			if (aiocbe->jobflags & AIOCBLIST_RUNDOWN) {
				wakeup(aiocbe);
				aiocbe->jobflags &= ~AIOCBLIST_RUNDOWN;
			}
			PROC_UNLOCK(userp);

			mtx_lock(&aio_job_mtx);
		}

		/*
		 * Disconnect from user address space.
		 */
		if (curcp != mycp) {

			mtx_unlock(&aio_job_mtx);

			/* Get the user address space to disconnect from. */
			tmpvm = mycp->p_vmspace;

			/* Get original address space for daemon. */
			mycp->p_vmspace = myvm;

			/* Activate the daemon's address space. */
			pmap_activate(FIRST_THREAD_IN_PROC(mycp));
#ifdef DIAGNOSTIC
			if (tmpvm == myvm) {
				printf("AIOD: vmspace problem -- %d\n",
				    mycp->p_pid);
			}
#endif
			/* Remove our vmspace reference. */
			vmspace_free(tmpvm);

			curcp = mycp;

			mtx_lock(&aio_job_mtx);
			/*
			 * We have to restart to avoid race, we only sleep if
			 * no job can be selected, that should be
			 * curcp == mycp.
			 */
			continue;
		}

		mtx_assert(&aio_job_mtx, MA_OWNED);

		TAILQ_INSERT_HEAD(&aio_freeproc, aiop, list);
		aiop->aiothreadflags |= AIOP_FREE;

		/*
		 * If daemon is inactive for a long time, allow it to exit,
		 * thereby freeing resources.
		 */
		if (msleep(aiop->aiothread, &aio_job_mtx, PRIBIO, "aiordy",
		    aiod_lifetime)) {
			if (TAILQ_EMPTY(&aio_jobs)) {
				if ((aiop->aiothreadflags & AIOP_FREE) &&
				    (num_aio_procs > target_aio_procs)) {
					TAILQ_REMOVE(&aio_freeproc, aiop, list);
					num_aio_procs--;
					mtx_unlock(&aio_job_mtx);
					uma_zfree(aiop_zone, aiop);
					free_unr(aiod_unr, id);
#ifdef DIAGNOSTIC
					if (mycp->p_vmspace->vm_refcnt <= 1) {
						printf("AIOD: bad vm refcnt for"
						    " exiting daemon: %d\n",
						    mycp->p_vmspace->vm_refcnt);
					}
#endif
					kthread_exit(0);
				}
			}
		}
	}
	mtx_unlock(&aio_job_mtx);
	panic("shouldn't be here\n");
}

/*
 * Create a new AIO daemon.  This is mostly a kernel-thread fork routine.  The
 * AIO daemon modifies its environment itself.
 */
static int
aio_newproc(int *start)
{
	int error;
	struct proc *p;
	int id;

	id = alloc_unr(aiod_unr);
	error = kthread_create(aio_daemon, (void *)(intptr_t)id, &p,
	    RFNOWAIT, 0, "aiod%d", id);
	if (error == 0) {
		/*
		 * Wait until daemon is started.
		 */
		sema_wait(&aio_newproc_sem);
		mtx_lock(&aio_job_mtx);
		num_aio_procs++;
		if (start != NULL)
			(*start)--;
		mtx_unlock(&aio_job_mtx);
	} else {
		free_unr(aiod_unr, id);
	}
	return (error);
}

/*
 * Try the high-performance, low-overhead physio method for eligible
 * VCHR devices.  This method doesn't use an aio helper thread, and
 * thus has very low overhead.
 *
 * Assumes that the caller, aio_aqueue(), has incremented the file
 * structure's reference count, preventing its deallocation for the
 * duration of this call.
 */
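/*
 * In summary (derived from the checks below), a request is eligible for
 * physio only if the descriptor refers to a disk-backed VCHR vnode, the
 * transfer size is a multiple of the device block size and does not exceed
 * either the device's si_iosize_max or MAXPHYS (less the initial page
 * offset), and the process is below its kaio_ballowed_count buffer quota.
 * Anything else falls back to the aiod thread path.
 */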
static int
aio_qphysio(struct proc *p, struct aiocblist *aiocbe)
{
	struct aiocb *cb;
	struct file *fp;
	struct buf *bp;
	struct vnode *vp;
	struct kaioinfo *ki;
	struct aioliojob *lj;
	int error;

	cb = &aiocbe->uaiocb;
	fp = aiocbe->fd_file;

	if (fp->f_type != DTYPE_VNODE)
		return (-1);

	vp = fp->f_vnode;

	/*
	 * If it's not a disk, we don't want to return a positive error.
	 * It causes the aio code to not fall through to try the thread
	 * way when you're talking to a regular file.
	 */
	if (!vn_isdisk(vp, &error)) {
		if (error == ENOTBLK)
			return (-1);
		else
			return (error);
	}

	if (cb->aio_nbytes % vp->v_bufobj.bo_bsize)
		return (-1);

	if (cb->aio_nbytes > vp->v_rdev->si_iosize_max)
		return (-1);

	if (cb->aio_nbytes >
	    MAXPHYS - (((vm_offset_t) cb->aio_buf) & PAGE_MASK))
		return (-1);

	ki = p->p_aioinfo;
	if (ki->kaio_buffer_count >= ki->kaio_ballowed_count)
		return (-1);

	/* Create and build a buffer header for a transfer. */
	bp = (struct buf *)getpbuf(NULL);
	BUF_KERNPROC(bp);

	PROC_LOCK(p);
	ki->kaio_count++;
	ki->kaio_buffer_count++;
	lj = aiocbe->lio;
	if (lj)
		lj->lioj_count++;
	PROC_UNLOCK(p);

	/*
	 * Get a copy of the kva from the physical buffer.
	 */
	error = 0;

	bp->b_bcount = cb->aio_nbytes;
	bp->b_bufsize = cb->aio_nbytes;
	bp->b_iodone = aio_physwakeup;
	bp->b_saveaddr = bp->b_data;
	bp->b_data = (void *)(uintptr_t)cb->aio_buf;
	bp->b_offset = cb->aio_offset;
	bp->b_iooffset = cb->aio_offset;
	bp->b_blkno = btodb(cb->aio_offset);
	bp->b_iocmd = cb->aio_lio_opcode == LIO_WRITE ? BIO_WRITE : BIO_READ;

	/*
	 * Bring buffer into kernel space.
	 */
	if (vmapbuf(bp) < 0) {
		error = EFAULT;
		goto doerror;
	}

	PROC_LOCK(p);
	aiocbe->bp = bp;
	bp->b_caller1 = (void *)aiocbe;
	TAILQ_INSERT_TAIL(&ki->kaio_bufqueue, aiocbe, plist);
	TAILQ_INSERT_TAIL(&ki->kaio_all, aiocbe, allist);
	aiocbe->jobstate = JOBST_JOBQBUF;
	cb->_aiocb_private.status = cb->aio_nbytes;
	PROC_UNLOCK(p);

	atomic_add_int(&num_queue_count, 1);
	atomic_add_int(&num_buf_aio, 1);

	bp->b_error = 0;

	TASK_INIT(&aiocbe->biotask, 0, biohelper, aiocbe);

	/* Perform transfer. */
	dev_strategy(vp->v_rdev, bp);
	return (0);

doerror:
	PROC_LOCK(p);
	ki->kaio_count--;
	ki->kaio_buffer_count--;
	if (lj)
		lj->lioj_count--;
	aiocbe->bp = NULL;
	PROC_UNLOCK(p);
	relpbuf(bp, NULL);
	return (error);
}

/*
 * Wake up aio requests that may be serviceable now.
 */
static void
aio_swake_cb(struct socket *so, struct sockbuf *sb)
{
	struct aiocblist *cb, *cbn;
	struct proc *p;
	struct kaioinfo *ki = NULL;
	int opcode, wakecount = 0;
	struct aiothreadlist *aiop;

	if (sb == &so->so_snd) {
		opcode = LIO_WRITE;
		SOCKBUF_LOCK(&so->so_snd);
		so->so_snd.sb_flags &= ~SB_AIO;
		SOCKBUF_UNLOCK(&so->so_snd);
	} else {
		opcode = LIO_READ;
		SOCKBUF_LOCK(&so->so_rcv);
		so->so_rcv.sb_flags &= ~SB_AIO;
		SOCKBUF_UNLOCK(&so->so_rcv);
	}

	mtx_lock(&aio_sock_mtx);
	TAILQ_FOREACH_SAFE(cb, &so->so_aiojobq, list, cbn) {
		if (opcode == cb->uaiocb.aio_lio_opcode) {
			if (cb->jobstate != JOBST_JOBQSOCK)
				panic("invalid queue value");
			p = cb->userproc;
			ki = p->p_aioinfo;
			TAILQ_REMOVE(&so->so_aiojobq, cb, list);
			PROC_LOCK(p);
			TAILQ_REMOVE(&ki->kaio_sockqueue, cb, plist);
			/*
			 * XXX check AIO_RUNDOWN, and don't put on
			 * jobqueue if it was set.
			 */
			TAILQ_INSERT_TAIL(&ki->kaio_jobqueue, cb, plist);
			cb->jobstate = JOBST_JOBQGLOBAL;
			mtx_lock(&aio_job_mtx);
			TAILQ_INSERT_TAIL(&aio_jobs, cb, list);
			mtx_unlock(&aio_job_mtx);
			PROC_UNLOCK(p);
			wakecount++;
		}
	}
	mtx_unlock(&aio_sock_mtx);

	while (wakecount--) {
		mtx_lock(&aio_job_mtx);
		if ((aiop = TAILQ_FIRST(&aio_freeproc)) != NULL) {
			TAILQ_REMOVE(&aio_freeproc, aiop, list);
			aiop->aiothreadflags &= ~AIOP_FREE;
			wakeup(aiop->aiothread);
		}
		mtx_unlock(&aio_job_mtx);
	}
}
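/*
 * Typical userland flow for the queueing path below (illustrative only,
 * not part of this file):
 *
 *	struct aiocb acb;
 *	bzero(&acb, sizeof(acb));
 *	acb.aio_fildes = fd;
 *	acb.aio_buf = buf;
 *	acb.aio_nbytes = sizeof(buf);
 *	acb.aio_offset = 0;
 *	aio_read(&acb);			-- enters aio_aqueue(..., LIO_READ, 0)
 *	while (aio_error(&acb) == EINPROGRESS)
 *		;			-- polls _aiocb_private.error
 *	ret = aio_return(&acb);		-- reaps status, frees the kernel job
 */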
/*
 * Queue a new AIO request.  Choosing either the threaded or direct physio VCHR
 * technique is done in this code.
 */
static int
aio_aqueue(struct thread *td, struct aiocb *job, struct aioliojob *lj,
	int type, int oldsigev)
{
	struct proc *p = td->td_proc;
	struct file *fp;
	struct socket *so;
	struct aiocblist *aiocbe;
	struct aiothreadlist *aiop;
	struct kaioinfo *ki;
	struct kevent kev;
	struct kqueue *kq;
	struct file *kq_fp;
	struct sockbuf *sb;
	int opcode;
	int error;
	int fd;
	int jid;

	if (p->p_aioinfo == NULL)
		aio_init_aioinfo(p);

	ki = p->p_aioinfo;

	suword(&job->_aiocb_private.status, -1);
	suword(&job->_aiocb_private.error, 0);
	suword(&job->_aiocb_private.kernelinfo, -1);

	if (num_queue_count >= max_queue_count ||
	    ki->kaio_count >= ki->kaio_qallowed_count) {
		suword(&job->_aiocb_private.error, EAGAIN);
		return (EAGAIN);
	}

	aiocbe = uma_zalloc(aiocb_zone, M_WAITOK | M_ZERO);
	aiocbe->inputcharge = 0;
	aiocbe->outputcharge = 0;
	knlist_init(&aiocbe->klist, &p->p_mtx, NULL, NULL, NULL);

	if (oldsigev) {
		bzero(&aiocbe->uaiocb, sizeof(struct aiocb));
		error = copyin(job, &aiocbe->uaiocb, sizeof(struct oaiocb));
		bcopy(&aiocbe->uaiocb.__spare__, &aiocbe->uaiocb.aio_sigevent,
		    sizeof(struct osigevent));
	} else {
		error = copyin(job, &aiocbe->uaiocb, sizeof(struct aiocb));
	}
	if (error) {
		suword(&job->_aiocb_private.error, error);
		uma_zfree(aiocb_zone, aiocbe);
		return (error);
	}

	if (aiocbe->uaiocb.aio_sigevent.sigev_notify != SIGEV_KEVENT &&
	    aiocbe->uaiocb.aio_sigevent.sigev_notify != SIGEV_SIGNAL &&
	    aiocbe->uaiocb.aio_sigevent.sigev_notify != SIGEV_THREAD_ID &&
	    aiocbe->uaiocb.aio_sigevent.sigev_notify != SIGEV_NONE) {
		suword(&job->_aiocb_private.error, EINVAL);
		uma_zfree(aiocb_zone, aiocbe);
		return (EINVAL);
	}

	if ((aiocbe->uaiocb.aio_sigevent.sigev_notify == SIGEV_SIGNAL ||
	    aiocbe->uaiocb.aio_sigevent.sigev_notify == SIGEV_THREAD_ID) &&
	    !_SIG_VALID(aiocbe->uaiocb.aio_sigevent.sigev_signo)) {
		uma_zfree(aiocb_zone, aiocbe);
		return (EINVAL);
	}

	ksiginfo_init(&aiocbe->ksi);

	/* Save userspace address of the job info. */
	aiocbe->uuaiocb = job;

	/* Get the opcode. */
	if (type != LIO_NOP)
		aiocbe->uaiocb.aio_lio_opcode = type;
	opcode = aiocbe->uaiocb.aio_lio_opcode;

	/* Fetch the file object for the specified file descriptor. */
	fd = aiocbe->uaiocb.aio_fildes;
	switch (opcode) {
	case LIO_WRITE:
		error = fget_write(td, fd, &fp);
		break;
	case LIO_READ:
		error = fget_read(td, fd, &fp);
		break;
	default:
		error = fget(td, fd, &fp);
	}
	if (error) {
		uma_zfree(aiocb_zone, aiocbe);
		suword(&job->_aiocb_private.error, EBADF);
		return (error);
	}
	aiocbe->fd_file = fp;

	if (aiocbe->uaiocb.aio_offset == -1LL) {
		error = EINVAL;
		goto aqueue_fail;
	}

	mtx_lock(&aio_job_mtx);
	jid = jobrefid;
	if (jobrefid == LONG_MAX)
		jobrefid = 1;
	else
		jobrefid++;
	mtx_unlock(&aio_job_mtx);

	error = suword(&job->_aiocb_private.kernelinfo, jid);
	if (error) {
		error = EINVAL;
		goto aqueue_fail;
	}
	aiocbe->uaiocb._aiocb_private.kernelinfo = (void *)(intptr_t)jid;

	if (opcode == LIO_NOP) {
		fdrop(fp, td);
		uma_zfree(aiocb_zone, aiocbe);
		return (0);
	}
	if ((opcode != LIO_READ) && (opcode != LIO_WRITE)) {
		error = EINVAL;
		goto aqueue_fail;
	}

	if (aiocbe->uaiocb.aio_sigevent.sigev_notify == SIGEV_KEVENT) {
		kev.ident = aiocbe->uaiocb.aio_sigevent.sigev_notify_kqueue;
	} else
		goto no_kqueue;
	error = fget(td, (u_int)kev.ident, &kq_fp);
	if (error)
		goto aqueue_fail;
	if (kq_fp->f_type != DTYPE_KQUEUE) {
		fdrop(kq_fp, td);
		error = EBADF;
		goto aqueue_fail;
	}
	kq = kq_fp->f_data;
	kev.ident = (uintptr_t)aiocbe->uuaiocb;
	kev.filter = EVFILT_AIO;
	kev.flags = EV_ADD | EV_ENABLE | EV_FLAG1;
	kev.data = (intptr_t)aiocbe;
	kev.udata = aiocbe->uaiocb.aio_sigevent.sigev_value.sival_ptr;
	error = kqueue_register(kq, &kev, td, 1);
	fdrop(kq_fp, td);
aqueue_fail:
	if (error) {
		fdrop(fp, td);
		uma_zfree(aiocb_zone, aiocbe);
		suword(&job->_aiocb_private.error, error);
		goto done;
	}
no_kqueue:

	suword(&job->_aiocb_private.error, EINPROGRESS);
	aiocbe->uaiocb._aiocb_private.error = EINPROGRESS;
	aiocbe->userproc = p;
	aiocbe->cred = crhold(td->td_ucred);
	aiocbe->jobflags = 0;
	aiocbe->lio = lj;

	if (fp->f_type == DTYPE_SOCKET) {
		/*
		 * Alternate queueing for socket ops: Reach down into the
		 * descriptor to get the socket data.  Then check to see if the
		 * socket is ready to be read or written (based on the requested
		 * operation).
		 *
		 * If it is not ready for io, then queue the aiocbe on the
		 * socket, and set the flags so we get a call when sbnotify()
		 * happens.
		 *
		 * Note if opcode is neither LIO_WRITE nor LIO_READ we lock
		 * and unlock the snd sockbuf for no reason.
		 */
		so = fp->f_data;
		sb = (opcode == LIO_READ) ? &so->so_rcv : &so->so_snd;
		SOCKBUF_LOCK(sb);
		if (((opcode == LIO_READ) && (!soreadable(so))) || ((opcode ==
		    LIO_WRITE) && (!sowriteable(so)))) {
			mtx_lock(&aio_sock_mtx);
			TAILQ_INSERT_TAIL(&so->so_aiojobq, aiocbe, list);
			mtx_unlock(&aio_sock_mtx);

			sb->sb_flags |= SB_AIO;
			PROC_LOCK(p);
			TAILQ_INSERT_TAIL(&ki->kaio_sockqueue, aiocbe, plist);
			TAILQ_INSERT_TAIL(&ki->kaio_all, aiocbe, allist);
			aiocbe->jobstate = JOBST_JOBQSOCK;
			ki->kaio_count++;
			if (lj)
				lj->lioj_count++;
			PROC_UNLOCK(p);
			SOCKBUF_UNLOCK(sb);
			atomic_add_int(&num_queue_count, 1);
			error = 0;
			goto done;
		}
		SOCKBUF_UNLOCK(sb);
	}

	if ((error = aio_qphysio(p, aiocbe)) == 0)
		goto done;
#if 0
	if (error > 0) {
		aiocbe->uaiocb._aiocb_private.error = error;
		suword(&job->_aiocb_private.error, error);
		goto done;
	}
#endif
	/* No buffer for daemon I/O. */
	aiocbe->bp = NULL;

	PROC_LOCK(p);
	ki->kaio_count++;
	if (lj)
		lj->lioj_count++;
	TAILQ_INSERT_TAIL(&ki->kaio_jobqueue, aiocbe, plist);
	TAILQ_INSERT_TAIL(&ki->kaio_all, aiocbe, allist);

	mtx_lock(&aio_job_mtx);
	TAILQ_INSERT_TAIL(&aio_jobs, aiocbe, list);
	aiocbe->jobstate = JOBST_JOBQGLOBAL;
	PROC_UNLOCK(p);

	atomic_add_int(&num_queue_count, 1);

	/*
	 * If we don't have a free AIO process, and we are below our quota, then
	 * start one.  Otherwise, depend on the subsequent I/O completions to
	 * pick up this job.  If we don't successfully create the new process
	 * (thread) due to resource issues, we return an error for now (EAGAIN),
	 * which is likely not the correct thing to do.
	 */
retryproc:
	error = 0;
	if ((aiop = TAILQ_FIRST(&aio_freeproc)) != NULL) {
		TAILQ_REMOVE(&aio_freeproc, aiop, list);
		aiop->aiothreadflags &= ~AIOP_FREE;
		wakeup(aiop->aiothread);
	} else if (((num_aio_resv_start + num_aio_procs) < max_aio_procs) &&
	    ((ki->kaio_active_count + num_aio_resv_start) <
	    ki->kaio_maxactive_count)) {
		num_aio_resv_start++;
		mtx_unlock(&aio_job_mtx);
		error = aio_newproc(&num_aio_resv_start);
		mtx_lock(&aio_job_mtx);
		if (error) {
			num_aio_resv_start--;
			goto retryproc;
		}
	}
	mtx_unlock(&aio_job_mtx);

done:
	return (error);
}

/*
 * Support the aio_return system call; as a side effect, kernel resources are
 * released.
 */
int
aio_return(struct thread *td, struct aio_return_args *uap)
{
	struct proc *p = td->td_proc;
	struct aiocblist *cb;
	struct aiocb *uaiocb;
	struct kaioinfo *ki;
	int status, error;

	ki = p->p_aioinfo;
	if (ki == NULL)
		return (EINVAL);
	uaiocb = uap->aiocbp;
	PROC_LOCK(p);
	TAILQ_FOREACH(cb, &ki->kaio_done, plist) {
		if (cb->uuaiocb == uaiocb)
			break;
	}
	if (cb != NULL) {
		MPASS(cb->jobstate == JOBST_JOBFINISHED);
		status = cb->uaiocb._aiocb_private.status;
		error = cb->uaiocb._aiocb_private.error;
		td->td_retval[0] = status;
		if (cb->uaiocb.aio_lio_opcode == LIO_WRITE) {
			p->p_stats->p_ru.ru_oublock +=
			    cb->outputcharge;
			cb->outputcharge = 0;
		} else if (cb->uaiocb.aio_lio_opcode == LIO_READ) {
			p->p_stats->p_ru.ru_inblock += cb->inputcharge;
			cb->inputcharge = 0;
		}
		aio_free_entry(cb);
		suword(&uaiocb->_aiocb_private.error, error);
		suword(&uaiocb->_aiocb_private.status, status);
		error = 0;
	} else
		error = EINVAL;
	PROC_UNLOCK(p);
	return (error);
}

/*
 * Allow a process to wakeup when any of the I/O requests are completed.
 */
int
aio_suspend(struct thread *td, struct aio_suspend_args *uap)
{
	struct proc *p = td->td_proc;
	struct timeval atv;
	struct timespec ts;
	struct aiocb *const *cbptr, *cbp;
	struct kaioinfo *ki;
	struct aiocblist *cb, *cbfirst;
	struct aiocb **ujoblist;
	int njoblist;
	int error;
	int timo;
	int i;

	if (uap->nent < 0 || uap->nent > AIO_LISTIO_MAX)
		return (EINVAL);

	timo = 0;
	if (uap->timeout) {
		/* Get timespec struct. */
		if ((error = copyin(uap->timeout, &ts, sizeof(ts))) != 0)
			return (error);

		if (ts.tv_nsec < 0 || ts.tv_nsec >= 1000000000)
			return (EINVAL);

		TIMESPEC_TO_TIMEVAL(&atv, &ts);
		if (itimerfix(&atv))
			return (EINVAL);
		timo = tvtohz(&atv);
	}

	ki = p->p_aioinfo;
	if (ki == NULL)
		return (EAGAIN);

	njoblist = 0;
	ujoblist = uma_zalloc(aiol_zone, M_WAITOK);
	cbptr = uap->aiocbp;

	for (i = 0; i < uap->nent; i++) {
		cbp = (struct aiocb *)(intptr_t)fuword(&cbptr[i]);
		if (cbp == 0)
			continue;
		ujoblist[njoblist] = cbp;
		njoblist++;
	}

	if (njoblist == 0) {
		uma_zfree(aiol_zone, ujoblist);
		return (0);
	}

	PROC_LOCK(p);
	for (;;) {
		cbfirst = NULL;
		error = 0;
		TAILQ_FOREACH(cb, &ki->kaio_all, allist) {
			for (i = 0; i < njoblist; i++) {
				if (cb->uuaiocb == ujoblist[i]) {
					if (cbfirst == NULL)
						cbfirst = cb;
					if (cb->jobstate == JOBST_JOBFINISHED)
						goto RETURN;
				}
			}
		}
		/* All tasks were finished. */
		if (cbfirst == NULL)
			break;

		ki->kaio_flags |= KAIO_WAKEUP;
		error = msleep(&p->p_aioinfo, &p->p_mtx, PRIBIO | PCATCH,
		    "aiospn", timo);
		if (error == ERESTART)
			error = EINTR;
		if (error)
			break;
	}
RETURN:
	PROC_UNLOCK(p);
	uma_zfree(aiol_zone, ujoblist);
	return (error);
}

/*
 * aio_cancel cancels any non-physio aio operations not currently in
 * progress.
 */
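/*
 * Return value summary (derived from the code below): td_retval[0] is set
 * to AIO_NOTCANCELED if any matching request is already running (including
 * all physio requests on disk devices), AIO_CANCELED if at least one queued
 * request was cancelled and none survived, and AIO_ALLDONE otherwise.
 */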
int
aio_cancel(struct thread *td, struct aio_cancel_args *uap)
{
	struct proc *p = td->td_proc;
	struct kaioinfo *ki;
	struct aiocblist *cbe, *cbn;
	struct file *fp;
	struct socket *so;
	int error;
	int cancelled = 0;
	int notcancelled = 0;
	struct vnode *vp;

	/* Lookup file object. */
	error = fget(td, uap->fd, &fp);
	if (error)
		return (error);

	ki = p->p_aioinfo;
	if (ki == NULL)
		goto done;

	if (fp->f_type == DTYPE_VNODE) {
		vp = fp->f_vnode;
		if (vn_isdisk(vp, &error)) {
			fdrop(fp, td);
			td->td_retval[0] = AIO_NOTCANCELED;
			return (0);
		}
	} else if (fp->f_type == DTYPE_SOCKET) {
		so = fp->f_data;
		mtx_lock(&aio_sock_mtx);
		TAILQ_FOREACH_SAFE(cbe, &so->so_aiojobq, list, cbn) {
			if (cbe->userproc == p &&
			    (uap->aiocbp == NULL ||
			     uap->aiocbp == cbe->uuaiocb)) {
				TAILQ_REMOVE(&so->so_aiojobq, cbe, list);
				PROC_LOCK(p);
				TAILQ_REMOVE(&ki->kaio_sockqueue, cbe, plist);
				cbe->jobstate = JOBST_JOBRUNNING;
				cbe->uaiocb._aiocb_private.status = -1;
				cbe->uaiocb._aiocb_private.error = ECANCELED;
				aio_bio_done_notify(p, cbe, DONE_QUEUE);
				PROC_UNLOCK(p);
				cancelled++;
				if (uap->aiocbp != NULL)
					break;
			}
		}
		mtx_unlock(&aio_sock_mtx);
		if (cancelled && uap->aiocbp != NULL) {
			fdrop(fp, td);
			td->td_retval[0] = AIO_CANCELED;
			return (0);
		}
	}

	PROC_LOCK(p);
	TAILQ_FOREACH_SAFE(cbe, &ki->kaio_jobqueue, plist, cbn) {
		if ((uap->fd == cbe->uaiocb.aio_fildes) &&
		    ((uap->aiocbp == NULL) ||
		     (uap->aiocbp == cbe->uuaiocb))) {
			mtx_lock(&aio_job_mtx);
			if (cbe->jobstate == JOBST_JOBQGLOBAL) {
				TAILQ_REMOVE(&aio_jobs, cbe, list);
				mtx_unlock(&aio_job_mtx);
				TAILQ_REMOVE(&ki->kaio_jobqueue, cbe, plist);
				cbe->uaiocb._aiocb_private.status = -1;
				cbe->uaiocb._aiocb_private.error = ECANCELED;
				aio_bio_done_notify(p, cbe, DONE_QUEUE);
				cancelled++;
			} else {
				mtx_unlock(&aio_job_mtx);
				notcancelled++;
			}
		}
	}
	PROC_UNLOCK(p);

done:
	fdrop(fp, td);
	if (notcancelled) {
		td->td_retval[0] = AIO_NOTCANCELED;
		return (0);
	}
	if (cancelled) {
		td->td_retval[0] = AIO_CANCELED;
		return (0);
	}
	td->td_retval[0] = AIO_ALLDONE;

	return (0);
}

/*
 * aio_error is implemented at the kernel level for compatibility purposes
 * only.  For a user mode async implementation, it would be best to do it in
 * a userland subroutine.
 */
int
aio_error(struct thread *td, struct aio_error_args *uap)
{
	struct proc *p = td->td_proc;
	struct aiocblist *cb;
	struct kaioinfo *ki;
	int status;

	ki = p->p_aioinfo;
	if (ki == NULL) {
		td->td_retval[0] = EINVAL;
		return (0);
	}

	PROC_LOCK(p);
	TAILQ_FOREACH(cb, &ki->kaio_all, allist) {
		if (cb->uuaiocb == uap->aiocbp) {
			if (cb->jobstate == JOBST_JOBFINISHED)
				td->td_retval[0] =
				    cb->uaiocb._aiocb_private.error;
			else
				td->td_retval[0] = EINPROGRESS;
			PROC_UNLOCK(p);
			return (0);
		}
	}
	PROC_UNLOCK(p);

	/*
	 * Hack for failure of aio_aqueue.
	 */
	status = fuword(&uap->aiocbp->_aiocb_private.status);
	if (status == -1) {
		td->td_retval[0] = fuword(&uap->aiocbp->_aiocb_private.error);
		return (0);
	}

	td->td_retval[0] = EINVAL;
	return (0);
}

/* syscall - asynchronous read from a file (REALTIME) */
int
oaio_read(struct thread *td, struct oaio_read_args *uap)
{

	return aio_aqueue(td, (struct aiocb *)uap->aiocbp, NULL, LIO_READ, 1);
}

int
aio_read(struct thread *td, struct aio_read_args *uap)
{

	return aio_aqueue(td, uap->aiocbp, NULL, LIO_READ, 0);
}

/* syscall - asynchronous write to a file (REALTIME) */
int
oaio_write(struct thread *td, struct oaio_write_args *uap)
{

	return aio_aqueue(td, (struct aiocb *)uap->aiocbp, NULL, LIO_WRITE, 1);
}

int
aio_write(struct thread *td, struct aio_write_args *uap)
{

	return aio_aqueue(td, uap->aiocbp, NULL, LIO_WRITE, 0);
}

/* syscall - list directed I/O (REALTIME) */
int
olio_listio(struct thread *td, struct olio_listio_args *uap)
{
	return do_lio_listio(td, (struct lio_listio_args *)uap, 1);
}

/* syscall - list directed I/O (REALTIME) */
int
lio_listio(struct thread *td, struct lio_listio_args *uap)
{
	return do_lio_listio(td, uap, 0);
}
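/*
 * Illustrative userland use of the list I/O path handled below (not part
 * of this file): each list slot is queued through aio_aqueue() with type
 * LIO_NOP, so the per-aiocb aio_lio_opcode is honored.
 *
 *	struct aiocb *list[2] = { &acb0, &acb1 };
 *	acb0.aio_lio_opcode = LIO_READ;
 *	acb1.aio_lio_opcode = LIO_WRITE;
 *	lio_listio(LIO_WAIT, list, 2, NULL);	-- sleeps until both finish
 */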
static int
do_lio_listio(struct thread *td, struct lio_listio_args *uap, int oldsigev)
{
	struct proc *p = td->td_proc;
	struct aiocb *iocb, * const *cbptr;
	struct kaioinfo *ki;
	struct aioliojob *lj;
	struct kevent kev;
	struct kqueue *kq;
	struct file *kq_fp;
	int nent;
	int error;
	int nerror;
	int i;

	if ((uap->mode != LIO_NOWAIT) && (uap->mode != LIO_WAIT))
		return (EINVAL);

	nent = uap->nent;
	if (nent < 0 || nent > AIO_LISTIO_MAX)
		return (EINVAL);

	if (p->p_aioinfo == NULL)
		aio_init_aioinfo(p);

	ki = p->p_aioinfo;

	lj = uma_zalloc(aiolio_zone, M_WAITOK);
	lj->lioj_flags = 0;
	lj->lioj_count = 0;
	lj->lioj_finished_count = 0;
	knlist_init(&lj->klist, &p->p_mtx, NULL, NULL, NULL);
	ksiginfo_init(&lj->lioj_ksi);

	/*
	 * Setup signal.
	 */
	if (uap->sig && (uap->mode == LIO_NOWAIT)) {
		bzero(&lj->lioj_signal, sizeof(lj->lioj_signal));
		error = copyin(uap->sig, &lj->lioj_signal,
		    oldsigev ? sizeof(struct osigevent) :
		    sizeof(struct sigevent));
		if (error) {
			uma_zfree(aiolio_zone, lj);
			return (error);
		}

		if (lj->lioj_signal.sigev_notify == SIGEV_KEVENT) {
			/* Assume only new style KEVENT */
			error = fget(td, lj->lioj_signal.sigev_notify_kqueue,
			    &kq_fp);
			if (error) {
				uma_zfree(aiolio_zone, lj);
				return (error);
			}
			if (kq_fp->f_type != DTYPE_KQUEUE) {
				fdrop(kq_fp, td);
				uma_zfree(aiolio_zone, lj);
				return (EBADF);
			}
			kq = (struct kqueue *)kq_fp->f_data;
			kev.filter = EVFILT_LIO;
			kev.flags = EV_ADD | EV_ENABLE | EV_FLAG1;
			kev.ident = (uintptr_t)lj; /* something unique */
			kev.data = (intptr_t)lj;
			/* pass user defined sigval data */
			kev.udata = lj->lioj_signal.sigev_value.sival_ptr;
			error = kqueue_register(kq, &kev, td, 1);
			fdrop(kq_fp, td);
			if (error) {
				uma_zfree(aiolio_zone, lj);
				return (error);
			}
		} else if (lj->lioj_signal.sigev_notify == SIGEV_NONE) {
			;
		} else if (lj->lioj_signal.sigev_notify == SIGEV_SIGNAL ||
			   lj->lioj_signal.sigev_notify == SIGEV_THREAD_ID) {
			if (!_SIG_VALID(lj->lioj_signal.sigev_signo)) {
				uma_zfree(aiolio_zone, lj);
				return (EINVAL);
			}
			lj->lioj_flags |= LIOJ_SIGNAL;
		} else {
			uma_zfree(aiolio_zone, lj);
			return (EINVAL);
		}
	}

	PROC_LOCK(p);
	TAILQ_INSERT_TAIL(&ki->kaio_liojoblist, lj, lioj_list);
	/*
	 * Add extra aiocb count to avoid the lio to be freed
	 * by other threads doing aio_waitcomplete or aio_return,
	 * and prevent event from being sent until we have queued
	 * all tasks.
	 */
	lj->lioj_count = 1;
	PROC_UNLOCK(p);

	/*
	 * Get pointers to the list of I/O requests.
	 */
	nerror = 0;
	cbptr = uap->acb_list;
	for (i = 0; i < uap->nent; i++) {
		iocb = (struct aiocb *)(intptr_t)fuword(&cbptr[i]);
		if (((intptr_t)iocb != -1) && ((intptr_t)iocb != 0)) {
			error = aio_aqueue(td, iocb, lj, LIO_NOP, oldsigev);
			if (error != 0)
				nerror++;
		}
	}

	error = 0;
	PROC_LOCK(p);
	if (uap->mode == LIO_WAIT) {
		while (lj->lioj_count - 1 != lj->lioj_finished_count) {
			ki->kaio_flags |= KAIO_WAKEUP;
			error = msleep(&p->p_aioinfo, &p->p_mtx,
			    PRIBIO | PCATCH, "aiospn", 0);
			if (error == ERESTART)
				error = EINTR;
			if (error)
				break;
		}
	} else {
		if (lj->lioj_count - 1 == lj->lioj_finished_count) {
			if (lj->lioj_signal.sigev_notify == SIGEV_KEVENT) {
				lj->lioj_flags |= LIOJ_KEVENT_POSTED;
				KNOTE_LOCKED(&lj->klist, 1);
			}
			if ((lj->lioj_flags & (LIOJ_SIGNAL|LIOJ_SIGNAL_POSTED))
			    == LIOJ_SIGNAL
			    && (lj->lioj_signal.sigev_notify == SIGEV_SIGNAL ||
			    lj->lioj_signal.sigev_notify == SIGEV_THREAD_ID)) {
				aio_sendsig(p, &lj->lioj_signal,
				    &lj->lioj_ksi);
				lj->lioj_flags |= LIOJ_SIGNAL_POSTED;
			}
		}
	}
	lj->lioj_count--;
	if (lj->lioj_count == 0) {
		TAILQ_REMOVE(&ki->kaio_liojoblist, lj, lioj_list);
		knlist_delete(&lj->klist, curthread, 1);
		sigqueue_take(&lj->lioj_ksi);
		PROC_UNLOCK(p);
		uma_zfree(aiolio_zone, lj);
	} else
		PROC_UNLOCK(p);

	if (nerror)
		return (EIO);
	return (error);
}

/*
 * Called from interrupt thread for physio, we should return as fast
 * as possible, so we schedule a biohelper task.
 */
static void
aio_physwakeup(struct buf *bp)
{
	struct aiocblist *aiocbe;

	aiocbe = (struct aiocblist *)bp->b_caller1;
	taskqueue_enqueue(taskqueue_aiod_bio, &aiocbe->biotask);
}

/*
 * Task routine to perform heavy tasks, process wakeup, and signals.
 */
static void
biohelper(void *context, int pending)
{
	struct aiocblist *aiocbe = context;
	struct buf *bp;
	struct proc *userp;
	int nblks;

	bp = aiocbe->bp;
	userp = aiocbe->userproc;
	PROC_LOCK(userp);
	aiocbe->uaiocb._aiocb_private.status -= bp->b_resid;
	aiocbe->uaiocb._aiocb_private.error = 0;
	if (bp->b_ioflags & BIO_ERROR)
		aiocbe->uaiocb._aiocb_private.error = bp->b_error;
	nblks = btodb(aiocbe->uaiocb.aio_nbytes);
	if (aiocbe->uaiocb.aio_lio_opcode == LIO_WRITE)
		aiocbe->outputcharge += nblks;
	else
		aiocbe->inputcharge += nblks;
	aiocbe->bp = NULL;
	TAILQ_REMOVE(&userp->p_aioinfo->kaio_bufqueue, aiocbe, plist);
	aio_bio_done_notify(userp, aiocbe, DONE_BUF);
	PROC_UNLOCK(userp);

	/* Release mapping into kernel space. */
	vunmapbuf(bp);
	relpbuf(bp, NULL);
	atomic_subtract_int(&num_buf_aio, 1);
}

/* syscall - wait for the next completion of an aio request */
int
aio_waitcomplete(struct thread *td, struct aio_waitcomplete_args *uap)
{
	struct proc *p = td->td_proc;
	struct timeval atv;
	struct timespec ts;
	struct kaioinfo *ki;
	struct aiocblist *cb;
	struct aiocb *uuaiocb;
	int error, status, timo;

	suword(uap->aiocbp, (long)NULL);

	timo = 0;
	if (uap->timeout) {
		/* Get timespec struct. */
		error = copyin(uap->timeout, &ts, sizeof(ts));
		if (error)
			return (error);

		if ((ts.tv_nsec < 0) || (ts.tv_nsec >= 1000000000))
			return (EINVAL);

		TIMESPEC_TO_TIMEVAL(&atv, &ts);
		if (itimerfix(&atv))
			return (EINVAL);
		timo = tvtohz(&atv);
	}

	if (p->p_aioinfo == NULL)
		aio_init_aioinfo(p);
	ki = p->p_aioinfo;

	error = 0;
	cb = NULL;
	PROC_LOCK(p);
	while ((cb = TAILQ_FIRST(&ki->kaio_done)) == NULL) {
		ki->kaio_flags |= KAIO_WAKEUP;
		error = msleep(&p->p_aioinfo, &p->p_mtx, PRIBIO | PCATCH,
		    "aiowc", timo);
		if (error == ERESTART)
			error = EINTR;
		if (error)
			break;
	}

	if (cb != NULL) {
		MPASS(cb->jobstate == JOBST_JOBFINISHED);
		uuaiocb = cb->uuaiocb;
		status = cb->uaiocb._aiocb_private.status;
		error = cb->uaiocb._aiocb_private.error;
		td->td_retval[0] = status;
		if (cb->uaiocb.aio_lio_opcode == LIO_WRITE) {
			p->p_stats->p_ru.ru_oublock += cb->outputcharge;
			cb->outputcharge = 0;
		} else if (cb->uaiocb.aio_lio_opcode == LIO_READ) {
			p->p_stats->p_ru.ru_inblock += cb->inputcharge;
			cb->inputcharge = 0;
		}
		aio_free_entry(cb);
		PROC_UNLOCK(p);
		suword(uap->aiocbp, (long)uuaiocb);
		suword(&uuaiocb->_aiocb_private.error, error);
		suword(&uuaiocb->_aiocb_private.status, status);
	} else
		PROC_UNLOCK(p);

	return (error);
}
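/*
 * The filters below implement EVFILT_AIO/EVFILT_LIO.  Illustrative userland
 * setup for kevent-based completion (not part of this file):
 *
 *	acb.aio_sigevent.sigev_notify = SIGEV_KEVENT;
 *	acb.aio_sigevent.sigev_notify_kqueue = kq;
 *	acb.aio_sigevent.sigev_value.sival_ptr = udata;
 *	aio_read(&acb);
 *	kevent(kq, NULL, 0, &ev, 1, NULL);	-- ev.udata == udata
 */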
/* kqueue attach function */
static int
filt_aioattach(struct knote *kn)
{
	struct aiocblist *aiocbe = (struct aiocblist *)kn->kn_sdata;

	/*
	 * The aiocbe pointer must be validated before using it, so
	 * registration is restricted to the kernel; the user cannot
	 * set EV_FLAG1.
	 */
	if ((kn->kn_flags & EV_FLAG1) == 0)
		return (EPERM);
	kn->kn_flags &= ~EV_FLAG1;

	knlist_add(&aiocbe->klist, kn, 0);

	return (0);
}

/* kqueue detach function */
static void
filt_aiodetach(struct knote *kn)
{
	struct aiocblist *aiocbe = (struct aiocblist *)kn->kn_sdata;

	if (!knlist_empty(&aiocbe->klist))
		knlist_remove(&aiocbe->klist, kn, 0);
}

/* kqueue filter function */
/*ARGSUSED*/
static int
filt_aio(struct knote *kn, long hint)
{
	struct aiocblist *aiocbe = (struct aiocblist *)kn->kn_sdata;

	kn->kn_data = aiocbe->uaiocb._aiocb_private.error;
	if (aiocbe->jobstate != JOBST_JOBFINISHED)
		return (0);
	kn->kn_flags |= EV_EOF;
	return (1);
}

/* kqueue attach function */
static int
filt_lioattach(struct knote *kn)
{
	struct aioliojob *lj = (struct aioliojob *)kn->kn_sdata;

	/*
	 * The aioliojob pointer must be validated before using it, so
	 * registration is restricted to the kernel; the user cannot
	 * set EV_FLAG1.
	 */
	if ((kn->kn_flags & EV_FLAG1) == 0)
		return (EPERM);
	kn->kn_flags &= ~EV_FLAG1;

	knlist_add(&lj->klist, kn, 0);

	return (0);
}

/* kqueue detach function */
static void
filt_liodetach(struct knote *kn)
{
	struct aioliojob *lj = (struct aioliojob *)kn->kn_sdata;

	if (!knlist_empty(&lj->klist))
		knlist_remove(&lj->klist, kn, 0);
}

/* kqueue filter function */
/*ARGSUSED*/
static int
filt_lio(struct knote *kn, long hint)
{
	struct aioliojob *lj = (struct aioliojob *)kn->kn_sdata;

	return (lj->lioj_flags & LIOJ_KEVENT_POSTED);
}