sys_pipe.c revision 91968
1/*
2 * Copyright (c) 1996 John S. Dyson
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 * 1. Redistributions of source code must retain the above copyright
9 *    notice immediately at the beginning of the file, without modification,
10 *    this list of conditions, and the following disclaimer.
11 * 2. Redistributions in binary form must reproduce the above copyright
12 *    notice, this list of conditions and the following disclaimer in the
13 *    documentation and/or other materials provided with the distribution.
14 * 3. Absolutely no warranty of function or purpose is made by the author
15 *    John S. Dyson.
16 * 4. Modifications may be freely made to this file if the above conditions
17 *    are met.
18 *
19 * $FreeBSD: head/sys/kern/sys_pipe.c 91968 2002-03-09 22:06:31Z alfred $
20 */
21
22/*
23 * This file contains a high-performance replacement for the socket-based
24 * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
25 * all features of sockets, but does do everything that pipes normally
26 * do.
27 */
28
29/*
30 * This code has two modes of operation, a small write mode and a large
31 * write mode.  The small write mode acts like conventional pipes with
32 * a kernel buffer.  If the write is smaller than PIPE_MINDIRECT, the
33 * "normal" pipe buffering is done.  If the write is between PIPE_MINDIRECT
34 * and PIPE_SIZE in size, the user buffer is fully mapped and wired into
35 * the kernel, and the receiving process copies the data directly from the
36 * pages of the sending process.
37 *
38 * If the sending process receives a signal, it is possible that it will
39 * go away, and certainly its address space can change, because control
40 * is returned to the user-mode side.  In that case, the pipe code
41 * arranges to copy the buffer supplied by the user process to a pageable
42 * kernel buffer, and the receiving process will grab the data from the
43 * pageable kernel buffer.  Since signals don't happen all that often,
44 * the copy operation is normally eliminated.
45 *
46 * The constant PIPE_MINDIRECT is chosen to make sure that buffering will
47 * happen for small transfers so that the system will not spend all of
48 * its time context switching.  PIPE_SIZE is constrained by the
49 * amount of kernel virtual memory.
50 */
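/*
 * Illustrative userland sketch (assumptions for this comment only: some
 * other process or thread is draining fd[0], <unistd.h> and <err.h> are
 * included, and the PIPE_MINDIRECT threshold itself is internal to this
 * implementation, not visible to applications).  The first write below
 * takes the buffered ("small write") path, while the second, being large
 * and blocking, is a candidate for the direct path described above:
 *
 *	int fd[2];
 *	static char big[128 * 1024];
 *
 *	if (pipe(fd) == -1)
 *		err(1, "pipe");
 *	(void)write(fd[1], "hi", 2);
 *	(void)write(fd[1], big, sizeof(big));
 */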
51
52#include <sys/param.h>
53#include <sys/systm.h>
54#include <sys/fcntl.h>
55#include <sys/file.h>
56#include <sys/filedesc.h>
57#include <sys/filio.h>
58#include <sys/kernel.h>
59#include <sys/lock.h>
60#include <sys/mutex.h>
61#include <sys/ttycom.h>
62#include <sys/stat.h>
63#include <sys/malloc.h>
64#include <sys/poll.h>
65#include <sys/selinfo.h>
66#include <sys/signalvar.h>
67#include <sys/sysproto.h>
68#include <sys/pipe.h>
69#include <sys/proc.h>
70#include <sys/vnode.h>
71#include <sys/uio.h>
72#include <sys/event.h>
73
74#include <vm/vm.h>
75#include <vm/vm_param.h>
76#include <vm/vm_object.h>
77#include <vm/vm_kern.h>
78#include <vm/vm_extern.h>
79#include <vm/pmap.h>
80#include <vm/vm_map.h>
81#include <vm/vm_page.h>
82#include <vm/vm_zone.h>
83
84/*
85 * Use this define if you want to disable *fancy* VM things.  Expect an
86 * approx 30% decrease in transfer rate.  This could be useful for
87 * NetBSD or OpenBSD.
88 */
89/* #define PIPE_NODIRECT */
90
91/*
92 * interfaces to the outside world
93 */
94static int pipe_read(struct file *fp, struct uio *uio,
95		struct ucred *cred, int flags, struct thread *td);
96static int pipe_write(struct file *fp, struct uio *uio,
97		struct ucred *cred, int flags, struct thread *td);
98static int pipe_close(struct file *fp, struct thread *td);
99static int pipe_poll(struct file *fp, int events, struct ucred *cred,
100		struct thread *td);
101static int pipe_kqfilter(struct file *fp, struct knote *kn);
102static int pipe_stat(struct file *fp, struct stat *sb, struct thread *td);
103static int pipe_ioctl(struct file *fp, u_long cmd, caddr_t data, struct thread *td);
104
105static struct fileops pipeops = {
106	pipe_read, pipe_write, pipe_ioctl, pipe_poll, pipe_kqfilter,
107	pipe_stat, pipe_close
108};
109
110static void	filt_pipedetach(struct knote *kn);
111static int	filt_piperead(struct knote *kn, long hint);
112static int	filt_pipewrite(struct knote *kn, long hint);
113
114static struct filterops pipe_rfiltops =
115	{ 1, NULL, filt_pipedetach, filt_piperead };
116static struct filterops pipe_wfiltops =
117	{ 1, NULL, filt_pipedetach, filt_pipewrite };
118
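/*
 * Bracket VM operations that still require Giant: drop the pipe mutex,
 * acquire Giant, and reverse the order on the way out.  Both ends of a
 * pipe share a single mutex (see pipe()), so either end may be named.
 */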
119#define PIPE_GET_GIANT(pipe)						\
120	do {								\
121		PIPE_UNLOCK(pipe);					\
122		mtx_lock(&Giant);					\
123	} while (0)
124
125#define PIPE_DROP_GIANT(pipe)						\
126	do {								\
127		mtx_unlock(&Giant);					\
128		PIPE_LOCK(pipe);					\
129	} while (0)
130
131/*
132 * Default pipe buffer size(s).  This can be kind-of large now because pipe
133 * space is pageable.  The pipe code will try to maintain locality of
134 * reference for performance reasons, so small amounts of outstanding I/O
135 * will not wipe the cache.
136 */
137#define MINPIPESIZE (PIPE_SIZE/3)
138#define MAXPIPESIZE (2*PIPE_SIZE/3)
139
140/*
141 * Maximum amount of kva for pipes -- this is kind-of a soft limit, but
142 * is there so that on large systems, we don't exhaust it.
143 */
144#define MAXPIPEKVA (8*1024*1024)
145
146/*
147 * Limit for direct transfers; we cannot, of course, limit
148 * the amount of kva for pipes in general, though.
149 */
150#define LIMITPIPEKVA (16*1024*1024)
151
152/*
153 * Limit the number of "big" pipes
154 */
155#define LIMITBIGPIPES	32
156static int nbigpipe;
157
158static int amountpipekva;
159
160static void pipeinit(void *dummy __unused);
161static void pipeclose(struct pipe *cpipe);
162static void pipe_free_kmem(struct pipe *cpipe);
163static int pipe_create(struct pipe **cpipep);
164static __inline int pipelock(struct pipe *cpipe, int catch);
165static __inline void pipeunlock(struct pipe *cpipe);
166static __inline void pipeselwakeup(struct pipe *cpipe);
167#ifndef PIPE_NODIRECT
168static int pipe_build_write_buffer(struct pipe *wpipe, struct uio *uio);
169static void pipe_destroy_write_buffer(struct pipe *wpipe);
170static int pipe_direct_write(struct pipe *wpipe, struct uio *uio);
171static void pipe_clone_write_buffer(struct pipe *wpipe);
172#endif
173static int pipespace(struct pipe *cpipe, int size);
174
175static vm_zone_t pipe_zone;
176
177SYSINIT(vfs, SI_SUB_VFS, SI_ORDER_ANY, pipeinit, NULL);
178
179static void
180pipeinit(void *dummy __unused)
181{
182
183	pipe_zone = zinit("PIPE", sizeof(struct pipe), 0, 0, 4);
184}
185
186/*
187 * The pipe system call for the DTYPE_PIPE type of pipes
188 */
189
190/* ARGSUSED */
191int
192pipe(td, uap)
193	struct thread *td;
194	struct pipe_args /* {
195		int	dummy;
196	} */ *uap;
197{
198	struct filedesc *fdp = td->td_proc->p_fd;
199	struct file *rf, *wf;
200	struct pipe *rpipe, *wpipe;
201	struct mtx *pmtx;
202	int fd, error;
203
204	KASSERT(pipe_zone != NULL, ("pipe_zone not initialized"));
205
206	pmtx = malloc(sizeof(*pmtx), M_TEMP, M_WAITOK | M_ZERO);
207
208	rpipe = wpipe = NULL;
209	if (pipe_create(&rpipe) || pipe_create(&wpipe)) {
210		pipeclose(rpipe);
211		pipeclose(wpipe);
212		free(pmtx, M_TEMP);
213		return (ENFILE);
214	}
215
216	rpipe->pipe_state |= PIPE_DIRECTOK;
217	wpipe->pipe_state |= PIPE_DIRECTOK;
218
219	error = falloc(td, &rf, &fd);
220	if (error) {
221		pipeclose(rpipe);
222		pipeclose(wpipe);
223		free(pmtx, M_TEMP);
224		return (error);
225	}
226	fhold(rf);
227	td->td_retval[0] = fd;
228
229	/*
230	 * Warning: once we've gotten past allocation of the fd for the
231	 * read-side, we can only drop the read side via fdrop() in order
232	 * to avoid races against processes which manage to dup() the read
233	 * side while we are blocked trying to allocate the write side.
234	 */
235	FILE_LOCK(rf);
236	rf->f_flag = FREAD | FWRITE;
237	rf->f_type = DTYPE_PIPE;
238	rf->f_data = (caddr_t)rpipe;
239	rf->f_ops = &pipeops;
240	FILE_UNLOCK(rf);
241	error = falloc(td, &wf, &fd);
242	if (error) {
243		FILEDESC_LOCK(fdp);
244		if (fdp->fd_ofiles[td->td_retval[0]] == rf) {
245			fdp->fd_ofiles[td->td_retval[0]] = NULL;
246			FILEDESC_UNLOCK(fdp);
247			fdrop(rf, td);
248		} else
249			FILEDESC_UNLOCK(fdp);
250		fdrop(rf, td);
251		/* rpipe has been closed by fdrop(). */
252		pipeclose(wpipe);
253		free(pmtx, M_TEMP);
254		return (error);
255	}
256	FILE_LOCK(wf);
257	wf->f_flag = FREAD | FWRITE;
258	wf->f_type = DTYPE_PIPE;
259	wf->f_data = (caddr_t)wpipe;
260	wf->f_ops = &pipeops;
261	FILE_UNLOCK(wf);
262	td->td_retval[1] = fd;
263	rpipe->pipe_peer = wpipe;
264	wpipe->pipe_peer = rpipe;
265	mtx_init(pmtx, "pipe mutex", MTX_DEF);
266	rpipe->pipe_mtxp = wpipe->pipe_mtxp = pmtx;
267	fdrop(rf, td);
268
269	return (0);
270}
271
272/*
273 * Allocate kva for the pipe circular buffer; the space is pageable.
274 * This routine will 'realloc' the size of a pipe safely: if it fails,
275 * it will retain the old buffer.
276 * If it fails, it will return ENOMEM.
277 */
278static int
279pipespace(cpipe, size)
280	struct pipe *cpipe;
281	int size;
282{
283	struct vm_object *object;
284	caddr_t buffer;
285	int npages, error;
286
287	GIANT_REQUIRED;
288	KASSERT(cpipe->pipe_mtxp == NULL || !mtx_owned(PIPE_MTX(cpipe)),
289	       ("pipespace: pipe mutex locked"));
290
291	npages = round_page(size)/PAGE_SIZE;
292	/*
293	 * Create an object, I don't like the idea of paging to/from
294	 * kernel_object.
295	 * XXX -- minor change needed here for NetBSD/OpenBSD VM systems.
296	 */
297	object = vm_object_allocate(OBJT_DEFAULT, npages);
298	buffer = (caddr_t) vm_map_min(kernel_map);
299
300	/*
301	 * Insert the object into the kernel map, and allocate kva for it.
302	 * The map entry is, by default, pageable.
303	 * XXX -- minor change needed here for NetBSD/OpenBSD VM systems.
304	 */
305	error = vm_map_find(kernel_map, object, 0,
306		(vm_offset_t *) &buffer, size, 1,
307		VM_PROT_ALL, VM_PROT_ALL, 0);
308
309	if (error != KERN_SUCCESS) {
310		vm_object_deallocate(object);
311		return (ENOMEM);
312	}
313
314	/* free old resources if we're resizing */
315	pipe_free_kmem(cpipe);
316	cpipe->pipe_buffer.object = object;
317	cpipe->pipe_buffer.buffer = buffer;
318	cpipe->pipe_buffer.size = size;
319	cpipe->pipe_buffer.in = 0;
320	cpipe->pipe_buffer.out = 0;
321	cpipe->pipe_buffer.cnt = 0;
322	amountpipekva += cpipe->pipe_buffer.size;
323	return (0);
324}
325
326/*
327 * initialize and allocate VM and memory for pipe
328 */
329static int
330pipe_create(cpipep)
331	struct pipe **cpipep;
332{
333	struct pipe *cpipe;
334	int error;
335
336	*cpipep = zalloc(pipe_zone);
337	if (*cpipep == NULL)
338		return (ENOMEM);
339
340	cpipe = *cpipep;
341
342	/* so pipespace()->pipe_free_kmem() doesn't follow junk pointer */
343	cpipe->pipe_buffer.object = NULL;
344#ifndef PIPE_NODIRECT
345	cpipe->pipe_map.kva = 0;
346#endif
347	/*
348	 * protect so pipeclose() doesn't follow a junk pointer
349	 * if pipespace() fails.
350	 */
351	bzero(&cpipe->pipe_sel, sizeof(cpipe->pipe_sel));
352	cpipe->pipe_state = 0;
353	cpipe->pipe_peer = NULL;
354	cpipe->pipe_busy = 0;
355
356#ifndef PIPE_NODIRECT
357	/*
358	 * pipe data structure initializations to support direct pipe I/O
359	 */
360	cpipe->pipe_map.cnt = 0;
361	cpipe->pipe_map.kva = 0;
362	cpipe->pipe_map.pos = 0;
363	cpipe->pipe_map.npages = 0;
364	/* cpipe->pipe_map.ms[] = invalid */
365#endif
366
367	cpipe->pipe_mtxp = NULL;	/* avoid pipespace assertion */
368	error = pipespace(cpipe, PIPE_SIZE);
369	if (error)
370		return (error);
371
372	vfs_timestamp(&cpipe->pipe_ctime);
373	cpipe->pipe_atime = cpipe->pipe_ctime;
374	cpipe->pipe_mtime = cpipe->pipe_ctime;
375
376	return (0);
377}
378
379
380/*
381 * lock a pipe for I/O, blocking other access
382 */
383static __inline int
384pipelock(cpipe, catch)
385	struct pipe *cpipe;
386	int catch;
387{
388	int error;
389
390	PIPE_LOCK_ASSERT(cpipe, MA_OWNED);
391	while (cpipe->pipe_state & PIPE_LOCKFL) {
392		cpipe->pipe_state |= PIPE_LWANT;
393		error = msleep(cpipe, PIPE_MTX(cpipe),
394		    catch ? (PRIBIO | PCATCH) : PRIBIO,
395		    "pipelk", 0);
396		if (error != 0)
397			return (error);
398	}
399	cpipe->pipe_state |= PIPE_LOCKFL;
400	return (0);
401}
402
403/*
404 * unlock a pipe I/O lock
405 */
406static __inline void
407pipeunlock(cpipe)
408	struct pipe *cpipe;
409{
410
411	PIPE_LOCK_ASSERT(cpipe, MA_OWNED);
412	cpipe->pipe_state &= ~PIPE_LOCKFL;
413	if (cpipe->pipe_state & PIPE_LWANT) {
414		cpipe->pipe_state &= ~PIPE_LWANT;
415		wakeup(cpipe);
416	}
417}
418
419static __inline void
420pipeselwakeup(cpipe)
421	struct pipe *cpipe;
422{
423
424	if (cpipe->pipe_state & PIPE_SEL) {
425		cpipe->pipe_state &= ~PIPE_SEL;
426		selwakeup(&cpipe->pipe_sel);
427	}
428	if ((cpipe->pipe_state & PIPE_ASYNC) && cpipe->pipe_sigio)
429		pgsigio(cpipe->pipe_sigio, SIGIO, 0);
430	KNOTE(&cpipe->pipe_sel.si_note, 0);
431}
432
433/* ARGSUSED */
434static int
435pipe_read(fp, uio, cred, flags, td)
436	struct file *fp;
437	struct uio *uio;
438	struct ucred *cred;
439	struct thread *td;
440	int flags;
441{
442	struct pipe *rpipe = (struct pipe *) fp->f_data;
443	int error;
444	int nread = 0;
445	u_int size;
446
447	PIPE_LOCK(rpipe);
448	++rpipe->pipe_busy;
449	error = pipelock(rpipe, 1);
450	if (error)
451		goto unlocked_error;
452
453	while (uio->uio_resid) {
454		/*
455		 * normal pipe buffer receive
456		 */
457		if (rpipe->pipe_buffer.cnt > 0) {
458			size = rpipe->pipe_buffer.size - rpipe->pipe_buffer.out;
459			if (size > rpipe->pipe_buffer.cnt)
460				size = rpipe->pipe_buffer.cnt;
461			if (size > (u_int) uio->uio_resid)
462				size = (u_int) uio->uio_resid;
463
464			PIPE_UNLOCK(rpipe);
465			error = uiomove(&rpipe->pipe_buffer.buffer[rpipe->pipe_buffer.out],
466					size, uio);
467			PIPE_LOCK(rpipe);
468			if (error)
469				break;
470
471			rpipe->pipe_buffer.out += size;
472			if (rpipe->pipe_buffer.out >= rpipe->pipe_buffer.size)
473				rpipe->pipe_buffer.out = 0;
474
475			rpipe->pipe_buffer.cnt -= size;
476
477			/*
478			 * If there is no more to read in the pipe, reset
479			 * its pointers to the beginning.  This improves
480			 * cache hit stats.
481			 */
482			if (rpipe->pipe_buffer.cnt == 0) {
483				rpipe->pipe_buffer.in = 0;
484				rpipe->pipe_buffer.out = 0;
485			}
486			nread += size;
487#ifndef PIPE_NODIRECT
488		/*
489		 * Direct copy, bypassing a kernel buffer.
490		 */
491		} else if ((size = rpipe->pipe_map.cnt) &&
492			   (rpipe->pipe_state & PIPE_DIRECTW)) {
493			caddr_t	va;
494			if (size > (u_int) uio->uio_resid)
495				size = (u_int) uio->uio_resid;
496
497			va = (caddr_t) rpipe->pipe_map.kva +
498			    rpipe->pipe_map.pos;
499			PIPE_UNLOCK(rpipe);
500			error = uiomove(va, size, uio);
501			PIPE_LOCK(rpipe);
502			if (error)
503				break;
504			nread += size;
505			rpipe->pipe_map.pos += size;
506			rpipe->pipe_map.cnt -= size;
507			if (rpipe->pipe_map.cnt == 0) {
508				rpipe->pipe_state &= ~PIPE_DIRECTW;
509				wakeup(rpipe);
510			}
511#endif
512		} else {
513			/*
514			 * detect EOF condition
515			 * read returns 0 on EOF, no need to set error
516			 */
517			if (rpipe->pipe_state & PIPE_EOF)
518				break;
519
520			/*
521			 * If the "write-side" has been blocked, wake it up now.
522			 */
523			if (rpipe->pipe_state & PIPE_WANTW) {
524				rpipe->pipe_state &= ~PIPE_WANTW;
525				wakeup(rpipe);
526			}
527
528			/*
529			 * Break if some data was read.
530			 */
531			if (nread > 0)
532				break;
533
534			/*
535			 * Unlock the pipe buffer for our remaining processing.  We
536			 * will either break out with an error or we will sleep and
537			 * relock to loop.
538			 */
539			pipeunlock(rpipe);
540
541			/*
542			 * Handle non-blocking mode operation or
543			 * wait for more data.
544			 */
545			if (fp->f_flag & FNONBLOCK) {
546				error = EAGAIN;
547			} else {
548				rpipe->pipe_state |= PIPE_WANTR;
549				if ((error = msleep(rpipe, PIPE_MTX(rpipe),
550				    PRIBIO | PCATCH,
551				    "piperd", 0)) == 0)
552					error = pipelock(rpipe, 1);
553			}
554			if (error)
555				goto unlocked_error;
556		}
557	}
558	pipeunlock(rpipe);
559
560	/* XXX: should probably do this before getting any locks. */
561	if (error == 0)
562		vfs_timestamp(&rpipe->pipe_atime);
563unlocked_error:
564	--rpipe->pipe_busy;
565
566	/*
567	 * PIPE_WANT processing only makes sense if pipe_busy is 0.
568	 */
569	if ((rpipe->pipe_busy == 0) && (rpipe->pipe_state & PIPE_WANT)) {
570		rpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTW);
571		wakeup(rpipe);
572	} else if (rpipe->pipe_buffer.cnt < MINPIPESIZE) {
573		/*
574		 * Handle write blocking hysteresis.
575		 */
576		if (rpipe->pipe_state & PIPE_WANTW) {
577			rpipe->pipe_state &= ~PIPE_WANTW;
578			wakeup(rpipe);
579		}
580	}
581
582	if ((rpipe->pipe_buffer.size - rpipe->pipe_buffer.cnt) >= PIPE_BUF)
583		pipeselwakeup(rpipe);
584
585	PIPE_UNLOCK(rpipe);
586	return (error);
587}
588
589#ifndef PIPE_NODIRECT
590/*
591 * Map the sending process's buffer into kernel space and wire it.
592 * This is similar to a physical write operation.
593 */
594static int
595pipe_build_write_buffer(wpipe, uio)
596	struct pipe *wpipe;
597	struct uio *uio;
598{
599	u_int size;
600	int i;
601	vm_offset_t addr, endaddr, paddr;
602
603	GIANT_REQUIRED;
604	PIPE_LOCK_ASSERT(wpipe, MA_NOTOWNED);
605
606	size = (u_int) uio->uio_iov->iov_len;
607	if (size > wpipe->pipe_buffer.size)
608		size = wpipe->pipe_buffer.size;
609
610	endaddr = round_page((vm_offset_t)uio->uio_iov->iov_base + size);
611	addr = trunc_page((vm_offset_t)uio->uio_iov->iov_base);
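	/*
	 * Fault in and wire every user page backing the iovec, recording
	 * the vm_page pointers in pipe_map.ms[].  If any page cannot be
	 * faulted in, unwire what was wired so far and fail with EFAULT.
	 */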
612	for (i = 0; addr < endaddr; addr += PAGE_SIZE, i++) {
613		vm_page_t m;
614
615		if (vm_fault_quick((caddr_t)addr, VM_PROT_READ) < 0 ||
616		    (paddr = pmap_kextract(addr)) == 0) {
617			int j;
618
619			for (j = 0; j < i; j++)
620				vm_page_unwire(wpipe->pipe_map.ms[j], 1);
621			return (EFAULT);
622		}
623
624		m = PHYS_TO_VM_PAGE(paddr);
625		vm_page_wire(m);
626		wpipe->pipe_map.ms[i] = m;
627	}
628
629/*
630 * set up the control block
631 */
632	wpipe->pipe_map.npages = i;
633	wpipe->pipe_map.pos =
634	    ((vm_offset_t) uio->uio_iov->iov_base) & PAGE_MASK;
635	wpipe->pipe_map.cnt = size;
636
637/*
638 * and map the buffer
639 */
640	if (wpipe->pipe_map.kva == 0) {
641		/*
642		 * We need to allocate space for an extra page because the
643		 * address range might (will) span pages at times.
644		 */
645		wpipe->pipe_map.kva = kmem_alloc_pageable(kernel_map,
646			wpipe->pipe_buffer.size + PAGE_SIZE);
647		amountpipekva += wpipe->pipe_buffer.size + PAGE_SIZE;
648	}
649	pmap_qenter(wpipe->pipe_map.kva, wpipe->pipe_map.ms,
650		wpipe->pipe_map.npages);
651
652/*
653 * and update the uio data
654 */
655
656	uio->uio_iov->iov_len -= size;
657	uio->uio_iov->iov_base += size;
658	if (uio->uio_iov->iov_len == 0)
659		uio->uio_iov++;
660	uio->uio_resid -= size;
661	uio->uio_offset += size;
662	return (0);
663}
664
665/*
666 * unmap and unwire the process buffer
667 */
668static void
669pipe_destroy_write_buffer(wpipe)
670	struct pipe *wpipe;
671{
672	int i;
673
674	GIANT_REQUIRED;
675	PIPE_LOCK_ASSERT(wpipe, MA_NOTOWNED);
676
677	if (wpipe->pipe_map.kva) {
678		pmap_qremove(wpipe->pipe_map.kva, wpipe->pipe_map.npages);
679
680		if (amountpipekva > MAXPIPEKVA) {
681			vm_offset_t kva = wpipe->pipe_map.kva;
682			wpipe->pipe_map.kva = 0;
683			kmem_free(kernel_map, kva,
684				wpipe->pipe_buffer.size + PAGE_SIZE);
685			amountpipekva -= wpipe->pipe_buffer.size + PAGE_SIZE;
686		}
687	}
688	for (i = 0; i < wpipe->pipe_map.npages; i++)
689		vm_page_unwire(wpipe->pipe_map.ms[i], 1);
690	wpipe->pipe_map.npages = 0;
691}
692
693/*
694 * In the case of a signal, the writing process might go away.  This
695 * code copies the data into the circular buffer so that the source
696 * pages can be freed without loss of data.
697 */
698static void
699pipe_clone_write_buffer(wpipe)
700	struct pipe *wpipe;
701{
702	int size;
703	int pos;
704
705	PIPE_LOCK_ASSERT(wpipe, MA_OWNED);
706	size = wpipe->pipe_map.cnt;
707	pos = wpipe->pipe_map.pos;
708	bcopy((caddr_t) wpipe->pipe_map.kva + pos,
709	    (caddr_t) wpipe->pipe_buffer.buffer, size);
710
711	wpipe->pipe_buffer.in = size;
712	wpipe->pipe_buffer.out = 0;
713	wpipe->pipe_buffer.cnt = size;
714	wpipe->pipe_state &= ~PIPE_DIRECTW;
715
716	PIPE_GET_GIANT(wpipe);
717	pipe_destroy_write_buffer(wpipe);
718	PIPE_DROP_GIANT(wpipe);
719}
720
721/*
722 * This implements the pipe buffer write mechanism.  Note that only
723 * a direct write OR a normal pipe write can be pending at any given time.
724 * If there are any characters in the pipe buffer, the direct write will
725 * be deferred until the receiving process grabs all of the bytes from
726 * the pipe buffer.  Then the direct mapping write is set up.
727 */
728static int
729pipe_direct_write(wpipe, uio)
730	struct pipe *wpipe;
731	struct uio *uio;
732{
733	int error;
734
735retry:
736	PIPE_LOCK_ASSERT(wpipe, MA_OWNED);
737	while (wpipe->pipe_state & PIPE_DIRECTW) {
738		if (wpipe->pipe_state & PIPE_WANTR) {
739			wpipe->pipe_state &= ~PIPE_WANTR;
740			wakeup(wpipe);
741		}
742		wpipe->pipe_state |= PIPE_WANTW;
743		error = msleep(wpipe, PIPE_MTX(wpipe),
744		    PRIBIO | PCATCH, "pipdww", 0);
745		if (error)
746			goto error1;
747		if (wpipe->pipe_state & PIPE_EOF) {
748			error = EPIPE;
749			goto error1;
750		}
751	}
752	wpipe->pipe_map.cnt = 0;	/* transfer not ready yet */
753	if (wpipe->pipe_buffer.cnt > 0) {
754		if (wpipe->pipe_state & PIPE_WANTR) {
755			wpipe->pipe_state &= ~PIPE_WANTR;
756			wakeup(wpipe);
757		}
758
759		wpipe->pipe_state |= PIPE_WANTW;
760		error = msleep(wpipe, PIPE_MTX(wpipe),
761		    PRIBIO | PCATCH, "pipdwc", 0);
762		if (error)
763			goto error1;
764		if (wpipe->pipe_state & PIPE_EOF) {
765			error = EPIPE;
766			goto error1;
767		}
768		goto retry;
769	}
770
771	wpipe->pipe_state |= PIPE_DIRECTW;
772
773	PIPE_GET_GIANT(wpipe);
774	error = pipe_build_write_buffer(wpipe, uio);
775	PIPE_DROP_GIANT(wpipe);
776	if (error) {
777		wpipe->pipe_state &= ~PIPE_DIRECTW;
778		goto error1;
779	}
780
781	error = 0;
782	while (!error && (wpipe->pipe_state & PIPE_DIRECTW)) {
783		if (wpipe->pipe_state & PIPE_EOF) {
784			pipelock(wpipe, 0);
785			PIPE_GET_GIANT(wpipe);
786			pipe_destroy_write_buffer(wpipe);
787			PIPE_DROP_GIANT(wpipe);
788			pipeunlock(wpipe);
789			pipeselwakeup(wpipe);
790			error = EPIPE;
791			goto error1;
792		}
793		if (wpipe->pipe_state & PIPE_WANTR) {
794			wpipe->pipe_state &= ~PIPE_WANTR;
795			wakeup(wpipe);
796		}
797		pipeselwakeup(wpipe);
798		error = msleep(wpipe, PIPE_MTX(wpipe), PRIBIO | PCATCH,
799		    "pipdwt", 0);
800	}
801
802	pipelock(wpipe,0);
803	if (wpipe->pipe_state & PIPE_DIRECTW) {
804		/*
805		 * this bit of trickery substitutes a kernel buffer for
806		 * the process that might be going away.
807		 */
808		pipe_clone_write_buffer(wpipe);
809	} else {
810		PIPE_GET_GIANT(wpipe);
811		pipe_destroy_write_buffer(wpipe);
812		PIPE_DROP_GIANT(wpipe);
813	}
814	pipeunlock(wpipe);
815	return (error);
816
817error1:
818	wakeup(wpipe);
819	return (error);
820}
821#endif
822
823static int
824pipe_write(fp, uio, cred, flags, td)
825	struct file *fp;
826	struct uio *uio;
827	struct ucred *cred;
828	struct thread *td;
829	int flags;
830{
831	int error = 0;
832	int orig_resid;
833	struct pipe *wpipe, *rpipe;
834
835	rpipe = (struct pipe *) fp->f_data;
836	wpipe = rpipe->pipe_peer;
837
838	PIPE_LOCK(rpipe);
839	/*
840	 * detect loss of pipe read side, issue SIGPIPE if lost.
841	 */
842	if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
843		PIPE_UNLOCK(rpipe);
844		return (EPIPE);
845	}
846	++wpipe->pipe_busy;
847
848	/*
849	 * If it is advantageous to resize the pipe buffer, do
850	 * so.
851	 */
852	if ((uio->uio_resid > PIPE_SIZE) &&
853		(nbigpipe < LIMITBIGPIPES) &&
854		(wpipe->pipe_state & PIPE_DIRECTW) == 0 &&
855		(wpipe->pipe_buffer.size <= PIPE_SIZE) &&
856		(wpipe->pipe_buffer.cnt == 0)) {
857
858		if ((error = pipelock(wpipe,1)) == 0) {
859			PIPE_GET_GIANT(rpipe);
860			if (pipespace(wpipe, BIG_PIPE_SIZE) == 0)
861				nbigpipe++;
862			PIPE_DROP_GIANT(rpipe);
863			pipeunlock(wpipe);
864		}
865	}
866
867	/*
868	 * If an early error occurred, unbusy and return, waking up any pending
869	 * readers.
870	 */
871	if (error) {
872		--wpipe->pipe_busy;
873		if ((wpipe->pipe_busy == 0) &&
874		    (wpipe->pipe_state & PIPE_WANT)) {
875			wpipe->pipe_state &= ~(PIPE_WANT | PIPE_WANTR);
876			wakeup(wpipe);
877		}
878		PIPE_UNLOCK(rpipe);
879		return(error);
880	}
881
882	KASSERT(wpipe->pipe_buffer.buffer != NULL, ("pipe buffer gone"));
883
884	orig_resid = uio->uio_resid;
885
886	while (uio->uio_resid) {
887		int space;
888
889#ifndef PIPE_NODIRECT
890		/*
891		 * If the transfer is large, we can gain performance if
892		 * we do process-to-process copies directly.
893		 * If the write is non-blocking, we don't use the
894		 * direct write mechanism.
895		 *
896		 * The direct write mechanism will detect the reader going
897		 * away on us.
898		 */
899		if ((uio->uio_iov->iov_len >= PIPE_MINDIRECT) &&
900		    (fp->f_flag & FNONBLOCK) == 0 &&
901		    (wpipe->pipe_map.kva ||
902		     (amountpipekva < LIMITPIPEKVA))) {
903			error = pipe_direct_write(wpipe, uio);
904			if (error)
905				break;
906			continue;
907		}
908#endif
909
910		/*
911		 * Pipe buffered writes cannot be coincident with
912		 * direct writes.  We wait until the currently executing
913		 * direct write is completed before we start filling the
914		 * pipe buffer.  We break out if a signal occurs or the
915		 * reader goes away.
916		 */
917	retrywrite:
918		while (wpipe->pipe_state & PIPE_DIRECTW) {
919			if (wpipe->pipe_state & PIPE_WANTR) {
920				wpipe->pipe_state &= ~PIPE_WANTR;
921				wakeup(wpipe);
922			}
923			error = msleep(wpipe, PIPE_MTX(rpipe), PRIBIO | PCATCH,
924			    "pipbww", 0);
925			if (wpipe->pipe_state & PIPE_EOF)
926				break;
927			if (error)
928				break;
929		}
930		if (wpipe->pipe_state & PIPE_EOF) {
931			error = EPIPE;
932			break;
933		}
934
935		space = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;
936
937		/* Writes of size <= PIPE_BUF must be atomic. */
938		if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
939			space = 0;
940
941		if (space > 0 && (wpipe->pipe_buffer.cnt < PIPE_SIZE)) {
942			if ((error = pipelock(wpipe,1)) == 0) {
943				int size;	/* Transfer size */
944				int segsize;	/* first segment to transfer */
945
946				/*
947				 * It is possible for a direct write to
948				 * slip in on us... handle it here...
949				 */
950				if (wpipe->pipe_state & PIPE_DIRECTW) {
951					pipeunlock(wpipe);
952					goto retrywrite;
953				}
954				/*
955				 * If a process blocked in uiomove, our
956				 * value for space might be bad.
957				 *
958				 * XXX will we be ok if the reader has gone
959				 * away here?
960				 */
961				if (space > wpipe->pipe_buffer.size -
962				    wpipe->pipe_buffer.cnt) {
963					pipeunlock(wpipe);
964					goto retrywrite;
965				}
966
967				/*
968				 * Transfer size is minimum of uio transfer
969				 * and free space in pipe buffer.
970				 */
971				if (space > uio->uio_resid)
972					size = uio->uio_resid;
973				else
974					size = space;
975				/*
976				 * First segment to transfer is minimum of
977				 * transfer size and contiguous space in
978				 * pipe buffer.  If first segment to transfer
979				 * is less than the transfer size, we've got
980				 * a wraparound in the buffer.
981				 */
982				segsize = wpipe->pipe_buffer.size -
983					wpipe->pipe_buffer.in;
984				if (segsize > size)
985					segsize = size;
986
987				/* Transfer first segment */
988
989				PIPE_UNLOCK(rpipe);
990				error = uiomove(&wpipe->pipe_buffer.buffer[wpipe->pipe_buffer.in],
991						segsize, uio);
992				PIPE_LOCK(rpipe);
993
994				if (error == 0 && segsize < size) {
995					/*
996					 * Transfer remaining part now, to
997					 * support atomic writes.  Wraparound
998					 * happened.
999					 */
1000					if (wpipe->pipe_buffer.in + segsize !=
1001					    wpipe->pipe_buffer.size)
1002						panic("Expected pipe buffer wraparound disappeared");
1003
1004					PIPE_UNLOCK(rpipe);
1005					error = uiomove(&wpipe->pipe_buffer.buffer[0],
1006							size - segsize, uio);
1007					PIPE_LOCK(rpipe);
1008				}
1009				if (error == 0) {
1010					wpipe->pipe_buffer.in += size;
1011					if (wpipe->pipe_buffer.in >=
1012					    wpipe->pipe_buffer.size) {
1013						if (wpipe->pipe_buffer.in != size - segsize + wpipe->pipe_buffer.size)
1014							panic("Expected wraparound bad");
1015						wpipe->pipe_buffer.in = size - segsize;
1016					}
1017
1018					wpipe->pipe_buffer.cnt += size;
1019					if (wpipe->pipe_buffer.cnt > wpipe->pipe_buffer.size)
1020						panic("Pipe buffer overflow");
1021
1022				}
1023				pipeunlock(wpipe);
1024			}
1025			if (error)
1026				break;
1027
1028		} else {
1029			/*
1030			 * If the "read-side" has been blocked, wake it up now.
1031			 */
1032			if (wpipe->pipe_state & PIPE_WANTR) {
1033				wpipe->pipe_state &= ~PIPE_WANTR;
1034				wakeup(wpipe);
1035			}
1036
1037			/*
1038			 * don't block on non-blocking I/O
1039			 */
1040			if (fp->f_flag & FNONBLOCK) {
1041				error = EAGAIN;
1042				break;
1043			}
1044
1045			/*
1046			 * We have no more space and have something to offer,
1047			 * wake up select/poll.
1048			 */
1049			pipeselwakeup(wpipe);
1050
1051			wpipe->pipe_state |= PIPE_WANTW;
1052			error = msleep(wpipe, PIPE_MTX(rpipe),
1053			    PRIBIO | PCATCH, "pipewr", 0);
1054			if (error != 0)
1055				break;
1056			/*
1057			 * If read side wants to go away, we just issue a signal
1058			 * to ourselves.
1059			 */
1060			if (wpipe->pipe_state & PIPE_EOF) {
1061				error = EPIPE;
1062				break;
1063			}
1064		}
1065	}
1066
1067	--wpipe->pipe_busy;
1068
1069	if ((wpipe->pipe_busy == 0) && (wpipe->pipe_state & PIPE_WANT)) {
1070		wpipe->pipe_state &= ~(PIPE_WANT | PIPE_WANTR);
1071		wakeup(wpipe);
1072	} else if (wpipe->pipe_buffer.cnt > 0) {
1073		/*
1074		 * If we have put any characters in the buffer, we wake up
1075		 * the reader.
1076		 */
1077		if (wpipe->pipe_state & PIPE_WANTR) {
1078			wpipe->pipe_state &= ~PIPE_WANTR;
1079			wakeup(wpipe);
1080		}
1081	}
1082
1083	/*
1084	 * Don't return EPIPE if I/O was successful
1085	 */
1086	if ((wpipe->pipe_buffer.cnt == 0) &&
1087	    (uio->uio_resid == 0) &&
1088	    (error == EPIPE)) {
1089		error = 0;
1090	}
1091
1092	if (error == 0)
1093		vfs_timestamp(&wpipe->pipe_mtime);
1094
1095	/*
1096	 * We have something to offer,
1097	 * wake up select/poll.
1098	 */
1099	if (wpipe->pipe_buffer.cnt)
1100		pipeselwakeup(wpipe);
1101
1102	PIPE_UNLOCK(rpipe);
1103	return (error);
1104}
1105
1106/*
1107 * we implement a very minimal set of ioctls for compatibility with sockets.
1108 */
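/*
 * Userland sketch (illustrative only; assumes <sys/ioctl.h>, <sys/filio.h>
 * and <err.h> are included and fd[0] is the read end of a pipe):  FIONREAD
 * reports how many bytes are ready to read, whether they sit in the pipe
 * buffer or in a pending direct write.
 *
 *	int nready;
 *
 *	if (ioctl(fd[0], FIONREAD, &nready) == -1)
 *		err(1, "ioctl");
 */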
1109int
1110pipe_ioctl(fp, cmd, data, td)
1111	struct file *fp;
1112	u_long cmd;
1113	caddr_t data;
1114	struct thread *td;
1115{
1116	struct pipe *mpipe = (struct pipe *)fp->f_data;
1117
1118	switch (cmd) {
1119
1120	case FIONBIO:
1121		return (0);
1122
1123	case FIOASYNC:
1124		PIPE_LOCK(mpipe);
1125		if (*(int *)data) {
1126			mpipe->pipe_state |= PIPE_ASYNC;
1127		} else {
1128			mpipe->pipe_state &= ~PIPE_ASYNC;
1129		}
1130		PIPE_UNLOCK(mpipe);
1131		return (0);
1132
1133	case FIONREAD:
1134		PIPE_LOCK(mpipe);
1135		if (mpipe->pipe_state & PIPE_DIRECTW)
1136			*(int *)data = mpipe->pipe_map.cnt;
1137		else
1138			*(int *)data = mpipe->pipe_buffer.cnt;
1139		PIPE_UNLOCK(mpipe);
1140		return (0);
1141
1142	case FIOSETOWN:
1143		return (fsetown(*(int *)data, &mpipe->pipe_sigio));
1144
1145	case FIOGETOWN:
1146		*(int *)data = fgetown(mpipe->pipe_sigio);
1147		return (0);
1148
1149	/* This is deprecated, FIOSETOWN should be used instead. */
1150	case TIOCSPGRP:
1151		return (fsetown(-(*(int *)data), &mpipe->pipe_sigio));
1152
1153	/* This is deprecated, FIOGETOWN should be used instead. */
1154	case TIOCGPGRP:
1155		*(int *)data = -fgetown(mpipe->pipe_sigio);
1156		return (0);
1157
1158	}
1159	return (ENOTTY);
1160}
1161
1162int
1163pipe_poll(fp, events, cred, td)
1164	struct file *fp;
1165	int events;
1166	struct ucred *cred;
1167	struct thread *td;
1168{
1169	struct pipe *rpipe = (struct pipe *)fp->f_data;
1170	struct pipe *wpipe;
1171	int revents = 0;
1172
1173	wpipe = rpipe->pipe_peer;
1174	PIPE_LOCK(rpipe);
1175	if (events & (POLLIN | POLLRDNORM))
1176		if ((rpipe->pipe_state & PIPE_DIRECTW) ||
1177		    (rpipe->pipe_buffer.cnt > 0) ||
1178		    (rpipe->pipe_state & PIPE_EOF))
1179			revents |= events & (POLLIN | POLLRDNORM);
1180
1181	if (events & (POLLOUT | POLLWRNORM))
1182		if (wpipe == NULL || (wpipe->pipe_state & PIPE_EOF) ||
1183		    (((wpipe->pipe_state & PIPE_DIRECTW) == 0) &&
1184		     (wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt) >= PIPE_BUF))
1185			revents |= events & (POLLOUT | POLLWRNORM);
1186
1187	if ((rpipe->pipe_state & PIPE_EOF) ||
1188	    (wpipe == NULL) ||
1189	    (wpipe->pipe_state & PIPE_EOF))
1190		revents |= POLLHUP;
1191
1192	if (revents == 0) {
1193		if (events & (POLLIN | POLLRDNORM)) {
1194			selrecord(td, &rpipe->pipe_sel);
1195			rpipe->pipe_state |= PIPE_SEL;
1196		}
1197
1198		if (events & (POLLOUT | POLLWRNORM)) {
1199			selrecord(td, &wpipe->pipe_sel);
1200			wpipe->pipe_state |= PIPE_SEL;
1201		}
1202	}
1203	PIPE_UNLOCK(rpipe);
1204
1205	return (revents);
1206}
1207
1208static int
1209pipe_stat(fp, ub, td)
1210	struct file *fp;
1211	struct stat *ub;
1212	struct thread *td;
1213{
1214	struct pipe *pipe = (struct pipe *)fp->f_data;
1215
1216	bzero((caddr_t)ub, sizeof(*ub));
1217	ub->st_mode = S_IFIFO;
1218	ub->st_blksize = pipe->pipe_buffer.size;
1219	ub->st_size = pipe->pipe_buffer.cnt;
1220	ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
1221	ub->st_atimespec = pipe->pipe_atime;
1222	ub->st_mtimespec = pipe->pipe_mtime;
1223	ub->st_ctimespec = pipe->pipe_ctime;
1224	ub->st_uid = fp->f_cred->cr_uid;
1225	ub->st_gid = fp->f_cred->cr_gid;
1226	/*
1227	 * Left as 0: st_dev, st_ino, st_nlink, st_rdev, st_flags, st_gen.
1228	 * XXX (st_dev, st_ino) should be unique.
1229	 */
1230	return (0);
1231}
1232
1233/* ARGSUSED */
1234static int
1235pipe_close(fp, td)
1236	struct file *fp;
1237	struct thread *td;
1238{
1239	struct pipe *cpipe = (struct pipe *)fp->f_data;
1240
1241	fp->f_ops = &badfileops;
1242	fp->f_data = NULL;
1243	funsetown(cpipe->pipe_sigio);
1244	pipeclose(cpipe);
1245	return (0);
1246}
1247
1248static void
1249pipe_free_kmem(cpipe)
1250	struct pipe *cpipe;
1251{
1252
1253	GIANT_REQUIRED;
1254	KASSERT(cpipe->pipe_mtxp == NULL || !mtx_owned(PIPE_MTX(cpipe)),
1255	       ("pipe_free_kmem: pipe mutex locked"));
1256
1257	if (cpipe->pipe_buffer.buffer != NULL) {
1258		if (cpipe->pipe_buffer.size > PIPE_SIZE)
1259			--nbigpipe;
1260		amountpipekva -= cpipe->pipe_buffer.size;
1261		kmem_free(kernel_map,
1262			(vm_offset_t)cpipe->pipe_buffer.buffer,
1263			cpipe->pipe_buffer.size);
1264		cpipe->pipe_buffer.buffer = NULL;
1265	}
1266#ifndef PIPE_NODIRECT
1267	if (cpipe->pipe_map.kva != 0) {
1268		amountpipekva -= cpipe->pipe_buffer.size + PAGE_SIZE;
1269		kmem_free(kernel_map,
1270			cpipe->pipe_map.kva,
1271			cpipe->pipe_buffer.size + PAGE_SIZE);
1272		cpipe->pipe_map.cnt = 0;
1273		cpipe->pipe_map.kva = 0;
1274		cpipe->pipe_map.pos = 0;
1275		cpipe->pipe_map.npages = 0;
1276	}
1277#endif
1278}
1279
1280/*
1281 * shutdown the pipe
1282 */
1283static void
1284pipeclose(cpipe)
1285	struct pipe *cpipe;
1286{
1287	struct pipe *ppipe;
1288	int hadpeer;
1289
1290	if (cpipe == NULL)
1291		return;
1292
1293	hadpeer = 0;
1294
1295	/* partially created pipes won't have a valid mutex. */
1296	if (PIPE_MTX(cpipe) != NULL)
1297		PIPE_LOCK(cpipe);
1298
1299	pipeselwakeup(cpipe);
1300
1301	/*
1302	 * If the other side is blocked, wake it up saying that
1303	 * we want to close it down.
1304	 */
1305	while (cpipe->pipe_busy) {
1306		wakeup(cpipe);
1307		cpipe->pipe_state |= PIPE_WANT | PIPE_EOF;
1308		msleep(cpipe, PIPE_MTX(cpipe), PRIBIO, "pipecl", 0);
1309	}
1310
1311	/*
1312	 * Disconnect from peer
1313	 */
1314	if ((ppipe = cpipe->pipe_peer) != NULL) {
1315		hadpeer++;
1316		pipeselwakeup(ppipe);
1317
1318		ppipe->pipe_state |= PIPE_EOF;
1319		wakeup(ppipe);
1320		KNOTE(&ppipe->pipe_sel.si_note, 0);
1321		ppipe->pipe_peer = NULL;
1322	}
1323	/*
1324	 * free resources
1325	 */
1326	if (PIPE_MTX(cpipe) != NULL) {
1327		PIPE_UNLOCK(cpipe);
1328		if (!hadpeer) {
1329			mtx_destroy(PIPE_MTX(cpipe));
1330			free(PIPE_MTX(cpipe), M_TEMP);
1331		}
1332	}
1333	mtx_lock(&Giant);
1334	pipe_free_kmem(cpipe);
1335	zfree(pipe_zone, cpipe);
1336	mtx_unlock(&Giant);
1337}
1338
1339/*ARGSUSED*/
1340static int
1341pipe_kqfilter(struct file *fp, struct knote *kn)
1342{
1343	struct pipe *cpipe;
1344
1345	cpipe = (struct pipe *)kn->kn_fp->f_data;
1346	switch (kn->kn_filter) {
1347	case EVFILT_READ:
1348		kn->kn_fop = &pipe_rfiltops;
1349		break;
1350	case EVFILT_WRITE:
1351		kn->kn_fop = &pipe_wfiltops;
1352		cpipe = cpipe->pipe_peer;
1353		break;
1354	default:
1355		return (1);
1356	}
1357	kn->kn_hook = (caddr_t)cpipe;
1358
1359	PIPE_LOCK(cpipe);
1360	SLIST_INSERT_HEAD(&cpipe->pipe_sel.si_note, kn, kn_selnext);
1361	PIPE_UNLOCK(cpipe);
1362	return (0);
1363}
1364
1365static void
1366filt_pipedetach(struct knote *kn)
1367{
1368	struct pipe *cpipe = (struct pipe *)kn->kn_hook;
1369
1370	PIPE_LOCK(cpipe);
1371	SLIST_REMOVE(&cpipe->pipe_sel.si_note, kn, knote, kn_selnext);
1372	PIPE_UNLOCK(cpipe);
1373}
1374
1375/*ARGSUSED*/
1376static int
1377filt_piperead(struct knote *kn, long hint)
1378{
1379	struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
1380	struct pipe *wpipe = rpipe->pipe_peer;
1381
1382	PIPE_LOCK(rpipe);
1383	kn->kn_data = rpipe->pipe_buffer.cnt;
1384	if ((kn->kn_data == 0) && (rpipe->pipe_state & PIPE_DIRECTW))
1385		kn->kn_data = rpipe->pipe_map.cnt;
1386
1387	if ((rpipe->pipe_state & PIPE_EOF) ||
1388	    (wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
1389		kn->kn_flags |= EV_EOF;
1390		PIPE_UNLOCK(rpipe);
1391		return (1);
1392	}
1393	PIPE_UNLOCK(rpipe);
1394	return (kn->kn_data > 0);
1395}
1396
1397/*ARGSUSED*/
1398static int
1399filt_pipewrite(struct knote *kn, long hint)
1400{
1401	struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
1402	struct pipe *wpipe = rpipe->pipe_peer;
1403
1404	PIPE_LOCK(rpipe);
1405	if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
1406		kn->kn_data = 0;
1407		kn->kn_flags |= EV_EOF;
1408		PIPE_UNLOCK(rpipe);
1409		return (1);
1410	}
1411	kn->kn_data = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;
1412	if (wpipe->pipe_state & PIPE_DIRECTW)
1413		kn->kn_data = 0;
1414
1415	PIPE_UNLOCK(rpipe);
1416	return (kn->kn_data >= PIPE_BUF);
1417}
1418