sys_pipe.c revision 92751
1/*
2 * Copyright (c) 1996 John S. Dyson
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 * 1. Redistributions of source code must retain the above copyright
9 *    notice immediately at the beginning of the file, without modification,
10 *    this list of conditions, and the following disclaimer.
11 * 2. Redistributions in binary form must reproduce the above copyright
12 *    notice, this list of conditions and the following disclaimer in the
13 *    documentation and/or other materials provided with the distribution.
14 * 3. Absolutely no warranty of function or purpose is made by the author
15 *    John S. Dyson.
16 * 4. Modifications may be freely made to this file if the above conditions
17 *    are met.
18 *
19 * $FreeBSD: head/sys/kern/sys_pipe.c 92751 2002-03-20 04:09:59Z jeff $
20 */
21
22/*
23 * This file contains a high-performance replacement for the socket-based
24 * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
25 * all features of sockets, but does do everything that pipes normally
26 * do.
27 */
28
29/*
30 * This code has two modes of operation: a small write mode and a large
31 * write mode.  The small write mode acts like conventional pipes with
32 * a kernel buffer.  If a write is smaller than PIPE_MINDIRECT, then the
33 * "normal" pipe buffering is done.  If the write is between PIPE_MINDIRECT
34 * and PIPE_SIZE in size, the writer's buffer is fully mapped and wired into
35 * the kernel, and the receiving process can copy the data directly from the
36 * pages of the sending process.
37 *
38 * If the sending process receives a signal, it is possible that it will
39 * go away, and certainly its address space can change, because control
40 * is returned to the user-mode side.  In that case, the pipe code
41 * arranges to copy the buffer supplied by the user process to a pageable
42 * kernel buffer, and the receiving process will grab the data from the
43 * pageable kernel buffer.  Since signals don't happen all that often,
44 * the copy operation is normally eliminated.
45 *
46 * The constant PIPE_MINDIRECT is chosen to make sure that buffering will
47 * happen for small transfers so that the system will not spend all of
48 * its time context switching.  PIPE_SIZE is constrained by the
49 * amount of kernel virtual memory.
50 */
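
/*
 * Purely for illustration (this block is not part of the kernel build and
 * is only a sketch): a minimal userland program exercising both write paths
 * described above, assuming some other process or thread drains fds[0].
 * The small write fits under PIPE_MINDIRECT and is copied through the
 * kernel buffer; the large blocking write may use the direct, page-wired
 * path.  The 64k size below is an arbitrary example value, not a kernel
 * constant.
 *
 *	#include <string.h>
 *	#include <unistd.h>
 *
 *	int
 *	example(void)
 *	{
 *		static char big[64 * 1024];
 *		char small[64];
 *		int fds[2];
 *
 *		if (pipe(fds) == -1)
 *			return (-1);
 *		memset(small, 'a', sizeof(small));
 *		memset(big, 'b', sizeof(big));
 *		(void)write(fds[1], small, sizeof(small));
 *		(void)write(fds[1], big, sizeof(big));
 *		close(fds[0]);
 *		close(fds[1]);
 *		return (0);
 *	}
 */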
51
52#include <sys/param.h>
53#include <sys/systm.h>
54#include <sys/fcntl.h>
55#include <sys/file.h>
56#include <sys/filedesc.h>
57#include <sys/filio.h>
58#include <sys/kernel.h>
59#include <sys/lock.h>
60#include <sys/mutex.h>
61#include <sys/ttycom.h>
62#include <sys/stat.h>
63#include <sys/malloc.h>
64#include <sys/poll.h>
65#include <sys/selinfo.h>
66#include <sys/signalvar.h>
67#include <sys/sysproto.h>
68#include <sys/pipe.h>
69#include <sys/proc.h>
70#include <sys/vnode.h>
71#include <sys/uio.h>
72#include <sys/event.h>
73
74#include <vm/vm.h>
75#include <vm/vm_param.h>
76#include <vm/vm_object.h>
77#include <vm/vm_kern.h>
78#include <vm/vm_extern.h>
79#include <vm/pmap.h>
80#include <vm/vm_map.h>
81#include <vm/vm_page.h>
82#include <vm/uma.h>
83
84/*
85 * Use this define if you want to disable *fancy* VM things.  Expect an
86 * approx 30% decrease in transfer rate.  This could be useful for
87 * NetBSD or OpenBSD.
88 */
89/* #define PIPE_NODIRECT */
90
91/*
92 * interfaces to the outside world
93 */
94static int pipe_read(struct file *fp, struct uio *uio,
95		struct ucred *cred, int flags, struct thread *td);
96static int pipe_write(struct file *fp, struct uio *uio,
97		struct ucred *cred, int flags, struct thread *td);
98static int pipe_close(struct file *fp, struct thread *td);
99static int pipe_poll(struct file *fp, int events, struct ucred *cred,
100		struct thread *td);
101static int pipe_kqfilter(struct file *fp, struct knote *kn);
102static int pipe_stat(struct file *fp, struct stat *sb, struct thread *td);
103static int pipe_ioctl(struct file *fp, u_long cmd, caddr_t data, struct thread *td);
104
105static struct fileops pipeops = {
106	pipe_read, pipe_write, pipe_ioctl, pipe_poll, pipe_kqfilter,
107	pipe_stat, pipe_close
108};
109
110static void	filt_pipedetach(struct knote *kn);
111static int	filt_piperead(struct knote *kn, long hint);
112static int	filt_pipewrite(struct knote *kn, long hint);
113
114static struct filterops pipe_rfiltops =
115	{ 1, NULL, filt_pipedetach, filt_piperead };
116static struct filterops pipe_wfiltops =
117	{ 1, NULL, filt_pipedetach, filt_pipewrite };
118
119#define PIPE_GET_GIANT(pipe)						\
120	do {								\
121		KASSERT(((pipe)->pipe_state & PIPE_LOCKFL) != 0,	\
122		    ("%s:%d PIPE_GET_GIANT: pipe not locked",	\
123		     __FILE__, __LINE__));				\
124		PIPE_UNLOCK(pipe);					\
125		mtx_lock(&Giant);					\
126	} while (0)
127
128#define PIPE_DROP_GIANT(pipe)						\
129	do {								\
130		mtx_unlock(&Giant);					\
131		PIPE_LOCK(pipe);					\
132	} while (0)
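
/*
 * PIPE_GET_GIANT() and PIPE_DROP_GIANT() are used as a bracketing pair
 * around code that must run under Giant (the VM operations in pipespace()
 * and the direct-write buffer setup/teardown).  The pipe mutex is dropped
 * before Giant is acquired and reacquired after Giant is released, so the
 * caller must hold the pipe I/O lock (pipelock()) across the pair to keep
 * the pipe state stable; PIPE_GET_GIANT() asserts this.
 */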
133
134/*
135 * Default pipe buffer size(s); this can be kind-of large now because pipe
136 * space is pageable.  The pipe code will try to maintain locality of
137 * reference for performance reasons, so small amounts of outstanding I/O
138 * will not wipe the cache.
139 */
140#define MINPIPESIZE (PIPE_SIZE/3)
141#define MAXPIPESIZE (2*PIPE_SIZE/3)
142
143/*
144 * Maximum amount of kva for pipes -- this is kind-of a soft limit, but
145 * is there so that on large systems, we don't exhaust it.
146 */
147#define MAXPIPEKVA (8*1024*1024)
148
149/*
150 * Limit for direct transfers; we cannot, of course, limit
151 * the amount of kva for pipes in general.
152 */
153#define LIMITPIPEKVA (16*1024*1024)
154
155/*
156 * Limit the number of "big" pipes
157 */
158#define LIMITBIGPIPES	32
159static int nbigpipe;
160
161static int amountpipekva;
162
163static void pipeinit(void *dummy __unused);
164static void pipeclose(struct pipe *cpipe);
165static void pipe_free_kmem(struct pipe *cpipe);
166static int pipe_create(struct pipe **cpipep);
167static __inline int pipelock(struct pipe *cpipe, int catch);
168static __inline void pipeunlock(struct pipe *cpipe);
169static __inline void pipeselwakeup(struct pipe *cpipe);
170#ifndef PIPE_NODIRECT
171static int pipe_build_write_buffer(struct pipe *wpipe, struct uio *uio);
172static void pipe_destroy_write_buffer(struct pipe *wpipe);
173static int pipe_direct_write(struct pipe *wpipe, struct uio *uio);
174static void pipe_clone_write_buffer(struct pipe *wpipe);
175#endif
176static int pipespace(struct pipe *cpipe, int size);
177
178static uma_zone_t pipe_zone;
179
180SYSINIT(vfs, SI_SUB_VFS, SI_ORDER_ANY, pipeinit, NULL);
181
182static void
183pipeinit(void *dummy __unused)
184{
185	pipe_zone = uma_zcreate("PIPE", sizeof(struct pipe), NULL,
186	    NULL, NULL, NULL, UMA_ALIGN_PTR, 0);
187}
188
189/*
190 * The pipe system call for the DTYPE_PIPE type of pipes
191 */
192
193/* ARGSUSED */
194int
195pipe(td, uap)
196	struct thread *td;
197	struct pipe_args /* {
198		int	dummy;
199	} */ *uap;
200{
201	struct filedesc *fdp = td->td_proc->p_fd;
202	struct file *rf, *wf;
203	struct pipe *rpipe, *wpipe;
204	struct mtx *pmtx;
205	int fd, error;
206
207	KASSERT(pipe_zone != NULL, ("pipe_zone not initialized"));
208
209	pmtx = malloc(sizeof(*pmtx), M_TEMP, M_WAITOK | M_ZERO);
210
211	rpipe = wpipe = NULL;
212	if (pipe_create(&rpipe) || pipe_create(&wpipe)) {
213		pipeclose(rpipe);
214		pipeclose(wpipe);
215		free(pmtx, M_TEMP);
216		return (ENFILE);
217	}
218
219	rpipe->pipe_state |= PIPE_DIRECTOK;
220	wpipe->pipe_state |= PIPE_DIRECTOK;
221
222	error = falloc(td, &rf, &fd);
223	if (error) {
224		pipeclose(rpipe);
225		pipeclose(wpipe);
226		free(pmtx, M_TEMP);
227		return (error);
228	}
229	fhold(rf);
230	td->td_retval[0] = fd;
231
232	/*
233	 * Warning: once we've gotten past allocation of the fd for the
234	 * read-side, we can only drop the read side via fdrop() in order
235	 * to avoid races against processes which manage to dup() the read
236	 * side while we are blocked trying to allocate the write side.
237	 */
238	FILE_LOCK(rf);
239	rf->f_flag = FREAD | FWRITE;
240	rf->f_type = DTYPE_PIPE;
241	rf->f_data = (caddr_t)rpipe;
242	rf->f_ops = &pipeops;
243	FILE_UNLOCK(rf);
244	error = falloc(td, &wf, &fd);
245	if (error) {
246		FILEDESC_LOCK(fdp);
247		if (fdp->fd_ofiles[td->td_retval[0]] == rf) {
248			fdp->fd_ofiles[td->td_retval[0]] = NULL;
249			FILEDESC_UNLOCK(fdp);
250			fdrop(rf, td);
251		} else
252			FILEDESC_UNLOCK(fdp);
253		fdrop(rf, td);
254		/* rpipe has been closed by fdrop(). */
255		pipeclose(wpipe);
256		free(pmtx, M_TEMP);
257		return (error);
258	}
259	FILE_LOCK(wf);
260	wf->f_flag = FREAD | FWRITE;
261	wf->f_type = DTYPE_PIPE;
262	wf->f_data = (caddr_t)wpipe;
263	wf->f_ops = &pipeops;
264	FILE_UNLOCK(wf);
265	td->td_retval[1] = fd;
266	rpipe->pipe_peer = wpipe;
267	wpipe->pipe_peer = rpipe;
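	/*
	 * Both ends of the pipe share a single mutex, allocated at the top
	 * of this function and initialized only here, once both descriptors
	 * are safely in place.
	 */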
268	mtx_init(pmtx, "pipe mutex", MTX_DEF);
269	rpipe->pipe_mtxp = wpipe->pipe_mtxp = pmtx;
270	fdrop(rf, td);
271
272	return (0);
273}
274
275/*
276 * Allocate kva for the pipe circular buffer; the space is pageable.
277 * This routine will 'realloc' the size of a pipe safely; if it fails,
278 * it will retain the old buffer.
279 * If it fails, it will return ENOMEM.
280 */
281static int
282pipespace(cpipe, size)
283	struct pipe *cpipe;
284	int size;
285{
286	struct vm_object *object;
287	caddr_t buffer;
288	int npages, error;
289
290	GIANT_REQUIRED;
291	KASSERT(cpipe->pipe_mtxp == NULL || !mtx_owned(PIPE_MTX(cpipe)),
292	       ("pipespace: pipe mutex locked"));
293
294	npages = round_page(size)/PAGE_SIZE;
295	/*
296	 * Create an object; I don't like the idea of paging to/from
297	 * kernel_object.
298	 * XXX -- minor change needed here for NetBSD/OpenBSD VM systems.
299	 */
300	object = vm_object_allocate(OBJT_DEFAULT, npages);
301	buffer = (caddr_t) vm_map_min(kernel_map);
302
303	/*
304	 * Insert the object into the kernel map, and allocate kva for it.
305	 * The map entry is, by default, pageable.
306	 * XXX -- minor change needed here for NetBSD/OpenBSD VM systems.
307	 */
308	error = vm_map_find(kernel_map, object, 0,
309		(vm_offset_t *) &buffer, size, 1,
310		VM_PROT_ALL, VM_PROT_ALL, 0);
311
312	if (error != KERN_SUCCESS) {
313		vm_object_deallocate(object);
314		return (ENOMEM);
315	}
316
317	/* free old resources if we're resizing */
318	pipe_free_kmem(cpipe);
319	cpipe->pipe_buffer.object = object;
320	cpipe->pipe_buffer.buffer = buffer;
321	cpipe->pipe_buffer.size = size;
322	cpipe->pipe_buffer.in = 0;
323	cpipe->pipe_buffer.out = 0;
324	cpipe->pipe_buffer.cnt = 0;
325	amountpipekva += cpipe->pipe_buffer.size;
326	return (0);
327}
328
329/*
330 * initialize and allocate VM and memory for pipe
331 */
332static int
333pipe_create(cpipep)
334	struct pipe **cpipep;
335{
336	struct pipe *cpipe;
337	int error;
338
339	*cpipep = uma_zalloc(pipe_zone, M_WAITOK);
340	if (*cpipep == NULL)
341		return (ENOMEM);
342
343	cpipe = *cpipep;
344
345	/* so pipespace()->pipe_free_kmem() doesn't follow junk pointer */
346	cpipe->pipe_buffer.object = NULL;
347#ifndef PIPE_NODIRECT
348	cpipe->pipe_map.kva = 0;
349#endif
350	/*
351	 * protect so pipeclose() doesn't follow a junk pointer
352	 * if pipespace() fails.
353	 */
354	bzero(&cpipe->pipe_sel, sizeof(cpipe->pipe_sel));
355	cpipe->pipe_state = 0;
356	cpipe->pipe_peer = NULL;
357	cpipe->pipe_busy = 0;
358
359#ifndef PIPE_NODIRECT
360	/*
361	 * pipe data structure initializations to support direct pipe I/O
362	 */
363	cpipe->pipe_map.cnt = 0;
364	cpipe->pipe_map.kva = 0;
365	cpipe->pipe_map.pos = 0;
366	cpipe->pipe_map.npages = 0;
367	/* cpipe->pipe_map.ms[] = invalid */
368#endif
369
370	cpipe->pipe_mtxp = NULL;	/* avoid pipespace assertion */
371	error = pipespace(cpipe, PIPE_SIZE);
372	if (error)
373		return (error);
374
375	vfs_timestamp(&cpipe->pipe_ctime);
376	cpipe->pipe_atime = cpipe->pipe_ctime;
377	cpipe->pipe_mtime = cpipe->pipe_ctime;
378
379	return (0);
380}
381
382
383/*
384 * lock a pipe for I/O, blocking other access
385 */
386static __inline int
387pipelock(cpipe, catch)
388	struct pipe *cpipe;
389	int catch;
390{
391	int error;
392
393	PIPE_LOCK_ASSERT(cpipe, MA_OWNED);
394	while (cpipe->pipe_state & PIPE_LOCKFL) {
395		cpipe->pipe_state |= PIPE_LWANT;
396		error = msleep(cpipe, PIPE_MTX(cpipe),
397		    catch ? (PRIBIO | PCATCH) : PRIBIO,
398		    "pipelk", 0);
399		if (error != 0)
400			return (error);
401	}
402	cpipe->pipe_state |= PIPE_LOCKFL;
403	return (0);
404}
405
406/*
407 * unlock a pipe I/O lock
408 */
409static __inline void
410pipeunlock(cpipe)
411	struct pipe *cpipe;
412{
413
414	PIPE_LOCK_ASSERT(cpipe, MA_OWNED);
415	cpipe->pipe_state &= ~PIPE_LOCKFL;
416	if (cpipe->pipe_state & PIPE_LWANT) {
417		cpipe->pipe_state &= ~PIPE_LWANT;
418		wakeup(cpipe);
419	}
420}
421
422static __inline void
423pipeselwakeup(cpipe)
424	struct pipe *cpipe;
425{
426
427	if (cpipe->pipe_state & PIPE_SEL) {
428		cpipe->pipe_state &= ~PIPE_SEL;
429		selwakeup(&cpipe->pipe_sel);
430	}
431	if ((cpipe->pipe_state & PIPE_ASYNC) && cpipe->pipe_sigio)
432		pgsigio(cpipe->pipe_sigio, SIGIO, 0);
433	KNOTE(&cpipe->pipe_sel.si_note, 0);
434}
435
436/* ARGSUSED */
437static int
438pipe_read(fp, uio, cred, flags, td)
439	struct file *fp;
440	struct uio *uio;
441	struct ucred *cred;
442	struct thread *td;
443	int flags;
444{
445	struct pipe *rpipe = (struct pipe *) fp->f_data;
446	int error;
447	int nread = 0;
448	u_int size;
449
450	PIPE_LOCK(rpipe);
451	++rpipe->pipe_busy;
452	error = pipelock(rpipe, 1);
453	if (error)
454		goto unlocked_error;
455
456	while (uio->uio_resid) {
457		/*
458		 * normal pipe buffer receive
459		 */
460		if (rpipe->pipe_buffer.cnt > 0) {
461			size = rpipe->pipe_buffer.size - rpipe->pipe_buffer.out;
462			if (size > rpipe->pipe_buffer.cnt)
463				size = rpipe->pipe_buffer.cnt;
464			if (size > (u_int) uio->uio_resid)
465				size = (u_int) uio->uio_resid;
466
467			PIPE_UNLOCK(rpipe);
468			error = uiomove(&rpipe->pipe_buffer.buffer[rpipe->pipe_buffer.out],
469					size, uio);
470			PIPE_LOCK(rpipe);
471			if (error)
472				break;
473
474			rpipe->pipe_buffer.out += size;
475			if (rpipe->pipe_buffer.out >= rpipe->pipe_buffer.size)
476				rpipe->pipe_buffer.out = 0;
477
478			rpipe->pipe_buffer.cnt -= size;
479
480			/*
481			 * If there is no more to read in the pipe, reset
482			 * its pointers to the beginning.  This improves
483			 * cache hit stats.
484			 */
485			if (rpipe->pipe_buffer.cnt == 0) {
486				rpipe->pipe_buffer.in = 0;
487				rpipe->pipe_buffer.out = 0;
488			}
489			nread += size;
490#ifndef PIPE_NODIRECT
491		/*
492		 * Direct copy, bypassing a kernel buffer.
493		 */
494		} else if ((size = rpipe->pipe_map.cnt) &&
495			   (rpipe->pipe_state & PIPE_DIRECTW)) {
496			caddr_t	va;
497			if (size > (u_int) uio->uio_resid)
498				size = (u_int) uio->uio_resid;
499
500			va = (caddr_t) rpipe->pipe_map.kva +
501			    rpipe->pipe_map.pos;
502			PIPE_UNLOCK(rpipe);
503			error = uiomove(va, size, uio);
504			PIPE_LOCK(rpipe);
505			if (error)
506				break;
507			nread += size;
508			rpipe->pipe_map.pos += size;
509			rpipe->pipe_map.cnt -= size;
510			if (rpipe->pipe_map.cnt == 0) {
511				rpipe->pipe_state &= ~PIPE_DIRECTW;
512				wakeup(rpipe);
513			}
514#endif
515		} else {
516			/*
517			 * detect EOF condition
518			 * read returns 0 on EOF, no need to set error
519			 */
520			if (rpipe->pipe_state & PIPE_EOF)
521				break;
522
523			/*
524			 * If the "write-side" has been blocked, wake it up now.
525			 */
526			if (rpipe->pipe_state & PIPE_WANTW) {
527				rpipe->pipe_state &= ~PIPE_WANTW;
528				wakeup(rpipe);
529			}
530
531			/*
532			 * Break if some data was read.
533			 */
534			if (nread > 0)
535				break;
536
537			/*
538			 * Unlock the pipe buffer for our remaining processing.  We
539			 * will either break out with an error or we will sleep and
540			 * relock to loop.
541			 */
542			pipeunlock(rpipe);
543
544			/*
545			 * Handle non-blocking mode operation or
546			 * wait for more data.
547			 */
548			if (fp->f_flag & FNONBLOCK) {
549				error = EAGAIN;
550			} else {
551				rpipe->pipe_state |= PIPE_WANTR;
552				if ((error = msleep(rpipe, PIPE_MTX(rpipe),
553				    PRIBIO | PCATCH,
554				    "piperd", 0)) == 0)
555					error = pipelock(rpipe, 1);
556			}
557			if (error)
558				goto unlocked_error;
559		}
560	}
561	pipeunlock(rpipe);
562
563	/* XXX: should probably do this before getting any locks. */
564	if (error == 0)
565		vfs_timestamp(&rpipe->pipe_atime);
566unlocked_error:
567	--rpipe->pipe_busy;
568
569	/*
570	 * PIPE_WANT processing only makes sense if pipe_busy is 0.
571	 */
572	if ((rpipe->pipe_busy == 0) && (rpipe->pipe_state & PIPE_WANT)) {
573		rpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTW);
574		wakeup(rpipe);
575	} else if (rpipe->pipe_buffer.cnt < MINPIPESIZE) {
576		/*
577		 * Handle write blocking hysteresis.
578		 */
579		if (rpipe->pipe_state & PIPE_WANTW) {
580			rpipe->pipe_state &= ~PIPE_WANTW;
581			wakeup(rpipe);
582		}
583	}
584
585	if ((rpipe->pipe_buffer.size - rpipe->pipe_buffer.cnt) >= PIPE_BUF)
586		pipeselwakeup(rpipe);
587
588	PIPE_UNLOCK(rpipe);
589	return (error);
590}
591
592#ifndef PIPE_NODIRECT
593/*
594 * Map the sending process's buffer into kernel space and wire it.
595 * This is similar to a physical write operation.
596 */
597static int
598pipe_build_write_buffer(wpipe, uio)
599	struct pipe *wpipe;
600	struct uio *uio;
601{
602	u_int size;
603	int i;
604	vm_offset_t addr, endaddr, paddr;
605
606	GIANT_REQUIRED;
607	PIPE_LOCK_ASSERT(wpipe, MA_NOTOWNED);
608
609	size = (u_int) uio->uio_iov->iov_len;
610	if (size > wpipe->pipe_buffer.size)
611		size = wpipe->pipe_buffer.size;
612
613	endaddr = round_page((vm_offset_t)uio->uio_iov->iov_base + size);
614	addr = trunc_page((vm_offset_t)uio->uio_iov->iov_base);
615	for (i = 0; addr < endaddr; addr += PAGE_SIZE, i++) {
616		vm_page_t m;
617
618		if (vm_fault_quick((caddr_t)addr, VM_PROT_READ) < 0 ||
619		    (paddr = pmap_kextract(addr)) == 0) {
620			int j;
621
622			for (j = 0; j < i; j++)
623				vm_page_unwire(wpipe->pipe_map.ms[j], 1);
624			return (EFAULT);
625		}
626
627		m = PHYS_TO_VM_PAGE(paddr);
628		vm_page_wire(m);
629		wpipe->pipe_map.ms[i] = m;
630	}
631
632/*
633 * set up the control block
634 */
635	wpipe->pipe_map.npages = i;
636	wpipe->pipe_map.pos =
637	    ((vm_offset_t) uio->uio_iov->iov_base) & PAGE_MASK;
638	wpipe->pipe_map.cnt = size;
639
640/*
641 * and map the buffer
642 */
643	if (wpipe->pipe_map.kva == 0) {
644		/*
645		 * We need to allocate space for an extra page because an
646		 * unaligned address range can span one extra page.
647		 */
648		wpipe->pipe_map.kva = kmem_alloc_pageable(kernel_map,
649			wpipe->pipe_buffer.size + PAGE_SIZE);
650		amountpipekva += wpipe->pipe_buffer.size + PAGE_SIZE;
651	}
652	pmap_qenter(wpipe->pipe_map.kva, wpipe->pipe_map.ms,
653		wpipe->pipe_map.npages);
654
655/*
656 * and update the uio data
657 */
658
659	uio->uio_iov->iov_len -= size;
660	uio->uio_iov->iov_base += size;
661	if (uio->uio_iov->iov_len == 0)
662		uio->uio_iov++;
663	uio->uio_resid -= size;
664	uio->uio_offset += size;
665	return (0);
666}
667
668/*
669 * unmap and unwire the process buffer
670 */
671static void
672pipe_destroy_write_buffer(wpipe)
673	struct pipe *wpipe;
674{
675	int i;
676
677	GIANT_REQUIRED;
678	PIPE_LOCK_ASSERT(wpipe, MA_NOTOWNED);
679
680	if (wpipe->pipe_map.kva) {
681		pmap_qremove(wpipe->pipe_map.kva, wpipe->pipe_map.npages);
682
683		if (amountpipekva > MAXPIPEKVA) {
684			vm_offset_t kva = wpipe->pipe_map.kva;
685			wpipe->pipe_map.kva = 0;
686			kmem_free(kernel_map, kva,
687				wpipe->pipe_buffer.size + PAGE_SIZE);
688			amountpipekva -= wpipe->pipe_buffer.size + PAGE_SIZE;
689		}
690	}
691	for (i = 0; i < wpipe->pipe_map.npages; i++)
692		vm_page_unwire(wpipe->pipe_map.ms[i], 1);
693	wpipe->pipe_map.npages = 0;
694}
695
696/*
697 * In the case of a signal, the writing process might go away.  This
698 * code copies the data into the circular buffer so that the source
699 * pages can be freed without loss of data.
700 */
701static void
702pipe_clone_write_buffer(wpipe)
703	struct pipe *wpipe;
704{
705	int size;
706	int pos;
707
708	PIPE_LOCK_ASSERT(wpipe, MA_OWNED);
709	size = wpipe->pipe_map.cnt;
710	pos = wpipe->pipe_map.pos;
711	bcopy((caddr_t) wpipe->pipe_map.kva + pos,
712	    (caddr_t) wpipe->pipe_buffer.buffer, size);
713
714	wpipe->pipe_buffer.in = size;
715	wpipe->pipe_buffer.out = 0;
716	wpipe->pipe_buffer.cnt = size;
717	wpipe->pipe_state &= ~PIPE_DIRECTW;
718
719	PIPE_GET_GIANT(wpipe);
720	pipe_destroy_write_buffer(wpipe);
721	PIPE_DROP_GIANT(wpipe);
722}
723
724/*
725 * This implements the pipe buffer write mechanism.  Note that only
726 * a direct write OR a normal pipe write can be pending at any given time.
727 * If there are any characters in the pipe buffer, the direct write will
728 * be deferred until the receiving process grabs all of the bytes from
729 * the pipe buffer.  Then the direct mapping write is set-up.
730 */
731static int
732pipe_direct_write(wpipe, uio)
733	struct pipe *wpipe;
734	struct uio *uio;
735{
736	int error;
737
738retry:
739	PIPE_LOCK_ASSERT(wpipe, MA_OWNED);
740	while (wpipe->pipe_state & PIPE_DIRECTW) {
741		if (wpipe->pipe_state & PIPE_WANTR) {
742			wpipe->pipe_state &= ~PIPE_WANTR;
743			wakeup(wpipe);
744		}
745		wpipe->pipe_state |= PIPE_WANTW;
746		error = msleep(wpipe, PIPE_MTX(wpipe),
747		    PRIBIO | PCATCH, "pipdww", 0);
748		if (error)
749			goto error1;
750		if (wpipe->pipe_state & PIPE_EOF) {
751			error = EPIPE;
752			goto error1;
753		}
754	}
755	wpipe->pipe_map.cnt = 0;	/* transfer not ready yet */
756	if (wpipe->pipe_buffer.cnt > 0) {
757		if (wpipe->pipe_state & PIPE_WANTR) {
758			wpipe->pipe_state &= ~PIPE_WANTR;
759			wakeup(wpipe);
760		}
761
762		wpipe->pipe_state |= PIPE_WANTW;
763		error = msleep(wpipe, PIPE_MTX(wpipe),
764		    PRIBIO | PCATCH, "pipdwc", 0);
765		if (error)
766			goto error1;
767		if (wpipe->pipe_state & PIPE_EOF) {
768			error = EPIPE;
769			goto error1;
770		}
771		goto retry;
772	}
773
774	wpipe->pipe_state |= PIPE_DIRECTW;
775
776	pipelock(wpipe, 0);
777	PIPE_GET_GIANT(wpipe);
778	error = pipe_build_write_buffer(wpipe, uio);
779	PIPE_DROP_GIANT(wpipe);
780	pipeunlock(wpipe);
781	if (error) {
782		wpipe->pipe_state &= ~PIPE_DIRECTW;
783		goto error1;
784	}
785
786	error = 0;
787	while (!error && (wpipe->pipe_state & PIPE_DIRECTW)) {
788		if (wpipe->pipe_state & PIPE_EOF) {
789			pipelock(wpipe, 0);
790			PIPE_GET_GIANT(wpipe);
791			pipe_destroy_write_buffer(wpipe);
792			PIPE_DROP_GIANT(wpipe);
793			pipeunlock(wpipe);
794			pipeselwakeup(wpipe);
795			error = EPIPE;
796			goto error1;
797		}
798		if (wpipe->pipe_state & PIPE_WANTR) {
799			wpipe->pipe_state &= ~PIPE_WANTR;
800			wakeup(wpipe);
801		}
802		pipeselwakeup(wpipe);
803		error = msleep(wpipe, PIPE_MTX(wpipe), PRIBIO | PCATCH,
804		    "pipdwt", 0);
805	}
806
807	pipelock(wpipe, 0);
808	if (wpipe->pipe_state & PIPE_DIRECTW) {
809		/*
810		 * this bit of trickery substitutes a kernel buffer for
811		 * the process that might be going away.
812		 */
813		pipe_clone_write_buffer(wpipe);
814	} else {
815		PIPE_GET_GIANT(wpipe);
816		pipe_destroy_write_buffer(wpipe);
817		PIPE_DROP_GIANT(wpipe);
818	}
819	pipeunlock(wpipe);
820	return (error);
821
822error1:
823	wakeup(wpipe);
824	return (error);
825}
826#endif
827
828static int
829pipe_write(fp, uio, cred, flags, td)
830	struct file *fp;
831	struct uio *uio;
832	struct ucred *cred;
833	struct thread *td;
834	int flags;
835{
836	int error = 0;
837	int orig_resid;
838	struct pipe *wpipe, *rpipe;
839
840	rpipe = (struct pipe *) fp->f_data;
841	wpipe = rpipe->pipe_peer;
842
843	PIPE_LOCK(rpipe);
844	/*
845	 * detect loss of pipe read side, issue SIGPIPE if lost.
846	 */
847	if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
848		PIPE_UNLOCK(rpipe);
849		return (EPIPE);
850	}
851	++wpipe->pipe_busy;
852
853	/*
854	 * If it is advantageous to resize the pipe buffer, do
855	 * so.
856	 */
857	if ((uio->uio_resid > PIPE_SIZE) &&
858		(nbigpipe < LIMITBIGPIPES) &&
859		(wpipe->pipe_state & PIPE_DIRECTW) == 0 &&
860		(wpipe->pipe_buffer.size <= PIPE_SIZE) &&
861		(wpipe->pipe_buffer.cnt == 0)) {
862
863		if ((error = pipelock(wpipe, 1)) == 0) {
864			PIPE_GET_GIANT(wpipe);
865			if (pipespace(wpipe, BIG_PIPE_SIZE) == 0)
866				nbigpipe++;
867			PIPE_DROP_GIANT(wpipe);
868			pipeunlock(wpipe);
869		}
870	}
871
872	/*
873	 * If an early error occurred, unbusy and return, waking up any pending
874	 * readers.
875	 */
876	if (error) {
877		--wpipe->pipe_busy;
878		if ((wpipe->pipe_busy == 0) &&
879		    (wpipe->pipe_state & PIPE_WANT)) {
880			wpipe->pipe_state &= ~(PIPE_WANT | PIPE_WANTR);
881			wakeup(wpipe);
882		}
883		PIPE_UNLOCK(rpipe);
884		return(error);
885	}
886
887	KASSERT(wpipe->pipe_buffer.buffer != NULL, ("pipe buffer gone"));
888
889	orig_resid = uio->uio_resid;
890
891	while (uio->uio_resid) {
892		int space;
893
894#ifndef PIPE_NODIRECT
895		/*
896		 * If the transfer is large, we can gain performance if
897		 * we do process-to-process copies directly.
898		 * If the write is non-blocking, we don't use the
899		 * direct write mechanism.
900		 *
901		 * The direct write mechanism will detect the reader going
902		 * away on us.
903		 */
904		if ((uio->uio_iov->iov_len >= PIPE_MINDIRECT) &&
905		    (fp->f_flag & FNONBLOCK) == 0 &&
906		    (wpipe->pipe_map.kva ||
907		     (amountpipekva < LIMITPIPEKVA))) {
908			error = pipe_direct_write(wpipe, uio);
909			if (error)
910				break;
911			continue;
912		}
913#endif
914
915		/*
916		 * Pipe buffered writes cannot be coincident with
917		 * direct writes.  We wait until the currently executing
918		 * direct write is completed before we start filling the
919		 * pipe buffer.  We break out if a signal occurs or the
920		 * reader goes away.
921		 */
922	retrywrite:
923		while (wpipe->pipe_state & PIPE_DIRECTW) {
924			if (wpipe->pipe_state & PIPE_WANTR) {
925				wpipe->pipe_state &= ~PIPE_WANTR;
926				wakeup(wpipe);
927			}
928			error = msleep(wpipe, PIPE_MTX(rpipe), PRIBIO | PCATCH,
929			    "pipbww", 0);
930			if (wpipe->pipe_state & PIPE_EOF)
931				break;
932			if (error)
933				break;
934		}
935		if (wpipe->pipe_state & PIPE_EOF) {
936			error = EPIPE;
937			break;
938		}
939
940		space = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;
941
942		/* Writes of size <= PIPE_BUF must be atomic. */
943		if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
944			space = 0;
945
946		if (space > 0 && (wpipe->pipe_buffer.cnt < PIPE_SIZE)) {
947			if ((error = pipelock(wpipe, 1)) == 0) {
948				int size;	/* Transfer size */
949				int segsize;	/* first segment to transfer */
950
951				/*
952				 * It is possible for a direct write to
953				 * slip in on us... handle it here...
954				 */
955				if (wpipe->pipe_state & PIPE_DIRECTW) {
956					pipeunlock(wpipe);
957					goto retrywrite;
958				}
959				/*
960				 * If a process blocked in uiomove, our
961				 * value for space might be bad.
962				 *
963				 * XXX will we be ok if the reader has gone
964				 * away here?
965				 */
966				if (space > wpipe->pipe_buffer.size -
967				    wpipe->pipe_buffer.cnt) {
968					pipeunlock(wpipe);
969					goto retrywrite;
970				}
971
972				/*
973				 * Transfer size is minimum of uio transfer
974				 * and free space in pipe buffer.
975				 */
976				if (space > uio->uio_resid)
977					size = uio->uio_resid;
978				else
979					size = space;
980				/*
981				 * First segment to transfer is minimum of
982				 * transfer size and contiguous space in
983				 * pipe buffer.  If first segment to transfer
984				 * is less than the transfer size, we've got
985				 * a wraparound in the buffer.
986				 */
987				segsize = wpipe->pipe_buffer.size -
988					wpipe->pipe_buffer.in;
989				if (segsize > size)
990					segsize = size;
991
992				/* Transfer first segment */
993
994				PIPE_UNLOCK(rpipe);
995				error = uiomove(&wpipe->pipe_buffer.buffer[wpipe->pipe_buffer.in],
996						segsize, uio);
997				PIPE_LOCK(rpipe);
998
999				if (error == 0 && segsize < size) {
1000					/*
1001					 * Transfer remaining part now, to
1002					 * support atomic writes.  Wraparound
1003					 * happened.
1004					 */
1005					if (wpipe->pipe_buffer.in + segsize !=
1006					    wpipe->pipe_buffer.size)
1007						panic("Expected pipe buffer wraparound disappeared");
1008
1009					PIPE_UNLOCK(rpipe);
1010					error = uiomove(&wpipe->pipe_buffer.buffer[0],
1011							size - segsize, uio);
1012					PIPE_LOCK(rpipe);
1013				}
1014				if (error == 0) {
1015					wpipe->pipe_buffer.in += size;
1016					if (wpipe->pipe_buffer.in >=
1017					    wpipe->pipe_buffer.size) {
1018						if (wpipe->pipe_buffer.in != size - segsize + wpipe->pipe_buffer.size)
1019							panic("Expected wraparound bad");
1020						wpipe->pipe_buffer.in = size - segsize;
1021					}
1022
1023					wpipe->pipe_buffer.cnt += size;
1024					if (wpipe->pipe_buffer.cnt > wpipe->pipe_buffer.size)
1025						panic("Pipe buffer overflow");
1026
1027				}
1028				pipeunlock(wpipe);
1029			}
1030			if (error)
1031				break;
1032
1033		} else {
1034			/*
1035			 * If the "read-side" has been blocked, wake it up now.
1036			 */
1037			if (wpipe->pipe_state & PIPE_WANTR) {
1038				wpipe->pipe_state &= ~PIPE_WANTR;
1039				wakeup(wpipe);
1040			}
1041
1042			/*
1043			 * don't block on non-blocking I/O
1044			 */
1045			if (fp->f_flag & FNONBLOCK) {
1046				error = EAGAIN;
1047				break;
1048			}
1049
1050			/*
1051			 * We have no more space and have something to offer,
1052			 * wake up select/poll.
1053			 */
1054			pipeselwakeup(wpipe);
1055
1056			wpipe->pipe_state |= PIPE_WANTW;
1057			error = msleep(wpipe, PIPE_MTX(rpipe),
1058			    PRIBIO | PCATCH, "pipewr", 0);
1059			if (error != 0)
1060				break;
1061			/*
1062			 * If read side wants to go away, we just issue a signal
1063			 * to ourselves.
1064			 */
1065			if (wpipe->pipe_state & PIPE_EOF) {
1066				error = EPIPE;
1067				break;
1068			}
1069		}
1070	}
1071
1072	--wpipe->pipe_busy;
1073
1074	if ((wpipe->pipe_busy == 0) && (wpipe->pipe_state & PIPE_WANT)) {
1075		wpipe->pipe_state &= ~(PIPE_WANT | PIPE_WANTR);
1076		wakeup(wpipe);
1077	} else if (wpipe->pipe_buffer.cnt > 0) {
1078		/*
1079		 * If we have put any characters in the buffer, we wake up
1080		 * the reader.
1081		 */
1082		if (wpipe->pipe_state & PIPE_WANTR) {
1083			wpipe->pipe_state &= ~PIPE_WANTR;
1084			wakeup(wpipe);
1085		}
1086	}
1087
1088	/*
1089	 * Don't return EPIPE if I/O was successful
1090	 */
1091	if ((wpipe->pipe_buffer.cnt == 0) &&
1092	    (uio->uio_resid == 0) &&
1093	    (error == EPIPE)) {
1094		error = 0;
1095	}
1096
1097	if (error == 0)
1098		vfs_timestamp(&wpipe->pipe_mtime);
1099
1100	/*
1101	 * We have something to offer,
1102	 * wake up select/poll.
1103	 */
1104	if (wpipe->pipe_buffer.cnt)
1105		pipeselwakeup(wpipe);
1106
1107	PIPE_UNLOCK(rpipe);
1108	return (error);
1109}
1110
1111/*
1112 * we implement a very minimal set of ioctls for compatibility with sockets.
1113 */
1114int
1115pipe_ioctl(fp, cmd, data, td)
1116	struct file *fp;
1117	u_long cmd;
1118	caddr_t data;
1119	struct thread *td;
1120{
1121	struct pipe *mpipe = (struct pipe *)fp->f_data;
1122
1123	switch (cmd) {
1124
1125	case FIONBIO:
1126		return (0);
1127
1128	case FIOASYNC:
1129		PIPE_LOCK(mpipe);
1130		if (*(int *)data) {
1131			mpipe->pipe_state |= PIPE_ASYNC;
1132		} else {
1133			mpipe->pipe_state &= ~PIPE_ASYNC;
1134		}
1135		PIPE_UNLOCK(mpipe);
1136		return (0);
1137
1138	case FIONREAD:
1139		PIPE_LOCK(mpipe);
1140		if (mpipe->pipe_state & PIPE_DIRECTW)
1141			*(int *)data = mpipe->pipe_map.cnt;
1142		else
1143			*(int *)data = mpipe->pipe_buffer.cnt;
1144		PIPE_UNLOCK(mpipe);
1145		return (0);
1146
1147	case FIOSETOWN:
1148		return (fsetown(*(int *)data, &mpipe->pipe_sigio));
1149
1150	case FIOGETOWN:
1151		*(int *)data = fgetown(mpipe->pipe_sigio);
1152		return (0);
1153
1154	/* This is deprecated; FIOSETOWN should be used instead. */
1155	case TIOCSPGRP:
1156		return (fsetown(-(*(int *)data), &mpipe->pipe_sigio));
1157
1158	/* This is deprecated; FIOGETOWN should be used instead. */
1159	case TIOCGPGRP:
1160		*(int *)data = -fgetown(mpipe->pipe_sigio);
1161		return (0);
1162
1163	}
1164	return (ENOTTY);
1165}
1166
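/*
 * Poll/select support: the pipe is readable when buffered or directly
 * mapped data (or EOF) is pending, and writable when the peer can absorb
 * at least PIPE_BUF bytes without blocking.
 */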
1167int
1168pipe_poll(fp, events, cred, td)
1169	struct file *fp;
1170	int events;
1171	struct ucred *cred;
1172	struct thread *td;
1173{
1174	struct pipe *rpipe = (struct pipe *)fp->f_data;
1175	struct pipe *wpipe;
1176	int revents = 0;
1177
1178	wpipe = rpipe->pipe_peer;
1179	PIPE_LOCK(rpipe);
1180	if (events & (POLLIN | POLLRDNORM))
1181		if ((rpipe->pipe_state & PIPE_DIRECTW) ||
1182		    (rpipe->pipe_buffer.cnt > 0) ||
1183		    (rpipe->pipe_state & PIPE_EOF))
1184			revents |= events & (POLLIN | POLLRDNORM);
1185
1186	if (events & (POLLOUT | POLLWRNORM))
1187		if (wpipe == NULL || (wpipe->pipe_state & PIPE_EOF) ||
1188		    (((wpipe->pipe_state & PIPE_DIRECTW) == 0) &&
1189		     (wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt) >= PIPE_BUF))
1190			revents |= events & (POLLOUT | POLLWRNORM);
1191
1192	if ((rpipe->pipe_state & PIPE_EOF) ||
1193	    (wpipe == NULL) ||
1194	    (wpipe->pipe_state & PIPE_EOF))
1195		revents |= POLLHUP;
1196
1197	if (revents == 0) {
1198		if (events & (POLLIN | POLLRDNORM)) {
1199			selrecord(td, &rpipe->pipe_sel);
1200			rpipe->pipe_state |= PIPE_SEL;
1201		}
1202
1203		if (events & (POLLOUT | POLLWRNORM)) {
1204			selrecord(td, &wpipe->pipe_sel);
1205			wpipe->pipe_state |= PIPE_SEL;
1206		}
1207	}
1208	PIPE_UNLOCK(rpipe);
1209
1210	return (revents);
1211}
1212
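/*
 * Fill in a stat structure for the pipe; only the size, block size,
 * timestamps and ownership fields are meaningful.
 */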
1213static int
1214pipe_stat(fp, ub, td)
1215	struct file *fp;
1216	struct stat *ub;
1217	struct thread *td;
1218{
1219	struct pipe *pipe = (struct pipe *)fp->f_data;
1220
1221	bzero((caddr_t)ub, sizeof(*ub));
1222	ub->st_mode = S_IFIFO;
1223	ub->st_blksize = pipe->pipe_buffer.size;
1224	ub->st_size = pipe->pipe_buffer.cnt;
1225	ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
1226	ub->st_atimespec = pipe->pipe_atime;
1227	ub->st_mtimespec = pipe->pipe_mtime;
1228	ub->st_ctimespec = pipe->pipe_ctime;
1229	ub->st_uid = fp->f_cred->cr_uid;
1230	ub->st_gid = fp->f_cred->cr_gid;
1231	/*
1232	 * Left as 0: st_dev, st_ino, st_nlink, st_rdev, st_flags, st_gen.
1233	 * XXX (st_dev, st_ino) should be unique.
1234	 */
1235	return (0);
1236}
1237
1238/* ARGSUSED */
1239static int
1240pipe_close(fp, td)
1241	struct file *fp;
1242	struct thread *td;
1243{
1244	struct pipe *cpipe = (struct pipe *)fp->f_data;
1245
1246	fp->f_ops = &badfileops;
1247	fp->f_data = NULL;
1248	funsetown(cpipe->pipe_sigio);
1249	pipeclose(cpipe);
1250	return (0);
1251}
1252
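/*
 * Release the pageable buffer kva and any direct-write mapping still
 * owned by a pipe, updating the global pipe kva accounting.
 */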
1253static void
1254pipe_free_kmem(cpipe)
1255	struct pipe *cpipe;
1256{
1257
1258	GIANT_REQUIRED;
1259	KASSERT(cpipe->pipe_mtxp == NULL || !mtx_owned(PIPE_MTX(cpipe)),
1260	       ("pipe_free_kmem: pipe mutex locked"));
1261
1262	if (cpipe->pipe_buffer.buffer != NULL) {
1263		if (cpipe->pipe_buffer.size > PIPE_SIZE)
1264			--nbigpipe;
1265		amountpipekva -= cpipe->pipe_buffer.size;
1266		kmem_free(kernel_map,
1267			(vm_offset_t)cpipe->pipe_buffer.buffer,
1268			cpipe->pipe_buffer.size);
1269		cpipe->pipe_buffer.buffer = NULL;
1270	}
1271#ifndef PIPE_NODIRECT
1272	if (cpipe->pipe_map.kva != 0) {
1273		amountpipekva -= cpipe->pipe_buffer.size + PAGE_SIZE;
1274		kmem_free(kernel_map,
1275			cpipe->pipe_map.kva,
1276			cpipe->pipe_buffer.size + PAGE_SIZE);
1277		cpipe->pipe_map.cnt = 0;
1278		cpipe->pipe_map.kva = 0;
1279		cpipe->pipe_map.pos = 0;
1280		cpipe->pipe_map.npages = 0;
1281	}
1282#endif
1283}
1284
1285/*
1286 * shutdown the pipe
1287 */
1288static void
1289pipeclose(cpipe)
1290	struct pipe *cpipe;
1291{
1292	struct pipe *ppipe;
1293	int hadpeer;
1294
1295	if (cpipe == NULL)
1296		return;
1297
1298	hadpeer = 0;
1299
1300	/* partially created pipes won't have a valid mutex. */
1301	if (PIPE_MTX(cpipe) != NULL)
1302		PIPE_LOCK(cpipe);
1303
1304	pipeselwakeup(cpipe);
1305
1306	/*
1307	 * If the other side is blocked, wake it up saying that
1308	 * we want to close it down.
1309	 */
1310	while (cpipe->pipe_busy) {
1311		wakeup(cpipe);
1312		cpipe->pipe_state |= PIPE_WANT | PIPE_EOF;
1313		msleep(cpipe, PIPE_MTX(cpipe), PRIBIO, "pipecl", 0);
1314	}
1315
1316	/*
1317	 * Disconnect from peer
1318	 */
1319	if ((ppipe = cpipe->pipe_peer) != NULL) {
1320		hadpeer++;
1321		pipeselwakeup(ppipe);
1322
1323		ppipe->pipe_state |= PIPE_EOF;
1324		wakeup(ppipe);
1325		KNOTE(&ppipe->pipe_sel.si_note, 0);
1326		ppipe->pipe_peer = NULL;
1327	}
1328	/*
1329	 * free resources
1330	 */
1331	if (PIPE_MTX(cpipe) != NULL) {
1332		PIPE_UNLOCK(cpipe);
1333		if (!hadpeer) {
1334			mtx_destroy(PIPE_MTX(cpipe));
1335			free(PIPE_MTX(cpipe), M_TEMP);
1336		}
1337	}
1338	mtx_lock(&Giant);
1339	pipe_free_kmem(cpipe);
1340	uma_zfree(pipe_zone, cpipe);
1341	mtx_unlock(&Giant);
1342}
1343
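/*
 * Attach a knote to a pipe.  EVFILT_READ knotes are attached to the pipe
 * backing this descriptor; EVFILT_WRITE knotes are attached to its peer,
 * which is where the corresponding buffer state and wakeups live.
 */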
1344/*ARGSUSED*/
1345static int
1346pipe_kqfilter(struct file *fp, struct knote *kn)
1347{
1348	struct pipe *cpipe;
1349
1350	cpipe = (struct pipe *)kn->kn_fp->f_data;
1351	switch (kn->kn_filter) {
1352	case EVFILT_READ:
1353		kn->kn_fop = &pipe_rfiltops;
1354		break;
1355	case EVFILT_WRITE:
1356		kn->kn_fop = &pipe_wfiltops;
1357		cpipe = cpipe->pipe_peer;
1358		break;
1359	default:
1360		return (1);
1361	}
1362	kn->kn_hook = (caddr_t)cpipe;
1363
1364	PIPE_LOCK(cpipe);
1365	SLIST_INSERT_HEAD(&cpipe->pipe_sel.si_note, kn, kn_selnext);
1366	PIPE_UNLOCK(cpipe);
1367	return (0);
1368}
1369
1370static void
1371filt_pipedetach(struct knote *kn)
1372{
1373	struct pipe *cpipe = (struct pipe *)kn->kn_hook;
1374
1375	PIPE_LOCK(cpipe);
1376	SLIST_REMOVE(&cpipe->pipe_sel.si_note, kn, knote, kn_selnext);
1377	PIPE_UNLOCK(cpipe);
1378}
1379
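/*
 * kqueue read filter: true when the pipe has buffered or directly mapped
 * data to read; EV_EOF is set once the write side has gone away.
 */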
1380/*ARGSUSED*/
1381static int
1382filt_piperead(struct knote *kn, long hint)
1383{
1384	struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
1385	struct pipe *wpipe = rpipe->pipe_peer;
1386
1387	PIPE_LOCK(rpipe);
1388	kn->kn_data = rpipe->pipe_buffer.cnt;
1389	if ((kn->kn_data == 0) && (rpipe->pipe_state & PIPE_DIRECTW))
1390		kn->kn_data = rpipe->pipe_map.cnt;
1391
1392	if ((rpipe->pipe_state & PIPE_EOF) ||
1393	    (wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
1394		kn->kn_flags |= EV_EOF;
1395		PIPE_UNLOCK(rpipe);
1396		return (1);
1397	}
1398	PIPE_UNLOCK(rpipe);
1399	return (kn->kn_data > 0);
1400}
1401
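/*
 * kqueue write filter: true when at least PIPE_BUF bytes of buffer space
 * are available; EV_EOF is set once the read side has gone away.
 */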
1402/*ARGSUSED*/
1403static int
1404filt_pipewrite(struct knote *kn, long hint)
1405{
1406	struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
1407	struct pipe *wpipe = rpipe->pipe_peer;
1408
1409	PIPE_LOCK(rpipe);
1410	if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
1411		kn->kn_data = 0;
1412		kn->kn_flags |= EV_EOF;
1413		PIPE_UNLOCK(rpipe);
1414		return (1);
1415	}
1416	kn->kn_data = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;
1417	if (wpipe->pipe_state & PIPE_DIRECTW)
1418		kn->kn_data = 0;
1419
1420	PIPE_UNLOCK(rpipe);
1421	return (kn->kn_data >= PIPE_BUF);
1422}
1423