sys_pipe.c revision 132579
1/*
2 * Copyright (c) 1996 John S. Dyson
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 * 1. Redistributions of source code must retain the above copyright
9 *    notice immediately at the beginning of the file, without modification,
10 *    this list of conditions, and the following disclaimer.
11 * 2. Redistributions in binary form must reproduce the above copyright
12 *    notice, this list of conditions and the following disclaimer in the
13 *    documentation and/or other materials provided with the distribution.
14 * 3. Absolutely no warranty of function or purpose is made by the author
15 *    John S. Dyson.
16 * 4. Modifications may be freely made to this file if the above conditions
17 *    are met.
18 */
19
20/*
21 * This file contains a high-performance replacement for the socket-based
22 * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
23 * all features of sockets, but does do everything that pipes normally
24 * do.
25 */
26
27/*
28 * This code has two modes of operation: a small write mode and a large
29 * write mode.  The small write mode acts like conventional pipes with
30 * a kernel buffer.  If the write is smaller than PIPE_MINDIRECT, then the
31 * "normal" pipe buffering is done.  If the write is between PIPE_MINDIRECT
32 * and PIPE_SIZE bytes, it is fully mapped and wired into the kernel, and
33 * the receiving process can copy it directly from the pages of the sending
34 * process.
35 *
36 * If the sending process receives a signal, it is possible that it will
37 * go away, and certainly its address space can change, because control
38 * is returned to the user-mode side.  In that case, the pipe code
39 * arranges to copy the buffer supplied by the user process to a pageable
40 * kernel buffer, and the receiving process will grab the data from the
41 * pageable kernel buffer.  Since signals don't happen all that often,
42 * the copy operation is normally eliminated.
43 *
44 * The constant PIPE_MINDIRECT is chosen to make sure that buffering will
45 * happen for small transfers so that the system will not spend all of
46 * its time context switching.
47 *
48 * In order to limit the resource use of pipes, two sysctls exist:
49 *
50 * kern.ipc.maxpipekva - This is a hard limit on the amount of pageable
51 * address space available to us in pipe_map.  Whenever the amount in use
52 * exceeds half of this value, all new pipes will be created with size
53 * SMALL_PIPE_SIZE, rather than PIPE_SIZE.  Big pipe creation will be limited
54 * as well.  This value is loader tunable only.
55 *
56 * These values are autotuned in subr_param.c.
57 *
58 * Memory usage may be monitored through the sysctls
59 * kern.ipc.pipes, kern.ipc.pipekva and kern.ipc.pipekvawired.
60 *
61 */
62
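/*
 * Illustrative sketch (assumed example, not part of this file): from
 * userland the choice between the buffered path and the direct
 * page-mapping path described above is invisible; it depends only on the
 * size of each write.  The sizes below are assumptions for illustration;
 * the actual thresholds are PIPE_MINDIRECT and PIPE_SIZE from sys/pipe.h,
 * and a large write may be handled by the direct path in one or more
 * chunks.
 */
#if 0	/* example only, not compiled */
#include <sys/types.h>
#include <sys/wait.h>
#include <err.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

int
main(void)
{
	int fds[2];
	char small[64];			/* small write: buffered path */
	char *big;
	size_t bigsize = 16 * 1024;	/* large write: candidate for direct path */

	if (pipe(fds) == -1)
		err(1, "pipe");
	if ((big = malloc(bigsize)) == NULL)
		err(1, "malloc");
	memset(small, 'a', sizeof(small));
	memset(big, 'b', bigsize);

	if (fork() == 0) {		/* child: drain the pipe */
		char buf[8192];

		close(fds[1]);
		while (read(fds[0], buf, sizeof(buf)) > 0)
			;
		_exit(0);
	}
	close(fds[0]);
	(void)write(fds[1], small, sizeof(small));	/* buffered write */
	(void)write(fds[1], big, bigsize);		/* may use direct write */
	close(fds[1]);
	wait(NULL);
	return (0);
}
#endif
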
63#include <sys/cdefs.h>
64__FBSDID("$FreeBSD: head/sys/kern/sys_pipe.c 132579 2004-07-23 14:11:04Z rwatson $");
65
66#include "opt_mac.h"
67
68#include <sys/param.h>
69#include <sys/systm.h>
70#include <sys/fcntl.h>
71#include <sys/file.h>
72#include <sys/filedesc.h>
73#include <sys/filio.h>
74#include <sys/kernel.h>
75#include <sys/lock.h>
76#include <sys/mac.h>
77#include <sys/mutex.h>
78#include <sys/ttycom.h>
79#include <sys/stat.h>
80#include <sys/malloc.h>
81#include <sys/poll.h>
82#include <sys/selinfo.h>
83#include <sys/signalvar.h>
84#include <sys/sysctl.h>
85#include <sys/sysproto.h>
86#include <sys/pipe.h>
87#include <sys/proc.h>
88#include <sys/vnode.h>
89#include <sys/uio.h>
90#include <sys/event.h>
91
92#include <vm/vm.h>
93#include <vm/vm_param.h>
94#include <vm/vm_object.h>
95#include <vm/vm_kern.h>
96#include <vm/vm_extern.h>
97#include <vm/pmap.h>
98#include <vm/vm_map.h>
99#include <vm/vm_page.h>
100#include <vm/uma.h>
101
102/*
103 * Use this define if you want to disable *fancy* VM things.  Expect an
104 * approx 30% decrease in transfer rate.  This could be useful for
105 * NetBSD or OpenBSD.
106 */
107/* #define PIPE_NODIRECT */
108
109/*
110 * interfaces to the outside world
111 */
112static fo_rdwr_t	pipe_read;
113static fo_rdwr_t	pipe_write;
114static fo_ioctl_t	pipe_ioctl;
115static fo_poll_t	pipe_poll;
116static fo_kqfilter_t	pipe_kqfilter;
117static fo_stat_t	pipe_stat;
118static fo_close_t	pipe_close;
119
120static struct fileops pipeops = {
121	.fo_read = pipe_read,
122	.fo_write = pipe_write,
123	.fo_ioctl = pipe_ioctl,
124	.fo_poll = pipe_poll,
125	.fo_kqfilter = pipe_kqfilter,
126	.fo_stat = pipe_stat,
127	.fo_close = pipe_close,
128	.fo_flags = DFLAG_PASSABLE
129};
130
131static void	filt_pipedetach(struct knote *kn);
132static int	filt_piperead(struct knote *kn, long hint);
133static int	filt_pipewrite(struct knote *kn, long hint);
134
135static struct filterops pipe_rfiltops =
136	{ 1, NULL, filt_pipedetach, filt_piperead };
137static struct filterops pipe_wfiltops =
138	{ 1, NULL, filt_pipedetach, filt_pipewrite };
139
140/*
141 * Default pipe buffer size(s); these can be fairly large now because pipe
142 * space is pageable.  The pipe code will try to maintain locality of
143 * reference for performance reasons, so small amounts of outstanding I/O
144 * will not wipe the cache.
145 */
146#define MINPIPESIZE (PIPE_SIZE/3)
147#define MAXPIPESIZE (2*PIPE_SIZE/3)
148
149/*
150 * Limit the number of "big" pipes
151 */
152#define LIMITBIGPIPES	32
153static int nbigpipe;
154
155static int amountpipes;
156static int amountpipekva;
157
158SYSCTL_DECL(_kern_ipc);
159
160SYSCTL_INT(_kern_ipc, OID_AUTO, maxpipekva, CTLFLAG_RDTUN,
161	   &maxpipekva, 0, "Pipe KVA limit");
162SYSCTL_INT(_kern_ipc, OID_AUTO, pipes, CTLFLAG_RD,
163	   &amountpipes, 0, "Current # of pipes");
164SYSCTL_INT(_kern_ipc, OID_AUTO, bigpipes, CTLFLAG_RD,
165	   &nbigpipe, 0, "Current # of big pipes");
166SYSCTL_INT(_kern_ipc, OID_AUTO, pipekva, CTLFLAG_RD,
167	   &amountpipekva, 0, "Pipe KVA usage");
168
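/*
 * Illustrative sketch (assumed example, not compiled into the kernel): the
 * counters exported above can be read from userland with sysctlbyname(3),
 * e.g. to watch pipe KVA consumption.  Equivalent to running
 * "sysctl kern.ipc.pipes kern.ipc.pipekva" from the shell.
 */
#if 0	/* example only */
#include <sys/types.h>
#include <sys/sysctl.h>
#include <stdio.h>

static void
show_pipe_usage(void)
{
	int pipes, kva;
	size_t len;

	len = sizeof(pipes);
	if (sysctlbyname("kern.ipc.pipes", &pipes, &len, NULL, 0) == 0)
		printf("pipes: %d\n", pipes);		/* current # of pipes */
	len = sizeof(kva);
	if (sysctlbyname("kern.ipc.pipekva", &kva, &len, NULL, 0) == 0)
		printf("pipe KVA in use: %d bytes\n", kva);
}
#endif
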
169static void pipeinit(void *dummy __unused);
170static void pipeclose(struct pipe *cpipe);
171static void pipe_free_kmem(struct pipe *cpipe);
172static int pipe_create(struct pipe *pipe);
173static __inline int pipelock(struct pipe *cpipe, int catch);
174static __inline void pipeunlock(struct pipe *cpipe);
175static __inline void pipeselwakeup(struct pipe *cpipe);
176#ifndef PIPE_NODIRECT
177static int pipe_build_write_buffer(struct pipe *wpipe, struct uio *uio);
178static void pipe_destroy_write_buffer(struct pipe *wpipe);
179static int pipe_direct_write(struct pipe *wpipe, struct uio *uio);
180static void pipe_clone_write_buffer(struct pipe *wpipe);
181#endif
182static int pipespace(struct pipe *cpipe, int size);
183static int pipespace_new(struct pipe *cpipe, int size);
184
185static void	pipe_zone_ctor(void *mem, int size, void *arg);
186static void	pipe_zone_dtor(void *mem, int size, void *arg);
187static void	pipe_zone_init(void *mem, int size);
188static void	pipe_zone_fini(void *mem, int size);
189
190static uma_zone_t pipe_zone;
191
192SYSINIT(vfs, SI_SUB_VFS, SI_ORDER_ANY, pipeinit, NULL);
193
194static void
195pipeinit(void *dummy __unused)
196{
197
198	pipe_zone = uma_zcreate("PIPE", sizeof(struct pipepair),
199	    pipe_zone_ctor, pipe_zone_dtor, pipe_zone_init, pipe_zone_fini,
200	    UMA_ALIGN_PTR, 0);
201	KASSERT(pipe_zone != NULL, ("pipe_zone not initialized"));
202}
203
204static void
205pipe_zone_ctor(void *mem, int size, void *arg)
206{
207	struct pipepair *pp;
208	struct pipe *rpipe, *wpipe;
209
210	KASSERT(size == sizeof(*pp), ("pipe_zone_ctor: wrong size"));
211
212	pp = (struct pipepair *)mem;
213
214	/*
215	 * We zero both pipe endpoints to make sure all the kmem pointers
216	 * are NULL, flag fields are zero'd, etc.  We timestamp both
217	 * endpoints with the same time.
218	 */
219	rpipe = &pp->pp_rpipe;
220	bzero(rpipe, sizeof(*rpipe));
221	vfs_timestamp(&rpipe->pipe_ctime);
222	rpipe->pipe_atime = rpipe->pipe_mtime = rpipe->pipe_ctime;
223
224	wpipe = &pp->pp_wpipe;
225	bzero(wpipe, sizeof(*wpipe));
226	wpipe->pipe_ctime = rpipe->pipe_ctime;
227	wpipe->pipe_atime = wpipe->pipe_mtime = rpipe->pipe_ctime;
228
229	rpipe->pipe_peer = wpipe;
230	rpipe->pipe_pair = pp;
231	wpipe->pipe_peer = rpipe;
232	wpipe->pipe_pair = pp;
233
234	/*
235	 * Mark both endpoints as present; they will later get freed
236	 * one at a time.  When both have been freed, the whole pair
237	 * is released.
238	 */
239	rpipe->pipe_present = 1;
240	wpipe->pipe_present = 1;
241
242	/*
243	 * Eventually, the MAC Framework may initialize the label
244	 * in ctor or init, but for now we do it elsewhere to avoid
245	 * blocking in ctor or init.
246	 */
247	pp->pp_label = NULL;
248
249	atomic_add_int(&amountpipes, 2);
250}
251
252static void
253pipe_zone_dtor(void *mem, int size, void *arg)
254{
255	struct pipepair *pp;
256
257	KASSERT(size == sizeof(*pp), ("pipe_zone_dtor: wrong size"));
258
259	pp = (struct pipepair *)mem;
260
261	atomic_subtract_int(&amountpipes, 2);
262}
263
264static void
265pipe_zone_init(void *mem, int size)
266{
267	struct pipepair *pp;
268
269	KASSERT(size == sizeof(*pp), ("pipe_zone_init: wrong size"));
270
271	pp = (struct pipepair *)mem;
272
273	mtx_init(&pp->pp_mtx, "pipe mutex", NULL, MTX_DEF | MTX_RECURSE);
274}
275
276static void
277pipe_zone_fini(void *mem, int size)
278{
279	struct pipepair *pp;
280
281	KASSERT(size == sizeof(*pp), ("pipe_zone_fini: wrong size"));
282
283	pp = (struct pipepair *)mem;
284
285	mtx_destroy(&pp->pp_mtx);
286}
287
288/*
289 * The pipe system call for the DTYPE_PIPE type of pipes.  If we fail,
290 * let the zone pick up the pieces via pipeclose().
291 */
292
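/*
 * Illustrative sketch of the userland view of the system call implemented
 * below (assumed example, not part of this file): pipe(2) fills fildes[0]
 * with the read side and fildes[1] with the write side, i.e. the two
 * td_retval slots set in pipe() below.
 */
#if 0	/* example only */
#include <err.h>
#include <unistd.h>

int
main(void)
{
	int fildes[2];
	char buf[8];

	if (pipe(fildes) == -1)
		err(1, "pipe");
	if (write(fildes[1], "hello", 5) != 5)		/* write side */
		err(1, "write");
	if (read(fildes[0], buf, sizeof(buf)) != 5)	/* read side */
		err(1, "read");
	close(fildes[0]);
	close(fildes[1]);
	return (0);
}
#endif
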
293/* ARGSUSED */
294int
295pipe(td, uap)
296	struct thread *td;
297	struct pipe_args /* {
298		int	dummy;
299	} */ *uap;
300{
301	struct filedesc *fdp = td->td_proc->p_fd;
302	struct file *rf, *wf;
303	struct pipepair *pp;
304	struct pipe *rpipe, *wpipe;
305	int fd, error;
306
307	pp = uma_zalloc(pipe_zone, M_WAITOK);
308#ifdef MAC
309	/*
310	 * The MAC label is shared between the connected endpoints.  As a
311	 * result mac_init_pipe() and mac_create_pipe() are called once
312	 * for the pair, and not on the endpoints.
313	 */
314	mac_init_pipe(pp);
315	mac_create_pipe(td->td_ucred, pp);
316#endif
317	rpipe = &pp->pp_rpipe;
318	wpipe = &pp->pp_wpipe;
319
320	if (pipe_create(rpipe) || pipe_create(wpipe)) {
321		pipeclose(rpipe);
322		pipeclose(wpipe);
323		return (ENFILE);
324	}
325
326	rpipe->pipe_state |= PIPE_DIRECTOK;
327	wpipe->pipe_state |= PIPE_DIRECTOK;
328
329	error = falloc(td, &rf, &fd);
330	if (error) {
331		pipeclose(rpipe);
332		pipeclose(wpipe);
333		return (error);
334	}
335	/* An extra reference on `rf' has been held for us by falloc(). */
336	td->td_retval[0] = fd;
337
338	/*
339	 * Warning: once we've gotten past allocation of the fd for the
340	 * read-side, we can only drop the read side via fdrop() in order
341	 * to avoid races against processes which manage to dup() the read
342	 * side while we are blocked trying to allocate the write side.
343	 */
344	FILE_LOCK(rf);
345	rf->f_flag = FREAD | FWRITE;
346	rf->f_type = DTYPE_PIPE;
347	rf->f_data = rpipe;
348	rf->f_ops = &pipeops;
349	FILE_UNLOCK(rf);
350	error = falloc(td, &wf, &fd);
351	if (error) {
352		FILEDESC_LOCK(fdp);
353		if (fdp->fd_ofiles[td->td_retval[0]] == rf) {
354			fdp->fd_ofiles[td->td_retval[0]] = NULL;
355			fdunused(fdp, td->td_retval[0]);
356			FILEDESC_UNLOCK(fdp);
357			fdrop(rf, td);
358		} else {
359			FILEDESC_UNLOCK(fdp);
360		}
361		fdrop(rf, td);
362		/* rpipe has been closed by fdrop(). */
363		pipeclose(wpipe);
364		return (error);
365	}
366	/* An extra reference on `wf' has been held for us by falloc(). */
367	FILE_LOCK(wf);
368	wf->f_flag = FREAD | FWRITE;
369	wf->f_type = DTYPE_PIPE;
370	wf->f_data = wpipe;
371	wf->f_ops = &pipeops;
372	FILE_UNLOCK(wf);
373	fdrop(wf, td);
374	td->td_retval[1] = fd;
375	fdrop(rf, td);
376
377	return (0);
378}
379
380/*
381 * Allocate kva for the pipe circular buffer; the space is pageable.
382 * This routine will 'realloc' the size of a pipe safely: if it fails,
383 * it will retain the old buffer.
384 * If it fails, it will return ENOMEM.
385 */
386static int
387pipespace_new(cpipe, size)
388	struct pipe *cpipe;
389	int size;
390{
391	caddr_t buffer;
392	int error;
393	static int curfail = 0;
394	static struct timeval lastfail;
395
396	KASSERT(!mtx_owned(PIPE_MTX(cpipe)), ("pipespace: pipe mutex locked"));
397
398	size = round_page(size);
399	/*
400	 * XXX -- minor change needed here for NetBSD/OpenBSD VM systems.
401	 */
402	buffer = (caddr_t) vm_map_min(pipe_map);
403
404	/*
405	 * The map entry is, by default, pageable.
406	 * XXX -- minor change needed here for NetBSD/OpenBSD VM systems.
407	 */
408	error = vm_map_find(pipe_map, NULL, 0,
409		(vm_offset_t *) &buffer, size, 1,
410		VM_PROT_ALL, VM_PROT_ALL, 0);
411	if (error != KERN_SUCCESS) {
412		if (ppsratecheck(&lastfail, &curfail, 1))
413			printf("kern.ipc.maxpipekva exceeded; see tuning(7)\n");
414		return (ENOMEM);
415	}
416
417	/* free old resources if we're resizing */
418	pipe_free_kmem(cpipe);
419	cpipe->pipe_buffer.buffer = buffer;
420	cpipe->pipe_buffer.size = size;
421	cpipe->pipe_buffer.in = 0;
422	cpipe->pipe_buffer.out = 0;
423	cpipe->pipe_buffer.cnt = 0;
424	atomic_add_int(&amountpipekva, cpipe->pipe_buffer.size);
425	return (0);
426}
427
428/*
429 * Wrapper for pipespace_new() that performs locking assertions.
430 */
431static int
432pipespace(cpipe, size)
433	struct pipe *cpipe;
434	int size;
435{
436
437	/*
438	 * XXXRW: Seems like we should really assert PIPE_LOCKFL on the
439	 * pipe_state here.
440	 */
441
442	return (pipespace_new(cpipe, size));
443}
444
445/*
446 * lock a pipe for I/O, blocking other access
447 */
448static __inline int
449pipelock(cpipe, catch)
450	struct pipe *cpipe;
451	int catch;
452{
453	int error;
454
455	PIPE_LOCK_ASSERT(cpipe, MA_OWNED);
456	while (cpipe->pipe_state & PIPE_LOCKFL) {
457		cpipe->pipe_state |= PIPE_LWANT;
458		error = msleep(cpipe, PIPE_MTX(cpipe),
459		    catch ? (PRIBIO | PCATCH) : PRIBIO,
460		    "pipelk", 0);
461		if (error != 0)
462			return (error);
463	}
464	cpipe->pipe_state |= PIPE_LOCKFL;
465	return (0);
466}
467
468/*
469 * unlock a pipe I/O lock
470 */
471static __inline void
472pipeunlock(cpipe)
473	struct pipe *cpipe;
474{
475
476	PIPE_LOCK_ASSERT(cpipe, MA_OWNED);
477	cpipe->pipe_state &= ~PIPE_LOCKFL;
478	if (cpipe->pipe_state & PIPE_LWANT) {
479		cpipe->pipe_state &= ~PIPE_LWANT;
480		wakeup(cpipe);
481	}
482}
483
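/*
 * Sketch of the locking pattern the two routines above are used in
 * (compare pipe_read() below): the pipe mutex protects the state flags,
 * while the long-term PIPE_LOCKFL lock serializes I/O so the mutex can be
 * dropped around uiomove().  Hypothetical caller for illustration only.
 */
#if 0	/* example only, hypothetical caller */
static int
pipe_io_example(struct pipe *cpipe)
{
	int error;

	PIPE_LOCK(cpipe);
	error = pipelock(cpipe, 1);	/* may sleep in "pipelk", catches signals */
	if (error == 0) {
		PIPE_UNLOCK(cpipe);
		/* ... move data with uiomove() while the mutex is dropped ... */
		PIPE_LOCK(cpipe);
		pipeunlock(cpipe);	/* clear PIPE_LOCKFL, wake PIPE_LWANT waiters */
	}
	PIPE_UNLOCK(cpipe);
	return (error);
}
#endif
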
484static __inline void
485pipeselwakeup(cpipe)
486	struct pipe *cpipe;
487{
488
489	PIPE_LOCK_ASSERT(cpipe, MA_OWNED);
490	if (cpipe->pipe_state & PIPE_SEL) {
491		cpipe->pipe_state &= ~PIPE_SEL;
492		selwakeuppri(&cpipe->pipe_sel, PSOCK);
493	}
494	if ((cpipe->pipe_state & PIPE_ASYNC) && cpipe->pipe_sigio)
495		pgsigio(&cpipe->pipe_sigio, SIGIO, 0);
496	KNOTE(&cpipe->pipe_sel.si_note, 0);
497}
498
499/*
500 * Initialize and allocate VM and memory for pipe.  The structure
501 * will start out zero'd from the ctor, so we just manage the kmem.
502 */
503static int
504pipe_create(pipe)
505	struct pipe *pipe;
506{
507	int error;
508
509	/*
510	 * Reduce to 1/4th pipe size if we're over our global max.
511	 */
512	if (amountpipekva > maxpipekva / 2)
513		error = pipespace(pipe, SMALL_PIPE_SIZE);
514	else
515		error = pipespace(pipe, PIPE_SIZE);
516	return (error);
517}
518
519/* ARGSUSED */
520static int
521pipe_read(fp, uio, active_cred, flags, td)
522	struct file *fp;
523	struct uio *uio;
524	struct ucred *active_cred;
525	struct thread *td;
526	int flags;
527{
528	struct pipe *rpipe = fp->f_data;
529	int error;
530	int nread = 0;
531	u_int size;
532
533	PIPE_LOCK(rpipe);
534	++rpipe->pipe_busy;
535	error = pipelock(rpipe, 1);
536	if (error)
537		goto unlocked_error;
538
539#ifdef MAC
540	error = mac_check_pipe_read(active_cred, rpipe->pipe_pair);
541	if (error)
542		goto locked_error;
543#endif
544
545	while (uio->uio_resid) {
546		/*
547		 * normal pipe buffer receive
548		 */
549		if (rpipe->pipe_buffer.cnt > 0) {
550			size = rpipe->pipe_buffer.size - rpipe->pipe_buffer.out;
551			if (size > rpipe->pipe_buffer.cnt)
552				size = rpipe->pipe_buffer.cnt;
553			if (size > (u_int) uio->uio_resid)
554				size = (u_int) uio->uio_resid;
555
556			PIPE_UNLOCK(rpipe);
557			error = uiomove(
558			    &rpipe->pipe_buffer.buffer[rpipe->pipe_buffer.out],
559			    size, uio);
560			PIPE_LOCK(rpipe);
561			if (error)
562				break;
563
564			rpipe->pipe_buffer.out += size;
565			if (rpipe->pipe_buffer.out >= rpipe->pipe_buffer.size)
566				rpipe->pipe_buffer.out = 0;
567
568			rpipe->pipe_buffer.cnt -= size;
569
570			/*
571			 * If there is no more to read in the pipe, reset
572			 * its pointers to the beginning.  This improves
573			 * cache hit stats.
574			 */
575			if (rpipe->pipe_buffer.cnt == 0) {
576				rpipe->pipe_buffer.in = 0;
577				rpipe->pipe_buffer.out = 0;
578			}
579			nread += size;
580#ifndef PIPE_NODIRECT
581		/*
582		 * Direct copy, bypassing a kernel buffer.
583		 */
584		} else if ((size = rpipe->pipe_map.cnt) &&
585			   (rpipe->pipe_state & PIPE_DIRECTW)) {
586			if (size > (u_int) uio->uio_resid)
587				size = (u_int) uio->uio_resid;
588
589			PIPE_UNLOCK(rpipe);
590			error = uiomove_fromphys(rpipe->pipe_map.ms,
591			    rpipe->pipe_map.pos, size, uio);
592			PIPE_LOCK(rpipe);
593			if (error)
594				break;
595			nread += size;
596			rpipe->pipe_map.pos += size;
597			rpipe->pipe_map.cnt -= size;
598			if (rpipe->pipe_map.cnt == 0) {
599				rpipe->pipe_state &= ~PIPE_DIRECTW;
600				wakeup(rpipe);
601			}
602#endif
603		} else {
604			/*
605			 * detect EOF condition
606			 * read returns 0 on EOF, no need to set error
607			 */
608			if (rpipe->pipe_state & PIPE_EOF)
609				break;
610
611			/*
612			 * If the "write-side" has been blocked, wake it up now.
613			 */
614			if (rpipe->pipe_state & PIPE_WANTW) {
615				rpipe->pipe_state &= ~PIPE_WANTW;
616				wakeup(rpipe);
617			}
618
619			/*
620			 * Break if some data was read.
621			 */
622			if (nread > 0)
623				break;
624
625			/*
626			 * Unlock the pipe buffer for our remaining processing.
627			 * We will either break out with an error or we will
628			 * sleep and relock to loop.
629			 */
630			pipeunlock(rpipe);
631
632			/*
633			 * Handle non-blocking mode operation or
634			 * wait for more data.
635			 */
636			if (fp->f_flag & FNONBLOCK) {
637				error = EAGAIN;
638			} else {
639				rpipe->pipe_state |= PIPE_WANTR;
640				if ((error = msleep(rpipe, PIPE_MTX(rpipe),
641				    PRIBIO | PCATCH,
642				    "piperd", 0)) == 0)
643					error = pipelock(rpipe, 1);
644			}
645			if (error)
646				goto unlocked_error;
647		}
648	}
649#ifdef MAC
650locked_error:
651#endif
652	pipeunlock(rpipe);
653
654	/* XXX: should probably do this before getting any locks. */
655	if (error == 0)
656		vfs_timestamp(&rpipe->pipe_atime);
657unlocked_error:
658	--rpipe->pipe_busy;
659
660	/*
661	 * PIPE_WANT processing only makes sense if pipe_busy is 0.
662	 */
663	if ((rpipe->pipe_busy == 0) && (rpipe->pipe_state & PIPE_WANT)) {
664		rpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTW);
665		wakeup(rpipe);
666	} else if (rpipe->pipe_buffer.cnt < MINPIPESIZE) {
667		/*
668		 * Handle write blocking hysteresis.
669		 */
670		if (rpipe->pipe_state & PIPE_WANTW) {
671			rpipe->pipe_state &= ~PIPE_WANTW;
672			wakeup(rpipe);
673		}
674	}
675
676	if ((rpipe->pipe_buffer.size - rpipe->pipe_buffer.cnt) >= PIPE_BUF)
677		pipeselwakeup(rpipe);
678
679	PIPE_UNLOCK(rpipe);
680	return (error);
681}
682
683#ifndef PIPE_NODIRECT
684/*
685 * Map the sending process's buffer into kernel space and wire it.
686 * This is similar to a physical write operation.
687 */
688static int
689pipe_build_write_buffer(wpipe, uio)
690	struct pipe *wpipe;
691	struct uio *uio;
692{
693	pmap_t pmap;
694	u_int size;
695	int i, j;
696	vm_offset_t addr, endaddr;
697
698	PIPE_LOCK_ASSERT(wpipe, MA_NOTOWNED);
699
700	size = (u_int) uio->uio_iov->iov_len;
701	if (size > wpipe->pipe_buffer.size)
702		size = wpipe->pipe_buffer.size;
703
704	pmap = vmspace_pmap(curproc->p_vmspace);
705	endaddr = round_page((vm_offset_t)uio->uio_iov->iov_base + size);
706	addr = trunc_page((vm_offset_t)uio->uio_iov->iov_base);
707	for (i = 0; addr < endaddr; addr += PAGE_SIZE, i++) {
708		/*
709		 * vm_fault_quick() can sleep.  Consequently,
710		 * vm_page_lock_queue() and vm_page_unlock_queue()
711		 * should not be performed outside of this loop.
712		 */
713	race:
714		if (vm_fault_quick((caddr_t)addr, VM_PROT_READ) < 0) {
715			vm_page_lock_queues();
716			for (j = 0; j < i; j++)
717				vm_page_unhold(wpipe->pipe_map.ms[j]);
718			vm_page_unlock_queues();
719			return (EFAULT);
720		}
721		wpipe->pipe_map.ms[i] = pmap_extract_and_hold(pmap, addr,
722		    VM_PROT_READ);
723		if (wpipe->pipe_map.ms[i] == NULL)
724			goto race;
725	}
726
727/*
728 * set up the control block
729 */
730	wpipe->pipe_map.npages = i;
731	wpipe->pipe_map.pos =
732	    ((vm_offset_t) uio->uio_iov->iov_base) & PAGE_MASK;
733	wpipe->pipe_map.cnt = size;
734
735/*
736 * and update the uio data
737 */
738
739	uio->uio_iov->iov_len -= size;
740	uio->uio_iov->iov_base = (char *)uio->uio_iov->iov_base + size;
741	if (uio->uio_iov->iov_len == 0)
742		uio->uio_iov++;
743	uio->uio_resid -= size;
744	uio->uio_offset += size;
745	return (0);
746}
747
748/*
749 * unmap and unwire the process buffer
750 */
751static void
752pipe_destroy_write_buffer(wpipe)
753	struct pipe *wpipe;
754{
755	int i;
756
757	PIPE_LOCK_ASSERT(wpipe, MA_OWNED);
758	vm_page_lock_queues();
759	for (i = 0; i < wpipe->pipe_map.npages; i++) {
760		vm_page_unhold(wpipe->pipe_map.ms[i]);
761	}
762	vm_page_unlock_queues();
763	wpipe->pipe_map.npages = 0;
764}
765
766/*
767 * In the case of a signal, the writing process might go away.  This
768 * code copies the data into the circular buffer so that the source
769 * pages can be freed without loss of data.
770 */
771static void
772pipe_clone_write_buffer(wpipe)
773	struct pipe *wpipe;
774{
775	struct uio uio;
776	struct iovec iov;
777	int size;
778	int pos;
779
780	PIPE_LOCK_ASSERT(wpipe, MA_OWNED);
781	size = wpipe->pipe_map.cnt;
782	pos = wpipe->pipe_map.pos;
783
784	wpipe->pipe_buffer.in = size;
785	wpipe->pipe_buffer.out = 0;
786	wpipe->pipe_buffer.cnt = size;
787	wpipe->pipe_state &= ~PIPE_DIRECTW;
788
789	PIPE_UNLOCK(wpipe);
790	iov.iov_base = wpipe->pipe_buffer.buffer;
791	iov.iov_len = size;
792	uio.uio_iov = &iov;
793	uio.uio_iovcnt = 1;
794	uio.uio_offset = 0;
795	uio.uio_resid = size;
796	uio.uio_segflg = UIO_SYSSPACE;
797	uio.uio_rw = UIO_READ;
798	uio.uio_td = curthread;
799	uiomove_fromphys(wpipe->pipe_map.ms, pos, size, &uio);
800	PIPE_LOCK(wpipe);
801	pipe_destroy_write_buffer(wpipe);
802}
803
804/*
805 * This implements the pipe buffer write mechanism.  Note that only
806 * a direct write OR a normal pipe write can be pending at any given time.
807 * If there are any characters in the pipe buffer, the direct write will
808 * be deferred until the receiving process grabs all of the bytes from
809 * the pipe buffer.  Then the direct mapping write is set up.
810 */
811static int
812pipe_direct_write(wpipe, uio)
813	struct pipe *wpipe;
814	struct uio *uio;
815{
816	int error;
817
818retry:
819	PIPE_LOCK_ASSERT(wpipe, MA_OWNED);
820	while (wpipe->pipe_state & PIPE_DIRECTW) {
821		if (wpipe->pipe_state & PIPE_WANTR) {
822			wpipe->pipe_state &= ~PIPE_WANTR;
823			wakeup(wpipe);
824		}
825		wpipe->pipe_state |= PIPE_WANTW;
826		error = msleep(wpipe, PIPE_MTX(wpipe),
827		    PRIBIO | PCATCH, "pipdww", 0);
828		if (error)
829			goto error1;
830		if (wpipe->pipe_state & PIPE_EOF) {
831			error = EPIPE;
832			goto error1;
833		}
834	}
835	wpipe->pipe_map.cnt = 0;	/* transfer not ready yet */
836	if (wpipe->pipe_buffer.cnt > 0) {
837		if (wpipe->pipe_state & PIPE_WANTR) {
838			wpipe->pipe_state &= ~PIPE_WANTR;
839			wakeup(wpipe);
840		}
841
842		wpipe->pipe_state |= PIPE_WANTW;
843		error = msleep(wpipe, PIPE_MTX(wpipe),
844		    PRIBIO | PCATCH, "pipdwc", 0);
845		if (error)
846			goto error1;
847		if (wpipe->pipe_state & PIPE_EOF) {
848			error = EPIPE;
849			goto error1;
850		}
851		goto retry;
852	}
853
854	wpipe->pipe_state |= PIPE_DIRECTW;
855
856	pipelock(wpipe, 0);
857	if (wpipe->pipe_state & PIPE_EOF) {
858		error = EPIPE;
859		goto error2;
860	}
861	PIPE_UNLOCK(wpipe);
862	error = pipe_build_write_buffer(wpipe, uio);
863	PIPE_LOCK(wpipe);
864	pipeunlock(wpipe);
865	if (error) {
866		wpipe->pipe_state &= ~PIPE_DIRECTW;
867		goto error1;
868	}
869
870	error = 0;
871	while (!error && (wpipe->pipe_state & PIPE_DIRECTW)) {
872		if (wpipe->pipe_state & PIPE_EOF) {
873			pipelock(wpipe, 0);
874			pipe_destroy_write_buffer(wpipe);
875			pipeselwakeup(wpipe);
876			pipeunlock(wpipe);
877			error = EPIPE;
878			goto error1;
879		}
880		if (wpipe->pipe_state & PIPE_WANTR) {
881			wpipe->pipe_state &= ~PIPE_WANTR;
882			wakeup(wpipe);
883		}
884		pipeselwakeup(wpipe);
885		error = msleep(wpipe, PIPE_MTX(wpipe), PRIBIO | PCATCH,
886		    "pipdwt", 0);
887	}
888
889	pipelock(wpipe,0);
890	if (wpipe->pipe_state & PIPE_EOF)
891		error = EPIPE;
892	if (wpipe->pipe_state & PIPE_DIRECTW) {
893		/*
894		 * this bit of trickery substitutes a kernel buffer for
895		 * the process that might be going away.
896		 */
897		pipe_clone_write_buffer(wpipe);
898	} else {
899		pipe_destroy_write_buffer(wpipe);
900	}
901error2:
902	pipeunlock(wpipe);
903	return (error);
904
905error1:
906	wakeup(wpipe);
907	return (error);
908}
909#endif
910
911static int
912pipe_write(fp, uio, active_cred, flags, td)
913	struct file *fp;
914	struct uio *uio;
915	struct ucred *active_cred;
916	struct thread *td;
917	int flags;
918{
919	int error = 0;
920	int orig_resid;
921	struct pipe *wpipe, *rpipe;
922
923	rpipe = fp->f_data;
924	wpipe = rpipe->pipe_peer;
925
926	PIPE_LOCK(rpipe);
927	/*
928	 * detect loss of pipe read side, issue SIGPIPE if lost.
929	 */
930	if ((!wpipe->pipe_present) || (wpipe->pipe_state & PIPE_EOF)) {
931		PIPE_UNLOCK(rpipe);
932		return (EPIPE);
933	}
934#ifdef MAC
935	error = mac_check_pipe_write(active_cred, wpipe->pipe_pair);
936	if (error) {
937		PIPE_UNLOCK(rpipe);
938		return (error);
939	}
940#endif
941	++wpipe->pipe_busy;
942
943	/*
944	 * If it is advantageous to resize the pipe buffer, do
945	 * so.
946	 */
947	if ((uio->uio_resid > PIPE_SIZE) &&
948		(amountpipekva < maxpipekva / 2) &&
949		(nbigpipe < LIMITBIGPIPES) &&
950		(wpipe->pipe_state & PIPE_DIRECTW) == 0 &&
951		(wpipe->pipe_buffer.size <= PIPE_SIZE) &&
952		(wpipe->pipe_buffer.cnt == 0)) {
953
954		if ((error = pipelock(wpipe, 1)) == 0) {
955			if (wpipe->pipe_state & PIPE_EOF)
956				error = EPIPE;
957			else {
958				PIPE_UNLOCK(wpipe);
959				if (pipespace(wpipe, BIG_PIPE_SIZE) == 0)
960					atomic_add_int(&nbigpipe, 1);
961				PIPE_LOCK(wpipe);
962			}
963			pipeunlock(wpipe);
964		}
965	}
966
967	/*
968	 * If an early error occurred, unbusy and return, waking up any pending
969	 * readers.
970	 */
971	if (error) {
972		--wpipe->pipe_busy;
973		if ((wpipe->pipe_busy == 0) &&
974		    (wpipe->pipe_state & PIPE_WANT)) {
975			wpipe->pipe_state &= ~(PIPE_WANT | PIPE_WANTR);
976			wakeup(wpipe);
977		}
978		PIPE_UNLOCK(rpipe);
979		return(error);
980	}
981
982	orig_resid = uio->uio_resid;
983
984	while (uio->uio_resid) {
985		int space;
986
987#ifndef PIPE_NODIRECT
988		/*
989		 * If the transfer is large, we can gain performance if
990		 * we do process-to-process copies directly.
991		 * If the write is non-blocking, we don't use the
992		 * direct write mechanism.
993		 *
994		 * The direct write mechanism will detect the reader going
995		 * away on us.
996		 */
997		if ((uio->uio_iov->iov_len >= PIPE_MINDIRECT) &&
998		    (fp->f_flag & FNONBLOCK) == 0) {
999			error = pipe_direct_write(wpipe, uio);
1000			if (error)
1001				break;
1002			continue;
1003		}
1004#endif
1005
1006		/*
1007		 * Pipe buffered writes cannot be coincident with
1008		 * direct writes.  We wait until the currently executing
1009		 * direct write is completed before we start filling the
1010		 * pipe buffer.  We break out if a signal occurs or the
1011		 * reader goes away.
1012		 */
1013	retrywrite:
1014		while (wpipe->pipe_state & PIPE_DIRECTW) {
1015			if (wpipe->pipe_state & PIPE_WANTR) {
1016				wpipe->pipe_state &= ~PIPE_WANTR;
1017				wakeup(wpipe);
1018			}
1019			error = msleep(wpipe, PIPE_MTX(rpipe), PRIBIO | PCATCH,
1020			    "pipbww", 0);
1021			if (wpipe->pipe_state & PIPE_EOF) {
1022				error = EPIPE;
1023				break;
1024			}
1025			if (error)
1026				break;
1027		}
1028
1029		space = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;
1030
1031		/* Writes of size <= PIPE_BUF must be atomic. */
1032		if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
1033			space = 0;
1034
1035		if (space > 0) {
1036			if ((error = pipelock(wpipe,1)) == 0) {
1037				int size;	/* Transfer size */
1038				int segsize;	/* first segment to transfer */
1039
1040				/*
1041				 * It is possible for a direct write/EOF to
1042				 * slip in on us... handle them here...
1043				 */
1044				if (wpipe->pipe_state & PIPE_EOF)
1045					goto lost_wpipe;
1046				if (wpipe->pipe_state & PIPE_DIRECTW) {
1047					pipeunlock(wpipe);
1048					goto retrywrite;
1049				}
1050				/*
1051				 * If a process blocked in uiomove, our
1052				 * value for space might be bad.
1053				 *
1054				 * XXX will we be ok if the reader has gone
1055				 * away here?
1056				 */
1057				if (space > wpipe->pipe_buffer.size -
1058				    wpipe->pipe_buffer.cnt) {
1059					pipeunlock(wpipe);
1060					goto retrywrite;
1061				}
1062
1063				/*
1064				 * Transfer size is minimum of uio transfer
1065				 * and free space in pipe buffer.
1066				 */
1067				if (space > uio->uio_resid)
1068					size = uio->uio_resid;
1069				else
1070					size = space;
1071				/*
1072				 * First segment to transfer is minimum of
1073				 * transfer size and contiguous space in
1074				 * pipe buffer.  If first segment to transfer
1075				 * is less than the transfer size, we've got
1076				 * a wraparound in the buffer.
1077				 */
1078				segsize = wpipe->pipe_buffer.size -
1079					wpipe->pipe_buffer.in;
1080				if (segsize > size)
1081					segsize = size;
1082
1083				/* Transfer first segment */
1084
1085				PIPE_UNLOCK(rpipe);
1086				error = uiomove(&wpipe->pipe_buffer.buffer[wpipe->pipe_buffer.in],
1087						segsize, uio);
1088				PIPE_LOCK(rpipe);
1089
1090				if (error == 0 && segsize < size) {
1091					/*
1092					 * Transfer remaining part now, to
1093					 * support atomic writes.  Wraparound
1094					 * happened.
1095					 */
1096					if (wpipe->pipe_buffer.in + segsize !=
1097					    wpipe->pipe_buffer.size)
1098						panic("Expected pipe buffer "
1099						    "wraparound disappeared");
1100
1101					PIPE_UNLOCK(rpipe);
1102					error = uiomove(
1103					    &wpipe->pipe_buffer.buffer[0],
1104					    size - segsize, uio);
1105					PIPE_LOCK(rpipe);
1106				}
1107				if (error == 0) {
1108					wpipe->pipe_buffer.in += size;
1109					if (wpipe->pipe_buffer.in >=
1110					    wpipe->pipe_buffer.size) {
1111						if (wpipe->pipe_buffer.in !=
1112						    size - segsize +
1113						    wpipe->pipe_buffer.size)
1114							panic("Expected "
1115							    "wraparound bad");
1116						wpipe->pipe_buffer.in = size -
1117						    segsize;
1118					}
1119
1120					wpipe->pipe_buffer.cnt += size;
1121					if (wpipe->pipe_buffer.cnt >
1122					    wpipe->pipe_buffer.size)
1123						panic("Pipe buffer overflow");
1124
1125				}
1126lost_wpipe:
1127				pipeunlock(wpipe);
1128			}
1129			if (error)
1130				break;
1131
1132		} else {
1133			/*
1134			 * If the "read-side" has been blocked, wake it up now.
1135			 */
1136			if (wpipe->pipe_state & PIPE_WANTR) {
1137				wpipe->pipe_state &= ~PIPE_WANTR;
1138				wakeup(wpipe);
1139			}
1140
1141			/*
1142			 * don't block on non-blocking I/O
1143			 */
1144			if (fp->f_flag & FNONBLOCK) {
1145				error = EAGAIN;
1146				break;
1147			}
1148
1149			/*
1150			 * We have no more space and have something to offer,
1151			 * wake up select/poll.
1152			 */
1153			pipeselwakeup(wpipe);
1154
1155			wpipe->pipe_state |= PIPE_WANTW;
1156			error = msleep(wpipe, PIPE_MTX(rpipe),
1157			    PRIBIO | PCATCH, "pipewr", 0);
1158			if (error != 0)
1159				break;
1160			/*
1161			 * If read side wants to go away, we just issue a signal
1162			 * to ourselves.
1163			 */
1164			if (wpipe->pipe_state & PIPE_EOF) {
1165				error = EPIPE;
1166				break;
1167			}
1168		}
1169	}
1170
1171	--wpipe->pipe_busy;
1172
1173	if ((wpipe->pipe_busy == 0) && (wpipe->pipe_state & PIPE_WANT)) {
1174		wpipe->pipe_state &= ~(PIPE_WANT | PIPE_WANTR);
1175		wakeup(wpipe);
1176	} else if (wpipe->pipe_buffer.cnt > 0) {
1177		/*
1178		 * If we have put any characters in the buffer, we wake up
1179		 * the reader.
1180		 */
1181		if (wpipe->pipe_state & PIPE_WANTR) {
1182			wpipe->pipe_state &= ~PIPE_WANTR;
1183			wakeup(wpipe);
1184		}
1185	}
1186
1187	/*
1188	 * Don't return EPIPE if I/O was successful
1189	 */
1190	if ((wpipe->pipe_buffer.cnt == 0) &&
1191	    (uio->uio_resid == 0) &&
1192	    (error == EPIPE)) {
1193		error = 0;
1194	}
1195
1196	if (error == 0)
1197		vfs_timestamp(&wpipe->pipe_mtime);
1198
1199	/*
1200	 * We have something to offer,
1201	 * wake up select/poll.
1202	 */
1203	if (wpipe->pipe_buffer.cnt)
1204		pipeselwakeup(wpipe);
1205
1206	PIPE_UNLOCK(rpipe);
1207	return (error);
1208}
1209
1210/*
1211 * we implement a very minimal set of ioctls for compatibility with sockets.
1212 */
1213static int
1214pipe_ioctl(fp, cmd, data, active_cred, td)
1215	struct file *fp;
1216	u_long cmd;
1217	void *data;
1218	struct ucred *active_cred;
1219	struct thread *td;
1220{
1221	struct pipe *mpipe = fp->f_data;
1222#ifdef MAC
1223	int error;
1224#endif
1225
1226	PIPE_LOCK(mpipe);
1227
1228#ifdef MAC
1229	error = mac_check_pipe_ioctl(active_cred, mpipe->pipe_pair, cmd, data);
1230	if (error) {
1231		PIPE_UNLOCK(mpipe);
1232		return (error);
1233	}
1234#endif
1235
1236	switch (cmd) {
1237
1238	case FIONBIO:
1239		PIPE_UNLOCK(mpipe);
1240		return (0);
1241
1242	case FIOASYNC:
1243		if (*(int *)data) {
1244			mpipe->pipe_state |= PIPE_ASYNC;
1245		} else {
1246			mpipe->pipe_state &= ~PIPE_ASYNC;
1247		}
1248		PIPE_UNLOCK(mpipe);
1249		return (0);
1250
1251	case FIONREAD:
1252		if (mpipe->pipe_state & PIPE_DIRECTW)
1253			*(int *)data = mpipe->pipe_map.cnt;
1254		else
1255			*(int *)data = mpipe->pipe_buffer.cnt;
1256		PIPE_UNLOCK(mpipe);
1257		return (0);
1258
1259	case FIOSETOWN:
1260		PIPE_UNLOCK(mpipe);
1261		return (fsetown(*(int *)data, &mpipe->pipe_sigio));
1262
1263	case FIOGETOWN:
1264		PIPE_UNLOCK(mpipe);
1265		*(int *)data = fgetown(&mpipe->pipe_sigio);
1266		return (0);
1267
1268	/* This is deprecated, FIOSETOWN should be used instead. */
1269	case TIOCSPGRP:
1270		PIPE_UNLOCK(mpipe);
1271		return (fsetown(-(*(int *)data), &mpipe->pipe_sigio));
1272
1273	/* This is deprecated, FIOGETOWN should be used instead. */
1274	case TIOCGPGRP:
1275		PIPE_UNLOCK(mpipe);
1276		*(int *)data = -fgetown(&mpipe->pipe_sigio);
1277		return (0);
1278
1279	}
1280	PIPE_UNLOCK(mpipe);
1281	return (ENOTTY);
1282}
1283
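/*
 * Illustrative userland sketch (assumed example, not part of this file):
 * FIONREAD, handled above, reports how many bytes are ready to be read,
 * whether they sit in the kernel buffer or in a pending direct write.
 */
#if 0	/* example only */
#include <sys/ioctl.h>
#include <sys/filio.h>
#include <err.h>
#include <unistd.h>

static int
pending_bytes(int pipefd)
{
	int nread;

	if (ioctl(pipefd, FIONREAD, &nread) == -1)	/* bytes readable now */
		err(1, "ioctl(FIONREAD)");
	return (nread);
}
#endif
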
1284static int
1285pipe_poll(fp, events, active_cred, td)
1286	struct file *fp;
1287	int events;
1288	struct ucred *active_cred;
1289	struct thread *td;
1290{
1291	struct pipe *rpipe = fp->f_data;
1292	struct pipe *wpipe;
1293	int revents = 0;
1294#ifdef MAC
1295	int error;
1296#endif
1297
1298	wpipe = rpipe->pipe_peer;
1299	PIPE_LOCK(rpipe);
1300#ifdef MAC
1301	error = mac_check_pipe_poll(active_cred, rpipe->pipe_pair);
1302	if (error)
1303		goto locked_error;
1304#endif
1305	if (events & (POLLIN | POLLRDNORM))
1306		if ((rpipe->pipe_state & PIPE_DIRECTW) ||
1307		    (rpipe->pipe_buffer.cnt > 0) ||
1308		    (rpipe->pipe_state & PIPE_EOF))
1309			revents |= events & (POLLIN | POLLRDNORM);
1310
1311	if (events & (POLLOUT | POLLWRNORM))
1312		if (!wpipe->pipe_present || (wpipe->pipe_state & PIPE_EOF) ||
1313		    (((wpipe->pipe_state & PIPE_DIRECTW) == 0) &&
1314		     (wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt) >= PIPE_BUF))
1315			revents |= events & (POLLOUT | POLLWRNORM);
1316
1317	if ((rpipe->pipe_state & PIPE_EOF) ||
1318	    (!wpipe->pipe_present) ||
1319	    (wpipe->pipe_state & PIPE_EOF))
1320		revents |= POLLHUP;
1321
1322	if (revents == 0) {
1323		if (events & (POLLIN | POLLRDNORM)) {
1324			selrecord(td, &rpipe->pipe_sel);
1325			rpipe->pipe_state |= PIPE_SEL;
1326		}
1327
1328		if (events & (POLLOUT | POLLWRNORM)) {
1329			selrecord(td, &wpipe->pipe_sel);
1330			wpipe->pipe_state |= PIPE_SEL;
1331		}
1332	}
1333#ifdef MAC
1334locked_error:
1335#endif
1336	PIPE_UNLOCK(rpipe);
1337
1338	return (revents);
1339}
1340
1341/*
1342 * We shouldn't need locks here as we're doing a read and this should
1343 * be a natural race.
1344 */
1345static int
1346pipe_stat(fp, ub, active_cred, td)
1347	struct file *fp;
1348	struct stat *ub;
1349	struct ucred *active_cred;
1350	struct thread *td;
1351{
1352	struct pipe *pipe = fp->f_data;
1353#ifdef MAC
1354	int error;
1355
1356	PIPE_LOCK(pipe);
1357	error = mac_check_pipe_stat(active_cred, pipe->pipe_pair);
1358	PIPE_UNLOCK(pipe);
1359	if (error)
1360		return (error);
1361#endif
1362	bzero(ub, sizeof(*ub));
1363	ub->st_mode = S_IFIFO;
1364	ub->st_blksize = pipe->pipe_buffer.size;
1365	if (pipe->pipe_state & PIPE_DIRECTW)
1366		ub->st_size = pipe->pipe_map.cnt;
1367	else
1368		ub->st_size = pipe->pipe_buffer.cnt;
1369	ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
1370	ub->st_atimespec = pipe->pipe_atime;
1371	ub->st_mtimespec = pipe->pipe_mtime;
1372	ub->st_ctimespec = pipe->pipe_ctime;
1373	ub->st_uid = fp->f_cred->cr_uid;
1374	ub->st_gid = fp->f_cred->cr_gid;
1375	/*
1376	 * Left as 0: st_dev, st_ino, st_nlink, st_rdev, st_flags, st_gen.
1377	 * XXX (st_dev, st_ino) should be unique.
1378	 */
1379	return (0);
1380}
1381
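/*
 * Illustrative userland sketch (assumed example): fstat(2) on a pipe
 * descriptor reports S_IFIFO, the buffer size in st_blksize and the number
 * of unread bytes in st_size, as filled in by pipe_stat() above.
 */
#if 0	/* example only */
#include <sys/stat.h>
#include <err.h>
#include <stdint.h>
#include <stdio.h>

static void
print_pipe_stat(int pipefd)
{
	struct stat sb;

	if (fstat(pipefd, &sb) == -1)
		err(1, "fstat");
	printf("fifo: %d, unread: %jd, bufsize: %ld\n",
	    S_ISFIFO(sb.st_mode), (intmax_t)sb.st_size, (long)sb.st_blksize);
}
#endif
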
1382/* ARGSUSED */
1383static int
1384pipe_close(fp, td)
1385	struct file *fp;
1386	struct thread *td;
1387{
1388	struct pipe *cpipe = fp->f_data;
1389
1390	fp->f_ops = &badfileops;
1391	fp->f_data = NULL;
1392	funsetown(&cpipe->pipe_sigio);
1393	pipeclose(cpipe);
1394	return (0);
1395}
1396
1397static void
1398pipe_free_kmem(cpipe)
1399	struct pipe *cpipe;
1400{
1401
1402	KASSERT(!mtx_owned(PIPE_MTX(cpipe)),
1403	    ("pipe_free_kmem: pipe mutex locked"));
1404
1405	if (cpipe->pipe_buffer.buffer != NULL) {
1406		if (cpipe->pipe_buffer.size > PIPE_SIZE)
1407			atomic_subtract_int(&nbigpipe, 1);
1408		atomic_subtract_int(&amountpipekva, cpipe->pipe_buffer.size);
1409		vm_map_remove(pipe_map,
1410		    (vm_offset_t)cpipe->pipe_buffer.buffer,
1411		    (vm_offset_t)cpipe->pipe_buffer.buffer + cpipe->pipe_buffer.size);
1412		cpipe->pipe_buffer.buffer = NULL;
1413	}
1414#ifndef PIPE_NODIRECT
1415	{
1416		cpipe->pipe_map.cnt = 0;
1417		cpipe->pipe_map.pos = 0;
1418		cpipe->pipe_map.npages = 0;
1419	}
1420#endif
1421}
1422
1423/*
1424 * shutdown the pipe
1425 */
1426static void
1427pipeclose(cpipe)
1428	struct pipe *cpipe;
1429{
1430	struct pipepair *pp;
1431	struct pipe *ppipe;
1432
1433	KASSERT(cpipe != NULL, ("pipeclose: cpipe == NULL"));
1434
1435	PIPE_LOCK(cpipe);
1436	pp = cpipe->pipe_pair;
1437
1438	pipeselwakeup(cpipe);
1439
1440	/*
1441	 * If the other side is blocked, wake it up saying that
1442	 * we want to close it down.
1443	 */
1444	cpipe->pipe_state |= PIPE_EOF;
1445	while (cpipe->pipe_busy) {
1446		wakeup(cpipe);
1447		cpipe->pipe_state |= PIPE_WANT;
1448		msleep(cpipe, PIPE_MTX(cpipe), PRIBIO, "pipecl", 0);
1449	}
1450
1451
1452	/*
1453	 * Disconnect from peer, if any.
1454	 */
1455	ppipe = cpipe->pipe_peer;
1456	if (ppipe->pipe_present != 0) {
1457		pipeselwakeup(ppipe);
1458
1459		ppipe->pipe_state |= PIPE_EOF;
1460		wakeup(ppipe);
1461		KNOTE(&ppipe->pipe_sel.si_note, 0);
1462	}
1463
1464	/*
1465	 * Mark this endpoint as free.  Release kmem resources.  We
1466	 * don't mark this endpoint as unused until we've finished
1467	 * doing that, or the pipe might disappear out from under
1468	 * us.
1469	 */
1470	pipelock(cpipe, 0);
1471	PIPE_UNLOCK(cpipe);
1472	pipe_free_kmem(cpipe);
1473	PIPE_LOCK(cpipe);
1474	cpipe->pipe_present = 0;
1475	pipeunlock(cpipe);
1476
1477	/*
1478	 * If both endpoints are now closed, release the memory for the
1479	 * pipe pair.  If not, unlock.
1480	 */
1481	if (ppipe->pipe_present == 0) {
1482		PIPE_UNLOCK(cpipe);
1483#ifdef MAC
1484		mac_destroy_pipe(pp);
1485#endif
1486		uma_zfree(pipe_zone, cpipe->pipe_pair);
1487	} else
1488		PIPE_UNLOCK(cpipe);
1489}
1490
1491/*ARGSUSED*/
1492static int
1493pipe_kqfilter(struct file *fp, struct knote *kn)
1494{
1495	struct pipe *cpipe;
1496
1497	cpipe = kn->kn_fp->f_data;
1498	PIPE_LOCK(cpipe);
1499	switch (kn->kn_filter) {
1500	case EVFILT_READ:
1501		kn->kn_fop = &pipe_rfiltops;
1502		break;
1503	case EVFILT_WRITE:
1504		kn->kn_fop = &pipe_wfiltops;
1505		if (!cpipe->pipe_peer->pipe_present) {
1506			/* other end of pipe has been closed */
1507			PIPE_UNLOCK(cpipe);
1508			return (EPIPE);
1509		}
1510		cpipe = cpipe->pipe_peer;
1511		break;
1512	default:
1513		PIPE_UNLOCK(cpipe);
1514		return (1);
1515	}
1516
1517	SLIST_INSERT_HEAD(&cpipe->pipe_sel.si_note, kn, kn_selnext);
1518	PIPE_UNLOCK(cpipe);
1519	return (0);
1520}
1521
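/*
 * Illustrative userland sketch (assumed example): registering an
 * EVFILT_READ knote on a pipe via kqueue(2).  The returned kev.data is the
 * number of bytes available, as computed in filt_piperead() below.
 */
#if 0	/* example only */
#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>
#include <err.h>
#include <stdint.h>
#include <stdio.h>

static void
wait_for_pipe_data(int pipefd)
{
	struct kevent kev;
	int kq;

	if ((kq = kqueue()) == -1)
		err(1, "kqueue");
	EV_SET(&kev, pipefd, EVFILT_READ, EV_ADD, 0, 0, NULL);
	if (kevent(kq, &kev, 1, &kev, 1, NULL) == -1)	/* register and wait */
		err(1, "kevent");
	printf("%jd bytes readable\n", (intmax_t)kev.data);
}
#endif
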
1522static void
1523filt_pipedetach(struct knote *kn)
1524{
1525	struct pipe *cpipe = (struct pipe *)kn->kn_fp->f_data;
1526
1527	PIPE_LOCK(cpipe);
1528	if (kn->kn_filter == EVFILT_WRITE) {
1529		if (!cpipe->pipe_peer->pipe_present) {
1530			PIPE_UNLOCK(cpipe);
1531			return;
1532		}
1533		cpipe = cpipe->pipe_peer;
1534	}
1535	SLIST_REMOVE(&cpipe->pipe_sel.si_note, kn, knote, kn_selnext);
1536	PIPE_UNLOCK(cpipe);
1537}
1538
1539/*ARGSUSED*/
1540static int
1541filt_piperead(struct knote *kn, long hint)
1542{
1543	struct pipe *rpipe = kn->kn_fp->f_data;
1544	struct pipe *wpipe = rpipe->pipe_peer;
1545
1546	PIPE_LOCK(rpipe);
1547	kn->kn_data = rpipe->pipe_buffer.cnt;
1548	if ((kn->kn_data == 0) && (rpipe->pipe_state & PIPE_DIRECTW))
1549		kn->kn_data = rpipe->pipe_map.cnt;
1550
1551	if ((rpipe->pipe_state & PIPE_EOF) ||
1552	    (!wpipe->pipe_present) || (wpipe->pipe_state & PIPE_EOF)) {
1553		kn->kn_flags |= EV_EOF;
1554		PIPE_UNLOCK(rpipe);
1555		return (1);
1556	}
1557	PIPE_UNLOCK(rpipe);
1558	return (kn->kn_data > 0);
1559}
1560
1561/*ARGSUSED*/
1562static int
1563filt_pipewrite(struct knote *kn, long hint)
1564{
1565	struct pipe *rpipe = kn->kn_fp->f_data;
1566	struct pipe *wpipe = rpipe->pipe_peer;
1567
1568	PIPE_LOCK(rpipe);
1569	if ((!wpipe->pipe_present) || (wpipe->pipe_state & PIPE_EOF)) {
1570		kn->kn_data = 0;
1571		kn->kn_flags |= EV_EOF;
1572		PIPE_UNLOCK(rpipe);
1573		return (1);
1574	}
1575	kn->kn_data = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;
1576	if (wpipe->pipe_state & PIPE_DIRECTW)
1577		kn->kn_data = 0;
1578
1579	PIPE_UNLOCK(rpipe);
1580	return (kn->kn_data >= PIPE_BUF);
1581}
1582