sys_pipe.c revision 14037
/*
 * Copyright (c) 1996 John S. Dyson
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice immediately at the beginning of the file, without modification,
 *    this list of conditions, and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Absolutely no warranty of function or purpose is made by the author
 *    John S. Dyson.
 * 4. Modifications may be freely made to this file if the above conditions
 *    are met.
 *
 * $Id: sys_pipe.c,v 1.10 1996/02/09 04:36:36 dyson Exp $
 */

#ifndef OLD_PIPE

/*
 * This file contains a high-performance replacement for the socket-based
 * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
 * all features of sockets, but does do everything that pipes normally do.
 */

/*
 * This code has two modes of operation, a small write mode and a large
 * write mode.  The small write mode acts like conventional pipes with
 * a kernel buffer.  If the buffer is less than PIPE_MINDIRECT, then the
 * "normal" pipe buffering is done.  If the buffer is between PIPE_MINDIRECT
 * and PIPE_SIZE in size, it is fully mapped and wired into the kernel, and
 * the receiving process can copy it directly from the pages in the sending
 * process.
 *
 * If the sending process receives a signal, it is possible that it will
 * go away, and certainly its address space can change, because control
 * is returned back to the user-mode side.  In that case, the pipe code
 * arranges to copy the buffer supplied by the user process to a pageable
 * kernel buffer, and the receiving process will grab the data from the
 * pageable kernel buffer.  Since signals don't happen all that often,
 * the copy operation is normally eliminated.
 *
 * The constant PIPE_MINDIRECT is chosen to make sure that buffering will
 * happen for small transfers so that the system will not spend all of
 * its time context switching.  PIPE_SIZE is constrained by the
 * amount of kernel virtual memory.
 */

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/proc.h>
#include <sys/file.h>
#include <sys/protosw.h>
#include <sys/stat.h>
#include <sys/filedesc.h>
#include <sys/malloc.h>
#include <sys/ioctl.h>
#include <sys/select.h>
#include <sys/signalvar.h>
#include <sys/errno.h>
#include <sys/queue.h>
#include <sys/vmmeter.h>
#include <sys/kernel.h>
#include <sys/sysproto.h>
#include <sys/pipe.h>

#include <vm/vm.h>
#include <vm/vm_prot.h>
#include <vm/vm_param.h>
#include <vm/lock.h>
#include <vm/vm_object.h>
#include <vm/vm_kern.h>
#include <vm/vm_extern.h>
#include <vm/pmap.h>
#include <vm/vm_map.h>
#include <vm/vm_page.h>
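
/*
 * Illustrative sketch (assuming the usual <sys/pipe.h> values of
 * PIPE_MINDIRECT = 8K and PIPE_SIZE = 16K): how the two write modes
 * described above are selected, from the writer's point of view.
 *
 *	char small[512], big[64 * 1024];
 *	int fds[2];
 *
 *	pipe(fds);
 *	write(fds[1], small, sizeof small);	// below PIPE_MINDIRECT:
 *						// copied into the kernel
 *						// ring buffer
 *	write(fds[1], big, sizeof big);		// at least PIPE_MINDIRECT:
 *						// the user pages are wired
 *						// and mapped, and the reader
 *						// copies from them directly
 */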

/*
 * Use this define if you want to disable *fancy* VM things.  Expect an
 * approx 30% decrease in transfer rate.  This could be useful for
 * NetBSD or OpenBSD.
 */
/* #define PIPE_NODIRECT */

/*
 * interfaces to the outside world
 */
static int pipe_read __P((struct file *fp, struct uio *uio,
		struct ucred *cred));
static int pipe_write __P((struct file *fp, struct uio *uio,
		struct ucred *cred));
static int pipe_close __P((struct file *fp, struct proc *p));
static int pipe_select __P((struct file *fp, int which, struct proc *p));
static int pipe_ioctl __P((struct file *fp, int cmd, caddr_t data, struct proc *p));

static struct fileops pipeops =
    { pipe_read, pipe_write, pipe_ioctl, pipe_select, pipe_close };

/*
 * Default pipe buffer size(s); these can be kind-of large now because pipe
 * space is pageable.  The pipe code will try to maintain locality of
 * reference for performance reasons, so small amounts of outstanding I/O
 * will not wipe the cache.
 */
#define MINPIPESIZE (PIPE_SIZE/3)
#define MAXPIPESIZE (2*PIPE_SIZE/3)

/*
 * Maximum amount of kva for pipes -- this is kind-of a soft limit, but
 * is there so that on large systems, we don't exhaust it.
 */
#define MAXPIPEKVA (8*1024*1024)

/*
 * Limit for direct transfers; we cannot, of course, limit
 * the amount of kva for pipes in general.
 */
#define LIMITPIPEKVA (16*1024*1024)
int amountpipekva;

static void pipeclose __P((struct pipe *cpipe));
static void pipebufferinit __P((struct pipe *cpipe));
static void pipeinit __P((struct pipe *cpipe));
static __inline int pipelock __P((struct pipe *cpipe, int catch));
static __inline void pipeunlock __P((struct pipe *cpipe));
#ifndef PIPE_NODIRECT
static int pipe_build_write_buffer __P((struct pipe *wpipe, struct uio *uio));
static void pipe_destroy_write_buffer __P((struct pipe *wpipe));
static int pipe_direct_write __P((struct pipe *wpipe, struct uio *uio));
static void pipe_clone_write_buffer __P((struct pipe *wpipe));
static void pipe_mark_pages_clean __P((struct pipe *cpipe));
#endif
static int pipewrite __P((struct pipe *wpipe, struct uio *uio, int nbio));
static void pipespace __P((struct pipe *cpipe));

/*
 * The pipe system call for the DTYPE_PIPE type of pipes
 */

/* ARGSUSED */
int
pipe(p, uap, retval)
	struct proc *p;
	struct pipe_args /* {
		int	dummy;
	} */ *uap;
	int retval[];
{
	register struct filedesc *fdp = p->p_fd;
	struct file *rf, *wf;
	struct pipe *rpipe, *wpipe;
	int fd, error;

	rpipe = malloc(sizeof (*rpipe), M_TEMP, M_WAITOK);
	pipeinit(rpipe);
	rpipe->pipe_state |= PIPE_DIRECTOK;
	wpipe = malloc(sizeof (*wpipe), M_TEMP, M_WAITOK);
	pipeinit(wpipe);
	wpipe->pipe_state |= PIPE_DIRECTOK;

	error = falloc(p, &rf, &fd);
	if (error)
		goto free2;
	retval[0] = fd;
	rf->f_flag = FREAD | FWRITE;
	rf->f_type = DTYPE_PIPE;
	rf->f_ops = &pipeops;
	rf->f_data = (caddr_t)rpipe;
	error = falloc(p, &wf, &fd);
	if (error)
		goto free3;
	wf->f_flag = FREAD | FWRITE;
	wf->f_type = DTYPE_PIPE;
	wf->f_ops = &pipeops;
	wf->f_data = (caddr_t)wpipe;
	retval[1] = fd;

	rpipe->pipe_peer = wpipe;
	wpipe->pipe_peer = rpipe;

	return (0);
free3:
	ffree(rf);
	fdp->fd_ofiles[retval[0]] = 0;
free2:
	pipeclose(wpipe);
free1:
	pipeclose(rpipe);
	return (error);
}
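
/*
 * Usage sketch (illustrative): retval[0] receives the read descriptor
 * and retval[1] the write descriptor, so from userland:
 *
 *	int fds[2];
 *	char buf[2];
 *
 *	if (pipe(fds) == -1)
 *		err(1, "pipe");
 *	write(fds[1], "hi", 2);
 *	read(fds[0], buf, 2);
 *
 * Note that this implementation sets FREAD | FWRITE on both struct
 * file entries, so either descriptor will in fact accept both reads
 * and writes.
 */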

/*
 * Allocate kva for the pipe circular buffer; the space is pageable.
 */
static void
pipespace(cpipe)
	struct pipe *cpipe;
{
	int npages, error;

	npages = round_page(cpipe->pipe_buffer.size)/PAGE_SIZE;
	/*
	 * Create an object; I don't like the idea of paging to/from
	 * kernel_object.
	 * XXX -- minor change needed here for NetBSD/OpenBSD VM systems.
	 */
	cpipe->pipe_buffer.object = vm_object_allocate(OBJT_DEFAULT, npages);
	cpipe->pipe_buffer.buffer = (caddr_t) vm_map_min(kernel_map);

	/*
	 * Insert the object into the kernel map, and allocate kva for it.
	 * The map entry is, by default, pageable.
	 * XXX -- minor change needed here for NetBSD/OpenBSD VM systems.
	 */
	error = vm_map_find(kernel_map, cpipe->pipe_buffer.object, 0,
		(vm_offset_t *) &cpipe->pipe_buffer.buffer,
		cpipe->pipe_buffer.size, 1,
		VM_PROT_ALL, VM_PROT_ALL, 0);

	if (error != KERN_SUCCESS)
		panic("pipeinit: cannot allocate pipe -- out of kvm -- code = %d", error);
	amountpipekva += cpipe->pipe_buffer.size;
}

/*
 * initialize and allocate VM and memory for pipe
 */
static void
pipeinit(cpipe)
	struct pipe *cpipe;
{
	int s;

	cpipe->pipe_buffer.in = 0;
	cpipe->pipe_buffer.out = 0;
	cpipe->pipe_buffer.cnt = 0;
	cpipe->pipe_buffer.size = PIPE_SIZE;
	/* Buffer kva gets dynamically allocated */
	cpipe->pipe_buffer.buffer = NULL;

	cpipe->pipe_state = 0;
	cpipe->pipe_peer = NULL;
	cpipe->pipe_busy = 0;
	s = splhigh();
	cpipe->pipe_ctime = time;
	cpipe->pipe_atime = time;
	cpipe->pipe_mtime = time;
	splx(s);
	bzero(&cpipe->pipe_sel, sizeof cpipe->pipe_sel);

#ifndef PIPE_NODIRECT
	/*
	 * pipe data structure initializations to support direct pipe I/O
	 */
	cpipe->pipe_map.cnt = 0;
	cpipe->pipe_map.kva = 0;
	cpipe->pipe_map.pos = 0;
	cpipe->pipe_map.npages = 0;
#endif
}


/*
 * lock a pipe for I/O, blocking other access
 */
static __inline int
pipelock(cpipe, catch)
	struct pipe *cpipe;
	int catch;
{
	int error;

	while (cpipe->pipe_state & PIPE_LOCK) {
		cpipe->pipe_state |= PIPE_LWANT;
		if ((error = tsleep(&cpipe->pipe_state,
		    catch ? (PRIBIO | PCATCH) : PRIBIO, "pipelk", 0)) != 0) {
			return error;
		}
	}
	cpipe->pipe_state |= PIPE_LOCK;
	return 0;
}

/*
 * unlock a pipe I/O lock
 */
static __inline void
pipeunlock(cpipe)
	struct pipe *cpipe;
{
	cpipe->pipe_state &= ~PIPE_LOCK;
	if (cpipe->pipe_state & PIPE_LWANT) {
		cpipe->pipe_state &= ~PIPE_LWANT;
		wakeup(&cpipe->pipe_state);
	}
}

static __inline void
pipeselwakeup(cpipe)
	struct pipe *cpipe;
{
	if (cpipe->pipe_state & PIPE_SEL) {
		cpipe->pipe_state &= ~PIPE_SEL;
		selwakeup(&cpipe->pipe_sel);
	}
}

#ifndef PIPE_NODIRECT
#if 0
static void
pipe_mark_pages_clean(cpipe)
	struct pipe *cpipe;
{
	vm_size_t off;
	vm_page_t m;

	for (off = 0; off < cpipe->pipe_buffer.object->size; off += 1) {
		m = vm_page_lookup(cpipe->pipe_buffer.object, off);
		if ((m != NULL) && (m->busy == 0) && (m->flags & PG_BUSY) == 0) {
			m->dirty = 0;
			pmap_clear_modify(VM_PAGE_TO_PHYS(m));
		}
	}
}
#endif
#endif
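
/*
 * Sketch (illustrative) of the locking idiom used throughout this
 * file: pipelock() sleeps until PIPE_LOCK clears, so callers bracket
 * any uiomove() into or out of the pipe with it, passing catch != 0
 * when the sleep should be interruptible by signals:
 *
 *	if ((error = pipelock(rpipe, 1)) == 0) {
 *		// the buffer cannot be remapped or freed under us here
 *		error = uiomove(..., size, uio);
 *		pipeunlock(rpipe);
 *	}
 */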

/* ARGSUSED */
static int
pipe_read(fp, uio, cred)
	struct file *fp;
	struct uio *uio;
	struct ucred *cred;
{
	struct pipe *rpipe = (struct pipe *) fp->f_data;
	int error = 0;
	int nread = 0;
	int size;

	++rpipe->pipe_busy;
	while (uio->uio_resid) {
		/*
		 * normal pipe buffer receive
		 */
		if (rpipe->pipe_buffer.cnt > 0) {
			size = rpipe->pipe_buffer.size - rpipe->pipe_buffer.out;
			if (size > rpipe->pipe_buffer.cnt)
				size = rpipe->pipe_buffer.cnt;
			if (size > uio->uio_resid)
				size = uio->uio_resid;
			if ((error = pipelock(rpipe, 1)) == 0) {
				error = uiomove(&rpipe->pipe_buffer.buffer[rpipe->pipe_buffer.out],
					size, uio);
				pipeunlock(rpipe);
			}
			if (error) {
				break;
			}
			rpipe->pipe_buffer.out += size;
			if (rpipe->pipe_buffer.out >= rpipe->pipe_buffer.size)
				rpipe->pipe_buffer.out = 0;

			rpipe->pipe_buffer.cnt -= size;
			nread += size;
#ifndef PIPE_NODIRECT
		/*
		 * Direct copy, bypassing a kernel buffer.
		 */
		} else if ((size = rpipe->pipe_map.cnt) &&
			(rpipe->pipe_state & PIPE_DIRECTW)) {
			caddr_t va;

			if (size > uio->uio_resid)
				size = uio->uio_resid;
			if ((error = pipelock(rpipe, 1)) == 0) {
				va = (caddr_t) rpipe->pipe_map.kva + rpipe->pipe_map.pos;
				error = uiomove(va, size, uio);
				pipeunlock(rpipe);
			}
			if (error)
				break;
			nread += size;
			rpipe->pipe_map.pos += size;
			rpipe->pipe_map.cnt -= size;
			if (rpipe->pipe_map.cnt == 0) {
				rpipe->pipe_state &= ~PIPE_DIRECTW;
				wakeup(rpipe);
			}
#endif
		} else {
			/*
			 * detect EOF condition
			 */
			if (rpipe->pipe_state & PIPE_EOF) {
				break;
			}
			/*
			 * If the "write-side" has been blocked, wake it up now.
			 */
			if (rpipe->pipe_state & PIPE_WANTW) {
				rpipe->pipe_state &= ~PIPE_WANTW;
				wakeup(rpipe);
			}
			if (nread > 0)
				break;
			if (rpipe->pipe_state & PIPE_NBIO) {
				error = EAGAIN;
				break;
			}

			/*
			 * If there is no more to read in the pipe, reset
			 * its pointers to the beginning.  This improves
			 * cache hit stats.
			 */
			if ((error = pipelock(rpipe, 1)) == 0) {
				if (rpipe->pipe_buffer.cnt == 0) {
					rpipe->pipe_buffer.in = 0;
					rpipe->pipe_buffer.out = 0;
				}
				pipeunlock(rpipe);
			} else {
				break;
			}
			rpipe->pipe_state |= PIPE_WANTR;
			if ((error = tsleep(rpipe, PRIBIO | PCATCH, "piperd", 0)) != 0) {
				break;
			}
		}
	}

	if (error == 0) {
		int s = splhigh();
		rpipe->pipe_atime = time;
		splx(s);
	}

	--rpipe->pipe_busy;
	if ((rpipe->pipe_busy == 0) && (rpipe->pipe_state & PIPE_WANT)) {
		rpipe->pipe_state &= ~(PIPE_WANT | PIPE_WANTW);
		wakeup(rpipe);
	} else if (rpipe->pipe_buffer.cnt < MINPIPESIZE) {
		/*
		 * If there is no more to read in the pipe, reset
		 * its pointers to the beginning.  This improves
		 * cache hit stats.
		 */
		if ((error == 0) && (error = pipelock(rpipe, 1)) == 0) {
			if (rpipe->pipe_buffer.cnt == 0) {
#if 0
				pipe_mark_pages_clean(rpipe);
#endif
				rpipe->pipe_buffer.in = 0;
				rpipe->pipe_buffer.out = 0;
			}
			pipeunlock(rpipe);
		}

		/*
		 * If the "write-side" has been blocked, wake it up now.
		 */
		if (rpipe->pipe_state & PIPE_WANTW) {
			rpipe->pipe_state &= ~PIPE_WANTW;
			wakeup(rpipe);
		}
	}

	if ((rpipe->pipe_buffer.size - rpipe->pipe_buffer.cnt) > 0)
		pipeselwakeup(rpipe);

	return error;
}
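
/*
 * Worked example (illustrative) of the ring-buffer arithmetic used by
 * pipe_read() above and pipewrite() below.  With size = 16384,
 * in = 100, out = 16380 and cnt = 104, a reader first sees
 * size - out = 4 contiguous bytes; it copies those, wraps "out" to 0,
 * and the next pass copies the remaining 100 bytes from offset 0.
 * "in" and "out" therefore always stay within [0, size), and "cnt"
 * alone distinguishes a full buffer from an empty one.
 */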

#ifndef PIPE_NODIRECT
/*
 * Map the sending process's buffer into kernel space and wire it.
 * This is similar to a physical write operation.
 */
static int
pipe_build_write_buffer(wpipe, uio)
	struct pipe *wpipe;
	struct uio *uio;
{
	int size;
	int i;
	vm_offset_t addr, endaddr, paddr;

	size = uio->uio_iov->iov_len;
	if (size > wpipe->pipe_buffer.size)
		size = wpipe->pipe_buffer.size;

	endaddr = round_page(uio->uio_iov->iov_base + size);
	for (i = 0, addr = trunc_page(uio->uio_iov->iov_base);
		addr < endaddr;
		addr += PAGE_SIZE, i += 1) {

		vm_page_t m;

		vm_fault_quick((caddr_t) addr, VM_PROT_READ);
		paddr = pmap_kextract(addr);
		if (!paddr) {
			int j;

			for (j = 0; j < i; j++)
				vm_page_unwire(wpipe->pipe_map.ms[j]);
			return EFAULT;
		}

		m = PHYS_TO_VM_PAGE(paddr);
		vm_page_wire(m);
		wpipe->pipe_map.ms[i] = m;
	}

/*
 * set up the control block
 */
	wpipe->pipe_map.npages = i;
	wpipe->pipe_map.pos = ((vm_offset_t) uio->uio_iov->iov_base) & PAGE_MASK;
	wpipe->pipe_map.cnt = size;

/*
 * and map the buffer
 */
	if (wpipe->pipe_map.kva == 0) {
		/*
		 * We need to allocate space for an extra page because the
		 * address range might (will) span pages at times.
		 */
		wpipe->pipe_map.kva = kmem_alloc_pageable(kernel_map,
			wpipe->pipe_buffer.size + PAGE_SIZE);
		amountpipekva += wpipe->pipe_buffer.size + PAGE_SIZE;
	}
	pmap_qenter(wpipe->pipe_map.kva, wpipe->pipe_map.ms,
		wpipe->pipe_map.npages);

/*
 * and update the uio data
 */
	uio->uio_iov->iov_len -= size;
	uio->uio_iov->iov_base += size;
	if (uio->uio_iov->iov_len == 0)
		uio->uio_iov++;
	uio->uio_resid -= size;
	uio->uio_offset += size;
	return 0;
}

/*
 * unmap and unwire the process buffer
 */
static void
pipe_destroy_write_buffer(wpipe)
	struct pipe *wpipe;
{
	int i;

	pmap_qremove(wpipe->pipe_map.kva, wpipe->pipe_map.npages);

	if (wpipe->pipe_map.kva) {
		if (amountpipekva > MAXPIPEKVA) {
			vm_offset_t kva = wpipe->pipe_map.kva;

			wpipe->pipe_map.kva = 0;
			kmem_free(kernel_map, kva,
				wpipe->pipe_buffer.size + PAGE_SIZE);
			amountpipekva -= wpipe->pipe_buffer.size + PAGE_SIZE;
		}
	}
	for (i = 0; i < wpipe->pipe_map.npages; i++)
		vm_page_unwire(wpipe->pipe_map.ms[i]);
}

/*
 * In the case of a signal, the writing process might go away.  This
 * code copies the data into the circular buffer so that the source
 * pages can be freed without loss of data.
 */
static void
pipe_clone_write_buffer(wpipe)
	struct pipe *wpipe;
{
	int size;
	int pos;

	size = wpipe->pipe_map.cnt;
	pos = wpipe->pipe_map.pos;
	bcopy((caddr_t) wpipe->pipe_map.kva + pos,
	    (caddr_t) wpipe->pipe_buffer.buffer,
	    size);

	wpipe->pipe_buffer.in = size;
	wpipe->pipe_buffer.out = 0;
	wpipe->pipe_buffer.cnt = size;
	wpipe->pipe_state &= ~PIPE_DIRECTW;

	pipe_destroy_write_buffer(wpipe);
}
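
/*
 * Worked example (illustrative, assuming 4K pages) of the page math in
 * pipe_build_write_buffer() above: a 9000-byte write starting at user
 * address 0x1ffc spans trunc_page(0x1ffc) = 0x1000 through
 * round_page(0x1ffc + 9000) = 0x5000, i.e. four pages, even though
 * 9000/4096 rounds up to only three.  This is why pipe_map.kva is
 * sized at pipe_buffer.size + PAGE_SIZE: an unaligned transfer of up
 * to PIPE_SIZE bytes can touch one page more than PIPE_SIZE/PAGE_SIZE.
 */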

/*
 * This implements the pipe buffer write mechanism.  Note that only
 * a direct write OR a normal pipe write can be pending at any given time.
 * If there are any characters in the pipe buffer, the direct write will
 * be deferred until the receiving process grabs all of the bytes from
 * the pipe buffer.  Then the direct mapping write is set up.
 */
static int
pipe_direct_write(wpipe, uio)
	struct pipe *wpipe;
	struct uio *uio;
{
	int error;

retry:
	while (wpipe->pipe_state & PIPE_DIRECTW) {
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
		wpipe->pipe_state |= PIPE_WANTW;
		error = tsleep(wpipe,
		    PRIBIO | PCATCH, "pipdww", 0);
		if (error || (wpipe->pipe_state & PIPE_EOF))
			goto error1;
	}
	wpipe->pipe_map.cnt = 0;	/* transfer not ready yet */
	if (wpipe->pipe_buffer.cnt > 0) {
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}

		wpipe->pipe_state |= PIPE_WANTW;
		error = tsleep(wpipe,
		    PRIBIO | PCATCH, "pipdwc", 0);
		if (error || (wpipe->pipe_state & PIPE_EOF)) {
			wpipe->pipe_state &= ~PIPE_DIRECTW;
			if (error == 0)
				error = EPIPE;
			goto error1;
		}
		goto retry;
	}

	wpipe->pipe_state |= PIPE_DIRECTW;

	error = pipe_build_write_buffer(wpipe, uio);
	if (error) {
		wpipe->pipe_state &= ~PIPE_DIRECTW;
		goto error1;
	}

	error = 0;
	while (!error && (wpipe->pipe_state & PIPE_DIRECTW)) {
		if (wpipe->pipe_state & PIPE_EOF) {
			pipelock(wpipe, 0);
			pipe_destroy_write_buffer(wpipe);
			pipeunlock(wpipe);
			pipeselwakeup(wpipe);
			wakeup(wpipe);
			return EPIPE;
		}
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
		pipeselwakeup(wpipe);
		error = tsleep(wpipe, PRIBIO | PCATCH, "pipdwt", 0);
	}

	pipelock(wpipe, 0);
	if (wpipe->pipe_state & PIPE_DIRECTW) {
		/*
		 * this bit of trickery substitutes a kernel buffer for
		 * the process that might be going away.
		 */
		pipe_clone_write_buffer(wpipe);
	} else {
		pipe_destroy_write_buffer(wpipe);
	}
	pipeunlock(wpipe);
	return error;

error1:
	wakeup(wpipe);
	return error;
}
#endif
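
/*
 * Sequence sketch (illustrative) of the direct-write handshake
 * implemented above: the writer waits for any previous PIPE_DIRECTW
 * and for the ring buffer to drain, wires its pages, and sets
 * PIPE_DIRECTW; the reader then copies straight from pipe_map and
 * clears PIPE_DIRECTW once pipe_map.cnt reaches zero, waking the
 * writer, which unwires the pages.  If the writer is instead woken by
 * a signal while PIPE_DIRECTW is still set, pipe_clone_write_buffer()
 * first snapshots the unread bytes into the ring buffer so the user
 * pages can be released safely.
 */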

static __inline int
pipewrite(wpipe, uio, nbio)
	struct pipe *wpipe;
	struct uio *uio;
	int nbio;
{
	int error = 0;
	int orig_resid;

	/*
	 * detect loss of pipe read side, issue SIGPIPE if lost.
	 */
	if (wpipe == NULL || (wpipe->pipe_state & PIPE_EOF)) {
		return EPIPE;
	}

	if (wpipe->pipe_buffer.buffer == NULL) {
		if ((error = pipelock(wpipe, 1)) == 0) {
			pipespace(wpipe);
			pipeunlock(wpipe);
		} else {
			return error;
		}
	}

	++wpipe->pipe_busy;
	orig_resid = uio->uio_resid;
	while (uio->uio_resid) {
		int space;
#ifndef PIPE_NODIRECT
		/*
		 * If the transfer is large, we can gain performance if
		 * we do process-to-process copies directly.
		 */
		if ((amountpipekva < LIMITPIPEKVA) &&
		    (uio->uio_iov->iov_len >= PIPE_MINDIRECT)) {
			error = pipe_direct_write(wpipe, uio);
			if (error) {
				break;
			}
			continue;
		}
#endif

		/*
		 * Pipe buffered writes cannot proceed concurrently with
		 * direct writes.  We wait until the currently executing
		 * direct write is completed before we start filling the
		 * pipe buffer.
		 */
retrywrite:
		while (wpipe->pipe_state & PIPE_DIRECTW) {
			if (wpipe->pipe_state & PIPE_WANTR) {
				wpipe->pipe_state &= ~PIPE_WANTR;
				wakeup(wpipe);
			}
			error = tsleep(wpipe,
			    PRIBIO | PCATCH, "pipbww", 0);
			if (error)
				break;
		}

		space = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;

		/*
		 * Writes of size PIPE_BUF or less must be contiguous
		 * (atomic), so if the whole write does not fit, we
		 * pretend there is no space at all.
		 */
		if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
			space = 0;

		if (space > 0) {
			int size = wpipe->pipe_buffer.size - wpipe->pipe_buffer.in;
			if (size > space)
				size = space;
			if (size > uio->uio_resid)
				size = uio->uio_resid;
			if ((error = pipelock(wpipe, 1)) == 0) {
				/*
				 * It is possible for a direct write to
				 * slip in on us... handle it here...
				 */
				if (wpipe->pipe_state & PIPE_DIRECTW) {
					pipeunlock(wpipe);
					goto retrywrite;
				}
				error = uiomove(&wpipe->pipe_buffer.buffer[wpipe->pipe_buffer.in],
					size, uio);
				pipeunlock(wpipe);
			}
			if (error)
				break;

			wpipe->pipe_buffer.in += size;
			if (wpipe->pipe_buffer.in >= wpipe->pipe_buffer.size)
				wpipe->pipe_buffer.in = 0;

			wpipe->pipe_buffer.cnt += size;
		} else {
			/*
			 * If the "read-side" has been blocked, wake it up now.
			 */
			if (wpipe->pipe_state & PIPE_WANTR) {
				wpipe->pipe_state &= ~PIPE_WANTR;
				wakeup(wpipe);
			}

			/*
			 * don't block on non-blocking I/O
			 */
			if (nbio) {
				error = EAGAIN;
				break;
			}

			/*
			 * We have no more space and have something to offer,
			 * wake up selects.
			 */
			pipeselwakeup(wpipe);

			wpipe->pipe_state |= PIPE_WANTW;
			if ((error = tsleep(wpipe, (PRIBIO + 1) | PCATCH, "pipewr", 0)) != 0) {
				break;
			}
			/*
			 * If read side wants to go away, we just issue a signal
			 * to ourselves.
			 */
			if (wpipe->pipe_state & PIPE_EOF) {
				error = EPIPE;
				break;
			}
		}
	}

	--wpipe->pipe_busy;
	if ((wpipe->pipe_busy == 0) &&
	    (wpipe->pipe_state & PIPE_WANT)) {
		wpipe->pipe_state &= ~(PIPE_WANT | PIPE_WANTR);
		wakeup(wpipe);
	} else if (wpipe->pipe_buffer.cnt > 0) {
		/*
		 * If we have put any characters in the buffer, we wake up
		 * the reader.
		 */
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
	}

	/*
	 * Don't return EPIPE if I/O was successful
	 */
	if ((wpipe->pipe_buffer.cnt == 0) &&
	    (uio->uio_resid == 0) &&
	    (error == EPIPE))
		error = 0;

	if (error == 0) {
		int s = splhigh();
		wpipe->pipe_mtime = time;
		splx(s);
	}
	/*
	 * We have something to offer,
	 * wake up select.
	 */
	if (wpipe->pipe_buffer.cnt > 0)
		pipeselwakeup(wpipe);

	return error;
}

/* ARGSUSED */
static int
pipe_write(fp, uio, cred)
	struct file *fp;
	struct uio *uio;
	struct ucred *cred;
{
	struct pipe *rpipe = (struct pipe *) fp->f_data;
	struct pipe *wpipe = rpipe->pipe_peer;

	return pipewrite(wpipe, uio, (rpipe->pipe_state & PIPE_NBIO) ? 1 : 0);
}
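
/*
 * Worked example (illustrative, assuming the usual PIPE_BUF of 512) of
 * the atomicity rule in pipewrite() above: with a 16384-byte ring
 * buffer already holding 16000 bytes, a 512-byte write computes
 * space = 384.  Since space < uio_resid and orig_resid <= PIPE_BUF,
 * space is forced to 0 and the writer sleeps until all 512 bytes fit,
 * so a small write is never split across a buffer-full boundary.  A
 * 100000-byte write, by contrast, is allowed to trickle in piecemeal.
 */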

/*
 * we implement a very minimal set of ioctls for compatibility with sockets.
 */
int
pipe_ioctl(fp, cmd, data, p)
	struct file *fp;
	int cmd;
	register caddr_t data;
	struct proc *p;
{
	register struct pipe *mpipe = (struct pipe *)fp->f_data;

	switch (cmd) {

	case FIONBIO:
		if (*(int *)data)
			mpipe->pipe_state |= PIPE_NBIO;
		else
			mpipe->pipe_state &= ~PIPE_NBIO;
		return (0);

	case FIOASYNC:
		if (*(int *)data) {
			mpipe->pipe_state |= PIPE_ASYNC;
		} else {
			mpipe->pipe_state &= ~PIPE_ASYNC;
		}
		return (0);

	case FIONREAD:
		if (mpipe->pipe_state & PIPE_DIRECTW)
			*(int *)data = mpipe->pipe_map.cnt;
		else
			*(int *)data = mpipe->pipe_buffer.cnt;
		return (0);

	case SIOCSPGRP:
		mpipe->pipe_pgid = *(int *)data;
		return (0);

	case SIOCGPGRP:
		*(int *)data = mpipe->pipe_pgid;
		return (0);

	}
	return (ENOSYS);
}

int
pipe_select(fp, which, p)
	struct file *fp;
	int which;
	struct proc *p;
{
	register struct pipe *rpipe = (struct pipe *)fp->f_data;
	struct pipe *wpipe;

	wpipe = rpipe->pipe_peer;
	switch (which) {

	case FREAD:
		if (rpipe->pipe_buffer.cnt > 0 ||
		    (rpipe->pipe_state & PIPE_EOF)) {
			return (1);
		}
		selrecord(p, &rpipe->pipe_sel);
		rpipe->pipe_state |= PIPE_SEL;
		break;

	case FWRITE:
		if ((wpipe == NULL) ||
		    (wpipe->pipe_state & PIPE_EOF) ||
		    ((wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt) >= PIPE_BUF)) {
			return (1);
		}
		selrecord(p, &wpipe->pipe_sel);
		wpipe->pipe_state |= PIPE_SEL;
		break;

	case 0:
		if ((rpipe->pipe_state & PIPE_EOF) ||
		    (wpipe == NULL) ||
		    (wpipe->pipe_state & PIPE_EOF)) {
			return (1);
		}

		selrecord(p, &rpipe->pipe_sel);
		rpipe->pipe_state |= PIPE_SEL;
		break;
	}
	return (0);
}

int
pipe_stat(pipe, ub)
	register struct pipe *pipe;
	register struct stat *ub;
{
	bzero((caddr_t)ub, sizeof (*ub));
	ub->st_mode = S_IFSOCK;
	ub->st_blksize = pipe->pipe_buffer.size;
	ub->st_size = pipe->pipe_buffer.cnt;
	ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
	TIMEVAL_TO_TIMESPEC(&pipe->pipe_atime, &ub->st_atimespec);
	TIMEVAL_TO_TIMESPEC(&pipe->pipe_mtime, &ub->st_mtimespec);
	TIMEVAL_TO_TIMESPEC(&pipe->pipe_ctime, &ub->st_ctimespec);
	return 0;
}

/* ARGSUSED */
static int
pipe_close(fp, p)
	struct file *fp;
	struct proc *p;
{
	struct pipe *cpipe = (struct pipe *)fp->f_data;

	pipeclose(cpipe);
	fp->f_data = NULL;
	return 0;
}
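
/*
 * Usage sketch (illustrative) for pipe_ioctl() and pipe_select()
 * above, on a pipe read descriptor rfd:
 *
 *	int on = 1, nbytes;
 *	fd_set rfds;
 *
 *	ioctl(rfd, FIONBIO, &on);	// reads now return EAGAIN
 *					// instead of blocking
 *	ioctl(rfd, FIONREAD, &nbytes);	// bytes readable, whether
 *					// buffered or pending in a
 *					// direct write
 *	FD_ZERO(&rfds);
 *	FD_SET(rfd, &rfds);
 *	select(rfd + 1, &rfds, NULL, NULL, NULL); // wakes when data
 *					// arrives or on writer EOF
 */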

/*
 * shutdown the pipe
 */
static void
pipeclose(cpipe)
	struct pipe *cpipe;
{
	struct pipe *ppipe;

	if (cpipe) {

		pipeselwakeup(cpipe);

		/*
		 * If the other side is blocked, wake it up saying that
		 * we want to close it down.
		 */
		while (cpipe->pipe_busy) {
			wakeup(cpipe);
			cpipe->pipe_state |= PIPE_WANT | PIPE_EOF;
			tsleep(cpipe, PRIBIO, "pipecl", 0);
		}

		/*
		 * Disconnect from peer
		 */
		if ((ppipe = cpipe->pipe_peer) != NULL) {
			pipeselwakeup(ppipe);

			ppipe->pipe_state |= PIPE_EOF;
			wakeup(ppipe);
			ppipe->pipe_peer = NULL;
		}

		/*
		 * free resources
		 */
		if (cpipe->pipe_buffer.buffer) {
			amountpipekva -= cpipe->pipe_buffer.size;
			kmem_free(kernel_map,
				(vm_offset_t)cpipe->pipe_buffer.buffer,
				cpipe->pipe_buffer.size);
		}
#ifndef PIPE_NODIRECT
		if (cpipe->pipe_map.kva) {
			amountpipekva -= cpipe->pipe_buffer.size + PAGE_SIZE;
			kmem_free(kernel_map,
				cpipe->pipe_map.kva,
				cpipe->pipe_buffer.size + PAGE_SIZE);
		}
#endif
		free(cpipe, M_TEMP);
	}
}
#endif /* !OLD_PIPE */