sys_pipe.c revision 27923
/*
 * Copyright (c) 1996 John S. Dyson
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice immediately at the beginning of the file, without modification,
 *    this list of conditions, and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Absolutely no warranty of function or purpose is made by the author
 *    John S. Dyson.
 * 4. Modifications may be freely made to this file if the above conditions
 *    are met.
 *
 * $Id: sys_pipe.c,v 1.30 1997/08/05 00:05:00 dyson Exp $
 */

/*
 * This file contains a high-performance replacement for the socket-based
 * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
 * all features of sockets, but does do everything that pipes normally
 * do.
 */

/*
 * This code has two modes of operation, a small write mode and a large
 * write mode.  The small write mode acts like conventional pipes with
 * a kernel buffer.  If the write is smaller than PIPE_MINDIRECT, the
 * "normal" pipe buffering is done.  If the write is between PIPE_MINDIRECT
 * and PIPE_SIZE in size, it is fully mapped and wired into the kernel, and
 * the receiving process can copy it directly from the pages in the sending
 * process.
 *
 * If the sending process receives a signal, it is possible that it will
 * go away, and certainly its address space can change, because control
 * is returned back to the user-mode side.  In that case, the pipe code
 * arranges to copy the buffer supplied by the user process to a pageable
 * kernel buffer, and the receiving process will grab the data from the
 * pageable kernel buffer.  Since signals don't happen all that often,
 * the copy operation is normally eliminated.
 *
 * The constant PIPE_MINDIRECT is chosen to make sure that buffering will
 * happen for small transfers so that the system will not spend all of
 * its time context switching.  PIPE_SIZE is constrained by the
 * amount of kernel virtual memory.
 */

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/proc.h>
#include <sys/fcntl.h>
#include <sys/file.h>
#include <sys/protosw.h>
#include <sys/stat.h>
#include <sys/filedesc.h>
#include <sys/malloc.h>
#include <sys/filio.h>
#include <sys/ttycom.h>
#include <sys/select.h>
#include <sys/signalvar.h>
#include <sys/errno.h>
#include <sys/queue.h>
#include <sys/vmmeter.h>
#include <sys/kernel.h>
#include <sys/sysproto.h>
#include <sys/pipe.h>

#include <vm/vm.h>
#include <vm/vm_prot.h>
#include <vm/vm_param.h>
#include <sys/lock.h>
#include <vm/vm_object.h>
#include <vm/vm_kern.h>
#include <vm/vm_extern.h>
#include <vm/pmap.h>
#include <vm/vm_map.h>
#include <vm/vm_page.h>
#include <vm/vm_zone.h>

/*
 * Use this define if you want to disable *fancy* VM things.  Expect an
 * approx 30% decrease in transfer rate.  This could be useful for
 * NetBSD or OpenBSD.
 */
/* #define PIPE_NODIRECT */
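/*
 * Example of how the two modes are reached from userland (a minimal usage
 * sketch only; the actual thresholds PIPE_MINDIRECT and PIPE_SIZE are
 * defined in <sys/pipe.h>, and the direct path is only attempted for
 * blocking writes):
 *
 *	int fds[2];
 *	char small[128];
 *	char big[64 * 1024];
 *
 *	pipe(fds);
 *	write(fds[1], small, sizeof(small));	-- below PIPE_MINDIRECT,
 *						   buffered through the pipe's
 *						   kernel buffer
 *	write(fds[1], big, sizeof(big));	-- large enough to be a
 *						   candidate for the direct
 *						   (page-wiring) path
 *
 * The choice is made per-write in pipe_write() below; userland never needs
 * to do anything special to get the direct path.
 */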
/*
 * interfaces to the outside world
 */
static int pipe_read __P((struct file *fp, struct uio *uio,
		struct ucred *cred));
static int pipe_write __P((struct file *fp, struct uio *uio,
		struct ucred *cred));
static int pipe_close __P((struct file *fp, struct proc *p));
static int pipe_select __P((struct file *fp, int which, struct proc *p));
static int pipe_ioctl __P((struct file *fp, int cmd, caddr_t data, struct proc *p));

static struct fileops pipeops =
	{ pipe_read, pipe_write, pipe_ioctl, pipe_select, pipe_close };

/*
 * Default pipe buffer size(s); this can be kind-of large now because pipe
 * space is pageable.  The pipe code will try to maintain locality of
 * reference for performance reasons, so small amounts of outstanding I/O
 * will not wipe the cache.
 */
#define MINPIPESIZE (PIPE_SIZE/3)
#define MAXPIPESIZE (2*PIPE_SIZE/3)

/*
 * Maximum amount of kva for pipes -- this is kind-of a soft limit, but
 * is there so that on large systems, we don't exhaust it.
 */
#define MAXPIPEKVA (8*1024*1024)

/*
 * Limit for direct transfers; we cannot, of course, limit
 * the amount of kva for pipes in general though.
 */
#define LIMITPIPEKVA (16*1024*1024)

/*
 * Limit the number of "big" pipes
 */
#define LIMITBIGPIPES	32
int nbigpipe;

static int amountpipekva;

static void pipeclose __P((struct pipe *cpipe));
static void pipeinit __P((struct pipe *cpipe));
static __inline int pipelock __P((struct pipe *cpipe, int catch));
static __inline void pipeunlock __P((struct pipe *cpipe));
static __inline void pipeselwakeup __P((struct pipe *cpipe));
#ifndef PIPE_NODIRECT
static int pipe_build_write_buffer __P((struct pipe *wpipe, struct uio *uio));
static void pipe_destroy_write_buffer __P((struct pipe *wpipe));
static int pipe_direct_write __P((struct pipe *wpipe, struct uio *uio));
static void pipe_clone_write_buffer __P((struct pipe *wpipe));
#endif
static void pipespace __P((struct pipe *cpipe));

vm_zone_t pipe_zone;

/*
 * The pipe system call for the DTYPE_PIPE type of pipes
 */

/* ARGSUSED */
int
pipe(p, uap, retval)
	struct proc *p;
	struct pipe_args /* {
		int	dummy;
	} */ *uap;
	int retval[];
{
	register struct filedesc *fdp = p->p_fd;
	struct file *rf, *wf;
	struct pipe *rpipe, *wpipe;
	int fd, error;

	if (pipe_zone == NULL)
		pipe_zone = zinit("PIPE", sizeof (struct pipe), 0, 0, 4);

	rpipe = zalloc( pipe_zone);
	pipeinit(rpipe);
	rpipe->pipe_state |= PIPE_DIRECTOK;
	wpipe = zalloc( pipe_zone);
	pipeinit(wpipe);
	wpipe->pipe_state |= PIPE_DIRECTOK;

	error = falloc(p, &rf, &fd);
	if (error)
		goto free2;
	retval[0] = fd;
	rf->f_flag = FREAD | FWRITE;
	rf->f_type = DTYPE_PIPE;
	rf->f_ops = &pipeops;
	rf->f_data = (caddr_t)rpipe;
	error = falloc(p, &wf, &fd);
	if (error)
		goto free3;
	wf->f_flag = FREAD | FWRITE;
	wf->f_type = DTYPE_PIPE;
	wf->f_ops = &pipeops;
	wf->f_data = (caddr_t)wpipe;
	retval[1] = fd;

	rpipe->pipe_peer = wpipe;
	wpipe->pipe_peer = rpipe;

	return (0);
free3:
	ffree(rf);
	fdp->fd_ofiles[retval[0]] = 0;
free2:
	(void)pipeclose(wpipe);
	(void)pipeclose(rpipe);
	return (error);
}

/*
 * Allocate kva for pipe circular buffer; the space is pageable
 */
static void
pipespace(cpipe)
	struct pipe *cpipe;
{
	int npages, error;

	npages = round_page(cpipe->pipe_buffer.size)/PAGE_SIZE;
	/*
	 * Create an object, I don't like the idea of paging to/from
	 * kernel_object.
	 * XXX -- minor change needed here for NetBSD/OpenBSD VM systems.
	 */
	cpipe->pipe_buffer.object = vm_object_allocate(OBJT_DEFAULT, npages);
	cpipe->pipe_buffer.buffer = (caddr_t) vm_map_min(kernel_map);

	/*
	 * Insert the object into the kernel map, and allocate kva for it.
	 * The map entry is, by default, pageable.
	 * XXX -- minor change needed here for NetBSD/OpenBSD VM systems.
	 */
	error = vm_map_find(kernel_map, cpipe->pipe_buffer.object, 0,
		(vm_offset_t *) &cpipe->pipe_buffer.buffer,
		cpipe->pipe_buffer.size, 1,
		VM_PROT_ALL, VM_PROT_ALL, 0);

	if (error != KERN_SUCCESS)
		panic("pipeinit: cannot allocate pipe -- out of kvm -- code = %d", error);
	amountpipekva += cpipe->pipe_buffer.size;
}

/*
 * initialize and allocate VM and memory for pipe
 */
static void
pipeinit(cpipe)
	struct pipe *cpipe;
{
	int s;

	cpipe->pipe_buffer.in = 0;
	cpipe->pipe_buffer.out = 0;
	cpipe->pipe_buffer.cnt = 0;
	cpipe->pipe_buffer.size = PIPE_SIZE;

	/* Buffer kva gets dynamically allocated */
	cpipe->pipe_buffer.buffer = NULL;
	/* cpipe->pipe_buffer.object = invalid */

	cpipe->pipe_state = 0;
	cpipe->pipe_peer = NULL;
	cpipe->pipe_busy = 0;
	gettime(&cpipe->pipe_ctime);
	cpipe->pipe_atime = cpipe->pipe_ctime;
	cpipe->pipe_mtime = cpipe->pipe_ctime;
	bzero(&cpipe->pipe_sel, sizeof cpipe->pipe_sel);
	cpipe->pipe_pgid = NO_PID;

#ifndef PIPE_NODIRECT
	/*
	 * pipe data structure initializations to support direct pipe I/O
	 */
	cpipe->pipe_map.cnt = 0;
	cpipe->pipe_map.kva = 0;
	cpipe->pipe_map.pos = 0;
	cpipe->pipe_map.npages = 0;
	/* cpipe->pipe_map.ms[] = invalid */
#endif
}


/*
 * lock a pipe for I/O, blocking other access
 */
static __inline int
pipelock(cpipe, catch)
	struct pipe *cpipe;
	int catch;
{
	int error;
	while (cpipe->pipe_state & PIPE_LOCK) {
		cpipe->pipe_state |= PIPE_LWANT;
		if (error = tsleep( cpipe,
			catch?(PRIBIO|PCATCH):PRIBIO, "pipelk", 0)) {
			return error;
		}
	}
	cpipe->pipe_state |= PIPE_LOCK;
	return 0;
}

/*
 * unlock a pipe I/O lock
 */
static __inline void
pipeunlock(cpipe)
	struct pipe *cpipe;
{
	cpipe->pipe_state &= ~PIPE_LOCK;
	if (cpipe->pipe_state & PIPE_LWANT) {
		cpipe->pipe_state &= ~PIPE_LWANT;
		wakeup(cpipe);
	}
}

static __inline void
pipeselwakeup(cpipe)
	struct pipe *cpipe;
{
	struct proc *p;

	if (cpipe->pipe_state & PIPE_SEL) {
		cpipe->pipe_state &= ~PIPE_SEL;
		selwakeup(&cpipe->pipe_sel);
	}
	if (cpipe->pipe_state & PIPE_ASYNC) {
		if (cpipe->pipe_pgid < 0)
			gsignal(-cpipe->pipe_pgid, SIGIO);
		else if ((p = pfind(cpipe->pipe_pgid)) != NULL)
			psignal(p, SIGIO);
	}
}

/* ARGSUSED */
static int
pipe_read(fp, uio, cred)
	struct file *fp;
	struct uio *uio;
	struct ucred *cred;
{

	struct pipe *rpipe = (struct pipe *) fp->f_data;
	int error = 0;
	int nread = 0;
	u_int size;

	++rpipe->pipe_busy;
	while (uio->uio_resid) {
		/*
		 * normal pipe buffer receive
		 */
		if (rpipe->pipe_buffer.cnt > 0) {
			size = rpipe->pipe_buffer.size - rpipe->pipe_buffer.out;
			if (size > rpipe->pipe_buffer.cnt)
				size = rpipe->pipe_buffer.cnt;
			if (size > (u_int) uio->uio_resid)
				size = (u_int) uio->uio_resid;
			if ((error = pipelock(rpipe,1)) == 0) {
				error = uiomove( &rpipe->pipe_buffer.buffer[rpipe->pipe_buffer.out],
					size, uio);
				pipeunlock(rpipe);
			}
			if (error) {
				break;
			}
			rpipe->pipe_buffer.out += size;
			if (rpipe->pipe_buffer.out >= rpipe->pipe_buffer.size)
				rpipe->pipe_buffer.out = 0;

			rpipe->pipe_buffer.cnt -= size;
			nread += size;
#ifndef PIPE_NODIRECT
		/*
		 * Direct copy, bypassing a kernel buffer.
		 */
		} else if ((size = rpipe->pipe_map.cnt) &&
			   (rpipe->pipe_state & PIPE_DIRECTW)) {
			caddr_t va;
			if (size > (u_int) uio->uio_resid)
				size = (u_int) uio->uio_resid;
			if ((error = pipelock(rpipe,1)) == 0) {
				va = (caddr_t) rpipe->pipe_map.kva + rpipe->pipe_map.pos;
				error = uiomove(va, size, uio);
				pipeunlock(rpipe);
			}
			if (error)
				break;
			nread += size;
			rpipe->pipe_map.pos += size;
			rpipe->pipe_map.cnt -= size;
			if (rpipe->pipe_map.cnt == 0) {
				rpipe->pipe_state &= ~PIPE_DIRECTW;
				wakeup(rpipe);
			}
#endif
		} else {
			/*
			 * detect EOF condition
			 */
			if (rpipe->pipe_state & PIPE_EOF) {
				/* XXX error = ? */
				break;
			}
			/*
			 * If the "write-side" has been blocked, wake it up now.
			 */
			if (rpipe->pipe_state & PIPE_WANTW) {
				rpipe->pipe_state &= ~PIPE_WANTW;
				wakeup(rpipe);
			}
			if (nread > 0)
				break;

			if (fp->f_flag & FNONBLOCK) {
				error = EAGAIN;
				break;
			}

			/*
			 * If there is no more to read in the pipe, reset
			 * its pointers to the beginning.  This improves
			 * cache hit stats.
			 */

			if ((error = pipelock(rpipe,1)) == 0) {
				if (rpipe->pipe_buffer.cnt == 0) {
					rpipe->pipe_buffer.in = 0;
					rpipe->pipe_buffer.out = 0;
				}
				pipeunlock(rpipe);
			} else {
				break;
			}

			if (rpipe->pipe_state & PIPE_WANTW) {
				rpipe->pipe_state &= ~PIPE_WANTW;
				wakeup(rpipe);
			}

			rpipe->pipe_state |= PIPE_WANTR;
			if (error = tsleep(rpipe, PRIBIO|PCATCH, "piperd", 0)) {
				break;
			}
		}
	}

	if (error == 0)
		gettime(&rpipe->pipe_atime);

	--rpipe->pipe_busy;
	if ((rpipe->pipe_busy == 0) && (rpipe->pipe_state & PIPE_WANT)) {
		rpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTW);
		wakeup(rpipe);
	} else if (rpipe->pipe_buffer.cnt < MINPIPESIZE) {
		/*
		 * If there is no more to read in the pipe, reset
		 * its pointers to the beginning.  This improves
		 * cache hit stats.
		 */
		if (rpipe->pipe_buffer.cnt == 0) {
			if ((error == 0) && (error = pipelock(rpipe,1)) == 0) {
				rpipe->pipe_buffer.in = 0;
				rpipe->pipe_buffer.out = 0;
				pipeunlock(rpipe);
			}
		}

		/*
		 * If the "write-side" has been blocked, wake it up now.
		 */
		if (rpipe->pipe_state & PIPE_WANTW) {
			rpipe->pipe_state &= ~PIPE_WANTW;
			wakeup(rpipe);
		}
	}

	if ((rpipe->pipe_buffer.size - rpipe->pipe_buffer.cnt) >= PIPE_BUF)
		pipeselwakeup(rpipe);

	return error;
}
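/*
 * Worked example of the circular-buffer arithmetic in pipe_read() above
 * (numbers are illustrative only; the real buffer size is PIPE_SIZE from
 * <sys/pipe.h>): with pipe_buffer.size = 16384, out = 12288 and cnt = 8192,
 * the first pass computes size = 16384 - 12288 = 4096 (the run up to the
 * end of the buffer), copies it, and wraps out to 0; the next pass copies
 * the remaining 4096 bytes from the start of the buffer.  The in/out/cnt
 * triple therefore always describes a contiguous or wrapped region without
 * any modulo operations in the copy path.
 */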
#ifndef PIPE_NODIRECT
/*
 * Map the sending process's buffer into kernel space and wire it.
 * This is similar to a physical write operation.
 */
static int
pipe_build_write_buffer(wpipe, uio)
	struct pipe *wpipe;
	struct uio *uio;
{
	u_int size;
	int i;
	vm_offset_t addr, endaddr, paddr;

	size = (u_int) uio->uio_iov->iov_len;
	if (size > wpipe->pipe_buffer.size)
		size = wpipe->pipe_buffer.size;

	endaddr = round_page(uio->uio_iov->iov_base + size);
	for(i = 0, addr = trunc_page(uio->uio_iov->iov_base);
		addr < endaddr;
		addr += PAGE_SIZE, i+=1) {

		vm_page_t m;

		vm_fault_quick( (caddr_t) addr, VM_PROT_READ);
		paddr = pmap_kextract(addr);
		if (!paddr) {
			int j;
			for(j=0;j<i;j++)
				vm_page_unwire(wpipe->pipe_map.ms[j]);
			return EFAULT;
		}

		m = PHYS_TO_VM_PAGE(paddr);
		vm_page_wire(m);
		wpipe->pipe_map.ms[i] = m;
	}

/*
 * set up the control block
 */
	wpipe->pipe_map.npages = i;
	wpipe->pipe_map.pos = ((vm_offset_t) uio->uio_iov->iov_base) & PAGE_MASK;
	wpipe->pipe_map.cnt = size;

/*
 * and map the buffer
 */
	if (wpipe->pipe_map.kva == 0) {
		/*
		 * We need to allocate space for an extra page because the
		 * address range might (will) span pages at times.
		 */
		wpipe->pipe_map.kva = kmem_alloc_pageable(kernel_map,
			wpipe->pipe_buffer.size + PAGE_SIZE);
		amountpipekva += wpipe->pipe_buffer.size + PAGE_SIZE;
	}
	pmap_qenter(wpipe->pipe_map.kva, wpipe->pipe_map.ms,
		wpipe->pipe_map.npages);

/*
 * and update the uio data
 */

	uio->uio_iov->iov_len -= size;
	uio->uio_iov->iov_base += size;
	if (uio->uio_iov->iov_len == 0)
		uio->uio_iov++;
	uio->uio_resid -= size;
	uio->uio_offset += size;
	return 0;
}

/*
 * unmap and unwire the process buffer
 */
static void
pipe_destroy_write_buffer(wpipe)
	struct pipe *wpipe;
{
	int i;
	if (wpipe->pipe_map.kva) {
		pmap_qremove(wpipe->pipe_map.kva, wpipe->pipe_map.npages);

		if (amountpipekva > MAXPIPEKVA) {
			vm_offset_t kva = wpipe->pipe_map.kva;
			wpipe->pipe_map.kva = 0;
			kmem_free(kernel_map, kva,
				wpipe->pipe_buffer.size + PAGE_SIZE);
			amountpipekva -= wpipe->pipe_buffer.size + PAGE_SIZE;
		}
	}
	for (i=0;i<wpipe->pipe_map.npages;i++)
		vm_page_unwire(wpipe->pipe_map.ms[i]);
}

/*
 * In the case of a signal, the writing process might go away.  This
 * code copies the data into the circular buffer so that the source
 * pages can be freed without loss of data.
 */
static void
pipe_clone_write_buffer(wpipe)
	struct pipe *wpipe;
{
	int size;
	int pos;

	size = wpipe->pipe_map.cnt;
	pos = wpipe->pipe_map.pos;
	bcopy((caddr_t) wpipe->pipe_map.kva+pos,
	    (caddr_t) wpipe->pipe_buffer.buffer,
	    size);

	wpipe->pipe_buffer.in = size;
	wpipe->pipe_buffer.out = 0;
	wpipe->pipe_buffer.cnt = size;
	wpipe->pipe_state &= ~PIPE_DIRECTW;

	pipe_destroy_write_buffer(wpipe);
}
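/*
 * Worked example of the page bookkeeping in pipe_build_write_buffer()
 * (illustrative numbers; assumes a 4096-byte PAGE_SIZE): for a user buffer
 * at iov_base = 0x1234 with iov_len = 12000, trunc_page() gives 0x1000 and
 * round_page(0x1234 + 12000) gives 0x5000, so four pages are faulted in and
 * wired into pipe_map.ms[].  pipe_map.pos is the offset of the data within
 * the first page (0x234) and pipe_map.cnt is the byte count (12000), so the
 * reader copies cnt bytes starting at pipe_map.kva + pos once the pages are
 * entered into the kernel mapping with pmap_qenter().
 */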
/*
 * This implements the pipe buffer write mechanism.  Note that only
 * a direct write OR a normal pipe write can be pending at any given time.
 * If there are any characters in the pipe buffer, the direct write will
 * be deferred until the receiving process grabs all of the bytes from
 * the pipe buffer.  Then the direct mapping write is set-up.
 */
static int
pipe_direct_write(wpipe, uio)
	struct pipe *wpipe;
	struct uio *uio;
{
	int error;
retry:
	while (wpipe->pipe_state & PIPE_DIRECTW) {
		if ( wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
		wpipe->pipe_state |= PIPE_WANTW;
		error = tsleep(wpipe,
				PRIBIO|PCATCH, "pipdww", 0);
		if (error)
			goto error1;
		if (wpipe->pipe_state & PIPE_EOF) {
			error = EPIPE;
			goto error1;
		}
	}
	wpipe->pipe_map.cnt = 0;	/* transfer not ready yet */
	if (wpipe->pipe_buffer.cnt > 0) {
		if ( wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}

		wpipe->pipe_state |= PIPE_WANTW;
		error = tsleep(wpipe,
				PRIBIO|PCATCH, "pipdwc", 0);
		if (error)
			goto error1;
		if (wpipe->pipe_state & PIPE_EOF) {
			error = EPIPE;
			goto error1;
		}
		goto retry;
	}

	wpipe->pipe_state |= PIPE_DIRECTW;

	error = pipe_build_write_buffer(wpipe, uio);
	if (error) {
		wpipe->pipe_state &= ~PIPE_DIRECTW;
		goto error1;
	}

	error = 0;
	while (!error && (wpipe->pipe_state & PIPE_DIRECTW)) {
		if (wpipe->pipe_state & PIPE_EOF) {
			pipelock(wpipe, 0);
			pipe_destroy_write_buffer(wpipe);
			pipeunlock(wpipe);
			pipeselwakeup(wpipe);
			error = EPIPE;
			goto error1;
		}
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
		pipeselwakeup(wpipe);
		error = tsleep(wpipe, PRIBIO|PCATCH, "pipdwt", 0);
	}

	pipelock(wpipe,0);
	if (wpipe->pipe_state & PIPE_DIRECTW) {
		/*
		 * this bit of trickery substitutes a kernel buffer for
		 * the process that might be going away.
		 */
		pipe_clone_write_buffer(wpipe);
	} else {
		pipe_destroy_write_buffer(wpipe);
	}
	pipeunlock(wpipe);
	return error;

error1:
	wakeup(wpipe);
	return error;
}
#endif
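/*
 * Sketch of the direct-write handshake between pipe_direct_write() above
 * and the PIPE_DIRECTW branch of pipe_read() (both sides sleep on and wake
 * the same struct pipe):
 *
 *	writer					reader
 *	------					------
 *	pipe_state |= PIPE_DIRECTW
 *	pipe_build_write_buffer()
 *	  (user pages wired, mapped
 *	   at pipe_map.kva)
 *	tsleep(..., "pipdwt")
 *						uiomove() from
 *						  pipe_map.kva + pipe_map.pos
 *						pipe_map.cnt -= size
 *						if (pipe_map.cnt == 0):
 *						  pipe_state &= ~PIPE_DIRECTW
 *						  wakeup()
 *	PIPE_DIRECTW now clear, so
 *	pipe_destroy_write_buffer()
 *	  (pages unmapped and unwired)
 *
 * If the writer's sleep is instead interrupted (e.g. by a signal) while
 * PIPE_DIRECTW is still set, pipe_clone_write_buffer() copies the unread
 * bytes into the ordinary circular buffer before the user pages are
 * released.
 */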
static int
pipe_write(fp, uio, cred)
	struct file *fp;
	struct uio *uio;
	struct ucred *cred;
{
	int error = 0;
	int orig_resid;

	struct pipe *wpipe, *rpipe;

	rpipe = (struct pipe *) fp->f_data;
	wpipe = rpipe->pipe_peer;

	/*
	 * detect loss of pipe read side, issue SIGPIPE if lost.
	 */
	if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
		return EPIPE;
	}

	/*
	 * If it is advantageous to resize the pipe buffer, do
	 * so.
	 */
	if ((uio->uio_resid > PIPE_SIZE) &&
		(nbigpipe < LIMITBIGPIPES) &&
		(wpipe->pipe_state & PIPE_DIRECTW) == 0 &&
		(wpipe->pipe_buffer.size <= PIPE_SIZE) &&
		(wpipe->pipe_buffer.cnt == 0)) {

		if (wpipe->pipe_buffer.buffer) {
			amountpipekva -= wpipe->pipe_buffer.size;
			kmem_free(kernel_map,
				(vm_offset_t)wpipe->pipe_buffer.buffer,
				wpipe->pipe_buffer.size);
		}

#ifndef PIPE_NODIRECT
		if (wpipe->pipe_map.kva) {
			amountpipekva -= wpipe->pipe_buffer.size + PAGE_SIZE;
			kmem_free(kernel_map,
				wpipe->pipe_map.kva,
				wpipe->pipe_buffer.size + PAGE_SIZE);
		}
#endif

		wpipe->pipe_buffer.in = 0;
		wpipe->pipe_buffer.out = 0;
		wpipe->pipe_buffer.cnt = 0;
		wpipe->pipe_buffer.size = BIG_PIPE_SIZE;
		wpipe->pipe_buffer.buffer = NULL;
		++nbigpipe;

#ifndef PIPE_NODIRECT
		wpipe->pipe_map.cnt = 0;
		wpipe->pipe_map.kva = 0;
		wpipe->pipe_map.pos = 0;
		wpipe->pipe_map.npages = 0;
#endif

	}


	if( wpipe->pipe_buffer.buffer == NULL) {
		if ((error = pipelock(wpipe,1)) == 0) {
			pipespace(wpipe);
			pipeunlock(wpipe);
		} else {
			return error;
		}
	}

	++wpipe->pipe_busy;
	orig_resid = uio->uio_resid;
	while (uio->uio_resid) {
		int space;
#ifndef PIPE_NODIRECT
		/*
		 * If the transfer is large, we can gain performance if
		 * we do process-to-process copies directly.
		 * If the write is non-blocking, we don't use the
		 * direct write mechanism.
		 */
		if ((uio->uio_iov->iov_len >= PIPE_MINDIRECT) &&
		    (fp->f_flag & FNONBLOCK) == 0 &&
		    (wpipe->pipe_map.kva || (amountpipekva < LIMITPIPEKVA))) {
			error = pipe_direct_write( wpipe, uio);
			if (error) {
				break;
			}
			continue;
		}
#endif

		/*
		 * Pipe buffered writes cannot be coincident with
		 * direct writes.  We wait until the currently executing
		 * direct write is completed before we start filling the
		 * pipe buffer.
		 */
	retrywrite:
		while (wpipe->pipe_state & PIPE_DIRECTW) {
			if (wpipe->pipe_state & PIPE_WANTR) {
				wpipe->pipe_state &= ~PIPE_WANTR;
				wakeup(wpipe);
			}
			error = tsleep(wpipe,
					PRIBIO|PCATCH, "pipbww", 0);
			if (error)
				break;
		}

		space = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;

		/* Writes of size <= PIPE_BUF must be atomic. */
		/* XXX perhaps they need to be contiguous to be atomic? */
		if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
			space = 0;

		if (space > 0 && (wpipe->pipe_buffer.cnt < PIPE_SIZE)) {
			/*
			 * This sets the maximum transfer as a segment of
			 * the buffer.
			 */
			int size = wpipe->pipe_buffer.size - wpipe->pipe_buffer.in;
			/*
			 * space is the size left in the buffer
			 */
			if (size > space)
				size = space;
			/*
			 * now limit it to the size of the uio transfer
			 */
			if (size > uio->uio_resid)
				size = uio->uio_resid;
			if ((error = pipelock(wpipe,1)) == 0) {
				/*
				 * It is possible for a direct write to
				 * slip in on us... handle it here...
				 */
				if (wpipe->pipe_state & PIPE_DIRECTW) {
					pipeunlock(wpipe);
					goto retrywrite;
				}
				error = uiomove( &wpipe->pipe_buffer.buffer[wpipe->pipe_buffer.in],
					size, uio);
				pipeunlock(wpipe);
			}
			if (error)
				break;

			wpipe->pipe_buffer.in += size;
			if (wpipe->pipe_buffer.in >= wpipe->pipe_buffer.size)
				wpipe->pipe_buffer.in = 0;

			wpipe->pipe_buffer.cnt += size;
		} else {
			/*
			 * If the "read-side" has been blocked, wake it up now.
			 */
			if (wpipe->pipe_state & PIPE_WANTR) {
				wpipe->pipe_state &= ~PIPE_WANTR;
				wakeup(wpipe);
			}

			/*
			 * don't block on non-blocking I/O
			 */
			if (fp->f_flag & FNONBLOCK) {
				error = EAGAIN;
				break;
			}

			/*
			 * We have no more space and have something to offer,
			 * wake up selects.
			 */
			pipeselwakeup(wpipe);

			wpipe->pipe_state |= PIPE_WANTW;
			if (error = tsleep(wpipe, (PRIBIO+1)|PCATCH, "pipewr", 0)) {
				break;
			}
			/*
			 * If read side wants to go away, we just issue a signal
			 * to ourselves.
			 */
			if (wpipe->pipe_state & PIPE_EOF) {
				error = EPIPE;
				break;
			}
		}
	}

	--wpipe->pipe_busy;
	if ((wpipe->pipe_busy == 0) &&
		(wpipe->pipe_state & PIPE_WANT)) {
		wpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTR);
		wakeup(wpipe);
	} else if (wpipe->pipe_buffer.cnt > 0) {
		/*
		 * If we have put any characters in the buffer, we wake up
		 * the reader.
		 */
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}
	}

	/*
	 * Don't return EPIPE if I/O was successful
	 */
	if ((wpipe->pipe_buffer.cnt == 0) &&
		(uio->uio_resid == 0) &&
		(error == EPIPE))
		error = 0;

	if (error == 0)
		gettime(&wpipe->pipe_mtime);

	/*
	 * We have something to offer,
	 * wake up select.
	 */
	if (wpipe->pipe_buffer.cnt)
		pipeselwakeup(wpipe);

	return error;
}
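/*
 * Worked example of the atomicity rule enforced in pipe_write() above
 * (PIPE_BUF comes from <sys/pipe.h>; 512 is only an illustrative size): if
 * two writers each issue a single write of 512 bytes and PIPE_BUF >= 512,
 * then orig_resid <= PIPE_BUF, so a writer that finds fewer than 512 bytes
 * of free space forces space to 0 and sleeps instead of copying a partial
 * chunk; each 512-byte record therefore lands in the buffer without being
 * interleaved with the other writer's data.  Writes larger than PIPE_BUF
 * may be split at whatever buffer space happens to be available.
 */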
/*
 * we implement a very minimal set of ioctls for compatibility with sockets.
 */
int
pipe_ioctl(fp, cmd, data, p)
	struct file *fp;
	int cmd;
	register caddr_t data;
	struct proc *p;
{
	register struct pipe *mpipe = (struct pipe *)fp->f_data;

	switch (cmd) {

	case FIONBIO:
		return (0);

	case FIOASYNC:
		if (*(int *)data) {
			mpipe->pipe_state |= PIPE_ASYNC;
		} else {
			mpipe->pipe_state &= ~PIPE_ASYNC;
		}
		return (0);

	case FIONREAD:
		if (mpipe->pipe_state & PIPE_DIRECTW)
			*(int *)data = mpipe->pipe_map.cnt;
		else
			*(int *)data = mpipe->pipe_buffer.cnt;
		return (0);

	case TIOCSPGRP:
		mpipe->pipe_pgid = *(int *)data;
		return (0);

	case TIOCGPGRP:
		*(int *)data = mpipe->pipe_pgid;
		return (0);

	}
	return (ENOTTY);
}

int
pipe_select(fp, which, p)
	struct file *fp;
	int which;
	struct proc *p;
{
	register struct pipe *rpipe = (struct pipe *)fp->f_data;
	struct pipe *wpipe;

	wpipe = rpipe->pipe_peer;
	switch (which) {

	case FREAD:
		if ( (rpipe->pipe_state & PIPE_DIRECTW) ||
		    (rpipe->pipe_buffer.cnt > 0) ||
		    (rpipe->pipe_state & PIPE_EOF)) {
			return (1);
		}
		selrecord(p, &rpipe->pipe_sel);
		rpipe->pipe_state |= PIPE_SEL;
		break;

	case FWRITE:
		if ((wpipe == NULL) ||
		    (wpipe->pipe_state & PIPE_EOF) ||
		    (((wpipe->pipe_state & PIPE_DIRECTW) == 0) &&
		     (wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt) >= PIPE_BUF)) {
			return (1);
		}
		selrecord(p, &wpipe->pipe_sel);
		wpipe->pipe_state |= PIPE_SEL;
		break;

	case 0:
		if ((rpipe->pipe_state & PIPE_EOF) ||
		    (wpipe == NULL) ||
		    (wpipe->pipe_state & PIPE_EOF)) {
			return (1);
		}

		selrecord(p, &rpipe->pipe_sel);
		rpipe->pipe_state |= PIPE_SEL;
		break;
	}
	return (0);
}

int
pipe_stat(pipe, ub)
	register struct pipe *pipe;
	register struct stat *ub;
{
	bzero((caddr_t)ub, sizeof (*ub));
	ub->st_mode = S_IFIFO;
	ub->st_blksize = pipe->pipe_buffer.size;
	ub->st_size = pipe->pipe_buffer.cnt;
	ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
	TIMEVAL_TO_TIMESPEC(&pipe->pipe_atime, &ub->st_atimespec);
	TIMEVAL_TO_TIMESPEC(&pipe->pipe_mtime, &ub->st_mtimespec);
	TIMEVAL_TO_TIMESPEC(&pipe->pipe_ctime, &ub->st_ctimespec);
	/*
	 * Left as 0: st_dev, st_ino, st_nlink, st_uid, st_gid, st_rdev,
	 * st_flags, st_gen.
	 * XXX (st_dev, st_ino) should be unique.
	 */
	return 0;
}

/* ARGSUSED */
static int
pipe_close(fp, p)
	struct file *fp;
	struct proc *p;
{
	struct pipe *cpipe = (struct pipe *)fp->f_data;

	pipeclose(cpipe);
	fp->f_data = NULL;
	return 0;
}

/*
 * shutdown the pipe
 */
static void
pipeclose(cpipe)
	struct pipe *cpipe;
{
	struct pipe *ppipe;
	if (cpipe) {

		pipeselwakeup(cpipe);

		/*
		 * If the other side is blocked, wake it up saying that
		 * we want to close it down.
		 */
		while (cpipe->pipe_busy) {
			wakeup(cpipe);
			cpipe->pipe_state |= PIPE_WANT|PIPE_EOF;
			tsleep(cpipe, PRIBIO, "pipecl", 0);
		}

		/*
		 * Disconnect from peer
		 */
		if (ppipe = cpipe->pipe_peer) {
			pipeselwakeup(ppipe);

			ppipe->pipe_state |= PIPE_EOF;
			wakeup(ppipe);
			ppipe->pipe_peer = NULL;
		}

		/*
		 * free resources
		 */
		if (cpipe->pipe_buffer.buffer) {
			if (cpipe->pipe_buffer.size > PIPE_SIZE)
				--nbigpipe;
			amountpipekva -= cpipe->pipe_buffer.size;
			kmem_free(kernel_map,
				(vm_offset_t)cpipe->pipe_buffer.buffer,
				cpipe->pipe_buffer.size);
		}
#ifndef PIPE_NODIRECT
		if (cpipe->pipe_map.kva) {
			amountpipekva -= cpipe->pipe_buffer.size + PAGE_SIZE;
			kmem_free(kernel_map,
				cpipe->pipe_map.kva,
				cpipe->pipe_buffer.size + PAGE_SIZE);
		}
#endif
		zfree(pipe_zone, cpipe);
	}
}