1/* $NetBSD: sys_pipe.c,v 1.167 2024/02/10 09:21:54 andvar Exp $ */ 2 3/*- 4 * Copyright (c) 2003, 2007, 2008, 2009, 2023 The NetBSD Foundation, Inc. 5 * All rights reserved. 6 * 7 * This code is derived from software contributed to The NetBSD Foundation 8 * by Paul Kranenburg, and by Andrew Doran. 9 * 10 * Redistribution and use in source and binary forms, with or without 11 * modification, are permitted provided that the following conditions 12 * are met: 13 * 1. Redistributions of source code must retain the above copyright 14 * notice, this list of conditions and the following disclaimer. 15 * 2. Redistributions in binary form must reproduce the above copyright 16 * notice, this list of conditions and the following disclaimer in the 17 * documentation and/or other materials provided with the distribution. 18 * 19 * THIS SOFTWARE IS PROVIDED BY THE NETBSD FOUNDATION, INC. AND CONTRIBUTORS 20 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED 21 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR 22 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE FOUNDATION OR CONTRIBUTORS 23 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 24 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 25 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 26 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 27 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 28 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 29 * POSSIBILITY OF SUCH DAMAGE. 30 */ 31 32/* 33 * Copyright (c) 1996 John S. Dyson 34 * All rights reserved. 35 * 36 * Redistribution and use in source and binary forms, with or without 37 * modification, are permitted provided that the following conditions 38 * are met: 39 * 1. 
Redistributions of source code must retain the above copyright
 *    notice immediately at the beginning of the file, without modification,
 *    this list of conditions, and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Absolutely no warranty of function or purpose is made by the author
 *    John S. Dyson.
 * 4. Modifications may be freely made to this file if the above conditions
 *    are met.
 */

/*
 * This file contains a high-performance replacement for the socket-based
 * pipes scheme originally used.  It does not support all features of
 * sockets, but does do everything that pipes normally do.
 */

#include <sys/cdefs.h>
__KERNEL_RCSID(0, "$NetBSD: sys_pipe.c,v 1.167 2024/02/10 09:21:54 andvar Exp $");

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/proc.h>
#include <sys/fcntl.h>
#include <sys/file.h>
#include <sys/filedesc.h>
#include <sys/filio.h>
#include <sys/kernel.h>
#include <sys/ttycom.h>
#include <sys/stat.h>
#include <sys/poll.h>
#include <sys/signalvar.h>
#include <sys/vnode.h>
#include <sys/uio.h>
#include <sys/select.h>
#include <sys/mount.h>
#include <sys/syscallargs.h>
#include <sys/sysctl.h>
#include <sys/kauth.h>
#include <sys/atomic.h>
#include <sys/pipe.h>

/* fileops backing DTYPE_PIPE descriptors (table below). */
static int	pipe_read(file_t *, off_t *, struct uio *, kauth_cred_t, int);
static int	pipe_write(file_t *, off_t *, struct uio *, kauth_cred_t, int);
static int	pipe_close(file_t *);
static int	pipe_poll(file_t *, int);
static int	pipe_kqfilter(file_t *, struct knote *);
static int	pipe_stat(file_t *, struct stat *);
static int	pipe_ioctl(file_t *, u_long, void *);
static void	pipe_restart(file_t *);
static int	pipe_fpathconf(file_t *, int, register_t *);
static int
	pipe_posix_fadvise(file_t *, off_t, off_t, int);

static const struct fileops pipeops = {
	.fo_name = "pipe",
	.fo_read = pipe_read,
	.fo_write = pipe_write,
	.fo_ioctl = pipe_ioctl,
	.fo_fcntl = fnullop_fcntl,
	.fo_poll = pipe_poll,
	.fo_stat = pipe_stat,
	.fo_close = pipe_close,
	.fo_kqfilter = pipe_kqfilter,
	.fo_restart = pipe_restart,
	.fo_fpathconf = pipe_fpathconf,
	.fo_posix_fadvise = pipe_posix_fadvise,
};

/*
 * Default pipe buffer size(s), this can be kind-of large now because pipe
 * space is pageable.  The pipe code will try to maintain locality of
 * reference for performance reasons, so small amounts of outstanding I/O
 * will not wipe the cache.
 */
#define MINPIPESIZE	(PIPE_SIZE / 3)
#define MAXPIPESIZE	(2 * PIPE_SIZE / 3)

/*
 * Limit the number of "big" pipes
 */
#define LIMITBIGPIPES	32
static u_int	maxbigpipes __read_mostly = LIMITBIGPIPES;
static u_int	nbigpipe = 0;

/*
 * Amount of KVA consumed by pipe buffers.
 */
static u_int	amountpipekva = 0;

static void	pipeclose(struct pipe *);
static void	pipe_free_kmem(struct pipe *);
static int	pipe_create(struct pipe **, pool_cache_t, struct timespec *);
static int	pipelock(struct pipe *, bool);
static inline void pipeunlock(struct pipe *);
static void	pipeselwakeup(struct pipe *, struct pipe *, int);
static int	pipespace(struct pipe *, int);
static int	pipe_ctor(void *, void *, int);
static void	pipe_dtor(void *, void *);

/* Two caches: the reader side owns the buffer KVA, the writer side does not. */
static pool_cache_t	pipe_wr_cache;
static pool_cache_t	pipe_rd_cache;

/*
 * One-time initialization: create the reader/writer pool caches.
 */
void
pipe_init(void)
{

	/* Writer side is not automatically allocated KVA. */
	pipe_wr_cache = pool_cache_init(sizeof(struct pipe), 0, 0, 0, "pipewr",
	    NULL, IPL_NONE, pipe_ctor, pipe_dtor, NULL);
	KASSERT(pipe_wr_cache != NULL);

	/* Reader side gets preallocated KVA.
	 */
	pipe_rd_cache = pool_cache_init(sizeof(struct pipe), 0, 0, 0, "piperd",
	    NULL, IPL_NONE, pipe_ctor, pipe_dtor, (void *)1);
	KASSERT(pipe_rd_cache != NULL);
}

/*
 * Pool cache constructor: zero the pipe, optionally preallocate buffer
 * KVA (arg != NULL, i.e. the reader-side cache), and initialize the
 * condition variables and selinfo.  Starts with PIPE_SIGNALR set so the
 * first reader wakeup behaves as if a write just happened.
 */
static int
pipe_ctor(void *arg, void *obj, int flags)
{
	struct pipe *pipe;
	vaddr_t va;

	pipe = obj;

	memset(pipe, 0, sizeof(struct pipe));
	if (arg != NULL) {
		/* Preallocate space. */
		va = uvm_km_alloc(kernel_map, PIPE_SIZE, 0,
		    UVM_KMF_PAGEABLE | UVM_KMF_WAITVA);
		KASSERT(va != 0);
		pipe->pipe_kmem = va;
		atomic_add_int(&amountpipekva, PIPE_SIZE);
	}
	cv_init(&pipe->pipe_rcv, "pipe_rd");
	cv_init(&pipe->pipe_wcv, "pipe_wr");
	cv_init(&pipe->pipe_draincv, "pipe_drn");
	cv_init(&pipe->pipe_lkcv, "pipe_lk");
	selinit(&pipe->pipe_sel);
	pipe->pipe_state = PIPE_SIGNALR;

	return 0;
}

/*
 * Pool cache destructor: tear down CVs/selinfo and release any
 * preallocated reader-side KVA.
 */
static void
pipe_dtor(void *arg, void *obj)
{
	struct pipe *pipe;

	pipe = obj;

	cv_destroy(&pipe->pipe_rcv);
	cv_destroy(&pipe->pipe_wcv);
	cv_destroy(&pipe->pipe_draincv);
	cv_destroy(&pipe->pipe_lkcv);
	seldestroy(&pipe->pipe_sel);
	if (pipe->pipe_kmem != 0) {
		uvm_km_free(kernel_map, pipe->pipe_kmem, PIPE_SIZE,
		    UVM_KMF_PAGEABLE);
		atomic_add_int(&amountpipekva, -PIPE_SIZE);
	}
}

/*
 * The pipe system call for the DTYPE_PIPE type of pipes
 */
int
pipe1(struct lwp *l, int *fildes, int flags)
{
	struct pipe *rpipe, *wpipe;
	struct timespec nt;
	file_t *rf, *wf;
	int fd, error;
	proc_t *p;

	if (flags & ~(O_CLOEXEC|O_NONBLOCK|O_NOSIGPIPE))
		return EINVAL;
	p = curproc;
	rpipe = wpipe = NULL;
	getnanotime(&nt);
	if ((error = pipe_create(&rpipe, pipe_rd_cache, &nt)) ||
	    (error = pipe_create(&wpipe, pipe_wr_cache, &nt))) {
		goto free2;
	}
	/* Both ends share one mutex; take a second reference for the peer. */
	rpipe->pipe_lock = mutex_obj_alloc(MUTEX_DEFAULT, IPL_NONE);
	wpipe->pipe_lock = rpipe->pipe_lock;
	mutex_obj_hold(wpipe->pipe_lock);

	error =
	    fd_allocfile(&rf, &fd);
	if (error)
		goto free2;
	fildes[0] = fd;

	error = fd_allocfile(&wf, &fd);
	if (error)
		goto free3;
	fildes[1] = fd;

	rf->f_flag = FREAD | flags;
	rf->f_type = DTYPE_PIPE;
	rf->f_pipe = rpipe;
	rf->f_ops = &pipeops;
	fd_set_exclose(l, fildes[0], (flags & O_CLOEXEC) != 0);

	wf->f_flag = FWRITE | flags;
	wf->f_type = DTYPE_PIPE;
	wf->f_pipe = wpipe;
	wf->f_ops = &pipeops;
	fd_set_exclose(l, fildes[1], (flags & O_CLOEXEC) != 0);

	rpipe->pipe_peer = wpipe;
	wpipe->pipe_peer = rpipe;

	fd_affix(p, rf, fildes[0]);
	fd_affix(p, wf, fildes[1]);
	return (0);
free3:
	fd_abort(p, rf, fildes[0]);
free2:
	pipeclose(wpipe);
	pipeclose(rpipe);

	return (error);
}

/*
 * Allocate kva for pipe circular buffer, the space is pageable
 * This routine will 'realloc' the size of a pipe safely, if it fails
 * it will retain the old buffer.
 * If it fails it will return ENOMEM.
 */
static int
pipespace(struct pipe *pipe, int size)
{
	void *buffer;

	/*
	 * Allocate pageable virtual address space.  Physical memory is
	 * allocated on demand.
	 */
	if (size == PIPE_SIZE && pipe->pipe_kmem != 0) {
		/* Reuse the KVA preallocated by pipe_ctor(). */
		buffer = (void *)pipe->pipe_kmem;
	} else {
		buffer = (void *)uvm_km_alloc(kernel_map, round_page(size),
		    0, UVM_KMF_PAGEABLE);
		if (buffer == NULL)
			return (ENOMEM);
		atomic_add_int(&amountpipekva, size);
	}

	/* free old resources if we're resizing */
	pipe_free_kmem(pipe);
	pipe->pipe_buffer.buffer = buffer;
	pipe->pipe_buffer.size = size;
	pipe->pipe_buffer.in = 0;
	pipe->pipe_buffer.out = 0;
	pipe->pipe_buffer.cnt = 0;
	return (0);
}

/*
 * Initialize and allocate VM and memory for pipe.
 */
static int
pipe_create(struct pipe **pipep, pool_cache_t cache, struct timespec *nt)
{
	struct pipe *pipe;
	int error;

	pipe = pool_cache_get(cache, PR_WAITOK);
	KASSERT(pipe != NULL);
	*pipep = pipe;
	error = 0;
	pipe->pipe_atime = pipe->pipe_mtime = pipe->pipe_btime = *nt;
	pipe->pipe_lock = NULL;
	if (cache == pipe_rd_cache) {
		/* Only the reader side carries the data buffer. */
		error = pipespace(pipe, PIPE_SIZE);
	} else {
		pipe->pipe_buffer.buffer = NULL;
		pipe->pipe_buffer.size = 0;
		pipe->pipe_buffer.in = 0;
		pipe->pipe_buffer.out = 0;
		pipe->pipe_buffer.cnt = 0;
	}
	return error;
}

/*
 * Lock a pipe for I/O, blocking other access
 * Called with pipe spin lock held.
 */
static int
pipelock(struct pipe *pipe, bool catch_p)
{
	int error;

	KASSERT(mutex_owned(pipe->pipe_lock));

	while (pipe->pipe_state & PIPE_LOCKFL) {
		if (catch_p) {
			/* Interruptible wait; propagate signal errors. */
			error = cv_wait_sig(&pipe->pipe_lkcv, pipe->pipe_lock);
			if (error != 0) {
				return error;
			}
		} else
			cv_wait(&pipe->pipe_lkcv, pipe->pipe_lock);
	}

	pipe->pipe_state |= PIPE_LOCKFL;

	return 0;
}

/*
 * unlock a pipe I/O lock
 */
static inline void
pipeunlock(struct pipe *pipe)
{

	KASSERT(pipe->pipe_state & PIPE_LOCKFL);

	pipe->pipe_state &= ~PIPE_LOCKFL;
	cv_signal(&pipe->pipe_lkcv);
}

/*
 * Select/poll wakeup.  This also sends SIGIO to peer connected to
 * 'sigpipe' side of pipe.
 */
static void
pipeselwakeup(struct pipe *selp, struct pipe *sigp, int code)
{
	int band;

	/* Map the siginfo code onto the corresponding poll(2) event bits. */
	switch (code) {
	case POLL_IN:
		band = POLLIN|POLLRDNORM;
		break;
	case POLL_OUT:
		band = POLLOUT|POLLWRNORM;
		break;
	case POLL_HUP:
		band = POLLHUP;
		break;
	case POLL_ERR:
		band = POLLERR;
		break;
	default:
		band = 0;
#ifdef DIAGNOSTIC
		printf("bad siginfo code %d in pipe notification.\n", code);
#endif
		break;
	}

	selnotify(&selp->pipe_sel, band, NOTE_SUBMIT);

	/* SIGIO is only delivered when the signalled side enabled O_ASYNC. */
	if (sigp == NULL || (sigp->pipe_state & PIPE_ASYNC) == 0)
		return;

	fownsignal(sigp->pipe_pgid, SIGIO, code, band, selp);
}

/*
 * fo_read: copy data out of the pipe buffer to the caller's uio,
 * sleeping interruptibly while the pipe is empty.
 */
static int
pipe_read(file_t *fp, off_t *offset, struct uio *uio, kauth_cred_t cred,
    int flags)
{
	struct pipe *rpipe = fp->f_pipe;
	struct pipebuf *bp = &rpipe->pipe_buffer;
	kmutex_t *lock = rpipe->pipe_lock;
	int error;
	size_t nread = 0;
	size_t size;
	size_t ocnt;
	unsigned int wakeup_state = 0;

	/*
	 * Try to avoid locking the pipe if we have nothing to do.
	 *
	 * There are programs which share one pipe amongst multiple processes
	 * and perform non-blocking reads in parallel, even if the pipe is
	 * empty.  This in particular is the case with BSD make, which when
	 * spawned with a high -j number can find itself with over half of the
	 * calls failing to find anything.
	 */
	if ((fp->f_flag & FNONBLOCK) != 0) {
		if (__predict_false(uio->uio_resid == 0))
			return (0);
		if (atomic_load_relaxed(&bp->cnt) == 0 &&
		    (atomic_load_relaxed(&rpipe->pipe_state) & PIPE_EOF) == 0)
			return (EAGAIN);
	}

	mutex_enter(lock);
	++rpipe->pipe_busy;
	/* Remember the initial count to decide on writer wakeups at the end. */
	ocnt = bp->cnt;

again:
	error = pipelock(rpipe, true);
	if (error)
		goto unlocked_error;

	while (uio->uio_resid) {
		/*
		 * Normal pipe buffer receive.
		 */
		if (bp->cnt > 0) {
			/* Copy at most the contiguous run up to buffer end. */
			size = bp->size - bp->out;
			if (size > bp->cnt)
				size = bp->cnt;
			if (size > uio->uio_resid)
				size = uio->uio_resid;

			/* Drop the mutex across uiomove(): it may fault. */
			mutex_exit(lock);
			error = uiomove((char *)bp->buffer + bp->out, size, uio);
			mutex_enter(lock);
			if (error)
				break;

			bp->out += size;
			if (bp->out >= bp->size)
				bp->out = 0;

			bp->cnt -= size;

			/*
			 * If there is no more to read in the pipe, reset
			 * its pointers to the beginning.  This improves
			 * cache hit stats.
			 */
			if (bp->cnt == 0) {
				bp->in = 0;
				bp->out = 0;
			}
			nread += size;
			continue;
		}

		/*
		 * Break if some data was read.
		 */
		if (nread > 0)
			break;

		/*
		 * Detect EOF condition.
		 * Read returns 0 on EOF, no need to set error.
		 */
		if (rpipe->pipe_state & PIPE_EOF)
			break;

		/*
		 * Don't block on non-blocking I/O.
		 */
		if (fp->f_flag & FNONBLOCK) {
			error = EAGAIN;
			break;
		}

		/*
		 * Unlock the pipe buffer for our remaining processing.
		 * We will either break out with an error or we will
		 * sleep and relock to loop.
		 */
		pipeunlock(rpipe);

#if 1	/* XXX (dsl) I'm sure these aren't needed here ... */
		/*
		 * We want to read more, wake up select/poll.
		 */
		pipeselwakeup(rpipe, rpipe->pipe_peer, POLL_OUT);

		/*
		 * If the "write-side" is blocked, wake it up now.
		 */
		cv_broadcast(&rpipe->pipe_wcv);
#endif

		/* A pending close() asked us to bail out (see pipe_restart). */
		if (wakeup_state & PIPE_RESTART) {
			error = ERESTART;
			goto unlocked_error;
		}

		/* Now wait until the pipe is filled */
		error = cv_wait_sig(&rpipe->pipe_rcv, lock);
		if (error != 0)
			goto unlocked_error;
		wakeup_state = rpipe->pipe_state;
		goto again;
	}

	if (error == 0)
		getnanotime(&rpipe->pipe_atime);
	pipeunlock(rpipe);

unlocked_error:
	--rpipe->pipe_busy;
	if (rpipe->pipe_busy == 0) {
		rpipe->pipe_state &= ~PIPE_RESTART;
		cv_broadcast(&rpipe->pipe_draincv);
	}
	if (bp->cnt < MINPIPESIZE) {
		cv_broadcast(&rpipe->pipe_wcv);
	}

	/*
	 * If anything was read off the buffer, signal to the writer it's
	 * possible to write more data. Also send signal if we are here for the
	 * first time after last write.
	 */
	if ((bp->size - bp->cnt) >= PIPE_BUF
	    && (ocnt != bp->cnt || (rpipe->pipe_state & PIPE_SIGNALR))) {
		pipeselwakeup(rpipe, rpipe->pipe_peer, POLL_OUT);
		rpipe->pipe_state &= ~PIPE_SIGNALR;
	}

	mutex_exit(lock);
	return (error);
}

/*
 * fo_write: copy data from the caller's uio into the peer's (reader's)
 * pipe buffer, honouring PIPE_BUF atomicity and blocking while full.
 */
static int
pipe_write(file_t *fp, off_t *offset, struct uio *uio, kauth_cred_t cred,
    int flags)
{
	struct pipe *wpipe, *rpipe;
	struct pipebuf *bp;
	kmutex_t *lock;
	int error;
	unsigned int wakeup_state = 0;

	/* We want to write to our peer */
	rpipe = fp->f_pipe;
	lock = rpipe->pipe_lock;
	error = 0;

	mutex_enter(lock);
	wpipe = rpipe->pipe_peer;

	/*
	 * Detect loss of pipe read side, issue SIGPIPE if lost.
	 */
	if (wpipe == NULL || (wpipe->pipe_state & PIPE_EOF) != 0) {
		mutex_exit(lock);
		return EPIPE;
	}
	++wpipe->pipe_busy;

	/* Acquire the long-term pipe lock */
	if ((error = pipelock(wpipe, true)) != 0) {
		--wpipe->pipe_busy;
		if (wpipe->pipe_busy == 0) {
			wpipe->pipe_state &= ~PIPE_RESTART;
			cv_broadcast(&wpipe->pipe_draincv);
		}
		mutex_exit(lock);
		return (error);
	}

	bp = &wpipe->pipe_buffer;

	/*
	 * If it is advantageous to resize the pipe buffer, do so.
	 */
	if ((uio->uio_resid > PIPE_SIZE) &&
	    (nbigpipe < maxbigpipes) &&
	    (bp->size <= PIPE_SIZE) && (bp->cnt == 0)) {

		if (pipespace(wpipe, BIG_PIPE_SIZE) == 0)
			atomic_inc_uint(&nbigpipe);
	}

	while (uio->uio_resid) {
		size_t space;

		space = bp->size - bp->cnt;

		/* Writes of size <= PIPE_BUF must be atomic. */
		if ((space < uio->uio_resid) && (uio->uio_resid <= PIPE_BUF))
			space = 0;

		if (space > 0) {
			int size;	/* Transfer size */
			int segsize;	/* first segment to transfer */

			/*
			 * Transfer size is minimum of uio transfer
			 * and free space in pipe buffer.
			 */
			if (space > uio->uio_resid)
				size = uio->uio_resid;
			else
				size = space;
			/*
			 * First segment to transfer is minimum of
			 * transfer size and contiguous space in
			 * pipe buffer.  If first segment to transfer
			 * is less than the transfer size, we've got
			 * a wraparound in the buffer.
			 */
			segsize = bp->size - bp->in;
			if (segsize > size)
				segsize = size;

			/* Transfer first segment */
			mutex_exit(lock);
			error = uiomove((char *)bp->buffer + bp->in, segsize,
			    uio);

			if (error == 0 && segsize < size) {
				/*
				 * Transfer remaining part now, to
				 * support atomic writes.  Wraparound
				 * happened.
				 */
				KASSERT(bp->in + segsize == bp->size);
				error = uiomove(bp->buffer,
				    size - segsize, uio);
			}
			mutex_enter(lock);
			if (error)
				break;

			bp->in += size;
			if (bp->in >= bp->size) {
				KASSERT(bp->in == size - segsize + bp->size);
				bp->in = size - segsize;
			}

			bp->cnt += size;
			KASSERT(bp->cnt <= bp->size);
			wakeup_state = 0;
		} else {
			/*
			 * If the "read-side" has been blocked, wake it up now.
			 */
			cv_broadcast(&wpipe->pipe_rcv);

			/*
			 * Don't block on non-blocking I/O.
			 */
			if (fp->f_flag & FNONBLOCK) {
				error = EAGAIN;
				break;
			}

			/*
			 * We have no more space and have something to offer,
			 * wake up select/poll.
			 */
			if (bp->cnt)
				pipeselwakeup(wpipe, wpipe, POLL_IN);

			/* A pending close() asked us to bail out. */
			if (wakeup_state & PIPE_RESTART) {
				error = ERESTART;
				break;
			}

			/*
			 * If read side wants to go away, we just issue a signal
			 * to ourselves.
			 */
			if (wpipe->pipe_state & PIPE_EOF) {
				error = EPIPE;
				break;
			}

			/* Sleep for space; re-take the long-term lock after. */
			pipeunlock(wpipe);
			error = cv_wait_sig(&wpipe->pipe_wcv, lock);
			(void)pipelock(wpipe, false);
			if (error != 0)
				break;
			wakeup_state = wpipe->pipe_state;
		}
	}

	--wpipe->pipe_busy;
	if (wpipe->pipe_busy == 0) {
		wpipe->pipe_state &= ~PIPE_RESTART;
		cv_broadcast(&wpipe->pipe_draincv);
	}
	if (bp->cnt > 0) {
		cv_broadcast(&wpipe->pipe_rcv);
	}

	/*
	 * Don't return EPIPE if I/O was successful
	 */
	if (error == EPIPE && bp->cnt == 0 && uio->uio_resid == 0)
		error = 0;

	if (error == 0)
		getnanotime(&wpipe->pipe_mtime);

	/*
	 * We have something to offer, wake up select/poll.
	 */
	if (bp->cnt)
		pipeselwakeup(wpipe, wpipe, POLL_IN);

	/*
	 * Arrange for next read(2) to do a signal.
	 */
	wpipe->pipe_state |= PIPE_SIGNALR;

	pipeunlock(wpipe);
	mutex_exit(lock);
	return (error);
}

/*
 * We implement a very minimal set of ioctls for compatibility with sockets.
 */
int
pipe_ioctl(file_t *fp, u_long cmd, void *data)
{
	struct pipe *pipe = fp->f_pipe;
	kmutex_t *lock = pipe->pipe_lock;

	switch (cmd) {

	case FIONBIO:
		/* Non-blocking mode is tracked in f_flag; nothing to do. */
		return (0);

	case FIOASYNC:
		mutex_enter(lock);
		if (*(int *)data) {
			pipe->pipe_state |= PIPE_ASYNC;
		} else {
			pipe->pipe_state &= ~PIPE_ASYNC;
		}
		mutex_exit(lock);
		return (0);

	case FIONREAD:
		mutex_enter(lock);
		*(int *)data = pipe->pipe_buffer.cnt;
		mutex_exit(lock);
		return (0);

	case FIONWRITE:
		/* Look at other side */
		mutex_enter(lock);
		pipe = pipe->pipe_peer;
		if (pipe == NULL)
			*(int *)data = 0;
		else
			*(int *)data = pipe->pipe_buffer.cnt;
		mutex_exit(lock);
		return (0);

	case FIONSPACE:
		/* Look at other side */
		mutex_enter(lock);
		pipe = pipe->pipe_peer;
		if (pipe == NULL)
			*(int *)data = 0;
		else
			*(int *)data = pipe->pipe_buffer.size -
			    pipe->pipe_buffer.cnt;
		mutex_exit(lock);
		return (0);

	case TIOCSPGRP:
	case FIOSETOWN:
		return fsetown(&pipe->pipe_pgid, cmd, data);

	case TIOCGPGRP:
	case FIOGETOWN:
		return fgetown(pipe->pipe_pgid, cmd, data);

	}
	return (EPASSTHROUGH);
}

/*
 * fo_poll: report readiness for the calling side and its peer.
 */
int
pipe_poll(file_t *fp, int events)
{
	struct pipe *rpipe = fp->f_pipe;
	struct pipe *wpipe;
	int eof = 0;
	int revents = 0;

	mutex_enter(rpipe->pipe_lock);
	wpipe = rpipe->pipe_peer;

	if (events & (POLLIN | POLLRDNORM))
		if ((rpipe->pipe_buffer.cnt > 0) ||
		    (rpipe->pipe_state & PIPE_EOF))
			revents |= events & (POLLIN | POLLRDNORM);

	eof |= (rpipe->pipe_state & PIPE_EOF);

	if (wpipe == NULL)
		revents |= events & (POLLOUT | POLLWRNORM);
	else {
		if
 (events & (POLLOUT | POLLWRNORM))
			if ((wpipe->pipe_state & PIPE_EOF) || (
			     (wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt) >= PIPE_BUF))
				revents |= events & (POLLOUT | POLLWRNORM);

		eof |= (wpipe->pipe_state & PIPE_EOF);
	}

	if (wpipe == NULL || eof)
		revents |= POLLHUP;

	/*
	 * Nothing ready yet: record for later notification.  Note that
	 * wpipe == NULL implies POLLHUP above, so revents != 0 here and
	 * the wpipe selrecord() below is never reached with a NULL peer.
	 */
	if (revents == 0) {
		if (events & (POLLIN | POLLRDNORM))
			selrecord(curlwp, &rpipe->pipe_sel);

		if (events & (POLLOUT | POLLWRNORM))
			selrecord(curlwp, &wpipe->pipe_sel);
	}
	mutex_exit(rpipe->pipe_lock);

	return (revents);
}

/*
 * fo_stat: synthesize a FIFO-like stat for the pipe.
 */
static int
pipe_stat(file_t *fp, struct stat *ub)
{
	struct pipe *pipe = fp->f_pipe;

	mutex_enter(pipe->pipe_lock);
	memset(ub, 0, sizeof(*ub));
	ub->st_mode = S_IFIFO | S_IRUSR | S_IWUSR;
	ub->st_blksize = pipe->pipe_buffer.size;
	if (ub->st_blksize == 0 && pipe->pipe_peer)
		ub->st_blksize = pipe->pipe_peer->pipe_buffer.size;
	ub->st_size = pipe->pipe_buffer.cnt;
	ub->st_blocks = (ub->st_size) ? 1 : 0;
	ub->st_atimespec = pipe->pipe_atime;
	ub->st_mtimespec = pipe->pipe_mtime;
	ub->st_ctimespec = ub->st_birthtimespec = pipe->pipe_btime;
	ub->st_uid = kauth_cred_geteuid(fp->f_cred);
	ub->st_gid = kauth_cred_getegid(fp->f_cred);

	/*
	 * Left as 0: st_dev, st_ino, st_nlink, st_rdev, st_flags, st_gen.
	 * XXX (st_dev, st_ino) should be unique.
	 */
	mutex_exit(pipe->pipe_lock);
	return 0;
}

/*
 * fo_close: detach the pipe from the file and shut it down.
 */
static int
pipe_close(file_t *fp)
{
	struct pipe *pipe = fp->f_pipe;

	fp->f_pipe = NULL;
	pipeclose(pipe);
	return (0);
}

static void
pipe_restart(file_t *fp)
{
	struct pipe *pipe = fp->f_pipe;

	/*
	 * Unblock blocked reads/writes in order to allow close() to complete.
	 * System calls return ERESTART so that the fd is revalidated.
	 * (Partial writes return the transfer length.)
	 */
	mutex_enter(pipe->pipe_lock);
	pipe->pipe_state |= PIPE_RESTART;
	/* Wakeup both cvs, maybe we only need one, but maybe there are some
	 * other paths where wakeup is needed, and it saves deciding which! */
	cv_broadcast(&pipe->pipe_rcv);
	cv_broadcast(&pipe->pipe_wcv);
	mutex_exit(pipe->pipe_lock);
}

/*
 * fo_fpathconf: only _PC_PIPE_BUF is meaningful for pipes.
 */
static int
pipe_fpathconf(struct file *fp, int name, register_t *retval)
{

	switch (name) {
	case _PC_PIPE_BUF:
		*retval = PIPE_BUF;
		return 0;
	default:
		return EINVAL;
	}
}

/*
 * fo_posix_fadvise: advice is meaningless on a non-seekable pipe.
 */
static int
pipe_posix_fadvise(struct file *fp, off_t offset, off_t len, int advice)
{

	return ESPIPE;
}

/*
 * Release the pipe's buffer KVA, unless it is the preallocated
 * reader-side chunk, which stays with the pipe until pipe_dtor().
 */
static void
pipe_free_kmem(struct pipe *pipe)
{

	if (pipe->pipe_buffer.buffer != NULL) {
		if (pipe->pipe_buffer.size > PIPE_SIZE) {
			atomic_dec_uint(&nbigpipe);
		}
		if (pipe->pipe_buffer.buffer != (void *)pipe->pipe_kmem) {
			uvm_km_free(kernel_map,
			    (vaddr_t)pipe->pipe_buffer.buffer,
			    pipe->pipe_buffer.size, UVM_KMF_PAGEABLE);
			atomic_add_int(&amountpipekva,
			    -pipe->pipe_buffer.size);
		}
		pipe->pipe_buffer.buffer = NULL;
	}
}

/*
 * Shutdown the pipe.
 */
static void
pipeclose(struct pipe *pipe)
{
	kmutex_t *lock;
	struct pipe *ppipe;

	if (pipe == NULL)
		return;

	KASSERT(cv_is_valid(&pipe->pipe_rcv));
	KASSERT(cv_is_valid(&pipe->pipe_wcv));
	KASSERT(cv_is_valid(&pipe->pipe_draincv));
	KASSERT(cv_is_valid(&pipe->pipe_lkcv));

	lock = pipe->pipe_lock;
	if (lock == NULL)
		/* Must have failed during create */
		goto free_resources;

	mutex_enter(lock);
	pipeselwakeup(pipe, pipe, POLL_HUP);

	/*
	 * If the other side is blocked, wake it up saying that
	 * we want to close it down.
	 */
	pipe->pipe_state |= PIPE_EOF;
	/* Drain in-flight readers/writers before tearing anything down. */
	if (pipe->pipe_busy) {
		while (pipe->pipe_busy) {
			cv_broadcast(&pipe->pipe_wcv);
			cv_wait_sig(&pipe->pipe_draincv, lock);
		}
	}

	/*
	 * Disconnect from peer.
	 */
	if ((ppipe = pipe->pipe_peer) != NULL) {
		pipeselwakeup(ppipe, ppipe, POLL_HUP);
		ppipe->pipe_state |= PIPE_EOF;
		cv_broadcast(&ppipe->pipe_rcv);
		ppipe->pipe_peer = NULL;
	}

	/*
	 * Any knote objects still left in the list are
	 * the one attached by peer.  Since no one will
	 * traverse this list, we just clear it.
	 *
	 * XXX Exposes select/kqueue internals.
	 */
	SLIST_INIT(&pipe->pipe_sel.sel_klist);

	KASSERT((pipe->pipe_state & PIPE_LOCKFL) == 0);
	mutex_exit(lock);
	mutex_obj_free(lock);

	/*
	 * Free resources.
	 */
    free_resources:
	pipe->pipe_pgid = 0;
	pipe->pipe_state = PIPE_SIGNALR;
	pipe->pipe_peer = NULL;
	pipe->pipe_lock = NULL;
	pipe_free_kmem(pipe);
	/* pipe_kmem != 0 identifies a reader-side (piperd) pipe. */
	if (pipe->pipe_kmem != 0) {
		pool_cache_put(pipe_rd_cache, pipe);
	} else {
		pool_cache_put(pipe_wr_cache, pipe);
	}
}

/*
 * Knote detach: remove the note from whichever side's selinfo it was
 * attached to (the peer for EVFILT_WRITE).
 */
static void
filt_pipedetach(struct knote *kn)
{
	struct pipe *pipe;
	kmutex_t *lock;

	pipe = ((file_t *)kn->kn_obj)->f_pipe;
	lock = pipe->pipe_lock;

	mutex_enter(lock);

	switch(kn->kn_filter) {
	case EVFILT_WRITE:
		/* Need the peer structure, not our own. */
		pipe = pipe->pipe_peer;

		/* If reader end already closed, just return. */
		if (pipe == NULL) {
			mutex_exit(lock);
			return;
		}

		break;
	default:
		/* Nothing to do.
		 */
		break;
	}

	KASSERT(kn->kn_hook == pipe);
	selremove_knote(&pipe->pipe_sel, kn);
	mutex_exit(lock);
}

/*
 * EVFILT_READ event: ready when data is buffered or either end saw EOF.
 * NOTE_SUBMIT means the caller (pipeselwakeup) already holds the lock.
 */
static int
filt_piperead(struct knote *kn, long hint)
{
	struct pipe *rpipe = ((file_t *)kn->kn_obj)->f_pipe;
	struct pipe *wpipe;
	int rv;

	if ((hint & NOTE_SUBMIT) == 0) {
		mutex_enter(rpipe->pipe_lock);
	}
	wpipe = rpipe->pipe_peer;
	kn->kn_data = rpipe->pipe_buffer.cnt;

	if ((rpipe->pipe_state & PIPE_EOF) ||
	    (wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
		knote_set_eof(kn, 0);
		rv = 1;
	} else {
		rv = kn->kn_data > 0;
	}

	if ((hint & NOTE_SUBMIT) == 0) {
		mutex_exit(rpipe->pipe_lock);
	}
	return rv;
}

/*
 * EVFILT_WRITE event: ready when at least PIPE_BUF bytes of space are
 * free in the peer's buffer, or immediately (with EOF) if the peer is gone.
 */
static int
filt_pipewrite(struct knote *kn, long hint)
{
	struct pipe *rpipe = ((file_t *)kn->kn_obj)->f_pipe;
	struct pipe *wpipe;
	int rv;

	if ((hint & NOTE_SUBMIT) == 0) {
		mutex_enter(rpipe->pipe_lock);
	}
	wpipe = rpipe->pipe_peer;

	if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
		kn->kn_data = 0;
		knote_set_eof(kn, 0);
		rv = 1;
	} else {
		kn->kn_data = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;
		rv = kn->kn_data >= PIPE_BUF;
	}

	if ((hint & NOTE_SUBMIT) == 0) {
		mutex_exit(rpipe->pipe_lock);
	}
	return rv;
}

static const struct filterops pipe_rfiltops = {
	.f_flags = FILTEROP_ISFD | FILTEROP_MPSAFE,
	.f_attach = NULL,
	.f_detach = filt_pipedetach,
	.f_event = filt_piperead,
};

static const struct filterops pipe_wfiltops = {
	.f_flags = FILTEROP_ISFD | FILTEROP_MPSAFE,
	.f_attach = NULL,
	.f_detach = filt_pipedetach,
	.f_event = filt_pipewrite,
};

/*
 * fo_kqfilter: attach a read or write knote to the appropriate side.
 */
static int
pipe_kqfilter(file_t *fp, struct knote *kn)
{
	struct pipe *pipe;
	kmutex_t *lock;

	pipe = ((file_t *)kn->kn_obj)->f_pipe;
	lock =
	    pipe->pipe_lock;

	mutex_enter(lock);

	switch (kn->kn_filter) {
	case EVFILT_READ:
		kn->kn_fop = &pipe_rfiltops;
		break;
	case EVFILT_WRITE:
		kn->kn_fop = &pipe_wfiltops;
		/* Write events are tracked on the reader (peer) side. */
		pipe = pipe->pipe_peer;
		if (pipe == NULL) {
			/* Other end of pipe has been closed. */
			mutex_exit(lock);
			return (EBADF);
		}
		break;
	default:
		mutex_exit(lock);
		return (EINVAL);
	}

	kn->kn_hook = pipe;
	selrecord_knote(&pipe->pipe_sel, kn);
	mutex_exit(lock);

	return (0);
}

/*
 * Handle pipe sysctls.
 */
SYSCTL_SETUP(sysctl_kern_pipe_setup, "sysctl kern.pipe subtree setup")
{

	sysctl_createv(clog, 0, NULL, NULL,
	    CTLFLAG_PERMANENT,
	    CTLTYPE_NODE, "pipe",
	    SYSCTL_DESCR("Pipe settings"),
	    NULL, 0, NULL, 0,
	    CTL_KERN, KERN_PIPE, CTL_EOL);

	sysctl_createv(clog, 0, NULL, NULL,
	    CTLFLAG_PERMANENT|CTLFLAG_READWRITE,
	    CTLTYPE_INT, "maxbigpipes",
	    SYSCTL_DESCR("Maximum number of \"big\" pipes"),
	    NULL, 0, &maxbigpipes, 0,
	    CTL_KERN, KERN_PIPE, KERN_PIPE_MAXBIGPIPES, CTL_EOL);
	sysctl_createv(clog, 0, NULL, NULL,
	    CTLFLAG_PERMANENT,
	    CTLTYPE_INT, "nbigpipes",
	    SYSCTL_DESCR("Number of \"big\" pipes"),
	    NULL, 0, &nbigpipe, 0,
	    CTL_KERN, KERN_PIPE, KERN_PIPE_NBIGPIPES, CTL_EOL);
	sysctl_createv(clog, 0, NULL, NULL,
	    CTLFLAG_PERMANENT,
	    CTLTYPE_INT, "kvasize",
	    SYSCTL_DESCR("Amount of kernel memory consumed by pipe "
		"buffers"),
	    NULL, 0, &amountpipekva, 0,
	    CTL_KERN, KERN_PIPE, KERN_PIPE_KVASIZE, CTL_EOL);
}