sys_pipe.c revision 98989
1/*
2 * Copyright (c) 1996 John S. Dyson
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 * 1. Redistributions of source code must retain the above copyright
9 *    notice immediately at the beginning of the file, without modification,
10 *    this list of conditions, and the following disclaimer.
11 * 2. Redistributions in binary form must reproduce the above copyright
12 *    notice, this list of conditions and the following disclaimer in the
13 *    documentation and/or other materials provided with the distribution.
14 * 3. Absolutely no warranty of function or purpose is made by the author
15 *    John S. Dyson.
16 * 4. Modifications may be freely made to this file if the above conditions
17 *    are met.
18 *
19 * $FreeBSD: head/sys/kern/sys_pipe.c 98989 2002-06-28 22:35:12Z alfred $
20 */
21
22/*
23 * This file contains a high-performance replacement for the socket-based
24 * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
25 * all features of sockets, but does do everything that pipes normally
26 * do.
27 */
28
29/*
30 * This code has two modes of operation, a small write mode and a large
31 * write mode.  The small write mode acts like conventional pipes with
32 * a kernel buffer.  If the write is smaller than PIPE_MINDIRECT, then the
33 * "normal" pipe buffering is done.  If the write is at least PIPE_MINDIRECT
34 * in size, the user buffer is fully mapped and wired into the kernel, and
35 * the receiving process can copy the data directly from the pages of the
36 * sending process.
37 *
38 * If the sending process receives a signal, it is possible that it will
39 * go away, and certainly its address space can change, because control
40 * is returned to the user-mode side.  In that case, the pipe code
41 * arranges to copy the buffer supplied by the user process to a pageable
42 * kernel buffer, and the receiving process will grab the data from the
43 * pageable kernel buffer.  Since signals don't happen all that often,
44 * the copy operation is normally eliminated.
45 *
46 * The constant PIPE_MINDIRECT is chosen to make sure that buffering will
47 * happen for small transfers so that the system will not spend all of
48 * its time context switching.  PIPE_SIZE is constrained by the
49 * amount of kernel virtual memory.
50 */
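/*
 * As a rough illustration (sizes assume the usual definitions in
 * sys/pipe.h, e.g. 8K PIPE_MINDIRECT and 16K PIPE_SIZE): a 512 byte
 * write is simply copied into the kernel buffer and read out later,
 * while a single 64K write from a blocking writer is wired in place
 * and the reader copies it straight out of the writer's pages,
 * falling back to the pageable kernel buffer only if the writer is
 * signalled in mid-transfer.
 */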
51
52#include <sys/param.h>
53#include <sys/systm.h>
54#include <sys/fcntl.h>
55#include <sys/file.h>
56#include <sys/filedesc.h>
57#include <sys/filio.h>
58#include <sys/kernel.h>
59#include <sys/lock.h>
60#include <sys/mutex.h>
61#include <sys/ttycom.h>
62#include <sys/stat.h>
63#include <sys/malloc.h>
64#include <sys/poll.h>
65#include <sys/selinfo.h>
66#include <sys/signalvar.h>
67#include <sys/sysproto.h>
68#include <sys/pipe.h>
69#include <sys/proc.h>
70#include <sys/vnode.h>
71#include <sys/uio.h>
72#include <sys/event.h>
73
74#include <vm/vm.h>
75#include <vm/vm_param.h>
76#include <vm/vm_object.h>
77#include <vm/vm_kern.h>
78#include <vm/vm_extern.h>
79#include <vm/pmap.h>
80#include <vm/vm_map.h>
81#include <vm/vm_page.h>
82#include <vm/uma.h>
83
84/*
85 * Use this define if you want to disable *fancy* VM things.  Expect an
86 * approx 30% decrease in transfer rate.  This could be useful for
87 * NetBSD or OpenBSD.
88 */
89/* #define PIPE_NODIRECT */
90
91/*
92 * interfaces to the outside world
93 */
94static int pipe_read(struct file *fp, struct uio *uio,
95		struct ucred *cred, int flags, struct thread *td);
96static int pipe_write(struct file *fp, struct uio *uio,
97		struct ucred *cred, int flags, struct thread *td);
98static int pipe_close(struct file *fp, struct thread *td);
99static int pipe_poll(struct file *fp, int events, struct ucred *cred,
100		struct thread *td);
101static int pipe_kqfilter(struct file *fp, struct knote *kn);
102static int pipe_stat(struct file *fp, struct stat *sb, struct thread *td);
103static int pipe_ioctl(struct file *fp, u_long cmd, caddr_t data, struct thread *td);
104
105static struct fileops pipeops = {
106	pipe_read, pipe_write, pipe_ioctl, pipe_poll, pipe_kqfilter,
107	pipe_stat, pipe_close
108};
109
110static void	filt_pipedetach(struct knote *kn);
111static int	filt_piperead(struct knote *kn, long hint);
112static int	filt_pipewrite(struct knote *kn, long hint);
113
114static struct filterops pipe_rfiltops =
115	{ 1, NULL, filt_pipedetach, filt_piperead };
116static struct filterops pipe_wfiltops =
117	{ 1, NULL, filt_pipedetach, filt_pipewrite };
118
119#define PIPE_GET_GIANT(pipe)						\
120	do {								\
121		KASSERT(((pipe)->pipe_state & PIPE_LOCKFL) != 0,	\
122		    ("%s:%d PIPE_GET_GIANT: pipe not locked",		\
123		     __FILE__, __LINE__));				\
124		PIPE_UNLOCK(pipe);					\
125		mtx_lock(&Giant);					\
126	} while (0)
127
128#define PIPE_DROP_GIANT(pipe)						\
129	do {								\
130		mtx_unlock(&Giant);					\
131		PIPE_LOCK(pipe);					\
132	} while (0)
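/*
 * Usage sketch of the two macros above (this is the pattern the direct
 * write path below follows; the VM calls it makes still require Giant):
 *
 *	pipelock(wpipe, 0);
 *	PIPE_GET_GIANT(wpipe);		pipe mutex dropped, Giant held
 *	error = pipe_build_write_buffer(wpipe, uio);
 *	PIPE_DROP_GIANT(wpipe);		Giant dropped, pipe mutex re-held
 *	pipeunlock(wpipe);
 */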
133
134/*
135 * Default pipe buffer size(s).  This can be fairly large now because pipe
136 * space is pageable.  The pipe code will try to maintain locality of
137 * reference for performance reasons, so small amounts of outstanding I/O
138 * will not wipe the cache.
139 */
140#define MINPIPESIZE (PIPE_SIZE/3)
141#define MAXPIPESIZE (2*PIPE_SIZE/3)
142
143/*
144 * Maximum amount of kva for pipes -- this is a soft limit, but
145 * it is there so that on large systems we don't exhaust it.
146 */
147#define MAXPIPEKVA (8*1024*1024)
148
149/*
150 * Limit for direct transfers; we cannot, of course, limit
151 * the amount of kva for pipes in general.
152 */
153#define LIMITPIPEKVA (16*1024*1024)
154
155/*
156 * Limit the number of "big" pipes
157 */
158#define LIMITBIGPIPES	32
159static int nbigpipe;
160
161static int amountpipekva;
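/*
 * A rough worked example of the kva accounting (sizes assume the usual
 * 16K PIPE_SIZE): each pipe end carries a 16K pageable buffer, so an
 * ordinary pipe pair charges 32K against amountpipekva.  New direct
 * writes are only attempted while amountpipekva is below the 16MB
 * LIMITPIPEKVA, and a direct-write mapping is kept cached after use
 * only while amountpipekva is below the 8MB MAXPIPEKVA.
 */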
162
163static void pipeinit(void *dummy __unused);
164static void pipeclose(struct pipe *cpipe);
165static void pipe_free_kmem(struct pipe *cpipe);
166static int pipe_create(struct pipe **cpipep);
167static __inline int pipelock(struct pipe *cpipe, int catch);
168static __inline void pipeunlock(struct pipe *cpipe);
169static __inline void pipeselwakeup(struct pipe *cpipe);
170#ifndef PIPE_NODIRECT
171static int pipe_build_write_buffer(struct pipe *wpipe, struct uio *uio);
172static void pipe_destroy_write_buffer(struct pipe *wpipe);
173static int pipe_direct_write(struct pipe *wpipe, struct uio *uio);
174static void pipe_clone_write_buffer(struct pipe *wpipe);
175#endif
176static int pipespace(struct pipe *cpipe, int size);
177
178static uma_zone_t pipe_zone;
179
180SYSINIT(vfs, SI_SUB_VFS, SI_ORDER_ANY, pipeinit, NULL);
181
182static void
183pipeinit(void *dummy __unused)
184{
185	pipe_zone = uma_zcreate("PIPE", sizeof(struct pipe), NULL,
186	    NULL, NULL, NULL, UMA_ALIGN_PTR, 0);
187}
188
189/*
190 * The pipe system call for the DTYPE_PIPE type of pipes
191 */
192
193/* ARGSUSED */
194int
195pipe(td, uap)
196	struct thread *td;
197	struct pipe_args /* {
198		int	dummy;
199	} */ *uap;
200{
201	struct filedesc *fdp = td->td_proc->p_fd;
202	struct file *rf, *wf;
203	struct pipe *rpipe, *wpipe;
204	struct mtx *pmtx;
205	int fd, error;
206
207	KASSERT(pipe_zone != NULL, ("pipe_zone not initialized"));
208
209	pmtx = malloc(sizeof(*pmtx), M_TEMP, M_WAITOK | M_ZERO);
210
211	rpipe = wpipe = NULL;
212	if (pipe_create(&rpipe) || pipe_create(&wpipe)) {
213		pipeclose(rpipe);
214		pipeclose(wpipe);
215		free(pmtx, M_TEMP);
216		return (ENFILE);
217	}
218
219	rpipe->pipe_state |= PIPE_DIRECTOK;
220	wpipe->pipe_state |= PIPE_DIRECTOK;
221
222	error = falloc(td, &rf, &fd);
223	if (error) {
224		pipeclose(rpipe);
225		pipeclose(wpipe);
226		free(pmtx, M_TEMP);
227		return (error);
228	}
229	fhold(rf);
230	td->td_retval[0] = fd;
231
232	/*
233	 * Warning: once we've gotten past allocation of the fd for the
234	 * read-side, we can only drop the read side via fdrop() in order
235	 * to avoid races against processes which manage to dup() the read
236	 * side while we are blocked trying to allocate the write side.
237	 */
238	FILE_LOCK(rf);
239	rf->f_flag = FREAD | FWRITE;
240	rf->f_type = DTYPE_PIPE;
241	rf->f_data = (caddr_t)rpipe;
242	rf->f_ops = &pipeops;
243	FILE_UNLOCK(rf);
244	error = falloc(td, &wf, &fd);
245	if (error) {
246		FILEDESC_LOCK(fdp);
247		if (fdp->fd_ofiles[td->td_retval[0]] == rf) {
248			fdp->fd_ofiles[td->td_retval[0]] = NULL;
249			FILEDESC_UNLOCK(fdp);
250			fdrop(rf, td);
251		} else
252			FILEDESC_UNLOCK(fdp);
253		fdrop(rf, td);
254		/* rpipe has been closed by fdrop(). */
255		pipeclose(wpipe);
256		free(pmtx, M_TEMP);
257		return (error);
258	}
259	FILE_LOCK(wf);
260	wf->f_flag = FREAD | FWRITE;
261	wf->f_type = DTYPE_PIPE;
262	wf->f_data = (caddr_t)wpipe;
263	wf->f_ops = &pipeops;
264	FILE_UNLOCK(wf);
265	td->td_retval[1] = fd;
266	rpipe->pipe_peer = wpipe;
267	wpipe->pipe_peer = rpipe;
268	mtx_init(pmtx, "pipe mutex", NULL, MTX_DEF | MTX_RECURSE);
269	rpipe->pipe_mtxp = wpipe->pipe_mtxp = pmtx;
270	fdrop(rf, td);
271
272	return (0);
273}
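/*
 * Userland view of the call above (a minimal sketch; error handling
 * omitted):
 *
 *	int pfd[2];
 *	char c;
 *
 *	pipe(pfd);
 *	write(pfd[1], "x", 1);		pfd[1] is td_retval[1], the write end
 *	read(pfd[0], &c, 1);		pfd[0] is td_retval[0], the read end
 */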
274
275/*
276 * Allocate kva for the pipe circular buffer; the space is pageable.
277 * This routine will 'realloc' the size of a pipe safely; if it fails,
278 * it will retain the old buffer
279 * and return ENOMEM.
280 */
281static int
282pipespace(cpipe, size)
283	struct pipe *cpipe;
284	int size;
285{
286	struct vm_object *object;
287	caddr_t buffer;
288	int npages, error;
289
290	GIANT_REQUIRED;
291	KASSERT(cpipe->pipe_mtxp == NULL || !mtx_owned(PIPE_MTX(cpipe)),
292	       ("pipespace: pipe mutex locked"));
293
294	npages = round_page(size)/PAGE_SIZE;
295	/*
296	 * Create an object, I don't like the idea of paging to/from
297	 * kernel_object.
298	 * XXX -- minor change needed here for NetBSD/OpenBSD VM systems.
299	 */
300	object = vm_object_allocate(OBJT_DEFAULT, npages);
301	buffer = (caddr_t) vm_map_min(kernel_map);
302
303	/*
304	 * Insert the object into the kernel map, and allocate kva for it.
305	 * The map entry is, by default, pageable.
306	 * XXX -- minor change needed here for NetBSD/OpenBSD VM systems.
307	 */
308	error = vm_map_find(kernel_map, object, 0,
309		(vm_offset_t *) &buffer, size, 1,
310		VM_PROT_ALL, VM_PROT_ALL, 0);
311
312	if (error != KERN_SUCCESS) {
313		vm_object_deallocate(object);
314		return (ENOMEM);
315	}
316
317	/* free old resources if we're resizing */
318	pipe_free_kmem(cpipe);
319	cpipe->pipe_buffer.object = object;
320	cpipe->pipe_buffer.buffer = buffer;
321	cpipe->pipe_buffer.size = size;
322	cpipe->pipe_buffer.in = 0;
323	cpipe->pipe_buffer.out = 0;
324	cpipe->pipe_buffer.cnt = 0;
325	amountpipekva += cpipe->pipe_buffer.size;
326	return (0);
327}
328
329/*
330 * initialize and allocate VM and memory for pipe
331 */
332static int
333pipe_create(cpipep)
334	struct pipe **cpipep;
335{
336	struct pipe *cpipe;
337	int error;
338
339	*cpipep = uma_zalloc(pipe_zone, M_WAITOK);
340	if (*cpipep == NULL)
341		return (ENOMEM);
342
343	cpipe = *cpipep;
344
345	/* so pipespace()->pipe_free_kmem() doesn't follow junk pointer */
346	cpipe->pipe_buffer.object = NULL;
347#ifndef PIPE_NODIRECT
348	cpipe->pipe_map.kva = 0;
349#endif
350	/*
351	 * protect so pipeclose() doesn't follow a junk pointer
352	 * if pipespace() fails.
353	 */
354	bzero(&cpipe->pipe_sel, sizeof(cpipe->pipe_sel));
355	cpipe->pipe_state = 0;
356	cpipe->pipe_peer = NULL;
357	cpipe->pipe_busy = 0;
358
359#ifndef PIPE_NODIRECT
360	/*
361	 * pipe data structure initializations to support direct pipe I/O
362	 */
363	cpipe->pipe_map.cnt = 0;
364	cpipe->pipe_map.kva = 0;
365	cpipe->pipe_map.pos = 0;
366	cpipe->pipe_map.npages = 0;
367	/* cpipe->pipe_map.ms[] = invalid */
368#endif
369
370	cpipe->pipe_mtxp = NULL;	/* avoid pipespace assertion */
371	error = pipespace(cpipe, PIPE_SIZE);
372	if (error)
373		return (error);
374
375	vfs_timestamp(&cpipe->pipe_ctime);
376	cpipe->pipe_atime = cpipe->pipe_ctime;
377	cpipe->pipe_mtime = cpipe->pipe_ctime;
378
379	return (0);
380}
381
382
383/*
384 * lock a pipe for I/O, blocking other access
385 */
386static __inline int
387pipelock(cpipe, catch)
388	struct pipe *cpipe;
389	int catch;
390{
391	int error;
392
393	PIPE_LOCK_ASSERT(cpipe, MA_OWNED);
394	while (cpipe->pipe_state & PIPE_LOCKFL) {
395		cpipe->pipe_state |= PIPE_LWANT;
396		error = msleep(cpipe, PIPE_MTX(cpipe),
397		    catch ? (PRIBIO | PCATCH) : PRIBIO,
398		    "pipelk", 0);
399		if (error != 0)
400			return (error);
401	}
402	cpipe->pipe_state |= PIPE_LOCKFL;
403	return (0);
404}
405
406/*
407 * unlock a pipe I/O lock
408 */
409static __inline void
410pipeunlock(cpipe)
411	struct pipe *cpipe;
412{
413
414	PIPE_LOCK_ASSERT(cpipe, MA_OWNED);
415	cpipe->pipe_state &= ~PIPE_LOCKFL;
416	if (cpipe->pipe_state & PIPE_LWANT) {
417		cpipe->pipe_state &= ~PIPE_LWANT;
418		wakeup(cpipe);
419	}
420}
421
422static __inline void
423pipeselwakeup(cpipe)
424	struct pipe *cpipe;
425{
426
427	if (cpipe->pipe_state & PIPE_SEL) {
428		cpipe->pipe_state &= ~PIPE_SEL;
429		selwakeup(&cpipe->pipe_sel);
430	}
431	if ((cpipe->pipe_state & PIPE_ASYNC) && cpipe->pipe_sigio)
432		pgsigio(&cpipe->pipe_sigio, SIGIO, 0);
433	KNOTE(&cpipe->pipe_sel.si_note, 0);
434}
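/*
 * Sketch of how a process typically arms the SIGIO delivery performed
 * above (userland, error handling omitted; pfd[0] is the read end of a
 * pipe created with pipe(2)):
 *
 *	int on = 1, owner = getpid();
 *
 *	ioctl(pfd[0], FIOSETOWN, &owner);
 *	ioctl(pfd[0], FIOASYNC, &on);
 *
 * These land in the FIOSETOWN and FIOASYNC cases of pipe_ioctl() below;
 * the signal itself is posted from here via pgsigio().
 */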
435
436/* ARGSUSED */
437static int
438pipe_read(fp, uio, cred, flags, td)
439	struct file *fp;
440	struct uio *uio;
441	struct ucred *cred;
442	struct thread *td;
443	int flags;
444{
445	struct pipe *rpipe = (struct pipe *) fp->f_data;
446	int error;
447	int nread = 0;
448	u_int size;
449
450	PIPE_LOCK(rpipe);
451	++rpipe->pipe_busy;
452	error = pipelock(rpipe, 1);
453	if (error)
454		goto unlocked_error;
455
456	while (uio->uio_resid) {
457		/*
458		 * normal pipe buffer receive
459		 */
460		if (rpipe->pipe_buffer.cnt > 0) {
461			size = rpipe->pipe_buffer.size - rpipe->pipe_buffer.out;
462			if (size > rpipe->pipe_buffer.cnt)
463				size = rpipe->pipe_buffer.cnt;
464			if (size > (u_int) uio->uio_resid)
465				size = (u_int) uio->uio_resid;
466
467			PIPE_UNLOCK(rpipe);
468			error = uiomove(&rpipe->pipe_buffer.buffer[rpipe->pipe_buffer.out],
469					size, uio);
470			PIPE_LOCK(rpipe);
471			if (error)
472				break;
473
474			rpipe->pipe_buffer.out += size;
475			if (rpipe->pipe_buffer.out >= rpipe->pipe_buffer.size)
476				rpipe->pipe_buffer.out = 0;
477
478			rpipe->pipe_buffer.cnt -= size;
479
480			/*
481			 * If there is no more to read in the pipe, reset
482			 * its pointers to the beginning.  This improves
483			 * cache hit stats.
484			 */
485			if (rpipe->pipe_buffer.cnt == 0) {
486				rpipe->pipe_buffer.in = 0;
487				rpipe->pipe_buffer.out = 0;
488			}
489			nread += size;
490#ifndef PIPE_NODIRECT
491		/*
492		 * Direct copy, bypassing a kernel buffer.
493		 */
494		} else if ((size = rpipe->pipe_map.cnt) &&
495			   (rpipe->pipe_state & PIPE_DIRECTW)) {
496			caddr_t	va;
497			if (size > (u_int) uio->uio_resid)
498				size = (u_int) uio->uio_resid;
499
500			va = (caddr_t) rpipe->pipe_map.kva +
501			    rpipe->pipe_map.pos;
502			PIPE_UNLOCK(rpipe);
503			error = uiomove(va, size, uio);
504			PIPE_LOCK(rpipe);
505			if (error)
506				break;
507			nread += size;
508			rpipe->pipe_map.pos += size;
509			rpipe->pipe_map.cnt -= size;
510			if (rpipe->pipe_map.cnt == 0) {
511				rpipe->pipe_state &= ~PIPE_DIRECTW;
512				wakeup(rpipe);
513			}
514#endif
515		} else {
516			/*
517			 * detect EOF condition
518			 * read returns 0 on EOF, no need to set error
519			 */
520			if (rpipe->pipe_state & PIPE_EOF)
521				break;
522
523			/*
524			 * If the "write-side" has been blocked, wake it up now.
525			 */
526			if (rpipe->pipe_state & PIPE_WANTW) {
527				rpipe->pipe_state &= ~PIPE_WANTW;
528				wakeup(rpipe);
529			}
530
531			/*
532			 * Break if some data was read.
533			 */
534			if (nread > 0)
535				break;
536
537			/*
538			 * Unlock the pipe buffer for our remaining processing.  We
539			 * will either break out with an error or we will sleep and
540			 * relock to loop.
541			 */
542			pipeunlock(rpipe);
543
544			/*
545			 * Handle non-blocking mode operation or
546			 * wait for more data.
547			 */
548			if (fp->f_flag & FNONBLOCK) {
549				error = EAGAIN;
550			} else {
551				rpipe->pipe_state |= PIPE_WANTR;
552				if ((error = msleep(rpipe, PIPE_MTX(rpipe),
553				    PRIBIO | PCATCH,
554				    "piperd", 0)) == 0)
555					error = pipelock(rpipe, 1);
556			}
557			if (error)
558				goto unlocked_error;
559		}
560	}
561	pipeunlock(rpipe);
562
563	/* XXX: should probably do this before getting any locks. */
564	if (error == 0)
565		vfs_timestamp(&rpipe->pipe_atime);
566unlocked_error:
567	--rpipe->pipe_busy;
568
569	/*
570	 * PIPE_WANT processing only makes sense if pipe_busy is 0.
571	 */
572	if ((rpipe->pipe_busy == 0) && (rpipe->pipe_state & PIPE_WANT)) {
573		rpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTW);
574		wakeup(rpipe);
575	} else if (rpipe->pipe_buffer.cnt < MINPIPESIZE) {
576		/*
577		 * Handle write blocking hysteresis.
578		 */
579		if (rpipe->pipe_state & PIPE_WANTW) {
580			rpipe->pipe_state &= ~PIPE_WANTW;
581			wakeup(rpipe);
582		}
583	}
584
585	if ((rpipe->pipe_buffer.size - rpipe->pipe_buffer.cnt) >= PIPE_BUF)
586		pipeselwakeup(rpipe);
587
588	PIPE_UNLOCK(rpipe);
589	return (error);
590}
591
592#ifndef PIPE_NODIRECT
593/*
594 * Map the sending process's buffer into kernel space and wire it.
595 * This is similar to a physical write operation.
596 */
597static int
598pipe_build_write_buffer(wpipe, uio)
599	struct pipe *wpipe;
600	struct uio *uio;
601{
602	u_int size;
603	int i;
604	vm_offset_t addr, endaddr, paddr;
605
606	GIANT_REQUIRED;
607	PIPE_LOCK_ASSERT(wpipe, MA_NOTOWNED);
608
609	size = (u_int) uio->uio_iov->iov_len;
610	if (size > wpipe->pipe_buffer.size)
611		size = wpipe->pipe_buffer.size;
612
613	endaddr = round_page((vm_offset_t)uio->uio_iov->iov_base + size);
614	addr = trunc_page((vm_offset_t)uio->uio_iov->iov_base);
615	for (i = 0; addr < endaddr; addr += PAGE_SIZE, i++) {
616		vm_page_t m;
617
618		if (vm_fault_quick((caddr_t)addr, VM_PROT_READ) < 0 ||
619		    (paddr = pmap_extract(vmspace_pmap(curproc->p_vmspace),
620		     addr)) == 0) {
621			int j;
622
623			for (j = 0; j < i; j++)
624				vm_page_unwire(wpipe->pipe_map.ms[j], 1);
625			return (EFAULT);
626		}
627
628		m = PHYS_TO_VM_PAGE(paddr);
629		vm_page_wire(m);
630		wpipe->pipe_map.ms[i] = m;
631	}
632
633/*
634 * set up the control block
635 */
636	wpipe->pipe_map.npages = i;
637	wpipe->pipe_map.pos =
638	    ((vm_offset_t) uio->uio_iov->iov_base) & PAGE_MASK;
639	wpipe->pipe_map.cnt = size;
640
641/*
642 * and map the buffer
643 */
644	if (wpipe->pipe_map.kva == 0) {
645		/*
646		 * We need to allocate space for an extra page because the
647		 * user's address range is rarely going to be page aligned.
648		 */
649		wpipe->pipe_map.kva = kmem_alloc_pageable(kernel_map,
650			wpipe->pipe_buffer.size + PAGE_SIZE);
651		amountpipekva += wpipe->pipe_buffer.size + PAGE_SIZE;
652	}
653	pmap_qenter(wpipe->pipe_map.kva, wpipe->pipe_map.ms,
654		wpipe->pipe_map.npages);
655
656/*
657 * and update the uio data
658 */
659
660	uio->uio_iov->iov_len -= size;
661	uio->uio_iov->iov_base += size;
662	if (uio->uio_iov->iov_len == 0)
663		uio->uio_iov++;
664	uio->uio_resid -= size;
665	uio->uio_offset += size;
666	return (0);
667}
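/*
 * Worked example of the arithmetic above (assuming 4K pages): for a 12K
 * write whose iov_base is 0x1000234, trunc_page() gives 0x1000000 and
 * round_page() of the end gives 0x1004000, so four pages are faulted in
 * and wired, pipe_map.pos becomes 0x234 and pipe_map.cnt becomes 12K;
 * the reader later copies cnt bytes starting at pipe_map.kva + pos.
 */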
668
669/*
670 * unmap and unwire the process buffer
671 */
672static void
673pipe_destroy_write_buffer(wpipe)
674	struct pipe *wpipe;
675{
676	int i;
677
678	GIANT_REQUIRED;
679	PIPE_LOCK_ASSERT(wpipe, MA_NOTOWNED);
680
681	if (wpipe->pipe_map.kva) {
682		pmap_qremove(wpipe->pipe_map.kva, wpipe->pipe_map.npages);
683
684		if (amountpipekva > MAXPIPEKVA) {
685			vm_offset_t kva = wpipe->pipe_map.kva;
686			wpipe->pipe_map.kva = 0;
687			kmem_free(kernel_map, kva,
688				wpipe->pipe_buffer.size + PAGE_SIZE);
689			amountpipekva -= wpipe->pipe_buffer.size + PAGE_SIZE;
690		}
691	}
692	for (i = 0; i < wpipe->pipe_map.npages; i++)
693		vm_page_unwire(wpipe->pipe_map.ms[i], 1);
694	wpipe->pipe_map.npages = 0;
695}
696
697/*
698 * In the case of a signal, the writing process might go away.  This
699 * code copies the data into the circular buffer so that the source
700 * pages can be freed without loss of data.
701 */
702static void
703pipe_clone_write_buffer(wpipe)
704	struct pipe *wpipe;
705{
706	int size;
707	int pos;
708
709	PIPE_LOCK_ASSERT(wpipe, MA_OWNED);
710	size = wpipe->pipe_map.cnt;
711	pos = wpipe->pipe_map.pos;
712
713	wpipe->pipe_buffer.in = size;
714	wpipe->pipe_buffer.out = 0;
715	wpipe->pipe_buffer.cnt = size;
716	wpipe->pipe_state &= ~PIPE_DIRECTW;
717
718	PIPE_GET_GIANT(wpipe);
719	bcopy((caddr_t) wpipe->pipe_map.kva + pos,
720	    (caddr_t) wpipe->pipe_buffer.buffer, size);
721	pipe_destroy_write_buffer(wpipe);
722	PIPE_DROP_GIANT(wpipe);
723}
724
725/*
726 * This implements the pipe buffer write mechanism.  Note that only
727 * a direct write OR a normal pipe write can be pending at any given time.
728 * If there are any characters in the pipe buffer, the direct write will
729 * be deferred until the receiving process grabs all of the bytes from
730 * the pipe buffer.  Then the direct mapping write is set up.
731 */
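/*
 * In outline, a successful direct write through the code below runs:
 *
 *	writer (pipe_direct_write)	reader (pipe_read)
 *	wait for PIPE_DIRECTW clear
 *	wait for pipe_buffer.cnt == 0
 *	set PIPE_DIRECTW, wire the
 *	user pages, sleep "pipdwt"	copy from pipe_map.kva + pos,
 *					decrement pipe_map.cnt; at zero,
 *					clear PIPE_DIRECTW and wakeup()
 *	unwire/unmap the pages and
 *	return
 */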
732static int
733pipe_direct_write(wpipe, uio)
734	struct pipe *wpipe;
735	struct uio *uio;
736{
737	int error;
738
739retry:
740	PIPE_LOCK_ASSERT(wpipe, MA_OWNED);
741	while (wpipe->pipe_state & PIPE_DIRECTW) {
742		if (wpipe->pipe_state & PIPE_WANTR) {
743			wpipe->pipe_state &= ~PIPE_WANTR;
744			wakeup(wpipe);
745		}
746		wpipe->pipe_state |= PIPE_WANTW;
747		error = msleep(wpipe, PIPE_MTX(wpipe),
748		    PRIBIO | PCATCH, "pipdww", 0);
749		if (error)
750			goto error1;
751		if (wpipe->pipe_state & PIPE_EOF) {
752			error = EPIPE;
753			goto error1;
754		}
755	}
756	wpipe->pipe_map.cnt = 0;	/* transfer not ready yet */
757	if (wpipe->pipe_buffer.cnt > 0) {
758		if (wpipe->pipe_state & PIPE_WANTR) {
759			wpipe->pipe_state &= ~PIPE_WANTR;
760			wakeup(wpipe);
761		}
762
763		wpipe->pipe_state |= PIPE_WANTW;
764		error = msleep(wpipe, PIPE_MTX(wpipe),
765		    PRIBIO | PCATCH, "pipdwc", 0);
766		if (error)
767			goto error1;
768		if (wpipe->pipe_state & PIPE_EOF) {
769			error = EPIPE;
770			goto error1;
771		}
772		goto retry;
773	}
774
775	wpipe->pipe_state |= PIPE_DIRECTW;
776
777	pipelock(wpipe, 0);
778	PIPE_GET_GIANT(wpipe);
779	error = pipe_build_write_buffer(wpipe, uio);
780	PIPE_DROP_GIANT(wpipe);
781	pipeunlock(wpipe);
782	if (error) {
783		wpipe->pipe_state &= ~PIPE_DIRECTW;
784		goto error1;
785	}
786
787	error = 0;
788	while (!error && (wpipe->pipe_state & PIPE_DIRECTW)) {
789		if (wpipe->pipe_state & PIPE_EOF) {
790			pipelock(wpipe, 0);
791			PIPE_GET_GIANT(wpipe);
792			pipe_destroy_write_buffer(wpipe);
793			PIPE_DROP_GIANT(wpipe);
794			pipeunlock(wpipe);
795			pipeselwakeup(wpipe);
796			error = EPIPE;
797			goto error1;
798		}
799		if (wpipe->pipe_state & PIPE_WANTR) {
800			wpipe->pipe_state &= ~PIPE_WANTR;
801			wakeup(wpipe);
802		}
803		pipeselwakeup(wpipe);
804		error = msleep(wpipe, PIPE_MTX(wpipe), PRIBIO | PCATCH,
805		    "pipdwt", 0);
806	}
807
808	pipelock(wpipe, 0);
809	if (wpipe->pipe_state & PIPE_DIRECTW) {
810		/*
811		 * this bit of trickery substitutes a kernel buffer for
812		 * the process that might be going away.
813		 */
814		pipe_clone_write_buffer(wpipe);
815	} else {
816		PIPE_GET_GIANT(wpipe);
817		pipe_destroy_write_buffer(wpipe);
818		PIPE_DROP_GIANT(wpipe);
819	}
820	pipeunlock(wpipe);
821	return (error);
822
823error1:
824	wakeup(wpipe);
825	return (error);
826}
827#endif
828
829static int
830pipe_write(fp, uio, cred, flags, td)
831	struct file *fp;
832	struct uio *uio;
833	struct ucred *cred;
834	struct thread *td;
835	int flags;
836{
837	int error = 0;
838	int orig_resid;
839	struct pipe *wpipe, *rpipe;
840
841	rpipe = (struct pipe *) fp->f_data;
842	wpipe = rpipe->pipe_peer;
843
844	PIPE_LOCK(rpipe);
845	/*
846	 * detect loss of pipe read side, issue SIGPIPE if lost.
847	 */
848	if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
849		PIPE_UNLOCK(rpipe);
850		return (EPIPE);
851	}
852	++wpipe->pipe_busy;
853
854	/*
855	 * If it is advantageous to resize the pipe buffer, do
856	 * so.
857	 */
858	if ((uio->uio_resid > PIPE_SIZE) &&
859		(nbigpipe < LIMITBIGPIPES) &&
860		(wpipe->pipe_state & PIPE_DIRECTW) == 0 &&
861		(wpipe->pipe_buffer.size <= PIPE_SIZE) &&
862		(wpipe->pipe_buffer.cnt == 0)) {
863
864		if ((error = pipelock(wpipe, 1)) == 0) {
865			PIPE_GET_GIANT(wpipe);
866			if (pipespace(wpipe, BIG_PIPE_SIZE) == 0)
867				nbigpipe++;
868			PIPE_DROP_GIANT(wpipe);
869			pipeunlock(wpipe);
870		}
871	}
872
873	/*
874	 * If an early error occurred, unbusy and return, waking up any pending
875	 * readers.
876	 */
877	if (error) {
878		--wpipe->pipe_busy;
879		if ((wpipe->pipe_busy == 0) &&
880		    (wpipe->pipe_state & PIPE_WANT)) {
881			wpipe->pipe_state &= ~(PIPE_WANT | PIPE_WANTR);
882			wakeup(wpipe);
883		}
884		PIPE_UNLOCK(rpipe);
885		return(error);
886	}
887
888	KASSERT(wpipe->pipe_buffer.buffer != NULL, ("pipe buffer gone"));
889
890	orig_resid = uio->uio_resid;
891
892	while (uio->uio_resid) {
893		int space;
894
895#ifndef PIPE_NODIRECT
896		/*
897		 * If the transfer is large, we can gain performance if
898		 * we do process-to-process copies directly.
899		 * If the write is non-blocking, we don't use the
900		 * direct write mechanism.
901		 *
902		 * The direct write mechanism will detect the reader going
903		 * away on us.
904		 */
905		if ((uio->uio_iov->iov_len >= PIPE_MINDIRECT) &&
906		    (fp->f_flag & FNONBLOCK) == 0 &&
907		    (wpipe->pipe_map.kva ||
908		     (amountpipekva < LIMITPIPEKVA))) {
909			error = pipe_direct_write(wpipe, uio);
910			if (error)
911				break;
912			continue;
913		}
914#endif
915
916		/*
917		 * Pipe buffered writes cannot run concurrently with
918		 * direct writes.  We wait until the currently executing
919		 * direct write is completed before we start filling the
920		 * pipe buffer.  We break out if a signal occurs or the
921		 * reader goes away.
922		 */
923	retrywrite:
924		while (wpipe->pipe_state & PIPE_DIRECTW) {
925			if (wpipe->pipe_state & PIPE_WANTR) {
926				wpipe->pipe_state &= ~PIPE_WANTR;
927				wakeup(wpipe);
928			}
929			error = msleep(wpipe, PIPE_MTX(rpipe), PRIBIO | PCATCH,
930			    "pipbww", 0);
931			if (wpipe->pipe_state & PIPE_EOF)
932				break;
933			if (error)
934				break;
935		}
936		if (wpipe->pipe_state & PIPE_EOF) {
937			error = EPIPE;
938			break;
939		}
940
941		space = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;
942
943		/* Writes of size <= PIPE_BUF must be atomic. */
944		if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
945			space = 0;
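		/*
		 * For example, two processes each writing 512-byte records
		 * will never see a record interleaved with another writer's
		 * data, while a single 100K write may be broken up around
		 * other writers' output.
		 */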
946
947		if (space > 0 && (wpipe->pipe_buffer.cnt < PIPE_SIZE)) {
948			if ((error = pipelock(wpipe, 1)) == 0) {
949				int size;	/* Transfer size */
950				int segsize;	/* first segment to transfer */
951
952				/*
953				 * It is possible for a direct write to
954				 * slip in on us... handle it here...
955				 */
956				if (wpipe->pipe_state & PIPE_DIRECTW) {
957					pipeunlock(wpipe);
958					goto retrywrite;
959				}
960				/*
961				 * If a process blocked in uiomove, our
962				 * value for space might be bad.
963				 *
964				 * XXX will we be ok if the reader has gone
965				 * away here?
966				 */
967				if (space > wpipe->pipe_buffer.size -
968				    wpipe->pipe_buffer.cnt) {
969					pipeunlock(wpipe);
970					goto retrywrite;
971				}
972
973				/*
974				 * Transfer size is minimum of uio transfer
975				 * and free space in pipe buffer.
976				 */
977				if (space > uio->uio_resid)
978					size = uio->uio_resid;
979				else
980					size = space;
981				/*
982				 * First segment to transfer is minimum of
983				 * transfer size and contiguous space in
984				 * pipe buffer.  If first segment to transfer
985				 * is less than the transfer size, we've got
986				 * a wraparound in the buffer.
987				 */
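				/*
				 * Wraparound example for the copies below:
				 * with a 16K buffer, .in at 15K and a 4K
				 * transfer, segsize is 1K; 1K is copied at
				 * offset 15K, the remaining 3K at offset 0,
				 * and .in ends up at 3K.
				 */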
988				segsize = wpipe->pipe_buffer.size -
989					wpipe->pipe_buffer.in;
990				if (segsize > size)
991					segsize = size;
992
993				/* Transfer first segment */
994
995				PIPE_UNLOCK(rpipe);
996				error = uiomove(&wpipe->pipe_buffer.buffer[wpipe->pipe_buffer.in],
997						segsize, uio);
998				PIPE_LOCK(rpipe);
999
1000				if (error == 0 && segsize < size) {
1001					/*
1002					 * Transfer remaining part now, to
1003					 * support atomic writes.  Wraparound
1004					 * happened.
1005					 */
1006					if (wpipe->pipe_buffer.in + segsize !=
1007					    wpipe->pipe_buffer.size)
1008						panic("Expected pipe buffer wraparound disappeared");
1009
1010					PIPE_UNLOCK(rpipe);
1011					error = uiomove(&wpipe->pipe_buffer.buffer[0],
1012							size - segsize, uio);
1013					PIPE_LOCK(rpipe);
1014				}
1015				if (error == 0) {
1016					wpipe->pipe_buffer.in += size;
1017					if (wpipe->pipe_buffer.in >=
1018					    wpipe->pipe_buffer.size) {
1019						if (wpipe->pipe_buffer.in != size - segsize + wpipe->pipe_buffer.size)
1020							panic("Expected wraparound bad");
1021						wpipe->pipe_buffer.in = size - segsize;
1022					}
1023
1024					wpipe->pipe_buffer.cnt += size;
1025					if (wpipe->pipe_buffer.cnt > wpipe->pipe_buffer.size)
1026						panic("Pipe buffer overflow");
1027
1028				}
1029				pipeunlock(wpipe);
1030			}
1031			if (error)
1032				break;
1033
1034		} else {
1035			/*
1036			 * If the "read-side" has been blocked, wake it up now.
1037			 */
1038			if (wpipe->pipe_state & PIPE_WANTR) {
1039				wpipe->pipe_state &= ~PIPE_WANTR;
1040				wakeup(wpipe);
1041			}
1042
1043			/*
1044			 * don't block on non-blocking I/O
1045			 */
1046			if (fp->f_flag & FNONBLOCK) {
1047				error = EAGAIN;
1048				break;
1049			}
1050
1051			/*
1052			 * We have no more space and have something to offer,
1053			 * wake up select/poll.
1054			 */
1055			pipeselwakeup(wpipe);
1056
1057			wpipe->pipe_state |= PIPE_WANTW;
1058			error = msleep(wpipe, PIPE_MTX(rpipe),
1059			    PRIBIO | PCATCH, "pipewr", 0);
1060			if (error != 0)
1061				break;
1062			/*
1063			 * If read side wants to go away, we just issue a signal
1064			 * to ourselves.
1065			 */
1066			if (wpipe->pipe_state & PIPE_EOF) {
1067				error = EPIPE;
1068				break;
1069			}
1070		}
1071	}
1072
1073	--wpipe->pipe_busy;
1074
1075	if ((wpipe->pipe_busy == 0) && (wpipe->pipe_state & PIPE_WANT)) {
1076		wpipe->pipe_state &= ~(PIPE_WANT | PIPE_WANTR);
1077		wakeup(wpipe);
1078	} else if (wpipe->pipe_buffer.cnt > 0) {
1079		/*
1080		 * If we have put any characters in the buffer, we wake up
1081		 * the reader.
1082		 */
1083		if (wpipe->pipe_state & PIPE_WANTR) {
1084			wpipe->pipe_state &= ~PIPE_WANTR;
1085			wakeup(wpipe);
1086		}
1087	}
1088
1089	/*
1090	 * Don't return EPIPE if I/O was successful
1091	 */
1092	if ((wpipe->pipe_buffer.cnt == 0) &&
1093	    (uio->uio_resid == 0) &&
1094	    (error == EPIPE)) {
1095		error = 0;
1096	}
1097
1098	if (error == 0)
1099		vfs_timestamp(&wpipe->pipe_mtime);
1100
1101	/*
1102	 * We have something to offer,
1103	 * wake up select/poll.
1104	 */
1105	if (wpipe->pipe_buffer.cnt)
1106		pipeselwakeup(wpipe);
1107
1108	PIPE_UNLOCK(rpipe);
1109	return (error);
1110}
1111
1112/*
1113 * we implement a very minimal set of ioctls for compatibility with sockets.
1114 */
1115int
1116pipe_ioctl(fp, cmd, data, td)
1117	struct file *fp;
1118	u_long cmd;
1119	caddr_t data;
1120	struct thread *td;
1121{
1122	struct pipe *mpipe = (struct pipe *)fp->f_data;
1123
1124	switch (cmd) {
1125
1126	case FIONBIO:
1127		return (0);
1128
1129	case FIOASYNC:
1130		PIPE_LOCK(mpipe);
1131		if (*(int *)data) {
1132			mpipe->pipe_state |= PIPE_ASYNC;
1133		} else {
1134			mpipe->pipe_state &= ~PIPE_ASYNC;
1135		}
1136		PIPE_UNLOCK(mpipe);
1137		return (0);
1138
1139	case FIONREAD:
1140		PIPE_LOCK(mpipe);
1141		if (mpipe->pipe_state & PIPE_DIRECTW)
1142			*(int *)data = mpipe->pipe_map.cnt;
1143		else
1144			*(int *)data = mpipe->pipe_buffer.cnt;
1145		PIPE_UNLOCK(mpipe);
1146		return (0);
1147
1148	case FIOSETOWN:
1149		return (fsetown(*(int *)data, &mpipe->pipe_sigio));
1150
1151	case FIOGETOWN:
1152		*(int *)data = fgetown(mpipe->pipe_sigio);
1153		return (0);
1154
1155	/* This is deprecated, FIOSETOWN should be used instead. */
1156	case TIOCSPGRP:
1157		return (fsetown(-(*(int *)data), &mpipe->pipe_sigio));
1158
1159	/* This is deprecated, FIOGETOWN should be used instead. */
1160	case TIOCGPGRP:
1161		*(int *)data = -fgetown(mpipe->pipe_sigio);
1162		return (0);
1163
1164	}
1165	return (ENOTTY);
1166}
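/*
 * For example, a user process can ask how much data is buffered with
 * FIONREAD (userland sketch, error handling omitted):
 *
 *	int n;
 *
 *	ioctl(pfd[0], FIONREAD, &n);
 *	printf("%d bytes waiting\n", n);
 */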
1167
1168int
1169pipe_poll(fp, events, cred, td)
1170	struct file *fp;
1171	int events;
1172	struct ucred *cred;
1173	struct thread *td;
1174{
1175	struct pipe *rpipe = (struct pipe *)fp->f_data;
1176	struct pipe *wpipe;
1177	int revents = 0;
1178
1179	wpipe = rpipe->pipe_peer;
1180	PIPE_LOCK(rpipe);
1181	if (events & (POLLIN | POLLRDNORM))
1182		if ((rpipe->pipe_state & PIPE_DIRECTW) ||
1183		    (rpipe->pipe_buffer.cnt > 0) ||
1184		    (rpipe->pipe_state & PIPE_EOF))
1185			revents |= events & (POLLIN | POLLRDNORM);
1186
1187	if (events & (POLLOUT | POLLWRNORM))
1188		if (wpipe == NULL || (wpipe->pipe_state & PIPE_EOF) ||
1189		    (((wpipe->pipe_state & PIPE_DIRECTW) == 0) &&
1190		     (wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt) >= PIPE_BUF))
1191			revents |= events & (POLLOUT | POLLWRNORM);
1192
1193	if ((rpipe->pipe_state & PIPE_EOF) ||
1194	    (wpipe == NULL) ||
1195	    (wpipe->pipe_state & PIPE_EOF))
1196		revents |= POLLHUP;
1197
1198	if (revents == 0) {
1199		if (events & (POLLIN | POLLRDNORM)) {
1200			selrecord(td, &rpipe->pipe_sel);
1201			rpipe->pipe_state |= PIPE_SEL;
1202		}
1203
1204		if (events & (POLLOUT | POLLWRNORM)) {
1205			selrecord(td, &wpipe->pipe_sel);
1206			wpipe->pipe_state |= PIPE_SEL;
1207		}
1208	}
1209	PIPE_UNLOCK(rpipe);
1210
1211	return (revents);
1212}
1213
1214/*
1215 * We shouldn't need locks here as we're only reading the fields and
1216 * stat is inherently racy anyway.
1217 */
1218static int
1219pipe_stat(fp, ub, td)
1220	struct file *fp;
1221	struct stat *ub;
1222	struct thread *td;
1223{
1224	struct pipe *pipe = (struct pipe *)fp->f_data;
1225
1226	bzero((caddr_t)ub, sizeof(*ub));
1227	ub->st_mode = S_IFIFO;
1228	ub->st_blksize = pipe->pipe_buffer.size;
1229	ub->st_size = pipe->pipe_buffer.cnt;
1230	ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
1231	ub->st_atimespec = pipe->pipe_atime;
1232	ub->st_mtimespec = pipe->pipe_mtime;
1233	ub->st_ctimespec = pipe->pipe_ctime;
1234	ub->st_uid = fp->f_cred->cr_uid;
1235	ub->st_gid = fp->f_cred->cr_gid;
1236	/*
1237	 * Left as 0: st_dev, st_ino, st_nlink, st_rdev, st_flags, st_gen.
1238	 * XXX (st_dev, st_ino) should be unique.
1239	 */
1240	return (0);
1241}
1242
1243/* ARGSUSED */
1244static int
1245pipe_close(fp, td)
1246	struct file *fp;
1247	struct thread *td;
1248{
1249	struct pipe *cpipe = (struct pipe *)fp->f_data;
1250
1251	fp->f_ops = &badfileops;
1252	fp->f_data = NULL;
1253	funsetown(&cpipe->pipe_sigio);
1254	pipeclose(cpipe);
1255	return (0);
1256}
1257
1258static void
1259pipe_free_kmem(cpipe)
1260	struct pipe *cpipe;
1261{
1262
1263	GIANT_REQUIRED;
1264	KASSERT(cpipe->pipe_mtxp == NULL || !mtx_owned(PIPE_MTX(cpipe)),
1265	       ("pipe_free_kmem: pipe mutex locked"));
1266
1267	if (cpipe->pipe_buffer.buffer != NULL) {
1268		if (cpipe->pipe_buffer.size > PIPE_SIZE)
1269			--nbigpipe;
1270		amountpipekva -= cpipe->pipe_buffer.size;
1271		kmem_free(kernel_map,
1272			(vm_offset_t)cpipe->pipe_buffer.buffer,
1273			cpipe->pipe_buffer.size);
1274		cpipe->pipe_buffer.buffer = NULL;
1275	}
1276#ifndef PIPE_NODIRECT
1277	if (cpipe->pipe_map.kva != 0) {
1278		amountpipekva -= cpipe->pipe_buffer.size + PAGE_SIZE;
1279		kmem_free(kernel_map,
1280			cpipe->pipe_map.kva,
1281			cpipe->pipe_buffer.size + PAGE_SIZE);
1282		cpipe->pipe_map.cnt = 0;
1283		cpipe->pipe_map.kva = 0;
1284		cpipe->pipe_map.pos = 0;
1285		cpipe->pipe_map.npages = 0;
1286	}
1287#endif
1288}
1289
1290/*
1291 * shut down the pipe
1292 */
1293static void
1294pipeclose(cpipe)
1295	struct pipe *cpipe;
1296{
1297	struct pipe *ppipe;
1298	int hadpeer;
1299
1300	if (cpipe == NULL)
1301		return;
1302
1303	hadpeer = 0;
1304
1305	/* partially created pipes won't have a valid mutex. */
1306	if (PIPE_MTX(cpipe) != NULL)
1307		PIPE_LOCK(cpipe);
1308
1309	pipeselwakeup(cpipe);
1310
1311	/*
1312	 * If the other side is blocked, wake it up saying that
1313	 * we want to close it down.
1314	 */
1315	while (cpipe->pipe_busy) {
1316		wakeup(cpipe);
1317		cpipe->pipe_state |= PIPE_WANT | PIPE_EOF;
1318		msleep(cpipe, PIPE_MTX(cpipe), PRIBIO, "pipecl", 0);
1319	}
1320
1321	/*
1322	 * Disconnect from peer
1323	 */
1324	if ((ppipe = cpipe->pipe_peer) != NULL) {
1325		hadpeer++;
1326		pipeselwakeup(ppipe);
1327
1328		ppipe->pipe_state |= PIPE_EOF;
1329		wakeup(ppipe);
1330		KNOTE(&ppipe->pipe_sel.si_note, 0);
1331		ppipe->pipe_peer = NULL;
1332	}
1333	/*
1334	 * free resources
1335	 */
1336	if (PIPE_MTX(cpipe) != NULL) {
1337		PIPE_UNLOCK(cpipe);
1338		if (!hadpeer) {
1339			mtx_destroy(PIPE_MTX(cpipe));
1340			free(PIPE_MTX(cpipe), M_TEMP);
1341		}
1342	}
1343	mtx_lock(&Giant);
1344	pipe_free_kmem(cpipe);
1345	uma_zfree(pipe_zone, cpipe);
1346	mtx_unlock(&Giant);
1347}
1348
1349/*ARGSUSED*/
1350static int
1351pipe_kqfilter(struct file *fp, struct knote *kn)
1352{
1353	struct pipe *cpipe;
1354
1355	cpipe = (struct pipe *)kn->kn_fp->f_data;
1356	switch (kn->kn_filter) {
1357	case EVFILT_READ:
1358		kn->kn_fop = &pipe_rfiltops;
1359		break;
1360	case EVFILT_WRITE:
1361		kn->kn_fop = &pipe_wfiltops;
1362		cpipe = cpipe->pipe_peer;
1363		break;
1364	default:
1365		return (1);
1366	}
1367	kn->kn_hook = (caddr_t)cpipe;
1368
1369	PIPE_LOCK(cpipe);
1370	SLIST_INSERT_HEAD(&cpipe->pipe_sel.si_note, kn, kn_selnext);
1371	PIPE_UNLOCK(cpipe);
1372	return (0);
1373}
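/*
 * Userland sketch of how these filters get attached (assumes the usual
 * kqueue(2)/kevent(2) interface; error handling omitted):
 *
 *	struct kevent kev;
 *	int kq = kqueue();
 *
 *	EV_SET(&kev, pfd[0], EVFILT_READ, EV_ADD, 0, 0, NULL);
 *	kevent(kq, &kev, 1, NULL, 0, NULL);
 */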
1374
1375static void
1376filt_pipedetach(struct knote *kn)
1377{
1378	struct pipe *cpipe = (struct pipe *)kn->kn_hook;
1379
1380	PIPE_LOCK(cpipe);
1381	SLIST_REMOVE(&cpipe->pipe_sel.si_note, kn, knote, kn_selnext);
1382	PIPE_UNLOCK(cpipe);
1383}
1384
1385/*ARGSUSED*/
1386static int
1387filt_piperead(struct knote *kn, long hint)
1388{
1389	struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
1390	struct pipe *wpipe = rpipe->pipe_peer;
1391
1392	PIPE_LOCK(rpipe);
1393	kn->kn_data = rpipe->pipe_buffer.cnt;
1394	if ((kn->kn_data == 0) && (rpipe->pipe_state & PIPE_DIRECTW))
1395		kn->kn_data = rpipe->pipe_map.cnt;
1396
1397	if ((rpipe->pipe_state & PIPE_EOF) ||
1398	    (wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
1399		kn->kn_flags |= EV_EOF;
1400		PIPE_UNLOCK(rpipe);
1401		return (1);
1402	}
1403	PIPE_UNLOCK(rpipe);
1404	return (kn->kn_data > 0);
1405}
1406
1407/*ARGSUSED*/
1408static int
1409filt_pipewrite(struct knote *kn, long hint)
1410{
1411	struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
1412	struct pipe *wpipe = rpipe->pipe_peer;
1413
1414	PIPE_LOCK(rpipe);
1415	if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
1416		kn->kn_data = 0;
1417		kn->kn_flags |= EV_EOF;
1418		PIPE_UNLOCK(rpipe);
1419		return (1);
1420	}
1421	kn->kn_data = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;
1422	if (wpipe->pipe_state & PIPE_DIRECTW)
1423		kn->kn_data = 0;
1424
1425	PIPE_UNLOCK(rpipe);
1426	return (kn->kn_data >= PIPE_BUF);
1427}
1428