sys_pipe.c revision 16322
11541Srgrimes/*
21541Srgrimes * Copyright (c) 1996 John S. Dyson
31541Srgrimes * All rights reserved.
41541Srgrimes *
51541Srgrimes * Redistribution and use in source and binary forms, with or without
61541Srgrimes * modification, are permitted provided that the following conditions
71541Srgrimes * are met:
81541Srgrimes * 1. Redistributions of source code must retain the above copyright
91541Srgrimes *    notice immediately at the beginning of the file, without modification,
101541Srgrimes *    this list of conditions, and the following disclaimer.
111541Srgrimes * 2. Redistributions in binary form must reproduce the above copyright
121541Srgrimes *    notice, this list of conditions and the following disclaimer in the
131541Srgrimes *    documentation and/or other materials provided with the distribution.
141541Srgrimes * 3. Absolutely no warranty of function or purpose is made by the author
151541Srgrimes *    John S. Dyson.
161541Srgrimes * 4. Modifications may be freely made to this file if the above conditions
171541Srgrimes *    are met.
181541Srgrimes *
191541Srgrimes * $Id: sys_pipe.c,v 1.15 1996/03/25 01:48:28 dyson Exp $
201541Srgrimes */
211541Srgrimes
221541Srgrimes#ifndef OLD_PIPE
231541Srgrimes
241541Srgrimes/*
251541Srgrimes * This file contains a high-performance replacement for the socket-based
261541Srgrimes * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
271541Srgrimes * all features of sockets, but does do everything that pipes normally
281541Srgrimes * do.
291541Srgrimes */
301541Srgrimes
311541Srgrimes/*
321541Srgrimes * This code has two modes of operation, a small write mode and a large
331541Srgrimes * write mode.  The small write mode acts like conventional pipes with
341541Srgrimes * a kernel buffer.  If the buffer is less than PIPE_MINDIRECT, then the
351541Srgrimes * "normal" pipe buffering is done.  If the buffer is between PIPE_MINDIRECT
361541Srgrimes * and PIPE_SIZE in size, it is fully mapped and wired into the kernel, and
371541Srgrimes * the receiving process can copy it directly from the pages in the sending
381541Srgrimes * process.
392112Swollman *
401541Srgrimes * If the sending process receives a signal, it is possible that it will
411541Srgrimes * go away, and certainly its address space can change, because control
421541Srgrimes * is returned back to the user-mode side.  In that case, the pipe code
431541Srgrimes * arranges to copy the buffer supplied by the user process, to a pageable
442112Swollman * kernel buffer, and the receiving process will grab the data from the
451541Srgrimes * pageable kernel buffer.  Since signals don't happen all that often,
461541Srgrimes * the copy operation is normally eliminated.
471541Srgrimes *
481541Srgrimes * The constant PIPE_MINDIRECT is chosen to make sure that buffering will
491541Srgrimes * happen for small transfers so that the system will not spend all of
501541Srgrimes * its time context switching.  PIPE_SIZE is constrained by the
511541Srgrimes * amount of kernel virtual memory.
521541Srgrimes */
531541Srgrimes
541541Srgrimes#include <sys/param.h>
551541Srgrimes#include <sys/systm.h>
561541Srgrimes#include <sys/proc.h>
571541Srgrimes#include <sys/file.h>
581541Srgrimes#include <sys/protosw.h>
591541Srgrimes#include <sys/stat.h>
601541Srgrimes#include <sys/filedesc.h>
611541Srgrimes#include <sys/malloc.h>
621541Srgrimes#include <sys/ioctl.h>
631541Srgrimes#include <sys/stat.h>
641541Srgrimes#include <sys/select.h>
651541Srgrimes#include <sys/signalvar.h>
661541Srgrimes#include <sys/errno.h>
671541Srgrimes#include <sys/queue.h>
681541Srgrimes#include <sys/vmmeter.h>
691541Srgrimes#include <sys/kernel.h>
701541Srgrimes#include <sys/sysproto.h>
711541Srgrimes#include <sys/pipe.h>
721541Srgrimes
731541Srgrimes#include <vm/vm.h>
741541Srgrimes#include <vm/vm_prot.h>
751541Srgrimes#include <vm/vm_param.h>
761541Srgrimes#include <vm/lock.h>
771541Srgrimes#include <vm/vm_object.h>
781541Srgrimes#include <vm/vm_kern.h>
791541Srgrimes#include <vm/vm_extern.h>
801541Srgrimes#include <vm/pmap.h>
811541Srgrimes#include <vm/vm_map.h>
821541Srgrimes#include <vm/vm_page.h>
831541Srgrimes
841541Srgrimes/*
851541Srgrimes * Use this define if you want to disable *fancy* VM things.  Expect an
861541Srgrimes * approx 30% decrease in transfer rate.  This could be useful for
871541Srgrimes * NetBSD or OpenBSD.
881541Srgrimes */
891541Srgrimes/* #define PIPE_NODIRECT */
901541Srgrimes
911541Srgrimes/*
921541Srgrimes * interfaces to the outside world
931541Srgrimes */
941541Srgrimesstatic int pipe_read __P((struct file *fp, struct uio *uio,
951541Srgrimes		struct ucred *cred));
961541Srgrimesstatic int pipe_write __P((struct file *fp, struct uio *uio,
971541Srgrimes		struct ucred *cred));
981541Srgrimesstatic int pipe_close __P((struct file *fp, struct proc *p));
991541Srgrimesstatic int pipe_select __P((struct file *fp, int which, struct proc *p));
1001541Srgrimesstatic int pipe_ioctl __P((struct file *fp, int cmd, caddr_t data, struct proc *p));
1011541Srgrimes
1021541Srgrimesstatic struct fileops pipeops =
1031541Srgrimes    { pipe_read, pipe_write, pipe_ioctl, pipe_select, pipe_close };
1041541Srgrimes
1051541Srgrimes/*
1061541Srgrimes * Default pipe buffer size(s), this can be kind-of large now because pipe
1071541Srgrimes * space is pageable.  The pipe code will try to maintain locality of
1081541Srgrimes * reference for performance reasons, so small amounts of outstanding I/O
1091541Srgrimes * will not wipe the cache.
1101541Srgrimes */
1111541Srgrimes#define MINPIPESIZE (PIPE_SIZE/3)
1121541Srgrimes#define MAXPIPESIZE (2*PIPE_SIZE/3)
1131541Srgrimes
1141541Srgrimes/*
1151541Srgrimes * Maximum amount of kva for pipes -- this is kind-of a soft limit, but
1161541Srgrimes * is there so that on large systems, we don't exhaust it.
1171541Srgrimes */
1181541Srgrimes#define MAXPIPEKVA (8*1024*1024)
1191541Srgrimes
1201541Srgrimes/*
1211541Srgrimes * Limit for direct transfers, we cannot, of course limit
1221541Srgrimes * the amount of kva for pipes in general though.
1231541Srgrimes */
1241541Srgrimes#define LIMITPIPEKVA (16*1024*1024)
1251541Srgrimesint amountpipekva;
1261541Srgrimes
1271541Srgrimesstatic void pipeclose __P((struct pipe *cpipe));
1281541Srgrimesstatic void pipeinit __P((struct pipe *cpipe));
1291541Srgrimesstatic __inline int pipelock __P((struct pipe *cpipe, int catch));
1301541Srgrimesstatic __inline void pipeunlock __P((struct pipe *cpipe));
1311541Srgrimesstatic __inline void pipeselwakeup __P((struct pipe *cpipe));
1321541Srgrimes#ifndef PIPE_NODIRECT
1331541Srgrimesstatic int pipe_build_write_buffer __P((struct pipe *wpipe, struct uio *uio));
1341541Srgrimesstatic void pipe_destroy_write_buffer __P((struct pipe *wpipe));
1351541Srgrimesstatic int pipe_direct_write __P((struct pipe *wpipe, struct uio *uio));
1361541Srgrimesstatic void pipe_clone_write_buffer __P((struct pipe *wpipe));
1371541Srgrimes#endif
1381541Srgrimesstatic int pipewrite __P((struct pipe *wpipe, struct uio *uio, int nbio));
1391541Srgrimesstatic void pipespace __P((struct pipe *cpipe));
1401541Srgrimes
1411541Srgrimes/*
1421541Srgrimes * The pipe system call for the DTYPE_PIPE type of pipes
1431541Srgrimes */
1441541Srgrimes
1451541Srgrimes/* ARGSUSED */
1461541Srgrimesint
1471541Srgrimespipe(p, uap, retval)
1481541Srgrimes	struct proc *p;
1491541Srgrimes	struct pipe_args /* {
1501541Srgrimes		int	dummy;
1511541Srgrimes	} */ *uap;
1521541Srgrimes	int retval[];
1531541Srgrimes{
1541541Srgrimes	register struct filedesc *fdp = p->p_fd;
1551541Srgrimes	struct file *rf, *wf;
1561541Srgrimes	struct pipe *rpipe, *wpipe;
1571541Srgrimes	int fd, error;
1581541Srgrimes
1591541Srgrimes	rpipe = malloc( sizeof (*rpipe), M_TEMP, M_WAITOK);
1601541Srgrimes	pipeinit(rpipe);
1611541Srgrimes	rpipe->pipe_state |= PIPE_DIRECTOK;
1621541Srgrimes	wpipe = malloc( sizeof (*wpipe), M_TEMP, M_WAITOK);
1631541Srgrimes	pipeinit(wpipe);
1641541Srgrimes	wpipe->pipe_state |= PIPE_DIRECTOK;
1651541Srgrimes
1661541Srgrimes	error = falloc(p, &rf, &fd);
1671541Srgrimes	if (error)
1681541Srgrimes		goto free2;
1691541Srgrimes	retval[0] = fd;
1701541Srgrimes	rf->f_flag = FREAD | FWRITE;
1711541Srgrimes	rf->f_type = DTYPE_PIPE;
1721541Srgrimes	rf->f_ops = &pipeops;
1731541Srgrimes	rf->f_data = (caddr_t)rpipe;
1741541Srgrimes	error = falloc(p, &wf, &fd);
1751541Srgrimes	if (error)
1761541Srgrimes		goto free3;
1771541Srgrimes	wf->f_flag = FREAD | FWRITE;
1781541Srgrimes	wf->f_type = DTYPE_PIPE;
1791541Srgrimes	wf->f_ops = &pipeops;
1801541Srgrimes	wf->f_data = (caddr_t)wpipe;
1811541Srgrimes	retval[1] = fd;
1821541Srgrimes
1831541Srgrimes	rpipe->pipe_peer = wpipe;
1841541Srgrimes	wpipe->pipe_peer = rpipe;
1851541Srgrimes
1861541Srgrimes	return (0);
1871541Srgrimesfree3:
1881541Srgrimes	ffree(rf);
1891541Srgrimes	fdp->fd_ofiles[retval[0]] = 0;
1901541Srgrimesfree2:
1911541Srgrimes	(void)pipeclose(wpipe);
1921541Srgrimesfree1:
1931541Srgrimes	(void)pipeclose(rpipe);
1941541Srgrimes	return (error);
1951541Srgrimes}
1961541Srgrimes
1971541Srgrimes/*
1981541Srgrimes * Allocate kva for pipe circular buffer, the space is pageable
1991541Srgrimes */
2001541Srgrimesstatic void
2011541Srgrimespipespace(cpipe)
2021541Srgrimes	struct pipe *cpipe;
2031541Srgrimes{
2041541Srgrimes	int npages, error;
2051541Srgrimes
2061541Srgrimes	npages = round_page(cpipe->pipe_buffer.size)/PAGE_SIZE;
2071541Srgrimes	/*
2081541Srgrimes	 * Create an object, I don't like the idea of paging to/from
2091541Srgrimes	 * kernel_object.
2101541Srgrimes	 * XXX -- minor change needed here for NetBSD/OpenBSD VM systems.
2111541Srgrimes	 */
2121541Srgrimes	cpipe->pipe_buffer.object = vm_object_allocate(OBJT_DEFAULT, npages);
2131541Srgrimes	cpipe->pipe_buffer.buffer = (caddr_t) vm_map_min(kernel_map);
2141541Srgrimes
2151541Srgrimes	/*
2161541Srgrimes	 * Insert the object into the kernel map, and allocate kva for it.
2171541Srgrimes	 * The map entry is, by default, pageable.
2181541Srgrimes	 * XXX -- minor change needed here for NetBSD/OpenBSD VM systems.
2191541Srgrimes	 */
2201541Srgrimes	error = vm_map_find(kernel_map, cpipe->pipe_buffer.object, 0,
2211541Srgrimes		(vm_offset_t *) &cpipe->pipe_buffer.buffer,
2221549Srgrimes		cpipe->pipe_buffer.size, 1,
2231541Srgrimes		VM_PROT_ALL, VM_PROT_ALL, 0);
2241541Srgrimes
2251541Srgrimes	if (error != KERN_SUCCESS)
2261541Srgrimes		panic("pipeinit: cannot allocate pipe -- out of kvm -- code = %d", error);
2271541Srgrimes	amountpipekva += cpipe->pipe_buffer.size;
2281541Srgrimes}
2291541Srgrimes
2301541Srgrimes/*
2311541Srgrimes * initialize and allocate VM and memory for pipe
2321541Srgrimes */
2331541Srgrimesstatic void
2341541Srgrimespipeinit(cpipe)
2351541Srgrimes	struct pipe *cpipe;
2361541Srgrimes{
2371541Srgrimes	int s;
2381541Srgrimes
2391541Srgrimes	cpipe->pipe_buffer.in = 0;
2401541Srgrimes	cpipe->pipe_buffer.out = 0;
2411541Srgrimes	cpipe->pipe_buffer.cnt = 0;
2421541Srgrimes	cpipe->pipe_buffer.size = PIPE_SIZE;
2431541Srgrimes	/* Buffer kva gets dynamically allocated */
2441541Srgrimes	cpipe->pipe_buffer.buffer = NULL;
2451541Srgrimes
2461541Srgrimes	cpipe->pipe_state = 0;
2471541Srgrimes	cpipe->pipe_peer = NULL;
2481541Srgrimes	cpipe->pipe_busy = 0;
2491541Srgrimes	s = splhigh();
250	cpipe->pipe_ctime = time;
251	cpipe->pipe_atime = time;
252	cpipe->pipe_mtime = time;
253	splx(s);
254	bzero(&cpipe->pipe_sel, sizeof cpipe->pipe_sel);
255
256#ifndef PIPE_NODIRECT
257	/*
258	 * pipe data structure initializations to support direct pipe I/O
259	 */
260	cpipe->pipe_map.cnt = 0;
261	cpipe->pipe_map.kva = 0;
262	cpipe->pipe_map.pos = 0;
263	cpipe->pipe_map.npages = 0;
264#endif
265}
266
267
268/*
269 * lock a pipe for I/O, blocking other access
270 */
271static __inline int
272pipelock(cpipe, catch)
273	struct pipe *cpipe;
274	int catch;
275{
276	int error;
277	while (cpipe->pipe_state & PIPE_LOCK) {
278		cpipe->pipe_state |= PIPE_LWANT;
279		if (error = tsleep( cpipe,
280			catch?(PRIBIO|PCATCH):PRIBIO, "pipelk", 0)) {
281			return error;
282		}
283	}
284	cpipe->pipe_state |= PIPE_LOCK;
285	return 0;
286}
287
288/*
289 * unlock a pipe I/O lock
290 */
291static __inline void
292pipeunlock(cpipe)
293	struct pipe *cpipe;
294{
295	cpipe->pipe_state &= ~PIPE_LOCK;
296	if (cpipe->pipe_state & PIPE_LWANT) {
297		cpipe->pipe_state &= ~PIPE_LWANT;
298		wakeup(cpipe);
299	}
300	return;
301}
302
303static __inline void
304pipeselwakeup(cpipe)
305	struct pipe *cpipe;
306{
307	if (cpipe->pipe_state & PIPE_SEL) {
308		cpipe->pipe_state &= ~PIPE_SEL;
309		selwakeup(&cpipe->pipe_sel);
310	}
311}
312
#ifndef PIPE_NODIRECT
#if 0
/*
 * Currently compiled out.  Walks the pipe buffer's VM object and, for
 * every resident page that is not busy, clears its dirty state and the
 * pmap modify bit -- presumably so drained buffer pages need not be
 * written to backing store.
 */
static void
pipe_mark_pages_clean(cpipe)
	struct pipe *cpipe;
{
	vm_size_t pidx;
	vm_page_t pg;

	for (pidx = 0; pidx < cpipe->pipe_buffer.object->size; pidx++) {
		pg = vm_page_lookup(cpipe->pipe_buffer.object, pidx);
		if (pg == NULL || pg->busy != 0 || (pg->flags & PG_BUSY) != 0)
			continue;
		pg->dirty = 0;
		pmap_clear_modify(VM_PAGE_TO_PHYS(pg));
	}
}
#endif
#endif
332
333/* ARGSUSED */
334static int
335pipe_read(fp, uio, cred)
336	struct file *fp;
337	struct uio *uio;
338	struct ucred *cred;
339{
340
341	struct pipe *rpipe = (struct pipe *) fp->f_data;
342	int error = 0;
343	int nread = 0;
344	int size;
345
346	++rpipe->pipe_busy;
347	while (uio->uio_resid) {
348		/*
349		 * normal pipe buffer receive
350		 */
351		if (rpipe->pipe_buffer.cnt > 0) {
352			int size = rpipe->pipe_buffer.size - rpipe->pipe_buffer.out;
353			if (size > rpipe->pipe_buffer.cnt)
354				size = rpipe->pipe_buffer.cnt;
355			if (size > uio->uio_resid)
356				size = uio->uio_resid;
357			if ((error = pipelock(rpipe,1)) == 0) {
358				error = uiomove( &rpipe->pipe_buffer.buffer[rpipe->pipe_buffer.out],
359					size, uio);
360				pipeunlock(rpipe);
361			}
362			if (error) {
363				break;
364			}
365			rpipe->pipe_buffer.out += size;
366			if (rpipe->pipe_buffer.out >= rpipe->pipe_buffer.size)
367				rpipe->pipe_buffer.out = 0;
368
369			rpipe->pipe_buffer.cnt -= size;
370			nread += size;
371#ifndef PIPE_NODIRECT
372		/*
373		 * Direct copy, bypassing a kernel buffer.
374		 */
375		} else if ((size = rpipe->pipe_map.cnt) &&
376			(rpipe->pipe_state & PIPE_DIRECTW)) {
377			caddr_t va;
378			if (size > uio->uio_resid)
379				size = uio->uio_resid;
380			if ((error = pipelock(rpipe,1)) == 0) {
381				va = (caddr_t) rpipe->pipe_map.kva + rpipe->pipe_map.pos;
382				error = uiomove(va, size, uio);
383				pipeunlock(rpipe);
384			}
385			if (error)
386				break;
387			nread += size;
388			rpipe->pipe_map.pos += size;
389			rpipe->pipe_map.cnt -= size;
390			if (rpipe->pipe_map.cnt == 0) {
391				rpipe->pipe_state &= ~PIPE_DIRECTW;
392				wakeup(rpipe);
393			}
394#endif
395		} else {
396			/*
397			 * detect EOF condition
398			 */
399			if (rpipe->pipe_state & PIPE_EOF) {
400				/* XXX error = ? */
401				break;
402			}
403			/*
404			 * If the "write-side" has been blocked, wake it up now.
405			 */
406			if (rpipe->pipe_state & PIPE_WANTW) {
407				rpipe->pipe_state &= ~PIPE_WANTW;
408				wakeup(rpipe);
409			}
410			if (nread > 0)
411				break;
412			if (rpipe->pipe_state & PIPE_NBIO) {
413				error = EAGAIN;
414				break;
415			}
416
417			/*
418			 * If there is no more to read in the pipe, reset
419			 * its pointers to the beginning.  This improves
420			 * cache hit stats.
421			 */
422
423			if ((error = pipelock(rpipe,1)) == 0) {
424				if (rpipe->pipe_buffer.cnt == 0) {
425					rpipe->pipe_buffer.in = 0;
426					rpipe->pipe_buffer.out = 0;
427				}
428				pipeunlock(rpipe);
429			} else {
430				break;
431			}
432
433			if (rpipe->pipe_state & PIPE_WANTW) {
434				rpipe->pipe_state &= ~PIPE_WANTW;
435				wakeup(rpipe);
436			}
437
438			rpipe->pipe_state |= PIPE_WANTR;
439			if (error = tsleep(rpipe, PRIBIO|PCATCH, "piperd", 0)) {
440				break;
441			}
442		}
443	}
444
445	if (error == 0) {
446		int s = splhigh();
447		rpipe->pipe_atime = time;
448		splx(s);
449	}
450
451	--rpipe->pipe_busy;
452	if ((rpipe->pipe_busy == 0) && (rpipe->pipe_state & PIPE_WANT)) {
453		rpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTW);
454		wakeup(rpipe);
455	} else if (rpipe->pipe_buffer.cnt < MINPIPESIZE) {
456		/*
457		 * If there is no more to read in the pipe, reset
458		 * its pointers to the beginning.  This improves
459		 * cache hit stats.
460		 */
461		if ((error == 0) && (error = pipelock(rpipe,1)) == 0) {
462			if (rpipe->pipe_buffer.cnt == 0) {
463#if 0
464				pipe_mark_pages_clean(rpipe);
465#endif
466				rpipe->pipe_buffer.in = 0;
467				rpipe->pipe_buffer.out = 0;
468			}
469			pipeunlock(rpipe);
470		}
471
472		/*
473		 * If the "write-side" has been blocked, wake it up now.
474		 */
475		if (rpipe->pipe_state & PIPE_WANTW) {
476			rpipe->pipe_state &= ~PIPE_WANTW;
477			wakeup(rpipe);
478		}
479	}
480
481	if ((rpipe->pipe_buffer.size - rpipe->pipe_buffer.cnt) >= PIPE_BUF)
482		pipeselwakeup(rpipe);
483
484	return error;
485}
486
487#ifndef PIPE_NODIRECT
488/*
489 * Map the sending processes' buffer into kernel space and wire it.
490 * This is similar to a physical write operation.
491 */
492static int
493pipe_build_write_buffer(wpipe, uio)
494	struct pipe *wpipe;
495	struct uio *uio;
496{
497	int size;
498	int i;
499	vm_offset_t addr, endaddr, paddr;
500
501	size = uio->uio_iov->iov_len;
502	if (size > wpipe->pipe_buffer.size)
503		size = wpipe->pipe_buffer.size;
504
505	endaddr = round_page(uio->uio_iov->iov_base + size);
506	for(i = 0, addr = trunc_page(uio->uio_iov->iov_base);
507		addr < endaddr;
508		addr += PAGE_SIZE, i+=1) {
509
510		vm_page_t m;
511
512		vm_fault_quick( (caddr_t) addr, VM_PROT_READ);
513		paddr = pmap_kextract(addr);
514		if (!paddr) {
515			int j;
516			for(j=0;j<i;j++)
517				vm_page_unwire(wpipe->pipe_map.ms[j]);
518			return EFAULT;
519		}
520
521		m = PHYS_TO_VM_PAGE(paddr);
522		vm_page_wire(m);
523		wpipe->pipe_map.ms[i] = m;
524	}
525
526/*
527 * set up the control block
528 */
529	wpipe->pipe_map.npages = i;
530	wpipe->pipe_map.pos = ((vm_offset_t) uio->uio_iov->iov_base) & PAGE_MASK;
531	wpipe->pipe_map.cnt = size;
532
533/*
534 * and map the buffer
535 */
536	if (wpipe->pipe_map.kva == 0) {
537		/*
538		 * We need to allocate space for an extra page because the
539		 * address range might (will) span pages at times.
540		 */
541		wpipe->pipe_map.kva = kmem_alloc_pageable(kernel_map,
542			wpipe->pipe_buffer.size + PAGE_SIZE);
543		amountpipekva += wpipe->pipe_buffer.size + PAGE_SIZE;
544	}
545	pmap_qenter(wpipe->pipe_map.kva, wpipe->pipe_map.ms,
546		wpipe->pipe_map.npages);
547
548/*
549 * and update the uio data
550 */
551
552	uio->uio_iov->iov_len -= size;
553	uio->uio_iov->iov_base += size;
554	if (uio->uio_iov->iov_len == 0)
555		uio->uio_iov++;
556	uio->uio_resid -= size;
557	uio->uio_offset += size;
558	return 0;
559}
560
561/*
562 * unmap and unwire the process buffer
563 */
564static void
565pipe_destroy_write_buffer(wpipe)
566struct pipe *wpipe;
567{
568	int i;
569	pmap_qremove(wpipe->pipe_map.kva, wpipe->pipe_map.npages);
570
571	if (wpipe->pipe_map.kva) {
572		if (amountpipekva > MAXPIPEKVA) {
573			vm_offset_t kva = wpipe->pipe_map.kva;
574			wpipe->pipe_map.kva = 0;
575			kmem_free(kernel_map, kva,
576				wpipe->pipe_buffer.size + PAGE_SIZE);
577			amountpipekva -= wpipe->pipe_buffer.size + PAGE_SIZE;
578		}
579	}
580	for (i=0;i<wpipe->pipe_map.npages;i++)
581		vm_page_unwire(wpipe->pipe_map.ms[i]);
582}
583
584/*
585 * In the case of a signal, the writing process might go away.  This
586 * code copies the data into the circular buffer so that the source
587 * pages can be freed without loss of data.
588 */
589static void
590pipe_clone_write_buffer(wpipe)
591struct pipe *wpipe;
592{
593	int size;
594	int pos;
595
596	size = wpipe->pipe_map.cnt;
597	pos = wpipe->pipe_map.pos;
598	bcopy((caddr_t) wpipe->pipe_map.kva+pos,
599			(caddr_t) wpipe->pipe_buffer.buffer,
600			size);
601
602	wpipe->pipe_buffer.in = size;
603	wpipe->pipe_buffer.out = 0;
604	wpipe->pipe_buffer.cnt = size;
605	wpipe->pipe_state &= ~PIPE_DIRECTW;
606
607	pipe_destroy_write_buffer(wpipe);
608}
609
610/*
611 * This implements the pipe buffer write mechanism.  Note that only
612 * a direct write OR a normal pipe write can be pending at any given time.
613 * If there are any characters in the pipe buffer, the direct write will
614 * be deferred until the receiving process grabs all of the bytes from
615 * the pipe buffer.  Then the direct mapping write is set-up.
616 */
617static int
618pipe_direct_write(wpipe, uio)
619	struct pipe *wpipe;
620	struct uio *uio;
621{
622	int error;
623retry:
624	while (wpipe->pipe_state & PIPE_DIRECTW) {
625		if ( wpipe->pipe_state & PIPE_WANTR) {
626			wpipe->pipe_state &= ~PIPE_WANTR;
627			wakeup(wpipe);
628		}
629		wpipe->pipe_state |= PIPE_WANTW;
630		error = tsleep(wpipe,
631				PRIBIO|PCATCH, "pipdww", 0);
632		if (error)
633			goto error1;
634		if (wpipe->pipe_state & PIPE_EOF) {
635			error = EPIPE;
636			goto error1;
637		}
638	}
639	wpipe->pipe_map.cnt = 0;	/* transfer not ready yet */
640	if (wpipe->pipe_buffer.cnt > 0) {
641		if ( wpipe->pipe_state & PIPE_WANTR) {
642			wpipe->pipe_state &= ~PIPE_WANTR;
643			wakeup(wpipe);
644		}
645
646		wpipe->pipe_state |= PIPE_WANTW;
647		error = tsleep(wpipe,
648				PRIBIO|PCATCH, "pipdwc", 0);
649		if (error)
650			goto error1;
651		if (wpipe->pipe_state & PIPE_EOF) {
652			error = EPIPE;
653			goto error1;
654		}
655		goto retry;
656	}
657
658	wpipe->pipe_state |= PIPE_DIRECTW;
659
660	error = pipe_build_write_buffer(wpipe, uio);
661	if (error) {
662		wpipe->pipe_state &= ~PIPE_DIRECTW;
663		goto error1;
664	}
665
666	error = 0;
667	while (!error && (wpipe->pipe_state & PIPE_DIRECTW)) {
668		if (wpipe->pipe_state & PIPE_EOF) {
669			pipelock(wpipe, 0);
670			pipe_destroy_write_buffer(wpipe);
671			pipeunlock(wpipe);
672			pipeselwakeup(wpipe);
673			error = EPIPE;
674			goto error1;
675		}
676		if (wpipe->pipe_state & PIPE_WANTR) {
677			wpipe->pipe_state &= ~PIPE_WANTR;
678			wakeup(wpipe);
679		}
680		pipeselwakeup(wpipe);
681		error = tsleep(wpipe, PRIBIO|PCATCH, "pipdwt", 0);
682	}
683
684	pipelock(wpipe,0);
685	if (wpipe->pipe_state & PIPE_DIRECTW) {
686		/*
687		 * this bit of trickery substitutes a kernel buffer for
688		 * the process that might be going away.
689		 */
690		pipe_clone_write_buffer(wpipe);
691	} else {
692		pipe_destroy_write_buffer(wpipe);
693	}
694	pipeunlock(wpipe);
695	return error;
696
697error1:
698	wakeup(wpipe);
699	return error;
700}
701#endif
702
703static __inline int
704pipewrite(wpipe, uio, nbio)
705	struct pipe *wpipe;
706	struct uio *uio;
707	int nbio;
708{
709	int error = 0;
710	int orig_resid;
711
712	/*
713	 * detect loss of pipe read side, issue SIGPIPE if lost.
714	 */
715	if (wpipe == NULL || (wpipe->pipe_state & PIPE_EOF)) {
716		return EPIPE;
717	}
718
719	if( wpipe->pipe_buffer.buffer == NULL) {
720		if ((error = pipelock(wpipe,1)) == 0) {
721			pipespace(wpipe);
722			pipeunlock(wpipe);
723		} else {
724			return error;
725		}
726	}
727
728	++wpipe->pipe_busy;
729	orig_resid = uio->uio_resid;
730	while (uio->uio_resid) {
731		int space;
732#ifndef PIPE_NODIRECT
733		/*
734		 * If the transfer is large, we can gain performance if
735		 * we do process-to-process copies directly.
736		 */
737		if ((amountpipekva < LIMITPIPEKVA) &&
738			(uio->uio_iov->iov_len >= PIPE_MINDIRECT)) {
739			error = pipe_direct_write( wpipe, uio);
740			if (error) {
741				break;
742			}
743			continue;
744		}
745#endif
746
747		/*
748		 * Pipe buffered writes cannot be coincidental with
749		 * direct writes.  We wait until the currently executing
750		 * direct write is completed before we start filling the
751		 * pipe buffer.
752		 */
753	retrywrite:
754		while (wpipe->pipe_state & PIPE_DIRECTW) {
755			if (wpipe->pipe_state & PIPE_WANTR) {
756				wpipe->pipe_state &= ~PIPE_WANTR;
757				wakeup(wpipe);
758			}
759			error = tsleep(wpipe,
760					PRIBIO|PCATCH, "pipbww", 0);
761			if (error)
762				break;
763		}
764
765		space = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;
766
767		/* Writes of size <= PIPE_BUF must be atomic. */
768		/* XXX perhaps they need to be contiguous to be atomic? */
769		if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
770			space = 0;
771
772		if (space > 0) {
773			int size = wpipe->pipe_buffer.size - wpipe->pipe_buffer.in;
774			if (size > space)
775				size = space;
776			if (size > uio->uio_resid)
777				size = uio->uio_resid;
778			if ((error = pipelock(wpipe,1)) == 0) {
779				/*
780				 * It is possible for a direct write to
781				 * slip in on us... handle it here...
782				 */
783				if (wpipe->pipe_state & PIPE_DIRECTW) {
784					pipeunlock(wpipe);
785					goto retrywrite;
786				}
787				error = uiomove( &wpipe->pipe_buffer.buffer[wpipe->pipe_buffer.in],
788					size, uio);
789				pipeunlock(wpipe);
790			}
791			if (error)
792				break;
793
794			wpipe->pipe_buffer.in += size;
795			if (wpipe->pipe_buffer.in >= wpipe->pipe_buffer.size)
796				wpipe->pipe_buffer.in = 0;
797
798			wpipe->pipe_buffer.cnt += size;
799		} else {
800			/*
801			 * If the "read-side" has been blocked, wake it up now.
802			 */
803			if (wpipe->pipe_state & PIPE_WANTR) {
804				wpipe->pipe_state &= ~PIPE_WANTR;
805				wakeup(wpipe);
806			}
807
808			/*
809			 * don't block on non-blocking I/O
810			 */
811			if (nbio) {
812				error = EAGAIN;
813				break;
814			}
815
816			/*
817			 * We have no more space and have something to offer,
818			 * wake up selects.
819			 */
820			pipeselwakeup(wpipe);
821
822			wpipe->pipe_state |= PIPE_WANTW;
823			if (error = tsleep(wpipe, (PRIBIO+1)|PCATCH, "pipewr", 0)) {
824				break;
825			}
826			/*
827			 * If read side wants to go away, we just issue a signal
828			 * to ourselves.
829			 */
830			if (wpipe->pipe_state & PIPE_EOF) {
831				error = EPIPE;
832				break;
833			}
834		}
835	}
836
837	--wpipe->pipe_busy;
838	if ((wpipe->pipe_busy == 0) &&
839		(wpipe->pipe_state & PIPE_WANT)) {
840		wpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTR);
841		wakeup(wpipe);
842	} else if (wpipe->pipe_buffer.cnt > 0) {
843		/*
844		 * If we have put any characters in the buffer, we wake up
845		 * the reader.
846		 */
847		if (wpipe->pipe_state & PIPE_WANTR) {
848			wpipe->pipe_state &= ~PIPE_WANTR;
849			wakeup(wpipe);
850		}
851	}
852
853	/*
854	 * Don't return EPIPE if I/O was successful
855	 */
856	if ((wpipe->pipe_buffer.cnt == 0) &&
857		(uio->uio_resid == 0) &&
858		(error == EPIPE))
859		error = 0;
860
861	if (error == 0) {
862		int s = splhigh();
863		wpipe->pipe_mtime = time;
864		splx(s);
865	}
866	/*
867	 * We have something to offer,
868	 * wake up select.
869	 */
870	if (wpipe->pipe_buffer.cnt)
871		pipeselwakeup(wpipe);
872
873	return error;
874}
875
876/* ARGSUSED */
877static int
878pipe_write(fp, uio, cred)
879	struct file *fp;
880	struct uio *uio;
881	struct ucred *cred;
882{
883	struct pipe *rpipe = (struct pipe *) fp->f_data;
884	struct pipe *wpipe = rpipe->pipe_peer;
885	return pipewrite(wpipe, uio, (rpipe->pipe_state & PIPE_NBIO)?1:0);
886}
887
888/*
889 * we implement a very minimal set of ioctls for compatibility with sockets.
890 */
891int
892pipe_ioctl(fp, cmd, data, p)
893	struct file *fp;
894	int cmd;
895	register caddr_t data;
896	struct proc *p;
897{
898	register struct pipe *mpipe = (struct pipe *)fp->f_data;
899
900	switch (cmd) {
901
902	case FIONBIO:
903		if (*(int *)data)
904			mpipe->pipe_state |= PIPE_NBIO;
905		else
906			mpipe->pipe_state &= ~PIPE_NBIO;
907		return (0);
908
909	case FIOASYNC:
910		if (*(int *)data) {
911			mpipe->pipe_state |= PIPE_ASYNC;
912		} else {
913			mpipe->pipe_state &= ~PIPE_ASYNC;
914		}
915		return (0);
916
917	case FIONREAD:
918		if (mpipe->pipe_state & PIPE_DIRECTW)
919			*(int *)data = mpipe->pipe_map.cnt;
920		else
921			*(int *)data = mpipe->pipe_buffer.cnt;
922		return (0);
923
924	case SIOCSPGRP:
925		mpipe->pipe_pgid = *(int *)data;
926		return (0);
927
928	case SIOCGPGRP:
929		*(int *)data = mpipe->pipe_pgid;
930		return (0);
931
932	}
933	return ENOSYS;
934}
935
936int
937pipe_select(fp, which, p)
938	struct file *fp;
939	int which;
940	struct proc *p;
941{
942	register struct pipe *rpipe = (struct pipe *)fp->f_data;
943	struct pipe *wpipe;
944
945	wpipe = rpipe->pipe_peer;
946	switch (which) {
947
948	case FREAD:
949		if ( (rpipe->pipe_state & PIPE_DIRECTW) ||
950			(rpipe->pipe_buffer.cnt > 0) ||
951			(rpipe->pipe_state & PIPE_EOF)) {
952			return (1);
953		}
954		selrecord(p, &rpipe->pipe_sel);
955		rpipe->pipe_state |= PIPE_SEL;
956		break;
957
958	case FWRITE:
959		if ((wpipe == NULL) ||
960			(wpipe->pipe_state & PIPE_EOF) ||
961			(((wpipe->pipe_state & PIPE_DIRECTW) == 0) &&
962			 (wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt) >= PIPE_BUF)) {
963			return (1);
964		}
965		selrecord(p, &wpipe->pipe_sel);
966		wpipe->pipe_state |= PIPE_SEL;
967		break;
968
969	case 0:
970		if ((rpipe->pipe_state & PIPE_EOF) ||
971			(wpipe == NULL) ||
972			(wpipe->pipe_state & PIPE_EOF)) {
973			return (1);
974		}
975
976		selrecord(p, &rpipe->pipe_sel);
977		rpipe->pipe_state |= PIPE_SEL;
978		break;
979	}
980	return (0);
981}
982
983int
984pipe_stat(pipe, ub)
985	register struct pipe *pipe;
986	register struct stat *ub;
987{
988	bzero((caddr_t)ub, sizeof (*ub));
989	ub->st_mode = S_IFSOCK;
990	ub->st_blksize = pipe->pipe_buffer.size;
991	ub->st_size = pipe->pipe_buffer.cnt;
992	ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
993	TIMEVAL_TO_TIMESPEC(&pipe->pipe_atime, &ub->st_atimespec);
994	TIMEVAL_TO_TIMESPEC(&pipe->pipe_mtime, &ub->st_mtimespec);
995	TIMEVAL_TO_TIMESPEC(&pipe->pipe_ctime, &ub->st_ctimespec);
996	return 0;
997}
998
999/* ARGSUSED */
1000static int
1001pipe_close(fp, p)
1002	struct file *fp;
1003	struct proc *p;
1004{
1005	struct pipe *cpipe = (struct pipe *)fp->f_data;
1006
1007	pipeclose(cpipe);
1008	fp->f_data = NULL;
1009	return 0;
1010}
1011
1012/*
1013 * shutdown the pipe
1014 */
1015static void
1016pipeclose(cpipe)
1017	struct pipe *cpipe;
1018{
1019	struct pipe *ppipe;
1020	if (cpipe) {
1021
1022		pipeselwakeup(cpipe);
1023
1024		/*
1025		 * If the other side is blocked, wake it up saying that
1026		 * we want to close it down.
1027		 */
1028		while (cpipe->pipe_busy) {
1029			wakeup(cpipe);
1030			cpipe->pipe_state |= PIPE_WANT|PIPE_EOF;
1031			tsleep(cpipe, PRIBIO, "pipecl", 0);
1032		}
1033
1034		/*
1035		 * Disconnect from peer
1036		 */
1037		if (ppipe = cpipe->pipe_peer) {
1038			pipeselwakeup(ppipe);
1039
1040			ppipe->pipe_state |= PIPE_EOF;
1041			wakeup(ppipe);
1042			ppipe->pipe_peer = NULL;
1043		}
1044
1045		/*
1046		 * free resources
1047		 */
1048		if (cpipe->pipe_buffer.buffer) {
1049			amountpipekva -= cpipe->pipe_buffer.size;
1050			kmem_free(kernel_map,
1051				(vm_offset_t)cpipe->pipe_buffer.buffer,
1052				cpipe->pipe_buffer.size);
1053		}
1054#ifndef PIPE_NODIRECT
1055		if (cpipe->pipe_map.kva) {
1056			amountpipekva -= cpipe->pipe_buffer.size + PAGE_SIZE;
1057			kmem_free(kernel_map,
1058				cpipe->pipe_map.kva,
1059				cpipe->pipe_buffer.size + PAGE_SIZE);
1060		}
1061#endif
1062		free(cpipe, M_TEMP);
1063	}
1064}
1065#endif
1066